
P2P shuffling and queuing combined may cause high memory usage with dask.dataframe.merge #7496

Closed
hendrikmakait opened this issue Jan 23, 2023 · 20 comments · Fixed by #7514
@hendrikmakait (Member) commented Jan 23, 2023

When P2P shuffling and queueing are used together in a workload where two dataframes are merged and both dataframes are generated by rootish tasks, they can cause unnecessarily high memory usage and spilling:

Queuing enabled:
[Plot: queueing-merge]

Queuing disabled:
[Plot: non-queueing-merge]

What I expect to happen
We should first finish all transfers of one shuffle as well as its barrier, followed by a single unpack, and then the transfers and barrier of the second shuffle, as prescribed by the topological sort. Once both shuffle barriers are finished, the individual output pairs of the shuffles get read from disk, merged, and handed to downstream tasks.

This is what we see without queueing.

What happens instead
All upstream tasks of both shuffles are queued. This causes the scheduler to finish one shuffle's transfers followed by its barrier and then immediately materialize all of its outputs. The scheduler is forced to ignore the topological sort: the inputs of the second transfer are still queued and therefore ignored, while other tasks - the output tasks of the first shuffle - are already available for execution.

With larger-than-cluster-memory data, this can cause significant spilling, excessive disk I/O, and consequently longer runtime.

Reproducer
The plots above were generated by running test_join_big from coiled/benchmarks#645 with a size factor of 1x the cluster memory on a cluster of 10 workers. Any workload that generates two dataframes using root-ish tasks (e.g. by generating random partitions or reading from Parquet) and then merges the two should work.

import dask
import dask.dataframe as dd
from dask.datasets import timeseries

# Assumes an existing distributed.Client named `client` and an async context
# (e.g. Jupyter) for the trailing `await`.
with dask.config.set(shuffle="p2p"):
    df1_big = timeseries(
        "2000-01-01", 
        "2001-01-01", 
        dtypes={str(i): float for i in range(100)}
    )
    df1_big["predicate"] = df1_big["0"] * 1e9
    df1_big = df1_big.astype({"predicate": "int"})

    df2_big = timeseries(
        "2000-01-01", 
        "2001-01-01", 
        dtypes={str(i): float for i in range(100)}
    )

    df2_big["predicate"] = df2_big["0"] * 1e9
    df2_big = df2_big.astype({"predicate": "int"})

    join = dd.merge(df1_big, df2_big, on="predicate", how="inner")
    await client.compute(join.size)
@hendrikmakait changed the title from "P2P shuffling and queuing combined cause high memory usage with dask.dataframe.merge" to "P2P shuffling and queuing combined may cause high memory usage with dask.dataframe.merge" on Jan 23, 2023
@fjetter (Member) commented Jan 24, 2023

Edit: Sorry, I believe I misread your comment. I thought you were suggesting that the topological sort is wrong.

one after another as prescribed by the topological sort.

This is not what the topological sort suggests. Given two simple dataframes

import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame({
    "A": [2001, 2002, 2003],
    "B": ['Apple', 'Banana', 'Coconut']
})
ddf1 = dd.from_pandas(df, 2)
ddf2 = dd.from_pandas(df, 3)
# This join is nonsense, of course but dask doesn't know that ;)
join = dd.merge(ddf1, ddf2, how="inner", shuffle='p2p')

we can inspect the join (I used different partition sizes to allow us to distinguish the DFs in the graph). This graph is merely for reference; it shows how we generate two barriers, etc.

[Task graph: no_color]

If we look at the ordering, we can clearly see that the first DF is transferred entirely, followed by the barrier and a single unpack. Only then does it start to generate and transfer the second dataframe (right-hand side). We can further see that once the second dataframe is transferred, the first unpack (18) is merged with the previously unpacked partition of the first dataframe. From there on, the tasks are unpacked in corresponding pairs.
I would argue this is perfect sorting for this case.

[Task graph in priority order: ordered]

If the unpack task pairs do not have the same worker assignment, this will of course require another data transfer, so I would be more interested in knowing what the worker assignment looks like in this case.
My naive expectation, given our current implementation, is that the outputs should already be assigned properly, but we should definitely check this.

FWIW I believe tasks with restrictions, i.e. the unpack tasks, are not using queuing yet, so this may be a weird interaction between the queued root tasks and the non-queued unpack tasks.

@fjetter (Member) commented Jan 24, 2023

This can be easily reproduced:

from dask.datasets import timeseries
ddf1 = timeseries(
    "2020-01-01",
    "2030-01-02",
)
ddf2 = timeseries(
    "2020-01-01",
    "2030-01-02",
)
ddf1.merge(ddf2, shuffle='p2p').compute()

Screenshot of the tipping point:

[Image: tipping point]

@hendrikmakait (Member, Author)

one after another as prescribed by the topological sort.

This is not what the topological sort suggests. [...]

If we look at the ordering, we can clearly see that the first DF is transferred entirely, followed by the barrier and a single unpack. Only then does it start to generate and transfer the second dataframe (right-hand side). We can further see that once the second dataframe is transferred, the first unpack (18) is merged with the previously unpacked partition of the first dataframe. From there on, the tasks are unpacked in corresponding pairs.

Sorry, I adjusted the description to be more precise. I skipped over the single unpack to focus on the important part: We should transfer all shuffle data before starting to read (a lot of) outputs.

I would argue this is perfect sorting for this case.

Agreed, as I said, what happens without queueing is what the topological sort prescribes and what I would expect to happen.

@gjoseph92 (Collaborator)

Interesting. I think this is happening because the output tasks use worker restrictions, so they can't be queued.

Queuing would handle this graph structure fine. Both the transfer tasks and the unpack tasks should be root-ish, so everything should be queued and processed in priority order.

But, because the unpack tasks use worker restrictions, they don't meet the is_rootish heuristic, and aren't queued. So while the input tasks wait in an orderly queue, the output tasks all get submitted at once after the barrier (aka root task overproduction). When this happens, the workers become fully saturated, preventing the remaining input tasks in the queue from being scheduled.
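For reference, the root-ish check that the unpack tasks fail looks roughly like the sketch below. This is a paraphrase of SchedulerState.is_rootish in distributed/scheduler.py around that time, written as a standalone function; exact names and thresholds may differ between versions.

def is_rootish(ts, total_nthreads: int) -> bool:
    # Paraphrased sketch, not the exact implementation: any restriction
    # disqualifies a task from queuing, which is why the unpack tasks
    # (which carry worker restrictions) bypass the queue entirely.
    if ts.resource_restrictions or ts.worker_restrictions or ts.host_restrictions:
        return False
    tg = ts.group
    return (
        len(tg) > total_nthreads * 2            # a wide group of tasks...
        and len(tg.dependencies) < 5            # ...with few dependency groups...
        and sum(map(len, tg.dependencies)) < 5  # ...and few total dependency tasks
    )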

(as @fjetter showed) here's the graph in priority order:

[Task graph in priority order: p2p-join-order]

  • Queuing generally doesn't play well with task restrictions. Normal tasks are always going to defer to restricted tasks, even if, numerically, their priorities indicate they should be scheduled first.

    That is, queueing only works assuming all tasks could be queued. If some tasks get to bypass the queue, we'll always execute them first, and traverse the graph way out of priority order.

    More broadly, I'd say scheduling with restrictions is still second-class in dask IMO. I don't know what we want to do about this.

  • Fixing this well would be hard. We'd need to work out better scheduling mechanisms for restricted tasks, including a way of queuing them on the scheduler.

  • The easiest, hacky fix is probably a way to prevent the scheduler from queuing the transfer tasks. Maybe like:

diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index fa167efc..ec04b7f2 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -2211,7 +2211,10 @@ class SchedulerState:
             # removed, there should only be one, which combines co-assignment and
             # queuing. Eventually, special-casing root tasks might be removed entirely,
             # with better heuristics.
-            if math.isinf(self.WORKER_SATURATION):
+            if (
+                math.isinf(self.WORKER_SATURATION)
+                or ts.annotations.get("queue") is False
+            ):
                 if not (ws := self.decide_worker_rootish_queuing_disabled(ts)):
                     return {ts.key: "no-worker"}, {}, {}
             else:
diff --git a/distributed/shuffle/_shuffle.py b/distributed/shuffle/_shuffle.py
index 2f07b56e..3fbf5336 100644
--- a/distributed/shuffle/_shuffle.py
+++ b/distributed/shuffle/_shuffle.py
@@ -135,7 +135,7 @@ class P2PShuffleLayer(SimpleShuffleLayer):
         annotations: dict | None = None,
     ):
         annotations = annotations or {}
-        annotations.update({"shuffle": lambda key: key[1]})
+        annotations.update({"shuffle": lambda key: key[1], "queue": False})
         super().__init__(
             name,
             column,

@hendrikmakait (Member, Author)

  • The easiest, hacky fix is probably a way to prevent the scheduler from queuing the transfer tasks.

We'd have to propagate the queue annotation upstream, which might very well have unintended side effects. Otherwise, the issue would be that the assign tasks upstream of the transfer would still get queued.

@gjoseph92 (Collaborator)

Yup, setting a queue=False annotation on the P2PShuffleLayer doesn't work. It does prevent the transfer tasks from getting queued—but the tasks upstream of them are still queued. You'd have to set queue=False on your entire pipeline up until the shuffle.
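For illustration, "annotating the entire pipeline" would look something like this sketch, assuming the scheduler honored a queue=False annotation as in the diff above (it does not today; dask.annotate itself is a real API, the queue key is hypothetical):

import dask
import dask.dataframe as dd
from dask.datasets import timeseries

# Everything built inside this context gets the (hypothetical) queue=False
# annotation, including the timeseries and assign tasks upstream of the transfers.
with dask.annotate(queue=False), dask.config.set(shuffle="p2p"):
    ddf1 = timeseries("2000-01-01", "2000-06-01")
    ddf2 = timeseries("2000-01-01", "2000-06-01")
    joined = dd.merge(ddf1, ddf2, on="id", how="inner")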

I suppose we could write an HLG optimization to do this for you. This might even be the best option. Implementing the scheduling changes to make queueing work alongside task restrictions seems like a significant amount of work.

@hendrikmakait (Member, Author)

While queue=False won't work, maybe we could do the opposite and add an annotation that lets the scheduler queue tasks that have worker restrictions? I'm not sure what the consequences of that would be performance-wise, but it might be a quick hack.

@gjoseph92 (Collaborator)

Unfortunately, we can't do that, because when a slot opens up on a worker, we just pop the first task off the queue. If that task in fact needs to run on a different worker, we can't use it; we'd end up having to search the queue for a task that can run on that worker. This is at least O(n) per task (sort plus linear search)!

The whole design of queuing right now is around tasks being agnostic to which worker they run on (not just regarding restrictions; they don't even have dependencies we need to schedule near). Making queuing performant with tasks that are heterogeneous in any way will require some thought.

@hendrikmakait (Member, Author)

@gjoseph92: Yeah, I feared that was the answer. Thanks for reminding me of the implementation!

@gjoseph92 (Collaborator) commented Jan 24, 2023

I'll note that worker restrictions are probably the easiest case of restrictions to add to queueing. Compared to arbitrary (combinations of) resource annotations, for example, worker-restricted tasks are pretty straightforward. You just need a queue per worker, and you don't need to worry about rebalancing those queues (except when restrictions on tasks are changed...).

The trickier part is hooking it into the scheduling process. There are now multiple queues from which to source tasks. So you need to take the top K tasks from both queues. That's not hard with just 2 queues, but making it performant when expanding into other types of task restrictions (and potentially many queues) could be more complex.

(The disappointing thing is that worker restrictions are probably the least-useful type of restriction for other purposes IMO. Resource restrictions are pretty useful and people actually use them sometimes, worker restrictions I imagine less so in both regards.)
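To make the shape of that idea concrete, here is a minimal, hypothetical sketch (not distributed's implementation) of a shared queue plus one queue per worker, where a free slot takes whichever head has the better priority (lower value runs first in this sketch):

import heapq
from collections import defaultdict

class QueuedTasks:
    """Hypothetical sketch: one shared queue plus one queue per worker."""

    def __init__(self):
        self.shared = []                     # heap of (priority, key) for unrestricted tasks
        self.per_worker = defaultdict(list)  # worker address -> heap of restricted tasks

    def push(self, key, priority, worker=None):
        heap = self.per_worker[worker] if worker is not None else self.shared
        heapq.heappush(heap, (priority, key))

    def pop_for(self, worker):
        # When `worker` has a free slot, compare the head of the shared queue
        # with the head of that worker's own queue and take the better one.
        candidates = [h for h in (self.shared, self.per_worker[worker]) if h]
        if not candidates:
            return None
        best = min(candidates, key=lambda heap: heap[0])
        return heapq.heappop(best)[1]

q = QueuedTasks()
q.push("transfer-0", priority=1)
q.push("unpack-0", priority=0, worker="tcp://10.0.0.1:1234")  # worker-restricted
assert q.pop_for("tcp://10.0.0.1:1234") == "unpack-0"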

@fjetter (Member) commented Jan 30, 2023

In an offline conversation with @hendrikmakait, we discussed the possibility of implementing a dedicated MergeLayer using the P2P extension instead of performing two shuffles plus a concat. This would bypass this problem quite elegantly and would further reduce complexity in the internals when it comes to managing resources for concurrent shuffles.
Most importantly, this would not require us to adjust queuing. Queuing is fine for just one shuffle layer.
There may exist much more complex topologies where this still breaks down, but I think this MergeLayer would cover most applications and is a relatively easy next step.

@gjoseph92 (Collaborator)

So basically, the MergeLayer would "triangle-fuse" the outputs of the two shuffles? You'd still have two barriers, but rather than each feeding into separate unpack tasks, the unpack tasks would be combined (each would depend on 2 barriers instead of the usual 1).

This was something we came across a year ago with P2P.

Also FYI, you can make this triangle-fused graph structure happen just by making the output tasks Blockwise. (This is desirable anyway, so they can fuse onto subsequent tasks.) Blockwise fusion will automatically create the structure you're looking for. A special MergeLayer might not be necessary.

However, it opens up a new problem where a worker joins between the start of the first shuffle and the start of the second. Copied from an old internal ticket https://github.com/coiled/dask-engineering/issues/50:

If you make the output tasks of a shuffle Blockwise, a downstream Blockwise that takes both shuffles as inputs will merge all three together. This is what happens during merge.

A simple reproducer is code from #5524 (basically how merge works):

    s1 = shuffle(df1, "id")
    s2 = shuffle(df2, "id")
    merged = dd.map_partitions(
        lambda p1, p2: pd.merge(p1, p2, on="id"), s1, s2, align_dataframes=False
    )
[Task graph: blockwise1]

We could just validate the situation: when there are merged sibling shuffles, allow them to proceed iff they use the same list of workers (and have the same number of partitions). If not, fail with a clear error.

But we should make the situation actually work: have subsequent shuffles sharing outputs re-use the workers list from their first sibling that ran. Doing this will require some introspection of the graph at runtime.
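The validation option could be as small as a check like this hypothetical sketch (the attribute names are assumptions, not the actual extension API):

def validate_sibling_shuffles(first, second):
    # Hypothetical check; `first`/`second` are assumed to expose the worker
    # list and output partition count the scheduler assigned to each shuffle.
    if set(first.workers) != set(second.workers):
        raise RuntimeError(
            "Sibling shuffles were assigned different worker sets; "
            "the cluster likely changed between the two shuffles."
        )
    if first.npartitions != second.npartitions:
        raise RuntimeError(
            "Sibling shuffles have different numbers of output partitions."
        )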

@fjetter (Member) commented Jan 31, 2023

This was something we came across a year ago with P2P.

I am aware. Using Blockwise might be an option, but as you said, this would introduce a different problem around worker assignments. I believe this can be avoided if we combine this into a single MergeLayer.

I also appreciate that Blockwise provides the desired task fusing, but I'd rather approach that topic in a dedicated step and solve this problem in isolation.

@fjetter (Member) commented Jan 31, 2023

Apart from the worker assignment, we are currently using the custom layer to make sure culling works as intended. (A culled layer generates a new token.) If we split this layer up into

  • Blockwise
  • Barrier
  • Blockwise

we wouldn't be able to regenerate this token, IIUC.

There are also ways to deal with this without a custom layer, but this is how it is currently done.

@hendrikmakait (Member, Author) commented Jan 31, 2023

However, it opens up a new problem, where a worker joins between when the first and second shuffle starts.

FWIW, I think we may have already solved that problem:

if ts.worker_restrictions:
    output_worker = list(ts.worker_restrictions)[0]

For any of the output tasks, we check whether it already has worker restrictions, and if so, we simply assign it to the first worker in that set. This means that if we fuse the output tasks, the earlier shuffle initializes the restrictions on the output task, and the later shuffle reuses the decision made by the earlier one.
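In other words, the assignment boils down to something like this condensed sketch (paraphrased; the fallback choice is only illustrative, the real scheduler-side extension may pick differently):

def pick_output_worker(ts, workers, output_partition):
    # Reuse the decision an earlier sibling shuffle already made for this
    # output task (this is the snippet quoted above).
    if ts.worker_restrictions:
        return list(ts.worker_restrictions)[0]
    # Otherwise spread output partitions over the known workers; round-robin
    # here is illustrative only.
    return workers[output_partition % len(workers)]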

@fjetter mentioned this issue on Jan 31, 2023
@gjoseph92 (Collaborator)

I think we may have already solved that problem

Yup, it seems like that will do it! Nice.

we are currently using the custom layer also to make sure culling works as intended

Ah, makes sense. Switching to blockwise doesn't seem worthwhile then.

@gjoseph92 (Collaborator)

I think this is a broader problem than just merges. Any binary operation between two shuffled DataFrames will cause high memory usage—merge is just one particular case of a binary op. Here I trigger it just by doing a + b:

# run under jemalloc on macos:
# sudo DYLD_INSERT_LIBRARIES=$(brew --prefix jemalloc)/lib/libjemalloc.dylib python p2ptest.py

import dask
import distributed.diagnostics
from dask.datasets import timeseries

if __name__ == "__main__":
    with dask.config.set(shuffle="p2p"):
        df1_big = timeseries(
            "2000-01-01", "2001-01-15", dtypes={str(i): float for i in range(10)}
        )
        df1_big["predicate"] = df1_big["0"] * 1e9
        df1_big = df1_big.astype({"predicate": "int"})

        df2_big = timeseries(
            "2000-01-01", "2001-01-15", dtypes={str(i): float for i in range(10)}
        )

        df2_big["predicate"] = df2_big["0"] * 1e9
        df2_big = df2_big.astype({"predicate": "int"})

        df1_s = df1_big.shuffle("predicate")  # probably `set_index` IRL
        df2_s = df2_big.shuffle("predicate")  # probably `set_index` IRL

        binop = df1_s + df2_s

    with distributed.Client(
        n_workers=4, threads_per_worker=2, memory_limit="2GiB"
    ) as client:
        ms = distributed.diagnostics.MemorySampler()
        with ms.sample("queuing on"):
            binop.size.compute()

        client.run_on_scheduler(
            lambda dask_scheduler: setattr(
                dask_scheduler, "WORKER_SATURATION", float("inf")
            )
        )
        client.restart()

        with ms.sample("queuing off"):
            binop.size.compute()

        ms.plot(align=True).get_figure().savefig("p2p-binop-memory.png")

[Memory plot: p2p-binop-memory]

Re-setting the index on two dataframes in order to do an aligned binary operation between them is a valid and IMO common use case.

I don't think an approach like a MergeLayer would be feasible for this, because there are just too many places that could generate the problematic graph, all of which would need to become a MergeLayer.

I think we'd have to either detect this via a custom graph optimization, and rewrite to the fused graph, or actually handle the worker-restriction case with queuing.

@fjetter (Member) commented Feb 8, 2023

@gjoseph92 your script produces this on my machine:
[Memory plot: p2p-binop-memory]

I do see the pattern of wrong scheduling, but it barely has any effect on this example for me.

@gjoseph92 (Collaborator)

Interesting. I ran it again and still got the same result. Did you use jemalloc? (I'm using that in place of malloc_trim on macOS.) I can't even get it to complete without jemalloc (the p2p case runs out of memory and crashes). It could also be an Intel vs. Apple silicon difference? FWIW I had to try a few data sizes to get the graph I showed; at smaller data sizes, my results looked more like what you showed here.

Also just kind of interesting—I'm realizing this issue is the scheduler equivalent of #6137.

@fjetter (Member) commented Feb 9, 2023

Did you use jemalloc? (I'm using that in place of malloc_trim on macOS.)
Yes, but it doesn't do a lot. Nothing, really. I verified the paths, etc., but haven't checked within the application what is actually being used (is there a way to confirm this?)

Possibly could also be an intel vs apple silicon difference?

Yes, I strongly suspect that's it.


It doesn't matter much that my memory profile looks different. On macOS it is also entirely possible that the memory is simply compressed well enough. The scheduling pattern is the harmful one, and that's what matters.
