aggregate grpc queued and batch flow controlled latencies
mutianf committed Apr 18, 2023
1 parent 5a081a5 commit 2cc55a1
Showing 4 changed files with 17 additions and 19 deletions.
---- File 1 of 4 ----
@@ -115,14 +115,10 @@ public void putGfeMissingHeaders(long connectivityErrors) {
attemptMeasureMap.put(BuiltinMeasureConstants.CONNECTIVITY_ERROR_COUNT, connectivityErrors);
}

- public void putBatchRequestThrottled(long throttledTimeMs) {
+ public void putClientBlockingLatencies(long throttledTimeMs) {
operationMeasureMap.put(BuiltinMeasureConstants.THROTTLING_LATENCIES, throttledTimeMs);
}

- public void putGrpcChannelQueuedLatencies(long blockedTime) {
-   attemptMeasureMap.put(BuiltinMeasureConstants.THROTTLING_LATENCIES, blockedTime);
- }

private TagContextBuilder newTagContextBuilder(String tableId, String zone, String cluster) {
TagContextBuilder tagContextBuilder =
tagger
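
The new putClientBlockingLatencies(long) stages the combined blocking time on the operation-level measure map under BuiltinMeasureConstants.THROTTLING_LATENCIES, while the attempt-level put for gRPC channel queueing is removed. As a minimal, self-contained sketch of that recording pattern, assuming plain OpenCensus stats APIs and an illustrative measure name (CLIENT_BLOCKING_LATENCIES is not the library's constant, and this class is not the real StatsRecorderWrapper):

    import io.opencensus.stats.Measure.MeasureLong;
    import io.opencensus.stats.MeasureMap;
    import io.opencensus.stats.Stats;

    final class BlockingLatencyRecorder {
      // Hypothetical measure; the client itself reuses THROTTLING_LATENCIES.
      private static final MeasureLong CLIENT_BLOCKING_LATENCIES =
          MeasureLong.create(
              "client_blocking_latencies", "Time the request was blocked in the client", "ms");

      // Operation-scoped measure map, mirroring operationMeasureMap above.
      private final MeasureMap operationMeasureMap = Stats.getStatsRecorder().newMeasureMap();

      // Counterpart of putClientBlockingLatencies(long): stage the value...
      void putClientBlockingLatencies(long blockedTimeMs) {
        operationMeasureMap.put(CLIENT_BLOCKING_LATENCIES, blockedTimeMs);
      }

      // ...and flush everything staged so far when the operation is recorded.
      void recordOperation() {
        operationMeasureMap.record();
      }
    }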
---- File 2 of 4 ----
@@ -91,8 +91,7 @@ public void testStreamingOperation() throws InterruptedException {
recorderWrapper.putGfeLatencies(serverLatency);
recorderWrapper.putGfeMissingHeaders(connectivityErrorCount);
recorderWrapper.putFirstResponseLatencies(firstResponseLatency);
- recorderWrapper.putBatchRequestThrottled(throttlingLatency);
- recorderWrapper.putGrpcChannelQueuedLatencies(throttlingLatency);
+ recorderWrapper.putClientBlockingLatencies(throttlingLatency);

recorderWrapper.recordOperation("OK", TABLE_ID, ZONE, CLUSTER);
recorderWrapper.recordAttempt("OK", TABLE_ID, ZONE, CLUSTER);
@@ -291,8 +290,7 @@ public void testUnaryOperations() throws InterruptedException {
recorderWrapper.putGfeLatencies(serverLatency);
recorderWrapper.putGfeMissingHeaders(connectivityErrorCount);
recorderWrapper.putFirstResponseLatencies(firstResponseLatency);
- recorderWrapper.putBatchRequestThrottled(throttlingLatency);
- recorderWrapper.putGrpcChannelQueuedLatencies(throttlingLatency);
+ recorderWrapper.putClientBlockingLatencies(throttlingLatency);

recorderWrapper.recordOperation("UNAVAILABLE", TABLE_ID, ZONE, CLUSTER);
recorderWrapper.recordAttempt("UNAVAILABLE", TABLE_ID, ZONE, CLUSTER);
---- File 3 of 4 ----
@@ -71,6 +71,8 @@ class BuiltinMetricsTracer extends BigtableTracer {
private String zone = "global";
private String cluster = "unspecified";

+ private AtomicLong totalClientBlockingTime = new AtomicLong(0);

@VisibleForTesting
BuiltinMetricsTracer(
OperationType operationType, SpanName spanName, StatsRecorderWrapper recorder) {
@@ -219,12 +221,12 @@ public void setLocations(String zone, String cluster) {

@Override
public void batchRequestThrottled(long throttledTimeMs) {
- recorder.putBatchRequestThrottled(throttledTimeMs);
+ totalClientBlockingTime.addAndGet(throttledTimeMs);
}

@Override
public void grpcChannelQueuedLatencies(long queuedTimeMs) {
- recorder.putGrpcChannelQueuedLatencies(queuedTimeMs);
+ totalClientBlockingTime.addAndGet(queuedTimeMs);
}

@Override
@@ -271,6 +273,8 @@ private void recordAttemptCompletion(@Nullable Throwable status) {
}
}

+ recorder.putClientBlockingLatencies(totalClientBlockingTime.get());

// Patch the status until it's fixed in gax. When an attempt failed,
// it'll throw a ServerStreamingAttemptException. Unwrap the exception
// so it could get processed by extractStatus
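
In BuiltinMetricsTracer, both signals, batch flow-control throttling and gRPC channel queueing, now feed a single totalClientBlockingTime counter, and the total is reported from recordAttemptCompletion. A stripped-down sketch of the same aggregation pattern, with a LongConsumer standing in (hypothetically) for StatsRecorderWrapper#putClientBlockingLatencies:

    import java.util.concurrent.atomic.AtomicLong;
    import java.util.function.LongConsumer;

    // Minimal sketch: both callbacks add to one counter; the total is reported per attempt.
    final class ClientBlockingTimeAggregator {
      private final AtomicLong totalClientBlockingTime = new AtomicLong(0);
      private final LongConsumer sink; // e.g. recorder::putClientBlockingLatencies

      ClientBlockingTimeAggregator(LongConsumer sink) {
        this.sink = sink;
      }

      void batchRequestThrottled(long throttledTimeMs) {
        totalClientBlockingTime.addAndGet(throttledTimeMs);
      }

      void grpcChannelQueuedLatencies(long queuedTimeMs) {
        totalClientBlockingTime.addAndGet(queuedTimeMs);
      }

      void attemptCompleted() {
        // As in the diff, the counter is not reset here, so a retried operation
        // reports a running total that grows across attempts.
        sink.accept(totalClientBlockingTime.get());
      }
    }

Wired up as new ClientBlockingTimeAggregator(recorder::putClientBlockingLatencies), this reproduces the flow the diff introduces: accumulate on every throttling or queueing callback, record once per attempt.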
---- File 4 of 4 ----
@@ -480,8 +480,8 @@ public void testBatchBlockingLatencies() throws InterruptedException {

int expectedNumRequests = 6 / batchElementCount;
ArgumentCaptor<Long> throttledTime = ArgumentCaptor.forClass(Long.class);
- verify(statsRecorderWrapper, times(expectedNumRequests))
-     .putBatchRequestThrottled(throttledTime.capture());
+ verify(statsRecorderWrapper, timeout(1000).times(expectedNumRequests))
+     .putClientBlockingLatencies(throttledTime.capture());

// Adding the first 2 elements should not get throttled since the batch is empty
assertThat(throttledTime.getAllValues().get(0)).isEqualTo(0);
@@ -493,7 +493,7 @@
}

@Test
- public void testBlockedOnChannelServerStreamLatencies() throws InterruptedException {
+ public void testQueuedOnChannelServerStreamLatencies() throws InterruptedException {
when(mockFactory.newTracer(any(), any(), any()))
.thenReturn(
new BuiltinMetricsTracer(
@@ -505,14 +505,14 @@ public void testBlockedOnChannelServerStreamLatencies() throws InterruptedException {

ArgumentCaptor<Long> blockedTime = ArgumentCaptor.forClass(Long.class);

- verify(statsRecorderWrapper, times(fakeService.attemptCounter.get()))
-     .putGrpcChannelQueuedLatencies(blockedTime.capture());
+ verify(statsRecorderWrapper, timeout(1000).times(fakeService.attemptCounter.get()))
+     .putClientBlockingLatencies(blockedTime.capture());

assertThat(blockedTime.getAllValues().get(1)).isAtLeast(CHANNEL_BLOCKING_LATENCY);
}

@Test
- public void testBlockedOnChannelUnaryLatencies() throws InterruptedException {
+ public void testQueuedOnChannelUnaryLatencies() throws InterruptedException {
when(mockFactory.newTracer(any(), any(), any()))
.thenReturn(
new BuiltinMetricsTracer(
@@ -521,8 +521,8 @@ public void testBlockedOnChannelUnaryLatencies() throws InterruptedException {

ArgumentCaptor<Long> blockedTime = ArgumentCaptor.forClass(Long.class);

- verify(statsRecorderWrapper, times(fakeService.attemptCounter.get()))
-     .putGrpcChannelQueuedLatencies(blockedTime.capture());
+ verify(statsRecorderWrapper, timeout(1000).times(fakeService.attemptCounter.get()))
+     .putClientBlockingLatencies(blockedTime.capture());

assertThat(blockedTime.getAllValues().get(1)).isAtLeast(CHANNEL_BLOCKING_LATENCY);
assertThat(blockedTime.getAllValues().get(2)).isAtLeast(CHANNEL_BLOCKING_LATENCY);
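
The verifications in these tests also move from times(n) to timeout(1000).times(n), presumably because the latency is now reported from recordAttemptCompletion and may land slightly after the test thread reaches the verify call. A small sketch of Mockito's polling verification, using a hypothetical LatencySink interface in place of StatsRecorderWrapper:

    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.timeout;
    import static org.mockito.Mockito.verify;

    public class TimeoutVerifyExample {
      // Hypothetical stand-in for the wrapper being verified.
      public interface LatencySink {
        void putClientBlockingLatencies(long ms);
      }

      public static void main(String[] args) {
        LatencySink sink = mock(LatencySink.class);

        // Report the metric from another thread, as an attempt-completion callback would.
        new Thread(() -> sink.putClientBlockingLatencies(25L)).start();

        // timeout(1000) keeps retrying the verification for up to one second
        // instead of failing immediately if the call has not arrived yet.
        verify(sink, timeout(1000).times(1)).putClientBlockingLatencies(25L);
      }
    }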
