fix: discarded log in replication_buffer should be finally sent.
Internally, when replication enters LaggingState (a non-leader is missing a large number of logs), the
ReplicationCore purges `outbound_buffer` and `replication_buffer` and then sends only the
**committed** logs found in storage.

Thus, if there are uncommitted logs in `replication_buffer`, they never get a chance to
be replicated, even when replication goes back to LineRateState, because LineRateState only
replicates logs from `ReplicationCore.outbound_buffer` and `ReplicationCore.replication_buffer`.

The added test ensures that when replication goes back to LineRateState, it tries to re-send all
logs found in storage, including those that were removed from the two buffers.
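
As an illustration of the failure mode and of the fallback this fix introduces, here is a minimal, self-contained sketch. The types and field names below are simplified stand-ins, not the real `ReplicationCore` API; it only shows that, once both buffers have been emptied by a LaggingState purge, the LineRateState target index falls back to `last_log_index + 1`, so the stranded entries are re-read from storage.

```rust
// Minimal sketch of the buffer-purge problem and the fallback (hypothetical,
// simplified types; not the actual ReplicationCore implementation).

#[derive(Debug, Clone, Copy)]
struct Entry {
    index: u64,
}

struct ReplicationSketch {
    next_index: u64,
    last_log_index: u64,
    outbound_buffer: Vec<Entry>,
    replication_buffer: Vec<Entry>,
}

impl ReplicationSketch {
    /// Entering LaggingState: both buffers are purged and only committed logs
    /// are streamed from storage, so uncommitted buffered entries are dropped.
    fn enter_lagging(&mut self) {
        self.outbound_buffer.clear();
        self.replication_buffer.clear();
    }

    /// Back in LineRateState: derive the target index from the buffers if they
    /// hold anything, otherwise fall back to `last_log_index + 1` so the
    /// stranded entries are frontloaded from storage again.
    fn line_rate_target_index(&self) -> u64 {
        self.outbound_buffer
            .first()
            .map(|e| e.index)
            .or_else(|| self.replication_buffer.first().map(|e| e.index))
            .unwrap_or(self.last_log_index + 1)
    }
}

fn main() {
    let mut repl = ReplicationSketch {
        next_index: 5,
        last_log_index: 7, // entries 6 and 7 exist in storage but are uncommitted
        outbound_buffer: vec![],
        replication_buffer: vec![Entry { index: 6 }, Entry { index: 7 }],
    };

    repl.enter_lagging(); // the uncommitted entries 6 and 7 vanish from the buffers

    // With empty buffers the target falls back to last_log_index + 1 = 8, so
    // next_index (5) != 8 and logs 5..8 are re-sent from storage.
    let target = repl.line_rate_target_index();
    assert_ne!(repl.next_index, target);
    println!("frontload logs from {} up to {}", repl.next_index, target);
}
```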
drmingdrmer committed Jun 1, 2021
1 parent d11a278 commit f449b64
Showing 4 changed files with 162 additions and 11 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/ci.yaml
@@ -58,6 +58,11 @@ jobs:
        with:
          command: test
          args: -p async-raft --test singlenode
      - name: Integration Test | Concurrent writing log and adding non-voter
        uses: actions-rs/cargo@v1
        with:
          command: test
          args: -p async-raft --test concurrent_write_and_add_non_voter
      - name: Integration Test | Dynamic Membership
        uses: actions-rs/cargo@v1
        with:
32 changes: 22 additions & 10 deletions async-raft/src/replication/mod.rs
@@ -532,21 +532,33 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
.first()
.map(|entry| entry.as_ref().index)
.or_else(|| self.core.replication_buffer.first().map(|entry| entry.index));
if let Some(index) = next_buf_index {
// Ensure that our buffered data matches up with `next_index`. When transitioning to
// line rate, it is always possible that new data has been sent for replication but has
// skipped this replication stream during transition. In such cases, a single update from
// storage will put this stream back on track.
if self.core.next_index != index {
self.frontload_outbound_buffer(self.core.next_index, index).await;
if self.core.target_state != TargetReplState::LineRate {
return;
}

// When converting to `LaggingState`, `outbound_buffer` and `replication_buffer` are cleared,
// and they may contain uncommitted logs.
// Thus when converting back to `LineRateState`, if these two buffers are empty, we
// need to resend all uncommitted logs.
// Otherwise these logs would have no chance to be replicated, unless a new log is written.
let index = match next_buf_index {
Some(i) => i,
None => self.core.last_log_index + 1,
};

// Ensure that our buffered data matches up with `next_index`. When transitioning to
// line rate, it is always possible that new data has been sent for replication but has
// skipped this replication stream during transition. In such cases, a single update from
// storage will put this stream back on track.
if self.core.next_index != index {
self.frontload_outbound_buffer(self.core.next_index, index).await;
if self.core.target_state != TargetReplState::LineRate {
return;
}

self.core.send_append_entries().await;
continue;
}

tokio::select! {
_ = self.core.heartbeat.tick() => self.core.send_append_entries().await,
event = self.core.raftrx.recv() => match event {
128 changes: 128 additions & 0 deletions async-raft/tests/concurrent_write_and_add_non_voter.rs
@@ -0,0 +1,128 @@
use std::sync::Arc;
use std::time::Duration;
use std::collections::HashSet;

use anyhow::Result;
use maplit::hashset;

use async_raft::Config;
use async_raft::State;
use fixtures::RaftRouter;

mod fixtures;

/// Cluster concurrent_write_and_add_non_voter test.
///
/// Internally, when replication enters LaggingState (a non-leader is missing a large number of
/// logs), the ReplicationCore purges `outbound_buffer` and `replication_buffer` and then sends
/// only the **committed** logs found in storage.
///
/// Thus, if there are uncommitted logs in `replication_buffer`, they never get a chance to
/// be replicated, even when replication goes back to LineRateState, because LineRateState only
/// replicates logs from `ReplicationCore.outbound_buffer` and `ReplicationCore.replication_buffer`.
///
/// This test ensures that when replication goes back to LineRateState, it tries to re-send all
/// logs found in storage, including those that were removed from the two buffers.
///
/// NOTE: this test needs a multi-node cluster, because a single-node cluster commits a log
/// immediately.
///
/// What does this test do?
///
/// - Brings a 3-node cluster online.
/// - Adds another non-voter and writes a log at the same time.
/// - Asserts that the leader, the followers and the non-voter all receive every log.
///
/// RUST_LOG=async_raft,memstore,concurrent_write_and_add_non_voter=trace cargo test -p async-raft --test concurrent_write_and_add_non_voter
#[tokio::test(flavor = "multi_thread", worker_threads = 6)]
async fn concurrent_write_and_add_non_voter() -> Result<()> {
fixtures::init_tracing();

let timeout = Duration::from_millis(500);
let candidates = hashset![0,1,2];

// Setup test dependencies.
let config = Arc::new(Config::build("test".into()).validate().expect("failed to build Raft config"));
let router = Arc::new(RaftRouter::new(config.clone()));

router.new_raft_node(0).await;

let mut want;

tracing::info!("--- initializing cluster of 1 node");
{
router.initialize_from_single_node(0).await?;
want = 1;

wait_log(router.clone(), &hashset![0], want).await?;
}

tracing::info!("--- adding two candidate nodes");
{
// Sync some new nodes.
router.new_raft_node(1).await;
router.new_raft_node(2).await;
router.add_non_voter(0, 1).await?;
router.add_non_voter(0, 2).await?;

tracing::info!("--- changing cluster config");

router.change_membership(0, candidates.clone()).await?;
want += 2; // Two membership-change logs

wait_log(router.clone(), &candidates, want).await?;
router.assert_stable_cluster(Some(1), Some(3)).await; // Still in term 1, so leader is still node 0.
}

let leader = router.leader().await.unwrap();


tracing::info!("--- write one log");
{
router.client_request_many(leader, "client", 1).await;
want += 1;

wait_log(router.clone(), &candidates, want).await?;
}


// Concurrently add NonVoter and write another log.
tracing::info!("--- concurrently add non-voter and write another log");
{

router.new_raft_node(3).await;
let r = router.clone();

let handle = {
tokio::spawn(async move{
r.add_non_voter(leader, 3).await.unwrap();
Ok::<(), anyhow::Error>(())
})
};

router.client_request_many(leader, "client", 1).await;
want += 1;

let _ = handle.await?;
};


wait_log(router.clone(), &candidates, want).await?;
router.wait_for_metrics(&3u64, |x| { x.state == State::NonVoter }, timeout, &format!("n{}.state -> {:?}", 3, State::NonVoter)).await?;

// The non-voter should receive the last written log
router.wait_for_metrics(&3u64, |x| { x.last_log_index == want }, timeout, &format!("n{}.last_log_index -> {}", 3, want)).await?;

Ok(())
}

async fn wait_log(router: std::sync::Arc<fixtures::RaftRouter>, node_ids: &HashSet<u64>, want_log: u64) -> anyhow::Result<()> {
let timeout = Duration::from_millis(500);
for i in node_ids.iter() {
router.wait_for_metrics(&i, |x| { x.last_log_index == want_log }, timeout, &format!("n{}.last_log_index -> {}", i, want_log)).await?;
router.wait_for_metrics(&i, |x| { x.last_applied == want_log }, timeout, &format!("n{}.last_applied -> {}", i, want_log)).await?;
}
Ok(())
}
8 changes: 7 additions & 1 deletion async-raft/tests/fixtures/mod.rs
@@ -476,13 +476,19 @@ impl RaftRouter {
impl RaftNetwork<MemClientRequest> for RaftRouter {
/// Send an AppendEntries RPC to the target Raft node (§5).
async fn append_entries(&self, target: u64, rpc: AppendEntriesRequest<MemClientRequest>) -> Result<AppendEntriesResponse> {

tracing::debug!("append_entries to id={} {:?}", target, rpc);

let rt = self.routing_table.read().await;
let isolated = self.isolated_nodes.read().await;
let addr = rt.get(&target).expect("target node not found in routing table");
if isolated.contains(&target) || isolated.contains(&rpc.leader_id) {
return Err(anyhow!("target node is isolated"));
}
Ok(addr.0.append_entries(rpc).await?)
let resp = addr.0.append_entries(rpc).await;

tracing::debug!("append_entries: recv resp from id={} {:?}", target, resp);
Ok(resp?)
}

/// Send an InstallSnapshot RPC to the target Raft node (§7).
