From 961d5135ea9b058f7bc052b8a262537a0d581ce5 Mon Sep 17 00:00:00 2001 From: Stephane Geneix Date: Wed, 1 May 2024 14:01:05 -0700 Subject: [PATCH] debuggability improvements to the CDK --- .../integrations/base/IntegrationRunner.kt | 85 ++++++++++++++----- .../src/main/resources/version.properties | 2 +- .../base/IntegrationRunnerTest.kt | 4 +- .../LoggingInvocationInterceptor.kt | 50 ++++++----- .../io/airbyte/cdk/extensions/TestContext.kt | 9 ++ .../destination/DestinationAcceptanceTest.kt | 28 +++--- .../internal/DefaultAirbyteDestination.kt | 15 ++-- .../typing_deduping/BaseTypingDedupingTest.kt | 23 ++--- .../bases/base-java/javabase.sh | 2 + 9 files changed, 144 insertions(+), 74 deletions(-) create mode 100644 airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/extensions/TestContext.kt diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunner.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunner.kt index 888d53475737b8..2a2e50a60fb7d7 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunner.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunner.kt @@ -25,13 +25,13 @@ import io.airbyte.protocol.models.v0.AirbyteMessage import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog import io.airbyte.validation.json.JsonSchemaValidator import java.io.* +import java.lang.reflect.Method import java.nio.charset.StandardCharsets import java.nio.file.Path +import java.time.Instant import java.util.* import java.util.concurrent.* import java.util.function.Consumer -import java.util.function.Predicate -import java.util.stream.Collectors import org.apache.commons.lang3.ThreadUtils import org.apache.commons.lang3.concurrent.BasicThreadFactory import org.slf4j.Logger @@ -84,6 +84,7 @@ internal constructor( (destination != null) xor (source != null), "can only pass in a destination or a source" ) + threadCreationInfo.set(ThreadCreationInfo()) this.cliParser = cliParser this.outputRecordCollector = outputRecordCollector // integration iface covers the commands that are the same for both source and destination. @@ -189,17 +190,20 @@ internal constructor( } } Command.WRITE -> { - val config = parseConfig(parsed.getConfigPath()) - validateConfig(integration.spec().connectionSpecification, config, "WRITE") - // save config to singleton - DestinationConfig.Companion.initialize( - config, - (integration as Destination).isV2Destination - ) - val catalog = - parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog::class.java)!! - try { + val config = parseConfig(parsed.getConfigPath()) + validateConfig(integration.spec().connectionSpecification, config, "WRITE") + // save config to singleton + DestinationConfig.Companion.initialize( + config, + (integration as Destination).isV2Destination + ) + val catalog = + parseConfig( + parsed.getCatalogPath(), + ConfiguredAirbyteCatalog::class.java + )!! + destination!! .getSerializedMessageConsumer(config, catalog, outputRecordCollector) .use { consumer -> consumeWriteStream(consumer!!) } @@ -339,11 +343,37 @@ internal constructor( } } + class ThreadCreationInfo { + val stack: List = Thread.currentThread().stackTrace.asList() + val time: Instant = Instant.now() + override fun toString(): String { + return "creationStack=${stack.joinToString("\n ")}\ncreationTime=$time" + } + } + companion object { private val LOGGER: Logger = LoggerFactory.getLogger(IntegrationRunner::class.java) + private val threadCreationInfo: InheritableThreadLocal = + object : InheritableThreadLocal() { + override fun childValue(parentValue: ThreadCreationInfo): ThreadCreationInfo { + return ThreadCreationInfo() + } + } const val TYPE_AND_DEDUPE_THREAD_NAME: String = "type-and-dedupe" + // ThreadLocal.get(Thread) is private. So we open it and keep a reference to the + // opened method + private val getMethod: Method = + ThreadLocal::class.java.getDeclaredMethod("get", Thread::class.java).also { + it.isAccessible = true + } + + @JvmStatic + fun getThreadCreationInfo(thread: Thread): ThreadCreationInfo { + return getMethod.invoke(threadCreationInfo, thread) as ThreadCreationInfo + } + /** * Filters threads that should not be considered when looking for orphaned threads at * shutdown of the integration runner. @@ -353,11 +383,12 @@ internal constructor( * active so long as the database connection pool is open. */ @VisibleForTesting - val ORPHANED_THREAD_FILTER: Predicate = Predicate { runningThread: Thread -> - (runningThread.name != Thread.currentThread().name && - !runningThread.isDaemon && - TYPE_AND_DEDUPE_THREAD_NAME != runningThread.name) - } + private val orphanedThreadPredicates: MutableList<(Thread) -> Boolean> = + mutableListOf({ runningThread: Thread -> + (runningThread.name != Thread.currentThread().name && + !runningThread.isDaemon && + TYPE_AND_DEDUPE_THREAD_NAME != runningThread.name) + }) const val INTERRUPT_THREAD_DELAY_MINUTES: Int = 1 const val EXIT_THREAD_DELAY_MINUTES: Int = 2 @@ -398,6 +429,15 @@ internal constructor( LOGGER.info("Finished buffered read of input stream") } + @JvmStatic + fun addOrphanedThreadFilter(predicate: (Thread) -> (Boolean)) { + orphanedThreadPredicates.add(predicate) + } + + fun filterOrphanedThread(thread: Thread): Boolean { + return orphanedThreadPredicates.all { it(thread) } + } + /** * Stops any non-daemon threads that could block the JVM from exiting when the main thread * is done. @@ -425,11 +465,7 @@ internal constructor( ) { val currentThread = Thread.currentThread() - val runningThreads = - ThreadUtils.getAllThreads() - .stream() - .filter(ORPHANED_THREAD_FILTER) - .collect(Collectors.toList()) + val runningThreads = ThreadUtils.getAllThreads().filter(::filterOrphanedThread).toList() if (runningThreads.isNotEmpty()) { LOGGER.warn( """ @@ -450,7 +486,10 @@ internal constructor( .build() ) for (runningThread in runningThreads) { - val str = "Active non-daemon thread: " + dumpThread(runningThread) + val str = + "Active non-daemon thread: " + + dumpThread(runningThread) + + "\ncreationStack=${getThreadCreationInfo(runningThread)}" LOGGER.warn(str) // even though the main thread is already shutting down, we still leave some // chances to the children diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index 7dbdb6cb2ec38b..20005b84fa63cc 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.31.3 \ No newline at end of file +version=0.31.4 \ No newline at end of file diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunnerTest.kt b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunnerTest.kt index 59d06c53b74450..0990d8dd26128b 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunnerTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunnerTest.kt @@ -477,7 +477,7 @@ ${Jsons.serialize(message2)}""".toByteArray( val runningThreads = ThreadUtils.getAllThreads() .stream() - .filter(IntegrationRunner.ORPHANED_THREAD_FILTER) + .filter(IntegrationRunner::filterOrphanedThread) .collect(Collectors.toList()) // all threads should be interrupted Assertions.assertEquals(listOf(), runningThreads) @@ -505,7 +505,7 @@ ${Jsons.serialize(message2)}""".toByteArray( val runningThreads = ThreadUtils.getAllThreads() .stream() - .filter(IntegrationRunner.ORPHANED_THREAD_FILTER) + .filter(IntegrationRunner::filterOrphanedThread) .collect(Collectors.toList()) // a thread that refuses to be interrupted should remain Assertions.assertEquals(1, runningThreads.size) diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/extensions/LoggingInvocationInterceptor.kt b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/extensions/LoggingInvocationInterceptor.kt index 0c393ad9d013c0..d792423bd275e6 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/extensions/LoggingInvocationInterceptor.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/extensions/LoggingInvocationInterceptor.kt @@ -46,18 +46,18 @@ class LoggingInvocationInterceptor : InvocationInterceptor { ExtensionContext::class.java ) == null ) { - LOGGER!!.error( + LOGGER.error( "Junit LoggingInvocationInterceptor executing unknown interception point {}", method.name ) - return method.invoke(proxy, *(args!!)) + return method.invoke(proxy, *(args)) } - val invocation = args!![0] as InvocationInterceptor.Invocation<*>? - val invocationContext = args[1] as ReflectiveInvocationContext<*>? + val invocation = args[0] as InvocationInterceptor.Invocation<*>? + val invocationContext = args[1] as ReflectiveInvocationContext<*> val extensionContext = args[2] as ExtensionContext? val methodName = method.name - val logLineSuffix: String? - val methodMatcher = methodPattern!!.matcher(methodName) + val logLineSuffix: String + val methodMatcher = methodPattern.matcher(methodName) if (methodName == "interceptDynamicTest") { logLineSuffix = "execution of DynamicTest %s".formatted(extensionContext!!.displayName) @@ -66,12 +66,19 @@ class LoggingInvocationInterceptor : InvocationInterceptor { "instance creation for %s".formatted(invocationContext!!.targetClass) } else if (methodMatcher.matches()) { val interceptedEvent = methodMatcher.group(1) + val methodRealClassName = invocationContext!!.executable!!.declaringClass.simpleName + val methodName = invocationContext.executable!!.name + val targetClassName = invocationContext!!.targetClass.simpleName + val methodDisplayName = + if (targetClassName == methodRealClassName) methodName + else "$methodName($methodRealClassName)" logLineSuffix = "execution of @%s method %s.%s".formatted( interceptedEvent, - invocationContext!!.executable!!.declaringClass.simpleName, - invocationContext.executable!!.name + targetClassName, + methodDisplayName ) + TestContext.CURRENT_TEST_NAME.set("$targetClassName.$methodName") } else { logLineSuffix = "execution of unknown intercepted call %s".formatted(methodName) } @@ -81,7 +88,7 @@ class LoggingInvocationInterceptor : InvocationInterceptor { try { val timeout = getTimeout(invocationContext) if (timeout != null) { - LOGGER!!.info( + LOGGER.info( "Junit starting {} with timeout of {}", logLineSuffix, DurationFormatUtils.formatDurationWords(timeout.toMillis(), true, true) @@ -89,7 +96,7 @@ class LoggingInvocationInterceptor : InvocationInterceptor { Timer("TimeoutTimer-" + currentThread.name, true) .schedule(timeoutTask, timeout.toMillis()) } else { - LOGGER!!.warn("Junit starting {} with no timeout", logLineSuffix) + LOGGER.warn("Junit starting {} with no timeout", logLineSuffix) } val retVal = invocation!!.proceed() val elapsedMs = Duration.between(start, Instant.now()).toMillis() @@ -136,7 +143,7 @@ class LoggingInvocationInterceptor : InvocationInterceptor { } } val stackTrace = StringUtils.join(stackToDisplay, "\n ") - LOGGER!!.error( + LOGGER.error( "Junit exception throw during {} after {}:\n{}", logLineSuffix, DurationFormatUtils.formatDurationWords(elapsedMs, true, true), @@ -145,24 +152,29 @@ class LoggingInvocationInterceptor : InvocationInterceptor { throw t1 } finally { timeoutTask.cancel() + TestContext.CURRENT_TEST_NAME.set(null) } } - private class TimeoutInteruptor(private val parentThread: Thread?) : TimerTask() { + private class TimeoutInteruptor(private val parentThread: Thread) : TimerTask() { @Volatile var wasTriggered: Boolean = false override fun run() { + LOGGER.info( + "interrupting running task on ${parentThread.name}. Current Stacktrace is ${parentThread.stackTrace.asList()}" + ) wasTriggered = true - parentThread!!.interrupt() + parentThread.interrupt() } override fun cancel(): Boolean { + LOGGER.info("cancelling timer task on ${parentThread.name}") return super.cancel() } } companion object { - private val methodPattern: Pattern? = Pattern.compile("intercept(.*)Method") + private val methodPattern: Pattern = Pattern.compile("intercept(.*)Method") private val PATTERN: Pattern = Pattern.compile( @@ -201,11 +213,11 @@ class LoggingInvocationInterceptor : InvocationInterceptor { ) } - private fun getTimeout(invocationContext: ReflectiveInvocationContext<*>?): Duration? { + private fun getTimeout(invocationContext: ReflectiveInvocationContext<*>): Duration { var timeout: Duration? = null - var m = invocationContext!!.executable + var m = invocationContext.executable if (m is Method) { - var timeoutAnnotation: Timeout? = m.getAnnotation(Timeout::class.java) + var timeoutAnnotation: Timeout? = m.getAnnotation(Timeout::class.java) if (timeoutAnnotation == null) { timeoutAnnotation = invocationContext.targetClass.getAnnotation(Timeout::class.java) @@ -328,9 +340,9 @@ class LoggingInvocationInterceptor : InvocationInterceptor { } companion object { - private val LOGGER: Logger? = + private val LOGGER: Logger = LoggerFactory.getLogger(LoggingInvocationInterceptor::class.java) - private val JUNIT_METHOD_EXECUTION_TIMEOUT_PROPERTY_NAME: String? = + private val JUNIT_METHOD_EXECUTION_TIMEOUT_PROPERTY_NAME: String = "JunitMethodExecutionTimeout" } } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/extensions/TestContext.kt b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/extensions/TestContext.kt new file mode 100644 index 00000000000000..6608ec0696f6c2 --- /dev/null +++ b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/extensions/TestContext.kt @@ -0,0 +1,9 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.extensions + +object TestContext { + val CURRENT_TEST_NAME: ThreadLocal = ThreadLocal() +} diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt index fee958f9c1d4a1..4d5e40f9a8c912 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt @@ -1469,7 +1469,7 @@ abstract class DestinationAcceptanceTest { } /** Whether the destination should be tested against different namespaces. */ - protected open fun supportNamespaceTest(): Boolean { + open protected fun supportNamespaceTest(): Boolean { return false } @@ -1571,19 +1571,21 @@ abstract class DestinationAcceptanceTest { } protected val destination: AirbyteDestination - get() = - DefaultAirbyteDestination( - AirbyteIntegrationLauncher( - JOB_ID, - JOB_ATTEMPT, - imageName, - processFactory, - null, - null, - false, - EnvVariableFeatureFlags() - ) + get() { + return DefaultAirbyteDestination( + integrationLauncher = + AirbyteIntegrationLauncher( + JOB_ID, + JOB_ATTEMPT, + imageName, + processFactory, + null, + null, + false, + EnvVariableFeatureFlags() + ) ) + } @Throws(Exception::class) protected fun runSyncAndVerifyStateOutput( diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/internal/DefaultAirbyteDestination.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/internal/DefaultAirbyteDestination.kt index b3809f0231bdb4..dcb95891305ba4 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/internal/DefaultAirbyteDestination.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/internal/DefaultAirbyteDestination.kt @@ -5,6 +5,7 @@ package io.airbyte.workers.internal import com.google.common.base.Charsets import com.google.common.base.Preconditions +import io.airbyte.cdk.extensions.TestContext import io.airbyte.commons.io.IOs import io.airbyte.commons.io.LineGobbler import io.airbyte.commons.json.Jsons @@ -39,7 +40,7 @@ class DefaultAirbyteDestination constructor( private val integrationLauncher: IntegrationLauncher, private val streamFactory: AirbyteStreamFactory = - DefaultAirbyteStreamFactory(CONTAINER_LOG_MDC_BUILDER), + DefaultAirbyteStreamFactory(createContainerLogMdcBuilder()), private val messageWriterFactory: AirbyteMessageBufferedWriterFactory = DefaultAirbyteMessageBufferedWriterFactory(), private val protocolSerializer: ProtocolSerializer = DefaultProtocolSerializer() @@ -87,7 +88,7 @@ constructor( destinationProcess!!.errorStream, { msg: String? -> LOGGER.error(msg) }, "airbyte-destination", - CONTAINER_LOG_MDC_BUILDER + createContainerLogMdcBuilder() ) writer = @@ -179,10 +180,14 @@ constructor( companion object { private val LOGGER: Logger = LoggerFactory.getLogger(DefaultAirbyteDestination::class.java) - val CONTAINER_LOG_MDC_BUILDER: MdcScope.Builder = - MdcScope.Builder() - .setLogPrefix("destination") + fun createContainerLogMdcBuilder(): MdcScope.Builder { + val currentTest = TestContext.CURRENT_TEST_NAME.get() + val logPrefix = + if (currentTest == null) "destination" else "destination(${currentTest})" + return MdcScope.Builder() + .setLogPrefix(logPrefix) .setPrefixColor(LoggingHelper.Color.YELLOW_BACKGROUND) + } val IGNORED_EXIT_CODES: Set = setOf( 0, // Normal exit diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt index 37645c718f7d15..4940d9f6d0b1b2 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt @@ -152,7 +152,7 @@ abstract class BaseTypingDedupingTest { /** Conceptually identical to [.getFinalMetadataColumnNames], but for the raw table. */ get() = HashMap() - val finalMetadataColumnNames: Map + open val finalMetadataColumnNames: Map /** * If the destination connector uses a nonstandard schema for the final table, override this * method. For example, destination-snowflake upcases all column names in the final tables. @@ -1033,16 +1033,17 @@ abstract class BaseTypingDedupingTest { val destination: AirbyteDestination = DefaultAirbyteDestination( - AirbyteIntegrationLauncher( - "0", - 0, - imageName, - processFactory, - null, - null, - false, - EnvVariableFeatureFlags() - ) + integrationLauncher = + AirbyteIntegrationLauncher( + "0", + 0, + imageName, + processFactory, + null, + null, + false, + EnvVariableFeatureFlags() + ) ) destination.start(destinationConfig, jobRoot, emptyMap()) diff --git a/airbyte-integrations/bases/base-java/javabase.sh b/airbyte-integrations/bases/base-java/javabase.sh index 59ceb87713fa54..024edd452ab720 100755 --- a/airbyte-integrations/bases/base-java/javabase.sh +++ b/airbyte-integrations/bases/base-java/javabase.sh @@ -17,6 +17,8 @@ if [[ $IS_CAPTURE_HEAP_DUMP_ON_ERROR = true ]]; then fi #30781 - Allocate 32KB for log4j appender buffer to ensure that each line is logged in a single println JAVA_OPTS=$JAVA_OPTS" -Dlog4j.encoder.byteBufferSize=32768 -Dlog4j2.configurationFile=log4j2.xml" +#needed because we make ThreadLocal.get(Thread) accessible in IntegrationRunner.stopOrphanedThreads +JAVA_OPTS=$JAVA_OPTS" --add-opens=java.base/java.lang=ALL-UNNAMED" export JAVA_OPTS # Wrap run script in a script so that we can lazy evaluate the value of APPLICATION. APPLICATION is