Skip to content

Commit

Permalink
Introducing FutureResult for returning richer sets of information fro…
Browse files Browse the repository at this point in the history
…m resolved futures [#25, #59, #67, #154, #188 #199, #200]. This also fixes #199 and #200.
  • Loading branch information
HenrikBengtsson committed Feb 23, 2018
1 parent a16343c commit f8a9afe
Show file tree
Hide file tree
Showing 15 changed files with 288 additions and 59 deletions.
3 changes: 3 additions & 0 deletions NAMESPACE
Expand Up @@ -49,6 +49,7 @@ S3method(resolved,UniprocessFuture)
S3method(resolved,default)
S3method(resolved,environment)
S3method(resolved,list)
S3method(result,Future)
S3method(run,ClusterFuture)
S3method(run,Future)
S3method(run,MulticoreFuture)
Expand Down Expand Up @@ -84,6 +85,7 @@ export(FutureEvaluationMessage)
export(FutureEvaluationWarning)
export(FutureGlobals)
export(FutureMessage)
export(FutureResult)
export(FutureWarning)
export(MulticoreFuture)
export(MultiprocessFuture)
Expand Down Expand Up @@ -117,6 +119,7 @@ export(plan)
export(remote)
export(resolve)
export(resolved)
export(result)
export(run)
export(sequential)
export(sessionDetails)
Expand Down
14 changes: 12 additions & 2 deletions NEWS
@@ -1,11 +1,16 @@
Package: future
===============

Version: 1.7.0-9000 [2018-02-21]
Version: 1.7.0-9000 [2018-02-23]

NEW FEATURES:

o ...
o Add support for a richer set of results returned by resolved futures.
Previously only the value of the future expression, which could be a
captured error to be resignaled, was expected. Now a FutureResult object
may be returned which will be able to contain further information of the
evaluation of the future, e.g. captured output, timing and memory
benchmarks etc.

TEMPORARY WORKAROUND / KNOWN ISSUES:

Expand All @@ -16,6 +21,11 @@ TEMPORARY WORKAROUND / KNOWN ISSUES:

BUG FIXES:

o value() for MulticoreFuture would not produce an error when a (forked)
background R workers would terminate before the future expression is
resolved. This was a limitation inherited from the parallel package.
Now an informative FutureError message is produced.

o value() for MulticoreFuture would not signal errors unless they inherited
from 'simpleError' - now it's enough for them to inherits from 'error'.

Expand Down
2 changes: 1 addition & 1 deletion R/ClusterFuture-class.R
Expand Up @@ -87,7 +87,7 @@ ClusterFuture <- function(expr = NULL, envir = parent.frame(), substitute = FALS
expr <- gp$expr
gp <- NULL

f <- MultiprocessFuture(expr = expr, envir = envir, substitute = FALSE, globals = globals, packages = packages, local = local, gc = gc, persistent = persistent, workers = workers, node = NA_integer_, ...)
f <- MultiprocessFuture(expr = expr, envir = envir, substitute = FALSE, globals = globals, packages = packages, local = local, gc = gc, persistent = persistent, workers = workers, node = NA_integer_, version = "1.8", ...)
structure(f, class = c("ClusterFuture", class(f)))
}

Expand Down
141 changes: 124 additions & 17 deletions R/Future-class.R
Expand Up @@ -235,6 +235,72 @@ run.Future <- function(future, ...) {
run <- function(...) UseMethod("run")


#' @export
#' @keywords internal
result <- function(...) UseMethod("result")

#' Get the results of a resolved future
#'
#' @param future A \link{Future}.
#' @param \dots Not used.
#'
#' @return The \link{FutureResult} object.
#'
#' @details
#' This function is only part of the _backend_ Future API.
#' This function is _not_ part of the frontend Future API.
#'
#' @aliases result
#' @rdname result
#' @export
#' @keywords internal
result.Future <- function(future, ...) {
getFutureResult(future, ...)
}


getFutureResult <- function(future, ...) {
if (future$state == "created") {
future <- run(future)
}

if (!future$state %in% c("finished", "failed", "interrupted")) {
## BACKWARD COMPATIBILITY:
## For now, it is value() that collects the results. Later we want
## all future backends to use result() to do it. /HB 2018-02-22
value(future, signal = FALSE)
}

result <- future$result
if (inherits(result, "FutureResult")) return(result)

## BACKWARD COMPATIBILITY
result <- future$value
if (inherits(result, "FutureResult")) return(result)

version <- future$version
if (is.null(version)) version <- "1.7"

## Sanity check
if (version == "1.8") {
if (is.null(result) && inherits(future, "MulticoreFuture")) {
label <- future$label
if (is.null(label)) label <- "<none>"
msg <- sprintf("A future ('%s') of class %s did not produce a FutureResult object but NULL. This suggests that the R worker terminated (crashed?) before the future expression was resolved.", label, class(future)[1])
stop(FutureError(msg, future = future))
}
}

## BACKWARD COMPATIBILITY
if (future$state == "failed") {
value <- result
calls <- value$traceback
return(FutureResult(condition = value, calls = calls, version = "1.7"))
}

FutureResult(value = result, version = "1.7")
}


#' The value of a future
#'
Expand Down Expand Up @@ -267,11 +333,19 @@ value.Future <- function(future, signal = TRUE, ...) {
stop(FutureError(msg, future = future))
}

value <- future$value
if (signal && future$state == "failed") {
mdebug("Future state: %s", sQuote(future$state))
mdebug("Future value: %s", sQuote(value))
resignalCondition(future) ## Will produce an error
result <- getFutureResult(future)
stopifnot(inherits(result, "FutureResult"))

value <- result$value
condition <- result$condition
if (inherits(condition, "condition")) {
if (signal) {
mdebug("Future state: %s", sQuote(future$state))
resignalCondition(future) ## Will produce an error
} else {
## BACKWARD COMPATIBILITY
value <- condition
}
}

value
Expand Down Expand Up @@ -329,10 +403,16 @@ resolved.Future <- function(x, ...) {
getExpression <- function(future, ...) UseMethod("getExpression")

#' @export
getExpression.Future <- function(future, mc.cores = NULL, ...) {
getExpression.Future <- function(future, version = NULL, mc.cores = NULL, ...) {
debug <- getOption("future.debug", FALSE)
## mdebug("getExpression() ...")

if (is.null(version)) {
version <- future$version
if (is.null(version)) version <- "1.7"
}


## Should 'mc.cores' be set?
if (!is.null(mc.cores)) {
## mdebug("getExpression(): setting mc.cores = %d inside future", mc.cores)
Expand Down Expand Up @@ -443,7 +523,7 @@ getExpression.Future <- function(future, mc.cores = NULL, ...) {
})
} ## if (length(strategiesR) > 0L)

expr <- makeExpression(expr = future$expr, local = future$local, enter = enter, exit = exit)
expr <- makeExpression(expr = future$expr, local = future$local, enter = enter, exit = exit, version = version)
if (getOption("future.debug", FALSE)) {
print(expr)
}
Expand All @@ -454,7 +534,7 @@ getExpression.Future <- function(future, mc.cores = NULL, ...) {
} ## getExpression()


makeExpression <- function(expr, local = TRUE, globals.onMissing = getOption("future.globals.onMissing", "error"), enter = NULL, exit = NULL) {
makeExpression <- function(expr, local = TRUE, globals.onMissing = getOption("future.globals.onMissing", "error"), enter = NULL, exit = NULL, version = "1.7") {
## Evaluate expression in a local() environment?
if (local) {
a <- NULL; rm(list = "a") ## To please R CMD check
Expand Down Expand Up @@ -483,15 +563,40 @@ makeExpression <- function(expr, local = TRUE, globals.onMissing = getOption("fu
## evaluation in a local is optional, cf. argument 'local'.
## If this was mandatory, we could. Instead we use
## a tryCatch() statement. /HB 2016-03-14
expr <- substitute({
## covr: skip=6
enter
tryCatch({
body
}, finally = {
exit
})
}, env = list(enter = enter, body = expr, exit = exit))

if (version == "1.7") {
expr <- substitute({
## covr: skip=6
enter
tryCatch({
body
}, finally = {
exit
})
}, env = list(enter = enter, body = expr, exit = exit))
} else if (version == "1.8") {
expr <- substitute({
## covr: skip=6
enter
tryCatch({
...future.value <- body
future::FutureResult(value = ...future.value)
}, error = function(cond) {
calls <- sys.calls()
## Drop fluff added by tryCatch()
# calls <- calls[seq_len(length(calls) - 2L)]
## Drop fluff added by outer tryCatch()
# calls <- calls[-seq_len(current+7L)]
## Drop fluff added by outer local = TRUE
# if (future$local) calls <- calls[-seq_len(6L)]
future::FutureResult(value = NULL, condition = cond, calls = calls)
}, finally = {
exit
})
}, env = list(enter = enter, body = expr, exit = exit))
} else {
stop("Internal error: Non-supported future expression version: ", version)
}

expr
} ## makeExpression()
Expand All @@ -508,3 +613,5 @@ packages <- function(future, ...) UseMethod("packages")
packages.Future <- function(future, ...) {
future[["packages"]]
}


10 changes: 6 additions & 4 deletions R/FutureEvaluationCondition-class.R
Expand Up @@ -24,14 +24,15 @@ FutureEvaluationCondition <- function(message, call = NULL, future = NULL, outpu
cond <- NULL
if (inherits(message, "Future")) {
future <- message
value <- future$value
stopifnot(inherits(value, "condition"))
cond <- value
result <- result(future)
cond <- result$condition
message <- conditionMessage(cond)
} else if (inherits(message, "condition")) {
cond <- message
message <- conditionMessage(cond)
}

stopifnot(inherits(cond, "condition"))

## Create a condition object
structure(list(message = as.character(message), cond = cond, call = call),
Expand All @@ -56,7 +57,8 @@ print.FutureEvaluationCondition <- function(x, ...) {
cat("\n")
}

cond <- future$value
result <- result(future)
cond <- future$condition
if (inherits(cond, "condition")) {
fcalls <- cond$traceback
if (!is.null(fcalls)) {
Expand Down
34 changes: 34 additions & 0 deletions R/FutureResult-class.R
@@ -0,0 +1,34 @@
#' Results from resolving a future
#'
#' @param value The value of the future expression.
#' If the expression was not fully resolved (e.g. an error) occurred,
#' the the value is `NULL`.
#'
#' @param condition A [[base::condition]] captured while resolving the future,
#' if any. This is typically an error.
#'
#' @param \ldots (optional) Additional named results to be returned.
#'
#' @param version The version format of the results.
#'
#' @return An object of class FutureResult.
#'
#' @details
#' This function is only part of the _backend_ Future API.
#' This function is _not_ part of the frontend Future API.
#'
#' @export
#' @keywords internal
FutureResult <- function(value = NULL, condition = NULL, ..., version = "1.7") {
args <- list(...)
if (length(args) > 0) {
names <- names(args)
if (is.null(names) || any(nchar(names) == 0)) {
stop(FutureError("Internal error: All arguments to FutureResult() must be named"))
}
}

structure(list(value = value, condition = condition, ..., version = version),
class = c("FutureResult", "list"))
}

2 changes: 1 addition & 1 deletion R/MulticoreFuture-class.R
Expand Up @@ -34,7 +34,7 @@ MulticoreFuture <- function(expr = NULL, envir = parent.frame(), substitute = FA
}
gp <- NULL

f <- MultiprocessFuture(expr = expr, envir = envir, substitute = FALSE, job = NULL, ...)
f <- MultiprocessFuture(expr = expr, envir = envir, substitute = FALSE, job = NULL, version = "1.8", ...)
structure(f, class = c("MulticoreFuture", class(f)))
}

Expand Down
2 changes: 1 addition & 1 deletion R/UniprocessFuture-class.R
Expand Up @@ -34,7 +34,7 @@ UniprocessFuture <- function(expr = NULL, envir = parent.frame(), substitute = F

gp <- NULL

f <- Future(expr = expr, envir = envir, substitute = FALSE, lazy = lazy, asynchronous = FALSE, local = local, globals = globals, packages = packages, ...)
f <- Future(expr = expr, envir = envir, substitute = FALSE, lazy = lazy, asynchronous = FALSE, local = local, globals = globals, packages = packages, version = "1.8", ...)
structure(f, class = c("UniprocessFuture", class(f)))
}

Expand Down
7 changes: 4 additions & 3 deletions R/backtrace.R
Expand Up @@ -25,12 +25,13 @@ backtrace <- function(future, envir = parent.frame(), ...) {
stop("No condition has been caught because the future is unresolved: ", sQuote(expr))
}

value <- future$value
if (!inherits(value, "condition")) {
result <- result(future)
condition <- result$condition
if (!inherits(condition, "condition")) {
stop("No condition was caught for this future: ", sQuote(expr))
}

calls <- value$traceback
calls <- result$calls
if (is.null(calls)) {
stop("No call trace was recorded for this future: ", sQuote(expr))
}
Expand Down

0 comments on commit f8a9afe

Please sign in to comment.