Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: don't send historical Raft log with snapshots #35701

Merged
merged 1 commit into from
Mar 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 22 additions & 26 deletions pkg/storage/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,10 +630,9 @@ func TestRaftLogSizeAfterTruncation(t *testing.T) {
startWithSingleRange: true,
}
defer mtc.Stop()
mtc.Start(t, 3)
mtc.Start(t, 1)

const rangeID = 1
mtc.replicateRange(rangeID, 1, 2)

repl, err := mtc.stores[0].GetReplica(rangeID)
if err != nil {
Expand All @@ -647,37 +646,31 @@ func TestRaftLogSizeAfterTruncation(t *testing.T) {
t.Fatal(err)
}

mtc.waitForValues(key, []int64{5, 5, 5})

index, err := repl.GetLastIndex()
if err != nil {
t.Fatal(err)
}

// Verifies the recomputed log size against what we track in `r.mu.raftLogSize`.
assertCorrectRaftLogSize := func() error {
for _, s := range mtc.stores {
repl, err := s.GetReplica(rangeID)
if err != nil {
t.Fatal(err)
}

// Recompute under raft lock so that the log doesn't change while we
// compute its size.
repl.RaftLock()
realSize, err := storage.ComputeRaftLogSize(
context.Background(), repl.RangeID, repl.Engine(), repl.SideloadedRaftMuLocked(),
)
size := repl.GetRaftLogSize()
repl.RaftUnlock()
// Recompute under raft lock so that the log doesn't change while we
// compute its size.
repl.RaftLock()
realSize, err := storage.ComputeRaftLogSize(
context.Background(), repl.RangeID, repl.Engine(), repl.SideloadedRaftMuLocked(),
)
size, _ := repl.GetRaftLogSize()
repl.RaftUnlock()

if err != nil {
t.Fatal(err)
}
if err != nil {
t.Fatal(err)
}

if size != realSize {
return fmt.Errorf("%s: raft log claims size %d, but is in fact %d", repl, size, realSize)
}
// If the size isn't trusted, it won't have to match (and in fact
// likely won't). In this test, this is because the upreplication
// elides old Raft log entries in the snapshot it uses.
if size != realSize {
return fmt.Errorf("%s: raft log claims size %d, but is in fact %d", repl, size, realSize)
}
return nil
}
Expand All @@ -690,6 +683,9 @@ func TestRaftLogSizeAfterTruncation(t *testing.T) {
t.Fatal(err)
}

// Note that if there were multiple nodes, the Raft log sizes would not
// be correct for the followers as they would have received a shorter
// Raft log than the leader.
assert.NoError(t, assertCorrectRaftLogSize())
}

Expand Down Expand Up @@ -1304,7 +1300,7 @@ func TestLogGrowthWhenRefreshingPendingCommands(t *testing.T) {
}

// Determine the current raft log size.
initLogSize := leaderRepl.GetRaftLogSize()
initLogSize, _ := leaderRepl.GetRaftLogSize()

// While a majority nodes are down, write some data.
putRes := make(chan *roachpb.Error)
Expand Down Expand Up @@ -1335,7 +1331,7 @@ func TestLogGrowthWhenRefreshingPendingCommands(t *testing.T) {
// etc.). The important thing here is that the log doesn't grow
// forever.
logSizeLimit := int64(2 * sc.RaftMaxUncommittedEntriesSize)
curlogSize := leaderRepl.GetRaftLogSize()
curlogSize, _ := leaderRepl.GetRaftLogSize()
logSize := curlogSize - initLogSize
logSizeStr := humanizeutil.IBytes(logSize)
// Note that logSize could be negative if something got truncated.
Expand Down
8 changes: 4 additions & 4 deletions pkg/storage/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,12 +322,12 @@ func (r *Replica) ShouldBackpressureWrites() bool {
return r.shouldBackpressureWrites()
}

// GetRaftLogSize returns the approximate raft log size. See r.mu.raftLogSize
// for details.
func (r *Replica) GetRaftLogSize() int64 {
// GetRaftLogSize returns the approximate raft log size and whether it is
// trustworthy. See r.mu.raftLogSize for details.
func (r *Replica) GetRaftLogSize() (int64, bool) {
r.mu.RLock()
defer r.mu.RUnlock()
return r.mu.raftLogSize
return r.mu.raftLogSize, r.mu.raftLogSizeTrusted
}

// GetCachedLastTerm returns the cached last term value. May return
Expand Down
37 changes: 27 additions & 10 deletions pkg/storage/raft_log_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -887,20 +887,32 @@ func TestTruncateLogRecompute(t *testing.T) {
key := roachpb.Key("a")
repl := tc.store.LookupReplica(keys.MustAddr(key))

var v roachpb.Value
v.SetBytes(bytes.Repeat([]byte("x"), RaftLogQueueStaleSize*5))
put := roachpb.NewPut(key, v)
var ba roachpb.BatchRequest
ba.Add(put)
ba.RangeID = repl.RangeID

if _, pErr := tc.store.Send(ctx, ba); pErr != nil {
t.Fatal(pErr)
trusted := func() bool {
repl.mu.Lock()
defer repl.mu.Unlock()
return repl.mu.raftLogSizeTrusted
}

put := func() {
var v roachpb.Value
v.SetBytes(bytes.Repeat([]byte("x"), RaftLogQueueStaleSize*5))
put := roachpb.NewPut(key, v)
var ba roachpb.BatchRequest
ba.Add(put)
ba.RangeID = repl.RangeID

if _, pErr := tc.store.Send(ctx, ba); pErr != nil {
t.Fatal(pErr)
}
}

put()

decision, err := newTruncateDecision(ctx, repl)
assert.NoError(t, err)
assert.True(t, decision.ShouldTruncate())
// Should never trust initially, until recomputed at least once.
assert.False(t, trusted())

repl.mu.Lock()
repl.mu.raftLogSizeTrusted = false
Expand All @@ -913,5 +925,10 @@ func TestTruncateLogRecompute(t *testing.T) {
// grown over threshold again; we compute instead that its size is correct).
tc.store.SetRaftLogQueueActive(true)
tc.store.MustForceRaftLogScanAndProcess()
verifyLogSizeInSync(t, repl)

for i := 0; i < 2; i++ {
verifyLogSizeInSync(t, repl)
assert.True(t, trusted())
put() // make sure we remain trusted and in sync
}
}
22 changes: 16 additions & 6 deletions pkg/storage/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -888,19 +888,29 @@ func (r *Replica) sendSnapshot(
return &benignError{errors.New("raft status not initialized")}
}

// TODO(tbg): send snapshots without the past raft log. This means replacing
// the state's truncated state with one whose index and term equal that of
// the RaftAppliedIndex of the snapshot. It looks like the code sending out
// the actual entries will do the right thing from then on (see anchor
// below).
_ = (*kvBatchSnapshotStrategy)(nil).Send
usesReplicatedTruncatedState, err := engine.MVCCGetProto(
ctx, snap.EngineSnap, keys.RaftTruncatedStateLegacyKey(r.RangeID), hlc.Timestamp{}, nil, engine.MVCCGetOptions{},
)
if err != nil {
return errors.Wrap(err, "loading legacy truncated state")
}

if !usesReplicatedTruncatedState && snap.State.TruncatedState.Index < snap.State.RaftAppliedIndex {
// If we're not using a legacy (replicated) truncated state, we avoid
// sending the (past) Raft log in the snapshot in the first place and
// send only those entries that are actually useful to the follower.
// This is done by changing the truncated state, which we're allowed
// to do since it is not a replicated key (and thus not subject to
// matching across replicas). The actual sending happens here:
_ = (*kvBatchSnapshotStrategy)(nil).Send
// and results in no log entries being sent at all. Note that
// Metadata.Index is really the applied index of the replica.
snap.State.TruncatedState = &roachpb.RaftTruncatedState{
Index: snap.RaftSnap.Metadata.Index,
Term: snap.RaftSnap.Metadata.Term,
}
}

req := SnapshotRequest_Header{
State: snap.State,
// Tell the recipient whether it needs to synthesize the new
Expand Down
43 changes: 29 additions & 14 deletions pkg/storage/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -2333,21 +2333,20 @@ func (r *Replica) applyRaftCommand(
return storagepb.ReplicatedEvalResult{}, err
}
if !apply {
// TODO(tbg): As written, there is low confidence that nil'ing out
// the truncated state has the desired effect as our caller actually
// applies the side effects. It may have taken a copy and won't
// observe what we did.
//
// It's very difficult to test this functionality because of how
// difficult it is to test (*Replica).processRaftCommand (and this
// method). Instead of adding yet another terrible test that bends
// reality to its will in some clunky way, assert that we're never
// hitting this branch, which is supposed to be true until we stop
// sending the raft log in snapshots (#34287).
// Morally we would want to drop the command in checkForcedErrLocked,
// but that may be difficult to achieve.
log.Fatal(ctx, log.Safe(fmt.Sprintf("TruncatedState regressed:\nold: %+v\nnew: %+v", oldTruncatedState, rResult.State.TruncatedState)))
// The truncated state was discarded, so make sure we don't apply
// it to our in-memory state.
rResult.State.TruncatedState = nil
rResult.RaftLogDelta = 0
// We received a truncation that doesn't apply to us, so we know that
// there's a leaseholder out there with a log that has earlier entries
// than ours. That leader also guided our log size computations by
// giving us RaftLogDeltas for past truncations, and this was likely
// off. Mark our Raft log size as not trustworthy so that, assuming
// we step up as leader at some point in the future, we recompute
// our numbers.
r.mu.Lock()
r.mu.raftLogSizeTrusted = false
r.mu.Unlock()
}
}

Expand Down Expand Up @@ -2392,6 +2391,22 @@ func (r *Replica) applyRaftCommand(
return rResult, nil
}

// handleTruncatedStateBelowRaft is called when a Raft command updates the truncated
// state. This isn't 100% trivial for two reasons:
// - in 19.1 we're making the TruncatedState key unreplicated, so there's a migration
// - we're making use of the above by not sending the Raft log in snapshots (the truncated
// state effectively determines the first index of the log, which requires it to be unreplicated).
// Updates to the TruncatedState are sent out by a leaseholder truncating the log based on its local
// knowledge. For example, the leader might have a log 10..100 and truncates to 50, and will send
// out a TruncatedState with Index 50 to that effect. However, some replicas may not even have log
// entries that old, and must make sure to ignore this update to the truncated state, as it would
// otherwise clobber their "newer" truncated state.
//
// The returned boolean tells the caller whether to apply the truncated state's
// side effects, which means replacing the in-memory TruncatedState and applying
// the associated RaftLogDelta. It is usually expected to be true, but may not
// be for the first truncation on a replica that recently received a
// snapshot.
func handleTruncatedStateBelowRaft(
ctx context.Context,
oldTruncatedState, newTruncatedState *roachpb.RaftTruncatedState,
Expand Down
3 changes: 3 additions & 0 deletions pkg/storage/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -999,6 +999,9 @@ func (r *Replica) applySnapshot(
// by r.leasePostApply, but we called those above, so now it's safe to
// wholesale replace r.mu.state.
r.mu.state = s
// Snapshots typically have fewer log entries than the leaseholder. The next
// time we hold the lease, recompute the log size before making decisions.
r.mu.raftLogSizeTrusted = false
r.assertStateLocked(ctx, r.store.Engine())
r.mu.Unlock()

Expand Down