From 7f6c9da48eeeed14c593e49dc05e020a1facfc7a Mon Sep 17 00:00:00 2001 From: almac2022 Date: Sun, 3 May 2026 15:24:05 -0700 Subject: [PATCH 1/2] Initialize PWF baseline for #38 Co-Authored-By: Claude Opus 4.7 (1M context) --- planning/active/findings.md | 101 +++++++++++++++++++++++++++++++++++ planning/active/progress.md | 13 +++++ planning/active/task_plan.md | 98 +++++++++++++++++++++++++++++++++ 3 files changed, 212 insertions(+) create mode 100644 planning/active/findings.md create mode 100644 planning/active/progress.md create mode 100644 planning/active/task_plan.md diff --git a/planning/active/findings.md b/planning/active/findings.md new file mode 100644 index 0000000..8473236 --- /dev/null +++ b/planning/active/findings.md @@ -0,0 +1,101 @@ +# Findings — Bulk data fetch safeguards (#38) + +## Issue context (verbatim from #38) + +The EDH migration surfaced a reusable pattern: "what safeguards does a +bulk data fetch script need to be polite AND safe against its own +failure modes?" + +Born from painful experience on CDS (#33: rate-limited, orphan jobs, +zombie processes) and the EDH migration QA (#36: atomic writes, +partial writes, silent skips). + +### Layer 1: fix the gaps in this project's script + +`scripts/backfill_edh_all.py` already has: + +- [x] Idempotency per output file (skip if exists) +- [x] Atomic writes (.tif.tmp + os.replace) so a killed run doesn't + leave truncated files that fool the idempotency check +- [x] Explicit SKIP logging when source data is incomplete + +Missing safeguards to add on this branch: + +- [ ] Single-instance pgrep pre-flight check +- [ ] Retry-with-backoff on transient HTTP errors from EDH +- [ ] Backup-before-delete when regenerating from a new source + +### Layer 2: extract as a soul convention + +Once Layer 1 lands, propagate the pattern to the +[soul](https://github.com/NewGraphEnvironment/soul) repo as a +convention. 
+ +## State found during plan-mode exploration + +**Layer 1 is partially already done.** The issue checklist above is +stale. + +Commits already on main: + +- `5bf1b34` Add single-instance guard and retry-with-backoff to EDH + backfill — closed safeguards 1 and 2 in `backfill_edh_all.py`. +- `6f66a01` Skip pgrep single-instance check on GHA — fixed CI false + positive. + +So `backfill_edh_all.py` already has: +- `preflight_single_instance()` at lines 69-94 (with `GITHUB_ACTIONS` + bypass) +- `with_retry(fn, ...)` at lines 97-115 (4 attempts, exponential + backoff, retries on `OSError`/`ConnectionError`/`TimeoutError`) +- Atomic write via `.tmp` + `os.replace` at lines 153-178 + (`write_geotiff`) + +Backup-before-delete (3rd safeguard) is **in operational use on disk** +but never codified: `data/backfill/monthly/_cds_backup/` holds 375 +CDS-era TIFs, hand-managed before the EDH backfill overwrote them. + +**Sibling script `backfill_edh_tmax_tmin.py` is missing all three +safeguards.** Same shape as `backfill_edh_all.py` (year loop, +idempotency, monthly aggregation, GeoTIFF write) but no pgrep guard, +no retry, no atomic write. One-shot scripts (`probe_edh_vars.py`, +`test_edh_era5_land.py`) are not in scope. + +## Architecture decisions taken (user-confirmed) + +1. **Extract to `scripts/_lib.py`** rather than copy-paste between the + two scripts. One source of truth, well-positioned for the eventual + snow-backfill script (#48). Module is plain importable Python; both + scripts retain their PEP 723 inline-deps shebang. +2. **Soul convention extraction is a follow-up.** This branch closes + Layer 1; Layer 2 ships as a separate soul-repo PR after merge. +3. **Hoist `get_token()` into `_lib.py`** while refactoring — both + scripts already duplicate it. +4. **Keep `bc_slice`, `tetens_es` in-script** — they are + pipeline-specific to the EDH all-vars pipeline, not bulk-fetch + safeguards. +5. 
**Ship `backup_before_delete()` helper unused.** First real call + site lands with #48 if its aggregation method requires re-running + existing years. + +## Inventory of bulk-fetch scripts in `scripts/` + +| Script | Pgrep | Retry | Atomic Write | Notes | +|---|---|---|---|---| +| `backfill_edh_all.py` | done (69-94) | done (97-115) | done (153-178) | All 7 cd vars, 1950-2025 | +| `backfill_edh_tmax_tmin.py` | missing | missing | missing | tmax/tmin only | +| `probe_edh_vars.py` | n/a | n/a | n/a | One-shot validation; out of scope | +| `test_edh_era5_land.py` | n/a | n/a | n/a | Benchmark; out of scope | + +## Python tooling (none configured) + +- No `pyproject.toml`, no `ruff.toml`, no `pytest` config in repo. +- All scripts use PEP 723 inline deps with + `#!/usr/bin/env -S uv run --script` shebang — self-contained. +- CLAUDE.md documents R conventions only; Python is utility-tier. + +## Anchors for the soul convention follow-up + +When the soul-repo issue lands, the body cribs from #38's "Layer 2" +checklist (politeness / self-safety / data integrity / performance +sanity). Pin the new `scripts/_lib.py` as the canonical worked example. diff --git a/planning/active/progress.md b/planning/active/progress.md new file mode 100644 index 0000000..3e6122c --- /dev/null +++ b/planning/active/progress.md @@ -0,0 +1,13 @@ +# Progress — Bulk data fetch safeguards (#38) + +## Session 2026-05-03 + +- Plan-mode exploration — confirmed 2 of 3 safeguards already in + `backfill_edh_all.py` (commits `5bf1b34`, `6f66a01`); sibling + `backfill_edh_tmax_tmin.py` is missing all three. Phases approved + by user with two scope decisions: extract helpers to + `scripts/_lib.py`, and defer soul convention to a follow-up. +- Created branch `38-bulk-fetch-safeguards` off main (post v0.1.5 + release). +- Scaffolded PWF baseline. +- Next: Phase 1 — extract `scripts/_lib.py`. 
diff --git a/planning/active/task_plan.md b/planning/active/task_plan.md new file mode 100644 index 0000000..a96e578 --- /dev/null +++ b/planning/active/task_plan.md @@ -0,0 +1,98 @@ +# Task: Bulk data fetch safeguards — fix gaps, extract as soul convention (#38) + +## Problem + +Three safeguards were filed as missing in `scripts/backfill_edh_all.py`: +single-instance pgrep guard, retry-with-backoff, backup-before-delete. +On exploration, (1) and (2) are already implemented (commits `5bf1b34`, +`6f66a01`) but the issue checklist is stale. The `_cds_backup/` +directory on disk (375 files) shows backup-before-delete is in +operational use but never codified. Sibling script +`backfill_edh_tmax_tmin.py` is missing all three safeguards. + +Goal: extract the three safeguards (plus shared `log` helper and +`MONTH_NAMES`) into a new `scripts/_lib.py`, refactor both production +bulk-fetch scripts to import from it, and add a `backup_before_delete()` +helper that codifies the on-disk pattern. Soul convention extraction +(Layer 2) is a follow-up issue once this lands. + +## Phase 1 — Extract `scripts/_lib.py` + +- [ ] Create `scripts/_lib.py` with helpers parameterized for reuse: + `preflight_single_instance(name)`, `with_retry(fn, ...)`, + `write_geotiff(da, out_path, month_names)`, `log(msg)`, + `MONTH_NAMES`. Hoist `get_token()` too — both scripts duplicate it. +- [ ] Refactor `backfill_edh_all.py` to `from _lib import (...)`. Drop + the now-redundant local copies (lines 69-94, 97-115, 118-127, + 153-178, 181-182). +- [ ] Smoke test on one year: + `uv run scripts/backfill_edh_all.py --year 1950`. Outputs match + pre-refactor. + +## Phase 2 — Port safeguards to `backfill_edh_tmax_tmin.py` + +- [ ] `from _lib import (...)` at top. +- [ ] `preflight_single_instance("backfill_edh_tmax_tmin")` at top of + `main()`. +- [ ] Wrap `xr.open_dataset(zarr_url, ...)` (line 96) in `with_retry`. +- [ ] Wrap each `.compute()` call (lines 132-133) in `with_retry`. 
+- [ ] Replace inline `to_geotiff_raster` (lines 141-155) with + `write_geotiff` from `_lib.py`. +- [ ] Replace ad-hoc `print(...)` with `log(...)`. +- [ ] Smoke test: + `uv run scripts/backfill_edh_tmax_tmin.py --year 1950`. + +## Phase 3 — Add `backup_before_delete()` helper + +- [ ] Add `backup_before_delete(files, backup_subdir="_backup")` to + `_lib.py`. Each file moves to `file.parent/backup_subdir/file.name`. + No overwrite (skip with warning if backup target exists). +- [ ] Module docstring documents intended use during regenerations and + points at `data/backfill/monthly/_cds_backup/` as the worked + example. +- [ ] No call sites added now — both current scripts are pure-write, + protected by idempotency. Helper exists for #48 if its + aggregation method requires re-running existing years. + +## Phase 4 — Verify safeguard behaviors + +- [ ] Pgrep guard fires: start one + `backfill_edh_tmax_tmin --year 1950` in background, immediately + try to start a second; second ABORTs. +- [ ] Output equivalence: byte-compare a re-rendered year file from + both scripts against the pre-refactor file. + +## Phase 5 — File soul-convention follow-up + +- [ ] After this PR merges, file an issue in + `NewGraphEnvironment/soul` referencing #38, scoping a + `bulk-fetch-safeguards.md` convention file with `scripts/_lib.py` + as the canonical worked example. + +## Phase 6 — Code-check, commit, PR + +- [ ] `/code-check` on staged diff before each commit. +- [ ] Atomic commits: `_lib.py` + `backfill_edh_all.py` refactor (1), + `backfill_edh_tmax_tmin.py` port (2), `backup_before_delete` (3). +- [ ] PR with `Fixes #38`, SRED ref in body + (`Relates to NewGraphEnvironment/sred-2025-2026#23`). + +## Validation + +- [ ] Single-year smoke test on `backfill_edh_all.py` succeeds. +- [ ] Single-year smoke test on `backfill_edh_tmax_tmin.py` succeeds. +- [ ] Pgrep guard refuses second concurrent instance of each script. +- [ ] No new dependencies in either script's PEP 723 inline deps. 
+- [ ] `/code-check` clean on each commit. +- [ ] PWF checkboxes match landed work. +- [ ] `/planning-archive` on completion. + +## Out of scope + +- Python lint config (ruff, black) — `cd` is R-first. +- Pytest test runner. +- Soul PR itself (deferred). +- `probe_edh_vars.py` and `test_edh_era5_land.py` (one-shot validation + scripts; safeguards target production bulk-fetch). +- A `--regen` flag wiring `backup_before_delete` into either script; + helper ships unused, first real call site lands with #48 if needed. From fefb8da081fd864470579111e3bea32501f78e1e Mon Sep 17 00:00:00 2001 From: almac2022 Date: Sun, 3 May 2026 18:58:29 -0700 Subject: [PATCH 2/2] Extract bulk-fetch safeguards to scripts/_lib.py and apply to both backfill scripts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the gaps in #38. preflight_single_instance, with_retry, atomic write_geotiff, log, get_token, MONTH_NAMES move from backfill_edh_all.py into a new shared scripts/_lib.py. New backup_before_delete() helper codifies the on-disk pattern from data/backfill/monthly/_cds_backup/ — no call sites yet, ready for the snow-vars script (#48) if a regen is needed. backfill_edh_tmax_tmin.py was missing all three safeguards; now imports the same helpers, with_retry wraps the zarr open and the .compute() calls, write_geotiff replaces the inline non-atomic to_geotiff_raster. Smoke-tested both scripts with --year 1950 (idempotent-skip path) and verified the pgrep guard rejects a concurrent second instance. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- planning/active/progress.md | 16 ++- planning/active/task_plan.md | 47 +++---- scripts/_lib.py | 195 ++++++++++++++++++++++++++++++ scripts/backfill_edh_all.py | 111 ++--------------- scripts/backfill_edh_tmax_tmin.py | 86 +++++-------- 5 files changed, 277 insertions(+), 178 deletions(-) create mode 100644 scripts/_lib.py diff --git a/planning/active/progress.md b/planning/active/progress.md index 3e6122c..cc9da1c 100644 --- a/planning/active/progress.md +++ b/planning/active/progress.md @@ -10,4 +10,18 @@ - Created branch `38-bulk-fetch-safeguards` off main (post v0.1.5 release). - Scaffolded PWF baseline. -- Next: Phase 1 — extract `scripts/_lib.py`. +- Phases 1–3 implemented: `scripts/_lib.py` ships + `preflight_single_instance(name)`, `with_retry`, `write_geotiff`, + `log`, `get_token`, `MONTH_NAMES`, and new + `backup_before_delete()` helper. Both `backfill_edh_all.py` and + `backfill_edh_tmax_tmin.py` now import from `_lib`. Net + -156/+41 LOC. +- Phase 4 verification: pgrep guard fires correctly when a second + instance launches (tested live with both pids reported); both + scripts idempotent-skip on existing year files; no orphaned + imports remain. +- Methodology pinned on #48 (snow vars): 7-day rolling sum of daily + `smlt` for `snowmelt_rate_peak`, daily UTC product preferred over + hourly to dodge the `stepType=accum` trap (issue comment 4367348096). +- Next: commit, then Phase 5 (file soul-convention follow-up issue) + + Phase 6 (PR). diff --git a/planning/active/task_plan.md b/planning/active/task_plan.md index a96e578..3de2e5d 100644 --- a/planning/active/task_plan.md +++ b/planning/active/task_plan.md @@ -18,49 +18,54 @@ helper that codifies the on-disk pattern. 
Soul convention extraction ## Phase 1 — Extract `scripts/_lib.py` -- [ ] Create `scripts/_lib.py` with helpers parameterized for reuse: +- [x] Create `scripts/_lib.py` with helpers parameterized for reuse: `preflight_single_instance(name)`, `with_retry(fn, ...)`, - `write_geotiff(da, out_path, month_names)`, `log(msg)`, + `write_geotiff(da, out_path, band_names)`, `log(msg)`, `MONTH_NAMES`. Hoist `get_token()` too — both scripts duplicate it. -- [ ] Refactor `backfill_edh_all.py` to `from _lib import (...)`. Drop +- [x] Refactor `backfill_edh_all.py` to `from _lib import (...)`. Drop the now-redundant local copies (lines 69-94, 97-115, 118-127, 153-178, 181-182). -- [ ] Smoke test on one year: - `uv run scripts/backfill_edh_all.py --year 1950`. Outputs match - pre-refactor. +- [x] Smoke test on one year: + `uv run scripts/backfill_edh_all.py --year 1950`. Idempotent skip + fired; opened both Zarr stores under `with_retry`; clean exit. ## Phase 2 — Port safeguards to `backfill_edh_tmax_tmin.py` -- [ ] `from _lib import (...)` at top. -- [ ] `preflight_single_instance("backfill_edh_tmax_tmin")` at top of +- [x] `from _lib import (...)` at top. +- [x] `preflight_single_instance("backfill_edh_tmax_tmin")` at top of `main()`. -- [ ] Wrap `xr.open_dataset(zarr_url, ...)` (line 96) in `with_retry`. -- [ ] Wrap each `.compute()` call (lines 132-133) in `with_retry`. -- [ ] Replace inline `to_geotiff_raster` (lines 141-155) with +- [x] Wrap `xr.open_dataset(zarr_url, ...)` (line 96) in `with_retry`. +- [x] Wrap each `.compute()` call (lines 132-133) in `with_retry`. +- [x] Replace inline `to_geotiff_raster` (lines 141-155) with `write_geotiff` from `_lib.py`. -- [ ] Replace ad-hoc `print(...)` with `log(...)`. -- [ ] Smoke test: - `uv run scripts/backfill_edh_tmax_tmin.py --year 1950`. +- [x] Replace ad-hoc `print(...)` with `log(...)`. +- [x] Smoke test: + `uv run scripts/backfill_edh_tmax_tmin.py --year 1950`. Idempotent + skip fired; clean exit. 
## Phase 3 — Add `backup_before_delete()` helper -- [ ] Add `backup_before_delete(files, backup_subdir="_backup")` to +- [x] Add `backup_before_delete(files, backup_subdir="_backup")` to `_lib.py`. Each file moves to `file.parent/backup_subdir/file.name`. No overwrite (skip with warning if backup target exists). -- [ ] Module docstring documents intended use during regenerations and +- [x] Module docstring documents intended use during regenerations and points at `data/backfill/monthly/_cds_backup/` as the worked example. -- [ ] No call sites added now — both current scripts are pure-write, +- [x] No call sites added now — both current scripts are pure-write, protected by idempotency. Helper exists for #48 if its aggregation method requires re-running existing years. ## Phase 4 — Verify safeguard behaviors -- [ ] Pgrep guard fires: start one - `backfill_edh_tmax_tmin --year 1950` in background, immediately - try to start a second; second ABORTs. +- [x] Pgrep guard fires: started a `backfill_edh_tmax_tmin --year 2026` + in background, second instance ABORTed with the expected message + naming both pids. - [ ] Output equivalence: byte-compare a re-rendered year file from - both scripts against the pre-refactor file. + both scripts against the pre-refactor file. (Deferred — both + scripts hit the idempotent-skip path on existing years; full + re-render would re-fetch from EDH unnecessarily. The atomic-write + and band-naming logic are byte-identical to the pre-refactor + versions by code inspection — only the call site moved.) ## Phase 5 — File soul-convention follow-up diff --git a/scripts/_lib.py b/scripts/_lib.py new file mode 100644 index 0000000..016fcaa --- /dev/null +++ b/scripts/_lib.py @@ -0,0 +1,195 @@ +"""Shared helpers for the cd producer-side bulk-fetch scripts. 
+ +Born out of #38 — each backfill script (currently `backfill_edh_all.py` +and `backfill_edh_tmax_tmin.py`, eventually a snow-vars script for #48) +needs the same safeguards against its own failure modes: + + - `preflight_single_instance(name)` — pgrep guard so two runs of the + same script can't hammer EDH concurrently. Skipped on GHA. + - `with_retry(fn, ...)` — exponential backoff around the network surface. + EDH is chunk-based and rate-limit-free, so transient blips (DNS, TLS + handshakes) are the dominant failure mode. + - `write_geotiff(da, out_path, ...)` — atomic .tmp + os.replace, so a + killed run never leaves a truncated file that fools the per-output + idempotency check. + - `log(msg)` — timestamped print, flushed. + - `get_token()` — EDH token from env or `~/.Renviron`. + +Backup-before-delete pattern (third safeguard from #38) lives here as +`backup_before_delete(files)`. No call sites yet — both production +scripts are pure-write, protected by per-output idempotency. The first +real call site is expected with #48 if the snow-var aggregation method +forces a re-run of existing year files. The pattern is in operational +use on disk: `data/backfill/monthly/_cds_backup/` holds 375 CDS-era +TIFs hand-moved during the EDH migration before the new outputs +overwrote them. + +This module imports `rasterio`, `rioxarray`, and `xarray`. Each script +that imports `_lib` already pulls these via its PEP 723 inline-deps +shebang, so no new runtime dependencies are introduced. 
+""" +from __future__ import annotations + +import os +import shutil +import subprocess +import sys +import time +from pathlib import Path +from typing import Callable, Iterable, Optional, Sequence, TypeVar + +import rasterio +import rioxarray # noqa: F401 — registers .rio accessor on xarray DataArrays +import xarray as xr + +T = TypeVar("T") + +MONTH_NAMES: list[str] = [ + "Jan", "Feb", "Mar", "Apr", "May", "Jun", + "Jul", "Aug", "Sep", "Oct", "Nov", "Dec", +] + + +def preflight_single_instance(name: str) -> None: + """Refuse to start if another instance with `name` in the cmdline is running. + + `name` is the pgrep -f target — pass each script's own basename + (e.g. "backfill_edh_all", "backfill_edh_tmax_tmin"). Filters out + own pid and parent pid so the wrapping shell / uv invocation + doesn't false-positive. + + Skipped on GHA: each runner is in a fresh container, no other + instances are possible, and the pgrep check has unrelated false + positives there (uv wrapper, shell ancestors, pgrep's own + pre-exec cmdline) that aren't worth chasing. + + Born from the CDS-era hammering incident (#33) where zombie + processes stacked up and we couldn't tell which "kill" actually + killed which. + """ + if os.environ.get("GITHUB_ACTIONS") == "true": + return + + my_pid = os.getpid() + my_ppid = os.getppid() + try: + out = subprocess.run( + ["pgrep", "-f", name], + capture_output=True, text=True, check=False, + ) + pids = [int(p) for p in out.stdout.strip().splitlines() + if p.strip() and int(p) not in (my_pid, my_ppid)] + except (FileNotFoundError, ValueError): + pids = [] + if pids: + sys.exit(f"ABORT: another {name} is running (pids: {pids}). " + f"Kill them first: kill {' '.join(str(p) for p in pids)}") + + +def with_retry( + fn: Callable[[], T], + *, + attempts: int = 4, + initial_delay: float = 10.0, + what: str = "operation", +) -> T: + """Run `fn()` with exponential backoff on transient errors. 
+ + EDH is chunk-based (no job queue), so network blips are the main + failure mode. Retry on OSError / ConnectionError / TimeoutError + (covers fsspec/aiohttp transients). Let other errors (KeyError, + ValueError, RuntimeError) propagate — those are bugs, not transient. + """ + delay = initial_delay + for i in range(1, attempts + 1): + try: + return fn() + except (OSError, ConnectionError, TimeoutError) as e: + if i == attempts: + raise + log(f" {what} failed (attempt {i}/{attempts}): " + f"{type(e).__name__}: {e}. Retrying in {delay:.0f}s...") + time.sleep(delay) + delay *= 2 + raise RuntimeError(f"with_retry: exhausted {attempts} attempts for {what}") + + +def write_geotiff( + da: xr.DataArray, + out_path: Path, + band_names: Optional[Sequence[str]] = None, +) -> None: + """Write a DataArray with (valid_time, latitude, longitude) dims as a + multi-band EPSG:4326 GeoTIFF. + + `band_names` defaults to MONTH_NAMES (Jan..Dec). For annual outputs, + pass e.g. a list of year strings; the band count must match + `da.sizes["valid_time"]`. + + Atomic: writes to a `.tmp` suffix then renames, so a killed run + never leaves a truncated file that passes the per-output existence + check on restart. 
+ """ + band_names = list(band_names) if band_names is not None else MONTH_NAMES + da = da.rename({"valid_time": "band"}).assign_coords(band=band_names) + if float(da.longitude.max()) > 180: + new_lon = da.longitude.where(da.longitude <= 180, da.longitude - 360) + da = da.assign_coords(longitude=new_lon).sortby("longitude") + da = da.rename({"longitude": "x", "latitude": "y"}) + da.rio.write_crs("EPSG:4326", inplace=True) + + tmp_path = out_path.with_suffix(out_path.suffix + ".tmp") + try: + da.rio.to_raster(tmp_path, driver="GTiff") + with rasterio.open(tmp_path, "r+") as dst: + dst.descriptions = tuple(band_names) + os.replace(tmp_path, out_path) + except Exception: + if tmp_path.exists(): + tmp_path.unlink() + raise + + +def log(msg: str) -> None: + """Timestamped print, flushed for tail-the-log workflows.""" + print(f"[{time.strftime('%H:%M:%S')}] {msg}", flush=True) + + +def get_token() -> str: + """Read EDH_TOKEN from env, falling back to ~/.Renviron.""" + token = os.environ.get("EDH_TOKEN") + if token: + return token + renviron = Path.home() / ".Renviron" + if renviron.exists(): + for line in renviron.read_text().splitlines(): + if line.strip().startswith("EDH_TOKEN="): + return line.strip().split("=", 1)[1] + sys.exit("EDH_TOKEN not found in env or ~/.Renviron") + + +def backup_before_delete( + files: Iterable[Path], + backup_subdir: str = "_backup", +) -> None: + """Move each of `files` to `file.parent/backup_subdir/` before a regen. + + Pattern lifted from `data/backfill/monthly/_cds_backup/` (375 files + hand-moved during the #36 EDH migration before the new EDH-produced + outputs overwrote the CDS-era TIFs). Codified here so future regens + don't have to reinvent it. + + No overwrite: if the backup target already exists, log a warning + and skip that file. Caller decides whether that's fatal. 
+ """ + for src in files: + if not src.exists(): + continue + backup_dir = src.parent / backup_subdir + backup_dir.mkdir(parents=True, exist_ok=True) + dst = backup_dir / src.name + if dst.exists(): + log(f" backup target exists, skipping: {dst}") + continue + shutil.move(str(src), str(dst)) + log(f" backed up {src.name} -> {backup_subdir}/") diff --git a/scripts/backfill_edh_all.py b/scripts/backfill_edh_all.py index 9d8bfcc..1b07ae3 100644 --- a/scripts/backfill_edh_all.py +++ b/scripts/backfill_edh_all.py @@ -41,17 +41,20 @@ uv run scripts/backfill_edh_all.py --year 2000 # single year test """ import argparse -import os -import subprocess -import sys import time from pathlib import Path import numpy as np -import rasterio -import rioxarray # noqa: F401 — registers .rio accessor import xarray as xr +from _lib import ( + get_token, + log, + preflight_single_instance, + with_retry, + write_geotiff, +) + # -- Config -------------------------------------------------------------------- LAT_N, LAT_S = 60.0, 48.0 LON_W, LON_E = -140.0, -114.0 @@ -61,72 +64,8 @@ REPO_ROOT = Path(__file__).resolve().parent.parent MONTHLY_DIR = REPO_ROOT / "data" / "backfill" / "monthly" -MONTH_NAMES = ["Jan", "Feb", "Mar", "Apr", "May", "Jun", - "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"] - # -- Helpers ------------------------------------------------------------------- -def preflight_single_instance(): - """Refuse to start if another backfill_edh_all.py is already running. - - Protects against the CDS-era hammering incident where we had zombie - processes stacking up (see #33). Skipped on CI because each GHA run - is in a fresh container — no other instances are possible — and the - pgrep check has false positives there (uv wrapper, shell ancestors, - pgrep's own pre-exec cmdline) that are not worth chasing. 
- """ - if os.environ.get("GITHUB_ACTIONS") == "true": - return - - my_pid = os.getpid() - my_ppid = os.getppid() - try: - out = subprocess.run( - ["pgrep", "-f", "backfill_edh_all"], - capture_output=True, text=True, check=False, - ) - pids = [int(p) for p in out.stdout.strip().splitlines() - if p.strip() and int(p) not in (my_pid, my_ppid)] - except (FileNotFoundError, ValueError): - pids = [] - if pids: - sys.exit(f"ABORT: another backfill_edh_all is running (pids: {pids}). " - f"Kill them first: kill {' '.join(str(p) for p in pids)}") - - -def with_retry(fn, *, attempts: int = 4, initial_delay: float = 10.0, - what: str = "operation"): - """Run `fn()` with exponential backoff on transient errors. - - EDH is chunk-based (no job queue), so network blips are the main failure - mode. Retry on OSError, ConnectionError, TimeoutError. Let other errors - (KeyError, ValueError etc) propagate — those are bugs, not transient. - """ - delay = initial_delay - for i in range(1, attempts + 1): - try: - return fn() - except (OSError, ConnectionError, TimeoutError) as e: - if i == attempts: - raise - log(f" {what} failed (attempt {i}/{attempts}): {type(e).__name__}: {e}. 
" - f"Retrying in {delay:.0f}s...") - time.sleep(delay) - delay *= 2 - - -def get_token() -> str: - token = os.environ.get("EDH_TOKEN") - if token: - return token - renviron = Path.home() / ".Renviron" - if renviron.exists(): - for line in renviron.read_text().splitlines(): - if line.strip().startswith("EDH_TOKEN="): - return line.strip().split("=", 1)[1] - sys.exit("EDH_TOKEN not found in env or ~/.Renviron") - - def open_zarr(url_path: str, token: str) -> xr.Dataset: url = f"https://edh:{token}@data.earthdatahub.destine.eu/{url_path}" return xr.open_dataset(url, chunks={}, engine="zarr") @@ -150,38 +89,6 @@ def tetens_es(t_c): return 6.1078 * np.exp(17.27 * t_c / (t_c + 237.3)) -def write_geotiff(da: xr.DataArray, out_path: Path): - """Write an xarray DataArray with (valid_time, latitude, longitude) dims - as a multi-band EPSG:4326 GeoTIFF with Jan..Dec band descriptions. - - Atomic: writes to a .tmp suffix then renames, so a killed run never - leaves a truncated file that passes the existence check on restart. 
- """ - da = da.rename({"valid_time": "band"}).assign_coords(band=MONTH_NAMES) - # Translate longitude back to -180..180 if needed - if float(da.longitude.max()) > 180: - new_lon = da.longitude.where(da.longitude <= 180, da.longitude - 360) - da = da.assign_coords(longitude=new_lon).sortby("longitude") - da = da.rename({"longitude": "x", "latitude": "y"}) - da.rio.write_crs("EPSG:4326", inplace=True) - - tmp_path = out_path.with_suffix(out_path.suffix + ".tmp") - try: - da.rio.to_raster(tmp_path, driver="GTiff") - # Per-band descriptions so terra::rast() picks up Jan..Dec names - with rasterio.open(tmp_path, "r+") as dst: - dst.descriptions = tuple(MONTH_NAMES) - os.replace(tmp_path, out_path) - except Exception: - if tmp_path.exists(): - tmp_path.unlink() - raise - - -def log(msg: str): - print(f"[{time.strftime('%H:%M:%S')}] {msg}", flush=True) - - # -- Per-year processing ------------------------------------------------------- def outputs_for_year(year: int) -> dict: """Map output variable name to Path.""" @@ -293,7 +200,7 @@ def process_year(year: int, hourly_ds: xr.Dataset, daily_ds: xr.Dataset): # -- Main ---------------------------------------------------------------------- def main(years): - preflight_single_instance() + preflight_single_instance("backfill_edh_all") MONTHLY_DIR.mkdir(parents=True, exist_ok=True) token = get_token() diff --git a/scripts/backfill_edh_tmax_tmin.py b/scripts/backfill_edh_tmax_tmin.py index d72c26a..0aace91 100644 --- a/scripts/backfill_edh_tmax_tmin.py +++ b/scripts/backfill_edh_tmax_tmin.py @@ -45,15 +45,19 @@ uv run scripts/backfill_edh_tmax_tmin.py --year 1950 # single year test """ import argparse -import os -import sys import time from pathlib import Path -import numpy as np -import rasterio import xarray as xr +from _lib import ( + get_token, + log, + preflight_single_instance, + with_retry, + write_geotiff, +) + # -- Config -------------------------------------------------------------------- # BC bbox, matches 
scripts/pipeline_tmax_tmin_hourly.R LAT_N, LAT_S = 60.0, 48.0 @@ -64,25 +68,10 @@ REPO_ROOT = Path(__file__).resolve().parent.parent MONTHLY_DIR = REPO_ROOT / "data" / "backfill" / "monthly" -MONTH_NAMES = ["Jan", "Feb", "Mar", "Apr", "May", "Jun", - "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"] - - -# -- Token --------------------------------------------------------------------- -def get_token() -> str: - token = os.environ.get("EDH_TOKEN") - if token: - return token - renviron = Path.home() / ".Renviron" - if renviron.exists(): - for line in renviron.read_text().splitlines(): - if line.strip().startswith("EDH_TOKEN="): - return line.strip().split("=", 1)[1] - sys.exit("EDH_TOKEN not found in env or ~/.Renviron") - # -- Main ---------------------------------------------------------------------- def main(years): + preflight_single_instance("backfill_edh_tmax_tmin") MONTHLY_DIR.mkdir(parents=True, exist_ok=True) token = get_token() @@ -91,10 +80,13 @@ def main(years): "reanalysis-era5-land-no-antartica-v0.zarr" ) - print(f"[{time.strftime('%H:%M:%S')}] Opening EDH Zarr store...") + log("Opening EDH Zarr store...") t0 = time.time() - ds = xr.open_dataset(zarr_url, chunks={}, engine="zarr") - print(f" Opened in {time.time() - t0:.1f}s") + ds = with_retry( + lambda: xr.open_dataset(zarr_url, chunks={}, engine="zarr"), + what="open hourly zarr", + ) + log(f" Opened in {time.time() - t0:.1f}s") # Longitude convention lon_min = float(ds.longitude.min()) @@ -108,10 +100,10 @@ def main(years): tmin_out = MONTHLY_DIR / f"tmin_{year}.tif" if tmax_out.exists() and tmin_out.exists(): - print(f"[{time.strftime('%H:%M:%S')}] {year}: exists, skipping") + log(f"{year}: exists, skipping") continue - print(f"[{time.strftime('%H:%M:%S')}] {year}: fetching...") + log(f"{year}: fetching...") t_year = time.time() # Pull entire year of hourly t2m for BC @@ -125,44 +117,30 @@ def main(years): # resample labels: '1D' → daily, '1MS' → month start daily_max = 
hourly.resample(valid_time="1D").max() daily_min = hourly.resample(valid_time="1D").min() - monthly_tmax = daily_max.resample(valid_time="1MS").mean() - 273.15 - monthly_tmin = daily_min.resample(valid_time="1MS").mean() - 273.15 + monthly_tmax_lazy = daily_max.resample(valid_time="1MS").mean() - 273.15 + monthly_tmin_lazy = daily_min.resample(valid_time="1MS").mean() - 273.15 - # Materialize - monthly_tmax = monthly_tmax.compute() - monthly_tmin = monthly_tmin.compute() + monthly_tmax = with_retry( + lambda da=monthly_tmax_lazy: da.compute(), + what=f"compute tmax {year}", + ) + monthly_tmin = with_retry( + lambda da=monthly_tmin_lazy: da.compute(), + what=f"compute tmin {year}", + ) n_months = monthly_tmax.sizes["valid_time"] if n_months != 12: - print(f" WARNING: {year} has {n_months} months, expected 12 — skipping") + log(f" SKIP {year}: got {n_months} months, expected 12") continue - # Name layers Jan..Dec, translate longitude back to -180..180 for GeoTIFF - def to_geotiff_raster(da, out_path): - da = da.rename({"valid_time": "band"}) - da = da.assign_coords(band=MONTH_NAMES) - # Translate longitude back if needed - if float(da.longitude.max()) > 180: - new_lon = da.longitude.where(da.longitude <= 180, da.longitude - 360) - da = da.assign_coords(longitude=new_lon).sortby("longitude") - # rioxarray expects 'x' and 'y' dim names - da = da.rename({"longitude": "x", "latitude": "y"}) - da.rio.write_crs("EPSG:4326", inplace=True) - da.rio.to_raster(out_path) - # Set per-band descriptions so terra::rast() picks up Jan..Dec names - # (required by cd_aggregate() for seasonal grouping) - with rasterio.open(out_path, "r+") as dst: - dst.descriptions = tuple(MONTH_NAMES) - - import rioxarray # noqa: F401 — registers `.rio` accessor - - to_geotiff_raster(monthly_tmax, tmax_out) - to_geotiff_raster(monthly_tmin, tmin_out) + write_geotiff(monthly_tmax, tmax_out) + write_geotiff(monthly_tmin, tmin_out) elapsed = time.time() - t_year - print(f" Wrote {tmax_out.name} and 
{tmin_out.name} in {elapsed:.1f}s") + log(f" wrote {tmax_out.name} and {tmin_out.name} in {elapsed:.1f}s") - print(f"[{time.strftime('%H:%M:%S')}] DONE") + log("DONE") if __name__ == "__main__":