-
Notifications
You must be signed in to change notification settings - Fork 4.3k
/
resource_resolver_eds.go
140 lines (118 loc) · 4.29 KB
/
resource_resolver_eds.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
/*
*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package clusterresolver
import (
"sync"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)
// edsDiscoveryMechanism watches a single EDS resource and caches the most
// recent endpoints update recorded for it. Its OnUpdate, OnError and
// OnResourceDoesNotExist methods are the callbacks that record watch events;
// whenever the cached update changes, topLevelResolver is notified.
type edsDiscoveryMechanism struct {
	// nameToWatch is the name of the EDS resource handed to the endpoints
	// watch, and is also used in log messages.
	nameToWatch string

	// cancelWatch cancels the endpoints watch. It is invoked from stop().
	cancelWatch func()

	// topLevelResolver has its onUpdate() method invoked every time the cached
	// update is replaced.
	topLevelResolver topLevelResolver

	// stopped is fired by stop(). Every watch callback checks it first and
	// returns without doing anything once it has fired.
	stopped *grpcsync.Event

	// logger is used for verbosity-gated informational logs in the error
	// callbacks.
	logger *grpclog.PrefixLogger

	// mu guards update.
	mu sync.Mutex

	// update is the most recently recorded endpoints update. Nil indicates no
	// update received so far.
	update *xdsresource.EndpointsUpdate
}
// lastUpdate returns the most recently recorded endpoints update.
//
// The first return value holds a copy of the cached
// xdsresource.EndpointsUpdate and the second is true when an update has been
// recorded. If nothing has been recorded yet, it returns (nil, false). Note
// that the empty update stored by the error callbacks counts as a recorded
// update.
func (er *edsDiscoveryMechanism) lastUpdate() (any, bool) {
	er.mu.Lock()
	defer er.mu.Unlock()

	// Dereference while still holding the lock so that the returned value is a
	// snapshot of whatever was cached at this instant.
	if cached := er.update; cached != nil {
		return *cached, true
	}
	return nil, false
}
// resolveNow is a no-op for the EDS discovery mechanism: the body is empty, so
// a re-resolution request triggers no work here.
// NOTE(review): presumably this is because endpoint changes are delivered
// through the watch callbacks rather than pulled on demand — confirm.
func (er *edsDiscoveryMechanism) resolveNow() {
}
// stop fires the stopped event and then cancels the endpoints watch.
//
// The definition of stop() mentions that implementations must not invoke any
// methods on the topLevelResolver once the call to `stop()` returns.
func (er *edsDiscoveryMechanism) stop() {
	// Canceling a watch with the xDS client can race with an xDS response
	// received around the same time, and can result in the watch callback being
	// invoked after the watch is canceled. Callers need to handle this race,
	// and we fire the stopped event here to ensure that a watch callback
	// invocation around the same time becomes a no-op.
	//
	// The event is fired before the watch is canceled so that any callback
	// which starts after this point already observes it.
	er.stopped.Fire()
	er.cancelWatch()
}
// newEDSResolver builds an EDS-backed discovery mechanism for nameToWatch and
// registers an endpoints watch for that name with producer. The returned value
// implements the endpointsResolver interface.
//
// topLevelResolver is notified whenever the cached update changes, and logger
// is used for the mechanism's log output.
func newEDSResolver(nameToWatch string, producer xdsresource.Producer, topLevelResolver topLevelResolver, logger *grpclog.PrefixLogger) *edsDiscoveryMechanism {
	mech := new(edsDiscoveryMechanism)
	mech.stopped = grpcsync.NewEvent()
	mech.logger = logger
	mech.topLevelResolver = topLevelResolver
	mech.nameToWatch = nameToWatch

	// Register the watch last: the mechanism is handed to the watch as its
	// callback target, so every field the callbacks read must already be set.
	mech.cancelWatch = xdsresource.WatchEndpoints(producer, nameToWatch, mech)
	return mech
}
// OnUpdate is the watch callback for a new version of the watched resource. It
// caches the received endpoints update and notifies the top-level resolver.
// Once the mechanism has been stopped, the call is a no-op.
func (er *edsDiscoveryMechanism) OnUpdate(update *xdsresource.EndpointsResourceData) {
	if er.stopped.HasFired() {
		// The watch was canceled around the same time; ignore the update.
		return
	}

	received := &update.Resource

	er.mu.Lock()
	er.update = received
	er.mu.Unlock()

	// Notify outside the lock.
	er.topLevelResolver.onUpdate()
}
// OnError is the watch callback for an error on the watched resource.
//
// If an update is already cached, the error is only logged and the cached
// update remains in use. Otherwise an empty update is cached and the top-level
// resolver is notified. Once the mechanism has been stopped, the call is a
// no-op.
func (er *edsDiscoveryMechanism) OnError(err error) {
	if er.stopped.HasFired() {
		return
	}

	if er.logger.V(2) {
		er.logger.Infof("EDS discovery mechanism for resource %q reported error: %v", er.nameToWatch, err)
	}

	er.mu.Lock()
	// Continue using a previously received good configuration if one exists.
	haveUpdate := er.update != nil
	if !haveUpdate {
		// Otherwise record an empty update that would result in no priority
		// child being created for this discovery mechanism. This would result
		// in the priority LB policy reporting TRANSIENT_FAILURE (as there would
		// be no priorities or localities) if this was the only discovery
		// mechanism, or would result in the priority LB policy using a lower
		// priority discovery mechanism when that becomes available.
		er.update = &xdsresource.EndpointsUpdate{}
	}
	er.mu.Unlock()

	if haveUpdate {
		return
	}
	er.topLevelResolver.onUpdate()
}
// OnResourceDoesNotExist is the watch callback for a resource-does-not-exist
// report on the watched resource. It replaces whatever update is cached with
// an empty one and notifies the top-level resolver. Once the mechanism has
// been stopped, the call is a no-op.
func (er *edsDiscoveryMechanism) OnResourceDoesNotExist() {
	if er.stopped.HasFired() {
		return
	}

	if er.logger.V(2) {
		er.logger.Infof("EDS discovery mechanism for resource %q reported resource-does-not-exist error", er.nameToWatch)
	}

	// An empty update would result in no priority child being created for this
	// discovery mechanism. This would result in the priority LB policy
	// reporting TRANSIENT_FAILURE (as there would be no priorities or
	// localities) if this was the only discovery mechanism, or would result in
	// the priority LB policy using a lower priority discovery mechanism when
	// that becomes available.
	empty := &xdsresource.EndpointsUpdate{}

	er.mu.Lock()
	er.update = empty
	er.mu.Unlock()

	er.topLevelResolver.onUpdate()
}