# Custom Flight-Weather Join Pipeline with Kriging Interpolation

**Author:** Daniel Costa  
**Project:** Flight Delay Prediction (W261 Final Project)  
**Dataset:** US Domestic Flights (2015-2019)

---

## Overview

This notebook implements a **custom spatial-temporal join** between flight data and weather station data using **Ordinary Kriging** for spatial interpolation. Unlike simple nearest-neighbor joins, this approach uses geostatistical methods to produce optimal weighted averages of weather measurements from multiple stations.

### Why Kriging?

Standard approaches join each flight to the nearest weather station. This is problematic because:
1. Weather stations may be far from airports
2. Single-station measurements are noisy
3. Nearest station may have missing data

**Kriging provides optimal interpolation** by:
- Weighting multiple nearby stations based on spatial correlation
- Accounting for measurement uncertainty
- Producing statistically optimal estimates with known variance

### Pipeline Architecture

```
┌─────────────────────────────────────────────────────────────────────────────┐
│                    KRIGING-BASED WEATHER INTERPOLATION                       │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  ┌──────────────┐     ┌──────────────┐     ┌──────────────────────────────┐ │
│  │ Flight Data  │     │ Weather Data │     │ Station Coordinates          │ │
│  │ (with times) │     │ (hourly obs) │     │ (lat/long per station)       │ │
│  └──────┬───────┘     └──────┬───────┘     └──────────────┬───────────────┘ │
│         │                    │                            │                  │
│         ▼                    ▼                            ▼                  │
│  ┌─────────────────────────────────────────────────────────────────────────┐│
│  │                    SPATIAL WEIGHT COMPUTATION                            ││
│  │  1. Find all stations within spatial_radius (50 km) of each airport     ││
│  │  2. Compute Matérn covariance between station pairs                     ││
│  │  3. Solve Ordinary Kriging system for optimal weights                   ││
│  └─────────────────────────────────────────────────────────────────────────┘│
│                                    │                                         │
│                                    ▼                                         │
│  ┌─────────────────────────────────────────────────────────────────────────┐│
│  │                    TEMPORAL AGGREGATION                                  ││
│  │  For each flight, for each station:                                     ││
│  │  - Get weather readings from [dep_time - time_radius, dep_time - 2hr]   ││
│  │  - Weight by exponential decay: exp(-Δt / 5 hours)                      ││
│  │  - Aggregate: Σ(time_weight × measurement) / Σ(time_weight)             ││
│  └─────────────────────────────────────────────────────────────────────────┘│
│                                    │                                         │
│                                    ▼                                         │
│  ┌─────────────────────────────────────────────────────────────────────────┐│
│  │                    SPATIAL AGGREGATION                                   ││
│  │  For each flight:                                                        ││
│  │  - Combine station values using Kriging weights                         ││
│  │  - Final estimate: Σ(spatial_weight × station_value) / Σ(spatial_weight)││
│  └─────────────────────────────────────────────────────────────────────────┘│
│                                    │                                         │
│                                    ▼                                         │
│                         ┌──────────────────┐                                 │
│                         │  Joined Dataset  │                                 │
│                         │  (flight + weather)│                               │
│                         └──────────────────┘                                 │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
```

### Key Parameters

| Parameter | Value | Description |
|-----------|-------|-------------|
| `spatial_radius` | 50 km | Maximum distance to include weather stations |
| `time_radius` | 12 hours | Lookback window for weather observations |
| `time_dropoff` | 5 hours | Exponential decay constant for temporal weighting |
| `ν (nu)` | 1.5 | Matérn smoothness parameter |
| `a` | 10 km | Matérn range parameter |

---

## Table of Contents

1. [Setup & Configuration](#1-setup--configuration)
2. [Data Ingestion](#2-data-ingestion)
3. [Kriging Weight Computation](#3-kriging-weight-computation)
   - [3.1 Spatial Covariance (Matérn)](#31-spatial-covariance-matérn)
   - [3.2 Ordinary Kriging Solver](#32-ordinary-kriging-solver)
4. [Custom Join Pipeline](#4-custom-join-pipeline)
5. [Execution](#5-execution)
6. [Summary](#6-summary)

## 1. Setup & Configuration

In [None]:
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from pyspark.sql import functions as F
from pyspark.sql.functions import (
    col, udf, desc, regexp_extract, expr,
    min as spark_min, max as spark_max, when
)
from pyspark.sql.types import (
    StructType, StructField, StringType, FloatType, DoubleType, TimestampType
)

from datetime import datetime
from timezonefinder import TimezoneFinder
import pytz
import math
import numpy as np
import pandas as pd
from scipy.linalg import cho_factor, cho_solve
from scipy.special import kv, gamma

# -----------------------------------------------------------------------------
# Configuration
# -----------------------------------------------------------------------------
# Dataset paths
FLIGHT_DATASETS = {
    '3m': 'dbfs:/mnt/mids-w261/datasets_final_project_2022/parquet_airlines_data_3m/',
    '1y': 'dbfs:/mnt/mids-w261/datasets_final_project_2022/parquet_airlines_data_1y/',
    '5y': 'dbfs:/mnt/mids-w261/datasets_final_project_2022/parquet_airlines_data/'
}

WEATHER_DATASETS = {
    '3m': 'dbfs:/mnt/mids-w261/datasets_final_project_2022/parquet_weather_data_3m/',
    '1y': 'dbfs:/mnt/mids-w261/datasets_final_project_2022/parquet_weather_data_1y/',
    '5y': 'dbfs:/mnt/mids-w261/datasets_final_project_2022/parquet_weather_data/'
}

AIRPORT_CODES_PATH = "dbfs:/mnt/mids-w261/airport-codes_csv.csv"
OUTPUT_BASE_PATH = "dbfs:/student-groups/Group_2_2"

# Kriging parameters
SPATIAL_RADIUS = 50      # km - max distance to include weather stations
TIME_RADIUS = 12         # hours - lookback window for weather
TIME_DROPOFF = 5         # hours - exponential decay constant
NU_S = 1.5               # Matérn smoothness (1.5 = moderate smoothness)
A_S = 10.0               # Matérn range parameter (km)
SIGMA2_S = 1.0           # Matérn variance (assumes standardized data)
EPSILON = 1e-10          # Numerical stability constant

# Reference time for timezone calculations
REF_TIME = "2000-01-05 00:00:00"

# Earth's radius for distance calculations
EARTH_RADIUS_M = 6_371_008.8

## 2. Data Ingestion

In [None]:
# -----------------------------------------------------------------------------
# Timezone Conversion Helper
# -----------------------------------------------------------------------------

def _get_timezone_finder():
    """Lazy singleton for TimezoneFinder (one per executor)."""
    if not hasattr(_get_timezone_finder, "instance"):
        _get_timezone_finder.instance = TimezoneFinder()
    return _get_timezone_finder.instance


def get_gmt_offset(local_time_str: str, lat: float, long: float) -> float:
    """
    Get GMT offset in hours for a location at a given time.
    
    Args:
        local_time_str: Local time as string "YYYY-MM-DD HH:MM:SS"
        lat: Latitude in degrees
        long: Longitude in degrees
        
    Returns:
        Offset from GMT in hours (e.g., -8.0 for PST)
    """
    tf = _get_timezone_finder()
    tz_name = tf.timezone_at(lat=lat, lng=long)
    if tz_name is None:
        return None
    
    tz = pytz.timezone(tz_name)
    local_time = datetime.strptime(local_time_str, "%Y-%m-%d %H:%M:%S")
    local_dt = tz.localize(local_time)
    
    return float(local_dt.utcoffset().total_seconds() / 3600.0)


# Register as Spark UDF
get_gmt_offset_udf = udf(get_gmt_offset, FloatType())

In [None]:
# -----------------------------------------------------------------------------
# Data Ingestion Functions
# -----------------------------------------------------------------------------

def ingest_flight_data(parquet_path: str):
    """
    Load and preprocess flight data.
    
    Creates unique flight identifier and parses departure timestamps.
    
    Args:
        parquet_path: Path to flight parquet files
        
    Returns:
        Tuple of (df_flight_times, df_flights)
    """
    print(f"Loading flight data from {parquet_path}...")
    
    df_flights = spark.read.parquet(parquet_path).dropDuplicates()
    
    # Create unique flight identifier
    df_flights = df_flights.withColumn(
        "flight_uid",
        F.concat(
            F.col("ORIGIN"), F.lit("-"),
            F.col("FL_DATE"), F.lit("-"),
            F.coalesce(F.col("OP_CARRIER_AIRLINE_ID").cast("string"), 
                       F.floor(F.rand() * 100000 + 1).cast("string")), F.lit("-"),
            F.coalesce(F.col("OP_CARRIER_FL_NUM").cast("string"),
                       F.floor(F.rand() * 100000 + 1).cast("string")), F.lit("-"),
            F.coalesce(F.col("TAIL_NUM"),
                       F.floor(F.rand() * 100000 + 1).cast("string")), F.lit("-"),
            F.col("CRS_DEP_TIME")
        )
    )
    
    # Extract time components for timestamp construction
    df_flight_times = (
        df_flights
        .select("flight_uid", "YEAR", "MONTH", "DAY_OF_MONTH", "ORIGIN", "DEST",
                "CRS_DEP_TIME", "DEP_TIME", "DEP_DELAY", "CRS_ARR_TIME", "ARR_TIME", "ARR_DELAY")
        .withColumn("dep_hour", F.lpad((F.floor(F.col("CRS_DEP_TIME") / 100)).cast("string"), 2, "0"))
        .withColumn("month_str", F.lpad(F.col("MONTH").cast("string"), 2, "0"))
        .withColumn("day_str", F.lpad(F.col("DAY_OF_MONTH").cast("string"), 2, "0"))
        .withColumn(
            "dep_timestamp",
            F.to_timestamp(
                F.concat(
                    F.col("YEAR"), F.lit("-"), F.col("month_str"), F.lit("-"), F.col("day_str"),
                    F.lit("T"), F.col("dep_hour"), F.lit(":"),
                    F.lpad((F.col("CRS_DEP_TIME") % 100).cast("string"), 2, "0"), F.lit(":00")
                )
            )
        )
        .dropna()
    )
    
    print(f"  Loaded {df_flights.count():,} flights")
    return df_flight_times, df_flights


def ingest_airport_data(df_flight_times):
    """
    Load airport coordinates and compute timezone offsets.
    
    Args:
        df_flight_times: Flight DataFrame to get valid airport codes
        
    Returns:
        DataFrame with airport codes, coordinates, and GMT offsets
    """
    print("Loading airport data...")
    
    df_airports = (
        spark.read.csv(AIRPORT_CODES_PATH, header=True, inferSchema=True)
        .withColumn("long", F.split(F.col("coordinates"), ",")[0].cast("float"))
        .withColumn("lat", F.split(F.col("coordinates"), ",")[1].cast("float"))
        .select("iata_code", "long", "lat")
        .dropna()
    )
    
    # Add timezone offset
    df_airports = df_airports.withColumn(
        "gmt_offset_hours",
        get_gmt_offset_udf(F.lit(REF_TIME), F.col("lat"), F.col("long"))
    )
    
    # Filter to airports in our flight data
    valid_codes = set(row["ORIGIN"] for row in df_flight_times.select("ORIGIN").distinct().collect())
    df_airports_us = df_airports.filter(df_airports.iata_code.isin(valid_codes))
    
    print(f"  Found {df_airports_us.count()} airports")
    return df_airports_us


def ingest_weather_data(parquet_path: str):
    """
    Load and preprocess weather data.
    
    Cleans numeric columns, extracts cloud coverage, and computes wind components.
    
    Args:
        parquet_path: Path to weather parquet files
        
    Returns:
        Weather DataFrame with cleaned features
    """
    print(f"Loading weather data from {parquet_path}...")
    
    df_weather = spark.read.parquet(parquet_path)
    
    # Clean numeric columns (remove special characters)
    exclude_cols = ["NAME", "REPORT_TYPE", "HourlySkyConditions", "STATION", "DATE"]
    cols_to_clean = [c for c in df_weather.columns if c not in exclude_cols]
    
    for c in cols_to_clean:
        df_weather = df_weather.withColumn(
            c,
            F.when(F.col(c).contains("*"), None)
            .otherwise(F.regexp_replace(F.col(c), r"[^0-9.\-]", ""))
            .cast("double")
        )
    
    # Extract cloud coverage (oktas) and elevation from sky conditions
    df_weather = (
        df_weather
        .withColumn("oktas", regexp_extract(col("HourlySkyConditions"), r".*([A-Z]{3}:)\s*([0-9]*)", 2).cast("int"))
        .withColumn("elev_hundreds_ft", regexp_extract(col("HourlySkyConditions"), r".*([A-Z]{3}:)\s*([0-9]*)\s*([0-9]*)", 3).cast("int"))
    )
    
    # Decompose wind into north/east components
    df_weather = (
        df_weather
        .withColumn("from_east", -1 * col("HourlyWindSpeed") * F.sin(col("HourlyWindDirection") * F.pi() / 180))
        .withColumn("from_north", -1 * col("HourlyWindSpeed") * F.cos(col("HourlyWindDirection") * F.pi() / 180))
        .withColumn("DATE", col("DATE").cast("timestamp"))
    )
    
    print(f"  Loaded {df_weather.count():,} weather observations")
    return df_weather


def ingest_station_data(df_weather):
    """
    Extract unique weather station locations.
    
    Args:
        df_weather: Weather DataFrame with station info
        
    Returns:
        DataFrame with station codes, coordinates, and GMT offsets
    """
    print("Extracting station locations...")
    
    df_stations = (
        df_weather
        .groupBy("STATION")
        .agg(
            F.first(F.col("LONGITUDE")).alias("long"),
            F.first(F.col("LATITUDE")).alias("lat")
        )
        .dropna()
        .withColumn(
            "gmt_offset_hours",
            get_gmt_offset_udf(F.lit(REF_TIME), F.col("lat"), F.col("long"))
        )
    )
    
    print(f"  Found {df_stations.count()} weather stations")
    return df_stations

## 3. Kriging Weight Computation

### 3.1 Spatial Covariance (Matérn)

The Matérn covariance function models spatial correlation between weather stations. It's parameterized by:
- **ν (nu)**: Smoothness parameter (1.5 gives moderate smoothness)
- **a**: Range parameter (correlation decays to ~0.14 at distance a)
- **σ²**: Variance parameter

In [None]:
# -----------------------------------------------------------------------------
# Distance and Covariance Functions
# -----------------------------------------------------------------------------

def haversine_distance(lat1, lon1, lat2, lon2):
    """
    Compute great-circle distance between points using Haversine formula.
    
    Args:
        lat1, lon1: First point coordinates (degrees)
        lat2, lon2: Second point coordinates (degrees)
        
    Returns:
        Distance in kilometers
    """
    lat1, lon1 = np.radians(np.asarray(lat1)), np.radians(np.asarray(lon1))
    lat2, lon2 = np.radians(np.asarray(lat2)), np.radians(np.asarray(lon2))
    
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    
    a = np.sin(dlat/2)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon/2)**2
    return (2.0 * EARTH_RADIUS_M * np.arcsin(np.minimum(1.0, np.sqrt(a)))) / 1000


def matern_covariance(h: float) -> float:
    """
    Compute Matérn covariance for distance h.
    
    C(h) = σ² * (2^(1-ν)/Γ(ν)) * (h/a)^ν * K_ν(h/a)
    
    Args:
        h: Distance in kilometers
        
    Returns:
        Covariance value
    """
    h = max(h, EPSILON)
    val = (2**(1-NU_S)) / gamma(NU_S) * (h/A_S)**NU_S * kv(NU_S, h/A_S)
    return float(SIGMA2_S * val) if not math.isnan(val) else float(SIGMA2_S)


# Covariance at distance 0 (equals variance)
K_0 = matern_covariance(0)

### 3.2 Ordinary Kriging Solver

Ordinary Kriging finds optimal weights that:
1. Are unbiased (weights sum to 1)
2. Minimize estimation variance

The system solves: **K·w = k** subject to **Σw = 1**

In [None]:
# -----------------------------------------------------------------------------
# Ordinary Kriging Solver
# -----------------------------------------------------------------------------

def find_nearest_stations(airport_points, station_points, top_n=100):
    """
    Find nearest weather stations for each airport.
    
    Args:
        airport_points: Array of [long, lat, code] for airports
        station_points: Array of [long, lat, code] for stations
        top_n: Maximum stations to return per airport
        
    Returns:
        List of (airport_code, [(distance, station_code), ...])
    """
    results = []
    airport_codes = airport_points[:, -1]
    airport_coords = airport_points[:, :-1].astype(float)
    station_codes = station_points[:, -1]
    station_coords = station_points[:, :-1].astype(float)
    
    for i, (lon, lat) in enumerate(airport_coords):
        distances = haversine_distance(lat, lon, station_coords[:, 1], station_coords[:, 0])
        idx = np.argsort(distances)[:top_n]
        results.append((airport_codes[i], list(zip(distances[idx], station_codes[idx]))))
    
    return results


def build_neighbor_dataframe(df_stations, df_airports):
    """
    Build DataFrame of airport-station distances.
    
    Args:
        df_stations: Station DataFrame with coordinates
        df_airports: Airport DataFrame with coordinates
        
    Returns:
        Spark DataFrame with (airport_code, station_code, distance)
    """
    print("Building airport-station neighbor relationships...")
    
    station_points = np.array(df_stations.select("long", "lat", "STATION").collect())
    airport_points = np.array(df_airports.select("long", "lat", "iata_code").collect())
    
    nearest = find_nearest_stations(airport_points, station_points)
    
    data = []
    for airport_code, distances in nearest:
        for dist, station_code in distances:
            data.append([airport_code, station_code, float(dist)])
    
    schema = StructType([
        StructField("airport_code", StringType(), True),
        StructField("station_code", StringType(), True),
        StructField("distance", FloatType(), True)
    ])
    
    return spark.createDataFrame(pd.DataFrame(data, columns=["airport_code", "station_code", "distance"]), schema)


def solve_kriging_weights(cov_df, nugget_rel=1e-10):
    """
    Solve Ordinary Kriging system for optimal station weights.
    
    Args:
        cov_df: DataFrame with station covariances for one airport
        nugget_rel: Nugget effect for numerical stability
        
    Returns:
        Tuple of (weights, lagrange_multiplier, variance, station_codes)
    """
    station_coords = cov_df[["lat_station", "long_station"]].values
    station_codes = cov_df["station_code"].values
    n = len(station_coords)
    
    # Build station-station covariance matrix K_xx
    K_xx = np.empty((n, n), dtype=float)
    for i in range(n):
        K_xx[i, i] = K_0
        for j in range(i+1, n):
            d = haversine_distance(station_coords[i, 0], station_coords[i, 1],
                                   station_coords[j, 0], station_coords[j, 1])
            cij = matern_covariance(d)
            K_xx[i, j] = K_xx[j, i] = cij
    
    # Airport-station covariances (already computed)
    K_x = cov_df["C_space"].to_numpy(dtype=float)
    
    # Add nugget for numerical stability
    np.fill_diagonal(K_xx, np.diag(K_xx) + nugget_rel * K_0)
    
    # Solve using Cholesky decomposition
    ones = np.ones(n, dtype=float)
    c_fac = cho_factor(K_xx, check_finite=False)
    alpha = cho_solve(c_fac, ones, check_finite=False)
    beta = cho_solve(c_fac, K_x, check_finite=False)
    
    # Compute weights with Lagrange multiplier for unbiasedness
    denom = ones @ alpha
    lam = (1.0 - ones @ beta) / denom
    w = beta + lam * alpha
    
    # Kriging variance
    var = max(0.0, K_0 - w @ K_x - lam)
    
    return w, lam, float(var), station_codes


def compute_spatial_weights(neighbor_df, df_stations, df_airports, spatial_radius, time_radius):
    """
    Compute Kriging weights for all airport-station pairs.
    
    Args:
        neighbor_df: DataFrame of airport-station distances
        df_stations: Station DataFrame
        df_airports: Airport DataFrame
        spatial_radius: Maximum distance to include
        time_radius: Time radius for weight filtering
        
    Returns:
        Spark DataFrame with station weights per airport
    """
    print(f"Computing Kriging weights (spatial_radius={spatial_radius}km)...")
    
    # Filter to stations within radius and compute covariances
    spatial_cov = neighbor_df.toPandas()
    spatial_cov = spatial_cov[spatial_cov["distance"] < spatial_radius].copy()
    spatial_cov["C_space"] = spatial_cov["distance"].apply(matern_covariance)
    
    # Join with coordinates
    stations_pd = df_stations.toPandas()
    airports_pd = df_airports.toPandas()
    
    spatial_cov = (
        spatial_cov
        .merge(stations_pd, left_on="station_code", right_on="STATION")
        .rename(columns={"lat": "lat_station", "long": "long_station"})
        .drop(columns=["STATION"])
        .merge(airports_pd, left_on="airport_code", right_on="iata_code")
        .rename(columns={"lat": "lat_airport", "long": "long_airport"})
        .drop(columns=["iata_code"])
    )
    
    # Solve Kriging for each airport
    results = []
    for airport_code, group in spatial_cov.groupby("airport_code"):
        w, lam, var, station_codes = solve_kriging_weights(group)
        for weight, station in zip(w, station_codes):
            results.append([airport_code, station, weight, lam, var])
    
    weight_df = pd.DataFrame(results, columns=["airport_code", "station_code", "weight", "lambda", "variance"])
    
    # Filter small weights (won't contribute meaningfully)
    min_weight = 0.001 / np.exp(-time_radius / TIME_DROPOFF)
    weight_df = weight_df[abs(weight_df["weight"]) > min_weight]
    
    # Merge back with spatial info
    weight_df = spatial_cov.merge(weight_df, on=["airport_code", "station_code"], how="inner")
    
    print(f"  Computed weights for {weight_df['airport_code'].nunique()} airports")
    return spark.createDataFrame(weight_df)

## 4. Custom Join Pipeline

In [None]:
# -----------------------------------------------------------------------------
# Custom Join Pipeline
# -----------------------------------------------------------------------------

def add_gmt_timestamp(df, new_col, timestamp_col):
    """Convert local timestamp to GMT."""
    return df.withColumn(
        new_col,
        (col(timestamp_col).cast("long") - (expr("gmt_offset_hours") * 3600)).cast("timestamp")
    )


def custom_weather_join(df_flight_times, df_neighbor_weights, df_weather, df_flights, time_radius):
    """
    Perform spatial-temporal join of flights with weather using Kriging weights.
    
    The join:
    1. For each flight, finds all weather readings within time_radius hours
    2. Weights readings by temporal proximity (exponential decay)
    3. Aggregates across time for each station
    4. Combines stations using pre-computed Kriging weights
    
    Args:
        df_flight_times: Flight times DataFrame
        df_neighbor_weights: Kriging weights DataFrame
        df_weather: Weather observations DataFrame
        df_flights: Original flights DataFrame
        time_radius: Hours of weather lookback
        
    Returns:
        Joined DataFrame with interpolated weather features
    """
    print("Performing custom spatial-temporal join...")
    
    # Register temp views for SQL
    df_flight_times.createOrReplaceTempView("flights")
    df_neighbor_weights.createOrReplaceTempView("weights")
    df_weather.createOrReplaceTempView("weather")
    
    # SQL query for weighted aggregation
    result_df = spark.sql(f"""
    WITH flight_readings AS (
        -- Get all weather readings for each flight within time window
        SELECT 
            f.flight_uid,
            w.*,
            EXP((CAST(f.dep_timestamp_gmt AS double) - CAST(w.DATE_gmt AS double)) / (-3600.0 * {TIME_DROPOFF})) AS t_weight,
            wt.weight AS s_weight
        FROM flights f
        JOIN weights wt ON f.ORIGIN = wt.airport_code
        JOIN weather w ON wt.station_code = w.STATION
            AND w.DATE_gmt <= f.dep_timestamp_gmt - INTERVAL 2 HOURS
            AND w.DATE_gmt >= f.dep_timestamp_gmt - INTERVAL {time_radius} HOURS
    ),
    station_agg AS (
        -- Aggregate over time for each station
        SELECT
            flight_uid, STATION, FIRST(s_weight) AS s_weight,
            SUM(t_weight * HourlyDryBulbTemperature) / NULLIF(SUM(CASE WHEN HourlyDryBulbTemperature IS NULL THEN 0 ELSE t_weight END), 0) AS temp,
            SUM(t_weight * HourlyDewPointTemperature) / NULLIF(SUM(CASE WHEN HourlyDewPointTemperature IS NULL THEN 0 ELSE t_weight END), 0) AS dew_temp,
            SUM(t_weight * HourlyRelativeHumidity) / NULLIF(SUM(CASE WHEN HourlyRelativeHumidity IS NULL THEN 0 ELSE t_weight END), 0) AS humidity,
            SUM(t_weight * HourlyAltimeterSetting) / NULLIF(SUM(CASE WHEN HourlyAltimeterSetting IS NULL THEN 0 ELSE t_weight END), 0) AS altimeter,
            SUM(t_weight * HourlyVisibility) / NULLIF(SUM(CASE WHEN HourlyVisibility IS NULL THEN 0 ELSE t_weight END), 0) AS visibility,
            SUM(t_weight * HourlyStationPressure) / NULLIF(SUM(CASE WHEN HourlyStationPressure IS NULL THEN 0 ELSE t_weight END), 0) AS pressure,
            SUM(t_weight * HourlyWetBulbTemperature) / NULLIF(SUM(CASE WHEN HourlyWetBulbTemperature IS NULL THEN 0 ELSE t_weight END), 0) AS wet_temp,
            SUM(t_weight * HourlyPrecipitation) / NULLIF(SUM(CASE WHEN HourlyPrecipitation IS NULL THEN 0 ELSE t_weight END), 0) AS precip,
            SUM(t_weight * oktas) / NULLIF(SUM(CASE WHEN oktas IS NULL THEN 0 ELSE t_weight END), 0) AS cloud_cover,
            SUM(t_weight * elev_hundreds_ft) / NULLIF(SUM(CASE WHEN elev_hundreds_ft IS NULL THEN 0 ELSE t_weight END), 0) AS cloud_elev,
            SUM(t_weight * from_north) / NULLIF(SUM(CASE WHEN from_north IS NULL THEN 0 ELSE t_weight END), 0) AS wind_north,
            SUM(t_weight * from_east) / NULLIF(SUM(CASE WHEN from_east IS NULL THEN 0 ELSE t_weight END), 0) AS wind_east
        FROM flight_readings
        GROUP BY flight_uid, STATION
    ),
    spatial_agg AS (
        -- Aggregate across stations using Kriging weights
        SELECT
            flight_uid,
            SUM(s_weight * temp) / NULLIF(SUM(CASE WHEN temp IS NULL THEN 0 ELSE s_weight END), 0) AS HourlyDryBulbTemperature,
            SUM(s_weight * dew_temp) / NULLIF(SUM(CASE WHEN dew_temp IS NULL THEN 0 ELSE s_weight END), 0) AS HourlyDewPointTemperature,
            SUM(s_weight * humidity) / NULLIF(SUM(CASE WHEN humidity IS NULL THEN 0 ELSE s_weight END), 0) AS HourlyRelativeHumidity,
            SUM(s_weight * altimeter) / NULLIF(SUM(CASE WHEN altimeter IS NULL THEN 0 ELSE s_weight END), 0) AS HourlyAltimeterSetting,
            SUM(s_weight * visibility) / NULLIF(SUM(CASE WHEN visibility IS NULL THEN 0 ELSE s_weight END), 0) AS HourlyVisibility,
            SUM(s_weight * pressure) / NULLIF(SUM(CASE WHEN pressure IS NULL THEN 0 ELSE s_weight END), 0) AS HourlyStationPressure,
            SUM(s_weight * wet_temp) / NULLIF(SUM(CASE WHEN wet_temp IS NULL THEN 0 ELSE s_weight END), 0) AS HourlyWetBulbTemperature,
            SUM(s_weight * precip) / NULLIF(SUM(CASE WHEN precip IS NULL THEN 0 ELSE s_weight END), 0) AS HourlyPrecipitation,
            SUM(s_weight * cloud_cover) / NULLIF(SUM(CASE WHEN cloud_cover IS NULL THEN 0 ELSE s_weight END), 0) AS HourlyCloudCoverage,
            SUM(s_weight * cloud_elev) / NULLIF(SUM(CASE WHEN cloud_elev IS NULL THEN 0 ELSE s_weight END), 0) AS HourlyCloudElevation,
            SQRT(POWER(SUM(s_weight * wind_north) / NULLIF(SUM(CASE WHEN wind_north IS NULL THEN 0 ELSE s_weight END), 0), 2) +
                 POWER(SUM(s_weight * wind_east) / NULLIF(SUM(CASE WHEN wind_east IS NULL THEN 0 ELSE s_weight END), 0), 2)) AS HourlyWindSpeed
        FROM station_agg
        GROUP BY flight_uid
    )
    SELECT * FROM spatial_agg
    """)
    
    # Join with original flight data
    final_df = df_flights.join(result_df, on="flight_uid", how="left")
    
    print("  Join complete")
    return final_df

In [None]:
def run_custom_join_pipeline(dataset_id: str, spatial_radius: int = 50, time_radius: int = 12):
    """
    Execute the full custom join pipeline for a dataset.
    
    Args:
        dataset_id: Dataset identifier ('3m', '1y', '5y')
        spatial_radius: Maximum station distance in km
        time_radius: Weather lookback hours
        
    Returns:
        Joined DataFrame
    """
    print(f"\n{'='*60}")
    print(f"Running pipeline: {dataset_id}, spatial={spatial_radius}km, time={time_radius}hrs")
    print('='*60)
    
    # 1. Ingest data
    df_flight_times, df_flights = ingest_flight_data(FLIGHT_DATASETS[dataset_id])
    df_airports = ingest_airport_data(df_flight_times)
    df_weather = ingest_weather_data(WEATHER_DATASETS[dataset_id])
    df_stations = ingest_station_data(df_weather)
    
    # 2. Add airport coordinates to flights
    df_flight_times = df_flight_times.join(
        df_airports, 
        df_flight_times.ORIGIN == df_airports.iata_code, 
        how="left"
    )
    
    # 3. Compute Kriging weights
    neighbor_df = build_neighbor_dataframe(df_stations, df_airports)
    df_weights = compute_spatial_weights(neighbor_df, df_stations, df_airports, spatial_radius, time_radius)
    
    # 4. Prepare data for join (convert to GMT)
    df_flight_times = add_gmt_timestamp(df_flight_times, "dep_timestamp_gmt", "dep_timestamp")
    
    station_codes = [r["station_code"] for r in df_weights.select("station_code").distinct().collect()]
    df_weather_filtered = (
        df_weather
        .filter(df_weather["STATION"].isin(station_codes))
        .join(df_stations, on="STATION", how="left")
    )
    df_weather_filtered = add_gmt_timestamp(df_weather_filtered, "DATE_gmt", "DATE")
    
    # 5. Perform join
    final_df = custom_weather_join(df_flight_times, df_weights, df_weather_filtered, df_flights, time_radius)
    
    print(f"\n✓ Pipeline complete: {final_df.count():,} rows")
    return final_df

## 5. Execution

In [None]:
# Run the pipeline for 5-year dataset
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")

final_df = run_custom_join_pipeline(
    dataset_id="5y",
    spatial_radius=SPATIAL_RADIUS,
    time_radius=TIME_RADIUS
)

# Save results
output_path = f"{OUTPUT_BASE_PATH}/5_year_custom_joined"
final_df.write.mode("overwrite").parquet(output_path)
print(f"\nSaved to: {output_path}")

## 6. Summary

This notebook implements a sophisticated **Kriging-based spatial interpolation** pipeline for joining flight data with weather observations.

### Key Features

| Feature | Description |
|---------|-------------|
| **Matérn Covariance** | Models spatial correlation with tunable smoothness |
| **Ordinary Kriging** | Provides optimal unbiased estimates with minimum variance |
| **Temporal Weighting** | Exponential decay prioritizes recent observations |
| **Multi-Station Fusion** | Combines data from multiple stations for robustness |

### Weather Features Produced

| Feature | Description |
|---------|-------------|
| `HourlyDryBulbTemperature` | Air temperature (°F) |
| `HourlyDewPointTemperature` | Dew point (°F) |
| `HourlyRelativeHumidity` | Relative humidity (%) |
| `HourlyVisibility` | Visibility (miles) |
| `HourlyStationPressure` | Barometric pressure (inHg) |
| `HourlyPrecipitation` | Precipitation (inches) |
| `HourlyCloudCoverage` | Cloud cover (oktas, 0-8) |
| `HourlyCloudElevation` | Cloud base height (hundreds of feet) |
| `HourlyWindSpeed` | Wind speed (mph) |

### Advantages Over Simple Nearest-Neighbor Join

1. **Reduced Noise**: Averaging multiple stations reduces measurement error
2. **Missing Data Handling**: If one station has gaps, others fill in
3. **Optimal Weighting**: Kriging weights minimize estimation variance
4. **Spatial Consistency**: Nearby airports get similar weather estimates