# ETL Pipeline (Staging → OLTP → DW → Load → Validations)

Este notebook implementa todas as etapas solicitadas:

1. STAGING — carregar CSVs para tabelas temporárias (staging)
2. OLTP — normalização e limpeza
3. DW MODEL — criação das dimensões e fato (vazias)
4. ETL LOAD — carregar dimensões (SCD2 simplificado) e fato
5. VALIDAÇÕES — contagens e checagens de integridade

Observação: o notebook assume que você tem um arquivo `connection.py` no mesmo projeto
com a função `get_engine()` que retorna uma SQLAlchemy Engine conectada ao seu Postgres.


In [1]:
import pandas as pd
from sqlalchemy import text
from connection import get_engine
from pathlib import Path
import os

# Diretório atual do notebook (scripts). Ajuste se necessário.
NOTEBOOK_DIR = Path(os.getcwd())
DATA_DIR = NOTEBOOK_DIR.parent / 'data'

ORDERS_PATH = DATA_DIR / 'olist_orders_dataset.csv'
ITEMS_PATH = DATA_DIR / 'olist_order_items_dataset.csv'
CUSTOMERS_PATH = DATA_DIR / 'olist_customers_dataset.csv'
PRODUCTS_PATH = DATA_DIR / 'olist_products_dataset.csv'
SELLERS_PATH = DATA_DIR / 'olist_sellers_dataset.csv'

print('Notebook dir:', NOTEBOOK_DIR)
print('Data dir:', DATA_DIR)
print('Files:')
print(list(DATA_DIR.glob('*.csv')))


Notebook dir: C:\Users\mmsantos\Downloads\db p2\P2-BDA-\projeto-dw-grupo-A\scripts
Data dir: C:\Users\mmsantos\Downloads\db p2\P2-BDA-\projeto-dw-grupo-A\data
Files:
[WindowsPath('C:/Users/mmsantos/Downloads/db p2/P2-BDA-/projeto-dw-grupo-A/data/olist_customers_dataset.csv'), WindowsPath('C:/Users/mmsantos/Downloads/db p2/P2-BDA-/projeto-dw-grupo-A/data/olist_orders_dataset.csv'), WindowsPath('C:/Users/mmsantos/Downloads/db p2/P2-BDA-/projeto-dw-grupo-A/data/olist_order_items_dataset.csv'), WindowsPath('C:/Users/mmsantos/Downloads/db p2/P2-BDA-/projeto-dw-grupo-A/data/olist_products_dataset.csv'), WindowsPath('C:/Users/mmsantos/Downloads/db p2/P2-BDA-/projeto-dw-grupo-A/data/olist_sellers_dataset.csv')]


In [2]:
# Carregar CSVs com pandas (preview)

df_orders = pd.read_csv(ORDERS_PATH)
df_items = pd.read_csv(ITEMS_PATH)
df_customers = pd.read_csv(CUSTOMERS_PATH)
df_products = pd.read_csv(PRODUCTS_PATH)
df_sellers = pd.read_csv(SELLERS_PATH)

print('orders rows, cols:', df_orders.shape)
print('items rows, cols:', df_items.shape)

# mostrar cabeçalhos
from IPython.display import display
print('\norders head:')
display(df_orders.head())
print('\nitems head:')
display(df_items.head())


orders rows, cols: (99441, 8)
items rows, cols: (112650, 7)

orders head:


Unnamed: 0,order_id,customer_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date
0,e481f51cbdc54678b7cc49136f2d6af7,9ef432eb6251297304e76186b10a928d,delivered,2017-10-02 10:56:33,2017-10-02 11:07:15,2017-10-04 19:55:00,2017-10-10 21:25:13,2017-10-18 00:00:00
1,53cdb2fc8bc7dce0b6741e2150273451,b0830fb4747a6c6d20dea0b8c802d7ef,delivered,2018-07-24 20:41:37,2018-07-26 03:24:27,2018-07-26 14:31:00,2018-08-07 15:27:45,2018-08-13 00:00:00
2,47770eb9100c2d0c44946d9cf07ec65d,41ce2a54c0b03bf3443c3d931a367089,delivered,2018-08-08 08:38:49,2018-08-08 08:55:23,2018-08-08 13:50:00,2018-08-17 18:06:29,2018-09-04 00:00:00
3,949d5b44dbf5de918fe9c16f97b45f8a,f88197465ea7920adcdbec7375364d82,delivered,2017-11-18 19:28:06,2017-11-18 19:45:59,2017-11-22 13:39:59,2017-12-02 00:28:42,2017-12-15 00:00:00
4,ad21c59c0840e6cb83a9ceb5573f8159,8ab97904e6daea8866dbdbc4fb7aad2c,delivered,2018-02-13 21:18:39,2018-02-13 22:20:29,2018-02-14 19:46:34,2018-02-16 18:17:02,2018-02-26 00:00:00



items head:


Unnamed: 0,order_id,order_item_id,product_id,seller_id,shipping_limit_date,price,freight_value
0,00010242fe8c5a6d1ba2dd792cb16214,1,4244733e06e7ecb4970a6e2683c13e61,48436dade18ac8b2bce089ec2a041202,2017-09-19 09:45:35,58.9,13.29
1,00018f77f2f0320c557190d7a144bdd3,1,e5f2d52b802189ee658865ca93d83a8f,dd7ddc04e1b6c2c614352b383efe2d36,2017-05-03 11:05:13,239.9,19.93
2,000229ec398224ef6ca0657da4fc703e,1,c777355d18b72b67abbeef9df44fd0fd,5b51032eddd242adc84c38acab88f23d,2018-01-18 14:48:30,199.0,17.87
3,00024acbcdf0a6daa1e931b038114c75,1,7634da152a4610f1595efa32f14722fc,9d7a1d34a5052409006425275ba1c2b4,2018-08-15 10:10:18,12.99,12.79
4,00042b26cf59d7ce69dfabb4e55b4fd9,1,ac6c3623068f30de03045865e4e10089,df560393f3a51e74553ab94004ba5c87,2017-02-13 13:57:51,199.9,18.14


In [4]:
engine = get_engine()

def run_sql(sql_text, fetch=False):
    """Executa SQL com engine.begin() para garantir commit.
    Se fetch for True, retorna um DataFrame."""
    with engine.begin() as conn:
        if fetch:
            return pd.read_sql(text(sql_text), conn)
        else:
            conn.execute(text(sql_text))

print('Engine ready:', engine)


Engine ready: Engine(postgresql://avnadmin:***@pg-602cf04-teusmath89-cc57.h.aivencloud.com:23430/defaultdb)


## 1) STAGING — criar schema `staging` e carregar CSVs (sem transformação)

In [5]:
# Criar schema staging se não existir
run_sql("CREATE SCHEMA IF NOT EXISTS staging;")

# Enviar DataFrames para staging (usando types automáticos do pandas -> SQLAlchemy)
print('Writing df_orders to staging.orders_raw...')
df_orders.columns = [c.lower().strip().replace(' ', '_') for c in df_orders.columns]
df_items.columns = [c.lower().strip().replace(' ', '_') for c in df_items.columns]

# to_sql via engine, schema staging
# if_exists='replace' para desenvolvimento; em produção pode ser 'append'
df_orders.to_sql('orders_raw', engine, schema='staging', if_exists='replace', index=False)
print('orders_raw written')

df_items.to_sql('items_raw', engine, schema='staging', if_exists='replace', index=False)
print('items_raw written')

# Também enviar other small tables for convenience
for df, name in [(df_customers, 'customers_raw'), (df_products, 'products_raw'), (df_sellers, 'sellers_raw')]:
    df.columns = [c.lower().strip().replace(' ', '_') for c in df.columns]
    df.to_sql(name, engine, schema='staging', if_exists='replace', index=False)
    print(f'{name} written')

run_sql("SELECT COUNT(*) FROM staging.orders_raw;", fetch=False)
print('Staging load complete')


Writing df_orders to staging.orders_raw...
orders_raw written
items_raw written
customers_raw written
products_raw written
sellers_raw written
Staging load complete


## 2) OLTP — normalização (schema `oltp`)
Criaremos tabelas normalizadas removendo duplicatas e tratando nulos básicos.

In [7]:
# Criar schema oltp
run_sql('CREATE SCHEMA IF NOT EXISTS oltp;')

# 1) customers
run_sql("DROP TABLE IF EXISTS oltp.customers;")
run_sql("CREATE TABLE oltp.customers AS\nSELECT DISTINCT customer_id\nFROM staging.orders_raw\nWHERE customer_id IS NOT NULL;")
print('oltp.customers created')

# 2) products
run_sql("DROP TABLE IF EXISTS oltp.products;")
run_sql("CREATE TABLE oltp.products AS\nSELECT DISTINCT product_id\nFROM staging.items_raw\nWHERE product_id IS NOT NULL;")
print('oltp.products created')

# 3) sellers
run_sql("DROP TABLE IF EXISTS oltp.sellers;")
run_sql("CREATE TABLE oltp.sellers AS\nSELECT DISTINCT seller_id\nFROM staging.items_raw\nWHERE seller_id IS NOT NULL;")
print('oltp.sellers created')

# 4) orders
run_sql("DROP TABLE IF EXISTS oltp.orders;")
run_sql("CREATE TABLE oltp.orders AS\nSELECT DISTINCT order_id, customer_id, order_status, order_purchase_timestamp::timestamp AS order_purchase_timestamp\nFROM staging.orders_raw\nWHERE order_id IS NOT NULL;")
print('oltp.orders created')

# 5) order_items
run_sql("DROP TABLE IF EXISTS oltp.order_items;")
run_sql("CREATE TABLE oltp.order_items AS\nSELECT order_id, order_item_id::integer AS order_item_id, product_id, seller_id, shipping_limit_date::timestamp AS shipping_limit_date, price::numeric AS price, freight_value::numeric AS freight_value\nFROM staging.items_raw;")
print('oltp.order_items created')

# Quick checks
display(run_sql('SELECT COUNT(*) AS cnt_orders FROM oltp.orders', fetch=True))
display(run_sql('SELECT COUNT(*) AS cnt_orders_item FROM oltp.order_items', fetch=True))
display(run_sql('SELECT COUNT(*) AS cnt_sellers FROM oltp.sellers', fetch=True))
display(run_sql('SELECT COUNT(*) AS cnt_products FROM oltp.products', fetch=True))
display(run_sql('SELECT COUNT(*) AS cnt_customers FROM oltp.customer', fetch=True))


oltp.customers created
oltp.products created
oltp.sellers created
oltp.orders created
oltp.order_items created


Unnamed: 0,cnt_orders
0,99441


Unnamed: 0,cnt_orders_item
0,112650


Unnamed: 0,cnt_sellers
0,3095


Unnamed: 0,cnt_products
0,32951


ProgrammingError: (psycopg2.errors.UndefinedTable) relation "oltp.customer" does not exist
LINE 1: SELECT COUNT(*) AS cnt_customers FROM oltp.customer
                                              ^

[SQL: SELECT COUNT(*) AS cnt_customers FROM oltp.customer]
(Background on this error at: https://sqlalche.me/e/20/f405)

## 3) DW MODEL — criar as dimensões e fato (schema `analytics`)

In [9]:
# Criar schema analytics e tabelas DW (drop if exists para desenvolvimento)
run_sql('CREATE SCHEMA IF NOT EXISTS analytics;')

# Drop existing DW tables (caution in production)
for t in ['fact_sales','dim_date','dim_customer','dim_product','dim_seller']:
    run_sql(f"DROP TABLE IF EXISTS analytics.{t} CASCADE;")

# Create dims and fact
run_sql('''
CREATE TABLE analytics.dim_customer (
    customer_sk SERIAL PRIMARY KEY,
    customer_id TEXT UNIQUE,
    effective_from DATE,
    effective_to DATE,
    is_current BOOLEAN
);
''')

run_sql('''
CREATE TABLE analytics.dim_product (
    product_sk SERIAL PRIMARY KEY,
    product_id TEXT UNIQUE
);
''')

run_sql('''
CREATE TABLE analytics.dim_seller (
    seller_sk SERIAL PRIMARY KEY,
    seller_id TEXT UNIQUE
);
''')

run_sql('''
CREATE TABLE analytics.dim_date (
    date_sk SERIAL PRIMARY KEY,
    full_date DATE UNIQUE,
    year INT,
    month INT,
    day INT
);
''')

run_sql('''
CREATE TABLE analytics.fact_sales (
    sale_sk SERIAL PRIMARY KEY,
    order_id TEXT,
    customer_sk INT,
    product_sk INT,
    seller_sk INT,
    date_sk INT,
    price NUMERIC(10,2),
    freight_value NUMERIC(10,2),
    FOREIGN KEY(customer_sk) REFERENCES analytics.dim_customer(customer_sk),
    FOREIGN KEY(product_sk) REFERENCES analytics.dim_product(product_sk),
    FOREIGN KEY(seller_sk) REFERENCES analytics.dim_seller(seller_sk),
    FOREIGN KEY(date_sk) REFERENCES analytics.dim_date(date_sk)
);
''')

print('DW model created')

# show created tables
display(run_sql("SELECT table_name FROM information_schema.tables WHERE table_schema='analytics'", fetch=True))


DW model created


Unnamed: 0,table_name
0,df_items
1,df_orders
2,dim_customer
3,dim_date
4,dim_product
5,dim_seller
6,fact_sales


## 4) ETL LOAD — popula dimensões e fato
- Dim date via generate_series entre min/max de oltp.orders
- Dims: inserir se não existir (SCD2 simplificado)
- Fact: mapear SKs e inserir

In [10]:
# 4.1 Populate dim_date
run_sql('''
INSERT INTO analytics.dim_date(full_date, year, month, day)
SELECT date::date, EXTRACT(YEAR FROM date), EXTRACT(MONTH FROM date), EXTRACT(DAY FROM date)
FROM generate_series(
    (SELECT MIN(order_purchase_timestamp) FROM oltp.orders),
    (SELECT MAX(order_purchase_timestamp) FROM oltp.orders),
    interval '1 day'
) AS t(date)
ON CONFLICT (full_date) DO NOTHING;
''')
print('dim_date populated')

display(run_sql('SELECT COUNT(*) FROM analytics.dim_date', fetch=True))


dim_date populated


Unnamed: 0,count
0,773


In [12]:
# 4.2 Populate dim_customer (SCD2 simplified: insert new customers as current)
run_sql('''
INSERT INTO analytics.dim_customer(customer_id, effective_from, effective_to, is_current)
SELECT c.customer_id, CURRENT_DATE, NULL, TRUE
FROM oltp.customers c
LEFT JOIN analytics.dim_customer d ON d.customer_id = c.customer_id
WHERE d.customer_id IS NULL;
''')
print('dim_customer upserted')

display(run_sql('SELECT COUNT(*) FROM analytics.dim_customer', fetch=True))


dim_customer upserted


Unnamed: 0,count
0,99441


In [13]:
# 4.3 Populate dim_product and dim_seller
run_sql("""
INSERT INTO analytics.dim_product(product_id)
SELECT p.product_id FROM oltp.products p
LEFT JOIN analytics.dim_product d ON d.product_id = p.product_id
WHERE d.product_id IS NULL;

INSERT INTO analytics.dim_seller(seller_id)
SELECT s.seller_id FROM oltp.sellers s
LEFT JOIN analytics.dim_seller d ON d.seller_id = s.seller_id
WHERE d.seller_id IS NULL;
""")
print('dim_product and dim_seller populated')

display(run_sql('SELECT COUNT(*) FROM analytics.dim_product', fetch=True))
display(run_sql('SELECT COUNT(*) FROM analytics.dim_seller', fetch=True))


dim_product and dim_seller populated


Unnamed: 0,count
0,32951


Unnamed: 0,count
0,3095


In [14]:
# 4.4 Populate fact_sales (map to SKs)
# We will insert rows by joining oltp.order_items -> oltp.orders and mapping to dim SKs
insert_fact = '''
INSERT INTO analytics.fact_sales(order_id, customer_sk, product_sk, seller_sk, date_sk, price, freight_value)
SELECT
    oi.order_id,
    c.customer_sk,
    p.product_sk,
    s.seller_sk,
    d.date_sk,
    oi.price::numeric,
    oi.freight_value::numeric
FROM oltp.order_items oi
JOIN oltp.orders o ON o.order_id = oi.order_id
LEFT JOIN analytics.dim_customer c ON c.customer_id = o.customer_id AND c.is_current = TRUE
LEFT JOIN analytics.dim_product p ON p.product_id = oi.product_id
LEFT JOIN analytics.dim_seller s ON s.seller_id = oi.seller_id
LEFT JOIN analytics.dim_date d ON d.full_date = DATE(o.order_purchase_timestamp)
;'''

with engine.begin() as conn:
    conn.execute(text(insert_fact))

print('fact_sales loaded')

# quick counts
print('OLTP order_items:', run_sql('SELECT COUNT(*) FROM oltp.order_items', fetch=True).iloc[0,0])
print('DW fact_sales:', run_sql('SELECT COUNT(*) FROM analytics.fact_sales', fetch=True).iloc[0,0])


fact_sales loaded
OLTP order_items: 112650
DW fact_sales: 112650


## 5) VALIDAÇÕES — conferir contagens e integridade

In [15]:
# Validations: counts and FK null checks
print('Counts:')
print('staging.orders_raw', run_sql('SELECT COUNT(*) FROM staging.orders_raw', fetch=True).iloc[0,0])
print('oltp.orders', run_sql('SELECT COUNT(*) FROM oltp.orders', fetch=True).iloc[0,0])
print('oltp.order_items', run_sql('SELECT COUNT(*) FROM oltp.order_items', fetch=True).iloc[0,0])
print('dw.fact_sales', run_sql('SELECT COUNT(*) FROM analytics.fact_sales', fetch=True).iloc[0,0])

print('\nFK null checks:')
print('customer_sk nulls in fact:', run_sql('SELECT COUNT(*) FROM analytics.fact_sales WHERE customer_sk IS NULL', fetch=True).iloc[0,0])
print('product_sk nulls in fact:', run_sql('SELECT COUNT(*) FROM analytics.fact_sales WHERE product_sk IS NULL', fetch=True).iloc[0,0])
print('seller_sk nulls in fact:', run_sql('SELECT COUNT(*) FROM analytics.fact_sales WHERE seller_sk IS NULL', fetch=True).iloc[0,0])
print('date_sk nulls in fact:', run_sql('SELECT COUNT(*) FROM analytics.fact_sales WHERE date_sk IS NULL', fetch=True).iloc[0,0])


Counts:
staging.orders_raw 99441
oltp.orders 99441
oltp.order_items 112650
dw.fact_sales 112650

FK null checks:
customer_sk nulls in fact: 0
product_sk nulls in fact: 0
seller_sk nulls in fact: 0
date_sk nulls in fact: 0
