# <font color='White'>Projeto SX Negócios</font>
# <font color='White'>Estudo de Caso com Python e Spark</font>

## <font color='White'>Candidato: Alexandre Rodel de Almeida</font>

### <font color='White'>Análise Exploratória e Transformação de Dados do ENEM</font>

Qualquer detalhe a respeito do Projeto será colocado aqui como comentário.

![title](SX_logo.png)

In [1]:
# Versão da Linguagem Python
from platform import python_version
print('Versão da Linguagem Python Usada Neste Jupyter Notebook:', python_version())

Versão da Linguagem Python Usada Neste Jupyter Notebook: 3.9.7


In [2]:
# !pip install --upgrade pip

In [3]:
# https://pypi.org/project/findspark/
# !pip install -q findspark
#!pip install watermark
# !pip install matplotlib seaborn numpy pandas pyspark

In [4]:
# Importa o findspark e inicializa
import findspark
findspark.init()

In [5]:
# Imports
import pyspark
import pandas as pd
import numpy as np
import seaborn as sns
from matplotlib import pyplot as plt
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StringIndexer
from pyspark.ml.regression import LinearRegression
from pyspark.mllib.evaluation import RegressionMetrics
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StringIndexer
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.evaluation import RegressionEvaluator

In [6]:
# Formatação das saídas
pd.set_option('display.max_columns', 200)
pd.set_option('display.max_colwidth', 400)
from matplotlib.axes._axes import _log as matplotlib_axes_logger
matplotlib_axes_logger.setLevel('ERROR')

In [7]:
# Versões dos pacotes usados neste jupyter notebook
%reload_ext watermark
%watermark -a "SolutionSXAlexandre" --iversions

Author: SolutionSXAlexandre

sys       : 3.9.7 (tags/v3.9.7:1016ef3, Aug 30 2021, 20:19:38) [MSC v.1929 64 bit (AMD64)]
numpy     : 2.0.2
findspark : 2.0.1
pandas    : 2.2.3
seaborn   : 0.13.2
matplotlib: 3.9.2
platform  : 1.0.8
py4j      : 0.10.9.7
decimal   : 1.70
pyspark   : 3.5.3



## Preparando o Ambiente Spark

Criando e conectando um SparkContext (cluster Spark).

In [8]:
# Definindo semente aleatória (seed) para reprodutibilidade do notebook
rnd_seed = 23
np.random.seed = rnd_seed
np.random.set_state = rnd_seed

In [9]:
# Criando o Spark Context
sc = SparkContext(appName = "transformacaoDados")

In [19]:
# Criando a sessão Spark
spark_session = SparkSession.Builder().getOrCreate()

In [20]:
# Visualiza o objeto spark_session
spark_session

## Carregando os Dados

In [None]:
# Carrega os dados a partir da sessão Spark
df_spark = spark_session.read.csv(path = '"D:\Documents\UNIVERSIDADE\CURSOS_APERFEICOAMENTO\DATA_SCIENCE\Testes_Empresas\SX_Negocios\DesafioSX\DADOS\MICRODADOS_ENEM_2020.csv"', 
                                  sep = ';',
                                  header = True, 
                                  inferSchema = True)

In [13]:
# Tipo do objeto
type(df_spark)

pyspark.sql.dataframe.DataFrame

In [14]:
# Visualiza os dados
df_spark.show()

+------------+------+---------------+-------+---------------+-----------+----------------+---------------+---------------+---------+---------+------------+----------------+-------------------+---------+---------+----------------------+------------------+---------------+------------------+--------------------+-----------+-----------+--------------+--------------+--------------+--------------+-----------+-----------+-----------+-----------+----------+----------+----------+----------+--------------------+--------------------+--------------------+--------------------+---------+--------------------+--------------------+--------------------+--------------------+-----------------+-------------+-------------+-------------+-------------+-------------+---------------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|NU_INSCRICAO|NU_ANO|TP_FAIXA_ETARIA|TP_SEXO|TP_ESTADO_CIVIL|TP_COR_RACA|TP_NACIONALIDADE|TP_ST_CONCLUSAO|

In [15]:
# Visualiza os metadados (schema)
df_spark.printSchema()

root
 |-- NU_INSCRICAO: long (nullable = true)
 |-- NU_ANO: integer (nullable = true)
 |-- TP_FAIXA_ETARIA: integer (nullable = true)
 |-- TP_SEXO: string (nullable = true)
 |-- TP_ESTADO_CIVIL: integer (nullable = true)
 |-- TP_COR_RACA: integer (nullable = true)
 |-- TP_NACIONALIDADE: integer (nullable = true)
 |-- TP_ST_CONCLUSAO: integer (nullable = true)
 |-- TP_ANO_CONCLUIU: integer (nullable = true)
 |-- TP_ESCOLA: integer (nullable = true)
 |-- TP_ENSINO: integer (nullable = true)
 |-- IN_TREINEIRO: integer (nullable = true)
 |-- CO_MUNICIPIO_ESC: integer (nullable = true)
 |-- NO_MUNICIPIO_ESC: string (nullable = true)
 |-- CO_UF_ESC: integer (nullable = true)
 |-- SG_UF_ESC: string (nullable = true)
 |-- TP_DEPENDENCIA_ADM_ESC: integer (nullable = true)
 |-- TP_LOCALIZACAO_ESC: integer (nullable = true)
 |-- TP_SIT_FUNC_ESC: integer (nullable = true)
 |-- CO_MUNICIPIO_PROVA: integer (nullable = true)
 |-- NO_MUNICIPIO_PROVA: string (nullable = true)
 |-- CO_UF_PROVA: integer 

In [16]:
# Verifica o número de linhas
df_spark.count()

5783109

## Data Wrangling com SparkSQL

Filtragem para a obtenção das informações importantes e pertinentes para que se possa prosseguir.

In [21]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col, when

## --------------------------------------------------- ##
# Funções Auxiliares
def tratar_valores_nulos(df_spark, colunas, valor_substituicao=0):
    """
    Substitui valores nulos em colunas específicas por um valor padrão.
    """
    for coluna in colunas:
        df_spark = df_spark.withColumn(coluna, when(col(coluna).isNull(), valor_substituicao).otherwise(col(coluna)))
    return df_spark


def substituir_valores(df_spark, substituicoes):
    """
    Substitui valores entre colunas de origem e destino conforme um mapeamento.
    """
    for destino, origem in substituicoes.items():
        df_spark = df_spark.withColumn(destino, when(col(origem).isNotNull(), col(origem)).otherwise(col(destino)))
    return df_spark


def verificar_nulos(tabela):
    """
    Verifica e exibe a contagem de valores nulos por coluna.
    """
    null_counts = tabela.select([F.count(F.when(col(c).isNull(), c)).alias(c) for c in tabela.columns])
    null_counts.show()


## --------------------------------------------------- ##
# Pré-processamento
# Definições de Colunas e Regras
colunas_a_tratar = ['TP_ENSINO', 'TP_DEPENDENCIA_ADM_ESC', 'TP_LOCALIZACAO_ESC', 'TP_SIT_FUNC_ESC']
substituicoes_relacionadas = {
    "CO_MUNICIPIO_ESC": "CO_MUNICIPIO_PROVA",
    "NO_MUNICIPIO_ESC": "NO_MUNICIPIO_PROVA",
    "CO_UF_ESC": "CO_UF_PROVA",
    "SG_UF_ESC": "SG_UF_PROVA"
}

# Tratando valores nulos
df_spark = tratar_valores_nulos(df_spark, colunas_a_tratar, valor_substituicao=0)
df_spark = substituir_valores(df_spark, substituicoes_relacionadas)

## --------------------------------------------------- ##
# Modelagem Dimensional
# Listas de Colunas para as Tabelas
tabelas_modelagem = {
    "tabelaDesempenho": [
        'NU_INSCRICAO', 'CO_MUNICIPIO_PROVA', 'CO_MUNICIPIO_ESC', 'NU_ANO', 'IN_TREINEIRO',
        'TP_PRESENCA_CN', 'TP_PRESENCA_CH', 'TP_PRESENCA_LC', 'TP_PRESENCA_MT',
        'NU_NOTA_CN', 'NU_NOTA_CH', 'NU_NOTA_LC', 'NU_NOTA_MT',
        'NU_NOTA_COMP1', 'NU_NOTA_COMP2', 'NU_NOTA_COMP3', 'NU_NOTA_COMP4', 'NU_NOTA_COMP5',
        'NU_NOTA_REDACAO'
    ],
    "tabelaAluno": [
        'NU_INSCRICAO', 'TP_FAIXA_ETARIA', 'TP_SEXO', 'TP_ESTADO_CIVIL', 'TP_COR_RACA', 'TP_NACIONALIDADE'
    ],
    "tabelaEscola": [
        'NU_INSCRICAO', 'CO_MUNICIPIO_ESC', 'NO_MUNICIPIO_ESC', 'CO_UF_ESC', 'SG_UF_ESC',
        'TP_ST_CONCLUSAO', 'TP_ANO_CONCLUIU', 'TP_ESCOLA', 'TP_ENSINO',
        'TP_DEPENDENCIA_ADM_ESC', 'TP_LOCALIZACAO_ESC', 'TP_SIT_FUNC_ESC'
    ],
    "tabelaProva": [
        'NU_INSCRICAO', 'CO_MUNICIPIO_PROVA', 'NO_MUNICIPIO_PROVA', 'CO_UF_PROVA', 'SG_UF_PROVA', 'TP_LINGUA'
    ]
}

# Criando as tabelas e registrando-as como views temporárias
for nome, colunas in tabelas_modelagem.items():
    df_temp = df_spark.select(colunas)
    globals()[nome] = df_temp  # Cria a variável com o nome da tabela
    df_temp.createOrReplaceTempView(nome)  # Registra como view temporária

# Verificando nulos em cada tabela
for nome in tabelas_modelagem.keys():
    print(f"Valores nulos na tabela {nome}:")
    verificar_nulos(globals()[nome])

## --------------------------------------------------- ##
# Conclusão
print("Processamento e modelagem concluídos!")

Valores nulos na tabela tabelaDesempenho:
+------------+------------------+----------------+------+------------+--------------+--------------+--------------+--------------+----------+----------+----------+----------+-------------+-------------+-------------+-------------+-------------+---------------+
|NU_INSCRICAO|CO_MUNICIPIO_PROVA|CO_MUNICIPIO_ESC|NU_ANO|IN_TREINEIRO|TP_PRESENCA_CN|TP_PRESENCA_CH|TP_PRESENCA_LC|TP_PRESENCA_MT|NU_NOTA_CN|NU_NOTA_CH|NU_NOTA_LC|NU_NOTA_MT|NU_NOTA_COMP1|NU_NOTA_COMP2|NU_NOTA_COMP3|NU_NOTA_COMP4|NU_NOTA_COMP5|NU_NOTA_REDACAO|
+------------+------------------+----------------+------+------------+--------------+--------------+--------------+--------------+----------+----------+----------+----------+-------------+-------------+-------------+-------------+-------------+---------------+
|           0|                 0|               0|     0|           0|             0|             0|             0|             0|   3185669|   3028969|   3028969|   3185669| 

In [22]:
# Consultas SQL nas tabelas usando spark.sql()

# Exemplo de consulta SQL para verificar o conteúdo da tabelaAluno
resultado_tabela_aluno = spark.sql("SELECT * FROM tabelaAluno LIMIT 5")
resultado_tabela_aluno.show()

# Exemplo de consulta SQL para verificar a tabelaEscola com uma condição
resultado_tabela_escola = spark.sql("SELECT * FROM tabelaEscola WHERE TP_ESCOLA = 'Pública'")
resultado_tabela_escola.show()

# Exemplo de consulta SQL para verificar o desempenho de alunos com nota CN maior que 500
resultado_desempenho = spark.sql("SELECT NU_INSCRICAO, NU_NOTA_CN FROM tabelaDesempenho WHERE NU_NOTA_CN > 500")
resultado_desempenho.show()


NameError: name 'spark' is not defined

In [None]:
## ANALISE DE DADOS, MODELAGEM E ETL UTILIZANDO "APACHE SPARK" ##

# from pyspark.sql import functions as F
# from pyspark.sql.functions import col, when

# ## --------------------------------------------------- ##
# # Funções Auxiliares
# def tratar_valores_nulos(df_spark, colunas, valor_substituicao=0):
#     """
#     Substitui valores nulos em colunas específicas por um valor padrão.
#     """
#     for coluna in colunas:
#         df_spark = df_spark.withColumn(coluna, when(col(coluna).isNull(), valor_substituicao).otherwise(col(coluna)))
#     return df_spark


# def substituir_valores(df_spark, substituicoes):
#     """
#     Substitui valores entre colunas de origem e destino conforme um mapeamento.
#     """
#     for destino, origem in substituicoes.items():
#         df_spark = df_spark.withColumn(destino, when(col(origem).isNotNull(), col(origem)).otherwise(col(destino)))
#     return df_spark


# def verificar_nulos(tabela):
#     """
#     Verifica e exibe a contagem de valores nulos por coluna.
#     """
#     null_counts = tabela.select([F.count(F.when(col(c).isNull(), c)).alias(c) for c in tabela.columns])
#     null_counts.show()


# ## --------------------------------------------------- ##
# # Pré-processamento
# # Definições de Colunas e Regras
# colunas_a_tratar = ['TP_ENSINO', 'TP_DEPENDENCIA_ADM_ESC', 'TP_LOCALIZACAO_ESC', 'TP_SIT_FUNC_ESC']
# substituicoes_relacionadas = {
#     "CO_MUNICIPIO_ESC": "CO_MUNICIPIO_PROVA",
#     "NO_MUNICIPIO_ESC": "NO_MUNICIPIO_PROVA",
#     "CO_UF_ESC": "CO_UF_PROVA",
#     "SG_UF_ESC": "SG_UF_PROVA"
# }

# # Tratando valores nulos
# df_spark = tratar_valores_nulos(df_spark, colunas_a_tratar, valor_substituicao=0)
# df_spark = substituir_valores(df_spark, substituicoes_relacionadas)

# # Substituindo valores relacionados a presença e notas
# # presenca_notas = {
# #     'NU_NOTA_CN': 'TP_PRESENCA_CN',
# #     'NU_NOTA_CH': 'TP_PRESENCA_CH',
# #     'NU_NOTA_MT': 'TP_PRESENCA_MT',
# #     'NU_NOTA_LC': 'TP_PRESENCA_LC'
# # }
# # for nota, presenca in presenca_notas.items():
# #     df_spark = df_spark.withColumn(nota, when(col(presenca).isNull(), 'NAO_COMPARECEU').otherwise(col(nota)))

# ## --------------------------------------------------- ##
# # Modelagem Dimensional
# # Listas de Colunas para as Tabelas
# tabelas_modelagem = {
#     "tabelaDesempenho": [
#         'NU_INSCRICAO', 'CO_MUNICIPIO_PROVA', 'CO_MUNICIPIO_ESC', 'NU_ANO', 'IN_TREINEIRO',
#         'TP_PRESENCA_CN', 'TP_PRESENCA_CH', 'TP_PRESENCA_LC', 'TP_PRESENCA_MT',
#         'NU_NOTA_CN', 'NU_NOTA_CH', 'NU_NOTA_LC', 'NU_NOTA_MT',
#         'NU_NOTA_COMP1', 'NU_NOTA_COMP2', 'NU_NOTA_COMP3', 'NU_NOTA_COMP4', 'NU_NOTA_COMP5',
#         'NU_NOTA_REDACAO'
#     ],
#     "tabelaAluno": [
#         'NU_INSCRICAO', 'TP_FAIXA_ETARIA', 'TP_SEXO', 'TP_ESTADO_CIVIL', 'TP_COR_RACA', 'TP_NACIONALIDADE'
#     ],
#     "tabelaEscola": [
#         'NU_INSCRICAO', 'CO_MUNICIPIO_ESC', 'NO_MUNICIPIO_ESC', 'CO_UF_ESC', 'SG_UF_ESC',
#         'TP_ST_CONCLUSAO', 'TP_ANO_CONCLUIU', 'TP_ESCOLA', 'TP_ENSINO',
#         'TP_DEPENDENCIA_ADM_ESC', 'TP_LOCALIZACAO_ESC', 'TP_SIT_FUNC_ESC'
#     ],
#     "tabelaProva": [
#         'NU_INSCRICAO', 'CO_MUNICIPIO_PROVA', 'NO_MUNICIPIO_PROVA', 'CO_UF_PROVA', 'SG_UF_PROVA', 'TP_LINGUA'
#     ]
# }

# # Criando as tabelas
# for nome, colunas in tabelas_modelagem.items():
#     globals()[nome] = df_spark.select(colunas)

# # Verificando nulos em cada tabela
# for nome in tabelas_modelagem.keys():
#     print(f"Valores nulos na tabela {nome}:")
#     verificar_nulos(globals()[nome])

# ## --------------------------------------------------- ##
# # Conclusão
# print("Processamento e modelagem concluídos!")


In [None]:
df_spark.show(20)

In [23]:
# Cria uma tabela temporária a partir do dataframe
# As tabelas temporárias são úteis quando você deseja que o conjunto de resultados fique visível 
# para todas as outras sessões Spark
df_spark.createOrReplaceTempView('dados_enem') 

In [24]:
# Executa uma consulta SQL
df_enem = spark_session.sql("select * from dados_enem")

In [25]:
type(df_enem)

pyspark.sql.dataframe.DataFrame

In [26]:
# Visualiza os dados
df_enem.show()

+------------+------+---------------+-------+---------------+-----------+----------------+---------------+---------------+---------+---------+------------+----------------+--------------------+---------+---------+----------------------+------------------+---------------+------------------+--------------------+-----------+-----------+--------------+--------------+--------------+--------------+-----------+-----------+-----------+-----------+----------+----------+----------+----------+--------------------+--------------------+--------------------+--------------------+---------+--------------------+--------------------+--------------------+--------------------+-----------------+-------------+-------------+-------------+-------------+-------------+---------------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|NU_INSCRICAO|NU_ANO|TP_FAIXA_ETARIA|TP_SEXO|TP_ESTADO_CIVIL|TP_COR_RACA|TP_NACIONALIDADE|TP_ST_CONCLUSAO

In [28]:
# Número de registros
df_enem.count()

5783109

In [29]:
# Converte o dataframe do Spark para o Pandas. 
# Por quê? Porque isso vai facilitar a análise exploratória de dados.
df_pandas = df_enem.toPandas()

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "C:\spark\python\pyspark\errors\exceptions\captured.py", line 179, in deco
    return f(*a, **kw)
  File "C:\spark\python\lib\py4j-0.10.9.7-src.zip\py4j\protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: <exception str() failed>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\spark\python\lib\py4j-0.10.9.7-src.zip\py4j\clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "c:\Users\alexa\AppData\Local\Programs\Python\Python39\lib\socket.py", line 704, in readinto
    return self._sock.recv_into(b)
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\spark\python\lib\py4j-0.10.

ConnectionRefusedError: [WinError 10061] No connection could be made because the target machine actively refused it

In [None]:
type(df_pandas)

## Análise Exploratória

Leia os manuais em pdf no Capítulo 11 do curso com detalhes sobre a forma ideal de fazer análise exploratória.

In [None]:
# Heatmap para visualizar a correlação
corr = df_pandas.corr()
f,ax = plt.subplots(figsize = (10, 10))
sns.heatmap(corr, annot = True, linewidths = .5, fmt = '.1f', ax = ax)

Idealmente queremos alta correlação entre as variáveis de entrada e a variável de saída e baixa correlação entre as variáveis de entrada!

In [None]:
# Scatter Plot Volume Bitcoin x Volume Moeda
plt.figure(figsize = (12,5))
sns.set(style = 'whitegrid')
df_pandas.plot(kind = 'scatter', x = 'VolBTC', y = 'VolCurrency')
plt.xlabel('Volume Bitcoin')            
plt.ylabel('Volume Moeda')
plt.title('Scatter Plot Volume Bitcoin x Volume Moeda') 
plt.show()

In [None]:
# Line Plot Cotação Open x High 
plt.figure(figsize = (16,5))
df_pandas.Open.plot(kind = 'line', 
                    color = 'r', 
                    label = 'Open', 
                    alpha = 0.5, 
                    linewidth = 5, 
                    grid = True, 
                    linestyle = ':')
df_pandas.High.plot(color = 'g', 
                    label = 'High', 
                    linewidth = 1, 
                    alpha = 0.5, 
                    grid = True, 
                    linestyle = '-.')
plt.legend(loc = 'upper left') 
plt.xlabel('Tempo')
plt.ylabel('Cotação')
plt.title('Line Plot Cotação Open x High ')
plt.show()

In [None]:
# Histograma da cotação de abertura
df_pandas.Open.plot(kind = 'hist', bins = 50)

In [None]:
# Plot do valor ponderado da cotação (variável alvo) por hora
plt.plot(hour, weighted_price , 'g*')
plt.xlabel('Hora')            
plt.ylabel('Valor Ponderado da Cotação')
plt.title('Valor Ponderado da Cotação Por Hora') 
plt.show()

In [None]:
# Plot do valor ponderado da cotação por dia da semana
plt.plot(date_of_week, weighted_price, 'b*')
plt.xlabel('Dia da Semana')            
plt.ylabel('Valor Ponderado da Cotação')
plt.title('Valor Ponderado da Cotação Por Dia da Semana') 
plt.show()

In [None]:
# Plot do VolBTC por hora
plt.plot(hour, volume_BTC, 'r*')
plt.xlabel('Hora')            
plt.ylabel('VolBTC')
plt.title('Volume Negociado de Bitcoin Por Hora') 
plt.show()

In [None]:
# Plot de VolBTC por dia da semana
plt.plot(date_of_week, volume_BTC, 'yo')
plt.xlabel('Dia da Semana')            
plt.ylabel('VolBTC')
plt.title('Volume Negociado de Bitcoin Por Dia da Semana') 
plt.show()

In [None]:
# Plot do valor ponderado da cotação por ano
plt.plot(year, weighted_price , 'm^')
plt.xlabel('Ano')            
plt.ylabel('Valor Ponderado da Cotação')
plt.title('Valor Ponderado da Cotação Por Ano') 
plt.show()

In [None]:
# Plot do Volume por ano
plt.plot(year, volume_BTC , 'kD')
plt.xlabel('Ano')            
plt.ylabel('volume BTC')
plt.title('Volume de BTC Negociado Por Ano') 
plt.show()

## Engenharia de Atributos com PySpark

In [None]:
df_enem.printSchema()

In [None]:
# Prepara o vetor de atributos
assembler = VectorAssembler(inputCols = ['Open', 'VolBTC', 'VolCurrency'], 
                            outputCol = "features")

In [None]:
# Cria o dataframe do vetor de atributos
df_assembled = assembler.transform(df_enem)

In [None]:
# Visualiza os dados
df_assembled.show(10, truncate = False)

## Normalização

In [None]:
# Divisão em dados de treino e teste
dados_treino, dados_teste = df_assembled.randomSplit([.7,.3], seed = rnd_seed)

In [None]:
type(dados_treino)

In [None]:
# Cria o scaler
scaler = MinMaxScaler(inputCol = "features", outputCol = "scaled_features")

In [None]:
# Fit nos dados de treino
scalerModel = scaler.fit(dados_treino)

In [None]:
# Fit e transform nos dados de treino
dados_treino_scaled = scalerModel.transform(dados_treino)

In [None]:
# Transform nos dados de teste
dados_teste_scaled = scalerModel.transform(dados_teste)

In [None]:
dados_treino_scaled.select("features", "scaled_features").show(10, truncate = False)

In [None]:
dados_treino_scaled.columns

## Machine Learning

> Versão 1 do Modelo (Benchmark)

In [None]:
# Cria o modelo de regressão
modelo_lr_v1 = (LinearRegression(featuresCol = 'scaled_features', 
                                 labelCol = "Weighted_Price", 
                                 predictionCol = 'Predicted_price', 
                                 maxIter = 100, 
                                 regParam = 0.3, 
                                 elasticNetParam = 0.8, 
                                 standardization = False))

In [None]:
# Treina o modelo
modelo_v1 = modelo_lr_v1.fit(dados_treino_scaled)

Se tiver mensagem de WARN, isso indica que o Spark não encontrou a biblioteca de otimização de álgebra linear (que precisa ser instalada, mas não é requerida para este projeto). Mais detalhes aqui:

https://spark.apache.org/docs/latest/ml-linalg-guide.html

In [None]:
# Salva o modelo em disco
modelo_v1.write().overwrite().save("modelos/modelo_v1")

## Avaliação do Modelo

In [None]:
# Previsões com dados de teste
previsoes_v1 = modelo_v1.transform(dados_teste_scaled)

In [None]:
# Seleciona as colunas
pred_data_v1 = previsoes_v1.select("Predicted_price", "Weighted_Price").show(10)

In [None]:
# Mean Absolute Error
print("Mean Absolute Error (MAE) nos dados de teste: {0}".format(modelo_v1.summary.meanAbsoluteError))

In [None]:
# Cria um avaliador para o modelo de regressão
evaluator = RegressionEvaluator(labelCol = "Weighted_Price", 
                                predictionCol = "Predicted_price", 
                                metricName = "rmse")

In [None]:
# Aplica o avaliador
rmse_v1 = evaluator.evaluate(previsoes_v1)
print("Root Mean Squared Error (RMSE) nos dados de teste = %g" % rmse_v1)

In [None]:
# Extrai as previsões
pred_results_v1 = modelo_v1.evaluate(dados_teste_scaled)

In [None]:
# Valores reais de Y sendo convertidos para o formato do Pandas
Y = pred_results_v1.predictions.select('Weighted_Price').toPandas()

In [None]:
# Valores previstos de Y sendo convertidos para o formato do Pandas
_Y = pred_results_v1.predictions.select("Predicted_price").toPandas()

In [None]:
# Distribuição dos valores reais x valores previstos
sns.set_style("dark")
ax1 = sns.displot(Y, color = "r", label = "Valores Reais")
sns.displot(_Y, color = "b", label = "Valores Previstos")

In [None]:
# Plot dos valores reais x valores previstos
plt.figure(figsize = (12,7))
plt.plot(Y, color = 'green', marker = '*', linestyle = 'dashed', label = 'Predicted Price')
plt.plot(_Y, color = 'red', label = 'Weighted Price')
plt.title('Resultado do Modelo')
plt.xlabel('Valor Real')
plt.ylabel('Valor Previsto')
plt.legend()

> Versão 2 do Modelo (Otimização de Hiperparâmetros)

In [None]:
# Cria o modelo
modelo_lr_v2 = (LinearRegression(featuresCol = 'scaled_features', 
                                 labelCol = "Weighted_Price", 
                                 predictionCol = 'Predicted_price'))

In [None]:
# Cria um grid para otimização de hiperparâmetros
grid = ParamGridBuilder().addGrid(modelo_lr_v2.maxIter, [50, 100]).build()

In [None]:
# Cria o avaliador (será usado na validação cruzada)
evaluator = RegressionEvaluator(labelCol = "Weighted_Price", 
                                predictionCol = "Predicted_price", 
                                metricName = "rmse")

In [None]:
# Cria o CrossValidator
cv = CrossValidator(estimator = modelo_lr_v2, estimatorParamMaps = grid, evaluator = evaluator, parallelism = 2)

In [None]:
# Treina o CrossValidator
cvModel = cv.fit(dados_treino_scaled)

In [None]:
# Extrai o melhor modelo do CrossValidator
modelo_v2 = cvModel.bestModel

In [None]:
# Salva o modelo em disco
modelo_v2.write().overwrite().save("modelos/modelo_v2")

## Avaliação do Modelo

In [None]:
# Previsões com dados de teste
previsoes_v2 = modelo_v2.transform(dados_teste_scaled)

In [None]:
# Seleciona as colunas
pred_data_v2 = previsoes_v2.select("Predicted_price", "Weighted_Price").show(10)

In [None]:
# Mean Absolute Error
print("MAE: {0}".format(modelo_v2.summary.meanAbsoluteError))

In [None]:
evaluator = RegressionEvaluator(labelCol = "Weighted_Price", 
                                predictionCol = "Predicted_price", 
                                metricName = "rmse")

In [None]:
# Aplica o avaliador
rmse_v2 = evaluator.evaluate(previsoes_v2)
print("Root Mean Squared Error (RMSE) nos dados de teste = %g" % rmse_v2)

In [None]:
# Plot dos valores reais x valores previstos

# Extrai as previsões
pred_results_v2 = modelo_v2.evaluate(dados_teste_scaled)

# Valores reais de Y sendo convertidos para o formato do Pandas
Y = pred_results_v2.predictions.select('Weighted_Price').toPandas()

# Valores previstos de Y sendo convertidos para o formato do Pandas
_Y = pred_results_v2.predictions.select("Predicted_price").toPandas()

# Plot
sns.set_style("dark")
ax1 = sns.displot(Y, color = "r", label = "Valores Reais")
sns.displot(_Y, color = "b", label = "Valores Previstos")

In [None]:
# Plot dos valores reais x valores previstos
plt.figure(figsize = (12,7))
plt.plot(Y, color = 'green', marker = '*', linestyle = 'dashed', label = 'Predicted Price')
plt.plot(_Y, color = 'red', label = 'Weighted Price')
plt.title('Resultado do Modelo')
plt.xlabel('Valor Real')
plt.ylabel('Valor Previsto')
plt.legend()

As mensagens de WARN no treinamento do modelo_v2 indicam que o modelo parece instável e talvez esteja com overfitting. Usaremos o modelo_v1.

## Previsões em Tempo Real

In [None]:
# Novos dados
novos_dados = [[20546.29, 3422.57, 72403082.02], [21620.85, 3271.14, 71319207.5]]

In [None]:
# Prepara o dataframe do Pandas
df_novos_dados = pd.DataFrame(novos_dados, columns = ['Open', 'VolBTC', 'VolCurrency'])

In [None]:
# Visualiza
df_novos_dados

In [None]:
# Converte o dataframe do Pandas para dataframe do Spark
df_novos_dados_spark = spark_session.createDataFrame(df_novos_dados) 

In [None]:
# Schema
df_novos_dados_spark.printSchema()

In [None]:
# Visualiza
df_novos_dados_spark.show()

In [None]:
# Cria o dataframe do vetor de atributos
df_assembled = assembler.transform(df_novos_dados_spark)

In [None]:
# Visualiza os dados
df_assembled.show()

In [None]:
# Normaliza os dados
df_assembled_scaled = scalerModel.transform(df_assembled)

In [None]:
# Previsões com os novos dados
previsoes = modelo_v1.transform(df_assembled_scaled)

In [None]:
# Imprime as previsões
pred_data = previsoes.select("Predicted_price").show()

In [None]:
# Encerra a sessão Spark
spark_session.stop()

# Fim