From a36b8ec34db5c0fbf806693580025f0900327771 Mon Sep 17 00:00:00 2001 From: prince8273 Date: Thu, 14 May 2026 11:39:39 +0530 Subject: [PATCH 1/2] [Observability] Log interrupted processing tasks on unexpected worker death When a worker drops off the cluster unexpectedly (e.g., due to an OOM kill), the scheduler tracks the processing_keys but previously did not log them to the console. This change surfaces exactly which tasks were interrupted, significantly improving debugging provenance for cluster hangs and memory crashes. --- distributed/scheduler.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index ace360d093..68abfc3469 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5714,6 +5714,12 @@ async def remove_worker( f"Removing worker {ws.address!r} caused the cluster to lose scattered " f"data, which can't be recovered: {lost_keys} ({stimulus_id=})" ) + if not expected and processing_keys: + logger.warning( + f"Worker {ws.address!r} dropped unexpectedly. 
" + f"Interrupting {len(processing_keys)} processing tasks: " + f"{processing_keys} ({stimulus_id=})" + ) event_msg = { "action": "remove-worker", From 82137211ad869c77c4f958e9b0fab17e3557debe Mon Sep 17 00:00:00 2001 From: Prince Kumar Date: Thu, 14 May 2026 18:06:12 +0530 Subject: [PATCH 2/2] [Observability] Log interrupted processing tasks on unexpected worker death Fixes #9263 --- distributed/tests/test_scheduler.py | 135 ++++++++++++++++++++++++++++ distributed/tests/test_worker.py | 133 --------------------------- 2 files changed, 135 insertions(+), 133 deletions(-) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index b2cf5958b6..ce735556e6 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -5216,3 +5216,138 @@ def __dask_postcompute__(self): sum([s.is_rootish(v) and v.run_spec.data_producer for v in s.tasks.values()]) == 2 ) + + +@gen_cluster(client=True) +async def test_log_remove_worker(c, s, a, b): + # Computed task + x = c.submit(inc, 1, key="x", workers=a.address) + await x + ev = Event() + # Processing task + y = c.submit( + lambda ev: ev.wait(), ev, key="y", workers=a.address, allow_other_workers=True + ) + await wait_for_state("y", "processing", s) + # Scattered task + z = await c.scatter({"z": 3}, workers=a.address) + + s._broker.truncate() + + with captured_logger("distributed.scheduler", level=logging.INFO) as log: + # Successful graceful shutdown + await s.retire_workers([a.address], stimulus_id="graceful") + # Refuse to retire gracefully as there's nowhere to put x and z + await s.retire_workers([b.address], stimulus_id="graceful_abort") + await asyncio.sleep(0.2) + # Ungraceful shutdown + await s.remove_worker(b.address, stimulus_id="ungraceful") + await asyncio.sleep(0.2) + await ev.set() + + assert log.getvalue().splitlines() == [ + # Successful graceful + f"Retire worker addresses (stimulus_id='graceful') ['{a.address}']", + f"Remove worker addr: {a.address} 
name: {a.name} (stimulus_id='graceful')", + f"Retired worker '{a.address}' (stimulus_id='graceful')", + # Aborted graceful + f"Retire worker addresses (stimulus_id='graceful_abort') ['{b.address}']", + f"Could not retire worker '{b.address}': unique data could not be " + "moved to any other worker (stimulus_id='graceful_abort')", + # Ungraceful + f"Remove worker addr: {b.address} name: {b.name} (stimulus_id='ungraceful')", + f"Removing worker '{b.address}' caused the cluster to lose already " + "computed task(s), which will be recomputed elsewhere: {'x'} " + "(stimulus_id='ungraceful')", + f"Removing worker '{b.address}' caused the cluster to lose scattered " + "data, which can't be recovered: {'z'} (stimulus_id='ungraceful')", + f"Worker {b.address!r} dropped unexpectedly. Interrupting 1 " + "processing tasks: {'y'} (stimulus_id='ungraceful')", + "Lost all workers", + ] + + events = {topic: [ev for _, ev in evs] for topic, evs in s.get_events().items()} + for evs in events.values(): + for ev in evs: + if ev.get("action", None) == "retire-workers": + for k in ("retired", "could-not-retire"): + ev[k] = {addr: "snip" for addr in ev[k]} + if "stimulus_id" in ev: # Strip timestamp + ev["stimulus_id"] = ev["stimulus_id"].rsplit("-", 1)[0] + + assert events == { + a.address: [ + { + "action": "worker-status-change", + "prev-status": "running", + "status": "closing_gracefully", + "stimulus_id": "graceful", + }, + { + "action": "remove-worker", + "lost-computed-tasks": set(), + "lost-scattered-tasks": set(), + "processing-tasks": {"y"}, + "expected": True, + "stimulus_id": "graceful", + }, + {"action": "retired", "stimulus_id": "graceful"}, + ], + b.address: [ + { + "action": "worker-status-change", + "prev-status": "running", + "status": "closing_gracefully", + "stimulus_id": "graceful_abort", + }, + {"action": "could-not-retire", "stimulus_id": "graceful_abort"}, + { + "action": "worker-status-change", + "prev-status": "closing_gracefully", + "status": "running", + 
"stimulus_id": "worker-status-change", + }, + { + "action": "remove-worker", + "lost-computed-tasks": {"x"}, + "lost-scattered-tasks": {"z"}, + "processing-tasks": {"y"}, + "expected": False, + "stimulus_id": "ungraceful", + }, + {"action": "closing-worker", "reason": "scheduler-remove-worker"}, + ], + "all": [ + { + "action": "remove-worker", + "lost-computed-tasks": set(), + "lost-scattered-tasks": set(), + "processing-tasks": {"y"}, + "expected": True, + "stimulus_id": "graceful", + "worker": a.address, + }, + { + "action": "retire-workers", + "stimulus_id": "graceful", + "retired": {a.address: "snip"}, + "could-not-retire": {}, + }, + { + "action": "retire-workers", + "stimulus_id": "graceful_abort", + "retired": {}, + "could-not-retire": {b.address: "snip"}, + }, + { + "action": "remove-worker", + "lost-computed-tasks": {"x"}, + "lost-scattered-tasks": {"z"}, + "processing-tasks": {"y"}, + "expected": False, + "stimulus_id": "ungraceful", + "worker": b.address, + }, + ], + "worker-get-client": [{"client": c.id, "timeout": 5, "worker": b.address}], + } diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 1418df86bb..00f4b21d23 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -2965,139 +2965,6 @@ async def test_worker_status_sync(s, a): ] -@gen_cluster(client=True) -async def test_log_remove_worker(c, s, a, b): - # Computed task - x = c.submit(inc, 1, key="x", workers=a.address) - await x - ev = Event() - # Processing task - y = c.submit( - lambda ev: ev.wait(), ev, key="y", workers=a.address, allow_other_workers=True - ) - await wait_for_state("y", "processing", s) - # Scattered task - z = await c.scatter({"z": 3}, workers=a.address) - - s._broker.truncate() - - with captured_logger("distributed.scheduler", level=logging.INFO) as log: - # Successful graceful shutdown - await s.retire_workers([a.address], stimulus_id="graceful") - # Refuse to retire gracefully as there's nowhere to put x 
and z - await s.retire_workers([b.address], stimulus_id="graceful_abort") - await asyncio.sleep(0.2) - # Ungraceful shutdown - await s.remove_worker(b.address, stimulus_id="ungraceful") - await asyncio.sleep(0.2) - await ev.set() - - assert log.getvalue().splitlines() == [ - # Successful graceful - f"Retire worker addresses (stimulus_id='graceful') ['{a.address}']", - f"Remove worker addr: {a.address} name: {a.name} (stimulus_id='graceful')", - f"Retired worker '{a.address}' (stimulus_id='graceful')", - # Aborted graceful - f"Retire worker addresses (stimulus_id='graceful_abort') ['{b.address}']", - f"Could not retire worker '{b.address}': unique data could not be " - "moved to any other worker (stimulus_id='graceful_abort')", - # Ungraceful - f"Remove worker addr: {b.address} name: {b.name} (stimulus_id='ungraceful')", - f"Removing worker '{b.address}' caused the cluster to lose already " - "computed task(s), which will be recomputed elsewhere: {'x'} " - "(stimulus_id='ungraceful')", - f"Removing worker '{b.address}' caused the cluster to lose scattered " - "data, which can't be recovered: {'z'} (stimulus_id='ungraceful')", - "Lost all workers", - ] - - events = {topic: [ev for _, ev in evs] for topic, evs in s.get_events().items()} - for evs in events.values(): - for ev in evs: - if ev.get("action", None) == "retire-workers": - for k in ("retired", "could-not-retire"): - ev[k] = {addr: "snip" for addr in ev[k]} - if "stimulus_id" in ev: # Strip timestamp - ev["stimulus_id"] = ev["stimulus_id"].rsplit("-", 1)[0] - - assert events == { - a.address: [ - { - "action": "worker-status-change", - "prev-status": "running", - "status": "closing_gracefully", - "stimulus_id": "graceful", - }, - { - "action": "remove-worker", - "lost-computed-tasks": set(), - "lost-scattered-tasks": set(), - "processing-tasks": {"y"}, - "expected": True, - "stimulus_id": "graceful", - }, - {"action": "retired", "stimulus_id": "graceful"}, - ], - b.address: [ - { - "action":
"worker-status-change", - "prev-status": "running", - "status": "closing_gracefully", - "stimulus_id": "graceful_abort", - }, - {"action": "could-not-retire", "stimulus_id": "graceful_abort"}, - { - "action": "worker-status-change", - "prev-status": "closing_gracefully", - "status": "running", - "stimulus_id": "worker-status-change", - }, - { - "action": "remove-worker", - "lost-computed-tasks": {"x"}, - "lost-scattered-tasks": {"z"}, - "processing-tasks": {"y"}, - "expected": False, - "stimulus_id": "ungraceful", - }, - {"action": "closing-worker", "reason": "scheduler-remove-worker"}, - ], - "all": [ - { - "action": "remove-worker", - "lost-computed-tasks": set(), - "lost-scattered-tasks": set(), - "processing-tasks": {"y"}, - "expected": True, - "stimulus_id": "graceful", - "worker": a.address, - }, - { - "action": "retire-workers", - "stimulus_id": "graceful", - "retired": {a.address: "snip"}, - "could-not-retire": {}, - }, - { - "action": "retire-workers", - "stimulus_id": "graceful_abort", - "retired": {}, - "could-not-retire": {b.address: "snip"}, - }, - { - "action": "remove-worker", - "lost-computed-tasks": {"x"}, - "lost-scattered-tasks": {"z"}, - "processing-tasks": {"y"}, - "expected": False, - "stimulus_id": "ungraceful", - "worker": b.address, - }, - ], - "worker-get-client": [{"client": c.id, "timeout": 5, "worker": b.address}], - } - - @gen_cluster(client=True) async def test_task_flight_compute_oserror(c, s, a, b): """If the remote worker dies while a task is in flight, the task may be