-
Notifications
You must be signed in to change notification settings - Fork 287
/
client_util.go
239 lines (214 loc) · 7.36 KB
/
client_util.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
/*
* Copyright 2020 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rpc
import (
"context"
"io"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/semconv"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"d7y.io/dragonfly/v2/internal/dferrors"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/rpc/base"
"d7y.io/dragonfly/v2/pkg/util/mathutils"
)
// startGC periodically scans all tracked connections and closes those whose
// last access time exceeds connExpireTime. It runs until conn.ctx is canceled.
func (conn *Connection) startGC() {
	// TODO remove nodes that fail frequently from the hashing ring
	logger.GrpcLogger.With("conn", conn.name).Debugf("start the gc connections job")
	// execute the GC by fixed delay
	ticker := time.NewTicker(conn.gcConnInterval)
	// FIX: stop the ticker on exit so its resources are released when the
	// context is canceled (previously leaked)
	defer ticker.Stop()
	for {
		select {
		case <-conn.ctx.Done():
			logger.GrpcLogger.With("conn", conn.name).Info("conn close, exit gc")
			return
		case <-ticker.C:
			removedConnCount := 0
			totalNodeSize := 0
			startTime := time.Now()
			// TODO use another locker, @santong
			conn.rwMutex.Lock()
			// range all connections and determine whether they are expired
			conn.accessNodeMap.Range(func(node, accessTime interface{}) bool {
				serverNode := node.(string)
				totalNodeSize++
				atime := accessTime.(time.Time)
				if time.Since(atime) < conn.connExpireTime {
					return true
				}
				conn.gcConn(serverNode)
				removedConnCount++
				return true
			})
			// TODO use another locker, @santong
			conn.rwMutex.Unlock()
			// slow GC detected, report it with a log warning
			if timeElapse := time.Since(startTime); timeElapse > conn.gcConnTimeout {
				logger.GrpcLogger.With("conn", conn.name).Warnf("gc %d conns, cost: %.3f seconds", removedConnCount, timeElapse.Seconds())
			}
			// count live entries for the summary log below
			actualTotal := 0
			conn.node2ClientMap.Range(func(key, value interface{}) bool {
				if value != nil {
					actualTotal++
				}
				return true
			})
			logger.GrpcLogger.With("conn", conn.name).Infof("successfully gc clientConn count(%d), remainder count(%d), actualTotalConnCount(%d)",
				removedConnCount, totalNodeSize-removedConnCount, actualTotal)
		}
	}
}
// gcConn closes and removes the client connection for the given server node,
// then deletes every hash key mapped to that node and its access-time record.
// Callers must hold conn.rwMutex (see startGC).
func (conn *Connection) gcConn(node string) {
	logger.GrpcLogger.With("conn", conn.name).Infof("gc keys and clients associated with server node: %s starting", node)
	value, ok := conn.node2ClientMap.Load(node)
	if ok {
		clientCon := value.(*grpc.ClientConn)
		err := clientCon.Close()
		if err == nil {
			conn.node2ClientMap.Delete(node)
			logger.GrpcLogger.With("conn", conn.name).Infof("success gc clientConn: %s", node)
		} else {
			// keep the entry so a later GC pass can retry the close
			logger.GrpcLogger.With("conn", conn.name).Warnf("failed to close clientConn: %s: %v", node, err)
		}
	} else {
		// FIX: corrected typo "dose not found" in the log message
		logger.GrpcLogger.With("conn", conn.name).Warnf("server node: %s not found in node2ClientMap", node)
	}
	// gc hash keys that still map to the removed node
	conn.key2NodeMap.Range(func(key, value interface{}) bool {
		if value == node {
			conn.key2NodeMap.Delete(key)
			logger.GrpcLogger.With("conn", conn.name).Infof("success gc key: %s associated with server node %s", key, node)
		}
		return true
	})
	conn.accessNodeMap.Delete(node)
	logger.GrpcLogger.With("conn", conn.name).Infof("gc keys and clients associated with server node: %s ending", node)
}
// messageType is an attribute.KeyValue that distinguishes whether a traced
// gRPC message was sent (request) or received (response).
type messageType attribute.KeyValue

var (
	// messageSent marks a span event as an outbound (request) message.
	messageSent = messageType(attribute.Key("message.type").String("request"))
	// messageReceived marks a span event as an inbound (response) message.
	messageReceived = messageType(attribute.Key("message.type").String("response"))
)
// Event records the message as an event on the span in ctx, attaching the
// message-type, message-id, and uncompressed-size attributes. Non-proto
// messages are silently skipped.
func (m messageType) Event(ctx context.Context, id int, message interface{}) {
	span := trace.SpanFromContext(ctx)
	if p, ok := message.(proto.Message); ok {
		content, _ := proto.Marshal(p)
		span.AddEvent("message", trace.WithAttributes(
			attribute.KeyValue(m),
			semconv.RPCMessageIDKey.Int(id),
			// FIX: the OTel RPC semantic convention defines this attribute as
			// the message size in bytes; previously the raw marshaled content
			// was stored as a string under the size key
			semconv.RPCMessageUncompressedSizeKey.Int(len(content)),
		))
	}
}
// wrappedClientStream decorates a grpc.ClientStream to emit OpenTelemetry
// message events and error logs for every message sent or received.
type wrappedClientStream struct {
	grpc.ClientStream
	method            string           // full gRPC method name of the stream
	cc                *grpc.ClientConn // owning connection, used for log context
	receivedMessageID int              // count of messages received so far
	sentMessageID     int              // count of messages sent so far
}
// RecvMsg receives a message from the underlying stream. Non-EOF errors are
// converted via convertClientError and logged; each successful receive is
// recorded as a trace event with an incrementing message id.
func (w *wrappedClientStream) RecvMsg(m interface{}) error {
	recvErr := w.ClientStream.RecvMsg(m)
	switch {
	case recvErr == nil:
		w.receivedMessageID++
		messageReceived.Event(w.Context(), w.receivedMessageID, m)
		return nil
	case recvErr == io.EOF:
		// normal end of stream: pass through untouched, no event recorded
		return recvErr
	default:
		recvErr = convertClientError(recvErr)
		logger.GrpcLogger.Errorf("client receive a message: %T error: %v for method: %s target: %s connState: %s", m, recvErr, w.method, w.cc.Target(), w.cc.GetState().String())
		return recvErr
	}
}
// SendMsg sends a message on the underlying stream. Every attempt (successful
// or not) is recorded as a trace event; non-EOF errors are logged and the
// original error is returned unchanged.
func (w *wrappedClientStream) SendMsg(m interface{}) error {
	sendErr := w.ClientStream.SendMsg(m)
	w.sentMessageID++
	messageSent.Event(w.Context(), w.sentMessageID, m)
	if sendErr == nil || sendErr == io.EOF {
		return sendErr
	}
	logger.GrpcLogger.Errorf("client send a message: %T error: %v for method: %s target: %s connState: %s", m, sendErr, w.method, w.cc.Target(), w.cc.GetState().String())
	return sendErr
}
// streamClientInterceptor is a grpc.StreamClientInterceptor that wraps every
// created client stream in a wrappedClientStream for tracing and error
// logging. Stream-creation failures are converted and logged here.
func streamClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
	stream, err := streamer(ctx, desc, cc, method, opts...)
	if err != nil {
		converted := convertClientError(err)
		logger.GrpcLogger.Errorf("create client stream error: %v for method: %s target: %s connState: %s", converted, method, cc.Target(), cc.GetState().String())
		return nil, converted
	}
	wrapped := &wrappedClientStream{
		ClientStream: stream,
		method:       method,
		cc:           cc,
	}
	return wrapped, nil
}
// unaryClientInterceptor is a grpc.UnaryClientInterceptor that records the
// request and response as trace events, converts errors via
// convertClientError, and logs failures.
func unaryClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
	messageSent.Event(ctx, 1, req)
	err := invoker(ctx, method, req, reply, cc, opts...)
	// the reply event is recorded even on failure, mirroring the send side
	messageReceived.Event(ctx, 1, reply)
	if err == nil {
		return nil
	}
	err = convertClientError(err)
	logger.GrpcLogger.Errorf("do unary client error: %v for method: %s target: %s connState: %s", err, method, cc.Target(), cc.GetState().String())
	return err
}
// convertClientError unwraps a gRPC status error and, if it carries a
// *base.GrpcDfError detail, converts it back into a *dferrors.DfError.
// Any other error (including nil) is returned unchanged.
func convertClientError(err error) error {
	if err == nil {
		return nil
	}
	for _, detail := range status.Convert(err).Details() {
		dfErr, ok := detail.(*base.GrpcDfError)
		if !ok {
			continue
		}
		return &dferrors.DfError{
			Code:    dfErr.Code,
			Message: dfErr.Message,
		}
	}
	// grpc framework error
	return err
}
// RetryMeta holds retry parameters for stream-based RPC calls.
type RetryMeta struct {
	StreamTimes int     // times of replacing stream on the current client
	MaxAttempts int     // limit times for execute
	InitBackoff float64 // second
	MaxBackOff  float64 // second
}
// ExecuteWithRetry invokes f up to maxAttempts times with randomized
// exponential backoff between attempts (initBackoff/maxBackoff in seconds).
// A non-retryable cause — a *dferrors.DfError, or a gRPC DeadlineExceeded or
// Canceled status — aborts immediately, including a cause passed in by the
// caller before the first attempt. Returns the last result and error.
func ExecuteWithRetry(f func() (interface{}, error), initBackoff float64, maxBackoff float64, maxAttempts int, cause error) (interface{}, error) {
	var res interface{}
	for attempt := 0; attempt < maxAttempts; attempt++ {
		// business errors are final: do not retry
		if _, isDfError := cause.(*dferrors.DfError); isDfError {
			return res, cause
		}
		// context-driven terminations are final as well
		switch status.Code(cause) {
		case codes.DeadlineExceeded, codes.Canceled:
			return res, cause
		}
		// back off before every attempt after the first
		if attempt > 0 {
			time.Sleep(mathutils.RandBackoff(initBackoff, maxBackoff, 2.0, attempt))
		}
		if res, cause = f(); cause == nil {
			return res, nil
		}
	}
	return res, cause
}