-
Notifications
You must be signed in to change notification settings - Fork 4.3k
/
balancer_priority.go
204 lines (191 loc) · 6.55 KB
/
balancer_priority.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
/*
*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package priority
import (
"errors"
"time"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
)
var (
	// ErrAllPrioritiesRemoved is returned by the picker when there's no
	// priority available, i.e. all priorities have been removed.
	ErrAllPrioritiesRemoved = errors.New("no priority is provided, all priorities are removed")
	// DefaultPriorityInitTimeout is the timeout after which, if a priority is
	// still not READY, the next priority will be started. It is a variable
	// (not a constant) and exported so that tests can override it.
	DefaultPriorityInitTimeout = 10 * time.Second
)
// syncPriority handles priority after a config update or a child balancer
// connectivity state update. It makes sure the balancer state (started or not)
// is in sync with the priorities (even in tricky cases where a child is moved
// from a priority to another).
//
// childUpdating is the name of the child whose update triggered this call. It
// is compared against the selected child's name to decide whether that child's
// state must be pushed to the parent ClientConn again.
//
// Steps:
//
//  1. If b.inhibitPickerUpdates is set, log and return without touching any
//     child.
//
//  2. Otherwise scan b.priorities in order, starting at index 0, and select
//     the first child for which any of the following holds:
//
//     - the child is not started (not built); this is either a new child
//     with high priority, or a new builder for an existing child
//
//     - the child is READY or IDLE
//
//     - the child is CONNECTING and has a non-nil initTimer, so it should
//     still be given its init time to connect
//
//     - the child is the last entry in b.priorities
//
//  3. For the selected child:
//
//     - if it is not the current childInUse, or it is childUpdating, push
//     its state (including its picker) to the parent ClientConn so a stale
//     picker is no longer used
//
//     - call switchToChild, which stops all lower priorities, records the
//     child as childInUse and starts it if it was not started
//
// A name in b.priorities that has no entry in b.children is logged and
// skipped. If no child is selected (for example b.priorities is empty), this
// function changes nothing; in particular it does not itself report
// TransientFailure to the parent.
//
// Caller must hold b.mu.
func (b *priorityBalancer) syncPriority(childUpdating string) {
	if b.inhibitPickerUpdates {
		b.logger.Debugf("Skipping update from child policy %q", childUpdating)
		return
	}
	for p, name := range b.priorities {
		child, ok := b.children[name]
		if !ok {
			b.logger.Warningf("Priority name %q is not found in list of child policies", name)
			continue
		}

		if !child.started ||
			child.state.ConnectivityState == connectivity.Ready ||
			child.state.ConnectivityState == connectivity.Idle ||
			(child.state.ConnectivityState == connectivity.Connecting && child.initTimer != nil) ||
			p == len(b.priorities)-1 {
			if b.childInUse != child.name || child.name == childUpdating {
				// Log the values the message names: the child currently in
				// use and the child that triggered this sync. The child being
				// switched to is logged just below.
				b.logger.Debugf("childInUse, childUpdating: %q, %q", b.childInUse, childUpdating)
				// If we switch children or the child in use just updated its
				// picker, push the child's picker to the parent.
				b.cc.UpdateState(child.state)
			}
			b.logger.Debugf("Switching to (%q, %v) in syncPriority", child.name, p)
			b.switchToChild(child, p)
			// Only the first matching priority is used; lower ones were
			// stopped by switchToChild.
			break
		}
	}
}
// stopSubBalancersLowerThanPriority stops every child whose index in
// b.priorities is greater than p, i.e. priorities [p+1, lowest]. A priority
// name without a matching entry in b.children is logged and skipped.
//
// Caller must hold b.mu.
func (b *priorityBalancer) stopSubBalancersLowerThanPriority(p int) {
	for idx := p + 1; idx < len(b.priorities); idx++ {
		lowerName := b.priorities[idx]
		if lower, found := b.children[lowerName]; found {
			lower.stop()
			continue
		}
		b.logger.Warningf("Priority name %q is not found in list of child policies", lowerName)
	}
}
// switchToChild makes child the child in use at the given priority:
//   - all children with lower priorities are stopped (always)
//   - unless child is already the started childInUse:
//   - childInUse is set to child's name
//   - child is started if it was not started yet
//
// It does NOT push the child's current state (picker) to the parent
// ClientConn; the caller is responsible for that when needed.
//
// This can be called when
//  1. first update, start p0
//  2. an update moves a READY child from a lower priority to higher
//  3. a different builder is updated for this child
//  4. a high priority goes Failure, start next
//  5. a high priority init timeout, start next
//
// Caller must hold b.mu.
func (b *priorityBalancer) switchToChild(child *childBalancer, priority int) {
	// Lower priorities are stopped unconditionally, even when child is already
	// the one in use: the child may have been moved from one priority to
	// another.
	b.stopSubBalancersLowerThanPriority(priority)

	// When child is already in use and running there is nothing left to do.
	// That happens when:
	// - no priority is READY and a config update triggers a switch to the
	//   lowest one; that child may still be connecting, so its init timer is
	//   left alone.
	// - a high priority is READY and a config update triggers a switch to it.
	if b.childInUse != child.name || !child.started {
		b.childInUse = child.name
		if !child.started {
			child.start()
		}
	}
}
// handleChildStateUpdate records the new state s reported by the child named
// childName, adjusts that child's init timer according to the new connectivity
// state, and then re-syncs priorities so the right child is in use.
//
// Updates for unknown children, or for children that are not started, are
// logged and dropped.
//
// NOTE(review): this ends by calling syncPriority, which requires b.mu to be
// held; presumably callers of this method must hold b.mu as well — confirm.
func (b *priorityBalancer) handleChildStateUpdate(childName string, s balancer.State) {
	child, found := b.children[childName]
	switch {
	case !found:
		b.logger.Warningf("Child policy not found for %q", childName)
		return
	case !child.started:
		b.logger.Warningf("Ignoring update from child policy %q which is not in started state: %+v", childName, s)
		return
	}

	// Store the new state (and picker) on the child. Whether it gets forwarded
	// to the parent is decided later by syncPriority.
	child.state = s

	// The init timer is started/stopped here because syncPriority inspects it
	// (nil or not) when deciding which child to switch to.
	switch newState := s.ConnectivityState; {
	case newState == connectivity.Ready || newState == connectivity.Idle:
		child.reportedTF = false
		child.stopInitTimer()
	case newState == connectivity.TransientFailure:
		child.reportedTF = true
		child.stopInitTimer()
	case newState == connectivity.Connecting && !child.reportedTF:
		// Only a child that has not reported TransientFailure gets init time
		// to connect.
		child.startInitTimer()
	}
	// Any other state (i.e. Shutdown, which should never happen) leaves the
	// timer and reportedTF untouched.
	// NOTE(review): syncPriority below still runs for such a state and may
	// forward child.state to the parent — confirm that is intended.

	child.parent.syncPriority(childName)
}