Skip to content

feat: add CancellationToken with collect/executeStream overloads#69

Open
LantaoJin wants to merge 2 commits into
apache:mainfrom
LantaoJin:feat/dataframe-cancellation
Open

feat: add CancellationToken with collect/executeStream overloads#69
LantaoJin wants to merge 2 commits into
apache:mainfrom
LantaoJin:feat/dataframe-cancellation

Conversation

@LantaoJin
Copy link
Copy Markdown
Contributor

Which issue does this PR close?

Rationale for this change

A long-running DataFrame.collect(allocator) or DataFrame.executeStream(allocator) call blocks the calling Java thread for the full duration of the query. Thread.interrupt() is a no-op (the thread is parked inside runtime().block_on(...)), and there is no way to free the in-flight query's native resources early. For any embedder running request timeouts, user-cancel actions, or node-shutdown drains, this is a hard operational gap.

What changes are included in this PR?

The cancellation we ship lives on the session, not on the DataFrame. A token is allocated by the session, passed to collect / executeStream, and fired from any thread. We deliberately do not add DataFrame.cancel(): a DataFrame is a lazy plan that can be executed concurrently from multiple threads, so a per-DataFrame cancel verb has ambiguous semantics. The token is the primitive; a Spark-style ctx.addTag(name) / ctx.cancelTag(name) sugar layer can land in a follow-up.

Native shape: tokio_util::sync::CancellationToken per token, owned by a process-global registry keyed by an opaque u64 ID. JNI handlers look up by ID under a Mutex<HashMap> and clone the Arc out before any blocking work. The cloned Arc keeps the inner token alive for the borrow's lifetime, so an in-flight collect() future that already holds a clone keeps working through a concurrent close. close() removes the registry entry; the underlying token drops only when the last Arc clone goes away.

Race safety: CancellationToken.nativeHandle is final AtomicLong. cancel() / isCancelled() read via get(); close() uses getAndSet(0L) so only the winning thread issues closeToken. Combined with the registry on the native side, a concurrent close + cancel + query-start race produces at worst a clean IllegalStateException from a missing-ID lookup -- never a use-after-free of a freed Box. The registry pattern is the same scaffolding upstream issue #40 calls for across all handle types; cancellation tokens get it first because they are designed to be fired from a thread that does not own them, so the race window is the widest of any handle.

Are these changes tested?

Yes -- 19 new tests across CancellationTokenTest and DataFrameCancellationTest.

Are there any user-facing changes?

Yes -- purely additive. New CancellationToken class, one new method on SessionContext, two new overloads on DataFrame. No API removals, no deprecations, no behaviour change for existing callers. New Cargo dependency: tokio-util = { version = "0.7", features = ["rt"] }; already a transitive dep, so no new crate in the build, just a feature flag added.

LantaoJin added 2 commits May 19, 2026 12:51
Tokens are session-allocated, fired from any thread, and abort the next cooperative poll point of the targeted call. Pre-stream cancellations surface as `java.util.concurrent.CancellationException`; mid-stream cancellations from `executeStream(..., token)` surface from `ArrowReader.loadNextBatch` as `IOException` whose message contains `"query cancelled"` (the Arrow C-data wrapper hides the typed signal).

Native handles are opaque registry IDs guarded by an `AtomicLong` on the Java side, so concurrent close + cancel is safe: the worst race outcome is a clean `IllegalStateException` from a missing-ID lookup rather than a use-after-free of a freed `Box`.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

feat: query cancellation via CancellationToken on SessionContext

1 participant