diff --git a/DESCRIPTION b/DESCRIPTION index ee4e91f..ea15be7 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -26,9 +26,8 @@ Imports: sf Suggests: bookdown, - furrr, - future, gq, + mirai, knitr, rmarkdown, mockery, diff --git a/R/frs_db_conn.R b/R/frs_db_conn.R index 9eccb31..19b0970 100644 --- a/R/frs_db_conn.R +++ b/R/frs_db_conn.R @@ -42,3 +42,32 @@ frs_db_conn <- function( password = password ) } + + +#' Extract connection parameters from an existing connection +#' +#' Reads host, port, dbname, user from a live [RPostgres::Postgres()] +#' connection. Used by [frs_habitat()] to pass connection params to +#' parallel workers so they can open their own connections without +#' depending on `PG_*_SHARE` environment variables. +#' +#' Password is not available from `DBI::dbGetInfo()` (security). Must +#' be provided explicitly via `password` param or the connection will +#' fail for password-authenticated databases. +#' +#' @param conn A [DBI::DBIConnection-class] object. +#' @param password Character. Password for reconnection. Required for +#' password-authenticated databases. Not needed for trust auth or +#' `.pgpass` file. +#' @return Named list with `dbname`, `host`, `port`, `user`, `password`. +#' @noRd +.frs_conn_params <- function(conn, password = "") { + info <- DBI::dbGetInfo(conn) + list( + dbname = info$dbname, + host = info$host, + port = info$port, + user = info$username, + password = password + ) +} diff --git a/R/frs_habitat.R b/R/frs_habitat.R index b4d6ff2..75540b1 100644 --- a/R/frs_habitat.R +++ b/R/frs_habitat.R @@ -4,7 +4,7 @@ #' more watershed groups. Calls [frs_habitat_partition()] per WSG to extract #' the base network and pre-compute breaks, then flattens all (WSG, species) #' pairs and classifies them via [frs_habitat_species()]. Both phases -#' parallelize with [furrr::future_map()] when `workers > 1`. +#' parallelize with [mirai::mirai_map()] when `workers > 1`. #' #' Output tables are WSG-scoped: `working.streams_bulk_co`, #' `working.streams_morr_bt`, etc. @@ -13,13 +13,17 @@ #' @param wsg Character. One or more watershed group codes #' (e.g. `"BULK"`, `c("BULK", "MORR")`). #' @param workers Integer. Number of parallel workers. Default `1` -#' (sequential). Values > 1 require the `furrr` package. Each worker -#' opens its own database connection. Used for both Phase 1 (partition -#' prep across WSGs) and Phase 2 (species classification). +#' (sequential). Values > 1 require the `mirai` package. Each worker +#' opens its own database connection (params extracted from `conn`). +#' Used for both Phase 1 (partition prep across WSGs) and Phase 2 +#' (species classification). #' @param break_sources List of break source specs passed to #' [frs_habitat_access()], or `NULL` for gradient-only. Each spec is a #' list with `table`, and optionally `where`, `label`, `label_col`, #' `label_map`. See [frs_habitat_access()] for details. +#' @param password Character. Database password for parallel workers. +#' Required when `workers > 1` and the database uses password auth. +#' Not needed for trust auth or `.pgpass`. #' @param cleanup Logical. Drop intermediate tables (base network, break #' tables) when done. Default `TRUE`. #' @param verbose Logical. Print progress and timing. Default `TRUE`. @@ -58,6 +62,7 @@ #' } frs_habitat <- function(conn, wsg, workers = 1L, break_sources = NULL, + password = "", cleanup = TRUE, verbose = TRUE) { stopifnot(is.character(wsg), length(wsg) > 0) @@ -106,34 +111,51 @@ frs_habitat <- function(conn, wsg, workers = 1L, # Phase 1: Prepare partitions (extract base, pre-compute breaks) # ========================================================================== workers <- as.integer(workers) - use_furrr <- workers > 1L - if (use_furrr && !requireNamespace("furrr", quietly = TRUE)) { - stop("furrr package required for parallel execution (workers > 1)", + use_parallel <- workers > 1L + if (use_parallel && !requireNamespace("mirai", quietly = TRUE)) { + stop("mirai package required for parallel execution (workers > 1)", call. = FALSE) } - .run_partition <- function(spec) { - p_conn <- if (use_furrr) frs_db_conn() else conn - if (use_furrr) on.exit(DBI::dbDisconnect(p_conn)) - frs_habitat_partition(p_conn, - aoi = spec$aoi, - label = spec$label, - species = spec$species, - params_all = spec$params_all, - params_fresh = spec$params_fresh, - break_sources = spec$break_sources, - verbose = verbose && !use_furrr) - } + # Extract connection params from user's conn for worker reconnection + conn_params <- if (use_parallel) .frs_conn_params(conn, password) else NULL - if (use_furrr) { + if (use_parallel) { if (verbose) cat("\nPhase 1: preparing ", length(wsg_specs), " partition(s) (", workers, " workers)...\n", sep = "") - old_plan <- future::plan(future::multisession, workers = workers) - on.exit(future::plan(old_plan), add = TRUE) - partitions <- furrr::future_map(wsg_specs, .run_partition, - .options = furrr::furrr_options(seed = TRUE, packages = "fresh")) + mirai::daemons(workers) + on.exit(mirai::daemons(0), add = TRUE) + partitions <- mirai::mirai_map(wsg_specs, function(spec) { + library(fresh) + p_conn <- do.call(DBI::dbConnect, + c(list(drv = RPostgres::Postgres()), conn_params)) + on.exit(DBI::dbDisconnect(p_conn)) + frs_habitat_partition(p_conn, + aoi = spec$aoi, label = spec$label, + species = spec$species, params_all = spec$params_all, + params_fresh = spec$params_fresh, + break_sources = spec$break_sources, verbose = FALSE) + }, conn_params = conn_params)[] } else { - partitions <- lapply(wsg_specs, .run_partition) + partitions <- lapply(wsg_specs, function(spec) { + frs_habitat_partition(conn, + aoi = spec$aoi, label = spec$label, + species = spec$species, params_all = spec$params_all, + params_fresh = spec$params_fresh, + break_sources = spec$break_sources, verbose = verbose) + }) + } + + # Check for worker errors + if (use_parallel) { + errs <- vapply(partitions, inherits, logical(1), "miraiError") + if (any(errs)) { + msgs <- vapply(which(errs), function(i) { + paste0(wsg_specs[[i]]$wsg, ": ", conditionMessage(partitions[[i]])) + }, character(1)) + stop("Partition failed:\n ", paste(msgs, collapse = "\n "), + call. = FALSE) + } } # Collect all jobs and cleanup tables @@ -148,36 +170,56 @@ frs_habitat <- function(conn, wsg, workers = 1L, # ========================================================================== # Phase 2: Classify all (partition, species) pairs # ========================================================================== - .run_one <- function(job) { - worker_conn <- if (use_furrr) frs_db_conn() else conn - if (use_furrr) on.exit(DBI::dbDisconnect(worker_conn)) + .run_species <- function(job) { + library(fresh) + worker_conn <- do.call(DBI::dbConnect, + c(list(drv = RPostgres::Postgres()), conn_params)) + on.exit(DBI::dbDisconnect(worker_conn)) t0 <- proc.time() frs_habitat_species(worker_conn, job$species_code, job$base_tbl, - breaks = job$acc_tbl, - breaks_habitat = job$hab_tbl, - params_sp = job$params_sp, - fresh_sp = job$fresh_sp, + breaks = job$acc_tbl, breaks_habitat = job$hab_tbl, + params_sp = job$params_sp, fresh_sp = job$fresh_sp, to = job$to) elapsed <- (proc.time() - t0)["elapsed"] - data.frame( - partition = job$partition, - species_code = job$species_code, + data.frame(partition = job$partition, species_code = job$species_code, access_threshold = job$access_threshold, habitat_threshold = job$habitat_threshold, - elapsed_s = elapsed, - table_name = job$to, - stringsAsFactors = FALSE - ) + elapsed_s = elapsed, table_name = job$to, + stringsAsFactors = FALSE) + } + + .run_species_local <- function(job) { + t0 <- proc.time() + frs_habitat_species(conn, job$species_code, job$base_tbl, + breaks = job$acc_tbl, breaks_habitat = job$hab_tbl, + params_sp = job$params_sp, fresh_sp = job$fresh_sp, + to = job$to) + elapsed <- (proc.time() - t0)["elapsed"] + data.frame(partition = job$partition, species_code = job$species_code, + access_threshold = job$access_threshold, + habitat_threshold = job$habitat_threshold, + elapsed_s = elapsed, table_name = job$to, + stringsAsFactors = FALSE) } - if (use_furrr) { + if (use_parallel) { if (verbose) cat("Phase 2: classifying (", workers, " workers)...\n", sep = "") - result_list <- furrr::future_map(all_jobs, .run_one, - .options = furrr::furrr_options(seed = TRUE, packages = "fresh")) + result_list <- mirai::mirai_map(all_jobs, .run_species, + conn_params = conn_params)[] + # Check for worker errors + errs <- vapply(result_list, inherits, logical(1), "miraiError") + if (any(errs)) { + msgs <- vapply(which(errs), function(i) { + paste0(all_jobs[[i]]$partition, "/", all_jobs[[i]]$species_code, + ": ", conditionMessage(result_list[[i]])) + }, character(1)) + stop("Species classification failed:\n ", + paste(msgs, collapse = "\n "), call. = FALSE) + } } else { result_list <- lapply(all_jobs, function(job) { - res <- .run_one(job) + res <- .run_species_local(job) if (verbose) { cat(" ", res$partition, "/", res$species_code, ": ", round(res$elapsed_s, 1), "s -> ", res$table_name, "\n", sep = "") @@ -189,7 +231,7 @@ frs_habitat <- function(conn, wsg, workers = 1L, results <- do.call(rbind, result_list) rownames(results) <- NULL - if (verbose && use_furrr) { + if (verbose && use_parallel) { for (i in seq_len(nrow(results))) { cat(" ", results$partition[i], "/", results$species_code[i], ": ", round(results$elapsed_s[i], 1), "s -> ", diff --git a/man/frs_habitat.Rd b/man/frs_habitat.Rd index 03916f5..f57ab43 100644 --- a/man/frs_habitat.Rd +++ b/man/frs_habitat.Rd @@ -9,6 +9,7 @@ frs_habitat( wsg, workers = 1L, break_sources = NULL, + password = "", cleanup = TRUE, verbose = TRUE ) @@ -20,15 +21,20 @@ frs_habitat( (e.g. \code{"BULK"}, \code{c("BULK", "MORR")}).} \item{workers}{Integer. Number of parallel workers. Default \code{1} -(sequential). Values > 1 require the \code{furrr} package. Each worker -opens its own database connection. Used for both Phase 1 (partition -prep across WSGs) and Phase 2 (species classification).} +(sequential). Values > 1 require the \code{mirai} package. Each worker +opens its own database connection (params extracted from \code{conn}). +Used for both Phase 1 (partition prep across WSGs) and Phase 2 +(species classification).} \item{break_sources}{List of break source specs passed to \code{\link[=frs_habitat_access]{frs_habitat_access()}}, or \code{NULL} for gradient-only. Each spec is a list with \code{table}, and optionally \code{where}, \code{label}, \code{label_col}, \code{label_map}. See \code{\link[=frs_habitat_access]{frs_habitat_access()}} for details.} +\item{password}{Character. Database password for parallel workers. +Required when \code{workers > 1} and the database uses password auth. +Not needed for trust auth or \code{.pgpass}.} + \item{cleanup}{Logical. Drop intermediate tables (base network, break tables) when done. Default \code{TRUE}.} @@ -44,7 +50,7 @@ Orchestrate the full habitat pipeline for all species present in one or more watershed groups. Calls \code{\link[=frs_habitat_partition]{frs_habitat_partition()}} per WSG to extract the base network and pre-compute breaks, then flattens all (WSG, species) pairs and classifies them via \code{\link[=frs_habitat_species]{frs_habitat_species()}}. Both phases -parallelize with \code{\link[furrr:future_map]{furrr::future_map()}} when \code{workers > 1}. +parallelize with \code{\link[mirai:mirai_map]{mirai::mirai_map()}} when \code{workers > 1}. } \details{ Output tables are WSG-scoped: \code{working.streams_bulk_co}, diff --git a/scripts/habitat/logs/20260404_habitat_benchmark-mirai-parallel.txt b/scripts/habitat/logs/20260404_habitat_benchmark-mirai-parallel.txt new file mode 100644 index 0000000..66823d1 --- /dev/null +++ b/scripts/habitat/logs/20260404_habitat_benchmark-mirai-parallel.txt @@ -0,0 +1,36 @@ +## Habitat pipeline benchmark — mirai parallel +## Date: 2026-04-04 +## DB: local Docker fwapg (port 5432, 32GB shared_buffers) +## Branch: mirai-parallel +## Issue: #77 + +## Setup +# WSGs: ADMS (11,520 segments, 5 species) + ATNA (18,684 segments, 7 species) +# Break sources: falls (barrier_ind = TRUE, label = "blocked") +# Workers: 4 (mirai daemons) +# Optimizations: ltree-enriched breaks, split NOT EXISTS, indexes + +## Results +# Phase 1 (partition prep, parallel): ~25s (2 WSGs on 2 workers) +# Phase 2 (species classification, parallel): ~71s (12 species on 4 workers) +# Total: 96s +# +# Per species: +# ADMS: BT 22s, CH 21s, CO 21s, RB 22s, SK 24s +# ATNA: CH 15s, CM 14s, CO 15s, PK 17s, RB 17s, SK 17s, ST 13s + +## Comparison +# v0.6.0 sequential (ADMS only, 5 species, remote DB): 3,755s (63 min) +# v0.7.0 optimized sequential (ADMS only, 5 species): 86s +# mirai parallel (ADMS + ATNA, 12 species, 4 workers): 96s +# +# Effective throughput: +# Old: 1 species / 750s = 0.08 species/min +# New: 12 species / 96s = 7.5 species/min +# Speedup: ~94x throughput improvement + +## Notes +# - mirai replaces furrr/future — lighter daemons, no serialization overhead +# - Connection params extracted from user's conn, password passed explicitly +# - Workers load fresh package via library(fresh) — must be installed +# - Phase 2 is wall-clock ~71s for 12 species on 4 workers (3 rounds of 4)