Skip to content

Commit

Permalink
improve openraft error mapping in lite implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
Your Name committed Apr 5, 2024
1 parent f947d78 commit dee2097
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 15 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,8 @@
# Changelog

## [0.1.17] - 2024-04-06
better openraft error mapping in lite implementation

## [0.1.16] - 2024-04-04
Bug fixes and code refactoring in clients

Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "rxqlite"
version = "0.1.16"
version = "0.1.17"
readme = "README.md"
edition = "2021"
authors = [
Expand Down
2 changes: 1 addition & 1 deletion ROADMAP.md
@@ -1,6 +1,6 @@
# Roadmap

## [0.1.17]
## [0.1.18]
Client authorization

## [0.1.12]
Expand Down
3 changes: 2 additions & 1 deletion crates/rxqlite-lite-common/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "rxqlite-lite-common"
version = "0.1.3"
version = "0.1.4"


edition = "2021"
Expand All @@ -17,6 +17,7 @@ repository = "https://github.com/HaHa421/rxqlite"

[dependencies]
anyhow = "1.0"
anyerror = "0.1.12"
derive_more = "0.99.9"
futures-util= "0.3"
rustls = {version = "0.22" }
Expand Down
96 changes: 87 additions & 9 deletions crates/rxqlite-lite-common/src/lib.rs
Expand Up @@ -10,6 +10,7 @@ use std::error::Error;
use std::fmt;
use rxqlite_common::MessageResponse;
use serde::{Serialize, Deserialize};
pub use anyerror::AnyError;

use std::collections::{
btree_map::BTreeMap,
Expand Down Expand Up @@ -96,22 +97,99 @@ impl<T: Error> RemoteError<T> {
}
}

#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[derive(serde::Deserialize, serde::Serialize)]
#[error("Unreachable node: {source}")]
pub struct Unreachable {
#[from]
pub source: AnyError,
}



#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[derive(serde::Deserialize, serde::Serialize)]
pub struct PayloadTooLarge {
pub action: RPCTypes,

/// An optional hint indicating the anticipated number of entries.
/// Used only for append-entries replication.
pub entries_hint: u64,

/// An optional hint indicating the anticipated size in bytes.
/// Used for snapshot replication.
pub bytes_hint: u64,

#[source]
pub source: Option<AnyError>,
}

impl fmt::Display for PayloadTooLarge {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "RPC",)?;
write!(f, "({})", self.action)?;
write!(f, " payload too large:",)?;

write!(f, " hint:(")?;
match self.action {
RPCTypes::Vote => {
unreachable!("vote rpc should not have payload")
}
RPCTypes::AppendEntries => {
write!(f, "entries:{}", self.entries_hint)?;
}
RPCTypes::InstallSnapshot => {
write!(f, "bytes:{}", self.bytes_hint)?;
}
}
write!(f, ")")?;

if let Some(s) = &self.source {
write!(f, ", source: {}", s)?;
}

Ok(())
}
}

#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[derive(serde::Deserialize, serde::Serialize)]
#[error("NetworkError: {source}")]
pub struct NetworkError {
#[from]
pub source: AnyError,
}

impl NetworkError {
pub fn new<E: Error + 'static>(e: &E) -> Self {
Self {
source: AnyError::new(e),
}
}
}

#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[derive(serde::Serialize, serde::Deserialize)]
#[serde(bound(serialize = "E: serde::Serialize"))]
#[serde(bound(deserialize = "E: for <'d> serde::Deserialize<'d>"))]
pub enum RPCError<E: Error> {
Timeout(#[from] Timeout),
Unreachable,
PayloadTooLarge,
#[error(transparent)]
Timeout(#[from] Timeout),

#[error(transparent)]
Unreachable(#[from] Unreachable),

#[error(transparent)]
PayloadTooLarge(#[from] PayloadTooLarge),

/// Failed to send the RPC request and should retry immediately.
Network,
#[error(transparent)]
Network(#[from] NetworkError),

#[error(transparent)]
RemoteError(#[from] RemoteError<E>),
}

/*
impl<E: Error> Display for RPCError<E> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Expand All @@ -124,7 +202,7 @@ impl<E: Error> Display for RPCError<E> {
}
}

*/

impl<E> RPCError<RaftError<E>>
where
Expand All @@ -135,9 +213,9 @@ where
where E: TryAsRef<ForwardToLeader> {
match self {
RPCError::Timeout(_) => None,
RPCError::Unreachable => None,
RPCError::PayloadTooLarge => None,
RPCError::Network => None,
RPCError::Unreachable(_) => None,
RPCError::PayloadTooLarge(_) => None,
RPCError::Network(_) => None,
RPCError::RemoteError(remote_err) => remote_err.source.forward_to_leader(),
}
}
Expand Down
36 changes: 33 additions & 3 deletions src/convert.rs
Expand Up @@ -98,6 +98,36 @@ impl RXQliteInto<Timeout> for openraft::error::Timeout<NodeId>
}
}

impl RXQliteInto<Unreachable> for openraft::error::Unreachable
{
fn into_lite(self) -> Unreachable {
Unreachable {
source : AnyError::new(&self),
}
}
}

impl RXQliteInto<NetworkError> for openraft::error::NetworkError
{
fn into_lite(self) -> NetworkError {
NetworkError {
source : AnyError::new(&self),
}
}
}
impl RXQliteInto<PayloadTooLarge > for openraft::error::PayloadTooLarge
{
fn into_lite(self) -> PayloadTooLarge {
PayloadTooLarge {
action: self.action().into_lite(),
entries_hint: self.entries_hint(),
bytes_hint: 0,
//or we might loose information:
source : Some(AnyError::new(&self)),
}
}
}

impl<T,U> RXQliteInto<RemoteError<U>> for openraft::error::RemoteError<NodeId,Node,T>
where T: Error + RXQliteInto<U>,
U: Error
Expand All @@ -117,9 +147,9 @@ impl<Err: Error,FromErr> RXQliteInto<RPCError<Err>> for openraft::error::RPCErro
fn into_lite(self) -> RPCError<Err> {
match self {
Self::Timeout(to)=>RPCError::Timeout(to.into_lite()),
Self::Unreachable(_)=>RPCError::Unreachable,
Self::PayloadTooLarge(_)=>RPCError::PayloadTooLarge,
Self::Network(_)=>RPCError::Network,
Self::Unreachable(u)=>RPCError::Unreachable(u.into_lite()),
Self::PayloadTooLarge(p)=>RPCError::PayloadTooLarge(p.into_lite()),
Self::Network(n)=>RPCError::Network(n.into_lite()),
Self::RemoteError(re)=>RPCError::RemoteError(re.into_lite()),
}
}
Expand Down

0 comments on commit dee2097

Please sign in to comment.