Description
Describe the issue:
done_callback in client.py raises RuntimeError: cannot schedule new futures after interpreter shutdown when futures complete during interpreter shutdown.
Future.add_done_callback() schedules callbacks via Future._cb_executor, a class-level ThreadPoolExecutor. During shutdown, CPython's concurrent.futures.thread._python_exit() hook sets a module-level _shutdown = True flag, then signals the worker threads of every ThreadPoolExecutor to exit and joins them. On Python 3.9+ this hook is registered with threading._register_atexit(), so it runs before ordinary atexit handlers. The Tornado IOLoop daemon thread, however, is still alive and draining done_callback coroutines. When one of them calls _cb_executor.submit(), it raises RuntimeError.
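The shutdown ordering can be observed without Dask. This is a sketch that assumes Python 3.9 or later. It runs a child interpreter in which a daemon thread blocked on a threading.Event stands in for the IOLoop thread. An ordinary atexit handler then reports whether that thread is still alive and reads concurrent.futures.thread._shutdown, a private implementation detail used here only for illustration.

```python
import subprocess
import sys
import textwrap

# Child script, run in a fresh interpreter so that its shutdown can be observed.
CHILD = textwrap.dedent("""
    import atexit
    import threading
    import concurrent.futures.thread as cft  # _shutdown below is a private attribute

    # Stand-in for the IOLoop thread: a daemon thread that never finishes on its own.
    stop = threading.Event()
    loop_thread = threading.Thread(target=stop.wait, daemon=True)
    loop_thread.start()

    def report():
        # Ordinary atexit handlers run after concurrent.futures' exit hook (3.9+).
        print("daemon thread alive:", loop_thread.is_alive())
        print("module-level _shutdown:", cft._shutdown)

    atexit.register(report)
""")


def run_child() -> str:
    """Run CHILD in a new interpreter and return what it printed."""
    result = subprocess.run(
        [sys.executable, "-c", CHILD], capture_output=True, text=True, check=True
    )
    return result.stdout


if __name__ == "__main__":
    print(run_child(), end="")
```

On CPython 3.9+ both lines should report True. The executor hook has already run, yet the daemon thread is still alive and free to call submit().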
The fix is to catch the shutdown-related RuntimeError in done_callback, since running the callback is no longer meaningful once the interpreter is shutting down.
Minimal Complete Verifiable Example:
```python
import time
from distributed import Client, LocalCluster

def on_done(future):
    print(f"Callback: {future.key}")

if __name__ == "__main__":
    cluster = LocalCluster(n_workers=2, threads_per_worker=1)
    client = Client(cluster)
    # pure=False ensures unique keys; staggered sleeps so futures complete during shutdown
    futures = client.map(time.sleep, [0.2 * i for i in range(20)], pure=False)
    for f in futures:
        f.add_done_callback(on_done)
    # Exit while futures are still completing; don't call client.close()
    time.sleep(1.5)
```

Anything else we need to know?:
The error path is: done_callback → callback(future) → _cb_executor.submit(execute_callback) → RuntimeError.
Even creating a new ThreadPoolExecutor would not help. ThreadPoolExecutor.submit() in concurrent.futures.thread checks both the instance flag (self._shutdown) and the module-level flag (_shutdown), and _python_exit() has already set the latter.
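This sketch shows the check in isolation. It again assumes Python 3.9+ and uses a child interpreter. An atexit handler creates a brand-new ThreadPoolExecutor and submits to it. The new instance has never been shut down, yet submit() still raises because the module-level flag is already set.

```python
import subprocess
import sys
import textwrap

# Child script: submits to a freshly created executor from an atexit handler.
CHILD = textwrap.dedent("""
    import atexit
    from concurrent.futures import ThreadPoolExecutor

    def late_submit():
        # A brand-new executor: its own _shutdown flag is still False ...
        fresh = ThreadPoolExecutor(max_workers=1)
        try:
            fresh.submit(print, "never printed")
        except RuntimeError as exc:
            # ... but submit() also checks the module-level flag.
            print("RuntimeError:", exc)

    atexit.register(late_submit)
""")


def run_child() -> str:
    """Run CHILD in a new interpreter and return what it printed."""
    result = subprocess.run(
        [sys.executable, "-c", CHILD], capture_output=True, text=True, check=True
    )
    return result.stdout


if __name__ == "__main__":
    print(run_child(), end="")
```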
Suggested fix in done_callback:
```python
async def done_callback(future, callback):
    while future.status == "pending":
        await future._state.wait()
    try:
        callback(future)
    except RuntimeError as e:
        # The executor refuses new work once the interpreter is shutting down;
        # the callback can no longer run meaningfully, so drop it.
        if "shutdown" not in str(e) and "interpreter" not in str(e):
            raise
```

This is also a prerequisite for fixing a related dask-gateway bug. In that bug, the cleanup_lingering_clusters atexit handler fails to send the HTTP DELETE request that shuts down clusters, because the same executor shutdown prevents aiohttp from resolving DNS.
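The suggested guard can be exercised without a cluster. In this sketch, FakeState and FakeFuture are hypothetical stand-ins that expose only the attributes done_callback reads (status and _state.wait()); they are not distributed's classes. The interpreter cannot be shut down in-process, so an executor closed with executor.shutdown() stands in for it. Its submit() raises "cannot schedule new futures after shutdown", which the substring check also matches.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import partial


class FakeState:
    """Hypothetical stand-in for the internal state object that done_callback awaits."""

    def __init__(self):
        self._event = asyncio.Event()

    async def wait(self):
        await self._event.wait()


class FakeFuture:
    """Hypothetical stand-in with only the attributes done_callback reads."""

    def __init__(self):
        self.status = "pending"
        self._state = FakeState()

    def finish(self):
        self.status = "finished"
        self._state._event.set()


async def done_callback(future, callback):
    # Same logic as the suggested fix.
    while future.status == "pending":
        await future._state.wait()
    try:
        callback(future)
    except RuntimeError as e:
        if "shutdown" not in str(e) and "interpreter" not in str(e):
            raise


async def run_case(callback):
    fut = FakeFuture()
    task = asyncio.create_task(done_callback(fut, callback))
    await asyncio.sleep(0)  # let done_callback start waiting on the state
    fut.finish()
    await task  # propagates anything done_callback re-raises


def shutdown_case() -> str:
    executor = ThreadPoolExecutor(max_workers=1)
    executor.shutdown()  # submit() now raises "cannot schedule new futures after shutdown"
    asyncio.run(run_case(partial(executor.submit, print)))
    return "swallowed"


def other_case() -> str:
    def bad_callback(fut):
        raise RuntimeError("unrelated failure")

    try:
        asyncio.run(run_case(bad_callback))
    except RuntimeError as e:
        return f"re-raised: {e}"
    return "swallowed"


if __name__ == "__main__":
    print(shutdown_case())
    print(other_case())
```

One consequence of matching on substrings is that the guard swallows any RuntimeError whose message happens to mention "shutdown", not only the two messages raised by ThreadPoolExecutor.submit().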
Environment:
- Dask version: 2025.9.1
- Python version: 3.11.14
- Operating System: macOS 15.7.4 (arm64)
- Install method: uv (pip)