Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions core/src/main/java/org/apache/datafusion/ArrowReadOptions.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.datafusion;

import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.datafusion.protobuf.ArrowReadOptionsProto;

/**
 * Configuration knobs for Arrow IPC sources passed to {@link SessionContext#registerArrow(String,
 * String, ArrowReadOptions)} and {@link SessionContext#readArrow(String, ArrowReadOptions)}.
 *
 * <p>Mirrors the subset of DataFusion's {@code ArrowReadOptions} that maps onto the Java surface
 * today: {@code fileExtension} (default {@code ".arrow"}) and an explicit Arrow {@code schema} that
 * bypasses on-read schema inference. {@code tablePartitionCols} is intentionally deferred --
 * neither Parquet nor CSV expose Hive-style partitioning on the Java side yet.
 *
 * <p>Arrow IPC files carry their own body compression (LZ4_FRAME / ZSTD per-buffer) inside the file
 * format itself, so unlike CSV / NDJSON there is no {@code FileCompressionType} setter.
 *
 * <p>This class is mutable and not thread-safe; configure it fully before handing it to the
 * session context.
 */
public final class ArrowReadOptions {

  // Suffix used to select files when a directory is registered; mirrors DataFusion's default.
  private String fileExtension = ".arrow";
  // Optional explicit schema; null means "infer on read". Held by reference only -- it is
  // IPC-serialized by SessionContext, never embedded in the protobuf options message.
  private Schema schema;

  /**
   * Sets the file extension used to select files (default {@code ".arrow"}).
   *
   * @param ext non-null extension, e.g. {@code ".ipc"}
   * @return this instance, for chaining
   * @throws IllegalArgumentException if {@code ext} is {@code null} -- fail fast here rather than
   *     with an opaque NullPointerException inside the protobuf builder in {@link #toBytes()}
   */
  public ArrowReadOptions fileExtension(String ext) {
    if (ext == null) {
      throw new IllegalArgumentException("fileExtension must be non-null");
    }
    this.fileExtension = ext;
    return this;
  }

  /**
   * Sets an explicit Arrow schema, bypassing on-read schema inference.
   *
   * @param schema the schema to use, or {@code null} to fall back to inference
   * @return this instance, for chaining
   */
  public ArrowReadOptions schema(Schema schema) {
    this.schema = schema;
    return this;
  }

  /** Serializes the protobuf-mapped options (currently just the file extension). */
  byte[] toBytes() {
    return ArrowReadOptionsProto.newBuilder().setFileExtension(fileExtension).build().toByteArray();
  }

  /** Returns the explicit schema, or {@code null} when inference should be used. */
  Schema schema() {
    return schema;
  }
}
69 changes: 69 additions & 0 deletions core/src/main/java/org/apache/datafusion/SessionContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,69 @@ public DataFrame readParquet(String path, ParquetReadOptions options) {
return new DataFrame(dfHandle);
}

/**
 * Register an Arrow IPC file (or directory of Arrow IPC files) as a table using default
 * {@link ArrowReadOptions}: {@code ".arrow"} extension, schema inferred on read.
 * Delegates to {@link #registerArrow(String, String, ArrowReadOptions)}.
 */
public void registerArrow(String name, String path) {
registerArrow(name, path, new ArrowReadOptions());
}

/**
 * Register an Arrow IPC file (or directory of Arrow IPC files) as a table with the supplied
 * {@link ArrowReadOptions}.
 *
 * @throws IllegalArgumentException if any of {@code name}, {@code path}, or {@code options} is
 *     {@code null}.
 * @throws IllegalStateException if this context has already been closed.
 * @throws RuntimeException if registration fails (path not found, schema mismatch, etc.).
 */
public void registerArrow(String name, String path, ArrowReadOptions options) {
  // Validation order is part of the observable contract: closed-context first, then each
  // argument in declaration order.
  if (nativeHandle == 0) {
    throw new IllegalStateException("SessionContext is closed");
  }
  if (name == null) {
    throw new IllegalArgumentException("registerArrow name must be non-null");
  }
  if (path == null) {
    throw new IllegalArgumentException("registerArrow path must be non-null");
  }
  if (options == null) {
    throw new IllegalArgumentException("registerArrow options must be non-null");
  }
  // Serialize the optional explicit schema up front; null tells the native side to infer it.
  Schema explicitSchema = options.schema();
  byte[] schemaIpcBytes = explicitSchema == null ? null : serializeSchemaIpc(explicitSchema);
  registerArrowWithOptions(nativeHandle, name, path, options.toBytes(), schemaIpcBytes);
}

/** Read an Arrow IPC file as a {@link DataFrame} without registering it. */
// Uses default ArrowReadOptions: ".arrow" extension, schema inferred on read.
// Delegates to readArrow(String, ArrowReadOptions).
public DataFrame readArrow(String path) {
return readArrow(path, new ArrowReadOptions());
}

/**
 * Read an Arrow IPC file as a {@link DataFrame} with the supplied {@link ArrowReadOptions}.
 *
 * @throws IllegalArgumentException if {@code path} or {@code options} is {@code null}.
 * @throws IllegalStateException if this context has already been closed.
 * @throws RuntimeException if the read fails.
 */
public DataFrame readArrow(String path, ArrowReadOptions options) {
  // Validation order is part of the observable contract: closed-context first, then arguments.
  if (nativeHandle == 0) {
    throw new IllegalStateException("SessionContext is closed");
  }
  if (path == null) {
    throw new IllegalArgumentException("readArrow path must be non-null");
  }
  if (options == null) {
    throw new IllegalArgumentException("readArrow options must be non-null");
  }
  // Serialize the optional explicit schema; null lets the native side infer it from the file.
  Schema explicitSchema = options.schema();
  byte[] schemaIpcBytes = explicitSchema == null ? null : serializeSchemaIpc(explicitSchema);
  return new DataFrame(readArrowWithOptions(nativeHandle, path, options.toBytes(), schemaIpcBytes));
}

private static byte[] serializeSchemaIpc(Schema schema) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (BufferAllocator allocator = new RootAllocator();
Expand Down Expand Up @@ -277,5 +340,11 @@ private static native void registerCsvWithOptions(
private static native long readCsvWithOptions(
long handle, String path, byte[] optionsBytes, byte[] schemaIpcBytes);

// JNI bridge: registers an Arrow IPC path as a named table on the native session context.
// optionsBytes is a serialized ArrowReadOptionsProto; schemaIpcBytes is an IPC-serialized
// Schema, or null to let the native side infer the schema on read.
private static native void registerArrowWithOptions(
long handle, String name, String path, byte[] optionsBytes, byte[] schemaIpcBytes);

// JNI bridge: reads an Arrow IPC path and returns a native DataFrame handle.
// Same optionsBytes / schemaIpcBytes contract as registerArrowWithOptions above.
private static native long readArrowWithOptions(
long handle, String path, byte[] optionsBytes, byte[] schemaIpcBytes);

private static native void closeSessionContext(long handle);
}
58 changes: 58 additions & 0 deletions core/src/test/java/org/apache/datafusion/ArrowReadOptionsTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.datafusion;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;

import java.util.List;

import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.datafusion.protobuf.ArrowReadOptionsProto;
import org.junit.jupiter.api.Test;

import com.google.protobuf.InvalidProtocolBufferException;

/** Unit tests for {@link ArrowReadOptions} proto serialization and schema handling. */
class ArrowReadOptionsTest {

  /** The default file extension must survive a round trip through the proto encoding. */
  @Test
  void defaultsRoundTripThroughProto() throws InvalidProtocolBufferException {
    byte[] encoded = new ArrowReadOptions().toBytes();
    ArrowReadOptionsProto decoded = ArrowReadOptionsProto.parseFrom(encoded);
    assertEquals(".arrow", decoded.getFileExtension());
  }

  /** A custom file extension must survive a round trip through the proto encoding. */
  @Test
  void fileExtensionRoundTripsThroughProto() throws InvalidProtocolBufferException {
    byte[] encoded = new ArrowReadOptions().fileExtension(".ipc").toBytes();
    ArrowReadOptionsProto decoded = ArrowReadOptionsProto.parseFrom(encoded);
    assertEquals(".ipc", decoded.getFileExtension());
  }

  /** The schema is kept as a plain Java reference, never serialized into the proto message. */
  @Test
  void schemaIsHeldByReferenceAndNotInProto() {
    Field intField = new Field("x", FieldType.nullable(new ArrowType.Int(32, true)), null);
    Schema schema = new Schema(List.of(intField));
    ArrowReadOptions opts = new ArrowReadOptions().schema(schema);
    assertSame(schema, opts.schema());
  }
}
192 changes: 192 additions & 0 deletions core/src/test/java/org/apache/datafusion/SessionContextArrowTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.datafusion;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowFileWriter;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/**
 * End-to-end tests for registerArrow / readArrow: writes real Arrow IPC files with arrow-vector,
 * then queries them through the native DataFusion session context.
 */
class SessionContextArrowTest {

/**
 * Write three rows of {@code (id INT, name UTF8)} as a single Arrow IPC file using arrow-vector's
 * built-in file writer. Returns the path the test can hand to {@code registerArrow} / {@code
 * readArrow}.
 */
private static Path writePeopleArrow(Path dir, String name) throws IOException {
Schema schema =
new Schema(
List.of(
new Field("id", FieldType.notNullable(new ArrowType.Int(32, true)), null),
new Field("name", FieldType.notNullable(new ArrowType.Utf8()), null)));
Path file = dir.resolve(name);
// Allocator and root are closed after the writer finishes; writer close order matters, so the
// FileChannel + ArrowFileWriter get their own nested try-with-resources.
try (BufferAllocator allocator = new RootAllocator();
VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
IntVector id = (IntVector) root.getVector("id");
VarCharVector nameVec = (VarCharVector) root.getVector("name");
id.allocateNew(3);
nameVec.allocateNew(3);
id.set(0, 1);
id.set(1, 2);
id.set(2, 3);
// NOTE(review): getBytes() uses the platform default charset -- consider
// StandardCharsets.UTF_8 for determinism; values here are ASCII so behavior is unaffected.
nameVec.setSafe(0, "alice".getBytes());
nameVec.setSafe(1, "bob".getBytes());
nameVec.setSafe(2, "carol".getBytes());
root.setRowCount(3);

try (FileChannel ch =
FileChannel.open(file, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
ArrowFileWriter writer = new ArrowFileWriter(root, null, ch)) {
writer.start();
writer.writeBatch();
writer.end();
}
}
return file;
}

// Registration with default options: schema is inferred from the IPC file, and both an
// aggregate and a filtered projection return correct results.
@Test
void registerArrowInfersSchemaAndCounts(@TempDir Path tempDir) throws Exception {
Path file = writePeopleArrow(tempDir, "people.arrow");

try (BufferAllocator allocator = new RootAllocator();
SessionContext ctx = new SessionContext()) {
ctx.registerArrow("people", file.toAbsolutePath().toString());

try (DataFrame df = ctx.sql("SELECT COUNT(*) FROM people");
ArrowReader reader = df.collect(allocator)) {
assertTrue(reader.loadNextBatch());
BigIntVector count = (BigIntVector) reader.getVectorSchemaRoot().getVector(0);
assertEquals(3L, count.get(0));
}

try (DataFrame df = ctx.sql("SELECT name FROM people WHERE id = 2");
ArrowReader reader = df.collect(allocator)) {
assertTrue(reader.loadNextBatch());
VectorSchemaRoot root = reader.getVectorSchemaRoot();
assertEquals(1, root.getRowCount());
VarCharVector names = (VarCharVector) root.getVector(0);
assertEquals("bob", new String(names.get(0)));
}
}
}

// readArrow without registration: total row count across all batches matches what was written.
@Test
void readArrowYieldsTheStoredRows(@TempDir Path tempDir) throws Exception {
Path file = writePeopleArrow(tempDir, "people.arrow");

try (BufferAllocator allocator = new RootAllocator();
SessionContext ctx = new SessionContext();
DataFrame df = ctx.readArrow(file.toAbsolutePath().toString());
ArrowReader reader = df.collect(allocator)) {
long total = 0;
while (reader.loadNextBatch()) {
total += reader.getVectorSchemaRoot().getRowCount();
}
assertEquals(3L, total);
}
}

// A non-default ".ipc" extension is honored when passed through ArrowReadOptions.
@Test
void registerArrowWithCustomExtension(@TempDir Path tempDir) throws Exception {
Path file = writePeopleArrow(tempDir, "people.ipc");

try (BufferAllocator allocator = new RootAllocator();
SessionContext ctx = new SessionContext()) {
ctx.registerArrow(
"t", file.toAbsolutePath().toString(), new ArrowReadOptions().fileExtension(".ipc"));

try (DataFrame df = ctx.sql("SELECT SUM(id) FROM t");
ArrowReader reader = df.collect(allocator)) {
assertTrue(reader.loadNextBatch());
BigIntVector sum = (BigIntVector) reader.getVectorSchemaRoot().getVector(0);
assertEquals(6L, sum.get(0));
}
}
}

@Test
void readArrowWithExplicitSchemaIsAccepted(@TempDir Path tempDir) throws Exception {
// Explicit schema overrides on-read inference. We supply the same schema the
// file actually has, so query results stay correct; the test pins that the
// explicit-schema code path is plumbed through and accepted.
Path file = writePeopleArrow(tempDir, "people.arrow");
Schema schema =
new Schema(
List.of(
new Field("id", FieldType.notNullable(new ArrowType.Int(32, true)), null),
new Field("name", FieldType.notNullable(new ArrowType.Utf8()), null)));

try (BufferAllocator allocator = new RootAllocator();
SessionContext ctx = new SessionContext();
DataFrame df =
ctx.readArrow(file.toAbsolutePath().toString(), new ArrowReadOptions().schema(schema));
ArrowReader reader = df.collect(allocator)) {
assertTrue(reader.loadNextBatch());
VectorSchemaRoot root = reader.getVectorSchemaRoot();
assertEquals(3, root.getRowCount());
assertEquals("id", root.getSchema().getFields().get(0).getName());
assertEquals("name", root.getSchema().getFields().get(1).getName());
}
}

// Null arguments must fail fast with IllegalArgumentException on every overload.
@Test
void registerArrowRejectsNullArguments() {
try (SessionContext ctx = new SessionContext()) {
ArrowReadOptions opts = new ArrowReadOptions();
assertThrows(IllegalArgumentException.class, () -> ctx.registerArrow(null, "/p"));
assertThrows(IllegalArgumentException.class, () -> ctx.registerArrow("t", null));
assertThrows(IllegalArgumentException.class, () -> ctx.registerArrow(null, "/p", opts));
assertThrows(IllegalArgumentException.class, () -> ctx.registerArrow("t", null, opts));
assertThrows(IllegalArgumentException.class, () -> ctx.registerArrow("t", "/p", null));
}
}

// Null arguments must fail fast with IllegalArgumentException on both readArrow overloads.
@Test
void readArrowRejectsNullArguments() {
try (SessionContext ctx = new SessionContext()) {
ArrowReadOptions opts = new ArrowReadOptions();
assertThrows(IllegalArgumentException.class, () -> ctx.readArrow(null));
assertThrows(IllegalArgumentException.class, () -> ctx.readArrow(null, opts));
assertThrows(IllegalArgumentException.class, () -> ctx.readArrow("/p", null));
}
}
}
Loading