add tests, remove some unneeded classes
m-trieu committed Apr 16, 2024
1 parent a2edf60 commit af4f202
Showing 11 changed files with 380 additions and 316 deletions.
@@ -20,14 +20,13 @@
import java.io.PrintWriter;
import java.util.ArrayDeque;
import java.util.Collection;
-import java.util.Collections;
import java.util.Deque;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import javax.annotation.Nullable;
@@ -84,7 +83,7 @@ private ActiveWorkState(
}

static ActiveWorkState create(WindmillStateCache.ForComputation computationStateCache) {
-return new ActiveWorkState(new HashMap<>(), computationStateCache);
+return new ActiveWorkState(new ConcurrentHashMap<>(), computationStateCache);
}

@VisibleForTesting
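
One note on the create() hunk above, with an assumed motivation the commit message does not spell out: a plain HashMap is only safe when every access holds the same lock, while a ConcurrentHashMap tolerates concurrent readers and writers, so read-mostly callers need not contend on the ActiveWorkState monitor. A minimal sketch of that property; ConcurrentMapSketch and its members are hypothetical, not Beam code:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in illustrating why a ConcurrentHashMap suits a map that
// is mutated under synchronized methods but also read for snapshots.
public final class ConcurrentMapSketch {
  private final Map<String, Integer> active = new ConcurrentHashMap<>();

  public void record(String key) {
    // merge() is atomic per key on a ConcurrentHashMap; no external lock needed.
    active.merge(key, 1, Integer::sum);
  }

  public int snapshotSize() {
    // Safe while writers run: reads are weakly consistent and never throw
    // ConcurrentModificationException.
    return active.size();
  }
}
```
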
@@ -292,8 +291,8 @@ private synchronized ImmutableMap<ShardedKey, WorkId> getStuckCommitsAt(
return stuckCommits.build();
}

-synchronized Map<ShardedKey, Deque<Work>> getReadOnlyActiveWork() {
-return Collections.unmodifiableMap(activeWork);
+synchronized ImmutableMap<ShardedKey, Deque<Work>> getReadOnlyActiveWork() {
+return ImmutableMap.copyOf(activeWork);
}

/**
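
The getReadOnlyActiveWork() change above is more than a type tightening: Collections.unmodifiableMap returns a live view that silently reflects later mutations of activeWork, while ImmutableMap.copyOf hands callers a point-in-time snapshot. A self-contained sketch of the distinction (plain Guava ImmutableMap here; Beam itself uses a vendored Guava package):

```java
import com.google.common.collect.ImmutableMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public final class ViewVsSnapshot {
  public static void main(String[] args) {
    Map<String, Integer> live = new HashMap<>();
    live.put("key1", 1);

    Map<String, Integer> view = Collections.unmodifiableMap(live);
    Map<String, Integer> snapshot = ImmutableMap.copyOf(live);

    live.put("key2", 2); // mutate after handing out the "read-only" map

    System.out.println(view.size());     // 2 -- the view tracks the live map
    System.out.println(snapshot.size()); // 1 -- the copy is frozen at copy time
  }
}
```
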
@@ -152,7 +152,7 @@ private void forceExecute(Work work) {
executor.forceExecute(work, work.getWorkItem().getSerializedSize());
}

-public Map<ShardedKey, Deque<Work>> currentActiveWorkReadOnly() {
+public ImmutableMap<ShardedKey, Deque<Work>> currentActiveWorkReadOnly() {
return activeWorkState.getReadOnlyActiveWork();
}

@@ -50,8 +50,7 @@
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingApplianceConfigLoader;
import org.apache.beam.runners.dataflow.worker.streaming.processing.ExecutionStateFactory;
import org.apache.beam.runners.dataflow.worker.streaming.processing.StreamingCommitFinalizer;
-import org.apache.beam.runners.dataflow.worker.streaming.processing.StreamingWorkExecutor;
-import org.apache.beam.runners.dataflow.worker.streaming.processing.StreamingWorkItemScheduler;
+import org.apache.beam.runners.dataflow.worker.streaming.processing.StreamingWorkScheduler;
import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher;
import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor;
@@ -64,6 +63,7 @@
import org.apache.beam.runners.dataflow.worker.windmill.client.commits.StreamingApplianceWorkCommitter;
import org.apache.beam.runners.dataflow.worker.windmill.client.commits.WorkCommitter;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
+import org.apache.beam.runners.dataflow.worker.windmill.work.WorkProcessingContext;
import org.apache.beam.runners.dataflow.worker.windmill.work.processing.failures.FailureTracker;
import org.apache.beam.runners.dataflow.worker.windmill.work.processing.failures.StreamingApplianceFailureTracker;
import org.apache.beam.runners.dataflow.worker.windmill.work.processing.failures.WorkFailureProcessor;
@@ -98,7 +98,7 @@ public final class ApplianceWorkerHarness implements StreamingWorkerHarness {
private final MetricTrackingWindmillServerStub getDataClient;
private final MemoryMonitor memoryMonitor;
private final ComputationStateCache computationCache;
-private final StreamingWorkItemScheduler workItemScheduler;
+private final StreamingWorkScheduler streamingWorkScheduler;
private final WorkCommitter workCommitter;
private final StreamingWorkerStatusReporter workerStatusReporter;
private final StreamingStatusPages statusPages;
@@ -116,7 +116,7 @@ private ApplianceWorkerHarness(
MetricTrackingWindmillServerStub getDataClient,
MemoryMonitor memoryMonitor,
ComputationStateCache computationCache,
-StreamingWorkItemScheduler workItemScheduler,
+StreamingWorkScheduler streamingWorkScheduler,
WorkCommitter workCommitter,
StreamingWorkerStatusReporter workerStatusReporter,
StreamingStatusPages statusPages,
@@ -132,7 +132,7 @@ private ApplianceWorkerHarness(
this.getDataClient = getDataClient;
this.memoryMonitor = memoryMonitor;
this.computationCache = computationCache;
-this.workItemScheduler = workItemScheduler;
+this.streamingWorkScheduler = streamingWorkScheduler;
this.workCommitter = workCommitter;
this.workerStatusReporter = workerStatusReporter;
this.statusPages = statusPages;
@@ -194,9 +194,10 @@ public static ApplianceWorkerHarness fromOptions(DataflowWorkerHarnessOptions options) {
streamingCounters,
memoryMonitor,
workExecutor);
-StreamingWorkExecutor streamingWorkExecutor =
-new StreamingWorkExecutor(
+StreamingWorkScheduler streamingWorkScheduler =
+new StreamingWorkScheduler(
options,
+clock,
new ExecutionStateFactory(
options,
IntrinsicMapTaskExecutorFactory.defaultFactory(),
@@ -220,8 +221,6 @@ public static ApplianceWorkerHarness fromOptions(DataflowWorkerHarnessOptions options) {
stageInfo,
sampler,
maxWorkItemCommitBytes);
-StreamingWorkItemScheduler workScheduler =
-new StreamingWorkItemScheduler(clock, streamingWorkExecutor);
AtomicBoolean isRunning = new AtomicBoolean(false);
StreamingStatusPages statusPages =
StreamingWorkerStatusPages.forAppliance(
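
Read together, the two hunks above capture the core of this file's refactor: StreamingWorkExecutor plus its StreamingWorkItemScheduler wrapper are replaced by the single merged StreamingWorkScheduler, and the clock the wrapper used to own becomes a direct constructor argument. Schematically (constructor argument lists elided, so this fragment is not compilable as-is):

```java
// Before this commit: two collaborating objects, clock held by the wrapper.
StreamingWorkExecutor streamingWorkExecutor =
    new StreamingWorkExecutor(options /* , execution dependencies elided */);
StreamingWorkItemScheduler workScheduler =
    new StreamingWorkItemScheduler(clock, streamingWorkExecutor);

// After: one merged object; the clock moves into the constructor directly.
StreamingWorkScheduler streamingWorkScheduler =
    new StreamingWorkScheduler(options, clock /* , execution dependencies elided */);
```
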
@@ -251,7 +250,7 @@ public static ApplianceWorkerHarness fromOptions(
getDataClient,
memoryMonitor,
computationStateCache,
-workScheduler,
+streamingWorkScheduler,
applianceWorkCommitter,
workerStatusReporter,
statusPages,
@@ -346,9 +345,10 @@ public static ApplianceWorkerHarness forTesting(
MetricTrackingWindmillServerStub.builder(windmillServer, memoryMonitor)
.setUseStreamingRequests(false)
.build();
-StreamingWorkExecutor streamingWorkExecutor =
-new StreamingWorkExecutor(
+StreamingWorkScheduler streamingWorkScheduler =
+new StreamingWorkScheduler(
options,
+clock,
executionStateFactory,
new SideInputStateFetcher(getDataClient::getSideInputData, options),
failureTracker,
@@ -376,7 +376,7 @@ public static ApplianceWorkerHarness forTesting(
getDataClient,
memoryMonitor,
computationStateCache,
-new StreamingWorkItemScheduler(clock, streamingWorkExecutor),
+streamingWorkScheduler,
applianceWorkCommitter,
workerStatusReporter,
testStatusPages,
@@ -532,14 +532,21 @@ private void scheduleWork(
WindmillTimeUtils.windmillToHarnessWatermark(
computationProto.getDependentRealtimeInputWatermark());
for (Windmill.WorkItem workItem : computationProto.getWorkList()) {
-workItemScheduler.scheduleWork(
-computationState,
-inputDataWatermark,
-synchronizedProcessingTime,
-workItem,
-workCommitter::commit,
-getDataClient::getStateData,
-ImmutableList.of());
+WorkProcessingContext workProcessingContext =
+WorkProcessingContext.builder(
+computationState.getComputationId(),
+(id, request) ->
+Optional.ofNullable(
+getDataClient.getStateData(computationProto.getComputationId(), request)))
+.setInputDataWatermark(Preconditions.checkNotNull(inputDataWatermark))
+.setSynchronizedProcessingTime(synchronizedProcessingTime)
+.setWorkItem(workItem)
+.setWorkCommitter(workCommitter::commit)
+.setOutputDataWatermark(
+WindmillTimeUtils.windmillToHarnessWatermark(workItem.getOutputDataWatermark()))
+.build();
+streamingWorkScheduler.scheduleWork(
+computationState, workProcessingContext, ImmutableList.of());
}
}

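
Note how scheduleWork above no longer receives getDataClient::getStateData directly: the call site wraps it so the context sees Optional.empty() instead of a null response. A runnable miniature of that adapter shape; the String-based getStateData here is a hypothetical stand-in for the Windmill client call:

```java
import java.util.Optional;
import java.util.function.BiFunction;

public final class NullableToOptional {
  // Stand-in for a client call that may return null when no state exists.
  static String getStateData(String computationId, String request) {
    return request.isEmpty() ? null : "state-for-" + request;
  }

  public static void main(String[] args) {
    // Same shape as the (id, request) -> Optional.ofNullable(...) lambda in the
    // diff: absence becomes Optional.empty() rather than a null to check for.
    BiFunction<String, String, Optional<String>> getDataFn =
        (id, request) -> Optional.ofNullable(getStateData(id, request));

    System.out.println(getDataFn.apply("c1", "key")); // Optional[state-for-key]
    System.out.println(getDataFn.apply("c1", ""));    // Optional.empty
  }
}
```
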
@@ -44,6 +44,7 @@
import org.apache.beam.runners.dataflow.worker.MetricTrackingWindmillServerStub;
import org.apache.beam.runners.dataflow.worker.ReaderCache;
import org.apache.beam.runners.dataflow.worker.WindmillComputationKey;
+import org.apache.beam.runners.dataflow.worker.WindmillTimeUtils;
import org.apache.beam.runners.dataflow.worker.WorkUnitClient;
import org.apache.beam.runners.dataflow.worker.status.DebugCapture;
import org.apache.beam.runners.dataflow.worker.status.WorkerStatusPages;
@@ -57,8 +58,7 @@
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingEnginePipelineConfig;
import org.apache.beam.runners.dataflow.worker.streaming.processing.ExecutionStateFactory;
import org.apache.beam.runners.dataflow.worker.streaming.processing.StreamingCommitFinalizer;
-import org.apache.beam.runners.dataflow.worker.streaming.processing.StreamingWorkExecutor;
-import org.apache.beam.runners.dataflow.worker.streaming.processing.StreamingWorkItemScheduler;
+import org.apache.beam.runners.dataflow.worker.streaming.processing.StreamingWorkScheduler;
import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher;
import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor;
@@ -78,6 +78,7 @@
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.IsolationChannel;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactory;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
+import org.apache.beam.runners.dataflow.worker.windmill.work.WorkProcessingContext;
import org.apache.beam.runners.dataflow.worker.windmill.work.processing.failures.FailureTracker;
import org.apache.beam.runners.dataflow.worker.windmill.work.processing.failures.StreamingEngineFailureTracker;
import org.apache.beam.runners.dataflow.worker.windmill.work.processing.failures.WorkFailureProcessor;
@@ -116,7 +117,7 @@ public final class DispatchedStreamingEngineWorkerHarness implements StreamingWorkerHarness {
private final WindmillServerStub windmillServer;
private final WorkCommitter workCommitter;
private final MetricTrackingWindmillServerStub getDataClient;
-private final StreamingWorkItemScheduler workItemScheduler;
+private final StreamingWorkScheduler streamingWorkScheduler;
private final MemoryMonitor memoryMonitor;
private final StreamingWorkerStatusReporter workerStatusReporter;
private final StreamingStatusPages statusPages;
@@ -133,7 +134,7 @@ private DispatchedStreamingEngineWorkerHarness(
WindmillServerStub windmillServer,
WorkCommitter workCommitter,
MetricTrackingWindmillServerStub getDataClient,
-StreamingWorkItemScheduler workItemScheduler,
+StreamingWorkScheduler streamingWorkScheduler,
MemoryMonitor memoryMonitor,
StreamingWorkerStatusReporter workerStatusReporter,
StreamingStatusPages statusPages,
@@ -148,7 +149,7 @@ private DispatchedStreamingEngineWorkerHarness(
this.windmillServer = windmillServer;
this.workCommitter = workCommitter;
this.getDataClient = getDataClient;
-this.workItemScheduler = workItemScheduler;
+this.streamingWorkScheduler = streamingWorkScheduler;
this.memoryMonitor = memoryMonitor;
this.workerStatusReporter = workerStatusReporter;
this.statusPages = statusPages;
@@ -243,9 +244,10 @@ public static DispatchedStreamingEngineWorkerHarness fromOptions(
StreamingWorkerEnvironment.mapTaskToBaseNetworkFnInstance(),
stateNameMap,
StreamingWorkerEnvironment.getMaxSinkBytes(options));
-StreamingWorkExecutor streamingWorkExecutor =
-new StreamingWorkExecutor(
+StreamingWorkScheduler streamingWorkScheduler =
+new StreamingWorkScheduler(
options,
+clock,
executionStateFactory,
new SideInputStateFetcher(getDataClient::getSideInputData, options),
failureTracker,
@@ -300,7 +302,7 @@ public static DispatchedStreamingEngineWorkerHarness fromOptions(
windmillServer,
workCommitter,
getDataClient,
-new StreamingWorkItemScheduler(clock, streamingWorkExecutor),
+streamingWorkScheduler,
memoryMonitor,
workerStatusReporter,
statusPages,
@@ -411,9 +413,10 @@ public static DispatchedStreamingEngineWorkerHarness forTesting(
StreamingWorkerEnvironment.mapTaskToBaseNetworkFnInstance(),
stateNameMap,
StreamingWorkerEnvironment.getMaxSinkBytes(options));
-StreamingWorkExecutor streamingWorkExecutor =
-new StreamingWorkExecutor(
+StreamingWorkScheduler streamingWorkScheduler =
+new StreamingWorkScheduler(
options,
+clock,
executionStateFactory,
new SideInputStateFetcher(getDataClient::getSideInputData, options),
failureTracker,
@@ -443,7 +446,7 @@ public static DispatchedStreamingEngineWorkerHarness forTesting(
windmillServer,
workCommitter,
getDataClient,
-new StreamingWorkItemScheduler(clock, streamingWorkExecutor),
+streamingWorkScheduler,
memoryMonitor,
workerStatusReporter,
testStatusPages,
@@ -591,14 +594,22 @@ void streamingDispatchLoop() {
Optional<ComputationState> computationState =
computationStateCache.getComputationState(computationId);
if (computationState.isPresent()) {
-workItemScheduler.scheduleWork(
-computationState.get(),
-Preconditions.checkNotNull(inputDataWatermark),
-synchronizedProcessingTime,
-workItem,
-workCommitter::commit,
-getDataClient::getStateData,
-getWorkStreamLatencies);
+WorkProcessingContext workProcessingContext =
+WorkProcessingContext.builder(
+computationId,
+(id, request) ->
+Optional.ofNullable(
+getDataClient.getStateData(computationId, request)))
+.setInputDataWatermark(Preconditions.checkNotNull(inputDataWatermark))
+.setSynchronizedProcessingTime(synchronizedProcessingTime)
+.setWorkItem(workItem)
+.setWorkCommitter(workCommitter::commit)
+.setOutputDataWatermark(
+WindmillTimeUtils.windmillToHarnessWatermark(
+workItem.getOutputDataWatermark()))
+.build();
+streamingWorkScheduler.scheduleWork(
+computationState.get(), workProcessingContext, getWorkStreamLatencies);
} else {
LOG.warn(
"Computation computationId={} is unknown. Known computations:{}",
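
Across both harnesses, the per-work-item parameters (watermarks, the work item, the commit callback, the state-fetch function) are now packaged into a WorkProcessingContext built through a builder, shrinking scheduleWork from seven arguments to three. A hedged miniature of that value-object-plus-builder shape; WorkContextSketch is hypothetical and carries far fewer fields than Beam's real WorkProcessingContext:

```java
import java.util.Optional;

final class WorkContextSketch {
  private final String computationId;
  private final long inputDataWatermark;
  private final Optional<Long> outputDataWatermark;

  private WorkContextSketch(String computationId, long in, Optional<Long> out) {
    this.computationId = computationId;
    this.inputDataWatermark = in;
    this.outputDataWatermark = out;
  }

  static Builder builder(String computationId) {
    return new Builder(computationId);
  }

  static final class Builder {
    private final String computationId;
    private long inputDataWatermark;
    private Optional<Long> outputDataWatermark = Optional.empty();

    private Builder(String computationId) {
      this.computationId = computationId;
    }

    Builder setInputDataWatermark(long watermark) {
      this.inputDataWatermark = watermark;
      return this;
    }

    Builder setOutputDataWatermark(Long watermark) {
      // Nullable watermark becomes an explicit Optional, mirroring the
      // Optional.ofNullable handling in the hunks above.
      this.outputDataWatermark = Optional.ofNullable(watermark);
      return this;
    }

    WorkContextSketch build() {
      return new WorkContextSketch(computationId, inputDataWatermark, outputDataWatermark);
    }
  }

  @Override
  public String toString() {
    return computationId + " in=" + inputDataWatermark + " out=" + outputDataWatermark;
  }

  public static void main(String[] args) {
    System.out.println(
        WorkContextSketch.builder("computation-1")
            .setInputDataWatermark(42L)
            .setOutputDataWatermark(null) // becomes Optional.empty()
            .build());
  }
}
```
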
