Skip to content

Commit

Permalink
Merge pull request #614 from drmingdrmer/40-repl-main
Browse files Browse the repository at this point in the history
Refactor: use ProgressEntry to store replication progress
  • Loading branch information
mergify[bot] committed Dec 9, 2022
2 parents 89c270d + 0657d76 commit bd52600
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 69 deletions.
12 changes: 6 additions & 6 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1290,7 +1290,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
}
}

RaftMsg::UpdateReplicationMatched {
RaftMsg::UpdateReplicationProgress {
target,
result,
session_id,
Expand Down Expand Up @@ -1328,7 +1328,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
async fn handle_update_matched(
&mut self,
target: C::NodeId,
result: Result<LogId<C::NodeId>, String>,
result: Result<ProgressEntry<C::NodeId>, String>,
) -> Result<(), StorageError<C::NodeId>> {
tracing::debug!(
target = display(target),
Expand All @@ -1346,17 +1346,17 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
}
}

let matched = match result {
Ok(matched) => matched,
let progress = match result {
Ok(p) => p,
Err(_err_str) => {
return Ok(());
}
};

self.engine.update_progress(target, Some(matched));
self.engine.update_progress(target, Some(progress.matching.unwrap()));
self.run_engine_commands::<Entry<C>>(&[]).await?;

self.update_replication_metrics(target, matched);
self.update_replication_metrics(target, progress.matching.unwrap());

Ok(())
}
Expand Down
21 changes: 19 additions & 2 deletions openraft/src/progress/entry.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use std::borrow::Borrow;
use std::fmt::Display;
use std::fmt::Formatter;

use crate::summary::MessageSummary;
use crate::LogId;
Expand Down Expand Up @@ -86,7 +88,7 @@ impl<NID: NodeId> ProgressEntry<NID> {

if let Some(s) = &mut self.searching {
debug_assert!(conflict < s.end);
debug_assert!(conflict >= s.mid);
debug_assert!(conflict + 1 >= s.mid, "conflict can only be the prev_log_index");

s.end = conflict;

Expand All @@ -100,7 +102,9 @@ impl<NID: NodeId> ProgressEntry<NID> {
}
}

/// Return the starting log index range(`[start,end)`) for the next AppendEntries.
/// Return the index range(`[start,end]`) of the first log in the next AppendEntries.
///
/// The returned range is left close and right close.
#[allow(dead_code)]
pub(crate) fn sending_start(&self) -> (u64, u64) {
match self.searching {
Expand Down Expand Up @@ -138,6 +142,19 @@ impl<NID: NodeId> Borrow<Option<LogId<NID>>> for ProgressEntry<NID> {
}
}

impl<NID: NodeId> Display for ProgressEntry<NID> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match &self.searching {
None => {
write!(f, "{}", self.matching.summary())
}
Some(s) => {
write!(f, "[{}, {}, {})", self.matching.summary(), s.mid, s.end)
}
}
}
}

#[cfg(test)]
mod tests {
use std::borrow::Borrow;
Expand Down
7 changes: 4 additions & 3 deletions openraft/src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use crate::membership::IntoNodes;
use crate::metrics::RaftMetrics;
use crate::metrics::Wait;
use crate::node::Node;
use crate::progress::entry::ProgressEntry;
use crate::replication::ReplicationSessionId;
use crate::storage::Snapshot;
use crate::AppData;
Expand Down Expand Up @@ -919,13 +920,13 @@ pub(crate) enum RaftMsg<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStor

/// Update the `matched` log id of a replication target.
/// Sent by a replication task `ReplicationCore`.
UpdateReplicationMatched {
UpdateReplicationProgress {
/// The ID of the target node for which the match index is to be updated.
target: C::NodeId,

/// Either the last log id that has been successfully replicated to the target,
/// or an error in string.
result: Result<LogId<C::NodeId>, String>,
result: Result<ProgressEntry<C::NodeId>, String>,

/// In which session this message is sent.
/// A replication session(vote,membership_log_id) should ignore message from other session.
Expand Down Expand Up @@ -1016,7 +1017,7 @@ where
RaftMsg::Tick { i } => {
format!("Tick {}", i)
}
RaftMsg::UpdateReplicationMatched {
RaftMsg::UpdateReplicationProgress {
ref target,
ref result,
ref session_id,
Expand Down
84 changes: 26 additions & 58 deletions openraft/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,17 +89,8 @@ pub(crate) struct ReplicationCore<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S
/// The log id of the highest log entry which is known to be committed in the cluster.
committed: Option<LogId<C::NodeId>>,

/// The last know log to be successfully replicated on the target.
///
/// This Raft implementation also uses a _conflict optimization_ pattern for reducing the
/// number of RPCs which need to be sent back and forth between a peer which is lagging
/// behind. This is defined in §5.3.
/// This will be initialized to the leader's (last_log_term, last_log_index), and will be updated as
/// replication proceeds.
matched: Option<LogId<C::NodeId>>,

/// The last possible matching entry on a follower.
max_possible_matched_index: Option<u64>,
/// Replication progress
progress: ProgressEntry<C::NodeId>,

/// if or not need to replicate log entries or states, e.g., `commit_index` etc.
need_to_replicate: bool,
Expand Down Expand Up @@ -139,8 +130,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
config,
target_repl_state: TargetReplState::LineRate,
committed,
matched: progress_entry.matching,
max_possible_matched_index: progress_entry.max_possible_matching(),
progress: progress_entry,
tx_raft_core,
rx_repl,
need_to_replicate: true,
Expand Down Expand Up @@ -221,11 +211,9 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
/// configured heartbeat interval.
#[tracing::instrument(level = "debug", skip(self))]
async fn send_append_entries(&mut self) -> Result<(), ReplicationError<C::NodeId, C::Node>> {
// find the mid position aligning to 8
let diff = self.max_possible_matched_index.next_index() - self.matched.next_index();
let offset = diff / 16 * 8;
let (start, _right) = self.progress.sending_start();

let mut prev_index = self.matched.index().add(offset);
let mut prev_index = if start == 0 { None } else { Some(start - 1) };

let (prev_log_id, logs, has_more_logs) = loop {
// TODO(xp): test heartbeat when all logs are removed.
Expand All @@ -245,8 +233,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
let end = std::cmp::min(start + self.config.max_payload_entries, last_log_index);

tracing::debug!(
?self.matched,
?self.max_possible_matched_index,
progress = display(&self.progress),
?last_purged,
?prev_index,
end,
Expand Down Expand Up @@ -321,15 +308,15 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
let repl_err = match err {
RPCError::NodeNotFound(e) => ReplicationError::NodeNotFound(e),
RPCError::Timeout(e) => {
let _ = self.tx_raft_core.send(RaftMsg::UpdateReplicationMatched {
let _ = self.tx_raft_core.send(RaftMsg::UpdateReplicationProgress {
target: self.target,
result: Err(e.to_string()),
session_id: self.session_id,
});
ReplicationError::Timeout(e)
}
RPCError::Network(e) => {
let _ = self.tx_raft_core.send(RaftMsg::UpdateReplicationMatched {
let _ = self.tx_raft_core.send(RaftMsg::UpdateReplicationProgress {
target: self.target,
result: Err(e.to_string()),
session_id: self.session_id,
Expand All @@ -344,7 +331,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
Err(timeout_err) => {
tracing::warn!(error=%timeout_err, "timeout while sending AppendEntries RPC to target");

let _ = self.tx_raft_core.send(RaftMsg::UpdateReplicationMatched {
let _ = self.tx_raft_core.send(RaftMsg::UpdateReplicationProgress {
target: self.target,
result: Err(timeout_err.to_string()),
session_id: self.session_id,
Expand Down Expand Up @@ -385,13 +372,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
AppendEntriesResponse::Conflict => {
debug_assert!(conflict.is_some(), "prev_log_id=None never conflict");
let conflict = conflict.unwrap();

// Continue to find the matching log id on follower.
self.max_possible_matched_index = if conflict.index == 0 {
None
} else {
Some(conflict.index - 1)
};
self.progress.update_conflicting(conflict.index);

Ok(())
}
Expand All @@ -401,11 +382,11 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
/// max_possible_matched_index is the least index for `prev_log_id` to form a consecutive log sequence
#[tracing::instrument(level = "trace", skip_all)]
fn check_consecutive(&self, last_purged: Option<LogId<C::NodeId>>) -> Result<(), LackEntry<C::NodeId>> {
tracing::debug!(?last_purged, ?self.max_possible_matched_index, "check_consecutive");
tracing::debug!(?last_purged, progress = display(&self.progress), "check_consecutive");

if last_purged.index() > self.max_possible_matched_index {
if last_purged.index() > self.progress.max_possible_matching() {
return Err(LackEntry {
index: self.max_possible_matched_index,
index: self.progress.max_possible_matching(),
last_purged_log_id: last_purged,
});
}
Expand All @@ -424,26 +405,17 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
/// And also report the matched log id to RaftCore to commit an entry etc.
#[tracing::instrument(level = "trace", skip(self))]
fn update_matched(&mut self, new_matched: Option<LogId<C::NodeId>>) {
tracing::debug!(
self.max_possible_matched_index,
?self.matched,
?new_matched, "update_matched");

if self.max_possible_matched_index < new_matched.index() {
self.max_possible_matched_index = new_matched.index();
}
tracing::debug!(progress = display(&self.progress), ?new_matched, "update_matched");

if self.matched < new_matched {
self.matched = new_matched;
if self.progress.matching < new_matched {
self.progress.update_matching(new_matched);

tracing::debug!(target=%self.target, matched=?self.matched, "matched updated");
tracing::debug!(target=%self.target, progress=display(&self.progress), "matched updated");

let _ = self.tx_raft_core.send(RaftMsg::UpdateReplicationMatched {
target: self.target,
// `self.matched < new_matched` implies new_matched can not be None.
// Thus unwrap is safe.
result: Ok(self.matched.unwrap()),
let _ = self.tx_raft_core.send(RaftMsg::UpdateReplicationProgress {
session_id: self.session_id,
target: self.target,
result: Ok(self.progress),
});
}
}
Expand All @@ -453,7 +425,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
#[tracing::instrument(level = "trace", skip(self))]
pub(self) fn needs_snapshot(&self) -> bool {
let c = self.committed.next_index();
let m = self.matched.next_index();
let m = self.progress.matching.next_index();
let distance = c.saturating_sub(m);

#[allow(clippy::infallible_destructuring_match)]
Expand Down Expand Up @@ -514,7 +486,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
}
}
Replicate::Entries(last) => {
if last.index() > self.matched.index() {
if last.index() > self.progress.matching.index() {
self.need_to_replicate = true;
}
}
Expand Down Expand Up @@ -563,11 +535,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
pub async fn line_rate_loop(&mut self) -> Result<(), ReplicationError<C::NodeId, C::Node>> {
loop {
loop {
tracing::debug!(
"current matched: {:?} max_possible_matched_index: {:?}",
self.matched,
self.max_possible_matched_index
);
tracing::debug!("progress: {}", self.progress);

let res = self.send_append_entries().await;
tracing::debug!(target = display(self.target), res = debug(&res), "replication res",);
Expand All @@ -589,7 +557,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
}
}

if self.matched.index() == self.max_possible_matched_index {
if self.progress.searching.is_none() {
break;
}
}
Expand Down Expand Up @@ -741,9 +709,9 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
// If we just sent the final chunk of the snapshot, then transition to lagging state.
if done {
tracing::debug!(
"done install snapshot: snapshot last_log_id: {:?}, matched: {:?}",
"done install snapshot: snapshot last_log_id: {:?}, progress: {}",
snapshot.meta.last_log_id,
self.matched,
self.progress,
);

self.update_matched(snapshot.meta.last_log_id);
Expand Down

0 comments on commit bd52600

Please sign in to comment.