Skip to content

Commit

Permalink
Fix: check_is_leader() should return at once if encountering StorageE…
Browse files Browse the repository at this point in the history
…rror

Refactor: ExtractFatal is not used any more. Fatal error should only be
raised by Command executor, no more by API handler. There is no need to
extract Fatal error from an API error.
  • Loading branch information
drmingdrmer committed Feb 12, 2023
1 parent 9ac025d commit 9dbbe14
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 41 deletions.
23 changes: 10 additions & 13 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ use crate::error::ChangeMembershipError;
use crate::error::CheckIsLeaderError;
use crate::error::ClientWriteError;
use crate::error::EmptyMembership;
use crate::error::ExtractFatal;
use crate::error::Fatal;
use crate::error::ForwardToLeader;
use crate::error::InProgress;
Expand Down Expand Up @@ -234,28 +233,27 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
pub(super) async fn handle_check_is_leader_request(
&mut self,
tx: RaftRespTx<(), CheckIsLeaderError<C::NodeId, C::Node>>,
) {
) -> Result<(), StorageError<C::NodeId>> {
// Setup sentinel values to track when we've received majority confirmation of leadership.

let em = self.engine.state.membership_state.effective();
let mut granted = btreeset! {self.id};

if em.is_quorum(granted.iter()) {
let _ = tx.send(Ok(()));
return;
return Ok(());
}

// Spawn parallel requests, all with the standard timeout for heartbeats.
let mut pending = FuturesUnordered::new();

let voter_progresses = if let Some(l) = &self.engine.internal_server_state.leading() {
let voter_progresses = {
let l = &self.engine.internal_server_state.leading().unwrap();
l.progress
.iter()
.filter(|(id, _v)| l.progress.is_voter(id) == Some(true))
.copied()
.collect::<Vec<_>>()
} else {
unreachable!("it has to be a leader!!!");
};

for (target, progress) in voter_progresses {
Expand Down Expand Up @@ -329,10 +327,8 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
// request.
if let AppendEntriesResponse::HigherVote(vote) = data {
let res = self.engine.vote_handler().handle_message_vote(&vote);
if let Err(e) = self.run_engine_commands::<Entry<C>>(&[]).await.extract_fatal() {
let _ = tx.send(Err(e.into()));
return;
}
self.run_engine_commands::<Entry<C>>(&[]).await?;

if let Err(e) = res {
// simply ignore stale responses
tracing::warn!(target = display(target), "vote {vote} rejected: {e}");
Expand All @@ -341,7 +337,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
// we are no longer leader so error out early
if !self.engine.state.is_leader(&self.engine.config.id) {
self.reject_with_forward_to_leader(tx);
return;
return Ok(());
}
}

Expand All @@ -350,7 +346,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
let mem = &self.engine.state.membership_state.effective();
if mem.is_quorum(granted.iter()) {
let _ = tx.send(Ok(()));
return;
return Ok(());
}
}

Expand All @@ -362,6 +358,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
got: granted,
}
.into()));
Ok(())
}

/// Add a new node to the cluster as a learner, bringing it up-to-speed, and then responding
Expand Down Expand Up @@ -1193,7 +1190,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
}
RaftMsg::CheckIsLeaderRequest { tx } => {
if self.engine.state.is_leader(&self.engine.config.id) {
self.handle_check_is_leader_request(tx).await;
self.handle_check_is_leader_request(tx).await?;
} else {
self.reject_with_forward_to_leader(tx);
}
Expand Down
28 changes: 0 additions & 28 deletions openraft/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,34 +33,6 @@ where NID: NodeId
Stopped,
}

/// Extract Fatal from a Result.
///
/// Fatal will shutdown the raft and needs to be dealt separately,
/// such as StorageError.
pub trait ExtractFatal<NID>
where
Self: Sized,
NID: NodeId,
{
fn extract_fatal(self) -> Result<Self, Fatal<NID>>;
}

impl<NID, T, E> ExtractFatal<NID> for Result<T, E>
where
NID: NodeId,
E: TryInto<Fatal<NID>> + Clone,
{
fn extract_fatal(self) -> Result<Self, Fatal<NID>> {
if let Err(e) = &self {
let fatal = e.clone().try_into();
if let Ok(f) = fatal {
return Err(f);
}
}
Ok(self)
}
}

// TODO: not used, remove
#[derive(Debug, Clone, thiserror::Error, derive_more::TryInto)]
#[derive(PartialEq, Eq)]
Expand Down

0 comments on commit 9dbbe14

Please sign in to comment.