From 52e0a887d973d1fd392f8609b6b714c1c7cfea45 Mon Sep 17 00:00:00 2001 From: fjetter Date: Tue, 30 Aug 2022 13:52:49 +0200 Subject: [PATCH] Simplify decide_worker --- distributed/scheduler.py | 36 ++++++------------------------------ 1 file changed, 6 insertions(+), 30 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index c8d59c04d7..abd4895501 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -1839,36 +1839,12 @@ def decide_worker(self, ts: TaskState) -> WorkerState | None: tg.last_worker_tasks_left -= 1 return ws - if ts.dependencies or valid_workers is not None: - ws = decide_worker( - ts, - self.workers.values(), - valid_workers, - partial(self.worker_objective, ts), - ) - else: - # Fastpath when there are no related tasks or restrictions - worker_pool = self.idle or self.workers - wp_vals = worker_pool.values() - n_workers: int = len(wp_vals) - if n_workers < 20: # smart but linear in small case - ws = min(wp_vals, key=operator.attrgetter("occupancy")) - assert ws - if ws.occupancy == 0: - # special case to use round-robin; linear search - # for next worker with zero occupancy (or just - # land back where we started). - wp_i: WorkerState - start: int = self.n_tasks % n_workers - i: int - for i in range(n_workers): - wp_i = wp_vals[(i + start) % n_workers] - if wp_i.occupancy == 0: - ws = wp_i - break - else: # dumb but fast in large case - ws = wp_vals[self.n_tasks % n_workers] - + ws = decide_worker( + ts, + self.workers.values(), + valid_workers, + partial(self.worker_objective, ts), + ) if self.validate and ws is not None: assert ws.address in self.workers