Optionally return stored data after storing #2980

Merged
jcrist merged 19 commits into dask:master from store_return_data on Jan 3, 2018

Conversation

jakirkham
Member

Related to issue ( #2156 )

Adds an argument to Dask Array's store, called keep. When set, keep will return the stored result(s). The behavior differs a little depending on whether compute is set. If compute is False, Dask Arrays corresponding to the stored values are returned; these can be chained into other computations, persisted, etc. If compute is True, it is like calling compute on the Dask Arrays that would have been returned had compute been False.
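For context, a rough usage sketch of the workflow this enables (the keyword is called keep here; it was renamed return_stored before merge, and a plain NumPy array stands in for an on-disk target such as an HDF5 or zarr dataset):

import numpy as np
import dask.array as da

x = da.random.random((1000, 1000), chunks=(100, 100))
y = (x + x.T) / 2

# Any target supporting NumPy-style slice assignment works as a store.
out = np.empty(y.shape, dtype=y.dtype)

# With compute=False, the stored data comes back as lazy Dask Array(s)
# backed by the target, which can be chained into further computations,
# persisted, etc.; with compute=True those arrays are computed instead.
stored = da.store(y, out, compute=False, return_stored=True)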

@jakirkham changed the title from "WIP: Return stored data after storing" to "WIP: Optionally return stored data after storing" on Dec 9, 2017
@jakirkham
Member Author

This works fine based on the tests added thus far. That said, it would be good to get some feedback on this approach and whether it seems reasonable for Dask. Happy to provide more context if needed.

@mrocklin
Member

mrocklin commented Dec 9, 2017

Interesting. I like the approach. Just so that I'm clear, the objective here is something like:

x = ... load data
y = ... transform data
y = y.store(..., keep=True)
z = ... transform data further 

In this case it seems like keep=True should almost imply compute=False

@jakirkham
Member Author

Correct, that is the idea. I have a few other thoughts about how this can be used, caching being one of them, though there is already value in being able to checkpoint this computation.

Yeah, I was thinking about the meaning of compute in the context of keep as well. We can probably tweak the default arguments (e.g. make them None initially and then fill them in depending on what is set).

Another option would be to make compute=True, keep=True mean that persist is run on the Dask Array(s). Thus storing the data while loading it across the cluster. Storage in this case acts more like a checkpoint.

Something else to consider is we could break this up into insert_to_ooc and retrieve_from_ooc functions. In this implementation insert_to_ooc wouldn't actually return any data, but would be chained to a corresponding retrieve_from_ooc function that would. When providing compute=True, keep=True, persist could be called on the store portion. This would allow the store portion to occur asynchronously and tie any load steps to their corresponding store step's Future. If the data is too large to actually persist (without spilling to disk) and the intermediate result is actually useful, this ends up being a nice option. Expect there are some good opportunities to explore loading data and caching here as well.
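To make the chaining concrete, here is a minimal, hypothetical sketch of a graph where each load task depends on its matching store task (simplified helpers, not the PR's actual insert_to_ooc/retrieve_from_ooc implementation):

import numpy as np
from dask.threaded import get


def store_chunk(block, target, region):
    # Write one block into its region of the target and hand the target
    # back so the matching load task can depend on this task's key.
    target[region] = block
    return target


def load_chunk(target, region):
    # Read the freshly written block back out of the target.
    return np.asarray(target[region])


target = np.zeros(10)

# Each "load" key takes the corresponding "store" key as an input, so a
# chunk can be read back as soon as it has been written, independently
# of the other chunks.
dsk = {
    ("store", 0): (store_chunk, np.ones(5), target, slice(0, 5)),
    ("store", 1): (store_chunk, 2 * np.ones(5), target, slice(5, 10)),
    ("load", 0): (load_chunk, ("store", 0), slice(0, 5)),
    ("load", 1): (load_chunk, ("store", 1), slice(5, 10)),
}

get(dsk, [("load", 0), ("load", 1)])  # the two stored blocks, read back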

@mrocklin
Member

> Another option would be to make compute=True, keep=True mean that persist is run on the Dask Array(s). Thus storing the data while loading it across the cluster. Storage in this case acts more like a checkpoint.

I'm against this. People should just use persist in this case.

> Something else to consider is we could break this up into insert_to_ooc and retrieve_from_ooc functions. In this implementation insert_to_ooc wouldn't actually return any data, but would be chained to a corresponding retrieve_from_ooc function that would. When providing compute=True, keep=True, persist could be called on the store portion. This would allow the store portion to occur asynchronously and tie any load steps to their corresponding store step's Future. If the data is too large to actually persist (without spilling to disk) and the intermediate result is actually useful, this ends up being a nice option. Expect there are some good opportunities to explore loading data and caching here as well.

I like this. It seems fairly clean. I think we've recommended this workflow anecdotally to people in the past.

@jakirkham
Member Author

Agreed. I'll retool this for the last proposal. Expect the code to get a lot cleaner (fewer branches) as a consequence.

@jakirkham force-pushed the store_return_data branch 4 times, most recently from 4bffe44 to 75bc1dc on December 12, 2017 at 18:01
@jakirkham
Member Author

OK, I think this is starting to look the way we want. I'd be curious to hear what you think.

@jakirkham force-pushed the store_return_data branch 2 times, most recently from a9128bf to 0825850 on December 12, 2017 at 19:31
@mrocklin (Member) left a comment

I haven't given this a thorough review, just put in a couple comments.

In general this is more code than I expected to be necessary. My hope is that this would be closer to something like

finished_writing = delayed(...)
if keep:
    return da.from_array(out, chunks=inp.chunks, wait_token=finished_writing)
else:
    return finished_writing

src.dask
))
).persist().dask[each_store_key]
)
Member

.persist is blocking when using the single machine scheduler. We definitely don't want to call it in a loop.

FWIW I find this statement difficult to parse. That's subjective though.

Member Author

Sure, we can drop all of these into one persist call. I expect that will make it easier to parse as well.

Member Author

I should add that, generally, what we are trying to accomplish here is submitting the Dask graph associated with storing each chunk.

Member Author

It is also valuable to combine persist calls to reuse intermediates.

FWIW I think this statement should now be a bit cleaner and more performant.
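For illustration, a minimal sketch of why a single persist call is preferable when collections share work (names here are illustrative, not from the PR):

import dask
import dask.array as da

x = da.random.random((1000, 1000), chunks=(100, 100))
shared = (x + 1).sum(axis=0)  # intermediate used by both results below
a = shared * 2
b = shared + 10

# Persisting together lets the scheduler compute `shared` once and reuse
# it; persisting a and b separately may recompute the shared tasks.
a, b = dask.persist(a, b)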

dsk = dict()
for store_key in out_dsk:
    load_key = ('load-%s' % store_key[0].lstrip("store-"),) + store_key[1:]
    dsk[load_key] = (load, store_key,) + out_dsk[store_key][3:-1]

return dsk
Member

Why is this necessary on top of from_array? They seem very similar

Member Author

😕 I don't think we are calling from_array. Or do you mean that the behavior feels similar to from_array? If the latter, more on this in the comment to follow.

Member

This logic seemed similar to from_array. I was curious if there was a reason why we can't reuse the existing functionality. Mostly I'm concerned with the amount of code being written into dask.array.core to support this feature. If it isn't easy to accomplish what we want to do with existing tools then is there something we can do to improve our current tools?

@mrocklin
Member

To be clear, the current machinery doesn't implement the necessary bits to do this, but my hope was that we could modify it in relatively minor ways rather than create a parallel system alongside.

@mrocklin
Member

There may be reasons against this of course. As a disclaimer I haven't yet given this a very thorough review.

@jakirkham
Member Author

Thanks for the feedback.

From a big picture standpoint, we'd really like to be able to view results as they are being computed. We'd also really like to make sure that what we view is an accurate reflection of what we have saved to disk. In order to handle both of these, we need to be able to view a chunk as soon as it's done computing and saved to disk, even if other chunks are still being computed and/or written. Hence having this one-to-one mapping of store to load steps aids that goal by letting us pull in computed chunks from disk once they are ready. It's pretty easy to handle the problem in this context, as we have all the relevant pieces; moving them elsewhere may be possible, but could make it harder to reason about.

Ultimately I'd like to condense the branching in this PR. I expect the end result will be able to reuse the bulk of store for the various use cases. However, it has been easier to deal with this as a separate code path initially, mainly to avoid locking us into a particular way of handling this case until we are happy with what it is doing, and to avoid disturbing the special-case handling at the end of store (e.g. using delayed objects for output). IOW, this is a consequence of where I have spent my time so far and not how I expect it to look in finished form.

@mrocklin
Member

OK, I see how what I was proposing doesn't achieve the same goals as what you're doing here. I agree that what you're proposing sounds attractive.

@jakirkham
Member Author

This could use another look if you have time.

Commit messages from the pushed changes:

- Instead of merging each of `load_dsks`' graphs within a `for`-loop, merge them all before entering the `for`-loop and reuse that graph for all Dask Arrays constructed.
- Make sure there is only one `return` statement, to make it easier to see that a `return` does occur.
- Include some tests exercising `store`'s `keep=True` cases to make sure this behaves as intended, namely that a Dask Array is returned and that storage only occurs if `compute=True`, or if `compute=False` and a later call to `compute` triggers it.
- Follow a suggestion to rename the argument to make it a little clearer.
- Explain how arguments are reused from the `store_chunk` calls to construct the `load_chunk` calls, and what is dropped and replaced in these new calls.
@jakirkham
Member Author

Thanks @jcrist. I pushed a bunch of changes based on your suggestions above. There is one point regarding the return statement that I'm not quite following; I'll happily correct that once I have a clearer picture of what you are looking for. Please let me know your thoughts on the other changes.

# `(store_chunk, t, out, slc, lock, region, return_stored)`
# Namely drop the first 3 arguments as they are the function and two
# arrays that were already used up by `store_chunk`. In their place use
# `load_chunk` and the result from `store_chunk` (i.e. `each_key`).
Member

Verbosity makes it harder for people to read when skimming code. I'd just write:

# Reuse extra arguments from `store_chunk` in `load_chunk`.

Again, not a huge deal. I don't mean to bikeshed your code; I think we just have two very different writing styles. The comment is fine as is.

Member Author

No worries. I wasn't sure what level of detail we wanted here. Happy to cut it to one line.

Member Author

Done.

Should make it a little clearer what is returned under which conditions.
@jakirkham
Member Author

I think that addresses the last round of comments. Please let me know if there is anything else.

src.chunks,
src.dtype
))
result = tuple(result)
Member

Apologies. One last nit: this loop/append/tuple can be reduced to a one-liner, which IMO improves readability.

        return tuple(Array(load_dsks_mrg, name, src.chunks, src.dtype)
                     for name in load_names)

Take it or leave it. Otherwise this looks good to me.

Member Author

Sure. Done.

@jakirkham (Member Author), Jan 12, 2018

Hmm... I think we got too clever here and introduced a bug. It shouldn't be src, as that would only repeat the last array's metadata for all arrays. I'll submit a follow-up PR to fix this.

Edit: The src was my doing originally though.
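For reference, the shape of the fix (a sketch against the one-liner above, assuming the surrounding names; the actual change landed in #3064): pair each load name with its own source so every returned Array gets that source's chunks and dtype.

        return tuple(Array(load_dsks_mrg, name, src.chunks, src.dtype)
                     for name, src in zip(load_names, sources))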

Member Author

Added a test to demonstrate the bug and included a fix for it in PR ( #3064 ).

sources_dsk = Array.__dask_optimize__(
    sources_dsk,
    [e.__dask_keys__() for e in sources]
)
Member Author

While you are looking, @jcrist, it would be good if you could give these optimization lines a quick look. Based on the docs this seemed OK, but you certainly know better since you wrote all of these __dask_*__ functions. 😉

Member

That's exactly how they should be used.

Member Author

Thanks for checking.

Follow-up on a suggestion to make this code more compact.
"""

result = None
if return_stored:
result = out
Member

Sorry, I missed this bit before. Is this still used? From below it looks like the result is always re-read from the store, but I may be missing something?

Member Author

Yes, this is still used.

In an effort to chain the store_chunk calls to the load_chunk calls, we return out unchanged so that it can be fed as an argument into load_chunk via a key. Does that make sense? There may be a better way to do this; it just seemed simple enough to implement at the time.

We could also move this branch to the end and simply return either out or None if that would be clearer.

Member

Ah, I missed that it was out and not x. Fine by me.

@jcrist merged commit adff783 into dask:master on Jan 3, 2018
@jcrist
Member

jcrist commented Jan 3, 2018

Thanks @jakirkham!

@jakirkham deleted the store_return_data branch on January 3, 2018 at 04:12
@jakirkham
Member Author

Thanks @mrocklin and @jcrist for the reviews.

@mrocklin
Member

mrocklin commented Jan 3, 2018 via email

result = Delayed(name, dsk)

if compute:
    result.compute()
Member Author

I missed that this call should still get **kwargs. Fixing in PR ( #3300 ).
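That is, the intended call simply forwards the scheduler keyword arguments, roughly:

if compute:
    result.compute(**kwargs)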
