# <font color='gray'>Thiago</font>
# <font color='gray'>Big Data Real-Time Analytics com Python e Spark</font>


## <font color='gray'>Machine Learning com PySpark</font>

In [1]:
# Versão da Linguagem Python
from platform import python_version
print('Versão da Linguagem Python Usada Neste Jupyter Notebook:', python_version())

Versão da Linguagem Python Usada Neste Jupyter Notebook: 3.9.13


In [2]:
# Para atualizar um pacote, execute o comando abaixo no terminal ou prompt de comando:
# pip install -U nome_pacote

# Para instalar a versão exata de um pacote, execute o comando abaixo no terminal ou prompt de comando:
#!pip install nome_pacote==versão_desejada

# Depois de instalar ou atualizar o pacote, reinicie o jupyter notebook.

# Instala o pacote watermark. 
# Esse pacote é usado para gravar as versões de outros pacotes usados neste jupyter notebook.
#!pip install -q -U watermark

In [2]:
# Importa o findspark e inicializa
import findspark
findspark.init()

In [3]:
# Imports
import numpy as np
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

In [4]:
# Versões dos pacotes usados neste jupyter notebook
%reload_ext watermark
%watermark -a "Thiago Gragnanello" --iversions

Author: Thiago Gragnanello

findspark: 2.0.1
numpy    : 1.21.5
pyspark  : 3.5.0



## Carregando os Dados

In [5]:
# Criando o Spark Context
sc = SparkContext(appName = "Lab5")

23/10/15 20:54:51 WARN Utils: Your hostname, thiago-Nitro resolves to a loopback address: 127.0.1.1; using 192.168.0.110 instead (on interface wlp0s20f3)
23/10/15 20:54:51 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/15 20:54:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [6]:
sc.setLogLevel("ERROR")

In [7]:
# Criando o Spark Session
spSession = SparkSession.builder.master("local").getOrCreate()

In [8]:
# Carregando os dados e gerando um RDD
dados = sc.textFile("dados/dataset1.csv")

In [10]:
type(dados)

pyspark.rdd.RDD

In [11]:
# Colocando o RDD em cache. Esse processo otimiza a performance

# Coloca os dados em uma área que o processo pode ser mais rápido


dados.cache()

dados/dataset1.csv MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [9]:
# Número de registros
dados.count()

[Stage 0:>                                                          (0 + 2) / 2]                                                                                

399

In [10]:
# Visualizando as primeiras linhas

# Variável consumo será a variável Target

dados.take(5)

['consumo,numero_cilindros,capacidade,horsepower,peso,aceleracao,ano,nome',
 '30,4,79,70,2074,19.5,71,peugeot 304',
 '30,4,88,76,2065,14.5,71,fiat 124b',
 '31,4,71,65,1773,19,71,toyota corolla 1200',
 '35,4,72,69,1613,18,71,datsun 1200']

In [11]:
# Removendo a primeira linha do arquivo (cabeçalho)

# Retorne o valor de x menos as linhas que tenha horsepower, por exemplo. Tira a linha do cabeçalho


dados2 = dados.filter(lambda x: "horsepower" not in x)
dados2.count()

398

In [12]:
# Visualizando as primeiras linhas
dados2.take(5)

['30,4,79,70,2074,19.5,71,peugeot 304',
 '30,4,88,76,2065,14.5,71,fiat 124b',
 '31,4,71,65,1773,19,71,toyota corolla 1200',
 '35,4,72,69,1613,18,71,datsun 1200',
 '27,4,97,60,1834,19,71,volkswagen model 111']

## Limpeza dos Dados

Vamos verificar se há valores ausentes. RDDs são ótimos para processamento, mas ruins para exploração, então converteremos o RDD para DataFrame Spark e então para DataFrame Pandas.

In [13]:
# Converte RDD para DataFrame Spark

# Não é convertido para o Pandas diretamente, pois precisa sofrer algumas alterações antes
# Essa mudança é necessaria para manipular os dados. Em pandas é mais fácil

# map = mapeia uma função por linha
# split divide cada linha pela ",", ou seja, separando as colunas
# Converto para string "str"
# toDF() converto para DF spark


df_spark = dados2.map(lambda x: str(x)).map(lambda w: w.split(',')).toDF()

In [14]:
type(df_spark)

pyspark.sql.dataframe.DataFrame

In [15]:
# Converte DataFrame Spark para DataFrame Pandas
df_pandas = df_spark.toPandas()

In [16]:
type(df_pandas)

pandas.core.frame.DataFrame

In [17]:
df_pandas.head()

Unnamed: 0,_1,_2,_3,_4,_5,_6,_7,_8
0,30,4,79,70,2074,19.5,71,peugeot 304
1,30,4,88,76,2065,14.5,71,fiat 124b
2,31,4,71,65,1773,19.0,71,toyota corolla 1200
3,35,4,72,69,1613,18.0,71,datsun 1200
4,27,4,97,60,1834,19.0,71,volkswagen model 111


In [18]:
# Tem valores nulos?
df_pandas.isnull().values.any()

False

Não há valor nulo, mas será que há valor ausente (falta de informação)? Vamos checar.

In [19]:
# Verifica se há alguma linha com caracter especial "?"

# Cada valor "values"
# contém "contains" a string "?"
# se tiver, some


total = np.sum(df_pandas.apply(lambda x: x.str.contains('\?')).values)
total

6

In [20]:
# Vejamos quais linhas e colunas têm o caracter especial
np.where(df_pandas.apply(lambda x: x.str.contains('\?')).values)

(array([ 48, 126, 330, 336, 354, 374]), array([3, 3, 3, 3, 3, 3]))

In [21]:
# Visualizando uma linha
df_pandas.iloc[330]

_1                    40.9
_2                       4
_3                      85
_4                       ?
_5                    1835
_6                    17.3
_7                      80
_8    renault lecar deluxe
Name: 330, dtype: object

In [22]:
# Usando um valor padrão para average HP (que será usado para preencher os valores ausentes)
mediaHP = sc.broadcast(75.0)

In [23]:
# Função para limpeza dos dados
def limpaDados(inputStr) :
    
    # Variável global
    global mediaHP
    
    # Lista de atributos
    attList = inputStr.split(",")
    
    # Substitui o caracter ? por um valor na coluna de índice 3
    hpValue = attList[3]
    if hpValue == "?":
        hpValue = mediaHP.value
       
    # Cria uma linha usando a função Row, limpando e convertendo os dados de string para float
    linhas = Row(consumo = float(attList[0]), 
                 numero_cilindros = float(attList[1]), 
                 capacidade = float(attList[2]), 
                 hosrsepower = float(hpValue), 
                 peso = float(attList[4]), 
                 aceleracao = float(attList[5]), 
                 ano = float(attList[6]), 
                 nome = attList[7]) 
    return linhas

In [24]:
# Executa a função no RDD
dados3 = dados2.map(limpaDados)
dados3.cache()
dados3.take(5)

[Row(consumo=30.0, numero_cilindros=4.0, capacidade=79.0, hosrsepower=70.0, peso=2074.0, aceleracao=19.5, ano=71.0, nome='peugeot 304'),
 Row(consumo=30.0, numero_cilindros=4.0, capacidade=88.0, hosrsepower=76.0, peso=2065.0, aceleracao=14.5, ano=71.0, nome='fiat 124b'),
 Row(consumo=31.0, numero_cilindros=4.0, capacidade=71.0, hosrsepower=65.0, peso=1773.0, aceleracao=19.0, ano=71.0, nome='toyota corolla 1200'),
 Row(consumo=35.0, numero_cilindros=4.0, capacidade=72.0, hosrsepower=69.0, peso=1613.0, aceleracao=18.0, ano=71.0, nome='datsun 1200'),
 Row(consumo=27.0, numero_cilindros=4.0, capacidade=97.0, hosrsepower=60.0, peso=1834.0, aceleracao=19.0, ano=71.0, nome='volkswagen model 111')]

## Análise Exploratória de Dados

In [25]:
# Cria um Dataframe
df_carros = spSession.createDataFrame(dados3)

In [26]:
# Estatísticas descritivas de duas variáveis (como exemplo)
df_carros.select("consumo", "numero_cilindros").describe().show()

+-------+-----------------+-----------------+
|summary|          consumo| numero_cilindros|
+-------+-----------------+-----------------+
|  count|              398|              398|
|   mean|23.51457286432161|5.454773869346734|
| stddev|7.815984312565782|1.701004244533212|
|    min|              9.0|              3.0|
|    max|             46.6|              8.0|
+-------+-----------------+-----------------+



In [27]:
# Encontrando a correlação entre a variável target com as variáveis preditoras (exceto o nome)
for i in df_carros.columns:
    if not(isinstance(df_carros.select(i).take(1)[0][0], str)) :
        print("Correlação da Variável Target com:", i, df_carros.stat.corr('consumo', i))

Correlação da Variável Target com: consumo 1.0
Correlação da Variável Target com: numero_cilindros -0.7753962854205548
Correlação da Variável Target com: capacidade -0.8042028248058978
Correlação da Variável Target com: hosrsepower -0.774704152349872
Correlação da Variável Target com: peso -0.8317409332443347
Correlação da Variável Target com: aceleracao 0.4202889121016496
Correlação da Variável Target com: ano 0.5792671330833099


## Pré-Processamento dos Dados

# <font color = 'black'> VETOR DENSO x VETOR ESPARSO </font>

In [28]:
# Convertendo para um LabeledPoint (target, Vector[features])
# Remove colunas não relevantes ou com baixa correlação


# Vetor Denso: Quando a maioria dos valores são diferentes de zero

# Vetor Esparso: Contém muitos valores zero


# Formato de tupla

def transformaVar(row) :
    obj = (row["consumo"], Vectors.dense([row["peso"], row["capacidade"], row["numero_cilindros"]]))
    return obj

In [29]:
# Aplica a função no RDD e cria outro RDD
dados4 = dados3.map(transformaVar)

In [30]:
# Visualiza
dados4.take(5)

[(30.0, DenseVector([2074.0, 79.0, 4.0])),
 (30.0, DenseVector([2065.0, 88.0, 4.0])),
 (31.0, DenseVector([1773.0, 71.0, 4.0])),
 (35.0, DenseVector([1613.0, 72.0, 4.0])),
 (27.0, DenseVector([1834.0, 97.0, 4.0]))]

In [31]:
# Converte o RDD para DataFrame do Spark
df_carros = spSession.createDataFrame(dados4, ["label", "features"])

In [32]:
# Visualiza label (y) e atributos (x)
df_carros.select("label","features").show(10)

+-----+------------------+
|label|          features|
+-----+------------------+
| 30.0| [2074.0,79.0,4.0]|
| 30.0| [2065.0,88.0,4.0]|
| 31.0| [1773.0,71.0,4.0]|
| 35.0| [1613.0,72.0,4.0]|
| 27.0| [1834.0,97.0,4.0]|
| 26.0| [1955.0,91.0,4.0]|
| 24.0|[2278.0,113.0,4.0]|
| 25.0| [2126.0,97.5,4.0]|
| 23.0| [2254.0,97.0,4.0]|
| 20.0|[2408.0,140.0,4.0]|
+-----+------------------+
only showing top 10 rows



In [33]:
# Divisão em dados de Treino e de Teste com split 70/30
(dados_treino, dados_teste) = df_carros.randomSplit([0.7, 0.3])

In [34]:
dados_treino.count()

291

In [35]:
dados_teste.count()

107

## Machine Learning

In [36]:
# Cria o objeto
linearReg = LinearRegression()

In [37]:
# Treina o objeto com dados e cria o modelo
modelo = linearReg.fit(dados_treino)

In [38]:
print(modelo)

LinearRegressionModel: uid=LinearRegression_3ab15ef57878, numFeatures=3


In [39]:
# Imprimindo os coeficientes (o que o modelo aprendeu)
print("Coeficientes: " + str(modelo.coefficients))
print("Intercepto: " + str(modelo.intercept))

Coeficientes: [-0.005892548512961131,-0.011700245587835476,-0.19225314041309582]
Intercepto: 44.43383876145064


In [40]:
# Previsões com dados de teste
predictions = modelo.transform(dados_teste)

In [41]:
# Visualiza as previsões
predictions.select("features", "prediction").show()

+------------------+------------------+
|          features|        prediction|
+------------------+------------------+
|[4732.0,304.0,8.0]| 11.45539941611181|
|[4376.0,307.0,8.0]|13.518045949962467|
|[3664.0,350.0,8.0]| 17.21042993091387|
|[4456.0,350.0,8.0]|12.543531508648655|
|[3169.0,302.0,8.0]| 20.68885323304573|
|[3821.0,360.0,8.0]|16.168297358500617|
|[4100.0,350.0,8.0]|14.641278779262816|
|[4274.0,350.0,8.0]| 13.61597533800758|
|[4363.0,351.0,8.0]|13.079838274766203|
|[4464.0,400.0,8.0]|11.911378841153187|
|[4502.0,350.0,8.0]|12.272474277052446|
|[4735.0,440.0,8.0]| 9.846488370627306|
|[5140.0,400.0,8.0]|  7.92801604639147|
|[4077.0,318.0,8.0]|15.151215253871658|
|[4385.0,400.0,8.0]|12.376890173677118|
|[4425.0,455.0,8.0]|11.497674725827721|
|[3777.0,318.0,8.0]|16.918979807759996|
|[3892.0,304.0,8.0]|16.405140166999164|
|[4135.0,318.0,8.0]| 14.80944744011991|
|[3433.0,304.0,8.0]| 19.10981993444832|
+------------------+------------------+
only showing top 20 rows



In [42]:
# Coeficiente de determinação R2
avaliador = RegressionEvaluator(predictionCol = "prediction", labelCol = "label", metricName = "r2")

In [43]:
# Resultado
avaliador.evaluate(predictions) 

0.7355596084362468

# Fim