Minor optimizations to Dask Array's `store` (#3153)
Conversation
@mrocklin, would like to get this into the upcoming release if at all possible.
My point in #3082 still stands: I believe these optimizations would be better done by generating the graphs directly rather than relying on the behavior of the optimizations. Reiterating my argument from there:

This isn't a firm -1, I would just urge you to think about whether there's a cleaner way to get the graph structure you're looking for that doesn't rely on calling the optimizations.
Thanks for taking a look, @jcrist. As noted above, the optimizations we have added here would happen when the graph was submitted to the scheduler anyway. If there are points where the intent is unclear, I'm happy to add comments.
To @jcrist's point, historically having optimizations in algorithmic code has led to difficult-to-identify errors.
I'm not very familiar with how store works anymore, so please forgive some questions here. Let's say the user calls store on a dask array with a complex graph behind it, but the user also asks us to give them back an array with the stored results. How deep is the graph that they get? I would expect it to be pretty shallow if they're loading from computed results. I would expect no change to the graph at all if they didn't call compute.

@jakirkham, presumably these optimizations are relevant for your use cases. It might help to motivate this work if you were to include a bit about what is behaving poorly for you.
Regarding the release, I'd like to go ahead. I've been stalling for a few days as things have gotten cleaned up, and I'd strongly prefer not to stall any longer. I would like to start this process and have most of it wrapped up (along with some other work that depends on it) before end of day today.
Of course. Avoiding running optimizations is an admirable goal. That said, IMHO this is not algorithmic code. There were some other concerns that @jcrist reasonably raised in the previous PR regarding locking, which have been dropped from this one in the hopes it would be more appealing/less controversial.

As of PR ( #2980 ), the user will always have Dask Array optimizations applied to the result.
The reason I asked to try and get this into this release is that this was something I had raised as blocking for me at the last meeting. Unfortunately urgent personal matters came up in between then and now, and I haven't had time to look at fixing the old PR ( #3082 ) until today. I don't want to hold up the release if it is pressing to get it out (especially given I fumbled the ball here). If it is not pressing though, I would like a chance to address this issue. Please let me know either way.
Regarding making the graph even more shallow, are the extra tasks causing significant performance issues? Are there advantages to fusing now rather than letting the downstream optimizations handle it?
If we can knock things out in twenty minutes or so, I'm happy to wait. I doubt that this will happen though (changes get done, tests pass, more changes follow, etc.), so I'd rather move on with the release if that's alright.
If you're able to provide a couple of small examples, that might help me understand the value of these changes more clearly.

If it is that urgent, I guess go ahead.

It is somewhat urgent. Current bokeh builds are broken against dask. Other projects have been asking for a release. More generally though, we've been trying to get a release out for a couple of weeks and have constantly had people ask for small PRs to be included. At the meeting last week we decided on today as a strong cutoff. I think that we should stick to it.
It looks like Dask used milestones to track tasks for releases in the past. Not sure why that stopped, but maybe it is time to bring them back. This would help make it clearer what things are release blocking and when the deadlines are. It doesn't mean there won't be disagreements about what should go in or when things should be done in the future, but it would help make the communication about each release clearer and easier to track. Thoughts?

I'm not sure that milestones were ever seriously used. I think that one or two devs tried to make them a consistent practice, but it didn't seriously take. I'm open to the idea, but there are also procedural costs to consider (more paperwork to handle on every change). It would take someone to champion the idea and put in a fair amount of work to hold people to it to start up the process.
Took a bit of work, but managed to eliminate
Thanks for the advice @mrocklin. Have added a test using a lock that counts acquisitions. Took a different tack for 2: instead counted the number of times the lock was acquired at the end of the computation. As we know the lock should be acquired only once per chunk when the store and load steps are fused, this verifies the fusion. Please let me know what you think.
Planning on merging tomorrow if no comments.
jcrist left a comment:
Thanks @jakirkham, overall this looks good to me. I left a few nit-picky comments on style that you can feel free to ignore, and a few small things I'd like changed. Otherwise this looks good to merge. Thanks for being patient here.
dask/array/core.py
store_keys = []
store_dsk = []
for tgt, src, reg in zip(targets, sources, regions):
    tgt = getattr(tgt, "key", tgt)
Can you explain what this does? It's not clear to me what case works with a target with a key attribute. If this is for supporting Delayed targets, I'd prefer an explicit instance check rather than an attribute check as:
- It's more apparent what the intent is
- Won't fail for targets that just happen to have a `key` attribute.
Right, as you guessed this is dealing with Delayed targets. The original code actually used try...except AttributeError:.... Was originally introduced in PR ( #2181 ). So thought this was some small improvement. Though agree we should be verifying things actually are Delayed. That said, we might want this check a bit earlier as there is a call to __dask_optimize__ for these objects.
Makes sense to me. Perhaps something like this earlier:
targets2 = []
for t in targets:
    if isinstance(t, Delayed):
        targets2.append(t.key)
    elif is_dask_collection(t):
        raise TypeError("Targets must be either Delayed objects or array-likes")
    else:
        targets2.append(t)

Then use `targets2` unconditionally in this loop.
Should also probably update the docstring for store to add that targets can be instances of Delayed.
targets_dsk = sharedict.merge(*targets_dsk)
targets_dsk = Delayed.__dask_optimize__(targets_dsk, targets_keys)

load_stored = (return_stored and not compute)
nit: these parentheses are unnecessary
I know they aren't necessary, but it feels more readable to me. If there are strong objections, can change though.
dask/array/core.py
slices = [fuse_slice(region, slc) for slc in slices]

name = 'store-%s' % arr.name
sto_chunk = store_chunk
In cases like this in the past we've just used a `chunk` or `func` variable name. `sto_chunk` looks like a typo to me. No strong feelings though.
dask/array/core.py
for t, slc in zip(core.flatten(arr.__dask_keys__()), slices):
    store_key = (name,) + t[1:]
    dsk[store_key] = (sto_chunk, t, out, slc, lock, return_stored) + args
nit: I'd write this as a dictionary comprehension, but that's just my opinion. No strong feelings.
Felt like I had made a lot of changes in the code and was trying to keep it easy to review. So didn't rewrite this, but sure we can give it a try.
dask/array/core.py
load_key = ('load-%s' % k[0],) + k[1:]
# Reuse the result and arguments from `store_chunk` in `load_chunk`.
load_dsk[load_key] = (load_chunk, dsk_post[k]) + dsk_pre[k][3:-1]
nit: I'd write this as a dictionary comprehension, but that's just me. No strong opinions.
load_dsk = {('load-%s' % k[0],) + k[1:]: (load_chunk, dsk_post[k]) + dsk_pre[k][3:-1]
            for k in keys}
Same story as above. Was trying to keep this easy to review, but sure it can be rewritten.
if i == 9:
    assert False

# Verify number of lock calls
Can you expand this comment slightly to say why it should be 2 * nchunks in one case and nchunks in the other? I know why right now, but future maintainers may not.
Sure. Though I'd prefer to put that comment near the branch below if that is ok.
Besides this comment, all remaining things I pointed out were nits that can be ignored (up to you).
Sure. Was working my way through them. Just now got this one.
There was an extra comma at the end of a `tuple` in `retrieve_from_ooc`. This drops that extraneous comma to improve readability.
Make sure that the Dask keys that are passed to `Array`'s `__dask_optimize__` method are flattened. While the `__dask_optimize__` method of `Array` does flatten the keys, it is best not to rely on that in `store` and just ensure they are properly formatted.
Instead of leaving the user-provided `region` and chunk slice (i.e. `index`) to be fused in the `store_chunk` and `load_chunk` functions, go ahead and fuse them when constructing the Dask graph and leave `region` as `None`. This should cut down the work done in `store_chunk` and `load_chunk`. It also allows the `region` argument in both of these functions to be subsequently dropped.
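As a toy illustration of this fusion, here is a pure-Python sketch. `fuse_slice_1d` is a hypothetical 1-D stand-in for dask's actual `fuse_slice` (which handles the general N-D case), and it assumes nonnegative slice steps:

```python
def fuse_slice_1d(region, index, length):
    # Compose region and index so that data[fused] == data[region][index].
    # `range` objects support slicing lazily, which makes the math easy.
    r_start, r_stop, r_step = region.indices(length)
    sub = range(r_start, r_stop, r_step)[index]
    return slice(sub.start, sub.stop, sub.step)

data = list(range(20))
region, index = slice(2, 18), slice(1, 8, 2)
fused = fuse_slice_1d(region, index, len(data))
assert data[fused] == data[region][index] == [3, 5, 7, 9]
```

Doing this composition once at graph-construction time means each chunk task receives a single, already-fused slice.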
Since `fuse_slice` is now run as part of Dask construction and this removes the need to pass the `region` on for handling in the store/load chunk functions, simply drop the `region` arguments and related logic from `store_chunk` and `load_chunk`. Makes them a bit easier to follow and maintain going forward.
Use the standard way of optimizing Dask Delayed objects to optimize the targets in advance if needed. Thus if `cull`ing or any other optimization strategy becomes commonplace for Delayed objects, it will be handled through the standard mechanisms instead of us doing it in some custom way. As an added bonus, this avoids collecting empty dictionaries in the event a target does not have a Dask graph (e.g. is not Delayed). Previously `store` had collected empty dictionaries in this case. So this should be an improvement when constructing and working with the `SharedDict` object for `targets` in the common case (i.e. no Delayed targets). Also provides a check to guarantee we are actually getting Dask Delayed objects and are not supporting any other Dask collections. If we don't get a Dask Delayed object, we assume it is some Array-like object.
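For context, the main optimization applied to Delayed graphs is a cull. A minimal pure-Python sketch of the idea (not dask's actual `cull`, which lives in `dask.optimization` and handles nested tasks):

```python
def cull(dsk, keys):
    # Keep only tasks reachable from the requested output keys.
    seen, stack = set(), list(keys)
    while stack:
        k = stack.pop()
        if k in seen or k not in dsk:
            continue
        seen.add(k)
        task = dsk[k]
        # Treat any tuple element that is itself a graph key as a dependency.
        deps = task if isinstance(task, tuple) else ()
        stack.extend(d for d in deps if d in dsk)
    return {k: dsk[k] for k in seen}

dsk = {'a': 1, 'b': ('inc', 'a'), 'unused': 2}
assert cull(dsk, ['b']) == {'a': 1, 'b': ('inc', 'a')}
```

Running this through `Delayed.__dask_optimize__` rather than by hand means `store` picks up whatever optimizations Delayed objects gain in the future.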
For some time, `store` has supported `Delayed` `targets`. The intent is to support creating a storage location when storage occurs, and possibly handle other things. However this wasn't noted in the docstring, so we make that a little clearer by noting `targets` can be `Delayed`.
Before merging the store Dasks with the source and target Dasks, go ahead and merge the store Dasks together alone. This simplifies the merging process a bit and avoids directly relying on a dependency. It also makes sure that there is a `SharedDict` for each group of related dictionaries that gets added to the final store Dask; namely the `sources`, `targets`, and now the `store_chunk`s steps.
As we are only really interested in the one Dask graph that represents all `store_chunk` operations, renamed `store_dsks_mrg` and `store_dsks` to `store_dsk` to make that clearer.
Provides an option to give `retrieve_from_ooc` a computed Dask as well to fill in values in the `load_chunk`s part of the Dask. This can be useful when inlining results from the `persist`ed `store_chunk`s step. This argument is totally optional and will be used to inline computed results of `Future`s as the case may be. If this argument is not provided, then the keys are simply inserted into the new Dask as was the case before.
Renames `store_chunk` to `load_store_chunk`. The new `load_store_chunk` function differs slightly from `store_chunk`'s previous behavior in that it will skip storing results if `x` is `None`. Also `load_store_chunk` takes the new `load_stored` option, which allows for reading the results back in from `out` after storing them. In other words, `load_store_chunk` provides the behavior of `store_chunk`, `load_chunk`, and a fused version of the two. By keeping this functionality all within one function using different options, it should avoid the divergence that could occur with having different functions. Reimplements `store_chunk` and `load_chunk` as light wrappers around `load_store_chunk` with only a few needed arguments; the rest are handled in some default manner. By doing this, the original behavior of `store_chunk` and `load_chunk` is restored while simply using `load_store_chunk` under the hood. The `store_chunk` and `load_chunk` functions are kept to preserve readability in terms of the code, the Dasks generated, and scheduler progress. Makes some adjustments to `insert_to_ooc` to use `load_store_chunk` when `return_stored` and `load_stored` are both `True`, and simply use `store_chunk` for all other cases. This results in some tweaks to the key name used and arguments provided. The purpose of these adjustments is to ensure `load_store_chunk` shows up when loading and storing happen in the same step. Otherwise `insert_to_ooc` only adds `store_chunk`, which may or may not be followed by a separate `load_chunk` step depending on whether `return_stored` is `True` or `False`.
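A rough sketch of the shape described above (illustrative only; the real functions in `dask/array/core.py` also handle regions, `lock=False`, and other details):

```python
def load_store_chunk(x, out, index, lock, return_stored, load_stored):
    # Store chunk `x` into `out[index]` (skipped when x is None), optionally
    # reading the stored values back while still holding the lock.
    if lock:
        lock.acquire()
    try:
        if x is not None:
            out[index] = x
        if return_stored and load_stored:
            return out[index]
    finally:
        if lock:
            lock.release()

def store_chunk(x, out, index, lock, return_stored):
    # Original store behavior: never reads the result back.
    return load_store_chunk(x, out, index, lock, return_stored, False)

def load_chunk(out, index, lock):
    # Original load behavior: reads back without storing anything new.
    return load_store_chunk(None, out, index, lock, True, True)

out = [0] * 4
store_chunk([1, 2], out, slice(0, 2), None, False)
assert out == [1, 2, 0, 0]
assert load_chunk(out, slice(0, 2), None) == [1, 2]
```

The fused path (`load_store_chunk` with `load_stored=True`) holds the lock across both the store and the read-back, which is what avoids the contention described elsewhere in this PR.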
Refreshes `store` so as to effectively inline all uses of `store_chunk`s when `return_stored=True`. Thus users only see `load_store_*` nodes whether `compute` is `True` or `False`. This avoids some lock contention between storing and loading steps when `compute` is `False`. When `compute` is `True`, this effectively inlines `Future`s or results that the `store_chunk` step would have returned, thus making the most compact Dask possible when `return_stored=True`.
Appears that `sto_chunk` was more confusing than helpful in the review. So rename it to `func` to make it clear we intend this to be a different function and not a typo of `store_chunk`.
In the `insert_to_ooc` and `retrieve_from_ooc` functions, make use of dictionary comprehensions to simplify the code a bit. These also tend to be a bit faster than Python `for`-loops.
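For illustration, the kind of rewrite this describes (key and task shapes are simplified and hypothetical, not the actual graph entries):

```python
# Hypothetical keys for two chunks of an array named 'x'.
keys = [('x', 0), ('x', 1)]

# for-loop version
dsk = {}
for k in keys:
    dsk[('store-%s' % k[0],) + k[1:]] = ('store_chunk', k)

# equivalent dict comprehension
dsk2 = {('store-%s' % k[0],) + k[1:]: ('store_chunk', k) for k in keys}

assert dsk == dsk2 == {('store-x', 0): ('store_chunk', ('x', 0)),
                       ('store-x', 1): ('store_chunk', ('x', 1))}
```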
When `return_stored` and `compute` are both set to `True`, ensure that `store` returns Dask Arrays that contain depth 1 Dasks only. This helps ensure that the Dask Arrays generated in this case don't have any lingering connections to the Dask used for storing in the Dask Arrays returned.
Verifies how many times a `Lock` instance is acquired in `store` when using `return_stored`. Namely if `compute=True`, the lock will be acquired two times for each chunk as storing and loading are separate steps to allow for storing of chunks to occur asynchronously. However if `compute=False`, the lock will be acquired only one time for each chunk as store and load are fused into a single step that acquires the lock for both parts. This acts as a check to ensure that the `compute=False` case of `return_stored` does in fact fuse storing and loading into a single step that holds the lock for both parts.
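One way such a test can count acquisitions is with a small lock wrapper (a hypothetical sketch, not the actual helper used in the test):

```python
import threading

class CountingLock:
    # Wraps a real Lock and records how many times it was acquired.
    def __init__(self):
        self._lock = threading.Lock()
        self.acquire_count = 0

    def acquire(self, *args, **kwargs):
        self.acquire_count += 1
        return self._lock.acquire(*args, **kwargs)

    def release(self):
        self._lock.release()

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, *exc):
        self.release()
        return False

lock = CountingLock()
with lock:
    pass
with lock:
    pass
assert lock.acquire_count == 2
```

Passing such a lock to `store` and checking the final count distinguishes the fused (one acquisition per chunk) from the unfused (two per chunk) case.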
Thanks for the comments @jcrist. Think I have addressed the bulk of your concerns. Could use another look if you have time.
LGTM. Feel free to merge when tests pass.
Thanks everyone. :)
- Performs some optimizations for `Delayed` targets. Namely skips collecting empty dictionaries if no `Delayed` target is present. Also optimized the `Delayed` target portion of the Dask as this was not being done before (it is already done as part of the Dask Array optimizations). In cases not using `Delayed` targets, this should be a no-op.
- If `return_stored` and `compute` are both `True`, builds the `load_chunk` Dask after calling `persist` on the `store_chunk` Dask. This comes with the nice benefit of inlining results from `persist` directly into the `load_chunk` Dask, thus simplifying the graph used for the Dask Arrays constructed in this case. Also simplifies the `for`-loop over sources and targets.
- In the case of `return_stored=True` and `compute=False`, fuses the `store_chunk` and `load_chunk` calls into one `load_store_chunk` function per @jcrist's suggestion. This allows the lock (if any) to be held for the duration of both the store and load steps, which should avoid contention between store and load steps on chunks.
- Merges Dasks corresponding to chunks of particular types (e.g. `Delayed` targets, `store_chunk` calls, `load_chunk` calls) in a few cases before merging with other Dask types.
- Also simplifies `store_chunk` and `load_chunk` by running `fuse_slice` for the provided `region` before constructing the Dask. This simplifies the logic in these functions and avoids the overhead (however small) of offloading this work to these functions.
- Admittedly, similar optimizations to the ones done here are performed when submitting Dask Arrays for computation. However as `store` represents a final step in many ways, it seems reasonable to perform some of these optimizations here before returning to the user.