
Dask.order rewrite using a critical path approach #10660

Merged
fjetter merged 48 commits into dask:main from dask_order_rewrite_critical_path on Dec 12, 2023

Conversation

@fjetter (Member) commented Nov 30, 2023

This is another attempt to rewrite dask.order.

Supersedes #10557
Closes dask/distributed#8255

It does not incorporate #10619 yet, but I will likely follow up to add it.

This takes a quite different approach to our ordering than the PRs before it and actually rewrites it from scratch.

I still have a couple of unit tests to take care of, but the failing tests right now are mostly artificial test cases. There are one or two mildly concerning things I still have to look at, but generally speaking I'm optimistic about this approach.

At a high level, this change was inspired by what root task queuing does, in that it tries to finish off whatever runnable computation branches exist before it starts loading new data. At that level, the algorithm can be described in a few simple steps:

  1. Compute a critical path by reverse DFS
  2. Use the critical path as a stack and walk the stack in the forward direction. Whenever a node is encountered that is not runnable, add the missing dependencies
  3. Whenever that happens, look at all currently runnable tasks and walk them as far as possible. Those paths are remembered but not immediately added to the result
  4. If the runnable paths meet a reducer (or final node) and that reducer would be reachable given all runnable paths, execute those paths
  5. If not, proceed with the critical path until all nodes are processed

The nuances of the implementation are a little harder to describe, but I'll add some more in-code comments.
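To make the steps above more concrete, below is a minimal, self-contained sketch of that control flow. It is an illustration only and not the actual dask.order implementation: the critical-path selection, the commit rule of step 4, and all names are simplifications.

# Minimal sketch of the control flow described above. Illustration only, not
# the actual dask.order implementation: the critical-path selection and the
# commit rule of step 4 are heavily simplified here.
from collections import defaultdict


def order_sketch(dependencies):
    """``dependencies`` maps key -> set of keys it depends on."""
    dependents = defaultdict(set)
    for key, deps in dependencies.items():
        for dep in deps:
            dependents[dep].add(key)

    result = {}  # key -> priority

    def runnable(key):
        return key not in result and dependencies[key].issubset(result)

    def process_runnable():
        # Steps 3/4 (simplified): walk every branch that became runnable as far
        # as possible. The real algorithm remembers these paths and only adds
        # them to the result once a reducer / final node is reachable.
        stack = [d for key in list(result) for d in dependents[key] if runnable(d)]
        while stack:
            key = stack.pop()
            if runnable(key):
                result[key] = len(result)
                stack.extend(dependents[key])

    # Step 1: a critical path via reverse DFS from the terminal nodes
    # (the real implementation chooses the path more carefully).
    critical_path, seen = [], set()
    stack = [k for k in dependencies if not dependents[k]]
    while stack:
        key = stack.pop()
        if key not in seen:
            seen.add(key)
            critical_path.append(key)
            stack.extend(dependencies[key])

    # Steps 2/5: use the critical path as a stack and walk it forward,
    # inserting missing dependencies and flushing runnable branches.
    while critical_path:
        key = critical_path.pop()
        if key in result:
            continue
        missing = dependencies[key].difference(result)
        if missing:
            critical_path.append(key)   # revisit once its dependencies are done
            critical_path.extend(missing)
            process_runnable()          # step 3: finish off runnable branches
        else:
            result[key] = len(result)
    return result


# tiny map/reduce example
deps = {"a1": set(), "a2": set(), "b1": {"a1"}, "b2": {"a2"}, "c": {"b1", "b2"}}
print(order_sketch(deps))  # a valid priority (topological order) for every key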

@fjetter (Member Author) commented Nov 30, 2023

coiled benchmark results are in

https://github.com/coiled/benchmarks/actions/runs/7046212948

Wall clock time is mostly unaffected. The little signal there is likely just noise

Looking at peak memory, we see a reduction for double_diff and dataframe_align

[peak memory chart]

and a very solid reduction for the zarr filtering tests

[zarr filtering memory chart]

Similar patterns can be seen in average memory use

@fjetter (Member Author) commented Nov 30, 2023

double_diff does not drop as strongly as in #10557, so there is likely still room for improvement. Given that the cost function / sorting key I am using is almost random, I assume this is the most interesting knob I have to tweak.

@fjetter (Member Author) commented Nov 30, 2023

This implementation no longer has the runtime issues that #10557 had. There are cases where this algorithm is indeed slower, but that is caused by a sort in my current code that I haven't yet safeguarded against very large collections (main actually guards some of its sorts with a maximum length, e.g. https://github.com/dask/dask/blob/5d75a04132d5ea1535287a649310c8529dbb3a79/dask/order.py#L651C52-L652). Sometimes it is actually faster.
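For context, the kind of guard referred to above looks roughly like the following. This is an illustration of the pattern only, not the code in dask/order.py, and MAX_SORT_LENGTH is a hypothetical threshold.

# Illustration of the size-guarded sort pattern referenced above; not the code
# in dask/order.py. MAX_SORT_LENGTH is a hypothetical threshold.
MAX_SORT_LENGTH = 1000


def maybe_sorted(candidates, key):
    # Only pay the O(n log n) cost when the collection is small enough that the
    # better ordering is worth it; otherwise keep the arbitrary order.
    candidates = list(candidates)
    if len(candidates) <= MAX_SORT_LENGTH:
        candidates.sort(key=key)
    return candidates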

[runtime comparison chart]

The thing I like most about the new approach I am proposing here is that it is very deterministic and does not require any overly magical cost functions (I could never wrap my head around the partition_keys in the original implementation).

That all said, I still have one or two examples I have to inspect a little more closely before we can move forward with this.

@fjetter force-pushed the dask_order_rewrite_critical_path branch from c144c68 to f63b20f on November 30, 2023 14:22
@fjetter (Member Author) commented Nov 30, 2023

This critical path approach has one flaw I haven't quite figured out yet.

The following example is a smaller version of what was reported in #3055

[task graph ordering visualization]

The visualization clearly shows that we process the critical path very eagerly and work off unlocked branches as soon as they become available. However, with this particular graph, a root task that is required early on to reduce everything is only required by the critical path very late, so those branches never get a chance to run.
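To illustrate, here is a hypothetical miniature of that shape (not the actual graph from the linked issue): the long chain forms the critical path and only needs "shared" at its very end, while every reduce-i branch needs "shared" immediately and therefore stays blocked.

# Hypothetical miniature of the problematic shape (not the graph from the
# linked issue). The long chain dominates the critical path and only touches
# "shared" at its end, while every reduce-i needs "shared" right away, so a
# purely critical-path driven walk keeps those branches (and their loads)
# waiting until very late.
dependencies = {
    "shared": set(),
    "chain-0": set(),
    **{f"chain-{i}": {f"chain-{i - 1}"} for i in range(1, 6)},
    "chain-end": {"chain-5", "shared"},
    **{f"load-{i}": set() for i in range(3)},
    **{f"reduce-{i}": {f"load-{i}", "shared"} for i in range(3)},
    "final": {"chain-end", "reduce-0", "reduce-1", "reduce-2"},
}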

@fjetter (Member Author) commented Dec 5, 2023

So, I changed how the critical path is determined such that we have two different strategies based on a heuristic about graph symmetry. That is working pretty well for the test suite and the benchmark suite.

https://github.com/coiled/benchmarks/actions/runs/7088167915

Wall time looks great for all H2O tests and the zarr filtering.

[wall clock time charts]

There appears to be a mild regression in the shuffle tests that I still want to investigate, since I'm surprised those are affected at all (maybe this is the runtime of the algorithm itself, I don't know).

[shuffle benchmark chart]

Memory looks great overall (below is peak memory; average looks similar):

[peak memory charts]

@mrocklin (Member) commented Dec 5, 2023

Exciting!

@fjetter (Member Author) commented Dec 6, 2023

I'm still tinkering, but even though this implementation is vastly different from #10562, I still see potential for grouping tasks and enabling co-assignment/STA with this. Below are a couple of examples where I color the "generation" of critical paths (still WIP; floats are assigned whenever process_runnable sets a result, while proper integers are critical-path-only results).

[graph visualizations colored by critical-path generation]

@mrocklin (Member) commented Dec 6, 2023

I'm mildly curious when we'll be able to merge and release this. There's probably some mild benefit to it going out before AGU (next week) if that's doable.

@fjetter (Member Author) commented Dec 6, 2023

I have to put in a little more time, but there is nothing big left that I am aware of. There are a couple of unit tests that are failing, and I want to sanity-check the benchmarks. If nothing else flares up, I guess we can merge before next week.

@mrocklin (Member) commented Dec 6, 2023

cc @dcherian

@fjetter (Member Author) commented Dec 7, 2023

I reran the benchmarks and the shuffle-related regressions that showed up earlier vanished. The improvements in H2O unfortunately also vanished, but that's somewhat expected.

https://github.com/coiled/benchmarks/actions/runs/7130181112

Note that this rewrite does not yet include the "skip data tasks" change proposed in #10619, but it does fix the test case I added there to close #10618, so I added that test to this PR as well.
I'll think about the data tasks a bit more, but I believe this PR takes a bit of pressure off the conversation about inlining.

@fjetter (Member Author) commented Dec 7, 2023

I also checked pydata/xarray#8414 again, which now produces this ordering:

[ordering visualization]

which I believe is just right. I'll add this as a test case as well.

@fjetter marked this pull request as ready for review on December 7, 2023 18:28
@fjetter (Member Author) commented Dec 7, 2023

A note about the implementation:

The complexity went through the roof, but I think it's worth it. I tried factorizing it, but it doesn't break apart easily. However, there are very clear stages, so I split off those stages and modularized it as well as possible. I didn't manage to factor this stuff out into separate top-level functions, but I did end up using local functions to manage scope and namespace pollution.
If anybody is brave enough to review, I suggest starting towards the bottom of the order function, where the core algorithm starts and is documented. I'll eventually follow up with more docs about the other sections.

@fjetter changed the title from "[WIP] Dask order rewrite critical path" to "Dask order rewrite using a critical path approach" on Dec 7, 2023
@fjetter changed the title from "Dask order rewrite using a critical path approach" to "Dask.order rewrite using a critical path approach" on Dec 7, 2023
@mrocklin (Member) commented Dec 7, 2023

It looks like the bus factor went from zero to one.

@dcherian (Contributor) commented Dec 7, 2023

Note that this rewrite does not yet include the "skip data tasks" change proposed in #10619, but it does fix the test case I added there to close #10618, so I added that test to this PR as well.
I'll think about the data tasks a bit more, but I believe this PR takes a bit of pressure off the conversation about inlining.

Cool! Thanks!

Comment on lines +842 to +843
if agg_func in {"first", "last"} and shuffle_method == "disk":
pytest.skip(reason="https://github.com/dask/dask/issues/10034")
@fjetter (Member Author) commented:

Apparently, the reordering made us hit #10034 in test_groupby_reduction_split.

@fjetter (Member Author) commented Dec 8, 2023

Apparently, the reordering made us hit #10034 in test_groupby_reduction_split. What's even stranger is that it also causes other failures that are more meta-related, e.g. a groupby.shift losing its freq argument in test_groupby_shift_with_freq.

@fjetter (Member Author) commented Dec 11, 2023

Regarding algorithmic performance, this implementation is now slower than before, but it looks to be reasonable, and in real-world testing it doesn't seem to have an impact (the other work we're doing on the scheduler is just much slower).

In particular, the dask-benchmark tests I've been running locally to test performance often hit the worst cases of this new algorithm.

Overall, I'm not concerned by an increase in runtime that stays at roughly linear complexity.

fjetter and others added 2 commits December 11, 2023 16:44
Co-authored-by: Hendrik Makait <hendrik@makait.com>
dask/order.py, comment on lines 495 to 501:
cpath_append = critical_path.append
cpath_extend = critical_path.extend
cpath_pop = critical_path.pop
cpath_update = scrit_path.update
cpath_add = scrit_path.add
cpath_discard = scrit_path.discard

A reviewer (Member) commented:

nit: I find this mildly confusing; aliasing these makes me start bookkeeping in my mind which object we append to vs. update, where we pop and where we discard, etc. Personally, I'd prefer either wrapping these in a single function that takes care of modifying both critical_path and scrit_path or explicitly calling the aliased methods.

@fjetter (Member Author) replied:

I rewrote this section. Flake8 was actually very useful here since it made me realize that I can reuse the same critical_path list instead of having it recreated. This add/pop/etc. logic is now a function that updates both collections as necessary.
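For illustration, the shape of such a helper might look like the sketch below; the names are placeholders and this is not the code that landed in dask/order.py.

# Sketch of a helper that keeps the critical-path list and its companion set in
# sync instead of aliasing methods of both containers. Illustration only; not
# the code that landed in dask/order.py.
critical_path = []   # ordered critical path, used as a stack
scrit_path = set()   # set view of the same keys for O(1) membership tests


def path_push(key):
    critical_path.append(key)
    scrit_path.add(key)


def path_pop():
    key = critical_path.pop()
    scrit_path.discard(key)
    return key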

@fjetter (Member Author) commented Dec 12, 2023

So, I again saw regressions in the set_index benchmarks that somehow show up with the new ordering. I just cannot understand how ordering would have an impact on P2P shuffling. When inspecting the dashboard, two things stand out that probably cause the benchmark failure:

[dashboard screenshot]

For some reason the unpack tasks are flagged as root tasks and are queued, which causes a heavy skew in assigned tasks, which in turn causes work stealing and a lot of irrelevant data transfer to compute the size. @hendrikmakait the P2P tasks previously carried worker annotations, which effectively blocked them from ever being flagged root-ish. By removing those annotations we might have opened ourselves up to this scheduling anomaly. I don't understand how this connects to ordering and I suspect it is unrelated; maybe the ordering change altered some subtle timing somewhere that makes it more likely to hit this.

@fjetter (Member Author) commented Dec 12, 2023

OK, I think I nailed down the last failing test. Will move forward with merging unless there are any objections

dask/order.py, comment on lines 604 to 607:
elif not ( # type: ignore[unreachable]
identical_sets
and (result_first is r_child or r_child.issubset(result_first))
):
@fjetter (Member Author) commented:

@hendrikmakait I won't hold off merging for your review here, but since we talked about it before, this is the rewritten version. I think this is much cleaner now (and I even wrote a couple of dedicated unit tests 🎉).

@fjetter merged commit ff6ad1b into dask:main on Dec 12, 2023. 25 of 27 checks passed.
@dcherian (Contributor) commented:

This needs a blogpost to get the word out!

@d-v-b (Member) commented Dec 12, 2023

I think any merged PR with 50+ comments probably warrants a blog post. This one is no exception!

Successfully merging this pull request may close these issues.

Regression in test_decide_worker_coschedule_order_neighbors
5 participants