
[SPARK-56642][SQL] Add pipelined JVM-Python UDF data transfer#55552

Open
viirya wants to merge 32 commits into apache:master from viirya:pipelined-python-udf

Conversation

@viirya
Member

@viirya viirya commented Apr 26, 2026

What changes were proposed in this pull request?

Add an opt-in pipelined execution mode for Python UDFs (spark.python.udf.pipelined.enabled). When enabled, a dedicated writer thread serializes input and writes directly to the Python worker socket in blocking mode, while the task main thread concurrently reads output from the same socket. Because the socket is full-duplex, the two directions overlap, yielding pipeline parallelism.

Key changes:

  • PipelinedWriterRunnable in PythonRunner.scala: serializes input + writes to socket in a pool thread
  • pipelined_process() in worker.py: background reader thread pre-fetches input batches; lazy iterators from grouped/aggregate serializers are eagerly materialized to avoid cross-thread socket conflicts
  • InMemoryRowQueue gains a lockFree parameter: when true (pipelined mode only), synchronized is skipped since blocking socket I/O already provides happens-before guarantees
  • Proper propagation of TaskContext and InputFileBlockHolder to the writer pool thread
  • Socket idle timeout handling via SO_TIMEOUT + SocketTimeoutException wrapper
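
The full-duplex overlap described above can be sketched with a toy model (hypothetical names, not the actual PythonRunner/worker.py code): a writer thread streams input over one direction of a socketpair while the main thread reads results from the other direction, with a third thread standing in for the Python worker.

```python
import socket
import threading

def pipelined_roundtrip(rows):
    """Toy model of the pipelined transfer: a writer thread streams input
    over a full-duplex socket while the main thread reads results back
    concurrently. Names and the int+1 "UDF" are illustrative only."""
    jvm_end, worker_end = socket.socketpair()

    def worker():
        # Stands in for the Python worker: read a row, apply the "UDF",
        # write the result back on the same socket.
        with worker_end, worker_end.makefile("rwb") as f:
            for line in iter(f.readline, b""):
                f.write(b"%d\n" % (int(line) + 1))
                f.flush()

    def writer(sock):
        # Stands in for the dedicated writer thread: serialize and write
        # in blocking mode, then signal EOF so the worker terminates.
        for r in rows:
            sock.sendall(b"%d\n" % r)
        sock.shutdown(socket.SHUT_WR)

    threading.Thread(target=worker, daemon=True).start()
    threading.Thread(target=writer, args=(jvm_end,), daemon=True).start()

    # The main (task) thread reads output while input is still being sent.
    with jvm_end.makefile("rb") as out:
        return [int(line) for line in iter(out.readline, b"")]
```

Concurrent read() and write() on the two socket ends touch independent OS-level buffers, which is what lets serialization and output reading proceed in parallel.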

Benchmark: python/pyspark/sql/tests/pandas/bench_pipelined_udf.py
Environment: local[*] (16 cores), 5 iterations, 2 warmup

| Scenario | Sync (ms) | Pipelined (ms) | Speedup |
|---|---|---|---|
| Light UDF (1M rows) | 100 | 90 | 1.11x |
| CPU-bound UDF (1M rows) | 143 | 146 | 0.98x |
| Heavy UDF (10ms sleep/batch) | 1180 | 1176 | 1.00x |
| Large data (5M rows) | 338 | 300 | 1.13x |
| Multi-UDF (3 columns, 1M) | 123 | 100 | 1.24x |

ASV Benchmark: python/benchmarks/bench_pipelined_udf.py
ScalarUDFTimeBench (scalar UDF x + 1)

| pipelined | 100K rows | 1M rows |
|---|---|---|
| False | 122±0ms | 200±0ms |
| True | 86.5±0ms | 164±0ms |

LargeDataUDFTimeBench (5M rows, scalar UDF x + 1)

| pipelined | time | peak memory |
|---|---|---|
| False | 526±0ms | 116M |
| True | 496±0ms | 110M |

MultiUDFTimeBench (3 UDF columns)

| pipelined | 100K rows | 1M rows |
|---|---|---|
| False | 157±0ms | 305±0ms |
| True | 123±0ms | 269±0ms |

Peak memory is essentially the same for both modes (116M sync vs 110M pipelined).

Why are the changes needed?

The current single-threaded NIO selector model serializes input and reads output in the same thread alternately. For multi-column UDFs or compute-heavy UDFs, this leaves the JVM idle while waiting for Python output. Pipelined mode overlaps serialization with output reading, improving throughput by up to 22% for multi-UDF queries and 10% for large data workloads.

Does this PR introduce any user-facing change?

No

How was this patch tested?

  • New dedicated test suite test_pipelined_udf.py with 12 tests running with spark.python.udf.pipelined.enabled=true via SparkConf, covering scalar UDF, string UDF, multi-column UDF, chained UDF, null handling, UDAF, empty partitions, multiple partitions, large data, batched UDF, and exception propagation
  • All PySpark UDF test suites pass with pipelined=true as the compile-time default: test_pandas_udf_scalar, test_pandas_udf_grouped_agg, test_pandas_udf_window, test_udf, test_udtf
  • JVM test suites pass: PythonUDFSuite, BatchEvalPythonExecSuite, ArrowColumnarPythonUDFSuite
  • Benchmark (bench_pipelined_udf.py) across 5 scenarios confirms no regression vs sync mode and up to 22% speedup for multi-UDF workloads

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code

@dongjoon-hyun dongjoon-hyun marked this pull request as draft April 27, 2026 17:22
@dongjoon-hyun
Member

Please get a new JIRA ID~ 😄

@viirya viirya changed the title [SPARK-XXXXX][SQL] Add pipelined JVM-Python UDF data transfer [SPARK-56642][SQL] Add pipelined JVM-Python UDF data transfer Apr 27, 2026
@dongjoon-hyun dongjoon-hyun marked this pull request as ready for review April 27, 2026 19:38
@viirya viirya force-pushed the pipelined-python-udf branch from 020866b to 51abb53 Compare April 27, 2026 23:01
viirya added 25 commits April 28, 2026 07:09
Add an opt-in pipelined execution mode for Python UDFs that decouples input
serialization from the NIO selector loop. When enabled via
spark.python.udf.pipelined.enabled=true, a dedicated PipelinedWriterThread
serializes input data into DirectByteBufferOutputStream instances and hands
them off to the selector thread via a bounded ArrayBlockingQueue. The selector
thread writes the pre-serialized direct buffers to the Python worker socket
while simultaneously reading output -- enabling pipeline parallelism.

Key design: the writer thread never touches the socket, avoiding the deadlock
class that plagued the old WriterThread design removed in SPARK-44705. Buffer
swapping between writer and selector threads eliminates memory copies.

On the Python side, pipelined_process() in worker.py pre-fetches input batches
in a background reader thread, and ArrowStreamSerializer flushes per-batch in
pipelined mode.

Changes:
- Add spark.python.udf.pipelined.enabled (default false) and
  spark.python.udf.pipelined.queueDepth (default 2) configs
- Add PipelinedWriterThread and PipelinedReaderInputStream in PythonRunner
- Add pipelined_process() with background reader thread in worker.py
- Add per-batch flush in ArrowStreamSerializer for pipelined mode
- Add @volatile to HybridQueue counters for cross-thread visibility
- Add benchmark (bench_pipelined_udf.py) and verification script

Co-authored-by: Claude Code
Replace the queue-based pipelined design with true full-duplex I/O:
the writer thread serializes and writes directly to the socket in
blocking mode, while the reader thread (task main thread) reads from
the same socket. This eliminates all inter-thread queues, locks,
and selector overhead.

TCP sockets are full-duplex -- concurrent read()/write() from
different threads operate on independent OS-level buffers.

Co-authored-by: Claude Code
…ention

Replace Channels.newInputStream with a direct InputStream wrapper that
calls channel.read() without going through ChannelInputStream's
synchronized(blockingLock()) wrapper. This ensures the reader thread
does not contend with the writer thread's channel.write() calls.

Co-authored-by: Claude Code
…for SPSC

Replace synchronized add()/remove() with a lock-free SPSC design using
a volatile writeOffset. In the pipelined Python UDF execution mode,
add() is called from the writer thread while remove() is called from
the task main thread. The previous synchronized blocks caused ~10M
monitor enter/exit operations (5M rows x 2) with expensive memory
fences per operation, totaling 200-500ms overhead.

The volatile writeOffset provides the necessary happens-before
guarantee: all memory writes (row data via Platform.putInt and
Platform.copyMemory) are visible to the reader before it sees the
updated offset. readOffset does not need to be volatile because the
writer never reads it.

This reduces the Large data benchmark regression from 1.78x to 1.34x
and Light UDF from 1.6x to 1.19x.

Co-authored-by: Claude Code
Replace per-task Thread creation with a shared cached thread pool for
the pipelined writer. Also propagate TaskContext to the writer thread
so upstream operators that depend on TaskContext.get() work correctly.

Note: Instrumentation revealed that the remaining performance gap between
sync and pipelined modes is NOT caused by socket I/O (only 6ms for 5M rows),
thread creation overhead, or volatile RowQueue access. The serialization
code itself runs ~2.5x slower in the writer thread (575ms) than in the
sync selector thread (233ms) for reasons still under investigation.

Co-authored-by: Claude Code
async-profiler revealed that @volatile numElementsQueued in HybridQueue
was the #1 CPU hotspot in pipelined mode, consuming 16.28% of total CPU
time (1150ms out of 7066 samples). The volatile write on every row causes
expensive store-buffer flushes and cache-line bouncing between the writer
thread and reader thread cores.

numElementsQueued/numElementsQueuedOnDisk are metrics counters that don't
need cross-thread visibility guarantees -- stale reads are acceptable.

InMemoryRowQueue.writeOffset also doesn't need volatile: memory visibility
is guaranteed by the blocking socket I/O that separates writer and reader.
The reader only calls remove() after Python has processed the corresponding
input, which requires the writer's channel.write() to have completed first,
providing a stronger-than-volatile happens-before guarantee.

Benchmark results after this fix (local[*], 16 cores):

  Light UDF (1M rows):    sync  98ms  pipelined  95ms  (no regression)
  CPU-bound UDF:          sync 142ms  pipelined 151ms  (-6%, noise range)
  Heavy UDF (10ms sleep): sync 1190ms pipelined 1180ms (+1%)
  Large data (5M rows):   sync 335ms  pipelined 342ms  (-2%, no regression)
  Multi-UDF (3 columns):  sync 128ms  pipelined 100ms  (+29% faster)

Co-authored-by: Claude Code
Pipelined mode's background reader thread pre-fetches batches from
deserializer.load_stream() into a queue. This is only safe when
load_stream() yields fully-materialized data (e.g., pa.RecordBatch).

Grouped/aggregate UDF serializers (ArrowStreamAggPandasUDFSerializer,
ArrowStreamGroupSerializer, etc.) yield lazy iterators that still read
from the socket. When the background reader thread advances the outer
generator while the main thread consumes the yielded lazy iterator,
both threads read from the same socket/generator concurrently, causing
"ValueError: generator already executing".

Restrict pipelined mode to eval types with safe serializers:
NON_UDF, SQL_BATCHED_UDF, SQL_ARROW_BATCHED_UDF,
SQL_SCALAR_PANDAS_UDF, SQL_SCALAR_PANDAS_ITER_UDF,
SQL_SCALAR_ARROW_UDF, SQL_SCALAR_ARROW_ITER_UDF.

All other eval types (grouped map, grouped agg, window agg, UDTF, etc.)
fall back to the synchronous process() path.

Co-authored-by: Claude Code
Replace the eval-type whitelist approach with a simpler fix: the
background reader thread now eagerly materializes any lazy iterator
yielded by deserializer.load_stream() before putting it in the queue.

Some serializers (ArrowStreamGroupSerializer, ArrowStreamAggPandasUDFSerializer)
yield lazy iterators from load_stream() that still read from the socket.
If these lazy iterators are consumed by the main thread while the reader
thread continues advancing the outer generator, both threads read from the
same socket concurrently, causing "ValueError: generator already executing".

By checking hasattr(batch, "__next__") and converting to list() in the
reader thread, all socket reads happen exclusively in the reader thread.
The main thread receives fully-materialized data from the queue.

This enables pipelined mode for all UDF eval types (scalar, grouped agg,
window agg, UDTF, etc.) without restrictions.

Tested with pipelined=true as default:
  test_pandas_udf_scalar (incl. variant UDAF tests) - passed
  test_pandas_udf_grouped_agg - passed
  test_pandas_udf_window - passed
  test_udf - passed
  test_udtf - passed

Co-authored-by: Claude Code
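
The eager-materialization fix described above can be sketched as a toy pre-fetch generator (hypothetical names; the real logic lives in worker.py's pipelined_process): a background reader thread pulls batches, converting any lazy iterator to a list so every socket read stays on the reader thread.

```python
import queue
import threading

_SENTINEL = object()

def prefetch(load_stream, queue_depth=2):
    """Toy model of the pipelined pre-fetch: a background reader thread
    advances the load_stream generator and eagerly materializes any lazy
    iterator it yields before handing it to the main thread."""
    q = queue.Queue(maxsize=queue_depth)

    def reader():
        try:
            for batch in load_stream:
                # Lazy iterators (e.g. from grouped/agg serializers) would
                # otherwise be consumed by the main thread while this thread
                # advances the outer generator -- both reading the socket.
                if hasattr(batch, "__next__"):
                    batch = list(batch)
                q.put(batch)
        finally:
            q.put(_SENTINEL)  # always unblock the consumer

    threading.Thread(target=reader, daemon=True).start()
    while True:
        batch = q.get()
        if batch is _SENTINEL:
            return
        yield batch
```

The main thread only ever sees fully materialized batches from the queue, so the "generator already executing" conflict cannot occur.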
Two fixes for pipelined mode regressions found during testing with
pipelined=true as the compile-time default:

1. InputFileBlockHolder propagation: In pipelined mode, upstream scan
   operators call InputFileBlockHolder.set() in the writer pool thread,
   but InputFileBlockHolder uses InheritableThreadLocal which doesn't
   inherit to pool threads. Fix: capture the task thread's value when
   constructing PipelinedWriterRunnable and set it in the pool thread.

2. Idle timeout detection: In sync mode, ReaderInputStream uses
   selector.select(timeout) for idle detection. In pipelined mode with
   blocking socket, there's no selector. Fix: set SO_TIMEOUT on the
   socket and wrap the InputStream to catch SocketTimeoutException,
   kill the worker (if configured), and throw PythonWorkerException
   with the same error message as sync mode.

Verified with pipelined=true as compile-time default (all tests passed):
  test_pandas_udf_scalar - passed
  test_pandas_udf_grouped_agg - passed
  test_pandas_udf_window - passed
  test_udf (incl. input_file_name, kill_on_timeout) - passed
  test_udtf (incl. kill_on_timeout) - passed

Co-authored-by: Claude Code
verify_pipelined_path.py was a local development script used to confirm
the pipelined code path was active. Not a formal test or benchmark.

Co-authored-by: Claude Code
Restore the original synchronized InMemoryRowQueue and HybridQueue to
avoid affecting the sync code path. Add a lockFree parameter that is
only set to true in pipelined mode via EvalPythonEvaluatorFactory.

When lockFree=false (default, sync mode): add()/remove() use
synchronized as before -- no behavior change for existing code.

When lockFree=true (pipelined mode): add()/remove() skip synchronized.
Memory visibility is guaranteed by the blocking socket I/O between
the writer thread and reader thread.

This approach ensures zero impact on the sync path while avoiding the
16% CPU overhead from synchronized monitor enter/exit in pipelined mode
(as identified by async-profiler).

Co-authored-by: Claude Code
Add test_pipelined_udf.py with 12 tests that run with
spark.python.udf.pipelined.enabled=true via SparkConf override.

Tests cover:
- Code path verification (SPARK_PIPELINED_UDF_ACTIVE env var)
- Scalar Arrow UDF (basic, string type, chained, null handling)
- Multiple UDF columns in a single query
- Multiple partitions and empty partitions
- Grouped aggregation UDF (UDAF)
- Large data (500K rows, backpressure)
- Non-Arrow batched UDF (pickle serialization)
- Exception propagation from UDF

Co-authored-by: Claude Code
Add test_pipelined_udf to sparktestsupport/modules.py to fix
CI "dangling Python tests" error.

Co-authored-by: Claude Code
Add .withBindingPolicy(ConfigBindingPolicy.NOT_APPLICABLE) to both
pipelined config entries to fix SparkConfigBindingPolicySuite CI failure.
These are execution-level configs not applicable to SQL views/UDFs/procedures.

Co-authored-by: Claude Code
Address review feedback: relying on OS socket I/O for JMM happens-before
is not formally guaranteed. Replace with proper SPSC release-acquire
protocol using @volatile writeOffset:

- add() caches writeOffset in a local var to avoid repeated volatile
  reads, writes row data via Platform.putInt/copyMemory, then performs
  a volatile store of the updated offset (release fence).
- remove() performs a volatile load of writeOffset (acquire fence),
  which guarantees visibility of all prior row data writes.

In sync mode (lockFree=false), synchronized already provides the memory
barrier, so the volatile is redundant but harmless (JIT optimizes it
away inside synchronized blocks).

async-profiler showed volatile writeOffset accounts for ~2.4% CPU,
which is acceptable. The major bottleneck was HybridQueue's
@volatile numElementsQueued (16.3%), which is handled separately
by skipping the counter increment in lockFree mode.

Co-authored-by: Claude Code
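
The release-acquire protocol above can be mirrored in a toy Python SPSC structure (illustrative names; the real code is the JVM's InMemoryRowQueue). On the JVM the write offset would be @volatile, giving a release store in add() and an acquire load in remove(); under CPython the GIL provides equivalent visibility, so this sketch only shows the protocol's shape.

```python
class SpscPage:
    """Toy single-producer/single-consumer page: the producer publishes
    rows by advancing a single write offset; the consumer only loads it."""

    def __init__(self, capacity):
        self.rows = [None] * capacity
        self.capacity = capacity
        self.write_offset = 0  # published by the producer only ("@volatile" on the JVM)
        self.read_offset = 0   # private to the consumer; the producer never reads it

    def add(self, row):
        """Producer side: returns False when the page is full."""
        w = self.write_offset      # cache locally to avoid repeated volatile reads
        if w == self.capacity:
            return False
        self.rows[w] = row         # plain data write...
        self.write_offset = w + 1  # ...published by the offset store (release fence)
        return True

    def remove(self):
        """Consumer side: returns None when no row has been published yet."""
        w = self.write_offset      # acquire load: makes prior row writes visible
        if self.read_offset == w:
            return None
        row = self.rows[self.read_offset]
        self.read_offset += 1
        return row
```

Because each offset has exactly one writer, no lock is needed: visibility hinges entirely on the ordering of the data write before the offset store.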
…hang

Address review feedback: if the writer thread fails (e.g., upstream
operator exception), it sets writer._exception but the reader thread
may still be blocked on socket.read() waiting for Python output. Since
Python never receives complete input, it won't produce output, causing
the task to hang indefinitely (unless idle timeout is enabled).

Fix: call channel.shutdownOutput() on writer failure. This sends EOF
to Python, which terminates and closes its socket end. The reader's
read() then returns -1, and ReaderIterator.handleException checks
writer.exception to propagate the original failure.

This matches the error handling in Writer.open()'s catch block in the
sync path (PythonRunner.scala line 536-544).

Co-authored-by: Claude Code
Address review feedback: pipelined mode's SocketTimeoutException handler
threw PythonWorkerException unconditionally, but sync mode only kills the
worker when killOnIdleTimeout=true and otherwise just logs and retries.

Fix the pipelined InputStream wrapper to match sync mode behavior:
- On timeout: log warning with worker status
- If killOnIdleTimeout=true: kill worker, retry read to get EOF, then
  throw PythonWorkerException with the expected error message
- If killOnIdleTimeout=false: log only, retry read (continue waiting)
- If worker was already killed: log "waiting for termination" on
  subsequent timeouts (matches sync mode's pythonWorkerKilled path)

Co-authored-by: Claude Code
Address review feedback: if the main thread stops consuming from the
queue (e.g., UDF exception), the reader thread blocks forever on
queue.put() since the queue is full. The finally block's sentinel
also cannot be enqueued, leaving the reader thread stuck.

Fix: use a threading.Event as a stop signal. The reader thread uses
put(timeout=1) and checks stop_event periodically. On main thread
exit (normal or exception), the finally block sets stop_event and
drains the queue so the reader can unblock and terminate cleanly.

Co-authored-by: Claude Code
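
The stop-signal protocol above can be sketched as follows (hypothetical names; the real logic lives in worker.py): the producer retries put() with a short timeout and checks a threading.Event between attempts, while the consumer's finally block sets the event and drains the queue so a blocked producer can exit.

```python
import queue
import threading

def run_with_stop(produce, consume, queue_depth=2, put_timeout=0.1):
    """Toy model of the reader-thread stop signal: producer retries
    put(timeout=...) and checks stop_event; consumer sets the event and
    drains the queue on exit (normal or exceptional)."""
    q = queue.Queue(maxsize=queue_depth)
    stop_event = threading.Event()
    sentinel = object()

    def producer():
        for item in produce():
            while not stop_event.is_set():
                try:
                    q.put(item, timeout=put_timeout)
                    break
                except queue.Full:
                    continue  # re-check stop_event, then retry
            if stop_event.is_set():
                return
        while not stop_event.is_set():
            try:
                q.put(sentinel, timeout=put_timeout)
                return
            except queue.Full:
                continue

    t = threading.Thread(target=producer, daemon=True)
    t.start()
    results = []
    try:
        while True:
            item = q.get()
            if item is sentinel:
                break
            results.append(consume(item))
    finally:
        stop_event.set()           # unblock the producer if we exit early
        while not q.empty():       # drain so a blocked put() can complete
            q.get_nowait()
        t.join(timeout=5)
    return results
```

On the normal path put() succeeds immediately, so the timeout never fires; it only matters when the queue is full and the consumer has stopped, in which case the producer notices stop_event within one timeout interval.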
Address review feedback: pipelinedEnabled is read in BasePythonRunner
which is also the base class for PythonRunner (mapPartitions/RDD path).
The NON_UDF path has different serialization and worker lifecycle
assumptions that have not been tested with pipelined execution.

Guard both the JVM-side pipelined path and the SPARK_PIPELINED_UDF
env var (which controls the Python-side pipelined_process) with
evalType != NON_UDF, so only SQL UDF/UDAF/UDTF eval types use
pipelined mode.

Co-authored-by: Claude Code
Address review feedback: Python workers are reused across tasks, but
SO_TIMEOUT persists on the socket. If a previous task set an idle
timeout, the reused socket retains it even when the next task has
idleTimeoutSeconds=0, causing unexpected SocketTimeoutException.

Fix: always call setSoTimeout() explicitly, setting 0 (no timeout)
when idleTimeoutSeconds <= 0 to clear any stale value from a
previous task.

Co-authored-by: Claude Code
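
A Python-level analog of this fix (illustrative only; the actual change is JVM-side setSoTimeout): always set the socket timeout explicitly, so a reused worker socket cannot retain a stale value from a previous task.

```python
import socket

def configure_idle_timeout(sock, idle_timeout_seconds):
    """Toy analog of the SO_TIMEOUT fix: set the timeout unconditionally,
    clearing any stale value left by a previous task on a reused socket."""
    if idle_timeout_seconds > 0:
        sock.settimeout(idle_timeout_seconds)
    else:
        # settimeout(None) means blocking mode with no timeout, wiping
        # whatever a previous task configured.
        sock.settimeout(None)
    return sock.gettimeout()
```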
Address review feedback: the acquire fence from the volatile read of
writeOffset was only performed inside assert(), which can be elided
by JVM flag -da. Move the volatile read into an unconditional local
val so the memory ordering guarantee does not depend on assertions
being enabled.

Co-authored-by: Claude Code
The doc still described the removed JVM-side queue between writer
thread and NIO selector thread. Update to reflect the actual behavior:
the value controls the Python worker's background reader thread
pre-fetch queue depth.

Co-authored-by: Claude Code
…free mode

Address review feedback: the full-page path writes the -1 end-of-page
marker via Platform.putInt but returns false without a volatile store
to writeOffset. In lock-free mode, the reader may not see the marker
due to missing release fence.

Fix: do a volatile store of writeOffset (value unchanged) after writing
the -1 marker. This provides the release fence so the reader's acquire
load of writeOffset sees the marker when it reads the page data.

Co-authored-by: Claude Code
- bench_pipelined_udf.py: use double quotes for template string
- test_pipelined_udf.py: fix line length and chained method formatting
- worker.py: fix long threading.Thread line

Co-authored-by: Claude Code
@viirya viirya force-pushed the pipelined-python-udf branch from 7f5cb28 to b74efa5 Compare April 28, 2026 14:10
} catch {
case _: InterruptedException =>
Thread.currentThread().interrupt()
case t: Throwable if NonFatal(t) || t.isInstanceOf[Exception] =>
Member

So, ClosedByInterruptException is handled here?


Member Author

Good catch. ClosedByInterruptException is thrown when channel.write() is interrupted (e.g., Future.cancel(true) on task cancellation). It extends AsynchronousCloseException → IOException → Exception, so it was being caught by the generic case t: Throwable if NonFatal(t) || t.isInstanceOf[Exception] handler — which would set writer._exception and try shutdownOutput() on an already-closed channel.

Fixed: added an explicit case _: ClosedByInterruptException that treats it as cancellation (same as InterruptedException), re-sets the interrupt flag, and skips the shutdownOutput() call since the channel is already closed by the JVM.

Comment thread python/pyspark/worker.py
# that still read from infile. Materialize them here so the
# main thread can consume them without touching infile.
if hasattr(batch, "__next__"):
batch = list(batch)
Member

Just a question. Any chance of OOM due to the memory peak due to this?

Member Author

@viirya viirya Apr 29, 2026

The list(batch) materialize happens for grouped/aggregate UDF serializers whose load_stream() yields lazy iterators. In sync mode, the same data is already materialized in mapper(batch_iter) via list(batch_iter) — so per-group peak memory is the same.

The difference is that with queueDepth=2 (default), up to 2 additional groups can be buffered in the queue. In the worst case (skewed key with one very large group), this could increase peak memory by ~2x the group size. Users can set spark.python.udf.pipelined.queueDepth=1 to reduce this, or disable pipelined mode entirely for memory-sensitive workloads.

* using freshly created threads.
*/
private[python] lazy val pipelinedWriterThreadPool =
ThreadUtils.newDaemonCachedThreadPool("python-udf-pipelined-writer")
Member

What is the maximum size of this thread pool, can we have a bounded thread pool?

Member Author

Good point. Changed from unbounded newDaemonCachedThreadPool to bounded newDaemonCachedThreadPool("python-udf-pipelined-writer", maxThreads) where maxThreads = SparkEnv.get.conf.get(EXECUTOR_CORES). Each task uses at most one writer thread, and the number of concurrent tasks is bounded by executor cores, so this is the natural upper bound.

viirya added 2 commits April 28, 2026 15:42
Co-authored-by: Claude Code
1. Handle ClosedByInterruptException explicitly in writer thread.
   When task is cancelled via Future.cancel(true) while writer is
   blocked in channel.write(), JVM throws ClosedByInterruptException
   (not InterruptedException). Handle it as cancellation rather than
   letting it fall through to the generic exception handler.

2. Bound the pipelined writer thread pool by executor cores.
   Each task uses at most one writer thread, so the pool size
   should not exceed the number of concurrent tasks.

Co-authored-by: Claude Code
@gaogaotiantian
Contributor

Before a more detailed review of the code itself - we do have a more dedicated benchmark framework based on asv now. The code lives in python/benchmarks and is mostly done by @Yicong-Huang . Can we use that framework for this and maybe get some more trustworthy data?

The current benchmark is meaningful of course, but there could be some factors that impact the result, like garbage collection. Also asv provides memory consumption metrics too. It would be nice to write all of our benchmarks with the same framework.

@Yicong-Huang
Contributor

I was about to mention this as well. Can we use the ASV benchmark to evaluate it?

@viirya
Member Author

viirya commented Apr 30, 2026

Thanks for the suggestion! I agree ASV is the right direction for benchmarks.

However, the current ASV benchmarks in python/benchmarks/ (e.g., bench_eval_type.py) work by directly calling worker_main(infile, outfile) with a mock protocol — they bypass the JVM and socket communication entirely. This means they can only measure the Python worker's internal processing time.

The pipelined mode's performance benefit comes from overlapping JVM-side socket writes with Python-side computation across a real socket connection (full-duplex blocking I/O between a JVM writer thread and the task reader thread). To benchmark this end-to-end, we would need the ASV framework to support running a full SparkSession and executing actual Spark queries, which the current setup doesn't do.

Would it make sense to open a separate PR to extend the ASV framework with end-to-end SparkSession-based benchmark support, and then migrate this benchmark? For now, the standalone script (bench_pipelined_udf.py) serves as an ad-hoc verification tool for this PR.

@Yicong-Huang
Contributor

However, the current ASV benchmarks in python/benchmarks/ (e.g., bench_eval_type.py) work by directly calling worker_main(infile, outfile) with a mock protocol — they bypass the JVM and socket communication entirely. This means they can only measure the Python worker's internal processing time.

Those benchmarks are designed as microbenchmarks without the JVM, so there is no plan to extend them. But I think we definitely can use ASV to measure PySpark performance directly, end to end. We just need to create one bench file.

Add end-to-end ASV benchmarks that run full Spark queries through a
real SparkSession to measure JVM-Python socket I/O pipeline overlap.

Unlike the microbenchmarks in bench_eval_type.py (which test Python
worker internals in isolation), these benchmarks capture the true
end-to-end performance difference between sync and pipelined modes.

Benchmark classes:
- ScalarUDFTimeBench: scalar Arrow UDF (x + 1) with 100K/1M rows
- MultiUDFTimeBench: 3 UDF columns in a single query
- LargeDataUDFTimeBench: scalar UDF with 5M rows

Each parameterizes over pipelined=true/false for comparison.
SparkSession is created/stopped per setup/teardown since
spark.python.udf.pipelined.enabled is a SparkConf-level config.

Co-authored-by: Claude Code
@viirya
Member Author

viirya commented Apr 30, 2026

I've updated ASV benchmark.

Member

@dongjoon-hyun dongjoon-hyun left a comment

Could you make the CI happy?

ruff format checks failed:
--- python/benchmarks/bench_pipelined_udf.py

@viirya
Member Author

viirya commented Apr 30, 2026

Thank you @dongjoon-hyun . I fixed the format errors.

Co-authored-by: Claude Code
@viirya viirya force-pushed the pipelined-python-udf branch from ed5ab57 to ce3c94e Compare April 30, 2026 19:59
return pd.Series([flag] * len(x))

result = self.spark.range(1).select(check_env(col("id"))).first()[0]
self.assertEqual(result, "1", "pipelined_process() should set SPARK_PIPELINED_UDF_ACTIVE=1")
Contributor

Brittle: relies on the leaky os.environ set in pipelined_process() (see worker.py:3598). With worker reuse, a stale env can pass this even when the current task didn't use the pipelined path. Prefer a JVM-side metric/accumulator.

* Bounded by executor cores since each task uses at most one writer thread.
*/
private[python] lazy val pipelinedWriterThreadPool = {
val maxThreads = SparkEnv.get.conf.get(EXECUTOR_CORES)
Contributor

EXECUTOR_CORES defaults to 1 in local[*] mode, so the cached pool queues all writers serially behind one thread. Would that be a problem?

new ReaderInputStream(worker, writer, handle,
faultHandlerEnabled, idleTimeoutSeconds, killOnIdleTimeout, context),
bufferSize))
val dataIn = if (usePipelined) {
Contributor

nit: this if (usePipelined) { ... } block is ~100 lines inline. Extracting to a private createPipelinedDataIn(...) would help readability.

Comment thread python/pyspark/worker.py Outdated
This allows input deserialization to overlap with UDF computation.
"""
# Mark that pipelined mode is active so UDFs can verify the code path.
os.environ["SPARK_PIPELINED_UDF_ACTIVE"] = "1"
Contributor

Env var leaks across tasks under spark.python.worker.reuse=true: this is set but never cleared. A worker first used for pipelined keeps =1 for subsequent tasks, defeating the test_pipelined_mode_is_active check. Clear in finally, or use a Python module-level flag instead of os.environ.

Comment thread python/pyspark/worker.py
# that still read from infile. Materialize them here so the
# main thread can consume them without touching infile.
if hasattr(batch, "__next__"):
batch = list(batch)
Contributor

Eagerly materializing lazy iterators (grouped/aggregate UDFs) and queueing up to queue_depth of them risks OOM on large groups where sync mode wouldn't. I wonder how come this was not reflected on the ASV peakmem bench...

Member Author

The eager materialization in the reader thread produces the same data volume that sync mode already materializes — in sync mode, mapper(batch_iter) calls list(batch_iter) on the same lazy iterator. The only additional memory comes from the pre-fetch queue: with the default queueDepth=2, up to 2 extra groups can be buffered ahead. For the ASV peakmem benchmarks, both modes show ~110M because the test data is small enough that the queue overhead is negligible. A workload with very large skewed groups could see higher peak memory; users can set spark.python.udf.pipelined.queueDepth=1 to reduce this to at most 1 extra group, or disable pipelined mode entirely.


def dump_stream(self, iterator: Iterable["pa.RecordBatch"], stream: IO[bytes]) -> None:
"""Optionally prepend START_ARROW_STREAM, then write batches."""
import os
Contributor

nit: import os at function scope is unusual; the rest of this file imports at module top.

# In pipelined mode, flush after each batch so the JVM can read output
# while still sending input, rather than buffering all output.
if pipelined:
stream.flush()
Contributor

Gating per-batch flush on a global env var means any sync-mode process where SPARK_PIPELINED_UDF=1 happens to be set pays a syscall per batch. Consider passing as a serializer ctor arg so this can't accidentally affect sync mode.
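
The suggested constructor-arg approach could look like this sketch (hypothetical class name; the real serializer is ArrowStreamSerializer in PySpark): the flush behavior is fixed at construction, so sync-mode processes can never pay a per-batch flush because of an inherited env var.

```python
class BatchSerializer:
    """Toy serializer illustrating the suggestion: pipelined behavior is a
    constructor argument rather than a global env-var check."""

    def __init__(self, flush_per_batch=False):
        self.flush_per_batch = flush_per_batch

    def dump_stream(self, batches, stream):
        for batch in batches:
            stream.write(batch)
            if self.flush_per_batch:
                # In pipelined mode the JVM reads output while still sending
                # input, so each batch must reach the socket immediately.
                stream.flush()
```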

Comment thread python/pyspark/worker.py
if hasattr(out_iter, "close"):
out_iter.close()

def pipelined_process():
Contributor

I also wonder if it is possible to extract common code so that we don't duplicate the majority of the processing logic?

Contributor

nvm, I saw you already did it in the new version

Comment thread python/pyspark/worker.py
Comment on lines +3620 to +3627
while not stop_event.is_set():
try:
input_queue.put(batch, timeout=1)
break
except queue.Full:
continue
if stop_event.is_set():
return
Contributor

@Yicong-Huang Yicong-Huang Apr 30, 2026

When a UDF takes a long time to process, this loop will try to put a batch into the input queue every second. I am not a fan of this design. Why can't we let the consumer thread notify the producer thread that it can put now?

In the case of UDF exception, which happens on the consumer thread, it can alert the producer thread which waits on the queue or event, and shutdown the producer thread.

Member Author

Python's queue.Queue doesn't support "put and wait on either queue space or an external event" in a single call — it only offers put(timeout). To avoid a busy loop while still responding to the stop event, the reader uses put(timeout=0.1) and checks stop_event between attempts. On the normal path (queue not full), put succeeds immediately with no polling. The 0.1s timeout only kicks in when the queue is full and the consumer has stopped — at which point stop_event will be set and the reader exits within 0.1s. An alternative would be using threading.Condition with a custom bounded buffer, but that adds complexity for the same result.

viirya added 2 commits May 1, 2026 00:24
1. Remove SPARK_PIPELINED_UDF_ACTIVE env var (leaked across worker reuse).
   Test now checks SPARK_PIPELINED_UDF (set by JVM, not by worker).

2. Per-batch flush: replace env var check with serializer constructor arg
   (flush_per_batch). Set on the serializer instance before calling
   pipelined_process, so sync mode is never accidentally affected.

3. Revert bounded thread pool to unbounded cached pool. EXECUTOR_CORES
   defaults to 1 in local[*] mode, which would serialize all writers.
   The pool naturally bounds itself by the number of concurrent tasks.

4. Extract pipelined setup to private createPipelinedDataIn() method
   for readability (~100 lines out of compute()).

5. Reduce reader thread put timeout from 1s to 0.1s for faster response
   to stop_event.

Co-authored-by: Claude Code
Co-authored-by: Claude Code
@viirya viirya force-pushed the pipelined-python-udf branch from 70ff0a3 to a52deb0 Compare May 1, 2026 17:15