diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index dc744abfc..aec5ac483 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -1290,7 +1290,7 @@ impl, S: RaftStorage> RaftCore, S: RaftStorage> RaftCore, String>, + result: Result, String>, ) -> Result<(), StorageError> { tracing::debug!( target = display(target), @@ -1346,17 +1346,17 @@ impl, S: RaftStorage> RaftCore matched, + let progress = match result { + Ok(p) => p, Err(_err_str) => { return Ok(()); } }; - self.engine.update_progress(target, Some(matched)); + self.engine.update_progress(target, Some(progress.matching.unwrap())); self.run_engine_commands::>(&[]).await?; - self.update_replication_metrics(target, matched); + self.update_replication_metrics(target, progress.matching.unwrap()); Ok(()) } diff --git a/openraft/src/progress/entry.rs b/openraft/src/progress/entry.rs index 193bd8df1..b15466cbd 100644 --- a/openraft/src/progress/entry.rs +++ b/openraft/src/progress/entry.rs @@ -1,4 +1,6 @@ use std::borrow::Borrow; +use std::fmt::Display; +use std::fmt::Formatter; use crate::summary::MessageSummary; use crate::LogId; @@ -86,7 +88,7 @@ impl ProgressEntry { if let Some(s) = &mut self.searching { debug_assert!(conflict < s.end); - debug_assert!(conflict >= s.mid); + debug_assert!(conflict + 1 >= s.mid, "conflict can only be the prev_log_index"); s.end = conflict; @@ -100,7 +102,9 @@ impl ProgressEntry { } } - /// Return the starting log index range(`[start,end)`) for the next AppendEntries. + /// Return the index range(`[start,end]`) of the first log in the next AppendEntries. + /// + /// The returned range is left close and right close. #[allow(dead_code)] pub(crate) fn sending_start(&self) -> (u64, u64) { match self.searching { @@ -138,6 +142,19 @@ impl Borrow>> for ProgressEntry { } } +impl Display for ProgressEntry { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match &self.searching { + None => { + write!(f, "{}", self.matching.summary()) + } + Some(s) => { + write!(f, "[{}, {}, {})", self.matching.summary(), s.mid, s.end) + } + } + } +} + #[cfg(test)] mod tests { use std::borrow::Borrow; diff --git a/openraft/src/raft.rs b/openraft/src/raft.rs index 5a66f8ef3..98da472b3 100644 --- a/openraft/src/raft.rs +++ b/openraft/src/raft.rs @@ -41,6 +41,7 @@ use crate::membership::IntoNodes; use crate::metrics::RaftMetrics; use crate::metrics::Wait; use crate::node::Node; +use crate::progress::entry::ProgressEntry; use crate::replication::ReplicationSessionId; use crate::storage::Snapshot; use crate::AppData; @@ -919,13 +920,13 @@ pub(crate) enum RaftMsg, S: RaftStor /// Update the `matched` log id of a replication target. /// Sent by a replication task `ReplicationCore`. - UpdateReplicationMatched { + UpdateReplicationProgress { /// The ID of the target node for which the match index is to be updated. target: C::NodeId, /// Either the last log id that has been successfully replicated to the target, /// or an error in string. - result: Result, String>, + result: Result, String>, /// In which session this message is sent. /// A replication session(vote,membership_log_id) should ignore message from other session. @@ -1016,7 +1017,7 @@ where RaftMsg::Tick { i } => { format!("Tick {}", i) } - RaftMsg::UpdateReplicationMatched { + RaftMsg::UpdateReplicationProgress { ref target, ref result, ref session_id, diff --git a/openraft/src/replication/mod.rs b/openraft/src/replication/mod.rs index be1ba34b3..ec7568929 100644 --- a/openraft/src/replication/mod.rs +++ b/openraft/src/replication/mod.rs @@ -89,17 +89,8 @@ pub(crate) struct ReplicationCore, S /// The log id of the highest log entry which is known to be committed in the cluster. committed: Option>, - /// The last know log to be successfully replicated on the target. - /// - /// This Raft implementation also uses a _conflict optimization_ pattern for reducing the - /// number of RPCs which need to be sent back and forth between a peer which is lagging - /// behind. This is defined in ยง5.3. - /// This will be initialized to the leader's (last_log_term, last_log_index), and will be updated as - /// replication proceeds. - matched: Option>, - - /// The last possible matching entry on a follower. - max_possible_matched_index: Option, + /// Replication progress + progress: ProgressEntry, /// if or not need to replicate log entries or states, e.g., `commit_index` etc. need_to_replicate: bool, @@ -139,8 +130,7 @@ impl, S: RaftStorage> Replication config, target_repl_state: TargetReplState::LineRate, committed, - matched: progress_entry.matching, - max_possible_matched_index: progress_entry.max_possible_matching(), + progress: progress_entry, tx_raft_core, rx_repl, need_to_replicate: true, @@ -221,11 +211,9 @@ impl, S: RaftStorage> Replication /// configured heartbeat interval. #[tracing::instrument(level = "debug", skip(self))] async fn send_append_entries(&mut self) -> Result<(), ReplicationError> { - // find the mid position aligning to 8 - let diff = self.max_possible_matched_index.next_index() - self.matched.next_index(); - let offset = diff / 16 * 8; + let (start, _right) = self.progress.sending_start(); - let mut prev_index = self.matched.index().add(offset); + let mut prev_index = if start == 0 { None } else { Some(start - 1) }; let (prev_log_id, logs, has_more_logs) = loop { // TODO(xp): test heartbeat when all logs are removed. @@ -245,8 +233,7 @@ impl, S: RaftStorage> Replication let end = std::cmp::min(start + self.config.max_payload_entries, last_log_index); tracing::debug!( - ?self.matched, - ?self.max_possible_matched_index, + progress = display(&self.progress), ?last_purged, ?prev_index, end, @@ -321,7 +308,7 @@ impl, S: RaftStorage> Replication let repl_err = match err { RPCError::NodeNotFound(e) => ReplicationError::NodeNotFound(e), RPCError::Timeout(e) => { - let _ = self.tx_raft_core.send(RaftMsg::UpdateReplicationMatched { + let _ = self.tx_raft_core.send(RaftMsg::UpdateReplicationProgress { target: self.target, result: Err(e.to_string()), session_id: self.session_id, @@ -329,7 +316,7 @@ impl, S: RaftStorage> Replication ReplicationError::Timeout(e) } RPCError::Network(e) => { - let _ = self.tx_raft_core.send(RaftMsg::UpdateReplicationMatched { + let _ = self.tx_raft_core.send(RaftMsg::UpdateReplicationProgress { target: self.target, result: Err(e.to_string()), session_id: self.session_id, @@ -344,7 +331,7 @@ impl, S: RaftStorage> Replication Err(timeout_err) => { tracing::warn!(error=%timeout_err, "timeout while sending AppendEntries RPC to target"); - let _ = self.tx_raft_core.send(RaftMsg::UpdateReplicationMatched { + let _ = self.tx_raft_core.send(RaftMsg::UpdateReplicationProgress { target: self.target, result: Err(timeout_err.to_string()), session_id: self.session_id, @@ -385,13 +372,7 @@ impl, S: RaftStorage> Replication AppendEntriesResponse::Conflict => { debug_assert!(conflict.is_some(), "prev_log_id=None never conflict"); let conflict = conflict.unwrap(); - - // Continue to find the matching log id on follower. - self.max_possible_matched_index = if conflict.index == 0 { - None - } else { - Some(conflict.index - 1) - }; + self.progress.update_conflicting(conflict.index); Ok(()) } @@ -401,11 +382,11 @@ impl, S: RaftStorage> Replication /// max_possible_matched_index is the least index for `prev_log_id` to form a consecutive log sequence #[tracing::instrument(level = "trace", skip_all)] fn check_consecutive(&self, last_purged: Option>) -> Result<(), LackEntry> { - tracing::debug!(?last_purged, ?self.max_possible_matched_index, "check_consecutive"); + tracing::debug!(?last_purged, progress = display(&self.progress), "check_consecutive"); - if last_purged.index() > self.max_possible_matched_index { + if last_purged.index() > self.progress.max_possible_matching() { return Err(LackEntry { - index: self.max_possible_matched_index, + index: self.progress.max_possible_matching(), last_purged_log_id: last_purged, }); } @@ -424,26 +405,17 @@ impl, S: RaftStorage> Replication /// And also report the matched log id to RaftCore to commit an entry etc. #[tracing::instrument(level = "trace", skip(self))] fn update_matched(&mut self, new_matched: Option>) { - tracing::debug!( - self.max_possible_matched_index, - ?self.matched, - ?new_matched, "update_matched"); - - if self.max_possible_matched_index < new_matched.index() { - self.max_possible_matched_index = new_matched.index(); - } + tracing::debug!(progress = display(&self.progress), ?new_matched, "update_matched"); - if self.matched < new_matched { - self.matched = new_matched; + if self.progress.matching < new_matched { + self.progress.update_matching(new_matched); - tracing::debug!(target=%self.target, matched=?self.matched, "matched updated"); + tracing::debug!(target=%self.target, progress=display(&self.progress), "matched updated"); - let _ = self.tx_raft_core.send(RaftMsg::UpdateReplicationMatched { - target: self.target, - // `self.matched < new_matched` implies new_matched can not be None. - // Thus unwrap is safe. - result: Ok(self.matched.unwrap()), + let _ = self.tx_raft_core.send(RaftMsg::UpdateReplicationProgress { session_id: self.session_id, + target: self.target, + result: Ok(self.progress), }); } } @@ -453,7 +425,7 @@ impl, S: RaftStorage> Replication #[tracing::instrument(level = "trace", skip(self))] pub(self) fn needs_snapshot(&self) -> bool { let c = self.committed.next_index(); - let m = self.matched.next_index(); + let m = self.progress.matching.next_index(); let distance = c.saturating_sub(m); #[allow(clippy::infallible_destructuring_match)] @@ -514,7 +486,7 @@ impl, S: RaftStorage> Replication } } Replicate::Entries(last) => { - if last.index() > self.matched.index() { + if last.index() > self.progress.matching.index() { self.need_to_replicate = true; } } @@ -563,11 +535,7 @@ impl, S: RaftStorage> Replication pub async fn line_rate_loop(&mut self) -> Result<(), ReplicationError> { loop { loop { - tracing::debug!( - "current matched: {:?} max_possible_matched_index: {:?}", - self.matched, - self.max_possible_matched_index - ); + tracing::debug!("progress: {}", self.progress); let res = self.send_append_entries().await; tracing::debug!(target = display(self.target), res = debug(&res), "replication res",); @@ -589,7 +557,7 @@ impl, S: RaftStorage> Replication } } - if self.matched.index() == self.max_possible_matched_index { + if self.progress.searching.is_none() { break; } } @@ -741,9 +709,9 @@ impl, S: RaftStorage> Replication // If we just sent the final chunk of the snapshot, then transition to lagging state. if done { tracing::debug!( - "done install snapshot: snapshot last_log_id: {:?}, matched: {:?}", + "done install snapshot: snapshot last_log_id: {:?}, progress: {}", snapshot.meta.last_log_id, - self.matched, + self.progress, ); self.update_matched(snapshot.meta.last_log_id);