Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ ADBC_JDBC_POSTGRESQL_URL=localhost:5432/postgres
ADBC_JDBC_POSTGRESQL_USER=postgres
ADBC_JDBC_POSTGRESQL_PASSWORD=password
ADBC_JDBC_POSTGRESQL_DATABASE=postgres
ADBC_MSSQL_TEST_URI="sqlserver://SA:Password1!@localhost:1433"
ADBC_POSTGRESQL_TEST_URI="postgresql://localhost:5432/postgres?user=postgres&password=password"
ADBC_SQLITE_FLIGHTSQL_URI=grpc+tcp://localhost:8080
ADBC_TEST_FLIGHTSQL_URI=grpc+tls://localhost:41414
Expand Down
30 changes: 27 additions & 3 deletions .github/workflows/java.yml
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ jobs:
cache: "maven"
distribution: "temurin"
java-version: 11
- name: Build/Test
- name: Build
run: |
set -x
pushd artifacts
Expand All @@ -228,10 +228,34 @@ jobs:
done
popd
cp -r artifacts/*/jni/adbc_driver_jni java/driver/jni/src/main/resources
env BUILD_JNI=ON ./ci/scripts/java_build.sh $(pwd)

- name: Start Dependencies
if: matrix.os == 'Linux' && matrix.arch == 'amd64'
run: |
docker compose up --detach --wait flightsql-sqlite-test mssql-test postgres-test
cat .env | grep -v -e '^#' | grep -e '^ADBC_' | awk NF | sed 's/"//g' | tee -a $GITHUB_ENV

- name: Download thirdparty driver
if: matrix.os == 'Linux' && matrix.arch == 'amd64'
run: |
wget https://dbc-cdn.columnar.tech/mssql/v1.3.1/mssql_linux_amd64_v1.3.1.tar.gz
echo "e6723cf417403f313fb75c1ac03aea9b9ff857d4a947608c8ae44eacc1aa22b3 mssql_linux_amd64_v1.3.1.tar.gz" > mssql_linux_amd64_v1.3.1.tar.gz.sha256
sha256sum -c mssql_linux_amd64_v1.3.1.tar.gz.sha256

tar xvf mssql_linux_amd64_v1.3.1.tar.gz
mkdir -p ~/.config/adbc/drivers/
mv libadbc_driver_mssql.so ~/.config/adbc/drivers/
echo "manifest_version = 1" > ~/.config/adbc/drivers/mssql.toml
echo "[Driver]" >> ~/.config/adbc/drivers/mssql.toml
echo "shared = '$HOME/.config/adbc/drivers/libadbc_driver_mssql.so'" >> ~/.config/adbc/drivers/mssql.toml
cat ~/.config/adbc/drivers/mssql.toml

- name: Test
run: |
for driver in artifacts/*/driver; do
export LD_LIBRARY_PATH=$(pwd)/$driver:${LD_LIBRARY_PATH:-}
export DYLD_LIBRARY_PATH=$(pwd)/$driver:${DYLD_LIBRARY_PATH:-}
done
env BUILD_JNI=ON ./ci/scripts/java_build.sh $(pwd)
cd java
mvn -B -Pjni test -pl :adbc-driver-jni
mvn -B -Pjni test -pl :adbc-driver-jni -pl :adbc-driver-jni-validation
1 change: 1 addition & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,7 @@ export ADBC_USE_ASAN=OFF
export ADBC_USE_UBSAN=OFF
export BUILD_ALL=OFF
export BUILD_DRIVER_MANAGER=ON
export BUILD_DRIVER_POSTGRESQL=ON
export BUILD_DRIVER_SQLITE=ON
./ci/scripts/cpp_build.sh $(pwd) $(pwd)/build $(pwd)/local

Expand Down
4 changes: 2 additions & 2 deletions go/adbc/driver/flightsql/flightsql_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -1145,15 +1145,15 @@ func (c *connectionImpl) ReadPartition(ctx context.Context, serializedPartition
var info flight.FlightInfo
if err := proto.Unmarshal(serializedPartition, &info); err != nil {
return nil, adbc.Error{
Msg: err.Error(),
Msg: fmt.Sprintf("[flightsql] could not unmarshal partition as FlightInfo: %v", err),
Code: adbc.StatusInvalidArgument,
}
}

// The driver only ever returns one endpoint.
if len(info.Endpoint) != 1 {
return nil, adbc.Error{
Msg: fmt.Sprintf("Invalid partition: expected 1 endpoint, got %d", len(info.Endpoint)),
Msg: fmt.Sprintf("[flightsql] invalid partition: expected 1 endpoint, got %d", len(info.Endpoint)),
Code: adbc.StatusInvalidArgument,
}
}
Expand Down
2 changes: 1 addition & 1 deletion go/adbc/driver/flightsql/flightsql_statement.go
Original file line number Diff line number Diff line change
Expand Up @@ -811,7 +811,7 @@ func (s *statement) ExecutePartitions(ctx context.Context) (*arrow.Schema, adbc.
data, err := proto.Marshal(partition)
if err != nil {
return sc, out, -1, adbc.Error{
Msg: err.Error(),
Msg: fmt.Sprintf("[flightsql] could not marshal partition as FlightInfo: %v", err),
Code: adbc.StatusInternal,
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public interface AdbcConnection extends AutoCloseable, AdbcOptions {
* @since ADBC API revision 1.1.0
*/
default void cancel() throws AdbcException {
throw AdbcException.notImplemented("Statement does not support cancel");
throw AdbcException.notImplemented("Connection does not support cancel");
}

/** Commit the pending transaction. */
Expand All @@ -63,6 +63,18 @@ default AdbcStatement bulkIngest(String targetTableName, BulkIngestMode mode)
"Connection does not support bulkIngest(String, BulkIngestMode)");
}

/**
* Create a new statement to bulk insert a {@link VectorSchemaRoot} into a table.
*
* <p>Bind data to the statement, then call {@link AdbcStatement#executeUpdate()}. See {@link
* BulkIngestMode} for description of behavior around creating tables.
*/
default AdbcStatement bulkIngest(
String targetTableName, BulkIngestMode mode, IngestOption... options) throws AdbcException {
throw AdbcException.notImplemented(
"Connection does not support bulkIngest(String, BulkIngestMode, IngestOption...)");
}

/**
* Create a result set from a serialized PartitionDescriptor.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.types.pojo.Schema;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
* A container for all state needed to execute a database query, such as the query itself,
Expand Down Expand Up @@ -231,19 +232,21 @@ public String toString() {

/** The partitions of a result set. */
class PartitionResult {
private final Schema schema;
private final @Nullable Schema schema;
private final long affectedRows;
private final List<PartitionDescriptor> partitionDescriptors;

public PartitionResult(
Schema schema, long affectedRows, List<PartitionDescriptor> partitionDescriptors) {
@Nullable Schema schema,
long affectedRows,
List<PartitionDescriptor> partitionDescriptors) {
this.schema = schema;
this.affectedRows = affectedRows;
this.partitionDescriptors = partitionDescriptors;
}

/** Get the schema of the eventual result set. */
public Schema getSchema() {
public @Nullable Schema getSchema() {
return schema;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.arrow.adbc.core;

import java.util.Objects;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
 * Options controlling bulk ingestion (see {@code AdbcConnection#bulkIngest(String,
 * BulkIngestMode, IngestOption...)}).
 *
 * <p>Instances are immutable value objects; the shared constants {@link #TEMPORARY} and
 * {@link #NOT_TEMPORARY} may be safely reused across threads.
 */
public interface IngestOption {
  /** Ingest into a temporary table. */
  TemporaryIngestOption TEMPORARY = new TemporaryIngestOption(true);

  /** Ingest into a regular (non-temporary) table. */
  TemporaryIngestOption NOT_TEMPORARY = new TemporaryIngestOption(false);

  /**
   * Ingest into the given catalog (database schema left unspecified).
   *
   * @param catalog the target catalog name
   */
  static IngestOption targetCatalog(String catalog) {
    return new TargetNamespaceIngestOption(catalog, null);
  }

  /**
   * Ingest into the given database schema (catalog left unspecified).
   *
   * @param dbSchema the target schema name
   */
  static IngestOption targetDbSchema(String dbSchema) {
    return new TargetNamespaceIngestOption(null, dbSchema);
  }

  /**
   * Ingest into the given catalog and/or database schema.
   *
   * @param catalog the target catalog name, or null to leave unspecified
   * @param dbSchema the target schema name, or null to leave unspecified
   */
  static IngestOption targetNamespace(@Nullable String catalog, @Nullable String dbSchema) {
    return new TargetNamespaceIngestOption(catalog, dbSchema);
  }

  /** Whether the ingestion target is a temporary table. */
  class TemporaryIngestOption implements IngestOption {
    // final: instances (including the shared TEMPORARY/NOT_TEMPORARY constants)
    // must be immutable.
    final boolean temporary;

    TemporaryIngestOption(boolean temporary) {
      this.temporary = temporary;
    }

    public boolean isTemporary() {
      return temporary;
    }

    @Override
    public boolean equals(@Nullable Object o) {
      if (this == o) return true;
      if (o == null || getClass() != o.getClass()) return false;
      TemporaryIngestOption that = (TemporaryIngestOption) o;
      return temporary == that.temporary;
    }

    @Override
    public int hashCode() {
      return Objects.hashCode(temporary);
    }
  }

  /** The catalog and/or schema the ingestion should target; either part may be null. */
  class TargetNamespaceIngestOption implements IngestOption {
    private final @Nullable String targetCatalog;
    private final @Nullable String targetDbSchema;

    public TargetNamespaceIngestOption(
        @Nullable String targetCatalog, @Nullable String targetDbSchema) {
      this.targetCatalog = targetCatalog;
      this.targetDbSchema = targetDbSchema;
    }

    /** The target catalog, or null if unspecified. */
    public @Nullable String getTargetCatalog() {
      return targetCatalog;
    }

    /** The target schema, or null if unspecified. */
    public @Nullable String getTargetDbSchema() {
      return targetDbSchema;
    }

    @Override
    public boolean equals(@Nullable Object o) {
      if (this == o) return true;
      if (o == null || getClass() != o.getClass()) return false;
      TargetNamespaceIngestOption that = (TargetNamespaceIngestOption) o;
      return Objects.equals(targetCatalog, that.targetCatalog)
          && Objects.equals(targetDbSchema, that.targetDbSchema);
    }

    @Override
    public int hashCode() {
      return Objects.hash(targetCatalog, targetDbSchema);
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
<relativePath>../../pom.xml</relativePath>
</parent>

<artifactId>adbc-driver-jni-validation-sqlite</artifactId>
<artifactId>adbc-driver-jni-validation</artifactId>
<packaging>jar</packaging>
<name>Arrow ADBC Driver JNI Validation with SQLite</name>
<description>Tests validating the JNI driver against SQLite.</description>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.adbc.driver.jni;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.HashMap;
import java.util.Map;
import org.apache.arrow.adbc.core.AdbcConnection;
import org.apache.arrow.adbc.core.AdbcDatabase;
import org.apache.arrow.adbc.core.AdbcDriver;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Integration tests running the JNI driver manager against a Flight SQL server.
 *
 * <p>All tests are skipped unless {@link #URI_ENV} is set to the server URI.
 */
public class FlightSqlIntegrationTest {
  /** Environment variable naming the Flight SQL server URI to test against. */
  public static final String URI_ENV = "ADBC_SQLITE_FLIGHTSQL_URI";

  // final: read once at class load; tests are skipped in beforeAll() when unset.
  static final String URI = System.getenv(URI_ENV);

  BufferAllocator allocator;
  JniDriver driver;
  AdbcDatabase db;
  AdbcConnection conn;

  @BeforeAll
  static void beforeAll() {
    // Skip (not fail) the whole class when the server URI is not configured.
    Assumptions.assumeFalse(
        URI == null || URI.isEmpty(),
        String.format("Must set %s to run Flight SQL integration tests", URI_ENV));
  }

  @BeforeEach
  void beforeEach() throws Exception {
    allocator = new RootAllocator();
    driver = new JniDriver(allocator);
    Map<String, Object> parameters = new HashMap<>();
    // Load the native Flight SQL driver through the JNI driver manager.
    JniDriver.PARAM_DRIVER.set(parameters, "adbc_driver_flightsql");
    AdbcDriver.PARAM_URI.set(parameters, URI);
    db = driver.open(parameters);
    conn = db.connect();
  }

  @AfterEach
  void afterEach() throws Exception {
    // Close in reverse order of creation; closing the allocator last verifies
    // no Arrow buffers leaked.
    conn.close();
    db.close();
    allocator.close();
  }

  @Test
  void simple() throws Exception {
    try (var stmt = conn.createStatement()) {
      stmt.setSqlQuery("SELECT 1 + 1 AS sum");
      try (var reader = stmt.executeQuery()) {
        assertThat(reader.getReader().loadNextBatch()).isTrue();
        assertThat(reader.getReader().getVectorSchemaRoot().getVector("sum").getObject(0))
            .isEqualTo(2L);
      }
    }
  }

  @Test
  void partitioned() throws Exception {
    try (var stmt = conn.createStatement()) {
      stmt.setSqlQuery("SELECT 1 + 1 AS sum");
      var partitions = stmt.executePartitioned();
      assertThat(partitions.getPartitionDescriptors().size()).isEqualTo(1);
      assertThat(partitions.getAffectedRows()).isEqualTo(-1);
      // The test server doesn't give a schema.
      assertThat(partitions.getSchema()).isNull();

      // Read the single partition back through the connection.
      try (var reader =
          conn.readPartition(partitions.getPartitionDescriptors().get(0).getDescriptor())) {
        assertThat(reader.loadNextBatch()).isTrue();
        assertThat(reader.getVectorSchemaRoot().getVector("sum").getObject(0)).isEqualTo(2L);
      }
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,22 @@

import org.apache.arrow.adbc.driver.testsuite.AbstractConnectionTest;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

public class JniSqliteConnectionTest extends AbstractConnectionTest {
  @BeforeAll
  static void beforeAll() {
    // Configure the shared connection test suite with SQLite-over-JNI quirks.
    quirks = new JniSqliteQuirks();
  }

  // NOTE(review): the following suite tests are overridden as no-ops and
  // disabled — presumably unsupported by the JNI SQLite driver, but the reason
  // is not recorded here. Consider @Disabled("reason") to document why.
  @Test
  @Disabled
  @Override
  protected void readOnly() {}

  @Test
  @Disabled
  @Override
  protected void isolationLevel() {}
}
Loading
Loading