Skip to content

Commit

Permalink
Merge pull request #612 from drmingdrmer/50-server-state
Browse files Browse the repository at this point in the history
Refactor: ProgressEntry: support updating matching and conflict
  • Loading branch information
drmingdrmer committed Dec 6, 2022
2 parents eb92486 + be22f86 commit 42e2ff3
Show file tree
Hide file tree
Showing 6 changed files with 237 additions and 69 deletions.
15 changes: 8 additions & 7 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ use crate::raft_types::LogIdOptionExt;
use crate::raft_types::RaftLogId;
use crate::replication::Replicate;
use crate::replication::ReplicationCore;
use crate::replication::ReplicationSessionId;
use crate::replication::ReplicationStream;
use crate::runtime::RaftRuntime;
use crate::storage::RaftSnapshotBuilder;
Expand Down Expand Up @@ -644,7 +645,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
snapshot: self.engine.snapshot_meta.last_log_id,

// --- cluster ---
state: self.engine.calc_server_state(),
state: self.engine.state.server_state,
current_leader: self.current_leader(),
membership_config: self.engine.state.membership_state.effective.clone(),

Expand Down Expand Up @@ -944,10 +945,11 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
let membership_log_id = self.engine.state.membership_state.effective.log_id;
let network = self.network.new_client(target, target_node).await?;

let session_id = ReplicationSessionId::new(self.engine.state.vote, membership_log_id);

Ok(ReplicationCore::<C, N, S>::spawn(
target,
self.engine.state.vote,
membership_log_id,
session_id,
self.config.clone(),
self.engine.state.committed,
progress_entry,
Expand Down Expand Up @@ -1299,13 +1301,12 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
RaftMsg::UpdateReplicationMatched {
target,
result,
vote,
membership_log_id,
session_id,
} => {
if self.does_vote_match(vote, "UpdateReplicationMatched") {
if self.does_vote_match(session_id.vote, "UpdateReplicationMatched") {
// If membership changes, ignore the message.
// There is chance delayed message reports a wrong state.
if membership_log_id == self.engine.state.membership_state.effective.log_id {
if session_id.membership_log_id == self.engine.state.membership_state.effective.log_id {
self.handle_update_matched(target, result).await?;
}
}
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1226,7 +1226,7 @@ where

/// The node is candidate or leader
fn is_leading(&self) -> bool {
self.state.internal_server_state.is_leading()
self.state.vote.node_id == self.id
}

pub(crate) fn is_leader(&self) -> bool {
Expand Down
147 changes: 143 additions & 4 deletions openraft/src/progress/entry.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::borrow::Borrow;

use crate::summary::MessageSummary;
use crate::LogId;
use crate::LogIdOptionExt;
use crate::NodeId;
Expand Down Expand Up @@ -33,23 +34,81 @@ impl<NID: NodeId> ProgressEntry<NID> {
searching: None,
}
}

/// Create a progress entry that does not have any matching log id.
///
/// It's going to initiate a binary search to find the minimal matching log id.
pub(crate) fn empty(end: u64) -> Self {
let searching = if end == 0 {
None
} else {
Some(Searching {
mid: Self::calc_mid(0, end),
end,
})
};

Self {
matching: None,
searching: Some(Searching { mid: end / 2, end }),
searching,
}
}

pub(crate) fn update_matching(&mut self, matching: Option<LogId<NID>>) {
tracing::debug!(
self = debug(&self),
matching = display(matching.summary()),
"update_matching"
);

debug_assert!(matching >= self.matching);

self.matching = matching;

if let Some(s) = &self.searching {
if matching.next_index() >= s.end {
if let Some(s) = &mut self.searching {
let next = matching.next_index();

if next >= s.end {
self.searching = None;
} else {
s.mid = Self::calc_mid(next, s.end);
}
}
}

#[allow(dead_code)]
pub(crate) fn update_conflicting(&mut self, conflict: u64) {
tracing::debug!(self = debug(&self), conflict = display(conflict), "update_conflict");

let matching_next = self.matching.next_index();

debug_assert!(conflict >= matching_next);

if let Some(s) = &mut self.searching {
debug_assert!(conflict < s.end);
debug_assert!(conflict >= s.mid);

s.end = conflict;

if matching_next >= s.end {
self.searching = None;
} else {
s.mid = Self::calc_mid(matching_next, s.end);
}
} else {
unreachable!("found conflict({}) when searching is None", conflict);
}
}

// TODO: update mid
/// Return the starting log index range(`[start,end)`) for the next AppendEntries.
#[allow(dead_code)]
pub(crate) fn sending_start(&self) -> (u64, u64) {
match self.searching {
None => {
let next = self.matching.next_index();
(next, next)
}
Some(s) => (s.mid, s.end),
}
}

Expand All @@ -64,10 +123,90 @@ impl<NID: NodeId> ProgressEntry<NID> {
self.matching.index()
}
}

fn calc_mid(matching_next: u64, end: u64) -> u64 {
debug_assert!(matching_next < end);
let d = end - matching_next;
let offset = d / 16 * 8;
matching_next + offset
}
}

impl<NID: NodeId> Borrow<Option<LogId<NID>>> for ProgressEntry<NID> {
fn borrow(&self) -> &Option<LogId<NID>> {
&self.matching
}
}

#[cfg(test)]
mod tests {
use std::borrow::Borrow;

use crate::progress::entry::ProgressEntry;
use crate::LeaderId;
use crate::LogId;

fn log_id(index: u64) -> LogId<u64> {
LogId {
leader_id: LeaderId { term: 1, node_id: 1 },
index,
}
}

#[test]
fn test_update_matching() -> anyhow::Result<()> {
let mut pe = ProgressEntry::empty(20);
assert_eq!(&None, pe.borrow());
assert_eq!((8, 20), pe.sending_start());

pe.update_matching(None);
assert_eq!(&None, pe.borrow());
assert_eq!((8, 20), pe.sending_start());

pe.update_matching(Some(log_id(0)));
assert_eq!(&Some(log_id(0)), pe.borrow());
assert_eq!((9, 20), pe.sending_start());

pe.update_matching(Some(log_id(0)));
assert_eq!(&Some(log_id(0)), pe.borrow());
assert_eq!((9, 20), pe.sending_start());

pe.update_matching(Some(log_id(1)));
assert_eq!(&Some(log_id(1)), pe.borrow());
assert_eq!((10, 20), pe.sending_start());

pe.update_matching(Some(log_id(4)));
assert_eq!(&Some(log_id(4)), pe.borrow());
assert_eq!((5, 20), pe.sending_start());

// All logs are matching
pe.update_matching(Some(log_id(19)));
assert_eq!(&Some(log_id(19)), pe.borrow());
assert_eq!((20, 20), pe.sending_start());

pe.update_matching(Some(log_id(20)));
assert_eq!(&Some(log_id(20)), pe.borrow());
assert_eq!((21, 21), pe.sending_start());
Ok(())
}

#[test]
fn test_update_conflict() -> anyhow::Result<()> {
let mut pe = ProgressEntry::empty(20);

pe.update_matching(Some(log_id(4)));
assert_eq!(&Some(log_id(4)), pe.borrow());
assert_eq!((5, 20), pe.sending_start());

pe.update_conflicting(19);
assert_eq!(&Some(log_id(4)), pe.borrow());
assert_eq!((5, 19), pe.sending_start());

pe.update_conflicting(5);
assert_eq!(&Some(log_id(4)), pe.borrow());
assert_eq!((5, 5), pe.sending_start());
assert!(pe.searching.is_none());

Ok(())
}
}
16 changes: 7 additions & 9 deletions openraft/src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use crate::membership::IntoNodes;
use crate::metrics::RaftMetrics;
use crate::metrics::Wait;
use crate::node::Node;
use crate::replication::ReplicationSessionId;
use crate::storage::Snapshot;
use crate::AppData;
use crate::AppDataResponse;
Expand Down Expand Up @@ -926,11 +927,9 @@ pub(crate) enum RaftMsg<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStor
/// or an error in string.
result: Result<LogId<C::NodeId>, String>,

/// Which ServerState sent this message
vote: Vote<C::NodeId>,

/// The cluster this replication works for.
membership_log_id: Option<LogId<C::NodeId>>,
/// In which session this message is sent.
/// A replication session(vote,membership_log_id) should ignore message from other session.
session_id: ReplicationSessionId<C::NodeId>,
},

/// ReplicationCore has seen a higher `vote`.
Expand Down Expand Up @@ -1020,15 +1019,14 @@ where
RaftMsg::UpdateReplicationMatched {
ref target,
ref result,
ref vote,
ref membership_log_id,
ref session_id,
} => {
format!(
"UpdateMatchIndex: target: {}, result: {:?}, server_state_vote: {}, membership_log_id: {}",
target,
result,
vote,
membership_log_id.summary()
session_id.vote,
session_id.membership_log_id.summary()
)
}
RaftMsg::HigherVote {
Expand Down
Loading

0 comments on commit 42e2ff3

Please sign in to comment.