forked from newrelic/go-agent
-
Notifications
You must be signed in to change notification settings - Fork 0
/
nrgrpc_client.go
135 lines (123 loc) · 4.42 KB
/
nrgrpc_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
// Copyright 2020 New Relic Corporation. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package nrgrpc
import (
"context"
"io"
"net/url"
"strings"
newrelic "github.com/newrelic/go-agent"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
// getURL builds a "grpc"-scheme URL describing an outgoing call: the host is
// derived from the dial target and the path is the supplied method.
//
// Targets beginning with "unix:" are reported as "localhost". For every other
// target a leading "dns:///" is stripped and the remainder is used verbatim.
func getURL(method, target string) *url.URL {
	endpoint := &url.URL{
		Scheme: "grpc",
		Path:   method,
	}

	// target can be anything from
	// https://github.com/grpc/grpc/blob/master/doc/naming.md
	// see https://godoc.org/google.golang.org/grpc#DialContext
	switch {
	case strings.HasPrefix(target, "unix:"):
		// A socket path is not a meaningful host name.
		endpoint.Host = "localhost"
	default:
		endpoint.Host = strings.TrimPrefix(target, "dns:///")
	}

	return endpoint
}
// startClientSegment begins an ExternalSegment for an outgoing gRPC call and
// returns it together with the context to use for that call.
//
// When ctx carries no transaction, a nil segment and the unmodified ctx are
// returned. Otherwise the segment is tagged with the host derived from target,
// the library name "gRPC", and the method with its leading "/" removed. If the
// transaction yields a non-empty distributed trace payload, that payload is
// set on the outgoing grpc metadata of the returned context.
func startClientSegment(ctx context.Context, method, target string) (*newrelic.ExternalSegment, context.Context) {
	txn := newrelic.FromContext(ctx)
	if txn == nil {
		// Nothing to instrument without a transaction.
		return nil, ctx
	}

	segment := newrelic.StartExternalSegment(txn, nil)

	procedure := strings.TrimPrefix(method, "/")
	segment.Host = getURL(procedure, target).Host
	segment.Library = "gRPC"
	segment.Procedure = procedure

	payloadText := txn.CreateDistributedTracePayload().Text()
	if payloadText == "" {
		// No payload to propagate; leave the outgoing metadata alone.
		return segment, ctx
	}

	outgoing, found := metadata.FromOutgoingContext(ctx)
	if !found {
		outgoing = metadata.New(nil)
	}
	outgoing.Set(newrelic.DistributedTracePayloadHeader, payloadText)

	return segment, metadata.NewOutgoingContext(ctx, outgoing)
}
// UnaryClientInterceptor instruments unary client RPCs by wrapping each call
// in an external segment. Two things are needed to use it:
//
// 1. Install it with grpc.WithUnaryInterceptor or
// grpc.WithChainUnaryInterceptor when the grpc.ClientConn is created. Example:
//
//	conn, err := grpc.Dial(
//		"localhost:8080",
//		grpc.WithUnaryInterceptor(nrgrpc.UnaryClientInterceptor),
//		grpc.WithStreamInterceptor(nrgrpc.StreamClientInterceptor),
//	)
//
// 2. Make calls on that grpc.ClientConn with a context that contains a
// newrelic.Transaction.
//
// Full example:
// https://github.com/newrelic/go-agent/blob/master/_integrations/nrgrpc/example/client/client.go
//
// Only unary calls are covered by this interceptor. Install both
// UnaryClientInterceptor and StreamClientInterceptor to instrument unary and
// streaming calls alike. These interceptors add headers to the call metadata
// if distributed tracing is enabled.
func UnaryClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
	segment, callCtx := startClientSegment(ctx, method, cc.Target())
	// Deferred so the segment is closed however the invoker returns.
	defer segment.End()

	err := invoker(callCtx, method, req, reply, cc, opts...)
	return err
}
// wrappedClientStream decorates a grpc.ClientStream so that the external
// segment opened for the call can be ended from RecvMsg.
type wrappedClientStream struct {
	// ClientStream is the underlying stream; every method not overridden
	// here is promoted from it.
	grpc.ClientStream

	// segment is the external segment recorded for this call.
	segment *newrelic.ExternalSegment

	// isUnaryServer is set from !desc.ServerStreams by StreamClientInterceptor,
	// i.e. it is true when the server does not stream its responses.
	isUnaryServer bool
}
// RecvMsg receives the next message from the wrapped stream into m and ends
// the external segment once the call can deliver nothing further.
//
// The segment is ended when:
//   - the wrapped RecvMsg returns io.EOF (the stream completed),
//   - the wrapped RecvMsg returns any other error (the stream was aborted), or
//   - the server side is not streaming, so this receive is the whole response.
//
// Previously a non-EOF error left the segment open, so a server-streaming
// call that failed part-way was never recorded as finished.
//
// The error from the wrapped stream is returned unchanged.
func (s wrappedClientStream) RecvMsg(m interface{}) error {
	err := s.ClientStream.RecvMsg(m)

	completed := err == io.EOF
	aborted := err != nil && err != io.EOF
	if completed || aborted || s.isUnaryServer {
		s.segment.End()
	}
	return err
}
// StreamClientInterceptor instruments client streaming RPCs. This interceptor
// records each streaming call with an external segment. Using it requires two
// steps:
//
// 1. Use this function with grpc.WithChainStreamInterceptor or
// grpc.WithStreamInterceptor when creating a grpc.ClientConn. Example:
//
//	conn, err := grpc.Dial(
//		"localhost:8080",
//		grpc.WithUnaryInterceptor(nrgrpc.UnaryClientInterceptor),
//		grpc.WithStreamInterceptor(nrgrpc.StreamClientInterceptor),
//	)
//
// 2. Ensure that calls made with this grpc.ClientConn are done with a context
// which contains a newrelic.Transaction.
//
// Full example:
// https://github.com/newrelic/go-agent/blob/master/_integrations/nrgrpc/example/client/client.go
//
// This interceptor only instruments streaming calls. You must use both
// UnaryClientInterceptor and StreamClientInterceptor to instrument unary and
// streaming calls. These interceptors add headers to the call metadata if
// distributed tracing is enabled.
//
// On success the returned stream wraps the one produced by streamer, and the
// segment is ended from the wrapper's RecvMsg. If streamer fails, the segment
// is ended immediately and streamer's results are returned as-is.
func StreamClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
	seg, ctx := startClientSegment(ctx, method, cc.Target())

	s, err := streamer(ctx, desc, cc, method, opts...)
	if err != nil {
		// No wrapped stream is handed back on this path, so nothing else
		// would ever end the segment; close it here. End is called
		// unconditionally, as UnaryClientInterceptor does.
		seg.End()
		return s, err
	}

	return wrappedClientStream{
		segment:      seg,
		ClientStream: s,
		// Without server streaming there is a single response, so the
		// wrapper ends the segment on the first RecvMsg.
		isUnaryServer: !desc.ServerStreams,
	}, nil
}