diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java index 365db74ec9d53..f4c1630c2ed89 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java @@ -12,7 +12,6 @@ import io.airbyte.commons.features.FeatureFlags; import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.lang.Exceptions.Procedure; import io.airbyte.commons.stream.StreamStatusUtils; import io.airbyte.commons.string.Strings; import io.airbyte.commons.util.AutoCloseableIterator; @@ -72,6 +71,8 @@ public class IntegrationRunner { public static final int FORCED_EXIT_CODE = 2; + private static final Runnable EXIT_HOOK = () -> System.exit(FORCED_EXIT_CODE); + private final IntegrationCliParser cliParser; private final Consumer outputRecordCollector; private final Integration integration; @@ -178,19 +179,15 @@ private void runInternal(final IntegrationConfig parsed) throws Exception { validateConfig(integration.spec().getConnectionSpecification(), config, "WRITE"); final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class); - final Procedure consumeWriteStreamCallable = () -> { - try (final SerializedAirbyteMessageConsumer consumer = destination.getSerializedMessageConsumer(config, catalog, outputRecordCollector)) { - consumeWriteStream(consumer); - } - }; - - watchForOrphanThreads( - consumeWriteStreamCallable, - () -> System.exit(FORCED_EXIT_CODE), - INTERRUPT_THREAD_DELAY_MINUTES, - TimeUnit.MINUTES, - EXIT_THREAD_DELAY_MINUTES, - TimeUnit.MINUTES); + try (final SerializedAirbyteMessageConsumer consumer = destination.getSerializedMessageConsumer(config, catalog, outputRecordCollector)) { + consumeWriteStream(consumer); + } finally { + stopOrphanedThreads(EXIT_HOOK, + INTERRUPT_THREAD_DELAY_MINUTES, + TimeUnit.MINUTES, + EXIT_THREAD_DELAY_MINUTES, + TimeUnit.MINUTES); + } } default -> throw new IllegalStateException("Unexpected value: " + parsed.getCommand()); } @@ -227,16 +224,9 @@ private void runInternal(final IntegrationConfig parsed) throws Exception { LOGGER.info("Completed integration: {}", integration.getClass().getName()); } - private void produceMessages(final AutoCloseableIterator messageIterator, final Consumer recordCollector) - throws Exception { + private void produceMessages(final AutoCloseableIterator messageIterator, final Consumer recordCollector) { messageIterator.getAirbyteStream().ifPresent(s -> LOGGER.debug("Producing messages for stream {}...", s)); - watchForOrphanThreads( - () -> messageIterator.forEachRemaining(recordCollector), - () -> System.exit(FORCED_EXIT_CODE), - INTERRUPT_THREAD_DELAY_MINUTES, - TimeUnit.MINUTES, - EXIT_THREAD_DELAY_MINUTES, - TimeUnit.MINUTES); + messageIterator.forEachRemaining(recordCollector); } private void readConcurrent(final JsonNode config, ConfiguredAirbyteCatalog catalog, final Optional stateOptional) throws Exception { @@ -250,12 +240,24 @@ private void readConcurrent(final JsonNode config, ConfiguredAirbyteCatalog cata if (streamConsumer.getException().isPresent()) { throw streamConsumer.getException().get(); } + } finally { + stopOrphanedThreads(EXIT_HOOK, + INTERRUPT_THREAD_DELAY_MINUTES, + TimeUnit.MINUTES, + EXIT_THREAD_DELAY_MINUTES, + TimeUnit.MINUTES); } } private void readSerial(final JsonNode config, ConfiguredAirbyteCatalog catalog, final Optional stateOptional) throws Exception { try (final AutoCloseableIterator messageIterator = source.read(config, catalog, stateOptional.orElse(null))) { produceMessages(messageIterator, outputRecordCollector); + } finally { + stopOrphanedThreads(EXIT_HOOK, + INTERRUPT_THREAD_DELAY_MINUTES, + TimeUnit.MINUTES, + EXIT_THREAD_DELAY_MINUTES, + TimeUnit.MINUTES); } } @@ -312,59 +314,58 @@ static void consumeWriteStream(final SerializedAirbyteMessageConsumer consumer, } /** - * This method calls a runMethod and make sure that it won't produce orphan non-daemon active - * threads once it is done. Active non-daemon threads blocks JVM from exiting when the main thread - * is done, whereas daemon ones don't. + * Stops any non-daemon threads that could block the JVM from exiting when the main thread is done. *

* If any active non-daemon threads would be left as orphans, this method will schedule some * interrupt/exit hooks after giving it some time delay to close up properly. It is generally * preferred to have a proper closing sequence from children threads instead of interrupting or * force exiting the process, so this mechanism serve as a fallback while surfacing warnings in logs * for maintainers to fix the code behavior instead. + * + * @param exitHook The {@link Runnable} exit hook to execute for any orphaned threads. + * @param interruptTimeDelay The time to delay execution of the orphaned thread interrupt attempt. + * @param interruptTimeUnit The time unit of the interrupt delay. + * @param exitTimeDelay The time to delay execution of the orphaned thread exit hook. + * @param exitTimeUnit The time unit of the exit delay. */ @VisibleForTesting - static void watchForOrphanThreads(final Procedure runMethod, - final Runnable exitHook, - final int interruptTimeDelay, - final TimeUnit interruptTimeUnit, - final int exitTimeDelay, - final TimeUnit exitTimeUnit) - throws Exception { + static void stopOrphanedThreads(final Runnable exitHook, + final int interruptTimeDelay, + final TimeUnit interruptTimeUnit, + final int exitTimeDelay, + final TimeUnit exitTimeUnit) { final Thread currentThread = Thread.currentThread(); - try { - runMethod.call(); - } finally { - final List runningThreads = ThreadUtils.getAllThreads() - .stream() - .filter(ORPHANED_THREAD_FILTER) - .collect(Collectors.toList()); - if (!runningThreads.isEmpty()) { - LOGGER.warn(""" - The main thread is exiting while children non-daemon threads from a connector are still active. - Ideally, this situation should not happen... - Please check with maintainers if the connector or library code should safely clean up its threads before quitting instead. - The main thread is: {}""", dumpThread(currentThread)); - final ScheduledExecutorService scheduledExecutorService = Executors - .newSingleThreadScheduledExecutor(new BasicThreadFactory.Builder() - // this thread executor will create daemon threads, so it does not block exiting if all other active - // threads are already stopped. - .daemon(true).build()); - for (final Thread runningThread : runningThreads) { - final String str = "Active non-daemon thread: " + dumpThread(runningThread); - LOGGER.warn(str); - // even though the main thread is already shutting down, we still leave some chances to the children - // threads to close properly on their own. - // So, we schedule an interrupt hook after a fixed time delay instead... - scheduledExecutorService.schedule(runningThread::interrupt, interruptTimeDelay, interruptTimeUnit); - } - scheduledExecutorService.schedule(() -> { - if (ThreadUtils.getAllThreads().stream() - .anyMatch(runningThread -> !runningThread.isDaemon() && !runningThread.getName().equals(currentThread.getName()))) { - LOGGER.error("Failed to interrupt children non-daemon threads, forcefully exiting NOW...\n"); - exitHook.run(); - } - }, exitTimeDelay, exitTimeUnit); + + final List runningThreads = ThreadUtils.getAllThreads() + .stream() + .filter(ORPHANED_THREAD_FILTER) + .collect(Collectors.toList()); + if (!runningThreads.isEmpty()) { + LOGGER.warn(""" + The main thread is exiting while children non-daemon threads from a connector are still active. + Ideally, this situation should not happen... + Please check with maintainers if the connector or library code should safely clean up its threads before quitting instead. + The main thread is: {}""", dumpThread(currentThread)); + final ScheduledExecutorService scheduledExecutorService = Executors + .newSingleThreadScheduledExecutor(new BasicThreadFactory.Builder() + // this thread executor will create daemon threads, so it does not block exiting if all other active + // threads are already stopped. + .daemon(true).build()); + for (final Thread runningThread : runningThreads) { + final String str = "Active non-daemon thread: " + dumpThread(runningThread); + LOGGER.warn(str); + // even though the main thread is already shutting down, we still leave some chances to the children + // threads to close properly on their own. + // So, we schedule an interrupt hook after a fixed time delay instead... + scheduledExecutorService.schedule(runningThread::interrupt, interruptTimeDelay, interruptTimeUnit); } + scheduledExecutorService.schedule(() -> { + if (ThreadUtils.getAllThreads().stream() + .anyMatch(runningThread -> !runningThread.isDaemon() && !runningThread.getName().equals(currentThread.getName()))) { + LOGGER.error("Failed to interrupt children non-daemon threads, forcefully exiting NOW...\n"); + exitHook.run(); + } + }, exitTimeDelay, exitTimeUnit); } } diff --git a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerTest.java b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerTest.java index ce73ade8d5713..8f2aaf57615cb 100644 --- a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerTest.java +++ b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerTest.java @@ -372,16 +372,13 @@ void testDestinationConsumerLifecycleFailure() throws Exception { } @Test - void testInterruptOrphanThreadFailure() { + void testInterruptOrphanThread() { final List caughtExceptions = new ArrayList<>(); startSleepingThread(caughtExceptions, false); - assertThrows(IOException.class, () -> IntegrationRunner.watchForOrphanThreads( - () -> { - throw new IOException("random error"); - }, + IntegrationRunner.stopOrphanedThreads( Assertions::fail, 3, TimeUnit.SECONDS, - 10, TimeUnit.SECONDS)); + 10, TimeUnit.SECONDS); try { TimeUnit.SECONDS.sleep(15); } catch (final Exception e) { @@ -396,17 +393,14 @@ void testInterruptOrphanThreadFailure() { } @Test - void testNoInterruptOrphanThreadFailure() { + void testNoInterruptOrphanThread() { final List caughtExceptions = new ArrayList<>(); final AtomicBoolean exitCalled = new AtomicBoolean(false); startSleepingThread(caughtExceptions, true); - assertThrows(IOException.class, () -> IntegrationRunner.watchForOrphanThreads( - () -> { - throw new IOException("random error"); - }, + IntegrationRunner.stopOrphanedThreads( () -> exitCalled.set(true), 3, TimeUnit.SECONDS, - 10, TimeUnit.SECONDS)); + 10, TimeUnit.SECONDS); try { TimeUnit.SECONDS.sleep(15); } catch (final Exception e) {