In [1]:
# End-to-end lakehouse project
# ETL -> Bronze/Silver/Gold -> Star Schema (DuckDB) -> KPI "Dashboard" -> ML Forecast
# Data is synthetic (safe & reproducible)


!pip -q install duckdb pyarrow fastparquet scikit-learn plotly ipywidgets

import os, json, math, random
from datetime import datetime, timedelta

import numpy as np
import pandas as pd
import duckdb

from sklearn.metrics import mean_absolute_error, mean_squared_error
from sklearn.ensemble import RandomForestRegressor

import plotly.express as px
import plotly.graph_objects as go
import ipywidgets as widgets
from IPython.display import display, Markdown

# Seed both random sources used by the synthetic-data generators (the stdlib
# `random` module and NumPy's global RNG) so every run yields the same data.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)

# Root folder of the lakehouse. The default is the original Colab location;
# set the LAKEHOUSE_BASE_DIR environment variable to run the notebook on
# another machine without editing any code.
BASE_DIR = os.environ.get("LAKEHOUSE_BASE_DIR", "/content/lakehouse_e2e_portfolio")
RAW_DIR  = os.path.join(BASE_DIR, "bronze_raw")
SILVER_DIR = os.path.join(BASE_DIR, "silver_clean")
GOLD_DIR = os.path.join(BASE_DIR, "gold_star_schema")

# One folder per medallion layer; exist_ok keeps the cell re-runnable.
for layer_dir in (RAW_DIR, SILVER_DIR, GOLD_DIR):
    os.makedirs(layer_dir, exist_ok=True)

display(Markdown("## ‚úÖ Environment ready"))


[2K   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m1.8/1.8 MB[0m [31m17.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m1.6/1.6 MB[0m [31m27.6 MB/s[0m eta [36m0:00:00[0m
[?25h

## ✅ Environment ready

In [2]:
# ===============================
# 1) EXTRACT (simulate ingestion from multiple sources)
# - Products (master data), Stores, Calendar, Promotions, Sales transactions
# ===============================

def generate_products(n=120):
    """Create a synthetic product master table.

    Args:
        n: Number of products to generate; ids run from 1 to ``n``.

    Returns:
        DataFrame with columns ``product_id``, ``sku``, ``category``,
        ``brand``, ``unit_cost`` and ``unit_price``. Cost is drawn from a
        normal distribution clipped to [1.5, 40]; price is cost plus a
        margin clipped to [10%, 70%].
    """
    category_pool = ["Beverages", "Snacks", "Dairy", "Personal Care", "Household"]
    brand_pool = ["Aurum", "Nova", "PrimeCo", "GreenLeaf", "Skyline", "Urban", "Sol"]

    def _make_product(product_id):
        # The draw order (category, brand, cost, margin) is part of the
        # seeded output, so it must not be rearranged.
        category = random.choice(category_pool)
        brand = random.choice(brand_pool)
        unit_cost = round(np.clip(np.random.normal(8, 4), 1.5, 40), 2)
        markup = np.clip(np.random.normal(0.35, 0.12), 0.10, 0.70)
        unit_price = round(unit_cost * (1 + markup), 2)
        return [product_id, f"SKU-{product_id:04d}", category, brand, unit_cost, unit_price]

    return pd.DataFrame(
        [_make_product(product_id) for product_id in range(1, n + 1)],
        columns=["product_id","sku","category","brand","unit_cost","unit_price"],
    )

def generate_stores(n=18):
    """Create a synthetic store dimension.

    Args:
        n: Number of stores to generate; ids run from 1 to ``n``.

    Returns:
        DataFrame with columns ``store_id``, ``store_name``, ``region``,
        ``channel`` (80% Retail / 20% Wholesale) and ``store_size``
        (S/M/L with probabilities 0.35/0.45/0.20).
    """
    region_pool = ["SP", "RJ", "MG", "PR", "SC", "RS", "GO", "BA"]
    channel_pool = ["Retail", "Wholesale"]

    records = []
    for store_id in range(1, n + 1):
        # List elements are evaluated left to right, so the random draws
        # happen in the order region -> channel -> size.
        records.append([
            store_id,
            f"Store-{store_id:03d}",
            random.choice(region_pool),
            np.random.choice(channel_pool, p=[0.8, 0.2]),
            np.random.choice(["S","M","L"], p=[0.35,0.45,0.20]),
        ])

    return pd.DataFrame(records, columns=["store_id","store_name","region","channel","store_size"])

def generate_calendar(start="2024-01-01", days=400):
    """Create a daily calendar table.

    Args:
        start: ISO-format date of the first day.
        days: Number of consecutive days to generate.

    Returns:
        DataFrame with one row per day and columns ``date`` (ISO string),
        ``year``, ``month``, ``week`` (ISO week number), ``weekday``
        (0 = Monday) and ``is_weekend`` (1 for Saturday/Sunday, else 0).
    """
    first_day = datetime.fromisoformat(start)

    def _describe(offset):
        day = first_day + timedelta(days=offset)
        day_of_week = day.weekday()
        return [
            day.date().isoformat(),
            day.year,
            day.month,
            day.isocalendar().week,
            day_of_week,
            int(day_of_week >= 5),
        ]

    return pd.DataFrame(
        [_describe(offset) for offset in range(days)],
        columns=["date","year","month","week","weekday","is_weekend"],
    )

def generate_promotions(calendar_df, products_df, stores_df, promo_rate=0.10):
    """Create synthetic promotions.

    Each calendar day becomes a promo day with probability ``promo_rate``;
    a promo day gets between 3 and 10 promotions, each for a randomly
    chosen store/product pair with a discount of 5% to 25%.

    Args:
        calendar_df: Frame with a ``date`` column (one row per day).
        products_df: Frame with a ``product_id`` column.
        stores_df: Frame with a ``store_id`` column.
        promo_rate: Probability that a given day carries promotions.

    Returns:
        DataFrame with columns ``date``, ``store_id``, ``product_id`` and
        ``discount``. The same (date, store, product) key can appear more
        than once because pairs are drawn independently.
    """
    discount_levels = [0.05,0.10,0.15,0.20,0.25]
    product_ids = products_df["product_id"].values
    store_ids = stores_df["store_id"].values

    records = []
    for promo_date in calendar_df["date"]:
        # One uniform draw per day decides whether it is a promo day.
        if not random.random() < promo_rate:
            continue
        n_promos = random.randint(3, 10)
        for _ in range(n_promos):
            product_id = int(random.choice(product_ids))
            store_id = int(random.choice(store_ids))
            discount = round(np.random.choice(discount_levels), 2)
            records.append([promo_date, store_id, product_id, discount])

    return pd.DataFrame(records, columns=["date","store_id","product_id","discount"])

def generate_sales(calendar_df, products_df, stores_df, promos_df, tx_per_day=800):
    """Simulate sales transactions for every calendar day.

    For each day, ``tx_per_day`` candidate transactions are drawn: a random
    product and store, then a Poisson number of units whose rate depends on
    product category, store size, a monthly seasonality term, a weekend
    uplift and any promotion discount. Candidates with zero units are
    dropped, so the result has at most ``tx_per_day`` rows per day.

    Args:
        calendar_df: Frame with ``date``, ``month`` and ``is_weekend`` columns.
        products_df: Frame with ``product_id``, ``category`` and ``unit_price``.
        stores_df: Frame with ``store_id`` and ``store_size``.
        promos_df: Frame with ``date``, ``store_id``, ``product_id`` and
            ``discount``; if a key repeats, the last row's discount is used.
        tx_per_day: Candidate transactions simulated per day (default 800,
            the value that was previously hard-coded).

    Returns:
        DataFrame with columns ``date``, ``store_id``, ``product_id``,
        ``discount``, ``units`` and ``revenue``.
    """
    size_factor = {"S":0.8, "M":1.0, "L":1.25}
    cat_factor = {"Beverages":1.15, "Snacks":1.10, "Dairy":0.95, "Personal Care":0.75, "Household":0.85}

    promo_map = promos_df.set_index(["date","store_id","product_id"])["discount"].to_dict()

    # Candidate ids are materialised once instead of once per transaction.
    product_ids = products_df["product_id"].values
    store_ids = stores_df["store_id"].values

    # Attribute lookups are built once. The previous per-transaction
    # boolean-mask scan plus .iloc[0] returned the first matching row, so
    # duplicates are dropped keeping the first occurrence.
    first_products = products_df.drop_duplicates("product_id")
    category_of = dict(zip(first_products["product_id"], first_products["category"]))
    price_of = dict(zip(first_products["product_id"], first_products["unit_price"].astype(float)))
    first_stores = stores_df.drop_duplicates("store_id")
    size_of = dict(zip(first_stores["store_id"], first_stores["store_size"]))

    rows = []
    day_columns = zip(calendar_df["date"], calendar_df["month"], calendar_df["is_weekend"])
    for date, month, weekend_flag in day_columns:
        is_weekend = int(weekend_flag)
        # Smooth yearly seasonality: +/-15% around 1.0 depending on the month.
        season = 1.0 + 0.15*math.sin(2*math.pi*(int(month)-1)/12)

        for _ in range(tx_per_day):
            # RNG draw order (product, store, units) is unchanged, so a
            # seeded run reproduces the same data as before.
            pid = int(random.choice(product_ids))
            sid = int(random.choice(store_ids))

            base = 1.2 * cat_factor[category_of[pid]] * size_factor[size_of[sid]] * season
            base *= (1.12 if is_weekend else 1.0)

            disc = promo_map.get((date, sid, pid), 0.0)
            # A discount lifts demand; the rate is floored to stay positive.
            lam = max(0.1, base * (1.0 + disc*2.2))

            units = np.random.poisson(lam=lam)
            if units == 0:
                continue

            revenue = units * price_of[pid] * (1 - disc)
            rows.append([date, sid, pid, disc, int(units), round(float(revenue), 2)])

    return pd.DataFrame(rows, columns=["date","store_id","product_id","discount","units","revenue"])

# Run the generators in dependency order: promotions need the calendar and
# both dimensions, and sales additionally need the promotions. The order
# also fixes how the seeded random streams are consumed.
products = generate_products()
stores = generate_stores()
calendar = generate_calendar()
promos = generate_promotions(calendar, products, stores)
sales = generate_sales(calendar, products, stores, promos)

# Preview the first rows of the main tables, then report row counts.
for preview_frame in (products, stores, sales):
    display(preview_frame.head())

row_summary = f"**Rows:** sales={len(sales):,} promos={len(promos):,}"
display(Markdown(row_summary))


Unnamed: 0,product_id,sku,category,brand,unit_cost,unit_price
0,1,SKU-0001,Beverages,Aurum,9.99,13.32
1,2,SKU-0002,Dairy,Nova,10.59,16.23
2,3,SKU-0003,Snacks,Nova,7.06,9.33
3,4,SKU-0004,Beverages,Urban,14.32,20.65
4,5,SKU-0005,Household,Aurum,6.12,8.66


Unnamed: 0,store_id,store_name,region,channel,store_size
0,1,Store-001,RS,Retail,S
1,2,Store-002,RJ,Retail,L
2,3,Store-003,PR,Wholesale,M
3,4,Store-004,SC,Retail,S
4,5,Store-005,BA,Retail,S


Unnamed: 0,date,store_id,product_id,discount,units,revenue
0,2024-01-01,6,33,0.0,3,51.06
1,2024-01-01,1,15,0.0,1,7.36
2,2024-01-01,8,46,0.0,2,24.6
3,2024-01-01,4,68,0.0,1,6.64
4,2024-01-01,3,96,0.0,2,18.04


**Rows:** sales=215,085 promos=285

In [3]:
# ===============================
# 2) BRONZE (raw parquet)
# ===============================
# Persist every generated table unchanged as the raw (bronze) layer.
bronze_outputs = [
    (products, "products.parquet"),
    (stores, "stores.parquet"),
    (calendar, "calendar.parquet"),
    (promos, "promotions.parquet"),
    (sales, "sales.parquet"),
]
for raw_frame, file_name in bronze_outputs:
    raw_frame.to_parquet(os.path.join(RAW_DIR, file_name), index=False)

display(Markdown("## ‚úÖ BRONZE layer saved (raw parquet)"))


## ✅ BRONZE layer saved (raw parquet)

In [4]:
# ===============================
# 3) SILVER (clean + data quality checks)
# ===============================
def dq_check(df, rules: dict, df_name="df"):
    """Run data-quality rules against ``df`` and collect the failures.

    Args:
        df: Object handed to every rule (a DataFrame in this notebook).
        rules: Mapping of rule name to a callable that takes ``df`` and
            returns an ``(ok, message)`` pair.
        df_name: Label used as the prefix of each reported issue.

    Returns:
        A list with one ``"<df_name> | <rule>: <message>"`` string per rule
        whose ``ok`` value is falsy, in rule order; empty when all pass.
    """
    outcomes = ((rule_name, *rule_fn(df)) for rule_name, rule_fn in rules.items())
    return [
        f"{df_name} | {rule_name}: {msg}"
        for rule_name, ok, msg in outcomes
        if not ok
    ]

# Read the bronze tables back from disk so the silver step depends only on
# what was persisted, not on in-memory state.
products_r, stores_r, calendar_r, promos_r, sales_r = (
    pd.read_parquet(os.path.join(RAW_DIR, file_name))
    for file_name in (
        "products.parquet",
        "stores.parquet",
        "calendar.parquet",
        "promotions.parquet",
        "sales.parquet",
    )
)

# Enforce the dtypes of the sales fact: real timestamps and numeric measures.
sales_s = (
    sales_r
    .assign(date=pd.to_datetime(sales_r["date"]))
    .astype({"units": int, "revenue": float, "discount": float})
)

# Each rule returns (passed, message); the message is reported on failure.
product_rules = {
    "unique_product_id": lambda d: (d["product_id"].is_unique, "product_id not unique"),
    "no_null_sku": lambda d: (d["sku"].notna().all(), "null sku found"),
}
store_rules = {
    "unique_store_id": lambda d: (d["store_id"].is_unique, "store_id not unique"),
}
sales_rules = {
    "no_negative_units": lambda d: ((d["units"] >= 0).all(), "negative units"),
    "valid_discount": lambda d: (((d["discount"] >= 0) & (d["discount"] <= 0.8)).all(), "discount out of range"),
    "date_not_null": lambda d: (d["date"].notna().all(), "null date"),
}

issues = [
    *dq_check(products_r, product_rules, "products"),
    *dq_check(stores_r, store_rules, "stores"),
    *dq_check(sales_s, sales_rules, "sales"),
]

# Fail fast: nothing is written to silver unless every check passed.
if issues:
    display(Markdown("### ‚ùå Data Quality Issues"))
    for issue in issues:
        display(Markdown(f"- {issue}"))
    raise ValueError("Data quality checks failed.")

display(Markdown("### ‚úÖ Data Quality Checks: PASSED"))

silver_outputs = [
    (products_r, "products_clean.parquet"),
    (stores_r, "stores_clean.parquet"),
    (calendar_r, "calendar_clean.parquet"),
    (promos_r, "promotions_clean.parquet"),
    (sales_s, "sales_clean.parquet"),
]
for clean_frame, file_name in silver_outputs:
    clean_frame.to_parquet(os.path.join(SILVER_DIR, file_name), index=False)

display(Markdown("## ‚úÖ SILVER layer saved (clean parquet)"))


### ✅ Data Quality Checks: PASSED

## ✅ SILVER layer saved (clean parquet)

In [5]:
# ===============================
# 4) GOLD (star schema) + DuckDB Warehouse
# ===============================
# File-backed DuckDB database that acts as the warehouse for the gold layer.
con = duckdb.connect(database=os.path.join(BASE_DIR, "warehouse.duckdb"))

# Expose each silver parquet file as a view. The paths are notebook
# constants (not user input), so they are interpolated into the SQL text.
silver_views = {
    "products": "products_clean.parquet",
    "stores": "stores_clean.parquet",
    "calendar": "calendar_clean.parquet",
    "promotions": "promotions_clean.parquet",
    "sales": "sales_clean.parquet",
}
for view_name, file_name in silver_views.items():
    con.execute(
        f"CREATE OR REPLACE VIEW {view_name} AS "
        f"SELECT * FROM read_parquet('{SILVER_DIR}/{file_name}');"
    )

# Dimensions are straight projections of the cleaned master data.
con.execute("""
CREATE OR REPLACE TABLE dim_product AS
SELECT product_id, sku, category, brand, unit_cost, unit_price
FROM products;
""")

con.execute("""
CREATE OR REPLACE TABLE dim_store AS
SELECT store_id, store_name, region, channel, store_size
FROM stores;
""")

con.execute("""
CREATE OR REPLACE TABLE dim_date AS
SELECT CAST(date AS DATE) AS date, year, month, week, weekday, is_weekend
FROM calendar;
""")

# Fact grain: date x store x product x discount.
# The discount is taken from the sales rows, which already carry the discount
# that was applied to each transaction. Joining `promotions` again would
# multiply sales rows whenever the same (date, store_id, product_id) key
# appears more than once in promotions, double-counting units and revenue.
con.execute("""
CREATE OR REPLACE TABLE fact_sales AS
SELECT
  CAST(date AS DATE) AS date,
  store_id,
  product_id,
  COALESCE(discount, 0.0) AS discount,
  SUM(units) AS units,
  SUM(revenue) AS revenue
FROM sales
GROUP BY 1,2,3,4;
""")

# Export the star schema as parquet next to the other layers.
for t in ["dim_product","dim_store","dim_date","fact_sales"]:
    con.execute(f"COPY {t} TO '{GOLD_DIR}/{t}.parquet' (FORMAT PARQUET);")

display(Markdown("## ‚úÖ GOLD star schema created + loaded into DuckDB"))
display(con.execute("SELECT COUNT(*) AS rows_fact_sales FROM fact_sales;").df())


## ✅ GOLD star schema created + loaded into DuckDB

Unnamed: 0,rows_fact_sales
0,190296


In [6]:
# ===============================
# 5) KPI layer (SQL as BI consumption)
# ===============================
# Monthly revenue, units and the mean discount across fact rows, computed
# in SQL the way a BI tool would query the warehouse.
monthly_kpi_sql = """
SELECT
  d.year,
  d.month,
  SUM(f.revenue) AS total_revenue,
  SUM(f.units) AS total_units,
  AVG(f.discount) AS avg_discount
FROM fact_sales f
JOIN dim_date d ON f.date = d.date
GROUP BY 1,2
ORDER BY 1,2;
"""
kpis_monthly = con.execute(monthly_kpi_sql).df()

display(kpis_monthly.head())

# One line per year, so the same calendar months can be compared.
fig = px.line(
    kpis_monthly,
    x="month",
    y="total_revenue",
    color="year",
    title="Revenue Trend (Monthly)",
)
fig.show()


Unnamed: 0,year,month,total_revenue,total_units,avg_discount
0,2024,1,299096.42,28201.0,9.5e-05
1,2024,2,302984.8,28448.0,4.5e-05
2,2024,3,341473.99,32304.0,4.8e-05
3,2024,4,331660.03,31521.0,1e-05
4,2024,5,340726.78,32267.0,3e-06


In [8]:
# ===============================
# 6) ML: Forecast daily demand by store/category (time-based validation)
# ===============================
# Daily demand per store and product category, enriched with store and
# calendar attributes; this is the base table for the forecasting features.
daily_demand_sql = """
SELECT
  f.date,
  s.store_id,
  s.region,
  s.store_size,
  p.category,
  SUM(f.units) AS units,
  SUM(f.revenue) AS revenue,
  AVG(f.discount) AS avg_discount,
  d.weekday,
  d.is_weekend,
  d.month
FROM fact_sales f
JOIN dim_store s ON f.store_id = s.store_id
JOIN dim_product p ON f.product_id = p.product_id
JOIN dim_date d ON f.date = d.date
GROUP BY 1,2,3,4,5,9,10,11
ORDER BY 1;
"""
daily = con.execute(daily_demand_sql).df()



In [9]:
# Target: daily units per (store, category) series.
daily["date"] = pd.to_datetime(daily["date"])
daily["y"] = daily["units"]

# Sort so that row order inside each series is chronological; the lag and
# rolling features below are row-based and rely on this order.
series_keys = ["store_id", "category"]
daily = daily.sort_values([*series_keys, "date"]).reset_index(drop=True)
demand_by_series = daily.groupby(series_keys)["y"]

# Lagged demand from 1, 7 and 14 rows earlier within the same series.
for lag in [1,7,14]:
    daily[f"lag_{lag}"] = demand_by_series.shift(lag)

# Rolling means over the previous 7 / 14 rows, excluding the current row.
# They are computed inside each series with transform(): chaining
# .rolling() after a grouped .shift() runs the window over the whole
# column, across series boundaries, and only the NaN introduced by the
# shift (with the default min_periods) stops values leaking between series.
for window in (7, 14):
    daily[f"rolling_{window}"] = demand_by_series.transform(
        lambda s, w=window: s.shift(1).rolling(w).mean()
    )

# Rows without a full feature history are dropped.
model_df = daily.dropna().copy()

feature_cols = [
    "store_id", "region", "store_size", "category", "avg_discount",
    "weekday", "is_weekend", "month",
    "lag_1", "lag_7", "lag_14", "rolling_7", "rolling_14",
]
X = model_df[feature_cols]
y = model_df["y"]

# One-hot encode the categorical attributes; store_id stays numeric.
X_enc = pd.get_dummies(X, columns=["region","store_size","category"], drop_first=False)

# Time-based holdout: the most recent 30 days are the test set.
cutoff = model_df["date"].max() - pd.Timedelta(days=30)
train_idx = model_df["date"] <= cutoff
test_idx  = model_df["date"] > cutoff

X_train, y_train = X_enc.loc[train_idx], y.loc[train_idx]
X_test, y_test   = X_enc.loc[test_idx],  y.loc[test_idx]


In [11]:
# Random forest hyper-parameters; random_state keeps the fit reproducible
# and n_jobs=-1 uses every available core.
rf_params = dict(
    n_estimators=250,
    random_state=SEED,
    max_depth=18,
    min_samples_leaf=2,
    n_jobs=-1,
)
model = RandomForestRegressor(**rf_params)
model.fit(X_train, y_train)

# Score on the held-out last 30 days.
pred = model.predict(X_test)
mae = mean_absolute_error(y_test, pred)
rmse = mean_squared_error(y_test, pred) ** 0.5  # square root of the MSE

evaluation_md = f"### ‚úÖ ML Evaluation (time-based holdout)\n- **MAE:** {mae:.2f}\n- **RMSE:** {rmse:.2f}"
display(Markdown(evaluation_md))

### ✅ ML Evaluation (time-based holdout)
- **MAE:** 3.87
- **RMSE:** 5.04

In [12]:
# ===============================
# 7) Notebook dashboard (interactive)
# ===============================
# Data behind the dashboard: every daily row that has a target value.
dash_df = (
    daily
    .dropna(subset=["y"])
    .assign(date=lambda frame: pd.to_datetime(frame["date"]))
)

# Dropdown choices come from the values actually present in the data.
stores_list = sorted(dash_df["store_id"].drop_duplicates().tolist())
categories_list = sorted(dash_df["category"].drop_duplicates().tolist())

store_dd = widgets.Dropdown(
    options=stores_list, value=stores_list[0], description="Store:"
)
cat_dd = widgets.Dropdown(
    options=categories_list, value=categories_list[0], description="Category:"
)
metric_dd = widgets.Dropdown(
    options=["units","revenue","avg_discount"], value="units", description="Metric:"
)

# All dashboard content is rendered into this output area.
out = widgets.Output()

def render_dashboard(store_id, category, metric):
    """Render the KPI summary and charts for one store/category pair.

    Clears the shared ``out`` widget and writes into it: a Markdown block
    with last-30-day KPIs, a daily line chart of ``metric`` and, when
    model-ready rows exist for the selection, an actual-vs-predicted chart
    for the last 30 days.

    Args:
        store_id: Store to filter on.
        category: Product category to filter on.
        metric: Column to plot; ``"avg_discount"`` is averaged per day,
            any other metric is summed per day.
    """
    feature_cols = [
        "store_id", "region", "store_size", "category", "avg_discount",
        "weekday", "is_weekend", "month",
        "lag_1", "lag_7", "lag_14", "rolling_7", "rolling_14",
    ]

    with out:
        out.clear_output()

        selected = (dash_df["store_id"]==store_id) & (dash_df["category"]==category)
        df = dash_df[selected].copy()

        # KPIs cover the 30 days ending at the latest date in the selection.
        kpi_start = df["date"].max() - pd.Timedelta(days=30)
        last_30 = df[df["date"] > kpi_start]
        kpi_units = int(last_30["units"].sum())
        kpi_rev   = float(last_30["revenue"].sum())
        kpi_disc  = float(last_30["avg_discount"].mean())

        display(Markdown(f"""
### üìä Executive Dashboard (Notebook)
**Store:** {store_id} | **Category:** {category}

**Last 30 days KPIs**
- **Units:** {kpi_units:,}
- **Revenue:** ${kpi_rev:,.2f}
- **Avg Discount:** {kpi_disc:.2%}
        """))

        # Discounts are averaged per day; volume metrics are summed.
        per_day = df.groupby("date", as_index=False)[metric]
        if metric == "avg_discount":
            agg, title = per_day.mean(), "Average Discount (Daily)"
        else:
            agg, title = per_day.sum(), f"{metric.title()} (Daily)"

        px.line(agg, x="date", y=metric, title=title).show()

        in_model = (model_df["store_id"]==store_id) & (model_df["category"]==category)
        model_view = model_df[in_model].copy()
        if len(model_view) == 0:
            return

        # Encoding a single store/category slice yields only some of the
        # dummy columns; align to the training layout, zero-filling the rest.
        Xv_enc = pd.get_dummies(
            model_view[feature_cols],
            columns=["region","store_size","category"],
            drop_first=False,
        ).reindex(columns=X_train.columns, fill_value=0)

        model_view["pred_units"] = model.predict(Xv_enc)
        overlay_start = model_view["date"].max() - pd.Timedelta(days=30)
        last = model_view[model_view["date"] > overlay_start]
        if len(last) == 0:
            return

        fig2 = go.Figure()
        fig2.add_trace(go.Scatter(x=last["date"], y=last["y"], mode="lines", name="Actual Units"))
        fig2.add_trace(go.Scatter(x=last["date"], y=last["pred_units"], mode="lines", name="Predicted Units"))
        fig2.update_layout(title="ML Forecast Overlay (Last 30 days)")
        fig2.show()

def _refresh_dashboard(_button=None):
    """Re-render the dashboard from the current dropdown selections."""
    render_dashboard(store_dd.value, cat_dd.value, metric_dd.value)


render_btn = widgets.Button(description="Update Dashboard", button_style="primary")
render_btn.on_click(_refresh_dashboard)

# Controls first, then the output area, then an initial render so the
# dashboard is populated before the first click.
controls = widgets.HBox([store_dd, cat_dd, metric_dd, render_btn])
display(controls)
display(out)
_refresh_dashboard()


HBox(children=(Dropdown(description='Store:', options=(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, ‚Ä¶

Output()

In [13]:
# ===============================
# 8) Portfolio deliverables (exports + data catalog)
# ===============================
# Monthly KPIs as a flat file for BI tools.
kpis_monthly.to_csv(os.path.join(BASE_DIR, "kpis_monthly.csv"), index=False)

# Actuals next to predictions for the holdout (last 30 days) rows.
pred_out = model_df.loc[test_idx, ["date","store_id","category","y"]].assign(
    pred_units=model.predict(X_enc.loc[test_idx])
)
pred_out.to_csv(os.path.join(BASE_DIR, "ml_predictions_last_30_days.csv"), index=False)

# Lightweight data catalog: where each layer lives, what each table holds,
# which quality checks ran and how the model scored.
catalog_layers = {"bronze": RAW_DIR, "silver": SILVER_DIR, "gold": GOLD_DIR}
catalog_tables = {
    "fact_sales": "Grain: date, store_id, product_id, discount. Measures: units, revenue.",
    "dim_product": "Product attributes: category, brand, pricing.",
    "dim_store": "Store attributes: region, channel, store_size.",
    "dim_date": "Calendar attributes: year, month, week, weekday, is_weekend.",
}
catalog_dq_checks = [
    "Unique IDs in dimensions",
    "No negative units",
    "Discount in valid range",
    "Non-null date",
]
catalog_ml = {
    "task": "Forecast daily demand (units) by store/category",
    "model": "RandomForestRegressor",
    "validation": "Time-based holdout (last 30 days)",
    "metrics": {"MAE": float(mae), "RMSE": float(rmse)},
}
catalog = {
    "layers": catalog_layers,
    "tables": catalog_tables,
    "dq_checks": catalog_dq_checks,
    "ml": catalog_ml,
}
with open(os.path.join(BASE_DIR, "data_catalog.json"), "w") as catalog_file:
    catalog_file.write(json.dumps(catalog, indent=2))

display(Markdown("## ‚úÖ Exports created"))
export_listing = f"- `{BASE_DIR}/kpis_monthly.csv`\n- `{BASE_DIR}/ml_predictions_last_30_days.csv`\n- `{BASE_DIR}/data_catalog.json`\n- Star schema parquet files in `{GOLD_DIR}`\n- DuckDB warehouse: `{BASE_DIR}/warehouse.duckdb`"
display(Markdown(export_listing))


## ✅ Exports created

- `/content/lakehouse_e2e_portfolio/kpis_monthly.csv`
- `/content/lakehouse_e2e_portfolio/ml_predictions_last_30_days.csv`
- `/content/lakehouse_e2e_portfolio/data_catalog.json`
- Star schema parquet files in `/content/lakehouse_e2e_portfolio/gold_star_schema`
- DuckDB warehouse: `/content/lakehouse_e2e_portfolio/warehouse.duckdb`