Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raft: centralize configuration change application #10865

Merged
merged 2 commits into from
Jul 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 17 additions & 24 deletions raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,19 @@ func StartNode(c *Config, peers []Peer) Node {
if err != nil {
panic("unexpected marshal error")
}
e := pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: r.raftLog.lastIndex() + 1, Data: d}
// TODO(tbg): this should append the ConfChange for the own node first
// and also call applyConfChange below for that node first. Otherwise
// we have a Raft group (for a little while) that doesn't have itself
// in its config, which is bad.
// This whole way of setting things up is rickety. The app should just
// populate the initial ConfState appropriately and then all of this
// goes away.
e := pb.Entry{
Type: pb.EntryConfChange,
Term: 1,
Index: r.raftLog.lastIndex() + 1,
Data: d,
}
r.raftLog.append(e)
}
// Mark these initial entries as committed.
Expand All @@ -225,7 +237,7 @@ func StartNode(c *Config, peers []Peer) Node {
// We do not set raftLog.applied so the application will be able
// to observe all conf changes via Ready.CommittedEntries.
for _, peer := range peers {
r.addNode(peer.ID)
r.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode})
}

n := newNode()
Expand Down Expand Up @@ -357,35 +369,16 @@ func (n *node) run(r *raft) {
r.Step(m)
}
case cc := <-n.confc:
if cc.NodeID == None {
select {
case n.confstatec <- pb.ConfState{
Nodes: r.prs.VoterNodes(),
Learners: r.prs.LearnerNodes()}:
case <-n.done:
}
break
}
switch cc.Type {
case pb.ConfChangeAddNode:
r.addNode(cc.NodeID)
case pb.ConfChangeAddLearnerNode:
r.addLearner(cc.NodeID)
case pb.ConfChangeRemoveNode:
cs := r.applyConfChange(cc)
if _, ok := r.prs.Progress[r.id]; !ok {
// block incoming proposal when local node is
// removed
if cc.NodeID == r.id {
propc = nil
}
r.removeNode(cc.NodeID)
case pb.ConfChangeUpdateNode:
default:
panic("unexpected conf type")
}
select {
case n.confstatec <- pb.ConfState{
Nodes: r.prs.VoterNodes(),
Learners: r.prs.LearnerNodes()}:
case n.confstatec <- cs:
case <-n.done:
}
case <-n.tickc:
Expand Down
7 changes: 7 additions & 0 deletions raft/quorum/joint.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ package quorum
// majority configurations. Decisions require the support of both majorities.
type JointConfig [2]MajorityConfig

// String formats the joint configuration. The first majority config is always
// rendered; when the second one has at least one member, it is appended after
// a "&&" separator.
func (c JointConfig) String() string {
	first := c[0].String()
	if len(c[1]) == 0 {
		return first
	}
	return first + "&&" + c[1].String()
}

// IDs returns a newly initialized map representing the set of voters present
// in the joint configuration.
func (c JointConfig) IDs() map[uint64]struct{} {
Expand Down
18 changes: 18 additions & 0 deletions raft/quorum/majority.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,24 @@ import (
// MajorityConfig is a set of IDs that uses majority quorums to make decisions.
type MajorityConfig map[uint64]struct{}

// String renders the IDs in ascending numeric order, separated by single
// spaces and enclosed in parentheses, e.g. "(1 2 3)". A nil or empty config
// renders as "()".
func (c MajorityConfig) String() string {
	// Map iteration order is random, so collect and sort the IDs to get
	// deterministic output.
	ids := make([]uint64, 0, len(c))
	for member := range c {
		ids = append(ids, member)
	}
	sort.Slice(ids, func(a, b int) bool { return ids[a] < ids[b] })

	parts := make([]string, len(ids))
	for pos, member := range ids {
		parts[pos] = fmt.Sprint(member)
	}
	return "(" + strings.Join(parts, " ") + ")"
}

// Describe returns a (multi-line) representation of the commit indexes for the
// given lookuper.
func (c MajorityConfig) Describe(l AckedIndexer) string {
Expand Down
1 change: 1 addition & 0 deletions raft/quorum/quorum.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"strconv"
)

// Index is a Raft log position.
type Index uint64

func (i Index) String() string {
Expand Down
197 changes: 132 additions & 65 deletions raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1322,11 +1322,51 @@ func (r *raft) handleSnapshot(m pb.Message) {
}

// restore recovers the state machine from a snapshot. It restores the log and the
// configuration of state machine.
// configuration of state machine. If this method returns false, the snapshot was
// ignored, either because it was obsolete or because of an error.
func (r *raft) restore(s pb.Snapshot) bool {
if s.Metadata.Index <= r.raftLog.committed {
return false
}
if r.state != StateFollower {
// This is defense-in-depth: if the leader somehow ended up applying a
// snapshot, it could move into a new term without moving into a
// follower state. This should never fire, but if it did, we'd have
// prevented damage by returning early, so log only a loud warning.
//
// At the time of writing, the instance is guaranteed to be in follower
// state when this method is called.
r.logger.Warningf("%x attempted to restore snapshot as leader; should never happen", r.id)
r.becomeFollower(r.Term+1, None)
return false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a comment to this function explaining the bool return value. Should we also step down to follower in this case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}

// More defense-in-depth: throw away snapshot if recipient is not in the
// config. This shouldn't ever happen (at the time of writing) but lots of
// code here and there assumes that r.id is in the progress tracker.
found := false
cs := s.Metadata.ConfState
for _, set := range [][]uint64{
cs.Nodes,
cs.Learners,
} {
for _, id := range set {
if id == r.id {
found = true
break
}
}
}
if !found {
r.logger.Warningf(
"%x attempted to restore snapshot but it is not in the ConfState %v; should never happen",
r.id, cs,
)
return false
}

// Now go ahead and actually restore.

if r.raftLog.matchTerm(s.Metadata.Index, s.Metadata.Term) {
r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]",
r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
Expand All @@ -1344,26 +1384,23 @@ func (r *raft) restore(s pb.Snapshot) bool {
}
}

r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] starts to restore snapshot [index: %d, term: %d]",
r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)

r.raftLog.restore(s)
r.prs = tracker.MakeProgressTracker(r.prs.MaxInflight)
r.restoreNode(s.Metadata.ConfState.Nodes, false)
r.restoreNode(s.Metadata.ConfState.Learners, true)
return true
}

func (r *raft) restoreNode(nodes []uint64, isLearner bool) {
for _, n := range nodes {
match, next := uint64(0), r.raftLog.lastIndex()+1
if n == r.id {
match = next - 1
r.isLearner = isLearner
}
r.prs.InitProgress(n, match, next, isLearner)
r.logger.Infof("%x restored progress of %x [%s]", r.id, n, r.prs.Progress[n])
// Reset the configuration and add the (potentially updated) peers in anew.
r.prs = tracker.MakeProgressTracker(r.prs.MaxInflight)
for _, id := range s.Metadata.ConfState.Nodes {
r.applyConfChange(pb.ConfChange{NodeID: id, Type: pb.ConfChangeAddNode})
}
for _, id := range s.Metadata.ConfState.Learners {
r.applyConfChange(pb.ConfChange{NodeID: id, Type: pb.ConfChangeAddLearnerNode})
}

pr := r.prs.Progress[r.id]
pr.MaybeUpdate(pr.Next - 1) // TODO(tbg): this is untested and likely unneeded
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks new. What prompted you to add it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it's not new, just looks different now. I've traced this back to basically the age of the dinosaurs but there was no reason given for doing this. I don't think it matters, but am not sure enough to rip it out via a drive-by.


r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] restored snapshot [index: %d, term: %d]",
r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
return true
}

// promotable indicates whether state machine can be promoted to leader,
Expand All @@ -1373,68 +1410,98 @@ func (r *raft) promotable() bool {
return pr != nil && !pr.IsLearner
}

func (r *raft) addNode(id uint64) {
r.addNodeOrLearnerNode(id, false)
}
func (r *raft) applyConfChange(cc pb.ConfChange) pb.ConfState {
addNodeOrLearnerNode := func(id uint64, isLearner bool) {
// NB: this method is intentionally hidden from view. All mutations of
// the conf state must call applyConfChange directly.
pr := r.prs.Progress[id]
if pr == nil {
r.prs.InitProgress(id, 0, r.raftLog.lastIndex()+1, isLearner)
} else {
if isLearner && !pr.IsLearner {
// Can only change Learner to Voter.
//
// TODO(tbg): why?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did we come to a decision on this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think there's a good reason, but we're waiting for @xiang90 and @siddontang for more input. My plan so far is to implement everything as if we wanted to allow demotions (incl unit tests etc) but then not actually expose them to the outside world. We certainly don't need them in CRDB.

r.logger.Infof("%x ignored addLearner: do not support changing %x from raft peer to learner.", r.id, id)
return
}

func (r *raft) addLearner(id uint64) {
r.addNodeOrLearnerNode(id, true)
}
if isLearner == pr.IsLearner {
// Ignore any redundant addNode calls (which can happen because the
// initial bootstrapping entries are applied twice).
return
}

func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) {
pr := r.prs.Progress[id]
if pr == nil {
r.prs.InitProgress(id, 0, r.raftLog.lastIndex()+1, isLearner)
} else {
if isLearner && !pr.IsLearner {
// Can only change Learner to Voter.
r.logger.Infof("%x ignored addLearner: do not support changing %x from raft peer to learner.", r.id, id)
return
// Change Learner to Voter, use origin Learner progress.
r.prs.RemoveAny(id)
r.prs.InitProgress(id, 0 /* match */, 1 /* next */, false /* isLearner */)
pr.IsLearner = false
*r.prs.Progress[id] = *pr
}

if isLearner == pr.IsLearner {
// Ignore any redundant addNode calls (which can happen because the
// initial bootstrapping entries are applied twice).
return
// When a node is first added, we should mark it as recently active.
// Otherwise, CheckQuorum may cause us to step down if it is invoked
// before the added node has had a chance to communicate with us.
r.prs.Progress[id].RecentActive = true
}

var removed int
if cc.NodeID != None {
switch cc.Type {
case pb.ConfChangeAddNode:
addNodeOrLearnerNode(cc.NodeID, false /* isLearner */)
case pb.ConfChangeAddLearnerNode:
addNodeOrLearnerNode(cc.NodeID, true /* isLearner */)
case pb.ConfChangeRemoveNode:
removed++
r.prs.RemoveAny(cc.NodeID)
case pb.ConfChangeUpdateNode:
default:
panic("unexpected conf type")
}

// Change Learner to Voter, use origin Learner progress.
r.prs.RemoveAny(id)
r.prs.InitProgress(id, 0 /* match */, 1 /* next */, false /* isLearner */)
pr.IsLearner = false
*r.prs.Progress[id] = *pr
}

if r.id == id {
r.isLearner = isLearner
}
r.logger.Infof("%x switched to configuration %s", r.id, r.prs.Config)
// Now that the configuration is updated, handle any side effects.

// When a node is first added, we should mark it as recently active.
// Otherwise, CheckQuorum may cause us to step down if it is invoked
// before the added node has a chance to communicate with us.
r.prs.Progress[id].RecentActive = true
}
cs := pb.ConfState{Nodes: r.prs.VoterNodes(), Learners: r.prs.LearnerNodes()}
pr, ok := r.prs.Progress[r.id]

func (r *raft) removeNode(id uint64) {
r.prs.RemoveAny(id)
// Update whether the node itself is a learner, resetting to false when the
// node is removed.
r.isLearner = ok && pr.IsLearner

// Do not try to commit or abort transferring if the cluster is now empty.
if len(r.prs.Voters[0]) == 0 && len(r.prs.Learners) == 0 {
return
if (!ok || r.isLearner) && r.state == StateLeader {
// This node is leader and was removed or demoted. We prevent demotions
// at the time of writing but hypothetically we handle them the same way as
// removing the leader: stepping down into the next Term.
//
// TODO(tbg): step down (for sanity) and ask follower with largest Match
// to TimeoutNow (to avoid interruption). This might still drop some
// proposals but it's better than nothing.
//
// TODO(tbg): test this branch. It is untested at the time of writing.
return cs
}

// TODO(tbg): won't bad (or at least unfortunate) things happen if the
// leader just removed itself?

// The quorum size is now smaller, so see if any pending entries can
// be committed.
if r.maybeCommit() {
r.bcastAppend()
// The remaining steps only make sense if this node is the leader and there
// are other nodes.
if r.state != StateLeader || len(cs.Nodes) == 0 {
return cs
}
// If the removed node is the leadTransferee, then abort the leadership transferring.
if r.state == StateLeader && r.leadTransferee == id {
if removed > 0 {
// The quorum size may have been reduced (but not to zero), so see if
// any pending entries can be committed.
if r.maybeCommit() {
r.bcastAppend()
}
}
// If the leadTransferee was removed, abort the leadership transfer.
if _, tOK := r.prs.Progress[r.leadTransferee]; !tOK && r.leadTransferee != 0 {
r.abortLeaderTransfer()
}

return cs
}

func (r *raft) loadState(state pb.HardState) {
Expand Down
Loading