Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add direct path code path #30764

Closed
wants to merge 11 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,18 @@ public class MetricsLogger extends MetricsContainerImpl {
AtomicLong lastReportedMillis = new AtomicLong(System.currentTimeMillis());
@Nullable MetricsContainerImpl lastMetricsSnapshot = null;

public MetricsLogger(@Nullable String stepName) {
private MetricsLogger(@Nullable String stepName) {
super(stepName);
}

public static MetricsLogger createUnboundedMetricsLogger() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Unbounded" is easy to confuse with bounded/unbounded PCollections.
How about workerMetricsLogger, if these metrics are not scoped to a step?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

return new MetricsLogger(null);
}

/** Creates a {@link MetricsLogger} constructed with the given {@code stepName}. */
public static MetricsLogger forStep(String stepName) {
final MetricsLogger stepLogger = new MetricsLogger(stepName);
return stepLogger;
}

public String generateLogMessage(
String header, Set<String> allowedMetricUrns, long lastReported) {
MetricsContainerImpl nextMetricsSnapshot = new MetricsContainerImpl(this.stepName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public void testGeneratedLogMessageShowsDeltas() {
MonitoringInfoConstants.Urns.ELEMENT_COUNT,
Collections.singletonMap("name", "histogram"));

MetricsLogger logger = new MetricsLogger(null);
MetricsLogger logger = MetricsLogger.createUnboundedMetricsLogger();
logger.getCounter(cName).inc(2L);
// Set bucket counts to: [0,1,1,0,0,...]
logger.getHistogram(hName, bucketType).update(1);
Expand Down
1 change: 1 addition & 0 deletions runners/google-cloud-dataflow-java/worker/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -257,3 +257,4 @@ checkstyleMain.enabled = false
checkstyleTest.enabled = false
//TODO(https://github.com/apache/beam/issues/19119): javadoc task should be enabled in the future.
javadoc.enabled = false
test.outputs.upToDateWhen {false}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove?

Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public abstract class DataflowExecutionStateRegistry {
public DataflowOperationContext.DataflowExecutionState getState(
final NameContext nameContext,
final String stateName,
final MetricsContainer container,
final @Nullable MetricsContainer container,
final ProfileScope profileScope) {
return getStateInternal(nameContext, stateName, null, null, container, profileScope);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public DataflowWorkProgressUpdater(
super(worker, Integer.MAX_VALUE);
this.workItemStatusClient = workItemStatusClient;
this.workItem = workItem;
this.hotKeyLogger = new HotKeyLogger();
this.hotKeyLogger = HotKeyLogger.ofSystemClock();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can cleanups be moved to separate PRs? That means less churn if things are reverted, and each change is easier to review and to summarize in its commit description.

this.options = options;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.beam.runners.dataflow.util.PropertyNames;
import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingMDC;
import org.apache.beam.runners.dataflow.worker.util.common.worker.WorkProgressUpdater;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.extensions.gcp.util.Transport;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
Expand All @@ -65,7 +66,8 @@
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
class DataflowWorkUnitClient implements WorkUnitClient {
@Internal
public class DataflowWorkUnitClient implements WorkUnitClient {
scwhittle marked this conversation as resolved.
Show resolved Hide resolved
private final Logger logger;

/**
Expand All @@ -87,7 +89,7 @@ class DataflowWorkUnitClient implements WorkUnitClient {
*
* @param options The pipeline options.
*/
DataflowWorkUnitClient(DataflowWorkerHarnessOptions options, Logger logger) {
public DataflowWorkUnitClient(DataflowWorkerHarnessOptions options, Logger logger) {
this.dataflow = options.getDataflowClient();
this.options = options;
this.logger = logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,41 @@
package org.apache.beam.runners.dataflow.worker;

import com.google.api.client.util.Clock;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.beam.runners.dataflow.util.TimeUtil;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
scwhittle marked this conversation as resolved.
Show resolved Hide resolved
@Internal
public class HotKeyLogger {
private final Logger LOG = LoggerFactory.getLogger(HotKeyLogger.class);

/** Clock that either provides real system time or is mocked to virtualize time for testing. */
private Clock clock = Clock.SYSTEM;

private final Clock clock;
/** Throttles logging the detection to every loggingPeriod */
private final Duration loggingPeriod = Duration.standardMinutes(5);
/**
* The previous time the HotKeyDetection was logged. This is used to throttle logging to every 5
* minutes.
*/
private long prevHotKeyDetectionLogMs = 0;

/** Throttles logging the detection to every loggingPeriod */
private final Duration loggingPeriod = Duration.standardMinutes(5);
/** Creates a logger that stores the given {@code clock} as its time source. */
private HotKeyLogger(Clock clock) {
this.clock = clock;
}

HotKeyLogger() {}
public static HotKeyLogger ofSystemClock() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just name create?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

return new HotKeyLogger(Clock.SYSTEM);
}

HotKeyLogger(Clock clock) {
this.clock = clock;
@VisibleForTesting
static HotKeyLogger forTesting(Clock testClock) {
return new HotKeyLogger(testClock);
}

/** Logs a detection of the hot key every 5 minutes. */
Expand All @@ -62,7 +73,7 @@ public void logHotKeyDetection(String userStepName, Duration hotKeyAge) {
}

/** Logs a detection of the hot key every 5 minutes with the given key. */
public void logHotKeyDetection(String userStepName, Duration hotKeyAge, Object hotkey) {
public void logHotKeyDetection(String userStepName, Duration hotKeyAge, @Nullable Object hotkey) {
if (isThrottled()) {
return;
}
Expand All @@ -80,7 +91,8 @@ public void logHotKeyDetection(String userStepName, Duration hotKeyAge, Object h
* Returns true if logging of the HotKeyMessage should be skipped (throttled). This method
* throttles logging to every 5 minutes.
*/
protected boolean isThrottled() {
@VisibleForTesting
boolean isThrottled() {
// Throttle logging the HotKeyDetection to every 5 minutes.
long nowMs = clock.currentTimeMillis();
if (nowMs - prevHotKeyDetectionLogMs < loggingPeriod.getMillis()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,15 @@ public abstract static class Builder {

abstract Builder setServer(WindmillServerStub server);

abstract Builder setGcThrashingMonitor(MemoryMonitor gcThrashingMonitor);
public abstract Builder setGcThrashingMonitor(MemoryMonitor gcThrashingMonitor);

abstract Builder setUseStreamingRequests(boolean useStreamingRequests);
public abstract Builder setUseStreamingRequests(boolean useStreamingRequests);

abstract Builder setUseSeparateHeartbeatStreams(boolean useSeparateHeartbeatStreams);
public abstract Builder setUseSeparateHeartbeatStreams(boolean useSeparateHeartbeatStreams);

abstract Builder setNumGetDataStreams(int numGetDataStreams);
public abstract Builder setNumGetDataStreams(int numGetDataStreams);

abstract MetricTrackingWindmillServerStub build();
public abstract MetricTrackingWindmillServerStub build();
}

public static Builder builder(WindmillServerStub server, MemoryMonitor gcThrashingMonitor) {
Expand Down Expand Up @@ -254,6 +254,20 @@ public Windmill.KeyedGetDataResponse getStateData(
}
}

/**
* Requests keyed state data over the supplied {@code stream}.
*
* <p>{@code activeStateReads} is incremented before the request is issued and decremented in a
* {@code finally} block, so the counter is restored whether the request returns or throws.
*
* @param stream stream on which {@code requestKeyedData} is invoked
* @param computation passed through unchanged to {@code requestKeyedData}
* @param request passed through unchanged to {@code requestKeyedData}
* @return the result of {@code stream.requestKeyedData(computation, request)}
* @throws RuntimeException wrapping any {@link Exception} thrown while requesting the data
*/
public Windmill.KeyedGetDataResponse getStateData(
GetDataStream stream, String computation, Windmill.KeyedGetDataRequest request) {
// NOTE(review): presumably waits until the memory monitor reports enough resources — confirm.
gcThrashingMonitor.waitForResources("GetStateData");
activeStateReads.getAndIncrement();

try {
return stream.requestKeyedData(computation, request);
} catch (Exception e) {
// Every failure, checked or unchecked, is rethrown wrapped in a RuntimeException.
throw new RuntimeException(e);
} finally {
// Always undo the increment above, even when the request throws.
activeStateReads.getAndDecrement();
}
}

public Windmill.GlobalData getSideInputData(Windmill.GlobalDataRequest request) {
gcThrashingMonitor.waitForResources("GetSideInputData");
activeSideInputs.getAndIncrement();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
Expand All @@ -39,7 +40,8 @@
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
class ReaderCache {
@Internal
public class ReaderCache {
scwhittle marked this conversation as resolved.
Show resolved Hide resolved

private static final Logger LOG = LoggerFactory.getLogger(ReaderCache.class);
private final Executor invalidationExecutor;
Expand Down Expand Up @@ -67,7 +69,7 @@ private static class CacheEntry {
private final Cache<WindmillComputationKey, CacheEntry> cache;

/** Cache reader for {@code cacheDuration}. Readers will be closed on {@code executor}. */
ReaderCache(Duration cacheDuration, Executor invalidationExecutor) {
public ReaderCache(Duration cacheDuration, Executor invalidationExecutor) {
this.invalidationExecutor = invalidationExecutor;
this.cache =
CacheBuilder.newBuilder()
Expand Down Expand Up @@ -137,7 +139,7 @@ void cacheReader(
}

/** If a reader is cached for this key, remove and close it. */
void invalidateReader(WindmillComputationKey computationKey) {
public void invalidateReader(WindmillComputationKey computationKey) {
// use an invalid cache token that will trigger close.
acquireReader(computationKey, -1L, -1);
}
Expand Down