/
picker.go
406 lines (366 loc) · 15.8 KB
/
picker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package rls
import (
"errors"
"fmt"
"strings"
"sync/atomic"
"time"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/rls/internal/keys"
"google.golang.org/grpc/connectivity"
internalgrpclog "google.golang.org/grpc/internal/grpclog"
rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
"google.golang.org/grpc/metadata"
)
var (
	// errRLSThrottled is handed to useDefaultPickIfPossible when the
	// client-side adaptive throttler drops an RLS request; the pick fails with
	// this error if no default target is configured.
	errRLSThrottled = errors.New("RLS call throttled at client side")

	// Function to compute data cache entry size. Declared as a variable (it
	// simply points at dcEntrySize) — presumably to allow overriding in tests;
	// confirm before relying on that.
	computeDataCacheEntrySize = dcEntrySize
)
// exitIdler wraps the only method on the BalancerGroup that the picker calls.
// Narrowing the dependency to this one method keeps the picker decoupled from
// the full BalancerGroup API.
type exitIdler interface {
	// ExitIdleOne instructs the child identified by id to exit the IDLE state.
	ExitIdleOne(id string)
}
// rlsPicker selects the subConn to be used for a particular RPC. It does not
// manage subConns directly and delegates to pickers provided by child policies.
type rlsPicker struct {
	// The keyBuilder map used to generate RLS keys for the RPC. This is built
	// by the LB policy based on the received ServiceConfig.
	kbm keys.BuilderMap
	// Endpoint from the user's original dial target. Used to set the `host_key`
	// field in `extra_keys`.
	origEndpoint string
	// Backpointer to the owning LB policy. Used to reach the data cache and
	// pending-request map (both guarded by lb.cacheMu) and to send new pickers.
	lb *rlsBalancer
	// The picker is given its own copy of the below fields from the RLS LB policy
	// to avoid having to grab the mutex on the latter.
	defaultPolicy *childPolicyWrapper // Child policy for the default target.
	ctrlCh        *controlChannel     // Control channel to the RLS server.
	maxAge        time.Duration       // Cache max age from LB config.
	staleAge      time.Duration       // Cache stale age from LB config.
	bg            exitIdler           // Used to kick IDLE child policies out of idleness.
	logger        *internalgrpclog.PrefixLogger
}
// isFullMethodNameValid returns true if name is of the form `/service/method`:
// it must begin with a slash and contain exactly two slashes in total.
func isFullMethodNameValid(name string) bool {
	if !strings.HasPrefix(name, "/") {
		return false
	}
	return strings.Count(name, "/") == 2
}
// Pick makes the routing decision for every outbound RPC.
//
// It validates the method name, computes the RLS request keys, and then acts
// based on the state of the data cache and the pending-request map: sending a
// new RLS request, queueing the pick, delegating to child policies, or falling
// back to the default target, as appropriate.
func (p *rlsPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
	if name := info.FullMethodName; !isFullMethodNameValid(name) {
		return balancer.PickResult{}, fmt.Errorf("rls: method name %q is not of the form '/service/method'", name)
	}

	// Build the request's keys using the key builders from LB config.
	md, _ := metadata.FromOutgoingContext(info.Ctx)
	reqKeys := p.kbm.RLSKey(md, p.origEndpoint, info.FullMethodName)

	// Grab a read-lock to perform a cache lookup. If it so happens that we need
	// to write to the cache (if we have to send out an RLS request), we will
	// release the read-lock and acquire a write-lock.
	p.lb.cacheMu.RLock()

	// Lookup data cache and pending request map using request path and keys.
	cacheKey := cacheKey{path: info.FullMethodName, keys: reqKeys.Str}
	dcEntry := p.lb.dataCache.getEntry(cacheKey)
	pendingEntry := p.lb.pendingMap[cacheKey]
	now := time.Now()
	switch {
	// No data cache entry. No pending request.
	case dcEntry == nil && pendingEntry == nil:
		p.lb.cacheMu.RUnlock()
		bs := &backoffState{bs: defaultBackoffStrategy}
		return p.sendRequestAndReturnPick(cacheKey, bs, reqKeys.Map, info)

	// No data cache entry. Pending request exists. Queue the pick and
	// piggyback on the in-flight request.
	case dcEntry == nil && pendingEntry != nil:
		p.lb.cacheMu.RUnlock()
		return balancer.PickResult{}, balancer.ErrNoSubConnAvailable

	// Data cache hit. No pending request.
	case dcEntry != nil && pendingEntry == nil:
		if dcEntry.expiryTime.After(now) {
			if !dcEntry.staleTime.IsZero() && dcEntry.staleTime.Before(now) && dcEntry.backoffTime.Before(now) {
				// Executing the proactive cache refresh in a goroutine simplifies
				// acquiring and releasing of locks.
				go func(bs *backoffState) {
					p.lb.cacheMu.Lock()
					// It is OK to ignore the return value which indicates if this request
					// was throttled. This is an attempt to proactively refresh the cache,
					// and it is OK for it to fail.
					p.sendRouteLookupRequest(cacheKey, bs, reqKeys.Map, rlspb.RouteLookupRequest_REASON_STALE, dcEntry.headerData)
					p.lb.cacheMu.Unlock()
				}(dcEntry.backoffState)
			}
			// Delegate to child policies.
			res, err := p.delegateToChildPolicies(dcEntry, info)
			p.lb.cacheMu.RUnlock()
			return res, err
		}

		// We get here only if the data cache entry has expired. If entry is in
		// backoff, delegate to default target or fail the pick.
		if dcEntry.backoffState != nil && dcEntry.backoffTime.After(now) {
			status := dcEntry.status
			p.lb.cacheMu.RUnlock()
			return p.useDefaultPickIfPossible(info, status)
		}

		// We get here only if the entry has expired and is not in backoff.
		// Copy the backoff state so it can be used after releasing the lock.
		bs := *dcEntry.backoffState
		p.lb.cacheMu.RUnlock()
		return p.sendRequestAndReturnPick(cacheKey, &bs, reqKeys.Map, info)

	// Data cache hit. Pending request exists.
	default:
		if dcEntry.expiryTime.After(now) {
			res, err := p.delegateToChildPolicies(dcEntry, info)
			p.lb.cacheMu.RUnlock()
			return res, err
		}
		// Data cache entry has expired and pending request exists. Queue pick.
		p.lb.cacheMu.RUnlock()
		return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
	}
}
// delegateToChildPolicies is a helper function which iterates through the list
// of child policy wrappers in a cache entry and attempts to find a child policy
// to which this RPC can be routed to. If there is no child policy in READY
// state, we delegate to the first child policy arbitrarily.
//
// Caller must hold at least a read-lock on p.lb.cacheMu.
func (p *rlsPicker) delegateToChildPolicies(dcEntry *cacheEntry, info balancer.PickInfo) (balancer.PickResult, error) {
	// Route the pick to the first child that pickIfFeasible accepts (i.e. one
	// in READY, CONNECTING or IDLE state).
	for _, cpw := range dcEntry.childPolicyWrappers {
		ok, res, err := p.pickIfFeasible(cpw, info)
		if ok {
			return res, err
		}
	}
	if len(dcEntry.childPolicyWrappers) != 0 {
		// No child was feasible (e.g. all in TRANSIENT_FAILURE); arbitrarily
		// delegate to the first child's picker. The state is read via an atomic
		// pointer load since it is updated concurrently by the LB policy.
		state := (*balancer.State)(atomic.LoadPointer(&dcEntry.childPolicyWrappers[0].state))
		return state.Picker.Pick(info)
	}
	// In the unlikely event that we have a cache entry with no targets, we end up
	// queueing the RPC.
	return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
}
// sendRequestAndReturnPick is called to send out an RLS request on the control
// channel. Since sending out an RLS request entails creating an entry in the
// pending request map, this method needs to acquire the write-lock on the
// cache. This also means that the caller must release the read-lock that they
// could have been holding. This means that things could have happened in
// between and therefore a fresh lookup on the cache needs to be performed here
// with the write-lock and all cases need to be handled.
//
// Acquires the write-lock on the cache. Caller must not hold p.lb.cacheMu.
func (p *rlsPicker) sendRequestAndReturnPick(cacheKey cacheKey, bs *backoffState, reqKeys map[string]string, info balancer.PickInfo) (balancer.PickResult, error) {
	p.lb.cacheMu.Lock()
	defer p.lb.cacheMu.Unlock()

	// We need to perform another cache lookup to ensure that things haven't
	// changed since the last lookup.
	dcEntry := p.lb.dataCache.getEntry(cacheKey)
	pendingEntry := p.lb.pendingMap[cacheKey]

	// Existence of a pending map entry indicates that someone sent out a request
	// before us and the response is pending. Skip sending a new request.
	// Piggyback on the existing one by queueing the pick.
	if pendingEntry != nil {
		return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
	}

	// If no data cache entry exists, it means that no one jumped in front of us.
	// We need to send out an RLS request and queue the pick.
	if dcEntry == nil {
		throttled := p.sendRouteLookupRequest(cacheKey, bs, reqKeys, rlspb.RouteLookupRequest_REASON_MISS, "")
		if throttled {
			// The adaptive throttler dropped the request; fall back to the
			// default target if one is configured, else fail the pick.
			return p.useDefaultPickIfPossible(info, errRLSThrottled)
		}
		return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
	}

	// Existence of a data cache entry indicates either that someone sent out a
	// request before us and received a response, or we got here in the first
	// place because we found an expired entry in the data cache.
	now := time.Now()
	switch {
	// Valid data cache entry. Delegate to its child policies.
	case dcEntry.expiryTime.After(now):
		return p.delegateToChildPolicies(dcEntry, info)
	// Entry is in backoff. Delegate to default target or fail the pick.
	case dcEntry.backoffState != nil && dcEntry.backoffTime.After(now):
		return p.useDefaultPickIfPossible(info, dcEntry.status)
	// Entry has expired, but is not in backoff. Send request and queue pick.
	default:
		throttled := p.sendRouteLookupRequest(cacheKey, bs, reqKeys, rlspb.RouteLookupRequest_REASON_MISS, "")
		if throttled {
			return p.useDefaultPickIfPossible(info, errRLSThrottled)
		}
		return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
	}
}
// useDefaultPickIfPossible is a helper method which delegates to the default
// target if one is configured, or fails the pick with the given error.
func (p *rlsPicker) useDefaultPickIfPossible(info balancer.PickInfo, errOnNoDefault error) (balancer.PickResult, error) {
	// Without a configured default target there is nothing to delegate to.
	if p.defaultPolicy == nil {
		return balancer.PickResult{}, errOnNoDefault
	}
	_, res, err := p.pickIfFeasible(p.defaultPolicy, info)
	return res, err
}
// sendRouteLookupRequest adds an entry to the pending request map and sends out
// an RLS request using the passed in arguments. Returns a value indicating if
// the request was throttled by the client-side adaptive throttler.
//
// Caller must hold a write-lock on p.lb.cacheMu.
func (p *rlsPicker) sendRouteLookupRequest(cacheKey cacheKey, bs *backoffState, reqKeys map[string]string, reason rlspb.RouteLookupRequest_Reason, staleHeaders string) bool {
	// An existing pending entry means an identical request is already in
	// flight; do not send another one.
	if p.lb.pendingMap[cacheKey] != nil {
		return false
	}

	p.lb.pendingMap[cacheKey] = bs
	onResponse := func(targets []string, headerData string, err error) {
		p.handleRouteLookupResponse(cacheKey, targets, headerData, err)
	}
	if p.ctrlCh.lookup(reqKeys, reason, staleHeaders, onResponse) {
		// The request was throttled and never sent, so nothing is pending.
		delete(p.lb.pendingMap, cacheKey)
		return true
	}
	return false
}
// pickIfFeasible determines if a pick can be delegated to child policy based on
// its connectivity state.
//   - If state is READY, the pick is delegated to the child policy's picker
//   - If state is IDLE, the child policy is instructed to exit idle, and the
//     pick is to be queued
//   - If state is CONNECTING, the pick is to be queued
//
// The first return value reports whether this child handled the pick.
func (p *rlsPicker) pickIfFeasible(cpw *childPolicyWrapper, info balancer.PickInfo) (bool, balancer.PickResult, error) {
	// The child's state is published via an atomic pointer; load it once.
	state := (*balancer.State)(atomic.LoadPointer(&cpw.state))
	switch state.ConnectivityState {
	case connectivity.Ready:
		res, err := state.Picker.Pick(info)
		return true, res, err
	case connectivity.Idle:
		p.bg.ExitIdleOne(cpw.target)
		return true, balancer.PickResult{}, balancer.ErrNoSubConnAvailable
	case connectivity.Connecting:
		return true, balancer.PickResult{}, balancer.ErrNoSubConnAvailable
	}
	// Any other state (e.g. TRANSIENT_FAILURE): this child is not feasible.
	return false, balancer.PickResult{}, balancer.ErrNoSubConnAvailable
}
// handleRouteLookupResponse is the callback invoked by the control channel upon
// receipt of an RLS response. Modifies the data cache and pending requests map
// and sends a new picker.
//
// Acquires the write-lock on the cache. Caller must not hold p.lb.cacheMu.
func (p *rlsPicker) handleRouteLookupResponse(cacheKey cacheKey, targets []string, headerData string, err error) {
	p.logger.Infof("Received RLS response for key %+v with targets %+v, headerData %q, err: %v", cacheKey, targets, headerData, err)

	p.lb.cacheMu.Lock()
	defer func() {
		// Pending request map entry is unconditionally deleted since the request is
		// no longer pending.
		p.logger.Infof("Removing pending request entry for key %+v", cacheKey)
		delete(p.lb.pendingMap, cacheKey)
		p.lb.sendNewPicker()
		p.lb.cacheMu.Unlock()
	}()

	// Lookup the data cache entry or create a new one.
	dcEntry := p.lb.dataCache.getEntry(cacheKey)
	if dcEntry == nil {
		dcEntry = &cacheEntry{}
		if _, ok := p.lb.dataCache.addEntry(cacheKey, dcEntry); !ok {
			// This is a very unlikely case where we are unable to add a
			// data cache entry. Log and leave.
			p.logger.Warningf("Failed to add data cache entry for %+v", cacheKey)
			return
		}
	}

	// For failed requests, the data cache entry is modified as follows:
	// - status is set to error returned from the control channel
	// - current backoff state is available in the pending entry
	//   - `retries` field is incremented and
	//   - backoff state is moved to the data cache
	// - backoffTime is set to the time indicated by the backoff state
	// - backoffExpirationTime is set to twice the backoff time
	// - backoffTimer is set to fire after backoffTime
	//
	// When a proactive cache refresh fails, this would leave the targets and the
	// expiry time from the old entry unchanged. And this means that the old valid
	// entry would be used until expiration, and a new picker would be sent upon
	// backoff expiry.
	now := time.Now()
	if err != nil {
		dcEntry.status = err
		pendingEntry := p.lb.pendingMap[cacheKey]
		pendingEntry.retries++
		backoffTime := pendingEntry.bs.Backoff(pendingEntry.retries)
		dcEntry.backoffState = pendingEntry
		dcEntry.backoffTime = now.Add(backoffTime)
		dcEntry.backoffExpiryTime = now.Add(2 * backoffTime)
		// Stop any timer from a previous backoff period before arming a new one.
		if dcEntry.backoffState.timer != nil {
			dcEntry.backoffState.timer.Stop()
		}
		dcEntry.backoffState.timer = time.AfterFunc(backoffTime, p.lb.sendNewPicker)
		return
	}

	// For successful requests, the cache entry is modified as follows:
	// - childPolicyWrappers is set to point to the child policy wrappers
	//   associated with the targets specified in the received response
	// - headerData is set to the value received in the response
	// - expiryTime, staleTime and earliestEvictionTime are set
	// - status is set to nil (OK status)
	// - backoff state is cleared
	p.setChildPolicyWrappersInCacheEntry(dcEntry, targets)
	dcEntry.headerData = headerData
	dcEntry.expiryTime = now.Add(p.maxAge)
	if p.staleAge != 0 {
		dcEntry.staleTime = now.Add(p.staleAge)
	}
	dcEntry.earliestEvictTime = now.Add(minEvictDuration)
	dcEntry.status = nil
	dcEntry.backoffState = &backoffState{bs: defaultBackoffStrategy}
	dcEntry.backoffTime = time.Time{}
	dcEntry.backoffExpiryTime = time.Time{}
	// Entry contents changed, so recompute its size for cache accounting.
	p.lb.dataCache.updateEntrySize(dcEntry, computeDataCacheEntrySize(cacheKey, dcEntry))
}
// setChildPolicyWrappersInCacheEntry sets up the childPolicyWrappers field in
// the cache entry to point to the child policy wrappers for the targets
// specified in the RLS response.
//
// Caller must hold a write-lock on p.lb.cacheMu.
func (p *rlsPicker) setChildPolicyWrappersInCacheEntry(dcEntry *cacheEntry, newTargets []string) {
	// If the entry already points at exactly the new targets, in the same
	// order, there is nothing to do.
	cpws := dcEntry.childPolicyWrappers
	unchanged := cpws != nil && len(cpws) == len(newTargets)
	if unchanged {
		for i, target := range newTargets {
			if cpws[i].target != target {
				unchanged = false
				break
			}
		}
	}
	if unchanged {
		return
	}

	// The targets differ: acquire references for the new child policies, then
	// release the references held for the old ones and swap in the new list.
	newChildPolicies := p.lb.acquireChildPolicyReferences(newTargets)
	oldChildPolicyTargets := make([]string, len(dcEntry.childPolicyWrappers))
	for i, cpw := range dcEntry.childPolicyWrappers {
		oldChildPolicyTargets[i] = cpw.target
	}
	p.lb.releaseChildPolicyReferences(oldChildPolicyTargets)
	dcEntry.childPolicyWrappers = newChildPolicies
}
// dcEntrySize computes the size of a data cache entry as the sum of the byte
// lengths of the key's path and keys plus the entry's header data.
func dcEntrySize(key cacheKey, entry *cacheEntry) int64 {
	size := len(key.path)
	size += len(key.keys)
	size += len(entry.headerData)
	return int64(size)
}