Skip to content

[SPARK-56477][PYTHON] Refactor SQL_GROUPED_MAP_PANDAS_UDF#55495

Closed
Yicong-Huang wants to merge 4 commits intoapache:masterfrom
Yicong-Huang:SPARK-56477
Closed

[SPARK-56477][PYTHON] Refactor SQL_GROUPED_MAP_PANDAS_UDF#55495
Yicong-Huang wants to merge 4 commits intoapache:masterfrom
Yicong-Huang:SPARK-56477

Conversation

@Yicong-Huang
Copy link
Copy Markdown
Contributor

@Yicong-Huang Yicong-Huang commented Apr 23, 2026

What changes were proposed in this pull request?

Refactor SQL_GROUPED_MAP_PANDAS_UDF to be self-contained in read_udfs().

Why are the changes needed?

Part of SPARK-55388 (Refactor PythonEvalType processing logic). Making each eval type self-contained in read_udfs() improves readability and makes it easier to reason about the data flow for each eval type independently.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing tests. No behavior change.

ASV benchmark (GroupedMapPandasUDFTimeBench):

master: 174fc6014f  vs  PR: 41dc496616

Time (ms, lower = better)
scenario          udf                   master        PR        Δ
-----------------------------------------------------------------
sm_grp_few_col    identity_udf           424.0     423.3   -0.17%
sm_grp_few_col    sort_udf               481.7     473.5   -1.70%
sm_grp_few_col    key_identity_udf       427.6     384.3  -10.12%
sm_grp_many_col   identity_udf           332.6     328.6   -1.21%
sm_grp_many_col   sort_udf               343.4     341.9   -0.43%
sm_grp_many_col   key_identity_udf       330.5     324.6   -1.77%
lg_grp_few_col    identity_udf           242.2     236.1   -2.52%
lg_grp_few_col    sort_udf               359.7     357.6   -0.57%
lg_grp_few_col    key_identity_udf       212.3     216.1+    1.77%
lg_grp_many_col   identity_udf           492.1     517.9+    5.24%
lg_grp_many_col   sort_udf               598.7     613.2+    2.42%
lg_grp_many_col   key_identity_udf       479.2     488.7+    1.97%
mixed_types       identity_udf           422.9     440.2+    4.08%
mixed_types       sort_udf               449.4     456.9+    1.65%
mixed_types       key_identity_udf       398.1     383.2   -3.73%
-----------------------------------------------------------------
SUM                                     5994.5    5986.1   -0.14%

Aggregate essentially flat (-0.14%); per-scenario variation within run-to-run noise.

Was this patch authored or co-authored using generative AI tooling?

No.

@Yicong-Huang Yicong-Huang changed the title [SPARK-56477][PYTHON] Refactor SQL_GROUPED_MAP_PANDAS_UDF to be self-contained in read_udfs [WIP][SPARK-56477][PYTHON] Refactor SQL_GROUPED_MAP_PANDAS_UDF to be self-contained in read_udfs Apr 23, 2026
@Yicong-Huang Yicong-Huang marked this pull request as draft April 23, 2026 00:33
@Yicong-Huang Yicong-Huang changed the title [WIP][SPARK-56477][PYTHON] Refactor SQL_GROUPED_MAP_PANDAS_UDF to be self-contained in read_udfs [SPARK-56477][PYTHON] Refactor SQL_GROUPED_MAP_PANDAS_UDF Apr 23, 2026
@Yicong-Huang Yicong-Huang marked this pull request as ready for review April 24, 2026 09:12
Copy link
Copy Markdown
Contributor

@zhengruifeng zhengruifeng left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Continuing the SPARK-55388 series, this PR migrates SQL_GROUPED_MAP_PANDAS_UDF from the bottom-of-read_udfs mapper fallback to a dedicated self-contained branch, mirroring SPARK-55608's pattern for the Arrow analogue. read_single_udf now returns (func, args_offsets, return_type, num_udf_args), the serializer switches from GroupPandasUDFSerializer to ArrowStreamGroupSerializer, and grouped_func inlines list-batches → combine_chunks → to_pandas → invoke UDF → verify_pandas_result → PandasToArrowConversion.convert. Architecture is clean.

The 2nd commit's choice of inline-with-del over the first commit's _process_group() helper is justified: freeing input-side allocations before the conversion call lowers peak during conversion, which a function-return-based release cannot achieve. The trade-off is maintenance — every new local in this loop has to be remembered in the del list.

A few items below — the most consequential is the prefers_large_types change, which is technically a behavior change despite the PR description's "No".

Comment thread python/pyspark/worker.py
timezone=runner_conf.timezone,
safecheck=runner_conf.safecheck,
arrow_cast=True,
prefers_large_types=runner_conf.use_large_var_types,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The OLD output path went through GroupPandasUDFSerializer.__init__, which omits prefers_large_types when calling super().__init__() and so defaulted to False. The new code respects spark.sql.execution.arrow.useLargeVarTypes. Since ArrowUtils.fromArrowSchema maps both Utf8 and LargeUtf8 to StringType (sql/api/.../ArrowUtils.scala:82,84), this is wire-format-only and user-invisible at the Spark type level — but the PR description's "No" to user-facing change is no longer strictly correct. Either:

  • Note in the PR description that this aligns with the Arrow analogue (SPARK-55608) and is intentional, or
  • Pass prefers_large_types=False here to preserve the exact pre-PR wire format.

Note also the resulting divergence with SQL_GROUPED_MAP_PANDAS_ITER_UDF, which still uses GroupPandasUDFSerializer with the hardcoded False default (worker.py:2391-2397). Once the iter variant migrates as part of SPARK-55388, this divergence resolves.

Comment thread python/pyspark/worker.py
Comment on lines +2934 to +2944
"""Apply groupBy Pandas UDF (non-iterator variant).

The explicit ``del`` calls below keep peakmem bounded across
groups. Without them, generator locals from the previous
iteration stay bound on the frame until each statement in
the next iteration rebinds its slot, so the input-side
DataFrames overlap with the next group's allocations and
the working set grows unbounded on wide-column, large-group
inputs. ``del result`` runs on resume from yield, before
``data.__next__()`` is asked for the next group.
"""
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The peakmem rationale is plausible (input-side del before the convert call lowers peak vs. a helper-function approach), but the PR description benchmarks are wall-clock only. Could you add a peakmem comparison (e.g., tracemalloc.get_traced_memory() peaks or ASV's peakmem_* benchmarks) for the wide-column / large-group scenarios this docstring describes? That would close the loop on why the inline-with-del form was preferred over the simpler helper-function variant from the first commit, and protect against future edits inadvertently dropping a del and regressing peakmem.

Comment thread python/pyspark/worker.py
or eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_ITER_UDF
):
elif eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_ITER_UDF:
ser = GroupPandasUDFSerializer(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Heads up — after this PR, GroupPandasUDFSerializer is only used by SQL_GROUPED_MAP_PANDAS_ITER_UDF. The class comment at python/pyspark/sql/pandas/serializers.py:657 (# Serializer for SQL_GROUPED_MAP_PANDAS_UDF, SQL_GROUPED_MAP_PANDAS_ITER_UDF) is now stale. Worth updating in this PR or noting as a follow-up.

@zhengruifeng
Copy link
Copy Markdown
Contributor

merged to master

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants