Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: add TestCreateManyUnappliedProbes #104401

Merged
merged 8 commits into from
Jun 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,6 @@ go_library(
"//pkg/util/buildutil",
"//pkg/util/circuit",
"//pkg/util/ctxgroup",
"//pkg/util/encoding",
"//pkg/util/envutil",
"//pkg/util/errorutil",
"//pkg/util/future",
Expand Down Expand Up @@ -248,6 +247,7 @@ go_test(
"client_atomic_membership_change_test.go",
"client_invalidsplit_test.go",
"client_lease_test.go",
"client_manual_proposal_test.go",
"client_merge_test.go",
"client_metrics_test.go",
"client_migration_test.go",
Expand Down Expand Up @@ -407,6 +407,7 @@ go_test(
"//pkg/kv/kvserver/protectedts/ptpb",
"//pkg/kv/kvserver/protectedts/ptstorage",
"//pkg/kv/kvserver/protectedts/ptutil",
"//pkg/kv/kvserver/raftentry",
"//pkg/kv/kvserver/raftlog",
"//pkg/kv/kvserver/raftutil",
"//pkg/kv/kvserver/rditer",
Expand Down
264 changes: 264 additions & 0 deletions pkg/kv/kvserver/client_manual_proposal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvserver_test

import (
"context"
"math"
"os"
"path/filepath"
"sync"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftentry"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/uncertainty"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/stretchr/testify/require"
"go.etcd.io/raft/v3"
"go.etcd.io/raft/v3/raftpb"
)

// TestCreateManyUnappliedProbes is a (by default skipped) test that writes
// a very large unapplied raft log consisting entirely of probes.
//
// It's a toy example for #75729 but has been useful to validate improvements
// in the raft pipeline, so it is checked in to allow for future re-use for
// similar purposes.
//
// See also: https://github.com/cockroachdb/cockroach/issues/105177
func TestCreateManyUnappliedProbes(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
// NB: the actual logic takes only milliseconds, but we need to start/stop a
// test cluster twice and that is slow.
skip.UnderShort(t, "takes ~4s")

ctx := context.Background()

// NB: To set up a "real" cockroach-data dir with a large raft log in
// system.settings, and then restart the node to watch it apply the long raft
// log, you can use the below:
//
// p := os.ExpandEnv("$HOME/go/src/github.com/cockroachdb/cockroach/cockroach-data")
// const entsPerBatch = 100000
// const batches = 1000
//
// ./dev build && rm -rf cockroach-data && timeout 10 ./cockroach start-single-node --logtostderr --insecure ; \
// go test ./pkg/kv/kvserver/ -v --run TestCreateManyUnappliedProbes && sleep 3 && \
// (./cockroach start-single-node --logtostderr=INFO --insecure | grep -F r10/)
//
// Then wait and watch the `raft.commandsapplied` metric to see r10 apply the entries.
p := filepath.Join(t.TempDir(), "cockroach-data")
const entsPerBatch = 10
const batches = 3
rangeID := roachpb.RangeID(10) // system.settings

if _, err := os.Stat(p); err != nil {
args := base.TestClusterArgs{
ServerArgs: base.TestServerArgs{StoreSpecs: []base.StoreSpec{
{Path: p},
}},
ReplicationMode: base.ReplicationManual,
}
tc := testcluster.StartTestCluster(t, 1, args)
// Reload system.settings' rangeID just in case it changes.
require.NoError(t, tc.ServerConn(0).QueryRow(`SELECT
range_id
FROM
[SHOW RANGES FROM TABLE system.settings]
ORDER BY
range_id ASC
LIMIT
1;`).Scan(&rangeID))
tc.Stopper().Stop(ctx)

defer func() {
if t.Failed() {
return
}
tc := testcluster.StartTestCluster(t, 1, args)
defer tc.Stopper().Stop(ctx)
require.NoError(t, tc.ServerConn(0).QueryRow(`SELECT count(1) FROM system.settings`).Err())
t.Log("read system.settings")
}()

}

st := cluster.MakeTestingClusterSettings()
eng, err := storage.Open(ctx, storage.Filesystem(p), st)
require.NoError(t, err)
defer eng.Close()

// Determine LastIndex, LastTerm, and next MaxLeaseIndex by scanning
// existing log.
it := raftlog.NewIterator(rangeID, eng, raftlog.IterOptions{})
defer it.Close()
rsl := logstore.NewStateLoader(rangeID)
lastIndex, err := rsl.LoadLastIndex(ctx, eng)
require.NoError(t, err)
t.Logf("loaded LastIndex: %d", lastIndex)
ok, err := it.SeekGE(lastIndex)
require.NoError(t, err)
require.True(t, ok)

var lai kvpb.LeaseAppliedIndex
var lastTerm uint64
require.NoError(t, raftlog.Visit(eng, rangeID, lastIndex, math.MaxUint64, func(entry raftpb.Entry) error {
ent, err := raftlog.NewEntry(it.Entry())
require.NoError(t, err)
if lai < ent.Cmd.MaxLeaseIndex {
lai = ent.Cmd.MaxLeaseIndex
}
lastTerm = ent.Term
return nil
}))

sl := stateloader.Make(rangeID)
lease, err := sl.LoadLease(ctx, eng)
require.NoError(t, err)

for batchIdx := 0; batchIdx < batches; batchIdx++ {
t.Logf("batch %d", batchIdx+1)
b := storage.NewOpLoggerBatch(eng.NewBatch())
defer b.Batch.Close()

var ents []raftpb.Entry
for i := 0; i < entsPerBatch; i++ {
lai++
c := &kvpb.ProbeRequest{}
resp := &kvpb.ProbeResponse{}
c.Key = keys.LocalMax

cmd, ok := batcheval.LookupCommand(c.Method())
require.True(t, ok)
// NB: this should really operate on a BatchRequest. We need to librarize
// evaluateBatch or its various callers.

evalCtx := &batcheval.MockEvalCtx{}
args := batcheval.CommandArgs{
EvalCtx: evalCtx.EvalContext(),
Header: kvpb.Header{},
Args: c,
Stats: &enginepb.MVCCStats{},
Uncertainty: uncertainty.Interval{},
}
res, err := cmd.EvalRW(ctx, b, args, resp)
require.NoError(t, err)
// NB: ideally evaluateProposal could be used directly here;
// we just cobble a `result.Result` together manually in this test.
res.WriteBatch = &kvserverpb.WriteBatch{Data: b.Repr()}
res.LogicalOpLog = &kvserverpb.LogicalOpLog{Ops: b.LogicalOps()}

// End of evaluation. Start of "proposing". This too is all cobbled
// together here and ideally we'd be able to call a method that is
// also invoked by the production code paths. For example, there is
// no real lease handling here so you can't use this code to propose
// a {Request,Transfer}LeaseRequest; we'd need [^1].
//
// [^1]: https://github.com/cockroachdb/cockroach/blob/9a7b735b1282bbb3fb7472cc26a47d516a446958/pkg/kv/kvserver/replica_raft.go#L192-L219
raftCmd := kvserverpb.RaftCommand{
ProposerLeaseSequence: lease.Sequence,
MaxLeaseIndex: lai,
// Rest was determined by evaluation.
ReplicatedEvalResult: res.Replicated,
WriteBatch: res.WriteBatch,
LogicalOpLog: res.LogicalOpLog,
}

idKey := raftlog.MakeCmdIDKey()
payload, err := raftlog.EncodeCommand(ctx, &raftCmd, idKey, nil)
require.NoError(t, err)
ents = append(ents, raftpb.Entry{
Term: lastTerm,
Index: uint64(lastIndex) + uint64(len(ents)) + 1,
Type: raftpb.EntryNormal,
Data: payload,
})
}

stats := &logstore.AppendStats{}

msgApp := raftpb.Message{
Type: raftpb.MsgStorageAppend,
To: raft.LocalAppendThread,
Term: lastTerm,
LogTerm: lastTerm,
Index: uint64(lastIndex),
Entries: ents,
Commit: uint64(lastIndex) + uint64(len(ents)),
Responses: []raftpb.Message{{}}, // need >0 responses so StoreEntries will sync
}

fakeMeta := metric.Metadata{
Name: "fake.meta",
}
swl := logstore.NewSyncWaiterLoop()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
swl.Start(ctx, stopper)
ls := logstore.LogStore{
RangeID: rangeID,
Engine: eng,
Sideload: nil,
StateLoader: rsl,
SyncWaiter: swl,
EntryCache: raftentry.NewCache(1024),
Settings: st,
Metrics: logstore.Metrics{
RaftLogCommitLatency: metric.NewHistogram(metric.HistogramOptions{
Mode: metric.HistogramModePrometheus,
Metadata: fakeMeta,
Duration: time.Millisecond,
Buckets: metric.NetworkLatencyBuckets,
}),
},
}

wg := &sync.WaitGroup{}
wg.Add(1)
_, err = ls.StoreEntries(ctx, logstore.RaftState{
LastIndex: lastIndex,
LastTerm: kvpb.RaftTerm(lastTerm),
}, logstore.MakeMsgStorageAppend(msgApp), (*wgSyncCallback)(wg), stats)
require.NoError(t, err)
wg.Wait()

lastIndex = kvpb.RaftIndex(ents[len(ents)-1].Index)
}

t.Logf("LastIndex is now: %d", lastIndex)
}

type wgSyncCallback sync.WaitGroup

func (w *wgSyncCallback) OnLogSync(
ctx context.Context, messages []raftpb.Message, stats storage.BatchCommitStats,
) {
(*sync.WaitGroup)(w).Done()
}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/logstore/logstore_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func runBenchmarkLogStore_StoreEntries(b *testing.B, bytes int64) {
Term: 1,
Index: 1,
Type: raftpb.EntryNormal,
Data: raftlog.EncodeRaftCommand(raftlog.EntryEncodingStandardWithoutAC, "deadbeef", data),
Data: raftlog.EncodeCommandBytes(raftlog.EntryEncodingStandardWithoutAC, "deadbeef", data),
})
stats := &AppendStats{}

Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/logstore/sideload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func mkEnt(
}
var ent raftpb.Entry
ent.Index, ent.Term = index, term
ent.Data = raftlog.EncodeRaftCommand(enc, kvserverbase.CmdIDKey(cmdIDKey), b)
ent.Data = raftlog.EncodeCommandBytes(enc, kvserverbase.CmdIDKey(cmdIDKey), b)
return ent
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/loqrecovery/recovery_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ func raftLogFromPendingDescriptorUpdate(
if err != nil {
t.Fatalf("failed to serialize raftCommand: %v", err)
}
data := raftlog.EncodeRaftCommand(
data := raftlog.EncodeCommandBytes(
raftlog.EntryEncodingStandardWithoutAC, kvserverbase.CmdIDKey(fmt.Sprintf("%08d", entryIndex)), out)
ent := raftpb.Entry{
Term: 1,
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/raftlog/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ go_library(
"encoding.go",
"entry.go",
"iterator.go",
"payload.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog",
visibility = ["//visibility:public"],
Expand All @@ -20,7 +21,11 @@ go_library(
"//pkg/roachpb",
"//pkg/storage",
"//pkg/storage/enginepb",
"//pkg/util/admission/admissionpb",
"//pkg/util/buildutil",
"//pkg/util/encoding",
"//pkg/util/iterutil",
"//pkg/util/log",
"//pkg/util/protoutil",
"@com_github_cockroachdb_errors//:errors",
"@io_etcd_go_raft_v3//raftpb",
Expand Down
13 changes: 11 additions & 2 deletions pkg/kv/kvserver/raftlog/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ package raftlog

import (
"fmt"
"math/rand"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
)

Expand Down Expand Up @@ -129,9 +131,9 @@ const (
RaftCommandPrefixLen = 1 + RaftCommandIDLen
)

// EncodeRaftCommand encodes a marshaled kvserverpb.RaftCommand using
// EncodeCommandBytes encodes a marshaled kvserverpb.RaftCommand using
// the given encoding (one of EntryEncoding{Standard,Sideloaded}With{,out}AC).
func EncodeRaftCommand(enc EntryEncoding, commandID kvserverbase.CmdIDKey, command []byte) []byte {
func EncodeCommandBytes(enc EntryEncoding, commandID kvserverbase.CmdIDKey, command []byte) []byte {
b := make([]byte, RaftCommandPrefixLen+len(command))
EncodeRaftCommandPrefix(b[:RaftCommandPrefixLen], enc, commandID)
copy(b[RaftCommandPrefixLen:], command)
Expand Down Expand Up @@ -170,3 +172,10 @@ func DecodeRaftAdmissionMeta(data []byte) (kvflowcontrolpb.RaftAdmissionMeta, er
}
return raftAdmissionMeta, nil
}

// MakeCmdIDKey populates a random CmdIDKey.
func MakeCmdIDKey() kvserverbase.CmdIDKey {
idKeyBuf := make([]byte, 0, RaftCommandIDLen)
idKeyBuf = encoding.EncodeUint64Ascending(idKeyBuf, uint64(rand.Int63()))
return kvserverbase.CmdIDKey(idKeyBuf)
}
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/raftlog/entry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ func TestNewEntry(t *testing.T) {
"empty entry": {data: nil, expectEmpty: true},
// Proposed by CRDB on unquiescence.
"empty payload": {
data: EncodeRaftCommand(EntryEncodingStandardWithoutAC, "00000000", nil),
data: EncodeCommandBytes(EntryEncodingStandardWithoutAC, "00000000", nil),
expectEmpty: true,
},
"invalid": {
data: EncodeRaftCommand(EntryEncodingStandardWithAC, "00000000", []byte("not a protobuf")),
data: EncodeCommandBytes(EntryEncodingStandardWithAC, "00000000", []byte("not a protobuf")),
expectErr: true,
},
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/raftlog/iter_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func mkBenchEnt(b *testing.B) (_ raftpb.Entry, metaB []byte) {
cmd := mkRaftCommand(100, 1800, 2000)
cmdB, err := protoutil.Marshal(cmd)
require.NoError(b, err)
data := EncodeRaftCommand(EntryEncodingStandardWithoutAC, "cmd12345", cmdB)
data := EncodeCommandBytes(EntryEncodingStandardWithoutAC, "cmd12345", cmdB)

ent := raftpb.Entry{
Term: 1,
Expand Down
Loading