<a href="https://colab.research.google.com/github/imarkes/BigDataAirlines/blob/main/BigDataAirlines.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark


Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
import pyspark
import re
import requests
import unicodedata
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql import udf
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType, DateType, TimestampType, \
    IntegerType


In [3]:
spark = (SparkSession
         .builder
         .master('local[*]')
         .appName('BigData Airlines')
         .getOrCreate())

# BigData Airlines
A Eleflow irá atender um novo cliente, a BigData Airlines, e você será o engenheiro de dados responsável por fazer a ingestão de dados e preparar algumas tabelas para os cientistas de dados e analistas de dados.

##Carregar os dados de VRA
* Normalizar o cabeçalho para snake case
Salvar estes dados

In [4]:
# Carregamento dos dados de VRA
vra_path = '/content/spark-warehouse/VRA/VRA_*.json'

df_vra = (
    spark
    .read
    .option('inferSchema', 'true')
    .json(vra_path)
)

df_vra.printSchema()
print('total de registros: ', df_vra.count())

root
 |-- ChegadaPrevista: string (nullable = true)
 |-- ChegadaReal: string (nullable = true)
 |-- CódigoAutorização: string (nullable = true)
 |-- CódigoJustificativa: string (nullable = true)
 |-- CódigoTipoLinha: string (nullable = true)
 |-- ICAOAeródromoDestino: string (nullable = true)
 |-- ICAOAeródromoOrigem: string (nullable = true)
 |-- ICAOEmpresaAérea: string (nullable = true)
 |-- NúmeroVoo: string (nullable = true)
 |-- PartidaPrevista: string (nullable = true)
 |-- PartidaReal: string (nullable = true)
 |-- SituaçãoVoo: string (nullable = true)

total de registros:  535803


In [5]:
# Normalizar o cabeçalho VRA
def normalizer_headers_vra(words: str) -> str:
    """
    :param: ChegadaPrevista
    :return: chegada_prevista
    """
    assert isinstance(words, str)

    old_words = ''.join(ch for ch in unicodedata.normalize('NFKD', words)
    if not unicodedata.combining(ch))

    words = re.findall('[A-Z]*[^A-Z]*', old_words)  # split letras maiúsculas
    
    words = [x.lower() for x in words]

    return '_'.join(words)[:-1]


rename_cols = {col_name: normalizer_headers_vra(col_name) for col_name in df_vra.columns}
df_vra = df_vra.select([when(col(c) == "", None).otherwise(col(c)).alias(rename_cols.get(c, c)) for c in df_vra.columns])



In [6]:
# Visualiza cabeçalhos normalizados
df_vra.show(5)

+-------------------+-------------------+------------------+--------------------+-----------------+---------------------+--------------------+-----------------+----------+-------------------+-------------------+------------+
|   chegada_prevista|       chegada_real|codigo_autorizacao|codigo_justificativa|codigo_tipo_linha|icaoaerodromo_destino|icaoaerodromo_origem|icaoempresa_aerea|numero_voo|   partida_prevista|       partida_real|situacao_voo|
+-------------------+-------------------+------------------+--------------------+-----------------+---------------------+--------------------+-----------------+----------+-------------------+-------------------+------------+
|2021-11-12 08:30:00|2021-11-12 08:24:00|                 0|                 N/A|                X|                 KORD|                SBGR|              UAL|      0844|2021-11-11 22:00:00|2021-11-11 22:14:00|   REALIZADO|
|2021-11-15 08:30:00|2021-11-15 08:05:00|                 0|                 N/A|                X| 

## Carregar dos dados de AIR_CIA
* Normalizar o cabeçalho para snake case
* Separar a coluna 'ICAO IATA' em duas colunas, seu conteúdo está separado por espaço e pode não conter o código IATA, caso não contenha o código IATA, deixe o valor nulo.
* Salvar estes dados

In [7]:
#Carregamento dos dados de AIR_CIA
AIE_CIA_PATH = '/content/spark-warehouse/AIR_CIA/ANAC_20211220_*.csv'

df_air_cia = (
    spark
    .read
    .option('header', 'true')
    .option('inferSchema', 'true')
    .option('sep', ';')
    .csv(AIE_CIA_PATH)
)

df_air_cia.printSchema()
print('total de registros: ', df_air_cia.count())

root
 |-- Razão Social: string (nullable = true)
 |-- ICAO IATA: string (nullable = true)
 |-- CNPJ: string (nullable = true)
 |-- Atividades Aéreas: string (nullable = true)
 |-- Endereço Sede: string (nullable = true)
 |-- Telefone: string (nullable = true)
 |-- E-Mail: string (nullable = true)
 |-- Decisão Operacional: string (nullable = true)
 |-- Data Decisão Operacional: string (nullable = true)
 |-- Validade Operacional: string (nullable = true)

total de registros:  20


In [8]:
# Normalizar o cabeçalho AIR_CIA
def normalizer_headers(words: str) -> str:
    """
    :param: Atividades Aéreas
    :return: atividades_aereas
    """
    assert isinstance(words, str)

    old_words = ''.join(old for old in unicodedata.normalize('NFKD', words)
    if not unicodedata.combining(old))
    return old_words.replace(' ', '_').replace('-','_').lower()

rename_cols = {col_name: normalizer_headers(col_name) for col_name in df_air_cia.columns}
df_air_cia = df_air_cia.select([when(col(c) == "", None).otherwise(col(c)).alias(rename_cols.get(c, c)) for c in df_air_cia.columns])

# Visualiza cabeçalhos normalizados
print(df_air_cia.columns)

['razao_social', 'icao_iata', 'cnpj', 'atividades_aereas', 'endereco_sede', 'telefone', 'e_mail', 'decisao_operacional', 'data_decisao_operacional', 'validade_operacional']


In [9]:
# Separar a coluna 'ICAO IATA'
df_air_cia = df_air_cia.withColumn('icao', substring('icao_iata', 0,3))\
                        .withColumn('iata', when(col('icao_iata') == '', None).otherwise(substring('icao_iata', 4,6)))\
                        .drop(col('icao_iata'))
df_air_cia.show(5)

+--------------------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------------+--------------------+----+----+
|        razao_social|              cnpj|   atividades_aereas|       endereco_sede|            telefone|              e_mail| decisao_operacional|data_decisao_operacional|validade_operacional|icao|iata|
+--------------------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------------+--------------------+----+----+
|ABSA - AEROLINHAS...|00.074.635/0001-33|TRANSPORTE AÉREO ...|AEROPORTO INTERNA...|      (11) 5582-8055|      gar@tam.com.br|       DECISÃO Nº 41|              22/04/2015|          23/04/2025| LTG|  M3|
|AEROSUL TÁXI AÉRE...|27.315.694/0001-02|SERVIÇOS AÉREOS P...|RODOVIA PR 218, K...|      (43) 3176-4030|operacoes@aerosul...|      DECISÃO Nº 282|              10/02/2021|                n

## Criar nova tabela aerodromos
* Através da API https://rapidapi.com/Active-api/api/airport-info/ trazer os aeródramos através do código ICAO presente nos dados de VRA.
* Salvar estes dados

In [10]:
# Tabela aerodromos baseados no codgos ICAO VRA
def get_data_api(codgos):
    """Get data URL."""
    try:
        url = "https://airport-info.p.rapidapi.com/airport"
        headers = {
            'x-rapidapi-host': "airport-info.p.rapidapi.com",
            'x-rapidapi-key': "4e66b5eee7mshe07cecc516f5f82p1d7870jsnece9b9399d9e"
        }
        response = []
        for cod in codgos:
            querystring = {"icao": cod}
            resp = requests.request("GET", url, headers=headers, params=querystring).json()

            # valida o codgo icao.
            if not 'error' in resp:

                # trata o name
                resp['name'] = str(resp['name']).replace("'", '')

                # trata street
                resp['street'] = str(resp['street']).replace("'", '')
                
                response.append(resp)
       
        return response

    except Exception as e:
        print(f'Não foi possivel obter os dados da requisição HTTP.. Erro: {e}')



In [11]:
# codgos ICAO distintos
filter_icao_vra_origem = df_vra.select('icaoaerodromo_origem').distinct()
filter_icao_vra_destino = df_vra.select('icaoaerodromo_destino').distinct()

# lista de codgos
icao_vra_origem=filter_icao_vra_origem.collect()
icao_vra_destino=filter_icao_vra_destino.collect()

# Info aerodromos
aerodromos_origem = get_data_api(icao_vra_origem)

aerodromos_destino = get_data_api(icao_vra_destino)


In [12]:
# Tratando os dados
def cast_col(data,ncol=None):
    cast = [c[ncol] if c[ncol] != 0 else 0.0 for c in data]
    for i in data:
        i[ncol] = cast
    return data

aerodromos_origem = cast_col(aerodromos_origem, 'latitude')
aerodromos_origem = cast_col(aerodromos_origem, 'longitude')
aerodromos_destino = cast_col(aerodromos_destino, 'latitude')
aerodromos_destino = cast_col(aerodromos_destino, 'longitude')


In [13]:
# Salvando os dataframes
df_aerodromos_origem = spark.createDataFrame(aerodromos_origem)
df_aerodromos_origem.show(5)

+----------------+-------------+-----------+-------------------+----+----+----+--------------------+--------------------+--------------------+--------------------+-----------------+-----------+-------------+--------------------+-------------+----+--------------------+
|            city|      country|country_iso|             county|iata|icao|  id|            latitude|            location|           longitude|                name|            phone|postal_code|        state|              street|street_number| uct|             website|
+----------------+-------------+-----------+-------------------+----+----+----+--------------------+--------------------+--------------------+--------------------+-----------------+-----------+-------------+--------------------+-------------+----+--------------------+
|Roissy-en-France|       France|         FR|                   | CDG|LFPG|1213|[49.00969, -16.43...|Paris, Île-de-Fra...|[2.5479245, -39.0...|Charles de Gaulle...|+33 1 70 36 39 50|      95700|

In [14]:
df_aerodromos_destino = spark.createDataFrame(aerodromos_destino)
df_aerodromos_destino.show(5)

+----------------+-------------+-----------+-------------------+----+----+----+--------------------+--------------------+--------------------+--------------------+-----------------+-----------+-------------+--------------------+-------------+----+--------------------+
|            city|      country|country_iso|             county|iata|icao|  id|            latitude|            location|           longitude|                name|            phone|postal_code|        state|              street|street_number| uct|             website|
+----------------+-------------+-----------+-------------------+----+----+----+--------------------+--------------------+--------------------+--------------------+-----------------+-----------+-------------+--------------------+-------------+----+--------------------+
|Roissy-en-France|       France|         FR|                   | CDG|LFPG|1213|[49.00969, -16.43...|Paris, Île-de-Fra...|[2.5479245, -39.0...|Charles de Gaulle...|+33 1 70 36 39 50|      95700|

#Criar as seguintes views (Priorize o uso de SQL para esta parte):
Para cada companhia aérea trazer a rota mais utilizada com as seguintes informações:
* Razão social da companhia aérea
* Nome Aeroporto de Origem
* ICAO do aeroporto de origem
* Estado/UF do aeroporto de origem
* Nome do Aeroporto de Destino
* ICAO do Aeroporto de destino
* Estado/UF do aeroporto de destino

In [15]:
# Views
df_air_cia.createOrReplaceTempView("empresa")
df_aerodromos_origem.createOrReplaceTempView("origem")
df_aerodromos_destino.createOrReplaceTempView("destino")
df_vra.createOrReplaceTempView('voos')

In [16]:
str_query = """select
                emp.razao_social,
                org.name name_origem,
                org.icao icao_origem, 
                org.state state_origem, 
                org.country_iso uf_origem,
                dest.name name_dest,
                dest.icao icao_dest,
                dest.state state_dest,
                dest.country_iso uf_dest,
                count(concat(vo.icaoaerodromo_origem,"-",vo.icaoaerodromo_destino)) as total_voos
                from voos vo
                left join empresa emp on emp.icao = vo.icaoempresa_aerea
                left join origem org on vo.icaoaerodromo_origem = org.icao
                left join destino dest on vo.icaoaerodromo_destino = dest.icao
                group by 1,2,3,4,5,6,7,8,9
                order by total_voos desc"""


# Quantidade de voos por Empresa.                       
contagem_voos = spark.sql(str_query).show(5)
                                

+--------------------+--------------------+-----------+--------------+---------+--------------------+---------+--------------+-------+----------+
|        razao_social|         name_origem|icao_origem|  state_origem|uf_origem|           name_dest|icao_dest|    state_dest|uf_dest|total_voos|
+--------------------+--------------------+-----------+--------------+---------+--------------------+---------+--------------+-------+----------+
|TAM LINHAS AÉREAS...|São Paulo–Congonh...|       SBSP|     São Paulo|       BR|Santos Dumont Air...|     SBRJ|Rio de Janeiro|     BR|      2744|
|TAM LINHAS AÉREAS...|Santos Dumont Air...|       SBRJ|Rio de Janeiro|       BR|São Paulo–Congonh...|     SBSP|     São Paulo|     BR|      2741|
|AZUL LINHAS AÉREA...|Viracopos/Campina...|       SBKP|     São Paulo|       BR|Santos Dumont Air...|     SBRJ|Rio de Janeiro|     BR|      2540|
|AZUL LINHAS AÉREA...|Santos Dumont Air...|       SBRJ|Rio de Janeiro|       BR|Viracopos/Campina...|     SBKP|     São Paul

Para cada aeroporto trazer a companhia aérea com maior atuação no ano com as seguintes informações:
* Nome do Aeroporto
* ICAO do Aeroporto
* Razão social da Companhia Aérea
* Quantidade de Rotas à partir daquele aeroporto
* Quantidade de Rotas com destino àquele aeroporto
* Quantidade total de pousos e decolagens naquele aeroporto

In [37]:
#quantas empresas vooaram nesse aeroporto durante o ano.
str_query = """select
                org.name name_origem,
                date_part('year',to_date(partida_real)) as ano,
                emp.razao_social,
                count(vo.icaoempresa_aerea) total_voos
                from voos vo
                left join origem org on vo.icaoaerodromo_origem = org.icao
                left join empresa emp on emp.icao = vo.icaoempresa_aerea
                group by 1,2,3 order by total_voos desc
                """

df_contagem_voos = spark.sql(str_query)
df_contagem_voos.show(5)

+--------------------+----+--------------------+----------+
|         name_origem| ano|        razao_social|total_voos|
+--------------------+----+--------------------+----------+
|Viracopos/Campina...|2021|AZUL LINHAS AÉREA...|     40758|
|São Paulo–Guarulh...|2021|TAM LINHAS AÉREAS...|     32998|
|São Paulo–Guarulh...|2021|GOL LINHAS AÉREAS...|     22731|
|Tancredo Neves In...|2021|AZUL LINHAS AÉREA...|     20732|
|Recife/Guararapes...|2021|AZUL LINHAS AÉREA...|     20213|
+--------------------+----+--------------------+----------+
only showing top 5 rows



In [38]:
str_query = """
                select icao,
                count(distinct 
                    case when tipo = 'origem' then numero_voo else null end) as qtd_voo_origem,
                count(distinct
                    case when tipo = 'destino' then numero_voo else null end) as qtd_voo_destino
                from 
                (select vo.numero_voo, vo.icaoaerodromo_origem as icao,'origem' as tipo
                from voos vo
                inner join
                (select
                vo.numero_voo,
                min(partida_real) min_partida_real
                from voos vo
                group by numero_voo) as pr
                on vo.numero_voo = pr.numero_voo
                and vo.partida_real = pr.min_partida_real
                union
                select vo.numero_voo, vo.icaoaerodromo_destino as icao , 'destino' as tipo 
                from voos vo
                inner join
                (select
                vo.numero_voo,
                max(chegada_real) max_chegada_real
                from voos vo
                group by numero_voo) as cr
                on vo.numero_voo = cr.numero_voo
                and vo.chegada_real = cr.max_chegada_real)
                group by icao
                
                """

                       
df_contagem_rotas = spark.sql(str_query)
df_contagem_rotas.show(5)

+----+--------------+---------------+
|icao|qtd_voo_origem|qtd_voo_destino|
+----+--------------+---------------+
|SBPS|            81|             71|
|KATL|             2|              3|
|LFPG|             7|              7|
|SBIH|             2|              1|
|EDDK|             1|              0|
+----+--------------+---------------+
only showing top 5 rows



In [39]:
str_query = """ select icao, 
                sum(qtd_pousos) as qtd_pousos,
                sum(qtd_decolagem) as qtd_decolagem
                from
                (select
                vo.icaoaerodromo_destino as icao,
                0 as qtd_decolagem,
                count(distinct vo.numero_voo) as qtd_pousos
                from voos vo
                group by 1
                union
                select
                vo.icaoaerodromo_origem as icao,
                count(distinct vo.numero_voo) as qtd_decolagem,
                0 as qtd_pousos
                from voos vo
                group by 1) voos
                group by icao
                """

                       
df_contagem_pousos = spark.sql(str_query)
df_contagem_pousos.show(5)

+----+----------+-------------+
|icao|qtd_pousos|qtd_decolagem|
+----+----------+-------------+
|LYBE|         1|            0|
|LFPG|        10|           11|
|SBPS|       184|          182|
|SBBU|         1|            0|
|KATL|         4|            2|
+----+----------+-------------+
only showing top 5 rows



#Extras:
* Descrever qual estratégia você usaria para ingerir estes dados de forma incremental caso precise capturar esses dados a cada mes?
    -  Utilizaria o aiflow para schedular todo o processo.
* Justifique em cada etapa sobre a escalabilidade da tecnologia utilizada.
    - Utilizaria pyspark e python para o processamento e ETL dos dados, aws s3 para armazenamento dos dados brutos, aws redshift para disponibilizar os dados. 
* Justifique as camadas utilizadas durante o processo de ingestão até a disponibilização dos dados.
    - A camadas seria: a coleta/processamento, mensageria, land zone, raw zone, trusted zone, refined zone.