# Data engineering capstone - Eleflow

**Noah Diunkz**

[Github](https://github.com/diunkz)

[LinkedIn](http://linkedin.com/in/diunkz|)

#### Instalando dependências

In [1]:
!pip install pandas requests pendulum pandasql



#### imports necessários

In [2]:
import os
import json
import pendulum
import requests
import pandas as pd
from pandasql import sqldf

## Carregar os dados de VRA
- Normalizar o cabeçalho para snake case
- Salvar estes dados

In [3]:
# Função para percorrer o diretório, pegar todos os arquivos e carregar as informações
def ler_jsons(diretorio):
    arquivos_json = [arquivo for arquivo in os.listdir(diretorio) if arquivo.endswith('.json')]

    for nome_arquivo in arquivos_json:
        caminho_arquivo = os.path.join(diretorio, nome_arquivo)
        with open(caminho_arquivo, 'r', encoding='utf-8-sig') as arquivo:
            yield json.load(arquivo)

In [4]:
# Nome das colunas em snake_case
nomes_colunas_vra = ['icao_empresa_aerea', 'numero_voo', 'codigo_autorizacao', 'codigo_tipo_linha',
                     'icao_aerodromo_origem', 'icao_aerodromo_destino', 'partida_prevista',
                     'partida_real', 'chegada_prevista', 'chegada_real', 'situacao_voo', 'codigo_justificativa']


In [5]:
# Diretório onde estão os arquivos JSON
diretorio_vra = './data_engineer_test/VRA'

In [6]:
# Criar um gerador
vra_generator = ler_jsons(diretorio_vra)

In [7]:
# Criação do dataframe com os dados de vra
vra_df = pd.DataFrame()

In [8]:
# Adicionando informações de cada arquivo JSON no dataframe
for lista_de_dicts in vra_generator:
    vra_df = vra_df._append(lista_de_dicts, ignore_index=True)

In [9]:
# Modificando nome das colunas do dataframe VRA
vra_df.columns = nomes_colunas_vra

In [10]:
# Visualização da tabela vra_df
vra_df

Unnamed: 0,icao_empresa_aerea,numero_voo,codigo_autorizacao,codigo_tipo_linha,icao_aerodromo_origem,icao_aerodromo_destino,partida_prevista,partida_real,chegada_prevista,chegada_real,situacao_voo,codigo_justificativa
0,AZU,4478,0,N,SBRJ,SBGO,2021-06-30 22:15:00,2021-06-30 22:00:00,2021-06-30 23:55:00,2021-06-30 23:56:00,REALIZADO,
1,AZU,4479,0,N,SBGO,SBRJ,2021-06-07 06:20:00,2021-06-07 06:14:00,2021-06-07 08:00:00,2021-06-07 07:47:00,REALIZADO,
2,AZU,4479,0,N,SBGO,SBRJ,2021-06-08 06:20:00,2021-06-08 06:13:00,2021-06-08 08:00:00,2021-06-08 07:48:00,REALIZADO,
3,AZU,4479,0,N,SBGO,SBRJ,2021-06-09 06:20:00,2021-06-09 06:10:00,2021-06-09 08:00:00,2021-06-09 07:55:00,REALIZADO,
4,AZU,4479,0,N,SBGO,SBRJ,2021-06-10 06:20:00,2021-06-10 06:13:00,2021-06-10 08:00:00,2021-06-10 07:53:00,REALIZADO,
...,...,...,...,...,...,...,...,...,...,...,...,...
535798,PVV,8653,9,X,SBGR,DGAA,2021-08-29 04:00:00,2021-08-29 03:58:00,2021-08-29 11:00:00,2021-08-29 11:32:00,REALIZADO,
535799,PVV,8752,9,X,VVNB,VCBI,2021-08-30 13:00:00,2021-08-30 13:00:00,2021-08-30 18:00:00,2021-08-30 18:00:00,REALIZADO,
535800,PVV,8752,9,X,VCBI,OEJN,2021-08-31 15:00:00,2021-08-31 15:00:00,2021-08-31 20:00:00,2021-08-31 20:00:00,REALIZADO,
535801,PVV,8752,2,I,GUCY,SBEG,2021-09-02 02:00:00,,2021-09-02 09:00:00,,CANCELADO,


In [11]:
# Formatando a data (dia, mês e ano com o horário de Manaus)
datetime = pendulum.now('America/Manaus').strftime('%d-%m-%y_%I:%M%p')

In [12]:
# Salvando o arquivo com os dados das viagens
vra_df.to_csv(f'./data_export/VRA/vra_data_{datetime}.csv', index=False)

## Carregar dos dados de AIR_CIA
- Normalizar o cabeçalho para snake case
- Separar a coluna 'ICAO IATA' em duas colunas, seu conteúdo está separado por espaço e pode não conter o código IATA, caso não contenha o código IATA, deixe o valor nulo.
- Salvar estes dados

In [13]:
# Diretório onde estão os arquivos CSV
diretorio_air_cia = './data_engineer_test/AIR_CIA/'

In [14]:
# Normalizando as colunas para snake_case
nomes_colunas_air_cia = ['razao_social', 'icao_iata', 'cnpj', 'atividades_aereas',
                         'endereco_sede', 'telefone', 'email', 'decisao_operacional', 
                         'data_decisao_operacional', 'validade_operacional']

In [15]:
# Inicializa um DataFrame vazio para armazenar os dados
df_air_cia = pd.DataFrame()

In [16]:
# Lista todos os arquivos CSV na pasta
for filename in os.listdir(diretorio_air_cia):
    if filename.endswith(".csv"):
        # Cria o caminho completo para o arquivo
        arquivo = os.path.join(diretorio_air_cia, filename)
        
        # Lê o arquivo CSV e adicione os dados ao DataFrame total
        df = pd.read_csv(arquivo, sep=';')
        df_air_cia = pd.concat([df_air_cia, df], ignore_index=True)

In [17]:
# Modificando nome das colunas do dataframe AIR_CIA
df_air_cia.columns = nomes_colunas_air_cia


In [18]:
# Separando os valores pelo espaço em icao_iata e colocando nas novas colunas: icao e iata
df_air_cia[['icao', 'iata']] = df_air_cia['icao_iata'].str.split(n=1, expand=True)

In [19]:
# Removendo icao_iata e deixando as novas colunas na mesma posição que ela estava 
posicao_icao_iata = nomes_colunas_air_cia.index('icao_iata')
nomes_colunas_air_cia.pop(posicao_icao_iata)
nomes_colunas_air_cia.insert(posicao_icao_iata, 'icao')
nomes_colunas_air_cia.insert(posicao_icao_iata + 1, 'iata')
df_air_cia = df_air_cia[nomes_colunas_air_cia]

In [20]:
# Eliminando as duplicatas das companhias aéreas
df_air_cia = df_air_cia.drop_duplicates()

In [21]:
# Formatando a data (dia, mês e ano com o horário de Manaus)
datetime = pendulum.now('America/Manaus').strftime('%d-%m-%y_%I:%M%p')

In [22]:
# Salvando o arquivo com os dados das companhias aéreas
df_air_cia.to_csv(f'./data_export/AIR_CIA/air_cia_data_{datetime}.csv', index=False)

## Criar nova tabela aerodromos
- Através da API https://rapidapi.com/Active-api/api/airport-info/ trazer os aeródramos através do código ICAO presente nos dados de VRA.
- Salvar estes dados


In [23]:
# Definindo URL e headers para utilizar a API
url = 'https://airport-info.p.rapidapi.com/airport'

query_string = {"icao":"value"}

headers = {
	"X-RapidAPI-Key": "a593951eafmsh253dc4c7ca30e1cp1eec96jsnaef5fd9a9c11",
	"X-RapidAPI-Host": "airport-info.p.rapidapi.com"
}

In [24]:
# Coletando os ICAO únicos
icao_unicos = pd.concat([vra_df['icao_aerodromo_origem'], vra_df['icao_aerodromo_destino']]).unique()

In [25]:
# Criação do dataframe que receberá os dados dos aeródromos
aerodromos_df = pd.DataFrame([])

In [26]:
# Consultando a API para receber os dados dos aeródromos
for icao in icao_unicos:
    query_string['icao'] = icao
    response = requests.get(url, headers=headers, params=query_string)
    
    if response.status_code == 200:
        if 'error' not in response.json():
            temp_data = pd.DataFrame([response.json()])
            aerodromos_df = pd.concat([aerodromos_df, temp_data], ignore_index=True)

## Tabelas geradas até o momento:

![Tabelas](./img/tables.png)


## Criar as seguintes views (Priorize o uso de SQL para esta parte):

In [27]:
# Diretório onde as views serão salvas
diretorio = './data_export/views'

Para cada companhia aérea trazer a rota mais utilizada com as seguintes informações:
- Razão social da companhia aérea
- Nome Aeroporto de Origem
- ICAO do aeroporto de origem
- Estado/UF do aeroporto de origem
- Nome do Aeroporto de Destino
- ICAO do Aeroporto de destino
- Estado/UF do aeroporto de destino

In [28]:
query = """
        WITH CombinaçõesRankeadas AS (
            SELECT
                vra.icao_empresa_aerea,
                vra.icao_aerodromo_origem,
                vra.icao_aerodromo_destino,
                COUNT(*) AS quantidade,
                ROW_NUMBER() OVER (PARTITION BY vra.icao_empresa_aerea ORDER BY COUNT(*) DESC) AS rank
            FROM vra_df vra
            JOIN df_air_cia f ON vra.icao_empresa_aerea = f.icao
            GROUP BY vra.icao_empresa_aerea, vra.icao_aerodromo_origem, vra.icao_aerodromo_destino
        )
        SELECT
            icao_empresa_aerea,
            icao_aerodromo_origem,
            icao_aerodromo_destino,
            quantidade
        FROM CombinaçõesRankeadas
        WHERE rank = 1;
        """

rotas_mais_utilizadas = sqldf(query)


Aqui podemos ver a rota mais utilizada para cada companhia aérea, e quantas vezes ela foi utilizada:

In [29]:
rotas_mais_utilizadas

Unnamed: 0,icao_empresa_aerea,icao_aerodromo_origem,icao_aerodromo_destino,quantidade
0,ACN,SWKO,SBEG,34
1,ASO,SBFL,SBCD,26
2,AZU,SBKP,SBRJ,2540
3,GLO,SBRJ,SBSP,2514
4,LTG,SEQM,KMIA,377
5,MWM,SBKP,SBBR,162
6,OMI,SBAR,SBGL,15
7,PAM,SBEG,SBUY,480
8,PTB,SBPS,SBSV,724
9,SID,SBGR,SBRF,752


Em seguida, a tabelá será detalhada conforme solicitado via README:

In [30]:
query = """
        SELECT DISTINCT
            rc.icao_empresa_aerea,
            ac.razao_social AS razao_social_companhia,
            ad_origem.name AS nome_aeroporto_origem,
            rc.icao_aerodromo_origem AS icao_origem,
            ad_origem.county AS estado_origem,
            ad_origem.state AS uf_origem,
            ad_destino.name AS nome_aeroporto_destino,
            rc.icao_aerodromo_destino AS icao_destino,
            ad_destino.county AS estado_destino,
            ad_destino.state AS uf_destino
        FROM rotas_mais_utilizadas rc
        JOIN df_air_cia ac ON rc.icao_empresa_aerea = ac.icao
        JOIN aerodromos_df ad_origem ON rc.icao_aerodromo_origem = ad_origem.icao
        JOIN aerodromos_df ad_destino ON rc.icao_aerodromo_destino = ad_destino.icao;
        """

rotas_mais_utilizadas_view = sqldf(query)

Abaixo é possível ver a tabela resultante da query acima:

In [31]:
rotas_mais_utilizadas_view

Unnamed: 0,icao_empresa_aerea,razao_social_companhia,nome_aeroporto_origem,icao_origem,estado_origem,uf_origem,nome_aeroporto_destino,icao_destino,estado_destino,uf_destino
0,ACN,AZUL CONECTA LTDA. (EX TWO TAXI AEREO LTDA),Coari Airport,SWKO,Coari,State of Amazonas,Eduardo Gomes International Airport,SBEG,Manaus,Amazonas
1,ASO,AEROSUL TÁXI AÉREO LTDA (EX.: AUSTEN TÁXI AÉRE...,Hercílio Luz International Airport,SBFL,Florianópolis,Santa Catarina,Caçador Airport,SBCD,Caçador,Santa Catarina
2,AZU,AZUL LINHAS AÉREAS BRASILEIRAS S/A,Viracopos/Campinas International Airport,SBKP,Campinas,São Paulo,Santos Dumont Airport,SBRJ,Rio de Janeiro,Rio de Janeiro
3,GLO,GOL LINHAS AÉREAS S.A. (EX- VRG LINHAS AÉREAS ...,Santos Dumont Airport,SBRJ,Rio de Janeiro,Rio de Janeiro,São Paulo–Congonhas Airport,SBSP,São Paulo,São Paulo
4,LTG,ABSA - AEROLINHAS BRASILEIRAS S.A.,Mariscal Sucre International Airport,SEQM,Distrito Metropolitano de Quito,Pichincha,Miami International Airport,KMIA,Miami-Dade County,Florida
5,MWM,MODERN TRANSPORTE AEREO DE CARGA S.A,Viracopos/Campinas International Airport,SBKP,Campinas,São Paulo,Brasília International Airport (Presidente J. ...,SBBR,Brasília,Distrito Federal
6,OMI,OMNI TÁXI AÉREO S.A.,Santa Maria Airport,SBAR,Santa Barbara County,California,Rio de Janeiro–Galeão International Airport,SBGL,Rio de Janeiro,Rio de Janeiro
7,PTB,PASSAREDO TRANSPORTES AÉREOS S.A.,Porto Seguro Airport,SBPS,Porto Seguro,Bahia,Deputado Luís Eduardo Magalhães International ...,SBSV,Salvador,Bahia
8,SID,SIDERAL LINHAS AÉREAS LTDA.,São Paulo–Guarulhos International Airport,SBGR,Guarulhos,São Paulo,Recife/Guararapes–Gilberto Freyre Internationa...,SBRF,Recife,Pernambuco
9,SUL,ASTA LINHAS AÉREAS LTDA ( EX - AMÉRICA DO SUL ...,Marechal Rondon International Airport,SBCY,Várzea Grande,Mato Grosso,Juína Airport,SWJN,Juína,State of Mato Grosso


In [32]:
# Formatando a data (dia, mês e ano com o horário de Manaus)
datetime = pendulum.now('America/Manaus').strftime('%d-%m-%y_%I:%M%p')

# Salvando o arquivo com os dados da primeira view
vra_df.to_csv(f'./data_export/views/companhias_aereas/rotas_mais_utilizadas_{datetime}.csv', index=False)


## Criar as seguintes views (Priorize o uso de SQL para esta parte):
Para cada aeroporto trazer a companhia aérea com maior atuação no ano com as seguintes informações:
- Nome do Aeroporto
- ICAO do Aeroporto
- Razão social da Companhia Aérea
- Quantidade de Rotas à partir daquele aeroporto
- Quantidade de Rotas com destino àquele aeroporto
- Quantidade total de pousos e decolagens naquele aeroporto

In [33]:
query = """
WITH MaxAtuacaoPorAeroporto AS (
    SELECT
        ad.icao AS aeroporto_icao,
        vra.icao_empresa_aerea,
        COUNT(DISTINCT vra.icao_aerodromo_origem) + COUNT(DISTINCT vra.icao_aerodromo_destino) AS total_atuacao,
        RANK() OVER (PARTITION BY ad.icao ORDER BY COUNT(*) DESC) AS r
    FROM aerodromos_df ad
    JOIN vra_df vra ON ad.icao = vra.icao_aerodromo_origem OR ad.icao = vra.icao_aerodromo_destino
    GROUP BY ad.icao, vra.icao_empresa_aerea
)
SELECT
    ad.name AS Nome_do_Aeroporto,
    ad.icao AS aeroporto_icao,
    ma.icao_empresa_aerea,
    ca.razao_social AS Razao_Social_da_Companhia_Aerea,
    COUNT(DISTINCT CASE WHEN vra.icao_aerodromo_origem = ad.icao THEN vra.icao_aerodromo_origem || vra.icao_aerodromo_destino END) AS quantidade_rotas_origem,
    COUNT(DISTINCT CASE WHEN vra.icao_aerodromo_destino = ad.icao THEN vra.icao_aerodromo_origem || vra.icao_aerodromo_destino END) AS quantidade_rotas_destino,
    SUM(CASE WHEN ad.icao = vra.icao_aerodromo_origem OR ad.icao = vra.icao_aerodromo_destino THEN 1 ELSE 0 END) AS quantidade_pousos_decolagens
FROM aerodromos_df ad
JOIN MaxAtuacaoPorAeroporto ma ON ad.icao = ma.aeroporto_icao AND ma.r = 1
JOIN df_air_cia ca ON ma.icao_empresa_aerea = ca.icao
LEFT JOIN vra_df vra ON (ad.icao = vra.icao_aerodromo_origem OR ad.icao = vra.icao_aerodromo_destino) AND ma.icao_empresa_aerea = vra.icao_empresa_aerea
GROUP BY ad.name, ad.icao, ma.icao_empresa_aerea, ca.razao_social
"""

aeroporto_view_df = sqldf(query)

Abaixo é possível ver a tabela resultante da query acima:

In [34]:
aeroporto_view_df

Unnamed: 0,Nome_do_Aeroporto,aeroporto_icao,icao_empresa_aerea,Razao_Social_da_Companhia_Aerea,quantidade_rotas_origem,quantidade_rotas_destino,quantidade_pousos_decolagens
0,Afonso Pena International Airport,SBCT,AZU,AZUL LINHAS AÉREAS BRASILEIRAS S/A,16,18,12352
1,Alberto Alcolumbre International Airport,SBMQ,GLO,GOL LINHAS AÉREAS S.A. (EX- VRG LINHAS AÉREAS ...,2,2,1327
2,Alta Floresta Airport,SBAT,AZU,AZUL LINHAS AÉREAS BRASILEIRAS S/A,1,1,482
3,Altamira Airport,SBHT,AZU,AZUL LINHAS AÉREAS BRASILEIRAS S/A,1,1,620
4,Apucarana Airport,SSAP,ASO,AEROSUL TÁXI AÉREO LTDA (EX.: AUSTEN TÁXI AÉRE...,1,1,28
...,...,...,...,...,...,...,...
119,Una-Comandatuba Airport,SBTC,TAM,TAM LINHAS AÉREAS S.A.,2,2,122
120,Val de Cans International Airport,SBBE,AZU,AZUL LINHAS AÉREAS BRASILEIRAS S/A,18,17,12009
121,Vilhena Airport (Brigadeiro Camarão Airport),SBVH,AZU,AZUL LINHAS AÉREAS BRASILEIRAS S/A,1,1,296
122,Viracopos/Campinas International Airport,SBKP,AZU,AZUL LINHAS AÉREAS BRASILEIRAS S/A,69,70,83423


In [35]:
# Formatando a data (dia, mês e ano com o horário de Manaus)
datetime = pendulum.now('America/Manaus').strftime('%d-%m-%y_%I:%M%p')

# Salvando o arquivo com os dados da primeira view
vra_df.to_csv(f'./data_export/views/aeroportos/detalhes_aerodromos_{datetime}.csv', index=False)


## Extras


#### Descrever qual estratégia você usaria para ingerir estes dados de forma incremental caso precise capturar esses dados a cada mes?

Eu preparia um pipeline no airflow que executasse 1 vez ao mês, ele cuidaria da extração dos dados, transformação e carregamento, podendo enviar emails para a equipe de engenharia de dados, caso houvesse falha na sua execução.
Dependendo de quem utilizasse os dados, poderiam ser disponibilizados via BigQuery, facilitando o compartilhamento e acesso para equipes, pois poderíamos escolher os projetos que teriam acesso aos dados.
O airflow seria responsável até na parte de subir para a cloud.

#### Justifique em cada etapa sobre a escalabilidade da tecnologia utilizada.

Tive o cuidado de utilizar bibliotecas "básicas" do python que já estão presentes há tempo, quando o assunto é "dados".
A modularização utilizada acima, é um fator importante, pois podemos separar facilmente as partes ou até mesmo removê-las, caso seja necessário no futuro.

#### Justifique as camadas utilizadas durante o processo de ingestão até a disponibilização dos dados.

**1 - Ingestão de Dados:**
Aqui utilizamos arquivos json, csv e até mesmo uma API, mostrando que podem surgir diversas fontes de dados e que necessitam de experiência para trabalhar com os mesmos.

**2 - Processamento de Dados:**
Os dados foram processados tendo como base o a solicitação via readme, desde coisas básicas como normalizar um cabeçalho, até separação de colunas e tratamento de duplicatas.

**3 - Armazenamento de Dados:**
A dados de saída estão sendo armazenados localmente, levando em consideração os poucos dados/arquivos gerados. Dependendo da quantidade, talvez seja necessário repensar este método.

**4 - Disponibilização de Dados:**
Os dados estão disponibilizados na pasta /views, em arquivos csv, para falicitar a leitura. Porém, poderiam ser colocados em bancos de dados (locais ou nuvem), para os usuários consultarem via SQL.