-
Notifications
You must be signed in to change notification settings - Fork 1
/
sync_rpc_client.go
481 lines (432 loc) · 14 KB
/
sync_rpc_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
/*
Copyright 2020 The Magma Authors.
This source code is licensed under the BSD-style license found in the
LICENSE file in the root directory of this source tree.
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package service implements the core of the gateway SyncRPC bootstrapper client.
package service
import (
"bytes"
"context"
"crypto/tls"
"encoding/binary"
"fmt"
"io/ioutil"
"net"
"net/http"
"net/url"
"strconv"
"strings"
"sync"
"time"
"github.com/golang/glog"
"golang.org/x/net/http2"
"github.com/go-magma/magma/gateway/go/service_registry"
"github.com/go-magma/magma/lib/go/definitions"
_ "github.com/go-magma/magma/lib/go/initflag"
"github.com/go-magma/magma/lib/go/protos"
)
const (
	// A gRPC message is delivered as a length-prefixed message within HTTP2 DATA
	// frames; the first 5 bytes carry the compression flag (1 byte) and the
	// message length (4 bytes, big-endian).
	// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
	GRPC_MSGLEN_SZ  = 5
	// GRPC_LEN_OFFSET is the byte offset of the 4-byte message length within
	// the 5-byte gRPC message prefix (byte 0 is the compression flag).
	GRPC_LEN_OFFSET = 1
	// MinRetryInterval is the initial delay between cloud reconnect attempts.
	MinRetryInterval      = time.Millisecond * 500
	// MaxRetryInterval bounds the reconnect backoff delay.
	MaxRetryInterval      = time.Second * 30
	// RetryBackoffIncrement is added to the delay after each failed attempt.
	RetryBackoffIncrement = MinRetryInterval
	// SuccessStartInterval - a connection that survives at least this long is
	// considered successful, resetting the backoff to MinRetryInterval.
	SuccessStartInterval = time.Second * 5
	// Defaults for the corresponding Config fields below.
	DefaultSyncRpcHeartbeatInterval = time.Second * 30
	DefaultGatewayKeepaliveInterval = time.Second * 10
	DefaultGatewayResponseTimeout   = time.Second * 120
)
// Config holds the SyncRPC client tuning knobs, loadable from YAML.
type Config struct {
	// Sync rpc client sends across heartbeat responses with empty reqID
	// in case no messages were sent within this interval(in seconds)
	SyncRpcHeartbeatInterval time.Duration `yaml:"sync_rpc_heartbeat_interval"`
	// Sync rpc client sends across gateway response with KeepConnActive
	// flag set in case it doesn't receive any responses from gateway service
	// within this interval(in seconds)
	GatewayKeepaliveInterval time.Duration `yaml:"gateway_keepalive_interval"`
	// Sync rpc client sends across a gateway timed out error response to
	// dispatcher in case it doesn't receive any responses from gateway service
	// within timeout period(in seconds)
	GatewayResponseTimeout time.Duration `yaml:"gateway_response_timeout"`
}
// Request - an outstanding gateway request being brokered on behalf of the cloud.
type Request struct {
	// CancelFunc aborts the in-flight request (called when the cloud reports
	// the originating connection closed).
	CancelFunc context.CancelFunc
	// terminated marks a request whose cloud-side connection has closed; its
	// eventual response must not be sent upstream.
	terminated bool
}
// SyncRpcClient opens a bidirectional connection with the cloud
type SyncRpcClient struct {
	// RWMutex guards outstandingReqs below.
	sync.RWMutex
	// service registry to connect to the cloud
	serviceRegistry service_registry.GatewayRegistry
	// client configuration (heartbeat/keepalive/response timeout intervals)
	cfg Config
	// requests which are still being processed, keyed by cloud request ID
	outstandingReqs map[uint32]*Request
	// channel receiving broker responses destined for the cloud dispatcher
	respCh chan *protos.SyncRPCResponse
	// broker translates SyncRPC requests into local gateway HTTP2/gRPC calls
	broker broker
	// underlying SyncRPC grpc client
	client protos.SyncRPCServiceClient
}
// NewClient returns a new SyncRPC client using the given service registry.
// If the given registry is nil, the shared GW Service Registry config values
// are used instead.
func NewClient(reg service_registry.GatewayRegistry) *SyncRpcClient {
	if reg == nil {
		reg = service_registry.Get()
	}
	cfg := &Config{
		SyncRpcHeartbeatInterval: DefaultSyncRpcHeartbeatInterval,
		GatewayKeepaliveInterval: DefaultGatewayKeepaliveInterval,
		GatewayResponseTimeout:   DefaultGatewayResponseTimeout,
	}
	return &SyncRpcClient{
		serviceRegistry: reg,
		cfg:             *cfg, // keep a private copy of the configuration
		outstandingReqs: map[uint32]*Request{},
		respCh:          make(chan *protos.SyncRPCResponse),
		broker:          newbrokerImpl(cfg),
	}
}
// Run starts the SyncRPC worker loop and blocks forever. It repeatedly
// connects to the cloud dispatcher and services the stream; on failure it
// retries with additive backoff between MinRetryInterval and MaxRetryInterval.
// A connection that lasts at least SuccessStartInterval resets the backoff.
func (c *SyncRpcClient) Run() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	currentBackoffInterval := MinRetryInterval
	// nextBackoff grows the interval additively and caps it at MaxRetryInterval.
	// NOTE: the previous `(interval + increment) % MaxRetryInterval` wrapped the
	// backoff back to near zero once it reached the max, causing hot retry loops.
	nextBackoff := func(d time.Duration) time.Duration {
		d += RetryBackoffIncrement
		if d > MaxRetryInterval {
			return MaxRetryInterval
		}
		return d
	}
	for {
		conn, err := c.serviceRegistry.GetCloudConnection(definitions.DispatcherServiceName)
		if err != nil {
			// TODO
			// Continue for retryable grpc errors
			// Add a delay/jitter for retrying cloud connection for non retryable grpc errors
			glog.Errorf("[SyncRpc] error creating cloud connection: %v", err)
			time.Sleep(currentBackoffInterval)
			currentBackoffInterval = nextBackoff(currentBackoffInterval)
			continue
		}
		glog.Infof("[SyncRpc] successfully connected to cloud '%s' service", definitions.DispatcherServiceName)
		// this should simply wait here for requests and process responses
		// in case we see any error we will retry connecting to the dispatcher
		resetBackoffTime := time.Now().Add(SuccessStartInterval) // make sure, run lasts at least SuccessStartInterval
		c.runSyncRpcClient(ctx, protos.NewSyncRPCServiceClient(conn))
		if time.Now().After(resetBackoffTime) {
			currentBackoffInterval = MinRetryInterval // reset backoff interval
		}
		conn.Close()
		time.Sleep(currentBackoffInterval)
		currentBackoffInterval = nextBackoff(currentBackoffInterval)
		// exit loop if context is done
		select {
		case <-ctx.Done():
			return
		default:
		}
	}
}
// runSyncRpcClient establishes the bidirectional SyncRPC stream with the cloud
// dispatcher and pumps gateway responses and periodic heartbeats over it until
// the stream fails or ctx is canceled. Returns the terminating error, or nil
// on clean ctx cancellation.
func (c *SyncRpcClient) runSyncRpcClient(ctx context.Context, client protos.SyncRPCServiceClient) error {
	c.client = client
	stream, err := c.client.EstablishSyncRPCStream(ctx)
	if err != nil {
		glog.Errorf("[SyncRPC] Failed establishing SyncRpc stream; error: %v", err)
		return err
	}
	// Buffered (size 1) so the receiver goroutine can always deliver its
	// result and exit even when this function returns first (send failure or
	// ctx.Done below); an unbuffered channel leaked that goroutine forever.
	errChan := make(chan error, 1)
	go func() {
		// send first heartbeat to establish orc8r queue; a failure here is
		// logged — the broken stream will also surface via processStream's Recv
		if err := stream.Send(&protos.SyncRPCResponse{HeartBeat: true}); err != nil {
			glog.Errorf("[SyncRpc] initial heartbeat to dispatcher failed: %v", err)
		}
		errChan <- c.processStream(ctx, stream)
	}()
	timer := time.NewTimer(c.cfg.SyncRpcHeartbeatInterval)
	defer timer.Stop()
	for {
		select {
		case resp := <-c.respCh:
			// drop responses whose cloud-side connection already closed
			if !c.isReqTerminated(resp.ReqId) {
				err := stream.Send(resp)
				if err != nil {
					glog.Errorf("[SyncRpc] send to dispatcher failed: %v", err)
					return err
				}
				timer.Reset(c.cfg.SyncRpcHeartbeatInterval)
				glog.V(3).Infof("[SyncRpc] sent resp: %s", resp)
			} else {
				glog.Errorf("[SyncRpc] request canceled for Id: %d", resp.ReqId)
			}
		case err := <-errChan:
			// error received handling processStream
			return err
		case <-timer.C:
			// no traffic within the heartbeat interval - keep the stream alive
			err := stream.Send(&protos.SyncRPCResponse{HeartBeat: true})
			if err != nil {
				glog.Error("[SyncRpc] heartbeat to dispatcher failed")
				return err
			}
			glog.V(2).Info("[SyncRpc] heartbeat success")
			timer.Reset(c.cfg.SyncRpcHeartbeatInterval)
		case <-ctx.Done():
			glog.Info("[SyncRPC] Stopping SyncRpcClient")
			return nil
		}
	}
}
// updateTerminatedReqs marks the outstanding request reqID as terminated and
// returns its cancel function; returns nil when reqID is not outstanding.
func (c *SyncRpcClient) updateTerminatedReqs(reqID uint32) context.CancelFunc {
	c.Lock()
	defer c.Unlock()
	req, found := c.outstandingReqs[reqID]
	if !found {
		return nil
	}
	req.terminated = true
	return req.CancelFunc
}
// isReqTerminated reports whether the outstanding request reqID has been
// marked terminated; unknown request IDs are reported as not terminated.
func (c *SyncRpcClient) isReqTerminated(reqID uint32) bool {
	c.RLock()
	defer c.RUnlock()
	req, found := c.outstandingReqs[reqID]
	return found && req.terminated
}
// updateOutstandingdReqs registers reqID as an outstanding request with the
// given cancellation function.
func (c *SyncRpcClient) updateOutstandingdReqs(reqID uint32, cancelFn context.CancelFunc) {
	c.Lock()
	defer c.Unlock()
	c.outstandingReqs[reqID] = &Request{CancelFunc: cancelFn}
}
// getOutstandingReqCancelFn returns the cancel function registered for the
// outstanding request reqID, or nil when no such request exists.
func (c *SyncRpcClient) getOutstandingReqCancelFn(reqID uint32) context.CancelFunc {
	var cancel context.CancelFunc
	c.RLock()
	if req, found := c.outstandingReqs[reqID]; found {
		cancel = req.CancelFunc
	}
	c.RUnlock()
	return cancel
}
// removeOutstandingReq drops reqID from the outstanding-request map
// (a no-op if it was already removed).
func (c *SyncRpcClient) removeOutstandingReq(reqID uint32) {
	c.Lock()
	defer c.Unlock()
	delete(c.outstandingReqs, reqID)
}
// handleSyncRpcRequest forwards the incoming request to the appropriate
// destination gateway service. Heartbeats are acknowledged silently,
// ConnClosed notifications cancel the matching outstanding request, and
// duplicate or unroutable requests are answered with an error response.
func (c *SyncRpcClient) handleSyncRpcRequest(inCtx context.Context, req *protos.SyncRPCRequest) {
	if req == nil {
		glog.Error("[SyncRpc] error empty request received")
		return
	}
	if req.HeartBeat {
		glog.V(3).Info("[SyncRpc] received heartbeat")
		return
	}
	gatewayReq := req.GetReqBody()
	if req.GetConnClosed() {
		glog.V(1).Infof("[SyncRpc] connection closed handling ReqId: %d", req.ReqId)
		cancelFn := c.updateTerminatedReqs(req.ReqId)
		if cancelFn != nil {
			cancelFn()
		} else {
			glog.V(1).Infof("[SyncRpc] closed ReqId: %d is not found", req.ReqId)
		}
		return
	}
	if glog.V(2) {
		glog.Infof("[SyncRpc] request ID %d from GW %s, for service: %s, path: %s",
			req.ReqId, gatewayReq.GetGwId(), gatewayReq.GetAuthority(), gatewayReq.GetPath())
	}
	// get the cancellation fn of the request if present
	// return early if request is outstanding
	if cancelFn := c.getOutstandingReqCancelFn(req.ReqId); cancelFn != nil {
		glog.Warningf("[SyncRpc] duplicate request to %s, %s with Id: %d",
			gatewayReq.GetAuthority(), gatewayReq.GetPath(), req.ReqId)
		c.respCh <- buildSyncRpcErrorResponse(
			req.ReqId, fmt.Sprintf("request ID %d is already being handled", req.ReqId))
		return
	}
	serviceAddr, err := c.serviceRegistry.GetServiceAddress(gatewayReq.GetAuthority())
	if err != nil {
		glog.Errorf("[SyncRpc] error getting service address: %v", err)
		// answer the dispatcher instead of silently dropping the request so it
		// does not wait out the full response timeout for a reply that will
		// never come (consistent with the duplicate-request path above)
		c.respCh <- buildSyncRpcErrorResponse(
			req.ReqId, fmt.Sprintf("error getting address of service %s: %v", gatewayReq.GetAuthority(), err))
		return
	}
	ctx, cancelFn := context.WithCancel(inCtx)
	c.updateOutstandingdReqs(req.ReqId, cancelFn)
	go func() {
		// broker.send blocks until the gateway responds, times out, or ctx is
		// canceled; drop the bookkeeping entry once finished
		c.broker.send(ctx, serviceAddr, req, c.respCh)
		c.removeOutstandingReq(req.ReqId)
	}()
}
// processStream receives and dispatches incoming gateway requests from the
// cloud stream until Recv fails (returning its error) or ctx is canceled
// (returning nil).
func (c *SyncRpcClient) processStream(
	ctx context.Context, stream protos.SyncRPCService_EstablishSyncRPCStreamClient) error {
	for {
		req, recvErr := stream.Recv()
		if recvErr != nil {
			glog.Errorf("[SyncRPC] error: %v, failed handling sync request", recvErr)
			return recvErr
		}
		c.handleSyncRpcRequest(ctx, req)
		// non-blocking cancellation check between requests
		select {
		case <-ctx.Done():
			glog.Info("[SyncRPC] exiting processing stream")
			return nil
		default:
		}
	}
}
// broker handles the responsibility of translating the incoming SyncGrpcRequest
// into a http2 request to the Grpc service running on the gateway
type broker interface {
	// send relays the request to the gateway service at the given address and
	// writes the resulting SyncRPCResponse(s) to the supplied channel.
	send(context.Context, string, *protos.SyncRPCRequest, chan *protos.SyncRPCResponse)
}
// brokerImpl is the default broker implementation; cfg supplies the
// keepalive and response-timeout intervals used while relaying requests.
type brokerImpl struct {
	cfg *Config
}
// newbrokerImpl returns a broker implementation configured with cfg.
func newbrokerImpl(cfg *Config) *brokerImpl {
	b := &brokerImpl{}
	b.cfg = cfg
	return b
}
func getRequestHeaders(hdr http.Header, trailer http.Header) map[string]string {
// following block reads the response
respHeaders := make(map[string]string, len(hdr)+len(trailer))
for k, v := range hdr {
respHeaders[k] = strings.Join(v, ",")
}
if len(trailer) > 0 {
for k, v := range trailer {
respHeaders[k] = strings.Join(v, ",")
}
}
return respHeaders
}
// buildSyncRpcErrorResponse wraps the given error string into a
// SyncRPCResponse addressed to request reqID.
func buildSyncRpcErrorResponse(reqID uint32, err string) *protos.SyncRPCResponse {
	body := &protos.GatewayResponse{Err: err}
	return &protos.SyncRPCResponse{ReqId: reqID, RespBody: body}
}
// buildHeaders converts a flat string map into an http.Header, canonicalizing
// key names via Header.Add.
func buildHeaders(hdrMap map[string]string) http.Header {
	hdr := make(http.Header, len(hdrMap))
	for name, value := range hdrMap {
		hdr.Add(name, value)
	}
	return hdr
}
// sendInternal performs the brokered gateway call: it POSTs the gRPC payload
// of req over h2c (HTTP/2 without TLS) to the service at address and writes
// exactly one SyncRPCResponse (success or error) to respCh. tout bounds the
// whole HTTP exchange.
func sendInternal(address string, req *protos.SyncRPCRequest, respCh chan *protos.SyncRPCResponse, tout time.Duration) {
	var respErr error
	defer func() {
		// single error exit point: every early return sets respErr and the
		// deferred block reports it to the dispatcher
		if respErr != nil {
			respErr = fmt.Errorf("ReqID %d failed: %v", req.ReqId, respErr)
			glog.Errorf("[SyncRPC] %v", respErr)
			respCh <- buildSyncRpcErrorResponse(req.ReqId, respErr.Error())
		}
	}()
	gatewayReq := req.ReqBody
	// http2 client to connect to the grpc port;
	// override DialTLS to create a vanilla tcp connection (h2c)
	client := &http.Client{
		Transport: &http2.Transport{
			AllowHTTP: true,
			DialTLS: func(netw, addr string, cfg *tls.Config) (net.Conn, error) {
				return net.Dial(netw, addr)
			},
		},
		Timeout: tout,
	}
	brokerReq := &http.Request{
		RequestURI: "",
		Method:     "POST",
		URL: &url.URL{
			Scheme: "http",
			Path:   gatewayReq.Path,
			Host:   address,
		},
		Body:   ioutil.NopCloser(bytes.NewReader(gatewayReq.Payload)),
		Header: buildHeaders(gatewayReq.Headers),
		Host:   gatewayReq.Authority,
	}
	resp, err := client.Do(brokerReq)
	if err != nil {
		respErr = fmt.Errorf("grpc request failed: %v", err)
		return
	}
	// close the body on every path (it previously leaked on the non-200 return)
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		// NOTE: previous message formatted `err`, which is always nil here
		respErr = fmt.Errorf("http response error, status %s statuscode %d", resp.Status, resp.StatusCode)
		return
	}
	b, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		// must return here: previously this fell through and sent BOTH a
		// truncated-body response below AND the deferred error response
		respErr = fmt.Errorf("failed reading grpc message: %v", err)
		return
	}
	lb := len(b)
	if lb < GRPC_MSGLEN_SZ {
		// body too short to carry a gRPC length-prefixed message; forward
		// status & headers only
		glog.V(2).Infof("[SyncRPC] resp body for ReqId %d is too short: %d", req.ReqId, lb)
		respCh <- &protos.SyncRPCResponse{
			ReqId: req.ReqId,
			RespBody: &protos.GatewayResponse{
				Status:  strconv.Itoa(resp.StatusCode),
				Headers: getRequestHeaders(resp.Header, resp.Trailer),
			},
		}
		return
	}
	// grpc messages are sent as length prefixed messages, first byte is
	// for indicating compression next 4 bytes is length, followed by payload
	// read the first 5 bytes and using the length read the rest of the message
	msgLen := binary.BigEndian.Uint32(b[GRPC_LEN_OFFSET:])
	respMsg := &protos.SyncRPCResponse{
		ReqId: req.ReqId,
		RespBody: &protos.GatewayResponse{
			Status:  strconv.Itoa(resp.StatusCode),
			Headers: getRequestHeaders(resp.Header, resp.Trailer),
			Payload: b,
		},
	}
	respCh <- respMsg
	glog.V(3).Infof("[SyncRPC] sending resp: %s (payload: %v; msg len: %d; body len: %d)", respMsg, b, msgLen, lb)
}
// send relays req to the gateway service at serviceAddr via sendInternal and
// forwards the result to respCh. While waiting, it emits a keepalive response
// every GatewayKeepaliveInterval and gives up with an error response once
// GatewayResponseTimeout has elapsed, or silently when ctx is canceled.
func (p *brokerImpl) send(
	ctx context.Context, serviceAddr string, req *protos.SyncRPCRequest, respCh chan *protos.SyncRPCResponse) {
	// Buffered (size 1) so the sendInternal goroutine can always complete its
	// send and exit even after we return early on timeout or cancellation;
	// with an unbuffered channel that goroutine leaked forever.
	clientRespCh := make(chan *protos.SyncRPCResponse, 1)
	go sendInternal(serviceAddr, req, clientRespCh, p.cfg.GatewayResponseTimeout)
	timer := time.NewTimer(p.cfg.GatewayKeepaliveInterval)
	defer timer.Stop()
	timeoutTime := time.Now().Add(p.cfg.GatewayResponseTimeout)
	for {
		select {
		case resp, ok := <-clientRespCh:
			if !ok {
				glog.Errorf("[SyncRPC] channel closed for ReqId %d", req.ReqId)
			} else {
				respCh <- resp
			}
			return
		case <-timer.C:
			if time.Now().After(timeoutTime) {
				// max request timeout exceeded send error back to the caller
				respCh <- buildSyncRpcErrorResponse(req.ReqId, "grpc request timed out on read")
				return
			}
			// construct SyncRpcResponse and keep connection active
			respCh <- &protos.SyncRPCResponse{
				ReqId:    req.ReqId,
				RespBody: &protos.GatewayResponse{KeepConnActive: true}}
			timer.Reset(p.cfg.GatewayKeepaliveInterval)
			glog.V(2).Infof("[SyncRPC] sending keepalive while on ReqId %d", req.ReqId)
		case <-ctx.Done():
			return
		}
	}
}