Commit

Use custom 'mclapply' extended for progress reporting
dcnorris committed Jul 24, 2021
1 parent 14e8bbf commit 3e561cc
Showing 8 changed files with 73 additions and 16 deletions.
8 changes: 5 additions & 3 deletions DESCRIPTION
@@ -2,7 +2,7 @@ Package: precautionary
Type: Package
Title: Safety Diagnostics for Dose-Escalation Trial Designs
Version: 0.2.6
Date: 2021-07-15
Date: 2021-07-24
Authors@R: c(person("David C.", "Norris"
, role = c("aut", "cre", "cph")
, email = "david@precisionmethods.guru"
@@ -20,7 +20,8 @@ Depends:
data.table,
R6,
R (>= 4.0.0)
Imports: methods, dplyr, rlang, stringr, knitr, kableExtra, microbenchmark, dfcrm, BOIN
Imports: methods, dplyr, rlang, stringr, knitr, kableExtra, microbenchmark, dfcrm, BOIN,
parallelly
Suggests:
dtpcrm,
rmarkdown,
@@ -55,12 +56,13 @@ Collate:
'crm.R'
'data.R'
'dtp.R'
'dutycycle.R'
'enhance.R'
'exact.R'
'extendr-wrappers.R'
'forkit.R'
'hyperprior.R'
'paraplot.R'
'mclapply.R'
'precautionary-package.R'
'toxicity_generators.R'
'simulate_trials.R'
2 changes: 1 addition & 1 deletion NAMESPACE
@@ -74,8 +74,8 @@ importFrom(knitr,kable)
importFrom(magrittr,"%>%")
importFrom(microbenchmark,microbenchmark)
importFrom(parallel,mccollect)
importFrom(parallel,mclapply)
importFrom(parallel,mcparallel)
importFrom(parallelly,availableCores)
importFrom(rlang,.data)
importFrom(stats,addmargins)
importFrom(stats,aggregate)
10 changes: 6 additions & 4 deletions R/cpe.R
@@ -76,15 +76,17 @@ Cpe <- R6Class("Cpe",
#' @param root_dose The starting dose for tree of paths
#' @param cohort_sizes Integer vector giving sizes of future cohorts,
#' its length being the maximum number of cohorts to look ahead.
#' @param ... Parameters passed ultimately to `parallel::mclapply`
#' or (when `prog` is not missing) to a similar internal function
#' @param ... Parameters passed ultimately to `mclapply`, presently
#' an unexported, specially adapted version of `parallel::mclapply`
#' that implements progress reporting.
#' @param prog A function of a single integer, the current cumulative
#' path count, to be used for progress reporting
#' @param unroll Integer; how deep to unroll path tree for parallelism
#' @return Self, invisibly
#' @seealso `path_matrix`, `path_table`, `path_array`.
#' @importFrom parallel mclapply
#' @note If the `parallel` package were to incorporate the necessary
#' changes to `mclapply`, I could restore the following import!
#' #@importFrom parallel mclapply
trace_paths = function(root_dose, cohort_sizes, ..., prog = NULL, unroll = 4){
stopifnot("Only constant cohorts_sizes are supported currently" =
length(unique(cohort_sizes))==1)
@@ -164,7 +166,7 @@ Cpe <- R6Class("Cpe",
paths.(n, x, unroll+1, path_m, cohort_sizes, par_t0 = par_t0)
}
if (!is.null(prog))
cpe_parts <- proglapply(ppe, FUN, init = sum(stopped), prog = prog, ...)
cpe_parts <- mclapply(ppe, FUN, proginit = sum(stopped), progreport = prog, ...)
else
cpe_parts <- mclapply(ppe, FUN, ...)
cpe <- c(cpe_stopped, do.call(c, cpe_parts))
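
For orientation, the progress hook might be wired up roughly as sketched below. This is a minimal illustration, not code from the commit: `mod` stands in for any object of a `Cpe`-descended design class, and the dose/cohort settings and console callback are invented.

## Sketch only: drive trace_paths() with a console progress callback.
## The callback receives the cumulative count of completed paths.
show_progress <- function(paths_done)
  cat(sprintf("\r%d paths traced", paths_done))
mod$trace_paths(root_dose = 1, cohort_sizes = rep(2, 6),
                prog = show_progress, # forwarded to the custom mclapply() as 'progreport'
                mc.cores = 4)         # passed through ... to mclapply()
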
13 changes: 8 additions & 5 deletions R/paraplot.R → R/dutycycle.R
@@ -1,12 +1,14 @@
#' Plot duty cycle from performance report on a parallel computation
#'
#' This is intended for application to the tables which get attached
#' to `Cpe` instances after invocation the `$trace_paths()` method.
#' to `Cpe` instances after invocation of the `$trace_paths()` method.
#' But any table with columns named `pid`, `t1` and `t2` suffices.
#' @param perftab A performance table with columns:
#' * `pid` An integer, character or factor with process ids
#' * `t1`, `t2` Task start and end times, in milliseconds
#' @param ... Optional parameters passed along to `lattice::xyplot`
#' @param FUN A function for summarizing all workers' duty cycles
#' @param layout Trellis layout, passed to `xyplot`
#' @return An `xyplot` with a duty-cycle panel for each worker,
#' plus an overall average
#' @examples
@@ -30,7 +32,7 @@
#' plot_dutycycle(mod$performance)
#' }
#' @export
plot_dutycycle <- function(perftab, ...) {
plot_dutycycle <- function(perftab, ..., FUN = c(workers = sum), layout = c(1, NA)) {
jpt <- perftab
jpt$job <- seq(nrow(jpt)) # TODO: Better if perftab supplies names

@@ -45,8 +47,8 @@ plot_dutycycle <- function(perftab, ...) {
m[,pid] <- m[,pid] | (jpt$t1[j] < T & T < jpt$t2[j])
}

## Add an 'avg' column
m <- addmargins(m, margin = 2, FUN = c(avg = mean))
## Add a summary column
m <- addmargins(m, margin = 2, FUN = FUN)

## Reshape this matrix to 'tall' form yielding series to plot
dt <- data.table(t = T
@@ -55,10 +57,11 @@ plot_dutycycle <- function(perftab, ...) {
)

lattice::xyplot(active ~ t | pid, data = dt, type='l'
, layout=c(1, NA), as.table=TRUE
, layout=layout, as.table=TRUE
, ylab = ""
, xlab = "Sys.time() since parallelization [ms]"
, scales = list(y = list(at = NULL))
, par.strip.text = list(cex = 0.6)
, ...
)
}
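
To illustrate the new `FUN` and `layout` arguments, here is a small sketch with a synthetic performance table; the worker ids and timings are invented, and the example simply restores the previous 'avg' summary column via `FUN`.

## Synthetic performance table: two workers, three tasks each, times in ms.
## Any table with columns 'pid', 't1' and 't2' will do, per the docs above.
perftab <- data.frame(pid = rep(c("w1", "w2"), each = 3),
                      t1  = c( 0, 40,  85,  5, 50,  95),
                      t2  = c(35, 80, 120, 45, 90, 130))
plot_dutycycle(perftab,
               FUN = c(avg = mean), # the formerly hard-coded summary, now overridable
               layout = c(1, NA))   # the default trellis layout, made explicit
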
1 change: 1 addition & 0 deletions R/forkit.R
@@ -43,6 +43,7 @@ testprog <- function(C = 13, ...) {

#' @importFrom stats na.exclude
#' @importFrom parallel mcparallel mccollect
#' @importFrom parallelly availableCores
#' @importFrom utils getFromNamespace
proglapply <- function(X, FUN
, parcellator = NULL # use default defined in body
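
For reference, `parallelly::availableCores()` honors core limits (cgroups, job-scheduler environment variables, R options) that `parallel::detectCores()` ignores. An illustrative use, not taken from this commit:

## Pick a worker count that respects externally imposed core limits.
n_workers <- max(1L, parallelly::availableCores() - 1L) # keep one core free
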
41 changes: 41 additions & 0 deletions R/mclapply.R
@@ -1,3 +1,5 @@
# -*- ess-indent-offset: 4 -*-

## https://raw.githubusercontent.com/wch/r-source/trunk/src/library/parallel/R/unix/mclapply.R
# File src/library/parallel/R/unix/mclapply.R
# Part of the R package, https://www.R-project.org
@@ -19,11 +21,38 @@

### Derived from multicore version 0.1-6 by Simon Urbanek

## Modified by David C. Norris, for progress reporting
mclapply <- function(X, FUN, ..., mc.preschedule = TRUE, mc.set.seed = TRUE,
mc.silent = FALSE, mc.cores = getOption("mc.cores", 2L),
mc.cleanup = TRUE, mc.allow.recursive = TRUE,
progreport = NULL,
progmetric = length,
proginit = 0L,
affinity.list = NULL)
{
.check_ncores <- getFromNamespace('.check_ncores', 'parallel')
isChild <- getFromNamespace('isChild', 'parallel')
mcaffinity <- parallel::mcaffinity
prepareCleanup <- getFromNamespace('prepareCleanup', 'parallel')
processID <- getFromNamespace('processID', 'parallel')
mcfork <- getFromNamespace('mcfork', 'parallel')
mcexit <- getFromNamespace('mcexit', 'parallel')
sendMaster <- getFromNamespace('sendMaster', 'parallel')
selectChildren <- getFromNamespace('selectChildren', 'parallel')
readChild <- getFromNamespace('readChild', 'parallel')
cleanup <- getFromNamespace('cleanup', 'parallel')
mc.reset.stream <- parallel::mc.reset.stream
mc.advance.stream <- getFromNamespace('mc.advance.stream', 'parallel')
mc.set.stream <- getFromNamespace('mc.set.stream', 'parallel')
closeStdout <- getFromNamespace('closeStdout', 'parallel')

## As a convenience, allow client code to omit `mc.preschedule = FALSE`
## when requesting progress reporting. (Typically, we will be requesting
## progress reports only in those circumstances where the workload lacks
## the uniformity & predictability needed for efficient prescheduling.)
if (missing(mc.preschedule) && !is.null(progreport))
mc.preschedule <- FALSE

cores <- as.integer(mc.cores)
if((is.na(cores) || cores < 1L) && is.null(affinity.list))
stop("'mc.cores' must be >= 1")
@@ -57,6 +86,10 @@ mclapply <- function(X, FUN, ..., mc.preschedule = TRUE, mc.set.seed = TRUE,
if (!mc.preschedule) { # sequential (non-scheduled)
FUN <- match.fun(FUN)
if (length(X) <= cores && is.null(affinity.list)) { # we can use one-shot parallel
## TODO: Implement progress reporting in this case, too. This is reasonable
## because the jobs might have varying durations, and it will be nice
## to get confirmation of progress from the moment the earliest results
## become available.
jobs <- lapply(seq_along(X),
function(i) mcparallel(FUN(X[[i]], ...),
name = names(X)[i],
@@ -111,6 +144,7 @@ mclapply <- function(X, FUN, ..., mc.preschedule = TRUE, mc.set.seed = TRUE,
jobsp <- processID(jobs)
has.errors <- 0L
delivered.result <- 0L
prog.sofar <- proginit
while (!all(fin)) {
s <- selectChildren(jobs[!is.na(jobsp)], -1)
if (is.null(s)) break # no children -> no hope (should not happen)
@@ -129,6 +163,11 @@ mclapply <- function(X, FUN, ..., mc.preschedule = TRUE, mc.set.seed = TRUE,
## assignment would remove it from the list
if (!is.null(child.res)) res[[ci]] <- child.res
delivered.result <- delivered.result + 1L
if(!is.null(progreport)) {
prog <- progmetric(child.res)
prog.sofar <- prog.sofar + prog
progreport(prog.sofar)
}
} else {
fin[ci] <- TRUE
## the job has finished, so we must not run
@@ -166,6 +205,8 @@ mclapply <- function(X, FUN, ..., mc.preschedule = TRUE, mc.set.seed = TRUE,
}

## mc.preschedule = TRUE from here on.
if(!is.null(progreport))
warning("'mc.preschedule' must be false if progress reporting is used")
if(!is.null(affinity.list))
warning("'mc.preschedule' must be false if 'affinity.list' is used")
sindex <- lapply(seq_len(cores),
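
A minimal sketch of how the three new arguments compose; the worker function and callback are purely illustrative, and since this `mclapply` is unexported, outside the package it would need the `precautionary:::` prefix (forking, so Unix-alikes only):

## progmetric maps each delivered result to an increment (default: length);
## proginit seeds the running total; progreport receives the cumulative sum.
## mc.preschedule may be omitted: it defaults to FALSE when progreport is given.
res <- precautionary:::mclapply(
  1:20,
  function(i) { Sys.sleep(0.05); runif(i) },
  mc.cores   = 4,
  progreport = function(n) cat(sprintf("\r%d values delivered", n)),
  progmetric = length,
  proginit   = 0L)
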
8 changes: 6 additions & 2 deletions man/Cpe-class.Rd

(Generated documentation file; diff not rendered.)

6 changes: 5 additions & 1 deletion tests/manual/speed.R
@@ -467,7 +467,11 @@ parallax <- function(C = 11:16, unroll = 4, ...) {
C <- kraken$C[i]
calmod$skeleton(calmod$skeleton()) # reset skeleton to clear cache for honest timing
time <- system.time(
cpe.look <<- calmod$trace_paths(1, rep(2,C), unroll = unroll, ...))
cpe.look <<- calmod$trace_paths(1, rep(2,C), unroll = unroll
, progreport = function(p) NULL
##cat(sprintf("J = %d so far.\n", p))
, ...)
)
kraken$elapsed[i] <- time['elapsed']
kraken$J[i] <- dim(calmod$path_matrix())[1]
print(kraken)
