[Experimental] Add cardinality argument to GroupBy.aggregate #9446

Closed · 2 commits
102 changes: 93 additions & 9 deletions dask/dataframe/groupby.py
@@ -1075,6 +1075,12 @@ def wrapper(func):
- string function name
- list of functions and/or function names, e.g. ``[np.sum, 'mean']``
- dict of column names -> function, function name or list of such.
cardinality : float or "infer", optional
Approximate ratio of the aggregated data size to the initial data size.
If specified, this ratio is used to override the defaults for
``split_every``, ``split_out``, and ``shuffle``. If ``"infer"`` is
specified, the first non-empty partition is used to estimate the
global cardinality ratio.
split_every : int, optional
Number of intermediate partitions that may be aggregated at once.
Default is 8.
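
For context, a minimal usage sketch of the ``cardinality`` option described above. This assumes the API proposed in this PR; the column names, data, and partition count below are illustrative only.

import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({"key": [1, 2, 1, 2, 1, 3], "x": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]})
ddf = dd.from_pandas(pdf, npartitions=2)

# Let the ratio be inferred from the first non-empty partition
ddf.groupby("key").agg({"x": "mean"}, cardinality="infer").compute()

# Or pass an explicit estimate: aggregated output roughly 1% the size of the input
ddf.groupby("key").agg({"x": "mean"}, cardinality=0.01).compute()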
@@ -1653,7 +1659,9 @@ def get_group(self, key):
)

@_aggregate_docstring()
def aggregate(self, arg, split_every=None, split_out=1, shuffle=None):
def aggregate(
self, arg, cardinality=None, split_every=None, split_out=None, shuffle=None
):
        column_projection = None
        if isinstance(self.obj, DataFrame):
            if isinstance(self.by, tuple) or np.isscalar(self.by):
@@ -1732,6 +1740,45 @@ def aggregate(self, arg, split_every=None, split_out=1, shuffle=None):
f"if pandas < 1.1.0. Pandas version is {pd.__version__}"
)

        if cardinality and (split_out is None and split_every is None):
            if cardinality == "infer":
                # Infer from the first non-empty partition
                _cardinality = self.obj.map_partitions(
                    _groupby_cardinality_factor,
                    self.by,
                    arg,
                    meta=self.obj.groupby(self.by).aggregate(arg),
                    enforce_metadata=False,
                )
                # Check up to five partitions, but never index past the
                # partitions actually available
                max_trials = min(5, self.obj.npartitions)
                for p in range(max_trials):
                    _val = _cardinality.partitions[p].compute()
                    if len(_val):
                        cardinality = _val.iloc[0]
                        break
                if cardinality == "infer":
                    raise RuntimeError(
                        f"Cardinality inference failed! First {max_trials} "
                        "partitions are empty."
                    )
            elif not isinstance(cardinality, (int, float)):
                # Unsupported option
                raise ValueError(
                    f"cardinality must be a float or 'infer', got {cardinality!r}"
                )

            # Use cardinality -> split_out/split_every heuristics
            _split_out = min(
                max(
                    int(cardinality * self.obj.npartitions * 0.66),
                    1,
                ),
                self.obj.npartitions,
            )
            split_every = split_every or min(max(int(1.0 / cardinality), 2), 32)
Collaborator comment:
I think it's important to allow split_every to be one (cf #9406 (comment)), so that there isn't any repartitioning to fewer partitions before shuffling. I found this makes a significant difference for the case when there are fewer, larger partitions.

Otherwise, this looks like a reasonable heuristic to me.

            split_out = split_out or _split_out
            if shuffle is None:
                shuffle = _split_out > 1
        split_out = split_out or 1

        if shuffle:
            # Shuffle-based aggregation
            #
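
As a concrete illustration of the heuristic above, assuming 100 input partitions and an inferred cardinality ratio of 0.05 (both values are illustrative):

npartitions = 100
cardinality = 0.05

split_out = min(max(int(cardinality * npartitions * 0.66), 1), npartitions)  # -> 3
split_every = min(max(int(1.0 / cardinality), 2), 32)  # -> 20
shuffle = split_out > 1  # -> True, so the shuffle-based aggregation path is taken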
@@ -2225,18 +2272,30 @@ def __getattr__(self, key):
raise AttributeError(e) from e

@_aggregate_docstring(based_on="pd.core.groupby.DataFrameGroupBy.aggregate")
def aggregate(self, arg, split_every=None, split_out=1, shuffle=None):
def aggregate(
self, arg, cardinality=None, split_every=None, split_out=None, shuffle=None
):
if arg == "size":
return self.size()

return super().aggregate(
arg, split_every=split_every, split_out=split_out, shuffle=shuffle
arg,
cardinality=cardinality,
split_every=split_every,
split_out=split_out,
shuffle=shuffle,
)

@_aggregate_docstring(based_on="pd.core.groupby.DataFrameGroupBy.agg")
def agg(self, arg, split_every=None, split_out=1, shuffle=None):
def agg(
self, arg, cardinality=None, split_every=None, split_out=None, shuffle=None
):
return self.aggregate(
arg, split_every=split_every, split_out=split_out, shuffle=shuffle
arg,
cardinality=cardinality,
split_every=split_every,
split_out=split_out,
shuffle=shuffle,
)


@@ -2304,9 +2363,15 @@ def nunique(self, split_every=None, split_out=1):
)

@_aggregate_docstring(based_on="pd.core.groupby.SeriesGroupBy.aggregate")
def aggregate(self, arg, split_every=None, split_out=1, shuffle=None):
def aggregate(
self, arg, cardinality=None, split_every=None, split_out=None, shuffle=None
):
result = super().aggregate(
arg, split_every=split_every, split_out=split_out, shuffle=shuffle
arg,
cardinality=cardinality,
split_every=split_every,
split_out=split_out,
shuffle=shuffle,
)
if self._slice:
result = result[self._slice]
@@ -2317,9 +2382,15 @@ def aggregate(self, arg, split_every=None, split_out=1, shuffle=None):
return result

@_aggregate_docstring(based_on="pd.core.groupby.SeriesGroupBy.agg")
def agg(self, arg, split_every=None, split_out=1, shuffle=None):
def agg(
self, arg, cardinality=None, split_every=None, split_out=None, shuffle=None
):
return self.aggregate(
arg, split_every=split_every, split_out=split_out, shuffle=shuffle
arg,
cardinality=cardinality,
split_every=split_every,
split_out=split_out,
shuffle=shuffle,
)

@derived_from(pd.core.groupby.SeriesGroupBy)
@@ -2533,3 +2604,16 @@ def _shuffle_aggregate(
    if split_out < shuffle_npartitions:
        return result.repartition(npartitions=split_out)
    return result


def _groupby_cardinality_factor(part, by, agg_spec):
    # Simple utility to find the fractional memory usage of
    # a groupby aggregation (relative to the initial data)
    if len(part) == 0:
        return part._constructor_sliced([], dtype="float64")
    size_i = part.memory_usage(index=True, deep=True).sum()
    size_f = (
        part.groupby(by).aggregate(agg_spec).memory_usage(index=True, deep=True).sum()
    )
    ratio = max(min(float(size_f) / float(size_i), 1.0), 0.0)
    return part._constructor_sliced([ratio])
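
For reference, a minimal pandas-only sketch of the ratio this helper computes; the column names and data here are illustrative:

import pandas as pd

part = pd.DataFrame({"key": [1, 2, 1, 2, 1], "x": [1.0, 2.0, 3.0, 4.0, 5.0]})
size_i = part.memory_usage(index=True, deep=True).sum()
size_f = (
    part.groupby("key").aggregate({"x": "sum"}).memory_usage(index=True, deep=True).sum()
)
# Five rows collapse into two groups, so the ratio comes out well below 1.0
ratio = max(min(float(size_f) / float(size_i), 1.0), 0.0)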