## Spark-MLlib - Regressão Linear

#### Problema de Negócio
- Objetivo: Prever os valores de MPG (Miles Per Gallon)
    - MPG será a variável target e as demais variáveis serão as features (variáveis preditoras).

#### Dataset
 - Nome: Auto MPG Data Set
 - Fonte: https://archive.ics.uci.edu/ml/datasets/auto+mpg
 - Descrição: Os dados se referem ao consumo de combustível in galões por milha.
 - Atributos:
  - 1. mpg: continuous
  - 2. cylinders: multi-valued discrete
  - 3. displacement: continuous
  - 4. horsepower: continuous
  - 5. weight: continuous
  - 6. acceleration: continuous
  - 7. model year: multi-valued discrete
  - 8. origin: multi-valued discrete
  - 9. car name: string (unique for each instance)

#### Tecnologias utilizadas
- Modelo Preditivo: Regressão Linear
 - Método para avaliar o relacionamento entre variáveis.
 - Estima o valor de uma variável dependente a partir dos valores das variáveis independentes.
 - Métrica de Avaliação: Coeficiente de determinação R2 
- JDK 1.8
- Apache Spark 2.4.2

#### SparkSession e importação do dataset

In [3]:
# Bibliotecas
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

In [5]:
# Spark Session
spSession = SparkSession.builder.master("local").appName("SparkMLLib").getOrCreate()

In [6]:
# Carregando o dataset e gerando o RDD
carrosRDD = sc.textFile('data/carros.csv')

In [7]:
# Colocando o RDD em cache (otimizando a performance)
carrosRDD.cache()

data/carros.csv MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [8]:
carrosRDD.count()

399

In [9]:
carrosRDD.take(3)

['MPG,CYLINDERS,DISPLACEMENT,HORSEPOWER,WEIGHT,ACCELERATION,MODELYEAR,NAME',
 '18,8,307,130,3504,12,70,chevrolet chevelle malibu',
 '15,8,350,165,3693,11.5,70,buick skylark 320']

In [11]:
# Removendo cabeçalhos
carrosRDD2= carrosRDD.filter(lambda x: "DISPLACEMENT" not in x)
carrosRDD2.count()

398

#### Limpeza de Dados

In [12]:
# Usando um valor padrão para average HP em valores missing (broadcast)
mediaHP = sc.broadcast(75.0)

In [14]:
# Função para a limpeza de dados
def limpaDados(inputStr):
    global mediaHP
    attList = inputStr.split(',')
    
    #Tratando valores missing
    hpValue = attList[3]
    if hpValue == "?":
        hpValue = mediaHP.value
        
    #Cria uma linha usando a função Row, limpando e convertendo os dados de string para float
    linhas = Row(MPG = float(attList[0]), CYLINDERS = float(attList[1]), DISPLACEMENT = float(attList[2]), 
                 HORSEPOWER = float(hpValue), WEIGHT = float(attList[4]), ACCELERATION = float(attList[5]), 
                 MODELYEAR = float(attList[6]), NAME = attList[7]) 
    return linhas    

In [15]:
# Executa a função no RDD
carrosRDD3 = carrosRDD2.map(limpaDados)
carrosRDD3.cache()
carrosRDD3.take(3)

[Row(ACCELERATION=12.0, CYLINDERS=8.0, DISPLACEMENT=307.0, HORSEPOWER=130.0, MODELYEAR=70.0, MPG=18.0, NAME='chevrolet chevelle malibu', WEIGHT=3504.0),
 Row(ACCELERATION=11.5, CYLINDERS=8.0, DISPLACEMENT=350.0, HORSEPOWER=165.0, MODELYEAR=70.0, MPG=15.0, NAME='buick skylark 320', WEIGHT=3693.0),
 Row(ACCELERATION=11.0, CYLINDERS=8.0, DISPLACEMENT=318.0, HORSEPOWER=150.0, MODELYEAR=70.0, MPG=18.0, NAME='plymouth satellite', WEIGHT=3436.0)]

#### Análise exploratória

In [16]:
# Criando um dataframe (para utilizar spark SQL)
carrosDF = spSession.createDataFrame(carrosRDD3)

In [18]:
# Estatística descritiva
carrosDF.select("MPG","CYLINDERS").describe().show()

+-------+-----------------+------------------+
|summary|              MPG|         CYLINDERS|
+-------+-----------------+------------------+
|  count|              398|               398|
|   mean|23.51457286432161| 5.454773869346734|
| stddev|7.815984312565782|1.7010042445332125|
|    min|              9.0|               3.0|
|    max|             46.6|               8.0|
+-------+-----------------+------------------+



In [21]:
# Encontrando a correlação entre a variável target com as variáveis preditoras
for i in carrosDF.columns:
    if not(isinstance(carrosDF.select(i).take(1)[0][0], str)):
        print("Correlação da variável MPG com ", i, carrosDF.stat.corr('MPG', i))

Correlação da variável MPG com  ACCELERATION 0.4202889121016501
Correlação da variável MPG com  CYLINDERS -0.7753962854205548
Correlação da variável MPG com  DISPLACEMENT -0.8042028248058979
Correlação da variável MPG com  HORSEPOWER -0.774704152349872
Correlação da variável MPG com  MODELYEAR 0.5792671330833091
Correlação da variável MPG com  MPG 1.0
Correlação da variável MPG com  WEIGHT -0.8317409332443347


#### Pré-Processamento

In [22]:
# Convertendo para um LabelPoint (target, Vector[features]))
# Remove colunas não relevantes para o modelo ou com baixa correlação
def transformaVar(row) :
    obj = (row["MPG"], Vectors.dense([row["ACCELERATION"], row["DISPLACEMENT"], row["WEIGHT"]]))
    return obj

In [23]:
# Utiliza o RDD, aplica a função, converte para Dataframe e aplica a função select()
carrosRDD4 = carrosRDD3.map(transformaVar)
carrosDF = spSession.createDataFrame(carrosRDD4,["label", "features"])
carrosDF.select("label","features").show(10)

+-----+-------------------+
|label|           features|
+-----+-------------------+
| 18.0|[12.0,307.0,3504.0]|
| 15.0|[11.5,350.0,3693.0]|
| 18.0|[11.0,318.0,3436.0]|
| 16.0|[12.0,304.0,3433.0]|
| 17.0|[10.5,302.0,3449.0]|
| 15.0|[10.0,429.0,4341.0]|
| 14.0| [9.0,454.0,4354.0]|
| 14.0| [8.5,440.0,4312.0]|
| 14.0|[10.0,455.0,4425.0]|
| 15.0| [8.5,390.0,3850.0]|
+-----+-------------------+
only showing top 10 rows



In [25]:
carrosRDD4.take(5)

[(18.0, DenseVector([12.0, 307.0, 3504.0])),
 (15.0, DenseVector([11.5, 350.0, 3693.0])),
 (18.0, DenseVector([11.0, 318.0, 3436.0])),
 (16.0, DenseVector([12.0, 304.0, 3433.0])),
 (17.0, DenseVector([10.5, 302.0, 3449.0]))]

#### Machine Learning

In [26]:
# Split do dataset
(dados_treino, dados_teste) = carrosDF.randomSplit([0.7, 0.3])

In [27]:
dados_treino.count()

267

In [29]:
dados_teste.count()

131

In [30]:
# Treinamento e Criação do modelo
linearReg = LinearRegression(maxIter = 10)
modelo = linearReg.fit(dados_treino)

In [31]:
print(modelo)

LinearRegression_0efafd055b5e


In [32]:
type(modelo)

pyspark.ml.regression.LinearRegressionModel

In [33]:
# Imprimindo as métricas
print("Coeficientes: " + str(modelo.coefficients))
print("Intercepto: " + str(modelo.intercept))

Coeficientes: [0.1535883241143041,-0.009747961123586465,-0.006225482377566938]
Intercepto: 41.530257781102236


In [34]:
# Previsões com dados de teste
predictions = modelo.transform(dados_teste)
predictions.select("features", "prediction").show()

+-------------------+------------------+
|           features|        prediction|
+-------------------+------------------+
|[14.0,360.0,4615.0]|11.440627141739949|
|[11.5,429.0,4952.0]| 8.286059452686665|
|[12.5,350.0,4499.0]| 12.02988022260212|
|[12.5,400.0,4422.0]| 12.02184430949545|
|[13.0,350.0,3988.0]|15.287895879595975|
|[13.0,360.0,4654.0]|11.044245004900535|
|[14.0,307.0,4098.0]|15.175843470492136|
| [8.5,440.0,4312.0]|11.702375629627138|
| [9.0,454.0,4354.0]| 11.38122807609627|
|[10.0,455.0,3086.0]|19.418980093841867|
|[10.0,455.0,4425.0]|11.083059190279737|
|[13.5,318.0,4457.0]|12.756873562529005|
|[13.5,351.0,4154.0]|14.321512005853432|
|[14.5,302.0,4042.0]| 15.65000445131097|
|[14.5,318.0,4237.0]|14.280068009708032|
|[12.8,351.0,4215.0]|13.834245753941836|
| [8.5,390.0,3850.0]|15.065946544242387|
|[10.0,429.0,4341.0]|11.859446699208604|
|[11.0,318.0,3399.0]|18.959463107709063|
|[11.5,350.0,3693.0]| 16.89403069480677|
+-------------------+------------------+
only showing top

In [35]:
# Coeficiente de determinação R2
avaliador = RegressionEvaluator(predictionCol = "prediction", labelCol = "label", metricName = "r2")
avaliador.evaluate(predictions) 

0.6983088414198804