
Adding chunk & type information to dask high level graphs #7309

Merged · 13 commits · Apr 29, 2021

Conversation

GenevieveBuckley
Contributor

This PR is to give us a place for discussing implementation strategies for #7141 (comment)

@jrbourbeau - Matt says you'd probably like to have this discussion with me, or will tag in somebody else to handle it.
cc @madsbk, another person who might also be interested.

Steps:

  1. Add info about the chunks & dtype to the dask high level graph, for both Array and Dataframe
  2. Work out which tests are now failing & fix them (this is where we're at right now)
  3. Think about how we could use this information for better high level graph visualizations (probably in a separate PR)

@GenevieveBuckley
Contributor Author

GenevieveBuckley commented Mar 3, 2021

Copying over info from #7141 (comment)

Brief update:
I've added three lines to the dask Array __new__ (I think this is what Matt was suggesting above):

class Array(DaskMethodsMixin):
    ...
    def __new__(cls, dask, name, chunks, dtype=None, meta=None, shape=None):
        ...
        self.dask.layers[name].type = type(self)
        self.dask.layers[name].chunks = chunks
        self.dask.layers[name].dtype = dtype
        return self

Test failures look localized to two different areas.

Failing tests from test_array_core.py

Running pytest, there are 7 failures in pytest dask/array/tests/test_array_core.py -v

The failures are almost all happening because name isn't a valid key in self.dask.layers for those particular tests. Usually, that's because we have name = 'some-string' and the key is a tuple like ('some-string', 0, 1).

EDIT: this is not quite accurate, the keys of self.dask are like the tuple ('some-string', 0, 1), but the keys of self.dask.layers are more like this:

(Pdb) self.dask.layers.keys()
dict_keys([139682423878752]) 
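For reference, here's what the two key spaces normally look like (a minimal sketch; the exact name token will differ):

import dask.array as da

x = da.ones((4, 4), chunks=2)
list(x.dask.layers)      # layer names are plain strings, e.g. ['ones-<token>']
list(x.dask.keys())[:2]  # task keys are tuples, e.g. [('ones-<token>', 0, 0), ('ones-<token>', 0, 1)]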

Anyway, continuing on...

In one test (test_constructor_plugin), it says that self is actually a numpy array instead and gives us AttributeError: 'numpy.ndarray' object has no attribute 'dask'.

Failing tests from test_distributed.py

There are 14 failing tests if you run pytest on just this subset: pytest dask/tests/test_distributed.py -v

Oddly, if you run the whole pytest suite then you get 10 ERRORS from test_distributed.py but no actual test failures reported (maybe they're all skipped or something, idk).

I haven't looked at what's going wrong with these yet.

@GenevieveBuckley
Contributor Author

This has been slower going than I'd like, because I keep getting failing tests locally when I run pytest on the master branch. Some of it is this #7291 but my workaround of increasing ulimit -n hasn't quite resolved everything.

@GenevieveBuckley
Contributor Author

> This has been slower going than I'd like, because I keep getting failing tests locally when I run pytest on the master branch. Some of it is this #7291 but my workaround of increasing ulimit -n hasn't quite resolved everything.

Update: deleting everything (github repo & conda env) then starting again did the trick.

@GenevieveBuckley
Contributor Author

GenevieveBuckley commented Mar 3, 2021

So I think for dataframes the best place is to edit the new_dd_object function. Wait, no, that's silly; I should stick with my original plan (adding it to either the __init__ of _Frame, or making a new __init__ for DataFrame which first calls super().__init__ - but maybe we don't need the extra complexity).

Are dataframe divisions analogous to dask array chunks?

I think I can find the dtype by looking at what meta is.
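As a quick sketch of where that information already lives on a dask dataframe (meta is an empty pandas object, so per-column dtypes come from there; divisions are the partition boundaries along the index, which is the closest analogue to chunks):

import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({"x": [1.0, 2.0, 3.0], "y": [1, 2, 3]})
df = dd.from_pandas(pdf, npartitions=2)
df._meta.dtypes  # per-column dtypes, e.g. x: float64, y: int64
df.divisions     # index values at partition boundaries, e.g. (0, 2, 2)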

@GenevieveBuckley
Contributor Author

The tests all pass if I run pytest with only the changes made to dask/dataframe/core.py. So that's an unexpected bonus; I was expecting more problems to pop up when I did that.

@GenevieveBuckley
Contributor Author

Also, the errors/failures I saw earlier in test_distributed.py seem to have disappeared (maybe related to whatever weirdness was causing this). So that's good news too.

@GenevieveBuckley
Contributor Author

GenevieveBuckley commented Mar 5, 2021

Found the common thread!
These changes aren't compatible with dask.array.store() given compute=False AND return_stored=True keyword arguments.

@GenevieveBuckley
Contributor Author

Here is a better minimal reproducible example for our test failures (example is from the docs here):

import dask.array as da
import zarr as zr
c = (2, 2)
d = da.ones((10, 11), chunks=c)
z1 = zr.open_array('lazy.zarr', shape=d.shape, dtype=d.dtype, chunks=c)
d1 = d.store(z1, compute=False, return_stored=True)
# fails with KeyError

@GenevieveBuckley
Contributor Author

The problem seems to be here:

dask/dask/array/core.py

Lines 1028 to 1034 in 850472d

load_store_dsk = retrieve_from_ooc(store_keys, store_dsk, store_dsk_2)
result = tuple(
    Array(load_store_dsk, "load-store-%s" % t, s.chunks, meta=s)
    for s, t in zip(sources, toks)
)

Commenting out line 1029 will cause the same kind of failure when passing a compute=True keyword argument.

@GenevieveBuckley
Contributor Author

tl;dr

What is happening

Most tests are failing because we're calling dask.array.store() with compute=False and return_stored=True. That means this line isn't executed:

load_store_dsk = retrieve_from_ooc(store_keys, store_dsk, store_dsk_2)

So then the dictionary self.dask.layers doesn't contain the keys we are expecting. We think there should be a key called 'load-store-6e49a6e2-7d68-11eb-9953-782b46cc48ef' but instead we only get dict_keys([139682423878752]).

Separately, test_constructor_plugin fails because of this line in the test:

with dask.config.set(array_plugins=[lambda x: x.compute()]):

which means that when we make a new dask array, self.dask is a numpy array and not a HighLevelGraph (so we get an AttributeError).

What will we do about it?

We need to work out:
1. What behaviour do we want in these situations?
2. How will we know when we're in that situation? (It's probably easy to check if self.dask is not a HighLevelGraph, but maybe it's a bit harder when it is a HighLevelGraph but just not one that has a dictionary with the keys we were expecting)

Base automatically changed from master to main March 8, 2021 20:20
@GenevieveBuckley
Contributor Author

@rjzamora & @crusaderky - Martin suggested you might be interested in taking a look at this. It's blocked until we've made a decision about the desired behaviour moving forward. Here's the summary: #7309 (comment)

@crusaderky
Collaborator

There's no guarantee that name matches an actually existing layer. It normally does, but there are exceptions. And you have to be very careful going down the route of "fixing" these exceptions, because you must avoid at all costs collisions in layer names.
This is, by the way, also a problem in __dask_layers__().
I think that your PR should try fetching a layer matching the name and gracefully do nothing if not found.

Please don't set attributes in a Python class that are not explicitly declared in the class itself. It's bad practice and a guarantee for things to fall apart when either (1) the class declares __slots__ or (2) it is compiled with Cython or equivalent. I think that a generic dict container, e.g. Layer.info or similar, would be a reasonable design.

> self.dask is a numpy array

This makes no sense to me; how can a function that replaces a da.Array with a np.ndarray alter the da.Array.dask?

@GenevieveBuckley
Contributor Author

Thanks for your comments @crusaderky

  1. I've added an info dictionary as an attribute to the Layer class with self.info = {}.
  2. I've added an if name in self.dask.layers check; see the sketch at the end of this comment. (In retrospect this seems super obvious; I think at the time I was hesitant to add it just in case it was a sign I needed to "fix" things properly. Glad to hear that's not the case.)
  3. The reason we get a numpy array in test_constructor_plugin is that the array plugin passed is a function to call .compute() - the result of which is a numpy array. I'll explain this more clearly in another comment.
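Putting 1 and 2 together, the attachment in the dask Array __new__ now looks roughly like this (a sketch of the approach, not the exact diff):

class Array(DaskMethodsMixin):
    ...
    def __new__(cls, dask, name, chunks, dtype=None, meta=None, shape=None):
        ...
        if name in self.dask.layers:  # gracefully do nothing if the layer isn't found
            self.dask.layers[name].info["type"] = type(self)
            self.dask.layers[name].info["chunks"] = chunks
            self.dask.layers[name].info["dtype"] = dtype
        return self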

@GenevieveBuckley
Contributor Author

GenevieveBuckley commented Mar 12, 2021

Here is a more minimal example showing the reason for the failure in test_constructor_plugin.

def test_genevieve():
    with dask.config.set(array_plugins=[lambda x: x.compute()]):
        x = da.ones(10, chunks=5)
        y = x + 1

    assert isinstance(y, np.ndarray)

Because the array plugin is a function calling .compute(), we get a numpy array, which leads to unexpected results. The full details from pytest:
=================================== FAILURES ===================================
________________________________ test_genevieve ________________________________

    def test_genevieve():
        with dask.config.set(array_plugins=[lambda x: x.compute()]):
>           x = da.ones(10, chunks=5)

dask/array/tests/test_array_core.py:3895: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
dask/array/wrap.py:78: in wrap_func_shape_as_first_arg
    return Array(dsk, name, chunks, dtype=dtype, meta=kwargs.get("meta", None))
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

cls = <class 'dask.array.core.Array'>
dask = <dask.highlevelgraph.HighLevelGraph object at 0x7fe43c9f7fa0>
name = 'ones-5dd10755422ede42ff13cddda545a499', chunks = ((5, 5),)
dtype = dtype('float64'), meta = array(2.5e-323), shape = None

    def __new__(cls, dask, name, chunks, dtype=None, meta=None, shape=None):
        self = super(Array, cls).__new__(cls)
        assert isinstance(dask, Mapping)
        if not isinstance(dask, HighLevelGraph):
            dask = HighLevelGraph.from_collections(name, dask, dependencies=())
        self.dask = dask
        self.name = str(name)
        meta = meta_from_array(meta, dtype=dtype)
    
        if (
            isinstance(chunks, str)
            or isinstance(chunks, tuple)
            and chunks
            and any(isinstance(c, str) for c in chunks)
        ):
            dt = meta.dtype
        else:
            dt = None
        self._chunks = normalize_chunks(chunks, shape, dtype=dt)
        if self.chunks is None:
            raise ValueError(CHUNKS_NONE_ERROR_MESSAGE)
    
        self._meta = meta_from_array(meta, ndim=self.ndim, dtype=dtype)
    
        for plugin in config.get("array_plugins", ()):
            result = plugin(self)
            if result is not None:
                self = result
    
>       if name in self.dask.layers:
E       AttributeError: 'numpy.ndarray' object has no attribute 'dask'

dask/array/core.py:1159: AttributeError

The most obvious fix is to add a hasattr check to the if statement, so:

        if hasattr(self, 'dask') and name in self.dask.layers:

@GenevieveBuckley GenevieveBuckley marked this pull request as ready for review March 12, 2021 03:01
@GenevieveBuckley
Contributor Author

I'm happy with this, unless there's any other feedback

Collaborator

@crusaderky left a comment


Missing unit tests. Also, have there been measurements on real-life use cases of storing the chunks information for each intermediate step this way? It can get pretty large pretty fast.

I'd like to hear from others what they think about the design - particularly since this new info dict is heavily overlapping with the annotations dict (but, unlike annotations, it will get lost in transit when moving to the distributed scheduler).

self.dask.layers[name].info["type"] = type(self)
self.dask.layers[name].info["divisions"] = divisions
self.dask.layers[name].info["chunk_type"] = type(meta)

Collaborator


  1. could use the same refactoring as da.Array
  2. why nothing for Series?
  3. dd does not have anything equivalent to array_plugins

Contributor Author


  1. could use the same refactoring as da.Array - ok, done
  2. why nothing for Series? - Good point, I've added "series_dtypes": {col: meta[col].dtype for col in meta.columns} (see the sketch after this list)
  3. dd does not have anything equivalent to array_plugins - This looks like a comment and not a request. Thanks for mentioning it!
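So the dataframe side now attaches something along these lines (again a sketch following the snippet above, not the exact diff):

if name in self.dask.layers:
    self.dask.layers[name].info["type"] = type(self)
    self.dask.layers[name].info["divisions"] = divisions
    self.dask.layers[name].info["chunk_type"] = type(meta)
    self.dask.layers[name].info["series_dtypes"] = {
        col: meta[col].dtype for col in meta.columns
    }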

@@ -59,6 +59,7 @@ class Layer(collections.abc.Mapping):
     annotations: Optional[Mapping[str, Any]]
 
     def __init__(self, annotations: Mapping[str, Any] = None):
+        self.info = {}
Collaborator


  1. please add type annotations above
  2. please add documentation
  3. you'll lose everything on clone(), cull(), and possibly some other methods

Contributor Author


  1. please add type annotations above - added
  2. please add documentation - added
  3. you'll lose everything on clone(), cull(), and possibly some other methods - I think this is a comment, not a request to fix just now? If I've misunderstood please let me know.

Co-authored-by: crusaderky <crusaderky@gmail.com>
@GenevieveBuckley
Contributor Author

> I'd like to hear from others what they think about the design - particularly since this new info dict is heavily overlapping with the annotations dict (but, unlike annotations, it will get lost in transit when moving to the distributed scheduler).

Are you suggesting putting this information into the annotations dict instead?

@crusaderky
Collaborator

@GenevieveBuckley That is a possibility, but the potential performance implications of transferring the extra data to the scheduler are non-trivial. I'm suggesting that those who are heavily involved in the design of graph annotations should be involved in this design too.

@GenevieveBuckley
Contributor Author

I think that we would not want these annotations to be expanded to every key at either stage. The chunks in particular is likely to be large. For large arrays this would be an n-squared cost.

But there may be cases where we want to annotate the layers without annotating the underlying tasks. Showing the chunks of an array layer is such a case I think.

In a lot of cases, I don't think users actually want this level of information. At least, not all of the time. With arrays, it's common to have uniformly sized chunks, perhaps with some funny sized ones towards the edges. Can we separate this somehow into:

  1. What is the typical chunk size at this point in the computation? This is probably what people want to know first, then...
  2. What is the size of all of the chunks?

I don't know if it's easy to create an answer to (1), but I think that's the first question people try and answer when troubleshooting, even if you start out by giving them all the detailed chunk sizes.
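For what it's worth, (1) could be answered cheaply with something like this sketch (assuming "typical" just means the most common chunk length per axis):

from collections import Counter

def typical_chunks(chunks):
    # Most common chunk length along each axis,
    # e.g. ((10, 10, 10, 5), (20, 20)) -> (10, 20)
    return tuple(Counter(axis).most_common(1)[0][0] for axis in chunks)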

@mrocklin
Member

mrocklin commented Apr 1, 2021

This conversation has stalled, mostly due to future-leaning concerns about how this information that we're adding to the layers on the client side could affect downstream submission on the scheduler and the workers. However, these concerns don't apply today (we don't currently use this information on the scheduler or workers, so it would be good if they did not block us).

Short term, I recommend that we proceed without using the current annotations machinery, and instead put this information somewhere else, something like collection_annotations. We can just ignore these annotations completely when shipping things off to the scheduler for now. If in the future we do want to send this data (which sounds like fun to me), then we can address the issues about over-replicating this data.

@sjperkins are you ok with deferring the broader question here and letting this work go on? If so, are you ok with putting this metadata somewhere other than annotations for the short-term?

@sjperkins
Member

Apologies, I did not realise this issue was blocked. I'm happy for this to proceed and defer the broader question.

@GenevieveBuckley
Contributor Author

Ok I've tried out the collection_annotations approach suggested. Can I ask you all to take a look & see what you think now?

(If any of the CI tests fail, I'll come back and address those - it's late on Friday here so I should stop working now)

@mrocklin
Member

> Ok I've tried out the collection_annotations approach suggested. Can I ask you all to take a look & see what you think now?
>
> (If any of the CI tests fail, I'll come back and address those - it's late on Friday here so I should stop working now)

@sjperkins would you be interested in taking another look through this?

Copy link
Member

@sjperkins left a comment


My understanding of this PR is that it exists to provide information to dask visualization routines (dask.visualize?). I think the approach for the Array and Dataframe collections is fine.

What about Delayed and Bag? I believe Delayed objects have a HLG with layers but Bags do not. I'd be happy with attaching annotations to Delayed objects only, perhaps just with a type attribute. Are there any other attributes appropriate for Delayed?

@mrocklin
Member

> I think the approach for the Array and Dataframe collections is fine.

Woo

> What about Delayed and Bag? I believe Delayed objects have a HLG with layers but Bags do not. I'd be happy with attaching annotations to Delayed objects only, perhaps just with a type attribute. Are there any other attributes appropriate for Delayed?

Bag could use high level graphs if anyone spent the time to do it. I think that we're establishing a protocol here that can be extended in the future.

@GenevieveBuckley
Contributor Author

> My understanding of this PR is that it exists to provide information to dask visualization routines

Yes, this is accurate.

> I think the approach for the Array and Dataframe collections is fine.
> What about Delayed and Bag?

Since this PR supports experimental mucking around with visualizations, I think it makes sense to defer any decisions for Delayed & Bag until after we've had a chance to try it and see if it's a useful thing we want to do.

@GenevieveBuckley
Contributor Author

Thank you for looking over this again @sjperkins (and thank you @crusaderky for your earlier review, too)

Member

@jrbourbeau left a comment


Thanks for all your work on this @GenevieveBuckley! I've left a few small final comments, but overall this looks ready to merge

GenevieveBuckley and others added 2 commits April 29, 2021 10:39
Co-authored-by: James Bourbeau <jrbourbeau@users.noreply.github.com>
Member

@jrbourbeau left a comment


Thanks @GenevieveBuckley for your work on this (and @sjperkins @crusaderky for reviewing)! I'm looking forward to seeing how we can use this new information

@GenevieveBuckley
Contributor Author

@freyam this PR might be relevant for your GSOC project later on.

You don't have to read everything or understand all of the discussion (obviously things changed as we went, so the first parts might not make much sense).

The part that's relevant for you is that we now have a new dictionary called collection_annotations (alongside the existing annotations dictionary). The new collection_annotations dictionary is an attribute of the dask layer, and holds information about the characteristics of dask arrays or dataframes (the info you get is slightly different depending on whether it's an array or a dataframe). We might try to integrate this information into our visualizations.

In [1]: import dask.array as da

In [2]: arr = da.random.random((100,100), chunks=(10,10))

In [3]: arr.dask.layers
Out[3]: {'random_sample-8c39afc91c532043b96497a2f2fa9875': <dask.highlevelgraph.MaterializedLayer at 0x7fe4b4339640>}

In [4]: mylayer = arr.dask.layers['random_sample-8c39afc91c532043b96497a2f2fa9875']

In [5]: mylayer
Out[5]: <dask.highlevelgraph.MaterializedLayer at 0x7fe4b4339640>

In [6]: mylayer.collection_annotations
Out[6]: 
{'type': dask.array.core.Array,
 'chunk_type': numpy.ndarray,
 'chunks': ((10, 10, 10, 10, 10, 10, 10, 10, 10, 10),
  (10, 10, 10, 10, 10, 10, 10, 10, 10, 10)),
 'dtype': None}

@rjzamora
Member

rjzamora commented Jul 1, 2021

> Thanks @GenevieveBuckley for your work on this (and @sjperkins @crusaderky for reviewing)! I'm looking forward to seeing how we can use this new information

Just a note: I am starting to (slowly) work through a design doc for general column projection in Dask-Dataframe, and I am getting the sense that collection_annotations may be the right place to store Layer-wise column-dependency properties. That is, I am thinking that optimize_dataframe_getitem can be extended to work across multiple layers if common Dataframe operations are modified to (optionally) store the required input and output columns for the generated Layers. If all Layers in a HLG include this metadata, and the root Layer is a DataFrameIOLayer, then column projection becomes relatively simple.
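For illustration, the kind of per-layer metadata that would enable this might look like the following (hypothetical keys, not part of this PR):

# Hypothetical sketch: each layer records which columns it consumes and
# produces, so an optimizer can push a column selection down to the IO layer.
collection_annotations = {
    "input_columns": ["x", "y"],  # columns this layer reads
    "output_columns": ["x"],      # columns this layer emits
}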

@mrocklin
Member

mrocklin commented Jul 1, 2021 via email

@rjzamora
Member

rjzamora commented Jul 2, 2021

> Rick, I think that it is also reasonable for you to make a DataframeLayer class that defines a consistent set of attributes.

Right - That is certainly plan "A", but I am starting to consider alternatives as I struggle to decide on a "clean" sub-classing design :)

@mrocklin
Member

mrocklin commented Jul 2, 2021 via email
