In [5]:
!pip install pandas



In [1]:
!pip install unicode



In [10]:
from unidecode import unidecode
import re
# função responsável por realizar snake_case
def snake_case(text:str) -> str:
    
    return unidecode(text.replace(" ","_").lower()) if " " in text else re.sub(r'([a-z0-9])([A-Z])', r'\1_\2', unidecode(text.replace("ICAO", "icao_"))).lower()

In [11]:
import pandas as pd
import os

# criando variáveis com diretórios arquivo de air_cia
air_cia_input_path = list(map(lambda item: "./AIR_CIA/"+item, os.listdir("./AIR_CIA/")))
air_cia_raw_path = "raw/air_cia.csv"
air_cia_clean_path = "clean/air_cia.csv"
air_cia_curated_path = "curated/air_cia.csv"

dfs = []

# realizando a leitura dos arquivos csv de air_cia e os listando
for file in air_cia_input_path:
    df = pd.read_csv(file, delimiter=';')
    dfs.append(df)

#unificando lista de dataframes dos dados
df_air_cia = pd.concat(dfs, ignore_index=True)

#checando se diretório existe e o criando caso não, para garantir funcionamento
if not os.path.exists("raw"):
    os.makedirs("raw")

#escrevendo os dados na camada bronze
df_air_cia.to_csv(air_cia_raw_path, index=False, sep=';', encoding='utf-8')

# realizando a leitura dos dados da camada bronze
df_air_cia_clean = pd.read_csv("./"+air_cia_raw_path, delimiter=';')

# aplicando snake_case em nomes de colunas
df_air_cia_clean.rename(columns=lambda x: snake_case(x), inplace=True)

if not os.path.exists("clean"):
    os.makedirs("clean")

#salvando os dados na camada prata
df_air_cia_clean.to_csv(air_cia_clean_path, index=False, sep=';', encoding='utf-8')

#realizando a leitura de dados da camada prata
df_air_cia_curated = pd.read_csv("./"+air_cia_clean_path, delimiter=';')

#separando as colunas de ICAO e de IATA
df_air_cia_curated[['icao','iata']] = df_air_cia_curated["icao_iata"].str.split(' ', expand=True)

if not os.path.exists("curated"):
    os.makedirs("curated")

#gravando os dados na camada ouro
df_air_cia_curated.to_csv(air_cia_curated_path, index=False, sep=';', encoding='utf-8')

In [12]:
# criando variáveis com diretórios de uso de VRA
vra_input_path = list(map(lambda item: "./VRA/"+item, os.listdir("./VRA/")))
vra_raw_path = "raw/vra.csv"
vra_clean_path = "clean/vra.csv"

dfs = []

#percorrendo lista de arquivos de VRA, transformando em DF pandas e os listando
for file in vra_input_path:
    df = pd.read_json(file, encoding = 'utf-8-sig')
    dfs.append(df)

#concatenando lista de DFs de VRA
df_vra_raw = pd.concat(dfs, ignore_index=True)

if not os.path.exists("raw"):
    os.makedirs("raw")

#salvando df em camada bronze
df_vra_raw.to_csv(vra_raw_path, index=False, sep=';', encoding='utf-8')

#realizando a leitura do arquivo da camada bronze
df_vra_clean = pd.read_csv("./"+vra_raw_path, delimiter=';')

# aplicando snake_case em nomes de colunas
df_vra_clean.rename(columns=lambda x: snake_case(x), inplace=True)

if not os.path.exists("clean"):
    os.makedirs("clean")

# escrevendo dados na camada prata
df_vra_clean.to_csv(vra_clean_path, index=False, sep=';', encoding='utf-8')

  df_vra_clean = pd.read_csv("./"+vra_raw_path, delimiter=';')


In [88]:
import requests
import json

aerodromo_info_raw_path = "raw/aerodromo_info.csv"
aerodromo_info_clean_path = "clean/aerodromo_info.csv"

# url de chamada
url = "https://airport-info.p.rapidapi.com/airport"

# header com dados de autenticação
headers = {
	"X-RapidAPI-Key": "d7d32e551bmsh0fbf0c07b7c7057p107255jsndd38c5445b21",
	"X-RapidAPI-Host": "airport-info.p.rapidapi.com"
}

# leitura de VRA com dados de ICAO
df_vra_clean = pd.read_csv("./"+vra_clean_path, delimiter=';')

#listando dados de ICAO
lista_icao_aerodromo_origem = df_vra_clean['icao_aerodromo_origem'].tolist()
lista_icao_aerodromo_destino = df_vra_clean['icao_aerodromo_destino'].tolist()

#unindo listas com o auxilio de SET para não ser realizzado a chamada da API de forma repetida 
lista_icao = list(set(lista_icao_aerodromo_origem + lista_icao_aerodromo_destino))

dfs = []
i = 0

# laço de repetição com chamada de API e criação de query string com resultado de ICAO 
for icao in lista_icao:
    querystring = {"icao":icao}
    # chamada de API com resultados
    response = requests.get(url, headers=headers, params=querystring)
    # checando valores de chamadas para checar sucesso de chamada
    if response.status_code == 200 and 'error' not in response.text:
        # realizando parse de resultado de chamda da API para JSON
        data = json.loads(response.text)
        df = pd.DataFrame(data, index=[0])
        dfs.append(df)
    # print para acompanhamento de execução, existem 315 chamadas no total
    i+=1
    print("chamada de API número:"+str(i))

df_aerodromo_info = pd.concat(dfs, ignore_index=True)

if not os.path.exists("raw"):
    os.makedirs("raw")

#salvando df em camada bronze
df_aerodromo_info.to_csv(aerodromo_info_raw_path, index=False, sep=';', encoding='utf-8')

#realizando a leitura do arquivo da camada bronze
df_aerodromo_info_clean = pd.read_csv("./"+aerodromo_info_raw_path, delimiter=';')

# aplicando snake_case em nomes de colunas
df_aerodromo_info_clean.rename(columns=lambda x: snake_case(x), inplace=True)

if not os.path.exists("clean"):
    os.makedirs("clean")

# escrevendo dados na camada prata
df_aerodromo_info_clean.to_csv(aerodromo_info_clean_path, index=False, sep=';', encoding='utf-8')

  df_vra_clean = pd.read_csv("./"+vra_clean_path, delimiter=';')


chamada de API número:1
chamada de API número:2
chamada de API número:3
chamada de API número:4
chamada de API número:5
chamada de API número:6
chamada de API número:7
chamada de API número:8
chamada de API número:9
chamada de API número:10
chamada de API número:11
chamada de API número:12
chamada de API número:13
chamada de API número:14
chamada de API número:15
chamada de API número:16
chamada de API número:17
chamada de API número:18
chamada de API número:19
chamada de API número:20
chamada de API número:21
chamada de API número:22
chamada de API número:23
chamada de API número:24
chamada de API número:25
chamada de API número:26
chamada de API número:27
chamada de API número:28
chamada de API número:29
chamada de API número:30
chamada de API número:31
chamada de API número:32
chamada de API número:33
chamada de API número:34
chamada de API número:35
chamada de API número:36
chamada de API número:37
chamada de API número:38
chamada de API número:39
chamada de API número:40
chamada d

  df_aerodromo_info_clean = pd.read_csv("./"+vra_raw_path, delimiter=';')


In [None]:
#### Extras:
#   - Descrever qual estratégia você usaria para ingerir estes dados de forma incremental caso precise capturar esses dados a cada mes?
#   Resposta: identificaria o campo incremental e realizaria a leitura por ele, de forama a escrever os dados particionados pelo campo incremental também.
#   - Justifique em cada etapa sobre a escalabilidade da tecnologia utilizada.
#   Resposta: A tecnologia escolhida não possui escabilidade, por ter sido desenvolvido de forma local e com o uso de pandas para explorar os dados, este tem o beneficio de baixo custo, também podendo ser inserido em ferramentas serveless em núvem mantendo o baixo custo, para se obter escabilidade poderá ser desenvolvido com o uso de pyspark e com o uso do Databricks ou com o uso de ferramentas de dados em núvem como Databricks ou Synape no caso de Azure
#   - Justifique as camadas utilizadas durante o processo de ingestão até a disponibilização dos dados.
#   Resposta: foi utilizado um modelo de datalake para a construção das camadas, de forma a raw(bronze) possuir os dados de forma crua, assim como são ingeridos, de forma a considerar a sua entrada apenas após o processamento, a camada clean(prata) é responsável por conter dados após limpeza e/ou normalização, como a única normalização necessária foi a dos cabeçalhos esta foi aplicada para esta camada, por último a camada curated(ouro), está possui informações de dados curados, que possuem unificações e um certo nível de tratativas, desta forma os dados de AIR_CIA foram os únicos que precisaram de algum tipo de modificação, desta forma este foi escolhido para esta mas apenas neste caso em especifico.

In [None]:
## conexão com banco de dados para inserção de dados.
## banco de dados esta em container docker

In [92]:
!pip install sqlalchemy



In [93]:
!pip install psycopg2

Collecting psycopg2
  Obtaining dependency information for psycopg2 from https://files.pythonhosted.org/packages/bc/bc/6572dec6834e779668421e25f8812a872d978e241f85491a5e4dda606a98/psycopg2-2.9.9-cp310-cp310-win_amd64.whl.metadata
  Downloading psycopg2-2.9.9-cp310-cp310-win_amd64.whl.metadata (4.5 kB)
Downloading psycopg2-2.9.9-cp310-cp310-win_amd64.whl (1.2 MB)
   ---------------------------------------- 0.0/1.2 MB ? eta -:--:--
   ---------------------------------------- 0.0/1.2 MB ? eta -:--:--
   ---------------------------------------- 0.0/1.2 MB ? eta -:--:--
   -- ------------------------------------- 0.1/1.2 MB 544.7 kB/s eta 0:00:03
   ------------------- -------------------- 0.6/1.2 MB 3.9 MB/s eta 0:00:01
   ---------------------------------------  1.2/1.2 MB 6.1 MB/s eta 0:00:01
   ---------------------------------------- 1.2/1.2 MB 5.7 MB/s eta 0:00:00
Installing collected packages: psycopg2
Successfully installed psycopg2-2.9.9


In [94]:
from sqlalchemy import create_engine

db_user = "postgre"
db_password = "pass123"
db_host = "localhost"
db_port = "5432"
db_name = "aerodromo"

connection_string = f"postgresql://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}"
engine = create_engine(connection_string)

In [97]:
df_aerodromo_info_clean = pd.read_csv("./"+aerodromo_info_clean_path, delimiter=';')

table_name = 'aerodromo_info'

df_aerodromo_info_clean.to_sql(table_name, engine, index=False)

276

In [99]:
df_vra_clean = pd.read_csv("./"+vra_clean_path, delimiter=';')

table_name = 'vra'

df_vra_clean.to_sql(table_name, engine, index=False)

  df_vra_clean = pd.read_csv("./"+vra_clean_path, delimiter=';')


803

In [100]:
df_air_cia_curated = pd.read_csv("./"+air_cia_curated_path, delimiter=';')

table_name = 'air_cia'

df_air_cia_curated.to_sql(table_name, engine, index=False)

20