storage: Send snapshot messages with the correct term
This fixes a regression in #12686, which meant that a replica that
fell too far behind would be unable to ever catch up because it would
think that it was receiving out-of-date snapshots.

Fixes #13506
bdarnell committed Feb 9, 2017
1 parent 330deec commit 2c3b3ca
Showing 4 changed files with 129 additions and 42 deletions.
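Background on the failure mode, as a simplified model rather than the etcd/raft source: a raft replica ignores incoming messages whose term is lower than its own current term, treating them as traffic from a deposed leader. A snapshot's metadata term is the term of the last log entry the snapshot covers, which can be many elections behind the cluster's current term by the time a lagging replica needs to catch up. Stamping the outgoing MsgSnap with that metadata term therefore made the recovering replica discard every snapshot it was sent. A minimal, self-contained sketch of that arithmetic (the function and the numbers below are illustrative, not CockroachDB or etcd/raft code):

package main

import "fmt"

// acceptMessage models the receiver-side rule: messages whose term is
// below the receiver's current term are dropped as stale.
func acceptMessage(receiverTerm, msgTerm uint64) bool {
	return msgTerm >= receiverTerm
}

func main() {
	const receiverTerm = 7  // elections happened while this replica was down
	const snapMetaTerm = 3  // term of the last entry covered by the snapshot
	const senderCurTerm = 7 // the sender's current raft term

	// Before the fix the MsgSnap carried the snapshot metadata term.
	fmt.Println(acceptMessage(receiverTerm, snapMetaTerm)) // false: snapshot rejected, replica never catches up
	// After the fix it carries the sender's current term (see replica_command.go below).
	fmt.Println(acceptMessage(receiverTerm, senderCurTerm)) // true: snapshot accepted and applied
}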
149 changes: 108 additions & 41 deletions pkg/storage/client_raft_test.go
@@ -636,56 +636,123 @@ func TestRaftLogSizeAfterTruncation(t *testing.T) {
 // truncated.
 func TestSnapshotAfterTruncation(t *testing.T) {
 	defer leaktest.AfterTest(t)()
-	mtc := &multiTestContext{}
-	defer mtc.Stop()
-	mtc.Start(t, 3)
-	repl, err := mtc.stores[0].GetReplica(1)
-	if err != nil {
-		t.Fatal(err)
-	}

-	key := roachpb.Key("a")
-	incA := int64(5)
-	incB := int64(7)
-	incAB := incA + incB
+	for _, changeTerm := range []bool{false, true} {
+		name := "same term"
+		if changeTerm {
+			name = "different term"
+		}
+		t.Run(name, func(t *testing.T) {
+			mtc := &multiTestContext{}
+			defer mtc.Stop()
+			mtc.Start(t, 3)
+			const stoppedStore = 1
+			repl, err := mtc.stores[0].GetReplica(1)
+			if err != nil {
+				t.Fatal(err)
+			}

-	// Set up a key to replicate across the cluster. We're going to modify this
-	// key and truncate the raft logs from that command after killing one of the
-	// nodes to check that it gets the new value after it comes up.
-	incArgs := incrementArgs(key, incA)
-	if _, err := client.SendWrapped(context.Background(), rg1(mtc.stores[0]), incArgs); err != nil {
-		t.Fatal(err)
-	}
+			key := roachpb.Key("a")
+			incA := int64(5)
+			incB := int64(7)
+			incAB := incA + incB

-	mtc.replicateRange(1, 1, 2)
-	mtc.waitForValues(key, []int64{incA, incA, incA})
+			// Set up a key to replicate across the cluster. We're going to modify this
+			// key and truncate the raft logs from that command after killing one of the
+			// nodes to check that it gets the new value after it comes up.
+			incArgs := incrementArgs(key, incA)
+			if _, err := client.SendWrapped(context.Background(), rg1(mtc.stores[0]), incArgs); err != nil {
+				t.Fatal(err)
+			}

-	// Now kill store 1, increment the key on the other stores and truncate
-	// their logs to make sure that when store 1 comes back up it will require a
-	// non-preemptive snapshot from Raft.
-	mtc.stopStore(1)
+			mtc.replicateRange(1, 1, 2)
+			mtc.waitForValues(key, []int64{incA, incA, incA})

-	incArgs = incrementArgs(key, incB)
-	if _, err := client.SendWrapped(context.Background(), rg1(mtc.stores[0]), incArgs); err != nil {
-		t.Fatal(err)
-	}
+			// Now kill one store, increment the key on the other stores and truncate
+			// their logs to make sure that when store 1 comes back up it will require a
+			// non-preemptive snapshot from Raft.
+			mtc.stopStore(stoppedStore)

-	mtc.waitForValues(key, []int64{incAB, incA, incAB})
+			incArgs = incrementArgs(key, incB)
+			if _, err := client.SendWrapped(context.Background(), rg1(mtc.stores[0]), incArgs); err != nil {
+				t.Fatal(err)
+			}

-	index, err := repl.GetLastIndex()
-	if err != nil {
-		t.Fatal(err)
-	}
+			mtc.waitForValues(key, []int64{incAB, incA, incAB})

-	// Truncate the log at index+1 (log entries < N are removed, so this
-	// includes the increment).
-	truncArgs := truncateLogArgs(index+1, 1)
-	if _, err := client.SendWrapped(context.Background(), rg1(mtc.stores[0]), truncArgs); err != nil {
-		t.Fatal(err)
-	}
-	mtc.restartStore(1)
+			index, err := repl.GetLastIndex()
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			// Truncate the log at index+1 (log entries < N are removed, so this
+			// includes the increment).
+			truncArgs := truncateLogArgs(index+1, 1)
+			if _, err := client.SendWrapped(context.Background(), rg1(mtc.stores[0]), truncArgs); err != nil {
+				t.Fatal(err)
+			}

-	mtc.waitForValues(key, []int64{incAB, incAB, incAB})
+			if changeTerm {
+				for i := range mtc.stores {
+					if i != stoppedStore {
+						// Stop and restart all the stores, which guarantees that
+						// we won't be in the same term we started with.
+						mtc.stopStore(i)
+						mtc.restartStore(i)
+						// Disable the snapshot queue on the live stores so that
+						// stoppedStore won't get a snapshot as soon as it starts
+						// back up.
+						mtc.stores[i].SetRaftSnapshotQueueActive(false)
+					}
+				}
+
+				// Restart the stopped store and wait for raft
+				// election/heartbeat traffic to settle down. Specifically, we
+				// need stoppedStore to know about the new term number before
+				// the snapshot is sent to reproduce #13506. If the snapshot
+				// happened before it learned the term, it would accept the
+				// snapshot no matter what term it contained.
+				mtc.restartStore(stoppedStore)
+				testutils.SucceedsSoon(t, func() error {
+					hasLeader := false
+					term := uint64(0)
+					for i := range mtc.stores {
+						repl, err := mtc.stores[i].GetReplica(1)
+						if err != nil {
+							return err
+						}
+						status := repl.RaftStatus()
+						if status == nil {
+							return errors.New("raft status not initialized")
+						}
+						if status.RaftState == raft.StateLeader {
+							hasLeader = true
+						}
+						if term == 0 {
+							term = status.Term
+						} else if status.Term != term {
+							return errors.Errorf("terms do not agree: %d vs %d", status.Term, term)
+						}
+					}
+					if !hasLeader {
+						return errors.New("no leader")
+					}
+					return nil
+				})
+
+				// Turn the queues back on and wait for the snapshot to be sent and processed.
+				for i, store := range mtc.stores {
+					if i != stoppedStore {
+						store.SetRaftSnapshotQueueActive(true)
+						store.ForceRaftSnapshotQueueProcess()
+					}
+				}
+			} else { // !changeTerm
+				mtc.restartStore(stoppedStore)
+			}
+			mtc.waitForValues(key, []int64{incAB, incAB, incAB})
+		})
+	}
 }

 type fakeSnapshotStream struct {
12 changes: 12 additions & 0 deletions pkg/storage/helpers_test.go
@@ -101,6 +101,13 @@ func (s *Store) ForceTimeSeriesMaintenanceQueueProcess() {
 	forceScanAndProcess(s, s.tsMaintenanceQueue.baseQueue)
 }

+// ForceRaftSnapshotQueueProcess iterates over all ranges, enqueuing
+// any that need raft snapshots, then processes the raft snapshot
+// queue.
+func (s *Store) ForceRaftSnapshotQueueProcess() {
+	forceScanAndProcess(s, s.raftSnapshotQueue.baseQueue)
+}
+
 // ConsistencyQueueShouldQueue invokes the shouldQueue method on the
 // store's consistency queue.
 func (s *Store) ConsistencyQueueShouldQueue(
@@ -146,6 +153,11 @@ func (s *Store) SetSplitQueueActive(active bool) {
 	s.setSplitQueueActive(active)
 }

+// SetRaftSnapshotQueueActive enables or disables the raft snapshot queue.
+func (s *Store) SetRaftSnapshotQueueActive(active bool) {
+	s.setRaftSnapshotQueueActive(active)
+}
+
 // SetReplicaScannerActive enables or disables the scanner. Note that while
 // inactive, removals are still processed.
 func (s *Store) SetReplicaScannerActive(active bool) {
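These two helpers exist so that the new TestSnapshotAfterTruncation subtest can hold the raft snapshot queue back until the restarted store has learned the current term, and then trigger the snapshot explicitly. A condensed sketch of how the test above drives them (mtc, stoppedStore and the store handles come from that test's multiTestContext harness; this is an excerpt for orientation, not a standalone program):

// Keep the live stores from sending a raft snapshot right away.
for i := range mtc.stores {
	if i != stoppedStore {
		mtc.stores[i].SetRaftSnapshotQueueActive(false)
	}
}

// Restart the lagging store and let it observe the new term first; the
// test's SucceedsSoon loop waits here until all replicas agree on a term.
mtc.restartStore(stoppedStore)

// Now allow snapshots again and force the queue so the raft snapshot is
// sent and processed.
for i, store := range mtc.stores {
	if i != stoppedStore {
		store.SetRaftSnapshotQueueActive(true)
		store.ForceRaftSnapshotQueueProcess()
	}
}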
7 changes: 6 additions & 1 deletion pkg/storage/replica_command.go
@@ -3340,6 +3340,11 @@ func (r *Replica) sendSnapshot(
 		}
 	}

+	status := r.RaftStatus()
+	if status == nil {
+		return errors.New("raft status not initialized")
+	}
+
 	req := SnapshotRequest_Header{
 		State: snap.State,
 		RaftMessageRequest: RaftMessageRequest{
@@ -3350,7 +3355,7 @@
 				Type: raftpb.MsgSnap,
 				To: uint64(repDesc.ReplicaID),
 				From: uint64(fromRepDesc.ReplicaID),
-				Term: snap.RaftSnap.Metadata.Term,
+				Term: status.Term,
 				Snapshot: snap.RaftSnap,
 			},
 		},
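The one-line change above swaps which of two different terms is stamped on the outgoing MsgSnap. snap.RaftSnap.Metadata.Term describes the data: the term in which the last log entry covered by the snapshot was proposed. status.Term describes the sender: its current raft term, which is what the receiver compares against its own term before accepting the message. The new nil check is there because RaftStatus can return nil when the replica's raft group has not been initialized, in which case there is no current term to send. A toy illustration of how the two values drift apart (the numbers are made up for the example):

package main

import "fmt"

func main() {
	// Fixed when the snapshot's last entry was proposed; later elections
	// do not change it.
	snapshotMetadataTerm := uint64(3)

	// The sender's current raft term keeps advancing with each election
	// that happens while the lagging replica is offline.
	currentTerm := uint64(7)

	fmt.Println("term sent before the fix:", snapshotMetadataTerm) // looks stale to a receiver at term 7
	fmt.Println("term sent after the fix: ", currentTerm)          // matches what the receiver expects
}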
3 changes: 3 additions & 0 deletions pkg/storage/store.go
@@ -4001,6 +4001,9 @@ func (s *Store) setSplitQueueActive(active bool) {
 func (s *Store) setTimeSeriesMaintenanceQueueActive(active bool) {
 	s.tsMaintenanceQueue.SetDisabled(!active)
 }
+func (s *Store) setRaftSnapshotQueueActive(active bool) {
+	s.raftSnapshotQueue.SetDisabled(!active)
+}
 func (s *Store) setScannerActive(active bool) {
 	s.scanner.SetDisabled(!active)
 }
