raft: advance commit index safely #139

Status: Draft. Wants to merge 2 commits into main.
bootstrap.go: 2 changes (1 addition, 1 deletion)

@@ -58,7 +58,7 @@ func (rn *RawNode) Bootstrap(peers []Peer) error {

ents[i] = pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: uint64(i + 1), Data: data}
}
- rn.raft.raftLog.append(ents...)
rn.raft.raftLog.append(rn.raft.Term, ents...)

// Now apply them, mainly so that the application can call Campaign
// immediately after StartNode in tests. Note that these nodes will
log.go: 110 changes (94 additions, 16 deletions)

@@ -29,6 +29,35 @@ type raftLog struct {
// they will be saved into storage.
unstable unstable

// leaderTerm is a term of the leader with whom our log is "consistent". The
// log is guaranteed to be a prefix of this term's leader log.
//
// The leaderTerm can be safely updated to `t` if:
// 1. the last entry in the log has term `t`, or, more generally,

[Inline review thread]

Contributor:

Are there cases where (2) is insufficient and (1) is needed? I'm curious why we can't make this more specific. You say "We use (1) to initialize leaderTerm" below. Is this important on startup?

pav-kv (PR author) on Jan 29, 2024:

Only (2) is necessary. (1) is a cheap "stateful" version of (2).

On a server restart, we forget who did the last append. (1) gives the best guess, and allows recovering to up-to-date state for idle raft groups that haven't seen recent appends except the current leader's empty entry. For non-idle groups, or followers whose log is significantly behind the leader's and doesn't end with entries at its term, recovering to raftLog.lastTerm() gives no value, and is equivalent to setting leaderTerm = 0. To recover, these will then have to "wait" for the next append message from this leader.

It would be better if this field were stored in some HardState.LeaderTerm; then we would always recover to an up-to-date state. Note that we can't reuse HardState.Term, because HardState.Term can be > leaderTerm (for the same reason that r.Term can be > leaderTerm).

To bring analogy with Paxos, the local raftLog is an acceptor, and raftLog.leaderTerm is the id of the highest accepted proposal. The election term is sort of orthogonal to this - the election term (r.Term / HardState.Term) can briefly jump in front of the accepted proposal term until there is an accepted proposal at this new term.

If we ever want to bring this to the next level: MsgApp messages should not be rejected based on r.Term / HardState.Term. For correctness, it is only necessary to reject MsgApp if the message term is < raftLog.leaderTerm / HardState.LeaderTerm. I think this would reduce some unnecessary rejects during the leader election flux time. caveat: #139 (comment)

pav-kv (PR author) on Jan 30, 2024:

I should note the invariant here, and maybe check it in a few places or tests:

	RawNode.raft.Term >= raftLog.leaderTerm >= raftLog.lastTerm()

(1) initializes leaderTerm to lastTerm(), which is safe because raft.Term >= raftLog.lastTerm()
(2) maintains it
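
To make the invariant concrete, here is a minimal sketch of how it could be asserted in a test helper; the helper itself is illustrative and not part of this PR, and assumes the package-internal raft and raftLog types:

	func assertLeaderTermInvariant(t *testing.T, r *raft) {
		// RawNode.raft.Term >= raftLog.leaderTerm >= raftLog.lastTerm()
		if r.Term < r.raftLog.leaderTerm {
			t.Fatalf("raft.Term (%d) < raftLog.leaderTerm (%d)", r.Term, r.raftLog.leaderTerm)
		}
		if r.raftLog.leaderTerm < r.raftLog.lastTerm() {
			t.Fatalf("raftLog.leaderTerm (%d) < raftLog.lastTerm() (%d)", r.raftLog.leaderTerm, r.raftLog.lastTerm())
		}
	}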

Contributor:

> allows recovering to up-to-date state for idle raft groups that haven't seen recent appends except the current leader's empty entry. For non-idle groups, or followers whose log is significantly behind the leader's and doesn't end with entries at its term, recovering to raftLog.lastTerm() gives no value, and is equivalent to setting leaderTerm = 0. To recover, these will then have to "wait" for the next append message from this leader.

This is what I was hoping to clarify. Initializing to lastTerm is an opportunistic way to allow a restarted follower in idle raft groups to immediately advance its commit index on startup without needing to first accept a MsgApp.

Is it anything more than that? For an idle raft group, a follower with an up-to-date log but a lagging commit index may restart and never receive any new entries. If we didn't have (1) and we started discarding commit indexes in heartbeats with terms > raftLog.leaderTerm, would the commit index on the follower get stuck?

pav-kv (PR author):

That's a good point. I think we indeed need (1). Also, (1) plays nicely with the invariant that I put above.
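
To illustrate why (1) matters here, a hedged sketch of how the follower's heartbeat handling might thread the leader term into commitTo under this design; the handler body is an assumption for illustration, since the raft.go side is not shown in this diff:

	func (r *raft) handleHeartbeat(m pb.Message) {
		// commitTo only advances the commit index if m.Term == r.raftLog.leaderTerm,
		// i.e. our log is known to be a prefix of this leader's log. With (1), a
		// restarted follower whose log ends in the leader's term can accept this
		// commit index immediately, without waiting for another MsgApp.
		r.raftLog.commitTo(m.Term, m.Commit)
		r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})
	}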

joshuazh-x (Contributor):

> If we ever want to bring this to the next level: MsgApp messages should not be rejected based on r.Term / HardState.Term. For correctness, it is only necessary to reject MsgApp if the message term is < raftLog.leaderTerm / HardState.LeaderTerm. I think this would reduce some unnecessary rejects during the leader election flux time.

Does this mean a MsgApp from an old leader will be accepted by voters that have voted for a new leader but have not yet received any log entries from the new leader? In that case, there may be entries committed on the old leader but missing from the new leader's log.

pav-kv (PR author) on Feb 1, 2024:

@joshuazh-x That's a good point. We don't want a quorum of nodes to accept entries unless we are sure these entries are consistent with the new leader's log.

So it's safest to accept only MsgApp.Term >= raft.Term. But we could sometimes accept MsgApp.Term < raft.Term if:

  • The MsgApp contains the (index, term) entry for which we voted when electing the raft.Term leader. If that election wins, we know the new leader will append right after this entry.
  • We would truncate MsgApp.Entries at the aforementioned index we voted for, and append it.
  • This guarantees that this append is consistent with the new leader's log (i.e. the new leader would send us the exact same entries).

A vote is a promise to the leader not to accept any entries that are not in the leader's log. If we can deduce that an entry is in the leader's log (before / other than by getting a MsgApp directly from this leader), we can always safely accept it.

It's unclear if such an optimization would give any value (like reducing replication latency in some cases; it probably does avoid a duplicate MsgApp from the new leader when the election races with the old leader's appends), so I will leave it as an exercise for later :) It looks like a complication.

Filed #150 with a more general technique that will bring more benefits.
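
For the record, a rough sketch of the acceptance check described above, assuming hypothetical votedLastIndex / votedLastTerm fields that record the ID of the last entry of the candidate we voted for at raft.Term (illustrative only; not implemented in this PR):

	func truncateOldTermApp(m *pb.Message, raftTerm, votedLastIndex, votedLastTerm uint64) bool {
		if m.Term >= raftTerm {
			return true // not an outdated leader, the usual checks apply
		}
		// Accept only if the message proves that the entry we voted for is in the
		// old leader's log.
		for i := range m.Entries {
			if m.Entries[i].Index == votedLastIndex && m.Entries[i].Term == votedLastTerm {
				// Everything up to and including the voted-for entry is also in the
				// prospective new leader's log, so appending this prefix is safe.
				m.Entries = m.Entries[:i+1]
				return true
			}
		}
		return false
	}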

// 2. the last successful append was sent by the leader `t`.
//
// This is due to the following safety property (see raft paper §5.3):
//
// Log Matching: if two logs contain an entry with the same index and term,
// then the logs are identical in all entries up through the given index.
//
// We use (1) to initialize leaderTerm, and (2) to maintain it on updates.
//
// NB: (2) does not imply (1). If our log is behind the leader's log, the last
// entry term can be below leaderTerm.
//
// NB: leaderTerm does not necessarily match this raft node's term. It only
// does for the leader. For followers and candidates, when we first learn or
// bump to a new term, we don't have a proof that our log is consistent with
// the new term's leader (current or prospective). The new leader may override
// any suffix of the log after the committed index. Only when the first append

[Inline review thread]

Contributor:

"the log", "the committed index"

Are you referring to the replicated log here, or this replica's local log? In other words, should these "the"s be replaced by "our"?

pav-kv (PR author) on Jan 29, 2024:

Both, but mostly the "replicated". The leader can override only a suffix of the "replicated" log after the "replicated" commit index.

"Our" log is lagging the leader's log, such as our committed index. For "our" log, this implies that the leader can override a suffix after our committed index (but we have no way of checking by how much it can go back).

// from the new leader succeeds, we can update leaderTerm.
//
// During normal operation, leaderTerm matches the node term though. During a
// leader change, it briefly lags behind, and matches again when the first
// append message succeeds.
leaderTerm uint64

[Inline review thread]

Member:

My immediate feeling is that it's a little weird to add a leaderTerm to raftLog, which shouldn't care about this information (the leader's term). It should be part of raft instead of raftLog.

pav-kv (PR author) on Jan 29, 2024:

See comment https://github.com/etcd-io/raft/pull/139/files#r1469818174, specifically the Paxos analogy.

Semantically, raftLog is layered under raft.go. The raftLog is agnostic to elections, and essentially only implements the "Paxos acceptor" role. So it makes good sense to me to have a "last accepted term" notion in raftLog. Moving this logic up into raft.go would be a layering violation of sorts.

> which shouldn't care about the info (leader's Term)

raftLog should care about the leader term with which it is consistent (i.e. what I called the leaderTerm here). It should not care about the RawNode.Term / HardState.Term, because the latter can be bumped arbitrarily when a new election happens, and this bump is not coordinated with raftLog.

There is an invariant at raft.go level that is relevant here: RawNode.Term >= raftLog.leaderTerm. We use RawNode.Term instead of raftLog.leaderTerm in a few places (notably, to reject append messages), and have only been safe because of this implicit invariant. I think we should make it more explicit.


// committed is the highest log position that is known to be in
// stable storage on a quorum of nodes.
committed uint64
@@ -88,6 +117,11 @@ func newLogWithSize(storage Storage, logger Logger, maxApplyingEntsSize entryEnc
if err != nil {
panic(err) // TODO(bdarnell)
}
lastTerm, err := storage.Term(lastIndex)
if err != nil {
panic(err) // TODO(pav-kv)
}
log.leaderTerm = lastTerm
log.unstable.offset = lastIndex + 1
log.unstable.offsetInProgress = lastIndex + 1
log.unstable.logger = logger
@@ -106,35 +140,73 @@ func (l *raftLog) String() string {

// maybeAppend returns (0, false) if the entries cannot be appended. Otherwise,
// it returns (last index of new entries, true).
- func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) {
- if !l.matchTerm(index, logTerm) {
//
// TODO(pav-kv): introduce a struct that consolidates the append metadata. The
// (leaderTerm, prevIndex, prevTerm) tuple must always be carried together, so
// that safety properties for this append are checked at the lowest layers
// rather than up in raft.go.
func (l *raftLog) maybeAppend(leaderTerm, prevIndex, prevTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) {
// Can not accept append requests from an outdated leader.
if leaderTerm < l.leaderTerm {
return 0, false
}
// Can not accept append requests that are not consistent with our log.
//
// NB: it is unnecessary to check matchTerm() if leaderTerm == l.leaderTerm,
// because the leader always sends self-consistent appends. For ensuring raft
// safety, this check is only necessary if leaderTerm > l.leaderTerm.
//
// TODO(pav-kv): however, we should log an error if leaderTerm == l.leaderTerm
// and the entry does not match. This means either the leader is sending
// inconsistent appends, or there is some state corruption in general.
if !l.matchTerm(prevIndex, prevTerm) {
return 0, false
}

- lastnewi = index + uint64(len(ents))
lastnewi = prevIndex + uint64(len(ents))
ci := l.findConflict(ents)
switch {
case ci == 0:
case ci <= l.committed:
l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed)
default:
- offset := index + 1
offset := prevIndex + 1
if ci-offset > uint64(len(ents)) {
l.logger.Panicf("index, %d, is out of range [%d]", ci-offset, len(ents))
}
- l.append(ents[ci-offset:]...)
l.append(leaderTerm, ents[ci-offset:]...)
}
- l.commitTo(min(committed, lastnewi))
// TODO(pav-kv): call commitTo from outside of this method, for a smaller API.
// TODO(pav-kv): it is safe to pass committed index as is here instead of min,
// but it breaks some tests that make incorrect assumptions. Fix this.
l.commitTo(leaderTerm, min(committed, lastnewi))
return lastnewi, true
}

- func (l *raftLog) append(ents ...pb.Entry) uint64 {
- if len(ents) == 0 {
func (l *raftLog) append(leaderTerm uint64, ents ...pb.Entry) uint64 {
// Can not accept append requests from an outdated leader.
if leaderTerm < l.leaderTerm {
return l.lastIndex()
}
if len(ents) == 0 { // no-op
return l.lastIndex()
}
if after := ents[0].Index - 1; after < l.committed {
l.logger.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed)
}

// INVARIANT: l.term(i) <= l.leaderTerm, for any entry in the log.
//
// TODO(pav-kv): we should more generally check that the content of ents slice
// is correct: all entries have consecutive indices, and terms do not regress.
// We should do this validation once, on every incoming message, and pass the
// append in a type-safe "validated append" wrapper. This wrapper can provide
// convenient accessors to the prev/last entry, instead of raw slices access.
if lastTerm := ents[len(ents)-1].Term; lastTerm > leaderTerm {
l.logger.Panicf("leader at term %d tries to append a higher term %d", leaderTerm, lastTerm)
}
l.leaderTerm = leaderTerm // l.leaderTerm never regresses here

l.unstable.truncateAndAppend(ents)
return l.lastIndex()
}
@@ -315,12 +387,16 @@ func (l *raftLog) lastIndex() uint64 {
return i
}

- func (l *raftLog) commitTo(tocommit uint64) {
- // never decrease commit
- if l.committed < tocommit {
- if l.lastIndex() < tocommit {
- l.logger.Panicf("tocommit(%d) is out of range [lastIndex(%d)]. Was the raft log corrupted, truncated, or lost?", tocommit, l.lastIndex())
- }
func (l *raftLog) commitTo(leaderTerm, tocommit uint64) {
// Do not accept the commit index update from a leader if our log is not
// consistent with the leader's log.
if leaderTerm != l.leaderTerm {
return
}
// Otherwise, we have the guarantee that our log is a prefix of the leader's
// log. All entries <= min(tocommit, lastIndex) can thus be committed.
tocommit = min(tocommit, l.lastIndex())
if tocommit > l.committed {
l.committed = tocommit
}
}
@@ -444,12 +520,14 @@ func (l *raftLog) matchTerm(i, term uint64) bool {
return t == term
}

- func (l *raftLog) maybeCommit(maxIndex, term uint64) bool {
// TODO(pav-kv): clarify that (maxIndex, term) is the ID of the entry at the
// committed index. Clean this up.
func (l *raftLog) maybeCommit(leaderTerm, maxIndex, term uint64) bool {
// NB: term should never be 0 on a commit because the leader campaigns at
// least at term 1. But if it is 0 for some reason, we don't want to consider
// this a term match in case zeroTermOnOutOfBounds returns 0.
if maxIndex > l.committed && term != 0 && l.zeroTermOnOutOfBounds(l.term(maxIndex)) == term {
- l.commitTo(maxIndex)
l.commitTo(leaderTerm, maxIndex)
return true
}
return false