In [0]:
# 03_Gold_Aggregation.py - Aggregates Silver data for BI and saves to Gold Layer

from pyspark.sql.functions import col, count, min

# Persistence configuration: catalog / schema / table names for this stage.
# These values MUST MATCH the ones used in 01_Bronze and 02_Silver.
CATALOG_NAME = "workspace"
SCHEMA_NAME = "default"
SILVER_TABLE_NAME = "omdb_releases_silver"
GOLD_TABLE_NAME = "omdb_releases_gold"

# Fully qualified three-part names: <catalog>.<schema>.<table>
FULL_SILVER_PATH = ".".join((CATALOG_NAME, SCHEMA_NAME, SILVER_TABLE_NAME))
FULL_GOLD_PATH = ".".join((CATALOG_NAME, SCHEMA_NAME, GOLD_TABLE_NAME))

print(f"--- 1. READING THE SILVER LAYER from: {FULL_SILVER_PATH} ---")

# --- 1. LOAD (READS SILVER) ---
# Reads the cleaned data that was persisted by 02_Silver_Transformation.py.
# `spark` is the session object supplied by the notebook runtime.
try:
    df_silver = spark.read.table(FULL_SILVER_PATH)
except Exception as e:
    # Print operator-friendly diagnostics before failing.
    print(f"ERROR: Could not read Silver table '{FULL_SILVER_PATH}'.")
    print(f"Ensure 02_Silver_Transformation.py ran successfully. Error details: {e}")
    # Re-raise rather than calling dbutils.notebook.exit(): exit() ends the
    # notebook as a *completed* run, so a scheduled job would be reported as
    # successful even though no Gold data was produced. Propagating the
    # original exception marks the run as failed and keeps the full traceback.
    raise


print("--- 2. AGGREGATING FOR GOLD (Business KPI) ---")

# --- 2. AGGREGATION (GOLD) ---
# Produces one output row per (release_year, media_type) pair found in Silver.
# NOTE: `count` and `min` below are the pyspark.sql.functions versions imported
# at the top of the file; that import shadows Python's builtin `min`.

# Business Metric (KPI): Total number of releases
total_releases_expr = count("*").alias("total_releases")

# Audit Metric: Track when this group was first ingested
first_ingestion_expr = min("ingestion_timestamp").alias("first_ingestion_date")

releases_by_year_and_type = df_silver.groupBy("release_year", "media_type")
df_gold = (
    releases_by_year_and_type
    .agg(total_releases_expr, first_ingestion_expr)
    .orderBy(col("release_year").desc())  # most recent release years first
)

# count() is a Spark action: it executes the aggregation to obtain the number
# of grouped rows reported in the log line below.
gold_row_count = df_gold.count()
print(f"Gold DataFrame (Datamart) ready with {gold_row_count} aggregated rows.")

# --- 3. PERSISTENCE (SAVES GOLD) ---
# Replace the Gold table with the final aggregated datamart. The writer is
# configured for Delta format in overwrite mode, with overwriteSchema set so
# the table schema may be replaced together with the data.
gold_writer = (
    df_gold.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
)
gold_writer.saveAsTable(FULL_GOLD_PATH)

# Final status messages for the run log.
print(f"Gold Table (Datamart) '{FULL_GOLD_PATH}' saved successfully.")
print("The pipeline is complete and the data is ready for BI (SQL Warehouse).")