Skip to content
This repository has been archived by the owner on Sep 6, 2018. It is now read-only.

Commit

Permalink
Merge pull request #167 from xiangli-cmu/dev
Browse files Browse the repository at this point in the history
more safety
  • Loading branch information
benbjohnson committed Jan 27, 2014
2 parents 0a20921 + bdeac89 commit 5ec8c0d
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 12 deletions.
12 changes: 5 additions & 7 deletions peer.go
Expand Up @@ -146,14 +146,12 @@ func (p *Peer) heartbeat(c chan bool) {
func (p *Peer) flush() {
debugln("peer.heartbeat.flush: ", p.Name)
prevLogIndex := p.getPrevLogIndex()
entries, prevLogTerm := p.server.log.getEntriesAfter(prevLogIndex, p.server.maxLogEntriesPerRequest)
term := p.server.currentTerm

if p.server.State() != Leader {
return
}
entries, prevLogTerm := p.server.log.getEntriesAfter(prevLogIndex, p.server.maxLogEntriesPerRequest)

if entries != nil {
p.sendAppendEntriesRequest(newAppendEntriesRequest(p.server.currentTerm, prevLogIndex, prevLogTerm, p.server.log.CommitIndex(), p.server.name, entries))
p.sendAppendEntriesRequest(newAppendEntriesRequest(term, prevLogIndex, prevLogTerm, p.server.log.CommitIndex(), p.server.name, entries))
} else {
p.sendSnapshotRequest(newSnapshotRequest(p.server.name, p.server.lastSnapshot))
}
Expand Down Expand Up @@ -192,13 +190,13 @@ func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) {
// If it was unsuccessful then decrement the previous log index and
// we'll try again next time.
} else {
if resp.Term() > p.server.Term() || resp.CommitIndex() > p.server.CommitIndex() {
if resp.Term() > p.server.Term() {
// this happens when there is a new leader comes up that this *leader* has not
// known yet.
// this server can know until the new leader send a ae with higher term
// or this server finish processing this response.
debugln("peer.append.resp.not.update: new.leader.found")
} else if resp.CommitIndex() >= p.prevLogIndex {
} else if resp.Term() == req.Term && resp.CommitIndex() >= p.prevLogIndex {
// we may miss a response from peer
// so maybe the peer has committed the logs we just sent
// but we did not receive the successful reply and did not increase
Expand Down
15 changes: 10 additions & 5 deletions server.go
Expand Up @@ -495,6 +495,12 @@ func (s *server) setCurrentTerm(term uint64, leaderName string, append bool) {
prevLeader := s.leader

if term > s.currentTerm {
// stop heartbeats before step-down
if s.state == Leader {
for _, peer := range s.peers {
peer.stopHeartbeat(false)
}
}
// update the term and clear vote for
s.state = Follower
s.currentTerm = term
Expand Down Expand Up @@ -775,6 +781,10 @@ func (s *server) leaderLoop() {
select {
case e := <-s.c:
if e.target == &stopValue {
// Stop all peers before stop
for _, peer := range s.peers {
peer.stopHeartbeat(false)
}
s.setState(Stopped)
} else {
switch req := e.target.(type) {
Expand All @@ -800,11 +810,6 @@ func (s *server) leaderLoop() {
}
}

// Stop all peers.
for _, peer := range s.peers {
peer.stopHeartbeat(false)
}

s.syncedPeer = nil
}

Expand Down

0 comments on commit 5ec8c0d

Please sign in to comment.