Skip to content

Commit

Permalink
Simplify decide_worker
Browse files Browse the repository at this point in the history
  • Loading branch information
fjetter committed Aug 30, 2022
1 parent 4cf9baf commit 52e0a88
Showing 1 changed file with 6 additions and 30 deletions.
36 changes: 6 additions & 30 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1839,36 +1839,12 @@ def decide_worker(self, ts: TaskState) -> WorkerState | None:
tg.last_worker_tasks_left -= 1
return ws

if ts.dependencies or valid_workers is not None:
ws = decide_worker(
ts,
self.workers.values(),
valid_workers,
partial(self.worker_objective, ts),
)
else:
# Fastpath when there are no related tasks or restrictions
worker_pool = self.idle or self.workers
wp_vals = worker_pool.values()
n_workers: int = len(wp_vals)
if n_workers < 20: # smart but linear in small case
ws = min(wp_vals, key=operator.attrgetter("occupancy"))
assert ws
if ws.occupancy == 0:
# special case to use round-robin; linear search
# for next worker with zero occupancy (or just
# land back where we started).
wp_i: WorkerState
start: int = self.n_tasks % n_workers
i: int
for i in range(n_workers):
wp_i = wp_vals[(i + start) % n_workers]
if wp_i.occupancy == 0:
ws = wp_i
break
else: # dumb but fast in large case
ws = wp_vals[self.n_tasks % n_workers]

ws = decide_worker(
ts,
self.workers.values(),
valid_workers,
partial(self.worker_objective, ts),
)
if self.validate and ws is not None:
assert ws.address in self.workers

Expand Down

0 comments on commit 52e0a88

Please sign in to comment.