Skip to content

Commit

Permalink
fix: leader should not commit when there is no replication to voters.
Browse files Browse the repository at this point in the history
When there is no replication to voters but there are replications to
non-voters, the leader did not check non-voters for a quorum but just
commits a log at once.

This cause the membership change log from a single node always commits.
E.g. start node 0, and non-voter 1, 2; then `change_membership({0, 1, 2})`,
It just commits the joint-log at once.
But according to raft paper, it should await a quorum of {0} and a
quorum of {0, 1, 2}.
  • Loading branch information
drmingdrmer committed Aug 17, 2021
1 parent dff6260 commit beb0302
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 8 deletions.
19 changes: 12 additions & 7 deletions async-raft/src/core/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,23 +278,28 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
// Replicate the request if there are other cluster members. The client response will be
// returned elsewhere after the entry has been committed to the cluster.
let entry_arc = req.entry.clone();

if self.nodes.is_empty() && self.non_voters.is_empty() {
// Else, there are no voting nodes for replication, so the payload is now committed.
self.core.commit_index = entry_arc.log_id.index;
self.leader_report_metrics();
self.client_request_post_commit(req).await;
return;
}

self.awaiting_committed.push(req);

if !self.nodes.is_empty() {
self.awaiting_committed.push(req);
for node in self.nodes.values() {
let _ = node.replstream.repl_tx.send(RaftEvent::Replicate {
entry: entry_arc.clone(),
commit_index: self.core.commit_index,
});
}
} else {
// Else, there are no voting nodes for replication, so the payload is now committed.
self.core.commit_index = entry_arc.log_id.index;
self.leader_report_metrics();
self.client_request_post_commit(req).await;
}

// Replicate to non-voters.
if !self.non_voters.is_empty() {
// Replicate to non-voters.
for node in self.non_voters.values() {
let _ = node.state.replstream.repl_tx.send(RaftEvent::Replicate {
entry: entry_arc.clone(),
Expand Down
2 changes: 1 addition & 1 deletion async-raft/tests/leader_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use async_raft::ReplicationMetrics;
use async_raft::State;
use fixtures::RaftRouter;
use futures::stream::StreamExt;
use maplit::hashmap;
use maplit::btreeset;
use maplit::hashmap;

mod fixtures;

Expand Down
80 changes: 80 additions & 0 deletions async-raft/tests/members_0_to_012.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
use std::sync::Arc;

use anyhow::Result;
use async_raft::Config;
use fixtures::RaftRouter;
use futures::stream::StreamExt;
use maplit::btreeset;

mod fixtures;

/// A leader must wait for non-voter to commit member-change from [0] to [0,1,2].
/// There is a bug that leader commit a member change log directly because it only checks
/// the replication of voters and believes itself is the only voter.
///
/// - Init 1 leader and 2 non-voter.
/// - Isolate non-voter.
/// - Asserts that membership change won't success.
///
/// RUST_LOG=async_raft,memstore,members_0_to_012=trace cargo test -p async-raft --test members_0_to_012
#[tokio::test(flavor = "multi_thread", worker_threads = 6)]
async fn members_0_to_012() -> Result<()> {
fixtures::init_tracing();

// Setup test dependencies.
let config = Arc::new(Config::build("test".into()).validate().expect("failed to build Raft config"));
let router = Arc::new(RaftRouter::new(config.clone()));
router.new_raft_node(0).await;

// Assert all nodes are in non-voter state & have no entries.
let want;

// router.assert_pristine_cluster().await;

// Initialize the cluster, then assert that a stable cluster was formed & held.
tracing::info!("--- initializing cluster");
router.initialize_from_single_node(0).await?;
want = 1;

router.wait_for_log(&btreeset![0], want, None, "init node 0").await?;

// Sync some new nodes.
router.new_raft_node(1).await;
router.new_raft_node(2).await;

tracing::info!("--- adding new nodes to cluster");
let mut new_nodes = futures::stream::FuturesUnordered::new();
new_nodes.push(router.add_non_voter(0, 1));
new_nodes.push(router.add_non_voter(0, 2));
while let Some(inner) = new_nodes.next().await {
inner?;
}

router.wait_for_log(&btreeset![0], want, None, "init node 0").await?;

tracing::info!("--- isolate node 1,2, so that membership [0,1,2] wont commit");

router.isolate_node(1).await;
router.isolate_node(2).await;

tracing::info!("--- changing cluster config, should timeout");

tokio::spawn({
let router = router.clone();
async move {
let _x = router.change_membership(0, btreeset! {0,1,2}).await;
}
});

let res = router
.wait_for_metrics(
&0,
|x| x.last_applied > want,
None,
"the next joint log should not commit",
)
.await;
assert!(res.is_err(), "joint log should not commit");

Ok(())
}

0 comments on commit beb0302

Please sign in to comment.