In [36]:
# %pip install statsmodels

# Supply Chain ETL with Star Schema (Updated)

This notebook/script:

1. **Reads** the CSV data for Sales, Inventory, Suppliers, and Purchase Orders.
2. **Cleans** and **validates** data:
   - Reports missing values for every column and drops rows whose primary key fields are missing.
   - Ensures uniqueness for single-column primary keys (Sale_ID, Supplier_ID, Order_ID).
   - Resolves duplicates in Inventory (composite key of Product_ID, Store_ID, Warehouse_ID) by keeping only the latest Last_Updated record.
3. **Constructs a Star Schema**:
   - **Dimension Tables**: 
     - dim_products
     - dim_suppliers
     - dim_stores
     - dim_warehouses
     - dim_dates
   - **Fact Tables**: 
     - fact_sales
     - fact_inventory
     - fact_purchase_orders
4. **Loads** these dimension and fact tables into MySQL using SQLAlchemy, replacing any existing tables with the same names.

In [None]:

# %% [code]
import os
from getpass import getpass
from urllib.parse import quote_plus

import numpy as np
import pandas as pd
import pymysql
from sqlalchemy import create_engine

# =============== 1. Extraction ===============
# Each source CSV is read into its own DataFrame. Paths are relative to the
# notebook's working directory. Date columns are parsed at read time so the
# later date-dimension joins compare datetimes rather than raw strings.

# Sales transactions
sales_file = "sales_data-2.csv"
sales_df = pd.read_csv(
    sales_file,
    parse_dates=["Sale_Date"],
)

# Inventory snapshots
inventory_file = "inventory_data.csv"
inventory_df = pd.read_csv(
    inventory_file,
    parse_dates=["Last_Updated"],
)

# Supplier master data (no date columns)
suppliers_file = "suppliers_data.csv"
suppliers_df = pd.read_csv(suppliers_file)

# Purchase orders (two date columns: when ordered and when received)
purchase_orders_file = "purchase_orders_data.csv"
purchase_orders_df = pd.read_csv(
    purchase_orders_file,
    parse_dates=["Order_Date", "Arrival_Date"],
)


## 2. Data Quality Checks

In [38]:
# --- 2.1 Report missing values per column, then drop rows without a key ---
print("Missing values in Sales Data:\n", sales_df.isna().sum(), "\n")
print("Missing values in Inventory Data:\n", inventory_df.isna().sum(), "\n")
print("Missing values in Suppliers Data:\n", suppliers_df.isna().sum(), "\n")
print("Missing values in Purchase Orders Data:\n", purchase_orders_df.isna().sum(), "\n")

# A row whose primary key (or any part of a composite key) is missing cannot
# be identified downstream, so it is removed.
sales_df = sales_df.dropna(subset=["Sale_ID"])
suppliers_df = suppliers_df.dropna(subset=["Supplier_ID"])
purchase_orders_df = purchase_orders_df.dropna(subset=["Order_ID"])
inventory_df = inventory_df.dropna(subset=["Product_ID", "Store_ID", "Warehouse_ID"])

# --- 2.2 Enforce uniqueness of the single-column primary keys ---
# When a key repeats, the first row in file order wins and the rest are dropped.
if not sales_df["Sale_ID"].is_unique:
    print("Duplicate Sale_ID found. Keeping first occurrence.")
    sales_df = sales_df.drop_duplicates(subset=["Sale_ID"], keep="first")

if not suppliers_df["Supplier_ID"].is_unique:
    print("Duplicate Supplier_ID found. Keeping first occurrence.")
    suppliers_df = suppliers_df.drop_duplicates(subset=["Supplier_ID"], keep="first")

if not purchase_orders_df["Order_ID"].is_unique:
    print("Duplicate Order_ID found. Keeping first occurrence.")
    purchase_orders_df = purchase_orders_df.drop_duplicates(subset=["Order_ID"], keep="first")

# --- 2.3 Collapse Inventory to one row per (Product_ID, Store_ID, Warehouse_ID) ---
# Sorting newest-first means "keep first" retains the most recent Last_Updated
# record for each composite key.
inventory_df = (
    inventory_df
    .sort_values(by="Last_Updated", ascending=False)
    .drop_duplicates(subset=["Product_ID", "Store_ID", "Warehouse_ID"], keep="first")
)


Missing values in Sales Data:
 Sale_ID          0
Product_ID       0
Store_ID         0
Sale_Date        0
Quantity_Sold    0
Revenue          0
dtype: int64 

Missing values in Inventory Data:
 Product_ID       0
Store_ID         0
Warehouse_ID     0
Stock_Level      0
Reorder_Level    0
Last_Updated     0
dtype: int64 

Missing values in Suppliers Data:
 Supplier_ID         0
Supplier_Name       0
Product_ID          0
Lead_Time (days)    0
Order_Frequency     0
dtype: int64 

Missing values in Purchase Orders Data:
 Order_ID        0
Product_ID      0
Supplier_ID     0
Order_Date      0
Quantity        0
Arrival_Date    0
dtype: int64 



## 3. Star Schema Construction

In [39]:
# --- 3.1 Dimension: dim_products ---
# A product can show up in any of the four sources, so pool the non-null
# Product_ID values from all of them before assigning surrogate keys.
all_product_ids = set().union(
    *(
        source["Product_ID"].dropna().unique()
        for source in (sales_df, inventory_df, purchase_orders_df, suppliers_df)
    )
)

dim_products = pd.DataFrame({"Product_ID": sorted(all_product_ids)})
# Surrogate keys are 1-based and follow the sorted natural-key order.
dim_products.insert(0, "product_key", range(1, len(dim_products) + 1))

# --- 3.2 Dimension: dim_suppliers ---
# One row per Supplier_ID, taking the first value seen for each attribute.
suppliers_gb = suppliers_df.groupby("Supplier_ID", as_index=False).agg(
    dict.fromkeys(("Supplier_Name", "Lead_Time (days)", "Order_Frequency"), "first")
)
dim_suppliers = suppliers_gb.copy()
dim_suppliers.insert(0, "supplier_key", range(1, len(dim_suppliers) + 1))

# --- 3.3 Dimension: dim_stores ---
# Stores are referenced by both sales and inventory.
all_store_ids = set().union(
    *(source["Store_ID"].dropna().unique() for source in (sales_df, inventory_df))
)

dim_stores = pd.DataFrame({"Store_ID": sorted(all_store_ids)})
dim_stores.insert(0, "store_key", range(1, len(dim_stores) + 1))

# --- 3.4 Dimension: dim_warehouses ---
# Only inventory references warehouses.
all_warehouse_ids = set(inventory_df["Warehouse_ID"].dropna().unique())
dim_warehouses = pd.DataFrame({"Warehouse_ID": sorted(all_warehouse_ids)})
dim_warehouses.insert(0, "warehouse_key", range(1, len(dim_warehouses) + 1))

# --- 3.5 Dimension: dim_dates ---
# Every distinct value found in Sale_Date, Last_Updated, Order_Date and
# Arrival_Date becomes one row of the date dimension.
dates_sales = sales_df["Sale_Date"].dropna().unique()
dates_inv = inventory_df["Last_Updated"].dropna().unique()
dates_po_order = purchase_orders_df["Order_Date"].dropna().unique()
dates_po_arrival = purchase_orders_df["Arrival_Date"].dropna().unique()

# De-duplicate across the four sources, coerce to datetime, order chronologically.
all_dates = sorted(
    pd.to_datetime(
        pd.Series([*dates_sales, *dates_inv, *dates_po_order, *dates_po_arrival]).unique()
    )
)

dim_dates = pd.DataFrame({"date": all_dates})
dim_dates.insert(0, "date_key", range(1, len(dim_dates) + 1))
# Calendar parts are derived from the date column for convenient filtering.
dim_dates = dim_dates.assign(
    year=dim_dates["date"].dt.year,
    month=dim_dates["date"].dt.month,
    day=dim_dates["date"].dt.day,
)

# =============== Helper Functions for Surrogate Key Mapping ===============

def map_product_key(df, product_id_col):
    """Left-join ``df`` to the module-level ``dim_products`` table.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame holding natural product identifiers.
    product_id_col : str
        Column of ``df`` matched against ``dim_products["Product_ID"]``.

    Returns
    -------
    pandas.DataFrame
        ``df`` with the columns of ``dim_products`` appended (notably
        ``product_key``). Being a left join, rows of ``df`` without a matching
        product are kept and receive missing values in the appended columns.
    """
    join_options = {
        "how": "left",
        "left_on": product_id_col,
        "right_on": "Product_ID",
    }
    return df.merge(dim_products, **join_options)

def map_supplier_key(df, supplier_id_col):
    """Left-join ``df`` to the module-level ``dim_suppliers`` table.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame holding natural supplier identifiers.
    supplier_id_col : str
        Column of ``df`` matched against ``dim_suppliers["Supplier_ID"]``.

    Returns
    -------
    pandas.DataFrame
        ``df`` with the columns of ``dim_suppliers`` appended (notably
        ``supplier_key``). Being a left join, rows of ``df`` without a matching
        supplier are kept and receive missing values in the appended columns.
    """
    join_options = {
        "how": "left",
        "left_on": supplier_id_col,
        "right_on": "Supplier_ID",
    }
    return df.merge(dim_suppliers, **join_options)

def map_store_key(df, store_id_col):
    """Left-join ``df`` to the module-level ``dim_stores`` table.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame holding natural store identifiers.
    store_id_col : str
        Column of ``df`` matched against ``dim_stores["Store_ID"]``.

    Returns
    -------
    pandas.DataFrame
        ``df`` with the columns of ``dim_stores`` appended (notably
        ``store_key``). Being a left join, rows of ``df`` without a matching
        store are kept and receive missing values in the appended columns.
    """
    join_options = {
        "how": "left",
        "left_on": store_id_col,
        "right_on": "Store_ID",
    }
    return df.merge(dim_stores, **join_options)

def map_warehouse_key(df, warehouse_id_col):
    """Left-join ``df`` to the module-level ``dim_warehouses`` table.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame holding natural warehouse identifiers.
    warehouse_id_col : str
        Column of ``df`` matched against ``dim_warehouses["Warehouse_ID"]``.

    Returns
    -------
    pandas.DataFrame
        ``df`` with the columns of ``dim_warehouses`` appended (notably
        ``warehouse_key``). Being a left join, rows of ``df`` without a
        matching warehouse are kept and receive missing values in the appended
        columns.
    """
    join_options = {
        "how": "left",
        "left_on": warehouse_id_col,
        "right_on": "Warehouse_ID",
    }
    return df.merge(dim_warehouses, **join_options)

def map_date_key(df, date_col):
    """Left-join ``df`` to the module-level ``dim_dates`` table.

    The match is on exact equality between ``df[date_col]`` and
    ``dim_dates["date"]``; no rounding or normalisation is applied here.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame holding a date column to translate.
    date_col : str
        Column of ``df`` matched against ``dim_dates["date"]``.

    Returns
    -------
    pandas.DataFrame
        ``df`` with the columns of ``dim_dates`` appended (notably
        ``date_key``). Being a left join, rows of ``df`` whose date has no
        exact match are kept and receive missing values in the appended
        columns.
    """
    join_options = {
        "how": "left",
        "left_on": date_col,
        "right_on": "date",
    }
    return df.merge(dim_dates, **join_options)


## 4. Build Fact Tables

In [40]:
# --- 4.1 fact_sales ---
# Source columns: [Sale_ID, Product_ID, Store_ID, Sale_Date, Quantity_Sold, Revenue]
# Natural IDs are swapped for surrogate keys by joining each dimension in turn,
# then only the fact-table columns are retained.
fact_sales = (
    sales_df.copy()
    .pipe(map_product_key, "Product_ID")
    .pipe(map_store_key, "Store_ID")
    .pipe(map_date_key, "Sale_Date")
    .loc[:, [
        "Sale_ID",
        "product_key",
        "store_key",
        "date_key",
        "Quantity_Sold",
        "Revenue",
    ]]
    .copy()
)

# --- 4.2 fact_inventory ---
# Source columns: [Product_ID, Store_ID, Warehouse_ID, Stock_Level, Reorder_Level, Last_Updated]
fact_inventory = (
    inventory_df.copy()
    .pipe(map_product_key, "Product_ID")
    .pipe(map_store_key, "Store_ID")
    .pipe(map_warehouse_key, "Warehouse_ID")
    .pipe(map_date_key, "Last_Updated")
    .loc[:, [
        "product_key",
        "store_key",
        "warehouse_key",
        "Stock_Level",
        "Reorder_Level",
        "date_key",  # key of the Last_Updated date
    ]]
    .copy()
)

# --- 4.3 fact_purchase_orders ---
# Source columns: [Order_ID, Product_ID, Supplier_ID, Order_Date, Quantity, Arrival_Date]
# The date dimension is joined twice (order date and arrival date). The first
# date_key must be renamed before the second join so the two keys stay distinct.
fact_purchase_orders = (
    purchase_orders_df.copy()
    .pipe(map_product_key, "Product_ID")
    .pipe(map_supplier_key, "Supplier_ID")
    .pipe(map_date_key, "Order_Date")
    .rename(columns={"date_key": "order_date_key"})
    .pipe(map_date_key, "Arrival_Date")
    .rename(columns={"date_key": "arrival_date_key"})
    .loc[:, [
        "Order_ID",
        "product_key",
        "supplier_key",
        "order_date_key",
        "Quantity",
        "arrival_date_key",
    ]]
    .copy()
)


## 5. Load into MySQL

In [41]:
# Connection settings are read from the environment so that credentials are
# never committed with the notebook. Non-secret settings fall back to the
# local-development defaults; the password has no default and is prompted for
# (without echo) when MYSQL_PASSWORD is not set.
username = os.environ.get("MYSQL_USER", "root")
password = os.environ.get("MYSQL_PASSWORD")
if password is None:
    password = getpass("MySQL password: ")
host = os.environ.get("MYSQL_HOST", "localhost")
port = os.environ.get("MYSQL_PORT", "3306")
database = os.environ.get("MYSQL_DATABASE", "case4")

# URL-escape the credentials: characters such as '@', ':' or '/' inside a
# username or password would otherwise be parsed as URL delimiters.
engine = create_engine(
    f"mysql+pymysql://{quote_plus(username)}:{quote_plus(password)}@{host}:{port}/{database}"
)

# Dimensions are listed before facts so they are written first.
# if_exists="replace" drops and recreates each table on every run.
star_schema_tables = {
    # --- 5.1 Dimension tables ---
    "dim_products": dim_products,
    "dim_suppliers": dim_suppliers,
    "dim_stores": dim_stores,
    "dim_warehouses": dim_warehouses,
    "dim_dates": dim_dates,
    # --- 5.2 Fact tables ---
    "fact_sales": fact_sales,
    "fact_inventory": fact_inventory,
    "fact_purchase_orders": fact_purchase_orders,
}

for table_name, table_frame in star_schema_tables.items():
    table_frame.to_sql(table_name, con=engine, if_exists="replace", index=False)

print("Star Schema tables loaded successfully into MySQL.")


Star Schema tables loaded successfully into MySQL.
