contrib/google.golang.org/grpc: streaming interceptor #277
Conversation
Thanks for starting on this so quickly! Some context: we are currently using streaming RPCs just to split up large unary messages (e.g., for a file upload), so we might prefer to have the entire duration of a stream show as a single trace. But I could see a use case for one trace per send/recv too, e.g., for streaming notifications from one service to another. Unfortunately, I'm not sure what the best strategy is.
@mbyio thank you for the feedback! The current implementation will trace both the main stream request and the messages sent back and forth. In this example there were 10 sends/recvs:

One solution could be an interceptor option to change what is traced for the streaming interceptor (currently our only interceptor option for gRPC is the service name). The default would still be full tracing, but the send/recv traces could be disabled, or vice versa the outer start-stream trace. (I think there are circumstances where streaming could be used for a very long time, as basically a long-lived connection, and a trace for the entire duration isn't particularly useful.) This is not hard to add, so I could look into it tomorrow.
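For readers following the thread, wiring both interceptors into a client looks roughly like this; a minimal sketch with a placeholder address and service name, assuming the package is imported as grpctrace per the existing examples (the stream interceptor is the one added by this PR):

```go
package main

import (
	"log"

	"google.golang.org/grpc"

	grpctrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/google.golang.org/grpc"
)

func main() {
	conn, err := grpc.Dial("localhost:50051",
		grpc.WithInsecure(),
		// Trace unary calls.
		grpc.WithUnaryInterceptor(grpctrace.UnaryClientInterceptor(grpctrace.WithServiceName("my-client"))),
		// Trace the streaming call and, with the behaviour described above,
		// the individual messages sent and received on each stream.
		grpc.WithStreamInterceptor(grpctrace.StreamClientInterceptor(grpctrace.WithServiceName("my-client"))),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
}
```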
Oh awesome! Thanks!
Thanks for doing this. It will definitely come in handy. I left some small nits but am happy to do a more thorough review as soon as you take off the WIP label.
It's looking really good.
return err
}

// StreamClientInterceptor will trace streaming requests for the given gRPC client
// StreamClientInterceptor returns a grpc.StreamClientInterceptor which will trace client
// streams using the given set of options.
I think this would be more correct because there isn't a "given gRPC client" here, in the context of this function signature.
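For context on that remark: the function returns a grpc.StreamClientInterceptor, and the client connection only appears as a parameter of the returned closure. A minimal sketch of that shape (the InterceptorOption type name is an assumption, interceptorConfig appears later in the diff, and the tracing body of the closure is elided):

```go
package grpc

import (
	"context"

	"google.golang.org/grpc"
)

// InterceptorOption configures the interceptor (the type name is assumed for this sketch).
type InterceptorOption func(cfg *interceptorConfig)

type interceptorConfig struct{ serviceName string }

// StreamClientInterceptor returns a grpc.StreamClientInterceptor which will trace client
// streams using the given set of options.
func StreamClientInterceptor(opts ...InterceptorOption) grpc.StreamClientInterceptor {
	return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn,
		method string, streamer grpc.Streamer, callOpts ...grpc.CallOption) (grpc.ClientStream, error) {
		// There is no "given gRPC client" in the outer function; the client
		// connection (cc) only exists here, inside the returned closure.
		return streamer(ctx, desc, cc, method, callOpts...)
	}
}
```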
}
}

// UnaryClientInterceptor will add tracing to a gRPC client.
Can we do something similar here?
// UnaryClientInterceptor returns a unary client interceptor which will trace requests using
// the given set of options.
}
}

// startTraceClientRequest is used by both the unary and streaming interceptors
In case you've not noticed yet I'm picky with docs 😄 We should say what the function does, not where it is used, so maybe:
// doClientRequest performs the given client request, returning the wrapping span.
It might also be more Go-like to call it simply doClientRequest (like http.Do). The context of this whole library is tracing so we shouldn't necessarily use the word everywhere.
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" | ||
) | ||
|
||
type tracingClientStream struct { |
Let's minimize the use of the word "tracing" and just call it clientStream. We did the same in all other integrations. Note that this package is usually imported as grpctrace based on our examples. Possibly it should be called that but that's a different story. In that context, I think grpctrace.clientStream is better than grpctrace.tracingClientStream (even if this is an unexported structure).
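A minimal sketch of the renamed type, assuming it simply embeds the underlying grpc.ClientStream and carries the configuration held by the diff's interceptorConfig (the extra field names are illustrative):

```go
// clientStream wraps a grpc.ClientStream so that SendMsg and RecvMsg can be
// traced; the embedded stream handles everything else.
type clientStream struct {
	grpc.ClientStream
	cfg    *interceptorConfig
	method string
}
```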
func startTraceClientRequest(
	ctx context.Context, method string, serviceName string, opts []grpc.CallOption,
	handler func(ctx context.Context, opts []grpc.CallOption) error,
) (tracer.Span, error) {
I know this is confusing and possibly not the best design decision (happy to hear suggestions), but let's try to use ddtrace.Span because that's the original interface. tracer.Span is just an alias I've used to get better results from godoc.
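Putting the two suggestions together (the doClientRequest rename and returning ddtrace.Span instead of the tracer.Span alias), the helper could end up with roughly this shape. This is only a sketch under those assumptions; the operation name is a guess rather than the merged code:

```go
package grpc

import (
	"context"

	"google.golang.org/grpc"
	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

// doClientRequest starts a client span, performs the given client request via
// handler, and returns the wrapping span together with the handler's error.
func doClientRequest(
	ctx context.Context, method string, serviceName string, opts []grpc.CallOption,
	handler func(ctx context.Context, opts []grpc.CallOption) error,
) (ddtrace.Span, error) {
	span, ctx := tracer.StartSpanFromContext(ctx, "grpc.client",
		tracer.ServiceName(serviceName),
		tracer.ResourceName(method),
	)
	err := handler(ctx, opts)
	return span, err
}
```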
if !ok {
	md = metadata.MD{}
}
_ = tracer.Inject(span.Context(), grpcutil.MDCarrier(md))
I think we should either find a way to handle the error or completely remove the _ = part, as it doesn't serve any purpose (unless it's a reminder for you to handle it).
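Continuing the snippet above, one way the error could be surfaced rather than discarded; a sketch only, and log.Printf stands in for whatever logging mechanism the package actually uses:

```go
if err := tracer.Inject(span.Context(), grpcutil.MDCarrier(md)); err != nil {
	// A failed injection should not abort the RPC, but it is worth recording
	// rather than silently dropping the propagation error.
	log.Printf("contrib/grpc: failed to inject span context into metadata: %v", err)
}
```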
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" | ||
) | ||
|
||
type tracingServerStream struct { |
Let's do the same here and just use serverStream
I think this solution sounds like the way to go. Start full-fledged and then be able to narrow it down or fine-tune it based on options. This is one of those cases where having what OpenTracing calls "logging" or OpenCensus calls "annotations" would come in handy.
return nil, err
}
go func() {
	<-stream.Context().Done()
You've expressed your concern in the PR description about leaking here. This is indeed tricky. I've looked at the code myself and it seems that this context is cancelled only when the stream closes with a nil error.
This is a crude idea, but we could possibly introduce a timeout here as a temporary workaround, although it's hard to say how long that timeout should be. In the case where there is a MethodConfig set with a timeout, we have no problem here because the context will always be cancelled, but that's not something we can rely on. Not sure what else we can do to ensure we don't leak here, WDYT?
We should also handle the error from the returned context, whether we use a timeout or not.
ctx, _ := context.WithTimeout(stream.Context(), ???)
<-ctx.Done()
span.Finish(tracer.WithError(ctx.Err()))
Looks like the gRPC code does:
select {
case <-cc.ctx.Done():
	cs.finish(ErrClientConnClosing)
case <-ctx.Done():
	cs.finish(toRPCErr(ctx.Err()))
}
And finish does:
cs.cancel()
And the docs say:
// To ensure resources are not leaked due to the stream returned, one of the following
// actions must be performed:
// 1. Call Close on the ClientConn.
// 2. Cancel the context provided.
// 3. Call RecvMsg until a non-nil error is returned. A protobuf-generated
// client-streaming RPC, for instance, might use the helper function
// CloseAndRecv (note that CloseSend does not Recv, therefore is not
// guaranteed to release all resources).
// 4. Receive a non-nil, non-io.EOF error from Header or SendMsg.
So I think we will only leak if gRPC was already going to leak.
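To make that conclusion concrete, the pattern under discussion in the diff looks roughly like this (a sketch reusing the stream and span variables from the snippet above, and folding in the earlier suggestion to handle the context error):

```go
// Finish the stream span once the stream's context is done. Per the gRPC
// documentation quoted above, that context is cancelled whenever the caller
// releases the stream correctly, so this goroutine only leaks in cases where
// gRPC itself was already going to leak.
go func() {
	<-stream.Context().Done()
	span.Finish(tracer.WithError(stream.Context().Err()))
}()
```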
span.SetTag("grpc.code", grpc.Code(err).String())
span.Finish(tracer.WithError(err))
return err
// withStreamError does a tracer.WithError but ignores EOF and Canceled errors
// withStreamError returns a tracer.WithError finish option, disregarding OK, EOF and Canceled errors.
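For illustration, a helper matching that comment could look roughly like the following. The body is an assumption based on the description above, reusing the grpc.Code helper that appears in the diff:

```go
package grpc

import (
	"io"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

// withStreamError returns a tracer.WithError finish option, disregarding OK,
// EOF and Canceled errors.
func withStreamError(err error) tracer.FinishOption {
	errcode := grpc.Code(err)
	if err == io.EOF || errcode == codes.Canceled || errcode == codes.OK {
		err = nil
	}
	return tracer.WithError(err)
}
```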
@gbbr when you get a chance, I made the following changes:
The new StreamMode can be used to adjust what is traced for streaming:
It defaults to tracing both calls and messages.
Thanks Caleb. Just a final set of thoughts. Otherwise LGTM
// Tags used for gRPC
const (
	TagMethod = "grpc.method"
Do these need to be exported? They aren't used anywhere else.
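If they stay internal to the package, the constants could simply be unexported; only tagMethod below corresponds to a name visible in this diff, and the usage line is illustrative:

```go
// Tags used for gRPC, unexported since they are only used within this package.
const (
	tagMethod = "grpc.method"
)

// e.g. span.SetTag(tagMethod, method)
```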
}

t.Run("All", func(t *testing.T) {
	mt.Reset()
I'm not sure if sharing the mocktracer is a good idea. Don't these sub-tests run concurrently? That could cause trouble. I think we should have a separate mocktracer for each test.
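A sketch of a per-sub-test mock tracer, assuming the standard mocktracer package from dd-trace-go (the sub-test body itself is elided):

```go
t.Run("All", func(t *testing.T) {
	// Start a fresh mock tracer scoped to this sub-test instead of sharing
	// and resetting one across sub-tests.
	mt := mocktracer.Start()
	defer mt.Stop()

	// ... run the streaming RPCs and assert on mt.FinishedSpans() ...
})
```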
type interceptorConfig struct{ serviceName string }
// The StreamMode is used to control what is traced in the | ||
// StreamClientInterceptor and StreamServerInterceptor | ||
type StreamMode byte |
While I really love this pattern, I feel it's overkill here. I find it works well when there are many options and many combinations of them, while in this case we have only two:
- Trace calls
- Trace messages
The third one is implied when both are set. I'd dumb it down completely from an API standpoint, getting rid of bitwise operations and simply having options such as grpctrace.WithStreamCalls and grpctrace.WithStreamMessages. Much simpler IMO, what do you think?
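A sketch of the suggested option functions, growing the interceptorConfig shown in the diff; the boolean field names and the InterceptorOption type name are assumptions:

```go
type interceptorConfig struct {
	serviceName         string
	traceStreamCalls    bool
	traceStreamMessages bool
}

// InterceptorOption configures the gRPC interceptors.
type InterceptorOption func(cfg *interceptorConfig)

// WithStreamCalls enables or disables tracing of the overall streaming call.
func WithStreamCalls(enabled bool) InterceptorOption {
	return func(cfg *interceptorConfig) { cfg.traceStreamCalls = enabled }
}

// WithStreamMessages enables or disables tracing of individual messages sent
// and received on a stream.
func WithStreamMessages(enabled bool) InterceptorOption {
	return func(cfg *interceptorConfig) { cfg.traceStreamMessages = enabled }
}
```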
Thanks a lot for your patience, and for implementing this. It looks great!
Feel free to (squash &) merge whenever you like. One last comment I’d make is that generally I’ve tried my best to finish all (public) documentation sentences with a period. It’s a comment I got myself on many PRs using Go.
- handle tracer.Inject error
- allow control over how much is traced when streaming
- extract setSpanTargetFromPeer method and add target tags for send/recv messages and streaming calls
- extract an injectSpanIntoContext method
- create some constants for tag names
- create a more exhaustive streaming test
For: #272
This is an implementation of the streaming interceptor for gRPC. Much of the code was reused from the existing unary interceptors.
A few questions:
- stream.Context().Done() should signal that the span should finish, but I'm not 100% sure. We should probably do a more exhaustive test on this so we're not leaking goroutines.
- If the number of traces and tagging makes sense, I will add more unit tests.