/
UniprocessFuture-class.R
126 lines (104 loc) · 4.22 KB
/
UniprocessFuture-class.R
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
#' An uniprocess future is a future whose value will be resolved synchronously in the current process
#'
#' @inheritParams Future-class
#'
#' @param lazy If \code{FALSE} (default), then the setup and validation of
#' global variables are done for eager evaluation, otherwise not.
#'
#' @param \dots Additional named elements passed to \code{\link{Future}()}.
#'
#' @return An object of class \code{UniprocessFuture}.
#'
#' @seealso
#' To evaluate an expression using "uniprocess future", see functions
#' \code{\link{uniprocess}()}.
#'
#' @export
#' @name UniprocessFuture-class
#' @keywords internal
UniprocessFuture <- function(expr = NULL, envir = parent.frame(), substitute = FALSE, globals = TRUE, packages = NULL, lazy = FALSE, local = TRUE, ...) {
if (substitute) expr <- substitute(expr)
if (lazy && !local && (!is.logical(globals) || globals)) {
stop("Non-supported use of lazy uniprocess futures: Whenever argument 'local' is FALSE, then argument 'globals' must also be FALSE. Lazy uniprocess future evaluation in the calling environment (local = FALSE) can only be done if global objects are resolved at the same time.")
}
## Global objects?
gp <- getGlobalsAndPackages(expr, envir = envir, tweak = tweakExpression, globals = globals)
globals <- gp$globals
## Record packages?
if (length(packages) > 0 || (length(gp$packages) > 0 && lazy)) {
packages <- unique(c(gp$packages, packages))
}
gp <- NULL
f <- Future(expr = expr, envir = envir, substitute = FALSE, lazy = lazy, asynchronous = FALSE, local = local, globals = globals, packages = packages, version = "1.8", ...)
structure(f, class = c("UniprocessFuture", class(f)))
}
#' @export
run.UniprocessFuture <- function(future, ...) {
debug <- getOption("future.debug", FALSE)
if (future$state != 'created') {
label <- future$label
if (is.null(label)) label <- "<none>"
stop(sprintf("A future ('%s') can only be launched once.", label))
}
## Assert that the process that created the future is
## also the one that evaluates/resolves/queries it.
assertOwner(future)
expr <- getExpression(future)
envir <- future$envir
## Assign globals to separate "globals" enclosure environment?
globals <- future$globals
if (length(globals) > 0) {
if (future$local) {
envir <- new.env(parent = envir)
}
for (name in names(globals)) {
envir[[name]] <- globals[[name]]
}
}
## Run future
future$state <- 'running'
## WORKAROUND: tryCatch() does not record the traceback and
## it is too late to infer it when the error has been caught.
## Because of this with we use withCallingHandlers() to
## capture errors and if they occur we record the call trace.
current <- sys.nframe()
tryCatch({
withCallingHandlers({
future$value <- eval(expr, envir = envir)
future$state <- 'finished'
}, error = function(ex) {
calls <- sys.calls()
## Drop fluff added by withCallingHandlers()
calls <- calls[seq_len(length(calls)-2L)]
## Drop fluff added by outer tryCatch()
calls <- calls[-seq_len(current+7L)]
## Drop fluff added by outer local = TRUE
if (future$local) calls <- calls[-seq_len(6L)]
ex$traceback <- calls
future$value <- ex
future$state <- 'failed'
})
}, error = function(ex) {})
if (debug) mdebug("%s started (and completed)", class(future)[1])
## Signal conditions early, iff specified for the given future
signalEarly(future, collect = FALSE)
invisible(future)
}
#' @export
resolved.UniprocessFuture <- function(x, ...) {
if (x$lazy) {
## resolved() for lazy uniprocess futures must force value()
## such that the future gets resolved. The reason for this
## is so that polling is always possible, e.g.
## while(!resolved(f)) Sys.sleep(5);
value(x, signal = FALSE)
}
NextMethod("resolved")
}
#' @rdname UniprocessFuture-class
#' @export
SequentialFuture <- function(expr = NULL, envir = parent.frame(), substitute = FALSE, lazy = FALSE, globals = TRUE, local = TRUE, ...) {
if (substitute) expr <- substitute(expr)
f <- UniprocessFuture(expr = expr, envir = envir, substitute = FALSE, lazy = lazy, globals = globals, local = local, ...)
structure(f, class = c("SequentialFuture", class(f)))
}