From 9fc9a65e6e0c92c7f5349b352462ca7d0503a46c Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Fri, 2 Jun 2023 23:16:42 +0200 Subject: [PATCH] kvserver: add TestCreateManyUnappliedProbes This is the test used for https://github.com/cockroachdb/cockroach/pull/102953. Epic: none Release note: none --- pkg/kv/kvserver/BUILD.bazel | 2 + .../kvserver/client_manual_proposal_test.go | 264 ++++++++++++++++++ 2 files changed, 266 insertions(+) create mode 100644 pkg/kv/kvserver/client_manual_proposal_test.go diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index b60bf00f524f..9c75361ea619 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -247,6 +247,7 @@ go_test( "client_atomic_membership_change_test.go", "client_invalidsplit_test.go", "client_lease_test.go", + "client_manual_proposal_test.go", "client_merge_test.go", "client_metrics_test.go", "client_migration_test.go", @@ -406,6 +407,7 @@ go_test( "//pkg/kv/kvserver/protectedts/ptpb", "//pkg/kv/kvserver/protectedts/ptstorage", "//pkg/kv/kvserver/protectedts/ptutil", + "//pkg/kv/kvserver/raftentry", "//pkg/kv/kvserver/raftlog", "//pkg/kv/kvserver/raftutil", "//pkg/kv/kvserver/rditer", diff --git a/pkg/kv/kvserver/client_manual_proposal_test.go b/pkg/kv/kvserver/client_manual_proposal_test.go new file mode 100644 index 000000000000..1f96839c0395 --- /dev/null +++ b/pkg/kv/kvserver/client_manual_proposal_test.go @@ -0,0 +1,264 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvserver_test + +import ( + "context" + "math" + "os" + "path/filepath" + "sync" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftentry" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/uncertainty" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/storage/enginepb" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/stretchr/testify/require" + "go.etcd.io/raft/v3" + "go.etcd.io/raft/v3/raftpb" +) + +// TestCreateManyUnappliedProbes is a (by default skipped) test that writes +// a very large unapplied raft log consisting entirely of probes. +// +// It's a toy example for #75729 but has been useful to validate improvements +// in the raft pipeline, so it is checked in to allow for future re-use for +// similar purposes. +// +// See also: https://github.com/cockroachdb/cockroach/issues/105177 +func TestCreateManyUnappliedProbes(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + // NB: the actual logic takes only milliseconds, but we need to start/stop a + // test cluster twice and that is slow. + skip.UnderShort(t, "takes ~4s") + + ctx := context.Background() + + // NB: To set up a "real" cockroach-data dir with a large raft log in + // system.settings, and then restart the node to watch it apply the long raft + // log, you can use the below: + // + // p := os.ExpandEnv("$HOME/go/src/github.com/cockroachdb/cockroach/cockroach-data") + // const entsPerBatch = 100000 + // const batches = 1000 + // + // ./dev build && rm -rf cockroach-data && timeout 10 ./cockroach start-single-node --logtostderr --insecure ; \ + // go test ./pkg/kv/kvserver/ -v --run TestCreateManyUnappliedProbes && sleep 3 && \ + // (./cockroach start-single-node --logtostderr=INFO --insecure | grep -F r10/) + // + // Then wait and watch the `raft.commandsapplied` metric to see r10 apply the entries. + p := filepath.Join(t.TempDir(), "cockroach-data") + const entsPerBatch = 10 + const batches = 3 + rangeID := roachpb.RangeID(10) // system.settings + + if _, err := os.Stat(p); err != nil { + args := base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{StoreSpecs: []base.StoreSpec{ + {Path: p}, + }}, + ReplicationMode: base.ReplicationManual, + } + tc := testcluster.StartTestCluster(t, 1, args) + // Reload system.settings' rangeID just in case it changes. + require.NoError(t, tc.ServerConn(0).QueryRow(`SELECT + range_id +FROM + [SHOW RANGES FROM TABLE system.settings] +ORDER BY + range_id ASC +LIMIT + 1;`).Scan(&rangeID)) + tc.Stopper().Stop(ctx) + + defer func() { + if t.Failed() { + return + } + tc := testcluster.StartTestCluster(t, 1, args) + defer tc.Stopper().Stop(ctx) + require.NoError(t, tc.ServerConn(0).QueryRow(`SELECT count(1) FROM system.settings`).Err()) + t.Log("read system.settings") + }() + + } + + st := cluster.MakeTestingClusterSettings() + eng, err := storage.Open(ctx, storage.Filesystem(p), st) + require.NoError(t, err) + defer eng.Close() + + // Determine LastIndex, LastTerm, and next MaxLeaseIndex by scanning + // existing log. + it := raftlog.NewIterator(rangeID, eng, raftlog.IterOptions{}) + defer it.Close() + rsl := logstore.NewStateLoader(rangeID) + lastIndex, err := rsl.LoadLastIndex(ctx, eng) + require.NoError(t, err) + t.Logf("loaded LastIndex: %d", lastIndex) + ok, err := it.SeekGE(lastIndex) + require.NoError(t, err) + require.True(t, ok) + + var lai kvpb.LeaseAppliedIndex + var lastTerm uint64 + require.NoError(t, raftlog.Visit(eng, rangeID, lastIndex, math.MaxUint64, func(entry raftpb.Entry) error { + ent, err := raftlog.NewEntry(it.Entry()) + require.NoError(t, err) + if lai < ent.Cmd.MaxLeaseIndex { + lai = ent.Cmd.MaxLeaseIndex + } + lastTerm = ent.Term + return nil + })) + + sl := stateloader.Make(rangeID) + lease, err := sl.LoadLease(ctx, eng) + require.NoError(t, err) + + for batchIdx := 0; batchIdx < batches; batchIdx++ { + t.Logf("batch %d", batchIdx+1) + b := storage.NewOpLoggerBatch(eng.NewBatch()) + defer b.Batch.Close() + + var ents []raftpb.Entry + for i := 0; i < entsPerBatch; i++ { + lai++ + c := &kvpb.ProbeRequest{} + resp := &kvpb.ProbeResponse{} + c.Key = keys.LocalMax + + cmd, ok := batcheval.LookupCommand(c.Method()) + require.True(t, ok) + // NB: this should really operate on a BatchRequest. We need to librarize + // evaluateBatch or its various callers. + + evalCtx := &batcheval.MockEvalCtx{} + args := batcheval.CommandArgs{ + EvalCtx: evalCtx.EvalContext(), + Header: kvpb.Header{}, + Args: c, + Stats: &enginepb.MVCCStats{}, + Uncertainty: uncertainty.Interval{}, + } + res, err := cmd.EvalRW(ctx, b, args, resp) + require.NoError(t, err) + // NB: ideally evaluateProposal could be used directly here; + // we just cobble a `result.Result` together manually in this test. + res.WriteBatch = &kvserverpb.WriteBatch{Data: b.Repr()} + res.LogicalOpLog = &kvserverpb.LogicalOpLog{Ops: b.LogicalOps()} + + // End of evaluation. Start of "proposing". This too is all cobbled + // together here and ideally we'd be able to call a method that is + // also invoked by the production code paths. For example, there is + // no real lease handling here so you can't use this code to propose + // a {Request,Transfer}LeaseRequest; we'd need [^1]. + // + // [^1]: https://github.com/cockroachdb/cockroach/blob/9a7b735b1282bbb3fb7472cc26a47d516a446958/pkg/kv/kvserver/replica_raft.go#L192-L219 + raftCmd := kvserverpb.RaftCommand{ + ProposerLeaseSequence: lease.Sequence, + MaxLeaseIndex: lai, + // Rest was determined by evaluation. + ReplicatedEvalResult: res.Replicated, + WriteBatch: res.WriteBatch, + LogicalOpLog: res.LogicalOpLog, + } + + idKey := raftlog.MakeCmdIDKey() + payload, err := raftlog.EncodeCommand(ctx, &raftCmd, idKey, nil) + require.NoError(t, err) + ents = append(ents, raftpb.Entry{ + Term: lastTerm, + Index: uint64(lastIndex) + uint64(len(ents)) + 1, + Type: raftpb.EntryNormal, + Data: payload, + }) + } + + stats := &logstore.AppendStats{} + + msgApp := raftpb.Message{ + Type: raftpb.MsgStorageAppend, + To: raft.LocalAppendThread, + Term: lastTerm, + LogTerm: lastTerm, + Index: uint64(lastIndex), + Entries: ents, + Commit: uint64(lastIndex) + uint64(len(ents)), + Responses: []raftpb.Message{{}}, // need >0 responses so StoreEntries will sync + } + + fakeMeta := metric.Metadata{ + Name: "fake.meta", + } + swl := logstore.NewSyncWaiterLoop() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + swl.Start(ctx, stopper) + ls := logstore.LogStore{ + RangeID: rangeID, + Engine: eng, + Sideload: nil, + StateLoader: rsl, + SyncWaiter: swl, + EntryCache: raftentry.NewCache(1024), + Settings: st, + Metrics: logstore.Metrics{ + RaftLogCommitLatency: metric.NewHistogram(metric.HistogramOptions{ + Mode: metric.HistogramModePrometheus, + Metadata: fakeMeta, + Duration: time.Millisecond, + Buckets: metric.NetworkLatencyBuckets, + }), + }, + } + + wg := &sync.WaitGroup{} + wg.Add(1) + _, err = ls.StoreEntries(ctx, logstore.RaftState{ + LastIndex: lastIndex, + LastTerm: kvpb.RaftTerm(lastTerm), + }, logstore.MakeMsgStorageAppend(msgApp), (*wgSyncCallback)(wg), stats) + require.NoError(t, err) + wg.Wait() + + lastIndex = kvpb.RaftIndex(ents[len(ents)-1].Index) + } + + t.Logf("LastIndex is now: %d", lastIndex) +} + +type wgSyncCallback sync.WaitGroup + +func (w *wgSyncCallback) OnLogSync( + ctx context.Context, messages []raftpb.Message, stats storage.BatchCommitStats, +) { + (*sync.WaitGroup)(w).Done() +}