# Spark MLLib - Regressão Linear

### Usaremos Regressão Linear para prever os valores de MPG (Miles Per Gallon)

MPG será a variável target e as demais variáveis serão as features (variáveis preditoras).

In [37]:
# Imports
from pyspark.sql import Row
from pyspark.sql import SQLContext
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

In [2]:
# Spark Session - usada quando se traballha com Dataframes no Spark
spSession = SparkSession.builder.master("local").appName("DSA-SparkMLLib").getOrCreate()

In [3]:
# Carregando os dados e gerando um RDD
carrosRDD = sc.textFile("data/carros.csv")

In [4]:
# Colocando o RDD em cache. Esse processo otimiza a performace.
carrosRDD.cache()

data/carros.csv MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [5]:
# Números de linhas
carrosRDD.count()

399

In [6]:
# Vizualizando as 5 primeira linhas
carrosRDD.take(5)

['MPG,CYLINDERS,DISPLACEMENT,HORSEPOWER,WEIGHT,ACCELERATION,MODELYEAR,NAME',
 '18,8,307,130,3504,12,70,chevrolet chevelle malibu',
 '15,8,350,165,3693,11.5,70,buick skylark 320',
 '18,8,318,150,3436,11,70,plymouth satellite',
 '16,8,304,150,3433,12,70,amc rebel sst']

In [7]:
# Removendo a primeira linha do arquivo (cabeçalho)
carrosRDD2 = carrosRDD.filter(lambda x: "MPG" not in x)
carrosRDD2.take(5)

['18,8,307,130,3504,12,70,chevrolet chevelle malibu',
 '15,8,350,165,3693,11.5,70,buick skylark 320',
 '18,8,318,150,3436,11,70,plymouth satellite',
 '16,8,304,150,3433,12,70,amc rebel sst',
 '17,8,302,140,3449,10.5,70,ford torino']

In [8]:
carrosRDD2.count()

398

## Limpeza dos Dados

In [39]:
carrosRDD2.take(10)

['18,8,307,130,3504,12,70,chevrolet chevelle malibu',
 '15,8,350,165,3693,11.5,70,buick skylark 320',
 '18,8,318,150,3436,11,70,plymouth satellite',
 '16,8,304,150,3433,12,70,amc rebel sst',
 '17,8,302,140,3449,10.5,70,ford torino',
 '15,8,429,198,4341,10,70,ford galaxie 500',
 '14,8,454,220,4354,9,70,chevrolet impala',
 '14,8,440,215,4312,8.5,70,plymouth fury iii',
 '14,8,455,225,4425,10,70,pontiac catalina',
 '15,8,390,190,3850,8.5,70,amc ambassador dpl']

In [10]:
# Usando um valor padrão para average HP (que será usado para preencher os valores missing) 
mediaHP = sc.broadcast(75.0)

In [11]:
# Função para limpezza dos dados
def limpaDados(inputStr):
    
    global mediaHP
    attList = inputStr.split(",")
    
    # Substituir o caracter ? por um valor
    hpValue = attList[3]
    if hpValue == "?":
        hpValue = mediaHP.value
        
    # Criando um linha usando a função Row, limpando e convertendo os dados de string para float
    linhas = Row(MPG = float(attList[0]), CYLINDERS = float(attList[1]), DISPLACEMENT = float(attList[2]), 
                 HORSEPOWER = float(hpValue), WEIGHT = float(attList[4]), ACCELERATION = float(attList[5]), 
                 MODELYEAR = float(attList[6]), NAME = attList[7]) 
    return linhas

In [12]:
# Executando a função no RDD
carrosRDD3 = carrosRDD2.map(limpaDados)
carrosRDD3.cache()
carrosRDD3.take(5)

[Row(MPG=18.0, CYLINDERS=8.0, DISPLACEMENT=307.0, HORSEPOWER=130.0, WEIGHT=3504.0, ACCELERATION=12.0, MODELYEAR=70.0, NAME='chevrolet chevelle malibu'),
 Row(MPG=15.0, CYLINDERS=8.0, DISPLACEMENT=350.0, HORSEPOWER=165.0, WEIGHT=3693.0, ACCELERATION=11.5, MODELYEAR=70.0, NAME='buick skylark 320'),
 Row(MPG=18.0, CYLINDERS=8.0, DISPLACEMENT=318.0, HORSEPOWER=150.0, WEIGHT=3436.0, ACCELERATION=11.0, MODELYEAR=70.0, NAME='plymouth satellite'),
 Row(MPG=16.0, CYLINDERS=8.0, DISPLACEMENT=304.0, HORSEPOWER=150.0, WEIGHT=3433.0, ACCELERATION=12.0, MODELYEAR=70.0, NAME='amc rebel sst'),
 Row(MPG=17.0, CYLINDERS=8.0, DISPLACEMENT=302.0, HORSEPOWER=140.0, WEIGHT=3449.0, ACCELERATION=10.5, MODELYEAR=70.0, NAME='ford torino')]

## Análise Exploratória de Dados

In [13]:
# Criando um DataFrame
carrosDF1 = spSession.createDataFrame(carrosRDD3)

In [14]:
# Vizualizando 
carrosDF1.select("*").show(5)

+----+---------+------------+----------+------+------------+---------+--------------------+
| MPG|CYLINDERS|DISPLACEMENT|HORSEPOWER|WEIGHT|ACCELERATION|MODELYEAR|                NAME|
+----+---------+------------+----------+------+------------+---------+--------------------+
|18.0|      8.0|       307.0|     130.0|3504.0|        12.0|     70.0|chevrolet chevell...|
|15.0|      8.0|       350.0|     165.0|3693.0|        11.5|     70.0|   buick skylark 320|
|18.0|      8.0|       318.0|     150.0|3436.0|        11.0|     70.0|  plymouth satellite|
|16.0|      8.0|       304.0|     150.0|3433.0|        12.0|     70.0|       amc rebel sst|
|17.0|      8.0|       302.0|     140.0|3449.0|        10.5|     70.0|         ford torino|
+----+---------+------------+----------+------+------------+---------+--------------------+
only showing top 5 rows



In [16]:
# Encontrando a correlação entre a variável target com as variáveis preditoras
for i in carrosDF1.columns:
    if not(isinstance(carrosDF1.select(i).take(1)[0][0], str)):
        print("Correlação da variável MPG com", i, carrosDF1.stat.corr("MPG", i))

Correlação da variável MPG com MPG 1.0
Correlação da variável MPG com CYLINDERS -0.7753962854205539
Correlação da variável MPG com DISPLACEMENT -0.8042028248058979
Correlação da variável MPG com HORSEPOWER -0.7747041523498721
Correlação da variável MPG com WEIGHT -0.8317409332443344
Correlação da variável MPG com ACCELERATION 0.42028891210165054
Correlação da variável MPG com MODELYEAR 0.5792671330833092


In [17]:
# Estatísticas descrítivas
carrosDF1.select("MPG","ACCELERATION","DISPLACEMENT","WEIGHT").describe().show()

+-------+------------------+------------------+------------------+-----------------+
|summary|               MPG|      ACCELERATION|      DISPLACEMENT|           WEIGHT|
+-------+------------------+------------------+------------------+-----------------+
|  count|               398|               398|               398|              398|
|   mean|23.514572864321615|15.568090452261291|193.42587939698493|2970.424623115578|
| stddev| 7.815984312565783| 2.757688929812676|104.26983817119587|846.8417741973268|
|    min|               9.0|               8.0|              68.0|           1613.0|
|    max|              46.6|              24.8|             455.0|           5140.0|
+-------+------------------+------------------+------------------+-----------------+



## Pré-Processamento dos Dados

In [18]:
# Convertendo para um LabeledPoint (target, Vactor[features])
# Removendo colunbas não relevantes para o moelo ou com baixa correlação
def transformaVar(row):
    obj = (row["MPG"], Vectors.dense([row["ACCELERATION"], row["DISPLACEMENT"], row["WEIGHT"]]))
    return obj

In [19]:
# Utilizando o RDD, palica a função, converte para DataFrame e palica a função select()
carrosRDD4 = carrosRDD3.map(transformaVar)
carrosDF = spSession.createDataFrame(carrosRDD4,["label", "features"])
carrosDF.select("label", "features").show(10)

+-----+-------------------+
|label|           features|
+-----+-------------------+
| 18.0|[12.0,307.0,3504.0]|
| 15.0|[11.5,350.0,3693.0]|
| 18.0|[11.0,318.0,3436.0]|
| 16.0|[12.0,304.0,3433.0]|
| 17.0|[10.5,302.0,3449.0]|
| 15.0|[10.0,429.0,4341.0]|
| 14.0| [9.0,454.0,4354.0]|
| 14.0| [8.5,440.0,4312.0]|
| 14.0|[10.0,455.0,4425.0]|
| 15.0| [8.5,390.0,3850.0]|
+-----+-------------------+
only showing top 10 rows



In [20]:
carrosRDD4.take(5)

[(18.0, DenseVector([12.0, 307.0, 3504.0])),
 (15.0, DenseVector([11.5, 350.0, 3693.0])),
 (18.0, DenseVector([11.0, 318.0, 3436.0])),
 (16.0, DenseVector([12.0, 304.0, 3433.0])),
 (17.0, DenseVector([10.5, 302.0, 3449.0]))]

## Machine Learning

In [22]:
# Dados de Treino e de Teste
(dados_treino, dados_teste) = carrosDF.randomSplit([0.7,0.3])

In [23]:
dados_treino.count()

279

In [24]:
dados_teste.count()

119

In [25]:
# Treinamento e criação do modelo
linearReg = LinearRegression(maxIter = 10)
modelo = linearReg.fit(dados_treino)

In [26]:
print(modelo)

LinearRegressionModel: uid=LinearRegression_da65f2a57539, numFeatures=3


In [27]:
# Imprimindo as métricas
print("Coeficientes: " + str(modelo.coefficients))
print("Intercepto: " + str(modelo.intercept))

Coeficientes: [0.1587199090492492,-0.008287347950874687,-0.006493145439454635]
Intercepto: 41.934345241874695


In [29]:
# Previsões com dados de teste
predictions = modelo.transform(dados_teste)
predictions.select("features", "prediction").show()

+-------------------+------------------+
|           features|        prediction|
+-------------------+------------------+
|[18.5,304.0,4732.0]|11.625745562720567|
|[11.5,383.0,4955.0]| 8.412034278258346|
|[12.5,350.0,4499.0]|11.805110990077772|
|[12.0,302.0,3169.0]|20.759427171669795|
|[12.0,400.0,4464.0]| 11.53864372839032|
|[12.0,400.0,5140.0]| 7.149277411318991|
|[12.5,400.0,4422.0]|11.890715791372042|
|[13.0,360.0,4654.0]|10.795159921978176|
|[13.2,318.0,3940.0]|15.811078361495376|
|[14.5,350.0,4699.0]|10.823921720285341|
|[15.0,302.0,3870.0]| 16.68389194575984|
| [8.5,440.0,4312.0]|11.638588235480068|
|[10.0,455.0,3086.0]|  19.7129541885622|
|[12.0,350.0,4209.0]|13.608763212994987|
|[13.5,351.0,4154.0]|14.195678727787993|
|[14.5,302.0,4042.0]| 15.48771097564902|
| [9.5,400.0,3761.0]|15.706525199703805|
|[12.5,304.0,3892.0]| 16.12766827756697|
|[13.5,318.0,4135.0]|14.592530973516496|
|[13.7,318.0,4140.0]|14.591809228129073|
+-------------------+------------------+
only showing top

In [33]:
# Coeficiente de determinação R2
avaliador = RegressionEvaluator(predictionCol= "prediction", labelCol="label", metricName="r2")
r2 = avaliador.evaluate(predictions)

print("O Modelo apresenta Acuracidade de " + str(r2*100))

O Modelo apresenta Acuracidade de 71.12316606711612
