Skip to content

Commit

Permalink
fix: last_applied should be updated only when logs actually applied.
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Jun 1, 2021
1 parent 8cb99fe commit 89bb48f
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 8 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ jobs:
with:
command: test
args: -p async-raft --test initialization
- name: Integration Test | metrics state machine consistency
uses: actions-rs/cargo@v1
with:
command: test
args: -p async-raft --test metrics_state_machine_consistency
- name: Integration Test | NonVoter restart
uses: actions-rs/cargo@v1
with:
Expand Down
14 changes: 6 additions & 8 deletions async-raft/src/core/append_entries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,15 +242,13 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
}
})
.collect();
// If we actually have some cached entries to apply, then we optimistically update, as
// `self.last_applied` is held in-memory only, and if an error does come up, then
// Raft will go into shutdown.
if let Some(index) = last_entry_seen {
self.last_applied = index;
self.report_metrics();
}

// If we have no data entries to apply, then do nothing.
if entries.is_empty() {
if let Some(index) = last_entry_seen {
self.last_applied = index;
self.report_metrics();
}
return;
}
// Spawn task to replicate these entries to the state machine.
Expand All @@ -260,7 +258,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
// interface a bit before 1.0.
let entries_refs: Vec<_> = entries.iter().map(|(k, v)| (k, v)).collect();
storage.replicate_to_state_machine(&entries_refs).await?;
Ok(None)
Ok(last_entry_seen)
});
self.replicate_to_sm_handle.push(handle);
}
Expand Down
71 changes: 71 additions & 0 deletions async-raft/tests/metrics_state_machine_consistency.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use maplit::hashset;

use async_raft::{Config, State};
use fixtures::RaftRouter;

mod fixtures;

/// Cluster metrics_state_machine_consistency test.
///
/// What does this test do?
///
/// - brings 2 nodes online: one leader and one non-voter.
/// - write one log to the leader.
/// - asserts that when metrics.last_applied is upto date, the state machine should be upto date too.
///
/// RUST_LOG=async_raft,memstore,metrics_state_machine_consistency=trace cargo test -p async-raft --test metrics_state_machine_consistency
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn metrics_state_machine_consistency() -> Result<()> {
fixtures::init_tracing();

// Setup test dependencies.
let config = Arc::new(Config::build("test".into()).validate().expect("failed to build Raft config"));
let router = Arc::new(RaftRouter::new(config.clone()));

router.new_raft_node(0).await;
router.new_raft_node(1).await;

tracing::info!("--- initializing single node cluster");

// Wait for node 0 to become leader.
router.initialize_with(0, hashset![0]).await?;
router
.wait_for_metrics(
&0u64,
|x| x.state == State::Leader,
Duration::from_micros(100),
"n0.state -> Leader",
)
.await;

tracing::info!("--- add one non-voter");
router.add_non_voter(0, 1).await?;

tracing::info!("--- write one log");
router.client_request(0, "foo", 1).await;

// Wait for metrics to be up to date.
// Once last_applied updated, the key should be visible in state machine.
tracing::info!("--- wait for log to sync");
let want = 2u64;
for node_id in 0..2 {
router
.wait_for_metrics(
&node_id,
|x| x.last_applied == want,
Duration::from_micros(100),
&format!("n{}.last_applied -> {}", node_id, want),
)
.await;

let sto = router.get_sto(&node_id).await;
assert!(sto.get_state_machine().await.client_status.get("foo").is_some());

}

Ok(())
}

0 comments on commit 89bb48f

Please sign in to comment.