Can't optimize HighLevelGraph (delayed object) #6219

Open
rabernat opened this issue May 18, 2020 · 13 comments
Labels
highlevelgraph Issues relating to HighLevelGraphs.

Comments

@rabernat
Contributor

I am trying to optimize a simple read / store pipeline by fusing the read and write tasks into a single operation.

Here is a concrete example:

import zarr
import dask
import dask.array as dsa

store_source = {}
shape = (8, 8)
source_chunks = (1, 8) 
read_chunks = (2, 8)
dtype = 'f4'

a_source = zarr.ones(shape, chunks=source_chunks,
                     dtype=dtype, store=store_source)
a_source.attrs['foo'] = 'bar'
d_source = dsa.from_zarr(a_source, chunks=read_chunks)

target_store = {}
a_target = zarr.empty(shape, chunks=source_chunks, dtype=dtype, store=target_store)
a_target.attrs.update(a_source.attrs)
store_future = dsa.store(d_source, a_target, lock=False, compute=False)

dask.visualize(store_future)

[image: task graph of store_future, with separate from-zarr and store tasks]

I would like to fuse the from-zarr and store tasks into one task.

However, I can't apply any optimizations to my graph, because of this error:

from dask.optimization import fuse
store_future_opt = dask.optimize(store_future, optimizations=[fuse])

raises

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-45-3afc3be4ea23> in <module>
      1 from dask.optimization import fuse
----> 2 store_future_opt = dask.optimize(store_future, optimizations=[fuse])

~/opt/miniconda3/envs/pangeo-py38/lib/python3.8/site-packages/dask/base.py in optimize(*args, **kwargs)
    366         return args
    367 
--> 368     dsk = collections_to_dsk(collections, **kwargs)
    369     postpersists = [
    370         a.__dask_postpersist__() if is_dask_collection(a) else (None, a) for a in args

~/opt/miniconda3/envs/pangeo-py38/lib/python3.8/site-packages/dask/base.py in collections_to_dsk(collections, optimize_graph, **kwargs)
    212 
    213         for opt in optimizations:
--> 214             groups = {k: (opt(dsk, keys), keys) for k, (dsk, keys) in groups.items()}
    215 
    216         dsk = merge(

~/opt/miniconda3/envs/pangeo-py38/lib/python3.8/site-packages/dask/base.py in <dictcomp>(.0)
    212 
    213         for opt in optimizations:
--> 214             groups = {k: (opt(dsk, keys), keys) for k, (dsk, keys) in groups.items()}
    215 
    216         dsk = merge(

~/opt/miniconda3/envs/pangeo-py38/lib/python3.8/site-packages/dask/optimization.py in fuse(dsk, keys, dependencies, ave_width, max_width, max_height, max_depth_new_edges, rename_keys, fuse_subgraphs)
    530         return dsk, deps
    531 
--> 532     rv = dsk.copy()
    533     fused_trees = {}
    534     # These are the stacks we use to store data as we traverse the graph

AttributeError: 'HighLevelGraph' object has no attribute 'copy'

(Something similar was seen in radix-ai/graphchain#36.)

As a workaround, I tried to do the optimization on the low level graph:

dsk_fused = fuse(store_future.dask.dicts)
dask.visualize(dsk_fused)

[image: visualization of the fused low-level graph]

However, I don't know how to go from this back to a delayed object that I can compute. The documentation on optimization does not really explain how to go back and forth between Dask's various datatypes and this low-level graph.

I feel I am missing some important concept here, but I can't figure out what.

Dask version 2.10.1.

@jcrist
Member

jcrist commented May 18, 2020

The functions in dask.optimization currently don't know anything about HighLevelGraph objects, they're meant to work with the underlying dict interface instead. You'll need to use dask.utils.ensure_dict to convert to raw dictionaries beforehand. The optimization and HighLevelGraph interfaces kind of evolved organically to fit our internal needs - there are definitely rough edges for external use.
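
For illustration, a minimal sketch of the conversion described above, reusing store_future from the example at the top (names assumed from that snippet):

from dask.utils import ensure_dict

dsk = ensure_dict(store_future.dask)  # HighLevelGraph -> plain dict of tasks
type(dsk)                             # dict, which the dask.optimization functions accept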

Apologies, but I don't have time right now to respond to your actual issue of fusing these tasks, but if you get stuck ping again and I can try to take a look.

@rabernat
Contributor Author

You'll need to use dask.utils.ensure_dict to convert to raw dictionaries beforehand.

Thanks for the tip. I think I figured out some version of this in my last code example. But what I don't understand is how to go back to a Delayed object from the raw optimized graph:

dsk_fused = fuse(dask.utils.ensure_dict(store_future.dask))
dask.compute(dsk_fused) # doesn't compute, just returns the same dict

@TomAugspurger
Member

TomAugspurger commented May 18, 2020

@rabernat Delayed can be built from a dask graph.

In [1]: from dask.delayed import Delayed

In [2]: def inc(x): return x + 1

In [3]: dsk = {'data': 0, 'result': (inc, 'data')}

In [4]: r = Delayed("result", dsk)

In [5]: r.compute()
Out[5]: 1

Looking into why it doesn't fuse now. I have a guess.

Edit: For "why doesn't this fuse", I think it's because we don't call fuse on Delayed collections. With this diff:

diff --git a/dask/delayed.py b/dask/delayed.py
index 2d9ba5fdd..eba1da8c7 100644
--- a/dask/delayed.py
+++ b/dask/delayed.py
@@ -13,7 +13,7 @@ from .compatibility import is_dataclass, dataclass_fields
 
 from .core import quote
 from .context import globalmethod
-from .optimization import cull
+from .optimization import cull, fuse
 from .utils import funcname, methodcaller, OperatorMethodMixin, ensure_dict, apply
 from .highlevelgraph import HighLevelGraph
 
@@ -456,8 +456,9 @@ def right(method):
 
 def optimize(dsk, keys, **kwargs):
     dsk = ensure_dict(dsk)
-    dsk2, _ = cull(dsk, keys)
-    return dsk2
+    dsk2, deps = cull(dsk, keys)
+    dsk3, deps = fuse(dsk2, keys, dependencies=deps)
+    return dsk3
 
 
 def rebuild(dsk, key, length):

Then we get fusion, assuming the ave-width is configured appropriately (>= 4 in this case).

with dask.config.set(**{"optimization.fuse.ave-width": 4}):
    display(store_future.visualize(optimize_graph=True, verbose=True))

[screenshot: optimized task graph]

Should we be fusing Delayed (thoughts @eriknw?). I think not fusing is the proper default, if we view delayed as specifying an appropriate point for parallelization. Or perhaps we might be able to specify conditions under which we should fuse?

@rabernat
Contributor Author

Thanks for the tip @TomAugspurger! Seems obvious in retrospect, and is probably well documented. With that hint, I got to

from dask.delayed import Delayed
delayed_fused = Delayed("read-and-store", dsk_fused)
delayed_fused.compute()

However, this brought me a new error:

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-74-07ddfa95cb06> in <module>
      1 from dask.delayed import Delayed
      2 delayed_fused = Delayed("read-and-store", dsk_fused)
----> 3 delayed_fused.compute()

~/opt/miniconda3/envs/pangeo-py38/lib/python3.8/site-packages/dask/base.py in compute(self, **kwargs)
    163         dask.base.compute
    164         """
--> 165         (result,) = compute(self, traverse=False, **kwargs)
    166         return result
    167 

~/opt/miniconda3/envs/pangeo-py38/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
    431     )
    432 
--> 433     dsk = collections_to_dsk(collections, optimize_graph, **kwargs)
    434     keys = [x.__dask_keys__() for x in collections]
    435     postcomputes = [x.__dask_postcompute__() for x in collections]

~/opt/miniconda3/envs/pangeo-py38/lib/python3.8/site-packages/dask/base.py in collections_to_dsk(collections, optimize_graph, **kwargs)
    209     if optimize_graph:
    210         groups = groupby(optimization_function, collections)
--> 211         groups = {opt: _extract_graph_and_keys(val) for opt, val in groups.items()}
    212 
    213         for opt in optimizations:

~/opt/miniconda3/envs/pangeo-py38/lib/python3.8/site-packages/dask/base.py in <dictcomp>(.0)
    209     if optimize_graph:
    210         groups = groupby(optimization_function, collections)
--> 211         groups = {opt: _extract_graph_and_keys(val) for opt, val in groups.items()}
    212 
    213         for opt in optimizations:

~/opt/miniconda3/envs/pangeo-py38/lib/python3.8/site-packages/dask/base.py in _extract_graph_and_keys(vals)
    237         graph = HighLevelGraph.merge(*graphs)
    238     else:
--> 239         graph = merge(*map(ensure_dict, graphs))
    240 
    241     return graph, keys

~/opt/miniconda3/envs/pangeo-py38/lib/python3.8/site-packages/dask/utils.py in ensure_dict(d)
   1021                 seen.add(dd_id)
   1022         return result
-> 1023     return dict(d)
   1024 
   1025 

ValueError: dictionary update sequence element #0 has length 10; 2 is required

I have concluded that the way forward for many of my problems is to become more hands-on with dask optimizations, so this issue represents me trying to figure out how to apply them. Sorry if the issue tracker is not the ideal place for this to play out. I do feel that this reveals some opportunities for improving the documentation.

@rabernat
Contributor Author

[screenshot quoted from the comment above]

I'm not sure this is the graph I want though. I want to fuse vertically, not horizontally. I don't want all my reads in one task and all my writes in the next task. Although this would reduce the total number of tasks, it would make my memory and communication problems worse.

Instead, I want four distinct from-zarr-store tasks.

Does this make sense?

@TomAugspurger
Member

To your error: fuse returns a tuple of (dsk, dependencies), and the key passed to Delayed needs to be in the task graph. Try:

dsk_fused, deps = fuse(dask.utils.ensure_dict(store_future.dask))
delayed_fused = Delayed(store_future._key, dsk_fused)
delayed_fused.compute()

I'm thinking more about your desired task graph.

@mrocklin
Member

mrocklin commented May 18, 2020 via email

@rabernat
Contributor Author

😀 your code worked and gave me exactly what I wanted

delayed_fused.visualize()

[image: fused task graph with one combined read/store task per chunk]

@jcrist
Member

jcrist commented May 18, 2020

Should we be fusing Delayed (thoughts @eriknw?). I think not fusing is the proper default, if we view delayed as specifying an appropriate point for parallelization. Or perhaps we might be able to specify conditions under which we should fuse?

My 2 cents: I think we should leave fuse off for delayed (in my experience long linear chains of Delayed tasks are rarer), but for workloads that compute (like store) we should handle this optimization explicitly. That said, if fuse overhead is low enough, maybe it doesn't matter if we enable it for Delayed.

I'm not sure this is the graph I want though. I want to fuse vertically, not horizontally. I don't want all my reads in one task and all my writes in the next task. Although this would reduce the total number of tasks, it would make my memory and communication problems worse.

The issue was the configuration: if you drop setting the width, you get a single fused chain for each branch.
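
To illustrate with a toy graph (a hedged sketch, not the zarr pipeline itself; read and write are stand-in functions): with the default ave-width, fuse only collapses linear chains, which is the per-branch, "vertical" fusion wanted here.

from dask.optimization import fuse

def read(i):
    return i

def write(x):
    return x

# Two independent read -> write branches, mimicking the per-chunk structure above.
dsk = {
    ("read", 0): (read, 0),
    ("write", 0): (write, ("read", 0)),
    ("read", 1): (read, 1),
    ("write", 1): (write, ("read", 1)),
}

# With the default ave-width, only linear chains are fused, so each
# read/write pair collapses into a single task instead of all reads and
# all writes being merged across branches.
fused, deps = fuse(dsk, keys=[("write", 0), ("write", 1)], rename_keys=False)
print(fused)
# roughly: {('write', 0): (write, (read, 0)), ('write', 1): (write, (read, 1))}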

@jcrist
Member

jcrist commented May 18, 2020

My 2 cents: I think we should leave fuse off for delayed (in my experience long linear chains of Delayed tasks are rarer), but for workloads that compute (like store) we should handle this optimization explicitly.

Rethinking this - most workloads using Delayed to build graphs are paying a speed cost anyway (working with Delayed is more ergonomic, but slower than using the raw graph api). As such, adding the small overhead of fuse always being on seems minor. I'll toss up a PR.

@TomAugspurger
Member

TomAugspurger commented May 18, 2020

@jcrist to make sure I understand: you'll explicitly call fuse in da.store before creating the Delayed that's returned? If so, then +1.

@mrocklin
Member

mrocklin commented May 18, 2020 via email

@jcrist
Member

jcrist commented May 18, 2020

There are two fixes here:

  • Use fuse in delayed's optimize routine. Compared to the cost of using delayed to build the graphs, the cost of applying the optimization is negligible. In this case, this fixes @rabernat's issue (see the sketch after this list). See Add fuse to delayed object optimization #6222 for this.
  • Call optimize() in store beforehand to optimize the array explicitly before converting.
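
For context, a hedged sketch of what the first fix would mean for the original example, assuming #6222 behaves as described (delayed's optimize culls and then fuses):

# With fuse wired into delayed's optimization, computing the original
# store_future should fuse each from-zarr/store pair at compute time,
# with no manual graph manipulation and the default ave-width.
store_future.visualize(optimize_graph=True)
result = store_future.compute()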
