[Question] How to stop a parallel loop when an error has been raised #213

Open
Jean-Romain opened this issue Apr 9, 2018 · 2 comments

@Jean-Romain

This is a question I asked on Stack Overflow. Nobody answered, so I'm bothering you here 😉

I create parallel loops using future. Sometimes the expression being called raises an error. In that case the whole process still runs (which takes a long time) and the function fails only at the end, when the futures are evaluated.

Instead, the function should fail as soon as an error is raised, especially if the error is raised at the beginning of the process. For example, this code will fail only after 100 iterations, but it is actually likely to hit an error before the 10th one.

future::plan(future::multiprocess, workers = 4)

g = function()
{  
  x = vector("list", 100)
  for(i in 1:100)  
  {
    x[[i]] = future::future({ h() })
    cat(sprintf("\rProgress: %g%%", i), file = stderr())
  }  
  return(future::values(x))
}

h = function()
{
  u = runif(1, 0, 100)
  if (u > 80) 
    stop("Error")
  return(u)
}

How should I manage error handling with future? I perfectly understand why it is difficult, but at the same time future_lapply can do it, so it is possible.


Btw, I'm actually able to do it, but to me it looks more like a hack than a real solution. I use a temporary file to communicate the error to the main process.

g = function()
{
  log = tempfile()
  x = vector("list", 100)
  for(i in 1:100)
  {
    x[[i]] = future::future({tryCatch(h(), error = function(e){ write(toString(e), log) })})
  
    if (file.exists(log))  {
      msg = scan(log, character(), quiet = TRUE)
      msg = paste(msg, collapse = " ")
      cat("\n")
      stop(msg, call. = FALSE)
    }
      
    cat(sprintf("\rProgress: %g%%", i), file = stderr())
  }
  
  return(future::values(x))
}

Thanks

@HenrikBengtsson
Collaborator

HenrikBengtsson commented Apr 10, 2018

This will be a long reply with lots of details:

To understand how to achieve "early stopping", it's important to understand that there is no way for a parallel R worker to "signal" back to the master process. The only way the master process can know whether a future has produced an error or not is for the master process to "ask" the worker. In other words, we need to poll the futures in order to detect errors.

You can poll a future for errors by explicitly retrieving its value, i.e. value(f), which will trigger / re-signal any errors produced. To avoid blocking when doing this, you can do something like if (resolved(f)) value(f).
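As a minimal sketch of that non-blocking poll (assuming a multisession backend with two workers, used here because multiprocess has since been deprecated in future; the one-second sleep and the "boom" message are made up for illustration):

```r
library(future)
plan(multisession, workers = 2)

# A future that fails on the worker after one second
f <- future({ Sys.sleep(1); stop("boom") })

polls <- 0L
while (!resolved(f)) {   # resolved() does not wait for the future to finish
  polls <- polls + 1L
  Sys.sleep(0.1)         # placeholder for useful work on the master
}

# The future is resolved by now, so value() does not block;
# it re-signals the worker's error on the master
msg <- tryCatch(value(f), error = function(e) conditionMessage(e))

plan(sequential)         # shut down the background workers
```

Nothing is signalled on the master until value() is called, which is why the check has to be done explicitly.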

There is another way to signal errors early, a feature I've kept a little bit under the radar: specify earlySignal = TRUE when you create a future, e.g.

    x[[i]] <- future::future(..., earlySignal = TRUE)

What's important to understand is that, just as explained above, errors from this future will only be signalled when that future is polled. When does that happen? It happens when there are no more workers available: the future framework then starts polling existing futures to check if they're done. As soon as one is done, it'll collect its value, and if the future that is collected happens to have produced an error and was created using earlySignal = TRUE, then the error will be signalled on the master. This also means that if there are more workers than tasks, there will be no early signalling, e.g. when you have many cores, or if you use, say, plan(future.batchtools::batchtools_sge), in which case you'll have an infinite number of workers by default. Also, it could be that the above "find-me-an-available-worker" lookup never gets to ask the one worker that produced an error because other workers become available sooner. Then you will also not get early signalling.
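A rough sketch of what this looks like in a loop, assuming a multisession backend with only two workers so that the framework runs out of free workers quickly. The names task() and created are hypothetical, and the iteration at which the error surfaces depends on timing, so no particular value is promised:

```r
library(future)
plan(multisession, workers = 2)

# Hypothetical task: the first one fails at once, the others take a while
task <- function(i) {
  if (i == 1) stop("boom")
  Sys.sleep(0.5)
  i
}

x <- vector("list", 10)
created <- 0L   # how many futures were launched before an error surfaced

msg <- tryCatch({
  for (i in seq_along(x)) {
    # With no free worker, future() polls the running futures; an errored
    # future created with earlySignal = TRUE may be re-signalled right here
    x[[i]] <- future(task(i), earlySignal = TRUE)
    created <- created + 1L
  }
  value(x)        # if nothing was signalled in the loop, the error surfaces here
  NA_character_
}, error = function(e) conditionMessage(e))

plan(sequential)  # shut down the background workers
```

Comparing created with length(x) afterwards shows whether the loop was cut short or ran to the end before the error appeared.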

A poor man's approach may then be to continuously loop over the existing futures and check if they're resolved (which is non-blocking) and only then query the value (blocking). Something like:

  signal_errors_early <- function(futures) {
    for (future in futures) if (resolved(future)) value(future)
    invisible()
  }

  for(i in 1:100)
  {
    signal_errors_early(x[seq_len(i - 1L)])
    x[[i]] <- future::future({ h() })  # no tryCatch() here, so that value() can re-signal the error
    [...]
  }

Note that future::future() will eventually collect value() internally whenever there are no more available workers, so the overhead is not that big. What you might wanna do though is to keep track of which resolved futures you've already checked and skip those in signal_errors_early().
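One possible way to keep track of the already-checked futures is a small closure that remembers their indices. This is only a sketch: make_early_signaller() is a hypothetical helper name, and plan(sequential) is used purely to keep the demo deterministic.

```r
library(future)

# Hypothetical helper: returns a signal_errors_early() variant that
# skips futures it has already collected
make_early_signaller <- function() {
  checked <- integer(0)                 # indices of futures already collected
  function(futures) {
    for (k in setdiff(seq_along(futures), checked)) {
      f <- futures[[k]]
      if (inherits(f, "Future") && resolved(f)) {  # non-blocking check
        checked <<- c(checked, k)
        value(f)                        # re-signals the error, if there was one
      }
    }
    invisible(length(checked))
  }
}

plan(sequential)   # assumption for the demo only; any backend should work
signal_errors_early <- make_early_signaller()

x <- list(future(1), future(2))
n1 <- signal_errors_early(x)            # both futures are collected once

x[[3]] <- future(stop("boom"))
msg <- tryCatch(signal_errors_early(x), # only the third future is visited now
                error = function(e) conditionMessage(e))
```

The index is recorded before value() is called, so a future that has already signalled its error is not visited again on the next pass.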

About future_lapply(): It does not do any smart early signalling of errors at the moment. For example,

> library("future.apply")
> plan(multiprocess, workers = 3L)
> system.time(y <- future_lapply(1:3, FUN = function(ii) { if (ii == 2) stop("boom"); Sys.sleep(10) }))
Error in ...future.FUN(...future.X_jj, ...) : boom
Timing stopped at: 0.58 0.076 10.19

It could be that you concluded this because chunking takes place by default, so if you future_lapply() over 100 elements and you only have 2 workers, it will process 1:50 in one worker and 51:100 in another using base::lapply(). Since an error in base::lapply() will interrupt instantly, it may appear as if it stopped sooner.
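The chunking effect can be imitated in plain base R. This is an illustration only, not future.apply's internal code; parallel::splitIndices() stands in for the chunking, and the failing element 7 is made up:

```r
# Split 100 elements into 2 chunks, as in the two-worker example above
chunks <- parallel::splitIndices(100L, 2L)   # list(1:50, 51:100)

processed <- integer(0)   # elements completed before the error
f <- function(i) {
  if (i == 7L) stop("boom")
  processed <<- c(processed, i)
  i
}

# What one worker does with its chunk: base::lapply() stops at the first error,
# so elements 8:50 of this chunk are never evaluated
res <- tryCatch(lapply(chunks[[1]], f), error = function(e) conditionMessage(e))
```

The worker holding the second chunk is unaffected and still processes all of 51:100, so the master only sees the error once it collects the results.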

I'll add "early signalling" to the list of features that could be improved, e.g. it might be that the above signal_errors_early() could be incorporated into the future.apply package as an optional feature.

@Jean-Romain
Author

Jean-Romain commented Apr 11, 2018

Thank you very much for this long and detailed answer. The good news is that my understanding of how future works is good. Your answer confirms some of my guesses, for example about future_lapply. The behavior is what I understood from reading the code.

I like earlySignal = TRUE. It is very simple and good enough for my needs. I understand that it does not interrupt the loop immediately but only when the worker is polled, but that sounds good. After all, interrupting the loop immediately is not very important; being able to stop early relative to the whole process is good. Also, the more workers there are, the later the interruption, but given my usage I am not worried about that.
