diff --git a/core/src/main/java/org/apache/datafusion/ArrowReadOptions.java b/core/src/main/java/org/apache/datafusion/ArrowReadOptions.java new file mode 100644 index 0000000..c48da4b --- /dev/null +++ b/core/src/main/java/org/apache/datafusion/ArrowReadOptions.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.datafusion; + +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.datafusion.protobuf.ArrowReadOptionsProto; + +/** + * Configuration knobs for Arrow IPC sources passed to {@link SessionContext#registerArrow(String, + * String, ArrowReadOptions)} and {@link SessionContext#readArrow(String, ArrowReadOptions)}. + * + *

<p>Mirrors the subset of DataFusion's {@code ArrowReadOptions} that maps onto the Java surface + * today: {@code fileExtension} (default {@code ".arrow"}) and an explicit Arrow {@code schema} that + * bypasses on-read schema inference. {@code tablePartitionCols} is intentionally deferred -- + * neither Parquet nor CSV expose Hive-style partitioning on the Java side yet. + * + *

<p>Arrow IPC files carry their own body compression (LZ4_FRAME / ZSTD per-buffer) inside the file + * format itself, so unlike CSV / NDJSON there is no {@code FileCompressionType} setter. + */ +public final class ArrowReadOptions { + + private String fileExtension = ".arrow"; + private Schema schema; + + public ArrowReadOptions fileExtension(String ext) { + this.fileExtension = ext; + return this; + } + + public ArrowReadOptions schema(Schema schema) { + this.schema = schema; + return this; + } + + byte[] toBytes() { + return ArrowReadOptionsProto.newBuilder().setFileExtension(fileExtension).build().toByteArray(); + } + + Schema schema() { + return schema; + } +} diff --git a/core/src/main/java/org/apache/datafusion/SessionContext.java b/core/src/main/java/org/apache/datafusion/SessionContext.java index 1b2a075..6dbeaa5 100644 --- a/core/src/main/java/org/apache/datafusion/SessionContext.java +++ b/core/src/main/java/org/apache/datafusion/SessionContext.java @@ -232,6 +232,69 @@ public DataFrame readParquet(String path, ParquetReadOptions options) { return new DataFrame(dfHandle); } + public void registerArrow(String name, String path) { + registerArrow(name, path, new ArrowReadOptions()); + } + + /** + * Register an Arrow IPC file (or directory of Arrow IPC files) as a table with the supplied + * {@link ArrowReadOptions}. + * + * @throws IllegalArgumentException if any of {@code name}, {@code path}, or {@code options} is + * {@code null}. + * @throws RuntimeException if registration fails (path not found, schema mismatch, etc.).
+ */ + public void registerArrow(String name, String path, ArrowReadOptions options) { + if (nativeHandle == 0) { + throw new IllegalStateException("SessionContext is closed"); + } + if (name == null) { + throw new IllegalArgumentException("registerArrow name must be non-null"); + } + if (path == null) { + throw new IllegalArgumentException("registerArrow path must be non-null"); + } + if (options == null) { + throw new IllegalArgumentException("registerArrow options must be non-null"); + } + registerArrowWithOptions( + nativeHandle, + name, + path, + options.toBytes(), + options.schema() != null ? serializeSchemaIpc(options.schema()) : null); + } + + /** Read an Arrow IPC file as a {@link DataFrame} without registering it. */ + public DataFrame readArrow(String path) { + return readArrow(path, new ArrowReadOptions()); + } + + /** + * Read an Arrow IPC file as a {@link DataFrame} with the supplied {@link ArrowReadOptions}. + * + * @throws IllegalArgumentException if {@code path} or {@code options} is {@code null}. + * @throws RuntimeException if the read fails. + */ + public DataFrame readArrow(String path, ArrowReadOptions options) { + if (nativeHandle == 0) { + throw new IllegalStateException("SessionContext is closed"); + } + if (path == null) { + throw new IllegalArgumentException("readArrow path must be non-null"); + } + if (options == null) { + throw new IllegalArgumentException("readArrow options must be non-null"); + } + long dfHandle = + readArrowWithOptions( + nativeHandle, + path, + options.toBytes(), + options.schema() != null ? 
serializeSchemaIpc(options.schema()) : null); + return new DataFrame(dfHandle); + } + private static byte[] serializeSchemaIpc(Schema schema) { ByteArrayOutputStream baos = new ByteArrayOutputStream(); try (BufferAllocator allocator = new RootAllocator(); @@ -277,5 +340,11 @@ private static native void registerCsvWithOptions( private static native long readCsvWithOptions( long handle, String path, byte[] optionsBytes, byte[] schemaIpcBytes); + private static native void registerArrowWithOptions( + long handle, String name, String path, byte[] optionsBytes, byte[] schemaIpcBytes); + + private static native long readArrowWithOptions( + long handle, String path, byte[] optionsBytes, byte[] schemaIpcBytes); + private static native void closeSessionContext(long handle); } diff --git a/core/src/test/java/org/apache/datafusion/ArrowReadOptionsTest.java b/core/src/test/java/org/apache/datafusion/ArrowReadOptionsTest.java new file mode 100644 index 0000000..db84334 --- /dev/null +++ b/core/src/test/java/org/apache/datafusion/ArrowReadOptionsTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.datafusion; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertSame; + +import java.util.List; + +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.datafusion.protobuf.ArrowReadOptionsProto; +import org.junit.jupiter.api.Test; + +import com.google.protobuf.InvalidProtocolBufferException; + +class ArrowReadOptionsTest { + + @Test + void defaultsRoundTripThroughProto() throws InvalidProtocolBufferException { + ArrowReadOptionsProto p = ArrowReadOptionsProto.parseFrom(new ArrowReadOptions().toBytes()); + assertEquals(".arrow", p.getFileExtension()); + } + + @Test + void fileExtensionRoundTripsThroughProto() throws InvalidProtocolBufferException { + ArrowReadOptionsProto p = + ArrowReadOptionsProto.parseFrom(new ArrowReadOptions().fileExtension(".ipc").toBytes()); + assertEquals(".ipc", p.getFileExtension()); + } + + @Test + void schemaIsHeldByReferenceAndNotInProto() { + Schema schema = + new Schema(List.of(new Field("x", FieldType.nullable(new ArrowType.Int(32, true)), null))); + ArrowReadOptions opts = new ArrowReadOptions().schema(schema); + assertSame(schema, opts.schema()); + } +} diff --git a/core/src/test/java/org/apache/datafusion/SessionContextArrowTest.java b/core/src/test/java/org/apache/datafusion/SessionContextArrowTest.java new file mode 100644 index 0000000..b0efc07 --- /dev/null +++ b/core/src/test/java/org/apache/datafusion/SessionContextArrowTest.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.datafusion; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.IOException; +import java.nio.channels.FileChannel; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.List; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowFileWriter; +import org.apache.arrow.vector.ipc.ArrowReader; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.types.pojo.Schema; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +class SessionContextArrowTest { + + /** + * Write three rows of {@code (id INT, name UTF8)} as a single Arrow IPC file using arrow-vector's + * built-in file writer. Returns the path the test can hand to {@code registerArrow} / {@code + * readArrow}. 
+ */ + private static Path writePeopleArrow(Path dir, String name) throws IOException { + Schema schema = + new Schema( + List.of( + new Field("id", FieldType.notNullable(new ArrowType.Int(32, true)), null), + new Field("name", FieldType.notNullable(new ArrowType.Utf8()), null))); + Path file = dir.resolve(name); + try (BufferAllocator allocator = new RootAllocator(); + VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) { + IntVector id = (IntVector) root.getVector("id"); + VarCharVector nameVec = (VarCharVector) root.getVector("name"); + id.allocateNew(3); + nameVec.allocateNew(3); + id.set(0, 1); + id.set(1, 2); + id.set(2, 3); + nameVec.setSafe(0, "alice".getBytes()); + nameVec.setSafe(1, "bob".getBytes()); + nameVec.setSafe(2, "carol".getBytes()); + root.setRowCount(3); + + try (FileChannel ch = + FileChannel.open(file, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE); + ArrowFileWriter writer = new ArrowFileWriter(root, null, ch)) { + writer.start(); + writer.writeBatch(); + writer.end(); + } + } + return file; + } + + @Test + void registerArrowInfersSchemaAndCounts(@TempDir Path tempDir) throws Exception { + Path file = writePeopleArrow(tempDir, "people.arrow"); + + try (BufferAllocator allocator = new RootAllocator(); + SessionContext ctx = new SessionContext()) { + ctx.registerArrow("people", file.toAbsolutePath().toString()); + + try (DataFrame df = ctx.sql("SELECT COUNT(*) FROM people"); + ArrowReader reader = df.collect(allocator)) { + assertTrue(reader.loadNextBatch()); + BigIntVector count = (BigIntVector) reader.getVectorSchemaRoot().getVector(0); + assertEquals(3L, count.get(0)); + } + + try (DataFrame df = ctx.sql("SELECT name FROM people WHERE id = 2"); + ArrowReader reader = df.collect(allocator)) { + assertTrue(reader.loadNextBatch()); + VectorSchemaRoot root = reader.getVectorSchemaRoot(); + assertEquals(1, root.getRowCount()); + VarCharVector names = (VarCharVector) root.getVector(0); + assertEquals("bob", new 
String(names.get(0))); + } + } + } + + @Test + void readArrowYieldsTheStoredRows(@TempDir Path tempDir) throws Exception { + Path file = writePeopleArrow(tempDir, "people.arrow"); + + try (BufferAllocator allocator = new RootAllocator(); + SessionContext ctx = new SessionContext(); + DataFrame df = ctx.readArrow(file.toAbsolutePath().toString()); + ArrowReader reader = df.collect(allocator)) { + long total = 0; + while (reader.loadNextBatch()) { + total += reader.getVectorSchemaRoot().getRowCount(); + } + assertEquals(3L, total); + } + } + + @Test + void registerArrowWithCustomExtension(@TempDir Path tempDir) throws Exception { + Path file = writePeopleArrow(tempDir, "people.ipc"); + + try (BufferAllocator allocator = new RootAllocator(); + SessionContext ctx = new SessionContext()) { + ctx.registerArrow( + "t", file.toAbsolutePath().toString(), new ArrowReadOptions().fileExtension(".ipc")); + + try (DataFrame df = ctx.sql("SELECT SUM(id) FROM t"); + ArrowReader reader = df.collect(allocator)) { + assertTrue(reader.loadNextBatch()); + BigIntVector sum = (BigIntVector) reader.getVectorSchemaRoot().getVector(0); + assertEquals(6L, sum.get(0)); + } + } + } + + @Test + void readArrowWithExplicitSchemaIsAccepted(@TempDir Path tempDir) throws Exception { + // Explicit schema overrides on-read inference. We supply the same schema the + // file actually has, so query results stay correct; the test pins that the + // explicit-schema code path is plumbed through and accepted. 
+ Path file = writePeopleArrow(tempDir, "people.arrow"); + Schema schema = + new Schema( + List.of( + new Field("id", FieldType.notNullable(new ArrowType.Int(32, true)), null), + new Field("name", FieldType.notNullable(new ArrowType.Utf8()), null))); + + try (BufferAllocator allocator = new RootAllocator(); + SessionContext ctx = new SessionContext(); + DataFrame df = + ctx.readArrow(file.toAbsolutePath().toString(), new ArrowReadOptions().schema(schema)); + ArrowReader reader = df.collect(allocator)) { + assertTrue(reader.loadNextBatch()); + VectorSchemaRoot root = reader.getVectorSchemaRoot(); + assertEquals(3, root.getRowCount()); + assertEquals("id", root.getSchema().getFields().get(0).getName()); + assertEquals("name", root.getSchema().getFields().get(1).getName()); + } + } + + @Test + void registerArrowRejectsNullArguments() { + try (SessionContext ctx = new SessionContext()) { + ArrowReadOptions opts = new ArrowReadOptions(); + assertThrows(IllegalArgumentException.class, () -> ctx.registerArrow(null, "/p")); + assertThrows(IllegalArgumentException.class, () -> ctx.registerArrow("t", null)); + assertThrows(IllegalArgumentException.class, () -> ctx.registerArrow(null, "/p", opts)); + assertThrows(IllegalArgumentException.class, () -> ctx.registerArrow("t", null, opts)); + assertThrows(IllegalArgumentException.class, () -> ctx.registerArrow("t", "/p", null)); + } + } + + @Test + void readArrowRejectsNullArguments() { + try (SessionContext ctx = new SessionContext()) { + ArrowReadOptions opts = new ArrowReadOptions(); + assertThrows(IllegalArgumentException.class, () -> ctx.readArrow(null)); + assertThrows(IllegalArgumentException.class, () -> ctx.readArrow(null, opts)); + assertThrows(IllegalArgumentException.class, () -> ctx.readArrow("/p", null)); + } + } +} diff --git a/native/build.rs b/native/build.rs index 5a27cb0..fab28ce 100644 --- a/native/build.rs +++ b/native/build.rs @@ -18,6 +18,7 @@ fn main() { const PROTOS: &[&str] = &[ 
"../proto/session_options.proto", + "../proto/arrow_read_options.proto", "../proto/csv_read_options.proto", "../proto/parquet_read_options.proto", ]; diff --git a/native/src/arrow.rs b/native/src/arrow.rs new file mode 100644 index 0000000..2bbe7b0 --- /dev/null +++ b/native/src/arrow.rs @@ -0,0 +1,106 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use datafusion::error::DataFusionError; +use datafusion::execution::options::ArrowReadOptions; +use datafusion::prelude::SessionContext; +use jni::objects::{JByteArray, JClass, JString}; +use jni::sys::jlong; +use jni::JNIEnv; +use prost::Message; + +use crate::errors::{try_unwrap_or_throw, JniResult}; +use crate::proto_gen::ArrowReadOptionsProto; +use crate::runtime; +use crate::schema::decode_optional_schema; + +fn with_arrow_options<R>( + env: &mut JNIEnv, + options_bytes: JByteArray, + schema_ipc_bytes: JByteArray, + f: impl FnOnce(ArrowReadOptions) -> JniResult<R>, +) -> JniResult<R> { + let bytes: Vec<u8> = env.convert_byte_array(&options_bytes)?; + let p = ArrowReadOptionsProto::decode(bytes.as_slice())?; + + let schema = decode_optional_schema(env, schema_ipc_bytes)?; + + // ArrowReadOptions exposes `file_extension` as a public field (not a builder + // setter); `schema` is the only field with a fluent setter. Build via + // struct-update syntax to avoid clippy::field_reassign_with_default.
+ let file_ext = p.file_extension; + let mut opts = ArrowReadOptions { + file_extension: &file_ext, + ..ArrowReadOptions::default() + }; + if let Some(ref s) = schema { + opts = opts.schema(s); + } + + f(opts) +} + +#[no_mangle] +pub extern "system" fn Java_org_apache_datafusion_SessionContext_registerArrowWithOptions< + 'local, +>( + mut env: JNIEnv<'local>, + _class: JClass<'local>, + handle: jlong, + name: JString<'local>, + path: JString<'local>, + options_bytes: JByteArray<'local>, + schema_ipc_bytes: JByteArray<'local>, +) { + try_unwrap_or_throw(&mut env, (), |env| -> JniResult<()> { + if handle == 0 { + return Err("SessionContext handle is null".into()); + } + let ctx = unsafe { &*(handle as *const SessionContext) }; + let name: String = env.get_string(&name)?.into(); + let path: String = env.get_string(&path)?.into(); + with_arrow_options(env, options_bytes, schema_ipc_bytes, |opts| { + runtime().block_on(async { + ctx.register_arrow(&name, &path, opts).await?; + Ok::<(), DataFusionError>(()) + })?; + Ok(()) + }) + }) +} + +#[no_mangle] +pub extern "system" fn Java_org_apache_datafusion_SessionContext_readArrowWithOptions<'local>( + mut env: JNIEnv<'local>, + _class: JClass<'local>, + handle: jlong, + path: JString<'local>, + options_bytes: JByteArray<'local>, + schema_ipc_bytes: JByteArray<'local>, +) -> jlong { + try_unwrap_or_throw(&mut env, 0, |env| -> JniResult<jlong> { + if handle == 0 { + return Err("SessionContext handle is null".into()); + } + let ctx = unsafe { &*(handle as *const SessionContext) }; + let path: String = env.get_string(&path)?.into(); + with_arrow_options(env, options_bytes, schema_ipc_bytes, |opts| { + let df = runtime().block_on(ctx.read_arrow(path, opts))?; + Ok(Box::into_raw(Box::new(df)) as jlong) + }) + }) +} diff --git a/native/src/lib.rs b/native/src/lib.rs index 9041819..f8850d8 100644 --- a/native/src/lib.rs +++ b/native/src/lib.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the
License. +mod arrow; mod csv; mod errors; mod proto; diff --git a/proto/arrow_read_options.proto b/proto/arrow_read_options.proto new file mode 100644 index 0000000..4be8cc3 --- /dev/null +++ b/proto/arrow_read_options.proto @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +syntax = "proto3"; + +package datafusion_java; + +option java_package = "org.apache.datafusion.protobuf"; +option java_multiple_files = true; + +// Options used to read Arrow IPC files. `file_extension` has a non-null Java +// default and is always sent. The explicit Arrow schema, if present, is +// transferred separately as Arrow IPC bytes through the JNI layer (mirroring +// the parquet, csv, and ndjson paths) and is not encoded in this message. +// +// Arrow IPC files carry their own body compression (LZ4_FRAME / ZSTD, +// per-buffer rather than per-file), so unlike CSV/JSON there is no +// `FileCompressionType` field here. +message ArrowReadOptionsProto { + string file_extension = 1; +}