Skip to content

Commit

Permalink
Merge pull request #796 from drmingdrmer/54-refact
Browse files Browse the repository at this point in the history
Refactor: collect replication response in variant `Notify::Network`
  • Loading branch information
drmingdrmer committed Apr 25, 2023
2 parents fc14fc2 + 2c35043 commit a28d552
Show file tree
Hide file tree
Showing 5 changed files with 201 additions and 111 deletions.
78 changes: 21 additions & 57 deletions openraft/src/core/notify.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use crate::core::sm;
use crate::raft::VoteResponse;
use crate::replication::ReplicationResult;
use crate::replication::ReplicationSessionId;
use crate::replication;
use crate::MessageSummary;
use crate::RaftTypeConfig;
use crate::StorageError;
use crate::Vote;

/// A message coming from the internal components.
Expand All @@ -19,41 +17,7 @@ where C: RaftTypeConfig
vote: Vote<C::NodeId>,
},

/// A tick event to wake up RaftCore to check timeout etc.
Tick {
/// ith tick
i: u64,
},

// /// Logs that are submitted to append has been persisted to disk.
// LogPersisted {},
/// Update the `matched` log id of a replication target.
/// Sent by a replication task `ReplicationCore`.
UpdateReplicationProgress {
/// The ID of the target node for which the match index is to be updated.
target: C::NodeId,

/// The id of the subject that submit this replication action.
///
/// It is only used for debugging purpose.
id: u64,

/// Either the last log id that has been successfully replicated to the target,
/// or an error in string.
result: Result<ReplicationResult<C::NodeId>, String>,

/// In which session this message is sent.
/// A replication session(vote,membership_log_id) should ignore message from other session.
session_id: ReplicationSessionId<C::NodeId>,
},

/// [`StorageError`] error has taken place locally(not on remote node) when replicating, and
/// [`RaftCore`] needs to shutdown. Sent by a replication task
/// [`crate::replication::ReplicationCore`].
ReplicationStorageError { error: StorageError<C::NodeId> },

/// ReplicationCore has seen a higher `vote`.
/// Sent by a replication task `ReplicationCore`.
/// Seen a higher `vote`.
HigherVote {
/// The ID of the target node from which the new term was observed.
target: C::NodeId,
Expand All @@ -68,33 +32,28 @@ where C: RaftTypeConfig
// membership_log_id: Option<LogId<C::NodeId>>,
},

/// Result of executing a command sent from network worker.
Network { response: replication::Response<C> },

/// Result of executing a command sent from state machine worker.
StateMachine { command_result: sm::CommandResult<C> },

/// A tick event to wake up RaftCore to check timeout etc.
Tick {
/// ith tick
i: u64,
},
}

impl<C> MessageSummary<Notify<C>> for Notify<C>
where C: RaftTypeConfig
{
fn summary(&self) -> String {
match self {
Notify::VoteResponse { target, resp, vote } => {
Self::VoteResponse { target, resp, vote } => {
format!("VoteResponse: from: {}: {}, res-vote: {}", target, resp.summary(), vote)
}
Notify::Tick { i } => {
format!("Tick {}", i)
}
Notify::UpdateReplicationProgress {
ref target,
ref id,
ref result,
ref session_id,
} => {
format!(
"UpdateReplicationProgress: target: {}, id: {}, result: {:?}, session_id: {}",
target, id, result, session_id,
)
}
Notify::HigherVote {
Self::HigherVote {
ref target,
higher: ref new_vote,
ref vote,
Expand All @@ -104,9 +63,14 @@ where C: RaftTypeConfig
target, new_vote, vote
)
}
Notify::ReplicationStorageError { error } => format!("ReplicationFatal: {}", error),
Notify::StateMachine { command_result: done } => {
format!("StateMachine command done: {:?}", done)
Self::Network { response } => {
format!("Replication command done: {}", response.summary())
}
Self::StateMachine { command_result } => {
format!("StateMachine command done: {:?}", command_result)
}
Self::Tick { i } => {
format!("Tick {}", i)
}
}
}
Expand Down
88 changes: 55 additions & 33 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ use crate::raft::VoteRequest;
use crate::raft::VoteResponse;
use crate::raft::VoteTx;
use crate::raft_state::LogStateReader;
use crate::replication;
use crate::replication::Replicate;
use crate::replication::ReplicationCore;
use crate::replication::ReplicationHandle;
Expand Down Expand Up @@ -1225,6 +1226,22 @@ where
self.handle_vote_resp(resp, target);
}
}

Notify::HigherVote { target, higher, vote } => {
tracing::info!(
target = display(target),
higher_vote = display(&higher),
sending_vote = display(&vote),
"received Notify::HigherVote: {}",
func_name!()
);

if self.does_vote_match(&vote, "HigherVote") {
// Rejected vote change is ok.
let _ = self.engine.vote_handler().handle_message_vote(&higher);
}
}

Notify::Tick { i } => {
// check every timer

Expand Down Expand Up @@ -1275,31 +1292,46 @@ where
}
}

Notify::HigherVote { target, higher, vote } => {
tracing::info!(
target = display(target),
higher_vote = display(&higher),
sending_vote = display(&vote),
"received Notify::HigherVote: {}",
func_name!()
);
Notify::Network { response } => {
//
match response {
replication::Response::Progress {
target,
id,
result,
session_id,
} => {
// If vote or membership changes, ignore the message.
// There is chance delayed message reports a wrong state.
if self.does_replication_session_match(&session_id, "UpdateReplicationMatched") {
self.handle_replication_progress(target, id, result);
}
}

if self.does_vote_match(&vote, "HigherVote") {
// Rejected vote change is ok.
let _ = self.engine.vote_handler().handle_message_vote(&higher);
}
}
replication::Response::StorageError { error } => {
tracing::error!(
error = display(&error),
"received Notify::ReplicationStorageError: {}",
func_name!()
);

Notify::UpdateReplicationProgress {
target,
id,
result,
session_id,
} => {
// If vote or membership changes, ignore the message.
// There is chance delayed message reports a wrong state.
if self.does_replication_session_match(&session_id, "UpdateReplicationMatched") {
self.handle_replication_progress(target, id, result);
return Err(Fatal::from(error));
}

replication::Response::HigherVote { target, higher, vote } => {
tracing::info!(
target = display(target),
higher_vote = display(&higher),
sending_vote = display(&vote),
"received Notify::HigherVote: {}",
func_name!()
);

if self.does_vote_match(&vote, "HigherVote") {
// Rejected vote change is ok.
let _ = self.engine.vote_handler().handle_message_vote(&higher);
}
}
}
}

Expand Down Expand Up @@ -1339,16 +1371,6 @@ where
}
}
}

Notify::ReplicationStorageError { error } => {
tracing::error!(
error = display(&error),
"received Notify::ReplicationStorageError: {}",
func_name!()
);

return Err(Fatal::from(error));
}
};
Ok(())
}
Expand Down
1 change: 1 addition & 0 deletions openraft/src/core/sm/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ where
}
}

// TODO: move to other mod, it is shared by log, sm and replication
/// A sequence number of a state machine command.
///
/// It is used to identify and consume a submitted command when the command callback is received by
Expand Down
57 changes: 36 additions & 21 deletions openraft/src/replication/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Replication stream.

mod replication_session_id;

mod response;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::io::SeekFrom;
Expand All @@ -10,6 +10,7 @@ use std::sync::Arc;
use anyerror::AnyError;
use futures::future::FutureExt;
pub(crate) use replication_session_id::ReplicationSessionId;
pub(crate) use response::Response;
use tokio::io::AsyncRead;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncSeek;
Expand Down Expand Up @@ -200,27 +201,33 @@ where
return Err(closed);
}
ReplicationError::HigherVote(h) => {
let _ = self.tx_raft_core.send(Notify::HigherVote {
target: self.target,
higher: h.higher,
vote: self.session_id.vote,
let _ = self.tx_raft_core.send(Notify::Network {
response: Response::HigherVote {
target: self.target,
higher: h.higher,
vote: self.session_id.vote,
},
});
return Ok(());
}
ReplicationError::StorageError(error) => {
tracing::error!(error=%error, "error replication to target={}", self.target);

// TODO: report this error
let _ = self.tx_raft_core.send(Notify::ReplicationStorageError { error });
let _ = self.tx_raft_core.send(Notify::Network {
response: Response::StorageError { error },
});
return Ok(());
}
ReplicationError::RPCError(err) => {
tracing::error!(err = display(&err), "RPCError");
let _ = self.tx_raft_core.send(Notify::UpdateReplicationProgress {
target: self.target,
id: repl_id,
result: Err(err.to_string()),
session_id: self.session_id,
let _ = self.tx_raft_core.send(Notify::Network {
response: Response::Progress {
target: self.target,
id: repl_id,
result: Err(err.to_string()),
session_id: self.session_id,
},
});

// If there is an [`Unreachable`] error, we will backoff for a period of time
Expand Down Expand Up @@ -350,11 +357,15 @@ where
"update_conflicting"
);

let _ = self.tx_raft_core.send(Notify::UpdateReplicationProgress {
session_id: self.session_id,
id,
target: self.target,
result: Ok(ReplicationResult::Conflict(conflict)),
let _ = self.tx_raft_core.send({
Notify::Network {
response: Response::Progress {
session_id: self.session_id,
id,
target: self.target,
result: Ok(ReplicationResult::Conflict(conflict)),
},
}
});
}

Expand All @@ -375,11 +386,15 @@ where
if self.matching < new_matching {
self.matching = new_matching;

let _ = self.tx_raft_core.send(Notify::UpdateReplicationProgress {
session_id: self.session_id,
id,
target: self.target,
result: Ok(ReplicationResult::Matching(new_matching)),
let _ = self.tx_raft_core.send({
Notify::Network {
response: Response::Progress {
session_id: self.session_id,
id,
target: self.target,
result: Ok(ReplicationResult::Matching(new_matching)),
},
}
});
}
}
Expand Down
Loading

0 comments on commit a28d552

Please sign in to comment.