
JIT graph building #10518

Open · BrandonSmithJ opened this issue Sep 19, 2023 · 8 comments · May be fixed by #10534

Labels: needs attention, needs triage

Comments

BrandonSmithJ commented Sep 19, 2023

Here's a motivating example for what I'm referring to:

import dask.array as da 
import time 

zero = da.zeros((10000,5000,5000,5), dtype='float32', chunks=(1000,500,25,1))
data = da.overlap.overlap(zero, depth={0:10, 1:2, 2:2}, boundary=-1)


# Compiling the graph takes ~43 seconds for ~4.3 million tasks
start = time.time()
tasks = len(data.dask)
print(f'{time.time()-start:.2f} seconds to compile graph')
print(f'Graph contains {tasks:,} tasks')

# Computing the first block takes ~3 seconds since the graph is now compiled
start = time.time()
data.blocks[0,0,0,0].compute()
print(f'{time.time()-start:.2f} seconds to compute the first block\n')


# Create a fresh data graph
data = da.overlap.overlap(zero, depth={0:10, 1:2, 2:2}, boundary=-1)

# Computing the first block immediately takes ~43 seconds (as it will
# internally compile the whole graph, thus taking around the same time)
start = time.time()
data.blocks[0,0,0,0].compute()
print(f'{time.time()-start:.2f} seconds to compute the first block')

# Computing any other blocks is quick, now that the graph is compiled
start = time.time()
data.blocks[1,0,0,0].compute()
print(f'{time.time()-start:.2f} seconds to compute a different block')

# Graph is already compiled, so checking total tasks is now immediate
start = time.time()
tasks = len(data.dask)
print(f'{time.time()-start:.2f} seconds to compile graph')
print(f'Graph contains {tasks:,} tasks')

In the actual data I'm working with, the graph takes a significant amount of time to create before work on the first block can even begin.

In this case it's due to the rather large number of operations that need to take place in dask.layers.ArrayOverlapLayer, but the general idea I'm raising is: how feasible would it be to allow just-in-time graph compilation, in order to remove the startup time's dependency on the size of the data / the overall number of tasks? Is it possible to compile only the exact task dependency graph for a given object when it's computed, rather than having to build all tasks at the outset?

The github-actions bot added the needs triage label on Sep 19, 2023
fjetter (Member) commented Sep 19, 2023

Thanks for your report.

Before I get into this, what you call "compilation", i.e. the process of building the "low level graph" / initializing all tasks, is what we typically refer to as "materialization".

With this out of the way, let's look at your data. Calling data.dask shows the representation of your graph up to this point, layer by layer (roughly every expression that is applied to your collection is represented as a layer).

You'll notice the attribute is_materialized, which indicates whether the graph has already been fully built, since this is actually done layer by layer. You'll also notice the white vs grey circles.

[Screenshot: the repr of data.dask, listing each layer with its is_materialized status and the white vs. grey circles mentioned above]

There are a couple of layer types in here that are already materialized. Looking through the code base, it appears that for those operations (primarily concatenate and overlap) the "non-materialized" representation of the graph is simply not implemented yet.
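As a quick way to check this programmatically, here is a minimal sketch using the HighLevelGraph.layers mapping and Layer.is_materialized(), applied to the data array from the example above:

# Print each layer of the high-level graph along with whether it has
# already been materialized; `data` is the overlapped array from above.
for name, layer in data.dask.layers.items():
    print(f'{name}: {type(layer).__name__}, materialized={layer.is_materialized()}')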

If this were implemented properly, we would indeed compute only whatever is necessary to calculate the desired output key, as you describe.

fjetter commented Sep 19, 2023

Apart from the materialization issues, your example generates a graph of about 4MM tasks. This is larger than what we typically deal with, and especially if you add more layers to this graph, it could quickly grow out of hand. If there is any opportunity to rechunk / use larger chunks, this could help significantly. IIUC your input dataset has chunks of about 50MiB. You may be able to go a little higher here:

zero = da.zeros((10000,5000,5000,5), dtype='float32', chunks=(1000, 500, 25, 1))
# If the input data is the way it is, you can still rechunk later
# Make sure the new chunks are a multiple of the input chunks; this way the operation is trivial
zero = zero.rechunk((2000, 1000, 25, 1))
# Adjust the overlap as necessary
data = da.overlap.overlap(zero, depth={0:5, 1:2, 2:2}, boundary=-1)
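For reference, a quick back-of-the-envelope check of the input chunk size mentioned above:

# Each input chunk holds 1000 * 500 * 25 * 1 float32 elements at 4 bytes each
print(1000 * 500 * 25 * 1 * 4 / 2**20)  # ~47.7 MiB, i.e. the ~50MiB mentioned above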

BrandonSmithJ (Author) commented
Thanks for the reply!

So to reiterate what you're saying: the materialization does in fact already happen just-in-time; it's only because the non-materialized representation isn't implemented for concatenate/overlap that the current behavior requires those layers to be fully materialized in order to compute any output keys. If/when a non-materialized representation is available for those layers, it should be the case that in the example above, the first block can be computed without actually materializing the other (unnecessary for that block) parts of the low-level graph. Did I understand that correctly?

For example, this specific area is where that non-materialized representation would be implemented? And this pull request is actually addressing that, except that it hasn't been merged yet. I can bump that pull request, since it seems like it may have slipped through the cracks.

So I guess that answers my main question, thanks! One thing from your reply that I'm unsure of, though: having something like 4MM (or even 400MM) tasks in a graph should only be a problem for the scheduler if you're actually trying to compute that entire graph, right?

If instead we're only computing a small segment of the graph at a time (like in the example above, one block at a time) - and assuming the graph has a full non-materialized representation - there really shouldn't be any bottleneck on how large the overall graph can be; the bottleneck is purely the number of tasks specifically required for the requested output key. Is that correct?

fjetter commented Sep 19, 2023

If/when a non-materialized representation is available for those layers, it should be the case that in the example above, the first block can be computed without actually materializing the other (unnecessary for that block) parts of the low-level graph. Did I understand that correctly?

Yes, we call this culling. We'll basically take the output key you requested and walk our way backwards building the graph. This way, you'll only build and compute what you actually require for your result.
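For a minimal illustration on a plain low-level (dict) graph, the dask.optimization.cull helper performs exactly this backwards walk (the toy graph here is just for demonstration):

from dask.optimization import cull

# Toy low-level graph: 'unused' is never needed to produce 'c'
dsk = {'a': 1, 'b': (sum, ['a']), 'c': (sum, ['b']), 'unused': 2}
culled, dependencies = cull(dsk, keys=['c'])
print(sorted(culled))  # ['a', 'b', 'c'] -- 'unused' has been dropped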

For example, this specific area is where that non-materializing representation would be implemented? And #8476 is actually addressing that, except that it hasn't been merged yet. I can bump that pull request, since it seems like it may have slipped through the cracks.

I'm not familiar with this yet, but from a brief glance, yes.

For concatenate, I believe it is simply not implemented yet. One would need to implement a ConcatenateLayer (it looks like this is old code that generates the low level graph eagerly; I don't know how difficult it is to move this into a layer.)
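For anyone curious what that would involve, here is a very rough sketch of a lazy (non-materialized) layer built on dask.highlevelgraph.Layer - the class name and the placeholder tasks are purely illustrative, not an actual ConcatenateLayer design:

from dask.highlevelgraph import Layer

class LazyLayer(Layer):
    """Illustrative only: generates each task on demand instead of up front."""

    def __init__(self, name, npartitions):
        super().__init__()
        self.name = name
        self.npartitions = npartitions

    def __getitem__(self, key):
        # Build the task for a single output key only when it is requested;
        # a real ConcatenateLayer would construct the concatenation task here.
        _, i = key
        return (int, i)  # placeholder task computing int(i)

    def __iter__(self):
        return ((self.name, i) for i in range(self.npartitions))

    def __len__(self):
        return self.npartitions

    def is_materialized(self):
        return False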


For the sake of transparency: this HighLevelGraph / Layer design is an abstraction we're not entirely satisfied with. For DataFrames, there is a rewrite/experiment happening over in https://github.com/dask-contrib/dask-expr. This does not support arrays yet, but it is our intention to eventually add arrays to it. This will not help you now; I'm just mentioning it in case you are interested.

BrandonSmithJ (Author) commented Sep 22, 2023

Unfortunately my application needs to work with arrays rather than DataFrames, but thanks for mentioning it - I'll definitely keep my eye on the progress.

With that said, I am somewhat invested in getting faster subgraph computation like in the example above, so I'm trying to dig into exactly why this example needs so long to compute the first block. In the reference to culling you provided above, one of the first lines of that function is a call to get_all_external_keys, which is what triggers the materialization of the overlap layer.

The problem with that is, when there are millions of external keys for a layer (which seems to commonly be the case for an overlap layer), this takes significantly more time and memory to create and work with than any single key dependency path actually requires (regardless of whether the layer is actually materialized, if I understand things correctly).
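To see that cost directly, here's a small timing sketch (assuming the data array from the example at the top; __dask_graph__ and get_all_external_keys are existing dask APIs):

import time

hlg = data.__dask_graph__()  # the HighLevelGraph behind the collection
start = time.time()
keys = hlg.get_all_external_keys()  # this call forces layer materialization
print(f'{time.time() - start:.2f} seconds to collect {len(keys):,} external keys')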

I'm having some difficulty understanding exactly why an individual layer needs the entire graph's external keys in order to cull its own dependencies, however. When I trace back the usage of the full external key set, it appears that each layer is just checking whether each of its dependencies is actually contained in the full graph key set (here)... which, to me, seems tautologically true.

I've made some modifications to dask that allow the above example to completely run in < 1 second (essentially removing the time dependency on the size of the graph/data):

0.02 seconds to compile graph
Graph contains 4,268,720 tasks
0.55 seconds to compute the first block

0.21 seconds to compute the first block
0.05 seconds to compute a different block
0.00 seconds to compile graph
Graph contains 4,268,720 tasks

This requires removing the calculation of all external keys that I mentioned above, however. That works for this example - and I think it should, in theory, never be required in any other situation - but I'm not familiar enough with dask internals to say for sure. Do you (or anyone else) know why the calculation of all external keys is necessary? In what situation(s) would a layer need the external keys of the entire graph in order to cull its own dependencies?

fjetter commented Sep 25, 2023

@rjzamora maybe you got some insights here?

rjzamora (Member) commented
In the reference you provided above to culling, one of the first lines of that function is to call get_all_external_keys, which is what triggers the materialization of the overlap layer... I'm having some difficulty understanding exactly why an individual layer needs the entire graph's external keys in order to cull its own dependencies

You have a good eye @BrandonSmithJ - the call to get_all_external_keys was meant to be a "temporary" thing when we first started shipping HighLevelGraph objects from client to scheduler. Before that, we were always culling a fully-materialized graph anyway, so the optimization was written in a way that assumed you could always check whether a particular string or tuple was also a key in the graph. The "real" problem came when this assumption began to leak into critical areas of the HLG serialization code. For example, while "stringifying" both input and output keys during the serialization of a specific Layer, we would often use the set of all known external keys to tell us which tuple objects are actually keys.

You are absolutely correct that there is no fundamental reason that we need get_all_external_keys. In fact, now that the graph is usually not fully materialized when culling is performed, it is quite silly that we are still doing this. After materialization, the scheduler does need a full dependency graph for all keys in the optimized graph, but this certainly shouldn't require us to know all possible keys before optimization.

Others and I have proposed changes to this in the past (e.g. #9216). However, slightly more visible problems, like HighLevelGraph serialization and key "stringification", have always tended to block/delay its prioritization. The good news is that these other priorities are mostly out of the way now, so it is probably a good time to discuss a path forward (even if the HighLevelGraph design is ultimately replaced with an expression system).

BrandonSmithJ (Author) commented

Thanks @rjzamora! The historical usage makes sense, and I think that solidifies my understanding of things.

I cleaned up my patch a bit and created a pull request (#10534) that addresses this issue and seems to pass all tests (locally, at least). So it appears this might offer a significant, general speed improvement whenever a user only computes part of the overall graph.

Happy to discuss this further - whether it's any other small modifications that should be made, or if the change can't be incorporated for whatever reason.

The github-actions bot added the needs attention label on Jan 8, 2024