From c61b4c4922d2cee93135ccd87c95dcd7ab780ad1 Mon Sep 17 00:00:00 2001 From: drdr xp Date: Thu, 23 Dec 2021 19:46:44 +0800 Subject: [PATCH] change: remove ConflictOpt, which is a wrapper of log_id; add matched log id in AppendEntriesResponse --- async-raft/src/core/append_entries.rs | 21 ++++--- async-raft/src/raft.rs | 32 +++++------ async-raft/src/replication/mod.rs | 23 +++----- async-raft/tests/append_conflicts.rs | 56 +++++++------------ async-raft/tests/append_updates_membership.rs | 8 +-- async-raft/tests/compaction.rs | 4 +- .../tests/conflict_with_empty_entries.rs | 31 ++++------ 7 files changed, 72 insertions(+), 103 deletions(-) diff --git a/async-raft/src/core/append_entries.rs b/async-raft/src/core/append_entries.rs index 058557d20..d123d3d65 100644 --- a/async-raft/src/core/append_entries.rs +++ b/async-raft/src/core/append_entries.rs @@ -5,7 +5,6 @@ use crate::core::UpdateCurrentLeader; use crate::error::RaftResult; use crate::raft::AppendEntriesRequest; use crate::raft::AppendEntriesResponse; -use crate::raft::ConflictOpt; use crate::raft::Entry; use crate::raft::EntryPayload; use crate::ActiveMembership; @@ -36,8 +35,8 @@ impl, S: RaftStorage> Ra tracing::debug!({self.current_term, rpc_term=msg.term}, "AppendEntries RPC term is less than current term"); return Ok(AppendEntriesResponse { term: self.current_term, - success: false, - conflict_opt: None, + matched: None, + conflict: None, }); } @@ -247,11 +246,19 @@ impl, S: RaftStorage> Ra return Ok(AppendEntriesResponse { term: self.current_term, - success: false, - conflict_opt: Some(ConflictOpt { log_id: *prev_log_id }), + matched: None, + conflict: Some(*prev_log_id), }); } + // If prev_log_id matches local entry, then every inconsistent entries will be removed. + // Thus the last known matching log id has to be the last entry id. + let matched = Some(if entries.is_empty() { + *prev_log_id + } else { + entries[entries.len() - 1].log_id + }); + // The entries left are all inconsistent log or absent let (n_matching, entries) = self.skip_matching_entries(entries).await?; @@ -279,8 +286,8 @@ impl, S: RaftStorage> Ra Ok(AppendEntriesResponse { term: self.current_term, - success: true, - conflict_opt: None, + matched, + conflict: None, }) } diff --git a/async-raft/src/raft.rs b/async-raft/src/raft.rs index c2e059776..806ae1e94 100644 --- a/async-raft/src/raft.rs +++ b/async-raft/src/raft.rs @@ -520,26 +520,23 @@ impl MessageSummary for AppendEntriesRequest { pub struct AppendEntriesResponse { /// The responding node's current term, for leader to update itself. pub term: u64, - /// Will be true if follower contained entry matching `prev_log_index` and `prev_log_term`. - pub success: bool, - /// A value used to implement the _conflicting term_ optimization outlined in §5.3. + + /// The last matching log id on follower. + /// + /// It is a successful append-entry iff `matched` is `Some()`. + pub matched: Option, + + /// The log id that is different from the leader on follower. /// - /// This value will only be present, and should only be considered, when `success` is `false`. - pub conflict_opt: Option, + /// `conflict` is None if `matched` is `Some()`, because if there is a matching entry, all following inconsistent + /// entries will be deleted. + pub conflict: Option, } -/// A struct used to implement the _conflicting term_ optimization outlined in §5.3 for log replication. -/// -/// This value will only be present, and should only be considered, when an `AppendEntriesResponse` -/// object has a `success` value of `false`. -/// -/// This implementation of Raft uses this value to more quickly synchronize a leader with its -/// followers which may be some distance behind in replication, may have conflicting entries, or -/// which may be new to the cluster. -#[derive(Debug, Serialize, Deserialize, PartialEq)] -pub struct ConflictOpt { - /// The most recent entry which does not conflict with the received request. - pub log_id: LogId, +impl AppendEntriesResponse { + pub fn success(&self) -> bool { + self.matched.is_some() + } } /// A Raft log entry. @@ -683,6 +680,7 @@ impl MembershipConfig { pub struct VoteRequest { /// The candidate's current term. pub term: u64, + /// The candidate's ID. pub candidate_id: u64, diff --git a/async-raft/src/replication/mod.rs b/async-raft/src/replication/mod.rs index 449175995..ba216acc7 100644 --- a/async-raft/src/replication/mod.rs +++ b/async-raft/src/replication/mod.rs @@ -307,12 +307,6 @@ impl, S: RaftStorage> Re break (prev_log_id, logs); }; - let last_log_id = if logs.is_empty() { - prev_log_id - } else { - logs[logs.len() - 1].log_id - }; - // Build the heartbeat frame to be sent to the follower. let payload = AppendEntriesRequest { term: self.term, @@ -350,13 +344,13 @@ impl, S: RaftStorage> Re } }; - tracing::debug!(%last_log_id, "append_entries resp: {:?}", append_resp); + tracing::debug!("append_entries resp: {:?}", append_resp); // Handle success conditions. - if append_resp.success { - self.matched = last_log_id; + if append_resp.success() { + self.matched = append_resp.matched.unwrap(); // TODO(xp): if matched does not change, do not bother the core. - self.update_max_possible_matched_index(last_log_id.index); + self.update_max_possible_matched_index(self.matched.index); self.update_matched(); return Ok(()); @@ -375,7 +369,7 @@ impl, S: RaftStorage> Re } // Replication was not successful, handle conflict optimization record, else decrement `next_index`. - let conflict = append_resp.conflict_opt.unwrap(); + let conflict = append_resp.conflict.unwrap(); tracing::debug!( ?conflict, @@ -383,13 +377,10 @@ impl, S: RaftStorage> Re "append entries failed, handling conflict opt" ); - assert_eq!( - conflict.log_id, prev_log_id, - "if conflict, it is always the prev_log_id" - ); + assert_eq!(conflict, prev_log_id, "if conflict, it is always the prev_log_id"); // Continue to find the matching log id on follower. - self.max_possible_matched_index = conflict.log_id.index - 1; + self.max_possible_matched_index = conflict.index - 1; Ok(()) } diff --git a/async-raft/tests/append_conflicts.rs b/async-raft/tests/append_conflicts.rs index 54c87af52..990151ba0 100644 --- a/async-raft/tests/append_conflicts.rs +++ b/async-raft/tests/append_conflicts.rs @@ -2,7 +2,6 @@ use std::sync::Arc; use anyhow::Result; use async_raft::raft::AppendEntriesRequest; -use async_raft::raft::ConflictOpt; use async_raft::raft::Entry; use async_raft::Config; use async_raft::LogId; @@ -55,8 +54,8 @@ async fn append_conflicts() -> Result<()> { }; let resp = r0.append_entries(req.clone()).await?; - assert!(resp.success); - assert_eq!(None, resp.conflict_opt); + assert!(resp.success()); + assert_eq!(None, resp.conflict); check_logs(&sto0, vec![0]).await?; @@ -72,16 +71,16 @@ async fn append_conflicts() -> Result<()> { }; let resp = r0.append_entries(req.clone()).await?; - assert!(resp.success); - assert_eq!(None, resp.conflict_opt); + assert!(resp.success()); + assert_eq!(None, resp.conflict); check_logs(&sto0, vec![0, 1, 1, 1, 1]).await?; tracing::info!("--- case 0: prev_log_id.index == 0, last_log_id mismatch"); let resp = r0.append_entries(req.clone()).await?; - assert!(resp.success); - assert_eq!(None, resp.conflict_opt); + assert!(resp.success()); + assert_eq!(None, resp.conflict); check_logs(&sto0, vec![0, 1, 1, 1, 1]).await?; @@ -97,8 +96,8 @@ async fn append_conflicts() -> Result<()> { }; let resp = r0.append_entries(req).await?; - assert!(resp.success); - assert_eq!(None, resp.conflict_opt); + assert!(resp.success()); + assert_eq!(None, resp.conflict); check_logs(&sto0, vec![0, 1, 1, 1, 1]).await?; @@ -114,8 +113,8 @@ async fn append_conflicts() -> Result<()> { }; let resp = r0.append_entries(req).await?; - assert!(resp.success); - assert_eq!(None, resp.conflict_opt); + assert!(resp.success()); + assert_eq!(None, resp.conflict); check_logs(&sto0, vec![0, 1, 1, 2]).await?; @@ -129,13 +128,8 @@ async fn append_conflicts() -> Result<()> { }; let resp = r0.append_entries(req).await?; - assert!(!resp.success); - assert_eq!( - Some(ConflictOpt { - log_id: LogId { term: 1, index: 2000 } - }), - resp.conflict_opt - ); + assert!(!resp.success()); + assert_eq!(Some(LogId::new(1, 2000)), resp.conflict); check_logs(&sto0, vec![0, 1, 1, 2]).await?; @@ -150,14 +144,9 @@ async fn append_conflicts() -> Result<()> { }; let resp = r0.append_entries(req).await?; - assert!(!resp.success); + assert!(!resp.success()); // returns the id just before prev_log_id.index - assert_eq!( - Some(ConflictOpt { - log_id: LogId { term: 3, index: 3 } - }), - resp.conflict_opt - ); + assert_eq!(Some(LogId::new(3, 3)), resp.conflict); check_logs(&sto0, vec![0, 1, 1]).await?; @@ -172,8 +161,8 @@ async fn append_conflicts() -> Result<()> { }; let resp = r0.append_entries(req).await?; - assert!(resp.success); - assert_eq!(None, resp.conflict_opt); + assert!(resp.success()); + assert_eq!(None, resp.conflict); // check prepared store check_logs(&sto0, vec![0, 1, 1, 2, 2, 2]).await?; @@ -188,8 +177,8 @@ async fn append_conflicts() -> Result<()> { }; let resp = r0.append_entries(req).await?; - assert!(resp.success); - assert_eq!(None, resp.conflict_opt); + assert!(resp.success()); + assert_eq!(None, resp.conflict); check_logs(&sto0, vec![0, 1, 1, 2, 3]).await?; @@ -205,13 +194,8 @@ async fn append_conflicts() -> Result<()> { }; let resp = r0.append_entries(req).await?; - assert!(!resp.success); - assert_eq!( - Some(ConflictOpt { - log_id: LogId { term: 1, index: 200 } - }), - resp.conflict_opt - ); + assert!(!resp.success()); + assert_eq!(Some(LogId::new(1, 200)), resp.conflict); Ok(()) } diff --git a/async-raft/tests/append_updates_membership.rs b/async-raft/tests/append_updates_membership.rs index 01f6c6993..b482f6d51 100644 --- a/async-raft/tests/append_updates_membership.rs +++ b/async-raft/tests/append_updates_membership.rs @@ -71,8 +71,8 @@ async fn append_updates_membership() -> Result<()> { }; let resp = r0.append_entries(req.clone()).await?; - assert!(resp.success); - assert_eq!(None, resp.conflict_opt); + assert!(resp.success()); + assert_eq!(None, resp.conflict); r0.wait(timeout()).members(btreeset! {1,2,3,4}, "append-entries update membership").await?; } @@ -88,8 +88,8 @@ async fn append_updates_membership() -> Result<()> { }; let resp = r0.append_entries(req.clone()).await?; - assert!(resp.success); - assert_eq!(None, resp.conflict_opt); + assert!(resp.success()); + assert_eq!(None, resp.conflict); r0.wait(timeout()).members(btreeset! {1,2}, "deleting inconsistent lgos updates membership").await?; } diff --git a/async-raft/tests/compaction.rs b/async-raft/tests/compaction.rs index 301e58927..97d002248 100644 --- a/async-raft/tests/compaction.rs +++ b/async-raft/tests/compaction.rs @@ -138,8 +138,8 @@ async fn compaction() -> Result<()> { }) .await?; - assert!(res.success); - assert_eq!(None, res.conflict_opt); + assert!(res.success()); + assert_eq!(None, res.conflict); } Ok(()) diff --git a/async-raft/tests/conflict_with_empty_entries.rs b/async-raft/tests/conflict_with_empty_entries.rs index 1d07e0dd6..5ce04fcdf 100644 --- a/async-raft/tests/conflict_with_empty_entries.rs +++ b/async-raft/tests/conflict_with_empty_entries.rs @@ -2,7 +2,6 @@ use std::sync::Arc; use anyhow::Result; use async_raft::raft::AppendEntriesRequest; -use async_raft::raft::ConflictOpt; use async_raft::raft::Entry; use async_raft::raft::EntryPayload; use async_raft::Config; @@ -57,15 +56,10 @@ async fn conflict_with_empty_entries() -> Result<()> { }; let resp = router.send_append_entries(0, rpc).await?; - assert!(!resp.success); - assert!(resp.conflict_opt.is_some()); - let c = resp.conflict_opt.unwrap(); - assert_eq!( - ConflictOpt { - log_id: LogId { term: 1, index: 5 } - }, - c - ); + assert!(!resp.success()); + assert!(resp.conflict.is_some()); + let c = resp.conflict.unwrap(); + assert_eq!(LogId { term: 1, index: 5 }, c); // Feed 2 logs @@ -91,8 +85,8 @@ async fn conflict_with_empty_entries() -> Result<()> { }; let resp = router.send_append_entries(0, rpc).await?; - assert!(resp.success); - assert!(resp.conflict_opt.is_none()); + assert!(resp.success()); + assert!(resp.conflict.is_none()); // Expect a conflict with prev_log_index == 3 @@ -105,15 +99,10 @@ async fn conflict_with_empty_entries() -> Result<()> { }; let resp = router.send_append_entries(0, rpc).await?; - assert!(!resp.success); - assert!(resp.conflict_opt.is_some()); - let c = resp.conflict_opt.unwrap(); - assert_eq!( - ConflictOpt { - log_id: LogId { term: 1, index: 3 } - }, - c - ); + assert!(!resp.success()); + assert!(resp.conflict.is_some()); + let c = resp.conflict.unwrap(); + assert_eq!(LogId::new(1, 3), c); Ok(()) }