From d84e5a8fa6fd5503ed0a54fb2c6a30bad074fd9d Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Tue, 23 Aug 2022 08:54:32 +0200 Subject: [PATCH] Always return `ws.address` from `_remove_from_processing` (#6884) --- distributed/scheduler.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index e38447c70b9..a8b5f5b3f76 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2259,7 +2259,7 @@ def transition_processing_released(self, key: str, stimulus_id: str): assert ts.state == "processing" w = _remove_from_processing(self, ts) - if w: + if w in self.workers: worker_msgs[w] = [ { "op": "free-keys", @@ -2304,7 +2304,6 @@ def transition_processing_erred( traceback=None, exception_text: str | None = None, traceback_text: str | None = None, - worker: str | None = None, **kwargs, ): ws: WorkerState @@ -2329,7 +2328,7 @@ def transition_processing_erred( w = _remove_from_processing(self, ts) - ts.erred_on.add(w or worker) # type: ignore + ts.erred_on.add(w) if exception is not None: ts.exception = exception ts.exception_text = exception_text # type: ignore @@ -7314,7 +7313,7 @@ def request_remove_replicas( ) -def _remove_from_processing(state: SchedulerState, ts: TaskState) -> str | None: +def _remove_from_processing(state: SchedulerState, ts: TaskState) -> str: """Remove *ts* from the set of processing tasks. See also @@ -7326,7 +7325,7 @@ def _remove_from_processing(state: SchedulerState, ts: TaskState) -> str | None: ts.processing_on = None if ws.address not in state.workers: # may have been removed - return None + return ws.address duration = ws.processing.pop(ts) ws.long_running.discard(ts)