# Aerosol–Meteorology Data Fusion

This notebook merges MERRA-2 aerosol diagnostics and near-surface meteorological fields into a single, spatiotemporally aligned dataset over Saudi Arabia (2012–2021).

Both components have been independently validated in prior EDA notebooks to ensure:
- variable presence and integrity,
- physically reasonable ranges,
- absence of encoded fill values,
- consistent spatial and temporal grids.

The objective here is to construct a clean, unified dataset suitable for feature engineering and supervised learning of SDS-induced mmWave attenuation.

In [7]:
from __future__ import annotations

# Standard library
from pathlib import Path

# Scientific stack
import numpy as np
import pandas as pd
import xarray as xr

In [None]:
# Project paths (robust to execution location)
ROOT = Path.cwd().parent if Path.cwd().name == "notebooks" else Path.cwd()

AER_DIR = ROOT / "data" / "raw" / "merra2" / "aer" / "2012_2021"
MET_DIR = ROOT / "data" / "raw" / "merra2" / "met" / "2012_2021"

OUT_DIR = ROOT / "data" / "processed"
OUT_DIR.mkdir(parents=True, exist_ok=True)

print("AER_DIR:", AER_DIR, "| exists:", AER_DIR.exists())
print("MET_DIR:", MET_DIR, "| exists:", MET_DIR.exists())
print("OUT_DIR:", OUT_DIR)


# ---- Run mode ----
# "sample" = merge only the first aer+met files (fast debug)
# "full"   = merge ALL matching aer+met files and write full artifact
RUN_MODE = "full"  # change to "sample" for quick tests

# Output filenames
OUT_FP_SAMPLE = OUT_DIR / "merged_rep_sample_20120101.nc4"
OUT_FP_FULL = OUT_DIR / "merged_full_2012_2021.nc4"

## Merge Strategy

The aerosol and meteorological datasets share:
- identical latitude–longitude grids,
- hourly temporal resolution,
- consistent CF-compliant metadata.

The merge strategy is therefore:
- load both datasets lazily with xarray,
- align explicitly on time, latitude, and longitude,
- subset to common timestamps,
- retain only variables required for downstream modeling.

No spatial interpolation or temporal resampling is performed at this stage.

In [None]:
# Aerosol variables (from EDA)
aer_vars = [
    "DUCMASS",
    "DUCMASS25",
    "DUSMASS",
    "DUSMASS25",
    "DUEXTTAU",
    "DUSCATAU",
    "DUANGSTR",
]

# Meteorological variables (from EDA)
met_vars = [
    "T2M",
    "U2M",
    "V2M",
    "U10M",
    "V10M",
    "PS",
    "SLP",
    "QV2M",
]

In [None]:
# Discover files (robust to nested folders + .nc/.nc4)
aer_files = sorted(AER_DIR.rglob("*.nc")) + sorted(AER_DIR.rglob("*.nc4"))
met_files = sorted(MET_DIR.rglob("*.nc")) + sorted(MET_DIR.rglob("*.nc4"))

print("Aerosol files found:", len(aer_files))
print("Met files found:", len(met_files))

if not aer_files:
    raise FileNotFoundError(f"No aerosol NetCDF files found under {AER_DIR}")
if not met_files:
    raise FileNotFoundError(f"No meteorological NetCDF files found under {MET_DIR}")

# Helper: extract YYYYMMDD from filename to pair aer/met files
# Your raw files are daily (e.g., ...Nx.20120101_subsetted.nc4), so we must pair by day.
import re

def _key_yyyymmdd(p: Path) -> str | None:
    m = re.search(r"(20\d{2})(0[1-9]|1[0-2])([0-3]\d)", p.name)
    if m:
        return f"{m.group(1)}{m.group(2)}{m.group(3)}"
    return None

# Build maps for pairing
_aer_map = {}
for p in aer_files:
    k = _key_yyyymmdd(p)
    if k is not None and k not in _aer_map:
        _aer_map[k] = p

_met_map = {}
for p in met_files:
    k = _key_yyyymmdd(p)
    if k is not None and k not in _met_map:
        _met_map[k] = p

common_keys = sorted(set(_aer_map) & set(_met_map))

print("Pairable YYYYMMDD keys:", len(common_keys))
if not common_keys:
    # Fallback: if filenames don't contain YYYYMMDD, just pair by sorted order
    print("[WARN] Could not detect YYYYMMDD in filenames; falling back to index-wise pairing.")
    n = min(len(aer_files), len(met_files))
    common_pairs = list(zip(aer_files[:n], met_files[:n]))
else:
    common_pairs = [(_aer_map[k], _met_map[k]) for k in common_keys]

# In sample mode, keep only the first pair
if RUN_MODE == "sample":
    common_pairs = common_pairs[:1]
    print("RUN_MODE=sample -> using only 1 aer/met pair")
else:
    print("RUN_MODE=full -> using", len(common_pairs), "aer/met pairs")

In [None]:
print("=== Merge pipeline ===")

merged_parts: list[Path] = []

# Write per-pair merged parts first (keeps memory stable)
parts_dir = OUT_DIR / "_merge_parts"
parts_dir.mkdir(parents=True, exist_ok=True)

for i, (a_fp, m_fp) in enumerate(common_pairs, start=1):
    print(f"[{i}/{len(common_pairs)}] AER={a_fp.name} | MET={m_fp.name}")

    ds_aer = xr.open_dataset(a_fp, engine="netcdf4")[aer_vars]
    ds_met = xr.open_dataset(m_fp, engine="netcdf4")[met_vars]

    # Alignment checks (require exact coords)
    if not np.array_equal(ds_aer["lat"].values, ds_met["lat"].values):
        raise ValueError(f"lat mismatch for {a_fp.name} vs {m_fp.name}")
    if not np.array_equal(ds_aer["lon"].values, ds_met["lon"].values):
        raise ValueError(f"lon mismatch for {a_fp.name} vs {m_fp.name}")
    if not np.array_equal(ds_aer["time"].values, ds_met["time"].values):
        raise ValueError(f"time mismatch for {a_fp.name} vs {m_fp.name}")

    ds_merged = xr.merge([ds_aer, ds_met], compat="identical", join="exact")

    # Quick missingness check on this part
    part_missing = float(ds_merged.to_array().isnull().mean())
    print(f"    missing fraction (part): {part_missing:.6f}")

    # Save part
    # Use key if available; else fall back to index
    k = _key_yyyymmdd(a_fp) or f"part_{i:04d}"
    part_fp = parts_dir / f"merged_{k}.nc4"
    ds_merged.to_netcdf(part_fp, engine="netcdf4")
    merged_parts.append(part_fp)

# Final assembly
if RUN_MODE == "sample":
    # For sample mode, also save a stable sample artifact name
    final_fp = OUT_FP_SAMPLE
    ds_final = xr.open_dataset(merged_parts[0], engine="netcdf4")
    ds_final.to_netcdf(final_fp, engine="netcdf4")
    print("Saved merged representative sample to:", final_fp)
else:
    final_fp = OUT_FP_FULL

    print("=== Concatenating parts into full dataset ===")
    ds_final = xr.open_mfdataset(
        [str(p) for p in merged_parts],
        engine="netcdf4",
        combine="by_coords",
        parallel=False,
    )

    # Global missingness check
    missing_frac = float(ds_final.to_array().isnull().mean())
    print(f"Overall missing fraction (full): {missing_frac:.6f}")

    ds_final.to_netcdf(final_fp, engine="netcdf4")
    print("Saved FULL merged dataset to:", final_fp)

print("DONE.")

## Merge Outcome Summary

This notebook completed the full fusion of MERRA-2 aerosol and near-surface meteorological diagnostics into a single, spatiotemporally aligned dataset spanning the complete 2012–2021 study period.

### Key Outcomes

- Aerosol (M2T1NXAER) and meteorological (M2T1NXSLV) datasets are perfectly aligned by:
  - Time (daily resolution)
  - Latitude
  - Longitude
- Pairing is performed at the **daily level (YYYYMMDD)** to ensure exact temporal correspondence.
- All selected aerosol and meteorological variables are preserved without resampling or interpolation.
- Strict coordinate equality checks enforce:
  - No spatial misalignment
  - No temporal drift
- Missingness was verified at both per-part and global levels.
- The final merged artifact was saved as: `data/processed/merged_full_2012_2021.nc4`

### Engineering Design Decisions

- Merging is performed per daily file and written as intermediate parts to maintain memory stability.
- Final concatenation is executed using `xarray.open_mfdataset` with coordinate-based alignment.
- The merged dataset retains the native spatial resolution and temporal sampling of MERRA-2.

### Role in the Pipeline

This merged dataset is the canonical upstream artifact for:

- Feature engineering
- Physics-informed label construction
- Saudi-calibrated attenuation generation
- Surrogate model training and validation

All downstream stages depend exclusively on this artifact.