[SPARK-56312][PYTHON] Refactor SQL_COGROUPED_MAP_ARROW_UDF #55377
Yicong-Huang wants to merge 2 commits into apache:master
Conversation
zhengruifeng left a comment
Same SPARK-55388 effort as #55495, this time migrating SQL_COGROUPED_MAP_ARROW_UDF:

- Drops `wrap_cogrouped_map_arrow_udf` and the bottom-of-`read_udfs` mapper case.
- Switches the serializer from `CogroupArrowUDFSerializer` (which handled struct wrap + by-name reorder) to `ArrowStreamCoGroupSerializer(write_start_stream=True)`.
- Inlines `select_columns` for keys/values, UDF invocation, strict result validation, by-name reorder, and `wrap_struct` inside the new `cogrouped_func`.
- Adds a small `ArrowBatchTransformer.select_columns` helper in conversion.py, and factors `verify_result_type` out of `verify_arrow_table`/`verify_arrow_batch`.
The fix commit (444d6fb) is a worthwhile correction: it walks back from a permissive `verify_result_type(result, pa.Table)` + `enforce_schema` (which silently coerces types) to the original strict `verify_arrow_table` with `expected_cols_and_types`. The "no silent coercion" comment captures the intent well.
Unlike #55495, this PR does not introduce a prefers_large_types behavior change — the path was Arrow-in/Arrow-out before, output types pass through from the user UDF, and the by-name reorder is type-agnostic.
Architecture is clean. Two minor items below.
```python
def cogrouped_func(
    split_index: int,
    data: Iterator[Tuple[list[pa.RecordBatch], list[pa.RecordBatch]]],
) -> Iterator[pa.RecordBatch]:
```
Suggest adding a docstring to match the peer pattern. The Arrow analogue at worker.py:2802 has """Apply groupBy Arrow UDF (non-iterator variant).""", and the new grouped_func in #55495 has a longer one. Without it cogrouped_func reads a bit terse compared to its peers.
Suggested change:

```diff
 def cogrouped_func(
     split_index: int,
     data: Iterator[Tuple[list[pa.RecordBatch], list[pa.RecordBatch]]],
 ) -> Iterator[pa.RecordBatch]:
+    """Apply cogroupBy Arrow UDF."""
```
```python
    ArrowStreamPandasUDTFSerializer,
    GroupPandasUDFSerializer,
    CogroupArrowUDFSerializer,
    ArrowStreamCoGroupSerializer,
```
After this PR lands, `CogroupArrowUDFSerializer` (python/pyspark/sql/pandas/serializers.py:704) loses its only user. Its parent `ArrowStreamGroupUDFSerializer` (line 301) was already orphaned by SPARK-55608; `CogroupArrowUDFSerializer` was its only remaining subclass. After this PR, both classes are unreachable: no imports in worker.py, no other subclasses, no public re-exports in `__init__.py`. Suggest removing both in this PR (or as a small follow-up) so the dead code doesn't accumulate across the SPARK-55388 series.
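Before deleting, a quick reference scan can confirm both classes are truly orphaned. A toy illustration over in-memory stand-ins (not tied to the actual repository layout):

```python
def find_references(name, files):
    # Report lines mentioning `name` outside its own class definition;
    # an empty result suggests the class is safe to delete.
    hits = []
    for path, text in files.items():
        for lineno, line in enumerate(text.splitlines(), start=1):
            if name in line and not line.lstrip().startswith(f"class {name}"):
                hits.append((path, lineno, line.strip()))
    return hits

# Toy stand-ins for the real source tree:
files = {
    "serializers.py": (
        "class CogroupArrowUDFSerializer(ArrowStreamGroupUDFSerializer):\n"
        "    pass\n"
    ),
    "worker.py": (
        "from pyspark.sql.pandas.serializers import ArrowStreamCoGroupSerializer\n"
    ),
}
orphaned = find_references("CogroupArrowUDFSerializer", files) == []
```

In practice `git grep` over the repository does the same job; the point is checking for uses outside the definition site before removal.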
### What changes were proposed in this pull request?

Refactor `SQL_COGROUPED_MAP_ARROW_UDF` to be self-contained in `read_udfs()`.

### Why are the changes needed?

Part of SPARK-55388 (Refactor PythonEvalType processing logic). Making each eval type self-contained in `read_udfs()` improves readability and makes it easier to reason about the data flow for each eval type independently.

### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Existing tests. No behavior change.
ASV benchmark comparison (`CogroupedMapArrowUDFTimeBench`, average of 5 runs): 17 of 18 scenarios show improvement or no change. The `few_groups_lg/identity_udf` +12.5% is a benchmark ordering artifact -- when run in isolation (54.25 -> 54.02 ms, -0.4%), no regression is observed. The effect comes from prior scenarios polluting the Python process memory state, which does not occur in production where each Spark task runs in a fresh Python worker.

### Was this patch authored or co-authored using generative AI tooling?
No.