In [6]:
from pathlib import Path
import os

print("CWD:", os.getcwd())

# Find the project root: the nearest directory (the CWD itself, then each parent
# walking upward) that contains BOTH a "data" and a "reports" folder.
ROOT = None
for p in [Path.cwd(), *Path.cwd().parents]:
    if (p / "data").exists() and (p / "reports").exists():
        ROOT = p
        break

# Fail with a clear message instead of the opaque
# "unsupported operand type(s) for /: 'NoneType' and 'str'" raised further down.
if ROOT is None:
    raise FileNotFoundError(
        "Could not locate the project root: no directory at or above "
        f"{Path.cwd()} contains both a 'data' and a 'reports' folder."
    )

print("ROOT:", ROOT)

# Raw snapshot (input) and curated (output) folders for this run.
RAW_DIR = ROOT / "data/raw/olist/2026-01-12"
OUT_DIR = ROOT / "data/curated/olist/2026-01-12"
OUT_DIR.mkdir(parents=True, exist_ok=True)

print("RAW_DIR:", RAW_DIR)
print("Orders exists?:", (RAW_DIR / "olist_orders_dataset.csv").exists())


CWD: c:\Users\TAN\OneDrive\Cloud_KPI_Reporting_Pipeline\notebooks
ROOT: c:\Users\TAN\OneDrive\Cloud_KPI_Reporting_Pipeline
RAW_DIR: c:\Users\TAN\OneDrive\Cloud_KPI_Reporting_Pipeline\data\raw\olist\2026-01-12
Orders exists?: True


In [8]:
import pandas as pd
import numpy as np

# Load the raw snapshot tables from RAW_DIR. Timestamp columns are parsed to
# datetimes at read time so later cells can use the .dt accessor directly.
orders = pd.read_csv(
    RAW_DIR / "olist_orders_dataset.csv",
    parse_dates=[
        "order_purchase_timestamp",
        "order_approved_at",
        "order_delivered_carrier_date",
        "order_delivered_customer_date",
        "order_estimated_delivery_date",
    ],
)
items = pd.read_csv(
    RAW_DIR / "olist_order_items_dataset.csv",
    parse_dates=["shipping_limit_date"],
)
payments = pd.read_csv(RAW_DIR / "olist_order_payments_dataset.csv")
customers = pd.read_csv(RAW_DIR / "olist_customers_dataset.csv")
products = pd.read_csv(RAW_DIR / "olist_products_dataset.csv")
trans = pd.read_csv(RAW_DIR / "product_category_name_translation.csv")

# Sanity output: (rows, columns) of every table, then the order status mix.
for _label, _frame in (
    ("orders", orders),
    ("items", items),
    ("payments", payments),
    ("customers", customers),
    ("products", products),
    ("trans", trans),
):
    print(_label, _frame.shape)
print("\norder_status counts:\n", orders["order_status"].value_counts())


orders (99441, 8)
items (112650, 7)
payments (103886, 5)
customers (99441, 5)
products (32951, 9)
trans (71, 2)

order_status counts:
 order_status
delivered      96478
shipped         1107
canceled         625
unavailable      609
invoiced         314
processing       301
created            5
approved           2
Name: count, dtype: int64


In [9]:
import pandas as pd

# Calendar span: starts at the earliest purchase date and ends at the latest of
# the purchase, customer-delivery and estimated-delivery dates.
min_date = orders["order_purchase_timestamp"].dt.date.min()

max_date = pd.concat(
    [
        orders[col].dropna()
        for col in (
            "order_purchase_timestamp",
            "order_delivered_customer_date",
            "order_estimated_delivery_date",
        )
    ]
).dt.date.max()

date_range = pd.date_range(min_date, max_date, freq="D")

# One row per calendar day; every attribute is derived from the "date" column.
dim_date = pd.DataFrame({"date": date_range}).assign(
    date_id=lambda d: d["date"].dt.strftime("%Y%m%d").astype(int),
    year=lambda d: d["date"].dt.year,
    quarter=lambda d: d["date"].dt.quarter,
    month_num=lambda d: d["date"].dt.month,
    month_name=lambda d: d["date"].dt.strftime("%B"),
    year_month=lambda d: d["date"].dt.strftime("%Y-%m"),
    week_num=lambda d: d["date"].dt.isocalendar().week.astype(int),
    day_of_week_num=lambda d: d["date"].dt.dayofweek + 1,  # Mon=1
    day_of_week_name=lambda d: d["date"].dt.strftime("%A"),
    # relies on day_of_week_num assigned just above (6 = Sat, 7 = Sun)
    is_weekend=lambda d: d["day_of_week_num"].isin([6, 7]).astype(int),
)

# Put the surrogate key first and fix the exported column order.
dim_date = dim_date[[
    "date_id", "date",
    "year", "quarter", "month_num", "month_name", "year_month",
    "week_num", "day_of_week_num", "day_of_week_name", "is_weekend",
]]

dim_date.to_csv(OUT_DIR / "dim_date.csv", index=False)

print("dim_date saved:", dim_date.shape)
print("min/max:", dim_date["date"].min().date(), "→", dim_date["date"].max().date())


dim_date saved: (800, 11)
min/max: 2016-09-04 → 2018-11-12


In [10]:
# dim_product: products left-joined to the category translation table, with the
# English category column renamed and only the curated columns kept.
dim_product = (
    products
    .merge(trans, on="product_category_name", how="left")
    .rename(columns={"product_category_name_english": "product_category_name_en"})
    .loc[:, [
        "product_id",
        "product_category_name",
        "product_category_name_en",
        "product_weight_g",
        "product_length_cm",
        "product_height_cm",
        "product_width_cm",
    ]]
)

dim_product.to_csv(OUT_DIR / "dim_product.csv", index=False)

# Share of products that received an English category name from the join.
coverage = dim_product["product_category_name_en"].notna().mean()

print("dim_product saved:", dim_product.shape)
print("EN category coverage:", round(coverage*100, 2), "%")
print("missing EN categories:", dim_product["product_category_name_en"].isna().sum())


dim_product saved: (32951, 7)
EN category coverage: 98.11 %
missing EN categories: 623


In [11]:
# Map orders -> customers so we can pick one row per customer_unique_id
cust_orders = orders[["customer_id", "order_purchase_timestamp"]].merge(
    customers, on="customer_id", how="left"
)

# Latest purchase rule: sort ascending and take the last row per customer_unique_id.
# customer_id is a secondary sort key so that customers with several orders at
# the exact same purchase timestamp always resolve to the same row; a
# single-column sort uses a non-stable algorithm by default, which made the
# choice between tied rows arbitrary from run to run.
cust_orders = cust_orders.sort_values(["order_purchase_timestamp", "customer_id"])
dim_customer = cust_orders.groupby("customer_unique_id").tail(1)

# Keep only the curated customer attributes.
dim_customer = dim_customer[[
    "customer_unique_id",
    "customer_id",
    "customer_zip_code_prefix",
    "customer_city",
    "customer_state"
]]

dim_customer.to_csv(OUT_DIR/"dim_customer.csv", index=False)

# Checkpoints: row count should equal the number of distinct customer_unique_id.
print("dim_customer saved:", dim_customer.shape)
print("unique customer_unique_id:", dim_customer["customer_unique_id"].nunique())
print("null customer_state:", dim_customer["customer_state"].isna().sum())


dim_customer saved: (96096, 5)
unique customer_unique_id: 96096
null customer_state: 0


In [13]:
import numpy as np
import pandas as pd

# 1) Aggregate payments to one row per order_id
pay_agg = payments.groupby("order_id", as_index=False).agg(
    payment_total=("payment_value", "sum"),
    payment_count=("payment_value", "size")
)

# Per-order item value (price + freight); only used as a fallback for payment_total
items_tmp = items.copy()
items_tmp["item_value"] = items_tmp["price"] + items_tmp["freight_value"]
item_sum = items_tmp.groupby("order_id", as_index=False).agg(item_value_sum=("item_value", "sum"))

# 2) Build base fact from orders + customer_unique_id + payment/item aggregates
fact_orders = orders.merge(
    customers[["customer_id", "customer_unique_id"]],
    on="customer_id",
    how="left"
).merge(
    pay_agg,
    on="order_id",
    how="left"
).merge(
    item_sum,
    on="order_id",
    how="left"
)

# 3) Patch edge case: delivered orders missing payment_total -> fill with sum(price+freight)
# The mask is computed on the fully merged frame, i.e. the frame it indexes.
mask_missing_pay = (fact_orders["order_status"] == "delivered") & (fact_orders["payment_total"].isna())

fact_orders.loc[mask_missing_pay, "payment_total"] = fact_orders.loc[mask_missing_pay, "item_value_sum"]
fact_orders.loc[mask_missing_pay, "payment_count"] = 0  # 0 = patched/unknown payment rows
fact_orders = fact_orders.drop(columns=["item_value_sum"])

# The left merge introduces NaN for orders without payment rows, which turns the
# count into float64 (exported as "1.0"). Store it as a nullable integer instead,
# consistent with delivered_date_id below.
fact_orders["payment_count"] = fact_orders["payment_count"].astype("Int64")

# 4) Date IDs
fact_orders["purchase_date_id"] = fact_orders["order_purchase_timestamp"].dt.strftime("%Y%m%d").astype(int)

# delivered_date_id as nullable integer (Int64)
fact_orders["delivered_date_id"] = fact_orders["order_delivered_customer_date"].dt.strftime("%Y%m%d")
fact_orders.loc[fact_orders["order_delivered_customer_date"].isna(), "delivered_date_id"] = pd.NA
fact_orders["delivered_date_id"] = fact_orders["delivered_date_id"].astype("Int64")


# 5) Flags + delivery metrics
fact_orders["is_delivered"] = (fact_orders["order_status"] == "delivered").astype(int)
fact_orders["is_canceled_or_unavailable"] = fact_orders["order_status"].isin(["canceled", "unavailable"]).astype(int)

# Elapsed time from purchase to customer delivery, in (fractional) days.
fact_orders["delivery_days"] = (
    (fact_orders["order_delivered_customer_date"] - fact_orders["order_purchase_timestamp"])
    .dt.total_seconds() / 86400
)

# 1 = delivered on/before the estimate, 0 = late, <NA> = either date missing.
# NOTE(review): this compares full timestamps. If order_estimated_delivery_date
# carries no time of day, a delivery later on the estimated day counts as late --
# confirm that this is the intended definition.
fact_orders["on_time_flag"] = np.where(
    (fact_orders["order_delivered_customer_date"].notna()) & (fact_orders["order_estimated_delivery_date"].notna()),
    (fact_orders["order_delivered_customer_date"] <= fact_orders["order_estimated_delivery_date"]).astype(int),
    np.nan
)
# np.where with a NaN branch yields float64; keep the flag as a nullable integer.
fact_orders["on_time_flag"] = fact_orders["on_time_flag"].astype("Int64")

# 6) Select curated columns + export
fact_orders = fact_orders[[
    "order_id", "customer_unique_id", "order_status",
    "order_purchase_timestamp", "order_approved_at",
    "order_delivered_carrier_date", "order_delivered_customer_date",
    "order_estimated_delivery_date",
    "purchase_date_id", "delivered_date_id",
    "is_delivered", "is_canceled_or_unavailable",
    "delivery_days", "on_time_flag",
    "payment_total", "payment_count"
]]

fact_orders.to_csv(OUT_DIR/"fact_orders.csv", index=False)

# 7) Print checkpoints
patched_count = mask_missing_pay.sum()
still_missing = ((fact_orders["order_status"] == "delivered") & (fact_orders["payment_total"].isna())).sum()

print("fact_orders saved:", fact_orders.shape)
print("patched delivered orders (missing payments):", int(patched_count))
print("delivered orders still missing payment_total:", int(still_missing))
print("null customer_unique_id:", int(fact_orders["customer_unique_id"].isna().sum()))
print("null customer_unique_id:", int(fact_orders["customer_unique_id"].isna().sum()))


fact_orders saved: (99441, 16)
patched delivered orders (missing payments): 1
delivered orders still missing payment_total: 0
null customer_unique_id: 0


In [14]:
import numpy as np

# Start from a copy of the raw items (assign returns a new frame), add the
# per-line value, and bring in the order-level payment_total for allocation.
fact_order_items = (
    items
    .assign(item_value=lambda d: d["price"] + d["freight_value"])
    .merge(fact_orders[["order_id", "payment_total"]], on="order_id", how="left")
)

# Order-level sum of item values, broadcast back to every line of the order.
# Zero totals become NaN so the share below is NaN rather than a division by zero.
order_item_total = (
    fact_order_items
    .groupby("order_id")["item_value"]
    .transform("sum")
    .replace({0: np.nan})
)

# Each line's share of its order, and the order's payment_total split by that share.
fact_order_items = fact_order_items.assign(
    order_item_value_share=lambda d: d["item_value"] / order_item_total,
    allocated_revenue=lambda d: d["payment_total"] * d["order_item_value_share"],
)

# Curated column set and order for export.
fact_order_items = fact_order_items[[
    "order_id", "order_item_id", "product_id",
    "shipping_limit_date", "price", "freight_value",
    "item_value", "order_item_value_share", "allocated_revenue",
]]

fact_order_items.to_csv(OUT_DIR / "fact_order_items.csv", index=False)

# Checkpoint: fraction of orders whose line shares add up to 1 (within +/- 0.001).
share_sum = fact_order_items.groupby("order_id")["order_item_value_share"].sum()
ok_rate = share_sum.between(0.999, 1.001).mean()

print("fact_order_items saved:", fact_order_items.shape)
print("share_sum ~1 rate:", round(ok_rate*100, 2), "%")
print("null allocated_revenue:", int(fact_order_items["allocated_revenue"].isna().sum()))


fact_order_items saved: (112650, 9)
share_sum ~1 rate: 100.0 %
null allocated_revenue: 0


In [15]:
from pathlib import Path

# Write a Markdown data-quality report for this run: row counts, BLOCKER checks
# (any failure makes the overall status FAIL), WARNING metrics and notes.
REPORT_DIR = ROOT / "reports" / "data_quality"
REPORT_DIR.mkdir(parents=True, exist_ok=True)

# The run date and the paths shown in the report are derived from the snapshot
# folders actually used, so the report cannot drift from RAW_DIR / OUT_DIR.
run_date = RAW_DIR.name
report_path = REPORT_DIR / f"data_quality_report_{run_date}.md"
snapshot_rel = RAW_DIR.relative_to(ROOT).as_posix()
curated_rel = OUT_DIR.relative_to(ROOT).as_posix()

# Raw counts
raw_counts = {
    "orders": len(orders),
    "order_items": len(items),
    "order_payments": len(payments),
    "customers": len(customers),
    "products": len(products),
    "translation": len(trans),
}

# Curated counts
curated_counts = {
    "dim_date": len(dim_date),
    "dim_customer": len(dim_customer),
    "dim_product": len(dim_product),
    "fact_orders": len(fact_orders),
    "fact_order_items": len(fact_order_items),
}

# BLOCKER checks: list of (check name, passed?) pairs
blockers = []
blockers.append(("fact_orders order_id unique", fact_orders["order_id"].is_unique))
blockers.append(("fact_orders order_id not null", fact_orders["order_id"].notna().all()))
blockers.append(("fact_orders customer_unique_id not null", fact_orders["customer_unique_id"].notna().all()))
blockers.append(("fact_orders purchase_timestamp not null", fact_orders["order_purchase_timestamp"].notna().all()))
blockers.append(("fact_order_items (order_id, order_item_id) unique",
                 fact_order_items.set_index(["order_id","order_item_id"]).index.is_unique))
blockers.append(("fact_order_items product_id not null", fact_order_items["product_id"].notna().all()))
blockers.append(("fact_order_items price >= 0", (fact_order_items["price"] >= 0).all()))
blockers.append(("fact_order_items freight_value >= 0", (fact_order_items["freight_value"] >= 0).all()))

delivered_missing = ((fact_orders["order_status"]=="delivered") & (fact_orders["payment_total"].isna())).sum()
blockers.append(("delivered orders payment_total present", delivered_missing == 0))

# Referential integrity: every fact key must exist in its dimension.
cust_set = set(dim_customer["customer_unique_id"])
blockers.append(("RI: fact_orders.customer_unique_id in dim_customer",
                 fact_orders["customer_unique_id"].isin(cust_set).all()))

prod_set = set(dim_product["product_id"])
blockers.append(("RI: fact_order_items.product_id in dim_product",
                 fact_order_items["product_id"].isin(prod_set).all()))

# WARNING checks
coverage_en = dim_product["product_category_name_en"].notna().mean()
share_sum = fact_order_items.groupby("order_id")["order_item_value_share"].sum()
share_ok_rate = share_sum.between(0.999, 1.001).mean()

overall_pass = all(bool(ok) for _, ok in blockers)

# Delivered orders whose payment_total was patched from item values. They are
# marked with payment_count == 0 when fact_orders is built. The fillna/astype
# keeps this working whether payment_count is float or nullable integer.
patched_count = int((
    fact_orders["order_status"].eq("delivered")
    & fact_orders["payment_count"].eq(0).fillna(False).astype(bool)
).sum())


def pct(x):
    """Format a fraction (e.g. 0.9811) as a percentage string with two decimals."""
    return f"{x*100:.2f}%"


with open(report_path, "w", encoding="utf-8") as f:
    f.write(f"# Data Quality Report — {run_date}\n\n")
    f.write("## Run Metadata\n")
    f.write(f"- Snapshot: {snapshot_rel}/\n")
    f.write(f"- Curated output: {curated_rel}/\n")
    f.write(f"- Overall status: {'PASS' if overall_pass else 'FAIL'}\n\n")

    f.write("## Row Counts\n")
    f.write("| Table | Rows |\n|---|---:|\n")
    for k, v in raw_counts.items():
        f.write(f"| raw.{k} | {v} |\n")
    for k, v in curated_counts.items():
        f.write(f"| curated.{k} | {v} |\n")
    f.write("\n")

    f.write("## BLOCKER Checks\n")
    f.write("| Check | Result |\n|---|---|\n")
    for name, ok in blockers:
        f.write(f"| {name} | {'PASS' if ok else 'FAIL'} |\n")
    f.write("\n")

    f.write("## WARNING Checks\n")
    f.write("| Check | Value |\n|---|---|\n")
    f.write(f"| Category EN coverage | {pct(coverage_en)} |\n")
    f.write(f"| Share sum ≈ 1 (rate) | {pct(share_ok_rate)} |\n\n")

    # The note reflects the actual number of patched orders instead of a fixed "1".
    f.write("## Notes\n")
    if patched_count == 0:
        f.write("- No delivered orders had missing payment rows; no payment_total values were patched.\n")
    elif patched_count == 1:
        f.write("- 1 delivered order had missing payment rows; payment_total was patched using SUM(price + freight) for that order.\n")
    else:
        f.write(f"- {patched_count} delivered orders had missing payment rows; payment_total was patched using SUM(price + freight) for those orders.\n")

print("Wrote DQ report:", report_path)
print("Overall:", "PASS" if overall_pass else "FAIL")


Wrote DQ report: c:\Users\TAN\OneDrive\Cloud_KPI_Reporting_Pipeline\reports\data_quality\data_quality_report_2026-01-12.md
Overall: PASS
