From 595cf1efd082a6479589b6e0109c03e636d5efd3 Mon Sep 17 00:00:00 2001 From: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Thu, 23 Apr 2026 00:28:31 +0000 Subject: [PATCH 1/4] refactor: self-contain SQL_GROUPED_MAP_PANDAS_UDF in read_udfs --- python/pyspark/worker.py | 130 +++++++++++++++++++-------------------- 1 file changed, 64 insertions(+), 66 deletions(-) diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 95a7ccdc4f8dc..a4af1ac0133d3 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -652,33 +652,6 @@ def verify_arrow_batch(batch, assign_cols_by_name, expected_cols_and_types): verify_arrow_result(batch, assign_cols_by_name, expected_cols_and_types) -def wrap_grouped_map_pandas_udf(f, return_type, argspec, runner_conf): - def wrapped(key_series, value_series): - import pandas as pd - - value_df = pd.concat(value_series, axis=1) - - if len(argspec.args) == 1: - result = f(value_df) - elif len(argspec.args) == 2: - # Extract key from pandas Series, preserving numpy types - key = tuple(s.iloc[0] for s in key_series) - result = f(key, value_df) - - verify_pandas_result( - result, return_type, runner_conf.assign_cols_by_name, truncate_return_schema=False - ) - - yield result - - def flatten_wrapper(k, v): - # Return Iterator[[(df, spark_type)]] directly - for df in wrapped(k, v): - yield [(df, return_type)] - - return flatten_wrapper - - def wrap_grouped_map_pandas_iter_udf(f, return_type, argspec, runner_conf): def wrapped(key_series, value_batches): import pandas as pd @@ -1139,8 +1112,8 @@ def read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index): elif eval_type == PythonEvalType.SQL_MAP_ARROW_ITER_UDF: return func, None, None, None elif eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF: - argspec = inspect.getfullargspec(chained_func) # signature was lost when wrapping it - return args_offsets, wrap_grouped_map_pandas_udf(func, return_type, argspec, runner_conf) + num_udf_args = 
len(inspect.getfullargspec(chained_func).args) + return func, args_offsets, return_type, num_udf_args elif eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_ITER_UDF: argspec = inspect.getfullargspec(chained_func) # signature was lost when wrapping it return args_offsets, wrap_grouped_map_pandas_iter_udf( @@ -2393,6 +2366,7 @@ def read_udfs(pickleSer, infile, eval_type, runner_conf, eval_conf): ): # NOTE: if timezone is set here, that implies respectSessionTimeZone is True if eval_type in ( + PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF, PythonEvalType.SQL_GROUPED_MAP_ARROW_UDF, PythonEvalType.SQL_GROUPED_MAP_ARROW_ITER_UDF, PythonEvalType.SQL_GROUPED_AGG_ARROW_UDF, @@ -2413,10 +2387,7 @@ def read_udfs(pickleSer, infile, eval_type, runner_conf, eval_conf): prefer_int_ext_dtype=runner_conf.prefer_int_ext_dtype, int_to_decimal_coercion_enabled=runner_conf.int_to_decimal_coercion_enabled, ) - elif ( - eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF - or eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_ITER_UDF - ): + elif eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_ITER_UDF: ser = GroupPandasUDFSerializer( timezone=runner_conf.timezone, safecheck=runner_conf.safecheck, @@ -2943,6 +2914,65 @@ def grouped_func( # profiling is not supported for UDF return grouped_func, None, ser, ser + if eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF: + import pyarrow as pa + import pandas as pd + + assert num_udfs == 1, "One GROUPED_MAP_PANDAS UDF expected here." + grouped_udf, arg_offsets, return_type, num_udf_args = udfs[0] + parsed_offsets = extract_key_value_indexes(arg_offsets) + assert len(parsed_offsets) == 1, "Expected one pair of offsets for GROUPED_MAP_PANDAS UDF." 
+ + key_offsets = parsed_offsets[0][0] + value_offsets = parsed_offsets[0][1] + output_schema = StructType([StructField("_0", return_type)]) + + def grouped_func( + split_index: int, + data: Iterator[Iterator[pa.RecordBatch]], + ) -> Iterator[pa.RecordBatch]: + """Apply groupBy Pandas UDF (non-iterator variant).""" + for group in data: + # Collect all batches and merge at Arrow level for one-shot conversion + all_batches = list(group) + if all_batches: + table = pa.Table.from_batches(all_batches).combine_chunks() + else: + table = pa.table({}) + all_series = ArrowBatchTransformer.to_pandas( + table, + timezone=runner_conf.timezone, + prefer_int_ext_dtype=runner_conf.prefer_int_ext_dtype, + ) + value_df = pd.concat([all_series[o] for o in value_offsets], axis=1) + + if num_udf_args == 1: + result = grouped_udf(value_df) + else: + key = tuple(all_series[o].iloc[0] for o in key_offsets) + result = grouped_udf(key, value_df) + + verify_pandas_result( + result, + return_type, + runner_conf.assign_cols_by_name, + truncate_return_schema=False, + ) + + yield PandasToArrowConversion.convert( + [result], + output_schema, + timezone=runner_conf.timezone, + safecheck=runner_conf.safecheck, + arrow_cast=True, + prefers_large_types=runner_conf.use_large_var_types, + assign_cols_by_name=runner_conf.assign_cols_by_name, + int_to_decimal_coercion_enabled=runner_conf.int_to_decimal_coercion_enabled, + ) + + # profiling is not supported for UDF + return grouped_func, None, ser, ser + if ( eval_type == PythonEvalType.SQL_ARROW_BATCHED_UDF and not runner_conf.use_legacy_pandas_udf_conversion @@ -3187,39 +3217,7 @@ def map_batch(batch): # profiling is not supported for UDF return func, None, ser, ser - if eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF: - import pyarrow as pa - - # We assume there is only one UDF here because grouped map doesn't - # support combining multiple UDFs. 
- assert num_udfs == 1 - - # See FlatMapGroupsInPandasExec for how arg_offsets are used to - # distinguish between grouping attributes and data attributes - arg_offsets, f = udfs[0] - parsed_offsets = extract_key_value_indexes(arg_offsets) - - key_offsets = parsed_offsets[0][0] - value_offsets = parsed_offsets[0][1] - - def mapper(batch_iter): - # Collect all Arrow batches and merge at Arrow level - all_batches = list(batch_iter) - if all_batches: - table = pa.Table.from_batches(all_batches).combine_chunks() - else: - table = pa.table({}) - # Convert to pandas once for the entire group - all_series = ArrowBatchTransformer.to_pandas( - table, - timezone=ser._timezone, - prefer_int_ext_dtype=runner_conf.prefer_int_ext_dtype, - ) - key_series = [all_series[o] for o in key_offsets] - value_series = [all_series[o] for o in value_offsets] - yield from f(key_series, value_series) - - elif eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_ITER_UDF: + if eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_ITER_UDF: import pyarrow as pa # We assume there is only one UDF here because grouped map doesn't From 41dc496616bb99c7e325e6d46d505f34aad2363c Mon Sep 17 00:00:00 2001 From: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Fri, 24 Apr 2026 21:03:19 +0000 Subject: [PATCH 2/4] refactor: scope grouped map pandas per-group work to bound peakmem --- python/pyspark/worker.py | 90 ++++++++++++++++++++++++---------------- 1 file changed, 54 insertions(+), 36 deletions(-) diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index a4af1ac0133d3..da841b6265ff9 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -2927,48 +2927,66 @@ def grouped_func( value_offsets = parsed_offsets[0][1] output_schema = StructType([StructField("_0", return_type)]) + def _process_group(group: Iterator[pa.RecordBatch]) -> "pa.RecordBatch": + """Process one group's batches into a single output RecordBatch. 
+ + Isolating the per-group work in its own frame keeps peakmem + bounded across many groups: once the function returns, every + intermediate (all_batches, table, all_series, value_df) is + released before the next group is pulled in. Without this, + keeping the loop inline in the generator lets those locals + sit on the generator frame across iterations and the working + set grows unbounded on wide-column, large-group inputs. + """ + all_batches = list(group) + if all_batches: + table = pa.Table.from_batches(all_batches).combine_chunks() + else: + table = pa.table({}) + all_series = ArrowBatchTransformer.to_pandas( + table, + timezone=runner_conf.timezone, + prefer_int_ext_dtype=runner_conf.prefer_int_ext_dtype, + ) + value_df = pd.concat([all_series[o] for o in value_offsets], axis=1) + + if num_udf_args == 1: + result = grouped_udf(value_df) + else: + key = tuple(all_series[o].iloc[0] for o in key_offsets) + result = grouped_udf(key, value_df) + + # UDFs that build a fresh result (e.g. sort_values) leave + # value_df unused past this point; drop it and its upstream + # so the pandas-to-Arrow conversion doesn't have to work + # alongside the original input DataFrame. 
+ del all_batches, table, all_series, value_df + + verify_pandas_result( + result, + return_type, + runner_conf.assign_cols_by_name, + truncate_return_schema=False, + ) + + return PandasToArrowConversion.convert( + [result], + output_schema, + timezone=runner_conf.timezone, + safecheck=runner_conf.safecheck, + arrow_cast=True, + prefers_large_types=runner_conf.use_large_var_types, + assign_cols_by_name=runner_conf.assign_cols_by_name, + int_to_decimal_coercion_enabled=runner_conf.int_to_decimal_coercion_enabled, + ) + def grouped_func( split_index: int, data: Iterator[Iterator[pa.RecordBatch]], ) -> Iterator[pa.RecordBatch]: """Apply groupBy Pandas UDF (non-iterator variant).""" for group in data: - # Collect all batches and merge at Arrow level for one-shot conversion - all_batches = list(group) - if all_batches: - table = pa.Table.from_batches(all_batches).combine_chunks() - else: - table = pa.table({}) - all_series = ArrowBatchTransformer.to_pandas( - table, - timezone=runner_conf.timezone, - prefer_int_ext_dtype=runner_conf.prefer_int_ext_dtype, - ) - value_df = pd.concat([all_series[o] for o in value_offsets], axis=1) - - if num_udf_args == 1: - result = grouped_udf(value_df) - else: - key = tuple(all_series[o].iloc[0] for o in key_offsets) - result = grouped_udf(key, value_df) - - verify_pandas_result( - result, - return_type, - runner_conf.assign_cols_by_name, - truncate_return_schema=False, - ) - - yield PandasToArrowConversion.convert( - [result], - output_schema, - timezone=runner_conf.timezone, - safecheck=runner_conf.safecheck, - arrow_cast=True, - prefers_large_types=runner_conf.use_large_var_types, - assign_cols_by_name=runner_conf.assign_cols_by_name, - int_to_decimal_coercion_enabled=runner_conf.int_to_decimal_coercion_enabled, - ) + yield _process_group(group) # profiling is not supported for UDF return grouped_func, None, ser, ser From 29e79c7334498e7af062b09c47b7c4502c6c30d6 Mon Sep 17 00:00:00 2001 From: Yicong-Huang 
<17627829+Yicong-Huang@users.noreply.github.com> Date: Tue, 28 Apr 2026 22:09:43 +0000 Subject: [PATCH 3/4] refactor: inline grouped map pandas loop with explicit del to bound peakmem --- python/pyspark/worker.py | 104 ++++++++++++++++++--------------------- 1 file changed, 49 insertions(+), 55 deletions(-) diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index da841b6265ff9..0d85daa0cbaa5 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -2927,66 +2927,60 @@ def grouped_func( value_offsets = parsed_offsets[0][1] output_schema = StructType([StructField("_0", return_type)]) - def _process_group(group: Iterator[pa.RecordBatch]) -> "pa.RecordBatch": - """Process one group's batches into a single output RecordBatch. - - Isolating the per-group work in its own frame keeps peakmem - bounded across many groups: once the function returns, every - intermediate (all_batches, table, all_series, value_df) is - released before the next group is pulled in. Without this, - keeping the loop inline in the generator lets those locals - sit on the generator frame across iterations and the working - set grows unbounded on wide-column, large-group inputs. - """ - all_batches = list(group) - if all_batches: - table = pa.Table.from_batches(all_batches).combine_chunks() - else: - table = pa.table({}) - all_series = ArrowBatchTransformer.to_pandas( - table, - timezone=runner_conf.timezone, - prefer_int_ext_dtype=runner_conf.prefer_int_ext_dtype, - ) - value_df = pd.concat([all_series[o] for o in value_offsets], axis=1) - - if num_udf_args == 1: - result = grouped_udf(value_df) - else: - key = tuple(all_series[o].iloc[0] for o in key_offsets) - result = grouped_udf(key, value_df) - - # UDFs that build a fresh result (e.g. sort_values) leave - # value_df unused past this point; drop it and its upstream - # so the pandas-to-Arrow conversion doesn't have to work - # alongside the original input DataFrame. 
- del all_batches, table, all_series, value_df - - verify_pandas_result( - result, - return_type, - runner_conf.assign_cols_by_name, - truncate_return_schema=False, - ) - - return PandasToArrowConversion.convert( - [result], - output_schema, - timezone=runner_conf.timezone, - safecheck=runner_conf.safecheck, - arrow_cast=True, - prefers_large_types=runner_conf.use_large_var_types, - assign_cols_by_name=runner_conf.assign_cols_by_name, - int_to_decimal_coercion_enabled=runner_conf.int_to_decimal_coercion_enabled, - ) - def grouped_func( split_index: int, data: Iterator[Iterator[pa.RecordBatch]], ) -> Iterator[pa.RecordBatch]: - """Apply groupBy Pandas UDF (non-iterator variant).""" + """Apply groupBy Pandas UDF (non-iterator variant). + + The explicit ``del`` statements below keep peak memory bounded + across groups. Without them, generator locals from the previous + iteration stay bound on the frame until each statement in + the next iteration rebinds its slot, so the input-side + DataFrames overlap with the next group's allocations and the + working set can reach about two groups' worth on wide-column, + large-group inputs. ``del result`` runs on resume from yield, + before ``data.__next__()`` is asked for the next group.
+ """ for group in data: - yield _process_group(group) + all_batches = list(group) + if all_batches: + table = pa.Table.from_batches(all_batches).combine_chunks() + else: + table = pa.table({}) + all_series = ArrowBatchTransformer.to_pandas( + table, + timezone=runner_conf.timezone, + prefer_int_ext_dtype=runner_conf.prefer_int_ext_dtype, + ) + value_df = pd.concat([all_series[o] for o in value_offsets], axis=1) + + if num_udf_args == 1: + result = grouped_udf(value_df) + else: + key = tuple(all_series[o].iloc[0] for o in key_offsets) + result = grouped_udf(key, value_df) + + del all_batches, table, all_series, value_df + + verify_pandas_result( + result, + return_type, + runner_conf.assign_cols_by_name, + truncate_return_schema=False, + ) + + yield PandasToArrowConversion.convert( + [result], + output_schema, + timezone=runner_conf.timezone, + safecheck=runner_conf.safecheck, + arrow_cast=True, + prefers_large_types=runner_conf.use_large_var_types, + assign_cols_by_name=runner_conf.assign_cols_by_name, + int_to_decimal_coercion_enabled=runner_conf.int_to_decimal_coercion_enabled, + ) + del result # profiling is not supported for UDF return grouped_func, None, ser, ser From cbe29b60678b96071ce78a45f41d7fc4292d2337 Mon Sep 17 00:00:00 2001 From: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Wed, 29 Apr 2026 06:49:50 +0000 Subject: [PATCH 4/4] chore: retrigger ci