Skip to content

Commit

Permalink
Automated Commit - Format and Process Resources Changes
Browse files Browse the repository at this point in the history
  • Loading branch information
jdpgrailsdev authored and octavia-squidington-iii committed Jun 13, 2023
1 parent 0299145 commit 385ea1e
Showing 1 changed file with 20 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,16 @@ public class ConcurrentStreamConsumer implements Consumer<AutoCloseableIterator<
private final Optional<Consumer<AirbyteStreamStatusHolder>> streamStatusEmitter = Optional.of(AirbyteTraceMessageUtility::emitStreamStatusTrace);

/**
* Constructs a new {@link ConcurrentStreamConsumer} that will use the provided stream consumer to execute each stream submitted to the
* {@link #accept(AutoCloseableIterator)} method of this consumer. Streams submitted to the {@link #accept(AutoCloseableIterator)} method
* will be converted to a {@link Runnable} and executed on an {@link ExecutorService} configured by this consumer to ensure concurrent
* execution of each stream.
* Constructs a new {@link ConcurrentStreamConsumer} that will use the provided stream consumer to
* execute each stream submitted to the {@link #accept(AutoCloseableIterator)} method of this
* consumer. Streams submitted to the {@link #accept(AutoCloseableIterator)} method will be
* converted to a {@link Runnable} and executed on an {@link ExecutorService} configured by this
* consumer to ensure concurrent execution of each stream.
*
* @param streamConsumer The {@link Consumer} that accepts streams as an {@link AutoCloseableIterator}.
* @param requestedParallelism The requested amount of parallelism that will be used as a hint to determine
* the appropriate number of threads to execute concurrently.
* @param streamConsumer The {@link Consumer} that accepts streams as an
* {@link AutoCloseableIterator}.
* @param requestedParallelism The requested amount of parallelism that will be used as a hint to
* determine the appropriate number of threads to execute concurrently.
*/
public ConcurrentStreamConsumer(final Consumer<AutoCloseableIterator<AirbyteMessage>> streamConsumer, final Integer requestedParallelism) {
this.executorService = createExecutorService(requestedParallelism);
Expand Down Expand Up @@ -133,8 +135,8 @@ public void waitFor() throws ExecutionException, InterruptedException {
*/
private Integer computeParallelism(final Integer requestedParallelism) {
/*
* Selects the default thread pool size based on the provided value via an environment variable
* or the number of available processors if the environment variable is not set/present. This is to
* Selects the default thread pool size based on the provided value via an environment variable or
* the number of available processors if the environment variable is not set/present. This is to
* ensure that we do not over-parallelize unless requested explicitly.
*/
final Integer defaultPoolSize = Optional.ofNullable(System.getenv("DEFAULT_CONCURRENT_STREAM_CONSUMER_THREADS"))
Expand All @@ -146,11 +148,11 @@ private Integer computeParallelism(final Integer requestedParallelism) {
}

/**
* Creates the {@link ExecutorService} that will be used by the consumer to consume from the provided
* streams in parallel.
* Creates the {@link ExecutorService} that will be used by the consumer to consume from the
* provided streams in parallel.
*
* @param requestedParallelism The requested amount of parallelism that will be used as a hint to determine
* the appropriate number of threads to execute concurrently.
* @param requestedParallelism The requested amount of parallelism that will be used as a hint to
* determine the appropriate number of threads to execute concurrently.
* @return The configured {@link ExecutorService}.
*/
private ExecutorService createExecutorService(final Integer requestedParallelism) {
Expand Down Expand Up @@ -194,7 +196,7 @@ private static class ConcurrentStreamThreadFactory implements ThreadFactory {
public Thread newThread(final Runnable r) {
final Thread thread = new Thread(r);
if (r instanceof ConcurrentStreamRunnable) {
final AutoCloseableIterator<AirbyteMessage> stream = ((ConcurrentStreamRunnable)r).stream();
final AutoCloseableIterator<AirbyteMessage> stream = ((ConcurrentStreamRunnable) r).stream();
if (stream.getAirbyteStream().isPresent()) {
final AirbyteStreamNameNamespacePair airbyteStream = stream.getAirbyteStream().get();
thread.setName(String.format("%s-%s-%s", CONCURRENT_STREAM_THREAD_NAME, airbyteStream.getNamespace(), airbyteStream.getName()));
Expand All @@ -206,6 +208,7 @@ public Thread newThread(final Runnable r) {
}
return thread;
}

}

/**
Expand All @@ -215,9 +218,12 @@ public Thread newThread(final Runnable r) {
* @param consumer The {@link ConcurrentStreamConsumer} that will execute the stream.
*/
private record ConcurrentStreamRunnable(AutoCloseableIterator<AirbyteMessage> stream, ConcurrentStreamConsumer consumer) implements Runnable {

@Override
public void run() {
consumer.executeStream(stream);
}

}

}

0 comments on commit 385ea1e

Please sign in to comment.