Skip to content

Commit

Permalink
Get rid of high-level objects at the algebra level
Browse files Browse the repository at this point in the history
Signed-off-by: Dmitry Chigarev <dmitry.chigarev@intel.com>
  • Loading branch information
dchigarev committed Jul 5, 2023
1 parent 9614b2b commit 1a8f23e
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 47 deletions.
12 changes: 6 additions & 6 deletions modin/core/dataframe/algebra/binary.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,9 +423,9 @@ def apply(
Parameters
----------
left : modin.pandas.DataFrame or modin.pandas.Series
left : PandasQueryCompiler
Left operand.
right : modin.pandas.DataFrame or modin.pandas.Series
right : PandasQueryCompiler
Right operand.
func : callable(pandas.DataFrame, pandas.DataFrame, \*args, axis, \*\*kwargs) -> pandas.DataFrame
A binary function to apply `left` and `right`.
Expand All @@ -447,11 +447,11 @@ def apply(
func_args = tuple() if func_args is None else func_args
func_kwargs = dict() if func_kwargs is None else func_kwargs
qc_result = operator(
left._query_compiler,
right._query_compiler,
broadcast=right.ndim == 1,
left,
right,
broadcast=right._shape_hint in ("column", "row"),
*func_args,
axis=axis,
**func_kwargs,
)
return type(left)(query_compiler=qc_result)
return qc_result
8 changes: 4 additions & 4 deletions modin/core/dataframe/algebra/fold.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,14 @@ def caller(query_compiler, fold_axis=None, *args, **kwargs):
return caller

@classmethod
def apply(cls, df, func, fold_axis=0, func_args=None, func_kwargs=None):
def apply(cls, qc, func, fold_axis=0, func_args=None, func_kwargs=None):
r"""
Apply a Fold (full-axis) function to the dataframe.
Parameters
----------
df : modin.pandas.DataFrame or modin.pandas.Series
DataFrame object to apply the operator against.
qc : PandasQueryCompiler
Object to apply the operator against.
func : callable(pandas.DataFrame[NxM], \*args, \*\*kwargs) -> pandas.DataFrame[NxM]
A function to apply to every partition. Note that the function shouldn't change
the shape of the dataframe.
Expand All @@ -93,4 +93,4 @@ def apply(cls, df, func, fold_axis=0, func_args=None, func_kwargs=None):
func_kwargs = dict() if func_kwargs is None else func_kwargs
func_args = (fold_axis,) + func_args

return super().apply(df, func, func_args, func_kwargs)
return super().apply(qc, func, func_args, func_kwargs)
16 changes: 8 additions & 8 deletions modin/core/dataframe/algebra/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class GroupByReduce(TreeReduce):
arbitrary aggregation. Note: this attribute should be considered private.
"""

OUTPUT_DIM = 2
ID_LEVEL_NAME = "__ID_LEVEL_NAME__"
_GROUPBY_REDUCE_IMPL_FLAG = "__groupby_reduce_impl_func__"

Expand Down Expand Up @@ -740,7 +741,7 @@ def wrapper(df):
@classmethod
def apply(
cls,
df,
qc,
map_func,
reduce_func,
by,
Expand All @@ -753,14 +754,14 @@ def apply(
Parameters
----------
df : modin.pandas.DataFrame or modin.pandas.Series
qc : PandasQueryCompiler
A source DataFrame to group.
map_func : callable(pandas.core.groupby.DataFrameGroupBy) -> pandas.DataFrame
A map function to apply to a groupby object in every partition.
reduce_func : callable(pandas.core.groupby.DataFrameGroupBy) -> pandas.DataFrame
A reduction function to apply to the results of the map functions.
by : label or list of labels
Columns of the `df` to group on.
Columns of the `qc` to group on.
groupby_kwargs : dict, optional
Keyword arguments matching the signature of ``pandas.DataFrame.groupby``.
agg_args : tuple, optional
Expand All @@ -770,21 +771,20 @@ def apply(
Returns
-------
The same type as `df`.
PandasQueryCompiler
"""
agg_args = tuple() if agg_args is None else agg_args
agg_kwargs = dict() if agg_kwargs is None else agg_kwargs
groupby_kwargs = dict() if groupby_kwargs is None else groupby_kwargs

operator = cls.register(map_func, reduce_func)
qc_result = operator(
df._query_compiler,
df[by]._query_compiler,
qc,
qc.getitem_column_array(by if isinstance(by, list) else [by]),
axis=0,
groupby_kwargs=groupby_kwargs,
agg_args=agg_args,
agg_kwargs=agg_kwargs,
drop=True,
)

return type(df)(query_compiler=qc_result)
return qc_result
13 changes: 7 additions & 6 deletions modin/core/dataframe/algebra/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
class Operator(object):
"""Interface for building operators that can execute in parallel across partitions."""

OUTPUT_DIM = 2

def __init__(self):
raise ValueError(
"Please use {}.register instead of the constructor".format(
Expand Down Expand Up @@ -62,14 +64,14 @@ def validate_axis(cls, axis: Optional[int]) -> int:
return 0 if axis is None else axis

@classmethod
def apply(cls, df, func, func_args=None, func_kwargs=None, **kwargs):
def apply(cls, qc, func, func_args=None, func_kwargs=None, **kwargs):
r"""
Apply a function to a Modin DataFrame using the operators scheme.
Parameters
----------
df : modin.pandas.DataFrame or modin.pandas.Series
DataFrame object to apply the operator against.
qc : PandasQueryCompiler
Object to apply the operator against.
func : callable(pandas.DataFrame, \*args, \*\*kwargs) -> pandas.DataFrame
A function to apply.
func_args : tuple, optional
Expand All @@ -81,12 +83,11 @@ def apply(cls, df, func, func_args=None, func_kwargs=None, **kwargs):
Returns
-------
return_type
PandasQueryCompiler
"""
operator = cls.register(func, **kwargs)

func_args = tuple() if func_args is None else func_args
func_kwargs = dict() if func_kwargs is None else func_kwargs

qc_result = operator(df._query_compiler, *func_args, **func_kwargs)
return type(df)(query_compiler=qc_result)
return operator(qc, *func_args, **func_kwargs)
11 changes: 6 additions & 5 deletions modin/core/dataframe/algebra/reduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
class Reduce(Operator):
"""Builder class for Reduce operator."""

OUTPUT_DIM = 1

@classmethod
def register(cls, reduce_function, axis=None):
"""
Expand Down Expand Up @@ -58,8 +60,8 @@ def apply(cls, df, func, axis=0, func_args=None, func_kwargs=None):
Parameters
----------
df : modin.pandas.DataFrame or modin.pandas.Series
DataFrame object to apply the operator against.
df : PandasQueryCompiler
Object to apply the operator against.
func : callable(pandas.DataFrame, \*args, \*\*kwargs) -> Union[pandas.Series, pandas.DataFrame[1xN]]
A function to apply.
axis : int, default: 0
Expand All @@ -71,7 +73,6 @@ def apply(cls, df, func, axis=0, func_args=None, func_kwargs=None):
Returns
-------
modin.pandas.Series
PandasQueryCompiler[1xN]
"""
result = super().apply(df, func, func_args, func_kwargs, axis=axis)
return result if result.ndim == 1 else result.squeeze(axis)
return super().apply(df, func, func_args, func_kwargs, axis=axis)
15 changes: 8 additions & 7 deletions modin/core/dataframe/algebra/tree_reduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
class TreeReduce(Operator):
"""Builder class for TreeReduce operator."""

OUTPUT_DIM = 1

@classmethod
def register(cls, map_function, reduce_function=None, axis=None):
"""
Expand Down Expand Up @@ -57,15 +59,15 @@ def caller(query_compiler, *args, **kwargs):

@classmethod
def apply(
cls, df, map_function, reduce_function, axis=0, func_args=None, func_kwargs=None
cls, qc, map_function, reduce_function, axis=0, func_args=None, func_kwargs=None
):
r"""
Apply a map-reduce function to the dataframe.
Parameters
----------
df : modin.pandas.DataFrame or modin.pandas.Series
DataFrame object to apply the operator against.
qc : PandasQueryCompiler
Object to apply the operator against.
map_function : callable(pandas.DataFrame, \*args, \*\*kwargs) -> pandas.DataFrame
A map function to apply to every partition.
reduce_function : callable(pandas.DataFrame, \*args, \*\*kwargs) -> Union[pandas.Series, pandas.DataFrame[1xN]]
Expand All @@ -79,14 +81,13 @@ def apply(
Returns
-------
modin.pandas.Series
PandasQueryCompiler[1xN]
"""
result = super().apply(
df,
return super().apply(
qc,
map_function,
func_args,
func_kwargs,
reduce_function=reduce_function,
axis=axis,
)
return result if result.ndim == 1 else result.squeeze(axis)
43 changes: 43 additions & 0 deletions modin/pandas/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,49 @@ def broadcast_item(
)


def apply_operator(operator_cls, *args, **kwargs):
"""
Apply a function using the specified operator's scheme to a DataFrame object.
Parameters
----------
operator_cls : type
Operator's class from ``modin.core.dataframe.algebra``.
*args : tuple
Positional arguments satisfying the ``.apply()`` method
signature of the passed operator (you can pass high-level
DataFrame and Series objects instead of query compilers).
**kwargs : dict
Keyword arguments satisfying the ``.apply()`` method
signature of the passed operator (you can pass high-level
DataFrame and Series objects instead of query compilers).
Returns
-------
modin.pandas.DataFrame or modin.pandas.Series
Depends on the ``operator_cls.OUTPUT_DIM`` attribute.
"""
from .dataframe import DataFrame
from .series import Series

def process_arg(arg):
if isinstance(arg, Series) and arg._query_compiler._shape_hint is None:
arg._query_compiler._shape_hint = "column"
if isinstance(arg, (DataFrame, Series)):
arg = arg._query_compiler
return arg

args = (process_arg(arg) for arg in args)
kwargs = {key: process_arg(value) for key, value in kwargs.items()}

res_qc = operator_cls.apply(*args, **kwargs)

if operator_cls.OUTPUT_DIM == 2:
return DataFrame(query_compiler=res_qc)
else:
return Series(query_compiler=res_qc)


def _walk_aggregation_func(
key: IndexLabel, value: AggFuncType, depth: int = 0
) -> Iterator[Tuple[IndexLabel, AggFuncTypeBase, Optional[str], bool]]:
Expand Down
29 changes: 18 additions & 11 deletions modin/test/storage_formats/pandas/test_internals.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from modin.config import NPartitions, Engine, MinPartitionSize
from modin.distributed.dataframe.pandas import from_partitions
from modin.core.storage_formats.pandas.utils import split_result_of_axis_func_pandas
from modin.pandas.utils import apply_operator

import numpy as np
import pandas
Expand Down Expand Up @@ -1055,12 +1056,14 @@ def func(df, other, value, axis=None):
return df * other + value

# DataFrame case
md_res = Binary.apply(md_df1, md_df2, func, func_args=(10,))
md_res = apply_operator(Binary, md_df1, md_df2, func, func_args=(10,))
pd_res = func(pd_df1, pd_df2, 10)
df_equals(md_res, pd_res)

# Series case
md_res = Binary.apply(md_df1, md_df2.iloc[0, :], func, axis=1, func_args=(10,))
md_res = apply_operator(
Binary, md_df1, md_df2.iloc[0, :], func, axis=1, func_args=(10,)
)
pd_res = func(pd_df1, pd_df2.iloc[0, :], 10)
df_equals(md_res, pd_res)

Expand All @@ -1072,11 +1075,11 @@ def test_fold(self):
def func(df, value, axis):
return (df + value).cumsum(axis)

md_res = Fold.apply(md_df, func, fold_axis=0, func_args=(10, 0))
md_res = apply_operator(Fold, md_df, func, fold_axis=0, func_args=(10, 0))
pd_res = func(pd_df, 10, 0)
df_equals(md_res, pd_res)

md_res = Fold.apply(md_df, func, fold_axis=1, func_args=(10, 1))
md_res = apply_operator(Fold, md_df, func, fold_axis=1, func_args=(10, 1))
pd_res = func(pd_df, 10, 1)
df_equals(md_res, pd_res)

Expand All @@ -1092,11 +1095,15 @@ def map_func(grp):
def reduce_func(grp):
return grp.sum()

md_res = GroupByReduce.apply(
md_df, map_func, reduce_func, by=by_col, groupby_kwargs={"as_index": False}
md_res = apply_operator(
GroupByReduce,
md_df,
map_func,
reduce_func,
by=by_col,
groupby_kwargs={"as_index": False},
)
pd_res = pd_df.groupby(by_col, as_index=False).count()
# breakpoint()
df_equals(md_res, pd_res)

def test_map(self):
Expand All @@ -1107,7 +1114,7 @@ def test_map(self):
def func(df, value):
return df**value

md_res = Map.apply(md_df, func, func_args=(3,))
md_res = apply_operator(Map, md_df, func, func_args=(3,))
pd_res = func(pd_df, 3)
df_equals(md_res, pd_res)

Expand All @@ -1119,11 +1126,11 @@ def test_reduce(self):
def func(df, value, axis):
return (df**value).sum(axis)

md_res = Reduce.apply(md_df, func, func_args=(3, 0))
md_res = apply_operator(Reduce, md_df, func, func_args=(3, 0))
pd_res = func(pd_df, 3, 0)
df_equals(md_res, pd_res)

md_res = Reduce.apply(md_df, func, axis=1, func_args=(3, 1))
md_res = apply_operator(Reduce, md_df, func, axis=1, func_args=(3, 1))
pd_res = func(pd_df, 3, 1)
df_equals(md_res, pd_res)

Expand All @@ -1138,6 +1145,6 @@ def map_func(df):
def reduce_func(df):
return df.sum()

md_res = TreeReduce.apply(md_df, map_func, reduce_func)
md_res = apply_operator(TreeReduce, md_df, map_func, reduce_func)
pd_res = pd_df.count()
df_equals(md_res, pd_res)

0 comments on commit 1a8f23e

Please sign in to comment.