-
Notifications
You must be signed in to change notification settings - Fork 1
/
CallrFuture-class.R
444 lines (366 loc) · 14.1 KB
/
CallrFuture-class.R
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
#' A callr future is a future whose value will be resolved via callr
#'
#' @param expr The \R expression to be evaluated.
#'
#' @param envir The environment used when searching for globals; it is
#' also passed on to [future::MultiprocessFuture()].
#'
#' @param substitute If `TRUE`, `expr` is captured with `substitute()`,
#' otherwise it is used as is.
#'
#' @param globals (optional) a logical, a character vector, a named list, or
#' a [globals::Globals] object.  If `TRUE`, globals are identified by code
#' inspection based on `expr` and `tweak` searching from environment
#' `envir`.  If `FALSE`, no globals are used.  If a character vector, then
#' globals are identified by lookup based on their names `globals` searching
#' from environment `envir`.  If a named list or a Globals object, the
#' globals are used as is.
#'
#' @param packages (optional) Names of packages to record for the future,
#' in addition to the packages identified together with the globals.
#'
#' @param label (optional) Label of the future.
#'
#' @param workers (optional) The maximum number of workers the callr
#' backend may use at any time.
#'
#' @param supervise (optional) Argument passed to [callr::r_bg()].
#'
#' @param \ldots Additional arguments passed to [future::MultiprocessFuture()].
#'
#' @return A CallrFuture object
#'
#' @aliases run.CallrFuture
#' @export
#' @importFrom future MultiprocessFuture getGlobalsAndPackages
#' @keywords internal
CallrFuture <- function(expr = NULL, envir = parent.frame(),
                        substitute = TRUE,
                        globals = TRUE, packages = NULL,
                        label = NULL,
                        workers = NULL,
                        supervise = FALSE,
                        ...) {
  if (substitute) {
    expr <- substitute(expr)
  }

  ## Identify the globals and the packages needed by the expression
  found <- getGlobalsAndPackages(expr, envir = envir, globals = globals)

  ## Create the generic multiprocess future ...
  base_future <- MultiprocessFuture(
    expr = found$expr, envir = envir,
    substitute = FALSE,
    globals = found$globals,
    packages = unique(c(packages, found$packages)),
    label = label, ...
  )

  ## ... and specialize it into a callr future
  as_CallrFuture(base_future, workers = workers, supervise = supervise)
}
## Turns a future into a CallrFuture
##
## Arguments:
##   future  - the object to specialize.
##   workers - (optional) NULL, a numeric vector without missing values
##             whose elements are all >= 1, or a function returning such
##             a value (the function is called without arguments).
##   ...     - named values; each one is stored as a field of `future`
##             under its name (e.g. `supervise`).
##
## Returns `future` with the `workers` field set, the extra fields added,
## and class "CallrFuture" prepended to its class vector.
as_CallrFuture <- function(future, workers = NULL, ...) {
  args <- list(...)
  names <- names(args)

  ## names(list()) is NULL, so the names can only be validated when
  ## extra fields were actually supplied; otherwise a call without any
  ## extra fields would be rejected.
  if (length(args) > 0L) {
    stopifnot(is.character(names), all(nzchar(names)))
  }

  if (is.function(workers)) workers <- workers()
  if (!is.null(workers)) {
    stop_if_not(length(workers) >= 1)
    if (is.numeric(workers)) {
      stop_if_not(!anyNA(workers), all(workers >= 1))
    } else {
      stop("Argument 'workers' should be numeric: ", mode(workers))
    }
  }

  future$workers <- workers

  ## Record the additional fields; a no-op when none were given
  for (name in names) future[[name]] <- args[[name]]

  future <- structure(future, class = c("CallrFuture", class(future)))
  future
}
#' Prints a callr future
#'
#' @param x A CallrFuture object
#'
#' @param \ldots Not used.
#'
#' @export
#' @keywords internal
print.CallrFuture <- function(x, ...) {
  ## Let the parent class print its part first
  NextMethod()

  ## Query the callr process once; the status stays missing when there
  ## is no process object to ask
  process <- x$process
  status <- NA_character_
  if (inherits(process, "r_process")) {
    status <- if (process$is_alive()) "running" else "finished"
    x$state <- status
  }
  printf("callr status: %s\n", paste(sQuote(status), collapse = ", "))

  if (!is_na(status)) {
    printf("callr information: PID=%d, %s\n",
           process$get_pid(), capture_output(print(process)))
  } else {
    printf("callr %s: Not found (happens when finished and deleted)\n",
           class(process)[1])
  }

  invisible(x)
}
## getExpression() method for CallrFuture objects.  It only forwards to
## the next method in the S3 dispatch chain, with 'mc.cores' (default 1L)
## passed on explicitly as a named argument.
#' @export
getExpression.CallrFuture <- function(future, mc.cores = 1L, ...) {
## NOTE(review): presumably the default of 1L is there to keep the worker
## itself from using more than one core - confirm against the 'future'
## package's getExpression() documentation.
NextMethod(mc.cores = mc.cores)
}
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# Future API
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
## Checks whether a callr future is resolved.
##
## Arguments:
##   x            - a CallrFuture.
##   .signalEarly - if TRUE and the process has terminated with a non-zero
##                  exit status, result(x) is called right away so that
##                  the failure is signaled from here.
##   ...          - not used by this method.
##
## Returns TRUE if the next resolved() method reports TRUE or if the
## callr process is no longer alive, and FALSE otherwise (including when
## the future has no 'r_process' object).
#' @importFrom future resolved
#' @keywords internal
#' @export
resolved.CallrFuture <- function(x, .signalEarly = TRUE, ...) {
  resolved <- NextMethod()
  if (resolved) return(TRUE)

  process <- x$process
  if (!inherits(process, "r_process")) return(FALSE)
  resolved <- !process$is_alive()

  ## Signal errors early?
  if (.signalEarly && resolved) {
    ## Trigger a FutureError already here, if exit code != 0.
    ## The exit status is not guaranteed to be a usable scalar: it may be
    ## NULL or NA, in which case 'status != 0L' cannot be used as an
    ## if() condition.  An undetermined status is not treated as a failure
    ## here; it is left for result() to deal with.
    exit_status <- process$get_exit_status()
    has_failed <- length(exit_status) == 1L &&
                  !is.na(exit_status) &&
                  exit_status != 0L
    if (has_failed) result(x)
  }

  resolved
}
## Retrieves the result of a callr future.
##
## A previously stored result is reused: a stored FutureError is signaled
## again, any other stored value is returned as is.  Otherwise the future
## is launched if it is still in state "created", its result is awaited,
## and the outcome is stored in the future before being returned (or
## signaled, if it is not a FutureResult).
#' @importFrom future result UnexpectedFutureResultError
#' @keywords internal
#' @export
result.CallrFuture <- function(future, ...) {
  ## Already collected?
  cached <- future$result
  if (inherits(cached, "FutureError")) stop(cached)
  if (!is.null(cached)) return(cached)

  ## Launch lazily
  if (future$state == "created") future <- run(future)

  value <- await(future, cleanup = FALSE)

  if (inherits(value, "FutureResult")) {
    future$result <- value
    future$state <- "finished"
    return(value)
  }

  ## Anything but a FutureResult is unexpected; remember the error so
  ## that it is signaled again on the next call
  ex <- UnexpectedFutureResultError(future)
  future$result <- ex
  stop(ex)
}
## Launches a callr future in a background R process via callr::r_bg().
##
## The method is a closure: the enclosing environment holds a cache of
## the default 'cmdargs' of r_bg() and a precompiled expression template
## that wraps the future expression in a function taking the globals.
##
## Arguments:
##   future - a CallrFuture in state "created".
##   ...    - not used.
##
## Returns `future` invisibly, with its 'process' field set to the
## launched process and its state set to "running".  Signals a
## FutureError if the future is not in state "created".
#' @importFrom future run getExpression FutureError
#' @importFrom callr r_bg
#' @keywords internal
#' @S3method run CallrFuture
#' @export
run.CallrFuture <- local({
  ## MEMOIZATION: default 'cmdargs' of r_bg(); filled in on first launch
  cmdargs <- NULL

  fasten <- NULL ## To please R CMD check

  ## Template for the function handed to r_bg(): attach the globals (if
  ## any), drop the 'globals' argument, and then evaluate the future
  ## expression injected at .(expr)
  tmpl_expr <- bquote_compile(function(globals) {
    if (length(globals) > 0) {
      local({
        fasten <- base::attach ## To please R CMD check
        fasten(globals, pos = 2L, name = "r_bg_arguments")
      })
    }
    rm(list = "globals")
    .(expr)
  })

  function(future, ...) {
    ## Memoization
    ## The cache lives in the enclosing environment, so it has to be
    ## updated with '<<-'; a plain '<-' would only create a local copy and
    ## the defaults would be looked up again on every launch.
    if (is.null(cmdargs)) {
      cmdargs <<- eval(formals(r_bg)$cmdargs)
    }

    if (future$state != "created") {
      label <- future$label
      if (is.null(label)) label <- "<none>"
      msg <- sprintf("A future ('%s') can only be launched once.", label)
      stop(FutureError(msg, future = future))
    }

    ## Assert that the process that created the future is
    ## also the one that evaluates/resolves/queries it.
    assertOwner(future)

    debug <- getOption("future.debug", FALSE)

    ## Get future expression
    stdout <- if (isTRUE(future$stdout)) TRUE else NA
    expr <- getExpression(future, stdout = stdout)

    ## Get globals
    globals <- future$globals

    ## Make a callr::r_bg()-compatible function
    ## NOTE(review): bquote_apply() is given only the template, so it
    ## presumably picks up the local 'expr' by name - do not rename it
    ## without confirming.
    expr <- bquote_apply(tmpl_expr)
    func <- eval(expr, enclos = baseenv())

    ## 1. Wait for an available worker
    waitForWorker(type = "callr", workers = future$workers)

    ## 2. Allocate future now worker
    FutureRegistry("workers-callr", action = "add", future = future,
                   earlySignal = FALSE)

    ## Discard standard output? (as soon as possible)
    stdout <- if (isTRUE(stdout)) "|" else NULL

    ## Discard standard error
    ## WORKAROUND: https://github.com/HenrikBengtsson/future.callr/issues/14
    ## For unknown reasons, process$is_alive() will always return TRUE if
    ## we capture stderr, which means that await() will never return.
    ## Since we don't capture and relay stderr in other backends, it's safe
    ## to discard all standard error output. /HB 2021-04-05
    stderr <- NULL

    ## Add future label to process call?
    ## This assignment creates a per-call local copy, so the label never
    ## ends up in the memoized defaults.
    if (!is.null(future$label)) {
      ## Ideally this comes after a '--args' argument to R, but that is
      ## not possible with the current r_bg() because it will *append*
      ## '-f a-file.R' after these. /HB 2018-11-10
      cmdargs <- c(cmdargs, sprintf("--future-label=%s", shQuote(future$label)))
    }

    ## Have callr "supervise" the subprocess?
    supervise <- future$supervise

    ## Launch
    ## WORKAROUND: callr::r_bg() updates the RNG state
    with_stealth_rng({
      future$process <- r_bg(func, args = list(globals = globals),
                             stdout = stdout, stderr = stderr,
                             cmdargs = cmdargs, supervise = supervise)
    })
    if (debug) mdebugf("Launched future (PID=%d)", future$process$get_pid())

    ## 3. Running
    future$state <- "running"

    invisible(future)
  } ## run()
})
## Waits for the callr process of a future to finish and collects its
## result.
##
## Arguments:
##   future  - a future with a 'process' field holding the callr process.
##   timeout - total time (in seconds) to keep polling before giving up;
##             must be finite and non-negative.
##   delta   - base wait interval passed to process$wait().
##   alpha   - growth factor; the wait of iteration i is
##             delta * alpha^(i - 1).  Must be finite and positive.
##   ...     - not used.
##
## Returns the value of process$get_result().  Signals a FutureError if
## the process is still alive after the timeout, and a CallrFutureError
## if the result cannot be retrieved after five attempts.  On success
## the future is removed from the "workers-callr" registry.
#' @importFrom utils tail
#' @importFrom future FutureError FutureWarning
await <- local({
  function(future, timeout = getOption("future.wait.timeout", 30*24*60*60),
                   delta = getOption("future.wait.interval", 1.0),
                   alpha = getOption("future.wait.alpha", 1.01),
                   ...) {
    stop_if_not(is.finite(timeout), timeout >= 0)
    stop_if_not(is.finite(alpha), alpha > 0)

    debug <- getOption("future.debug", FALSE)

    process <- future$process

    if (debug) mdebug("callr::wait() ...")

    ## Control callr info output
    oopts <- options(callr.verbose = debug)
    on.exit(options(oopts))

    ## Sleep function - increases geometrically as a function of iterations
    sleep_fcn <- function(i) delta * alpha ^ (i - 1)

    ## Poll process
    t_timeout <- Sys.time() + timeout
    ii <- 1L
    while (process$is_alive()) {
      ## Timed out?
      if (Sys.time() > t_timeout) break
      timeout_ii <- sleep_fcn(ii)
      if (debug && ii %% 100 == 0)
        mdebugf("- iteration %d: callr::wait(timeout = %g)", ii, timeout_ii)
      ## NOTE(review): the timeout of process$wait() is presumably in
      ## milliseconds, whereas the error message below reports 'delta'
      ## as seconds - confirm the intended unit.
      process$wait(timeout = timeout_ii)
      ii <- ii + 1L
    }

    if (process$is_alive()) {
      if (debug) mdebug("- callr process: running")
      label <- future$label
      if (is.null(label)) label <- "<none>"
      msg <- sprintf("AsyncNotReadyError: Polled for results for %s seconds every %g seconds, but asynchronous evaluation for %s future (%s) is still running: %s", timeout, delta, class(future)[1], sQuote(label), process$get_pid()) #nolint
      if (debug) mdebug(msg)
      stop(FutureError(msg, future = future))
    }

    if (debug) {
      mdebug("- callr process: finished")
      mdebug("callr::wait() ... done")
    }

    ## callr:::get_result() assert that "result" and "error" files exist
    ## based on file.exist().  In case there is a delay in the file system
    ## we might get a false-positive error:
    ## "Error: callr failed, could not start R, or it has crashed or was killed"
    ## If so, let's retry a few times before giving up.
    ## NOTE: This was observed, somewhat randomly, on R-devel (2018-04-20 r74620)
    ## on Linux (local and on Travis) with tests/demo.R /HB 2018-04-27
    if (debug) mdebug("- callr:::get_result() ...")
    ## Count attempts upwards with a dedicated counter, so that the debug
    ## message below reports the actual number of attempts made
    max_attempts <- 5L
    for (attempt in seq_len(max_attempts)) {
      result <- tryCatch({
        process$get_result()
      }, error = identity)
      if (!inherits(result, "error")) break
      if (attempt < max_attempts) {
        if (debug) mdebug("- process$get_result() failed; will retry after 0.1s")
        Sys.sleep(0.1)
      }
    }

    ## Failed?
    if (inherits(result, "error")) {
      msg <- post_mortem_failure(result, future = future)
      ex <- CallrFutureError(msg, future = future)

      ## Remove future from FutureRegistry?
      if (!process$is_alive()) {
        reg <- "workers-callr"
        if (FutureRegistry(reg, action = "contains", future = future)) {
          FutureRegistry(reg, action = "remove", future = future)
        }
      }

      stop(ex)
    }

    if (debug) mdebugf("- callr:::get_result() ... done (after %d attempts)", attempt)

    if (debug) {
      mdebug("Results:")
      mstr(result)
    }

    ## PROTOTYPE RESULTS BELOW:
    prototype_fields <- NULL

    ## Has 'stderr' already been collected (by the future package)?
    ## Comment: This is unlikely to ever happen because you cannot
    ## capture stderr reliably in R, cf.
    ## https://github.com/HenrikBengtsson/Wishlist-for-R/issues/55
    ## /2021-04-05
    ## The '&& FALSE' keeps this branch disabled on purpose.
    if (is.null(result$stderr) && FALSE) {
      prototype_fields <- c(prototype_fields, "stderr")
      result$stderr <- tryCatch({
        res <- process$read_all_error()
        res
      }, error = function(ex) {
        label <- future$label
        if (is.null(label)) label <- "<none>"
        warning(FutureWarning(sprintf("Failed to retrieve standard error from %s (%s). The reason was: %s", class(future)[1], sQuote(label), conditionMessage(ex)), future = future))
        NULL
      })
    }

    if (length(prototype_fields) > 0) {
      result$PROTOTYPE_WARNING <- sprintf("WARNING: The fields %s should be considered internal and experimental for now, that is, until the Future API for these additional features has been settled. For more information, please see https://github.com/HenrikBengtsson/future/issues/172", hpaste(sQuote(prototype_fields), max_head = Inf, collapse = ", ", last_collapse = " and "))
    }

    FutureRegistry("workers-callr", action = "remove", future = future)

    result
  } # await()
})
## Builds the error message for a callr future whose result could not be
## retrieved.
##
## Arguments:
##   reason - an error condition (its message is used) or a message
##            string describing why the retrieval failed.
##   future - the Future object that failed.
##
## Returns a single string with the failure reason followed by
## post-mortem diagnostics: the state of the worker process, any
## non-exportable globals, and a summary of the size of the globals.
## Queries to the process that themselves fail are reported as unknown
## (NA) instead of raising a new error that would mask the original one.
post_mortem_failure <- function(reason, future) {
  assert_no_references <- import_future("assert_no_references")
  summarize_size_of_globals <- import_future("summarize_size_of_globals")

  stop_if_not(inherits(future, "Future"))

  ## (1) Trimmed error message
  if (inherits(reason, "error")) reason <- conditionMessage(reason)

  ## (2) Information on the future
  label <- future$label
  if (is.null(label)) label <- "<none>"
  stop_if_not(length(label) == 1L)

  ## (3) POST-MORTEM ANALYSIS:
  postmortem <- list()

  ## (a) State of the worker process
  process <- future$process
  pid <- tryCatch(process$get_pid(), error = function(e) NA_integer_)
  start_time <- tryCatch(
    format(process$get_start_time(), format = "%Y-%m-%dT%H:%M:%S%z"),
    error = function(e) NA_character_
  )
  msg2 <- sprintf("The parallel worker (PID %.0f) started at %s", pid, start_time)

  ## Guard is_alive() the same way as the other process queries above
  alive <- tryCatch(isTRUE(process$is_alive()), error = function(e) NA)
  if (is.na(alive)) {
    msg2 <- sprintf("%s could not be queried for its current state", msg2)
  } else if (alive) {
    msg2 <- sprintf("%s is still running", msg2)
  } else {
    exit_code <- tryCatch(process$get_exit_status(), error = function(e) NA_integer_)
    ## A zero-length exit status would make sprintf() return character(0)
    if (length(exit_code) != 1L) exit_code <- NA_integer_
    msg2 <- sprintf("%s finished with exit code %.0f", msg2, exit_code)
  }
  postmortem$alive <- msg2

  ## (b) Any non-exportable globals?
  globals <- future[["globals"]]
  postmortem$non_exportable <- assert_no_references(globals, action = "string")

  ## (c) Size of globals
  postmortem$global_sizes <- summarize_size_of_globals(globals)

  ## (4) The final error message
  msg <- sprintf("%s (%s) failed. The reason reported was %s",
                 class(future)[1], label, sQuote(reason))
  stop_if_not(length(msg) == 1L)
  if (length(postmortem) > 0) {
    postmortem <- unlist(postmortem, use.names = FALSE)
    msg <- sprintf("%s. Post-mortem diagnostic: %s",
                   msg, paste(postmortem, collapse = ". "))
    stop_if_not(length(msg) == 1L)
  }

  msg
} # post_mortem_failure()