# AI-Driven Inventory Intelligence System — Gold Layer
### Weekly Demand Signals • ML Price Imputation • Decision-Ready Gold Table

This notebook builds the **Gold analytical layer** of a retail intelligence pipeline using the **M5 Forecasting (Accuracy)** dataset.  
It transforms a cleaned Silver daily dataset into **weekly demand signals**, trains a supervised ML model to **impute structurally missing prices**, and produces a **Power BI–ready Gold table**.

---

## Objectives
- Align daily sales to **weekly price grain**
- Engineer **temporal demand signals** (lags, rolling averages, volatility)
- Train **Gradient Boosted Trees** on observed prices
- Impute missing prices with distributional sanity checks
- Persist a **complete, decision-ready Gold dataset**


## Environment & Tools

**Platform:** Databricks Lakehouse  
**Language:** PySpark / Python  
**Libraries:** Spark ML, Seaborn, Matplotlib  
**Dataset:** M5 Forecasting – Accuracy  

**Upstream Dependency:**  
`workspace.database.silver_daily_sales_v2`  
(Daily, scoped, quality-checked dataset from Notebook 1)


In [0]:
# Imports
# Use the F alias only; a wildcard import would shadow Python built-ins such as sum, max, and min.
import pyspark.sql.functions as F
from pyspark.sql.window import Window

## Step 1 — Load Silver Layer

We begin from the persisted Silver dataset rather than recomputing raw joins.  
This enforces **data layer separation** and mirrors real production pipelines.


In [0]:
# Load Silver table
silver_df = spark.read.table("workspace.database.silver_daily_sales_v2")
display(silver_df.limit(5))

In [0]:
silver_df.select(
    F.count("*").alias("rows"),
    F.countDistinct("store_id").alias("stores"),
    F.countDistinct("item_id").alias("items"),
    F.countDistinct("wm_yr_wk").alias("weeks")
).display()


## Step 2 — Daily → Weekly Aggregation

Prices in the M5 dataset are recorded at the **weekly grain**.  
We aggregate daily sales and event signals to weekly level to ensure:
- Target alignment for ML
- Reduced noise
- Faster model training
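
The roll-up can be illustrated without a Spark session. The following is a minimal plain-Python sketch on invented toy rows, assuming the same grouping key (store, item, week) and that a week's price is the maximum non-null daily price; the helper name `aggregate_weekly` and the sample values are hypothetical.

```python
from collections import defaultdict

# Toy daily rows: (store_id, item_id, wm_yr_wk, sales, sell_price, has_event).
# Values are invented for illustration; sell_price is None when no price was recorded.
daily_rows = [
    ("CA_1", "FOODS_1_001", 11101, 2, 2.0, 0),
    ("CA_1", "FOODS_1_001", 11101, 0, 2.0, 1),
    ("CA_1", "FOODS_1_001", 11101, 3, 2.0, 0),
    ("CA_1", "FOODS_1_001", 11102, 1, None, 0),
    ("CA_1", "FOODS_1_001", 11102, 4, None, 0),
]


def aggregate_weekly(rows):
    """Roll daily rows up to one record per (store, item, week)."""
    groups = defaultdict(list)
    for store, item, week, sales, price, event in rows:
        groups[(store, item, week)].append((sales, price, event))

    weekly = {}
    for key, days in groups.items():
        prices = [p for _, p, _ in days if p is not None]
        weekly[key] = {
            # Total units sold in the week
            "weekly_sales": sum(s for s, _, _ in days),
            # Weekly price; None when no day in the week had a price
            "sell_price": max(prices) if prices else None,
            # Share of days in the week flagged with an event
            "event_ratio": sum(e for _, _, e in days) / len(days),
        }
    return weekly


weekly = aggregate_weekly(daily_rows)
```

Weeks with no recorded price keep `sell_price` as `None`, which is what later lets the pipeline tell observed weeks from weeks that need imputation.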


In [0]:
weekly_df = (
    silver_df
    .groupBy("store_id","item_id","cat_id","dept_id","wm_yr_wk")
    .agg(
        F.sum(F.col("sales")).alias("weekly_sales"),
        F.max(F.col("sell_price")).alias("sell_price"),
        F.max(F.col("missing_prices")).alias("missing_prices"),
        F.max(F.col("price_observed")).alias("price_observed"),
        F.mean(F.col("has_event")).alias("event_ratio"),
        F.mean(F.col("snap_CA")).alias("snap_ratio")
    )
)

weekly_df.select(
    F.count("*").alias("weekly_rows"),
    F.mean(F.col("missing_prices").cast("double")).alias("weekly_missing_rate")
).display()

display(weekly_df.limit(10))


## Step 3 — Temporal Demand Signal Engineering

We create lagged and rolling demand indicators to capture:
- Momentum (`lag_sales_1`, `lag_sales_2`)
- Trend (`roll_avg_sales_4`)
- Volatility (`roll_std_sales_4`)
- Last known price (`lag_price_1`)

These features allow the model to infer price behavior from historical demand patterns.
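
To make the feature definitions concrete, here is a small plain-Python sketch for a single store-item series. It assumes a four-week trailing window that includes the current week and uses the sample standard deviation (as Spark's `F.stddev` does); the function name `demand_signals` and the sales figures are illustrative only.

```python
from statistics import mean, stdev


def demand_signals(weekly_sales, window=4):
    """Per-week lag and trailing-window features for one store-item series."""
    features = []
    for i, sales in enumerate(weekly_sales):
        # Current week plus up to (window - 1) preceding weeks
        trailing = weekly_sales[max(0, i - window + 1): i + 1]
        features.append({
            "weekly_sales": sales,
            # Lags are undefined at the start of the series
            "lag_sales_1": weekly_sales[i - 1] if i >= 1 else None,
            "lag_sales_2": weekly_sales[i - 2] if i >= 2 else None,
            "roll_avg_sales_4": mean(trailing),
            # Sample std needs at least two points; None otherwise
            "roll_std_sales_4": stdev(trailing) if len(trailing) >= 2 else None,
        })
    return features


series = [10, 12, 8, 14, 20]  # invented weekly sales, oldest first
signals = demand_signals(series)
```

For the last week of this toy series the trailing window is `[12, 8, 14, 20]`, so the rolling average is 13.5 and the rolling standard deviation is 5.0. The first week has no lags and no volatility estimate, which is why the notebook needs a fill strategy for these columns.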


In [0]:
w = Window.partitionBy("store_id","item_id").orderBy("wm_yr_wk")

weekly_feat = (
    weekly_df
    .withColumn("lag_sales_1", F.lag("weekly_sales", 1).over(w))
    .withColumn("lag_sales_2", F.lag("weekly_sales", 2).over(w))
    # 4-week trailing window: the current week plus the three preceding weeks
    .withColumn("roll_avg_sales_4", F.avg("weekly_sales").over(w.rowsBetween(-3, 0)))
    .withColumn("roll_std_sales_4", F.stddev("weekly_sales").over(w.rowsBetween(-3, 0)))
    # helpful baseline feature: last observed price (will be null where unavailable)
    .withColumn("lag_price_1", F.lag("sell_price", 1).over(w))
)

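# Fill nulls in engineered features only; sell_price stays null so missing prices remain identifiable.
# A lag_price_1 of 0 therefore means "no previous price" (the baseline checks lag_price_1 > 0).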
weekly_feat = weekly_feat.fillna({
    "lag_sales_1": 0,
    "lag_sales_2": 0,
    "roll_avg_sales_4": 0,
    "roll_std_sales_4": 0,
    "lag_price_1": 0
})

display(weekly_feat.limit(10))


## Step 4 — Weekly Train vs Impute Split

- **Train Set:** Weeks with observed prices  
- **Impute Set:** Weeks where prices are structurally missing

This avoids leakage and ensures the model learns only from real price records.


In [0]:
train_weekly = weekly_feat.filter(F.col("missing_prices") == 0).filter(F.col("sell_price").isNotNull())
impute_weekly = weekly_feat.filter(F.col("missing_prices") == 1)

print("Train weekly:", train_weekly.count())
print("Impute weekly:", impute_weekly.count())


In [0]:
store_item_median = (
    train_weekly
    .groupBy("store_id","item_id")
    .agg(F.expr("percentile_approx(sell_price, 0.5)").alias("median_price_store_item"))
)

store_dept_median = (
    train_weekly
    .groupBy("store_id","dept_id")
    .agg(F.expr("percentile_approx(sell_price, 0.5)").alias("median_price_store_dept"))
)

train_split, val_split = train_weekly.randomSplit([0.8, 0.2], seed=42)

baseline_val = (
    val_split
    .join(store_item_median, ["store_id","item_id"], "left")
    .join(store_dept_median, ["store_id","dept_id"], "left")
    .withColumn(
        "baseline_price",
        F.when(F.col("lag_price_1") > 0, F.col("lag_price_1"))
         .when(F.col("median_price_store_item").isNotNull(), F.col("median_price_store_item"))
         .otherwise(F.col("median_price_store_dept"))
    )
)


## Step 5 — Baseline Heuristic

Before ML, we establish a baseline using:
1. Last observed price (`lag_price_1`)
2. Median price per store-item
3. Median price per store-department

This shows whether ML meaningfully improves over simple heuristics: the model is only worth keeping if its validation error is lower than the baseline's.
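
The fallback order and the two error metrics can be sketched in plain Python. This is an illustration on invented numbers, assuming a `lag_price_1` of 0 is the sentinel for "no previous price"; the helper names `baseline_price`, `mae`, and `rmse` are hypothetical and stand in for the Spark expressions and `RegressionEvaluator`.

```python
from math import sqrt


def baseline_price(lag_price_1, item_median, dept_median):
    """Return the first available estimate in priority order."""
    # Assumption: 0 marks a missing previous price (zero-filled lag feature)
    if lag_price_1 is not None and lag_price_1 > 0:
        return lag_price_1
    if item_median is not None:
        return item_median
    return dept_median


def mae(actual, predicted):
    """Mean absolute error."""
    return sum(abs(a - p) for a, p in zip(actual, predicted)) / len(actual)


def rmse(actual, predicted):
    """Root mean squared error."""
    return sqrt(sum((a - p) ** 2 for a, p in zip(actual, predicted)) / len(actual))


# Invented validation rows: true price and the three candidate estimates
actual = [2.0, 3.0, 5.0]
predicted = [
    baseline_price(2.5, 2.0, 4.0),   # previous price available
    baseline_price(0, 3.0, 4.0),     # falls back to store-item median
    baseline_price(0, None, 4.0),    # falls back to store-department median
]

baseline_mae = mae(actual, predicted)
baseline_rmse = rmse(actual, predicted)
```

RMSE penalizes the single large miss more heavily than MAE does, so reporting both gives a sense of whether errors are uniformly small or dominated by a few outliers.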


In [0]:
from pyspark.ml.evaluation import RegressionEvaluator

mae_eval = RegressionEvaluator(labelCol="sell_price", predictionCol="baseline_price", metricName="mae")
rmse_eval = RegressionEvaluator(labelCol="sell_price", predictionCol="baseline_price", metricName="rmse")

print("Baseline MAE:", mae_eval.evaluate(baseline_val))
print("Baseline RMSE:", rmse_eval.evaluate(baseline_val))


## Step 6 — Supervised ML Price Imputation

**Model:** Gradient Boosted Trees (GBT)  
**Why GBT?**
- Handles non-linear relationships
- Insensitive to feature scaling and to outliers in the input features
- Performs well on tabular retail data

**Features Used**
- Weekly demand metrics
- Event and SNAP signals
- Historical price
- Encoded categorical store/category/department
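
For the categorical encoding, the sketch below mimics in plain Python what Spark's `StringIndexer` does with its default settings: labels are indexed by descending frequency (ties broken alphabetically), and with `handleInvalid="keep"` any label not seen during fitting goes to one extra index equal to the number of known labels. The helper names `fit_index` and `transform_index` are hypothetical, and Spark emits the indices as doubles rather than ints.

```python
from collections import Counter


def fit_index(labels):
    """Map each label to an index, most frequent label first."""
    counts = Counter(labels)
    # Sort by descending count, then alphabetically to break ties
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: idx for idx, label in enumerate(ordered)}


def transform_index(mapping, label):
    """Look up a label; unseen labels share one extra 'unknown' index."""
    return mapping.get(label, len(mapping))


# Invented training labels for illustration
store_index = fit_index(["CA_1", "CA_1", "CA_1", "CA_2", "CA_2", "CA_3"])

seen = transform_index(store_index, "CA_2")
unseen = transform_index(store_index, "CA_4")
```

Keeping unseen labels in their own bucket means scoring does not fail when the impute set contains a category that was absent from the training split.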


In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import GBTRegressor

indexers = [
    StringIndexer(inputCol="store_id", outputCol="store_idx", handleInvalid="keep"),
    StringIndexer(inputCol="cat_id", outputCol="cat_idx", handleInvalid="keep"),
    StringIndexer(inputCol="dept_id", outputCol="dept_idx", handleInvalid="keep"),
]

assembler = VectorAssembler(
    inputCols=[
        "weekly_sales",
        "lag_sales_1","lag_sales_2",
        "roll_avg_sales_4","roll_std_sales_4",
        "event_ratio","snap_ratio",
        "lag_price_1",
        "store_idx","cat_idx","dept_idx"   # ✅ use indexes, not onehot vectors
    ],
    outputCol="features",
    handleInvalid="keep"
)

gbt = GBTRegressor(
    labelCol="sell_price",
    featuresCol="features",
    maxDepth=5,
    maxIter=40,
    stepSize=0.1,
    subsamplingRate=0.8
)

pipeline = Pipeline(stages=indexers + [assembler, gbt])
gbt_model = pipeline.fit(train_split)

In [0]:
val_pred = gbt_model.transform(val_split)

pred_col = "prediction"
print("GBT MAE:", RegressionEvaluator(labelCol="sell_price", predictionCol=pred_col, metricName="mae").evaluate(val_pred))
print("GBT RMSE:", RegressionEvaluator(labelCol="sell_price", predictionCol=pred_col, metricName="rmse").evaluate(val_pred))

## Step 7 — Predict Missing Prices

The trained model generates price predictions for structurally missing weeks.  
These predictions are merged with observed prices to create a **complete price series**.


In [0]:
impute_pred = gbt_model.transform(impute_weekly)

imputed_prices = impute_pred.select(
    "store_id","item_id","wm_yr_wk",
    F.col("prediction").alias("imputed_sell_price")
)

display(imputed_prices.limit(10))


## Step 8 — Gold Layer Assembly

The final Gold dataset includes:
- Weekly demand signals
- Observed or imputed prices
- No remaining nulls in the final price column (`final_sell_price`)
- Power BI-ready schema
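
The merge rule is simple: keep the observed price where one exists, otherwise use the model's prediction. A plain-Python sketch on invented rows, with a null-rate check of the kind one might run before publishing the table (the helper names `assemble_final_price` and `null_rate` are hypothetical):

```python
def assemble_final_price(rows):
    """Add final_sell_price: observed price if present, else the imputed one."""
    out = []
    for row in rows:
        observed = row["sell_price"]
        final = observed if observed is not None else row.get("imputed_sell_price")
        out.append({**row, "final_sell_price": final})
    return out


def null_rate(rows, column):
    """Fraction of rows where the given column is None."""
    return sum(r[column] is None for r in rows) / len(rows)


# Invented weekly rows: the second week has no observed price
weekly_rows = [
    {"wm_yr_wk": 11101, "sell_price": 2.5, "imputed_sell_price": None},
    {"wm_yr_wk": 11102, "sell_price": None, "imputed_sell_price": 2.4},
    {"wm_yr_wk": 11103, "sell_price": 3.0, "imputed_sell_price": None},
]

gold_rows = assemble_final_price(weekly_rows)
final_null_rate = null_rate(gold_rows, "final_sell_price")
```

Note that `sell_price` itself still contains nulls after the merge; only `final_sell_price` is complete, so dashboards should read the latter.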


In [0]:
gold_weekly = (
    weekly_feat
    .join(imputed_prices, on=["store_id","item_id","wm_yr_wk"], how="left")
    .select(
        "store_id","item_id","dept_id","cat_id","wm_yr_wk",
        "weekly_sales","sell_price","missing_prices",
        "imputed_sell_price",
        "event_ratio","snap_ratio",
        "lag_sales_1","lag_sales_2","roll_avg_sales_4","roll_std_sales_4","lag_price_1"
    )
    # Prefer the observed price; fall back to the model's imputation
    .withColumn(
        "final_sell_price",
        F.coalesce(F.col("sell_price"), F.col("imputed_sell_price"))
    )
)


## Step 9 — Persist Gold Table

The Gold table is written as a versioned Delta table to support:
- Dashboarding
- Alerting
- Historical reproducibility


In [0]:
gold_table = "workspace.database.gold_weekly_inventory_intelligence_v1"

(
    gold_weekly
    .write
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .format("delta")
    .saveAsTable(gold_table)
)

print("✅ Gold table written:", gold_table)


✅ Gold table created: `workspace.database.gold_weekly_inventory_intelligence_v1`  
Recommended Power BI fields: `store_id`, `cat_id`, `dept_id`, `wm_yr_wk`, `weekly_sales`, `final_sell_price`


## Step 10 — Distributional Validation

To ensure statistical credibility, we compare:
- Mean price
- Standard deviation
- Percentiles (P10, P50, P90)
- Coverage rates

The imputed distribution aligns closely with observed prices, with slightly tighter tails. This is expected for tree-based regressors, which tend to pull predictions toward typical training values and rarely reach the extremes.
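
The comparison itself amounts to computing the same summary for both price groups and checking that they are in the same range. A plain-Python sketch with invented prices follows; `statistics.quantiles` uses exact interpolation, whereas Spark's `percentile_approx` is approximate, so the numbers from the two would differ slightly on real data. The helper name `summarize` is hypothetical.

```python
from statistics import mean, quantiles, stdev


def summarize(prices):
    """Mean, sample standard deviation, and P10/P50/P90 of a price list."""
    deciles = quantiles(prices, n=10)  # nine cut points: P10 ... P90
    return {
        "mean": mean(prices),
        "std": stdev(prices),
        "p10": deciles[0],
        "p50": deciles[4],
        "p90": deciles[8],
    }


# Invented prices: the imputed values are deliberately less spread out
observed = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]
imputed = [3.0, 4.0, 4.5, 5.0, 5.5, 6.0, 6.5, 7.0, 7.5, 8.0]

observed_summary = summarize(observed)
imputed_summary = summarize(imputed)

# Width of the central 80% band, a simple measure of tail spread
observed_band = observed_summary["p90"] - observed_summary["p10"]
imputed_band = imputed_summary["p90"] - imputed_summary["p10"]
```

A narrower P10 to P90 band for imputed prices with a similar mean is the "tighter tails" pattern described above; a large shift in the mean or median would instead suggest the model is biased for the missing weeks.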


In [0]:
# Distribution comparison: observed vs imputed
gold_weekly.select(
    F.mean("sell_price").alias("mean_observed_price"),
    F.mean("imputed_sell_price").alias("mean_imputed_price"),
    F.mean("final_sell_price").alias("mean_final_price")
).display()

# Coverage
gold_weekly.select(
    F.mean(F.col("sell_price").isNotNull().cast("double")).alias("observed_rate"),
    F.mean(F.col("imputed_sell_price").isNotNull().cast("double")).alias("imputed_rate")
).display()


In [0]:
gold_weekly.select(
    F.stddev("sell_price").alias("std_observed"),
    F.stddev("imputed_sell_price").alias("std_imputed")
).display()


In [0]:
gold_weekly.selectExpr(
  "percentile_approx(sell_price, array(0.1,0.5,0.9)) as observed_p10_p50_p90",
  "percentile_approx(imputed_sell_price, array(0.1,0.5,0.9)) as imputed_p10_p50_p90"
).display()


If the GBT's MAE and RMSE on the validation split are lower than the baseline's, the ML model adds value beyond heuristic filling.
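
One way to express that comparison as a single number is the relative error reduction. The sketch below is illustrative only: the helper `relative_improvement` is hypothetical and the error values are placeholders, not results from this notebook.

```python
def relative_improvement(baseline_error, model_error):
    """Fraction by which the model reduces the baseline error.

    Positive means the model is better; negative means it is worse.
    """
    if baseline_error <= 0:
        raise ValueError("baseline_error must be positive")
    return (baseline_error - model_error) / baseline_error


# Placeholder errors for illustration, not measured values
example_gain = relative_improvement(0.50, 0.40)
example_loss = relative_improvement(0.50, 0.60)
```

Applying the same function to both MAE and RMSE helps spot the case where the model improves average error but makes the largest misses worse.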


## Optional Step — Business Guardrail (Clipping)

For dashboard consumption, we apply a light guardrail to avoid implausible sub-$0.50 prices.

- **v1** = raw ML output (research/reproducibility)
- **v2** = business-ready output (clipped)
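
The guardrail is a one-sided clip. A plain-Python sketch of the rule, assuming the $0.50 floor stated above and that missing values pass through unchanged (the helper name `clip_price` is hypothetical):

```python
def clip_price(price, floor=0.5):
    """Raise prices below the floor up to the floor; leave others untouched."""
    if price is None:
        return None  # missing values are not altered by the guardrail
    return max(price, floor)


# Invented prices, including one implausibly low prediction
raw_prices = [0.2, 0.5, 3.0, None]
clipped_prices = [clip_price(p) for p in raw_prices]
```

Because only the lower tail is touched, the v1 and v2 tables should be identical except for rows where the raw value was below the floor.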


In [0]:
# =========================================
# Optional Business Guardrail (Dashboard-Ready)
# =========================================

gold_weekly_clipped = gold_weekly.withColumn(
    "final_sell_price",
    F.when(F.col("final_sell_price") < 0.5, 0.5).otherwise(F.col("final_sell_price"))
)

gold_table_v2 = "workspace.database.gold_weekly_inventory_intelligence_v2"

(
    gold_weekly_clipped
    .write
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .format("delta")
    .saveAsTable(gold_table_v2)
)

print("✅ Gold table written (clipped):", gold_table_v2)


In [0]:
gold_weekly_clipped.select(
    F.min("final_sell_price").alias("min_final_price"),
    F.expr("percentile_approx(final_sell_price, 0.01)").alias("p01_final_price")
).display()


## Final Outcome

- **Weekly Rows:** ~740k  
- **Price Null Rate:** 0%  
- **Observed Coverage:** ~66%  
- **Imputed Coverage:** ~34%  
- **Distribution Alignment:** Healthy and plausible  

### Deliverable
`workspace.database.gold_weekly_inventory_intelligence_v1`

This dataset is fully complete, distribution-checked, and ready for:
- Business dashboards
- Demand forecasting
- Pricing optimization
- Decision analytics

---

### Pipeline Summary
Bronze → Silver → **Gold + ML Imputation**

This notebook demonstrates an end-to-end **Lakehouse + Machine Learning pipeline** suitable for real-world retail analytics.
