Skip to content
Permalink
Browse files
feat: Surface the server-timing metric (#535)
* Extract server-timing trailer and create metrics for gfe latency

* Add more tests and refactor

* Refactor comments and imports

* reformatting

* Clean up comments

* Refactor, use GrpcMetadataResponse to get the trailer

* Fix based on the code review

* clean up HeaderTracerResponseObserver

* Add more tests for all the ops

* Improve documents, changes for directPath and more tests

* Small fixes in the doc

* small clean up
  • Loading branch information
mutianf committed Dec 17, 2020
1 parent 3f28923 commit 8240779434a602dc8b2bf90dbe539c5d7470d850
Showing with 1,161 additions and 130 deletions.
  1. +13 −1 README.md
  2. +14 −0 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java
  3. +66 −32 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java
  4. +23 −0 ...d-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java
  5. +123 −0 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracer.java
  6. +125 −0 ...e/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerStreamingCallable.java
  7. +83 −0 ...table/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerUnaryCallable.java
  8. +14 −0 ...ud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcMeasureConstants.java
  9. +22 −0 ...cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViewConstants.java
  10. +34 −0 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViews.java
  11. +30 −6 ...gtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java
  12. +428 −0 ...gtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerCallableTest.java
  13. +77 −0 ...cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerTest.java
  14. +30 −91 ...loud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java
  15. +79 −0 ...e-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/StatsTestUtils.java
@@ -296,12 +296,22 @@ metrics will be tagged with:
each client RPC, tagged by operation name and the attempt status. Under normal
circumstances, this will be identical to op_latency. However, when the client
receives transient errors, op_latency will be the sum of all attempt_latencies
and the exponential delays
and the exponential delays.

* `cloud.google.com/java/bigtable/attempts_per_op`: A distribution of attempts that
each operation required, tagged by operation name and final operation status.
Under normal circumstances, this will be 1.

### GFE metric views:

* `cloud.google.com/java/bigtable/gfe_latency`: A distribution of the latency
between Google's network receives an RPC and reads back the first byte of
the response.

* `cloud.google.com/java/bigtable/gfe_header_missing_count`: A counter of the
number of RPC responses received without the server-timing header, which
indicates that the request probably never reached Google's network.


By default, the functionality is disabled. For example to enable metrics using
[Google Stackdriver](https://cloud.google.com/monitoring/docs/):
@@ -357,6 +367,8 @@ StackdriverStatsExporter.createAndRegister(
);
BigtableDataSettings.enableOpenCensusStats();
// Enable GFE metric views
BigtableDataSettings.enableGfeOpenCensusStats();
```

## Version Conflicts
@@ -175,6 +175,20 @@ public static void enableOpenCensusStats() {
// io.opencensus.contrib.grpc.metrics.RpcViews.registerClientGrpcBasicViews();
}

/**
* Enables OpenCensus GFE metric aggregations.
*
* <p>This will register views for gfe_latency and gfe_header_missing_count metrics.
*
* <p>gfe_latency measures the latency between Google's network receives an RPC and reads back the
* first byte of the response. gfe_header_missing_count is a counter of the number of RPC
* responses received without the server-timing header.
*/
@BetaApi("OpenCensus stats integration is currently unstable and may change in the future")
public static void enableGfeOpenCensusStats() {
com.google.cloud.bigtable.data.v2.stub.metrics.RpcViews.registerBigtableClientGfeViews();
}

/** Returns the target project id. */
public String getProjectId() {
return stubSettings.getProjectId();
@@ -66,6 +66,8 @@
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
import com.google.cloud.bigtable.data.v2.stub.metrics.CompositeTracerFactory;
import com.google.cloud.bigtable.data.v2.stub.metrics.HeaderTracerStreamingCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.HeaderTracerUnaryCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.MetricsTracerFactory;
import com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.BulkMutateRowsUserFacingCallable;
@@ -162,6 +164,15 @@ public static EnhancedBigtableStubSettings finalizeSettings(
.build());
}

ImmutableMap<TagKey, TagValue> attributes =
ImmutableMap.<TagKey, TagValue>builder()
.put(RpcMeasureConstants.BIGTABLE_PROJECT_ID, TagValue.create(settings.getProjectId()))
.put(
RpcMeasureConstants.BIGTABLE_INSTANCE_ID, TagValue.create(settings.getInstanceId()))
.put(
RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID,
TagValue.create(settings.getAppProfileId()))
.build();
// Inject Opencensus instrumentation
builder.setTracerFactory(
new CompositeTracerFactory(
@@ -187,23 +198,17 @@ public static EnhancedBigtableStubSettings finalizeSettings(
GaxProperties.getLibraryVersion(EnhancedBigtableStubSettings.class))
.build()),
// Add OpenCensus Metrics
MetricsTracerFactory.create(
tagger,
stats,
ImmutableMap.<TagKey, TagValue>builder()
.put(
RpcMeasureConstants.BIGTABLE_PROJECT_ID,
TagValue.create(settings.getProjectId()))
.put(
RpcMeasureConstants.BIGTABLE_INSTANCE_ID,
TagValue.create(settings.getInstanceId()))
.put(
RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID,
TagValue.create(settings.getAppProfileId()))
.build()),
MetricsTracerFactory.create(tagger, stats, attributes),
// Add user configured tracer
settings.getTracerFactory())));

builder.setHeaderTracer(
builder
.getHeaderTracer()
.toBuilder()
.setStats(stats)
.setTagger(tagger)
.setStatsAttributes(attributes)
.build());
return builder.build();
}

@@ -268,11 +273,10 @@ public <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
ServerStreamingCallable<Query, RowT> readRowsUserCallable =
new ReadRowsUserCallable<>(readRowsCallable, requestContext);

SpanName span = getSpanName("ReadRows");
ServerStreamingCallable<Query, RowT> traced =
new TracedServerStreamingCallable<>(
readRowsUserCallable,
clientContext.getTracerFactory(),
SpanName.of(CLIENT_NAME, "ReadRows"));
readRowsUserCallable, clientContext.getTracerFactory(), span);

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}
@@ -315,6 +319,7 @@ public <RowT> UnaryCallable<Query, RowT> createReadRowCallable(RowAdapter<RowT>
* <li>Upon receiving the response stream, it will merge the {@link
* com.google.bigtable.v2.ReadRowsResponse.CellChunk}s in logical rows. The actual row
* implementation can be configured by the {@code rowAdapter} parameter.
* <li>Add header tracer for tracking GFE metrics.
* <li>Retry/resume on failure.
* <li>Filter out marker rows.
* </ul>
@@ -356,10 +361,14 @@ public Map<String, String> extract(ReadRowsRequest readRowsRequest) {
ServerStreamingCallable<ReadRowsRequest, RowT> watched =
Callables.watched(merging, innerSettings, clientContext);

ServerStreamingCallable<ReadRowsRequest, RowT> withHeaderTracer =
new HeaderTracerStreamingCallable<>(
watched, settings.getHeaderTracer(), getSpanName("ReadRows").toString());

// Retry logic is split into 2 parts to workaround a rare edge case described in
// ReadRowsRetryCompletedCallable
ServerStreamingCallable<ReadRowsRequest, RowT> retrying1 =
new ReadRowsRetryCompletedCallable<>(watched);
new ReadRowsRetryCompletedCallable<>(withHeaderTracer);

ServerStreamingCallable<ReadRowsRequest, RowT> retrying2 =
Callables.retrying(retrying1, innerSettings, clientContext);
@@ -380,6 +389,8 @@ public Map<String, String> extract(ReadRowsRequest readRowsRequest) {
* </ul>
*/
private UnaryCallable<String, List<KeyOffset>> createSampleRowKeysCallable() {
String methodName = "SampleRowKeys";

ServerStreamingCallable<SampleRowKeysRequest, SampleRowKeysResponse> base =
GrpcRawCallableFactory.createServerStreamingCallable(
GrpcCallSettings.<SampleRowKeysRequest, SampleRowKeysResponse>newBuilder()
@@ -399,11 +410,15 @@ public Map<String, String> extract(

UnaryCallable<SampleRowKeysRequest, List<SampleRowKeysResponse>> spoolable = base.all();

UnaryCallable<SampleRowKeysRequest, List<SampleRowKeysResponse>> withHeaderTracer =
new HeaderTracerUnaryCallable<>(
spoolable, settings.getHeaderTracer(), getSpanName(methodName).toString());

UnaryCallable<SampleRowKeysRequest, List<SampleRowKeysResponse>> retryable =
Callables.retrying(spoolable, settings.sampleRowKeysSettings(), clientContext);
Callables.retrying(withHeaderTracer, settings.sampleRowKeysSettings(), clientContext);

return createUserFacingUnaryCallable(
"SampleRowKeys", new SampleRowKeysCallable(retryable, requestContext));
methodName, new SampleRowKeysCallable(retryable, requestContext));
}

/**
@@ -415,6 +430,7 @@ public Map<String, String> extract(
* </ul>
*/
private UnaryCallable<RowMutation, Void> createMutateRowCallable() {
String methodName = "MutateRow";
UnaryCallable<MutateRowRequest, MutateRowResponse> base =
GrpcRawCallableFactory.createUnaryCallable(
GrpcCallSettings.<MutateRowRequest, MutateRowResponse>newBuilder()
@@ -431,11 +447,15 @@ public Map<String, String> extract(MutateRowRequest mutateRowRequest) {
.build(),
settings.mutateRowSettings().getRetryableCodes());

UnaryCallable<MutateRowRequest, MutateRowResponse> withHeaderTracer =
new HeaderTracerUnaryCallable<>(
base, settings.getHeaderTracer(), getSpanName(methodName).toString());

UnaryCallable<MutateRowRequest, MutateRowResponse> retrying =
Callables.retrying(base, settings.mutateRowSettings(), clientContext);
Callables.retrying(withHeaderTracer, settings.mutateRowSettings(), clientContext);

return createUserFacingUnaryCallable(
"MutateRow", new MutateRowCallable(retrying, requestContext));
methodName, new MutateRowCallable(retrying, requestContext));
}

/**
@@ -459,11 +479,13 @@ private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
UnaryCallable<BulkMutation, Void> userFacing =
new BulkMutateRowsUserFacingCallable(baseCallable, requestContext);

SpanName spanName = getSpanName("MutateRows");
UnaryCallable<BulkMutation, Void> traced =
new TracedUnaryCallable<>(
userFacing, clientContext.getTracerFactory(), SpanName.of(CLIENT_NAME, "MutateRows"));
new TracedUnaryCallable<>(userFacing, clientContext.getTracerFactory(), spanName);
UnaryCallable<BulkMutation, Void> withHeaderTracer =
new HeaderTracerUnaryCallable<>(traced, settings.getHeaderTracer(), spanName.toString());

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
return withHeaderTracer.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
@@ -569,6 +591,7 @@ public Map<String, String> extract(MutateRowsRequest mutateRowsRequest) {
* </ul>
*/
private UnaryCallable<ConditionalRowMutation, Boolean> createCheckAndMutateRowCallable() {
String methodName = "CheckAndMutateRow";
UnaryCallable<CheckAndMutateRowRequest, CheckAndMutateRowResponse> base =
GrpcRawCallableFactory.createUnaryCallable(
GrpcCallSettings.<CheckAndMutateRowRequest, CheckAndMutateRowResponse>newBuilder()
@@ -586,11 +609,15 @@ public Map<String, String> extract(
.build(),
settings.checkAndMutateRowSettings().getRetryableCodes());

UnaryCallable<CheckAndMutateRowRequest, CheckAndMutateRowResponse> withHeaderTracer =
new HeaderTracerUnaryCallable<>(
base, settings.getHeaderTracer(), getSpanName(methodName).toString());

UnaryCallable<CheckAndMutateRowRequest, CheckAndMutateRowResponse> retrying =
Callables.retrying(base, settings.checkAndMutateRowSettings(), clientContext);
Callables.retrying(withHeaderTracer, settings.checkAndMutateRowSettings(), clientContext);

return createUserFacingUnaryCallable(
"CheckAndMutateRow", new CheckAndMutateRowCallable(retrying, requestContext));
methodName, new CheckAndMutateRowCallable(retrying, requestContext));
}

/**
@@ -619,12 +646,16 @@ public Map<String, String> extract(ReadModifyWriteRowRequest request) {
})
.build(),
settings.readModifyWriteRowSettings().getRetryableCodes());
String methodName = "ReadModifyWriteRow";
UnaryCallable<ReadModifyWriteRowRequest, ReadModifyWriteRowResponse> withHeaderTracer =
new HeaderTracerUnaryCallable<>(
base, settings.getHeaderTracer(), getSpanName(methodName).toString());

UnaryCallable<ReadModifyWriteRowRequest, ReadModifyWriteRowResponse> retrying =
Callables.retrying(base, settings.readModifyWriteRowSettings(), clientContext);
Callables.retrying(withHeaderTracer, settings.readModifyWriteRowSettings(), clientContext);

return createUserFacingUnaryCallable(
"ReadModifyWriteRow", new ReadModifyWriteRowCallable(retrying, requestContext));
methodName, new ReadModifyWriteRowCallable(retrying, requestContext));
}

/**
@@ -635,8 +666,7 @@ private <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> createUserFacin
String methodName, UnaryCallable<RequestT, ResponseT> inner) {

UnaryCallable<RequestT, ResponseT> traced =
new TracedUnaryCallable<>(
inner, clientContext.getTracerFactory(), SpanName.of(CLIENT_NAME, methodName));
new TracedUnaryCallable<>(inner, clientContext.getTracerFactory(), getSpanName(methodName));

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}
@@ -686,6 +716,10 @@ public UnaryCallable<ReadModifyWriteRow, Row> readModifyWriteRowCallable() {
}
// </editor-fold>

private SpanName getSpanName(String methodName) {
return SpanName.of(CLIENT_NAME, methodName);
}

@Override
public void close() {
for (BackgroundResource backgroundResource : clientContext.getBackgroundResources()) {
@@ -38,6 +38,7 @@
import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow;
import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.cloud.bigtable.data.v2.stub.metrics.HeaderTracer;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsBatchingDescriptor;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsBatchingDescriptor;
import com.google.common.base.MoreObjects;
@@ -154,6 +155,7 @@ public class EnhancedBigtableStubSettings extends StubSettings<EnhancedBigtableS
private final String appProfileId;
private final boolean isRefreshingChannel;
private ImmutableList<String> primedTableIds;
private HeaderTracer headerTracer;

private final ServerStreamingCallSettings<Query, Row> readRowsSettings;
private final UnaryCallSettings<Query, Row> readRowSettings;
@@ -187,6 +189,7 @@ private EnhancedBigtableStubSettings(Builder builder) {
appProfileId = builder.appProfileId;
isRefreshingChannel = builder.isRefreshingChannel;
primedTableIds = builder.primedTableIds;
headerTracer = builder.headerTracer;

// Per method settings.
readRowsSettings = builder.readRowsSettings.build();
@@ -231,6 +234,11 @@ public List<String> getPrimedTableIds() {
return primedTableIds;
}

/** Gets the tracer for capturing metrics in the header. */
HeaderTracer getHeaderTracer() {
return headerTracer;
}

/** Returns a builder for the default ChannelProvider for this service. */
public static InstantiatingGrpcChannelProvider.Builder defaultGrpcTransportProviderBuilder() {
return BigtableStubSettings.defaultGrpcTransportProviderBuilder()
@@ -488,6 +496,7 @@ public static class Builder extends StubSettings.Builder<EnhancedBigtableStubSet
private String appProfileId;
private boolean isRefreshingChannel;
private ImmutableList<String> primedTableIds;
private HeaderTracer headerTracer;

private final ServerStreamingCallSettings.Builder<Query, Row> readRowsSettings;
private final UnaryCallSettings.Builder<Query, Row> readRowSettings;
@@ -511,6 +520,7 @@ private Builder() {
this.appProfileId = SERVER_DEFAULT_APP_PROFILE_ID;
this.isRefreshingChannel = false;
primedTableIds = ImmutableList.of();
headerTracer = HeaderTracer.newBuilder().build();
setCredentialsProvider(defaultCredentialsProviderBuilder().build());

// Defaults provider
@@ -617,6 +627,7 @@ private Builder(EnhancedBigtableStubSettings settings) {
appProfileId = settings.appProfileId;
isRefreshingChannel = settings.isRefreshingChannel;
primedTableIds = settings.primedTableIds;
headerTracer = settings.headerTracer;

// Per method settings.
readRowsSettings = settings.readRowsSettings.toBuilder();
@@ -739,6 +750,17 @@ public List<String> getPrimedTableIds() {
return primedTableIds;
}

/** Configure the header tracer for surfacing metrics in the header. */
Builder setHeaderTracer(HeaderTracer headerTracer) {
this.headerTracer = headerTracer;
return this;
}

/** Gets the header tracer that'll be used to surface metrics in the header. */
HeaderTracer getHeaderTracer() {
return headerTracer;
}

/** Returns the builder for the settings used for calls to readRows. */
public ServerStreamingCallSettings.Builder<Query, Row> readRowsSettings() {
return readRowsSettings;
@@ -818,6 +840,7 @@ public String toString() {
.add("appProfileId", appProfileId)
.add("isRefreshingChannel", isRefreshingChannel)
.add("primedTableIds", primedTableIds)
.add("headerTracer", headerTracer)
.add("readRowsSettings", readRowsSettings)
.add("readRowSettings", readRowSettings)
.add("sampleRowKeysSettings", sampleRowKeysSettings)

0 comments on commit 8240779

Please sign in to comment.