Skip to content

Commit

Permalink
Repeat optimize_blockwise for diamond fusion (#4492)
Browse files Browse the repository at this point in the history
Fixes #4373
  • Loading branch information
mrocklin committed Feb 18, 2019
1 parent b293ed8 commit 9e61bbb
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 6 deletions.
1 change: 0 additions & 1 deletion dask/array/tests/test_atop.py
Expand Up @@ -122,7 +122,6 @@ def test_optimize_blockwise():
assert len([layer for layer in dsk.dicts.values() if isinstance(layer, Blockwise)]) == 1


@pytest.mark.xfail(reason="we only look for y-splits, not for total dependencies")
def test_blockwise_diamond_fusion():
x = da.ones(10, chunks=(5,))
y = (((x + 1) + 2) + 3)
Expand Down
9 changes: 9 additions & 0 deletions dask/array/tests/test_optimization.py
Expand Up @@ -286,3 +286,12 @@ def test_gh3937():
y = da.coarsen(np.sum, y, {0: 2})
# How to trigger the optimizer explicitly?
y.compute()


def test_double_dependencies():
    """Blockwise fusion must cope with a layer that consumes the same
    input twice (``da.dot(X, X.T)`` feeds ``X`` in as both operands),
    forming diamond-shaped dependencies in the high-level graph.
    """
    data = np.arange(56).reshape((7, 8))
    arr = da.from_array(data, chunks=(4, 4))
    shifted = arr + 1
    result = da.dot(shifted, shifted.T)

    # The optimized graph must produce the same values as the unoptimized one.
    assert_eq(result.compute(optimize_graph=False), result)
40 changes: 36 additions & 4 deletions dask/blockwise.py
Expand Up @@ -44,7 +44,6 @@ def blockwise_token(i, prefix='_'):
def blockwise(func, output, output_indices, *arrind_pairs, **kwargs):
""" Create a Blockwise symbolic mutable mapping
This is like the ``make_blockwise_graph`` function, but rather than construct a dict, it
returns a symbolic Blockwise object.
Expand Down Expand Up @@ -120,6 +119,28 @@ class Blockwise(Mapping):
dictionaries because we are able to fuse them during optimization,
sometimes resulting in much lower overhead.
Parameters
----------
output: str
The name of the output collection. Used in keynames
output_indices: tuple
The output indices, like ``('i', 'j', 'k')`` used to determine the
structure of the block computations
dsk: dict
A small graph to apply per-output-block. May include keys from the
input indices.
indices: Tuple[str, Tuple[str, str]]
An ordered mapping from input key name, like ``'x'``
to input indices, like ``('i', 'j')``
Or includes literals, which have ``None`` for an index value
numblocks: Dict[key, Sequence[int]]
Number of blocks along each dimension for each input
concatenate: boolean
Whether or not to pass contracted dimensions as a list of inputs or a
single input to the block function
new_axes: Dict
New index dimensions that may have been created, and their extent
See Also
--------
dask.blockwise.blockwise
Expand Down Expand Up @@ -177,8 +198,8 @@ def make_blockwise_graph(func, output, out_indices, *arrind_pairs, **kwargs):
""" Tensor operation
Applies a function, ``func``, across blocks from many different input
dasks. We arrange the pattern with which those blocks interact with sets
of matching indices. E.g.::
collections. We arrange the pattern with which those blocks interact with
sets of matching indices. E.g.::
make_blockwise_graph(func, 'z', 'i', 'x', 'i', 'y', 'i')
Expand Down Expand Up @@ -390,7 +411,7 @@ def lol_tuples(head, ind, values, dummies):
for v in dummies[ind[0]]]


def optimize_blockwise(full_graph, keys=()):
def optimize_blockwise(graph, keys=()):
""" High level optimization of stacked Blockwise layers
For operations that have multiple Blockwise operations one after the other, like
Expand All @@ -415,6 +436,14 @@ def optimize_blockwise(full_graph, keys=()):
--------
rewrite_blockwise
"""
out = _optimize_blockwise(graph, keys=keys)
while out.dependencies != graph.dependencies:
graph = out
out = _optimize_blockwise(graph, keys=keys)
return out


def _optimize_blockwise(full_graph, keys=()):
keep = {k[0] if type(k) is tuple else k for k in keys}
layers = full_graph.dicts
dependents = core.reverse_dict(full_graph.dependencies)
Expand Down Expand Up @@ -450,6 +479,9 @@ def optimize_blockwise(full_graph, keys=()):
if layers[dep].concatenate != layers[layer].concatenate:
stack.append(dep)
continue
if sum(k == dep for k, ind in layers[layer].indices if ind is not None) > 1:
stack.append(dep)
continue

# passed everything, proceed
blockwise_layers.add(dep)
Expand Down
1 change: 0 additions & 1 deletion dask/dataframe/tests/test_optimize_dataframe.py
Expand Up @@ -52,7 +52,6 @@ def test_fuse_ave_width():
assert len(b) <= 15


@pytest.mark.xfail(reason="need better high level fusion")
def test_optimize_blockwise():
from dask.array.optimization import optimize_blockwise
df = pd.DataFrame({'x': range(10), 'y': range(10)})
Expand Down

0 comments on commit 9e61bbb

Please sign in to comment.