In [1]:
import findspark
findspark.init()

In [2]:
import numpy as np
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

In [3]:
sc = SparkContext(appName = 'Lab5')

In [4]:
sc.setLogLevel('ERROR')

In [5]:
#SparkSession
ss = SparkSession.builder.master("local").getOrCreate()

In [6]:
#Carregar os dados gerando um RDD
dados = sc.textFile('dataset1.csv')

In [7]:
type(dados)

pyspark.rdd.RDD

In [8]:
#Cache otimiza a performance
dados.cache()

dataset1.csv MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [9]:
dados.count()

399

In [10]:
dados.take(5)

['consumo,numero_cilindros,capacidade,horsepower,peso,aceleracao,ano,nome',
 '30,4,79,70,2074,19.5,71,peugeot 304',
 '30,4,88,76,2065,14.5,71,fiat 124b',
 '31,4,71,65,1773,19,71,toyota corolla 1200',
 '35,4,72,69,1613,18,71,datsun 1200']

In [11]:
#Removendo a primeira linha (cabeçalho)
dados2 = dados.filter(lambda x:"peso" not in x)
dados2.count()

398

In [12]:
dados2.take(5)

['30,4,79,70,2074,19.5,71,peugeot 304',
 '30,4,88,76,2065,14.5,71,fiat 124b',
 '31,4,71,65,1773,19,71,toyota corolla 1200',
 '35,4,72,69,1613,18,71,datsun 1200',
 '27,4,97,60,1834,19,71,volkswagen model 111']

#### Para processamento, RDDs são bons, mas são ruins para exploração de dados. Portanto, iremos converter o RDD para o Spark Dataframe e depois para o Pandas Dataframe

In [13]:
spark_df = dados2.map(lambda x:str(x)).map(lambda y:y.split(",")).toDF()

In [14]:
spark_df.show()

+---+---+----+---+----+----+---+--------------------+
| _1| _2|  _3| _4|  _5|  _6| _7|                  _8|
+---+---+----+---+----+----+---+--------------------+
| 30|  4|  79| 70|2074|19.5| 71|         peugeot 304|
| 30|  4|  88| 76|2065|14.5| 71|           fiat 124b|
| 31|  4|  71| 65|1773|  19| 71| toyota corolla 1200|
| 35|  4|  72| 69|1613|  18| 71|         datsun 1200|
| 27|  4|  97| 60|1834|  19| 71|volkswagen model 111|
| 26|  4|  91| 70|1955|20.5| 71|    plymouth cricket|
| 24|  4| 113| 95|2278|15.5| 72|toyota corona har...|
| 25|  4|97.5| 80|2126|  17| 72|  dodge colt hardtop|
| 23|  4|  97| 54|2254|23.5| 72|   volkswagen type 3|
| 20|  4| 140| 90|2408|19.5| 72|      chevrolet vega|
| 21|  4| 122| 86|2226|16.5| 72| ford pinto runabout|
| 13|  8| 350|165|4274|  12| 72|    chevrolet impala|
| 14|  8| 400|175|4385|  12| 72|    pontiac catalina|
| 15|  8| 318|150|4135|13.5| 72|   plymouth fury iii|
| 14|  8| 351|153|4129|  13| 72|    ford galaxie 500|
| 17|  8| 304|150|3672|11.5|

In [15]:
pandas_df = spark_df.toPandas()

In [16]:
pandas_df

Unnamed: 0,_1,_2,_3,_4,_5,_6,_7,_8
0,30,4,79,70,2074,19.5,71,peugeot 304
1,30,4,88,76,2065,14.5,71,fiat 124b
2,31,4,71,65,1773,19,71,toyota corolla 1200
3,35,4,72,69,1613,18,71,datsun 1200
4,27,4,97,60,1834,19,71,volkswagen model 111
...,...,...,...,...,...,...,...,...
393,27,4,140,86,2790,15.6,82,ford mustang gl
394,44,4,97,52,2130,24.6,82,vw pickup
395,32,4,135,84,2295,11.6,82,dodge rampage
396,28,4,120,79,2625,18.6,82,ford ranger


In [17]:
pandas_df.isnull().values.any()

False

In [18]:
np.sum(pandas_df.apply(lambda x:x.str.contains('\?')).values)

6

In [19]:
np.where(pandas_df.apply(lambda x:x.str.contains('\?').values))

(array([ 48, 126, 330, 336, 354, 374], dtype=int64),
 array([3, 3, 3, 3, 3, 3], dtype=int64))

In [20]:
pandas_df.iloc[48]

_1            25
_2             4
_3            98
_4             ?
_5          2046
_6            19
_7            71
_8    ford pinto
Name: 48, dtype: object

In [21]:
media = pandas_df[~pandas_df["_4"].str.contains("\?")]["_4"].astype(int).mean()

In [22]:
#Valor padrão do HP (média para substituir valores ausentes)
mediaHP = sc.broadcast(media)

In [23]:
#Limpeza dos dados - função
def limpeza(string):
    
    global mediaHP
    
    #Lista de atributos
    str_final = string.split(",")
    
    possivel_nulo = str_final[3]
    
    #Substitui o caractere pela média
    if possivel_nulo == "?":
        possivel_nulo = mediaHP.value
    
    #Criando as linhas do data_frame
    linhas = Row(consumo = float(str_final[0]), 
                 numero_cilindros = float(str_final[1]), 
                 capacidade = float(str_final[2]), 
                 horsepower = float(possivel_nulo), 
                 peso = float(str_final[4]), 
                 aceleracao = float(str_final[5]), 
                 ano = float(str_final[6]), 
                 nome = str_final[7])
    return linhas
    

In [24]:
dados3 = dados2.map(limpeza)
dados3.cache()
dados3.take(5)

[Row(consumo=30.0, numero_cilindros=4.0, capacidade=79.0, horsepower=70.0, peso=2074.0, aceleracao=19.5, ano=71.0, nome='peugeot 304'),
 Row(consumo=30.0, numero_cilindros=4.0, capacidade=88.0, horsepower=76.0, peso=2065.0, aceleracao=14.5, ano=71.0, nome='fiat 124b'),
 Row(consumo=31.0, numero_cilindros=4.0, capacidade=71.0, horsepower=65.0, peso=1773.0, aceleracao=19.0, ano=71.0, nome='toyota corolla 1200'),
 Row(consumo=35.0, numero_cilindros=4.0, capacidade=72.0, horsepower=69.0, peso=1613.0, aceleracao=18.0, ano=71.0, nome='datsun 1200'),
 Row(consumo=27.0, numero_cilindros=4.0, capacidade=97.0, horsepower=60.0, peso=1834.0, aceleracao=19.0, ano=71.0, nome='volkswagen model 111')]

In [25]:
df_carros = dados3.toDF()

In [26]:
df_carros.show()

+-------+----------------+----------+----------+------+----------+----+--------------------+
|consumo|numero_cilindros|capacidade|horsepower|  peso|aceleracao| ano|                nome|
+-------+----------------+----------+----------+------+----------+----+--------------------+
|   30.0|             4.0|      79.0|      70.0|2074.0|      19.5|71.0|         peugeot 304|
|   30.0|             4.0|      88.0|      76.0|2065.0|      14.5|71.0|           fiat 124b|
|   31.0|             4.0|      71.0|      65.0|1773.0|      19.0|71.0| toyota corolla 1200|
|   35.0|             4.0|      72.0|      69.0|1613.0|      18.0|71.0|         datsun 1200|
|   27.0|             4.0|      97.0|      60.0|1834.0|      19.0|71.0|volkswagen model 111|
|   26.0|             4.0|      91.0|      70.0|1955.0|      20.5|71.0|    plymouth cricket|
|   24.0|             4.0|     113.0|      95.0|2278.0|      15.5|72.0|toyota corona har...|
|   25.0|             4.0|      97.5|      80.0|2126.0|      17.0|72.0

In [27]:
#Correlação entre a variável target a as outras variáveis
for coluna in df_carros.columns:
    if not (isinstance(df_carros.select(coluna).take(1)[0][0],str)):
        print('Correlação entre variável target e: ',coluna,df_carros.stat.corr('consumo',coluna))

Correlação entre variável target e:  consumo 1.0
Correlação entre variável target e:  numero_cilindros -0.7753962854205548
Correlação entre variável target e:  capacidade -0.8042028248058978
Correlação entre variável target e:  horsepower -0.7714371350025528
Correlação entre variável target e:  peso -0.8317409332443347
Correlação entre variável target e:  aceleracao 0.4202889121016496
Correlação entre variável target e:  ano 0.5792671330833099


#### Pré processamento

In [28]:
#Removendo as colunas não relevantes ou com baixa correlação
def transformar(row):
    obj = (row['consumo'],Vectors.dense([row['peso'],row['capacidade'],row['numero_cilindros']]))
    return obj

In [29]:
dados4 = dados3.map(transformar)

In [30]:
dados4.take(5)

[(30.0, DenseVector([2074.0, 79.0, 4.0])),
 (30.0, DenseVector([2065.0, 88.0, 4.0])),
 (31.0, DenseVector([1773.0, 71.0, 4.0])),
 (35.0, DenseVector([1613.0, 72.0, 4.0])),
 (27.0, DenseVector([1834.0, 97.0, 4.0]))]

In [31]:
df_carros = ss.createDataFrame(dados4,['label','features'])

In [32]:
df_carros.select('label','features').show()

+-----+------------------+
|label|          features|
+-----+------------------+
| 30.0| [2074.0,79.0,4.0]|
| 30.0| [2065.0,88.0,4.0]|
| 31.0| [1773.0,71.0,4.0]|
| 35.0| [1613.0,72.0,4.0]|
| 27.0| [1834.0,97.0,4.0]|
| 26.0| [1955.0,91.0,4.0]|
| 24.0|[2278.0,113.0,4.0]|
| 25.0| [2126.0,97.5,4.0]|
| 23.0| [2254.0,97.0,4.0]|
| 20.0|[2408.0,140.0,4.0]|
| 21.0|[2226.0,122.0,4.0]|
| 13.0|[4274.0,350.0,8.0]|
| 14.0|[4385.0,400.0,8.0]|
| 15.0|[4135.0,318.0,8.0]|
| 14.0|[4129.0,351.0,8.0]|
| 17.0|[3672.0,304.0,8.0]|
| 18.0|[3504.0,307.0,8.0]|
| 15.0|[3693.0,350.0,8.0]|
| 18.0|[3436.0,318.0,8.0]|
| 16.0|[3433.0,304.0,8.0]|
+-----+------------------+
only showing top 20 rows



In [33]:
#Divisão treino e teste
(dados_treino,dados_teste) = df_carros.randomSplit([0.8,0.2])

In [34]:
dados_treino.count()

321

In [35]:
dados_teste.count()

77

### Machine Learning

In [36]:
linearReg = LinearRegression()

In [37]:
modelo = linearReg.fit(dados_treino)

In [39]:
#Coeficientes aprendidos pelo modelo
print("Coeficiente: " + str(modelo.coefficients))
print("Intercepto: " + str(modelo.intercept))

Coeficiente: [-0.006002798811775153,-0.013596403160059682,-0.140564658529604]
Intercepto: 44.845578965894575


In [40]:
#Previsão
predictions = modelo.transform(dados_teste)

In [42]:
predictions.select('features','prediction').show()

+------------------+------------------+
|          features|        prediction|
+------------------+------------------+
|[4906.0,400.0,8.0]|  8.83276946306497|
|[4952.0,429.0,8.0]| 8.162345026081582|
|[3821.0,360.0,8.0]|15.889662300243394|
|[3988.0,350.0,8.0]|15.023158930277543|
|[4464.0,400.0,8.0]|11.486006537869585|
|[4699.0,350.0,8.0]|10.755168975105413|
|[3609.0,340.0,8.0]| 17.43418371154092|
|[4042.0,302.0,8.0]|15.351635146124547|
|[4096.0,318.0,8.0]|14.809941559727736|
|[4385.0,400.0,8.0]|11.960227643999822|
|[3761.0,400.0,8.0]|15.705974102547515|
|[3777.0,318.0,8.0]|16.724834380684012|
|[3892.0,304.0,8.0]|  16.2248621615707|
|[4341.0,429.0,8.0]|11.830055100076201|
|[3278.0,250.0,6.0]|20.925915719703077|
|[4668.0,400.0,8.0]| 10.26143558026746|
|[3907.0,231.0,6.0]|17.408486927137638|
|[4215.0,305.0,8.0]|14.272361742207266|
|[2933.0,121.0,4.0]|25.031946634472416|
|[2868.0,121.0,4.0]|25.422128557237798|
+------------------+------------------+
only showing top 20 rows



In [43]:
#Coeficiente de determinação R2
avaliador = RegressionEvaluator(predictionCol = 'prediction',labelCol = 'label',metricName = 'r2')

In [45]:
avaliador.evaluate(predictions)

0.700207897081574