[SPARK-56648][PYTHON] Refactor SQL_SCALAR_PANDAS_UDF #55613

Open
Yicong-Huang wants to merge 2 commits into apache:master from Yicong-Huang:SPARK-56648/refactor/scalar-pandas-udf

@Yicong-Huang
Contributor

What changes were proposed in this pull request?

Refactor SQL_SCALAR_PANDAS_UDF to use ArrowStreamSerializer as pure I/O, moving Arrow-to-Pandas and Pandas-to-Arrow conversion logic from ArrowStreamPandasUDFSerializer into read_udfs() in worker.py.

Specifically:

  • Remove the dedicated wrap_scalar_pandas_udf wrapper.
  • Route SQL_SCALAR_PANDAS_UDF through ArrowStreamSerializer(write_start_stream=True).
  • In read_udfs(), add a self-contained handler that:
    • Converts each Arrow RecordBatch to pandas Series via ArrowBatchTransformer.to_pandas() (with struct_in_pandas="dict", df_for_struct=True, ndarray_as_list=False).
    • Invokes each UDF column-wise on the pandas inputs and validates the return type (must be array-like) and row count (must match input).
    • Enforces the existing rule that struct return types must be pandas.DataFrame.
    • Converts results back to an Arrow RecordBatch via PandasToArrowConversion.convert().

Why are the changes needed?

Part of SPARK-55388. This consolidates UDF dispatch, verification, and conversion logic for SQL_SCALAR_PANDAS_UDF into a single inline handler in read_udfs(), mirroring the pattern already applied to SQL_SCALAR_ARROW_UDF (SPARK-55390) and SQL_ARROW_BATCHED_UDF (SPARK-55902). The dedicated ArrowStreamPandasUDFSerializer is no longer used by the scalar pandas path, reducing indirection and bringing the eval-type processing paths closer to a uniform structure.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing tests. No behavior change.

pyspark.sql.tests.pandas.test_pandas_udf_scalar (81 tests) plus test_pandas_udf, test_pandas_udf_typehints, test_pandas_udf_window, and test_arrow_python_udf all pass.

ASV benchmark comparison via COLUMNS=120 asv run --python=same --bench "ScalarPandasUDF" --attribute "repeat=(3,5,5.0)":

ScalarPandasUDFTimeBench - Before (master):

=================== ============== ============ ===============
--                                      udf
------------------- -------------------------------------------
      scenario       identity_udf    sort_udf    nullcheck_udf
=================== ============== ============ ===============
  sm_batch_few_col     366+-2ms       509+-1ms      445+-2ms
 sm_batch_many_col     283+-2ms       305+-2ms      293+-2ms
  lg_batch_few_col    1.08+-0.01s   1.34+-0.01s   1.16+-0.01s
 lg_batch_many_col     1.22+-0s     1.28+-0.01s   1.27+-0.01s
     pure_ints        190+-0.8ms     268+-0.4ms     217+-2ms
    pure_floats       187+-0.4ms     280+-0.3ms     218+-1ms
    pure_strings     1.16+-0.02s    1.70+-0.02s   1.11+-0.01s
       pure_ts         348+-3ms       441+-10ms     370+-5ms
    mixed_types        650+-2ms       745+-7ms      692+-7ms
=================== ============== ============ ===============

ScalarPandasUDFTimeBench - After (this PR):

=================== ============== ============ ===============
--                                      udf
------------------- -------------------------------------------
      scenario       identity_udf    sort_udf    nullcheck_udf
=================== ============== ============ ===============
  sm_batch_few_col     381+-10ms     503+-2ms      439+-2ms
 sm_batch_many_col     289+-3ms      302+-1ms      291+-2ms
  lg_batch_few_col    1.09+-0s     1.33+-0.02s   1.17+-0.01s
 lg_batch_many_col   1.24+-0.01s   1.29+-0.02s   1.26+-0.02s
     pure_ints       190+-0.4ms     270+-0.9ms    220+-0.4ms
    pure_floats      188+-0.8ms     277+-0.6ms     218+-1ms
    pure_strings    1.13+-0.01s   1.68+-0.01s   1.11+-0.01s
       pure_ts         360+-10ms     419+-4ms      369+-1ms
    mixed_types        661+-8ms      748+-10ms    697+-20ms
=================== ============== ============ ===============

ScalarPandasUDFPeakmemBench - Before (master):

=================== ============== ========== ===============
--                                     udf
------------------- -----------------------------------------
      scenario       identity_udf   sort_udf   nullcheck_udf
=================== ============== ========== ===============
  sm_batch_few_col       481M         482M         479M
 sm_batch_many_col       481M         482M         481M
  lg_batch_few_col       621M         619M         602M
 lg_batch_many_col       627M         629M         628M
     pure_ints           546M         547M         544M
    pure_floats          543M         545M         544M
    pure_strings         564M         565M         561M
       pure_ts           546M         547M         547M
    mixed_types          525M         526M         525M
=================== ============== ========== ===============

ScalarPandasUDFPeakmemBench - After (this PR):

=================== ============== ========== ===============
--                                     udf
------------------- -----------------------------------------
      scenario       identity_udf   sort_udf   nullcheck_udf
=================== ============== ========== ===============
  sm_batch_few_col       481M         482M         479M
 sm_batch_many_col       481M         482M         481M
  lg_batch_few_col       621M         618M         602M
 lg_batch_many_col       627M         628M         627M
     pure_ints           546M         547M         544M
    pure_floats          543M         544M         544M
    pure_strings         563M         564M         561M
       pure_ts           546M         547M         546M
    mixed_types          525M         526M         525M
=================== ============== ========== ===============

Summary: Latency and peak memory are essentially neutral (within run-to-run noise). The refactor reorganizes logic without changing data layout or buffering.
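
As a quick sanity check on the "essentially neutral" reading, the relative change can be computed directly from the time tables above. The sketch below copies only the identity_udf column (converted to milliseconds); the helper relative_change is a hypothetical name, and the reported +- spreads are not modeled.

```python
# (before_ms, after_ms) pairs copied from the identity_udf column of the
# ScalarPandasUDFTimeBench tables; second-based values are converted to ms.
identity_udf_ms = {
    "sm_batch_few_col": (366, 381),
    "sm_batch_many_col": (283, 289),
    "lg_batch_few_col": (1080, 1090),
    "lg_batch_many_col": (1220, 1240),
    "pure_ints": (190, 190),
    "pure_floats": (187, 188),
    "pure_strings": (1160, 1130),
    "pure_ts": (348, 360),
    "mixed_types": (650, 661),
}


def relative_change(before, after):
    """Percent change from before to after; positive means slower."""
    return (after - before) / before * 100.0


changes = {name: relative_change(*pair) for name, pair in identity_udf_ms.items()}
for scenario, pct in changes.items():
    print(f"{scenario:18s} {pct:+.1f}%")
```

For this column every scenario moves by less than 5% in either direction, with sm_batch_few_col showing the largest shift at roughly +4.1%.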

Was this patch authored or co-authored using generative AI tooling?

No
