Commit

some code clean up
m-trieu committed Apr 3, 2024
1 parent 2a62641 commit 87a043e
Showing 11 changed files with 371 additions and 272 deletions.
@@ -67,7 +67,7 @@ public DataflowWorkProgressUpdater(
super(worker, Integer.MAX_VALUE);
this.workItemStatusClient = workItemStatusClient;
this.workItem = workItem;
this.hotKeyLogger = new HotKeyLogger();
this.hotKeyLogger = HotKeyLogger.ofSystemClock();
this.options = options;
}

@@ -18,30 +18,38 @@
package org.apache.beam.runners.dataflow.worker;

import com.google.api.client.util.Clock;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.beam.runners.dataflow.util.TimeUtil;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
public class HotKeyLogger {
private final Logger LOG = LoggerFactory.getLogger(HotKeyLogger.class);

/** Clock used to either provide real system time or mocked to virtualize time for testing. */
private Clock clock = Clock.SYSTEM;

private final Clock clock;
/** Throttles logging the detection to every loggingPeriod */
private final Duration loggingPeriod = Duration.standardMinutes(5);
/**
* The previous time the HotKeyDetection was logged. This is used to throttle logging to every 5
* minutes.
*/
private long prevHotKeyDetectionLogMs = 0;

/** Throttles logging the detection to every loggingPeriod */
private final Duration loggingPeriod = Duration.standardMinutes(5);
private HotKeyLogger(Clock clock) {
this.clock = clock;
}

public HotKeyLogger() {}
public static HotKeyLogger ofSystemClock() {
return new HotKeyLogger(Clock.SYSTEM);
}

HotKeyLogger(Clock clock) {
this.clock = clock;
@VisibleForTesting
static HotKeyLogger forTesting(Clock testClock) {
return new HotKeyLogger(testClock);
}

/** Logs a detection of the hot key every 5 minutes. */
@@ -80,7 +88,8 @@ public void logHotKeyDetection(String userStepName, Duration hotKeyAge, Object h
* Returns true if the class should log the HotKeyMessage. This method throttles logging to every
* 5 minutes.
*/
protected boolean isThrottled() {
@VisibleForTesting
boolean isThrottled() {
// Throttle logging the HotKeyDetection to every 5 minutes.
long nowMs = clock.currentTimeMillis();
if (nowMs - prevHotKeyDetectionLogMs < loggingPeriod.getMillis()) {
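
Note: the HotKeyLogger change above replaces the public and test constructors with a private constructor plus two factories: ofSystemClock() for production and a @VisibleForTesting forTesting(Clock) for tests. A minimal, hypothetical sketch of how the injected clock might be exercised from a test in the same package (the class name, fake clock, step name, and durations below are illustrative assumptions, not part of this commit):

package org.apache.beam.runners.dataflow.worker;

import com.google.api.client.util.Clock;
import org.joda.time.Duration;

// Hypothetical test-style sketch; it lives in the same package because forTesting is package-private.
class HotKeyLoggerClockInjectionSketch {
  void hotKeyLoggingIsThrottledToFiveMinutes() {
    // Mutable time source; Clock declares a single currentTimeMillis() method,
    // so a lambda over this array serves as a fake clock.
    long[] fakeNowMs = {Duration.standardHours(1).getMillis()};
    Clock fakeClock = () -> fakeNowMs[0];

    // Factory added by this commit; production code calls ofSystemClock() instead.
    HotKeyLogger logger = HotKeyLogger.forTesting(fakeClock);

    // First detection: well past the (initially zero) last-logged timestamp, so it is logged.
    logger.logHotKeyDetection("MyStep", Duration.standardSeconds(30), null);

    // Two minutes later: still inside the 5-minute loggingPeriod, so this detection is throttled.
    fakeNowMs[0] += Duration.standardMinutes(2).getMillis();
    logger.logHotKeyDetection("MyStep", Duration.standardSeconds(30), null);

    // Six more minutes: the window has elapsed, so the detection is logged again.
    fakeNowMs[0] += Duration.standardMinutes(6).getMillis();
    logger.logHotKeyDetection("MyStep", Duration.standardSeconds(30), null);
  }
}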
@@ -445,7 +445,7 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
IntrinsicMapTaskExecutorFactory.defaultFactory(),
dataflowServiceClient,
options,
new HotKeyLogger(),
HotKeyLogger.ofSystemClock(),
clock,
workerStatusReporter,
failureTracker,
@@ -1277,7 +1277,7 @@ private void process(
// Add the output to the commit queue.
work.setState(State.COMMIT_QUEUED);
outputBuilder.addAllPerWorkItemLatencyAttributions(
work.getLatencyAttributions(false, work.getLatencyTrackingId(), sampler));
work.getLatencyAttributions(false, sampler));

WorkItemCommitRequest commitRequest = outputBuilder.build();
int byteLimit = maxWorkItemCommitBytes;
@@ -23,9 +23,12 @@
import org.apache.beam.runners.dataflow.worker.DataflowWorkExecutor;
import org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext;
import org.apache.beam.sdk.coders.Coder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@AutoValue
public abstract class ExecutionState {
private static final Logger LOG = LoggerFactory.getLogger(ExecutionState.class);

public abstract DataflowWorkExecutor workExecutor();

@@ -51,4 +54,13 @@ public abstract static class Builder {

public abstract ExecutionState build();
}

public final void close() {
try {
context().invalidateCache();
workExecutor().close();
} catch (Exception e) {
LOG.warn("Failed to close map task executor: ", e);
}
}
}
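
Note: the new ExecutionState.close() above centralizes best-effort cleanup: it invalidates the cached execution context and closes the work executor, logging rather than propagating any failure. A hypothetical caller-side sketch under that assumption (the helper name and Runnable parameter are illustrative, not from this commit):

// Because ExecutionState.close() catches and logs its own failures, a caller can
// invoke it unconditionally in a finally block without extra error handling.
static void processThenCleanUp(ExecutionState executionState, Runnable processWorkItem) {
  try {
    processWorkItem.run();   // run the work item using executionState.workExecutor()/context()
  } finally {
    executionState.close();  // invalidate the context cache and close the executor
  }
}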
@@ -37,6 +37,7 @@
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution.ActiveLatencyBreakdown;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution.ActiveLatencyBreakdown.ActiveElementMetadata;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution.ActiveLatencyBreakdown.Distribution;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem;
import org.apache.beam.runners.dataflow.worker.windmill.work.ProcessWorkItemClient;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
@@ -46,18 +47,19 @@
@NotThreadSafe
public class Work implements Runnable {
private final @Nullable ProcessWorkItemClient processWorkItemClient;
private final Windmill.WorkItem workItem;
private final WorkItem workItem;
private final Supplier<Instant> clock;
private final Instant startTime;
private final Map<Windmill.LatencyAttribution.State, Duration> totalDurationPerState;
private final Consumer<Work> processWorkFn;
private final WorkId id;
private final String latencyTrackingId;
private TimedState currentState;
private volatile boolean isFailed;

private Work(
@Nullable ProcessWorkItemClient processWorkItemClient,
Windmill.WorkItem workItem,
WorkItem workItem,
Supplier<Instant> clock,
Consumer<Work> processWorkFn) {
this.processWorkItemClient = processWorkItemClient;
@@ -73,10 +75,11 @@ private Work(
.setCacheToken(workItem.getCacheToken())
.setWorkToken(workItem.getWorkToken())
.build();
this.latencyTrackingId = buildLatencyTrackingId(workItem);
}

public static Work create(
Windmill.WorkItem workItem,
WorkItem workItem,
Supplier<Instant> clock,
Collection<Windmill.LatencyAttribution> getWorkStreamLatencies,
Consumer<Work> processWorkFn) {
@@ -100,12 +103,57 @@ public static Work create(
return work;
}

private static String buildLatencyTrackingId(WorkItem workItem) {
return Long.toHexString(workItem.getShardingKey())
+ '-'
+ Long.toHexString(workItem.getWorkToken());
}

private static LatencyAttribution.Builder addActiveLatencyBreakdownToBuilder(
boolean isHeartbeat,
LatencyAttribution.Builder builder,
String workId,
DataflowExecutionStateSampler sampler) {
if (isHeartbeat) {
ActiveLatencyBreakdown.Builder stepBuilder = ActiveLatencyBreakdown.newBuilder();
Optional<ActiveMessageMetadata> activeMessage =
sampler.getActiveMessageMetadataForWorkId(workId);
if (!activeMessage.isPresent()) {
return builder;
}
stepBuilder.setUserStepName(activeMessage.get().userStepName());
ActiveElementMetadata.Builder activeElementBuilder = ActiveElementMetadata.newBuilder();
activeElementBuilder.setProcessingTimeMillis(
System.currentTimeMillis() - activeMessage.get().startTime());
stepBuilder.setActiveMessageMetadata(activeElementBuilder);
builder.addActiveLatencyBreakdown(stepBuilder.build());
return builder;
}

Map<String, IntSummaryStatistics> processingDistributions =
sampler.getProcessingDistributionsForWorkId(workId);
for (Entry<String, IntSummaryStatistics> entry : processingDistributions.entrySet()) {
ActiveLatencyBreakdown.Builder stepBuilder = ActiveLatencyBreakdown.newBuilder();
stepBuilder.setUserStepName(entry.getKey());
Distribution.Builder distributionBuilder =
Distribution.newBuilder()
.setCount(entry.getValue().getCount())
.setMin(entry.getValue().getMin())
.setMax(entry.getValue().getMax())
.setMean((long) entry.getValue().getAverage())
.setSum(entry.getValue().getSum());
stepBuilder.setProcessingTimesDistribution(distributionBuilder.build());
builder.addActiveLatencyBreakdown(stepBuilder.build());
}
return builder;
}

@Override
public void run() {
processWorkFn.accept(this);
}

public Windmill.WorkItem getWorkItem() {
public WorkItem getWorkItem() {
return workItem;
}

@@ -143,11 +191,7 @@ public Instant getStateStartTime() {
}

public String getLatencyTrackingId() {
StringBuilder workIdBuilder = new StringBuilder(33);
workIdBuilder.append(Long.toHexString(workItem.getShardingKey()));
workIdBuilder.append('-');
workIdBuilder.append(Long.toHexString(workItem.getWorkToken()));
return workIdBuilder.toString();
return latencyTrackingId;
}

public WorkId id() {
@@ -163,7 +207,7 @@ private void recordGetWorkStreamLatencies(
}

public ImmutableList<LatencyAttribution> getLatencyAttributions(
boolean isHeartbeat, String workId, DataflowExecutionStateSampler sampler) {
boolean isHeartbeat, DataflowExecutionStateSampler sampler) {
List<Windmill.LatencyAttribution> list = new ArrayList<>();
for (Windmill.LatencyAttribution.State state : Windmill.LatencyAttribution.State.values()) {
Duration duration = totalDurationPerState.getOrDefault(state, Duration.ZERO);
@@ -175,7 +219,8 @@ public ImmutableList<LatencyAttribution> getLatencyAttributions(
}
LatencyAttribution.Builder laBuilder = Windmill.LatencyAttribution.newBuilder();
if (state == LatencyAttribution.State.ACTIVE) {
laBuilder = addActiveLatencyBreakdownToBuilder(isHeartbeat, laBuilder, workId, sampler);
laBuilder =
addActiveLatencyBreakdownToBuilder(isHeartbeat, laBuilder, latencyTrackingId, sampler);
}
Windmill.LatencyAttribution la =
laBuilder.setState(state).setTotalDurationMillis(duration.getMillis()).build();
@@ -184,45 +229,6 @@ public ImmutableList<LatencyAttribution> getLatencyAttributions(
return ImmutableList.copyOf(list);
}

private static LatencyAttribution.Builder addActiveLatencyBreakdownToBuilder(
boolean isHeartbeat,
LatencyAttribution.Builder builder,
String workId,
DataflowExecutionStateSampler sampler) {
if (isHeartbeat) {
ActiveLatencyBreakdown.Builder stepBuilder = ActiveLatencyBreakdown.newBuilder();
Optional<ActiveMessageMetadata> activeMessage =
sampler.getActiveMessageMetadataForWorkId(workId);
if (!activeMessage.isPresent()) {
return builder;
}
stepBuilder.setUserStepName(activeMessage.get().userStepName());
ActiveElementMetadata.Builder activeElementBuilder = ActiveElementMetadata.newBuilder();
activeElementBuilder.setProcessingTimeMillis(
System.currentTimeMillis() - activeMessage.get().startTime());
stepBuilder.setActiveMessageMetadata(activeElementBuilder);
builder.addActiveLatencyBreakdown(stepBuilder.build());
return builder;
}

Map<String, IntSummaryStatistics> processingDistributions =
sampler.getProcessingDistributionsForWorkId(workId);
for (Entry<String, IntSummaryStatistics> entry : processingDistributions.entrySet()) {
ActiveLatencyBreakdown.Builder stepBuilder = ActiveLatencyBreakdown.newBuilder();
stepBuilder.setUserStepName(entry.getKey());
Distribution.Builder distributionBuilder =
Distribution.newBuilder()
.setCount(entry.getValue().getCount())
.setMin(entry.getValue().getMin())
.setMax(entry.getValue().getMax())
.setMean((long) entry.getValue().getAverage())
.setSum(entry.getValue().getSum());
stepBuilder.setProcessingTimesDistribution(distributionBuilder.build());
builder.addActiveLatencyBreakdown(stepBuilder.build());
}
return builder;
}

public boolean isFailed() {
return isFailed;
}
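
Note: two related simplifications in Work above. The latency tracking id (hex sharding key, a dash, hex work token) is now computed once in the constructor, and getLatencyAttributions no longer takes a workId argument because it uses that precomputed id internally. A hedged sketch of the new call shape as seen from a caller (the helper name is illustrative; the call itself mirrors the commit-queue and heartbeat call sites changed in this commit):

// For shardingKey = 0xff and workToken = 0x1000 the precomputed tracking id is "ff-1000".
static ImmutableList<Windmill.LatencyAttribution> latenciesForCommit(
    Work work, DataflowExecutionStateSampler sampler) {
  // The explicit workId/latencyTrackingId parameter is gone; Work supplies its own id.
  return work.getLatencyAttributions(/* isHeartbeat= */ false, sampler);
}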
@@ -128,11 +128,40 @@ private static Stream<HeartbeatRequest> toHeartbeatRequestStream(
.setWorkToken(work.getWorkItem().getWorkToken())
.setCacheToken(work.getWorkItem().getCacheToken())
.addAllLatencyAttribution(
work.getLatencyAttributions(
/* isHeartbeat= */ true, work.getLatencyTrackingId(), sampler))
work.getLatencyAttributions(/* isHeartbeat= */ true, sampler))
.build());
}

private static Stream<DirectHeartbeatRequest> toHeartbeatRequestStreamDirectPath(
Entry<ShardedKey, Deque<Work>> shardedKeyAndWorkQueue,
Instant refreshDeadline,
DataflowExecutionStateSampler sampler) {
ShardedKey shardedKey = shardedKeyAndWorkQueue.getKey();
Deque<Work> workQueue = shardedKeyAndWorkQueue.getValue();

return workQueue.stream()
.filter(work -> work.getStartTime().isBefore(refreshDeadline))
.peek(
work -> {
if (work.getProcessWorkItemClient().getDataStream().isClosed()) {
work.setFailed();
}
})
// Don't send heartbeats for queued work we already know is failed.
.filter(work -> !work.isFailed())
.map(
work ->
DirectHeartbeatRequest.create(
work.getProcessWorkItemClient().getDataStream(),
Windmill.HeartbeatRequest.newBuilder()
.setShardingKey(shardedKey.shardingKey())
.setWorkToken(work.getWorkItem().getWorkToken())
.setCacheToken(work.getWorkItem().getCacheToken())
.addAllLatencyAttribution(
work.getLatencyAttributions(/* isHeartbeat= */ true, sampler))
.build()));
}

/**
* Activates {@link Work} for the {@link ShardedKey}. Outcome can be 1 of 4 {@link
* ActivateWorkResult}
@@ -339,37 +368,6 @@ synchronized ImmutableList<DirectHeartbeatRequest> getKeyHeartbeatsDirectPath(
.collect(toImmutableList());
}

private static Stream<DirectHeartbeatRequest> toHeartbeatRequestStreamDirectPath(
Entry<ShardedKey, Deque<Work>> shardedKeyAndWorkQueue,
Instant refreshDeadline,
DataflowExecutionStateSampler sampler) {
ShardedKey shardedKey = shardedKeyAndWorkQueue.getKey();
Deque<Work> workQueue = shardedKeyAndWorkQueue.getValue();

return workQueue.stream()
.filter(work -> work.getStartTime().isBefore(refreshDeadline))
.peek(
work -> {
if (work.getProcessWorkItemClient().getDataStream().isClosed()) {
work.setFailed();
}
})
// Don't send heartbeats for queued work we already know is failed.
.filter(work -> !work.isFailed())
.map(
work ->
DirectHeartbeatRequest.create(
work.getProcessWorkItemClient().getDataStream(),
Windmill.HeartbeatRequest.newBuilder()
.setShardingKey(shardedKey.shardingKey())
.setWorkToken(work.getWorkItem().getWorkToken())
.setCacheToken(work.getWorkItem().getCacheToken())
.addAllLatencyAttribution(
work.getLatencyAttributions(
/* isHeartbeat= */ true, work.getLatencyTrackingId(), sampler))
.build()));
}

/**
* Returns the current aggregate {@link GetWorkBudget} that is active on the user worker. Active
* means that the work is received from Windmill, being processed or queued to be processed in