diff --git a/.github/workflows/java.yml b/.github/workflows/java.yml index 541a6eb825..60e41f42b7 100644 --- a/.github/workflows/java.yml +++ b/.github/workflows/java.yml @@ -233,7 +233,7 @@ jobs: - name: Start Dependencies if: matrix.os == 'Linux' && matrix.arch == 'amd64' run: | - docker compose up --detach --wait mssql-test postgres-test + docker compose up --detach --wait flightsql-sqlite-test mssql-test postgres-test cat .env | grep -v -e '^#' | grep -e '^ADBC_' | awk NF | sed 's/"//g' | tee -a $GITHUB_ENV - name: Download thirdparty driver diff --git a/go/adbc/driver/flightsql/flightsql_connection.go b/go/adbc/driver/flightsql/flightsql_connection.go index 473d387701..4f672d298c 100644 --- a/go/adbc/driver/flightsql/flightsql_connection.go +++ b/go/adbc/driver/flightsql/flightsql_connection.go @@ -1145,7 +1145,7 @@ func (c *connectionImpl) ReadPartition(ctx context.Context, serializedPartition var info flight.FlightInfo if err := proto.Unmarshal(serializedPartition, &info); err != nil { return nil, adbc.Error{ - Msg: err.Error(), + Msg: fmt.Sprintf("[flightsql] could not unmarshal partition as FlightInfo: %v", err), Code: adbc.StatusInvalidArgument, } } @@ -1153,7 +1153,7 @@ func (c *connectionImpl) ReadPartition(ctx context.Context, serializedPartition // The driver only ever returns one endpoint. if len(info.Endpoint) != 1 { return nil, adbc.Error{ - Msg: fmt.Sprintf("Invalid partition: expected 1 endpoint, got %d", len(info.Endpoint)), + Msg: fmt.Sprintf("[flightsql] invalid partition: expected 1 endpoint, got %d", len(info.Endpoint)), Code: adbc.StatusInvalidArgument, } } diff --git a/go/adbc/driver/flightsql/flightsql_statement.go b/go/adbc/driver/flightsql/flightsql_statement.go index 85f750a0c9..eb034ec66f 100644 --- a/go/adbc/driver/flightsql/flightsql_statement.go +++ b/go/adbc/driver/flightsql/flightsql_statement.go @@ -811,7 +811,7 @@ func (s *statement) ExecutePartitions(ctx context.Context) (*arrow.Schema, adbc. 
data, err := proto.Marshal(partition) if err != nil { return sc, out, -1, adbc.Error{ - Msg: err.Error(), + Msg: fmt.Sprintf("[flightsql] could not marshal partition as FlightInfo: %v", err), Code: adbc.StatusInternal, } } } diff --git a/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcStatement.java b/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcStatement.java index a1f9e0f3b4..bbbae4470e 100644 --- a/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcStatement.java +++ b/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcStatement.java @@ -24,6 +24,7 @@ import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.ipc.ArrowReader; import org.apache.arrow.vector.types.pojo.Schema; +import org.checkerframework.checker.nullness.qual.Nullable; /** * A container for all state needed to execute a database query, such as the query itself, @@ -231,19 +232,21 @@ public String toString() { /** The partitions of a result set. */ class PartitionResult { - private final Schema schema; + private final @Nullable Schema schema; private final long affectedRows; private final List<PartitionDescriptor> partitionDescriptors; public PartitionResult( - Schema schema, long affectedRows, List<PartitionDescriptor> partitionDescriptors) { + @Nullable Schema schema, + long affectedRows, + List<PartitionDescriptor> partitionDescriptors) { this.schema = schema; this.affectedRows = affectedRows; this.partitionDescriptors = partitionDescriptors; } /** Get the schema of the eventual result set. 
*/ - public Schema getSchema() { + public @Nullable Schema getSchema() { return schema; } diff --git a/java/driver/jni-validation/src/test/java/org/apache/arrow/adbc/driver/jni/FlightSqlIntegrationTest.java b/java/driver/jni-validation/src/test/java/org/apache/arrow/adbc/driver/jni/FlightSqlIntegrationTest.java new file mode 100644 index 0000000000..b5ab1c7eee --- /dev/null +++ b/java/driver/jni-validation/src/test/java/org/apache/arrow/adbc/driver/jni/FlightSqlIntegrationTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.arrow.adbc.driver.jni; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.HashMap; +import java.util.Map; +import org.apache.arrow.adbc.core.AdbcConnection; +import org.apache.arrow.adbc.core.AdbcDatabase; +import org.apache.arrow.adbc.core.AdbcDriver; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class FlightSqlIntegrationTest { + public static final String URI_ENV = "ADBC_SQLITE_FLIGHTSQL_URI"; + static String URI = System.getenv(URI_ENV); + + BufferAllocator allocator; + JniDriver driver; + AdbcDatabase db; + AdbcConnection conn; + + @BeforeAll + static void beforeAll() { + Assumptions.assumeFalse( + URI == null || URI.isEmpty(), + String.format("Must set %s to run Flight SQL integration tests", URI_ENV)); + } + + @BeforeEach + void beforeEach() throws Exception { + allocator = new RootAllocator(); + driver = new JniDriver(allocator); + Map<String, Object> parameters = new HashMap<>(); + JniDriver.PARAM_DRIVER.set(parameters, "adbc_driver_flightsql"); + AdbcDriver.PARAM_URI.set(parameters, URI); + db = driver.open(parameters); + conn = db.connect(); + } + + @AfterEach + void afterEach() throws Exception { + conn.close(); + db.close(); + allocator.close(); + } + + @Test + void simple() throws Exception { + try (var stmt = conn.createStatement()) { + stmt.setSqlQuery("SELECT 1 + 1 AS sum"); + try (var reader = stmt.executeQuery()) { + assertThat(reader.getReader().loadNextBatch()).isTrue(); + assertThat(reader.getReader().getVectorSchemaRoot().getVector("sum").getObject(0)) + .isEqualTo(2L); + } + } + } + + @Test + void partitioned() throws Exception { + try (var stmt = conn.createStatement()) { + stmt.setSqlQuery("SELECT 1 + 1 AS sum"); + var partitions = 
stmt.executePartitioned(); + assertThat(partitions.getPartitionDescriptors().size()).isEqualTo(1); + assertThat(partitions.getAffectedRows()).isEqualTo(-1); + // The test server doesn't give a schema. + assertThat(partitions.getSchema()).isNull(); + + try (var reader = + conn.readPartition(partitions.getPartitionDescriptors().get(0).getDescriptor())) { + assertThat(reader.loadNextBatch()).isTrue(); + assertThat(reader.getVectorSchemaRoot().getVector("sum").getObject(0)).isEqualTo(2L); + } + } + } +} diff --git a/java/driver/jni/src/main/cpp/jni_wrapper.cc b/java/driver/jni/src/main/cpp/jni_wrapper.cc index 562ddf3fb2..ef650fc081 100644 --- a/java/driver/jni/src/main/cpp/jni_wrapper.cc +++ b/java/driver/jni/src/main/cpp/jni_wrapper.cc @@ -34,6 +34,19 @@ namespace { +void ThrowJavaException(JNIEnv* env, const std::string& klass, + const std::string& message) { + jclass exception_klass = env->FindClass(klass.c_str()); + assert(exception_klass != nullptr); + jmethodID exception_ctor = + env->GetMethodID(exception_klass, "<init>", "(Ljava/lang/String;)V"); + assert(exception_ctor != nullptr); + jstring message_jni = env->NewStringUTF(message.c_str()); + auto exc = static_cast<jthrowable>( + env->NewObject(exception_klass, exception_ctor, message_jni)); + env->Throw(exc); +} + /// Internal exception. Meant to be used with RaiseAdbcException and /// CHECK_ADBC_ERROR. struct AdbcException { @@ -112,19 +125,26 @@ void RaiseAdbcException(AdbcStatusCode code, const AdbcError& error) { } while (0) /// Require that a Java class exists or error. 
-jclass RequireImplClass(JNIEnv* env, std::string_view name) { - static std::string kPrefix = "org/apache/arrow/adbc/driver/jni/impl/"; - std::string full_name = kPrefix + std::string(name); - jclass klass = env->FindClass(full_name.c_str()); +jclass RequireClass(JNIEnv* env, const std::string& name) { + jclass klass = env->FindClass(name.c_str()); if (klass == nullptr) { + std::string message = "[JNI] Could not find class "; + message += name; throw AdbcException{ .code = ADBC_STATUS_INTERNAL, - .message = "[JNI] Could not find class " + full_name, + .message = std::move(message), }; } return klass; } +/// Require that a Java class exists or error. +jclass RequireImplClass(JNIEnv* env, std::string_view name) { + static std::string kPrefix = "org/apache/arrow/adbc/driver/jni/impl/"; + std::string full_name = kPrefix + std::string(name); + return RequireClass(env, full_name); +} + /// Require that a Java method exists or error. jmethodID RequireMethod(JNIEnv* env, jclass klass, std::string_view name, std::string_view signature) { @@ -381,6 +401,60 @@ Java_org_apache_arrow_adbc_driver_jni_impl_NativeAdbc_statementGetParameterSchem return nullptr; } +JNIEXPORT jobject JNICALL +Java_org_apache_arrow_adbc_driver_jni_impl_NativeAdbc_statementExecutePartitions( + JNIEnv* env, [[maybe_unused]] jclass self, jlong handle) { + struct AdbcError error = ADBC_ERROR_INIT; + auto* ptr = reinterpret_cast<struct AdbcStatement*>(static_cast<uintptr_t>(handle)); + struct ArrowSchema schema = {}; + struct AdbcPartitions partitions = {}; + int64_t rows_affected = 0; + jobject result = nullptr; + + try { + jclass native_result_class = RequireImplClass(env, "NativePartitionResult"); + jmethodID native_result_ctor = + RequireMethod(env, native_result_class, "<init>", "(JJ)V"); + jmethodID native_result_add_partition = + RequireMethod(env, native_result_class, "addPartition", "([B)V"); + + CHECK_ADBC_ERROR( + AdbcStatementExecutePartitions(ptr, &schema, &partitions, &rows_affected, &error), + error); + + result = 
env->NewObject(native_result_class, native_result_ctor, rows_affected, + static_cast<jlong>(reinterpret_cast<uintptr_t>(&schema))); + if (env->ExceptionCheck()) goto cleanupall; + + for (size_t i = 0; i < partitions.num_partitions; i++) { + size_t length = partitions.partition_lengths[i]; + jbyteArray partition = env->NewByteArray(static_cast<jsize>(length)); + env->SetByteArrayRegion(partition, 0, static_cast<jsize>(length), + reinterpret_cast<const jbyte*>(partitions.partitions[i])); + if (env->ExceptionCheck()) goto cleanupall; + env->CallObjectMethod(result, native_result_add_partition, partition); + if (env->ExceptionCheck()) goto cleanupall; + } + } catch (const AdbcException& e) { + e.ThrowJavaException(env); + } + + // We can't release schema, but we copied out the partitions + if (partitions.release != nullptr) { + partitions.release(&partitions); + } + return result; + +cleanupall: + if (schema.release != nullptr) { + schema.release(&schema); + } + if (partitions.release != nullptr) { + partitions.release(&partitions); + } + return nullptr; +} + JNIEXPORT jobject JNICALL Java_org_apache_arrow_adbc_driver_jni_impl_NativeAdbc_statementExecuteQuery( JNIEnv* env, [[maybe_unused]] jclass self, jlong handle) { @@ -978,6 +1052,95 @@ Java_org_apache_arrow_adbc_driver_jni_impl_NativeAdbc_connectionRollback( } } +JNIEXPORT jobject JNICALL +Java_org_apache_arrow_adbc_driver_jni_impl_NativeAdbc_connectionReadPartition( + JNIEnv* env, [[maybe_unused]] jclass self, jlong handle, jobject partition) { + struct AdbcError error = ADBC_ERROR_INIT; + auto* conn = reinterpret_cast<struct AdbcConnection*>(static_cast<uintptr_t>(handle)); + struct ArrowArrayStream out = {}; + size_t serialized_length = 0; + const uint8_t* serialized_partition = nullptr; + std::vector<uint8_t> allocated_partition; + + try { + jclass bb_class = RequireClass(env, "java/nio/ByteBuffer"); + jmethodID bb_remaining = RequireMethod(env, bb_class, "remaining", "()I"); + + if (!env->IsInstanceOf(partition, bb_class)) { + ThrowJavaException(env, "java/lang/IllegalArgumentException", + 
"Partition must be a ByteBuffer"); + return nullptr; + } + jint remaining = env->CallIntMethod(partition, bb_remaining); + if (remaining < 0) { + ThrowJavaException(env, "java/lang/IllegalArgumentException", + "ByteBuffer remaining() must be non-negative"); + return nullptr; + } + serialized_length = static_cast(remaining); + + // fast path (if direct buffer) + void* buf = env->GetDirectBufferAddress(partition); + if (buf) { + serialized_partition = static_cast(buf); + } + + // middle path (backing array) + if (!serialized_partition) { + jmethodID bb_has_array = RequireMethod(env, bb_class, "hasArray", "()Z"); + jmethodID bb_array = RequireMethod(env, bb_class, "array", "()[B"); + jmethodID bb_array_offset = RequireMethod(env, bb_class, "arrayOffset", "()I"); + jboolean has_array = env->CallBooleanMethod(partition, bb_has_array); + if (env->ExceptionCheck()) return nullptr; + if (has_array) { + jint array_offset = env->CallIntMethod(partition, bb_array_offset); + if (env->ExceptionCheck()) return nullptr; + + auto array = + reinterpret_cast(env->CallObjectMethod(partition, bb_array)); + if (env->ExceptionCheck()) return nullptr; + + assert(serialized_length <= static_cast(env->GetArrayLength(array))); + allocated_partition.resize(serialized_length); + env->GetByteArrayRegion(array, array_offset, + static_cast(serialized_length), + reinterpret_cast(allocated_partition.data())); + serialized_partition = allocated_partition.data(); + } + } + + // slow path (copy) + if (!serialized_partition) { + jmethodID bb_get = RequireMethod(env, bb_class, "get", "([B)Ljava/nio/ByteBuffer;"); + jbyteArray temp = env->NewByteArray(static_cast(serialized_length)); + if (!temp) { + ThrowJavaException(env, "java/lang/OutOfMemoryError", + "Failed to allocate byte array for partition"); + return nullptr; + } + + env->CallVoidMethod(partition, bb_get, temp); + if (env->ExceptionCheck()) return nullptr; + + allocated_partition.resize(serialized_length); + env->GetByteArrayRegion(temp, 0, 
static_cast<jsize>(serialized_length), + reinterpret_cast<jbyte*>(allocated_partition.data())); + serialized_partition = allocated_partition.data(); + } + + assert(serialized_partition != nullptr); + + CHECK_ADBC_ERROR(AdbcConnectionReadPartition(conn, serialized_partition, + serialized_length, &out, &error), + error); + + return MakeNativeQueryResult(env, -1, &out); + } catch (const AdbcException& e) { + e.ThrowJavaException(env); + } + return nullptr; +} + JNIEXPORT jbyteArray JNICALL Java_org_apache_arrow_adbc_driver_jni_impl_NativeAdbc_databaseGetOptionBytes( JNIEnv* env, [[maybe_unused]] jclass self, jlong handle, jstring key) { diff --git a/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/JniConnection.java b/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/JniConnection.java index 5a121ebb9e..46203c8747 100644 --- a/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/JniConnection.java +++ b/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/JniConnection.java @@ -17,6 +17,7 @@ package org.apache.arrow.adbc.driver.jni; +import java.nio.ByteBuffer; import org.apache.arrow.adbc.core.AdbcConnection; import org.apache.arrow.adbc.core.AdbcException; import org.apache.arrow.adbc.core.AdbcStatement; @@ -254,6 +255,11 @@ public void setCurrentDbSchema(String dbSchema) throws AdbcException { setOption(JniDriver.CURRENT_DB_SCHEMA, dbSchema); } + @Override + public ArrowReader readPartition(ByteBuffer descriptor) throws AdbcException { + return JniLoader.INSTANCE.connectionReadPartition(handle, descriptor).importStream(allocator); + } + @Override public void close() { handle.close(); diff --git a/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/JniStatement.java b/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/JniStatement.java index ec8fd46b20..dd4f581b25 100644 --- a/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/JniStatement.java +++ 
b/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/JniStatement.java @@ -21,6 +21,7 @@ import org.apache.arrow.adbc.core.AdbcStatement; import org.apache.arrow.adbc.core.TypedKey; import org.apache.arrow.adbc.driver.jni.impl.JniLoader; +import org.apache.arrow.adbc.driver.jni.impl.NativePartitionResult; import org.apache.arrow.adbc.driver.jni.impl.NativeQueryResult; import org.apache.arrow.adbc.driver.jni.impl.NativeStatementHandle; import org.apache.arrow.c.ArrowArray; @@ -72,6 +73,13 @@ private void exportBind() throws AdbcException { } } + @Override + public PartitionResult executePartitioned() throws AdbcException { + exportBind(); + NativePartitionResult result = JniLoader.INSTANCE.statementExecutePartitions(handle); + return result.importResult(allocator); + } + @Override public QueryResult executeQuery() throws AdbcException { exportBind(); diff --git a/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/JniLoader.java b/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/JniLoader.java index ebfc3c51ce..b081b4b0f2 100644 --- a/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/JniLoader.java +++ b/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/JniLoader.java @@ -21,6 +21,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; +import java.nio.ByteBuffer; import java.nio.file.Files; import java.nio.file.StandardCopyOption; import java.util.Map; @@ -85,6 +86,11 @@ public void statementCancel(NativeStatementHandle statement) throws AdbcExceptio NativeAdbc.statementCancel(statement.getStatementHandle()); } + public NativePartitionResult statementExecutePartitions(NativeStatementHandle statement) + throws AdbcException { + return NativeAdbc.statementExecutePartitions(statement.getStatementHandle()); + } + public NativeQueryResult statementExecuteQuery(NativeStatementHandle statement) throws AdbcException { return 
NativeAdbc.statementExecuteQuery(statement.getStatementHandle()); @@ -207,6 +213,11 @@ public void connectionRollback(NativeConnectionHandle connection) throws AdbcExc NativeAdbc.connectionRollback(connection.getConnectionHandle()); } + public NativeQueryResult connectionReadPartition( + NativeConnectionHandle connection, ByteBuffer partition) throws AdbcException { + return NativeAdbc.connectionReadPartition(connection.getConnectionHandle(), partition); + } + public byte[] connectionGetOptionBytes(NativeConnectionHandle handle, String key) throws AdbcException { return NativeAdbc.connectionGetOptionBytes(handle.getConnectionHandle(), key); diff --git a/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/NativeAdbc.java b/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/NativeAdbc.java index fb93dacc8f..68ad348a7d 100644 --- a/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/NativeAdbc.java +++ b/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/NativeAdbc.java @@ -17,6 +17,7 @@ package org.apache.arrow.adbc.driver.jni.impl; +import java.nio.ByteBuffer; import org.apache.arrow.adbc.core.AdbcException; /** All the JNI methods. Don't use this directly, prefer {@link JniLoader}. 
*/ @@ -48,6 +49,8 @@ static native NativeDatabaseHandle openDatabase(int version, String[] parameters static native void statementPrepare(long handle) throws AdbcException; + static native NativePartitionResult statementExecutePartitions(long handle) throws AdbcException; + static native NativeQueryResult statementExecuteQuery(long handle) throws AdbcException; static native NativeSchemaResult statementExecuteSchema(long handle) throws AdbcException; @@ -101,6 +104,9 @@ static native NativeSchemaResult connectionGetTableSchema( static native void connectionRollback(long handle) throws AdbcException; + static native NativeQueryResult connectionReadPartition(long handle, ByteBuffer partition) + throws AdbcException; + static native byte[] connectionGetOptionBytes(long handle, String key) throws AdbcException; static native double connectionGetOptionDouble(long handle, String key) throws AdbcException; diff --git a/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/NativePartitionResult.java b/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/NativePartitionResult.java new file mode 100644 index 0000000000..9b5526f6a2 --- /dev/null +++ b/java/driver/jni/src/main/java/org/apache/arrow/adbc/driver/jni/impl/NativePartitionResult.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.adbc.driver.jni.impl; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import org.apache.arrow.adbc.core.AdbcStatement; +import org.apache.arrow.adbc.core.PartitionDescriptor; +import org.apache.arrow.c.ArrowSchema; +import org.apache.arrow.c.Data; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.types.pojo.Schema; + +public class NativePartitionResult { + private final List<PartitionDescriptor> partitions; + private final long rowsAffected; + private final ArrowSchema.Snapshot schemaSnapshot; + + public NativePartitionResult(long rowsAffected, long cDataSchema) { + this.partitions = new ArrayList<>(); + this.rowsAffected = rowsAffected; + // Immediately snapshot the stream to take ownership of the contents. + // The address may point to a stack-allocated struct that becomes invalid + // after the JNI call returns. + this.schemaSnapshot = ArrowSchema.wrap(cDataSchema).snapshot(); + } + + /** For use by JNI code only. */ + public void addPartition(byte[] partition) { + this.partitions.add(new PartitionDescriptor(ByteBuffer.wrap(partition))); + } + + public AdbcStatement.PartitionResult importResult(BufferAllocator allocator) { + try (final ArrowSchema schemaHandle = ArrowSchema.allocateNew(allocator)) { + // It's possible the driver doesn't give us a schema. + schemaHandle.save(schemaSnapshot); + // TODO(lidavidm): work out dictionaries + Schema schema = null; + if (schemaSnapshot.release != 0) { + schema = Data.importSchema(allocator, schemaHandle, null); + } + return new AdbcStatement.PartitionResult(schema, rowsAffected, partitions); + } + } +}