From ac8e4ba837f2d8a2e0e3c445b3447be0fbcbdecc Mon Sep 17 00:00:00 2001 From: Stephane Geneix Date: Tue, 30 Apr 2024 08:44:32 -0700 Subject: [PATCH] minor changes for destination-snowflake --- .../io/airbyte/cdk/db/jdbc/JdbcDatabase.kt | 3 + .../integrations/base/IntegrationRunner.kt | 26 ++++++--- .../LoggingInvocationInterceptor.kt | 55 ++++++++++++------- .../io/airbyte/cdk/extensions/TestContext.kt | 11 ++++ .../typing_deduping/JdbcDestinationHandler.kt | 50 ++++++++++------- .../destination/DestinationAcceptanceTest.kt | 28 ++++++---- .../internal/DefaultAirbyteDestination.kt | 6 +- .../typing_deduping/BaseTypingDedupingTest.kt | 28 ++++++---- .../bases/base-java/javabase.sh | 2 +- .../destination-snowflake/build.gradle | 2 +- .../destination-snowflake/gradle.properties | 2 +- .../SnowflakeDestinationHandler.java | 27 +++++++++ 12 files changed, 165 insertions(+), 75 deletions(-) create mode 100644 airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/extensions/TestContext.kt diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/JdbcDatabase.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/JdbcDatabase.kt index 9f4ca36ca76d6..384e85feac358 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/JdbcDatabase.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/JdbcDatabase.kt @@ -15,6 +15,8 @@ import java.util.function.Consumer import java.util.function.Function import java.util.stream.Stream import java.util.stream.StreamSupport +import org.slf4j.Logger +import org.slf4j.LoggerFactory /** Database object for interacting with a JDBC connection. */ abstract class JdbcDatabase(protected val sourceOperations: JdbcCompatibleSourceOperations<*>?) : @@ -211,6 +213,7 @@ abstract class JdbcDatabase(protected val sourceOperations: JdbcCompatibleSource abstract fun executeMetadataQuery(query: Function): T companion object { + private val LOGGER: Logger = LoggerFactory.getLogger(JdbcDatabase::class.java) /** * Map records returned in a result set. It is an "unsafe" stream because the stream must be * manually closed. Otherwise, there will be a database connection leak. diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunner.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunner.kt index 888d53475737b..923b0a5bf4f4d 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunner.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunner.kt @@ -84,6 +84,7 @@ internal constructor( (destination != null) xor (source != null), "can only pass in a destination or a source" ) + threadCreationStack.set("main") this.cliParser = cliParser this.outputRecordCollector = outputRecordCollector // integration iface covers the commands that are the same for both source and destination. @@ -199,13 +200,9 @@ internal constructor( val catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog::class.java)!! - try { - destination!! - .getSerializedMessageConsumer(config, catalog, outputRecordCollector) - .use { consumer -> consumeWriteStream(consumer!!) } - } finally { - stopOrphanedThreads() - } + destination!! + .getSerializedMessageConsumer(config, catalog, outputRecordCollector) + .use { consumer -> consumeWriteStream(consumer!!) } } } } catch (e: Exception) { @@ -242,6 +239,8 @@ internal constructor( return } throw e + } finally { + stopOrphanedThreads() } LOGGER.info("Completed integration: {}", integration.javaClass.name) @@ -341,6 +340,12 @@ internal constructor( companion object { private val LOGGER: Logger = LoggerFactory.getLogger(IntegrationRunner::class.java) + private val threadCreationStack: InheritableThreadLocal = + object : InheritableThreadLocal() { + override fun childValue(parentValue: String): String { + return Thread.currentThread().stackTrace.joinToString("\n at ") + } + } const val TYPE_AND_DEDUPE_THREAD_NAME: String = "type-and-dedupe" @@ -449,8 +454,13 @@ internal constructor( .daemon(true) .build() ) + val getMethod = ThreadLocal::class.java.getDeclaredMethod("get", Thread::class.java) + getMethod.isAccessible = true for (runningThread in runningThreads) { - val str = "Active non-daemon thread: " + dumpThread(runningThread) + val str = + "Active non-daemon thread: " + + dumpThread(runningThread) + + "\ncreationStack=${getMethod.invoke(threadCreationStack, runningThread)}" LOGGER.warn(str) // even though the main thread is already shutting down, we still leave some // chances to the children diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/extensions/LoggingInvocationInterceptor.kt b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/extensions/LoggingInvocationInterceptor.kt index 0c393ad9d013c..bdcc2380b445c 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/extensions/LoggingInvocationInterceptor.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/extensions/LoggingInvocationInterceptor.kt @@ -46,18 +46,18 @@ class LoggingInvocationInterceptor : InvocationInterceptor { ExtensionContext::class.java ) == null ) { - LOGGER!!.error( + LOGGER.error( "Junit LoggingInvocationInterceptor executing unknown interception point {}", method.name ) - return method.invoke(proxy, *(args!!)) + return method.invoke(proxy, *(args)) } - val invocation = args!![0] as InvocationInterceptor.Invocation<*>? - val invocationContext = args[1] as ReflectiveInvocationContext<*>? + val invocation = args[0] as InvocationInterceptor.Invocation<*>? + val invocationContext = args[1] as ReflectiveInvocationContext<*> val extensionContext = args[2] as ExtensionContext? val methodName = method.name - val logLineSuffix: String? - val methodMatcher = methodPattern!!.matcher(methodName) + val logLineSuffix: String + val methodMatcher = methodPattern.matcher(methodName) if (methodName == "interceptDynamicTest") { logLineSuffix = "execution of DynamicTest %s".formatted(extensionContext!!.displayName) @@ -66,12 +66,19 @@ class LoggingInvocationInterceptor : InvocationInterceptor { "instance creation for %s".formatted(invocationContext!!.targetClass) } else if (methodMatcher.matches()) { val interceptedEvent = methodMatcher.group(1) + val methodRealClassName = invocationContext!!.executable!!.declaringClass.simpleName + val methodName = invocationContext.executable!!.name + val targetClassName = invocationContext!!.targetClass.simpleName + val methodDisplayName = + if (targetClassName == methodRealClassName) methodName + else "$methodName($methodRealClassName)" logLineSuffix = "execution of @%s method %s.%s".formatted( interceptedEvent, - invocationContext!!.executable!!.declaringClass.simpleName, - invocationContext.executable!!.name + targetClassName, + methodDisplayName ) + TestContext.CURRENT_TEST_NAME.set("$targetClassName.$methodName") } else { logLineSuffix = "execution of unknown intercepted call %s".formatted(methodName) } @@ -81,15 +88,16 @@ class LoggingInvocationInterceptor : InvocationInterceptor { try { val timeout = getTimeout(invocationContext) if (timeout != null) { - LOGGER!!.info( + LOGGER.info( "Junit starting {} with timeout of {}", logLineSuffix, DurationFormatUtils.formatDurationWords(timeout.toMillis(), true, true) ) - Timer("TimeoutTimer-" + currentThread.name, true) - .schedule(timeoutTask, timeout.toMillis()) + var timer = + Timer("TimeoutTimer-" + currentThread.name, true) + .schedule(timeoutTask, timeout.toMillis()) } else { - LOGGER!!.warn("Junit starting {} with no timeout", logLineSuffix) + LOGGER.warn("Junit starting {} with no timeout", logLineSuffix) } val retVal = invocation!!.proceed() val elapsedMs = Duration.between(start, Instant.now()).toMillis() @@ -136,7 +144,7 @@ class LoggingInvocationInterceptor : InvocationInterceptor { } } val stackTrace = StringUtils.join(stackToDisplay, "\n ") - LOGGER!!.error( + LOGGER.error( "Junit exception throw during {} after {}:\n{}", logLineSuffix, DurationFormatUtils.formatDurationWords(elapsedMs, true, true), @@ -145,24 +153,29 @@ class LoggingInvocationInterceptor : InvocationInterceptor { throw t1 } finally { timeoutTask.cancel() + TestContext.CURRENT_TEST_NAME.set(null) } } - private class TimeoutInteruptor(private val parentThread: Thread?) : TimerTask() { + private class TimeoutInteruptor(private val parentThread: Thread) : TimerTask() { @Volatile var wasTriggered: Boolean = false override fun run() { + LOGGER.info( + "interrupting running task on ${parentThread.name}. Current Stacktrace is ${parentThread.stackTrace.asList()}" + ) wasTriggered = true - parentThread!!.interrupt() + parentThread.interrupt() } override fun cancel(): Boolean { + LOGGER.info("cancelling timer task on ${parentThread.name}") return super.cancel() } } companion object { - private val methodPattern: Pattern? = Pattern.compile("intercept(.*)Method") + private val methodPattern: Pattern = Pattern.compile("intercept(.*)Method") private val PATTERN: Pattern = Pattern.compile( @@ -201,11 +214,11 @@ class LoggingInvocationInterceptor : InvocationInterceptor { ) } - private fun getTimeout(invocationContext: ReflectiveInvocationContext<*>?): Duration? { + private fun getTimeout(invocationContext: ReflectiveInvocationContext<*>): Duration { var timeout: Duration? = null - var m = invocationContext!!.executable + var m = invocationContext.executable if (m is Method) { - var timeoutAnnotation: Timeout? = m.getAnnotation(Timeout::class.java) + var timeoutAnnotation: Timeout? = m.getAnnotation(Timeout::class.java) if (timeoutAnnotation == null) { timeoutAnnotation = invocationContext.targetClass.getAnnotation(Timeout::class.java) @@ -328,9 +341,9 @@ class LoggingInvocationInterceptor : InvocationInterceptor { } companion object { - private val LOGGER: Logger? = + private val LOGGER: Logger = LoggerFactory.getLogger(LoggingInvocationInterceptor::class.java) - private val JUNIT_METHOD_EXECUTION_TIMEOUT_PROPERTY_NAME: String? = + private val JUNIT_METHOD_EXECUTION_TIMEOUT_PROPERTY_NAME: String = "JunitMethodExecutionTimeout" } } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/extensions/TestContext.kt b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/extensions/TestContext.kt new file mode 100644 index 0000000000000..bf5ff596852e6 --- /dev/null +++ b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/extensions/TestContext.kt @@ -0,0 +1,11 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.extensions + +class TestContext { + companion object { + val CURRENT_TEST_NAME: ThreadLocal = ThreadLocal() + } +} diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt index 353fc73e2ecba..3e642da540ed7 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt @@ -400,6 +400,29 @@ abstract class JdbcDestinationHandler( return actualColumns == intendedColumns } + protected open fun getDeleteStatesSql( + destinationStates: Map + ): String { + return dslContext + .deleteFrom(table(quotedName(rawTableSchemaName, DESTINATION_STATE_TABLE_NAME))) + .where( + destinationStates.keys + .stream() + .map { streamId: StreamId -> + field(quotedName(DESTINATION_STATE_TABLE_COLUMN_NAME)) + .eq(streamId.originalName) + .and( + field(quotedName(DESTINATION_STATE_TABLE_COLUMN_NAMESPACE)) + .eq(streamId.originalNamespace) + ) + } + .reduce(DSL.falseCondition()) { obj: Condition, arg2: Condition? -> + obj.or(arg2) + } + ) + .getSQL(ParamType.INLINED) + } + @Throws(Exception::class) override fun commitDestinationStates(destinationStates: Map) { try { @@ -408,25 +431,7 @@ abstract class JdbcDestinationHandler( } // Delete all state records where the stream name+namespace match one of our states - val deleteStates = - dslContext - .deleteFrom(table(quotedName(rawTableSchemaName, DESTINATION_STATE_TABLE_NAME))) - .where( - destinationStates.keys - .stream() - .map { streamId: StreamId -> - field(quotedName(DESTINATION_STATE_TABLE_COLUMN_NAME)) - .eq(streamId.originalName) - .and( - field(quotedName(DESTINATION_STATE_TABLE_COLUMN_NAMESPACE)) - .eq(streamId.originalNamespace) - ) - } - .reduce(DSL.falseCondition()) { obj: Condition, arg2: Condition? -> - obj.or(arg2) - } - ) - .getSQL(ParamType.INLINED) + var deleteStates = getDeleteStatesSql(destinationStates) // Reinsert all of our states var insertStatesStep = @@ -461,12 +466,17 @@ abstract class JdbcDestinationHandler( } val insertStates = insertStatesStep.getSQL(ParamType.INLINED) - jdbcDatabase.executeWithinTransaction(listOf(deleteStates, insertStates)) + executeWithinTransaction(listOf(deleteStates, insertStates)) } catch (e: Exception) { LOGGER.warn("Failed to commit destination states", e) } } + @Throws(Exception::class) + protected open fun executeWithinTransaction(statements: List) { + jdbcDatabase.executeWithinTransaction(statements) + } + /** * Convert to the TYPE_NAME retrieved from [java.sql.DatabaseMetaData.getColumns] * diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt index a33c4b673db9c..c13e4aaf1c84b 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt @@ -8,6 +8,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode import com.google.common.collect.ImmutableMap import com.google.common.collect.Lists import com.google.common.collect.Sets +import io.airbyte.cdk.extensions.TestContext import io.airbyte.cdk.integrations.destination.NamingConventionTransformer import io.airbyte.cdk.integrations.standardtest.destination.* import io.airbyte.cdk.integrations.standardtest.destination.argproviders.DataArgumentsProvider @@ -47,6 +48,7 @@ import io.airbyte.workers.helper.ConnectorConfigUpdater import io.airbyte.workers.helper.EntrypointEnvChecker import io.airbyte.workers.internal.AirbyteDestination import io.airbyte.workers.internal.DefaultAirbyteDestination +import io.airbyte.workers.internal.DefaultAirbyteStreamFactory import io.airbyte.workers.normalization.DefaultNormalizationRunner import io.airbyte.workers.normalization.NormalizationRunner import io.airbyte.workers.process.AirbyteIntegrationLauncher @@ -1573,16 +1575,22 @@ abstract class DestinationAcceptanceTest { protected val destination: AirbyteDestination get() = DefaultAirbyteDestination( - AirbyteIntegrationLauncher( - JOB_ID, - JOB_ATTEMPT, - imageName, - processFactory, - null, - null, - false, - EnvVariableFeatureFlags() - ) + integrationLauncher = + AirbyteIntegrationLauncher( + JOB_ID, + JOB_ATTEMPT, + imageName, + processFactory, + null, + null, + false, + EnvVariableFeatureFlags() + ), + streamFactory = + DefaultAirbyteStreamFactory( + DefaultAirbyteDestination.createContainerLogMdcBuilder() + .setLogPrefix("destination-(${TestContext.CURRENT_TEST_NAME.get()})") + ) ) @Throws(Exception::class) diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/internal/DefaultAirbyteDestination.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/internal/DefaultAirbyteDestination.kt index b3809f0231bdb..913f212460da4 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/internal/DefaultAirbyteDestination.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/internal/DefaultAirbyteDestination.kt @@ -39,7 +39,7 @@ class DefaultAirbyteDestination constructor( private val integrationLauncher: IntegrationLauncher, private val streamFactory: AirbyteStreamFactory = - DefaultAirbyteStreamFactory(CONTAINER_LOG_MDC_BUILDER), + DefaultAirbyteStreamFactory(createContainerLogMdcBuilder()), private val messageWriterFactory: AirbyteMessageBufferedWriterFactory = DefaultAirbyteMessageBufferedWriterFactory(), private val protocolSerializer: ProtocolSerializer = DefaultProtocolSerializer() @@ -87,7 +87,7 @@ constructor( destinationProcess!!.errorStream, { msg: String? -> LOGGER.error(msg) }, "airbyte-destination", - CONTAINER_LOG_MDC_BUILDER + createContainerLogMdcBuilder() ) writer = @@ -179,7 +179,7 @@ constructor( companion object { private val LOGGER: Logger = LoggerFactory.getLogger(DefaultAirbyteDestination::class.java) - val CONTAINER_LOG_MDC_BUILDER: MdcScope.Builder = + fun createContainerLogMdcBuilder(): MdcScope.Builder = MdcScope.Builder() .setLogPrefix("destination") .setPrefixColor(LoggingHelper.Color.YELLOW_BACKGROUND) diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt index fe1657ecb6a1c..4ed4d6d1e85ac 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt @@ -6,6 +6,7 @@ package io.airbyte.integrations.base.destination.typing_deduping import com.fasterxml.jackson.databind.JsonNode import com.fasterxml.jackson.databind.node.ObjectNode import com.google.common.collect.ImmutableMap +import io.airbyte.cdk.extensions.TestContext import io.airbyte.commons.features.EnvVariableFeatureFlags import io.airbyte.commons.json.Jsons import io.airbyte.commons.lang.Exceptions @@ -15,6 +16,7 @@ import io.airbyte.protocol.models.ConfiguredAirbyteCatalog import io.airbyte.protocol.models.v0.* import io.airbyte.workers.internal.AirbyteDestination import io.airbyte.workers.internal.DefaultAirbyteDestination +import io.airbyte.workers.internal.DefaultAirbyteStreamFactory import io.airbyte.workers.process.AirbyteIntegrationLauncher import io.airbyte.workers.process.DockerProcessFactory import io.airbyte.workers.process.ProcessFactory @@ -1033,16 +1035,22 @@ abstract class BaseTypingDedupingTest { val destination: AirbyteDestination = DefaultAirbyteDestination( - AirbyteIntegrationLauncher( - "0", - 0, - imageName, - processFactory, - null, - null, - false, - EnvVariableFeatureFlags() - ) + integrationLauncher = + AirbyteIntegrationLauncher( + "0", + 0, + imageName, + processFactory, + null, + null, + false, + EnvVariableFeatureFlags() + ), + streamFactory = + DefaultAirbyteStreamFactory( + DefaultAirbyteDestination.createContainerLogMdcBuilder() + .setLogPrefix("destination-(${TestContext.CURRENT_TEST_NAME.get()})") + ) ) destination.start(destinationConfig, jobRoot, emptyMap()) diff --git a/airbyte-integrations/bases/base-java/javabase.sh b/airbyte-integrations/bases/base-java/javabase.sh index 59ceb87713fa5..3a1c7482a2ba9 100755 --- a/airbyte-integrations/bases/base-java/javabase.sh +++ b/airbyte-integrations/bases/base-java/javabase.sh @@ -16,7 +16,7 @@ if [[ $IS_CAPTURE_HEAP_DUMP_ON_ERROR = true ]]; then fi fi #30781 - Allocate 32KB for log4j appender buffer to ensure that each line is logged in a single println -JAVA_OPTS=$JAVA_OPTS" -Dlog4j.encoder.byteBufferSize=32768 -Dlog4j2.configurationFile=log4j2.xml" +JAVA_OPTS=$JAVA_OPTS" -Dlog4j.encoder.byteBufferSize=32768 -Dlog4j2.configurationFile=log4j2.xml --add-opens=java.base/java.lang=ALL-UNNAMED" export JAVA_OPTS # Wrap run script in a script so that we can lazy evaluate the value of APPLICATION. APPLICATION is diff --git a/airbyte-integrations/connectors/destination-snowflake/build.gradle b/airbyte-integrations/connectors/destination-snowflake/build.gradle index 2d3cb653861d0..fc6f7f38f7ce1 100644 --- a/airbyte-integrations/connectors/destination-snowflake/build.gradle +++ b/airbyte-integrations/connectors/destination-snowflake/build.gradle @@ -5,7 +5,7 @@ plugins { airbyteJavaConnector { cdkVersionRequired = '0.31.1' features = ['db-destinations', 's3-destinations', 'typing-deduping'] - useLocalCdk = false + useLocalCdk = true } java { diff --git a/airbyte-integrations/connectors/destination-snowflake/gradle.properties b/airbyte-integrations/connectors/destination-snowflake/gradle.properties index 56b208601bfcb..2fa5a4a844ef2 100644 --- a/airbyte-integrations/connectors/destination-snowflake/gradle.properties +++ b/airbyte-integrations/connectors/destination-snowflake/gradle.properties @@ -1,4 +1,4 @@ # currently limit the number of parallel threads until further investigation into the issues \ # where Snowflake will fail to login using config credentials -testExecutionConcurrency=1 +testExecutionConcurrency=-1 JunitMethodExecutionTimeout=15 m diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java index 24388f88655d6..7db614b51716d 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java @@ -42,6 +42,7 @@ import java.util.UUID; import java.util.stream.Collectors; import net.snowflake.client.jdbc.SnowflakeSQLException; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.text.StringSubstitutor; import org.jooq.SQLDialect; import org.slf4j.Logger; @@ -355,4 +356,30 @@ private String toJdbcTypeName(final AirbyteProtocolType airbyteProtocolType) { }; } + protected String getDeleteStatesSql(Map destinationStates) { + // only doing the DELETE where there's rows to delete allows us to avoid taking a lock on the table + // when there's nothing to delete + // This is particularly relevant in the context of tests, where many instance of the snowflake + // destination could be run in parallel + String deleteStatesSql = super.getDeleteStatesSql(destinationStates); + StringBuilder sql = new StringBuilder(); + // sql.append("BEGIN\n"); + sql.append(" IF (EXISTS (").append(deleteStatesSql.replace("delete from", "SELECT 1 FROM ")).append(")) THEN\n"); + sql.append(" ").append(deleteStatesSql).append(";\n"); + sql.append(" END IF\n"); + // sql.append("END;\n"); + return sql.toString(); + } + + protected void executeWithinTransaction(List statements) throws SQLException { + StringBuilder sb = new StringBuilder(); + sb.append("BEGIN\n"); + sb.append(" BEGIN TRANSACTION;\n "); + sb.append(StringUtils.join(statements, ";\n ")); + sb.append(";\n COMMIT;\n"); + sb.append("END;"); + LOGGER.info("executing SQL:" + sb); + getJdbcDatabase().execute(sb.toString()); + } + }