Skip to content

Commit

Permalink
change: MembershipConfig.member type is changed form HashSet BTreeSet
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Aug 16, 2021
1 parent 0d6f6de commit 8b59966
Show file tree
Hide file tree
Showing 31 changed files with 198 additions and 195 deletions.
7 changes: 4 additions & 3 deletions async-raft/src/core/admin.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::BTreeSet;
use std::collections::HashSet;

use futures::future::FutureExt;
Expand Down Expand Up @@ -29,7 +30,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
#[tracing::instrument(level = "trace", skip(self))]
pub(super) async fn handle_init_with_config(
&mut self,
mut members: HashSet<NodeId>,
mut members: BTreeSet<NodeId>,
) -> Result<(), InitializeError> {
if self.core.last_log_id.index != 0 || self.core.current_term != 0 {
tracing::error!({self.core.last_log_id.index, self.core.current_term}, "rejecting init_with_config request as last_log_index or current_term is 0");
Expand Down Expand Up @@ -97,7 +98,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
}

#[tracing::instrument(level = "trace", skip(self, tx))]
pub(super) async fn change_membership(&mut self, members: HashSet<NodeId>, tx: ChangeMembershipTx) {
pub(super) async fn change_membership(&mut self, members: BTreeSet<NodeId>, tx: ChangeMembershipTx) {
// Ensure cluster will have at least one node.
if members.is_empty() {
let _ = tx.send(Err(ChangeConfigError::InoperableConfig));
Expand Down Expand Up @@ -219,7 +220,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
.all_nodes()
.into_iter()
.filter(|elem| elem != &self.core.id)
.collect::<HashSet<_>>();
.collect::<BTreeSet<_>>();

let old_node_ids = self.core.membership.members.clone();
let node_ids_to_add = new_node_ids.difference(&old_node_ids);
Expand Down
3 changes: 2 additions & 1 deletion async-raft/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub(crate) mod replication;
mod vote;

use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::collections::HashSet;
use std::sync::Arc;

Expand Down Expand Up @@ -778,7 +779,7 @@ pub enum ConsensusState {
/// The set of non-voters nodes which are still being synced.
awaiting: HashSet<NodeId>,
/// The full membership change which has been proposed.
members: HashSet<NodeId>,
members: BTreeSet<NodeId>,
/// The response channel to use once the consensus state is back into uniform state.
tx: ChangeMembershipTx,
},
Expand Down
6 changes: 3 additions & 3 deletions async-raft/src/core/replication.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::HashSet;
use std::collections::BTreeSet;

use tokio::sync::oneshot;

Expand Down Expand Up @@ -221,7 +221,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
std::cmp::min(c0_index, c1_index)
}

fn calc_members_commit_index(&self, mem: &HashSet<NodeId>, msg: &str) -> u64 {
fn calc_members_commit_index(&self, mem: &BTreeSet<NodeId>, msg: &str) -> u64 {
let log_ids = self.get_match_log_ids(mem);
tracing::debug!("{} matched log_ids: {:?}", msg, log_ids);

Expand All @@ -232,7 +232,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
}

/// Extract the matching index/term of the replication state of specified nodes.
fn get_match_log_ids(&self, node_ids: &HashSet<NodeId>) -> Vec<LogId> {
fn get_match_log_ids(&self, node_ids: &BTreeSet<NodeId>) -> Vec<LogId> {
tracing::debug!("to get match log ids of nodes: {:?}", node_ids);

let mut rst = Vec::with_capacity(node_ids.len());
Expand Down
6 changes: 3 additions & 3 deletions async-raft/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
//! Metrics are observed on a running Raft node via the `Raft::metrics()` method, which will
//! return a stream of metrics.

use std::collections::BTreeSet;
use std::collections::HashMap;
use std::collections::HashSet;

use serde::Deserialize;
use serde::Serialize;
Expand Down Expand Up @@ -171,7 +171,7 @@ impl Wait {

/// Wait for `membership_config.members` to become expected node set or timeout.
#[tracing::instrument(level = "debug", skip(self), fields(msg=msg.to_string().as_str()))]
pub async fn members(&self, want_members: HashSet<NodeId>, msg: impl ToString) -> Result<RaftMetrics, WaitError> {
pub async fn members(&self, want_members: BTreeSet<NodeId>, msg: impl ToString) -> Result<RaftMetrics, WaitError> {
self.metrics(
|x| x.membership_config.members == want_members,
&format!("{} .membership_config.members -> {:?}", msg.to_string(), want_members),
Expand All @@ -183,7 +183,7 @@ impl Wait {
#[tracing::instrument(level = "debug", skip(self), fields(msg=msg.to_string().as_str()))]
pub async fn next_members(
&self,
want_members: Option<HashSet<NodeId>>,
want_members: Option<BTreeSet<NodeId>>,
msg: impl ToString,
) -> Result<RaftMetrics, WaitError> {
self.metrics(
Expand Down
14 changes: 7 additions & 7 deletions async-raft/src/metrics_wait_test.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::time::Duration;

use maplit::hashset;
use maplit::btreeset;
use tokio::sync::watch;
use tokio::time::sleep;

Expand Down Expand Up @@ -73,14 +73,14 @@ async fn test_wait() -> anyhow::Result<()> {
let h = tokio::spawn(async move {
sleep(Duration::from_millis(10)).await;
let mut update = init.clone();
update.membership_config.members = hashset![1, 2];
update.membership_config.members = btreeset![1, 2];
let rst = tx.send(update);
assert!(rst.is_ok());
});
let got = w.members(hashset![1, 2], "members").await?;
let got = w.members(btreeset![1, 2], "members").await?;
h.await?;

assert_eq!(hashset![1, 2], got.membership_config.members);
assert_eq!(btreeset![1, 2], got.membership_config.members);
}

{
Expand All @@ -90,14 +90,14 @@ async fn test_wait() -> anyhow::Result<()> {
let h = tokio::spawn(async move {
sleep(Duration::from_millis(10)).await;
let mut update = init.clone();
update.membership_config.members_after_consensus = Some(hashset![1, 2]);
update.membership_config.members_after_consensus = Some(btreeset![1, 2]);
let rst = tx.send(update);
assert!(rst.is_ok());
});
let got = w.next_members(Some(hashset![1, 2]), "next_members").await?;
let got = w.next_members(Some(btreeset![1, 2]), "next_members").await?;
h.await?;

assert_eq!(Some(hashset![1, 2]), got.membership_config.members_after_consensus);
assert_eq!(Some(btreeset![1, 2]), got.membership_config.members_after_consensus);
}

tracing::info!("--- wait for snapshot, Ok");
Expand Down
18 changes: 9 additions & 9 deletions async-raft/src/raft.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Public Raft interface and data types.

use std::collections::HashSet;
use std::collections::BTreeSet;
use std::sync::Arc;
use std::time::Duration;

Expand Down Expand Up @@ -214,7 +214,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
/// free, and Raft guarantees that the first node to become the cluster leader will propagate
/// only its own config.
#[tracing::instrument(level = "debug", skip(self))]
pub async fn initialize(&self, members: HashSet<NodeId>) -> Result<(), InitializeError> {
pub async fn initialize(&self, members: BTreeSet<NodeId>) -> Result<(), InitializeError> {
let (tx, rx) = oneshot::channel();
self.inner.tx_api.send(RaftMsg::Initialize { members, tx }).map_err(|_| RaftError::ShuttingDown)?;
rx.await.map_err(|_| InitializeError::RaftError(RaftError::ShuttingDown)).and_then(|res| res)
Expand Down Expand Up @@ -251,7 +251,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
/// If this Raft node is not the cluster leader, then the proposed configuration change will be
/// rejected.
#[tracing::instrument(level = "debug", skip(self))]
pub async fn change_membership(&self, members: HashSet<NodeId>) -> Result<(), ChangeConfigError> {
pub async fn change_membership(&self, members: BTreeSet<NodeId>) -> Result<(), ChangeConfigError> {
let (tx, rx) = oneshot::channel();
self.inner
.tx_api
Expand Down Expand Up @@ -339,15 +339,15 @@ pub(crate) enum RaftMsg<D: AppData, R: AppDataResponse> {
tx: ClientReadResponseTx,
},
Initialize {
members: HashSet<NodeId>,
members: BTreeSet<NodeId>,
tx: oneshot::Sender<Result<(), InitializeError>>,
},
AddNonVoter {
id: NodeId,
tx: ChangeMembershipTx,
},
ChangeMembership {
members: HashSet<NodeId>,
members: BTreeSet<NodeId>,
tx: ChangeMembershipTx,
},
}
Expand Down Expand Up @@ -484,16 +484,16 @@ pub struct EntrySnapshotPointer {
#[derive(Clone, Default, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct MembershipConfig {
/// All members of the Raft cluster.
pub members: HashSet<NodeId>,
pub members: BTreeSet<NodeId>,
/// All members of the Raft cluster after joint consensus is finalized.
///
/// The presence of a value here indicates that the config is in joint consensus.
pub members_after_consensus: Option<HashSet<NodeId>>,
pub members_after_consensus: Option<BTreeSet<NodeId>>,
}

impl MembershipConfig {
/// Get an iterator over all nodes in the current config.
pub fn all_nodes(&self) -> HashSet<u64> {
pub fn all_nodes(&self) -> BTreeSet<u64> {
let mut all = self.members.clone();
if let Some(members) = &self.members_after_consensus {
all.extend(members);
Expand All @@ -520,7 +520,7 @@ impl MembershipConfig {

/// Create a new initial config containing only the given node ID.
pub fn new_initial(id: NodeId) -> Self {
let mut members = HashSet::new();
let mut members = BTreeSet::new();
members.insert(id);
Self {
members,
Expand Down
14 changes: 7 additions & 7 deletions async-raft/tests/add_remove_voter.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::HashSet;
use std::collections::BTreeSet;
use std::sync::Arc;
use std::time::Duration;

Expand All @@ -7,7 +7,7 @@ use async_raft::Config;
use async_raft::State;
use fixtures::RaftRouter;
use futures::stream::StreamExt;
use maplit::hashset;
use maplit::btreeset;

mod fixtures;

Expand All @@ -19,7 +19,7 @@ mod fixtures;
/// - add 4 non-voter as follower.
/// - asserts that the leader was able to successfully commit logs and that the followers has successfully replicated
/// the payload.
/// - remove one folower: node-4
/// - remove one follower: node-4
/// - asserts node-4 becomes non-voter and the leader stops sending logs to it.
///
/// RUST_LOG=async_raft,memstore,add_remove_voter=trace cargo test -p async-raft --test add_remove_voter
Expand All @@ -28,8 +28,8 @@ async fn add_remove_voter() -> Result<()> {
fixtures::init_tracing();

let timeout = Duration::from_millis(500);
let all_members = hashset![0, 1, 2, 3, 4];
let left_members = hashset![0, 1, 2, 3];
let all_members = btreeset![0, 1, 2, 3, 4];
let left_members = btreeset![0, 1, 2, 3];

// Setup test dependencies.
let config = Arc::new(Config::build("test".into()).validate().expect("failed to build Raft config"));
Expand Down Expand Up @@ -62,7 +62,7 @@ async fn add_remove_voter() -> Result<()> {
router.initialize_from_single_node(0).await?;
want = 1;

wait_log(router.clone(), &hashset![0], want).await?;
wait_log(router.clone(), &btreeset![0], want).await?;
router.assert_stable_cluster(Some(1), Some(want)).await;

// Sync some new nodes.
Expand Down Expand Up @@ -123,7 +123,7 @@ async fn add_remove_voter() -> Result<()> {
Ok(())
}

async fn wait_log(router: std::sync::Arc<fixtures::RaftRouter>, node_ids: &HashSet<u64>, want_log: u64) -> Result<()> {
async fn wait_log(router: std::sync::Arc<fixtures::RaftRouter>, node_ids: &BTreeSet<u64>, want_log: u64) -> Result<()> {
let timeout = Duration::from_millis(500);
for i in node_ids.iter() {
router
Expand Down
8 changes: 4 additions & 4 deletions async-raft/tests/api_install_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use async_raft::LogId;
use async_raft::SnapshotMeta;
use async_raft::State;
use fixtures::RaftRouter;
use maplit::hashset;
use maplit::btreeset;

/// API test: install_snapshot with various condition.
///
Expand All @@ -33,13 +33,13 @@ async fn snapshot_ge_half_threshold() -> Result<()> {
{
router.new_raft_node(0).await;

router.wait_for_log(&hashset![0], want, None, "empty").await?;
router.wait_for_state(&hashset![0], State::NonVoter, None, "empty").await?;
router.wait_for_log(&btreeset![0], want, None, "empty").await?;
router.wait_for_state(&btreeset![0], State::NonVoter, None, "empty").await?;

router.initialize_from_single_node(0).await?;
want += 1;

router.wait_for_log(&hashset![0], want, None, "init leader").await?;
router.wait_for_log(&btreeset![0], want, None, "init leader").await?;
router.assert_stable_cluster(Some(1), Some(want)).await;
}

Expand Down
8 changes: 4 additions & 4 deletions async-raft/tests/client_reads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use anyhow::Result;
use async_raft::Config;
use async_raft::State;
use fixtures::RaftRouter;
use maplit::hashset;
use maplit::btreeset;

/// Client read tests.
///
Expand All @@ -31,16 +31,16 @@ async fn client_reads() -> Result<()> {
let mut want = 0;

// Assert all nodes are in non-voter state & have no entries.
router.wait_for_log(&hashset![0, 1, 2], want, None, "empty node").await?;
router.wait_for_state(&hashset![0, 1, 2], State::NonVoter, None, "empty node").await?;
router.wait_for_log(&btreeset![0, 1, 2], want, None, "empty node").await?;
router.wait_for_state(&btreeset![0, 1, 2], State::NonVoter, None, "empty node").await?;
router.assert_pristine_cluster().await;

// Initialize the cluster, then assert that a stable cluster was formed & held.
tracing::info!("--- initializing cluster");
router.initialize_from_single_node(0).await?;
want += 1;

router.wait_for_log(&hashset![0, 1, 2], want, None, "init leader").await?;
router.wait_for_log(&btreeset![0, 1, 2], want, None, "init leader").await?;
router.assert_stable_cluster(Some(1), Some(1)).await;

// Get the ID of the leader, and assert that client_read succeeds.
Expand Down
14 changes: 7 additions & 7 deletions async-raft/tests/client_writes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use async_raft::LogId;
use async_raft::State;
use fixtures::RaftRouter;
use futures::prelude::*;
use maplit::hashset;
use maplit::btreeset;

mod fixtures;

Expand All @@ -34,17 +34,17 @@ async fn client_writes() -> Result<()> {
let mut want = 0;

// Assert all nodes are in non-voter state & have no entries.
router.wait_for_log(&hashset![0, 1, 2], want, None, "empty").await?;
router.wait_for_state(&hashset![0, 1, 2], State::NonVoter, None, "empty").await?;
router.wait_for_log(&btreeset![0, 1, 2], want, None, "empty").await?;
router.wait_for_state(&btreeset![0, 1, 2], State::NonVoter, None, "empty").await?;
router.assert_pristine_cluster().await;

// Initialize the cluster, then assert that a stable cluster was formed & held.
tracing::info!("--- initializing cluster");
router.initialize_from_single_node(0).await?;
want += 1;

router.wait_for_log(&hashset![0, 1, 2], want, None, "leader init log").await?;
router.wait_for_state(&hashset![0], State::Leader, None, "init").await?;
router.wait_for_log(&btreeset![0, 1, 2], want, None, "leader init log").await?;
router.wait_for_state(&btreeset![0], State::Leader, None, "init").await?;

router.assert_stable_cluster(Some(1), Some(want)).await;

Expand All @@ -60,7 +60,7 @@ async fn client_writes() -> Result<()> {
while clients.next().await.is_some() {}

want = 6001;
router.wait_for_log(&hashset![0, 1, 2], want, None, "sync logs").await?;
router.wait_for_log(&btreeset![0, 1, 2], want, None, "sync logs").await?;

router.assert_stable_cluster(Some(1), Some(want)).await; // The extra 1 is from the leader's initial commit entry.
router
Expand All @@ -70,7 +70,7 @@ async fn client_writes() -> Result<()> {
Some(0),
LogId { term: 1, index: want },
Some(((5000..5100).into(), 1, MembershipConfig {
members: hashset![0, 1, 2],
members: btreeset![0, 1, 2],
members_after_consensus: None,
})),
)
Expand Down
Loading

0 comments on commit 8b59966

Please sign in to comment.