Skip to content

Commit

Permalink
Fix: delay leader step down
Browse files Browse the repository at this point in the history
When a membership that removes the leader is committed,
the leader continues to work for a short while before reverting to a learner.
This lets the leader replicate the `membership-log-is-committed` message to followers.

Otherwise, if the leader stepped down at once, the followers might have to re-commit the membership log
again.

After committing the membership log that does not contain the leader,
the leader will step down in the next `tick`.
  • Loading branch information
drmingdrmer committed Nov 6, 2022
1 parent 4b52e6d commit 0023cff
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 33 deletions.
48 changes: 19 additions & 29 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
}
}

#[tracing::instrument(level="trace", skip(self), fields(id=display(self.id), cluster=%self.config.cluster_name))]
#[tracing::instrument(level="trace", skip_all, fields(id=display(self.id), cluster=%self.config.cluster_name))]
async fn do_main(&mut self, rx_shutdown: oneshot::Receiver<()>) -> Result<(), Fatal<C::NodeId>> {
tracing::debug!("raft node is initializing");

Expand Down Expand Up @@ -734,7 +734,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
}

/// Update core's target state, ensuring all invariants are upheld.
#[tracing::instrument(level = "trace", skip(self), fields(id=display(self.id)))]
#[tracing::instrument(level = "debug", skip(self), fields(id=display(self.id)))]
pub(crate) fn set_target_state(&mut self, target_state: ServerState) {
tracing::debug!(id = display(self.id), ?target_state, "set_target_state");

Expand Down Expand Up @@ -762,6 +762,10 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
can_be_leader
);

// TODO: election timer should be bound to `(vote, membership_log_id)`:
// i.e., when membership is updated, the previous election timer should be invalidated.
// e.g., in a same `vote`, a learner becomes voter and then becomes learner again.
// election timer should be cleared for learner, set for voter and then cleared again.
self.next_election_time = VoteWiseTime::new(self.engine.state.vote, now + t);
}

Expand Down Expand Up @@ -1040,28 +1044,6 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
unreachable!("it has to be a leader!!!");
};
}

/// Leader will keep working until the effective membership that removes it is committed.
///
/// This is only called by the leader.
#[tracing::instrument(level = "debug", skip_all)]
pub(super) fn leader_step_down(&mut self) {
let em = &self.engine.state.membership_state.effective;

// The effective membership is not committed yet: keep leading.
if self.engine.state.committed < em.log_id {
return;
}

// TODO: Leader does not need to step down. It can keep working.
// This requires separating Leader(Proposer) and Acceptors.
if !em.is_voter(&self.id) {
tracing::debug!("leader is stepping down");

// TODO(xp): transfer leadership
self.set_target_state(ServerState::Learner);
self.engine.metrics_flags.set_cluster_changed();
}
}
}

impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C, N, S> {
Expand Down Expand Up @@ -1092,7 +1074,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
}

/// Run an event handling loop
#[tracing::instrument(level="debug", skip(self), fields(id=display(self.id)))]
#[tracing::instrument(level="debug", skip_all, fields(id=display(self.id)))]
async fn runtime_loop(&mut self, mut rx_shutdown: oneshot::Receiver<()>) -> Result<(), Fatal<C::NodeId>> {
loop {
self.flush_metrics();
Expand Down Expand Up @@ -1125,7 +1107,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
}

/// Spawn parallel vote requests to all cluster members.
#[tracing::instrument(level = "trace", skip(self))]
#[tracing::instrument(level = "trace", skip_all, fields(vote=vote_req.summary()))]
async fn spawn_parallel_vote_requests(&mut self, vote_req: &VoteRequest<C::NodeId>) {
let members = self.engine.state.membership_state.effective.voter_ids();

Expand Down Expand Up @@ -1353,6 +1335,14 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
}
}
}

// When a membership that removes the leader is committed,
// the leader continues to work for a short while before reverting to a learner.
// This lets the leader replicate the `membership-log-is-committed` message to followers.
// Otherwise, if the leader stepped down at once, a follower might have to re-commit the
// membership log again, after electing itself.
self.engine.leader_step_down();
self.run_engine_commands::<Entry<C>>(&[]).await?;
}

RaftMsg::HigherVote {
Expand Down Expand Up @@ -1443,8 +1433,10 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
);
l.replication_metrics.update(UpdateMatchedLogId { target, matched });
} else {
unreachable!("it has to be a leader!!!");
// This method is only called after `update_progress()`,
// and this node may have become a non-leader after `update_progress()`.
}

self.engine.metrics_flags.set_replication_changed()
}

Expand Down Expand Up @@ -1589,8 +1581,6 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftRuntime
ref upto,
} => {
self.apply_to_state_machine(committed.next_index(), upto.index).await?;
// Stepping down should be controlled by Engine.
self.leader_step_down();
}
Command::FollowerCommit {
already_committed: ref committed,
Expand Down
33 changes: 30 additions & 3 deletions openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -734,6 +734,33 @@ where
}
}

/// Leader steps down (reverts to learner) once a committed membership no longer contains it.
///
/// This is only called by the leader.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn leader_step_down(&mut self) {
    tracing::debug!("leader_step_down: node_id:{}", self.id);

    // Keep acting as leader until a membership config that excludes this node is committed.
    let effective = &self.state.membership_state.effective;

    tracing::debug!(
        "membership: {}, committed: {}, is_leading: {}",
        effective.summary(),
        self.state.committed.summary(),
        self.is_leading(),
    );

    // Step down only when all three hold, checked in this order:
    // the effective membership is committed, it no longer lists this node as a voter,
    // and this node is still acting as leader (or candidate).
    if effective.log_id <= self.state.committed && !effective.is_voter(&self.id) && self.is_leading() {
        tracing::debug!("leader {} is stepping down", self.id);
        self.enter_following();
    }
}

/// Follower/Learner handles install-snapshot.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn install_snapshot(&mut self, meta: SnapshotMeta<NID, N>) {
Expand Down Expand Up @@ -1154,13 +1181,13 @@ where
tracing::debug!(
is_member = display(self.is_voter()),
is_leader = display(self.is_leader()),
is_becoming_leader = display(self.is_becoming_leader()),
is_leading = display(self.is_leading()),
"states"
);
if self.is_voter() {
if self.is_leader() {
ServerState::Leader
} else if self.is_becoming_leader() {
} else if self.is_leading() {
ServerState::Candidate
} else {
ServerState::Follower
Expand All @@ -1175,7 +1202,7 @@ where
}

/// The node is candidate or leader
fn is_becoming_leader(&self) -> bool {
fn is_leading(&self) -> bool {
self.state.internal_server_state.is_leading()
}

Expand Down
7 changes: 6 additions & 1 deletion openraft/tests/membership/t30_remove_leader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,12 @@ async fn remove_leader_access_new_cluster() -> Result<()> {

router
.wait(&2, timeout())
.log(Some(log_index), "new leader node-2 commits 2 membership log")
// The last_applied may not be updated on follower nodes,
// because leader node-0 steps down at once when the second membership log is committed.
.metrics(
|x| x.last_log_index == Some(log_index),
"new leader node-2 commits 2 membership log",
)
.await?;
}

Expand Down

0 comments on commit 0023cff

Please sign in to comment.