In [None]:
# --- Imports ---
import os, json, math, io, zipfile, time, re, shutil, glob, pathlib, sys, subprocess, shlex, tempfile, warnings
from pathlib import Path
from datetime import datetime as dt, timedelta, timezone, date, datetime
from dateutil import parser as dateparser
from dateutil.relativedelta import relativedelta
from urllib.parse import urljoin, quote
from functools import reduce

import numpy as np
import pandas as pd
import requests
from bs4 import BeautifulSoup
from tqdm import tqdm
import pytz
import xarray as xr
import rasterio
from rasterio.mask import mask

import shapely
from shapely import ops
from shapely.geometry import Point, Polygon, box, mapping
from shapely.ops import unary_union, transform as shp_transform

from pyproj import Transformer
import geopandas as gpd
import matplotlib.pyplot as plt
from matplotlib.patches import Rectangle
import folium
import ee  # Earth Engine

from sklearn.neighbors import BallTree
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans
from scipy.spatial.distance import cdist
from geopy.distance import geodesic

import yaml

warnings.filterwarnings("ignore", category=UserWarning)


def skip_if_exists(path: str) -> bool:
    return os.path.exists(path)


In [None]:
# --- Configuration loader (shared HeatShield/HydroPulse) ---
from pathlib import Path
import os
import re
import yaml

def load_env_file(path: Path) -> dict:
    env = {}
    if not path.exists():
        return env
    for line in path.read_text().splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, val = line.split("=", 1)
        env[key.strip()] = val.strip().strip('"').strip("'")
    return env

def apply_env_overrides() -> None:
    env = load_env_file(Path(".env"))
    for k, v in env.items():
        if v:
            os.environ[k] = v

def _fmt_yyyymmdd(s: str) -> str:
    # expects YYYY-MM-DD
    return re.sub(r"-", "", s.strip())

def _render_template(tpl: str, *, start: str, end: str, res_m: int, epsg: int, region: str) -> str:
    return tpl.format(
        start=_fmt_yyyymmdd(start),
        end=_fmt_yyyymmdd(end),
        res_m=int(res_m),
        epsg=int(epsg),
        region=str(region),
    )

def _resolve_out_dir(project_dir: Path, out_dir_value: str | None) -> Path:
    out_dir_value = out_dir_value or "results"
    p = Path(out_dir_value)
    return (p if p.is_absolute() else (project_dir / p)).resolve()

def _resolve_pathlike_keys(cfg: dict, out_dir: Path) -> None:
    """
    Resolve config values that look like relative file paths under out_dir.
    Rule: if key ends with _FILENAME, _PATH, _ZIP, _TIF_NAME, _CSV_NAME, _PARQUET_NAME, _TXT_NAME
    and the value is a relative path, make it absolute under out_dir.
    """
    suffixes = (
        "_FILENAME", "_PATH", "_ZIP",
        "_TIF_NAME", "_CSV_NAME", "_PARQUET_NAME", "_TXT_NAME",
        "_NETCDF_NAME", "_ZARR_NAME"
    )
    for k, v in list(cfg.items()):
        if not isinstance(v, str):
            continue
        if not k.endswith(suffixes):
            continue
        p = Path(v)
        if p.is_absolute():
            cfg[k] = str(p)
        else:
            cfg[k] = str((out_dir / p).resolve())

# Fail fast unless PROJECT_DIR is explicitly provided.
apply_env_overrides()

PROJECT_DIR = os.environ.get("PROJECT_DIR")
if not PROJECT_DIR:
    raise FileNotFoundError(
        "PROJECT_DIR is not set. Add PROJECT_DIR to .env or environment variables."
    )

PROJECT_DIR = Path(PROJECT_DIR).expanduser().resolve()
CONFIG_PATH = PROJECT_DIR / "config" / "config.yaml"
if not CONFIG_PATH.exists():
    raise FileNotFoundError(f"Missing config file: {CONFIG_PATH}")

CONFIG = yaml.safe_load(CONFIG_PATH.read_text()) or {}

# Set out_dir to an absolute path early.
OUT_DIR = _resolve_out_dir(PROJECT_DIR, CONFIG.get("out_dir", "results"))
CONFIG["out_dir"] = str(OUT_DIR)

# Optional: Apply env overrides ONLY for keys that exist in config.yaml.
# This prevents HeatShield-specific lists in shared code.
for key in list(CONFIG.keys()):
    env_val = os.environ.get(key)
    if env_val:
        CONFIG[key] = env_val

# Also allow a small, explicit allowlist for common optional overrides across projects.
for key in ["PURPLEAIR_SENSOR_INDEX"]:  # harmless if absent in HydroPulse config
    env_val = os.environ.get(key)
    if env_val:
        CONFIG[key] = env_val

# Render FINAL_DAILY_FILENAME from template if provided.
# Keeps downstream code stable: always refer to CONFIG["FINAL_DAILY_FILENAME"].
if "FINAL_DAILY_FILENAME_TEMPLATE" in CONFIG:
    region = CONFIG.get("region", "CA")
    start_date = CONFIG["start_date"]
    end_date = CONFIG["end_date"]
    res_m = CONFIG.get("grid_resolution_m", 3000)
    epsg = CONFIG.get("OPS_EPSG", CONFIG.get("CA_ALBERS_EPSG", 3310))
    CONFIG["FINAL_DAILY_FILENAME"] = _render_template(
        CONFIG["FINAL_DAILY_FILENAME_TEMPLATE"],
        start=start_date,
        end=end_date,
        res_m=res_m,
        epsg=epsg,
        region=region,
    )

# Resolve pathlike keys under out_dir (only for keys that exist).
_resolve_pathlike_keys(CONFIG, OUT_DIR)

# Create common directories only if they are referenced in config.
# (Avoid hardcoding "manual" for HydroPulse.)
for k, v in CONFIG.items():
    if k.endswith("_DIRNAME") and isinstance(v, str):
        os.makedirs(Path(CONFIG["out_dir"]) / v, exist_ok=True)

# EPSG constants (configurable)
WGS84_EPSG = int(CONFIG.get("WGS84_EPSG", 4326))
CA_ALBERS_EPSG = int(CONFIG.get("CA_ALBERS_EPSG", 3310))
OPS_EPSG = int(CONFIG.get("OPS_EPSG", CA_ALBERS_EPSG))

# Set working directory
os.chdir(PROJECT_DIR)

print(f"Config loaded from {CONFIG_PATH}")
print(f"Output dir: {CONFIG['out_dir']}")
print(f"Final daily filename: {CONFIG.get('FINAL_DAILY_FILENAME')}")

In [None]:
from pathlib import Path

def resolve_out_path(path_str: str) -> str:
    """
    Resolve a path string relative to CONFIG['out_dir'] unless already absolute.
    Returns an absolute string path.
    """
    p = Path(path_str)
    if p.is_absolute():
        return str(p)
    return str((Path(CONFIG["out_dir"]) / p).resolve())

In [None]:
# --- Ensure California boundary and build 3 km grid clipped to land ---


# Config
res_m = int(CONFIG.get("grid_resolution_m", 3000))
out_epsg = int(CONFIG.get("crs_epsg", 4326))
out_dir = CONFIG["out_dir"]; os.makedirs(out_dir, exist_ok=True)
inset_buffer_m = int(CONFIG.get("coast_inset_m", 0))  # e.g. 5000
boundary_path = CONFIG.get("ca_boundary_path", None)

# 1) Ensure boundary: download Census cartographic boundary if missing
if not boundary_path or not os.path.exists(boundary_path):
    states_zip = os.path.join(out_dir, "cb_2023_us_state_20m.zip")
    if not os.path.exists(states_zip):
        url = CONFIG["CENSUS_STATES_ZIP_URL"]
        r = requests.get(url, timeout=int(CONFIG.get("CENSUS_STATES_TIMEOUT", 120))); r.raise_for_status()
        with open(states_zip, "wb") as f: f.write(r.content)
    # Read from zip directly and select California
    states = gpd.read_file(f"zip://{states_zip}")
    if states.empty:
        raise ValueError("Census states file loaded empty.")
    ca = states[states["STATEFP"].astype(str).str.zfill(2).eq("06")][["geometry"]]
    if ca.empty:
        raise ValueError("California polygon not found in Census states file.")
    boundary_path = os.path.join(out_dir, "california_boundary.gpkg")
    ca.to_file(boundary_path, driver="GPKG")
    CONFIG["ca_boundary_path"] = boundary_path  # persist for later cells

# 2) Load boundary, dissolve, project, optional inward buffer
b = gpd.read_file(boundary_path)
if b.crs is None: raise ValueError("Boundary file has no CRS.")
b = b[["geometry"]].copy()
b = b.to_crs(CA_ALBERS_EPSG)
b = gpd.GeoDataFrame(geometry=[b.unary_union], crs=f"EPSG:{CA_ALBERS_EPSG}")
if inset_buffer_m > 0:
    b.geometry = b.buffer(-inset_buffer_m)
    b = gpd.GeoDataFrame(geometry=[b.unary_union], crs=f"EPSG:{CA_ALBERS_EPSG}")

# 3) Build snapped rectilinear grid over boundary bounds in EPSG:3310
minx, miny, maxx, maxy = b.total_bounds
snap_down = lambda v, s: np.floor(v/s)*s
snap_up   = lambda v, s: np.ceil(v/s)*s
minx, miny = snap_down(minx, res_m), snap_down(miny, res_m)
maxx, maxy = snap_up(maxx, res_m), snap_up(maxy, res_m)

xs = np.arange(minx, maxx, res_m)
ys = np.arange(miny, maxy, res_m)
n_rect = len(xs)*len(ys)
if n_rect > 3_500_000:
    raise MemoryError(f"Grid too large ({n_rect:,}). Increase res_m or tile the state.")

cells, col_i, row_j = [], [], []
for j, y in enumerate(ys):
    for i, x in enumerate(xs):
        cells.append(box(x, y, x+res_m, y+res_m)); col_i.append(i); row_j.append(j)

gdf_proj = gpd.GeoDataFrame({"col_i": np.int32(col_i), "row_j": np.int32(row_j)},
                            geometry=cells, crs=f"EPSG:{CA_ALBERS_EPSG}")
gdf_proj["cell_area_m2"] = float(res_m)*float(res_m)
gdf_proj["grid_id"] = f"CA3310_{res_m}_" + gdf_proj["col_i"].astype(str) + "_" + gdf_proj["row_j"].astype(str)

# 4) Strict land clip and land fraction
gdf_proj = gpd.sjoin(gdf_proj, b, how="inner", predicate="intersects").drop(columns=["index_right"])
inter = gpd.overlay(gdf_proj[["grid_id","geometry"]], b, how="intersection", keep_geom_type=True)
inter["land_area_m2"] = inter.geometry.area
land = inter[["grid_id","land_area_m2"]].groupby("grid_id", as_index=False).sum()
gdf_proj = gdf_proj.merge(land, on="grid_id", how="left")
gdf_proj["land_area_m2"] = gdf_proj["land_area_m2"].fillna(0.0)
gdf_proj["land_frac"] = (gdf_proj["land_area_m2"] / gdf_proj["cell_area_m2"]).clip(0,1)
gdf_proj = gdf_proj[gdf_proj["land_frac"] > 0].reset_index(drop=True)

# 5) Reproject to requested output CRS and save
grid_gdf = gdf_proj.to_crs(out_epsg)

parquet_path = os.path.join(out_dir, f"grid_{res_m}m_CA.parquet")
grid_gdf.to_parquet(parquet_path, index=False)

geojson_path = os.path.join(out_dir, f"grid_{res_m}m_CA_head10.geojson")
grid_gdf.head(10).to_file(geojson_path, driver="GeoJSON")

# Diagnostics
cell_area_km2 = (res_m/1000.0)**2
eff_land_km2 = float((grid_gdf.get("land_frac",1.0) * cell_area_km2).sum())
print(f"Saved: {parquet_path}")
print(f"Cells: {len(grid_gdf):,}")
print(f"Effective land area ≈ {round(eff_land_km2):,} km²")
print(f"Implied cell size ≈ {round((eff_land_km2/len(grid_gdf))**0.5,2)} km")

grid_gdf.head()


In [None]:
# --- Persist config + save grid (3310 ops copy, 4326 preview) + write metadata ---

# Inputs assumed from prior cell:
# - grid_gdf            : current grid GeoDataFrame (any CRS)
# - CONFIG              : dict with out_dir, grid_resolution_m, crs_epsg, ca_boundary_path
# - CA_ALBERS_EPSG=3310 : defined earlier

out_dir = CONFIG["out_dir"]; os.makedirs(out_dir, exist_ok=True)
res_m = int(CONFIG.get("grid_resolution_m", 3000))
out_epsg = int(CONFIG.get("crs_epsg", 4326))
boundary_path = CONFIG.get("ca_boundary_path")

# 1) Persist boundary path back to CONFIG 
if not boundary_path or not os.path.exists(boundary_path):
    raise FileNotFoundError("CONFIG['ca_boundary_path'] missing or invalid. Rebuild boundary.")
CONFIG["ca_boundary_path"] = boundary_path

config_runtime_path = os.path.join(out_dir, "config_runtime.json")
with open(config_runtime_path, "w") as f:
    json.dump(CONFIG, f, indent=2)
print("Saved:", config_runtime_path)

# 2) Ensure we have an EPSG:3310 version for spatial ops
if grid_gdf.crs is None:
    raise ValueError("grid_gdf has no CRS. Rebuild grid.")
grid_3310 = grid_gdf.to_crs(3310) if grid_gdf.crs.to_epsg() != 3310 else grid_gdf

# 3) Save operational GeoParquet in 3310 + lightweight WGS84 preview
parquet_3310 = os.path.join(out_dir, f"grid_{res_m}m_CA_epsg3310.parquet")
grid_3310.to_parquet(parquet_3310, index=False)
print("Saved:", parquet_3310, "| cells:", len(grid_3310))

# Optional small preview in 4326 for quick map checks
preview_4326 = grid_3310.to_crs(4326).head(500)  # cap to avoid huge files
geojson_preview = os.path.join(out_dir, f"grid_{res_m}m_CA_head500_epsg4326.geojson")
preview_4326.to_file(geojson_preview, driver="GeoJSON")
print("Saved:", geojson_preview)

# 4) Compute and save metadata
cell_area_km2 = (res_m/1000.0)**2
effective_land_km2 = float((grid_3310.get("land_frac", 1.0) * cell_area_km2).sum())
implied_cell_km = float((effective_land_km2 / len(grid_3310))**0.5)
minx, miny, maxx, maxy = grid_3310.total_bounds
bbox_km = ((maxx-minx)/1000.0, (maxy-miny)/1000.0)

meta = {
    "timestamp_utc": dt.utcnow().isoformat(timespec="seconds") + "Z",
    "grid_resolution_m": res_m,
    "crs_ops_epsg": 3310,
    "crs_export_default_epsg": out_epsg,
    "cells": int(len(grid_3310)),
    "effective_land_area_km2": round(effective_land_km2, 2),
    "implied_cell_km": round(implied_cell_km, 4),
    "bbox_km_width_height": [round(bbox_km[0], 2), round(bbox_km[1], 2)],
    "has_land_frac": bool("land_frac" in grid_3310.columns),
    "boundary_path": boundary_path,
    "parquet_3310_path": parquet_3310,
    "geojson_preview_4326_path": geojson_preview,
}

meta_path = os.path.join(out_dir, f"grid_{res_m}m_CA_meta.json")
with open(meta_path, "w") as f:
    json.dump(meta, f, indent=2)
print("Saved:", meta_path)
meta


In [None]:
# CDO data fetch and processing functions
# repeat some variables for clarity
OUT_DIR = CONFIG["out_dir"]
RAW_DIR = os.path.join(OUT_DIR, CONFIG["CDO_RAW_DIRNAME"])
CLEAN_DIR = os.path.join(OUT_DIR, CONFIG["CDO_CLEAN_DIRNAME"])
os.makedirs(RAW_DIR, exist_ok=True); os.makedirs(CLEAN_DIR, exist_ok=True)

def month_windows(start_date, end_date):
    s = dt.fromisoformat(start_date).date().replace(day=1)
    e = dt.fromisoformat(end_date).date()
    cur = s
    while cur <= e:
        nxt = (cur + relativedelta(months=1)) - relativedelta(days=1)
        yield cur.isoformat(), min(nxt, e).isoformat()
        cur = (cur + relativedelta(months=1)).replace(day=1)

def parse_attributes(attr):
    parts = (attr or "").split(","); parts += [""] * (4 - len(parts))
    mflag, qflag, sflag, obs_hhmm = parts[:4]
    return mflag or None, qflag or None, sflag or None, obs_hhmm or None

def fetch_cdo_page(session, url, headers, params, max_retries=None, base_delay=None, timeout=None):
    if max_retries is None:
        max_retries = int(CONFIG.get("CDO_MAX_RETRIES", 6))
    if base_delay is None:
        base_delay = float(CONFIG.get("CDO_BACKOFF_BASE", 0.8))
    if timeout is None:
        timeout = int(CONFIG.get("CDO_TIMEOUT", 180))
    for attempt in range(max_retries):
        try:
            r = session.get(url, headers=headers, params=params, timeout=timeout)
            if r.status_code in (429, 500, 502, 503, 504):
                raise requests.HTTPError(f"{r.status_code} retry")
            r.raise_for_status()
            return r.json()
        except Exception:
            if attempt == max_retries - 1:
                raise
            time.sleep(base_delay * (2 ** attempt))


def cdo_stream_monthly(datasetid, locationid, startdate, enddate, datatypes, token,
                       units="standard", page_limit=1000, force=False):
    url = CONFIG["CDO_BASE_URL"]
    headers = {"token": token}
    session = requests.Session()
    written = []

    for dtid in datatypes:
        for ms, me in month_windows(startdate, enddate):
            out_csv = os.path.join(RAW_DIR, f"ghcnd_{dtid}_{ms[:7]}.csv")
            if skip_if_exists(out_csv) and not force:
                # resume: skip existing month-datatype file
                written.append(out_csv); continue

            frames = []
            offset = 1
            while True:
                params = {
                    "datasetid": datasetid, "locationid": locationid,
                    "startdate": ms, "enddate": me,
                    "datatypeid": dtid, "units": units,
                    "limit": page_limit, "offset": offset
                }
                js = fetch_cdo_page(session, url, headers, params)
                rows = js.get("results", [])
                if not rows:
                    break
                frames.append(pd.json_normalize(rows))
                if len(rows) < page_limit:
                    break
                offset += page_limit
                time.sleep(0.15)  # gentle pacing

            if frames:
                df = pd.concat(frames, ignore_index=True)
                # normalize
                df["date"] = pd.to_datetime(df["date"], errors="coerce").dt.date
                parsed = df["attributes"].apply(parse_attributes)
                df[["mflag","qflag","sflag","obs_hhmm"]] = pd.DataFrame(parsed.tolist(), index=df.index)
                # scale tenths
                scale = {"PRCP": 0.1, "TMAX": 0.1, "TMIN": 0.1}
                df["datatype"] = df["datatype"].astype(str)
                df["value"] = pd.to_numeric(df["value"], errors="coerce")
                df["value_scaled"] = df.apply(lambda r: r["value"] * scale.get(r["datatype"], 1.0), axis=1)
                # write monthly raw
                df[["date","datatype","station","attributes","mflag","qflag","sflag","obs_hhmm","value","value_scaled"]].to_csv(out_csv, index=False)
                written.append(out_csv)
            else:
                # create an empty file with header to mark completion
                with open(out_csv, "w", newline="") as f:
                    w = csv.writer(f); w.writerow(["date","datatype","station","attributes","mflag","qflag","sflag","obs_hhmm","value","value_scaled"])
                written.append(out_csv)
    return written

def build_clean_wide():
    # read all monthly raw files and assemble cleaned wide once
    files = sorted([os.path.join(RAW_DIR, f) for f in os.listdir(RAW_DIR) if f.endswith(".csv")])
    if not files:
        return None
    df = pd.concat((pd.read_csv(f, dtype={"datatype":str,"station":str}) for f in files), ignore_index=True)
    # convert types back
    df["date"] = pd.to_datetime(df["date"]).dt.date
    # keep good qflag
    df = df[(df["qflag"].isna()) | (df["qflag"]=="")]
    wide = (
        df.pivot_table(index=["station","date"], columns="datatype", values="value_scaled", aggfunc="mean")
          .reset_index()
          .rename(columns={"date":"obs_date","PRCP":"precipitation_mm","TMAX":"temperature_max_c","TMIN":"temperature_min_c"})
          .sort_values(["obs_date","station"])
    )
    # attach obs time from PRCP
    prcp_times = df[df["datatype"]=="PRCP"][["station","date","obs_hhmm"]].drop_duplicates().rename(columns={"date":"obs_date"})
    wide = wide.merge(prcp_times, on=["station","obs_date"], how="left")
    raw_all = os.path.join(OUT_DIR, "ghcnd_daily_raw_all.csv")
    wide_all = os.path.join(OUT_DIR, "ghcnd_daily_wide.csv")
    df.to_csv(raw_all, index=False)
    wide.to_csv(wide_all, index=False)
    return raw_all, wide_all, len(df), len(wide), wide["station"].nunique(), wide["obs_date"].nunique()

# ---- Run statewide with resume capability ----
token = os.environ.get("CDO_TOKEN") or CONFIG.get("CDO_TOKEN", "")
if token and token != "YOUR_NCEI_CDO_TOKEN":
    written = cdo_stream_monthly(
        datasetid="GHCND",
        locationid="FIPS:06",                      # California statewide
        startdate=CONFIG["start_date"],
        enddate=CONFIG["end_date"],
        datatypes=["TMAX","TMIN","PRCP"],
        token=token,
        units="standard",
        page_limit=1000,
        force=False                                 # set True to re-download
    )
    print(f"Monthly files written: {len(written)} → {RAW_DIR}")

    res = build_clean_wide()
    if res:
        raw_all, wide_all, n_raw, n_wide, n_stn, n_dates = res
        print(f"Saved raw:  {raw_all}")
        print(f"Saved wide: {wide_all}")
        print(f"Counts → raw: {n_raw} | wide: {n_wide} | stations: {n_stn} | dates: {n_dates}")
else:
    print("Skipping CDO (missing CDO token).")


In [None]:
# === GHCND DAILY: raw (long) -> cleaned (wide with lat/lon in bbox) ===
# Input  (from your earlier step):  results/ghcnd_daily_raw_all.csv  (long form)
# Output (used by superset):        results/ghcnd_daily_cleaned.parquet  (wide per station-day with lat/lon)

# Need to do this because we aren't getting proper "joins" in our superset setup.


BASE = CONFIG["out_dir"]
RAW = resolve_out_path(CONFIG["GHCND_RAW_CSV_NAME"])
OUT_PARQ = resolve_out_path(CONFIG["GHCND_CLEAN_PARQUET_NAME"])
OUT_CSV = resolve_out_path(CONFIG["GHCND_CLEAN_CSV_NAME"])

assert os.path.exists(RAW), f"Missing raw GHCND file: {RAW}"

# 1) Ensure we have a station catalog with lat/lon
#    Prefer a local copy if you already saved one; otherwise download NOAA's reference once.
CAT_DIR = os.path.join(BASE, CONFIG["MANUAL_DIRNAME"]); os.makedirs(CAT_DIR, exist_ok=True)
CAT_TXT = os.path.join(CAT_DIR, CONFIG["GHCND_STATIONS_TXT_NAME"])

if not os.path.exists(CAT_TXT):
    url = CONFIG["GHCND_STATIONS_URL"]
    r = requests.get(url, timeout=int(CONFIG.get("CENSUS_STATES_TIMEOUT", 120))); r.raise_for_status()
    with open(CAT_TXT, "wb") as f: f.write(r.content)

# Parse ghcnd-stations.txt (fixed-width)
# Columns per docs: ID(1-11), LAT(13-20), LON(22-30), ELEV(32-37), STATE(39-40), NAME(42-71) ...
def parse_stations(path):
    recs = []
    with open(path, "r", encoding="utf-8", errors="ignore") as f:
        for line in f:
            if len(line) < 40: 
                continue
            sid = line[0:11].strip()
            try:
                lat = float(line[12:20].strip())
                lon = float(line[21:30].strip())
            except ValueError:
                continue
            state = line[38:40].strip()
            name  = line[41:71].strip()
            recs.append((sid, lat, lon, state, name))
    return pd.DataFrame(recs, columns=["station_core","lat","lon","state","name"])

stations = parse_stations(CAT_TXT)

# 2) Load your raw long-form CDO file
# Expected columns seen in your sample:
# ['attributes','datatype','date','mflag','obs_hhmm','qflag','sflag','station','value','value_scaled']
raw = pd.read_csv(RAW, low_memory=False)

# Normalize station key: raw uses "GHCND:USW00023232" → core "USW00023232"
raw["station_core"] = raw["station"].astype(str).str.replace("^GHCND:", "", regex=True)

# Pick a numeric value column: prefer value_scaled if present; else scale GHCND native units.
# GHCND native: PRCP = tenths of mm, TMAX/TMIN = tenths of °C.
have_scaled = "value_scaled" in raw.columns
def scaled_val(row):
    if have_scaled and pd.notna(row["value_scaled"]):
        return float(row["value_scaled"])
    v = pd.to_numeric(row["value"], errors="coerce")
    if pd.isna(v): 
        return np.nan
    if row["datatype"] == "PRCP":
        return v * 0.1             # → mm
    if row["datatype"] in ("TMAX","TMIN"):
        return v * 0.1             # → °C
    return v

raw["val_clean"] = raw.apply(scaled_val, axis=1)

# Filter to the analysis window if your raw contains more than needed
if "start_date" in CONFIG and "end_date" in CONFIG:
    sd = pd.to_datetime(CONFIG["start_date"], utc=True, errors="coerce")
    ed = pd.to_datetime(CONFIG["end_date"],   utc=True, errors="coerce")
    raw["date"] = pd.to_datetime(raw["date"], utc=True, errors="coerce")
    raw = raw[(raw["date"]>=sd) & (raw["date"]<=ed)]
else:
    raw["date"] = pd.to_datetime(raw["date"], utc=True, errors="coerce")

# 3) Keep only the datatypes we need and one value per (station,date,datatype)
keep_types = {"PRCP":"precipitation_mm", "TMAX":"temperature_max_c", "TMIN":"temperature_min_c"}
raw = raw[raw["datatype"].isin(keep_types.keys())].copy()

# If multiple rows per (station,date,datatype), average them
agg = (raw.groupby(["station_core","date","datatype"], as_index=False)["val_clean"]
          .mean())

# 4) Pivot to wide columns
wide = (agg.pivot(index=["station_core","date"], columns="datatype", values="val_clean")
           .reset_index())
# Rename columns to our canonical names
wide = wide.rename(columns={k:v for k,v in keep_types.items() if k in wide.columns})

# 5) Attach lat/lon from station catalog and clip to CA bbox
wide = wide.merge(stations[["station_core","lat","lon"]], on="station_core", how="left")

# Clip to CONFIG["bbox"] (California in your setup)
bbox = CONFIG["bbox"]
minx, miny, maxx, maxy = bbox["nwlng"], bbox["selat"], bbox["selng"], bbox["nwlat"]
in_box = (wide["lon"].between(minx, maxx)) & (wide["lat"].between(miny, maxy))
wide = wide[in_box].copy()

# 6) Final tidy columns + sorts
cols_order = ["station_core","date","lat","lon",
              "precipitation_mm","temperature_max_c","temperature_min_c"]
for c in cols_order:
    if c not in wide.columns: wide[c] = np.nan
wide = wide[cols_order].sort_values(["station_core","date"])

# 7) Save for the superset
wide.to_parquet(OUT_PARQ, index=False)
wide.to_csv(OUT_CSV, index=False)
print(f"Saved cleaned CDO daily → {OUT_PARQ} (rows={len(wide)}, stations={wide['station_core'].nunique()})")


In [None]:
### We've changed our approach -- DO NOT RUN THIS ANY MORE ###

# === PRISM | Cell A: Raw ingest via official Web Service (resumable + chunkable) ===
#
# Web service syntax (per PRISM doc):
#   https://services.nacse.org/prism/data/get/<region>/<res>/<element>/<date><?format=[nc|asc|bil]>
# One grid per request, returns a .zip.  [oai_citation:4‡Prism Group](https://prism.oregonstate.edu/documents/PRISM_downloads_web_service.pdf)
#
# PRISM download limits:
# - If a file is downloaded twice in a 24-hour period, no more downloads of that file allowed in that period
# - Excessive activity may result in IP blocking  [oai_citation:5‡Prism Group](https://prism.oregonstate.edu/documents/PRISM_downloads_web_service.pdf)
#
# This cell is designed for "download once, then reuse", and for running in small chunks.

import os
import time
import random
import requests
import pandas as pd
from pathlib import Path
from datetime import datetime

# -----------------------------
# Required config (uses your existing baseline keys)
# -----------------------------
BASELINE_START = pd.to_datetime(CONFIG["BASELINE_START_DATE"])
BASELINE_END   = pd.to_datetime(CONFIG["BASELINE_END_DATE"])

# PRISM web service parameters
PRISM_SERVICE_BASE = str(CONFIG.get("PRISM_SERVICE_BASE_URL", "https://services.nacse.org/prism/data/get")) 
PRISM_REGION = str(CONFIG.get("PRISM_REGION", "us"))     # 'us' CONUS  [oai_citation:6‡Prism Group](https://prism.oregonstate.edu/documents/PRISM_downloads_web_service.pdf)
PRISM_RES    = str(CONFIG.get("PRISM_RESOLUTION", "4km"))# '4km' supported  [oai_citation:7‡Prism Group](https://prism.oregonstate.edu/documents/PRISM_downloads_web_service.pdf)
PRISM_ELEMENTS = CONFIG.get("PRISM_ELEMENTS", ["ppt", "tmean"])  # elements list in doc  [oai_citation:8‡Prism Group](https://prism.oregonstate.edu/documents/PRISM_downloads_web_service.pdf)

# Output folder
OUT_DIR = Path(CONFIG["out_dir"])
RAW_DIR = OUT_DIR / "prism_raw_baseline_ws"
RAW_DIR.mkdir(parents=True, exist_ok=True)

# -----------------------------
# Controls for "small chunks over several days"
# -----------------------------
# You can run year-by-year to reduce server load and make progress predictable.
# Set these each session (or add to config later if you like).
RUN_YEAR_START = int(CONFIG.get("PRISM_RUN_YEAR_START", BASELINE_START.year))
RUN_YEAR_END   = int(CONFIG.get("PRISM_RUN_YEAR_END", RUN_YEAR_START))  # default: single year
MAX_DOWNLOADS_THIS_RUN = int(CONFIG.get("PRISM_MAX_DOWNLOADS_PER_RUN", 400))  # hard stop per run

# Throttling/backoff
BASE_SLEEP_S = float(CONFIG.get("PRISM_BASE_SLEEP_S", 2.0))   # PRISM sample script uses sleep 2  [oai_citation:9‡Prism Group](https://prism.oregonstate.edu/documents/PRISM_downloads_web_service.pdf)
JITTER_S     = float(CONFIG.get("PRISM_JITTER_S", 0.75))
TIMEOUT_S    = int(CONFIG.get("PRISM_TIMEOUT_S", 120))

MAX_RETRIES  = int(CONFIG.get("PRISM_MAX_RETRIES", 6))
BACKOFF_BASE = float(CONFIG.get("PRISM_BACKOFF_BASE", 1.7))

# Optional: request format (default returns COG package; doc mentions optional formats nc/asc/bil)  [oai_citation:10‡Prism Group](https://prism.oregonstate.edu/documents/PRISM_downloads_web_service.pdf)
# Leave as None for default; or set to "bil" / "nc"
PRISM_FORMAT = CONFIG.get("PRISM_FORMAT", None)

# Optional: releaseDate check (defaults OFF).
# When OFF: "download once" behavior = if file exists, skip without checking.
USE_RELEASEDATE_CHECK = bool(CONFIG.get("PRISM_USE_RELEASEDATE_CHECK", False))

# -----------------------------
# URL helpers (per doc)
# -----------------------------
def prism_grid_url(element: str, yyyymmdd: str) -> str:
    # https://services.nacse.org/prism/data/get/<region>/<res>/<element>/<date><?format=...>  [oai_citation:11‡Prism Group](https://prism.oregonstate.edu/documents/PRISM_downloads_web_service.pdf)
    url = f"{PRISM_SERVICE_BASE}/{PRISM_REGION}/{PRISM_RES}/{element}/{yyyymmdd}"
    if PRISM_FORMAT:
        url += f"?format={PRISM_FORMAT}"
    return url

def prism_release_url(element: str, yyyymmdd: str) -> str:
    # https://services.nacse.org/prism/data/get/releaseDate/<region>/<resolution>/<element>/<date>?json=true  [oai_citation:12‡Prism Group](https://prism.oregonstate.edu/documents/PRISM_downloads_web_service.pdf)
    return f"{PRISM_SERVICE_BASE}/releaseDate/{PRISM_REGION}/{PRISM_RES}/{element}/{yyyymmdd}?json=true"

def safe_session() -> requests.Session:
    s = requests.Session()
    s.headers.update({
        "User-Agent": CONFIG.get("USER_AGENT_HEADERS", {}).get("User-Agent", "BlueLeafLabs/HydroPulse"),
        "Accept": "*/*",
    })
    return s

def parse_filename_from_cd(cd: str) -> str | None:
    # Content-Disposition: attachment; filename=prism_ppt_us_4km_19910101.zip
    if not cd:
        return None
    cd = cd.strip()
    parts = cd.split(";")
    for p in parts:
        p = p.strip()
        if p.lower().startswith("filename="):
            fn = p.split("=", 1)[1].strip().strip('"')
            return fn
    return None

# -----------------------------
# Local pathing
# -----------------------------
def out_path_for(element: str, yyyymmdd: str, filename_hint: str | None = None) -> Path:
    ed = RAW_DIR / element
    ed.mkdir(parents=True, exist_ok=True)
    if filename_hint:
        return ed / filename_hint
    # Fallback (stable and unique even if server changes naming slightly)
    suffix = PRISM_FORMAT if PRISM_FORMAT else "cog"
    return ed / f"prism_{element}_{PRISM_REGION}_{PRISM_RES}_{yyyymmdd}_{suffix}.zip"

# -----------------------------
# Optional releaseDate logic
# -----------------------------
def fetch_release_date(session: requests.Session, element: str, yyyymmdd: str) -> str | None:
    # doc: releaseDate service provides release date; older than Apr 2014 may be unpopulated  [oai_citation:13‡Prism Group](https://prism.oregonstate.edu/documents/PRISM_downloads_web_service.pdf)
    try:
        r = session.get(prism_release_url(element, yyyymmdd), timeout=TIMEOUT_S)
        if r.status_code != 200:
            return None
        js = r.json()
        # The PDF describes fields; response structure may be list/dict depending on single vs range.
        # We'll defensively extract any plausible release date string.
        if isinstance(js, list) and js:
            return js[0].get("releaseDate") or js[0].get("ReleaseDate") or js[0].get("release_date")
        if isinstance(js, dict):
            return js.get("releaseDate") or js.get("ReleaseDate") or js.get("release_date")
        return None
    except Exception:
        return None

# -----------------------------
# Download with retries + backoff
# -----------------------------
def download_one(session: requests.Session, element: str, yyyymmdd: str) -> tuple[str, Path | None]:
    url = prism_grid_url(element, yyyymmdd)

    # First request HEAD-like via GET stream (server returns a zip)
    for attempt in range(MAX_RETRIES):
        try:
            r = session.get(url, stream=True, timeout=TIMEOUT_S)
            if r.status_code == 404:
                return ("unavailable", None)
            if r.status_code in (429, 500, 502, 503, 504):
                raise RuntimeError(f"transient {r.status_code}")
            r.raise_for_status()

            fn = parse_filename_from_cd(r.headers.get("Content-Disposition", ""))
            out_path = out_path_for(element, yyyymmdd, fn)

            if out_path.exists():
                # Do not re-download. Close response promptly.
                r.close()
                return ("exists", out_path)

            tmp = out_path.with_suffix(out_path.suffix + ".part")
            with open(tmp, "wb") as f:
                for chunk in r.iter_content(chunk_size=1024 * 1024):
                    if chunk:
                        f.write(chunk)
            os.replace(tmp, out_path)
            return ("downloaded", out_path)

        except Exception as e:
            # backoff + jitter
            sleep_s = (BACKOFF_BASE ** attempt) + random.random() * JITTER_S
            print(f"[WARN] {element} {yyyymmdd} attempt {attempt+1}/{MAX_RETRIES}: {e} -> sleep {sleep_s:.2f}s")
            time.sleep(sleep_s)

    return ("failed", None)

# -----------------------------
# Build run date range (year-sliced)
# -----------------------------
run_start = max(BASELINE_START, pd.Timestamp(year=RUN_YEAR_START, month=1, day=1))
run_end   = min(BASELINE_END,   pd.Timestamp(year=RUN_YEAR_END,   month=12, day=31))
dates = pd.date_range(run_start, run_end, freq="D")

print("PRISM Cell A (web service) starting")
print(f"Baseline window: {BASELINE_START.date()} → {BASELINE_END.date()}")
print(f"Run slice      : {run_start.date()} → {run_end.date()} ({len(dates)} days)")
print(f"Elements       : {PRISM_ELEMENTS}")
print(f"Resolution     : {PRISM_RES} | Region: {PRISM_REGION}")
print(f"Max downloads  : {MAX_DOWNLOADS_THIS_RUN}")
print(f"Sleep (on dl)  : {BASE_SLEEP_S}s + jitter up to {JITTER_S}s\n")

# -----------------------------
# Main loop
# -----------------------------
session = safe_session()

stats = {"downloaded": 0, "exists": 0, "unavailable": 0, "failed": 0, "skipped_release": 0}
downloads_this_run = 0

for element in PRISM_ELEMENTS:
    print(f"--- Element: {element} ---")
    for i, d in enumerate(dates, start=1):
        yyyymmdd = d.strftime("%Y%m%d")

        # Enforce per-run cap (lets you run small chunks over multiple days)
        if downloads_this_run >= MAX_DOWNLOADS_THIS_RUN:
            print(f"[STOP] Reached PRISM_MAX_DOWNLOADS_PER_RUN={MAX_DOWNLOADS_THIS_RUN}. Safe to rerun later.")
            break

        # If file exists, skip immediately (resume behavior)
        # We don’t know server filename until request, so check fallback name pattern too.
        # We’ll do a cheap existence check by globbing element dir for this date.
        el_dir = RAW_DIR / element
        if el_dir.exists():
            hits = list(el_dir.glob(f"*{yyyymmdd}*.zip"))
            if hits:
                stats["exists"] += 1
                continue

        # Optional release-date check (OFF by default)
        if USE_RELEASEDATE_CHECK:
            _ = fetch_release_date(session, element, yyyymmdd)  # you can wire this into a manifest later

        status, path = download_one(session, element, yyyymmdd)
        stats[status] += 1

        if status == "downloaded":
            downloads_this_run += 1
            # polite sleep only when we actually transfer bytes (PRISM sample script sleeps 2)  [oai_citation:14‡Prism Group](https://prism.oregonstate.edu/documents/PRISM_downloads_web_service.pdf)
            time.sleep(BASE_SLEEP_S + random.random() * JITTER_S)

        # Heartbeat every ~50 days
        if i % 50 == 0:
            print(f"{element} day {i}/{len(dates)} | dl={stats['downloaded']} exist={stats['exists']} unavail={stats['unavailable']} failed={stats['failed']}")

    print(f"Completed element: {element}\n")

print("PRISM Cell A complete (this run slice)")
print(stats)

In [None]:
# === PRISM | Cell B: Build HydroPulse baseline from PRISM daily long-term normals (avg_30y) ===
#
# Inputs (local, no downloads):
#   {out_dir}/manual/prism/prism_ppt_us_25m_YYYYMMDD_avg_30y.zip
#   {out_dir}/manual/prism/prism_tmean_us_25m_YYYYMMDD_avg_30y.zip
#
# Output:
#   {out_dir}/prism_baseline_normals/prism_normals_doy_grid_25m_to_3km.parquet
#
# Output schema:
#   grid_id, doy, prism_ppt_norm_mm, prism_tmean_norm_c

import re
import zipfile
from pathlib import Path

import numpy as np
import pandas as pd
import geopandas as gpd

import rasterio
from rasterio.io import MemoryFile
from pyproj import Transformer

OUT_DIR = Path(CONFIG["out_dir"])

# Where you placed PRISM normals
PRISM_DIR = Path(CONFIG.get("PRISM_MANUAL_DIR", OUT_DIR / "manual" / "prism"))
if not PRISM_DIR.exists():
    raise FileNotFoundError(f"PRISM_MANUAL_DIR not found: {PRISM_DIR}")

# HydroPulse grid (EPSG:3310)
OPS_EPSG = int(CONFIG.get("OPS_EPSG", 3310))
GRID_PATH = Path(resolve_out_path(CONFIG["GRID_FILENAME"]))

# Output
BASE_DIR = OUT_DIR / "prism_baseline_normals"
BASE_DIR.mkdir(parents=True, exist_ok=True)
SHARDS_DIR = BASE_DIR / "shards_doy"
SHARDS_DIR.mkdir(parents=True, exist_ok=True)
FINAL_PATH = BASE_DIR / "prism_normals_doy_grid_25m_to_3km.parquet"

# --- Load grid centroids ---
grid = gpd.read_parquet(GRID_PATH)
if grid.crs is None or (grid.crs.to_epsg() or 0) != OPS_EPSG:
    grid = grid.set_crs(f"EPSG:{OPS_EPSG}", allow_override=True) if grid.crs is None else grid.to_crs(OPS_EPSG)

if "grid_id" not in grid.columns:
    raise KeyError("Expected grid parquet to contain 'grid_id'.")

grid_id = grid["grid_id"].astype(str).values
centroids = grid.geometry.centroid
xs = centroids.x.values
ys = centroids.y.values
n_cells = len(grid_id)

print(f"Grid: {n_cells} cells | EPSG:{OPS_EPSG}")
print(f"PRISM normals dir: {PRISM_DIR}")

# --- PRISM filename parser (your observed convention) ---
# Example: prism_ppt_us_25m_20200115_avg_30y.zip
pat = re.compile(r"^prism_(ppt|tmean)_us_25m_(\d{8})_avg_30y\.zip$", re.IGNORECASE)

ppt = {}
tmean = {}

for p in sorted(PRISM_DIR.glob("*.zip")):
    m = pat.match(p.name)
    if not m:
        continue
    var = m.group(1).lower()
    yyyymmdd = m.group(2)

    # PRISM daily normals commonly use year=2020 as a convenient leap-year index;
    # we convert YYYYMMDD -> DOY using that year token directly.
    dt = pd.to_datetime(yyyymmdd, format="%Y%m%d", utc=True)
    doy = int(dt.dayofyear)

    if var == "ppt":
        ppt[doy] = p
    elif var == "tmean":
        tmean[doy] = p

doys = sorted(set(ppt.keys()) & set(tmean.keys()))
if not doys:
    raise RuntimeError(
        "No matching PRISM ppt/tmean normals found.\n"
        f"Expected filenames like: prism_ppt_us_25m_20200115_avg_30y.zip in {PRISM_DIR}"
    )

print(f"Matched DOYs: {len(doys)} (e.g., {doys[:5]})")

# --- Read GeoTIFF inside PRISM zip via MemoryFile ---
def open_tif_from_zip(zip_path: Path):
    with zipfile.ZipFile(zip_path, "r") as z:
        tif_names = [n for n in z.namelist() if n.lower().endswith(".tif")]
        if not tif_names:
            raise FileNotFoundError(f"No .tif found inside {zip_path.name}")
        tif_name = tif_names[0]
        tif_bytes = z.read(tif_name)

    mem = MemoryFile(tif_bytes)
    ds = mem.open()
    return mem, ds

def sample_zip_to_grid(zip_path: Path, xs3310: np.ndarray, ys3310: np.ndarray) -> np.ndarray:
    mem, ds = open_tif_from_zip(zip_path)
    try:
        tf = Transformer.from_crs(f"EPSG:{OPS_EPSG}", ds.crs, always_xy=True)
        sx, sy = tf.transform(xs3310, ys3310)

        vals = np.array([v[0] for v in ds.sample(zip(sx, sy))], dtype=np.float64)
        nodata = ds.nodata
        if nodata is not None:
            vals[vals == nodata] = np.nan
        vals[~np.isfinite(vals)] = np.nan
        return vals
    finally:
        ds.close()
        mem.close()

# --- Build shards per DOY (resumable) ---
written = 0
skipped = 0

for doy in doys:
    shard_path = SHARDS_DIR / f"prism_normals_doy_{doy:03d}.parquet"
    if shard_path.exists():
        skipped += 1
        continue

    ppt_vals = sample_zip_to_grid(ppt[doy], xs, ys).astype(np.float32)      # mm
    tm_vals  = sample_zip_to_grid(tmean[doy], xs, ys).astype(np.float32)    # °C

    out = pd.DataFrame({
        "grid_id": grid_id,
        "doy": np.full(n_cells, doy, dtype=np.int16),
        "prism_ppt_norm_mm": ppt_vals,
        "prism_tmean_norm_c": tm_vals,
    })
    out.to_parquet(shard_path, index=False)
    written += 1

    if written % 25 == 0:
        print(f"Shards written: {written} | latest DOY={doy:03d}")

print(f"Shard pass complete | written={written} | skipped={skipped}")

# --- Combine to one baseline parquet (fast enough at 366 shards) ---
shards = sorted(SHARDS_DIR.glob("prism_normals_doy_*.parquet"))
df_all = pd.concat((pd.read_parquet(p) for p in shards), ignore_index=True)
df_all.to_parquet(FINAL_PATH, index=False)

print(f"Saved baseline: {FINAL_PATH}")
print(f"Rows: {len(df_all)} | doys: {df_all['doy'].nunique()} | cells: {df_all['grid_id'].nunique()}")
print(df_all.head())

In [None]:
# === SMAP (SPL4SMGP) via Harmony: CA-only subset + quiet + strong resume ===

from pathlib import Path
import os
import datetime as dt

import earthaccess
from harmony import BBox, Client, Collection, Request, CapabilitiesRequest

# ---------- 0) env reload (reuse your existing helpers if available) ----------
try:
    apply_env_overrides()
except NameError:
    pass

if not os.environ.get("EARTHDATA_TOKEN") and not (
    os.environ.get("EARTHDATA_USERNAME") and os.environ.get("EARTHDATA_PASSWORD")
):
    raise RuntimeError("Missing Earthdata credentials. Set EARTHDATA_TOKEN (recommended) in .env.")

earthaccess.login(strategy="environment")  # should be quiet if already logged in

# ---------- 1) config ----------
bbox = CONFIG["bbox"]
W = float(bbox["nwlng"])
S = float(bbox["selat"])
E = float(bbox["selng"])
N = float(bbox["nwlat"])

START = dt.date.fromisoformat(CONFIG["start_date"])
END   = dt.date.fromisoformat(CONFIG["end_date"])

SHORT_NAME = CONFIG.get("SMAP_L4_SHORT_NAME", "SPL4SMGP")
DESIRED_VARS = CONFIG.get("SMAP_L4_VARIABLES", ["sm_surface", "sm_rootzone"])

OUT_DIR = Path(CONFIG["out_dir"])
SMAP_DIR = OUT_DIR / "manual" / "smap" / SHORT_NAME
SMAP_DIR.mkdir(parents=True, exist_ok=True)

SENTINEL_DIR = SMAP_DIR / "_done"
SENTINEL_DIR.mkdir(parents=True, exist_ok=True)

def month_start(d: dt.date) -> dt.date:
    return dt.date(d.year, d.month, 1)

def next_month(d: dt.date) -> dt.date:
    return dt.date(d.year + (d.month == 12), 1 if d.month == 12 else d.month + 1, 1)

def month_range(start: dt.date, end: dt.date):
    cur = month_start(start)
    while cur <= end:
        nxt = next_month(cur)
        yield cur, min(end + dt.timedelta(days=1), nxt)  # end is effectively exclusive
        cur = nxt

def has_any_files(folder: Path) -> bool:
    if not folder.exists():
        return False
    # Harmony may write nested outputs; look recursively for non-empty files
    for p in folder.rglob("*"):
        if p.is_file() and p.stat().st_size > 0:
            return True
    return False

print("SMAP/Harmony (quiet) setup")
print("  short_name:", SHORT_NAME)
print("  bbox (W,S,E,N):", (W, S, E, N))
print("  time:", START, "→", END)
print("  root:", SMAP_DIR)

# ---------- 2) capabilities + variable sanitization ----------
harmony_client = Client()

cap_req = CapabilitiesRequest(short_name=SHORT_NAME)
cap = harmony_client.submit(cap_req)

import json
if isinstance(cap, str):
    cap = json.loads(cap)

concept_id = cap.get("conceptId") or cap.get("concept_id") or cap.get("conceptID")
if not concept_id:
    raise RuntimeError(f"Could not determine conceptId for {SHORT_NAME} from Harmony capabilities.")

available_var_names = set()
for v in (cap.get("variables") or []):
    if isinstance(v, dict) and "name" in v:
        available_var_names.add(v["name"])

if available_var_names:
    chosen = []
    for name in DESIRED_VARS:
        if name in available_var_names:
            chosen.append(name)
        else:
            hits = [vn for vn in available_var_names if vn == name or vn.endswith(name) or name in vn]
            if hits:
                chosen.append(hits[0])
    DESIRED_VARS = sorted(set(chosen))

# ---------- 3) submit/download month-by-month (quiet + strong resume) ----------
jobs_submitted = 0
months_skipped = 0
files_downloaded = 0

for m0, m1 in month_range(START, END):
    tag = f"{m0:%Y%m}"
    sentinel = SENTINEL_DIR / f"{tag}.done"
    month_dir = SMAP_DIR / tag

    # Strong resume: skip if sentinel exists OR month folder already has files
    if sentinel.exists() or has_any_files(month_dir):
        months_skipped += 1
        if not sentinel.exists():
            # create sentinel so future runs are clean
            sentinel.write_text("done=1\nnote=folder already had files\n")
        print(f"[SKIP] {tag}")
        continue

    req = Request(
        collection=Collection(id=concept_id),
        spatial=BBox(W, S, E, N),
        temporal={"start": dt.datetime(m0.year, m0.month, m0.day),
                  "stop":  dt.datetime(m1.year, m1.month, m1.day)},
        variables=DESIRED_VARS if DESIRED_VARS else None,
    )

    if not req.is_valid():
        raise RuntimeError(f"Invalid Harmony request for month {tag}: check bbox/time/vars")

    print(f"[RUN] {tag} submitting…")
    job_id = harmony_client.submit(req)
    jobs_submitted += 1

    # Quiet wait (no progress spam)
    harmony_client.wait_for_processing(job_id, show_progress=False)

    month_dir.mkdir(parents=True, exist_ok=True)

    futures = harmony_client.download_all(job_id, directory=str(month_dir), overwrite=False)
    out_files = [f.result() for f in futures]
    out_files = [p for p in out_files if p]

    files_downloaded += len(out_files)
    sentinel.write_text(f"job_id={job_id}\nfiles={len(out_files)}\n")
    print(f"[DONE] {tag} files={len(out_files)}")

print("SMAP/Harmony complete")
print("  jobs submitted:", jobs_submitted)
print("  months skipped:", months_skipped)
print("  files downloaded this run:", files_downloaded)
print("  root dir:", SMAP_DIR)

In [None]:
# === SMAP Cell A (fixed): SPL4SMGP Harmony subset -> daily canonical parquet (resume-safe, quiet) ===

from pathlib import Path
import os
import re
import numpy as np
import pandas as pd
import xarray as xr

OUT_DIR = Path(CONFIG["out_dir"])
SMAP_ROOT = OUT_DIR / Path(CONFIG.get("SMAP_L4_DIRNAME", "manual/smap/SPL4SMGP"))
if not SMAP_ROOT.exists():
    raise FileNotFoundError(f"Missing SMAP root dir: {SMAP_ROOT}")

DAILY_DIR = OUT_DIR / Path(CONFIG.get("SMAP_DAILY_DIRNAME", "derived/smap_daily"))
DAILY_DIR.mkdir(parents=True, exist_ok=True)

GROUP = (CONFIG.get("SMAP_NETCDF_GROUP", "Geophysical_Data") or "").strip()
VAR_SURF = CONFIG.get("SMAP_VAR_SURFACE", "sm_surface")
VAR_ROOT = CONFIG.get("SMAP_VAR_ROOTZONE", "sm_rootzone")

TEMPLATE = CONFIG.get("SMAP_DAILY_TEMPLATE", "smap_daily_{date}.parquet")
AGG = (CONFIG.get("SMAP_DAILY_AGG", "median") or "median").lower()
LOG_EVERY = int(CONFIG.get("SMAP_LOG_EVERY_N_DAYS", 20))

if AGG not in {"median", "mean"}:
    raise ValueError("SMAP_DAILY_AGG must be 'median' or 'mean'")

# Filenames like: SMAP_L4_SM_gph_20240731T223000_Vv8010_001_subsetted.nc4
TS_RE = re.compile(r"_(\d{8})T(\d{6})_")

def parse_dt_from_name(p: Path) -> pd.Timestamp:
    m = TS_RE.search(p.name)
    if not m:
        raise ValueError(f"Cannot parse timestamp from filename: {p.name}")
    ymd, hms = m.group(1), m.group(2)
    return pd.Timestamp(f"{ymd}{hms}", tz="UTC")

def out_path_for_date(d_utc: pd.Timestamp) -> Path:
    return DAILY_DIR / TEMPLATE.format(date=d_utc.date().isoformat())

def open_smap(fp: Path) -> xr.Dataset:
    # Use netcdf4 engine; open the science group
    return xr.open_dataset(fp, engine="netcdf4", group=GROUP, decode_times=False, mask_and_scale=True)

def agg_stack(stack: np.ndarray) -> np.ndarray:
    # stack shape: (T, H, W), with NaN
    if AGG == "median":
        with np.errstate(all="ignore"):
            return np.nanmedian(stack, axis=0)
    return np.nanmean(stack, axis=0)

# Collect likely data files (Harmony outputs often .nc4)
OK_EXT = {".nc4", ".nc", ".cdf", ".h5", ".hdf5"}
all_files = sorted(
    p for p in SMAP_ROOT.rglob("*")
    if p.is_file()
    and "_done" not in p.parts
    and not p.name.endswith(".done")
    and p.stat().st_size > 0
    and (p.suffix.lower() in OK_EXT)
)

if not all_files:
    raise FileNotFoundError(f"No SMAP data files found under {SMAP_ROOT}")

# Build index of files with timestamps
rows = []
for fp in all_files:
    try:
        ts = parse_dt_from_name(fp)
    except Exception:
        continue
    rows.append((fp, ts, ts.normalize()))

idx = pd.DataFrame(rows, columns=["path", "ts_utc", "date_utc"])
if idx.empty:
    raise RuntimeError("Found SMAP files but none matched the expected timestamp pattern.")

# Filter to your configured analysis window
start = pd.Timestamp(CONFIG["start_date"], tz="UTC")
end = pd.Timestamp(CONFIG["end_date"], tz="UTC") + pd.Timedelta(days=1) - pd.Timedelta(seconds=1)
idx = idx[(idx["ts_utc"] >= start) & (idx["ts_utc"] <= end)].copy()
idx.sort_values(["date_utc", "ts_utc"], inplace=True)

dates = idx["date_utc"].drop_duplicates().tolist()
if not dates:
    raise RuntimeError("No SMAP files within CONFIG start/end window.")

print("SMAP Cell A starting")
print("  SMAP root      :", SMAP_ROOT)
print("  Files in window:", len(idx))
print("  Days in window :", len(dates))
print("  Group/vars     :", GROUP, VAR_SURF, VAR_ROOT)
print("  Output dir     :", DAILY_DIR)

processed = 0
skipped = 0

for i, d in enumerate(dates, start=1):
    outp = out_path_for_date(d)
    if outp.exists() and outp.stat().st_size > 0:
        skipped += 1
        continue

    day_files = idx.loc[idx["date_utc"] == d, "path"].tolist()
    if not day_files:
        continue

    surf_list, root_list = [], []

    for fp in day_files:
        ds = open_smap(fp)

        # pull arrays
        surf = ds[VAR_SURF].astype("float32").values
        root = ds[VAR_ROOT].astype("float32").values

        # replace fill (-9999) + enforce plausible range [0,1]
        surf = np.where((surf >= 0.0) & (surf <= 1.0), surf, np.nan)
        root = np.where((root >= 0.0) & (root <= 1.0), root, np.nan)

        surf_list.append(surf)
        root_list.append(root)

        ds.close()

    surf_stack = np.stack(surf_list, axis=0)
    root_stack = np.stack(root_list, axis=0)

    surf_day = agg_stack(surf_stack)
    root_day = agg_stack(root_stack)

    n_obs = np.sum(np.isfinite(surf_stack), axis=0).astype("int16")

    H, W = surf_day.shape
    yy, xx = np.indices((H, W))

    df = pd.DataFrame({
        "date_utc": np.repeat(d, H * W),
        "y": yy.ravel().astype("int32"),
        "x": xx.ravel().astype("int32"),
        "sm_surface": surf_day.ravel().astype("float32"),
        "sm_rootzone": root_day.ravel().astype("float32"),
        "n_obs": n_obs.ravel().astype("int16"),
    })

    # Drop pixels with no data at all
    df = df[~(df["sm_surface"].isna() & df["sm_rootzone"].isna())]

    tmp = outp.with_suffix(".parquet.tmp")
    df.to_parquet(tmp, index=False)
    os.replace(tmp, outp)

    processed += 1
    if (processed % LOG_EVERY) == 0 or i == len(dates):
        print(f"  day {i}/{len(dates)} | wrote={processed} skipped={skipped} | last={d.date()} | rows={len(df):,}")

print("SMAP Cell A complete")
print("  wrote  :", processed)
print("  skipped:", skipped)
print("  daily dir:", DAILY_DIR)

In [None]:
# Some SMAP validation/QA on daily parquet outputs

from pathlib import Path
import pandas as pd
import numpy as np

OUT_DIR = Path(CONFIG["out_dir"])
DAILY_DIR = OUT_DIR / Path(CONFIG.get("SMAP_DAILY_DIRNAME", "derived/smap_daily"))

files = sorted(DAILY_DIR.glob("smap_daily_*.parquet"))
assert files, f"No SMAP daily parquet files found in {DAILY_DIR}"

# Sample a few days + last day
sample_files = [files[0], files[len(files)//2], files[-1]]
rows = []

for fp in sample_files:
    df = pd.read_parquet(fp)

    rows.append({
        "file": fp.name,
        "rows": len(df),
        "unique_pixels": df[["y","x"]].drop_duplicates().shape[0],
        "sm_surface_min": float(np.nanmin(df["sm_surface"].values)) if df["sm_surface"].notna().any() else np.nan,
        "sm_surface_p01": float(np.nanpercentile(df["sm_surface"].dropna(), 1)) if df["sm_surface"].notna().any() else np.nan,
        "sm_surface_p50": float(np.nanpercentile(df["sm_surface"].dropna(), 50)) if df["sm_surface"].notna().any() else np.nan,
        "sm_surface_p99": float(np.nanpercentile(df["sm_surface"].dropna(), 99)) if df["sm_surface"].notna().any() else np.nan,
        "sm_surface_max": float(np.nanmax(df["sm_surface"].values)) if df["sm_surface"].notna().any() else np.nan,
        "sm_rootzone_p50": float(np.nanpercentile(df["sm_rootzone"].dropna(), 50)) if df["sm_rootzone"].notna().any() else np.nan,
        "n_obs_p50": float(np.nanpercentile(df["n_obs"].values, 50)),
        "n_obs_min": int(df["n_obs"].min()),
        "n_obs_max": int(df["n_obs"].max()),
        "nan_surface_frac": float(df["sm_surface"].isna().mean()),
        "nan_rootzone_frac": float(df["sm_rootzone"].isna().mean()),
    })

qa = pd.DataFrame(rows)
qa

In [None]:
# === SMAP Cell B: daily SMAP pixels -> HydroPulse 3km grid_id daily table (resume-safe) ===

from pathlib import Path
import os
import numpy as np
import pandas as pd

import geopandas as gpd
from shapely.geometry import Point
import xarray as xr

OUT_DIR = Path(CONFIG["out_dir"])

# Inputs
GRID_PATH = Path(resolve_out_path(CONFIG.get("GRID_FILENAME", "grid_3000m_CA_epsg3310.parquet")))
SMAP_ROOT = OUT_DIR / Path(CONFIG.get("SMAP_L4_DIRNAME", "manual/smap/SPL4SMGP"))
SMAP_DAILY_DIR = OUT_DIR / Path(CONFIG.get("SMAP_DAILY_DIRNAME", "derived/smap_daily"))

if not GRID_PATH.exists():
    raise FileNotFoundError(f"Missing grid parquet: {GRID_PATH}")
if not SMAP_ROOT.exists():
    raise FileNotFoundError(f"Missing SMAP root: {SMAP_ROOT}")
if not SMAP_DAILY_DIR.exists():
    raise FileNotFoundError(f"Missing SMAP daily dir: {SMAP_DAILY_DIR}")

# Outputs
GRIDMAP_PATH = OUT_DIR / CONFIG.get("SMAP_GRIDMAP_FILENAME", "smap_pixel_to_grid_3310.parquet")
SHARDS_DIR = OUT_DIR / Path(CONFIG.get("SMAP_GRID_DAILY_SHARDS_DIRNAME", "derived/smap_grid_shards"))
SHARDS_DIR.mkdir(parents=True, exist_ok=True)

final_name_tmpl = CONFIG.get("SMAP_GRID_FINAL_FILENAME", "smap_daily_grid_CA_3000m_epsg3310_{start}_{end}.parquet")
FINAL_PATH = OUT_DIR / final_name_tmpl.format(
    start=CONFIG["start_date"].replace("-", ""),
    end=CONFIG["end_date"].replace("-", "")
)

AGG = (CONFIG.get("SMAP_GRID_AGG", "mean") or "mean").lower()
MIN_PIX = int(CONFIG.get("SMAP_GRID_MIN_PIXELS", 1))
LOG_EVERY = int(CONFIG.get("SMAP_LOG_EVERY_N_DAYS", 20))
NEAR_KM = float(CONFIG.get("SMAP_NEAREST_FALLBACK_KM", 6.0))
NEAR_M = NEAR_KM * 1000.0

if AGG not in {"mean", "median"}:
    raise ValueError("SMAP_GRID_AGG must be 'mean' or 'median'")

# -------------------------
# 1) Load HydroPulse grid
# -------------------------
grid = gpd.read_parquet(GRID_PATH)
if "grid_id" not in grid.columns:
    raise KeyError("Grid parquet must contain 'grid_id' column.")
if "geometry" not in grid.columns:
    raise KeyError("Grid parquet must contain 'geometry' column.")

# Ensure CRS is EPSG:3310 (your OPS_EPSG)
OPS_EPSG = int(CONFIG.get("OPS_EPSG", 3310))
if grid.crs is None:
    grid = grid.set_crs(epsg=OPS_EPSG)
else:
    grid = grid.to_crs(epsg=OPS_EPSG)

grid = grid[["grid_id", "geometry"]].copy()

# -------------------------
# 2) Build (y,x)->grid_id map (one-time), using cell_lat/cell_lon from a sample .nc4
# -------------------------
def find_sample_nc4() -> Path:
    ok_ext = {".nc4", ".nc", ".cdf"}
    cands = sorted(
        p for p in SMAP_ROOT.rglob("*")
        if p.is_file()
        and "_done" not in p.parts
        and p.stat().st_size > 0
        and p.suffix.lower() in ok_ext
    )
    if not cands:
        raise FileNotFoundError(f"No .nc/.nc4 files found under {SMAP_ROOT}")
    # pick largest, tends to be real science granule
    return max(cands, key=lambda p: p.stat().st_size)

def build_gridmap() -> pd.DataFrame:
    sample = find_sample_nc4()

    # Root group contains cell_lat/cell_lon per your earlier debug
    ds_root = xr.open_dataset(sample, engine="netcdf4", decode_times=False, mask_and_scale=True)

    # Expect 2D arrays named cell_lat / cell_lon
    if "cell_lat" not in ds_root.variables or "cell_lon" not in ds_root.variables:
        raise KeyError(f"Expected cell_lat/cell_lon in root group. Vars: {list(ds_root.variables.keys())[:50]}")

    lat = ds_root["cell_lat"].values
    lon = ds_root["cell_lon"].values

    # y/x coordinate arrays exist too; but we use integer indices y,x matching your daily parquet
    H, W = lat.shape
    yy, xx = np.indices((H, W))

    # Flatten
    df = pd.DataFrame({
        "y": yy.ravel().astype("int32"),
        "x": xx.ravel().astype("int32"),
        "lat": lat.ravel().astype("float64"),
        "lon": lon.ravel().astype("float64"),
    })
    ds_root.close()

    # Drop any missing or insane coords
    df = df.replace([np.inf, -np.inf], np.nan).dropna(subset=["lat", "lon"])
    df = df[(df["lat"] >= -90) & (df["lat"] <= 90) & (df["lon"] >= -180) & (df["lon"] <= 180)]

    # Make points in WGS84 then project to 3310
    gdf = gpd.GeoDataFrame(
        df[["y", "x"]].copy(),
        geometry=gpd.points_from_xy(df["lon"], df["lat"]),
        crs="EPSG:4326"
    ).to_crs(epsg=OPS_EPSG)

    # Spatial join to grid polygons
    joined = gpd.sjoin(gdf, grid, how="left", predicate="within")[["y", "x", "grid_id", "geometry"]].copy()

    missing = joined["grid_id"].isna().sum()
    if missing > 0:
        # Fallback: nearest join within radius (meters)
        # Keep only missing points for nearest join
        miss = joined[joined["grid_id"].isna()].drop(columns=["grid_id"]).copy()
        # sjoin_nearest is available in geopandas >=0.10; use max_distance to avoid nonsense matches
        nearest = gpd.sjoin_nearest(miss, grid, how="left", max_distance=NEAR_M, distance_col="dist_m")
        joined.loc[joined["grid_id"].isna(), "grid_id"] = nearest["grid_id"].values

    joined = joined.drop(columns=["geometry"])
    joined = joined.dropna(subset=["grid_id"]).copy()

    # Enforce uniqueness
    joined = joined.drop_duplicates(subset=["y", "x"])
    return joined

if GRIDMAP_PATH.exists() and GRIDMAP_PATH.stat().st_size > 0:
    gridmap = pd.read_parquet(GRIDMAP_PATH)
else:
    gridmap = build_gridmap()
    tmp = GRIDMAP_PATH.with_suffix(".tmp.parquet")
    gridmap.to_parquet(tmp, index=False)
    os.replace(tmp, GRIDMAP_PATH)

if not {"y","x","grid_id"}.issubset(set(gridmap.columns)):
    raise RuntimeError(f"Bad gridmap schema: {gridmap.columns}")

print("SMAP Cell B starting")
print("  Grid:", GRID_PATH)
print("  SMAP daily dir:", SMAP_DAILY_DIR)
print("  Gridmap:", GRIDMAP_PATH, f"(rows={len(gridmap):,})")
print("  Shards dir:", SHARDS_DIR)
print("  FINAL:", FINAL_PATH)

# -------------------------
# 3) Process day shards (resume-safe)
# -------------------------
daily_files = sorted(SMAP_DAILY_DIR.glob("smap_daily_*.parquet"))
if not daily_files:
    raise FileNotFoundError(f"No smap_daily parquet files found in {SMAP_DAILY_DIR}")

written = 0
skipped = 0

def agg_group(df: pd.DataFrame) -> pd.DataFrame:
    # df has grid_id and moisture fields
    if AGG == "mean":
        out = df.groupby("grid_id", as_index=False).agg(
            sm_surface=("sm_surface", "mean"),
            sm_rootzone=("sm_rootzone", "mean"),
            n_pixels=("grid_id", "size"),
            n_obs_mean=("n_obs", "mean"),
        )
    else:
        out = df.groupby("grid_id", as_index=False).agg(
            sm_surface=("sm_surface", "median"),
            sm_rootzone=("sm_rootzone", "median"),
            n_pixels=("grid_id", "size"),
            n_obs_mean=("n_obs", "mean"),
        )
    return out

for i, fp in enumerate(daily_files, start=1):
    date_str = fp.stem.split("_")[-1]  # YYYY-MM-DD
    outp = SHARDS_DIR / f"smap_grid_{date_str}.parquet"

    if outp.exists() and outp.stat().st_size > 0:
        skipped += 1
        continue

    df = pd.read_parquet(fp)  # columns: date_utc,y,x,sm_surface,sm_rootzone,n_obs
    # Join mapping
    df = df.merge(gridmap, on=["y","x"], how="inner")

    # Aggregate to grid
    g = agg_group(df)

    # Apply min pixels filter
    g = g[g["n_pixels"] >= MIN_PIX].copy()
    g.insert(1, "date_utc", pd.Timestamp(date_str, tz="UTC"))

    tmp = outp.with_suffix(".tmp.parquet")
    g.to_parquet(tmp, index=False)
    os.replace(tmp, outp)

    written += 1
    if (written % LOG_EVERY) == 0 or i == len(daily_files):
        print(f"  day {i}/{len(daily_files)} | wrote={written} skipped={skipped} | last={date_str} | rows={len(g):,}")

print("SMAP Cell B shards complete")
print("  wrote  :", written)
print("  skipped:", skipped)

# -------------------------
# 4) Optional: stitch shards into one final parquet (resume-safe)
# -------------------------
if FINAL_PATH.exists() and FINAL_PATH.stat().st_size > 0:
    print("[SKIP] Final exists:", FINAL_PATH)
else:
    shard_files = sorted(SHARDS_DIR.glob("smap_grid_*.parquet"))
    if not shard_files:
        raise RuntimeError("No SMAP grid shards found to stitch.")
    parts = [pd.read_parquet(p) for p in shard_files]
    final = pd.concat(parts, ignore_index=True)
    tmp = FINAL_PATH.with_suffix(".tmp.parquet")
    final.to_parquet(tmp, index=False)
    os.replace(tmp, FINAL_PATH)
    print("Saved FINAL:", FINAL_PATH, "rows=", len(final))

In [None]:
# ============================
# SNOTEL — Cell A
# Parse, normalize, and sanity-check
# First, we download data manually from https://wcc.sc.egov.usda.gov/reportGenerator/
# Parameters are: click on "Advanced search", select Network - SNOTEL, all stations in CA - this is about 500 stations,
# date range 1991-01-01 to 2026-01-12 (most recent available as of writing),
# and select a few variables
# The buildable report URL looks like this: 
# https://wcc.sc.egov.usda.gov/reportGenerator/view_csv/customMultiTimeSeriesGroupByStationReport/daily/start_of_period/county=%2522Alameda%2522,%2522Alpine%2522,%2522Amador%2522,%2522Butte%2522,%2522Calaveras%2522,%2522Colusa%2522,%2522Contra%2520Costa%2522,%2522Del%2520Norte%2522,%2522El%2520Dorado%2522,%2522Fresno%2522,%2522Glenn%2522,%2522Humboldt%2522,%2522Imperial%2522,%2522Inyo%2522,%2522Kern%2522,%2522Kings%2522,%2522Lake%2522,%2522Lassen%2522,%2522Los%2520Angeles%2522,%2522Madera%2522,%2522Marin%2522,%2522Mariposa%2522,%2522Mendocino%2522,%2522Merced%2522,%2522Modoc%2522,%2522Mono%2522,%2522Monterey%2522,%2522Napa%2522,%2522Nevada%2522,%2522Orange%2522,%2522Placer%2522,%2522Plumas%2522,%2522Riverside%2522,%2522Sacramento%2522,%2522San%2520Benito%2522,%2522San%2520Bernardino%2522,%2522San%2520Diego%2522,%2522San%2520Francisco%2522,%2522San%2520Joaquin%2522,%2522San%2520Luis%2520Obispo%2522,%2522San%2520Mateo%2522,%2522Santa%2520Barbara%2522,%2522Santa%2520Clara%2522,%2522Santa%2520Cruz%2522,%2522Shasta%2522,%2522Sierra%2522,%2522Siskiyou%2522,%2522Solano%2522,%2522Sonoma%2522,%2522Stanislaus%2522,%2522Sutter%2522,%2522Tehama%2522,%2522Trinity%2522,%2522Tulare%2522,%2522Tuolumne%2522,%2522Ventura%2522,%2522Yolo%2522,%2522Yuba%2522,%2522UNKNOWN%2522%2520AND%2520network=%2522SNTL%2522%2520AND%2520outServiceDate=%25222100-01-01%2522%257Cname/1991-01-01,2026-12-31/stationId,name,WTEQ::value,SNWD::value,PREC::value,PRCP:-2:value,PRCP::value,TAVG::value,TMAX::value,TMIN::value?fitToScreen=false
# For making more changes, one can go to the main URL, experiment with parameters, and then copy the resulting CSV URL.
# Manually save the file as "manual/snotel/snotel-19910101-20260112.txt"
# and then of course, run the below code to parse and normalize it.
# ============================

# ============================
# SNOTEL — Cell A (FINAL, resolve_out_path)
# - Reads one wide NRCS report export (1991 → present)
# - Melts to long tidy format (station_id × date × variable)
# - Prefers non "-2in" variant when duplicates exist
# - Writes parquet directly into results/ (out_dir)
# ============================

import pandas as pd
import numpy as np
import re
from pathlib import Path

# ---------- config-driven paths ----------
OUT_DIR = Path(CONFIG["out_dir"])
SNOTEL_MANUAL_DIR = Path(resolve_out_path(CONFIG["SNOTEL_MANUAL_DIR"]))         # e.g. "manual/snotel"
RAW_FILE = SNOTEL_MANUAL_DIR / Path(CONFIG["SNOTEL_RAW_FILENAME"]).name
OUT_PARQUET = OUT_DIR / CONFIG["SNOTEL_DAILY_LONG_PARQUET_NAME"]               # e.g. "snotel_daily_long.parquet"

print("SNOTEL Cell A starting")
print("  out_dir   :", OUT_DIR)
print("  raw file  :", RAW_FILE)
print("  out parquet:", OUT_PARQUET)

if not RAW_FILE.exists():
    raise FileNotFoundError(f"Missing SNOTEL raw file: {RAW_FILE}")

# ---------- read raw (robust: skip WCC metadata preamble) ----------
def find_header_row(path: Path, max_lines: int = 500) -> int:
    """
    Find the first line index (0-based) that looks like the actual CSV header.
    WCC report exports often include a preamble before the header.
    """
    with open(path, "r", encoding="utf-8", errors="replace") as f:
        for i, line in enumerate(f):
            if i >= max_lines:
                break
            s = line.strip().lower()
            # header usually starts with 'date' and contains at least one '(' stationId ')'
            if s.startswith("date") and "(" in s and ")" in s and "," in s:
                return i
            # sometimes it's literally just 'date,' without station ids on the same line (rare)
            if s.startswith("date,"):
                return i
    raise ValueError(f"Could not find header row in first {max_lines} lines of {path}")

header_row = find_header_row(RAW_FILE)
print(f"SNOTEL: detected CSV header at line {header_row+1} (1-based)")

df_raw = pd.read_csv(
    RAW_FILE,
    header=header_row,
    low_memory=False,
    encoding="utf-8",
    encoding_errors="replace",
)
# First column is date in this NRCS export
df_raw.rename(columns={df_raw.columns[0]: "date"}, inplace=True)
df_raw["date"] = pd.to_datetime(df_raw["date"], errors="coerce", utc=True)
if df_raw["date"].isna().any():
    bad = df_raw[df_raw["date"].isna()].head(5)
    raise ValueError(f"Found NaT dates after parsing date column; sample:\n{bad}")

# ---------- helper to parse column names ----------
def parse_col(col: str):
    """
    Expected column pattern:
      '<station name> (<stationId>) <variable label...>'
    Returns (station_name, station_id, variable_key, is_minus2_variant) or None
    """
    m = re.match(r"(.+?)\s+\((\d+)\)\s+(.+)", col)
    if not m:
        return None

    station_name = m.group(1).strip()
    station_id = int(m.group(2))
    rest = m.group(3)

    # Canonical variables
    if "Snow Water Equivalent" in rest:
        var = "swe_in"
    elif "Snow Depth" in rest:
        var = "snow_depth_in"
    elif "Precipitation Increment" in rest:
        var = "precip_increment_in"
    elif "Precipitation Accumulation" in rest:
        var = "precip_accum_in"
    elif "Air Temperature Average" in rest:
        var = "tavg_f"
    elif "Air Temperature Maximum" in rest:
        var = "tmax_f"
    elif "Air Temperature Minimum" in rest:
        var = "tmin_f"
    else:
        return None

    is_minus2 = "-2in" in rest
    return station_name, station_id, var, is_minus2

# ---------- melt wide -> long ----------
records = []
for col in df_raw.columns[1:]:
    parsed = parse_col(col)
    if parsed is None:
        continue

    station_name, station_id, var, is_minus2 = parsed

    s = pd.to_numeric(df_raw[col], errors="coerce").astype("float32")

    # Basic validity cleaning (keep conservative; do more later if needed)
    if var.startswith("t"):
        # Fahrenheit: discard extreme negatives (sentinels)
        s = s.where(s > -50)
    else:
        # SWE/Depth/Precip should not be negative
        s = s.where(s >= 0)

    records.append(pd.DataFrame({
        "date": df_raw["date"],
        "station_id": station_id,
        "station_name": station_name,
        "variable": var,
        "value": s,
        "minus2_variant": is_minus2
    }))

if not records:
    raise RuntimeError(
        "Parsed zero SNOTEL data columns. The column naming pattern likely changed. "
        "Inspect df_raw.columns[:50] to update parse_col()."
    )

df_long = pd.concat(records, ignore_index=True)

# ---------- resolve duplicate variants (prefer non '-2in') ----------
df_long.sort_values(
    by=["date", "station_id", "variable", "minus2_variant"],  # False first -> non -2in preferred
    inplace=True
)

df_long = (
    df_long
    .drop_duplicates(subset=["date", "station_id", "variable"], keep="first")
    .drop(columns=["minus2_variant"])
)

df_long["source"] = "SNOTEL"

# ---------- sanity summary ----------
print("SNOTEL Cell A summary")
print("  rows      :", len(df_long))
print("  stations  :", df_long["station_id"].nunique())
print("  variables :", sorted(df_long["variable"].unique()))
print("  date min  :", df_long["date"].min())
print("  date max  :", df_long["date"].max())

# ---------- write (atomic) ----------
tmp_path = OUT_PARQUET.with_suffix(".tmp.parquet")
df_long.to_parquet(tmp_path, index=False)
tmp_path.replace(OUT_PARQUET)

print("Saved:", OUT_PARQUET)

In [None]:
# ============================
# SNOTEL — Station metadata (AWDB)
# Builds station_id -> lat/lon lookup for gridding
# ============================

import pandas as pd
import numpy as np
import requests
from pathlib import Path
from urllib.parse import quote

OUT_DIR = Path(CONFIG["out_dir"])
SNOTEL_LONG_PATH = OUT_DIR / CONFIG.get("SNOTEL_DAILY_LONG_PARQUET_NAME", "snotel_daily_long.parquet")
META_OUT = OUT_DIR / CONFIG.get("SNOTEL_STATION_META_PARQUET_NAME", "snotel_station_metadata.parquet")

AWDB_URL_TMPL = CONFIG.get(
    "AWDB_META_URL_TEMPLATE",
    "https://wcc.sc.egov.usda.gov/awdbRestApi/services/v1/stations?stationTriplets={triplets}&elements="
)

if not SNOTEL_LONG_PATH.exists():
    raise FileNotFoundError(f"Missing SNOTEL long parquet: {SNOTEL_LONG_PATH}")

df_long = pd.read_parquet(SNOTEL_LONG_PATH, columns=["station_id", "station_name"])
station_ids = sorted(df_long["station_id"].dropna().astype(int).unique().tolist())

print("SNOTEL station metadata starting")
print("  stations:", len(station_ids))
print("  out     :", META_OUT)

# If cached, load and only fetch missing
if META_OUT.exists():
    meta = pd.read_parquet(META_OUT)
    have = set(meta["station_id"].astype(int).unique().tolist())
    missing = [sid for sid in station_ids if sid not in have]
    print(f"  cached  : {len(have)} stations | missing: {len(missing)}")
else:
    meta = pd.DataFrame()
    missing = station_ids

def fetch_awdb(triplets: list[str]) -> pd.DataFrame:
    # AWDB expects comma-separated triplets like "1234:CA:SNTL,5678:CA:SNTL"
    trip_str = ",".join(triplets)
    url = AWDB_URL_TMPL.format(triplets=quote(trip_str, safe=",:"))
    r = requests.get(url, timeout=120)
    r.raise_for_status()
    js = r.json()
    # AWDB returns {'stations': [...]} or direct list depending on endpoint version
    rows = js.get("stations", js) if isinstance(js, dict) else js
    return pd.json_normalize(rows)

rows = []
batch_size = 80  # conservative; avoids long URLs
for i in range(0, len(missing), batch_size):
    batch = missing[i:i+batch_size]
    triplets = [f"{sid}:CA:SNTL" for sid in batch]  # CA SNOTEL triplet format
    try:
        got = fetch_awdb(triplets)
    except Exception as e:
        raise RuntimeError(f"AWDB metadata fetch failed for batch {i//batch_size}: {e}")
    rows.append(got)
    if (i//batch_size + 1) % 5 == 0 or (i + batch_size) >= len(missing):
        print(f"  fetched batch {i//batch_size + 1} | total rows so far: {sum(len(x) for x in rows):,}")

new = pd.concat(rows, ignore_index=True) if rows else pd.DataFrame()

# Normalize / keep only what we need; AWDB fields can vary slightly.
# We key on 'stationTriplet' and/or 'stationId' when present.
if not new.empty:
    # stationId sometimes present; otherwise parse from stationTriplet
    if "stationId" in new.columns:
        new["station_id"] = pd.to_numeric(new["stationId"], errors="coerce")
    elif "stationTriplet" in new.columns:
        new["station_id"] = pd.to_numeric(new["stationTriplet"].str.split(":").str[0], errors="coerce")
    else:
        raise KeyError(f"AWDB response missing stationId/stationTriplet; columns={list(new.columns)[:50]}")

    # lat/lon fields
    lat_col = "latitude" if "latitude" in new.columns else ("lat" if "lat" in new.columns else None)
    lon_col = "longitude" if "longitude" in new.columns else ("lon" if "lon" in new.columns else None)
    if lat_col is None or lon_col is None:
        raise KeyError(f"AWDB response missing lat/lon columns; columns={list(new.columns)[:50]}")

    out = pd.DataFrame({
        "station_id": new["station_id"].astype("Int64"),
        "lat": pd.to_numeric(new[lat_col], errors="coerce"),
        "lon": pd.to_numeric(new[lon_col], errors="coerce"),
        "elev_m": pd.to_numeric(new.get("elevation", np.nan), errors="coerce"),
        "state": new.get("state", "CA"),
        "station_name_meta": new.get("name", pd.NA),
        "station_triplet": new.get("stationTriplet", pd.NA),
    }).dropna(subset=["station_id"])
else:
    out = pd.DataFrame(columns=["station_id","lat","lon","elev_m","state","station_name_meta","station_triplet"])

# Merge with existing cache
if meta.empty:
    meta2 = out
else:
    meta2 = pd.concat([meta, out], ignore_index=True)
    meta2 = meta2.drop_duplicates(subset=["station_id"], keep="last")

# Basic sanity
meta2["station_id"] = meta2["station_id"].astype(int)
ok = meta2.dropna(subset=["lat","lon"])
print("  meta rows:", len(meta2), "| with lat/lon:", len(ok))

tmp = META_OUT.with_suffix(".tmp.parquet")
meta2.to_parquet(tmp, index=False)
tmp.replace(META_OUT)
print("Saved:", META_OUT)

In [None]:
# ============================
# SNOTEL — Cell B
# Grid stations to 3km grid; write daily shards; stitch final parquet (analysis window)
# ============================

import pandas as pd
import numpy as np
from pathlib import Path
import geopandas as gpd

# ---------- paths ----------
OUT_DIR = Path(CONFIG["out_dir"])

GRID_PATH = Path(resolve_out_path(CONFIG.get("GRID_FILENAME", "grid_3000m_CA_epsg3310.parquet")))
SNOTEL_LONG_PATH = OUT_DIR / CONFIG.get("SNOTEL_DAILY_LONG_PARQUET_NAME", "snotel_daily_long.parquet")

GRIDMAP_PATH = OUT_DIR / CONFIG.get("SNOTEL_GRIDMAP_PARQUET_NAME", "snotel_station_to_grid_3310.parquet")
SHARDS_DIR = OUT_DIR / CONFIG.get("SNOTEL_GRID_SHARDS_DIRNAME", "derived/snotel_grid_shards")
SHARDS_DIR.mkdir(parents=True, exist_ok=True)

start = pd.to_datetime(CONFIG["start_date"]).tz_localize("UTC")
end   = pd.to_datetime(CONFIG["end_date"]).tz_localize("UTC")

final_name_tmpl = CONFIG.get(
    "SNOTEL_DAILY_GRID_PARQUET_NAME",
    "snotel_daily_grid_CA_3000m_epsg3310_{start}_{end}.parquet"
)
FINAL_PATH = OUT_DIR / final_name_tmpl.format(
    start=start.strftime("%Y%m%d"),
    end=end.strftime("%Y%m%d")
)

print("SNOTEL Cell B starting")
print("  Grid     :", GRID_PATH)
print("  SNOTEL A :", SNOTEL_LONG_PATH)
print("  Gridmap  :", GRIDMAP_PATH)
print("  Shards   :", SHARDS_DIR)
print("  FINAL    :", FINAL_PATH)

if not GRID_PATH.exists():
    raise FileNotFoundError(f"Missing grid parquet: {GRID_PATH}")
if not SNOTEL_LONG_PATH.exists():
    raise FileNotFoundError(f"Missing SNOTEL long parquet (Cell A output): {SNOTEL_LONG_PATH}")

# ---------- load grid ----------
ggrid = gpd.read_parquet(GRID_PATH)
if "grid_id" not in ggrid.columns:
    raise KeyError("Grid file must contain grid_id")
if ggrid.crs is None:
    raise ValueError("Grid GeoDataFrame missing CRS")
# ensure EPSG 3310 ops
ggrid = ggrid.to_crs(epsg=int(CONFIG.get("OPS_EPSG", 3310)))

# precompute centroids for nearest mapping
ggrid_cent = ggrid.copy()
ggrid_cent["geometry"] = ggrid_cent.geometry.centroid
ggrid_cent = ggrid_cent[["grid_id", "geometry"]]

# ---------- load SNOTEL long ----------
df = pd.read_parquet(SNOTEL_LONG_PATH)

# Slice to analysis window early (massive speedup)
df = df[(df["date"] >= start) & (df["date"] <= end)].copy()
if df.empty:
    raise ValueError("SNOTEL long table has no rows in analysis window. Check date parsing/timezones.")

META_PATH = OUT_DIR / CONFIG.get("SNOTEL_STATION_META_PARQUET_NAME", "snotel_station_metadata.parquet")
if not META_PATH.exists():
    raise FileNotFoundError(f"Missing station metadata parquet: {META_PATH}. Run the SNOTEL metadata cell first.")

meta = pd.read_parquet(META_PATH)
meta["station_id"] = meta["station_id"].astype(int)

# Build station table from metadata + names from long table
stations = (
    df[["station_id", "station_name"]]
    .drop_duplicates(subset=["station_id"])
    .merge(meta[["station_id","lat","lon"]], on="station_id", how="left")
)

missing_ll = stations["lat"].isna().sum()
if missing_ll:
    print(f"[WARN] {missing_ll} stations missing lat/lon; they will be dropped for gridding.")
stations = stations.dropna(subset=["lat","lon"])


# ---------- station -> grid mapping (cached) ----------
if GRIDMAP_PATH.exists():
    gridmap = pd.read_parquet(GRIDMAP_PATH)
    print(f"Loaded existing gridmap ({len(gridmap):,} stations)")
else:
    gstations = gpd.GeoDataFrame(
        stations,
        geometry=gpd.points_from_xy(stations["lon"], stations["lat"]),
        crs="EPSG:4326"
    ).to_crs(ggrid_cent.crs)

    # nearest grid centroid
    joined = gpd.sjoin_nearest(
        gstations,
        ggrid_cent,
        how="left",
        distance_col="dist_m"
    )

    gridmap = joined[["station_id", "grid_id", "dist_m"]].copy()
    gridmap.to_parquet(GRIDMAP_PATH, index=False)
    print(f"Saved gridmap ({len(gridmap):,} stations) -> {GRIDMAP_PATH}")

# attach grid_id to all records
df = df.merge(gridmap[["station_id", "grid_id"]], on="station_id", how="inner")
if df.empty:
    raise ValueError("After station->grid join, SNOTEL has zero rows. Check station_id alignment.")

# ---------- pick variables of interest (you can expand later) ----------
# Keep everything for now; but when aggregating, we compute grid-level stats per variable.
keep_vars = [
    "precip_increment_in",  # primary precip
    "swe_in",
    "snow_depth_in",
    "tavg_f",
    "tmax_f",
    "tmin_f",
    "precip_accum_in",      # kept but not primary
]
df = df[df["variable"].isin(keep_vars)].copy()

# ---------- aggregate per grid_id x day x variable ----------
# We'll compute mean across stations in same grid cell, plus count.
df["date_day"] = df["date"].dt.floor("D")

all_days = pd.date_range(start=start.floor("D"), end=end.floor("D"), freq="D", tz="UTC")

wrote = skipped = 0
for i, day in enumerate(all_days, start=1):
    day_tag = day.strftime("%Y-%m-%d")
    shard_path = SHARDS_DIR / f"snotel_grid_{day_tag}.parquet"

    if shard_path.exists():
        skipped += 1
        continue

    dday = df[df["date_day"] == day]
    if dday.empty:
        # still write an empty shard to mark completion (resume-safe)
        empty = pd.DataFrame(columns=["grid_id", "date", "variable", "value_mean", "n_stations"])
        empty.to_parquet(shard_path, index=False)
        wrote += 1
        continue

    agg = (
        dday.groupby(["grid_id", "date_day", "variable"], as_index=False)
        .agg(
            value_mean=("value", "mean"),
            n_stations=("value", "count")
        )
        .rename(columns={"date_day": "date"})
    )

    # Optional: pivot to wide per day for easier merges later
    wide = agg.pivot_table(
        index=["grid_id", "date"],
        columns="variable",
        values="value_mean"
    ).reset_index()

    # attach station counts in wide form too
    counts = agg.pivot_table(
        index=["grid_id", "date"],
        columns="variable",
        values="n_stations"
    ).add_prefix("n_").reset_index()

    out = wide.merge(counts, on=["grid_id", "date"], how="left")

    out.to_parquet(shard_path, index=False)
    wrote += 1

    if i % 20 == 0 or i == len(all_days):
        print(f"  day {i}/{len(all_days)} | wrote={wrote} skipped={skipped} | last={day_tag} | rows={len(out):,}")

print("SNOTEL Cell B shards complete")
print("  wrote  :", wrote)
print("  skipped:", skipped)

# ---------- stitch final ----------
shards = sorted(SHARDS_DIR.glob("snotel_grid_*.parquet"))
parts = [pd.read_parquet(p) for p in shards]
final = pd.concat(parts, ignore_index=True)

# Some days may be empty shards; drop them
final = final.dropna(subset=["grid_id", "date"], how="any")

final.to_parquet(FINAL_PATH, index=False)
print("Saved FINAL:", FINAL_PATH, "rows=", len(final))
print(final.head())

In [74]:
# ============================
# ERA5 MASTER CELL (CDS API)
# - Downloads CA-subset ERA5-Land data into results/{era5_raw,era5_clean}
# - Supports:
#   A) derived-era5-land-daily-statistics (daily mean)
#   B) reanalysis-era5-land-monthly-means (baseline climatology)
# - Resume-safe, low-noise, config-driven

# Got the base data from here: https://cds.climate.copernicus.eu/datasets/derived-era5-land-daily-statistics?tab=download
# Config file contains the variables
# Created an account, and got an API key from https://cds.climate.copernicus.eu/api-how-to
# Saved it in ~/.cdsapirc as:
#   url: https://cds.climate.copernicus.eu/api/v2
#   key: <uid>:<api_key>   
# Config file contains the directories and download parameters


# ============================

from pathlib import Path
import calendar
import time
import cdsapi
import pandas as pd

# ---------- toggles (edit these per run) ----------
DO_DAILY_ANALYSIS_WINDOW = True          # 2024-06-01 .. 2024-10-31 (or CONFIG window)
DO_MONTHLY_BASELINE = True               # baseline monthly means (1991-2020 by default)

# If you want to limit daily downloads to only some months, set e.g. [6,7,8,9,10]
DAILY_MONTHS_OVERRIDE = None  # e.g. [6,7,8,9,10] or None to auto from CONFIG start/end

# ---------- helpers ----------
def bbox_to_area(bbox: dict) -> list[float]:
    # CDS expects [North, West, South, East]
    return [
        float(bbox["nwlat"]),
        float(bbox["nwlng"]),
        float(bbox["selat"]),
        float(bbox["selng"]),
    ]

def ensure_dir(p: Path) -> Path:
    p.mkdir(parents=True, exist_ok=True)
    return p

def ok_file(path: Path, min_bytes: int = 1_000_000) -> bool:
    return path.exists() and path.is_file() and path.stat().st_size >= min_bytes

def retry_sleep(base: float, attempt: int) -> float:
    return base * attempt

# ---------- config ----------
OUT_DIR = Path(CONFIG["out_dir"])
RAW_DIR = ensure_dir(OUT_DIR / CONFIG.get("ERA5_RAW_DIRNAME", "era5_raw"))
CLEAN_DIR = ensure_dir(OUT_DIR / CONFIG.get("ERA5_CLEAN_DIRNAME", "era5_clean"))  # placeholder for later

BBOX = CONFIG["bbox"]
AREA = bbox_to_area(BBOX)

DAILY_DATASET = CONFIG.get("ERA5_DAILY_DATASET", "derived-era5-land-daily-statistics")
MONTHLY_DATASET = CONFIG.get("ERA5_MONTHLY_DATASET", "reanalysis-era5-land-monthly-means")

VARS_DAILY = CONFIG.get("ERA5_VARIABLES_DAILY", [
    "2m_temperature",
    "snow_depth_water_equivalent",
    "volumetric_soil_water_layer_1",
])

MAX_RETRIES = int(CONFIG.get("ERA5_MAX_RETRIES", 6))
BACKOFF_BASE = float(CONFIG.get("ERA5_BACKOFF_BASE_S", 25))

# Analysis window from CONFIG
analysis_start = pd.to_datetime(CONFIG["start_date"]).date()
analysis_end   = pd.to_datetime(CONFIG["end_date"]).date()

# Baseline window from CONFIG
baseline_start = pd.to_datetime(CONFIG.get("BASELINE_START_DATE", "1991-01-01")).date()
baseline_end   = pd.to_datetime(CONFIG.get("BASELINE_END_DATE", "2020-12-31")).date()

print("ERA5 master cell starting")
print("  out_dir :", OUT_DIR)
print("  RAW_DIR :", RAW_DIR)
print("  CLEAN_DIR (future use):", CLEAN_DIR)
print("  AREA   :", AREA, "(N,W,S,E)")
print("  Daily dataset  :", DAILY_DATASET)
print("  Monthly dataset:", MONTHLY_DATASET)
print("  Daily vars:", VARS_DAILY)
print("  Analysis window :", analysis_start, "→", analysis_end)
print("  Baseline window :", baseline_start, "→", baseline_end)

# ---------- CDS client ----------
client = cdsapi.Client()

# ---------- download functions ----------
def cds_download(dataset: str, request: dict, out_path: Path):
    # resume-safe: skip if file exists
    if ok_file(out_path):
        print(f"[SKIP] {out_path.name} exists ({out_path.stat().st_size/1e6:.1f} MB)")
        return "skipped"

    for attempt in range(1, MAX_RETRIES + 1):
        try:
            print(f"[DL] {out_path.name}  (dataset={dataset})")
            client.retrieve(dataset, request, str(out_path))
            if not ok_file(out_path):
                raise RuntimeError(f"Download finished but output looks too small: {out_path}")
            print(f"[OK] {out_path.name} ({out_path.stat().st_size/1e6:.1f} MB)")
            return "downloaded"
        except Exception as e:
            if attempt == MAX_RETRIES:
                raise
            sleep_s = retry_sleep(BACKOFF_BASE, attempt)
            print(f"[WARN] attempt {attempt}/{MAX_RETRIES} failed: {e}")
            print(f"       sleeping {sleep_s:.0f}s, then retrying...")
            time.sleep(sleep_s)

def download_daily_month(year: int, month: int):
    ndays = calendar.monthrange(year, month)[1]
    request = {
        "variable": VARS_DAILY,
        "year": f"{year:d}",
        "month": f"{month:02d}",
        "day": [f"{d:02d}" for d in range(1, ndays + 1)],
        "daily_statistic": "daily_mean",
        "time_zone": "utc+00:00",
        "frequency": "1_hourly",
        "area": AREA,
        "format": "netcdf",
    }
    out = RAW_DIR / f"era5l_daily_CA_{year}{month:02d}_mean.nc"
    return cds_download(DAILY_DATASET, request, out)

def month_range_from_dates(d1, d2):
    # inclusive month list between two dates
    months = []
    y, m = d1.year, d1.month
    while (y, m) <= (d2.year, d2.month):
        months.append((y, m))
        if m == 12:
            y += 1
            m = 1
        else:
            m += 1
    return months

def download_monthly_baseline(year: int):
    # Monthly means dataset supports selecting all months at once for a year
    request = {
        "variable": VARS_DAILY,     # same variable list to keep schema aligned
        "year": f"{year:d}",
        "month": [f"{m:02d}" for m in range(1, 13)],
        "time": "00:00",
        "area": AREA,
        "format": "netcdf",
    }
    out = RAW_DIR / f"era5l_monthly_CA_{year}.nc"
    return cds_download(MONTHLY_DATASET, request, out)

# ---------- run daily downloads ----------
if DO_DAILY_ANALYSIS_WINDOW:
    if DAILY_MONTHS_OVERRIDE is not None:
        months = [(analysis_start.year, m) for m in DAILY_MONTHS_OVERRIDE]
    else:
        months = month_range_from_dates(analysis_start, analysis_end)

    print("\nERA5 daily downloads:")
    stats = {"downloaded": 0, "skipped": 0}
    for (y, m) in months:
        res = download_daily_month(y, m)
        stats[res] += 1
    print("Daily summary:", stats)

# ---------- run monthly baseline downloads ----------
if DO_MONTHLY_BASELINE:
    print("\nERA5 monthly baseline downloads:")
    stats = {"downloaded": 0, "skipped": 0}
    for year in range(baseline_start.year, baseline_end.year + 1):
        res = download_monthly_baseline(year)
        stats[res] += 1
    print("Monthly baseline summary:", stats)

print("\nERA5 master cell complete.")
print("Raw files are in:", RAW_DIR)

ERA5 master cell starting
  out_dir : /Users/Shared/blueleaflabs/hydropulse/results
  RAW_DIR : /Users/Shared/blueleaflabs/hydropulse/results/era5_raw
  CLEAN_DIR (future use): /Users/Shared/blueleaflabs/hydropulse/results/era5_clean
  AREA   : [42.0095, -124.482, 32.5343, -114.1315] (N,W,S,E)
  Daily dataset  : derived-era5-land-daily-statistics
  Monthly dataset: reanalysis-era5-land-monthly-means
  Daily vars: ['2m_temperature', 'snow_depth_water_equivalent', 'volumetric_soil_water_layer_1']
  Analysis window : 2024-06-01 → 2024-10-31
  Baseline window : 1991-01-01 → 2025-12-31

ERA5 daily downloads:
[DL] era5l_daily_CA_202406_mean.nc  (dataset=derived-era5-land-daily-statistics)


2026-01-12 21:22:43,595 INFO Request ID is b051bcd7-774d-48a9-b7ae-0cbf189afccc
2026-01-12 21:22:43,808 INFO status has been updated to accepted
2026-01-12 21:22:53,032 INFO status has been updated to running
2026-01-12 21:31:11,022 INFO status has been updated to successful
                                                                                         

[OK] era5l_daily_CA_202406_mean.nc (1.3 MB)
[DL] era5l_daily_CA_202407_mean.nc  (dataset=derived-era5-land-daily-statistics)


2026-01-12 21:31:15,793 INFO Request ID is bbbf1732-28ba-472d-b58e-c95ef1053c1c
2026-01-12 21:31:16,038 INFO status has been updated to accepted
2026-01-12 21:31:30,700 INFO status has been updated to running
2026-01-12 21:33:12,511 INFO status has been updated to accepted
2026-01-12 21:34:11,352 INFO status has been updated to running
2026-01-12 21:39:42,795 INFO status has been updated to successful
                                                                                         

[OK] era5l_daily_CA_202407_mean.nc (1.3 MB)
[DL] era5l_daily_CA_202408_mean.nc  (dataset=derived-era5-land-daily-statistics)


2026-01-12 21:39:47,470 INFO Request ID is 31f80d73-d5bb-454a-bbbc-3da0b461ca00
2026-01-12 21:39:47,726 INFO status has been updated to accepted
2026-01-12 21:40:10,045 INFO status has been updated to running
2026-01-12 21:46:11,283 INFO status has been updated to successful
                                                                                         

[OK] era5l_daily_CA_202408_mean.nc (1.3 MB)
[DL] era5l_daily_CA_202409_mean.nc  (dataset=derived-era5-land-daily-statistics)


2026-01-12 21:46:18,889 INFO Request ID is 1babde93-769b-463c-aacd-43016c6ff766
2026-01-12 21:46:19,195 INFO status has been updated to accepted
2026-01-12 21:47:10,969 INFO status has been updated to running
2026-01-12 21:52:44,452 INFO status has been updated to successful
                                                                                         

[OK] era5l_daily_CA_202409_mean.nc (1.3 MB)
[DL] era5l_daily_CA_202410_mean.nc  (dataset=derived-era5-land-daily-statistics)


2026-01-12 21:52:48,399 INFO Request ID is a1c8e289-73e1-4321-a919-adf95836e43c
2026-01-12 21:52:48,593 INFO status has been updated to accepted
2026-01-12 21:53:22,361 INFO status has been updated to running
2026-01-12 21:59:13,496 INFO status has been updated to successful
                                                                                         

[OK] era5l_daily_CA_202410_mean.nc (1.4 MB)
Daily summary: {'downloaded': 5, 'skipped': 0}

ERA5 monthly baseline downloads:
[DL] era5l_monthly_CA_1991.nc  (dataset=reanalysis-era5-land-monthly-means)


2026-01-12 21:59:19,274 INFO Request ID is b6ff0ac1-869c-4d5d-8eb0-540dba5fa8d0
2026-01-12 21:59:19,520 INFO status has been updated to accepted
2026-01-12 21:59:36,293 INFO status has been updated to running
2026-01-12 21:59:44,155 INFO status has been updated to accepted
2026-01-12 21:59:55,751 INFO status has been updated to successful
                                                                                       

[WARN] attempt 1/6 failed: Download finished but output looks too small: /Users/Shared/blueleaflabs/hydropulse/results/era5_raw/era5l_monthly_CA_1991.nc
       sleeping 25s, then retrying...
[DL] era5l_monthly_CA_1991.nc  (dataset=reanalysis-era5-land-monthly-means)


2026-01-12 22:00:24,843 INFO Request ID is 3250a60c-3a41-47c3-ac4c-12e532db3215
2026-01-12 22:00:26,914 INFO status has been updated to accepted
2026-01-12 22:00:36,906 INFO status has been updated to successful
                                                                                       

[WARN] attempt 2/6 failed: Download finished but output looks too small: /Users/Shared/blueleaflabs/hydropulse/results/era5_raw/era5l_monthly_CA_1991.nc
       sleeping 50s, then retrying...
[DL] era5l_monthly_CA_1991.nc  (dataset=reanalysis-era5-land-monthly-means)


2026-01-12 22:01:31,808 INFO Request ID is ba899ca7-de1d-4eb5-b891-b8bc5007dfd0
2026-01-12 22:01:32,199 INFO status has been updated to accepted
2026-01-12 22:01:46,562 INFO status has been updated to successful
                                                                                       

[WARN] attempt 3/6 failed: Download finished but output looks too small: /Users/Shared/blueleaflabs/hydropulse/results/era5_raw/era5l_monthly_CA_1991.nc
       sleeping 75s, then retrying...
[DL] era5l_monthly_CA_1991.nc  (dataset=reanalysis-era5-land-monthly-means)


2026-01-12 22:03:06,929 INFO Request ID is a4b35a59-8283-467b-aa79-87b4b7518dd5
2026-01-12 22:03:07,184 INFO status has been updated to accepted
2026-01-12 22:03:30,281 INFO status has been updated to successful
                                                                                       

[WARN] attempt 4/6 failed: Download finished but output looks too small: /Users/Shared/blueleaflabs/hydropulse/results/era5_raw/era5l_monthly_CA_1991.nc
       sleeping 100s, then retrying...
[DL] era5l_monthly_CA_1991.nc  (dataset=reanalysis-era5-land-monthly-means)


2026-01-12 22:05:16,326 INFO Request ID is 5d657024-59f0-458f-ac01-4122fd2c378f
2026-01-12 22:05:16,867 INFO status has been updated to accepted
2026-01-12 22:05:22,403 INFO status has been updated to running
2026-01-12 22:05:26,002 INFO status has been updated to successful
                                                                                       

[WARN] attempt 5/6 failed: Download finished but output looks too small: /Users/Shared/blueleaflabs/hydropulse/results/era5_raw/era5l_monthly_CA_1991.nc
       sleeping 125s, then retrying...
[DL] era5l_monthly_CA_1991.nc  (dataset=reanalysis-era5-land-monthly-means)


2026-01-12 22:07:35,622 INFO Request ID is 3630b934-6b11-4521-affc-c3903f4da419
2026-01-12 22:07:35,819 INFO status has been updated to accepted
2026-01-12 22:07:50,261 INFO status has been updated to successful
                                                                                       

RuntimeError: Download finished but output looks too small: /Users/Shared/blueleaflabs/hydropulse/results/era5_raw/era5l_monthly_CA_1991.nc

In [75]:
# Next dataset is the USGS-StreamFlow dataset
# ============================
# Base endpoint (Daily Values / DV):
# 	•	https://waterservices.usgs.gov/nwis/dv/

# Key parameters you’ll use (these are the ones that matter):
# 	•	format=rdb (tab-delimited, easy to parse)
# 	•	stateCd=ca (California-only site filter)
# 	•	parameterCd=00060 (discharge, cubic feet per second)
# 	•	siteType=ST (streams)
# 	•	startDT=1991-01-01 (baseline start)
# 	•	endDT=2020-12-31 (baseline end) or endDT=2024-10-31 (analysis end)
# 	•	siteStatus=active (optional)
# 	•	statCd=00003 (daily mean) (supported by DV service outputs)  ￼
# ============================

# URLs look like this: https://waterservices.usgs.gov/nwis/dv/?format=rdb&stateCd=CA&siteType=ST&parameterCd=00060&startDT=1991-01-01&endDT=2025-12-31
# This is a large download! If this fails, then try manually
# downloading from the above URL and saving as "manual/usgs-streamflow/streamflow-19910101-20251231.txt"
# and then run the parsing/normalization cell below.
# ============================


# ============================
# USGS NWIS Daily Values downloader (chunked, resumable)
# Writes to:
#   results/{USGS_RAW_DIRNAME}/  (yearly .rdb chunks + optional combined raw)
#   results/{USGS_CLEAN_DIRNAME}/ (reserved for later)
# ============================

from pathlib import Path
import time
import requests
import pandas as pd

# ---- config ----
OUT_DIR = Path(CONFIG["out_dir"])

RAW_DIR = OUT_DIR / CONFIG.get("USGS_RAW_DIRNAME", "usgs_raw")
CLEAN_DIR = OUT_DIR / CONFIG.get("USGS_CLEAN_DIRNAME", "usgs_clean")
RAW_DIR.mkdir(parents=True, exist_ok=True)
CLEAN_DIR.mkdir(parents=True, exist_ok=True)

STATE_CD = CONFIG.get("USGS_STATE_CD", "CA")
SITE_TYPE = CONFIG.get("USGS_SITE_TYPE", "ST")
PARAM = CONFIG.get("USGS_PARAM_DISCHARGE", "00060")

START = pd.to_datetime(CONFIG.get("USGS_START_DATE", "1991-01-01")).date()
END   = pd.to_datetime(CONFIG.get("USGS_END_DATE", "2025-12-31")).date()

SLEEP_S = float(CONFIG.get("USGS_SLEEP_S", 2.5))
MAX_RETRIES = int(CONFIG.get("USGS_MAX_RETRIES", 6))
BACKOFF_BASE = float(CONFIG.get("USGS_BACKOFF_BASE_S", 10))
TIMEOUT_S = int(CONFIG.get("USGS_TIMEOUT_S", 180))

BASE_URL = "https://waterservices.usgs.gov/nwis/dv/"

def ok_file(path: Path, min_bytes: int = 50_000) -> bool:
    return path.exists() and path.is_file() and path.stat().st_size >= min_bytes

def fetch_year(year: int) -> Path:
    startDT = f"{year:04d}-01-01"
    endDT   = f"{year:04d}-12-31"
    out = RAW_DIR / f"usgs_dv_{STATE_CD.lower()}_{SITE_TYPE.lower()}_{PARAM}_{year:04d}.rdb"

    if ok_file(out):
        print(f"[SKIP] {out.name} exists ({out.stat().st_size/1e6:.2f} MB)")
        return out

    params = {
        "format": "rdb",
        "stateCd": STATE_CD,
        "siteType": SITE_TYPE,
        "parameterCd": PARAM,
        "startDT": startDT,
        "endDT": endDT,
    }

    headers = {
        "User-Agent": CONFIG.get("USER_AGENT_HEADERS", {}).get("User-Agent", "BlueLeafLabs/HydroPulse"),
        "Accept-Encoding": "gzip, deflate",
    }

    for attempt in range(1, MAX_RETRIES + 1):
        try:
            print(f"[DL] {year} -> {out.name}")
            r = requests.get(BASE_URL, params=params, headers=headers, timeout=TIMEOUT_S)
            r.raise_for_status()

            tmp = out.with_suffix(".tmp")
            tmp.write_bytes(r.content)

            if not ok_file(tmp):
                raise RuntimeError(f"Downloaded file too small for {year}: {tmp.stat().st_size} bytes")

            tmp.replace(out)
            print(f"[OK] {out.name} ({out.stat().st_size/1e6:.2f} MB)")
            time.sleep(SLEEP_S)
            return out

        except Exception as e:
            if attempt == MAX_RETRIES:
                raise
            sleep = BACKOFF_BASE * attempt
            print(f"[WARN] {year} attempt {attempt}/{MAX_RETRIES} failed: {e}")
            print(f"       sleeping {sleep:.0f}s then retrying...")
            time.sleep(sleep)

# ---- run yearly downloads ----
years = list(range(START.year, END.year + 1))

print("USGS download starting (yearly chunks)")
print("  RAW_DIR  :", RAW_DIR)
print("  CLEAN_DIR:", CLEAN_DIR, "(reserved)")
print("  state/site/param:", STATE_CD, SITE_TYPE, PARAM)
print("  date range:", START, "→", END)
print("  years:", years[0], "→", years[-1])

paths = []
for y in years:
    paths.append(fetch_year(y))

print("\nDownloaded/verified chunks:", len(paths))

# ---- optional: combined raw file under RAW_DIR ----
combined = RAW_DIR / f"usgs_dv_{STATE_CD.lower()}_{SITE_TYPE.lower()}_{PARAM}_{START.year:04d}-{END.year:04d}.rdb"
if not ok_file(combined, min_bytes=200_000):
    print("[COMBINE] Writing combined file:", combined.name)
    with combined.open("wb") as w:
        first = True
        for p in paths:
            data = p.read_bytes().splitlines(keepends=True)
            if first:
                w.writelines(data)
                first = False
            else:
                # drop leading comment lines from each subsequent yearly shard
                i = 0
                while i < len(data) and data[i].lstrip().startswith(b"#"):
                    i += 1
                w.writelines(data[i:])
    print("[OK] Combined:", combined.name, f"({combined.stat().st_size/1e6:.2f} MB)")
else:
    print("[SKIP] Combined file already exists:", combined.name)
    


USGS download starting (yearly chunks)
  RAW_DIR  : /Users/Shared/blueleaflabs/hydropulse/results/usgs_raw
  CLEAN_DIR: /Users/Shared/blueleaflabs/hydropulse/results/usgs_clean (reserved)
  state/site/param: CA ST 00060
  date range: 1991-01-01 → 2025-12-31
  years: 1991 → 2025
[DL] 1991 -> usgs_dv_ca_st_00060_1991.rdb
[OK] usgs_dv_ca_st_00060_1991.rdb (9.97 MB)
[DL] 1992 -> usgs_dv_ca_st_00060_1992.rdb
[OK] usgs_dv_ca_st_00060_1992.rdb (9.79 MB)
[DL] 1993 -> usgs_dv_ca_st_00060_1993.rdb
[OK] usgs_dv_ca_st_00060_1993.rdb (9.54 MB)
[DL] 1994 -> usgs_dv_ca_st_00060_1994.rdb
[OK] usgs_dv_ca_st_00060_1994.rdb (9.68 MB)
[DL] 1995 -> usgs_dv_ca_st_00060_1995.rdb
[OK] usgs_dv_ca_st_00060_1995.rdb (9.45 MB)
[DL] 1996 -> usgs_dv_ca_st_00060_1996.rdb
[OK] usgs_dv_ca_st_00060_1996.rdb (9.59 MB)
[DL] 1997 -> usgs_dv_ca_st_00060_1997.rdb
[OK] usgs_dv_ca_st_00060_1997.rdb (9.53 MB)
[DL] 1998 -> usgs_dv_ca_st_00060_1998.rdb
[OK] usgs_dv_ca_st_00060_1998.rdb (9.66 MB)
[DL] 1999 -> usgs_dv_ca_st_00060_

In [None]:
# MODIS DATA DOWNLOAD
# ============================
# Manual data again
# https://appeears.earthdatacloud.nasa.gov/task/area
# parameters: name: hydropulse_ca_ndvi
# GeoJSON file: results/california_boundary.geojson
# date range: 2000-01-01 to 2020-12-31
# products: MOD13Q1.061 – MODIS/Terra Vegetation Indices 16-Day L3 Global 250m
# From this product, select only: NDVI, VI_Quality, EVI
# Output format: GeoTIFF
# Projection: Native Projection 
# ============================

# Then repeat the same thing for 2024-06-01 to 2024-10-31
# Emails were sent when the jobs were received
# Now to wait for the files to be ready for download

In [None]:
# === HydroPulse | Final Daily Grid Builder (v0: GHCND PRCP only) ===
# Produces: CONFIG["FINAL_DAILY_FILENAME"] as a canonical daily grid table:
#   grid_id × date → prcp_mm + QC
#
# Inputs:
#   - Grid parquet (EPSG:3310): CONFIG["GRID_FILENAME"]
#   - GHCND cleaned station-day parquet: resolve_out_path(CONFIG["GHCND_CLEAN_PARQUET_NAME"])
#
# Notes for later HeatShield refactor:
#   - This cell establishes the common "final builder" contract: {grid_id, date, variables..., QC...}
#   - Keep the interface stable; only swap/extend source adapters per repo.

import numpy as np
import pandas as pd
import geopandas as gpd
from pathlib import Path
from sklearn.neighbors import BallTree

# -----------------------------
# Paths
# -----------------------------
OUT_DIR = Path(CONFIG["out_dir"])
GRID_PATH = Path(resolve_out_path(CONFIG["GRID_FILENAME"]))
GHCND_PATH = Path(resolve_out_path(CONFIG["GHCND_CLEAN_PARQUET_NAME"]))
FINAL_PATH = Path(resolve_out_path(CONFIG["FINAL_DAILY_FILENAME"]))

# Resume via daily shards
SHARDS_DIR = OUT_DIR / "derived" / "final_daily_shards_prcp"
SHARDS_DIR.mkdir(parents=True, exist_ok=True)

print("GRID_PATH:", GRID_PATH)
print("GHCND_PATH:", GHCND_PATH)
print("SHARDS_DIR:", SHARDS_DIR)
print("FINAL_PATH:", FINAL_PATH)

# -----------------------------
# Column contract (confirmed)
# -----------------------------
STATION_COL = "station_core"
DATE_COL    = "date"
LAT_COL     = "lat"
LON_COL     = "lon"
PRCP_COL_IN = "precipitation_mm"

# Output column naming for the final table
PRCP_COL_OUT = "prcp_mm"

# -----------------------------
# Tunables (can later move to config)
# -----------------------------
K = int(CONFIG.get("PRCP_IDW_K", 8))                        # k nearest stations
HARD_CAP_KM = float(CONFIG.get("PRCP_HARD_CAP_KM", 100.0))  # ignore stations beyond this radius
POWER = float(CONFIG.get("PRCP_IDW_POWER", 2.0))            # IDW power

OPS_EPSG = int(CONFIG.get("OPS_EPSG", 3310))
WGS84_EPSG = int(CONFIG.get("WGS84_EPSG", 4326))

# -----------------------------
# Helpers (keep these stable across repos)
# -----------------------------
def ensure_epsg(gdf: gpd.GeoDataFrame, epsg: int) -> gpd.GeoDataFrame:
    if gdf.crs is None:
        gdf = gdf.set_crs(f"EPSG:{WGS84_EPSG}")
    if (gdf.crs.to_epsg() or 0) != epsg:
        gdf = gdf.to_crs(epsg)
    return gdf

def ensure_grid_id(grid: gpd.GeoDataFrame) -> tuple[gpd.GeoDataFrame, str]:
    for c in ["grid_id", "cell_id", "id"]:
        if c in grid.columns:
            return grid, c
    grid = grid.copy()
    grid["grid_id"] = np.arange(len(grid), dtype=np.int32)
    return grid, "grid_id"

def build_balltree_from_points(geom: gpd.GeoSeries) -> BallTree:
    xy = np.column_stack([geom.x.values, geom.y.values])
    return BallTree(xy, metric="euclidean")

def idw(dist_m: np.ndarray, vals: np.ndarray, power: float) -> float:
    if np.any(dist_m == 0):
        return float(vals[np.argmin(dist_m)])
    w = 1.0 / np.power(dist_m, power)
    return float(np.sum(w * vals) / np.sum(w))

def interpolate_prcp_for_day(grid_centroids: gpd.GeoSeries, stations_day: gpd.GeoDataFrame) -> pd.DataFrame:
    """
    Returns a DF aligned to grid_centroids order with columns:
      prcp_mm, n_used, maxdist_km, method
    """
    n_cells = len(grid_centroids)
    out = pd.DataFrame({
        PRCP_COL_OUT: np.full(n_cells, np.nan, dtype=float),
        "n_used": np.zeros(n_cells, dtype=np.int16),
        "maxdist_km": np.full(n_cells, np.nan, dtype=float),
        "method": np.full(n_cells, None, dtype=object),
    })

    if stations_day is None or len(stations_day) == 0:
        return out

    # Build tree on station points
    tree = build_balltree_from_points(stations_day.geometry)
    vals = stations_day[PRCP_COL_IN].to_numpy(dtype=float)

    qxy = np.column_stack([grid_centroids.x.values, grid_centroids.y.values])
    k_eff = min(K, len(stations_day))
    dist_m, idx = tree.query(qxy, k=k_eff)

    hard_cap_m = HARD_CAP_KM * 1000.0

    for i in range(n_cells):
        d = dist_m[i]
        j = idx[i]

        # Apply hard cap
        mask = d <= hard_cap_m
        if not np.any(mask):
            continue

        d_use = d[mask]
        v_use = vals[j[mask]]

        # Drop NaNs (defensive)
        good = np.isfinite(v_use)
        d_use = d_use[good]
        v_use = v_use[good]
        if len(v_use) == 0:
            continue

        if len(v_use) == 1:
            out.at[i, PRCP_COL_OUT] = float(v_use[0])
            out.at[i, "method"] = "nearest"
        else:
            out.at[i, PRCP_COL_OUT] = idw(d_use, v_use, power=POWER)
            out.at[i, "method"] = f"idw_k{len(v_use)}"

        out.at[i, "n_used"] = int(len(v_use))
        out.at[i, "maxdist_km"] = float(np.max(d_use) / 1000.0)

    return out

# -----------------------------
# Load grid (EPSG:3310) and compute centroids
# -----------------------------
grid = gpd.read_parquet(GRID_PATH)
grid = ensure_epsg(grid, OPS_EPSG)
grid, GID_COL = ensure_grid_id(grid)
centroids = grid.geometry.centroid

print(f"Grid: {len(grid)} cells | CRS EPSG: {grid.crs.to_epsg()} | id col: {GID_COL}")

# -----------------------------
# Load cleaned GHCND station-day table
# -----------------------------
cdo = pd.read_parquet(GHCND_PATH)

required = [STATION_COL, DATE_COL, LAT_COL, LON_COL, PRCP_COL_IN]
missing = [c for c in required if c not in cdo.columns]
if missing:
    raise KeyError(f"Missing required columns in {GHCND_PATH.name}: {missing}. Have: {list(cdo.columns)}")

# Normalize date to UTC day
cdo[DATE_COL] = pd.to_datetime(cdo[DATE_COL], utc=True, errors="coerce").dt.normalize()

# Filter date window
start = pd.to_datetime(CONFIG["start_date"], utc=True).normalize()
end = pd.to_datetime(CONFIG["end_date"], utc=True).normalize()
cdo = cdo[(cdo[DATE_COL] >= start) & (cdo[DATE_COL] <= end)].copy()

# Drop invalid coords / missing precip
cdo = cdo[np.isfinite(cdo[LAT_COL]) & np.isfinite(cdo[LON_COL])].copy()
cdo = cdo[np.isfinite(cdo[PRCP_COL_IN])].copy()

print("Station-day rows:", len(cdo), "| stations:", cdo[STATION_COL].nunique(), "| dates:", cdo[DATE_COL].nunique())

# GeoDataFrame in OPS_EPSG
pts = gpd.GeoDataFrame(
    cdo,
    geometry=gpd.points_from_xy(cdo[LON_COL], cdo[LAT_COL]),
    crs=f"EPSG:{WGS84_EPSG}"
)
pts = ensure_epsg(pts, OPS_EPSG)

# Pre-group by date for speed (avoid repeated boolean filters)
pts_by_date = {d: df for d, df in pts.groupby(DATE_COL)}
all_dates = pd.date_range(start=start, end=end, freq="D")

# -----------------------------
# Daily loop with resume
# -----------------------------
written = 0
skipped = 0

for d in all_dates:
    tag = d.strftime("%Y%m%d")
    shard_path = SHARDS_DIR / f"final_prcp_{tag}.parquet"
    if shard_path.exists():
        skipped += 1
        continue

    day_pts = pts_by_date.get(d)
    interp = interpolate_prcp_for_day(centroids, day_pts)

    out = pd.DataFrame({
        GID_COL: grid[GID_COL].values,
        "date": np.full(len(grid), d),
    })
    out = pd.concat([out, interp], axis=1)

    out.to_parquet(shard_path, index=False)
    written += 1

print(f"Shards written: {written} | skipped (resume): {skipped} | total days: {len(all_dates)}")

# -----------------------------
# Compose FINAL parquet
# -----------------------------
shards = sorted(SHARDS_DIR.glob("final_prcp_*.parquet"))
if not shards:
    raise FileNotFoundError(f"No shards found in {SHARDS_DIR}")

df_final = pd.concat((pd.read_parquet(p) for p in shards), ignore_index=True)

df_final["date"] = pd.to_datetime(df_final["date"], utc=True).dt.normalize()
df_final = df_final.sort_values([GID_COL, "date"]).reset_index(drop=True)

FINAL_PATH.parent.mkdir(parents=True, exist_ok=True)
df_final.to_parquet(FINAL_PATH, index=False)

print("Saved FINAL:", FINAL_PATH)
print("Final rows:", len(df_final), "| cells:", df_final[GID_COL].nunique(), "| dates:", df_final["date"].nunique())
print(df_final.head())