#1964. Fuse tasks that form reductions (aka diamond fuse) #1979

Merged
merged 32 commits into dask:master on Feb 24, 2017

Conversation

@eriknw (Contributor) commented Feb 12, 2017

This trades parallelism opportunities for faster scheduling by making tasks less granular.

This is a very rough cut. The code is not well tested and probably broken, so I don't recommend exercising it yet. Nevertheless, there may be value in sharing this early and allowing for discussions to take place.

There are many options (and opinions) for what the parameterization of this function should be. For the sake of experimentation, I have included many parameters as described below. My gut feeling right now is that we only need one parameter, ave_width, and to choose a reasonable value of max_depth_new_edges based on ave_width (this controls against pathologies while allowing desirable behavior).

This optimization operation is more general than "single input, single output". It applies to all reductions, so it may be "multiple input, single output". I have attempted to make this well-behaved and robust against pathologies and surprises. Notably, by allowing multiple inputs (what I refer to as edges in the code), there may be parallelism opportunities w.r.t. the edges that are otherwise not captured by analyzing the fusible reduction tasks alone. I added a cheap-to-compute heuristic that is conservative; i.e., it will never underestimate the degree of edge parallelism. This heuristic increases the value compared against min_width--a measure of parallelizability--which is one of the reasons I prefer min_width.

I think it will be easy for this operation to supersede fuse, so we won't have to call fuse before calling fuse_reductions. I'm ignoring task renaming for now.

Parameters
----------
dsk: dict
    dask graph
keys: list or set
    Keys that must remain in the dask graph
ave_width: float
    Limit for `width = num_nodes / height`, a good measure of parallelizability
max_depth_new_edges: int
    Don't fuse if new dependencies are added after this many levels
max_height: int
    Don't fuse more than this many levels
max_width: int
    Don't fuse if total width is greater than this

(No CC's, because it's the weekend!)
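
For a concrete sense of what ave_width bounds, here is a rough, plain-Python illustration of the `width = num_nodes / height` measure on a tiny reduction. Treating ave_width as an upper limit on this ratio is my reading of the docstring above; the PR's actual traversal does more bookkeeping.

# Tiny reduction: two independent increments feeding a single sum.
dsk = {
    'a1': 1,
    'a2': 2,
    'b1': (lambda x: x + 1, 'a1'),
    'b2': (lambda x: x + 1, 'a2'),
    'c': (lambda x, y: x + y, 'b1', 'b2'),
}

# Candidate group to fuse into 'c': {'b1', 'b2', 'c'}
num_nodes = 3                  # b1, b2, c
height = 2                     # the b1/b2 level, then the c level
width = num_nodes / height     # 1.5

print(width <= 2)   # True  -> at ave_width=2 the reduction may fuse into one task
print(width <= 1)   # False -> at ave_width=1 only purely linear chains fuse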

eriknw added some commits Feb 12, 2017

WIP #1964. Probably broken. Fuse tasks that form reductions.
(The commit message repeats the PR description above.)
Another test, another fix.
I'm beginning to get confused.  I think I should make the traversal strategy clearer.
Another test, another fix.
I'm still a little confused and should rework things to be more explicit.
Sorry for the tangle of logic with implicit behaviors.
@mrocklin (Member) commented Feb 13, 2017

I was playing with this locally (despite the not-ready-for-prime-time warning):

In [1]: import dask.array as da
In [2]: from dask_glm.logistic import sigmoid, logistic_gradient

In [3]: beta = da.ones((2,), chunks=(2,))
   ...: X = da.ones((20, 2), chunks=(5, 2))
   ...: y = da.ones((20,), chunks=(5,))
   ...: 

In [4]: z = logistic_gradient(beta, X, y)

In [5]: from dask.dot import dot_graph
In [7]: dot_graph(z.dask, filename='before.png')
Out[7]: <IPython.core.display.Image object>

In [8]: from dask.optimize import fuse_reductions
In [9]: dsk = fuse_reductions(z.dask, z._keys())
In [10]: dot_graph(dsk, filename='after.png')
Out[10]: <IPython.core.display.Image object>

Before: [image: before.png]

Normal fuse: [image: fuse.png]

Fancy fuse: [image: after.png]

@mrocklin (Member) commented Feb 13, 2017

Timing check:

In [16]: beta = da.ones((2,), chunks=(2,))
    ...: X = da.ones((2000, 2), chunks=(5, 2))
    ...: y = da.ones((2000,), chunks=(5,))
    ...: 

In [17]: %time z = logistic_gradient(beta, X, y)
/home/mrocklin/workspace/dask/dask/array/core.py:464: RuntimeWarning: overflow encountered in true_divide
  o = func(*args, **kwargs)
CPU times: user 40 ms, sys: 0 ns, total: 40 ms
Wall time: 38.9 ms

In [18]: len(z.dask)
Out[18]: 4803

In [19]: %time dsk = fuse_reductions(z.dask, z._keys())
CPU times: user 88 ms, sys: 4 ms, total: 92 ms
Wall time: 89.4 ms

In [20]: len(dsk)
Out[20]: 802

So we're at around 20 µs per task (89.4 ms for 4803 tasks), which is definitely a win in terms of performance.

@eriknw (Contributor, Author) commented Feb 13, 2017

Cool, thanks for taking a look!

eriknw added some commits Feb 14, 2017

Remove the use of `irreducible` and change `deps` to sets.
We now allow for fusing reducible tasks below tasks that weren't fused, but
only in strict circumstances (such as allowing for linear fusing).
@mrocklin referenced this pull request Feb 14, 2017

eriknw added some commits Feb 14, 2017

Handle linear fusing so this can replace `fuse` optimization.
This actually performs a superset of linear fusing that `fuse` performs.
Specifically, `fuse_reductions(dsk, ave_width=1)` will fuse linear tasks
that may refer to the inputs used in the top-level task.  In `fuse`, we
only fuse chains with single-input, single-output.
No more `continue`s or implied fall-through behavior. More explicit and clear?

I think the clarity and cleanliness is worth the cost of sometimes performing
`item = list.pop()` followed by `list.append(item)`, which is what we avoided
doing previously.
@eriknw (Contributor, Author) commented Feb 14, 2017

Okay, the code has finally reached a state of tidiness where I'm no longer embarrassed for others to look at it. @jcrist, this should be much cleaner than when you looked at it yesterday and Sunday. I think it's ready for review w.r.t. clarity, which is very important. Suggestions and requests for more code comments are welcome.

I plan to add more tests with multiple inputs next (probably tonight). The heuristics need to be exercised.

max_height = len(dsk)

# TODO: accept `dependencies=` keyword and return updated dependencies
deps = {k: get_dependencies(dsk, k, as_list=True) for k in dsk}

@mrocklin (Member) commented Feb 14, 2017

Supporting taking in and passing through dependencies would help with benchmarking. I suspect that carrying this around would also slow down development though, so no rush while things are in transition.
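
A tiny sketch of the pass-through pattern being suggested here (illustrative only; the PR folds this logic directly into fuse_reductions and also returns the updated dependencies, and the helper name below is made up):

from dask.core import get_dependencies

def ensure_dependencies(dsk, dependencies=None):
    # Reuse dependencies supplied by the caller (e.g. from an earlier
    # optimization pass); otherwise compute them once from the graph.
    if dependencies is None:
        dependencies = {k: get_dependencies(dsk, k, as_list=True) for k in dsk}
    return dependencies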

@mrocklin (Member) commented Feb 14, 2017

Also, FWIW, the dask.array test suite passed a few commits ago with this replacing normal fuse. So at least it didn't break anything :)

@eriknw (Author, Contributor) commented Feb 14, 2017

Agreed. This should behave the same as fuse. This may be cheap to add now. Give me a minute.

@eriknw (Author, Contributor) commented Feb 14, 2017

Added. Barely tested.

@eriknw (Author, Contributor) commented Feb 14, 2017

Okay, the returned dependencies should now be working correctly. My earlier attempt failed. fuse_reductions now passes the fuse tests (sans key-renaming).

eriknw added some commits Feb 14, 2017

@@ -427,6 +428,179 @@ def fuse_getitem(dsk, func, place):
                           lambda a, b: tuple(b[:place]) + (a[2], ) + tuple(b[place + 1:]))


def fuse_reductions(dsk, keys=None, ave_width=2, max_depth_new_edges=None,
                    max_height=None, max_width=None):

@mrocklin (Member) commented Feb 14, 2017

We'll want ways to specify these from dask.set_options and compute calls. I think the convention we started to use was to prepend the name of the optimization like the following:

x.compute(fuse_ave_width=5)

Although adding all of these to the various compute methods seems tedious. Perhaps check for the presence of dask.context._globals['fuse_ave_width'] instead so that we can do the following:

with dask.set_options(fuse_ave_width=5):
    x.compute()

@mrocklin (Member) commented Feb 14, 2017

Although I can also do this later.

@eriknw (Author, Contributor) commented Feb 14, 2017

What's the order of precedence? What if the parameter exists in _globals but is None? Is there common practice here yet?

@eriknw (Author, Contributor) commented Feb 14, 2017

Done. Note that None never means no bound (well, except for max_height, which has a lenient default).
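
For reference, a hypothetical resolution helper along the lines discussed in this thread (the name _resolve and the structure are mine, not the PR's code):

from dask.context import _globals

def _resolve(option_name, explicit_value, default):
    # Precedence: an explicit keyword argument wins; otherwise fall back to a
    # value set via dask.set_options(...); otherwise the built-in default.
    # A None stored in _globals is treated as "not set", consistent with None
    # never meaning "no bound".
    if explicit_value is not None:
        return explicit_value
    stored = _globals.get(option_name)
    return default if stored is None else stored

# e.g. inside fuse_reductions:
#     ave_width = _resolve('fuse_ave_width', ave_width, default_ave_width)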

@eriknw (Contributor, Author) commented Feb 14, 2017

I've become more comfortable keeping all the parameters to control behavior, but ave_width should still be considered the primary one.

@eriknw (Contributor, Author) commented Feb 14, 2017

For this particular function, the falsey values None and 0 aren't in the valid domain of any of the parameters, so things work out fine. If one wants no bound, just give an excessively large value.

@mrocklin (Member) commented Feb 20, 2017

We may want to include a single linear fusion along with this. See the following example:

In [1]: import dask.array as da

In [2]: x = da.ones(10, chunks=(5,))

In [3]: z = da.exp(x + 1) - 2*x

In [4]: z.visualize('pre-optimize.png', optimize_graph=False)
Out[4]: <IPython.core.display.Image object>

In [5]: z.visualize('post-optimize.png', optimize_graph=True)
Out[5]: <IPython.core.display.Image object>

Without optimization: [image: pre-optimize.png]

With optimization (I've hard-coded ave_width to five): [image: post-optimize.png]

@eriknw (Contributor, Author) commented Feb 20, 2017

Hmm, it may make sense to keep the old fuse, with its simple linear fusing, for visualizing graphs.

> Thoughts on increasing average width based on the size of the graph? We probably don't want to fuse graphs of size less than 100 or so, but larger graphs in the thousands might benefit substantially from fusion without us losing a significant ability to parallelize.

I don't think this logic belongs in fuse/fuse_reductions. Seems reasonable to have it in the optimize functions of dask collections.

> Are there reasonable schemes for renaming that are less costly, even if they are less accurate?

Meh. Maybe I'm low on brain juices, but it's not slow enough for me to care atm. Do you have any benchmarks that would make me care?

> We may want to include a single linear fusion along with this.

What do you mean? The optimized graphs may look linear and fusible, but they're not fused because e.g. ('wrapped-#1', 0) is used twice by ('add-exp-mul-sub-#0', 0). @jcrist was recently talking (maybe brainstorming out-loud) about a way to work around this limitation.

Thanks for taking this for a test drive!

@eriknw (Contributor, Author) commented Feb 20, 2017

> Are there reasonable schemes for renaming that are less costly, even if they are less accurate?

How about a sorted set of keys returned by key_split except for the root node, which is the last item in the joined keys? This should be cheap enough to compute, nice enough to look at, and well-behaved enough to test.
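
A minimal sketch of that renaming scheme as I read it (not the PR's code; simple_key_split below is a crude stand-in for dask's key_split, and the example keys are made up):

def simple_key_split(key):
    # Crude stand-in for dask's key_split: unwrap tuple keys like ('sum-1', 0)
    # and drop a trailing token such as '-1' or a hash suffix.
    if isinstance(key, tuple):
        key = key[0]
    return key.rsplit('-', 1)[0] if '-' in key else key

def rename_fused_key(root_key, fused_keys, key_split=simple_key_split):
    # Sorted, de-duplicated prefixes of the fused non-root tasks,
    # followed by the (unchanged) root key.
    prefixes = sorted({key_split(k) for k in fused_keys if k != root_key})
    return '-'.join(prefixes + [root_key])

print(rename_fused_key('sum-1', ['inc-1', 'inc-2', 'double-1']))
# -> 'double-inc-sum-1'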

Experiment with a cheaper task renamer:
a sorted set of keys returned by key_split except for the root node, which is
the last item in the joined keys.  This should be cheap enough to compute,
nice enough to look at, and well-behaved enough to test.  Of course, this
also breaks existing tests that check renamed keys, which will need to be fixed
if this scheme is approved.
@eriknw (Contributor, Author) commented Feb 20, 2017

Okay, I just pushed a cheaper version of task renaming. Let me know if you prefer this (or some other strategy). I'm going to wait to update the tests until we decide.

In one of my benchmarks, this dropped the time from 7 to 5 units of time, and the original fuse takes about 4.

@mrocklin (Member) commented Feb 20, 2017

This is the benchmark that I'm using: https://gist.github.com/5bfca026a68194eb85d09d02f6b8abfd

I notice a similar moderate drop. It looks like we're about half bound by key_split.

@jcrist (Member) commented Feb 20, 2017

> How about a sorted set of keys returned by key_split except for the root node, which is the last item in the joined keys?

How does this handle non-equivalent operations with the same set of names? E.g.:

inc = lambda x: x + 1
func = lambda x, y: 2 * x + y

dsk1 = {'a-1': 1,
        'b-1': 2,
        'c-1': (inc, 'a-1'),
        'd-1': (inc, 'b-1'),
        'res-1': (func, 'c-1', 'd-1')}

dsk2 = {'a-2': 1,
        'b-2': 2,
        'c-2': (inc, 'a-2'),
        'd-2': (inc, 'b-2'),
        'res-2': (func, 'd-2', 'c-2')}   # arguments are swapped here

If I understand correctly, both of these would rename to the same prefix, even though they are technically different operations.

@eriknw (Contributor, Author) commented Feb 20, 2017

You understand correctly, Jim. The new keys would be a-b-c-d-res-1 and a-b-c-d-res-2. We don't traverse the graphs deterministically, yet we want the key names to be deterministic. What do you suggest?

@mrocklin (Member) commented Feb 20, 2017

For what it's worth I don't see a problem with occasional collisions in key-prefix as long as the keys themselves are unique. Prefix collisions aren't great, but they're also not terrible.

@jcrist (Member) commented Feb 20, 2017

Erik and I just had a discussion about this. I agree that the above case isn't terrible; I was just curious about it. One case that might be less than optimal is fusing of repeated calls. In the current implementation, the following ends up with the same prefix of 'f' for any number of calls.

dsk1 = {'f-1': (f, 1),
        'f-2': (f, 'f-1'),
        'f-3': (f, 'f-2'), ...}

Since prefixes are used for the timing of tasks in distributed, this may lead to wildly different times. One cheapish way around this would be to also include the number of calls in the prefix. So (f, (f, (f, x))) would have the prefix 'f3' or something.
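
One possible reading of that multiplicity idea, sketched as a variant of the renamer above (the name and details are mine, not a settled proposal; jcrist's 'f3' counts the root call too, while this counts only the fused non-root tasks):

from collections import Counter

def rename_with_multiplicity(root_key, fused_keys, key_split):
    # Count how many fused (non-root) tasks share each prefix and fold the
    # count into the name, so two fused 'f' calls contribute 'f2'.
    counts = Counter(key_split(k) for k in fused_keys if k != root_key)
    parts = ['%s%d' % (p, n) if n > 1 else p for p, n in sorted(counts.items())]
    return '-'.join(parts + [root_key])

print(rename_with_multiplicity('f-3', ['f-1', 'f-2'], lambda k: k.rsplit('-', 1)[0]))
# -> 'f2-f-3'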

@eriknw (Contributor, Author) commented Feb 20, 2017

I'm intrigued by the idea of including multiplicity (if >1) in renamed keys. Should we do, e.g., 'a*3-b*2-c-1' or 'a3-b2-c-1'? Would either one impact how keys are used by schedulers?

@mrocklin (Member) commented Feb 20, 2017

FWIW I prefer a-b-c to a3-b2-c4 aesthetically. I'm not super concerned yet about conflicts here. I haven't seen an application yet where this is likely to cause an issue.

@eriknw (Contributor, Author) commented Feb 20, 2017

No problem. a-b-c-1 is clean. I was more intrigued by capturing more information--the multiplicity--rather than avoiding conflicts.

@eriknw (Contributor, Author) commented Feb 21, 2017

I just pushed an experimental method to perform even fancier fusing for chains of tasks that reference previous tasks multiple times. An example of this was shown a few posts prior.

Here's an example that performs two optimization passes:

In [1]: import dask
   ...: import time
   ...: from dask.optimize import fuse_reductions
   ...: 
   ...: def f(*args):
   ...:     time.sleep(1)
   ...:     return args
   ...: 

In [2]: d1 = {
   ...:     'a': 1,
   ...:     'b1': (f, 'a'),
   ...:     'b2': (f, 'a'),
   ...:     'c': (f, 'b1', 'b2'),
   ...:     'd1': (f, 'c'),
   ...:     'd2': (f, 'c'),
   ...:     'e': (f, 'd1', 'd2'),
   ...:     'f1': (f, 'e', 'e'),
   ...:     'f2': (f, 'e', 'e'),
   ...:     'g': (f, 'f1', 'f2'),
   ...: }
   ...: keys = 'g'
   ...: 

In [3]: %time rv = dask.get(d1, 'g')
CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 9.01 s

In [4]: dask.visualize(d1)

[image: d1]

In [5]: d2, deps2 = fuse_reductions(d1, ave_width=2, keys=keys)

In [6]: %time assert dask.get(d2, keys) == rv
CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 9.01 s

In [7]: dask.visualize(d2)

[image: d2]

In [8]: d3, deps3 = fuse_reductions(d2, ave_width=1, dependencies=deps2, keys=keys, is_fancy=True, rename_keys=False)

In [9]: %time assert dask.get(d3, keys) == rv
CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 9.01 s

In [10]: dask.visualize(d3)

[image: d3]

In [11]: d3
Out[11]: 
{'g': (<function dask.optimize.fancy_get>,
  {'keys': ('f1-f2-g',), 'task': 'f1-f2-g'},
  (<function dask.optimize.fancy_get>,
   {'keys': ('d1-d2-e',),
    'task': (<function __main__.f>,
     (<function __main__.f>, 'd1-d2-e', 'd1-d2-e'),
     (<function __main__.f>, 'd1-d2-e', 'd1-d2-e'))},
   (<function dask.optimize.fancy_get>,
    {'keys': ('b1-b2-c',),
     'task': (<function __main__.f>,
      (<function __main__.f>, 'b1-b2-c'),
      (<function __main__.f>, 'b1-b2-c'))},
    (<function dask.optimize.fancy_get>,
     {'keys': ('a',),
      'task': (<function __main__.f>,
       (<function __main__.f>, 'a'),
       (<function __main__.f>, 'a'))},
     1))))}

Beginning again with d1:

In [12]: d4, deps4 = fuse_reductions(d1, ave_width=2, keys=keys, is_fancy=True)

In [13]: %time assert dask.get(d4, keys) == rv
CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 9.01 s

In [14]: dask.visualize(d4)  # Same as d2

In [15]: d4
Out[15]: 
{'a': 1,
 'b1-b2-c': (<function dask.optimize.fancy_get>,
  {'keys': ('a', 'b1', 'b2'), 'task': (<function __main__.f>, 'b1', 'b2')},
  'a',
  (<function __main__.f>, 'a'),
  (<function __main__.f>, 'a')),
 'd1-d2-e': (<function dask.optimize.fancy_get>,
  {'keys': ('c', 'd1', 'd2'), 'task': (<function __main__.f>, 'd1', 'd2')},
  'b1-b2-c',
  (<function __main__.f>, 'b1-b2-c'),
  (<function __main__.f>, 'b1-b2-c')),
 'f1-f2-g': (<function dask.optimize.fancy_get>,
  {'keys': ('e', 'f1', 'f2'), 'task': (<function __main__.f>, 'f1', 'f2')},
  'd1-d2-e',
  (<function __main__.f>, 'd1-d2-e', 'd1-d2-e'),
  (<function __main__.f>, 'd1-d2-e', 'd1-d2-e')),
 'g': 'f1-f2-g'}

In [16]: d5, deps5 = fuse_reductions(d4, ave_width=1, dependencies=deps4, keys=keys, is_fancy=True, rename_keys=False)

In [17]: %time assert dask.get(d5, keys) == rv
CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 9.01 s

In [18]: dask.visualize(d5)  # same as d3

In [19]: d5
Out[19]: 
{'g': (<function dask.optimize.fancy_get>,
  {'keys': ('f1-f2-g',), 'task': 'f1-f2-g'},
  (<function dask.optimize.fancy_get>,
   {'keys': ('d1-d2-e',),
    'task': (<function dask.optimize.fancy_get>,
     {'keys': ('e', 'f1', 'f2'), 'task': (<function __main__.f>, 'f1', 'f2')},
     'd1-d2-e',
     (<function __main__.f>, 'd1-d2-e', 'd1-d2-e'),
     (<function __main__.f>, 'd1-d2-e', 'd1-d2-e'))},
   (<function dask.optimize.fancy_get>,
    {'keys': ('b1-b2-c',),
     'task': (<function dask.optimize.fancy_get>,
      {'keys': ('c', 'd1', 'd2'), 'task': (<function __main__.f>, 'd1', 'd2')},
      'b1-b2-c',
      (<function __main__.f>, 'b1-b2-c'),
      (<function __main__.f>, 'b1-b2-c'))},
    (<function dask.optimize.fancy_get>,
     {'keys': ('a',),
      'task': (<function dask.optimize.fancy_get>,
       {'keys': ('a', 'b1', 'b2'),
        'task': (<function __main__.f>, 'b1', 'b2')},
       'a',
       (<function __main__.f>, 'a'),
       (<function __main__.f>, 'a'))},
     1))))}

Thanks @jcrist for the idea. I hope you can make this better!

My goal for this week is to get this PR merged, and I don't expect this fancy fusing to be included. Still, I thought this was worth sharing especially given how little work it was!

@eriknw (Contributor, Author) commented Feb 22, 2017

Okay, I think this is ready to merge. I went with the cheap key renamer, and I renamed the new function (fuse_reductions) to fuse.

We can address "fancy" fusing--i.e., fusing dependencies that are referenced multiple times--elsewhere. I don't have the bandwidth this week to pursue it.

Similarly, I think we can address choosing parameters for fuse based on e.g. size of the graph elsewhere. I doubt I'll do this.

@mrocklin (Member) commented Feb 22, 2017

I'm happy with this. I plan to issue a release before merging though just so we can have some time with it in master before it reaches the more general public. Should happen sometime today.

@mrocklin (Member) commented Feb 22, 2017

Sorry, should have read "I'm very happy with this"

@mrocklin (Member) commented Feb 22, 2017

Thank you for your effort here @eriknw . That's quite the piece of code.

@eriknw (Contributor, Author) commented Feb 22, 2017

Thanks! It was interesting (except for key renaming, grrr). You know how to nerd-snipe me ;) . And, yeah, I would wait until releasing to merge this.

@jcrist (Member) commented Feb 24, 2017

Merging now that 0.14 has been released. Thanks!

@jcrist merged commit 76d237a into dask:master Feb 24, 2017

2 checks passed:
continuous-integration/appveyor/pr: AppVeyor build succeeded
continuous-integration/travis-ci/pr: The Travis CI build passed

@sinhrks added this to the 0.14.1 milestone Mar 30, 2017

@eriknw referenced this pull request Sep 15, 2018 (3 of 3 tasks complete)