Array/DataFrame optimization requires HLG
In dask#8452 I realized that an incorrect pattern had emerged from dask#6510 of including
```python
    if not isinstance(dsk, HighLevelGraph):
        dsk = HighLevelGraph.from_collections(id(dsk), dsk, dependencies=())
```
in optimization functions. Specifically, `id(dsk)` is incorrect as the layer name here. The layer name must match the `.name` of the resulting collection that gets created by `__dask_postpersist__()`, otherwise `__dask_layers__()` on the optimized collection will be wrong. Since `optimize` doesn't know about collections and isn't passed a layer name, the only reasonable thing to do here is to error when given a low-level graph.
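
To make the failure mode concrete, here is a minimal sketch (illustrative only, not code from this PR) of the mismatch that using `id(dsk)` as a layer name creates:

```python
import dask.array as da
from dask.highlevelgraph import HighLevelGraph

x = da.ones(5, chunks=5)
low_level = dict(x.__dask_graph__())  # flatten to a plain low-level dict

# The removed pattern invented a layer name from id():
hlg = HighLevelGraph.from_collections(id(low_level), low_level, dependencies=())

# A collection rebuilt around this graph keeps its original `.name`, so
# `__dask_layers__()` points at a layer name that isn't in the graph:
assert x.__dask_layers__() == (x.name,)
assert x.name not in hlg.layers  # the only layer is named id(low_level)
```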
Raising an error is safe for Arrays and DataFrames, since their constructors convert any low-level graph to an HLG, as sketched below.
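
Roughly, that constructor conversion looks like this (a simplified paraphrase of `Array.__new__` / `_Frame.__init__`, not the exact source):

```python
if not isinstance(dsk, HighLevelGraph):
    # Unlike `optimize`, the constructor knows `name` -- the collection's own
    # name -- so `.name`, `__dask_layers__()`, and the HLG layer key all agree.
    dsk = HighLevelGraph.from_collections(name, dsk, dependencies=())
```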

This PR doesn't really fix anything (the removed code path should be unused), but it eliminates a confusing pattern that had already wandered into other places: dask#8316 (comment).
gjoseph92 committed Dec 13, 2021
1 parent 27a7928 commit d2be0ce
Showing 4 changed files with 77 additions and 14 deletions.
6 changes: 5 additions & 1 deletion dask/array/optimization.py
```diff
@@ -40,7 +40,11 @@ def optimize(
     keys = list(flatten(keys))
 
     if not isinstance(dsk, HighLevelGraph):
-        dsk = HighLevelGraph.from_collections(id(dsk), dsk, dependencies=())
+        # NOTE: we cannot convert to a HLG here, because we don't know the proper
+        # layer name. So an Array re-constructed from the HLG could have a mismatch between
+        # its `.name` and the layer name in the HLG.
+        # `Array.__new__` ensures all Arrays use HLGs, so this case should be impossible in normal use.
+        raise TypeError("Array optimization can only be performed on high-level graphs")
 
     dsk = optimize_blockwise(dsk, keys=keys)
     dsk = fuse_roots(dsk, keys=keys)
```
17 changes: 17 additions & 0 deletions dask/array/tests/test_optimization.py
```diff
@@ -455,3 +455,20 @@ def test_fuse_roots_annotations():
     assert {"foo": "bar"} in [l.annotations for l in hlg.layers.values()]
     za = da.Array(hlg, z.name, z.chunks, z.dtype)
     assert_eq(za, z)
+
+
+def test_optimization_requires_hlg():
+    x = da.ones(10, chunks=(2,)) + 1
+    dsk = x.dask.to_dict()
+    assert isinstance(dsk, dict)
+
+    with pytest.raises(TypeError, match="high-level graphs"):
+        x.__dask_optimize__(dsk, x.__dask_keys__())
+
+    # Ensure Arrays constructed from low-level graphs still work
+    x_from_lowlevel = da.Array(dsk, x.name, x.chunks, x.dtype, x._meta)
+    assert isinstance(x_from_lowlevel.dask, HighLevelGraph)
+    # ^ `Array.__new__` converts to HLG
+    assert tuple(x_from_lowlevel.dask.layers) == x_from_lowlevel.__dask_layers__()
+    (x_opt,) = dask.optimize(x_from_lowlevel)
+    assert_eq(x, x_opt)
```
18 changes: 12 additions & 6 deletions dask/dataframe/optimize.py
```diff
@@ -16,12 +16,18 @@ def optimize(dsk, keys, **kwargs):
     keys = list(core.flatten(keys))
 
     if not isinstance(dsk, HighLevelGraph):
-        dsk = HighLevelGraph.from_collections(id(dsk), dsk, dependencies=())
-    else:
-        # Perform Blockwise optimizations for HLG input
-        dsk = optimize_dataframe_getitem(dsk, keys=keys)
-        dsk = optimize_blockwise(dsk, keys=keys)
-        dsk = fuse_roots(dsk, keys=keys)
+        # NOTE: we cannot convert to a HLG here, because we don't know the proper
+        # layer name. So a _Frame re-constructed from the HLG could have a mismatch between
+        # its `.name` and the layer name in the HLG.
+        # `_Frame/Scalar.__init__` converts low-level graphs to HLGs, so this case should be impossible in normal use.
+        raise TypeError(
+            "DataFrame optimization can only be performed on high-level graphs"
+        )
+
+    # Perform Blockwise optimizations for HLG input
+    dsk = optimize_dataframe_getitem(dsk, keys=keys)
+    dsk = optimize_blockwise(dsk, keys=keys)
+    dsk = fuse_roots(dsk, keys=keys)
     dsk = dsk.cull(set(keys))
 
     # Do not perform low-level fusion unless the user has
```
50 changes: 43 additions & 7 deletions dask/dataframe/tests/test_optimize_dataframe.py
```diff
@@ -1,14 +1,11 @@
 import pandas as pd
 import pytest
 
+import dask
 import dask.dataframe as dd
-
-dsk = {
-    ("x", 0): pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}, index=[0, 1, 3]),
-    ("x", 1): pd.DataFrame({"a": [4, 5, 6], "b": [3, 2, 1]}, index=[5, 6, 8]),
-    ("x", 2): pd.DataFrame({"a": [7, 8, 9], "b": [0, 0, 0]}, index=[9, 9, 9]),
-}
-dfs = list(dsk.values())
+from dask.dataframe.core import Scalar
+from dask.dataframe.utils import assert_eq
+from dask.highlevelgraph import HighLevelGraph
 
 
 def test_fuse_ave_width():
@@ -38,3 +35,42 @@ def test_optimize_blockwise():
     graph = optimize_blockwise(ddf.dask)
 
     assert len(graph) <= 4
+
+
+def test_optimization_requires_hlg():
+    df = pd.DataFrame({"x": range(10)})
+    df = dd.from_pandas(df, npartitions=5) + 1
+
+    dsk = df.dask.to_dict()
+    assert isinstance(dsk, dict)
+
+    with pytest.raises(TypeError, match="high-level graphs"):
+        df.__dask_optimize__(dsk, df.__dask_keys__())
+
+    # Ensure DataFrames constructed from low-level graphs still work
+    df_from_lowlevel = dd.DataFrame(dsk, df._name, df._meta, df.divisions)
+    assert isinstance(df_from_lowlevel.dask, HighLevelGraph)
+    # ^ `_Frame.__init__` converts to HLG
+    assert tuple(df_from_lowlevel.dask.layers) == df_from_lowlevel.__dask_layers__()
+    (df_opt,) = dask.optimize(df_from_lowlevel)
+    assert_eq(df, df_opt)
+
+    # Ensure Series constructed from low-level graphs still work
+    s = df.x
+    dsk = s.dask.to_dict()
+    s_from_lowlevel = dd.Series(dsk, s._name, s._meta, s.divisions)
+    assert isinstance(s_from_lowlevel.dask, HighLevelGraph)
+    # ^ `_Frame.__init__` converts to HLG
+    assert tuple(s_from_lowlevel.dask.layers) == s_from_lowlevel.__dask_layers__()
+    (s_opt,) = dask.optimize(s_from_lowlevel)
+    assert_eq(s, s_opt)
+
+    # Ensure Scalars constructed from low-level graphs still work
+    sc = s.sum()
+    dsk = sc.dask.to_dict()
+    sc_from_lowlevel = Scalar(dsk, sc._name, sc._meta)
+    assert isinstance(sc_from_lowlevel.dask, HighLevelGraph)
+    # ^ `Scalar.__init__` converts to HLG
+    assert tuple(sc_from_lowlevel.dask.layers) == sc_from_lowlevel.__dask_layers__()
+    (sc_opt,) = dask.optimize(sc_from_lowlevel)
+    assert_eq(sc, sc_opt)
```
