Skip to content

Commit

Permalink
Change: add ChangeMembershipError sub error for reuse
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Jan 17, 2022
1 parent bbb47ce commit beeae72
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 35 deletions.
16 changes: 10 additions & 6 deletions openraft/src/core/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@ use crate::core::State;
use crate::error::AddLearnerError;
use crate::error::ChangeMembershipError;
use crate::error::ClientWriteError;
use crate::error::EmptyMembership;
use crate::error::InProgress;
use crate::error::InitializeError;
use crate::error::LearnerIsLagging;
use crate::error::LearnerNotFound;
use crate::raft::AddLearnerResponse;
use crate::raft::ClientWriteResponse;
use crate::raft::EntryPayload;
Expand Down Expand Up @@ -116,7 +120,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
// Ensure cluster will have at least one node.
if members.is_empty() {
let _ = tx.send(Err(ClientWriteError::ChangeMembershipError(
ChangeMembershipError::EmptyMembership,
ChangeMembershipError::EmptyMembership(EmptyMembership {}),
)));
return Ok(());
}
Expand All @@ -125,9 +129,9 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
// Can not process the next one.
if self.core.committed < Some(self.core.effective_membership.log_id) {
let _ = tx.send(Err(ClientWriteError::ChangeMembershipError(
ChangeMembershipError::InProgress {
ChangeMembershipError::InProgress(InProgress {
membership_log_id: self.core.effective_membership.log_id,
},
}),
)));
return Ok(());
}
Expand Down Expand Up @@ -160,11 +164,11 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
if !blocking {
// Node has repl stream, but is not yet ready to join.
let _ = tx.send(Err(ClientWriteError::ChangeMembershipError(
ChangeMembershipError::LearnerIsLagging {
ChangeMembershipError::LearnerIsLagging(LearnerIsLagging {
node_id: *new_node,
matched: node.matched,
distance: self.core.last_log_id.next_index().saturating_sub(node.matched.next_index()),
},
}),
)));
return Ok(());
}
Expand All @@ -173,7 +177,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
// Node does not yet have a repl stream, spawn one.
None => {
let _ = tx.send(Err(ClientWriteError::ChangeMembershipError(
ChangeMembershipError::LearnerNotFound { node_id: *new_node },
ChangeMembershipError::LearnerNotFound(LearnerNotFound { node_id: *new_node }),
)));
return Ok(());
}
Expand Down
59 changes: 39 additions & 20 deletions openraft/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,28 +97,24 @@ pub enum ClientWriteError {
}

/// The set of errors which may take place when requesting to propose a config change.
#[derive(Debug, Clone, thiserror::Error)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
pub enum ChangeMembershipError {
#[error("the cluster is already undergoing a configuration change at log {membership_log_id}")]
InProgress { membership_log_id: LogId },
#[error(transparent)]
InProgress(#[from] InProgress),

#[error("new membership can not be empty")]
EmptyMembership,
#[error(transparent)]
EmptyMembership(#[from] EmptyMembership),

// TODO(xp): 111 test it
#[error("to add a member {node_id} first need to add it as learner")]
LearnerNotFound { node_id: NodeId },
#[error(transparent)]
LearnerNotFound(#[from] LearnerNotFound),

// TODO(xp): 111 test it
#[error("replication to learner {node_id} is lagging {distance}, matched: {matched:?}, can not add as member")]
LearnerIsLagging {
node_id: NodeId,
matched: Option<LogId>,
distance: u64,
},
#[error(transparent)]
LearnerIsLagging(#[from] LearnerIsLagging),
}

#[derive(Debug, thiserror::Error)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
pub enum AddLearnerError {
#[error(transparent)]
ForwardToLeader(#[from] ForwardToLeader),
Expand All @@ -131,8 +127,7 @@ pub enum AddLearnerError {
}

/// The set of errors which may take place when initializing a pristine Raft node.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
pub enum InitializeError {
/// The requested action is not allowed due to the Raft node's current state.
#[error("the requested action is not allowed due to the Raft node's current state")]
Expand Down Expand Up @@ -222,28 +217,52 @@ pub enum ReplicationError {
},
}

#[derive(Debug, Clone, thiserror::Error)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
#[error("store has no log at: {index:?}")]
pub struct LackEntry {
pub index: Option<u64>,
}

#[derive(Debug, Clone, thiserror::Error)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
#[error("has to forward request to: {leader_id:?}")]
pub struct ForwardToLeader {
pub leader_id: Option<NodeId>,
}

#[derive(Debug, Clone, thiserror::Error)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
#[error("snapshot segment id mismatch, expect: {expect}, got: {got}")]
pub struct SnapshotMismatch {
pub expect: SnapshotSegmentId,
pub got: SnapshotSegmentId,
}

#[derive(Debug, Clone, thiserror::Error)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
#[error("not enough for a quorum, cluster: {cluster}, got: {got:?}")]
pub struct QuorumNotEnough {
pub cluster: String,
pub got: BTreeSet<NodeId>,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
#[error("the cluster is already undergoing a configuration change at log {membership_log_id}")]
pub struct InProgress {
pub membership_log_id: LogId,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
#[error("to add a member {node_id} first need to add it as learner")]
pub struct LearnerNotFound {
pub node_id: NodeId,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
#[error("replication to learner {node_id} is lagging {distance}, matched: {matched:?}, can not add as member")]
pub struct LearnerIsLagging {
pub node_id: NodeId,
pub matched: Option<LogId>,
pub distance: u64,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
#[error("new membership can not be empty")]
pub struct EmptyMembership {}
14 changes: 5 additions & 9 deletions openraft/tests/membership/t20_change_membership.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,11 @@ async fn change_with_lagging_learner_non_blocking() -> anyhow::Result<()> {
let err: ChangeMembershipError = err.try_into().unwrap();

match err {
ChangeMembershipError::LearnerIsLagging {
node_id,
matched: _,
distance,
} => {
tracing::info!(distance, "--- distance");
assert_eq!(1, node_id);
assert!(distance >= lag_threshold);
assert!(distance < 500);
ChangeMembershipError::LearnerIsLagging(e) => {
tracing::info!(e.distance, "--- distance");
assert_eq!(1, e.node_id);
assert!(e.distance >= lag_threshold);
assert!(e.distance < 500);
}
_ => {
panic!("expect ChangeMembershipError::NonVoterNotFound");
Expand Down

0 comments on commit beeae72

Please sign in to comment.