Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -57,7 +59,16 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
*/
private static final long REQUIRED_DYNAMIC_SPLIT_ORIGINAL_SIZE = 0;
private final EvaluationContext evaluationContext;
@VisibleForTesting final ExecutorService executor = Executors.newCachedThreadPool();

// TODO: (BEAM-723) Create a shared ExecutorService for maintenance tasks in the DirectRunner.
@VisibleForTesting
final ExecutorService executor =
Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setThreadFactory(MoreExecutors.platformThreadFactory())
.setDaemon(true)
.setNameFormat("direct-dynamic-split-requester")
.build());

private final long minimumDynamicSplitSize;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.Map;
import java.util.Map.Entry;
Expand Down Expand Up @@ -51,7 +53,13 @@
class DirectMetrics extends MetricResults {

// TODO: (BEAM-723) Create a shared ExecutorService for maintenance tasks in the DirectRunner.
private static final ExecutorService COUNTER_COMMITTER = Executors.newCachedThreadPool();
private static final ExecutorService COUNTER_COMMITTER =
Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setThreadFactory(MoreExecutors.platformThreadFactory())
.setDaemon(true)
.setNameFormat("direct-metrics-counter-committer")
.build());

private interface MetricAggregation<UpdateT, ResultT> {
UpdateT zero();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,7 @@ private EvaluationContext(
this.applicationStateInternals = new ConcurrentHashMap<>();
this.metrics = new DirectMetrics();

this.callbackExecutor =
WatermarkCallbackExecutor.create(MoreExecutors.directExecutor());
this.callbackExecutor = WatermarkCallbackExecutor.create(MoreExecutors.directExecutor());
}

public void initialize(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -132,7 +134,15 @@ private ExecutorServiceParallelExecutor(
Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>> transformEnforcements,
EvaluationContext context) {
this.targetParallelism = targetParallelism;
this.executorService = Executors.newFixedThreadPool(targetParallelism);
// Don't use Daemon threads for workers. The Pipeline should continue to execute even if there
// are no other active threads (for example, because waitUntilFinish was not called)
this.executorService =
Executors.newFixedThreadPool(
targetParallelism,
new ThreadFactoryBuilder()
.setThreadFactory(MoreExecutors.platformThreadFactory())
.setNameFormat("direct-runner-worker")
.build());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a comment why this one is not a daemon?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

this.graph = graph;
this.rootProviderRegistry = rootProviderRegistry;
this.registry = registry;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.beam.runners.direct;

import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -171,7 +173,14 @@ public <AdditionalOutputT> void outputWindowedValue(
outputWindowedValue,
evaluationContext.createSideInputReader(transform.getSideInputs()),
// TODO: For better performance, use a higher-level executor?
Executors.newSingleThreadScheduledExecutor(Executors.defaultThreadFactory()),
// TODO: (BEAM-723) Create a shared ExecutorService for maintenance tasks in the
// DirectRunner.
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
.setThreadFactory(MoreExecutors.platformThreadFactory())
.setDaemon(true)
.setNameFormat("direct-splittable-process-element-checkpoint-executor")
.build()),
10000,
Duration.standardSeconds(10)));

Expand Down