Skip to content

Commit

Permalink
🎉 Abstract level for SQL relational database sources (#4123)
Browse files Browse the repository at this point in the history
Abstract level for SQL relational database sources
  • Loading branch information
DoNotPanicUA committed Jul 5, 2021
1 parent 9517fae commit 107f5b8
Show file tree
Hide file tree
Showing 65 changed files with 1,380 additions and 925 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
* SOFTWARE.
*/

package io.airbyte.integrations.source.jdbc;
package io.airbyte.db;

import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.JsonSchemaPrimitive;
Expand All @@ -33,7 +33,7 @@ public static String getCursorField(ConfiguredAirbyteStream stream) {
if (stream.getCursorField().size() == 0) {
throw new IllegalStateException("No cursor field specified for stream attempting to do incremental.");
} else if (stream.getCursorField().size() > 1) {
throw new IllegalStateException("JdbcSource does not support nested cursor fields.");
throw new IllegalStateException("Source does not support nested cursor fields.");
} else {
return stream.getCursorField().get(0);
}
Expand Down
56 changes: 56 additions & 0 deletions airbyte-db/src/main/java/io/airbyte/db/SqlDatabase.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.db;

import com.fasterxml.jackson.databind.JsonNode;
import java.sql.SQLException;
import java.util.stream.Stream;

public abstract class SqlDatabase implements AutoCloseable {

private JsonNode sourceConfig;
private JsonNode databaseConfig;

public abstract void execute(String sql) throws SQLException;

public abstract Stream<JsonNode> query(String sql, String... params) throws SQLException;

public JsonNode getSourceConfig() {
return sourceConfig;
}

public void setSourceConfig(JsonNode sourceConfig) {
this.sourceConfig = sourceConfig;
}

public JsonNode getDatabaseConfig() {
return databaseConfig;
}

public void setDatabaseConfig(JsonNode databaseConfig) {
this.databaseConfig = databaseConfig;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.airbyte.commons.functional.CheckedFunction;
import java.io.Closeable;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
Expand All @@ -41,7 +42,7 @@
/**
* Database object for interacting with a JDBC connection. Can be used for any JDBC compliant db.
*/
public class DefaultJdbcDatabase implements JdbcDatabase {
public class DefaultJdbcDatabase extends JdbcDatabase {

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultJdbcDatabase.class);

Expand Down Expand Up @@ -86,6 +87,14 @@ public <T> Stream<T> resultSetQuery(CheckedFunction<Connection, ResultSet, SQLEx
});
}

@Override
public DatabaseMetaData getMetaData() throws SQLException {
Connection conn = connectionSupplier.getConnection();
DatabaseMetaData metaData = conn.getMetaData();
conn.close();
return metaData;
}

/**
* You CANNOT assume that data will be returned from this method before the entire {@link ResultSet}
* is buffered in memory. Review the implementation of the database's JDBC driver or use the
Expand Down
43 changes: 31 additions & 12 deletions airbyte-db/src/main/java/io/airbyte/db/jdbc/JdbcDatabase.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,12 @@

package io.airbyte.db.jdbc;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.functional.CheckedConsumer;
import io.airbyte.commons.functional.CheckedFunction;
import io.airbyte.db.SqlDatabase;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
Expand All @@ -36,21 +39,22 @@
/**
* Database object for interacting with a JDBC connection.
*/
public interface JdbcDatabase extends AutoCloseable {
public abstract class JdbcDatabase extends SqlDatabase {

/**
* Execute a database query.
*
* @param query the query to execute against the database.
* @throws SQLException SQL related exceptions.
*/
void execute(CheckedConsumer<Connection, SQLException> query) throws SQLException;
public abstract void execute(CheckedConsumer<Connection, SQLException> query) throws SQLException;

default void execute(String sql) throws SQLException {
@Override
public void execute(String sql) throws SQLException {
execute(connection -> connection.createStatement().execute(sql));
}

default void executeWithinTransaction(List<String> queries) throws SQLException {
public void executeWithinTransaction(List<String> queries) throws SQLException {
execute(connection -> {
connection.setAutoCommit(false);
for (String s : queries) {
Expand All @@ -74,8 +78,8 @@ default void executeWithinTransaction(List<String> queries) throws SQLException
* @return Result of the query mapped to a list.
* @throws SQLException SQL related exceptions.
*/
<T> List<T> bufferedResultSetQuery(CheckedFunction<Connection, ResultSet, SQLException> query,
CheckedFunction<ResultSet, T, SQLException> recordTransform)
public abstract <T> List<T> bufferedResultSetQuery(CheckedFunction<Connection, ResultSet, SQLException> query,
CheckedFunction<ResultSet, T, SQLException> recordTransform)
throws SQLException;

/**
Expand All @@ -93,8 +97,8 @@ <T> List<T> bufferedResultSetQuery(CheckedFunction<Connection, ResultSet, SQLExc
* @return Result of the query mapped to a stream.
* @throws SQLException SQL related exceptions.
*/
<T> Stream<T> resultSetQuery(CheckedFunction<Connection, ResultSet, SQLException> query,
CheckedFunction<ResultSet, T, SQLException> recordTransform)
public abstract <T> Stream<T> resultSetQuery(CheckedFunction<Connection, ResultSet, SQLException> query,
CheckedFunction<ResultSet, T, SQLException> recordTransform)
throws SQLException;

/**
Expand All @@ -109,14 +113,14 @@ <T> Stream<T> resultSetQuery(CheckedFunction<Connection, ResultSet, SQLException
* just pass the {@link ResultSet} through. it is a stateful object will not be accessible if
* returned from recordTransform.
* @param <T> type that each record will be mapped to.
* @return Result of the query mapped to a stream.
* @return Result of the query mapped to a stream.void execute(String sql)
* @throws SQLException SQL related exceptions.
*/
<T> Stream<T> query(CheckedFunction<Connection, PreparedStatement, SQLException> statementCreator,
CheckedFunction<ResultSet, T, SQLException> recordTransform)
public abstract <T> Stream<T> query(CheckedFunction<Connection, PreparedStatement, SQLException> statementCreator,
CheckedFunction<ResultSet, T, SQLException> recordTransform)
throws SQLException;

default int queryInt(String sql, String... params) throws SQLException {
public int queryInt(String sql, String... params) throws SQLException {
try (Stream<Integer> q = query(c -> {
PreparedStatement statement = c.prepareStatement(sql);
int i = 1;
Expand All @@ -131,4 +135,19 @@ default int queryInt(String sql, String... params) throws SQLException {
}
}

@Override
public Stream<JsonNode> query(String sql, String... params) throws SQLException {
return query(connection -> {
PreparedStatement statement = connection.prepareStatement(sql);
int i = 1;
for (String param : params) {
statement.setString(i, param);
++i;
}
return statement;
}, JdbcUtils::rowToJson);
}

public abstract DatabaseMetaData getMetaData() throws SQLException;

}
62 changes: 1 addition & 61 deletions airbyte-db/src/main/java/io/airbyte/db/jdbc/JdbcUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.JsonSchemaPrimitive;
import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.Date;
import java.sql.JDBCType;
import java.sql.PreparedStatement;
Expand All @@ -42,10 +41,8 @@
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.StringJoiner;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
Expand All @@ -54,7 +51,7 @@

public class JdbcUtils {

private static final DateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); // Quoted "Z" to indicate UTC, no timezone offset
public static final DateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); // Quoted "Z" to indicate UTC, no timezone offset

/**
* Map records returned in a result set.
Expand Down Expand Up @@ -261,68 +258,11 @@ private static <T> T nullIfInvalid(SQLSupplier<T> valueProducer, Function<T, Boo
}
}

/**
* Create a fully qualified table name (including schema) with db-specific quoted syntax. e.g.
* "public"."my_table"
*
* @param connection connection to jdbc database (gives access to proper quotes)
* @param schemaName name of schema, if exists (CAN BE NULL)
* @param tableName name of the table
* @return fully qualified table name, using db-specific quoted syntax
* @throws SQLException throws if fails to pull correct quote character.
*/
public static String getFullyQualifiedTableNameWithQuoting(Connection connection, String schemaName, String tableName) throws SQLException {
final String quotedTableName = enquoteIdentifier(connection, tableName);
return schemaName != null ? enquoteIdentifier(connection, schemaName) + "." + quotedTableName : quotedTableName;
}

/**
* Create a fully qualified table name (including schema). e.g. public.my_table
*
* @param schemaName name of schema, if exists (CAN BE NULL)
* @param tableName name of the table
* @return fully qualified table name
*/
public static String getFullyQualifiedTableName(String schemaName, String tableName) {
return schemaName != null ? schemaName + "." + tableName : tableName;
}

@FunctionalInterface
private interface SQLSupplier<O> {

O apply() throws SQLException;

}

/**
* Given a database connection and identifier, adds db-specific quoting.
*
* @param connection database connection
* @param identifier identifier to quote
* @return quoted identifier
* @throws SQLException throws if there are any issues fulling the quoting metadata from the db.
*/
public static String enquoteIdentifier(Connection connection, String identifier) throws SQLException {
final String identifierQuoteString = connection.getMetaData().getIdentifierQuoteString();

return identifierQuoteString + identifier + identifierQuoteString;
}

/**
* Given a database connection and identifiers, adds db-specific quoting to each identifier.
*
* @param connection database connection
* @param identifiers identifiers to quote
* @return quoted identifiers
* @throws SQLException throws if there are any issues fulling the quoting metadata from the db.
*/
public static String enquoteIdentifierList(Connection connection, List<String> identifiers) throws SQLException {
final StringJoiner joiner = new StringJoiner(",");
for (String col : identifiers) {
String s = JdbcUtils.enquoteIdentifier(connection, col);
joiner.add(s);
}
return joiner.toString();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.airbyte.commons.functional.CheckedConsumer;
import io.airbyte.commons.functional.CheckedFunction;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
Expand All @@ -39,7 +40,7 @@
* allows the developer to specify the correct configuration in order for a
* {@link PreparedStatement} to execute as in a streaming / chunked manner.
*/
public class StreamingJdbcDatabase implements JdbcDatabase {
public class StreamingJdbcDatabase extends JdbcDatabase {

private final DataSource dataSource;
private final JdbcDatabase database;
Expand All @@ -51,6 +52,11 @@ public StreamingJdbcDatabase(DataSource dataSource, JdbcDatabase database, JdbcS
this.jdbcStreamingQueryConfiguration = jdbcStreamingQueryConfiguration;
}

@Override
public DatabaseMetaData getMetaData() throws SQLException {
return database.getMetaData();
}

@Override
public void execute(CheckedConsumer<Connection, SQLException> query) throws SQLException {
database.execute(query);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@
* SOFTWARE.
*/

package io.airbyte.integrations.source.jdbc;
package io.airbyte.db;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand All @@ -36,6 +35,7 @@
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaPrimitive;
import java.util.Collections;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

class IncrementalUtilsTest {
Expand All @@ -51,12 +51,13 @@ class IncrementalUtilsTest {
void testGetCursorField() {
final ConfiguredAirbyteStream stream = Jsons.clone(STREAM);
stream.setCursorField(Lists.newArrayList(UUID_FIELD_NAME));
assertEquals(UUID_FIELD_NAME, IncrementalUtils.getCursorField(stream));
Assertions.assertEquals(UUID_FIELD_NAME, IncrementalUtils.getCursorField(stream));
}

@Test
void testGetCursorFieldNoCursorFieldSet() {
assertThrows(IllegalStateException.class, () -> assertEquals(UUID_FIELD_NAME, IncrementalUtils.getCursorField(STREAM)));
assertThrows(IllegalStateException.class, () -> Assertions
.assertEquals(UUID_FIELD_NAME, IncrementalUtils.getCursorField(STREAM)));
}

@Test
Expand All @@ -68,7 +69,7 @@ void testGetCursorFieldCompositCursor() {

@Test
void testGetCursorType() {
assertEquals(JsonSchemaPrimitive.STRING, IncrementalUtils.getCursorType(STREAM, UUID_FIELD_NAME));
Assertions.assertEquals(JsonSchemaPrimitive.STRING, IncrementalUtils.getCursorType(STREAM, UUID_FIELD_NAME));
}

@Test
Expand All @@ -93,13 +94,13 @@ void testGetCursorTypeCursorHasNoType() {
@Test
void testCompareCursors() {
assertTrue(IncrementalUtils.compareCursors("abc", "def", JsonSchemaPrimitive.STRING) < 0);
assertEquals(0, IncrementalUtils.compareCursors("abc", "abc", JsonSchemaPrimitive.STRING));
Assertions.assertEquals(0, IncrementalUtils.compareCursors("abc", "abc", JsonSchemaPrimitive.STRING));
assertTrue(IncrementalUtils.compareCursors("1", "2", JsonSchemaPrimitive.NUMBER) < 0);
assertTrue(IncrementalUtils.compareCursors("5000000000", "5000000001", JsonSchemaPrimitive.NUMBER) < 0);
assertTrue(IncrementalUtils.compareCursors("false", "true", JsonSchemaPrimitive.BOOLEAN) < 0);
assertTrue(IncrementalUtils.compareCursors(null, "def", JsonSchemaPrimitive.STRING) < 1);
assertTrue(IncrementalUtils.compareCursors("abc", null, JsonSchemaPrimitive.STRING) > 0);
assertEquals(0, IncrementalUtils.compareCursors(null, null, JsonSchemaPrimitive.STRING));
Assertions.assertEquals(0, IncrementalUtils.compareCursors(null, null, JsonSchemaPrimitive.STRING));
assertThrows(IllegalStateException.class, () -> IncrementalUtils.compareCursors("a", "a", JsonSchemaPrimitive.ARRAY));
assertThrows(IllegalStateException.class, () -> IncrementalUtils.compareCursors("a", "a", JsonSchemaPrimitive.OBJECT));
assertThrows(IllegalStateException.class, () -> IncrementalUtils.compareCursors("a", "a", JsonSchemaPrimitive.NULL));
Expand Down

0 comments on commit 107f5b8

Please sign in to comment.