-
Notifications
You must be signed in to change notification settings - Fork 289
/
Copy pathinstrument_handlers.go
191 lines (162 loc) · 6.07 KB
/
instrument_handlers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
// Package metric provides functions to initialize the controller specific
// collectors and hooks to measure metrics and update the relevant collectors.
package metric
import (
"context"
"net"
"strings"
"sync"
"time"
"github.com/hashicorp/boundary/internal/errors"
"github.com/hashicorp/boundary/internal/util"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/structpb"
)
/* The following methods are used to instrument handlers for gRPC server and client connections. */
// statsHandler satisfies grpc's stats.Handler interface. This helps measure the latency of grpc requests as close to the
// wire as possible, and allows us to capture error codes returned by the grpc go library which our service may
// never return, or error codes for requests that our service may never even see.
type statsHandler struct {
// reqLatency is the observer vector HandleRPC records request durations
// into, labeled by gRPC service, method and status code.
reqLatency prometheus.ObserverVec
}
// NewStatsHandler builds a grpc stats.Handler that reports request latency
// into the supplied prometheus.ObserverVec.
//
// An InvalidParameter error is returned when o is nil (as determined by
// util.IsNil); otherwise the returned handler records into o.
func NewStatsHandler(ctx context.Context, o prometheus.ObserverVec) (*statsHandler, error) {
	const op = "metric.NewStatsHandler"

	if util.IsNil(o) {
		err := errors.New(ctx, errors.InvalidParameter, op, "prometheus.ObserverVec is nil")
		return nil, err
	}

	handler := &statsHandler{reqLatency: o}
	return handler, nil
}
// Compile-time assertion that *statsHandler implements stats.Handler.
var _ stats.Handler = (*statsHandler)(nil)
// metricMethodNameContextKey is the context key under which TagRPC stores the
// full gRPC method name so that HandleRPC can read it back.
type metricMethodNameContextKey struct{}
// TagRPC returns a context derived from ctx that carries the RPC's full method
// name under metricMethodNameContextKey, so HandleRPC can label the
// measurement later.
func (sh *statsHandler) TagRPC(ctx context.Context, i *stats.RPCTagInfo) context.Context {
	var key metricMethodNameContextKey
	tagged := context.WithValue(ctx, key, i.FullMethodName)
	return tagged
}
// TagConn returns ctx unchanged; this handler attaches no per-connection data.
func (sh *statsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
return ctx
}
// HandleConn is intentionally a no-op; this handler records nothing for
// connection-level stats.
func (sh *statsHandler) HandleConn(context.Context, stats.ConnStats) {
}
// HandleRPC records the latency of a finished RPC. Only *stats.End events are
// acted upon; every other stats event is ignored.
//
// The observation is labeled with the service and method parsed from the
// method name stored in ctx by TagRPC, plus the gRPC status code derived from
// the RPC's error.
func (sh *statsHandler) HandleRPC(ctx context.Context, s stats.RPCStats) {
	end, isEnd := s.(*stats.End)
	if !isEnd {
		return
	}

	// The assertion result is deliberately discarded: when the value is absent
	// or not a string, fullName is "" and SplitMethodName reports both parts
	// as "unknown" instead of panicking.
	fullName, _ := ctx.Value(metricMethodNameContextKey{}).(string)
	service, method := SplitMethodName(fullName)
	code := StatusFromError(end.Error).Code()

	observer := sh.reqLatency.With(prometheus.Labels{
		LabelGrpcMethod:  method,
		LabelGrpcService: service,
		LabelGrpcCode:    code.String(),
	})
	elapsed := end.EndTime.Sub(end.BeginTime)
	observer.Observe(elapsed.Seconds())
}
// requestRecorder holds what is needed to time one gRPC client request and
// report it: the target metric, the labels to report under, and the start time.
type requestRecorder struct {
// reqLatency is the observer vector Record reports the elapsed time into.
reqLatency prometheus.ObserverVec
// labels holds the method and service labels; Record adds the status code
// label to this same map before observing.
labels prometheus.Labels
// measurements
// start is the instant the elapsed time passed to Observe is measured from.
start time.Time
}
// NewGrpcRequestRecorder returns a requestRecorder for measuring the latency of
// a gRPC client request. The service and method labels are derived from
// fullMethodName via SplitMethodName, and the clock starts at the moment this
// function is called.
func NewGrpcRequestRecorder(fullMethodName string, reqLatency prometheus.ObserverVec) requestRecorder {
	svc, mth := SplitMethodName(fullMethodName)

	// The status code label is not known yet; Record fills it in later.
	lbls := prometheus.Labels{
		LabelGrpcMethod:  mth,
		LabelGrpcService: svc,
	}

	return requestRecorder{
		reqLatency: reqLatency,
		labels:     lbls,
		start:      time.Now(),
	}
}
// Record observes the time elapsed since r.start, labeled with the gRPC status
// code derived from err via StatusFromError.
//
// Although the receiver is a value, labels is a map, so the status code label
// written here is visible through every copy of the recorder.
func (r requestRecorder) Record(err error) {
	code := StatusFromError(err).Code()
	r.labels[LabelGrpcCode] = code.String()

	observer := r.reqLatency.With(r.labels)
	observer.Observe(time.Since(r.start).Seconds())
}
// connectionTrackingListener wraps a net.Listener and counts the connections
// it accepts, handing out connections that count their own closure.
type connectionTrackingListener struct {
net.Listener
// acceptedConns is incremented once for every successful Accept.
acceptedConns prometheus.Counter
// closedConns is passed to each accepted connection's wrapper, which
// increments it when the connection is closed.
closedConns prometheus.Counter
}
// Accept waits for the next connection from the wrapped listener. On success
// it increments the accepted-connections counter and returns the connection
// wrapped so that closing it increments the closed-connections counter.
//
// Connections that implement stateProvidingConn are wrapped in a type that
// keeps ClientState available; all others get the plain wrapper. Errors from
// the wrapped listener are returned as-is and leave the counters untouched.
func (l *connectionTrackingListener) Accept() (net.Conn, error) {
	accepted, err := l.Listener.Accept()
	if err != nil {
		return nil, err
	}
	l.acceptedConns.Inc()

	switch c := accepted.(type) {
	case stateProvidingConn:
		return &connectionTrackingListenerStateConn{
			stateProvidingConn: c,
			closedConns:        l.closedConns,
		}, nil
	default:
		return &connectionTrackingListenerConn{
			Conn:        accepted,
			closedConns: l.closedConns,
		}, nil
	}
}
// NewConnectionTrackingListener wraps l so that accepted and closed
// connections are counted. ac is incremented each time a connection is
// successfully accepted; cc is incremented when an accepted connection is
// closed.
//
// Calling Close() several times on the same accepted connection increments cc
// only once, and the increment happens even if Close() returns an error.
func NewConnectionTrackingListener(l net.Listener, ac prometheus.Counter, cc prometheus.Counter) *connectionTrackingListener {
	tracked := new(connectionTrackingListener)
	tracked.Listener = l
	tracked.acceptedConns = ac
	tracked.closedConns = cc
	return tracked
}
// stateProvidingConn is a net.Conn that can also report client state as a
// *structpb.Struct. Accept uses it to decide which wrapper type to return.
type stateProvidingConn interface {
net.Conn
// ClientState returns the state associated with the connection.
ClientState() *structpb.Struct
}
// connectionTrackingListenerStateConn wraps connections that expose their
// state as a structpb.Struct, for example *protocol.Conn. Embedding
// stateProvidingConn keeps ClientState reachable through the wrapper, so
// downstream code can still get the connection's state.
type connectionTrackingListenerStateConn struct {
stateProvidingConn
// dec ensures closedConns is incremented at most once, however many times
// Close is called.
dec sync.Once
// closedConns is the counter incremented when this connection is closed.
closedConns prometheus.Counter
}
// Close increments the closed-connections counter the first time it is called
// and then closes the wrapped connection, returning that Close's result. The
// counter is bumped before the underlying Close runs, so a failing Close is
// still counted.
func (c *connectionTrackingListenerStateConn) Close() error {
	countClose := func() {
		c.closedConns.Inc()
	}
	c.dec.Do(countClose)

	return c.stateProvidingConn.Close()
}
// connectionTrackingListenerConn wraps a plain net.Conn so that closing it is
// counted. Accept returns it for connections that do not implement
// stateProvidingConn.
type connectionTrackingListenerConn struct {
net.Conn
// dec ensures closedConns is incremented at most once, however many times
// Close is called.
dec sync.Once
// closedConns is the counter incremented when this connection is closed.
closedConns prometheus.Counter
}
// Close increments the closed-connections counter the first time it is called
// and then closes the wrapped connection, returning that Close's result. The
// counter is bumped before the underlying Close runs, so a failing Close is
// still counted.
func (c *connectionTrackingListenerConn) Close() error {
	countClose := func() {
		c.closedConns.Inc()
	}
	c.dec.Do(countClose)

	return c.Conn.Close()
}
// StatusFromError extracts a *status.Status from err.
//
// status.FromError is consulted first. If it does not recognize err, the error
// chain is searched with errors.As for any error that exposes a GRPCStatus()
// method, and that status is returned. When neither approach succeeds, a
// status with codes.Unknown is returned.
func StatusFromError(err error) *status.Status {
	st, recognized := status.FromError(err)
	if recognized {
		return st
	}

	// Any error in the chain that can report its own gRPC status.
	var carrier interface {
		GRPCStatus() *status.Status
	}
	if errors.As(err, &carrier) {
		return carrier.GRPCStatus()
	}

	return status.New(codes.Unknown, "Unknown Code")
}
// SplitMethodName breaks a full gRPC method name, as supplied by the grpc
// request handler (for example "/pkg.Service/Method"), into its service and
// method parts.
//
// A single leading slash is dropped, and the name is then split at the first
// remaining slash. If no slash remains, both results are "unknown".
func SplitMethodName(fullMethodName string) (string, string) {
	const unknown = "unknown"

	trimmed := strings.TrimPrefix(fullMethodName, "/")
	sep := strings.IndexByte(trimmed, '/')
	if sep < 0 {
		return unknown, unknown
	}
	return trimmed[:sep], trimmed[sep+1:]
}