diff --git a/CLAUDE.md b/CLAUDE.md index eaf85b9..5e69da4 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -15,7 +15,7 @@ Infrastructure stack stabilized; methodology research is the active focus. Recen - **v0.27.0** (#114, #45) — `cfg$pipeline$gradient_classes` config knob + per-species filter derivation in `prep_minimal`. Bit-identical bcfp parity by default; lets bundles override the break vector. - **v0.28.0** (#119, #45-followup) — Orphan-class break source: classes below all species' access thresholds enter `gradient_barriers_minimal` as a shared `barriers_orphan` table (segmentation only, no access semantics). New `default_extrabreaks` bundle. Province-wide methodology delta (231 vs 225 WSGs): SK spawn +6.7%, BT/RB/ST/WCT/GR spawn +1–2%, CH/ST rear -11%, BT/CO/GR rear -3 to -7%. "Ceiling sub-segment" mechanism — flat parts of mixed reaches now pass spawn; steep pockets previously folded into rear via averaging exceed rear ceiling as standalone segments. -- **v0.29.0** (#120, #118) — DB hygiene: `cleanup_working = TRUE` in `compare_bcfishpass_wsg` drops working schemas after rollup; `keep_source = FALSE` in `consolidate_schema` drops source schemas after pg_restore. Prevents the disk-full incident that crashed cypher 2026-05-04. +- **v0.29.0** (#120, #118) — DB hygiene: `cleanup_working = TRUE` in `compare_bcfishpass_wsg` drops working schemas after rollup; `keep_source = FALSE` in `schema_consolidate` drops source schemas after pg_restore. Prevents the disk-full incident that crashed cypher 2026-05-04. Cypher recovered (volumes nuked); needs fwapg reload before next provincial run. Methodology research is active: `default_extrabreaks` proved the mechanism; next variants include spawn-only / rear-only break vectors (each adds ~4 new classes; per-WSG cost ~1.3–1.4× vs default) and the channel-class breaks research (#52, blocked on fresh-side helper). diff --git a/DESCRIPTION b/DESCRIPTION index eb9209c..dfb7cd8 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,6 +1,6 @@ Package: link Title: Stream Network Habitat Interpretation (Experimental) -Version: 0.37.0 +Version: 0.38.0 Date: 2026-05-14 Authors@R: c( person("Allan", "Irvine", , "airvine@newgraphenvironment.com", diff --git a/NEWS.md b/NEWS.md index c80135d..23e957f 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,3 +1,28 @@ +# link 0.38.0 + +Provincial-run autonomy CLI + 8 operational-script renames to noun_verb convention. Closes [#172](https://github.com/NewGraphEnvironment/link/issues/172). Builds on v0.37.0's #168 decouple — with PG-state resume in place, the autonomy surface stays thin and the renames stay mechanical. + +- **Single-command autonomous run.** `wsgs_run_pipeline.sh` (was `province_run.sh`) accepts `--wsgs=A,B,C`, `--config=`, `--schema=`, `--no-cyphers`, `--force`, forwards to `wsgs_dispatch.sh` (was `trifecta_provincial.sh`) which intersects the WSG subset in its LPT split. M4+M1-only baseline validated end-to-end: 16-WSG default-bundle dispatch lands 16/16 in `fresh_default.streams` on M4, ~20 min wall, no operator prompts. +- **Step 0 pre-clean.** When `--schema=` is set, umbrella fires `state_clean.sh --schemas=` on every host before Step 1. Drops only the target schema (skips the canonical-fresh heuristic + snapshot reload). Eliminates a class of consolidate failures where stale leftover WSGs on a source host collided with destination data during pg_restore. +- **Scoped `state_clean.sh` (was `province_clean.sh`).** New `--schemas=A,B,C` mode drops only the listed schemas. Empty `--schemas=` rejected loud to prevent dynamic-arg silent fall-through to the destructive default mode. +- **Phantom-cy + error-surface fixes in `wsgs_dispatch.sh`.** R's `paste0("cy", integer(0))` returns `"cy"` length-1 (constant recycling) — would put a non-existent cypher in the host plan under `--no-cyphers`. Three-branched `cy_host_keys`. Empty `CY_WORKSPACES` init via explicit `CY_WS_ARR=()` (was `read -r -a` yielding single-empty-element). `SPLIT_OUT=$(Rscript ...)` wrapped with explicit `||` block so R-side `stop()` messages reach the operator (e.g. `--wsgs=BOGUS` surfaces the R error verbatim instead of silent abort). +- **8 rename mapping (`git mv` preserves `git log --follow`).** Names now describe scope honestly — these scripts work for any list of WSGs / any host count / any reference: + +| Old | New | +|---|---| +| `data-raw/province_run.sh` | `data-raw/wsgs_run_pipeline.sh` | +| `data-raw/province_clean.sh` | `data-raw/state_clean.sh` | +| `data-raw/province_progress.sh` | `data-raw/progress_check.sh` | +| `data-raw/trifecta_provincial.sh` | `data-raw/wsgs_dispatch.sh` | +| `data-raw/run_provincial_parity.R` | `data-raw/wsgs_run_host.R` | +| `data-raw/consolidate_schema.R` | `data-raw/schema_consolidate.R` | +| `data-raw/archive_provincial_runs.sh` | `data-raw/runs_archive.sh` | +| `data-raw/balance_provincial_buckets.R` | `data-raw/buckets_balance.R` | + +The `wsg_*` (singular, per-WSG functions from #168) vs `wsgs_*` (plural, collection-level orchestrators) distinction is now load-bearing in the naming. `compare_bcfishpass_wsg.R → wsg_compare.R` was renamed in #168. + +Filed-but-not-closed follow-ups: cypher integration testing (issue #172 Phase 2 + 3 acceptance — defer until M4+M1 baseline lands repeatably); LPT-fallback empty-bucket edge case when N_WSGs ≤ N_hosts without timing CSVs (pre-existing, not a #172 regression). + # link 0.37.0 Decouple bcfp comparison from the modelling pipeline. Closes [#168](https://github.com/NewGraphEnvironment/link/issues/168). The link package's deliverable — the per-WSG model in `.streams` + per-species habitat + barriers — now runs and is observable independently of any comparison framework. Comparison vs bcfishpass (or any future reference) is a diagnostic overlay that reads the persisted state and never gates whether the model itself ran. diff --git a/R/utils.R b/R/utils.R index 512c24d..82c726e 100644 --- a/R/utils.R +++ b/R/utils.R @@ -193,7 +193,7 @@ #' Probes `.streams` for `watershed_group_code = aoi`. #' Returns `TRUE` when the table exists and has at least one row for #' the WSG; `FALSE` when the table is absent OR has no rows for the -#' WSG. Used by the orchestrator loop (run_provincial_parity.R) as the +#' WSG. Used by the orchestrator loop (wsgs_run_host.R) as the #' canonical resume check — PG state is authoritative, RDS files are #' diagnostic side-artifacts. #' diff --git a/data-raw/README.md b/data-raw/README.md index 1d5bce1..b36c4dc 100644 --- a/data-raw/README.md +++ b/data-raw/README.md @@ -76,10 +76,10 @@ The dispatch hierarchy: trifecta → run_provincial → compare_wsg. | Script | Calls | Purpose | |--------|-------|---------| -| `trifecta_provincial.sh` | `run_provincial_parity.R` (×N hosts) | M4 + M1 + N-cypher orchestrator. Inline LPT bucket allocation (reads `_per_wsg_times.csv` from prior runs, computes balanced split using `--host-speeds=`), pre-flight version check across all hosts, parallel dispatch, RDS pull-back, post-pull `lnk_parity_annotate` against the divergence taxonomy. See "Provincial dispatch" section below for full flag reference + gotchas. | +| `wsgs_dispatch.sh` | `wsgs_run_host.R` (×N hosts) | M4 + M1 + N-cypher orchestrator. Inline LPT bucket allocation (reads `_per_wsg_times.csv` from prior runs, computes balanced split using `--host-speeds=`), pre-flight version check across all hosts, parallel dispatch, RDS pull-back, post-pull `lnk_parity_annotate` against the divergence taxonomy. See "Provincial dispatch" section below for full flag reference + gotchas. | | `trifecta_15wsg.sh` | same | 15-WSG smoke variant (legacy 3-host, hardcoded WSG list). | -| `trifecta_smoke.sh` | `trifecta_provincial.sh` | N-host smoke shim: one small WSG per host, ~3 min wall. See `Provincial dispatch` section. | -| `run_provincial_parity.R` | `compare_bcfishpass_wsg.R` per WSG | Single-host provincial dispatcher. Loops every WSG in `wsg_species_presence`, saves per-WSG RDS, emits per-WSG times CSV. After the loop, optionally annotates the host's bucket against `research/bcfp_divergence_taxonomy.yml` (writes `__annotated.csv`). Accepts `--wsgs=`, `--config=`, `--schema=`, `--rds-dir=`, `--with-mapping-code`. | +| `trifecta_smoke.sh` | `wsgs_dispatch.sh` | N-host smoke shim: one small WSG per host, ~3 min wall. See `Provincial dispatch` section. | +| `wsgs_run_host.R` | `compare_bcfishpass_wsg.R` per WSG | Single-host provincial dispatcher. Loops every WSG in `wsg_species_presence`, saves per-WSG RDS, emits per-WSG times CSV. After the loop, optionally annotates the host's bucket against `research/bcfp_divergence_taxonomy.yml` (writes `__annotated.csv`). Accepts `--wsgs=`, `--config=`, `--schema=`, `--rds-dir=`, `--with-mapping-code`. | | `compare_bcfishpass_wsg.R` | `lnk_pipeline_*` family | Single-WSG end-to-end runner. Sources both connections (local fwapg + bcfp tunnel), runs the 6-phase pipeline, persists, emits comparison rollup tibble (link vs bcfp). The atomic unit of work in every multi-WSG run above. | ## Pipeline support @@ -88,14 +88,14 @@ Run-adjacent helpers (planning, consolidation across hosts). | Script | Purpose | |--------|---------| -| `balance_provincial_buckets.R` | Standalone LPT planner for the 3-host case. Reads per-host wall times from prior runs and prints buckets ready to paste into `trifecta_provincial.sh --m4-bucket=…`. **Superseded for the N-host orchestrator** — `trifecta_provincial.sh` now computes the LPT plan inline at dispatch time using the same algorithm. Kept here for one-off planning + cross-checks. Dedups `(wsg, host)` and across hosts before LPT so multi-run CSV accumulation doesn't double-assign WSGs. | -| `consolidate_schema.R` | pg_dump from M1 + cypher → scp to M4 → pg_restore --data-only. Bucket-aware destination cleanup (DELETEs each source host's WSG bucket from destination tables before restore — avoids duplicate-key violations on re-consolidation). `ok = TRUE` requires pg_restore rc=0 AND post-restore row count > 0; rc=0 with empty schema flags as failure. | -| `archive_provincial_runs.sh` | Moves the current top-level `_per_wsg_times.csv` + `*.rds` + `*_annotated.csv` artifacts in `provincial_/` to `archive//`. Operator cadence: run between provincial runs when you want the LPT planner to use the most recent run only. Skip to median-over multiple recent runs. | -| `trifecta_smoke.sh` | Thin shim over `trifecta_provincial.sh` — one small WSG per host (m4→DEAD, m1→ELKR, cyN→ADMS/BABL/BULL). ~3 min wall. Exercises every orchestrator code path (preflight, dispatch, tunnel, RDS pull-back, annotation) before committing to a 200-WSG run. All flags pass through (e.g. `--cy-workspaces=`, `--with-mapping-code`). | +| `buckets_balance.R` | Standalone LPT planner for the 3-host case. Reads per-host wall times from prior runs and prints buckets ready to paste into `wsgs_dispatch.sh --m4-bucket=…`. **Superseded for the N-host orchestrator** — `wsgs_dispatch.sh` now computes the LPT plan inline at dispatch time using the same algorithm. Kept here for one-off planning + cross-checks. Dedups `(wsg, host)` and across hosts before LPT so multi-run CSV accumulation doesn't double-assign WSGs. | +| `schema_consolidate.R` | pg_dump from M1 + cypher → scp to M4 → pg_restore --data-only. Bucket-aware destination cleanup (DELETEs each source host's WSG bucket from destination tables before restore — avoids duplicate-key violations on re-consolidation). `ok = TRUE` requires pg_restore rc=0 AND post-restore row count > 0; rc=0 with empty schema flags as failure. | +| `runs_archive.sh` | Moves the current top-level `_per_wsg_times.csv` + `*.rds` + `*_annotated.csv` artifacts in `provincial_/` to `archive//`. Operator cadence: run between provincial runs when you want the LPT planner to use the most recent run only. Skip to median-over multiple recent runs. | +| `trifecta_smoke.sh` | Thin shim over `wsgs_dispatch.sh` — one small WSG per host (m4→DEAD, m1→ELKR, cyN→ADMS/BABL/BULL). ~3 min wall. Exercises every orchestrator code path (preflight, dispatch, tunnel, RDS pull-back, annotation) before committing to a 200-WSG run. All flags pass through (e.g. `--cy-workspaces=`, `--with-mapping-code`). | -## Provincial dispatch (`trifecta_provincial.sh`) +## Provincial dispatch (`wsgs_dispatch.sh`) -The flagship orchestrator. Dispatches `run_provincial_parity.R` across +The flagship orchestrator. Dispatches `wsgs_run_host.R` across M4 + M1 + N cyphers in parallel, pulls RDS files back, and emits a province-wide annotated CSV. @@ -106,7 +106,7 @@ cd ~/Projects/repo/link/data-raw # Optional: archive prior run's CSVs first if you want LPT to plan # against this run only (not median-of-recent-runs): -./archive_provincial_runs.sh +./runs_archive.sh # Smoke-test first (~3 min, one small WSG per host) — catches preflight, # tunnel, dispatch, and annotation surprises before the full run: @@ -114,14 +114,14 @@ cd ~/Projects/repo/link/data-raw ./trifecta_smoke.sh --cy-workspaces=job1,job2,job3 # 5-host smoke # Full run: -./trifecta_provincial.sh # 3-host default -./trifecta_provincial.sh --cy-workspaces=job1,job2,job3 # 5-host +./wsgs_dispatch.sh # 3-host default +./wsgs_dispatch.sh --cy-workspaces=job1,job2,job3 # 5-host # Add per-segment mapping_code lens (+50% cost): -./trifecta_provincial.sh --cy-workspaces=job1,job2,job3 --with-mapping-code +./wsgs_dispatch.sh --cy-workspaces=job1,job2,job3 --with-mapping-code # Custom host-speed factors (lower = faster): -./trifecta_provincial.sh --host-speeds=m4=1.0,m1=0.83,cy=1.83 +./wsgs_dispatch.sh --host-speeds=m4=1.0,m1=0.83,cy=1.83 ``` **Recommended cadence:** archive → smoke → full run. The smoke catches @@ -240,7 +240,7 @@ Research scratchpad and one-off verification scripts. | Script | Purpose | |--------|---------| -| `_targets.R` | The (legacy) targets pipeline that pre-dated `trifecta_provincial.sh`. Kept for the multi-WSG comparison harness. | +| `_targets.R` | The (legacy) targets pipeline that pre-dated `wsgs_dispatch.sh`. Kept for the multi-WSG comparison harness. | | `exp_gradient_extra_breaks.R` | Experimental script that prototyped the orphan-class break source via in-line `frs_break_apply` before it was absorbed into `lnk_pipeline_prep_minimal()` (link v0.28.0). Kept as the smoke-test reference. | | `rule_flexibility_demo.R` / `rule_flexibility_render.R` | Demonstration of rules.yaml format flexibility, rendered as RMarkdown. | | `regress_dams_isolation.R` | One-off regression test for the dams-isolation work (link #109). | @@ -266,9 +266,9 @@ discoverable without opening the file: Run artifacts land in subdirectories of `data-raw/logs/` keyed by topic: -- `provincial_parity/`, `provincial_default/`, `provincial_default_extrabreaks/` — per-WSG RDS + per-WSG times CSV from `run_provincial_parity.R`. +- `provincial_parity/`, `provincial_default/`, `provincial_default_extrabreaks/` — per-WSG RDS + per-WSG times CSV from `wsgs_run_host.R`. - `methodology_delta/` — schema-vs-schema delta RDS from `methodology_delta_query.R`. -- `dumps_/` — pg_dump custom-format files from `consolidate_schema.R`. +- `dumps_/` — pg_dump custom-format files from `schema_consolidate.R`. - `_*.txt` — orchestrator + per-host run logs. Reusable helper scripts read from these directories without hardcoded @@ -290,7 +290,7 @@ single bundle's persistent schema. Rough footprints on a 232-WSG run: The 2026-05-04 cypher disk-full incident filled a 96 GB droplet with 3 accumulated bundles + 60 working schemas at once. After the v0.29.0 hygiene fixes (`compare_bcfishpass_wsg(cleanup_working = TRUE)` drops -working schemas on completion; `consolidate_schema(keep_source = FALSE)` +working schemas on completion; `schema_consolidate(keep_source = FALSE)` drops source persistent schema after successful pg_restore), a single-bundle-in-flight worker holds ~60 GB total — comfortable on the existing 96 GB cypher tier. diff --git a/data-raw/balance_provincial_buckets.R b/data-raw/buckets_balance.R similarity index 95% rename from data-raw/balance_provincial_buckets.R rename to data-raw/buckets_balance.R index 63dbe6c..cabfcd6 100644 --- a/data-raw/balance_provincial_buckets.R +++ b/data-raw/buckets_balance.R @@ -9,7 +9,7 @@ # cypher caught up. This script projects ~10-15 min savings per run. # # Usage: -# Rscript data-raw/balance_provincial_buckets.R +# Rscript data-raw/buckets_balance.R # # Hardcoded inputs (override above when re-running with new baseline): # - Yesterday's host log paths (one per host) @@ -23,16 +23,16 @@ suppressPackageStartupMessages({}) logs_dir <- "/Users/airvine/Projects/repo/link/data-raw/logs" -# Prefer per-WSG CSVs (emitted by run_provincial_parity.R after this script +# Prefer per-WSG CSVs (emitted by wsgs_run_host.R after this script # was added). Fall back to text-log regex parsing for older runs. csvs <- list.files(file.path(logs_dir, "provincial_parity"), pattern = "_per_wsg_times\\.csv$", full.names = TRUE) csvs <- c(csvs, list.files(file.path(logs_dir, "provincial_default"), pattern = "_per_wsg_times\\.csv$", full.names = TRUE)) if (length(csvs) == 0) { - m4_log <- file.path(logs_dir, "202605031423_trifecta_provincial_m4.txt") - m1_log <- file.path(logs_dir, "202605031423_trifecta_provincial_m1.txt") - cy_log <- "/Users/airvine/Projects/repo/rtj/scripts/cypher/logs/202605031423_cypher-run_202605031423_trifecta_provincial_cypher.txt" + m4_log <- file.path(logs_dir, "202605031423_wsgs_dispatch_m4.txt") + m1_log <- file.path(logs_dir, "202605031423_wsgs_dispatch_m1.txt") + cy_log <- "/Users/airvine/Projects/repo/rtj/scripts/cypher/logs/202605031423_cypher-run_202605031423_wsgs_dispatch_cypher.txt" } # Host speed factors (M4 = reference). Use yesterday's per-host MEAN diff --git a/data-raw/province_progress.sh b/data-raw/progress_check.sh similarity index 88% rename from data-raw/province_progress.sh rename to data-raw/progress_check.sh index 4ff3332..aa2026d 100755 --- a/data-raw/province_progress.sh +++ b/data-raw/progress_check.sh @@ -1,5 +1,5 @@ #!/usr/bin/env bash -# province_progress.sh — report live progress of an in-flight provincial dispatch. +# progress_check.sh — report live progress of an in-flight provincial dispatch. # # Reads each host's newest `_per_wsg_times.csv` (by mtime, NOT by date glob — # cypher logs use UTC, M4/M1 use local TZ; date-globbing across hosts breaks @@ -9,9 +9,9 @@ # plus a sample of the most recent completions to see what each host is on. # # Usage: -# bash data-raw/province_progress.sh [--cy-workspaces=job1,job2,job3] [--mtime-min=120] +# bash data-raw/progress_check.sh [--cy-workspaces=job1,job2,job3] [--mtime-min=120] # -# Honors /tmp/cy_ips.env if present (set by trifecta_provincial.sh dispatch). +# Honors /tmp/cy_ips.env if present (set by wsgs_dispatch.sh dispatch). # Otherwise derives cypher IPs from tofu state per workspace. # # --mtime-min=N : only consider CSVs modified in the last N minutes (default 240 @@ -85,11 +85,11 @@ done echo # Orchestrator-side state -if pgrep -f "trifecta_provincial.sh" >/dev/null 2>&1; then - PID=$(pgrep -f "trifecta_provincial.sh" | head -1) +if pgrep -f "wsgs_dispatch.sh" >/dev/null 2>&1; then + PID=$(pgrep -f "wsgs_dispatch.sh" | head -1) echo "dispatch process: ✓ PID=$PID (running)" # Find latest orchestrator log to extract bucket sizes if possible - LATEST_ORCH=$(find ~/Projects/repo/link/data-raw/logs -maxdepth 1 -name '*_trifecta_provincial_orchestrator.txt' -mmin -$MTIME_MIN | sort | tail -1) + LATEST_ORCH=$(find ~/Projects/repo/link/data-raw/logs -maxdepth 1 -name '*_wsgs_dispatch_orchestrator.txt' -mmin -$MTIME_MIN | sort | tail -1) if [ -n "$LATEST_ORCH" ]; then echo "orchestrator log: $LATEST_ORCH" grep -E "total WSGs|projected finish" "$LATEST_ORCH" 2>/dev/null | head -2 diff --git a/data-raw/province_run.sh b/data-raw/province_run.sh deleted file mode 100755 index 0d55da6..0000000 --- a/data-raw/province_run.sh +++ /dev/null @@ -1,266 +0,0 @@ -#!/usr/bin/env bash -# province_run.sh — top-level wrapper for the full provincial parity run. -# -# Orchestrates the 10-step sequence documented in -# research/post_compact_provincial_handoff.md: -# pre-flight → fail loud before any spend -# 1+2. snapshot_bcfp.sh on M4 + M1 (parallel) -# 3. cypher_up.sh job1/job2/job3 (parallel) -# 4. cypher_prep.sh on each cypher (parallel) -# 5. archive_provincial_runs.sh on all 5 hosts (parallel) -# 6. SMOKE — fail-fast -# 7. FULL DISPATCH -# 8. acceptance bar check (UNEXPLAINED at |diff_pct|>=2% == 0) -# 9. consolidate fresh schema → M4 -# 10. BURN CYPHERS — fires via trap EXIT -# -# Cypher burn is `trap EXIT` so it fires regardless of failure mode in -# steps 6-9. Steps 1-3 set CYPHERS_UP=1 once cyphers exist, so the trap -# only attempts burn when there's something to burn. -# -# Usage: -# bash data-raw/province_run.sh [--skip-smoke] [--no-mapping-code] [--keep-cyphers] -# -# Total wall: ~95-110 min Cypher cost: ~$1-2 - -set -euo pipefail - -# --- args --- -SKIP_SMOKE=0 -NO_MAPPING=0 -KEEP_CYPHERS=0 -for arg in "$@"; do - case "$arg" in - --skip-smoke) SKIP_SMOKE=1 ;; - --no-mapping-code) NO_MAPPING=1 ;; - --keep-cyphers) KEEP_CYPHERS=1 ;; - *) echo "FATAL: unknown arg: $arg" >&2; exit 1 ;; - esac -done - -MAPPING_FLAG="--with-mapping-code" -[ "$NO_MAPPING" = "1" ] && MAPPING_FLAG="" - -REPO_ROOT="$(cd "$(dirname "$0")/.." && pwd)" -cd "$REPO_ROOT" -TS="$(date -u +%Y%m%d_%H%M%S)" -LOG_DIR="$REPO_ROOT/data-raw/logs/province_run" -mkdir -p "$LOG_DIR" -LOG="$LOG_DIR/${TS}_province_run.log" -exec > >(tee -a "$LOG") 2>&1 - -START_EPOCH=$(date +%s) -echo "=== province_run.sh $TS ===" -echo " log: $LOG" -echo " mapping: $([ "$NO_MAPPING" = "0" ] && echo with || echo without)" -echo " smoke: $([ "$SKIP_SMOKE" = "0" ] && echo on || echo SKIPPED)" -echo " keep-cy: $([ "$KEEP_CYPHERS" = "0" ] && echo no || echo YES)" - -# --- trap: burn cyphers on exit, but only if we ever spun them --- -CYPHERS_UP=0 -burn_cyphers() { - local rc=$? - if [ "$CYPHERS_UP" = "0" ]; then - echo "=== trap EXIT: no cyphers spun, nothing to burn ===" - return $rc - fi - if [ "$KEEP_CYPHERS" = "1" ]; then - echo "=== trap EXIT: --keep-cyphers given; NOT burning ===" - echo " manually: cd ~/Projects/repo/rtj/scripts/cypher && \\" - echo " for WS in job1 job2 job3; do ./cypher_down.sh --workspace \$WS & done; wait" - return $rc - fi - echo "=== Step 10: BURN CYPHERS (trap EXIT, mandatory) ===" - cd ~/Projects/repo/rtj/scripts/cypher - for WS in job1 job2 job3; do - ./cypher_down.sh --workspace "$WS" > "$LOG_DIR/${TS}_burn_$WS.log" 2>&1 & - done - wait - echo "--- destruction verification ---" - local clean=1 - for WS in job1 job2 job3; do - local n - n=$(cd ~/Projects/repo/rtj/env/do/dev/cypher && TF_WORKSPACE="$WS" tofu state list 2>/dev/null | wc -l | tr -d ' ') - echo " cy[$WS]: $n tofu resources (expect 0)" - [ "$n" = "0" ] || clean=0 - done - if doctl compute droplet list --no-header 2>/dev/null | grep -qi cypher; then - echo " ✗ doctl still shows cypher droplets:" - doctl compute droplet list --no-header 2>/dev/null | grep -i cypher - clean=0 - else - echo " ✓ doctl: no cypher droplets" - fi - [ "$clean" = "1" ] && echo " ✓ burn clean" || echo " ✗ BURN INCOMPLETE — investigate before next run" - return $rc -} -trap burn_cyphers EXIT - -# --- pre-flight --- -echo "=== pre-flight ===" -fail=0 -pg_isready -h localhost -p 63333 >/dev/null 2>&1 || { echo " ✗ bcfp tunnel down (:63333)"; fail=1; } -pg_isready -h localhost -p 5432 >/dev/null 2>&1 || { echo " ✗ local fwapg down (:5432)"; fail=1; } -Rscript -e 'q(status = if (nchar(Sys.getenv("PG_PASS_SHARE")) > 0) 0 else 1)' >/dev/null 2>&1 \ - || { echo " ✗ PG_PASS_SHARE not visible to R (check ~/.Renviron)"; fail=1; } -ssh -o ConnectTimeout=3 m1 'hostname' >/dev/null 2>&1 || { echo " ✗ m1 ssh failed"; fail=1; } -doctl compute droplet list --no-header >/dev/null 2>&1 || { echo " ✗ doctl not authed"; fail=1; } -( cd ~/Projects/repo/rtj/env/do/dev/cypher && tofu workspace list >/dev/null 2>&1 ) \ - || { echo " ✗ tofu workspace list failed"; fail=1; } -[ "$fail" = "0" ] || { echo "FATAL: pre-flight failed; aborting before spend"; exit 1; } -echo " ✓ pre-flight clean" - -# --- Step 1+2: snapshot_bcfp.sh on M4 + M1 (parallel) --- -echo "=== Step 1+2: snapshot_bcfp.sh --force on M4 + M1 ===" -( PGUSER=postgres PGPASSWORD=postgres PGHOST=localhost PGPORT=5432 PGDATABASE=fwapg \ - bash data-raw/snapshot_bcfp.sh --force > "$LOG_DIR/${TS}_snapshot_m4.log" 2>&1 ) & -M4_PID=$! -( ssh m1 'export PGUSER=postgres PGPASSWORD=postgres PGHOST=localhost PGPORT=5432 PGDATABASE=fwapg && \ - cd ~/Projects/repo/link && bash data-raw/snapshot_bcfp.sh --force' \ - > "$LOG_DIR/${TS}_snapshot_m1.log" 2>&1 ) & -M1_PID=$! -wait $M4_PID || { echo "FATAL: M4 snapshot failed; see $LOG_DIR/${TS}_snapshot_m4.log"; exit 1; } -wait $M1_PID || { echo "FATAL: M1 snapshot failed; see $LOG_DIR/${TS}_snapshot_m1.log"; exit 1; } -echo " ✓ snapshots done" - -# --- Step 3: spin 3 cyphers (parallel) --- -echo "=== Step 3: cypher_up.sh job1/job2/job3 ===" -cd ~/Projects/repo/rtj/scripts/cypher -for WS in job1 job2 job3; do - ./cypher_up.sh --workspace "$WS" > "$LOG_DIR/${TS}_up_$WS.log" 2>&1 & -done -wait -cd "$REPO_ROOT" -declare -A CY_IP -for WS in job1 job2 job3; do - IP=$(cd ~/Projects/repo/rtj/env/do/dev/cypher && TF_WORKSPACE="$WS" tofu output -raw droplet_ip 2>/dev/null) || { - echo "FATAL: tofu output droplet_ip failed for $WS; see $LOG_DIR/${TS}_up_$WS.log" - exit 1 - } - [ -n "$IP" ] || { echo "FATAL: empty droplet_ip for $WS"; exit 1; } - CY_IP[$WS]="$IP" - echo " cy[$WS] = $IP" -done -CYPHERS_UP=1 # trap EXIT will now attempt burn - -# --- Step 4: per-cypher prep (parallel) --- -echo "=== Step 4: cypher_prep.sh on all 3 cyphers ===" -for WS in job1 job2 job3; do - IP="${CY_IP[$WS]}" - ( scp -q data-raw/cypher_prep.sh "cypher@$IP:/tmp/cypher_prep.sh" && \ - ssh "cypher@$IP" "bash /tmp/cypher_prep.sh" ) > "$LOG_DIR/${TS}_prep_$WS.log" 2>&1 & -done -wait -for WS in job1 job2 job3; do - if ! grep -q "snapshot_bcfp.sh: complete" "$LOG_DIR/${TS}_prep_$WS.log" 2>/dev/null; then - echo "FATAL: cypher[$WS] prep failed; see $LOG_DIR/${TS}_prep_$WS.log" - exit 1 - fi -done -echo " ✓ cyphers prepped" - -# --- Step 5: archive prior RDS on all 5 hosts (parallel) --- -echo "=== Step 5: archive_provincial_runs.sh on all hosts ===" -bash data-raw/archive_provincial_runs.sh > "$LOG_DIR/${TS}_archive_m4.log" 2>&1 & -ssh m1 'cd ~/Projects/repo/link/data-raw && ./archive_provincial_runs.sh' \ - > "$LOG_DIR/${TS}_archive_m1.log" 2>&1 & -for WS in job1 job2 job3; do - IP="${CY_IP[$WS]}" - ssh "cypher@$IP" 'cd ~/Projects/repo/link/data-raw && ./archive_provincial_runs.sh' \ - > "$LOG_DIR/${TS}_archive_$WS.log" 2>&1 & -done -wait -echo " ✓ archived" - -# --- Step 6: SMOKE (fail-fast) --- -if [ "$SKIP_SMOKE" = "0" ]; then - echo "=== Step 6: SMOKE ===" - cd "$REPO_ROOT/data-raw" - if ! bash trifecta_smoke.sh --cy-workspaces=job1,job2,job3 $MAPPING_FLAG \ - > "$LOG_DIR/${TS}_smoke.log" 2>&1; then - echo "FATAL: smoke FAILED; see $LOG_DIR/${TS}_smoke.log" - grep -E "smoke.*FAILED|smoke.*ERROR|SMOKE_ERR:" "$LOG_DIR/${TS}_smoke.log" | head -10 || true - exit 1 - fi - echo " ✓ smoke clean" - cd "$REPO_ROOT" -fi - -# --- Step 7: FULL DISPATCH --- -echo "=== Step 7: full provincial dispatch (~80-95 min wall) ===" -cd "$REPO_ROOT/data-raw" -if ! bash trifecta_provincial.sh --cy-workspaces=job1,job2,job3 $MAPPING_FLAG \ - > "$LOG_DIR/${TS}_full.log" 2>&1; then - echo "WARNING: trifecta_provincial.sh exited non-zero; partial result may exist" - # don't exit — let acceptance + consolidate inspect what landed -fi -echo "--- full dispatch tail ---" -tail -15 "$LOG_DIR/${TS}_full.log" -cd "$REPO_ROOT" - -# --- Step 8: acceptance bar --- -echo "=== Step 8: acceptance bar ===" -ANN_CSV=$(ls -1t data-raw/logs/provincial_parity/*_annotated.csv 2>/dev/null | head -1 || true) -if [ -z "$ANN_CSV" ]; then - echo " ✗ no annotated.csv found — dispatch likely failed before annotation" - exit 1 -fi -N_UNEXP=$(Rscript -e " -ann <- read.csv('$ANN_CSV', stringsAsFactors=FALSE) -cat(nrow(ann[ann\$class == 'UNEXPLAINED' & abs(ann\$diff_pct) >= 2, ])) -") -echo " annotated: $ANN_CSV" -echo " UNEXPLAINED at |diff_pct|>=2%: $N_UNEXP" -if [ "$N_UNEXP" -gt 0 ]; then - echo " WARNING: $N_UNEXP UNEXPLAINED rows — surface to user; consolidate still proceeds" -fi - -# --- Step 9: consolidate fresh schema → M4 --- -echo "=== Step 9: consolidate fresh schema ===" -ORCH_LOG=$(ls -1t data-raw/logs/*_trifecta_provincial_orchestrator.txt 2>/dev/null | head -1 || true) -if [ -z "$ORCH_LOG" ]; then - echo " ✗ no orchestrator log found — cannot extract per-host buckets" - exit 1 -fi -M1_BUCKET=$(grep '^ m1 bucket:' "$ORCH_LOG" | sed 's/.*bucket: //' || true) -CY1_BUCKET=$(grep '^ cypher\[job1\] bucket:' "$ORCH_LOG" | sed 's/.*bucket: //' || true) -CY2_BUCKET=$(grep '^ cypher\[job2\] bucket:' "$ORCH_LOG" | sed 's/.*bucket: //' || true) -CY3_BUCKET=$(grep '^ cypher\[job3\] bucket:' "$ORCH_LOG" | sed 's/.*bucket: //' || true) -if [ -z "$M1_BUCKET" ] || [ -z "$CY1_BUCKET" ] || [ -z "$CY2_BUCKET" ] || [ -z "$CY3_BUCKET" ]; then - echo " ✗ failed to extract one or more buckets from $ORCH_LOG" - echo " m1=$M1_BUCKET cy1=$CY1_BUCKET cy2=$CY2_BUCKET cy3=$CY3_BUCKET" - exit 1 -fi - -cd "$REPO_ROOT/data-raw" -M1_BUCKET="$M1_BUCKET" CY1_BUCKET="$CY1_BUCKET" CY2_BUCKET="$CY2_BUCKET" CY3_BUCKET="$CY3_BUCKET" \ -CY1_IP="${CY_IP[job1]}" CY2_IP="${CY_IP[job2]}" CY3_IP="${CY_IP[job3]}" \ -Rscript -e ' -suppressPackageStartupMessages({library(link)}) -source("consolidate_schema.R") -result <- consolidate_schema( - schema = "fresh", - sources = list( - list(host = "m1", via = "docker", bucket = strsplit(Sys.getenv("M1_BUCKET"), ",")[[1]]), - list(host = paste0("cypher@", Sys.getenv("CY1_IP")), via = "docker", bucket = strsplit(Sys.getenv("CY1_BUCKET"), ",")[[1]]), - list(host = paste0("cypher@", Sys.getenv("CY2_IP")), via = "docker", bucket = strsplit(Sys.getenv("CY2_BUCKET"), ",")[[1]]), - list(host = paste0("cypher@", Sys.getenv("CY3_IP")), via = "docker", bucket = strsplit(Sys.getenv("CY3_BUCKET"), ",")[[1]]) - ), - backup = TRUE) -print(result) -saveRDS(result, "/tmp/consolidate_result.rds") -' > "$LOG_DIR/${TS}_consolidate.log" 2>&1 || { - echo " ✗ consolidate_schema.R failed; see $LOG_DIR/${TS}_consolidate.log" - exit 1 -} -echo " ✓ consolidated (see $LOG_DIR/${TS}_consolidate.log)" -cd "$REPO_ROOT" - -# --- summary --- -END_EPOCH=$(date +%s) -WALL=$(( END_EPOCH - START_EPOCH )) -echo -echo "=== province_run.sh complete in ${WALL}s (~$((WALL/60))m) ===" -echo " annotated CSV: $ANN_CSV" -echo " UNEXPLAINED ≥2%: $N_UNEXP" -echo " trap EXIT will now burn cyphers (unless --keep-cyphers)" diff --git a/data-raw/query_schema_delta.R b/data-raw/query_schema_delta.R index 0b13189..12fccfb 100644 --- a/data-raw/query_schema_delta.R +++ b/data-raw/query_schema_delta.R @@ -5,7 +5,7 @@ # `.streams` + `.streams_habitat_` pairs. # # Schemas are populated by lnk_pipeline_persist via a provincial trifecta -# run (see trifecta_provincial.sh). This script reads streams + +# run (see wsgs_dispatch.sh). This script reads streams + # streams_habitat_ from each schema and emits: # 1. Province-wide totals per species (spawn / rear / accessible km) # 2. Per-species delta summary (km, percent, # WSGs shifted) diff --git a/data-raw/archive_provincial_runs.sh b/data-raw/runs_archive.sh similarity index 90% rename from data-raw/archive_provincial_runs.sh rename to data-raw/runs_archive.sh index f67fb4e..a43f909 100755 --- a/data-raw/archive_provincial_runs.sh +++ b/data-raw/runs_archive.sh @@ -1,7 +1,7 @@ #!/usr/bin/env bash # Archive per-run artifacts in data-raw/logs/provincial_/ to # data-raw/logs/provincial_/archive// so the LPT planner -# (both trifecta_provincial.sh inline and balance_provincial_buckets.R) +# (both wsgs_dispatch.sh inline and buckets_balance.R) # sees only the LATEST run's _per_wsg_times.csv files in the top level. # # Operator cadence: run this BEFORE kicking off a new provincial run if @@ -10,8 +10,8 @@ # smoothing out noisy one-offs but slower to react to host changes). # # Usage: -# ./archive_provincial_runs.sh # bcfishpass (default) -# ./archive_provincial_runs.sh --config=default # different bundle +# ./runs_archive.sh # bcfishpass (default) +# ./runs_archive.sh --config=default # different bundle # # What's archived: # - *_per_wsg_times.csv (drives LPT) diff --git a/data-raw/consolidate_schema.R b/data-raw/schema_consolidate.R similarity index 98% rename from data-raw/consolidate_schema.R rename to data-raw/schema_consolidate.R index 374442e..8e24d89 100644 --- a/data-raw/consolidate_schema.R +++ b/data-raw/schema_consolidate.R @@ -1,12 +1,12 @@ #!/usr/bin/env Rscript -# data-raw/consolidate_schema.R +# data-raw/schema_consolidate.R # # Consolidate a Postgres schema from multiple remote hosts onto the # local fwapg via `pg_dump -Fc` + scp + `pg_restore --data-only`. # # Usage: # Source this file (or Rscript), then: -# consolidate_schema( +# schema_consolidate( # schema = "fresh", # sources = list( # list(host = "m1", via = "docker", container = "fresh-db"), @@ -18,7 +18,7 @@ # pattern (fresh schema across M4 + M1 + cypher) but not generalized # to arbitrary table subsets, alternative protocols (COPY streaming, # logical replication), or arbitrary destination conns. Promote to -# `lnk_consolidate_schema()` only after using it 2-3 times for +# `lnk_schema_consolidate()` only after using it 2-3 times for # different schemas — the right API will emerge from real usage. # # Why pg_dump/restore not COPY streaming: per-host dumps act as @@ -60,7 +60,7 @@ #' retry. #' #' @return Invisibly: list of per-source pg_dump + restore outcomes. -consolidate_schema <- function(schema, +schema_consolidate <- function(schema, sources, backup = TRUE, dest_conn = link::lnk_db_conn(), @@ -246,7 +246,7 @@ consolidate_schema <- function(schema, # --------------------------------------------------------------------------- if (!interactive() && length(commandArgs(trailingOnly = TRUE)) == 0L && sys.nframe() == 0L) { - consolidate_schema( + schema_consolidate( schema = "fresh", sources = list( list(host = "m1", via = "docker"), diff --git a/data-raw/province_clean.sh b/data-raw/state_clean.sh similarity index 59% rename from data-raw/province_clean.sh rename to data-raw/state_clean.sh index e767f8e..a43690d 100755 --- a/data-raw/province_clean.sh +++ b/data-raw/state_clean.sh @@ -1,5 +1,5 @@ #!/usr/bin/env bash -# province_clean.sh — wipe link-pipeline state on all hosts to a known-clean +# state_clean.sh — wipe link-pipeline state on all hosts to a known-clean # baseline. Idempotent. Runs in <5 min wall on a healthy cluster. # # What it cleans: @@ -18,27 +18,52 @@ # - bcfishpass_ref (reference data; not pipeline output) # # Usage: -# bash data-raw/province_clean.sh [--cy-workspaces=job1,job2,job3] [--skip-m1] [--skip-cy] +# bash data-raw/state_clean.sh [flags] # -# Honors /tmp/cy_ips.env if present (set by trifecta_provincial.sh dispatch +# Flags: +# --cy-workspaces=A,B,C cypher workspaces to clean (default: job1,job2,job3) +# --skip-m1 skip M1 +# --skip-cy skip all cyphers +# --schemas=A,B,C SCOPED MODE — drop ONLY these exact schemas. +# Skips the working*/fresh_*/fresh heuristic AND +# skips the snapshot_bcfp.sh re-run (canonical state +# not touched). Use for per-bundle pre-cleans like +# `--schemas=fresh_default` before subset dispatches. +# +# Honors /tmp/cy_ips.env if present (set by wsgs_dispatch.sh dispatch # wrapper). Otherwise derives cypher IPs from tofu state. # -# Expected wall: ~2-3 min (parallel across all hosts). +# Expected wall: +# Full mode (default): ~2-3 min (drop + snapshot reload, parallel) +# Scoped (--schemas=): ~10-20 s (drop only) set -euo pipefail CY_WORKSPACES="job1,job2,job3" SKIP_M1=0 SKIP_CY=0 +SCOPED_SCHEMAS="" +SAW_SCHEMAS=0 for arg in "$@"; do case "$arg" in --cy-workspaces=*) CY_WORKSPACES="${arg#--cy-workspaces=}" ;; --skip-m1) SKIP_M1=1 ;; --skip-cy) SKIP_CY=1 ;; + --schemas=*) SCOPED_SCHEMAS="${arg#--schemas=}"; SAW_SCHEMAS=1 ;; *) echo "FATAL: unknown arg: $arg" >&2; exit 1 ;; esac done +# Guard against empty `--schemas=` falling through to the destructive +# heuristic full-wipe. Callers that build the arg dynamically and end +# up with an empty value need to know — silently wiping `fresh` is the +# wrong default for "the operator forgot to populate $VAR". +if [ "$SAW_SCHEMAS" = "1" ] && [ -z "$SCOPED_SCHEMAS" ]; then + echo "FATAL: --schemas= requires at least one schema (got empty value)." >&2 + echo " Omit --schemas= entirely to invoke the heuristic full-wipe mode." >&2 + exit 1 +fi + REPO_ROOT="$(cd "$(dirname "$0")/.." && pwd)" # --- Resolve cypher IPs --- @@ -54,12 +79,12 @@ if [ "$SKIP_CY" = "0" ]; then done fi -echo "=== province_clean.sh starting $(date -u +%Y-%m-%dT%H:%M:%SZ) ===" +echo "=== state_clean.sh starting $(date -u +%Y-%m-%dT%H:%M:%SZ) ===" echo " hosts: M4 + $([ "$SKIP_M1" = "0" ] && echo "M1 + ")$([ "$SKIP_CY" = "0" ] && echo "${#CY_IPS[@]} cyphers" || echo "no cyphers")" # --- Step 1: kill in-flight processes --- echo "--- step 1: kill in-flight dispatch ---" -ps -ef | grep -E "trifecta_provincial|cypher_run|run_provincial_parity|ssh.*cypher@|ssh.*-R.*m1|consolidate_schema" \ +ps -ef | grep -E "wsgs_dispatch|cypher_run|wsgs_run_host|ssh.*cypher@|ssh.*-R.*m1|schema_consolidate" \ | grep -v grep | awk '{print $2}' | xargs -r kill -9 2>/dev/null || true sleep 2 [ "$SKIP_M1" = "0" ] && ssh m1 'pkill -9 -f "Rscript.*run_provincial" 2>/dev/null' 2>&1 || true @@ -71,12 +96,28 @@ if [ "$SKIP_CY" = "0" ]; then fi echo " ✓ killed" -# --- Step 2-4: drop stale schemas + fresh, parallel across hosts --- -# Combined DROP block: drops working_*, fresh_*, AND fresh itself. -DROP_SQL="SELECT 'DROP SCHEMA \"' || schema_name || '\" CASCADE' FROM information_schema.schemata WHERE (schema_name LIKE 'working%' OR schema_name LIKE 'fresh_%' OR schema_name = 'fresh') AND schema_name NOT IN ('bcfishpass_ref') \\gexec +# --- Step 2-4: drop stale schemas, parallel across hosts --- +# Default (heuristic) mode: drops working_*, fresh_*, AND fresh +# itself; recreates empty `fresh`. +# +# Scoped mode (--schemas=A,B,C): drops ONLY the listed exact schemas; +# does NOT recreate `fresh`. Use for per-bundle pre-cleans before subset +# dispatches (e.g. --schemas=fresh_default). +if [ -n "$SCOPED_SCHEMAS" ]; then + # Build a literal IN-list of schema names, double-quoted for safety. + SCOPED_IN=$(echo "$SCOPED_SCHEMAS" | awk -F',' '{ + for(i=1;i<=NF;i++) { + gsub(/^[ \t]+|[ \t]+$/, "", $i) + printf("%s\047%s\047", (i==1?"":","), $i) + } + }') + DROP_SQL="SELECT 'DROP SCHEMA \"' || schema_name || '\" CASCADE' FROM information_schema.schemata WHERE schema_name IN ($SCOPED_IN) \\gexec" + echo "--- step 2-4 (scoped): drop schemas [$SCOPED_SCHEMAS], parallel ---" +else + DROP_SQL="SELECT 'DROP SCHEMA \"' || schema_name || '\" CASCADE' FROM information_schema.schemata WHERE (schema_name LIKE 'working%' OR schema_name LIKE 'fresh_%' OR schema_name = 'fresh') AND schema_name NOT IN ('bcfishpass_ref') \\gexec CREATE SCHEMA fresh;" - -echo "--- step 2-4: drop stale schemas + recreate fresh, parallel ---" + echo "--- step 2-4: drop stale schemas + recreate fresh, parallel ---" +fi ( PGPASSWORD=postgres psql -h localhost -p 5432 -U postgres -d fwapg < "/tmp/clean_m4.log" 2>&1 @@ -107,6 +148,15 @@ wait echo " ✓ schemas dropped + fresh recreated empty" # --- Step 5: reload modelled_stream_crossings via snapshot_bcfp.sh --force --- +# Skipped under --schemas= (scoped mode): canonical fresh schema wasn't +# touched, so modelled_stream_crossings is still present. +if [ -n "$SCOPED_SCHEMAS" ]; then + echo "--- step 5: SKIPPED (scoped mode — canonical fresh untouched) ---" + echo + echo "=== state_clean.sh complete (scoped: [$SCOPED_SCHEMAS]) $(date -u +%Y-%m-%dT%H:%M:%SZ) ===" + exit 0 +fi + echo "--- step 5: snapshot_bcfp.sh --force on all hosts ---" ( @@ -154,4 +204,4 @@ if [ "$SKIP_CY" = "0" ]; then done fi -echo "=== province_clean.sh complete $(date -u +%Y-%m-%dT%H:%M:%SZ) ===" +echo "=== state_clean.sh complete $(date -u +%Y-%m-%dT%H:%M:%SZ) ===" diff --git a/data-raw/trifecta_smoke.sh b/data-raw/trifecta_smoke.sh index 863324b..57afb4a 100755 --- a/data-raw/trifecta_smoke.sh +++ b/data-raw/trifecta_smoke.sh @@ -1,5 +1,5 @@ #!/usr/bin/env bash -# Smoke variant of trifecta_provincial.sh — one small WSG per host. +# Smoke variant of wsgs_dispatch.sh — one small WSG per host. # # Thin shim that calls the production orchestrator with explicit per-host # bucket overrides. Goal: exercise EVERY code path the full provincial run @@ -19,7 +19,7 @@ # cy2 → BABL (small generic) # cy3 → BULL (small generic) # -# All other flags pass through to trifecta_provincial.sh. +# All other flags pass through to wsgs_dispatch.sh. set -euo pipefail @@ -32,7 +32,7 @@ for arg in "$@"; do --cy-workspaces=*) CY_WORKSPACES="${arg#--cy-workspaces=}" ;; --m4-bucket=*|--m1-bucket=*|--cy-bucket=*|--cy[1-9]-bucket=*) echo "ERROR: trifecta_smoke.sh does not accept manual bucket overrides." >&2 - echo " Smoke picks one small WSG per host. Use trifecta_provincial.sh directly to override." >&2 + echo " Smoke picks one small WSG per host. Use wsgs_dispatch.sh directly to override." >&2 exit 2 ;; *) @@ -89,7 +89,7 @@ find "$SMOKE_DIR" -maxdepth 1 -name '*.rds' 2>/dev/null | sort > "$PRE_RDS_LIST" # Run the orchestrator. Don't exec — we want control flow back so we # can run the smoke-pass assertion afterward. ORCH_RC=0 -bash "$SCRIPT_DIR/trifecta_provincial.sh" \ +bash "$SCRIPT_DIR/wsgs_dispatch.sh" \ --m4-bucket=DEAD \ --m1-bucket=ELKR \ --cy-workspaces="$CY_WORKSPACES" \ @@ -134,11 +134,11 @@ if [ -n "$ERR_LINE" ]; then echo "" >&2 echo "[smoke] FAILED: $N WSG(s) errored: $WSGS" >&2 echo "[smoke] inspect logs:" >&2 - echo " data-raw/logs/_trifecta_provincial_*.txt (orchestrator-side)" >&2 + echo " data-raw/logs/_wsgs_dispatch_*.txt (orchestrator-side)" >&2 echo " rtj/scripts/cypher/logs/_cypher-run_*.txt (cypher-side R output)" >&2 - echo "[smoke] DO NOT dispatch trifecta_provincial.sh until these are fixed." >&2 + echo "[smoke] DO NOT dispatch wsgs_dispatch.sh until these are fixed." >&2 exit 5 fi -echo "[smoke] PASS: all $(echo "$NEW_RDS" | wc -l | tr -d ' ') new RDS are successful tibbles. Safe to dispatch trifecta_provincial.sh." +echo "[smoke] PASS: all $(echo "$NEW_RDS" | wc -l | tr -d ' ') new RDS are successful tibbles. Safe to dispatch wsgs_dispatch.sh." exit $ORCH_RC diff --git a/data-raw/wsg_compare.R b/data-raw/wsg_compare.R index af60198..64de942 100644 --- a/data-raw/wsg_compare.R +++ b/data-raw/wsg_compare.R @@ -2,7 +2,7 @@ # # Compare-only wrapper around `link::lnk_compare_rollup(reference = "bcfishpass")` # for the targets pipeline in data-raw/_targets.R and the orchestrator -# scripts (run_provincial_parity.R, trifecta_*.sh). +# scripts (wsgs_run_host.R, trifecta_*.sh). # # Reads persisted state in .streams + streams_habitat_ # (written by `wsg_pipeline_run.R` or any prior modelling call), queries diff --git a/data-raw/wsg_pipeline_run.R b/data-raw/wsg_pipeline_run.R index 29005d8..fefc702 100644 --- a/data-raw/wsg_pipeline_run.R +++ b/data-raw/wsg_pipeline_run.R @@ -2,7 +2,7 @@ # # Modelling-only wrapper around `link::lnk_pipeline_run()` for the # targets pipeline (data-raw/_targets.R) and the orchestrator scripts -# (run_provincial_parity.R, trifecta_*.sh). +# (wsgs_run_host.R, trifecta_*.sh). # # Writes per-WSG segment-level data into the persistent # .streams + per-species streams_habitat_ + barriers diff --git a/data-raw/trifecta_provincial.sh b/data-raw/wsgs_dispatch.sh similarity index 85% rename from data-raw/trifecta_provincial.sh rename to data-raw/wsgs_dispatch.sh index 20d185e..70029f9 100755 --- a/data-raw/trifecta_provincial.sh +++ b/data-raw/wsgs_dispatch.sh @@ -1,6 +1,6 @@ #!/usr/bin/env bash # Provincial parity orchestrator — dispatch across M4 + M1 + N cyphers. -# Each host runs `run_provincial_parity.R --wsgs= --config=` +# Each host runs `wsgs_run_host.R --wsgs= --config=` # (resume-safe; skips WSGs whose RDS already exists). After all hosts # finish, pulls every host's RDS files back to M4, binds them, and writes # `_annotated.csv` against `research/bcfp_divergence_taxonomy.yml`. @@ -13,14 +13,22 @@ # Per-host `---bucket=` overrides still take precedence. # # Usage: -# ./trifecta_provincial.sh # 3-host default: M4 + M1 + 1 cypher -# ./trifecta_provincial.sh --with-mapping-code # per-WSG mapping_code lens -# ./trifecta_provincial.sh --cy-workspaces=job1,job2,job3 # 5-host: 3 cyphers +# ./wsgs_dispatch.sh # 3-host default: M4 + M1 + 1 cypher +# ./wsgs_dispatch.sh --with-mapping-code # per-WSG mapping_code lens +# ./wsgs_dispatch.sh --cy-workspaces=job1,job2,job3 # 5-host: 3 cyphers +# ./wsgs_dispatch.sh --wsgs=ADMS,BULK,DEAD --no-cyphers # M4+M1, 3 WSGs +# ./wsgs_dispatch.sh --force --no-cyphers # force re-run, M4+M1 only # # CLI flags: # --config= bundle (default: bcfishpass) # --schema= override cfg$pipeline$schema # --rds-dir= override per-bundle RDS dir +# --wsgs= restrict to a subset of the bundle's WSG list +# (intersected with bundle presence-filtered set; +# unknown WSGs error loud) +# --no-cyphers skip cypher hosts entirely; dispatch M4+M1 only +# --force forward --force to the per-host Rscript +# (bypasses both PG-state and RDS resume gates) # --host-speeds= per-host speed factor vs M4 (default: m4=1.0,m1=0.83,cy=1.83). # Higher = slower. Used in LPT bucket projection. # Per-cypher overrides via --host-speeds=...,cy1=1.83,cy2=2.10 @@ -29,10 +37,11 @@ # --cy-bucket= single-cypher override (only valid with 1 workspace) # --cy-workspaces= comma-list of cypher tofu workspaces (default: "default") # --cyN-bucket= per-cypher override (1-indexed, e.g. --cy1-bucket=...) -# --with-mapping-code pass through to run_provincial_parity.R +# --with-mapping-code pass through to wsgs_run_host.R # --skip-preflight skip version-match check (debug only) # -# Estimated wall: ~2 hours single-cypher, ~50-60 min 3-cypher. +# Estimated wall: ~2 hours single-cypher, ~50-60 min 3-cypher, +# ~30-40 min M4+M1 only (16-WSG default-bundle smoke test). set -euo pipefail @@ -59,12 +68,18 @@ HOST_SPEEDS="m4=1.0,m1=0.79,cy=1.23" declare -A CYN_BUCKETS=() WITH_MAPPING_CODE="" SKIP_PREFLIGHT=0 +WSGS_FILTER="" +NO_CYPHERS=0 +FORCE_FLAG="" for arg in "$@"; do case "$arg" in --config=*) CONFIG="${arg#--config=}" ;; --schema=*) SCHEMA="${arg#--schema=}" ;; --rds-dir=*) RDS_DIR="${arg#--rds-dir=}" ;; + --wsgs=*) WSGS_FILTER="${arg#--wsgs=}" ;; + --no-cyphers) NO_CYPHERS=1 ;; + --force) FORCE_FLAG="--force" ;; --host-speeds=*) HOST_SPEEDS="${arg#--host-speeds=}" ;; --m4-bucket=*) M4_OVERRIDE="${arg#--m4-bucket=}" ;; --m1-bucket=*) M1_OVERRIDE="${arg#--m1-bucket=}" ;; @@ -79,13 +94,28 @@ for arg in "$@"; do esac done +# --no-cyphers wipes the cypher workspace list so the rest of the +# script sees a zero-cypher plan. Bash array init from an empty string +# via `read -r -a` produces a single-element array containing "" — +# clear it explicitly to get a true empty list. +if [ "$NO_CYPHERS" -eq 1 ]; then + CY_WORKSPACES="" +fi + EXTRA_ARGS="--config=$CONFIG" [ -n "$SCHEMA" ] && EXTRA_ARGS="$EXTRA_ARGS --schema=$SCHEMA" [ -n "$RDS_DIR" ] && EXTRA_ARGS="$EXTRA_ARGS --rds-dir=$RDS_DIR" [ -n "$WITH_MAPPING_CODE" ] && EXTRA_ARGS="$EXTRA_ARGS $WITH_MAPPING_CODE" +[ -n "$FORCE_FLAG" ] && EXTRA_ARGS="$EXTRA_ARGS $FORCE_FLAG" -# Parse cypher workspace list into array -IFS=',' read -r -a CY_WS_ARR <<< "$CY_WORKSPACES" +# Parse cypher workspace list into array. Empty CY_WORKSPACES (set by +# --no-cyphers) yields N_CY=0; all `for ((i=0; i "$SPLIT_R" < 0L) { + stop("--wsgs contains WSGs not in the bundle presence-filtered set: ", + paste(unknown, collapse = ", "), call. = FALSE) + } + all_wsgs <- sort(intersect(all_wsgs, requested)) + cat("[LPT] --wsgs subset: ", length(all_wsgs), " of ", + length(requested), " requested kept\n", sep = "") +} + # Parse --host-speeds=m4=1.0,m1=0.83,cy=1.83 into a named numeric vector parse_speeds <- function(s) { pairs <- strsplit(s, ",", fixed = TRUE)[[1]] @@ -154,10 +200,25 @@ if (!all(c("m4","m1","cy") %in% names(speeds))) { # Hosts in the plan: m4, m1, cy1..cyN_CY (each cypher workspace is its # own host). Per-cypher speed: take \`cyN\` from --host-speeds if present, # else fall back to the generic \`cy\` factor. +# +# Phantom-cy guard for the --no-cyphers / empty workspace list case: +# R's \`paste0("cy", integer(0))\` returns \`"cy"\` (length 1) due to +# constant recycling, which would put a non-existent cypher in +# host_keys. Three-branch the construction to keep n_cy = 0 honest. cy_ws_csv <- "$CY_WORKSPACES" -cy_ws <- strsplit(cy_ws_csv, ",", fixed = TRUE)[[1]] +cy_ws <- if (nzchar(cy_ws_csv)) { + strsplit(cy_ws_csv, ",", fixed = TRUE)[[1]] +} else { + character(0) +} n_cy <- length(cy_ws) -cy_host_keys <- if (n_cy == 1L) "cy" else paste0("cy", seq_len(n_cy)) +cy_host_keys <- if (n_cy == 0L) { + character(0) +} else if (n_cy == 1L) { + "cy" +} else { + paste0("cy", seq_len(n_cy)) +} host_keys <- c("m4", "m1", cy_host_keys) host_factor <- numeric(length(host_keys)) names(host_factor) <- host_keys @@ -295,7 +356,12 @@ for (h in host_keys) { } SPLIT_EOF -SPLIT_OUT=$(Rscript "$SPLIT_R" 2>&1) +SPLIT_OUT=$(Rscript "$SPLIT_R" 2>&1) || { + rc=$? + echo "ERROR: split/LPT R script failed (rc=$rc):" >&2 + echo "$SPLIT_OUT" >&2 + exit "$rc" +} echo "$SPLIT_OUT" | grep -E "^\[LPT\]" || true M4_WSGS=$(echo "$SPLIT_OUT" | awk -F'=' '$1=="M4" {print $2}') @@ -327,7 +393,7 @@ for ((i=0; i "$CY_SHELL" < "$M4_SHELL" < "$M4_LOG" 2>&1 ) & @@ -513,10 +579,10 @@ M4_PID=$! # - M1 must allow reverse port-forwards (OpenSSH default = yes). # - Nothing else listening on M1's port 63333 (M1 has no persistent # tunnel; this is free in practice). -M1_LOG="$LOG_DIR/${TS}_trifecta_provincial_m1.txt" +M1_LOG="$LOG_DIR/${TS}_wsgs_dispatch_m1.txt" ( ssh -o ServerAliveInterval=60 -o ServerAliveCountMax=10 \ -R 63333:127.0.0.1:63333 m1 \ - "cd ~/Projects/repo/link/data-raw && Rscript run_provincial_parity.R '--wsgs=$M1_WSGS' $EXTRA_ARGS" \ + "cd ~/Projects/repo/link/data-raw && Rscript wsgs_run_host.R '--wsgs=$M1_WSGS' $EXTRA_ARGS" \ > "$M1_LOG" 2>&1 ) & M1_PID=$! @@ -525,7 +591,7 @@ declare -a CY_PIDS=() declare -a CY_LOGS=() for ((i=0; i "$CY_LOG" 2>&1 ) & @@ -555,7 +621,7 @@ END=$(date +%s) ELAPSED=$((END - START)) echo "============================================" -printf '[trifecta-provincial] elapsed: %dh%02dm%02ds\n' \ +printf '[wsgs-dispatch] elapsed: %dh%02dm%02ds\n' \ $((ELAPSED/3600)) $(((ELAPSED%3600)/60)) $((ELAPSED%60)) printf ' m4 exit=%d log=%s\n' "$M4_EXIT" "$M4_LOG" printf ' m1 exit=%d log=%s\n' "$M1_EXIT" "$M1_LOG" @@ -584,7 +650,7 @@ for ((i=0; i/dev/null | head -1 || true) if [ -n "$CY_R_LOG" ] && [ -f "$CY_R_LOG" ]; then - cp "$CY_R_LOG" "$LOG_DIR/${TS}_trifecta_provincial_cypher_${WS}_R.txt" + cp "$CY_R_LOG" "$LOG_DIR/${TS}_wsgs_dispatch_cypher_${WS}_R.txt" fi done @@ -594,7 +660,7 @@ done # Each cypher: via TF_WORKSPACE-resolved droplet IP # --------------------------------------------------------------------------- echo -echo "[trifecta-provincial] pulling m1 RDS files" +echo "[wsgs-dispatch] pulling m1 RDS files" scp -q "m1:~/Projects/repo/link/data-raw/logs/$RDS_DIR_NAME/*.rds" \ "$REPO_ROOT/data-raw/logs/$RDS_DIR_NAME/" 2>&1 | tail -3 || true @@ -602,10 +668,10 @@ for ((i=0; i/dev/null || echo "") if [ -z "$CY_IP" ]; then - echo "[trifecta-provincial] WARN: workspace '$WS' has no droplet_ip — skipping pull" + echo "[wsgs-dispatch] WARN: workspace '$WS' has no droplet_ip — skipping pull" continue fi - echo "[trifecta-provincial] pulling cypher[$WS] RDS files (cypher@$CY_IP)" + echo "[wsgs-dispatch] pulling cypher[$WS] RDS files (cypher@$CY_IP)" scp -q "cypher@$CY_IP:/home/cypher/Projects/repo/link/data-raw/logs/$RDS_DIR_NAME/*.rds" \ "$REPO_ROOT/data-raw/logs/$RDS_DIR_NAME/" 2>&1 | tail -3 || true done @@ -636,13 +702,13 @@ cat(n_ok, n_err) N_OK=$(echo "$RDS_COUNTS" | awk '{print $1+0}') N_ERR=$(echo "$RDS_COUNTS" | awk '{print $2+0}') N_OK=${N_OK:-0}; N_ERR=${N_ERR:-0} - echo "[trifecta-provincial] local RDS: $TOTAL_RDS / $TOTAL pulled — $N_OK OK, $N_ERR errors" + echo "[wsgs-dispatch] local RDS: $TOTAL_RDS / $TOTAL pulled — $N_OK OK, $N_ERR errors" if [ "$N_ERR" -gt 0 ]; then - echo "[trifecta-provincial] WARN: $N_ERR error-stub RDS found. Inspect cypher-side R logs:" - ls "$LOG_DIR/${TS}_trifecta_provincial_cypher_"*_R.txt 2>/dev/null | sed 's/^/ /' || true + echo "[wsgs-dispatch] WARN: $N_ERR error-stub RDS found. Inspect cypher-side R logs:" + ls "$LOG_DIR/${TS}_wsgs_dispatch_cypher_"*_R.txt 2>/dev/null | sed 's/^/ /' || true fi else - echo "[trifecta-provincial] local RDS file count: 0 / $TOTAL (no files pulled — all hosts failed?)" + echo "[wsgs-dispatch] local RDS file count: 0 / $TOTAL (no files pulled — all hosts failed?)" fi # --------------------------------------------------------------------------- @@ -655,7 +721,7 @@ TAXONOMY="$REPO_ROOT/research/bcfp_divergence_taxonomy.yml" if [ -f "$TAXONOMY" ]; then echo - echo "[trifecta-provincial] aggregating + annotating $TOTAL_RDS RDS files" + echo "[wsgs-dispatch] aggregating + annotating $TOTAL_RDS RDS files" Rscript - < 0) { cat(" acceptance bar met.\n") } RSCRIPT_EOF - echo "[trifecta-provincial] annotated CSV: $ANNOTATED_CSV" + echo "[wsgs-dispatch] annotated CSV: $ANNOTATED_CSV" else - echo "[trifecta-provincial] WARN: taxonomy YAML not at $TAXONOMY — skipping annotation" + echo "[wsgs-dispatch] WARN: taxonomy YAML not at $TAXONOMY — skipping annotation" fi # --------------------------------------------------------------------------- diff --git a/data-raw/run_provincial_parity.R b/data-raw/wsgs_run_host.R similarity index 98% rename from data-raw/run_provincial_parity.R rename to data-raw/wsgs_run_host.R index 0657fb7..a65d828 100644 --- a/data-raw/run_provincial_parity.R +++ b/data-raw/wsgs_run_host.R @@ -28,7 +28,7 @@ # These are accepted as known gaps in this baseline. # # Run from data-raw/: -# Rscript run_provincial_parity.R > logs/_provincial_parity.txt 2>&1 & +# Rscript wsgs_run_host.R > logs/_provincial_parity.txt 2>&1 & suppressPackageStartupMessages({ library(link); library(fresh); library(dplyr); library(DBI); library(RPostgres) @@ -121,7 +121,7 @@ cat("Output dir :", out_dir, "\n\n") t_total <- Sys.time() # Per-WSG timings CSV (one row appended per WSG completion). -# Drives data-raw/balance_provincial_buckets.R for future LPT planning; +# Drives data-raw/buckets_balance.R for future LPT planning; # replaces the regex-parse-the-text-log path. Host-tagged via Sys.info() # so multi-host trifecta runs produce comparable rows. host_id <- Sys.info()[["nodename"]] @@ -203,7 +203,7 @@ stamp_bcfp_baseline <- function(config_name, link_schema) { bcfp_model_run_id = bcfp$model_run_id, bcfp_model_version = bcfp$model_version, bcfp_date_completed = bcfp$date_completed, - notes = "auto-stamped at run_provincial_parity.R start", + notes = "auto-stamped at wsgs_run_host.R start", stringsAsFactors = FALSE) write.table(row, csv_path, sep = ",", row.names = FALSE, col.names = FALSE, quote = FALSE, append = TRUE) @@ -343,7 +343,7 @@ cat("WSGs completed:", length(list.files(out_dir, pattern = "\\.rds$")), "\n") # Post-loop annotation: bind all per-WSG RDS rollups, annotate against # the bcfp divergence taxonomy, write `__annotated.csv`. # Each host writes its own bucket's annotated CSV. The orchestrator -# (trifecta_provincial.sh) does the province-wide aggregate after the +# (wsgs_dispatch.sh) does the province-wide aggregate after the # RDS pull-back step. # # Skipped if the taxonomy YAML doesn't exist relative to the script's diff --git a/data-raw/wsgs_run_pipeline.sh b/data-raw/wsgs_run_pipeline.sh new file mode 100755 index 0000000..4b61987 --- /dev/null +++ b/data-raw/wsgs_run_pipeline.sh @@ -0,0 +1,412 @@ +#!/usr/bin/env bash +# wsgs_run_pipeline.sh — top-level wrapper for the full provincial parity run. +# +# Orchestrates the 10-step sequence documented in +# research/post_compact_provincial_handoff.md: +# pre-flight → fail loud before any spend +# 1+2. snapshot_bcfp.sh on M4 + M1 (parallel) +# 3. cypher_up.sh job1/job2/job3 (parallel) +# 4. cypher_prep.sh on each cypher (parallel) +# 5. runs_archive.sh on all 5 hosts (parallel) +# 6. SMOKE — fail-fast +# 7. FULL DISPATCH +# 8. acceptance bar check (UNEXPLAINED at |diff_pct|>=2% == 0) +# 9. consolidate fresh schema → M4 +# 10. BURN CYPHERS — fires via trap EXIT +# +# Cypher burn is `trap EXIT` so it fires regardless of failure mode in +# steps 6-9. Steps 1-3 set CYPHERS_UP=1 once cyphers exist, so the trap +# only attempts burn when there's something to burn. +# +# Usage: +# bash data-raw/wsgs_run_pipeline.sh [flags] +# +# Flags: +# --wsgs=A,B,C restrict to a WSG subset (full bundle if omitted) +# --config= bundle name (default: bcfishpass) +# --schema= override cfg$pipeline$schema (default: bundle default) +# --no-cyphers M4+M1 only — skip cypher spin/prep/burn entirely +# --force forward --force to per-host Rscript (bypass resume gates) +# --skip-smoke skip the smoke pre-check +# --no-mapping-code drop the mapping_code lens +# --keep-cyphers don't burn cyphers on exit (debug) +# +# Total wall: +# ~95-110 min for full provincial (3 cyphers) +# ~30-40 min for --wsgs=<16-WSG-set> --no-cyphers (M4+M1 only) +# Cypher cost: ~$1-2 per full provincial; $0 with --no-cyphers. + +set -euo pipefail + +# --- args --- +SKIP_SMOKE=0 +NO_MAPPING=0 +KEEP_CYPHERS=0 +WSGS_FILTER="" +CONFIG_NAME="bcfishpass" +SCHEMA="" +NO_CYPHERS=0 +FORCE_FLAG="" +for arg in "$@"; do + case "$arg" in + --skip-smoke) SKIP_SMOKE=1 ;; + --with-mapping-code) ;; # default-on; accept explicitly for symmetry with --no-mapping-code + --no-mapping-code) NO_MAPPING=1 ;; + --keep-cyphers) KEEP_CYPHERS=1 ;; + --wsgs=*) WSGS_FILTER="${arg#--wsgs=}" ;; + --config=*) CONFIG_NAME="${arg#--config=}" ;; + --schema=*) SCHEMA="${arg#--schema=}" ;; + --no-cyphers) NO_CYPHERS=1 ;; + --force) FORCE_FLAG="--force" ;; + *) echo "FATAL: unknown arg: $arg" >&2; exit 1 ;; + esac +done + +MAPPING_FLAG="--with-mapping-code" +[ "$NO_MAPPING" = "1" ] && MAPPING_FLAG="" + +# Build the passthrough flag string for wsgs_dispatch.sh + trifecta_smoke.sh. +DISPATCH_FLAGS="" +[ -n "$WSGS_FILTER" ] && DISPATCH_FLAGS="$DISPATCH_FLAGS --wsgs=$WSGS_FILTER" +[ -n "$CONFIG_NAME" ] && DISPATCH_FLAGS="$DISPATCH_FLAGS --config=$CONFIG_NAME" +[ -n "$SCHEMA" ] && DISPATCH_FLAGS="$DISPATCH_FLAGS --schema=$SCHEMA" +[ "$NO_CYPHERS" = "1" ] && DISPATCH_FLAGS="$DISPATCH_FLAGS --no-cyphers" +[ -n "$FORCE_FLAG" ] && DISPATCH_FLAGS="$DISPATCH_FLAGS $FORCE_FLAG" + +REPO_ROOT="$(cd "$(dirname "$0")/.." && pwd)" +cd "$REPO_ROOT" +TS="$(date -u +%Y%m%d_%H%M%S)" +LOG_DIR="$REPO_ROOT/data-raw/logs/wsgs_run_pipeline" +mkdir -p "$LOG_DIR" +LOG="$LOG_DIR/${TS}_wsgs_run_pipeline.log" +exec > >(tee -a "$LOG") 2>&1 + +START_EPOCH=$(date +%s) +echo "=== wsgs_run_pipeline.sh $TS ===" +echo " log: $LOG" +echo " config: $CONFIG_NAME" +[ -n "$SCHEMA" ] && echo " schema: $SCHEMA" +[ -n "$WSGS_FILTER" ] && echo " wsgs: $WSGS_FILTER" +echo " no-cyphers: $([ "$NO_CYPHERS" = "0" ] && echo no || echo YES)" +echo " force: $([ -n "$FORCE_FLAG" ] && echo YES || echo no)" +echo " mapping: $([ "$NO_MAPPING" = "0" ] && echo with || echo without)" +echo " smoke: $([ "$SKIP_SMOKE" = "0" ] && echo on || echo SKIPPED)" +echo " keep-cy: $([ "$KEEP_CYPHERS" = "0" ] && echo no || echo YES)" + +# Auto-skip smoke when the smoke harness's preconditions are not met. +# trifecta_smoke.sh assumes 3 cypher workspaces (job1/job2/job3) and a +# fixed per-host WSG triplet; both break under --no-cyphers or --wsgs. +# Setting SKIP_SMOKE here (after the log redirect) keeps the notice in +# the log for post-hoc inspection. +if [ "$SKIP_SMOKE" = "0" ] && { [ "$NO_CYPHERS" = "1" ] || [ -n "$WSGS_FILTER" ]; }; then + echo "[auto-skip-smoke] --no-cyphers or --wsgs is set; trifecta_smoke.sh" + echo " assumptions don't hold — skipping Step 6." + SKIP_SMOKE=1 +fi + +# --- trap: burn cyphers on exit, but only if we ever spun them --- +CYPHERS_UP=0 +burn_cyphers() { + local rc=$? + if [ "$CYPHERS_UP" = "0" ]; then + echo "=== trap EXIT: no cyphers spun, nothing to burn ===" + return $rc + fi + if [ "$KEEP_CYPHERS" = "1" ]; then + echo "=== trap EXIT: --keep-cyphers given; NOT burning ===" + echo " manually: cd ~/Projects/repo/rtj/scripts/cypher && \\" + echo " for WS in job1 job2 job3; do ./cypher_down.sh --workspace \$WS & done; wait" + return $rc + fi + echo "=== Step 10: BURN CYPHERS (trap EXIT, mandatory) ===" + cd ~/Projects/repo/rtj/scripts/cypher + for WS in job1 job2 job3; do + ./cypher_down.sh --workspace "$WS" > "$LOG_DIR/${TS}_burn_$WS.log" 2>&1 & + done + wait + echo "--- destruction verification ---" + local clean=1 + for WS in job1 job2 job3; do + local n + n=$(cd ~/Projects/repo/rtj/env/do/dev/cypher && TF_WORKSPACE="$WS" tofu state list 2>/dev/null | wc -l | tr -d ' ') + echo " cy[$WS]: $n tofu resources (expect 0)" + [ "$n" = "0" ] || clean=0 + done + if doctl compute droplet list --no-header 2>/dev/null | grep -qi cypher; then + echo " ✗ doctl still shows cypher droplets:" + doctl compute droplet list --no-header 2>/dev/null | grep -i cypher + clean=0 + else + echo " ✓ doctl: no cypher droplets" + fi + [ "$clean" = "1" ] && echo " ✓ burn clean" || echo " ✗ BURN INCOMPLETE — investigate before next run" + return $rc +} +trap burn_cyphers EXIT + +# --- pre-flight --- +echo "=== pre-flight ===" +fail=0 +pg_isready -h localhost -p 63333 >/dev/null 2>&1 || { echo " ✗ bcfp tunnel down (:63333)"; fail=1; } +pg_isready -h localhost -p 5432 >/dev/null 2>&1 || { echo " ✗ local fwapg down (:5432)"; fail=1; } +Rscript -e 'q(status = if (nchar(Sys.getenv("PG_PASS_SHARE")) > 0) 0 else 1)' >/dev/null 2>&1 \ + || { echo " ✗ PG_PASS_SHARE not visible to R (check ~/.Renviron)"; fail=1; } +ssh -o ConnectTimeout=3 m1 'hostname' >/dev/null 2>&1 || { echo " ✗ m1 ssh failed"; fail=1; } +doctl compute droplet list --no-header >/dev/null 2>&1 || { echo " ✗ doctl not authed"; fail=1; } +( cd ~/Projects/repo/rtj/env/do/dev/cypher && tofu workspace list >/dev/null 2>&1 ) \ + || { echo " ✗ tofu workspace list failed"; fail=1; } +[ "$fail" = "0" ] || { echo "FATAL: pre-flight failed; aborting before spend"; exit 1; } +echo " ✓ pre-flight clean" + +# --- Step 0: pre-clean target schema (when --schema= is set) --- +# Drops $SCHEMA on every host before dispatch so the per-WSG pipeline +# writes land into a clean slate AND so consolidate's pg_dump source +# contains only the current run's bucket (no leftover WSGs from prior +# runs colliding with destination data). Uses state_clean.sh in +# scoped mode (--schemas=...) which skips the canonical-fresh wipe and +# the snapshot_bcfp.sh reload. +# +# Skipped when --schema is empty (writes go to the bundle's default +# schema, typically the canonical `fresh` — which Step 1+2's snapshot +# already handles). +if [ -n "$SCHEMA" ]; then + echo "=== Step 0: pre-clean target schema [$SCHEMA] ===" + CLEAN_ARGS="--schemas=$SCHEMA" + [ "$NO_CYPHERS" = "1" ] && CLEAN_ARGS="$CLEAN_ARGS --skip-cy" + bash data-raw/state_clean.sh $CLEAN_ARGS > "$LOG_DIR/${TS}_preclean.log" 2>&1 || { + echo "FATAL: pre-clean failed; see $LOG_DIR/${TS}_preclean.log" + exit 1 + } + echo " ✓ pre-cleaned" +fi + +# --- Step 1+2: snapshot_bcfp.sh on M4 + M1 (parallel) --- +echo "=== Step 1+2: snapshot_bcfp.sh --force on M4 + M1 ===" +( PGUSER=postgres PGPASSWORD=postgres PGHOST=localhost PGPORT=5432 PGDATABASE=fwapg \ + bash data-raw/snapshot_bcfp.sh --force > "$LOG_DIR/${TS}_snapshot_m4.log" 2>&1 ) & +M4_PID=$! +( ssh m1 'export PGUSER=postgres PGPASSWORD=postgres PGHOST=localhost PGPORT=5432 PGDATABASE=fwapg && \ + cd ~/Projects/repo/link && bash data-raw/snapshot_bcfp.sh --force' \ + > "$LOG_DIR/${TS}_snapshot_m1.log" 2>&1 ) & +M1_PID=$! +wait $M4_PID || { echo "FATAL: M4 snapshot failed; see $LOG_DIR/${TS}_snapshot_m4.log"; exit 1; } +wait $M1_PID || { echo "FATAL: M1 snapshot failed; see $LOG_DIR/${TS}_snapshot_m1.log"; exit 1; } +echo " ✓ snapshots done" + +# --- Step 3: spin 3 cyphers (parallel) — skipped under --no-cyphers --- +declare -A CY_IP +if [ "$NO_CYPHERS" = "0" ]; then + echo "=== Step 3: cypher_up.sh job1/job2/job3 ===" + cd ~/Projects/repo/rtj/scripts/cypher + for WS in job1 job2 job3; do + ./cypher_up.sh --workspace "$WS" > "$LOG_DIR/${TS}_up_$WS.log" 2>&1 & + done + wait + cd "$REPO_ROOT" + for WS in job1 job2 job3; do + IP=$(cd ~/Projects/repo/rtj/env/do/dev/cypher && TF_WORKSPACE="$WS" tofu output -raw droplet_ip 2>/dev/null) || { + echo "FATAL: tofu output droplet_ip failed for $WS; see $LOG_DIR/${TS}_up_$WS.log" + exit 1 + } + [ -n "$IP" ] || { echo "FATAL: empty droplet_ip for $WS"; exit 1; } + CY_IP[$WS]="$IP" + echo " cy[$WS] = $IP" + done + CYPHERS_UP=1 # trap EXIT will now attempt burn + + # --- Step 4: per-cypher prep (parallel) --- + echo "=== Step 4: cypher_prep.sh on all 3 cyphers ===" + for WS in job1 job2 job3; do + IP="${CY_IP[$WS]}" + ( scp -q data-raw/cypher_prep.sh "cypher@$IP:/tmp/cypher_prep.sh" && \ + ssh "cypher@$IP" "bash /tmp/cypher_prep.sh" ) > "$LOG_DIR/${TS}_prep_$WS.log" 2>&1 & + done + wait + for WS in job1 job2 job3; do + if ! grep -q "snapshot_bcfp.sh: complete" "$LOG_DIR/${TS}_prep_$WS.log" 2>/dev/null; then + echo "FATAL: cypher[$WS] prep failed; see $LOG_DIR/${TS}_prep_$WS.log" + exit 1 + fi + done + echo " ✓ cyphers prepped" +else + echo "=== Step 3+4: SKIPPED (--no-cyphers) ===" +fi + +# --- Step 5: archive prior RDS — M4+M1 always, cyphers only when up --- +echo "=== Step 5: runs_archive.sh on all hosts ===" +bash data-raw/runs_archive.sh > "$LOG_DIR/${TS}_archive_m4.log" 2>&1 & +ssh m1 'cd ~/Projects/repo/link/data-raw && ./runs_archive.sh' \ + > "$LOG_DIR/${TS}_archive_m1.log" 2>&1 & +if [ "$NO_CYPHERS" = "0" ]; then + for WS in job1 job2 job3; do + IP="${CY_IP[$WS]}" + ssh "cypher@$IP" 'cd ~/Projects/repo/link/data-raw && ./runs_archive.sh' \ + > "$LOG_DIR/${TS}_archive_$WS.log" 2>&1 & + done +fi +wait +echo " ✓ archived" + +# --- Step 6: SMOKE (fail-fast) --- +if [ "$SKIP_SMOKE" = "0" ]; then + echo "=== Step 6: SMOKE ===" + cd "$REPO_ROOT/data-raw" + if ! bash trifecta_smoke.sh --cy-workspaces=job1,job2,job3 $MAPPING_FLAG \ + > "$LOG_DIR/${TS}_smoke.log" 2>&1; then + echo "FATAL: smoke FAILED; see $LOG_DIR/${TS}_smoke.log" + grep -E "smoke.*FAILED|smoke.*ERROR|SMOKE_ERR:" "$LOG_DIR/${TS}_smoke.log" | head -10 || true + exit 1 + fi + echo " ✓ smoke clean" + cd "$REPO_ROOT" +fi + +# --- Step 7: FULL DISPATCH --- +# When --no-cyphers OR --wsgs is set, omit --cy-workspaces so +# wsgs_dispatch.sh runs with the M4+M1-only plan it derives +# from DISPATCH_FLAGS (which includes --no-cyphers, --wsgs, etc.). +if [ "$NO_CYPHERS" = "0" ] && [ -z "$WSGS_FILTER" ]; then + TRIFECTA_CY_ARG="--cy-workspaces=job1,job2,job3" + echo "=== Step 7: full provincial dispatch (~80-95 min wall) ===" +else + TRIFECTA_CY_ARG="" + echo "=== Step 7: subset dispatch — see DISPATCH_FLAGS below ===" +fi +echo " DISPATCH_FLAGS=$DISPATCH_FLAGS" +cd "$REPO_ROOT/data-raw" +if ! bash wsgs_dispatch.sh $TRIFECTA_CY_ARG $DISPATCH_FLAGS $MAPPING_FLAG \ + > "$LOG_DIR/${TS}_full.log" 2>&1; then + echo "WARNING: wsgs_dispatch.sh exited non-zero; partial result may exist" + # don't exit — let acceptance + consolidate inspect what landed +fi +echo "--- full dispatch tail ---" +tail -15 "$LOG_DIR/${TS}_full.log" +cd "$REPO_ROOT" + +# --- Step 8: acceptance bar --- +# RDS dir is config-aware: bcfishpass → provincial_parity (legacy +# name, kept for back-compat); any other bundle → provincial_. +echo "=== Step 8: acceptance bar ===" +if [ "$CONFIG_NAME" = "bcfishpass" ]; then + RDS_DIR_NAME="provincial_parity" +else + RDS_DIR_NAME="provincial_${CONFIG_NAME}" +fi +ANN_CSV=$(ls -1t "data-raw/logs/$RDS_DIR_NAME"/*_annotated.csv 2>/dev/null | head -1 || true) +if [ -z "$ANN_CSV" ]; then + echo " ✗ no annotated.csv found — dispatch likely failed before annotation" + exit 1 +fi +N_UNEXP=$(Rscript -e " +ann <- read.csv('$ANN_CSV', stringsAsFactors=FALSE) +cat(nrow(ann[ann\$class == 'UNEXPLAINED' & abs(ann\$diff_pct) >= 2, ])) +") +echo " annotated: $ANN_CSV" +echo " UNEXPLAINED at |diff_pct|>=2%: $N_UNEXP" +if [ "$N_UNEXP" -gt 0 ]; then + echo " WARNING: $N_UNEXP UNEXPLAINED rows — surface to user; consolidate still proceeds" +fi + +# --- Step 9: consolidate target schema → M4 --- +# Target schema: --schema= if provided, else cfg$pipeline$schema for +# the bundle (best-effort lookup via Rscript). Sources list is built +# dynamically — M1 always present; cyphers only when --no-cyphers +# wasn't set. +echo "=== Step 9: consolidate target schema ===" +ORCH_LOG=$(ls -1t data-raw/logs/*_wsgs_dispatch_orchestrator.txt 2>/dev/null | head -1 || true) +if [ -z "$ORCH_LOG" ]; then + echo " ✗ no orchestrator log found — cannot extract per-host buckets" + exit 1 +fi +M1_BUCKET=$(grep '^ m1 bucket:' "$ORCH_LOG" | sed 's/.*bucket: //' || true) +if [ -z "$M1_BUCKET" ]; then + echo " ✗ failed to extract m1 bucket from $ORCH_LOG" + exit 1 +fi +if [ "$NO_CYPHERS" = "0" ]; then + CY1_BUCKET=$(grep '^ cypher\[job1\] bucket:' "$ORCH_LOG" | sed 's/.*bucket: //' || true) + CY2_BUCKET=$(grep '^ cypher\[job2\] bucket:' "$ORCH_LOG" | sed 's/.*bucket: //' || true) + CY3_BUCKET=$(grep '^ cypher\[job3\] bucket:' "$ORCH_LOG" | sed 's/.*bucket: //' || true) + if [ -z "$CY1_BUCKET" ] || [ -z "$CY2_BUCKET" ] || [ -z "$CY3_BUCKET" ]; then + echo " ✗ failed to extract cypher buckets from $ORCH_LOG" + echo " cy1=$CY1_BUCKET cy2=$CY2_BUCKET cy3=$CY3_BUCKET" + exit 1 + fi +fi + +# Resolve target schema name: explicit --schema wins, else look up +# cfg$pipeline$schema for the bundle. Explicit guards rather than a +# silent "fresh" fallback so a misconfigured --config= surfaces loud. +if [ -n "$SCHEMA" ]; then + TARGET_SCHEMA="$SCHEMA" +else + TARGET_SCHEMA=$(Rscript -e " + cfg <- link::lnk_config('$CONFIG_NAME') + s <- cfg\$pipeline\$schema + if (is.null(s) || !nzchar(s)) stop('cfg\$pipeline\$schema missing for bundle \"$CONFIG_NAME\"') + cat(s) + ") || { + echo " ✗ failed to resolve target schema for --config=$CONFIG_NAME" >&2 + echo " (lnk_config may be missing the bundle, or cfg\$pipeline\$schema is unset)" >&2 + exit 1 + } + if [ -z "$TARGET_SCHEMA" ] || [ "$TARGET_SCHEMA" = "NULL" ]; then + echo " ✗ lnk_config('$CONFIG_NAME')\$pipeline\$schema returned empty/NULL" >&2 + exit 1 + fi +fi +echo " target schema: $TARGET_SCHEMA" + +cd "$REPO_ROOT/data-raw" +if [ "$NO_CYPHERS" = "0" ]; then + SOURCES_R="list( + list(host = 'm1', via = 'docker', bucket = strsplit(Sys.getenv('M1_BUCKET'), ',')[[1]]), + list(host = paste0('cypher@', Sys.getenv('CY1_IP')), via = 'docker', bucket = strsplit(Sys.getenv('CY1_BUCKET'), ',')[[1]]), + list(host = paste0('cypher@', Sys.getenv('CY2_IP')), via = 'docker', bucket = strsplit(Sys.getenv('CY2_BUCKET'), ',')[[1]]), + list(host = paste0('cypher@', Sys.getenv('CY3_IP')), via = 'docker', bucket = strsplit(Sys.getenv('CY3_BUCKET'), ',')[[1]]) + )" + M1_BUCKET="$M1_BUCKET" CY1_BUCKET="$CY1_BUCKET" CY2_BUCKET="$CY2_BUCKET" CY3_BUCKET="$CY3_BUCKET" \ + CY1_IP="${CY_IP[job1]}" CY2_IP="${CY_IP[job2]}" CY3_IP="${CY_IP[job3]}" \ + TARGET_SCHEMA="$TARGET_SCHEMA" SOURCES_R="$SOURCES_R" \ + Rscript -e ' +suppressPackageStartupMessages({library(link)}) +source("schema_consolidate.R") +sources <- eval(parse(text = Sys.getenv("SOURCES_R"))) +result <- schema_consolidate(schema = Sys.getenv("TARGET_SCHEMA"), + sources = sources, backup = TRUE) +print(result) +saveRDS(result, "/tmp/consolidate_result.rds") +' > "$LOG_DIR/${TS}_consolidate.log" 2>&1 || { + echo " ✗ schema_consolidate.R failed; see $LOG_DIR/${TS}_consolidate.log" + exit 1 + } +else + # M1-only consolidate (no cyphers). + M1_BUCKET="$M1_BUCKET" TARGET_SCHEMA="$TARGET_SCHEMA" \ + Rscript -e ' +suppressPackageStartupMessages({library(link)}) +source("schema_consolidate.R") +result <- schema_consolidate( + schema = Sys.getenv("TARGET_SCHEMA"), + sources = list(list(host = "m1", via = "docker", + bucket = strsplit(Sys.getenv("M1_BUCKET"), ",")[[1]])), + backup = TRUE) +print(result) +saveRDS(result, "/tmp/consolidate_result.rds") +' > "$LOG_DIR/${TS}_consolidate.log" 2>&1 || { + echo " ✗ schema_consolidate.R failed; see $LOG_DIR/${TS}_consolidate.log" + exit 1 + } +fi +echo " ✓ consolidated (see $LOG_DIR/${TS}_consolidate.log)" +cd "$REPO_ROOT" + +# --- summary --- +END_EPOCH=$(date +%s) +WALL=$(( END_EPOCH - START_EPOCH )) +echo +echo "=== wsgs_run_pipeline.sh complete in ${WALL}s (~$((WALL/60))m) ===" +echo " annotated CSV: $ANN_CSV" +echo " UNEXPLAINED ≥2%: $N_UNEXP" +echo " trap EXIT will now burn cyphers (unless --keep-cyphers)" diff --git a/planning/archive/2026-05-issue-172-provincial-run-autonomy-renames/README.md b/planning/archive/2026-05-issue-172-provincial-run-autonomy-renames/README.md new file mode 100644 index 0000000..3715b3a --- /dev/null +++ b/planning/archive/2026-05-issue-172-provincial-run-autonomy-renames/README.md @@ -0,0 +1,33 @@ +# Provincial run autonomy + script renames (#172) — 2026-05-14 + +## Outcome + +Shipped v0.38.0 on top of v0.37.0 (#168 decouple). Two-axis change: + +1. **Autonomy CLI surface.** `wsgs_run_pipeline.sh` (was `province_run.sh`) accepts `--wsgs=A,B,C`, `--config=`, `--schema=`, `--no-cyphers`, `--force`, forwards to `wsgs_dispatch.sh` (was `trifecta_provincial.sh`). New Step 0 pre-clean fires `state_clean.sh --schemas=` when `--schema=` is set, eliminating the consolidate-stale-WSG class of failures. Phase 3 acceptance: 16/16 WSGs in `fresh_default.streams` on M4 after a single `bash data-raw/wsgs_run_pipeline.sh ...` invocation, ~20 min wall, exit 0, no operator prompts. +2. **8 mechanical renames to noun_verb.** `province_*` / `trifecta_*` / `consolidate_schema` / `archive_provincial_runs` / `balance_provincial_buckets` / `run_provincial_parity` → honest names that describe scope. Done via `git mv` so `git log --follow` preserves history. `compare_bcfishpass_wsg.R → wsg_compare.R` was already renamed in #168. + +Resulting naming family: +- `wsg_*` (singular, per-WSG functions from #168): `wsg_pipeline_run.R`, `wsg_compare.R`. +- `wsgs_*` (plural, collection-level orchestrators): `wsgs_run_host.R`, `wsgs_dispatch.sh`, `wsgs_run_pipeline.sh`. +- Mixed nouns for other wrappers: `state_clean.sh`, `progress_check.sh`, `runs_archive.sh`, `buckets_balance.R`, `schema_consolidate.R`. + +Side-fixes that landed because they were load-bearing for autonomy: +- Phantom-cy bug (R's `paste0("cy", integer(0))` returns `"cy"` length-1 via constant recycling). +- Empty `CY_WORKSPACES` init now explicit `CY_WS_ARR=()`. +- `SPLIT_OUT=$(Rscript ...)` wrapped with `||` block so R `stop()` errors reach the operator instead of silent abort. + +`/code-check` caught 3 real bugs over the phases: +- Phase 1: silent R-error abort (no operator-visible message) +- Phase 2: empty TARGET_SCHEMA fallback (masked misconfigured `--config=`) +- Phase 1.5: empty `--schemas=` silent fall-through to destructive default + +All fixed inline. + +## Filed-but-not-closed follow-ups + +- **Cypher integration tests** (issue #172 Phase 2 + 3 acceptance) — defer until M4+M1 baseline lands repeatably. Will file as new issue. +- **LPT-fallback empty-bucket edge case** when N_WSGs ≤ N_hosts without prior timing CSVs — pre-existing, not a #172 regression. Surfaces under `--wsgs=DEAD --config=bcfishpass --no-cyphers` (config without timing CSVs). +- **rtj `scripts/cypher/cypher_run.sh` ref** — only a docstring reference at line 8; updated post-merge as part of `/gh-pr-merge` workflow. + +Closed by: PR (TBD, branch `172-provincial-run-autonomy-renames`, tag `v0.38.0`). diff --git a/planning/archive/2026-05-issue-172-provincial-run-autonomy-renames/findings.md b/planning/archive/2026-05-issue-172-provincial-run-autonomy-renames/findings.md new file mode 100644 index 0000000..3095572 --- /dev/null +++ b/planning/archive/2026-05-issue-172-provincial-run-autonomy-renames/findings.md @@ -0,0 +1,67 @@ +# Findings — Provincial run autonomy + script renames (#172) + +## Issue context + +### Problem + +After PR #171 (v0.36.1) the operational scripts work but are scattered, inconsistently named, and require operator handholding mid-run. Goal: single command — approved once — that runs end-to-end and lands clean output. M4+M1 baseline first; cyphers opt-in after baseline lands repeatably. + +Names lie about scope — these scripts run **any list of WSGs**, not just "provincial". + +### Goals + +1. **Single-command autonomous run.** `bash data-raw/.sh ...` runs everything (state-clean → snapshot → dispatch → pull → consolidate → burn cyphers if any) without further prompts. +2. **Any WSG list.** `--wsgs=A,B,C` accepted at the umbrella level, auto-split via LPT across configured hosts. +3. **Any config bundle.** `--config=default` or `bcfishpass`, `--schema=`. +4. **Any host subset.** `--no-cyphers` (M4+M1 only) for the validated baseline; `--cy-workspaces=...` for full distributed. +5. **Rename for honesty.** No more "provincial" / "trifecta" / "bcfishpass" in script names that work for any list/host count/reference. + +### 16-WSG test set + +`CARP, CRKD, FINA, FINL, FIRE, FOXR, INGR, LOMI, MESI, NATR, OSPK, PARA, PARS, PCEA, TOOD, UOMI` + +## Naming decision (yesterday's session, confirmed locked-in today) + +Resolved before this session started; pulled forward into this plan: + +- **Umbrella**: `province_run.sh` → `wsgs_run_pipeline.sh` (typed by user as `wsgs_run_pipeline.R` — confirmed `.R` was a typo for `.sh`, the user is the operator entry point and that's a shell script). +- **Per-host loop**: `run_provincial_parity.R` → `wsgs_run_host.R` (plural `wsgs_` signals collection; suffix `host` signals scope = one host's bucket). +- **Other wrappers**: user picked "Mixed nouns (more descriptive)" — `state_clean.sh`, `progress_check.sh`, `runs_archive.sh`, `buckets_balance.R`, `schema_consolidate.R`. + +The singular/plural distinction `wsg_*` (one WSG operations, from #168) vs `wsgs_*` (collection-level operations) reads naturally now that #168 has shipped `wsg_pipeline_run.R` + `wsg_compare.R`. + +## Architecture shift vs yesterday's first attempt at #172 + +The scab fixes from yesterday's first attempt (smoke auto-skip when `--wsgs`, archive `--config`, phantom-cy from `paste0("cy", integer(0))` returning `"cy"` due to R constant recycling) are mostly **no longer load-bearing** because #168's PG-state resume gate makes the loop idempotent: + +- A stale RDS no longer silently skips a missing pipeline run. +- Operators can re-dispatch with `--force` to bypass all caching. +- Compare-only re-runs (`pipeline_done && !rollup_ok`) cost ~3s vs ~80s for full pipeline+compare. + +What remains genuinely needed: + +- `--wsgs=A,B,C` filter in `trifecta_provincial.sh` SPLIT_R block. +- `--no-cyphers` mode (force `N_CY=0`, skip cypher subprocess + wrap + pullback paths). +- `--force` passthrough to the per-host Rscript. +- Phantom-cy bug fix (still real — `paste0("cy", integer(0))` returns `"cy"` length-1; need explicit `if (n_cy == 0L) character(0)` branch). +- `province_run.sh` arg parser surface and config-aware ANN_CSV path. + +## Cross-repo coordination (rtj) + +`rtj/scripts/cypher/cypher_run.sh:8` references `~/Projects/repo/link/data-raw/run_provincial_parity.R`. After the rename, this reference becomes stale. + +User confirmed: direct commit + push to rtj is fine ("no one but us using this stuff"). Order: link's rename PR merges first, then a one-line update on rtj/main. This way `cypher_run.sh` never references a missing file on link/main. + +Coordinate via Phase 6 of this plan (not via comms thread — direct commit was approved). + +## Reference counts at start of session + +``` +trifecta_provincial: 15 files reference it +run_provincial_parity: 12 +consolidate_schema: 9 +archive_provincial_runs: 7 +balance_provincial_buckets: 6 +``` + +Most are inside data-raw/ scripts that source each other, plus README/runbook docs and CLAUDE.md. Phase 4 walks each rename + reference update in one commit per file (or one bulk commit — TBD during execution). diff --git a/planning/archive/2026-05-issue-172-provincial-run-autonomy-renames/progress.md b/planning/archive/2026-05-issue-172-provincial-run-autonomy-renames/progress.md new file mode 100644 index 0000000..5825aaa --- /dev/null +++ b/planning/archive/2026-05-issue-172-provincial-run-autonomy-renames/progress.md @@ -0,0 +1,22 @@ +# Progress — Provincial run autonomy + script renames (#172) + +## Session 2026-05-14 (afternoon — post-#168) + +- Resumed #172 against the new #168 architecture. PG-state resume + decoupled `wsg_pipeline_run`/`wsg_compare` are now in place, so most of yesterday's first-attempt scab fixes (smoke auto-skip, archive --config, phantom-cy mitigation via fallback paths) became unnecessary. +- Plan-mode exploration — phases approved by user. Locked in yesterday's rename decisions (umbrella = `wsgs_run_pipeline.sh`, per-host loop = `wsgs_run_host.R`, mixed nouns for other wrappers). +- Cypher integration deferred to follow-up — keeps PR scope tight, matches #168 discipline. +- rtj cross-repo update authorized (direct commit + push). +- Created branch `172-provincial-run-autonomy-renames` off main (v0.37.0 baseline). +- Scaffolded PWF baseline (`task_plan.md`, `findings.md`, `progress.md`) with 7 approved phases. +- **Phase 1 done.** Added `--wsgs=`, `--no-cyphers`, `--force` to `trifecta_provincial.sh`. Fixed phantom-cy (R's `paste0("cy", integer(0))` → `"cy"` recycling bug) via 3-branch `cy_host_keys`. Hardened empty-`CY_WORKSPACES` init. `/code-check` round 1 caught a silent-abort bug (R `stop()` exits bash without operator-visible message under `SPLIT_OUT=$(...)`); fixed with explicit `||` block dumping SPLIT_OUT to stderr. Round 2 clean. SPLIT_R logic verified via isolated R run. +- **Phase 2 done.** Added 5 new flags to `province_run.sh`. Gated Step 3+4 (cypher spin/prep), Step 5 cypher archive, Step 9 cypher-source consolidate, and trap-EXIT burn behind `if NO_CYPHERS=0`. Step 7 omits `--cy-workspaces=...` under `--no-cyphers` or `--wsgs`. Step 8 ANN_CSV path is config-aware. Auto-skip-smoke fires when smoke assumptions don't hold. `/code-check` round 1 caught a silent TARGET_SCHEMA fallback bug (masked misconfigured `--config=` with hardcoded "fresh"); fixed with explicit guards. Round 2 clean. +- **Phase 3 done — autonomy validated.** Two-attempt journey: + - **Attempt 1**: 16-WSG dispatch ran fine (16/16 RDS, 17m wall, exit 0) but consolidate hit 6 duplicate-key errors. M1's `fresh_default` had leftover WSGs from yesterday's province-wide dispatch; `pg_dump --schema=fresh_default` pulled rows for WSGs outside the current bucket, colliding with M4's data. Only 12 of 16 landed. + - **Root-cause fix** (Phase 1.5 commit `89da284`): added `state_clean.sh --schemas=` scoped mode (drops only listed schemas, skips canonical-fresh wipe + snapshot reload) + `province_run.sh` Step 0 pre-clean that fires when `--schema=` is set. Empty `--schemas=` guard added per round-1 code-check. + - **Attempt 2** (with pre-clean): 16/16 WSGs in `fresh_default.streams` on M4, 20m wall (pre-clean + cold-cache pipeline + consolidate). Exit 0. No mid-run prompts. Annotated CSV: 343 rows; 66 UNEXPLAINED at ≥2% surfaced as WARNING (methodology divergence for `default` bundle, expected for northern-WSG test set). +- Pre-existing limitation surfaced (consolidate stale-state collision) → resolved as part of #172 scope because the autonomy story requires it; the umbrella now genuinely runs end-to-end without operator handholding even when the cluster has leftover state. +- **Phase 4 done.** 8 scripts renamed via `git mv` (preserves `git log --follow`). Bulk `sed -i ''` applied old→new substitutions across the live tree, then NEWS.md was reverted to keep historical entries sealed. Tree-wide grep for the 8 old names returns empty (after excluding NEWS.md, planning/archive/, planning/active/, data-raw/logs/). `/code-check` round 1 clean — 7 concerns verified including idempotency (no new name contains any old name as substring). +- **Phase 5 done.** 3-WSG smoke via renamed `wsgs_run_pipeline.sh` ran end-to-end in 3m 13s, exit 0, consolidate clean. Pre-existing LPT-fallback edge case surfaced (1-WSG dispatch puts everything on M4 when no timing CSVs exist) — not a rename bug, documented for follow-up. Caught 10 cosmetic `[trifecta-provincial]` log prefixes inside `wsgs_dispatch.sh` (sed missed hyphenated prose); fixed to `[wsgs-dispatch]`. `devtools::test()` 1172 PASS / 0 FAIL. +- **Phase 6 deferred to post-merge.** rtj `cypher_run.sh:8` only has a docstring reference to `run_provincial_parity.R` (no runtime hardcode — script takes the workload path as an argv). Update lands as part of `/gh-pr-merge` after this PR lands. +- **Phase 7 prep done.** `devtools::check()` 0 errors, same warning baseline as v0.37.0. DESCRIPTION 0.37.0 → 0.38.0. NEWS.md v0.38.0 entry written with full rename mapping + CLI surface + Step 0 pre-clean + phantom-cy fix + error-surface fix. +- Next: Release commit, `/planning-archive`, `/gh-pr-push`. diff --git a/planning/archive/2026-05-issue-172-provincial-run-autonomy-renames/task_plan.md b/planning/archive/2026-05-issue-172-provincial-run-autonomy-renames/task_plan.md new file mode 100644 index 0000000..1f961d6 --- /dev/null +++ b/planning/archive/2026-05-issue-172-provincial-run-autonomy-renames/task_plan.md @@ -0,0 +1,102 @@ +# Task: Provincial run autonomy + script renames (#172) + +After #168 shipped (v0.37.0, PG-state resume), this PR adds the CLI surface for autonomous M4+M1 runs and renames 8 operational scripts to noun_verb convention. Cypher integration deferred to a follow-up. + +## Phase 1 — CLI surface on `trifecta_provincial.sh` + +Patch the original filename first so the smoke (Phase 3) validates on the known-good name. Rename in Phase 4. + +- [x] Add `--wsgs=A,B,C` arg parse. SPLIT_R block intersects `all_wsgs` with the `--wsgs` list; errors loud on unknown WSGs via `stop(call. = FALSE)`. Verified end-to-end with `--wsgs=BOGUS,ADMS`. +- [x] Add `--no-cyphers` arg parse. Wipes `CY_WORKSPACES=""` → empty `CY_WS_ARR` → `N_CY=0`; all `for ((i=0; i/`. +- [x] Step 9 consolidate: split into multi-host (M1+cy1+cy2+cy3) vs M1-only branches. Target schema resolved via `--schema` first, else `lnk_config(CONFIG_NAME)$pipeline$schema` lookup with explicit error/empty/NULL guards (round-1 code-check fix; round 1 had silent fallback masking misconfigured `--config=`). +- [x] Auto-skip-smoke when `--no-cyphers` OR `--wsgs` is set. Notice placed AFTER `exec > >(tee -a "$LOG")` redirect so it lands in the log. +- [x] Update usage block. +- [x] `bash -n data-raw/province_run.sh` syntax-clean. +- [x] Empirically verified TARGET_SCHEMA lookup: bcfishpass → "fresh", default → "fresh" (operator must `--schema=fresh_default` for default-bundle isolation), BOGUS → errors loud. +- [x] `/code-check` round 1: 1 real bug (TARGET_SCHEMA fallback) fixed. Round 2 clean. +- [ ] Commit "province_run.sh: --wsgs / --config / --schema / --no-cyphers / --force passthrough" + +## Phase 3 — Integration test (M4+M1, 16-WSG default-bundle, pre-rename) + +- [x] Pre-flight: M4 has bcfp tunnel up, M1 ssh reachable, `fresh.modelled_stream_crossings` present on both hosts. Verified at session start. +- [x] First attempt (no pre-clean) surfaced a consolidate edge case: M1's `fresh_default` had leftover WSGs from yesterday's province-wide run; `pg_dump --schema=fresh_default` pulled rows for WSGs outside the current bucket, colliding with M4's destination data on pg_restore. Six duplicate-key errors; 12 of 16 WSGs landed. +- [x] Root-cause fix: added `state_clean.sh --schemas=` scoped mode + `province_run.sh` Step 0 pre-clean. When `--schema=` is set, umbrella drops the target schema on all hosts BEFORE Step 1. +- [x] Run autonomous (relaunch with pre-clean): + ```bash + bash data-raw/province_run.sh \ + --wsgs=CARP,CRKD,FINA,FINL,FIRE,FOXR,INGR,LOMI,MESI,NATR,OSPK,PARA,PARS,PCEA,TOOD,UOMI \ + --config=default --schema=fresh_default --no-cyphers --with-mapping-code --force + ``` +- [x] Acceptance: exit code 0, 20m wall (under 30-40 min budget), no operator prompts mid-run. +- [x] Verified: `fresh_default.streams` = **16/16 WSGs** on M4, 468,631 rows. CARP,CRKD,FINA,FINL,FIRE,FOXR,INGR,LOMI,MESI,NATR,OSPK,PARA,PARS,PCEA,TOOD,UOMI all present. +- [x] Per-species habitat tables: bt, gr, ko, rb + barriers (correct for the geographic test set — northern WSGs without CH/CO/SK/ST presence). +- [x] Annotated CSV: `data-raw/logs/provincial_default/202605141658_annotated.csv` — 343 rows (263 NOT_APPLICABLE + 66 UNEXPLAINED + 14 WITHIN_TOLERANCE). 66 UNEXPLAINED at ≥2% surfaced as WARNING (methodology divergence, expected for `default` vs bcfishpass). +- [x] Consolidate (M1 → M4) succeeded — no duplicate-key conflicts now that pre-clean handles stale state. + +## Phase 4 — Rename 8 scripts + update all live references + +- [x] `git mv` all 8 renames (preserves `git log --follow`). +- [x] `sed -i ''` across live tree applies all 8 old→new substitutions atomically. Order chosen to avoid prefix collisions; verified no new name contains any old name as substring (sed map is idempotent). +- [x] Internal references updated in renamed files: usage blocks, `Rscript wsgs_run_host.R` invocations, log-filename literals (`${TS}_wsgs_dispatch_*`), cross-script `bash` calls. +- [x] Updated `data-raw/README.md`, `data-raw/trifecta_smoke.sh`, `data-raw/query_schema_delta.R`, `wsg_compare.R`, `wsg_pipeline_run.R`. +- [x] Updated `research/*.md` (runbook, handoff, parity docs). +- [x] Updated `CLAUDE.md`, `R/utils.R` (one-line docstring ref). +- [x] **NOT updated** (sealed): `NEWS.md` historical entries (reverted after sed swept them), `planning/archive/**`. +- [x] `bash -n` clean on all renamed shell scripts (wsgs_run_pipeline.sh, state_clean.sh, progress_check.sh, wsgs_dispatch.sh, runs_archive.sh) + trifecta_smoke.sh sibling. +- [x] `/code-check` round 1 clean (all 7 concerns verified: bash syntax, cross-refs, log literals, Rscript invocations, tree-wide grep empty, idempotency, R/utils.R docstring). +- [ ] Commit "Rename 8 operational scripts to noun_verb convention" + +## Phase 5 — Smoke after rename + +- [x] First attempt with 1-WSG `--wsgs=DEAD --config=bcfishpass --no-cyphers` surfaced a pre-existing LPT-fallback edge case: with no timing CSVs available for bcfishpass, the weighted split puts all WSGs on M4 → M1 empty bucket → empty-bucket guard fires. Not a rename bug — same behavior on main pre-rename. Not in scope to fix here; documented for follow-up. +- [x] Re-launched with `--wsgs=DEAD,ADMS,CRKD --config=default --schema=fresh_default --no-cyphers --with-mapping-code` (timing CSVs for `default` bundle exist from Phase 3 runs → LPT does a proper greedy split): + - Exit 0, 3m 13s wall, 18/3 OK (pulled 18 RDS including legacy provincial_default/ artifacts), 0 errors. + - `wsgs_run_pipeline.sh complete in 193s` — umbrella ran end-to-end under the new name. + - Step 0 pre-clean dropped `fresh_default` on both hosts; Step 7 dispatch via `wsgs_dispatch.sh` → `wsgs_run_host.R`; Step 9 consolidate succeeded. + - Annotated CSV: `data-raw/logs/provincial_default/202605141757_annotated.csv` (427 rows, 84 UNEXPLAINED at ≥2%, methodology divergence vs bcfp, expected). +- [x] Cosmetic fixup: 10 occurrences of `[trifecta-provincial]` log prefix inside `wsgs_dispatch.sh` swept to `[wsgs-dispatch]` (sed only matched filenames originally, not hyphenated prose). Fold into the rename commit history. +- [x] `devtools::test()` — 1172 PASS / 0 FAIL. +- [x] Reverted M1 to main + dropped the test branch (for now); will re-pull when PR lands. — see Phase 6 cross-repo note below. + +## Phase 6 — Cross-repo rtj update (DEFERRED to post-merge) + +Lands after link's rename PR merges so `cypher_run.sh` never references a missing file on link/main. Will be done as part of `/gh-pr-merge` workflow. + +- [ ] In `~/Projects/repo/rtj`, update `scripts/cypher/cypher_run.sh` reference from `run_provincial_parity.R` → `wsgs_run_host.R` (only the docstring at `scripts/cypher/cypher_run.sh:8` references the old name — confirmed; the script itself takes the workload-script path as an arg so no runtime hardcode). +- [ ] `bash -n scripts/cypher/cypher_run.sh` clean. +- [ ] Commit + push on rtj/main. + +## Phase 7 — Release v0.38.0 + +- [x] `devtools::check()` — 0 errors, same warning baseline as v0.37.0 (3 warnings + 2 notes all pre-existing on main; no regression). +- [x] Update `DESCRIPTION` Version 0.37.0 → 0.38.0. +- [x] Update `NEWS.md` with v0.38.0 entry covering CLI surface (5 new flags), Step 0 pre-clean, scoped `state_clean.sh --schemas=`, phantom-cy / error-surface fixes, and the 8-rename mapping table. +- [x] `CLAUDE.md` already updated via Phase 4 sed pass (one line in the Status v0.29.0 entry mentioned `consolidate_schema`). +- [ ] Commit "Release v0.38.0". +- [ ] `/planning-archive` with slug `provincial-run-autonomy-renames`. +- [ ] `/gh-pr-push` opens PR with SRED tag in body. +- [ ] After merge: `/gh-pr-merge` handles tag + post-merge CI watch + rtj coordination. + +## Validation + +- [ ] Tests pass (`devtools::test()`) +- [ ] `/code-check` clean on each commit +- [ ] PWF checkboxes match landed work +- [ ] `/planning-archive` on completion diff --git a/research/bcfp_compare_mapping_code.md b/research/bcfp_compare_mapping_code.md index 184aae5..6a3a0bf 100644 --- a/research/bcfp_compare_mapping_code.md +++ b/research/bcfp_compare_mapping_code.md @@ -150,7 +150,7 @@ Eight species columns: `mapping_code_bt`, `mapping_code_ch`, builds `.crossings` + `.barriers_anthropogenic` / `barriers_pscis` / `barriers_dams` / `barriers_remediations` from primitives. These are the inputs to `lnk_pipeline_access(barrier_sources = ...)`. -- The provincial parity script `data-raw/run_provincial_parity.R` runs +- The provincial parity script `data-raw/wsgs_run_host.R` runs phases 1–6 + km/ha rollup against bcfp. It does *not* run phases 7–8 (access + mapping_code). The mapping_code comparison is its own driver — this document plus `compare_bcfp_mapping_code.R`. diff --git a/research/distributed_2hosts_2026_05_01.md b/research/distributed_2hosts_2026_05_01.md index 364c0a0..27eeeb0 100644 --- a/research/distributed_2hosts_2026_05_01.md +++ b/research/distributed_2hosts_2026_05_01.md @@ -3,7 +3,7 @@ **Date**: 2026-05-01 08:12–08:18 PDT **Versions**: link 0.21.0, fresh 0.26.0, bcfishpass 440bc1e **Hosts**: M4 Max + M1 over Tailscale, both with local Docker fwapg :5432 + bcfp tunnel :63333 -**Coordination**: ad-hoc — `Rscript run_provincial_parity.R --wsgs=` per host, rsync per-WSG RDS files at end +**Coordination**: ad-hoc — `Rscript wsgs_run_host.R --wsgs=` per host, rsync per-WSG RDS files at end ## Headline @@ -27,7 +27,7 @@ The minor `bcfishobs` row-count delta between M4 (372,420) and M1 (372,505 from ## Architecture -Each host runs `run_provincial_parity.R --wsgs=` against its own: +Each host runs `wsgs_run_host.R --wsgs=` against its own: - writable Docker fwapg on :5432 (mutates `fresh.streams` per WSG) - bcfp reference DB on :63333 (read-only via SSH tunnel to db_newgraph) @@ -46,12 +46,12 @@ No shared filesystem. No coordination layer. Manual partition driven by per-WSG ```bash # m4 mkdir -p data-raw/logs/provincial_parity -Rscript data-raw/run_provincial_parity.R --wsgs=BULK,HARR,DEAD \ +Rscript data-raw/wsgs_run_host.R --wsgs=BULK,HARR,DEAD \ > data-raw/logs/_dist_m4.txt 2>&1 & # m1 (over Tailscale) ssh m1 "cd /Users/airvine/Projects/repo/link && nohup bash -c \ - '/usr/bin/time -p Rscript data-raw/run_provincial_parity.R --wsgs=ELKR,LFRA,VICT \ + '/usr/bin/time -p Rscript data-raw/wsgs_run_host.R --wsgs=ELKR,LFRA,VICT \ > data-raw/logs/_dist_m1.txt 2>&1' > /dev/null 2>&1 < /dev/null &" # wait for both, then rsync diff --git a/research/post_compact_provincial_handoff.md b/research/post_compact_provincial_handoff.md index a989d5b..c0abdc3 100644 --- a/research/post_compact_provincial_handoff.md +++ b/research/post_compact_provincial_handoff.md @@ -6,7 +6,7 @@ Read this first if you're a fresh Claude session and the user asks you to run th - `lnk_compare_wsg()` + `lnk_parity_annotate()` — exported library functions - `research/bcfp_divergence_taxonomy.yml` — 11 verified-mechanism entries -- 5-host N-cypher orchestrator (`data-raw/trifecta_provincial.sh`) +- 5-host N-cypher orchestrator (`data-raw/wsgs_dispatch.sh`) - Phase 7 hardening: DDL drift detection, smoke fail-fast, log visibility, truth-in-headline - 5-WSG audit (ADMS/SETN/HORS/BULK/THOM) hit 0 UNEXPLAINED at |diff_pct|>=2% @@ -85,10 +85,10 @@ wait # If any cypher's prep fails, surface to user and STOP. # === Step 5: archive prior RDS on ALL hosts (cross-host smoke gotcha from 2026-05-12) === -cd ~/Projects/repo/link/data-raw && bash archive_provincial_runs.sh -ssh m1 'cd ~/Projects/repo/link/data-raw && ./archive_provincial_runs.sh' +cd ~/Projects/repo/link/data-raw && bash runs_archive.sh +ssh m1 'cd ~/Projects/repo/link/data-raw && ./runs_archive.sh' for IP in ; do - ssh "cypher@$IP" 'cd ~/Projects/repo/link/data-raw && ./archive_provincial_runs.sh' & + ssh "cypher@$IP" 'cd ~/Projects/repo/link/data-raw && ./runs_archive.sh' & done wait @@ -100,17 +100,17 @@ if [ $SMOKE_RC -ne 0 ]; then # Smoke caught an error stub. STOP. grep -E "smoke.*FAILED|smoke.*ERROR" /tmp/smoke.log # Inspect: - # data-raw/logs/_trifecta_provincial_cypher__R.txt + # data-raw/logs/_wsgs_dispatch_cypher__R.txt # DO NOT proceed to step 7 until smoke is clean. fi # === Step 7: FULL PROVINCIAL DISPATCH (~80-95 min wall) === cd ~/Projects/repo/link/data-raw -nohup bash trifecta_provincial.sh \ +nohup bash wsgs_dispatch.sh \ --cy-workspaces=job1,job2,job3 \ --with-mapping-code > /tmp/full_run.log 2>&1 & # Check completion via process list + log tail -ps -ef | grep trifecta_provincial.sh | grep -v grep | wc -l # 0 when done +ps -ef | grep wsgs_dispatch.sh | grep -v grep | wc -l # 0 when done tail -10 /tmp/full_run.log # Expected final headline: "local RDS: 217/217 pulled — 217 OK, 0 errors" @@ -130,20 +130,20 @@ if (nrow(unexp) > 0) print(head(unexp[, c('wsg','species','habitat_type','link_v # === Step 9: consolidate fresh schema (m1 + 3 cyphers -> M4) === # Extract per-host buckets from orchestrator log: -ORCH_LOG=$(ls -1t data-raw/logs/*_trifecta_provincial_orchestrator.txt | head -1) +ORCH_LOG=$(ls -1t data-raw/logs/*_wsgs_dispatch_orchestrator.txt | head -1) M1_BUCKET=$(grep '^ m1 bucket:' "$ORCH_LOG" | sed 's/.*bucket: //') CY1_BUCKET=$(grep '^ cypher\[job1\] bucket:' "$ORCH_LOG" | sed 's/.*bucket: //') CY2_BUCKET=$(grep '^ cypher\[job2\] bucket:' "$ORCH_LOG" | sed 's/.*bucket: //') CY3_BUCKET=$(grep '^ cypher\[job3\] bucket:' "$ORCH_LOG" | sed 's/.*bucket: //') -# Then invoke consolidate_schema.R (DBI calls, needs PG_PASS_SHARE): +# Then invoke schema_consolidate.R (DBI calls, needs PG_PASS_SHARE): cd ~/Projects/repo/link/data-raw M1_BUCKET="$M1_BUCKET" CY1_BUCKET="$CY1_BUCKET" CY2_BUCKET="$CY2_BUCKET" CY3_BUCKET="$CY3_BUCKET" \ CY1_IP= CY2_IP= CY3_IP= \ Rscript -e ' suppressPackageStartupMessages({library(link)}) -source("consolidate_schema.R") -result <- consolidate_schema( +source("schema_consolidate.R") +result <- schema_consolidate( schema = "fresh", sources = list( list(host = "m1", via = "docker", bucket = strsplit(Sys.getenv("M1_BUCKET"), ",")[[1]]), @@ -185,7 +185,7 @@ Use `trap EXIT` defense if you write a wrapper. Without a wrapper, you do this m **Trigger**: No `_per_wsg_times.csv` files in `data-raw/logs/provincial_parity/` at the top level. Happens if you archive prior runs (which moves the timing CSVs to `archive//`) without inheriting prior timing data into the new run. -**Fix landed 2026-05-13** (`trifecta_provincial.sh` SPLIT_R block): when no timing CSV exists, the fallback now uses host_speeds-weighted alphabetical split (`floor(n * speed_factor / sum(speed_factors))` per host, remainder to highest-factor hosts). Log line is now `[LPT] no timing CSVs found; using host_speeds-weighted split` and reports per-host bucket sizes. +**Fix landed 2026-05-13** (`wsgs_dispatch.sh` SPLIT_R block): when no timing CSV exists, the fallback now uses host_speeds-weighted alphabetical split (`floor(n * speed_factor / sum(speed_factors))` per host, remainder to highest-factor hosts). Log line is now `[LPT] no timing CSVs found; using host_speeds-weighted split` and reports per-host bucket sizes. **Tradeoff vs LPT-with-timings**: still doesn't know which specific WSGs are heavy (BULK / THOM heavyweights vs lightweight DEAD / ELKR). LPT-with-timings places heavyweights first; weighted-split is alphabetical. Better than equal split, worse than real LPT. The remedy: after the first successful provincial run, the per_wsg_times.csv landing in the top level gives the next dispatch real LPT data. @@ -195,7 +195,7 @@ All hosts run the link pipeline against their OWN local fwapg, and ALL HOSTS run | Host | How it reaches bcfp | |---|---| -| M4 | Operator's persistent tunnel + `trifecta_provincial.sh` opens an idempotent inline tunnel as backup (no harm if already up — bind fails silently, existing tunnel preserved) | +| M4 | Operator's persistent tunnel + `wsgs_dispatch.sh` opens an idempotent inline tunnel as backup (no harm if already up — bind fails silently, existing tunnel preserved) | | M1 | **Reverse forward from M4** (`ssh -R 63333:127.0.0.1:63333 m1`) — M1's localhost:63333 → M4's localhost:63333 → db_newgraph:5432. M1 does NOT need its own working db_newgraph identity. | | cypher[jobN] | Opens its OWN ssh tunnel via `ssh -L 63333 db_newgraph -N &` inside the generated wrapper script. Cyphers have `~/.ssh/id_db_newgraph` (passphrase-free, authorized on db_newgraph). | @@ -236,12 +236,12 @@ History: Prior runs (e.g. 2026-05-12 with 67 M1 WSGs OK) worked only because the | Diagnostic recipes per Class | `research/bcfp_divergence_investigation.md` | | Operational runbook | `research/provincial_run_runbook.md` | | Latest live run record | `research/provincial_parity_2026_05_12.md` | -| Orchestrator | `data-raw/trifecta_provincial.sh` | +| Orchestrator | `data-raw/wsgs_dispatch.sh` | | Smoke shim | `data-raw/trifecta_smoke.sh` | | Per-cypher prep | `data-raw/cypher_prep.sh` (defaults to `main` branch) | | Snapshot loader | `data-raw/snapshot_bcfp.sh` (with `--force` flag) | -| Archive helper | `data-raw/archive_provincial_runs.sh` | -| Consolidation | `data-raw/consolidate_schema.R` (R, not shell — pre/post row delta verification) | +| Archive helper | `data-raw/runs_archive.sh` | +| Consolidation | `data-raw/schema_consolidate.R` (R, not shell — pre/post row delta verification) | | Cypher infra | `~/Projects/repo/rtj/scripts/cypher/cypher_{up,down,run}.sh --workspace ` | | Findings from Phase 7 | `planning/archive/2026-05-link162-lnk-compare-wsg-annotated-csv/findings.md` | | Memory state | `~/.claude/projects/-Users-airvine-Projects-repo-link/memory/project_link_state.md` | diff --git a/research/provincial_parity_2026_05_01.md b/research/provincial_parity_2026_05_01.md index 3c54a51..89f729c 100644 --- a/research/provincial_parity_2026_05_01.md +++ b/research/provincial_parity_2026_05_01.md @@ -161,7 +161,7 @@ The genuine "needs work" set is **Class D and the unresolved part of C**. Maybe - `data-raw/logs/20260501_0251_provincial_parity_per_wsg.csv` — per-WSG summary - `data-raw/logs/provincial_parity/.rds` — per-WSG output, 217 files - `data-raw/logs/20260430_2155_provincial_parity.txt` — run log -- `data-raw/run_provincial_parity.R` — reusable runner script +- `data-raw/wsgs_run_host.R` — reusable runner script ## Next diff --git a/research/provincial_parity_2026_05_11.md b/research/provincial_parity_2026_05_11.md index d53f358..a3c39ca 100644 --- a/research/provincial_parity_2026_05_11.md +++ b/research/provincial_parity_2026_05_11.md @@ -5,7 +5,7 @@ **Software**: link 0.35.0 (sha 8f8c7b6 + #157 dispatch fix), fresh 0.31.0, bcfp reference `bcfishpass@v0.7.14-125-g6e9cf1c` via tunnel `db_newgraph` **Configuration**: bcfishpass bundle parity only **Source data**: 232 candidate WSGs filtered → 217 dispatched (link#157), 15 known-empty WSGs excluded from dispatch -**Source log**: `data-raw/logs/202605112010_trifecta_provincial_orchestrator.txt` +**Source log**: `data-raw/logs/202605112010_wsgs_dispatch_orchestrator.txt` **Output**: 232 RDS files in `data-raw/logs/provincial_parity/*.rds` (15 stub-error from this run, 217 OK) **Aggregate**: 1,647 comparable rollup rows (after dropping `-100%` lake/wetland centerline artifacts and NA-baseline rows) @@ -52,10 +52,10 @@ Single-host baseline (2026-05-01): 4h 55min. 3-host LPT split saved **~3 hours** | M1 | Allans MacBook Pro (tailnet) | 0.83 (faster!) | 100 (102 − 7 errors) | 114.7 min | 64s | | cypher | DO droplet (g-8vcpu-32gb) | 1.83 | 46 | 88.6 min | 97s | -LPT (Longest Processing Time first) bin-packing in `data-raw/balance_provincial_buckets.R` weights each WSG by its `m4_equiv` time, then assigns to the host whose projected finish time would be shortest. Cypher gets the fewest WSGs because its per-WSG cost is 1.83× M4's; M1 gets the most because it's slightly faster than M4 per-WSG. Predicted wall was 155.5 min vs actual 114.7 min — predictions tracked within 25%. +LPT (Longest Processing Time first) bin-packing in `data-raw/buckets_balance.R` weights each WSG by its `m4_equiv` time, then assigns to the host whose projected finish time would be shortest. Cypher gets the fewest WSGs because its per-WSG cost is 1.83× M4's; M1 gets the most because it's slightly faster than M4 per-WSG. Predicted wall was 155.5 min vs actual 114.7 min — predictions tracked within 25%. **Operational notes:** -- `data-raw/trifecta_provincial.sh` orchestrates dispatch via SSH + tailnet (`m1`) + reserved-IP SSH (`cypher@24.144.70.121`). cypher gets its bcfp-tunnel via in-script SSH local-forward `-L 63333:127.0.0.1:5432 db_newgraph`. +- `data-raw/wsgs_dispatch.sh` orchestrates dispatch via SSH + tailnet (`m1`) + reserved-IP SSH (`cypher@24.144.70.121`). cypher gets its bcfp-tunnel via in-script SSH local-forward `-L 63333:127.0.0.1:5432 db_newgraph`. - M1 + cypher needed a one-time data sync of `cabd.dams` (1.9 MB), `whse_fish.pscis_assessment_svw` (18 MB), `fresh.modelled_stream_crossings` (380 MB) from M4 via `pg_dump | ssh docker exec psql`. `snapshot_bcfp.sh` is the canonical loader but those hosts didn't have it configured. - cypher's fresh+link install required `R CMD INSTALL --no-test-load` because pak tried to upgrade `sf` and conflicted with the host's conda-managed GDAL; downgrading to `R CMD INSTALL` kept the existing `sf 1.1.0` intact. @@ -228,11 +228,11 @@ for HOST in m1 cypher@24.144.70.121; do done # 4. Compute LPT-balanced buckets -Rscript data-raw/balance_provincial_buckets.R +Rscript data-raw/buckets_balance.R # Copy the --m4-bucket= / --m1-bucket= / --cy-bucket= overrides into the next step # 5. Dispatch -cd data-raw && ./trifecta_provincial.sh --m4-bucket=... --m1-bucket=... --cy-bucket=... +cd data-raw && ./wsgs_dispatch.sh --m4-bucket=... --m1-bucket=... --cy-bucket=... # 6. After completion: consolidate per-host RDS files (auto-pulled by trifecta script) # 7. Aggregate: source /tmp/summary.R-style script over data-raw/logs/provincial_parity/*.rds @@ -242,8 +242,8 @@ Wall: ~2 hours with current LPT factors (M4=1.0, M1=0.83, cy=1.83). Each provinc ## Files -- Trifecta orchestrator log: `data-raw/logs/202605112010_trifecta_provincial_orchestrator.txt` -- Per-host run logs: `data-raw/logs/202605112010_trifecta_provincial_{m4,m1,cypher}.txt` +- Trifecta orchestrator log: `data-raw/logs/202605112010_wsgs_dispatch_orchestrator.txt` +- Per-host run logs: `data-raw/logs/202605112010_wsgs_dispatch_{m4,m1,cypher}.txt` - Per-WSG timing CSVs: `data-raw/logs/provincial_parity/20260511_2010_{m4,m1,cy}_per_wsg_times.csv` - Per-WSG rollup RDS: `data-raw/logs/provincial_parity/.rds` - Aggregate summary: `/tmp/provincial_summary.rds` (regenerate via `/tmp/summary.R`) @@ -323,7 +323,7 @@ Bucket allocation was identity-deterministic from yesterday's per-WSG times (yes ### Consolidation to M4 (province-wide `fresh` schema) -After per-host runs, `data-raw/consolidate_schema.R` pg_dumps fresh schema from m1 + cypher and pg_restores to M4. Hit two recoverable issues: +After per-host runs, `data-raw/schema_consolidate.R` pg_dumps fresh schema from m1 + cypher and pg_restores to M4. Hit two recoverable issues: 1. **Cypher restore (52 WSGs)**: clean. Reported `ok=FALSE` in the result list but data did land (script's `ok` boolean false-positives on certain pg_restore warning patterns — investigate later). 2. **M1 restore (90 WSGs)**: pg_restore aborted COPYs for `streams_habitat_` tables due to duplicate-key violations. Root cause: M1's local `streams_habitat_` carries cross-run residuals (yesterday's runs left rows for WSGs that today went to different hosts), and `pg_restore --data-only` doesn't filter by today's bucket — it tries to insert ALL of M1's rows. Surgical recovery: `DELETE FROM fresh. WHERE watershed_group_code IN (today's M1 bucket)` on M4, then per-table `\COPY (SELECT ... WHERE wsg IN ...) FROM STDIN`. All 8 species habitat tables recovered cleanly. @@ -350,7 +350,7 @@ Per-species WSG counts reflect species presence: BT in 136 WSGs (most widespread - **rtj#129** (filed) — bake conda-geo-activate into cypher snapshot (`/etc/profile.d/conda-geo-activate.sh`). - **link#159** (filed) — wrap per-WSG body in `tryCatch({...}, finally = drop_working_schema)` for error-path cleanup. -- **consolidate_schema.R** — investigate the m1 `streams_habitat_` pg_restore failure pattern. Probably need a `--clean-wsg-keys` arg that DELETEs source-host-bucket rows on dest before pg_restore. +- **schema_consolidate.R** — investigate the m1 `streams_habitat_` pg_restore failure pattern. Probably need a `--clean-wsg-keys` arg that DELETEs source-host-bucket rows on dest before pg_restore. - **Full provincial mapping_code re-run** to validate #158 fix province-wide (BBAR + THOM confirmed ≥99.93%; remaining 215 WSGs extrapolated). Deferred — ~2 hr separate run. ### Runbook ratification diff --git a/research/provincial_parity_2026_05_12.md b/research/provincial_parity_2026_05_12.md index e50f56d..f7c4f73 100644 --- a/research/provincial_parity_2026_05_12.md +++ b/research/provincial_parity_2026_05_12.md @@ -9,8 +9,8 @@ - Per-WSG RDS: `data-raw/logs/provincial_parity/*.rds` - Per-WSG mapping_code stats embedded in each list-shape RDS - Aggregate annotated CSV: `data-raw/logs/provincial_parity/__TS___annotated.csv` (output of `lnk_parity_annotate()` against `research/bcfp_divergence_taxonomy.yml`) -- Per-host run logs: `data-raw/logs/202605122221_trifecta_provincial_*.txt` -- Orchestrator log: `data-raw/logs/202605122221_trifecta_provincial_orchestrator.txt` +- Per-host run logs: `data-raw/logs/202605122221_wsgs_dispatch_*.txt` +- Orchestrator log: `data-raw/logs/202605122221_wsgs_dispatch_orchestrator.txt` This is the **first run that exercises link#162's full machinery**: inline LPT bucket allocation (Phase 5), N-cypher dispatch via tofu workspaces, post-pull `lnk_parity_annotate()` against the divergence taxonomy YAML, the `lnk_compare_wsg` library function (replacing the inline `compare_bcfishpass_wsg.R` SQL), and the `mapping_code` branch (per-segment per-species token-level parity). @@ -177,7 +177,7 @@ If the diff count is non-zero AND those segments appear in the WSG's mapping_cod This run surfaced several gotchas worth codifying for the wrapper script (`data-raw/run_phase7.sh` follow-up): -1. **Cross-host archival before run.** `archive_provincial_runs.sh` ran on M4 only initially; M1's stale RDS got SCP-pulled back during the post-pull step, polluting the aggregate annotation. Fix: archive on ALL hosts (M4 + M1 + all cyphers) before dispatch. Now codified in the recommended cadence in `data-raw/README.md`. +1. **Cross-host archival before run.** `runs_archive.sh` ran on M4 only initially; M1's stale RDS got SCP-pulled back during the post-pull step, polluting the aggregate annotation. Fix: archive on ALL hosts (M4 + M1 + all cyphers) before dispatch. Now codified in the recommended cadence in `data-raw/README.md`. 2. **bcfp coverage gap is real, not a bug.** bcfp's 2026-05-12 build models 187 WSGs; we dispatch 217. The 36-WSG delta caused `with_mapping_code = TRUE` to stop loudly when it should have warned + returned NA. Fix (commit __TBD__): `.lnk_compare_wsg_mapping_code_diff` distinguishes (a) bcfp 0 rows → warn + NA fill, (b) bcfp has rows but no merge → stop loud (real misalignment). @@ -187,8 +187,8 @@ This run surfaced several gotchas worth codifying for the wrapper script (`data- ## Files -- Orchestrator log: `data-raw/logs/202605122221_trifecta_provincial_orchestrator.txt` -- Per-host run logs: `data-raw/logs/202605122221_trifecta_provincial_{m4,m1,cypher_job1,cypher_job2,cypher_job3}.txt` +- Orchestrator log: `data-raw/logs/202605122221_wsgs_dispatch_orchestrator.txt` +- Per-host run logs: `data-raw/logs/202605122221_wsgs_dispatch_{m4,m1,cypher_job1,cypher_job2,cypher_job3}.txt` - Per-host timing CSVs: `data-raw/logs/provincial_parity/__TS___{m4,m1,cy}_per_wsg_times.csv` - Per-WSG rollup RDS: `data-raw/logs/provincial_parity/*.rds` - Aggregate annotated CSV: `data-raw/logs/provincial_parity/__TS___annotated.csv` diff --git a/research/provincial_run_runbook.md b/research/provincial_run_runbook.md index 1b04367..caa2010 100644 --- a/research/provincial_run_runbook.md +++ b/research/provincial_run_runbook.md @@ -18,13 +18,13 @@ Companion docs: ```bash cd ~/Projects/repo/link/data-raw -./archive_provincial_runs.sh # 1. clean LPT input +./runs_archive.sh # 1. clean LPT input # (spin cyphers per §1) ./trifecta_smoke.sh --cy-workspaces=job1,job2,job3 # 2. smoke (~3 min) # (if smoke errors loud, fix and re-run; DO NOT skip to full) -./trifecta_provincial.sh --with-mapping-code --cy-workspaces=job1,job2,job3 # 3. full (~80 min) +./wsgs_dispatch.sh --with-mapping-code --cy-workspaces=job1,job2,job3 # 3. full (~80 min) # (inspect annotated CSV) -# (consolidate fresh schema via consolidate_schema.R) +# (consolidate fresh schema via schema_consolidate.R) ~/Projects/repo/rtj/scripts/cypher/cypher_down.sh --workspace job1 # 4. burn (mandatory) ~/Projects/repo/rtj/scripts/cypher/cypher_down.sh --workspace job2 ~/Projects/repo/rtj/scripts/cypher/cypher_down.sh --workspace job3 @@ -145,10 +145,10 @@ After this, `lnk_persist_init` future calls (during dispatch) are silent no-ops. ```bash cd ~/Projects/repo/link/data-raw -./archive_provincial_runs.sh -ssh m1 'cd ~/Projects/repo/link/data-raw && ./archive_provincial_runs.sh' +./runs_archive.sh +ssh m1 'cd ~/Projects/repo/link/data-raw && ./runs_archive.sh' for IP in $JOB1_IP $JOB2_IP $JOB3_IP; do - ssh "cypher@$IP" 'cd ~/Projects/repo/link/data-raw && ./archive_provincial_runs.sh' & + ssh "cypher@$IP" 'cd ~/Projects/repo/link/data-raw && ./runs_archive.sh' & done wait ``` @@ -166,8 +166,8 @@ cd ~/Projects/repo/link/data-raw **Exits non-zero** if any host produced an error stub. Inspect: ``` -data-raw/logs/_trifecta_provincial_*.txt # orchestrator + per-host -data-raw/logs/_trifecta_provincial_cypher_*_R.txt # cypher R output (auto-pulled) +data-raw/logs/_wsgs_dispatch_*.txt # orchestrator + per-host +data-raw/logs/_wsgs_dispatch_cypher_*_R.txt # cypher R output (auto-pulled) ``` Common smoke failures + fixes: @@ -186,7 +186,7 @@ compute per failure mode. ```bash cd ~/Projects/repo/link/data-raw -./trifecta_provincial.sh \ +./wsgs_dispatch.sh \ --with-mapping-code \ --cy-workspaces=job1,job2,job3 \ > /tmp/full_run.log 2>&1 & @@ -227,8 +227,8 @@ taxonomy entries + re-annotate without rerunning the pipeline. ```bash cd ~/Projects/repo/link/data-raw Rscript -e ' -source("consolidate_schema.R") -result <- consolidate_schema( +source("schema_consolidate.R") +result <- schema_consolidate( schema = "fresh", sources = list( list(host = "m1", via = "docker", bucket = strsplit("WSG1,WSG2,...", ",")[[1]]), @@ -240,7 +240,7 @@ result <- consolidate_schema( ``` Bucket strings come from the orchestrator log (per-host bucket lines under -`[trifecta-provincial] dispatch start`). `consolidate_schema` (Phase 6) does +`[trifecta-provincial] dispatch start`). `schema_consolidate` (Phase 6) does bucket-aware DELETE + pg_restore + pre/post row-count assertion. ## 6. Burn cyphers — MANDATORY @@ -298,8 +298,8 @@ Required sections: |---|---|---| | `snapshot_bcfp.sh` | Load PSCIS + CABD + bchamp + bcfishobs into local fwapg | ~5-15 min | | `cypher_up.sh --workspace ` | Spin DO droplet from `cypher--warm` snapshot | ~3 min | -| `archive_provincial_runs.sh` | Move prior-run RDS + CSVs to archive/ | <1s | +| `runs_archive.sh` | Move prior-run RDS + CSVs to archive/ | <1s | | `trifecta_smoke.sh` | 1-WSG-per-host smoke; fails loud on any error stub | ~3 min | -| `trifecta_provincial.sh` | N-cypher full dispatch with inline LPT + annotation | ~80 min (5-host) | -| `consolidate_schema.R` | pg_dump from m1+cyphers → pg_restore on M4 | ~5 min | +| `wsgs_dispatch.sh` | N-cypher full dispatch with inline LPT + annotation | ~80 min (5-host) | +| `schema_consolidate.R` | pg_dump from m1+cyphers → pg_restore on M4 | ~5 min | | `cypher_down.sh --workspace ` | Destroy DO droplet (idempotent) | ~30s |