Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
b64accb
build(native): add async-trait and futures deps for Java data sources
andygrove May 18, 2026
484cd12
refactor(native): lift jthrowable_to_string into shared jni_util module
andygrove May 18, 2026
b699291
feat(datasource): add DataSource interface and SessionContext.registe…
andygrove May 18, 2026
e16a99e
feat(native): add JavaDataSource TableProvider and JNI registration
andygrove May 18, 2026
79213dc
docs(native): clarify JavaScanExec safety + schema check + JVM attach
andygrove May 18, 2026
9c60f3c
feat(datasource)!: pass framework allocator to DataSource.scan
andygrove May 18, 2026
cd03d90
test(datasource): cover repeated scans within a single query
andygrove May 18, 2026
1004f6c
test(datasource): cover empty-stream scan
andygrove May 18, 2026
bf9c435
test(datasource): cover column projection through DataFusion
andygrove May 18, 2026
0ff2d8c
test(datasource): reject scan whose schema differs from registered sc…
andygrove May 18, 2026
248dc70
test(datasource): surface Java exception class and message from scan()
andygrove May 18, 2026
82d13fb
test(datasource): reject null ArrowReader from scan()
andygrove May 18, 2026
953fcf2
test(datasource): cover joining two registered Java data sources
andygrove May 18, 2026
af57098
docs(datasource): document SessionContext.registerDataSource
andygrove May 18, 2026
a4eb41e
docs(datasource): clarify scan() is per-physical-scan, not per-query
andygrove May 18, 2026
82c740a
feat(examples): add JDBC-backed DataSource example using H2 + arrow-jdbc
andygrove May 18, 2026
1df2bd2
refactor(datasource)!: rename DataSource API to TableProvider
andygrove May 19, 2026
9e8279d
Merge remote-tracking branch 'apache/main' into feat/columnar-value-udf
andygrove May 19, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions core/src/main/java/org/apache/datafusion/SessionContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,41 @@ public void registerUdf(ScalarUdf udf) {
registerScalarUdf(nativeHandle, name, signatureBytes, volatility.code(), impl);
}

/**
* Register a Java-implemented {@link TableProvider} under {@code name}. SQL queries that
* reference {@code name} call back into {@code provider} to fetch batches.
*
* <p>{@link TableProvider#schema()} is called once here, on the calling thread, and cached on the
* native side. {@link TableProvider#scan(org.apache.arrow.memory.BufferAllocator)} is called once
* per query that touches the table, on a Tokio worker thread; it must return a fresh, independent
* {@link org.apache.arrow.vector.ipc.ArrowReader} on every call, with its buffers allocated from
* the {@link org.apache.arrow.memory.BufferAllocator} the framework supplies.
*
* <p>This is the Java counterpart to DataFusion's Rust {@code SessionContext::register_table}.
*
* @throws IllegalArgumentException if {@code name} or {@code provider} is {@code null}.
* @throws IllegalStateException if {@code provider.schema()} returns {@code null}, or this
* context is closed.
* @throws RuntimeException if native registration fails.
*/
public void registerTable(String name, TableProvider provider) {
if (nativeHandle == 0) {
throw new IllegalStateException("SessionContext is closed");
}
if (name == null) {
throw new IllegalArgumentException("registerTable name must be non-null");
}
if (provider == null) {
throw new IllegalArgumentException("registerTable provider must be non-null");
}
Schema schema = provider.schema();
if (schema == null) {
throw new IllegalStateException("TableProvider.schema returned null");
}
byte[] schemaIpc = serializeSchemaIpc(schema);
registerTableNative(nativeHandle, name, schemaIpc, provider);
}

private static byte[] serializeSchemaIpc(Schema schema) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (BufferAllocator allocator = new RootAllocator();
Expand Down Expand Up @@ -523,4 +558,7 @@ private static native long readJsonWithOptions(

private static native void registerScalarUdf(
long handle, String name, byte[] signatureSchemaBytes, byte volatility, ScalarFunction impl);

private static native void registerTableNative(
long handle, String name, byte[] schemaIpcBytes, TableProvider provider);
}
70 changes: 70 additions & 0 deletions core/src/main/java/org/apache/datafusion/SimpleTableProvider.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.datafusion;

import java.util.function.Function;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.types.pojo.Schema;

/**
* A {@link TableProvider} that pairs a fixed {@link Schema} with a function that opens a fresh
* {@link ArrowReader} for each scan. Provided as a convenience for the common case where there is
* no projection / filter pushdown to implement.
*
* <p>Each call to {@link #scan(BufferAllocator)} invokes the supplied function and returns whatever
* {@link ArrowReader} it produces, so the function MUST return a fresh, independent reader on every
* invocation (see the contract on {@link TableProvider#scan(BufferAllocator)}).
*
* <p>As {@link TableProvider} grows additional methods in the future, this class will provide
* defaults so existing callers keep working without changes.
*/
public final class SimpleTableProvider implements TableProvider {

private final Schema schema;
private final Function<BufferAllocator, ArrowReader> scanFn;

/**
* @param schema the table schema; returned as-is from {@link #schema()}
* @param scanFn called on every {@link #scan(BufferAllocator)} with the framework-supplied
* allocator; must return a fresh, independent {@link ArrowReader} each time
*/
public SimpleTableProvider(Schema schema, Function<BufferAllocator, ArrowReader> scanFn) {
if (schema == null) {
throw new IllegalArgumentException("schema must be non-null");
}
if (scanFn == null) {
throw new IllegalArgumentException("scanFn must be non-null");
}
this.schema = schema;
this.scanFn = scanFn;
}

@Override
public Schema schema() {
return schema;
}

@Override
public ArrowReader scan(BufferAllocator allocator) {
return scanFn.apply(allocator);
}
}
59 changes: 59 additions & 0 deletions core/src/main/java/org/apache/datafusion/TableProvider.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.datafusion;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.types.pojo.Schema;

/**
* A Java-implemented table that can be registered with a {@link SessionContext} via {@link
* SessionContext#registerTable(String, TableProvider)}. Mirrors the role of DataFusion's Rust
* {@code TableProvider} trait, but at present only exposes the methods needed for a full table
* scan; future versions may add filter/projection pushdown and multi-partition support as default
* methods so existing implementations keep working.
*
* <p>{@link SimpleTableProvider} is a ready-made implementation for the common case of "I have a
* schema and a function that returns an {@link ArrowReader}".
*
* <p>Each call to {@link #scan(BufferAllocator)} must return a fresh, independent {@link
* ArrowReader} so that queries which touch the table more than once (self-joins, {@code UNION ALL},
* repeated reads) work correctly. The returned reader is closed by the framework when the stream
* ends.
*
* <p>The schema returned by {@link #schema()} is captured once at registration time. Every batch
* produced by every {@code ArrowReader} returned from {@link #scan(BufferAllocator)} must conform
* to it; a mismatch fails the query.
*/
public interface TableProvider {
/** The fixed schema of this table. Called once, at registration time. */
Schema schema();

/**
* Open a fresh batch stream for this table. Called once per physical scan of the table — a single
* query may invoke this more than once (self-joins, {@code UNION ALL} over the same table, etc.).
*
* <p>Each invocation MUST return an independent {@link ArrowReader}. The reader's schema MUST
* equal {@link #schema()}. The reader's buffers MUST be allocated from {@code allocator} (or from
* a child of it) — the framework needs the reader's allocator hierarchy to share a root with the
* one it passes here. The allocator contract mirrors the one on {@link ScalarFunction#evaluate}.
*/
ArrowReader scan(BufferAllocator allocator);
}
33 changes: 33 additions & 0 deletions core/src/main/java/org/apache/datafusion/internal/JniBridge.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,17 @@
import java.util.List;

import org.apache.arrow.c.ArrowArray;
import org.apache.arrow.c.ArrowArrayStream;
import org.apache.arrow.c.ArrowSchema;
import org.apache.arrow.c.Data;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.datafusion.ColumnarValue;
import org.apache.datafusion.ScalarFunction;
import org.apache.datafusion.ScalarFunctionArgs;
import org.apache.datafusion.TableProvider;

/** Internal trampoline invoked from native code on every UDF call. Not part of the public API. */
public final class JniBridge {
Expand Down Expand Up @@ -139,4 +142,34 @@ public static byte invokeScalarUdf(
return resultKind;
}
}

/**
* Open a fresh batch stream from a Java {@link TableProvider} and export it through the supplied
* Arrow C Data Interface address. Called from native code; not for application use.
*
* <p>{@link TableProvider#scan(org.apache.arrow.memory.BufferAllocator)} is called with {@link
* #ALLOCATOR} so that the reader's buffers share the same allocator root required by {@link
* Data#exportArrayStream}.
*
* <p>On success, ownership of the returned reader transfers to the FFI stream's release callback,
* so the native side closing the stream also closes the reader. On any failure during export, the
* reader is closed here before the exception propagates.
*/
public static void invokeTableScan(TableProvider provider, long ffiStreamAddr) {
ArrowReader reader = provider.scan(ALLOCATOR);
if (reader == null) {
throw new IllegalStateException("TableProvider.scan returned null");
}
ArrowArrayStream stream = ArrowArrayStream.wrap(ffiStreamAddr);
try {
Data.exportArrayStream(ALLOCATOR, reader, stream);
} catch (Throwable t) {
try {
reader.close();
} catch (Exception ignored) {
// best-effort cleanup; original error wins
}
throw t;
}
}
}
Loading
Loading