forked from dolthub/vitess
-
Notifications
You must be signed in to change notification settings - Fork 0
/
client_testsuite.go
269 lines (228 loc) · 9.07 KB
/
client_testsuite.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
// Copyright 2015, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package vtworkerclienttest contains the testsuite against which each
// RPC implementation of the vtworkerclient interface must be tested.
package vtworkerclienttest
// NOTE: This file is not test-only code because it is referenced by
// tests in other packages. Therefore it must be a regular (non-_test.go)
// source file so that those packages can see it.
// NOTE: This code is in its own package such that its dependencies
// (e.g. zookeeper) won't be drawn into production binaries as well.
import (
"fmt"
"io"
"strings"
"sync"
"testing"
"time"
"golang.org/x/net/context"
"github.com/youtube/vitess/go/vt/logutil"
"github.com/youtube/vitess/go/vt/topo/memorytopo"
"github.com/youtube/vitess/go/vt/vterrors"
"github.com/youtube/vitess/go/vt/vttablet/tmclient"
"github.com/youtube/vitess/go/vt/worker"
"github.com/youtube/vitess/go/vt/worker/vtworkerclient"
// Import the gRPC client implementation for tablet manager because the real
// vtworker implementation requires it.
_ "github.com/youtube/vitess/go/vt/vttablet/grpctmclient"
vtrpcpb "github.com/youtube/vitess/go/vt/proto/vtrpc"
)
// init selects gRPC as the tablet manager client protocol. The real vtworker
// implementation initializes a tablet manager client even though these tests
// never use it, so the chosen protocol implementation must be registered.
func init() {
	protocol := tmclient.TabletManagerProtocol
	*protocol = "grpc"
}
// CreateWorkerInstance returns a properly configured vtworker instance.
func CreateWorkerInstance(t *testing.T) *worker.Instance {
ts := memorytopo.NewServer("cell1", "cell2")
return worker.NewInstance(ts, "cell1", 1*time.Second)
}
// TestSuite runs the test suite on the given vtworker and vtworkerclient.
// The steps run one after another on the calling test goroutine, in the
// order listed below. A fatal failure in one step stops the remaining steps.
func TestSuite(t *testing.T, c vtworkerclient.Client) {
	steps := []func(*testing.T, vtworkerclient.Client){
		commandSucceeds,
		commandErrors,
		func(st *testing.T, cl vtworkerclient.Client) {
			commandErrorsBecauseBusy(st, cl, false /* client side cancelation */)
		},
		func(st *testing.T, cl vtworkerclient.Client) {
			commandErrorsBecauseBusy(st, cl, true /* server side cancelation */)
		},
		commandPanics,
	}
	for _, step := range steps {
		step(t, c)
	}
}
// commandSucceeds runs the "Ping" command and verifies two things: vtworker
// streams back exactly one log line echoing the message, and the stream then
// ends with io.EOF. Afterwards vtworker is reset for the next test function.
func commandSucceeds(t *testing.T, client vtworkerclient.Client) {
	stream, err := client.ExecuteVtworkerCommand(context.Background(), []string{"Ping", "pong"})
	if err != nil {
		t.Fatalf("Cannot execute remote command: %v", err)
	}

	got, err := stream.Recv()
	if err != nil {
		t.Fatalf("failed to get first line: %v", err)
	}
	expected := "Ping command was called with message: 'pong'.\n"
	if logutil.EventString(got) != expected {
		t.Errorf("Got unexpected log line '%v' expected '%v'", got.String(), expected)
	}

	// The stream must end right after the single log line. Keep any extra
	// event so that a failure shows what was actually received instead of
	// just a nil error.
	extra, err := stream.Recv()
	if err != io.EOF {
		t.Fatalf("Didn't get EOF as expected: %v (extra event: %v)", err, extra)
	}

	// Reset vtworker for the next test function.
	if err := runVtworkerCommand(client, []string{"Reset"}); err != nil {
		t.Fatal(err)
	}
}
func runVtworkerCommand(client vtworkerclient.Client, args []string) error {
stream, err := client.ExecuteVtworkerCommand(context.Background(), args)
if err != nil {
return fmt.Errorf("cannot execute remote command: %v", err)
}
for {
_, err := stream.Recv()
switch err {
case nil:
// Consume next response.
case io.EOF:
return nil
default:
return vterrors.Errorf(vterrors.Code(err), "unexpected error when reading the stream: %v", err)
}
}
}
// commandErrorsBecauseBusy tests that concurrent commands are rejected with
// the retryable UNAVAILABLE error code while a command is already running.
// It also tests the correct propagation of the CANCELED error code.
//
// The running "Block" command is canceled in one of two ways, chosen by
// serverSideCancelation:
//   - false: the client cancels the context of the "Block" RPC.
//   - true: the client sends a "Cancel" command to vtworker.
func commandErrorsBecauseBusy(t *testing.T, client vtworkerclient.Client, serverSideCancelation bool) {
	// Run the vtworker "Block" command which blocks until we cancel the context.
	var wg sync.WaitGroup
	ctx, cancel := context.WithCancel(context.Background())
	// Release the context, and with it the "Block" RPC, even if a t.Fatal
	// below ends the test early. Calling cancel more than once is harmless.
	defer cancel()

	// blockCommandStarted is closed once the goroutine below has done one of:
	//   - seen the first log line of "Block" (blockRunning is then true), or
	//   - exited without seeing it.
	// Closing it on every exit path keeps the test from hanging forever when
	// "Block" fails to start.
	blockCommandStarted := make(chan struct{})
	var signalStarted sync.Once
	blockRunning := false
	// blockErr is written only by the goroutine. Read it after wg.Wait().
	var blockErr error

	wg.Add(1)
	go func() {
		defer wg.Done()
		defer signalStarted.Do(func() { close(blockCommandStarted) })

		// t.Fatal must only be called from the test goroutine. Failures are
		// therefore recorded in blockErr and reported by the test goroutine.
		stream, err := client.ExecuteVtworkerCommand(ctx, []string{"Block"})
		if err != nil {
			blockErr = fmt.Errorf("Block command should not have failed: %v", err)
			return
		}
		for {
			if _, err := stream.Recv(); err != nil {
				// We see CANCELED from the RPC client (client side cancelation) or
				// from vtworker itself (server side cancelation).
				if vterrors.Code(err) != vtrpcpb.Code_CANCELED {
					blockErr = fmt.Errorf("Block command should only error due to canceled context: %v", err)
				}
				// Stream has finished.
				return
			}
			if !blockRunning {
				// The first log line will come from the "Block" command, so we are
				// sure now that vtworker is actually executing it. blockRunning is
				// written before the channel is closed, so the test goroutine may
				// read it after receiving from the channel.
				blockRunning = true
				signalStarted.Do(func() { close(blockCommandStarted) })
			}
		}
	}()

	<-blockCommandStarted
	if !blockRunning {
		// The goroutine exited before "Block" produced any output.
		wg.Wait()
		t.Fatalf("Block command did not start: %v", blockErr)
	}

	// Try to run a second, concurrent vtworker command.
	// vtworker should send an error back that it's busy and we should retry later.
	gotErr := runVtworkerCommand(client, []string{"Ping", "Are you busy?"})
	wantCode := vtrpcpb.Code_UNAVAILABLE
	if gotCode := vterrors.Code(gotErr); gotCode != wantCode {
		t.Fatalf("wrong error code for second cmd: got = %v, want = %v, err: %v", gotCode, wantCode, gotErr)
	}

	// Cancel running "Block" command.
	if serverSideCancelation {
		if err := runVtworkerCommand(client, []string{"Cancel"}); err != nil {
			t.Fatal(err)
		}
	}
	// Always cancel the context to not leak it (regardless of client or server
	// side cancelation).
	cancel()
	wg.Wait()
	if blockErr != nil {
		t.Fatalf("Block command did not return the CANCELED error code: %v", blockErr)
	}

	// vtworker is now in a special state: the current job is already canceled
	// but has not been reset yet. New commands still fail with a retryable
	// error.
	gotErr2 := runVtworkerCommand(client, []string{"Ping", "canceled and still busy?"})
	wantCode2 := vtrpcpb.Code_UNAVAILABLE
	if gotCode2 := vterrors.Code(gotErr2); gotCode2 != wantCode2 {
		t.Fatalf("wrong error code for second cmd before reset: got = %v, want = %v, err: %v", gotCode2, wantCode2, gotErr2)
	}

	// Reset vtworker for the next test function.
	if err := resetVtworker(t, client); err != nil {
		t.Fatal(err)
	}

	// Second vtworker command should succeed now after the first has finished.
	if err := runVtworkerCommand(client, []string{"Ping", "You should not be busy anymore!"}); err != nil {
		t.Fatalf("second cmd should not have failed: %v", err)
	}

	// Reset vtworker for the next test function.
	if err := runVtworkerCommand(client, []string{"Reset"}); err != nil {
		t.Fatal(err)
	}
}
// resetVtworker will retry to "Reset" vtworker until it succeeds.
// Retries are necessary to cope with the following race:
// a) RPC started vtworker command e.g. "Block".
// b) client cancels RPC and triggers vtworker to cancel the running command.
// c) RPC returns with a response after cancelation was received by vtworker.
// d) vtworker is still canceling and shutting down the command.
// e) A new vtworker command e.g. "Reset" would fail at this point with
// "vtworker still executing" until the cancelation is complete.
func resetVtworker(t *testing.T, client vtworkerclient.Client) error {
start := time.Now()
attempts := 0
for {
attempts++
err := runVtworkerCommand(client, []string{"Reset"})
if err == nil {
return nil
}
if time.Since(start) > 5*time.Second {
return fmt.Errorf("Reset was not successful after 5s and %d attempts: %v", attempts, err)
}
if !strings.Contains(err.Error(), "worker still executing") {
return fmt.Errorf("Reset must not fail: %v", err)
}
t.Logf("retrying to Reset vtworker because the previous command has not finished yet. got err: %v", err)
continue
}
}
func commandErrors(t *testing.T, client vtworkerclient.Client) {
stream, err := client.ExecuteVtworkerCommand(context.Background(), []string{"NonexistingCommand"})
// The expected error could already be seen now or after the output channel is closed.
// To avoid checking for the same error twice, we don't check it here yet.
if err == nil {
// Don't check for errors until the output channel is closed.
// We expect the usage to be sent as output. However, we have to consider it
// optional and do not test for it because not all RPC implementations send
// the output after an error.
for {
_, err = stream.Recv()
if err != nil {
break
}
}
}
expected := "unknown command: NonexistingCommand"
if err == nil || !strings.Contains(err.Error(), expected) {
t.Fatalf("Unexpected remote error, got: '%v' was expecting to find '%v'", err, expected)
}
}
func commandPanics(t *testing.T, client vtworkerclient.Client) {
stream, err := client.ExecuteVtworkerCommand(context.Background(), []string{"Panic"})
// The expected error could already be seen now or after the output channel is closed.
// To avoid checking for the same error twice, we don't check it here yet.
if err == nil {
// Don't check for errors until the output channel is closed.
// No output expected in this case.
_, err = stream.Recv()
}
expected := "uncaught vtworker panic: Panic command was called. This should be caught by the vtworker framework and logged as an error."
if err == nil || !strings.Contains(err.Error(), expected) {
t.Fatalf("Unexpected remote error, got: '%v' was expecting to find '%v'", err, expected)
}
}