[R][C++] Allow cancelling long-running commands #27688

Closed
asfimport opened this issue Mar 2, 2021 · 15 comments

Comments

@asfimport

asfimport commented Mar 2, 2021

When calling a long-running task (for example reading a CSV file) from the R prompt, users may want to interrupt with Ctrl-C.

Allowing this will require integrating R's user interruption facility with the cancellation API that's going to be exposed in C++ (see ARROW-8732).

Below is some information I've gathered on the topic:

There is some hairy discussion of how to interrupt C++ code from R at https://stackoverflow.com/questions/40563522/r-how-to-write-interruptible-c-function-and-recover-partial-results and https://stat.ethz.ch/pipermail/r-devel/2011-April/060714.html .

It seems it may involve polling cpp11::check_user_interrupt() and catching any cpp11::unwind_exception that may signal an interruption. A complication is that apparently R APIs should only be called from the main thread. There's also a small library which claims to make writing all this easier: https://github.com/tnagler/RcppThread/blob/master/inst/include/RcppThread/RMonitor.hpp

But since user interruptions will only be noticed by the R main thread, the solution may be to launch heavy computations (e.g. CSV reading) in a separate thread and have the main R thread periodically poll for interrupts while waiting for the separate thread. This is what this dedicated thread class does in its join method: https://github.com/tnagler/RcppThread/blob/master/inst/include/RcppThread/Thread.hpp#L79
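The pattern described above (heavy work on a separate thread, interrupt polling on the main thread) can be sketched with the standard library alone. This is a minimal illustration, not the RcppThread or Arrow implementation: `RunInterruptible` and the `interrupt_pending` callback are hypothetical names, with the callback standing in for an R-level check such as cpp11::check_user_interrupt().

```cpp
#include <atomic>
#include <chrono>
#include <functional>
#include <thread>

// Runs `work` on a separate thread while the calling ("main") thread polls
// `interrupt_pending`. `work` receives a cancel flag it is expected to check.
// Returns true if the work finished without an interrupt being observed.
// ASSUMPTION: `interrupt_pending` is a hypothetical stand-in for an R-level
// interrupt check; it is only ever called on the calling thread.
inline bool RunInterruptible(
    const std::function<void(const std::atomic<bool>&)>& work,
    const std::function<bool()>& interrupt_pending,
    std::chrono::milliseconds poll_interval = std::chrono::milliseconds(10)) {
  std::atomic<bool> cancel_requested{false};
  std::atomic<bool> done{false};

  std::thread worker([&] {
    work(cancel_requested);
    done.store(true);
  });

  bool interrupted = false;
  while (!done.load()) {
    if (!interrupted && interrupt_pending()) {
      interrupted = true;
      cancel_requested.store(true);  // ask the worker to stop early
    }
    std::this_thread::sleep_for(poll_interval);
  }
  worker.join();  // always join so the worker can finish its cleanup
  return !interrupted;
}
```

The worker never touches the interrupt check itself, which matches the constraint that R APIs should only be called from the main thread.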

Reporter: Antoine Pitrou / @pitrou
Assignee: Dewey Dunnington / @paleolimbot

Note: This issue was originally created as ARROW-11841. Please see the migration documentation for further details.

@asfimport

Antoine Pitrou / @pitrou:
cc @romainfrancois

@asfimport

Dewey Dunnington / @paleolimbot:
From the R perspective, you can call

SafeCallIntoRVoid([]() { cpp11::check_user_interrupt(); })

anywhere in C++ and it will return a non-OK status if there's a pending interrupt. That will only work for tasks run with RunWithCapturedR(), although that includes most of the operations one would want to cancel (e.g., reading CSV, Feather, and query engine execution after ARROW-16444). It also only works for places where the R package can insert some kind of cancel callback.

If we have a Future that we can cancel, we could rig something similar, maybe using our own event loop (currently we use Arrow's RunInSerialExecutor and I don't know how customizable that is).

In addition to RMonitor, there's also the 'later' package ( https://github.com/r-lib/later ) which can also run event loops although I don't know how customizable they are.

In the R package we have the RMainThread singleton which could be an appropriate place to register a C++ cancel callback (e.g., a std::function that calls some Future's cancel callback). In most cases, though, we don't have a future we can cancel.

@asfimport

Weston Pace / @westonpace:

> If we have a Future that we can cancel, we could rig something similar, maybe using our own event loop (currently we use Arrow's RunInSerialExecutor and I don't know how customizable that is).

Cancellation is supported in C++ but not via cancellable futures (and probably won't be). Instead, operations which support cancellation take in a stop token. A stop token is something that the C++ code can poll on a regular basis to see if cancellation has been requested (very similar to cpp11::check_user_interrupt() but not global).

The stop token is connected to a stop source which the user holds onto. If the user marks the stop source as cancelled then the stop token will see the cancellation the next time it is polled and exit.

It sounds like, for R, this stop source approach won't work (is there no way to register a callback that gets called on cancellation instead of requiring polling?). In that case maybe we want a custom stop token implementation for R. This stop token's poll method could call cpp11::check_user_interrupt(). However, we'd probably want to debounce the call, as a stop token might be polled very frequently and polling should be pretty fast.
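To make the two ideas above concrete, here is a rough standard-library sketch of a stop source/stop token pair and of a debounced poll. The classes `SimpleStopSource`, `SimpleStopToken`, and `DebouncedCheck` are hypothetical illustrations, not Arrow's actual classes, and the `expensive_check` callback is an assumed stand-in for something like cpp11::check_user_interrupt().

```cpp
#include <atomic>
#include <chrono>
#include <functional>
#include <memory>
#include <utility>

class SimpleStopSource;

// Cheap, pollable view of a cancellation flag (hypothetical, not Arrow's).
class SimpleStopToken {
 public:
  bool IsStopRequested() const {
    return flag_->load(std::memory_order_relaxed);
  }

 private:
  friend class SimpleStopSource;
  explicit SimpleStopToken(std::shared_ptr<std::atomic<bool>> flag)
      : flag_(std::move(flag)) {}
  std::shared_ptr<std::atomic<bool>> flag_;
};

// Held by the caller; marking it stopped is seen by every token it handed out.
class SimpleStopSource {
 public:
  SimpleStopSource() : flag_(std::make_shared<std::atomic<bool>>(false)) {}
  void RequestStop() { flag_->store(true); }
  SimpleStopToken token() const { return SimpleStopToken(flag_); }

 private:
  std::shared_ptr<std::atomic<bool>> flag_;
};

// Debounces an expensive check: the wrapped callback runs at most once per
// `min_interval`, and a positive answer is remembered.
// Not thread-safe: intended to be polled from a single thread.
class DebouncedCheck {
 public:
  using Clock = std::chrono::steady_clock;

  DebouncedCheck(std::function<bool()> expensive_check,
                 Clock::duration min_interval)
      : check_(std::move(expensive_check)), min_interval_(min_interval) {}

  bool Poll() {
    if (tripped_) return true;
    const auto now = Clock::now();
    if (has_run_ && now - last_run_ < min_interval_) return false;
    has_run_ = true;
    last_run_ = now;
    tripped_ = check_();
    return tripped_;
  }

 private:
  std::function<bool()> check_;
  Clock::duration min_interval_;
  Clock::time_point last_run_{};
  bool has_run_ = false;
  bool tripped_ = false;
};
```

With a debounce interval of, say, a few tens of milliseconds, a tight loop can call `Poll()` on every iteration while the underlying check runs only a handful of times per second.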

@asfimport

Dewey Dunnington / @paleolimbot:
One way we can do this in just the R package is to wrap InputStream/RandomAccessFile objects with ones whose read methods call SafeCallIntoR() (taking care that we're not checking so frequently that it slows things down). That would limit the checking to filesystem operations, but I think it's rare for a single Read or ReadAt call to be large enough to matter in this context?
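A rough sketch of that wrapping idea, using a deliberately tiny stream interface rather than Arrow's. Everything here is hypothetical (`ByteSource`, `InterruptCheckingSource`, `MemorySource`, the `interrupt_pending` callback, and the byte threshold); in the R package the check would go through SafeCallIntoR() and failures would be reported as a Status rather than an exception.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstring>
#include <functional>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>

// Hypothetical minimal stream interface (Arrow's InputStream is richer).
class ByteSource {
 public:
  virtual ~ByteSource() = default;
  // Reads up to `nbytes` into `out`; returns bytes read (0 at end of stream).
  virtual std::size_t Read(std::size_t nbytes, void* out) = 0;
};

struct Interrupted : std::runtime_error {
  Interrupted() : std::runtime_error("interrupted by user") {}
};

// Forwards reads to `wrapped`, checking for an interrupt only after at least
// `check_every_bytes` have been read since the previous check.
class InterruptCheckingSource : public ByteSource {
 public:
  InterruptCheckingSource(std::unique_ptr<ByteSource> wrapped,
                          std::function<bool()> interrupt_pending,
                          std::size_t check_every_bytes)
      : wrapped_(std::move(wrapped)),
        interrupt_pending_(std::move(interrupt_pending)),
        check_every_bytes_(check_every_bytes) {}

  std::size_t Read(std::size_t nbytes, void* out) override {
    if (bytes_since_check_ >= check_every_bytes_) {
      bytes_since_check_ = 0;
      if (interrupt_pending_()) throw Interrupted();
    }
    const std::size_t n = wrapped_->Read(nbytes, out);
    bytes_since_check_ += n;
    return n;
  }

 private:
  std::unique_ptr<ByteSource> wrapped_;
  std::function<bool()> interrupt_pending_;
  std::size_t check_every_bytes_;
  std::size_t bytes_since_check_ = 0;
};

// Trivial in-memory source, only here to exercise the wrapper.
class MemorySource : public ByteSource {
 public:
  explicit MemorySource(std::string data) : data_(std::move(data)) {}
  std::size_t Read(std::size_t nbytes, void* out) override {
    const std::size_t n = std::min(nbytes, data_.size() - pos_);
    std::memcpy(out, data_.data() + pos_, n);
    pos_ += n;
    return n;
  }

 private:
  std::string data_;
  std::size_t pos_ = 0;
};
```

Throttling by bytes read rather than by call count keeps the overhead roughly proportional to the amount of I/O, whatever the read size.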

@asfimport

Dewey Dunnington / @paleolimbot:
Can the RunInSerialExecutor do something like setTimeout (i.e., schedule something for execution later)? If so, that could just do cpp11::check_user_interrupt(), set the C++ stop token if an interrupt has been requested, and schedule itself to run in a few seconds otherwise?

@asfimport

Weston Pace / @westonpace:

> Can the RunInSerialExecutor do something like setTimeout (i.e., schedule something for execution later)? If so, that could just do cpp11::check_user_interrupt(), set the C++ stop token if an interrupt has been requested, and schedule itself to run in a few seconds otherwise?

That's not supported today, but it should be possible, and it seems like a pretty good way to solve this problem. One challenge is that we want to make sure to let cleanup happen (e.g. tasks that close file handles). However, there might be some middle ground here. RunInSerialExecutor, when it decides a timeout has occurred, could mark a StopSource as cancelled. We would not need to override stop token in that case.

@asfimport

Antoine Pitrou / @pitrou:
@paleolimbot When you try to interrupt e.g. CSV reading using Ctrl-C in R currently, what happens? It might be that it already works, given the changes mentioned.

@asfimport

Antoine Pitrou / @pitrou:
Also, it would be worth taking a look at how it's done in Python currently:
https://github.com/apache/arrow/blob/2519230121b9be3ecac01ac3ed2b610382dbca48/python/pyarrow/error.pxi

and the C++ APIs that it relies on:
https://github.com/apache/arrow/blob/2519230121b9be3ecac01ac3ed2b610382dbca48/cpp/src/arrow/util/cancel.h

They make it possible to automatically cancel a StopToken when one of a given set of signals is received. That should in turn interrupt whatever primitive is checking for StopToken cancellation (such as CSV reading).

@asfimport

Dewey Dunnington / @paleolimbot:
It's a little hard to test because it involves seeing how fast you can press Control-C, but I'm pretty sure that sending an interrupt signal during CSV reading or exec plan execution doesn't do anything:

library(arrow, warn.conflicts = FALSE)
#> Some features are not enabled in this build of Arrow. Run `arrow_info()` for more information.

tf <- tempfile()
readr::write_csv(vctrs::vec_rep(mtcars, 5e5), tf)

# try to slow down CSV reading
set_cpu_count(1)
set_io_thread_count(2)

# compare timing of cancelled vs not cancelled (hard to tell the difference)
system.time(read_csv_arrow(tf))
#>    user  system elapsed 
#>   2.852   0.637   5.365
system.time(open_dataset(tf, format = "csv") |> dplyr::collect())
#>    user  system elapsed 
#>   2.920   0.219   3.049

# compare responsiveness of cancelling the read using other APIs
# (usually quite a difference)
system.time(readr::read_csv(tf))
#> Rows: 16000000 Columns: 11
#> ── Column specification ────────────────────────────────────────────────────────
#> Delimiter: ","
#> dbl (11): mpg, cyl, disp, hp, drat, wt, qsec, vs, am, gear, carb
#> 
#> ℹ Use `spec()` to retrieve the full column specification for this data.
#> ℹ Specify the column types or set `show_col_types = FALSE` to quiet this message.
#>    user  system elapsed 
#>  19.424   1.267   3.496
system.time(read.csv(tf))
#>    user  system elapsed 
#>  20.858   0.718  21.864

It seems like we would need some sort of "run this bit of code in XX seconds" to implement this in the R bindings (or if there's an easier way that would be great!). It doesn't matter what thread it's on because SafeCallIntoR handles that...I think I know how to do that (start a thread, make it sleep for some number of seconds, then call SafeCallIntoR). The setup/cleanup could live in RunWithCapturedR?

@asfimport

Antoine Pitrou / @pitrou:
In your example, is read_csv_arrow actually interrupted, or does it run to completion?

@asfimport

Dewey Dunnington / @paleolimbot:
I believe that it runs to completion even if you Control-C when it's running (going on evidence that it takes the same amount of time whether you do or do not cancel it). I also checked open_dataset() |> write_dataset(), which both takes the same amount of time AND writes the same thing regardless of whether or not you Control-C in the middle of it.

@asfimport

Dewey Dunnington / @paleolimbot:
Another piece of evidence is that if you use an R connection instead of a filename then you get immediate cancellation (because with an R connection as input it's calling SafeCallIntoR() every few ms and the error propagation will also propagate the interrupt condition).

library(arrow, warn.conflicts = FALSE)
#> Some features are not enabled in this build of Arrow. Run `arrow_info()` for more information.

tf <- tempfile()
readr::write_csv(vctrs::vec_rep(mtcars, 5e5), tf)

# try to slow down CSV reading
set_cpu_count(1)
set_io_thread_count(2)

# you can cancel if you use an R connection because the error propagation
# will also propagate the interrupt condition
system.time(read_csv_arrow(file(tf)))
#>    user  system elapsed 
#>   2.909   0.598   3.410

@asfimport

Antoine Pitrou / @pitrou:
RunInSerialExecutor internally gets a future (using the private SerialExecutor::Run) and then waits for the future to complete.

Instead there should be a public way to get that future without waiting for it. Then you can wait for the future yourself, but with a timeout to implement whatever polling you want.
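As an illustration of waiting on a future with a timeout, here is a small sketch using std::future rather than Arrow's Future (whose API differs). `WaitWithPolling` and the `on_poll` callback are hypothetical; `on_poll` is where an interrupt check and a stop request would go.

```cpp
#include <chrono>
#include <functional>
#include <future>
#include <utility>

// Waits for `fut` to complete, running `on_poll` every time `poll_interval`
// elapses without a result. Returns the future's value (or rethrows its
// exception) once it is ready.
// ASSUMPTION: std::future is used as a stand-in for Arrow's Future.
template <typename T>
T WaitWithPolling(std::future<T> fut, const std::function<void()>& on_poll,
                  std::chrono::milliseconds poll_interval) {
  for (;;) {
    const std::future_status status = fut.wait_for(poll_interval);
    // Stop polling when the result is ready; a deferred future never becomes
    // "ready" through wait_for, so fall through to get() in that case too.
    if (status != std::future_status::timeout) break;
    on_poll();
  }
  return fut.get();
}
```

The wait still ends only when the task itself finishes, so cleanup work (such as closing file handles) gets a chance to run after a stop has been requested.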

Of course, polling is suboptimal. Ideally there would be a way to temporarily override R's signal handlers, and reuse the same strategy as in Python. Perhaps that can be done using plain signal calls in C++?

@asfimport

Antoine Pitrou / @pitrou:
To elaborate on the latter strategy (temporarily override signal handlers in C++), you could mimic what is done on the Python/Cython side:

if isinstance(exc_value, ArrowCancelled):
    if exc_value.signum:
        # Re-emit the exact same signal. We restored the Python signal
        # handler above, so it should receive it.
        if os.name == 'nt':
            SendSignal(exc_value.signum)
        else:
            SendSignalToThread(exc_value.signum,
                               threading.main_thread().ident)
    else:
        # Simulate Python receiving a SIGINT
        # (see https://bugs.python.org/issue43356 for why we can't
        # simulate the exact signal number)
        PyErr_SetInterrupt()
    # Maximize chances of the Python signal handler being executed now.
    # Otherwise a potential KeyboardInterrupt might be missed by an
    # immediately enclosing try/except block.
    PyErr_CheckSignals()
    # ArrowCancelled will be re-raised if PyErr_CheckSignals()
    # returned successfully.

... except that PyErr_SetInterrupt would become either Rf_onintr or Rf_onintrNoResume (not sure which one). And I don't think PyErr_CheckSignals needs an R equivalent at all.

As for setting and restoring signal handlers, you would do that in C++ because R doesn't seem to have any equivalent APIs (so you would use our own SetSignalHandler helper function).
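A bare-bones sketch of the "temporarily override, then restore" part, using plain std::signal rather than Arrow's SetSignalHandler helper. `RunWithSignalCaptured`, `SketchSignalReceived`, and the handler name are hypothetical; what to do with the captured signal afterwards (re-raise it, or call Rf_onintr as suggested above) is left to the caller.

```cpp
#include <csignal>
#include <functional>
#include <stdexcept>

// Last signal seen by the temporary handler (0 if none). A handler may only
// safely write to a volatile sig_atomic_t like this one.
static volatile std::sig_atomic_t g_sketch_received_signal = 0;

extern "C" void SketchCaptureSignal(int signum) {
  g_sketch_received_signal = signum;
}

// Long-running work can poll this to notice that the signal arrived.
inline bool SketchSignalReceived() { return g_sketch_received_signal != 0; }

// Installs a temporary handler for `signum`, runs `work`, restores the
// previous handler (even if `work` throws), and returns the signal number
// that was received during `work`, or 0 if none was.
inline int RunWithSignalCaptured(int signum,
                                 const std::function<void()>& work) {
  g_sketch_received_signal = 0;
  void (*previous)(int) = std::signal(signum, SketchCaptureSignal);
  if (previous == SIG_ERR) {
    throw std::runtime_error("could not install signal handler");
  }
  struct Restore {
    int signum;
    void (*previous)(int);
    ~Restore() { std::signal(signum, previous); }
  } restore{signum, previous};

  work();
  return g_sketch_received_signal;
}
```

A caller that wants the original handler to see the interrupt could follow up with something like `if (int sig = RunWithSignalCaptured(SIGINT, work)) std::raise(sig);`, which mirrors the "re-emit the exact same signal" step in the Cython code.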

@asfimport

Dewey Dunnington / @paleolimbot:
Issue resolved by pull request 13635
#13635
