Skip to content

Commit

Permalink
Exclude concurrent stream threads from orphan thread watcher
Browse files Browse the repository at this point in the history
  • Loading branch information
jdpgrailsdev committed Jun 13, 2023
1 parent 491e8be commit 0299145
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ public class IntegrationRunner {
*/
@VisibleForTesting
static final Predicate<Thread> ORPHANED_THREAD_FILTER = runningThread -> !runningThread.getName().equals(Thread.currentThread().getName())
&& !runningThread.isDaemon() && !runningThread.getName().startsWith("pool-");
&& !runningThread.isDaemon()
&& !runningThread.getName().startsWith("pool-")
&& !runningThread.getName().startsWith(ConcurrentStreamConsumer.CONCURRENT_STREAM_THREAD_NAME);

public static final int INTERRUPT_THREAD_DELAY_MINUTES = 60;
public static final int EXIT_THREAD_DELAY_MINUTES = 70;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.airbyte.commons.stream.StreamStatusUtils;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.integrations.base.AirbyteTraceMessageUtility;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -17,7 +18,10 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor.AbortPolicy;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.slf4j.Logger;
Expand All @@ -42,14 +46,29 @@ public class ConcurrentStreamConsumer implements Consumer<AutoCloseableIterator<

private static final Logger LOGGER = LoggerFactory.getLogger(ConcurrentStreamConsumer.class);

/**
* Name of threads spawned by the {@link ConcurrentStreamConsumer}.
*/
public static final String CONCURRENT_STREAM_THREAD_NAME = "concurrent-stream-thread";

private final ExecutorService executorService;
private final List<Exception> exceptions;
private final Collection<CompletableFuture> futures;
private final Consumer<AutoCloseableIterator<AirbyteMessage>> streamConsumer;
private final Optional<Consumer<AirbyteStreamStatusHolder>> streamStatusEmitter = Optional.of(AirbyteTraceMessageUtility::emitStreamStatusTrace);

/**
* Constructs a new {@link ConcurrentStreamConsumer} that will use the provided stream consumer to execute each stream submitted to the
* {@link #accept(AutoCloseableIterator)} method of this consumer. Streams submitted to the {@link #accept(AutoCloseableIterator)} method
* will be converted to a {@link Runnable} and executed on an {@link ExecutorService} configured by this consumer to ensure concurrent
* execution of each stream.
*
* @param streamConsumer The {@link Consumer} that accepts streams as an {@link AutoCloseableIterator}.
* @param requestedParallelism The requested amount of parallelism that will be used as a hint to determine
* the appropriate number of threads to execute concurrently.
*/
public ConcurrentStreamConsumer(final Consumer<AutoCloseableIterator<AirbyteMessage>> streamConsumer, final Integer requestedParallelism) {
this.executorService = Executors.newFixedThreadPool(computeParallelism(requestedParallelism));
this.executorService = createExecutorService(requestedParallelism);
this.exceptions = new ArrayList<>();
this.futures = new ArrayList<>();
this.streamConsumer = streamConsumer;
Expand All @@ -63,7 +82,7 @@ public void accept(final AutoCloseableIterator<AirbyteMessage> stream) {
* passing them to the provided message consumer for further processing. Any exceptions raised
* within the thread will be captured and exposed to the caller.
*/
futures.add(CompletableFuture.runAsync(() -> executeStream(stream), executorService));
futures.add(CompletableFuture.runAsync(new ConcurrentStreamRunnable(stream, this), executorService));
}

/**
Expand Down Expand Up @@ -114,18 +133,32 @@ public void waitFor() throws ExecutionException, InterruptedException {
*/
private Integer computeParallelism(final Integer requestedParallelism) {
/*
* TODO What is the correct upper bound here? It should probably be provided by the code that
* creates this iterator to account for the different types of connectors and their limitations. For
* instance, a JDBC-based source connector should probably set the "MAX_THREADS" to the maximum
* number of connections in the database connection pool used by the source. For API-based source
* connectors, this should be the maximum number of concurrent connections from one host/IP address
* that the API supports.
* Selects the default thread pool size based on the provided value via an environment variable
* or the number of available processors if the environment variable is not set/present. This is to
* ensure that we do not over-parallelize unless requested explicitly.
*/
final Integer parallelism = Math.min(10, requestedParallelism > 0 ? requestedParallelism : 1);
LOGGER.debug("Computed concurrent stream parallelism: {}", parallelism);
final Integer defaultPoolSize = Optional.ofNullable(System.getenv("DEFAULT_CONCURRENT_STREAM_CONSUMER_THREADS"))
.map(Integer::parseInt)
.orElseGet(() -> Runtime.getRuntime().availableProcessors());
final Integer parallelism = Math.min(defaultPoolSize, requestedParallelism > 0 ? requestedParallelism : 1);
LOGGER.debug("Computed concurrent stream consumer parallelism: {}", parallelism);
return parallelism;
}

/**
* Creates the {@link ExecutorService} that will be used by the consumer to consume from the provided
* streams in parallel.
*
* @param requestedParallelism The requested amount of parallelism that will be used as a hint to determine
* the appropriate number of threads to execute concurrently.
* @return The configured {@link ExecutorService}.
*/
private ExecutorService createExecutorService(final Integer requestedParallelism) {
final Integer nThreads = computeParallelism(requestedParallelism);
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(),
new ConcurrentStreamThreadFactory(), new AbortPolicy());
}

/**
* Executes the stream by providing it to the configured {@link #streamConsumer}.
*
Expand All @@ -152,4 +185,39 @@ public void close() throws Exception {
executorService.awaitTermination(30, TimeUnit.SECONDS);
}

/**
* Custom {@link ThreadFactory} that names the threads used to concurrently execute streams.
*/
private static class ConcurrentStreamThreadFactory implements ThreadFactory {

@Override
public Thread newThread(final Runnable r) {
final Thread thread = new Thread(r);
if (r instanceof ConcurrentStreamRunnable) {
final AutoCloseableIterator<AirbyteMessage> stream = ((ConcurrentStreamRunnable)r).stream();
if (stream.getAirbyteStream().isPresent()) {
final AirbyteStreamNameNamespacePair airbyteStream = stream.getAirbyteStream().get();
thread.setName(String.format("%s-%s-%s", CONCURRENT_STREAM_THREAD_NAME, airbyteStream.getNamespace(), airbyteStream.getName()));
} else {
thread.setName(CONCURRENT_STREAM_THREAD_NAME);
}
} else {
thread.setName(CONCURRENT_STREAM_THREAD_NAME);
}
return thread;
}
}

/**
* Custom {@link Runnable} that exposes the stream for thread naming purposes.
*
* @param stream The stream that is part of the {@link Runnable} execution.
* @param consumer The {@link ConcurrentStreamConsumer} that will execute the stream.
*/
private record ConcurrentStreamRunnable(AutoCloseableIterator<AirbyteMessage> stream, ConcurrentStreamConsumer consumer) implements Runnable {
@Override
public void run() {
consumer.executeStream(stream);
}
}
}

0 comments on commit 0299145

Please sign in to comment.