Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 39 additions & 102 deletions python/benchmarks/bench_eval_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -758,17 +758,15 @@ def _cogrouped_map_arrow_left_semi(left, right):
"multi_key": (200, 5_000, 3, 5),
}

@staticmethod
def _build_scenario(name):
@classmethod
def _build_scenario(cls, name):
"""Build a cogroup scenario: two DataFrames with the same grouping structure.

Unlike grouped map (which wraps columns in a struct), cogroup batches
have flat columns: [key_col_0, ..., key_col_k, val_col_0, ..., val_col_v].
"""
np.random.seed(42)
num_groups, rows_per_group, num_key_cols, num_value_cols = (
_CogroupedMapArrowBenchMixin._scenario_configs[name]
)
num_groups, rows_per_group, num_key_cols, num_value_cols = cls._scenario_configs[name]
n_cols = num_key_cols + num_value_cols
type_pool = MockDataFactory.MIXED_TYPES[:n_cols]
while len(type_pool) < n_cols:
Expand All @@ -784,22 +782,27 @@ def _build_scenario(name):
return_type = StructType(schema.fields[num_key_cols:])
return (cogroups, return_type, num_key_cols, num_value_cols)

_eval_type = PythonEvalType.SQL_COGROUPED_MAP_ARROW_UDF
# Each UDF entry: (func, n_args). n_args=2 -> func(left, right);
# n_args=3 -> func(key, left, right). The Arrow path has no 3-arg variant,
# but the tuple shape is shared with the Pandas sibling so ``_write_scenario``
# can be inherited unchanged.
_udfs = {
"identity_udf": _cogrouped_map_arrow_identity,
"concat_udf": _cogrouped_map_arrow_concat,
"left_semi_udf": _cogrouped_map_arrow_left_semi,
"identity_udf": (_cogrouped_map_arrow_identity, 2),
"concat_udf": (_cogrouped_map_arrow_concat, 2),
"left_semi_udf": (_cogrouped_map_arrow_left_semi, 2),
}
params = [list(_scenario_configs), list(_udfs)]
param_names = ["scenario", "udf"]

def _write_scenario(self, scenario, udf_name, buf):
groups, schema, num_key_cols, num_value_cols = self._build_scenario(scenario)
udf_func = self._udfs[udf_name]
udf_func, _ = self._udfs[udf_name]
left_offsets = MockUDFFactory.make_grouped_arg_offsets(num_key_cols, num_value_cols)
right_offsets = MockUDFFactory.make_grouped_arg_offsets(num_key_cols, num_value_cols)
arg_offsets = left_offsets + right_offsets
MockProtocolWriter.write_worker_input(
PythonEvalType.SQL_COGROUPED_MAP_ARROW_UDF,
self._eval_type,
lambda b: MockProtocolWriter.write_udf_payload(udf_func, schema, arg_offsets, b),
lambda b: MockProtocolWriter.write_grouped_data_payload(groups, buf=b),
buf,
Expand All @@ -819,8 +822,15 @@ class CogroupedMapArrowUDFPeakmemBench(_CogroupedMapArrowBenchMixin, _PeakmemBen
# ``pandas.DataFrame``. Optional 3-arg variant ``(key, left, right)``.


class _CogroupedMapPandasBenchMixin:
"""Provides _write_scenario for SQL_COGROUPED_MAP_PANDAS_UDF."""
class _CogroupedMapPandasBenchMixin(_CogroupedMapArrowBenchMixin):
"""Provides _write_scenario for SQL_COGROUPED_MAP_PANDAS_UDF.

Inherits ``_build_scenario`` and ``_write_scenario`` from the Arrow
sibling; only the eval type, the UDFs, and the per-scenario row counts
differ. Adds a 3-arg ``key_identity_udf`` variant the Arrow path lacks
(``_write_scenario`` ignores the ``n_args`` slot, so the extra entry is
handled by the inherited writer).
"""

def _cogrouped_map_pandas_identity(left, right):
"""Identity cogroup UDF: returns left DataFrame as-is."""
Expand Down Expand Up @@ -852,32 +862,7 @@ def _cogrouped_map_pandas_key_identity(key, left, right):
"multi_key": (100, 1_000, 3, 5),
}

@staticmethod
def _build_scenario(name):
"""Build a cogroup scenario: two DataFrames with the same grouping structure.

Like cogrouped arrow, batches have flat columns:
[key_col_0, ..., key_col_k, val_col_0, ..., val_col_v].
"""
np.random.seed(42)
num_groups, rows_per_group, num_key_cols, num_value_cols = (
_CogroupedMapPandasBenchMixin._scenario_configs[name]
)
n_cols = num_key_cols + num_value_cols
type_pool = MockDataFactory.MIXED_TYPES[:n_cols]
while len(type_pool) < n_cols:
type_pool = type_pool + MockDataFactory.MIXED_TYPES[: n_cols - len(type_pool)]

cogroups, schema = MockDataFactory.make_cogrouped_batches(
num_groups=num_groups,
num_rows=rows_per_group,
num_cols=n_cols,
spark_type_pool=type_pool,
batch_size=rows_per_group,
)
return_type = StructType(schema.fields[num_key_cols:])
return (cogroups, return_type, num_key_cols, num_value_cols)

_eval_type = PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF
# Each UDF entry: (func, n_args). n_args=2 -> func(left, right);
# n_args=3 -> func(key, left, right).
_udfs = {
Expand All @@ -889,19 +874,6 @@ def _build_scenario(name):
params = [list(_scenario_configs), list(_udfs)]
param_names = ["scenario", "udf"]

def _write_scenario(self, scenario, udf_name, buf):
groups, schema, num_key_cols, num_value_cols = self._build_scenario(scenario)
udf_func, _ = self._udfs[udf_name]
left_offsets = MockUDFFactory.make_grouped_arg_offsets(num_key_cols, num_value_cols)
right_offsets = MockUDFFactory.make_grouped_arg_offsets(num_key_cols, num_value_cols)
arg_offsets = left_offsets + right_offsets
MockProtocolWriter.write_worker_input(
PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF,
lambda b: MockProtocolWriter.write_udf_payload(udf_func, schema, arg_offsets, b),
lambda b: MockProtocolWriter.write_grouped_data_payload(groups, buf=b),
buf,
)


class CogroupedMapPandasUDFTimeBench(_CogroupedMapPandasBenchMixin, _TimeBenchBase):
pass
Expand Down Expand Up @@ -1729,11 +1701,11 @@ def _window_agg_arrow_mean_multi(col0, col1):
"wide_cols": (200, 5_000, 20),
}

@staticmethod
def _build_scenario(name):
@classmethod
def _build_scenario(cls, name):
"""Build a single scenario by name."""
np.random.seed(42)
num_groups, rows_per_group, n_cols = _WindowAggArrowBenchMixin._scenario_configs[name]
num_groups, rows_per_group, n_cols = cls._scenario_configs[name]
return MockDataFactory.make_grouped_batches(
num_groups=num_groups,
num_rows=rows_per_group,
Expand All @@ -1742,6 +1714,7 @@ def _build_scenario(name):
batch_size=rows_per_group,
)

_eval_type = PythonEvalType.SQL_WINDOW_AGG_ARROW_UDF
_udfs = {
"sum_udf": _window_agg_arrow_sum,
"mean_multi_udf": _window_agg_arrow_mean_multi,
Expand All @@ -1765,7 +1738,7 @@ def write_udf(b):
MockProtocolWriter.write_udf_payload(udf_func, return_type, arg_offsets, b)

MockProtocolWriter.write_worker_input(
PythonEvalType.SQL_WINDOW_AGG_ARROW_UDF,
self._eval_type,
write_udf,
lambda b: MockProtocolWriter.write_grouped_data_payload(groups, buf=b),
buf,
Expand All @@ -1785,8 +1758,15 @@ class WindowAggArrowUDFPeakmemBench(_WindowAggArrowBenchMixin, _PeakmemBenchBase
# UDF receives ``pd.Series`` columns for the entire window partition, returns scalar.


class _WindowAggPandasBenchMixin:
"""Provides _write_scenario for SQL_WINDOW_AGG_PANDAS_UDF."""
class _WindowAggPandasBenchMixin(_WindowAggArrowBenchMixin):
"""Provides _write_scenario for SQL_WINDOW_AGG_PANDAS_UDF.

Inherits ``_build_scenario`` and ``_write_scenario`` from the Arrow
sibling; only the eval type and the UDFs differ. ``_scenario_configs``
is intentionally identical to the Arrow variant for apples-to-apples
comparison (the aggregations are cheap enough that pandas conversion
is not the bottleneck here).
"""

def _window_agg_pandas_sum(col):
"""Sum a single Pandas Series."""
Expand All @@ -1796,57 +1776,14 @@ def _window_agg_pandas_mean_multi(col0, col1):
"""Mean of two Pandas Series combined."""
return (col0.mean() or 0) + (col1.mean() or 0)

_scenario_configs = {
"few_groups_sm": (50, 5_000, 5),
"few_groups_lg": (50, 50_000, 5),
"many_groups_sm": (2_000, 500, 5),
"many_groups_lg": (500, 10_000, 5),
"wide_cols": (200, 5_000, 20),
}

@staticmethod
def _build_scenario(name):
"""Build a single scenario by name."""
np.random.seed(42)
num_groups, rows_per_group, n_cols = _WindowAggPandasBenchMixin._scenario_configs[name]
return MockDataFactory.make_grouped_batches(
num_groups=num_groups,
num_rows=rows_per_group,
num_cols=n_cols,
spark_type_pool=MockDataFactory.NUMERIC_TYPES,
batch_size=rows_per_group,
)

_eval_type = PythonEvalType.SQL_WINDOW_AGG_PANDAS_UDF
_udfs = {
"sum_udf": _window_agg_pandas_sum,
"mean_multi_udf": _window_agg_pandas_mean_multi,
}
params = [list(_scenario_configs), list(_udfs)]
params = [list(_WindowAggArrowBenchMixin._scenario_configs), list(_udfs)]
param_names = ["scenario", "udf"]

def _write_scenario(self, scenario, udf_name, buf):
groups, _schema = self._build_scenario(scenario)
udf_func = self._udfs[udf_name]

# sum_udf uses 1 arg, mean_multi_udf uses 2 args
if "multi" in udf_name:
arg_offsets = [0, 1]
else:
arg_offsets = [0]

return_type = DoubleType()

def write_udf(b):
MockProtocolWriter.write_udf_payload(udf_func, return_type, arg_offsets, b)

MockProtocolWriter.write_worker_input(
PythonEvalType.SQL_WINDOW_AGG_PANDAS_UDF,
write_udf,
lambda b: MockProtocolWriter.write_grouped_data_payload(groups, buf=b),
buf,
runner_conf={"window_bound_types": "unbounded"},
)


class WindowAggPandasUDFTimeBench(_WindowAggPandasBenchMixin, _TimeBenchBase):
pass
Expand Down