In [1]:
import pandas as pd
import json
from sqlalchemy import create_engine, types
import unicodedata
import os

In [2]:
path = '../data/AIR_CIA/'

In [3]:
engine = create_engine('postgresql://data_engineer_test_user:8WBNsM8B^?eDpN$q@localhost:5432/data_engineer_test')

In [4]:
dtype={
    "razao_social": types.VARCHAR(length=150),
    "cnpj": types.VARCHAR(length=30),
    "atividades_aereas": types.VARCHAR(length=255),
    "endereco_sede": types.VARCHAR(length=255),
    "telefone": types.VARCHAR(length=150),
    "e-mail": types.VARCHAR(length=150),
    "decisao_operacional": types.VARCHAR(length=100),
    "data_decisao_operacional": types.Date,
    "validade_operacional": types.Date,
    "icao": types.VARCHAR(length=4),
    "iata": types.VARCHAR(length=4)
}

In [5]:
def get_csv_files():
    files = os.listdir(path)
    # pega apenas os arquivos CSVs, ignorando os arquivos de Debugs e de outras extensões
    csv_files = [file for file in files if file.endswith(".csv") and "normalizado_" not in file]
    return csv_files

In [6]:
def remove_normalized_file(path_to_remove):
    # se for um arquivo de Debug remove
    if "normalizado_" in path_to_remove and os.path.exists(path_to_remove):
        print(f"{path}{csv_file}")
        os.remove(path_to_remove)

In [7]:
# divide a coluna em duas, para casos onde não tem valor seta como nulo
def split_columns(coluna):
    col_split = str(coluna).split()
    if len(col_split) == 2:
        return col_split[0], col_split[1]
    elif len(col_split) == 1:
        return col_split[0], None
    else:
        return None, None

In [8]:
def to_snake_case(coluna):
    coluna_snake = coluna.replace(" ", "_").lower()
    # eliminar os caracteres com acento
    coluna_snake = unicodedata.normalize('NFD', coluna_snake).encode('ascii', 'ignore').decode("utf-8")
    return coluna_snake

In [9]:
def csv_transformation(csv_file):
    # remove_normalized_file(f"{path}{csv_file}")

    df_air_cia = pd.read_csv(f"{path}{csv_file}", sep=";")

    df_air_cia["ICAO"], df_air_cia["IATA"] = zip(*df_air_cia["ICAO IATA"].map(split_columns))

    # em alguns casos o valor nulo estava sendo colocado como string, isso garante que serão realmente nulos
    df_air_cia.replace(['None', 'NaN', 'nan'], [None, None, None], regex=True, inplace=True)

    df_air_cia.drop(["ICAO IATA"], inplace=True, axis=1)

    df_air_cia.rename(inplace=True, columns=lambda x: to_snake_case(x))

    # df_air_cia.to_csv(f"{path}normalizado_{csv_file}", index=False)

    return df_air_cia

In [10]:
# foi verificado que alguns dados repetiam em diferentes CSVs, isso garante que serão unicos
def remove_duplicates(frames):
    df_result = pd.concat(frames)

    return df_result.drop_duplicates()

In [11]:
# foi utilizada a propria função do pandas para carregar os dados no banco, mas tomando cuidado com o tipos dos dados
def load_df_in_database(df):
    with engine.connect() as conn:
        df.to_sql(
            name='air_cia',
            con=conn,
            schema='data_engineer',
            index=False,
            if_exists='replace',
            dtype=dtype
        )

In [12]:
csv_files = get_csv_files()

list_df = []

for csv_file in csv_files:
    list_df.append(csv_transformation(csv_file))

df_clean = remove_duplicates(list_df)
load_df_in_database(df_clean)