@@ -40,9 +40,11 @@
import com .google .api .gax .batching .FlowController .FlowControlException ;
import com .google .api .gax .batching .FlowController .FlowControlRuntimeException ;
import com .google .api .gax .batching .FlowController .LimitExceededBehavior ;
import com .google .api .gax .rpc .ApiCallContext ;
import com .google .api .gax .rpc .UnaryCallable ;
import com .google .common .annotations .VisibleForTesting ;
import com .google .common .base .Preconditions ;
import com .google .common .base .Stopwatch ;
import com .google .common .util .concurrent .Futures ;
import java .lang .ref .Reference ;
import java .lang .ref .ReferenceQueue ;
@@ -93,22 +95,26 @@
private SettableApiFuture <Void > closeFuture ;
private final BatcherStats batcherStats = new BatcherStats ();
private final FlowController flowController ;
private final ApiCallContext callContext ;
/**
* @param batchingDescriptor a {@link BatchingDescriptor} for transforming individual elements
* into wrappers request and response
* @param unaryCallable a {@link UnaryCallable} object
* @param prototype a {@link RequestT} object
* @param batchingSettings a {@link BatchingSettings} with configuration of thresholds
* @deprecated Please instantiate the Batcher with {@link FlowController} and {@link
* ApiCallContext}
*/
@ Deprecated
public BatcherImpl (
BatchingDescriptor <ElementT , ElementResultT , RequestT , ResponseT > batchingDescriptor ,
UnaryCallable <RequestT , ResponseT > unaryCallable ,
RequestT prototype ,
BatchingSettings batchingSettings ,
ScheduledExecutorService executor ) {
this (batchingDescriptor , unaryCallable , prototype , batchingSettings , executor , null );
this (batchingDescriptor , unaryCallable , prototype , batchingSettings , executor , null , null );
}
/**
@@ -119,7 +125,9 @@ public BatcherImpl(
* @param batchingSettings a {@link BatchingSettings} with configuration of thresholds
* @param flowController a {@link FlowController} for throttling requests. If it's null, create a
* {@link FlowController} object from {@link BatchingSettings#getFlowControlSettings()}.
* @deprecated Please instantiate the Batcher with {@link ApiCallContext}
*/
@ Deprecated
public BatcherImpl (
BatchingDescriptor <ElementT , ElementResultT , RequestT , ResponseT > batchingDescriptor ,
UnaryCallable <RequestT , ResponseT > unaryCallable ,
@@ -128,6 +136,35 @@ public BatcherImpl(
ScheduledExecutorService executor ,
@ Nullable FlowController flowController ) {
this (
batchingDescriptor ,
unaryCallable ,
prototype ,
batchingSettings ,
executor ,
flowController ,
null );
}
/**
* @param batchingDescriptor a {@link BatchingDescriptor} for transforming individual elements
* into wrappers request and response
* @param unaryCallable a {@link UnaryCallable} object
* @param prototype a {@link RequestT} object
* @param batchingSettings a {@link BatchingSettings} with configuration of thresholds
* @param flowController a {@link FlowController} for throttling requests. If it's null, create a
* {@link FlowController} object from {@link BatchingSettings#getFlowControlSettings()}.
* @param callContext a {@link ApiCallContext} object that'll be merged in unaryCallable
*/
public BatcherImpl (
BatchingDescriptor <ElementT , ElementResultT , RequestT , ResponseT > batchingDescriptor ,
UnaryCallable <RequestT , ResponseT > unaryCallable ,
RequestT prototype ,
BatchingSettings batchingSettings ,
ScheduledExecutorService executor ,
@ Nullable FlowController flowController ,
@ Nullable ApiCallContext callContext ) {
this .batchingDescriptor =
Preconditions .checkNotNull (batchingDescriptor , "batching descriptor cannot be null" );
this .unaryCallable = Preconditions .checkNotNull (unaryCallable , "callable cannot be null" );
@@ -168,6 +205,7 @@ public BatcherImpl(
scheduledFuture = Futures .immediateCancelledFuture ();
}
currentBatcherReference = new BatcherReference (this );
this .callContext = callContext ;
}
/** {@inheritDoc} */
@@ -192,16 +230,18 @@ public ApiFuture<ElementResultT> add(ElementT element) {
// class, which made it seem unnecessary to have blocking and non-blocking semaphore
// implementations. Some refactoring may be needed for the optimized implementation. So we'll
// defer it till we decide on if refactoring FlowController is necessary.
Stopwatch stopwatch = Stopwatch .createStarted ();
try {
flowController .reserve (1 , batchingDescriptor .countBytes (element ));
} catch (FlowControlException e ) {
// This exception will only be thrown if the FlowController is set to ThrowException behavior
throw FlowControlRuntimeException .fromFlowControlException (e );
}
long throttledTimeMs = stopwatch .elapsed (TimeUnit .MILLISECONDS );
SettableApiFuture <ElementResultT > result = SettableApiFuture .create ();
synchronized (elementLock ) {
currentOpenBatch .add (element , result );
currentOpenBatch .add (element , result , throttledTimeMs );
}
if (currentOpenBatch .hasAnyThresholdReached ()) {
@@ -230,8 +270,14 @@ public void sendOutstanding() {
currentOpenBatch = new Batch <>(prototype , batchingDescriptor , batchingSettings , batcherStats );
}
// This check is for old clients that instantiated the batcher without ApiCallContext
ApiCallContext callContextWithOption = null ;
if (callContext != null ) {
callContextWithOption =
callContext .withOption (THROTTLED_TIME_KEY , accumulatedBatch .totalThrottledTimeMs );
}
final ApiFuture <ResponseT > batchResponse =
unaryCallable .futureCall (accumulatedBatch .builder .build ());
unaryCallable .futureCall (accumulatedBatch .builder .build (), callContextWithOption );
numOfOutstandingBatches .incrementAndGet ();
ApiFutures .addCallback (
@@ -367,6 +413,7 @@ public FlowController getFlowController() {
private long elementCounter = 0 ;
private long byteCounter = 0 ;
private long totalThrottledTimeMs = 0 ;
private Batch (
RequestT prototype ,
@@ -383,11 +430,12 @@ private Batch(
this .batcherStats = batcherStats ;
}
void add (ElementT element , SettableApiFuture <ElementResultT > result ) {
void add (ElementT element , SettableApiFuture <ElementResultT > result , long throttledTimeMs ) {
builder .add (element );
entries .add (BatchEntry .create (element , result ));
elementCounter ++;
byteCounter += descriptor .countBytes (element );
totalThrottledTimeMs += throttledTimeMs ;
}
void onBatchSuccess (ResponseT response ) {