-
Notifications
You must be signed in to change notification settings - Fork 4.3k
/
rds_handler.go
188 lines (168 loc) · 6.1 KB
/
rds_handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
/*
*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package server
import (
"sync"
igrpclog "google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)
// rdsHandler handles any RDS queries that need to be started for a given
// server-side listener's filter chains (i.e. route configuration that is not
// inline). It persists rdsWatcher updates for later use and also determines
// whether all the rdsWatcher updates needed have been received or not.
type rdsHandler struct {
// xdsC is the client passed to xdsresource.WatchRouteConfig whenever a new
// watch is started.
xdsC XDSClient
// logger is the logger supplied to newRDSHandler.
logger *igrpclog.PrefixLogger
// callback is invoked by handleRouteUpdate with the route name and the
// update that was stored for that route.
callback func(string, rdsWatcherUpdate)
// updates is a map from routeName to rdsWatcher update, including
// RouteConfiguration resources and any errors received. If not written in
// this map, no RouteConfiguration or error for that route name yet. If
// update set in value, use that as valid route configuration, otherwise
// treat as an error case and fail at L7 level.
updates map[string]rdsWatcherUpdate
// mu is held by updateRouteNamesToWatch and close for the duration of
// their bodies. handleRouteUpdate and determineRouteConfigurationReady do
// not acquire it.
mu sync.Mutex
// cancels maps each watched route name to the function that marks its
// watcher as canceled and cancels the underlying watch.
cancels map[string]func()
}
// newRDSHandler builds an rdsHandler that watches RouteConfiguration
// resources. The returned handler starts with no active watches and no stored
// updates; listenerWrapper feeds it the set of route names to watch through
// updateRouteNamesToWatch() whenever new Listener configuration arrives.
//
// cb is stored as the handler's callback, xdsC is the client used to start
// watches, and logger is stored on the handler.
func newRDSHandler(cb func(string, rdsWatcherUpdate), xdsC XDSClient, logger *igrpclog.PrefixLogger) *rdsHandler {
	rh := new(rdsHandler)
	rh.xdsC = xdsC
	rh.logger = logger
	rh.callback = cb
	// Both maps must be non-nil: they are written to as watches are added
	// and updates arrive.
	rh.updates = map[string]rdsWatcherUpdate{}
	rh.cancels = map[string]func(){}
	return rh
}
// updateRouteNamesToWatch handles a list of route names to watch for a given
// server side listener (if a filter chain specifies dynamic
// RouteConfiguration). This function handles all the logic with respect to any
// routes that may have been added or deleted as compared to what was previously
// present. Must be called within an xDS Client callback.
//
// A route name counts as requested when it is present as a key in
// routeNamesToWatch; the associated bool value is not consulted. For every
// requested name without an existing watch, a new watch is started. For every
// existing watch whose name is no longer requested, the watch is canceled and
// both its cancel function and any stored update are dropped.
func (rh *rdsHandler) updateRouteNamesToWatch(routeNamesToWatch map[string]bool) {
	rh.mu.Lock()
	defer rh.mu.Unlock()

	// Add and start watches for any new routes in routeNamesToWatch.
	for routeName := range routeNamesToWatch {
		if _, ok := rh.cancels[routeName]; ok {
			continue
		}
		// The xDS client keeps a reference to the watcher until the cancel
		// func is invoked. So, we don't need to keep a reference for fear
		// of it being garbage collected.
		//
		// The watcher is handed the handler's logger so that its callbacks
		// log with the same prefix and verbosity as the handler.
		w := &rdsWatcher{parent: rh, logger: rh.logger, routeName: routeName}
		cancel := xdsresource.WatchRouteConfig(rh.xdsC, routeName, w)
		// Set bit on cancel function to eat any RouteConfiguration calls
		// for this watcher after it has been canceled.
		rh.cancels[routeName] = func() {
			w.mu.Lock()
			w.canceled = true
			w.mu.Unlock()
			cancel()
		}
	}

	// Delete and cancel watches for any routes from persisted routeNamesToWatch
	// that are no longer present. Deleting the current key while ranging over
	// a map is permitted in Go.
	for routeName, cancel := range rh.cancels {
		if _, ok := routeNamesToWatch[routeName]; ok {
			continue
		}
		cancel()
		delete(rh.cancels, routeName)
		delete(rh.updates, routeName)
	}
}
// determineRouteConfigurationReady reports whether every dynamic
// RouteConfiguration currently being watched has produced an update (either
// configuration or an error). It does so by comparing the number of stored
// updates against the number of active watches. Must be called from an xDS
// Client Callback.
func (rh *rdsHandler) determineRouteConfigurationReady() bool {
	// No lock is taken here: cancels is only written to in other parts of xDS
	// Client Callbacks, which are sync.
	received, watched := len(rh.updates), len(rh.cancels)
	return received == watched
}
// handleRouteUpdate records the outcome of a watch callback for routeName and
// forwards the resulting state to the handler's callback. Must be called from
// an xDS Client Callback.
//
// The previously stored update is kept only when all of the following hold:
//  1. it carries valid route configuration data,
//  2. the incoming update is an error, and
//  3. that error is not of type ResourceNotFound.
//
// In every other case the incoming update replaces the stored one.
func (rh *rdsHandler) handleRouteUpdate(routeName string, update rdsWatcherUpdate) {
	next := update
	if prev := rh.updates[routeName]; prev.data != nil && update.err != nil && xdsresource.ErrType(update.err) != xdsresource.ErrorTypeResourceNotFound {
		// Hold on to the last good configuration in the face of a
		// non-ResourceNotFound error.
		next = prev
	}
	rh.updates[routeName] = next
	rh.callback(routeName, next)
}
// close cancels every active RDS watch. It is meant to be called by the
// wrapped listener when that listener is closed, to clean up the watches the
// handler started.
func (rh *rdsHandler) close() {
	rh.mu.Lock()
	defer rh.mu.Unlock()
	for routeName := range rh.cancels {
		rh.cancels[routeName]()
	}
}
// rdsWatcherUpdate is the value an rdsWatcher reports to its parent
// rdsHandler for a single route name.
type rdsWatcherUpdate struct {
// data is the route configuration received from a watch update; it is left
// nil when the watcher reports an error.
data *xdsresource.RouteConfigUpdate
// err is the error reported by the watch, including the ResourceNotFound
// error built when the resource does not exist; it is left nil when the
// watcher reports a resource update.
err error
}
// rdsWatcher implements the xdsresource.RouteConfigWatcher interface and is
// passed to the WatchRouteConfig API.
type rdsWatcher struct {
// parent is the handler whose handleRouteUpdate receives every update or
// error this watcher reports.
parent *rdsHandler
// logger is used for the verbosity-gated log lines emitted by the watcher
// callbacks.
logger *igrpclog.PrefixLogger
// routeName is the name of the RouteConfiguration resource being watched.
routeName string
// mu guards canceled.
mu sync.Mutex
canceled bool // eats callbacks if true
}
// OnUpdate is invoked when the watched RouteConfiguration resource is
// received. Unless the watch has already been canceled, the resource is
// forwarded to the parent handler as a successful update.
func (rw *rdsWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData) {
	// Snapshot the canceled flag under the lock; the rest of the callback
	// runs without holding it.
	rw.mu.Lock()
	canceled := rw.canceled
	rw.mu.Unlock()
	if canceled {
		return
	}

	if rw.logger.V(2) {
		rw.logger.Infof("RDS watch for resource %q received update: %#v", rw.routeName, update.Resource)
	}
	rw.parent.handleRouteUpdate(rw.routeName, rdsWatcherUpdate{data: &update.Resource})
}
// OnError is invoked when the watch reports an error. Unless the watch has
// already been canceled, the error is forwarded to the parent handler.
func (rw *rdsWatcher) OnError(err error) {
	// Snapshot the canceled flag under the lock; the rest of the callback
	// runs without holding it.
	rw.mu.Lock()
	canceled := rw.canceled
	rw.mu.Unlock()
	if canceled {
		return
	}

	if rw.logger.V(2) {
		rw.logger.Infof("RDS watch for resource %q reported error: %v", rw.routeName, err)
	}
	rw.parent.handleRouteUpdate(rw.routeName, rdsWatcherUpdate{err: err})
}
// OnResourceDoesNotExist is invoked when the watched RouteConfiguration
// resource is reported as not existing. Unless the watch has already been
// canceled, an error of type ErrorTypeResourceNotFound naming the route is
// built and forwarded to the parent handler.
func (rw *rdsWatcher) OnResourceDoesNotExist() {
	rw.mu.Lock()
	if rw.canceled {
		rw.mu.Unlock()
		return
	}
	rw.mu.Unlock()
	if rw.logger.V(2) {
		// There is no error value available at this point, so the message
		// carries only the route name.
		rw.logger.Infof("RDS watch for resource %q reported resource-does-not-exist error", rw.routeName)
	}
	err := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource name %q of type RouteConfiguration not found in received response", rw.routeName)
	rw.parent.handleRouteUpdate(rw.routeName, rdsWatcherUpdate{err: err})
}