
Co-assign root-ish tasks #4967

Merged: 21 commits, Jun 30, 2021

Conversation

gjoseph92 (Collaborator):

In decide_worker, rather than spreading out root tasks as much as possible, schedule consecutive (by priority order) root(ish) tasks on the same worker. This ensures the dependencies of a reduction start out on the same worker, reducing future data transfer.

It doesn't like `sum(wws._nthreads for wws in valid_workers)` since that requires defining a closure.
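
A minimal sketch of the co-assignment idea described above (illustrative only; this is not the code added in this PR, just the batching intuition): walk the root tasks in priority order and keep handing them to the same worker until that worker's share of the group is used up, so tasks whose outputs get reduced together start out together.

```python
# Illustrative sketch only -- not the implementation in this PR.
# Root tasks are walked in priority order and handed to the same worker
# until that worker's share of the group is used up, so tasks whose outputs
# will later be reduced together start out on the same machine.
import math

def co_assign_roots(root_tasks, workers):
    """Assign priority-ordered root tasks to workers in contiguous batches."""
    batch = math.ceil(len(root_tasks) / len(workers))
    return {
        task: workers[i // batch]  # consecutive tasks share a worker
        for i, task in enumerate(root_tasks)
    }

# Example: 8 root tasks over 2 workers -> first 4 on "w1", last 4 on "w2".
assignments = co_assign_roots([f"root-{i}" for i in range(8)], ["w1", "w2"])
```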
mrocklin (Member) left a comment:


Woot?

mrocklin (Member):

Here are a couple of possible commits: https://github.com/dask/distributed/compare/main...mrocklin:decide_worker/co-assign-relatives-2?expand=1

  • Slight simplification, although I drop caring about size in the valid_workers case (maybe a bad idea): 28c5e41
  • Larger change, breaking things out into methods (also I broke things): 4589e94

mrocklin (Member):

@Kirill888 great talk! For memory issues, I think you might find the work in this PR interesting. I don't know precisely what you're running into, but given how you described your problem I give this PR a 50% chance of making you happy.

mrocklin (Member):

If tests pass I'm good to merge. However, @gjoseph92 it might make sense to run with this in the wild a bit. Maybe tomorrow is "try a bunch of pangeo workloads" day?

gjoseph92 (Collaborator, Author):

@mrocklin I believe that the one failure (so far) is flaky: #4976. I'd also be happy to try it out for a day tomorrow before merging. Tomorrow definitely is that sort of day.


    trivial_deps = {f"k{i}": delayed(object()) for i in range(ndeps)}

    # TODO is there a simpler (non-blockwise) way to make this sort of graph?
Member:

TODO still relevant?

Comment on lines +2348 to +2350:

    valid_workers is None
    and len(group) > self._total_nthreads * 2
    and sum(map(len, group._dependencies)) < 5
Member:

I'm wondering if it is possible to construct a test which helps us with this cutoff condition. One of the motivators for this heuristic is that it likely acts as a good binary classifier for graphs, most of which have either very small or very large numbers here. As far as I understand, this is an unproven assumption.

I'm interested in a test for this particular boundary condition for two reasons:

  • It would help us identify regressions if this boundary is accidentally moved
  • How good is our classifier? Might it be 40% better if this value were 10? A well-written test could help with that analysis

I won't push hard on this if it proves too difficult or others disagree on the value. I'm just having a hard time with heuristics that I can change without any tests breaking.

For the current test, I can do any of the following without the test breaking:

  • Remove `len(group) > self._total_nthreads * 2` entirely
  • Increase the boundary for total thread count, e.g. `len(group) > self._total_nthreads * 10` (it breaks eventually if pushed further)
  • Increase the boundary for dependencies, e.g. `sum(map(len, group._dependencies)) < 100` (increased further, it eventually breaks)
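
One hypothetical way to make this boundary directly testable (a refactor sketch, not anything in this PR's diff): pull the cutoff into a small named helper with the magic numbers as constants, so a unit test can target the classifier itself rather than the whole decide_worker path.

```python
# Hypothetical refactor for testability -- not part of this PR's diff.
# Mirrors the cutoff quoted above so the magic numbers can be asserted on directly.
ROOTISH_GROUP_FACTOR = 2    # group must be larger than 2x the total thread count
ROOTISH_MAX_TOTAL_DEPS = 5  # and have (almost) no dependencies

def is_rootish(group_size: int, total_nthreads: int, total_deps: int,
               has_valid_workers: bool = False) -> bool:
    """Return True if a task group should be treated as 'root-ish' and co-assigned."""
    return (
        not has_valid_workers
        and group_size > total_nthreads * ROOTISH_GROUP_FACTOR
        and total_deps < ROOTISH_MAX_TOTAL_DEPS
    )

# A boundary test could then assert directly on the cutoff, e.g.:
assert is_rootish(group_size=9, total_nthreads=4, total_deps=0)
assert not is_rootish(group_size=8, total_nthreads=4, total_deps=0)  # exactly 2x, not > 2x
assert not is_rootish(group_size=9, total_nthreads=4, total_deps=5)
```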

Member:

In 90% of cases the number here is 0 (like da.random.random) or 1 (like da.from_zarr). I can imagine but can't actually come up with cases where this might be 2 (like da.from_zarr(zarr_array, parameter=some_dask_thing))

I get the aversion to magic numbers. This one feels pretty safe to me though.

@@ -2336,6 +2378,7 @@ def decide_worker(self, ts: TaskState) -> WorkerState:

            partial(self.worker_objective, ts),
        )
    else:
        # Fastpath when there are no related tasks or restrictions
gjoseph92 (Collaborator, Author):

I'm realizing that this codepath will now only be rarely triggered (when there are 0 deps, but also the TaskGroup is small). Do we need to add this round-robining into our selection of a new worker for root-ish tasks? (Since we know we'll be running the tasks on every worker, I'm not sure it matters much that we may always start with the same one in an idle cluster.)
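
For reference, a generic sketch of the kind of round-robining being discussed (assumed behavior, not the scheduler's actual fastpath code): rotate through the worker pool so repeated no-dependency tasks on an idle cluster don't all start on the same worker.

```python
# Generic illustration of round-robin worker selection -- not the scheduler's code.
from itertools import count

_rr_counter = count()

def round_robin_worker(workers):
    """Pick the next worker in a fixed rotation across successive calls."""
    workers = list(workers)
    return workers[next(_rr_counter) % len(workers)]

# Successive calls cycle w1 -> w2 -> w3 -> w1 -> ...
pool = ["w1", "w2", "w3"]
picks = [round_robin_worker(pool) for _ in range(4)]  # ['w1', 'w2', 'w3', 'w1']
```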

Member:

Possibly. More broadly this is probably a good reminder that while we run this on some larger example computations we should also remember to try looking at some profiles of the scheduler to see if/how things have changed.

Member:

@gjoseph92 did you run into issues with this yet? I'm curious, have you tried using many workers? (for some sensible definition of many)

Member:

@gjoseph92 did this come up in profiling? This seems like the only pending comment; I'd like to get this in if possible.

gjoseph92 (Collaborator, Author):

It did not come up in profiling, and I haven't run into any issues with it. I feel pretty confident that round-robining is irrelevant when we're running TaskGroups larger than the cluster. I mostly brought it up because this branch is now pretty long and complicated for a codepath that we'll almost never go down. But maybe that's okay.

have you tried using many workers?

I haven't tried pangeo-style workloads with >30 workers, but I have tried my standard shuffle profile with this, which prompted 91aee92; I still need to look into that a little more.

Member:

My hope is that we can get this in today or tomorrow. Is that hope achievable? If not, do you have a sense for what a reasonable deadline would be?

gjoseph92 (Collaborator, Author):

Today.

Member:

I shall prepare the happy dance.

gjoseph92 added a commit to gjoseph92/distributed that referenced this pull request Jul 1, 2021
mrocklin pushed a commit that referenced this pull request Jul 2, 2021
gjoseph92 added a commit to gjoseph92/distributed that referenced this pull request Jul 23, 2021
mrocklin pushed a commit that referenced this pull request Jul 24, 2021
While looking into #5083 I happened to notice that the dashboard felt very sluggish. I profiled with py-spy and discovered that the scheduler was spending 20% of its runtime calculating `sum(map(len, group._dependencies)) < 5`! A quick print statement showed some task groups depended on 25,728 other groups (each of size 1). We can easily skip those.

I originally had this conditional in #4967 but we removed it for simplicity: #4967 (comment); turns out it was relevant after all!
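
A hedged sketch of that short-circuit (illustrative; the merged change may be shaped differently): check the cheap `len()` of the dependency set before paying for the `sum` over every dependency group. Since every group contains at least one task, skipping the sum when the set itself is already too large cannot change the result.

```python
# Illustration of the short-circuit described above; the merged change may differ.
class FakeGroup:
    """Minimal stand-in with only the attribute this sketch needs."""
    def __init__(self, dependencies):
        self._dependencies = dependencies

def has_few_total_deps(group, limit=5):
    deps = group._dependencies  # collection of dependency TaskGroups
    return (
        len(deps) < limit                # cheap guard: a 25,728-entry set bails out here
        and sum(map(len, deps)) < limit  # the expensive sum only runs for small sets
    )

assert has_few_total_deps(FakeGroup([[1], [1, 2]]))        # 2 groups, 3 tasks total
assert not has_few_total_deps(FakeGroup([[1]] * 25_728))   # bails out before summing
```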
gjoseph92 added a commit to gjoseph92/distributed that referenced this pull request Sep 16, 2021
If a dependency is already on every worker—or will end up on every worker regardless, because many things depend on it—we should ignore it when selecting our candidate workers. Otherwise, we'll end up considering every worker as a candidate, which is 1) slow and 2) often leads to poor choices (xref dask#5253, dask#5324).

Just like with root-ish tasks, this is particularly important at the beginning. Say we have a bunch of tasks `x, 0`..`x, 10` that each depend on `root, 0`..`root, 10` respectively, but every `x` also depends on one task called `everywhere`. If `x, 0` is ready first, but `root, 0` and `everywhere` live on different workers, it appears as though we have a legitimate choice to make: do we schedule near `root, 0`, or near `everywhere`? But if we choose to go closer to `everywhere`, we might have a short-term gain, but we've taken a spot that could have gone to better use in the near future. Say that `everywhere` worker is about to complete `root, 6`. Now `x, 6` may run on yet another worker (because `x, 0` is already running where it should have gone). This can cascade through all the `x`s, until we've transferred most `root` tasks to different workers (on top of `everywhere`, which we have to transfer everywhere no matter what).

The principle of this is the same as dask#4967: be more forward-looking in worker assignment and accept a little short-term slowness to ensure that downstream tasks have to transfer less data.

This PR is a binary choice, but I think we could actually generalize to some weight in `worker_objective` like: the more dependents or replicas a task has, the less weight we should give to the workers that hold it. I wonder if, barring significant data-transfer imbalance, having stronger affinity for the more "rare" keys will tend to lead to better placement.
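
A rough sketch of that candidate-filtering idea (not the actual implementation; `dependencies` and `who_has` mirror scheduler TaskState attributes, and the 50% threshold is an arbitrary illustration): dependencies that already live on most workers contribute no useful placement signal, so they are skipped when gathering candidates.

```python
# Rough sketch of the idea above -- not the real implementation.
# `ts.dependencies` and `dep.who_has` mirror scheduler TaskState attributes.
def candidate_workers(ts, all_workers, replication_threshold=0.5):
    """Collect workers holding 'rare' dependencies of ts; fall back to all workers."""
    n = len(all_workers)
    candidates = set()
    for dep in ts.dependencies:
        holders = dep.who_has
        if len(holders) / n < replication_threshold:  # ignore deps that are (nearly) everywhere
            candidates |= set(holders)
    # If every dependency is replicated everywhere, nothing constrains placement.
    return candidates or set(all_workers)
```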
gjoseph92 deleted the decide_worker/co-assign-relatives-2 branch June 9, 2022 20:37
Successfully merging this pull request may close these issues: Co-assign neighboring tasks to neighboring workers