-
Notifications
You must be signed in to change notification settings - Fork 4.3k
/
authority.go
456 lines (410 loc) · 16.6 KB
/
authority.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
/*
*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package xdsclient
import (
"context"
"fmt"
"sync"
"time"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient/load"
"google.golang.org/grpc/xds/internal/xdsclient/transport"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
"google.golang.org/protobuf/types/known/anypb"
)
type watchState int
const (
watchStateStarted watchState = iota
watchStateRespReceived
watchStateTimeout
watchStateCanceled
)
type resourceState struct {
watchers map[xdsresource.ResourceWatcher]bool // Set of watchers for this resource
cache xdsresource.ResourceData // Most recent ACKed update for this resource
md xdsresource.UpdateMetadata // Metadata for the most recent update
// Common watch state for all watchers of this resource.
wTimer *time.Timer // Expiry timer
wState watchState // State of the watch
}
// authority wraps all state associated with a single management server. It
// contains the transport used to communicate with the management server and a
// cache of resource state for resources requested from the management server.
//
// Bootstrap configuration could contain multiple entries in the authorities map
// that share the same server config (server address and credentials to use). We
// share the same authority instance amongst these entries, and the reference
// counting is taken care of by the `clientImpl` type.
type authority struct {
serverCfg *bootstrap.ServerConfig // Server config for this authority
bootstrapCfg *bootstrap.Config // Full bootstrap configuration
refCount int // Reference count of watches referring to this authority
serializer *callbackSerializer // Callback serializer for invoking watch callbacks
resourceTypeGetter func(string) xdsresource.Type // ResourceType registry lookup
transport *transport.Transport // Underlying xDS transport to the management server
watchExpiryTimeout time.Duration // Resource watch expiry timeout
logger *grpclog.PrefixLogger
// A two level map containing the state of all the resources being watched.
//
// The first level map key is the ResourceType (Listener, Route etc). This
// allows us to have a single map for all resources instead of having per
// resource-type maps.
//
// The second level map key is the resource name, with the value being the
// actual state of the resource.
resourcesMu sync.Mutex
resources map[xdsresource.Type]map[string]*resourceState
}
// authorityArgs is a convenience struct to wrap arguments required to create a
// new authority. All fields here correspond directly to appropriate fields
// stored in the authority struct.
type authorityArgs struct {
// The reason for passing server config and bootstrap config separately
// (although the former is part of the latter) is because authorities in the
// bootstrap config might contain an empty server config, and in this case,
// the top-level server config is to be used.
//
// There are two code paths from where a new authority struct might be
// created. One is when a watch is registered for a resource, and one is
// when load reporting needs to be started. We have the authority name in
// the first case, but do in the second. We only have the server config in
// the second case.
serverCfg *bootstrap.ServerConfig
bootstrapCfg *bootstrap.Config
serializer *callbackSerializer
resourceTypeGetter func(string) xdsresource.Type
watchExpiryTimeout time.Duration
logger *grpclog.PrefixLogger
}
func newAuthority(args authorityArgs) (*authority, error) {
ret := &authority{
serverCfg: args.serverCfg,
bootstrapCfg: args.bootstrapCfg,
serializer: args.serializer,
resourceTypeGetter: args.resourceTypeGetter,
watchExpiryTimeout: args.watchExpiryTimeout,
logger: args.logger,
resources: make(map[xdsresource.Type]map[string]*resourceState),
}
tr, err := transport.New(transport.Options{
ServerCfg: *args.serverCfg,
UpdateHandler: ret.handleResourceUpdate,
StreamErrorHandler: ret.newConnectionError,
Logger: args.logger,
})
if err != nil {
return nil, fmt.Errorf("creating new transport to %q: %v", args.serverCfg, err)
}
ret.transport = tr
return ret, nil
}
func (a *authority) handleResourceUpdate(resourceUpdate transport.ResourceUpdate) error {
rType := a.resourceTypeGetter(resourceUpdate.URL)
if rType == nil {
return xdsresource.NewErrorf(xdsresource.ErrorTypeResourceTypeUnsupported, "Resource URL %v unknown in response from server", resourceUpdate.URL)
}
opts := &xdsresource.DecodeOptions{
BootstrapConfig: a.bootstrapCfg,
Logger: a.logger,
}
updates, md, err := decodeAllResources(opts, rType, resourceUpdate)
a.updateResourceStateAndScheduleCallbacks(rType, updates, md)
return err
}
func (a *authority) updateResourceStateAndScheduleCallbacks(rType xdsresource.Type, updates map[string]resourceDataErrTuple, md xdsresource.UpdateMetadata) {
a.resourcesMu.Lock()
defer a.resourcesMu.Unlock()
resourceStates := a.resources[rType]
for name, uErr := range updates {
if state, ok := resourceStates[name]; ok {
// Cancel the expiry timer associated with the resource once a
// response is received, irrespective of whether the update is a
// good one or not.
state.wTimer.Stop()
state.wState = watchStateRespReceived
if uErr.err != nil {
// On error, keep previous version of the resource. But update
// status and error.
state.md.ErrState = md.ErrState
state.md.Status = md.Status
for watcher := range state.watchers {
watcher := watcher
err := uErr.err
a.serializer.Schedule(func(context.Context) { watcher.OnError(err) })
}
continue
}
// If we get here, it means that the update is a valid one. Notify
// watchers only if this is a first time update or it is different
// from the one currently cached.
if state.cache == nil || !state.cache.Equal(uErr.resource) {
for watcher := range state.watchers {
watcher := watcher
resource := uErr.resource
a.serializer.Schedule(func(context.Context) { watcher.OnUpdate(resource) })
}
}
// Sync cache.
a.logger.Debugf("Resource type %q with name %q, value %s added to cache", rType.TypeEnum().String(), name, uErr.resource.ToJSON())
state.cache = uErr.resource
// Set status to ACK, and clear error state. The metadata might be a
// NACK metadata because some other resources in the same response
// are invalid.
state.md = md
state.md.ErrState = nil
state.md.Status = xdsresource.ServiceStatusACKed
if md.ErrState != nil {
state.md.Version = md.ErrState.Version
}
}
}
// If this resource type requires that all resources be present in every
// SotW response from the server, a response that does not include a
// previously seen resource will be interpreted as a deletion of that
// resource.
if !rType.AllResourcesRequiredInSotW() {
return
}
for name, state := range resourceStates {
if state.cache == nil {
// If the resource state does not contain a cached update, which can
// happen when:
// - resource was newly requested but has not yet been received, or,
// - resource was removed as part of a previous update,
// we don't want to generate an error for the watchers.
//
// For the first of the above two conditions, this ADS response may
// be in reaction to an earlier request that did not yet request the
// new resource, so its absence from the response does not
// necessarily indicate that the resource does not exist. For that
// case, we rely on the request timeout instead.
//
// For the second of the above two conditions, we already generated
// an error when we received the first response which removed this
// resource. So, there is no need to generate another one.
continue
}
if _, ok := updates[name]; !ok {
// The metadata status is set to "ServiceStatusNotExist" if a
// previous update deleted this resource, in which case we do not
// want to repeatedly call the watch callbacks with a
// "resource-not-found" error.
if state.md.Status == xdsresource.ServiceStatusNotExist {
continue
}
// If resource exists in cache, but not in the new update, delete
// the resource from cache, and also send a resource not found error
// to indicate resource removed. Metadata for the resource is still
// maintained, as this is required by CSDS.
state.cache = nil
state.md = xdsresource.UpdateMetadata{Status: xdsresource.ServiceStatusNotExist}
for watcher := range state.watchers {
watcher := watcher
a.serializer.Schedule(func(context.Context) { watcher.OnResourceDoesNotExist() })
}
}
}
}
type resourceDataErrTuple struct {
resource xdsresource.ResourceData
err error
}
func decodeAllResources(opts *xdsresource.DecodeOptions, rType xdsresource.Type, update transport.ResourceUpdate) (map[string]resourceDataErrTuple, xdsresource.UpdateMetadata, error) {
timestamp := time.Now()
md := xdsresource.UpdateMetadata{
Version: update.Version,
Timestamp: timestamp,
}
topLevelErrors := make([]error, 0) // Tracks deserialization errors, where we don't have a resource name.
perResourceErrors := make(map[string]error) // Tracks resource validation errors, where we have a resource name.
ret := make(map[string]resourceDataErrTuple) // Return result, a map from resource name to either resource data or error.
for _, r := range update.Resources {
result, err := rType.Decode(opts, r)
// Name field of the result is left unpopulated only when resource
// deserialization fails.
name := ""
if result != nil {
name = xdsresource.ParseName(result.Name).String()
}
if err == nil {
ret[name] = resourceDataErrTuple{resource: result.Resource}
continue
}
if name == "" {
topLevelErrors = append(topLevelErrors, err)
continue
}
perResourceErrors[name] = err
// Add place holder in the map so we know this resource name was in
// the response.
ret[name] = resourceDataErrTuple{err: err}
}
if len(topLevelErrors) == 0 && len(perResourceErrors) == 0 {
md.Status = xdsresource.ServiceStatusACKed
return ret, md, nil
}
typeStr := rType.TypeEnum().String()
md.Status = xdsresource.ServiceStatusNACKed
errRet := xdsresource.CombineErrors(typeStr, topLevelErrors, perResourceErrors)
md.ErrState = &xdsresource.UpdateErrorMetadata{
Version: update.Version,
Err: errRet,
Timestamp: timestamp,
}
return ret, md, errRet
}
// newConnectionError is called by the underlying transport when it receives a
// connection error. The error will be forwarded to all the resource watchers.
func (a *authority) newConnectionError(err error) {
a.resourcesMu.Lock()
defer a.resourcesMu.Unlock()
// For all resource types, for all resources within each resource type, and
// for all the watchers for every resource, propagate the connection error
// from the transport layer.
for _, rType := range a.resources {
for _, state := range rType {
for watcher := range state.watchers {
watcher := watcher
a.serializer.Schedule(func(context.Context) {
watcher.OnError(xdsresource.NewErrorf(xdsresource.ErrorTypeConnection, "xds: error received from xDS stream: %v", err))
})
}
}
}
}
// Increments the reference count. Caller must hold parent's authorityMu.
func (a *authority) refLocked() {
a.refCount++
}
// Decrements the reference count. Caller must hold parent's authorityMu.
func (a *authority) unrefLocked() int {
a.refCount--
return a.refCount
}
func (a *authority) close() {
a.transport.Close()
}
func (a *authority) watchResource(rType xdsresource.Type, resourceName string, watcher xdsresource.ResourceWatcher) func() {
a.logger.Debugf("New watch for type %q, resource name %q", rType.TypeEnum(), resourceName)
a.resourcesMu.Lock()
defer a.resourcesMu.Unlock()
// Lookup the ResourceType specific resources from the top-level map. If
// there is no entry for this ResourceType, create one.
resources := a.resources[rType]
if resources == nil {
resources = make(map[string]*resourceState)
a.resources[rType] = resources
}
// Lookup the resourceState for the particular resource that the watch is
// being registered for. If this is the first watch for this resource,
// instruct the transport layer to send a DiscoveryRequest for the same.
state := resources[resourceName]
if state == nil {
a.logger.Debugf("First watch for type %q, resource name %q", rType.TypeEnum(), resourceName)
state = &resourceState{
watchers: make(map[xdsresource.ResourceWatcher]bool),
md: xdsresource.UpdateMetadata{Status: xdsresource.ServiceStatusRequested},
wState: watchStateStarted,
}
state.wTimer = time.AfterFunc(a.watchExpiryTimeout, func() {
a.handleWatchTimerExpiry(rType, resourceName, state)
})
resources[resourceName] = state
a.sendDiscoveryRequestLocked(rType, resources)
}
// Always add the new watcher to the set of watchers.
state.watchers[watcher] = true
// If we have a cached copy of the resource, notify the new watcher.
if state.cache != nil {
a.logger.Debugf("Resource type %q with resource name %q found in cache: %s", rType.TypeEnum(), resourceName, state.cache.ToJSON())
resource := state.cache
a.serializer.Schedule(func(context.Context) { watcher.OnUpdate(resource) })
}
return func() {
a.resourcesMu.Lock()
defer a.resourcesMu.Unlock()
// We already have a reference to the resourceState for this particular
// resource. Avoid indexing into the two-level map to figure this out.
// Delete this particular watcher from the list of watchers, so that its
// callback will not be invoked in the future.
state.wState = watchStateCanceled
delete(state.watchers, watcher)
if len(state.watchers) > 0 {
return
}
// There are no more watchers for this resource, delete the state
// associated with it, and instruct the transport to send a request
// which does not include this resource name.
delete(resources, resourceName)
a.sendDiscoveryRequestLocked(rType, resources)
}
}
func (a *authority) handleWatchTimerExpiry(rType xdsresource.Type, resourceName string, state *resourceState) {
a.logger.Warningf("Watch for resource %q of type %s timed out", resourceName, rType.TypeEnum().String())
a.resourcesMu.Lock()
defer a.resourcesMu.Unlock()
if state.wState == watchStateCanceled {
return
}
state.wState = watchStateTimeout
// With the watch timer firing, it is safe to assume that the resource does
// not exist on the management server.
state.cache = nil
state.md = xdsresource.UpdateMetadata{Status: xdsresource.ServiceStatusNotExist}
for watcher := range state.watchers {
watcher := watcher
a.serializer.Schedule(func(context.Context) { watcher.OnResourceDoesNotExist() })
}
}
// sendDiscoveryRequestLocked sends a discovery request for the specified
// resource type and resource names. Even though this method does not directly
// access the resource cache, it is important that `resourcesMu` be beld when
// calling this method to ensure that a consistent snapshot of resource names is
// being requested.
func (a *authority) sendDiscoveryRequestLocked(rType xdsresource.Type, resources map[string]*resourceState) {
resourcesToRequest := make([]string, len(resources))
i := 0
for name := range resources {
resourcesToRequest[i] = name
i++
}
a.transport.SendRequest(rType.TypeURL(), resourcesToRequest)
}
func (a *authority) reportLoad() (*load.Store, func()) {
return a.transport.ReportLoad()
}
func (a *authority) dumpResources() map[string]map[string]xdsresource.UpdateWithMD {
a.resourcesMu.Lock()
defer a.resourcesMu.Unlock()
dump := make(map[string]map[string]xdsresource.UpdateWithMD)
for rType, resourceStates := range a.resources {
states := make(map[string]xdsresource.UpdateWithMD)
for name, state := range resourceStates {
var raw *anypb.Any
if state.cache != nil {
raw = state.cache.Raw()
}
states[name] = xdsresource.UpdateWithMD{
MD: state.md,
Raw: raw,
}
}
dump[rType.TypeURL()] = states
}
return dump
}