Skip to content

Commit

Permalink
Refactor: use ProgressEntry to store replication progress
Browse files Browse the repository at this point in the history
ReplicationCore stores the most recent matching log id and max possible
matching log index in a `ProgressEntry`, which contains progress
related information and operations.
  • Loading branch information
drmingdrmer committed Dec 8, 2022
1 parent 89c270d commit 0657d76
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 69 deletions.
12 changes: 6 additions & 6 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1290,7 +1290,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
}
}

RaftMsg::UpdateReplicationMatched {
RaftMsg::UpdateReplicationProgress {
target,
result,
session_id,
Expand Down Expand Up @@ -1328,7 +1328,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
async fn handle_update_matched(
&mut self,
target: C::NodeId,
result: Result<LogId<C::NodeId>, String>,
result: Result<ProgressEntry<C::NodeId>, String>,
) -> Result<(), StorageError<C::NodeId>> {
tracing::debug!(
target = display(target),
Expand All @@ -1346,17 +1346,17 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
}
}

let matched = match result {
Ok(matched) => matched,
let progress = match result {
Ok(p) => p,
Err(_err_str) => {
return Ok(());
}
};

self.engine.update_progress(target, Some(matched));
self.engine.update_progress(target, Some(progress.matching.unwrap()));
self.run_engine_commands::<Entry<C>>(&[]).await?;

self.update_replication_metrics(target, matched);
self.update_replication_metrics(target, progress.matching.unwrap());

Ok(())
}
Expand Down
21 changes: 19 additions & 2 deletions openraft/src/progress/entry.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use std::borrow::Borrow;
use std::fmt::Display;
use std::fmt::Formatter;

use crate::summary::MessageSummary;
use crate::LogId;
Expand Down Expand Up @@ -86,7 +88,7 @@ impl<NID: NodeId> ProgressEntry<NID> {

if let Some(s) = &mut self.searching {
debug_assert!(conflict < s.end);
debug_assert!(conflict >= s.mid);
debug_assert!(conflict + 1 >= s.mid, "conflict can only be the prev_log_index");

s.end = conflict;

Expand All @@ -100,7 +102,9 @@ impl<NID: NodeId> ProgressEntry<NID> {
}
}

/// Return the starting log index range(`[start,end)`) for the next AppendEntries.
/// Return the index range(`[start,end]`) of the first log in the next AppendEntries.
///
/// The returned range is left close and right close.
#[allow(dead_code)]
pub(crate) fn sending_start(&self) -> (u64, u64) {
match self.searching {
Expand Down Expand Up @@ -138,6 +142,19 @@ impl<NID: NodeId> Borrow<Option<LogId<NID>>> for ProgressEntry<NID> {
}
}

impl<NID: NodeId> Display for ProgressEntry<NID> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match &self.searching {
None => {
write!(f, "{}", self.matching.summary())
}
Some(s) => {
write!(f, "[{}, {}, {})", self.matching.summary(), s.mid, s.end)
}
}
}
}

#[cfg(test)]
mod tests {
use std::borrow::Borrow;
Expand Down
7 changes: 4 additions & 3 deletions openraft/src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use crate::membership::IntoNodes;
use crate::metrics::RaftMetrics;
use crate::metrics::Wait;
use crate::node::Node;
use crate::progress::entry::ProgressEntry;
use crate::replication::ReplicationSessionId;
use crate::storage::Snapshot;
use crate::AppData;
Expand Down Expand Up @@ -919,13 +920,13 @@ pub(crate) enum RaftMsg<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStor

/// Update the `matched` log id of a replication target.
/// Sent by a replication task `ReplicationCore`.
UpdateReplicationMatched {
UpdateReplicationProgress {
/// The ID of the target node for which the match index is to be updated.
target: C::NodeId,

/// Either the last log id that has been successfully replicated to the target,
/// or an error in string.
result: Result<LogId<C::NodeId>, String>,
result: Result<ProgressEntry<C::NodeId>, String>,

/// In which session this message is sent.
/// A replication session(vote,membership_log_id) should ignore message from other session.
Expand Down Expand Up @@ -1016,7 +1017,7 @@ where
RaftMsg::Tick { i } => {
format!("Tick {}", i)
}
RaftMsg::UpdateReplicationMatched {
RaftMsg::UpdateReplicationProgress {
ref target,
ref result,
ref session_id,
Expand Down
84 changes: 26 additions & 58 deletions openraft/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,17 +89,8 @@ pub(crate) struct ReplicationCore<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S
/// The log id of the highest log entry which is known to be committed in the cluster.
committed: Option<LogId<C::NodeId>>,

/// The last know log to be successfully replicated on the target.
///
/// This Raft implementation also uses a _conflict optimization_ pattern for reducing the
/// number of RPCs which need to be sent back and forth between a peer which is lagging
/// behind. This is defined in §5.3.
/// This will be initialized to the leader's (last_log_term, last_log_index), and will be updated as
/// replication proceeds.
matched: Option<LogId<C::NodeId>>,

/// The last possible matching entry on a follower.
max_possible_matched_index: Option<u64>,
/// Replication progress
progress: ProgressEntry<C::NodeId>,

/// if or not need to replicate log entries or states, e.g., `commit_index` etc.
need_to_replicate: bool,
Expand Down Expand Up @@ -139,8 +130,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
config,
target_repl_state: TargetReplState::LineRate,
committed,
matched: progress_entry.matching,
max_possible_matched_index: progress_entry.max_possible_matching(),
progress: progress_entry,
tx_raft_core,
rx_repl,
need_to_replicate: true,
Expand Down Expand Up @@ -221,11 +211,9 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
/// configured heartbeat interval.
#[tracing::instrument(level = "debug", skip(self))]
async fn send_append_entries(&mut self) -> Result<(), ReplicationError<C::NodeId, C::Node>> {
// find the mid position aligning to 8
let diff = self.max_possible_matched_index.next_index() - self.matched.next_index();
let offset = diff / 16 * 8;
let (start, _right) = self.progress.sending_start();

let mut prev_index = self.matched.index().add(offset);
let mut prev_index = if start == 0 { None } else { Some(start - 1) };

let (prev_log_id, logs, has_more_logs) = loop {
// TODO(xp): test heartbeat when all logs are removed.
Expand All @@ -245,8 +233,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
let end = std::cmp::min(start + self.config.max_payload_entries, last_log_index);

tracing::debug!(
?self.matched,
?self.max_possible_matched_index,
progress = display(&self.progress),
?last_purged,
?prev_index,
end,
Expand Down Expand Up @@ -321,15 +308,15 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
let repl_err = match err {
RPCError::NodeNotFound(e) => ReplicationError::NodeNotFound(e),
RPCError::Timeout(e) => {
let _ = self.tx_raft_core.send(RaftMsg::UpdateReplicationMatched {
let _ = self.tx_raft_core.send(RaftMsg::UpdateReplicationProgress {
target: self.target,
result: Err(e.to_string()),
session_id: self.session_id,
});
ReplicationError::Timeout(e)
}
RPCError::Network(e) => {
let _ = self.tx_raft_core.send(RaftMsg::UpdateReplicationMatched {
let _ = self.tx_raft_core.send(RaftMsg::UpdateReplicationProgress {
target: self.target,
result: Err(e.to_string()),
session_id: self.session_id,
Expand All @@ -344,7 +331,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
Err(timeout_err) => {
tracing::warn!(error=%timeout_err, "timeout while sending AppendEntries RPC to target");

let _ = self.tx_raft_core.send(RaftMsg::UpdateReplicationMatched {
let _ = self.tx_raft_core.send(RaftMsg::UpdateReplicationProgress {
target: self.target,
result: Err(timeout_err.to_string()),
session_id: self.session_id,
Expand Down Expand Up @@ -385,13 +372,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
AppendEntriesResponse::Conflict => {
debug_assert!(conflict.is_some(), "prev_log_id=None never conflict");
let conflict = conflict.unwrap();

// Continue to find the matching log id on follower.
self.max_possible_matched_index = if conflict.index == 0 {
None
} else {
Some(conflict.index - 1)
};
self.progress.update_conflicting(conflict.index);

Ok(())
}
Expand All @@ -401,11 +382,11 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
/// max_possible_matched_index is the least index for `prev_log_id` to form a consecutive log sequence
#[tracing::instrument(level = "trace", skip_all)]
fn check_consecutive(&self, last_purged: Option<LogId<C::NodeId>>) -> Result<(), LackEntry<C::NodeId>> {
tracing::debug!(?last_purged, ?self.max_possible_matched_index, "check_consecutive");
tracing::debug!(?last_purged, progress = display(&self.progress), "check_consecutive");

if last_purged.index() > self.max_possible_matched_index {
if last_purged.index() > self.progress.max_possible_matching() {
return Err(LackEntry {
index: self.max_possible_matched_index,
index: self.progress.max_possible_matching(),
last_purged_log_id: last_purged,
});
}
Expand All @@ -424,26 +405,17 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
/// And also report the matched log id to RaftCore to commit an entry etc.
#[tracing::instrument(level = "trace", skip(self))]
fn update_matched(&mut self, new_matched: Option<LogId<C::NodeId>>) {
tracing::debug!(
self.max_possible_matched_index,
?self.matched,
?new_matched, "update_matched");

if self.max_possible_matched_index < new_matched.index() {
self.max_possible_matched_index = new_matched.index();
}
tracing::debug!(progress = display(&self.progress), ?new_matched, "update_matched");

if self.matched < new_matched {
self.matched = new_matched;
if self.progress.matching < new_matched {
self.progress.update_matching(new_matched);

tracing::debug!(target=%self.target, matched=?self.matched, "matched updated");
tracing::debug!(target=%self.target, progress=display(&self.progress), "matched updated");

let _ = self.tx_raft_core.send(RaftMsg::UpdateReplicationMatched {
target: self.target,
// `self.matched < new_matched` implies new_matched can not be None.
// Thus unwrap is safe.
result: Ok(self.matched.unwrap()),
let _ = self.tx_raft_core.send(RaftMsg::UpdateReplicationProgress {
session_id: self.session_id,
target: self.target,
result: Ok(self.progress),
});
}
}
Expand All @@ -453,7 +425,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
#[tracing::instrument(level = "trace", skip(self))]
pub(self) fn needs_snapshot(&self) -> bool {
let c = self.committed.next_index();
let m = self.matched.next_index();
let m = self.progress.matching.next_index();
let distance = c.saturating_sub(m);

#[allow(clippy::infallible_destructuring_match)]
Expand Down Expand Up @@ -514,7 +486,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
}
}
Replicate::Entries(last) => {
if last.index() > self.matched.index() {
if last.index() > self.progress.matching.index() {
self.need_to_replicate = true;
}
}
Expand Down Expand Up @@ -563,11 +535,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
pub async fn line_rate_loop(&mut self) -> Result<(), ReplicationError<C::NodeId, C::Node>> {
loop {
loop {
tracing::debug!(
"current matched: {:?} max_possible_matched_index: {:?}",
self.matched,
self.max_possible_matched_index
);
tracing::debug!("progress: {}", self.progress);

let res = self.send_append_entries().await;
tracing::debug!(target = display(self.target), res = debug(&res), "replication res",);
Expand All @@ -589,7 +557,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
}
}

if self.matched.index() == self.max_possible_matched_index {
if self.progress.searching.is_none() {
break;
}
}
Expand Down Expand Up @@ -741,9 +709,9 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
// If we just sent the final chunk of the snapshot, then transition to lagging state.
if done {
tracing::debug!(
"done install snapshot: snapshot last_log_id: {:?}, matched: {:?}",
"done install snapshot: snapshot last_log_id: {:?}, progress: {}",
snapshot.meta.last_log_id,
self.matched,
self.progress,
);

self.update_matched(snapshot.meta.last_log_id);
Expand Down

0 comments on commit 0657d76

Please sign in to comment.