From 6d53aa12f66ecd08e81bcb055eb17387b835e2eb Mon Sep 17 00:00:00 2001 From: drdr xp Date: Tue, 31 Aug 2021 00:31:52 +0800 Subject: [PATCH] fix: too many(50) inconsistent log should not live lock append-entries - Reproduce the bug that when append-entries, if there are more than 50 inconsistent logs, the responded `conflict` is always set to `self.last_log`, which blocks replication for ever. Because the next time append-entries use the same `prev_log_id`, it actually does not search backward for the first consistent log entry. https://github.com/drmingdrmer/async-raft/blob/79a39970855d80e1d3b761fadbce140ecf1da59e/async-raft/src/core/append_entries.rs#L131-L154 The test to reproduce it fakes a cluster of node 0,1,2: R0 has 100 uncommitted log at term 2. R2 has 100 uncommitted log at term 3. ``` R0 ... 2,99 2,100 R1 R2 ... 3,99, 3,00 ``` Before this fix, brings up the cluster, R2 becomes leader and will never sync any log to R0. The fix is also quite simple: - Search backward instead of searching forward, to find the last log entry that matches `prev_log_id.term`, and responds this log id to the leader to let it send next `append_entries` RPC since this log id. - If no such matching term is found, use the first log id it sees, e.g., the entry at index `prev_log_id.index - 50` for next `append_entries`. --- async-raft/src/core/admin.rs | 2 +- async-raft/src/core/append_entries.rs | 23 ++-- async-raft/src/core/mod.rs | 8 +- async-raft/src/core/vote.rs | 4 +- async-raft/tests/append_inconsistent_log.rs | 132 ++++++++++++++++++++ memstore/src/lib.rs | 2 +- 6 files changed, 157 insertions(+), 14 deletions(-) create mode 100644 async-raft/tests/append_inconsistent_log.rs diff --git a/async-raft/src/core/admin.rs b/async-raft/src/core/admin.rs index 67cfa0d33..79dd3ed40 100644 --- a/async-raft/src/core/admin.rs +++ b/async-raft/src/core/admin.rs @@ -23,7 +23,7 @@ use crate::RaftStorage; impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage> NonVoterState<'a, D, R, N, S> { /// Handle the admin `init_with_config` command. - #[tracing::instrument(level = "trace", skip(self))] + #[tracing::instrument(level = "debug", skip(self))] pub(super) async fn handle_init_with_config( &mut self, mut members: BTreeSet, diff --git a/async-raft/src/core/append_entries.rs b/async-raft/src/core/append_entries.rs index 663bc48de..0a4f1b397 100644 --- a/async-raft/src/core/append_entries.rs +++ b/async-raft/src/core/append_entries.rs @@ -27,6 +27,8 @@ impl, S: RaftStorage> Ra &mut self, msg: AppendEntriesRequest, ) -> RaftResult { + tracing::debug!(%self.last_log_id); + // If message's term is less than most recent term, then we do not honor the request. if msg.term < self.current_term { tracing::debug!({self.current_term, rpc_term=msg.term}, "AppendEntries RPC term is less than current term"); @@ -151,21 +153,25 @@ impl, S: RaftStorage> Ra // The target entry does not have the same term. Fetch the last 50 logs, and use the last // entry of that payload which is still in the target term for conflict optimization. else { - let start = if msg.prev_log_id.index >= 50 { + let mut start = if msg.prev_log_id.index >= 50 { msg.prev_log_id.index - 50 } else { 0 }; + if start == 0 { + start = 1 + } let old_entries = self .storage .get_log_entries(start..msg.prev_log_id.index) .await .map_err(|err| self.map_fatal_storage_error(err))?; - let opt = match old_entries.iter().find(|entry| entry.log_id.term == msg.prev_log_id.term) { + + let first = old_entries.first().map(|x| x.log_id).unwrap_or_default(); + + let opt = match old_entries.iter().rev().find(|entry| entry.log_id.term <= msg.prev_log_id.term) { Some(entry) => Some(ConflictOpt { log_id: entry.log_id }), - None => Some(ConflictOpt { - log_id: self.last_log_id, - }), + None => Some(ConflictOpt { log_id: first }), }; if report_metrics { self.report_metrics(Update::Ignore); @@ -312,8 +318,11 @@ impl, S: RaftStorage> Ra return; } - assert!(start <= stop); - if start == stop { + tracing::debug!(start, stop, self.commit_index, %self.last_log_id, "start stop"); + + // when self.commit_index is not initialized, e.g. the first heartbeat from leader always has a commit_index to + // be 0, because the leader needs one round of heartbeat to find out the commit index. + if start >= stop { return; } diff --git a/async-raft/src/core/mod.rs b/async-raft/src/core/mod.rs index 398740a59..faf3cb4cb 100644 --- a/async-raft/src/core/mod.rs +++ b/async-raft/src/core/mod.rs @@ -185,7 +185,7 @@ impl, S: RaftStorage> Ra } /// The main loop of the Raft protocol. - #[tracing::instrument(level="trace", skip(self), fields(id=self.id, cluster=%self.config.cluster_name))] + #[tracing::instrument(level="debug", skip(self), fields(id=self.id, cluster=%self.config.cluster_name))] async fn main(mut self) -> RaftResult<()> { tracing::debug!("raft node is initializing"); @@ -210,9 +210,9 @@ impl, S: RaftStorage> Ra let has_log = self.last_log_id.index != u64::MIN; let single = self.membership.members.len() == 1; - let is_candidate = self.membership.contains(&self.id); + let is_voter = self.membership.contains(&self.id); - self.target_state = match (has_log, single, is_candidate) { + self.target_state = match (has_log, single, is_voter) { // A restarted raft that already received some logs but was not yet added to a cluster. // It should remain in NonVoter state, not Follower. (true, true, false) => State::NonVoter, @@ -239,7 +239,7 @@ impl, S: RaftStorage> Ra // to ensure that restarted nodes don't disrupt a stable cluster by timing out and driving up their // term before network communication is established. let inst = Instant::now() - + Duration::from_secs(30) + + Duration::from_secs(2) + Duration::from_millis(self.config.new_rand_election_timeout()); self.next_election_timeout = Some(inst); } diff --git a/async-raft/src/core/vote.rs b/async-raft/src/core/vote.rs index 38bc2ea68..877c7feef 100644 --- a/async-raft/src/core/vote.rs +++ b/async-raft/src/core/vote.rs @@ -19,8 +19,10 @@ impl, S: RaftStorage> Ra /// An RPC invoked by candidates to gather votes (ยง5.2). /// /// See `receiver implementation: RequestVote RPC` in raft-essentials.md in this repo. - #[tracing::instrument(level = "trace", skip(self, msg))] + #[tracing::instrument(level = "debug", skip(self))] pub(super) async fn handle_vote_request(&mut self, msg: VoteRequest) -> RaftResult { + tracing::debug!({candidate=msg.candidate_id, self.current_term, rpc_term=msg.term}, "start handle_vote_request"); + // If candidate's current term is less than this nodes current term, reject. if msg.term < self.current_term { tracing::debug!({candidate=msg.candidate_id, self.current_term, rpc_term=msg.term}, "RequestVote RPC term is less than current term"); diff --git a/async-raft/tests/append_inconsistent_log.rs b/async-raft/tests/append_inconsistent_log.rs new file mode 100644 index 000000000..c5a9d7979 --- /dev/null +++ b/async-raft/tests/append_inconsistent_log.rs @@ -0,0 +1,132 @@ +use std::sync::Arc; +use std::time::Duration; + +use anyhow::Result; +use async_raft::raft::Entry; +use async_raft::raft::EntryPayload; +use async_raft::storage::HardState; +use async_raft::Config; +use async_raft::LogId; +use async_raft::RaftStorage; +use async_raft::State; +use fixtures::RaftRouter; +use maplit::btreeset; + +#[macro_use] +mod fixtures; + +/// Too many inconsistent log should not block replication. +/// +/// Reproduce the bug that when append-entries, if there are more than 50 inconsistent logs the conflict is always set +/// to `last_log`, which blocks replication for ever. +/// https://github.com/drmingdrmer/async-raft/blob/79a39970855d80e1d3b761fadbce140ecf1da59e/async-raft/src/core/append_entries.rs#L131-L154 +/// +/// - fake a cluster of node 0,1,2. R0 has ~100 uncommitted log at term 2. R2 has ~100 uncommitted log at term 3. +/// +/// ``` +/// R0 ... 2,99 2,100 +/// R1 +/// R2 ... 3,99, 3,00 +/// ``` +/// +/// - Start the cluster and node 2 start to replicate logs. +/// - test the log should be replicated to node 0. +/// +/// RUST_LOG=async_raft,memstore,append_inconsistent_log=trace cargo test -p async-raft --test append_inconsistent_log +#[tokio::test(flavor = "multi_thread", worker_threads = 6)] +async fn append_inconsistent_log() -> Result<()> { + let (_log_guard, ut_span) = init_ut!(); + let _ent = ut_span.enter(); + + // Setup test dependencies. + let config = Arc::new(Config::build("test".into()).validate().expect("failed to build Raft config")); + let router = Arc::new(RaftRouter::new(config.clone())); + router.new_raft_node(0).await; + + let mut n_logs = router.new_nodes_from_single(btreeset! {0,1,2}, btreeset! {}).await?; + + tracing::info!("--- remove all nodes and fake the logs"); + + let (r0, sto0) = router.remove_node(0).await.unwrap(); + let (r1, sto1) = router.remove_node(1).await.unwrap(); + let (r2, sto2) = router.remove_node(2).await.unwrap(); + + r0.shutdown().await?; + r1.shutdown().await?; + r2.shutdown().await?; + + for i in n_logs + 1..=100 { + sto0.append_to_log(&[&Entry { + log_id: LogId { term: 2, index: i }, + payload: EntryPayload::Blank, + }]) + .await?; + + sto2.append_to_log(&[&Entry { + log_id: LogId { term: 3, index: i }, + payload: EntryPayload::Blank, + }]) + .await?; + } + + sto0.save_hard_state(&HardState { + current_term: 2, + voted_for: Some(0), + }) + .await?; + + sto2.save_hard_state(&HardState { + current_term: 3, + voted_for: Some(2), + }) + .await?; + + n_logs = 100; + + tracing::info!("--- restart node 1 and isolate. To let node-2 to become leader, node-1 should not vote for node-0"); + { + router.new_raft_node_with_sto(1, sto1.clone()).await; + router.isolate_node(1).await; + } + + tracing::info!("--- restart node 0 and 2"); + { + router.new_raft_node_with_sto(0, sto0.clone()).await; + router.new_raft_node_with_sto(2, sto2.clone()).await; + } + + // leader appends a blank log. + n_logs += 1; + + tracing::info!("--- wait for node states"); + { + router + .wait_for_state( + &btreeset! {0}, + State::Follower, + Some(Duration::from_millis(2000)), + "node 0 become follower", + ) + .await?; + + router + .wait_for_state( + &btreeset! {2}, + State::Leader, + Some(Duration::from_millis(5000)), + "node 2 become leader", + ) + .await?; + } + + router + .wait(&0, Some(Duration::from_millis(2000))) + .await? + .metrics(|x| x.last_log_index == n_logs, "sync log to node 0") + .await?; + + let logs = sto0.get_log_entries(60..=60).await?; + assert_eq!(3, logs.first().unwrap().log_id.term, "log is overridden by leader logs"); + + Ok(()) +} diff --git a/memstore/src/lib.rs b/memstore/src/lib.rs index 0244313b2..db3f7728d 100644 --- a/memstore/src/lib.rs +++ b/memstore/src/lib.rs @@ -574,7 +574,7 @@ impl RaftStorage for MemStore { } } - #[tracing::instrument(level = "trace", skip(self))] + #[tracing::instrument(level = "debug", skip(self))] async fn save_hard_state(&self, hs: &HardState) -> Result<()> { self.defensive_incremental_hard_state(hs).await?;