Skip to content

Commit

Permalink
integrate direct path code path in streaming dataflow worker
Browse files Browse the repository at this point in the history
  • Loading branch information
m-trieu committed Apr 4, 2024
1 parent c44b454 commit ec12c72
Show file tree
Hide file tree
Showing 63 changed files with 3,726 additions and 202 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public DataflowWorkProgressUpdater(
super(worker, Integer.MAX_VALUE);
this.workItemStatusClient = workItemStatusClient;
this.workItem = workItem;
this.hotKeyLogger = new HotKeyLogger();
this.hotKeyLogger = HotKeyLogger.ofSystemClock();
this.options = options;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
class DataflowWorkUnitClient implements WorkUnitClient {
public class DataflowWorkUnitClient implements WorkUnitClient {
private final Logger logger;

/**
Expand All @@ -86,7 +86,7 @@ class DataflowWorkUnitClient implements WorkUnitClient {
*
* @param options The pipeline options.
*/
DataflowWorkUnitClient(DataflowWorkerHarnessOptions options, Logger logger) {
public DataflowWorkUnitClient(DataflowWorkerHarnessOptions options, Logger logger) {
this.dataflow = options.getDataflowClient();
this.options = options;
this.logger = logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,38 @@
package org.apache.beam.runners.dataflow.worker;

import com.google.api.client.util.Clock;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.beam.runners.dataflow.util.TimeUtil;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
public class HotKeyLogger {
private final Logger LOG = LoggerFactory.getLogger(HotKeyLogger.class);

/** Clock used to either provide real system time or mocked to virtualize time for testing. */
private Clock clock = Clock.SYSTEM;

private final Clock clock;
/** Throttles logging the detection to every loggingPeriod */
private final Duration loggingPeriod = Duration.standardMinutes(5);
/**
* The previous time the HotKeyDetection was logged. This is used to throttle logging to every 5
* minutes.
*/
private long prevHotKeyDetectionLogMs = 0;

/** Throttles logging the detection to every loggingPeriod */
private final Duration loggingPeriod = Duration.standardMinutes(5);
private HotKeyLogger(Clock clock) {
this.clock = clock;
}

HotKeyLogger() {}
public static HotKeyLogger ofSystemClock() {
return new HotKeyLogger(Clock.SYSTEM);
}

HotKeyLogger(Clock clock) {
this.clock = clock;
@VisibleForTesting
static HotKeyLogger forTesting(Clock testClock) {
return new HotKeyLogger(testClock);
}

/** Logs a detection of the hot key every 5 minutes. */
Expand Down Expand Up @@ -80,7 +88,8 @@ public void logHotKeyDetection(String userStepName, Duration hotKeyAge, Object h
* Returns true if the class should log the HotKeyMessage. This method throttles logging to every
* 5 minutes.
*/
protected boolean isThrottled() {
@VisibleForTesting
boolean isThrottled() {
// Throttle logging the HotKeyDetection to every 5 minutes.
long nowMs = clock.currentTimeMillis();
if (nowMs - prevHotKeyDetectionLogMs < loggingPeriod.getMillis()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,15 @@ public abstract static class Builder {

abstract Builder setServer(WindmillServerStub server);

abstract Builder setGcThrashingMonitor(MemoryMonitor gcThrashingMonitor);
public abstract Builder setGcThrashingMonitor(MemoryMonitor gcThrashingMonitor);

abstract Builder setUseStreamingRequests(boolean useStreamingRequests);
public abstract Builder setUseStreamingRequests(boolean useStreamingRequests);

abstract Builder setUseSeparateHeartbeatStreams(boolean useSeparateHeartbeatStreams);
public abstract Builder setUseSeparateHeartbeatStreams(boolean useSeparateHeartbeatStreams);

abstract Builder setNumGetDataStreams(int numGetDataStreams);
public abstract Builder setNumGetDataStreams(int numGetDataStreams);

abstract MetricTrackingWindmillServerStub build();
public abstract MetricTrackingWindmillServerStub build();
}

public static Builder builder(WindmillServerStub server, MemoryMonitor gcThrashingMonitor) {
Expand Down Expand Up @@ -254,6 +254,20 @@ public Windmill.KeyedGetDataResponse getStateData(
}
}

/**
 * Fetches keyed state for {@code computation} by issuing the request on the caller-supplied
 * direct {@link GetDataStream}, blocking first until the memory monitor reports that resources
 * are available.
 *
 * @param stream direct stream on which the keyed-data request is issued
 * @param computation name of the computation the state belongs to
 * @param request the keyed get-data request to send
 * @return the keyed get-data response from the stream
 * @throws RuntimeException wrapping any failure from the underlying stream request
 */
public Windmill.KeyedGetDataResponse getStateData(
    GetDataStream stream, String computation, Windmill.KeyedGetDataRequest request) {
  gcThrashingMonitor.waitForResources("GetStateData");
  activeStateReads.getAndIncrement();
  try {
    return stream.requestKeyedData(computation, request);
  } catch (Exception e) {
    // Preserve the cause and add context so failures can be attributed to a computation.
    throw new RuntimeException("Failed to fetch state data for computation: " + computation, e);
  } finally {
    // Always decrement, even on failure, so the active-read counter stays accurate.
    activeStateReads.getAndDecrement();
  }
}

public Windmill.GlobalData getSideInputData(Windmill.GlobalDataRequest request) {
gcThrashingMonitor.waitForResources("GetSideInputData");
activeSideInputs.getAndIncrement();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
class ReaderCache {
public class ReaderCache {

private static final Logger LOG = LoggerFactory.getLogger(ReaderCache.class);
private final Executor invalidationExecutor;
Expand Down Expand Up @@ -67,7 +67,7 @@ private static class CacheEntry {
private final Cache<WindmillComputationKey, CacheEntry> cache;

/** Cache reader for {@code cacheDuration}. Readers will be closed on {@code executor}. */
ReaderCache(Duration cacheDuration, Executor invalidationExecutor) {
public ReaderCache(Duration cacheDuration, Executor invalidationExecutor) {
this.invalidationExecutor = invalidationExecutor;
this.cache =
CacheBuilder.newBuilder()
Expand Down Expand Up @@ -137,7 +137,7 @@ void cacheReader(
}

/** If a reader is cached for this key, remove and close it. */
void invalidateReader(WindmillComputationKey computationKey) {
public void invalidateReader(WindmillComputationKey computationKey) {
// use an invalid cache token that will trigger close.
acquireReader(computationKey, -1L, -1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,17 @@
import org.apache.beam.runners.dataflow.worker.status.LastExceptionDataProvider;
import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider;
import org.apache.beam.runners.dataflow.worker.status.WorkerStatusPages;
import org.apache.beam.runners.dataflow.worker.streaming.ComputationState;
import org.apache.beam.runners.dataflow.worker.streaming.ExecutionState;
import org.apache.beam.runners.dataflow.worker.streaming.KeyCommitTooLargeException;
import org.apache.beam.runners.dataflow.worker.streaming.ShardedKey;
import org.apache.beam.runners.dataflow.worker.streaming.StageInfo;
import org.apache.beam.runners.dataflow.worker.streaming.Work;
import org.apache.beam.runners.dataflow.worker.streaming.Work.State;
import org.apache.beam.runners.dataflow.worker.streaming.WorkHeartbeatResponseProcessor;
import org.apache.beam.runners.dataflow.worker.streaming.computations.ComputationState;
import org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingCounters;
import org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingEngineDirectPathWorkerHarness;
import org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingWorkerHarness;
import org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingWorkerStatusReporter;
import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher;
import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
Expand Down Expand Up @@ -153,7 +155,7 @@
"rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class StreamingDataflowWorker {
public class StreamingDataflowWorker implements StreamingWorkerHarness {

// TODO(https://github.com/apache/beam/issues/19632): Update throttling counters to use generic
// throttling-msecs metric.
Expand Down Expand Up @@ -443,7 +445,7 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
IntrinsicMapTaskExecutorFactory.defaultFactory(),
dataflowServiceClient,
options,
new HotKeyLogger(),
HotKeyLogger.ofSystemClock(),
clock,
workerStatusReporter,
failureTracker,
Expand Down Expand Up @@ -599,8 +601,10 @@ public static void main(String[] args) throws Exception {
StreamingDataflowWorker.class.getSimpleName());

LOG.debug("Creating StreamingDataflowWorker from options: {}", options);
StreamingDataflowWorker worker = StreamingDataflowWorker.fromOptions(options);

StreamingWorkerHarness worker =
isDirectPathPipeline(options)
? StreamingEngineDirectPathWorkerHarness.fromOptions(options)
: StreamingDataflowWorker.fromOptions(options);
// Use the MetricsLogger container which is used by BigQueryIO to periodically log process-wide
// metrics.
MetricsEnvironment.setProcessWideContainer(new MetricsLogger(null));
Expand All @@ -615,6 +619,12 @@ public static void main(String[] args) throws Exception {
worker.start();
}

/**
 * Returns whether this pipeline should run the direct-path StreamingEngine harness: Streaming
 * Engine must be enabled, the Windmill direct-path flag must be set, and the
 * {@code enable_private_ipv6_google_access} service option must be present.
 */
private static boolean isDirectPathPipeline(DataflowWorkerHarnessOptions options) {
  // Guard against a null service-option list so an unset option reads as "not direct path"
  // instead of throwing a NullPointerException.
  // NOTE(review): assumes getDataflowServiceOptions() may return null when unset — confirm
  // against DataflowWorkerHarnessOptions defaults.
  return options.isEnableStreamingEngine()
      && options.getIsWindmillServiceDirectPathEnabled()
      && options.getDataflowServiceOptions() != null
      && options.getDataflowServiceOptions().contains("enable_private_ipv6_google_access");
}

private static WindmillServerStub createWindmillServerStub(
DataflowWorkerHarnessOptions options,
GrpcWindmillStreamFactory windmillStreamFactory,
Expand Down Expand Up @@ -709,6 +719,7 @@ int numCommitThreads() {
}

@SuppressWarnings("FutureReturnValueIgnored")
@Override
public void start() {
running.set(true);

Expand Down Expand Up @@ -764,13 +775,15 @@ public void start() {
activeWorkRefresher.start();
}

@Override
public void startStatusPages() {
if (debugCaptureManager != null) {
debugCaptureManager.start();
}

if (windmillServiceEnabled) {
ChannelzServlet channelzServlet = new ChannelzServlet(CHANNELZ_PATH, options, windmillServer);
ChannelzServlet channelzServlet =
new ChannelzServlet(CHANNELZ_PATH, options, windmillServer::getWindmillServiceEndpoints);
statusPages.addServlet(channelzServlet);
statusPages.addCapturePage(channelzServlet);
}
Expand All @@ -788,6 +801,7 @@ public void startStatusPages() {
statusPages.start();
}

@Override
public void stop() {
try {
for (ScheduledExecutorService timer : scheduledExecutors) {
Expand Down Expand Up @@ -1067,14 +1081,14 @@ private void process(
stageInfoMap.computeIfAbsent(
mapTask.getStageName(), s -> StageInfo.create(s, mapTask.getSystemName()));

ExecutionState executionState = null;
@Nullable ExecutionState executionState = null;
String counterName = "dataflow_source_bytes_processed-" + mapTask.getSystemName();

try {
if (work.isFailed()) {
throw new WorkItemCancelledException(workItem.getShardingKey());
}
executionState = computationState.getExecutionStateQueue().poll();
executionState = computationState.getExecutionState().orElse(null);
if (executionState == null) {
MutableNetwork<Node, Edge> mapTaskNetwork = mapTaskToNetwork.apply(mapTask);
if (LOG.isDebugEnabled()) {
Expand Down Expand Up @@ -1257,13 +1271,13 @@ private void process(
commitCallbacks.putAll(executionState.context().flushState());

// Release the execution state for another thread to use.
computationState.getExecutionStateQueue().offer(executionState);
computationState.releaseExecutionState(executionState);
executionState = null;

// Add the output to the commit queue.
work.setState(State.COMMIT_QUEUED);
outputBuilder.addAllPerWorkItemLatencyAttributions(
work.getLatencyAttributions(false, work.getLatencyTrackingId(), sampler));
work.getLatencyAttributions(false, sampler));

WorkItemCommitRequest commitRequest = outputBuilder.build();
int byteLimit = maxWorkItemCommitBytes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.dataflow.worker.status;

import com.google.auto.value.AutoValue;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.io.PrintWriter;
Expand All @@ -25,33 +26,42 @@
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.beam.runners.dataflow.worker.status.DebugCapture.Capturable;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;

/**
* General servlet for providing a bunch of information on the statusz page.
*
* <p><b>Not actually serializable</b>. Its superclass is serializable but this subclass is not.
*/
@SuppressFBWarnings("SE_BAD_FIELD") // not serializable
public class StatuszServlet extends BaseStatusServlet implements Capturable {
public final class StatuszServlet extends BaseStatusServlet implements Capturable {

private static class DataProviderInfo {
private final String longName;
private final StatusDataProvider dataProvider;

public DataProviderInfo(String longName, StatusDataProvider dataProvider) {
this.longName = longName;
this.dataProvider = dataProvider;
/**
 * Immutable pairing of a display heading with the {@link StatusDataProvider} that renders the
 * corresponding section of the statusz page.
 */
@AutoValue
abstract static class DataProviderInfo {
  // Static factory delegating to the AutoValue-generated implementation.
  private static DataProviderInfo create(String longName, StatusDataProvider dataProvider) {
    return new AutoValue_StatuszServlet_DataProviderInfo(longName, dataProvider);
  }

  /** Heading text displayed above this provider's summary. */
  abstract String longName();

  /** Provider that writes the HTML summary for this section. */
  abstract StatusDataProvider dataProvider();
}

private LinkedHashMap<String, DataProviderInfo> dataProviders = new LinkedHashMap<>();
private final LinkedHashMap<String, DataProviderInfo> dataProviders = new LinkedHashMap<>();

/** Creates the servlet, registering it under the {@code statusz} path. */
public StatuszServlet() {
  super("statusz");
}

/**
 * Registers a {@link StatusDataProvider} to be rendered as a section of the statusz page.
 *
 * @param shortName unique key for the provider; registering the same key twice is an error
 * @param longName heading text shown above the provider's section
 * @param provider supplies the HTML summary for this section
 * @throws IllegalStateException if {@code shortName} is already registered
 */
public void addDataProvider(String shortName, String longName, StatusDataProvider provider) {
  // Guava Preconditions use printf-style %s placeholders; the previous SLF4J-style {}
  // markers were never substituted into the failure message.
  Preconditions.checkState(
      !dataProviders.containsKey(shortName),
      "key=%s is already mapped to dataProvider=%s. Choose a different key for new StatusProvider [name=%s, dataProvider=%s].",
      shortName,
      dataProviders.get(shortName),
      longName,
      provider);
  dataProviders.put(shortName, DataProviderInfo.create(longName, provider));
}

@Override
Expand All @@ -75,10 +85,10 @@ public void captureData(PrintWriter writer) {

for (DataProviderInfo info : dataProviders.values()) {
writer.print("<h2>");
writer.print(info.longName);
writer.print(info.longName());
writer.println("</h2>");

info.dataProvider.appendSummaryHtml(writer);
info.dataProvider().appendSummaryHtml(writer);
}
writer.println("</html>");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,12 @@
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class WorkerStatusPages {

public final class WorkerStatusPages {
private static final Logger LOG = LoggerFactory.getLogger(WorkerStatusPages.class);

private final Server statusServer;
private final List<Capturable> capturePages;
private final StatuszServlet statuszServlet = new StatuszServlet();
private final ThreadzServlet threadzServlet = new ThreadzServlet();
private final ServletHandler servletHandler = new ServletHandler();

@VisibleForTesting
Expand All @@ -56,6 +54,7 @@ public class WorkerStatusPages {
this.statusServer.setHandler(servletHandler);

// Install the default servlets (threadz, healthz, heapz, jfrz, statusz)
ThreadzServlet threadzServlet = new ThreadzServlet();
addServlet(threadzServlet);
addServlet(new HealthzServlet(healthyIndicator));
addServlet(new HeapzServlet(memoryMonitor));
Expand Down

0 comments on commit ec12c72

Please sign in to comment.