Filesystem persistence and using dask as a make/snakemake alternative #2119
I suspect that you could implement this today with a scheduler plugin and an on-disk mutable mapping like shelve or chest. The plugin would look at a dask graph and its on-disk mutable mapping and replace any keys in the dask graph with the values from the mutable mapping (or functions that obtain those values). It would then cull (dask.optimize.cull) the graph to remove excess nodes that no longer need to be there. This is what the opportunistic caching plugin (dask.diagnostics.Cache) already does, except that it also includes heuristics for what to store and, by default, it uses an in-memory dict (though this can be changed). Is this enough information for you to get started, @nbren12?
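Since a dask graph is just a dict mapping keys to tasks, the substitute-then-cull idea above can be sketched in plain Python. This is an illustration only: `substitute_cached` and the toy `cull` below are hypothetical stand-ins (dask ships its own cull, referenced above), and a real setup would back the cache with an on-disk MutableMapping such as shelve rather than a plain dict.

```python
from operator import add, mul

def substitute_cached(graph, cache):
    """Replace any task whose key is already in the cache with the cached value."""
    return {k: (cache[k] if k in cache else task) for k, task in graph.items()}

def cull(graph, keys):
    """Keep only the tasks reachable from the requested output keys."""
    out, stack = {}, list(keys)
    while stack:
        k = stack.pop()
        if k in out:
            continue
        task = graph[k]
        out[k] = task
        # dependencies are the elements of a task tuple that name other graph keys
        if isinstance(task, tuple):
            stack.extend(a for a in task[1:] if a in graph)
    return out

# A dask-style graph: key -> (func, *args), where args may name other keys.
graph = {"x": 1, "y": (add, "x", 2), "z": (mul, "y", 10), "unused": (add, "x", 99)}
cache = {"y": 3}  # pretend "y" was computed and stored in an earlier session

trimmed = substitute_cached(graph, cache)
trimmed = cull(trimmed, ["z"])
# "x" is no longer needed ("y" came from the cache) and "unused" is dropped
```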
Thanks for the suggestions @mrocklin. I am not very familiar with the dask internals, but I am willing to take a stab at it, since this would be very useful to me. I will take some time to understand how the opportunistic caching plugin works. Ultimately, it would also be very useful to let the user specify the exact file type (and possibly path) that they want the data to be stored on disk as. This would then enable including calls to command line tools as part of the graph, like in the following image.
Delayed doesn't know anything about storing things on disk. I suspect that you could make a slightly more complex decorator that wrapped around delayed and did what you wanted, though.
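A decorator along those lines might look like the following stdlib-only sketch, which shows just the persistence half; `persisted` is a hypothetical name, and a real version would additionally wrap the function in dask.delayed so the load-or-compute decision stays lazy. Note that `hash(args)` is only stable within one session for these argument types, so a real cache key would use a content hash instead.

```python
import pickle
import tempfile
from functools import wraps
from pathlib import Path

def persisted(cache_dir):
    """Run the function only if its result file is absent; otherwise load it from disk."""
    cache_dir = Path(cache_dir)
    cache_dir.mkdir(parents=True, exist_ok=True)

    def decorator(func):
        @wraps(func)
        def wrapper(*args):
            # NOTE: hash() is a session-local stand-in for a content hash
            path = cache_dir / f"{func.__name__}-{hash(args)}.pkl"
            if path.exists():
                return pickle.loads(path.read_bytes())
            result = func(*args)
            path.write_bytes(pickle.dumps(result))
            return result
        return wrapper
    return decorator

calls = []

@persisted(tempfile.mkdtemp())
def expensive(x):
    calls.append(x)  # record real invocations
    return x * 2

expensive(3)  # computed and written to disk
expensive(3)  # loaded from disk; the function body does not run again
```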
Thanks. Re: marking and caching individual nodes, I implemented a simple Callback:

```python
import numpy as np
import dask.array as da

cache = Saved()  # the custom callback

np.random.seed(seed=0)
r = np.random.rand(100, 100)
a = da.from_array(r, (50, 50))
b = a ** 2
c = cache.save(b + a)  # just returns a + b, while storing its keys
d = c ** 10

with cache:
    d.compute()
```

Unfortunately, it seems like
Okay. I think I can make

Actually, that doesn't work because

You can avoid optimization by adding the keyword Docstring:

Thanks. I did see that command line option, and have been working on a
@mrocklin I have been thinking some more about how to integrate native dask operations with external command line utilities. Stringing together several external calls using dask.delayed should be pretty straightforward, but reading the output of this data back into dask is not straightforward. To do this, I think there needs to be some way to store/read a dask array from a delayed object (like a filename). For example, if f is a function returning a filename, it would be nice to be able to load data like this: da.from_array(delayed(f)(*args), ...). Do you think this would be difficult to implement?

With this functionality, it would be possible to implement a really slick interface for combining command line tools with dask.array operations. I have something like the following in mind:

```python
# start with dask arrays a and b

@shell(chunks_shape='preserve')
def f(input, output):
    subprocess.check_call(["external_command", input, output])
    return output

# some python operations...
c = a + b

# The @shell decorator automatically saves c to disk, runs f, and loads output.
# To do this it would need some sort of delayed version.
d = f(c)
```
See from_delayed: http://dask.pydata.org/en/latest/array-api.html#dask.array.from_delayed
Thank you! Unfortunately, the docs say that "the dask array will consist of a single chunk", which won't scale for large datasets, but I guess I could concatenate the individual chunks manually to construct the full array. What about a delayed store to a delayed object (like an hdf5 file which has yet to be created)? I could imagine this need arising if all the workers cannot see the same filesystem, and need to be able to dump a dask array to disk for processing with some external utility.
Yeah, you need to construct many delayed tasks that produce many numpy arrays, call da.from_delayed, and then call da.concatenate or da.stack. Here is an example: http://matthewrocklin.com/blog/work/2017/01/17/dask-images
Ok, that seems like a reasonable approach for loading arrays. Hopefully it wouldn't incur too many performance issues for large arrays. What about storing to a delayed task which, say, creates an hdf file and returns a variable where the data will be stored?

-Noah
Ok. I made a small modification to dask.array.insert_to_ooc which allows storing a dask array to a delayed task, and I also added a simple test. Would you be open to a PR?
Have you seen da.store?
Indeed I have, but it doesn't work when the output array is a delayed value, so I had to modify insert_to_ooc.
The use case I am thinking of is this:

```python
@delayed
def create_on_disk_temporary():
    f = h5py.File(...)
    v = f.create_dataset(...)
    return v

delayed_store = x.store(create_on_disk_temporary(), compute=False)
```

Currently, the store method is not fully lazy because the output array needs to be created when the graph is constructed. My modification allows the output dataset to be created when the graph is executed.
Yes, supporting delayed values in the .store method seems like a good idea to me.
I thought some more about it, and it might be better for the delayed to return a generator which yields the storage array and then performs any necessary cleanup, like closing files. This is a bit trickier to implement, though.
Generators are a no-go. Results of tasks should be concrete.
Ok, my current code should work then. I guess it would be possible for users to keep track of the opened files in some sort of queue, and close them if necessary.
Hello! Would it make sense to have a persistence mechanism for

where
+1 I would love to see this functionality in dask. The below is a brain dump; I hope it doesn't drag on too much (skip to the end if you don't want context). @mrocklin, following up on python-streamz/streamz#14: when building a data pipeline in the financial modelling / trading space, my typical workflow is something like

I've used Luigi pretty extensively in the past, and also joblib's Memory.cache, and my thoughts are:

Luigi

joblib.cache

Cons:

I would ideally like to have the following for my specific pipeline needs:

So finally, my actual questions:

Edit:
What is your objective for a disk-based cache? Are you worried about failure? Or do you want to permanently store every intermediate value?
You might want to take a look at the current visualization code in the dask.distributed scheduler here: https://github.com/dask/distributed/tree/master/distributed/bokeh
That probably depends on how you decide to build your DAG system (which, as I understand it, is separate from Dask's task graph). I hope that whatever you build I get to see eventually :) It would be interesting to see your situation more concretely.
Permanently store intermediate values. Typically this data is all time series, so this would actually be intermediate Dataframes. We have multiple complex DAGs (sharing certain components), so the ability to just rerun all graphs and intelligently fill in gaps, or invalidate changes (changes to operations/functions could come from end users), would be amazing. We typically run a DAG, then fit a model and evaluate, so the ability to make some changes (perhaps to multiple operations/functions) and rerun the entire DAG for model evaluation is very powerful.

In the past it has been separate, but I would much prefer it to be a dask task graph. Assuming it is in dask, how would I go about invalidating children?
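One way to answer the invalidation question, treating the dask graph as the plain dict it is: invert the graph to find each key's dependents, then walk downstream from the changed keys. Both helpers below are hypothetical sketches, not dask APIs.

```python
def dependents(graph):
    """Invert a dask-style graph: key -> set of keys that consume it."""
    down = {k: set() for k in graph}
    for k, task in graph.items():
        if isinstance(task, tuple):
            for arg in task[1:]:
                if arg in graph:
                    down[arg].add(k)
    return down

def invalidate(graph, changed):
    """Return every key whose result is stale once the `changed` keys change."""
    down = dependents(graph)
    stale, stack = set(), list(changed)
    while stack:
        k = stack.pop()
        if k in stale:
            continue
        stale.add(k)
        stack.extend(down[k])
    return stale

# key -> (func, *args); args naming other keys are dependencies
graph = {"raw": 1,
         "clean": (str, "raw"),
         "model": (len, "clean"),
         "report": (str, "model")}
# changing "clean" invalidates it and everything downstream, but not "raw"
```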
FWIW, since I opened this issue I have been using snakemake for this stuff, and for each step of the workflow I may or may not use dask. While it seems redundant to use two separate DAG systems (an inner one for computations using dask, and an outer dependency graph between input/output files using snakemake), it might be a lot of work to implement some of the conveniences of snakemake (e.g. job submission, re-runnable workflows, make-style modification-time-based rerunning) using dask. @mrocklin Do you think these kinds of things are within the scope of this project? Or should dask only be used within the individual steps of a luigi/make/snakemake workflow?
@nbren12 what has been your experience with snakemake thus far? What do you see as the downside? The appeal of dask, IMO, is the low task overhead and the intelligent scheduler. These two things are somewhat important, which is why I am even considering trying to fit dask to my problem rather than just using Luigi (or snakemake, etc.). It may be that this work is outside the scope of dask/distributed and the answer lies somewhere between snakemake and dask, but it is not clear to me yet.
Have been giving this issue a lot of thought, as it is of particular interest for my research for a variety of use cases, generally in the realm of data lineage and data versioning. Namely: what operations have we performed on a piece of data through a workflow, what were the various intermediate results in that process, how do those results relate to each other, and how can others reproduce my work? If we want to get very technical, the lineage goes all the way back to data acquisition, but I think that part gets very workflow specific and out of scope for Dask.

Something that we did very recently was add support for reusing results after they have been stored (#2980). This allows one to store a computational result and then reload it later. It comes in two forms: first, a persist-to-disk solution, which triggers a computation that is written out immediately and can be chained into later computations; second, a lazy storing solution, which allows chaining computations and storage operations until some final compute step triggers the full run. Neither of these exactly equals caching, but I believe it should be possible to build something on top of that functionality that does reuse the store for caching.

One way of tackling this problem would be to build off the fact that Dask creates a sort of Merkle DAG where each node gets a unique name. The only thing we need to ensure is that the first node of that DAG (loading the input data) is deterministically generated from the input data and not randomly assigned. This is easy enough for anyone to do by overriding the

Now all we need is some way to replace Dask nodes with values from this

It should be pretty simple to include an outline of this solution in Dask so that it is pretty easy to pick up and reuse for different types of
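The deterministic-naming idea can be illustrated in a few lines: hash the input data to name the load node, derive downstream names from upstream ones (the Merkle-DAG property), and back the cache with any MutableMapping. Here `token` and `DiskLikeCache` are hypothetical stand-ins for dask's tokenize machinery and an on-disk store such as shelve.

```python
import hashlib

def token(*parts):
    """Deterministic key from content: the same input always yields the same name."""
    h = hashlib.sha256()
    for p in parts:
        h.update(repr(p).encode())
    return h.hexdigest()[:12]

class DiskLikeCache(dict):
    """Stand-in for an on-disk MutableMapping such as shelve."""

data = [1, 2, 3]
load_key = "load-" + token(data)                    # deterministic, not random
square_key = "square-" + token("square", load_key)  # downstream names chain like a Merkle DAG

cache = DiskLikeCache()
cache[square_key] = [1, 4, 9]  # pretend this was stored in an earlier run

# On the next run the same input data regenerates the same keys,
# so the cached result can be substituted without recomputing.
hit = cache.get("square-" + token("square", "load-" + token([1, 2, 3])))
```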
@jakirkham I am glad others have been thinking about this issue! I think you have outlined a plausible way to implement this idea for dask arrays. Do you think a custom callback would be better than an array plugin here? Also, I am concerned about the robustness of inspecting the dask graph based on names.

I have lately been thinking about making dask play well with luigi target objects. Luigi already has a wide array of targets that represent different types of persistent data stores (e.g. database, file system, etc.). I think we can write a simple API for this that looks like

```python
from luigi import Target, LocalTarget

a = ...  # dask object with name 'a'
tgt = LocalTarget("some/file/path")
b = targeted(a, tgt, reader=..., writer=...)

with TargetedCallback():
    b.compute()
```

Behind the scenes, the graph of b would become

```python
{'b': (load_if_exists_or_call, 'a', tgt, reader, writer),
 'a': ...}
```

Then, the callback would search the graph for

This approach should work with arbitrary dask objects as long as reader/writer functions are given that work with the dask object. For instance, this could easily work immediately with
It is probably simpler to just move the graph editing to
This function follows the discussion laid out in dask/dask#2119. It simply changes the dask graph of the returned value, and does not need to be used with a callback. However, a callback is something that would probably be worth adding, since it could show exactly which dependencies need to be run.
I just made a little GitHub project which follows the API I suggested above. I am calling it dask.targeted for now, but I am open to any naming suggestions. So far I have the basic API I outlined above working. Ultimately, it might be nice to use multiple dispatch to allow a syntax like
Sorry for the slow reply. Have had a lot of meetings of late.
The main advantage of a custom callback would be to enable substitution of values in the Dask graph with ones from the cache without engagement from the developer/user.
What concerns come to mind?
Can certainly see the value in that. Luigi does have a nice API. At least for me, I'd like to tackle this without adding Luigi as a dependency and think it should be possible.
So I think this is really where the plugin shines. Namely, it separates the concern of operating on the graph from the graph's contents. It also avoids editing the graph per se, as it can just replace a node in the graph with its contents loaded from disk.
No problem.
I don't really see how we can generate keys in a deterministic way when the functions are not pure. The only feasible option I can imagine is to have the user manually specify a good identifier to a persistent resource.
I agree. A MutableMapping interface could be sufficient, but I think luigi targets can really add a lot. For example, you can write to a remote file over ssh very easily. Maybe it is possible to implement a MutableMapping front end to arbitrary luigi targets.

I think an argument can be made either way about which approach is cleaner, since plugins edit the graph at construction time, while callbacks "optimize" the graph at evaluation time. However, callbacks are probably more flexible because they can work with any dask object, not just arrays. In addition, we can do things like analyze the whole graph and print useful logging information at evaluation time. FWIW, I think the callback I wrote is fairly straightforward.
As another option to consider, @mrocklin suggested to me last week that overriding a
Wanted to share an Array Cache that can be used with Dask Distributed. Much like what has been discussed here, the interest in this cache is to store intermediate values that have long-term value while allowing for their retrieval. As there is a lot of value in being able to name the intermediate value placed in the cache, found the
Is this issue still being considered? I think it's priceless. As mentioned multiple times in the docs, dask with a distributed scheduler is great for running workloads even on a single machine. Persisting to disk can also be very useful when RAM is limited but computation time is long, and you want transparent caching to disk without having to explicitly write out a parquet file or such.
Graphchain might be of interest to this thread. It is a caching optimiser for dask graphs. Some of its features:

It can be used as a dask graph optimiser (e.g.,

There are two years' worth of comments here, so I'm not sure if it fits all the use cases described, but if you have any feature requests we'd be happy to take a look!
This looks a little stale, but I'm including a note here since I spent all weekend trying to do similar things! As long as your distributed nodes have access to a shared filesystem, you can use joblib.Memory-cached functions directly with dask. The cached functions get pickled and sent as normal, and the workers apply the normal Memory logic when calling the function. For example:

could be cached at the individual function call level with

and as long as all of the workers see the same cachedir in their working directory, it works out. This scaled very nicely (>10k tasks, >200 workers), since the cache logic was distributed, and it solved most of the problems I was facing. I was able to get graphchain to work for simpler cases, but for map operations on partitions it looked like

Hope this helps someone; this might be related to dask/distributed#3811
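For readers without joblib handy, the pattern can be mimicked with the stdlib; `Memory` here is a hypothetical stand-in for joblib.Memory that only shows the cache-check logic (joblib additionally handles argument hashing and serialisation much more robustly). The key property is that the cache check runs inside the task itself, so any scheduler that can ship the function gets the caching for free.

```python
import hashlib
import pickle
import tempfile
from pathlib import Path

class Memory:
    """Minimal stand-in for joblib.Memory: memoise results to a shared directory."""

    def __init__(self, location):
        self.location = Path(location)
        self.location.mkdir(parents=True, exist_ok=True)

    def cache(self, func):
        def wrapper(*args):
            # content-addressed cache file; any worker seeing the same
            # directory computes the same path for the same call
            key = hashlib.sha256(repr((func.__name__, args)).encode()).hexdigest()
            path = self.location / f"{key}.pkl"
            if path.exists():
                return pickle.loads(path.read_bytes())
            result = func(*args)
            path.write_bytes(pickle.dumps(result))
            return result
        return wrapper

memory = Memory(tempfile.mkdtemp())
runs = []

@memory.cache
def slow_double(x):
    runs.append(x)  # record real invocations
    return 2 * x

slow_double(21)  # computed and written to the cache directory
slow_double(21)  # read back from the cache directory
```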
It might be worth filing an issue with the
After some thought, I now think that dask and a larger workflow scheduler occupy different niches. Yes, tools like Dask and Airflow both represent tasks as a DAG, but the similarity ends there. Tools like Airflow schedule and observe individual long-running tasks, whereas I think dask is focused on coordinating a swarm of tiny tasks. That said, the ability to move data to disk for global shuffles is a really nice feature of Spark.
I think that makes Prefect particularly interesting, as it uses dask for the scheduling of larger workflows. That said, there's additional engineering overhead in understanding and using Prefect or larger schedulers, but joblib + dask is pretty easy to explain and use!

Agreed. joblib + dask will work really well for non-distributed computations.

It seems like this discussion has reached a reasonable conclusion. Please feel free to open a documentation pull request if there is a place where you think this conversation can be distilled. I am closing this issue now.
I brought this issue/feature request up in an issue at the cachey repo, and @mrocklin suggested that I open up a new issue here.

It would be useful to add a persistent file-system caching mechanism to dask, to complement its in-memory caching ability. While the main advantage of in-memory caching is to speed up computations by saving redundant outputs, file-system based caching is more about maintaining persistence across sessions, and potentially saving interesting intermediate outputs. This is particularly important when developing computational pipelines, because there are frequently bugs in new code which cause crashes. It is also useful in maintaining persistence between IPython or Jupyter notebook sessions. For example, suppose I am developing a workflow in a typical exploratory fashion, where A is some dask object:

The correct_but_expensive_function could successfully finish after 5 minutes of work, but the sketchy_under_development_function might use up all the memory on my Linux desktop and force me to quit the process, if I am lucky, or hard-reset the computer, if I am unlucky.

There are many tools out there which are used for automating scientific workflows by having the user specify some sort of dependency graph between files. These tools are especially popular in bioinformatics, but are broadly applicable in many disciplines. Some popular tools for this include make, luigi, Snakemake, nextflow, and many others. I like these tools a lot, but it can be tedious breaking up a lengthy python script into pieces and manually specifying the dependency graph by hand. This effort seems especially redundant because dask already builds a computational graph behind the scenes.

It would be nice to manually specify nodes of a given dask graph that should be cached and automatically reloaded from disk or recomputed depending on certain conditions. In theory this could provide a very convenient Make-like replacement which does not require manually specifying file names and allows one to use python data structures. Moreover, some simple wrappers around dask.delayed could be written that allow users to run external command line utilities on the intermediate outputs.

**Possible syntax**

Some possible syntax for this could be something like this:

The conditions for reloading or rerunning the a+b computation should depend on the type of cache object. Some nice reloading/recomputing conditions could be things like

Another possible syntax, which is slightly more verbose, could be a dict-like cache object:

Automatic memoization of functions using some heuristic, as Cachey does, could be useful, but I am personally okay with manually indicating which steps should be saved to disk.
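A purely hypothetical sketch of the dict-like cache idea: `FileCache`, the key name, and both functions are invented for illustration and exist nowhere in dask; a real implementation would persist entries to disk and hook into graph execution rather than wrapping code in an explicit if/else.

```python
class FileCache(dict):
    """Would persist entries to disk; a plain dict here for illustration."""

def correct_but_expensive_function(x):
    # stands in for a slow computation worth caching across sessions
    return sum(range(x))

cache = FileCache()
key = "step1"  # the user-chosen name for this node of the graph

if key in cache:                               # reload condition: was this node stored?
    result = cache[key]
else:
    result = correct_but_expensive_function(10 ** 4)
    cache[key] = result                        # later sessions would reload, not recompute
```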