<a href="https://colab.research.google.com/github/AfonsoFeliciano/30-day-challenges-with-sql/blob/main/Desafio_Eleflow_BigData_Airlines_Solution.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

<img src="https://cdn.eleflow.com.br/ef-web/wp-content/uploads/2016/08/21181642/Eleflow.png" alt="Eleflow BigData" width="200"/>

# Data engineering capstone

## BigData Airlines

A Eleflow irá atender um novo cliente, a _BigData Airlines_, e você será o engenheiro de dados responsável por fazer a ingestão de dados e preparar algumas tabelas para os cientistas de dados e analistas de dados. 

### Capstone

- Carregar os dados de VRA
  - Normalizar o cabeçalho para snake case
  - Salvar estes dados
- Carregar dos dados de AIR_CIA
  - Normalizar o cabeçalho para snake case
  - Separar a coluna 'ICAO IATA' em duas colunas, seu conteúdo está separado por espaço e pode não conter o código IATA, caso não contenha o código IATA, deixe o valor nulo.
  - Salvar estes dados
- Criar nova tabela aerodromos
  - Através da API [https://rapidapi.com/Active-api/api/airport-info/]() trazer os aeródramos através do código ICAO presente nos dados de VRA.
  - Salvar estes dados
- Criar as seguintes views (Priorize o uso de SQL para esta parte):
  - Para cada companhia aérea trazer a rota mais utilizada com as seguintes informações:
    - Razão social da companhia aérea
    - Nome Aeroporto de Origem
    - ICAO do aeroporto de origem
    - Estado/UF do aeroporto de origem
    - Nome do Aeroporto de Destino
    - ICAO do Aeroporto de destino
    - Estado/UF do aeroporto de destino
  - Para cada aeroporto trazer a companhia aérea com maior atuação no ano com as seguintes informações:
    - Nome do Aeroporto
    - ICAO do Aeroporto
    - Razão social da Companhia Aérea
    - Quantidade de Rotas à partir daquele aeroporto
    - Quantidade de Rotas com destino àquele aeroporto
    - Quantidade total de pousos e decolagens naquele aeroporto

#### Extras:
  - Descrever qual estratégia você usaria para ingerir estes dados de forma incremental caso precise capturar esses dados a cada mes?
  - Justifique em cada etapa sobre a escalabilidade da tecnologia utilizada.
  - Justifique as camadas utilizadas durante o processo de ingestão até a disponibilização dos dados.

#### Observações:
   - Você pode utilizar a tecnologia de sua preferência ou seguir a recomendação:
     - Notebooks Jupyter
     - Google Colab
     - Databricks Community
   - Pode incluir comentários sobre a abordagem de extração/transformação que você está fazendo
   - Pode disponibilizar o projeto via Git, URL ou .zip

<br>

**Candidato:** Afonso de Paula Feliciano

**Tech Recruiter:** Nathália Pires

## Configurando variável com a data e hora inicial

Essa configuração foi utilizada para quantificar o tempo de execução do código desde a configuração do ambiente spark, Extração, Transformação e Escrita dos Dados.

In [1]:
from datetime import datetime
started = datetime.now()

In [2]:
## Configurando ambiente Spark

In [3]:
%%bash

# Instalação java
apt-get update && apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Instalação PySpark
pip install -q pyspark

Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Hit:3 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:4 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Get:5 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Get:6 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Ign:7 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:8 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:9 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
Hit:10 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Hit:11 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Hit:12 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu bionic InRelease
Get:13 http://security.ubuntu.c

In [4]:
import os
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

## Realizando o mount do google drive no google colab para realização da leitura e escrita dos arquivos quando necessário.

In [5]:
# Carregando biblioteca do google drive
from google.colab import drive

# Realizando mount e autenticando via e-mail
drive.mount('/content/drive')

Mounted at /content/drive


## Definindo as bibliotecas de apoio durante a execução do projeto

In [230]:
import shutil
from pyspark.sql.types import StructType, StructField, StringType, DateType, IntegerType, ArrayType, LongType, DoubleType
from pyspark.sql.functions import split, col, element_at, to_date, to_timestamp, when, lit, length, trim
import requests
import time
sc = spark.sparkContext

## Definindo as variáveis gerais para utilização no projeto

In [7]:
#Variáveis gerais
link_rapidi_api = "https://rapidapi.com/Active-api/api/airport-info/"
path_mounted_google_drive = "/content/drive/MyDrive/Eleflow_Data_Engineer/"
path_transient = "/content/transient/"
path_raw = "/content/raw/"
path_trusted = "/content/trusted/"
path_refined = "/content/refined/"

## Copiando os arquivos com mount em google drive para diretório de transição. 

O objetivo dessa operação é simular uma ingestão de dados/staging area no qual os dados são movidos para uma camada de transição na qual são realizados os tratamentos iniciais evitando qualquer tipo de complicações em um ambiente transacional produtivo. 

### AIRCIA

In [8]:
#Limpa o diretório
!rm -rf '/content/transient/AIR_CIA'

#Realiza a copia e lista os arquivos
if not os.path.exists(path_transient + "AIR_CIA"):
  shutil.copytree(path_mounted_google_drive + "AIR_CIA", path_transient + "AIR_CIA")
  !ls '/content/transient/AIR_CIA'

ANAC_20211220_203627.csv  ANAC_20211220_203643.csv  ANAC_20211220_203733.csv


### VRA

In [9]:
#Limpa o diretório
!rm -rf '/content/transient/VRA'

#Realiza a copia e lista os arquivos
if not os.path.exists(path_transient + "VRA"):
  shutil.copytree(path_mounted_google_drive + "VRA", path_transient + "VRA")
  !ls '/content/transient/VRA'

VRA_202110.json  VRA_20212.json  VRA_20215.json  VRA_20218.json
VRA_202111.json  VRA_20213.json  VRA_20216.json  VRA_20219.json
VRA_20211.json	 VRA_20214.json  VRA_20217.json


**Observação:** No bloco de códigos acima, foi utilizado o comando 

```
!rm -rf /caminho a ser removido
```

Combinanado com uma checagem via if 

```
if not os.path.exists(caminho a ser checado)
```

Visto que alguns testes foram realizados com o parâmetro **dirs_exist_ok** no qual aparenta possuir instabilidade visto que não está checando corretamente o diretório e permitir um modo overwrite ao realizar as cópias. 

## Buscando os arquivos na camada transient e salvando na camada raw 

Nesta etapa os arquivos serão salvos na camada raw em modo parquet com o objetivo de compressão, otimizando o espaço em disco. Após isso, os diretórios na camada transient serão removidos. 



Nesse momento, nenhum tratamento referente a tipagem de dados ou enriquecimento será realizado. 

### AIR_CIA

In [10]:
#Realizando a sua leitura
df_air_cia = spark.read.csv(path_transient + "AIR_CIA", sep=";", multiLine=True, header=True)

In [11]:
#Exibindo o seu conteúdo
df_air_cia.show()

+--------------------+---------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------------+--------------------+
|        Razão Social|ICAO IATA|              CNPJ|   Atividades Aéreas|       Endereço Sede|            Telefone|              E-Mail| Decisão Operacional|Data Decisão Operacional|Validade Operacional|
+--------------------+---------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------------+--------------------+
|ABSA - AEROLINHAS...|   LTG M3|00.074.635/0001-33|TRANSPORTE AÉREO ...|AEROPORTO INTERNA...|      (11) 5582-8055|      gar@tam.com.br|       DECISÃO Nº 41|              22/04/2015|          23/04/2025|
|AEROSUL TÁXI AÉRE...|   ASO 2S|27.315.694/0001-02|SERVIÇOS AÉREOS P...|RODOVIA PR 218, K...|      (43) 3176-4030|operacoes@aerosul...|      DECISÃO Nº 282|              10/02/2021|       

In [12]:
#Exibindo o seu schema
df_air_cia.printSchema()

root
 |-- Razão Social: string (nullable = true)
 |-- ICAO IATA: string (nullable = true)
 |-- CNPJ: string (nullable = true)
 |-- Atividades Aéreas: string (nullable = true)
 |-- Endereço Sede: string (nullable = true)
 |-- Telefone: string (nullable = true)
 |-- E-Mail: string (nullable = true)
 |-- Decisão Operacional: string (nullable = true)
 |-- Data Decisão Operacional: string (nullable = true)
 |-- Validade Operacional: string (nullable = true)



In [13]:
#Gravando o seu conteúdo em parquet na camada raw
df_air_cia.write.mode('overwrite').parquet(path_raw + "AIR_CIA/")

In [14]:
#Listando o diretório da camada raw
!ls '/content/raw/AIR_CIA'

part-00000-f37b992f-564d-478a-86e1-bf5249796209-c000.snappy.parquet  _SUCCESS
part-00001-f37b992f-564d-478a-86e1-bf5249796209-c000.snappy.parquet


### VRA

In [15]:
#Realizando a sua leitura
df_vra = spark.read.json(path_transient + "VRA", multiLine=True)

In [16]:
#Exibindo os dados
df_vra.show()

+-------------------+-------------------+-----------------+-------------------+---------------+--------------------+-------------------+----------------+---------+-------------------+-------------------+-----------+
|    ChegadaPrevista|        ChegadaReal|CódigoAutorização|CódigoJustificativa|CódigoTipoLinha|ICAOAeródromoDestino|ICAOAeródromoOrigem|ICAOEmpresaAérea|NúmeroVoo|    PartidaPrevista|        PartidaReal|SituaçãoVoo|
+-------------------+-------------------+-----------------+-------------------+---------------+--------------------+-------------------+----------------+---------+-------------------+-------------------+-----------+
|2021-11-12 08:30:00|2021-11-12 08:24:00|                0|                N/A|              X|                KORD|               SBGR|             UAL|     0844|2021-11-11 22:00:00|2021-11-11 22:14:00|  REALIZADO|
|2021-11-15 08:30:00|2021-11-15 08:05:00|                0|                N/A|              X|                KORD|               SBGR|

In [17]:
#Visualizando o schema
df_vra.printSchema()

root
 |-- ChegadaPrevista: string (nullable = true)
 |-- ChegadaReal: string (nullable = true)
 |-- CódigoAutorização: string (nullable = true)
 |-- CódigoJustificativa: string (nullable = true)
 |-- CódigoTipoLinha: string (nullable = true)
 |-- ICAOAeródromoDestino: string (nullable = true)
 |-- ICAOAeródromoOrigem: string (nullable = true)
 |-- ICAOEmpresaAérea: string (nullable = true)
 |-- NúmeroVoo: string (nullable = true)
 |-- PartidaPrevista: string (nullable = true)
 |-- PartidaReal: string (nullable = true)
 |-- SituaçãoVoo: string (nullable = true)



In [18]:
#Gravando o seu conteúdo em parquet na camada raw
df_vra.write.mode('overwrite').parquet(path_raw + "VRA/")

In [19]:
#Listando o diretório da camada raw
!ls '/content/raw/VRA/'

part-00000-0da7eaad-955c-4d86-a459-3e68e19850a7-c000.snappy.parquet
part-00001-0da7eaad-955c-4d86-a459-3e68e19850a7-c000.snappy.parquet
part-00002-0da7eaad-955c-4d86-a459-3e68e19850a7-c000.snappy.parquet
_SUCCESS


### Limpando a camada transient para liberar espaço em disco

In [20]:
!rm -rf '/content/transient/AIR_CIA'
!rm -rf '/content/transient/VRA'

## Realizando tratamentos de tipagens, adição de colunas, etc

Agora que os dados já estão devidamente armazenados em camada raw, torna-se possível realizar os tratamentos adequados para continuidade do projeto.

### AIR_CIA

In [21]:
#Realizando a leitura do dataframe em parquet
df_air_cia_raw = spark.read.parquet(path_raw + "AIR_CIA")

In [22]:
#Exibindo os dados
df_air_cia_raw.show()

+--------------------+---------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------------+--------------------+
|        Razão Social|ICAO IATA|              CNPJ|   Atividades Aéreas|       Endereço Sede|            Telefone|              E-Mail| Decisão Operacional|Data Decisão Operacional|Validade Operacional|
+--------------------+---------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------------+--------------------+
|ABSA - AEROLINHAS...|   LTG M3|00.074.635/0001-33|TRANSPORTE AÉREO ...|AEROPORTO INTERNA...|      (11) 5582-8055|      gar@tam.com.br|       DECISÃO Nº 41|              22/04/2015|          23/04/2025|
|AEROSUL TÁXI AÉRE...|   ASO 2S|27.315.694/0001-02|SERVIÇOS AÉREOS P...|RODOVIA PR 218, K...|      (43) 3176-4030|operacoes@aerosul...|      DECISÃO Nº 282|              10/02/2021|       

In [23]:
#Exibindo o schema
df_air_cia_raw.printSchema()

root
 |-- Razão Social: string (nullable = true)
 |-- ICAO IATA: string (nullable = true)
 |-- CNPJ: string (nullable = true)
 |-- Atividades Aéreas: string (nullable = true)
 |-- Endereço Sede: string (nullable = true)
 |-- Telefone: string (nullable = true)
 |-- E-Mail: string (nullable = true)
 |-- Decisão Operacional: string (nullable = true)
 |-- Data Decisão Operacional: string (nullable = true)
 |-- Validade Operacional: string (nullable = true)



In [24]:
#Renomeando as colunas para modo Snake Case
df_air_cia_raw = (df_air_cia_raw 
                    .withColumnRenamed("Razão Social","razao_social") 
                    .withColumnRenamed("ICAO IATA","icao_iata") 
                    .withColumnRenamed("CNPJ","cnpj") 
                    .withColumnRenamed("Atividades Aéreas","atividades_aereas")
                    .withColumnRenamed("Endereço Sede","endereco_sede")
                    .withColumnRenamed("Telefone","telefone")
                    .withColumnRenamed("E-mail","email")
                    .withColumnRenamed("Decisão Operacional","decisao_operacional")
                    .withColumnRenamed("Data Decisão Operacional","data_decisao_operacional")
                    .withColumnRenamed("Validade Operacional","validade_operacional"))

In [25]:
#Visualizando a transformação aplicada
df_air_cia_raw.show()

+--------------------+---------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------------+--------------------+
|        razao_social|icao_iata|              cnpj|   atividades_aereas|       endereco_sede|            telefone|               email| decisao_operacional|data_decisao_operacional|validade_operacional|
+--------------------+---------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------------+--------------------+
|ABSA - AEROLINHAS...|   LTG M3|00.074.635/0001-33|TRANSPORTE AÉREO ...|AEROPORTO INTERNA...|      (11) 5582-8055|      gar@tam.com.br|       DECISÃO Nº 41|              22/04/2015|          23/04/2025|
|AEROSUL TÁXI AÉRE...|   ASO 2S|27.315.694/0001-02|SERVIÇOS AÉREOS P...|RODOVIA PR 218, K...|      (43) 3176-4030|operacoes@aerosul...|      DECISÃO Nº 282|              10/02/2021|       

In [26]:
#Seprando a coluna ICAO IATA em duas através do delimitador " "
df_air_cia_raw = (df_air_cia_raw
                    .withColumn("icao", split(col("icao_iata"), " ").getItem(0))
                    .withColumn("iata", split(col("icao_iata"), " ").getItem(1))
                  )
                    


In [27]:
#Visualizando a transformação aplicada
df_air_cia_raw.show()

+--------------------+---------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------------+--------------------+----+----+
|        razao_social|icao_iata|              cnpj|   atividades_aereas|       endereco_sede|            telefone|               email| decisao_operacional|data_decisao_operacional|validade_operacional|icao|iata|
+--------------------+---------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------------+--------------------+----+----+
|ABSA - AEROLINHAS...|   LTG M3|00.074.635/0001-33|TRANSPORTE AÉREO ...|AEROPORTO INTERNA...|      (11) 5582-8055|      gar@tam.com.br|       DECISÃO Nº 41|              22/04/2015|          23/04/2025| LTG|  M3|
|AEROSUL TÁXI AÉRE...|   ASO 2S|27.315.694/0001-02|SERVIÇOS AÉREOS P...|RODOVIA PR 218, K...|      (43) 3176-4030|operacoes@aerosul...|      DECISÃO

In [28]:
#Convertendo a tipagem dos dados e selecionando as colunas necessárias
df_air_cia_raw = df_air_cia_raw.select(
    
                    "razao_social",
                    "icao",
                    "iata",
                    "cnpj",
                    "atividades_aereas",
                    "endereco_sede",
                    "telefone",
                    "email",
                    "decisao_operacional",
                    to_date(col("data_decisao_operacional"),"dd/MM/yyyy").alias("data_decisao_operacional"),
                    to_date(col("validade_operacional"),"dd/MM/yyyy").alias("validade_operacional")
                    
)




In [29]:
#Visualizando os dados
df_air_cia_raw.show()

+--------------------+----+----+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------------+--------------------+
|        razao_social|icao|iata|              cnpj|   atividades_aereas|       endereco_sede|            telefone|               email| decisao_operacional|data_decisao_operacional|validade_operacional|
+--------------------+----+----+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------------+--------------------+
|ABSA - AEROLINHAS...| LTG|  M3|00.074.635/0001-33|TRANSPORTE AÉREO ...|AEROPORTO INTERNA...|      (11) 5582-8055|      gar@tam.com.br|       DECISÃO Nº 41|              2015-04-22|          2025-04-23|
|AEROSUL TÁXI AÉRE...| ASO|  2S|27.315.694/0001-02|SERVIÇOS AÉREOS P...|RODOVIA PR 218, K...|      (43) 3176-4030|operacoes@aerosul...|      DECISÃO Nº 282|              2021-02-10|       

In [30]:
#visualizando o schema
df_air_cia_raw.printSchema()

root
 |-- razao_social: string (nullable = true)
 |-- icao: string (nullable = true)
 |-- iata: string (nullable = true)
 |-- cnpj: string (nullable = true)
 |-- atividades_aereas: string (nullable = true)
 |-- endereco_sede: string (nullable = true)
 |-- telefone: string (nullable = true)
 |-- email: string (nullable = true)
 |-- decisao_operacional: string (nullable = true)
 |-- data_decisao_operacional: date (nullable = true)
 |-- validade_operacional: date (nullable = true)



In [31]:
#Gravando dados em camada trusted
df_air_cia_raw.write.mode('overwrite').parquet(path_trusted + "AIR_CIA/")

In [32]:
#Listando o diretório da camada raw
!ls '/content/trusted/AIR_CIA/'

part-00000-bfaa623e-440d-46b6-8b2e-054328e24499-c000.snappy.parquet  _SUCCESS
part-00001-bfaa623e-440d-46b6-8b2e-054328e24499-c000.snappy.parquet


### VRA

In [104]:
#Realizando a leitura do dataframe em parquet
df_vra_raw = spark.read.parquet(path_raw + "VRA")

In [105]:
#Exibindo os dados
df_vra_raw.show()

+-------------------+-------------------+-----------------+-------------------+---------------+--------------------+-------------------+----------------+---------+-------------------+-------------------+-------------+
|    ChegadaPrevista|        ChegadaReal|CódigoAutorização|CódigoJustificativa|CódigoTipoLinha|ICAOAeródromoDestino|ICAOAeródromoOrigem|ICAOEmpresaAérea|NúmeroVoo|    PartidaPrevista|        PartidaReal|  SituaçãoVoo|
+-------------------+-------------------+-----------------+-------------------+---------------+--------------------+-------------------+----------------+---------+-------------------+-------------------+-------------+
|2021-08-15 20:10:00|               null|                0|                N/A|              N|                SBGR|               SBGL|             IPM|     5555|2021-08-15 19:15:00|               null|NÃO INFORMADO|
|2021-08-16 20:25:00|               null|                0|                N/A|              N|                SBGR|            

In [106]:
#Exibindo o schema
df_vra_raw.printSchema()

root
 |-- ChegadaPrevista: string (nullable = true)
 |-- ChegadaReal: string (nullable = true)
 |-- CódigoAutorização: string (nullable = true)
 |-- CódigoJustificativa: string (nullable = true)
 |-- CódigoTipoLinha: string (nullable = true)
 |-- ICAOAeródromoDestino: string (nullable = true)
 |-- ICAOAeródromoOrigem: string (nullable = true)
 |-- ICAOEmpresaAérea: string (nullable = true)
 |-- NúmeroVoo: string (nullable = true)
 |-- PartidaPrevista: string (nullable = true)
 |-- PartidaReal: string (nullable = true)
 |-- SituaçãoVoo: string (nullable = true)



In [107]:
#Renomeando as colunas para modo Snake Case
df_vra_raw = (df_vra_raw 
                    .withColumnRenamed("ChegadaPrevista","chegada_prevista") 
                    .withColumnRenamed("ChegadaReal","chegada_real") 
                    .withColumnRenamed("CódigoAutorização","codigo_autorizacao") 
                    .withColumnRenamed("CódigoJustificativa","codigo_justificativa")
                    .withColumnRenamed("CódigoTipoLinha","codigo_tipo_linha")
                    .withColumnRenamed("ICAOAeródromoDestino","icao_aerodromo_destino")
                    .withColumnRenamed("ICAOAeródromoOrigem","icao_aerodromo_origem")
                    .withColumnRenamed("ICAOEmpresaAérea","icao_empresa_aerea")
                    .withColumnRenamed("NúmeroVoo","numero_voo")
                    .withColumnRenamed("PartidaPrevista","partida_prevista")
                    .withColumnRenamed("PartidaReal","partida_real")
                    .withColumnRenamed("SituaçãoVoo","situacao_voo")
                    
                    )

In [108]:
#Exibindo dataframe após renomear as colunas
df_vra_raw.show()

+-------------------+-------------------+------------------+--------------------+-----------------+----------------------+---------------------+------------------+----------+-------------------+-------------------+-------------+
|   chegada_prevista|       chegada_real|codigo_autorizacao|codigo_justificativa|codigo_tipo_linha|icao_aerodromo_destino|icao_aerodromo_origem|icao_empresa_aerea|numero_voo|   partida_prevista|       partida_real| situacao_voo|
+-------------------+-------------------+------------------+--------------------+-----------------+----------------------+---------------------+------------------+----------+-------------------+-------------------+-------------+
|2021-08-15 20:10:00|               null|                 0|                 N/A|                N|                  SBGR|                 SBGL|               IPM|      5555|2021-08-15 19:15:00|               null|NÃO INFORMADO|
|2021-08-16 20:25:00|               null|                 0|                 N/A|   

In [109]:
#Realizando alteração nos tipos das colunas
df_vra_raw = (df_vra_raw
            .withColumn("chegada_prevista", to_timestamp("chegada_prevista", "yyyy-MM-dd HH:mm:ss"))
            .withColumn("chegada_real", to_timestamp("chegada_real", "yyyy-MM-dd HH:mm:ss"))
            .withColumn("partida_prevista", to_timestamp("partida_prevista", "yyyy-MM-dd HH:mm:ss"))
            .withColumn("partida_real", to_timestamp("partida_real", "yyyy-MM-dd HH:mm:ss"))
            )

In [110]:
#Exibindo schema após alteração
df_vra_raw.printSchema()

root
 |-- chegada_prevista: timestamp (nullable = true)
 |-- chegada_real: timestamp (nullable = true)
 |-- codigo_autorizacao: string (nullable = true)
 |-- codigo_justificativa: string (nullable = true)
 |-- codigo_tipo_linha: string (nullable = true)
 |-- icao_aerodromo_destino: string (nullable = true)
 |-- icao_aerodromo_origem: string (nullable = true)
 |-- icao_empresa_aerea: string (nullable = true)
 |-- numero_voo: string (nullable = true)
 |-- partida_prevista: timestamp (nullable = true)
 |-- partida_real: timestamp (nullable = true)
 |-- situacao_voo: string (nullable = true)



In [111]:
#Exibindo dataframe após renomear as colunas
df_vra_raw.show()

+-------------------+-------------------+------------------+--------------------+-----------------+----------------------+---------------------+------------------+----------+-------------------+-------------------+-------------+
|   chegada_prevista|       chegada_real|codigo_autorizacao|codigo_justificativa|codigo_tipo_linha|icao_aerodromo_destino|icao_aerodromo_origem|icao_empresa_aerea|numero_voo|   partida_prevista|       partida_real| situacao_voo|
+-------------------+-------------------+------------------+--------------------+-----------------+----------------------+---------------------+------------------+----------+-------------------+-------------------+-------------+
|2021-08-15 20:10:00|               null|                 0|                 N/A|                N|                  SBGR|                 SBGL|               IPM|      5555|2021-08-15 19:15:00|               null|NÃO INFORMADO|
|2021-08-16 20:25:00|               null|                 0|                 N/A|   

**Curiosidade:**

Ao realizar a escrita de um arquivo parquet foi encontrado o erro: 

```
You may get a different result due to the upgrading to Spark >= 3.0: 
writing dates before 1582-10-15 or timestamps before 1900-01-01T00:00:00Z
```

Esse erro normalmente ocorre devido a versão do Spark 3.0 ou > não possuir suporte a calendários de datas hibridos nos quais utilizam tanto o Gregoriano quanto o Juliano. Em específico nestes casos o Spark utiliza apenas o calendário Gregoriano no qual suporta datas maiores que 1900. Neste caso, um tratamento adicional será realizado, onde datas menores que 1900 serão substituídas por 1900.


Fonte: https://www.roelpeters.be/spark-3-0-solve-error-the-dates-before-1582-10-15/

In [112]:
df_vra_raw = (df_vra_raw
              .withColumn("chegada_prevista", when(col("chegada_prevista") <= '1900-01-01', to_date(lit('1900-01-01'), 'yyyy-MM-dd')).otherwise(col("chegada_prevista")))
              .withColumn("chegada_real", when(col("chegada_real") <= '1900-01-01', to_date(lit('1900-01-01'), 'yyyy-MM-dd')).otherwise(col("chegada_real")))
              .withColumn("partida_prevista", when(col("partida_prevista") <= '1900-01-01', to_date(lit('1900-01-01'), 'yyyy-MM-dd')).otherwise(col("partida_prevista")))
              .withColumn("partida_real", when(col("partida_real") <= '1900-01-01', to_date(lit('1900-01-01'), 'yyyy-MM-dd')).otherwise(col("partida_real")))
              
              )

In [113]:
#Gravando dados em camada trusted
df_vra_raw.write.mode('overwrite').parquet(path_trusted + "VRA/")

## Realizando consumo da API https://rapidapi.com/Active-api/api/airport-info/

O primeiro passo para realizar o  consumo, foi estudar o funcionamento da API através da sua documentação e visto que a API possui compatibilidade com diversas linguagens e métodos de consumos, foi utilizada a linguagem Python juntamente com a biblioteca Requests. 

### Validando consumo de maneira estática

In [74]:
#Realizando teste consumindo um valor fixo

icao_teste = "SBGR"


#Exemplo fornecido via documentação

querystring = {"icao":icao_teste}

url = "https://airport-info.p.rapidapi.com/airport"
headers = {
  "X-RapidAPI-Host": "airport-info.p.rapidapi.com",
  "X-RapidAPI-Key": "50f502021bmsh2f8a94665ca2054p10d43ajsnb2e1577f0da1"
}

response = requests.request("GET", url, headers=headers, params=querystring)

print(response.text)

{"id":2695,"iata":"GRU","icao":"SBGR","name":"São Paulo–Guarulhos International Airport","location":"São Paulo, Brazil","street_number":"s/nº","street":"Rod. Hélio Smidt","city":"","county":"Guarulhos","state":"São Paulo","country_iso":"BR","country":"Brazil","postal_code":"07190-100","phone":"+55 11 2445-2945","latitude":-23.430573,"longitude":-46.47304,"uct":-180,"website":"http://www.gru.com.br/"}



### Realizando consumo de maneira dinâmica

In [120]:
#Realizando a leitura do dataframe em parquet em camada trusted e selecionando apenas as colunas necessárias
df_icao_origem = spark.read.parquet(path_trusted + "VRA").select("icao_aerodromo_origem").distinct()
df_icao_destino = spark.read.parquet(path_trusted + "VRA").select("icao_aerodromo_destino").distinct()

In [122]:
#Exibindo os dataframes
df_icao_origem.show(5)
df_icao_destino.show(5)

+---------------------+
|icao_aerodromo_origem|
+---------------------+
|                 SBPS|
|                 LFPG|
|                 KATL|
|                 SBIH|
|                 EDDK|
+---------------------+
only showing top 5 rows

+----------------------+
|icao_aerodromo_destino|
+----------------------+
|                  LYBE|
|                  SBPS|
|                  LFPG|
|                  KATL|
|                  SBBU|
+----------------------+
only showing top 5 rows



In [138]:
#Renomeando as colunas para o mesmo nome
df_icao_origem = df_icao_origem.withColumnRenamed("icao_aerodromo_origem", "icao")
df_icao_destino = df_icao_destino.withColumnRenamed("icao_aerodromo_destino", "icao")

#Realizando union dos dataframes
df_icaos = df_icao_origem.union(df_icao_destino).distinct()

df_icaos.show(5)

+----+
|icao|
+----+
|SBPS|
|LFPG|
|KATL|
|SBIH|
|EDDK|
+----+
only showing top 5 rows



In [145]:
#Criando lista com os icaos

lista_icaos = []
lista_icaos = [i.icao for i in df_icao_origem.select('icao').distinct().collect() ]


#Com rdd a performance não é tão boa, deste modo, foi alterado para
#lista_icaos = df_icao_origem.select("icao").rdd.flatMap(lambda x: x).collect()

Agora que a lista com todos os códigos foi criada. Será elaborada uma função para percorrer todos os elementos da lista e realizar a requisição na API. 

In [149]:
#Criando a função
def fn_get_icao(icao):

  url = "https://airport-info.p.rapidapi.com/airport"
  headers = {
    "X-RapidAPI-Host": "airport-info.p.rapidapi.com",
    "X-RapidAPI-Key": "50f502021bmsh2f8a94665ca2054p10d43ajsnb2e1577f0da1"
  }
  querystring = {"icao":icao}
  response = requests.request("GET", url, headers=headers, params=querystring)

  return response.text

In [155]:
#Realizando as chamadas dinamicamente
lista_coletada = []

for i in lista_icaos:
  lista_coletada.append(fn_get_icao(i))

  #Aguardando 0.25 para realizar a próxima coleta
  time.sleep(0.25)

In [162]:
#Visualizando se houveram retornos
for i in lista_coletada:
  print(i)

{"id":908,"iata":"BPS","icao":"SBPS","name":"Porto Seguro Airport","location":"Porto Seguro, Bahia, Brazil","street_number":"68","street":"Rua Primeiro de Maio","city":"","county":"Porto Seguro","state":"Bahia","country_iso":"BR","country":"Brazil","postal_code":"45810-000","phone":"+55 73 3288-1880","latitude":-16.439308,"longitude":-39.081562,"uct":-180,"website":"http://www.aeroportoseguro.com.br/"}

{"id":1213,"iata":"CDG","icao":"LFPG","name":"Charles de Gaulle Airport (Roissy Airport)","location":"Paris, Île-de-France, France","street_number":"","street":"","city":"Roissy-en-France","county":"","state":"Île-de-France","country_iso":"FR","country":"France","postal_code":"95700","phone":"+33 1 70 36 39 50","latitude":49.00969,"longitude":2.5479245,"uct":120,"website":"http://www.parisaeroport.fr/"}

{"id":410,"iata":"ATL","icao":"KATL","name":"Hartsfield–Jackson Atlanta International Airport","location":"Atlanta, Georgia, United States","street_number":"6000","street":"North Termin

Pode-se observar que em alguns casos o aeroporto não é encontrado retornando: 

```
{"error":{"text":"No airport found"}
```

Desse modo, o próximo passo será remover esses valores antes de realizar a escrita em arquivo parquet.

In [189]:
lista_final = []

for i in lista_coletada:
  if 'No airport found' not in i:
    lista_final.append(i)

In [236]:
#Realizando a leitura da lista final utilizando rdd visto que não foi possível inferir um schema manualmente ou de maneira automática
rdd = sc.parallelize(lista_final)
df_aerodromo_raw = spark.read.json(rdd)

#Exibindo o dataframe
df_aerodromo_raw.show(5)

#Exibindo o schema
df_aerodromo_raw.printSchema()

+----------------+-------------+-----------+------------+----+----+----+----------+--------------------+----------+--------------------+-----------------+-----------+-------------------+--------------------+-------------+----+--------------------+
|            city|      country|country_iso|      county|iata|icao|  id|  latitude|            location| longitude|                name|            phone|postal_code|              state|              street|street_number| uct|             website|
+----------------+-------------+-----------+------------+----+----+----+----------+--------------------+----------+--------------------+-----------------+-----------+-------------------+--------------------+-------------+----+--------------------+
|                |       Brazil|         BR|Porto Seguro| BPS|SBPS| 908|-16.439308|Porto Seguro, Bah...|-39.081562|Porto Seguro Airport| +55 73 3288-1880|  45810-000|              Bahia|Rua Primeiro de Maio|           68|-180|http://www.aeropo...|
|Roissy-

### Gravando em camada raw

In [238]:
df_aerodromo_raw.write.mode('overwrite').parquet(path_raw + "AERODROMO_API/")

## Realizando tratamentos nos aerodromos

In [239]:
#Realizando leitura da camada raw
df_aerodromo_raw_to_trusted = spark.read.parquet(path_raw + "AERODROMO_API/")

In [246]:
#Convertendo valores tabulados e com espaços em nulos
df_aerodromo_raw_to_trusted = (df_aerodromo_raw_to_trusted.select(
                                  "id",
                                  when(length(trim("city")) != 0, trim("city")).alias("city"),
                                  when(length(trim("country")) != 0, trim("country")).alias("country"),
                                  when(length(trim("country_iso")) != 0, trim("country_iso")).alias("country_iso"),
                                  when(length(trim("county")) != 0, trim("county")).alias("county"),
                                  when(length(trim("iata")) != 0, trim("iata")).alias("iata"),
                                  when(length(trim("icao")) != 0, trim("icao")).alias("icao"),
                                  when(length(trim("location")) != 0, trim("location")).alias("location"),
                                  "latitude",
                                  "longitude",
                                  "uct",
                                  when(length(trim("name")) != 0, trim("name")).alias("name"),
                                  when(length(trim("phone")) != 0, trim("phone")).alias("phone"),
                                  when(length(trim("postal_code")) != 0, trim("postal_code")).alias("postal_code"),
                                  when(length(trim("state")) != 0, trim("state")).alias("state"),
                                  when(length(trim("street")) != 0, trim("street")).alias("street"),
                                  when(length(trim("street_number")) != 0, trim("street_number")).alias("street_number"),
                                  when(length(trim("website")) != 0, trim("website")).alias("website")

                                                        ))

df_aerodromo_raw_to_trusted.show(5)

+----+----------------+-------------+-----------+------------+----+----+--------------------+----------+----------+----+--------------------+-----------------+-----------+-------------------+--------------------+-------------+--------------------+
|  id|            city|      country|country_iso|      county|iata|icao|            location|  latitude| longitude| uct|                name|            phone|postal_code|              state|              street|street_number|             website|
+----+----------------+-------------+-----------+------------+----+----+--------------------+----------+----------+----+--------------------+-----------------+-----------+-------------------+--------------------+-------------+--------------------+
| 908|            null|       Brazil|         BR|Porto Seguro| BPS|SBPS|Porto Seguro, Bah...|-16.439308|-39.081562|-180|Porto Seguro Airport| +55 73 3288-1880|  45810-000|              Bahia|Rua Primeiro de Maio|           68|http://www.aeropo...|
|1213|Ro

### Gravando resultado em camada trusted

In [247]:
#Gravando em camada trusted
df_aerodromo_raw_to_trusted.write.mode('overwrite').parquet(path_trusted + "AERODROMO_API/")

## Tempo total de execução

In [42]:
#Exibe o tempo total da execução do código do desafio
finished = datetime.now()
print(finished - started)

0:02:23.112555
