fix #112: when a follower is removed, the leader should stop sending logs to it.

When a leader is established, it adds the replication state of every
follower to `nodes`.
But it does not update `nodes` when the membership changes.
Thus, when a follower is removed, the leader cannot stop replication to
it, because the follower is not in `nodes`.

The fix is to move a node's replication state from `non_voters` to
`nodes` when a joint membership is committed, so that when the follower
is later removed, the leader can find and remove its replication state
in `nodes`.
drmingdrmer committed Jun 1, 2021
1 parent f449b64 commit 6d68048
Showing 3 changed files with 200 additions and 2 deletions.
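A minimal standalone sketch of the idea above (hypothetical, simplified types; the real change is in async-raft/src/core/admin.rs below):

use std::collections::{HashMap, HashSet};

// Hypothetical stand-in for the crate's per-follower replication state.
struct ReplicationState;

struct LeaderState {
    id: u64,
    // Voting members the leader replicates to; removal looks this map up.
    nodes: HashMap<u64, ReplicationState>,
    // Newly added nodes that are still catching up as non-voters.
    non_voters: HashMap<u64, ReplicationState>,
}

impl LeaderState {
    // When a joint membership commits, move every member still tracked in
    // `non_voters` into `nodes`, so that a later removal can find and stop
    // its replication stream.
    fn promote_committed_members(&mut self, members: &HashSet<u64>) {
        for node_id in members {
            if *node_id == self.id {
                continue; // The leader does not replicate to itself.
            }
            if let Some(state) = self.non_voters.remove(node_id) {
                self.nodes.insert(*node_id, state);
            }
        }
    }
}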
5 changes: 5 additions & 0 deletions .github/workflows/ci.yaml
@@ -21,6 +21,11 @@ jobs:
args: -p async-raft --lib

# integration tests
- name: Integration Test | Add remove Voter
uses: actions-rs/cargo@v1
with:
command: test
args: -p async-raft --test add_remove_voter
- name: Integration Test | Initialization
uses: actions-rs/cargo@v1
with:
54 changes: 52 additions & 2 deletions async-raft/src/core/admin.rs
@@ -181,16 +181,58 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
#[tracing::instrument(level = "trace", skip(self))]
pub(super) async fn handle_joint_consensus_committed(&mut self) -> Result<(), RaftError> {
if let ConsensusState::Joint { is_committed, .. } = &mut self.consensus_state {
*is_committed = true; // Mark as committed.
}
// Only proceed to finalize this joint consensus if there are no remaining nodes being synced.
if self.consensus_state.is_joint_consensus_safe_to_finalize() {
self.update_replication_state().await?;
self.finalize_joint_consensus().await?;
}
Ok(())
}

/// When the joint membership (not the uniform membership) is committed,
/// a newly added node turns from a NonVoter into a Follower.
/// Thus we need to move its replication state from `non_voters` to `nodes`.
///
/// There are two places in this code base where `nodes` is changed:
/// - When a leader is established, it adds every node_id found in `membership` to `nodes`.
/// - When a membership change is committed, i.e., a joint or a uniform membership.
#[tracing::instrument(level = "trace", skip(self))]
pub(super) async fn update_replication_state(&mut self) -> Result<(), RaftError> {

let new_node_ids = self
.core
.membership
.all_nodes()
.into_iter()
.filter(|elem| elem != &self.core.id)
.collect::<HashSet<_>>();

let old_node_ids = self.core.membership.members.clone();
let node_ids_to_add = new_node_ids.difference(&old_node_ids);

// move replication state from non_voters to nodes.
for node_id in node_ids_to_add {

if !self.non_voters.contains_key(node_id) {
// Just a probe for a potential bug.
panic!("joint membership contains node_id:{} not in non_voters:{:?}", node_id, self.non_voters.keys().collect::<Vec<_>>());
}

if self.nodes.contains_key(node_id) {
// Just a probe for a potential bug.
panic!("joint membership contains an existent node_id:{} in nodes:{:?}", node_id,self.nodes.keys().collect::<Vec<_>>());
}

let non_voter_state = self.non_voters.remove(node_id).unwrap();
self.nodes.insert(*node_id, non_voter_state.state);
}

Ok(())
}

/// Finalize the committed joint consensus.
#[tracing::instrument(level = "trace", skip(self))]
pub(super) async fn finalize_joint_consensus(&mut self) -> Result<(), RaftError> {
// Only proceed if it is safe to do so.
@@ -256,6 +298,14 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
}
})
.collect();

let follower_ids: Vec<u64> = self.nodes.keys().cloned().collect();
let non_voter_ids: Vec<u64> = self.non_voters.keys().cloned().collect();
tracing::debug!("nodes: {:?}", follower_ids);
tracing::debug!("non_voters: {:?}", non_voter_ids);
tracing::debug!("membership: {:?}", self.core.membership);
tracing::debug!("nodes_to_remove: {:?}", nodes_to_remove);

for node in nodes_to_remove {
tracing::debug!({ target = node }, "removing target node from replication pool");
if let Some(node) = self.nodes.remove(&node) {
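For reference, the set-difference step in update_replication_state above can be exercised standalone; a minimal sketch with hypothetical node ids:

use std::collections::HashSet;

fn main() {
    // Node ids already tracked as members (hypothetical values).
    let old_node_ids: HashSet<u64> = [0, 1, 2].into_iter().collect();
    // All ids in the newly committed joint membership, leader (id 0) excluded.
    let new_node_ids: HashSet<u64> = [1, 2, 3, 4].into_iter().collect();

    // Ids present in the new membership but not in the old one: exactly the
    // non-voters whose replication state must move into `nodes`.
    let mut to_add: Vec<u64> = new_node_ids.difference(&old_node_ids).cloned().collect();
    to_add.sort();
    assert_eq!(to_add, vec![3, 4]);
}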
143 changes: 143 additions & 0 deletions async-raft/tests/add_remove_voter.rs
@@ -0,0 +1,143 @@
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use futures::stream::StreamExt;
use maplit::hashset;

use async_raft::Config;
use async_raft::State;
use fixtures::RaftRouter;

mod fixtures;

/// Cluster add_remove_voter test.
///
/// What does this test do?
///
/// - brings 5 nodes online: one leader and 4 non-voters.
/// - adds the 4 non-voters as followers.
/// - asserts that the leader was able to successfully commit logs and that the followers have
///   successfully replicated the payload.
/// - removes one follower: node-4.
/// - asserts that node-4 becomes a non-voter and that the leader stops sending logs to it.
///
/// RUST_LOG=async_raft,memstore,add_remove_voter=trace cargo test -p async-raft --test add_remove_voter
#[tokio::test(flavor = "multi_thread", worker_threads = 6)]
async fn add_remove_voter() -> Result<()> {
fixtures::init_tracing();

let timeout = Duration::from_millis(500);
let all_members = hashset![0, 1, 2, 3, 4];
let left_members = hashset![0, 1, 2, 3];

// Setup test dependencies.
let config = Arc::new(Config::build("test".into()).validate().expect("failed to build Raft config"));
let router = Arc::new(RaftRouter::new(config.clone()));
router.new_raft_node(0).await;

// Assert the single node is in non-voter state & has no entries.
let mut want = 0;
router
.wait_for_metrics(&0u64, |x| x.last_log_index == want, timeout, &format!("n{}.last_log_index -> {}", 0, 0))
.await?;
router
.wait_for_metrics(
&0u64,
|x| x.state == State::NonVoter,
timeout,
&format!("n{}.state -> {:?}", 4, State::NonVoter),
)
.await?;

router.assert_pristine_cluster().await;

// Initialize the cluster, then assert that a stable cluster was formed & held.
tracing::info!("--- initializing cluster");
router.initialize_from_single_node(0).await?;
want = 1;

wait_log(router.clone(), &hashset![0], want).await?;
router.assert_stable_cluster(Some(1), Some(want)).await;

// Sync some new nodes.
router.new_raft_node(1).await;
router.new_raft_node(2).await;
router.new_raft_node(3).await;
router.new_raft_node(4).await;

tracing::info!("--- adding new nodes to cluster");
let mut new_nodes = futures::stream::FuturesUnordered::new();
new_nodes.push(router.add_non_voter(0, 1));
new_nodes.push(router.add_non_voter(0, 2));
new_nodes.push(router.add_non_voter(0, 3));
new_nodes.push(router.add_non_voter(0, 4));
while let Some(inner) = new_nodes.next().await {
inner?;
}

wait_log(router.clone(), &all_members, want).await?;

tracing::info!("--- changing cluster config");
router.change_membership(0, all_members.clone()).await?;
want += 2; // 2 member-change logs

wait_log(router.clone(), &all_members, want).await?;
router.assert_stable_cluster(Some(1), Some(want)).await; // Still in term 1, so leader is still node 0.

// Send some requests
router.client_request_many(0, "client", 100).await;
want += 100;

wait_log(router.clone(), &all_members, want).await?;

// Remove Node 4
tracing::info!("--- remove n{}", 4);
router.change_membership(0, left_members.clone()).await?;
want += 2; // two member-change logs

wait_log(router.clone(), &left_members, want).await?;
router
.wait_for_metrics(
&4u64,
|x| x.state == State::NonVoter,
timeout,
&format!("n{}.state -> {:?}", 4, State::NonVoter),
)
.await?;

// Send some requests
router.client_request_many(0, "client", 100).await;
want += 100;

wait_log(router.clone(), &left_members, want).await?;

// Logs will not be replicated to the removed node.
let x = router.latest_metrics().await;
assert!(x[4].last_log_index < want);
Ok(())
}

async fn wait_log(router: std::sync::Arc<fixtures::RaftRouter>, node_ids: &HashSet<u64>, want_log: u64) -> Result<()> {
let timeout = Duration::from_millis(500);
for i in node_ids.iter() {
router
.wait_for_metrics(
&i,
|x| x.last_log_index == want_log,
timeout,
&format!("n{}.last_log_index -> {}", i, want_log),
)
.await?;
router
.wait_for_metrics(
&i,
|x| x.last_applied == want_log,
timeout,
&format!("n{}.last_applied -> {}", i, want_log),
)
.await?;
}
Ok(())
}
