diff --git a/pkg/storage/client_raft_test.go b/pkg/storage/client_raft_test.go index 688bab6d029b..c15156e3231a 100644 --- a/pkg/storage/client_raft_test.go +++ b/pkg/storage/client_raft_test.go @@ -630,10 +630,9 @@ func TestRaftLogSizeAfterTruncation(t *testing.T) { startWithSingleRange: true, } defer mtc.Stop() - mtc.Start(t, 3) + mtc.Start(t, 1) const rangeID = 1 - mtc.replicateRange(rangeID, 1, 2) repl, err := mtc.stores[0].GetReplica(rangeID) if err != nil { @@ -647,8 +646,6 @@ func TestRaftLogSizeAfterTruncation(t *testing.T) { t.Fatal(err) } - mtc.waitForValues(key, []int64{5, 5, 5}) - index, err := repl.GetLastIndex() if err != nil { t.Fatal(err) @@ -656,28 +653,24 @@ func TestRaftLogSizeAfterTruncation(t *testing.T) { // Verifies the recomputed log size against what we track in `r.mu.raftLogSize`. assertCorrectRaftLogSize := func() error { - for _, s := range mtc.stores { - repl, err := s.GetReplica(rangeID) - if err != nil { - t.Fatal(err) - } - - // Recompute under raft lock so that the log doesn't change while we - // compute its size. - repl.RaftLock() - realSize, err := storage.ComputeRaftLogSize( - context.Background(), repl.RangeID, repl.Engine(), repl.SideloadedRaftMuLocked(), - ) - size := repl.GetRaftLogSize() - repl.RaftUnlock() + // Recompute under raft lock so that the log doesn't change while we + // compute its size. + repl.RaftLock() + realSize, err := storage.ComputeRaftLogSize( + context.Background(), repl.RangeID, repl.Engine(), repl.SideloadedRaftMuLocked(), + ) + size, _ := repl.GetRaftLogSize() + repl.RaftUnlock() - if err != nil { - t.Fatal(err) - } + if err != nil { + t.Fatal(err) + } - if size != realSize { - return fmt.Errorf("%s: raft log claims size %d, but is in fact %d", repl, size, realSize) - } + // If the size isn't trusted, it won't have to match (and in fact + // likely won't). In this test, this is because the upreplication + // elides old Raft log entries in the snapshot it uses. + if size != realSize { + return fmt.Errorf("%s: raft log claims size %d, but is in fact %d", repl, size, realSize) } return nil } @@ -690,6 +683,9 @@ func TestRaftLogSizeAfterTruncation(t *testing.T) { t.Fatal(err) } + // Note that if there were multiple nodes, the Raft log sizes would not + // be correct for the followers as they would have received a shorter + // Raft log than the leader. assert.NoError(t, assertCorrectRaftLogSize()) } @@ -1304,7 +1300,7 @@ func TestLogGrowthWhenRefreshingPendingCommands(t *testing.T) { } // Determine the current raft log size. - initLogSize := leaderRepl.GetRaftLogSize() + initLogSize, _ := leaderRepl.GetRaftLogSize() // While a majority nodes are down, write some data. putRes := make(chan *roachpb.Error) @@ -1335,7 +1331,7 @@ func TestLogGrowthWhenRefreshingPendingCommands(t *testing.T) { // etc.). The important thing here is that the log doesn't grow // forever. logSizeLimit := int64(2 * sc.RaftMaxUncommittedEntriesSize) - curlogSize := leaderRepl.GetRaftLogSize() + curlogSize, _ := leaderRepl.GetRaftLogSize() logSize := curlogSize - initLogSize logSizeStr := humanizeutil.IBytes(logSize) // Note that logSize could be negative if something got truncated. diff --git a/pkg/storage/helpers_test.go b/pkg/storage/helpers_test.go index cd57b763c6a6..6eaf779025c4 100644 --- a/pkg/storage/helpers_test.go +++ b/pkg/storage/helpers_test.go @@ -322,12 +322,12 @@ func (r *Replica) ShouldBackpressureWrites() bool { return r.shouldBackpressureWrites() } -// GetRaftLogSize returns the approximate raft log size. See r.mu.raftLogSize -// for details. -func (r *Replica) GetRaftLogSize() int64 { +// GetRaftLogSize returns the approximate raft log size and whether it is +// trustworthy.. See r.mu.raftLogSize for details. +func (r *Replica) GetRaftLogSize() (int64, bool) { r.mu.RLock() defer r.mu.RUnlock() - return r.mu.raftLogSize + return r.mu.raftLogSize, r.mu.raftLogSizeTrusted } // GetCachedLastTerm returns the cached last term value. May return diff --git a/pkg/storage/raft_log_queue_test.go b/pkg/storage/raft_log_queue_test.go index ec2a6fe1bff4..d756d58f9470 100644 --- a/pkg/storage/raft_log_queue_test.go +++ b/pkg/storage/raft_log_queue_test.go @@ -887,20 +887,32 @@ func TestTruncateLogRecompute(t *testing.T) { key := roachpb.Key("a") repl := tc.store.LookupReplica(keys.MustAddr(key)) - var v roachpb.Value - v.SetBytes(bytes.Repeat([]byte("x"), RaftLogQueueStaleSize*5)) - put := roachpb.NewPut(key, v) - var ba roachpb.BatchRequest - ba.Add(put) - ba.RangeID = repl.RangeID - - if _, pErr := tc.store.Send(ctx, ba); pErr != nil { - t.Fatal(pErr) + trusted := func() bool { + repl.mu.Lock() + defer repl.mu.Unlock() + return repl.mu.raftLogSizeTrusted } + put := func() { + var v roachpb.Value + v.SetBytes(bytes.Repeat([]byte("x"), RaftLogQueueStaleSize*5)) + put := roachpb.NewPut(key, v) + var ba roachpb.BatchRequest + ba.Add(put) + ba.RangeID = repl.RangeID + + if _, pErr := tc.store.Send(ctx, ba); pErr != nil { + t.Fatal(pErr) + } + } + + put() + decision, err := newTruncateDecision(ctx, repl) assert.NoError(t, err) assert.True(t, decision.ShouldTruncate()) + // Should never trust initially, until recomputed at least once. + assert.False(t, trusted()) repl.mu.Lock() repl.mu.raftLogSizeTrusted = false @@ -913,5 +925,10 @@ func TestTruncateLogRecompute(t *testing.T) { // grown over threshold again; we compute instead that its size is correct). tc.store.SetRaftLogQueueActive(true) tc.store.MustForceRaftLogScanAndProcess() - verifyLogSizeInSync(t, repl) + + for i := 0; i < 2; i++ { + verifyLogSizeInSync(t, repl) + assert.True(t, trusted()) + put() // make sure we remain trusted and in sync + } } diff --git a/pkg/storage/replica_command.go b/pkg/storage/replica_command.go index ebee6802ae1e..497e216c08ce 100644 --- a/pkg/storage/replica_command.go +++ b/pkg/storage/replica_command.go @@ -888,12 +888,6 @@ func (r *Replica) sendSnapshot( return &benignError{errors.New("raft status not initialized")} } - // TODO(tbg): send snapshots without the past raft log. This means replacing - // the state's truncated state with one whose index and term equal that of - // the RaftAppliedIndex of the snapshot. It looks like the code sending out - // the actual entries will do the right thing from then on (see anchor - // below). - _ = (*kvBatchSnapshotStrategy)(nil).Send usesReplicatedTruncatedState, err := engine.MVCCGetProto( ctx, snap.EngineSnap, keys.RaftTruncatedStateLegacyKey(r.RangeID), hlc.Timestamp{}, nil, engine.MVCCGetOptions{}, ) @@ -901,6 +895,22 @@ func (r *Replica) sendSnapshot( return errors.Wrap(err, "loading legacy truncated state") } + if !usesReplicatedTruncatedState && snap.State.TruncatedState.Index < snap.State.RaftAppliedIndex { + // If we're not using a legacy (replicated) truncated state, we avoid + // sending the (past) Raft log in the snapshot in the first place and + // send only those entries that are actually useful to the follower. + // This is done by changing the truncated state, which we're allowed + // to do since it is not a replicated key (and thus not subject to + // matching across replicas). The actual sending happens here: + _ = (*kvBatchSnapshotStrategy)(nil).Send + // and results in no log entries being sent at all. Note that + // Metadata.Index is really the applied index of the replica. + snap.State.TruncatedState = &roachpb.RaftTruncatedState{ + Index: snap.RaftSnap.Metadata.Index, + Term: snap.RaftSnap.Metadata.Term, + } + } + req := SnapshotRequest_Header{ State: snap.State, // Tell the recipient whether it needs to synthesize the new diff --git a/pkg/storage/replica_raft.go b/pkg/storage/replica_raft.go index 2f1577eb6f8c..21fc60ed6f0f 100644 --- a/pkg/storage/replica_raft.go +++ b/pkg/storage/replica_raft.go @@ -2333,21 +2333,20 @@ func (r *Replica) applyRaftCommand( return storagepb.ReplicatedEvalResult{}, err } if !apply { - // TODO(tbg): As written, there is low confidence that nil'ing out - // the truncated state has the desired effect as our caller actually - // applies the side effects. It may have taken a copy and won't - // observe what we did. - // - // It's very difficult to test this functionality because of how - // difficult it is to test (*Replica).processRaftCommand (and this - // method). Instead of adding yet another terrible that that bends - // reality to its will in some clunky way, assert that we're never - // hitting this branch, which is supposed to be true until we stop - // sending the raft log in snapshots (#34287). - // Morally we would want to drop the command in checkForcedErrLocked, - // but that may be difficult to achieve. - log.Fatal(ctx, log.Safe(fmt.Sprintf("TruncatedState regressed:\nold: %+v\nnew: %+v", oldTruncatedState, rResult.State.TruncatedState))) + // The truncated state was discarded, so make sure we don't apply + // it to our in-memory state. rResult.State.TruncatedState = nil + rResult.RaftLogDelta = 0 + // We received a truncation that doesn't apply to us, so we know that + // there's a leaseholder out there with a log that has earlier entries + // than ours. That leader also guided our log size computations by + // giving us RaftLogDeltas for past truncations, and this was likely + // off. Mark our Raft log size is not trustworthy so that, assuming + // we step up as leader at some point in the future, we recompute + // our numbers. + r.mu.Lock() + r.mu.raftLogSizeTrusted = false + r.mu.Unlock() } } @@ -2392,6 +2391,22 @@ func (r *Replica) applyRaftCommand( return rResult, nil } +// handleTruncatedStateBelowRaft is called when a Raft command updates the truncated +// state. This isn't 100% trivial for two reasons: +// - in 19.1 we're making the TruncatedState key unreplicated, so there's a migration +// - we're making use of the above by not sending the Raft log in snapshots (the truncated +// state effectively determines the first index of the log, which requires it to be unreplicated). +// Updates to the HardState are sent out by a leaseholder truncating the log based on its local +// knowledge. For example, the leader might have a log 10..100 and truncates to 50, and will send +// out a TruncatedState with Index 50 to that effect. However, some replicas may not even have log +// entries that old, and must make sure to ignore this update to the truncated state, as it would +// otherwise clobber their "newer" truncated state. +// +// The returned boolean tells the caller whether to apply the truncated state's +// side effects, which means replacing the in-memory TruncatedState and applying +// the associated RaftLogDelta. It is usually expected to be true, but may not +// be for the first truncation after on a replica that recently received a +// snapshot. func handleTruncatedStateBelowRaft( ctx context.Context, oldTruncatedState, newTruncatedState *roachpb.RaftTruncatedState, diff --git a/pkg/storage/replica_raftstorage.go b/pkg/storage/replica_raftstorage.go index 0791a4201f97..549edb5ed02c 100644 --- a/pkg/storage/replica_raftstorage.go +++ b/pkg/storage/replica_raftstorage.go @@ -999,6 +999,9 @@ func (r *Replica) applySnapshot( // by r.leasePostApply, but we called those above, so now it's safe to // wholesale replace r.mu.state. r.mu.state = s + // Snapshots typically have fewer log entries than the leaseholder. The next + // time we hold the lease, recompute the log size before making decisions. + r.mu.raftLogSizeTrusted = false r.assertStateLocked(ctx, r.store.Engine()) r.mu.Unlock()