<!-- Projeto Desenvolvido na Data Science Academy - www.datascienceacademy.com.br -->
# <font color='blue'>Data Science Academy</font>
## <font color='blue'>PySpark e Apache Kafka Para Processamento de Dados em Batch e Streaming</font>
## <font color='blue'>Projeto 3</font>
### <font color='blue'>Pipeline de Limpeza e Transformação Para Aplicações de IA com PySpark SQL</font>

## Pacotes Python Usados no Projeto

In [1]:
# Imports
import os
import pyspark
import pandas as pd
import numpy as np
import pyspark.sql.functions as F
import pyspark.ml.evaluation as evals
import pyspark.ml.tuning as tune
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml.feature import  VectorAssembler
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.functions import round, desc

In [2]:
# Versões dos pacotes usados neste jupyter notebook
%reload_ext watermark
%watermark -a "Data Science Academy"

Author: Data Science Academy



## Criando a Sessão Spark e Definindo o Nível de Log

In [3]:
# Cria a sessão Spark com YARN como gerenciador de recursos e especifica parâmetros do cluster
spark = SparkSession.builder \
    .appName('Projeto3-Exp') \
    .master('yarn') \
    .config('spark.submit.deployMode', 'client') \
    .config('spark.driver.memory', '4g') \
    .config('spark.executor.memory', '1g') \
    .config('spark.executor.cores', '2') \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/21 21:31:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/10/21 21:31:07 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


In [4]:
# Define o nível de log
spark.sparkContext.setLogLevel("ERROR")

<!-- Projeto Desenvolvido na Data Science Academy - www.datascienceacademy.com.br -->
## Carregando os Datasets a Partir do HDFS

In [None]:
# Carrega o arquivo 1
df_dsa_aeroportos = spark.read.csv("/opt/spark/data/dataset1.csv", header = True)

[Stage 0:>                                                          (0 + 0) / 1]

In [None]:
type(df_dsa_aeroportos)

In [None]:
df_dsa_aeroportos.show(10)

In [None]:
# Carrega o arquivo 2
df_dsa_voos = spark.read.csv("/opt/spark/data/dataset2.csv", header = True)

In [None]:
df_dsa_voos.show(10)

In [None]:
# Carrega o arquivo 3
df_dsa_aeronaves = spark.read.csv("/opt/spark/data/dataset3.csv", header = True)

In [None]:
df_dsa_aeronaves.show(10)

Vamos converter esses dados para o formato:

- Dados de entrada --> ['month', 'air_time', 'carr_fact', 'dest_fact', 'plane_age'] como o vetor features.
- Dados de saída --> ['is_late'] com o nome label.

E então usaremos os dados nesse formato para treinar e avaliar dois modelos de Machine Learning. Escolheremos o melhor modelo e então criaremos o job de automação do processo de treinamento no cluster Spark.

## Exploração e Limpeza dos Dados

In [None]:
# Cria tabela temporária
df_dsa_voos.createOrReplaceTempView('voos')

Se você deseja executar consultas SQL diretamente sobre os dados, criar uma tabela temporária permite usar a sintaxe SQL para filtrar, agrupar e manipular os dados de forma que pode ser mais intuitiva ou mais fácil de expressar do que utilizando as APIs do DataFrame.

In [None]:
# Lista as tabelas
spark.catalog.listTables()

In [None]:
# Consulta SQL
query = """
SELECT 
    carrier AS companhia_aerea,
    COUNT(*) AS total_voos,
    ROUND(AVG(dep_delay), 2) AS media_atraso_partida,
    ROUND(AVG(arr_delay), 2) AS media_atraso_chegada,
    MAX(dep_delay) AS maior_atraso_partida,
    MAX(arr_delay) AS maior_atraso_chegada
FROM 
    voos
WHERE 
    dep_delay > 0 OR arr_delay > 0
GROUP BY 
    carrier
ORDER BY 
    media_atraso_chegada DESC
"""

In [None]:
# Executa a consulta SQL e armazena o resultado em um DataFrame
df_result = spark.sql(query)

In [None]:
# Mostra o resultado
df_result.show()

Criar um DataFrame diretamente a partir de outro DataFrame é mais direto e consome menos recursos do que criar uma tabela temporária intermediária. Se seu uso é apenas para operações simples ou manipulações diretas, é mais eficiente trabalhar com o DataFrame diretamente.

In [None]:
# Cria um dataframe a partir da tabela temporária
df_voos = spark.table('voos')

In [None]:
df_voos.show(10)

In [None]:
# Cria a coluna de duração dos voos em horas (tarefa de engenharia de atributos)
df_dsa_voos = df_dsa_voos.withColumn('duration_hrs', round(df_dsa_voos.air_time / 60, 2))

In [None]:
df_dsa_voos.show(10)

In [None]:
# Filtro para visualizar os voos mais longos
df_voos_longos_1 = df_dsa_voos.filter('distance > 1000')

In [None]:
df_voos_longos_1.show(10)

In [None]:
# Ordena o DataFrame pela coluna 'duration_hrs' em ordem decrescente
df_voos_longos_1_sorted = df_voos_longos_1.orderBy(desc('duration_hrs'))

In [None]:
# Exibe o resultado ordenado
df_voos_longos_1_sorted.show(10)

In [None]:
# Mesma regra anterior, com sintaxe diferente
# Filtra os voos com distância maior que 1000 e ordena pela coluna 'distance' em ordem descendente
df_voos_longos_2 = df_dsa_voos.filter(df_dsa_voos.distance > 1000).orderBy(desc('duration_hrs'))

In [None]:
df_voos_longos_2.show(10)

In [None]:
# Selecionando 3 colunas
selected_1 = df_dsa_voos.select('tailnum', 'origin', 'dest')

In [None]:
# Select de 3 colunas com outra sintaxe
temp = df_dsa_voos.select(df_dsa_voos.origin, df_dsa_voos.dest, df_dsa_voos.carrier)

In [None]:
# Criando 2 filtros
FilterA = df_dsa_voos.origin == 'SEA'
FilterB = df_dsa_voos.dest == 'PDX'

In [None]:
# Aplicando a função Filter com os filtros criados
selected_2 = temp.filter(FilterA).filter(FilterB)

In [None]:
selected_2.show()

In [None]:
# Calculando a velocidade média dos voos
avg_speed = (round(df_dsa_voos.distance / (df_dsa_voos.air_time / 60), 2)).alias("avg_speed")

In [None]:
df_dsa_voos.show(5)

In [None]:
# Adicionando a nova variável ao select
speed_1 = df_dsa_voos.select('origin', 'dest', 'tailnum', avg_speed)

In [None]:
speed_1.show()

In [None]:
# Fazendo o cálculo direto no select
speed_2 = df_dsa_voos.selectExpr('origin', 'dest', 'tailnum', 'round(distance/(air_time/60), 2) as avg_speed')

In [None]:
speed_2.show()

In [None]:
# Resumo de 2 variáveis
df_dsa_voos.describe('air_time', 'distance').show()

In [None]:
# Mostra o tipo de dados de cada coluna
df_dsa_voos.dtypes

In [None]:
# Ajustando o tipo de dado de duas colunas
df_dsa_voos = df_dsa_voos.withColumn('distance', df_dsa_voos.distance.cast('float'))
df_dsa_voos = df_dsa_voos.withColumn('air_time', df_dsa_voos.air_time.cast('float'))

In [None]:
# Mostra o tipo de dados de cada coluna
df_dsa_voos.dtypes

In [None]:
# Resumo de 2 variáveis
df_dsa_voos.describe('air_time', 'distance').show()

In [None]:
# Agrupamento por aeronave
by_plane = df_dsa_voos.groupBy('tailnum')

In [None]:
# Contagem
by_plane.count().show()

In [None]:
# Agrupamento por origem do voo
by_origin = df_dsa_voos.groupBy('origin')

In [None]:
# Média de tempo no ar por origem do voo
by_origin.avg('air_time').show()

In [None]:
# Resumo 
df_dsa_voos.describe('dep_delay').show()

In [None]:
# Ajustando o tipo de dado
df_dsa_voos = df_dsa_voos.withColumn('dep_delay', df_dsa_voos.dep_delay.cast('float'))

In [None]:
# Resumo 
df_dsa_voos.describe('dep_delay').show()

In [None]:
# Agrupamento por mês e destino do voo
by_month_dest = df_dsa_voos.groupBy('month', 'dest')

In [None]:
# Calculando a média
by_month_dest.avg('dep_delay').show()

In [None]:
df_dsa_aeroportos.show()

In [None]:
# Ajusta o título da coluna
df_dsa_aeroportos = df_dsa_aeroportos.withColumnRenamed('faa', 'dest')

In [None]:
df_dsa_aeronaves.show(5)

In [None]:
# Ajusta o título da coluna
df_dsa_aeronaves = df_dsa_aeronaves.withColumnRenamed('year', 'plane_year')

<!-- Projeto Desenvolvido na Data Science Academy - www.datascienceacademy.com.br -->
## Concatenando os Datasets e Preparando o Dataset Final

In [None]:
df_dsa_aeroportos.show(3)

In [None]:
df_dsa_voos.show(3)

In [None]:
df_dsa_aeronaves.show(3)

In [None]:
# Concatena 2 datasets
df_dsa_voos_aeroportos = df_dsa_voos.join(df_dsa_aeroportos, on='dest', how='leftouter')

**on='dest':** Este parâmetro especifica que o join deve ser feito com base na coluna dest (destino) presente nos dois DataFrames. Neste caso, está associando os dados de voos (df_dsa_voos) com os dados dos aeroportos (df_dsa_aeroportos) com base na coluna dest, que representa o aeroporto de destino dos voos.

**how='leftouter':** Especifica o tipo de join. O 'leftouter' (ou LEFT OUTER JOIN) significa que todos os registros do DataFrame à esquerda (df_dsa_voos) serão mantidos, e os registros correspondentes do DataFrame à direita (df_dsa_aeroportos) serão adicionados.
Se não houver correspondência no DataFrame da direita (df_dsa_aeroportos), os valores das colunas do DataFrame direito serão null.

In [None]:
df_dsa_voos_aeroportos.show(5)

In [None]:
# Concatena 2 datasets
df_dsa_final = df_dsa_voos_aeroportos.join(df_dsa_aeronaves, on='tailnum', how='leftouter')

In [None]:
df_dsa_final.show(10)

In [None]:
df_dsa_final.dtypes

In [None]:
# Ajusta o tipo de dado
df_dsa_final = df_dsa_final.withColumn('month', df_dsa_final.month.cast('integer'))
df_dsa_final = df_dsa_final.withColumn('air_time' , df_dsa_final.air_time.cast('integer'))
df_dsa_final = df_dsa_final.withColumn('arr_delay', df_dsa_final.arr_delay.cast('integer'))
df_dsa_final = df_dsa_final.withColumn('plane_year', df_dsa_final.plane_year.cast('integer'))

## Engenharia de Atributos

In [None]:
df_dsa_final.describe('month', 'air_time', 'arr_delay', 'plane_year').show()

In [None]:
# Cria uma variável com a idade do avião
df_dsa_final = df_dsa_final.withColumn('plane_age', df_dsa_final.year - df_dsa_final.plane_year)

In [None]:
df_dsa_final.select('month', 'air_time', 'arr_delay', 'plane_age').show(10)

In [None]:
# Cria a variável "is_late" somente para os casos onde o atraso na chegada foi maior do que zero
df_dsa_final = df_dsa_final.withColumn('is_late', df_dsa_final.arr_delay > 0)

In [None]:
df_dsa_final.select('month', 'air_time', 'arr_delay', 'plane_age', 'is_late').show(10)

In [None]:
# A variável alvo (label) será "is_late", ou seja, se o voo vai atrasar ou não
# Observe que o nome da coluna precisa ser "label" pois isso que o Spark espera como nome da coluna alvo
df_dsa_final = df_dsa_final.withColumn('label', df_dsa_final.is_late.cast('integer'))

In [None]:
df_dsa_final.select('month', 'air_time', 'arr_delay', 'plane_age', 'is_late', 'label').show(10)

## Pré-Processamento com String Indexer e One Hot Encoder

In [None]:
df_dsa_final.select('carrier', 'dest').show(10)

In [None]:
# Cria os indexadores StringIndexer
carr_indexer = StringIndexer(inputCol='carrier', outputCol='carrier_index')
dest_indexer = StringIndexer(inputCol='dest', outputCol='dest_index')

In [None]:
# Cria os codificadores OneHotEncoder
carr_encoder = OneHotEncoder(inputCol='carrier_index', outputCol='carr_fact')
dest_encoder = OneHotEncoder(inputCol='dest_index', outputCol='dest_fact')

In [None]:
# Cria o vector assembler apenas fazendo skip para qualquer registro inválido
# As variáveis de entrada estarão no vetor chamado features (tem que ser esse nome, o que é requerido pelo Spark)
vec_assembler = VectorAssembler(inputCols = ['month', 'air_time', 'carr_fact', 'dest_fact', 'plane_age'],
                                outputCol = 'features',
                                handleInvalid = "skip")

- Dados de entrada --> ['month', 'air_time', 'carr_fact', 'dest_fact', 'plane_age'] como o vetor features.
- Dados de saída --> ['is_late'] com o nome label.

## Criando o Pipeline de Pré-Processamento

In [None]:
# Cria o pipeline de transformação e pré-processamento
dsa_pipe = Pipeline(stages = [dest_indexer, dest_encoder, carr_indexer, carr_encoder, vec_assembler])

In [None]:
# Treina e aplica o pipeline
piped_data = dsa_pipe.fit(df_dsa_final).transform(df_dsa_final)

In [None]:
piped_data.show(5)

In [None]:
# Dados de entrada e saída
piped_data.select("features", "label").show(truncate = False)

In [None]:
# Divide os dados em treino e teste com proporção 70/30
dados_treino, dados_teste = piped_data.randomSplit([.7, .3])

## Ajustando o Número de Partições

Se as partições dos dados forem muito grandes, o Spark pode ter dificuldades para distribuí-las eficientemente. Ajustar o tamanho das partições pode ajudar no processamento. Você pode utilizar o método repartition para dividir os dados em mais ou menos partições.

In [None]:
# Escolha um número de partições adequado ao tamanho do seu cluster e dos seus dados
dados_treino = dados_treino.repartition(10)  

## Pipeline de Treinamento do Modelo de IA com PySpark em Ambiente Distribuído

## Versão 1 do Modelo

In [None]:
%%time

# Inicializa o modelo RandomForest
modelo_dsa_rf = RandomForestClassifier()

# Avaliador para medir a métrica "areaUnderROC"
evaluator = BinaryClassificationEvaluator(metricName = 'areaUnderROC')

# Cria o grid de parâmetros
grid = ParamGridBuilder()

# Adiciona os hiperparâmetros ao grid
grid = grid.addGrid(modelo_dsa_rf.numTrees, [10, 50, 100])
grid = grid.addGrid(modelo_dsa_rf.maxDepth, [5, 10, 20])

# Constrói o grid
grid = grid.build()

# Cria o CrossValidator
cv = CrossValidator(estimator = modelo_dsa_rf,
                    estimatorParamMaps = grid,
                    evaluator = evaluator)

# Treina os modelos com validação cruzada
modelos = cv.fit(dados_treino)

# Extrai o melhor modelo
best_rf = modelos.bestModel

# Usa o modelo para prever o conjunto de teste
test_results_rf = best_rf.transform(dados_teste)

# Avalia as previsões
print(evaluator.evaluate(test_results_rf))

## Versão 2 do Modelo

In [None]:
%%time

# Incializa o modelo de Regressão Logística
modelo_dsa_rl = LogisticRegression()

# Avaliador para medir a métrica "areaUnderROC"
evaluator = evals.BinaryClassificationEvaluator(metricName = 'areaUnderROC')

# Cria o grid de parâmetros
grid = tune.ParamGridBuilder()

# Adiciona os hiperparâmetros ao grid
grid = grid.addGrid(modelo_dsa_rl.regParam, np.arange(0, .1, .01))
grid = grid.addGrid(modelo_dsa_rl.elasticNetParam, [0,1])

# Constrói o grid
grid = grid.build()

# Cria o CrossValidator
cv = tune.CrossValidator(estimator = modelo_dsa_rl,
                         estimatorParamMaps = grid,
                         evaluator = evaluator)

# Treina os modelos com validação cruzada
modelos = cv.fit(dados_treino)

# Extrai o melhor modelo
best_lr = modelos.bestModel

# Usa o modelo para prever o conjunto de teste
test_results_rl = best_lr.transform(dados_teste)

# Avalia as previsões
print(evaluator.evaluate(test_results_rl))

## Salvando Dados e Modelo em Formato Parquet

In [None]:
# Salva o DataFrame de treino em formato Parquet
dados_treino.write.mode('overwrite').parquet('/opt/spark/data/dados_treino.parquet')

In [None]:
# Salva o DataFrame de teste em formato Parquet
dados_teste.write.mode('overwrite').parquet('/opt/spark/data/dados_teste.parquet')

In [None]:
# Salva o melhor modelo no disco
best_lr.write().overwrite().save('/opt/spark/data/dsa_melhor_modelo_lr')

In [None]:
# Verifica se os dados foram salvos no HDFS
!hdfs dfs -ls /opt/spark/data/ | awk '{print $1, $2, $3, $4, $8}'

In [None]:
%reload_ext watermark
%watermark -a "Data Science Academy"

# Fim