-
Notifications
You must be signed in to change notification settings - Fork 4.3k
/
roundrobin.go
223 lines (210 loc) · 8.33 KB
/
roundrobin.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package roundrobin contains helper functions to check for roundrobin and
// weighted-roundrobin load balancing of RPCs in tests.
package roundrobin
import (
"context"
"fmt"
"math"
"time"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/resolver"
testgrpc "google.golang.org/grpc/interop/grpc_testing"
testpb "google.golang.org/grpc/interop/grpc_testing"
)
var logger = grpclog.Component("testutils-roundrobin")
// waitForTrafficToReachBackends repeatedly makes RPCs using the provided
// TestServiceClient until RPCs reach all backends specified in addrs, or the
// context expires, in which case a non-nil error is returned.
func waitForTrafficToReachBackends(ctx context.Context, client testgrpc.TestServiceClient, addrs []resolver.Address) error {
// Make sure connections to all backends are up. We need to do this two
// times (to be sure that round_robin has kicked in) because the channel
// could have been configured with a different LB policy before the switch
// to round_robin. And the previous LB policy could be sharing backends with
// round_robin, and therefore in the first iteration of this loop, RPCs
// could land on backends owned by the previous LB policy.
for j := 0; j < 2; j++ {
for i := 0; i < len(addrs); i++ {
for {
time.Sleep(time.Millisecond)
if ctx.Err() != nil {
return fmt.Errorf("timeout waiting for connection to %q to be up", addrs[i].Addr)
}
var peer peer.Peer
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer)); err != nil {
// Some tests remove backends and check if round robin is
// happening across the remaining backends. In such cases,
// RPCs can initially fail on the connection using the
// removed backend. Just keep retrying and eventually the
// connection using the removed backend will shutdown and
// will be removed.
continue
}
if peer.Addr.String() == addrs[i].Addr {
break
}
}
}
}
return nil
}
// CheckRoundRobinRPCs verifies that EmptyCall RPCs on the given ClientConn,
// connected to a server exposing the test.grpc_testing.TestService, are
// roundrobined across the given backend addresses.
//
// Returns a non-nil error if context deadline expires before RPCs start to get
// roundrobined across the given backends.
func CheckRoundRobinRPCs(ctx context.Context, client testgrpc.TestServiceClient, addrs []resolver.Address) error {
if err := waitForTrafficToReachBackends(ctx, client, addrs); err != nil {
return err
}
// At this point, RPCs are getting successfully executed at the backends
// that we care about. To support duplicate addresses (in addrs) and
// backends being removed from the list of addresses passed to the
// roundrobin LB, we do the following:
// 1. Determine the count of RPCs that we expect each of our backends to
// receive per iteration.
// 2. Wait until the same pattern repeats a few times, or the context
// deadline expires.
wantAddrCount := make(map[string]int)
for _, addr := range addrs {
wantAddrCount[addr.Addr]++
}
for ; ctx.Err() == nil; <-time.After(time.Millisecond) {
// Perform 3 more iterations.
var iterations [][]string
for i := 0; i < 3; i++ {
iteration := make([]string, len(addrs))
for c := 0; c < len(addrs); c++ {
var peer peer.Peer
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer)); err != nil {
return fmt.Errorf("EmptyCall() = %v, want <nil>", err)
}
iteration[c] = peer.Addr.String()
}
iterations = append(iterations, iteration)
}
// Ensure the first iteration contains all addresses in addrs.
gotAddrCount := make(map[string]int)
for _, addr := range iterations[0] {
gotAddrCount[addr]++
}
if diff := cmp.Diff(gotAddrCount, wantAddrCount); diff != "" {
logger.Infof("non-roundrobin, got address count in one iteration: %v, want: %v, Diff: %s", gotAddrCount, wantAddrCount, diff)
continue
}
// Ensure all three iterations contain the same addresses.
if !cmp.Equal(iterations[0], iterations[1]) || !cmp.Equal(iterations[0], iterations[2]) {
logger.Infof("non-roundrobin, first iter: %v, second iter: %v, third iter: %v", iterations[0], iterations[1], iterations[2])
continue
}
return nil
}
return fmt.Errorf("timeout when waiting for roundrobin distribution of RPCs across addresses: %v", addrs)
}
// CheckWeightedRoundRobinRPCs verifies that EmptyCall RPCs on the given
// ClientConn, connected to a server exposing the test.grpc_testing.TestService,
// are weighted roundrobined (with randomness) across the given backend
// addresses.
//
// Returns a non-nil error if context deadline expires before RPCs start to get
// roundrobined across the given backends.
func CheckWeightedRoundRobinRPCs(ctx context.Context, client testgrpc.TestServiceClient, addrs []resolver.Address) error {
if err := waitForTrafficToReachBackends(ctx, client, addrs); err != nil {
return err
}
// At this point, RPCs are getting successfully executed at the backends
// that we care about. To take the randomness of the WRR into account, we
// look for approximate distribution instead of exact.
wantAddrCount := make(map[string]int)
for _, addr := range addrs {
wantAddrCount[addr.Addr]++
}
wantRatio := make(map[string]float64)
for addr, count := range wantAddrCount {
wantRatio[addr] = float64(count) / float64(len(addrs))
}
// There is a small possibility that RPCs are reaching backends that we
// don't expect them to reach here. The can happen because:
// - at time T0, the list of backends [A, B, C, D].
// - at time T1, the test updates the list of backends to [A, B, C], and
// immediately starts attempting to check the distribution of RPCs to the
// new backends.
// - there is no way for the test to wait for a new picker to be pushed on
// to the channel (which contains the updated list of backends) before
// starting to attempt the RPC distribution checks.
// - This is usually a transitory state and will eventually fix itself when
// the new picker is pushed on the channel, and RPCs will start getting
// routed to only backends that we care about.
//
// We work around this situation by using two loops. The inner loop contains
// the meat of the calculations, and includes the logic which factors out
// the randomness in weighted roundrobin. If we ever see an RPCs getting
// routed to a backend that we dont expect it to get routed to, we break
// from the inner loop thereby resetting all state and start afresh.
for {
results := make(map[string]float64)
totalCount := float64(0)
InnerLoop:
for {
if ctx.Err() != nil {
return fmt.Errorf("timeout when waiting for roundrobin distribution of RPCs across addresses: %v", addrs)
}
for i := 0; i < len(addrs); i++ {
var peer peer.Peer
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer)); err != nil {
return fmt.Errorf("EmptyCall() = %v, want <nil>", err)
}
if addr := peer.Addr.String(); wantAddrCount[addr] == 0 {
break InnerLoop
}
results[peer.Addr.String()]++
}
totalCount += float64(len(addrs))
gotRatio := make(map[string]float64)
for addr, count := range results {
gotRatio[addr] = count / totalCount
}
if equalApproximate(gotRatio, wantRatio) {
return nil
}
logger.Infof("non-weighted-roundrobin, gotRatio: %v, wantRatio: %v", gotRatio, wantRatio)
}
<-time.After(time.Millisecond)
}
}
func equalApproximate(got, want map[string]float64) bool {
if len(got) != len(want) {
return false
}
opt := cmp.Comparer(func(x, y float64) bool {
delta := math.Abs(x - y)
mean := math.Abs(x+y) / 2.0
return delta/mean < 0.05
})
for addr := range want {
if !cmp.Equal(got[addr], want[addr], opt) {
return false
}
}
return true
}