diff --git a/examples/raft-kv-memstore/src/client.rs b/examples/raft-kv-memstore/src/client.rs index b83c253de..e878816c3 100644 --- a/examples/raft-kv-memstore/src/client.rs +++ b/examples/raft-kv-memstore/src/client.rs @@ -7,15 +7,16 @@ use openraft::error::AddLearnerError; use openraft::error::CheckIsLeaderError; use openraft::error::ClientWriteError; use openraft::error::ForwardToLeader; -use openraft::error::Infallible; use openraft::error::InitializeError; use openraft::error::NetworkError; use openraft::error::RPCError; +use openraft::error::RaftError; use openraft::error::RemoteError; use openraft::raft::AddLearnerResponse; use openraft::raft::ClientWriteResponse; use openraft::BasicNode; use openraft::RaftMetrics; +use openraft::TryAsRef; use reqwest::Client; use serde::de::DeserializeOwned; use serde::Deserialize; @@ -60,7 +61,7 @@ impl ExampleClient { req: &ExampleRequest, ) -> Result< ClientWriteResponse, - RPCError>, + RPCError>>, > { self.send_rpc_to_leader("write", Some(req)).await } @@ -68,7 +69,10 @@ impl ExampleClient { /// Read value by key, in an inconsistent mode. /// /// This method may return stale value because it does not force to read on a legal leader. - pub async fn read(&self, req: &String) -> Result> { + pub async fn read( + &self, + req: &String, + ) -> Result>> { self.do_send_rpc_to_leader("read", Some(req)).await } @@ -78,7 +82,10 @@ impl ExampleClient { pub async fn consistent_read( &self, req: &String, - ) -> Result>> { + ) -> Result< + String, + RPCError>>, + > { self.do_send_rpc_to_leader("consistent_read", Some(req)).await } @@ -92,7 +99,10 @@ impl ExampleClient { /// Then make the new node a member with [`change_membership`]. pub async fn init( &self, - ) -> Result<(), RPCError>> { + ) -> Result< + (), + RPCError>>, + > { self.do_send_rpc_to_leader("init", Some(&Empty {})).await } @@ -104,7 +114,7 @@ impl ExampleClient { req: (ExampleNodeId, String), ) -> Result< AddLearnerResponse, - RPCError>, + RPCError>>, > { self.send_rpc_to_leader("add-learner", Some(&req)).await } @@ -118,7 +128,7 @@ impl ExampleClient { req: &BTreeSet, ) -> Result< ClientWriteResponse, - RPCError>, + RPCError>>, > { self.send_rpc_to_leader("change-membership", Some(req)).await } @@ -130,7 +140,8 @@ impl ExampleClient { /// See [`RaftMetrics`]. pub async fn metrics( &self, - ) -> Result, RPCError> { + ) -> Result, RPCError>> + { self.do_send_rpc_to_leader("metrics", None::<&()>).await } @@ -145,7 +156,7 @@ impl ExampleClient { &self, uri: &str, req: Option<&Req>, - ) -> Result> + ) -> Result>> where Req: Serialize + 'static, Resp: Serialize + DeserializeOwned, @@ -179,7 +190,8 @@ impl ExampleClient { } }; - let res: Result = resp.json().await.map_err(|e| RPCError::Network(NetworkError::new(&e)))?; + let res: Result> = + resp.json().await.map_err(|e| RPCError::Network(NetworkError::new(&e)))?; tracing::debug!( "<<< client recv reply from {}: {}", url, @@ -197,21 +209,22 @@ impl ExampleClient { &self, uri: &str, req: Option<&Req>, - ) -> Result> + ) -> Result>> where Req: Serialize + 'static, Resp: Serialize + DeserializeOwned, Err: std::error::Error + Serialize + DeserializeOwned - + TryInto> + + TryAsRef> + Clone, { // Retry at most 3 times to find a valid leader. let mut n_retry = 3; loop { - let res: Result> = self.do_send_rpc_to_leader(uri, req).await; + let res: Result>> = + self.do_send_rpc_to_leader(uri, req).await; let rpc_err = match res { Ok(x) => return Ok(x), @@ -219,19 +232,18 @@ impl ExampleClient { }; if let RPCError::RemoteError(remote_err) = &rpc_err { - let forward_err_res = - >>::try_into(remote_err.source.clone()); + let raft_err = &remote_err.source; - if let Ok(ForwardToLeader { + if let Some(ForwardToLeader { leader_id: Some(leader_id), leader_node: Some(leader_node), .. - }) = forward_err_res + }) = raft_err.forward_to_leader() { // Update target to the new leader. { let mut t = self.leader.lock().unwrap(); - *t = (leader_id, leader_node.addr); + *t = (*leader_id, leader_node.addr.clone()); } n_retry -= 1; diff --git a/examples/raft-kv-memstore/src/network/api.rs b/examples/raft-kv-memstore/src/network/api.rs index e31d75673..f90812fa4 100644 --- a/examples/raft-kv-memstore/src/network/api.rs +++ b/examples/raft-kv-memstore/src/network/api.rs @@ -4,6 +4,7 @@ use actix_web::web::Data; use actix_web::Responder; use openraft::error::CheckIsLeaderError; use openraft::error::Infallible; +use openraft::error::RaftError; use openraft::BasicNode; use web::Json; @@ -46,7 +47,8 @@ pub async fn consistent_read(app: Data, req: Json) -> actix_ let key = req.0; let value = state_machine.data.get(&key).cloned(); - let res: Result> = Ok(value.unwrap_or_default()); + let res: Result>> = + Ok(value.unwrap_or_default()); Ok(Json(res)) } Err(e) => Ok(Json(Err(e))), diff --git a/examples/raft-kv-memstore/src/network/raft_network_impl.rs b/examples/raft-kv-memstore/src/network/raft_network_impl.rs index bc8ef8dbe..82c48d784 100644 --- a/examples/raft-kv-memstore/src/network/raft_network_impl.rs +++ b/examples/raft-kv-memstore/src/network/raft_network_impl.rs @@ -1,10 +1,9 @@ use async_trait::async_trait; -use openraft::error::AppendEntriesError; use openraft::error::InstallSnapshotError; use openraft::error::NetworkError; use openraft::error::RPCError; +use openraft::error::RaftError; use openraft::error::RemoteError; -use openraft::error::VoteError; use openraft::raft::AppendEntriesRequest; use openraft::raft::AppendEntriesResponse; use openraft::raft::InstallSnapshotRequest; @@ -86,10 +85,8 @@ impl RaftNetwork for ExampleNetworkConnection { async fn send_append_entries( &mut self, req: AppendEntriesRequest, - ) -> Result< - AppendEntriesResponse, - RPCError>, - > { + ) -> Result, RPCError>> + { self.owner.send_rpc(self.target, &self.target_node, "raft-append", req).await } @@ -98,7 +95,7 @@ impl RaftNetwork for ExampleNetworkConnection { req: InstallSnapshotRequest, ) -> Result< InstallSnapshotResponse, - RPCError>, + RPCError>, > { self.owner.send_rpc(self.target, &self.target_node, "raft-snapshot", req).await } @@ -106,7 +103,7 @@ impl RaftNetwork for ExampleNetworkConnection { async fn send_vote( &mut self, req: VoteRequest, - ) -> Result, RPCError>> { + ) -> Result, RPCError>> { self.owner.send_rpc(self.target, &self.target_node, "raft-vote", req).await } } diff --git a/examples/raft-kv-memstore/tests/cluster/test_cluster.rs b/examples/raft-kv-memstore/tests/cluster/test_cluster.rs index e47ce3a81..5d18b2c48 100644 --- a/examples/raft-kv-memstore/tests/cluster/test_cluster.rs +++ b/examples/raft-kv-memstore/tests/cluster/test_cluster.rs @@ -1,4 +1,6 @@ +use std::backtrace::Backtrace; use std::collections::BTreeMap; +use std::panic::PanicInfo; use std::thread; use std::time::Duration; @@ -11,6 +13,38 @@ use raft_kv_memstore::store::ExampleRequest; use tokio::runtime::Runtime; use tracing_subscriber::EnvFilter; +pub fn log_panic(panic: &PanicInfo) { + let backtrace = { + format!("{:?}", Backtrace::force_capture()) + // #[cfg(feature = "bt")] + // { + // format!("{:?}", Backtrace::force_capture()) + // } + // + // #[cfg(not(feature = "bt"))] + // { + // "backtrace is disabled without --features 'bt'".to_string() + // } + }; + + eprintln!("{}", panic); + + if let Some(location) = panic.location() { + tracing::error!( + message = %panic, + backtrace = %backtrace, + panic.file = location.file(), + panic.line = location.line(), + panic.column = location.column(), + ); + eprintln!("{}:{}:{}", location.file(), location.line(), location.column()); + } else { + tracing::error!(message = %panic, backtrace = %backtrace); + } + + eprintln!("{}", backtrace); +} + /// Setup a cluster of 3 nodes. /// Write to it and read from it. #[tokio::test(flavor = "multi_thread", worker_threads = 8)] @@ -20,6 +54,10 @@ async fn test_cluster() -> anyhow::Result<()> { // This is only used by the client. A raft node in this example stores node addresses in its // store. + std::panic::set_hook(Box::new(|panic| { + log_panic(panic); + })); + tracing_subscriber::fmt() .with_target(true) .with_thread_ids(true) diff --git a/examples/raft-kv-rocksdb/src/client.rs b/examples/raft-kv-rocksdb/src/client.rs index ec032e468..6a9cce762 100644 --- a/examples/raft-kv-rocksdb/src/client.rs +++ b/examples/raft-kv-rocksdb/src/client.rs @@ -6,14 +6,15 @@ use openraft::error::AddLearnerError; use openraft::error::CheckIsLeaderError; use openraft::error::ClientWriteError; use openraft::error::ForwardToLeader; -use openraft::error::Infallible; use openraft::error::InitializeError; use openraft::error::NetworkError; use openraft::error::RPCError; +use openraft::error::RaftError; use openraft::error::RemoteError; use openraft::raft::AddLearnerResponse; use openraft::raft::ClientWriteResponse; use openraft::RaftMetrics; +use openraft::TryAsRef; use reqwest::Client; use serde::de::DeserializeOwned; use serde::Deserialize; @@ -58,7 +59,7 @@ impl ExampleClient { req: &ExampleRequest, ) -> Result< ClientWriteResponse, - RPCError>, + RPCError>>, > { self.send_rpc_to_leader("api/write", Some(req)).await } @@ -66,7 +67,10 @@ impl ExampleClient { /// Read value by key, in an inconsistent mode. /// /// This method may return stale value because it does not force to read on a legal leader. - pub async fn read(&self, req: &String) -> Result> { + pub async fn read( + &self, + req: &String, + ) -> Result>> { self.do_send_rpc_to_leader("api/read", Some(req)).await } @@ -76,7 +80,10 @@ impl ExampleClient { pub async fn consistent_read( &self, req: &String, - ) -> Result>> { + ) -> Result< + String, + RPCError>>, + > { self.do_send_rpc_to_leader("api/consistent_read", Some(req)).await } @@ -90,7 +97,10 @@ impl ExampleClient { /// Then make the new node a member with [`change_membership`]. pub async fn init( &self, - ) -> Result<(), RPCError>> { + ) -> Result< + (), + RPCError>>, + > { self.do_send_rpc_to_leader("cluster/init", Some(&Empty {})).await } @@ -102,7 +112,7 @@ impl ExampleClient { req: (ExampleNodeId, String, String), ) -> Result< AddLearnerResponse, - RPCError>, + RPCError>>, > { self.send_rpc_to_leader("cluster/add-learner", Some(&req)).await } @@ -116,7 +126,7 @@ impl ExampleClient { req: &BTreeSet, ) -> Result< ClientWriteResponse, - RPCError>, + RPCError>>, > { self.send_rpc_to_leader("cluster/change-membership", Some(req)).await } @@ -128,7 +138,8 @@ impl ExampleClient { /// See [`RaftMetrics`]. pub async fn metrics( &self, - ) -> Result, RPCError> { + ) -> Result, RPCError>> + { self.do_send_rpc_to_leader("cluster/metrics", None::<&()>).await } @@ -188,21 +199,21 @@ impl ExampleClient { &self, uri: &str, req: Option<&Req>, - ) -> Result> + ) -> Result>> where Req: Serialize + 'static, Resp: Serialize + DeserializeOwned, Err: std::error::Error + Serialize + DeserializeOwned - + TryInto> + + TryAsRef> + Clone, { // Retry at most 3 times to find a valid leader. let mut n_retry = 3; loop { - let res: Result> = + let res: Result>> = self.do_send_rpc_to_leader(uri, req).await; let rpc_err = match res { @@ -211,20 +222,19 @@ impl ExampleClient { }; if let RPCError::RemoteError(remote_err) = &rpc_err { - let forward_err_res = - >>::try_into(remote_err.source.clone()); + let raft_err: &RaftError = &remote_err.source; - if let Ok(ForwardToLeader { + if let Some(ForwardToLeader { leader_id: Some(leader_id), leader_node: Some(leader_node), .. - }) = forward_err_res + }) = raft_err.forward_to_leader() { // Update target to the new leader. { let mut t = self.leader.lock().unwrap(); let api_addr = leader_node.api_addr.clone(); - *t = (leader_id, api_addr); + *t = (*leader_id, api_addr); } n_retry -= 1; diff --git a/examples/raft-kv-rocksdb/src/network/raft_network_impl.rs b/examples/raft-kv-rocksdb/src/network/raft_network_impl.rs index a554c4183..7402d3ace 100644 --- a/examples/raft-kv-rocksdb/src/network/raft_network_impl.rs +++ b/examples/raft-kv-rocksdb/src/network/raft_network_impl.rs @@ -2,12 +2,11 @@ use std::any::Any; use std::fmt::Display; use async_trait::async_trait; -use openraft::error::AppendEntriesError; use openraft::error::InstallSnapshotError; use openraft::error::NetworkError; use openraft::error::RPCError; +use openraft::error::RaftError; use openraft::error::RemoteError; -use openraft::error::VoteError; use openraft::raft::AppendEntriesRequest; use openraft::raft::AppendEntriesResponse; use openraft::raft::InstallSnapshotRequest; @@ -100,10 +99,8 @@ impl RaftNetwork for ExampleNetworkConnection { async fn send_append_entries( &mut self, req: AppendEntriesRequest, - ) -> Result< - AppendEntriesResponse, - RPCError>, - > { + ) -> Result, RPCError>> + { self.c().await?.raft().append(req).await.map_err(|e| to_error(e, self.target)) } @@ -112,7 +109,7 @@ impl RaftNetwork for ExampleNetworkConnection { req: InstallSnapshotRequest, ) -> Result< InstallSnapshotResponse, - RPCError>, + RPCError>, > { self.c().await?.raft().snapshot(req).await.map_err(|e| to_error(e, self.target)) } @@ -120,7 +117,7 @@ impl RaftNetwork for ExampleNetworkConnection { async fn send_vote( &mut self, req: VoteRequest, - ) -> Result, RPCError>> { + ) -> Result, RPCError>> { self.c().await?.raft().vote(req).await.map_err(|e| to_error(e, self.target)) } } diff --git a/openraft/src/engine/command.rs b/openraft/src/engine/command.rs index 2688a3dbd..659f3f4a9 100644 --- a/openraft/src/engine/command.rs +++ b/openraft/src/engine/command.rs @@ -4,10 +4,9 @@ use std::sync::Arc; use tokio::sync::oneshot; -use crate::error::AppendEntriesError; +use crate::error::Infallible; use crate::error::InitializeError; use crate::error::InstallSnapshotError; -use crate::error::VoteError; use crate::progress::entry::ProgressEntry; use crate::progress::Inflight; use crate::raft::AppendEntriesResponse; @@ -132,19 +131,19 @@ where // --- /// Send vote result `res` to its caller via `tx` SendVoteResult { - send: SendResult, VoteError>>, + send: SendResult, Infallible>>, }, /// Send append-entries result `res` to its caller via `tx` SendAppendEntriesResult { - send: SendResult, AppendEntriesError>>, + send: SendResult, Infallible>>, }, // TODO: use it #[allow(dead_code)] /// Send install-snapshot result `res` to its caller via `tx` SendInstallSnapshotResult { - send: SendResult, InstallSnapshotError>>, + send: SendResult, InstallSnapshotError>>, }, /// Send install-snapshot result `res` to its caller via `tx` diff --git a/openraft/src/error.rs b/openraft/src/error.rs index fd582d84a..6f3c860b1 100644 --- a/openraft/src/error.rs +++ b/openraft/src/error.rs @@ -10,6 +10,7 @@ use anyerror::AnyError; use crate::node::Node; use crate::raft::AppendEntriesResponse; use crate::raft_types::SnapshotSegmentId; +use crate::try_as_ref::TryAsRef; use crate::LogId; use crate::Membership; use crate::NodeId; @@ -17,56 +18,108 @@ use crate::RPCTypes; use crate::StorageError; use crate::Vote; -/// Fatal is unrecoverable and shuts down raft at once. +/// RaftError is returned by API methods of `Raft`. +/// +/// It is either a Fatal error indicating that `Raft` is no longer running, such as underlying IO +/// error, or an API error `E`. #[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)] -#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] -pub enum Fatal +#[cfg_attr( + feature = "serde", + derive(serde::Deserialize, serde::Serialize), + serde(bound = "E:serde::Serialize + for <'d> serde::Deserialize<'d>") +)] +pub enum RaftError where NID: NodeId { #[error(transparent)] - StorageError(#[from] StorageError), + APIError(E), - #[error("panicked")] - Panicked, - - #[error("raft stopped")] - Stopped, + #[error(transparent)] + Fatal(#[from] Fatal), } -// TODO: not used, remove -#[derive(Debug, Clone, thiserror::Error, derive_more::TryInto)] -#[derive(PartialEq, Eq)] -#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] -pub enum AppendEntriesError -where NID: NodeId +impl RaftError +where + NID: NodeId, + E: Debug, { - #[error(transparent)] - Fatal(#[from] Fatal), + /// Return a reference to Self::APIError. + pub fn api_error(&self) -> Option<&E> { + match self { + RaftError::APIError(e) => Some(e), + RaftError::Fatal(_) => None, + } + } + + pub fn into_api_error(self) -> Option { + match self { + RaftError::APIError(e) => Some(e), + RaftError::Fatal(_) => None, + } + } + + /// Return a reference to Self::Fatal. + pub fn fatal(&self) -> Option<&Fatal> { + match self { + RaftError::APIError(_) => None, + RaftError::Fatal(f) => Some(f), + } + } + + pub fn into_fatal(self) -> Option> { + match self { + RaftError::APIError(_) => None, + RaftError::Fatal(f) => Some(f), + } + } + + /// Return a reference to ForwardToLeader if Self::APIError contains it. + pub fn forward_to_leader(&self) -> Option<&ForwardToLeader> + where + N: Node, + E: TryAsRef>, + { + match self { + RaftError::APIError(api_err) => api_err.try_as_ref(), + RaftError::Fatal(_) => None, + } + } + + pub fn into_forward_to_leader(self) -> Option> + where + N: Node, + E: TryInto>, + { + match self { + RaftError::APIError(api_err) => api_err.try_into().ok(), + RaftError::Fatal(_) => None, + } + } } -// TODO: not used, remove -#[derive(Debug, Clone, thiserror::Error, derive_more::TryInto)] -#[derive(PartialEq, Eq)] +/// Fatal is unrecoverable and shuts down raft at once. +#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)] #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] -pub enum VoteError +pub enum Fatal where NID: NodeId { #[error(transparent)] - Fatal(#[from] Fatal), + StorageError(#[from] StorageError), + + #[error("panicked")] + Panicked, + + #[error("raft stopped")] + Stopped, } // TODO: remove #[derive(Debug, Clone, thiserror::Error, derive_more::TryInto)] #[derive(PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] -pub enum InstallSnapshotError -where NID: NodeId -{ +pub enum InstallSnapshotError { #[error(transparent)] SnapshotMismatch(#[from] SnapshotMismatch), - - #[error(transparent)] - Fatal(#[from] Fatal), } /// An error related to a is_leader request. @@ -82,9 +135,19 @@ where #[error(transparent)] QuorumNotEnough(#[from] QuorumNotEnough), +} - #[error(transparent)] - Fatal(#[from] Fatal), +impl TryAsRef> for CheckIsLeaderError +where + NID: NodeId, + N: Node, +{ + fn try_as_ref(&self) -> Option<&ForwardToLeader> { + match self { + Self::ForwardToLeader(f) => Some(f), + _ => None, + } + } } /// An error related to a client write request. @@ -102,9 +165,19 @@ where /// When writing a change-membership entry. #[error(transparent)] ChangeMembershipError(#[from] ChangeMembershipError), +} - #[error(transparent)] - Fatal(#[from] Fatal), +impl TryAsRef> for ClientWriteError +where + NID: NodeId, + N: Node, +{ + fn try_as_ref(&self) -> Option<&ForwardToLeader> { + match self { + Self::ForwardToLeader(f) => Some(f), + _ => None, + } + } } /// The set of errors which may take place when requesting to propose a config change. @@ -137,9 +210,19 @@ where // TODO: do we really need this error? An app may check an target node if it wants to. #[error(transparent)] NetworkError(#[from] NetworkError), +} - #[error(transparent)] - Fatal(#[from] Fatal), +impl TryAsRef> for AddLearnerError +where + NID: NodeId, + N: Node, +{ + fn try_as_ref(&self) -> Option<&ForwardToLeader> { + match self { + Self::ForwardToLeader(f) => Some(f), + _ => None, + } + } } impl TryFrom> for ForwardToLeader @@ -170,64 +253,6 @@ where #[error(transparent)] NotInMembers(#[from] NotInMembers), - - #[error(transparent)] - Fatal(#[from] Fatal), -} - -impl From> for AppendEntriesError -where NID: NodeId -{ - fn from(s: StorageError) -> Self { - let f: Fatal = s.into(); - f.into() - } -} -impl From> for VoteError -where NID: NodeId -{ - fn from(s: StorageError) -> Self { - let f: Fatal = s.into(); - f.into() - } -} -impl From> for InstallSnapshotError -where NID: NodeId -{ - fn from(s: StorageError) -> Self { - let f: Fatal = s.into(); - f.into() - } -} -impl From> for CheckIsLeaderError -where - NID: NodeId, - N: Node, -{ - fn from(s: StorageError) -> Self { - let f: Fatal = s.into(); - f.into() - } -} -impl From> for InitializeError -where - NID: NodeId, - N: Node, -{ - fn from(s: StorageError) -> Self { - let f: Fatal = s.into(); - f.into() - } -} -impl From> for AddLearnerError -where - NID: NodeId, - N: Node, -{ - fn from(s: StorageError) -> Self { - let f: Fatal = s.into(); - f.into() - } } /// Error variants related to the Replication. @@ -250,9 +275,10 @@ where StorageError(#[from] StorageError), #[error(transparent)] - RPCError(#[from] RPCError>), + RPCError(#[from] RPCError>), } +/// Error occurs when invoking a remote raft API. #[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)] #[cfg_attr( feature = "serde", @@ -274,7 +300,6 @@ pub enum RPCError { #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))] #[error("error occur on remote peer {target}: {source}")] pub struct RemoteError { - // #[serde(bound = "")] #[cfg_attr(feature = "serde", serde(bound = ""))] pub target: NID, #[cfg_attr(feature = "serde", serde(bound = ""))] @@ -431,6 +456,12 @@ pub struct EmptyMembership {} #[error("infallible")] pub enum Infallible {} +/// A place holder to mark RaftError won't have a ForwardToLeader variant. +#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)] +#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))] +#[error("no-forward")] +pub enum NoForward {} + #[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)] #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] pub(crate) enum RejectVoteRequest { diff --git a/openraft/src/lib.rs b/openraft/src/lib.rs index e3f6dd213..2222fb6a0 100644 --- a/openraft/src/lib.rs +++ b/openraft/src/lib.rs @@ -62,6 +62,7 @@ mod runtime; pub mod storage; pub mod testing; pub mod timer; +mod try_as_ref; pub(crate) mod validate; pub mod versioned; @@ -71,6 +72,7 @@ pub use anyerror; pub use anyerror::AnyError; pub use async_trait; pub use metrics::ReplicationTargetMetrics; +pub use try_as_ref::TryAsRef; pub use crate::change_members::ChangeMembers; pub use crate::config::Config; diff --git a/openraft/src/network.rs b/openraft/src/network.rs index 8db5149f4..782ba04fa 100644 --- a/openraft/src/network.rs +++ b/openraft/src/network.rs @@ -5,10 +5,9 @@ use std::fmt::Formatter; use async_trait::async_trait; -use crate::error::AppendEntriesError; use crate::error::InstallSnapshotError; use crate::error::RPCError; -use crate::error::VoteError; +use crate::error::RaftError; use crate::raft::AppendEntriesRequest; use crate::raft::AppendEntriesResponse; use crate::raft::InstallSnapshotRequest; @@ -49,19 +48,22 @@ where C: RaftTypeConfig async fn send_append_entries( &mut self, rpc: AppendEntriesRequest, - ) -> Result, RPCError>>; + ) -> Result, RPCError>>; /// Send an InstallSnapshot RPC to the target Raft node (§7). async fn send_install_snapshot( &mut self, rpc: InstallSnapshotRequest, - ) -> Result, RPCError>>; + ) -> Result< + InstallSnapshotResponse, + RPCError>, + >; /// Send a RequestVote RPC to the target Raft node (§5). async fn send_vote( &mut self, rpc: VoteRequest, - ) -> Result, RPCError>>; + ) -> Result, RPCError>>; } /// A trait defining the interface for a Raft network factory to create connections between cluster diff --git a/openraft/src/raft.rs b/openraft/src/raft.rs index 62a34b7f3..b245fd927 100644 --- a/openraft/src/raft.rs +++ b/openraft/src/raft.rs @@ -31,13 +31,13 @@ use crate::core::VoteWiseTime; use crate::engine::Engine; use crate::engine::EngineConfig; use crate::error::AddLearnerError; -use crate::error::AppendEntriesError; use crate::error::CheckIsLeaderError; use crate::error::ClientWriteError; use crate::error::Fatal; +use crate::error::Infallible; use crate::error::InitializeError; use crate::error::InstallSnapshotError; -use crate::error::VoteError; +use crate::error::RaftError; use crate::membership::IntoNodes; use crate::metrics::RaftMetrics; use crate::metrics::Wait; @@ -341,7 +341,7 @@ impl, S: RaftStorage> Raft, - ) -> Result, AppendEntriesError> { + ) -> Result, RaftError> { tracing::debug!(rpc = display(rpc.summary()), "Raft::append_entries"); let (tx, rx) = oneshot::channel(); @@ -353,7 +353,7 @@ impl, S: RaftStorage> Raft) -> Result, VoteError> { + pub async fn vote(&self, rpc: VoteRequest) -> Result, RaftError> { tracing::debug!(rpc = display(rpc.summary()), "Raft::vote()"); let (tx, rx) = oneshot::channel(); @@ -368,7 +368,7 @@ impl, S: RaftStorage> Raft, - ) -> Result, InstallSnapshotError> { + ) -> Result, RaftError> { tracing::debug!(rpc = display(rpc.summary()), "Raft::install_snapshot()"); let (tx, rx) = oneshot::channel(); @@ -391,7 +391,7 @@ impl, S: RaftStorage> Raft Result<(), CheckIsLeaderError> { + pub async fn is_leader(&self) -> Result<(), RaftError>> { let (tx, rx) = oneshot::channel(); self.call_core(RaftMsg::CheckIsLeaderRequest { tx }, rx).await } @@ -418,7 +418,7 @@ impl, S: RaftStorage> Raft Result, ClientWriteError> { + ) -> Result, RaftError>> { let (tx, rx) = oneshot::channel(); self.call_core( RaftMsg::ClientWriteRequest { @@ -452,8 +452,13 @@ impl, S: RaftStorage> Raft(&self, members: T) -> Result<(), InitializeError> - where T: IntoNodes + Debug { + pub async fn initialize( + &self, + members: T, + ) -> Result<(), RaftError>> + where + T: IntoNodes + Debug, + { let (tx, rx) = oneshot::channel(); self.call_core( RaftMsg::Initialize { @@ -490,7 +495,7 @@ impl, S: RaftStorage> Raft Result, AddLearnerError> { + ) -> Result, RaftError>> { let (tx, rx) = oneshot::channel(); let resp = self.call_core(RaftMsg::AddLearner { id, node, tx }, rx).await?; @@ -617,7 +622,7 @@ impl, S: RaftStorage> Raft>, allow_lagging: bool, turn_to_learner: bool, - ) -> Result, ClientWriteError> { + ) -> Result, RaftError>> { let changes: ChangeMembers = members.into(); tracing::info!( @@ -683,8 +688,14 @@ impl, S: RaftStorage> Raft(&self, mes: RaftMsg, rx: RaftRespRx) -> Result - where E: From> + Debug { + pub(crate) async fn call_core( + &self, + mes: RaftMsg, + rx: oneshot::Receiver>, + ) -> Result> + where + E: Debug, + { let sum = if tracing::enabled!(Level::DEBUG) { None } else { @@ -695,18 +706,18 @@ impl, S: RaftStorage> Raft x, + Ok(x) => x.map_err(|e| RaftError::APIError(e)), Err(_) => { let fatal = self.get_core_stopped_error("receiving rx from RaftCore", sum).await; tracing::error!(error = debug(&fatal), "core_call fatal error"); - Err(fatal.into()) + Err(RaftError::Fatal(fatal)) } } } @@ -838,7 +849,6 @@ impl, S: RaftStorage> Raft = oneshot::Sender>; -pub(crate) type RaftRespRx = oneshot::Receiver>; #[derive(Debug, Clone, PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] @@ -854,13 +864,13 @@ pub struct AddLearnerResponse { pub(crate) type RaftAddLearnerTx = RaftRespTx, AddLearnerError>; /// TX for Install Snapshot Response -pub(crate) type InstallSnapshotTx = RaftRespTx, InstallSnapshotError>; +pub(crate) type InstallSnapshotTx = RaftRespTx, InstallSnapshotError>; /// TX for Vote Response -pub(crate) type VoteTx = RaftRespTx, VoteError>; +pub(crate) type VoteTx = RaftRespTx, Infallible>; /// TX for Append Entries Response -pub(crate) type AppendEntriesTx = RaftRespTx, AppendEntriesError>; +pub(crate) type AppendEntriesTx = RaftRespTx, Infallible>; /// TX for Client Write Response pub(crate) type ClientWriteTx = RaftRespTx, ClientWriteError>; diff --git a/openraft/src/try_as_ref.rs b/openraft/src/try_as_ref.rs new file mode 100644 index 000000000..4aa2c6910 --- /dev/null +++ b/openraft/src/try_as_ref.rs @@ -0,0 +1,3 @@ +pub trait TryAsRef { + fn try_as_ref(&self) -> Option<&T>; +} diff --git a/openraft/tests/fixtures/mod.rs b/openraft/tests/fixtures/mod.rs index 7f29c77e3..fb922b840 100644 --- a/openraft/tests/fixtures/mod.rs +++ b/openraft/tests/fixtures/mod.rs @@ -19,7 +19,6 @@ use std::time::Duration; use anyerror::AnyError; use anyhow::Context; -use anyhow::Result; use lazy_static::lazy_static; use maplit::btreeset; use memstore::Config as MemConfig; @@ -27,14 +26,13 @@ use memstore::IntoMemClientRequest; use memstore::MemStore; use openraft::async_trait::async_trait; use openraft::error::AddLearnerError; -use openraft::error::AppendEntriesError; use openraft::error::CheckIsLeaderError; use openraft::error::ClientWriteError; use openraft::error::InstallSnapshotError; use openraft::error::NetworkError; use openraft::error::RPCError; +use openraft::error::RaftError; use openraft::error::RemoteError; -use openraft::error::VoteError; use openraft::metrics::Wait; use openraft::raft::AddLearnerResponse; use openraft::raft::AppendEntriesRequest; @@ -390,7 +388,7 @@ where } /// Initialize all nodes based on the config in the routing table. - pub async fn initialize_from_single_node(&self, node_id: C::NodeId) -> Result<()> { + pub async fn initialize_from_single_node(&self, node_id: C::NodeId) -> anyhow::Result<()> { tracing::info!({ node_id = display(node_id) }, "initializing cluster from single node"); let members: BTreeSet = { let rt = self.routing_table.lock().unwrap(); @@ -421,14 +419,14 @@ where metrics } - pub fn get_metrics(&self, node_id: &C::NodeId) -> Result> { + pub fn get_metrics(&self, node_id: &C::NodeId) -> anyhow::Result> { let node = self.get_raft_handle(node_id)?; let metrics = node.metrics().borrow().clone(); Ok(metrics) } #[tracing::instrument(level = "debug", skip(self))] - pub fn get_raft_handle(&self, node_id: &C::NodeId) -> std::result::Result, NetworkError> { + pub fn get_raft_handle(&self, node_id: &C::NodeId) -> Result, NetworkError> { let rt = self.routing_table.lock().unwrap(); let raft_and_sto = rt .get(node_id) @@ -437,7 +435,7 @@ where Ok(r) } - pub fn get_storage_handle(&self, node_id: &C::NodeId) -> Result> { + pub fn get_storage_handle(&self, node_id: &C::NodeId) -> anyhow::Result> { let rt = self.routing_table.lock().unwrap(); let addr = rt.get(node_id).with_context(|| format!("could not find node {} in routing table", node_id))?; let sto = addr.clone().1; @@ -452,7 +450,7 @@ where func: T, timeout: Option, msg: &str, - ) -> Result> + ) -> anyhow::Result> where T: Fn(&RaftMetrics) -> bool + Send, { @@ -478,7 +476,7 @@ where want_log: Option, timeout: Option, msg: &str, - ) -> Result<()> { + ) -> anyhow::Result<()> { for i in node_ids.iter() { self.wait(i, timeout).log(want_log, msg).await?; } @@ -492,7 +490,7 @@ where members: BTreeSet, timeout: Option, msg: &str, - ) -> Result<()> { + ) -> anyhow::Result<()> { for i in node_ids.iter() { let wait = self.wait(i, timeout); wait.metrics( @@ -512,7 +510,7 @@ where want_state: ServerState, timeout: Option, msg: &str, - ) -> Result<()> { + ) -> anyhow::Result<()> { for i in node_ids.iter() { self.wait(i, timeout).state(want_state, msg).await?; } @@ -527,7 +525,7 @@ where want: LogId, timeout: Option, msg: &str, - ) -> Result<()> { + ) -> anyhow::Result<()> { for i in node_ids.iter() { self.wait(i, timeout).snapshot(want, msg).await?; } @@ -581,7 +579,7 @@ where target: C::NodeId, ) -> Result, AddLearnerError> { let node = self.get_raft_handle(&leader).unwrap(); - node.add_learner(target, C::Node::default(), true).await + node.add_learner(target, C::Node::default(), true).await.map_err(|e| e.into_api_error().unwrap()) } /// Send a is_leader request to the target node. @@ -590,7 +588,7 @@ where let rt = self.routing_table.lock().unwrap(); rt.get(&target).unwrap_or_else(|| panic!("node with ID {} does not exist", target)).clone() }; - node.0.is_leader().await + node.0.is_leader().await.map_err(|e| e.into_api_error().unwrap()) } /// Send a client request to the target node, causing test failure on error. @@ -599,7 +597,7 @@ where mut target: C::NodeId, client_id: &str, serial: u64, - ) -> Result<(), ClientWriteError> { + ) -> Result<(), RaftError>> { for ith in 0..3 { let req = >::make_request(client_id, serial); if let Err(err) = self.send_client_request(target, req).await { @@ -607,7 +605,7 @@ where #[allow(clippy::single_match)] match &err { - ClientWriteError::ForwardToLeader(e) => { + RaftError::APIError(ClientWriteError::ForwardToLeader(e)) => { tracing::info!( "{}-th request: target is not leader anymore. New leader is: {:?}", ith, @@ -660,7 +658,7 @@ where target: C::NodeId, client_id: &str, count: usize, - ) -> Result> { + ) -> Result>> { for idx in 0..count { self.client_request(target, client_id, idx as u64).await?; } @@ -672,7 +670,7 @@ where &self, target: C::NodeId, req: C::D, - ) -> std::result::Result> { + ) -> Result>> { let node = { let rt = self.routing_table.lock().unwrap(); rt.get(&target) @@ -937,7 +935,7 @@ where } #[tracing::instrument(level = "debug", skip(self))] - pub fn check_reachable(&self, id: C::NodeId, target: C::NodeId) -> std::result::Result<(), NetworkError> { + pub fn check_reachable(&self, id: C::NodeId, target: C::NodeId) -> Result<(), NetworkError> { let isolated = self.isolated_nodes.lock().unwrap(); if isolated.contains(&target) || isolated.contains(&id) { @@ -995,10 +993,7 @@ where async fn send_append_entries( &mut self, rpc: AppendEntriesRequest, - ) -> std::result::Result< - AppendEntriesResponse, - RPCError>, - > { + ) -> Result, RPCError>> { tracing::debug!("append_entries to id={} {}", self.target, rpc.summary()); self.owner.check_reachable(rpc.vote.node_id, self.target)?; self.owner.rand_send_delay().await; @@ -1016,9 +1011,9 @@ where async fn send_install_snapshot( &mut self, rpc: InstallSnapshotRequest, - ) -> std::result::Result< + ) -> Result< InstallSnapshotResponse, - RPCError>, + RPCError>, > { self.owner.check_reachable(rpc.vote.node_id, self.target)?; self.owner.rand_send_delay().await; @@ -1034,7 +1029,7 @@ where async fn send_vote( &mut self, rpc: VoteRequest, - ) -> std::result::Result, RPCError>> { + ) -> Result, RPCError>> { self.owner.check_reachable(rpc.vote.node_id, self.target)?; self.owner.rand_send_delay().await; diff --git a/openraft/tests/life_cycle/t20_initialization.rs b/openraft/tests/life_cycle/t20_initialization.rs index 16d5ea02e..f32e790c3 100644 --- a/openraft/tests/life_cycle/t20_initialization.rs +++ b/openraft/tests/life_cycle/t20_initialization.rs @@ -190,7 +190,7 @@ async fn initialize_err_target_not_include_target() -> anyhow::Result<()> { node_id, membership: Membership::new(vec![btreeset! {9}], None) }), - err + err.into_api_error().unwrap() ); } @@ -233,7 +233,7 @@ async fn initialize_err_not_allowed() -> anyhow::Result<()> { }), vote: Vote::new_committed(1, 0) }), - err + err.into_api_error().unwrap() ); } diff --git a/openraft/tests/life_cycle/t20_shutdown.rs b/openraft/tests/life_cycle/t20_shutdown.rs index df9c67d20..3ca551784 100644 --- a/openraft/tests/life_cycle/t20_shutdown.rs +++ b/openraft/tests/life_cycle/t20_shutdown.rs @@ -2,7 +2,6 @@ use std::sync::Arc; use anyhow::Result; use maplit::btreeset; -use openraft::error::ClientWriteError; use openraft::error::Fatal; use openraft::Config; use openraft::ServerState; @@ -65,7 +64,7 @@ async fn return_error_after_panic() -> Result<()> { { let res = router.client_request(0, "foo", 2).await; let err = res.unwrap_err(); - assert_eq!(ClientWriteError::::Fatal(Fatal::Panicked), err); + assert_eq!(Fatal::Panicked, err.into_fatal().unwrap()); } Ok(()) @@ -98,7 +97,7 @@ async fn return_error_after_shutdown() -> Result<()> { { let res = router.client_request(0, "foo", 2).await; let err = res.unwrap_err(); - assert_eq!(ClientWriteError::::Fatal(Fatal::Stopped), err); + assert_eq!(Fatal::Stopped, err.into_fatal().unwrap()); } Ok(()) diff --git a/openraft/tests/membership/t20_change_membership.rs b/openraft/tests/membership/t20_change_membership.rs index 7f86013d1..c5a19fa5f 100644 --- a/openraft/tests/membership/t20_change_membership.rs +++ b/openraft/tests/membership/t20_change_membership.rs @@ -113,8 +113,9 @@ async fn change_without_adding_learner() -> anyhow::Result<()> { tracing::info!("--- change membership without adding-learner, allow_lagging=true"); { let res = leader.change_membership(btreeset! {0,1}, true, false).await; - match res { - Err(ClientWriteError::ChangeMembershipError(ChangeMembershipError::LearnerNotFound(err))) => { + let raft_err = res.unwrap_err(); + match raft_err.api_error().unwrap() { + ClientWriteError::ChangeMembershipError(ChangeMembershipError::LearnerNotFound(err)) => { assert_eq!(1, err.node_id); } _ => { @@ -126,8 +127,9 @@ async fn change_without_adding_learner() -> anyhow::Result<()> { tracing::info!("--- change membership without adding-learner, allow_lagging=false"); { let res = leader.change_membership(btreeset! {0,1}, false, false).await; - match res { - Err(ClientWriteError::ChangeMembershipError(ChangeMembershipError::LearnerNotFound(err))) => { + let raft_err = res.unwrap_err(); + match raft_err.api_error().unwrap() { + ClientWriteError::ChangeMembershipError(ChangeMembershipError::LearnerNotFound(err)) => { assert_eq!(1, err.node_id); } _ => { @@ -178,7 +180,7 @@ async fn change_with_lagging_learner_non_blocking() -> anyhow::Result<()> { tracing::info!("--- got res: {:?}", res); let err = res.unwrap_err(); - let err: ChangeMembershipError = err.try_into().unwrap(); + let err: ChangeMembershipError = err.into_api_error().unwrap().try_into().unwrap(); match err { ChangeMembershipError::LearnerIsLagging(e) => { diff --git a/openraft/tests/membership/t30_remove_leader.rs b/openraft/tests/membership/t30_remove_leader.rs index 00439c7c1..acc9a8d51 100644 --- a/openraft/tests/membership/t30_remove_leader.rs +++ b/openraft/tests/membership/t30_remove_leader.rs @@ -160,7 +160,7 @@ async fn remove_leader_access_new_cluster() -> Result<()> { Ok(_) => { unreachable!("expect error"); } - Err(cli_err) => match cli_err { + Err(cli_err) => match cli_err.api_error().unwrap() { ClientWriteError::ForwardToLeader(fwd) => { assert!(fwd.leader_id.is_none()); assert!(fwd.leader_node.is_none());