In [254]:
from pyspark.sql import SparkSession, functions as F

In [255]:
spark = SparkSession.builder.getOrCreate()

## Carregar dos dados de AIR_CIA

In [256]:
df_aircia = spark.read.csv('bronze/AIR_CIA/', sep=';', header=True)

In [257]:
df_aircia.show(vertical=True)

-RECORD 0----------------------------------------
 Razão Social             | ABSA - AEROLINHAS... 
 ICAO IATA                | LTG M3               
 CNPJ                     | 00.074.635/0001-33   
 Atividades Aéreas        | TRANSPORTE AÉREO ... 
 Endereço Sede            | AEROPORTO INTERNA... 
 Telefone                 | (11) 5582-8055       
 E-Mail                   | gar@tam.com.br       
 Decisão Operacional      | DECISÃO Nº 41        
 Data Decisão Operacional | 22/04/2015           
 Validade Operacional     | 23/04/2025           
-RECORD 1----------------------------------------
 Razão Social             | AEROSUL TÁXI AÉRE... 
 ICAO IATA                | ASO 2S               
 CNPJ                     | 27.315.694/0001-02   
 Atividades Aéreas        | SERVIÇOS AÉREOS P... 
 Endereço Sede            | RODOVIA PR 218, K... 
 Telefone                 | (43) 3176-4030       
 E-Mail                   | operacoes@aerosul... 
 Decisão Operacional      | DECISÃO Nº 282       


### Normalizar o cabeçalho para snake case

In [258]:
##CRIANDO DICIONARIO COM OS ACENTOS e o Ç ALEM DE CARACTERES ESPECIAIS (NECESSITA A ADICAO DE MAIS COLOQUEI APENAS O - PARA SERVIR DE EXEMPLO)
accented_characters = {
 'á': 'a', 'à': 'a', 'â': 'a', 'ã': 'a', 'ä': 'a',
    'é': 'e', 'è': 'e', 'ê': 'e', 'ë': 'e',
    'í': 'i', 'ì': 'i', 'î': 'i', 'ï': 'i',
    'ó': 'o', 'ò': 'o', 'ô': 'o', 'õ': 'o', 'ö': 'o',
    'ú': 'u', 'ù': 'u', 'û': 'u', 'ü': 'u',
    'ç': 'c','-':''
    }

##TRANSFORMANDO O DICIONARIO EM SUA VERSAO "UNICODE" PARA PODER USAR O METODO STR.TRANSLATE
translation_table = str.maketrans(accented_characters)
novas_colunas = []
for column in df_aircia.columns:
    novas_colunas.append(column.lower().translate(translation_table).replace(' ','_'))

## USANDO UM FOR PARA TROCAR TODOS OS NOMES ANTIGOS PELOS NOVOS NOMES
## O ZIP JUNTA AS DUAS LISTAS (A LISTA ORIGINAL DE COLUNAS COM A LISTA QUE EU CRIEI COM OS NOVOS NOMES)
## A FUNCAO WITHCOLUMNRENAMED TROCA OS NOME DAS COLUNAS (FUNCIONA COMO SE FOSSE UM SELECT COLUMN AS EXAMPLE)
for nome_antigo,nome_novo in zip(df_aircia.columns,novas_colunas):
    df_aircia = df_aircia.withColumnRenamed(nome_antigo,nome_novo)

In [259]:
df_aircia.show(vertical=True)

-RECORD 0----------------------------------------
 razao_social             | ABSA - AEROLINHAS... 
 icao_iata                | LTG M3               
 cnpj                     | 00.074.635/0001-33   
 atividades_aereas        | TRANSPORTE AÉREO ... 
 endereco_sede            | AEROPORTO INTERNA... 
 telefone                 | (11) 5582-8055       
 email                    | gar@tam.com.br       
 decisao_operacional      | DECISÃO Nº 41        
 data_decisao_operacional | 22/04/2015           
 validade_operacional     | 23/04/2025           
-RECORD 1----------------------------------------
 razao_social             | AEROSUL TÁXI AÉRE... 
 icao_iata                | ASO 2S               
 cnpj                     | 27.315.694/0001-02   
 atividades_aereas        | SERVIÇOS AÉREOS P... 
 endereco_sede            | RODOVIA PR 218, K... 
 telefone                 | (43) 3176-4030       
 email                    | operacoes@aerosul... 
 decisao_operacional      | DECISÃO Nº 282       


### Separar a coluna 'ICAO IATA' em duas colunas, seu conteúdo está separado por espaço e pode não conter o código IATA, caso não contenha o código IATA, deixe o valor nulo.

In [260]:
df_aircia = (
    df_aircia
    .withColumn('icao',F.split(F.col('icao_iata'),' ')[0])
    .withColumn('iata',F.split(F.col('icao_iata'),' ')[1])
    .drop(F.col('icao_iata'))
)

df_aircia.show(vertical=True)

-RECORD 0----------------------------------------
 razao_social             | ABSA - AEROLINHAS... 
 cnpj                     | 00.074.635/0001-33   
 atividades_aereas        | TRANSPORTE AÉREO ... 
 endereco_sede            | AEROPORTO INTERNA... 
 telefone                 | (11) 5582-8055       
 email                    | gar@tam.com.br       
 decisao_operacional      | DECISÃO Nº 41        
 data_decisao_operacional | 22/04/2015           
 validade_operacional     | 23/04/2025           
 icao                     | LTG                  
 iata                     | M3                   
-RECORD 1----------------------------------------
 razao_social             | AEROSUL TÁXI AÉRE... 
 cnpj                     | 27.315.694/0001-02   
 atividades_aereas        | SERVIÇOS AÉREOS P... 
 endereco_sede            | RODOVIA PR 218, K... 
 telefone                 | (43) 3176-4030       
 email                    | operacoes@aerosul... 
 decisao_operacional      | DECISÃO Nº 282       


### Salvar estes dados

In [279]:
df_aircia.coalesce(1).write.mode('overwrite').parquet('silver/AIR_CIA/aircia.parquet')

## Carrega os dados VRA

In [262]:
##PARA EVITAR O USO EXCESSIVO DE MEMORIA NO IFFERSCHEMA DO SPARK, ESTOU PEGANDO UM UNICO ARQUIVO E USANDO O SCHEMA DELE PARA TODOS
df_vra = spark.read.json('bronze/VRA/VRA_20213.json')
schema = df_vra.schema

## DESALOCO A VARIAVEL DF_VRA COMO BOA PRATICA POIS VOU REUSALO
del df_vra
df_vra = spark.read.json('bronze/VRA/',schema=schema)
df_vra.show(vertical=True)

-RECORD 0-----------------------------------
 ChegadaPrevista      | 2021-01-19 14:45:00 
 ChegadaReal          | 2021-01-19 19:22:00 
 CódigoAutorização    | 7                   
 CódigoJustificativa  | N/A                 
 CódigoTipoLinha      | C                   
 ICAOAeródromoDestino | SBEG                
 ICAOAeródromoOrigem  | SBBE                
 ICAOEmpresaAérea     | MWM                 
 NúmeroVoo            | 9702                
 PartidaPrevista      | 2021-01-19 12:35:00 
 PartidaReal          | 2021-01-19 17:17:00 
 SituaçãoVoo          | REALIZADO           
-RECORD 1-----------------------------------
 ChegadaPrevista      | 2021-01-26 12:05:00 
 ChegadaReal          | 2021-01-26 12:04:00 
 CódigoAutorização    | 7                   
 CódigoJustificativa  | N/A                 
 CódigoTipoLinha      | C                   
 ICAOAeródromoDestino | SBPA                
 ICAOAeródromoOrigem  | SBGL                
 ICAOEmpresaAérea     | MWM                 
 NúmeroVoo

### Normalizar o cabeçalho para snake case

In [263]:
import re

def pascal_to_snake(pascal_case_string):
    # Use regular expression to insert underscores before uppercase letters
    snake_case_string = re.sub(r'(?<=.)([A-Z])', r'_\1', pascal_case_string)
    # Convert the string to lowercase
    snake_case_string = snake_case_string.lower()
    return snake_case_string

# Example usage:
pascal_string = "PascalCaseString"
snake_string = pascal_to_snake(pascal_string)
print(snake_string)  # Output: "pascal_case_string"


pascal_case_string


In [264]:
 ##TRANSFORMANDO O DICIONARIO EM SUA VERSAO "UNICODE" PARA PODER USAR O METODO STR.TRANSLATE O ACCENTD JÁ FOI DEFINIDO ANTERIORMENTE
novas_colunas = []
for column in df_vra.columns:
#     novas_colunas.append(pascal_to_snake(column))
    novas_colunas.append(pascal_to_snake(column.replace('ICAO','Icao').translate(translation_table)))

## USANDO UM FOR PARA TROCAR TODOS OS NOMES ANTIGOS PELOS NOVOS NOMES
## O ZIP JUNTA AS DUAS LISTAS (A LISTA ORIGINAL DE COLUNAS COM A LISTA QUE EU CRIEI COM OS NOVOS NOMES)
## A FUNCAO WITHCOLUMNRENAMED TROCA OS NOME DAS COLUNAS (FUNCIONA COMO SE FOSSE UM SELECT COLUMN AS EXAMPLE)
for nome_antigo,nome_novo in zip(df_vra.columns,novas_colunas):
    df_vra = df_vra.withColumnRenamed(nome_antigo,nome_novo)

In [265]:
df_vra.show(vertical=True)

-RECORD 0-------------------------------------
 chegada_prevista       | 2021-01-19 14:45:00 
 chegada_real           | 2021-01-19 19:22:00 
 codigo_autorizacao     | 7                   
 codigo_justificativa   | N/A                 
 codigo_tipo_linha      | C                   
 icao_aerodromo_destino | SBEG                
 icao_aerodromo_origem  | SBBE                
 icao_empresa_aerea     | MWM                 
 numero_voo             | 9702                
 partida_prevista       | 2021-01-19 12:35:00 
 partida_real           | 2021-01-19 17:17:00 
 situacao_voo           | REALIZADO           
-RECORD 1-------------------------------------
 chegada_prevista       | 2021-01-26 12:05:00 
 chegada_real           | 2021-01-26 12:04:00 
 codigo_autorizacao     | 7                   
 codigo_justificativa   | N/A                 
 codigo_tipo_linha      | C                   
 icao_aerodromo_destino | SBPA                
 icao_aerodromo_origem  | SBGL                
 icao_empresa

### Salva os dados

In [280]:
df_vra.coalesce(1).write.mode('overwrite').parquet('silver/VRA/vra.parquet')

## Carrega os dados aerodromos

In [267]:
## TRANSFORMANDO DOIS SELECTS DISTINCT EM LISTA E JUNTANDO ELES EM UMA LISTA SÓ, LOGO EM SEGUIDA TRANSFORMO ELES EM UM CONJUNTO USNADO O SET() PARA REALZIAR UMA DEDUPLICAÇÂO ENTRE AS LISTAS DEPOIS TRANSFORNO O CONJUNTO NOVAMENTE EM UMA LISTA COM O LIST()
var_icao = list(
    set(
        [i['icao_aerodromo_destino'] for i in df_vra.select('icao_aerodromo_destino').distinct().collect()]
        +
        [i['icao_aerodromo_origem'] for i in df_vra.select('icao_aerodromo_origem').distinct().collect()]
    )
)

In [268]:
import requests

def get_airport_info(icao):
    url = "https://airport-info.p.rapidapi.com/airport"

    querystring = {"icao":icao}

    headers = {
        "X-RapidAPI-Key": "55207401d8mshd8f1d164058cbebp132c1ajsne2a24955c36e",
        "X-RapidAPI-Host": "airport-info.p.rapidapi.com"
    }

    response = requests.get(url, headers=headers, params=querystring)

    return(response.json())

In [269]:
airport_info = [get_airport_info(icao) for icao in var_icao]

In [270]:
###EXCLUDING ERRORS FROM API
airport_info = [i for i in airport_info if len(i)>1]

In [271]:
pd.DataFrame(airport_info).to_parquet('silver/aerodromos/aerodromos.parquet')

## Criação das views

In [314]:
### LEITURA DAS TABELAS
df_vra = spark.read.parquet('silver/VRA/vra.parquet')

df_aircia = spark.read.parquet('silver/AIR_CIA/aircia.parquet')

df_aerodromos = spark.read.parquet('silver/aerodromos/aerodromos.parquet')

1) Para cada companhia aérea trazer a rota mais utilizada com as seguintes informações:
- Razão social da companhia aérea - air cia
- Nome Aeroporto de Origem - aerodromos
- ICAO do aeroporto de origem - vra
- Estado/UF do aeroporto de origem - aerodromos
- Nome do Aeroporto de Destino - aerodromos
- ICAO do Aeroporto de destino - vra
- Estado/UF do aeroporto de destino - aerodromos


In [323]:
# CRIA AS VIEWS TEMPORÁRIAS

df_aircia.createTempView('AIR_CIA')
df_vra.createTempView('VRA')
df_aerodromos.createTempView('aerodromo')

True

In [394]:
df = spark.sql('''
    SELECT 
            RANKED_BASE.qtd_voos, 
            RANKED_BASE.razao_social,
            RANKED_BASE.icao_aerodromo_destino, 
            RANKED_BASE.icao_aerodromo_origem,
            AEROPORTO_ORIGEM.name as name_aeroporto_origem, 
            AEROPORTO_ORIGEM.state as state_aeroporto_origem, 
            AEROPORTO_DESTINO.name as name_aeroporto_destino, 
            AEROPORTO_DESTINO.state as state_aeroporto_destino
        FROM (
        SELECT 
        RANK() over(PARTITION BY BASE.razao_social order by qtd_voos DESC) as rank, *
        FROM (
            SELECT
                COUNT(*) AS qtd_voos,
                A.razao_social,
                V.icao_aerodromo_destino,
                V.icao_aerodromo_origem
            FROM AIR_CIA A
            INNER JOIN VRA V
            ON A.icao = V.icao_empresa_aerea
            GROUP BY
                A.razao_social,
                V.icao_empresa_aerea,
                V.icao_aerodromo_destino,
                V.icao_aerodromo_origem
            ORDER BY 1 DESC
        ) BASE
    )RANKED_BASE
   LEFT JOIN aerodromo AEROPORTO_DESTINO ON AEROPORTO_DESTINO.icao = RANKED_BASE.icao_aerodromo_destino
LEFT JOIN aerodromo AEROPORTO_ORIGEM ON AEROPORTO_ORIGEM.icao = RANKED_BASE.icao_aerodromo_origem
   where rank = 1
    
''').show(vertical=True, truncate=100)




-RECORD 0------------------------------------------------------------------------------------------
 qtd_voos                | 377                                                                     
 razao_social            | ABSA - AEROLINHAS BRASILEIRAS S.A.                                      
 icao_aerodromo_destino  | KMIA                                                                    
 icao_aerodromo_origem   | SEQM                                                                    
 name_aeroporto_origem   | Mariscal Sucre International Airport                                    
 state_aeroporto_origem  | Pichincha                                                               
 name_aeroporto_destino  | Miami International Airport                                             
 state_aeroporto_destino | Florida                                                                 
-RECORD 1------------------------------------------------------------------------------------------


In [399]:
spark.sql('''
WITH base_crua as (
    SELECT
        COUNT(*) AS qtd_voos,
        A.razao_social,
        V.icao_aerodromo_destino,
        V.icao_aerodromo_origem
    FROM AIR_CIA A
    INNER JOIN VRA V
    ON A.icao = V.icao_empresa_aerea
    GROUP BY
        A.razao_social,
        V.icao_empresa_aerea,
        V.icao_aerodromo_destino,
        V.icao_aerodromo_origem
),
ranked_base as (
    SELECT 
        RANK() over(PARTITION BY razao_social order by qtd_voos DESC) as rank, *
    FROM base_crua
),


select * from base_crua
''')

DataFrame[qtd_voos: bigint, razao_social: string, icao_aerodromo_destino: string, icao_aerodromo_origem: string]

2) Para cada aeroporto trazer a companhia aérea com maior atuação no ano com as seguintes informações:
- Nome do Aeroporto - aerodromos
- ICAO do Aeroporto - aerodromos
- Razão social da Companhia Aérea - air cia
- Quantidade de Rotas à partir daquele aeroporto - 
- Quantidade de Rotas com destino àquele aeroporto - 
- Quantidade total de pousos e decolagens naquele aeroporto - 

In [307]:

df_aircia.show(vertical=True)

-RECORD 0----------------------------------------
 razao_social             | ABSA - AEROLINHAS... 
 cnpj                     | 00.074.635/0001-33   
 atividades_aereas        | TRANSPORTE AÉREO ... 
 endereco_sede            | AEROPORTO INTERNA... 
 telefone                 | (11) 5582-8055       
 email                    | gar@tam.com.br       
 decisao_operacional      | DECISÃO Nº 41        
 data_decisao_operacional | 22/04/2015           
 validade_operacional     | 23/04/2025           
 icao                     | LTG                  
 iata                     | M3                   
-RECORD 1----------------------------------------
 razao_social             | AEROSUL TÁXI AÉRE... 
 cnpj                     | 27.315.694/0001-02   
 atividades_aereas        | SERVIÇOS AÉREOS P... 
 endereco_sede            | RODOVIA PR 218, K... 
 telefone                 | (43) 3176-4030       
 email                    | operacoes@aerosul... 
 decisao_operacional      | DECISÃO Nº 282       


In [300]:
df_vra.show(vertical=True)

-RECORD 0-------------------------------------
 chegada_prevista       | 2021-01-19 14:45:00 
 chegada_real           | 2021-01-19 19:22:00 
 codigo_autorizacao     | 7                   
 codigo_justificativa   | N/A                 
 codigo_tipo_linha      | C                   
 icao_aerodromo_destino | SBEG                
 icao_aerodromo_origem  | SBBE                
 icao_empresa_aerea     | MWM                 
 numero_voo             | 9702                
 partida_prevista       | 2021-01-19 12:35:00 
 partida_real           | 2021-01-19 17:17:00 
 situacao_voo           | REALIZADO           
-RECORD 1-------------------------------------
 chegada_prevista       | 2021-01-26 12:05:00 
 chegada_real           | 2021-01-26 12:04:00 
 codigo_autorizacao     | 7                   
 codigo_justificativa   | N/A                 
 codigo_tipo_linha      | C                   
 icao_aerodromo_destino | SBPA                
 icao_aerodromo_origem  | SBGL                
 icao_empresa

In [299]:
df_aerodromos.show(vertical=True)

-RECORD 0-----------------------------
 id            | 6811                 
 iata          | SJK                  
 icao          | SBSJ                 
 name          | Professor Urbano ... 
 location      | São José dos Camp... 
 street_number | 1941                 
 street        | Avenida Brigadeir... 
 city          |                      
 county        | São José dos Campos  
 state         | São Paulo            
 country_iso   | BR                   
 country       | Brazil               
 postal_code   | 12227-000            
 phone         | +55 12 3946-3000     
 latitude      | -23.225304           
 longitude     | -45.861576           
 uct           | -180                 
 website       | http://www.infrae... 
-RECORD 1-----------------------------
 id            | 7410                 
 iata          | TJL                  
 icao          | SBTG                 
 name          | Plínio Alarcom Ai... 
 location      | Três Lagoas, Mato... 
 street_number |         