
Reusing intermediate results causes memory issues #854

Open
hendrikmakait opened this issue Feb 6, 2024 · 6 comments

@hendrikmakait
Member

hendrikmakait commented Feb 6, 2024

Problem

Whenever we reuse an intermediate result and one of its consumers is a pipeline breaker (such as a shuffle, join, reduction, or groupby operation), we are forced to materialize the entire intermediate result. This breaks the pipelining we could otherwise exploit for reuse, for example between multiple element-wise operations.

This materialization puts a hard limit on our ability to scale, as I have observed in multiple TPC-H benchmark queries.

To illustrate this, run these two snippets on a cluster of your choice:

With full intermediate result materialization

from dask_expr.datasets import timeseries

from distributed import Client

if __name__ == "__main__":
    with Client() as client:
        print(client.dashboard_link)
        df = timeseries(start="2000-01-01", end="2020-12-31", freq="100ms", dtypes={"x": float})
        # To compute the mean, we have to fully materialize df, and we won't free
        # its data until its chunks have been reused to compute the partials of the sum.
        mean = df["x"].mean()
        df[df["x"] > mean].sum().compute()

Without full intermediate result materialization

from dask_expr.datasets import timeseries

from distributed import Client

if __name__ == "__main__":
    with Client() as client:
        print(client.dashboard_link)
        df = timeseries(start="2000-01-01", end="2020-12-31", freq="100ms", dtypes={"x": float})
        # Compute the mean beforehand so that we don't have to keep all of `df` in memory
        mean = df["x"].mean().compute()
        df[df["x"] > mean].sum().compute()

Possible solution

The easiest approach would be to never reuse any intermediate results. This has a few downsides:

  • Non-deterministic functions will lead to unexpected results
  • We waste a lot of computational resources on recomputations

...but it would allow us to scale.

We can certainly get smarter about intermediate result materialization, but this will require some effort depending on how smart we want to be. (There's a body of (ongoing) research and implementations in the database world we could draw from.)
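
To make the trade-off concrete, here is a toy sketch using hand-written, dask-style task-graph dicts (all functions and key names are made up for illustration; this is not dask-expr internals). With reuse, both consumers depend on the same "load-*" keys, so every partition stays pinned in memory until the reduction finishes; without reuse, the IO tasks are duplicated per branch, which wastes compute but lets each branch release partitions independently.

from dask import get  # synchronous scheduler, executes raw graph dicts


def load(i):
    # Stand-in for an IO task producing one partition.
    return list(range(i * 3, i * 3 + 3))


def mean(*parts):
    # Stand-in for a reduction, i.e. a pipeline breaker.
    flat = [x for part in parts for x in part]
    return sum(flat) / len(flat)


def filter_gt(part, threshold):
    # Stand-in for an element-wise operation that also needs the reduction result.
    return [x for x in part if x > threshold]


# With reuse: the reduction and the filters share the "load-*" keys, so every
# partition must be held until the reduction is done and the filter has read it.
graph_with_reuse = {
    "load-0": (load, 0),
    "load-1": (load, 1),
    "mean": (mean, "load-0", "load-1"),
    "filtered-0": (filter_gt, "load-0", "mean"),
    "filtered-1": (filter_gt, "load-1", "mean"),
}

# Without reuse: the IO tasks are duplicated per branch ("load-a-*" vs "load-b-*"),
# so the reduction branch can drop each partition right after reading it, at the
# cost of loading the data twice.
graph_without_reuse = {
    "load-a-0": (load, 0),
    "load-a-1": (load, 1),
    "mean": (mean, "load-a-0", "load-a-1"),
    "load-b-0": (load, 0),
    "load-b-1": (load, 1),
    "filtered-0": (filter_gt, "load-b-0", "mean"),
    "filtered-1": (filter_gt, "load-b-1", "mean"),
}

print(get(graph_with_reuse, ["filtered-0", "filtered-1"]))
print(get(graph_without_reuse, ["filtered-0", "filtered-1"]))

Both graphs produce the same results; the difference is only in how long the loaded partitions have to stay alive and how often the data is read.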

@phofl
Collaborator

phofl commented Feb 7, 2024

This is concerning

@fjetter
Member

fjetter commented Feb 7, 2024

The two code examples above are identical, aren't they?

@mrocklin
Member

mrocklin commented Feb 7, 2024

FWIW this is a long-standing issue. dask/dask#874

@hendrikmakait
Member Author

The two code examples above are identical, aren't they?

Fixed, I messed up the copy+paste.

@hendrikmakait
Member Author

Summary from multiple offline conversations with @phofl (and @fjetter):

We are mostly concerned about reuse after a reducer. Another example is df["y"] = df["x"] + df["x"].sum(). There are other possible scenarios where reuse may be problematic because of long processing chains that need to finish first, but they are also dependent on our scheduling algorithm and should be tackled once we see them happening in the real world.
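
As a scaled-down illustration of that pattern (smaller date range and frequency than the snippets above, and not a benchmark), the example and the eager workaround from the issue description would look roughly like this:

from dask_expr.datasets import timeseries

df = timeseries(start="2000-01-01", end="2000-12-31", freq="1s", dtypes={"x": float})

# Reuse after a reducer: df["x"] feeds both the sum (a pipeline breaker) and the
# element-wise addition, so its partitions cannot be released until the sum is done.
df["y"] = df["x"] + df["x"].sum()

# Workaround in the spirit of the second snippet above: evaluate the reducer
# eagerly, so the element-wise addition only needs a single streaming pass.
eager = timeseries(start="2000-01-01", end="2000-12-31", freq="1s", dtypes={"x": float})
total = eager["x"].sum().compute()
eager["y"] = eager["x"] + total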

@phofl
Collaborator

phofl commented Feb 15, 2024

@fjetter and I chatted a little about this offline. There are 2 different approaches we can take:

  • Disconnect the IO nodes from the rest of the expression when we encounter a pipeline breaker, so that we can compute them independently. This means that we have to adjust the IO nodes from the reduction, which might be far away in the expression tree.

This is relatively easy to do, but it breaks our assumption that our optimisations are only local and messes up dependency tracking in the process. I have moved away from this a little because I don't like the band-aids we would need, and it doesn't fit properly into our model.

  • Add a parameter like _branch_id to all of our expressions, initialised to 0 by default. When a pipeline breaker is encountered, it increments this id, and we bubble the increment down to the IO nodes, i.e. we basically iterate over the dependencies of the pipeline breaker until we reach an IO node (a very simplified explanation). This fits nicely into our local optimization approach, since we can do one step at a time, similar to how projections move around. That said, I don't want to add this new parameter to every class by hand, so we would need to enforce it somehow in the base class; that's something I am thinking about at the moment.
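
To illustrate the second approach, here is a toy sketch with entirely hypothetical class and method names (this is not the actual dask-expr Expr class): the pipeline breaker bumps a branch id and pushes it down to the IO leaves, so identical IO expressions on different branches stop producing the same key and are no longer deduplicated.

from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Expr:
    """Minimal stand-in for an expression node; every expression carries a branch id."""

    operands: tuple = ()
    _branch_id: int = 0

    def push_branch_id(self, branch_id: int) -> "Expr":
        # One local rewrite step, similar in spirit to how projections are pushed
        # around: retag this node and recurse into its dependencies until the
        # IO leaves are reached.
        new_operands = tuple(
            op.push_branch_id(branch_id) if isinstance(op, Expr) else op
            for op in self.operands
        )
        return replace(self, operands=new_operands, _branch_id=branch_id)

    @property
    def key(self) -> str:
        # The branch id takes part in the key, so the same IO expression under
        # branch 0 and branch 1 is scheduled (and released) independently.
        return f"{type(self).__name__.lower()}-{self._branch_id}-{abs(hash(self.operands))}"


@dataclass(frozen=True)
class ReadParquet(Expr):
    """Stand-in for an IO node."""


@dataclass(frozen=True)
class Sum(Expr):
    """Stand-in for a reducer, i.e. a pipeline breaker."""

    def rewrite(self) -> "Expr":
        # Encountering the pipeline breaker: bubble an incremented branch id
        # down to the IO nodes beneath it.
        return self.push_branch_id(self._branch_id + 1)


io = ReadParquet(("s3://bucket/data.parquet",))
reduced = Sum((io,)).rewrite()

print(io.key)                   # readparquet-0-...
print(reduced.operands[0].key)  # readparquet-1-..., no longer deduplicated with io

Because the branch id participates in the key, reuse is broken exactly where a pipeline breaker sits, and enforcing the parameter once in the base class keeps the individual subclasses untouched.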
