Skip to content

Commit

Permalink
Merge #113658
Browse files Browse the repository at this point in the history
113658: kvserver: fix reproposals test with pipelined writes r=erikgrinaker a=pavelkalinnikov

Before this commit, the test was a no-op reporting 0 metrics. This is due to `isOurCommand` function which assumed that the key increment request is always the first request in `BatchRequest`. With pipelined writes, this is not true.

A typical request is:
> QueryIntent ["00001-testing",/Min), Increment ["00001-testing",/Min)

In this commit, `isOutCommand` scans the `BatchRequest` to find the command.

Before:
```
observed 0 async write restarts, observed 0/0 injected aborts, 0 injected illegal lease applied indexes
commands reproposed (unchanged): 1
commands reproposed (new LAI): 0
```
After:
```
observed 69 async write restarts, observed 0/69 injected aborts, 366 injected illegal lease applied indexes
commands reproposed (unchanged): 1
commands reproposed (new LAI): 297
```

Fixes #106504
Touches #110551
Epic: none
Release note: none

Co-authored-by: Pavel Kalinnikov <pavel@cockroachlabs.com>
  • Loading branch information
craig[bot] and pav-kv committed Nov 3, 2023
2 parents 5aa6a0a + 7ed8c9e commit 08a253e
Showing 1 changed file with 21 additions and 14 deletions.
35 changes: 21 additions & 14 deletions pkg/kv/kvserver/replica_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,16 +164,12 @@ func testProposalsWithInjectedLeaseIndexAndReproposalError(t *testing.T, pipelin
isOurCommand := func(ba *kvpb.BatchRequest) (_ string, ok bool) {
if ba == nil {
return "", false // not local proposal
}
inc := ba.Requests[0].GetIncrement()
if inc == nil {
return "", false
}
key := string(inc.Key)
if !strings.HasSuffix(key, "-testing") {
} else if inc, found := ba.GetArg(kvpb.Increment); !found {
return "", false
} else if key := string(inc.(*kvpb.IncrementRequest).Key); strings.HasSuffix(key, "-testing") {
return key, true
}
return key, true
return "", false
}

rnd, seed := randutil.NewPseudoRand()
Expand All @@ -191,7 +187,7 @@ func testProposalsWithInjectedLeaseIndexAndReproposalError(t *testing.T, pipelin

cfg := TestStoreConfig(hlc.NewClockForTesting(nil))

var injectedReproposalErrors atomic.Int32
var injectedReproposalErrors atomic.Int64
{
var mu syncutil.Mutex
seen := map[string]int{} // access from proposal buffer under raftMu
Expand All @@ -206,12 +202,13 @@ func testProposalsWithInjectedLeaseIndexAndReproposalError(t *testing.T, pipelin
if !shouldInject(0.2, seen[key]) {
return nil
}
injectedReproposalErrors.Add(1)
t.Logf("inserting reproposal error for %s (seen %d times)", key, seen[key])
return errors.Errorf("injected error")
}
}

var insertedIllegalLeaseIndex atomic.Int32
var insertedIllegalLeaseIndex atomic.Int64
{
var mu syncutil.Mutex
seen := map[string]int{}
Expand Down Expand Up @@ -253,8 +250,8 @@ func testProposalsWithInjectedLeaseIndexAndReproposalError(t *testing.T, pipelin
key = append(key, "-testing"...)
return key
}
var observedAsyncWriteFailures atomic.Int32
var observedReproposalErrors atomic.Int32
var observedAsyncWriteFailures atomic.Int64
var observedReproposalErrors atomic.Int64
const iters = 300
expectations := map[string]int{}
for i := 0; i < iters; i++ {
Expand Down Expand Up @@ -337,19 +334,29 @@ func testProposalsWithInjectedLeaseIndexAndReproposalError(t *testing.T, pipelin
require.NoError(t, err)
require.EqualValues(t, exp, n)
}

t.Logf("observed %d async write restarts, observed %d/%d injected aborts, %d injected illegal lease applied indexes",
observedAsyncWriteFailures.Load(), observedReproposalErrors.Load(), injectedReproposalErrors.Load(), insertedIllegalLeaseIndex.Load())
t.Logf("commands reproposed (unchanged): %d", tc.store.metrics.RaftCommandsReproposed.Count())
t.Logf("commands reproposed (new LAI): %d", tc.store.metrics.RaftCommandsReproposedLAI.Count())

if pipelined {
// If we did pipelined writes, if we needed to repropose and injected an
// error, this should surface as an async write failure instead.
require.Zero(t, observedReproposalErrors.Load())
}
if !pipelined {
require.Equal(t, injectedReproposalErrors.Load(), observedAsyncWriteFailures.Load())
} else {
// If we're not pipelined, we shouldn't be able to get an async write
// failure. This isn't testing anything about reproposals per se, rather
// it's validation that we're truly not doing pipelined writes.
require.Zero(t, observedAsyncWriteFailures.Load())
// All the injected reproposal errors should manifest to the transaction.
require.Equal(t, injectedReproposalErrors.Load(), observedReproposalErrors.Load())
}
// All incorrect lease indices should manifest either as a reproposal, or a
// failed reproposal (when an error is injected).
require.Equal(t, insertedIllegalLeaseIndex.Load(),
tc.store.metrics.RaftCommandsReproposedLAI.Count()+injectedReproposalErrors.Load())
}

func checkNoLeakedTraceSpans(t *testing.T, store *Store) {
Expand Down

0 comments on commit 08a253e

Please sign in to comment.