Consider candidates that don't hold any dependencies in decide_worker #4925

Open: wants to merge 25 commits into base: main

Changes from 2 commits

Commits (25):
5b17f55  Test workers without deps are considered (gjoseph92, Jun 17, 2021)
4a81ca8  Consider random subset of workers in decide_worker (gjoseph92, Jun 17, 2021)
c57fd72  no-sleep test (gjoseph92, Jun 17, 2021)
d810d2d  Comment fastpath. Maybe this is still unnecessary? (gjoseph92, Jun 17, 2021)
768d660  Pick from idle workers first (gjoseph92, Jun 17, 2021)
346ab17  Update `many_independent_leaves` test (gjoseph92, Jun 18, 2021)
420c99e  Uppercase Mb (gjoseph92, Jun 18, 2021)
0a004b2  move N_RANDOM_WORKERS within conditional (gjoseph92, Jun 18, 2021)
b050d14  Pass in sortedcontainers values, not pydict values (gjoseph92, Jun 18, 2021)
9e99b7f  Use sleep test again (gjoseph92, Jun 18, 2021)
f6acdc4  Simpler logic (gjoseph92, Jun 18, 2021)
524da73  20 -> 10 (gjoseph92, Jun 18, 2021)
a5d37ae  Over-optimized (gjoseph92, Jun 18, 2021)
5842ca8  Revert "Over-optimized" (gjoseph92, Jun 18, 2021)
a159245  `random_choices_iter`. over-optimized for now. (gjoseph92, Jun 18, 2021)
bb991d1  use `random.choices` (gjoseph92, Jun 18, 2021)
58b4bf8  REBASEME Actor: don't hold key references on workers (gjoseph92, Jun 19, 2021)
13975cb  Remove flaky data-length check (gjoseph92, Jun 21, 2021)
fcb165e  No randomness if < 10 workers to choose from (gjoseph92, Jun 21, 2021)
cd382c6  Ensure `decide_worker` args are plain dict_values (gjoseph92, Jun 21, 2021)
cc57a8b  1 worker for `test_statistical_profiling` (gjoseph92, Jun 22, 2021)
13911bc  no conditional on compiled (gjoseph92, Jun 22, 2021)
f2445fe  rerun tests (gjoseph92, Jun 22, 2021)
38e6b57  Merge remote-tracking branch 'upstream/main' into decide_worker/add-r… (gjoseph92, Jul 20, 2021)
5794540  fix errant actor test (gjoseph92, Jul 20, 2021)

23 changes: 10 additions & 13 deletions distributed/scheduler.py
@@ -7470,7 +7470,7 @@ def decide_worker(
    Decide which worker should take task *ts*.

    We consider all workers which hold dependencies of *ts*,
-   plus a sample of 20 random workers (with preference for idle ones).
+   plus a sample of up to 20 random workers (with preference for idle ones).

    From those, we choose the worker where the *objective* function is minimized.

@@ -7491,19 +7491,16 @@ def decide_worker(
    if ts._actor:
        candidates = set(all_workers)
    else:
+       # Select all workers holding deps of this task
        candidates = {wws for dts in deps for wws in dts._who_has}
-       # Add some random workers to into `candidates`, starting with idle ones
-       # TODO shuffle to prevent hotspots?
-       N_RANDOM_WORKERS: Py_ssize_t = 20
-       candidates.update(idle_workers[:N_RANDOM_WORKERS])
-       if len(idle_workers) < N_RANDOM_WORKERS:
-           sample_from = (
-               list(valid_workers) if valid_workers is not None else all_workers
-           )
-           candidates.update(
-               random.sample(sample_from, min(N_RANDOM_WORKERS, len(sample_from)))
-               # ^ NOTE: `min` because `random.sample` errors if `len(sample) < k`
-           )
+       # Add up to 10 random workers into `candidates`, preferring idle ones.
+       sample_from = (
+           list(valid_workers)
+           if valid_workers is not None
+           else idle_workers or all_workers
+       )
+       candidates.update(random.sample(sample_from, min(10, len(sample_from))))
+       # ^ NOTE: `min` because `random.sample` errors if `len(sample) < k`
Member:

import random
import sortedcontainers

d = sortedcontainers.SortedDict()
for i in range(100):
    d[str(i)] = i

candidates = set()

%timeit candidates.update(random.sample(d.values(), min(10, len(d))))
12.4 µs ± 652 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

You're not wrong to be concerned. 12us is a decent chunk of time.

It looks like random.sample isn't cheap

%timeit random.sample(d.values(), min(10, len(d)))
# 11.7 µs ± 61.8 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

%timeit d.values()[:10]
# 2.09 µs ± 27.6 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
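One caveat with the plain slice (a small sketch with made-up worker addresses): d.values()[:10] is fast but deterministic, always returning the same workers in sorted-key order, which is the kind of hotspot the TODO in the removed code worries about.

import sortedcontainers

# 100 fake workers keyed by address; the int values stand in for WorkerState objects
d = sortedcontainers.SortedDict({f"tcp://10.0.0.{i}:8786": i for i in range(100)})
print(d.values()[:10])  # always the same 10, in key order; no randomness at all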

Member:

This might help with efficiency while also keeping things simple?

        candidates = {wws for dts in deps for wws in dts._who_has}

        extra = (
            list(valid_workers)
            if valid_workers is not None
            else (idle_workers or all_workers)
        )
        if len(extra) > 10:
            extra = random.sample(extra, 10)
        
        candidates.update(extra)

Member (@jakirkham, Jun 18, 2021):

Some time is spent just working with sortedcontainers.

import random

d = dict()
for i in range(100):
    d[str(i)] = i

candidates = set()

%timeit candidates.update(random.sample(list(d.values()), min(10, len(d))))
7.21 µs ± 31.3 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

As a side note, it seems random.sample won't take a dict or its views directly, so this includes coercing to a list.
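A quick illustration of that caveat (a sketch assuming CPython 3.9-era behavior): passing a plain dict's values view straight to random.sample raises a TypeError, so a list copy is needed first.

import random

d = {str(i): i for i in range(100)}

try:
    random.sample(d.values(), 10)
except TypeError as err:
    print(err)  # dict views aren't sequences, so sample() refuses them

sampled = random.sample(list(d.values()), 10)  # fine once coerced to a list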

Collaborator Author:

l = list(d.values())

%timeit candidates.update(random.sample(l, min(10, len(l))))
# 5.09 µs ± 179 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

%timeit l = list(d.values()[:10])
1.98 µs ± 34.9 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

%timeit l[:10]
# 79.3 ns ± 2.2 ns per loop (mean ± std. dev. of 7 runs, 10000000 loops each)

SortedValuesView does say lookups are O(log(n)), but it seems like it's more than that.

Member:

It takes any indexable, which sortedcontainers.SortedDict.values provides. Indexability is the reason why we use sortedcontainers.

Collaborator Author:

For comparison:

from distributed.scheduler import TaskState, WorkerState

N = 5
extra = 2
workers = [WorkerState(str(i)) for i in range(N)]
deps = [TaskState(str(i), None) for i in range(N)]
for dts, ws in zip(deps, workers):
    dts.who_has.add(ws)

for i in range(extra):
    deps[i].who_has.add(workers[-i])
deps = set(deps)

%timeit candidates = {wws for dts in deps for wws in dts._who_has}
# N = 5, extra = 2:
971 ns ± 24 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)

# N = 50, extra = 50:
12 µs ± 202 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

# N = 100, extra = 10:
17.2 µs ± 214 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

So just to be clear, the random sample is the thing to try to avoid, not the who_has selection. Even in the uncommon case of many, many dependencies duplicated over many workers, that selection is still on par with the random sample.

Collaborator Author:

So sampling the keys and then using them to look up values is 2x faster with SortedDict, and the same as what we'd get with a plain dict:

%timeit candidates.update(d[k] for k in random.sample(list(d), 10))
6.95 µs ± 227 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

pd = dict(d)
%timeit candidates.update(pd[k] for k in random.sample(list(pd), 10))
6.36 µs ± 95.5 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

Also random.choices is 2x faster than random.sample (makes sense since it doesn't have to hold onto any state):

%timeit candidates.update(d[k] for k in random.choices(list(d), k=10))
3.65 µs ± 11.9 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

%timeit candidates.update(pd[k] for k in random.choices(list(pd), k=10))
3.17 µs ± 36.1 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

I'd feel okay about random.choices; we'd get slightly fewer extra workers in the pool, but it's probably worth the time savings.
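To put a rough number on "slightly fewer" (an illustrative sketch, not a benchmark from the thread): because random.choices samples with replacement, 10 draws from 100 workers collapse to about 9.5 unique workers on average once added to a set, versus exactly 10 for random.sample.

import random

pool = list(range(100))  # stand-ins for 100 workers
trials = [len(set(random.choices(pool, k=10))) for _ in range(10_000)]
print(sum(trials) / len(trials))  # ~9.56 unique picks on average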

Collaborator Author (@gjoseph92, Jun 18, 2021):

If we're okay with duplicates then we can go much faster.

Iterating values of a SortedDict is slow:

%timeit list(d.values())
8.43 µs ± 85.2 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

But it's fast on a plain dict:

pd = dict(d)
%timeit list(pd.values())
691 ns ± 5.07 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)

So we just have to make sure we're passing in self._workers_dv.values(), not self._workers.values().

This is decent:

%timeit candidates.update(random.choices(list(pd.values()), k=10))
2.45 µs ± 15.4 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

But we can go much faster and avoid a copy with an iterator:

import itertools
import random

def random_choices_iter(xs: "Iterator", k: int) -> "Iterator":
    return itertools.islice(itertools.takewhile(lambda _: random.random() < 0.5, xs), k)

%timeit candidates.update(random_choices_iter(pd.values(), k=10))
643 ns ± 12.5 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)

EDIT: The iterator method could return fewer than k items though. So particularly if there's only one idle worker, that's not ideal. I feel like random.choices(list(pd.values()), k=10) is okay for now. If future profiling shows it's too slow, then we could do

worker_pool = valid_workers if valid_workers is not None else all_workers
if len(candidates) < len(worker_pool):
    sample_from = idle_workers or worker_pool
    candidates.update(
        random_choices_iter(idle_workers or worker_pool, 10)
        if len(sample_from) > 10
        else sample_from
    )

Member:

It looks like random.sample will take d.keys(), from playing around with it locally. So we can probably skip the coercion to a list in that case. Though maybe we've already moved well beyond this at this point.

I'd probably shy away from getting too clever around random item selection. It's probably more likely to have surprising behavior that we only discover later. Just my 2 cents 🙂

Collaborator Author:

I think we're beyond that, since we're only taking dict_values objects in this function now. Making it able to support both dicts and sets was a bit of a mess and not worth it IMO.

I didn't go with the too-clever itertools option because it did have surprising behavior. I did have to add another conditional though because choices can repeat items: fcb165e. So perhaps even that's too clever?
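A minimal sketch of that extra conditional (a paraphrase of the idea behind fcb165e, not the committed code): skip sampling entirely when the pool is small, so duplicates from random.choices can't shrink the candidate set.

import random

def pick_extra_candidates(pool, k=10):
    # hypothetical helper for illustration; the name is not from the PR
    pool = list(pool)
    if len(pool) <= k:
        return pool  # small pool: take everyone, no randomness needed
    return random.choices(pool, k=k)  # large pool: fast sampling, duplicates tolerated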

    if valid_workers is None:
        if not candidates:
            candidates = set(all_workers)
Member:

Now that we have idle_workers in this function, should this be idle_workers or all_workers?

Collaborator Author:

Interesting; yes it probably should be.
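What that tweak might look like (an assumption about the follow-up change, not the code as committed):

    if valid_workers is None:
        if not candidates:
            # prefer idle workers for the fallback, only then consider everyone
            candidates = set(idle_workers or all_workers)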

10 changes: 8 additions & 2 deletions distributed/tests/test_scheduler.py
@@ -134,13 +134,19 @@ async def test_decide_worker_with_restrictions(client, s, a, b, c):
    config={"distributed.scheduler.work-stealing": False},
)
async def test_decide_worker_select_candidate_holding_no_deps(client, s, a, b, c):
+   await client.submit(slowinc, 10, delay=0.1)  # learn that slowinc is slow
    root = await client.scatter(1)
    assert sum(root.key in worker.data for worker in [a, b, c]) == 1

-   tasks = client.map(inc, [root] * 6, pure=False)
+   start = time()
+   tasks = client.map(slowinc, [root] * 6, delay=0.1, pure=False)
    await wait(tasks)
+   elapsed = time() - start

-   assert all(root.key in worker.data for worker in [a, b, c])
+   assert elapsed <= 4
+   assert all(root.key in worker.data for worker in [a, b, c]), [
+       list(worker.data.keys()) for worker in [a, b, c]
+   ]
    assert len(a.data) == len(b.data) == len(c.data) == 3
Member:

Actually, now that I look at this again, maybe we do need the sleep.

If we're scheduling incs then it's not unreasonable to schedule six of these things on one worker: that's where the data is, and the computation is very cheap. But the dependency is also very small, so neither cost clearly dominates. We may need the scheduler to learn that slowinc is slow, and so heavy relative to the int dependency.

Maybe

await client.submit(slowinc, 10, delay=0.1)  # learn that slowinc is slow
root = await client.scatter(1)

futures = client.map(slowinc, [root] * 6, delay=0.1, pure=False)
await wait(futures)

This way we're confident that the computational cost of the future will far outweigh the communication cost, but things are still fast-ish.

Collaborator Author:

Yeah, that's why I originally went with sleep. It also feels more like the case we're trying to test for. Though the test still passes with 6d91816 (and note that that test fails on main), so I'm not sure where our reasoning is off?

Member:

Currently 28 bytes / 100 MB/s is still smaller than however long inc takes to run (a few microseconds, probably). It would be safer to bump this up to milliseconds though.

gen_cluster tests can run in less than a second, but more than 100ms. So sleeps of around that 100ms time are more welcome than 1s sleeps.

Collaborator Author:

The test is passing as-is; do you still think I should change it back to sleep?

Member:

It would become a problem if the inc ran in 28 bytes / 100 MB/s = 0.28 µs or less. Inc does actually run this fast.

In [1]: 28 / 100e6
Out[1]: 2.8e-07

In [2]: def inc(x):
   ...:     return x + 1
   ...: 

In [3]: %timeit inc(1)
64.2 ns ± 0.914 ns per loop (mean ± std. dev. of 7 runs, 10000000 loops each)

But presumably the infrastructure around the inc is taking longer. If our timing infrastructure got much faster then this might result in intermittent errors. It probably doesn't matter at this scale, but it's probably worth changing if it only takes a few minutes.

We might also introduce a penalty of something like 1ms for any communication (I'm surprised that we don't do this today actually) which might tip the scales in the future.

Collaborator Author:

I'll switch back to a short sleep for now.

> We might also introduce a penalty of something like 1ms for any communication

That seems very worth doing.

Member:

We maybe used to do this? It might be worth checking the logs. The worker_objective function is pretty clean if you wanted to add this. We should probably wait until we find a case where it comes up in an impactful way, though, just to keep from cluttering things.
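As a rough sketch of the penalty idea (purely hypothetical; this is not the real worker_objective): folding a flat per-transfer cost into the estimated communication time keeps a microsecond-scale task from ever looking cheaper to move than to compute where its data already lives.

COMM_PENALTY = 0.001  # assumed flat cost, in seconds, per dependency to transfer

def estimated_comm_cost(dep_sizes, bandwidth=100e6):
    # dep_sizes: bytes of each dependency that would need to be fetched
    transfer = sum(nbytes / bandwidth for nbytes in dep_sizes)
    return transfer + COMM_PENALTY * len(dep_sizes)

# With the penalty, even a 28-byte dependency "costs" about 1 ms,
# which easily outweighs a ~64 ns inc and tips the decision toward data locality.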

Collaborator Author:

Yeah, I think it should be a separate PR, for cleanliness.
