Skip to content

Commit

Permalink
Fix: changing membership should not remove replication to all learners
Browse files Browse the repository at this point in the history
When changing membership, replications to the learners(non-voters) that
are not added as voter should be kept.

E.g.: with a cluster of voters `{0}` and learners `{1, 2, 3}`, changing
membership to `{0, 1, 2}` should not remove replication to node `3`.

Only replications to removed members should be removed.
  • Loading branch information
drmingdrmer committed Sep 30, 2022
1 parent 1bd22ed commit 2896b98
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 16 deletions.
35 changes: 22 additions & 13 deletions openraft/src/core/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
///
/// This is ony called by leader.
#[tracing::instrument(level = "debug", skip(self))]
pub(super) fn handle_uniform_consensus_committed(&mut self, log_id: &LogId) {
pub(super) async fn handle_uniform_consensus_committed(&mut self, log_id: &LogId) -> Result<(), StorageError> {
tracing::info!("handle_uniform_consensus_committed at log id: {}", log_id);
let index = log_id.index;

Expand All @@ -257,24 +257,32 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
// TODO(xp): transfer leadership
self.core.set_target_state(State::Learner);
self.core.current_leader = None;
return;
return Ok(());
}

let membership = &self.core.effective_membership.membership;

for (id, state) in self.nodes.iter_mut() {
if membership.contains(id) {
continue;
}
let (_, committed_membership) = self.core.storage.last_applied_state().await?;

if let Some(prev) = committed_membership {
let prev_x = prev.membership.all_nodes().clone();
let curr = membership.all_nodes();

tracing::info!(
"set remove_after_commit for {} = {}, membership: {:?}",
id,
index,
self.core.effective_membership
);
let removed = prev_x.difference(curr);
for id in removed {
if let Some(state) = self.nodes.get_mut(id) {
tracing::info!(
"set remove_after_commit for {} = {}, membership: {:?}",
id,
index,
self.core.effective_membership
);

state.remove_since = Some(index)
state.remove_since = Some(index)
} else {
tracing::warn!("replication not found to target: {}", id)
}
}
}

let targets = self.nodes.keys().cloned().collect::<Vec<_>>();
Expand All @@ -283,6 +291,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
}

self.leader_report_metrics();
Ok(())
}

/// Remove a replication if the membership that does not include it has committed.
Expand Down
8 changes: 5 additions & 3 deletions openraft/src/core/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,24 +270,26 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
);
}

pub fn handle_special_log(&mut self, entry: &Entry<D>) {
pub async fn handle_special_log(&mut self, entry: &Entry<D>) -> Result<(), StorageError> {
match &entry.payload {
EntryPayload::Membership(ref m) => {
if m.is_in_joint_consensus() {
// nothing to do
} else {
self.handle_uniform_consensus_committed(&entry.log_id);
self.handle_uniform_consensus_committed(&entry.log_id).await?;
}
}
EntryPayload::Blank => {}
EntryPayload::Normal(_) => {}
}

Ok(())
}

/// Apply the given log entry to the state machine.
#[tracing::instrument(level = "debug", skip(self, entry))]
pub(super) async fn apply_entry_to_state_machine(&mut self, entry: &Entry<D>) -> Result<R, StorageError> {
self.handle_special_log(entry);
self.handle_special_log(entry).await?;

// First, we just ensure that we apply any outstanding up to, but not including, the index
// of the given entry. We need to be able to return the data response from applying this
Expand Down
1 change: 1 addition & 0 deletions openraft/tests/membership/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ mod t10_add_learner;
mod t10_remove_learner;
mod t15_add_remove_follower;
mod t20_change_membership;
mod t21_change_membership_keep_learner;
mod t25_elect_with_new_config;
mod t30_commit_joint_config;
mod t30_step_down;
Expand Down
61 changes: 61 additions & 0 deletions openraft/tests/membership/t21_change_membership_keep_learner.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use std::sync::Arc;
use std::time::Duration;

use maplit::btreeset;
use openraft::Config;

use crate::fixtures::RaftRouter;

/// Given a cluster of voter {0,1,2} and learners {3,4,5};
/// Changing membership to {0,3,4} should not remove replication to node-5, should only remove replication to {1,2}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn change_membership_keep_learners() -> anyhow::Result<()> {
let (_log_guard, ut_span) = init_ut!();
let _ent = ut_span.enter();

let lag_threshold = 1;

let config = Arc::new(
Config {
replication_lag_threshold: lag_threshold,
..Default::default()
}
.validate()?,
);
let router = Arc::new(RaftRouter::new(config.clone()));

let mut log_index = router.new_nodes_from_single(btreeset! {0,1,2}, btreeset! {3,4,5}).await?;

tracing::info!("--- change membership to: 0,3,4");
router.change_membership(0, btreeset! {0,3,4}).await?;
log_index += 2;

tracing::info!("--- write 5 logs");
{
router.client_request_many(0, "foo", 5).await;
log_index += 5;

for id in [1, 2] {
assert!(router
.wait(&id, timeout())
.await?
.log(Some(log_index), "removed voters can not receive logs")
.await
.is_err());
}

for id in [0, 3, 4, 5] {
router
.wait(&id, timeout())
.await?
.log(Some(log_index), "other voters and learners receive all logs")
.await?;
}
}

Ok(())
}

fn timeout() -> Option<Duration> {
Some(Duration::from_millis(500))
}

0 comments on commit 2896b98

Please sign in to comment.