Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 125 additions & 0 deletions core/src/main/java/org/apache/datafusion/CancellationToken.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.datafusion;

import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicLong;

/**
* A handle that signals an in-flight {@link
* DataFrame#collect(org.apache.arrow.memory.BufferAllocator, CancellationToken)} or {@link
* DataFrame#executeStream(org.apache.arrow.memory.BufferAllocator, CancellationToken)} to abort.
*
* <p>Allocate via {@link SessionContext#newCancellationToken()}, pass to the desired call from one
* thread, and invoke {@link #cancel()} from another. The running query aborts at its next
* cooperative poll point. The exception type depends on the call site: {@code collect(..., token)}
* and pre-stream cancellation in {@code executeStream(..., token)} surface {@link
* CancellationException}; mid-stream cancellation in {@code executeStream} surfaces from {@link
* org.apache.arrow.vector.ipc.ArrowReader#loadNextBatch} as a {@link java.io.IOException} whose
* message contains {@code "query cancelled"} (the Arrow C-data wrapper hides the typed signal). See
* the {@code executeStream} Javadoc for the full contract.
*
* <p>The token is not bound to any particular DataFrame; the same token may be passed to several
* concurrent {@code collect} / {@code executeStream} calls, and {@link #cancel()} fires all of them
* at once. Once cancelled, {@link #isCancelled()} returns {@code true} permanently — to "reset",
* allocate a fresh token. This matches the underlying {@code tokio_util::sync::CancellationToken}
* contract.
*
* <p>Instances are safe to call {@link #cancel()} / {@link #isCancelled()} on from any thread, and
* must be {@link #close() closed} to release the native handle. {@code close()} is idempotent and a
* no-op once invoked.
*/
public final class CancellationToken implements AutoCloseable {
static {
NativeLibraryLoader.loadLibrary();
}

// Atomic so concurrent close + cancel/isCancelled/handle-pass-to-JNI cannot
// double-free or pass a stale handle to the native side. Reads observe either
// a live registry id or the post-close 0 sentinel; the close path uses
// getAndSet so only one thread can issue closeToken.
private final AtomicLong nativeHandle;

CancellationToken(long nativeHandle) {
if (nativeHandle == 0) {
throw new IllegalArgumentException("CancellationToken native handle is null");
}
this.nativeHandle = new AtomicLong(nativeHandle);
}

/**
* The internal native handle, or {@code 0} if this token is closed. Package-private so {@link
* DataFrame} can pass it across JNI. The native side's registry lookup gracefully rejects a
* closed handle, so a race between {@code handle()} and {@link #close()} is bounded to a clean
* "closed" error rather than a use-after-free.
*/
long handle() {
return nativeHandle.get();
}

/**
* Signal the token. Any {@code collect} or {@code executeStream} call that received this token
* aborts at its next poll point. The thrown exception type depends on whether the cancel reaches
* the call before or after the JNI call returns — see the class-level Javadoc. Idempotent:
* subsequent calls are no-ops.
*
* @throws IllegalStateException if this token is closed.
*/
public void cancel() {
long h = nativeHandle.get();
if (h == 0) {
throw new IllegalStateException("CancellationToken is closed");
}
cancelToken(h);
}

/**
* @return {@code true} if {@link #cancel()} has been invoked on this token, {@code false}
* otherwise. Non-blocking.
* @throws IllegalStateException if this token is closed.
*/
public boolean isCancelled() {
long h = nativeHandle.get();
if (h == 0) {
throw new IllegalStateException("CancellationToken is closed");
}
return isCancelledToken(h);
}

/**
* Release the native handle. Idempotent. After {@code close()}, {@link #cancel()} and {@link
* #isCancelled()} throw {@link IllegalStateException}. Closing a token that already fired is
* harmless and does not cancel anything else; queries that already received the cancel signal
* remain aborted.
*/
@Override
public void close() {
long h = nativeHandle.getAndSet(0L);
if (h != 0) {
closeToken(h);
}
}

private static native void cancelToken(long handle);

private static native boolean isCancelledToken(long handle);

private static native void closeToken(long handle);
}
73 changes: 69 additions & 4 deletions core/src/main/java/org/apache/datafusion/DataFrame.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,29 @@ public final class DataFrame implements AutoCloseable {
* {@link #executeStream(BufferAllocator)} for analytics-scale queries.
*/
public ArrowReader collect(BufferAllocator allocator) {
return collect(allocator, null);
}

/**
* Execute the plan with cooperative cancellation. Identical to {@link #collect(BufferAllocator)}
* except that {@link CancellationToken#cancel()} on {@code token} from another thread aborts the
* call with a {@link java.util.concurrent.CancellationException} at the next poll point.
*
* <p>{@code token} may be {@code null}, in which case this overload behaves exactly like the
* single-argument form.
*
* @throws java.util.concurrent.CancellationException if the token is fired during the call.
*/
public ArrowReader collect(BufferAllocator allocator, CancellationToken token) {
if (nativeHandle == 0) {
throw new IllegalStateException("DataFrame is closed or already collected");
}
long tokenHandle = resolveTokenHandle(token);
ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator);
long handle = nativeHandle;
nativeHandle = 0;
try {
collectDataFrame(handle, stream.memoryAddress());
collectDataFrame(handle, stream.memoryAddress(), tokenHandle);
return Data.importArrayStream(allocator, stream);
} catch (Throwable e) {
stream.close();
Expand All @@ -91,21 +106,70 @@ public ArrowReader collect(BufferAllocator allocator) {
* use this method.
*/
public ArrowReader executeStream(BufferAllocator allocator) {
return executeStream(allocator, null);
}

/**
* Execute the plan as a streaming reader with cooperative cancellation. Identical to {@link
* #executeStream(BufferAllocator)} except that the returned reader holds the supplied {@code
* token} for its full lifetime: firing the token from another thread aborts the next {@link
* ArrowReader#loadNextBatch} call.
*
* <p>{@code token} may be {@code null}, in which case this overload behaves exactly like the
* single-argument form.
*
* <p><b>Cancellation surface (read carefully).</b> The exception type depends on <em>when</em>
* the cancel fires, because the Arrow C-data stream layer wraps any underlying error before it
* reaches Java:
*
* <ul>
* <li>If the token is fired <b>before</b> the stream is established (e.g., the token was
* already cancelled at call time, or fires during plan compilation), this method throws
* {@link java.util.concurrent.CancellationException}.
* <li>If the token fires <b>after</b> {@code executeStream} returns, the next {@link
* ArrowReader#loadNextBatch} throws {@link java.io.IOException} whose message contains the
* string {@code "query cancelled"}. Callers that need to distinguish cancellation from real
* I/O failures must match on the message until a typed surface lands as a follow-up.
* </ul>
*
* @throws java.util.concurrent.CancellationException if the token fires before the stream is
* established.
*/
public ArrowReader executeStream(BufferAllocator allocator, CancellationToken token) {
if (nativeHandle == 0) {
throw new IllegalStateException("DataFrame is closed or already collected");
}
long tokenHandle = resolveTokenHandle(token);
ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator);
long handle = nativeHandle;
nativeHandle = 0;
try {
executeStreamDataFrame(handle, stream.memoryAddress());
executeStreamDataFrame(handle, stream.memoryAddress(), tokenHandle);
return Data.importArrayStream(allocator, stream);
} catch (Throwable e) {
stream.close();
throw e;
}
}

/**
* A {@code null} token disables cancellation; a non-null but already-closed token is rejected
* with {@link IllegalStateException}, matching how {@link CancellationToken#cancel()} and {@link
* CancellationToken#isCancelled()} behave on a closed token. Without this check, premature {@code
* close()} on a token would silently fall back to an uncancellable call, which is hard to
* diagnose.
*/
private static long resolveTokenHandle(CancellationToken token) {
if (token == null) {
return 0L;
}
long handle = token.handle();
if (handle == 0L) {
throw new IllegalStateException("CancellationToken is closed");
}
return handle;
}

/** Execute the plan and return the number of rows. */
public long count() {
if (nativeHandle == 0) {
Expand Down Expand Up @@ -358,9 +422,10 @@ public void close() {
}
}

private static native void collectDataFrame(long handle, long ffiStreamAddr);
private static native void collectDataFrame(long handle, long ffiStreamAddr, long tokenHandle);

private static native void executeStreamDataFrame(long handle, long ffiStreamAddr);
private static native void executeStreamDataFrame(
long handle, long ffiStreamAddr, long tokenHandle);

private static native void closeDataFrame(long handle);

Expand Down
20 changes: 20 additions & 0 deletions core/src/main/java/org/apache/datafusion/SessionContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,24 @@ public DataFrame readAvro(String path, AvroReadOptions options) {
return new DataFrame(dfHandle);
}

/**
* Allocate a fresh {@link CancellationToken}. Pass it to {@link
* DataFrame#collect(BufferAllocator, CancellationToken)} or {@link
* DataFrame#executeStream(BufferAllocator, CancellationToken)} to make a query cancellable; fire
* the token from any thread to abort the in-flight call.
*
* <p>The same token may be reused across multiple concurrent calls; firing it cancels them all.
* Once fired, a token stays cancelled — allocate a fresh token for the next query.
*
* <p>The token is independent of this {@link SessionContext}: closing the context does not
* implicitly close outstanding tokens, and a token outliving its session is harmless (it just has
* nothing left to cancel). Always close tokens in their own try-with-resources to release the
* native handle.
*/
public CancellationToken newCancellationToken() {
return new CancellationToken(createCancellationToken());
}

/**
* Register a Java-implemented scalar UDF. After registration, the function can be invoked by SQL
* via the UDF's name or referenced in DataFusion plans deserialised with {@link #fromProto}.
Expand Down Expand Up @@ -559,6 +577,8 @@ private static native long readJsonWithOptions(
private static native void registerScalarUdf(
long handle, String name, byte[] signatureSchemaBytes, byte volatility, ScalarFunction impl);

private static native long createCancellationToken();

private static native void registerTableNative(
long handle, String name, byte[] schemaIpcBytes, TableProvider provider);
}
Loading