Drop the per-batch Comet→Spark buffer copy in CometColumnarPythonInput #4383

@andygrove

Description

What is the problem the feature request solves?

PR #4234 streams Comet's columnar batches to the Python worker via CometColumnarPythonInput.writeNextBatchToArrowStream. Per batch, the current path does two copies of every buffer:

  1. Comet's FieldVector → destination IPC root. copyVector walks the source/destination trees in lockstep, allocates destination child vectors sized from the source, and ArrowBuf.setBytes-copies each buffer. The destination root is a child of ArrowUtils.rootAllocator, set up by Spark's PythonArrowInput trait.
  2. Destination IPC root → pipe. VectorUnloader.getRecordBatch walks the root and produces an ArrowRecordBatch; MessageSerializer.serialize writes bytes to the DataOutputStream that feeds the Python worker's stdin.

Copy 2 is structural: Spark's transport to Python is fork + pipe + Arrow IPC, so the bytes must reach the pipe at least once. Copy 1 is droppable — we can build the IPC stream directly from Comet's vectors and skip the intermediate root entirely. After this change the path is at the single-copy floor for JVM-to-Python Arrow transport.
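Here is a back-of-the-envelope sketch of what dropping copy 1 saves per batch. `CopyBudget` is a hypothetical helper, and the buffer sizes are made-up inputs, not measurements. It counts only column-buffer bytes and ignores the struct validity buffer and IPC framing.

```java
import java.util.List;

/**
 * Toy accounting of JVM-side bytes copied per batch for the column buffers on
 * the Comet -> Python input path. Sizes are caller-supplied, not measured;
 * the struct validity buffer and IPC framing are ignored.
 */
final class CopyBudget {
    private CopyBudget() {}

    /** Sum of the sizes of every buffer in the batch's vector tree. */
    static long totalBufferBytes(List<Long> bufferSizes) {
        long total = 0;
        for (long size : bufferSizes) {
            total += size;
        }
        return total;
    }

    /** Current path: copy 1 (setBytes into the IPC root) plus copy 2 (serialize to the pipe). */
    static long bytesCopiedCurrent(List<Long> bufferSizes) {
        return 2 * totalBufferBytes(bufferSizes);
    }

    /** Proposed path: only copy 2, serializing Comet's buffers straight to the pipe. */
    static long bytesCopiedProposed(List<Long> bufferSizes) {
        return totalBufferBytes(bufferSizes);
    }
}
```

Take a hypothetical 8192-row batch with one int32 column and one int64 column, each carrying a validity bitmap. Its buffers are 1024, 32768, 1024 and 65536 bytes, for 100,352 bytes in total. The current path moves 200,704 bytes per batch, and the proposed path moves 100,352.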

The cross-allocator constraint discussed in #4294 is independent of this change. Even with copy 1 dropped, JVM-side zero-copy via TransferPair stays blocked. Comet's source FieldVectors are imported from native code via the Arrow C Data Interface, so their buffers are released through FFI. Spark's destination IPC root, by contrast, is a child of ArrowUtils.rootAllocator. The two reference managers cannot share buffers. The path proposed here does not try to make them share: it bypasses the intermediate root entirely.

Describe the potential solution

spark/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala:217-241 (serializeBatches) already does the equivalent walk for the shuffle path: build a VectorSchemaRoot from Comet's FieldVectors, run ArrowStreamWriter.writeBatch against it, write the resulting bytes to a stream. The wire format is the same Arrow IPC the Python worker reads.
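For reference, here is a minimal sketch of the Arrow IPC streaming framing that both paths target. It follows the encapsulated-message layout in the Arrow columnar format specification:

- a 0xFFFFFFFF continuation token;
- a little-endian int32 metadata length;
- the metadata, padded to 8 bytes;
- the message body.

The stream ends with 0xFFFFFFFF followed by a zero length. `ToyIpcStream` is hypothetical, and it writes opaque placeholder bytes where a real writer emits a FlatBuffers `Message`. A stream is one schema message followed by any number of record-batch messages. Once the schema is out, each batch is therefore a single appended message.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/**
 * Toy writer for the Arrow IPC streaming format's encapsulated-message framing.
 * Metadata bytes are opaque placeholders; a real writer emits a
 * FlatBuffers-encoded Message (Schema or RecordBatch) in their place.
 */
final class ToyIpcStream {
    private static final int CONTINUATION = 0xFFFFFFFF;
    private final ByteArrayOutputStream out = new ByteArrayOutputStream();
    private boolean started = false;

    /** Write the schema message once; a schema message has an empty body. */
    void start(byte[] schemaMetadata) {
        if (started) throw new IllegalStateException("stream already started");
        writeMessage(schemaMetadata, new byte[0]);
        started = true;
    }

    /** Append one record-batch message to the already-open stream. */
    void writeBatch(byte[] batchMetadata, byte[] body) {
        if (!started) throw new IllegalStateException("call start() first");
        writeMessage(batchMetadata, body);
    }

    /** End-of-stream marker: continuation token followed by a zero length. */
    void end() {
        writeInt(CONTINUATION);
        writeInt(0);
    }

    byte[] bytes() {
        return out.toByteArray();
    }

    private void writeMessage(byte[] metadata, byte[] body) {
        // Pad metadata so the 8-byte prefix plus metadata ends on an 8-byte boundary.
        int padded = (metadata.length + 7) & ~7;
        writeInt(CONTINUATION);
        writeInt(padded);
        out.write(metadata, 0, metadata.length);
        out.write(new byte[padded - metadata.length], 0, padded - metadata.length);
        // Body buffers are assumed to be 8-byte padded already by the caller.
        out.write(body, 0, body.length);
    }

    private void writeInt(int value) {
        byte[] b = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN).putInt(value).array();
        out.write(b, 0, 4);
    }
}
```

Only the bytes produced by `writeBatch` depend on the batch. That is the part a per-batch path has to produce.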

For CometColumnarPythonInput, the shape is slightly different because Spark's PythonArrowInput trait owns the ArrowStreamWriter and writes the schema header once via writer.start(). The per-batch contract is "append one RecordBatch to the already-open stream", not "write a full IPC stream". The implementation outline:

  • The trait's destination root is still constructed (its schema is what writer.start() serialises into the stream header). It is never populated.
  • Per batch, build an ArrowRecordBatch directly from Comet's source vectors:
    • Allocate one small validity buffer for the wrapping struct (all-0xff).
    • Collect ArrowFieldNodes and ArrowBufs in the same depth-first order VectorUnloader.appendNodes uses, with Comet's FieldVector buffers in place of the destination root's.
    • Pass the assembled batch to MessageSerializer.serialize.
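The collection step can be modelled with a toy. `ToyVector` and `BatchAssembler` are hypothetical stand-ins. In real code, each node would presumably come from a FieldVector's `getValueCount()` and `getNullCount()`, its buffers from `getFieldBuffers()`, and its children from `getChildrenFromFields()`. The results would then be wrapped in `ArrowFieldNode` and `ArrowRecordBatch` before `MessageSerializer.serialize`. Here nodes and buffers are plain strings so the ordering is easy to check. The wrapping struct's node and validity buffer come first. Each column then contributes its node, its own buffers, and its children, in pre-order.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-in for a Comet FieldVector; buffers are identified by name only. */
final class ToyVector {
    final String name;
    final int valueCount;
    final int nullCount;
    final List<String> buffers;     // in field-buffer order, e.g. validity, offsets, data
    final List<ToyVector> children; // child vectors, in field order

    ToyVector(String name, int valueCount, int nullCount,
              List<String> buffers, List<ToyVector> children) {
        this.name = name;
        this.valueCount = valueCount;
        this.nullCount = nullCount;
        this.buffers = buffers;
        this.children = children;
    }
}

/** Collects field nodes and buffers in the pre-order VectorUnloader.appendNodes uses. */
final class BatchAssembler {
    final List<String> nodes = new ArrayList<>();
    final List<String> buffers = new ArrayList<>();

    static BatchAssembler assemble(int rowCount, List<ToyVector> columns) {
        BatchAssembler batch = new BatchAssembler();
        // Wrapping struct first: length = rowCount, no nulls, one all-0xff validity buffer.
        batch.nodes.add("struct(" + rowCount + ",0)");
        batch.buffers.add("struct.validity");
        for (ToyVector column : columns) {
            batch.appendNodes(column);
        }
        return batch;
    }

    private void appendNodes(ToyVector vector) {
        // Node for this vector, then its own buffers, then recurse into its children.
        nodes.add(vector.name + "(" + vector.valueCount + "," + vector.nullCount + ")");
        for (String buffer : vector.buffers) {
            buffers.add(vector.name + "." + buffer);
        }
        for (ToyVector child : vector.children) {
            appendNodes(child);
        }
    }
}
```

Take an int32 column `a`, a list<int64> column `b` with child `item`, and a varchar column `c`. The buffers come out in this order:

- `struct.validity`
- `a.validity`, `a.data`
- `b.validity`, `b.offsets`
- `item.validity`, `item.data`
- `c.validity`, `c.offsets`, `c.data`

This is the schema-driven pre-order that an IPC reader uses to pair buffers with nodes.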

Net result: the per-buffer setBytes copies and per-child allocateNew calls go away. The struct validity buffer is the only remaining JVM-side allocation on the input path. It needs one bit per row, so 1 KiB for an 8192-row batch.
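Here is a sketch of sizing that one remaining allocation. The wrapping struct's validity bitmap needs one bit per row. In the IPC body each buffer is padded to an alignment boundary, assumed here to be 8 bytes. `StructValidity` is a hypothetical helper, not Comet or Arrow API.

```java
import java.util.Arrays;

/** Sizing of the all-set validity bitmap for the wrapping struct (hypothetical helper). */
final class StructValidity {
    private StructValidity() {}

    /** One bit per row, rounded up to whole bytes. */
    static int bitmapBytes(int rowCount) {
        return (rowCount + 7) / 8;
    }

    /** Bytes the bitmap occupies in the IPC body, assuming 8-byte buffer padding. */
    static int paddedIpcBytes(int rowCount) {
        return (bitmapBytes(rowCount) + 7) & ~7;
    }

    /** All-0xff bitmap: every row of the wrapping struct is non-null. */
    static byte[] allValid(int rowCount) {
        byte[] bitmap = new byte[bitmapBytes(rowCount)];
        // Trailing bits past rowCount are set too; readers only consult the first rowCount bits.
        Arrays.fill(bitmap, (byte) 0xFF);
        return bitmap;
    }
}
```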

Additional context
