Skip to content
Permalink
Browse files
feat: dynamic flow control for batcher part 2 (#1310)
* feat: dynamic flow control for batcher part 2

* add test on construtor for code coverage

* Update comments based on the review

* missed some error messages in test

* Fix possible deadlock

* fix comment

* update comments

* update naming

* removing setThreshold because there's no use case for it

* no need to use 2 locks on semaphore

* fix comment

* fix test name

* make nonblocking semaphore not blocking

* move limit to semaphore class

* add an addPermits method

* fixing naming, don't throw on overflow and add some tests
  • Loading branch information
mutianf committed Mar 22, 2021
1 parent 7f7aa25 commit 20f6ecf4807bb4dffd0fa80717302c1f46b1d789
@@ -141,16 +141,16 @@ public BatcherImpl(
// to avoid deadlocking
if (flowController.getLimitExceededBehavior() != LimitExceededBehavior.Ignore) {
Preconditions.checkArgument(
flowController.getMaxOutstandingElementCount() == null
flowController.getMaxElementCountLimit() == null
|| batchingSettings.getElementCountThreshold() == null
|| flowController.getMaxOutstandingElementCount()
|| flowController.getMaxElementCountLimit()
>= batchingSettings.getElementCountThreshold(),
"If throttling and batching on element count are enabled, FlowController"
+ "#maxOutstandingElementCount must be greater or equal to elementCountThreshold");
Preconditions.checkArgument(
flowController.getMaxOutstandingRequestBytes() == null
flowController.getMaxRequestBytesLimit() == null
|| batchingSettings.getRequestByteThreshold() == null
|| flowController.getMaxOutstandingRequestBytes()
|| flowController.getMaxRequestBytesLimit()
>= batchingSettings.getRequestByteThreshold(),
"If throttling and batching on request bytes are enabled, FlowController"
+ "#maxOutstandingRequestBytes must be greater or equal to requestByteThreshold");
@@ -34,40 +34,90 @@

/** A {@link Semaphore64} that blocks until permits become available. */
class BlockingSemaphore implements Semaphore64 {
private long currentPermits;
private long availablePermits;
private long limit;

private static void checkNotNegative(long l) {
Preconditions.checkArgument(l >= 0, "negative permits not allowed: %s", l);
}

BlockingSemaphore(long permits) {
checkNotNegative(permits);
this.currentPermits = permits;
this.availablePermits = permits;
this.limit = permits;
}

@Override
public synchronized void release(long permits) {
checkNotNegative(permits);

currentPermits += permits;
// TODO: throw exceptions when the permits overflow
availablePermits = Math.min(availablePermits + permits, limit);
notifyAll();
}

@Override
public synchronized boolean acquire(long permits) {
checkNotNegative(permits);

boolean interrupted = false;
while (currentPermits < permits) {
while (availablePermits < permits) {
try {
wait();
} catch (InterruptedException e) {
interrupted = true;
}
}
// TODO: if thread is interrupted, we should not grant the permits
availablePermits -= permits;

if (interrupted) {
Thread.currentThread().interrupt();
}
return true;
}

@Override
public synchronized boolean acquirePartial(long permits) {
checkNotNegative(permits);

boolean interrupted = false;
// To allow individual oversized requests to be sent, clamp the requested permits to the maximum
// limit. This will allow individual large requests to be sent. Please note that this behavior
// will result in availablePermits going negative.
while (availablePermits < Math.min(limit, permits)) {
try {
wait();
} catch (InterruptedException e) {
interrupted = true;
}
}
currentPermits -= permits;

if (interrupted) {
Thread.currentThread().interrupt();
}

availablePermits -= permits;
return true;
}

@Override
public synchronized void increasePermitLimit(long permits) {
checkNotNegative(permits);
availablePermits += permits;
limit += permits;
notifyAll();
}

@Override
public synchronized void reducePermitLimit(long reduction) {
checkNotNegative(reduction);
Preconditions.checkState(limit - reduction > 0, "permit limit underflow");
availablePermits -= reduction;
limit -= reduction;
}

@Override
public synchronized long getPermitLimit() {
return limit;
}
}
@@ -0,0 +1,169 @@
/*
* Copyright 2021 Google LLC
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google LLC nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.batching;

import com.google.api.core.InternalApi;
import com.google.api.gax.batching.FlowController.LimitExceededBehavior;
import com.google.auto.value.AutoValue;
import com.google.common.base.Preconditions;
import javax.annotation.Nullable;

/** Settings for dynamic flow control */
@AutoValue
@InternalApi("For google-cloud-java client use only")
public abstract class DynamicFlowControlSettings {

/** Number of outstanding elements that {@link FlowController} allows when it's initiated. */
@Nullable
public abstract Long getInitialOutstandingElementCount();

/** Number of outstanding bytes that {@link FlowController} allows when it's initiated. */
@Nullable
public abstract Long getInitialOutstandingRequestBytes();

/**
* Maximum number of outstanding elements {@link FlowController} allows before enforcing flow
* control.
*/
@Nullable
public abstract Long getMaxOutstandingElementCount();

/**
* Maximum number of outstanding bytes {@link FlowController} allows before enforcing flow
* control.
*/
@Nullable
public abstract Long getMaxOutstandingRequestBytes();

/**
* Minimum number of outstanding elements {@link FlowController} allows before enforcing flow
* control.
*/
@Nullable
public abstract Long getMinOutstandingElementCount();

/**
* Minimum number of outstanding bytes {@link FlowController} allows before enforcing flow
* control.
*/
@Nullable
public abstract Long getMinOutstandingRequestBytes();

/** @see FlowControlSettings#getLimitExceededBehavior() */
public abstract LimitExceededBehavior getLimitExceededBehavior();

public abstract Builder toBuilder();

public static Builder newBuilder() {
return new AutoValue_DynamicFlowControlSettings.Builder()
.setLimitExceededBehavior(LimitExceededBehavior.Block);
}

@AutoValue.Builder
public abstract static class Builder {

public abstract Builder setInitialOutstandingElementCount(Long value);

public abstract Builder setInitialOutstandingRequestBytes(Long value);

public abstract Builder setMaxOutstandingElementCount(Long value);

public abstract Builder setMaxOutstandingRequestBytes(Long value);

public abstract Builder setMinOutstandingElementCount(Long value);

public abstract Builder setMinOutstandingRequestBytes(Long value);

public abstract Builder setLimitExceededBehavior(LimitExceededBehavior value);

abstract DynamicFlowControlSettings autoBuild();

public DynamicFlowControlSettings build() {
DynamicFlowControlSettings settings = autoBuild();

verifyElementCountSettings(settings);
verifyRequestBytesSettings(settings);

return settings;
}

private void verifyElementCountSettings(DynamicFlowControlSettings settings) {
boolean isEnabled =
settings.getInitialOutstandingElementCount() != null
|| settings.getMinOutstandingElementCount() != null
|| settings.getMaxOutstandingElementCount() != null;
if (!isEnabled) {
return;
}
Preconditions.checkState(
settings.getInitialOutstandingElementCount() != null
&& settings.getMinOutstandingElementCount() != null
&& settings.getMaxOutstandingElementCount() != null,
"Throttling on element count is disabled by default. To enable this setting,"
+ " minOutstandingElementCount, initialOutstandingElementCount, and "
+ "maxOutstandingElementCount must all be set.");
Preconditions.checkState(
settings.getMinOutstandingElementCount() > 0
&& settings.getInitialOutstandingElementCount()
<= settings.getMaxOutstandingElementCount()
&& settings.getInitialOutstandingElementCount()
>= settings.getMinOutstandingElementCount(),
"If throttling on element count is set, minOutstandingElementCount must be"
+ " greater than 0, and minOutstandingElementCount <= "
+ "initialOutstandingElementCount <= maxOutstandingElementCount");
}

private void verifyRequestBytesSettings(DynamicFlowControlSettings settings) {
boolean isEnabled =
settings.getInitialOutstandingRequestBytes() != null
|| settings.getMinOutstandingRequestBytes() != null
|| settings.getMaxOutstandingRequestBytes() != null;
if (!isEnabled) {
return;
}
Preconditions.checkState(
settings.getInitialOutstandingRequestBytes() != null
&& settings.getMinOutstandingRequestBytes() != null
&& settings.getMaxOutstandingRequestBytes() != null,
"Throttling on number of bytes is disabled by default. To enable this "
+ "setting, minOutstandingRequestBytes, initialOutstandingRequestBytes, and "
+ "maxOutstandingRequestBytes must all be set");
Preconditions.checkState(
settings.getMinOutstandingRequestBytes() > 0
&& settings.getInitialOutstandingRequestBytes()
<= settings.getMaxOutstandingRequestBytes()
&& settings.getInitialOutstandingRequestBytes()
>= settings.getMinOutstandingRequestBytes(),
"If throttling on number of bytes is set, minOutstandingRequestBytes must "
+ "be greater than 0, and minOutstandingRequestBytes <= "
+ "initialOutstandingRequestBytes <= maxOutstandingRequestBytes");
}
}
}

0 comments on commit 20f6ecf

Please sign in to comment.