aggregator.go
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Package weightedaggregator implements state aggregator for weighted_target
// balancer.
//
// This is a separate package so it can be shared by weighted_target and eds.
// The eds balancer will be refactored to use weighted_target directly. After
// that, all functions and structs in this package can be moved to package
// weightedtarget and unexported.
package weightedaggregator

import (
	"fmt"
	"sync"

	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/balancer/base"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/internal/grpclog"
	"google.golang.org/grpc/internal/wrr"
)

type weightedPickerState struct {
	weight uint32
	state  balancer.State
	// stateToAggregate is the connectivity state used only for state
	// aggregation. It could be different from state.ConnectivityState. For
	// example when a sub-balancer transitions from TransientFailure to
	// Connecting, state.ConnectivityState is Connecting, but stateToAggregate
	// is still TransientFailure.
	stateToAggregate connectivity.State
}

func (s *weightedPickerState) String() string {
	return fmt.Sprintf("weight:%v,picker:%p,state:%v,stateToAggregate:%v", s.weight, s.state.Picker, s.state.ConnectivityState, s.stateToAggregate)
}

// Aggregator is the weighted balancer state aggregator.
type Aggregator struct {
	cc     balancer.ClientConn
	logger *grpclog.PrefixLogger
	newWRR func() wrr.WRR

	csEvltr *balancer.ConnectivityStateEvaluator

	mu sync.Mutex
	// If started is false, no updates should be sent to the parent cc. A
	// closed sub-balancer could still send pickers to this aggregator. This
	// makes sure that no updates will be forwarded to parent when the whole
	// balancer group and states aggregator is closed.
	started bool
	// All balancer IDs exist as keys in this map, even if balancer group is
	// not started.
	//
	// If an ID is not in map, it's either removed or never added.
	idToPickerState map[string]*weightedPickerState
	// Set when UpdateState call propagation is paused.
	pauseUpdateState bool
	// Set when UpdateState call propagation is paused and an UpdateState call
	// is suppressed.
	needUpdateStateOnResume bool
}

// New creates a new weighted balancer state aggregator.
func New(cc balancer.ClientConn, logger *grpclog.PrefixLogger, newWRR func() wrr.WRR) *Aggregator {
	return &Aggregator{
		cc:              cc,
		logger:          logger,
		newWRR:          newWRR,
		csEvltr:         &balancer.ConnectivityStateEvaluator{},
		idToPickerState: make(map[string]*weightedPickerState),
	}
}
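
// exampleLifecycle is an illustrative sketch of how a parent balancer (such
// as weighted_target) might drive the aggregator. It is not the canonical
// usage: the child IDs and weights are hypothetical, readyPicker is a
// stand-in for a real child picker, and wrr.NewRandom is assumed to be the
// WRR constructor the parent supplies.
func exampleLifecycle(cc balancer.ClientConn, logger *grpclog.PrefixLogger, readyPicker balancer.Picker) {
	wbsa := New(cc, logger, wrr.NewRandom)
	wbsa.Start()

	// Register two children; both start as CONNECTING placeholders until the
	// real sub-balancers report their states.
	wbsa.Add("cluster-a", 3)
	wbsa.Add("cluster-b", 1)

	// When a sub-balancer reports READY, the aggregator rebuilds the weighted
	// picker and forwards the aggregated state to the parent ClientConn.
	wbsa.UpdateState("cluster-a", balancer.State{
		ConnectivityState: connectivity.Ready,
		Picker:            readyPicker,
	})

	wbsa.Stop()
}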

// Start starts the aggregator. It can be called after Stop to restart the
// aggregator.
func (wbsa *Aggregator) Start() {
	wbsa.mu.Lock()
	defer wbsa.mu.Unlock()
	wbsa.started = true
}

// Stop stops the aggregator. When the aggregator is stopped, it won't call
// parent ClientConn to update balancer state.
func (wbsa *Aggregator) Stop() {
	wbsa.mu.Lock()
	defer wbsa.mu.Unlock()
	wbsa.started = false
	wbsa.clearStates()
}

// Add adds a sub-balancer state with weight. It adds a placeholder, and waits
// for the real sub-balancer to update state.
func (wbsa *Aggregator) Add(id string, weight uint32) {
	wbsa.mu.Lock()
	defer wbsa.mu.Unlock()
	wbsa.idToPickerState[id] = &weightedPickerState{
		weight: weight,
		// Start everything in CONNECTING, so if one of the sub-balancers
		// reports TransientFailure, the RPCs will still wait for the other
		// sub-balancers.
		state: balancer.State{
			ConnectivityState: connectivity.Connecting,
			Picker:            base.NewErrPicker(balancer.ErrNoSubConnAvailable),
		},
		stateToAggregate: connectivity.Connecting,
	}
	wbsa.csEvltr.RecordTransition(connectivity.Shutdown, connectivity.Connecting)
	wbsa.buildAndUpdateLocked()
}

// Remove removes the sub-balancer state. Future updates from this
// sub-balancer, if any, will be ignored.
func (wbsa *Aggregator) Remove(id string) {
	wbsa.mu.Lock()
	defer wbsa.mu.Unlock()
	if _, ok := wbsa.idToPickerState[id]; !ok {
		return
	}
	// Setting the state of the deleted sub-balancer to Shutdown will get
	// csEvltr to remove the previous state for any aggregated state
	// evaluations. Transitions to and from connectivity.Shutdown are ignored
	// by csEvltr.
	wbsa.csEvltr.RecordTransition(wbsa.idToPickerState[id].stateToAggregate, connectivity.Shutdown)
	// Remove id and picker from the picker map. This also causes future
	// updates for this ID to be ignored.
	delete(wbsa.idToPickerState, id)
	wbsa.buildAndUpdateLocked()
}

// UpdateWeight updates the weight for the given id. Note that this doesn't
// trigger an update to the parent ClientConn. The caller should decide when
// an update is necessary and trigger a rebuild of the picker.
func (wbsa *Aggregator) UpdateWeight(id string, newWeight uint32) {
	wbsa.mu.Lock()
	defer wbsa.mu.Unlock()
	pState, ok := wbsa.idToPickerState[id]
	if !ok {
		return
	}
	pState.weight = newWeight
}

// PauseStateUpdates causes UpdateState calls to not propagate to the parent
// ClientConn. The last state will be remembered and propagated when
// ResumeStateUpdates is called.
func (wbsa *Aggregator) PauseStateUpdates() {
	wbsa.mu.Lock()
	defer wbsa.mu.Unlock()
	wbsa.pauseUpdateState = true
	wbsa.needUpdateStateOnResume = false
}

// ResumeStateUpdates will resume propagating UpdateState calls to the parent,
// and call UpdateState on the parent if any UpdateState call was suppressed.
func (wbsa *Aggregator) ResumeStateUpdates() {
	wbsa.mu.Lock()
	defer wbsa.mu.Unlock()
	wbsa.pauseUpdateState = false
	if wbsa.needUpdateStateOnResume {
		wbsa.cc.UpdateState(wbsa.build())
	}
}

// NeedUpdateStateOnResume sets the needUpdateStateOnResume bool to true,
// letting a picker update be sent once ResumeStateUpdates is called.
func (wbsa *Aggregator) NeedUpdateStateOnResume() {
	wbsa.mu.Lock()
	defer wbsa.mu.Unlock()
	wbsa.needUpdateStateOnResume = true
}
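
// examplePauseResume is an illustrative sketch of the pause/resume protocol:
// a parent balancer can pause propagation while applying a config update so
// the parent ClientConn sees one aggregated update instead of one per child.
// The child IDs and weights below are hypothetical.
func examplePauseResume(wbsa *Aggregator) {
	wbsa.PauseStateUpdates()
	// Re-weight children as part of a config update. These calls do not reach
	// the parent ClientConn while updates are paused.
	wbsa.UpdateWeight("cluster-a", 2)
	wbsa.UpdateWeight("cluster-b", 5)
	// Ask for a picker update to be sent on resume, since UpdateWeight alone
	// does not trigger one.
	wbsa.NeedUpdateStateOnResume()
	// ResumeStateUpdates sends the suppressed update, if any, to the parent.
	wbsa.ResumeStateUpdates()
}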

// UpdateState is called to report a balancer state change from a sub-balancer.
// It's usually called by the balancer group.
//
// It calls parent ClientConn's UpdateState with the new aggregated state.
func (wbsa *Aggregator) UpdateState(id string, newState balancer.State) {
	wbsa.mu.Lock()
	defer wbsa.mu.Unlock()
	state, ok := wbsa.idToPickerState[id]
	if !ok {
		// All state starts with an entry in idToPickerState. If the ID is not
		// in the map, it was either removed or never existed.
		return
	}
	if !(state.state.ConnectivityState == connectivity.TransientFailure && newState.ConnectivityState == connectivity.Connecting) {
		// If the old state is TransientFailure and the new state is
		// Connecting, don't update the state, to prevent the aggregated state
		// from being always CONNECTING. Otherwise, stateToAggregate is the
		// same as state.ConnectivityState.
		wbsa.csEvltr.RecordTransition(state.stateToAggregate, newState.ConnectivityState)
		state.stateToAggregate = newState.ConnectivityState
	}
	state.state = newState
	wbsa.buildAndUpdateLocked()
}

// clearStates resets everything to the initial state (Connecting) but keeps
// the entries in the map (to keep the weights).
//
// Caller must hold wbsa.mu.
func (wbsa *Aggregator) clearStates() {
	for _, pState := range wbsa.idToPickerState {
		pState.state = balancer.State{
			ConnectivityState: connectivity.Connecting,
			Picker:            base.NewErrPicker(balancer.ErrNoSubConnAvailable),
		}
		pState.stateToAggregate = connectivity.Connecting
	}
}

// buildAndUpdateLocked aggregates the connectivity states of the
// sub-balancers, builds a new picker and sends an update to the parent
// ClientConn.
//
// Caller must hold wbsa.mu.
func (wbsa *Aggregator) buildAndUpdateLocked() {
	if !wbsa.started {
		return
	}
	if wbsa.pauseUpdateState {
		// If updates are paused, do not call UpdateState, but remember that
		// we need to call it when they are resumed.
		wbsa.needUpdateStateOnResume = true
		return
	}
	wbsa.cc.UpdateState(wbsa.build())
}

// build combines sub-states into one.
//
// Caller must hold wbsa.mu.
func (wbsa *Aggregator) build() balancer.State {
	wbsa.logger.Infof("Child pickers with config: %+v", wbsa.idToPickerState)
	// Make sure the picker's return error is consistent with the aggregated
	// state.
	pickers := make([]weightedPickerState, 0, len(wbsa.idToPickerState))
	switch aggState := wbsa.csEvltr.CurrentState(); aggState {
	case connectivity.Connecting:
		return balancer.State{
			ConnectivityState: aggState,
			Picker:            base.NewErrPicker(balancer.ErrNoSubConnAvailable)}
	case connectivity.TransientFailure:
		// This means that all sub-balancers are now in TransientFailure.
		for _, ps := range wbsa.idToPickerState {
			pickers = append(pickers, *ps)
		}
		return balancer.State{
			ConnectivityState: aggState,
			Picker:            newWeightedPickerGroup(pickers, wbsa.newWRR)}
	default:
		for _, ps := range wbsa.idToPickerState {
			if ps.stateToAggregate == connectivity.Ready {
				pickers = append(pickers, *ps)
			}
		}
		return balancer.State{
			ConnectivityState: aggState,
			Picker:            newWeightedPickerGroup(pickers, wbsa.newWRR)}
	}
}

type weightedPickerGroup struct {
	w wrr.WRR
}

// newWeightedPickerGroup takes pickers with weights, and groups them into one
// picker.
//
// Note it only takes ready pickers. The slice shouldn't contain non-ready
// pickers.
func newWeightedPickerGroup(readyWeightedPickers []weightedPickerState, newWRR func() wrr.WRR) *weightedPickerGroup {
	w := newWRR()
	for _, ps := range readyWeightedPickers {
		w.Add(ps.state.Picker, int64(ps.weight))
	}
	return &weightedPickerGroup{
		w: w,
	}
}

func (pg *weightedPickerGroup) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
	p, ok := pg.w.Next().(balancer.Picker)
	if !ok {
		return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
	}
	return p.Pick(info)
}
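
// examplePickerGroup is an illustrative sketch of how ready child pickers are
// combined: with weights 3 and 1 and the wrr.NewRandom constructor, roughly
// three quarters of picks are delegated to pickerA. pickerA and pickerB are
// hypothetical pickers supplied by the caller.
func examplePickerGroup(pickerA, pickerB balancer.Picker) (balancer.PickResult, error) {
	pg := newWeightedPickerGroup([]weightedPickerState{
		{weight: 3, state: balancer.State{ConnectivityState: connectivity.Ready, Picker: pickerA}},
		{weight: 1, state: balancer.State{ConnectivityState: connectivity.Ready, Picker: pickerB}},
	}, wrr.NewRandom)
	return pg.Pick(balancer.PickInfo{})
}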