diff --git a/interceptor/opencensus/opencensus.go b/interceptor/opencensus/opencensus.go index f187d246..e25851d8 100644 --- a/interceptor/opencensus/opencensus.go +++ b/interceptor/opencensus/opencensus.go @@ -90,17 +90,17 @@ func (oci *OCInterceptor) Export(tes agenttracepb.TraceService_ExportServer) err return errTraceExportProtocolViolation } - spansMetricsFn := internal.NewReceivedSpansRecorder(context.Background(), "opencensus") + spansMetricsFn := internal.NewReceivedSpansRecorder("opencensus") - processReceivedSpans := func(ni *commonpb.Node, spans []*tracepb.Span) { + processReceivedSpans := func(ctx context.Context, ni *commonpb.Node, spans []*tracepb.Span) { // Firstly, we'll add them to the bundler. if len(recv.Spans) > 0 { bundlerPayload := &spansAndNode{node: ni, spans: recv.Spans} traceBundler.Add(bundlerPayload, len(bundlerPayload.spans)) } - // We MUST Unconditionally record metrics from this reception. - spansMetricsFn(ni, recv.Spans) + // We MUST unconditionally record metrics from this reception. + spansMetricsFn(ctx, ni, recv.Spans) } var lastNonNilNode *commonpb.Node @@ -111,7 +111,11 @@ func (oci *OCInterceptor) Export(tes agenttracepb.TraceService_ExportServer) err lastNonNilNode = recv.Node } - processReceivedSpans(lastNonNilNode, recv.Spans) + // Note: In normal operations, the context of a streaming RPC usually doesn't + // change but we haven't examined all the corner cases for gRPC streaming + // hence we'll always pass in the context, in case the underlying implementation + // changes a detail. TODO (@odeke-em) investigate if streamer.Context() can change. + processReceivedSpans(tes.Context(), lastNonNilNode, recv.Spans) recv, err = tes.Recv() if err != nil { diff --git a/internal/observability.go b/internal/observability.go index ffd469c5..3e70c214 100644 --- a/internal/observability.go +++ b/internal/observability.go @@ -50,11 +50,11 @@ var AllViews = []*view.View{ // NewReceivedSpansRecorder creates a function that uses a context created // from the name of the interceptor to record the number of the spans received // by the interceptor. -func NewReceivedSpansRecorder(parentCtx context.Context, interceptorName string) func(*commonpb.Node, []*tracepb.Span) { - ctx, _ := tag.New(parentCtx, tag.Upsert(tagKeyInterceptorName, interceptorName)) - // TODO: (@odeke-em) perhaps also record information from the node? +func NewReceivedSpansRecorder(interceptorName string) func(context.Context, *commonpb.Node, []*tracepb.Span) { + return func(parentCtx context.Context, ni *commonpb.Node, spans []*tracepb.Span) { + ctx, _ := tag.New(parentCtx, tag.Upsert(tagKeyInterceptorName, interceptorName)) + // TODO: (@odeke-em) perhaps also record information from the node? - return func(ni *commonpb.Node, spans []*tracepb.Span) { stats.Record(ctx, mReceivedSpans.M(int64(len(spans)))) } }