# In [0]:
"""
Purpose: Advanced Gold Layer with ML-driven KPIs and business intelligence metrics
Layer: Gold (Business Ready + ML Features)
"""

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import (
    col, sum, avg, count, max, min, stddev, when, lit, 
    datediff, current_date, expr, round, percent_rank,
    row_number, lag, lead, dense_rank, ntile
)
from pyspark.sql.types import DoubleType

# Obtain the Spark session for this job (getOrCreate reuses an active one).
_session_builder = SparkSession.builder.appName("AdvancedGoldModeling")
spark = _session_builder.getOrCreate()

# Load the Silver-layer Delta table that this script reads as its input.
silver_path = "/mnt/silver/supply_chain_inventory"
df_silver = spark.read.load(silver_path, format="delta")

# ============================================================================
# 1. INVENTORY HEALTH METRICS (SKU Level)
# ============================================================================
# Columns carried over unchanged from the Silver table.
_sku_source_columns = [
    "SKU_ID",
    "SKU_Name",
    "Category",
    "ABC_Class",
    "Warehouse_Location",
    "Supplier_Name",
    "Quantity_On_Hand",
    "Quantity_Reserved",
    "Quantity_Committed",
    "Damaged_Qty",
    "Returns_Qty",
    "Avg_Daily_Sales",
    "Forecast_Next_30d",
    "Days_of_Inventory",
    "Reorder_Point",
    "Safety_Stock",
    "Lead_Time_Days",
    "Unit_Cost_USD",
    "Total_Inventory_Value_USD",
    "SKU_Churn_Rate",
    "Stock_Age_Days",
    "Inventory_Status",
    "Demand_Forecast_Accuracy_Pct",
    "Supplier_OnTime_Pct",
    "Expiry_Date",
]


def _yes_no(condition):
    """Return a Column that is "Yes" where `condition` is true, else "No"."""
    return when(condition, "Yes").otherwise("No")


# Short handles for the columns that appear in several metrics below.
_on_hand = col("Quantity_On_Hand")
_atp = col("ATP_Quantity")
_days_of_inventory = col("Days_of_Inventory")
_days_to_expiry = col("Days_Until_Expiry")

# Derived metrics as (output column, expression) pairs. They are applied in
# list order because some expressions read columns produced by earlier
# entries (ATP_Quantity, Days_Until_Expiry, Quality_Issue_Score).
_sku_metrics = [
    # Available to Promise: on-hand stock minus reserved and committed units.
    (
        "ATP_Quantity",
        _on_hand - col("Quantity_Reserved") - col("Quantity_Committed"),
    ),
    # Annualised turnover: daily sales * 365 over on-hand; 0 when nothing is on hand.
    (
        "Inventory_Turnover_Ratio",
        round(
            when(_on_hand > 0, (col("Avg_Daily_Sales") * 365) / _on_hand)
            .otherwise(0),
            2,
        ),
    ),
    # On-hand stock as a percentage of the 30-day forecast; 0 when there is no forecast.
    (
        "Service_Level_Pct",
        round(
            when(
                col("Forecast_Next_30d") > 0,
                (_on_hand / col("Forecast_Next_30d")) * 100,
            ).otherwise(0),
            2,
        ),
    ),
    # Tiered stockout risk; the first matching tier wins.
    (
        "Stockout_Risk_Score",
        round(
            when(_atp <= 0, 100)
            .when(_atp < col("Safety_Stock"), 75)
            .when(_atp < col("Reorder_Point"), 50)
            .when(_days_of_inventory < col("Lead_Time_Days"), 40)
            .otherwise(10),
            0,
        ),
    ),
    # Slow movers: more than 90 days of cover and a churn rate below 2.
    (
        "Excess_Inventory_Flag",
        _yes_no((_days_of_inventory > 90) & (col("SKU_Churn_Rate") < 2)),
    ),
    # Dead stock: under 0.5 units sold per day and stock older than 120 days.
    (
        "Dead_Stock_Flag",
        _yes_no((col("Avg_Daily_Sales") < 0.5) & (col("Stock_Age_Days") > 120)),
    ),
    # Whole days from today to the expiry date.
    (
        "Days_Until_Expiry",
        datediff(col("Expiry_Date"), current_date()),
    ),
    # Expiry buckets: <=7 days Critical, <=30 High, <=90 Medium, otherwise Low.
    (
        "Expiry_Risk",
        when(_days_to_expiry <= 7, "Critical")
        .when(_days_to_expiry <= 30, "High")
        .when(_days_to_expiry <= 90, "Medium")
        .otherwise("Low"),
    ),
    # Damaged plus returned units as a percentage of on-hand units.
    (
        "Quality_Issue_Score",
        round(
            when(
                _on_hand > 0,
                ((col("Damaged_Qty") + col("Returns_Qty")) / _on_hand) * 100,
            ).otherwise(0),
            2,
        ),
    ),
    # Carrying cost estimated at 20% of inventory value per year.
    (
        "Annual_Carrying_Cost_USD",
        round(col("Total_Inventory_Value_USD") * 0.20, 2),
    ),
    # 100 when supplier on-time, forecast accuracy and quality thresholds are
    # all met; 70 otherwise.
    (
        "Perfect_Order_Rate",
        round(
            when(
                (col("Supplier_OnTime_Pct") >= 95)
                & (col("Demand_Forecast_Accuracy_Pct") >= 90)
                & (col("Quality_Issue_Score") < 2),
                100,
            ).otherwise(70),
            0,
        ),
    ),
]

df_sku_health = df_silver.select(*_sku_source_columns)
for _metric_name, _metric_expr in _sku_metrics:
    df_sku_health = df_sku_health.withColumn(_metric_name, _metric_expr)

# ============================================================================
# 2. WAREHOUSE PERFORMANCE METRICS (Aggregated)
# ============================================================================
def _clamp_pct(column):
    """Bound a percentage-like Column to the range [0, 100].

    A null input stays null: neither comparison is true for null, so the
    `otherwise` branch returns the original (null) value.
    """
    return when(column < 0, 0).when(column > 100, 100).otherwise(column)


# One row per warehouse: counts, totals and averages of the SKU-level metrics,
# followed by three derived scores.
df_warehouse_kpi = df_sku_health.groupBy("Warehouse_Location").agg(
    count("SKU_ID").alias("Total_SKUs"),
    sum("Quantity_On_Hand").alias("Total_Stock_Units"),
    sum("Total_Inventory_Value_USD").alias("Total_Inventory_Value"),
    avg("Days_of_Inventory").alias("Avg_Days_of_Inventory"),
    avg("Inventory_Turnover_Ratio").alias("Avg_Turnover_Ratio"),
    sum(when(col("Inventory_Status") == "Out of Stock", 1).otherwise(0)).alias("Out_of_Stock_Count"),
    sum(when(col("Stockout_Risk_Score") >= 75, 1).otherwise(0)).alias("High_Risk_SKUs"),
    sum(when(col("Excess_Inventory_Flag") == "Yes", 1).otherwise(0)).alias("Excess_Inventory_SKUs"),
    sum(when(col("Dead_Stock_Flag") == "Yes", 1).otherwise(0)).alias("Dead_Stock_SKUs"),
    sum(when(col("Expiry_Risk") == "Critical", 1).otherwise(0)).alias("Critical_Expiry_SKUs"),
    avg("Service_Level_Pct").alias("Avg_Service_Level"),
    avg("Quality_Issue_Score").alias("Avg_Quality_Score"),
    sum("Annual_Carrying_Cost_USD").alias("Total_Annual_Carrying_Cost"),
    sum("Damaged_Qty").alias("Total_Damaged_Units"),
    sum("Returns_Qty").alias("Total_Returned_Units"),
    avg("Supplier_OnTime_Pct").alias("Avg_Supplier_OnTime_Pct")
).withColumn(
    # Warehouse Fill Rate (%): share of SKUs that are not "Out of Stock".
    "Warehouse_Fill_Rate",
    round(
        ((col("Total_SKUs") - col("Out_of_Stock_Count")) / col("Total_SKUs")) * 100,
    2)
).withColumn(
    # Inventory Accuracy Score: 100 minus the share of high-risk and dead-stock
    # SKUs. A SKU carrying both flags is counted twice, so this can drop below 0.
    "Inventory_Accuracy_Score",
    round(
        100 - ((col("High_Risk_SKUs") + col("Dead_Stock_SKUs")) / col("Total_SKUs") * 100),
    2)
).withColumn(
    # Warehouse Health Score (0-100): weighted blend of fill rate (30%),
    # service level (25%), inventory accuracy (25%) and quality (20%).
    # Each component is clamped to [0, 100] first. Avg_Service_Level exceeds
    # 100 when stock outweighs the forecast, Avg_Quality_Score exceeds 100 when
    # damaged + returned units outnumber on-hand units, and
    # Inventory_Accuracy_Score can be negative; without clamping the blend
    # would leave the documented 0-100 range.
    "Warehouse_Health_Score",
    round(
        (_clamp_pct(col("Warehouse_Fill_Rate")) * 0.3) +
        (_clamp_pct(col("Avg_Service_Level")) * 0.25) +
        (_clamp_pct(col("Inventory_Accuracy_Score")) * 0.25) +
        (_clamp_pct(100 - col("Avg_Quality_Score")) * 0.2),
    0)
)

# ============================================================================
# 3. CATEGORY PERFORMANCE METRICS
# ============================================================================
# Per-category roll-up: SKU count, value, turnover, demand, forecast accuracy
# and one counter per ABC class (Class_A_Count, Class_B_Count, Class_C_Count).
_abc_class_counts = [
    sum(when(col("ABC_Class") == _abc_label, 1).otherwise(0)).alias(f"Class_{_abc_label}_Count")
    for _abc_label in ("A", "B", "C")
]
_category_rollup = df_sku_health.groupBy("Category").agg(
    count("SKU_ID").alias("SKU_Count"),
    sum("Total_Inventory_Value_USD").alias("Category_Value"),
    avg("Inventory_Turnover_Ratio").alias("Avg_Turnover"),
    sum("Avg_Daily_Sales").alias("Total_Daily_Demand"),
    avg("Demand_Forecast_Accuracy_Pct").alias("Forecast_Accuracy"),
    *_abc_class_counts,
)

# An empty partitionBy() puts every category row in one window, so the
# windowed sum is the grand total across all categories.
_all_categories = Window.partitionBy()
_category_share = col("Category_Value") / sum("Category_Value").over(_all_categories) * 100

# Category Contribution (%): each category's share of total inventory value,
# with the result sorted from most to least valuable category.
df_category_kpi = (
    _category_rollup
    .withColumn("Category_Value_Pct", round(_category_share, 2))
    .orderBy(col("Category_Value").desc())
)

# ============================================================================
# 4. ABC ANALYSIS WITH PARETO PRINCIPLE
# ============================================================================
# Order rows by inventory value, highest first. SKU_ID and Warehouse_Location
# act as tie-breakers: with value alone, rows of equal value have no defined
# order, so the running total (and the ABC bucket of rows near the 80% / 95%
# cut-offs) could change from one run to the next.
# NOTE: a window without partitionBy is evaluated over all rows together.
window_spec = Window.orderBy(
    col("Total_Inventory_Value_USD").desc(),
    col("SKU_ID"),
    col("Warehouse_Location"),
)

# Running total from the first (most valuable) row through the current row.
_running_value = sum("Total_Inventory_Value_USD").over(
    window_spec.rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
# Grand total of inventory value across every row.
_grand_total_value = sum("Total_Inventory_Value_USD").over(Window.partitionBy())

df_abc_analysis = df_sku_health.withColumn(
    # Cumulative share (%) of total inventory value, most valuable rows first.
    "Cumulative_Value_Pct",
    round((_running_value / _grand_total_value) * 100, 2)
).withColumn(
    # Pareto buckets: up to 80% of cumulative value -> A, up to 95% -> B, rest -> C.
    "ABC_Recommendation",
    when(col("Cumulative_Value_Pct") <= 80, "A - High Priority")
    .when(col("Cumulative_Value_Pct") <= 95, "B - Medium Priority")
    .otherwise("C - Low Priority")
).withColumn(
    # Expected demand over the lead time plus safety stock, rounded to whole units.
    "Optimal_Reorder_Qty",
    round(
        (col("Avg_Daily_Sales") * col("Lead_Time_Days")) + col("Safety_Stock"),
    0)
)

# ============================================================================
# 5. SUPPLIER PERFORMANCE SCORECARD
# ============================================================================
# Per-supplier roll-up of the SKU-level metrics.
_supplier_rollup = df_sku_health.groupBy("Supplier_Name").agg(
    count("SKU_ID").alias("SKUs_Supplied"),
    sum("Total_Inventory_Value_USD").alias("Total_Supply_Value"),
    avg("Supplier_OnTime_Pct").alias("Avg_OnTime_Delivery"),
    avg("Lead_Time_Days").alias("Avg_Lead_Time"),
    avg("Quality_Issue_Score").alias("Avg_Quality_Issues"),
    sum("Damaged_Qty").alias("Total_Damaged_Units"),
    avg("Demand_Forecast_Accuracy_Pct").alias("Forecast_Alignment"),
)

# Supplier Performance Score: 40% on-time delivery, 30% quality (100 minus the
# average quality-issue score) and 30% forecast alignment, rounded to a whole number.
_supplier_score = round(
    (col("Avg_OnTime_Delivery") * 0.4)
    + ((100 - col("Avg_Quality_Issues")) * 0.3)
    + (col("Forecast_Alignment") * 0.3),
    0,
)

# Letter grade derived from the score: >=90 A, >=75 B, >=60 C, otherwise D.
_score = col("Supplier_Performance_Score")
_supplier_grade = (
    when(_score >= 90, "A - Excellent")
    .when(_score >= 75, "B - Good")
    .when(_score >= 60, "C - Average")
    .otherwise("D - Needs Improvement")
)

# The grade reads the score column, so the score must be added first.
# Rows are sorted best supplier first.
df_supplier_kpi = (
    _supplier_rollup
    .withColumn("Supplier_Performance_Score", _supplier_score)
    .withColumn("Supplier_Grade", _supplier_grade)
    .orderBy(col("Supplier_Performance_Score").desc())
)

# ============================================================================
# 6. WRITE ALL GOLD TABLES
# ============================================================================

# Storage locations of the five Gold Delta tables.
gold_sku_path = "/mnt/gold/sku_health_metrics"
gold_warehouse_path = "/mnt/gold/warehouse_kpi"
gold_category_path = "/mnt/gold/category_performance"
gold_abc_path = "/mnt/gold/abc_analysis"
gold_supplier_path = "/mnt/gold/supplier_scorecard"

# SQL that registers a table in supply_chain_db on top of a Delta location.
_register_table_sql = """
    CREATE TABLE IF NOT EXISTS supply_chain_db.{table}
    USING DELTA LOCATION '{location}'
"""

# (DataFrame, Delta path, table name) for each output, in write order:
# SKU health, warehouse KPIs, category performance, ABC analysis, supplier scorecard.
_gold_outputs = [
    (df_sku_health, gold_sku_path, "gold_sku_health"),
    (df_warehouse_kpi, gold_warehouse_path, "gold_warehouse_kpi"),
    (df_category_kpi, gold_category_path, "gold_category_performance"),
    (df_abc_analysis, gold_abc_path, "gold_abc_analysis"),
    (df_supplier_kpi, gold_supplier_path, "gold_supplier_scorecard"),
]

for _gold_df, _gold_location, _gold_table in _gold_outputs:
    # Overwrite the Delta data at the path, then register the table if it is
    # not already registered.
    _gold_df.write.format("delta").mode("overwrite").save(_gold_location)
    spark.sql(_register_table_sql.format(table=_gold_table, location=_gold_location))

# Completion banner followed by one row count per Gold table.
print("✅ Advanced Gold Layer Complete!")
print("📊 Tables Created:")

# Count rows from the Delta tables that were just written rather than from the
# in-memory DataFrames: calling .count() on those would re-run each full
# transformation (including the global window sorts) a second time.
_summary_rows = (
    ("SKU Health Metrics", gold_sku_path, "records"),
    ("Warehouse KPIs", gold_warehouse_path, "warehouses"),
    ("Category Performance", gold_category_path, "categories"),
    ("ABC Analysis", gold_abc_path, "SKUs analyzed"),
    ("Supplier Scorecard", gold_supplier_path, "suppliers"),
)
for _summary_label, _summary_path, _summary_unit in _summary_rows:
    _written_rows = spark.read.format("delta").load(_summary_path).count()
    print(f"  - {_summary_label}: {_written_rows} {_summary_unit}")