forked from grpc/grpc-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
client_watchers.go
287 lines (262 loc) · 8.45 KB
/
client_watchers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package client
import (
"fmt"
"sync"
"time"
)
type watchInfoState int
const (
watchInfoStateStarted watchInfoState = iota
watchInfoStateRespReceived
watchInfoStateTimeout
watchInfoStateCanceled
)
// watchInfo holds all the information from a watch() call.
type watchInfo struct {
c *Client
rType ResourceType
target string
ldsCallback func(ListenerUpdate, error)
rdsCallback func(RouteConfigUpdate, error)
cdsCallback func(ClusterUpdate, error)
edsCallback func(EndpointsUpdate, error)
expiryTimer *time.Timer
// mu protects state, and c.scheduleCallback().
// - No callback should be scheduled after watchInfo is canceled.
// - No timeout error should be scheduled after watchInfo is resp received.
mu sync.Mutex
state watchInfoState
}
func (wi *watchInfo) newUpdate(update interface{}) {
wi.mu.Lock()
defer wi.mu.Unlock()
if wi.state == watchInfoStateCanceled {
return
}
wi.state = watchInfoStateRespReceived
wi.expiryTimer.Stop()
wi.c.scheduleCallback(wi, update, nil)
}
func (wi *watchInfo) resourceNotFound() {
wi.mu.Lock()
defer wi.mu.Unlock()
if wi.state == watchInfoStateCanceled {
return
}
wi.state = watchInfoStateRespReceived
wi.expiryTimer.Stop()
wi.sendErrorLocked(NewErrorf(ErrorTypeResourceNotFound, "xds: %v target %s not found in received response", wi.rType, wi.target))
}
func (wi *watchInfo) timeout() {
wi.mu.Lock()
defer wi.mu.Unlock()
if wi.state == watchInfoStateCanceled || wi.state == watchInfoStateRespReceived {
return
}
wi.state = watchInfoStateTimeout
wi.sendErrorLocked(fmt.Errorf("xds: %v target %s not found, watcher timeout", wi.rType, wi.target))
}
// Caller must hold wi.mu.
func (wi *watchInfo) sendErrorLocked(err error) {
var (
u interface{}
)
switch wi.rType {
case ListenerResource:
u = ListenerUpdate{}
case RouteConfigResource:
u = RouteConfigUpdate{}
case ClusterResource:
u = ClusterUpdate{}
case EndpointsResource:
u = EndpointsUpdate{}
}
wi.c.scheduleCallback(wi, u, err)
}
func (wi *watchInfo) cancel() {
wi.mu.Lock()
defer wi.mu.Unlock()
if wi.state == watchInfoStateCanceled {
return
}
wi.expiryTimer.Stop()
wi.state = watchInfoStateCanceled
}
func (c *Client) watch(wi *watchInfo) (cancel func()) {
c.mu.Lock()
defer c.mu.Unlock()
c.logger.Debugf("new watch for type %v, resource name %v", wi.rType, wi.target)
var watchers map[string]map[*watchInfo]bool
switch wi.rType {
case ListenerResource:
watchers = c.ldsWatchers
case RouteConfigResource:
watchers = c.rdsWatchers
case ClusterResource:
watchers = c.cdsWatchers
case EndpointsResource:
watchers = c.edsWatchers
}
resourceName := wi.target
s, ok := watchers[wi.target]
if !ok {
// If this is a new watcher, will ask lower level to send a new request
// with the resource name.
//
// If this (type+name) is already being watched, will not notify the
// underlying versioned apiClient.
c.logger.Debugf("first watch for type %v, resource name %v, will send a new xDS request", wi.rType, wi.target)
s = make(map[*watchInfo]bool)
watchers[resourceName] = s
c.apiClient.AddWatch(wi.rType, resourceName)
}
// No matter what, add the new watcher to the set, so it's callback will be
// call for new responses.
s[wi] = true
// If the resource is in cache, call the callback with the value.
switch wi.rType {
case ListenerResource:
if v, ok := c.ldsCache[resourceName]; ok {
c.logger.Debugf("LDS resource with name %v found in cache: %+v", wi.target, v)
wi.newUpdate(v)
}
case RouteConfigResource:
if v, ok := c.rdsCache[resourceName]; ok {
c.logger.Debugf("RDS resource with name %v found in cache: %+v", wi.target, v)
wi.newUpdate(v)
}
case ClusterResource:
if v, ok := c.cdsCache[resourceName]; ok {
c.logger.Debugf("CDS resource with name %v found in cache: %+v", wi.target, v)
wi.newUpdate(v)
}
case EndpointsResource:
if v, ok := c.edsCache[resourceName]; ok {
c.logger.Debugf("EDS resource with name %v found in cache: %+v", wi.target, v)
wi.newUpdate(v)
}
}
return func() {
c.logger.Debugf("watch for type %v, resource name %v canceled", wi.rType, wi.target)
wi.cancel()
c.mu.Lock()
defer c.mu.Unlock()
if s := watchers[resourceName]; s != nil {
// Remove this watcher, so it's callback will not be called in the
// future.
delete(s, wi)
if len(s) == 0 {
c.logger.Debugf("last watch for type %v, resource name %v canceled, will send a new xDS request", wi.rType, wi.target)
// If this was the last watcher, also tell xdsv2Client to stop
// watching this resource.
delete(watchers, resourceName)
c.apiClient.RemoveWatch(wi.rType, resourceName)
// Remove the resource from cache. When a watch for this
// resource is added later, it will trigger a xDS request with
// resource names, and client will receive new xDS responses.
switch wi.rType {
case ListenerResource:
delete(c.ldsCache, resourceName)
case RouteConfigResource:
delete(c.rdsCache, resourceName)
case ClusterResource:
delete(c.cdsCache, resourceName)
case EndpointsResource:
delete(c.edsCache, resourceName)
}
}
}
}
}
// WatchListener uses LDS to discover information about the provided listener.
//
// Note that during race (e.g. an xDS response is received while the user is
// calling cancel()), there's a small window where the callback can be called
// after the watcher is canceled. The caller needs to handle this case.
func (c *Client) WatchListener(serviceName string, cb func(ListenerUpdate, error)) (cancel func()) {
wi := &watchInfo{
c: c,
rType: ListenerResource,
target: serviceName,
ldsCallback: cb,
}
wi.expiryTimer = time.AfterFunc(c.opts.WatchExpiryTimeout, func() {
wi.timeout()
})
return c.watch(wi)
}
// WatchRouteConfig starts a listener watcher for the service..
//
// Note that during race (e.g. an xDS response is received while the user is
// calling cancel()), there's a small window where the callback can be called
// after the watcher is canceled. The caller needs to handle this case.
func (c *Client) WatchRouteConfig(routeName string, cb func(RouteConfigUpdate, error)) (cancel func()) {
wi := &watchInfo{
c: c,
rType: RouteConfigResource,
target: routeName,
rdsCallback: cb,
}
wi.expiryTimer = time.AfterFunc(c.opts.WatchExpiryTimeout, func() {
wi.timeout()
})
return c.watch(wi)
}
// WatchCluster uses CDS to discover information about the provided
// clusterName.
//
// WatchCluster can be called multiple times, with same or different
// clusterNames. Each call will start an independent watcher for the resource.
//
// Note that during race (e.g. an xDS response is received while the user is
// calling cancel()), there's a small window where the callback can be called
// after the watcher is canceled. The caller needs to handle this case.
func (c *Client) WatchCluster(clusterName string, cb func(ClusterUpdate, error)) (cancel func()) {
wi := &watchInfo{
c: c,
rType: ClusterResource,
target: clusterName,
cdsCallback: cb,
}
wi.expiryTimer = time.AfterFunc(c.opts.WatchExpiryTimeout, func() {
wi.timeout()
})
return c.watch(wi)
}
// WatchEndpoints uses EDS to discover endpoints in the provided clusterName.
//
// WatchEndpoints can be called multiple times, with same or different
// clusterNames. Each call will start an independent watcher for the resource.
//
// Note that during race (e.g. an xDS response is received while the user is
// calling cancel()), there's a small window where the callback can be called
// after the watcher is canceled. The caller needs to handle this case.
func (c *Client) WatchEndpoints(clusterName string, cb func(EndpointsUpdate, error)) (cancel func()) {
wi := &watchInfo{
c: c,
rType: EndpointsResource,
target: clusterName,
edsCallback: cb,
}
wi.expiryTimer = time.AfterFunc(c.opts.WatchExpiryTimeout, func() {
wi.timeout()
})
return c.watch(wi)
}