Decouple bcfp compare from link pipeline run — separate compare-only step #168

@NewGraphEnvironment

Principle

Modelling stands on its own. The link pipeline produces the BC freshwater network model — PG fresh.* tables. That's the deliverable. It must be runnable end-to-end without any comparison framework attached.

Comparison is separate. Parity vs bcfishpass (or any future reference: federal data, regression-against-previous-run, etc.) is a diagnostic overlay on top of the canonical model. It reads from PG, produces side artifacts (RDS, annotated CSV), and never gates whether the model itself ran.

Today these are coupled: modelling and comparison live inside one function (in compare_bcfishpass_wsg.R) called from one loop (run_provincial_parity.R). The loop's resume-check tests for RDS existence (the comparison artifact), not PG state (the model). Result: a stale comparison RDS silently skips re-running the model, even when PG never received the model output.

Concrete reproduction (2026-05-14)

While running a 16-WSG --no-cyphers integration test on the #172 branch, the dispatch reported "16 OK, 0 errors", but only 12 of 16 WSGs actually populated fresh_default.streams. Four WSGs (FINA, CRKD on M4; INGR, MESI on M1) had stale RDS files from earlier failed attempts. The cache-skip in run_provincial_parity.R:200-205 saw the RDS and skipped them:

for (w in wsgs) {
  out_rds <- file.path(out_dir, paste0(w, ".rds"))
  if (file.exists(out_rds)) {
    cat(format(Sys.time(), "%H:%M:%S"), "  ", w, " (cached, skip)\n", sep = "")
    next   # ← skips PIPELINE *and* compare; PG may be empty
  }
  ...
  out <- compare_bcfishpass_wsg(wsg = w, config = cfg, ...)  # ← pipeline + compare bundled
  saveRDS(out, out_rds)
}

Reference run: data-raw/logs/202605141122_trifecta_provincial_orchestrator.txt.
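A mismatch like the one above could be surfaced with a small diagnostic. The following is a minimal sketch, not part of the existing scripts: `stale_rds_wsgs()` is a hypothetical helper, and it assumes the caller already has the vector of WSG codes present in PG (for example from a `SELECT DISTINCT` on the streams table).

```r
# Hypothetical diagnostic: list WSGs that have a cached RDS on disk
# but no model output in PG. `wsgs_in_pg` is assumed to be a character
# vector of WSG codes the caller has already queried from PG.
stale_rds_wsgs <- function(out_dir, wsgs_in_pg) {
  rds_files <- list.files(out_dir, pattern = "\\.rds$")
  cached <- sub("\\.rds$", "", rds_files)   # "FINA.rds" -> "FINA"
  sort(setdiff(cached, wsgs_in_pg))         # cached on disk, absent from PG
}
```

Any WSG this returns would be skipped by the current RDS-only resume check even though the model never landed in PG.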

Proposed: decoupled architecture

Two independent functions

| Function | Role | Input | Output |
|---|---|---|---|
| lnk_pipeline_run(conn, wsg, cfg, ...) | Model: runs the lnk_pipeline_* chain end-to-end | local fwapg conn | PG: fresh.streams, fresh.streams_habitat_<sp>, fresh.barriers for this WSG |
| lnk_compare_one(conn, conn_ref, wsg, cfg, reference, ...) | Comparison: reads PG, queries reference, returns diff | local + reference conns | R tibble (caller persists as RDS or CSV) |

lnk_compare_one() is reference-agnostic via the reference arg — "bcfishpass" today, future references plug in without naming changes.

Today's lnk_compare_wsg() becomes a thin convenience wrapper that calls both (for backwards-compat with operators who want the bundled behavior). New code uses the two split functions.
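A minimal sketch of what that wrapper might look like, assuming the two split functions take the arguments shown in the table above. The wrapper's real signature is not given in this issue, so the argument list here is an assumption.

```r
# Sketch only: a backwards-compatible wrapper that bundles model + compare.
# Argument names follow the proposed lnk_pipeline_run() / lnk_compare_one()
# signatures; the existing lnk_compare_wsg() signature may differ.
lnk_compare_wsg <- function(conn, conn_ref, wsg, cfg, reference = "bcfishpass") {
  # Model step: writes the fresh.* tables to PG for this WSG
  lnk_pipeline_run(conn, wsg, cfg)
  # Diagnostic step: reads PG, queries the reference, returns the diff
  lnk_compare_one(conn, conn_ref, wsg, cfg, reference = reference)
}
```

Keeping the wrapper free of its own logic means the two split functions remain the only place where behaviour is defined.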

data-raw/ companions, renamed for honesty

| Old name (lies about scope) | New name | Role |
|---|---|---|
| data-raw/compare_bcfishpass_wsg.R | data-raw/wsg_pipeline_run.R + data-raw/wsg_compare.R | Split into the two concerns. wsg_compare.R is reference-agnostic, not bcfp-specific. |

The other 8 script renames (trifecta_provincial.sh → wsgs_dispatch.sh, etc.) stay in #172, which builds the autonomy CLI surface on top of this decoupled foundation.

Resume-check uses PG, not the filesystem

for (w in wsgs) {
  out_rds <- file.path(out_dir, paste0(w, ".rds"))

  # PG is the source of truth for the model; RDS only tracks the comparison
  pipeline_done <- wsg_in_pg(conn, w, schema = cfg$pipeline$schema)
  rollup_done   <- file.exists(out_rds) && !is_error_stub(out_rds)

  if (pipeline_done && rollup_done) {
    next                              # fully cached: skip
  } else if (pipeline_done && !rollup_done) {
    # cheap: compare-only against existing PG state
    rollup <- lnk_compare_one(conn, conn_ref, w, cfg, reference = "bcfishpass")
    saveRDS(rollup, out_rds)
  } else {
    # pipeline missing (even if a stale RDS exists): run model + compare
    lnk_pipeline_run(conn, w, cfg)
    rollup <- lnk_compare_one(conn, conn_ref, w, cfg, reference = "bcfishpass")
    saveRDS(rollup, out_rds)
  }
}

PG is the canonical state. RDS is a diagnostic side-artifact. An operator can drop PG rows to force a re-model, or delete the RDS to force a re-compare, independently of each other.
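The two helpers used in the resume check, wsg_in_pg() and is_error_stub(), are named in this issue but not defined. One possible shape is sketched below under stated assumptions: the streams table is assumed to carry a `watershed_group_code` column, the `$1` placeholder assumes an RPostgres connection, the `query_fn` argument is added here only so the helper can be exercised without a database, and an "error stub" is assumed to be an RDS holding a condition object, a try-error, or a plain list with an `error` element.

```r
# Sketch: TRUE when model output for `wsg` already exists in <schema>.streams.
# ASSUMPTIONS: column name `watershed_group_code`; `$1` placeholder (RPostgres);
# `query_fn` is a hypothetical seam for testing, defaulting to DBI::dbGetQuery.
wsg_in_pg <- function(conn, wsg, schema = "fresh", query_fn = DBI::dbGetQuery) {
  # The schema name is interpolated into SQL, so accept plain identifiers only
  stopifnot(grepl("^[A-Za-z_][A-Za-z0-9_]*$", schema))
  sql <- sprintf(
    "SELECT EXISTS (SELECT 1 FROM %s.streams WHERE watershed_group_code = $1) AS present",
    schema
  )
  res <- query_fn(conn, sql, params = list(wsg))
  isTRUE(as.logical(res$present[1]))
}

# Sketch: TRUE when the RDS is unreadable or holds an error placeholder.
# ASSUMPTION: the stub format is not specified in the issue.
is_error_stub <- function(path) {
  obj <- tryCatch(readRDS(path), error = function(e) e)
  inherits(obj, c("error", "try-error")) ||
    (is.list(obj) && !is.data.frame(obj) && !is.null(obj[["error"]]))
}
```

Treating an unreadable RDS as a stub errs on the side of re-running the cheap compare step rather than trusting a broken artifact.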

--force flag

Operator-friendly opt-out of all caching for a re-dispatch:

bash data-raw/wsgs_dispatch.sh --wsgs=A,B,C --force

--force ignores both PG presence and RDS presence; re-runs both pipeline and compare.
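The cache rules, including --force, reduce to a small decision table. The sketch below is one way to express it as a pure function; `resume_action()` and its return labels are hypothetical names introduced for illustration, not part of the proposal.

```r
# Sketch: decide what to run for one WSG from the two cache signals.
# `resume_action` and the returned labels are illustrative names only.
resume_action <- function(pipeline_done, rollup_done, force = FALSE) {
  # --force, or no model in PG (even with a stale RDS): run everything
  if (force || !pipeline_done) return("pipeline+compare")
  # model present but no usable comparison artifact: compare only
  if (!rollup_done) return("compare")
  # both present: nothing to do
  "skip"
}
```

Note that a missing pipeline with an existing RDS, the case from the reproduction above, resolves to a full re-run rather than a skip.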

Acceptance

  • R/lnk_pipeline_run.R exported: runs link pipeline for one WSG, writes to PG, returns invisibly.
  • R/lnk_compare_one.R exported: runs comparison query for one WSG, returns tibble. Reference-agnostic.
  • R/lnk_compare_wsg.R becomes a thin convenience wrapper calling both (deprecation note in docs but kept for compat).
  • data-raw/compare_bcfishpass_wsg.R deleted; data-raw/wsg_pipeline_run.R + data-raw/wsg_compare.R added.
  • data-raw/run_provincial_parity.R updated to use the PG-state resume check + the two new functions. (The rename to wsgs_run_host.R stays in #172, "Provincial run autonomy: rename scripts to noun_verb + single 'approve once, builds end-to-end' wrapper".)
  • 1-WSG smoke: re-run with all PG rows present + RDS present → "fully cached, skip" (fast no-op).
  • 1-WSG smoke: re-run with PG rows present + RDS missing → only the compare step fires (~1-2s).
  • 1-WSG smoke: re-run with PG rows missing → both pipeline + compare fire.
  • 1-WSG smoke: --force re-runs both regardless of cache state.
  • devtools::test() passes.

Out of scope (separate issues)

Cross-ref
