In [None]:
import os
import json
import time

from utils import init_spark, remove_accents, to_snake_case, value_splitter
from airport_api import AirportApi

import pandas as pd

from pyspark.sql.functions import col, to_timestamp
from pyspark.sql.types import *

### Diretórios dos dados

In [None]:
relative_air_cia_dir = '../../AIR_CIA'
relative_vra_dir = '../../VRA'

target_processed_data_dir = './processed_data'

### Inicialização do PySpark

In [None]:
spark = init_spark()

In [None]:
processed_air_cia = raw_air_cia = spark.read.options(header = True, delimiter=';').csv(f'{relative_air_cia_dir}')
processed_vra = raw_vra = spark.read.json(f'{relative_vra_dir}')

### Mudar nomes de coluna para snake case

In [None]:
for column_name in raw_air_cia.columns:
    processed_air_cia = processed_air_cia.withColumnRenamed(column_name, to_snake_case(remove_accents(column_name)))

for column_name in raw_vra.columns:
    processed_vra = processed_vra.withColumnRenamed(column_name, to_snake_case(remove_accents(column_name)))

## Procedimentos VRA

### `icao_empresa_aerea` para `iata_empresa_aerea`
Os valores desta coluna, na verdade estão listando os códigos IATA, é possível verificar isso através da chamada de API, quando o mesmo código não existe no índice de ICAO.

In [None]:
processed_vra = processed_vra.withColumnRenamed('icao_empresa_aerea', 'iata_empresa_aerea')

### Colunas de timestamp para `DateType`

In [None]:
processed_vra = processed_vra.withColumn('partida_prevista', to_timestamp(col('partida_prevista')))\
    .withColumn('chegada_prevista', to_timestamp(col('chegada_prevista')))\
    .withColumn('partida_real', to_timestamp(col('partida_real')))\
    .withColumn('chegada_real', to_timestamp(col('chegada_real')))

In [None]:
processed_vra.write.format('parquet').mode('overwrite').save(f'{target_processed_data_dir}/vra/vra.parquet')

## Procedimentos AIR_CIA

### Separação de colunas `icao_iata`

In [None]:
processed_air_cia = processed_air_cia.withColumn('iata', value_splitter(col('icao_iata'))[0])\
    .withColumn('icao', value_splitter(col('icao_iata'))[1])\
        .drop(col('icao_iata'))

In [None]:
processed_air_cia.show()

In [None]:
processed_air_cia.write.format('parquet').mode('overwrite').save(f'{target_processed_data_dir}/air_cia/air_cia.parquet')

## Procedimentos aerodromos

In [None]:
processed_vra = spark.read.parquet(f'{target_processed_data_dir}/vra/vra.parquet')

### Coletar IATA e ICAO

In [None]:
key = "my_rapidapi_key" # Adiquirir chave para API em https://rapidapi.com/Active-api/api/airport-info
host = "airport-info.p.rapidapi.com"

api = AirportApi(key, host)

In [None]:
collected_icao = []
collected_iata = []

for row in processed_vra.select('icao_aerodromo_origem', 'icao_aerodromo_destino', 'iata_empresa_aerea').collect():
    if row['icao_aerodromo_origem'] not in collected_icao:
        collected_icao.append(row['icao_aerodromo_origem'])
    if row['icao_aerodromo_destino'] not in collected_icao:
        collected_icao.append(row['icao_aerodromo_destino'])

    if row['iata_empresa_aerea'] not in collected_iata:
        collected_iata.append(row['iata_empresa_aerea'])

In [None]:
icao_data = []

for icao in collected_icao:
    this_icao = api.get_airport_info_by_icao(icao)
    this_icao = json.loads(this_icao)

    if this_icao.get('error', None):
        this_icao['icao'] = icao
        
    icao_data.append(this_icao)

iata_data = []

for iata in collected_iata:
    this_iata = api.get_airport_info_by_iata(iata)
    this_iata = json.loads(this_iata)

    if this_iata.get('error', None):
        this_iata['iata'] = iata

    iata_data.append(this_iata)

In [None]:
icao_dataframe = pd.DataFrame(data = icao_data + iata_data)

In [None]:
filtered_airport = icao_dataframe[icao_dataframe.error != {'text': 'No airport found'}]
not_found_airport = icao_dataframe[icao_dataframe.error == {'text': 'No airport found'}]

filtered_airport['id'] = filtered_airport['id'].astype(int)
filtered_airport['longitude'] = filtered_airport['longitude'].astype(float)
filtered_airport['latitude'] = filtered_airport['latitude'].astype(float)

In [None]:
aerodromo_schema = StructType([
	StructField('id', IntegerType(), True),
	StructField('iata', StringType(), True),
	StructField('icao', StringType(), True),
	StructField('name', StringType(), True),
	StructField('location', StringType(), True),
	StructField('street_number', StringType(), True),
	StructField('street', StringType(), True),
	StructField('city', StringType(), True),
	StructField('county', StringType(), True),
	StructField('state', StringType(), True),
	StructField('country_iso', StringType(), True),
	StructField('country', StringType(), True),
	StructField('postal_code', StringType(), True),
	StructField('phone', StringType(), True),
	StructField('latitude', DoubleType(), True),
	StructField('longitude', DoubleType(), True),
	StructField('utc', IntegerType(), True),
	StructField('websiite', StringType(), True)
])

aerodromo = spark.createDataFrame(filtered_airport)
aerodromo = aerodromo.drop(col('error'))

In [None]:
aerodromo.write.format('parquet').mode('overwrite').save(f'{target_processed_data_dir}/aerodromo/aerodromo.parquet')

## Views

In [None]:
aerodromo = spark.read.parquet(f'{target_processed_data_dir}/aerodromo/aerodromo.parquet')
vra = spark.read.parquet(f'{target_processed_data_dir}/vra/vra.parquet')
air_cia = spark.read.parquet(f'{target_processed_data_dir}/air_cia/air_cia.parquet')

aerodromo.createOrReplaceTempView('aerodromo')
vra.createOrReplaceTempView('vra')
air_cia.createOrReplaceTempView('air_cia')

### Origem-destino, mais frequente por empresa aérea

In [None]:
origin_destiny_most_frequence_query = """
SELECT
    iata_empresa_aerea,
    ac.razao_social,
    oad.name as nome_origem,
    icao_aerodromo_origem as icao_origem,
    oad.state as estado_origem,
    dad.name as nome_destino,
    icao_aerodromo_destino as icao_destino,
    dad.state as estado_destino,
    num_voos
FROM (
    -- Rota mais frequente para cada empresa aérea
    SELECT
       iata_empresa_aerea,
       icao_aerodromo_origem,
       icao_aerodromo_destino,
       COUNT(CONCAT(icao_aerodromo_origem, '_', icao_aerodromo_destino)) AS num_voos,
       -- Falta remover trajetos empatados em numeros
       row_number() OVER (PARTITION BY iata_empresa_aerea ORDER BY count(*) desc) AS rank
    FROM vra
    GROUP BY iata_empresa_aerea, icao_aerodromo_origem, icao_aerodromo_destino
    ORDER BY num_voos desc
) AS vra_most
LEFT JOIN air_cia ac ON vra_most.iata_empresa_aerea = ac.iata
LEFT JOIN aerodromo oad ON vra_most.icao_aerodromo_origem = oad.icao
LEFT JOIN aerodromo dad ON vra_most.icao_aerodromo_destino = dad.icao
WHERE rank = 1
ORDER BY vra_most.num_voos DESC
"""

airline_origin_destiny_view = spark.sql(origin_destiny_most_frequence_query)
airline_origin_destiny_view.show(10, truncate = False)

### Contagem total de decolagens e pouso no Aeroporto com a empresa mais frequênte
Totaliza o número de decolagens e pousos realizados no aeroporto pela empresa aérea que mais atuou no mesmo.

Contabiliza o total de vezes que o aeroporto foi usado como origem ou destino pela empresa; é possível filtrar por voos realizados para totalizar o número de decolagens e pousos reais.

In [None]:
airport_to_airline_relation = """
SELECT
    ad.name as icao_name,
    ad_to_cia.icao as icao,
    ad_to_cia.iata_empresa_aerea as iata,
    ac.razao_social as iata_razao_social,
    o_count.num_origem as origem_total,
    d_count.num_destino as destino_total,
    ad_to_cia.num_cia as voos_total
FROM (
    -- Agrupamento e contagem de relação com empresa aérea e aeroporto
    SELECT
        ad.icao,
        v.iata_empresa_aerea,
        COUNT(v.iata_empresa_aerea) as num_cia,
        row_number() OVER (PARTITION BY ad.icao ORDER BY count(*) desc) AS rank
    FROM aerodromo ad
    LEFT JOIN vra v ON v.icao_aerodromo_origem = ad.icao OR v.icao_aerodromo_destino = ad.icao
    WHERE v.situacao_voo = "REALIZADO"
    GROUP BY ad.icao, v.iata_empresa_aerea
) AS ad_to_cia
LEFT JOIN (
    -- Contagem de total de voos como origem da empresa no aeroporto
    SELECT
        v.icao_aerodromo_origem as icao_origem,
        v.iata_empresa_aerea as iata_origem,
        COUNT(v.iata_empresa_aerea) as num_origem
    FROM vra v
    -- WHERE v.situacao_voo = "REALIZADO" -- Filtra rotas efetivamente realizadas, ao comentar, totaliza rotas registradas
    GROUP BY v.icao_aerodromo_origem, v.iata_empresa_aerea
) as o_count
ON o_count.icao_origem = ad_to_cia.icao and o_count.iata_origem = ad_to_cia.iata_empresa_aerea
LEFT JOIN (
    -- Contagem de total de voos como destino da empresa no aeroporto
    SELECT
        v.icao_aerodromo_destino as icao_destino,
        v.iata_empresa_aerea as iata_destino,
        COUNT(v.iata_empresa_aerea) as num_destino
    FROM vra v
    -- WHERE v.situacao_voo = "REALIZADO" -- Filtra rotas efetivamente realizadas, ao comentar, totaliza rotas registradas
    GROUP BY v.icao_aerodromo_destino, v.iata_empresa_aerea
) as d_count
ON d_count.icao_destino = ad_to_cia.icao and d_count.iata_destino = ad_to_cia.iata_empresa_aerea
LEFT JOIN aerodromo ad ON ad.icao = ad_to_cia.icao
LEFT JOIN air_cia ac ON ac.iata = ad_to_cia.iata_empresa_aerea

WHERE ad_to_cia.rank = 1
ORDER BY ad_to_cia.num_cia DESC
"""

airport_airline_view = spark.sql(airport_to_airline_relation)
airport_airline_view.show(10, truncate = False)

In [None]:
airline_origin_destiny_view.write.format('parquet').mode('overwrite').save(f'{target_processed_data_dir}/airline_origin_destiny_view/airline_origin_destiny_view.parquet')
airport_airline_view.write.format('parquet').mode('overwrite').save(f'{target_processed_data_dir}/airport_airline_view/airport_airline_view.parquet')