Skip to content

Commit

Permalink
Refactor: impl initialize() in Engine
Browse files Browse the repository at this point in the history
Move input config checking into Engine.

Minor refactor for Engine: move common codes to standalone method.
  • Loading branch information
drmingdrmer committed Apr 16, 2022
1 parent 82e0597 commit 5946bcc
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 52 deletions.
15 changes: 3 additions & 12 deletions openraft/src/core/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use crate::error::InProgress;
use crate::error::InitializeError;
use crate::error::LearnerIsLagging;
use crate::error::LearnerNotFound;
use crate::error::MissingNodeInfo;
use crate::metrics::RemoveTarget;
use crate::raft::AddLearnerResponse;
use crate::raft::ChangeMembers;
Expand All @@ -43,19 +42,11 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Learner
#[tracing::instrument(level = "debug", skip(self))]
pub(super) async fn handle_init_with_config(
&mut self,
members: BTreeMap<C::NodeId, Option<Node>>,
member_nodes: BTreeMap<C::NodeId, Option<Node>>,
) -> Result<(), InitializeError<C::NodeId>> {
let node_ids = members.keys().cloned().collect::<BTreeSet<C::NodeId>>();

if !node_ids.contains(&self.core.id) {
let e = MissingNodeInfo {
node_id: self.core.id,
reason: "target should be a member".to_string(),
};
return Err(InitializeError::MissingNodeInfo(e));
}
let member_ids = member_nodes.keys().cloned().collect::<BTreeSet<C::NodeId>>();

let membership = Membership::with_nodes(vec![node_ids], members)?;
let membership = Membership::with_nodes(vec![member_ids], member_nodes)?;
let payload = EntryPayload::<C>::Membership(membership);

let mut entry_refs = [EntryRef::new(&payload)];
Expand Down
95 changes: 55 additions & 40 deletions openraft/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use maplit::btreeset;

use crate::entry::RaftEntry;
use crate::error::InitializeError;
use crate::error::MissingNodeInfo;
use crate::error::NotAllowed;
use crate::storage::InitialState;
use crate::EffectiveMembership;
Expand Down Expand Up @@ -116,30 +117,6 @@ impl<NID: NodeId> Engine<NID> {
}
}

/// Check if a raft node is in a state that allows to initialize.
///
/// It is allowed to initialize only when `last_log_id.is_none()` and `vote==(term=0, node_id=0)`.
/// See: [Conditions for initialization](https://datafuselabs.github.io/openraft/cluster-formation.html#conditions-for-initialization)
pub(crate) fn check_initialize(&self) -> Result<(), InitializeError<NID>> {
if self.state.last_log_id.is_none() && self.state.vote == Vote::default() {
return Ok(());
}

tracing::error!(?self.state.last_log_id, ?self.state.vote, "Can not initialize");

Err(InitializeError::NotAllowed(NotAllowed {
last_log_id: self.state.last_log_id,
vote: self.state.vote,
}))
}

fn next_log_id(&mut self) -> LogId<NID> {
let log_id = LogId::new(self.state.vote.leader_id(), self.state.last_log_id.next_index());
self.state.last_log_id = Some(log_id);

log_id
}

/// Initialize a node by appending the first log.
///
/// - The first log has to be membership config log.
Expand All @@ -154,19 +131,17 @@ impl<NID: NodeId> Engine<NID> {

self.check_initialize()?;

let entry = &mut entries[0];
let log_id = self.next_log_id();
entry.set_log_id(&log_id);

tracing::debug!("append initialization log id: {}", log_id);
self.assign_log_ids(entries.iter_mut());

self.commands.push(Command::AppendInputEntries { range: 0..l });
self.metrics_flags.set_data_changed();

assert!(
entry.get_membership().is_some(),
"Initialization log has to be a membership config entry"
);
let entry = &mut entries[0];
if let Some(m) = entry.get_membership() {
self.check_members_contain_me(m)?;
} else {
panic!("Initialization log has to be a membership config entry");
}
self.try_update_membership(entry);

// TODO: set target state. This should be done by Engine but currently it is not.
Expand All @@ -183,18 +158,13 @@ impl<NID: NodeId> Engine<NID> {
/// TODO(xp): metrics flag needs to be dealt with.
/// TODO(xp): if vote indicates this node is not the leader, refuse append
#[tracing::instrument(level = "debug", skip(self, entries))]
pub(crate) fn leader_append_entries<Ent: RaftEntry<NID>>(&mut self, entries: &mut [Ent]) {
pub(crate) fn leader_append_entries<'a, Ent: RaftEntry<NID> + 'a>(&mut self, entries: &mut [Ent]) {
let l = entries.len();
if l == 0 {
return;
}

for entry in entries.iter_mut() {
let log_id = self.next_log_id();
entry.set_log_id(&log_id);

tracing::debug!("append log id: {}", log_id);
}
self.assign_log_ids(entries.iter_mut());

self.commands.push(Command::AppendInputEntries { range: 0..l });
self.metrics_flags.set_data_changed();
Expand Down Expand Up @@ -256,4 +226,49 @@ impl<NID: NodeId> Engine<NID> {
self.metrics_flags.set_cluster_changed();
}
}

/// Check if a raft node is in a state that allows to initialize.
///
/// It is allowed to initialize only when `last_log_id.is_none()` and `vote==(term=0, node_id=0)`.
/// See: [Conditions for initialization](https://datafuselabs.github.io/openraft/cluster-formation.html#conditions-for-initialization)
fn check_initialize(&self) -> Result<(), NotAllowed<NID>> {
if self.state.last_log_id.is_none() && self.state.vote == Vote::default() {
return Ok(());
}

tracing::error!(?self.state.last_log_id, ?self.state.vote, "Can not initialize");

Err(NotAllowed {
last_log_id: self.state.last_log_id,
vote: self.state.vote,
})
}

/// When initialize, the node that accept initialize request has to be a member of the initial config.
fn check_members_contain_me(&self, m: &Membership<NID>) -> Result<(), MissingNodeInfo<NID>> {
if !m.is_member(&self.id) {
let e = MissingNodeInfo {
node_id: self.id,
reason: "target should be a member".to_string(),
};
Err(e)
} else {
Ok(())
}
}

fn assign_log_ids<'a, Ent: RaftEntry<NID> + 'a>(&mut self, entries: impl Iterator<Item = &'a mut Ent>) {
for entry in entries {
let log_id = self.next_log_id();
entry.set_log_id(&log_id);
tracing::debug!("assign log id: {}", log_id);
}
}

fn next_log_id(&mut self) -> LogId<NID> {
let log_id = LogId::new(self.state.vote.leader_id(), self.state.last_log_id.next_index());
self.state.last_log_id = Some(log_id);

log_id
}
}

0 comments on commit 5946bcc

Please sign in to comment.