In [1]:
import os
from dotenv import load_dotenv
from sqlalchemy import create_engine, text

# Clean any libpq overrides that force TCP/login as 'postgres'
load_dotenv(override=True)
for k in ("PGHOST","PGUSER","PGPASSWORD","PGPORT","PGDATABASE"):
    os.environ.pop(k, None)

DSN = "postgresql+psycopg2://twh@/olist?host=/tmp"   # Homebrew default socket
engine = create_engine(DSN, future=True)

with engine.connect() as conn:
    print(conn.execute(text("select current_user, current_database()")).all())


[('twh', 'olist')]


In [2]:
from pathlib import Path
schema_path = Path("/Users/twh/Desktop/store data proj/olist/sql/01_schema.sql")  # adjust if your path differs
assert schema_path.exists(), schema_path
with engine.begin() as conn:
    conn.exec_driver_sql(schema_path.read_text())

import pandas as pd
pd.read_sql("select schemaname, tablename from pg_tables where schemaname='olist' order by 2", engine)


NameError: name 'engine' is not defined

In [5]:
import pandas as pd
from dotenv import load_dotenv
from sqlalchemy import create_engine, text

DSN = "postgresql+psycopg2://twh@/olist?host=/tmp" 
engine = create_engine(DSN, future=True)
pd.read_sql("select schemaname, tablename from pg_tables where schemaname='olist' order by 2", engine)

pd.read_sql("""
SELECT 'dim_customer' AS tbl, COUNT(*)::bigint AS rows FROM olist.dim_customer
UNION ALL SELECT 'dim_product', COUNT(*) FROM olist.dim_product
UNION ALL SELECT 'dim_seller', COUNT(*) FROM olist.dim_seller
UNION ALL SELECT 'dim_date', COUNT(*) FROM olist.dim_date
UNION ALL SELECT 'fact_order_item', COUNT(*) FROM olist.fact_order_item
UNION ALL SELECT 'fact_payment', COUNT(*) FROM olist.fact_payment
UNION ALL SELECT 'fact_review', COUNT(*) FROM olist.fact_review
ORDER BY 1;
""", engine)


Unnamed: 0,tbl,rows
0,dim_customer,99441
1,dim_date,1314
2,dim_product,32951
3,dim_seller,3095
4,fact_order_item,112650
5,fact_payment,103886
6,fact_review,98410


In [11]:
from sqlalchemy import text
with engine.begin() as conn:
    conn.execute(text("TRUNCATE olist.dim_customer RESTART IDENTITY CASCADE"))


In [7]:
import pandas as pd

csv = "/Users/twh/Desktop/store data proj/data/olist_customers_dataset.csv"
df_cust = pd.read_csv(csv, usecols=["customer_id","customer_unique_id","customer_city","customer_state"]) \
            .rename(columns={"customer_city":"city","customer_state":"state"}) \
            .drop_duplicates("customer_id")

with engine.begin() as conn:
    df_cust.to_sql("dim_customer", conn, schema="olist", if_exists="append", index=False, method="multi", chunksize=10000)

len(df_cust)


99441

In [None]:
import pandas as pd, numpy as np
from sqlalchemy import text

p = "/Users/twh/Desktop/store data proj/data"
customers = pd.read_csv("/Users/twh/Desktop/store data proj/data/olist_customers_dataset.csv")
orders    = pd.read_csv(f"{p}/olist_orders_dataset.csv", parse_dates=[
    "order_purchase_timestamp","order_approved_at",
    "order_delivered_carrier_date","order_delivered_customer_date",
    "order_estimated_delivery_date"
])
items     = pd.read_csv(f"{p}/olist_order_items_dataset.csv", parse_dates=["shipping_limit_date"])
payments  = pd.read_csv(f"{p}/olist_order_payments_dataset.csv")
reviews   = pd.read_csv(f"{p}/olist_order_reviews_dataset.csv", parse_dates=[
    "review_creation_date","review_answer_timestamp"
])
products  = pd.read_csv(f"{p}/olist_products_dataset.csv")
catmap    = pd.read_csv(f"{p}/product_category_name_translation.csv")
sellers   = pd.read_csv(f"{p}/olist_sellers_dataset.csv")

# products + category translation
products = products.merge(catmap, how="left",
                          left_on="product_category_name",
                          right_on="product_category_name").rename(
    columns={"product_category_name":"category_name",
             "product_category_name_english":"category_name_english"}
)

# ---- build dim_date from min/max over all date cols ----
date_series = []
for df, cols in [(orders, ["order_purchase_timestamp","order_approved_at",
                           "order_delivered_carrier_date","order_delivered_customer_date",
                           "order_estimated_delivery_date"]),
                 (items, ["shipping_limit_date"]),
                 (reviews, ["review_creation_date","review_answer_timestamp"])]:
    for c in cols:
        if c in df.columns:
            date_series.append(pd.to_datetime(df[c]).dropna())

if date_series:
    dmin = min(s.min() for s in date_series).normalize()
    dmax = max(s.max() for s in date_series).normalize()
    dates = pd.date_range(dmin, dmax, freq="D")
    dim_date = pd.DataFrame({"date_key": dates})
    dim_date["year"]    = dim_date["date_key"].dt.year
    dim_date["quarter"] = dim_date["date_key"].dt.quarter
    dim_date["month"]   = dim_date["date_key"].dt.month
    dim_date["day"]     = dim_date["date_key"].dt.day
    dim_date["dow"]     = dim_date["date_key"].dt.weekday
else:
    dim_date = pd.DataFrame(columns=["date_key","year","quarter","month","day","dow"])

# ---- dims ----
dim_customer = customers.rename(columns={"customer_city":"city","customer_state":"state"})[
    ["customer_id","customer_unique_id","city","state"]
].drop_duplicates("customer_id")

dim_seller = sellers.rename(columns={"seller_city":"city","seller_state":"state"})[
    ["seller_id","city","state"]
].drop_duplicates("seller_id")

dim_product = products[[
    "product_id","category_name","category_name_english",
    "product_weight_g","product_length_cm","product_height_cm","product_width_cm"
]].rename(columns={
    "product_weight_g":"weight_g","product_length_cm":"length_cm",
    "product_height_cm":"height_cm","product_width_cm":"width_cm"
}).drop_duplicates("product_id")

with engine.begin() as conn:
    # upserts naive: rely on empty tables (fresh load). For re-runs, you'd stage & merge.
    dim_date.to_sql("dim_date", conn, schema="olist", if_exists="append", index=False, method="multi")
    dim_customer.to_sql("dim_customer", conn, schema="olist", if_exists="append", index=False, method="multi")
    dim_seller.to_sql("dim_seller", conn, schema="olist", if_exists="append", index=False, method="multi")
    dim_product.to_sql("dim_product", conn, schema="olist", if_exists="append", index=False, method="multi")

# --- FIXED key map helper + rebuild maps ---

def key_map(schema: str, table: str, key_col: str, natural_col: str):
    q = f'SELECT {key_col} AS sk, {natural_col} AS nk FROM {schema}.{table};'
    df = pd.read_sql(q, engine)
    # dict: natural key -> surrogate key
    return dict(zip(df["nk"], df["sk"]))

cust_map   = key_map("olist", "dim_customer", "customer_key", "customer_id")
seller_map = key_map("olist", "dim_seller",   "seller_key",   "seller_id")
prod_map   = key_map("olist", "dim_product",  "product_key",  "product_id")

# quick sanity checks
print("cust_map:", len(cust_map), "seller_map:", len(seller_map), "prod_map:", len(prod_map))


# ---- fact_order_item ----
o = orders.rename(columns={"order_purchase_timestamp":"order_purchase_date"})
fact = items.merge(
    o[["order_id","customer_id","order_purchase_date","order_approved_at",
       "order_delivered_carrier_date","order_delivered_customer_date",
       "order_estimated_delivery_date"]],
    on="order_id", how="left"
)

fact["customer_key"] = fact["customer_id"].map(cust_map)
fact["seller_key"]   = fact["seller_id"].map(seller_map)
fact["product_key"]  = fact["product_id"].map(prod_map)

# Ensure dates are pure dates (match dim_date PK)
for c in ["order_purchase_date","order_approved_at","order_delivered_carrier_date",
          "order_delivered_customer_date","order_estimated_delivery_date","shipping_limit_date"]:
    if c in fact.columns:
        fact[c] = pd.to_datetime(fact[c]).dt.date

# Olist has no discount at item level → revenue ≈ price
fact["revenue"] = fact["price"].fillna(0)

fact_items = fact[[
    "order_id","order_item_id","customer_key","seller_key","product_key",
    "order_purchase_date","order_approved_at","order_delivered_carrier_date",
    "order_delivered_customer_date","order_estimated_delivery_date","shipping_limit_date",
    "price","freight_value","revenue"
]]

with engine.begin() as conn:
    fact_items.to_sql("fact_order_item", conn, schema="olist", if_exists="append",
                      index=False, method="multi", chunksize=10000)

# ---- payments & reviews ----
with engine.begin() as conn:
    payments.to_sql("fact_payment", conn, schema="olist", if_exists="append",
                    index=False, method="multi", chunksize=10000)

    rv = reviews[["review_id","order_id","review_score","review_creation_date","review_answer_timestamp"]]
    # cast to date to match schema
    for c in ["review_creation_date","review_answer_timestamp"]:
        rv[c] = pd.to_datetime(rv[c]).dt.date
    rv.to_sql("fact_review", conn, schema="olist", if_exists="append",
              index=False, method="multi", chunksize=10000)

"ETL done"


In [None]:


def key_map(schema: str, table: str, key_col: str, natural_col: str):
    q = f'SELECT {key_col} AS sk, {natural_col} AS nk FROM {schema}.{table};'
    df = pd.read_sql(q, engine)
    # dict: natural key -> surrogate key
    return dict(zip(df["nk"], df["sk"]))

cust_map   = key_map("olist", "dim_customer", "customer_key", "customer_id")
seller_map = key_map("olist", "dim_seller",   "seller_key",   "seller_id")
prod_map   = key_map("olist", "dim_product",  "product_key",  "product_id")

# quick sanity checks
print("cust_map:", len(cust_map), "seller_map:", len(seller_map), "prod_map:", len(prod_map))


# ---- fact_order_item ----
o = orders.rename(columns={"order_purchase_timestamp":"order_purchase_date"})
fact = items.merge(
    o[["order_id","customer_id","order_purchase_date","order_approved_at",
       "order_delivered_carrier_date","order_delivered_customer_date",
       "order_estimated_delivery_date"]],
    on="order_id", how="left"
)

fact["customer_key"] = fact["customer_id"].map(cust_map)
fact["seller_key"]   = fact["seller_id"].map(seller_map)
fact["product_key"]  = fact["product_id"].map(prod_map)

# Ensure dates are pure dates (match dim_date PK)
for c in ["order_purchase_date","order_approved_at","order_delivered_carrier_date",
          "order_delivered_customer_date","order_estimated_delivery_date","shipping_limit_date"]:
    if c in fact.columns:
        fact[c] = pd.to_datetime(fact[c]).dt.date

# Olist has no discount at item level → revenue ≈ price
fact["revenue"] = fact["price"].fillna(0)

fact_items = fact[[
    "order_id","order_item_id","customer_key","seller_key","product_key",
    "order_purchase_date","order_approved_at","order_delivered_carrier_date",
    "order_delivered_customer_date","order_estimated_delivery_date","shipping_limit_date",
    "price","freight_value","revenue"
]]

with engine.begin() as conn:
    fact_items.to_sql("fact_order_item", conn, schema="olist", if_exists="append",
                      index=False, method="multi", chunksize=10000)

# ---- payments & reviews ----
with engine.begin() as conn:
    payments.to_sql("fact_payment", conn, schema="olist", if_exists="append",
                    index=False, method="multi", chunksize=10000)

    rv = reviews[["review_id","order_id","review_score","review_creation_date","review_answer_timestamp"]]
    # cast to date to match schema
    for c in ["review_creation_date","review_answer_timestamp"]:
        rv[c] = pd.to_datetime(rv[c]).dt.date
    rv.to_sql("fact_review", conn, schema="olist", if_exists="append",
              index=False, method="multi", chunksize=10000)

"ETL done"


In [18]:
import pandas as pd
pd.read_sql("""
SELECT 'dim_customer' AS tbl, COUNT(*)::bigint FROM olist.dim_customer
UNION ALL SELECT 'dim_product', COUNT(*) FROM olist.dim_product
UNION ALL SELECT 'dim_seller', COUNT(*) FROM olist.dim_seller
UNION ALL SELECT 'dim_date', COUNT(*) FROM olist.dim_date
UNION ALL SELECT 'fact_order_item', COUNT(*) FROM olist.fact_order_item
UNION ALL SELECT 'fact_payment', COUNT(*) FROM olist.fact_payment
UNION ALL SELECT 'fact_review', COUNT(*) FROM olist.fact_review
ORDER BY 1;
""", engine)


Unnamed: 0,tbl,count
0,dim_customer,99441
1,dim_date,1314
2,dim_product,32951
3,dim_seller,3095
4,fact_order_item,112650
5,fact_payment,103886
6,fact_review,98410


In [16]:

payments = pd.read_csv(f"{p}/olist_order_payments_dataset.csv")

# keep only modeled columns & ensure uniqueness on the PK
pay = payments[["order_id","payment_sequential","payment_type","payment_installments","payment_value"]]\
         .drop_duplicates(["order_id","payment_sequential"])

with engine.begin() as conn:
    pay.to_sql("fact_payment", conn, schema="olist", if_exists="append",
               index=False, method="multi", chunksize=50000)

len(pay)

103886

In [17]:
reviews = pd.read_csv(
    f"{p}/olist_order_reviews_dataset.csv",
    parse_dates=["review_creation_date","review_answer_timestamp"]
)

rv = reviews[["review_id","order_id","review_score","review_creation_date","review_answer_timestamp"]]\
       .drop_duplicates("review_id")

# cast to DATE to match schema + FK to dim_date
rv["review_creation_date"]   = pd.to_datetime(rv["review_creation_date"]).dt.date
rv["review_answer_timestamp"] = pd.to_datetime(rv["review_answer_timestamp"]).dt.date

with engine.begin() as conn:
    rv.to_sql("fact_review", conn, schema="olist", if_exists="append",
              index=False, method="multi", chunksize=50000)

len(rv)


98410

In [20]:

# Payment mix
pd.read_sql("""
SELECT payment_type, COUNT(*) AS payments, SUM(payment_value) AS value
FROM olist.fact_payment
GROUP BY 1
ORDER BY value DESC;
""", engine)

# Review score distribution
pd.read_sql("""
SELECT review_score, COUNT(*) AS reviews
FROM olist.fact_review
GROUP BY 1
ORDER BY 1;
""", engine)

# Monthly revenue & orders
pd.read_sql("""
SELECT date_trunc('month', order_purchase_date)::date AS month,
       SUM(revenue) AS revenue,
       COUNT(DISTINCT order_id) AS orders
FROM olist.fact_order_item
GROUP BY 1 ORDER BY 1;
""", engine)


Unnamed: 0,month,revenue,orders
0,2016-09-01,267.36,3
1,2016-10-01,49507.66,308
2,2016-12-01,10.9,1
3,2017-01-01,120312.87,789
4,2017-02-01,247303.02,1733
5,2017-03-01,374344.3,2641
6,2017-04-01,359927.23,2391
7,2017-05-01,506071.14,3660
8,2017-06-01,433038.6,3217
9,2017-07-01,498031.48,3969


In [24]:
import pandas as pd
df = pd.read_sql("""
SELECT date_trunc('month', order_purchase_date)::date AS month,
       SUM(revenue) AS revenue,
       COUNT(DISTINCT order_id) AS orders
FROM olist.fact_order_item
GROUP BY 1 ORDER BY 1;
""", engine)
with pd.ExcelWriter("/Users/twh/Desktop/store data proj/reports/monthly_sales.xlsx") as xw:
    df.to_excel(xw, index=False, sheet_name="Revenue by Month")
"Saved reports/monthly_sales.xlsx"


'Saved reports/monthly_sales.xlsx'