diff --git a/interceptor/opencensus/observability_test.go b/interceptor/opencensus/observability_test.go new file mode 100644 index 00000000..89f6ba53 --- /dev/null +++ b/interceptor/opencensus/observability_test.go @@ -0,0 +1,212 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ocinterceptor_test + +import ( + "fmt" + "reflect" + "strings" + "sync" + "testing" + "time" + + "contrib.go.opencensus.io/exporter/ocagent" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" + "go.opencensus.io/trace" + + commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" + agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" + "github.com/census-instrumentation/opencensus-service/interceptor/opencensus" + "github.com/census-instrumentation/opencensus-service/internal" +) + +// Ensure that if we add a metrics exporter that our target metrics +// will be recorded but also with the proper tag keys and values. +// See Issue https://github.com/census-instrumentation/opencensus-service/issues/63 +func TestEnsureRecordedMetrics(t *testing.T) { + sappender := newSpanAppender() + + _, port, doneFn := ocInterceptorOnGRPCServer(t, sappender, ocinterceptor.WithSpanBufferPeriod(2*time.Millisecond)) + defer doneFn() + + // Now the opencensus-agent exporter. + oce, err := ocagent.NewExporter(ocagent.WithPort(uint16(port)), ocagent.WithInsecure()) + if err != nil { + t.Fatalf("Failed to create the ocagent-exporter: %v", err) + } + trace.RegisterExporter(oce) + defer func() { + oce.Stop() + trace.UnregisterExporter(oce) + }() + + // Now for the stats exporter + if err := view.Register(internal.AllViews...); err != nil { + t.Fatalf("Failed to register all views: %v", err) + } + defer view.Unregister(internal.AllViews...) + + metricsReportingPeriod := 5 * time.Millisecond + view.SetReportingPeriod(metricsReportingPeriod) + // On exit, revert the metrics reporting period. + defer view.SetReportingPeriod(60 * time.Second) + + cme := newCountMetricsExporter() + view.RegisterExporter(cme) + defer view.UnregisterExporter(cme) + + n := 20 + // Now it is time to send over some spans + // and we'll count the numbers received. + for i := 0; i < n; i++ { + now := time.Now().UTC() + oce.ExportSpan(&trace.SpanData{ + StartTime: now.Add(-10 * time.Second), + EndTime: now.Add(20 * time.Second), + SpanContext: trace.SpanContext{ + TraceID: trace.TraceID{byte(0x20 + i), 0x4E, 0x4D, 0x4C, 0x4B, 0x4A, 0x49, 0x48, 0x47, 0x46, 0x45, 0x44, 0x43, 0x42, 0x41}, + SpanID: trace.SpanID{0x7F, 0x7E, 0x7D, 0x7C, 0x7B, 0x7A, 0x79, 0x78}, + TraceOptions: trace.TraceOptions(i & 0x01), + }, + ParentSpanID: trace.SpanID{byte(0x01 + i), 0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37}, + Name: fmt.Sprintf("Span-%d", i), + Status: trace.Status{Code: trace.StatusCodeInternal, Message: "Blocked by firewall"}, + }) + } + + // Give them some time to be exported. + // say n * metricsReportingPeriod + <-time.After(time.Duration(n) * metricsReportingPeriod) + oce.Flush() + + checkCountMetricsExporterResults(t, cme, n, 1) +} + +func TestEnsureRecordedMetrics_zeroLengthSpansSender(t *testing.T) { + t.Skipf("Currently disabled, enable this test when the following are fixed:\nIssue %s\nPR %s", + "https://github.com/census-instrumentation/opencensus-go/issues/862", + "https://github.com/census-instrumentation/opencensus-go/pull/922", + ) + sappender := newSpanAppender() + + _, port, doneFn := ocInterceptorOnGRPCServer(t, sappender, ocinterceptor.WithSpanBufferPeriod(2*time.Millisecond)) + defer doneFn() + + // Now the opencensus-agent exporter. + oce, err := ocagent.NewExporter(ocagent.WithPort(uint16(port)), ocagent.WithInsecure()) + if err != nil { + t.Fatalf("Failed to create the ocagent-exporter: %v", err) + } + trace.RegisterExporter(oce) + defer func() { + oce.Stop() + trace.UnregisterExporter(oce) + }() + + // Now for the stats exporter + if err := view.Register(internal.AllViews...); err != nil { + t.Fatalf("Failed to register all views: %v", err) + } + defer view.Unregister(internal.AllViews...) + + metricsReportingPeriod := 10 * time.Millisecond + view.SetReportingPeriod(metricsReportingPeriod) + // On exit, revert the metrics reporting period. + defer view.SetReportingPeriod(60 * time.Second) + + cme := newCountMetricsExporter() + view.RegisterExporter(cme) + defer view.UnregisterExporter(cme) + + n := 20 + // Now for the traceExporter that sends 0 length spans + traceSvcClient, traceSvcDoneFn, err := makeTraceServiceClient(port) + if err != nil { + t.Fatalf("Failed to create the trace service client: %v", err) + } + defer traceSvcDoneFn() + for i := 0; i <= n; i++ { + _ = traceSvcClient.Send(&agenttracepb.ExportTraceServiceRequest{Spans: nil, Node: &commonpb.Node{}}) + } + <-time.After(time.Duration(n) * metricsReportingPeriod) + checkCountMetricsExporterResults(t, cme, n, 0) +} + +func checkCountMetricsExporterResults(t *testing.T, cme *countMetricsExporter, n int, wantAllCountsToBe int64) { + cme.mu.Lock() + defer cme.mu.Unlock() + + // The only tags that we are expecting are "opencensus_interceptor": "opencensus" * n + wantTagKey, _ := tag.NewKey("opencensus_interceptor") + valuesPlusBlank := strings.Split(strings.Repeat("opencensus,opencensus,", n/2), ",") + wantValues := valuesPlusBlank[:len(valuesPlusBlank)-1] + wantTags := map[tag.Key][]string{ + wantTagKey: wantValues, + } + + gotTags := cme.tags + if !reflect.DeepEqual(gotTags, wantTags) { + t.Errorf("\nGotTags:\n\t%#v\n\nWantTags:\n\t%#v\n", gotTags, wantTags) + } + + // The only data types we are expecting are: + // * DistributionData + for key, aggregation := range cme.data { + switch agg := aggregation.(type) { + case *view.DistributionData: + if g, w := agg.Count, int64(1); g != w { + t.Errorf("Data point #%d GotCount %d Want %d", key, g, w) + } + default: + t.Errorf("Data point #%d Got %T want %T", key, agg, (*view.DistributionData)(nil)) + } + } +} + +type countMetricsExporter struct { + mu sync.Mutex + tags map[tag.Key][]string + data map[int]view.AggregationData +} + +func newCountMetricsExporter() *countMetricsExporter { + return &countMetricsExporter{ + tags: make(map[tag.Key][]string), + data: make(map[int]view.AggregationData), + } +} + +func (cme *countMetricsExporter) clear() { + cme.mu.Lock() + defer cme.mu.Unlock() + + cme.data = make(map[int]view.AggregationData) + cme.tags = make(map[tag.Key][]string) +} + +var _ view.Exporter = (*countMetricsExporter)(nil) + +func (cme *countMetricsExporter) ExportView(vd *view.Data) { + cme.mu.Lock() + defer cme.mu.Unlock() + + for _, row := range vd.Rows { + cme.data[len(cme.data)] = row.Data + for _, tag_ := range row.Tags { + cme.tags[tag_.Key] = append(cme.tags[tag_.Key], tag_.Value) + } + } +} diff --git a/interceptor/opencensus/opencensus.go b/interceptor/opencensus/opencensus.go index 1184930f..f187d246 100644 --- a/interceptor/opencensus/opencensus.go +++ b/interceptor/opencensus/opencensus.go @@ -15,6 +15,7 @@ package ocinterceptor import ( + "context" "errors" "time" @@ -23,6 +24,7 @@ import ( commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" + "github.com/census-instrumentation/opencensus-service/internal" "github.com/census-instrumentation/opencensus-service/spanreceiver" ) @@ -88,8 +90,20 @@ func (oci *OCInterceptor) Export(tes agenttracepb.TraceService_ExportServer) err return errTraceExportProtocolViolation } - var lastNonNilNode *commonpb.Node + spansMetricsFn := internal.NewReceivedSpansRecorder(context.Background(), "opencensus") + + processReceivedSpans := func(ni *commonpb.Node, spans []*tracepb.Span) { + // Firstly, we'll add them to the bundler. + if len(recv.Spans) > 0 { + bundlerPayload := &spansAndNode{node: ni, spans: recv.Spans} + traceBundler.Add(bundlerPayload, len(bundlerPayload.spans)) + } + // We MUST Unconditionally record metrics from this reception. + spansMetricsFn(ni, recv.Spans) + } + + var lastNonNilNode *commonpb.Node // Now that we've got the first message with a Node, we can start to receive streamed up spans. for { // If a Node has been sent from downstream, save and use it. @@ -97,11 +111,7 @@ func (oci *OCInterceptor) Export(tes agenttracepb.TraceService_ExportServer) err lastNonNilNode = recv.Node } - // Otherwise add them to the bundler. - if len(recv.Spans) > 0 { - bundlerPayload := &spansAndNode{node: lastNonNilNode, spans: recv.Spans} - traceBundler.Add(bundlerPayload, len(bundlerPayload.spans)) - } + processReceivedSpans(lastNonNilNode, recv.Spans) recv, err = tes.Recv() if err != nil { diff --git a/internal/observability.go b/internal/observability.go new file mode 100644 index 00000000..ccfe113e --- /dev/null +++ b/internal/observability.go @@ -0,0 +1,60 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +// This file contains helpers that are useful to add observability +// with metrics and tracing using OpenCensus to the various pieces +// of the service. + +import ( + "context" + + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" + + commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" + tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" +) + +var tagKeyInterceptorName, _ = tag.NewKey("opencensus_interceptor") +var mReceivedSpans = stats.Int64("oc.io/interceptor/received_spans", "Counts the number of spans received by the interceptor", "1") + +var ViewReceivedSpansInterceptor = &view.View{ + Name: "oc.io/interceptor/received_spans", + Description: "The number of spans received by the interceptor", + Measure: mReceivedSpans, + Aggregation: view.Distribution( + 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 14, 16, 18, 20, 25, 30, 35, 40, 45, 50, 60, 70, 80, 90, + 100, 150, 200, 250, 300, 450, 500, 600, 700, 800, 900, 1000, 1200, 1400, 1600, 1800, 2000, + ), + TagKeys: []tag.Key{tagKeyInterceptorName}, +} + +var AllViews = []*view.View{ + ViewReceivedSpansInterceptor, +} + +// NewReceivedSpansRecorder creates a function that uses a context created +// from the name of the interceptor to record the number of the spans received +// by the interceptor. +func NewReceivedSpansRecorder(parentCtx context.Context, interceptorName string) func(*commonpb.Node, []*tracepb.Span) { + ctx, _ := tag.New(parentCtx, tag.Upsert(tagKeyInterceptorName, interceptorName)) + // TODO: (@odeke-em) perhaps also record information from the node? + + return func(ni *commonpb.Node, spans []*tracepb.Span) { + stats.Record(ctx, mReceivedSpans.M(int64(len(spans)))) + } +}