# Pipeline ETL: Silver-to-Gold (DW)

Este notebook executa o processo de ETL (Extração, Transformação e Carga) para mover dados da camada Silver (uber_silver) para a camada Gold (Data Warehouse).

Lógica:
1.  Dimensões: Carga incremental (apenas insere novos registos).
2.  Factos: Carga completa (TRUNCATE + INSERT ... SELECT) otimizada em SQL.

## 0. Importação das Bibliotecas

Importa as bibliotecas necessárias para o pipeline.

In [1]:
import pandas as pd
from sqlalchemy import create_engine, text
import logging
import os
import sys

## 1. Configuração do Logging

Configura o sistema de logging para registar informações, erros e o progresso do script em vez de usar print().

In [2]:
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S',
                    handlers=[
                        logging.StreamHandler(sys.stdout) 
                    ])

## 2. Ligação à Base de Dados

Define a função connect_to_postgres que estabelece a ligação à base de dados PostgreSQL. Esta usa variáveis de ambiente para as credenciais.

In [3]:
def connect_to_postgres():
    try:
        db_user = os.getenv('POSTGRES_USER', 'admin')
        db_password = os.getenv('POSTGRES_PASSWORD', 'admin')
        db_name = os.getenv('POSTGRES_DB', 'postgres')
        db_host = 'localhost'
        
        conn_string = f"postgresql://{db_user}:{db_password}@{db_host}/{db_name}"
        
        logging.info("Criando engine do Postgres...")
        engine = create_engine(conn_string)
        
        with engine.connect() as conn:
            logging.info("Ligação com o Postgres estabelecida com sucesso!")
        
        return engine
    
    except Exception as e:
        logging.error(f"Falha ao ligar/criar engine: {e}")
        return None

## 3. Configuração e Início do Pipeline

Define o esquema de destino (dw) e inicializa a ligação à base de dados.

In [4]:
SCHEMA_NAME = 'dw'
logging.info(f"A iniciar carga incremental para o esquema '{SCHEMA_NAME}'...")

engine = connect_to_postgres()

if engine is None:
    logging.critical("Ligação à base de dados falhou. A abortar o pipeline.")
    raise Exception("Falha na ligação à Base de Dados")

2025-11-23 15:20:58 - INFO - A iniciar carga incremental para o esquema 'dw'...
2025-11-23 15:20:58 - INFO - Criando engine do Postgres...
2025-11-23 15:20:58 - INFO - Ligação com o Postgres estabelecida com sucesso!


## 4. ETAPA 1: Atualização Incremental das Dimensões

Nesta etapa, atualizamos as tabelas de dimensão (dim_cus, dim_veh, dim_pay, dim_loc).

A lógica é incremental: 
1.  Lemos as chaves de negócio (ex: customer_id) já existentes no DW.
2.  Lemos as chaves de negócio da tabela uber_silver.
3.  Identificamos quais chaves existem na silver mas não no DW.
4.  Inserimos (append) apenas as chaves novas. A base de dados (via SERIAL) trata da geração da srk_.

### 4.1. A atualizar dw.dim_cus

In [9]:
try:
    logging.info("Atualizando 'dim_cus'...")
    dw_cus = pd.read_sql(f"SELECT DISTINCT cus_id FROM {SCHEMA_NAME}.dim_cus", engine)
    silver_cus = pd.read_sql("SELECT DISTINCT cus_id FROM uber_silver WHERE cus_id IS NOT NULL", engine)
    
    new_cus = silver_cus[~silver_cus['cus_id'].isin(dw_cus['cus_id'])]
    
    if not new_cus.empty:
        new_cus.to_sql('dim_cus', engine, schema=SCHEMA_NAME, if_exists='append', index=False)
        logging.info(f"-> Inseridos {len(new_cus)} novos registos em 'dim_cus'.")
    else:
        logging.info("-> 'dim_cus' já estava atualizada.")
except Exception as e:
    logging.error(f"ERRO ao atualizar 'dim_cus': {e}")

2025-11-23 15:23:58 - INFO - Atualizando 'dim_cus'...
2025-11-23 15:24:01 - INFO - -> Inseridos 147580 novos registos em 'dim_cus'.


### 4.2. A atualizar dw.dim_veh

In [11]:
try:
    logging.info("Atualizando 'dim_veh'...")
    dw_veh = pd.read_sql(f"SELECT DISTINCT veh_typ::TEXT FROM {SCHEMA_NAME}.dim_veh", engine)
    silver_veh = pd.read_sql("SELECT DISTINCT veh_typ::TEXT FROM uber_silver WHERE veh_typ IS NOT NULL", engine)
    
    new_veh = silver_veh[~silver_veh['veh_typ'].isin(dw_veh['veh_typ'])]
    
    if not new_veh.empty:
        new_veh.to_sql('dim_veh', engine, schema=SCHEMA_NAME, if_exists='append', index=False)
        logging.info(f"-> Inseridos {len(new_veh)} novos registos em 'dim_veh'.")
    else:
        logging.info("-> 'dim_veh' já estava atualizada.")
except Exception as e:
    logging.error(f"ERRO ao atualizar 'dim_veh': {e}")

2025-11-23 15:24:52 - INFO - Atualizando 'dim_veh'...
2025-11-23 15:24:52 - INFO - -> Inseridos 7 novos registos em 'dim_veh'.


### 4.3. A atualizar dw.dim_pay

In [14]:
try:
    logging.info("Atualizando 'dim_pay'...")
    dw_pay = pd.read_sql(f"SELECT DISTINCT pay_mtd::TEXT FROM {SCHEMA_NAME}.dim_pay", engine)
    silver_pay = pd.read_sql("SELECT DISTINCT pay_mtd::TEXT FROM uber_silver WHERE pay_mtd IS NOT NULL", engine)

    new_pay = silver_pay[~silver_pay['pay_mtd'].isin(dw_pay['pay_mtd'])]
    
    if not new_pay.empty:
        new_pay.to_sql('dim_pay', engine, schema=SCHEMA_NAME, if_exists='append', index=False)
        logging.info(f"-> Inseridos {len(new_pay)} novos registos em 'dim_pay'.")
    else:
        logging.info("-> 'dim_pay' já estava atualizada.")
except Exception as e:
    logging.error(f"ERRO ao atualizar 'dim_pay': {e}")

2025-11-23 15:26:16 - INFO - Atualizando 'dim_pay'...
2025-11-23 15:26:16 - INFO - -> Inseridos 5 novos registos em 'dim_pay'.


### 4.4. A atualizar dw.dim_loc

Esta dimensão usa uma chave composta (pickup_location, drop_location), pelo que a lógica usa um merge do Pandas para identificar os novos pares.

In [17]:
try:
    logging.info("Atualizando 'dim_loc'...")
    dw_loc = pd.read_sql(f"SELECT pic_loc, drp_loc FROM {SCHEMA_NAME}.dim_loc", engine)
    silver_loc = pd.read_sql(
        "SELECT DISTINCT pic_loc, drp_loc FROM uber_silver "
        "WHERE pic_loc IS NOT NULL AND drp_loc IS NOT NULL", 
        engine
    )
    
    new_loc = silver_loc.merge(
        dw_loc, 
        on=['pic_loc', 'drp_loc'], 
        how='left', 
        indicator=True
    )
    new_loc = new_loc[new_loc['_merge'] == 'left_only'].drop(columns=['_merge'])
    
    if not new_loc.empty:
        new_loc.to_sql('dim_loc', engine, schema=SCHEMA_NAME, if_exists='append', index=False)
        logging.info(f"-> Inseridos {len(new_loc)} novos registos em 'dim_loc'.")
    else:
        logging.info("-> 'dim_loc' já estava atualizada.")
except Exception as e:
    logging.error(f"ERRO ao atualizar 'dim_loc': {e}")

2025-11-23 15:28:53 - INFO - Atualizando 'dim_loc'...
2025-11-23 15:28:54 - INFO - -> Inseridos 30556 novos registos em 'dim_loc'.


### 4.5. A atualizar dw.dim_time

Esta etapa garante que a dimensão de data esteja populada.

A lógica é idempotente:
1.  Insere a linha padrão 'N/A' se ela não existir.
2.  Encontra o intervalo (MIN e MAX) de datas na `uber_silver`.
3.  Usa `generate_series` do PostgreSQL para criar todas as linhas de data nesse intervalo.
4.  Usa `ON CONFLICT (srk_date) DO NOTHING` para inserir apenas as datas que ainda não existem.

In [22]:
dim_date_load_sql = f"""
INSERT INTO {SCHEMA_NAME}.dim_dat (
    srk_dat, fll_dat, day_nme, day_wek, day_mth, day_yea,
    mth_nme, mth_nbr, qtr_nbr, yea_nbr, is_wkd, is_hol
) VALUES (
    -1, '1900-01-01', 'N/A', 0, 0, 0, 'N/A', 0, 0, 0, FALSE, FALSE
)
ON CONFLICT (srk_dat) DO NOTHING;

WITH date_range AS (
    SELECT 
        MIN(dtt::date) AS min_date,
        MAX(dtt::date) AS max_date
    FROM uber_silver
    WHERE dtt IS NOT NULL
)
INSERT INTO {SCHEMA_NAME}.dim_dat (
    srk_dat, fll_dat, day_nme, day_wek, day_mth, day_yea,
    mth_nme, mth_nbr, qtr_nbr, yea_nbr, is_wkd, is_hol
)
SELECT
    TO_CHAR(d, 'YYYYMMDD')::INTEGER AS srk_dat,
    d AS fll_dat,
    
    TO_CHAR(d, 'FMDay') AS day_nme,
    EXTRACT(ISODOW FROM d) AS day_wek, 
    EXTRACT(DAY FROM d) AS day_mth,
    EXTRACT(DOY FROM d) AS day_yea,
    TO_CHAR(d, 'FMMonth') AS mth_nme,
    EXTRACT(MONTH FROM d) AS mth_nbr,
    EXTRACT(QUARTER FROM d) AS qtr_nbr,
    EXTRACT(YEAR FROM d) AS yea_nbr,
    EXTRACT(ISODOW FROM d) IN (6, 7) AS is_wkd,
    FALSE AS is_hol
FROM
    generate_series(
        (SELECT min_date FROM date_range),
        (SELECT max_date FROM date_range),
        '1 day'::interval
    ) AS t(d)
ON CONFLICT (srk_dat) DO NOTHING;
"""


try:
    logging.info("A popular/atualizar 'dim_date' de forma idempotente...")
    with engine.begin() as conn:
        conn.execute(text(dim_date_load_sql))
    
    logging.info("-> 'dim_date' está populada e atualizada.")

except Exception as e:
    logging.critical(f"ERRO CRÍTICO ao popular 'dim_date': {e}")
    raise e

2025-11-23 15:34:09 - INFO - A popular/atualizar 'dim_date' de forma idempotente...
2025-11-23 15:34:09 - INFO - -> 'dim_date' está populada e atualizada.


## 5. ETAPA 2: Recarga da Tabela de Factos (fat_rid)

Esta é a etapa principal. Executa uma única consulta SQL que corre inteiramente no PostgreSQL.

1.  TRUNCATE: A tabela de factos é limpa.
2.  INSERT ... SELECT: A consulta SQL faz o JOIN entre a uber_silver e as dimensões (que acabámos de atualizar), buscando as srk_ corretas.
3.  COALESCE: Garante que, se um JOIN falhar, a srk_ seja preenchida com a chave padrão 'N/A' (semeadas no DW).

### 5.1. Definição da Query SQL da Tabela de Factos

In [31]:
fact_load_sql = f"""
-- Etapa A: Limpar a tabela de factos
TRUNCATE TABLE {SCHEMA_NAME}.fat_rid RESTART IDENTITY;

-- Etapa B: Inserir na tabela de factos
INSERT INTO {SCHEMA_NAME}.fat_rid (
    -- Métricas e Factos
    dtt, bkg_stt, avg_vtt, avg_ctt, ccd_by,
    rfc, irr, bkg_vle, 
    rid_dis, drv_rtg, cus_rtg,
    
    -- Chaves Estrangeiras (SKs)
    srk_cus, srk_veh, srk_pay, srk_loc,
    srk_dat
)
WITH
-- Pré-calcula as chaves "N/A"
default_keys AS (
    SELECT 
        (SELECT srk_cus FROM {SCHEMA_NAME}.dim_cus WHERE cus_id = 'N/A') AS srk_cus_na,
        (SELECT srk_veh FROM {SCHEMA_NAME}.dim_veh WHERE veh_typ = 'N/A') AS srk_veh_na,
        (SELECT srk_pay FROM {SCHEMA_NAME}.dim_pay WHERE pay_mtd = 'N/A') AS srk_pay_na,
        (SELECT srk_loc FROM {SCHEMA_NAME}.dim_loc WHERE pic_loc = 'N/A' AND drp_loc = 'N/A') AS srk_loc_na
)
SELECT
    -- Campos diretos da tabela silver
    s.dtt AS dtt,
    (s.bkg_stt::TEXT)::dw.bkg_stt_enum AS bkg_stt,
    s.avg_vtt AS avg_vtt,
    s.avg_ctt AS avg_ctt,
    (s.ccd_by::TEXT)::dw.ccd_by_enum AS ccd_by,
    (s.rfc::TEXT)::dw.rfc_enum AS rfc,
    (s.irr::TEXT)::dw.irr_enum AS irr,
    s.bkg_vle AS bkg_vle,
    s.rid_dis AS rid_dis,
    s.drv_rtg AS drv_rtg,
    s.cus_rtg AS cus_rtg,
    
    -- SKs
    COALESCE(cus.srk_cus, def.srk_cus_na),
    COALESCE(veh.srk_veh, def.srk_veh_na),
    COALESCE(pay.srk_pay, def.srk_pay_na),
    COALESCE(loc.srk_loc, def.srk_loc_na),
    
    -- SK de data
    COALESCE(d.srk_dat, -1)
FROM
    uber_silver AS s
CROSS JOIN
    default_keys AS def
LEFT JOIN
    {SCHEMA_NAME}.dim_cus AS cus ON s.cus_id = cus.cus_id
LEFT JOIN
    {SCHEMA_NAME}.dim_veh AS veh ON s.veh_typ::TEXT = veh.veh_typ::TEXT
LEFT JOIN
    {SCHEMA_NAME}.dim_pay AS pay ON s.pay_mtd::TEXT = pay.pay_mtd::TEXT
LEFT JOIN
    {SCHEMA_NAME}.dim_loc AS loc 
    ON s.pic_loc = loc.pic_loc
   AND s.drp_loc   = loc.drp_loc
LEFT JOIN
    {SCHEMA_NAME}.dim_dat AS d 
    ON TO_CHAR(s.dtt, 'YYYYMMDD')::INTEGER = d.srk_dat;
"""

logging.info("Query SQL da 'fat_rid' atualizada com sucesso.")


2025-11-23 15:39:43 - INFO - Query SQL da 'fat_rid' atualizada com sucesso.


### 5.2. Execução da Carga da Tabela de Factos

Executa a query SQL de carga dentro de uma transação para garantir a atomicidade.

In [32]:
try:
    logging.info("Executando carga da 'fat_rid' (TRUNCATE + INSERT) ...")
    with engine.begin() as conn:
        conn.execute(text(fact_load_sql))
    
    logging.info("-> Carga da 'fat_rid' concluída com sucesso.")

except Exception as e:
    logging.critical(f"ERRO CRÍTICO ao recarregar 'fat_rid': {e}")
    logging.error("A tabela de factos pode estar vazia ou em estado inconsistente.")


2025-11-23 15:39:44 - INFO - Executando carga da 'fat_rid' (TRUNCATE + INSERT) ...
2025-11-23 15:39:55 - INFO - -> Carga da 'fat_rid' concluída com sucesso.


## 6. Limpeza e Conclusão

Fecha o pool de ligações do engine do SQLAlchemy.

In [33]:
if engine:
    engine.dispose()
    logging.info("Engine do SQLAlchemy fechado.")

logging.info("Processo ETL (Silver-to-Gold) concluído.")

2025-11-23 15:39:57 - INFO - Engine do SQLAlchemy fechado.
2025-11-23 15:39:57 - INFO - Processo ETL (Silver-to-Gold) concluído.
