stresser.go (forked from etcd-io/etcd)
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
	"fmt"
	"math/rand"
	"net"
	"net/http"
	"sync"
	"time"

	clientV2 "github.com/coreos/etcd/client"
	"github.com/coreos/etcd/etcdserver"
	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"

	"golang.org/x/net/context"
	"google.golang.org/grpc"
	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/transport"
)

func init() {
	grpclog.SetLogger(plog)
}

type Stresser interface {
	// Stress starts to stress the etcd cluster
	Stress() error
	// Cancel cancels the stress test on the etcd cluster
	Cancel()
	// Report reports the success and failure of the stress test
	Report() (success int, failure int)
}
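
// The helper below is illustrative only and not part of the original file: a
// minimal sketch of how a caller might drive a Stresser for a fixed duration,
// assuming the caller owns start/stop. exampleStressRun is a hypothetical name.
func exampleStressRun(s Stresser, d time.Duration) (success, failure int, err error) {
	// Stress blocks until the stresser is cancelled, so run it in a goroutine.
	// The channel is buffered so the goroutine does not leak after Cancel.
	errc := make(chan error, 1)
	go func() { errc <- s.Stress() }()

	select {
	case err = <-errc:
		// Stress returned early, e.g. the endpoint could not be dialed.
		return 0, 0, err
	case <-time.After(d):
	}

	// Stop the key writers and collect the counters.
	s.Cancel()
	success, failure = s.Report()
	return success, failure, nil
}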

type stresser struct {
	Endpoint string

	KeySize        int
	KeySuffixRange int
	N              int

	mu     sync.Mutex
	wg     *sync.WaitGroup
	cancel func()
	conn   *grpc.ClientConn

	success int
}

func (s *stresser) Stress() error {
	// TODO: add backoff option
	conn, err := grpc.Dial(s.Endpoint, grpc.WithInsecure())
	if err != nil {
		return fmt.Errorf("%v (%s)", err, s.Endpoint)
	}
	defer conn.Close()

	ctx, cancel := context.WithCancel(context.Background())
	wg := &sync.WaitGroup{}
	wg.Add(s.N)

	s.mu.Lock()
	s.conn = conn
	s.cancel = cancel
	s.wg = wg
	s.mu.Unlock()

	kvc := pb.NewKVClient(conn)

	for i := 0; i < s.N; i++ {
		go func(i int) {
			defer wg.Done()
			for {
				// TODO: a 10-second timeout is enough to cover leader failure
				// and immediate leader election. Find out what other cases
				// could cause this to time out.
				putctx, putcancel := context.WithTimeout(ctx, 10*time.Second)
				_, err := kvc.Put(putctx, &pb.PutRequest{
					Key:   []byte(fmt.Sprintf("foo%d", rand.Intn(s.KeySuffixRange))),
					Value: []byte(randStr(s.KeySize)),
				})
				putcancel()
				if err != nil {
					shouldContinue := false
					switch grpc.ErrorDesc(err) {
					case context.DeadlineExceeded.Error():
						// Retry when the request is triggered at the same time as a
						// leader failure. When we terminate the leader, requests to
						// that leader cannot be processed and time out; requests to
						// followers cannot be forwarded to the old leader either, so
						// they time out as well. We want to keep stressing until the
						// cluster elects a new leader and starts processing requests
						// again.
						shouldContinue = true
					case etcdserver.ErrStopped.Error():
						// one of the etcd nodes was stopped by failure injection
						shouldContinue = true
					case transport.ErrConnClosing.Desc:
						// the server closed the transport (failure-injected node)
						shouldContinue = true
					case rpctypes.ErrNotCapable.Error():
						// the capability check has not been done yet (at startup)
						shouldContinue = true
						// default:
						// errors from the stresser.Cancel method:
						// rpc error: code = 1 desc = context canceled (type grpc.rpcError)
						// rpc error: code = 2 desc = grpc: the client connection is closing (type grpc.rpcError)
					}
					if shouldContinue {
						continue
					}
					return
				}
				s.mu.Lock()
				s.success++
				s.mu.Unlock()
			}
		}(i)
	}

	// Block until Cancel is called; the per-request contexts above are derived
	// from ctx, so cancelling it stops all writers.
	<-ctx.Done()
	return nil
}

func (s *stresser) Cancel() {
	s.mu.Lock()
	cancel, conn, wg := s.cancel, s.conn, s.wg
	s.mu.Unlock()
	cancel()
	wg.Wait()
	conn.Close()
}

func (s *stresser) Report() (int, int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	// TODO: find a better way to report v3 tests
	return s.success, -1
}

type stresserV2 struct {
	Endpoint string

	KeySize        int
	KeySuffixRange int
	N              int

	mu      sync.Mutex
	failure int
	success int

	cancel func()
}

func (s *stresserV2) Stress() error {
	cfg := clientV2.Config{
		Endpoints: []string{s.Endpoint},
		Transport: &http.Transport{
			Dial: (&net.Dialer{
				Timeout:   time.Second,
				KeepAlive: 30 * time.Second,
			}).Dial,
			MaxIdleConnsPerHost: s.N,
		},
	}
	c, err := clientV2.New(cfg)
	if err != nil {
		return err
	}

	kv := clientV2.NewKeysAPI(c)
	ctx, cancel := context.WithCancel(context.Background())
	s.cancel = cancel

	for i := 0; i < s.N; i++ {
		go func() {
			for {
				setctx, setcancel := context.WithTimeout(ctx, clientV2.DefaultRequestTimeout)
				key := fmt.Sprintf("foo%d", rand.Intn(s.KeySuffixRange))
				_, err := kv.Set(setctx, key, randStr(s.KeySize), nil)
				setcancel()
				if err == context.Canceled {
					return
				}
				s.mu.Lock()
				if err != nil {
					s.failure++
				} else {
					s.success++
				}
				s.mu.Unlock()
			}
		}()
	}

	<-ctx.Done()
	return nil
}

func (s *stresserV2) Cancel() {
	s.cancel()
}

func (s *stresserV2) Report() (success int, failure int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.success, s.failure
}

// randStr returns a random string of lowercase ASCII letters of length size.
func randStr(size int) string {
	data := make([]byte, size)
	for i := 0; i < size; i++ {
		data[i] = byte(int('a') + rand.Intn(26))
	}
	return string(data)
}
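
// Illustrative only, not in the original file: how the v3 and v2 stressers
// might be constructed side by side. The endpoints and sizes are hypothetical
// placeholders; the real tester builds these from its own configuration.
func exampleNewStressers() []Stresser {
	return []Stresser{
		&stresser{
			Endpoint:       "10.0.0.1:2378", // gRPC endpoint (placeholder)
			KeySize:        100,
			KeySuffixRange: 250000,
			N:              200,
		},
		&stresserV2{
			Endpoint:       "http://10.0.0.1:2379", // v2 client URL (placeholder)
			KeySize:        100,
			KeySuffixRange: 250000,
			N:              200,
		},
	}
}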