/
client_watchers_service.go
135 lines (123 loc) · 3.65 KB
/
client_watchers_service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package client
import (
"fmt"
"sync"
)
// ServiceUpdate is the service-level update handed to the callback passed to
// WatchService.
type ServiceUpdate struct {
	// Routes holds the routes (matchers plus actions) used to route RPCs.
	Routes []*Route
}
// WatchService discovers information about serviceName through LDS and RDS and
// reports it via cb.
//
// WatchService is meant to be called once per Client, because an xDS client is
// expected to be used by a single ClientConn. If the client already has an LDS
// watcher, no new watcher is started: cb is invoked (from a new goroutine) with
// an error and the returned cancel function does nothing.
//
// Cancellation is inherently racy: when an xDS response is received while the
// caller is running cancel(), cb can still be invoked shortly after the watcher
// was canceled. Callers have to tolerate that.
func (c *Client) WatchService(serviceName string, cb func(ServiceUpdate, error)) (cancel func()) {
	c.mu.Lock()
	alreadyWatching := len(c.ldsWatchers) != 0
	c.mu.Unlock()

	if alreadyWatching {
		// Report the misuse asynchronously and hand back a no-op cancel.
		go cb(ServiceUpdate{}, fmt.Errorf("unexpected WatchService when there's another service being watched"))
		return func() {}
	}

	watcher := &serviceUpdateWatcher{serviceCb: cb, c: c}
	watcher.ldsCancel = c.watchLDS(serviceName, watcher.handleLDSResp)
	return watcher.close
}
// serviceUpdateWatcher receives the LDS and RDS responses for one watched
// service and decides when the service callback gets invoked.
type serviceUpdateWatcher struct {
	// Populated by WatchService when the watcher is created.
	c         *Client
	ldsCancel func()                     // cancels the LDS watch
	serviceCb func(ServiceUpdate, error) // user callback

	// mu is held by the response handlers and by close while they access
	// the fields below.
	mu        sync.Mutex
	closed    bool   // set by close; responses are dropped afterwards
	rdsName   string // route name of the current RDS watch
	rdsCancel func() // cancels the current RDS watch; nil when there is none
}
// handleLDSResp is the callback WatchService registers for the LDS watch.
//
// A response that arrives after close is dropped. An error is forwarded to the
// service callback; if the error is of type ResourceNotFound (the LDS resource
// was removed), the current RDS watch, if any, is canceled as well. A
// successful update whose route name differs from the current one replaces the
// RDS watch with a watch on the new route name.
func (w *serviceUpdateWatcher) handleLDSResp(update ldsUpdate, err error) {
	w.c.logger.Infof("xds: client received LDS update: %+v, err: %v", update, err)

	w.mu.Lock()
	defer w.mu.Unlock()

	switch {
	case w.closed:
		return

	case err != nil:
		// ResourceNotFound is the only error type treated specially for
		// now: besides reporting it, the RDS watch is torn down. Every
		// other error is reported while the RDS watch is kept.
		if resourceGone := ErrType(err) == ErrorTypeResourceNotFound; resourceGone && w.rdsCancel != nil {
			w.rdsCancel()
			w.rdsName, w.rdsCancel = "", nil
		}
		w.serviceCb(ServiceUpdate{}, err)
		return

	case update.routeName == w.rdsName:
		// Same route name as before: keep the running RDS watch.
		return
	}

	// The route name changed: swap the old RDS watch for a new one.
	w.rdsName = update.routeName
	if stopOld := w.rdsCancel; stopOld != nil {
		stopOld()
	}
	w.rdsCancel = w.c.watchRDS(update.routeName, w.handleRDSResp)
}
// handleRDSResp is the callback handleLDSResp registers for the RDS watch.
//
// The response is dropped when the watcher is closed, or when there is no
// current RDS watch (rdsCancel is nil), which can happen after the LDS
// resource was removed. Otherwise the service callback receives either the
// error with an empty ServiceUpdate, or a ServiceUpdate carrying the routes
// from the RDS update.
func (w *serviceUpdateWatcher) handleRDSResp(update rdsUpdate, err error) {
	w.c.logger.Infof("xds: client received RDS update: %+v, err: %v", update, err)

	w.mu.Lock()
	defer w.mu.Unlock()

	if w.closed || w.rdsCancel == nil {
		return
	}

	// On error the update stays empty; only a successful response
	// contributes routes.
	var su ServiceUpdate
	if err == nil {
		su.Routes = update.routes
	}
	w.serviceCb(su, err)
}
// close shuts the watcher down. It marks the watcher as closed, so that LDS
// and RDS responses handled afterwards are dropped, cancels the LDS watch and,
// when one is registered, the RDS watch.
//
// close is what WatchService returns as the cancel function. It is idempotent:
// only the first call cancels the underlying watches, later calls return
// without doing anything.
func (w *serviceUpdateWatcher) close() {
	w.mu.Lock()
	defer w.mu.Unlock()

	if w.closed {
		// Already shut down; do not invoke ldsCancel a second time.
		return
	}
	w.closed = true

	w.ldsCancel()
	if w.rdsCancel != nil {
		w.rdsCancel()
		w.rdsCancel = nil
	}
}