Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@ Imports:
sf
Suggests:
bookdown,
furrr,
future,
gq,
mirai,
knitr,
rmarkdown,
mockery,
Expand Down
29 changes: 29 additions & 0 deletions R/frs_db_conn.R
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,32 @@ frs_db_conn <- function(
password = password
)
}


#' Extract connection parameters from an existing connection
#'
#' Pulls `host`, `port`, `dbname` and `user` out of a live
#' [RPostgres::Postgres()] connection via `DBI::dbGetInfo()`. Used by
#' [frs_habitat()] so parallel workers can open their own connections
#' from these parameters instead of relying on `PG_*_SHARE` environment
#' variables.
#'
#' `DBI::dbGetInfo()` never exposes the password (by design), so it
#' must be supplied explicitly through `password`; without it,
#' reconnection fails against password-authenticated databases.
#'
#' @param conn A [DBI::DBIConnection-class] object.
#' @param password Character. Password used when workers reconnect.
#'   Required for password-authenticated databases; unnecessary with
#'   trust auth or a `.pgpass` file.
#' @return Named list with `dbname`, `host`, `port`, `user`, `password`.
#' @noRd
.frs_conn_params <- function(conn, password = "") {
  conn_info <- DBI::dbGetInfo(conn)
  params <- list(
    dbname = conn_info$dbname,
    host = conn_info$host,
    port = conn_info$port,
    user = conn_info$username
  )
  params$password <- password
  params
}
130 changes: 86 additions & 44 deletions R/frs_habitat.R
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#' more watershed groups. Calls [frs_habitat_partition()] per WSG to extract
#' the base network and pre-compute breaks, then flattens all (WSG, species)
#' pairs and classifies them via [frs_habitat_species()]. Both phases
#' parallelize with [furrr::future_map()] when `workers > 1`.
#' parallelize with [mirai::mirai_map()] when `workers > 1`.
#'
#' Output tables are WSG-scoped: `working.streams_bulk_co`,
#' `working.streams_morr_bt`, etc.
Expand All @@ -13,13 +13,17 @@
#' @param wsg Character. One or more watershed group codes
#' (e.g. `"BULK"`, `c("BULK", "MORR")`).
#' @param workers Integer. Number of parallel workers. Default `1`
#' (sequential). Values > 1 require the `furrr` package. Each worker
#' opens its own database connection. Used for both Phase 1 (partition
#' prep across WSGs) and Phase 2 (species classification).
#' (sequential). Values > 1 require the `mirai` package. Each worker
#' opens its own database connection (params extracted from `conn`).
#' Used for both Phase 1 (partition prep across WSGs) and Phase 2
#' (species classification).
#' @param break_sources List of break source specs passed to
#' [frs_habitat_access()], or `NULL` for gradient-only. Each spec is a
#' list with `table`, and optionally `where`, `label`, `label_col`,
#' `label_map`. See [frs_habitat_access()] for details.
#' @param password Character. Database password for parallel workers.
#' Required when `workers > 1` and the database uses password auth.
#' Not needed for trust auth or `.pgpass`.
#' @param cleanup Logical. Drop intermediate tables (base network, break
#' tables) when done. Default `TRUE`.
#' @param verbose Logical. Print progress and timing. Default `TRUE`.
Expand Down Expand Up @@ -58,6 +62,7 @@
#' }
frs_habitat <- function(conn, wsg, workers = 1L,
break_sources = NULL,
password = "",
cleanup = TRUE, verbose = TRUE) {
stopifnot(is.character(wsg), length(wsg) > 0)

Expand Down Expand Up @@ -106,34 +111,51 @@ frs_habitat <- function(conn, wsg, workers = 1L,
# Phase 1: Prepare partitions (extract base, pre-compute breaks)
# ==========================================================================
workers <- as.integer(workers)
use_furrr <- workers > 1L
if (use_furrr && !requireNamespace("furrr", quietly = TRUE)) {
stop("furrr package required for parallel execution (workers > 1)",
use_parallel <- workers > 1L
if (use_parallel && !requireNamespace("mirai", quietly = TRUE)) {
stop("mirai package required for parallel execution (workers > 1)",
call. = FALSE)
}

.run_partition <- function(spec) {
p_conn <- if (use_furrr) frs_db_conn() else conn
if (use_furrr) on.exit(DBI::dbDisconnect(p_conn))
frs_habitat_partition(p_conn,
aoi = spec$aoi,
label = spec$label,
species = spec$species,
params_all = spec$params_all,
params_fresh = spec$params_fresh,
break_sources = spec$break_sources,
verbose = verbose && !use_furrr)
}
# Extract connection params from user's conn for worker reconnection
conn_params <- if (use_parallel) .frs_conn_params(conn, password) else NULL

if (use_furrr) {
if (use_parallel) {
if (verbose) cat("\nPhase 1: preparing ", length(wsg_specs),
" partition(s) (", workers, " workers)...\n", sep = "")
old_plan <- future::plan(future::multisession, workers = workers)
on.exit(future::plan(old_plan), add = TRUE)
partitions <- furrr::future_map(wsg_specs, .run_partition,
.options = furrr::furrr_options(seed = TRUE, packages = "fresh"))
mirai::daemons(workers)
on.exit(mirai::daemons(0), add = TRUE)
partitions <- mirai::mirai_map(wsg_specs, function(spec) {
library(fresh)
p_conn <- do.call(DBI::dbConnect,
c(list(drv = RPostgres::Postgres()), conn_params))
on.exit(DBI::dbDisconnect(p_conn))
frs_habitat_partition(p_conn,
aoi = spec$aoi, label = spec$label,
species = spec$species, params_all = spec$params_all,
params_fresh = spec$params_fresh,
break_sources = spec$break_sources, verbose = FALSE)
}, conn_params = conn_params)[]
} else {
partitions <- lapply(wsg_specs, .run_partition)
partitions <- lapply(wsg_specs, function(spec) {
frs_habitat_partition(conn,
aoi = spec$aoi, label = spec$label,
species = spec$species, params_all = spec$params_all,
params_fresh = spec$params_fresh,
break_sources = spec$break_sources, verbose = verbose)
})
}

# Check for worker errors
if (use_parallel) {
errs <- vapply(partitions, inherits, logical(1), "miraiError")
if (any(errs)) {
msgs <- vapply(which(errs), function(i) {
paste0(wsg_specs[[i]]$wsg, ": ", conditionMessage(partitions[[i]]))
}, character(1))
stop("Partition failed:\n ", paste(msgs, collapse = "\n "),
call. = FALSE)
}
}

# Collect all jobs and cleanup tables
Expand All @@ -148,36 +170,56 @@ frs_habitat <- function(conn, wsg, workers = 1L,
# ==========================================================================
# Phase 2: Classify all (partition, species) pairs
# ==========================================================================
.run_one <- function(job) {
worker_conn <- if (use_furrr) frs_db_conn() else conn
if (use_furrr) on.exit(DBI::dbDisconnect(worker_conn))
.run_species <- function(job) {
library(fresh)
worker_conn <- do.call(DBI::dbConnect,
c(list(drv = RPostgres::Postgres()), conn_params))
on.exit(DBI::dbDisconnect(worker_conn))
t0 <- proc.time()
frs_habitat_species(worker_conn, job$species_code, job$base_tbl,
breaks = job$acc_tbl,
breaks_habitat = job$hab_tbl,
params_sp = job$params_sp,
fresh_sp = job$fresh_sp,
breaks = job$acc_tbl, breaks_habitat = job$hab_tbl,
params_sp = job$params_sp, fresh_sp = job$fresh_sp,
to = job$to)
elapsed <- (proc.time() - t0)["elapsed"]
data.frame(
partition = job$partition,
species_code = job$species_code,
data.frame(partition = job$partition, species_code = job$species_code,
access_threshold = job$access_threshold,
habitat_threshold = job$habitat_threshold,
elapsed_s = elapsed,
table_name = job$to,
stringsAsFactors = FALSE
)
elapsed_s = elapsed, table_name = job$to,
stringsAsFactors = FALSE)
}

.run_species_local <- function(job) {
t0 <- proc.time()
frs_habitat_species(conn, job$species_code, job$base_tbl,
breaks = job$acc_tbl, breaks_habitat = job$hab_tbl,
params_sp = job$params_sp, fresh_sp = job$fresh_sp,
to = job$to)
elapsed <- (proc.time() - t0)["elapsed"]
data.frame(partition = job$partition, species_code = job$species_code,
access_threshold = job$access_threshold,
habitat_threshold = job$habitat_threshold,
elapsed_s = elapsed, table_name = job$to,
stringsAsFactors = FALSE)
}

if (use_furrr) {
if (use_parallel) {
if (verbose) cat("Phase 2: classifying (", workers, " workers)...\n",
sep = "")
result_list <- furrr::future_map(all_jobs, .run_one,
.options = furrr::furrr_options(seed = TRUE, packages = "fresh"))
result_list <- mirai::mirai_map(all_jobs, .run_species,
conn_params = conn_params)[]
# Check for worker errors
errs <- vapply(result_list, inherits, logical(1), "miraiError")
if (any(errs)) {
msgs <- vapply(which(errs), function(i) {
paste0(all_jobs[[i]]$partition, "/", all_jobs[[i]]$species_code,
": ", conditionMessage(result_list[[i]]))
}, character(1))
stop("Species classification failed:\n ",
paste(msgs, collapse = "\n "), call. = FALSE)
}
} else {
result_list <- lapply(all_jobs, function(job) {
res <- .run_one(job)
res <- .run_species_local(job)
if (verbose) {
cat(" ", res$partition, "/", res$species_code, ": ",
round(res$elapsed_s, 1), "s -> ", res$table_name, "\n", sep = "")
Expand All @@ -189,7 +231,7 @@ frs_habitat <- function(conn, wsg, workers = 1L,
results <- do.call(rbind, result_list)
rownames(results) <- NULL

if (verbose && use_furrr) {
if (verbose && use_parallel) {
for (i in seq_len(nrow(results))) {
cat(" ", results$partition[i], "/", results$species_code[i], ": ",
round(results$elapsed_s[i], 1), "s -> ",
Expand Down
14 changes: 10 additions & 4 deletions man/frs_habitat.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 36 additions & 0 deletions scripts/habitat/logs/20260404_habitat_benchmark-mirai-parallel.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
## Habitat pipeline benchmark — mirai parallel
## Date: 2026-04-04
## DB: local Docker fwapg (port 5432, 32GB shared_buffers)
## Branch: mirai-parallel
## Issue: #77

## Setup
# WSGs: ADMS (11,520 segments, 5 species) + ATNA (18,684 segments, 7 species)
# Break sources: falls (barrier_ind = TRUE, label = "blocked")
# Workers: 4 (mirai daemons)
# Optimizations: ltree-enriched breaks, split NOT EXISTS, indexes

## Results
# Phase 1 (partition prep, parallel): ~25s (2 WSGs on 2 workers)
# Phase 2 (species classification, parallel): ~71s (12 species on 4 workers)
# Total: 96s
#
# Per species:
# ADMS: BT 22s, CH 21s, CO 21s, RB 22s, SK 24s
# ATNA: CH 15s, CM 14s, CO 15s, PK 17s, RB 17s, SK 17s, ST 13s

## Comparison
# v0.6.0 sequential (ADMS only, 5 species, remote DB): 3,755s (63 min)
# v0.7.0 optimized sequential (ADMS only, 5 species): 86s
# mirai parallel (ADMS + ATNA, 12 species, 4 workers): 96s
#
# Effective throughput:
# Old: 1 species / 750s = 0.08 species/min
# New: 12 species / 96s = 7.5 species/min
# Speedup: ~94x throughput improvement

## Notes
# - mirai replaces furrr/future — lighter daemons, no serialization overhead
# - Connection params extracted from user's conn, password passed explicitly
# - Workers load fresh package via library(fresh) — must be installed
# - Phase 2 is wall-clock ~71s for 12 species on 4 workers (3 rounds of 4)
Loading