# Silver ➜ Gold Growth Metrics

This notebook builds gold-layer growth metrics from the GDPR-compliant silver orders dataset.


In [None]:
from datetime import datetime, timedelta

import numpy as np
from pyspark.sql import SparkSession, functions as F, Window

# Seed NumPy's global RNG so the synthetic fallback datasets built in later
# cells come out the same on every run.
np.random.seed(21)

# Reuse a session already bound to the name `spark` (for example by the
# notebook runtime); only build one when that name does not exist yet.
try:
    spark  # bare name lookup: raises NameError when no session is predefined
except NameError:
    spark = (
        SparkSession.builder
        .appName("silver_to_gold_metrics")
        .getOrCreate()
    )


## 1. Load Silver Orders

Prefer loading from Unity Catalog; fall back to a small in-memory sample dataset when the table is not registered in the catalog.


In [None]:
# Load the silver orders table when it is registered in the catalog; otherwise
# build a ten-row in-memory sample so the rest of the notebook can still run.
#
# `Catalog.tableExists` takes the table name first (`tableName, dbName=None`).
# Passing ("erp_demo", "silver_orders") positionally asked for a table called
# `erp_demo` inside a database called `silver_orders`, so the catalog branch
# could never be taken. The check now uses the same qualified name that
# `spark.table` reads from.
if spark.catalog.tableExists("erp_demo.silver_orders"):
    silver_orders_df = spark.table("erp_demo.silver_orders")
    source = "unity_catalog"
else:
    # (order_id, customer_id, customer_hash_id, country, purchase_value_eur, marketing_opt_in)
    sample_orders = [
        (1, 1, "cust_a1", "DE", 120.0, True),
        (2, 1, "cust_a1", "DE", 75.0, True),
        (3, 2, "cust_b2", "FR", 200.0, True),
        (4, 2, "cust_b2", "FR", 180.0, True),
        (5, 3, "cust_c3", "ES", 90.5, True),
        (6, 4, "cust_d4", "US", 310.0, False),
        (7, 4, "cust_d4", "US", 95.0, False),
        (8, 5, "cust_e5", "DE", 49.0, True),
        (9, 5, "cust_e5", "DE", 130.0, True),
        (10, 5, "cust_e5", "DE", 160.0, True),
    ]
    sample_rows = [
        {
            "order_id": order_id,
            "customer_id": customer_id,
            "customer_hash_id": hash_id,
            "country": country,
            "purchase_value_eur": amount,
            # COGS is a random 25-40 % of the order value. Cast to a builtin
            # float because NumPy scalar types are not reliably accepted by
            # Spark's row schema inference across PySpark versions.
            "service_cogs_eur": float(amount * np.random.uniform(0.25, 0.4)),
            "marketing_opt_in": marketing_opt_in,
            # One order per hour starting 2025-10-01 00:00.
            "ingestion_timestamp": datetime(2025, 10, 1, 0, 0) + timedelta(hours=position),
        }
        for position, (order_id, customer_id, hash_id, country, amount, marketing_opt_in)
        in enumerate(sample_orders)
    ]
    silver_orders_df = spark.createDataFrame(sample_rows)
    source = "sample"

record_count = silver_orders_df.count()
print(f"Loaded silver dataset via {source} path. Records: {record_count}")
silver_orders_df.show(5, truncate=False)


In [None]:
# Load enriched invoices and per-customer cost data from the catalog, or
# synthesise both when the tables are not registered.
#
# `Catalog.tableExists` takes the table name first (`tableName, dbName=None`),
# so each check uses the same qualified name that `spark.table` reads from.
if spark.catalog.tableExists("erp_demo.invoices_enriched"):
    invoices_enriched_df = spark.table("erp_demo.invoices_enriched")
    invoice_source = "unity_catalog"
else:
    sample_invoice_rows = []
    for month_offset in range(0, 12):
        # First day of each calendar month of 2024. Stepping by 30-day
        # timedeltas lands on 31 Jan and 31 Mar, which after month truncation
        # doubles January and March and leaves February and December empty.
        invoice_month = datetime(2024, 1 + month_offset, 1)
        for cid in range(1, 6):
            # Random draws are kept in their original order so the seeded
            # sequence stays the same.
            base_mrr = float(np.random.choice([299, 499, 799, 1299]))
            prev_flag = np.random.choice([0, 1])
            prev_net_mrr = base_mrr if prev_flag else 0.0
            churned = bool(np.random.choice([0, 0, 0, 1]))
            expansion_mrr = float(np.random.choice([0, 99, 199], p=[0.6, 0.25, 0.15]))
            contraction_mrr = float(np.random.choice([0, 50], p=[0.85, 0.15]))
            churn_mrr = prev_net_mrr if churned else 0.0
            # A churned account bills nothing; otherwise never below the base plan price.
            net_mrr = 0.0 if churned else max(prev_net_mrr + expansion_mrr - contraction_mrr, base_mrr)
            # Builtin float/str casts: NumPy scalar types (np.float64, np.str_)
            # are not reliably accepted by Spark's row schema inference.
            service_cogs = float(net_mrr * np.random.uniform(0.26, 0.34))
            sample_invoice_rows.append({
                "customer_id": cid,
                "invoice_month": invoice_month,
                "plan": str(np.random.choice(["Starter", "Growth", "Scale"])),
                "gross_mrr": net_mrr,
                "net_mrr": net_mrr,
                "discount_pct": 0.0,
                "expansion": expansion_mrr > 0,
                "churned": churned,
                "prev_net_mrr": prev_net_mrr,
                "mrr_change": net_mrr - prev_net_mrr,
                "new_mrr": 0.0 if prev_net_mrr > 0 else net_mrr,
                "expansion_mrr": expansion_mrr,
                "contraction_mrr": contraction_mrr,
                "churn_mrr": churn_mrr,
                "service_cogs_eur": service_cogs,
            })
    invoices_enriched_df = spark.createDataFrame(sample_invoice_rows)
    invoice_source = "sample"

if spark.catalog.tableExists("erp_demo.customer_success_costs"):
    customer_success_cost_df = spark.table("erp_demo.customer_success_costs")
else:
    # One synthetic cost row per customer that appears in the invoices.
    customer_ids = [row.customer_id for row in invoices_enriched_df.select("customer_id").distinct().collect()]
    customer_cost_rows = [
        {
            "customer_id": cid,
            "cac": float(np.random.choice([1200, 1800, 2500, 4000])),
            "cs_monthly_cost": float(np.random.choice([150, 250, 400])),
        }
        for cid in customer_ids
    ]
    customer_success_cost_df = spark.createDataFrame(customer_cost_rows)

print(f"Loaded invoices from {invoice_source}. Records: {invoices_enriched_df.count()}")


## 2. Feature Engineering

We derive the order month and a per-row order counter (`order_count = 1`) that the gold aggregation later sums into monthly order volume.


In [None]:
# Per-order helper columns for the monthly roll-up:
#   order_month - ingestion timestamp truncated to the start of its month
#   order_count - constant 1 on every row, so a plain sum counts orders
_order_month = F.date_trunc("month", F.col("ingestion_timestamp"))
_one_per_order = F.lit(1)

silver_orders_features_df = silver_orders_df.withColumn("order_month", _order_month)
silver_orders_features_df = silver_orders_features_df.withColumn("order_count", _one_per_order)

silver_orders_features_df.show(5, truncate=False)


## 3. Build Gold Metrics

Aggregate monthly KPIs aligned with growth tracking: revenue, active customers, order velocity, and marketing engagement.


In [None]:
# Truncate invoice dates to the start of their month so every aggregate below
# shares the same `invoice_month` join key.
invoice_base_df = invoices_enriched_df.withColumn(
    "invoice_month",
    F.date_trunc("month", F.col("invoice_month")),
)

# Window ordered by month with no partitioning; used for lag() look-backs on
# the one-row-per-month metrics frame.
monthly_window = Window.orderBy("invoice_month")

# Row-level predicates shared by several aggregates.
_has_revenue = F.col("net_mrr") > 0
_is_new_logo = (F.col("prev_net_mrr") == 0) & (F.col("net_mrr") > 0)


def _count_where(condition):
    """Return an aggregate that counts the rows for which `condition` is true."""
    return F.sum(F.when(condition, F.lit(1)).otherwise(F.lit(0)))


# Measures that are summed per month and keep their source column name.
_summed_measures = ["new_mrr", "expansion_mrr", "contraction_mrr", "churn_mrr", "service_cogs_eur"]

# Core revenue movements and customer counts per invoice month.
monthly_invoice_metrics_df = invoice_base_df.groupBy("invoice_month").agg(
    F.sum("net_mrr").alias("mrr"),
    *[F.sum(measure).alias(measure) for measure in _summed_measures],
    F.countDistinct("customer_id").alias("billed_customers"),
    _count_where(F.col("churned")).alias("churned_customers"),
    _count_where(_is_new_logo).alias("new_customers"),
)

# Distinct customers with positive net MRR in the month.
_revenue_rows = invoice_base_df.filter(_has_revenue)
active_customers_df = _revenue_rows.groupBy("invoice_month").agg(
    F.countDistinct("customer_id").alias("active_customers")
)

# Acquisition cost attributed to the month in which a customer first bills.
_new_logo_keys = invoice_base_df.filter(_is_new_logo).select("customer_id", "invoice_month")
monthly_cac_df = (
    _new_logo_keys
    .join(customer_success_cost_df, "customer_id", "left")
    .groupBy("invoice_month")
    .agg(F.sum("cac").alias("cac_spend"))
)

# Customer-success cost summed over the customers billing in each month.
_active_keys = invoice_base_df.filter(_has_revenue).select("customer_id", "invoice_month")
active_customer_cost_df = (
    _active_keys
    .join(customer_success_cost_df, "customer_id", "left")
    .groupBy("invoice_month")
    .agg(F.sum("cs_monthly_cost").alias("customer_success_spend"))
)

# Order-side KPIs from the silver features, re-keyed to `invoice_month` so
# they can be joined onto the invoice metrics.
_orders_by_month = silver_orders_features_df.groupBy("order_month").agg(
    F.sum("order_count").alias("orders"),
    _count_where(F.col("marketing_opt_in")).alias("marketing_opt_in_count"),
    F.avg("purchase_value_eur").alias("average_order_value_eur"),
)
silver_agg_df = _orders_by_month.withColumnRenamed("order_month", "invoice_month")

# Synthetic fixed overhead: 15000 plus an offset derived from hashing the
# month together with the literal "overhead", so it is stable per month.
_overhead_offset = F.abs(F.hash("invoice_month", F.lit("overhead"))) % 4000
_billed_months = monthly_invoice_metrics_df.select("invoice_month").distinct()
overhead_df = _billed_months.withColumn(
    "fixed_overhead_eur",
    (F.lit(15000.0) + _overhead_offset).cast("double"),
)

# Left-join every per-month component onto the invoice metrics so that each
# billed month keeps exactly one row even when a component has no data for it.
monthly_metrics_df = monthly_invoice_metrics_df
for _component_df in (
    active_customers_df,
    monthly_cac_df,
    active_customer_cost_df,
    silver_agg_df,
    overhead_df,
):
    monthly_metrics_df = monthly_metrics_df.join(_component_df, "invoice_month", "left")
monthly_metrics_df = monthly_metrics_df.orderBy("invoice_month")

# Months missing from a component come back as NULL after the left joins;
# replace them with zeros so the arithmetic on these columns never yields NULL.
_null_defaults = {
    "active_customers": 0,
    "cac_spend": 0.0,
    "customer_success_spend": 0.0,
    "orders": 0,
    "marketing_opt_in_count": 0,
    "average_order_value_eur": 0.0,
    "fixed_overhead_eur": 0.0,
}
monthly_metrics_df = monthly_metrics_df.fillna(_null_defaults)

# Derive the growth KPIs on top of the joined monthly frame. Every ratio is
# guarded with F.when(...) so a zero or missing denominator yields NULL
# instead of a division error or infinity.
monthly_metrics_df = (
    monthly_metrics_df
    # --- look-backs over the month-ordered window ---
    # lag() counts rows, not calendar months: a 12-row look-back equals
    # "same month last year" only if there is one row per consecutive month.
    # NOTE(review): confirm the source has no missing months.
    .withColumn("prev_mrr", F.lag("mrr").over(monthly_window))
    .withColumn("prev_mrr_yoy", F.lag("mrr", 12).over(monthly_window))
    .withColumn("prev_active_customers", F.lag("active_customers").over(monthly_window))
    # --- recurring revenue movements ---
    .withColumn("net_new_mrr", F.col("new_mrr") + F.col("expansion_mrr") - F.col("contraction_mrr") - F.col("churn_mrr"))
    .withColumn("arr", F.col("mrr") * F.lit(12.0))
    # NRR: last month's MRR adjusted for expansion, contraction and churn,
    # relative to last month's MRR (new-customer MRR is excluded).
    .withColumn(
        "nrr",
        F.when(
            F.col("prev_mrr") > 0,
            (F.col("prev_mrr") + F.col("expansion_mrr") - F.col("contraction_mrr") - F.col("churn_mrr")) / F.col("prev_mrr"),
        ).otherwise(F.lit(None)),
    )
    # --- profitability ---
    .withColumn("gross_profit_eur", F.col("mrr") - F.col("service_cogs_eur"))
    .withColumn(
        "gross_margin_pct",
        F.when(F.col("mrr") > 0, F.col("gross_profit_eur") / F.col("mrr")).otherwise(F.lit(None)),
    )
    .withColumn(
        "operating_income_eur",
        F.col("mrr")
        - F.col("service_cogs_eur")
        - F.col("customer_success_spend")
        - F.col("fixed_overhead_eur")
        - F.col("cac_spend"),
    )
    .withColumn(
        "operating_margin_pct",
        F.when(F.col("mrr") > 0, F.col("operating_income_eur") / F.col("mrr")).otherwise(F.lit(None)),
    )
    # --- growth ---
    .withColumn(
        "revenue_growth_pct",
        F.when(F.col("prev_mrr") > 0, (F.col("mrr") - F.col("prev_mrr")) / F.col("prev_mrr")).otherwise(F.lit(None)),
    )
    .withColumn(
        "revenue_growth_yoy_pct",
        F.when(F.col("prev_mrr_yoy") > 0, (F.col("mrr") - F.col("prev_mrr_yoy")) / F.col("prev_mrr_yoy")).otherwise(F.lit(None)),
    )
    # --- logo retention / churn, relative to last month's active base ---
    .withColumn(
        "logo_retention_rate",
        F.when(
            F.col("prev_active_customers") > 0,
            (F.col("prev_active_customers") - F.col("churned_customers")) / F.col("prev_active_customers"),
        ).otherwise(F.lit(None)),
    )
    .withColumn(
        "logo_churn_rate",
        F.when(F.col("prev_active_customers") > 0, F.col("churned_customers") / F.col("prev_active_customers")).otherwise(F.lit(None)),
    )
    # --- unit economics ---
    .withColumn(
        "arpa",
        F.when(F.col("active_customers") > 0, F.col("mrr") / F.col("active_customers")).otherwise(F.lit(None)),
    )
    # LTV per account = ARPA x gross margin / monthly logo churn rate.
    .withColumn(
        "ltv",
        F.when(
            (F.col("arpa") > 0) & (F.col("logo_churn_rate") > 0),
            F.col("arpa") * F.col("gross_margin_pct") / F.col("logo_churn_rate"),
        ).otherwise(F.lit(None)),
    )
    # `ltv` is a per-account figure while `cac_spend` is the month's total
    # acquisition spend across all new customers. Compare like with like by
    # dividing LTV by the average CAC per new customer
    # (cac_spend / new_customers), not by the monthly total.
    .withColumn(
        "ltv_to_cac_ratio",
        F.when(
            (F.col("cac_spend") > 0) & (F.col("new_customers") > 0),
            F.col("ltv") / (F.col("cac_spend") / F.col("new_customers")),
        ).otherwise(F.lit(None)),
    )
    .withColumn(
        "cac_payback_months",
        F.when(F.col("net_new_mrr") > 0, F.col("cac_spend") / F.col("net_new_mrr")).otherwise(F.lit(None)),
    )
    # --- capital efficiency ---
    .withColumn("net_new_arr", F.col("net_new_mrr") * F.lit(12.0))
    # Burn is the operating loss for the month, floored at zero when profitable.
    .withColumn(
        "net_burn_eur",
        F.when(F.col("operating_income_eur") < 0, -F.col("operating_income_eur")).otherwise(F.lit(0.0)),
    )
    .withColumn(
        "burn_multiple",
        F.when(F.col("net_new_arr") > 0, F.col("net_burn_eur") / F.col("net_new_arr")).otherwise(F.lit(None)),
    )
    # Rule of 40 is expressed in percentage points, hence the x100 on both ratios.
    .withColumn(
        "rule_of_40_score",
        F.when(
            F.col("revenue_growth_pct").isNotNull() & F.col("operating_margin_pct").isNotNull(),
            (F.col("revenue_growth_pct") * 100.0) + (F.col("operating_margin_pct") * 100.0),
        ).otherwise(F.lit(None)),
    )
)

# Synthetic engagement figures: both multipliers are derived from a hash of
# the month plus a fixed salt, so they are stable for a given month.
_mau_per_account = F.lit(15.0) + (F.abs(F.hash("invoice_month", F.lit("mau"))) % 30)
_dau_share_of_mau = F.lit(0.3) + (F.abs(F.hash("invoice_month", F.lit("dau"))) % 35) / 100.0

_mau = F.col("monthly_active_users")
_mrr = F.col("mrr")

monthly_metrics_df = (
    monthly_metrics_df
    .withColumn("monthly_active_users", (F.col("active_customers") * _mau_per_account).cast("double"))
    .withColumn("daily_active_users", (_mau * _dau_share_of_mau).cast("double"))
    # Ratios are NULL when their denominator is not positive.
    .withColumn(
        "dau_mau_ratio",
        F.when(_mau > 0, F.col("daily_active_users") / _mau).otherwise(F.lit(None)),
    )
    .withColumn(
        "expansion_rate",
        F.when(_mrr > 0, F.col("expansion_mrr") / _mrr).otherwise(F.lit(None)),
    )
    # The wide table exposes its month key under the name `order_month`.
    .withColumnRenamed("invoice_month", "order_month")
)

monthly_metrics_df.orderBy("order_month").show(truncate=False)


In [None]:
def _metric_struct(metric_name, value_column, unit, description, efficient_when):
    """Build one long-format metric struct.

    The struct carries the metric's display name, the value read from
    `value_column`, its unit, a description, and an `is_efficient` flag that
    is True only when `efficient_when` evaluates to true (False otherwise,
    including when the condition is NULL).
    """
    return F.struct(
        F.lit(metric_name).alias("metric_name"),
        F.col(value_column).alias("metric_value"),
        F.lit(unit).alias("unit"),
        F.lit(description).alias("description"),
        F.when(efficient_when, F.lit(True)).otherwise(F.lit(False)).alias("is_efficient"),
    )


# (metric name, source column, unit, description, efficiency condition)
_metric_specs = [
    (
        "Monthly Recurring Revenue (MRR)",
        "mrr",
        "EUR",
        "Total recurring revenue recognised in the month from subscriptions.",
        F.col("revenue_growth_pct") > 0,
    ),
    (
        "Annual Recurring Revenue (ARR)",
        "arr",
        "EUR",
        "Annualised value of the recurring revenue base (MRR × 12).",
        F.col("revenue_growth_pct") > 0,
    ),
    (
        "Net Revenue Retention (NRR)",
        "nrr",
        "ratio",
        "Retention of existing customers' revenue including expansion and contraction.",
        F.col("nrr") >= 1,
    ),
    (
        "Customer Acquisition Cost (CAC)",
        "cac_spend",
        "EUR",
        "Sales and marketing spend allocated to newly acquired customers.",
        F.col("cac_spend") <= 3000,
    ),
    (
        "Customer Lifetime Value (LTV)",
        "ltv",
        "EUR",
        "Expected gross profit per customer based on ARPA, margin and churn.",
        F.col("ltv_to_cac_ratio") >= 3,
    ),
    (
        "Gross Margin",
        "gross_margin_pct",
        "ratio",
        "Share of revenue retained after direct service delivery costs.",
        F.col("gross_margin_pct") >= 0.75,
    ),
    (
        "CAC Payback Period",
        "cac_payback_months",
        "months",
        "Months required for net new recurring revenue to recover CAC.",
        F.col("cac_payback_months") <= 12,
    ),
    (
        "Revenue Growth Rate (MoM)",
        "revenue_growth_pct",
        "ratio",
        "Month-over-month change in recurring revenue.",
        F.col("revenue_growth_pct") > 0,
    ),
    (
        "Revenue Growth Rate (YoY)",
        "revenue_growth_yoy_pct",
        "ratio",
        "Year-over-year change in recurring revenue for the same month.",
        F.col("revenue_growth_yoy_pct") > 0,
    ),
    (
        "Burn Multiple",
        "burn_multiple",
        "multiple",
        "Capital efficiency indicator: net burn divided by net new ARR.",
        F.col("burn_multiple") <= 1.5,
    ),
    (
        "Rule of 40",
        "rule_of_40_score",
        "score",
        "Sum of revenue growth % and operating margin %.",
        F.col("rule_of_40_score") >= 40,
    ),
    (
        "Logo Retention Rate",
        "logo_retention_rate",
        "ratio",
        "Percentage of customers retained period over period.",
        F.col("logo_retention_rate") >= 0.9,
    ),
    (
        "Logo Churn Rate",
        "logo_churn_rate",
        "ratio",
        "Share of customers lost relative to the prior period customer base.",
        F.col("logo_churn_rate") <= 0.1,
    ),
    (
        "Active Users (DAU/MAU ratio)",
        "dau_mau_ratio",
        "ratio",
        "Stickiness indicator comparing daily active users to monthly active users.",
        F.col("dau_mau_ratio") >= 0.3,
    ),
    (
        "Expansion Revenue / Upsell Rate",
        "expansion_rate",
        "ratio",
        "Proportion of revenue coming from expansion MRR within the month.",
        F.col("expansion_rate") >= 0.1,
    ),
]

metrics_structs = [_metric_struct(*spec) for spec in _metric_specs]

# Unpivot: pack all metric structs into one array per month, explode it to one
# row per (month, metric), then flatten the struct fields into columns.
_metric_fields = ("metric_name", "metric_value", "unit", "description", "is_efficient")
_with_metric_array = monthly_metrics_df.withColumn("metrics", F.array(*metrics_structs))
_one_row_per_metric = _with_metric_array.select("order_month", F.explode("metrics").alias("metric"))
metrics_long_df = _one_row_per_metric.select(
    "order_month",
    *[F.col(f"metric.{field}").alias(field) for field in _metric_fields],
)

metrics_long_df.orderBy("order_month", "metric_name").show(truncate=False)



## 4. Gold-Layer Quality Checks


In [None]:
# Gold-layer quality gates. Explicit raises are used instead of bare `assert`
# statements because asserts are stripped when Python runs with -O, which
# would silently disable these checks. AssertionError is kept so a failing
# gate surfaces with the same exception type as before.
_required_silver_columns = {"customer_hash_id", "purchase_value_eur"}
_missing_silver_columns = _required_silver_columns - set(silver_orders_df.columns)
if _missing_silver_columns:
    raise AssertionError(
        f"Silver orders dataset is missing required columns: {sorted(_missing_silver_columns)}"
    )

# The silver tier must not carry this raw name column forward.
if "first_name" in silver_orders_df.columns:
    raise AssertionError("PII leakage from previous tier")

if monthly_metrics_df.filter(F.col("mrr") < 0).count() != 0:
    raise AssertionError("Negative MRR detected")

if not metrics_long_df.count() > 0:
    raise AssertionError("Metrics table is empty")

print("Gold growth checks passed ✅")


## 5. Persist Gold Growth Metrics (optional)

Execute inside Databricks to publish the gold tables and metrics view.


In [None]:
# Publish the wide and long gold tables (overwriting any previous contents)
# and refresh the view that selects from the long table.
# NOTE(review): `spark` is always bound by the first cell and a session object
# looks always truthy, so the "skipped" branch appears unreachable - confirm
# whether a real opt-in switch is wanted here.
if not spark:
    print("Spark session not available. Export step skipped.")
else:
    spark.sql("CREATE DATABASE IF NOT EXISTS erp_demo")
    for _gold_df, _gold_table in (
        (monthly_metrics_df, "erp_demo.gold_growth_metrics_wide"),
        (metrics_long_df, "erp_demo.gold_growth_metrics"),
    ):
        _gold_df.write.mode("overwrite").saveAsTable(_gold_table)
    spark.sql(
        """
        CREATE OR REPLACE VIEW erp_demo.gold_growth_metrics_view AS
        SELECT * FROM erp_demo.gold_growth_metrics
        """
    )
    print("Gold tables 'erp_demo.gold_growth_metrics' (long) and '_wide' plus metrics view created/updated.")
