diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/JdbcDatabase.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/JdbcDatabase.kt index 9f4ca36ca76d6a..384e85feac3587 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/JdbcDatabase.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/JdbcDatabase.kt @@ -15,6 +15,8 @@ import java.util.function.Consumer import java.util.function.Function import java.util.stream.Stream import java.util.stream.StreamSupport +import org.slf4j.Logger +import org.slf4j.LoggerFactory /** Database object for interacting with a JDBC connection. */ abstract class JdbcDatabase(protected val sourceOperations: JdbcCompatibleSourceOperations<*>?) : @@ -211,6 +213,7 @@ abstract class JdbcDatabase(protected val sourceOperations: JdbcCompatibleSource abstract fun executeMetadataQuery(query: Function): T companion object { + private val LOGGER: Logger = LoggerFactory.getLogger(JdbcDatabase::class.java) /** * Map records returned in a result set. It is an "unsafe" stream because the stream must be * manually closed. Otherwise, there will be a database connection leak. 
diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt index 353fc73e2ecba4..3e642da540ed72 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt @@ -400,6 +400,29 @@ abstract class JdbcDestinationHandler( return actualColumns == intendedColumns } + protected open fun getDeleteStatesSql( + destinationStates: Map + ): String { + return dslContext + .deleteFrom(table(quotedName(rawTableSchemaName, DESTINATION_STATE_TABLE_NAME))) + .where( + destinationStates.keys + .stream() + .map { streamId: StreamId -> + field(quotedName(DESTINATION_STATE_TABLE_COLUMN_NAME)) + .eq(streamId.originalName) + .and( + field(quotedName(DESTINATION_STATE_TABLE_COLUMN_NAMESPACE)) + .eq(streamId.originalNamespace) + ) + } + .reduce(DSL.falseCondition()) { obj: Condition, arg2: Condition? 
-> + obj.or(arg2) + } + ) + .getSQL(ParamType.INLINED) + } + @Throws(Exception::class) override fun commitDestinationStates(destinationStates: Map) { try { @@ -408,25 +431,7 @@ abstract class JdbcDestinationHandler( } // Delete all state records where the stream name+namespace match one of our states - val deleteStates = - dslContext - .deleteFrom(table(quotedName(rawTableSchemaName, DESTINATION_STATE_TABLE_NAME))) - .where( - destinationStates.keys - .stream() - .map { streamId: StreamId -> - field(quotedName(DESTINATION_STATE_TABLE_COLUMN_NAME)) - .eq(streamId.originalName) - .and( - field(quotedName(DESTINATION_STATE_TABLE_COLUMN_NAMESPACE)) - .eq(streamId.originalNamespace) - ) - } - .reduce(DSL.falseCondition()) { obj: Condition, arg2: Condition? -> - obj.or(arg2) - } - ) - .getSQL(ParamType.INLINED) + var deleteStates = getDeleteStatesSql(destinationStates) // Reinsert all of our states var insertStatesStep = @@ -461,12 +466,17 @@ abstract class JdbcDestinationHandler( } val insertStates = insertStatesStep.getSQL(ParamType.INLINED) - jdbcDatabase.executeWithinTransaction(listOf(deleteStates, insertStates)) + executeWithinTransaction(listOf(deleteStates, insertStates)) } catch (e: Exception) { LOGGER.warn("Failed to commit destination states", e) } } + @Throws(Exception::class) + protected open fun executeWithinTransaction(statements: List) { + jdbcDatabase.executeWithinTransaction(statements) + } + /** * Convert to the TYPE_NAME retrieved from [java.sql.DatabaseMetaData.getColumns] * diff --git a/airbyte-integrations/connectors/destination-snowflake/gradle.properties b/airbyte-integrations/connectors/destination-snowflake/gradle.properties index 56b208601bfcb0..2fa5a4a844ef2c 100644 --- a/airbyte-integrations/connectors/destination-snowflake/gradle.properties +++ b/airbyte-integrations/connectors/destination-snowflake/gradle.properties @@ -1,4 +1,4 @@ # currently limit the number of parallel threads until further investigation into the issues \ # where 
Snowflake will fail to login using config credentials -testExecutionConcurrency=1 +testExecutionConcurrency=-1 JunitMethodExecutionTimeout=15 m diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationRunner.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationRunner.java index 9d6460dcb6683a..88b52e62ba2b4a 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationRunner.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationRunner.java @@ -7,12 +7,22 @@ import static io.airbyte.integrations.destination.snowflake.SnowflakeDestination.SCHEDULED_EXECUTOR_SERVICE; import io.airbyte.cdk.integrations.base.AirbyteExceptionHandler; +import io.airbyte.cdk.integrations.base.IntegrationRunner; import io.airbyte.cdk.integrations.base.adaptive.AdaptiveDestinationRunner; +import net.snowflake.client.core.SFStatement; import net.snowflake.client.jdbc.SnowflakeSQLException; public class SnowflakeDestinationRunner { public static void main(final String[] args) throws Exception { + IntegrationRunner.addOrphanedThreadFilter((Thread t) -> { + for (StackTraceElement stackTraceElement : IntegrationRunner.getThreadCreationStack(t).getStack()) { + if (stackTraceElement.getClassName().equals(SFStatement.class.getCanonicalName()) && stackTraceElement.getMethodName().equals("close")) { + return false; + } + } + return true; + }); AirbyteExceptionHandler.addThrowableForDeinterpolation(SnowflakeSQLException.class); AdaptiveDestinationRunner.baseOnEnv() .withOssDestination(() -> new SnowflakeDestination(OssCloudEnvVarConsts.AIRBYTE_OSS)) diff --git 
a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSourceOperations.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSourceOperations.java index f716fd6a758817..cd549d90a9eba0 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSourceOperations.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSourceOperations.java @@ -67,4 +67,10 @@ protected void putTimestampWithTimezone(final ObjectNode node, final String colu node.put(columnName, timestampWithOffset.format(DataTypeUtils.TIMESTAMPTZ_FORMATTER)); } + protected void putTimestamp(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException { + // for backward compatibility + var instant = resultSet.getTimestamp(index).toInstant(); + node.put(columnName, DataTypeUtils.toISO8601StringWithMicroseconds(instant)); + } + } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java index 24388f88655d63..7db614b51716de 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java @@ -42,6 +42,7 @@ import java.util.UUID; import java.util.stream.Collectors; import 
net.snowflake.client.jdbc.SnowflakeSQLException; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.text.StringSubstitutor; import org.jooq.SQLDialect; import org.slf4j.Logger; @@ -355,4 +356,30 @@ private String toJdbcTypeName(final AirbyteProtocolType airbyteProtocolType) { }; } + protected String getDeleteStatesSql(Map destinationStates) { + // only issuing the DELETE when there are rows to delete allows us to avoid taking a lock on the table + // when there's nothing to delete + // This is particularly relevant in the context of tests, where many instances of the Snowflake + // destination could be run in parallel + String deleteStatesSql = super.getDeleteStatesSql(destinationStates); + StringBuilder sql = new StringBuilder(); + // sql.append("BEGIN\n"); + sql.append(" IF (EXISTS (").append(deleteStatesSql.replace("delete from", "SELECT 1 FROM ")).append(")) THEN\n"); + sql.append(" ").append(deleteStatesSql).append(";\n"); + sql.append(" END IF\n"); + // sql.append("END;\n"); + return sql.toString(); + } + + protected void executeWithinTransaction(List statements) throws SQLException { + StringBuilder sb = new StringBuilder(); + sb.append("BEGIN\n"); + sb.append(" BEGIN TRANSACTION;\n "); + sb.append(StringUtils.join(statements, ";\n ")); + sb.append(";\n COMMIT;\n"); + sb.append("END;"); + LOGGER.info("executing SQL:" + sb); + getJdbcDatabase().execute(sb.toString()); + } + }