/
grpc_monitoring.go
195 lines (171 loc) · 7.07 KB
/
grpc_monitoring.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation and Dapr Contributors.
// Licensed under the MIT License.
// ------------------------------------------------------------
package diagnostics
import (
"context"
"time"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"google.golang.org/grpc"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
diag_utils "github.com/dapr/dapr/pkg/diagnostics/utils"
)
// This implementation is inspired by
// https://github.com/census-instrumentation/opencensus-go/tree/master/plugin/ocgrpc
// Tag key definitions for gRPC requests.
var (
// KeyServerMethod tags server-side measurements with the gRPC method name.
KeyServerMethod = tag.MustNewKey("grpc_server_method")
// KeyServerStatus tags server-side measurements with the RPC status string.
KeyServerStatus = tag.MustNewKey("grpc_server_status")
// KeyClientMethod tags client-side measurements with the gRPC method name.
KeyClientMethod = tag.MustNewKey("grpc_client_method")
// KeyClientStatus tags client-side measurements with the RPC status string.
KeyClientStatus = tag.MustNewKey("grpc_client_status")
)
// grpcMetrics groups the OpenCensus measures recorded for gRPC traffic,
// together with the application ID used to tag them and a flag that gates
// recording.
type grpcMetrics struct {
// Server-side measures: request bytes, response bytes, latency in
// milliseconds, and a count of completed RPCs.
serverReceivedBytes *stats.Int64Measure
serverSentBytes *stats.Int64Measure
serverLatency *stats.Float64Measure
serverCompletedRpcs *stats.Int64Measure
// Client-side measures: request bytes, response bytes, round-trip latency
// in milliseconds, and a count of completed RPCs.
clientSentBytes *stats.Int64Measure
clientReceivedBytes *stats.Int64Measure
clientRoundtripLatency *stats.Float64Measure
clientCompletedRpcs *stats.Int64Measure
// appID is attached to every recorded measurement under appIDKey.
appID string
// enabled is false after construction and set to true by Init; the
// recording methods do nothing while it is false.
enabled bool
}
// newGRPCMetrics creates the server- and client-side gRPC measures.
//
// The returned value is disabled: nothing is recorded until Init is called.
func newGRPCMetrics() *grpcMetrics {
	return &grpcMetrics{
		serverReceivedBytes: stats.Int64(
			"grpc.io/server/received_bytes_per_rpc",
			"Total bytes received across all messages per RPC.",
			stats.UnitBytes),
		serverSentBytes: stats.Int64(
			"grpc.io/server/sent_bytes_per_rpc",
			"Total bytes sent in across all response messages per RPC.",
			stats.UnitBytes),
		serverLatency: stats.Float64(
			"grpc.io/server/server_latency",
			"Time between first byte of request received to last byte of response sent, or terminal error.",
			stats.UnitMilliseconds),
		// This measure is a dimensionless counter of finished RPCs, so its
		// description must not talk about bytes (it used to carry the text of
		// a sent-bytes metric). It now mirrors the client-side counterpart.
		serverCompletedRpcs: stats.Int64(
			"grpc.io/server/completed_rpcs",
			"Count of RPCs by method and status.",
			stats.UnitDimensionless),
		clientSentBytes: stats.Int64(
			"grpc.io/client/sent_bytes_per_rpc",
			"Total bytes sent across all request messages per RPC.",
			stats.UnitBytes),
		clientReceivedBytes: stats.Int64(
			"grpc.io/client/received_bytes_per_rpc",
			"Total bytes received across all response messages per RPC.",
			stats.UnitBytes),
		clientRoundtripLatency: stats.Float64(
			"grpc.io/client/roundtrip_latency",
			"Time between first byte of request sent to last byte of response received, or terminal error.",
			stats.UnitMilliseconds),
		clientCompletedRpcs: stats.Int64(
			"grpc.io/client/completed_rpcs",
			"Count of RPCs by method and status.",
			stats.UnitDimensionless),

		enabled: false,
	}
}
// Init stores appID, enables recording and registers one view per gRPC
// measure. It returns the error reported by view.Register, if any.
//
// The enabled flag is set before registration, so it stays true even when
// registration fails.
func (g *grpcMetrics) Init(appID string) error {
	g.appID = appID
	g.enabled = true

	return view.Register(
		diag_utils.NewMeasureView(g.serverReceivedBytes, []tag.Key{appIDKey, KeyServerMethod}, defaultSizeDistribution),
		diag_utils.NewMeasureView(g.serverSentBytes, []tag.Key{appIDKey, KeyServerMethod}, defaultSizeDistribution),
		// Server latency is recorded with a status tag, so the view must list
		// KeyServerStatus as well; otherwise the status is dropped during
		// aggregation. This matches the client round-trip latency view below.
		diag_utils.NewMeasureView(g.serverLatency, []tag.Key{appIDKey, KeyServerMethod, KeyServerStatus}, defaultLatencyDistribution),
		diag_utils.NewMeasureView(g.serverCompletedRpcs, []tag.Key{appIDKey, KeyServerMethod, KeyServerStatus}, view.Count()),
		diag_utils.NewMeasureView(g.clientSentBytes, []tag.Key{appIDKey, KeyClientMethod}, defaultSizeDistribution),
		diag_utils.NewMeasureView(g.clientReceivedBytes, []tag.Key{appIDKey, KeyClientMethod}, defaultSizeDistribution),
		diag_utils.NewMeasureView(g.clientRoundtripLatency, []tag.Key{appIDKey, KeyClientMethod, KeyClientStatus}, defaultLatencyDistribution),
		diag_utils.NewMeasureView(g.clientCompletedRpcs, []tag.Key{appIDKey, KeyClientMethod, KeyClientStatus}, view.Count()),
	)
}
// IsEnabled reports whether recording is turned on, i.e. the value of the
// enabled flag that Init sets to true.
func (g *grpcMetrics) IsEnabled() bool {
return g.enabled
}
// ServerRequestReceived records contentSize as the bytes received for method
// on the server side (only when metrics are enabled) and returns the current
// time, which callers hand back to ServerRequestSent as the start timestamp.
func (g *grpcMetrics) ServerRequestReceived(ctx context.Context, method string, contentSize int64) time.Time {
	if !g.enabled {
		return time.Now()
	}

	mutators := diag_utils.WithTags(appIDKey, g.appID, KeyServerMethod, method)
	received := g.serverReceivedBytes.M(contentSize)
	stats.RecordWithTags(ctx, mutators, received)

	// The timestamp is taken after recording, exactly as when disabled.
	return time.Now()
}
// ServerRequestSent records the outcome of a server-side RPC: one completed
// RPC tagged with method and status, contentSize as the bytes sent tagged with
// method, and the latency since start in milliseconds tagged with method and
// status. It does nothing when metrics are disabled.
func (g *grpcMetrics) ServerRequestSent(ctx context.Context, method, status string, contentSize int64, start time.Time) {
	if !g.enabled {
		return
	}

	// Convert to float before dividing: dividing two Durations is integer
	// division, which would truncate every latency to whole milliseconds and
	// report sub-millisecond calls as 0.
	elapsed := float64(time.Since(start)) / float64(time.Millisecond)

	stats.RecordWithTags(
		ctx,
		diag_utils.WithTags(appIDKey, g.appID, KeyServerMethod, method, KeyServerStatus, status),
		g.serverCompletedRpcs.M(1))
	stats.RecordWithTags(
		ctx,
		diag_utils.WithTags(appIDKey, g.appID, KeyServerMethod, method),
		g.serverSentBytes.M(contentSize))
	stats.RecordWithTags(
		ctx,
		diag_utils.WithTags(appIDKey, g.appID, KeyServerMethod, method, KeyServerStatus, status),
		g.serverLatency.M(elapsed))
}
// ClientRequestSent records contentSize as the bytes sent for method on the
// client side (only when metrics are enabled) and returns the current time,
// which callers hand back to ClientRequestReceived as the start timestamp.
func (g *grpcMetrics) ClientRequestSent(ctx context.Context, method string, contentSize int64) time.Time {
	if !g.enabled {
		return time.Now()
	}

	mutators := diag_utils.WithTags(appIDKey, g.appID, KeyClientMethod, method)
	sent := g.clientSentBytes.M(contentSize)
	stats.RecordWithTags(ctx, mutators, sent)

	// The timestamp is taken after recording, exactly as when disabled.
	return time.Now()
}
// ClientRequestReceived records the outcome of a client-side RPC: one
// completed RPC and the round-trip latency since start in milliseconds, both
// tagged with method and status, plus contentSize as the bytes received tagged
// with method. It does nothing when metrics are disabled.
func (g *grpcMetrics) ClientRequestReceived(ctx context.Context, method, status string, contentSize int64, start time.Time) {
	if !g.enabled {
		return
	}

	// Convert to float before dividing: dividing two Durations is integer
	// division, which would truncate every latency to whole milliseconds and
	// report sub-millisecond calls as 0.
	elapsed := float64(time.Since(start)) / float64(time.Millisecond)

	stats.RecordWithTags(
		ctx,
		diag_utils.WithTags(appIDKey, g.appID, KeyClientMethod, method, KeyClientStatus, status),
		g.clientCompletedRpcs.M(1))
	stats.RecordWithTags(
		ctx,
		diag_utils.WithTags(appIDKey, g.appID, KeyClientMethod, method, KeyClientStatus, status),
		g.clientRoundtripLatency.M(elapsed))
	// The received-bytes view is keyed by KeyClientMethod, so the method tag
	// has to be supplied here; without it every sample lands under an empty
	// method label.
	stats.RecordWithTags(
		ctx,
		diag_utils.WithTags(appIDKey, g.appID, KeyClientMethod, method),
		g.clientReceivedBytes.M(contentSize))
}
// getPayloadSize returns the protobuf wire size of payload in bytes.
//
// It returns 0 when payload is nil or does not implement proto.Message, so a
// metrics interceptor never panics on an unexpected payload type (for example
// a handler that returns a nil response together with a nil error).
func (g *grpcMetrics) getPayloadSize(payload interface{}) int {
	msg, ok := payload.(proto.Message)
	if !ok {
		return 0
	}
	return proto.Size(msg)
}
// UnaryServerInterceptor returns a gRPC server-side interceptor for unary
// RPCs. The interceptor records the request size before invoking the handler
// and, afterwards, the response size (0 when the handler failed), the status
// code string and the start timestamp via ServerRequestSent.
func (g *grpcMetrics) UnaryServerInterceptor() func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
	intercept := func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
		requestBytes := int64(g.getPayloadSize(req))
		startedAt := g.ServerRequestReceived(ctx, info.FullMethod, requestBytes)

		resp, err := handler(ctx, req)

		// The response is only measured when the handler succeeded.
		var responseBytes int64
		if err == nil {
			responseBytes = int64(g.getPayloadSize(resp))
		}

		code := status.Code(err).String()
		g.ServerRequestSent(ctx, info.FullMethod, code, responseBytes, startedAt)

		return resp, err
	}

	return intercept
}
// UnaryClientInterceptor returns a gRPC client-side interceptor for unary
// RPCs. The interceptor records the request size before invoking the call
// and, afterwards, the reply size (0 when the call failed), the status code
// string and the start timestamp via ClientRequestReceived.
func (g *grpcMetrics) UnaryClientInterceptor() func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
	intercept := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
		requestBytes := int64(g.getPayloadSize(req))
		startedAt := g.ClientRequestSent(ctx, method, requestBytes)

		err := invoker(ctx, method, req, reply, cc, opts...)

		// The reply is only measured when the call succeeded.
		var replyBytes int64
		if err == nil {
			replyBytes = int64(g.getPayloadSize(reply))
		}

		code := status.Code(err).String()
		g.ClientRequestReceived(ctx, method, code, replyBytes, startedAt)

		return err
	}

	return intercept
}