diff --git a/async-raft/Cargo.toml b/async-raft/Cargo.toml index 00fbcfd18..2149781a4 100644 --- a/async-raft/Cargo.toml +++ b/async-raft/Cargo.toml @@ -19,6 +19,7 @@ byte-unit = "4.0.12" bytes = "1.0" derive_more = { version="0.99.9" } futures = "0.3" +maplit = "1.0.2" rand = "0.8" serde = { version="1", features=["derive"] } structopt = "0.3" @@ -29,7 +30,6 @@ tracing-futures = "0.2.4" [dev-dependencies] lazy_static = "1.4.0" -maplit = "1.0.2" memstore = { version="0.2.0", path="../memstore" } pretty_assertions = "1.0.0" tracing-appender = "0.2.0" diff --git a/async-raft/src/core/admin.rs b/async-raft/src/core/admin.rs index 4a871f501..4691cc2f1 100644 --- a/async-raft/src/core/admin.rs +++ b/async-raft/src/core/admin.rs @@ -45,16 +45,13 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage // membership config in memory only. self.core.membership = EffectiveMembership { log_id: LogId { term: 1, index: 1 }, - membership: MembershipConfig { - members, - members_after_consensus: None, - }, + membership: MembershipConfig::new_single(members), }; // Become a candidate and start campaigning for leadership. If this node is the only node // in the cluster, then become leader without holding an election. If members len == 1, we // know it is our ID due to the above code where we ensure our own ID is present. - if self.core.membership.membership.members.len() == 1 { + if self.core.membership.membership.all_nodes().len() == 1 { self.core.current_term += 1; self.core.voted_for = Some(self.core.id); self.core.set_target_state(State::Leader); @@ -137,7 +134,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage let curr = &self.core.membership.membership; - if let Some(ref next_membership) = curr.members_after_consensus { + if let Some(next_membership) = curr.get_ith_config(1) { // When it is in joint state, it is only allowed to change to the `members_after_consensus` if &members != next_membership { let _ = tx.send(Err(ClientWriteError::ChangeMembershipError( @@ -148,19 +145,15 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage ))); return; } else { - new_config = MembershipConfig { - members: next_membership.clone(), - members_after_consensus: None, - }; + new_config = MembershipConfig::new_single(next_membership.clone()); } } else { // currently it is uniform config, enter joint state - new_config = MembershipConfig { - members: curr.members.clone(), - members_after_consensus: Some(members.clone()), - }; + new_config = MembershipConfig::new_multi(vec![curr.get_ith_config(0).unwrap().clone(), members.clone()]); } + tracing::debug!(?new_config, "new_config"); + // Check the proposed config for any new nodes. If ALL new nodes already have replication // streams AND are ready to join, then we can immediately proceed with entering joint // consensus. Else, new nodes need to first be brought up-to-speed. @@ -172,7 +165,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage // TODO(xp): 111 test adding a node that is not non-voter. // TODO(xp): 111 test adding a node that is lagging. - for new_node in members.difference(&self.core.membership.membership.members) { + for new_node in members.difference(&self.core.membership.membership.get_ith_config(0).unwrap()) { match self.nodes.get(&new_node) { // Node is ready to join. Some(node) => { diff --git a/async-raft/src/core/client.rs b/async-raft/src/core/client.rs index ea2554115..ff34d76d2 100644 --- a/async-raft/src/core/client.rs +++ b/async-raft/src/core/client.rs @@ -94,32 +94,30 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage pub(super) async fn handle_client_read_request(&mut self, tx: RaftRespTx<(), ClientReadError>) { // Setup sentinel values to track when we've received majority confirmation of leadership. let mut c0_confirmed = 0usize; + + let mems = &self.core.membership.membership; + // Will never be zero, as we don't allow it when proposing config changes. - let len_members = self.core.membership.membership.members.len(); + let len_members = mems.get_ith_config(0).unwrap().len(); let c0_needed = quorum::majority_of(len_members); let mut c1_confirmed = 0usize; let mut c1_needed = 0usize; - if let Some(joint_members) = &self.core.membership.membership.members_after_consensus { + + let second = mems.get_ith_config(1); + + if let Some(joint_members) = second { let len = joint_members.len(); // Will never be zero, as we don't allow it when proposing config changes. c1_needed = quorum::majority_of(len); + + if joint_members.contains(&self.core.id) { + c1_confirmed += 1; + } } // Increment confirmations for self, including post-joint-consensus config if applicable. c0_confirmed += 1; - let is_in_post_join_consensus_config = self - .core - .membership - .membership - .members_after_consensus - .as_ref() - .map(|members| members.contains(&self.core.id)) - .unwrap_or(false); - - if is_in_post_join_consensus_config { - c1_confirmed += 1; - } // If we already have all needed confirmations — which would be the case for single node // clusters — then respond. @@ -181,20 +179,18 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage } // If the term is the same, then it means we are still the leader. - if self.core.membership.membership.members.contains(&target) { + if self.core.membership.membership.get_ith_config(0).unwrap().contains(&target) { c0_confirmed += 1; } - if self - .core - .membership - .membership - .members_after_consensus - .as_ref() - .map(|members| members.contains(&target)) - .unwrap_or(false) - { - c1_confirmed += 1; + + let second = self.core.membership.membership.get_ith_config(1); + + if let Some(joint) = second { + if joint.contains(&target) { + c1_confirmed += 1; + } } + if c0_confirmed >= c0_needed && c1_confirmed >= c1_needed { let _ = tx.send(Ok(())); return; diff --git a/async-raft/src/core/mod.rs b/async-raft/src/core/mod.rs index fe120d50d..8546797a5 100644 --- a/async-raft/src/core/mod.rs +++ b/async-raft/src/core/mod.rs @@ -9,6 +9,9 @@ pub(crate) mod replication; mod replication_state_test; mod vote; +#[cfg(test)] +mod startup_test; + use std::collections::BTreeMap; use std::sync::Arc; @@ -223,7 +226,7 @@ impl, S: RaftStorage> Ra } let has_log = self.last_log_id.index != u64::MIN; - let single = self.membership.membership.members.len() == 1; + let single = self.membership.membership.all_nodes().len() == 1; let is_voter = self.membership.membership.contains(&self.id); self.target_state = match (has_log, single, is_voter) { @@ -408,12 +411,14 @@ impl, S: RaftStorage> Ra // transition to the non-voter state as a signal for when it is safe to shutdown a node // being removed. self.membership = cfg; - if !self.membership.membership.contains(&self.id) { + if self.membership.membership.contains(&self.id) { + if self.target_state == State::NonVoter { + // The node is a NonVoter and the new config has it configured as a normal member. + // Transition to follower. + self.set_target_state(State::Follower); + } + } else { self.set_target_state(State::NonVoter); - } else if self.target_state == State::NonVoter && self.membership.membership.members.contains(&self.id) { - // The node is a NonVoter and the new config has it configured as a normal member. - // Transition to follower. - self.set_target_state(State::Follower); } Ok(()) } @@ -695,13 +700,13 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage .membership .membership .all_nodes() - .into_iter() - .filter(|elem| elem != &self.core.id) + .iter() + .filter(|elem| *elem != &self.core.id) .collect::>(); for target in targets { - let state = self.spawn_replication_stream(target, None); - self.nodes.insert(target, state); + let state = self.spawn_replication_stream(*target, None); + self.nodes.insert(*target, state); } // Setup state as leader. @@ -859,8 +864,8 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage // Setup initial state per term. self.votes_granted_old = 1; // We must vote for ourselves per the Raft spec. - self.votes_needed_old = ((self.core.membership.membership.members.len() / 2) + 1) as u64; // Just need a majority. - if let Some(nodes) = &self.core.membership.membership.members_after_consensus { + self.votes_needed_old = ((self.core.membership.membership.get_ith_config(0).unwrap().len() / 2) + 1) as u64; // Just need a majority. + if let Some(nodes) = self.core.membership.membership.get_ith_config(1) { self.votes_granted_new = 1; // We must vote for ourselves per the Raft spec. self.votes_needed_new = ((nodes.len() / 2) + 1) as u64; // Just need a majority. } diff --git a/async-raft/src/core/replication.rs b/async-raft/src/core/replication.rs index c8f6997b0..4a1f4f8b0 100644 --- a/async-raft/src/core/replication.rs +++ b/async-raft/src/core/replication.rs @@ -167,12 +167,12 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage #[tracing::instrument(level = "trace", skip(self))] fn calc_commit_index(&self) -> u64 { - let c0_index = self.calc_members_commit_index(&self.core.membership.membership.members, "c0"); + let c0_index = self.calc_members_commit_index(self.core.membership.membership.get_ith_config(0).unwrap(), "c0"); // If we are in joint consensus, then calculate the new commit index of the new membership config nodes. let mut c1_index = c0_index; // Defaults to just matching C0. - if let Some(members) = &self.core.membership.membership.members_after_consensus { + if let Some(members) = &self.core.membership.membership.get_ith_config(1) { c1_index = self.calc_members_commit_index(members, "c1"); } diff --git a/async-raft/src/core/startup_test.rs b/async-raft/src/core/startup_test.rs new file mode 100644 index 000000000..bca2bcd8c --- /dev/null +++ b/async-raft/src/core/startup_test.rs @@ -0,0 +1,5 @@ +#[test] +fn test_raft_core_initial_state() -> anyhow::Result<()> { + // TODO(xp): test initial state decided by has_log, is single and is_voter + Ok(()) +} diff --git a/async-raft/src/core/vote.rs b/async-raft/src/core/vote.rs index c6248f6c9..a104558dc 100644 --- a/async-raft/src/core/vote.rs +++ b/async-raft/src/core/vote.rs @@ -143,19 +143,11 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage // If peer granted vote, then update campaign state. if res.vote_granted { // Handle vote responses from the C0 config group. - if self.core.membership.membership.members.contains(&target) { + if self.core.membership.membership.is_in_ith_config(0, &target) { self.votes_granted_old += 1; } // Handle vote responses from members of C1 config group. - if self - .core - .membership - .membership - .members_after_consensus - .as_ref() - .map(|members| members.contains(&target)) - .unwrap_or(false) - { + if self.core.membership.membership.is_in_ith_config(1, &target) { self.votes_granted_new += 1; } // If we've received enough votes from both config groups, then transition to leader state`. @@ -173,7 +165,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage /// Spawn parallel vote requests to all cluster members. #[tracing::instrument(level = "trace", skip(self))] pub(super) fn spawn_parallel_vote_requests(&self) -> mpsc::Receiver<(VoteResponse, NodeId)> { - let all_members = self.core.membership.membership.all_nodes(); + let all_members = self.core.membership.membership.all_nodes().clone(); let (tx, rx) = mpsc::channel(all_members.len()); for member in all_members.into_iter().filter(|member| member != &self.core.id) { let rpc = VoteRequest::new( diff --git a/async-raft/src/lib.rs b/async-raft/src/lib.rs index 80b8c8f39..5c22cd7a3 100644 --- a/async-raft/src/lib.rs +++ b/async-raft/src/lib.rs @@ -16,6 +16,9 @@ pub mod storage; mod storage_error; mod summary; +#[cfg(test)] +mod membership_test; + pub use async_trait; use serde::de::DeserializeOwned; use serde::Serialize; diff --git a/async-raft/src/membership_test.rs b/async-raft/src/membership_test.rs new file mode 100644 index 000000000..1d6793af7 --- /dev/null +++ b/async-raft/src/membership_test.rs @@ -0,0 +1,87 @@ +use maplit::btreeset; + +use crate::raft::MembershipConfig; + +#[test] +fn test_membership() -> anyhow::Result<()> { + let m1 = MembershipConfig::new_multi(vec![btreeset! {1}]); + let m123 = MembershipConfig::new_multi(vec![btreeset! {1,2,3}]); + let m123_345 = MembershipConfig::new_multi(vec![btreeset! {1,2,3}, btreeset! {3,4,5}]); + + assert_eq!(Some(btreeset! {1}), m1.get_ith_config(0).cloned()); + assert_eq!(Some(btreeset! {1,2,3}), m123.get_ith_config(0).cloned()); + assert_eq!(Some(btreeset! {1,2,3}), m123_345.get_ith_config(0).cloned()); + + assert_eq!(None, m1.get_ith_config(1).cloned()); + assert_eq!(None, m123.get_ith_config(1).cloned()); + assert_eq!(Some(btreeset! {3,4,5}), m123_345.get_ith_config(1).cloned()); + + assert!(m1.is_in_ith_config(0, &1)); + assert!(!m1.is_in_ith_config(0, &2)); + assert!(!m1.is_in_ith_config(1, &1)); + assert!(!m1.is_in_ith_config(1, &2)); + + assert!(m123.is_in_ith_config(0, &1)); + assert!(m123.is_in_ith_config(0, &2)); + assert!(!m123.is_in_ith_config(1, &1)); + assert!(!m123.is_in_ith_config(1, &2)); + + assert!(m123_345.is_in_ith_config(0, &1)); + assert!(m123_345.is_in_ith_config(0, &2)); + assert!(!m123_345.is_in_ith_config(1, &1)); + assert!(m123_345.is_in_ith_config(1, &4)); + + assert_eq!(vec![1], m1.ith_config(0)); + assert_eq!(vec![1, 2, 3], m123.ith_config(0)); + assert_eq!(vec![1, 2, 3], m123_345.ith_config(0)); + assert_eq!(vec![3, 4, 5], m123_345.ith_config(1)); + + assert_eq!(&btreeset! {1}, m1.all_nodes()); + assert_eq!(&btreeset! {1,2,3}, m123.all_nodes()); + assert_eq!(&btreeset! {1,2,3,4,5}, m123_345.all_nodes()); + + assert!(!m1.contains(&0)); + assert!(m1.contains(&1)); + assert!(m123_345.contains(&4)); + assert!(!m123_345.contains(&6)); + + assert!(!m123.is_in_joint_consensus()); + assert!(m123_345.is_in_joint_consensus()); + + assert_eq!( + MembershipConfig::new_single(btreeset! {3,4,5}), + m123_345.to_final_config() + ); + + Ok(()) +} + +#[test] +fn test_membership_update() -> anyhow::Result<()> { + // --- replace + + let mut m123 = MembershipConfig::new_single(btreeset! {1,2,3}); + m123.replace(vec![btreeset! {2,3}, btreeset! {3,4}]); + + assert_eq!(&btreeset! {2,3,4}, m123.all_nodes()); + assert_eq!(&vec![btreeset! {2,3}, btreeset! {3,4}], m123.get_configs()); + + // --- push + + m123.push(btreeset! {3,5}); + + assert_eq!(&btreeset! {2,3,4,5}, m123.all_nodes()); + assert_eq!( + &vec![btreeset! {2,3}, btreeset! {3,4}, btreeset! {3,5}], + m123.get_configs() + ); + + // --- to final + + let got = m123.to_final_config(); + + assert_eq!(&btreeset! {3,5}, got.all_nodes()); + assert_eq!(&vec![btreeset! {3,5}], got.get_configs()); + + Ok(()) +} diff --git a/async-raft/src/metrics.rs b/async-raft/src/metrics.rs index 36dbfb28f..e0fc807b8 100644 --- a/async-raft/src/metrics.rs +++ b/async-raft/src/metrics.rs @@ -190,7 +190,7 @@ impl Wait { #[tracing::instrument(level = "debug", skip(self), fields(msg=msg.to_string().as_str()))] pub async fn members(&self, want_members: BTreeSet, msg: impl ToString) -> Result { self.metrics( - |x| x.membership_config.membership.members == want_members, + |x| x.membership_config.membership.get_ith_config(0).cloned().unwrap() == want_members, &format!("{} .membership_config.members -> {:?}", msg.to_string(), want_members), ) .await @@ -204,12 +204,8 @@ impl Wait { msg: impl ToString, ) -> Result { self.metrics( - |x| x.membership_config.membership.members_after_consensus == want_members, - &format!( - "{} .membership_config.members_after_consensus -> {:?}", - msg.to_string(), - want_members - ), + |x| x.membership_config.membership.get_ith_config(1) == want_members.as_ref(), + &format!("{} .membership_config.next -> {:?}", msg.to_string(), want_members), ) .await } diff --git a/async-raft/src/metrics_wait_test.rs b/async-raft/src/metrics_wait_test.rs index cd2e3949c..aeb7f3f76 100644 --- a/async-raft/src/metrics_wait_test.rs +++ b/async-raft/src/metrics_wait_test.rs @@ -74,14 +74,17 @@ async fn test_wait() -> anyhow::Result<()> { let h = tokio::spawn(async move { sleep(Duration::from_millis(10)).await; let mut update = init.clone(); - update.membership_config.membership.members = btreeset![1, 2]; + update.membership_config.membership.replace(vec![btreeset![1, 2]]); let rst = tx.send(update); assert!(rst.is_ok()); }); let got = w.members(btreeset![1, 2], "members").await?; h.await?; - assert_eq!(btreeset![1, 2], got.membership_config.membership.members); + assert_eq!( + btreeset![1, 2], + got.membership_config.membership.get_ith_config(0).unwrap().clone() + ); } { @@ -91,7 +94,7 @@ async fn test_wait() -> anyhow::Result<()> { let h = tokio::spawn(async move { sleep(Duration::from_millis(10)).await; let mut update = init.clone(); - update.membership_config.membership.members_after_consensus = Some(btreeset![1, 2]); + update.membership_config.membership.push(btreeset![1, 2]); let rst = tx.send(update); assert!(rst.is_ok()); }); @@ -100,7 +103,7 @@ async fn test_wait() -> anyhow::Result<()> { assert_eq!( Some(btreeset![1, 2]), - got.membership_config.membership.members_after_consensus + got.membership_config.membership.get_ith_config(1).cloned() ); } @@ -181,10 +184,7 @@ fn init_wait_test() -> (RaftMetrics, Wait, watch::Sender) { current_leader: None, membership_config: EffectiveMembership { log_id: LogId::default(), - membership: MembershipConfig { - members: Default::default(), - members_after_consensus: None, - }, + membership: MembershipConfig::new_single(btreeset! {}), }, snapshot: LogId { term: 0, index: 0 }, diff --git a/async-raft/src/raft.rs b/async-raft/src/raft.rs index a36d55395..6af414d6d 100644 --- a/async-raft/src/raft.rs +++ b/async-raft/src/raft.rs @@ -5,6 +5,7 @@ use std::fmt::Debug; use std::sync::Arc; use std::time::Duration; +use maplit::btreeset; use serde::Deserialize; use serde::Serialize; use tokio::sync::mpsc; @@ -615,61 +616,95 @@ impl MessageSummary for EntryPayload { /// It could be a joint of one, two or more members, i.e., a quorum requires a majority of every members #[derive(Clone, Default, Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct MembershipConfig { - /// All members of the Raft cluster. - pub members: BTreeSet, + /// Multi configs. + configs: Vec>, - /// All members of the Raft cluster after joint consensus is finalized. - /// - /// The presence of a value here indicates that the config is in joint consensus. - pub members_after_consensus: Option>, + /// Cache of all node ids. + all_nodes: BTreeSet, } impl MembershipConfig { - /// Get an iterator over all nodes in the current config. - pub fn all_nodes(&self) -> BTreeSet { - let mut all = self.members.clone(); - if let Some(members) = &self.members_after_consensus { - all.extend(members); + pub fn new_single(members: BTreeSet) -> Self { + let configs = vec![members]; + let all_nodes = Self::build_all_nodes(&configs); + MembershipConfig { configs, all_nodes } + } + + pub fn new_multi(configs: Vec>) -> Self { + let all_nodes = Self::build_all_nodes(&configs); + MembershipConfig { configs, all_nodes } + } + + pub fn all_nodes(&self) -> &BTreeSet { + &self.all_nodes + } + + pub fn replace(&mut self, new_configs: Vec>) { + self.configs = new_configs; + self.all_nodes = Self::build_all_nodes(&self.configs); + } + + pub fn push(&mut self, new_config: BTreeSet) { + self.configs.push(new_config); + self.all_nodes = Self::build_all_nodes(&self.configs); + } + + pub fn get_configs(&self) -> &Vec> { + &self.configs + } + + pub fn get_ith_config(&self, i: usize) -> Option<&BTreeSet> { + self.configs.get(i) + } + + pub fn is_in_ith_config(&self, i: usize, id: &u64) -> bool { + if let Some(c) = self.configs.get(i) { + c.contains(id) + } else { + false } - all + } + + pub fn ith_config(&self, i: usize) -> Vec { + self.configs[i].iter().cloned().collect() } /// Check if the given NodeId exists in this membership config. /// /// When in joint consensus, this will check both config groups. pub fn contains(&self, x: &NodeId) -> bool { - self.members.contains(x) - || if let Some(members) = &self.members_after_consensus { - members.contains(x) - } else { - false + for c in self.configs.iter() { + if c.contains(x) { + return true; } + } + false } /// Check to see if the config is currently in joint consensus. pub fn is_in_joint_consensus(&self) -> bool { - self.members_after_consensus.is_some() + self.configs.len() > 1 } // TODO(xp): rename this /// Create a new initial config containing only the given node ID. pub fn new_initial(id: NodeId) -> Self { - let mut members = BTreeSet::new(); - members.insert(id); - Self { - members, - members_after_consensus: None, - } + MembershipConfig::new_single(btreeset! {id}) } pub fn to_final_config(&self) -> Self { - match self.members_after_consensus { - None => self.clone(), - Some(ref m) => MembershipConfig { - members: m.clone(), - members_after_consensus: None, - }, + assert!(!self.configs.is_empty()); + + let last = self.configs.last().cloned().unwrap(); + MembershipConfig::new_single(last) + } + + fn build_all_nodes(configs: &[BTreeSet]) -> BTreeSet { + let mut nodes = BTreeSet::new(); + for config in configs.iter() { + nodes.extend(config) } + nodes } } diff --git a/async-raft/tests/append_updates_membership.rs b/async-raft/tests/append_updates_membership.rs index b482f6d51..f8951a300 100644 --- a/async-raft/tests/append_updates_membership.rs +++ b/async-raft/tests/append_updates_membership.rs @@ -52,18 +52,12 @@ async fn append_updates_membership() -> Result<()> { ent(1, 1), Entry { log_id: LogId { term: 1, index: 2 }, - payload: EntryPayload::Membership(MembershipConfig { - members: btreeset! {1,2}, - members_after_consensus: None, - }), + payload: EntryPayload::Membership(MembershipConfig::new_single(btreeset! {1,2})), }, ent(1, 3), Entry { log_id: LogId { term: 1, index: 4 }, - payload: EntryPayload::Membership(MembershipConfig { - members: btreeset! {1,2,3,4}, - members_after_consensus: None, - }), + payload: EntryPayload::Membership(MembershipConfig::new_single(btreeset! {1,2,3,4})), }, ent(1, 5), ], diff --git a/async-raft/tests/client_writes.rs b/async-raft/tests/client_writes.rs index b49cc4832..38b84effc 100644 --- a/async-raft/tests/client_writes.rs +++ b/async-raft/tests/client_writes.rs @@ -106,10 +106,7 @@ async fn client_writes() -> Result<()> { want, Some(0), LogId { term: 1, index: want }, - Some(((5000..5100).into(), 1, MembershipConfig { - members: btreeset![0, 1, 2], - members_after_consensus: None, - })), + Some(((5000..5100).into(), 1, MembershipConfig::new_single(btreeset! {0,1,2}))), ) .await?; diff --git a/async-raft/tests/compaction.rs b/async-raft/tests/compaction.rs index 97d002248..c758fa5cd 100644 --- a/async-raft/tests/compaction.rs +++ b/async-raft/tests/compaction.rs @@ -76,10 +76,7 @@ async fn compaction() -> Result<()> { n_logs, Some(0), LogId { term: 1, index: n_logs }, - Some((n_logs.into(), 1, MembershipConfig { - members: btreeset![0], - members_after_consensus: None, - })), + Some((n_logs.into(), 1, MembershipConfig::new_single(btreeset! {0}))), ) .await?; @@ -110,10 +107,11 @@ async fn compaction() -> Result<()> { assert_eq!(LogId { term: 1, index: 51 }, logs[0].log_id) } - let expected_snap = Some((snapshot_threshold.into(), 1, MembershipConfig { - members: btreeset![0u64], - members_after_consensus: None, - })); + let expected_snap = Some(( + snapshot_threshold.into(), + 1, + MembershipConfig::new_single(btreeset! {0}), + )); router .assert_storage_state( 1, diff --git a/async-raft/tests/elect_compare_last_log.rs b/async-raft/tests/elect_compare_last_log.rs index da8cf557a..aa1c9bccf 100644 --- a/async-raft/tests/elect_compare_last_log.rs +++ b/async-raft/tests/elect_compare_last_log.rs @@ -44,10 +44,7 @@ async fn elect_compare_last_log() -> Result<()> { sto0.append_to_log(&[&Entry { log_id: LogId { term: 2, index: 1 }, - payload: EntryPayload::Membership(MembershipConfig { - members: btreeset! {0,1}, - members_after_consensus: None, - }), + payload: EntryPayload::Membership(MembershipConfig::new_single(btreeset! {0,1})), }]) .await?; } @@ -63,10 +60,7 @@ async fn elect_compare_last_log() -> Result<()> { sto1.append_to_log(&[ &Entry { log_id: LogId { term: 1, index: 1 }, - payload: EntryPayload::Membership(MembershipConfig { - members: btreeset! {0,1}, - members_after_consensus: None, - }), + payload: EntryPayload::Membership(MembershipConfig::new_single(btreeset! {0,1})), }, &Entry { log_id: LogId { term: 1, index: 2 }, diff --git a/async-raft/tests/fixtures/mod.rs b/async-raft/tests/fixtures/mod.rs index 5e738ea86..a9bae71a6 100644 --- a/async-raft/tests/fixtures/mod.rs +++ b/async-raft/tests/fixtures/mod.rs @@ -558,16 +558,16 @@ impl RaftRouter { "node {} has last_log_index {}, expected 0", node.id, node.last_log_index ); - let members = node.membership_config.membership.members.iter().collect::>(); + let members = node.membership_config.membership.ith_config(0); assert_eq!( members, - vec![&node.id], + vec![node.id], "node {0} has membership {1:?}, expected [{0}]", node.id, members ); assert!( - node.membership_config.membership.members_after_consensus.is_none(), + !node.membership_config.membership.is_in_joint_consensus(), "node {} is in joint consensus, expected uniform consensus", node.id ); @@ -638,7 +638,7 @@ impl RaftRouter { "node {} has last_log_index {}, expected {}", node.id, node.last_log_index, expected_last_log ); - let mut members = node.membership_config.membership.members.iter().cloned().collect::>(); + let mut members = node.membership_config.membership.ith_config(0); members.sort_unstable(); assert_eq!( members, all_nodes, @@ -646,7 +646,7 @@ impl RaftRouter { node.id, members, all_nodes ); assert!( - node.membership_config.membership.members_after_consensus.is_none(), + !node.membership_config.membership.is_in_joint_consensus(), "node {} was not in uniform consensus state", node.id ); diff --git a/async-raft/tests/initialization.rs b/async-raft/tests/initialization.rs index ea729c438..585aca473 100644 --- a/async-raft/tests/initialization.rs +++ b/async-raft/tests/initialization.rs @@ -64,16 +64,13 @@ async fn initialization() -> Result<()> { panic!("expect Membership payload") } }; - assert_eq!(btreeset![0, 1, 2], mem.members); + assert_eq!(btreeset![0, 1, 2], mem.get_ith_config(0).cloned().unwrap()); let sm_mem = sto.last_applied_state().await?.1; assert_eq!( Some(EffectiveMembership { log_id: LogId { term: 1, index: 1 }, - membership: MembershipConfig { - members: btreeset![0, 1, 2], - members_after_consensus: None, - } + membership: MembershipConfig::new_single(btreeset! {0,1,2}) }), sm_mem ); diff --git a/async-raft/tests/members_leader_fix_partial.rs b/async-raft/tests/members_leader_fix_partial.rs index daecfcee8..1606706bb 100644 --- a/async-raft/tests/members_leader_fix_partial.rs +++ b/async-raft/tests/members_leader_fix_partial.rs @@ -43,10 +43,7 @@ async fn members_leader_fix_partial() -> Result<()> { term: 1, index: want + 1, }, - payload: EntryPayload::Membership(MembershipConfig { - members: btreeset! {0}, - members_after_consensus: Some(btreeset! {0,1,2}), - }), + payload: EntryPayload::Membership(MembershipConfig::new_multi(vec![btreeset! {0}, btreeset! {0,1,2}])), }]) .await?; } diff --git a/async-raft/tests/snapshot_chunk_size.rs b/async-raft/tests/snapshot_chunk_size.rs index 1d4b07aa6..7e1f16132 100644 --- a/async-raft/tests/snapshot_chunk_size.rs +++ b/async-raft/tests/snapshot_chunk_size.rs @@ -59,10 +59,7 @@ async fn snapshot_chunk_size() -> Result<()> { router.client_request_many(0, "0", (snapshot_threshold - want) as usize).await; want = snapshot_threshold; - let want_snap = Some((want.into(), 1, MembershipConfig { - members: btreeset![0u64], - members_after_consensus: None, - })); + let want_snap = Some((want.into(), 1, MembershipConfig::new_single(btreeset! {0}))); router.wait_for_log(&btreeset![0], want, None, "send log to trigger snapshot").await?; router.wait_for_snapshot(&btreeset![0], LogId { term: 1, index: want }, None, "snapshot").await?; @@ -74,10 +71,7 @@ async fn snapshot_chunk_size() -> Result<()> { router.new_raft_node(1).await; router.add_non_voter(0, 1).await.expect("failed to add new node as non-voter"); - let want_snap = Some((want.into(), 1, MembershipConfig { - members: btreeset![0u64], - members_after_consensus: None, - })); + let want_snap = Some((want.into(), 1, MembershipConfig::new_single(btreeset! {0}))); router.wait_for_log(&btreeset![0, 1], want, None, "add non-voter").await?; router.wait_for_snapshot(&btreeset![1], LogId { term: 1, index: want }, None, "").await?; diff --git a/async-raft/tests/snapshot_ge_half_threshold.rs b/async-raft/tests/snapshot_ge_half_threshold.rs index 9f4e30d6b..a8e1dc96a 100644 --- a/async-raft/tests/snapshot_ge_half_threshold.rs +++ b/async-raft/tests/snapshot_ge_half_threshold.rs @@ -73,10 +73,7 @@ async fn snapshot_ge_half_threshold() -> Result<()> { want, Some(0), LogId { term: 1, index: want }, - Some((want.into(), 1, MembershipConfig { - members: btreeset![0], - members_after_consensus: None, - })), + Some((want.into(), 1, MembershipConfig::new_single(btreeset! {0}))), ) .await?; } @@ -93,10 +90,7 @@ async fn snapshot_ge_half_threshold() -> Result<()> { router.add_non_voter(0, 1).await.expect("failed to add new node as non-voter"); router.wait_for_log(&btreeset![0, 1], want, None, "add non-voter").await?; - let expected_snap = Some((want.into(), 1, MembershipConfig { - members: btreeset![0u64], - members_after_consensus: None, - })); + let expected_snap = Some((want.into(), 1, MembershipConfig::new_single(btreeset! {0}))); router.wait_for_snapshot(&btreeset![1], LogId { term: 1, index: want }, None, "").await?; router .assert_storage_state( diff --git a/async-raft/tests/snapshot_overrides_membership.rs b/async-raft/tests/snapshot_overrides_membership.rs index d9df35481..78c7fa0c6 100644 --- a/async-raft/tests/snapshot_overrides_membership.rs +++ b/async-raft/tests/snapshot_overrides_membership.rs @@ -77,10 +77,7 @@ async fn snapshot_overrides_membership() -> Result<()> { want, Some(0), LogId { term: 1, index: want }, - Some((want.into(), 1, MembershipConfig { - members: btreeset![0], - members_after_consensus: None, - })), + Some((want.into(), 1, MembershipConfig::new_single(btreeset! {0}))), ) .await?; } @@ -99,10 +96,7 @@ async fn snapshot_overrides_membership() -> Result<()> { prev_log_id: LogId::new(0, 0), entries: vec![Entry { log_id: LogId { term: 1, index: 1 }, - payload: EntryPayload::Membership(MembershipConfig { - members: btreeset![2, 3], - members_after_consensus: None, - }), + payload: EntryPayload::Membership(MembershipConfig::new_single(btreeset! {2,3})), }], leader_commit: 0, }; @@ -111,13 +105,7 @@ async fn snapshot_overrides_membership() -> Result<()> { tracing::info!("--- check that non-voter membership is affected"); { let m = sto.get_membership_config().await?; - assert_eq!( - MembershipConfig { - members: btreeset![2, 3], - members_after_consensus: None, - }, - m.membership - ); + assert_eq!(MembershipConfig::new_single(btreeset! {2,3}), m.membership); } } @@ -130,10 +118,7 @@ async fn snapshot_overrides_membership() -> Result<()> { router.wait_for_log(&btreeset![0, 1], want, timeout(), "add non-voter").await?; router.wait_for_snapshot(&btreeset![1], LogId { term: 1, index: want }, timeout(), "").await?; - let expected_snap = Some((want.into(), 1, MembershipConfig { - members: btreeset![0u64], - members_after_consensus: None, - })); + let expected_snap = Some((want.into(), 1, MembershipConfig::new_single(btreeset! {0}))); router .assert_storage_state( @@ -147,10 +132,7 @@ async fn snapshot_overrides_membership() -> Result<()> { let m = sto.get_membership_config().await?; assert_eq!( - MembershipConfig { - members: btreeset![0], - members_after_consensus: None, - }, + MembershipConfig::new_single(btreeset! {0}), m.membership, "membership should be overridden by the snapshot" ); diff --git a/async-raft/tests/snapshot_uses_prev_snap_membership.rs b/async-raft/tests/snapshot_uses_prev_snap_membership.rs index 469d97640..929a740cc 100644 --- a/async-raft/tests/snapshot_uses_prev_snap_membership.rs +++ b/async-raft/tests/snapshot_uses_prev_snap_membership.rs @@ -88,10 +88,7 @@ async fn snapshot_uses_prev_snap_membership() -> Result<()> { } let m = sto0.get_membership_config().await?; assert_eq!( - MembershipConfig { - members: btreeset![0, 1], - members_after_consensus: None, - }, + MembershipConfig::new_single(btreeset! {0,1}), m.membership, "membership " ); @@ -131,10 +128,7 @@ async fn snapshot_uses_prev_snap_membership() -> Result<()> { } let m = sto0.get_membership_config().await?; assert_eq!( - MembershipConfig { - members: btreeset![0, 1], - members_after_consensus: None, - }, + MembershipConfig::new_single(btreeset! {0,1}), m.membership, "membership " ); diff --git a/async-raft/tests/state_machien_apply_membership.rs b/async-raft/tests/state_machien_apply_membership.rs index 9c1e2725a..e79354344 100644 --- a/async-raft/tests/state_machien_apply_membership.rs +++ b/async-raft/tests/state_machien_apply_membership.rs @@ -53,10 +53,7 @@ async fn state_machine_apply_membership() -> Result<()> { assert_eq!( Some(EffectiveMembership { log_id: LogId { term: 1, index: 1 }, - membership: MembershipConfig { - members: btreeset![0], - members_after_consensus: None, - } + membership: MembershipConfig::new_single(btreeset! {0}) }), sto.last_applied_state().await?.1 ); @@ -98,10 +95,7 @@ async fn state_machine_apply_membership() -> Result<()> { assert_eq!( Some(EffectiveMembership { log_id: LogId { term: 1, index: 3 }, - membership: MembershipConfig { - members: btreeset![0, 1, 2], - members_after_consensus: None, - } + membership: MembershipConfig::new_single(btreeset! {0,1,2}) }), last_membership ); diff --git a/async-raft/tests/stepdown.rs b/async-raft/tests/stepdown.rs index ada6c0d08..87a719689 100644 --- a/async-raft/tests/stepdown.rs +++ b/async-raft/tests/stepdown.rs @@ -104,13 +104,13 @@ async fn stepdown() -> Result<()> { metrics.last_applied ); assert_eq!( - cfg.members, - btreeset![1, 2, 3], + cfg.get_configs().clone(), + vec![btreeset![1, 2, 3]], "expected old leader to have membership of [1, 2, 3], got {:?}", - cfg.members + cfg.get_configs() ); assert!( - cfg.members_after_consensus.is_none(), + !cfg.is_in_joint_consensus(), "expected old leader to be out of joint consensus" ); } diff --git a/memstore/src/test.rs b/memstore/src/test.rs index 3fbfb122a..ac639866b 100644 --- a/memstore/src/test.rs +++ b/memstore/src/test.rs @@ -132,13 +132,7 @@ where let membership = store.get_membership_config().await?; - assert_eq!( - MembershipConfig { - members: btreeset! {NODE_ID}, - members_after_consensus: None, - }, - membership.membership, - ); + assert_eq!(MembershipConfig::new_single(btreeset! {NODE_ID}), membership.membership,); Ok(()) } @@ -156,23 +150,14 @@ where }, &Entry { log_id: LogId { term: 1, index: 2 }, - payload: EntryPayload::Membership(MembershipConfig { - members: btreeset! {3,4,5}, - members_after_consensus: None, - }), + payload: EntryPayload::Membership(MembershipConfig::new_single(btreeset! {3,4,5})), }, ]) .await?; let mem = store.get_membership_config().await?; - assert_eq!( - MembershipConfig { - members: btreeset! {3,4,5}, - members_after_consensus: None, - }, - mem.membership, - ); + assert_eq!(MembershipConfig::new_single(btreeset! {3,4,5}), mem.membership,); } tracing::info!("--- membership presents in log, but smaller than last_applied, read from state machine"); @@ -180,22 +165,13 @@ where store .append_to_log(&[&Entry { log_id: (1, 1).into(), - payload: EntryPayload::Membership(MembershipConfig { - members: btreeset! {1,2,3}, - members_after_consensus: None, - }), + payload: EntryPayload::Membership(MembershipConfig::new_single(btreeset! {1,2,3})), }]) .await?; let mem = store.get_membership_config().await?; - assert_eq!( - MembershipConfig { - members: btreeset! {3, 4, 5}, - members_after_consensus: None, - }, - mem.membership, - ); + assert_eq!(MembershipConfig::new_single(btreeset! {3, 4, 5}), mem.membership,); } tracing::info!("--- membership presents in log and > sm.last_applied, read from log"); @@ -203,22 +179,13 @@ where store .append_to_log(&[&Entry { log_id: LogId { term: 1, index: 3 }, - payload: EntryPayload::Membership(MembershipConfig { - members: btreeset! {1,2,3}, - members_after_consensus: None, - }), + payload: EntryPayload::Membership(MembershipConfig::new_single(btreeset! {1,2,3})), }]) .await?; let mem = store.get_membership_config().await?; - assert_eq!( - MembershipConfig { - members: btreeset! {1,2,3}, - members_after_consensus: None, - }, - mem.membership, - ); + assert_eq!(MembershipConfig::new_single(btreeset! {1,2,3},), mem.membership,); } Ok(()) @@ -246,10 +213,7 @@ where ); assert_eq!( - MembershipConfig { - members: btreeset! {NODE_ID}, - members_after_consensus: None, - }, + MembershipConfig::new_single(btreeset! {NODE_ID}), initial.last_membership.membership, ); @@ -319,10 +283,7 @@ where }, &Entry { log_id: LogId { term: 1, index: 2 }, - payload: EntryPayload::Membership(MembershipConfig { - members: btreeset! {3,4,5}, - members_after_consensus: None, - }), + payload: EntryPayload::Membership(MembershipConfig::new_single(btreeset! {3,4,5})), }, ]) .await?; @@ -330,10 +291,7 @@ where let initial = store.get_initial_state().await?; assert_eq!( - MembershipConfig { - members: btreeset! {3,4,5}, - members_after_consensus: None, - }, + MembershipConfig::new_single(btreeset! {3,4,5}), initial.last_membership.membership, ); } @@ -343,20 +301,14 @@ where store .append_to_log(&[&Entry { log_id: (1, 1).into(), - payload: EntryPayload::Membership(MembershipConfig { - members: btreeset! {1,2,3}, - members_after_consensus: None, - }), + payload: EntryPayload::Membership(MembershipConfig::new_single(btreeset! {1,2,3})), }]) .await?; let initial = store.get_initial_state().await?; assert_eq!( - MembershipConfig { - members: btreeset! {3, 4, 5}, - members_after_consensus: None, - }, + MembershipConfig::new_single(btreeset! {3,4,5}), initial.last_membership.membership, ); } @@ -366,20 +318,14 @@ where store .append_to_log(&[&Entry { log_id: LogId { term: 1, index: 3 }, - payload: EntryPayload::Membership(MembershipConfig { - members: btreeset! {1,2,3}, - members_after_consensus: None, - }), + payload: EntryPayload::Membership(MembershipConfig::new_single(btreeset! {1,2,3})), }]) .await?; let initial = store.get_initial_state().await?; assert_eq!( - MembershipConfig { - members: btreeset! {1,2,3}, - members_after_consensus: None, - }, + MembershipConfig::new_single(btreeset! {1,2,3}), initial.last_membership.membership, ); } @@ -683,10 +629,7 @@ where store .apply_to_state_machine(&[&Entry { log_id: LogId { term: 1, index: 3 }, - payload: EntryPayload::Membership(MembershipConfig { - members: btreeset! {1,2}, - members_after_consensus: None, - }), + payload: EntryPayload::Membership(MembershipConfig::new_single(btreeset! {1,2})), }]) .await?; @@ -695,10 +638,7 @@ where assert_eq!( Some(EffectiveMembership { log_id: LogId { term: 1, index: 3 }, - membership: MembershipConfig { - members: btreeset! {1,2}, - members_after_consensus: None - } + membership: MembershipConfig::new_single(btreeset! {1,2}) }), membership ); @@ -718,10 +658,7 @@ where assert_eq!( Some(EffectiveMembership { log_id: LogId { term: 1, index: 3 }, - membership: MembershipConfig { - members: btreeset! {1,2}, - members_after_consensus: None - } + membership: MembershipConfig::new_single(btreeset! {1,2}) }), membership ); @@ -989,10 +926,7 @@ where }, &Entry { log_id: LogId { term: 1, index: 3 }, - payload: EntryPayload::Membership(MembershipConfig { - members: btreeset! {1,2,3}, - members_after_consensus: None, - }), + payload: EntryPayload::Membership(MembershipConfig::new_single(btreeset! {1,2,3})), }, ]) .await?; @@ -1004,10 +938,7 @@ where }, &Entry { log_id: LogId { term: 2, index: 2 }, - payload: EntryPayload::Membership(MembershipConfig { - members: btreeset! {3,4,5}, - members_after_consensus: None, - }), + payload: EntryPayload::Membership(MembershipConfig::new_single(btreeset! {3,4,5})), }, ]) .await?; @@ -1045,10 +976,7 @@ where }, &Entry { log_id: LogId { term: 1, index: 3 }, - payload: EntryPayload::Membership(MembershipConfig { - members: btreeset! {1,2,3}, - members_after_consensus: None, - }), + payload: EntryPayload::Membership(MembershipConfig::new_single(btreeset! {1,2,3})), }, ]) .await?; @@ -1061,10 +989,7 @@ where }, &Entry { log_id: LogId { term: 2, index: 2 }, - payload: EntryPayload::Membership(MembershipConfig { - members: btreeset! {3,4,5}, - members_after_consensus: None, - }), + payload: EntryPayload::Membership(MembershipConfig::new_single(btreeset! {3,4,5})), }, ]) .await?;