-
Notifications
You must be signed in to change notification settings - Fork 4.3k
/
leastrequest.go
181 lines (156 loc) · 5.35 KB
/
leastrequest.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
/*
*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package leastrequest implements a least request load balancer.
package leastrequest
import (
"encoding/json"
"fmt"
"sync/atomic"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/serviceconfig"
)
// grpcranduint32 is a global to stub out in tests.
var grpcranduint32 = grpcrand.Uint32
// Name is the name of the least request balancer.
const Name = "least_request_experimental"
var logger = grpclog.Component("least-request")
func init() {
balancer.Register(bb{})
}
// LBConfig is the balancer config for least_request_experimental balancer.
type LBConfig struct {
serviceconfig.LoadBalancingConfig `json:"-"`
// ChoiceCount is the number of random SubConns to sample to find the one
// with the fewest outstanding requests. If unset, defaults to 2. If set to
// < 2, the config will be rejected, and if set to > 10, will become 10.
ChoiceCount uint32 `json:"choiceCount,omitempty"`
}
type bb struct{}
func (bb) ParseConfig(s json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
lbConfig := &LBConfig{
ChoiceCount: 2,
}
if err := json.Unmarshal(s, lbConfig); err != nil {
return nil, fmt.Errorf("least-request: unable to unmarshal LBConfig: %v", err)
}
// "If `choice_count < 2`, the config will be rejected." - A48
if lbConfig.ChoiceCount < 2 { // sweet
return nil, fmt.Errorf("least-request: lbConfig.choiceCount: %v, must be >= 2", lbConfig.ChoiceCount)
}
// "If a LeastRequestLoadBalancingConfig with a choice_count > 10 is
// received, the least_request_experimental policy will set choice_count =
// 10." - A48
if lbConfig.ChoiceCount > 10 {
lbConfig.ChoiceCount = 10
}
return lbConfig, nil
}
func (bb) Name() string {
return Name
}
func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
b := &leastRequestBalancer{scRPCCounts: make(map[balancer.SubConn]*atomic.Int32)}
baseBuilder := base.NewBalancerBuilder(Name, b, base.Config{HealthCheck: true})
b.Balancer = baseBuilder.Build(cc, bOpts)
return b
}
type leastRequestBalancer struct {
// Embeds balancer.Balancer because needs to intercept UpdateClientConnState
// to learn about choiceCount.
balancer.Balancer
choiceCount uint32
scRPCCounts map[balancer.SubConn]*atomic.Int32 // Hold onto RPC counts to keep track for subsequent picker updates.
}
func (lrb *leastRequestBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
lrCfg, ok := s.BalancerConfig.(*LBConfig)
if !ok {
logger.Errorf("least-request: received config with unexpected type %T: %v", s.BalancerConfig, s.BalancerConfig)
return balancer.ErrBadResolverState
}
lrb.choiceCount = lrCfg.ChoiceCount
return lrb.Balancer.UpdateClientConnState(s)
}
type scWithRPCCount struct {
sc balancer.SubConn
numRPCs *atomic.Int32
}
func (lrb *leastRequestBalancer) Build(info base.PickerBuildInfo) balancer.Picker {
logger.Infof("least-request: Build called with info: %v", info)
if len(info.ReadySCs) == 0 {
return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
}
for sc := range lrb.scRPCCounts {
if _, ok := info.ReadySCs[sc]; !ok { // If no longer ready, no more need for the ref to count active RPCs.
delete(lrb.scRPCCounts, sc)
}
}
// Create new refs if needed.
for sc := range info.ReadySCs {
if _, ok := lrb.scRPCCounts[sc]; !ok {
lrb.scRPCCounts[sc] = new(atomic.Int32)
}
}
// Copy refs to counters into picker.
scs := make([]scWithRPCCount, 0, len(info.ReadySCs))
for sc := range info.ReadySCs {
scs = append(scs, scWithRPCCount{
sc: sc,
numRPCs: lrb.scRPCCounts[sc], // guaranteed to be present due to algorithm
})
}
return &picker{
choiceCount: lrb.choiceCount,
subConns: scs,
}
}
type picker struct {
// choiceCount is the number of random SubConns to find the one with
// the least request.
choiceCount uint32
// Built out when receives list of ready RPCs.
subConns []scWithRPCCount
}
func (p *picker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
var pickedSC *scWithRPCCount
var pickedSCNumRPCs int32
for i := 0; i < int(p.choiceCount); i++ {
index := grpcranduint32() % uint32(len(p.subConns))
sc := p.subConns[index]
n := sc.numRPCs.Load()
if pickedSC == nil || n < pickedSCNumRPCs {
pickedSC = &sc
pickedSCNumRPCs = n
}
}
// "The counter for a subchannel should be atomically incremented by one
// after it has been successfully picked by the picker." - A48
pickedSC.numRPCs.Add(1)
// "the picker should add a callback for atomically decrementing the
// subchannel counter once the RPC finishes (regardless of Status code)." -
// A48.
done := func(balancer.DoneInfo) {
pickedSC.numRPCs.Add(-1)
}
return balancer.PickResult{
SubConn: pickedSC.sc,
Done: done,
}, nil
}