diff --git a/census/src/main/java/io/grpc/census/CensusStatsModule.java b/census/src/main/java/io/grpc/census/CensusStatsModule.java index 03eaf73570d..6a33a8889a9 100644 --- a/census/src/main/java/io/grpc/census/CensusStatsModule.java +++ b/census/src/main/java/io/grpc/census/CensusStatsModule.java @@ -17,6 +17,7 @@ package io.grpc.census; import static com.google.common.base.Preconditions.checkNotNull; +import static io.grpc.census.internal.ObservabilityCensusConstants.API_LATENCY_PER_CALL; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; @@ -409,6 +410,7 @@ static final class CallAttemptsTracerFactory extends ClientTracer inboundMetricTracer; private final CensusStatsModule module; private final Stopwatch stopwatch; + private final Stopwatch callStopwatch; @GuardedBy("lock") private boolean callEnded; private final TagContext parentCtx; @@ -423,6 +425,7 @@ static final class CallAttemptsTracerFactory extends private final Object lock = new Object(); // write @GuardedBy("lock") and happens before read private long retryDelayNanos; + private long callLatencyNanos; @GuardedBy("lock") private int activeStreams; @GuardedBy("lock") @@ -434,6 +437,7 @@ static final class CallAttemptsTracerFactory extends this.parentCtx = checkNotNull(parentCtx, "parentCtx"); this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName"); this.stopwatch = module.stopwatchSupplier.get(); + this.callStopwatch = module.stopwatchSupplier.get().start(); TagValue methodTag = TagValue.create(fullMethodName); startCtx = module.tagger.toBuilder(parentCtx) .putLocal(RpcMeasureConstants.GRPC_CLIENT_METHOD, methodTag) @@ -495,6 +499,7 @@ void callEnded(Status status) { if (!module.recordFinishedRpcs) { return; } + callStopwatch.stop(); this.status = status; boolean shouldRecordFinishedCall = false; synchronized (lock) { @@ -532,10 +537,12 @@ void recordFinishedCall() { if (attempts > 0) { retriesPerCall = attempts - 1; } + callLatencyNanos = callStopwatch.elapsed(TimeUnit.NANOSECONDS); MeasureMap measureMap = module.statsRecorder.newMeasureMap() .put(RETRIES_PER_CALL, retriesPerCall) .put(TRANSPARENT_RETRIES_PER_CALL, transparentRetriesPerCall.get()) - .put(RETRY_DELAY_PER_CALL, retryDelayNanos / NANOS_PER_MILLI); + .put(RETRY_DELAY_PER_CALL, retryDelayNanos / NANOS_PER_MILLI) + .put(API_LATENCY_PER_CALL, callLatencyNanos / NANOS_PER_MILLI); TagValue methodTag = TagValue.create(fullMethodName); TagValue statusTag = TagValue.create(status.getCode().toString()); measureMap.record( diff --git a/census/src/main/java/io/grpc/census/internal/ObservabilityCensusConstants.java b/census/src/main/java/io/grpc/census/internal/ObservabilityCensusConstants.java index 681957c7431..a944e4ffd49 100644 --- a/census/src/main/java/io/grpc/census/internal/ObservabilityCensusConstants.java +++ b/census/src/main/java/io/grpc/census/internal/ObservabilityCensusConstants.java @@ -25,18 +25,43 @@ import static io.opencensus.contrib.grpc.metrics.RpcMeasureConstants.GRPC_SERVER_SENT_BYTES_PER_RPC; import static io.opencensus.contrib.grpc.metrics.RpcMeasureConstants.GRPC_SERVER_STATUS; +import com.google.common.annotations.VisibleForTesting; import io.opencensus.contrib.grpc.metrics.RpcViewConstants; import io.opencensus.stats.Aggregation; +import io.opencensus.stats.Measure; +import io.opencensus.stats.Measure.MeasureDouble; import io.opencensus.stats.View; import java.util.Arrays; -/** Temporary holder class for the observability specific OpenCensus constants. - * The class will be removed once the new views are added in OpenCensus library. */ +// TODO(dnvindhya): Remove metric and view definitions from this class once it is moved to +// OpenCensus library. +/** + * Temporary holder class for the observability specific OpenCensus constants. The class will be + * removed once the new views are added in OpenCensus library. + */ +@VisibleForTesting public final class ObservabilityCensusConstants { static final Aggregation AGGREGATION_WITH_BYTES_HISTOGRAM = RpcViewConstants.GRPC_CLIENT_SENT_BYTES_PER_RPC_VIEW.getAggregation(); + static final Aggregation AGGREGATION_WITH_MILLIS_HISTOGRAM = + RpcViewConstants.GRPC_CLIENT_ROUNDTRIP_LATENCY_VIEW.getAggregation(); + + public static final MeasureDouble API_LATENCY_PER_CALL = + Measure.MeasureDouble.create( + "grpc.io/client/api_latency", + "Time taken by gRPC to complete an RPC from application's perspective", + "ms"); + + public static final View GRPC_CLIENT_API_LATENCY_VIEW = + View.create( + View.Name.create("grpc.io/client/api_latency"), + "Time taken by gRPC to complete an RPC from application's perspective", + API_LATENCY_PER_CALL, + AGGREGATION_WITH_MILLIS_HISTOGRAM, + Arrays.asList(GRPC_CLIENT_METHOD, GRPC_CLIENT_STATUS)); + public static final View GRPC_CLIENT_SENT_COMPRESSED_MESSAGE_BYTES_PER_RPC_VIEW = View.create( View.Name.create("grpc.io/client/sent_compressed_message_bytes_per_rpc"), diff --git a/census/src/test/java/io/grpc/census/CensusModulesTest.java b/census/src/test/java/io/grpc/census/CensusModulesTest.java index 9768797d579..b651f559d21 100644 --- a/census/src/test/java/io/grpc/census/CensusModulesTest.java +++ b/census/src/test/java/io/grpc/census/CensusModulesTest.java @@ -21,6 +21,7 @@ import static io.grpc.census.CensusStatsModule.CallAttemptsTracerFactory.RETRIES_PER_CALL; import static io.grpc.census.CensusStatsModule.CallAttemptsTracerFactory.RETRY_DELAY_PER_CALL; import static io.grpc.census.CensusStatsModule.CallAttemptsTracerFactory.TRANSPARENT_RETRIES_PER_CALL; +import static io.grpc.census.internal.ObservabilityCensusConstants.API_LATENCY_PER_CALL; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -63,6 +64,7 @@ import io.grpc.Status; import io.grpc.census.CensusTracingModule.CallAttemptsTracerFactory; import io.grpc.census.internal.DeprecatedCensusConstants; +import io.grpc.census.internal.ObservabilityCensusConstants; import io.grpc.internal.FakeClock; import io.grpc.internal.testing.StatsTestUtils; import io.grpc.internal.testing.StatsTestUtils.FakeStatsRecorder; @@ -121,6 +123,8 @@ */ @RunWith(JUnit4.class) public class CensusModulesTest { + + private static final double TOLERANCE = 1e-6; private static final CallOptions.Key CUSTOM_OPTION = CallOptions.Key.createWithDefault("option1", "default"); private static final CallOptions CALL_OPTIONS = @@ -368,7 +372,7 @@ record = statsRecorder.pollRecord(); .setSampleToLocalSpanStore(false) .build()); verify(spyClientSpan, never()).end(); - assertZeroRetryRecorded(); + assertPerCallMetrics(0D); } @Test @@ -503,7 +507,7 @@ private void subtestClientBasicStatsDefaultContext( DeprecatedCensusConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES)); assertEquals(30 + 100 + 16 + 24, record.getMetricAsLongOrFail(RpcMeasureConstants.GRPC_CLIENT_ROUNDTRIP_LATENCY)); - assertZeroRetryRecorded(); + assertPerCallMetrics(30D + 100 + 16 + 24); } else { assertNull(statsRecorder.pollRecord()); } @@ -691,6 +695,8 @@ record = statsRecorder.pollRecord(); assertThat(record.getMetric(RETRIES_PER_CALL)).isEqualTo(1); assertThat(record.getMetric(TRANSPARENT_RETRIES_PER_CALL)).isEqualTo(2); assertThat(record.getMetric(RETRY_DELAY_PER_CALL)).isEqualTo(1000D + 10 + 10); + assertThat(record.getMetric(API_LATENCY_PER_CALL)) + .isEqualTo(30D + 100 + 24 + 1000 + 100 + 10 + 10 + 16 + 24); } private void assertRealTimeMetric( @@ -716,13 +722,14 @@ private void assertRealTimeMetric( assertEquals(expectedValue, record.getMetricAsLongOrFail(measure)); } - private void assertZeroRetryRecorded() { + private void assertPerCallMetrics(double expectedLatencyValue) { StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord(); TagValue methodTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_METHOD); assertEquals(method.getFullMethodName(), methodTag.asString()); assertThat(record.getMetric(RETRIES_PER_CALL)).isEqualTo(0); assertThat(record.getMetric(TRANSPARENT_RETRIES_PER_CALL)).isEqualTo(0); assertThat(record.getMetric(RETRY_DELAY_PER_CALL)).isEqualTo(0D); + assertThat(record.getMetric(API_LATENCY_PER_CALL)).isEqualTo(expectedLatencyValue); } @Test @@ -849,7 +856,7 @@ record = statsRecorder.pollRecord(); 3000, record.getMetricAsLongOrFail(RpcMeasureConstants.GRPC_CLIENT_ROUNDTRIP_LATENCY)); assertNull(record.getMetric(RpcMeasureConstants.GRPC_CLIENT_SERVER_LATENCY)); - assertZeroRetryRecorded(); + assertPerCallMetrics(3000D); } @Test @@ -989,7 +996,7 @@ private void subtestStatsHeadersPropagateTags(boolean propagate, boolean recordS assertNull(clientRecord.getMetric(DeprecatedCensusConstants.RPC_CLIENT_ERROR_COUNT)); TagValue clientPropagatedTag = clientRecord.tags.get(StatsTestUtils.EXTRA_TAG); assertEquals("extra-tag-value-897", clientPropagatedTag.asString()); - assertZeroRetryRecorded(); + assertPerCallMetrics(0D); } if (!recordStats) { @@ -1507,6 +1514,81 @@ public Long apply(AggregationData arg) { }); } + @Test + public void callLatencyView() throws InterruptedException { + StatsComponent localStats = new StatsComponentImpl(); + + localStats + .getViewManager() + .registerView(ObservabilityCensusConstants.GRPC_CLIENT_API_LATENCY_VIEW); + + CensusStatsModule localCensusStats = new CensusStatsModule( + tagger, tagCtxSerializer, localStats.getStatsRecorder(), fakeClock.getStopwatchSupplier(), + false, false, true, false /* real-time */, true); + + CensusStatsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = + new CensusStatsModule.CallAttemptsTracerFactory( + localCensusStats, tagger.empty(), method.getFullMethodName()); + + Metadata headers = new Metadata(); + ClientStreamTracer tracer = + callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, headers); + tracer.streamCreated(Attributes.EMPTY, headers); + fakeClock.forwardTime(50, MILLISECONDS); + Status status = Status.OK.withDescription("Success"); + tracer.streamClosed(status); + callAttemptsTracerFactory.callEnded(status); + + // Give OpenCensus a chance to update the views asynchronously. + Thread.sleep(100); + + assertDistributionData( + localStats, + ObservabilityCensusConstants.GRPC_CLIENT_API_LATENCY_VIEW, + ImmutableList.of(TagValue.create(method.getFullMethodName()), TagValue.create("OK")), + 50.0, 1, 0.0, + ImmutableList.of( + 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 1L, + 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L)); + } + + private void assertDistributionData(StatsComponent localStats, View view, + List dimension, double mean, long count, double sumOfSquaredDeviations, + List expectedBucketCounts) { + AggregationData aggregationData = localStats.getViewManager() + .getView(view.getName()) + .getAggregationMap() + .get(dimension); + + aggregationData.match( + Functions.throwAssertionError(), + Functions.throwAssertionError(), + Functions.throwAssertionError(), + /* p3= */ new Function() { + @Override + public Void apply(AggregationData.DistributionData arg) { + assertThat(arg.getMean()).isWithin(TOLERANCE).of(mean); + assertThat(arg.getCount()).isEqualTo(count); + assertThat(arg.getSumOfSquaredDeviations()) + .isWithin(TOLERANCE) + .of(sumOfSquaredDeviations); + assertThat(arg.getBucketCounts()) + .containsExactlyElementsIn(expectedBucketCounts) + .inOrder(); + return null; + } + }, + Functions.throwAssertionError(), + Functions.throwAssertionError(), + new Function() { + @Override + public Void apply(AggregationData arg) { + assertThat(((AggregationData.DistributionData) arg).getCount()).isEqualTo(count); + return null; + } + }); + } + static class CallInfo extends ServerCallInfo { private final MethodDescriptor methodDescriptor; private final Attributes attributes; diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java index 2a9d03e3156..2b7a106d6de 100644 --- a/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java +++ b/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java @@ -154,6 +154,7 @@ private static void registerObservabilityViews() { viewManager.registerView(RpcViewConstants.GRPC_CLIENT_COMPLETED_RPC_VIEW); viewManager.registerView(RpcViewConstants.GRPC_CLIENT_STARTED_RPC_VIEW); viewManager.registerView(RpcViewConstants.GRPC_CLIENT_ROUNDTRIP_LATENCY_VIEW); + viewManager.registerView(ObservabilityCensusConstants.GRPC_CLIENT_API_LATENCY_VIEW); viewManager.registerView( ObservabilityCensusConstants.GRPC_CLIENT_SENT_COMPRESSED_MESSAGE_BYTES_PER_RPC_VIEW); viewManager.registerView(