From f8a9afe76428d7d8245d482b20ff513e22ac5f0c Mon Sep 17 00:00:00 2001 From: hb Date: Fri, 23 Feb 2018 14:37:17 -0800 Subject: [PATCH] Introducing FutureResult for returning richer sets of information from resolved futures [#25, #59, #67, #154, #188 #199, #200]. This also fixes #199 and #200. --- NAMESPACE | 3 + NEWS | 14 ++- R/ClusterFuture-class.R | 2 +- R/Future-class.R | 141 ++++++++++++++++++++++++---- R/FutureEvaluationCondition-class.R | 10 +- R/FutureResult-class.R | 34 +++++++ R/MulticoreFuture-class.R | 2 +- R/UniprocessFuture-class.R | 2 +- R/backtrace.R | 7 +- R/signalEarly.R | 59 ++++++------ man/FutureResult.Rd | 31 ++++++ man/result.Rd | 25 +++++ tests/backtrace.R | 1 + tests/cluster.R | 2 +- tests/multicore.R | 14 +++ 15 files changed, 288 insertions(+), 59 deletions(-) create mode 100644 R/FutureResult-class.R create mode 100644 man/FutureResult.Rd create mode 100644 man/result.Rd diff --git a/NAMESPACE b/NAMESPACE index c37ba85d..603fedc3 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -49,6 +49,7 @@ S3method(resolved,UniprocessFuture) S3method(resolved,default) S3method(resolved,environment) S3method(resolved,list) +S3method(result,Future) S3method(run,ClusterFuture) S3method(run,Future) S3method(run,MulticoreFuture) @@ -84,6 +85,7 @@ export(FutureEvaluationMessage) export(FutureEvaluationWarning) export(FutureGlobals) export(FutureMessage) +export(FutureResult) export(FutureWarning) export(MulticoreFuture) export(MultiprocessFuture) @@ -117,6 +119,7 @@ export(plan) export(remote) export(resolve) export(resolved) +export(result) export(run) export(sequential) export(sessionDetails) diff --git a/NEWS b/NEWS index 5245b969..1b1e53c3 100644 --- a/NEWS +++ b/NEWS @@ -1,11 +1,16 @@ Package: future =============== -Version: 1.7.0-9000 [2018-02-21] +Version: 1.7.0-9000 [2018-02-23] NEW FEATURES: - o ... + o Add support for a richer set of results returned by resolved futures. + Previously only the value of the future expression, which could be a + captured error to be resignaled, was expected. Now a FutureResult object + may be returned which will be able to contain further information of the + evaluation of the future, e.g. captured output, timing and memory + benchmarks etc. TEMPORARY WORKAROUND / KNOWN ISSUES: @@ -16,6 +21,11 @@ TEMPORARY WORKAROUND / KNOWN ISSUES: BUG FIXES: + o value() for MulticoreFuture would not produce an error when a (forked) + background R workers would terminate before the future expression is + resolved. This was a limitation inherited from the parallel package. + Now an informative FutureError message is produced. + o value() for MulticoreFuture would not signal errors unless they inherited from 'simpleError' - now it's enough for them to inherits from 'error'. diff --git a/R/ClusterFuture-class.R b/R/ClusterFuture-class.R index cc0d36ea..8acbc04b 100644 --- a/R/ClusterFuture-class.R +++ b/R/ClusterFuture-class.R @@ -87,7 +87,7 @@ ClusterFuture <- function(expr = NULL, envir = parent.frame(), substitute = FALS expr <- gp$expr gp <- NULL - f <- MultiprocessFuture(expr = expr, envir = envir, substitute = FALSE, globals = globals, packages = packages, local = local, gc = gc, persistent = persistent, workers = workers, node = NA_integer_, ...) + f <- MultiprocessFuture(expr = expr, envir = envir, substitute = FALSE, globals = globals, packages = packages, local = local, gc = gc, persistent = persistent, workers = workers, node = NA_integer_, version = "1.8", ...) structure(f, class = c("ClusterFuture", class(f))) } diff --git a/R/Future-class.R b/R/Future-class.R index 4c60b5cd..b292592d 100644 --- a/R/Future-class.R +++ b/R/Future-class.R @@ -235,6 +235,72 @@ run.Future <- function(future, ...) { run <- function(...) UseMethod("run") +#' @export +#' @keywords internal +result <- function(...) UseMethod("result") + +#' Get the results of a resolved future +#' +#' @param future A \link{Future}. +#' @param \dots Not used. +#' +#' @return The \link{FutureResult} object. +#' +#' @details +#' This function is only part of the _backend_ Future API. +#' This function is _not_ part of the frontend Future API. +#' +#' @aliases result +#' @rdname result +#' @export +#' @keywords internal +result.Future <- function(future, ...) { + getFutureResult(future, ...) +} + + +getFutureResult <- function(future, ...) { + if (future$state == "created") { + future <- run(future) + } + + if (!future$state %in% c("finished", "failed", "interrupted")) { + ## BACKWARD COMPATIBILITY: + ## For now, it is value() that collects the results. Later we want + ## all future backends to use result() to do it. /HB 2018-02-22 + value(future, signal = FALSE) + } + + result <- future$result + if (inherits(result, "FutureResult")) return(result) + + ## BACKWARD COMPATIBILITY + result <- future$value + if (inherits(result, "FutureResult")) return(result) + + version <- future$version + if (is.null(version)) version <- "1.7" + + ## Sanity check + if (version == "1.8") { + if (is.null(result) && inherits(future, "MulticoreFuture")) { + label <- future$label + if (is.null(label)) label <- "" + msg <- sprintf("A future ('%s') of class %s did not produce a FutureResult object but NULL. This suggests that the R worker terminated (crashed?) before the future expression was resolved.", label, class(future)[1]) + stop(FutureError(msg, future = future)) + } + } + + ## BACKWARD COMPATIBILITY + if (future$state == "failed") { + value <- result + calls <- value$traceback + return(FutureResult(condition = value, calls = calls, version = "1.7")) + } + + FutureResult(value = result, version = "1.7") +} + #' The value of a future #' @@ -267,11 +333,19 @@ value.Future <- function(future, signal = TRUE, ...) { stop(FutureError(msg, future = future)) } - value <- future$value - if (signal && future$state == "failed") { - mdebug("Future state: %s", sQuote(future$state)) - mdebug("Future value: %s", sQuote(value)) - resignalCondition(future) ## Will produce an error + result <- getFutureResult(future) + stopifnot(inherits(result, "FutureResult")) + + value <- result$value + condition <- result$condition + if (inherits(condition, "condition")) { + if (signal) { + mdebug("Future state: %s", sQuote(future$state)) + resignalCondition(future) ## Will produce an error + } else { + ## BACKWARD COMPATIBILITY + value <- condition + } } value @@ -329,10 +403,16 @@ resolved.Future <- function(x, ...) { getExpression <- function(future, ...) UseMethod("getExpression") #' @export -getExpression.Future <- function(future, mc.cores = NULL, ...) { +getExpression.Future <- function(future, version = NULL, mc.cores = NULL, ...) { debug <- getOption("future.debug", FALSE) ## mdebug("getExpression() ...") + if (is.null(version)) { + version <- future$version + if (is.null(version)) version <- "1.7" + } + + ## Should 'mc.cores' be set? if (!is.null(mc.cores)) { ## mdebug("getExpression(): setting mc.cores = %d inside future", mc.cores) @@ -443,7 +523,7 @@ getExpression.Future <- function(future, mc.cores = NULL, ...) { }) } ## if (length(strategiesR) > 0L) - expr <- makeExpression(expr = future$expr, local = future$local, enter = enter, exit = exit) + expr <- makeExpression(expr = future$expr, local = future$local, enter = enter, exit = exit, version = version) if (getOption("future.debug", FALSE)) { print(expr) } @@ -454,7 +534,7 @@ getExpression.Future <- function(future, mc.cores = NULL, ...) { } ## getExpression() -makeExpression <- function(expr, local = TRUE, globals.onMissing = getOption("future.globals.onMissing", "error"), enter = NULL, exit = NULL) { +makeExpression <- function(expr, local = TRUE, globals.onMissing = getOption("future.globals.onMissing", "error"), enter = NULL, exit = NULL, version = "1.7") { ## Evaluate expression in a local() environment? if (local) { a <- NULL; rm(list = "a") ## To please R CMD check @@ -483,15 +563,40 @@ makeExpression <- function(expr, local = TRUE, globals.onMissing = getOption("fu ## evaluation in a local is optional, cf. argument 'local'. ## If this was mandatory, we could. Instead we use ## a tryCatch() statement. /HB 2016-03-14 - expr <- substitute({ - ## covr: skip=6 - enter - tryCatch({ - body - }, finally = { - exit - }) - }, env = list(enter = enter, body = expr, exit = exit)) + + if (version == "1.7") { + expr <- substitute({ + ## covr: skip=6 + enter + tryCatch({ + body + }, finally = { + exit + }) + }, env = list(enter = enter, body = expr, exit = exit)) + } else if (version == "1.8") { + expr <- substitute({ + ## covr: skip=6 + enter + tryCatch({ + ...future.value <- body + future::FutureResult(value = ...future.value) + }, error = function(cond) { + calls <- sys.calls() + ## Drop fluff added by tryCatch() + # calls <- calls[seq_len(length(calls) - 2L)] + ## Drop fluff added by outer tryCatch() + # calls <- calls[-seq_len(current+7L)] + ## Drop fluff added by outer local = TRUE + # if (future$local) calls <- calls[-seq_len(6L)] + future::FutureResult(value = NULL, condition = cond, calls = calls) + }, finally = { + exit + }) + }, env = list(enter = enter, body = expr, exit = exit)) + } else { + stop("Internal error: Non-supported future expression version: ", version) + } expr } ## makeExpression() @@ -508,3 +613,5 @@ packages <- function(future, ...) UseMethod("packages") packages.Future <- function(future, ...) { future[["packages"]] } + + diff --git a/R/FutureEvaluationCondition-class.R b/R/FutureEvaluationCondition-class.R index 6faec5ba..a7c574b5 100644 --- a/R/FutureEvaluationCondition-class.R +++ b/R/FutureEvaluationCondition-class.R @@ -24,14 +24,15 @@ FutureEvaluationCondition <- function(message, call = NULL, future = NULL, outpu cond <- NULL if (inherits(message, "Future")) { future <- message - value <- future$value - stopifnot(inherits(value, "condition")) - cond <- value + result <- result(future) + cond <- result$condition message <- conditionMessage(cond) } else if (inherits(message, "condition")) { cond <- message message <- conditionMessage(cond) } + + stopifnot(inherits(cond, "condition")) ## Create a condition object structure(list(message = as.character(message), cond = cond, call = call), @@ -56,7 +57,8 @@ print.FutureEvaluationCondition <- function(x, ...) { cat("\n") } - cond <- future$value + result <- result(future) + cond <- future$condition if (inherits(cond, "condition")) { fcalls <- cond$traceback if (!is.null(fcalls)) { diff --git a/R/FutureResult-class.R b/R/FutureResult-class.R new file mode 100644 index 00000000..8d7bc446 --- /dev/null +++ b/R/FutureResult-class.R @@ -0,0 +1,34 @@ +#' Results from resolving a future +#' +#' @param value The value of the future expression. +#' If the expression was not fully resolved (e.g. an error) occurred, +#' the the value is `NULL`. +#' +#' @param condition A [[base::condition]] captured while resolving the future, +#' if any. This is typically an error. +#' +#' @param \ldots (optional) Additional named results to be returned. +#' +#' @param version The version format of the results. +#' +#' @return An object of class FutureResult. +#' +#' @details +#' This function is only part of the _backend_ Future API. +#' This function is _not_ part of the frontend Future API. +#' +#' @export +#' @keywords internal +FutureResult <- function(value = NULL, condition = NULL, ..., version = "1.7") { + args <- list(...) + if (length(args) > 0) { + names <- names(args) + if (is.null(names) || any(nchar(names) == 0)) { + stop(FutureError("Internal error: All arguments to FutureResult() must be named")) + } + } + + structure(list(value = value, condition = condition, ..., version = version), + class = c("FutureResult", "list")) +} + diff --git a/R/MulticoreFuture-class.R b/R/MulticoreFuture-class.R index 6569e148..fcdf6f02 100644 --- a/R/MulticoreFuture-class.R +++ b/R/MulticoreFuture-class.R @@ -34,7 +34,7 @@ MulticoreFuture <- function(expr = NULL, envir = parent.frame(), substitute = FA } gp <- NULL - f <- MultiprocessFuture(expr = expr, envir = envir, substitute = FALSE, job = NULL, ...) + f <- MultiprocessFuture(expr = expr, envir = envir, substitute = FALSE, job = NULL, version = "1.8", ...) structure(f, class = c("MulticoreFuture", class(f))) } diff --git a/R/UniprocessFuture-class.R b/R/UniprocessFuture-class.R index da7a6d70..c5a1435c 100644 --- a/R/UniprocessFuture-class.R +++ b/R/UniprocessFuture-class.R @@ -34,7 +34,7 @@ UniprocessFuture <- function(expr = NULL, envir = parent.frame(), substitute = F gp <- NULL - f <- Future(expr = expr, envir = envir, substitute = FALSE, lazy = lazy, asynchronous = FALSE, local = local, globals = globals, packages = packages, ...) + f <- Future(expr = expr, envir = envir, substitute = FALSE, lazy = lazy, asynchronous = FALSE, local = local, globals = globals, packages = packages, version = "1.8", ...) structure(f, class = c("UniprocessFuture", class(f))) } diff --git a/R/backtrace.R b/R/backtrace.R index 37f62860..078fa8c5 100644 --- a/R/backtrace.R +++ b/R/backtrace.R @@ -25,12 +25,13 @@ backtrace <- function(future, envir = parent.frame(), ...) { stop("No condition has been caught because the future is unresolved: ", sQuote(expr)) } - value <- future$value - if (!inherits(value, "condition")) { + result <- result(future) + condition <- result$condition + if (!inherits(condition, "condition")) { stop("No condition was caught for this future: ", sQuote(expr)) } - calls <- value$traceback + calls <- result$calls if (is.null(calls)) { stop("No call trace was recorded for this future: ", sQuote(expr)) } diff --git a/R/signalEarly.R b/R/signalEarly.R index 14451cca..5a2e09b4 100644 --- a/R/signalEarly.R +++ b/R/signalEarly.R @@ -8,27 +8,32 @@ signalEarly <- function(future, collect = TRUE, ...) { if (!earlySignal) return(future) debug <- getOption("future.debug", FALSE) - if (debug) mdebug("signalEarly(): Retrieving value ...") + if (debug) mdebug("signalEarly() ...") - ## Collect value? - if (collect) { - if (debug) mdebug("signalEarly(): v <- value(f, signal = FALSE)") - value <- value(future, signal = FALSE) - } else { - if (debug) mdebug("signalEarly(): v <- f$value") - value <- future$value + ## Nothing to do? + if (!collect && !resolved(future)) { + if (debug) mdebug("Future not resolved and collect = FALSE. Skipping") + return(future) } - - if (debug) { - mdebug("signalEarly(): class(v) = c(%s)", paste(sQuote(class(value)), collapse = ", ")) - mdebug("signalEarly(): Retrieving value ... DONE") + + result <- result(future) + stopifnot(inherits(result, "FutureResult")) + + condition <- result$condition + + ## Nothing to do? + if (is.null(condition)) { + if (debug) mdebug("signalEarly() ... DONE") + return(future) } + + if (debug) mdebug("signalEarly(): Condition class = c(%s)", paste(sQuote(class(condition)), collapse = ", ")) - ## Was a condition caught? - if (!inherits(value, "condition")) return(future) + ## Sanity check + stopifnot(inherits(condition, "condition")) - if (debug) mdebug("signalEarly(): signalCondition(v)") resignalCondition(future) + if (debug) mdebug("signalEarly() ... DONE") invisible(future) @@ -41,26 +46,22 @@ resignalCondition <- function(future, ...) { future = future)) } - value <- future$value - - ## Was a condition caught? - if (!inherits(value, "condition")) { - stop(FutureError("Internal error: Future did not produce a condition", - future = future)) - } - - cond <- value + result <- result(future) + stopifnot(inherits(result, "FutureResult")) + condition <- result$condition + stopifnot(inherits(condition, "condition")) + ## Signal detected condition - if (inherits(cond, "error")) { + if (inherits(condition, "error")) { stop(FutureEvaluationError(future)) - } else if (inherits(cond, "warning")) { + } else if (inherits(condition, "warning")) { warning(FutureEvaluationWarning(future)) - } else if (inherits(cond, "message")) { + } else if (inherits(condition, "message")) { message(FutureEvaluationMessage(future)) message("\n") ## TODO: Remove this? /HB 2018-02-03 - } else { - signalCondition(value) + } else if (inherits(condition, "condition")) { + signalCondition(condition) } invisible(future) diff --git a/man/FutureResult.Rd b/man/FutureResult.Rd new file mode 100644 index 00000000..5dfb4bba --- /dev/null +++ b/man/FutureResult.Rd @@ -0,0 +1,31 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/FutureResult-class.R +\name{FutureResult} +\alias{FutureResult} +\title{Results from resolving a future} +\usage{ +FutureResult(value = NULL, condition = NULL, ..., version = "1.7") +} +\arguments{ +\item{value}{The value of the future expression. +If the expression was not fully resolved (e.g. an error) occurred, +the the value is `NULL`.} + +\item{condition}{A [[base::condition]] captured while resolving the future, +if any. This is typically an error.} + +\item{version}{The version format of the results.} + +\item{\ldots}{(optional) Additional named results to be returned.} +} +\value{ +An object of class FutureResult. +} +\description{ +Results from resolving a future +} +\details{ +This function is only part of the _backend_ Future API. +This function is _not_ part of the frontend Future API. +} +\keyword{internal} diff --git a/man/result.Rd b/man/result.Rd new file mode 100644 index 00000000..0df95c2f --- /dev/null +++ b/man/result.Rd @@ -0,0 +1,25 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/Future-class.R +\name{result.Future} +\alias{result.Future} +\alias{result} +\title{Get the results of a resolved future} +\usage{ +\method{result}{Future}(future, ...) +} +\arguments{ +\item{future}{A \link{Future}.} + +\item{\dots}{Not used.} +} +\value{ +The \link{FutureResult} object. +} +\description{ +Get the results of a resolved future +} +\details{ +This function is only part of the _backend_ Future API. +This function is _not_ part of the frontend Future API. +} +\keyword{internal} diff --git a/tests/backtrace.R b/tests/backtrace.R index 8afce6b7..376b2445 100644 --- a/tests/backtrace.R +++ b/tests/backtrace.R @@ -61,6 +61,7 @@ message("- No call stack ...") f <- future({ 42L; stop("Woops") }) v <- value(f, signal = FALSE) f$value$traceback <- NULL ## Remove call stack +f$value$calls <- NULL ## Remove call stack res <- tryCatch(backtrace(f), error = identity) print(res) stopifnot(inherits(res, "error")) diff --git a/tests/cluster.R b/tests/cluster.R index 638b446d..ddc89cbc 100644 --- a/tests/cluster.R +++ b/tests/cluster.R @@ -284,7 +284,7 @@ for (type in types) { print(cl) - ## Crashing FORK:ed processes seems to harsh on R (< 3.2.0) + ## Crashing FORK:ed processes seems too harsh on R (< 3.2.0) if (type != "FORK" || getRversion() >= "3.2.0") { message("*** cluster() - crashed worker ...") diff --git a/tests/multicore.R b/tests/multicore.R index bd262e36..255532d4 100644 --- a/tests/multicore.R +++ b/tests/multicore.R @@ -97,6 +97,20 @@ for (cores in 1:min(2L, availableCores("multicore"))) { } # for (globals ...) + message("*** multicore() - terminating workers ...") + + ## Force R worker to quit + x %<-% quit(save = "no") + res <- tryCatch(y <- x, error = identity) + print(res) + stopifnot( + inherits(res, "simpleError"), + inherits(res, "FutureError") + ) + + message("*** multicore() - terminating workers ... DONE") + + message("*** multicore(..., workers = 1L) ...") a <- 2