Skip to content
Permalink
Browse files
feat: dynamic flow control p3: add FlowControllerEventStats (#1332)
* feat: dynamic flow control p3: add FlowControllerEventStats

* add some more tests

* add more documentation

* fix the formatting

* fix the failed test

* Update comments

* updates based on review

* missed one review

* remove the flaky test

* deflake test

* add some comments

Co-authored-by: Igor Bernstein <igorbernstein@google.com>
  • Loading branch information
mutianf and igorbernstein2 committed Apr 5, 2021
1 parent 51c40ab commit 5329ea43c024ca14cea1012c5ab46e694e199492
@@ -293,9 +293,8 @@ public void close() throws InterruptedException {
}
}

/** Package-private for use in testing. */
@VisibleForTesting
FlowController getFlowController() {
@InternalApi("For google-cloud-java client use only")
public FlowController getFlowController() {
return flowController;
}

@@ -0,0 +1,144 @@
/*
* Copyright 2021 Google LLC
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google LLC nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.batching;

import static com.google.api.gax.batching.FlowController.LimitExceededBehavior;

import com.google.api.core.InternalApi;
import com.google.api.gax.batching.FlowController.FlowControlException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;

/**
* Record the statistics of flow control events.
*
* <p>This class is populated by FlowController, which will record throttling events. Currently it
* only keeps the last flow control event, but it could be expanded to record more information in
* the future. The events can be used to dynamically adjust concurrency in the client. For example:
*
* <pre>{@code
* // Increase flow control limits if there was throttling in the past 5 minutes and throttled time
* // was longer than 1 minute.
* while(true) {
* FlowControlEvent event = flowControlEventStats.getLastFlowControlEvent();
* if (event != null
* && event.getTimestampMs() > System.currentMillis() - TimeUnit.MINUTES.toMillis(5)
* && event.getThrottledTimeInMs() > TimeUnit.MINUTES.toMillis(1)) {
* flowController.increaseThresholds(elementSteps, byteSteps);
* }
* Thread.sleep(TimeUnit.MINUTE.toMillis(10));
* }
* }</pre>
*/
@InternalApi("For google-cloud-java client use only")
public class FlowControlEventStats {

private volatile FlowControlEvent lastFlowControlEvent;

// We only need the last event to check if there was throttling in the past X minutes so this
// doesn't need to be super accurate.
void recordFlowControlEvent(FlowControlEvent event) {
if (lastFlowControlEvent == null || event.compareTo(lastFlowControlEvent) > 0) {
lastFlowControlEvent = event;
}
}

public FlowControlEvent getLastFlowControlEvent() {
return lastFlowControlEvent;
}

/**
* A flow control event. Record throttled time if {@link LimitExceededBehavior} is {@link
* LimitExceededBehavior#Block}, or the exception if the behavior is {@link
* LimitExceededBehavior#ThrowException}.
*/
public static class FlowControlEvent implements Comparable<FlowControlEvent> {
static FlowControlEvent createReserveDelayed(long throttledTimeInMs) {
return createReserveDelayed(System.currentTimeMillis(), throttledTimeInMs);
}

static FlowControlEvent createReserveDenied(FlowControlException exception) {
return createReserveDenied(System.currentTimeMillis(), exception);
}

/** Package-private for use in testing. */
@VisibleForTesting
static FlowControlEvent createReserveDelayed(long timestampMs, long throttledTimeInMs) {
Preconditions.checkArgument(timestampMs > 0, "timestamp must be greater than 0");
Preconditions.checkArgument(throttledTimeInMs > 0, "throttled time must be greater than 0");
return new FlowControlEvent(timestampMs, throttledTimeInMs, null);
}

/** Package-private for use in testing. */
@VisibleForTesting
static FlowControlEvent createReserveDenied(long timestampMs, FlowControlException exception) {
Preconditions.checkArgument(timestampMs > 0, "timestamp must be greater than 0");
Preconditions.checkNotNull(
exception, "FlowControlException can't be null when reserve is denied");
return new FlowControlEvent(timestampMs, null, exception);
}

private long timestampMs;
private Long throttledTimeMs;
private FlowControlException exception;

private FlowControlEvent(
long timestampMs,
@Nullable Long throttledTimeMs,
@Nullable FlowControlException exception) {
this.timestampMs = timestampMs;
this.throttledTimeMs = throttledTimeMs;
this.exception = exception;
}

public long getTimestampMs() {
return timestampMs;
}

@Nullable
public FlowControlException getException() {
return exception;
}

@Nullable
public Long getThrottledTime(TimeUnit timeUnit) {
return throttledTimeMs == null
? null
: timeUnit.convert(throttledTimeMs, TimeUnit.MILLISECONDS);
}

@Override
public int compareTo(FlowControlEvent otherEvent) {
return Long.compare(this.getTimestampMs(), otherEvent.getTimestampMs());
}
}
}
@@ -31,7 +31,10 @@

import com.google.api.core.BetaApi;
import com.google.api.core.InternalApi;
import com.google.api.gax.batching.FlowControlEventStats.FlowControlEvent;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;

/** Provides flow control capability. */
@@ -149,6 +152,11 @@ public enum LimitExceededBehavior {
private final LimitExceededBehavior limitExceededBehavior;
private final Object updateLimitLock;

// Threshold to record throttling events. If reserve() takes longer than this threshold, it will
// be recorded as a throttling event.
private static final long RESERVE_FLOW_CONTROL_THRESHOLD_MS = 1;
private final FlowControlEventStats flowControlEventStats;

public FlowController(FlowControlSettings settings) {
// When the FlowController is initialized with FlowControlSettings, flow control limits can't be
// adjusted. min, current, max element count and request bytes are initialized with the max
@@ -160,6 +168,7 @@ public FlowController(FlowControlSettings settings) {
public FlowController(DynamicFlowControlSettings settings) {
this.limitExceededBehavior = settings.getLimitExceededBehavior();
this.updateLimitLock = new Object();
this.flowControlEventStats = new FlowControlEventStats();
switch (settings.getLimitExceededBehavior()) {
case ThrowException:
case Block:
@@ -204,10 +213,15 @@ public void reserve(long elements, long bytes) throws FlowControlException {
Preconditions.checkArgument(elements >= 0);
Preconditions.checkArgument(bytes >= 0);

Stopwatch stopwatch = Stopwatch.createStarted();
if (outstandingElementCount != null) {
if (!outstandingElementCount.acquire(elements)) {
throw new MaxOutstandingElementCountReachedException(
outstandingElementCount.getPermitLimit());
MaxOutstandingElementCountReachedException exception =
new MaxOutstandingElementCountReachedException(
outstandingElementCount.getPermitLimit());
flowControlEventStats.recordFlowControlEvent(
FlowControlEvent.createReserveDenied(exception));
throw exception;
}
}

@@ -218,9 +232,17 @@ public void reserve(long elements, long bytes) throws FlowControlException {
if (outstandingElementCount != null) {
outstandingElementCount.release(elements);
}
throw new MaxOutstandingRequestBytesReachedException(outstandingByteCount.getPermitLimit());
MaxOutstandingRequestBytesReachedException exception =
new MaxOutstandingRequestBytesReachedException(outstandingByteCount.getPermitLimit());
flowControlEventStats.recordFlowControlEvent(
FlowControlEvent.createReserveDenied(exception));
throw exception;
}
}
long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
if (elapsed >= RESERVE_FLOW_CONTROL_THRESHOLD_MS) {
flowControlEventStats.recordFlowControlEvent(FlowControlEvent.createReserveDelayed(elapsed));
}
}

public void release(long elements, long bytes) {
@@ -333,4 +355,9 @@ public Long getCurrentElementCountLimit() {
public Long getCurrentRequestBytesLimit() {
return outstandingByteCount == null ? null : outstandingByteCount.getPermitLimit();
}

@InternalApi("For google-cloud-java client use only")
public FlowControlEventStats getFlowControlEventStats() {
return flowControlEventStats;
}
}
@@ -0,0 +1,101 @@
/*
* Copyright 2021 Google LLC
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google LLC nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.batching;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;

import com.google.api.gax.batching.FlowControlEventStats.FlowControlEvent;
import com.google.api.gax.batching.FlowController.MaxOutstandingRequestBytesReachedException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class FlowControlEventStatsTest {

@Test
public void testCreateEvent() {
long timestamp = 12345, throttledTimeMs = 5000;
FlowControlEvent event = FlowControlEvent.createReserveDelayed(timestamp, throttledTimeMs);
assertEquals(event.getTimestampMs(), event.getTimestampMs());
assertEquals(throttledTimeMs / 1000, event.getThrottledTime(TimeUnit.SECONDS).longValue());
assertNull(event.getException());

MaxOutstandingRequestBytesReachedException exception =
new MaxOutstandingRequestBytesReachedException(100);
event = FlowControlEvent.createReserveDenied(timestamp, exception);
assertEquals(timestamp, event.getTimestampMs());
assertNotNull(event.getException());
assertEquals(exception, event.getException());
assertNull(event.getThrottledTime(TimeUnit.MILLISECONDS));

try {
event = FlowControlEvent.createReserveDenied(null);
fail("FlowControlEvent did not throw exception");
} catch (NullPointerException e) {
// expected, ignore
}
}

@Test
public void testGetLastEvent() throws InterruptedException {
final FlowControlEventStats stats = new FlowControlEventStats();
final long currentTime = System.currentTimeMillis();

List<Thread> threads = new ArrayList<>();
for (int i = 1; i <= 10; i++) {
final int timeElapsed = i;
Thread t =
new Thread() {
@Override
public void run() {
stats.recordFlowControlEvent(
FlowControlEvent.createReserveDelayed(currentTime + timeElapsed, timeElapsed));
}
};
threads.add(t);
t.start();
}

for (Thread t : threads) {
t.join(10);
}

assertEquals(currentTime + 10, stats.getLastFlowControlEvent().getTimestampMs());
assertEquals(
10, stats.getLastFlowControlEvent().getThrottledTime(TimeUnit.MILLISECONDS).longValue());
}
}

0 comments on commit 5329ea4

Please sign in to comment.