
feat(dataframe): add executeStream(allocator) for incremental batch iteration#51

Open
LantaoJin wants to merge 1 commit into apache:main from LantaoJin:feat/dataframe-execute-stream

Conversation

@LantaoJin
Contributor

Which issue does this PR close?

Rationale for this change

DataFrame.collect(allocator) is the only way to retrieve query results today. Internally, the native handler calls df.collect().await which materializes every batch into a Vec<RecordBatch> on the Rust heap before the first batch crosses the FFI boundary. For TB-scale or unbounded result sets, this OOMs the Rust side regardless of how memory accounting is configured downstream.

DataFusion's own SessionContext::execute_stream already returns a SendableRecordBatchStream that yields one batch at a time. The Java binding just needs to wire that to the existing FFI_ArrowArrayStream path so each loadNextBatch() from Java drives one stream.next() on the Rust side.

What changes are included in this PR?

  • DataFrame.executeStream(BufferAllocator) → ArrowReader — peer to collect. Same lifecycle (consumes the DataFrame, caller closes the returned reader, allocator must outlive it). Same return type so existing reader-driven code paths work unchanged.
  • native/src/lib.rs — new Java_org_apache_datafusion_DataFrame_executeStreamDataFrame JNI handler plus a small StreamingReader adapter that bridges DataFusion's async SendableRecordBatchStream to Arrow's synchronous RecordBatchReader interface. Each next() drives one runtime().block_on(stream.next()).
  • native/Cargo.toml — adds futures = "0.3" for StreamExt::next.
  • Updated collect's Javadoc to point at executeStream for analytics-scale queries.
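The StreamingReader bullet above describes a general async-to-sync bridge. A JDK-only sketch of the same shape (hypothetical names throughout; no Arrow or DataFusion types involved) shows the bounding property: each synchronous call blocks on exactly one element of an asynchronous stream, so at most one batch is ever held by the adapter.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for DataFusion's SendableRecordBatchStream:
// poll() returns a future that resolves to the next batch, or to null
// once the stream is exhausted.
interface AsyncBatchStream<T> {
    CompletableFuture<T> poll();
}

// Analogue of the PR's StreamingReader adapter: bridges the async stream
// to a synchronous iterator by blocking on exactly one future per call,
// mirroring runtime().block_on(stream.next()) on the Rust side. At most
// one batch is ever held here, which is the memory-bounding property.
final class BlockingBatchReader<T> implements Iterator<T> {
    private final AsyncBatchStream<T> stream;
    private T pending;      // the single in-flight batch
    private boolean done;

    BlockingBatchReader(AsyncBatchStream<T> stream) {
        this.stream = stream;
    }

    @Override
    public boolean hasNext() {
        if (done) return false;
        if (pending == null) {
            pending = stream.poll().join(); // block for one batch only
            if (pending == null) done = true;
        }
        return !done;
    }

    @Override
    public T next() {
        if (!hasNext()) throw new NoSuchElementException();
        T batch = pending;
        pending = null;
        return batch;
    }
}
```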

collect is unchanged. The two methods coexist: collect remains the right call for small results that benefit from a single owning buffer; executeStream is the right call for unbounded or very large result sets.
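The consume-once lifecycle shared by both methods can be modeled in a few lines of plain Java (a sketch with a hypothetical class, not the real binding, which tracks this state through a native handle):

```java
// Minimal JDK-only model of the consume-once contract (hypothetical
// class; the real binding delegates this state to the native side).
final class ConsumableFrame implements AutoCloseable {
    private boolean consumed;

    // Stands in for collect/executeStream: the first call hands the plan
    // off for execution, so any later execution attempt must fail.
    String execute() {
        if (consumed) {
            throw new IllegalStateException("DataFrame already consumed");
        }
        consumed = true;
        return "reader";
    }

    // close() after a successful execute() is a no-op by contract.
    @Override
    public void close() {
        // would release the native plan handle only if not yet consumed
    }
}
```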

Are these changes tested?

Yes — 6 new tests in DataFrameExecuteStreamTest

Are there any user-facing changes?

Yes — purely additive. New public API:

  • DataFrame.executeStream(BufferAllocator) → ArrowReader

collect's behavior is unchanged; only its Javadoc gains a forward-pointer to executeStream. No deprecations, no API removals.

A new direct build dependency (futures = "0.3") is added to the native crate for StreamExt. It was already pulled in transitively by tokio and datafusion, so this just makes the dependency explicit on the import path.

…teration

DataFrame.collect(allocator) materializes every batch into a Vec<RecordBatch> on the Rust heap before the first batch crosses the FFI boundary into Java. For TB-scale or unbounded result sets, this OOMs the Rust side regardless of how memory accounting is configured downstream.

Adds DataFrame.executeStream(BufferAllocator) -> ArrowReader as a peer to collect, sharing the same lifecycle (consumes the DataFrame, caller closes the returned reader, allocator must outlive it) but pulling batches lazily. The native side wraps DataFusion's existing SessionContext::execute_stream output (a SendableRecordBatchStream) in a small StreamingReader adapter that bridges async Stream::next() to Arrow's synchronous RecordBatchReader trait; each call to ArrowReader.loadNextBatch() on the Java side drives one runtime().block_on(stream.next()) on the Rust side. Memory pressure stays bounded by the executor pipeline plus a single in-flight batch instead of the full result set.

collect remains on its current code path and is unchanged behaviorally; only its Javadoc gains a forward-pointer to executeStream for analytics-scale queries. A follow-up could consolidate collect onto executeStream + concat (~10 LOC, no API change) but that refactor is out of scope here to keep the diff focused on adding the streaming primitive.

Tests cover:

  • equivalence with collect (same row count over a small VALUES query);
  • the consumes-DataFrame contract (a second collect/executeStream/count throws after a successful executeStream);
  • incremental delivery (with batchSize=2 over 5 rows the reader yields multiple batches and no single batch holds the full result);
  • early-close survival (closing the reader mid-stream does not panic);
  • TPC-H integration, gated on the SF1 lineitem table;
  • column-value correctness (pins actual cell values across batches, not just row counts).

native/Cargo.toml gains futures = "0.3" for StreamExt::next; it is already pulled in transitively by tokio and datafusion, the addition just makes the import path explicit.
@andygrove
Member

Code review

Found 2 issues:

  1. Class-level Javadoc on DataFrame still names only collect as the execution/consume method, but after this PR executeStream is a peer that also consumes the DataFrame and makes close() a no-op. Worth updating the top-of-class summary so the lifecycle paragraph covers both methods.

/**
 * A lazy representation of a query plan, mirroring the Rust DataFusion {@code DataFrame}. Created
 * by {@link SessionContext#sql(String)} or other planning entry points and executed by {@link
 * #collect}.
 *
 * <p>Instances are <strong>not thread-safe</strong> and must be closed. {@link #collect} consumes
 * the DataFrame: a successfully collected DataFrame cannot be collected again, and {@link #close()}
 * on an already-collected instance is a no-op.
 */

  2. executeStreamSurvivesEarlyClose doesn't put the ArrowReader in a try-with-resources, so if loadNextBatch() or assertTrue(...) throws, the reader leaks and the surrounding BufferAllocator.close() will fail and mask the original failure. The other tests in this file use the right idiom — recommend matching it here.

@Test
void executeStreamSurvivesEarlyClose() throws Exception {
  // Close the reader after the first batch and confirm no native panic /
  // resource leak. The DataFrame is already consumed; explicit close on it
  // must remain a no-op.
  try (BufferAllocator allocator = new RootAllocator();
      SessionContext ctx = SessionContext.builder().batchSize(1).build();
      DataFrame df = ctx.sql("SELECT * FROM (VALUES (1), (2), (3)) AS t(x)")) {
    ArrowReader reader = df.executeStream(allocator);
    assertTrue(reader.loadNextBatch());
    reader.close();
  }
}
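The hazard here is ordinary try-with-resources semantics, illustrated by a JDK-only sketch (TrackedReader and CloseDemo are hypothetical stand-ins, not Arrow types): a resource declared in the resource list is closed even when the body throws, while a manually closed one leaks on the failure path.

```java
// Hypothetical closeable standing in for ArrowReader.
final class TrackedReader implements AutoCloseable {
    boolean closed;

    @Override
    public void close() { closed = true; }
}

final class CloseDemo {
    // Shape of the flagged test: the reader is closed manually, so any
    // throw before the explicit close() leaves it open (leaked).
    static TrackedReader manualClose() {
        TrackedReader reader = new TrackedReader();
        try {
            throw new IllegalStateException("simulated loadNextBatch failure");
            // reader.close() would go here but is never reached
        } catch (IllegalStateException e) {
            // swallowed for the demo; in a real test this propagates and
            // the allocator's close() then fails on the leaked buffer
        }
        return reader;
    }

    // Recommended idiom: the resource list guarantees close() even when
    // the body throws, so nothing leaks and no failure is masked.
    static TrackedReader managedClose() {
        TrackedReader reader = new TrackedReader();
        try (reader) {
            throw new IllegalStateException("simulated loadNextBatch failure");
        } catch (IllegalStateException e) {
            // reader.close() has already run by the time we get here
        }
        return reader;
    }
}
```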

🤖 Generated with Claude Code


