From 0299145f6332a28245c4af64ae97ab065ea6c9fc Mon Sep 17 00:00:00 2001 From: jdpgrailsdev Date: Tue, 13 Jun 2023 14:03:50 -0400 Subject: [PATCH] Exclude concurrent stream threads from orphan thread watcher --- .../integrations/base/IntegrationRunner.java | 4 +- .../concurrent/ConcurrentStreamConsumer.java | 90 ++++++++++++++++--- 2 files changed, 82 insertions(+), 12 deletions(-) diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java index 22fc52d56e370f..365db74ec9d53b 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java @@ -63,7 +63,9 @@ public class IntegrationRunner { */ @VisibleForTesting static final Predicate ORPHANED_THREAD_FILTER = runningThread -> !runningThread.getName().equals(Thread.currentThread().getName()) - && !runningThread.isDaemon() && !runningThread.getName().startsWith("pool-"); + && !runningThread.isDaemon() + && !runningThread.getName().startsWith("pool-") + && !runningThread.getName().startsWith(ConcurrentStreamConsumer.CONCURRENT_STREAM_THREAD_NAME); public static final int INTERRUPT_THREAD_DELAY_MINUTES = 60; public static final int EXIT_THREAD_DELAY_MINUTES = 70; diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/util/concurrent/ConcurrentStreamConsumer.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/util/concurrent/ConcurrentStreamConsumer.java index feb7432fd2ee3c..b9ffe8468560c9 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/util/concurrent/ConcurrentStreamConsumer.java +++ 
b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/util/concurrent/ConcurrentStreamConsumer.java @@ -8,6 +8,7 @@ import io.airbyte.commons.stream.StreamStatusUtils; import io.airbyte.commons.util.AutoCloseableIterator; import io.airbyte.integrations.base.AirbyteTraceMessageUtility; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.protocol.models.v0.AirbyteMessage; import java.util.ArrayList; import java.util.Collection; @@ -17,7 +18,10 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor.AbortPolicy; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import org.slf4j.Logger; @@ -42,14 +46,29 @@ public class ConcurrentStreamConsumer implements Consumer exceptions; private final Collection futures; private final Consumer> streamConsumer; private final Optional> streamStatusEmitter = Optional.of(AirbyteTraceMessageUtility::emitStreamStatusTrace); + /** + * Constructs a new {@link ConcurrentStreamConsumer} that will use the provided stream consumer to execute each stream submitted to the + * {@link #accept(AutoCloseableIterator)} method of this consumer. Streams submitted to the {@link #accept(AutoCloseableIterator)} method + * will be converted to a {@link Runnable} and executed on an {@link ExecutorService} configured by this consumer to ensure concurrent + * execution of each stream. + * + * @param streamConsumer The {@link Consumer} that accepts streams as an {@link AutoCloseableIterator}. + * @param requestedParallelism The requested amount of parallelism that will be used as a hint to determine + * the appropriate number of threads to execute concurrently. 
+ */ public ConcurrentStreamConsumer(final Consumer> streamConsumer, final Integer requestedParallelism) { - this.executorService = Executors.newFixedThreadPool(computeParallelism(requestedParallelism)); + this.executorService = createExecutorService(requestedParallelism); this.exceptions = new ArrayList<>(); this.futures = new ArrayList<>(); this.streamConsumer = streamConsumer; @@ -63,7 +82,7 @@ public void accept(final AutoCloseableIterator stream) { * passing them to the provided message consumer for further processing. Any exceptions raised * within the thread will be captured and exposed to the caller. */ - futures.add(CompletableFuture.runAsync(() -> executeStream(stream), executorService)); + futures.add(CompletableFuture.runAsync(new ConcurrentStreamRunnable(stream, this), executorService)); } /** @@ -114,18 +133,32 @@ public void waitFor() throws ExecutionException, InterruptedException { */ private Integer computeParallelism(final Integer requestedParallelism) { /* - * TODO What is the correct upper bound here? It should probably be provided by the code that - * creates this iterator to account for the different types of connectors and their limitations. For - * instance, a JDBC-based source connector should probably set the "MAX_THREADS" to the maximum - * number of connections in the database connection pool used by the source. For API-based source - * connectors, this should be the maximum number of concurrent connections from one host/IP address - * that the API supports. + * Selects the default thread pool size based on the provided value via an environment variable + * or the number of available processors if the environment variable is not set/present. This is to + * ensure that we do not over-parallelize unless requested explicitly. The default only caps the result: the value returned is the smaller of that default and the requested parallelism, and a non-positive request counts as 1. NOTE(review): a non-numeric DEFAULT_CONCURRENT_STREAM_CONSUMER_THREADS value makes Integer::parseInt throw NumberFormatException, and a zero or negative value yields a non-positive result that is then used as the pool size -- confirm whether the variable needs validation. */ - final Integer parallelism = Math.min(10, requestedParallelism > 0 ? 
requestedParallelism : 1); - LOGGER.debug("Computed concurrent stream parallelism: {}", parallelism); + final Integer defaultPoolSize = Optional.ofNullable(System.getenv("DEFAULT_CONCURRENT_STREAM_CONSUMER_THREADS")) + .map(Integer::parseInt) + .orElseGet(() -> Runtime.getRuntime().availableProcessors()); + final Integer parallelism = Math.min(defaultPoolSize, requestedParallelism > 0 ? requestedParallelism : 1); + LOGGER.debug("Computed concurrent stream consumer parallelism: {}", parallelism); return parallelism; } + /** + * Creates the {@link ExecutorService} that will be used by the consumer to consume from the provided + * streams in parallel. Core and maximum pool size are both set to the value returned by {@code computeParallelism}, pending tasks wait in a {@link LinkedBlockingQueue} created without a capacity argument, and worker threads are created by {@link ConcurrentStreamThreadFactory}. + * + * @param requestedParallelism The requested amount of parallelism that will be used as a hint to determine + * the appropriate number of threads to execute concurrently. + * @return The configured {@link ExecutorService}. + */ + private ExecutorService createExecutorService(final Integer requestedParallelism) { + final Integer nThreads = computeParallelism(requestedParallelism); + return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), + new ConcurrentStreamThreadFactory(), new AbortPolicy()); + } + /** * Executes the stream by providing it to the configured {@link #streamConsumer}. * @@ -152,4 +185,39 @@ public void close() throws Exception { executorService.awaitTermination(30, TimeUnit.SECONDS); } + /** + * Custom {@link ThreadFactory} that names the threads used to concurrently execute streams. When the {@link Runnable} handed to {@code newThread} is a {@link ConcurrentStreamRunnable} whose stream reports an Airbyte stream, the thread name is {@code CONCURRENT_STREAM_THREAD_NAME}, the stream namespace and the stream name joined by hyphens; in every other case it is {@code CONCURRENT_STREAM_THREAD_NAME} alone. NOTE(review): {@link ThreadPoolExecutor} passes its own internal worker to the thread factory and {@code CompletableFuture.runAsync} wraps the submitted task, so the {@code instanceof} branch presumably never matches -- confirm against a running pool. 
+ */ + private static class ConcurrentStreamThreadFactory implements ThreadFactory { + + @Override + public Thread newThread(final Runnable r) { + final Thread thread = new Thread(r); + if (r instanceof ConcurrentStreamRunnable) { + final AutoCloseableIterator stream = ((ConcurrentStreamRunnable)r).stream(); + if (stream.getAirbyteStream().isPresent()) { + final AirbyteStreamNameNamespacePair airbyteStream = stream.getAirbyteStream().get(); + thread.setName(String.format("%s-%s-%s", CONCURRENT_STREAM_THREAD_NAME, airbyteStream.getNamespace(), airbyteStream.getName())); + } else { + thread.setName(CONCURRENT_STREAM_THREAD_NAME); + } + } else { + thread.setName(CONCURRENT_STREAM_THREAD_NAME); + } + return thread; + } + } + + /** + * Custom {@link Runnable} that exposes the stream for thread naming purposes. Its {@code run()} calls {@code consumer.executeStream(stream)}. + * + * @param stream The stream that is part of the {@link Runnable} execution. + * @param consumer The {@link ConcurrentStreamConsumer} that will execute the stream. + */ + private record ConcurrentStreamRunnable(AutoCloseableIterator stream, ConcurrentStreamConsumer consumer) implements Runnable { + @Override + public void run() { + consumer.executeStream(stream); + } + } }