@@ -37,6 +37,9 @@
import com .google .api .core .BetaApi ;
import com .google .api .core .InternalApi ;
import com .google .api .core .SettableApiFuture ;
import com .google .api .gax .batching .FlowController .FlowControlException ;
import com .google .api .gax .batching .FlowController .FlowControlRuntimeException ;
import com .google .api .gax .batching .FlowController .LimitExceededBehavior ;
import com .google .api .gax .rpc .UnaryCallable ;
import com .google .common .annotations .VisibleForTesting ;
import com .google .common .base .Preconditions ;
@@ -55,6 +58,7 @@
import java .util .concurrent .atomic .AtomicInteger ;
import java .util .logging .Level ;
import java .util .logging .Logger ;
import javax .annotation .Nullable ;
/**
* Queues up the elements until {@link #flush()} is called; once batching is over, returned future
@@ -87,13 +91,14 @@
private final Future <?> scheduledFuture ;
private volatile boolean isClosed = false ;
private final BatcherStats batcherStats = new BatcherStats ();
private final FlowController flowController ;
/**
* @param batchingDescriptor a {@link BatchingDescriptor} for transforming individual elements
* into wrappers request and response.
* @param unaryCallable a {@link UnaryCallable} object.
* @param prototype a {@link RequestT} object.
* @param batchingSettings a {@link BatchingSettings} with configuration of thresholds.
* into wrappers request and response
* @param unaryCallable a {@link UnaryCallable} object
* @param prototype a {@link RequestT} object
* @param batchingSettings a {@link BatchingSettings} with configuration of thresholds
*/
public BatcherImpl (
BatchingDescriptor <ElementT , ElementResultT , RequestT , ResponseT > batchingDescriptor ,
@@ -102,15 +107,56 @@ public BatcherImpl(
BatchingSettings batchingSettings ,
ScheduledExecutorService executor ) {
this (batchingDescriptor , unaryCallable , prototype , batchingSettings , executor , null );
}
/**
* @param batchingDescriptor a {@link BatchingDescriptor} for transforming individual elements
* into wrappers request and response
* @param unaryCallable a {@link UnaryCallable} object
* @param prototype a {@link RequestT} object
* @param batchingSettings a {@link BatchingSettings} with configuration of thresholds
* @param flowController a {@link FlowController} for throttling requests. If it's null, create a
* {@link FlowController} object from {@link BatchingSettings#getFlowControlSettings()}.
*/
public BatcherImpl (
BatchingDescriptor <ElementT , ElementResultT , RequestT , ResponseT > batchingDescriptor ,
UnaryCallable <RequestT , ResponseT > unaryCallable ,
RequestT prototype ,
BatchingSettings batchingSettings ,
ScheduledExecutorService executor ,
@ Nullable FlowController flowController ) {
this .batchingDescriptor =
Preconditions .checkNotNull (batchingDescriptor , "batching descriptor cannot be null" );
this .unaryCallable = Preconditions .checkNotNull (unaryCallable , "callable cannot be null" );
this .prototype = Preconditions .checkNotNull (prototype , "request prototype cannot be null" );
this .batchingSettings =
Preconditions .checkNotNull (batchingSettings , "batching setting cannot be null" );
Preconditions .checkNotNull (executor , "executor cannot be null" );
if (flowController == null ) {
flowController = new FlowController (batchingSettings .getFlowControlSettings ());
}
// If throttling is enabled, make sure flow control limits are greater or equal to batch sizes
// to avoid deadlocking
if (flowController .getLimitExceededBehavior () != LimitExceededBehavior .Ignore ) {
Preconditions .checkArgument (
flowController .getMaxOutstandingElementCount () == null
|| batchingSettings .getElementCountThreshold () == null
|| flowController .getMaxOutstandingElementCount ()
>= batchingSettings .getElementCountThreshold (),
"If throttling and batching on element count are enabled, FlowController"
+ "#maxOutstandingElementCount must be greater or equal to elementCountThreshold" );
Preconditions .checkArgument (
flowController .getMaxOutstandingRequestBytes () == null
|| batchingSettings .getRequestByteThreshold () == null
|| flowController .getMaxOutstandingRequestBytes ()
>= batchingSettings .getRequestByteThreshold (),
"If throttling and batching on request bytes are enabled, FlowController"
+ "#maxOutstandingRequestBytes must be greater or equal to requestByteThreshold" );
}
this .flowController = flowController ;
currentOpenBatch = new Batch <>(prototype , batchingDescriptor , batchingSettings , batcherStats );
if (batchingSettings .getDelayThreshold () != null ) {
long delay = batchingSettings .getDelayThreshold ().toMillis ();
PushCurrentBatchRunnable <ElementT , ElementResultT , RequestT , ResponseT > runnable =
@@ -127,8 +173,29 @@ public BatcherImpl(
@ Override
public ApiFuture <ElementResultT > add (ElementT element ) {
Preconditions .checkState (!isClosed , "Cannot add elements on a closed batcher" );
SettableApiFuture <ElementResultT > result = SettableApiFuture .create ();
// This is not the optimal way of throttling. It does not send out partial batches, which
// means that the Batcher might not use up all the resources allowed by FlowController.
// The more efficient implementation should look like:
// if (!flowController.tryReserve(1, bytes)) {
// sendOutstanding();
// reserve(1, bytes);
// }
// where tryReserve() will return false if there isn't enough resources, or reserve and return
// true.
// However, with the current FlowController implementation, adding a tryReserve() could be
// confusing. FlowController will end up having 3 different reserve behaviors: blocking,
// non blocking and try reserve. And we'll also need to add a tryAcquire() to the Semaphore64
// class, which made it seem unnecessary to have blocking and non-blocking semaphore
// implementations. Some refactoring may be needed for the optimized implementation. So we'll
// defer it till we decide on if refactoring FlowController is necessary.
try {
flowController .reserve (1 , batchingDescriptor .countBytes (element ));
} catch (FlowControlException e ) {
// This exception will only be thrown if the FlowController is set to ThrowException behavior
throw FlowControlRuntimeException .fromFlowControlException (e );
}
SettableApiFuture <ElementResultT > result = SettableApiFuture .create ();
synchronized (elementLock ) {
currentOpenBatch .add (element , result );
}
@@ -169,6 +236,7 @@ public void sendOutstanding() {
@ Override
public void onSuccess (ResponseT response ) {
try {
flowController .release (accumulatedBatch .elementCounter , accumulatedBatch .byteCounter );
accumulatedBatch .onBatchSuccess (response );
} finally {
onBatchCompletion ();
@@ -178,6 +246,7 @@ public void onSuccess(ResponseT response) {
@ Override
public void onFailure (Throwable throwable ) {
try {
flowController .release (accumulatedBatch .elementCounter , accumulatedBatch .byteCounter );
accumulatedBatch .onBatchFailure (throwable );
} finally {
onBatchCompletion ();
@@ -224,6 +293,12 @@ public void close() throws InterruptedException {
}
}
/**
 * Returns the {@link FlowController} this batcher throttles with (either the one supplied to the
 * constructor or the one it built from {@code BatchingSettings#getFlowControlSettings()}).
 *
 * <p>Package-private for use in testing.
 */
@ VisibleForTesting
FlowController getFlowController () {
return flowController ;
}
/**
* This class represent one logical Batch. It accumulates all the elements and their corresponding
* future results for one batch.