# AI-Driven Inventory Intelligence System  
### Demand Signal Extraction • Price Imputation • Decision Analytics (Databricks + Power BI)

This notebook builds a scalable retail analytics pipeline using the **M5 Forecasting (Accuracy)** dataset to convert raw sales + price + calendar data into **decision-ready inventory intelligence**.

**Focus Areas**
- Demand Signal Extraction (store/category behavior)
- Price Missingness Diagnostics (MNAR vs random)
- Predictive Price Imputation (ML-based)
- Gold-layer outputs for dashboarding + alerts



## Dataset & Environment
**Dataset:** M5 Forecasting – Accuracy  
**Platform:** Databricks Lakehouse  
**Tools:** PySpark • SQL • Python • Seaborn • Matplotlib

Files Used:
- calendar.csv
- sales_train_evaluation.csv
- sales_train_validation.csv
- sell_prices.csv



In [0]:
# Imports
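# Use the F namespace only; a wildcard import from pyspark.sql.functions
# would shadow Python built-ins such as sum, min, max, and round.
import pyspark.sql.functions as F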
from functools import reduce

import matplotlib.pyplot as plt
import seaborn as sns


## Architecture (Bronze → Silver → Gold)

- **Bronze:** Raw tables (already loaded in `workspace.database.*`)
- **Silver:** Cleaned + scoped + normalized daily table with event & missingness flags
- **Gold:** Weekly aggregates + features + imputed prices + demand risk signals (Power BI ready)

**Silver** is created after the unified daily dataset is built.  
**Gold** is created after aggregation + feature engineering + modeling.


## Analytical Scope
To keep compute efficient while preserving realism:

- Stores: **CA_1, CA_2, CA_3**
- Categories: **Non-FOODS**
- Years: **2011–2013**
- Grain: **Daily** (later aggregated weekly for Gold)


In [0]:
calendar_df = spark.read.table("workspace.database.calendar")
sales_train_df = spark.read.table("workspace.database.sales_train_evaluation")
sales_val_df = spark.read.table("workspace.database.sales_train_validation")
sell_prices_df = spark.read.table("workspace.database.sell_prices")


## Calendar Data Engineering
Objective: Build a temporal backbone for accurate joins and event analysis.


In [0]:
calendar_df2 = (
    calendar_df
    .filter(F.col("year").isin([2011, 2012, 2013]))
    .drop("snap_TX", "snap_WI")
    .withColumn("date", F.to_date("date"))
    .withColumn("d_num", F.regexp_extract("d", r"d_(\d+)", 1).cast("int"))
)

display(calendar_df2.select("year").distinct().orderBy("year"))
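
The `d_num` column matters because the `d` labels do not sort chronologically as strings (`d_10` sorts before `d_2`). The plain-Python sketch below mirrors the same regex extraction on a few hypothetical labels; it does not read the calendar table, and the helper name `d_num` is only illustrative.

```python
import re

# Hypothetical sample of M5-style day labels (not read from the calendar table).
labels = ["d_1", "d_10", "d_2", "d_100", "d_11"]

def d_num(label):
    """Return the integer suffix of a 'd_<n>' label, or None if it does not match."""
    match = re.search(r"d_(\d+)", label)
    return int(match.group(1)) if match else None

string_order = sorted(labels)               # lexicographic: d_1, d_10, d_100, d_11, d_2
numeric_order = sorted(labels, key=d_num)   # chronological: d_1, d_2, d_10, d_11, d_100

print(string_order)
print(numeric_order)
```

Sorting on the numeric suffix (or on the real `date`) is what keeps the day columns in calendar order later in the notebook.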


## Item Dimension (for accurate category filtering in prices)

In [0]:
item_dim = (
    sales_train_df
    .select("item_id","store_id","cat_id","dept_id","state_id")
    .dropDuplicates(["store_id","item_id"])
)
display(item_dim.groupBy("cat_id").count().orderBy("cat_id"))

## Sell Price Conditioning
Objective: Align pricing data with selected stores and categories.


In [0]:
sell_prices_scoped = (
    sell_prices_df
    .filter(F.col("store_id").isin(["CA_1","CA_2","CA_3"]))
    .join(item_dim.select("store_id","item_id","cat_id"), on=["store_id","item_id"], how="left")
)

# QC: how many prices cannot be mapped to a category?
null_cat_before = sell_prices_scoped.filter(F.col("cat_id").isNull()).count()
print("sell_prices rows with NULL cat_id BEFORE filtering:", null_cat_before)

sell_prices_df2 = (
    sell_prices_scoped
    .filter(F.col("cat_id").isNotNull())
    .filter(F.col("cat_id") != "FOODS")
)

display(sell_prices_df2.select("store_id","cat_id").distinct().orderBy("store_id","cat_id"))


In [0]:
dup_price_keys = (
    sell_prices_df2
    .groupBy("store_id","item_id","wm_yr_wk")
    .count()
    .filter(F.col("count") > 1)
    .count()
)
print("Duplicate price keys (should be 0):", dup_price_keys)

null_sell_price_in_source = sell_prices_df2.filter(F.col("sell_price").isNull()).count()
print("Rows where sell_price is NULL in price table (should be 0 ideally):", null_sell_price_in_source)


## Sales Conditioning
Scope sales to the same stores and Non-FOODS categories used for prices.

In [0]:
sales_train_df2 = (
    sales_train_df
    .filter(F.col("store_id").isin(["CA_1","CA_2","CA_3"]))
    .filter(F.col("cat_id").isNotNull())
    .filter(F.col("cat_id") != "FOODS")
)

display(sales_train_df2.groupBy("cat_id").count().orderBy("cat_id"))

## Sales Normalization (Wide → Long)
Raw sales arrives in wide format (`d_1 ... d_n`).  
We reshape to long format to enable scalable joins + feature engineering.



In [0]:

id_columns = ["id","item_id","dept_id","cat_id","store_id","state_id"]

# ordered by real date, not string "d"
days_in_scope = [
    r["d"] for r in (
        calendar_df2
        .select("d","date","d_num")
        .dropDuplicates(["d"])
        .orderBy("date","d_num")
        .collect()
    )
]

sales_day_cols = [c for c in sales_train_df2.columns if c.startswith("d_")]
days_to_keep = [d for d in days_in_scope if d in sales_day_cols]

print("Days to keep:", len(days_to_keep), "| Example:", days_to_keep[:5], "…", days_to_keep[-5:])

sales_train_filtered = sales_train_df2.select(id_columns + days_to_keep)

stack_expression = ", ".join([f"'{day}', `{day}`" for day in days_to_keep])

sales_long_df = (
    sales_train_filtered
    .selectExpr(*id_columns, f"stack({len(days_to_keep)},{stack_expression}) as (day_col, sales)")
    .withColumn("sales", F.col("sales").cast("double"))
)

display(sales_long_df.limit(5))
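
To make the `stack` call concrete, the sketch below builds the same expression string for three hypothetical day columns and performs the equivalent unpivot in plain Python. The sample row, its values, and the helper `unpivot` are invented for illustration.

```python
# Hypothetical wide row: id columns plus three day columns (values are invented).
id_cols = ["item_id", "store_id"]
day_cols = ["d_1", "d_2", "d_3"]
wide_rows = [
    {"item_id": "HOBBIES_1_001", "store_id": "CA_1", "d_1": 0, "d_2": 2, "d_3": 1},
]

# Same construction as the notebook's stack_expression, on the small column list.
stack_expression = ", ".join(f"'{day}', `{day}`" for day in day_cols)
stack_sql = f"stack({len(day_cols)},{stack_expression}) as (day_col, sales)"
print(stack_sql)

def unpivot(rows, id_cols, day_cols):
    """Emit one long row per (wide row, day column), mirroring what stack() produces."""
    long_rows = []
    for row in rows:
        for day in day_cols:
            long_row = {c: row[c] for c in id_cols}
            long_row["day_col"] = day
            long_row["sales"] = float(row[day])  # the notebook casts sales to double
            long_rows.append(long_row)
    return long_rows

long_rows = unpivot(wide_rows, id_cols, day_cols)
for r in long_rows:
    print(r)
```

Each wide row becomes `len(day_cols)` long rows, so the long table has (number of item-store rows) × (days kept) rows.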

## Dataset Enrichment (Sales + Calendar)
First step toward the unified daily dataset used for analysis, modeling, and dashboarding: attach calendar attributes to each daily sales row.



In [0]:
sales_calendar_df = (
    sales_long_df
    .join(
        calendar_df2.select(
            F.col("d").alias("day_col"),
            "date","wm_yr_wk","event_name_1","event_type_1","event_name_2","event_type_2","snap_CA",
            "d_num"
        ),
        on="day_col",
        how="left"
    )
)

display(sales_calendar_df.limit(5))

## Dataset Enrichment (Add Prices)

In [0]:
daily_sales_df = (
    sales_calendar_df
    .join(
        sell_prices_df2.select("store_id","item_id","wm_yr_wk","sell_price"),
        on=["store_id","item_id","wm_yr_wk"],
        how="left"
    )
)

display(daily_sales_df.limit(5))
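
Prices are stored per week while sales are daily, so the left join on `(store_id, item_id, wm_yr_wk)` repeats one weekly price across every day of that week and leaves `sell_price` null where no price row exists. A minimal plain-Python sketch of that behavior, with invented rows and a hypothetical helper `left_join_price`:

```python
# Hypothetical weekly price rows keyed by (store_id, item_id, wm_yr_wk).
weekly_prices = {
    ("CA_1", "HOBBIES_1_001", 11101): 9.58,
}

# Hypothetical daily sales rows; two days fall in a priced week, one does not.
daily_rows = [
    {"store_id": "CA_1", "item_id": "HOBBIES_1_001", "wm_yr_wk": 11101, "day_col": "d_1", "sales": 0.0},
    {"store_id": "CA_1", "item_id": "HOBBIES_1_001", "wm_yr_wk": 11101, "day_col": "d_2", "sales": 1.0},
    {"store_id": "CA_1", "item_id": "HOBBIES_1_001", "wm_yr_wk": 11102, "day_col": "d_8", "sales": 0.0},
]

def left_join_price(rows, prices):
    """Attach sell_price to each daily row; unmatched keys get None (a left join)."""
    joined = []
    for row in rows:
        key = (row["store_id"], row["item_id"], row["wm_yr_wk"])
        joined.append({**row, "sell_price": prices.get(key)})
    return joined

joined = left_join_price(daily_rows, weekly_prices)
missing = [r["day_col"] for r in joined if r["sell_price"] is None]
print(missing)
```

The dictionary lookup assumes one price per key. In Spark, the join preserves the daily row count only under the same condition, which is what the duplicate-key check on the price table verifies.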

## Integrity Checks (must pass before Silver/Gold)


In [0]:
unmatched_calendar = daily_sales_df.filter(F.col("date").isNull()).count()
print("Rows missing calendar match (date is null):", unmatched_calendar)

price_match_rate = (
    daily_sales_df
    .select(F.mean((F.col("sell_price").isNotNull()).cast("double")).alias("price_match_rate"))
    .collect()[0]["price_match_rate"]
)
print("Price match rate (sell_price not null):", price_match_rate)

daily_sales_df.select(
    F.mean(F.col("sell_price").isNull().cast("double")).alias("missing_price_rate")
).display()
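
The cell above only prints the check results. If the checks should actually block the Silver write, one option is a small gate like the sketch below. The function `check_integrity` and the threshold `min_price_match_rate` are hypothetical and not part of the notebook; the numbers in the example call are made up.

```python
def check_integrity(unmatched_calendar, price_match_rate, min_price_match_rate=0.0):
    """Return a list of failure messages; an empty list means all checks passed.

    min_price_match_rate is a hypothetical threshold, not a value from the notebook.
    """
    failures = []
    if unmatched_calendar != 0:
        failures.append(f"{unmatched_calendar} rows have no calendar match")
    if price_match_rate is None:
        failures.append("price match rate is undefined (empty dataset?)")
    elif price_match_rate < min_price_match_rate:
        failures.append(
            f"price match rate {price_match_rate:.4f} is below {min_price_match_rate:.4f}"
        )
    return failures

# Example with made-up numbers: calendar join is clean, price coverage is too low.
failures = check_integrity(unmatched_calendar=0, price_match_rate=0.62, min_price_match_rate=0.70)
if failures:
    print("Integrity checks failed:", failures)
```

In the notebook, the two arguments would come from `unmatched_calendar` and `price_match_rate` computed above, and a non-empty result could raise an exception before the table is written.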

## Silver Layer Creation (First Persistence Point)
This table is the **cleaned, scoped, enriched daily dataset**.
It becomes the foundation for Gold outputs.


In [0]:
daily_sales_df = (
    daily_sales_df
    .withColumn("missing_prices", F.when(F.col("sell_price").isNull(), 1).otherwise(0))
    .withColumn(
        "has_event",
        F.when(
            (F.col("event_name_1").isNotNull()) | (F.col("event_name_2").isNotNull()),
            1
        ).otherwise(0)
    )
    .withColumn("price_observed", (F.col("sell_price").isNotNull()).cast("int"))
)

silver_table = "workspace.database.silver_daily_sales_v2"

(
    daily_sales_df
    .write
    .mode("overwrite")
    .format("delta")
    .saveAsTable(silver_table)
)

print(f"✅ Silver table written: {silver_table}")

## Missing Price Diagnostics (Structural vs Random Signals)
We evaluate whether missing prices are random or behavior-driven, and whether imputation is justified.


In [0]:
daily_sales_df.groupBy("missing_prices").count().display()

total_missing = daily_sales_df.filter(F.col("missing_prices")==1).count()
total_rows = daily_sales_df.count()
print(f"Missing sell_price rows: {total_missing:,} / {total_rows:,} ({(total_missing/total_rows)*100:.2f}%)")


### Missingness over time (weekly)


In [0]:
missing_by_week = (
    daily_sales_df
    .groupBy("wm_yr_wk")
    .agg(F.mean("missing_prices").alias("missing_price_rate"))
    .orderBy("wm_yr_wk")
)

missing_by_week_pd = missing_by_week.toPandas()

plt.figure(figsize=(12,4))
sns.lineplot(data=missing_by_week_pd, x="wm_yr_wk", y="missing_price_rate")
plt.title("Missing Sell Price Rate Over Time (Weekly)")
plt.xlabel("wm_yr_wk")
plt.ylabel("Missing Price Rate")
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
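
`wm_yr_wk` is an integer week code, not a date. If the codes jump between years, a numeric x-axis shows an artificial gap at each year boundary. The sketch below maps codes to consecutive positions; the week codes and rates are invented for illustration.

```python
# Hypothetical week codes with a jump between years (values invented for illustration).
week_codes = [11150, 11151, 11152, 11201, 11202]
missing_price_rate = [0.30, 0.29, 0.29, 0.27, 0.26]

# Map each code to its rank so consecutive weeks are one unit apart on the x-axis.
week_index = {code: i for i, code in enumerate(sorted(set(week_codes)))}
x = [week_index[code] for code in week_codes]

print(list(zip(x, week_codes, missing_price_rate)))
```

The resulting index can serve as the x value in the line plot, with the original week codes kept as tick labels.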


### Missingness by Store / Category


In [0]:
missing_by_store_pd = (
    daily_sales_df
    .groupBy("store_id")
    .agg(F.mean("missing_prices").alias("missing_price_rate"))
    .toPandas()
)

plt.figure(figsize=(6,4))
sns.barplot(data=missing_by_store_pd, x="store_id", y="missing_price_rate")
plt.title("Missing Sell Price Rate by Store")
plt.xlabel("Store")
plt.ylabel("Missing Price Rate")
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

missing_by_cat_pd = (
    daily_sales_df
    .groupBy("cat_id")
    .agg(F.mean("missing_prices").alias("missing_price_rate"))
    .toPandas()
)

plt.figure(figsize=(6,4))
sns.barplot(data=missing_by_cat_pd, x="cat_id", y="missing_price_rate")
plt.title("Missing Sell Price Rate by Category")
plt.xlabel("Category")
plt.ylabel("Missing Price Rate")
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

### Variability of Missingness
If the missing-price rate varies little across store/category groups, missingness is systematic (present at a similar level everywhere) rather than isolated to a few groups.


In [0]:
summary_stats = (
    daily_sales_df
    .groupBy("store_id","cat_id")
    .agg(F.mean("missing_prices").alias("missing_rate"))
    .agg(
        F.mean("missing_rate").alias("mean_missing_rate"),
        F.stddev("missing_rate").alias("std_missing_rate")
    )
    .toPandas()
)
print(summary_stats)
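
With three stores and two Non-FOODS categories there are only six groups, so the choice of standard-deviation formula is visible in the result. Spark's `F.stddev` is the sample version (n − 1 denominator). The sketch below reproduces the summary in plain Python on invented rates and adds a coefficient of variation, which is an extra not computed in the notebook.

```python
import statistics

# Hypothetical missing-price rates for the 6 store x category groups (invented values).
missing_rate = {
    ("CA_1", "HOBBIES"): 0.20, ("CA_1", "HOUSEHOLD"): 0.22,
    ("CA_2", "HOBBIES"): 0.24, ("CA_2", "HOUSEHOLD"): 0.26,
    ("CA_3", "HOBBIES"): 0.19, ("CA_3", "HOUSEHOLD"): 0.21,
}

rates = list(missing_rate.values())
mean_rate = statistics.mean(rates)
std_sample = statistics.stdev(rates)       # n - 1 denominator, like Spark's F.stddev
std_population = statistics.pstdev(rates)  # n denominator, for comparison
cv = std_sample / mean_rate                # spread relative to the mean level

print(f"mean={mean_rate:.4f} sample_std={std_sample:.4f} pop_std={std_population:.4f} cv={cv:.3f}")
```

Reading the standard deviation relative to the mean makes "low variability" easier to judge than the raw value alone.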

### MNAR Check (Missing Not At Random)
We test whether missing prices associate with zero-sales behavior (a signal of non-random missingness), while also validating structural sparsity in the source price table.



In [0]:
missing_vs_sales = (
    daily_sales_df
    .groupBy("missing_prices")
    .agg(F.mean("sales").alias("mean_sales"))
    .toPandas()
)
print(missing_vs_sales)

item_store_stats = (
    daily_sales_df
    .withColumn("zero_sales", (F.col("sales") == 0).cast("int"))
    .groupBy("store_id","item_id")
    .agg(
        F.mean("missing_prices").alias("missing_rate"),
        F.mean("zero_sales").alias("zero_sales_rate"),
        F.count("*").alias("n")
    )
    .filter(F.col("n") >= 30)
)

corr_val = item_store_stats.select(F.corr("zero_sales_rate","missing_rate").alias("corr")).collect()[0]["corr"]
print("Corr(missing_rate, zero_sales_rate) across store-item (n>=30):", corr_val)
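
For reference, the statistic returned by `F.corr` is the Pearson correlation. The sketch below computes it by hand on invented store-item rates; the helper `pearson` is illustrative only. A clearly positive value is consistent with missingness being tied to zero-sales behavior, while a value near zero would not support that reading.

```python
import math

def pearson(xs, ys):
    """Pearson correlation of two equal-length sequences; None if undefined."""
    n = len(xs)
    if n != len(ys) or n < 2:
        return None
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    if var_x == 0 or var_y == 0:
        return None  # a constant column has no defined correlation
    return cov / math.sqrt(var_x * var_y)

# Hypothetical per store-item rates (invented): higher zero-sales rate, higher missing rate.
zero_sales_rate = [0.50, 0.60, 0.70, 0.80, 0.95]
missing_rate = [0.00, 0.10, 0.15, 0.40, 0.60]

corr = pearson(zero_sales_rate, missing_rate)
print(f"corr = {corr:.3f}")
```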


### Join Integrity Validation
We verify missing prices are not caused by join mismatches with sell_prices.


In [0]:
m = daily_sales_df.filter(F.col("missing_prices") == 1).alias("m")

p = sell_prices_df2.select(
    "store_id","item_id","wm_yr_wk",
    F.col("sell_price").alias("price_from_table")
).alias("p")

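# Join once and derive both counts in a single aggregation.
# F.count("*") counts all matched rows; F.count(column) counts only non-null values.
match_counts = (
    m.join(p, on=["store_id","item_id","wm_yr_wk"], how="inner")
     .agg(
         F.count("*").alias("any_key"),
         F.count("price_from_table").alias("with_price")
     )
     .collect()[0]
)

matches_any_key = match_counts["any_key"]
matches_with_price = match_counts["with_price"]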

total_missing = m.count()
print(f"Missing rows with a key match in price table (any): {matches_any_key:,}")
print(f"Missing rows with a key match AND non-null price in price table: {matches_with_price:,}")
print(f"Missing rows absent (or null-priced) in price table: {total_missing - matches_with_price:,}")
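
The three printed counts correspond to a three-way classification of the missing-price rows. The sketch below spells that out in plain Python on invented keys; `classify_missing` is a hypothetical helper. Any row landing in `key_present_with_price` would indicate a join defect, since the price exists in the source but did not reach the daily table.

```python
# Hypothetical keys of daily rows flagged missing_prices == 1 (invented).
missing_keys = [
    ("CA_1", "HOBBIES_1_001", 11101),
    ("CA_1", "HOBBIES_1_002", 11101),
    ("CA_2", "HOUSEHOLD_1_001", 11105),
]

# Hypothetical price table: key -> sell_price (None models a NULL price in the source).
price_table = {
    ("CA_1", "HOBBIES_1_002", 11101): None,
    ("CA_3", "HOBBIES_1_001", 11101): 8.26,
}

def classify_missing(keys, prices):
    """Split missing-price rows by what the price table says about their key."""
    counts = {"key_absent": 0, "key_present_null_price": 0, "key_present_with_price": 0}
    for key in keys:
        if key not in prices:
            counts["key_absent"] += 1
        elif prices[key] is None:
            counts["key_present_null_price"] += 1
        else:
            counts["key_present_with_price"] += 1  # would point to a join defect
    return counts

counts = classify_missing(missing_keys, price_table)
print(counts)
```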

## Conclusion: Imputation Strategy
Missing sell prices are behavior-driven (MNAR) and not caused by join issues.  
We proceed with predictive imputation using a supervised ML model trained on non-missing price records.


## Train / Validation / Test Split (for Imputation Model)
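- Train/Validation: rows with observed prices (random 80/20 split)
- Test: rows with missing prices (to be imputed). These rows have no ground-truth price, so model accuracy can only be measured on the validation split.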


In [0]:

not_missing_df = daily_sales_df.filter(F.col("missing_prices")==0)
missing_df = daily_sales_df.filter(F.col("missing_prices")==1)

train_df, val_df = not_missing_df.randomSplit([0.8, 0.2], seed=42)
test_df = missing_df

print("Non-missing:", not_missing_df.count())
print("Missing:", missing_df.count())
print("Train:", train_df.count())
print("Validation:", val_df.count())
print("Test (to impute):", test_df.count())
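
`randomSplit` assigns rows probabilistically, so the resulting sizes are close to 80/20 but not exact. The sketch below illustrates the idea in plain Python. It is not Spark's actual sampling algorithm, and `random_split` plus the stand-in rows are hypothetical.

```python
import random

def random_split(rows, train_fraction=0.8, seed=42):
    """Assign each row independently to train or validation with the given probability."""
    rng = random.Random(seed)
    train, val = [], []
    for row in rows:
        (train if rng.random() < train_fraction else val).append(row)
    return train, val

# Hypothetical stand-in for rows with observed prices.
observed_rows = list(range(1000))
train_rows, val_rows = random_split(observed_rows)

print(len(train_rows), len(val_rows))
```

A fixed seed makes the split repeatable for the same input. Every row lands in exactly one of the two sets.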

## Exploratory Demand Signals


In [0]:
daily_sales_df.groupBy("store_id").agg(F.sum("sales").alias("total_sales")).display()
daily_sales_df.groupBy("cat_id").agg(F.sum("sales").alias("total_sales")).display()

✅ Output: `workspace.database.silver_daily_sales_v2` (daily, scoped, enriched, quality-checked)  
Next notebook: weekly feature engineering → ML price imputation → Gold table for Power BI
