Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor: add option in function change_membership, turn not exist member into learner, or remove it #148

Merged
merged 11 commits into from
Feb 10, 2022
4 changes: 3 additions & 1 deletion openraft/src/core/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
&mut self,
members: BTreeSet<NodeId>,
blocking: bool,
turn_to_learner: bool,
tx: RaftRespTx<ClientWriteResponse<R>, ClientWriteError>,
) -> Result<(), StorageError> {
// Ensure cluster will have at least one node.
Expand All @@ -165,7 +166,8 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>

let curr = self.core.effective_membership.membership.clone();
let new_members = members.difference(curr.all_members());
let mut new_config = curr.next_safe(members.clone());
let mut new_config = curr.next_safe(members.clone(), turn_to_learner);

tracing::debug!(?new_config, "new_config");

// Check the proposed config for any new nodes. If ALL new nodes already have replication
Expand Down
9 changes: 7 additions & 2 deletions openraft/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -828,8 +828,13 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
RaftMsg::AddLearner { id, tx, blocking } => {
self.add_learner(id, tx, blocking).await;
}
RaftMsg::ChangeMembership { members, blocking, tx } => {
self.change_membership(members, blocking, tx).await?;
RaftMsg::ChangeMembership {
members,
blocking,
turn_to_learner,
tx,
} => {
self.change_membership(members, blocking, turn_to_learner, tx).await?;
}
};

Expand Down
21 changes: 15 additions & 6 deletions openraft/src/membership/membership.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,14 +259,23 @@ impl Membership {
/// }
/// ```
#[must_use]
pub fn next_safe(&self, goal: BTreeSet<NodeId>) -> Self {
pub fn next_safe(&self, goal: BTreeSet<NodeId>, turn_to_learner: bool) -> Self {
let learners = if turn_to_learner {
let curr = self.clone();
// add removed members into learners
let removed_members = curr.all_members().difference(&goal);
let mut learners = curr.all_learners().clone();
for id in removed_members {
learners.insert(*id);
}
learners
} else {
self.learners.clone()
};
if self.configs.contains(&goal) {
Membership::new_single_with_learners(goal, self.learners.clone())
Membership::new_single_with_learners(goal, learners)
} else {
Membership::new_multi_with_learners(
vec![self.configs.last().cloned().unwrap(), goal],
self.learners.clone(),
)
Membership::new_multi_with_learners(vec![self.configs.last().cloned().unwrap(), goal], learners)
}
}

Expand Down
16 changes: 11 additions & 5 deletions openraft/src/membership/membership_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,11 +246,17 @@ fn test_membership_next_safe() -> anyhow::Result<()> {
let m12 = Membership::new_multi(vec![c1(), c2()]);
let m23 = Membership::new_multi(vec![c2(), c3()]);

assert_eq!(m1, m1.next_safe(c1()));
assert_eq!(m12, m1.next_safe(c2()));
assert_eq!(m1, m12.next_safe(c1()));
assert_eq!(m2, m12.next_safe(c2()));
assert_eq!(m23, m12.next_safe(c3()));
assert_eq!(m1, m1.next_safe(c1(), false));
assert_eq!(m12, m1.next_safe(c2(), false));
assert_eq!(m1, m12.next_safe(c1(), false));
assert_eq!(m2, m12.next_safe(c2(), false));
assert_eq!(m23, m12.next_safe(c3(), false));

let old_learners = || btreeset! {1, 2};
let learners = || btreeset! {1, 2, 3, 4, 5};
let m23_with_learners_old = Membership::new_multi_with_learners(vec![c2(), c3()], old_learners());
let m23_with_learners_new = Membership::new_multi_with_learners(vec![c3()], learners());
assert_eq!(m23_with_learners_new, m23_with_learners_old.next_safe(c3(), true));

Ok(())
}
36 changes: 32 additions & 4 deletions openraft/src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,16 +249,20 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
/// - It proposes a **joint** config.
/// - When the **joint** config is committed, it proposes a uniform config.
///
/// If blocking is true, it blocks until every learner becomes up to date.
/// If `blocking` is true, it blocks until every learner becomes up to date.
/// Otherwise it returns error `ChangeMembershipError::LearnerIsLagging` if there is a lagging learner.
///
/// If `turn_to_learner` is true, then all the members which not exists in the new membership,
/// will be turned into learners, otherwise will be removed.
///
/// If it lost leadership or crashed before committing the second **uniform** config log, the cluster is left in the
/// **joint** config.
#[tracing::instrument(level = "debug", skip(self))]
pub async fn change_membership(
&self,
members: BTreeSet<NodeId>,
blocking: bool,
turn_to_learner: bool,
) -> Result<ClientWriteResponse<R>, ClientWriteError> {
tracing::info!("change_membership: start to commit joint config");

Expand All @@ -270,6 +274,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
RaftMsg::ChangeMembership {
members: members.clone(),
blocking,
turn_to_learner,
tx,
},
rx,
Expand All @@ -289,7 +294,17 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
tracing::debug!("the second step is to change to uniform config: {:?}", members);

let (tx, rx) = oneshot::channel();
let res = self.call_core(RaftMsg::ChangeMembership { members, blocking, tx }, rx).await?;
let res = self
.call_core(
RaftMsg::ChangeMembership {
members,
blocking,
turn_to_learner,
tx,
},
rx,
)
.await?;

tracing::info!("res of second change_membership: {}", res.summary());

Expand Down Expand Up @@ -446,6 +461,11 @@ pub(crate) enum RaftMsg<D: AppData, R: AppDataResponse> {
///
/// Otherwise, wait for commit of the member change log.
blocking: bool,

/// If turn_to_learner is true, then all the members which not exists in the new membership,
/// will be turned into learners, otherwise will be removed.
turn_to_learner: bool,

tx: RaftRespTx<ClientWriteResponse<R>, ClientWriteError>,
},
}
Expand Down Expand Up @@ -476,8 +496,16 @@ where
RaftMsg::AddLearner { id, blocking, .. } => {
format!("AddLearner: id: {}, blocking: {}", id, blocking)
}
RaftMsg::ChangeMembership { members, blocking, .. } => {
format!("ChangeMembership: members: {:?}, blocking: {}", members, blocking)
RaftMsg::ChangeMembership {
members,
blocking,
turn_to_learner,
..
} => {
format!(
"ChangeMembership: members: {:?}, blocking: {}, turn_to_learner: {}",
members, blocking, turn_to_learner,
)
}
}
}
Expand Down
34 changes: 32 additions & 2 deletions openraft/tests/fixtures/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,25 @@ impl RaftRouter {
Ok(())
}

#[tracing::instrument(level = "info", skip(self))]
pub async fn wait_for_members(
&self,
node_ids: &BTreeSet<u64>,
members: BTreeSet<u64>,
timeout: Option<Duration>,
msg: &str,
) -> Result<()> {
for i in node_ids.iter() {
let wait = self.wait(i, timeout).await?;
wait.metrics(
|x| x.membership_config.membership.get_ith_config(0).cloned().unwrap() == members,
msg,
)
lichuang marked this conversation as resolved.
Show resolved Hide resolved
.await?;
}
Ok(())
}

/// Wait for specified nodes until their state becomes `state`.
#[tracing::instrument(level = "info", skip(self))]
pub async fn wait_for_state(
Expand Down Expand Up @@ -476,7 +495,18 @@ impl RaftRouter {
) -> Result<ClientWriteResponse<MemClientResponse>, ClientWriteError> {
let rt = self.routing_table.read().await;
let node = rt.get(&leader).unwrap_or_else(|| panic!("node with ID {} does not exist", leader));
node.0.change_membership(members, true).await
node.0.change_membership(members, true, false).await
}

pub async fn change_membership_with_turn_to_learner(
&self,
leader: NodeId,
members: BTreeSet<NodeId>,
turn_to_learner: bool,
) -> Result<ClientWriteResponse<MemClientResponse>, ClientWriteError> {
let rt = self.routing_table.read().await;
let node = rt.get(&leader).unwrap_or_else(|| panic!("node with ID {} does not exist", leader));
node.0.change_membership(members, true, turn_to_learner).await
}

pub async fn change_membership_with_blocking(
Expand All @@ -487,7 +517,7 @@ impl RaftRouter {
) -> Result<ClientWriteResponse<MemClientResponse>, ClientWriteError> {
let rt = self.routing_table.read().await;
let node = rt.get(&leader).unwrap_or_else(|| panic!("node with ID {} does not exist", leader));
node.0.change_membership(members, blocking).await
node.0.change_membership(members, blocking, false).await
}

/// Send a client read request to the target node.
Expand Down
59 changes: 59 additions & 0 deletions openraft/tests/membership/t20_change_membership.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use maplit::btreeset;
use openraft::error::ChangeMembershipError;
use openraft::Config;
use openraft::RaftStorage;
use openraft::State;

use crate::fixtures::RaftRouter;

Expand Down Expand Up @@ -119,6 +120,64 @@ async fn change_with_lagging_learner_non_blocking() -> anyhow::Result<()> {
Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn change_with_turn_not_exist_member_to_learner() -> anyhow::Result<()> {
// Add a member without adding it as learner, in blocking mode it should finish successfully.

let (_log_guard, ut_span) = init_ut!();
let _ent = ut_span.enter();

let config = Arc::new(Config { ..Default::default() }.validate()?);
let router = Arc::new(RaftRouter::new(config.clone()));
let timeout = Some(Duration::from_millis(1000));
let mut n_logs = router.new_nodes_from_single(btreeset! {0,1,2}, btreeset! {}).await?;

tracing::info!("--- write up to 1 logs");
{
router.client_request_many(0, "non_voter_add", 1).await;
n_logs += 1;

// all the nodes MUST recv the log
router.wait_for_log(&btreeset![0, 1, 2], Some(n_logs), timeout, "append a log").await?;
}

{
router.change_membership_with_turn_to_learner(0, btreeset![0, 1], true).await?;
// 2 for change_membership
n_logs += 2;

// all the nodes MUST recv the change_membership log
router.wait_for_log(&btreeset![0, 1], Some(n_logs), timeout, "append a log").await?;
}

tracing::info!("--- write up to 1 logs");
{
router.client_request_many(0, "non_voter_add", 1).await;
n_logs += 1;

// node [0,1] MUST recv the log
router.wait_for_log(&btreeset![0, 1], Some(n_logs), timeout, "append a log").await?;

// node 2 MUST stay in learner state and is able to receive new logs
router
.wait_for_metrics(
&2,
|x| x.state == State::Learner,
timeout,
&format!("n{}.state -> {:?}", 2, State::Learner),
)
.await?;

// node [2] MUST recv the log
router.wait_for_log(&btreeset![2], Some(n_logs), timeout, "append a log").await?;

// check membership
router.wait_for_members(&btreeset![0, 1, 2], btreeset![0, 1], timeout, "members: [0,1]").await?;
}

Ok(())
}

fn timeout() -> Option<Duration> {
Some(Duration::from_micros(500))
}