Skip to content
This repository has been archived by the owner on Nov 7, 2022. It is now read-only.

Commit

Permalink
Pass streaming context when recording spans
Browse files Browse the repository at this point in the history
Addressing feedback from review.

* Added a TODO to investigate
if/when a streamer's Context can change
* Fixed unnecessary capitalization
  • Loading branch information
odeke-em committed Oct 7, 2018
1 parent 1fe2252 commit 393b819
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 9 deletions.
14 changes: 9 additions & 5 deletions interceptor/opencensus/opencensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,17 +90,17 @@ func (oci *OCInterceptor) Export(tes agenttracepb.TraceService_ExportServer) err
return errTraceExportProtocolViolation
}

spansMetricsFn := internal.NewReceivedSpansRecorder(context.Background(), "opencensus")
spansMetricsFn := internal.NewReceivedSpansRecorder("opencensus")

processReceivedSpans := func(ni *commonpb.Node, spans []*tracepb.Span) {
processReceivedSpans := func(ctx context.Context, ni *commonpb.Node, spans []*tracepb.Span) {
// Firstly, we'll add them to the bundler.
if len(recv.Spans) > 0 {
bundlerPayload := &spansAndNode{node: ni, spans: recv.Spans}
traceBundler.Add(bundlerPayload, len(bundlerPayload.spans))
}

// We MUST Unconditionally record metrics from this reception.
spansMetricsFn(ni, recv.Spans)
// We MUST unconditionally record metrics from this reception.
spansMetricsFn(ctx, ni, recv.Spans)
}

var lastNonNilNode *commonpb.Node
Expand All @@ -111,7 +111,11 @@ func (oci *OCInterceptor) Export(tes agenttracepb.TraceService_ExportServer) err
lastNonNilNode = recv.Node
}

processReceivedSpans(lastNonNilNode, recv.Spans)
// Note: In normal operations, the context of a streaming RPC usually doesn't
// change but we haven't examined all the corner cases for gRPC streaming
// hence we'll always pass in the context, in case the underlying implementation
// changes a detail. TODO (@odeke-em) investigate if streamer.Context() can change.
processReceivedSpans(tes.Context(), lastNonNilNode, recv.Spans)

recv, err = tes.Recv()
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions internal/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ var AllViews = []*view.View{
// NewReceivedSpansRecorder creates a function that uses a context created
// from the name of the interceptor to record the number of the spans received
// by the interceptor.
func NewReceivedSpansRecorder(parentCtx context.Context, interceptorName string) func(*commonpb.Node, []*tracepb.Span) {
ctx, _ := tag.New(parentCtx, tag.Upsert(tagKeyInterceptorName, interceptorName))
// TODO: (@odeke-em) perhaps also record information from the node?
func NewReceivedSpansRecorder(interceptorName string) func(context.Context, *commonpb.Node, []*tracepb.Span) {
return func(parentCtx context.Context, ni *commonpb.Node, spans []*tracepb.Span) {
ctx, _ := tag.New(parentCtx, tag.Upsert(tagKeyInterceptorName, interceptorName))
// TODO: (@odeke-em) perhaps also record information from the node?

return func(ni *commonpb.Node, spans []*tracepb.Span) {
stats.Record(ctx, mReceivedSpans.M(int64(len(spans))))
}
}

0 comments on commit 393b819

Please sign in to comment.