Skip to content

Commit

Permalink
fix: aggregate batching throttling latency per attempt and reset it between (#1905)
Browse files Browse the repository at this point in the history

* fix: aggregate batching throttling latency per attempt and reset it between

This should improve reporting of latency when bulk mutation throttling is enabled. Also:
- fix tests to properly close the batcher
- simplify tests to avoid unnecessary mocking
- improve test failure messaging

Change-Id: I53748c5e54ebbbe2a896f8ea0ce6c39a8f5fa297

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
igorbernstein2 and gcf-owl-bot[bot] committed Sep 11, 2023
1 parent f4fe6a0 commit e6cc5f6
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 61 deletions.
Expand Up @@ -273,7 +273,8 @@ private void recordAttemptCompletion(@Nullable Throwable status) {
}
}

recorder.putClientBlockingLatencies(totalClientBlockingTime.get());
// Make sure to reset the blocking time after recording it for the next attempt
recorder.putClientBlockingLatencies(totalClientBlockingTime.getAndSet(0));

// Patch the status until it's fixed in gax. When an attempt failed,
// it'll throw a ServerStreamingAttemptException. Unwrap the exception
Expand Down
Expand Up @@ -59,6 +59,9 @@ class MetricsTracer extends BigtableTracer {

private volatile int attempt = 0;

private volatile boolean reportBatchingLatency = false;
private volatile long batchThrottledLatency = 0;

MetricsTracer(
OperationType operationType,
Tagger tagger,
Expand Down Expand Up @@ -167,6 +170,14 @@ private void recordAttemptCompletion(@Nullable Throwable throwable) {
RpcMeasureConstants.BIGTABLE_ATTEMPT_LATENCY,
attemptTimer.elapsed(TimeUnit.MILLISECONDS));

if (reportBatchingLatency) {
measures.put(RpcMeasureConstants.BIGTABLE_BATCH_THROTTLED_TIME, batchThrottledLatency);

// Reset batch throttling latency for next attempt. This can't be done in attemptStarted
// because batching flow control will add batching latency before the attempt has started.
batchThrottledLatency = 0;
}

// Patch the throwable until it's fixed in gax. When an attempt failed,
// it'll throw a ServerStreamingAttemptException. Unwrap the exception
// so it could get processed by extractStatus
Expand Down Expand Up @@ -216,11 +227,8 @@ public void recordGfeMetadata(@Nullable Long latency, @Nullable Throwable throwa

@Override
public void batchRequestThrottled(long totalThrottledMs) {
MeasureMap measures =
stats
.newMeasureMap()
.put(RpcMeasureConstants.BIGTABLE_BATCH_THROTTLED_TIME, totalThrottledMs);
measures.record(newTagCtxBuilder().build());
reportBatchingLatency = true;
batchThrottledLatency += totalThrottledMs;
}

private TagContextBuilder newTagCtxBuilder() {
Expand Down
Expand Up @@ -20,10 +20,8 @@
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.when;

import com.google.api.gax.batching.BatchResource;
import com.google.api.gax.batching.Batcher;
import com.google.api.gax.batching.BatcherImpl;
import com.google.api.gax.batching.BatchingDescriptor;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.rpc.ApiCallContext;
Expand Down Expand Up @@ -387,45 +385,38 @@ public Object answer(InvocationOnMock invocation) {
.when(mockService)
.readRows(any(ReadRowsRequest.class), any());

try (Batcher batcher =
try (Batcher<ByteString, Row> batcher =
stub.newBulkReadRowsBatcher(Query.create(TABLE_ID), GrpcCallContext.createDefault())) {
batcher.add(ByteString.copyFromUtf8("row1"));
batcher.sendOutstanding();

long throttledTimeMetric =
StatsTestUtils.getAggregationValueAsLong(
localStats,
RpcViewConstants.BIGTABLE_BATCH_THROTTLED_TIME_VIEW,
ImmutableMap.of(
RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows")),
PROJECT_ID,
INSTANCE_ID,
APP_PROFILE_ID);
assertThat(throttledTimeMetric).isEqualTo(0);
}

long throttledTimeMetric =
StatsTestUtils.getAggregationValueAsLong(
localStats,
RpcViewConstants.BIGTABLE_BATCH_THROTTLED_TIME_VIEW,
ImmutableMap.of(RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows")),
PROJECT_ID,
INSTANCE_ID,
APP_PROFILE_ID);
assertThat(throttledTimeMetric).isEqualTo(0);
}

@Test
public void testBatchMutateRowsThrottledTime() throws Exception {
FlowController flowController = Mockito.mock(FlowController.class);
BatchingDescriptor batchingDescriptor = Mockito.mock(MutateRowsBatchingDescriptor.class);
when(batchingDescriptor.createResource(any())).thenReturn(new FakeBatchResource());
when(batchingDescriptor.createEmptyResource()).thenReturn(new FakeBatchResource());
MutateRowsBatchingDescriptor batchingDescriptor = new MutateRowsBatchingDescriptor();

// Mock throttling
final long throttled = 50;
doAnswer(
new Answer() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
Thread.sleep(throttled);
return null;
}
invocation -> {
Thread.sleep(throttled);
return null;
})
.when(flowController)
.reserve(any(Long.class), any(Long.class));
when(flowController.getMaxElementCountLimit()).thenReturn(null);
when(flowController.getMaxRequestBytesLimit()).thenReturn(null);
when(batchingDescriptor.newRequestBuilder(any())).thenCallRealMethod();

doAnswer(
new Answer() {
Expand All @@ -444,18 +435,18 @@ public Object answer(InvocationOnMock invocation) {

ApiCallContext defaultContext = GrpcCallContext.createDefault();

Batcher batcher =
new BatcherImpl(
try (Batcher<RowMutationEntry, Void> batcher =
new BatcherImpl<>(
batchingDescriptor,
stub.bulkMutateRowsCallable().withDefaultCallContext(defaultContext),
BulkMutation.create(TABLE_ID),
settings.getStubSettings().bulkMutateRowsSettings().getBatchingSettings(),
Executors.newSingleThreadScheduledExecutor(),
flowController,
defaultContext);
defaultContext)) {

batcher.add(RowMutationEntry.create("key"));
batcher.sendOutstanding();
batcher.add(RowMutationEntry.create("key").deleteRow());
}

long throttledTimeMetric =
StatsTestUtils.getAggregationValueAsLong(
Expand All @@ -473,29 +464,4 @@ public Object answer(InvocationOnMock invocation) {
private static <T> StreamObserver<T> anyObserver(Class<T> returnType) {
return (StreamObserver<T>) any(returnType);
}

private class FakeBatchResource implements BatchResource {

FakeBatchResource() {}

@Override
public BatchResource add(BatchResource resource) {
return new FakeBatchResource();
}

@Override
public long getElementCount() {
return 1;
}

@Override
public long getByteCount() {
return 1;
}

@Override
public boolean shouldFlush(long maxElementThreshold, long maxBytesThreshold) {
return false;
}
}
}
Expand Up @@ -299,6 +299,11 @@ public static long getAggregationValueAsLong(

AggregationData aggregationData = aggregationMap.get(tagValues);

if (aggregationData == null) {
throw new RuntimeException(
"Failed to find metric for: " + tags + ". Current aggregation data: " + aggregationMap);
}

return aggregationData.match(
new io.opencensus.common.Function<AggregationData.SumDataDouble, Long>() {
@Override
Expand Down

0 comments on commit e6cc5f6

Please sign in to comment.