Skip to content

Commit

Permalink
Refactor orphaned thread logic to account for concurrent execution
Browse files Browse the repository at this point in the history
  • Loading branch information
jdpgrailsdev committed Jun 14, 2023
1 parent e3b41a1 commit cc46862
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.Exceptions.Procedure;
import io.airbyte.commons.stream.StreamStatusUtils;
import io.airbyte.commons.string.Strings;
import io.airbyte.commons.util.AutoCloseableIterator;
Expand Down Expand Up @@ -72,6 +71,8 @@ public class IntegrationRunner {

public static final int FORCED_EXIT_CODE = 2;

private static final Runnable EXIT_HOOK = () -> System.exit(FORCED_EXIT_CODE);

private final IntegrationCliParser cliParser;
private final Consumer<AirbyteMessage> outputRecordCollector;
private final Integration integration;
Expand Down Expand Up @@ -178,19 +179,15 @@ private void runInternal(final IntegrationConfig parsed) throws Exception {
validateConfig(integration.spec().getConnectionSpecification(), config, "WRITE");
final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class);

final Procedure consumeWriteStreamCallable = () -> {
try (final SerializedAirbyteMessageConsumer consumer = destination.getSerializedMessageConsumer(config, catalog, outputRecordCollector)) {
consumeWriteStream(consumer);
}
};

watchForOrphanThreads(
consumeWriteStreamCallable,
() -> System.exit(FORCED_EXIT_CODE),
INTERRUPT_THREAD_DELAY_MINUTES,
TimeUnit.MINUTES,
EXIT_THREAD_DELAY_MINUTES,
TimeUnit.MINUTES);
try (final SerializedAirbyteMessageConsumer consumer = destination.getSerializedMessageConsumer(config, catalog, outputRecordCollector)) {
consumeWriteStream(consumer);
} finally {
stopOrphanedThreads(EXIT_HOOK,
INTERRUPT_THREAD_DELAY_MINUTES,
TimeUnit.MINUTES,
EXIT_THREAD_DELAY_MINUTES,
TimeUnit.MINUTES);
}
}
default -> throw new IllegalStateException("Unexpected value: " + parsed.getCommand());
}
Expand Down Expand Up @@ -227,16 +224,9 @@ private void runInternal(final IntegrationConfig parsed) throws Exception {
LOGGER.info("Completed integration: {}", integration.getClass().getName());
}

private void produceMessages(final AutoCloseableIterator<AirbyteMessage> messageIterator, final Consumer<AirbyteMessage> recordCollector)
throws Exception {
private void produceMessages(final AutoCloseableIterator<AirbyteMessage> messageIterator, final Consumer<AirbyteMessage> recordCollector) {
messageIterator.getAirbyteStream().ifPresent(s -> LOGGER.debug("Producing messages for stream {}...", s));
watchForOrphanThreads(
() -> messageIterator.forEachRemaining(recordCollector),
() -> System.exit(FORCED_EXIT_CODE),
INTERRUPT_THREAD_DELAY_MINUTES,
TimeUnit.MINUTES,
EXIT_THREAD_DELAY_MINUTES,
TimeUnit.MINUTES);
messageIterator.forEachRemaining(recordCollector);
}

private void readConcurrent(final JsonNode config, ConfiguredAirbyteCatalog catalog, final Optional<JsonNode> stateOptional) throws Exception {
Expand All @@ -250,12 +240,24 @@ private void readConcurrent(final JsonNode config, ConfiguredAirbyteCatalog cata
if (streamConsumer.getException().isPresent()) {
throw streamConsumer.getException().get();
}
} finally {
stopOrphanedThreads(EXIT_HOOK,
INTERRUPT_THREAD_DELAY_MINUTES,
TimeUnit.MINUTES,
EXIT_THREAD_DELAY_MINUTES,
TimeUnit.MINUTES);
}
}

/**
 * Reads the source on the calling thread and hands every produced message to the output record
 * collector.
 * <p>
 * Once the read has finished (successfully or not) and the message iterator has been closed, any
 * remaining orphaned non-daemon threads are handled via {@code stopOrphanedThreads} so that they
 * cannot keep the JVM alive.
 *
 * @param config The connector configuration passed to the source.
 * @param catalog The configured catalog passed to the source.
 * @param stateOptional The optional state; {@code null} is passed to the source when it is absent.
 * @throws Exception if reading from the source or closing the message iterator fails.
 */
private void readSerial(final JsonNode config, ConfiguredAirbyteCatalog catalog, final Optional<JsonNode> stateOptional) throws Exception {
  // The source API takes a nullable state rather than an Optional.
  final JsonNode state = stateOptional.orElse(null);
  try (final AutoCloseableIterator<AirbyteMessage> messages = source.read(config, catalog, state)) {
    produceMessages(messages, outputRecordCollector);
  } finally {
    // Runs after the iterator has been closed, whether or not the read succeeded.
    stopOrphanedThreads(EXIT_HOOK, INTERRUPT_THREAD_DELAY_MINUTES, TimeUnit.MINUTES, EXIT_THREAD_DELAY_MINUTES, TimeUnit.MINUTES);
  }
}

Expand Down Expand Up @@ -312,59 +314,58 @@ static void consumeWriteStream(final SerializedAirbyteMessageConsumer consumer,
}

/**
* This method calls a runMethod and makes sure that it won't leave orphaned, active non-daemon
* threads behind once it is done. Active non-daemon threads block the JVM from exiting when the
* main thread is done, whereas daemon ones don't.
* Stops any non-daemon threads that could block the JVM from exiting when the main thread is done.
* <p>
* If any active non-daemon threads would be left as orphans, this method will schedule
* interrupt/exit hooks after giving the threads some time to close properly. It is generally
* preferred to have a proper closing sequence in the child threads instead of interrupting them or
* force-exiting the process, so this mechanism serves as a fallback while surfacing warnings in the
* logs so that maintainers can fix the code's behavior instead.
*
* @param exitHook The {@link Runnable} exit hook to execute for any orphaned threads.
* @param interruptTimeDelay The time to delay execution of the orphaned thread interrupt attempt.
* @param interruptTimeUnit The time unit of the interrupt delay.
* @param exitTimeDelay The time to delay execution of the orphaned thread exit hook.
* @param exitTimeUnit The time unit of the exit delay.
*/
@VisibleForTesting
static void watchForOrphanThreads(final Procedure runMethod,
final Runnable exitHook,
final int interruptTimeDelay,
final TimeUnit interruptTimeUnit,
final int exitTimeDelay,
final TimeUnit exitTimeUnit)
throws Exception {
static void stopOrphanedThreads(final Runnable exitHook,
final int interruptTimeDelay,
final TimeUnit interruptTimeUnit,
final int exitTimeDelay,
final TimeUnit exitTimeUnit) {
final Thread currentThread = Thread.currentThread();
try {
runMethod.call();
} finally {
final List<Thread> runningThreads = ThreadUtils.getAllThreads()
.stream()
.filter(ORPHANED_THREAD_FILTER)
.collect(Collectors.toList());
if (!runningThreads.isEmpty()) {
LOGGER.warn("""
The main thread is exiting while children non-daemon threads from a connector are still active.
Ideally, this situation should not happen...
Please check with maintainers if the connector or library code should safely clean up its threads before quitting instead.
The main thread is: {}""", dumpThread(currentThread));
final ScheduledExecutorService scheduledExecutorService = Executors
.newSingleThreadScheduledExecutor(new BasicThreadFactory.Builder()
// this thread executor will create daemon threads, so it does not block exiting if all other active
// threads are already stopped.
.daemon(true).build());
for (final Thread runningThread : runningThreads) {
final String str = "Active non-daemon thread: " + dumpThread(runningThread);
LOGGER.warn(str);
// even though the main thread is already shutting down, we still give the child threads a chance
// to close properly on their own.
// So, we schedule an interrupt hook after a fixed time delay instead...
scheduledExecutorService.schedule(runningThread::interrupt, interruptTimeDelay, interruptTimeUnit);
}
scheduledExecutorService.schedule(() -> {
if (ThreadUtils.getAllThreads().stream()
.anyMatch(runningThread -> !runningThread.isDaemon() && !runningThread.getName().equals(currentThread.getName()))) {
LOGGER.error("Failed to interrupt children non-daemon threads, forcefully exiting NOW...\n");
exitHook.run();
}
}, exitTimeDelay, exitTimeUnit);

final List<Thread> runningThreads = ThreadUtils.getAllThreads()
.stream()
.filter(ORPHANED_THREAD_FILTER)
.collect(Collectors.toList());
if (!runningThreads.isEmpty()) {
LOGGER.warn("""
The main thread is exiting while children non-daemon threads from a connector are still active.
Ideally, this situation should not happen...
Please check with maintainers if the connector or library code should safely clean up its threads before quitting instead.
The main thread is: {}""", dumpThread(currentThread));
final ScheduledExecutorService scheduledExecutorService = Executors
.newSingleThreadScheduledExecutor(new BasicThreadFactory.Builder()
// this thread executor will create daemon threads, so it does not block exiting if all other active
// threads are already stopped.
.daemon(true).build());
for (final Thread runningThread : runningThreads) {
final String str = "Active non-daemon thread: " + dumpThread(runningThread);
LOGGER.warn(str);
// even though the main thread is already shutting down, we still give the child threads a chance
// to close properly on their own.
// So, we schedule an interrupt hook after a fixed time delay instead...
scheduledExecutorService.schedule(runningThread::interrupt, interruptTimeDelay, interruptTimeUnit);
}
scheduledExecutorService.schedule(() -> {
if (ThreadUtils.getAllThreads().stream()
.anyMatch(runningThread -> !runningThread.isDaemon() && !runningThread.getName().equals(currentThread.getName()))) {
LOGGER.error("Failed to interrupt children non-daemon threads, forcefully exiting NOW...\n");
exitHook.run();
}
}, exitTimeDelay, exitTimeUnit);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,16 +372,13 @@ void testDestinationConsumerLifecycleFailure() throws Exception {
}

@Test
void testInterruptOrphanThreadFailure() {
void testInterruptOrphanThread() {
final List<Exception> caughtExceptions = new ArrayList<>();
startSleepingThread(caughtExceptions, false);
assertThrows(IOException.class, () -> IntegrationRunner.watchForOrphanThreads(
() -> {
throw new IOException("random error");
},
IntegrationRunner.stopOrphanedThreads(
Assertions::fail,
3, TimeUnit.SECONDS,
10, TimeUnit.SECONDS));
10, TimeUnit.SECONDS);
try {
TimeUnit.SECONDS.sleep(15);
} catch (final Exception e) {
Expand All @@ -396,17 +393,14 @@ void testInterruptOrphanThreadFailure() {
}

@Test
void testNoInterruptOrphanThreadFailure() {
void testNoInterruptOrphanThread() {
final List<Exception> caughtExceptions = new ArrayList<>();
final AtomicBoolean exitCalled = new AtomicBoolean(false);
startSleepingThread(caughtExceptions, true);
assertThrows(IOException.class, () -> IntegrationRunner.watchForOrphanThreads(
() -> {
throw new IOException("random error");
},
IntegrationRunner.stopOrphanedThreads(
() -> exitCalled.set(true),
3, TimeUnit.SECONDS,
10, TimeUnit.SECONDS));
10, TimeUnit.SECONDS);
try {
TimeUnit.SECONDS.sleep(15);
} catch (final Exception e) {
Expand Down

0 comments on commit cc46862

Please sign in to comment.