## <font color='blue'>Machine Learning com PySpark</font>

## <font color='blue'>Regressão Linear</font>

Usaremos Regressão Linear para prever o consumo de combustível de automóveis.

A variável **consumo** no dataset1.csv será a variável target (dependente) e as demais variáveis serão candidatas a features (variáveis preditoras ou independentes). 

Este é, portanto, será um problema de Regressão Linear Múltipla (quando temos mais de 1 variável preditora ou independente).

# 0.0 IMPORTS

In [1]:
# Importa o findspark e inicializa
import findspark
findspark.init()

In [2]:
# Imports
import numpy as np # manipulação de dados

import pyspark #algoritmo
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors 
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

## 0.1 Help Functions

In [3]:
# Versão da Linguagem Python
from platform import python_version
print('Versão da Linguagem Python Usada Neste Jupyter Notebook:', python_version())

Versão da Linguagem Python Usada Neste Jupyter Notebook: 3.9.7


In [4]:
# Versões dos pacotes usados neste jupyter notebook
%reload_ext watermark
%watermark -a "Camila D'Angelo" --iversions

Author: Camila D'Angelo

numpy    : 1.20.3
pyspark  : 3.3.0
findspark: 2.0.1



## Loading data

In [5]:
# Criando o Spark Context
sc = SparkContext(appName = "Lab5")

In [6]:
sc.setLogLevel("ERROR")

In [7]:
# Criando o Spark Session
spSession = SparkSession.builder.master("local").getOrCreate()

In [8]:
# Carregando os dados e gerando um RDD
dados = sc.textFile("dados/dataset1.csv")

In [9]:
type(dados)

pyspark.rdd.RDD

In [10]:
# Colocando o RDD em cache. Esse processo otimiza a performance
dados.cache()

dados/dataset1.csv MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [11]:
# Número de registros
dados.count()

399

In [12]:
# Visualizando as primeiras linhas
dados.take(5)

['consumo,numero_cilindros,capacidade,horsepower,peso,aceleracao,ano,nome',
 '30,4,79,70,2074,19.5,71,peugeot 304',
 '30,4,88,76,2065,14.5,71,fiat 124b',
 '31,4,71,65,1773,19,71,toyota corolla 1200',
 '35,4,72,69,1613,18,71,datsun 1200']

In [13]:
# Removendo a primeira linha do arquivo (cabeçalho)
dados2 = dados.filter(lambda x: "horsepower" not in x)
dados2.count()

398

In [14]:
# Visualizando as primeiras linhas
dados2.take(5)

['30,4,79,70,2074,19.5,71,peugeot 304',
 '30,4,88,76,2065,14.5,71,fiat 124b',
 '31,4,71,65,1773,19,71,toyota corolla 1200',
 '35,4,72,69,1613,18,71,datsun 1200',
 '27,4,97,60,1834,19,71,volkswagen model 111']

## Limpeza dos Dados

Vamos verificar se há valores ausentes. 

**Nota**: RDDs são ótimos para processamento, mas ruins para exploração, então converteremos o RDD para DataFrame Spark e então para DataFrame Pandas.

In [15]:
# Converte RDD para DataFrame Spark: 1º
df_spark = dados2.map(lambda x: str(x)).map(lambda w: w.split(',')).toDF()

In [16]:
type(df_spark)

pyspark.sql.dataframe.DataFrame

In [17]:
# Converte DataFrame Spark para DataFrame Pandas: 2º
df_pandas = df_spark.toPandas()

In [18]:
type(df_pandas)

pandas.core.frame.DataFrame

In [19]:
df_pandas.head()

Unnamed: 0,_1,_2,_3,_4,_5,_6,_7,_8
0,30,4,79,70,2074,19.5,71,peugeot 304
1,30,4,88,76,2065,14.5,71,fiat 124b
2,31,4,71,65,1773,19.0,71,toyota corolla 1200
3,35,4,72,69,1613,18.0,71,datsun 1200
4,27,4,97,60,1834,19.0,71,volkswagen model 111


In [20]:
# Tem valores nulos?
df_pandas.isnull().values.any()

False

> Não há valor nulo, mas será que há valor ausente (falta de informação)?

In [21]:
# Verifica se há alguma linha com caracter especial "?"
total = np.sum(df_pandas.apply(lambda x: x.str.contains('\?')).values)
total

6

In [22]:
# Vejamos quais linhas e colunas têm o caracter especial
# linha
# coluna
np.where(df_pandas.apply(lambda x: x.str.contains('\?')).values)

(array([ 48, 126, 330, 336, 354, 374], dtype=int64),
 array([3, 3, 3, 3, 3, 3], dtype=int64))

In [23]:
# Visualizando uma linha
df_pandas.iloc[330]

_1                    40.9
_2                       4
_3                      85
_4                       ?
_5                    1835
_6                    17.3
_7                      80
_8    renault lecar deluxe
Name: 330, dtype: object

In [24]:
# Usando um valor padrão para average HP (que será usado para preencher os valores ausentes)
mediaHP = sc.broadcast(75.0)

In [25]:
# Função para limpeza dos dados
def limpaDados(inputStr) :
    
    # Variável global
    global mediaHP
    
    # Lista de atributos
    attList = inputStr.split(",")
    
    # Substitui o caracter ? por um valor na coluna de índice 3
    hpValue = attList[3]
    if hpValue == "?":
        hpValue = mediaHP.value
       
    # Cria uma linha usando a função Row, limpando e convertendo os dados de string para float
    linhas = Row(consumo = float(attList[0]), 
                 numero_cilindros = float(attList[1]), 
                 capacidade = float(attList[2]), 
                 hosrsepower = float(hpValue), 
                 peso = float(attList[4]), 
                 aceleracao = float(attList[5]), 
                 ano = float(attList[6]), 
                 nome = attList[7]) 
    return linhas

In [26]:
# Executa a função no RDD
dados3 = dados2.map(limpaDados)
dados3.cache()
dados3.take(5)

[Row(consumo=30.0, numero_cilindros=4.0, capacidade=79.0, hosrsepower=70.0, peso=2074.0, aceleracao=19.5, ano=71.0, nome='peugeot 304'),
 Row(consumo=30.0, numero_cilindros=4.0, capacidade=88.0, hosrsepower=76.0, peso=2065.0, aceleracao=14.5, ano=71.0, nome='fiat 124b'),
 Row(consumo=31.0, numero_cilindros=4.0, capacidade=71.0, hosrsepower=65.0, peso=1773.0, aceleracao=19.0, ano=71.0, nome='toyota corolla 1200'),
 Row(consumo=35.0, numero_cilindros=4.0, capacidade=72.0, hosrsepower=69.0, peso=1613.0, aceleracao=18.0, ano=71.0, nome='datsun 1200'),
 Row(consumo=27.0, numero_cilindros=4.0, capacidade=97.0, hosrsepower=60.0, peso=1834.0, aceleracao=19.0, ano=71.0, nome='volkswagen model 111')]

## Análise Exploratória de Dados

In [27]:
# Cria um Dataframe
df_carros = spSession.createDataFrame(dados3)

In [28]:
# Estatísticas descritivas de duas variáveis (como exemplo)
df_carros.select("consumo", "numero_cilindros").describe().show()

+-------+-----------------+-----------------+
|summary|          consumo| numero_cilindros|
+-------+-----------------+-----------------+
|  count|              398|              398|
|   mean|23.51457286432161|5.454773869346734|
| stddev|7.815984312565782|1.701004244533212|
|    min|              9.0|              3.0|
|    max|             46.6|              8.0|
+-------+-----------------+-----------------+



In [29]:
# Encontrando a correlação entre a variável target com as variáveis preditoras (exceto o nome)
for i in df_carros.columns:
    if not(isinstance(df_carros.select(i).take(1)[0][0], str)) :
        print("Correlação da Variável Target com:", i, df_carros.stat.corr('consumo', i))

Correlação da Variável Target com: consumo 1.0
Correlação da Variável Target com: numero_cilindros -0.7753962854205546
Correlação da Variável Target com: capacidade -0.8042028248058979
Correlação da Variável Target com: hosrsepower -0.7747041523498721
Correlação da Variável Target com: peso -0.8317409332443347
Correlação da Variável Target com: aceleracao 0.42028891210164976
Correlação da Variável Target com: ano 0.5792671330833099


## Pré-Processamento dos Dados

In [30]:
# Convertendo para um LabeledPoint (target, Vector[features])
# Remove colunas não relevantes ou com baixa correlação
def transformaVar(row) :
    obj = (row["consumo"], Vectors.dense([row["peso"], row["capacidade"], row["numero_cilindros"]]))
    return obj

In [31]:
# Aplica a função no RDD e cria outro RDD
dados4 = dados3.map(transformaVar)

In [32]:
# Visualiza
dados4.take(5)

[(30.0, DenseVector([2074.0, 79.0, 4.0])),
 (30.0, DenseVector([2065.0, 88.0, 4.0])),
 (31.0, DenseVector([1773.0, 71.0, 4.0])),
 (35.0, DenseVector([1613.0, 72.0, 4.0])),
 (27.0, DenseVector([1834.0, 97.0, 4.0]))]

In [33]:
# Converte o RDD para DataFrame do Spark
df_carros = spSession.createDataFrame(dados4, ["label", "features"])

In [34]:
# Visualiza label (y) e atributos (x)
# Label: target
# Features: preditoras
df_carros.select("label","features").show(10)

+-----+------------------+
|label|          features|
+-----+------------------+
| 30.0| [2074.0,79.0,4.0]|
| 30.0| [2065.0,88.0,4.0]|
| 31.0| [1773.0,71.0,4.0]|
| 35.0| [1613.0,72.0,4.0]|
| 27.0| [1834.0,97.0,4.0]|
| 26.0| [1955.0,91.0,4.0]|
| 24.0|[2278.0,113.0,4.0]|
| 25.0| [2126.0,97.5,4.0]|
| 23.0| [2254.0,97.0,4.0]|
| 20.0|[2408.0,140.0,4.0]|
+-----+------------------+
only showing top 10 rows



> Dados de **Treino e Teste**

In [35]:
# Divisão em dados de Treino e de Teste com split 70/30
(dados_treino, dados_teste) = df_carros.randomSplit([0.7, 0.3])

In [36]:
dados_treino.count()

274

In [37]:
dados_teste.count()

124

## Machine Learning

In [38]:
# Cria o objeto
linearReg = LinearRegression()

In [39]:
# Treina o objeto com dados e cria o modelo
modelo = linearReg.fit(dados_treino)

In [40]:
print(modelo)

LinearRegressionModel: uid=LinearRegression_d63d6fd0629e, numFeatures=3


In [41]:
# Imprimindo os coeficientes (o que o modelo aprendeu)
print("Coeficientes: " + str(modelo.coefficients))
print("Intercepto: " + str(modelo.intercept))

Coeficientes: [-0.006409661769255323,-0.006556894624644381,-0.2761909233422483]
Intercepto: 45.51760263246458


In [42]:
# Previsões com dados de teste
predictions = modelo.transform(dados_teste)

In [43]:
# Visualiza as previsões
predictions.select("features", "prediction").show()

+------------------+------------------+
|          features|        prediction|
+------------------+------------------+
|[4615.0,360.0,8.0]|11.367004115741302|
|[3664.0,350.0,8.0]|17.528161404549557|
|[4952.0,429.0,8.0]| 8.754522370401787|
|[3169.0,302.0,8.0]|21.015674922313874|
|[4464.0,400.0,8.0]|12.072587257913078|
|[4699.0,350.0,8.0]|10.894161473370296|
|[5140.0,400.0,8.0]| 7.739655901896477|
|[3672.0,304.0,8.0]|17.778501263129154|
|[4042.0,302.0,8.0]|15.420040197753977|
|[4077.0,318.0,8.0]|15.090791721835728|
|[4129.0,351.0,8.0]|14.541111787221187|
|[4457.0,318.0,8.0]|12.655120249518703|
|[4657.0,351.0,8.0]|11.156810373054377|
|[3158.0,250.0,6.0]| 21.97952156894168|
|[3336.0,250.0,6.0]|20.838601774014233|
|[3432.0,250.0,6.0]|20.223274244165722|
|[3693.0,350.0,8.0]|17.342281213241154|
|[3730.0,258.0,6.0]|18.260739879930483|
|[4082.0,350.0,8.0]|14.848922785000834|
|[3278.0,250.0,6.0]|21.210362156631042|
+------------------+------------------+
only showing top 20 rows



In [44]:
# Coeficiente de determinação R2
avaliador = RegressionEvaluator(predictionCol = "prediction", labelCol = "label", metricName = "r2")

In [45]:
# Resultado
avaliador.evaluate(predictions) 

0.6658633721716773

In [47]:
print("Coeficiente de determinação (nivel de performance):", avaliador.evaluate(predictions) )

Coeficiente de determinação (nivel de performance): 0.6658633721716773
