replica_probe_test.go
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvserver_test

import (
	"context"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/base"
	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/testutils"
	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
	"github.com/cockroachdb/cockroach/pkg/util/log"
	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
	"github.com/cockroachdb/errors"
	"github.com/stretchr/testify/require"
)
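
// TestReplicaProbeRequest sends ProbeRequests through the full KV stack with
// both routing policies as well as directly to individual Replicas, and uses
// testing filters to verify that probes reach command application as forced
// errors and that an injected forced error is returned to the sender.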
func TestReplicaProbeRequest(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	ctx := context.Background()
	args := base.TestClusterArgs{
		ReplicationMode: base.ReplicationManual,
	}
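
	// seen tracks, per store, how many probes were observed in command
	// application, and optionally holds an error to inject in place of the
	// probe's usual forced error.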
	var seen struct {
		syncutil.Mutex
		m           map[roachpb.StoreID]int
		injectedErr *roachpb.Error
	}
	seen.m = map[roachpb.StoreID]int{}
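
	// filter is installed below both as an apply filter and as a forced-error
	// filter. It ignores everything that isn't a probe, counts each probe
	// against the store applying it, and returns either the injected error
	// (when set) or the probe's own forced error.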
	filter := func(args kvserverbase.ApplyFilterArgs) (int, *roachpb.Error) {
		if !args.IsProbe {
			return 0, args.ForcedError
		}
		if args.ForcedError == nil {
			t.Error("probe has no forcedError") // avoid Fatal on goroutine
			return 0, args.ForcedError
		}
		seen.Lock()
		defer seen.Unlock()
		seen.m[args.StoreID]++
		if seen.injectedErr != nil {
			return 0, seen.injectedErr
		}
		return 0, args.ForcedError
	}
	// NB: At one point we tried to verify that the probe the test sends below
	// is seen in command application on *all* replicas.
	//
	// This was flaky[^1] due to snapshot application (g1 and g2 are the
	// intertwined operations):
	//
	// - g1 | `tc.AddXOrFatal` begins
	// - g1 | it calls into ChangeReplicas
	// - g1 | Replica added to descriptor
	// - g2 | leader sends first MsgApp
	// - g2 | follower gets MsgApp, rejects it, leader enqueues in Raft snapshot queue
	// - g1 | block raft snapshots to follower
	// - g1 | send INITIAL snapshot
	// - g1 | unblock raft snapshots to follower
	// - g1 | AddVoterOrWhatever returns to the test
	// - g1 | test sends probe
	// - g2 | raft snapshot queue processes the queued Replica (the follower's
	//        MsgAppResp following the INITIAL snapshot not having reached the leader yet)[^1]
	// - g2 | it sends another snap which includes the probe
	// - g2 | follower applies the probe via the snap.
	// - g1 | test times out waiting for the interceptor to see the probe on the follower.
	//
	// We attempted to disable the raft snapshot queue. This caused deadlocks: if
	// raft (erroneously or not) requests a snapshot, the leader will not replicate
	// to the follower until it has been caught up. But the quorum size here is two,
	// so this would cause a hanging test. We tried to work around this by marking
	// snapshots as failed when the raft snapshot queue is disabled, but this then
	// threw off other tests[^2] that wanted to observe this state.
	//
	// So we just check that the probe applies on at least one Replica, which
	// always has to be true.
	//
	// [^1]: https://github.com/cockroachdb/cockroach/pull/92380
	// [^2]: TestMigrateWithInflightSnapshot, TestSnapshotsToDrainingNodes, TestRaftSnapshotQueueSeesLearner
	args.ServerArgs.Knobs.Store = &kvserver.StoreTestingKnobs{
		// We set an ApplyFilter even though the probe should never
		// show up there (since it always catches a forced error),
		// precisely to ensure that it doesn't.
		TestingApplyCalledTwiceFilter: filter,
		// This is the main workhorse that counts probes and injects
		// errors.
		TestingApplyForcedErrFilter: filter,
	}
	tc := testcluster.StartTestCluster(t, 3 /* nodes */, args)
	defer tc.Stopper().Stop(ctx)

	k := tc.ScratchRange(t)
	desc := tc.LookupRangeOrFatal(t, k)
	// Establish configuration [LEASEHOLDER FOLLOWER NONVOTER].
	tc.AddVotersOrFatal(t, k, tc.Target(1))
	tc.AddNonVotersOrFatal(t, k, tc.Target(2))
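
	// The same probe, addressed to the scratch range's start key, is reused
	// for every send below.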
	probeReq := &roachpb.ProbeRequest{
		RequestHeader: roachpb.RequestHeader{
			Key: k,
		},
	}
	// Sanity check that ProbeRequest is fit for sending through the entire KV
	// stack, with both routing policies.
	for _, srv := range tc.Servers {
		db := srv.DB()
		{
			var b kv.Batch
			b.AddRawRequest(probeReq)
			b.Header.RoutingPolicy = roachpb.RoutingPolicy_LEASEHOLDER
			require.NoError(t, db.Run(ctx, &b))
		}
		{
			var b kv.Batch
			b.AddRawRequest(probeReq)
			b.Header.RoutingPolicy = roachpb.RoutingPolicy_NEAREST
			require.NoError(t, db.Run(ctx, &b))
		}
	}
	// Check the expected number of probes seen on each Replica in the apply
	// loop. Each Replica must see every probe, since the probe is replicated
	// and we don't expect truncations here.
	// There could be more probes due to reproposals. If there are fewer, some
	// probes returned success but didn't actually go through raft (for example
	// because they short-circuited as no-ops, which of course we're careful to
	// avoid).
	testutils.SucceedsSoon(t, func() error {
		seen.Lock()
		defer seen.Unlock()
		if exp, act := len(tc.Servers), len(seen.m); exp != act {
			return errors.Errorf("waiting for stores to apply command: %d/%d", act, exp)
		}
		// We'll usually see 2 * len(tc.Servers) probes on each store since we
		// sent two probes from each server, but see the comment about errant
		// snapshots above. We just want this test to be reliable, so we expect
		// at least one probe in command application on each store.
		n := 1
		for storeID, count := range seen.m {
			if count < n {
				return errors.Errorf("saw only %d probes on s%d", count, storeID)
			}
		}
		return nil
	})
	// We can also probe directly at each Replica. This is the intended use case
	// for Replica-level circuit breakers (#33007).
	for _, srv := range tc.Servers {
		repl, _, err := srv.Stores().GetReplicaForRangeID(ctx, desc.RangeID)
		require.NoError(t, err)
		ba := &roachpb.BatchRequest{}
		ba.Add(probeReq)
		ba.Timestamp = srv.Clock().Now()
		_, pErr := repl.Send(ctx, ba)
		require.NoError(t, pErr.GoError())
	}
	// If the probe applies with any nonstandard forced error, we get the error
	// back. Not sure what other error might occur in practice, but checking
	// this anyway gives us extra confidence in the implementation mechanics of
	// this request.
	injErr := roachpb.NewErrorf("bang")
	seen.Lock()
	seen.injectedErr = injErr
	seen.Unlock()
	for _, srv := range tc.Servers {
		repl, _, err := srv.Stores().GetReplicaForRangeID(ctx, desc.RangeID)
		require.NoError(t, err)
		ba := &roachpb.BatchRequest{}
		ba.Timestamp = srv.Clock().Now()
		ba.Add(probeReq)
		_, pErr := repl.Send(ctx, ba)
		require.True(t, errors.Is(pErr.GoError(), injErr.GoError()), "%+v", pErr.GoError())
	}
}