Skip to content

Commit

Permalink
raft: fix a few problems for implementing leader lease
Browse files Browse the repository at this point in the history
  • Loading branch information
swingbach committed May 28, 2016
1 parent b6c67b4 commit 2e0518a
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 36 deletions.
4 changes: 2 additions & 2 deletions raft/raft.go
Expand Up @@ -565,7 +565,7 @@ func (r *raft) Step(m pb.Message) error {
case m.Term > r.Term:
lead := m.From
if m.Type == pb.MsgVote {
if r.state == StateFollower && r.checkQuorum && r.electionElapsed < r.electionTimeout {
if r.checkQuorum && r.state != StateCandidate && r.electionElapsed < r.electionTimeout {
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] ignored vote from %x [logterm: %d, index: %d] at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
return nil
Expand All @@ -578,7 +578,7 @@ func (r *raft) Step(m pb.Message) error {
case m.Term < r.Term:
if r.checkQuorum && (m.Type == pb.MsgHeartbeat || m.Type == pb.MsgApp) {
// Free the stuck peer by responding a msg with a higher term
r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp})
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp})
} else {
// ignore other cases
r.logger.Infof("%x [term: %d] ignored a %s message with lower term from %x [term: %d]",
Expand Down
98 changes: 64 additions & 34 deletions raft/raft_test.go
Expand Up @@ -1225,25 +1225,6 @@ func TestLeaderStepdownWhenQuorumLost(t *testing.T) {
}
}

// TestLeaderSupersedingWithoutCheckQuorum builds a three-node network, lets
// node 1 campaign, then isolates node 1 and lets node 3 campaign. It expects
// node 3 to be a follower after the first campaign and the leader after the
// second. The test never sets checkQuorum on any node, so it runs with
// whatever newTestRaft configures by default.
func TestLeaderSupersedingWithoutCheckQuorum(t *testing.T) {
	n1 := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
	n2 := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
	n3 := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
	cluster := newNetwork(n1, n2, n3)

	// First campaign: node 1 sends itself MsgHup.
	hupFrom1 := pb.Message{From: 1, To: 1, Type: pb.MsgHup}
	cluster.send(hupFrom1)
	if got := n3.state; got != StateFollower {
		t.Errorf("state = %s, want %s", got, StateFollower)
	}

	// Cut node 1 off from the rest of the network before node 3 campaigns.
	cluster.isolate(1)

	// Second campaign: node 3 sends itself MsgHup and is expected to win.
	hupFrom3 := pb.Message{From: 3, To: 3, Type: pb.MsgHup}
	cluster.send(hupFrom3)
	if got := n3.state; got != StateLeader {
		t.Errorf("state = %s, want %s", got, StateLeader)
	}
}

func TestLeaderSupersedingWithCheckQuorum(t *testing.T) {
a := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
b := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
Expand All @@ -1254,7 +1235,12 @@ func TestLeaderSupersedingWithCheckQuorum(t *testing.T) {
c.checkQuorum = true

nt := newNetwork(a, b, c)
b.electionElapsed = b.electionTimeout

// Prevent campaigning from b
b.randomizedElectionTimeout = b.electionTimeout + 1
for i := 0; i < b.electionTimeout; i++ {
b.tick()
}
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})

if a.state != StateLeader {
Expand All @@ -1265,23 +1251,20 @@ func TestLeaderSupersedingWithCheckQuorum(t *testing.T) {
t.Errorf("state = %s, want %s", c.state, StateFollower)
}

nt.isolate(1)

nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})

// Peer b rejected c's vote request because b's electionElapsed had not yet reached electionTimeout
if c.state != StateCandidate {
t.Errorf("state = %s, want %s", c.state, StateCandidate)
}

// Prevent campaigning from raft b
if b.randomizedElectionTimeout == b.electionTimeout {
b.randomizedElectionTimeout = b.electionTimeout + 1
}

// Let b's electionElapsed reach electionTimeout
b.randomizedElectionTimeout = b.electionTimeout + 1
for i := 0; i < b.electionTimeout; i++ {
b.tick()
}

nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})

if c.state != StateLeader {
t.Errorf("state = %s, want %s", c.state, StateLeader)
}
Expand All @@ -1297,17 +1280,43 @@ func TestLeaderElectionWithCheckQuorum(t *testing.T) {
c.checkQuorum = true

nt := newNetwork(a, b, c)
b.electionElapsed = b.electionTimeout

// Let b's electionElapsed reach electionTimeout so that b can vote for a
for i := 0; i < b.electionTimeout; i++ {
b.tick()
}
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})

b.electionElapsed = b.electionTimeout
if a.state != StateLeader {
t.Errorf("state = %s, want %s", a.state, StateLeader)
}

if c.state != StateFollower {
t.Errorf("state = %s, want %s", c.state, StateFollower)
}

for i := 0; i < a.electionTimeout; i++ {
a.tick()
}
for i := 0; i < b.electionTimeout; i++ {
b.tick()
}
nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})

if a.state != StateFollower {
t.Errorf("state = %s, want %s", a.state, StateFollower)
}

if c.state != StateLeader {
t.Errorf("state = %s, want %s", c.state, StateLeader)
}
}

// TestFreeStuckPeerWithCheckQuorum ensures that a stuck candidate with a higher term
// can be freed after receiving the leader's MsgApp or MsgHeartbeat.
// A candidate gets stuck when quorum check is on and its vote requests are rejected by
// peers that are still within the leader lease; it then ends up with a higher term
// than the quorum.
func TestFreeStuckPeerWithCheckQuorum(t *testing.T) {
a := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
b := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
Expand All @@ -1318,24 +1327,45 @@ func TestFreeStuckPeerWithCheckQuorum(t *testing.T) {
c.checkQuorum = true

nt := newNetwork(a, b, c)
b.electionElapsed = b.electionTimeout
for i := 0; i < b.electionTimeout; i++ {
b.tick()
}
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})

nt.isolate(1)
nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})

if b.state != StateFollower {
t.Errorf("state = %s, want %s", b.state, StateFollower)
}

if c.state != StateCandidate {
t.Errorf("state = %s, want %s", c.state, StateCandidate)
}

if c.Term != b.Term+1 {
t.Errorf("term = %d, want %d", c.Term, b.Term+1)
}

// Vote again for safety
nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})

if b.state != StateFollower {
t.Errorf("state = %s, want %s", b.state, StateFollower)
}

if c.state != StateCandidate {
t.Errorf("state = %s, want %s", c.state, StateCandidate)
}

if c.Term != a.Term+1 {
t.Errorf("term = %d, want %d", c.Term, a.Term+1)
if c.Term != b.Term+2 {
t.Errorf("term = %d, want %d", c.Term, b.Term+2)
}

nt.recover()
nt.send(pb.Message{From: 1, To: 3, Type: pb.MsgHeartbeat, Term: a.Term})

// Disrupt the leader so that the stuck peer can be freed
// Disrupt the leader so that the stuck peer is freed
if a.state != StateFollower {
t.Errorf("state = %s, want %s", a.state, StateFollower)
}
Expand Down

0 comments on commit 2e0518a

Please sign in to comment.