Skip to content

Commit

Permalink
馃悶 Snowflake destination: use pooled connections (#10342)
Browse files Browse the repository at this point in the history
* Use data source conn supplier for snowflake database

* Format code

* Reuse the same database in integration tests

* Close query stream

* Refactor snowflake staging sql operations

* Close result set

* Add annotations

* Bump version

* Bump version in seed
  • Loading branch information
tuliren committed Feb 16, 2022
1 parent 7c93ef1 commit c27e2a0
Show file tree
Hide file tree
Showing 17 changed files with 182 additions and 210 deletions.
Expand Up @@ -185,7 +185,7 @@
- name: Snowflake
destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba
dockerRepository: airbyte/destination-snowflake
dockerImageTag: 0.4.11
dockerImageTag: 0.4.12
documentationUrl: https://docs.airbyte.io/integrations/destinations/snowflake
icon: snowflake.svg
- name: MariaDB ColumnStore
Expand Down
Expand Up @@ -3817,7 +3817,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-snowflake:0.4.11"
- dockerImage: "airbyte/destination-snowflake:0.4.12"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/snowflake"
connectionSpecification:
Expand Down
21 changes: 21 additions & 0 deletions airbyte-db/lib/src/main/java/io/airbyte/db/Databases.java
Expand Up @@ -14,6 +14,7 @@
import io.airbyte.db.jdbc.StreamingJdbcDatabase;
import io.airbyte.db.mongodb.MongoDatabase;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import lombok.val;
Expand Down Expand Up @@ -198,6 +199,10 @@ private static BasicDataSource createBasicDataSource(final String username,
Optional.empty());
}

/**
* Prefer to use the method that takes in the connection properties as a map.
*/
@Deprecated
private static BasicDataSource createBasicDataSource(final String username,
final String password,
final String jdbcConnectionString,
Expand All @@ -214,6 +219,22 @@ private static BasicDataSource createBasicDataSource(final String username,
return connectionPool;
}

public static BasicDataSource createBasicDataSource(final String username,
final String password,
final String jdbcConnectionString,
final String driverClassName,
final Map<String, String> connectionProperties) {
final BasicDataSource connectionPool = new BasicDataSource();
connectionPool.setDriverClassName(driverClassName);
connectionPool.setUsername(username);
connectionPool.setPassword(password);
connectionPool.setInitialSize(0);
connectionPool.setMaxTotal(5);
connectionPool.setUrl(jdbcConnectionString);
connectionProperties.forEach(connectionPool::addConnectionProperty);
return connectionPool;
}

public static BigQueryDatabase createBigQueryDatabase(final String projectId, final String jsonCreds) {
return new BigQueryDatabase(projectId, jsonCreds);
}
Expand Down
@@ -0,0 +1,14 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.db.jdbc;

import java.sql.Connection;
import java.sql.SQLException;

public interface CloseableConnectionSupplier extends AutoCloseable {

Connection getConnection() throws SQLException;

}
@@ -0,0 +1,38 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.db.jdbc;

import java.io.Closeable;
import java.sql.Connection;
import java.sql.SQLException;
import javax.sql.DataSource;

public class DataSourceConnectionSupplier implements CloseableConnectionSupplier {

private final DataSource dataSource;

public DataSourceConnectionSupplier(final DataSource dataSource) {
this.dataSource = dataSource;
}

@Override
public Connection getConnection() throws SQLException {
return dataSource.getConnection();
}

@Override
public void close() throws Exception {
// Just a safety in case we are using a datasource implementation that requires closing.
// BasicDataSource from apache does since it also provides a pooling mechanism to reuse connections.

if (dataSource instanceof AutoCloseable) {
((AutoCloseable) dataSource).close();
}
if (dataSource instanceof Closeable) {
((Closeable) dataSource).close();
}
}

}
Expand Up @@ -4,10 +4,10 @@

package io.airbyte.db.jdbc;

import com.google.errorprone.annotations.MustBeClosed;
import io.airbyte.commons.functional.CheckedConsumer;
import io.airbyte.commons.functional.CheckedFunction;
import io.airbyte.db.JdbcCompatibleSourceOperations;
import java.io.Closeable;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
Expand Down Expand Up @@ -42,11 +42,6 @@ public DefaultJdbcDatabase(final CloseableConnectionSupplier connectionSupplier,
this.connectionSupplier = connectionSupplier;
}

public DefaultJdbcDatabase(final CloseableConnectionSupplier connectionSupplier) {
super(JdbcUtils.getDefaultSourceOperations());
this.connectionSupplier = connectionSupplier;
}

@Override
public void execute(final CheckedConsumer<Connection, SQLException> query) throws SQLException {
try (final Connection connection = connectionSupplier.getConnection()) {
Expand All @@ -58,12 +53,14 @@ public void execute(final CheckedConsumer<Connection, SQLException> query) throw
public <T> List<T> bufferedResultSetQuery(final CheckedFunction<Connection, ResultSet, SQLException> query,
final CheckedFunction<ResultSet, T, SQLException> recordTransform)
throws SQLException {
try (final Connection connection = connectionSupplier.getConnection()) {
return toStream(query.apply(connection), recordTransform).collect(Collectors.toList());
try (final Connection connection = connectionSupplier.getConnection();
final Stream<T> results = toStream(query.apply(connection), recordTransform)) {
return results.collect(Collectors.toList());
}
}

@Override
@MustBeClosed
public <T> Stream<T> resultSetQuery(final CheckedFunction<Connection, ResultSet, SQLException> query,
final CheckedFunction<ResultSet, T, SQLException> recordTransform)
throws SQLException {
Expand Down Expand Up @@ -101,6 +98,7 @@ public DatabaseMetaData getMetaData() throws SQLException {
* @throws SQLException SQL related exceptions.
*/
@Override
@MustBeClosed
public <T> Stream<T> query(final CheckedFunction<Connection, PreparedStatement, SQLException> statementCreator,
final CheckedFunction<ResultSet, T, SQLException> recordTransform)
throws SQLException {
Expand All @@ -121,38 +119,4 @@ public void close() throws Exception {
connectionSupplier.close();
}

public interface CloseableConnectionSupplier extends AutoCloseable {

Connection getConnection() throws SQLException;

}

public static final class DataSourceConnectionSupplier implements CloseableConnectionSupplier {

private final DataSource dataSource;

public DataSourceConnectionSupplier(final DataSource dataSource) {
this.dataSource = dataSource;
}

@Override
public Connection getConnection() throws SQLException {
return dataSource.getConnection();
}

@Override
public void close() throws Exception {
// Just a safety in case we are using a datasource implementation that requires closing.
// BasicDataSource from apache does since it also provides a pooling mechanism to reuse connections.

if (dataSource instanceof AutoCloseable) {
((AutoCloseable) dataSource).close();
}
if (dataSource instanceof Closeable) {
((Closeable) dataSource).close();
}
}

}

}
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.db.jdbc;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.errorprone.annotations.MustBeClosed;
import io.airbyte.commons.functional.CheckedConsumer;
import io.airbyte.commons.functional.CheckedFunction;
import io.airbyte.db.JdbcCompatibleSourceOperations;
Expand Down Expand Up @@ -65,13 +66,15 @@ public void executeWithinTransaction(final List<String> queries) throws SQLExcep
* @param <T> type that each record will be mapped to
* @return stream of records that the result set is mapped to.
*/
public static <T> Stream<T> toStream(final ResultSet resultSet, final CheckedFunction<ResultSet, T, SQLException> mapper) {
@MustBeClosed
protected static <T> Stream<T> toStream(final ResultSet resultSet, final CheckedFunction<ResultSet, T, SQLException> mapper) {
return StreamSupport.stream(new Spliterators.AbstractSpliterator<>(Long.MAX_VALUE, Spliterator.ORDERED) {

@Override
public boolean tryAdvance(final Consumer<? super T> action) {
try {
if (!resultSet.next()) {
resultSet.close();
return false;
}
action.accept(mapper.apply(resultSet));
Expand Down Expand Up @@ -116,6 +119,7 @@ public abstract <T> List<T> bufferedResultSetQuery(CheckedFunction<Connection, R
* @return Result of the query mapped to a stream.
* @throws SQLException SQL related exceptions.
*/
@MustBeClosed
public abstract <T> Stream<T> resultSetQuery(CheckedFunction<Connection, ResultSet, SQLException> query,
CheckedFunction<ResultSet, T, SQLException> recordTransform)
throws SQLException;
Expand All @@ -135,6 +139,7 @@ public abstract <T> Stream<T> resultSetQuery(CheckedFunction<Connection, ResultS
* @return Result of the query mapped to a stream.void execute(String sql)
* @throws SQLException SQL related exceptions.
*/
@MustBeClosed
public abstract <T> Stream<T> query(CheckedFunction<Connection, PreparedStatement, SQLException> statementCreator,
CheckedFunction<ResultSet, T, SQLException> recordTransform)
throws SQLException;
Expand All @@ -154,6 +159,7 @@ public int queryInt(final String sql, final String... params) throws SQLExceptio
}
}

@MustBeClosed
@Override
public Stream<JsonNode> query(final String sql, final String... params) throws SQLException {
return query(connection -> {
Expand Down
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.db.jdbc;

import com.google.errorprone.annotations.MustBeClosed;
import io.airbyte.commons.functional.CheckedConsumer;
import io.airbyte.commons.functional.CheckedFunction;
import io.airbyte.db.JdbcCompatibleSourceOperations;
Expand Down Expand Up @@ -61,6 +62,7 @@ public <T> List<T> bufferedResultSetQuery(final CheckedFunction<Connection, Resu
}

@Override
@MustBeClosed
public <T> Stream<T> resultSetQuery(final CheckedFunction<Connection, ResultSet, SQLException> query,
final CheckedFunction<ResultSet, T, SQLException> recordTransform)
throws SQLException {
Expand All @@ -84,6 +86,7 @@ public <T> Stream<T> resultSetQuery(final CheckedFunction<Connection, ResultSet,
* @throws SQLException SQL related exceptions.
*/
@Override
@MustBeClosed
public <T> Stream<T> query(final CheckedFunction<Connection, PreparedStatement, SQLException> statementCreator,
final CheckedFunction<ResultSet, T, SQLException> recordTransform)
throws SQLException {
Expand Down
Expand Up @@ -19,6 +19,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -34,31 +35,33 @@ public class TestDefaultJdbcDatabase {

private static PostgreSQLContainer<?> PSQL_DB;

private JsonNode config;
private JdbcDatabase database;
private final JdbcSourceOperations sourceOperations = JdbcUtils.getDefaultSourceOperations();

@BeforeAll
static void init() {
PSQL_DB = new PostgreSQLContainer<>("postgres:13-alpine");
PSQL_DB.start();

}

@BeforeEach
void setup() throws Exception {
final String dbName = Strings.addRandomSuffix("db", "_", 10);

config = getConfig(PSQL_DB, dbName);

final JsonNode config = getConfig(PSQL_DB, dbName);
final String initScriptName = "init_" + dbName.concat(".sql");
final String tmpFilePath = IOs.writeFileToRandomTmpDir(initScriptName, "CREATE DATABASE " + dbName + ";");
PostgreSQLContainerHelper.runSqlScript(MountableFile.forHostPath(tmpFilePath), PSQL_DB);

final JdbcDatabase database = getDatabaseFromConfig(config);
database = getDatabaseFromConfig(config);
database.execute(connection -> {
connection.createStatement().execute("CREATE TABLE id_and_name(id INTEGER, name VARCHAR(200));");
connection.createStatement().execute("INSERT INTO id_and_name (id, name) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash');");
});
}

@AfterEach
void close() throws Exception {
database.close();
}

Expand All @@ -69,7 +72,7 @@ static void cleanUp() {

@Test
void testBufferedResultQuery() throws SQLException {
final List<JsonNode> actual = getDatabaseFromConfig(config).bufferedResultSetQuery(
final List<JsonNode> actual = database.bufferedResultSetQuery(
connection -> connection.createStatement().executeQuery("SELECT * FROM id_and_name;"),
sourceOperations::rowToJson);

Expand All @@ -78,7 +81,7 @@ void testBufferedResultQuery() throws SQLException {

@Test
void testResultSetQuery() throws SQLException {
final Stream<JsonNode> actual = getDatabaseFromConfig(config).resultSetQuery(
final Stream<JsonNode> actual = database.resultSetQuery(
connection -> connection.createStatement().executeQuery("SELECT * FROM id_and_name;"),
sourceOperations::rowToJson);
final List<JsonNode> actualAsList = actual.collect(Collectors.toList());
Expand All @@ -89,7 +92,7 @@ void testResultSetQuery() throws SQLException {

@Test
void testQuery() throws SQLException {
final Stream<JsonNode> actual = getDatabaseFromConfig(config).query(
final Stream<JsonNode> actual = database.query(
connection -> connection.prepareStatement("SELECT * FROM id_and_name;"),
sourceOperations::rowToJson);

Expand Down
Expand Up @@ -45,15 +45,11 @@ protected JdbcSqlOperations(final DataAdapter dataAdapter) {
public void createSchemaIfNotExists(final JdbcDatabase database, final String schemaName) throws Exception {
if (!isSchemaExists(database, schemaName)) {
AirbyteSentry.executeWithTracing("CreateSchema",
() -> database.execute(createSchemaQuery(schemaName)),
() -> database.execute(String.format("CREATE SCHEMA IF NOT EXISTS %s;", schemaName)),
Map.of("schema", schemaName));
}
}

private String createSchemaQuery(final String schemaName) {
return String.format("CREATE SCHEMA IF NOT EXISTS %s;\n", schemaName);
}

@Override
public void createTableIfNotExists(final JdbcDatabase database, final String schemaName, final String tableName) throws SQLException {
AirbyteSentry.executeWithTracing("CreateTableIfNotExists",
Expand Down
Expand Up @@ -18,8 +18,8 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

ENV APPLICATION_VERSION 0.4.11
ENV APPLICATION_VERSION 0.4.12
ENV ENABLE_SENTRY true

LABEL io.airbyte.version=0.4.11
LABEL io.airbyte.version=0.4.12
LABEL io.airbyte.name=airbyte/destination-snowflake

0 comments on commit c27e2a0

Please sign in to comment.