WISH: Add resolve() for efficiently resolving and retrieving values asynchronously #29

Closed
HenrikBengtsson opened this issue Dec 20, 2015 · 1 comment

@HenrikBengtsson
Owner

Consider a large number of futures like:

library("listenv")
library("future")

LONG_TIME <- 60
LARGE_NUMBER <- 1e6
create_large_data <- function(ii) {
  Sys.sleep(runif(1L, max=LONG_TIME))  ## Slow process
  rnorm(runif(1L)*LARGE_NUMBER)  ## Large data
}

x <- listenv()
for (ii in 1:10) {
  cat(sprintf("Future #%d\n", ii))
  x[[ii]] %<=% { create_large_data(ii) }
}

If the above futures are processed on a cluster, it may take some time to retrieve each future's value, because the values are large and must be serialized in order to be transferred back to the main process. This may take a different amount of time for different futures.
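To get a feel for how much data has to be transferred, the serialized size of values like these can be checked with base R's `serialize()`. This is a rough sketch: the sizes are for R's default serialization format and ignore any extra framing a particular cluster backend may add.

```r
## Serialized size of numeric vectors like those returned by create_large_data()
sizes <- c(1e4, 1e5, 1e6)
bytes <- vapply(sizes, function(n) {
  ## serialize() with connection = NULL returns a raw vector
  length(serialize(rnorm(n), connection = NULL))
}, numeric(1))

## Roughly 8 bytes per double, so about 8 MB for a million values
data.frame(n = sizes, MB = round(bytes / 1e6, 2))
```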

Now, if we use

y <- as.list(x)

to resolve and collect the values, we effectively do so sequentially, in index order. In other words, the value of x[[2]] is not retrieved until x[[1]] has been resolved and its value retrieved. If future x[[2]] is already resolved but x[[1]] takes a long time to evaluate, the main process is forced to sit idle until x[[1]] is resolved.
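The cost of collecting in index order can be made concrete with a toy timing model. This is a sketch under simplifying assumptions, with made-up numbers: each future `i` becomes resolved at time `t_resolved[i]`, retrieving its value keeps the main process busy for `t_retrieve[i]` time units, values are retrieved one at a time, and polling overhead is ignored. The helper `finish_time()` is hypothetical, written only for this illustration.

```r
## Time at which the main process has retrieved all values when it
## retrieves them in the order given by 'ord'
finish_time <- function(ord, t_resolved, t_retrieve) {
  clock <- 0
  for (i in ord) {
    ## Wait for future i if it is not yet resolved, then retrieve its value
    clock <- max(clock, t_resolved[i]) + t_retrieve[i]
  }
  clock
}

t_resolved <- c(10, 2, 3)  # x[[1]] is the slow one
t_retrieve <- c(4, 4, 4)

## as.list(x)-style: retrieve in index order 1, 2, 3
finish_time(seq_along(t_resolved), t_resolved, t_retrieve)  # 22
## Retrieve in the order the futures become resolved: 2, 3, 1
finish_time(order(t_resolved), t_resolved, t_retrieve)      # 14
```

With these numbers the index-order approach idles from time 0 to 10 waiting for x[[1]], whereas the second order uses that time to fetch x[[2]] and x[[3]], finishing at 14 instead of 22.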

It would be better to start retrieving the value of future x[[2]] in the meantime. To do this, we need to query the futures to check whether they are resolved or not, and only start retrieving values for futures that are resolved. Something like:

resolve <- function(x, ...) UseMethod("resolve")

resolve.listenv <- function(x, ..., sleep=1.0) {
  ## All futures held by the list environment (non-future elements dropped)
  fs <- futureOf(envir=x, drop=TRUE)
  ## Track which futures are done ('done' avoids masking resolved())
  done <- logical(length(fs))
  while (!all(done)) {
    for (ii in which(!done)) {
      ## Skip futures that are still being evaluated
      if (!resolved(fs[[ii]])) next
      ## Retrieve value (allow for errors)
      tryCatch({ value(fs[[ii]]) }, error = function(ex) {})
      done[ii] <- TRUE
    } # for (ii ...)

    ## Wait a bit before polling again
    if (!all(done)) Sys.sleep(sleep)
  } # while (...)

  ## Touch every element to trigger removal of internal future variable
  for (ii in seq_len(length(x))) force(x[[ii]])

  x
} ## resolve() for listenv

which can then be used as:

x <- resolve(x)
x <- as.list(x)
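The polling strategy can be tried out without a cluster or the future package. The sketch below mimics the loop in resolve.listenv() using hypothetical stand-ins: make_fake_future(), is_resolved() and poll_resolve() are made up for illustration, and a virtual clock replaces Sys.sleep() so the example runs instantly and deterministically.

```r
## Hypothetical fake future: "resolved" once the virtual clock reaches 'finish'
make_fake_future <- function(finish, value) {
  list(finish = finish, value = value)
}
is_resolved <- function(f, now) now >= f$finish

## Same polling structure as resolve.listenv(), but on fake futures
poll_resolve <- function(fs, sleep = 1) {
  now <- 0
  done <- logical(length(fs))
  values <- vector("list", length(fs))
  collected <- integer(0)  # order in which values were retrieved
  while (!all(done)) {
    for (ii in which(!done)) {
      if (!is_resolved(fs[[ii]], now)) next
      values[[ii]] <- fs[[ii]]$value  # "retrieve" the value
      collected <- c(collected, ii)
      done[ii] <- TRUE
    }
    ## Instead of Sys.sleep(sleep), advance the virtual clock
    if (!all(done)) now <- now + sleep
  }
  list(values = values, collected = collected)
}

fs <- list(make_fake_future(10, "a"),
           make_fake_future(2, "b"),
           make_fake_future(3, "c"))
res <- poll_resolve(fs)
res$collected       # 2 3 1: already-resolved futures are retrieved first
unlist(res$values)  # "a" "b" "c": values still end up in index order
```

The retrieval order follows resolution order, while the results are stored by index, which is what lets as.list(x) afterwards return the values in their original positions.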
@HenrikBengtsson
Owner Author

Implemented in commit 93eae7e.

@HenrikBengtsson HenrikBengtsson changed the title https://github.com/HenrikBengtsson/listenv/issues WISH: Add resolve() for efficiently resolving and retrieving values asynchronously Dec 20, 2015