Skip to content

Commit

Permalink
Change: error InProgress: add field committed
Browse files Browse the repository at this point in the history
- Refactor: Simplify Engine command executor
  • Loading branch information
drmingdrmer committed Jul 15, 2022
1 parent 1399b97 commit 2d1aff0
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 73 deletions.
48 changes: 0 additions & 48 deletions openraft/src/core/append_entries.rs

This file was deleted.

1 change: 0 additions & 1 deletion openraft/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
//! Also it receives and execute `Command` emitted by `Engine` to apply raft state to underlying storage or forward
//! messages to other raft nodes.

mod append_entries;
mod install_snapshot;
mod internal_msg;
mod raft_core;
Expand Down
53 changes: 31 additions & 22 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -480,10 +480,12 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
///
/// Adding a learner does not affect election, thus it does not need to enter joint consensus.
///
/// TODO: It has to wait for the previous membership to commit.
/// TODO: Otherwise a second proposed membership implies the previous one is committed.
/// TODO: Test it.
/// TODO: This limit can be removed if membership_state is replaced by a list of membership logs.
/// TODO: Because allowing this requires the engine to be able to store more than 2 membership logs.
/// And it does not need to wait for the previous membership log to commit to propose the new membership log.
///
/// If `blocking` is `true`, the result is sent to `tx` as the target node log has caught up. Otherwise, result is
/// sent at once, no matter whether the target node log is lagging or not.
#[tracing::instrument(level = "debug", skip_all)]
pub(super) async fn add_learner(
&mut self,
Expand Down Expand Up @@ -553,13 +555,6 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
Ok(())
}

/// return true if there is pending uncommitted config change
fn has_pending_config(&self) -> bool {
// The last membership config is not committed yet.
// Can not process the next one.
self.engine.state.committed < self.engine.state.membership_state.effective.log_id
}

/// Submit change-membership by writing a Membership log entry, if the `expect` is satisfied.
///
/// If `turn_to_learner` is `true`, removed `voter` will becomes `learner`. Otherwise they will be just removed.
Expand All @@ -582,13 +577,9 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
return Ok(());
}

if self.has_pending_config() {
let _ = tx.send(Err(ClientWriteError::ChangeMembershipError(
ChangeMembershipError::InProgress(InProgress {
// has_pending_config() implies an existing membership log.
membership_log_id: self.engine.state.membership_state.effective.log_id.unwrap(),
}),
)));
let res = self.check_membership_committed();
if let Err(e) = res {
let _ = tx.send(Err(ClientWriteError::ChangeMembershipError(e)));
return Ok(());
}

Expand Down Expand Up @@ -631,6 +622,20 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
Ok(())
}

/// Check if the effective membership is committed, so that a new membership is allowed to be proposed.
fn check_membership_committed(&self) -> Result<(), ChangeMembershipError<C::NodeId>> {
let st = &self.engine.state;

if st.is_membership_committed() {
return Ok(());
}

Err(ChangeMembershipError::InProgress(InProgress {
committed: st.committed,
membership_log_id: st.membership_state.effective.log_id,
}))
}

/// return Ok if all the current replication states satisfy the `expectation` for changing membership.
fn check_replication_states<'n>(
&self,
Expand Down Expand Up @@ -1005,11 +1010,11 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,

tracing::debug!(last_applied = display(last_applied), "update last_applied");

if let Some(leader_data) = &mut self.leader_data {
if let Some(l) = &mut self.leader_data {
let mut results = apply_results.into_iter();

for log_index in since..end {
let tx = leader_data.client_resp_channels.remove(&log_index);
let tx = l.client_resp_channels.remove(&log_index);

let i = log_index - since;
let entry = &entries[i as usize];
Expand All @@ -1023,6 +1028,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
Ok(())
}

/// Send result of applying a log entry to its client.
#[tracing::instrument(level = "debug", skip_all)]
pub(super) async fn send_response(
entry: &Entry<C>,
Expand Down Expand Up @@ -1460,7 +1466,10 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,

match msg {
RaftMsg::AppendEntries { rpc, tx } => {
let _ = tx.send(self.handle_append_entries_request(rpc).await.extract_fatal()?);
let resp =
self.engine.handle_append_entries_req(&rpc.vote, rpc.prev_log_id, &rpc.entries, rpc.leader_commit);
self.run_engine_commands(rpc.entries.as_slice()).await?;
let _ = tx.send(Ok(resp));
}
RaftMsg::RequestVote { rpc, tx } => {
let _ = tx.send(self.handle_vote_request(rpc).await.extract_fatal()?);
Expand Down Expand Up @@ -1773,8 +1782,8 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftRuntime
self.leader_commit(i).await?;
}
}
Command::FollowerCommit { upto: _, .. } => {
self.replicate_to_state_machine_if_needed().await?;
Command::FollowerCommit { upto, .. } => {
self.apply_to_state_machine(upto.index).await?;
}
Command::ReplicateInputEntries { range } => {
if let Some(last) = range.clone().last() {
Expand Down
6 changes: 4 additions & 2 deletions openraft/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ where E: TryInto<Fatal<NID>> + Clone
}
}

// TODO: not used
#[derive(Debug, Clone, thiserror::Error, derive_more::TryInto)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
pub enum AppendEntriesError<NID: NodeId> {
Expand Down Expand Up @@ -368,9 +369,10 @@ pub struct QuorumNotEnough<NID: NodeId> {

#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
#[error("the cluster is already undergoing a configuration change at log {membership_log_id}")]
#[error("the cluster is already undergoing a configuration change at log {membership_log_id:?}, committed log id: {committed:?}")]
pub struct InProgress<NID: NodeId> {
pub membership_log_id: LogId<NID>,
pub committed: Option<LogId<NID>>,
pub membership_log_id: Option<LogId<NID>>,
}

#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
Expand Down
5 changes: 5 additions & 0 deletions openraft/src/raft_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ where NID: NodeId
self.leader = Some(Leader::new(em.clone(), em.learner_ids()));
}

/// Return true if the currently effective membership is committed.
pub(crate) fn is_membership_committed(&self) -> bool {
self.committed >= self.membership_state.effective.log_id
}

/// Update field `committed` if the input is greater.
/// If updated, it returns the previous value in a `Some()`.
pub(crate) fn update_committed(&mut self, committed: &Option<LogId<NID>>) -> Option<Option<LogId<NID>>> {
Expand Down
55 changes: 55 additions & 0 deletions openraft/src/raft_state_test.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
use std::sync::Arc;

use maplit::btreeset;

use crate::engine::LogIdList;
use crate::EffectiveMembership;
use crate::LeaderId;
use crate::LogId;
use crate::Membership;
use crate::MembershipState;
use crate::RaftState;

fn log_id(term: u64, index: u64) -> LogId<u64> {
Expand All @@ -10,6 +17,10 @@ fn log_id(term: u64, index: u64) -> LogId<u64> {
}
}

fn m12() -> Membership<u64> {
Membership::new(vec![btreeset! {1,2}], None)
}

#[test]
fn test_raft_state_has_log_id_empty() -> anyhow::Result<()> {
let rs = RaftState::default();
Expand Down Expand Up @@ -100,3 +111,47 @@ fn test_raft_state_last_purged_log_id() -> anyhow::Result<()> {

Ok(())
}

#[test]
fn test_raft_state_is_membership_committed() -> anyhow::Result<()> {
//
let rs = RaftState::<u64> {
committed: None,
membership_state: MembershipState {
committed: Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m12())),
effective: Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m12())),
},
..Default::default()
};

assert!(
!rs.is_membership_committed(),
"committed == effective, but not consider this"
);

let rs = RaftState::<u64> {
committed: Some(log_id(2, 2)),
membership_state: MembershipState {
committed: Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m12())),
effective: Arc::new(EffectiveMembership::new(Some(log_id(2, 2)), m12())),
},
..Default::default()
};

assert!(
rs.is_membership_committed(),
"committed != effective, but rs.committed == effective.log_id"
);

let rs = RaftState::<u64> {
committed: Some(log_id(2, 2)),
membership_state: MembershipState {
committed: Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m12())),
effective: Arc::new(EffectiveMembership::new(Some(log_id(3, 3)), m12())),
},
..Default::default()
};

assert!(!rs.is_membership_committed(), "rs.committed < effective.log_id");
Ok(())
}

0 comments on commit 2d1aff0

Please sign in to comment.