# ETL da camada silver para a camada gold

Esta célula importa todas as bibliotecas necessárias para o processo de ETL. Aqui são carregados os pacotes para a manipulação de dados, conexão com o banco de dados e tratamento de avisos.

In [2]:
from pathlib import Path
from dotenv import load_dotenv
import psycopg2
import os

import numpy as np
import pandas as pd

## 1. Extract

Esta célula define as configurações de conexão com o banco de dados PostgreSQL e extrai os dados da camada silver para um DataFrame do Pandas.

In [3]:
BASE_PATH = Path(os.getcwd()).parent.parent
DB_CONFIG = {
    "host": os.getenv("DB_HOST"),
    "port": os.getenv("DB_PORT"),
    "database": os.getenv("POSTGRES_DB"),
    "user": os.getenv("POSTGRES_USER"),
    "password": os.getenv("POSTGRES_PASSWORD"),
}

DB_SCHEMA = "silver"
TABLE_NAME = "sinistros"
TABLE_FULL_NAME = f"{DB_SCHEMA}.{TABLE_NAME}"

print("DB host:", DB_CONFIG["host"])
print("DB port:", DB_CONFIG["port"])
print("DB name:", DB_CONFIG["database"])

def get_database_connection(db_config):
    conn = psycopg2.connect(
        host=db_config["host"],
        port=db_config["port"],
        database=db_config["database"],
        user=db_config["user"],
        password=db_config["password"],
    )
    return conn

def extract_silver_data(conn) -> pd.DataFrame:
    query = f"SELECT * FROM {TABLE_FULL_NAME};"
    df = pd.read_sql_query(query, conn)
    return df

def close_database_connection(conn):
    conn.close()

connection = get_database_connection(DB_CONFIG)
silver_df = extract_silver_data(connection)
close_database_connection(connection)

print(f"Total de registros extraídos da camada silver: {len(silver_df)}")

DB host: localhost
DB port: 5432
DB name: prf


  df = pd.read_sql_query(query, conn)


Total de registros extraídos da camada silver: 339183


In [3]:
silver_df.head()

Unnamed: 0,ano_arquivo,sinistro_id,pessoa_id,veiculo_id,data_hora,dia_semana_num,uf,municipio,delegacia,latitude,...,condicao_meteorologica,tipo_pista,tracado_via,caracteristicas_via,tipo_envolvido,estado_fisico,faixa_etaria,sexo,tipo_veiculo,faixa_idade_veiculo
0,2024,571772,1268971,1018215.0,2024-01-01 00:05:00,0,RJ,TANGUA,DEL02-RJ,-22.72936,...,Céu Claro,Dupla,Reta,urbano,Condutor,obito,20-29,Masculino,Motocicleta,0-4
1,2024,571774,1268985,1018226.0,2024-01-01 00:05:00,0,GO,ANAPOLIS,DEL02-GO,-16.229185,...,Céu Claro,Dupla,Reta,rural,Condutor,ileso,30-39,Feminino,Automóvel,15-19
2,2024,571777,1269020,1018251.0,2024-01-01 01:45:00,0,ES,SERRA,DEL02-ES,-20.172928,...,Nublado,Múltipla,Interseção de Vias;Reta,urbano,Condutor,ileso,50-59,Masculino,Caminhonete,15-19
3,2024,571778,1269028,1018261.0,2024-01-01 00:45:00,0,SC,PENHA,DEL03-SC,-26.83477,...,Chuva,Dupla,Curva,rural,Condutor,ileso,50-59,Masculino,Camioneta,10-14
4,2024,571778,1269045,1018261.0,2024-01-01 00:45:00,0,SC,PENHA,DEL03-SC,-26.83477,...,Chuva,Dupla,Curva,rural,Passageiro,leve,30-39,Feminino,Camioneta,10-14


## 2. Transform

Esta célula realiza as transformações necessárias nos dados extraídos da camada silver, preparando-os para a carga na camada gold. Ele aplica o dicionário `column_mappings` para renomear as colunas do DataFrame conforme o padrão desejado na camada gold.

In [4]:
column_mappings = {
    # Dimensão Tempo
    "ano_arquivo": "ano_arq",
    "data_hora": "dat_ocr",
    "dia_semana_num": "dia_sem_num",
    "fase_dia": "fas_dia",
    
    # Dimensão Localidade
    "uf": "uni_fed",
    "municipio": "nom_mun",
    "delegacia": "nom_del",
    "latitude": "num_lat",
    "longitude": "num_lon",
    
    # Dimensão Circunstância
    "causa_acidente": "cau_aci",
    "tipo_acidente": "tip_aci",
    "sentido_via": "sen_via",
    "condicao_meteorologica": "con_met",
    "tipo_pista": "tip_pis",
    "tracado_via": "tra_via",
    "caracteristicas_via": "car_via",
    
    # Dimensão Pessoa
    "pessoa_id": "cod_pes",
    "tipo_envolvido": "tip_env",
    "estado_fisico": "est_fis",
    "faixa_etaria": "fax_eta",
    "sexo": "gen",
    
    # Dimensão Veículo
    "veiculo_id": "cod_vei",
    "tipo_veiculo": "tip_vei",
    "faixa_idade_veiculo": "fax_ida",
    
    # Tabela Fato
    "sinistro_id": "cod_sin"
}

silver_df_renamed = silver_df.rename(columns=column_mappings)
table_mappings = {
    # Dimensão Tempo
    "dim_tem": ["dat_ocr", "ano_arq", "dia_sem_num", "fas_dia"],
    # Dimensão Localidade
    "dim_loc": ["uni_fed", "nom_mun", "nom_del", "num_lat", "num_lon"],
    # Dimensão Circunstância
    "dim_cir": ["cau_aci", "tip_aci", "sen_via", "con_met", "tip_pis", "tra_via", "car_via"],
    # Dimensão Pessoa
    "dim_pes": ["cod_pes", "tip_env", "est_fis", "fax_eta", "gen"],
    # Dimensão Veículo
    "dim_vei": ["cod_vei", "tip_vei", "fax_ida"],
    # Tabela Fato
    "fat_sin": ["cod_sin"]
}
table_primary_keys = {
    "dim_tem": "srk_tem",
    "dim_loc": "srk_loc",
    "dim_cir": "srk_cir",
    "dim_pes": "srk_pes",
    "dim_vei": "srk_vei",
    "fat_sin": "srk_sin"
}

## 3. Load

Esta célula carrega os dados transformados para a camada gold no banco de dados PostgreSQL. Ela cria a tabela na camada gold, se ainda não existir, e insere os dados do DataFrame na tabela correspondente.

In [None]:
DB_SCHEMA_GOLD = "dw"

DDL_GOLD_PATH = BASE_PATH / "data_layer" / "gold" / "ddl.sql"

connection = get_database_connection(DB_CONFIG)

def load_ddl_to_database(conn, ddl_path):
    with open(ddl_path, 'r') as file:
        ddl_sql = file.read()
    with conn.cursor() as cursor:
        cursor.execute(ddl_sql)

load_ddl_to_database(connection, DDL_GOLD_PATH)

def insert_into_table(conn, table_full_name: str, data: dict, returning_column: str = None):
    columns = ', '.join(data.keys())
    values = ', '.join(['%s'] * len(data))
    insert_query = f"INSERT INTO {table_full_name} ({columns}) VALUES ({values})"
    if returning_column:
        insert_query += f" RETURNING {returning_column};"
    else:
        insert_query += ";"
    
    with conn.cursor() as cursor:
        cursor.execute(insert_query, list(data.values()))
        if returning_column:
            returned_id = cursor.fetchone()[0]
            return returned_id
        conn.commit()
    return None

def load_data_to_gold_table(conn, df: pd.DataFrame, table_mappings: object, table_primary_keys: object, schema: str):
    for idx, row in df.iterrows():
        surrogate_keys = {}
        
        for table_name, columns in table_mappings.items():
            if table_name == "fat_sin":
                continue
            
            cod_columns = [col for col in columns if col.startswith("cod_")]
            if cod_columns and pd.isna(row[cod_columns[0]]):
                continue
            
            table_full_name = f"{schema}.{table_name}"
            data = {col: row[col] for col in columns}
            pk_column = table_primary_keys[table_name]
            surrogate_key = insert_into_table(conn, table_full_name, data, returning_column=pk_column)
            surrogate_keys[pk_column] = surrogate_key
        
        fat_sin_full_name = f"{schema}.fat_sin"
        fat_sin_data = {col: row[col] for col in table_mappings["fat_sin"]}
        fat_sin_data.update(surrogate_keys)
        insert_into_table(conn, fat_sin_full_name, fat_sin_data)

load_data_to_gold_table(connection, silver_df_renamed, table_mappings, table_primary_keys, DB_SCHEMA_GOLD)
close_database_connection(connection)
print("Carga concluída!")