Skip to content

Commit

Permalink
Refactoring to support joint consensus.
Browse files Browse the repository at this point in the history
Summary:
Separate election-related bookkeeping from group membership;
allow group membership to be updated separately from commits.

Test Plan: go test

Reviewers: spencerkimball

Reviewed By: spencerkimball

Differential Revision: http://phabricator.cockroachdb.org/D76
  • Loading branch information
bdarnell committed Jul 15, 2014
1 parent 3b5af75 commit 447610d
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 85 deletions.
160 changes: 106 additions & 54 deletions multiraft/multiraft.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,12 @@ func (m *MultiRaft) SubmitCommand(groupID GroupID, command []byte) error {
// Role represents the state of the node in a group.
type Role int

// Nodes can be either followers, candidates, or leaders.
// Nodes can be either observers, followers, candidates, or leaders. Observers receive
// replicated logs but do not vote. There is at most one Leader per term; a node cannot become
// a Leader without first becoming a Candiate and winning an election.
const (
RoleFollower Role = iota
RoleObserver Role = iota
RoleFollower
RoleCandidate
RoleLeader
)
Expand All @@ -220,12 +223,14 @@ type group struct {
// are updated immediately; the 'persisted' versions are updated later after they have been
// (asynchronously) written to stable storage. The group is 'dirty' whenever the current
// and persisted data differ.
metadata *GroupMetadata
lastLogIndex int
lastLogTerm int
persistedMetadata *GroupMetadata
persistedLastIndex int
persistedLastTerm int
electionState *GroupElectionState
committedMembers *GroupMembers
lastLogIndex int
lastLogTerm int
persistedElectionState *GroupElectionState
persistedCommittedMembers *GroupMembers
persistedLastIndex int
persistedLastTerm int

// Volatile state
role Role
Expand All @@ -234,7 +239,10 @@ type group struct {
electionDeadline time.Time
votes map[NodeID]bool

// Leader state. Reset on election.
// Candidate/leader volatile state. Reset on conversion to candidate.
currentMembers *GroupMembers

// Leader volatile state. Reset on election.
nextIndex map[NodeID]int // default: lastLogIndex + 1
matchIndex map[NodeID]int // default: 0

Expand All @@ -247,8 +255,9 @@ type group struct {

func newGroup(groupID GroupID, members []NodeID) *group {
return &group{
groupID: groupID,
metadata: &GroupMetadata{
groupID: groupID,
electionState: &GroupElectionState{},
committedMembers: &GroupMembers{
Members: members,
},
role: RoleFollower,
Expand All @@ -257,15 +266,25 @@ func newGroup(groupID GroupID, members []NodeID) *group {
}
}

// findQuorumIndex examines matchIndex to find the smallest log index that a quorum has
// agreed on.
// findQuorumIndex examines matchIndex to find the largest log index that a quorum has
// agreed on. This method is aware of the "joint consensus" state during membership changes
// and reports the minimum index agreed to by the new and old membership sets (considered
// separately).
func (g *group) findQuorumIndex() int {
oldQuorum := g.findQuorumIndexInNodes(g.currentMembers.Members)
if len(g.currentMembers.ProposedMembers) > 0 {
newQuorum := g.findQuorumIndexInNodes(g.currentMembers.ProposedMembers)
if newQuorum < oldQuorum {
return newQuorum
}
}
return oldQuorum
}

func (g *group) findQuorumIndexInNodes(nodes []NodeID) int {
var indices []int
for _, nodeID := range g.metadata.Members {
// We want a zero if the node has not yet reported a matching index, so use the ", ok"
// form but discard the "ok".
matchIndex, _ := g.matchIndex[nodeID]
indices = append(indices, matchIndex)
for _, nodeID := range nodes {
indices = append(indices, g.matchIndex[nodeID])
}
sort.Ints(indices)
quorumPos := len(indices)/2 + 1
Expand Down Expand Up @@ -426,7 +445,7 @@ func (s *state) createGroup(op *createGroupOp) {
op.ch <- util.Errorf("group %v already exists", op.group.groupID)
return
}
for _, member := range op.group.metadata.Members {
for _, member := range op.group.committedMembers.Members {
if node, ok := s.nodes[member]; ok {
node.refCount++
continue
Expand All @@ -453,7 +472,7 @@ func (s *state) submitCommand(op *submitCommandOp) {

g.lastLogIndex++
entry := &LogEntry{
Term: g.metadata.CurrentTerm,
Term: g.electionState.CurrentTerm,
Index: g.lastLogIndex,
Type: LogEntryCommand,
Payload: op.command,
Expand All @@ -471,27 +490,43 @@ func (s *state) requestVoteRequest(req *RequestVoteRequest, resp *RequestVoteRes
call.Done <- call
return
}
if g.metadata.VotedFor.isSet() && g.metadata.VotedFor != req.CandidateID {
if g.electionState.VotedFor.isSet() && g.electionState.VotedFor != req.CandidateID {
resp.VoteGranted = false
} else {
// TODO: check log positions
g.metadata.CurrentTerm = req.Term
g.electionState.CurrentTerm = req.Term
resp.VoteGranted = true
}
resp.Term = g.metadata.CurrentTerm
g.pendingCalls.PushBack(&pendingCall{call, g.metadata.CurrentTerm, -1})
resp.Term = g.electionState.CurrentTerm
g.pendingCalls.PushBack(&pendingCall{call, g.electionState.CurrentTerm, -1})
s.updateDirtyStatus(g)
}

func hasMajority(votes map[NodeID]bool, members []NodeID) bool {
voteCount := 0
for _, node := range members {
if votes[node] {
voteCount++
}
}
return voteCount*2 > len(members)
}

func (s *state) requestVoteResponse(req *RequestVoteRequest, resp *RequestVoteResponse) {
g := s.groups[req.GroupID]
if resp.Term < g.metadata.CurrentTerm {
if resp.Term < g.electionState.CurrentTerm {
return
}
if resp.VoteGranted {
g.votes[req.DestNode] = resp.VoteGranted
}
if g.role == RoleCandidate && len(g.votes)*2 > len(g.metadata.Members) {
// We can convert from Candidate to Leader if we have enough votes. If we are in a
// transitional "joint consensus" state, we need a quorum of votes from both the old
// and new memberships.
if g.role == RoleCandidate &&
hasMajority(g.votes, g.currentMembers.Members) &&
(len(g.currentMembers.ProposedMembers) == 0 ||
hasMajority(g.votes, g.currentMembers.ProposedMembers)) {
g.role = RoleLeader
glog.V(1).Infof("node %v becoming leader for group %v", s.nodeID, g.groupID)
s.sendEvent(&EventLeaderElection{g.groupID, s.nodeID})
Expand All @@ -512,8 +547,8 @@ func (s *state) requestVoteResponse(req *RequestVoteRequest, resp *RequestVoteRe
func (s *state) appendEntriesRequest(req *AppendEntriesRequest, resp *AppendEntriesResponse,
call *rpc.Call) {
g := s.groups[req.GroupID]
resp.Term = g.metadata.CurrentTerm
if req.Term < g.metadata.CurrentTerm {
resp.Term = g.electionState.CurrentTerm
if req.Term < g.electionState.CurrentTerm {
resp.Success = false
call.Done <- call
return
Expand Down Expand Up @@ -557,9 +592,9 @@ func (s *state) handleWriteReady() {
for groupID, group := range s.dirtyGroups {
req := &groupWriteRequest{}
writeRequest.groups[groupID] = req
if !group.metadata.Equal(group.persistedMetadata) {
copy := *group.metadata
req.metadata = &copy
if !group.electionState.Equal(group.persistedElectionState) {
copy := *group.electionState
req.electionState = &copy
}
if len(group.pendingEntries) > 0 {
req.entries = group.pendingEntries
Expand All @@ -569,28 +604,37 @@ func (s *state) handleWriteReady() {
s.writeTask.in <- writeRequest
}

func (s *state) broadcastEntries(g *group, entries []*LogEntry) {
if g.role != RoleLeader {
return
}
glog.V(6).Infof("node %v: broadcasting entries to followers", s.nodeID)
for _, id := range g.currentMembers.Members {
node := s.nodes[id]
node.client.appendEntries(&AppendEntriesRequest{
RequestHeader: RequestHeader{s.nodeID, id},
GroupID: g.groupID,
Term: g.electionState.CurrentTerm,
LeaderID: s.nodeID,
PrevLogIndex: g.persistedLastIndex,
PrevLogTerm: g.persistedLastTerm,
LeaderCommit: g.commitIndex,
Entries: entries,
})
}
}

func (s *state) handleWriteResponse(response *writeResponse) {
glog.V(6).Infof("node %v got write response: %#v", s.nodeID, *response)
for groupID, persistedGroup := range response.groups {
g := s.groups[groupID]
if persistedGroup.metadata != nil {
g.persistedMetadata = persistedGroup.metadata
if persistedGroup.electionState != nil {
g.persistedElectionState = persistedGroup.electionState
}
if persistedGroup.lastIndex != -1 {
glog.V(6).Infof("updating persisted log index to %v", persistedGroup.lastIndex)
for _, id := range g.metadata.Members {
node := s.nodes[id]
node.client.appendEntries(&AppendEntriesRequest{
RequestHeader: RequestHeader{s.nodeID, id},
GroupID: g.groupID,
Term: g.metadata.CurrentTerm,
LeaderID: s.nodeID,
PrevLogIndex: g.persistedLastIndex,
PrevLogTerm: g.persistedLastTerm,
LeaderCommit: g.commitIndex,
Entries: persistedGroup.entries,
})
}
glog.V(6).Infof("node %v: updating persisted log index to %v", s.nodeID,
persistedGroup.lastIndex)
s.broadcastEntries(g, persistedGroup.entries)
g.persistedLastIndex = persistedGroup.lastIndex
g.persistedLastTerm = persistedGroup.lastTerm
}
Expand All @@ -599,10 +643,10 @@ func (s *state) handleWriteResponse(response *writeResponse) {
var toDelete []*list.Element
for e := g.pendingCalls.Front(); e != nil; e = e.Next() {
call := e.Value.(*pendingCall)
if g.persistedMetadata == nil || g.persistedLastIndex == -1 {
if g.persistedElectionState == nil || g.persistedLastIndex == -1 {
continue
}
if call.term != -1 && call.term > g.persistedMetadata.CurrentTerm {
if call.term != -1 && call.term > g.persistedElectionState.CurrentTerm {
continue
}
if call.logIndex != -1 && call.logIndex > g.persistedLastIndex {
Expand Down Expand Up @@ -632,16 +676,18 @@ func (s *state) becomeCandidate(g *group) {
panic("cannot transition from leader to candidate")
}
g.role = RoleCandidate
g.metadata.CurrentTerm++
g.metadata.VotedFor = s.nodeID
g.electionState.CurrentTerm++
g.electionState.VotedFor = s.nodeID
g.votes = make(map[NodeID]bool)
// TODO(bdarnell): scan the uncommitted tail to find currentMembers.
g.currentMembers = g.committedMembers
s.updateElectionDeadline(g)
for _, id := range g.metadata.Members {
for _, id := range g.currentMembers.Members {
node := s.nodes[id]
node.client.requestVote(&RequestVoteRequest{
RequestHeader: RequestHeader{s.nodeID, id},
GroupID: g.groupID,
Term: g.metadata.CurrentTerm,
Term: g.electionState.CurrentTerm,
CandidateID: s.nodeID,
LastLogIndex: g.lastLogIndex,
LastLogTerm: g.lastLogTerm,
Expand All @@ -654,11 +700,15 @@ func (s *state) commitEntries(g *group, index int) {
if index <= g.commitIndex {
// Commit index cannot actually move backwards, but a newly-elected leader might
// report stale positions for a short time so just ignore them.
glog.V(6).Infof("node %v: ignoring commit index %v because it is behind existing commit %v",
s.nodeID, index, g.commitIndex)
return
}
if index > g.persistedLastIndex {
// If we are not caught up with the leader, just commit as far as we can.
// We'll continue to commit new entries as we receive AppendEntriesRequests.
glog.V(6).Infof("node %v: leader is commited to %v, but capping to %v",
s.nodeID, index, g.persistedLastIndex)
index = g.persistedLastIndex
}
glog.V(6).Infof("node %v advancing commit position for group %v from %v to %v",
Expand All @@ -667,17 +717,19 @@ func (s *state) commitEntries(g *group, index int) {
entries := make(chan *LogEntryState, 100)
go s.Storage.GetLogEntries(g.groupID, g.commitIndex+1, index, entries)
for entry := range entries {
glog.V(6).Infof("node %v: committing %+v", s.nodeID, entry)
if entry.Entry.Type == LogEntryCommand {
s.sendEvent(&EventCommandCommitted{entry.Entry.Payload})
}
}
g.commitIndex = index
s.broadcastEntries(g, nil)
}

// updateDirtyStatus sets the dirty flag for the given group.
func (s *state) updateDirtyStatus(g *group) {
dirty := false
if !g.metadata.Equal(g.persistedMetadata) {
if !g.electionState.Equal(g.persistedElectionState) {
dirty = true
}
if len(g.pendingEntries) > 0 {
Expand Down
5 changes: 4 additions & 1 deletion multiraft/multiraft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package multiraft
import (
"testing"
"time"

"github.com/golang/glog"
)

type testCluster struct {
Expand Down Expand Up @@ -120,7 +122,8 @@ func TestCommand(t *testing.T) {
cluster.nodes[0].SubmitCommand(groupID, []byte("command"))

// The command will be committed on each node.
for _, events := range cluster.events {
for i, events := range cluster.events {
glog.Infof("waiting for event to be commited on node %v", i)
commit := <-events.CommandCommitted
if string(commit.Command) != "command" {
t.Errorf("unexpected value in committed command: %v", commit.Command)
Expand Down
Loading

0 comments on commit 447610d

Please sign in to comment.