Skip to content

Commit

Permalink
Add support for capturing/relaying standard error (stderr = TRUE) [#232]
Browse files Browse the repository at this point in the history
  • Loading branch information
HenrikBengtsson committed Sep 12, 2018
1 parent 5c69ae7 commit 7e5572e
Show file tree
Hide file tree
Showing 20 changed files with 207 additions and 55 deletions.
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ export("%lazy%")
export("%packages%")
export("%plan%")
export("%seed%")
export("%stderr%")
export("%stdout%")
export("%tweak%")
export(ClusterFuture)
Expand Down
11 changes: 8 additions & 3 deletions NEWS
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ NEW FEATURES:
parallel::makeCluster(n, type = "MPI") but that also attempts to workaound
issues where parallel::stopCluster() causes R to stall.

o [BETA] Now resolved() for ClusterFuture is non-blocking also for clusters
of type MPIcluster as created by parallel::makeCluster(..., type = "MPI").

o The error reported when failing to retrieve the results of a future
evaluated on a localhost cluster/multisession worker or a forked/multicore
worker is now more informative. Specifically, it mentions whether the worker
Expand All @@ -27,6 +24,14 @@ NEW FEATURES:
controlling whether the cluster should be automatically stopped when
garbage collected or not.

o [BETA] Now resolved() for ClusterFuture is non-blocking also for clusters
of type MPIcluster as created by parallel::makeCluster(..., type = "MPI").

o [BETA] Analogously to the standard output, the standard error can now be
captured and relayed by using stderr = TRUE. However, the default is
stderr = NA, which leaves the standard error alone. This is due to a
limitation in R preventing us from reliably capturing it in all cases.

BUG FIXES:

o On Windows, plan(multiprocess) would not initiate the workers. Instead
Expand Down
76 changes: 61 additions & 15 deletions R/Future-class.R
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,13 @@
#' @param envir The \link{environment} from where global objects should be
#' identified.
#'
#' @param substitute If TRUE, argument \code{expr} is
#' @param substitute If \code{TRUE}, argument \code{expr} is
#' \code{\link[base]{substitute}()}:ed, otherwise not.
#'
#' @param stdout If TRUE (default), then the standard output is captured,
#' and re-outputted when \code{value()} is called.
#' If FALSE, any output is silenced (by sinking it to the null device as
#' it is outputted).
#' If NA (not recommended), output is \emph{not} intercepted.
#' @param stdout,stderr If \code{TRUE}, then the standard output (error) is
#' captured, and re-outputted when \code{value()} is called.
#' If \code{FALSE}, any output is silenced (by sinking it to the null device).
#' If \code{NA}, the output is \emph{not} intercepted.
#'
#' @param globals (optional) a logical, a character vector, or a named list
#' to control how globals are handled.
Expand Down Expand Up @@ -67,7 +66,7 @@
#' @export
#' @keywords internal
#' @name Future-class
Future <- function(expr = NULL, envir = parent.frame(), substitute = FALSE, stdout = TRUE, globals = NULL, packages = NULL, seed = NULL, lazy = FALSE, local = TRUE, gc = FALSE, earlySignal = FALSE, label = NULL, ...) {
Future <- function(expr = NULL, envir = parent.frame(), substitute = FALSE, stdout = TRUE, stderr = NA, globals = NULL, packages = NULL, seed = NULL, lazy = FALSE, local = TRUE, gc = FALSE, earlySignal = FALSE, label = NULL, ...) {
if (substitute) expr <- substitute(expr)

if (!is.null(seed)) {
Expand All @@ -83,6 +82,7 @@ Future <- function(expr = NULL, envir = parent.frame(), substitute = FALSE, stdo
}

stop_if_not(is.logical(stdout), length(stdout) == 1L)
stop_if_not(is.logical(stderr), length(stderr) == 1L)

if (!is.null(globals)) {
stop_if_not(is.list(globals),
Expand All @@ -109,6 +109,7 @@ Future <- function(expr = NULL, envir = parent.frame(), substitute = FALSE, stdo
core$expr <- expr
core$envir <- envir
core$stdout <- stdout
core$stderr <- stderr
core$globals <- globals
core$packages <- packages
core$seed <- seed
Expand Down Expand Up @@ -156,6 +157,7 @@ print.Future <- function(x, ...) {
cat(sprintf("Local evaluation: %s\n", x$local))
cat(sprintf("Environment: %s\n", capture.output(x$envir)))
cat(sprintf("Capture standard output: %s\n", x$stdout))
cat(sprintf("Capture standard error: %s\n", x$stderr))

## FIXME: Add method globals_of() for Future such that it's possible
## also for SequentialFuture to return something here. /HB 2017-05-17
Expand Down Expand Up @@ -359,8 +361,8 @@ result.Future <- function(future, ...) {
#'
#' @param future A \link{Future}.
#'
#' @param stdout If TRUE, any captured standard output is outputted,
#' otherwise not.
#' @param stdout,stderr If TRUE, any captured standard output (error) is
#' outputted, otherwise not.
#'
#' @param signal A logical specifying whether (\link[base]{conditions})
#' should signaled or be returned as values.
Expand All @@ -377,7 +379,7 @@ result.Future <- function(future, ...) {
#' @rdname value
#' @export
#' @export value
value.Future <- function(future, stdout = TRUE, signal = TRUE, ...) {
value.Future <- function(future, stdout = TRUE, stderr = TRUE, signal = TRUE, ...) {
if (future$state == "created") {
future <- run(future)
}
Expand All @@ -401,7 +403,13 @@ value.Future <- function(future, stdout = TRUE, signal = TRUE, ...) {
inherits(result$stdout, "character")) {
cat(paste(result$stdout, collapse = "\n"))
}


## Output captured stderr output?
if (stdout && length(result$stderr) > 0 &&
inherits(result$stderr, "character")) {
cat(paste(result$stderr, collapse = "\n"), file = stderr())
}

## Signal captured conditions?
condition <- result$condition
if (inherits(condition, "condition")) {
Expand Down Expand Up @@ -471,7 +479,7 @@ resolved.Future <- function(x, ...) {
getExpression <- function(future, ...) UseMethod("getExpression")

#' @export
getExpression.Future <- function(future, local = future$local, stdout = future$stdout, mc.cores = NULL, ...) {
getExpression.Future <- function(future, local = future$local, stdout = future$stdout, stderr = future$stderr, mc.cores = NULL, ...) {
debug <- getOption("future.debug", FALSE)
## mdebug("getExpression() ...")

Expand Down Expand Up @@ -619,7 +627,7 @@ getExpression.Future <- function(future, local = future$local, stdout = future$s
})
} ## if (length(strategiesR) > 0L)

expr <- makeExpression(expr = future$expr, local = local, stdout = stdout, enter = enter, exit = exit, version = version)
expr <- makeExpression(expr = future$expr, local = local, stdout = stdout, stderr = stderr, enter = enter, exit = exit, version = version)
if (getOption("future.debug", FALSE)) {
print(expr)
}
Expand All @@ -630,7 +638,7 @@ getExpression.Future <- function(future, local = future$local, stdout = future$s
} ## getExpression()


makeExpression <- function(expr, local = TRUE, stdout = TRUE, globals.onMissing = getOption("future.globals.onMissing", "ignore"), enter = NULL, exit = NULL, version = "1.7") {
makeExpression <- function(expr, local = TRUE, stdout = TRUE, stderr = TRUE, globals.onMissing = getOption("future.globals.onMissing", "ignore"), enter = NULL, exit = NULL, version = "1.7") {
## Evaluate expression in a local() environment?
if (local) {
expr <- bquote(local(.(expr)))
Expand Down Expand Up @@ -710,7 +718,31 @@ makeExpression <- function(expr, local = TRUE, stdout = TRUE, globals.onMissing
close(...future.stdout)
}, add = TRUE)
}


## Capture standard error?
if (is.na(.(stderr))) { ## stderr = NA
## Don't capture, but also don't block any output
} else {
if (.(stderr)) { ## stderr = TRUE
## Capture all output
## NOTE: Capturing to a raw connection is much more efficient
## than to a character connection, cf.
## https://www.jottr.org/2014/05/26/captureoutput/
...future.stderr <- rawConnection(raw(0L), open = "w")
} else { ## stderr = FALSE
## Silence all output by sending it to the void
...future.stderr <- file(
switch(.Platform$OS.type, windows = "NUL", "/dev/null"),
open = "w"
)
}
sink(...future.stderr, type = "message", split = FALSE)
on.exit(if (!is.null(...future.stderr)) {
sink(type = "outerr", split = FALSE)
close(...future.stderr)
}, add = TRUE)
}

...future.result <- tryCatch({
...future.value <- .(expr)
## A FutureResult object (without requiring the future package)
Expand Down Expand Up @@ -753,6 +785,20 @@ makeExpression <- function(expr, local = TRUE, stdout = TRUE, globals.onMissing
...future.stdout <- NULL
}

if (is.na(.(stderr))) {
} else {
sink(type = "message", split = FALSE)
if (.(stderr)) {
...future.result$stderr <- rawToChar(
rawConnectionValue(...future.stderr)
)
} else {
...future.result["stderr"] <- list(NULL)
}
close(...future.stderr)
...future.stderr <- NULL
}

...future.result
})
} else {
Expand Down
46 changes: 42 additions & 4 deletions R/future.R
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,15 @@
#' on these settings and there is no need to modify the code when
#' switching from, say, sequential to parallel processing.
#'
#' @inheritParams Future-class
#'
#' @param expr,value An \R \link[base]{expression}.
#'
#' @inheritParams Future-class
#'
#' @param stdout,stderr If \code{TRUE}, then the standard output (error) is
#' captured, and re-outputted when \code{value()} is called.
#' If \code{FALSE}, any output is silenced (by sinking it to the null device).
#' If \code{NA}, the output is \emph{not} intercepted.
#'
#' @param evaluator The actual function that evaluates
#' the future expression and returns a \link{Future}.
#' The evaluator function should accept all of the same
Expand Down Expand Up @@ -178,6 +183,36 @@
#' y \%<-\% { median(x) } \%globals\% list(x = x, median = stats::median)
#' }
#'
#' @section Standard output and standard error:
#' If \code{stdout = TRUE} (default), then any standard output produced when
#' a future is evaluated is captured and re-outputted ("relayed") when
#' \code{value()} is called.
#' If \code{stdout = FALSE}, then the output is captured and dropped (by
#' directly sending it to the null device).
#' If \code{stdout = NA}, then the standard output is left alone, where the
#' exact behavior should be considered unknown / dependent on the backend.
#' For example,
#' \preformatted{
#' x <- rnorm(1000)
#' f <- future({
#' str(x)
#' median(x)
#' })
#' y <- value(y)
#' ## num [1:1000] -0.369 0.837 -0.422 -1.616 -1.054 ...
#' ## [1] -0.01390193
#' }
#'
#' Analogously, the standard error is captured and relayed when
#' \code{stderr = TRUE}, dropped when \code{stderr = FALSE}, and
#' left alone when \code{stderr = NA} (default).
#'
#' \emph{Known limitations of \code{stderr = TRUE}}: Any call in the future
#' expression to code that, directly or indirectly, captures and sinks the
#' standard error will cause any further standard error to be lost. This
#' is a known limitation in \R and stems from the fact that only one
#' "message" sink can be active at any time.
#'
#' @example incl/future.R
#'
#'
Expand All @@ -190,14 +225,17 @@
#' @aliases futureCall
#' @export
#' @name future
future <- function(expr, envir = parent.frame(), substitute = TRUE, globals = TRUE, packages = NULL, lazy = FALSE, seed = NULL, evaluator = plan("next"), ...) {
future <- function(expr, envir = parent.frame(), substitute = TRUE, stdout = TRUE, stderr = NA, globals = TRUE, packages = NULL, lazy = FALSE, seed = NULL, evaluator = plan("next"), ...) {
if (substitute) expr <- substitute(expr)

if (!is.function(evaluator)) {
stop("Argument 'evaluator' must be a function: ", typeof(evaluator))
}

future <- evaluator(expr, envir = envir, substitute = FALSE, lazy = lazy, seed = seed, globals = globals, packages = packages, ...)
future <- evaluator(expr, envir = envir, substitute = FALSE,
stdout = stdout, stderr = stderr,
globals = globals, packages = packages,
lazy = lazy, seed = seed, ...)

## Assert that a future was returned
if (!inherits(future, "Future")) {
Expand Down
28 changes: 24 additions & 4 deletions R/stdout_OP.R
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
#' Control whether standard output should be captured or not
#' Control whether standard output (error) should be captured or not
#'
#' @usage fassignment \%stdout\% capture
#' @usage
#' fassignment \%stdout\% capture
#' fassignment \%stderr\% capture
#'
#' @inheritParams multiprocess
#'
#'
#' @param fassignment The future assignment, e.g.
#' \code{x \%<-\% \{ expr \}}.
#'
#' @param capture If TRUE, the standard output will be captured, otherwise not.
#' @param capture If \code{TRUE}, then the standard output (error) is
#' captured, and re-outputted when the future is resolved.
#' If \code{FALSE}, any output is silenced (by sinking it to the null device).
#' If \code{NA}, output is \emph{not} intercepted.
#'
#' @aliases %stderr%
#' @export
`%stdout%` <- function(fassignment, capture) {
fassignment <- substitute(fassignment)
Expand All @@ -21,3 +27,17 @@

eval(fassignment, envir = envir, enclos = baseenv())
}


#' @export
`%stderr%` <- function(fassignment, capture) {
fassignment <- substitute(fassignment)
envir <- parent.frame(1)

## Temporarily set 'lazy' argument
args <- getOption("future.disposable", list())
args["stderr"] <- list(capture)
options(future.disposable = args)

eval(fassignment, envir = envir, enclos = baseenv())
}
2 changes: 1 addition & 1 deletion man/ClusterFuture-class.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion man/ConstantFuture-class.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 8 additions & 9 deletions man/Future-class.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion man/MulticoreFuture-class.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion man/MultiprocessFuture-class.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion man/UniprocessFuture-class.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 7e5572e

Please sign in to comment.