feat(dataframe): add executeStream(allocator) for incremental batch iteration #50

@LantaoJin

Description

Is your feature request related to a problem or challenge?

DataFrame.collect(allocator) (core/src/main/java/org/apache/datafusion/DataFrame.java) is the only way to retrieve query results today. Internally, the native side calls df.collect().await (native/src/lib.rs) which materializes the entire result set into a Vec<RecordBatch> on the Rust heap before the first batch crosses the FFI boundary into Java.

For TB-scale queries, or any query whose result set exceeds the native heap budget, this OOMs the Rust side regardless of how memory accounting is configured downstream. The Java caller can drain batches one at a time from the returned ArrowReader, but by the time the reader exists, every batch is already buffered in native memory.

SessionContext::execute_stream already exists upstream — it returns a SendableRecordBatchStream (Pin<Box<dyn RecordBatchStream + Send>>) that yields one batch at a time. The Java binding just needs to wire that to the existing FFI_ArrowArrayStream path and hand the resulting reader back to Java.

This is what most embedders running production-shaped workloads need by default; collect() is the right API for small results but the wrong default for analytics-scale queries.

Describe the solution you'd like

A new method on DataFrame, peer to collect:

try (DataFrame df = ctx.sql("SELECT ... FROM big_table");
     ArrowReader reader = df.executeStream(allocator)) {
    while (reader.loadNextBatch()) {
        VectorSchemaRoot batch = reader.getVectorSchemaRoot();
        // process one batch at a time; nothing else materialized
    }
}

Same return type as collect (ArrowReader), same lifecycle rules (the caller closes the reader, and the allocator must outlive it). The semantic difference is purely "one batch is in memory at a time" instead of "all batches in memory before the first is yielded."

Native shape

The Rust side already exports record batches via FFI_ArrowArrayStream::new(Box::new(iter)) (see the existing Java_org_apache_datafusion_DataFrame_collectDataFrame handler in native/src/lib.rs). The streaming variant substitutes a RecordBatchReader that pulls from df.execute_stream() lazily:

use arrow::datatypes::SchemaRef;
use arrow::error::ArrowError;
use arrow::record_batch::{RecordBatch, RecordBatchReader};
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::StreamExt;

struct StreamingReader {
    schema: SchemaRef,
    // SendableRecordBatchStream is already Pin<Box<dyn RecordBatchStream + Send>>,
    // so no extra Pin/Box wrapper is needed here.
    stream: SendableRecordBatchStream,
}

impl Iterator for StreamingReader {
    type Item = Result<RecordBatch, ArrowError>;
    fn next(&mut self) -> Option<Self::Item> {
        // Drive the async stream one batch at a time on the shared runtime
        // (runtime() is the existing Tokio runtime accessor in native/src/lib.rs).
        runtime().block_on(self.stream.next())
            .map(|r| r.map_err(|e| ArrowError::ExternalError(Box::new(e))))
    }
}

impl RecordBatchReader for StreamingReader {
    fn schema(&self) -> SchemaRef { self.schema.clone() }
}

Each loadNextBatch() call from Java drives one runtime().block_on(stream.next()) on the Rust side. No Vec<RecordBatch> is ever built; memory pressure stays bounded by what the executor's pipeline holds plus one batch in flight.
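The pull-based contract can be illustrated with a plain-Java sketch (no Arrow or JNI types involved; `LazyBatchStream` is a hypothetical stand-in for the native stream): work happens only inside `next()`, so nothing is produced before the first pull and at most one batch exists between pulls.

```java
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the native stream: a batch is produced only
// when the consumer pulls, mirroring runtime().block_on(stream.next()).
class LazyBatchStream implements Iterator<int[]> {
    private final int totalBatches;
    private int produced = 0;
    final AtomicInteger produceCalls = new AtomicInteger();

    LazyBatchStream(int totalBatches) { this.totalBatches = totalBatches; }

    @Override public boolean hasNext() { return produced < totalBatches; }

    @Override public int[] next() {
        produceCalls.incrementAndGet();   // work happens here, not up front
        produced++;
        return new int[] { produced };    // one batch materialized per pull
    }
}

public class PullDemo {
    public static void main(String[] args) {
        LazyBatchStream stream = new LazyBatchStream(3);
        System.out.println(stream.produceCalls.get()); // 0: nothing produced yet
        stream.next();
        System.out.println(stream.produceCalls.get()); // 1: one batch per pull
        while (stream.hasNext()) stream.next();
        System.out.println(stream.produceCalls.get()); // 3
    }
}
```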

Lifecycle

  • Like collect, executeStream consumes the DataFrame: the native plan handle is released as the stream is established. Subsequent executeStream or collect on the same instance throws IllegalStateException.
  • close() on a successfully-streamed DataFrame is a no-op; the ArrowReader is the new owner of native resources.
  • If construction of the stream fails (e.g. plan error), the native handle must be cleaned up so the original DataFrame.close() doesn't double-free.
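The consume-once rule could be enforced with a simple state flag on the Java side. A minimal sketch of that guard, with plain objects standing in for the native handle and the ArrowReader (all names here are hypothetical, not the real binding's fields):

```java
// Minimal sketch of the consume-once lifecycle. The real DataFrame would
// transfer or release a native handle where this sketch just flips flags.
class StreamOnce implements AutoCloseable {
    private boolean consumed = false;
    private boolean closed = false;

    // Stand-in for executeStream(allocator): succeeds once, then throws.
    Object executeStream() {
        if (consumed) {
            throw new IllegalStateException("DataFrame already consumed");
        }
        consumed = true;         // ownership of native resources transfers here
        return new Object();     // stand-in for the returned ArrowReader
    }

    @Override public void close() {
        if (consumed) return;    // reader owns native resources: no-op
        closed = true;           // otherwise release the native plan handle
    }

    boolean releasedByClose() { return closed; }
}
```

The same flag also covers the failure path: if stream construction throws before `consumed` is set, `close()` still releases the handle exactly once, so no double-free.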

Coexistence with collect

Both methods stay. collect remains the right call for small results where you want a single owning buffer; executeStream is the right call for unbounded or large results. After this PR lands, the two paths could share a single Rust implementation (collect becomes executeStream + concat), but that re-implementation is out of scope here — keeping collect on its current code path keeps the diff small and lets a follow-up consolidate.
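The proposed follow-up consolidation (collect as executeStream plus concat) amounts to draining the stream into one buffer. Sketched over plain lists rather than Arrow batches, just to show the shape of the refactor:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class CollectViaStream {
    // collect() expressed as "drain the stream, then concatenate": the only
    // difference from streaming is that every batch is held in memory at once.
    static List<Integer> collect(Iterator<List<Integer>> stream) {
        List<Integer> all = new ArrayList<>();
        while (stream.hasNext()) {
            all.addAll(stream.next()); // concat step; Arrow would concat RecordBatches
        }
        return all;
    }

    public static void main(String[] args) {
        List<List<Integer>> batches = List.of(List.of(1, 2), List.of(3), List.of(4, 5));
        System.out.println(collect(batches.iterator())); // [1, 2, 3, 4, 5]
    }
}
```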

Describe alternatives you've considered

  1. Replace collect with executeStream and offer collectAll(allocator) for the small-result case. Cleaner long-term but breaks an unreleased-but-shipping API.

  2. Add a BatchSize knob that limits how much collect materializes before yielding control. Doesn't actually fix the OOM — the executor still buffers everything; just makes it harder to reason about.

  3. Expose execute_stream_partitioned. Useful for parallel consumers, but a single-stream API is the v1 most callers want. Partitioned can be a follow-up.
