diff --git a/stats/opentelemetry/client_metrics.go b/stats/opentelemetry/client_metrics.go new file mode 100644 index 00000000000..30793f0c534 --- /dev/null +++ b/stats/opentelemetry/client_metrics.go @@ -0,0 +1,219 @@ +/* + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package opentelemetry + +import ( + "context" + "sync/atomic" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/stats" + "google.golang.org/grpc/status" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +type clientStatsHandler struct { + o Options + + clientMetrics clientMetrics +} + +func (csh *clientStatsHandler) initializeMetrics() { + // Will set no metrics to record, logically making this stats handler a + // no-op. + if csh.o.MetricsOptions.MeterProvider == nil { + return + } + + meter := csh.o.MetricsOptions.MeterProvider.Meter("grpc-go " + grpc.Version) + if meter == nil { + return + } + + setOfMetrics := csh.o.MetricsOptions.Metrics.metrics + + clientMetrics := clientMetrics{} + + clientMetrics.attemptStarted = createInt64Counter(setOfMetrics, "grpc.client.attempt.started", meter, metric.WithUnit("attempt"), metric.WithDescription("Number of client call attempts started.")) + clientMetrics.attemptDuration = createFloat64Histogram(setOfMetrics, "grpc.client.attempt.duration", meter, metric.WithUnit("s"), metric.WithDescription("End-to-end time taken to complete a client call attempt."), metric.WithExplicitBucketBoundaries(DefaultLatencyBounds...)) + clientMetrics.attemptSentTotalCompressedMessageSize = createInt64Histogram(setOfMetrics, "grpc.client.attempt.sent_total_compressed_message_size", meter, metric.WithUnit("By"), metric.WithDescription("Compressed message bytes sent per client call attempt."), metric.WithExplicitBucketBoundaries(DefaultSizeBounds...)) + clientMetrics.attemptRcvdTotalCompressedMessageSize = createInt64Histogram(setOfMetrics, "grpc.client.attempt.rcvd_total_compressed_message_size", meter, metric.WithUnit("By"), metric.WithDescription("Compressed message bytes received per call attempt."), metric.WithExplicitBucketBoundaries(DefaultSizeBounds...)) + clientMetrics.callDuration = createFloat64Histogram(setOfMetrics, "grpc.client.call.duration", meter, metric.WithUnit("s"), metric.WithDescription("Time taken by gRPC to complete an RPC from application's perspective."), metric.WithExplicitBucketBoundaries(DefaultLatencyBounds...)) + + csh.clientMetrics = clientMetrics +} + +func (csh *clientStatsHandler) unaryInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + ci := &callInfo{ + target: csh.determineTarget(cc), + method: removeLeadingSlash(csh.determineMethod(method, opts...)), + } + ctx = setCallInfo(ctx, ci) + + startTime := time.Now() + err := invoker(ctx, method, req, reply, cc, opts...) 
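+	// The invoker does not return until the RPC, including any retry
+	// attempts, has completed, so per-call metrics can be recorded here.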
+	csh.perCallMetrics(ctx, err, startTime, ci)
+	return err
+}
+
+// determineTarget determines the target to record attributes with. It returns
+// "other" if a target filter is set and returns false for the target;
+// otherwise it returns the target name as is.
+func (csh *clientStatsHandler) determineTarget(cc *grpc.ClientConn) string {
+	target := cc.CanonicalTarget()
+	if f := csh.o.MetricsOptions.TargetAttributeFilter; f != nil && !f(target) {
+		target = "other"
+	}
+	return target
+}
+
+// determineMethod determines the method to record attributes with. It returns
+// the method name as is if a grpc.StaticMethodCallOption is present in the
+// call options; otherwise it returns "other".
+func (csh *clientStatsHandler) determineMethod(method string, opts ...grpc.CallOption) string {
+	for _, opt := range opts {
+		if _, ok := opt.(grpc.StaticMethodCallOption); ok {
+			return method
+		}
+	}
+	return "other"
+}
+
+func (csh *clientStatsHandler) streamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
+	ci := &callInfo{
+		target: csh.determineTarget(cc),
+		method: removeLeadingSlash(csh.determineMethod(method, opts...)),
+	}
+	ctx = setCallInfo(ctx, ci)
+	startTime := time.Now()
+
+	callback := func(err error) {
+		csh.perCallMetrics(ctx, err, startTime, ci)
+	}
+	opts = append([]grpc.CallOption{grpc.OnFinish(callback)}, opts...)
+	return streamer(ctx, desc, cc, method, opts...)
+}
+
+func (csh *clientStatsHandler) perCallMetrics(ctx context.Context, err error, startTime time.Time, ci *callInfo) {
+	s := status.Convert(err)
+	callLatency := float64(time.Since(startTime)) / float64(time.Second)
+	if csh.clientMetrics.callDuration != nil {
+		csh.clientMetrics.callDuration.Record(ctx, callLatency, metric.WithAttributes(attribute.String("grpc.method", ci.method), attribute.String("grpc.target", ci.target), attribute.String("grpc.status", canonicalString(s.Code()))))
+	}
+}
+
+// TagConn exists to satisfy stats.Handler.
+func (csh *clientStatsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
+	return ctx
+}
+
+// HandleConn exists to satisfy stats.Handler.
+func (csh *clientStatsHandler) HandleConn(context.Context, stats.ConnStats) {}
+
+// TagRPC implements per RPC attempt context management.
+func (csh *clientStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
+	mi := &metricsInfo{ // populates information about RPC start.
+		startTime: time.Now(),
+	}
+	ri := &rpcInfo{
+		mi: mi,
+	}
+	return setRPCInfo(ctx, ri)
+}
+
+func (csh *clientStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
+	ri := getRPCInfo(ctx)
+	if ri == nil {
+		// Shouldn't happen because TagRPC populates this information.
+		return
+	}
+	csh.processRPCEvent(ctx, rs, ri.mi)
+}
+
+func (csh *clientStatsHandler) processRPCEvent(ctx context.Context, s stats.RPCStats, mi *metricsInfo) {
+	switch st := s.(type) {
+	case *stats.InHeader, *stats.OutHeader, *stats.InTrailer, *stats.OutTrailer:
+	case *stats.Begin:
+		ci := getCallInfo(ctx)
+		if ci == nil {
+			logger.Error("ctx passed into client side stats handler metrics event handling has no metrics data present")
+			return
+		}
+
+		if csh.clientMetrics.attemptStarted != nil {
+			csh.clientMetrics.attemptStarted.Add(ctx, 1, metric.WithAttributes(attribute.String("grpc.method", ci.method), attribute.String("grpc.target", ci.target)))
+		}
+	case *stats.OutPayload:
+		atomic.AddInt64(&mi.sentCompressedBytes, int64(st.CompressedLength))
+	case *stats.InPayload:
+		atomic.AddInt64(&mi.recvCompressedBytes, int64(st.CompressedLength))
+	case *stats.End:
+		csh.processRPCEnd(ctx, mi, st)
+	default:
+		// Shouldn't happen. gRPC calls into the stats handler, and the stats
+		// will always be one of the types handled above.
+		logger.Errorf("Received unexpected stats type (%T) with data: %v", s, s)
+	}
+}
+
+func (csh *clientStatsHandler) processRPCEnd(ctx context.Context, mi *metricsInfo, e *stats.End) {
+	ci := getCallInfo(ctx)
+	if ci == nil {
+		// Shouldn't happen: the interceptor sets callInfo, so this check is
+		// purely defensive.
+		return
+	}
+	latency := float64(time.Since(mi.startTime)) / float64(time.Second)
+	var st string
+	if e.Error != nil {
+		s, _ := status.FromError(e.Error)
+		st = canonicalString(s.Code())
+	} else {
+		st = "OK"
+	}
+	clientAttributeOption := metric.WithAttributes(attribute.String("grpc.method", ci.method), attribute.String("grpc.target", ci.target), attribute.String("grpc.status", st))
+	if csh.clientMetrics.attemptDuration != nil {
+		csh.clientMetrics.attemptDuration.Record(ctx, latency, clientAttributeOption)
+	}
+
+	if csh.clientMetrics.attemptSentTotalCompressedMessageSize != nil {
+		csh.clientMetrics.attemptSentTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&mi.sentCompressedBytes), clientAttributeOption)
+	}
+
+	if csh.clientMetrics.attemptRcvdTotalCompressedMessageSize != nil {
+		csh.clientMetrics.attemptRcvdTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&mi.recvCompressedBytes), clientAttributeOption)
+	}
+}
+
+const (
+	// ClientAttemptStarted is the number of client call attempts started.
+	ClientAttemptStarted Metric = "grpc.client.attempt.started"
+	// ClientAttemptDuration is the end-to-end time taken to complete a client
+	// call attempt.
+	ClientAttemptDuration Metric = "grpc.client.attempt.duration"
+	// ClientAttemptSentCompressedTotalMessageSize is the compressed message
+	// bytes sent per client call attempt.
+	ClientAttemptSentCompressedTotalMessageSize Metric = "grpc.client.attempt.sent_total_compressed_message_size"
+	// ClientAttemptRcvdCompressedTotalMessageSize is the compressed message
+	// bytes received per call attempt.
+	ClientAttemptRcvdCompressedTotalMessageSize Metric = "grpc.client.attempt.rcvd_total_compressed_message_size"
+	// ClientCallDuration is the time taken by gRPC to complete an RPC from
+	// application's perspective.
+	ClientCallDuration Metric = "grpc.client.call.duration"
+)
diff --git a/stats/opentelemetry/e2e_test.go b/stats/opentelemetry/e2e_test.go
new file mode 100644
index 00000000000..399327b860e
--- /dev/null
+++ b/stats/opentelemetry/e2e_test.go
@@ -0,0 +1,709 @@
+/*
+ * Copyright 2024 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package opentelemetry
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"strings"
+	"testing"
+	"time"
+
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/encoding/gzip"
+	"google.golang.org/grpc/internal/grpctest"
+	"google.golang.org/grpc/internal/stubserver"
+	testgrpc "google.golang.org/grpc/interop/grpc_testing"
+	testpb "google.golang.org/grpc/interop/grpc_testing"
+
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/sdk/metric"
+	"go.opentelemetry.io/otel/sdk/metric/metricdata"
+	"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
+)
+
+func Example_dialOption() {
+	// This example sets default histogram bounds for a view through the SDK's
+	// MeterProvider, which is the recommended approach. The API calls in this
+	// module provide default bounds as "advice", but that mechanism is not
+	// guaranteed to be stable and API implementations are not required to
+	// honor it, whereas bounds set through the SDK always get picked up. The
+	// fields set explicitly in the Aggregation take precedence over the
+	// defaults from the API; any fields left unset fall back to those
+	// defaults, so a view can mix SDK-configured and API-provided settings.
+	// The same logic applies to the views themselves: only the metrics with a
+	// view created in the SDK use the SDK configuration, and the rest are
+	// created from the API call alone.
+	reader := metric.NewManualReader()
+	provider := metric.NewMeterProvider(
+		metric.WithReader(reader),
+		metric.WithView(metric.NewView(metric.Instrument{
+			Name: "grpc.client.call.duration",
+		},
+			metric.Stream{
+				Aggregation: metric.AggregationExplicitBucketHistogram{
+					Boundaries: DefaultSizeBounds, // The specific fields set in the SDK take precedence over the API defaults.
+				},
+			},
+		)),
+	)
+
+	opts := Options{
+		MetricsOptions: MetricsOptions{
+			MeterProvider: provider,
+			Metrics:       DefaultMetrics, // Equivalent to unset - distinct from an empty set, which records nothing.
+			TargetAttributeFilter: func(str string) bool {
+				if strings.HasPrefix(str, "dns") { // Filter out DNS targets.
+					return false
+				}
+				return true
+			},
+		},
+	}
+	do := DialOption(opts)
+	cc, _ := grpc.NewClient("", do)
+	// Handle err.
+	if cc != nil {
+		defer cc.Close()
+	}
+}
+
+func Example_serverOption() {
+	reader := metric.NewManualReader()
+	provider := metric.NewMeterProvider(metric.WithReader(reader))
+	opts := Options{
+		MetricsOptions: MetricsOptions{
+			MeterProvider: provider,
+			Metrics:       DefaultMetrics,
+			MethodAttributeFilter: func(str string) bool {
+				if str == "/grpc.testing.TestService/UnaryCall" {
+					return false
+				}
+				// Will allow duplex/any other type of RPC.
+				return true
+			},
+		},
+	}
+	server := grpc.NewServer(ServerOption(opts))
+	defer server.Stop()
+}
+
+func ExampleMetrics_excludeSome() {
+	// To exclude specific metrics, initialize Options as follows:
+	opts := Options{
+		MetricsOptions: MetricsOptions{
+			Metrics: DefaultMetrics.Remove(ClientAttemptDuration, ClientAttemptRcvdCompressedTotalMessageSize),
+		},
+	}
+	do := DialOption(opts)
+	cc, _ := grpc.NewClient("", do)
+	// Handle err.
+	if cc != nil {
+		defer cc.Close()
+	}
+}
+
+func ExampleMetrics_disableAll() {
+	// To disable all metrics, initialize Options as follows:
+	opts := Options{
+		MetricsOptions: MetricsOptions{
+			Metrics: NewMetrics(), // Distinct from nil, which records the default metrics. This empty set creates no metrics.
+		},
+	}
+	do := DialOption(opts)
+	cc, _ := grpc.NewClient("", do)
+	// Handle err.
+	if cc != nil {
+		defer cc.Close()
+	}
+}
+
+func ExampleMetrics_enableSome() {
+	// To only create specific metrics, initialize Options as follows:
+	opts := Options{
+		MetricsOptions: MetricsOptions{
+			Metrics: NewMetrics(ClientAttemptDuration, ClientAttemptRcvdCompressedTotalMessageSize), // Only create these metrics.
+		},
+	}
+	do := DialOption(opts)
+	cc, _ := grpc.NewClient("", do)
+	// Handle err.
+	if cc != nil {
+		defer cc.Close()
+	}
+}
+
+var defaultTestTimeout = 5 * time.Second
+
+type s struct {
+	grpctest.Tester
+}
+
+func Test(t *testing.T) {
+	grpctest.RunSubTests(t, s{})
+}
+
+// waitForServerCompletedRPCs waits until the unary and streaming stats.End
+// calls are finished processing. It does this by waiting for the expected
+// metric triggered by stats.End to appear through the passed in metrics
+// reader.
+func waitForServerCompletedRPCs(ctx context.Context, reader metric.Reader, wantMetric metricdata.Metrics, t *testing.T) (map[string]metricdata.Metrics, error) {
+	for ; ctx.Err() == nil; <-time.After(time.Millisecond) {
+		rm := &metricdata.ResourceMetrics{}
+		reader.Collect(ctx, rm)
+		gotMetrics := map[string]metricdata.Metrics{}
+		for _, sm := range rm.ScopeMetrics {
+			for _, m := range sm.Metrics {
+				gotMetrics[m.Name] = m
+			}
+		}
+		val, ok := gotMetrics[wantMetric.Name]
+		if !ok {
+			continue
+		}
+		if !metricdatatest.AssertEqual(t, wantMetric, val, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) {
+			continue
+		}
+		return gotMetrics, nil
+	}
+	return nil, fmt.Errorf("error waiting for metric %v: %v", wantMetric, ctx.Err())
+}
+
+// setup creates a stub server with the OpenTelemetry component configured on
+// both the client and server side. It returns a reader for the metrics
+// emitted by the OpenTelemetry component and the server.
+func setup(t *testing.T, tafOn bool, maf func(string) bool) (*metric.ManualReader, *stubserver.StubServer) {
+	reader := metric.NewManualReader()
+	provider := metric.NewMeterProvider(
+		metric.WithReader(reader),
+	)
+	ss := &stubserver.StubServer{
+		UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
+			return &testpb.SimpleResponse{Payload: &testpb.Payload{
+				Body: make([]byte, 10000),
+			}}, nil
+		},
+		FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
+			for {
+				_, err := stream.Recv()
+				if err == io.EOF {
+					return nil
+				}
+			}
+		},
+	}
+	var taf func(string) bool
+	if tafOn {
+		taf = func(str string) bool {
+			if str == ss.Target {
+				return false
+			}
+			return true
+		}
+	}
+	if err := ss.Start([]grpc.ServerOption{ServerOption(Options{
+		MetricsOptions: MetricsOptions{
+			MeterProvider:         provider,
+			Metrics:               DefaultMetrics,
+			TargetAttributeFilter: taf,
+			MethodAttributeFilter: maf,
+		}})}, DialOption(Options{
+		MetricsOptions: MetricsOptions{
+			MeterProvider:         provider,
+			Metrics:               DefaultMetrics,
+			TargetAttributeFilter: taf,
+			MethodAttributeFilter: maf,
+		},
+	})); err != nil {
+		t.Fatalf("Error starting endpoint server: %v", err)
+	}
+	return reader, ss
+}
+
+// TestMethodTargetAttributeFilter tests the method and target attribute
+// filters. When a filter returns false, the corresponding grpc.method or
+// grpc.target attribute should be bucketed into "other".
+func (s) TestMethodTargetAttributeFilter(t *testing.T) {
+	maf := func(str string) bool {
+		if str == "/grpc.testing.TestService/UnaryCall" {
+			return false
+		}
+		// Will allow duplex/any other type of RPC.
+		return true
+	}
+	reader, ss := setup(t, true, maf)
+	defer ss.Stop()
+
+	// Make a unary RPC and a streaming RPC. The configured filters should
+	// bucket the corresponding target and method attributes into "other".
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+	if _, err := ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{Payload: &testpb.Payload{
+		Body: make([]byte, 10000),
+	}}); err != nil {
+		t.Fatalf("Unexpected error from UnaryCall: %v", err)
+	}
+	stream, err := ss.Client.FullDuplexCall(ctx)
+	if err != nil {
+		t.Fatalf("ss.Client.FullDuplexCall failed: %v", err)
+	}
+
+	stream.CloseSend()
+	if _, err = stream.Recv(); err != io.EOF {
+		t.Fatalf("unexpected error: %v, expected an EOF error", err)
+	}
+	rm := &metricdata.ResourceMetrics{}
+	reader.Collect(ctx, rm)
+
+	wantMetrics := []metricdata.Metrics{
+		{
+			Name:        "grpc.client.attempt.started",
+			Description: "Number of client call attempts started.",
+			Unit:        "attempt",
+			Data: metricdata.Sum[int64]{
+				DataPoints: []metricdata.DataPoint[int64]{
+					{
+						Attributes: attribute.NewSet(attribute.String("grpc.method", "grpc.testing.TestService/UnaryCall"), attribute.String("grpc.target", "other")),
+						Value:      1,
+					},
+					{
+						Attributes: attribute.NewSet(attribute.String("grpc.method", "grpc.testing.TestService/FullDuplexCall"), attribute.String("grpc.target", "other")),
+						Value:      1,
+					},
+				},
+				Temporality: metricdata.CumulativeTemporality,
+				IsMonotonic: true,
+			},
+		},
+	}
+	gotMetrics := map[string]metricdata.Metrics{}
+	for _, sm := range rm.ScopeMetrics {
+		for _, m := range sm.Metrics {
+			gotMetrics[m.Name] = m
+		}
+	}
+
+	for _, metric := range wantMetrics {
+		val, ok := gotMetrics[metric.Name]
+		if !ok {
+			t.Fatalf("metric %v not present in recorded metrics", metric.Name)
+		}
+		if !metricdatatest.AssertEqual(t, metric, val, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) {
+			t.Fatalf("metrics data type not equal for metric: %v", metric.Name)
+		}
+	}
+}
+
+// assertDataPointWithinFiveSeconds asserts that the metric passed in contains
+// a histogram with data points that fall within buckets that are <=5 seconds.
+func assertDataPointWithinFiveSeconds(metric metricdata.Metrics) error {
+	histo, ok := metric.Data.(metricdata.Histogram[float64])
+	if !ok {
+		return fmt.Errorf("metric data is not histogram")
+	}
+	for _, dataPoint := range histo.DataPoints {
+		// Find the first bucket whose upper bound is at least five seconds.
+		var boundWithFive int
+		for i, bound := range dataPoint.Bounds {
+			if bound >= 5 {
+				boundWithFive = i
+				break
+			}
+		}
+		foundPoint := false
+		for i, bucket := range dataPoint.BucketCounts {
+			if i >= boundWithFive {
+				return fmt.Errorf("data point not found in bucket <=5 seconds")
+			}
+			if bucket == 1 {
+				foundPoint = true
+				break
+			}
+		}
+		if !foundPoint {
+			return fmt.Errorf("no data point found for metric")
+		}
+	}
+	return nil
+}
+
+// TestAllMetricsOneFunction tests the metrics emitted by the OpenTelemetry
+// instrumentation component. It configures a system with a gRPC client and
+// gRPC server, with the OpenTelemetry Dial and Server Options configured to
+// specify all the metrics provided by this package, and makes a unary RPC and
+// a streaming RPC. These two RPCs should cause certain recordings for each
+// registered metric, observed through a manual metrics reader on the provided
+// OpenTelemetry SDK's MeterProvider. It then makes an RPC that is
+// unregistered on the client (no StaticMethodCallOption set) and the server.
+// The method attribute on subsequent metrics should be bucketed in "other".
+func (s) TestAllMetricsOneFunction(t *testing.T) {
+	reader, ss := setup(t, false, nil)
+	defer ss.Stop()
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+	// Make two RPCs, a unary RPC and a streaming RPC. These should cause
+	// certain metrics to be emitted, which should be observable through the
+	// metrics reader.
+	if _, err := ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{Payload: &testpb.Payload{
+		Body: make([]byte, 10000),
+	}}, grpc.UseCompressor(gzip.Name)); err != nil { // Deterministic compression.
+		t.Fatalf("Unexpected error from UnaryCall: %v", err)
+	}
+	stream, err := ss.Client.FullDuplexCall(ctx)
+	if err != nil {
+		t.Fatalf("ss.Client.FullDuplexCall failed: %v", err)
+	}
+
+	stream.CloseSend()
+	if _, err = stream.Recv(); err != io.EOF {
+		t.Fatalf("unexpected error: %v, expected an EOF error", err)
+	}
+
+	rm := &metricdata.ResourceMetrics{}
+	reader.Collect(ctx, rm)
+
+	gotMetrics := map[string]metricdata.Metrics{}
+	for _, sm := range rm.ScopeMetrics {
+		for _, m := range sm.Metrics {
+			gotMetrics[m.Name] = m
+		}
+	}
+
+	unaryMethodAttr := attribute.String("grpc.method", "grpc.testing.TestService/UnaryCall")
+	duplexMethodAttr := attribute.String("grpc.method", "grpc.testing.TestService/FullDuplexCall")
+
+	targetAttr := attribute.String("grpc.target", ss.Target)
+	statusAttr := attribute.String("grpc.status", "OK")
+
+	wantMetrics := []metricdata.Metrics{
+		{
+			Name:        "grpc.client.attempt.started",
+			Description: "Number of client call attempts started.",
+			Unit:        "attempt",
+			Data: metricdata.Sum[int64]{
+				DataPoints: []metricdata.DataPoint[int64]{
+					{
+						Attributes: attribute.NewSet(unaryMethodAttr, targetAttr),
+						Value:      1,
+					},
+					{
+						Attributes: attribute.NewSet(duplexMethodAttr, targetAttr),
+						Value:      1,
+					},
+				},
+				Temporality: metricdata.CumulativeTemporality,
+				IsMonotonic: true,
+			},
+		},
+		{
+			Name:        "grpc.client.attempt.duration",
+			Description: "End-to-end time taken to complete a client call attempt.",
+			Unit:        "s",
+			Data: metricdata.Histogram[float64]{
+				DataPoints: []metricdata.HistogramDataPoint[float64]{
+					{
+						Attributes: attribute.NewSet(unaryMethodAttr, targetAttr, statusAttr),
+						Count:      1,
+						Bounds:     DefaultLatencyBounds,
+					},
+					{
+						Attributes: attribute.NewSet(duplexMethodAttr, targetAttr, statusAttr),
+						Count:      1,
+						Bounds:     DefaultLatencyBounds,
+					},
+				},
+				Temporality: metricdata.CumulativeTemporality,
+			},
+		},
+		{
+			Name:        "grpc.client.attempt.sent_total_compressed_message_size",
+			Description: "Compressed message bytes sent per client call attempt.",
+			Unit:        "By",
+			Data: metricdata.Histogram[int64]{
+				DataPoints: []metricdata.HistogramDataPoint[int64]{
+					{
+						Attributes:   attribute.NewSet(unaryMethodAttr, targetAttr, statusAttr),
+						Count:        1,
+						Bounds:       DefaultSizeBounds,
+						BucketCounts: []uint64{0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
+						Min:          metricdata.NewExtrema(int64(57)),
+						Max:          metricdata.NewExtrema(int64(57)),
+						Sum:          57,
+					},
+					{
+						Attributes:   attribute.NewSet(duplexMethodAttr, targetAttr, statusAttr),
+						Count:        1,
+						Bounds:       DefaultSizeBounds,
+						BucketCounts: []uint64{0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
+						Min:          metricdata.NewExtrema(int64(0)),
+						Max:          metricdata.NewExtrema(int64(0)),
+						Sum:          0,
+					},
+				},
+				Temporality: metricdata.CumulativeTemporality,
+			},
+		},
+		{
+			Name:        "grpc.client.attempt.rcvd_total_compressed_message_size",
+			Description: "Compressed message bytes received per call attempt.",
+			Unit:        "By",
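+			// The exact 57-byte extrema asserted here are reproducible
+			// because the payload is a fixed 10000-byte zero-filled body and
+			// gzip compression is deterministic.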
+			Data: metricdata.Histogram[int64]{
+				DataPoints: []metricdata.HistogramDataPoint[int64]{
+					{
+						Attributes:   attribute.NewSet(unaryMethodAttr, targetAttr, statusAttr),
+						Count:        1,
+						Bounds:       DefaultSizeBounds,
+						BucketCounts: []uint64{0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
+						Min:          metricdata.NewExtrema(int64(57)),
+						Max:          metricdata.NewExtrema(int64(57)),
+						Sum:          57,
+					},
+					{
+						Attributes:   attribute.NewSet(duplexMethodAttr, targetAttr, statusAttr),
+						Count:        1,
+						Bounds:       DefaultSizeBounds,
+						BucketCounts: []uint64{0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
+						Min:          metricdata.NewExtrema(int64(0)),
+						Max:          metricdata.NewExtrema(int64(0)),
+						Sum:          0,
+					},
+				},
+				Temporality: metricdata.CumulativeTemporality,
+			},
+		},
+		{
+			Name:        "grpc.client.call.duration",
+			Description: "Time taken by gRPC to complete an RPC from application's perspective.",
+			Unit:        "s",
+			Data: metricdata.Histogram[float64]{
+				DataPoints: []metricdata.HistogramDataPoint[float64]{
+					{
+						Attributes: attribute.NewSet(unaryMethodAttr, targetAttr, statusAttr),
+						Count:      1,
+						Bounds:     DefaultLatencyBounds,
+					},
+					{
+						Attributes: attribute.NewSet(duplexMethodAttr, targetAttr, statusAttr),
+						Count:      1,
+						Bounds:     DefaultLatencyBounds,
+					},
+				},
+				Temporality: metricdata.CumulativeTemporality,
+			},
+		},
+		{
+			Name:        "grpc.server.call.started",
+			Description: "Number of server calls started.",
+			Unit:        "call",
+			Data: metricdata.Sum[int64]{
+				DataPoints: []metricdata.DataPoint[int64]{
+					{
+						Attributes: attribute.NewSet(unaryMethodAttr),
+						Value:      1,
+					},
+					{
+						Attributes: attribute.NewSet(duplexMethodAttr),
+						Value:      1,
+					},
+				},
+				Temporality: metricdata.CumulativeTemporality,
+				IsMonotonic: true,
+			},
+		},
+		{
+			Name:        "grpc.server.call.sent_total_compressed_message_size",
+			Unit:        "By",
+			Description: "Compressed message bytes sent per server call.",
+			Data: metricdata.Histogram[int64]{
+				DataPoints: []metricdata.HistogramDataPoint[int64]{
+					{
+						Attributes:   attribute.NewSet(unaryMethodAttr, statusAttr),
+						Count:        1,
+						Bounds:       DefaultSizeBounds,
+						BucketCounts: []uint64{0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
+						Min:          metricdata.NewExtrema(int64(57)),
+						Max:          metricdata.NewExtrema(int64(57)),
+						Sum:          57,
+					},
+					{
+						Attributes:   attribute.NewSet(duplexMethodAttr, statusAttr),
+						Count:        1,
+						Bounds:       DefaultSizeBounds,
+						BucketCounts: []uint64{0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
+						Min:          metricdata.NewExtrema(int64(0)),
+						Max:          metricdata.NewExtrema(int64(0)),
+						Sum:          0,
+					},
+				},
+				Temporality: metricdata.CumulativeTemporality,
+			},
+		},
+		{
+			Name:        "grpc.server.call.rcvd_total_compressed_message_size",
+			Unit:        "By",
+			Description: "Compressed message bytes received per server call.",
+			Data: metricdata.Histogram[int64]{
+				DataPoints: []metricdata.HistogramDataPoint[int64]{
+					{
+						Attributes:   attribute.NewSet(unaryMethodAttr, statusAttr),
+						Count:        1,
+						Bounds:       DefaultSizeBounds,
+						BucketCounts: []uint64{0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
+						Min:          metricdata.NewExtrema(int64(57)),
+						Max:          metricdata.NewExtrema(int64(57)),
+						Sum:          57,
+					},
+					{
+						Attributes:   attribute.NewSet(duplexMethodAttr, statusAttr),
+						Count:        1,
+						Bounds:       DefaultSizeBounds,
+						BucketCounts: []uint64{0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
+						Min:          metricdata.NewExtrema(int64(0)),
+						Max:          metricdata.NewExtrema(int64(0)),
+						Sum:          0,
+					},
+				},
+				Temporality: metricdata.CumulativeTemporality,
+			},
+		},
+		{
+			Name:        "grpc.server.call.duration",
+			Description: "End-to-end time taken to complete a call from server transport's perspective.",
+			Unit:        "s",
+			Data: metricdata.Histogram[float64]{
+				DataPoints: []metricdata.HistogramDataPoint[float64]{
+					{
+						Attributes: attribute.NewSet(unaryMethodAttr, statusAttr),
+						Count:      1,
+						Bounds:     DefaultLatencyBounds,
+					},
+					{
+						Attributes: attribute.NewSet(duplexMethodAttr, statusAttr),
+						Count:      1,
+						Bounds:     DefaultLatencyBounds,
+					},
+				},
+				Temporality: metricdata.CumulativeTemporality,
+			},
+		},
+	}
+
+	for _, metric := range wantMetrics {
+		if metric.Name == "grpc.server.call.sent_total_compressed_message_size" || metric.Name == "grpc.server.call.rcvd_total_compressed_message_size" {
+			// Sync the metric reader to see the event because stats.End is
+			// handled asynchronously server side. Thus, poll until the
+			// metrics created from stats.End show up.
+			if gotMetrics, err = waitForServerCompletedRPCs(ctx, reader, metric, t); err != nil {
+				t.Fatalf("error waiting for compressed message size metric %v: %v", metric.Name, err)
+			}
+			continue
+		}
+
+		// For the duration metrics, ignore the bucket counts, and make sure
+		// their counts fall within a bucket <= 5 seconds (the maximum
+		// duration of the test given the context deadline).
+		val, ok := gotMetrics[metric.Name]
+		if !ok {
+			t.Fatalf("metric %v not present in recorded metrics", metric.Name)
+		}
+		if metric.Name == "grpc.client.attempt.duration" || metric.Name == "grpc.client.call.duration" || metric.Name == "grpc.server.call.duration" {
+			if !metricdatatest.AssertEqual(t, metric, val, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars(), metricdatatest.IgnoreValue()) {
+				t.Fatalf("metrics data type not equal for metric: %v", metric.Name)
+			}
+			if err := assertDataPointWithinFiveSeconds(val); err != nil {
+				t.Fatalf("Data point not within five seconds for metric %v: %v", metric.Name, err)
+			}
+			continue
+		}
+
+		if !metricdatatest.AssertEqual(t, metric, val, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) {
+			t.Fatalf("metrics data type not equal for metric: %v", metric.Name)
+		}
+	}
+
+	// This Invoke doesn't pass the StaticMethodCallOption. Thus, the method
+	// attribute should become "other" on client side metrics. Since the
+	// method is not registered on the server either, it should also become
+	// "other" on the server metrics method attribute.
+	ss.CC.Invoke(ctx, "/grpc.testing.TestService/UnregisteredCall", nil, nil, []grpc.CallOption{}...)
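+	// The error from Invoke is deliberately ignored: the call fails because
+	// the method is unregistered, but the started metrics below should still
+	// increment.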
+ + rm = &metricdata.ResourceMetrics{} + reader.Collect(ctx, rm) + gotMetrics = map[string]metricdata.Metrics{} + for _, sm := range rm.ScopeMetrics { + for _, m := range sm.Metrics { + gotMetrics[m.Name] = m + } + } + otherMethodAttr := attribute.String("grpc.method", "other") + wantMetrics = []metricdata.Metrics{ + { + Name: "grpc.client.attempt.started", + Description: "Number of client call attempts started.", + Unit: "attempt", + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet(unaryMethodAttr, targetAttr), + Value: 1, + }, + { + Attributes: attribute.NewSet(duplexMethodAttr, targetAttr), + Value: 1, + }, + { + Attributes: attribute.NewSet(otherMethodAttr, targetAttr), + Value: 1, + }, + }, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + }, + }, + { + Name: "grpc.server.call.started", + Description: "Number of server calls started.", + Unit: "call", + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet(unaryMethodAttr), + Value: 1, + }, + { + Attributes: attribute.NewSet(duplexMethodAttr), + Value: 1, + }, + { + Attributes: attribute.NewSet(otherMethodAttr), + Value: 1, + }, + }, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + }, + }, + } + for _, metric := range wantMetrics { + val, ok := gotMetrics[metric.Name] + if !ok { + t.Fatalf("metric %v not present in recorded metrics", metric.Name) + } + if !metricdatatest.AssertEqual(t, metric, val, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) { + t.Fatalf("metrics data type not equal for metric: %v", metric.Name) + } + } +} diff --git a/stats/opentelemetry/go.mod b/stats/opentelemetry/go.mod new file mode 100644 index 00000000000..1573ad9b4a8 --- /dev/null +++ b/stats/opentelemetry/go.mod @@ -0,0 +1,24 @@ +module google.golang.org/grpc/stats/opentelemetry + +go 1.20 + +replace google.golang.org/grpc => ../.. 
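+
+// The replace directive above points this module at the in-tree grpc-go so
+// the plugin is built and tested against the local sources rather than the
+// released version required below.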
+ +require ( + go.opentelemetry.io/otel v1.24.0 + go.opentelemetry.io/otel/metric v1.24.0 + go.opentelemetry.io/otel/sdk/metric v1.24.0 + google.golang.org/grpc v1.62.1 +) + +require ( + github.com/go-logr/logr v1.4.1 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + go.opentelemetry.io/otel/sdk v1.24.0 // indirect + go.opentelemetry.io/otel/trace v1.24.0 // indirect + golang.org/x/net v0.22.0 // indirect + golang.org/x/sys v0.18.0 // indirect + golang.org/x/text v0.14.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 // indirect + google.golang.org/protobuf v1.33.0 // indirect +) diff --git a/stats/opentelemetry/go.sum b/stats/opentelemetry/go.sum new file mode 100644 index 00000000000..a2ec727fbf3 --- /dev/null +++ b/stats/opentelemetry/go.sum @@ -0,0 +1,30 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= +github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo= +go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo= +go.opentelemetry.io/otel/metric v1.24.0 h1:6EhoGWWK28x1fbpA4tYTOWBkPefTDQnb8WSGXlc88kI= +go.opentelemetry.io/otel/metric v1.24.0/go.mod h1:VYhLe1rFfxuTXLgj4CBiyz+9WYBA8pNGJgDcSFRKBco= +go.opentelemetry.io/otel/sdk v1.24.0 h1:YMPPDNymmQN3ZgczicBY3B6sf9n62Dlj9pWD3ucgoDw= +go.opentelemetry.io/otel/sdk v1.24.0/go.mod h1:KVrIYw6tEubO9E96HQpcmpTKDVn9gdv35HoYiQWGDFg= +go.opentelemetry.io/otel/sdk/metric v1.24.0 h1:yyMQrPzF+k88/DbH7o4FMAs80puqd+9osbiBrJrz/w8= +go.opentelemetry.io/otel/sdk/metric v1.24.0/go.mod h1:I6Y5FjH6rvEnTTAYQz3Mmv2kl6Ek5IIrmwTLqMrrOE0= +go.opentelemetry.io/otel/trace v1.24.0 h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y1YELI= +go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU= +golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc= +golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= +golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= +golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 h1:NnYq6UN9ReLM9/Y01KWNOWyI5xQ9kbIms5GGJVwS/Yc= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY= +google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= +google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/stats/opentelemetry/opentelemetry.go b/stats/opentelemetry/opentelemetry.go 
new file mode 100644
index 00000000000..d15c042d88b
--- /dev/null
+++ b/stats/opentelemetry/opentelemetry.go
@@ -0,0 +1,298 @@
+/*
+ * Copyright 2024 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package opentelemetry implements OpenTelemetry instrumentation code for
+// gRPC-Go clients and servers.
+package opentelemetry
+
+import (
+	"context"
+	"strings"
+	"time"
+
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/grpclog"
+	"google.golang.org/grpc/internal"
+
+	"go.opentelemetry.io/otel/metric"
+)
+
+var logger = grpclog.Component("otel-plugin")
+
+var canonicalString = internal.CanonicalString.(func(codes.Code) string)
+
+var joinDialOptions = internal.JoinDialOptions.(func(...grpc.DialOption) grpc.DialOption)
+
+// Metric is an identifier for a metric provided by this package.
+type Metric string
+
+// Metrics is a set of metrics to record. Once created, Metrics is immutable;
+// however, Add and Remove can make copies with specific metrics added or
+// removed, respectively.
+type Metrics struct {
+	// metrics are the set of metrics to initialize.
+	metrics map[Metric]bool
+}
+
+// NewMetrics returns a Metrics set containing the provided metrics.
+func NewMetrics(metrics ...Metric) *Metrics {
+	newMetrics := make(map[Metric]bool)
+	for _, metric := range metrics {
+		newMetrics[metric] = true
+	}
+	return &Metrics{
+		metrics: newMetrics,
+	}
+}
+
+// Add adds the provided metrics to the set and returns a new copy with the
+// additional metrics.
+func (m *Metrics) Add(metrics ...Metric) *Metrics {
+	newMetrics := make(map[Metric]bool)
+	for metric := range m.metrics {
+		newMetrics[metric] = true
+	}
+
+	for _, metric := range metrics {
+		newMetrics[metric] = true
+	}
+	return &Metrics{
+		metrics: newMetrics,
+	}
+}
+
+// Remove removes the provided metrics from the set and returns a new copy
+// with the metrics removed.
+func (m *Metrics) Remove(metrics ...Metric) *Metrics {
+	newMetrics := make(map[Metric]bool)
+	for metric := range m.metrics {
+		newMetrics[metric] = true
+	}
+
+	for _, metric := range metrics {
+		delete(newMetrics, metric)
+	}
+	return &Metrics{
+		metrics: newMetrics,
+	}
+}
+
+// Options are the options for OpenTelemetry instrumentation.
+type Options struct {
+	// MetricsOptions are the metrics options for OpenTelemetry
+	// instrumentation.
+	MetricsOptions MetricsOptions
+}
+
+// MetricsOptions are the metrics options for OpenTelemetry instrumentation.
+type MetricsOptions struct {
+	// MeterProvider is the MeterProvider instance that will be used to create
+	// instruments. To enable metrics collection, set a meter provider. If
+	// unset, no metrics will be recorded. Any implementation knobs (e.g.
+	// views, bounds) set in the MeterProvider take precedence over the
+	// defaults provided through API calls in this component; default views
+	// are created only for metrics the provider leaves unconfigured.
+	MeterProvider metric.MeterProvider
+
+	// Metrics are the metrics to instrument. An instrument is created and
+	// telemetry recorded for each metric in this set that the client or
+	// server instrumentation component supports. If not set, the default
+	// metrics will be recorded.
+	Metrics *Metrics
+
+	// TargetAttributeFilter is a callback that takes the target string of the
+	// channel and returns a bool representing whether to use the target as a
+	// label value or use the string "other". If unset, the target string will
+	// be used as is. This only applies for client side metrics.
+	TargetAttributeFilter func(string) bool
+
+	// MethodAttributeFilter is a callback that takes the method string and
+	// returns a bool representing whether to use the method as a label value
+	// or use the string "other". It can be used to record the method names of
+	// RPCs handled by grpc.UnknownServiceHandler, but take care to limit the
+	// values allowed, as allowing too many will increase cardinality and
+	// could cause severe memory or performance problems. On the client side,
+	// pass a grpc.StaticMethodCallOption as a call option into Invoke or
+	// NewStream to have the method recorded as is. This only applies for
+	// server side metrics.
+	MethodAttributeFilter func(string) bool
+}
+
+// DialOption returns a dial option which enables OpenTelemetry
+// instrumentation code for a grpc.ClientConn.
+//
+// Client applications interested in instrumenting their grpc.ClientConn
+// should pass the dial option returned from this function as a dial option to
+// grpc.NewClient().
+//
+// For the metrics supported by this instrumentation code, specify the client
+// metrics to record in the metrics options. Also provide an implementation of
+// a MeterProvider. If the passed in MeterProvider does not have the view
+// configured for an individual metric turned on, the API call in this
+// component will create a default view for that metric.
+func DialOption(o Options) grpc.DialOption {
+	csh := &clientStatsHandler{o: o}
+	csh.initializeMetrics()
+	return joinDialOptions(grpc.WithChainUnaryInterceptor(csh.unaryInterceptor), grpc.WithChainStreamInterceptor(csh.streamInterceptor), grpc.WithStatsHandler(csh))
+}
+
+// ServerOption returns a server option which enables OpenTelemetry
+// instrumentation code for a grpc.Server.
+//
+// Server applications interested in instrumenting their grpc.Server should
+// pass the server option returned from this function as an argument to
+// grpc.NewServer().
+//
+// For the metrics supported by this instrumentation code, specify the server
+// metrics to record in the metrics options. Also provide an implementation of
+// a MeterProvider. If the passed in MeterProvider does not have the view
+// configured for an individual metric turned on, the API call in this
+// component will create a default view for that metric.
+func ServerOption(o Options) grpc.ServerOption {
+	ssh := &serverStatsHandler{o: o}
+	ssh.initializeMetrics()
+	return grpc.StatsHandler(ssh)
+}
+
+// callInfo is information pertaining to the lifespan of the RPC on the client
+// side.
+type callInfo struct {
+	target string
+
+	method string
+}
+
+type callInfoKey struct{}
+
+func setCallInfo(ctx context.Context, ci *callInfo) context.Context {
+	return context.WithValue(ctx, callInfoKey{}, ci)
+}
+
+// getCallInfo returns the callInfo stored in the context, or nil
+// if there isn't one.
+func getCallInfo(ctx context.Context) *callInfo {
+	ci, _ := ctx.Value(callInfoKey{}).(*callInfo)
+	return ci
+}
+
+// rpcInfo is RPC information scoped to the RPC attempt life span on the
+// client side, and the RPC life span on the server side.
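+// It is attached to the context by TagRPC and read back by HandleRPC for
+// every stats event of that attempt or call.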
+type rpcInfo struct {
+	mi *metricsInfo
+}
+
+type rpcInfoKey struct{}
+
+func setRPCInfo(ctx context.Context, ri *rpcInfo) context.Context {
+	return context.WithValue(ctx, rpcInfoKey{}, ri)
+}
+
+// getRPCInfo returns the rpcInfo stored in the context, or nil
+// if there isn't one.
+func getRPCInfo(ctx context.Context) *rpcInfo {
+	ri, _ := ctx.Value(rpcInfoKey{}).(*rpcInfo)
+	return ri
+}
+
+func removeLeadingSlash(mn string) string {
+	return strings.TrimLeft(mn, "/")
+}
+
+// metricsInfo is RPC information scoped to the RPC attempt life span on the
+// client side, and the RPC life span on the server side.
+type metricsInfo struct {
+	// These counts are accessed atomically so that hedging (concurrent
+	// attempts) can be supported in the future.
+
+	// sentCompressedBytes is the number of bytes after compression (within
+	// each message) sent from this side (client or server).
+	sentCompressedBytes int64
+	// recvCompressedBytes is the number of bytes after compression (within
+	// each message) received on this side (client or server).
+	recvCompressedBytes int64
+
+	startTime time.Time
+	method    string
+	authority string
+}
+
+type clientMetrics struct {
+	// "grpc.client.attempt.started"
+	attemptStarted metric.Int64Counter
+	// "grpc.client.attempt.duration"
+	attemptDuration metric.Float64Histogram
+	// "grpc.client.attempt.sent_total_compressed_message_size"
+	attemptSentTotalCompressedMessageSize metric.Int64Histogram
+	// "grpc.client.attempt.rcvd_total_compressed_message_size"
+	attemptRcvdTotalCompressedMessageSize metric.Int64Histogram
+
+	// "grpc.client.call.duration"
+	callDuration metric.Float64Histogram
+}
+
+type serverMetrics struct {
+	// "grpc.server.call.started"
+	callStarted metric.Int64Counter
+	// "grpc.server.call.sent_total_compressed_message_size"
+	callSentTotalCompressedMessageSize metric.Int64Histogram
+	// "grpc.server.call.rcvd_total_compressed_message_size"
+	callRcvdTotalCompressedMessageSize metric.Int64Histogram
+	// "grpc.server.call.duration"
+	callDuration metric.Float64Histogram
+}
+
+func createInt64Counter(setOfMetrics map[Metric]bool, metricName Metric, meter metric.Meter, options ...metric.Int64CounterOption) metric.Int64Counter {
+	if _, ok := setOfMetrics[metricName]; !ok {
+		return nil
+	}
+	ret, err := meter.Int64Counter(string(metricName), options...)
+	if err != nil {
+		logger.Errorf("failed to register metric \"%v\", will not record: %v", metricName, err)
+		return nil
+	}
+	return ret
+}
+
+func createInt64Histogram(setOfMetrics map[Metric]bool, metricName Metric, meter metric.Meter, options ...metric.Int64HistogramOption) metric.Int64Histogram {
+	if _, ok := setOfMetrics[metricName]; !ok {
+		return nil
+	}
+	ret, err := meter.Int64Histogram(string(metricName), options...)
+	if err != nil {
+		logger.Errorf("failed to register metric \"%v\", will not record: %v", metricName, err)
+		return nil
+	}
+	return ret
+}
+
+func createFloat64Histogram(setOfMetrics map[Metric]bool, metricName Metric, meter metric.Meter, options ...metric.Float64HistogramOption) metric.Float64Histogram {
+	if _, ok := setOfMetrics[metricName]; !ok {
+		return nil
+	}
+	ret, err := meter.Float64Histogram(string(metricName), options...)
+	if err != nil {
+		logger.Errorf("failed to register metric \"%v\", will not record: %v", metricName, err)
+		return nil
+	}
+	return ret
+}
+
+// Users of this component should use these bucket boundaries as part of the
+// SDK MeterProvider they pass in. This component sends these boundaries as
+// "advice" through the API, but that advice is not guaranteed to be stable
+// and implementations are not required to honor it, so for safety the SDK
+// MeterProvider provided should also set these bounds for the corresponding
+// metrics.
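+// Example_dialOption in this package's tests shows one way to set such
+// boundaries through a view on the SDK MeterProvider.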
+var ( + // DefaultLatencyBounds are the default bounds for latency metrics. + DefaultLatencyBounds = []float64{0, 0.00001, 0.00005, 0.0001, 0.0003, 0.0006, 0.0008, 0.001, 0.002, 0.003, 0.004, 0.005, 0.006, 0.008, 0.01, 0.013, 0.016, 0.02, 0.025, 0.03, 0.04, 0.05, 0.065, 0.08, 0.1, 0.13, 0.16, 0.2, 0.25, 0.3, 0.4, 0.5, 0.65, 0.8, 1, 2, 5, 10, 20, 50, 100} // provide "advice" through API, SDK should set this too + // DefaultSizeBounds are the default bounds for metrics which record size. + DefaultSizeBounds = []float64{0, 1024, 2048, 4096, 16384, 65536, 262144, 1048576, 4194304, 16777216, 67108864, 268435456, 1073741824, 4294967296} + // DefaultMetrics are the default metrics provided by this module. + DefaultMetrics = NewMetrics(ClientAttemptStarted, ClientAttemptDuration, ClientAttemptSentCompressedTotalMessageSize, ClientAttemptRcvdCompressedTotalMessageSize, ClientCallDuration, ServerCallStarted, ServerCallSentCompressedTotalMessageSize, ServerCallRcvdCompressedTotalMessageSize, ServerCallDuration) +) diff --git a/stats/opentelemetry/server_metrics.go b/stats/opentelemetry/server_metrics.go new file mode 100644 index 00000000000..e7b6011173b --- /dev/null +++ b/stats/opentelemetry/server_metrics.go @@ -0,0 +1,164 @@ +/* + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package opentelemetry + +import ( + "context" + "sync/atomic" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/internal" + "google.golang.org/grpc/stats" + "google.golang.org/grpc/status" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +type serverStatsHandler struct { + o Options + + serverMetrics serverMetrics +} + +func (ssh *serverStatsHandler) initializeMetrics() { + // Will set no metrics to record, logically making this stats handler a + // no-op. 
+ if ssh.o.MetricsOptions.MeterProvider == nil { + return + } + + meter := ssh.o.MetricsOptions.MeterProvider.Meter("grpc-go " + grpc.Version) + if meter == nil { + return + } + setOfMetrics := ssh.o.MetricsOptions.Metrics.metrics + + serverMetrics := serverMetrics{} + serverMetrics.callStarted = createInt64Counter(setOfMetrics, "grpc.server.call.started", meter, metric.WithUnit("call"), metric.WithDescription("Number of server calls started.")) + serverMetrics.callSentTotalCompressedMessageSize = createInt64Histogram(setOfMetrics, "grpc.server.call.sent_total_compressed_message_size", meter, metric.WithUnit("By"), metric.WithDescription("Compressed message bytes sent per server call."), metric.WithExplicitBucketBoundaries(DefaultSizeBounds...)) + serverMetrics.callRcvdTotalCompressedMessageSize = createInt64Histogram(setOfMetrics, "grpc.server.call.rcvd_total_compressed_message_size", meter, metric.WithUnit("By"), metric.WithDescription("Compressed message bytes received per server call."), metric.WithExplicitBucketBoundaries(DefaultSizeBounds...)) + serverMetrics.callDuration = createFloat64Histogram(setOfMetrics, "grpc.server.call.duration", meter, metric.WithUnit("s"), metric.WithDescription("End-to-end time taken to complete a call from server transport's perspective."), metric.WithExplicitBucketBoundaries(DefaultLatencyBounds...)) + + ssh.serverMetrics = serverMetrics +} + +// TagConn exists to satisfy stats.Handler. +func (ssh *serverStatsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context { + return ctx +} + +// HandleConn exists to satisfy stats.Handler. +func (ssh *serverStatsHandler) HandleConn(context.Context, stats.ConnStats) {} + +// TagRPC implements per RPC context management. +func (ssh *serverStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context { + method := info.FullMethodName + if ssh.o.MetricsOptions.MethodAttributeFilter != nil { + if !ssh.o.MetricsOptions.MethodAttributeFilter(method) { + method = "other" + } + } + server := internal.ServerFromContext.(func(context.Context) *grpc.Server)(ctx) + if server == nil { // Shouldn't happen, defensive programming. + logger.Error("ctx passed into server side stats handler has no grpc server ref") + method = "other" + } else { + isRegisteredMethod := internal.IsRegisteredMethod.(func(*grpc.Server, string) bool) + if !isRegisteredMethod(server, method) { + method = "other" + } + } + + mi := &metricsInfo{ + startTime: time.Now(), + method: removeLeadingSlash(method), + } + ri := &rpcInfo{ + mi: mi, + } + return setRPCInfo(ctx, ri) +} + +// HandleRPC implements per RPC tracing and stats implementation. +func (ssh *serverStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { + ri := getRPCInfo(ctx) + if ri == nil { + return + } + ssh.processRPCData(ctx, rs, ri.mi) +} + +func (ssh *serverStatsHandler) processRPCData(ctx context.Context, s stats.RPCStats, mi *metricsInfo) { + switch st := s.(type) { + case *stats.Begin, *stats.OutHeader, *stats.InTrailer, *stats.OutTrailer: + // Headers and Trailers are not relevant to the measures, as the + // measures concern number of messages and bytes for messages. This + // aligns with flow control. 
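+	// The server counts a call as started when the initial headers arrive
+	// (below), rather than on Begin as the client side does.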
+	case *stats.InHeader:
+		if ssh.serverMetrics.callStarted != nil {
+			ssh.serverMetrics.callStarted.Add(ctx, 1, metric.WithAttributes(attribute.String("grpc.method", mi.method)))
+		}
+	case *stats.OutPayload:
+		atomic.AddInt64(&mi.sentCompressedBytes, int64(st.CompressedLength))
+	case *stats.InPayload:
+		atomic.AddInt64(&mi.recvCompressedBytes, int64(st.CompressedLength))
+	case *stats.End:
+		ssh.processRPCEnd(ctx, mi, st)
+	default:
+		// Shouldn't happen. gRPC calls into the stats handler, and the stats
+		// will always be one of the types handled above.
+		logger.Errorf("Received unexpected stats type (%T) with data: %v", s, s)
+	}
+}
+
+func (ssh *serverStatsHandler) processRPCEnd(ctx context.Context, mi *metricsInfo, e *stats.End) {
+	latency := float64(time.Since(mi.startTime)) / float64(time.Second)
+	var st string
+	if e.Error != nil {
+		s, _ := status.FromError(e.Error)
+		st = canonicalString(s.Code())
+	} else {
+		st = "OK"
+	}
+	serverAttributeOption := metric.WithAttributes(attribute.String("grpc.method", mi.method), attribute.String("grpc.status", st))
+
+	if ssh.serverMetrics.callDuration != nil {
+		ssh.serverMetrics.callDuration.Record(ctx, latency, serverAttributeOption)
+	}
+	if ssh.serverMetrics.callSentTotalCompressedMessageSize != nil {
+		ssh.serverMetrics.callSentTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&mi.sentCompressedBytes), serverAttributeOption)
+	}
+	if ssh.serverMetrics.callRcvdTotalCompressedMessageSize != nil {
+		ssh.serverMetrics.callRcvdTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&mi.recvCompressedBytes), serverAttributeOption)
+	}
+}
+
+const (
+	// ServerCallStarted is the number of server calls started.
+	ServerCallStarted Metric = "grpc.server.call.started"
+	// ServerCallSentCompressedTotalMessageSize is the compressed message
+	// bytes sent per server call.
+	ServerCallSentCompressedTotalMessageSize Metric = "grpc.server.call.sent_total_compressed_message_size"
+	// ServerCallRcvdCompressedTotalMessageSize is the compressed message
+	// bytes received per server call.
+	ServerCallRcvdCompressedTotalMessageSize Metric = "grpc.server.call.rcvd_total_compressed_message_size"
+	// ServerCallDuration is the end-to-end time taken to complete a call from
+	// server transport's perspective.
+	ServerCallDuration Metric = "grpc.server.call.duration"
+)
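For reference, a minimal sketch of wiring this plugin into a client application follows. It is illustrative and not part of the diff: the target address is a placeholder, the ManualReader stands in for whatever reader and exporter a real deployment would use, and grpc.StaticMethod() is assumed to be the constructor for the StaticMethodCallOption checked by determineMethod above.

package main

import (
	"context"
	"log"
	"time"

	"go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/metric/metricdata"

	"google.golang.org/grpc"
	"google.golang.org/grpc/stats/opentelemetry"
)

func main() {
	// A ManualReader collects metrics on demand; production code would more
	// likely pair a PeriodicReader with an exporter.
	reader := metric.NewManualReader()
	provider := metric.NewMeterProvider(metric.WithReader(reader))

	opts := opentelemetry.Options{
		MetricsOptions: opentelemetry.MetricsOptions{
			MeterProvider: provider, // Required; no metrics are recorded if unset.
			Metrics:       opentelemetry.DefaultMetrics,
		},
	}

	cc, err := grpc.NewClient("localhost:50051", opentelemetry.DialOption(opts)) // Placeholder target.
	if err != nil {
		log.Fatalf("grpc.NewClient failed: %v", err)
	}
	defer cc.Close()

	// grpc.StaticMethod() marks the method name as safe to record, so it is
	// not bucketed into "other" by determineMethod.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	_ = cc.Invoke(ctx, "/grpc.testing.TestService/UnaryCall", nil, nil, grpc.StaticMethod())

	// Metrics recorded so far can now be collected from the reader.
	var rm metricdata.ResourceMetrics
	_ = reader.Collect(ctx, &rm)
}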