diff --git a/peer.go b/peer.go index 3de4659..9eedf46 100644 --- a/peer.go +++ b/peer.go @@ -146,14 +146,12 @@ func (p *Peer) heartbeat(c chan bool) { func (p *Peer) flush() { debugln("peer.heartbeat.flush: ", p.Name) prevLogIndex := p.getPrevLogIndex() - entries, prevLogTerm := p.server.log.getEntriesAfter(prevLogIndex, p.server.maxLogEntriesPerRequest) + term := p.server.currentTerm - if p.server.State() != Leader { - return - } + entries, prevLogTerm := p.server.log.getEntriesAfter(prevLogIndex, p.server.maxLogEntriesPerRequest) if entries != nil { - p.sendAppendEntriesRequest(newAppendEntriesRequest(p.server.currentTerm, prevLogIndex, prevLogTerm, p.server.log.CommitIndex(), p.server.name, entries)) + p.sendAppendEntriesRequest(newAppendEntriesRequest(term, prevLogIndex, prevLogTerm, p.server.log.CommitIndex(), p.server.name, entries)) } else { p.sendSnapshotRequest(newSnapshotRequest(p.server.name, p.server.lastSnapshot)) } @@ -192,13 +190,13 @@ func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) { // If it was unsuccessful then decrement the previous log index and // we'll try again next time. } else { - if resp.Term() > p.server.Term() || resp.CommitIndex() > p.server.CommitIndex() { + if resp.Term() > p.server.Term() { // this happens when there is a new leader comes up that this *leader* has not // known yet. // this server can know until the new leader send a ae with higher term // or this server finish processing this response. debugln("peer.append.resp.not.update: new.leader.found") - } else if resp.CommitIndex() >= p.prevLogIndex { + } else if resp.Term() == req.Term && resp.CommitIndex() >= p.prevLogIndex { // we may miss a response from peer // so maybe the peer has committed the logs we just sent // but we did not receive the successful reply and did not increase diff --git a/server.go b/server.go index 9f32740..40db607 100644 --- a/server.go +++ b/server.go @@ -495,6 +495,12 @@ func (s *server) setCurrentTerm(term uint64, leaderName string, append bool) { prevLeader := s.leader if term > s.currentTerm { + // stop heartbeats before step-down + if s.state == Leader { + for _, peer := range s.peers { + peer.stopHeartbeat(false) + } + } // update the term and clear vote for s.state = Follower s.currentTerm = term @@ -775,6 +781,10 @@ func (s *server) leaderLoop() { select { case e := <-s.c: if e.target == &stopValue { + // Stop all peers before stop + for _, peer := range s.peers { + peer.stopHeartbeat(false) + } s.setState(Stopped) } else { switch req := e.target.(type) { @@ -800,11 +810,6 @@ func (s *server) leaderLoop() { } } - // Stop all peers. - for _, peer := range s.peers { - peer.stopHeartbeat(false) - } - s.syncedPeer = nil }