<a href="https://colab.research.google.com/github/Erasnilson/Trat-dados-via-PySpark/blob/main/Trat_dados_via_PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Tratamento de dados via PySpark
- Nesse exemplo utilizaremos uma base de dados da Agência Nacional de Aviação Civil (ANAC).

- Fluxo de voos no mês 06-2024



## Java e Spark

O Apache Spark é uma plataforma de processamento de dados em larga escala, projetada para ser rápida e eficiente. Ele é amplamente utilizado em várias áreas, permite a análise de grandes volumes de dados em paralelo. As tarefas podem ser distribuídas por vários nós de uma rede, o que acelera o processamento.


In [None]:
#download do Java via colab
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
#download do spark via colab
!wget -q https://dlcdn.apache.org/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz

In [None]:
#descompactando o spark
!tar xf spark-3.5.1-bin-hadoop3.tgz

### Instalando o findspark

A biblioteca findspark é utilizada para realizar a configuração do pyspark.
O findspark permite que você especifique o caminho de instalação do Spark manualmente dentro do seu código, sem precisar configurar variáveis de ambiente do sistema.

In [None]:
#instalando o findspark
!pip install -q findspark

In [None]:
#importando as bilbiotecas
import findspark
import os

In [None]:
#configurando a variável de ambiente do Java
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.1-bin-hadoop3"

### Iniciando uma sessão do spark em modo local

In [None]:
!pip install pyspark==3.4.0

Collecting pyspark==3.4.0
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317122 sha256=7e428f702b00684b0ccd6b809f36aeb4da9996d2e4092e17a793e5f00d4978f4
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [None]:
#informando a pasta raíz do spark
findspark.init("spark-3.5.1-bin-hadoop3")

In [None]:
#iniciando uma sessão do Spark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()


In [None]:
#visualizando a versão do Spark
spark

In [None]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd

In [None]:
#iniciando o objeto SQLContext
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)



## Dataset [Sirios-ANAC](https://siros.anac.gov.br/siros/registros/diversos/vra/2024/)

## Base de dados da Agência Nacional de Aviação Civil (ANAC).

 - Fluxo de voos no mês 06-2024
 - Aerpoporto de Guarulhos



In [None]:
!wget https://siros.anac.gov.br/siros/registros/diversos/vra/2024/VRA_2024_06.csv

--2024-08-11 23:32:47--  https://siros.anac.gov.br/siros/registros/diversos/vra/2024/VRA_2024_06.csv
Resolving siros.anac.gov.br (siros.anac.gov.br)... 189.84.138.178
Connecting to siros.anac.gov.br (siros.anac.gov.br)|189.84.138.178|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 22207071 (21M) [application/octet-stream]
Saving to: ‘VRA_2024_06.csv’


2024-08-11 23:33:13 (896 KB/s) - ‘VRA_2024_06.csv’ saved [22207071/22207071]



In [None]:
#realizando o download da pasta
#!wget -r -np -nH --cut-dirs=4 -A csv https://siros.anac.gov.br/siros/registros/diversos/vra/2023/
#descompactando o arquivo de dados em zip
#!unzip 2019.zip

In [None]:
data_voo = sqlContext.read.load("VRA_2024_06.csv", #pasta "2023/*"
                     format='csv',
                     header='true',
                     #lineSep='',#\r\n
                     inferSchema='true',
                     delimiter=";",
                     multiLine = 'true',
                     dateFormat = "dd/MM/yyyy",
                     timestampFormat = "dd/MM/yyyy HH:mm")

In [None]:
# tratando a cofiguração data-hora dentro de cada pasta (por ano)
data_voo.show(5)
data_voo.printSchema()

+------------------------+--------------------+----------+---------+-----------------+------------------+------------------+---------------------------+--------------------------+-------------------+-------------------+----------------------------+---------------------------+-------------------+-------------------+------------+-------------+----------+----------------+----------------+
|Sigla ICAO Empresa Aérea|       Empresa Aérea|Número Voo|Código DI|Código Tipo Linha|Modelo Equipamento|Número de Assentos|Sigla ICAO Aeroporto Origem|Descrição Aeroporto Origem|   Partida Prevista|       Partida Real|Sigla ICAO Aeroporto Destino|Descrição Aeroporto Destino|   Chegada Prevista|       Chegada Real|Situação Voo|Justificativa|Referência|Situação Partida|Situação Chegada|
+------------------------+--------------------+----------+---------+-----------------+------------------+------------------+---------------------------+--------------------------+-------------------+-------------------+---


# A variável "referência"  foi classificada como string, deve ser classificada como data.

In [None]:
# Tornando a variável referencia no mesmo padrão dos outros anos
from pyspark.sql.functions import *
#data_voov2=data_voo.withColumn('Referência', substring('Referência', 1,10))

# deixar a variavel referencia no mesmo padrao das demais
data_voov2 = data_voo.withColumn('Referência', to_date(data_voo['Referência'], 'yyyy-MM-dd'))

In [None]:
# tratando a cofiguração data-hora dentro de cada pasta (por ano)
data_voov2.show(5)
data_voov2.printSchema()

+------------------------+--------------------+----------+---------+-----------------+------------------+------------------+---------------------------+--------------------------+-------------------+-------------------+----------------------------+---------------------------+-------------------+-------------------+------------+-------------+----------+----------------+----------------+
|Sigla ICAO Empresa Aérea|       Empresa Aérea|Número Voo|Código DI|Código Tipo Linha|Modelo Equipamento|Número de Assentos|Sigla ICAO Aeroporto Origem|Descrição Aeroporto Origem|   Partida Prevista|       Partida Real|Sigla ICAO Aeroporto Destino|Descrição Aeroporto Destino|   Chegada Prevista|       Chegada Real|Situação Voo|Justificativa|Referência|Situação Partida|Situação Chegada|
+------------------------+--------------------+----------+---------+-----------------+------------------+------------------+---------------------------+--------------------------+-------------------+-------------------+---

Uma das estratégias ofertadas pelo Spark, com foco em aumentar o desempenho da execução de operações sobre os dados é o método cache(), que permite guardar em memória os resultados dos dados obtidos por operações, para serem utilizados em consultas posteriores.

In [None]:
spark

In [None]:
#criando um cache dos dados
df_train = data_voov2.cache()

## Organizando a base de dados

In [None]:
# deletar coluna que possui null
# eliminando colunas que sem "data de partidas"/"chegada do voo" (Referência = data prevista sem horas)
df_train = df_train.dropna(subset=('Partida Real','Chegada Real','Referência'))
df_train.show(5)

+------------------------+--------------------+----------+---------+-----------------+------------------+------------------+---------------------------+--------------------------+-------------------+-------------------+----------------------------+---------------------------+-------------------+-------------------+------------+-------------+----------+----------------+----------------+
|Sigla ICAO Empresa Aérea|       Empresa Aérea|Número Voo|Código DI|Código Tipo Linha|Modelo Equipamento|Número de Assentos|Sigla ICAO Aeroporto Origem|Descrição Aeroporto Origem|   Partida Prevista|       Partida Real|Sigla ICAO Aeroporto Destino|Descrição Aeroporto Destino|   Chegada Prevista|       Chegada Real|Situação Voo|Justificativa|Referência|Situação Partida|Situação Chegada|
+------------------------+--------------------+----------+---------+-----------------+------------------+------------------+---------------------------+--------------------------+-------------------+-------------------+---

# Criando novas variáveis a partir da data de Referência

In [None]:
# subdividir uma variavel por carachter
from pyspark.sql.types import IntegerType

df_train=df_train.withColumn('year', substring('Referência', 1,4).cast(IntegerType()))\
.withColumn('month', substring('Referência', 6,2).cast(IntegerType()))\
.withColumn('day', substring('Referência', 9,2).cast(IntegerType()))

df_train.show(5)
df_train.printSchema()
# 4 digitos (incluíndo 1) = ano  ex: 2024-06-01
# 2 digitos (incluíndo 6) = mês
# 2 digitos (incluíndo 9) = mês

+------------------------+--------------------+----------+---------+-----------------+------------------+------------------+---------------------------+--------------------------+-------------------+-------------------+----------------------------+---------------------------+-------------------+-------------------+------------+-------------+----------+----------------+----------------+----+-----+---+
|Sigla ICAO Empresa Aérea|       Empresa Aérea|Número Voo|Código DI|Código Tipo Linha|Modelo Equipamento|Número de Assentos|Sigla ICAO Aeroporto Origem|Descrição Aeroporto Origem|   Partida Prevista|       Partida Real|Sigla ICAO Aeroporto Destino|Descrição Aeroporto Destino|   Chegada Prevista|       Chegada Real|Situação Voo|Justificativa|Referência|Situação Partida|Situação Chegada|year|month|day|
+------------------------+--------------------+----------+---------+-----------------+------------------+------------------+---------------------------+--------------------------+-------------

In [None]:
# Filtrar a base de dados que será utilizada (considerando uma pré-análise)
df_train.filter( (col('Situação Voo') == "REALIZADO") & (col('Sigla ICAO Aeroporto Origem') == "SBGR") & (col('year')==2024) ).show(5)
# SBGR - Guarulhos

+------------------------+--------------------+----------+---------+-----------------+------------------+------------------+---------------------------+--------------------------+-------------------+-------------------+----------------------------+---------------------------+-------------------+-------------------+------------+-------------+----------+----------------+----------------+----+-----+---+
|Sigla ICAO Empresa Aérea|       Empresa Aérea|Número Voo|Código DI|Código Tipo Linha|Modelo Equipamento|Número de Assentos|Sigla ICAO Aeroporto Origem|Descrição Aeroporto Origem|   Partida Prevista|       Partida Real|Sigla ICAO Aeroporto Destino|Descrição Aeroporto Destino|   Chegada Prevista|       Chegada Real|Situação Voo|Justificativa|Referência|Situação Partida|Situação Chegada|year|month|day|
+------------------------+--------------------+----------+---------+-----------------+------------------+------------------+---------------------------+--------------------------+-------------

# Filtros em pyspark/pandas


In [None]:
# Demais filtros
#df_train.filter( (col('Situação Voo') == "REALIZADO") & (col('Sigla ICAO Aeroporto Origem') == "SBGR") & (col('year')==2024) ).toPandas().head(5)
#df_train.groupBy('year','month').agg({"Referência": "count"}).sort('year','month').toPandas()
#df_train.groupBy('Situação Chegada',"Empresa Aérea","DiffInHoursR").count().show()
#df_train.groupBy('Situação Chegada',"Empresa Aérea","Data_Ref").agg({"horas_voo": "avg"}).sort("Empresa Aérea").show()
#'Sigla ICAO Aeroporto Destino'Referência .sort('year','month')
#.sort("Situação Voo", "Número de Assentos")
#.agg({"salary": "avg", "age": "max"}).show()

## Groupby

In [None]:
from pandas.core.indexes.api import safe_sort_index
from numpy import select
from pandas.core.groupby import groupby

# Voos por dia em Guarulhos
df_train.groupBy("Referência").agg({"Referência": "count"}).sort('Referência').show(5)

# Renomear as variáveis
from pyspark.sql.functions import *
#df_train.withColumnRenamed("Referência","Data").withColumnRenamed("count(Referência)","n")

+----------+-----------------+
|Referência|count(Referência)|
+----------+-----------------+
|2024-06-01|             1886|
|2024-06-02|             2365|
|2024-06-03|             2570|
|2024-06-04|             2563|
|2024-06-05|             2555|
+----------+-----------------+
only showing top 5 rows



In [None]:
# Converter o DataFrame PySpark para um DataFrame Pandas
pd_df = df_train.toPandas()

In [None]:
!pip install unidecode
import unidecode

pd_df.columns = (
    pd_df.columns
    .map(unidecode.unidecode)  # Remove a acentuação de todas as letras
    .str.replace('[^A-Za-z0-9_]', '', regex=True)  # Remove caracteres especiais, mantendo letras e números
)
pd_df.columns

Collecting unidecode
  Downloading Unidecode-1.3.8-py3-none-any.whl.metadata (13 kB)
Downloading Unidecode-1.3.8-py3-none-any.whl (235 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/235.5 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m235.5/235.5 kB[0m [31m15.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: unidecode
Successfully installed unidecode-1.3.8


Index(['SiglaICAOEmpresaAerea', 'EmpresaAerea', 'NumeroVoo', 'CodigoDI',
       'CodigoTipoLinha', 'ModeloEquipamento', 'NumerodeAssentos',
       'SiglaICAOAeroportoOrigem', 'DescricaoAeroportoOrigem',
       'PartidaPrevista', 'PartidaReal', 'SiglaICAOAeroportoDestino',
       'DescricaoAeroportoDestino', 'ChegadaPrevista', 'ChegadaReal',
       'SituacaoVoo', 'Justificativa', 'Referencia', 'SituacaoPartida',
       'SituacaoChegada', 'year', 'month', 'day'],
      dtype='object')

In [None]:


# Filtrar os dados com as condições desejadas e mostrar as 5 primeiras linhas
filtered_df = pd_df[(pd_df['SituacaoVoo'] == "REALIZADO") &
                    (pd_df['SiglaICAOAeroportoOrigem'] == "SBGR") &
                    (pd_df['year'] == 2024)]



In [None]:
#get info about the dataset:
print(filtered_df.info())
print("-"*25)
# to know more about the shape of the dataset:
print("shape of the dataset -->>",np.shape(filtered_df))

<class 'pandas.core.frame.DataFrame'>
Index: 11146 entries, 2 to 75195
Data columns (total 23 columns):
 #   Column                     Non-Null Count  Dtype         
---  ------                     --------------  -----         
 0   SiglaICAOEmpresaAerea      11146 non-null  object        
 1   EmpresaAerea               11146 non-null  object        
 2   NumeroVoo                  11146 non-null  object        
 3   CodigoDI                   11146 non-null  object        
 4   CodigoTipoLinha            11146 non-null  object        
 5   ModeloEquipamento          11146 non-null  object        
 6   NumerodeAssentos           11146 non-null  int32         
 7   SiglaICAOAeroportoOrigem   11146 non-null  object        
 8   DescricaoAeroportoOrigem   11146 non-null  object        
 9   PartidaPrevista            10919 non-null  datetime64[ns]
 10  PartidaReal                11146 non-null  datetime64[ns]
 11  SiglaICAOAeroportoDestino  11146 non-null  object        
 12  Descricao

In [None]:
#check if there null values in data:
filtered_df.isna().mean()


Unnamed: 0,0
SiglaICAOEmpresaAerea,0.0
EmpresaAerea,0.0
NumeroVoo,0.0
CodigoDI,0.0
CodigoTipoLinha,0.0
ModeloEquipamento,0.0
NumerodeAssentos,0.0
SiglaICAOAeroportoOrigem,0.0
DescricaoAeroportoOrigem,0.0
PartidaPrevista,0.020366


# Quantitativo de cada item em cada coluna:

In [None]:
# Especificar as colunas que você deseja iterar
variaveis = ['EmpresaAerea', 'ModeloEquipamento', 'DescricaoAeroportoOrigem']

# Loop para contar itens nas colunas especificadas
for col in variaveis:
    print(f"Counts of items in {col} -->> {filtered_df[col].value_counts()}") #\n
    print("-" * 25)

Counts of items in EmpresaAerea -->> EmpresaAerea
TAM LINHAS AÉREAS S.A.                                       5314
GOL LINHAS AÉREAS S.A. (EX- VRG LINHAS AÉREAS S.A.)          2473
AZUL LINHAS AÉREAS BRASILEIRAS S/A                            745
PASSAREDO TRANSPORTES AÉREOS S.A.                             264
LATAM AIRLINES GROUP (EX - LAN AIRLINES S/A)                  211
SIDERAL LINHAS AÉREAS LTDA.                                   193
COMPAÑIA PANAMEÑA DE AVIACION S.A. (COPA AIRLINES)            160
AMERICAN AIRLINES, INC.                                       119
UNITED AIRLINES, INC                                          117
AEROLINEAS ARGENTINAS S/A                                     112
TAP - TRANSPORTES AÉREOS PORTUGUESES S/A                       92
AEROVIAS DEL CONTINENTE AMERICANO S.A. AVIANCA                 90
DELTA AIR LINES INC.                                           90
TURKISH AIRLINES INC                                           85
QATAR AIRWAYS GROUP       

In [None]:
# Extract all Types of columns from the data :
for col in filtered_df.columns:
    obj_cols=filtered_df.select_dtypes("object").columns

    num_cols=filtered_df.select_dtypes("number").columns

    disc_cols=filtered_df.select_dtypes(include=("int64")).columns

    conts_cols=filtered_df.select_dtypes(include=("float64")).columns

    Nomial_cols=filtered_df.nunique()[filtered_df.nunique()<3]

print(f"object_columns ----->>> {obj_cols}\n Numerical_columns ----->>> {num_cols} \n Discrete_columns ---->>> {disc_cols} \n Continous_cols ----->> {conts_cols} \n Nominal Columns --->> {Nomial_cols}")


object_columns ----->>> Index(['SiglaICAOEmpresaAerea', 'EmpresaAerea', 'NumeroVoo', 'CodigoDI',
       'CodigoTipoLinha', 'ModeloEquipamento', 'SiglaICAOAeroportoOrigem',
       'DescricaoAeroportoOrigem', 'SiglaICAOAeroportoDestino',
       'DescricaoAeroportoDestino', 'SituacaoVoo', 'Justificativa',
       'Referencia', 'SituacaoPartida', 'SituacaoChegada'],
      dtype='object')
 Numerical_columns ----->>> Index(['NumerodeAssentos', 'year', 'month', 'day'], dtype='object') 
 Discrete_columns ---->>> Index([], dtype='object') 
 Continous_cols ----->> Index([], dtype='object') 
 Nominal Columns --->> SiglaICAOAeroportoOrigem    1
DescricaoAeroportoOrigem    1
SituacaoVoo                 1
Justificativa               0
year                        1
month                       1
dtype: int64
