How do I terminate Pools cleanly? #72

Closed
Zaharid opened this issue Jul 10, 2016 · 15 comments

@Zaharid
Contributor

Zaharid commented Jul 10, 2016

Looking at #70, this looks like a difficult problem. I would like a way to cleanly shut down a Kernel that uses pools when the user cancels the program (e.g. KeyboardInterrupting it). The most obvious problem is in curio/workers.py, here https://github.com/dabeaz/curio/blob/master/curio/workers.py#L284: By the time the finally is executed, shutdown has already been called, setting the workers to None and thus resulting in an AttributeError. Once that is worked around, with e.g.

diff --git a/curio/workers.py b/curio/workers.py
index 78895fd..d779d99 100644
--- a/curio/workers.py
+++ b/curio/workers.py
@@ -282,7 +282,8 @@ class WorkerPool(object):
                 worker = self.workercls()
                 raise
             finally:
-                self.workers.append(worker)
+                if self.workers is not None:
+                    self.workers.append(worker)

There are still some issues. If I have a simple program like:

import curio

def f():
    import time
    time.sleep(40)

async def main():
    coro = curio.run_in_process(f)
    task = await curio.spawn(coro)

kernel = curio.Kernel()
try:
    kernel.run(main())
except KeyboardInterrupt:
    print("OK")

and I run it and then press Ctrl+C, it almost works, except for some noisy multiprocessing delete action:

$ python  simple.py 
^COK
Process Process-1:

If I change f() to explicitly ignore the interrupt signal, it shuts down quietly:

def f():
    import time
    import signal
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    time.sleep(40)

$ python  simple.py 
^COK

However, if I have more complex code (specifically this https://github.com/NNPDF/reportengine/blob/25f7b0c4680a91b692653e24b1a0b5f8c222dc99/src/reportengine/resourcebuilder.py#L135) with more tasks being run and waited for, I get the dreaded GeneratorExit thing 4 times, once for each concurrent run_in_process. I'm not really sure why it behaves differently from the simple case:

Exception ignored in: <bound method Task.__del__ of Task(id=9, <coroutine object run_in_process at 0x7f6dadf60830>, state='READ_WAIT')>
Traceback (most recent call last):
  File "/home/zah/sourcecode/curio/curio/task.py", line 53, in __del__
    self.coro.close()
  File "/home/zah/sourcecode/curio/curio/workers.py", line 108, in run_in_process
    return await kernel._process_pool.apply(callable, args, kwargs)
RuntimeError: coroutine ignored GeneratorExit
Exception ignored in: <bound method Task.__del__ of Task(id=22, <coroutine object run_in_process at 0x7f6dae474728>, state='READ_WAIT')>
Traceback (most recent call last):
  File "/home/zah/sourcecode/curio/curio/task.py", line 53, in __del__
    self.coro.close()
  File "/home/zah/sourcecode/curio/curio/workers.py", line 108, in run_in_process
    return await kernel._process_pool.apply(callable, args, kwargs)
RuntimeError: coroutine ignored GeneratorExit
Exception ignored in: <bound method Task.__del__ of Task(id=24, <coroutine object run_in_process at 0x7f6dadf69258>, state='READ_WAIT')>
Traceback (most recent call last):
  File "/home/zah/sourcecode/curio/curio/task.py", line 53, in __del__
    self.coro.close()
  File "/home/zah/sourcecode/curio/curio/workers.py", line 108, in run_in_process
    return await kernel._process_pool.apply(callable, args, kwargs)
RuntimeError: coroutine ignored GeneratorExit
Exception ignored in: <bound method Task.__del__ of Task(id=25, <coroutine object run_in_process at 0x7f6dadf696d0>, state='READ_WAIT')>
Traceback (most recent call last):
  File "/home/zah/sourcecode/curio/curio/task.py", line 53, in __del__
    self.coro.close()
  File "/home/zah/sourcecode/curio/curio/workers.py", line 108, in run_in_process
    return await kernel._process_pool.apply(callable, args, kwargs)
RuntimeError: coroutine ignored GeneratorExit
@dabeaz
Owner

dabeaz commented Jul 10, 2016

Try doing this and see what happens::

kernel = curio.Kernel()
try:
    kernel.run(main())
except KeyboardInterrupt:
    kernel.run(shutdown=True)
    print("OK")

@Zaharid
Contributor Author

Zaharid commented Jul 10, 2016

The first Ctrl+C appears ignored, and a second one gives:

^C^CTraceback (most recent call last):
  File "simple.py", line 17, in <module>
    kernel.run(main())
  File "/home/zah/sourcecode/curio/curio/kernel.py", line 532, in run
    events = selector_select(timeout)
  File "/home/zah/anaconda3/lib/python3.5/selectors.py", line 432, in select
    fd_event_list = self._epoll.poll(timeout, max_ev)
KeyboardInterrupt

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "simple.py", line 19, in <module>
    kernel.run(shutdown=True)
  File "/home/zah/sourcecode/curio/curio/kernel.py", line 532, in run
    events = selector_select(timeout)
  File "/home/zah/anaconda3/lib/python3.5/selectors.py", line 432, in select
    fd_event_list = self._epoll.poll(timeout, max_ev)
KeyboardInterrupt

This is always with the patch for the AttributeError, otherwise it fails there.

@dabeaz
Owner

dabeaz commented Jul 10, 2016

Hmmm. Will have to investigate further. One thing is critically important though--quitting without calling kernel.run(shutdown=True) is likely to cause a lot of chaos. This is supposed to issue a cancellation to all remaining tasks which will cause them to cleanly exit. I'm not exactly sure what might be going on in the above code that would require you to have to issue a second KeyboardInterrupt.

Big picture: It should be possible to do this with curio if you are careful about how it happens though. It might require some experimentation and patches.

@dabeaz
Owner

dabeaz commented Jul 10, 2016

I think I have a better idea of what is happening here and possibly how to fix it. Short version: The keyboard interrupt is being caught in a strange spot and the kernel is not able to cleanly cancel things. Need to spend some time fiddling with it.

@dabeaz
Owner

dabeaz commented Jul 11, 2016

Wow, okay. Control-C is tremendously evil and tricky. Let's discuss.

First, in order to have an orderly shutdown of coroutines, it's not sufficient to simply catch KeyboardInterrupt and exit. You'll probably get a ton of errors about GeneratorExit, non-awaited coroutines, and similar things. So, that final step of shutting down the kernel is going to be a critical step.
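
As a rough illustration of where the GeneratorExit error comes from, here is a minimal sketch in plain Python with no curio involved. The names trap, worker and close_abandoned are made up for the example, and it shows only one way the error can arise (not necessarily the exact path inside curio): a suspended coroutine gets closed, and its cleanup code tries to suspend again.

```python
import types


@types.coroutine
def trap():
    # Hypothetical stand-in for a kernel trap: suspend until resumed.
    yield "wait"


async def worker():
    try:
        await trap()
    finally:
        # Cleanup that needs the event loop again. When the coroutine is
        # being closed, nothing is left to resume it.
        await trap()


def close_abandoned():
    coro = worker()
    coro.send(None)  # run up to the first suspension point
    try:
        coro.close()  # throws GeneratorExit into the coroutine
    except RuntimeError as exc:
        return str(exc)
    return None
```

Calling close_abandoned() returns the same message seen in the tracebacks earlier in this thread. Cancelling the task through the kernel first gives the cleanup code a running loop to finish in, which is why the shutdown step matters.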

Even if you do the shutdown, the arrival of KeyboardInterrupt itself is crazy if you consider where it might arrive:

  1. It is most likely to arrive while the kernel is paused waiting for I/O on select. This makes the kernel die with a KeyboardInterrupt, but doesn't kill any of the executing coroutines.
  2. It could potentially happen on any statement in the kernel, possibly leaving the kernel in an unknown state (for example, arrival of keyboard interrupt in the middle of code that's performing some kind of task-related bookkeeping).
  3. It could happen while executing any statement in any task (coroutine). This will make that coroutine terminate. This seems to make the kernel itself terminate, but it's not clear to me that it's left in a well-defined state.

It seems that one can restore a lot of sanity to the situation by trying to force all Control-C handling into a coroutine like this::

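import signal
import curio

async def kb_interrupt():
    # Wait for SIGINT inside a coroutine, then raise from this known location.
    await curio.SignalSet(signal.SIGINT).wait()
    raise KeyboardInterrupt()

async def main():
    await curio.spawn(kb_interrupt())
    # ... rest of program here ...

kernel = curio.Kernel()
try:
    kernel.run(main())
except KeyboardInterrupt:
    # Cancel all remaining tasks so they exit cleanly.
    kernel.run(shutdown=True)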

If you do this, Control-C will be routed to a known and predictable location in the program (always to the kb_interrupt coroutine). When it happens, the kernel will stop. After that, you can perform the cleanup step and terminate.

Yikes.

I've pushed a few minor fixes to make this work correctly. Open to all ideas on how to make this more sane.
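
The idea of routing Control-C to one predictable location can also be sketched without curio, using only the standard signal module. This is an illustration under assumptions, not curio's mechanism: InterruptFlag and run_steps are hypothetical names. The signal handler only records the request, and KeyboardInterrupt is raised at a single checkpoint between steps, so no step is interrupted halfway through its bookkeeping.

```python
import signal


class InterruptFlag:
    """Records that Ctrl-C was requested instead of raising immediately."""

    def __init__(self):
        self.requested = False

    def handler(self, signum, frame):
        # Runs in the middle of whatever was executing, so only set a flag.
        self.requested = True

    def install(self):
        # signal.signal() must be called from the main thread.
        signal.signal(signal.SIGINT, self.handler)


def run_steps(steps, flag):
    """Run callables in order, raising KeyboardInterrupt only between steps."""
    completed = []
    for step in steps:
        if flag.requested:
            raise KeyboardInterrupt
        completed.append(step())
    return completed
```

A program would call flag.install() once at startup and wrap run_steps(...) in the same try/except KeyboardInterrupt used above, doing its cleanup in the except branch.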

Zaharid added a commit to NNPDF/reportengine that referenced this issue Jul 16, 2016
@Zaharid
Contributor Author

Zaharid commented Jul 16, 2016

I got around to playing with this and got some AttributeErrors in Kernel.__del__.

If I change it to:

    def __del__(self):
        if self._kernel_task_id is not None:
            if self._notify_sock is not None:
                self._notify_sock.close()
            if self._wait_sock is not None:
                self._wait_sock.close()
            self._kernel_task_id = None
        if self._thread_pool:
            self._thread_pool.shutdown()
        if self._process_pool:
            self._process_pool.shutdown()

Then everything seems to work fine.

Could there be an option in curio.run() to do all these things automatically?
Thanks!

@dabeaz
Owner

dabeaz commented Jul 18, 2016

Reviewing some of the code, it seems that there is some kind of duplication between __del__() and _shutdown_resources(). In the big picture, I think that destroying a Kernel without some kind of controlled shutdown should be considered an error.

So, I've modified the _shutdown_resources() method to clean up pools in addition to its usual cleanup. This should now happen automatically for the curio.run() method or if you explicitly call Kernel.run(shutdown=True). I've modified __del__() to raise an exception if it's invoked without some kind of cleanup taking place first (note: maybe a warning message is better?). I've also made the Kernel class work as a context manager to give better control over the shutdown process.
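
To make the shape of that design concrete, here is a toy sketch. ToyKernel is a hypothetical stand-in and not curio's Kernel: the pool names are placeholders, and it takes the "warning" variant in __del__ rather than raising. It shows shutdown happening on context-manager exit, with __del__ only complaining when no shutdown took place.

```python
import warnings


class ToyKernel:
    """Hypothetical stand-in for a kernel that owns worker pools."""

    def __init__(self):
        self.pools = ["thread_pool", "process_pool"]  # placeholder resources
        self.shut_down = False

    def shutdown(self):
        # Idempotent cleanup, safe to call more than once.
        self.pools.clear()
        self.shut_down = True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.shutdown()
        return False  # never swallow the exception

    def __del__(self):
        # Destroying the object without a controlled shutdown is flagged.
        if not self.shut_down:
            warnings.warn("ToyKernel deleted without shutdown", ResourceWarning)
```

With this shape, a with block guarantees cleanup even when the body raises, so __del__ has nothing left to do in the normal case.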

@Zaharid
Contributor Author

Zaharid commented Aug 1, 2016

A problem I am seeing now is the following:

If I have a very deeply nested set of tasks running in pools (e.g. a generalized DAG dependency resolver, as I am trying in the code above), I find no reasonable way of shutting everything down once I get an error in one of the tasks. After trying many combinations of try/except, the tasks scheduled with spawn still seem to try to run in the pool before there is an opportunity to cancel them, creating a very long cascade of errors and unwanted work.

The workaround I have found is:

        wait_tasks = curio.wait(pending_tasks)
        async for completed_task in wait_tasks:
            try:
                result = await completed_task.join()
            except curio.TaskError as e:
                raise KeyboardInterrupt from e

which I believe works thanks to the special way in which KeyboardInterrupt is treated now. Then this can be handled like:

with kernel:
    try:
        kernel.run(main())
    except KeyboardInterrupt as e:
        log.info("Cancelling remaining tasks.")
        if e.__cause__:
            raise e.__cause__
        raise

Could there be something like curio.shutdown() (or raise curio.Shutdown()) to handle this kind of thing?

@dabeaz
Owner

dabeaz commented Aug 1, 2016

I'm a bit curious. Instead of raising KeyboardInterrupt, does raising SystemExit also work?

@Zaharid
Contributor Author

Zaharid commented Aug 1, 2016

Yes, it works in the same way.

@dabeaz
Owner

dabeaz commented Aug 3, 2016

Been thinking about this a bit more. Are you thinking that there ought to be a special Shutdown exception one could raise in a coroutine and have that shut down the entire kernel and all running tasks?

@Zaharid
Contributor Author

Zaharid commented Aug 3, 2016

Yes. I think that's useful functionality that there is currently no easy way to get (other than with the above hacks, which would be equivalent).

Also I'm wondering if that should be the default behaviour in the face of an unhandled exception that propagates down to the point where you call run. What is the use of having tasks running if the thing that invoked them is in a state of error?

@dabeaz
Owner

dabeaz commented Aug 3, 2016

I could imagine a scenario where the same kernel is used to run multiple coroutines submitted to it. So, for that case, I'm not sure it makes sense to shut down all of the resources upon completion of each run() call. I could also envision a situation where there are some background (daemon) tasks sitting there waiting to do things on each run() invocation, and you wouldn't want to have them shut down.

Let me play around with it. In thinking about this problem, I realize that there is a very subtle bug in the whole shutdown process that could cause the kernel to spin indefinitely if SystemExit or KeyboardInterrupt is raised as described in this bug. So, I'd like to fix that too.

@dabeaz
Owner

dabeaz commented Aug 5, 2016

I have added a new exception KernelExit that can make the kernel exit. It's still going to function more-or-less like SystemExit though. So, you'd need to do something like this::

async def coro():
    ...
    if something_bad:
        raise KernelExit()

try:
    run(main())
except KernelExit:
    # Whatever extra cleanup might be involved
    ...

Or alternatively:

with kernel:
    try:
        kernel.run(main())
    except KernelExit:
        # Cleanup
        ...

I'm still kicking around the concept of whether or not there should be a more general kind of abort() or terminate() functionality. I don't know. It seems that raising an exception and kicking it out to the top level can probably work as well as anything. All things equal, I'd like to have "one way to do it" as opposed to many different ways of terminating though.
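
One reading of "more-or-less like SystemExit" is that the exception sits outside the Exception hierarchy, so broad except Exception handlers inside tasks cannot swallow it on its way to the top level. The sketch below illustrates that reading with a hypothetical ToyExit class; how KernelExit itself is defined is not stated in this thread, so treat the parallel as an assumption.

```python
class ToyExit(BaseException):
    """Hypothetical exit request, derived the same way SystemExit is."""


def careless_task(exc_type):
    try:
        raise exc_type("stop everything")
    except Exception:
        # A broad handler of the kind often found in application code.
        return "swallowed"


def run_top_level(exc_type):
    try:
        return careless_task(exc_type)
    except ToyExit:
        # Extra cleanup would go here.
        return "reached top level"
```

Here run_top_level(RuntimeError) is absorbed by the broad handler, while run_top_level(ToyExit) propagates all the way out, which is the behaviour one would want from an exit request.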

Leaving the issue open as I'm still open to ideas on how to improve all of this.

Zaharid added a commit to NNPDF/reportengine that referenced this issue Dec 7, 2016
@dabeaz
Owner

dabeaz commented Jan 4, 2017

Closing for now. Think shutdown is working correctly in the current version, but if not this should be reopened.
