# Gold: Space Weather — Daily Aggregations

This notebook builds the **Gold** layer for the Space Weather pipeline by aggregating **Silver 30-minute windows** into **daily metrics** (UTC).

#### What this notebook produces
- **Gold Daily (Long)**: one row per **(date_utc, source, metric)** with daily aggregates + coverage/quality diagnostics.
- **Gold Daily (Wide, optional)**: one row per **(date_utc, region)** with flattened `<metric_key>_<measure>` columns for analytics/BI.

#### Key conventions
- **UTC is the only time basis** (session timezone forced to UTC).
- `date_utc` is derived from `window_start_utc` (date bucketing in UTC).
- Coverage and data-quality signals are computed per day (e.g., overlaps, gaps, midnight spanning).

## Libraries & runtime assumptions

This notebook runs on **Databricks/Spark** with **Delta Lake** enabled.

#### Runtime requirements
- A Spark session is available as `spark`
- Delta Lake is available (for `DeltaTable` merges/writes)
- Session timezone is set to **UTC** (required for correct `date_utc` grouping)

#### Timezone note
The notebook enforces:
- `spark.conf.set("spark.sql.session.timeZone", "UTC")`

This guarantees that `to_date(window_start_utc)` and all midnight-boundary checks behave consistently across clusters and sessions.
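
To illustrate why the time basis matters, here is a small plain-Python sketch (standard library only, not the notebook's Spark code). It shows the same instant landing on different calendar days depending on the timezone used for bucketing. The `bucket_date` helper and the sample timestamp are assumptions made for the example.

```python
from datetime import datetime, timedelta, timezone


def bucket_date(instant: datetime, tz: timezone) -> str:
    """Return the calendar date (ISO string) of `instant` as seen in `tz`."""
    return instant.astimezone(tz).date().isoformat()


# Illustrative window start: 23:30 UTC on 2024-01-01.
window_start = datetime(2024, 1, 1, 23, 30, tzinfo=timezone.utc)

utc_day = bucket_date(window_start, timezone.utc)
plus_one_day = bucket_date(window_start, timezone(timedelta(hours=1)))

print(utc_day)       # 2024-01-01
print(plus_one_day)  # 2024-01-02
```

A session running at UTC+1 would therefore assign this window to the following day, which is the kind of drift the forced UTC setting is meant to rule out.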


In [0]:
from typing import List

from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from pyspark.sql.window import Window

from delta.tables import DeltaTable

spark.conf.set("spark.sql.session.timeZone", "UTC")


## Global configuration

This section centralizes all runtime settings and storage paths used throughout the notebook.

#### What lives here
- **Delta paths** for Silver inputs and Gold outputs
- **Pipeline constants** (the measures exposed in the wide table, per-region NMDB station specs)
- **Runtime settings** that must be stable for correctness (the **UTC timezone** itself is set in the imports cell above)

#### Why it matters
Keeping configuration in one place makes the notebook:
- easier to review in PRs
- safer to run across dev/prod workspaces
- less error-prone when paths change

> Rule of thumb: if a value changes between environments, it belongs in `VAR` (or environment-specific config), not hard-coded inside logic.
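
One possible way to apply this rule, sketched under assumptions: derive every path from a single environment-specific value. `build_paths` and its `storage_account` parameter are hypothetical names introduced for illustration; the notebook's own `global_variables()` embeds the account name directly.

```python
from typing import Dict


def build_paths(storage_account: str) -> Dict[str, str]:
    """Derive table paths from one environment-specific value (hypothetical helper)."""
    if not storage_account or not storage_account.strip():
        raise ValueError("storage_account must be a non-empty string.")

    tiers = ["bronze", "silver", "gold"]
    container = {
        tier: f"abfss://{tier}@{storage_account}.dfs.core.windows.net"
        for tier in tiers
    }
    return {
        "silver_input": "/".join([container["silver"], "space_weather"]),
        "gold_output_long": "/".join([container["gold"], "space_weather_daily_long"]),
    }


# "devaccount" is a placeholder, not a real storage account.
paths = build_paths("devaccount")
print(paths["silver_input"])
print(paths["gold_output_long"])
```

With this shape, switching between dev and prod would only require changing the value passed in, while the path layout stays in one reviewed place.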

In [0]:
def global_variables():
    tiers = ["bronze", "silver", "gold"]
    container = {tier: f"abfss://{tier}@alexccrv0dcn.dfs.core.windows.net" for tier in tiers}

    VAR = {
        "container": container,
        "silver_input": "/".join([container["silver"], "space_weather"]),
        "gold_output_long": "/".join([container["gold"], "space_weather_daily_long"]),
        "wide_measures": [
            "value_mean",
            "value_max",
            "value_min",
            "value_stddev",
            "count_window",
            "coverage_pct",
        ],
        "regions": {
            "CH": {
                "main":"JUNG1",
                "references": ["OULU","ROME"]
            }
        },
    }
    return VAR

VAR = global_variables()


## Helpers

Utility functions used across the notebook to keep the main logic readable and consistent.

#### What helpers cover
- **Delta table checks** (e.g., “path exists but is not a Delta table”)
- **Safe read/write patterns** for idempotent pipelines
- **Small reusable transforms** that are not business logic (formatting, validation, etc.)

#### Design principles
- Helpers should be **pure** where possible (same input → same output).
- Prefer **explicit errors** over silent failure for invalid table states.
- Keep business rules out of helpers (those belong in the transform functions).
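
As a small illustration of these principles, the sketch below builds a Delta merge condition from key column names as a pure function and fails loudly on an empty key list. `build_merge_condition` is a hypothetical name used only here; the `t`/`s` aliases mirror the ones used inside `upsert_delta`.

```python
from typing import List


def build_merge_condition(keys: List[str]) -> str:
    """Build a SQL equality condition between target (t) and source (s) on `keys`."""
    if not keys:
        raise ValueError("keys must be a non-empty list of merge key column names.")
    # Backticks quote column names that contain special characters.
    return " AND ".join(f"t.`{k}` = s.`{k}`" for k in keys)


condition = build_merge_condition(["date_utc", "source", "metric"])
print(condition)
```

Because the function has no side effects, it can be checked without a Spark session.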

In [0]:
def assert_delta_or_missing(path: str) -> None:
    """
    Assert that a storage path is either a Delta table, or does not exist.

    Args:
        path (str): Storage path to validate.

    Raises:
        ValueError: If the path exists but is not a Delta table.
    """

    # --- SETUP AND VALIDATION ---
    if not path or not path.strip():
        raise ValueError("path must be a non-empty string.")

    # --- LOGIC ---
    # If it's delta, we're good.
    try:
        if DeltaTable.isDeltaTable(spark, path):
            return
    except Exception:
        # If isDeltaTable itself fails (e.g., permissions / transient FS issues),
        # fall back to existence check below to decide what to do.
        pass

    # If the path doesn't exist, we're good.
    # Any failure to list the path is treated as "missing".
    try:
        dbutils.fs.ls(path)
    except Exception:
        return

    # Path exists but is not delta.
    raise ValueError(f"Path exists but is not a Delta table: {path}")


def upsert_delta(df: DataFrame, path: str, keys: List[str]) -> None:
    # TODO: change this to append only rows that do not already exist.
    """
    Upsert a DataFrame into a Delta Lake table at the given path.

    Args:
        df (DataFrame): Source DataFrame to upsert.
        path (str): Delta table path.
        keys (List[str]): Column names that define the merge key.

    Raises:
        ValueError: If `keys` is empty, or the path exists but is not a Delta table.
    """

    # --- SETUP AND VALIDATION ---
    assert_delta_or_missing(path)

    if not keys:
        raise ValueError("keys must be a non-empty list of merge key column names.")

    # --- LOGIC ---
    if DeltaTable.isDeltaTable(spark, path):
        delta = DeltaTable.forPath(spark, path)

        merge_condition = " AND ".join([f"t.`{k}` = s.`{k}`" for k in keys])

        (
            delta.alias("t")
            .merge(df.alias("s"), merge_condition)
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute()
        )
        return

    (
        df.write.format("delta")
        .mode("overwrite")
        .save(path)
    )


## Gold transforms

This notebook builds Gold in two steps:

#### 1) Gold Daily (Long)
`build_gold_daily_long(silver_df)` aggregates 30-minute Silver windows into one row per:

- **Keys:** `date_utc, source, metric` (UTC)
- **Aggregates:** daily statistics derived from `value`
- **Coverage & diagnostics:** derived from `window_start_utc + window_duration_s`, including:
  - window coverage within the day
  - overlaps / gaps between windows
  - whether any window spans **past midnight UTC** (and how many seconds)

The long table is the canonical daily output and is the input to the wide build.
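
The coverage and diagnostic rules above can be sketched in plain Python for a single `(date_utc, source, metric)` group. This is a simplified illustration under assumptions, not the Spark implementation: `daily_diagnostics` is a hypothetical helper, the sample windows are made up, and all window starts are assumed to fall on the same UTC date.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Tuple

DAY_S = 24 * 60 * 60


def daily_diagnostics(windows: List[Tuple[datetime, int]]) -> Dict[str, object]:
    """Coverage and interval diagnostics for one day's (start_utc, duration_s) windows."""
    if not windows:
        raise ValueError("windows must contain at least one entry.")

    ordered = sorted(windows, key=lambda w: w[0])
    day_start = ordered[0][0].replace(hour=0, minute=0, second=0, microsecond=0)
    next_midnight = day_start + timedelta(days=1)

    covered_s = max_overlap_s = max_gap_s = crossed_s = 0
    prev_max_end = None  # latest end among all earlier windows

    for start, duration_s in ordered:
        end = start + timedelta(seconds=duration_s)
        covered_s += duration_s
        if prev_max_end is not None:
            # Negative delta means overlap, positive delta means gap.
            delta_s = int((start - prev_max_end).total_seconds())
            max_overlap_s = max(max_overlap_s, -delta_s)
            max_gap_s = max(max_gap_s, delta_s)
        prev_max_end = end if prev_max_end is None else max(prev_max_end, end)
        # Seconds by which this window extends past the next midnight.
        crossed_s = max(crossed_s, int((end - next_midnight).total_seconds()))

    return {
        "covered_window_s": covered_s,
        "coverage_pct": covered_s / DAY_S,  # a fraction of the day, not a percentage
        "has_overlap": max_overlap_s > 0,
        "has_gap": max_gap_s > 0,
        "max_overlap_s": max_overlap_s,
        "max_gap_s": max_gap_s,
        "has_window_spanning_midnight": crossed_s > 0,
        "crossed_s": crossed_s,
    }


def at(hour: int, minute: int) -> datetime:
    """Illustrative timestamp on 2024-01-01 UTC."""
    return datetime(2024, 1, 1, hour, minute, tzinfo=timezone.utc)


diag = daily_diagnostics([
    (at(0, 0), 1800),    # 00:00 -> 00:30
    (at(0, 20), 1800),   # starts 600 s before the previous end (overlap)
    (at(1, 0), 1800),    # starts 600 s after the latest end (gap)
    (at(23, 30), 2700),  # 23:30 -> 00:15 next day (900 s past midnight)
])
print(diag)
```

Comparing each start against the latest end seen so far, rather than only the previous window's end, keeps a short window nested inside a longer one from hiding a later overlap.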

#### 2) Gold Daily (Wide, per region)
`build_gold_daily_wide_by_region(gold_long_df, region, nmdb_stations)` creates a single-row-per-day table for one region:

- **Keys:** `date_utc, region`
- **Metric selection:**
  - keeps **all non-NMDB** metrics (e.g., GFZ)
  - keeps **only selected NMDB stations** defined by:
    `{"main": "<CODE>", "references": ["<CODE1>", ...]}`
- **Quality filters (wide is “clean-only”):**
  - keeps only homogeneous days (`is_window_duration_homogeneous`)
  - drops days with overlaps (`has_overlap`)
  - drops days with midnight-spanning windows (`has_window_spanning_midnight`)
- **Stable NMDB aliases:**
  - main station → `metric = "main"`
  - reference stations → `metric = "reference1"`, `reference2`, ...
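
The alias rule can be sketched as a plain dictionary built from the station spec. `build_station_aliases` is a hypothetical helper for illustration; the notebook expresses the same mapping as a Spark map expression.

```python
from typing import Dict


def build_station_aliases(nmdb_stations: Dict[str, object]) -> Dict[str, str]:
    """Map upper-cased station codes to stable aliases ("main", "reference1", ...)."""
    main = str(nmdb_stations.get("main") or "").strip().upper()
    if not main:
        raise ValueError('nmdb_stations["main"] must be a non-empty station code.')

    refs = [
        str(s).strip().upper()
        for s in (nmdb_stations.get("references") or [])
        if s is not None and str(s).strip()
    ]

    aliases = {main: "main"}
    for i, code in enumerate(refs, start=1):
        aliases[code] = f"reference{i}"
    return aliases


aliases = build_station_aliases({"main": "JUNG1", "references": ["OULU", "ROME"]})
print(aliases)  # {'JUNG1': 'main', 'OULU': 'reference1', 'ROME': 'reference2'}
```

Since aliases depend on list position, reordering `references` changes which station feeds `reference1`, so the order in the config is part of the contract.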

The wide table is optimized for analytics/BI and uses a normalized `metric_key` to produce `<metric_key>_<measure>` columns.
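
A minimal plain-Python sketch of the key normalization and the resulting column names, using the standard `re` module instead of Spark functions. The `metric_key` and `wide_column` helpers and the sample metric names are assumptions for illustration, and inputs are assumed to be non-null strings.

```python
import re


def metric_key(source: str, metric: str) -> str:
    """Normalize `source` and `metric` into a lowercase, underscore-separated key."""
    key = f"{source.lower()}_{metric.lower()}"
    key = re.sub(r"[^a-z0-9]+", "_", key)  # non-alphanumerics -> underscore
    key = re.sub(r"_+", "_", key)          # collapse repeated underscores
    return key.strip("_")                  # drop leading/trailing underscores


def wide_column(source: str, metric: str, measure: str) -> str:
    """Name of the wide-table column for one metric and one measure."""
    return f"{metric_key(source, metric)}_{measure}"


print(metric_key("NMDB", "main"))                 # nmdb_main
print(metric_key("GFZ", "Kp-Index"))              # gfz_kp_index
print(wide_column("NMDB", "main", "value_mean"))  # nmdb_main_value_mean
```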

In [0]:
def build_gold_daily_long(silver_df: DataFrame) -> DataFrame:
    """
    Aggregate windowed Silver metrics into daily (UTC) Gold rows with coverage and
    interval-quality diagnostics.

    Computes daily stats (mean/max/min/stddev), coverage, window-duration metadata,
    within-day gap/overlap diagnostics, plus a cross-midnight spillover flag and the
    number of seconds a window extends past midnight.

    Notes:
        - `has_window_spanning_midnight` is true if any window for that day ends after
          00:00:00 of the next day (relative to its start date_utc).
        - `crossed_s` is the maximum spillover seconds among windows for that day.
          Example: 23:30 -> 00:15 yields crossed_s = 900.

    Args:
        silver_df (DataFrame): Input Spark DataFrame with columns:
            window_start_utc, source, metric, value, window_duration_s.

    Returns:
        DataFrame: Daily Gold DataFrame keyed by [date_utc, source, metric].
    """

    # --- SETUP AND VALIDATION ---
    day_s = 24 * 60 * 60
    keys = ["date_utc", "source", "metric"]

    base = (
        silver_df
            .select("window_start_utc", "source", "metric", "value", "window_duration_s")
            .filter(F.col("window_start_utc").isNotNull())
            .filter(F.col("window_duration_s").isNotNull())
            .withColumn("date_utc", F.to_date("window_start_utc"))
    )

    valid = base.filter(F.col("value").isNotNull())
    
    # --- LOGIC ---

    # Compute explicit interval end
    checks = valid.withColumn(
        "window_end_utc",
        F.to_timestamp(
            F.from_unixtime(
                F.col("window_start_utc").cast("long") + F.col("window_duration_s")
            )
        ),
    )

    # Cross-midnight spillover (how much the window extends past next midnight)
    checks = (
        checks
        .withColumn("next_midnight_utc",
            F.to_timestamp(
                F.concat_ws(
                        " ",
                        F.date_add(F.col("date_utc"), 1).cast("string"),
                        F.lit("00:00:00"),
                        )
                    )
                )
        .withColumn(
            "crossed_s_row",
            F.greatest(
                F.lit(0),
                F.col("window_end_utc").cast("long")
                - F.col("next_midnight_utc").cast("long"),
            ),
        )
        .withColumn("has_window_spanning_midnight_row", F.col("crossed_s_row") > 0)
    )

    # Within-day gap/overlap checks (per date/source/metric)
    w_day_order = Window.partitionBy(*keys).orderBy(F.col("window_start_utc").asc())
    w_day_prev = w_day_order.rowsBetween(Window.unboundedPreceding, -1)

    checks = (
        checks.withColumn("prev_max_end_utc_day", F.max("window_end_utc").over(w_day_prev))
        .withColumn(
            "has_overlap_row",
            F.col("prev_max_end_utc_day").isNotNull()
            & (F.col("window_start_utc") < F.col("prev_max_end_utc_day")),
        )
        .withColumn(
            "has_gap_row",
            F.col("prev_max_end_utc_day").isNotNull()
            & (F.col("window_start_utc") > F.col("prev_max_end_utc_day")),
        )
        .withColumn(
            "overlap_s",
            F.when(
                F.col("has_overlap_row"),
                F.col("prev_max_end_utc_day").cast("long") - F.col("window_start_utc").cast("long"),
            ),
        )
        .withColumn(
            "gap_s",
            F.when(
                F.col("has_gap_row"),
                F.col("window_start_utc").cast("long") - F.col("prev_max_end_utc_day").cast("long"),
            ),
        )
    )


    daily = (
        checks.groupBy(*keys)
        .agg(
            # --- Daily stats ---
            F.avg("value").alias("value_mean"),
            F.max("value").alias("value_max"),
            F.min("value").alias("value_min"),
            F.stddev_samp("value").alias("value_stddev"),
            # --- Counts / coverage ---
            F.count("*").alias("count_window"),
            F.sum("window_duration_s").alias("covered_window_s"),
            # --- Window metadata ---
            F.sort_array(F.collect_set("window_duration_s")).alias("window_duration_s_values"),
            # --- Interval diagnostics (within-day) ---
            F.max(F.col("has_overlap_row").cast("int")).cast("boolean").alias("has_overlap"),
            F.max(F.col("has_gap_row").cast("int")).cast("boolean").alias("has_gap"),
            F.max(F.coalesce(F.col("overlap_s"), F.lit(0))).alias("max_overlap_s"),
            F.max(F.coalesce(F.col("gap_s"), F.lit(0))).alias("max_gap_s"),
            # --- Cross-midnight spillover ---
            F.max(F.col("has_window_spanning_midnight_row").cast("int"))
                .cast("boolean").alias("has_window_spanning_midnight"),
            F.max("crossed_s_row").cast("long").alias("crossed_s"),
        )
    )

    daily = (
        daily.withColumn("is_window_duration_homogeneous", F.size("window_duration_s_values") == 1)
        .withColumn(
            "window_duration_s",
            F.when(F.col("is_window_duration_homogeneous"), F.element_at("window_duration_s_values", 1)),
        )
        # Despite the name, coverage_pct is a fraction of the day (1.0 = 24 h), not a percentage.
        .withColumn("coverage_pct", F.col("covered_window_s") / F.lit(day_s))
    )

    return daily.select(
        # --- Keys ---
        *keys,
        # --- Daily stats ---
        "value_mean",
        "value_max",
        "value_min",
        "value_stddev",
        # --- Counts / coverage ---
        "count_window",
        "covered_window_s",
        "coverage_pct",
        # --- Window metadata ---
        "window_duration_s_values",
        "is_window_duration_homogeneous",
        "window_duration_s",
        # --- Interval diagnostics (within-day) ---
        "has_overlap",
        "has_gap",
        "max_overlap_s",
        "max_gap_s",
        # --- Cross-midnight spillover ---
        "has_window_spanning_midnight",
        "crossed_s",
    )

In [None]:
def add_metric_key(df: DataFrame) -> DataFrame:
    """
    Create a normalized join-friendly metric key from `source` and `metric`.

    The key is built as `lower(source) + "_" + lower(metric)`, then normalized by:
    - Replacing any non-alphanumeric characters with underscores.
    - Collapsing repeated underscores to a single underscore.
    - Stripping leading/trailing underscores.

    Args:
        df (DataFrame): Input Spark DataFrame containing `source` and `metric`
            columns.

    Returns:
        DataFrame: A new DataFrame with an additional `metric_key` column.
    """
    metric_key = F.concat_ws("_", F.lower(F.col("source")), F.lower(F.col("metric")))
    metric_key = F.regexp_replace(metric_key, r"[^a-z0-9]+", "_")
    metric_key = F.regexp_replace(metric_key, r"_+", "_")
    metric_key = F.regexp_replace(metric_key, r"^_+|_+$", "")
    return df.withColumn("metric_key", metric_key)

def build_gold_daily_wide_by_region(
    long_df: DataFrame, region: str, nmdb_stations: dict) -> DataFrame:
    """
    Build a wide daily table for one region using:
      - all non-NMDB metrics (e.g., GFZ), plus
      - NMDB metrics restricted to the provided station spec.

    The NMDB station codes are remapped to stable aliases:
      - nmdb_stations["main"] -> metric = "main"
      - nmdb_stations["references"][i] -> metric = f"reference{i+1}"

    Args:
        long_df (DataFrame): Long daily table keyed by [date_utc, source, metric].
        region (str): Region label to attach to the output.
        nmdb_stations (dict): Station spec:
            {"main": "<CODE>", "references": ["<CODE1>", "<CODE2>", ...]}.

    Returns:
        DataFrame: Wide daily table with date_utc, region, and <metric_key>_<measure> columns.
    """
    # --- SETUP / VALIDATION ---
    main_station = (nmdb_stations.get("main") or "").strip()
    if not main_station:
        raise ValueError('nmdb_stations["main"] must be a non-empty station code.')

    ref_stations = nmdb_stations.get("references") or []

    main_norm = main_station.upper()
    refs_norm = [
        str(s).strip().upper()
        for s in ref_stations
        if s is not None and str(s).strip()
    ]
    station_codes = [main_norm] + refs_norm

    # --- FILTERS (QUALITY + NMDB SELECTION) ---
    filtered = (
        long_df
            .filter((F.col("source") != "NMDB") |
                    (F.upper(F.col("metric")).isin(station_codes)))
            .filter(F.col("is_window_duration_homogeneous"))
            .filter(~F.col("has_overlap"))
            .filter(~F.col("has_window_spanning_midnight"))
    )

    # --- NMDB REMAP: STATION_CODE -> alias ("main", "reference1", ...) ---
    kvs = [F.lit(main_norm), F.lit("main")]
    for i, code in enumerate(refs_norm, start=1):
        kvs.extend([F.lit(code), F.lit(f"reference{i}")])

    mapping_expr = F.create_map(*kvs)

    # Rename NMDB metrics to stable aliases before metric_key is created.
    # coalesce() is defensive: if mapping is missing for any reason, keep original metric.
    filtered = filtered.withColumn(
        "metric",
        F.when(
            F.col("source") == "NMDB",
            F.coalesce(
                mapping_expr[F.upper(F.col("metric"))],
                F.col("metric"),
            ),
        ).otherwise(F.col("metric")),
    )
    
    # --- METRIC KEYS ---
    df = add_metric_key(filtered)
    
    # --- SINGLE PIVOT USING STRUCT OF MEASURES ---
    measures = list(VAR["wide_measures"])
    payload = F.struct(*[F.col(m).alias(m) for m in measures])

    pivoted = (
        df.select("date_utc", "metric_key", payload.alias("payload"))
        .groupBy("date_utc")
        .pivot("metric_key")
        .agg(F.first("payload"))
    )

    # --- EXPAND STRUCT COLUMNS INTO <metric_key>_<measure> ---
    select_exprs = [F.col("date_utc")]
    for c in pivoted.columns:
        if c == "date_utc":
            continue
        # c is a metric_key column containing a struct(payload)
        for m in measures:
            select_exprs.append(F.col(f"`{c}`.{m}").alias(f"{c}_{m}"))

    wide = pivoted.select(*select_exprs).withColumn("region", F.lit(region))
    
    # --- RETURN ---
    base_cols = ["date_utc", "region"]
    data_cols = [c for c in wide.columns if c not in ("date_utc", "region")]
    return wide.select(*base_cols, *data_cols)

## Show Time

Execution flow:

1. **Load Silver**  
   Read the Silver Delta table from `VAR["silver_input"]`.

2. **Build Gold Daily (Long)**  
   Compute `gold_long_df = build_gold_daily_long(silver_df)`.

3. **Write / upsert Gold Long (Delta)**  
   Merge into `VAR["gold_output_long"]` using keys: `["date_utc", "source", "metric"]`.

4. **Build + write Gold Daily (Wide) per region**  
   For each `(region, stations)` in `VAR["regions"]`:
   - `gold_wide_df = build_gold_daily_wide_by_region(gold_long_df, region, stations)`
   - upsert into: `abfss://gold@.../space_weather_daily_<region>`  
     using keys: `["date_utc", "region"]`

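Note: `upsert_delta()` performs a true upsert (matched keys are updated, non-matched keys are inserted). If the target path does not exist yet, the first run creates the table with a plain Delta write instead of a merge.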
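
The upsert semantics can be illustrated without Spark using a dictionary keyed by the merge columns. This is only a sketch of the matched/not-matched behavior, not how Delta executes a merge; `upsert_rows` and the sample rows are hypothetical.

```python
from typing import Dict, List, Tuple

Row = Dict[str, object]


def upsert_rows(target: List[Row], source: List[Row], keys: List[str]) -> List[Row]:
    """Update rows whose key matches, insert the rest (plain-Python illustration)."""
    if not keys:
        raise ValueError("keys must be a non-empty list of merge key column names.")

    indexed: Dict[Tuple[object, ...], Row] = {
        tuple(row[k] for k in keys): row for row in target
    }
    for row in source:
        # Existing key: the row is replaced. New key: the row is added.
        indexed[tuple(row[k] for k in keys)] = row
    return list(indexed.values())


target = [
    {"date_utc": "2024-01-01", "region": "CH", "value": 1.0},
    {"date_utc": "2024-01-02", "region": "CH", "value": 2.0},
]
source = [
    {"date_utc": "2024-01-02", "region": "CH", "value": 2.5},  # matched -> updated
    {"date_utc": "2024-01-03", "region": "CH", "value": 3.0},  # not matched -> inserted
]

merged = upsert_rows(target, source, keys=["date_utc", "region"])
print([row["value"] for row in merged])  # [1.0, 2.5, 3.0]
```

Applying the same source a second time leaves the result unchanged, which is the property that makes reruns of the pipeline safe.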


In [0]:
silver_path = VAR["silver_input"]
gold_long_path = VAR["gold_output_long"]

silver_df = spark.read.format("delta").load(silver_path)

gold_long_df = build_gold_daily_long(silver_df)
upsert_delta(gold_long_df, gold_long_path, keys=["date_utc", "source", "metric"])

for region, stations in VAR["regions"].items():
    gold_wide_path = "/".join([VAR["container"]["gold"], f"space_weather_daily_{region}"])
    gold_wide_df = build_gold_daily_wide_by_region(gold_long_df, region, stations)
    upsert_delta(gold_wide_df, gold_wide_path, keys=["date_utc", "region"])
