# Gold Layer: Dimension Products
This notebook creates the final `dim_products` table by joining cleaned CRM data with ERP product categories.
- **Key Logic**: Creates a Surrogate Key (`product_key`) and joins master data with category metadata.
- **Data Enrichment**: Attaches maintenance flags and subcategory names to products.
- **Schema Alignment**: Ensures all product attributes (line, cost, name) are consolidated.
- **Output**: Delta table `workspace.gold.dim_products`.

In [0]:
%python
import pyspark.sql.functions as F
from pyspark.sql.window import Window

# Define source and target paths
SILVER_CRM_PRD = "workspace.silver.crm_products"
SILVER_ERP_CAT = "workspace.silver.erp_product_category"
GOLD_TARGET    = "workspace.gold.dim_products"

In [0]:
%python
# 1. Integration: Joining Product Master data with Category Metadata
query = f"""
SELECT
    pn.product_id,
    pn.product_number,
    pn.product_name,
    pn.category_id,
    -- Using COALESCE to convert "null" to "Unknown" for better reporting
    COALESCE(pc.category, 'Unknown') as category,
    COALESCE(pc.subcategory, 'Unknown') as subcategory,
    -- Flag is False rather than null if there is no data
    COALESCE(pc.maintenance_flag, False) as maintenance_flag,
    pn.product_line,
    pn.product_cost,
    pn.start_date
FROM {SILVER_CRM_PRD} pn
LEFT JOIN {SILVER_ERP_CAT} pc
    ON pn.category_id = pc.category_id
"""

df_raw = spark.sql(query)

# 2. Cleansing: Sort by start_date descending to keep the latest record per product_number
df_clean = df_raw.orderBy(F.col("start_date").desc()) \
                 .dropDuplicates(["product_number"])

# 3. Surrogate Key Generation: Assigning a unique numeric key to each unique product
w = Window.orderBy("product_number")
df_gold = df_clean.withColumn("product_key", F.row_number().over(w)) \
                  .withColumn("gold_ingestion_ts", F.current_timestamp())

# 4. Final Load: Saving the results to the Gold layer
df_gold.write.mode("overwrite").format("delta").saveAsTable(GOLD_TARGET)

print(f"âœ… Gold table {GOLD_TARGET} successfully created with unique records.")
display(df_gold.limit(10))

## Data Quality
Verification of category joins and identification of missing source data.

In [0]:
%sql
-- Distribution check: How many products fall under 'Unknown'?
SELECT 
    category, 
    COUNT(*) as product_count 
FROM workspace.gold.dim_products 
GROUP BY ALL
ORDER BY product_count DESC;

In [0]:
%sql
-- Root Cause Analysis: Which specific IDs are missing from the ERP category table?
SELECT DISTINCT 
    category_id, 
    product_name 
FROM workspace.gold.dim_products 
WHERE category = 'Unknown';