In [0]:
import uuid
from pyspark.sql import functions as F

pipeline = "gold"
# Unique id that ties this run's STARTED row to its later SUCCESS/FAILED update.
run_id = str(uuid.uuid4())

silver_table = "etl_demo.silver.customer_dim"

spark.sql("CREATE SCHEMA IF NOT EXISTS etl_demo.gold")
spark.sql("CREATE SCHEMA IF NOT EXISTS etl_demo.control")

# Ensure the run log exists so a first run on a fresh catalog does not fail on
# the INSERT below (which runs before the try/except can record anything).
# Column order and types mirror that positional INSERT; if the table already
# exists this statement is a no-op.
spark.sql("""
CREATE TABLE IF NOT EXISTS etl_demo.control.run_log (
  pipeline      STRING,
  run_id        STRING,
  snapshot_date DATE,
  started_at    TIMESTAMP,
  finished_at   TIMESTAMP,
  status        STRING,
  rows_written  BIGINT,
  message       STRING
)
USING DELTA
""")

# STARTED log (SQL-only to avoid type inference issues). Every NULL is cast
# explicitly so the row matches the run_log column types.
spark.sql(f"""
INSERT INTO etl_demo.control.run_log
SELECT
  '{pipeline}' AS pipeline,
  '{run_id}'   AS run_id,
  CAST(NULL AS DATE) AS snapshot_date,
  current_timestamp() AS started_at,
  CAST(NULL AS TIMESTAMP) AS finished_at,
  'STARTED' AS status,
  CAST(NULL AS BIGINT) AS rows_written,
  CAST(NULL AS STRING) AS message
""")

try:
    # 1) Gold view exposing only the silver rows flagged is_current.
    spark.sql(f"""
    CREATE OR REPLACE VIEW etl_demo.gold.customer_current AS
    SELECT customer_id, name, city, email, valid_from
    FROM {silver_table}
    WHERE is_current = true
    """)

    # 2) The most recent valid_from in silver is used as the KPI date.
    s = spark.table(silver_table)
    latest_date = s.select(F.max("valid_from").alias("d")).collect()[0]["d"]
    if latest_date is None:
        # max() over an empty table (or all-NULL valid_from) is NULL. Fail with a
        # clear message instead of filtering every KPI on a NULL date and
        # building DATE('None') in the SUCCESS update below.
        raise ValueError(
            f"{silver_table} has no valid_from values; cannot compute gold KPIs"
        )

    # KPI table: keyed by kpi_date (see the MERGE below).
    spark.sql("""
    CREATE TABLE IF NOT EXISTS etl_demo.gold.customer_daily_kpi (
      kpi_date         DATE,
      active_customers BIGINT,
      new_customers    BIGINT,
      changed_customers BIGINT,
      closed_records   BIGINT,
      run_time         TIMESTAMP
    )
    USING DELTA
    """)

    # Rows currently flagged as active.
    active_customers = s.filter(F.col("is_current") == True).count()

    # Earliest valid_from per customer; "new" = first seen on latest_date.
    first_dates = s.groupBy("customer_id").agg(F.min("valid_from").alias("first_valid_from"))
    new_customers = first_dates.filter(F.col("first_valid_from") == F.lit(latest_date)).count()

    # "Changed" = customers with a version starting on latest_date whose first
    # appearance is earlier (i.e. existing customers, not new ones).
    changed_customers = (
        s.filter(F.col("valid_from") == F.lit(latest_date))
         .join(first_dates, on="customer_id", how="inner")
         .filter(F.col("first_valid_from") < F.lit(latest_date))
         .select("customer_id").distinct()
         .count()
    )

    # Versions whose valid_to equals latest_date.
    closed_records = s.filter(F.col("valid_to") == F.lit(latest_date)).count()

    kpi_df = spark.createDataFrame(
        [(latest_date, active_customers, new_customers, changed_customers, closed_records)],
        ["kpi_date", "active_customers", "new_customers", "changed_customers", "closed_records"]
    ).withColumn("run_time", F.current_timestamp())

    kpi_df.createOrReplaceTempView("kpi_src")

    # Upsert on kpi_date so re-running for the same date overwrites the row
    # instead of adding a duplicate.
    spark.sql("""
    MERGE INTO etl_demo.gold.customer_daily_kpi AS tgt
    USING kpi_src AS src
    ON tgt.kpi_date = src.kpi_date
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
    """)

    # SUCCESS update (pipeline/run_id/latest_date are produced by this script,
    # not external input, so interpolating them is safe).
    spark.sql(f"""
    UPDATE etl_demo.control.run_log
    SET
      snapshot_date = DATE('{latest_date}'),
      finished_at = current_timestamp(),
      status = 'SUCCESS',
      rows_written = 1,
      message = 'gold view + kpi upsert ok'
    WHERE pipeline = '{pipeline}' AND run_id = '{run_id}'
    """)

except Exception as e:
    # Record the failure without splicing the exception text into SQL: error
    # messages can contain quotes and backslashes, which Spark SQL string
    # literals treat specially. Passing the text through a one-row DataFrame
    # with an explicit schema keeps it as data (and avoids type inference).
    try:
        spark.createDataFrame(
            [(pipeline, run_id, str(e)[:1500])],
            "pipeline STRING, run_id STRING, message STRING",
        ).createOrReplaceTempView("run_log_failure_src")

        spark.sql("""
        MERGE INTO etl_demo.control.run_log AS tgt
        USING run_log_failure_src AS src
        ON tgt.pipeline = src.pipeline AND tgt.run_id = src.run_id
        WHEN MATCHED THEN UPDATE SET
          finished_at = current_timestamp(),
          status = 'FAILED',
          rows_written = 0,
          message = src.message
        """)
    except Exception as log_err:
        # Never let a logging problem mask the original pipeline error.
        print("WARNING: could not record FAILED status in run_log:", log_err)

    # Bare raise re-raises `e` (the inner handler has already exited).
    raise

else:
    # Kept out of the try body so a console/encoding error here cannot
    # overwrite an already-recorded SUCCESS with FAILED.
    print("✅ GOLD done. Latest date:", latest_date)
