#  Gold Layer — Business Metrics

**Annie's Magic Numbers — Medallion Architecture**

Produces the 10 analytical tables for Power BI using clean data from Silver.
All tables are **aggregated** and contain **only the columns used in the dashboard**.

### Gold Tables (columns = only those consumed by Power BI):
| # | Table | Columns |
|---|---|---|
| 1 | `gold.sales_enriched` | brand, description, size, classification, store, sales_quantity, sales_dollars, profit_dollars, volume |
| 2 | `gold.product_profitability` | brand, description, size, classification, total_units_sold, total_revenue, total_profit_dollars, avg_margin_pct, is_loss_maker, rank_by_profit, rank_by_margin |
| 3 | `gold.brand_profitability` | brand, classification, sku_count, total_units_sold, total_revenue, total_profit_dollars, avg_margin_pct, is_loss_maker, rank_by_profit, rank_by_margin |
| 4 | `gold.loss_makers` | level, brand, description, size, classification, stores_stocking, total_units_sold, total_revenue, total_profit_dollars, avg_margin_pct, recommendation |
| 5 | `gold.sales_by_store` | store, transaction_count, unique_brands, unique_skus, total_units_sold, total_revenue, total_profit_dollars, avg_margin_pct |
| 6 | `gold.sales_time_series` | sale_year, sale_month, sale_month_name, monthly_revenue, monthly_profit, avg_margin_pct, total_units_sold, active_brands, active_stores, transaction_count |
| 7 | `gold.inventory_delta` | brand, description, size, vendor_name, beg_on_hand, end_on_hand, inventory_change, inventory_value_change, stock_status |
| 8 | `gold.vendor_performance` | vendor_name, brands_supplied, skus_supplied, total_purchase_spend, avg_cost_per_unit, avg_lead_time_days, total_po_count |
| 9 | `gold.size_analysis` | size, unique_skus, unique_brands, total_units_sold, total_profit_dollars, avg_margin_pct, avg_selling_price |
| 10 | `gold.classification_performance` | classification, unique_brands, unique_skus, total_units_sold, total_revenue, total_profit_dollars, overall_margin_pct |

###  Configuration — ADLS Gen2 Authentication

In [None]:
spark.conf.set("spark.sql.shuffle.partitions", "8")
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")
spark.conf.set("spark.databricks.delta.autoOptimize.optimizeWrite", "true")

spark.conf.set(
    "fs.azure.account.key.anniedatalake123.dfs.core.windows.net",
    "<REDACTED_AZURE_STORAGE_KEY>"
)
print(" Spark config ready.")

###  Path Setup & Imports

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

container_name  = "annie-data"
storage_account = "anniedatalake123"
base_path       = f"abfss://{container_name}@{storage_account}.dfs.core.windows.net/"
silver_path     = base_path + "silver/"
gold_path       = base_path + "gold/"

print(f"Silver : {silver_path}")
print(f"Gold   : {gold_path}")

###  Register Silver Tables + Schema Validation

In [None]:
# Silver schemas after normalize_columns() in 02_silver_cleaning:
#   sales      : store, brand, description, size, classification,
#                sales_quantity, sales_dollars, sales_price, sales_date  <-- NOTE: sales_date (not sale_date)
#   purchases  : brand, description, vendor_name, vendor_number, quantity,
#                purchase_price, cost_per_unit, total_cost, po_date, receiving_date, invoice_date
#   beg_inventory / end_inventory : brand, description, size, on_hand, price, inventory_id

spark.read.format("delta").load(silver_path + "sales").createOrReplaceTempView("sv_sales")
spark.read.format("delta").load(silver_path + "purchases").createOrReplaceTempView("sv_purchases")
spark.read.format("delta").load(silver_path + "beg_inventory").createOrReplaceTempView("sv_beg_inv")
spark.read.format("delta").load(silver_path + "end_inventory").createOrReplaceTempView("sv_end_inv")

print(" Silver views registered.")
print("sv_sales cols    :", spark.table("sv_sales").columns)
print("sv_purchases cols:", spark.table("sv_purchases").columns)
print("sv_beg_inv cols  :", spark.table("sv_beg_inv").columns)

###  Helper Writer Function

In [None]:
def write_gold(df, table_name):
    """
    Saves a DataFrame as a MANAGED Delta table in hive_metastore.annie_gold.
    Readable from Power BI via SQL Warehouse without any ADLS key config.
    """
    spark.sql("CREATE SCHEMA IF NOT EXISTS hive_metastore.annie_gold")
    (
        df.write
          .format("delta")
          .mode("overwrite")
          .option("overwriteSchema", "true")
          .saveAsTable(f"hive_metastore.annie_gold.{table_name}")
    )
    count = spark.table(f"hive_metastore.annie_gold.{table_name}").count()
    print(f"    gold.{table_name} → {count:,} rows | {len(df.columns)} cols")

---
##  Intermediate Steps — Cost Lookup + Cached Enriched Sales

These are **not** Gold tables — in-memory helpers that feed all 10 tables below.

In [None]:
# Median cost per (brand, description) from purchase invoices
cost_lookup = spark.sql("""
    SELECT
        brand,
        description,
        PERCENTILE_APPROX(cost_per_unit, 0.5) AS median_cost_per_unit
    FROM sv_purchases
    WHERE cost_per_unit IS NOT NULL AND cost_per_unit > 0
    GROUP BY brand, description
""")
cost_lookup.createOrReplaceTempView("cost_lookup")
print(f" cost_lookup: {cost_lookup.count():,} SKUs with known cost")

In [None]:
# Row-level enriched sales with profit & margin per transaction.
# Cached in memory — all 10 Gold tables aggregate from this view.
# NOT saved to Gold as-is (would be millions of rows in Power BI).
#
# FIX: Silver 'sales' table uses 'sales_date' (SalesDate normalized by normalize_columns),
#      NOT 'sale_date'. All date references below use s.sales_date.
sales_enriched_raw = spark.sql("""
    SELECT
        s.store,
        s.brand,
        s.description,
        s.size,
        s.classification,
        s.sales_quantity,
        s.sales_dollars,
        s.sales_price,
        s.sales_date,
        COALESCE(c.median_cost_per_unit, s.sales_price * 0.60) AS cost_per_unit,
        ROUND(
            s.sales_dollars
            - (COALESCE(c.median_cost_per_unit, s.sales_price * 0.60) * s.sales_quantity),
            2
        ) AS profit_dollars,
        ROUND(
            CASE
                WHEN s.sales_dollars = 0 OR s.sales_dollars IS NULL THEN NULL
                ELSE ((s.sales_dollars
                       - COALESCE(c.median_cost_per_unit, s.sales_price * 0.60) * s.sales_quantity)
                      / s.sales_dollars) * 100
            END,
            2
        ) AS margin_pct
    FROM sv_sales s
    LEFT JOIN cost_lookup c
        ON  CAST(s.brand AS STRING) = CAST(c.brand AS STRING)
        AND s.description            = c.description
""")

sales_enriched_raw = sales_enriched_raw.repartition(8).cache()
total_raw = sales_enriched_raw.count()  # trigger cache
sales_enriched_raw.createOrReplaceTempView("gold_se_raw")
print(f" gold_se_raw cached: {total_raw:,} rows (in-memory only, not saved)")

---
##  Gold Tables

### Table 1 — `gold.sales_enriched`
**Grain:** (brand, description, size, classification, store)  
**DAX measures:** Total Revenue, Total Profit, Total Units Sold, Total Volume

In [None]:
sales_enriched_df = spark.sql("""
    SELECT
        brand,
        description,
        size,
        classification,
        store,
        SUM(sales_quantity)                      AS sales_quantity,
        ROUND(SUM(sales_dollars), 2)             AS sales_dollars,
        ROUND(SUM(profit_dollars), 2)            AS profit_dollars,
        ROUND(
            SUM(sales_quantity) * CASE
                WHEN UPPER(size) LIKE '%1.75L%' OR UPPER(size) LIKE '%1750%' THEN 1.75
                WHEN UPPER(size) LIKE '%1L%'   OR UPPER(size) LIKE '%1000%' THEN 1.00
                WHEN UPPER(size) LIKE '%750%'                                THEN 0.75
                WHEN UPPER(size) LIKE '%500%'                                THEN 0.50
                WHEN UPPER(size) LIKE '%375%'                                THEN 0.375
                WHEN UPPER(size) LIKE '%200%'                                THEN 0.20
                WHEN UPPER(size) LIKE '%100%'                                THEN 0.10
                ELSE 0.75
            END,
        2) AS volume
    FROM gold_se_raw
    GROUP BY brand, description, size, classification, store
""")

sales_enriched_df = sales_enriched_df.repartition(4).cache()
sales_enriched_df.count()
write_gold(sales_enriched_df, "sales_enriched")

### Table 2 — `gold.product_profitability`
**Grain:** (brand, description, size, classification)  
**Visuals:** Top 10 Products by Profit, Top 10 by Margin, Scatter Plot, Brand slicer

In [None]:
product_profitability = spark.sql("""
    WITH base AS (
        SELECT
            brand,
            description,
            size,
            classification,
            SUM(sales_quantity)              AS total_units_sold,
            ROUND(SUM(sales_dollars), 2)     AS total_revenue,
            ROUND(SUM(profit_dollars), 2)    AS total_profit_dollars,
            ROUND(AVG(margin_pct), 2)        AS avg_margin_pct
        FROM gold_se_raw
        GROUP BY brand, description, size, classification
    )
    SELECT
        brand, description, size, classification,
        total_units_sold, total_revenue, total_profit_dollars, avg_margin_pct,
        CASE WHEN total_profit_dollars < 0 THEN true ELSE false END AS is_loss_maker,
        ROW_NUMBER() OVER (ORDER BY total_profit_dollars DESC)      AS rank_by_profit,
        ROW_NUMBER() OVER (ORDER BY avg_margin_pct DESC)            AS rank_by_margin
    FROM base
    WHERE total_units_sold > 0
""")

product_profitability = product_profitability.repartition(4).cache()
product_profitability.count()
write_gold(product_profitability, "product_profitability")

### Table 3 — `gold.brand_profitability`
**Grain:** brand  
**Visuals:** Top 10 Brands by Profit, Top 10 by Margin, Revenue vs Profit clustered bar

In [None]:
brand_profitability = spark.sql("""
    WITH base AS (
        SELECT
            brand,
            FIRST(classification)            AS classification,
            COUNT(DISTINCT description)      AS sku_count,
            SUM(sales_quantity)              AS total_units_sold,
            ROUND(SUM(sales_dollars), 2)     AS total_revenue,
            ROUND(SUM(profit_dollars), 2)    AS total_profit_dollars,
            ROUND(AVG(margin_pct), 2)        AS avg_margin_pct
        FROM gold_se_raw
        GROUP BY brand
    )
    SELECT
        brand, classification, sku_count,
        total_units_sold, total_revenue, total_profit_dollars, avg_margin_pct,
        CASE WHEN total_profit_dollars < 0 THEN true ELSE false END AS is_loss_maker,
        ROW_NUMBER() OVER (ORDER BY total_profit_dollars DESC)      AS rank_by_profit,
        ROW_NUMBER() OVER (ORDER BY avg_margin_pct DESC)            AS rank_by_margin
    FROM base
    WHERE total_units_sold > 0
""")

brand_profitability = brand_profitability.repartition(4).cache()
brand_profitability.count()
write_gold(brand_profitability, "brand_profitability")

### Table 4 — `gold.loss_makers`
**Grain:** (brand, description, size) where SUM(profit) < 0  
**Visual:** Loss-Making Items Table (Page 1) — all columns shown in table

In [None]:
loss_makers = spark.sql("""
    WITH sku_agg AS (
        SELECT
            'PRODUCT'                            AS level,
            brand,
            description,
            size,
            FIRST(classification)                AS classification,
            COUNT(DISTINCT store)                AS stores_stocking,
            SUM(sales_quantity)                  AS total_units_sold,
            ROUND(SUM(sales_dollars), 2)         AS total_revenue,
            ROUND(SUM(profit_dollars), 2)        AS total_profit_dollars,
            ROUND(AVG(margin_pct), 2)            AS avg_margin_pct
        FROM gold_se_raw
        GROUP BY brand, description, size
        HAVING SUM(profit_dollars) < 0
    )
    SELECT
        level, brand, description, size, classification,
        stores_stocking, total_units_sold, total_revenue,
        total_profit_dollars, avg_margin_pct,
        CASE
            WHEN avg_margin_pct < -10 THEN 'DROP — Severe loss, consider discontinuing'
            WHEN avg_margin_pct <   0 THEN 'REVIEW — Renegotiate cost or reprice'
            ELSE                           'MONITOR — Minor loss, watch trend'
        END AS recommendation
    FROM sku_agg
    ORDER BY total_profit_dollars ASC
""")

loss_makers = loss_makers.repartition(4).cache()
loss_makers.count()
write_gold(loss_makers, "loss_makers")

### Table 5 — `gold.sales_by_store`
**Grain:** store  
**Visual:** Map (bubble size=revenue, color=margin), Store slicer

In [None]:
sales_by_store = spark.sql("""
    SELECT
        store,
        COUNT(*)                             AS transaction_count,
        COUNT(DISTINCT brand)                AS unique_brands,
        COUNT(DISTINCT description)          AS unique_skus,
        SUM(sales_quantity)                  AS total_units_sold,
        ROUND(SUM(sales_dollars), 2)         AS total_revenue,
        ROUND(SUM(profit_dollars), 2)        AS total_profit_dollars,
        ROUND(AVG(margin_pct), 2)            AS avg_margin_pct
    FROM gold_se_raw
    GROUP BY store
    ORDER BY total_revenue DESC
""")

sales_by_store = sales_by_store.repartition(4).cache()
sales_by_store.count()
write_gold(sales_by_store, "sales_by_store")

### Table 6 — `gold.sales_time_series`
**Grain:** year + month (max ~12 rows)  
**Visual:** Monthly Trend combo chart (bars=revenue, line=profit, dashed=margin)

> **FIX:** Silver `sales` table has column `sales_date` (from `SalesDate` CSV header after normalize_columns).  
> All date functions below reference `sales_date` — aliased to `sale_year`/`sale_month` for Power BI.

In [None]:
sales_time_series = spark.sql("""
    SELECT
        YEAR(sales_date)                     AS sale_year,
        MONTH(sales_date)                    AS sale_month,
        DATE_FORMAT(sales_date, 'MMMM')      AS sale_month_name,
        ROUND(SUM(sales_dollars), 2)         AS monthly_revenue,
        ROUND(SUM(profit_dollars), 2)        AS monthly_profit,
        ROUND(AVG(margin_pct), 2)            AS avg_margin_pct,
        SUM(sales_quantity)                  AS total_units_sold,
        COUNT(DISTINCT brand)                AS active_brands,
        COUNT(DISTINCT store)                AS active_stores,
        COUNT(*)                             AS transaction_count
    FROM gold_se_raw
    WHERE sales_date IS NOT NULL
    GROUP BY YEAR(sales_date), MONTH(sales_date), DATE_FORMAT(sales_date, 'MMMM')
    ORDER BY sale_year, sale_month
""")

sales_time_series = sales_time_series.repartition(4).cache()
sales_time_series.count()
write_gold(sales_time_series, "sales_time_series")

### Table 7 — `gold.inventory_delta`
**Grain:** (brand, description, size) present in beg or end inventory  
**Visual:** Inventory Health Table (all columns displayed)

> Silver `beg_inventory`/`end_inventory` column is `price` (from `Price` CSV → normalize_columns)

In [None]:
inventory_delta = spark.sql("""
    WITH beg AS (
        SELECT brand, description, size,
               SUM(on_hand) AS beg_on_hand, AVG(price) AS beg_avg_cost
        FROM sv_beg_inv WHERE on_hand IS NOT NULL
        GROUP BY brand, description, size
    ),
    ends AS (
        SELECT brand, description, size,
               SUM(on_hand) AS end_on_hand, AVG(price) AS end_avg_cost
        FROM sv_end_inv WHERE on_hand IS NOT NULL
        GROUP BY brand, description, size
    ),
    vendor_map AS (
        SELECT brand, description, vendor_name,
               ROW_NUMBER() OVER (PARTITION BY brand, description ORDER BY brand) AS rn
        FROM sv_purchases WHERE vendor_name IS NOT NULL
    )
    SELECT
        COALESCE(b.brand, e.brand)              AS brand,
        COALESCE(b.description, e.description)  AS description,
        COALESCE(b.size, e.size)                AS size,
        COALESCE(vm.vendor_name, 'Unknown')     AS vendor_name,
        COALESCE(b.beg_on_hand, 0)              AS beg_on_hand,
        COALESCE(e.end_on_hand, 0)              AS end_on_hand,
        (COALESCE(e.end_on_hand, 0)
         - COALESCE(b.beg_on_hand, 0))          AS inventory_change,
        ROUND(
            (COALESCE(e.end_on_hand, 0) * COALESCE(e.end_avg_cost, 0))
          - (COALESCE(b.beg_on_hand, 0) * COALESCE(b.beg_avg_cost, 0)),
        2)                                      AS inventory_value_change,
        CASE
            WHEN COALESCE(e.end_on_hand, 0) = 0
             AND COALESCE(b.beg_on_hand, 0) > 0  THEN 'DEPLETED'
            WHEN COALESCE(e.end_on_hand, 0)
               > COALESCE(b.beg_on_hand, 0) * 1.2  THEN 'OVERSTOCKED'
            ELSE 'STABLE'
        END AS stock_status
    FROM beg b
    FULL OUTER JOIN ends e
        ON  CAST(b.brand AS STRING) = CAST(e.brand AS STRING)
        AND b.description           = e.description
        AND b.size                  = e.size
    LEFT JOIN vendor_map vm
        ON  CAST(COALESCE(b.brand, e.brand) AS STRING) = CAST(vm.brand AS STRING)
        AND COALESCE(b.description, e.description)     = vm.description
        AND vm.rn = 1
    WHERE COALESCE(b.beg_on_hand, 0) > 0
       OR COALESCE(e.end_on_hand, 0) > 0
""")

inventory_delta = inventory_delta.repartition(4).cache()
inventory_delta.count()
write_gold(inventory_delta, "inventory_delta")

### Table 8 — `gold.vendor_performance`
**Grain:** vendor_name  
**Visual:** Top 10 Vendors bar chart, Vendor slicer

In [None]:
vendor_performance = spark.sql("""
    SELECT
        vendor_name,
        COUNT(DISTINCT brand)                    AS brands_supplied,
        COUNT(DISTINCT description)              AS skus_supplied,
        ROUND(SUM(cost_per_unit * quantity), 2)  AS total_purchase_spend,
        ROUND(AVG(cost_per_unit), 2)             AS avg_cost_per_unit,
        ROUND(AVG(
            CASE
                WHEN receiving_date IS NOT NULL AND po_date IS NOT NULL
                THEN DATEDIFF(receiving_date, po_date)
                ELSE NULL
            END
        ), 1)                                    AS avg_lead_time_days,
        COUNT(*)                                 AS total_po_count
    FROM sv_purchases
    WHERE vendor_name IS NOT NULL
    GROUP BY vendor_name
    ORDER BY total_purchase_spend DESC
""")

vendor_performance = vendor_performance.repartition(4).cache()
vendor_performance.count()
write_gold(vendor_performance, "vendor_performance")

### Table 9 — `gold.size_analysis`
**Grain:** size  
**Visual:** Profit by Bottle Size column chart, Size slicer (Page 1)

In [None]:
size_analysis = spark.sql("""
    SELECT
        size,
        COUNT(DISTINCT description)          AS unique_skus,
        COUNT(DISTINCT brand)                AS unique_brands,
        SUM(sales_quantity)                  AS total_units_sold,
        ROUND(SUM(profit_dollars), 2)        AS total_profit_dollars,
        ROUND(AVG(margin_pct), 2)            AS avg_margin_pct,
        ROUND(AVG(sales_price), 2)           AS avg_selling_price
    FROM gold_se_raw
    WHERE size IS NOT NULL
    GROUP BY size
    ORDER BY total_profit_dollars DESC
""")

size_analysis = size_analysis.repartition(4).cache()
size_analysis.count()
write_gold(size_analysis, "size_analysis")

### Table 10 — `gold.classification_performance`
**Grain:** classification  
**Visual:** Treemap (size=revenue, color=margin), Classification slicer (Page 1)

In [None]:
classification_performance = spark.sql("""
    SELECT
        classification,
        COUNT(DISTINCT brand)                AS unique_brands,
        COUNT(DISTINCT description)          AS unique_skus,
        SUM(sales_quantity)                  AS total_units_sold,
        ROUND(SUM(sales_dollars), 2)         AS total_revenue,
        ROUND(SUM(profit_dollars), 2)        AS total_profit_dollars,
        ROUND(AVG(margin_pct), 2)            AS overall_margin_pct
    FROM gold_se_raw
    WHERE classification IS NOT NULL
    GROUP BY classification
    ORDER BY total_revenue DESC
""")

classification_performance = classification_performance.repartition(4).cache()
classification_performance.count()
write_gold(classification_performance, "classification_performance")

---
##  Summary

In [None]:
gold_tables = [
    "sales_enriched",
    "product_profitability",
    "brand_profitability",
    "loss_makers",
    "sales_by_store",
    "sales_time_series",
    "inventory_delta",
    "vendor_performance",
    "size_analysis",
    "classification_performance"
]

print("=" * 65)
print(" GOLD LAYER — FINAL SUMMARY")
print("=" * 65)
total_rows = 0
for t in gold_tables:
    try:
        df = spark.table(f"hive_metastore.annie_gold.{t}")
        n  = df.count()
        total_rows += n
        print(f"     {t:<35} → {n:>8,} rows | {len(df.columns)} cols")
    except Exception as ex:
        print(f"     {t:<35} → ERROR: {ex}")

print("-" * 65)
print(f"   Total rows across all Gold tables : {total_rows:,}")
print("=" * 65)
print(" Connect via: Databricks → Partner Connect → Power BI → SQL Warehouse")
print("   Schema: hive_metastore.annie_gold")