This issue is meant for higher-level discussion around dropping Tornado and our async infrastructure #6047.
I count 42 instances of loop.add_callback or loop.call_later in distributed. That's equivalent to saying we have 42 goto statements in distributed.
Unfamiliar with this comparison? Please read https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful. (This whole issue is really just saying "please read this". If you've read it, read it again. It took me a few reads for the ideas to really sink in.)
The basic idea is that when you use add_callback, or asyncio.create_task (or go in golang, etc.), you're starting something to run concurrently, but not dealing with the endgame:
- what if the callback raises an error?
- what if an error occurs somewhere else and you now need to cancel the callback?
- how do you even know when the callback is done?
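To make those questions concrete, here is a minimal sketch in plain asyncio (not code from distributed; `flaky_callback` is a hypothetical stand-in for any background callback). The caller fires off a task, the task fails, and the caller carries on as though nothing happened. The error is only visible if someone explicitly asks the task for it.

```python
import asyncio


async def flaky_callback():
    # Hypothetical stand-in for a background callback that hits an error.
    raise RuntimeError("callback failed")


async def main():
    # Fire-and-forget: the task is started, but nothing awaits it.
    task = asyncio.create_task(flaky_callback())
    await asyncio.sleep(0.01)  # the caller carries on, unaware of the failure
    reached_end = True
    # The exception sits inside the task until someone asks for it.
    return reached_end, task.done(), type(task.exception()).__name__


result = asyncio.run(main())
print(result)
```

If the last line of `main` did not call `task.exception()`, the only trace of the failure would be a "Task exception was never retrieved" log message when the task is garbage-collected.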
This can be fine. But because you can just fire off callbacks willy-nilly and not deal with their consequences or lifetimes, it's very hard to reason about and maintain code that does this. (For the same reason code that uses goto statements can work, but is hard to reason about and maintain.) When things go as expected, they probably work, but when anything unexpected happens, it's very easy to go off the rails. Of course, a distributed system is just the kind of system where unexpected things are nearly guaranteed to happen, and it's unacceptable to not handle them correctly when they do.
Here are some recent issues that I think, at their core, stem from using unstructured concurrency. That is, using structured concurrency, we'd either not (be likely/able to) design something that could get into this sort of broken state, or an unhandled error would propagate up and shut down the whole worker, instead of ignoring the error and hobbling along doing the wrong thing as though everything's fine:
- BaseException in task leads to task never completing #5958 (exception not propagated in execute)
- Deadlock stealing a resumed task #6159 (can't cancel gather_dep coroutine)
- Missing input data for a task should fail worker, not the task #6142 (can't propagate exception from execute)
- Properly support restarting BatchedSend #5481 (coroutine assumed running when it's failed; exception not propagated)
- KeyError in gather_dep #6194 (exception not propagated)
- Deadlock - task not running #5366 (exception not propagated?)
- [Retrospective] Use of ContextVars for passing stimulus_id's within the Scheduler #6107 (contextvars mishandled in async framework)
- Keyboard Interrupt of the dask-scheduler CLI results in tornado.util.TimeoutError #5955 (handling shutdown mess)
- Error closing a local cluster when client still running #6087 (handling shutdown mess)
- I'll also include the recent things involving gather_dep, since I think not being able to cancel or respond to errors with in-progress gather_dep callbacks is the core reason it's so brittle (Deadlock stealing a resumed task #6159 (comment)). And using structured concurrency, it would be awkward/non-idiomatic to implement something like our current design.
I'm not saying "we need to rewrite all of distributed to use trio right now!" I recognize this isn't going to happen immediately.
But I think we should at least take the concepts of structured concurrency to heart. TaskGroups are getting added to asyncio in py3.11, and are available now in aiotools and anyio. When using concurrency, we should always:
- Have a way for the coroutine to propagate errors upwards
- Have a way to cancel the coroutine
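As a rough illustration of those two rules using only plain asyncio (so it runs on versions before 3.11), here is a sketch. The helper `run_all_or_fail` and the coroutines `fetch` and `broken` are hypothetical names invented for this example, not anything in distributed. The helper keeps a handle on every task, re-raises the first failure to the caller, and cancels whatever is still running.

```python
import asyncio


async def run_all_or_fail(*coros):
    """Run coroutines concurrently; on the first error, cancel the rest and re-raise."""
    tasks = [asyncio.ensure_future(c) for c in coros]
    try:
        done, _pending = await asyncio.wait(
            tasks, return_when=asyncio.FIRST_EXCEPTION
        )
        for t in done:
            t.result()  # re-raises a failure, if there was one
        return [t.result() for t in tasks]
    finally:
        for t in tasks:
            t.cancel()  # no-op for tasks that already finished
        # Wait for cancellation to actually complete before returning.
        await asyncio.gather(*tasks, return_exceptions=True)


events = []


async def fetch(name):
    try:
        await asyncio.sleep(10)  # pretend to wait on a slow peer
    except asyncio.CancelledError:
        events.append(f"{name} cancelled")
        raise


async def broken():
    await asyncio.sleep(0.01)
    raise ValueError("missing input data")


async def main():
    try:
        await run_all_or_fail(fetch("gather"), broken())
    except ValueError as exc:
        events.append(f"caller saw: {exc}")


asyncio.run(main())
print(events)
```

This is roughly what a TaskGroup does for you, minus a lot of edge cases. Getting this pattern right by hand at every call site is the tedious part.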
This is kinda possible to do just with plain asyncio... it's just that to do it easily and consistently and reliably (in the face of signal handlers, __del__ methods, exceptions in finally blocks, etc.) you'd start wishing you were using trio.
Also, switching to using trio/anyio may not be as dramatic as you'd think. Trio can be hard to adopt because it doesn't support asyncio-based libraries (though anyio can help with this). But good news—we're not using any asyncio-based libraries (besides Tornado)!
And we might not have to immediately rewrite every loop.add_callback to do structured concurrency "properly" (an async with trio.open_nursery() block, etc.). Trio's clever "escape hatch" means we could just store a global Nursery object on the Worker/Scheduler/Client instance, and use it a lot like we use the current event loop. Replace self.loop.add_callback -> self.nursery.start_soon and we're in a strictly better state than we were before, because now previously-unhandled exceptions (aka fatal errors) in callbacks will propagate and cleanly shut down the whole worker/scheduler/client. Dramatic, but correct, and better than deadlocking. And then we can gradually restructure these places once the tools are available.
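To show the shape of that idea without depending on trio, here is an approximation in plain asyncio. This is not trio's API and not distributed's Worker: the `Worker` class, its `start_soon`, `run`, and `close` methods, and the `heartbeat` and `bad_callback` coroutines are all hypothetical. The point is only that an object-wide task owner turns an unhandled error in any background callback into a shutdown of the whole object.

```python
import asyncio


class Worker:
    """Hypothetical stand-in for an object that owns its background tasks."""

    def __init__(self):
        self._tasks = set()
        self._failure = None  # created in run(), once a loop is running

    def start_soon(self, coro_fn, *args):
        # Rough analogue of nursery.start_soon: the task is tracked, and an
        # unhandled exception is reported instead of silently dropped.
        task = asyncio.ensure_future(coro_fn(*args))
        self._tasks.add(task)
        task.add_done_callback(self._on_done)

    def _on_done(self, task):
        self._tasks.discard(task)
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None and not self._failure.done():
            self._failure.set_exception(exc)

    def close(self):
        # Request a clean shutdown when nothing has failed.
        if not self._failure.done():
            self._failure.set_result(None)

    async def run(self, main):
        self._failure = asyncio.get_running_loop().create_future()
        self.start_soon(main)
        try:
            await self._failure  # resolves on close() or on the first failure
        finally:
            for task in list(self._tasks):
                task.cancel()
            await asyncio.gather(*self._tasks, return_exceptions=True)


log = []


async def heartbeat():
    try:
        while True:
            await asyncio.sleep(0.005)
    except asyncio.CancelledError:
        log.append("heartbeat cancelled")
        raise


async def bad_callback():
    await asyncio.sleep(0.01)
    raise KeyError("gather_dep")


async def main():
    worker = Worker()

    async def startup():
        worker.start_soon(heartbeat)
        worker.start_soon(bad_callback)

    try:
        await worker.run(startup)
    except KeyError:
        log.append("worker shut down on unhandled error")


asyncio.run(main())
print(log)
```

With a real trio nursery the bookkeeping in `_on_done` and `run` disappears, since the nursery block itself waits for its children and re-raises their errors.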
But, that said... structured concurrency is so fundamentally different from the traditional horrid mess of callbacks that there's debate on anyio around whether to even write a tutorial on how to port asyncio libraries: agronholm/anyio#245 (comment). It's considered unlikely that trio's advances could be brought back to asyncio. It's probably reasonable to expect that adopting structured concurrency would, eventually, mean many of our existing systems would get re-architected. But if we can do that incrementally (which we probably can with anyio), I think we'd end up in a much more reliable and maintainable state than it would be possible to have with the current unstructured callbacks.