Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding chunk & type information to dask high level graphs #7309

Merged
merged 13 commits into from Apr 29, 2021
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 18 additions & 0 deletions dask/array/core.py
Expand Up @@ -1156,6 +1156,24 @@ def __new__(cls, dask, name, chunks, dtype=None, meta=None, shape=None):
if result is not None:
self = result

try:
layer = self.dask.layers[name]
except (AttributeError, KeyError):
# self is no longer an Array after applying the plugins, OR
# a plugin replaced the HighLevelGraph with a plain dict, OR
# name is not the top layer's name (this can happen after the layer is
# manipulated, to avoid a collision)
pass
else:
layer.collection_annotations.update(
{
"type": type(self),
"chunk_type": type(meta),
GenevieveBuckley marked this conversation as resolved.
Show resolved Hide resolved
"chunks": chunks,
"dtype": dtype,
GenevieveBuckley marked this conversation as resolved.
Show resolved Hide resolved
}
)

return self

def __reduce__(self):
Expand Down
14 changes: 14 additions & 0 deletions dask/dataframe/core.py
Expand Up @@ -3609,6 +3609,20 @@ class DataFrame(_Frame):
_token_prefix = "dataframe-"
_accessors = set()

def __init__(self, dsk, name, meta, divisions):
super().__init__(dsk, name, meta, divisions)
self.dask.layers[name].collection_annotations.update(
{
"type": type(self),
"divisions": divisions,
"dataframe_type": type(meta),
"series_dtypes": {
col: meta[col].dtype if hasattr(meta[col], "dtype") else None
for col in meta.columns
},
}
)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. could use the same refactoring as da.Array
  2. why nothing for Series?
  3. dd does not have anything equivalent to array_plugins

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. could use the same refactoring as da.Array - ok, done
  2. why nothing for Series? - Good point, I've added "series_dtypes": {col: meta[col].dtype for col in meta.columns}
  3. dd does not have anything equivalent to array_plugins - This looks like a comment and not a request. Thanks for mentioning it!

def __array_wrap__(self, array, context=None):
if isinstance(context, tuple) and len(context) > 0:
if isinstance(context[1][0], np.ndarray) and context[1][0].shape == ():
Expand Down
30 changes: 29 additions & 1 deletion dask/highlevelgraph.py
Expand Up @@ -57,12 +57,40 @@ class Layer(collections.abc.Mapping):
"""

annotations: Optional[Mapping[str, Any]]
collection_annotations: Optional[Mapping[str, Any]]

def __init__(self, annotations: Mapping[str, Any] = None):
def __init__(
self,
annotations: Mapping[str, Any] = None,
collection_annotations: Mapping[str, Any] = {},
GenevieveBuckley marked this conversation as resolved.
Show resolved Hide resolved
):
"""Initialize Layer object.

Parameters
----------
annotations : Mapping[str, Any], optional
By default, None.
Annotations are metadata or soft constraints associated with tasks
that dask schedulers may choose to respect:
They signal intent without enforcing hard constraints.
As such, they are primarily designed for use with the distributed
scheduler. See the dask.annotate function for more information.
collection_annotations : Mapping[str, Any], optional
By default an empty dict.
Experimental, intended to assist with visualizing the performance
characteristics of Dask computations.
These annotations are *not* passed to the distributed scheduler.
"""
if annotations:
self.annotations = annotations
else:
self.annotations = copy.copy(config.get("annotations", None))
if collection_annotations:
self.collection_annotations = collection_annotations
else:
self.collection_annotations = copy.copy(
config.get("collection_annotations", {})
)
GenevieveBuckley marked this conversation as resolved.
Show resolved Hide resolved

@abc.abstractmethod
def is_materialized(self) -> bool:
Expand Down