-
-
Notifications
You must be signed in to change notification settings - Fork 718
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Consider candidates that don't hold any dependencies in decide_worker
#4925
base: main
Are you sure you want to change the base?
Changes from 23 commits
5b17f55
4a81ca8
c57fd72
d810d2d
768d660
346ab17
420c99e
0a004b2
b050d14
9e99b7f
f6acdc4
524da73
a5d37ae
5842ca8
a159245
bb991d1
58b4bf8
13975cb
fcb165e
cd382c6
cc57a8b
13911bc
f2445fe
38e6b57
5794540
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,7 +18,7 @@ | |
from datetime import timedelta | ||
from functools import partial | ||
from numbers import Number | ||
from typing import Optional | ||
from typing import Optional, ValuesView | ||
|
||
import psutil | ||
import sortedcontainers | ||
|
@@ -2324,7 +2324,10 @@ def decide_worker(self, ts: TaskState) -> WorkerState: | |
if ts._dependencies or valid_workers is not None: | ||
ws = decide_worker( | ||
ts, | ||
self._workers_dv.values(), | ||
dict.values(self._workers_dv), | ||
dict.values(self._idle_dv), | ||
# ^ NOTE: For performance, these must be actual `dict_values`, not `SortedDictValues`. | ||
# In Cython, `_workers_dv` is a plain dict, but in plain Python, it's still a `SortedDict`. | ||
valid_workers, | ||
partial(self.worker_objective, ts), | ||
) | ||
|
@@ -7459,14 +7462,19 @@ def _reevaluate_occupancy_worker(state: SchedulerState, ws: WorkerState): | |
@cfunc | ||
@exceptval(check=False) | ||
def decide_worker( | ||
ts: TaskState, all_workers, valid_workers: set, objective | ||
ts: TaskState, | ||
all_workers: ValuesView, | ||
idle_workers: ValuesView, | ||
valid_workers: set, | ||
objective, | ||
) -> WorkerState: | ||
""" | ||
Decide which worker should take task *ts*. | ||
|
||
We choose the worker that has the data on which *ts* depends. | ||
We consider all workers which hold dependencies of *ts*, | ||
plus a sample of up to 10 random workers (with preference for idle ones). | ||
|
||
If several workers have dependencies then we choose the less-busy worker. | ||
From those, we choose the worker where the *objective* function is minimized. | ||
|
||
Optionally provide *valid_workers* of where jobs are allowed to occur | ||
(if all workers are allowed to take the task, pass None instead). | ||
|
@@ -7476,6 +7484,8 @@ def decide_worker( | |
of bytes sent between workers. This is determined by calling the | ||
*objective* function. | ||
""" | ||
# NOTE: `all_workers` and `idle_workers` must be plain `dict_values` objects, | ||
# not a `SortedValuesView`, which is much slower to iterate over. | ||
ws: WorkerState = None | ||
wws: WorkerState | ||
dts: TaskState | ||
|
@@ -7485,7 +7495,17 @@ def decide_worker( | |
if ts._actor: | ||
candidates = set(all_workers) | ||
else: | ||
# Select all workers holding deps of this task | ||
candidates = {wws for dts in deps for wws in dts._who_has} | ||
# Add up to 10 random workers into `candidates`, preferring idle ones. | ||
worker_pool = valid_workers if valid_workers is not None else all_workers | ||
if len(candidates) < len(worker_pool): | ||
sample_from = idle_workers or worker_pool | ||
candidates.update( | ||
random.choices(list(sample_from), k=min(10, len(sample_from))) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thoughts on reducing 10 to 2 or 5 or something lower? This might be premature, but I'm curious how much adding more workers helps here in aggregate. Part of me thinks that adding just one random worker into the mix probably does 80% of the good in aggregate. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd kind of like to do that later once we've gotten more of a sense of the impact of this, and hopefully have some more examples to measure that change against. When a cluster is mostly quiet, picking 2 random workers has a high chance of finding a good one, but in a large cluster with mostly busy workers, it's a big haystack to find the few that are underutilized (but not idle). |
||
if len(sample_from) > 10 | ||
else sample_from | ||
) | ||
if valid_workers is None: | ||
if not candidates: | ||
candidates = set(all_workers) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Now that we have idle_workers in this function should this be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Interesting; yes it probably should be. |
||
|
@@ -7495,7 +7515,7 @@ def decide_worker( | |
candidates = valid_workers | ||
if not candidates: | ||
if ts._loose_restrictions: | ||
ws = decide_worker(ts, all_workers, None, objective) | ||
ws = decide_worker(ts, all_workers, idle_workers, None, objective) | ||
return ws | ||
|
||
ncandidates: Py_ssize_t = len(candidates) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if
valid_workers and idle_workers
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems like maybe we want to use
idle_workers or all_workers
above in line 7501There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We'd then intersect
candidates
withvalid_workers
, so at least we wouldn't ever pick an invalid worker:distributed/distributed/scheduler.py
Lines 7512 to 7519 in f2445fe
But it's true that if
idle_workers
andvalid_workers
are disjoint, then we haven't gained anything here.How about
(valid_workers.intersection(idle_workers) or valid_workers) if valid_workers is not None else (idle_workers or valid_workers)
?Or for simplicity we could ignore
idle_workers
when there arevalid_workers
given, and just usevalid_workers
.