Skip to content

Commit

Permalink
Change: add RaftError as API return error type.
Browse files Browse the repository at this point in the history
Add `RaftError<E>` as error type returned by every `Raft::xxx()` API.
RaftError has two variants: Fatal error or API specific error.
This way every API error such as AppendEntriesError does not have to include
an `Fatal` in it.

Upgrade tip:

The affected types is mainly `trait RaftNetwork`, an application should
replace AppendEntriesError, VoteError, InstallSnapshotError with
`RaftError<_>`, `RaftError<_>`, and `RaftError<_, InstallSnapshotError>`.

So is for other parts, e.g., `Raft::append_entries()` now returns
`Result<AppendEntriesResponse, RaftError<_>>`, an application should
also rewrite error handling that calls these APIs.

See changes in examples/.
  • Loading branch information
drmingdrmer committed Feb 12, 2023
1 parent 77e87a3 commit fbb3f21
Show file tree
Hide file tree
Showing 17 changed files with 311 additions and 212 deletions.
48 changes: 30 additions & 18 deletions examples/raft-kv-memstore/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,16 @@ use openraft::error::AddLearnerError;
use openraft::error::CheckIsLeaderError;
use openraft::error::ClientWriteError;
use openraft::error::ForwardToLeader;
use openraft::error::Infallible;
use openraft::error::InitializeError;
use openraft::error::NetworkError;
use openraft::error::RPCError;
use openraft::error::RaftError;
use openraft::error::RemoteError;
use openraft::raft::AddLearnerResponse;
use openraft::raft::ClientWriteResponse;
use openraft::BasicNode;
use openraft::RaftMetrics;
use openraft::TryAsRef;
use reqwest::Client;
use serde::de::DeserializeOwned;
use serde::Deserialize;
Expand Down Expand Up @@ -60,15 +61,18 @@ impl ExampleClient {
req: &ExampleRequest,
) -> Result<
ClientWriteResponse<ExampleTypeConfig>,
RPCError<ExampleNodeId, BasicNode, ClientWriteError<ExampleNodeId, BasicNode>>,
RPCError<ExampleNodeId, BasicNode, RaftError<ExampleNodeId, ClientWriteError<ExampleNodeId, BasicNode>>>,
> {
self.send_rpc_to_leader("write", Some(req)).await
}

/// Read value by key, in an inconsistent mode.
///
/// This method may return stale value because it does not force to read on a legal leader.
pub async fn read(&self, req: &String) -> Result<String, RPCError<ExampleNodeId, BasicNode, Infallible>> {
pub async fn read(
&self,
req: &String,
) -> Result<String, RPCError<ExampleNodeId, BasicNode, RaftError<ExampleNodeId>>> {
self.do_send_rpc_to_leader("read", Some(req)).await
}

Expand All @@ -78,7 +82,10 @@ impl ExampleClient {
pub async fn consistent_read(
&self,
req: &String,
) -> Result<String, RPCError<ExampleNodeId, BasicNode, CheckIsLeaderError<ExampleNodeId, BasicNode>>> {
) -> Result<
String,
RPCError<ExampleNodeId, BasicNode, RaftError<ExampleNodeId, CheckIsLeaderError<ExampleNodeId, BasicNode>>>,
> {
self.do_send_rpc_to_leader("consistent_read", Some(req)).await
}

Expand All @@ -92,7 +99,10 @@ impl ExampleClient {
/// Then make the new node a member with [`change_membership`].
pub async fn init(
&self,
) -> Result<(), RPCError<ExampleNodeId, BasicNode, InitializeError<ExampleNodeId, BasicNode>>> {
) -> Result<
(),
RPCError<ExampleNodeId, BasicNode, RaftError<ExampleNodeId, InitializeError<ExampleNodeId, BasicNode>>>,
> {
self.do_send_rpc_to_leader("init", Some(&Empty {})).await
}

Expand All @@ -104,7 +114,7 @@ impl ExampleClient {
req: (ExampleNodeId, String),
) -> Result<
AddLearnerResponse<ExampleNodeId>,
RPCError<ExampleNodeId, BasicNode, AddLearnerError<ExampleNodeId, BasicNode>>,
RPCError<ExampleNodeId, BasicNode, RaftError<ExampleNodeId, AddLearnerError<ExampleNodeId, BasicNode>>>,
> {
self.send_rpc_to_leader("add-learner", Some(&req)).await
}
Expand All @@ -118,7 +128,7 @@ impl ExampleClient {
req: &BTreeSet<ExampleNodeId>,
) -> Result<
ClientWriteResponse<ExampleTypeConfig>,
RPCError<ExampleNodeId, BasicNode, ClientWriteError<ExampleNodeId, BasicNode>>,
RPCError<ExampleNodeId, BasicNode, RaftError<ExampleNodeId, ClientWriteError<ExampleNodeId, BasicNode>>>,
> {
self.send_rpc_to_leader("change-membership", Some(req)).await
}
Expand All @@ -130,7 +140,8 @@ impl ExampleClient {
/// See [`RaftMetrics`].
pub async fn metrics(
&self,
) -> Result<RaftMetrics<ExampleNodeId, BasicNode>, RPCError<ExampleNodeId, BasicNode, Infallible>> {
) -> Result<RaftMetrics<ExampleNodeId, BasicNode>, RPCError<ExampleNodeId, BasicNode, RaftError<ExampleNodeId>>>
{
self.do_send_rpc_to_leader("metrics", None::<&()>).await
}

Expand All @@ -145,7 +156,7 @@ impl ExampleClient {
&self,
uri: &str,
req: Option<&Req>,
) -> Result<Resp, RPCError<ExampleNodeId, BasicNode, Err>>
) -> Result<Resp, RPCError<ExampleNodeId, BasicNode, RaftError<ExampleNodeId, Err>>>
where
Req: Serialize + 'static,
Resp: Serialize + DeserializeOwned,
Expand Down Expand Up @@ -179,7 +190,8 @@ impl ExampleClient {
}
};

let res: Result<Resp, Err> = resp.json().await.map_err(|e| RPCError::Network(NetworkError::new(&e)))?;
let res: Result<Resp, RaftError<ExampleNodeId, Err>> =
resp.json().await.map_err(|e| RPCError::Network(NetworkError::new(&e)))?;
tracing::debug!(
"<<< client recv reply from {}: {}",
url,
Expand All @@ -197,41 +209,41 @@ impl ExampleClient {
&self,
uri: &str,
req: Option<&Req>,
) -> Result<Resp, RPCError<ExampleNodeId, BasicNode, Err>>
) -> Result<Resp, RPCError<ExampleNodeId, BasicNode, RaftError<ExampleNodeId, Err>>>
where
Req: Serialize + 'static,
Resp: Serialize + DeserializeOwned,
Err: std::error::Error
+ Serialize
+ DeserializeOwned
+ TryInto<ForwardToLeader<ExampleNodeId, BasicNode>>
+ TryAsRef<ForwardToLeader<ExampleNodeId, BasicNode>>
+ Clone,
{
// Retry at most 3 times to find a valid leader.
let mut n_retry = 3;

loop {
let res: Result<Resp, RPCError<ExampleNodeId, BasicNode, Err>> = self.do_send_rpc_to_leader(uri, req).await;
let res: Result<Resp, RPCError<ExampleNodeId, BasicNode, RaftError<ExampleNodeId, Err>>> =
self.do_send_rpc_to_leader(uri, req).await;

let rpc_err = match res {
Ok(x) => return Ok(x),
Err(rpc_err) => rpc_err,
};

if let RPCError::RemoteError(remote_err) = &rpc_err {
let forward_err_res =
<Err as TryInto<ForwardToLeader<ExampleNodeId, BasicNode>>>::try_into(remote_err.source.clone());
let raft_err = &remote_err.source;

if let Ok(ForwardToLeader {
if let Some(ForwardToLeader {
leader_id: Some(leader_id),
leader_node: Some(leader_node),
..
}) = forward_err_res
}) = raft_err.forward_to_leader()
{
// Update target to the new leader.
{
let mut t = self.leader.lock().unwrap();
*t = (leader_id, leader_node.addr);
*t = (*leader_id, leader_node.addr.clone());
}

n_retry -= 1;
Expand Down
4 changes: 3 additions & 1 deletion examples/raft-kv-memstore/src/network/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use actix_web::web::Data;
use actix_web::Responder;
use openraft::error::CheckIsLeaderError;
use openraft::error::Infallible;
use openraft::error::RaftError;
use openraft::BasicNode;
use web::Json;

Expand Down Expand Up @@ -46,7 +47,8 @@ pub async fn consistent_read(app: Data<ExampleApp>, req: Json<String>) -> actix_
let key = req.0;
let value = state_machine.data.get(&key).cloned();

let res: Result<String, CheckIsLeaderError<ExampleNodeId, BasicNode>> = Ok(value.unwrap_or_default());
let res: Result<String, RaftError<ExampleNodeId, CheckIsLeaderError<ExampleNodeId, BasicNode>>> =
Ok(value.unwrap_or_default());
Ok(Json(res))
}
Err(e) => Ok(Json(Err(e))),
Expand Down
13 changes: 5 additions & 8 deletions examples/raft-kv-memstore/src/network/raft_network_impl.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use async_trait::async_trait;
use openraft::error::AppendEntriesError;
use openraft::error::InstallSnapshotError;
use openraft::error::NetworkError;
use openraft::error::RPCError;
use openraft::error::RaftError;
use openraft::error::RemoteError;
use openraft::error::VoteError;
use openraft::raft::AppendEntriesRequest;
use openraft::raft::AppendEntriesResponse;
use openraft::raft::InstallSnapshotRequest;
Expand Down Expand Up @@ -86,10 +85,8 @@ impl RaftNetwork<ExampleTypeConfig> for ExampleNetworkConnection {
async fn send_append_entries(
&mut self,
req: AppendEntriesRequest<ExampleTypeConfig>,
) -> Result<
AppendEntriesResponse<ExampleNodeId>,
RPCError<ExampleNodeId, BasicNode, AppendEntriesError<ExampleNodeId>>,
> {
) -> Result<AppendEntriesResponse<ExampleNodeId>, RPCError<ExampleNodeId, BasicNode, RaftError<ExampleNodeId>>>
{
self.owner.send_rpc(self.target, &self.target_node, "raft-append", req).await
}

Expand All @@ -98,15 +95,15 @@ impl RaftNetwork<ExampleTypeConfig> for ExampleNetworkConnection {
req: InstallSnapshotRequest<ExampleTypeConfig>,
) -> Result<
InstallSnapshotResponse<ExampleNodeId>,
RPCError<ExampleNodeId, BasicNode, InstallSnapshotError<ExampleNodeId>>,
RPCError<ExampleNodeId, BasicNode, RaftError<ExampleNodeId, InstallSnapshotError>>,
> {
self.owner.send_rpc(self.target, &self.target_node, "raft-snapshot", req).await
}

async fn send_vote(
&mut self,
req: VoteRequest<ExampleNodeId>,
) -> Result<VoteResponse<ExampleNodeId>, RPCError<ExampleNodeId, BasicNode, VoteError<ExampleNodeId>>> {
) -> Result<VoteResponse<ExampleNodeId>, RPCError<ExampleNodeId, BasicNode, RaftError<ExampleNodeId>>> {
self.owner.send_rpc(self.target, &self.target_node, "raft-vote", req).await
}
}
38 changes: 38 additions & 0 deletions examples/raft-kv-memstore/tests/cluster/test_cluster.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use std::backtrace::Backtrace;
use std::collections::BTreeMap;
use std::panic::PanicInfo;
use std::thread;
use std::time::Duration;

Expand All @@ -11,6 +13,38 @@ use raft_kv_memstore::store::ExampleRequest;
use tokio::runtime::Runtime;
use tracing_subscriber::EnvFilter;

pub fn log_panic(panic: &PanicInfo) {
let backtrace = {
format!("{:?}", Backtrace::force_capture())
// #[cfg(feature = "bt")]
// {
// format!("{:?}", Backtrace::force_capture())
// }
//
// #[cfg(not(feature = "bt"))]
// {
// "backtrace is disabled without --features 'bt'".to_string()
// }
};

eprintln!("{}", panic);

if let Some(location) = panic.location() {
tracing::error!(
message = %panic,
backtrace = %backtrace,
panic.file = location.file(),
panic.line = location.line(),
panic.column = location.column(),
);
eprintln!("{}:{}:{}", location.file(), location.line(), location.column());
} else {
tracing::error!(message = %panic, backtrace = %backtrace);
}

eprintln!("{}", backtrace);
}

/// Setup a cluster of 3 nodes.
/// Write to it and read from it.
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
Expand All @@ -20,6 +54,10 @@ async fn test_cluster() -> anyhow::Result<()> {
// This is only used by the client. A raft node in this example stores node addresses in its
// store.

std::panic::set_hook(Box::new(|panic| {
log_panic(panic);
}));

tracing_subscriber::fmt()
.with_target(true)
.with_thread_ids(true)
Expand Down

0 comments on commit fbb3f21

Please sign in to comment.