Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor: use ProgressEntry to store replication progress #614

Merged
merged 1 commit into from
Dec 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1290,7 +1290,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
}
}

RaftMsg::UpdateReplicationMatched {
RaftMsg::UpdateReplicationProgress {
target,
result,
session_id,
Expand Down Expand Up @@ -1328,7 +1328,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
async fn handle_update_matched(
&mut self,
target: C::NodeId,
result: Result<LogId<C::NodeId>, String>,
result: Result<ProgressEntry<C::NodeId>, String>,
) -> Result<(), StorageError<C::NodeId>> {
tracing::debug!(
target = display(target),
Expand All @@ -1346,17 +1346,17 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
}
}

let matched = match result {
Ok(matched) => matched,
let progress = match result {
Ok(p) => p,
Err(_err_str) => {
return Ok(());
}
};

self.engine.update_progress(target, Some(matched));
self.engine.update_progress(target, Some(progress.matching.unwrap()));
self.run_engine_commands::<Entry<C>>(&[]).await?;

self.update_replication_metrics(target, matched);
self.update_replication_metrics(target, progress.matching.unwrap());

Ok(())
}
Expand Down
21 changes: 19 additions & 2 deletions openraft/src/progress/entry.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use std::borrow::Borrow;
use std::fmt::Display;
use std::fmt::Formatter;

use crate::summary::MessageSummary;
use crate::LogId;
Expand Down Expand Up @@ -86,7 +88,7 @@ impl<NID: NodeId> ProgressEntry<NID> {

if let Some(s) = &mut self.searching {
debug_assert!(conflict < s.end);
debug_assert!(conflict >= s.mid);
debug_assert!(conflict + 1 >= s.mid, "conflict can only be the prev_log_index");

s.end = conflict;

Expand All @@ -100,7 +102,9 @@ impl<NID: NodeId> ProgressEntry<NID> {
}
}

/// Return the starting log index range(`[start,end)`) for the next AppendEntries.
/// Return the index range(`[start,end]`) of the first log in the next AppendEntries.
///
/// The returned range is closed on both ends.
#[allow(dead_code)]
pub(crate) fn sending_start(&self) -> (u64, u64) {
match self.searching {
Expand Down Expand Up @@ -138,6 +142,19 @@ impl<NID: NodeId> Borrow<Option<LogId<NID>>> for ProgressEntry<NID> {
}
}

/// Human-readable rendering of a replication progress entry, intended for log output.
///
/// When no search is in flight, only the summary of `matching` is written.
/// Otherwise the output has the form `[matching, mid, end)`, where `mid` and `end`
/// come from the in-progress search state.
impl<NID: NodeId> Display for ProgressEntry<NID> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Both output forms start from the summary of the matching log id.
        let matching = self.matching.summary();

        if let Some(searching) = &self.searching {
            write!(f, "[{}, {}, {})", matching, searching.mid, searching.end)
        } else {
            write!(f, "{}", matching)
        }
    }
}

#[cfg(test)]
mod tests {
use std::borrow::Borrow;
Expand Down
7 changes: 4 additions & 3 deletions openraft/src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use crate::membership::IntoNodes;
use crate::metrics::RaftMetrics;
use crate::metrics::Wait;
use crate::node::Node;
use crate::progress::entry::ProgressEntry;
use crate::replication::ReplicationSessionId;
use crate::storage::Snapshot;
use crate::AppData;
Expand Down Expand Up @@ -919,13 +920,13 @@ pub(crate) enum RaftMsg<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStor

/// Update the `matched` log id of a replication target.
/// Sent by a replication task `ReplicationCore`.
UpdateReplicationMatched {
UpdateReplicationProgress {
/// The ID of the target node for which the match index is to be updated.
target: C::NodeId,

/// Either the last log id that has been successfully replicated to the target,
/// or an error in string.
result: Result<LogId<C::NodeId>, String>,
result: Result<ProgressEntry<C::NodeId>, String>,

/// In which session this message is sent.
/// A replication session (vote, membership_log_id) should ignore messages from other sessions.
Expand Down Expand Up @@ -1016,7 +1017,7 @@ where
RaftMsg::Tick { i } => {
format!("Tick {}", i)
}
RaftMsg::UpdateReplicationMatched {
RaftMsg::UpdateReplicationProgress {
ref target,
ref result,
ref session_id,
Expand Down
84 changes: 26 additions & 58 deletions openraft/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,17 +89,8 @@ pub(crate) struct ReplicationCore<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S
/// The log id of the highest log entry which is known to be committed in the cluster.
committed: Option<LogId<C::NodeId>>,

/// The last known log to be successfully replicated on the target.
///
/// This Raft implementation also uses a _conflict optimization_ pattern for reducing the
/// number of RPCs which need to be sent back and forth between a peer which is lagging
/// behind. This is defined in §5.3.
/// This will be initialized to the leader's (last_log_term, last_log_index), and will be updated as
/// replication proceeds.
matched: Option<LogId<C::NodeId>>,

/// The last possible matching entry on a follower.
max_possible_matched_index: Option<u64>,
/// Replication progress
progress: ProgressEntry<C::NodeId>,

/// Whether log entries or states (e.g., `commit_index`) need to be replicated.
need_to_replicate: bool,
Expand Down Expand Up @@ -139,8 +130,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
config,
target_repl_state: TargetReplState::LineRate,
committed,
matched: progress_entry.matching,
max_possible_matched_index: progress_entry.max_possible_matching(),
progress: progress_entry,
tx_raft_core,
rx_repl,
need_to_replicate: true,
Expand Down Expand Up @@ -221,11 +211,9 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
/// configured heartbeat interval.
#[tracing::instrument(level = "debug", skip(self))]
async fn send_append_entries(&mut self) -> Result<(), ReplicationError<C::NodeId, C::Node>> {
// find the mid position aligning to 8
let diff = self.max_possible_matched_index.next_index() - self.matched.next_index();
let offset = diff / 16 * 8;
let (start, _right) = self.progress.sending_start();

let mut prev_index = self.matched.index().add(offset);
let mut prev_index = if start == 0 { None } else { Some(start - 1) };

let (prev_log_id, logs, has_more_logs) = loop {
// TODO(xp): test heartbeat when all logs are removed.
Expand All @@ -245,8 +233,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
let end = std::cmp::min(start + self.config.max_payload_entries, last_log_index);

tracing::debug!(
?self.matched,
?self.max_possible_matched_index,
progress = display(&self.progress),
?last_purged,
?prev_index,
end,
Expand Down Expand Up @@ -321,15 +308,15 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
let repl_err = match err {
RPCError::NodeNotFound(e) => ReplicationError::NodeNotFound(e),
RPCError::Timeout(e) => {
let _ = self.tx_raft_core.send(RaftMsg::UpdateReplicationMatched {
let _ = self.tx_raft_core.send(RaftMsg::UpdateReplicationProgress {
target: self.target,
result: Err(e.to_string()),
session_id: self.session_id,
});
ReplicationError::Timeout(e)
}
RPCError::Network(e) => {
let _ = self.tx_raft_core.send(RaftMsg::UpdateReplicationMatched {
let _ = self.tx_raft_core.send(RaftMsg::UpdateReplicationProgress {
target: self.target,
result: Err(e.to_string()),
session_id: self.session_id,
Expand All @@ -344,7 +331,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
Err(timeout_err) => {
tracing::warn!(error=%timeout_err, "timeout while sending AppendEntries RPC to target");

let _ = self.tx_raft_core.send(RaftMsg::UpdateReplicationMatched {
let _ = self.tx_raft_core.send(RaftMsg::UpdateReplicationProgress {
target: self.target,
result: Err(timeout_err.to_string()),
session_id: self.session_id,
Expand Down Expand Up @@ -385,13 +372,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
AppendEntriesResponse::Conflict => {
debug_assert!(conflict.is_some(), "prev_log_id=None never conflict");
let conflict = conflict.unwrap();

// Continue to find the matching log id on follower.
self.max_possible_matched_index = if conflict.index == 0 {
None
} else {
Some(conflict.index - 1)
};
self.progress.update_conflicting(conflict.index);

Ok(())
}
Expand All @@ -401,11 +382,11 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
/// max_possible_matched_index is the least index for `prev_log_id` to form a consecutive log sequence
#[tracing::instrument(level = "trace", skip_all)]
fn check_consecutive(&self, last_purged: Option<LogId<C::NodeId>>) -> Result<(), LackEntry<C::NodeId>> {
tracing::debug!(?last_purged, ?self.max_possible_matched_index, "check_consecutive");
tracing::debug!(?last_purged, progress = display(&self.progress), "check_consecutive");

if last_purged.index() > self.max_possible_matched_index {
if last_purged.index() > self.progress.max_possible_matching() {
return Err(LackEntry {
index: self.max_possible_matched_index,
index: self.progress.max_possible_matching(),
last_purged_log_id: last_purged,
});
}
Expand All @@ -424,26 +405,17 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
/// And also report the matched log id to RaftCore to commit an entry etc.
#[tracing::instrument(level = "trace", skip(self))]
fn update_matched(&mut self, new_matched: Option<LogId<C::NodeId>>) {
tracing::debug!(
self.max_possible_matched_index,
?self.matched,
?new_matched, "update_matched");

if self.max_possible_matched_index < new_matched.index() {
self.max_possible_matched_index = new_matched.index();
}
tracing::debug!(progress = display(&self.progress), ?new_matched, "update_matched");

if self.matched < new_matched {
self.matched = new_matched;
if self.progress.matching < new_matched {
self.progress.update_matching(new_matched);

tracing::debug!(target=%self.target, matched=?self.matched, "matched updated");
tracing::debug!(target=%self.target, progress=display(&self.progress), "matched updated");

let _ = self.tx_raft_core.send(RaftMsg::UpdateReplicationMatched {
target: self.target,
// `self.matched < new_matched` implies new_matched can not be None.
// Thus unwrap is safe.
result: Ok(self.matched.unwrap()),
let _ = self.tx_raft_core.send(RaftMsg::UpdateReplicationProgress {
session_id: self.session_id,
target: self.target,
result: Ok(self.progress),
});
}
}
Expand All @@ -453,7 +425,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
#[tracing::instrument(level = "trace", skip(self))]
pub(self) fn needs_snapshot(&self) -> bool {
let c = self.committed.next_index();
let m = self.matched.next_index();
let m = self.progress.matching.next_index();
let distance = c.saturating_sub(m);

#[allow(clippy::infallible_destructuring_match)]
Expand Down Expand Up @@ -514,7 +486,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
}
}
Replicate::Entries(last) => {
if last.index() > self.matched.index() {
if last.index() > self.progress.matching.index() {
self.need_to_replicate = true;
}
}
Expand Down Expand Up @@ -563,11 +535,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
pub async fn line_rate_loop(&mut self) -> Result<(), ReplicationError<C::NodeId, C::Node>> {
loop {
loop {
tracing::debug!(
"current matched: {:?} max_possible_matched_index: {:?}",
self.matched,
self.max_possible_matched_index
);
tracing::debug!("progress: {}", self.progress);

let res = self.send_append_entries().await;
tracing::debug!(target = display(self.target), res = debug(&res), "replication res",);
Expand All @@ -589,7 +557,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
}
}

if self.matched.index() == self.max_possible_matched_index {
if self.progress.searching.is_none() {
break;
}
}
Expand Down Expand Up @@ -741,9 +709,9 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
// If we just sent the final chunk of the snapshot, then transition to lagging state.
if done {
tracing::debug!(
"done install snapshot: snapshot last_log_id: {:?}, matched: {:?}",
"done install snapshot: snapshot last_log_id: {:?}, progress: {}",
snapshot.meta.last_log_id,
self.matched,
self.progress,
);

self.update_matched(snapshot.meta.last_log_id);
Expand Down