WISH: Add method for interrupting / terminating a future #93

Open
HenrikBengtsson opened this issue Jul 14, 2016 · 21 comments
Labels
feature request feature/terminate Frontend API Part of the Future API that users of futures rely on

Comments

@HenrikBengtsson
Owner

HenrikBengtsson commented Jul 14, 2016

Background / Question

@gdevailly, sorry I didn't see your June 16 question on Twitter until now;

How can one stop futures that are currently running?

The simple answer is that this is not possible.

CLARIFICATION 2018-03-15: but I hope we can add something that can be used (e.g. although not part of the official Future API, there could be functions that the end user can use to manually kill workers).

Ideas / Thoughts

Technically, it should be possible to interrupt and even terminate background R processes that evaluate a future, at least if they run on the local machine.

For instance, for multicore futures we already have an internal handle for the process ID (pid) of the forked process. For multisession futures, we could retrieve the pid for each cluster worker before launching the actual future. With this pid, we should be able to send an interrupt signal, which should signal an interrupt condition within R for that process. What complicates this is that we need to come up with a platform-independent method for terminating/signalling processes. We most likely need to reach out to a system() call for this. This should be doable, but increases the risk of it not working everywhere.
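As a rough illustration of the platform split described above, here is a minimal sketch of a helper that only builds the OS-specific command. `signal_command()` is a hypothetical name, not part of the future package. It assumes `kill` is available on Unix-alikes and `taskkill` on Windows; note that `taskkill /F` force-terminates the process rather than interrupting it.

```r
# Hypothetical helper (not part of any package): build the OS-specific
# command for signalling a process with a given PID.
signal_command <- function(pid, os = .Platform$OS.type) {
  stopifnot(is.numeric(pid), length(pid) == 1L, is.finite(pid), pid > 0)
  pid <- as.integer(pid)
  if (identical(os, "windows")) {
    # taskkill /F force-terminates; it does not deliver an interrupt.
    list(command = "taskkill", args = c("/PID", pid, "/F"))
  } else {
    # On Unix-alikes, SIGINT asks the R process to interrupt.
    list(command = "kill", args = c("-INT", pid))
  }
}

# Usage (not run here, because it would signal a real process):
# cmd <- signal_command(pid)
# system2(cmd$command, cmd$args)
```

Keeping command construction separate from execution makes it easy to inspect what would be run on each platform before anything is actually signalled.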

Signalling a process running on another machine is a bit more complicated. It would basically require being able to launch a separate signalling process on that same machine. Not impossible, but also not guaranteed to work, e.g. maybe the future process already running occupies the last available port / socket for that machine.

For futures running on a cluster via a job scheduler (as created by the future.BatchJobs package) it should also be possible to terminate such futures / jobs using the killJob() functionality already provided by the BatchJobs package.

That's just some of my thoughts for now.

@gdevailly

Thanks for the detailed answer!

@jstockwin

@HenrikBengtsson Are there any platform-specific workarounds for this until it gets implemented? I develop things on Windows, but hosting is done on Linux servers, so I guess even just a way to do this on Linux would be useful. Is it as simple as just getting the process pid and killing it?

On Windows, if I force close the background processes ("R for Windows front end") then R gets very unhappy - I get an "error writing to connection" error when next trying to run a function using future. (I'm using plan(multiprocess) if that makes a difference). This makes me think it might be more complicated than killing the pid, but maybe things would be different on Linux.

P.S. I can't actually find how to get the pid from a future in the documentation. You have lots of examples which have Sys.getpid() calls inside the future, but how do I get access to this pid in the main thread while the future is still running?

@HenrikBengtsson
Owner Author

@HenrikBengtsson Are there any platform-specific workarounds for this until it gets implemented? I develop things on Windows, but hosting is done on Linux servers, so I guess even just a way to do this on Linux would be useful. Is it as simple as just getting the process pid and killing it?

Not really, unless you're using forked R processes, i.e. plan(multicore) - and as written below, it should not be treated as an officially supported approach, simply because the Future API does not yet have a spec for this.

On Windows, if I force close the background processes ("R for Windows front end") then R gets very unhappy - I get an "error writing to connection" error when next trying to run a function using future. (I'm using plan(multiprocess) if that makes a difference).

Just to clarify, on Windows multiprocess -> multisession, and on *nix/macOS multiprocess -> multicore. So, to reproduce Windows behavior everywhere, one can use plan(multisession).

This makes me think it might be more complicated than killing the pid, but maybe things would be different on Linux.

So, multisession futures use workers that are part of a local "cluster", which is basically the same as when you set up a cluster using cl <- parallel::makePSOCKcluster(); plan(cluster, workers = cl). Each worker is a regular R session running in the background. These workers are created once and then serve multiple futures. In other words, if you kill one of these background R workers, then it won't be available to serve other futures. The error message you get is because the future package assumes the cluster worker is still around. There is currently no exception handling implemented in the future package to deal with this case.

This is in contrast to plan(multicore), which creates a new fork for each future (*). Thus, killing a fork doesn't harm subsequent futures. So, it works to kill a multicore worker, but not a multisession worker. However, that it works is just a side effect and is not there by design, so it should not be relied upon right now. (*) One could imagine multicore workers with a longer lifespan, just like multisession, but that's a different story.

P.S. I can't actually find how to get the pid from a future in the documentation. You have lots of examples which have Sys.getpid() calls inside the future, but how do I get access to this pid in the main thread while the future is still running?

It's not possible. However, you could achieve it by setting up a cluster manually, e.g.

library("future")
cl <- makeClusterPSOCK(availableCores())

and grab the PIDs with the following hack:

for (kk in seq_along(cl)) {
  parallel:::sendCall(cl[[kk]], fun = Sys.getpid, args = list())
  pid <- parallel:::recvResult(cl[[kk]])
  attr(cl[[kk]]$host, "pid") <- pid
}

That will annotate each worker in 'cl' with its PID. (I've thought about adding this as an automatic feature of future::makeClusterPSOCK() - it would then also serve as an additional validation that the master-worker setup works as expected).
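If the per-worker annotation is not needed, the same PIDs can presumably be collected without internal `parallel:::` functions. A minimal sketch using the exported parallel::clusterCall(), with a two-worker cluster chosen only for illustration:

```r
library(parallel)

# Start two local PSOCK workers (the count is arbitrary for this sketch).
cl <- makePSOCKcluster(2L)

# clusterCall() runs Sys.getpid() on every worker and returns one
# result per worker, in the same order as the workers appear in 'cl'.
pids <- unlist(clusterCall(cl, Sys.getpid))
print(pids)

# Shut the workers down when they are no longer needed.
stopCluster(cl)
```

This only works while all workers are idle, because clusterCall() waits for every worker to reply; it cannot look up the PID of a worker that is already busy with a future.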

To use this cluster with futures, you do:

plan(cluster, workers = cl)

With this, you can grab the worker information for a particular future like this:

f <- future(Sys.sleep(300))
host <- f$workers[[f$node]]$host
print(host)
## [1] "localhost"
## attr(,"pid")
## [1] 7599
pid <- attr(host, "pid")

That gives you the PID of the process for a particular future. Note, however, that f$workers[[f$node]] is not part of the official API and might change at any time. So, if you use it, make sure you're aware it's a hack. For instance, it's not unlikely that I'll change this to f$worker at some point.

Then you can send various system signals (e.g. kill -INT) to this process using system calls or using one of the various R packages that wrap such features in an R API (I don't remember them off the top of my head).
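One option that ships with R itself is tools::pskill(). A minimal sketch of a thin wrapper follows; `send_signal()` is a hypothetical name, and on Windows only a subset of signals is defined, so behaviour there may differ.

```r
# Hypothetical wrapper around tools::pskill(): validate the PID,
# then send the requested signal (SIGINT by default).
send_signal <- function(pid, signal = tools::SIGINT) {
  stopifnot(is.numeric(pid), length(pid) == 1L, !is.na(pid), pid > 0)
  invisible(tools::pskill(as.integer(pid), signal = signal))
}

# Usage (not run here, because it would signal a real process):
# send_signal(pid)                  # ask the worker to interrupt
# send_signal(pid, tools::SIGTERM)  # ask the worker to terminate
```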

However, to get this working properly you probably also need to make your future expression / future code interrupt-aware using withCallingHandlers() etc. It's also unknown to me what happens if you signal too many interrupts in a row - it might be that you manage to interrupt the main R loop of the worker, which will then cause the R worker to terminate. That'll result in a missing R worker, and you've got the problem you mentioned at the beginning.
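To give an idea of what "interrupt-aware" could look like, here is a minimal sketch. It uses tryCatch() rather than withCallingHandlers(), because tryCatch() unwinds the evaluation and lets the code return a regular value; `run_interruptible()` is a hypothetical helper, and whether this is enough to keep a real worker alive is untested.

```r
# Hypothetical helper: evaluate 'expr', but turn an interrupt into a
# regular return value instead of letting it propagate any further.
run_interruptible <- function(expr, on_interrupt = function() NULL) {
  tryCatch(
    list(status = "finished", value = expr),
    interrupt = function(cond) {
      on_interrupt()  # optional clean-up hook supplied by the caller
      list(status = "interrupted", value = NULL)
    }
  )
}

# A normal evaluation passes through unchanged.
res <- run_interruptible(1 + 1)
print(res$status)
```

In a future, the idea would be to wrap the body, e.g. future(run_interruptible(slow_task())), where slow_task() stands in for the real work.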

As you see from the above, there are lots of things that need to be put in place in order to be able to interrupt a future - and the above is just when you run on your local machine. To do this remotely is even harder. Having said this, it should be possible to add bits and pieces to the future package that move this in the right direction.

@HenrikBengtsson
Owner Author

Forgot to say, if possible, you could of course always write the future code to peek at a shared file once in a while for instructions from the master. For instance, if a file named interrupt exists, then the future should return when it becomes aware of it. The master can then create that file as a way to signal an "interrupt" to the future. A poor man's signalling scheme.
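A minimal sketch of such a file-based scheme, assuming the work can be split into steps and that master and worker see the same file system. `run_until_flag()` and its arguments are hypothetical names used only for illustration.

```r
# Hypothetical worker loop: do the work in steps and check for the
# flag file before each step; stop early if the master has created it.
run_until_flag <- function(n_steps, flag_file,
                           step = function(i) Sys.sleep(0.01)) {
  for (i in seq_len(n_steps)) {
    if (file.exists(flag_file)) {
      return(list(status = "cancelled", completed = i - 1L))
    }
    step(i)
  }
  list(status = "finished", completed = n_steps)
}

flag <- tempfile("interrupt-")

# Without the flag file, all steps run.
print(run_until_flag(3L, flag)$status)

# The master "signals" by creating the file; the worker stops at its next check.
file.create(flag)
print(run_until_flag(3L, flag)$status)
unlink(flag)
```

The worker only reacts between steps, so the delay before it stops is bounded by the duration of the longest step.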

@jstockwin

@HenrikBengtsson Thanks for the very detailed response. Your additional comment is not helpful to me as I'm running a function from an external package in the future, so can't really make it check a file.

The rest makes sense, and I can see now that it really is a lot of work. I'll maybe have a play on a linux machine if I have time, but this probably won't happen anytime soon.

The reason I'm asking is that I have a "Stop" button in an R shiny app which currently makes the main R process just entirely forget about the future'd function. However, locally on Windows, if I try to get more than 3 or 4 (I have 4 cores) things going at a time, everything locks up as there are no more cores. Potentially, the external function could take hours if the user puts bad inputs in, so it'd be useful to properly interrupt the process. The "Stop" button doesn't stop the function (which is running in a future), but lets it continue and just forgets about it. Thus if a single user tries running something which will take ages once, everything is pretty much dead until one of their jobs finishes, if that makes sense.

I'll happily help contribute towards a solution to this in any way possible, but wouldn't really know how to start I'm afraid...

If a solution to this just isn't going to happen for a long time then fair enough. I understand that this is currently a limitation of how things are done. Thanks for your time!

@jstockwin

I do feel like this is an important issue that shouldn't be ignored though! It would be super nice to have some kind of endFuture function attached to a future object. Something like

a <- future(...)
futureObj <- futureOf(a)
killProcess(futureObj)

would be ideal, and I feel like it's applicable to a wide range of situations.

@DarwinAwardWinner

It sounds like the hardest case out of the ones you mentioned is the "process on another machine" case. To me, it seems like the simplest solution to that problem is to start and maintain a control process on each remote machine, which just sits idle until instructed to kill something by the master process. That should solve the running out of resources problem since the control process gets to claim those resources before any workers do.

@HenrikBengtsson
Owner Author

HenrikBengtsson commented Aug 18, 2017

Thanks for spending time thinking about this. Yes, for SOCK-like clusters, having a janitor process on each machine seems to be one option. This can be done by launching one janitor per machine. However, this would require a new backend or a heavily modified version of parallel's makeCluster().

An alternative approach would be to launch, for each worker (future), a main monitor/janitor process which then launches the actual working process. This can be done with the existing SOCK framework of parallel.

A third alternative is to have the main R session simply log in to the machine when needed and kill the given set of workers.

However, these types of features may be better suited for a separate backend package (think parallel, snow, processx, sys, batchtools, ..., new_backend_pkg).

@DarwinAwardWinner

Another option is to make cancellation support optional for future backends, and have the cancellation function return a logical vector indicating which futures were actually cancelled (and probably issue a warning if it can't cancel something). The worst case for a future that can't be cancelled is that it finishes running and then the result is discarded, which is fairly benign in many cases. Presumably you'd also want a function to assert that the current backend supports cancellation so you can check that before starting a future that you can't cancel.
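A rough sketch of what such an optional-cancellation interface might look like. Everything here is hypothetical: futures are stood in for by plain lists that may or may not carry a `cancel` function, and neither `supports_cancel()` nor `cancel_all()` exists in the future package.

```r
# Hypothetical: does this handle come from a backend that can cancel?
supports_cancel <- function(handle) is.function(handle[["cancel"]])

# Hypothetical: try to cancel every handle, return a logical vector
# saying which ones were actually cancelled, and warn about the rest.
cancel_all <- function(handles) {
  cancelled <- vapply(handles, function(h) {
    if (!supports_cancel(h)) return(FALSE)
    isTRUE(h[["cancel"]]())
  }, logical(1))
  if (!all(cancelled)) {
    warning(sprintf("%d of %d futures could not be cancelled",
                    sum(!cancelled), length(cancelled)), call. = FALSE)
  }
  cancelled
}

# Stand-ins for futures: one cancellable, one not.
handles <- list(
  list(cancel = function() TRUE),
  list()
)
print(suppressWarnings(cancel_all(handles)))
```

Returning a logical vector lets the caller decide what to do with the futures that keep running, for instance simply discarding their results once they finish.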

@bedantaguru

I was using this function

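stopFuture <- function(x) {
  # x$job$pid is an internal field (the PID of the forked process of a
  # multicore future); it is not part of the official Future API.
  tools::pskill(x$job$pid, signal = tools::SIGTERM)
  tools::pskill(x$job$pid, signal = tools::SIGKILL)
}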

Here is an example (derived from the original examples)

require(future)

# normal 
plan(multiprocess)
a %<-% {
  cat("Resolving 'a' ...")
  Sys.sleep(2)
  cat("done\n")
  Sys.getpid()
}

cat("Waiting for 'a' to be resolved ...\n")

f <- futureOf(a)
count <- 1
while (!resolved(f)) {
  cat(count, "\n")
  Sys.sleep(0.2)
  count <- count  + 1 
}

cat("Waiting for 'a' to be resolved ... DONE\n")

a


# terminated 
plan(multiprocess)
a %<-% {
  cat("Resolving 'a' ...")
  Sys.sleep(2)
  cat("done\n")
  Sys.getpid()
}

cat("Waiting for 'a' to be resolved ...\n")

f <- futureOf(a)
stopFuture(f)
count <- 1
while (!resolved(f)) {
  cat(count, "\n")
  Sys.sleep(0.2)
  count <- count  + 1 
}

cat("Waiting for 'a' to be resolved ... DONE\n")

a

@HenrikBengtsson HenrikBengtsson added the Frontend API Part of the Future API that users of futures rely on label Mar 11, 2018
@HenrikBengtsson
Owner Author

Just an FYI, in relationship to this, I just filed bug report PR17395 with a patch for tools::pskill(), because it does not return a correct value. It still sends the signal, but due to the bug (actually two bugs) it is currently impossible to know whether the signal was successfully sent or not.

@HenrikBengtsson
Owner Author

UPDATE: The tools::pskill() bug was fixed in R-devel r74426 (to become R 3.5.0 in April 2018), cf. wch/r-source@a84374c
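With the fix in place, the return value can be used to check whether the signal was delivered. A minimal sketch, where `checked_pskill()` is a hypothetical helper that reports NA on R versions older than 3.5.0, since the return value cannot be trusted there:

```r
# Hypothetical helper: send a signal and report whether it was delivered.
# Returns TRUE/FALSE on R >= 3.5.0 and NA on older versions, where the
# value returned by tools::pskill() is unreliable.
checked_pskill <- function(pid, signal = tools::SIGTERM) {
  ok <- tools::pskill(pid, signal = signal)
  if (getRversion() < "3.5.0") {
    return(NA)
  }
  isTRUE(ok)
}

# Usage (not run here, because it would signal a real process):
# if (!isTRUE(checked_pskill(pid))) warning("could not signal worker ", pid)
```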

@raphaelvannson

raphaelvannson commented May 3, 2018

Hello,

External kill

A tad hacky and obviously needs some logic before kill can be invoked but this could be helpful to some of us until a clean solution is supported by the package.
The logic to identify could be in the parent (if it makes sense for it to be there) or in another R async sweeper process or cron job checking which pids need to be terminated.

promise = future(myFunctionWichDoesAsyncStuff())
async_pid = promise$job$pid
system(paste("kill -9", async_pid))

Submit and retrieve with the promises package

See #222 if you would like to kill the async process after retrieving its return value into the parent process.

Submit and forget

Finally if you need a "fire and forget" call (value returned by async process never retrieved into the parent process), there is always the system call to an R script which will automatically terminate the async process when the processing is done, even if the parent process was terminated before the async completed:

Rcommand = paste("Rscript /path/to/my/script.R", arguments)
out = system(Rcommand, wait = FALSE)

@arumds

arumds commented Nov 24, 2018

I am in a similar situation, trying to stop an async process. The promise from the future() is piped to the file_rows() reactive output value.

Based on the tip by @raphaelvannson, I tried the approach below to stop the future.

library(shiny)
library(shinyjs)
library(shinydashboard)
library(ipc)
library(promises)
library(future)
library(V8)
plan(multiprocess)
 server <- function(input, output, session) {
  file_rows <- reactiveVal()
  
  observeEvent(input$rec_run, {
    prog <- Progress$new(session)
    prog$set(message = "Analysis in progress",
             detail = "This may take a while...",
             value = NULL)
fut = future({system(paste(
      "orthofinder",     ###external software
      file_input,     ##reactive input
      "|",
      paste0("head -", file_nrows) ,   ##reactive input
      ">",
      "out.txt"
    ))
    head_rows <- read.delim("out.txt")
    head_rows
    }) %...>%
     file_rows() %>%
     finally(~prog$close())
})

  observeEvent(input$rec_cancel, {
    async_pid <- fut$job$pid
    system(paste("kill -15", async_pid))
  })

However, it returns the error: "Warning: Error in observeEventHandler: object 'fut' not found"

when promise is given as fut<<- future() in the above code then it returns the below error:

sh: 1: kill: Usage: kill [-s sigspec | -signum | -sigspec] [pid | job]... or
kill -l [exitstatus]

Could someone hint what is wrong here?

@DarwinAwardWinner

The usage message is telling you that you are calling kill incorrectly, probably because of the lack of a space between "kill -15" and async_pid. I recommend you look into the glue package as a more robust way to interpolate variables into strings. Instead of paste, you would use glue("kill -15 {async_pid}").

@arumds

arumds commented Nov 24, 2018

 observeEvent(input$rec_cancel, {
    async_pid <- fut$job$pid
    glue("kill -15 {async_pid}")
  })

This is not doing anything and the log file is also empty.

It does not look like the issue is with paste. It was tested using Sys.getpid()

observeEvent(input$rec_cancel, {
   #async_pid <- fut$job$pid
    async_pid <- Sys.getpid()
    system(paste("kill -15", async_pid))
  })

The log file has the below message:

Terminated R --no-save --slave -f /opt/shiny-server/R/SockJSAdapter.R

It is terminating the pid running "/opt/shiny-server/R/SockJSAdapter.R" but not terminating the PID running the system() command inside the future().

It looks like the problem could be in fetching the PID of future() or the syntax of using future/promises in the above code.

@DarwinAwardWinner

Oh, you're right, I forgot paste defaults to using a space in between. Still, the error message you got from kill in your previous comment normally indicates an incorrect usage, so I'm not sure what's going on.

@arumds

arumds commented Nov 24, 2018

Sys.getpid() correctly fetches the PID value, and therefore the kill command works by killing the corresponding PID. I tried to print the PIDs to the log file to see if they return any PID value.

      cat(fut1$job$pid),
       cat(Sys.getpid())

It only prints the PID given by Sys.getpid(), while fut1$job$pid is empty. This suggests that kill is not getting any value, hence the usage error in the case of fut1$job$pid. Since the future is run using plan(multiprocess), the PID that runs the async future is different from Sys.getpid(), as described at https://cran.r-project.org/web/packages/future/vignettes/future-1-overview.html, and the core problem here is to fetch the PID of the async task. It would be helpful if someone could help to fetch the PID for the process running the async task with future() in the above code.
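One workaround that does not depend on internal fields would be to let the async task report its own PID through a file. A minimal sketch, assuming parent and worker share a file system; `with_pid_file()` and `read_pid()` are hypothetical helpers, not part of future or promises.

```r
# Hypothetical helper, meant to wrap the body of the future: write the
# PID of the evaluating process to 'pid_file', then evaluate 'expr'.
with_pid_file <- function(pid_file, expr) {
  writeLines(as.character(Sys.getpid()), pid_file)
  on.exit(unlink(pid_file), add = TRUE)  # remove the file when done
  expr
}

# Hypothetical helper for the parent: read the PID back, or NA if the
# file does not exist (task not started yet, or already finished).
read_pid <- function(pid_file) {
  if (!file.exists(pid_file)) return(NA_integer_)
  pid <- suppressWarnings(
    as.integer(readLines(pid_file, n = 1L, warn = FALSE))
  )
  if (length(pid) == 1L) pid else NA_integer_
}

# In-process demonstration: while 'expr' runs, the file holds the PID.
pid_file <- tempfile("future-pid-")
seen <- with_pid_file(pid_file, read_pid(pid_file))
print(identical(seen, Sys.getpid()))
```

With futures, the idea would be something like fut <- future(with_pid_file(pid_file, long_task())), where long_task() stands in for the real work, and then calling read_pid(pid_file) from the cancel handler. Keep the future object itself in a variable that the handler can see, rather than the promise chain built from it.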

@ismirsehregal

ismirsehregal commented Dec 7, 2018

In the meantime the above question from @Mehar-git was also carried to Stackoverflow: please see my Edit part.
I was able to give an example which gets the PID and kills the process but this only works a couple of times before R stops with an error:

Unhandled promise error: callr failed, could not start R, exited with non-zero status, has crashed or was killed
Warning: Error in : callr failed, could not start R, exited with non-zero status, has crashed or was killed 
  95: <Anonymous>

I guess this is the case because the future isn't interrupt-aware, as mentioned by @HenrikBengtsson earlier in this thread. Maybe someone can point me in the right direction for an implementation using withCallingHandlers(), as suggested by @HenrikBengtsson.

@HenrikBengtsson
Owner Author

Don't have a solution and very little time to look into this, but note that when the promises package is involved, things are getting much more complicated because you are now also dealing with a background, asynchronous event loop (by the later package). That becomes rather complicated to troubleshoot and you are likely to run into issues that are hard to replicate (e.g. "works a few times but then fails").

I guess this is the case because the future isn't interrupt-aware ...

Correct, there is no support for terminating/interrupting futures in the core Future API. Also, the different future backends haven't really been implemented to cope gracefully with R workers failing/terminating - but some try to do a post-mortem analysis, which involves trying to detect when a background R process is gone.

@macieksk

@HenrikBengtsson

I was using this function

stopFuture <- function(x){
  tools::pskill(x$job$pid,signal = tools::SIGTERM)
  tools::pskill(x$job$pid,signal = tools::SIGKILL)
}

Thanks! I found that using

stopFuture(ft)
try(value(ft))

cleans up parallel:::children(), which is otherwise left cluttered with the IDs of stopped processes.

> library(parallel)
> parallel:::children()
list()


9 participants