Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A HighLevelGraph abstract layer for map_overlap #7595

Merged
merged 22 commits into from Jun 10, 2021

Conversation

GenevieveBuckley
Copy link
Contributor

@GenevieveBuckley GenevieveBuckley commented Apr 23, 2021

This PR introduces a high level graph layer for array overlaps.

Note: there is no actual optimization included in this PR, we've just delayed materializing the whole task graph until a later point. We expect that delaying graph materialization is still a semi-useful thing to do, and that this PR sets us up for further work with optimizations in the future.

  • Tests added / passed
  • Passes black dask / flake8 dask / isort dask

Related discussion: #7404

@github-actions github-actions bot added the array label Apr 23, 2021
@GenevieveBuckley GenevieveBuckley marked this pull request as draft April 23, 2021 08:42
@mrocklin
Copy link
Member

Also adding @rjzamora

@ian-r-rose
Copy link
Collaborator

Thanks @GenevieveBuckley! I should be able to take a look today

Comment on lines 59 to 63
def __iter__(self):
return iter(self._dict)

def __len__(self):
return len(self._dict)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we plan on these not needing to materialize self._dict? Maybe we'd want a get_output_keys() method similar to Blockwise. In particular, __len__ gets called by _repr_html_, so materializing the graph in there can make display in a notebook slow.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those are good things to flag, thanks @gjoseph92

Copy link
Collaborator

@ian-r-rose ian-r-rose left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for getting started on this @GenevieveBuckley, I think if we go down the path of doing a full implementation of map_overlap (which involves both rechunking and slicing) it would be too much to bite off. But if we can identify a useful subset with, e.g., no rechunking and constant boundary conditions, we'd have a good start.

Do you know if those conditions are met by some of the imaging applications you are interested in?

dask/array/overlap.py Show resolved Hide resolved
dask/array/overlap.py Show resolved Hide resolved
dask/array/overlap.py Show resolved Hide resolved
@GenevieveBuckley
Copy link
Contributor Author

@emilmelnikov - while this PR isn't much to look at, Ian and I have had one meeting today (and another planned later this week) to work out the best way to tackle things.

@GenevieveBuckley
Copy link
Contributor Author

FYI, I had a chance to try moving the guts of the overlap_internal function into the _construct_graph method of the OverlapArrayLayer class.

My current problem is working out how to add dependencies to that task graph (essentially, the input array 'x' was specified as a dependency originally when the code was using HighLevelGraph.from_collections. I still haven't worked out how to do this part, so that the graph I'm making is something that can be computed. 

@GenevieveBuckley
Copy link
Contributor Author

Good news, bad news

Good news: the dependencies are fixed now (thanks Ian!), and things also work using LocalCluster.

Bad news: We've only made a tiny, tiny dent in the time it takes. We need things to be orders of magnitude less than they are to be comparable with map_blocks. I used Emil's benchmarking script from here: #7404

@GenevieveBuckley
Copy link
Contributor Author

We could try and delay more of the graph-building steps, but I suspect we might need to look at how to improve slicing too (sorry @ian-r-rose I know you were hoping to avoid that)

@ian-r-rose
Copy link
Collaborator

Bad news: We've only made a tiny, tiny dent in the time it takes. We need things to be orders of magnitude less than they are to be comparable with map_blocks. I used Emil's benchmarking script from here: #7404

Interesting, I get a huge speedup on your branch using Emil's benchmark! When I use main I get ~15 seconds to construct the last graph, and when I use this branch I ~0.5 seconds (30x!).

@ian-r-rose
Copy link
Collaborator

I would note: I'll bet we can make this even faster by moving the computation of the keys into the _compute_graph() function as well

@GenevieveBuckley
Copy link
Contributor Author

Bad news: We've only made a tiny, tiny dent in the time it takes. We need things to be orders of magnitude less than they are to be comparable with map_blocks. I used Emil's benchmarking script from here: #7404

Interesting, I get a huge speedup on your branch using Emil's benchmark! When I use main I get ~15 seconds to construct the last graph, and when I use this branch I ~0.5 seconds (30x!).

Huh, that's very different. ...Turns out I have trouble telling the 8 and 0 characters apart with the terminal font, and read that character as a leading zero before the decimal point. Doh! I get roughly a 24x speedup.

# On the main dask branch
dask.__version__ = '2021.04.1+11.gbcb2fcab'
src.shape = (2048, 2048, 2048)
src.chunks = (64, 64, 64)
src.nbytes / (2**30) = 8.0
create dask array 'a' from zarr array 'src': 0.050859681 seconds
len(a.__dask_graph__()) = 32769
map_blocks dask array 'a' to dask array 'b': 0.000711317989 seconds
len(b.__dask_graph__()) = 65537
map_overlap dask array 'a' to dask array 'c': 8.85817131 seconds
len(c.__dask_graph__()) = 994425

# On the map-overlap-HLG branch
dask.__version__ = '2021.04.0+28.g87050e94'
src.shape = (2048, 2048, 2048)
src.chunks = (64, 64, 64)
src.nbytes / (2**30) = 8.0
create dask array 'a' from zarr array 'src': 0.046892996 seconds
len(a.__dask_graph__()) = 32769
map_blocks dask array 'a' to dask array 'b': 0.00107047 seconds
len(b.__dask_graph__()) = 65537
map_overlap dask array 'a' to dask array 'c': 0.363535573 seconds
len(c.__dask_graph__()) = 994425

That's roughly ~24x faster (it's only faster for this first bit, we've just delayed the slowness caused by materializing the graph until later on).

Note, currently we're just delaying materializing the graph. So it's not faster overall, you still have to wait for that when things are computed. We'll try to optimize that a bit later on.

@emilmelnikov it would be useful to hear how much benefit or not you'd get just from shifting the slow parts to later on. Do you know if that's useful by itself, or if it's critical to have the overall time including computation improved, too?

@GenevieveBuckley
Copy link
Contributor Author

I would note: I'll bet we can make this even faster by moving the computation of the keys into the _compute_graph() function as well

I've just tried moving a __dask_keys__() method into our new class, but I don't think that has helped us.

Interestingly, it doesn't seem to matter as much as I expected exactly which boundary condition you choose. Maybe a lot of the other slowness is due to all the sanity checks we see around the place? I'll need to do some line profiling to work out where the slowest bits are.

@GenevieveBuckley
Copy link
Contributor Author

Perhaps this will help us figure out what to target next:

Details - click to expand!
In [7]: cProfile.run("result = dask.array.map_overlap(identity, a, depth=1, boundary='none')", sort=2)
         724712 function calls (724648 primitive calls) in 0.317 seconds

   Ordered by: cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.000    0.000    0.317    0.317 {built-in method builtins.exec}
        1    0.000    0.000    0.317    0.317 <string>:1(<module>)
        1    0.000    0.000    0.317    0.317 overlap.py:691(map_overlap)
        2    0.165    0.082    0.316    0.158 core.py:445(map_blocks)
        1    0.000    0.000    0.316    0.316 overlap.py:307(trim_internal)
   131072    0.028    0.000    0.035    0.000 core.py:786(<genexpr>)
   131072    0.026    0.000    0.034    0.000 core.py:808(<genexpr>)
    32768    0.024    0.000    0.024    0.000 core.py:793(<listcomp>)
65583/65578    0.019    0.000    0.024    0.000 functools.py:947(__get__)
    32768    0.015    0.000    0.015    0.000 core.py:779(<dictcomp>)
    32768    0.015    0.000    0.015    0.000 core.py:803(<listcomp>)
   163979    0.011    0.000    0.011    0.000 {method 'get' of 'dict' objects}
    98340    0.008    0.000    0.008    0.000 core.py:1297(chunks)
    32788    0.003    0.000    0.003    0.000 {method 'items' of 'dict' objects}
        3    0.000    0.000    0.001    0.000 blockwise.py:12(blockwise)
        5    0.000    0.000    0.001    0.000 core.py:1121(__new__)
        1    0.000    0.000    0.001    0.001 overlap.py:909(<listcomp>)
        1    0.000    0.000    0.000    0.000 overlap.py:579(overlap)
        3    0.000    0.000    0.000    0.000 core.py:5188(new_da_object)
        5    0.000    0.000    0.000    0.000 utils.py:1489(has_keyword)
        6    0.000    0.000    0.000    0.000 core.py:2625(normalize_chunks)
        5    0.000    0.000    0.000    0.000 inspect.py:3103(signature)
        5    0.000    0.000    0.000    0.000 inspect.py:2851(from_callable)
      7/5    0.000    0.000    0.000    0.000 inspect.py:2218(_signature_from_callable)
        1    0.000    0.000    0.000    0.000 overlap.py:259(overlap_internal)
       24    0.000    0.000    0.000    0.000 core.py:2784(<genexpr>)
        5    0.000    0.000    0.000    0.000 highlevelgraph.py:577(from_collections)
       11    0.000    0.000    0.000    0.000 utils.py:32(meta_from_array)
        3    0.000    0.000    0.000    0.000 base.py:784(tokenize)
        3    0.000    0.000    0.000    0.000 blockwise.py:78(blockwise)
      541    0.000    0.000    0.000    0.000 {built-in method builtins.isinstance}
        1    0.000    0.000    0.000    0.000 utils.py:127(compute_meta)
        2    0.000    0.000    0.000    0.000 inspect.py:1736(_signature_get_partial)
        5    0.000    0.000    0.000    0.000 core.py:1431(ndim)
        1    0.000    0.000    0.000    0.000 chunk.py:145(trim)
     41/8    0.000    0.000    0.000    0.000 utils.py:507(__call__)
        1    0.000    0.000    0.000    0.000 core.py:1679(__getitem__)
        5    0.000    0.000    0.000    0.000 core.py:1271(shape)
        5    0.000    0.000    0.000    0.000 inspect.py:2124(_signature_from_function)
       24    0.000    0.000    0.000    0.000 {built-in method builtins.all}
       21    0.000    0.000    0.000    0.000 slicing.py:1317(cached_cumsum)
       20    0.000    0.000    0.000    0.000 core.py:1273(<genexpr>)
        1    0.000    0.000    0.000    0.000 core.py:2463(rechunk)
        3    0.000    0.000    0.000    0.000 highlevelgraph.py:557(_from_collection)
        1    0.000    0.000    0.000    0.000 slicing.py:849(normalize_index)
        1    0.000    0.000    0.000    0.000 rechunk.py:186(rechunk)
        3    0.000    0.000    0.000    0.000 overlap.py:903(assert_int_chunksize)
        5    0.000    0.000    0.000    0.000 base.py:840(normalize_object)
       78    0.000    0.000    0.000    0.000 abc.py:96(__instancecheck__)
       23    0.000    0.000    0.000    0.000 numeric.py:1782(isscalar)
        3    0.000    0.000    0.000    0.000 base.py:804(normalize_dict)
        3    0.000    0.000    0.000    0.000 blockwise.py:239(__init__)
       21    0.000    0.000    0.000    0.000 inspect.py:2489(__init__)
        5    0.000    0.000    0.000    0.000 highlevelgraph.py:543(__init__)
      3/2    0.000    0.000    0.000    0.000 base.py:852(normalize_function)
        1    0.000    0.000    0.000    0.000 base.py:868(_normalize_function)
        9    0.000    0.000    0.000    0.000 config.py:436(get)
      291    0.000    0.000    0.000    0.000 overlap.py:904(<genexpr>)
      579    0.000    0.000    0.000    0.000 {built-in method math.isnan}
        5    0.000    0.000    0.000    0.000 highlevelgraph.py:552(<dictcomp>)
       15    0.000    0.000    0.000    0.000 slicing.py:1307(_cumsum)
     12/3    0.000    0.000    0.000    0.000 base.py:819(normalize_seq)
        3    0.000    0.000    0.000    0.000 base.py:876(<genexpr>)
       78    0.000    0.000    0.000    0.000 {built-in method _abc._abc_instancecheck}
        2    0.000    0.000    0.000    0.000 inspect.py:3039(bind_partial)
     12/3    0.000    0.000    0.000    0.000 base.py:821(func)
        2    0.000    0.000    0.000    0.000 overlap.py:876(coerce)
        4    0.000    0.000    0.000    0.000 highlevelgraph.py:61(__init__)
        2    0.000    0.000    0.000    0.000 inspect.py:2901(_bind)
        1    0.000    0.000    0.000    0.000 core.py:345(apply_infer_dtype)
        7    0.000    0.000    0.000    0.000 inspect.py:2772(__init__)
        2    0.000    0.000    0.000    0.000 overlap.py:879(<listcomp>)
       10    0.000    0.000    0.000    0.000 inspect.py:2555(replace)
        1    0.000    0.000    0.000    0.000 utils.py:131(<listcomp>)
       83    0.000    0.000    0.000    0.000 {built-in method builtins.hasattr}
        3    0.000    0.000    0.000    0.000 slicing.py:918(check_index)
        5    0.000    0.000    0.000    0.000 utils.py:1110(is_arraylike)
       13    0.000    0.000    0.000    0.000 {built-in method builtins.any}
        2    0.000    0.000    0.000    0.000 core.py:412(normalize_arg)
        1    0.000    0.000    0.000    0.000 overlap.py:235(_overlap_internal_chunks)
        2    0.000    0.000    0.000    0.000 overlap.py:922(coerce_depth)
      3/2    0.000    0.000    0.000    0.000 delayed.py:46(unpack_collections)
        4    0.000    0.000    0.000    0.000 _ufunc_config.py:39(seterr)
        5    0.000    0.000    0.000    0.000 core.py:1288(_chunks)
        3    0.000    0.000    0.000    0.000 slicing.py:38(sanitize_index)
        1    0.000    0.000    0.000    0.000 core.py:377(<listcomp>)
        5    0.000    0.000    0.000    0.000 typing.py:868(__new__)
        8    0.000    0.000    0.000    0.000 {built-in method builtins.sorted}
        2    0.000    0.000    0.000    0.000 _ufunc_config.py:441(__enter__)
        7    0.000    0.000    0.000    0.000 inspect.py:493(unwrap)
      230    0.000    0.000    0.000    0.000 {method 'append' of 'list' objects}
        1    0.000    0.000    0.000    0.000 highlevelgraph.py:451(__init__)
       16    0.000    0.000    0.000    0.000 base.py:154(is_dask_collection)
       21    0.000    0.000    0.000    0.000 enum.py:313(__call__)
       14    0.000    0.000    0.000    0.000 <frozen importlib._bootstrap>:389(parent)
        1    0.000    0.000    0.000    0.000 core.py:3520(unify_chunks)
        1    0.000    0.000    0.000    0.000 numeric.py:166(ones)
        5    0.000    0.000    0.000    0.000 contextlib.py:238(helper)
        2    0.000    0.000    0.000    0.000 inspect.py:2865(replace)
      165    0.000    0.000    0.000    0.000 {built-in method builtins.len}
       25    0.000    0.000    0.000    0.000 core.py:1200(_reset_cache)
        9    0.000    0.000    0.000    0.000 config.py:39(canonical_name)
        3    0.000    0.000    0.000    0.000 utils.py:1153(is_dataframe_like)
        9    0.000    0.000    0.000    0.000 blockwise.py:66(index_subs)
        2    0.000    0.000    0.000    0.000 inspect.py:1838(_signature_is_builtin)
        3    0.000    0.000    0.000    0.000 overlap.py:944(coerce_boundary)
       21    0.000    0.000    0.000    0.000 slicing.py:1303(__hash__)
        5    0.000    0.000    0.000    0.000 contextlib.py:82(__init__)
       38    0.000    0.000    0.000    0.000 utils.py:91(<genexpr>)
        2    0.000    0.000    0.000    0.000 _ufunc_config.py:446(__exit__)
        1    0.000    0.000    0.000    0.000 core.py:769(<listcomp>)
        3    0.000    0.000    0.000    0.000 blockwise.py:108(<dictcomp>)
        9    0.000    0.000    0.000    0.000 slicing.py:20(_sanitize_index_element)
        1    0.000    0.000    0.000    0.000 <__array_function__ internals>:2(copyto)
       41    0.000    0.000    0.000    0.000 utils.py:481(dispatch)
        5    0.000    0.000    0.000    0.000 contextlib.py:117(__exit__)
      3/2    0.000    0.000    0.000    0.000 utils.py:716(funcname)
        1    0.000    0.000    0.000    0.000 core.py:773(<listcomp>)
        1    0.000    0.000    0.000    0.000 delayed.py:27(unzip)
        1    0.000    0.000    0.000    0.000 warnings.py:165(simplefilter)
        2    0.000    0.000    0.000    0.000 core.py:666(<listcomp>)
        5    0.000    0.000    0.000    0.000 contextlib.py:108(__enter__)
        4    0.000    0.000    0.000    0.000 _ufunc_config.py:139(geterr)
        2    0.000    0.000    0.000    0.000 inspect.py:1850(_signature_is_functionlike)
       16    0.000    0.000    0.000    0.000 inspect.py:2821(<genexpr>)
        7    0.000    0.000    0.000    0.000 blockwise.py:71(<listcomp>)
       24    0.000    0.000    0.000    0.000 core.py:2744(<genexpr>)
       14    0.000    0.000    0.000    0.000 {built-in method builtins.next}
        3    0.000    0.000    0.000    0.000 slicing.py:813(normalize_slice)
        3    0.000    0.000    0.000    0.000 {built-in method numpy.empty}
       15    0.000    0.000    0.000    0.000 blockwise.py:74(blockwise_token)
        2    0.000    0.000    0.000    0.000 inspect.py:90(ismethoddescriptor)
        1    0.000    0.000    0.000    0.000 sizeof.py:41(sizeof_python_collection)
       29    0.000    0.000    0.000    0.000 {built-in method builtins.getattr}
       21    0.000    0.000    0.000    0.000 slicing.py:1290(__init__)
        4    0.000    0.000    0.000    0.000 core.py:1214(numblocks)
        1    0.000    0.000    0.000    0.000 {built-in method numpy.core._multiarray_umath.implement_array_function}
        3    0.000    0.000    0.000    0.000 utils.py:1163(is_series_like)
        2    0.000    0.000    0.000    0.000 {built-in method builtins.sum}
      4/1    0.000    0.000    0.000    0.000 slicing.py:700(posify_index)
        4    0.000    0.000    0.000    0.000 copy.py:66(copy)
        1    0.000    0.000    0.000    0.000 warnings.py:181(_add_filter)
        6    0.000    0.000    0.000    0.000 utils.py:1157(<genexpr>)
        6    0.000    0.000    0.000    0.000 utils.py:1030(ensure_dict)
        4    0.000    0.000    0.000    0.000 overlap.py:639(<genexpr>)
       14    0.000    0.000    0.000    0.000 {method 'rpartition' of 'str' objects}
       35    0.000    0.000    0.000    0.000 {method 'pop' of 'dict' objects}
       16    0.000    0.000    0.000    0.000 core.py:1134(<genexpr>)
       56    0.000    0.000    0.000    0.000 inspect.py:2551(kind)
        3    0.000    0.000    0.000    0.000 utils.py:1173(is_index_like)
        2    0.000    0.000    0.000    0.000 delayed.py:94(<genexpr>)
       14    0.000    0.000    0.000    0.000 inspect.py:158(isfunction)
       20    0.000    0.000    0.000    0.000 core.py:1460(name)
        2    0.000    0.000    0.000    0.000 sizeof.py:20(sizeof_default)
        3    0.000    0.000    0.000    0.000 {built-in method _hashlib.openssl_md5}
        2    0.000    0.000    0.000    0.000 {built-in method builtins.max}
       28    0.000    0.000    0.000    0.000 {built-in method builtins.id}
        1    0.000    0.000    0.000    0.000 overlap.py:495(boundaries)
       21    0.000    0.000    0.000    0.000 enum.py:631(__new__)
       10    0.000    0.000    0.000    0.000 {method 'update' of 'dict' objects}
        2    0.000    0.000    0.000    0.000 overlap.py:935(coerce_depth_type)
        3    0.000    0.000    0.000    0.000 overlap.py:528(ensure_minimum_chunksize)
        4    0.000    0.000    0.000    0.000 utils.py:1144(<genexpr>)
        1    0.000    0.000    0.000    0.000 overlap.py:891(<listcomp>)
        9    0.000    0.000    0.000    0.000 {method 'split' of 'str' objects}
        4    0.000    0.000    0.000    0.000 {built-in method numpy.seterrobj}
       21    0.000    0.000    0.000    0.000 {method 'isidentifier' of 'str' objects}
       29    0.000    0.000    0.000    0.000 inspect.py:2539(name)
        3    0.000    0.000    0.000    0.000 overlap.py:953(<dictcomp>)
        5    0.000    0.000    0.000    0.000 core.py:1158(__dask_layers__)
       15    0.000    0.000    0.000    0.000 blockwise.py:264(<genexpr>)
       24    0.000    0.000    0.000    0.000 core.py:2746(<genexpr>)
        3    0.000    0.000    0.000    0.000 {method 'hexdigest' of '_hashlib.HASH' objects}
        2    0.000    0.000    0.000    0.000 dataclasses.py:1045(is_dataclass)
        1    0.000    0.000    0.000    0.000 overlap.py:887(<listcomp>)
        3    0.000    0.000    0.000    0.000 core.py:1180(__dask_tokenize__)
        9    0.000    0.000    0.000    0.000 {method 'replace' of 'str' objects}
        7    0.000    0.000    0.000    0.000 inspect.py:513(_is_wrapper)
        1    0.000    0.000    0.000    0.000 warnings.py:458(__enter__)
       10    0.000    0.000    0.000    0.000 {built-in method __new__ of type object at 0x56085981e580}
        1    0.000    0.000    0.000    0.000 <frozen importlib._bootstrap>:1017(_handle_fromlist)
        3    0.000    0.000    0.000    0.000 {built-in method sys.getsizeof}
        4    0.000    0.000    0.000    0.000 core.py:818(<genexpr>)
        1    0.000    0.000    0.000    0.000 abc.py:100(__subclasscheck__)
        5    0.000    0.000    0.000    0.000 core.py:1279(dtype)
        6    0.000    0.000    0.000    0.000 slicing.py:1293(__eq__)
        2    0.000    0.000    0.000    0.000 overlap.py:931(<dictcomp>)
       20    0.000    0.000    0.000    0.000 {built-in method builtins.callable}
        4    0.000    0.000    0.000    0.000 core.py:671(<genexpr>)
        2    0.000    0.000    0.000    0.000 core.py:3573(<genexpr>)
        6    0.000    0.000    0.000    0.000 utils.py:1167(<genexpr>)
        1    0.000    0.000    0.000    0.000 overlap.py:46(__init__)
        3    0.000    0.000    0.000    0.000 blockwise.py:165(<setcomp>)
        9    0.000    0.000    0.000    0.000 blockwise.py:257(<genexpr>)
        1    0.000    0.000    0.000    0.000 overlap.py:915(<lambda>)
        3    0.000    0.000    0.000    0.000 blockwise.py:176(<listcomp>)
        1    0.000    0.000    0.000    0.000 slicing.py:792(replace_ellipsis)
        3    0.000    0.000    0.000    0.000 blockwise.py:105(<setcomp>)
        4    0.000    0.000    0.000    0.000 core.py:1725(<genexpr>)
        8    0.000    0.000    0.000    0.000 {built-in method numpy.geterrobj}
       10    0.000    0.000    0.000    0.000 core.py:1155(__dask_graph__)
        2    0.000    0.000    0.000    0.000 _ufunc_config.py:437(__init__)
        4    0.000    0.000    0.000    0.000 core.py:777(<genexpr>)
        1    0.000    0.000    0.000    0.000 core.py:3565(<listcomp>)
        5    0.000    0.000    0.000    0.000 core.py:1454(_name)
        4    0.000    0.000    0.000    0.000 overlap.py:646(<genexpr>)
        3    0.000    0.000    0.000    0.000 blockwise.py:252(<listcomp>)
        1    0.000    0.000    0.000    0.000 warnings.py:477(__exit__)
       10    0.000    0.000    0.000    0.000 utils.py:87(ignoring)
        1    0.000    0.000    0.000    0.000 {method 'remove' of 'list' objects}
        6    0.000    0.000    0.000    0.000 utils.py:1177(<genexpr>)
        4    0.000    0.000    0.000    0.000 inspect.py:72(isclass)
        3    0.000    0.000    0.000    0.000 {built-in method builtins.min}
        4    0.000    0.000    0.000    0.000 core.py:2775(<genexpr>)
        6    0.000    0.000    0.000    0.000 {method 'copy' of 'dict' objects}
        4    0.000    0.000    0.000    0.000 core.py:2754(<genexpr>)
        2    0.000    0.000    0.000    0.000 core.py:664(<listcomp>)
        1    0.000    0.000    0.000    0.000 {built-in method _abc._abc_subclasscheck}
        3    0.000    0.000    0.000    0.000 {method 'indices' of 'slice' objects}
        8    0.000    0.000    0.000    0.000 {method 'extend' of 'list' objects}
        7    0.000    0.000    0.000    0.000 {built-in method sys.getrecursionlimit}
        9    0.000    0.000    0.000    0.000 inspect.py:2857(parameters)
        1    0.000    0.000    0.000    0.000 chunk.py:162(<listcomp>)
        6    0.000    0.000    0.000    0.000 {method 'move_to_end' of 'collections.OrderedDict' objects}
        4    0.000    0.000    0.000    0.000 {method 'items' of 'mappingproxy' objects}
        4    0.000    0.000    0.000    0.000 chunk.py:164(<genexpr>)
        3    0.000    0.000    0.000    0.000 blockwise.py:143(<listcomp>)
        3    0.000    0.000    0.000    0.000 {method 'encode' of 'str' objects}
        4    0.000    0.000    0.000    0.000 core.py:2750(<genexpr>)
        4    0.000    0.000    0.000    0.000 core.py:1722(<genexpr>)
        2    0.000    0.000    0.000    0.000 inspect.py:285(isbuiltin)
        1    0.000    0.000    0.000    0.000 overlap.py:638(<listcomp>)
        4    0.000    0.000    0.000    0.000 core.py:1720(<genexpr>)
        2    0.000    0.000    0.000    0.000 inspect.py:80(ismethod)
        2    0.000    0.000    0.000    0.000 overlap.py:865(<genexpr>)
        1    0.000    0.000    0.000    0.000 warnings.py:437(__init__)
        2    0.000    0.000    0.000    0.000 inspect.py:2631(__init__)
        4    0.000    0.000    0.000    0.000 rechunk.py:252(<genexpr>)
        1    0.000    0.000    0.000    0.000 slicing.py:802(<listcomp>)
        3    0.000    0.000    0.000    0.000 blockwise.py:132(<setcomp>)
        4    0.000    0.000    0.000    0.000 copy.py:107(_copy_immutable)
        2    0.000    0.000    0.000    0.000 {method 'values' of 'mappingproxy' objects}
        4    0.000    0.000    0.000    0.000 {built-in method builtins.iter}
        1    0.000    0.000    0.000    0.000 core.py:4131(asanyarray)
        2    0.000    0.000    0.000    0.000 core.py:3571(<genexpr>)
        3    0.000    0.000    0.000    0.000 blockwise.py:119(<dictcomp>)
        3    0.000    0.000    0.000    0.000 {built-in method _warnings._filters_mutated}
        2    0.000    0.000    0.000    0.000 rechunk.py:243(<genexpr>)
        2    0.000    0.000    0.000    0.000 overlap.py:891(<genexpr>)
        4    0.000    0.000    0.000    0.000 inspect.py:2543(default)
        1    0.000    0.000    0.000    0.000 slicing.py:897(<listcomp>)
        1    0.000    0.000    0.000    0.000 {method 'insert' of 'list' objects}
        2    0.000    0.000    0.000    0.000 <ipython-input-1-ac29718f1adf>:9(identity)
        2    0.000    0.000    0.000    0.000 {method 'values' of 'dict' objects}
        1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}
        1    0.000    0.000    0.000    0.000 utils.py:132(<dictcomp>)
        1    0.000    0.000    0.000    0.000 multiarray.py:1043(copyto)
        2    0.000    0.000    0.000    0.000 {method 'values' of 'collections.OrderedDict' objects}
        1    0.000    0.000    0.000    0.000 base.py:874(<genexpr>)

@ian-r-rose
Copy link
Collaborator

@emilmelnikov it would be useful to hear how much benefit or not you'd get just from shifting the slow parts to later on. Do you know if that's useful by itself, or if it's critical to have the overall time including computation improved, too?

There are actually two related benefits we could have here:

  1. Delaying graph materialization until later. That is to say, if we are constructing the high-level graph from the interactive application but never forcing materialization by calling persist/compute until we are ready, then we can block the interactive client much less.
  2. Doing the graph materialization in a different process. If we are in a distributed context, then we can make the scheduler process do the graph materialization. So even if it is still kind of expensive to do, it won't ever happen on the client, and any interactive processes will never be blocked.

dask/array/overlap.py Outdated Show resolved Hide resolved
self.chunks = chunks
self.numblocks = numblocks
self.token = token
self._cached_keys = None
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like this is an ugly way to do things - we're passing in a whole bunch of stuff in the initialization that is really only being used in the _construct_graph method. The aim is to get things working and then tidy it up, but I want to point this out now regardless.

There's also some potential confusion with all the different sorts of name (there is a name equivalent to the original x.chunks, a name that is "overlap-"+tokenize(x, axes), and then a getitem name as well). Ideally this would be simplified.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like this is an ugly way to do things - we're passing in a whole bunch of stuff in the initialization that is really only being used in the _construct_graph method. The aim is to get things working and then tidy it up, but I want to point this out now regardless.

Yeah, that's a fair concern -- I would say that it comes down to the fact that we are sort-of reimplementing some of the logic belonging to a dask collection, but in a more restricted setting (no deserialization, no numpy/pandas), so it's both (a) kind of duplicative feeling, and (b) a little awkward. Perhaps @rjzamora has some ideas about patterns for making things like this less verbose.

There's also some potential confusion with all the different sorts of name (there is a name equivalent to the original x.chunks, a name that is "overlap-"+tokenize(x, axes), and then a getitem name as well). Ideally this would be simplified.

Yep, I agree! I'm certainly open to ideas for how to name things in a clearer way (so to speak).

At the very least, I think some of these things could be computed in __init__ rather than passed in. This would help it be more self-consistent as well. For example, numblocks is computable from chunks, so they aren't really independent parameters to be passed in. You may also be able to tokenize (x.name, axes) so you wouldn't have to pass in a token (I'm not actually sure if there would be other consequences to changing how getitem-name is computed)

@emilmelnikov
Copy link

emilmelnikov commented May 3, 2021

map_overlap dask array 'a' to dask array 'c': 0.363535573 seconds

That's great, happy to see this!

@emilmelnikov it would be useful to hear how much benefit or not you'd get just from shifting the slow parts to later on. Do you know if that's useful by itself, or if it's critical to have the overall time including computation improved, too?

Unfortunately, while it's awesome to be able to work with dask collections without immediate graph construction overhead, we still need to be able to slice input and compute output in a way that takes O(output_size), not O(input_size).

I've updated the benchmark script to print time needed to compute a single chunk from map_overlap'ed array.

platform.python_version() = '3.9.1'
numpy.__version__ = '1.20.2'
zarr.__version__ = '2.8.1'
dask.__version__ = '2021.04.0+30.g01603678'
src.shape = (2048, 2048, 2048)
src.chunks = (64, 64, 64)
src.nbytes = 8.00 GiB
create dask array 'a' from zarr array 'src': 0.027057957 seconds
len(a.__dask_graph__()) = 32769
map_blocks dask array 'a' to dask array 'b': 0.000718789 seconds
len(b.__dask_graph__()) = 65537
map_overlap dask array 'a' to dask array 'c': 0.349759064 seconds
slice dask array 'c' to dask array 'd': 0.000300995 seconds
d.shape = (64, 64, 64)
d.chunks = ((64,), (64,), (64,))
d.nbytes = 256.00 kiB
compute dask array 'd': 9.01714485 seconds
len(c.__dask_graph__()) = 994425

Although I'm not sure if this is just too much to ask for the current state of HighLevelGraph in distributed scheduler.

@GenevieveBuckley
Copy link
Contributor Author

Unfortunately, while it's awesome to be able to work with dask collections without immediate graph construction overhead, we still need to be able to slice input and compute output in a way that takes O(output_size), not O(input_size).

There's a related discussion here from @bmerry about making slicing scale better #5918 which I believe is relevant to this discussion too.

@gjoseph92 has also been thinking about how to improve slicing, so he may have some thoughts for our next steps, too.

@GenevieveBuckley GenevieveBuckley changed the title Skeleton for a HighLevelGraph abstract layer for map_overlap A HighLevelGraph abstract layer for map_overlap May 6, 2021
@gjoseph92
Copy link
Collaborator

@GenevieveBuckley I don't much like this pattern, but for now I'd follow the lead of other layers and take a deserializing= kwarg:

def _construct_graph(self, deserializing=False):

Conditionally switch between CallableLazyImport and a plain import based on deserializing:

dask/dask/layers.py

Lines 797 to 811 in b3a8646

if deserializing:
# Use CallableLazyImport objects to avoid importing dataframe
# module on the scheduler
split_partition_func = CallableLazyImport(
"dask.dataframe.multi._split_partition"
)
concat_func = CallableLazyImport("dask.dataframe.multi._concat_wrapper")
merge_chunk_func = CallableLazyImport(
"dask.dataframe.multi._merge_chunk_wrapper"
)
else:
# Not running on distributed scheduler - Use explicit functions
from dask.dataframe.multi import _concat_wrapper as concat_func
from dask.dataframe.multi import _merge_chunk_wrapper as merge_chunk_func
from dask.dataframe.multi import _split_partition as split_partition_func

And only set deserializing=True in __dask_distributed_unpack__:

layer_dsk = cls(**state)._construct_graph(deserializing=True)

@GenevieveBuckley
Copy link
Contributor Author

Thanks @gjoseph92, I probably should have thought to realize that other people would have run into the same problem and solved it somewhere. The tip about __dask_distributed_unpack__ was helpful, I might have missed that otherwise.

@GenevieveBuckley
Copy link
Contributor Author

And yes, this still works well with LocalCluster :)

@jakirkham jakirkham mentioned this pull request May 26, 2021
3 tasks
Copy link
Member

@jsignell jsignell left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this ready to be merged in then?

As a small note, it seems a little strange to me to move all layer definitions up out of the array directory. Is there an established logic behind where the Layer classes should be defined? I see that the Blockwise class is in blockwise.py for instance.

@GenevieveBuckley
Copy link
Contributor Author

Is this ready to be merged in then?

That was my impression

As a small note, it seems a little strange to me to move all layer definitions up out of the array directory. Is there an established logic behind where the Layer classes should be defined? I see that the Blockwise class is in blockwise.py for instance.

There is no established logic. This might be a good point in time to establish what the organization should be. (We could do the re-organising in a separate PR, if need be)

Copy link
Member

@jrbourbeau jrbourbeau left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ian-r-rose @gjoseph92 if either of you get a chance, it would be great to give this a final review

As a small note, it seems a little strange to me to move all layer definitions up out of the array directory

IIRC we decided to keep most layer classes in a separate layer.py module to more easily check that extra libraries aren't imported within the module (there's more context around this over in #7381). This is because we want to avoid importing libraries like NumPy, pandas, etc. on the scheduler which may not be installed.

dask/tests/test_layers.py Outdated Show resolved Hide resolved
dask/layers.py Outdated Show resolved Hide resolved
Copy link
Collaborator

@ian-r-rose ian-r-rose left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A couple of very minor comments, but I think this is a good checkpoint. Follow-up work could include:

  1. computing layer length without materialization
  2. implementing cull (right now culling forces materialization as well)

dask/layers.py Outdated Show resolved Hide resolved
dask/tests/test_layers.py Outdated Show resolved Hide resolved
@GenevieveBuckley
Copy link
Contributor Author

GenevieveBuckley commented May 28, 2021

Now that I've fixed one of the tests so that it actually runs (thanks @jrbourbeau and @ian-r-rose), I now have a test failure to fix too 😆
I think it's failing because I haven't managed to keep numpy off the scheduler as well as I thought I had. It's late on Friday so I have to log off now, but I will take a closer look at this later.

@GenevieveBuckley
Copy link
Contributor Author

GenevieveBuckley commented Jun 1, 2021

Turns out, I made a typo and forgot a single . period character. I think this will fix my problem with the tests.

         (_array_creation, "numpy."),
-        (_array_map_overlap, "numpy"),
+        (_array_map_overlap, "numpy."),
     ],
 )

@GenevieveBuckley
Copy link
Contributor Author

Is this ready to be merged in then?

Ok, now we should be done @jsignell

Copy link
Member

@jrbourbeau jrbourbeau left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jrbourbeau jrbourbeau merged commit 6599905 into dask:main Jun 10, 2021
@ian-r-rose
Copy link
Collaborator

Woo!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
array highlevelgraph Issues relating to HighLevelGraphs.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

9 participants