# **ETL da raw para a silver**

Esse notebook ira realisar o ETL, um processo de três etapas — Extrair, Transformar e Carregar, usado para integrar dados de diferentes fontes em um único dat warehouse. 
Essa metodologia combina dados, limpando-os e organizando-os para análise, relatórios e tomada de decisões de negócios. 




A célula abaixo instala as bibliotecas Python necessárias (como Pandas para dados e SQLAlchemy para banco de dados) no ambiente do notebook.

In [43]:
%pip install -q pandas sqlalchemy psycopg2-binary python-dotenv pyarrow tqdm

Note: you may need to restart the kernel to use updated packages.


#### Importando as Bibliotecas necessarias

In [44]:
import pandas as pd
from sqlalchemy import create_engine, text
from tqdm import tqdm
from pathlib import Path
import os

In [45]:
# ---------- Configuração de schema e função utilitária para leitura ----------
Int = "Int64"  # inteiros que aceitam NA
Str = "string"

DTYPES = {
    "olist_orders_dataset.csv": {
        "order_id": Str,
        "customer_id": Str,
        "order_status": Str,
    },
    "olist_order_items_dataset.csv": {
        "order_id": Str,
        "order_item_id": Int,
        "product_id": Str,
        "seller_id": Str,
        "price": "float64",
        "freight_value": "float64",
    },
    "olist_order_payments_dataset.csv": {
        "order_id": Str,
        "payment_sequential": Int,
        "payment_type": Str,
        "payment_installments": Int,
        "payment_value": "float64",
    },
    "olist_order_reviews_dataset.csv": {
        "review_id": Str,
        "order_id": Str,
        "review_score": Int,
        "review_comment_title": Str,
        "review_comment_message": Str,
    },
    "olist_products_dataset.csv": {
        "product_id": Str,
        "product_category_name": Str,
        "product_name_lenght": Int,
        "product_description_lenght": Int,
        "product_photos_qty": Int,
        "product_weight_g": Int,
        "product_length_cm": Int,
        "product_height_cm": Int,
        "product_width_cm": Int,
    },
    "olist_sellers_dataset.csv": {
        "seller_id": Str,
        "seller_zip_code_prefix": Int,
        "seller_city": Str,
        "seller_state": Str,
    },
    "olist_customers_dataset.csv": {
        "customer_id": Str,
        "customer_unique_id": Str,
        "customer_zip_code_prefix": Int,
        "customer_city": Str,
        "customer_state": Str,
    },
    "olist_geolocation_dataset.csv": {
        "geolocation_zip_code_prefix": Int,
        "geolocation_lat": "float64",
        "geolocation_lng": "float64",
        "geolocation_city": Str,
        "geolocation_state": Str,
    },
    "product_category_name_translation.csv": {
        "product_category_name": Str,
        "product_category_name_english": Str,
    },
}

PARSE_DATES = {
    "olist_orders_dataset.csv": [
        "order_purchase_timestamp",
        "order_approved_at",
        "order_delivered_carrier_date",
        "order_delivered_customer_date",
        "order_estimated_delivery_date",
    ],
    "olist_order_items_dataset.csv": [
        "shipping_limit_date",
    ],
    "olist_order_reviews_dataset.csv": [
        "review_creation_date",
        "review_answer_timestamp",
    ],
}

def load_csv(filename: str, dtypes=None, parse_dates=None):
    """Carrega um CSV da Bronze, padroniza nomes das colunas (lower) e retorna DataFrame."""
    path = RAW_DIR / filename
    if not path.exists():
        raise FileNotFoundError(f"CSV não encontrado: {path}")

    df = pd.read_csv(
        path,
        dtype=dtypes or {},
        parse_dates=parse_dates,
        keep_default_na=True,
        encoding="utf-8",
        infer_datetime_format=True,
        low_memory=False,
    )
    df.columns = [c.strip().lower() for c in df.columns]
    return df


A célula abaixo encontra a pasta raiz do projeto buscando por data_layer, define os caminhos importantes (raw, sql), e verifica se os dados brutos (raw e o arquivo de pedidos) existem e podem ser lidos, exibindo as 5 primeiras linhas como prova.

In [46]:
# 1) Detectar automaticamente a raiz que contém "data_layer"
CWD = Path.cwd()
PROJECT_ROOT = None
for candidate in [CWD, *CWD.parents]:
    if (candidate / "data_layer").exists():
        PROJECT_ROOT = candidate
        break

if PROJECT_ROOT is None:
    raise FileNotFoundError(
        f'Não achei a pasta "data_layer" a partir de {CWD}. '
        f'Abra o notebook a partir do repositório ou mova este .ipynb para dentro dele.'
    )

# 2) Recalcular caminhos com base na raiz correta
RAW_DIR = PROJECT_ROOT / "data_layer" /  "raw"
# DDL está na pasta silver (mesmo que não vamos criar pasta silver)
DDL_PATH = PROJECT_ROOT / "data_layer" / "silver" / "DDL.sql"

print("CWD:", CWD)
print("PROJECT_ROOT:", PROJECT_ROOT)
print("RAW_DIR:", RAW_DIR)
print("DDL_PATH:", DDL_PATH)

# 3) Validar que CSVs existem e sao kegiveis
assert RAW_DIR.exists(), f"Pasta Bronze (raw) não encontrada: {RAW_DIR}"

# 4) Checagem mínima: tentar ler 5 linhas do orders
orders_csv = RAW_DIR / "olist_orders_dataset.csv"
assert orders_csv.exists(), f"Arquivo esperado não encontrado: {orders_csv}"
display(pd.read_csv(orders_csv, nrows=5).head())
print("✅ Bronze encontrada e legível.")


CWD: /home/oem/Documentos/FGA/SBD2/SenTry/brazilian_e-commerce_analysis/Transfomer
PROJECT_ROOT: /home/oem/Documentos/FGA/SBD2/SenTry/brazilian_e-commerce_analysis
RAW_DIR: /home/oem/Documentos/FGA/SBD2/SenTry/brazilian_e-commerce_analysis/data_layer/raw
DDL_PATH: /home/oem/Documentos/FGA/SBD2/SenTry/brazilian_e-commerce_analysis/data_layer/silver/DDL.sql


Unnamed: 0,order_id,customer_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date
0,e481f51cbdc54678b7cc49136f2d6af7,9ef432eb6251297304e76186b10a928d,delivered,2017-10-02 10:56:33,2017-10-02 11:07:15,2017-10-04 19:55:00,2017-10-10 21:25:13,2017-10-18 00:00:00
1,53cdb2fc8bc7dce0b6741e2150273451,b0830fb4747a6c6d20dea0b8c802d7ef,delivered,2018-07-24 20:41:37,2018-07-26 03:24:27,2018-07-26 14:31:00,2018-08-07 15:27:45,2018-08-13 00:00:00
2,47770eb9100c2d0c44946d9cf07ec65d,41ce2a54c0b03bf3443c3d931a367089,delivered,2018-08-08 08:38:49,2018-08-08 08:55:23,2018-08-08 13:50:00,2018-08-17 18:06:29,2018-09-04 00:00:00
3,949d5b44dbf5de918fe9c16f97b45f8a,f88197465ea7920adcdbec7375364d82,delivered,2017-11-18 19:28:06,2017-11-18 19:45:59,2017-11-22 13:39:59,2017-12-02 00:28:42,2017-12-15 00:00:00
4,ad21c59c0840e6cb83a9ceb5573f8159,8ab97904e6daea8866dbdbc4fb7aad2c,delivered,2018-02-13 21:18:39,2018-02-13 22:20:29,2018-02-14 19:46:34,2018-02-16 18:17:02,2018-02-26 00:00:00


✅ Bronze encontrada e legível.


# EXTRACT — Ler dados da Bronze

- Carrega todos os CSVs de `olist-ecommerce-pipeline/data/raw`.
- Define dtypes explícitos e faz *parse* de datas.
- Exibe *shape*, tipos e amostra de linhas para validação.


### DB Config & Connection Test

_Lê variáveis do`` =.env`` (se existir) ou usa defaults locais.
Testa conexão, cria o schema ``DL`` (se não existir) e ajusta ``search_path``.
Não falha o notebook se o Postgres não estiver no ar apenas avisa._

In [47]:
# 1) Carregar .env se existir (opcional)
ENV_PATH = PROJECT_ROOT / ".env"
if ENV_PATH.exists():
    # carregamento leve do .env (sem dependências)
    with open(ENV_PATH, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            k, v = line.split("=", 1)
            os.environ.setdefault(k.strip(), v.strip())

# 2) Variáveis de conexão (definidas aqui, mas a conexão será aberta somente na etapa LOAD no final do notebook)
DB_HOST   = os.getenv("PGHOST", "localhost")
DB_PORT   = os.getenv("PGPORT", "5435")
DB_NAME   = os.getenv("PGDATABASE", "olist")
DB_USER   = os.getenv("PGUSER", "postgres")
DB_PASS   = os.getenv("PGPASSWORD", "postgres")
DB_SCHEMA = os.getenv("PGSCHEMA", "DL")   # Data Layer (DL)

print("DB_HOST:", DB_HOST, "| DB_PORT:", DB_PORT, "| DB_NAME:", DB_NAME, "| SCHEMA:", DB_SCHEMA)

# Nota: Não criamos `engine` nem tentamos abrir conexão aqui —
# a conexão com o banco será feita apenas na etapa final de LOAD (para evitar depender do DB durante o desenvolvimento/transformações).


DB_HOST: localhost | DB_PORT: 5435 | DB_NAME: olist | SCHEMA: DL


A célula abaixo, define uma função que verifica a existência de um arquivo CSV, carrega seu conteúdo da pasta bruta (Bronze) para a memória aplicando tipagens específicas, e, em seguida, executa um loop para extrair todos os arquivos listados ``(CSV_LIST)``, criando DataFrames em variáveis de acesso rápido.

In [48]:
def load_csv(filename: str, dtypes=None, parse_dates=None):
    """Carrega um CSV da Bronze, padroniza nomes das colunas (lower) e retorna DataFrame."""
    # 1. Encontra e verifica o caminho do arquivo no disco
    path = RAW_DIR / filename
    if not path.exists():
        raise FileNotFoundError(f"CSV não encontrado: {path}")

    # 2. LÊ o arquivo do disco para um DataFrame do Pandas (Extração Central)
    df = pd.read_csv(
        path,
        dtype=dtypes or {},
        parse_dates=parse_dates or [],
        keep_default_na=True,
        encoding="utf-8",
        # infer_datetime_format pode acelerar parsing em versões antigas do pandas,
        # mas foi comentado antes por gerar barras duplicadas em alguns ambientes.
        # infer_datetime_format=True, 
        low_memory=False,
    )
    # Padroniza nomes das colunas: remove espaços e coloca em lower-case (transformação) 
    df.columns = [c.strip().lower() for c in df.columns]
    return df

# ---------- load all ----------
# Lista de CSVs esperados na pasta raw. 
CSV_LIST = [
    "olist_customers_dataset.csv",
    "olist_geolocation_dataset.csv",
    "olist_orders_dataset.csv",
    "olist_order_items_dataset.csv",
    "olist_order_payments_dataset.csv",
    "olist_order_reviews_dataset.csv",
    "olist_products_dataset.csv",
    "olist_sellers_dataset.csv",
    "product_category_name_translation.csv",
]

# Safety: garantir que DTYPES e PARSE_DATES existam mesmo se o usuário executou células fora de ordem
if 'DTYPES' not in globals():
    print("[AVISO] DTYPES não encontrado. Definindo DTYPES = {} por segurança.")
    DTYPES = {}
if 'PARSE_DATES' not in globals():
    print("[AVISO] PARSE_DATES não encontrado. Definindo PARSE_DATES = {} por segurança.")
    PARSE_DATES = {}

dfs = {}
# 3. Itera sobre a lista e chama a função de leitura para extrair todos
for name in tqdm(CSV_LIST, desc="Lendo Bronze"):
    dfs[name] = load_csv(
        name,
        dtypes=DTYPES.get(name),
        parse_dates=PARSE_DATES.get(name),
    )

# 4. Cria aliases para os DataFrames extraídos
customers   = dfs["olist_customers_dataset.csv"]
geos        = dfs["olist_geolocation_dataset.csv"]
orders      = dfs["olist_orders_dataset.csv"]
items       = dfs["olist_order_items_dataset.csv"]
payments    = dfs["olist_order_payments_dataset.csv"]
reviews     = dfs["olist_order_reviews_dataset.csv"]
products    = dfs["olist_products_dataset.csv"]
sellers     = dfs["olist_sellers_dataset.csv"]
prod_trans  = dfs["product_category_name_translation.csv"]

Lendo Bronze: 100%|██████████| 9/9 [00:03<00:00,  2.26it/s]


# TRANSFORM - Normalização para a Camada Silver

- Padroniza strings, remove nulos críticos e duplicidades.
- Enriquece `products` com tradução de categoria.
- Garante integridade referencial: `items`, `payments` e `reviews` só com `order_id` válido; `items` só com `product_id`/`seller_id` válidos.
- Agrega/geolocalização deduplicada por CEP prefixo.
- Deriva campos úteis em `orders` (datas e métricas de entrega).
- Cria dataframes finais: `silver_customers`, `silver_orders`, `silver_order_items`, `silver_products`, `silver_sellers`, `silver_payments`, `silver_reviews`, `silver_geolocation`.


Definindo a função ``load_csv`` para leitura e padronização, e configura os dicionários ``(DTYPES, PARSE_DATES)`` que especificam os tipos de dados e colunas de data esperados para cada arquivo CSV, preparando as regras de transformação.

In [49]:
def load_csv(filename: str, dtypes=None, parse_dates=None):
    """Carrega um CSV da Bronze, padroniza nomes das colunas (lower) e retorna DataFrame."""
    path = RAW_DIR / filename
    if not path.exists():
        raise FileNotFoundError(f"CSV não encontrado: {path}")
    df = pd.read_csv(
        path,
        dtype=dtypes or {},
        parse_dates=parse_dates,
        keep_default_na=True,
        encoding="utf-8",
        infer_datetime_format=True,
        low_memory=False,
    )
    df.columns = [c.strip().lower() for c in df.columns]
    return df


In [50]:
# dtypes por arquivo (usando tipos que aceitam NA quando necessário)
Int = "Int64"  # inteiro com suporte a NA
Str = "string"

DTYPES = {
    "olist_orders_dataset.csv": {
        "order_id": Str,
        "customer_id": Str,
        "order_status": Str,
        # timestamps lidos via parse_dates
    },
    "olist_order_items_dataset.csv": {
        "order_id": Str,
        "order_item_id": Int,
        "product_id": Str,
        "seller_id": Str,
        "price": "float64",
        "freight_value": "float64",
        # shipping_limit_date via parse_dates
    },
    "olist_order_payments_dataset.csv": {
        "order_id": Str,
        "payment_sequential": Int,
        "payment_type": Str,
        "payment_installments": Int,
        "payment_value": "float64",
    },
    "olist_order_reviews_dataset.csv": {
        "review_id": Str,
        "order_id": Str,
        "review_score": Int,
        "review_comment_title": Str,
        "review_comment_message": Str,
        # creation/answer via parse_dates
    },
    "olist_products_dataset.csv": {
        "product_id": Str,
        "product_category_name": Str,
        "product_name_lenght": Int,
        "product_description_lenght": Int,
        "product_photos_qty": Int,
        "product_weight_g": Int,
        "product_length_cm": Int,
        "product_height_cm": Int,
        "product_width_cm": Int,
    },
    "olist_sellers_dataset.csv": {
        "seller_id": Str,
        "seller_zip_code_prefix": Int,
        "seller_city": Str,
        "seller_state": Str,
    },
    "olist_customers_dataset.csv": {
        "customer_id": Str,
        "customer_unique_id": Str,
        "customer_zip_code_prefix": Int,
        "customer_city": Str,
        "customer_state": Str,
    },
    "olist_geolocation_dataset.csv": {
        "geolocation_zip_code_prefix": Int,
        "geolocation_lat": "float64",
        "geolocation_lng": "float64",
        "geolocation_city": Str,
        "geolocation_state": Str,
    },
    "product_category_name_translation.csv": {
        "product_category_name": Str,
        "product_category_name_english": Str,
    },
}

PARSE_DATES = {
    "olist_orders_dataset.csv": [
        "order_purchase_timestamp",
        "order_approved_at",
        "order_delivered_carrier_date",
        "order_delivered_customer_date",
        "order_estimated_delivery_date",
    ],
    "olist_order_items_dataset.csv": [
        "shipping_limit_date",
    ],
    "olist_order_reviews_dataset.csv": [
        "review_creation_date",
        "review_answer_timestamp",
    ],
}


Exibindo o número de linhas e colunas (shape) de todos os DataFrames carregados e imprime o detalhe dos tipos de dados (dtypes) do DataFrame de orders, fornecendo uma visão rápida da estrutura.

In [51]:
# ---------- quick summary: Shapes ----------
print("\n# Shapes")
for k, v in dfs.items():
    print(f"{k:40s} -> {v.shape}")

print("\n# Dtypes (orders)")
print(orders.dtypes)


# Shapes
olist_customers_dataset.csv              -> (99441, 5)
olist_geolocation_dataset.csv            -> (1000163, 5)
olist_orders_dataset.csv                 -> (99441, 8)
olist_order_items_dataset.csv            -> (112650, 7)
olist_order_payments_dataset.csv         -> (103886, 5)
olist_order_reviews_dataset.csv          -> (99224, 7)
olist_products_dataset.csv               -> (32951, 9)
olist_sellers_dataset.csv                -> (3095, 4)
product_category_name_translation.csv    -> (71, 2)

# Dtypes (orders)
order_id                         string[python]
customer_id                      string[python]
order_status                     string[python]
order_purchase_timestamp         datetime64[ns]
order_approved_at                datetime64[ns]
order_delivered_carrier_date     datetime64[ns]
order_delivered_customer_date    datetime64[ns]
order_estimated_delivery_date    datetime64[ns]
dtype: object


Exibe as três primeiras linhas (head(3)) de alguns DataFrames principais (orders, items, payments) para uma inspeção visual rápida e validação dos dados.

In [52]:
# ---------- quick summary: Amostras ----------
print("\n# Amostras")
display(orders.head(3))
display(items.head(3))
display(payments.head(3))


# Amostras


Unnamed: 0,order_id,customer_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date
0,e481f51cbdc54678b7cc49136f2d6af7,9ef432eb6251297304e76186b10a928d,delivered,2017-10-02 10:56:33,2017-10-02 11:07:15,2017-10-04 19:55:00,2017-10-10 21:25:13,2017-10-18
1,53cdb2fc8bc7dce0b6741e2150273451,b0830fb4747a6c6d20dea0b8c802d7ef,delivered,2018-07-24 20:41:37,2018-07-26 03:24:27,2018-07-26 14:31:00,2018-08-07 15:27:45,2018-08-13
2,47770eb9100c2d0c44946d9cf07ec65d,41ce2a54c0b03bf3443c3d931a367089,delivered,2018-08-08 08:38:49,2018-08-08 08:55:23,2018-08-08 13:50:00,2018-08-17 18:06:29,2018-09-04


Unnamed: 0,order_id,order_item_id,product_id,seller_id,shipping_limit_date,price,freight_value
0,00010242fe8c5a6d1ba2dd792cb16214,1,4244733e06e7ecb4970a6e2683c13e61,48436dade18ac8b2bce089ec2a041202,2017-09-19 09:45:35,58.9,13.29
1,00018f77f2f0320c557190d7a144bdd3,1,e5f2d52b802189ee658865ca93d83a8f,dd7ddc04e1b6c2c614352b383efe2d36,2017-05-03 11:05:13,239.9,19.93
2,000229ec398224ef6ca0657da4fc703e,1,c777355d18b72b67abbeef9df44fd0fd,5b51032eddd242adc84c38acab88f23d,2018-01-18 14:48:30,199.0,17.87


Unnamed: 0,order_id,payment_sequential,payment_type,payment_installments,payment_value
0,b81ef226f3fe1789b1e8b2acac839d17,1,credit_card,8,99.33
1,a9810da82917af2d9aefd1278f1dcfa0,1,credit_card,1,24.39
2,25e8ea4e93396b6fa0d3dd708e76c1bd,1,credit_card,1,65.71


Define e executa a função ``na_overview`` para calcular e imprimir a contagem de valores ausentes (NAs) nas colunas dos DataFrames principais, identificando onde a limpeza de dados (Transformação) será necessária.

In [53]:
# NAs por coluna (visão geral) — útil para planejar a transformação
def na_overview(df, name):
    s = df.isna().sum()
    if (s > 0).any():
        print(f"\nNA overview — {name}")
        print(s[s > 0].sort_values(ascending=False).head(12))

na_overview(orders, "orders")
na_overview(items, "items")
na_overview(products, "products")
na_overview(reviews, "reviews")


NA overview — orders
order_delivered_customer_date    2965
order_delivered_carrier_date     1783
order_approved_at                 160
dtype: int64

NA overview — products
product_category_name         610
product_name_lenght           610
product_description_lenght    610
product_photos_qty            610
product_weight_g                2
product_length_cm               2
product_height_cm               2
product_width_cm                2
dtype: int64

NA overview — reviews
review_comment_title      87656
review_comment_message    58247
dtype: int64


### Funções Auxiliares de Limpeza (Transformação)
Define três funções auxiliares essenciais para a fase de Transformação: norm_str (padronização de texto), drop_nulls (remoção de linhas com valores nulos) e drop_dups (remoção de linhas duplicadas, reportando a contagem de linhas afetadas).

In [54]:
def norm_str(df, cols):
    for c in cols:
        if c in df.columns:
            df[c] = (
                df[c]
                .astype("string")
                .str.strip()
            )

def drop_nulls(df, cols, name):
    before = len(df)
    df2 = df.dropna(subset=[c for c in cols if c in df.columns])
    removed = before - len(df2)
    if removed:
        print(f"[{name}] removidas {removed} linhas por nulos em {cols}")
    return df2

def drop_dups(df, keys, name):
    before = len(df)
    df2 = df.drop_duplicates(subset=keys, keep="first")
    removed = before - len(df2)
    if removed:
        print(f"[{name}] removidas {removed} duplicatas por {keys}")
    return df2


### Limpeza e Transformação das Dimensões Mestre
Aplica as funções de limpeza aos DataFrames de customers, sellers e geolocation (tabelas mestre/dimensão), garantindo que as chaves primárias não contenham nulos ou duplicatas, e realiza o merge para traduzir a categoria de products.

In [55]:
# ---------- Customers, Sellers, Geolocation (Limpeza Básica) ----------
silver_customers = customers.copy()
norm_str(silver_customers, ["customer_id","customer_unique_id","customer_city","customer_state"])
silver_customers = drop_nulls(silver_customers, ["customer_id"], "customers")
silver_customers = drop_dups(silver_customers, ["customer_id"], "customers")

silver_sellers = sellers.copy()
norm_str(silver_sellers, ["seller_id","seller_city","seller_state"])
silver_sellers = drop_nulls(silver_sellers, ["seller_id"], "sellers")
silver_sellers = drop_dups(silver_sellers, ["seller_id"], "sellers")

silver_geolocation = geos.copy()
silver_geolocation = drop_nulls(silver_geolocation, ["geolocation_zip_code_prefix"], "geolocation")
silver_geolocation = drop_dups(silver_geolocation, ["geolocation_zip_code_prefix"], "geolocation")

# ---------- Products (+ tradução da categoria) ----------
silver_products = products.copy()
norm_str(silver_products, ["product_id","product_category_name"])
silver_products = silver_products.merge(
    prod_trans, on="product_category_name", how="left"
)
silver_products = silver_products.rename(
    columns={"product_category_name_english": "product_category_en"}
)
silver_products = drop_nulls(silver_products, ["product_id"], "products")
silver_products = drop_dups(silver_products, ["product_id"], "products")

[geolocation] removidas 981148 duplicatas por ['geolocation_zip_code_prefix']


### Transformação e Derivação de Pedidos (Orders)
Limpa a tabela de orders e calcula métricas importantes de tempo de entrega, como o tempo total de delivery, o atraso em relação à estimativa e uma flag binária para indicar se o pedido foi entregue com atraso.

In [56]:
# ---------- Orders (Limpeza e Derivados de Entrega) ----------
silver_orders = orders.copy()
norm_str(silver_orders, ["order_id","customer_id","order_status"])
silver_orders = drop_nulls(silver_orders, ["order_id","customer_id"], "orders")
silver_orders = drop_dups(silver_orders, ["order_id"], "orders")

# Garantir que colunas de data estão no tipo datetime (prevenindo AttributeError ao usar .dt)
date_cols = [
    "order_purchase_timestamp",
    "order_approved_at",
    "order_delivered_carrier_date",
    "order_delivered_customer_date",
    "order_estimated_delivery_date",
]
for c in date_cols:
    if c in silver_orders.columns:
        silver_orders[c] = pd.to_datetime(silver_orders[c], errors='coerce')

# derivados úteis
if "order_purchase_timestamp" in silver_orders.columns:
    silver_orders["order_purchase_date"] = silver_orders["order_purchase_timestamp"].dt.date
else:
    silver_orders["order_purchase_date"] = pd.NaT

silver_orders["delivery_time_days"] = (
    silver_orders["order_delivered_customer_date"] - silver_orders["order_purchase_timestamp"]
)
# extrai dias e armazena como Int64 (suporta NA)
silver_orders["delivery_time_days"] = silver_orders["delivery_time_days"].dt.days.astype("Int64")

silver_orders["delivery_delay_days"] = (
    silver_orders["order_delivered_customer_date"] - silver_orders["order_estimated_delivery_date"]
)
silver_orders["delivery_delay_days"] = silver_orders["delivery_delay_days"].dt.days.astype("Int64")

# flag binária indicando se foi entregue com atraso (1) ou não (0); trata NAs
silver_orders["delivered_late"] = (
    (silver_orders["order_delivered_customer_date"] > silver_orders["order_estimated_delivery_date"]) 
)
# converte para Int64 (1/0/NA)
silver_orders["delivered_late"] = silver_orders["delivered_late"].astype("Int64")


### Integridade Referencial e Limpeza de Transações
Garante a integridade dos dados ao filtrar as tabelas transacionais (items, payments, reviews), removendo registros que não possuem chaves válidas nas tabelas mestre recém-limpas (Silver), e aplica a limpeza final de nulos e duplicatas nessas tabelas.

In [57]:
# ---------- Itens, Pagamentos, Avaliações (Integridade Referencial) ----------
valid_orders = set(silver_orders["order_id"])
valid_products = set(silver_products["product_id"])
valid_sellers  = set(silver_sellers["seller_id"])

# Items
silver_order_items = items.copy()
norm_str(silver_order_items, ["order_id","product_id","seller_id"])
silver_order_items = drop_nulls(silver_order_items, ["order_id","order_item_id","product_id","seller_id"], "order_items")
silver_order_items = silver_order_items[
    silver_order_items["order_id"].isin(valid_orders)
    & silver_order_items["product_id"].isin(valid_products)
    & silver_order_items["seller_id"].isin(valid_sellers)
].copy()
silver_order_items = drop_dups(silver_order_items, ["order_id","order_item_id"], "order_items")

# Payments
silver_payments = payments.copy()
norm_str(silver_payments, ["order_id","payment_type"])
silver_payments = drop_nulls(silver_payments, ["order_id"], "payments")
silver_payments = silver_payments[silver_payments["order_id"].isin(valid_orders)].copy()

# Reviews
silver_reviews = reviews.copy()
norm_str(silver_reviews, ["review_id","order_id"])
silver_reviews = drop_nulls(silver_reviews, ["review_id","order_id"], "reviews")
silver_reviews = silver_reviews[silver_reviews["order_id"].isin(valid_orders)].copy()
silver_reviews = drop_dups(silver_reviews, ["review_id"], "reviews")

[reviews] removidas 814 duplicatas por ['review_id']


### Sanity Check Final da Camada Silver
Exibe um resumo final da estrutura (shape) e amostras dos DataFrames recém-criados e limpos da camada Silver, confirmando que a Transformação foi concluída com sucesso e os dados estão prontos para a fase de Carregamento.

In [58]:
# Crie o dicionário de resumo dos shapes
summary = {
    "silver_customers": silver_customers.shape,
    "silver_sellers": silver_sellers.shape,
    "silver_products": silver_products.shape,
    "silver_geolocation": silver_geolocation.shape,
    "silver_orders": silver_orders.shape,
    "silver_order_items": silver_order_items.shape,
    "silver_payments": silver_payments.shape,
    "silver_reviews": silver_reviews.shape,
}

# Converte o dicionário em um DataFrame para exibição tabular
summary_df = pd.DataFrame(
    summary.values(), 
    index=summary.keys(), 
    columns=["Linhas", "Colunas"]
)

# Título do Resumo
print("✅ Resumo de Shapes da Camada Silver:")

# Exibe o DataFrame de resumo
display(summary_df)

✅ Resumo de Shapes da Camada Silver:


Unnamed: 0,Linhas,Colunas
silver_customers,99441,5
silver_sellers,3095,4
silver_products,32951,10
silver_geolocation,19015,5
silver_orders,99441,12
silver_order_items,112650,7
silver_payments,103886,5
silver_reviews,98410,7


In [59]:
print("\n--- Amostras da Camada Silver ---")

print("\nsilver_orders (Pedidos):")
display(silver_orders.head(3))

print("\nsilver_order_items (Itens de Pedido):")
display(silver_order_items.head(3))

print("\nsilver_products (Produtos):")
display(silver_products.head(3))


--- Amostras da Camada Silver ---

silver_orders (Pedidos):


Unnamed: 0,order_id,customer_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date,order_purchase_date,delivery_time_days,delivery_delay_days,delivered_late
0,e481f51cbdc54678b7cc49136f2d6af7,9ef432eb6251297304e76186b10a928d,delivered,2017-10-02 10:56:33,2017-10-02 11:07:15,2017-10-04 19:55:00,2017-10-10 21:25:13,2017-10-18,2017-10-02,8,-8,0
1,53cdb2fc8bc7dce0b6741e2150273451,b0830fb4747a6c6d20dea0b8c802d7ef,delivered,2018-07-24 20:41:37,2018-07-26 03:24:27,2018-07-26 14:31:00,2018-08-07 15:27:45,2018-08-13,2018-07-24,13,-6,0
2,47770eb9100c2d0c44946d9cf07ec65d,41ce2a54c0b03bf3443c3d931a367089,delivered,2018-08-08 08:38:49,2018-08-08 08:55:23,2018-08-08 13:50:00,2018-08-17 18:06:29,2018-09-04,2018-08-08,9,-18,0



silver_order_items (Itens de Pedido):


Unnamed: 0,order_id,order_item_id,product_id,seller_id,shipping_limit_date,price,freight_value
0,00010242fe8c5a6d1ba2dd792cb16214,1,4244733e06e7ecb4970a6e2683c13e61,48436dade18ac8b2bce089ec2a041202,2017-09-19 09:45:35,58.9,13.29
1,00018f77f2f0320c557190d7a144bdd3,1,e5f2d52b802189ee658865ca93d83a8f,dd7ddc04e1b6c2c614352b383efe2d36,2017-05-03 11:05:13,239.9,19.93
2,000229ec398224ef6ca0657da4fc703e,1,c777355d18b72b67abbeef9df44fd0fd,5b51032eddd242adc84c38acab88f23d,2018-01-18 14:48:30,199.0,17.87



silver_products (Produtos):


Unnamed: 0,product_id,product_category_name,product_name_lenght,product_description_lenght,product_photos_qty,product_weight_g,product_length_cm,product_height_cm,product_width_cm,product_category_en
0,1e9e8ef04dbcff4541ed26657ea517e5,perfumaria,40,287,1,225,16,10,14,perfumery
1,3aa071139cb16b67ca9e5dea641aaa2f,artes,44,276,1,1000,30,18,20,art
2,96bd76ec8810374ed1b65e291975717f,esporte_lazer,46,250,1,154,18,9,15,sports_leisure


# LOAD


## Conectar e aplicar o DDL

Esta célula configura a conexão com o banco de dados PostgreSQL lendo credenciais de um arquivo .env ou usando defaults, tenta se conectar, garante que o esquema DL (Data Layer) exista e, se o arquivo DDL for encontrado, executa os comandos SQL para criar a estrutura da tabela única ORDER_ITEMS, preparando o banco para receber os dados limpos.

In [60]:
# Connection test — tenta detectar a porta PG funcional e ajusta PGPORT para as próximas células
# Esta célula tenta se conectar via psycopg2 em uma lista de portas candidatas
import os
from pathlib import Path
import psycopg2

# Recarrega .env leve (se existir)
ENV_PATH = PROJECT_ROOT / '.env'
if ENV_PATH.exists():
    for line in ENV_PATH.read_text(encoding='utf-8').splitlines():
        line = line.strip()
        if not line or line.startswith('#') or '=' not in line:
            continue
        k, v = line.split('=', 1)
        os.environ.setdefault(k.strip(), v.strip())

candidates = []
env_port = os.getenv('PGPORT')
if env_port:
    try:
        candidates.append(int(env_port))
    except ValueError:
        pass
# common ports to try (preserves order)
for p in (5435, 5432, 5434, 5436):
    if p not in candidates:
        candidates.append(p)

DB_HOST = os.getenv('PGHOST', 'localhost')
DB_NAME = os.getenv('PGDATABASE', 'olist')
DB_USER = os.getenv('PGUSER', 'postgres')
DB_PASS = os.getenv('PGPASSWORD', 'postgres')

print('Candidates to test:', candidates)

def try_connect(port, timeout=3):
    try:
        conn = psycopg2.connect(host=DB_HOST, port=port, dbname=DB_NAME, user=DB_USER, password=DB_PASS, connect_timeout=timeout)
        conn.close()
        return True, None
    except Exception as e:
        return False, e

success = None
for port in candidates:
    ok, err = try_connect(port)
    if ok:
        print(f'✅ Connection successful on port {port} (host={DB_HOST}, db={DB_NAME}, user={DB_USER})')
        os.environ['PGPORT'] = str(port)
        DB_PORT = str(port)
        success = port
        break
    else:
        print(f'✖ port {port}: {type(err).__name__}: {err}')

if success is None:
    print('\nNo successful connection using provided credentials and tried ports.')
    print('If you expect a Docker container to expose Postgres, check `docker ps` and `docker logs` and/or fix port mapping in docker-compose.yml.')
else:
    print('\nSet PGPORT to', success, 'for subsequent cells. You can re-run the LOAD cell now.')


Candidates to test: [5435, 5432, 5434, 5436]
✖ port 5435: OperationalError: connection to server at "localhost" (127.0.0.1), port 5435 failed: Connection refused
	Is the server running on that host and accepting TCP/IP connections?

✖ port 5432: OperationalError: connection to server at "localhost" (127.0.0.1), port 5432 failed: FATAL:  password authentication failed for user "postgres"
connection to server at "localhost" (127.0.0.1), port 5432 failed: FATAL:  password authentication failed for user "postgres"

✖ port 5434: OperationalError: connection to server at "localhost" (127.0.0.1), port 5434 failed: server closed the connection unexpectedly
	This probably means the server terminated abnormally
	before or while processing the request.

✖ port 5436: OperationalError: connection to server at "localhost" (127.0.0.1), port 5436 failed: Connection refused
	Is the server running on that host and accepting TCP/IP connections?


No successful connection using provided credentials and tr

In [61]:
# 2.6.1 — Conectar no Postgres e aplicar o DDL (Data Layer)
# Reusa PROJECT_ROOT, DDL_PATH, etc. já definidos antes
ENV_PATH = PROJECT_ROOT / ".env"
if ENV_PATH.exists():
    for line in ENV_PATH.read_text(encoding="utf-8").splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        k, v = line.split("=", 1)
        os.environ.setdefault(k.strip(), v.strip())

DB_HOST   = os.getenv("PGHOST", "localhost")
DB_PORT   = os.getenv("PGPORT", "5433")
DB_NAME   = os.getenv("PGDATABASE", "olist")
DB_USER   = os.getenv("PGUSER", "postgres")
DB_PASS   = os.getenv("PGPASSWORD", "postgres")
DB_SCHEMA = os.getenv("PGSCHEMA", "DL")   # Data Layer (DL)

db_url = f"postgresql+psycopg2://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
engine = create_engine(db_url, pool_pre_ping=True, future=True)

print("→ Tentando conectar…")
try:
    with engine.begin() as conn:
        row = conn.exec_driver_sql("select current_database(), current_schema(), version();").fetchone()
        print("Conectado! ->", row)

        # Garante o schema Silver e seta o search_path
        conn.exec_driver_sql(f'CREATE SCHEMA IF NOT EXISTS "{DB_SCHEMA}";')
        conn.exec_driver_sql(f'SET search_path TO "{DB_SCHEMA}", public;')

        # Executa o DDL do Pablo (se existir)
        if DDL_PATH.exists():
            sql = DDL_PATH.read_text(encoding="utf-8").strip()
            print(f"→ Executando DDL de {DDL_PATH.name} (tamanho ~{DDL_PATH.stat().st_size} bytes)…")
            conn.exec_driver_sql(sql)
            print("✅ DDL aplicado.")
        else:
            print("⚠️ DDL não encontrado — seguiremos com to_sql para criar as tabelas.")

except Exception as e:
    import traceback
    print("\n[AVISO] Não foi possível conectar ao Postgres agora. Detalhe do erro:")
    print(type(e).__name__, str(e))
    traceback.print_exc()


→ Tentando conectar…
Conectado! -> ('olist', 'public', 'PostgreSQL 16.10 (Debian 16.10-1.pgdg13+1) on x86_64-pc-linux-gnu, compiled by gcc (Debian 14.2.0-19) 14.2.0, 64-bit')
→ Executando DDL de DDL.sql (tamanho ~1797 bytes)…
✅ DDL aplicado.


In [62]:
# 2.5 — Build single unified table for DL (Data Layer)

# Payments aggregated per order (matching DDL.sql structure)
payments_agg = (
    silver_payments
    .groupby("order_id", as_index=False)
    .agg(
        qtd_payment_sequential=("payment_sequential", "count"),
        primeiro_payment_type=("payment_type", "first"),
        valor_total_pagamento=("payment_value", "sum"),
        maximo_payment_installments=("payment_installments", "max")
    )
)

# Get geolocation for sellers (by zip_code_prefix)
seller_geo = (
    silver_geolocation
    .groupby("geolocation_zip_code_prefix", as_index=False)
    .agg(
        vendedor_geolocation_lat=("geolocation_lat", "first"),
        vendedor_geolocation_lng=("geolocation_lng", "first")
    )
    .rename(columns={"geolocation_zip_code_prefix": "seller_zip_code_prefix"})
)

# Get geolocation for customers (by zip_code_prefix)
customer_geo = (
    silver_geolocation
    .groupby("geolocation_zip_code_prefix", as_index=False)
    .agg(
        cliente_geolocation_lat=("geolocation_lat", "first"),
        cliente_geolocation_lng=("geolocation_lng", "first")
    )
    .rename(columns={"geolocation_zip_code_prefix": "customer_zip_code_prefix"})
)

# Build unified table by joining all normalized Silver tables
# Starting from order_items as base (one row per item)
dl_order_items = (
    silver_order_items
    # Join orders
    .merge(
        silver_orders[[
            "order_id", "customer_id", "order_status",
            "order_purchase_timestamp", "order_approved_at",
            "order_delivered_carrier_date", "order_delivered_customer_date",
            "order_estimated_delivery_date"
        ]],
        on="order_id", how="left"
    )
    # Join products
    .merge(
        silver_products[[
            "product_id", "product_category_name",
            "product_name_lenght", "product_description_lenght",
            "product_photos_qty", "product_weight_g",
            "product_length_cm", "product_height_cm", "product_width_cm"
        ]],
        on="product_id", how="left"
    )
    # Join customers
    .merge(
        silver_customers[[
            "customer_id", "customer_unique_id",
            "customer_zip_code_prefix", "customer_city", "customer_state"
        ]],
        on="customer_id", how="left"
    )
    # Join sellers
    .merge(
        silver_sellers[[
            "seller_id", "seller_zip_code_prefix",
            "seller_city", "seller_state"
        ]],
        on="seller_id", how="left"
    )
    # Join payments aggregated
    .merge(
        payments_agg, on="order_id", how="left"
    )
    # Join reviews (one review per order, so we can merge on order_id)
    .merge(
        silver_reviews[[
            "order_id", "review_score", "review_comment_title",
            "review_comment_message", "review_creation_date", "review_answer_timestamp"
        ]],
        on="order_id", how="left"
    )
    # Join seller geolocation
    .merge(
        seller_geo, on="seller_zip_code_prefix", how="left"
    )
    # Join customer geolocation
    .merge(
        customer_geo, on="customer_zip_code_prefix", how="left"
    )
)

# Ensure order_item_id is string (as per DDL: VARCHAR(255))
dl_order_items["order_item_id"] = dl_order_items["order_item_id"].astype(str)

# Select and order columns to match DDL.sql structure
final_cols = [
    "order_item_id",
    "product_id",
    "seller_id",
    "order_id",
    "shipping_limit_date",
    "price",
    "freight_value",
    "product_category_name",
    "product_name_lenght",
    "product_description_lenght",
    "product_photos_qty",
    "product_weight_g",
    "product_length_cm",
    "product_height_cm",
    "product_width_cm",
    "seller_zip_code_prefix",
    "seller_city",
    "seller_state",
    "vendedor_geolocation_lat",
    "vendedor_geolocation_lng",
    "review_comment_title",
    "review_comment_message",
    "customer_unique_id",
    "order_status",
    "qtd_payment_sequential",
    "primeiro_payment_type",
    "valor_total_pagamento",
    "maximo_payment_installments",
    "order_purchase_timestamp",
    "order_delivered_customer_date",
    "order_approved_at",
    "order_delivered_carrier_date",
    "order_estimated_delivery_date",
    "review_score",
    "review_creation_date",
    "review_answer_timestamp",
    "customer_zip_code_prefix",
    "customer_city",
    "customer_state",
    "cliente_geolocation_lat",
    "cliente_geolocation_lng",
]

# Select only columns that exist in the dataframe
dl_order_items = dl_order_items[[c for c in final_cols if c in dl_order_items.columns]].copy()

print(f"✅ Tabela unificada criada: {len(dl_order_items):,} linhas, {len(dl_order_items.columns)} colunas")
print(f"   Colunas: {', '.join(dl_order_items.columns[:5])}... ({len(dl_order_items.columns)} total)")


✅ Tabela unificada criada: 112,952 linhas, 41 colunas
   Colunas: order_item_id, product_id, seller_id, order_id, shipping_limit_date... (41 total)


## Load para DL (Data Layer) - Tabela única ORDER_ITEMS

In [63]:
print("--- Iniciando carga da tabela única ORDER_ITEMS no schema DL ---")

if "engine" not in globals():
    raise RuntimeError("Engine SQLAlchemy não inicializado. Execute a célula de conexão antes desta.")
if "dl_order_items" not in globals():
    raise RuntimeError("DataFrame dl_order_items não encontrado. Execute as etapas de transformação antes da carga.")

try:
    with engine.begin() as conn:
        # Set search path to DL schema
        conn.exec_driver_sql(f'SET search_path TO "{DB_SCHEMA}", public;')
        
        # Load single unified table ORDER_ITEMS
        dl_order_items.to_sql(
            "ORDER_ITEMS",
            conn,
            schema=DB_SCHEMA,
            if_exists="replace",
            index=False,
            method="multi",
            chunksize=1000,
        )
        print(f"→ ORDER_ITEMS: {len(dl_order_items):,} linhas carregadas no schema {DB_SCHEMA}.")
    print("✅ Carga concluída.")
except Exception as exc:
    print("❌ Falha ao carregar dados no Postgres:", exc)
    raise

--- Iniciando carga da tabela única ORDER_ITEMS no schema DL ---
→ ORDER_ITEMS: 112,952 linhas carregadas no schema DL.
✅ Carga concluída.
