
## <font color='blue'>Machine Learning com PySpark</font>

Usaremos Regressão Linear para prever o consumo de combustível de automóveis.

A variável **consumo** no dataset1.csv será a variável target (dependente) e as demais variáveis serão candidatas a features (variáveis preditoras ou independentes). 

Será um problema de Regressão Linear Múltipla (quando temos mais de 1 variável preditora ou independente).

In [1]:
# Versão da Linguagem Python
from platform import python_version
print('Versão Python Usada Neste Jupyter Notebook:', python_version())

Versão Python Usada Neste Jupyter Notebook: 3.9.12


In [2]:
# Importa o findspark e inicializa
import findspark
findspark.init()

In [3]:
# Imports
import numpy as np
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

In [4]:
# Versões dos pacotes usados neste jupyter notebook
%reload_ext watermark
%watermark -a "Henrique Bardella" --iversions

Author: Henrique Bardella

numpy    : 1.21.6
findspark: 2.0.1
pyspark  : 3.3.0



In [5]:
# Carregando os Dados

In [6]:
# Criando o Spark Context
sc = SparkContext(appName = 'Lab5-Pyspark-MLlib-Consumo_Combustivel')

In [8]:
sc.setLogLevel("ERROR")

In [9]:
# Criando o spark session
spSession = SparkSession.builder.master('local').getOrCreate()

In [10]:
# Carregfando o dataset e gerando um RDD
dados = sc.textFile("dados/dataset1.csv")

In [11]:
type(dados)

pyspark.rdd.RDD

In [12]:
# Colocando o RDD em cache. Esse processo otimiza a performance
dados.cache()

dados/dataset1.csv MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [13]:
# Número de registros
dados.count()

399

In [14]:
# Visualizando as primeiras linhas
dados.take(5)

['consumo,numero_cilindros,capacidade,horsepower,peso,aceleracao,ano,nome',
 '30,4,79,70,2074,19.5,71,peugeot 304',
 '30,4,88,76,2065,14.5,71,fiat 124b',
 '31,4,71,65,1773,19,71,toyota corolla 1200',
 '35,4,72,69,1613,18,71,datsun 1200']

In [15]:
# Removendo a primeira linha do arquivo
dados2 = dados.filter(lambda x: "horsepower" not in x)
dados2.count()

398

In [16]:
# Limpeza de Dados

In [17]:
# Converte um RDD para dataframe spark
df_spark = dados2.map(lambda x: str(x)).map(lambda w: w.split(',')).toDF()

In [18]:
# Converte dataframe spark para dataframe pandas
df_pandas = df_spark.toPandas()

In [19]:
df_pandas.head()

Unnamed: 0,_1,_2,_3,_4,_5,_6,_7,_8
0,30,4,79,70,2074,19.5,71,peugeot 304
1,30,4,88,76,2065,14.5,71,fiat 124b
2,31,4,71,65,1773,19.0,71,toyota corolla 1200
3,35,4,72,69,1613,18.0,71,datsun 1200
4,27,4,97,60,1834,19.0,71,volkswagen model 111


In [20]:
# verificacao de valores nulos
df_pandas.isnull().values.any()

False

In [22]:
# verificacao valores ausentes ('?')
total = np.sum(df_pandas.apply(lambda x: x.str.contains('\?')).values)
total

6

In [23]:
# verificacao quals linhas e colunas têm caracter especial
np.where(df_pandas.apply(lambda x: x.str.contains('\?')).values)

(array([ 48, 126, 330, 336, 354, 374], dtype=int64),
 array([3, 3, 3, 3, 3, 3], dtype=int64))

In [28]:
df_pandas.iloc[48]

_1            25
_2             4
_3            98
_4             ?
_5          2046
_6            19
_7            71
_8    ford pinto
Name: 48, dtype: object

In [38]:
df_pandas.describe()

Unnamed: 0,_1,_2,_3,_4,_5,_6,_7,_8
count,398,398,398,398,398,398.0,398,398
unique,129,5,82,94,351,95.0,13,305
top,13,4,97,150,1985,14.5,73,ford pinto
freq,20,204,21,22,4,23.0,40,6


In [31]:
# Usando o valor average HP para preencher os valores ausente
# variável broadcast é uma variavel global no spark
mediaHP = sc.broadcast(75.0)

In [39]:
# Função para limpeza dos dados
def limpaDados(inputStr) :
    
    # Variável global
    global mediaHP
    
    # Lista de atributos
    attList = inputStr.split(",")
    
    # Substitui o caracter ? por um valor na coluna de índice 3
    hpValue = attList[3]
    if hpValue == "?":
        hpValue = mediaHP.value
       
    # Cria uma linha usando a função Row, limpando e convertendo os dados de string para float
    linhas = Row(consumo = float(attList[0]), 
                 numero_cilindros = float(attList[1]), 
                 capacidade = float(attList[2]), 
                 hosrsepower = float(hpValue), 
                 peso = float(attList[4]), 
                 aceleracao = float(attList[5]), 
                 ano = float(attList[6]), 
                 nome = attList[7]) 
    return linhas

In [40]:
# Executa a função no RDD
dados3 = dados2.map(limpaDados)
dados3.cache()
dados3.take(5)

[Row(consumo=30.0, numero_cilindros=4.0, capacidade=79.0, hosrsepower=70.0, peso=2074.0, aceleracao=19.5, ano=71.0, nome='peugeot 304'),
 Row(consumo=30.0, numero_cilindros=4.0, capacidade=88.0, hosrsepower=76.0, peso=2065.0, aceleracao=14.5, ano=71.0, nome='fiat 124b'),
 Row(consumo=31.0, numero_cilindros=4.0, capacidade=71.0, hosrsepower=65.0, peso=1773.0, aceleracao=19.0, ano=71.0, nome='toyota corolla 1200'),
 Row(consumo=35.0, numero_cilindros=4.0, capacidade=72.0, hosrsepower=69.0, peso=1613.0, aceleracao=18.0, ano=71.0, nome='datsun 1200'),
 Row(consumo=27.0, numero_cilindros=4.0, capacidade=97.0, hosrsepower=60.0, peso=1834.0, aceleracao=19.0, ano=71.0, nome='volkswagen model 111')]

In [41]:
# Análise Exploratória de dados

In [42]:
# cria um df
df_carros = spSession.createDataFrame(dados3)

In [43]:
# Estatísticas descritivas de duas variáveis (como exemplo)
df_carros.select("consumo", "numero_cilindros").describe().show()

+-------+-----------------+-----------------+
|summary|          consumo| numero_cilindros|
+-------+-----------------+-----------------+
|  count|              398|              398|
|   mean|23.51457286432161|5.454773869346734|
| stddev|7.815984312565782|1.701004244533212|
|    min|              9.0|              3.0|
|    max|             46.6|              8.0|
+-------+-----------------+-----------------+



In [44]:
# Encontrando a correlação entre a variável target com as variáveis preditoras (exceto o nome)
for i in df_carros.columns:
    if not(isinstance(df_carros.select(i).take(1)[0][0], str)) :
        print("Correlação da Variável Target com:", i, df_carros.stat.corr('consumo', i))

Correlação da Variável Target com: consumo 1.0
Correlação da Variável Target com: numero_cilindros -0.7753962854205546
Correlação da Variável Target com: capacidade -0.8042028248058979
Correlação da Variável Target com: hosrsepower -0.7747041523498721
Correlação da Variável Target com: peso -0.8317409332443347
Correlação da Variável Target com: aceleracao 0.42028891210164976
Correlação da Variável Target com: ano 0.5792671330833098


# Pré-processamento

In [46]:
# Convertendo para um LabeledPoint (target, Vector[features])
# Remove colunas não relevantes ou com baixa correlação
def transformaVar(row) :
    obj = (row["consumo"], Vectors.dense([row["peso"], row["capacidade"], row["numero_cilindros"]]))
    return obj

In [47]:
# Aplica a função no RDD e cria outro RDD
dados4 = dados3.map(transformaVar)

In [48]:
# Visualiza
dados4.take(5)

[(30.0, DenseVector([2074.0, 79.0, 4.0])),
 (30.0, DenseVector([2065.0, 88.0, 4.0])),
 (31.0, DenseVector([1773.0, 71.0, 4.0])),
 (35.0, DenseVector([1613.0, 72.0, 4.0])),
 (27.0, DenseVector([1834.0, 97.0, 4.0]))]

In [49]:
# Converte o RDD para DataFrame do Spark
df_carros = spSession.createDataFrame(dados4, ["label", "features"])

In [50]:
# Visualiza label (y) e atributos (x)
df_carros.select("label","features").show(10)

+-----+------------------+
|label|          features|
+-----+------------------+
| 30.0| [2074.0,79.0,4.0]|
| 30.0| [2065.0,88.0,4.0]|
| 31.0| [1773.0,71.0,4.0]|
| 35.0| [1613.0,72.0,4.0]|
| 27.0| [1834.0,97.0,4.0]|
| 26.0| [1955.0,91.0,4.0]|
| 24.0|[2278.0,113.0,4.0]|
| 25.0| [2126.0,97.5,4.0]|
| 23.0| [2254.0,97.0,4.0]|
| 20.0|[2408.0,140.0,4.0]|
+-----+------------------+
only showing top 10 rows



In [51]:
# Divisão em dados de Treino e de Teste com split 70/30
(dados_treino, dados_teste) = df_carros.randomSplit([0.7, 0.3])

In [52]:
dados_treino.count()

279

In [53]:
dados_teste.count()

119

In [54]:
# Machine Learning


In [55]:
# cria o objeto
linearReg = LinearRegression()

In [56]:
# Treina o objeto com dados e cria o modelo
modelo = linearReg.fit(dados_treino)

In [57]:
print(modelo)

LinearRegressionModel: uid=LinearRegression_8ea789cee5d5, numFeatures=3


In [58]:
# Imprimindo os coeficientes (o que o modelo aprendeu)
print("Coeficientes: " + str(modelo.coefficients))
print("Intercepto: " + str(modelo.intercept))

Coeficientes: [-0.005162466506450172,-0.018710149458404123,-0.24096296646809603]
Intercepto: 43.62162228646625


In [59]:
# Previsões com dados de teste
predictions = modelo.transform(dados_teste)

In [60]:
# Visualiza as previsões
predictions.select("features", "prediction").show()

+------------------+------------------+
|          features|        prediction|
+------------------+------------------+
|[3664.0,350.0,8.0]|16.230088964646608|
|[4382.0,318.0,8.0]|13.122162795684318|
|[4906.0,400.0,8.0]| 8.882798090715283|
|[4100.0,350.0,8.0]|13.979253567834334|
|[4502.0,350.0,8.0]|11.903942032241364|
|[4654.0,360.0,8.0]|10.932145628676892|
|[4699.0,350.0,8.0]| 10.88693613047068|
|[4735.0,440.0,8.0]|   9.0171738849821|
|[4746.0,400.0,8.0]| 9.708792731747316|
|[4077.0,318.0,8.0]| 14.69671508015162|
|[4129.0,351.0,8.0]|13.810831889688874|
|[4154.0,351.0,8.0]| 13.68177022702762|
|[4209.0,350.0,8.0]|13.416544718631265|
|[4257.0,304.0,8.0]|14.029413201408246|
|[4354.0,454.0,8.0]| 10.72213153152196|
|[4425.0,455.0,8.0]|10.336886260105594|
|[4457.0,318.0,8.0]|12.734977807700556|
|[4638.0,302.0,8.0]|12.099933761367542|
|[4215.0,351.0,8.0]|13.366859770134159|
|[3693.0,350.0,8.0]| 16.08037743595955|
+------------------+------------------+
only showing top 20 rows



In [61]:
# Coeficiente de determinação R2
avaliador = RegressionEvaluator(predictionCol = "prediction", labelCol = "label", metricName = "r2")

In [62]:
# Resultado
avaliador.evaluate(predictions) 

0.6710647104158914