storage: don't send historical Raft log with snapshots
Assume a leaseholder wants to send a (Raft or preemptive) snapshot to a follower.
Say the leaseholder's log ranges from index 100 to index 200, and assume that
the size (in bytes) of this log is 200mb. All of the log is successfully
committed and applied, and is thus reflected in the snapshot data.

Prior to this change, we would still send the 200mb of log entries along
with the snapshot, even though the snapshot itself already reflected
them.

After this change, we won't send any log entries along with the
snapshot, as sanity suggests.
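
To make the bookkeeping concrete, here is a minimal sketch (illustrative only; the helper and its names are hypothetical, not code from this commit) of which entries a follower still needs once it has applied a snapshot:

    package main

    import "fmt"

    // neededEntries returns the inclusive range of log entry indexes a
    // follower still needs after applying a snapshot that reflects all
    // entries up to and including appliedIndex, given a sender log spanning
    // [first, last]. Entries at or below appliedIndex are already baked
    // into the snapshot data and would be wasted bytes on the wire.
    func neededEntries(appliedIndex, first, last uint64) (lo, hi uint64, ok bool) {
        if appliedIndex >= last {
            return 0, 0, false // snapshot reflects the whole log: send nothing
        }
        lo = appliedIndex + 1
        if lo < first {
            lo = first // entries below 'first' were already truncated away
        }
        return lo, last, true
    }

    func main() {
        // The example above: log [100, 200], all of it applied.
        _, _, ok := neededEntries(200, 100, 200)
        fmt.Println(ok) // false: no entries accompany the snapshot
    }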

We were unable to make this change until recently because the Raft
truncated state (which dictates the first log index) was replicated and
consistency-checked; this was changed in #34660. The migration
introduced there makes it straightforward to omit a prefix of the
log in snapshots, as done in this commit.

Somewhere down the road (19.2?) we should localize all the log
truncation decisions and simplify all this further. I suspect
that in doing so we can avoid tracking the size of the Raft log
in the first place; all we really need for this is some mechanism
that makes sure that an "idle" replica truncates its logs. With
unreplicated truncation, this becomes cheap enough to "just do".

Release note (bug fix): Remove historical log entries from Raft snapshots.
These log entries could lead to failed snapshots with a message such as:

    snapshot failed: aborting snapshot because raft log is too large
    (25911051 bytes after processing 7 of 37 entries)
tbg committed Mar 13, 2019
1 parent e995453 commit 5d1124f
Showing 6 changed files with 81 additions and 60 deletions.
48 changes: 22 additions & 26 deletions pkg/storage/client_raft_test.go
@@ -630,10 +630,9 @@ func TestRaftLogSizeAfterTruncation(t *testing.T) {
 		startWithSingleRange: true,
 	}
 	defer mtc.Stop()
-	mtc.Start(t, 3)
+	mtc.Start(t, 1)
 
 	const rangeID = 1
-	mtc.replicateRange(rangeID, 1, 2)
 
 	repl, err := mtc.stores[0].GetReplica(rangeID)
 	if err != nil {
@@ -647,37 +646,31 @@ func TestRaftLogSizeAfterTruncation(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	mtc.waitForValues(key, []int64{5, 5, 5})
-
 	index, err := repl.GetLastIndex()
 	if err != nil {
 		t.Fatal(err)
 	}
 
 	// Verifies the recomputed log size against what we track in `r.mu.raftLogSize`.
 	assertCorrectRaftLogSize := func() error {
-		for _, s := range mtc.stores {
-			repl, err := s.GetReplica(rangeID)
-			if err != nil {
-				t.Fatal(err)
-			}
-
-			// Recompute under raft lock so that the log doesn't change while we
-			// compute its size.
-			repl.RaftLock()
-			realSize, err := storage.ComputeRaftLogSize(
-				context.Background(), repl.RangeID, repl.Engine(), repl.SideloadedRaftMuLocked(),
-			)
-			size := repl.GetRaftLogSize()
-			repl.RaftUnlock()
+		// Recompute under raft lock so that the log doesn't change while we
+		// compute its size.
+		repl.RaftLock()
+		realSize, err := storage.ComputeRaftLogSize(
+			context.Background(), repl.RangeID, repl.Engine(), repl.SideloadedRaftMuLocked(),
+		)
+		size, _ := repl.GetRaftLogSize()
+		repl.RaftUnlock()
 
-			if err != nil {
-				t.Fatal(err)
-			}
+		if err != nil {
+			t.Fatal(err)
+		}
 
-			if size != realSize {
-				return fmt.Errorf("%s: raft log claims size %d, but is in fact %d", repl, size, realSize)
-			}
-		}
+		// If the size isn't trusted, it won't have to match (and in fact
+		// likely won't). In this test, this is because the upreplication
+		// elides old Raft log entries in the snapshot it uses.
+		if size != realSize {
+			return fmt.Errorf("%s: raft log claims size %d, but is in fact %d", repl, size, realSize)
+		}
 		return nil
 	}
@@ -690,6 +683,9 @@ func TestRaftLogSizeAfterTruncation(t *testing.T) {
 		t.Fatal(err)
 	}
 
+	// Note that if there were multiple nodes, the Raft log sizes would not
+	// be correct for the followers as they would have received a shorter
+	// Raft log than the leader.
 	assert.NoError(t, assertCorrectRaftLogSize())
 }

@@ -1304,7 +1300,7 @@ func TestLogGrowthWhenRefreshingPendingCommands(t *testing.T) {
 	}
 
 	// Determine the current raft log size.
-	initLogSize := leaderRepl.GetRaftLogSize()
+	initLogSize, _ := leaderRepl.GetRaftLogSize()
 
 	// While a majority nodes are down, write some data.
 	putRes := make(chan *roachpb.Error)
@@ -1335,7 +1331,7 @@ func TestLogGrowthWhenRefreshingPendingCommands(t *testing.T) {
 	// etc.). The important thing here is that the log doesn't grow
 	// forever.
 	logSizeLimit := int64(2 * sc.RaftMaxUncommittedEntriesSize)
-	curlogSize := leaderRepl.GetRaftLogSize()
+	curlogSize, _ := leaderRepl.GetRaftLogSize()
 	logSize := curlogSize - initLogSize
 	logSizeStr := humanizeutil.IBytes(logSize)
 	// Note that logSize could be negative if something got truncated.
8 changes: 4 additions & 4 deletions pkg/storage/helpers_test.go
@@ -322,12 +322,12 @@ func (r *Replica) ShouldBackpressureWrites() bool {
 	return r.shouldBackpressureWrites()
 }
 
-// GetRaftLogSize returns the approximate raft log size. See r.mu.raftLogSize
-// for details.
-func (r *Replica) GetRaftLogSize() int64 {
+// GetRaftLogSize returns the approximate raft log size and whether it is
+// trustworthy. See r.mu.raftLogSize for details.
+func (r *Replica) GetRaftLogSize() (int64, bool) {
 	r.mu.RLock()
 	defer r.mu.RUnlock()
-	return r.mu.raftLogSize
+	return r.mu.raftLogSize, r.mu.raftLogSizeTrusted
 }
 
 // GetCachedLastTerm returns the cached last term value. May return
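
For callers, the new second return value means the tracked size can no longer be used blindly. A minimal sketch of the intended pattern (the caller and the recompute helper are hypothetical; the tests above perform the recomputation via storage.ComputeRaftLogSize):

    // raftLogSizeForDecision is a hypothetical caller illustrating the
    // contract: only act on the tracked size when it is trustworthy,
    // otherwise recompute it from the engine first.
    func raftLogSizeForDecision(repl *Replica) int64 {
        size, trusted := repl.GetRaftLogSize()
        if !trusted {
            // The in-memory counter may be off, e.g. because this replica
            // was seeded by a snapshot that elided historical log entries.
            size = recomputeRaftLogSize(repl) // hypothetical helper
        }
        return size
    }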
37 changes: 27 additions & 10 deletions pkg/storage/raft_log_queue_test.go
@@ -887,20 +887,32 @@ func TestTruncateLogRecompute(t *testing.T) {
 	key := roachpb.Key("a")
 	repl := tc.store.LookupReplica(keys.MustAddr(key))
 
-	var v roachpb.Value
-	v.SetBytes(bytes.Repeat([]byte("x"), RaftLogQueueStaleSize*5))
-	put := roachpb.NewPut(key, v)
-	var ba roachpb.BatchRequest
-	ba.Add(put)
-	ba.RangeID = repl.RangeID
-
-	if _, pErr := tc.store.Send(ctx, ba); pErr != nil {
-		t.Fatal(pErr)
+	trusted := func() bool {
+		repl.mu.Lock()
+		defer repl.mu.Unlock()
+		return repl.mu.raftLogSizeTrusted
 	}
 
+	put := func() {
+		var v roachpb.Value
+		v.SetBytes(bytes.Repeat([]byte("x"), RaftLogQueueStaleSize*5))
+		put := roachpb.NewPut(key, v)
+		var ba roachpb.BatchRequest
+		ba.Add(put)
+		ba.RangeID = repl.RangeID
+
+		if _, pErr := tc.store.Send(ctx, ba); pErr != nil {
+			t.Fatal(pErr)
+		}
+	}
+
+	put()
+
 	decision, err := newTruncateDecision(ctx, repl)
 	assert.NoError(t, err)
 	assert.True(t, decision.ShouldTruncate())
+	// Should never trust initially, until recomputed at least once.
+	assert.False(t, trusted())
 
 	repl.mu.Lock()
 	repl.mu.raftLogSizeTrusted = false
@@ -913,5 +925,10 @@ func TestTruncateLogRecompute(t *testing.T) {
 	// grown over threshold again; we compute instead that its size is correct).
 	tc.store.SetRaftLogQueueActive(true)
 	tc.store.MustForceRaftLogScanAndProcess()
-	verifyLogSizeInSync(t, repl)
+
+	for i := 0; i < 2; i++ {
+		verifyLogSizeInSync(t, repl)
+		assert.True(t, trusted())
+		put() // make sure we remain trusted and in sync
+	}
 }
20 changes: 14 additions & 6 deletions pkg/storage/replica_command.go
@@ -888,19 +888,27 @@ func (r *Replica) sendSnapshot(
 		return &benignError{errors.New("raft status not initialized")}
 	}
 
-	// TODO(tbg): send snapshots without the past raft log. This means replacing
-	// the state's truncated state with one whose index and term equal that of
-	// the RaftAppliedIndex of the snapshot. It looks like the code sending out
-	// the actual entries will do the right thing from then on (see anchor
-	// below).
-	_ = (*kvBatchSnapshotStrategy)(nil).Send
 	usesReplicatedTruncatedState, err := engine.MVCCGetProto(
 		ctx, snap.EngineSnap, keys.RaftTruncatedStateLegacyKey(r.RangeID), hlc.Timestamp{}, nil, engine.MVCCGetOptions{},
 	)
 	if err != nil {
 		return errors.Wrap(err, "loading legacy truncated state")
 	}
 
+	if !usesReplicatedTruncatedState && snap.State.TruncatedState.Index < snap.State.RaftAppliedIndex {
+		// If we're not using a legacy (replicated) truncated state, we can
+		// avoid sending the (past) Raft log in the snapshot in the first place
+		// and send only those entries that are actually useful to the follower.
+		//
+		// TODO(tbg): sending any entries at all doesn't seem worth it as now
+		// again we're relying on some mechanisms (quota pool, Raft uncommitted
+		// size limit) to make sure this doesn't grow without bounds.
+		snap.State.TruncatedState = &roachpb.RaftTruncatedState{
+			Index: snap.RaftSnap.Metadata.Index,
+			Term:  snap.RaftSnap.Metadata.Term,
+		}
+	}
+
 	req := SnapshotRequest_Header{
 		State: snap.State,
 		// Tell the recipient whether it needs to synthesize the new
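
The reason setting the truncated state this way empties the snapshot's log payload: the entries that accompany a snapshot cover the interval (TruncatedState.Index, RaftAppliedIndex], and equating the two endpoints makes that interval empty. A tiny sketch of the arithmetic (hypothetical helper, not the actual send path):

    // entriesAccompanyingSnapshot lists the entry indexes that would travel
    // with a snapshot, i.e. the interval (truncatedIndex, appliedIndex].
    func entriesAccompanyingSnapshot(truncatedIndex, appliedIndex uint64) []uint64 {
        var idxs []uint64
        for idx := truncatedIndex + 1; idx <= appliedIndex; idx++ {
            idxs = append(idxs, idx)
        }
        return idxs // empty once truncatedIndex == appliedIndex
    }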
25 changes: 11 additions & 14 deletions pkg/storage/replica_raft.go
@@ -2333,21 +2333,18 @@ func (r *Replica) applyRaftCommand(
 			return storagepb.ReplicatedEvalResult{}, err
 		}
 		if !apply {
-			// TODO(tbg): As written, there is low confidence that nil'ing out
-			// the truncated state has the desired effect as our caller actually
-			// applies the side effects. It may have taken a copy and won't
-			// observe what we did.
-			//
-			// It's very difficult to test this functionality because of how
-			// difficult it is to test (*Replica).processRaftCommand (and this
-			// method). Instead of adding yet another terrible test that bends
-			// reality to its will in some clunky way, assert that we're never
-			// hitting this branch, which is supposed to be true until we stop
-			// sending the raft log in snapshots (#34287). Morally we would
-			// want to drop the command in checkForcedErrLocked, but that may
-			// be difficult to achieve.
-			log.Fatal(ctx, log.Safe(fmt.Sprintf("TruncatedState regressed:\nold: %+v\nnew: %+v", oldTruncatedState, rResult.State.TruncatedState)))
 			rResult.State.TruncatedState = nil
 			rResult.RaftLogDelta = 0
+			// We received a truncation that doesn't apply to us, so we know that
+			// there's a leaseholder out there with a log that has earlier entries
+			// than ours. That leader also guided our log size computations by
+			// giving us RaftLogDeltas for past truncations, and this was likely
+			// off. Mark our Raft log size as not trustworthy so that, assuming
+			// we step up as leader at some point in the future, we recompute
+			// our numbers.
+			r.mu.Lock()
+			r.mu.raftLogSizeTrusted = false
+			r.mu.Unlock()
 		}
 	}
 
3 changes: 3 additions & 0 deletions pkg/storage/replica_raftstorage.go
@@ -999,6 +999,9 @@ func (r *Replica) applySnapshot(
 	// by r.leasePostApply, but we called those above, so now it's safe to
 	// wholesale replace r.mu.state.
 	r.mu.state = s
+	// Snapshots typically have fewer log entries than the leaseholder. The next
+	// time we hold the lease, recompute the log size before making decisions.
+	r.mu.raftLogSizeTrusted = false
 	r.assertStateLocked(ctx, r.store.Engine())
 	r.mu.Unlock()
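
The flag cleared here is honored the next time this replica makes truncation decisions as the leaseholder. Roughly, the recompute-on-untrusted pattern looks as follows (a sketch under assumptions: the real logic lives around newTruncateDecision in the raft log queue, and RaftLock/ComputeRaftLogSize are the same primitives the tests above use):

    // maybeRecomputeRaftLogSize recomputes the log size from the engine when
    // the tracked value isn't trusted, then marks it trusted again so future
    // decisions can rely on the in-memory counter.
    func maybeRecomputeRaftLogSize(ctx context.Context, r *Replica) error {
        r.mu.Lock()
        trusted := r.mu.raftLogSizeTrusted
        r.mu.Unlock()
        if trusted {
            return nil
        }
        r.RaftLock() // keep the log stable while measuring it
        size, err := ComputeRaftLogSize(ctx, r.RangeID, r.Engine(), r.SideloadedRaftMuLocked())
        r.RaftUnlock()
        if err != nil {
            return err
        }
        r.mu.Lock()
        r.mu.raftLogSize = size
        r.mu.raftLogSizeTrusted = true
        r.mu.Unlock()
        return nil
    }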
