/*
*
* Copyright 2014 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package interop

import (
	"bytes"
	"context"
	"fmt"
	"os"
	"sync"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/benchmark/stats"
	"google.golang.org/grpc/peer"

	testgrpc "google.golang.org/grpc/interop/grpc_testing"
	testpb "google.golang.org/grpc/interop/grpc_testing"
)

// SoakWorkerResults stores the aggregated results for a specific worker during the soak test.
type SoakWorkerResults struct {
	iterationsSucceeded int
	Failures            int
	Latencies           *stats.Histogram
}

// SoakIterationConfig holds the parameters required for a single soak iteration.
type SoakIterationConfig struct {
	RequestSize  int                        // The size of the request payload in bytes.
	ResponseSize int                        // The expected size of the response payload in bytes.
	Client       testgrpc.TestServiceClient // The gRPC client to make the call.
	CallOptions  []grpc.CallOption          // Call options for the RPC.
}

// SoakTestConfig holds the configuration for the entire soak test.
type SoakTestConfig struct {
	RequestSize                      int
	ResponseSize                     int
	PerIterationMaxAcceptableLatency time.Duration
	MinTimeBetweenRPCs               time.Duration
	OverallTimeout                   time.Duration
	ServerAddr                       string
	NumWorkers                       int
	Iterations                       int
	MaxFailures                      int
	ChannelForTest                   func() (*grpc.ClientConn, func())
}
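
// As an illustrative sketch (not part of this file), the two soak modes differ
// only in the ChannelForTest closure a caller supplies. Given some existing
// *grpc.ClientConn named conn, and assuming the grpc.NewClient and
// credentials/insecure APIs (the dial target below is made up):
//
//	// rpc_soak style: every iteration shares conn; per-iteration cleanup is a no-op.
//	shared := func() (*grpc.ClientConn, func()) { return conn, func() {} }
//
//	// channel_soak style: every iteration dials a fresh channel and closes it
//	// when the worker finishes (cleanups are deferred; see below).
//	fresh := func() (*grpc.ClientConn, func()) {
//		cc, err := grpc.NewClient("localhost:10000",
//			grpc.WithTransportCredentials(insecure.NewCredentials()))
//		if err != nil {
//			log.Fatalf("grpc.NewClient: %v", err)
//		}
//		return cc, func() { cc.Close() }
//	}
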
func doOneSoakIteration(ctx context.Context, config SoakIterationConfig) (latency time.Duration, err error) {
	start := time.Now()
	// Do a large-unary RPC.
	// Create the request payload.
	pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, config.RequestSize)
	req := &testpb.SimpleRequest{
		ResponseType: testpb.PayloadType_COMPRESSABLE,
		ResponseSize: int32(config.ResponseSize),
		Payload:      pl,
	}
	// Perform the gRPC call.
	var reply *testpb.SimpleResponse
	reply, err = config.Client.UnaryCall(ctx, req, config.CallOptions...)
	if err != nil {
		err = fmt.Errorf("/TestService/UnaryCall RPC failed: %s", err)
		return 0, err
	}
	// Validate the response.
	t := reply.GetPayload().GetType()
	s := len(reply.GetPayload().GetBody())
	if t != testpb.PayloadType_COMPRESSABLE || s != config.ResponseSize {
		err = fmt.Errorf("got reply payload type %d and length %d; want type %d and length %d", t, s, testpb.PayloadType_COMPRESSABLE, config.ResponseSize)
		return 0, err
	}
	// Calculate latency and return the result.
	latency = time.Since(start)
	return latency, nil
}

func executeSoakTestInWorker(ctx context.Context, config SoakTestConfig, startTime time.Time, workerID int, soakWorkerResults *SoakWorkerResults) {
	timeoutDuration := config.OverallTimeout
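	// Note: integer division; if Iterations is not evenly divisible by
	// NumWorkers, the remainder iterations are never scheduled.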
	soakIterationsPerWorker := config.Iterations / config.NumWorkers
	if soakWorkerResults.Latencies == nil {
		soakWorkerResults.Latencies = stats.NewHistogram(stats.HistogramOptions{
			NumBuckets:     20,
			GrowthFactor:   1,
			BaseBucketSize: 1,
			MinValue:       0,
		})
	}
	for i := 0; i < soakIterationsPerWorker; i++ {
		if ctx.Err() != nil {
			return
		}
		if time.Since(startTime) >= timeoutDuration {
			fmt.Printf("Test exceeded overall timeout of %v, stopping...\n", config.OverallTimeout)
			return
		}
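		// Start the rate-limit timer now so that the receive at the bottom of
		// the loop enforces MinTimeBetweenRPCs between iteration starts.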
		earliestNextStart := time.After(config.MinTimeBetweenRPCs)
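		// ChannelForTest may return a shared channel (with a no-op cleanup) or
		// dial a fresh one per iteration; either way, the cleanups are deferred
		// and only run once this worker returns.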
		currentChannel, cleanup := config.ChannelForTest()
		defer cleanup()
		client := testgrpc.NewTestServiceClient(currentChannel)
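		// The grpc.Peer call option fills in p with the server's address once
		// the RPC completes, for use in the log lines below.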
		var p peer.Peer
		iterationConfig := SoakIterationConfig{
			RequestSize:  config.RequestSize,
			ResponseSize: config.ResponseSize,
			Client:       client,
			CallOptions:  []grpc.CallOption{grpc.Peer(&p)},
		}
		latency, err := doOneSoakIteration(ctx, iterationConfig)
		if err != nil {
			fmt.Fprintf(os.Stderr, "Worker %d: soak iteration: %d elapsed_ms: %d peer: %v server_uri: %s failed: %s\n", workerID, i, 0, p.Addr, config.ServerAddr, err)
			soakWorkerResults.Failures++
			<-earliestNextStart
			continue
		}
		if latency > config.PerIterationMaxAcceptableLatency {
			fmt.Fprintf(os.Stderr, "Worker %d: soak iteration: %d elapsed_ms: %d peer: %v server_uri: %s exceeds max acceptable latency: %d\n", workerID, i, latency.Milliseconds(), p.Addr, config.ServerAddr, config.PerIterationMaxAcceptableLatency.Milliseconds())
			soakWorkerResults.Failures++
			<-earliestNextStart
			continue
		}
		// Success: log the details of the iteration.
		soakWorkerResults.Latencies.Add(latency.Milliseconds())
		soakWorkerResults.iterationsSucceeded++
		fmt.Fprintf(os.Stderr, "Worker %d: soak iteration: %d elapsed_ms: %d peer: %v server_uri: %s succeeded\n", workerID, i, latency.Milliseconds(), p.Addr, config.ServerAddr)
		<-earliestNextStart
	}
}

// DoSoakTest runs large unary RPCs in a loop for a configurable number of
// iterations, with configurable failure thresholds. Whether each iteration's
// RPC runs on a shared channel or on a freshly created one is determined by
// the ChannelForTest function supplied in soakConfig.
// TODO(mohanli-ml): Create SoakTestOptions as a parameter for this method.
func DoSoakTest(ctx context.Context, soakConfig SoakTestConfig) {
	if soakConfig.Iterations%soakConfig.NumWorkers != 0 {
		fmt.Fprintf(os.Stderr, "soakIterations must be evenly divisible by soakNumWorkers; remainder iterations will not be run\n")
	}
	startTime := time.Now()
	var wg sync.WaitGroup
	soakWorkerResults := make([]SoakWorkerResults, soakConfig.NumWorkers)
	for i := 0; i < soakConfig.NumWorkers; i++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			executeSoakTestInWorker(ctx, soakConfig, startTime, workerID, &soakWorkerResults[workerID])
		}(i)
	}
	// Wait for all goroutines to complete.
	wg.Wait()
	// Handle results.
	totalSuccesses := 0
	totalFailures := 0
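	// Aggregate latency histogram; its HistogramOptions must match the
	// per-worker histograms, since stats.Histogram.Merge requires histograms
	// created with equivalent options.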
	latencies := stats.NewHistogram(stats.HistogramOptions{
		NumBuckets:     20,
		GrowthFactor:   1,
		BaseBucketSize: 1,
		MinValue:       0,
	})
	for _, worker := range soakWorkerResults {
		totalSuccesses += worker.iterationsSucceeded
		totalFailures += worker.Failures
		if worker.Latencies != nil {
			// Add latencies from the worker's Histogram to the main latencies.
			latencies.Merge(worker.Latencies)
		}
	}
	var b bytes.Buffer
	totalIterations := totalSuccesses + totalFailures
	latencies.Print(&b)
	fmt.Fprintf(os.Stderr,
		"(server_uri: %s) soak test successes: %d / %d iterations. Total failures: %d. Latencies in milliseconds: %s\n",
		soakConfig.ServerAddr, totalSuccesses, soakConfig.Iterations, totalFailures, b.String())
	if totalIterations != soakConfig.Iterations {
		logger.Fatalf("Soak test consumed all %v of its allotted time and quit early, running only %d out of %d iterations.\n", soakConfig.OverallTimeout, totalIterations, soakConfig.Iterations)
	}
	if totalFailures > soakConfig.MaxFailures {
		logger.Fatalf("Soak test total failures: %d exceeded max failures threshold: %d\n", totalFailures, soakConfig.MaxFailures)
	}
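	// Obtain the channel (and its cleanup function) one final time so that
	// the caller-provided teardown runs when DoSoakTest returns.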
	if soakConfig.ChannelForTest != nil {
		_, cleanup := soakConfig.ChannelForTest()
		defer cleanup()
	}
}
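
// End-to-end usage sketch from a caller's perspective (hypothetical; the
// address, payload sizes, and thresholds below are illustrative assumptions,
// not values defined in this file):
//
//	conn, err := grpc.NewClient("localhost:10000",
//		grpc.WithTransportCredentials(insecure.NewCredentials()))
//	if err != nil {
//		log.Fatalf("grpc.NewClient: %v", err)
//	}
//	defer conn.Close()
//	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
//	defer cancel()
//	interop.DoSoakTest(ctx, interop.SoakTestConfig{
//		RequestSize:                      271828,
//		ResponseSize:                     314159,
//		PerIterationMaxAcceptableLatency: time.Second,
//		MinTimeBetweenRPCs:               10 * time.Millisecond,
//		OverallTimeout:                   10 * time.Minute,
//		ServerAddr:                       "localhost:10000",
//		NumWorkers:                       10,
//		Iterations:                       100,
//		MaxFailures:                      0,
//		// Shared channel for every iteration; per-iteration cleanup is a no-op.
//		ChannelForTest: func() (*grpc.ClientConn, func()) { return conn, func() {} },
//	})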