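# 🪙 Gold Layer
## Rolling Daily Aggregations for Weather and Air Quality

**Purpose:**  
This notebook reads the daily **Silver** weather and air-quality tables and:
- Computes rolling mean/max values per city (next 7 days including the current day, and the previous 7, 30 and 90 days) for the climate index and the AQI
- Adds a category and a color label for every rolling value
- Performs idempotent upserts (Delta `MERGE` on `city`, `data_timestamp`) into the **Gold** tables, tracking a per-source watermark in `source_last_processed_ts`

It also updates the `pipeline_log` table for every execution — marking both **success** and **failure**.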

## 🧰 Step 1: Setup and Imports


In [0]:
from datetime import datetime, timezone
from pyspark.sql import SparkSession, Window, functions as F, types as T
from delta.tables import DeltaTable

# Return the already-active Spark session if there is one, otherwise create a new one.
spark = SparkSession.builder.getOrCreate()

## ⚙️ Step 2: Configuration — Table Names and Globals

In [0]:
catalog = "env_catalog"
schema = "env_data"

# Every table is addressed by a three-part name: <catalog>.<schema>.<table>

# Metadata / bookkeeping tables
city_master_tbl = ".".join((catalog, schema, "city_master"))
pipeline_log_tbl = ".".join((catalog, schema, "pipeline_log"))
last_processed_tbl = ".".join((catalog, schema, "source_last_processed_ts"))

# Silver inputs (only the daily tables are read by this notebook)
silver_weather_daily_tbl = ".".join((catalog, schema, "silver_weather_daily"))
silver_air_daily_tbl = ".".join((catalog, schema, "silver_air_daily"))

# Gold outputs
gold_weather_daily_tbl = ".".join((catalog, schema, "gold_weather_daily"))
gold_air_daily_tbl = ".".join((catalog, schema, "gold_air_daily"))

In [0]:
pipeline_name = "AGGREGATE"
# TODO: Determine if Manual run or Scheduled run
triggered_by = "Manual"  # Can be Scheduled/Event driven if automated
run_type = "DAILY"
start_ts = datetime.now(timezone.utc)
# Reuse a run_id already present in the session (presumably supplied by a calling
# notebook, e.g. via %run); otherwise derive one from run type, pipeline and start time.
# NOTE(review): re-running this cell in the same session also reuses the previous
# run_id, which inserts a second pipeline_log row with the same id.
run_id = globals().get("run_id", f"{run_type}_{pipeline_name.upper()}_{start_ts.strftime('%Y%m%d_%H%M%S')}")
status = "RUNNING"
remarks = "Gold Aggregation layer job started"


def sql_literal(value) -> str:
    """
    Render ``value`` as a Spark SQL string literal.

    Backslashes and single quotes are escaped with a backslash (the default
    Spark SQL string-literal escaping), so values such as an externally supplied
    run_id cannot break the generated statement.
    """
    return "'" + str(value).replace("\\", "\\\\").replace("'", "\\'") + "'"


# Initial pipeline log entry (status RUNNING); failures are reported but not raised.
try:
    spark.sql(f"""
        INSERT INTO {pipeline_log_tbl}
        (run_id, pipeline_name, run_type, start_time, status, triggered_by, remarks, created_ts)
        VALUES ({sql_literal(run_id)}, {sql_literal(pipeline_name)}, {sql_literal(run_type)}, TIMESTAMP {sql_literal(start_ts)}, {sql_literal(status)}, {sql_literal(triggered_by)}, {sql_literal(remarks)}, current_timestamp())
    """)
    print(f"🪶 Created pipeline_log entry for run_id={run_id}")
except Exception as e:
    print("⚠️ Could not log start in pipeline_log:", e)

🪶 Created pipeline_log entry for run_id=DAILY_AGGREGATE_20251113_095439


## 🧩 Step 3: Utility Functions

In [0]:
def table_exists(name: str) -> bool:
    """
    Return True when ``name`` resolves to a readable table.

    The catalog API is asked first. If it raises, or answers False, a zero-row
    read of the table (``limit(0).count()``) is attempted as a fallback; any
    failure of that fallback is reported as "does not exist".
    """
    try:
        found_in_catalog = spark.catalog.tableExists(name)
    except Exception:
        found_in_catalog = False
    if found_in_catalog:
        return True

    try:
        spark.table(name).limit(0).count()
    except Exception:
        return False
    return True
    
def ensure_table(name: str, schema_sdf, partition_col=None):
    """
    Create the Delta table ``name`` from the schema of ``schema_sdf`` when it is missing.

    Only the structure is written (``schema_sdf.limit(0)``); no rows are copied.
    ``partition_col``, when given, is passed to ``partitionBy``. An existing table
    is left untouched.
    """
    if not table_exists(name):
        writer = (
            schema_sdf.limit(0)
            .write.format("delta")
            .mode("overwrite")
            .option("overwriteSchema", "true")
        )
        if partition_col:
            writer = writer.partitionBy(partition_col)
        writer.saveAsTable(name)
        print(f"Created table {name}")

def update_pipeline_log(status, remarks, records_processed=0, earliest_ts=None, latest_ts=None):
    """
    Update this run's row in ``pipeline_log_tbl`` (matched on the global ``run_id``).

    Sets ``end_time`` to the current timestamp, overwrites ``status`` and ``remarks``,
    adds ``records_processed`` to the stored count, keeps an already stored
    ``earliest_ts``, and stores ``latest_ts`` when one is given.

    Args:
      status            : new status string (e.g. "SUCCESS" / "FAILED")
      remarks           : free-text remark; quotes are escaped so it cannot break the SQL
      records_processed : amount added to the stored count (None is treated as 0)
      earliest_ts       : written only when the row has no earliest_ts yet
      latest_ts         : value to store; None leaves the stored value unchanged

    Errors are printed rather than raised, so logging problems never abort the pipeline.
    """
    def _str_literal(value):
        # Spark SQL string literal with backslash escaping of \ and ' (assumes the
        # default spark.sql.parser.escapedStringLiterals=false).
        return "'" + str(value).replace("\\", "\\\\").replace("'", "\\'") + "'"

    def _ts_literal(value):
        # Typed NULL keeps COALESCE type resolution unambiguous.
        return f"TIMESTAMP {_str_literal(value)}" if value else "CAST(NULL AS TIMESTAMP)"

    try:
        earliest_expr = _ts_literal(earliest_ts)
        latest_expr = _ts_literal(latest_ts)
        added_records = int(records_processed or 0)
        spark.sql(f"""
            UPDATE {pipeline_log_tbl}
            SET end_time = current_timestamp(),
                status = {_str_literal(status)},
                records_processed = COALESCE(records_processed, 0) + {added_records},
                earliest_ts = COALESCE(earliest_ts, {earliest_expr}),
                latest_ts = COALESCE({latest_expr}, latest_ts),
                remarks = {_str_literal(remarks)}
            WHERE run_id = {_str_literal(run_id)}
        """)
        print(f"✅ pipeline_log updated: {status}")
    except Exception as e:
        print("⚠️ Could not update pipeline_log:", e)

## 🧼 Step 4: Rolling Aggregation and Category/Color Enrichment Utilities

In [0]:
from pyspark.sql import functions as F

def get_rolling_agg(df, metric_col):
    """
    Append rolling mean/max aggregates of ``metric_col`` to every row of ``df``.

    A row's observation day is ``to_date(data_timestamp)``. For each row, the
    aggregates are taken over the rows of the SAME city whose observation day
    falls inside a window relative to that day:

      future7 : day   .. day+6   (the row's own day is included)
      past7   : day-7 .. day-1
      past30  : day-30 .. day-1
      past90  : day-90 .. day-1

    Means and maxima are rounded to 2 decimals. NULL metric values are ignored;
    a window without any non-NULL value yields NULL.

    Implemented with range-framed window functions (one sort per city
    partition) rather than non-equi self-joins, whose cost grows quadratically
    with the number of days per city.

    Input:
      df         : Spark DataFrame containing at least city, data_timestamp and metric_col
      metric_col : name of the metric column (e.g. "aqi_value", "climate_index")

    Output:
      DataFrame with all original df columns (same order, same row count) followed by:
        mean_future7, max_future7,
        mean_past7,   max_past7,
        mean_past30,  max_past30,
        mean_past90,  max_past90,
        agg_ts        (current timestamp of the query)
      Rows whose city or observation day is NULL get NULL in every appended column.

    Notes:
      - Several rows for the same (city, day) all receive identical aggregates.
    """
    orig_cols = df.columns

    # Integer day number so range frames can be expressed directly in days.
    day_col = "_rolling_day_num"
    with_day = df.withColumn(
        day_col,
        F.datediff(F.to_date(F.col("data_timestamp")), F.to_date(F.lit("1970-01-01"))),
    )
    by_city_day = Window.partitionBy("city").orderBy(F.col(day_col))

    # A row without a city or day has no well-defined window: keep its aggregates NULL.
    has_key = F.col("city").isNotNull() & F.col(day_col).isNotNull()

    # (suffix, frame start offset, frame end offset) in days, both ends inclusive.
    frames = [
        ("future7", 0, 6),
        ("past7", -7, -1),
        ("past30", -30, -1),
        ("past90", -90, -1),
    ]

    metric = F.col(metric_col)
    agg_exprs = []
    for suffix, start, end in frames:
        frame = by_city_day.rangeBetween(start, end)
        agg_exprs.append(F.when(has_key, F.round(F.avg(metric).over(frame), 2)).alias(f"mean_{suffix}"))
        agg_exprs.append(F.when(has_key, F.round(F.max(metric).over(frame), 2)).alias(f"max_{suffix}"))
    agg_exprs.append(F.when(has_key, F.current_timestamp()).alias("agg_ts"))

    # Original columns first (helper day column dropped), aggregates appended.
    return with_day.select(*[F.col(c) for c in orig_cols], *agg_exprs)
    
def aqi_category_and_color(df):
    """
    Label each rolling AQI column with a category and a display color.

    Expects ``df`` to already contain the rolling columns produced by the
    rolling aggregation (mean_/max_ for future7, past7, past30, past90).
    For every such column ``c`` two string columns are appended:
      - aqi_<c>_cat   : Good / Moderate / Unhealthy for Sensitive Groups /
                        Unhealthy / Very Unhealthy / Hazardous
      - aqi_<c>_color : Green / Yellow / Orange / Red / Purple / Maroon
    NULL inputs produce NULL labels. Band upper bounds are inclusive.
    """
    # (inclusive upper bound, category, color), checked in ascending order.
    aqi_bands = [
        (50, "Good", "Green"),
        (100, "Moderate", "Yellow"),
        (150, "Unhealthy for Sensitive Groups", "Orange"),
        (200, "Unhealthy", "Red"),
        (300, "Very Unhealthy", "Purple"),
    ]

    def _banded(col_name, position, fallback):
        # position 1 -> category, position 2 -> color
        value = F.col(col_name)
        expr = F.when(value.isNull(), None)
        for band in aqi_bands:
            expr = expr.when(value <= band[0], band[position])
        return expr.otherwise(fallback)

    rolling_cols = [
        "mean_future7", "mean_past7", "mean_past30", "mean_past90",
        "max_future7", "max_past7", "max_past30", "max_past90",
    ]

    labelled = df
    for col_name in rolling_cols:
        labelled = labelled.withColumn(f"aqi_{col_name}_cat", _banded(col_name, 1, "Hazardous"))
        labelled = labelled.withColumn(f"aqi_{col_name}_color", _banded(col_name, 2, "Maroon"))
    return labelled

def climate_category_and_color(df):
    """
    Label each rolling climate-index column with a category and a display color.

    Expects ``df`` to already contain the rolling columns produced by the
    rolling aggregation (mean_/max_ for future7, past7, past30, past90).
    For every such column ``c`` two string columns are appended:
      - clim_<c>_cat   : Excellent / Good / Moderate / Fair / Poor
      - clim_<c>_color : Green / LightGreen / Yellow / Orange / Red
    NULL inputs produce NULL labels. Band lower bounds are inclusive.
    """
    # (inclusive lower bound, category, color), checked in descending order.
    climate_bands = [
        (81, "Excellent", "Green"),
        (61, "Good", "LightGreen"),
        (41, "Moderate", "Yellow"),
        (21, "Fair", "Orange"),
    ]

    def _banded(col_name, position, fallback):
        # position 1 -> category, position 2 -> color
        value = F.col(col_name)
        expr = F.when(value.isNull(), None)
        for band in climate_bands:
            expr = expr.when(value >= band[0], band[position])
        return expr.otherwise(fallback)

    rolling_cols = [
        "mean_future7", "mean_past7", "mean_past30", "mean_past90",
        "max_future7", "max_past7", "max_past30", "max_past90",
    ]

    labelled = df
    for col_name in rolling_cols:
        labelled = labelled.withColumn(f"clim_{col_name}_cat", _banded(col_name, 1, "Poor"))
        labelled = labelled.withColumn(f"clim_{col_name}_color", _banded(col_name, 2, "Red"))
    return labelled

## ⚡ Step 5: Read and Commit the Last Processed `transform_ts` (Watermark) per Source Table

In [0]:
def read_last_processed_load_ts(source_table: str):
    """
    Return the stored ``last_processed_load_ts`` for ``source_table``.

    Returns None when the watermark table does not exist, has no row for
    ``source_table``, or cannot be read.
    """
    if not table_exists(last_processed_tbl):
        return None
    try:
        hit = (
            spark.table(last_processed_tbl)
            .where(F.col("source_table") == source_table)
            .select("last_processed_load_ts")
            .first()
        )
        return None if hit is None else hit["last_processed_load_ts"]
    except Exception:
        return None

def commit_last_processed_load_ts(source_table: str, last_ts):
    """
    Upsert ``last_ts`` as the watermark of ``source_table`` (call after a successful merge).

    A None ``last_ts`` is ignored. If the watermark table does not exist yet it is
    created from this single row; otherwise the row is MERGEd on ``source_table``.
    ``updated_ts`` is stamped with the current UTC time.
    """
    if last_ts is None:
        return

    watermark_row = (source_table, last_ts, datetime.now(timezone.utc))
    staging = spark.createDataFrame(
        [watermark_row],
        schema="source_table STRING, last_processed_load_ts TIMESTAMP, updated_ts TIMESTAMP",
    )

    if table_exists(last_processed_tbl):
        (
            DeltaTable.forName(spark, last_processed_tbl).alias("t")
            .merge(staging.alias("s"), "t.source_table = s.source_table")
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute()
        )
        print(f"Updated last processed timestamp for {source_table}: {last_ts}")
    else:
        staging.write.format("delta").mode("overwrite").saveAsTable(last_processed_tbl)

## 🔄 Step 6: Merge Incremental Data into Gold Tables

In [0]:
# MERGE on (city, data_timestamp)
def merge_into_gold(target_table: str, staging_sdf, merge_keys=("city", "data_timestamp"), exclude_update_cols=None, partition_col=None):
    """
    Idempotently upsert ``staging_sdf`` into the Delta table ``target_table`` via MERGE.

    Steps:
      1. Coerce ``data_timestamp`` / ``transform_ts`` (when present) to TIMESTAMP.
      2. Create the target from the staging schema if it does not exist.
      3. Add any new staging columns to the target (zero-row append with mergeSchema).
      4. MERGE on ``merge_keys``: matched rows update every non-key, non-excluded
         column; unmatched rows are inserted with all staging columns.

    Args:
      target_table        : fully-qualified target table name
      staging_sdf         : staging DataFrame; None skips the merge
      merge_keys          : columns forming the match condition (tuple or list)
      exclude_update_cols : columns left untouched on matched rows
      partition_col       : partition column(s), used only when creating the target

    Returns:
      (rows_processed, min_data_ts, max_data_ts, max_load_ts). The timestamps are None
      when the corresponding column is absent or the staging set is empty;
      max_load_ts is the maximum ``transform_ts``.

    Raises:
      ValueError: if no columns remain to update after removing keys and exclusions.
    """
    if staging_sdf is None:
        print(f"Skipping {target_table} - no staging")
        return 0, None, None, None

    # Accept lists as well as tuples for the key columns.
    merge_keys = tuple(merge_keys)

    # Coerce timestamp columns so the target schema and MERGE comparisons use TIMESTAMP.
    for ts_col in ("data_timestamp", "transform_ts"):
        if ts_col in staging_sdf.columns:
            staging_sdf = staging_sdf.withColumn(ts_col, F.to_timestamp(ts_col))

    ensure_table(target_table, staging_sdf, partition_col=partition_col)

    cols = staging_sdf.columns
    not_updated = set(merge_keys) | set(exclude_update_cols or ())
    update_cols = [c for c in cols if c not in not_updated]
    if not update_cols:
        raise ValueError("No columns to update in MERGE")

    on_clause = " AND ".join(f"t.`{k}` = s.`{k}`" for k in merge_keys)
    update_set = ", ".join(f"t.`{c}` = s.`{c}`" for c in update_cols)
    insert_cols = ", ".join(f"`{c}`" for c in cols)
    insert_vals = ", ".join(f"s.`{c}`" for c in cols)

    view = f"temp_{target_table.split('.')[-1]}_{int(datetime.now().timestamp())}"
    staging_sdf.createOrReplaceTempView(view)
    merge_sql = f"""
    MERGE INTO {target_table} t
    USING {view} s
    ON {on_clause}
    WHEN MATCHED THEN UPDATE SET {update_set}
    WHEN NOT MATCHED THEN INSERT ({insert_cols}) VALUES ({insert_vals})
    """
    try:
        # Schema-evolution workaround: spark.databricks.delta.schema.autoMerge.enabled
        # is not settable in this environment, so append zero rows with mergeSchema=true
        # to add new staging columns to the target before the MERGE references them.
        (
            staging_sdf.limit(0).write.format("delta")
            .mode("append")
            .option("mergeSchema", "true")
            .saveAsTable(target_table)
        )
        print(f"Executing MERGE into {target_table} ...")
        spark.sql(merge_sql)
    finally:
        # Do not leave per-run temp views behind in the session.
        spark.catalog.dropTempView(view)

    # Collect all bookkeeping values in ONE action: staging plans can be expensive
    # (e.g. rolling windows), and each separate action would recompute them.
    has_data_ts = "data_timestamp" in cols
    has_transform_ts = "transform_ts" in cols
    stat_exprs = [F.count(F.lit(1)).alias("recs")]
    if has_data_ts:
        stat_exprs += [F.min("data_timestamp").alias("min_dt"), F.max("data_timestamp").alias("max_dt")]
    if has_transform_ts:
        stat_exprs.append(F.max("transform_ts").alias("max_lt"))
    stats = staging_sdf.agg(*stat_exprs).collect()[0]

    recs = stats["recs"]
    min_data_ts = stats["min_dt"] if has_data_ts else None
    max_data_ts = stats["max_dt"] if has_data_ts else None
    max_load_ts = stats["max_lt"] if has_transform_ts else None

    print(f"MERGE completed for {target_table}: {recs} rows, data_ts range=({min_data_ts},{max_data_ts}), max_load_ts={max_load_ts}")
    return recs, min_data_ts, max_data_ts, max_load_ts

## 🌤️ Step 7: Create Gold layer Weather Daily Aggregations

In [0]:
# Run-wide accumulators, filled by the aggregation cells below and written to pipeline_log at the end.
total_processed, agg_min_data_ts, agg_max_data_ts = 0, None, None

In [0]:
# Build Gold Daily aggregates (rolling Climate index)
try:
    src = silver_weather_daily_tbl
    if not table_exists(src):
        print("No daily weather table found in Silver layer; skipping daily aggregation.")
    else:
        last_load = read_last_processed_load_ts(src)
        print(f"Weather Daily: last processed transform_ts = {last_load}")

        # Full Silver history, restricted to the columns Gold needs
        # (dim_key, city, data_timestamp, transform_ts, load_timestamp, climate_index as DOUBLE).
        # Rows without a climate_index cannot contribute to any window and are dropped.
        wdf_history = (
            spark.table(src)
            .select(
                F.col("dim_key"),
                F.col("city"),
                F.to_timestamp("data_timestamp").alias("data_timestamp"),
                F.to_timestamp("transform_ts").alias("transform_ts"),
                F.to_timestamp("load_timestamp").alias("load_timestamp"),
                F.col("climate_index").cast("double").alias("climate_index"),
            )
            .filter(F.col("climate_index").isNotNull())
            .withColumn("run_id", F.lit(run_id))
        )

        # Rolling climate-index windows (future 7, past 7/30/90 days) must see the FULL
        # history: computing them on the incremental batch alone would make the windows
        # ignore rows loaded in earlier runs.
        wdf_rolling = get_rolling_agg(wdf_history, metric_col="climate_index")
        wdf_rolling = climate_category_and_color(wdf_rolling)

        if last_load:
            # A new/changed Silver row dated D alters the future-7 window of rows dated
            # D-6..D and the past-7/30/90 windows of rows dated D+1..D+90, so re-merge
            # that whole span per city, not just the changed rows.
            changed_span = (
                wdf_history
                .filter(F.col("transform_ts") > F.lit(last_load))
                .groupBy("city")
                .agg(
                    F.min(F.to_date("data_timestamp")).alias("_chg_first_day"),
                    F.max(F.to_date("data_timestamp")).alias("_chg_last_day"),
                )
            )
            rolling_cols = wdf_rolling.columns
            wdf_rolling = (
                wdf_rolling
                .join(changed_span, on="city", how="inner")
                .filter(
                    F.to_date("data_timestamp").between(
                        F.date_add(F.col("_chg_first_day"), -6),
                        F.date_add(F.col("_chg_last_day"), 90),
                    )
                )
                .select(*rolling_cols)
            )

        recs, min_dt, max_dt, max_lt = merge_into_gold(
            gold_weather_daily_tbl, wdf_rolling, merge_keys=("city", "data_timestamp")
        )

        total_processed += recs
        if min_dt:
            agg_min_data_ts = min_dt if agg_min_data_ts is None else min(agg_min_data_ts, min_dt)
        if max_dt:
            agg_max_data_ts = max_dt if agg_max_data_ts is None else max(agg_max_data_ts, max_dt)

        # commit meta only after successful merge
        if max_lt:
            commit_last_processed_load_ts(src, max_lt)
        print("🌤️ Daily weather rolling aggregates updated.")
except Exception as e:
    print(f"❌ ERROR while building {gold_weather_daily_tbl}: {str(e)}")
    update_pipeline_log("FAILED", "Weather daily rolling aggregation failed")

Weather Daily: last processed transform_ts = None
Executing MERGE into env_catalog.env_data.gold_weather_daily ...
MERGE completed for env_catalog.env_data.gold_weather_daily: 24 rows, data_ts range=(2025-01-01 00:00:00,2025-11-19 00:00:00), max_load_ts=2025-11-13 11:03:41.374148
Updated last processed timestamp for env_catalog.env_data.silver_weather_daily: 2025-11-13 11:03:41.374148
🌤️ Daily weather rolling aggregates updated.


## 🌤️ Step 8: Create Gold layer Air Daily Aggregations

In [0]:
# Build Gold Daily aggregates (rolling AQI)
try:
    src = silver_air_daily_tbl
    if not table_exists(src):
        print("No daily air table found in Silver layer; skipping daily aggregation.")
    else:
        last_load = read_last_processed_load_ts(src)
        print(f"Air Daily: last processed transform_ts = {last_load}")

        # Full Silver history, restricted to the columns Gold needs
        # (dim_key, city, data_timestamp, transform_ts, max_load_timestamp as load_timestamp,
        # aqi_value as DOUBLE). Rows without an aqi_value cannot contribute to any window.
        air_history = (
            spark.table(src)
            .select(
                F.col("dim_key"),
                F.col("city"),
                F.to_timestamp("data_timestamp").alias("data_timestamp"),
                F.to_timestamp("transform_ts").alias("transform_ts"),
                F.to_timestamp("max_load_timestamp").alias("load_timestamp"),
                F.col("aqi_value").cast("double").alias("aqi_value"),
            )
            .filter(F.col("aqi_value").isNotNull())
            .withColumn("run_id", F.lit(run_id))
        )

        # Rolling AQI windows (future 7, past 7/30/90 days) must see the FULL history:
        # computing them on the incremental batch alone would make the windows ignore
        # rows loaded in earlier runs.
        air_rolling = get_rolling_agg(air_history, metric_col="aqi_value")
        air_rolling = aqi_category_and_color(air_rolling)

        if last_load:
            # A new/changed Silver row dated D alters the future-7 window of rows dated
            # D-6..D and the past-7/30/90 windows of rows dated D+1..D+90, so re-merge
            # that whole span per city, not just the changed rows.
            changed_span = (
                air_history
                .filter(F.col("transform_ts") > F.lit(last_load))
                .groupBy("city")
                .agg(
                    F.min(F.to_date("data_timestamp")).alias("_chg_first_day"),
                    F.max(F.to_date("data_timestamp")).alias("_chg_last_day"),
                )
            )
            rolling_cols = air_rolling.columns
            air_rolling = (
                air_rolling
                .join(changed_span, on="city", how="inner")
                .filter(
                    F.to_date("data_timestamp").between(
                        F.date_add(F.col("_chg_first_day"), -6),
                        F.date_add(F.col("_chg_last_day"), 90),
                    )
                )
                .select(*rolling_cols)
            )

        recs, min_dt, max_dt, max_lt = merge_into_gold(
            gold_air_daily_tbl, air_rolling, merge_keys=("city", "data_timestamp")
        )

        total_processed += recs
        if min_dt:
            agg_min_data_ts = min_dt if agg_min_data_ts is None else min(agg_min_data_ts, min_dt)
        if max_dt:
            agg_max_data_ts = max_dt if agg_max_data_ts is None else max(agg_max_data_ts, max_dt)

        # commit meta only after successful merge
        if max_lt:
            commit_last_processed_load_ts(src, max_lt)
        print("🌤️ Daily air rolling aggregates updated.")
except Exception as e:
    print(f"❌ ERROR while building {gold_air_daily_tbl}: {str(e)}")
    update_pipeline_log("FAILED", "Air daily rolling aggregation failed")

Air Daily: last processed transform_ts = None
Step1
Step2
Before merge function call
Executing MERGE into env_catalog.env_data.gold_air_daily ...
MERGE completed for env_catalog.env_data.gold_air_daily: 22 rows, data_ts range=(2025-01-01 00:00:00,2025-11-18 00:00:00), max_load_ts=2025-11-13 11:03:54.377870
After merge function call
Updated last processed timestamp for env_catalog.env_data.silver_air_daily: 2025-11-13 11:03:54.377870
🌤️ Daily air rolling aggregates updated.


## ✅ Step 9: Finalize and Log Success

In [0]:
try:
    # The aggregation cells record a stage failure in pipeline_log (status FAILED)
    # instead of raising, so check the stored status before marking this run SUCCESS.
    logged_statuses = [
        row["status"]
        for row in (
            spark.table(pipeline_log_tbl)
            .filter(F.col("run_id") == run_id)
            .select("status")
            .collect()
        )
    ]
    if "FAILED" in logged_statuses:
        print(f"❌ Gold layer finished with failed stage(s); pipeline_log keeps FAILED for run_id={run_id}. Rows processed: {total_processed}")
    else:
        update_pipeline_log("SUCCESS", "Gold layer completed successfully", total_processed, agg_min_data_ts, agg_max_data_ts)
        print(f"✅ Gold layer completed successfully. Rows processed: {total_processed}")
except Exception as e:
    print("⚠️ Final log update failed:", e)


✅ pipeline_log updated: SUCCESS
✅ Gold layer completed successfully. Rows processed: 46
