From 3511e439e23e90b45ad7daa6274070b07d5fa576 Mon Sep 17 00:00:00 2001 From: drdr xp Date: Mon, 27 Dec 2021 14:47:12 +0800 Subject: [PATCH] change: rename MembershipConfig to Membership --- async-raft/src/core/admin.rs | 30 ++++++------ async-raft/src/core/client.rs | 12 ++--- async-raft/src/core/mod.rs | 49 +++++++++---------- async-raft/src/core/replication.rs | 30 +++++------- async-raft/src/core/vote.rs | 4 +- async-raft/src/error.rs | 7 +-- async-raft/src/membership_test.rs | 23 ++++----- async-raft/src/metrics.rs | 4 +- async-raft/src/metrics_wait_test.rs | 4 +- async-raft/src/raft.rs | 18 +++---- async-raft/src/storage.rs | 6 +-- async-raft/tests/append_updates_membership.rs | 6 +-- async-raft/tests/client_writes.rs | 4 +- async-raft/tests/compaction.rs | 10 ++-- async-raft/tests/elect_compare_last_log.rs | 6 +-- async-raft/tests/fixtures/mod.rs | 4 +- async-raft/tests/initialization.rs | 4 +- .../tests/members_leader_fix_partial.rs | 4 +- async-raft/tests/snapshot_chunk_size.rs | 6 +-- .../tests/snapshot_ge_half_threshold.rs | 6 +-- .../tests/snapshot_overrides_membership.rs | 12 ++--- .../snapshot_uses_prev_snap_membership.rs | 14 ++---- .../tests/state_machien_apply_membership.rs | 6 +-- memstore/src/lib.rs | 11 ++--- memstore/src/test.rs | 42 ++++++++-------- 25 files changed, 147 insertions(+), 175 deletions(-) diff --git a/async-raft/src/core/admin.rs b/async-raft/src/core/admin.rs index 4691cc2f1..231141803 100644 --- a/async-raft/src/core/admin.rs +++ b/async-raft/src/core/admin.rs @@ -14,7 +14,7 @@ use crate::error::InitializeError; use crate::raft::AddNonVoterResponse; use crate::raft::ClientWriteRequest; use crate::raft::ClientWriteResponse; -use crate::raft::MembershipConfig; +use crate::raft::Membership; use crate::raft::RaftRespTx; use crate::AppData; use crate::AppDataResponse; @@ -43,15 +43,15 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage // Build a new membership config from given init data & assign it as the new cluster // membership config in memory only. - self.core.membership = EffectiveMembership { + self.core.effective_membership = EffectiveMembership { log_id: LogId { term: 1, index: 1 }, - membership: MembershipConfig::new_single(members), + membership: Membership::new_single(members), }; // Become a candidate and start campaigning for leadership. If this node is the only node // in the cluster, then become leader without holding an election. If members len == 1, we // know it is our ID due to the above code where we ensure our own ID is present. - if self.core.membership.membership.all_nodes().len() == 1 { + if self.core.effective_membership.membership.all_nodes().len() == 1 { self.core.current_term += 1; self.core.voted_for = Some(self.core.id); self.core.set_target_state(State::Leader); @@ -121,10 +121,10 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage // The last membership config is not committed yet. // Can not process the next one. - if self.core.commit_index < self.core.membership.log_id.index { + if self.core.commit_index < self.core.effective_membership.log_id.index { let _ = tx.send(Err(ClientWriteError::ChangeMembershipError( ChangeMembershipError::InProgress { - membership_log_id: self.core.membership.log_id, + membership_log_id: self.core.effective_membership.log_id, }, ))); return; @@ -132,7 +132,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage let new_config; - let curr = &self.core.membership.membership; + let curr = &self.core.effective_membership.membership; if let Some(next_membership) = curr.get_ith_config(1) { // When it is in joint state, it is only allowed to change to the `members_after_consensus` @@ -145,11 +145,11 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage ))); return; } else { - new_config = MembershipConfig::new_single(next_membership.clone()); + new_config = Membership::new_single(next_membership.clone()); } } else { // currently it is uniform config, enter joint state - new_config = MembershipConfig::new_multi(vec![curr.get_ith_config(0).unwrap().clone(), members.clone()]); + new_config = Membership::new_multi(vec![curr.get_ith_config(0).unwrap().clone(), members.clone()]); } tracing::debug!(?new_config, "new_config"); @@ -165,7 +165,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage // TODO(xp): 111 test adding a node that is not non-voter. // TODO(xp): 111 test adding a node that is lagging. - for new_node in members.difference(&self.core.membership.membership.get_ith_config(0).unwrap()) { + for new_node in members.difference(&self.core.effective_membership.membership.get_ith_config(0).unwrap()) { match self.nodes.get(&new_node) { // Node is ready to join. Some(node) => { @@ -207,14 +207,14 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage #[tracing::instrument(level = "debug", skip(self, resp_tx), fields(id=self.core.id))] pub async fn append_membership_log( &mut self, - mem: MembershipConfig, + mem: Membership, resp_tx: Option, ClientWriteError>>, ) -> Result<(), RaftError> { let payload = ClientWriteRequest::::new_config(mem.clone()); let res = self.append_payload_to_log(payload.entry).await; // Caveat: membership must be updated before commit check is done with the new config. - self.core.membership = EffectiveMembership { + self.core.effective_membership = EffectiveMembership { log_id: self.core.last_log_id, membership: mem, }; @@ -253,7 +253,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage let index = log_id.index; // Step down if needed. - if !self.core.membership.membership.contains(&self.core.id) { + if !self.core.effective_membership.membership.contains(&self.core.id) { tracing::debug!("raft node is stepping down"); // TODO(xp): transfer leadership @@ -262,7 +262,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage return; } - let membership = &self.core.membership.membership; + let membership = &self.core.effective_membership.membership; let all = membership.all_nodes(); for (id, state) in self.nodes.iter_mut() { @@ -274,7 +274,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage "set remove_after_commit for {} = {}, membership: {:?}", id, index, - self.core.membership + self.core.effective_membership ); state.remove_since = Some(index) diff --git a/async-raft/src/core/client.rs b/async-raft/src/core/client.rs index ff34d76d2..3f91220a5 100644 --- a/async-raft/src/core/client.rs +++ b/async-raft/src/core/client.rs @@ -58,7 +58,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage let last_index = self.core.last_log_id.index; let req: ClientWriteRequest = if last_index == 0 { - ClientWriteRequest::new_config(self.core.membership.membership.clone()) + ClientWriteRequest::new_config(self.core.effective_membership.membership.clone()) } else { ClientWriteRequest::new_blank_payload() }; @@ -95,7 +95,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage // Setup sentinel values to track when we've received majority confirmation of leadership. let mut c0_confirmed = 0usize; - let mems = &self.core.membership.membership; + let mems = &self.core.effective_membership.membership; // Will never be zero, as we don't allow it when proposing config changes. let len_members = mems.get_ith_config(0).unwrap().len(); @@ -128,7 +128,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage // Spawn parallel requests, all with the standard timeout for heartbeats. let mut pending = FuturesUnordered::new(); - let all_members = self.core.membership.membership.all_nodes(); + let all_members = self.core.effective_membership.membership.all_nodes(); for (id, node) in self.nodes.iter() { if !all_members.contains(id) { continue; @@ -179,11 +179,11 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage } // If the term is the same, then it means we are still the leader. - if self.core.membership.membership.get_ith_config(0).unwrap().contains(&target) { + if self.core.effective_membership.membership.get_ith_config(0).unwrap().contains(&target) { c0_confirmed += 1; } - let second = self.core.membership.membership.get_ith_config(1); + let second = self.core.effective_membership.membership.get_ith_config(1); if let Some(joint) = second { if joint.contains(&target) { @@ -260,7 +260,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage // TODO(xp): calculate nodes set that need to replicate to, when updating membership // TODO(xp): Or add to-non-voter replication into self.nodes. - let all_members = self.core.membership.membership.all_nodes(); + let all_members = self.core.effective_membership.membership.all_nodes(); let nodes = self.nodes.keys().collect::>(); tracing::debug!(?nodes, ?all_members, "replicate_client_request"); diff --git a/async-raft/src/core/mod.rs b/async-raft/src/core/mod.rs index dee0ea304..1cc9cbc28 100644 --- a/async-raft/src/core/mod.rs +++ b/async-raft/src/core/mod.rs @@ -51,7 +51,7 @@ use crate::raft::ClientWriteRequest; use crate::raft::ClientWriteResponse; use crate::raft::Entry; use crate::raft::EntryPayload; -use crate::raft::MembershipConfig; +use crate::raft::Membership; use crate::raft::RaftMsg; use crate::raft::RaftRespTx; use crate::replication::RaftEvent; @@ -80,18 +80,19 @@ pub struct EffectiveMembership { /// The id of the log that applies this membership config pub log_id: LogId, - pub membership: MembershipConfig, + pub membership: Membership, } /// The core type implementing the Raft protocol. pub struct RaftCore, S: RaftStorage> { /// This node's ID. id: NodeId, + /// This node's runtime config. config: Arc, /// The cluster's current membership configuration. - membership: EffectiveMembership, + effective_membership: EffectiveMembership, /// The `RaftNetwork` implementation. network: Arc, @@ -102,23 +103,14 @@ pub struct RaftCore, S: RaftSt /// The target state of the system. target_state: State, - /// The index of the highest log entry known to be committed cluster-wide. - /// - /// The definition of a committed log is that the leader which has created the log has - /// successfully replicated the log to a majority of the cluster. This value is updated via - /// AppendEntries RPC from the leader, or if a node is the leader, it will update this value - /// as new entries have been successfully replicated to a majority of the cluster. + /// The index of the last known committed entry. /// - /// Is initialized to 0, and increases monotonically. This is always based on the leader's - /// commit index which is communicated to other members via the AppendEntries protocol. + /// I.e.: + /// - a log that is replicated to a quorum of the cluster and it is of the term of the leader. + /// - A quorum could be a joint quorum. commit_index: u64, /// The log id of the highest log entry which has been applied to the local state machine. - /// - /// Is initialized to 0,0 for a pristine node; else, for nodes with existing state it is - /// is initialized to the value returned from the `RaftStorage::get_initial_state` on startup. - /// This value increases following the `commit_index` as logs are applied to the state - /// machine (via the storage interface). last_applied: LogId, /// The current term. @@ -127,8 +119,10 @@ pub struct RaftCore, S: RaftSt /// the leader's term which is communicated to other members via the AppendEntries protocol, /// but this may also be incremented when a follower becomes a candidate. current_term: u64, + /// The ID of the current leader of the Raft cluster. current_leader: Option, + /// The ID of the candidate which received this node's vote for the current term. /// /// Each server will vote for at most one candidate in a given term, on a @@ -152,6 +146,7 @@ pub struct RaftCore, S: RaftSt /// The last time a heartbeat was received. last_heartbeat: Option, + /// The duration until the next election timeout. next_election_timeout: Option, @@ -159,7 +154,9 @@ pub struct RaftCore, S: RaftSt rx_compaction: mpsc::Receiver, rx_api: mpsc::UnboundedReceiver<(RaftMsg, Span)>, + tx_metrics: watch::Sender, + rx_shutdown: oneshot::Receiver<()>, } @@ -173,12 +170,12 @@ impl, S: RaftStorage> Ra tx_metrics: watch::Sender, rx_shutdown: oneshot::Receiver<()>, ) -> JoinHandle> { - let membership = MembershipConfig::new_initial(id); // This is updated from storage in the main loop. + let membership = Membership::new_initial(id); // This is updated from storage in the main loop. let (tx_compaction, rx_compaction) = mpsc::channel(1); let this = Self { id, config, - membership: EffectiveMembership { + effective_membership: EffectiveMembership { log_id: LogId::default(), membership, }, @@ -214,7 +211,7 @@ impl, S: RaftStorage> Ra self.last_log_id = state.last_log_id; self.current_term = state.hard_state.current_term; self.voted_for = state.hard_state.voted_for; - self.membership = state.last_membership.clone(); + self.effective_membership = state.last_membership.clone(); self.last_applied = state.last_applied; // NOTE: this is repeated here for clarity. It is unsafe to initialize the node's commit // index to any other value. The commit index must be determined by a leader after @@ -228,8 +225,8 @@ impl, S: RaftStorage> Ra } let has_log = self.last_log_id.index != u64::MIN; - let single = self.membership.membership.all_nodes().len() == 1; - let is_voter = self.membership.membership.contains(&self.id); + let single = self.effective_membership.membership.all_nodes().len() == 1; + let is_voter = self.effective_membership.membership.contains(&self.id); self.target_state = match (has_log, single, is_voter) { // A restarted raft that already received some logs but was not yet added to a cluster. @@ -298,7 +295,7 @@ impl, S: RaftStorage> Ra last_applied: self.last_applied.index, current_leader: self.current_leader, // TODO(xp): 111 metrics should also track the membership log id - membership_config: self.membership.clone(), + membership_config: self.effective_membership.clone(), snapshot: self.snapshot_last_log_id, leader_metrics, }); @@ -321,7 +318,7 @@ impl, S: RaftStorage> Ra /// Update core's target state, ensuring all invariants are upheld. #[tracing::instrument(level = "debug", skip(self), fields(id=self.id))] fn set_target_state(&mut self, target_state: State) { - if target_state == State::Follower && !self.membership.membership.contains(&self.id) { + if target_state == State::Follower && !self.effective_membership.membership.contains(&self.id) { self.target_state = State::NonVoter; } else { self.target_state = target_state; @@ -412,8 +409,8 @@ impl, S: RaftStorage> Ra // - the node has been removed from the cluster. The parent application can observe the // transition to the non-voter state as a signal for when it is safe to shutdown a node // being removed. - self.membership = cfg; - if self.membership.membership.contains(&self.id) { + self.effective_membership = cfg; + if self.effective_membership.membership.contains(&self.id) { if self.target_state == State::NonVoter { // The node is a NonVoter and the new config has it configured as a normal member. // Transition to follower. @@ -699,7 +696,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage // Spawn replication streams. let targets = self .core - .membership + .effective_membership .membership .all_nodes() .iter() diff --git a/async-raft/src/core/replication.rs b/async-raft/src/core/replication.rs index 30bc879df..f052de63c 100644 --- a/async-raft/src/core/replication.rs +++ b/async-raft/src/core/replication.rs @@ -34,7 +34,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage target: NodeId, caller_tx: Option>, ) -> ReplicationState { - let replstream = ReplicationStream::new( + let repl_stream = ReplicationStream::new( self.core.id, target, self.core.current_term, @@ -47,7 +47,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage ); ReplicationState { matched: LogId { term: 0, index: 0 }, - repl_stream: replstream, + repl_stream, remove_since: None, tx: caller_tx, } @@ -170,28 +170,29 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage #[tracing::instrument(level = "trace", skip(self))] fn calc_commit_index(&self) -> u64 { let repl_indexes = self.get_match_log_indexes(); - let committed = self.core.membership.membership.greatest_majority_value(&repl_indexes); + + let committed = self.core.effective_membership.membership.greatest_majority_value(&repl_indexes); + *committed.unwrap_or(&self.core.commit_index) } + /// Collect indexes of the greatest matching log on every replica(include the leader itself) fn get_match_log_indexes(&self) -> BTreeMap { - let node_ids = self.core.membership.membership.all_nodes(); + let node_ids = self.core.effective_membership.membership.all_nodes(); let mut res = BTreeMap::new(); for id in node_ids.iter() { - // this node is me, the leader let matched = if *id == self.core.id { self.core.last_log_id } else { let repl_state = self.nodes.get(id); - if let Some(x) = repl_state { - x.matched - } else { - LogId::new(0, 0) - } + repl_state.map(|x| x.matched).unwrap_or_default() }; + // Mismatching term can not prevent other replica with higher term log from being chosen as leader, + // and that new leader may overrides any lower term logs. + // Thus it is not considered as committed. if matched.term == self.core.current_term { res.insert(*id, matched.index); } @@ -200,8 +201,8 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage res } - /// Handle events from replication streams requesting for snapshot info. - #[tracing::instrument(level = "trace", skip(self, tx))] + /// A replication streams requesting for snapshot info. + #[tracing::instrument(level = "debug", skip(self, tx))] async fn handle_needs_snapshot( &mut self, _: NodeId, @@ -269,15 +270,10 @@ fn snapshot_is_within_half_of_threshold(snapshot_last_index: &u64, last_log_inde distance_from_line <= threshold / 2 } -////////////////////////////////////////////////////////////////////////////////////////////////// - #[cfg(test)] mod tests { use super::*; - ////////////////////////////////////////////////////////////////////////// - // snapshot_is_within_half_of_threshold ////////////////////////////////// - mod snapshot_is_within_half_of_threshold { use super::*; diff --git a/async-raft/src/core/vote.rs b/async-raft/src/core/vote.rs index 3e5a5f01c..ac8426ace 100644 --- a/async-raft/src/core/vote.rs +++ b/async-raft/src/core/vote.rs @@ -143,7 +143,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage if res.vote_granted { self.granted.insert(target); - if self.core.membership.membership.is_majority(&self.granted) { + if self.core.effective_membership.membership.is_majority(&self.granted) { tracing::debug!("transitioning to leader state as minimum number of votes have been received"); self.core.set_target_state(State::Leader); return Ok(()); @@ -157,7 +157,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage /// Spawn parallel vote requests to all cluster members. #[tracing::instrument(level = "trace", skip(self))] pub(super) fn spawn_parallel_vote_requests(&self) -> mpsc::Receiver<(VoteResponse, NodeId)> { - let all_members = self.core.membership.membership.all_nodes().clone(); + let all_members = self.core.effective_membership.membership.all_nodes().clone(); let (tx, rx) = mpsc::channel(all_members.len()); for member in all_members.into_iter().filter(|member| member != &self.core.id) { let rpc = VoteRequest::new( diff --git a/async-raft/src/error.rs b/async-raft/src/error.rs index 884601238..fd88202db 100644 --- a/async-raft/src/error.rs +++ b/async-raft/src/error.rs @@ -4,7 +4,7 @@ use std::collections::BTreeSet; use std::fmt::Debug; use std::time::Duration; -use crate::raft::MembershipConfig; +use crate::raft::Membership; use crate::raft_types::SnapshotSegmentId; use crate::LogId; use crate::NodeId; @@ -179,10 +179,7 @@ pub enum ChangeMembershipError { // TODO(xp): rename this error to some elaborated name. // TODO(xp): 111 test it #[error("now allowed to change from {curr:?} to {to:?}")] - Incompatible { - curr: MembershipConfig, - to: BTreeSet, - }, + Incompatible { curr: Membership, to: BTreeSet }, } #[derive(Debug, thiserror::Error)] diff --git a/async-raft/src/membership_test.rs b/async-raft/src/membership_test.rs index 4cf538139..c67de9c0f 100644 --- a/async-raft/src/membership_test.rs +++ b/async-raft/src/membership_test.rs @@ -3,14 +3,14 @@ use std::collections::BTreeMap; use maplit::btreemap; use maplit::btreeset; -use crate::raft::MembershipConfig; +use crate::raft::Membership; use crate::NodeId; #[test] fn test_membership() -> anyhow::Result<()> { - let m1 = MembershipConfig::new_multi(vec![btreeset! {1}]); - let m123 = MembershipConfig::new_multi(vec![btreeset! {1,2,3}]); - let m123_345 = MembershipConfig::new_multi(vec![btreeset! {1,2,3}, btreeset! {3,4,5}]); + let m1 = Membership::new_multi(vec![btreeset! {1}]); + let m123 = Membership::new_multi(vec![btreeset! {1,2,3}]); + let m123_345 = Membership::new_multi(vec![btreeset! {1,2,3}, btreeset! {3,4,5}]); assert_eq!(Some(btreeset! {1}), m1.get_ith_config(0).cloned()); assert_eq!(Some(btreeset! {1,2,3}), m123.get_ith_config(0).cloned()); @@ -37,10 +37,7 @@ fn test_membership() -> anyhow::Result<()> { assert!(!m123.is_in_joint_consensus()); assert!(m123_345.is_in_joint_consensus()); - assert_eq!( - MembershipConfig::new_single(btreeset! {3,4,5}), - m123_345.to_final_config() - ); + assert_eq!(Membership::new_single(btreeset! {3,4,5}), m123_345.to_final_config()); Ok(()) } @@ -49,7 +46,7 @@ fn test_membership() -> anyhow::Result<()> { fn test_membership_update() -> anyhow::Result<()> { // --- replace - let mut m123 = MembershipConfig::new_single(btreeset! {1,2,3}); + let mut m123 = Membership::new_single(btreeset! {1,2,3}); m123.replace(vec![btreeset! {2,3}, btreeset! {3,4}]); assert_eq!(&btreeset! {2,3,4}, m123.all_nodes()); @@ -78,7 +75,7 @@ fn test_membership_update() -> anyhow::Result<()> { #[test] fn test_membership_majority() -> anyhow::Result<()> { { - let m12345 = MembershipConfig::new_single(btreeset! {1,2,3,4,5}); + let m12345 = Membership::new_single(btreeset! {1,2,3,4,5}); assert!(!m12345.is_majority(&btreeset! {0})); assert!(!m12345.is_majority(&btreeset! {0,1,2})); assert!(!m12345.is_majority(&btreeset! {6,7,8})); @@ -88,7 +85,7 @@ fn test_membership_majority() -> anyhow::Result<()> { } { - let m12345_678 = MembershipConfig::new_multi(vec![btreeset! {1,2,3,4,5}, btreeset! {6,7,8}]); + let m12345_678 = Membership::new_multi(vec![btreeset! {1,2,3,4,5}, btreeset! {6,7,8}]); assert!(!m12345_678.is_majority(&btreeset! {0})); assert!(!m12345_678.is_majority(&btreeset! {0,1,2})); assert!(!m12345_678.is_majority(&btreeset! {6,7,8})); @@ -103,7 +100,7 @@ fn test_membership_majority() -> anyhow::Result<()> { #[test] fn test_membership_greatest_majority_value() -> anyhow::Result<()> { { - let m123 = MembershipConfig::new_single(btreeset! {1,2,3}); + let m123 = Membership::new_single(btreeset! {1,2,3}); assert_eq!(None, m123.greatest_majority_value(&BTreeMap::::new())); assert_eq!(None, m123.greatest_majority_value(&btreemap! {0=>10})); assert_eq!(None, m123.greatest_majority_value(&btreemap! {0=>10,1=>10})); @@ -115,7 +112,7 @@ fn test_membership_greatest_majority_value() -> anyhow::Result<()> { } { - let m123_678 = MembershipConfig::new_multi(vec![btreeset! {1,2,3}, btreeset! {6,7,8}]); + let m123_678 = Membership::new_multi(vec![btreeset! {1,2,3}, btreeset! {6,7,8}]); assert_eq!(None, m123_678.greatest_majority_value(&btreemap! {0=>10})); assert_eq!(None, m123_678.greatest_majority_value(&btreemap! {0=>10,1=>10})); assert_eq!(None, m123_678.greatest_majority_value(&btreemap! {0=>10,1=>10,2=>20})); diff --git a/async-raft/src/metrics.rs b/async-raft/src/metrics.rs index e0fc807b8..597928489 100644 --- a/async-raft/src/metrics.rs +++ b/async-raft/src/metrics.rs @@ -19,7 +19,7 @@ use tokio::time::Instant; use crate::core::EffectiveMembership; use crate::core::State; -use crate::raft::MembershipConfig; +use crate::raft::Membership; use crate::LogId; use crate::NodeId; use crate::RaftError; @@ -60,7 +60,7 @@ pub struct LeaderMetrics { impl RaftMetrics { pub(crate) fn new_initial(id: NodeId) -> Self { - let membership_config = MembershipConfig::new_initial(id); + let membership_config = Membership::new_initial(id); Self { id, state: State::Follower, diff --git a/async-raft/src/metrics_wait_test.rs b/async-raft/src/metrics_wait_test.rs index aeb7f3f76..28920f75b 100644 --- a/async-raft/src/metrics_wait_test.rs +++ b/async-raft/src/metrics_wait_test.rs @@ -7,7 +7,7 @@ use tokio::time::sleep; use crate::core::EffectiveMembership; use crate::metrics::Wait; use crate::metrics::WaitError; -use crate::raft::MembershipConfig; +use crate::raft::Membership; use crate::LogId; use crate::RaftMetrics; use crate::State; @@ -184,7 +184,7 @@ fn init_wait_test() -> (RaftMetrics, Wait, watch::Sender) { current_leader: None, membership_config: EffectiveMembership { log_id: LogId::default(), - membership: MembershipConfig::new_single(btreeset! {}), + membership: Membership::new_single(btreeset! {}), }, snapshot: LogId { term: 0, index: 0 }, diff --git a/async-raft/src/raft.rs b/async-raft/src/raft.rs index 59493fc24..f44f2b049 100644 --- a/async-raft/src/raft.rs +++ b/async-raft/src/raft.rs @@ -596,7 +596,7 @@ pub enum EntryPayload { Normal(D), /// A change-membership log entry. - Membership(MembershipConfig), + Membership(Membership), } impl MessageSummary for EntryPayload { @@ -617,7 +617,7 @@ impl MessageSummary for EntryPayload { /// Unlike original raft, the membership always a joint. /// It could be a joint of one, two or more members, i.e., a quorum requires a majority of every members #[derive(Clone, Default, Debug, PartialEq, Eq, Serialize, Deserialize)] -pub struct MembershipConfig { +pub struct Membership { /// Multi configs. configs: Vec>, @@ -625,16 +625,16 @@ pub struct MembershipConfig { all_nodes: BTreeSet, } -impl MembershipConfig { +impl Membership { pub fn new_single(members: BTreeSet) -> Self { let configs = vec![members]; let all_nodes = Self::build_all_nodes(&configs); - MembershipConfig { configs, all_nodes } + Membership { configs, all_nodes } } pub fn new_multi(configs: Vec>) -> Self { let all_nodes = Self::build_all_nodes(&configs); - MembershipConfig { configs, all_nodes } + Membership { configs, all_nodes } } pub fn all_nodes(&self) -> &BTreeSet { @@ -684,14 +684,14 @@ impl MembershipConfig { // TODO(xp): rename this /// Create a new initial config containing only the given node ID. pub fn new_initial(id: NodeId) -> Self { - MembershipConfig::new_single(btreeset! {id}) + Membership::new_single(btreeset! {id}) } pub fn to_final_config(&self) -> Self { assert!(!self.configs.is_empty()); let last = self.configs.last().cloned().unwrap(); - MembershipConfig::new_single(last) + Membership::new_single(last) } /// Return true if the given set of ids constitutes a majority. @@ -881,7 +881,7 @@ impl ClientWriteRequest { } /// Generate a new payload holding a config change. - pub(crate) fn new_config(membership: MembershipConfig) -> Self { + pub(crate) fn new_config(membership: Membership) -> Self { Self::new_base(EntryPayload::Membership(membership)) } @@ -903,7 +903,7 @@ pub struct ClientWriteResponse { pub data: R, /// If the log entry is a change-membership entry. - pub membership: Option, + pub membership: Option, } impl MessageSummary for ClientWriteResponse { diff --git a/async-raft/src/storage.rs b/async-raft/src/storage.rs index 1aa0f527b..0047bd3d1 100644 --- a/async-raft/src/storage.rs +++ b/async-raft/src/storage.rs @@ -12,7 +12,7 @@ use tokio::io::AsyncWrite; use crate::core::EffectiveMembership; use crate::raft::Entry; -use crate::raft::MembershipConfig; +use crate::raft::Membership; use crate::raft_types::SnapshotId; use crate::raft_types::StateMachineChanges; use crate::AppData; @@ -27,7 +27,7 @@ pub struct SnapshotMeta { pub last_log_id: LogId, /// The latest membership configuration covered by the snapshot. - pub membership: MembershipConfig, + pub membership: Membership, /// To identify a snapshot when transferring. /// Caveat: even when two snapshot is built with the same `last_log_id`, they still could be different in bytes. @@ -89,7 +89,7 @@ impl InitialState { }, last_membership: EffectiveMembership { log_id: LogId { term: 0, index: 0 }, - membership: MembershipConfig::new_initial(id), + membership: Membership::new_initial(id), }, } } diff --git a/async-raft/tests/append_updates_membership.rs b/async-raft/tests/append_updates_membership.rs index f8951a300..a0ef646d5 100644 --- a/async-raft/tests/append_updates_membership.rs +++ b/async-raft/tests/append_updates_membership.rs @@ -5,7 +5,7 @@ use anyhow::Result; use async_raft::raft::AppendEntriesRequest; use async_raft::raft::Entry; use async_raft::raft::EntryPayload; -use async_raft::raft::MembershipConfig; +use async_raft::raft::Membership; use async_raft::AppData; use async_raft::Config; use async_raft::LogId; @@ -52,12 +52,12 @@ async fn append_updates_membership() -> Result<()> { ent(1, 1), Entry { log_id: LogId { term: 1, index: 2 }, - payload: EntryPayload::Membership(MembershipConfig::new_single(btreeset! {1,2})), + payload: EntryPayload::Membership(Membership::new_single(btreeset! {1,2})), }, ent(1, 3), Entry { log_id: LogId { term: 1, index: 4 }, - payload: EntryPayload::Membership(MembershipConfig::new_single(btreeset! {1,2,3,4})), + payload: EntryPayload::Membership(Membership::new_single(btreeset! {1,2,3,4})), }, ent(1, 5), ], diff --git a/async-raft/tests/client_writes.rs b/async-raft/tests/client_writes.rs index b29787b3b..8ce34f9d8 100644 --- a/async-raft/tests/client_writes.rs +++ b/async-raft/tests/client_writes.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use anyhow::Result; -use async_raft::raft::MembershipConfig; +use async_raft::raft::Membership; use async_raft::Config; use async_raft::LogId; use async_raft::State; @@ -107,7 +107,7 @@ async fn client_writes() -> Result<()> { want, Some(0), LogId { term: 1, index: want }, - Some(((5000..5100).into(), 1, MembershipConfig::new_single(btreeset! {0,1,2}))), + Some(((5000..5100).into(), 1, Membership::new_single(btreeset! {0,1,2}))), ) .await?; diff --git a/async-raft/tests/compaction.rs b/async-raft/tests/compaction.rs index c758fa5cd..e7126f76b 100644 --- a/async-raft/tests/compaction.rs +++ b/async-raft/tests/compaction.rs @@ -4,7 +4,7 @@ use anyhow::Result; use async_raft::raft::AppendEntriesRequest; use async_raft::raft::Entry; use async_raft::raft::EntryPayload; -use async_raft::raft::MembershipConfig; +use async_raft::raft::Membership; use async_raft::Config; use async_raft::LogId; use async_raft::RaftNetwork; @@ -76,7 +76,7 @@ async fn compaction() -> Result<()> { n_logs, Some(0), LogId { term: 1, index: n_logs }, - Some((n_logs.into(), 1, MembershipConfig::new_single(btreeset! {0}))), + Some((n_logs.into(), 1, Membership::new_single(btreeset! {0}))), ) .await?; @@ -107,11 +107,7 @@ async fn compaction() -> Result<()> { assert_eq!(LogId { term: 1, index: 51 }, logs[0].log_id) } - let expected_snap = Some(( - snapshot_threshold.into(), - 1, - MembershipConfig::new_single(btreeset! {0}), - )); + let expected_snap = Some((snapshot_threshold.into(), 1, Membership::new_single(btreeset! {0}))); router .assert_storage_state( 1, diff --git a/async-raft/tests/elect_compare_last_log.rs b/async-raft/tests/elect_compare_last_log.rs index aa1c9bccf..4362224c0 100644 --- a/async-raft/tests/elect_compare_last_log.rs +++ b/async-raft/tests/elect_compare_last_log.rs @@ -4,7 +4,7 @@ use std::time::Duration; use anyhow::Result; use async_raft::raft::Entry; use async_raft::raft::EntryPayload; -use async_raft::raft::MembershipConfig; +use async_raft::raft::Membership; use async_raft::storage::HardState; use async_raft::Config; use async_raft::LogId; @@ -44,7 +44,7 @@ async fn elect_compare_last_log() -> Result<()> { sto0.append_to_log(&[&Entry { log_id: LogId { term: 2, index: 1 }, - payload: EntryPayload::Membership(MembershipConfig::new_single(btreeset! {0,1})), + payload: EntryPayload::Membership(Membership::new_single(btreeset! {0,1})), }]) .await?; } @@ -60,7 +60,7 @@ async fn elect_compare_last_log() -> Result<()> { sto1.append_to_log(&[ &Entry { log_id: LogId { term: 1, index: 1 }, - payload: EntryPayload::Membership(MembershipConfig::new_single(btreeset! {0,1})), + payload: EntryPayload::Membership(Membership::new_single(btreeset! {0,1})), }, &Entry { log_id: LogId { term: 1, index: 2 }, diff --git a/async-raft/tests/fixtures/mod.rs b/async-raft/tests/fixtures/mod.rs index 15fafd6e5..e29210497 100644 --- a/async-raft/tests/fixtures/mod.rs +++ b/async-raft/tests/fixtures/mod.rs @@ -28,7 +28,7 @@ use async_raft::raft::Entry; use async_raft::raft::EntryPayload; use async_raft::raft::InstallSnapshotRequest; use async_raft::raft::InstallSnapshotResponse; -use async_raft::raft::MembershipConfig; +use async_raft::raft::Membership; use async_raft::raft::VoteRequest; use async_raft::raft::VoteResponse; use async_raft::storage::RaftStorage; @@ -665,7 +665,7 @@ impl RaftRouter { expect_last_log: u64, expect_voted_for: Option, expect_sm_last_applied_log: LogId, - expect_snapshot: Option<(ValueTest, u64, MembershipConfig)>, + expect_snapshot: Option<(ValueTest, u64, Membership)>, ) -> anyhow::Result<()> { let rt = self.routing_table.read().await; for (id, (_node, storage)) in rt.iter() { diff --git a/async-raft/tests/initialization.rs b/async-raft/tests/initialization.rs index 585aca473..9c591d2ea 100644 --- a/async-raft/tests/initialization.rs +++ b/async-raft/tests/initialization.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use anyhow::Result; use async_raft::raft::EntryPayload; -use async_raft::raft::MembershipConfig; +use async_raft::raft::Membership; use async_raft::Config; use async_raft::EffectiveMembership; use async_raft::LogId; @@ -70,7 +70,7 @@ async fn initialization() -> Result<()> { assert_eq!( Some(EffectiveMembership { log_id: LogId { term: 1, index: 1 }, - membership: MembershipConfig::new_single(btreeset! {0,1,2}) + membership: Membership::new_single(btreeset! {0,1,2}) }), sm_mem ); diff --git a/async-raft/tests/members_leader_fix_partial.rs b/async-raft/tests/members_leader_fix_partial.rs index 1606706bb..11026301c 100644 --- a/async-raft/tests/members_leader_fix_partial.rs +++ b/async-raft/tests/members_leader_fix_partial.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use anyhow::Result; use async_raft::raft::Entry; use async_raft::raft::EntryPayload; -use async_raft::raft::MembershipConfig; +use async_raft::raft::Membership; use async_raft::Config; use async_raft::LogId; use async_raft::Raft; @@ -43,7 +43,7 @@ async fn members_leader_fix_partial() -> Result<()> { term: 1, index: want + 1, }, - payload: EntryPayload::Membership(MembershipConfig::new_multi(vec![btreeset! {0}, btreeset! {0,1,2}])), + payload: EntryPayload::Membership(Membership::new_multi(vec![btreeset! {0}, btreeset! {0,1,2}])), }]) .await?; } diff --git a/async-raft/tests/snapshot_chunk_size.rs b/async-raft/tests/snapshot_chunk_size.rs index 7e1f16132..0d5f1d46a 100644 --- a/async-raft/tests/snapshot_chunk_size.rs +++ b/async-raft/tests/snapshot_chunk_size.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use anyhow::Result; -use async_raft::raft::MembershipConfig; +use async_raft::raft::Membership; use async_raft::Config; use async_raft::LogId; use async_raft::SnapshotPolicy; @@ -59,7 +59,7 @@ async fn snapshot_chunk_size() -> Result<()> { router.client_request_many(0, "0", (snapshot_threshold - want) as usize).await; want = snapshot_threshold; - let want_snap = Some((want.into(), 1, MembershipConfig::new_single(btreeset! {0}))); + let want_snap = Some((want.into(), 1, Membership::new_single(btreeset! {0}))); router.wait_for_log(&btreeset![0], want, None, "send log to trigger snapshot").await?; router.wait_for_snapshot(&btreeset![0], LogId { term: 1, index: want }, None, "snapshot").await?; @@ -71,7 +71,7 @@ async fn snapshot_chunk_size() -> Result<()> { router.new_raft_node(1).await; router.add_non_voter(0, 1).await.expect("failed to add new node as non-voter"); - let want_snap = Some((want.into(), 1, MembershipConfig::new_single(btreeset! {0}))); + let want_snap = Some((want.into(), 1, Membership::new_single(btreeset! {0}))); router.wait_for_log(&btreeset![0, 1], want, None, "add non-voter").await?; router.wait_for_snapshot(&btreeset![1], LogId { term: 1, index: want }, None, "").await?; diff --git a/async-raft/tests/snapshot_ge_half_threshold.rs b/async-raft/tests/snapshot_ge_half_threshold.rs index a8e1dc96a..6d8db9f81 100644 --- a/async-raft/tests/snapshot_ge_half_threshold.rs +++ b/async-raft/tests/snapshot_ge_half_threshold.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use anyhow::Result; -use async_raft::raft::MembershipConfig; +use async_raft::raft::Membership; use async_raft::Config; use async_raft::LogId; use async_raft::SnapshotPolicy; @@ -73,7 +73,7 @@ async fn snapshot_ge_half_threshold() -> Result<()> { want, Some(0), LogId { term: 1, index: want }, - Some((want.into(), 1, MembershipConfig::new_single(btreeset! {0}))), + Some((want.into(), 1, Membership::new_single(btreeset! {0}))), ) .await?; } @@ -90,7 +90,7 @@ async fn snapshot_ge_half_threshold() -> Result<()> { router.add_non_voter(0, 1).await.expect("failed to add new node as non-voter"); router.wait_for_log(&btreeset![0, 1], want, None, "add non-voter").await?; - let expected_snap = Some((want.into(), 1, MembershipConfig::new_single(btreeset! {0}))); + let expected_snap = Some((want.into(), 1, Membership::new_single(btreeset! {0}))); router.wait_for_snapshot(&btreeset![1], LogId { term: 1, index: want }, None, "").await?; router .assert_storage_state( diff --git a/async-raft/tests/snapshot_overrides_membership.rs b/async-raft/tests/snapshot_overrides_membership.rs index 78c7fa0c6..111ff9c81 100644 --- a/async-raft/tests/snapshot_overrides_membership.rs +++ b/async-raft/tests/snapshot_overrides_membership.rs @@ -5,7 +5,7 @@ use anyhow::Result; use async_raft::raft::AppendEntriesRequest; use async_raft::raft::Entry; use async_raft::raft::EntryPayload; -use async_raft::raft::MembershipConfig; +use async_raft::raft::Membership; use async_raft::Config; use async_raft::LogId; use async_raft::RaftNetwork; @@ -77,7 +77,7 @@ async fn snapshot_overrides_membership() -> Result<()> { want, Some(0), LogId { term: 1, index: want }, - Some((want.into(), 1, MembershipConfig::new_single(btreeset! {0}))), + Some((want.into(), 1, Membership::new_single(btreeset! {0}))), ) .await?; } @@ -96,7 +96,7 @@ async fn snapshot_overrides_membership() -> Result<()> { prev_log_id: LogId::new(0, 0), entries: vec![Entry { log_id: LogId { term: 1, index: 1 }, - payload: EntryPayload::Membership(MembershipConfig::new_single(btreeset! {2,3})), + payload: EntryPayload::Membership(Membership::new_single(btreeset! {2,3})), }], leader_commit: 0, }; @@ -105,7 +105,7 @@ async fn snapshot_overrides_membership() -> Result<()> { tracing::info!("--- check that non-voter membership is affected"); { let m = sto.get_membership_config().await?; - assert_eq!(MembershipConfig::new_single(btreeset! {2,3}), m.membership); + assert_eq!(Membership::new_single(btreeset! {2,3}), m.membership); } } @@ -118,7 +118,7 @@ async fn snapshot_overrides_membership() -> Result<()> { router.wait_for_log(&btreeset![0, 1], want, timeout(), "add non-voter").await?; router.wait_for_snapshot(&btreeset![1], LogId { term: 1, index: want }, timeout(), "").await?; - let expected_snap = Some((want.into(), 1, MembershipConfig::new_single(btreeset! {0}))); + let expected_snap = Some((want.into(), 1, Membership::new_single(btreeset! {0}))); router .assert_storage_state( @@ -132,7 +132,7 @@ async fn snapshot_overrides_membership() -> Result<()> { let m = sto.get_membership_config().await?; assert_eq!( - MembershipConfig::new_single(btreeset! {0}), + Membership::new_single(btreeset! {0}), m.membership, "membership should be overridden by the snapshot" ); diff --git a/async-raft/tests/snapshot_uses_prev_snap_membership.rs b/async-raft/tests/snapshot_uses_prev_snap_membership.rs index 929a740cc..157d1c3cc 100644 --- a/async-raft/tests/snapshot_uses_prev_snap_membership.rs +++ b/async-raft/tests/snapshot_uses_prev_snap_membership.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use std::time::Duration; use anyhow::Result; -use async_raft::raft::MembershipConfig; +use async_raft::raft::Membership; use async_raft::Config; use async_raft::LogId; use async_raft::MessageSummary; @@ -87,11 +87,7 @@ async fn snapshot_uses_prev_snap_membership() -> Result<()> { assert_eq!(2, logs.len(), "only one applied log is kept"); } let m = sto0.get_membership_config().await?; - assert_eq!( - MembershipConfig::new_single(btreeset! {0,1}), - m.membership, - "membership " - ); + assert_eq!(Membership::new_single(btreeset! {0,1}), m.membership, "membership "); // TODO(xp): this assertion fails because when change-membership, a append-entries request does not update // voted_for and does not call save_hard_state. @@ -127,11 +123,7 @@ async fn snapshot_uses_prev_snap_membership() -> Result<()> { assert_eq!(2, logs.len(), "only one applied log"); } let m = sto0.get_membership_config().await?; - assert_eq!( - MembershipConfig::new_single(btreeset! {0,1}), - m.membership, - "membership " - ); + assert_eq!(Membership::new_single(btreeset! {0,1}), m.membership, "membership "); } Ok(()) diff --git a/async-raft/tests/state_machien_apply_membership.rs b/async-raft/tests/state_machien_apply_membership.rs index e79354344..ec2b47e6f 100644 --- a/async-raft/tests/state_machien_apply_membership.rs +++ b/async-raft/tests/state_machien_apply_membership.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use anyhow::Result; -use async_raft::raft::MembershipConfig; +use async_raft::raft::Membership; use async_raft::Config; use async_raft::EffectiveMembership; use async_raft::LogId; @@ -53,7 +53,7 @@ async fn state_machine_apply_membership() -> Result<()> { assert_eq!( Some(EffectiveMembership { log_id: LogId { term: 1, index: 1 }, - membership: MembershipConfig::new_single(btreeset! {0}) + membership: Membership::new_single(btreeset! {0}) }), sto.last_applied_state().await?.1 ); @@ -95,7 +95,7 @@ async fn state_machine_apply_membership() -> Result<()> { assert_eq!( Some(EffectiveMembership { log_id: LogId { term: 1, index: 3 }, - membership: MembershipConfig::new_single(btreeset! {0,1,2}) + membership: Membership::new_single(btreeset! {0,1,2}) }), last_membership ); diff --git a/memstore/src/lib.rs b/memstore/src/lib.rs index 1d05d1473..7fa4dad21 100644 --- a/memstore/src/lib.rs +++ b/memstore/src/lib.rs @@ -17,7 +17,7 @@ use std::sync::Mutex; use async_raft::async_trait::async_trait; use async_raft::raft::Entry; use async_raft::raft::EntryPayload; -use async_raft::raft::MembershipConfig; +use async_raft::raft::Membership; use async_raft::storage::HardState; use async_raft::storage::InitialState; use async_raft::storage::Snapshot; @@ -207,7 +207,7 @@ impl MemStore { Some(x) => x, None => EffectiveMembership { log_id: LogId { term: 0, index: 0 }, - membership: MembershipConfig::new_initial(self.id), + membership: Membership::new_initial(self.id), }, }) } @@ -400,11 +400,8 @@ impl RaftStorage for MemStore { .map_err(|e| StorageIOError::new(ErrorSubject::StateMachine, ErrorVerb::Read, e.into()))?; last_applied_log = sm.last_applied_log; - membership_config = sm - .last_membership - .clone() - .map(|x| x.membership) - .unwrap_or_else(|| MembershipConfig::new_initial(self.id)); + membership_config = + sm.last_membership.clone().map(|x| x.membership).unwrap_or_else(|| Membership::new_initial(self.id)); } let snapshot_size = data.len(); diff --git a/memstore/src/test.rs b/memstore/src/test.rs index ba9b2cff1..3e3647d87 100644 --- a/memstore/src/test.rs +++ b/memstore/src/test.rs @@ -114,7 +114,7 @@ where let membership = store.get_membership_config().await?; - assert_eq!(MembershipConfig::new_single(btreeset! {NODE_ID}), membership.membership,); + assert_eq!(Membership::new_single(btreeset! {NODE_ID}), membership.membership,); Ok(()) } @@ -132,14 +132,14 @@ where }, &Entry { log_id: LogId { term: 1, index: 2 }, - payload: EntryPayload::Membership(MembershipConfig::new_single(btreeset! {3,4,5})), + payload: EntryPayload::Membership(Membership::new_single(btreeset! {3,4,5})), }, ]) .await?; let mem = store.get_membership_config().await?; - assert_eq!(MembershipConfig::new_single(btreeset! {3,4,5}), mem.membership,); + assert_eq!(Membership::new_single(btreeset! {3,4,5}), mem.membership,); } tracing::info!("--- membership presents in log, but smaller than last_applied, read from state machine"); @@ -147,13 +147,13 @@ where store .append_to_log(&[&Entry { log_id: (1, 1).into(), - payload: EntryPayload::Membership(MembershipConfig::new_single(btreeset! {1,2,3})), + payload: EntryPayload::Membership(Membership::new_single(btreeset! {1,2,3})), }]) .await?; let mem = store.get_membership_config().await?; - assert_eq!(MembershipConfig::new_single(btreeset! {3, 4, 5}), mem.membership,); + assert_eq!(Membership::new_single(btreeset! {3, 4, 5}), mem.membership,); } tracing::info!("--- membership presents in log and > sm.last_applied, read from log"); @@ -161,13 +161,13 @@ where store .append_to_log(&[&Entry { log_id: LogId { term: 1, index: 3 }, - payload: EntryPayload::Membership(MembershipConfig::new_single(btreeset! {1,2,3})), + payload: EntryPayload::Membership(Membership::new_single(btreeset! {1,2,3})), }]) .await?; let mem = store.get_membership_config().await?; - assert_eq!(MembershipConfig::new_single(btreeset! {1,2,3},), mem.membership,); + assert_eq!(Membership::new_single(btreeset! {1,2,3},), mem.membership,); } Ok(()) @@ -195,7 +195,7 @@ where ); assert_eq!( - MembershipConfig::new_single(btreeset! {NODE_ID}), + Membership::new_single(btreeset! {NODE_ID}), initial.last_membership.membership, ); @@ -265,7 +265,7 @@ where }, &Entry { log_id: LogId { term: 1, index: 2 }, - payload: EntryPayload::Membership(MembershipConfig::new_single(btreeset! {3,4,5})), + payload: EntryPayload::Membership(Membership::new_single(btreeset! {3,4,5})), }, ]) .await?; @@ -273,7 +273,7 @@ where let initial = store.get_initial_state().await?; assert_eq!( - MembershipConfig::new_single(btreeset! {3,4,5}), + Membership::new_single(btreeset! {3,4,5}), initial.last_membership.membership, ); } @@ -283,14 +283,14 @@ where store .append_to_log(&[&Entry { log_id: (1, 1).into(), - payload: EntryPayload::Membership(MembershipConfig::new_single(btreeset! {1,2,3})), + payload: EntryPayload::Membership(Membership::new_single(btreeset! {1,2,3})), }]) .await?; let initial = store.get_initial_state().await?; assert_eq!( - MembershipConfig::new_single(btreeset! {3,4,5}), + Membership::new_single(btreeset! {3,4,5}), initial.last_membership.membership, ); } @@ -300,14 +300,14 @@ where store .append_to_log(&[&Entry { log_id: LogId { term: 1, index: 3 }, - payload: EntryPayload::Membership(MembershipConfig::new_single(btreeset! {1,2,3})), + payload: EntryPayload::Membership(Membership::new_single(btreeset! {1,2,3})), }]) .await?; let initial = store.get_initial_state().await?; assert_eq!( - MembershipConfig::new_single(btreeset! {1,2,3}), + Membership::new_single(btreeset! {1,2,3}), initial.last_membership.membership, ); } @@ -611,7 +611,7 @@ where store .apply_to_state_machine(&[&Entry { log_id: LogId { term: 1, index: 3 }, - payload: EntryPayload::Membership(MembershipConfig::new_single(btreeset! {1,2})), + payload: EntryPayload::Membership(Membership::new_single(btreeset! {1,2})), }]) .await?; @@ -620,7 +620,7 @@ where assert_eq!( Some(EffectiveMembership { log_id: LogId { term: 1, index: 3 }, - membership: MembershipConfig::new_single(btreeset! {1,2}) + membership: Membership::new_single(btreeset! {1,2}) }), membership ); @@ -640,7 +640,7 @@ where assert_eq!( Some(EffectiveMembership { log_id: LogId { term: 1, index: 3 }, - membership: MembershipConfig::new_single(btreeset! {1,2}) + membership: Membership::new_single(btreeset! {1,2}) }), membership ); @@ -908,7 +908,7 @@ where }, &Entry { log_id: LogId { term: 1, index: 3 }, - payload: EntryPayload::Membership(MembershipConfig::new_single(btreeset! {1,2,3})), + payload: EntryPayload::Membership(Membership::new_single(btreeset! {1,2,3})), }, ]) .await?; @@ -920,7 +920,7 @@ where }, &Entry { log_id: LogId { term: 2, index: 2 }, - payload: EntryPayload::Membership(MembershipConfig::new_single(btreeset! {3,4,5})), + payload: EntryPayload::Membership(Membership::new_single(btreeset! {3,4,5})), }, ]) .await?; @@ -958,7 +958,7 @@ where }, &Entry { log_id: LogId { term: 1, index: 3 }, - payload: EntryPayload::Membership(MembershipConfig::new_single(btreeset! {1,2,3})), + payload: EntryPayload::Membership(Membership::new_single(btreeset! {1,2,3})), }, ]) .await?; @@ -971,7 +971,7 @@ where }, &Entry { log_id: LogId { term: 2, index: 2 }, - payload: EntryPayload::Membership(MembershipConfig::new_single(btreeset! {3,4,5})), + payload: EntryPayload::Membership(Membership::new_single(btreeset! {3,4,5})), }, ]) .await?;