Skip to content

Commit

Permalink
Always return ws.address from _remove_from_processing (#6884)
Browse files Browse the repository at this point in the history
  • Loading branch information
hendrikmakait committed Aug 23, 2022
1 parent b7e184a commit 2a2c3bb
Showing 1 changed file with 4 additions and 5 deletions.
9 changes: 4 additions & 5 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2259,7 +2259,7 @@ def transition_processing_released(self, key: str, stimulus_id: str):
assert ts.state == "processing"

w = _remove_from_processing(self, ts)
if w:
if w in self.workers:
worker_msgs[w] = [
{
"op": "free-keys",
Expand Down Expand Up @@ -2304,7 +2304,6 @@ def transition_processing_erred(
traceback=None,
exception_text: str | None = None,
traceback_text: str | None = None,
worker: str | None = None,
**kwargs,
):
ws: WorkerState
Expand All @@ -2329,7 +2328,7 @@ def transition_processing_erred(

w = _remove_from_processing(self, ts)

ts.erred_on.add(w or worker) # type: ignore
ts.erred_on.add(w)
if exception is not None:
ts.exception = exception
ts.exception_text = exception_text # type: ignore
Expand Down Expand Up @@ -7314,7 +7313,7 @@ def request_remove_replicas(
)


def _remove_from_processing(state: SchedulerState, ts: TaskState) -> str | None:
def _remove_from_processing(state: SchedulerState, ts: TaskState) -> str:
"""Remove *ts* from the set of processing tasks.
See also
Expand All @@ -7326,7 +7325,7 @@ def _remove_from_processing(state: SchedulerState, ts: TaskState) -> str | None:
ts.processing_on = None

if ws.address not in state.workers: # may have been removed
return None
return ws.address

duration = ws.processing.pop(ts)
ws.long_running.discard(ts)
Expand Down

0 comments on commit 2a2c3bb

Please sign in to comment.