# Spark MLLIB Regressão linear

 - Método para avaliar o relacionamento entre variaveis
 - Estima o valor de uma variável dependente a partir dos valores de variáveis independentes 
 - Usado quando as variaveis dependentes e independentes são continuas possuem alguma correlação
 - O R-square mede o quao perdo os dados estão da loinha de regreção. O valor do R-quare será sempre entre 0 e 1, quanto maior o valor melhor.
 - Os dados de entreda e de saida são usados para construção do modelo. A equeção linear retorna os valores dos coeficientes
 - A equação linear representa o modelo
 
 ### Vantagens
 - baixo custo
 - veloz
 - Excelent para relações lineares
 
 ### Desvantagens
 - somente variáveis numéricas
 - sensivel a outliers
 
 ### Aplicação
 - Um dos modelos mais antigos e pode resolver varios problemas

## Exemplo irá prever os valores de milhas por galão MPG


In [1]:
# imports
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

In [5]:
# criando spark session
spSession = SparkSession.builder.master("local").appName('Diego-SparkLib').getOrCreate()

In [4]:
# carregando os dados e gerando um RDD
carrosRDD = sc.textFile('5-Arquivos-Cap11/data/carros.csv')

In [6]:
# colocando os dados em um cache para otimizar a performance
carrosRDD.cache()

5-Arquivos-Cap11/data/carros.csv MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [7]:
carrosRDD.count()

399

In [8]:
carrosRDD.take(5)

['MPG,CYLINDERS,DISPLACEMENT,HORSEPOWER,WEIGHT,ACCELERATION,MODELYEAR,NAME',
 '18,8,307,130,3504,12,70,chevrolet chevelle malibu',
 '15,8,350,165,3693,11.5,70,buick skylark 320',
 '18,8,318,150,3436,11,70,plymouth satellite',
 '16,8,304,150,3433,12,70,amc rebel sst']

In [10]:
carrosRDD2 = carrosRDD.filter(lambda x: 'DISPLACEMENT' not in x)
carrosRDD2.count()

398

### limpeza dos dados

In [28]:
# criando um valor padrão para HP
medianHP = sc.broadcast(75.0) # criando uma variavel ready only

In [35]:
# função para limpar os dados
def limpaDados(dado):
    global medianHP # pegando a variavel ready only
    attList = dado.split(',') # quebrando os dados por linha
    
    hpValue = attList[3] #pegando a coluna de indice 3
    if hpValue == "?":
        hpValue = medianHP.value
        
    # criando um dataseta atravez da função ROW, limpando os dados e convertendo dados para float 
    linhas = Row(MPG = float(attList[0]), 
                 CYLINDERS = float(attList[1]), 
                 DISPLACEMENT = float(attList[2]), 
                 HORSEPOWER = float(hpValue), 
                 HWEIGHT = float(attList[4]), 
                 ACCELERATION = float(attList[5]),
                 MODELYEAR = float(attList[6]), 
                 NAME = attList[7])
    return linhas

In [36]:
#aplicando a funcao limpaDados
carrosRDD3 = carrosRDD2.map(limpaDados)
carrosRDD3.cache()
carrosRDD3.take(5)

[Row(MPG=18.0, CYLINDERS=8.0, DISPLACEMENT=307.0, HORSEPOWER=130.0, HWEIGHT=3504.0, ACCELERATION=12.0, MODELYEAR=70.0, NAME='chevrolet chevelle malibu'),
 Row(MPG=15.0, CYLINDERS=8.0, DISPLACEMENT=350.0, HORSEPOWER=165.0, HWEIGHT=3693.0, ACCELERATION=11.5, MODELYEAR=70.0, NAME='buick skylark 320'),
 Row(MPG=18.0, CYLINDERS=8.0, DISPLACEMENT=318.0, HORSEPOWER=150.0, HWEIGHT=3436.0, ACCELERATION=11.0, MODELYEAR=70.0, NAME='plymouth satellite'),
 Row(MPG=16.0, CYLINDERS=8.0, DISPLACEMENT=304.0, HORSEPOWER=150.0, HWEIGHT=3433.0, ACCELERATION=12.0, MODELYEAR=70.0, NAME='amc rebel sst'),
 Row(MPG=17.0, CYLINDERS=8.0, DISPLACEMENT=302.0, HORSEPOWER=140.0, HWEIGHT=3449.0, ACCELERATION=10.5, MODELYEAR=70.0, NAME='ford torino')]

## Análise exploratória dos dados

In [37]:
carrosDF = spSession.createDataFrame(carrosRDD3)

In [38]:
carrosDF.take(5)

[Row(MPG=18.0, CYLINDERS=8.0, DISPLACEMENT=307.0, HORSEPOWER=130.0, HWEIGHT=3504.0, ACCELERATION=12.0, MODELYEAR=70.0, NAME='chevrolet chevelle malibu'),
 Row(MPG=15.0, CYLINDERS=8.0, DISPLACEMENT=350.0, HORSEPOWER=165.0, HWEIGHT=3693.0, ACCELERATION=11.5, MODELYEAR=70.0, NAME='buick skylark 320'),
 Row(MPG=18.0, CYLINDERS=8.0, DISPLACEMENT=318.0, HORSEPOWER=150.0, HWEIGHT=3436.0, ACCELERATION=11.0, MODELYEAR=70.0, NAME='plymouth satellite'),
 Row(MPG=16.0, CYLINDERS=8.0, DISPLACEMENT=304.0, HORSEPOWER=150.0, HWEIGHT=3433.0, ACCELERATION=12.0, MODELYEAR=70.0, NAME='amc rebel sst'),
 Row(MPG=17.0, CYLINDERS=8.0, DISPLACEMENT=302.0, HORSEPOWER=140.0, HWEIGHT=3449.0, ACCELERATION=10.5, MODELYEAR=70.0, NAME='ford torino')]

In [39]:
# estatistica descritiva
carrosDF.select('MPG','CYLINDERS').describe().show()

+-------+-----------------+------------------+
|summary|              MPG|         CYLINDERS|
+-------+-----------------+------------------+
|  count|              398|               398|
|   mean|23.51457286432161| 5.454773869346734|
| stddev|7.815984312565782|1.7010042445332125|
|    min|              9.0|               3.0|
|    max|             46.6|               8.0|
+-------+-----------------+------------------+



In [41]:
# encontrando a correlação entre a variáveos
for i in carrosDF.columns:
    if not(isinstance(carrosDF.select(i).take(1)[0][0], str)):
        print("Correlação entre a variável target MPG e a variavel ", i, carrosDF.stat.corr('MPG',i))

Correlação entre a variável target MPG e a variavel  MPG 1.0
Correlação entre a variável target MPG e a variavel  CYLINDERS -0.7753962854205548
Correlação entre a variável target MPG e a variavel  DISPLACEMENT -0.8042028248058979
Correlação entre a variável target MPG e a variavel  HORSEPOWER -0.7747041523498721
Correlação entre a variável target MPG e a variavel  HWEIGHT -0.8317409332443347
Correlação entre a variável target MPG e a variavel  ACCELERATION 0.4202889121016501
Correlação entre a variável target MPG e a variavel  MODELYEAR 0.5792671330833091


## Pré processamento dos dados

Vetores esparsos são vetores que possuem muitos valores como 0. Enquanto que um vetor denso é quando a maioria dos valores do vetor são diferentes de 0
Conceitualmente são o mesmo objeto, Apenas um vetor. Normalmente um vetor esparso é representado por uma tupla (id, valor). 
Por exemplo, um vetor denso (1,2,0,0,5,0,9,0,0) seria representado como esparso dessa forma (0,1,4,6),(1,2,5,9)

In [111]:
# convertend para um labeld point (target, vector[features])
# removendo as colunas não relevantes para o modelo ou com baixa correlação
def transformaVar(row):
    obj = (row['MPG'], Vectors.dense([row['HORSEPOWER'], row['DISPLACEMENT'], row['HWEIGHT'], row['CYLINDERS']]))
    return obj

In [112]:
# Utiliza o RDD, aplica a função, converte para dataframe e aplca a função select
carrosRDD4 = carrosRDD3.map(transformaVar)
carrosDF = spSession.createDataFrame(carrosRDD4,['label','features'])
carrosDF.select('label','features').show(10)

+-----+--------------------+
|label|            features|
+-----+--------------------+
| 18.0|[130.0,307.0,3504...|
| 15.0|[165.0,350.0,3693...|
| 18.0|[150.0,318.0,3436...|
| 16.0|[150.0,304.0,3433...|
| 17.0|[140.0,302.0,3449...|
| 15.0|[198.0,429.0,4341...|
| 14.0|[220.0,454.0,4354...|
| 14.0|[215.0,440.0,4312...|
| 14.0|[225.0,455.0,4425...|
| 15.0|[190.0,390.0,3850...|
+-----+--------------------+
only showing top 10 rows



## Machine Learning

In [113]:
# dividindo em dado de treino e teste
(dados_treino, dados_teste) = carrosDF.randomSplit([0.7,0.3])

In [114]:
dados_treino.count()

288

In [115]:
dados_teste.count()

110

In [116]:
linearReg = LinearRegression(maxIter=10)
modelo = linearReg.fit(dados_treino)

In [117]:
# imprimindo as metricas
print('Coeficientes' + str(modelo.coefficients))
print('Intercepto' + str(modelo.intercept))

Coeficientes[-0.044004700223541825,0.0020292710398785514,-0.0052801063634036706,-0.516117219416024]
Intercepto46.1810509835858


In [118]:
predictions = modelo.transform(dados_teste)
predictions.select('features','prediction').show()

+--------------------+------------------+
|            features|        prediction|
+--------------------+------------------+
|[215.0,360.0,4615...| 8.953949387444446|
|[160.0,350.0,4456...|12.193452101121657|
|[225.0,455.0,4951...| 6.932567395893862|
|[130.0,307.0,4098...|15.316612531211643|
|[150.0,400.0,4464...| 12.69272180444377|
|[158.0,351.0,4363...|12.774540664405158|
|[175.0,350.0,4100...|13.413099463140234|
|[137.0,302.0,4042...|15.294119230798064|
|[140.0,302.0,4638...|12.015161737538854|
|[150.0,318.0,4457...|12.563282323717551|
|[175.0,400.0,4464...|11.592604298855228|
|[225.0,455.0,4425...| 9.709903343044196|
|[72.0,250.0,3158....| 23.74875111533549|
|[150.0,318.0,3777...| 16.15375465083205|
|[150.0,318.0,4135...|14.263476572733534|
|[150.0,400.0,3761...|16.404636577916552|
|[190.0,390.0,3850...|14.154226392233163|
|[100.0,250.0,3781...| 19.22711324467583|
|[150.0,318.0,4190...|13.973070722746336|
|[100.0,250.0,3329...|21.613721320934292|
+--------------------+------------

In [119]:
# coeficiente de determinação
avaliador = RegressionEvaluator(predictionCol="prediction",labelCol="label", metricName="r2")
avaliador.evaluate(predictions)

0.7116182846692942