feat(datasource): add Java-implemented data sources#65
Conversation
…rDataSource Java API
Arrow Java's Data.exportArrayStream requires the reader's buffers to share the same allocator root as the export allocator. The previous workaround re-serialised every batch through IPC bytes, defeating zero-copy. The correct fix is to require DataSource.scan to accept a BufferAllocator argument (the framework's own ALLOCATOR) and allocate its reader's buffers from it. This mirrors the ScalarFunction.evaluate(BufferAllocator, ...) API.
|
@pgwhalen could you review? |
| * is closed. | ||
| * @throws RuntimeException if native registration fails. | ||
| */ | ||
| public void registerDataSource(String name, DataSource source) { |
There was a problem hiding this comment.
Since this is basically a simplified API on top of the SessionContext::register_table rust function, what if we called the java function that instead (registerTable), and made the interface it accepts TableProvider?
I get that this PR is basically barebones support for custom table registration in java, and that data_source.rs is handling a lot so the java user gets a simple scan() callback. I think only providing that for now makes sense as a first step (and will always be useful for simple cases), but I'd like to make sure this can evolve towards all the flexibility of the TableProvider trait that interacts with ExecutionPlan and ultimately an ArrowReader. The LiteralGuaranteeTest from my bindings demonstrates what this could look like and what it enables (filter pushdown).
To keep things minimal for PR, maybe we could just
- rename
registerDataSourcetoregisterTable - rename the
DataSourceinterface toTableProvider - provide a simple implementation of
TableProviderthat just holds what the currentDataSourcedoes - not sure about a name for that, but maybe likeSimpleTableProviderorFullScanTableProvideror something
Then we can make TableProvider more featured over time. Totally open to other ideas too.
Part of my motivation in renaming is that in the back of my head I'm thinking about eventual support for the separate DataSource, so don't want to clash on naming.
There was a problem hiding this comment.
Thanks @pgwhalen. I have addressed your feedback.
Address PR apache#65 review: align Java-side naming with DataFusion's Rust TableProvider trait and free up the DataSource name for the separate datafusion-datasource concept in the future. Add SimpleTableProvider as a convenience wrapper for the (schema, scan-fn) case. - DataSource -> TableProvider (Java interface) - SessionContext.registerDataSource -> registerTable - JniBridge.invokeDataSourceScan -> invokeTableScan - Native JavaDataSource struct + module renamed to JavaTableProvider / table_provider.rs; JNI entry point + signature updated accordingly - New SimpleTableProvider class wraps a Schema and a Function<BufferAllocator, ArrowReader> for the common no-pushdown case - Test, example, and user-guide docs updated to match
# Conflicts: # native/Cargo.toml
Which issue does this PR close?
Rationale for this change
Java users have no way to expose custom in-process tables (JDBC scans, in-memory
collections, custom file formats, etc.) to DataFusion. This adds a minimal
DataSourceinterface and the JNI wiring to register it on aSessionContext.The implementation mirrors the existing scalar-UDF JNI pattern.
What changes are included in this PR?
DataSourceinterface inorg.apache.datafusionwithSchema schema()andArrowReader scan(BufferAllocator).SessionContext.registerDataSource(name, source)registers a Java-backedtable; schema is captured at registration time.
JniBridge.invokeDataSourceScanexports the user'sArrowReaderthroughthe Arrow C Data Interface (zero-copy).
JavaDataSource: TableProvider+JavaScanExec: ExecutionPlaninnative/src/data_source.rs, plus the JNI entry point.jthrowable_to_stringhelper lifted intonative/src/jni_util.rsso the UDF and data-source paths share Java-exception formatting.
JdbcExamplein theexamplesmodule demonstrating an end-to-endJDBC-backed
DataSource: populates an H2 in-memory table, wraps a JDBCquery in a
JdbcDataSource, registers it, and runs an aggregation query.Streams batches via
arrow-jdbc'sArrowVectorIteratorwrapped in a smallArrowReadersubclass — no IPC re-serialisation. Addsarrow-jdbcandcom.h2database:h2asexamples-module deps.(DataFusion projects/filters on top), no
deregisterTable. Multi-partition,pushdown, and deregistration are listed as follow-ups in the user guide.
Run the JDBC example with:
Are these changes tested?
Yes — eight integration tests in
`core/src/test/java/org/apache/datafusion/DataSourceTest.java`:
The JDBC example is exercised end-to-end manually (output verified: aggregation
produces `alice → 119.99` and `bob → 7.50` over the H2 fixture). It compiles
as part of the standard `mvn package` build alongside the other example
classes (`AddOneExample`, `DataFrameExample`, etc.) — none of which carry
JUnit tests, by convention.
Are there any user-facing changes?
Yes — new public `DataSource` interface and `SessionContext.registerDataSource`
method, plus a new user-guide page at `docs/source/user-guide/data-source.md`
covering the API, contract, threading, errors, and v1 limitations. The
runnable `JdbcExample` shows the API in action against an embedded H2.