
_targets.R: orchestrate the comparison pipeline with targets #38

@NewGraphEnvironment

Description


Problem

The habitat classification pipeline lives in a 635-line script (data-raw/compare_bcfishpass.R). The script:

  • Loops over watershed groups (WSGs) internally — no caching, no skip-unchanged
  • Hardcodes the config
  • Repeats orchestration logic per variant
  • Can't parallelize across hosts (rtj is standing up M4+M1 distributed runners — see rtj/docs/distributed-fwapg.md)
  • Duplicates the pipeline DAG in research/bcfishpass_comparison.md (hand-maintained Mermaid)

targets is the right tool:

  • Each DAG node = one tar_target() — inspectable, cacheable, skippable
  • tar_map(wsg = c(...)) for per-WSG parallelism — matches rtj's crew_controller_group design
  • tar_mermaid() regenerates the DAG in the research doc automatically
  • Config variants = different bundles loaded at the top, no code fork
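To illustrate the last point, a minimal sketch (only the "bcfishpass" bundle appears in this issue; "newgraph" is a hypothetical alternative bundle name): swapping config variants is a one-argument change at the top of _targets.R, not a code fork.

```r
# Variant selection is a single tar_target at the top of the pipeline.
tar_target(cfg, lnk_config("bcfishpass"))
# tar_target(cfg, lnk_config("newgraph"))  # hypothetical variant, same DAG below
```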

Architecture: package vs pipeline

This refactor produces two distinct things that must NOT be mixed:

1. Orchestration helpers (belong in the link package — R/)

Generic building blocks. Any user running a habitat classification pipeline wants these (bcfishpass comparison, newgraph defaults, custom variants). Signature: (conn, aoi, cfg, schema) → writes DB state. Exported, documented, tested.
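A hedged sketch of what one such helper might look like under the (conn, aoi, cfg, schema) contract; the body is illustrative only, not the actual implementation:

```r
# Illustrative only: a setup-phase helper that creates the per-WSG
# working schema. The real lnk_pipeline_setup() may do more.
lnk_pipeline_setup <- function(conn, schema) {
  stopifnot(DBI::dbIsValid(conn))
  DBI::dbExecute(
    conn,
    paste0("CREATE SCHEMA IF NOT EXISTS ", DBI::dbQuoteIdentifier(conn, schema))
  )
  invisible(schema)
}
```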

2. The bcfishpass comparison pipeline itself (does NOT belong in R/)

_targets.R + compare_bcfishpass_wsg() + crew config. Specific to our regression against bcfishpass reference tables. Lives in data-raw/ — the canonical R-package convention for "code that USES this package to produce outputs but isn't part of the package itself." Other users running their own comparison pipelines write their own _targets.R.

Proposed structure

```
link/
├── R/
│   ├── lnk_config.R                  # shipped in 0.2.0
│   ├── lnk_pipeline_setup.R          # exported helper
│   ├── lnk_pipeline_load.R           # exported helper
│   ├── lnk_pipeline_prepare.R        # exported helper (uses frs_barriers_minimal)
│   ├── lnk_pipeline_break.R          # exported helper
│   ├── lnk_pipeline_classify.R       # exported helper
│   └── lnk_pipeline_connect.R        # exported helper
├── data-raw/
│   ├── _targets.R                    # comparison pipeline (NOT installed)
│   ├── compare_bcfishpass_wsg.R      # per-WSG target function (NOT installed)
│   ├── compare_bcfishpass.R          # DELETE once verified
│   └── logs/
│       └── YYYYMMDD_NN_tar_make-first-run.txt
├── research/
│   └── bcfishpass_comparison.md      # DAG regenerated from tar_mermaid()
└── tests/                            # tests the helpers, not the pipeline
```

targets runs via targets::tar_config_set(script = "data-raw/_targets.R") or cd data-raw && R -e "targets::tar_make()".
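Note the two entry points default to different stores (`_targets/` at the repo root vs inside data-raw/). One way to make them share a single cache is to pin both script and store via tar_config_set(), which writes _targets.yaml; a sketch, assuming runs are launched from the repo root (paths are assumptions):

```r
# Sketch: pin the pipeline script and store so tar_make() from the
# repo root and from data-raw/ hit the same cache.
targets::tar_config_set(
  script = "data-raw/_targets.R",
  store  = "data-raw/_targets"
)
```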

_targets.R skeleton (goes in data-raw/)

```r
library(targets)
library(tarchetypes)   # tar_map() lives here, not in targets
library(crew)
library(link)
library(fresh)

tar_option_set(
  packages = c("link", "fresh", "DBI", "RPostgres", "dplyr", "tibble"),
  controller = crew_controller_local(workers = 2)  # single-host; swap later for crew_controller_group
)

wsgs <- c("ADMS", "BULK", "BABL", "ELKR")

source("compare_bcfishpass_wsg.R")

list(
  tar_target(cfg, lnk_config("bcfishpass")),

  tar_map(
    values = tibble::tibble(wsg = wsgs),
    tar_target(comparison, compare_bcfishpass_wsg(wsg = wsg, config = cfg))
  ),

  tar_target(
    rollup,
    dplyr::bind_rows(!!!rlang::syms(paste0("comparison_", wsgs)))
  ),

  tar_target(
    dag_mermaid,
    # path is relative to data-raw/, the working directory of the run
    writeLines(tar_mermaid(targets_only = TRUE), "../research/dag_generated.mmd")
  )
)
```
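A possible refinement (not required for the skeleton above): tarchetypes::tar_combine() builds the same rollup without hand-constructing the `comparison_<wsg>` names, which keeps the splice correct if the WSG list changes. A sketch, assuming the tar_map() call is assigned to a variable first:

```r
# Sketch: let tarchetypes wire the rollup to the mapped targets.
mapped <- tar_map(
  values = tibble::tibble(wsg = wsgs),
  tar_target(comparison, compare_bcfishpass_wsg(wsg = wsg, config = cfg))
)

list(
  tar_target(cfg, lnk_config("bcfishpass")),
  mapped,
  tar_combine(rollup, mapped, command = dplyr::bind_rows(!!!.x))
)
```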

compare_bcfishpass_wsg() shape (in data-raw/)

```r
compare_bcfishpass_wsg <- function(wsg, config) {
  conn <- link::lnk_db_conn()                  # localhost on whichever worker runs this
  on.exit(DBI::dbDisconnect(conn), add = TRUE)

  schema <- paste0("working_", tolower(wsg))

  link::lnk_pipeline_setup(conn, schema)
  link::lnk_pipeline_load(conn, aoi = wsg, config, schema)
  link::lnk_pipeline_prepare(conn, aoi = wsg, config, schema)
  link::lnk_pipeline_break(conn, aoi = wsg, config, schema)
  link::lnk_pipeline_classify(conn, aoi = wsg, config, schema)
  link::lnk_pipeline_connect(conn, aoi = wsg, config, schema)

  pull_comparison_diff(conn, wsg, schema)      # small tibble: wsg × species × km × diff_pct
}
```

Return: ~10 rows per WSG — wsg × species × habitat_type × link_km × bcfishpass_km × diff_pct. KB-scale. Ships cleanly over SSH when distributed.

Heavy tables stay on worker's localhost DB:

  • fresh.streams_habitat (per-WSG classified segments)
  • working_<wsg>.* (crossings, barriers, overrides, break state)

Abstraction notes

Alternatives considered:

  • Monolithic lnk_habitat() wrapper in the package — hides the DAG, duplicates what targets does (caching, skipping, parallelism), turns variants into if/else branches. Rejected
  • Dynamic branching (pattern = map(wsg)) — less readable, and dynamically named branches are harder to debug. Use static tar_map()
  • One target per DAG node vs per-phase — per-phase (6 helpers) keeps _targets.R readable; per-DAG-node (15+ targets) is noisy. Per-phase wins readability, slight loss in cache granularity — acceptable
  • _targets.R at repo root — conventional placement. Rejected: puts a pipeline artifact at repo root of a package, mixes concerns. data-raw/ is the canonical R-package home for scripts that use the package to produce outputs
  • _targets.R inside inst/ — would ship with the installed package, confusing. Rejected
  • Separate repo for the comparison pipeline — cleaner separation but adds repo overhead. Rejected for now; data-raw/ is sufficient

Key design decisions:

  • Per-phase helpers (lnk_pipeline_*), not a single wrapper — keeps targets inspectable
  • Helpers are exported package functions — any user can compose their own pipeline
  • Each worker creates its own localhost lnk_db_conn() — no remote DB chatter (matches rtj/docs/distributed-fwapg.md constraint)
  • Schema namespacing working_<wsg> — parallel WSGs don't collide (pre-cleared with rtj)
  • Return from targets = small tibble — no geometry, no wkb (pre-cleared with rtj)
  • Single-host first; distributed run is a one-line controller swap after rtj Phase 4 passes
  • targets, tarchetypes, crew in Suggests (not Imports) — pipeline-dev deps, not user-facing

Execution checklist

Split into three PRs for reviewability:

PR 1 — Extract helpers to R/lnk_pipeline_*.R

  • /planning-init → PWF in planning/active/targets_pipeline/
  • Cross-reference rtj/docs/distributed-fwapg.md in findings.md
  • Extract phases from data-raw/compare_bcfishpass.R into R/lnk_pipeline_*.R helpers (one commit per phase)
  • Update existing compare_bcfishpass.R to call the helpers — verify identical output to prior run
  • Tests + examples for each helper
  • /code-check before commit
  • PR with SRED tag

PR 2 — Add _targets.R + per-WSG target fn

  • Write data-raw/compare_bcfishpass_wsg() that calls helpers in order
  • Write data-raw/_targets.R with static tar_map() over 4 WSGs
  • Run tar_make() — verify per-WSG tibbles match current research doc numbers (all species within 5%)
  • Log the run under data-raw/logs/
  • targets, tarchetypes, crew → DESCRIPTION Suggests
  • /code-check before commit
  • PR with SRED tag

PR 3 — Retire old script + regenerate DAG

  • Wire tar_mermaid() output into research/bcfishpass_comparison.md DAG section
  • Delete data-raw/compare_bcfishpass.R (git history preserves)
  • Delete data-raw/compare_adms.R if redundant
  • Vignette: "Running the comparison pipeline" — tar_make(), generated DAG, rollup tibble
  • NEWS entry + bump to 0.3.0
  • /code-check before commit
  • PR with SRED tag

Defer (follow-up issue once rtj Phase 4 passes):

  • Swap crew_controller_local() for crew_controller_group() with M1 as cluster worker
  • Test distributed run, verify byte-identical results
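When that follow-up lands, the swap stays confined to tar_option_set(); a hedged sketch (controller names and worker counts are placeholders, and the remote M1 controller depends on the launcher rtj Phase 4 settles on):

```r
# Placeholder sketch: group controllers so targets can schedule across
# hosts. The M1 entry is deliberately left as a stub.
tar_option_set(
  controller = crew::crew_controller_group(
    crew::crew_controller_local(name = "m4", workers = 2)
    # , <M1 controller via the launcher rtj Phase 4 selects>
  )
)
```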

Tests required

  • Each lnk_pipeline_* helper: unit tests where stubbable; most are integration-heavy, so live-DB tests guard with skip_if_not(.lnk_db_available(), ...)
  • compare_bcfishpass_wsg("ADMS", config) returns tibble with expected shape and columns
  • tar_make() completes successfully on all 4 WSGs (ADMS, BULK, BABL, ELKR)
  • Generated DAG in research doc is valid Mermaid (parses without error)
  • Rollup tibble matches per-WSG numbers (no aggregation bugs)
  • Sub-basin iteration pattern used where possible (~5 s per run vs minutes for a full WSG)
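For the shape test in particular, a hedged testthat sketch (column names follow the return shape described above; the skip-guard helper name is taken from the bullet list, and the config call is an assumption):

```r
# Illustrative shape test for the per-WSG return tibble.
test_that("compare_bcfishpass_wsg returns the expected shape", {
  skip_if_not(.lnk_db_available(), "live DB not available")
  out <- compare_bcfishpass_wsg("ADMS", config = lnk_config("bcfishpass"))
  expect_s3_class(out, "tbl_df")
  expect_true(all(
    c("wsg", "species", "habitat_type",
      "link_km", "bcfishpass_km", "diff_pct") %in% names(out)
  ))
})
```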

Example must show

  • Why — reproducible, parallelizable pipeline with built-in caching
  • How — cd data-raw && R -e "targets::tar_make()"; targets::tar_visnetwork() shows the graph
  • Produces — per-WSG comparison tibbles + aggregate rollup + regenerated DAG artifact

Not in scope

Cross-refs

Versions

  • fresh: 0.14.0
  • link: main (0.2.0, target 0.3.0)
  • bcfishpass: ea3c5d8
  • fwapg: Docker (FWA 20240830)
