# Previsão de consumo de combustível

In [1]:
# Versão da Linguagem Python
from platform import python_version
print('Versão da Linguagem Python Usada Neste Jupyter Notebook:', python_version())

Versão da Linguagem Python Usada Neste Jupyter Notebook: 3.9.12


In [2]:
# Importa o findspark e inicializa
import findspark
findspark.init()

In [3]:
# Imports
import numpy as np
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

In [4]:
# Versões dos pacotes usados neste jupyter notebook
%reload_ext watermark
%watermark -a "Henrique Bardella" --iversions

Author: Henrique Bardella

findspark: 2.0.1
pyspark  : 3.3.0
numpy    : 1.21.5



## Carregando os dados

In [5]:
# Criando o spark context
sc = SparkContext(appName = "Lab5")

In [21]:
sc

In [6]:
sc.setLogLevel("ERROR")

In [7]:
# Criando o Spark Session
spSession = SparkSession.builder.master("local").getOrCreate()

In [20]:
spSession

In [11]:
# Carregando os dados e gerando um RDD
dados = sc.textFile("dados/dataset1.csv")

In [12]:
type(dados)

pyspark.rdd.RDD

In [13]:
dados.take(5)

['consumo,numero_cilindros,capacidade,horsepower,peso,aceleracao,ano,nome',
 '30,4,79,70,2074,19.5,71,peugeot 304',
 '30,4,88,76,2065,14.5,71,fiat 124b',
 '31,4,71,65,1773,19,71,toyota corolla 1200',
 '35,4,72,69,1613,18,71,datsun 1200']

In [14]:
# Removendo o cabeçalho
dados2 = dados.filter(lambda x: "horsepower" not in x)
dados2.count()

398

In [15]:
dados2.take(5)

['30,4,79,70,2074,19.5,71,peugeot 304',
 '30,4,88,76,2065,14.5,71,fiat 124b',
 '31,4,71,65,1773,19,71,toyota corolla 1200',
 '35,4,72,69,1613,18,71,datsun 1200',
 '27,4,97,60,1834,19,71,volkswagen model 111']

## Limpeza dos dados

#### Vamos verificar se há valores ausentes. RDDs são ótimos para processamento, mas ruins para exploração, então converteremos o RDD para DataFrame Spark e então para DataFrame Pandas.


In [24]:
# Converte RDD para DataFrame Spark
df_spark = dados2.map(lambda x: str(x)).map(lambda w: w.split(',')).toDF()

In [25]:
type(df_spark)

pyspark.sql.dataframe.DataFrame

In [26]:
# Converte DataFrame Spark para DataFrame Pandas
df_pandas = df_spark.toPandas()

In [27]:
type(df_pandas)

pandas.core.frame.DataFrame

In [28]:
df_pandas.head()

Unnamed: 0,_1,_2,_3,_4,_5,_6,_7,_8
0,30,4,79,70,2074,19.5,71,peugeot 304
1,30,4,88,76,2065,14.5,71,fiat 124b
2,31,4,71,65,1773,19.0,71,toyota corolla 1200
3,35,4,72,69,1613,18.0,71,datsun 1200
4,27,4,97,60,1834,19.0,71,volkswagen model 111


In [29]:
# Tem valores nulos?
df_pandas.isnull().values.any()

False

In [30]:
# Verifica se há alguma linha com caracter especial "?"
total = np.sum(df_pandas.apply(lambda x: x.str.contains('\?')).values)
total

6

In [31]:
# Vejamos quais linhas e colunas têm o caracter especial
np.where(df_pandas.apply(lambda x: x.str.contains('\?')).values)

(array([ 48, 126, 330, 336, 354, 374], dtype=int64),
 array([3, 3, 3, 3, 3, 3], dtype=int64))

In [32]:
# Visualizando uma linha
df_pandas.iloc[330]

_1                    40.9
_2                       4
_3                      85
_4                       ?
_5                    1835
_6                    17.3
_7                      80
_8    renault lecar deluxe
Name: 330, dtype: object

In [33]:
# Usando um valor padrão para average HP (que será usado para preencher os valores ausentes)
mediaHP = sc.broadcast(75.0)

In [34]:
# Função para limpeza dos dados
def limpaDados(inputStr) :
    
    # Variável global
    global mediaHP
    
    # Lista de atributos
    attList = inputStr.split(",")
    
    # Substitui o caracter ? por um valor na coluna de índice 3
    hpValue = attList[3]
    if hpValue == "?":
        hpValue = mediaHP.value
       
    # Cria uma linha usando a função Row, limpando e convertendo os dados de string para float
    linhas = Row(consumo = float(attList[0]), 
                 numero_cilindros = float(attList[1]), 
                 capacidade = float(attList[2]), 
                 hosrsepower = float(hpValue), 
                 peso = float(attList[4]), 
                 aceleracao = float(attList[5]), 
                 ano = float(attList[6]), 
                 nome = attList[7]) 
    
    return linhas

In [35]:
# Executa a função no RDD
dados3 = dados2.map(limpaDados)
dados3.cache()
dados3.take(5)

[Row(consumo=30.0, numero_cilindros=4.0, capacidade=79.0, hosrsepower=70.0, peso=2074.0, aceleracao=19.5, ano=71.0, nome='peugeot 304'),
 Row(consumo=30.0, numero_cilindros=4.0, capacidade=88.0, hosrsepower=76.0, peso=2065.0, aceleracao=14.5, ano=71.0, nome='fiat 124b'),
 Row(consumo=31.0, numero_cilindros=4.0, capacidade=71.0, hosrsepower=65.0, peso=1773.0, aceleracao=19.0, ano=71.0, nome='toyota corolla 1200'),
 Row(consumo=35.0, numero_cilindros=4.0, capacidade=72.0, hosrsepower=69.0, peso=1613.0, aceleracao=18.0, ano=71.0, nome='datsun 1200'),
 Row(consumo=27.0, numero_cilindros=4.0, capacidade=97.0, hosrsepower=60.0, peso=1834.0, aceleracao=19.0, ano=71.0, nome='volkswagen model 111')]

## Análise exploratória dos dados 

In [36]:
# Cria um df
df_carros = spSession.createDataFrame(dados3)

In [37]:
# Estatística descritiva de duas variáveis (como exemplo)
df_carros.select("consumo", "numero_cilindros").describe().show()

+-------+-----------------+-----------------+
|summary|          consumo| numero_cilindros|
+-------+-----------------+-----------------+
|  count|              398|              398|
|   mean|23.51457286432161|5.454773869346734|
| stddev|7.815984312565782|1.701004244533212|
|    min|              9.0|              3.0|
|    max|             46.6|              8.0|
+-------+-----------------+-----------------+



In [38]:
# Encontrando a correlação entre a variável target com as variáveis preditoras (exceto o nome)
for i in df_carros.columns:
    if not(isinstance(df_carros.select(i).take(1)[0][0], str)) :
        print("Correlação da Variável Target com:", i, df_carros.stat.corr('consumo', i))

Correlação da Variável Target com: consumo 1.0
Correlação da Variável Target com: numero_cilindros -0.7753962854205546
Correlação da Variável Target com: capacidade -0.8042028248058979
Correlação da Variável Target com: hosrsepower -0.7747041523498721
Correlação da Variável Target com: peso -0.8317409332443347
Correlação da Variável Target com: aceleracao 0.42028891210164976
Correlação da Variável Target com: ano 0.5792671330833099


## Pré-processamento dos dados

In [39]:
# Convertendo para um LabeledPoint (target, Vector[features])
# Remove colunas não relevantes ou com baixa correlação
def transformaVar(row) :
    obj = (row["consumo"], Vectors.dense([row["peso"], row["capacidade"], row["numero_cilindros"]])) # Tupla label, vetor attrs
    return obj

In [40]:
# Aplica a função no RDD e cria outro RDD
dados4 = dados3.map(transformaVar)

In [41]:
# Visualiza
  # Variável alvo, vetor denso
    
dados4.take(5)

[(30.0, DenseVector([2074.0, 79.0, 4.0])),
 (30.0, DenseVector([2065.0, 88.0, 4.0])),
 (31.0, DenseVector([1773.0, 71.0, 4.0])),
 (35.0, DenseVector([1613.0, 72.0, 4.0])),
 (27.0, DenseVector([1834.0, 97.0, 4.0]))]

In [47]:
type(dados4)

pyspark.rdd.PipelinedRDD

In [42]:
# Converte o RDD para DataFrame do Spark
df_carros = spSession.createDataFrame(dados4, ["label", "features"])

In [43]:
# Visualiza label (y) e atributos (x)
df_carros.select("label","features").show(10)

+-----+------------------+
|label|          features|
+-----+------------------+
| 30.0| [2074.0,79.0,4.0]|
| 30.0| [2065.0,88.0,4.0]|
| 31.0| [1773.0,71.0,4.0]|
| 35.0| [1613.0,72.0,4.0]|
| 27.0| [1834.0,97.0,4.0]|
| 26.0| [1955.0,91.0,4.0]|
| 24.0|[2278.0,113.0,4.0]|
| 25.0| [2126.0,97.5,4.0]|
| 23.0| [2254.0,97.0,4.0]|
| 20.0|[2408.0,140.0,4.0]|
+-----+------------------+
only showing top 10 rows



In [44]:
# Divisão em dados de Treino e de Teste com split 70/30
(dados_treino, dados_teste) = df_carros.randomSplit([0.7, 0.3])

In [45]:
dados_treino.count()

280

In [46]:
dados_teste.count()

118

## Machine learning

### Regressão Linear

In [48]:
# Cria o objeto
linearReg = LinearRegression()

In [49]:
type(linearReg)

pyspark.ml.regression.LinearRegression

In [50]:
# Treino o objeto com os dados e cria o modelo
modelo = linearReg.fit(dados_treino)

In [51]:
print(modelo)

LinearRegressionModel: uid=LinearRegression_1df8f6f83026, numFeatures=3


In [52]:
# Imprimindo os coeficientes (o que o modelo aprendeu)
print("Coeficientes: " + str(modelo.coefficients))
print("Intercepto: " + str(modelo.intercept))

Coeficientes: [-0.005294376367931427,-0.01382839777740593,-0.4297792292596532]
Intercepto: 44.14210796650924


In [53]:
# Previsões com dados de teste
predictions = modelo.transform(dados_teste)

In [54]:
# Visualizando as previsões
predictions.select("features", "prediction").show()

+------------------+------------------+
|          features|        prediction|
+------------------+------------------+
|[4732.0,304.0,8.0]|  11.4470522350491|
|[4615.0,360.0,8.0]|11.292103994562346|
|[4382.0,318.0,8.0]| 13.10648639494142|
|[4997.0,400.0,8.0]| 8.716516310916305|
|[4456.0,350.0,8.0]|12.272193814837507|
|[4951.0,455.0,8.0]| 8.199495746083826|
|[4955.0,383.0,8.0]| 9.173962880585329|
|[3169.0,302.0,8.0]|19.749819293680737|
|[4100.0,350.0,8.0]|14.156991801821093|
|[4294.0,302.0,8.0]|13.793645879757882|
|[4464.0,400.0,8.0]|11.538418915023755|
|[4746.0,400.0,8.0]|10.045404779267095|
|[3086.0,455.0,8.0]|18.073507672275937|
|[4096.0,318.0,8.0]|14.620678036169807|
|[4154.0,351.0,8.0]| 13.85726708017539|
|[4257.0,304.0,8.0]| 13.96188100981653|
|[4425.0,455.0,8.0]|10.984337715615759|
|[3399.0,318.0,8.0]|18.310858364618014|
|[3632.0,258.0,6.0]|18.766530996053653|
|[4141.0,302.0,8.0]|14.603685464051388|
+------------------+------------------+
only showing top 20 rows

