Skip to content

Commit

Permalink
Merge pull request #8971 from dustin12/ThrottlePiping
Browse files Browse the repository at this point in the history
[BEAM-7666] Throttle piping
  • Loading branch information
angoenka committed Jul 15, 2019
2 parents b60aef1 + beb62c1 commit 8001ad2
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ public enum StreamingSystemCounterNames {
JAVA_HARNESS_USED_MEMORY("dataflow_java_harness_used_memory"),
JAVA_HARNESS_MAX_MEMORY("dataflow_java_harness_max_memory"),
JAVA_HARNESS_RESTARTS("dataflow_java_harness_restarts"),
WINDMILL_QUOTA_THROTTLING("dataflow_streaming_engine_throttled_msecs");
WINDMILL_QUOTA_THROTTLING("dataflow_streaming_engine_throttled_msecs"),
MEMORY_THRASHING("dataflow_streaming_engine_user_worker_thrashing");

private final String name;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,7 @@ public int getSize() {
private final Counter<Long, Long> javaHarnessUsedMemory;
private final Counter<Long, Long> javaHarnessMaxMemory;
private final Counter<Integer, Integer> windmillMaxObservedWorkItemCommitBytes;
private final Counter<Integer, Integer> memoryThrashing;
private Timer refreshActiveWorkTimer;
private Timer statusPageTimer;

Expand Down Expand Up @@ -593,6 +594,9 @@ public static StreamingDataflowWorker fromDataflowWorkerHarnessOptions(
this.windmillMaxObservedWorkItemCommitBytes =
pendingCumulativeCounters.intMax(
StreamingSystemCounterNames.WINDMILL_MAX_WORK_ITEM_COMMIT_BYTES.counterName());
this.memoryThrashing =
pendingCumulativeCounters.intSum(
StreamingSystemCounterNames.MEMORY_THRASHING.counterName());
this.isDoneFuture = new CompletableFuture<>();

this.threadFactory =
Expand Down Expand Up @@ -1851,6 +1855,9 @@ private void sendWorkerUpdatesToDataflowService(

// Throttle time is tracked by the windmillServer but is reported to DFE here.
windmillQuotaThrottling.addValue(windmillServer.getAndResetThrottleTime());
if (memoryMonitor.isThrashing()) {
memoryThrashing.addValue(1);
}

List<CounterUpdate> counterUpdates = new ArrayList<>(128);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,10 @@ public void stop() {
}
}

public boolean isThrashing() {
return isThrashing.get();
}

/**
* Check if we've observed high gc workload in sufficient sample periods to justify classifying
* the server as in gc thrashing.
Expand Down

0 comments on commit 8001ad2

Please sign in to comment.