In [0]:
"""
04_gold_analytics.py

Purpose:
- Aggregate Silver sensor measurements
- Produce daily location-level air quality metrics and a latest-reading snapshot per sensor
- Create an analytics-ready Gold Delta table for analytics by Python engineers
- Create an analytics-ready Gold Delta table for dashboard analytics

Input:
- air_quality_silver.sensor_measurements

Output:
- air_quality_gold.daily_air_quality
- air_quality_gold.latest_sensor_snapshot
"""

# Databricks notebooks do not automatically include the project root in PYTHONPATH.
# To enable imports from src/, each notebook adds the project root to sys.path at runtime
import sys, os
sys.path.append(os.path.abspath(".."))

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
from pyspark.sql.functions import (
    col,
    to_date,
    avg,
    count,
    countDistinct,
    when,
    max as spark_max
)

# Make sure the Gold database is present before any Gold table is written
spark.sql("CREATE DATABASE IF NOT EXISTS air_quality_gold")

# Load the Silver measurements that feed both Gold outputs
silver_df = spark.read.table("air_quality_silver.sensor_measurements")

# Daily data frame for python analytics.
# Gold consumes only physically plausible (quality_flag = 'OK') measurements,
# restricted to the P1 / P2 measurement types.
is_pm_reading = col("measurement_type").isin("P1", "P2")
is_flagged_ok = col("quality_flag") == "OK"

pm_ok_df = (
    silver_df
    .filter(is_pm_reading & is_flagged_ok)
    # Calendar date of each reading, derived from its measurement timestamp
    .withColumn("date", to_date(col("measurement_ts")))
)

# Aggregate the valid PM readings to one row per day and location
def _pm_value(kind):
    """Return measurement_value for rows of the given measurement_type, NULL otherwise."""
    return when(col("measurement_type") == kind, col("measurement_value"))


location_day_keys = ["date", "location_id", "country", "latitude", "longitude"]

gold_daily_df = (
    pm_ok_df
    .groupBy(*location_day_keys)
    .agg(
        # P1 is reported as PM10, P2 as PM2.5; avg/count skip the NULLs
        # produced by _pm_value for rows of the other measurement type
        avg(_pm_value("P1")).alias("pm10_avg"),
        avg(_pm_value("P2")).alias("pm25_avg"),
        count(_pm_value("P1")).alias("pm10_count"),
        count(_pm_value("P2")).alias("pm25_count"),
        countDistinct("sensor_id").alias("sensors"),
        # Number of valid PM records contributing to this aggregate
        count("*").alias("measurements"),
    )
    # Defensive filter for empty days: drop rows where both pm10_avg and
    # pm25_avg are NULL
    .filter(col("pm10_avg").isNotNull() | col("pm25_avg").isNotNull())
)

# Latest-snapshot data frame for dashboard analytics.
# Gold consumes only physically plausible (quality_flag = 'OK') measurements.
pm_snapshot_df = (
    silver_df
    .filter(col("measurement_type").isin("P1", "P2"))
    .filter(col("quality_flag") == "OK")
)

# Columns that identify one reading of one sensor
snapshot_keys = [
    "sensor_id",
    "sensor_type",
    "location_id",
    "country",
    "latitude",
    "longitude",
    "measurement_ts",
    "ingested_at",
]

# Pivot the PM values into columns (P1 -> pm10, P2 -> pm25) for the dashboard
pivoted_pm = (
    pm_snapshot_df
    .groupBy(*snapshot_keys)
    .pivot("measurement_type", ["P1", "P2"])
    .agg(spark_max("measurement_value"))
)

pm_pivot_df = (
    pivoted_pm
    .withColumnRenamed("P1", "pm10")
    .withColumnRenamed("P2", "pm25")
    # Keep only rows that carry at least one PM value
    .filter(col("pm10").isNotNull() | col("pm25").isNotNull())
)

# Snapshot step: rank each sensor's rows newest-first. ingested_at is the
# secondary sort key, so a single row is still picked when two records share
# the same measurement_ts.
newest_first = [col("measurement_ts").desc(), col("ingested_at").desc()]
window_spec = Window.partitionBy("sensor_id").orderBy(*newest_first)

latest_df = (
    pm_pivot_df
    .withColumn("rn", row_number().over(window_spec))
    .filter(col("rn") == 1)   # keep the top-ranked (latest) row per sensor
    .drop("rn")
    # Add the date of the latest reading to the snapshot
    .withColumn("date", to_date(col("measurement_ts")))
)

# Write the daily Gold table used for python data analytics
GOLD_TABLE_DAILY = "air_quality_gold.daily_air_quality"

# Full overwrite on every run; overwriteSchema allows the table schema to be
# replaced together with the data.
(
    gold_daily_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(GOLD_TABLE_DAILY)
)

# Count rows by reading back the table that was just written. The constant is
# reused so the count can never point at a different table than the write.
gold_rows_python = spark.read.table(GOLD_TABLE_DAILY).count()

# Sanity check 1: Daily data for python data analytics (100 most recent rows)
display(
    spark.sql(
        f"""
        SELECT *
        FROM {GOLD_TABLE_DAILY}
        ORDER BY date DESC
        LIMIT 100
        """
    )
)

# Write the Gold latest-snapshot table used for dashboard analytics
GOLD_TABLE_SNAPSHOT = "air_quality_gold.latest_sensor_snapshot"

# Full overwrite on every run; overwriteSchema allows the table schema to be
# replaced together with the data.
(
    latest_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(GOLD_TABLE_SNAPSHOT)
)

# Count rows by reading back the table that was just written. The constant is
# reused so the count can never point at a different table than the write.
gold_rows_dashboard = spark.read.table(GOLD_TABLE_SNAPSHOT).count()

# Sanity check 2: Latest snapshot data used by the dashboard analytics
# (row count, mean pm25, and most recent ingest time)
display(
    spark.sql(
        f"""
        SELECT
            COUNT(*) AS sensors,
            AVG(pm25) AS avg_pm25,
            MAX(ingested_at) AS last_ingest
        FROM {GOLD_TABLE_SNAPSHOT}
        """
    )
)

# Hand the row counts of both Gold tables back to the orchestration as the
# notebook's exit value
exit_message = (
    f"Gold aggregation python sanity check completed: {gold_rows_python} records for python produced \n"
    f"Gold aggregation for the dashboard completed: {gold_rows_dashboard} records for the dashboard produced"
)
dbutils.notebook.exit(exit_message)



date,location_id,country,latitude,longitude,pm10_avg,pm25_avg,pm10_count,pm25_count,sensors,measurements
2026-02-03,462,DE,48.758,9.256,12.4,7.6,1,1,1,2
2026-02-03,46518,DE,48.842,8.926,39.335,23.325,2,2,1,4
2026-02-03,262,DE,50.684,13.02,22.17,12.665,2,2,1,4
2026-02-03,461,DE,51.436,7.312,18.45,9.3,1,1,1,2
2026-02-03,701,DE,52.526,13.408,18.135,7.025,2,2,1,4
2026-02-03,684,DE,48.342,10.096,43.0,22.36,2,2,1,4
2026-02-03,150,DE,52.352,14.058,22.485,14.11,2,2,1,4
2026-02-03,259,DE,53.304,9.816,8.865,6.515000000000001,2,2,1,4
2026-02-03,16074,BG,42.708,23.312,16.505000000000003,9.335,2,2,1,4
2026-02-03,460,DE,48.734,9.152,0.79,0.6100000000000001,2,2,1,4


sensors,avg_pm25,last_ingest
11637,14.2236581373309,2026-02-03T12:01:37.928Z
