# Gold layer

## Purpose

This notebook builds the **Gold layer** of the Crisis Recovery Lakehouse.

The Gold layer represents the **business-facing, decision-ready outputs**
used by:
- Leadership dashboards
- Operations teams
- Retention & marketing teams
- Crisis response stakeholders

Unlike Bronze and Silver, Gold tables are:
- Aggregated
- Opinionated
- Business-aligned
- Optimized for reporting and decision-making

---

## Business Context

During a crisis, leadership does not want raw data or row-level facts.

They need clear answers to questions like:
- Is the crisis getting better or worse?
- Which markets and stores are under stress?
- Which customers are at risk of churning?
- Where should intervention happen first?

The Gold layer transforms analytical data into **actionable intelligence**.

---

## Inputs and Outputs

### Inputs (from Silver Layer)

| Source Table | Purpose |
|-------------|---------|
| `silver_orders_enriched` | Customer behavior & sentiment |
| `silver_sla_metrics` | Operational & delivery performance |

---

### Outputs (Gold Tables)

| Table | Business Purpose |
|------|------------------|
| `gold_daily_kpis` | Executive-level crisis monitoring |
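| `gold_customer_churn_risk` | Retention & CRM targeting |
| `gold_customer_risk_profile` | Customer-level churn score, lifetime value, and last-order sentiment |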
| `gold_daily_sla_health` | Operations health tracking |
| `gold_store_sla_ranking` | Store-level risk prioritization |
| `gold_market_congestion` | Market-level capacity stress |

---

## Design Principles of the Gold Layer

- Tables answer **specific business questions**
- Metrics are **pre-computed**, not derived on the fly
- Each table serves **one primary audience**
- Optimized for fast BI queries
- No raw or sensitive text data exposed



## 1: Daily Crisis KPIs (`gold_daily_kpis`)

### Business Problem

Executives need a **single daily view** answering:
- How severe are delivery delays?
- Are customer complaints increasing or decreasing?
- Is the crisis stabilizing?

---

### Approach

We aggregate order and sentiment data by day to compute:
- Total orders
- Average delivery time (order creation to delivery, in minutes)
- Late delivery complaints
- Food safety incidents
- Positive feedback trends

This table powers the **Crisis Recovery Tracker dashboard**.

In [0]:
from pyspark.sql.functions import avg, col, count, sum, to_date, when

# 1. Read from Silver Enriched
df_silver = spark.table("food_delivery.silver_orders_enriched")

if not spark.catalog.tableExists("food_delivery.gold_daily_kpis"):
    # 2. Aggregate by Day
    gold_daily_kpis = (
        df_silver
        .withColumn("order_date", to_date("created_at_simulated"))
        .groupBy("order_date")
        .agg(
            count("order_id").alias("total_orders"),

            # Average delivery time in minutes (timestamps cast to epoch seconds, then divided by 60)
            (avg(col("actual_delivery_time_simulated").cast("long") - col("created_at_simulated").cast("long")) / 60).alias("avg_delivery_minutes"),

            # Count reviews per sentiment category
            sum(when(col("sentiment_category") == "Late Delivery", 1).otherwise(0)).alias("late_complaints"),
            sum(when(col("sentiment_category") == "Food Safety", 1).otherwise(0)).alias("safety_incidents"),
            sum(when(col("sentiment_category") == "Positive", 1).otherwise(0)).alias("positive_reviews")
        )
        .orderBy("order_date")
    )

    # 3. Write to Gold Table
    gold_daily_kpis.write.format("delta").mode("overwrite").saveAsTable("food_delivery.gold_daily_kpis")
else:
    print("Gold Daily KPIs table already exists → skipping creation")

display(spark.table("food_delivery.gold_daily_kpis"))
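
To make the roll-up concrete, here is a minimal plain-Python sketch of the same per-day aggregation on three hypothetical orders. The sample rows are invented for illustration, the dictionary keys mirror the Silver columns used in the cell above, and null handling is left out. It runs without Spark.

```python
from collections import defaultdict
from datetime import datetime
from statistics import mean

# Hypothetical sample rows; keys mirror the Silver columns used in the cell above.
orders = [
    {"order_id": 1,
     "created_at_simulated": datetime(2025, 12, 1, 12, 0),
     "actual_delivery_time_simulated": datetime(2025, 12, 1, 12, 30),
     "sentiment_category": "Positive"},
    {"order_id": 2,
     "created_at_simulated": datetime(2025, 12, 1, 18, 0),
     "actual_delivery_time_simulated": datetime(2025, 12, 1, 19, 0),
     "sentiment_category": "Late Delivery"},
    {"order_id": 3,
     "created_at_simulated": datetime(2025, 12, 2, 9, 0),
     "actual_delivery_time_simulated": datetime(2025, 12, 2, 9, 45),
     "sentiment_category": "Food Safety"},
]


def count_category(group, category):
    """Number of rows in the group with the given sentiment category."""
    return len([r for r in group if r["sentiment_category"] == category])


def daily_kpis(rows):
    """Group rows by order date and compute one KPI record per day."""
    by_day = defaultdict(list)
    for row in rows:
        by_day[row["created_at_simulated"].date()].append(row)

    result = {}
    for day in sorted(by_day):
        group = by_day[day]
        # Delivery time per order, in minutes
        minutes = [
            (r["actual_delivery_time_simulated"] - r["created_at_simulated"]).total_seconds() / 60
            for r in group
        ]
        result[day.isoformat()] = {
            "total_orders": len(group),
            "avg_delivery_minutes": mean(minutes),
            "late_complaints": count_category(group, "Late Delivery"),
            "safety_incidents": count_category(group, "Food Safety"),
            "positive_reviews": count_category(group, "Positive"),
        }
    return result


kpis = daily_kpis(orders)
```

The sketch avoids the built-in `sum` on purpose: the notebook imports `sum` from `pyspark.sql.functions`, which shadows the built-in for any later cell.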

## 2: Customer Churn Risk (`gold_customer_churn_risk`)

### Business Problem

Retention teams must identify:
- Customers impacted by the crisis
- Customers at risk of leaving
- Customers who need proactive offers

---

### Approach

We aggregate customer-level behavior to compute:
- Last order date
- Lifetime orders
- Negative experience frequency
- Days since last engagement

Based on these signals, we assign **churn risk segments**.

---

### Key Assumption

> In a crisis, **recent bad experiences combined with inactivity**
> are stronger churn indicators than historical loyalty alone.

In [0]:
from pyspark.sql.functions import (
    col,
    count,
    datediff,
    lit,
    max as max_date,
    sum,
    to_date,
    when,
)

# 1. Read Silver
df_silver = spark.table("food_delivery.silver_orders_enriched")

if not spark.catalog.tableExists("food_delivery.gold_customer_churn_risk"):

    # 2. Customer Level Aggregation
    gold_churn = df_silver.groupBy("customer_id", "customer_name", "segment").agg(
        max_date(to_date("created_at_simulated")).alias("last_order_date"),
        count("order_id").alias("lifetime_orders"),
        
        # How many times did they complain about lateness?
        sum(when(col("sentiment_category") == "Late Delivery", 1).otherwise(0)).alias("bad_experiences_count")
    )

    # 3. Define Logic: Who is "At Risk"?
    # Rule: "Crisis Victim" if they had a bad experience AND have not ordered in the last 30 days;
    # "Natural Churn" if they have not ordered in the last 60 days, regardless of experience.
    # Note: We use our simulated "Current Date" (2025-12-31) for the calculation
    simulated_today = to_date(lit("2025-12-31"))

    gold_churn_risk = gold_churn.withColumn(
        "days_since_last_order", 
        datediff(simulated_today, col("last_order_date"))
    ).withColumn(
        "churn_risk_segment",
        when((col("bad_experiences_count") > 0) & (col("days_since_last_order") > 30), "High Risk - Crisis Victim")
        .when(col("days_since_last_order") > 60, "High Risk - Natural Churn")
        .otherwise("Loyal / Active")
    )

    # 4. Write to Gold Table
    gold_churn_risk.write.format("delta").mode("overwrite").saveAsTable("food_delivery.gold_customer_churn_risk")

else:
    print("Gold Customer Churn Risk table already exists → skipping creation")

# Display the "High Risk" customers we need to save
display(spark.table("food_delivery.gold_customer_churn_risk").filter(col("churn_risk_segment") == "High Risk - Crisis Victim").limit(10))

In [0]:
# Preview the full churn-risk table (all segments)

display(spark.table("food_delivery.gold_customer_churn_risk").limit(10))
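
The segmentation rules are easier to check when written as an ordinary function. The sketch below is a plain-Python mirror of the `when(...).when(...).otherwise(...)` chain, using the thresholds from the code cell (30 and 60 days). The function name is hypothetical, and the sketch assumes both inputs are non-null.

```python
def churn_risk_segment(bad_experiences_count: int, days_since_last_order: int) -> str:
    """Plain-Python mirror of the Spark when/otherwise chain.

    Rules are evaluated in order, so a customer who matches the first rule
    is never considered for the second.
    """
    if bad_experiences_count > 0 and days_since_last_order > 30:
        return "High Risk - Crisis Victim"
    if days_since_last_order > 60:
        return "High Risk - Natural Churn"
    return "Loyal / Active"


# A few hypothetical customers: (bad experiences, days since last order)
examples = {
    (1, 45): churn_risk_segment(1, 45),
    (0, 45): churn_risk_segment(0, 45),
    (0, 61): churn_risk_segment(0, 61),
    (2, 90): churn_risk_segment(2, 90),
}
```

Because the rules are ordered, a customer with a complaint and 90 days of inactivity lands in "Crisis Victim", not "Natural Churn". Both comparisons are strict, so exactly 30 days with a complaint still counts as "Loyal / Active".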

## 3: Customer Risk Profile (`gold_customer_risk_profile`)

### Business Problem

During a crisis, retention teams cannot act on raw order or review data.
They need a **single, customer-level snapshot** that answers:

- Which customers are likely to churn?
- Who requires immediate intervention?
- Which dissatisfied customers are most valuable to the business?

Event-level data is too granular and slow for operational decision-making.

---

### Approach

We construct a **denormalized customer risk profile** by aggregating
order behavior and sentiment signals:

- Recency of last order
- Lifetime order frequency
- Average order value
- Count of negative experiences
- Sentiment of the most recent order

These signals are combined into:
- A **churn probability score** (0–100)
- A **lifetime value estimate**
- A **last-order sentiment indicator**

This table is optimized for direct consumption by
CRM systems, dashboards, and retention workflows.

---

### Key Assumption

> In a crisis scenario, **recent inactivity combined with repeated negative experiences**
> is a stronger churn signal than historical loyalty alone.


In [0]:
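# Build aggregates

from pyspark.sql import functions as F
from pyspark.sql.functions import (
    avg, col, count, datediff, lit, max as max_date, sum, when,
)
from pyspark.sql.window import Window


df_silver = spark.table("food_delivery.silver_orders_enriched")

# Simulated "current date", matching the churn-risk cell above
simulated_today = F.to_date(lit("2025-12-31"))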

customer_agg = (
    df_silver
    .groupBy("customer_id", "customer_name", "segment")
    .agg(
        F.to_date(max_date("created_at_simulated")).alias("last_order_date"),
        count("order_id").alias("lifetime_orders"),
        avg("subtotal").alias("avg_order_value"),
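        # Any category other than "Positive" counts as a negative experience
        # (rows with a null category are not counted)
        sum(when(col("sentiment_category") != "Positive", 1).otherwise(0))
            .alias("negative_experiences")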
    )
)


#Lifetime Value
customer_agg = customer_agg.withColumn(
    "lifetime_value",
    col("lifetime_orders") * col("avg_order_value")
)

#Churn Probability Score (0–100)
customer_agg = (
    customer_agg
    .withColumn(
        "days_since_last_order",
        datediff(simulated_today, col("last_order_date"))
    )
    .withColumn(
        "churn_probability_score",
        when(col("days_since_last_order") > 90, 80)
        .when(col("days_since_last_order") > 60, 60)
        .when(col("days_since_last_order") > 30, 40)
        .otherwise(20)
        + when(col("negative_experiences") > 2, 20).otherwise(0)
    )
)

#Last Order Sentiment
last_sentiment = (
    df_silver
    .withColumn(
        "rn",
        F.row_number().over(
            Window.partitionBy("customer_id").orderBy(col("created_at_simulated").desc())
        )
    )
    .filter(col("rn") == 1)
    .select("customer_id", col("sentiment_category").alias("last_order_sentiment"))
)

gold_customer_risk_profile = (
    customer_agg
    .join(last_sentiment, "customer_id", "left")
)

#Write Gold Table
gold_customer_risk_profile.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("food_delivery.gold_customer_risk_profile")

display(spark.table("food_delivery.gold_customer_risk_profile").limit(10))
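
As a quick check on the scoring rule, here is a plain-Python sketch of the same arithmetic: a recency base score plus a fixed penalty for more than two negative experiences. The function name is hypothetical, and the thresholds are copied from the cell above.

```python
def churn_probability_score(days_since_last_order: int, negative_experiences: int) -> int:
    """Plain-Python mirror of the score: recency base plus negative-experience penalty."""
    if days_since_last_order > 90:
        base = 80
    elif days_since_last_order > 60:
        base = 60
    elif days_since_last_order > 30:
        base = 40
    else:
        base = 20
    # Flat 20-point penalty once a customer has more than two negative experiences
    penalty = 20 if negative_experiences > 2 else 0
    return base + penalty


# Hypothetical customers: (days since last order, negative experiences)
scores = {
    "recent and happy": churn_probability_score(10, 0),
    "long gone, many complaints": churn_probability_score(100, 3),
}
```

With these rules the score only takes the values 20, 40, 60, 80, or 100, so the lowest score any customer receives on the 0–100 scale is 20.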


## 4: Daily SLA Health (`gold_daily_sla_health`)

### Business Problem

Operations teams need to monitor:
- Delivery delays
- Capacity utilization
- Backlog pressure

at a **daily resolution**.

---

### Approach

We aggregate SLA metrics by date to calculate:
- Average delivery delay
- Percentage of delayed orders
- Dasher utilization
- Outstanding order backlog

This enables early detection of operational degradation.

In [0]:
from pyspark.sql.functions import avg, count, sum, col, when, to_date

# Read Silver SLA metrics table
df_sla = spark.table("food_delivery.silver_sla_metrics")

# Check if the Gold daily SLA health table already exists to avoid overwriting
if not spark.catalog.tableExists("food_delivery.gold_daily_sla_health"):
    # Aggregate daily SLA metrics
    gold_daily_sla_health = (
        df_sla
        # Extract order date from simulated creation timestamp
        .withColumn("order_date", to_date("created_at_simulated"))
        .groupBy("order_date")
        .agg(
            # Total orders per day
            count("order_id").alias("total_orders"),

            # Average delivery delay in seconds
            avg("delivery_delay_seconds").alias("avg_delay_seconds"),

            # Count of delayed orders (delay > 0)
            sum(when(col("delivery_delay_seconds") > 0, 1).otherwise(0))
                .alias("delayed_orders"),

            # Percentage of delayed orders
            (sum(when(col("delivery_delay_seconds") > 0, 1).otherwise(0))
            / count("order_id") * 100).alias("delay_percentage"),

            # Average dasher utilization (busy/onshift), skip if onshift is zero
            avg(
                when(col("total_onshift_dashers") == 0, None)
                .otherwise(col("total_busy_dashers") / col("total_onshift_dashers"))
            ).alias("avg_dasher_utilization"),

            # Average outstanding orders per day
            avg("total_outstanding_orders").alias("avg_outstanding_orders")
        )
        .orderBy("order_date")
    )

    # Save the aggregated results as a Gold table for downstream reporting
    gold_daily_sla_health.write.format("delta") \
        .mode("overwrite") \
        .saveAsTable("food_delivery.gold_daily_sla_health")
else:
    print("Gold Daily SLA Health table already exists → skipping creation")

# Display a sample of daily SLA health metrics

display(spark.table("food_delivery.gold_daily_sla_health").limit(5))
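
Two of these metrics have edge cases worth seeing on a small example: the delay percentage counts only strictly positive delays, and the utilization average skips rows with no on-shift dashers. The plain-Python sketch below uses four hypothetical rows for a single day, with shortened key names chosen for illustration.

```python
from statistics import mean

# Hypothetical rows for one day; key names are shortened for illustration.
rows = [
    {"delay_s": 300, "busy": 8, "onshift": 10},
    {"delay_s": -60, "busy": 5, "onshift": 10},   # delivered early: not delayed
    {"delay_s": 0, "busy": 3, "onshift": 0},      # no dashers on shift: utilization undefined
    {"delay_s": 120, "busy": 12, "onshift": 10},
]

# Only strictly positive delays count as delayed orders
delayed_orders = len([r for r in rows if r["delay_s"] > 0])
delay_percentage = delayed_orders / len(rows) * 100

# Rows with zero on-shift dashers are excluded from the average,
# which is the effect of mapping them to null before avg() in Spark
utilizations = [r["busy"] / r["onshift"] for r in rows if r["onshift"] != 0]
avg_dasher_utilization = mean(utilizations)
```

The ratio is not capped at 1: in this sample the last row has more busy dashers than on-shift dashers, which pulls the average up.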


## 5: Store SLA Ranking (`gold_store_sla_ranking`)

### Business Problem

Not all stores contribute equally to the crisis.

Operations teams must:
- Identify worst-performing stores
- Prioritize interventions
- Allocate resources effectively

---

### Approach

We rank stores based on:
- Delay frequency
- Average delay duration
- Volume of impacted orders

This table supports **store-level accountability**.

In [0]:

from pyspark.sql.functions import avg, count, sum, col, when, to_date

# Read the Silver SLA metrics table
df_sla = spark.table("food_delivery.silver_sla_metrics")

# Check if the Gold store SLA ranking table already exists to avoid overwriting
if not spark.catalog.tableExists("food_delivery.gold_store_sla_ranking"):
    # Aggregate SLA metrics at the store level
    gold_store_sla_ranking = (
        df_sla
        .groupBy("store_id")
        .agg(
            # Total orders per store
            count("order_id").alias("total_orders"),
            # Average delivery delay in seconds per store
            avg("delivery_delay_seconds").alias("avg_delay_seconds"),
            # Count of delayed orders (delay > 0) per store
            sum(when(col("delivery_delay_seconds") > 0, 1).otherwise(0))
                .alias("delayed_orders")
        )
        # Calculate delay rate as a percentage
        .withColumn(
            "delay_rate",
            col("delayed_orders") / col("total_orders") * 100
        )
        # Order stores by delay rate descending
        .orderBy(col("delay_rate").desc())
    )

    # Save the aggregated results as a Gold table for downstream reporting
    gold_store_sla_ranking.write.format("delta") \
        .mode("overwrite") \
        .saveAsTable("food_delivery.gold_store_sla_ranking")
else:
    print("Gold store SLA ranking table already exists → skipping creation")

# Display a sample of store SLA ranking metrics
display(spark.table("food_delivery.gold_store_sla_ranking").limit(10))
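
The cell above orders stores by `delay_rate` alone. One possible way to bring in all three criteria from the Approach section is to break ties on average delay and delayed-order volume, and to exclude stores with too few orders for a stable rate. The sketch below is plain Python on hypothetical stores. The `MIN_ORDERS` threshold and the tie-break order are assumptions for illustration, not part of the notebook.

```python
# Hypothetical store-level aggregates, shaped like rows of gold_store_sla_ranking.
stores = [
    {"store_id": "A", "total_orders": 200, "delayed_orders": 80, "avg_delay_seconds": 420.0},
    {"store_id": "B", "total_orders": 2, "delayed_orders": 2, "avg_delay_seconds": 90.0},
    {"store_id": "C", "total_orders": 150, "delayed_orders": 60, "avg_delay_seconds": 600.0},
]

MIN_ORDERS = 20  # hypothetical minimum volume for a store to be ranked


def rank_stores(rows, min_orders=MIN_ORDERS):
    """Rank stores worst-first by delay rate, then average delay, then delayed volume."""
    eligible = [
        dict(r, delay_rate=r["delayed_orders"] / r["total_orders"] * 100)
        for r in rows
        if r["total_orders"] >= min_orders
    ]
    return sorted(
        eligible,
        key=lambda r: (r["delay_rate"], r["avg_delay_seconds"], r["delayed_orders"]),
        reverse=True,
    )


ranking = [r["store_id"] for r in rank_stores(stores)]
ranking_all = [r["store_id"] for r in rank_stores(stores, min_orders=1)]
```

Without the volume filter, store B tops the list with a 100% delay rate from only two orders. With it, A and C tie at 40% and C ranks first because its average delay is longer.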

## 6: Market Congestion Analysis (`gold_market_congestion`)

### Business Problem

At scale, crises are often driven by **regional capacity imbalances**.

Leadership needs to know:
- Which markets are overloaded
- Where supply is insufficient
- Where demand management is required

---

### Approach

We compute congestion metrics per market using:
- Outstanding orders
- Busy vs on-shift dashers
- Derived congestion ratios

This table informs **strategic, market-level decisions**.

In [0]:
from pyspark.sql.functions import avg, count, sum, col, when, to_date

# Read the Silver SLA metrics table
df_sla = spark.table("food_delivery.silver_sla_metrics")

# Check if the Gold market congestion table already exists to avoid overwriting
if not spark.catalog.tableExists("food_delivery.gold_market_congestion"):
    # Aggregate congestion metrics at the market level
    gold_market_congestion = (
        df_sla
        .groupBy("market_id")
        .agg(
            avg("total_outstanding_orders").alias("avg_backlog"),      # Average outstanding orders per market
            avg("total_busy_dashers").alias("avg_busy_dashers"),       # Average busy dashers per market
            avg("total_onshift_dashers").alias("avg_onshift_dashers")  # Average onshift dashers per market
        )
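        # Calculate congestion ratio: average backlog divided by average on-shift dashers
        # (left null when a market has no on-shift dashers, to avoid dividing by zero)
        .withColumn(
            "congestion_ratio",
            when(col("avg_onshift_dashers") > 0, col("avg_backlog") / col("avg_onshift_dashers"))
        )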
        # Order markets by congestion ratio descending
        .orderBy(col("congestion_ratio").desc())
    )

    # Save the aggregated results as a Gold table for downstream reporting
    gold_market_congestion.write.format("delta") \
        .mode("overwrite") \
        .saveAsTable("food_delivery.gold_market_congestion")
else:
    print("Gold Market Congestion table already exists → skipping creation")

# Display a sample of market congestion metrics
display(spark.table("food_delivery.gold_market_congestion").limit(10))
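
Note that `congestion_ratio` is a ratio of averages (average backlog over average on-shift dashers), which is not the same as averaging a per-row ratio. The plain-Python sketch below shows the difference on two hypothetical snapshots of a single market. The helper name is hypothetical, and it returns `None` when there are no on-shift dashers, since the ratio is undefined there.

```python
from statistics import mean

# Hypothetical snapshots for one market: (outstanding orders, on-shift dashers)
snapshots = [(10, 10), (30, 5)]


def congestion_ratio(avg_backlog, avg_onshift_dashers):
    """Backlog per on-shift dasher; None when there are no on-shift dashers."""
    if not avg_onshift_dashers:
        return None
    return avg_backlog / avg_onshift_dashers


avg_backlog = mean([backlog for backlog, _ in snapshots])
avg_onshift = mean([dashers for _, dashers in snapshots])

# What the Gold table stores: ratio of the two averages
ratio_of_averages = congestion_ratio(avg_backlog, avg_onshift)

# Alternative reading: average of the per-snapshot ratios
average_of_ratios = mean([backlog / dashers for backlog, dashers in snapshots])
```

The ratio of averages weights every snapshot by its size, while the average of ratios lets a single badly understaffed moment dominate. Which reading is more useful depends on the decision the table is meant to support.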

## Performance Optimization

In [0]:
%sql
OPTIMIZE food_delivery.gold_daily_sla_health
ZORDER BY (order_date);

OPTIMIZE food_delivery.gold_customer_risk_profile
ZORDER BY (churn_probability_score);



## Downstream Usage

Gold tables are consumed by:
- Databricks Dashboards
- BI tools (Power BI / Tableau)
- Executive reviews
- Crisis response playbooks

---

## Summary

This notebook converts **trusted analytical data**
into **business-ready intelligence**.

It represents the final step in the Crisis Recovery Lakehouse,
where data becomes **decisions**.