
Commit

FEAT-modin-project#6883: Support grouping on a Series with range-partitioning impl

Signed-off-by: Dmitry Chigarev <dmitry.chigarev@intel.com>
dchigarev committed Jan 25, 2024
1 parent 43134ef commit a13bfee
Showing 4 changed files with 248 additions and 114 deletions.
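For context, a minimal usage sketch of what this commit enables: grouping a Modin DataFrame on a Series that is not one of its columns. The config flag name below (ExperimentalGroupbyImpl) is an assumption based on Modin's configuration around this release and may differ between versions; everything else is standard modin.pandas usage.

import modin.pandas as pd
import modin.config as cfg

# Assumed flag name: route groupby through the range-partitioning implementation.
cfg.ExperimentalGroupbyImpl.put(True)

df = pd.DataFrame({"a": [1, 2, 1, 2], "b": [10, 20, 30, 40]})
key = pd.Series([0, 1, 0, 1], name="key")  # a grouper that is NOT a column of df

# Grouping on the external Series now takes the range-partitioning path.
print(df.groupby(key).sum())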
163 changes: 126 additions & 37 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
@@ -2379,7 +2379,14 @@ def combine_and_apply(
)

def _apply_func_to_range_partitioning(
self, key_columns, func, ascending=True, preserve_columns=False, **kwargs
self,
key_columns,
func,
ascending=True,
preserve_columns=False,
data=None,
data_key_columns=None,
**kwargs,
):
"""
Reshuffle the data so that it is range-partitioned, then apply the passed function row-wise.
@@ -2402,21 +2409,57 @@
PandasDataframe
A new dataframe.
"""
if data is not None:
new_grouper_cols = pandas.MultiIndex.from_tuples(
[
("grouper", *col) if isinstance(col, tuple) else ("grouper", col)
for col in self.columns
]
)
key_columns = [
("grouper", *col) if isinstance(col, tuple) else ("grouper", col)
for col in key_columns
]
new_data_cols = pandas.MultiIndex.from_tuples(
[
("data", *col) if isinstance(col, tuple) else ("data", col)
for col in data.columns
]
)
if data_key_columns is None:
data_key_columns = []
else:
data_key_columns = [
("data", *col) if isinstance(col, tuple) else ("data", col)
for col in data_key_columns
]
key_columns += data_key_columns

grouper = self.copy()
grouper.columns = new_grouper_cols

data = data.copy()
data.columns = new_data_cols

grouper = grouper.concat(axis=1, others=[data], how="right", sort=False)
else:
grouper = self

# If there's only one row partition, we can simply apply the function row-wise without reshuffling
if self._partitions.shape[0] == 1:
result = self.apply_full_axis(
if grouper._partitions.shape[0] == 1:
result = grouper.apply_full_axis(
axis=1,
func=func,
new_columns=self.copy_columns_cache() if preserve_columns else None,
new_columns=grouper.copy_columns_cache() if preserve_columns else None,
)
if preserve_columns:
result._set_axis_lengths_cache(self._column_widths_cache, axis=1)
result._set_axis_lengths_cache(grouper._column_widths_cache, axis=1)
return result

# we don't want to inherit over-partitioning, hence this 'min' check
ideal_num_new_partitions = min(len(self._partitions), NPartitions.get())
m = len(self) / ideal_num_new_partitions
sampling_probability = (1 / m) * np.log(ideal_num_new_partitions * len(self))
ideal_num_new_partitions = min(len(grouper._partitions), NPartitions.get())
m = len(grouper) / ideal_num_new_partitions
sampling_probability = (1 / m) * np.log(ideal_num_new_partitions * len(grouper))
# If this df is overpartitioned, we try to sample each partition with probability
# greater than 1, which leads to an error. In this case, we can do one of the following
# two things. If there are only enough rows for one partition, and we have only 1 column
@@ -2427,65 +2470,65 @@
if sampling_probability >= 1:
from modin.config import MinPartitionSize

ideal_num_new_partitions = round(len(self) / MinPartitionSize.get())
if len(self) < MinPartitionSize.get() or ideal_num_new_partitions < 2:
ideal_num_new_partitions = round(len(grouper) / MinPartitionSize.get())
if len(grouper) < MinPartitionSize.get() or ideal_num_new_partitions < 2:
# If the data is too small, we shouldn't try reshuffling/repartitioning but rather
# simply combine all partitions and apply the sorting to the whole dataframe
return self.combine_and_apply(func=func)
return grouper.combine_and_apply(func=func)

if ideal_num_new_partitions < len(self._partitions):
if len(self._partitions) % ideal_num_new_partitions == 0:
if ideal_num_new_partitions < len(grouper._partitions):
if len(grouper._partitions) % ideal_num_new_partitions == 0:
joining_partitions = np.split(
self._partitions, ideal_num_new_partitions
grouper._partitions, ideal_num_new_partitions
)
else:
step = round(len(self._partitions) / ideal_num_new_partitions)
step = round(len(grouper._partitions) / ideal_num_new_partitions)
joining_partitions = np.split(
self._partitions,
range(step, len(self._partitions), step),
grouper._partitions,
range(step, len(grouper._partitions), step),
)

new_partitions = np.array(
[
self._partition_mgr_cls.column_partitions(
grouper._partition_mgr_cls.column_partitions(
ptn_grp, full_axis=False
)
for ptn_grp in joining_partitions
]
)
else:
new_partitions = self._partitions
new_partitions = grouper._partitions
else:
new_partitions = self._partitions
new_partitions = grouper._partitions

shuffling_functions = ShuffleSortFunctions(
self,
grouper,
key_columns,
ascending[0] if is_list_like(ascending) else ascending,
ideal_num_new_partitions,
**kwargs,
)

# here we want to get indices of those partitions that hold the key columns
key_indices = self.columns.get_indexer_for(key_columns)
key_indices = grouper.columns.get_indexer_for(key_columns)
partition_indices = np.unique(
np.digitize(key_indices, np.cumsum(self.column_widths))
np.digitize(key_indices, np.cumsum(grouper.column_widths))
)

new_partitions = self._partition_mgr_cls.shuffle_partitions(
new_partitions = grouper._partition_mgr_cls.shuffle_partitions(
new_partitions,
partition_indices,
shuffling_functions,
func,
)

result = self.__constructor__(new_partitions)
result = grouper.__constructor__(new_partitions)
if preserve_columns:
result.set_columns_cache(self.copy_columns_cache())
result.set_columns_cache(grouper.copy_columns_cache())
# We perform the final steps of the sort on full axis partitions, so we know that the
# length of each partition is the full length of the dataframe.
if self.has_materialized_columns:
result._set_axis_lengths_cache([len(self.columns)], axis=1)
if grouper.has_materialized_columns:
result._set_axis_lengths_cache([len(grouper.columns)], axis=1)
return result

@lazy_metadata_decorator(apply_axis="both")
@@ -3710,7 +3753,9 @@ def _compute_new_widths():
def groupby(
self,
axis: Union[int, Axis],
by: Union[str, List[str]],
internal_by: List[str],
external_by: List["PandasDataframe"],
by_positions: List[int],
operator: Callable,
result_schema: Optional[Dict[Hashable, type]] = None,
align_result_columns=False,
@@ -3723,8 +3768,10 @@
----------
axis : int or modin.core.dataframe.base.utils.Axis
The axis to apply the grouping over.
by : string or list of strings
internal_by : list of strings
One or more column labels to use for grouping.
external_by : list of PandasDataframes
Frames holding groupers that are not columns of this frame (for example, an external Series passed as a grouper).
by_positions : list of ints
Positions restoring the original order of the groupers: a non-negative value ``i`` refers to the ``i``-th column of the concatenated external groupers, while a negative value ``v`` refers to ``internal_by[-v - 1]``.
operator : callable(pandas.core.groupby.DataFrameGroupBy) -> pandas.DataFrame
The operation to carry out on each of the groups. The operator is another
algebraic operator with its own user-defined function parameter, depending
@@ -3761,12 +3808,28 @@
f"Algebra groupby only implemented row-wise. {axis.name} axis groupby not implemented yet!"
)

if not isinstance(by, list):
by = [by]

has_external_grouper = len(external_by) > 0
skip_on_aligning_flag = "__skip_me_on_aligning__"

def apply_func(df): # pragma: no cover
if has_external_grouper:
external_grouper = df["grouper"]
external_grouper = [
external_grouper.iloc[:, i]
for i in range(len(external_grouper.columns))
]
df = df["data"]
else:
external_grouper = []

by = []
for idx in by_positions:
if idx >= 0:
by.append(external_grouper[idx])
else:
by.append(internal_by[-idx - 1])

result = operator(df.groupby(by, **kwargs))
if (
align_result_columns
@@ -3780,11 +3843,37 @@ def apply_func(df): # pragma: no cover
# executed over this partition and so it has incorrect columns
# that shouldn't be considered on the aligning phase
result.attrs[skip_on_aligning_flag] = True
new_index_names = tuple(
None
if isinstance(name, str) and name.startswith(MODIN_UNNAMED_SERIES_LABEL)
else name
for name in result.index.names
)
result.index.names = new_index_names
return result

result = self._apply_func_to_range_partitioning(
key_columns=by,
if len(external_by) > 0:
grouper = (
external_by[0]
if len(external_by) == 1
else external_by[0].concat(
axis=1, others=external_by[1:], how="left", sort=False
)
)
grouper_key_columns = grouper.columns
data = self
data_key_columns = internal_by
else:
grouper = self
grouper_key_columns = internal_by
data, data_key_columns = None, None

result = grouper._apply_func_to_range_partitioning(
key_columns=grouper_key_columns,
func=apply_func,
data=data,
data_key_columns=data_key_columns,
)

# no need to align columns if there's only one row partition
@@ -3871,8 +3960,8 @@ def join_cols(df, *cols):
row_lengths=result._row_lengths_cache,
)

if not result.has_materialized_index:
by_dtypes = ModinDtypes(self._dtypes).lazy_get(by)
if not result.has_materialized_index and not has_external_grouper:
by_dtypes = ModinDtypes(self._dtypes).lazy_get(internal_by)
if by_dtypes.is_materialized:
new_index = ModinIndex(value=result, axis=0, dtypes=by_dtypes)
result.set_index_cache(new_index)
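For illustration, a plain-pandas sketch (hypothetical frames, not Modin's internal API) of the column-prefixing used in _apply_func_to_range_partitioning above: external groupers are placed under a "grouper" top level and the original frame under a "data" top level, so a single range-partitioning shuffle can move both together.

import pandas as pd

ext = pd.DataFrame({"key": [0, 1, 0, 1]})                        # external grouper frame
data = pd.DataFrame({"a": [1, 2, 1, 2], "b": [10, 20, 30, 40]})  # the frame being grouped

# Prefix the column labels the same way the method above does, so both frames
# can be concatenated into one wide frame and shuffled as a unit.
grouper = ext.copy()
grouper.columns = pd.MultiIndex.from_tuples([("grouper", c) for c in ext.columns])
payload = data.copy()
payload.columns = pd.MultiIndex.from_tuples([("data", c) for c in data.columns])
combined = pd.concat([grouper, payload], axis=1)

# Each partition can later be split back apart by selecting the top level:
print(combined["grouper"].columns.tolist())  # ['key']
print(combined["data"].columns.tolist())     # ['a', 'b']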
@@ -1725,7 +1725,7 @@ def shuffle_partitions(
new_partitions = [
[
cls._column_partitions_class(row_partition, full_axis=False).apply(
final_shuffle_func
final_shuffle_func,
)
]
for row_partition in split_row_partitions
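To close, a small plain-pandas sketch of how apply_func in dataframe.py above rebuilds the user's original 'by' list from by_positions; the sample frames, names, and positions here are hypothetical.

import pandas as pd

# Hypothetical partition content after the "grouper"/"data" split shown earlier.
df_part = pd.DataFrame({"a": [1, 2, 1, 2], "b": [10, 20, 30, 40]})
external_grouper = [pd.Series([0, 1, 0, 1], name="key")]  # columns of df["grouper"]
internal_by = ["a"]
by_positions = [0, -1]  # as if the user had passed [key_series, "a"]

# Mirror of the loop in apply_func: a non-negative position selects an external
# grouper column, a negative position selects internal_by[-idx - 1].
by = []
for idx in by_positions:
    if idx >= 0:
        by.append(external_grouper[idx])
    else:
        by.append(internal_by[-idx - 1])

print(df_part.groupby(by).sum())  # grouped by the external 'key' and the column 'a'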
