feat: query cancellation via CancellationToken on SessionContext #68

@LantaoJin

Is your feature request related to a problem or challenge?

A long-running DataFrame.collect(allocator) or DataFrame.executeStream(allocator) call blocks the calling Java thread for the entire duration of the query. Thread.interrupt() has no effect: the calling thread is parked in native code inside runtime().block_on(...) (native/src/lib.rs), and the Tokio runtime never checks the Java interrupt flag. There is no way to abort an in-flight query, free its native resources early, or unblock the calling thread short of waiting for the query to finish.
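The interrupt behaviour described above can be reproduced without any native code. The sketch below is a pure-Java stand-in, not the binding itself: the spin loop plays the role of the thread parked in block_on, and the class and method names are hypothetical. It shows that interrupt() only sets a flag, which code that never checks it will ignore, while an explicit shared signal does stop the worker.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration only; none of these names exist in the bindings.
public class InterruptIgnoredSketch {

    /** Returns true if the worker was still running after being interrupted. */
    public static boolean workerSurvivesInterrupt() {
        AtomicBoolean stop = new AtomicBoolean(false);
        CountDownLatch started = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            started.countDown();
            // Stand-in for the native block_on call: it never looks at the
            // Java interrupt flag, so interrupt() cannot end this loop.
            while (!stop.get()) {
                Thread.onSpinWait();
            }
        });
        worker.start();

        try {
            started.await();
            worker.interrupt();
            worker.join(200);                 // give the interrupt time to matter
            boolean stillRunning = worker.isAlive();
            stop.set(true);                   // the explicit signal is what stops it
            worker.join();
            return stillRunning;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("still running after interrupt: " + workerSurvivesInterrupt());
    }
}
```

The `stop` flag here is the role a cancellation token would play: a signal the blocked work actually observes.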

For any embedder running multi-tenant workloads, where queries must be stopped on request timeouts, user-cancel actions, node shutdown, or leader-election handover, this is a hard operational gap. The OpenSearch analytics backend (OpenSearch/sandbox/plugins/analytics-backend-datafusion/rust/src/cancellation.rs and query_tracker.rs) carries a CancellationToken-based wrapper precisely because upstream offers nothing.

This is complementary to issue #40 (close()/JNI use-after-free race) but distinct: #40 is about safely tearing down a finished handle; this is about signalling an in-flight future to stop. Both eventually share the atomic-handle scaffolding from #40's option 2, so coordination is worthwhile, but the surface lands cleanly without #40 having to merge first.

Describe the solution you'd like

A token-based cancellation API on SessionContext, modeled on Spark 4.0's interruptTag shape (cancel lives on the session, not on the DataFrame). The token is a separate handle from the DataFrame so cancel can fire from a thread that does not hold the DataFrame.

v1 surface

try (SessionContext ctx = new SessionContext();
     CancellationToken token = ctx.newCancellationToken();
     DataFrame df = ctx.sql("SELECT ... FROM big_table")) {

    Future<ArrowReader> fut = pool.submit(() -> df.collect(allocator, token));

    // from another thread (timeout watcher, user-cancel handler, ...):
    token.cancel();

    // fut completes exceptionally: fut.get() throws ExecutionException
    // whose cause is the CancellationException
}
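A request-timeout watcher is one of the callers this surface is meant to serve. The following is a minimal sketch of that pattern using only java.util.concurrent, since the proposed API does not exist yet. The AtomicBoolean stands in for the CancellationToken, the polling loop stands in for df.collect(allocator, token), and using java.util.concurrent.CancellationException as the exception type is an assumption. It also shows how the exception reaches the caller when the query runs on an executor.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration only; stand-ins replace the proposed API.
public class TimeoutWatcherSketch {

    /**
     * Runs a stand-in query and fires the cancel signal after timeoutMillis.
     * Returns the simple class name of the failure cause, or "completed".
     */
    public static String runWithTimeout(long timeoutMillis) {
        AtomicBoolean cancelled = new AtomicBoolean(false); // stand-in for the token
        ExecutorService pool = Executors.newSingleThreadExecutor();
        ScheduledExecutorService watcher = Executors.newSingleThreadScheduledExecutor();
        try {
            Future<String> fut = pool.submit(() -> {
                // Stand-in for df.collect(allocator, token): about 5 s of work
                // that checks the cancel signal between steps.
                for (int i = 0; i < 500; i++) {
                    if (cancelled.get()) {
                        throw new CancellationException("query cancelled");
                    }
                    Thread.sleep(10);
                }
                return "rows";
            });

            // The watcher thread never touches the query object, only the signal.
            watcher.schedule(() -> cancelled.set(true), timeoutMillis, TimeUnit.MILLISECONDS);

            try {
                fut.get();
                return "completed";
            } catch (ExecutionException e) {
                // An exception thrown inside the task arrives wrapped.
                return e.getCause().getClass().getSimpleName();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        } finally {
            watcher.shutdownNow();
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runWithTimeout(50));
    }
}
```

Because the watcher holds only the signal, it can fire from any thread, which is the reason the proposal keeps the token separate from the DataFrame.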

New methods:

  • SessionContext.newCancellationToken() -- returns a fresh CancellationToken bound to this session.
  • CancellationToken.cancel() -- fires the token; idempotent.
  • CancellationToken.isCancelled() -- non-blocking check.
  • CancellationToken.close() -- releases the native handle; the token is AutoCloseable so try-with-resources handles cleanup.
  • DataFrame.collect(BufferAllocator, CancellationToken) -- overload that takes a token. The existing zero-token collect(BufferAllocator) is unchanged.
  • DataFrame.executeStream(BufferAllocator, CancellationToken) -- same overload pattern. Token is held by the returned ArrowReader for its full lifetime; cancel mid-stream aborts the next loadNextBatch().
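The semantics listed above (idempotent cancel, non-blocking isCancelled, AutoCloseable token, cancel observed at the next loadNextBatch()) can be modelled in plain Java. This is a sketch of the intended behaviour under stated assumptions, not the implementation: ModelToken and ModelReader are hypothetical names, the real token would wrap a native handle rather than an AtomicBoolean, and the exception type is assumed to be java.util.concurrent.CancellationException.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical pure-Java model of the proposed semantics.
public class TokenSemanticsSketch {

    /** Models the token; the real one would own a native handle. */
    static final class ModelToken implements AutoCloseable {
        private final AtomicBoolean cancelled = new AtomicBoolean(false);
        private final AtomicBoolean closed = new AtomicBoolean(false);

        void cancel() { cancelled.set(true); }          // idempotent
        boolean isCancelled() { return cancelled.get(); } // non-blocking
        @Override public void close() { closed.set(true); }
    }

    /** Models a streaming reader that holds the token for its lifetime. */
    static final class ModelReader {
        private final ModelToken token;
        private int remaining;

        ModelReader(ModelToken token, int batches) {
            this.token = token;
            this.remaining = batches;
        }

        /** Checks the token before producing each batch. */
        boolean loadNextBatch() {
            if (token.isCancelled()) {
                throw new CancellationException("cancelled mid-stream");
            }
            if (remaining == 0) {
                return false;
            }
            remaining--;
            return true;
        }
    }

    /** Reads batches, cancelling after cancelAfter of them; returns how many were read. */
    public static int batchesReadBeforeCancel(int total, int cancelAfter) {
        try (ModelToken token = new ModelToken()) {
            ModelReader reader = new ModelReader(token, total);
            int read = 0;
            try {
                while (reader.loadNextBatch()) {
                    read++;
                    if (read == cancelAfter) {
                        token.cancel();
                        token.cancel(); // second call is a no-op
                    }
                }
            } catch (CancellationException e) {
                // Expected: the next loadNextBatch() after cancel aborts.
            }
            return read;
        }
    }

    public static void main(String[] args) {
        System.out.println(batchesReadBeforeCancel(10, 3)); // prints 3
    }
}
```

In this model, batches already delivered stay valid and only the next load fails, which matches the "aborts the next loadNextBatch()" wording above.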

Describe alternatives you've considered

No response

Additional context

Out of scope

  • Tag form. Ship the token primitive first; tag is sugar that can land in a follow-up if a user actually asks for it.
  • Sync-API breakage. df.collect(allocator) keeps working unchanged; the new method is df.collect(allocator, token) (overload).
  • Per-operator cancel granularity. In this proposal the cancel point is each block_on site; sub-operator cancellation is upstream-DataFusion territory.

Labels: enhancement (New feature or request)
