<a href="https://colab.research.google.com/github/Tiao553/objective-desafio-ed/blob/main/nootbooks-desenvolvimento/codes_dev.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

<img src="https://cdn.eleflow.com.br/ef-web/wp-content/uploads/2016/08/21181642/Eleflow.png" alt="Eleflow BigData" width="200"/>

# Data engineering capstone

## BigData Airlines

A Eleflow irá atender um novo cliente, a _BigData Airlines_, e você será o engenheiro de dados responsável por fazer a ingestão de dados e preparar algumas tabelas para os cientistas de dados e analistas de dados. 

---

## Projeto de desenvolvimento

#### Resultado esperado

A BigData Airline tem principalmente duas fonte de dados **(1) VRA**  assim como o **(2) AIR_CIA**. Ambas serão utilizadas para construção de duas visualizações que serão utilizadas para análises. Dessa forma teremos como entrega as seguites views:

> #### View com as companhias aéreas representando a rota mais utilizada


| campo 	            | descrição                           	|
|:-------:	            |:-------------------------------------:|
| companhia      	    | Razão social da companhia aérea     	|
| aeroporto_origem      | Nome Aeroporto de Origem            	|
| icao_origem      	    | ICAO do aeroporto de origem       	|
| estado_origem         | Estado/UF do aeroporto de origem  	|
| icao_destino        | Nome do Aeroporto de Destino      	|
| icao_destino   | ICAO do Aeroporto de destino      	|
| estado_destino     | Estado/UF do aeroporto de destino 	|

---

> #### View com os aeroportos contendo as companhias aérea com maior atuação no ano:

| campo 	            | descrição                           	|
|:-------:	            |:-------------------------------------:|
| aeroporto             |  Nome do Aeroporto	|
| icao                  |  ICAO do Aeroporto	|
| companhia             |  Razão social da Companhia Aérea	|
| qtd_rotas_origem     |  Quantidade de Rotas à partir daquele aeroporto	|
| qtd_rotas_destino     |  Quantidade de Rotas com destino àquele aeroporto	|
| qtd_pousos_decolagens |  Quantidade total de pousos e decolagens naquele aeroporto	|

#### Infraestrtura proposta

Para que possamos implementar uma infraestrura que irá conseguir atender a atualização incremental dos dados o primeiro ponto é entender a dinâmica de insersão e consumo dos dados de ambas as tabelas. O primeiro ponto é que iremos supor que a inserção dos dados atua com a disponibilização manual deste dados em uma volumetria mensal em ambos os casos. Dessa forma necessitaremos de uma estrutura que consiga monitorar esse deploy deste dado e acionar o processamento dos dados, ou seja:

<center>
    <img src="https://raw.githubusercontent.com/Tiao553/objective-desafio-ed/main/nootbooks-desenvolvimento/img/sensor.png" alt="Diagrama de blocos do processo" width="600"/>
</center>

O processo segue a sequencia de camadas onde iremos dividir a evolução do dado em três etapas. Sendo elas (1) Raw/bronze onde o dado deve estar conforme o consumido mantendo-se o mesmo formato e sem nenhum tratamento, (2) trusted/silver nesta etapa devemos garantir a qualidade do dado onde garantiremos que não exista linhas duplicadas, como também garantiremos o esquema correto dos dados e por fim a (3) curated/gold nesta se encontra as view especiais para os analistas em que aplicamos a regras de negócio.

<center>
    <img src="https://raw.githubusercontent.com/Tiao553/objective-desafio-ed/main/nootbooks-desenvolvimento/img/processo.png" alt="Diagrama de blocos do processo" width="1000"/>
</center>

Para podemos conseguir utilizar esses processos de forma escalonável e gerenciavel teremos a seguinte arquitetura na cloud da AWS onde gerenciaremos os processos.

<center>
    <img src="https://raw.githubusercontent.com/Tiao553/objective-desafio-ed/main/nootbooks-desenvolvimento/img/infra.png" alt="Diagrama de blocos do processo" width="1200"/>
</center>

- Como cluster para processamento e gestão de recursos de forma orquestrada estaremos utilizando o EKS que é o sistema gerenciado de kubernetes na AWS. Dentro deste iremos gerir o Airflow que trata-se de um orquestrador de tarefas onde neste conseguigos agendar tarefas ou garantir que ela execute dado algum evento. Como mecanismo de processsamento, e pensando em escalonamento estaremos utilizando o spark, mas se a volumetria se manter seria possível alocar um python operator no cluster kubernetes e realizar as tratativas via pandas ou dask. Considerando que trabalharemos com spark este possui um workload dentro do kubernetes chamado sparkoperator onde conseguimos rodar aplicações. Para isso precisamos compilar esta em um container e dipobilizar como uma imagem para ser acessível para o spark operator executar. Dessa forma utilizamos a ECR(elastic container registry) que irá salvar as imagens para serem executadas pelo workload.

- Os dados serão salvos no diretorio de objetos e de alta disponibilidade que é o S3, dividiremos as camadas por buckets e pensando em redução de custos e otimizar o uso poderemos utilizar nas camadas silver e gold o formato dos arquivos como delta que apresenta maior compressão dos arquivos e versionamento, além de outras features. Para facilitar a leitura deste sem nenhum outro processo de alocação em um banco de dados utilizaremos o athena que é um vizualizador de arquivos com liguagem sql baseada no prestosql.

- Para governança e boas praticas as dags que são aonde cada proceso estará alocado e sequenciado a sua execução irão ser alocados em um repositório git onde o própio airflow garante o sincronismo com este direto com a main. Para o processo 1 estaremos alocando o operatorSensor do airflow para verificar no airflow se existem novos arquivos para as tabelas air_cia e vra, onde irá acionar o processo de processamento do processo 2. Neste iremos discorrer neste notebook o processamento das tabelas, como também a criação da tabelas dos aerodromos. Ja no processo 3 iremos gerir os agrupamentos para que possamos disponibilizar no athena.

- Para conseguirmos entender o que acontece nos nosso processos estaremos utilizando o sistema de monitoramento de logs o prometheus e disponibolizaremos os resultados com um dashboard no grafana.

- Para monitorarmos as metricas do cluster estaremos utilizando o sistema gerenciado da aws o cloudWactch.

- E para garantirmos o CD das aplicações no cluster estaremos utilizando o argoCD que é um gerenciador de deployments dentro do cluster kubernetes.

--- 

# Desenvolvimento




---

## Setup Geral

Se estiver executando este exercício no Google Colab, execute as próximas duas células. 

Caso esteja executando localmente, não é necessário executar mas certifique-se de que o **pyspark** está instalado e configurado em sua máquina.

In [1]:
%%bash

# Instal Java
apt-get update && apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Install PySpark
pip install -q pyspark

Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Get:2 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Ign:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Hit:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:6 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Hit:7 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:8 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:10 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Get:11 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [83.3 kB]
Hit:12 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Get:13 http://security.ubuntu.com/ubuntu bionic-security/universe amd64 Packages [1,573 kB]
Get:14 http://p

In [2]:
import os
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

## Diponibilização do dados no ambiente de trabalho

Embora os dados tenham sido propostos serem diponibilizados no S3 no momento de desenvolvimento estaremos trabalhando no ambiente do colab onde faz-se necessário a disponibilização dos dados. Dessa maneira, como os dados iniciais encontram-se no repositorio do github estaremos baixando estes no ambiente.

In [3]:
%%bash

mkdir air_cia

curl https://raw.githubusercontent.com/Tiao553/objective-desafio-ed/main/nootbooks-desenvolvimento/data/AIR_CIA/ANAC_20211220_203627.csv -o air_cia/ANAC_20211220_203627.csv
curl https://raw.githubusercontent.com/Tiao553/objective-desafio-ed/main/nootbooks-desenvolvimento/data/AIR_CIA/ANAC_20211220_203643.csv -o air_cia/ANAC_20211220_203643.csv
curl https://raw.githubusercontent.com/Tiao553/objective-desafio-ed/main/nootbooks-desenvolvimento/data/AIR_CIA/ANAC_20211220_203733.csv -o air_cia/ANAC_20211220_203733.csv

mkdir vra

for i in {1..11}; 
do 
    curl https://raw.githubusercontent.com/Tiao553/objective-desafio-ed/main/nootbooks-desenvolvimento/data/VRA/VRA_2021$i.json -o vra/VRA_2021$i.json; 
done

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0100  4346  100  4346    0     0  10083      0 --:--:-- --:--:-- --:--:-- 10083
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0100  1863  100  1863    0     0   3703      0 --:--:-- --:--:-- --:--:--  3696
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0100   754  100   754    0     0   1923      0 --:

Com os dados iniciados podemos realizar a leituras destes e iniciar a tratativa dos dados conforme solicitado pela equipe de negócio. Dessa forma, iremos criar uma função de leitura para acelerar o processo de desenvolvimento.

In [4]:
# Setup Spark Session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("criacao_do_processo_2").getOrCreate()

# Setup function to read data
def read_data(path, format, delimiter="\t"):
    if format == 'csv':
        df = (
            spark
            .read
            .csv(path, inferSchema=True, header=True,sep=delimiter)
        )
    else:
        df =(
            spark
            .read
            .format(format)
            .load(path)
        )
    return df

# read bronze table
vra = read_data("vra/","json")

# read bronze table
air_cia = read_data("air_cia/","csv",";")

Vamos verificar os dados:

In [5]:
vra.show(2)

air_cia.show(2)

+-------------------+-------------------+-----------------+-------------------+---------------+--------------------+-------------------+----------------+---------+-------------------+-------------------+-----------+
|    ChegadaPrevista|        ChegadaReal|CódigoAutorização|CódigoJustificativa|CódigoTipoLinha|ICAOAeródromoDestino|ICAOAeródromoOrigem|ICAOEmpresaAérea|NúmeroVoo|    PartidaPrevista|        PartidaReal|SituaçãoVoo|
+-------------------+-------------------+-----------------+-------------------+---------------+--------------------+-------------------+----------------+---------+-------------------+-------------------+-----------+
|2021-11-12 08:30:00|2021-11-12 08:24:00|                0|                N/A|              X|                KORD|               SBGR|             UAL|     0844|2021-11-11 22:00:00|2021-11-11 22:14:00|  REALIZADO|
|2021-11-15 08:30:00|2021-11-15 08:05:00|                0|                N/A|              X|                KORD|               SBGR|

Para ambas as tabelas faz-se necessário apresentar as colunas no formato snake case. Dessa forma, iremos criar um função que iremos aplicar em ambas as tabelas.

>  **snake case:** refere-se ao estilo de escrita em que cada espaço é substituído por um caractere de sublinhado (_) e a primeira letra de cada palavra é escrita em minúscula

In [6]:
%%bash
pip install Unidecode

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting Unidecode
  Downloading Unidecode-1.3.6-py3-none-any.whl (235 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 235.9/235.9 KB 5.9 MB/s eta 0:00:00
Installing collected packages: Unidecode
Successfully installed Unidecode-1.3.6


In [7]:
 from unidecode import unidecode
 
 def snake_case(df):
    def capital_letter_divider(a):
        aux = 0 
        for index,value in enumerate(a):
            if index != 0 and value.isupper():
                a = a[:(index+aux)] + "_" + a[(index+aux):]
                aux += 1
        return a

    def special_cases(a):
        return a.replace("ICAO","Icao").replace("CNPJ","Cnpj").replace("IATA","Iata")

    return [unidecode(capital_letter_divider(special_cases(str(a))).replace('-','').replace(' _','_').replace(' ','_').lower()) for a in df.columns]

In [8]:
vra_snake = (
    vra
    .toDF(*snake_case(vra))
)

air_cia_snake = (
    air_cia
    .toDF(*snake_case(air_cia))
)

vra_snake.show(2)
air_cia_snake.show(2)

+-------------------+-------------------+------------------+--------------------+-----------------+----------------------+---------------------+------------------+----------+-------------------+-------------------+------------+
|   chegada_prevista|       chegada_real|codigo_autorizacao|codigo_justificativa|codigo_tipo_linha|icao_aerodromo_destino|icao_aerodromo_origem|icao_empresa_aerea|numero_voo|   partida_prevista|       partida_real|situacao_voo|
+-------------------+-------------------+------------------+--------------------+-----------------+----------------------+---------------------+------------------+----------+-------------------+-------------------+------------+
|2021-11-12 08:30:00|2021-11-12 08:24:00|                 0|                 N/A|                X|                  KORD|                 SBGR|               UAL|      0844|2021-11-11 22:00:00|2021-11-11 22:14:00|   REALIZADO|
|2021-11-15 08:30:00|2021-11-15 08:05:00|                 0|                 N/A|       

Para aplicarmos o snake case realizamos a construção de uma função onde foi construido uma funcionalidade para varrer cada letrar com objetivo de encontrar as que são maiusculas e a partir desta adicionar o *underline*. Com o objetivo de criar uma função génerica acabamos encontrando alguns casos especiais e optei por não avançar na investigação para garantir o tratamento destes e tratamos pontualmente. Além disso, também adicionamos a função unidecode que garante o tratamento de caracteres especiais.

Aplicado o Snake case iremos avaliar os tipos dos dados. Para isso estaremos utilizando o funcionalidade `printSchema` que nos retorna o tipo de cada colunas.

In [9]:
air_cia_snake.printSchema()
vra_snake.printSchema()

root
 |-- razao_social: string (nullable = true)
 |-- icao_iata: string (nullable = true)
 |-- cnpj: string (nullable = true)
 |-- atividades_aereas: string (nullable = true)
 |-- endereco_sede: string (nullable = true)
 |-- telefone: string (nullable = true)
 |-- e_mail: string (nullable = true)
 |-- decisao_operacional: string (nullable = true)
 |-- data_decisao_operacional: string (nullable = true)
 |-- validade_operacional: string (nullable = true)

root
 |-- chegada_prevista: string (nullable = true)
 |-- chegada_real: string (nullable = true)
 |-- codigo_autorizacao: string (nullable = true)
 |-- codigo_justificativa: string (nullable = true)
 |-- codigo_tipo_linha: string (nullable = true)
 |-- icao_aerodromo_destino: string (nullable = true)
 |-- icao_aerodromo_origem: string (nullable = true)
 |-- icao_empresa_aerea: string (nullable = true)
 |-- numero_voo: string (nullable = true)
 |-- partida_prevista: string (nullable = true)
 |-- partida_real: string (nullable = true)
 |-

Por meio do `printSchema` podemos verificar que todos os campos apresentam-se como string. Sendo assim, ao lermos os arquivos da camada bronze precisaremos forçar o esquema leitura. Utilizando o formato de schema Data Definition Language (DDL) para definir o esquema Spark poderemos forçar na leitura.

Abaixo temos a representação do schema em DDL, que é o formato onde definiremos esquema por meio de um string.

In [10]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

schema_air_cia = """
    razao_social string,
    icao_iata string,
    cnpj string,
    atividades_aereas string,
    endereco_sede string,
    telefone string,
    e_mail string,
    decisao_operacional string,
    data_decisao_operacional string,
    validade_operacional string
"""

schema_vra = """
    chegada_prevista timestamp,
    chegada_real timestamp,
    codigo_autorizacao string,
    codigo_justificativa string,
    codigo_tipo_linha  string,
    icao_aerodromo_destino string,
    icao_aerodromo_origem string,
    icao_empresa_aerea string,
    numero_voo int,
    partida_prevista timestamp,
    partida_real timestamp,
    situacao_voo string
"""

schema_test = StructType(
    [
        StructField('chegada_prevista', IntegerType()),
        StructField('chegada_real', IntegerType()),
        StructField('codigo_autorizacao', StringType()),
        StructField('codigo_justificativa', StringType()),
        StructField('codigo_tipo_linha', StringType()),
        StructField('icao_aerodromo_destino', StringType()),
        StructField('icao_aerodromo_origem', StringType()),
        StructField('icao_empresa_aerea', StringType()),
        StructField('numero_voo', IntegerType()),
        StructField('partida_prevista', IntegerType()),
        StructField('partida_real', IntegerType()),
        StructField('situacao_voo', StringType()),
    ]
)

# Setup function to read data
def read_data(path, format, delimiter="\t",schema=None):
    if format == 'csv':
        df = (
            spark
            .read
            .csv(path, inferSchema=True, header=True,sep=delimiter, schema=schema)
        )
    else:
        df =(
            spark
            .read
            .format(format)
            .load(path)
        )
    return df

# read bronze table
vra = read_data("vra/","json", schema=schema_test)

# read bronze table
air_cia = read_data("air_cia/","csv",delimiter=";",schema=schema_air_cia)

vra_snake_type = (
    vra
    .toDF(*snake_case(vra))
)

air_cia_snake_type = (
    air_cia
    .toDF(*snake_case(air_cia))
)

print("air_cia:")
air_cia_snake_type.show(2)
air_cia_snake_type.printSchema()

print("vra:")
vra_snake_type.show(2)
vra_snake_type.printSchema()

air_cia:
+--------------------+---------+------------------+--------------------+--------------------+--------------+--------------------+-------------------+------------------------+--------------------+
|        razao_social|icao_iata|              cnpj|   atividades_aereas|       endereco_sede|      telefone|              e_mail|decisao_operacional|data_decisao_operacional|validade_operacional|
+--------------------+---------+------------------+--------------------+--------------------+--------------+--------------------+-------------------+------------------------+--------------------+
|ABSA - AEROLINHAS...|   LTG M3|00.074.635/0001-33|TRANSPORTE AÉREO ...|AEROPORTO INTERNA...|(11) 5582-8055|      gar@tam.com.br|      DECISÃO Nº 41|              22/04/2015|          23/04/2025|
|AEROSUL TÁXI AÉRE...|   ASO 2S|27.315.694/0001-02|SERVIÇOS AÉREOS P...|RODOVIA PR 218, K...|(43) 3176-4030|operacoes@aerosul...|     DECISÃO Nº 282|              10/02/2021|                null|
+----------

Como o dataframe nao aceitou o esquema forçado a leitura teremos que ajustar esses formatos por meio da codificação utilizando os metodos do spark para forçarmos os tipos dos dados. Além disso, iremos remover os dados duplicados.

In [11]:
vra_snake_type = (
    vra_snake_type
    .withColumn('chegada_prevista',to_timestamp(col('chegada_prevista')))
    .withColumn('chegada_real',to_timestamp(col('chegada_real')))
    .withColumn('partida_prevista',to_timestamp(col('partida_prevista')))
    .withColumn('partida_real',to_timestamp(col('partida_real')))
    .withColumn('numero_voo',col('numero_voo').cast('int'))
    .drop_duplicates()
)

print("vra:")
vra_snake_type.show(2)
vra_snake_type.printSchema()

vra:
+-------------------+-------------------+------------------+--------------------+-----------------+----------------------+---------------------+------------------+----------+-------------------+-------------------+------------+
|   chegada_prevista|       chegada_real|codigo_autorizacao|codigo_justificativa|codigo_tipo_linha|icao_aerodromo_destino|icao_aerodromo_origem|icao_empresa_aerea|numero_voo|   partida_prevista|       partida_real|situacao_voo|
+-------------------+-------------------+------------------+--------------------+-----------------+----------------------+---------------------+------------------+----------+-------------------+-------------------+------------+
|2021-11-13 08:55:00|2021-11-13 08:40:00|                 0|                 N/A|                X|                  SBGL|                 SABE|               ARG|      1260|2021-11-13 06:00:00|2021-11-13 06:15:00|   REALIZADO|
|2021-11-08 07:45:00|               null|                 0|                 N/A|  

Para garantir a qualidade dos dados precimos garantir que as colunas em `string` não apresentem espaços em branco em extras oque impacta no uso dos dados.

In [12]:
def processing_string(df):
    for column in [a[0] for a in df.dtypes if a[1] == 'string']:
        df = df.withColumn(column, trim(col(column)))
        return df

air_cia_snake_type = processing_string(air_cia_snake_type)
vra_snake_type = processing_string(vra_snake_type)

print("air_cia:")
air_cia_snake_type.show(2)
air_cia_snake_type.printSchema()

print("vra:")
vra_snake_type.show(2)
vra_snake_type.printSchema()

air_cia:
+--------------------+---------+------------------+--------------------+--------------------+--------------+--------------------+-------------------+------------------------+--------------------+
|        razao_social|icao_iata|              cnpj|   atividades_aereas|       endereco_sede|      telefone|              e_mail|decisao_operacional|data_decisao_operacional|validade_operacional|
+--------------------+---------+------------------+--------------------+--------------------+--------------+--------------------+-------------------+------------------------+--------------------+
|ABSA - AEROLINHAS...|   LTG M3|00.074.635/0001-33|TRANSPORTE AÉREO ...|AEROPORTO INTERNA...|(11) 5582-8055|      gar@tam.com.br|      DECISÃO Nº 41|              22/04/2015|          23/04/2025|
|AEROSUL TÁXI AÉRE...|   ASO 2S|27.315.694/0001-02|SERVIÇOS AÉREOS P...|RODOVIA PR 218, K...|(43) 3176-4030|operacoes@aerosul...|     DECISÃO Nº 282|              10/02/2021|                null|
+----------

Com os tipos validados e os cuidados com os campos em string podemos finalizar a tabela silver do VRA. Dessa maneira, podemos avaliar os dados  da tabela Air_cia, onde garantiremos a qualidade e a sua limpeza de forma profunda pensando na sua utilização tanto para análises quanto para uso em aplicações.

para isso iremos realizar a limpeza dos caracteres especiais contidos na coluna CNPJ e telefone. Na coluna telefone ainda contem mais de 1 alocados para o mesma companhia, neste caso iremos construir uma lista de telefones. Como também iremos garantir os tipos de dados que precisaram ser forçados utilizando os metodos do spark.

In [96]:
#air_cia_snake_type_final =
@udf (returnType = StringType())
def cnpj_transforming(string):
    if string:
        return string.replace('.','').replace('/','').replace('-','')
    else:
        return None

@udf (returnType = StringType())
def telefone_transforming(string):
    if string:
        return string.replace('(','').replace(')','').replace(' ','').replace('-','').replace("|Fax:","/")
    else:
        return None

air_cia_snake_type_final =(
    air_cia_snake_type
    .withColumn("icao", split(col("icao_iata"), " ")[0])
    .withColumn("iata", split(col("icao_iata"), " ")[1])
    .drop(col("icao_iata"))
    .withColumn("cnpj", cnpj_transforming(col("cnpj")))
    .withColumn("telefone", split(telefone_transforming(col('telefone')),"/"))
    #.withColumn("decisao_operacional", regexp_replace('decisao_operacional',"DECISÃO Nº ",'').cast("int"))
    .withColumn("data_decisao_operacional" , to_date(col('data_decisao_operacional'),'dd/MM/yyyy'))
    .withColumn("validade_operacional" , to_date(col('validade_operacional'),'dd/MM/yyyy'))
    .withColumn("atividades_aereas", split("atividades_aereas", ","))
    .drop_duplicates()
)
air_cia_snake_type_final.show(30, truncate=False)

+------------------------------------------------------------------+--------------+------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------+-----------------------------------+----------------------------+------------------------+--------------------+----+----+
|razao_social                                                      |cnpj          |atividades_aereas                                                                                     |endereco_sede                                                                                                                                                       |telefone                    |e_mail                             |decisao_operacional         |data_decisao_operacional|validade_operacional|icao|ia

Como regra de negocio foi proposto que quando não encontrassemos o valor de iata era necessário considerar este valor como nulo. Portanto, na representação abaixo temos todos os casos sendo representado, assim como a evidência do caso.

In [14]:
air_cia_snake_type_final.select("icao","iata").distinct().show(30)

+----+----+
|icao|iata|
+----+----+
| MWM|  WD|
|null|null|
| ASO|  2S|
| ACN|  2F|
| AZU|  AD|
| PTB|  2Z|
| TAM|  JJ|
| OMI|  O1|
| GLO|  G3|
| TTL|  L1|
| ABJ|null|
| SUL|  0A|
| PAM|  7M|
| SID|  0S|
| LTG|  M3|
| CKP|null|
| RIM|  0R|
+----+----+



### Construção da tabela aerodromos

Com ambas as tabelas sendo finalizadas podemos utilizar os dados para que possamos consultar a API para retorno das informações do aerodromos. Como estratégia iremos primeiramente identificar os valores unicos de cada icao para que não realizemos requisições mais que o necessário.

In [15]:
df_uniques_icao = vra_snake_type.select("icao_aerodromo_destino","icao_aerodromo_origem").distinct()

Como existem duas tabelas contendo os icao's teremos que contruir uma logica que garanta a eficiência das requisições. Para realizar a requisição esteremos utilizando um função onde informaremos o icao a api retorna um conjunto de dados. Esta API tem a necessidade de uma token de identificação de requisão. Como seguraça estaremos definindo este como variável de ambiente para nao te-lo exposto no código.

In [23]:
import os
# Set environment variables
os.environ['X-RapidAPI-Key'] = ""

In [24]:
def request_icao(string):
    if string:
        import requests
        url = "https://airport-info.p.rapidapi.com/airport"
        querystring = {"icao":string}
        headers = {
            "X-RapidAPI-Key": os.getenv('X-RapidAPI-Key'),
            "X-RapidAPI-Host": "airport-info.p.rapidapi.com"
        }
        return requests.request("GET", url, headers=headers, params=querystring).json()
    else:
        return None

Com a função criada vamos para logica de requisão dos dados. Neste estaremos utilzando a ideia de PythonOperator onde trabalharemos com tipo nativo a lista para alocação dos dados e json como requisão dos dados. Iremos coletar as linhas do dataframe spark contendo os icao's unicos, ao lermos cada linhas e cada coluna iremos adicionar cada icao requisitado em uma lista e verificar se este ja existe nesta. Para evitarmos timeouts utilizaremos a logica de try except e quando esbarrarmos em um esperaremos por 60 segundos em seguida retornaremos a requisitar o dado.

In [25]:
import time
icao_vector = []
vector_data = []
for row in df_uniques_icao.collect():
    #print(row[0],row[1])
    if row[0] not in icao_vector:
        icao_vector.append(row[0])
        try:
            vector_data.append(request_icao(row[0]))
        except:
            time.sleep(60)
            vector_data.append(request_icao(row[0]))
    if row[1] not in icao_vector:
        icao_vector.append(row[1])
        try:
            vector_data.append(request_icao(row[1]))
        except:
            time.sleep(60)
            vector_data.append(request_icao(row[1]))

Para mapearmos cada icao que não obtemos retorno teremos que forçar o vetor de icaos solicitados nos campos dos json. dessa forma, tirando o campo icao, o restante todos estaram nulos. 

In [30]:
data = []
for index, value in enumerate(vector_data):
    value["icao"] = icao_vector[index]
    data.append([value])

Tendo estruturado os dados em uma lista podemos criar a tabela em spark deste conjunto de dados. Estaremos lendo os Jsons, como string para garantirmos o campos corretos ao extrair as strings, ja que ao forçar o schema ele pode indexar incorretamente os campos aos quais os dados pertencem.

In [45]:
schema_aerodromo = StructType(
    [
        StructField('id', StringType()),
        StructField('iata', StringType()),
        StructField('icao', StringType()),
        StructField('name', StringType()),
        StructField('location', StringType()),
        StructField('street_number', StringType()),
        StructField('street', StringType()),
        StructField('city', StringType()),
        StructField('county', StringType()),
        StructField('state', StringType()),
        StructField('country_iso', StringType()),
        StructField('country', StringType()),
        StructField('postal_code', StringType()),
        StructField('phone', StringType()),
        StructField('latitude', StringType()),
        StructField('longitude', StringType()),
        StructField('uct', IntegerType()),
        StructField('website', StringType())
    ]
)

df_distincts = spark.createDataFrame(data = vector_data, schema=schema_aerodromo)
df_distincts.show(truncate=False)

+----+----+----+-----------------------------------------------------------------------+---------------------------------------+-------------+---------------------------------+--------------+--------------------+--------------------------+-----------+------------------+-----------+----------------+----------+----------+----+-----------------------------------------------------------------------------------------------------+
|id  |iata|icao|name                                                                   |location                               |street_number|street                           |city          |county              |state                     |country_iso|country           |postal_code|phone           |latitude  |longitude |uct |website                                                                                              |
+----+----+----+-----------------------------------------------------------------------+---------------------------------------+-------------+

Com a tabela instanciada como dataframe spark podemos desenvolver o processando desta. O primeiro ponto observado é no campo state contendo string denessário e que pode estar gerando duplicidade **State of**. Outro ponto, é que o campo de latitude e longitude tiveram que ser inseridos como string pois existiam dois formatos inseridos pela API.

In [50]:
df_aerodromo_final = (
    df_distincts
    .withColumn("state", regexp_replace('state',"State of ",''))
    .withColumn("latitude", col("latitude").cast('float'))
    .withColumn("longitude", col("longitude").cast('float'))
)

df_aerodromo_final = processing_string(df_aerodromo_final)

Portanto esses foram os processamentos observados como necessários para que podemos declarar esses dados como confiaveis ou trusted ou mesmo silver. Sendo assim, podemos salvar estes ou no caso deste notebook instancia-los como uma tabela.

In [58]:
# Setup function to enable table
def create_view(df, name):
    df.createOrReplaceTempView(name)

create_view(df_aerodromo_final, "silver_aerodromo_api")
create_view(air_cia_snake_type_final, "silver_air_cia")
create_view(vra_snake_type, "silver_vra")

## Views

Utima estapa do processo, ja com dados consumidos e pronto para o uso iremos realizar a contrução das views por meio da linguagem sql. Dessa forma vamos contruir uma função para ser possível realizar consultas nas tabelas silver criadas.

In [52]:
def query(query, n=30):
    return spark.sql(query).show(n=n, truncate=False)

Como representação dos dados utilizados temos:

In [54]:
query("select * from silver_aerodromo_api",n=1)

+----+----+----+-----------------------------------+---------------------------------------+-------------+------+----+-----------------+-----------+-------+-----------+----------------+----------+---------+----+---------------------------------------------------------------------------------------------+
|id  |iata|icao|name                               |location                               |street_number|street|city|state            |country_iso|country|postal_code|phone           |latitude  |longitude|uct |website                                                                                      |
+----+----+----+-----------------------------------+---------------------------------------+-------------+------+----+-----------------+-----------+-------+-----------+----------------+----------+---------+----+---------------------------------------------------------------------------------------------+
|6029|POA |SBPA|Salgado Filho International Airport|Porto Alegre, Rio Grande do Su

In [91]:
query("select * from silver_air_cia",n=1)

+----------------------------------+--------------+--------------------------+----------------------------------------------------------------------------------------------------------------------------+------------+--------------+-------------------+------------------------+--------------------+----+----+
|razao_social                      |cnpj          |atividades_aereas         |endereco_sede                                                                                                               |telefone    |e_mail        |decisao_operacional|data_decisao_operacional|validade_operacional|icao|iata|
+----------------------------------+--------------+--------------------------+----------------------------------------------------------------------------------------------------------------------------+------------+--------------+-------------------+------------------------+--------------------+----+----+
|ABSA - AEROLINHAS BRASILEIRAS S.A.|00074635000133|[TRANSPORTE AÉREO REGULAR

In [56]:
query("select * from silver_vra",n=1)

+-------------------+-------------------+------------------+--------------------+-----------------+----------------------+---------------------+------------------+----------+-------------------+-------------------+------------+
|chegada_prevista   |chegada_real       |codigo_autorizacao|codigo_justificativa|codigo_tipo_linha|icao_aerodromo_destino|icao_aerodromo_origem|icao_empresa_aerea|numero_voo|partida_prevista   |partida_real       |situacao_voo|
+-------------------+-------------------+------------------+--------------------+-----------------+----------------------+---------------------+------------------+----------+-------------------+-------------------+------------+
|2021-11-13 08:55:00|2021-11-13 08:40:00|0                 |N/A                 |X                |SBGL                  |SABE                 |ARG               |1260      |2021-11-13 06:00:00|2021-11-13 06:15:00|REALIZADO   |
+-------------------+-------------------+------------------+--------------------+-------

### View companhia

- Para cada companhia aérea trazer a rota mais utilizada com as seguintes informações:
    - Razão social da companhia aérea
    - Nome Aeroporto de Origem
    - ICAO do aeroporto de origem
    - Estado/UF do aeroporto de origem
    - Nome do Aeroporto de Destino
    - ICAO do Aeroporto de destino
    - Estado/UF do aeroporto de destino

Query construida:

In [68]:
query(
"""
select
    rank_cia.companhia as companhia,
    rank_cia.icao_origem as icao_origem,
    aero1.name as nome_origem,
    aero1.state as estado_origem,
    rank_cia.icao_destino as icao_destino,
    aero2.name as nome_destino,
    aero2.state as estado_destino
from
    (
        select
            air.razao_social as companhia,
            vra.icao_aerodromo_origem as icao_origem ,
            vra.icao_aerodromo_destino as icao_destino,
            count(*) as count
        from 
            silver_vra as vra
            LEFT JOIN silver_air_cia as air
                ON vra.icao_empresa_aerea = air.icao
        group by
            1,2,3
        order by
            count(*)  desc
    ) as rank_cia
    LEFT JOIN silver_aerodromo_api as aero1
        ON rank_cia.icao_origem = aero1.icao
    LEFT JOIN silver_aerodromo_api as aero2
        ON rank_cia.icao_destino = aero2.icao
"""
)

+---------------------------------------------------+-----------+-----------------------------------------------------------------------+-----------------+------------+-----------------------------------------------------------------------+-----------------+
|companhia                                          |icao_origem|nome_origem                                                            |estado_origem    |icao_destino|nome_destino                                                           |estado_destino   |
+---------------------------------------------------+-----------+-----------------------------------------------------------------------+-----------------+------------+-----------------------------------------------------------------------+-----------------+
|TAM LINHAS AÉREAS S.A.                             |SBSP       |São Paulo–Congonhas Airport                                            |São Paulo        |SBRJ        |Santos Dumont Airport                                  

> ### Por meio de windows functions é possível otmizar esta query.

### View Aeroporto

- Para cada aeroporto trazer a companhia aérea com maior atuação no ano com as seguintes informações:
    - Nome do Aeroporto
    - ICAO do Aeroporto
    - Razão social da Companhia Aérea
    - Quantidade de Rotas à partir daquele aeroporto
    - Quantidade de Rotas com destino àquele aeroporto
    - Quantidade total de pousos e decolagens naquele aeroporto

- Precisamos garantir a localização do icao origem com a do icao aeroporto para que realizemos a contagem de quantidade de Rotas à partir daquele aeroporto.
- Precisamos garantir a localização do icao destino com a do icao aeroporto para que realizemos a contagem de quantidade de Rotas com destino àquele aeroporto.
- A quantidade total de pousos e decolagens naquele aeroporto é a soma dos dois anteriores

Query construida:

In [128]:
query(
"""
SELECT
    origem.aeroporto,
    origem.icao,
    air.razao_social as companhia,
    origem.qtd_rotas_origem,
    destino.qtd_rotas_destino,
    (origem.qtd_rotas_origem + destino.qtd_rotas_destino) as qtd_pousos_decolagens
FROM
    (
        select
            aero.name as aeroporto,
            aero.icao as icao,
            vra1.icao_empresa_aerea as icao_empresa_aerea,
            count(vra1.icao_aerodromo_origem) as qtd_rotas_origem
        from 
            silver_aerodromo_api as aero
            LEFT JOIN silver_vra as vra1
                ON  vra1.icao_aerodromo_origem = aero.icao   
        group by
            1,2,3
        order by
            4
        desc
    ) as origem
    LEFT JOIN (
        select
            aero.name as aeroporto,
            aero.icao as icao,
            vra2.icao_empresa_aerea as icao_empresa_aerea,
            count(vra2.icao_aerodromo_destino) as qtd_rotas_destino
        from 
            silver_aerodromo_api as aero
            LEFT JOIN silver_vra as vra2
                ON  vra2.icao_aerodromo_destino = aero.icao    
        group by
            1,2,3
        order by
            4
        desc
    ) as destino
    ON (
        origem.aeroporto = destino.aeroporto and
        origem.icao = destino.icao and
        origem.icao_empresa_aerea = destino.icao_empresa_aerea
    )
    LEFT JOIN silver_air_cia as air
        ON origem.icao_empresa_aerea = air.icao
"""
)          

+-----------------------------------------------------------------------+----+---------------------------------------------------+----------------+-----------------+---------------------+
|aeroporto                                                              |icao|companhia                                          |qtd_rotas_origem|qtd_rotas_destino|qtd_pousos_decolagens|
+-----------------------------------------------------------------------+----+---------------------------------------------------+----------------+-----------------+---------------------+
|São Paulo–Guarulhos International Airport                              |SBGR|GOL LINHAS AÉREAS S.A. (EX- VRG LINHAS AÉREAS S.A.)|23225           |24322            |47547                |
|Salgado Filho International Airport                                    |SBPA|TAM LINHAS AÉREAS S.A.                             |4442            |4442             |8884                 |
|Santos Dumont Airport                                      

> ### A query apresenta redundância mas conseguimos reuzir o shufle de dados unificando em 1 join a query.