Feature: add backoff strategy for unreachable nodes
Implements a backoff strategy for temporarily or permanently unreachable nodes.
If the `Network` implementation returns an `Unreachable` error, Openraft
backs off for a while before sending the next RPC to that target.
This mechanism prevents flooding the log with errors.

Adds a new method `backoff()` to `RaftNetwork` that lets an application
return a customized backoff policy; the default backoff sleeps a constant
500 ms between retries (a sketch of a custom policy is shown below).

Adds an `unreachable_nodes` setting to the testing router `TypedRaftRouter` to emulate unreachable nodes.
Adds a new error `Unreachable` and an `RPCError` variant `Unreachable`.

- Fix: #462
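
As a rough usage sketch (not part of this commit): an application could return a
capped exponential policy from `backoff()` instead of the default constant 500 ms.
The helper name below is hypothetical and the `openraft::network::Backoff` import
path is an assumption.

```rust
use std::time::Duration;

use openraft::network::Backoff;

/// Hypothetical helper returning a capped exponential policy:
/// 100 ms, 200 ms, 400 ms, ... capped at 3 s, repeated forever.
fn exponential_backoff() -> Backoff {
    let first = Duration::from_millis(100);
    let cap = Duration::from_secs(3);
    // `successors` never yields `None` here, so the iterator stays infinite,
    // which is what the `backoff()` contract expects.
    Backoff::new(std::iter::successors(Some(first), move |d| {
        Some((*d * 2).min(cap))
    }))
}

fn main() {
    // Print the first few intervals to show the schedule.
    for (i, d) in exponential_backoff().take(6).enumerate() {
        println!("retry {}: sleep {:?}", i, d);
    }
}
```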
drmingdrmer committed Apr 24, 2023
1 parent fa99e24 commit 229f336
Showing 8 changed files with 294 additions and 36 deletions.
36 changes: 34 additions & 2 deletions openraft/src/error.rs
@@ -224,8 +224,8 @@ where
#[error(transparent)]
HigherVote(#[from] HigherVote<NID>),

#[error("Replication is closed")]
Closed,
#[error(transparent)]
Closed(#[from] ReplicationClosed),

// TODO(xp): two sub type: StorageError / TransportError
// TODO(xp): a sub error for just send_append_entries()
@@ -236,6 +236,11 @@ where
RPCError(#[from] RPCError<NID, N, RaftError<NID, Infallible>>),
}

/// Error occurs when replication is closed.
#[derive(Debug, thiserror::Error)]
#[error("Replication is closed by RaftCore")]
pub(crate) struct ReplicationClosed {}

/// Error occurs when invoking a remote raft API.
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[cfg_attr(
@@ -247,6 +252,11 @@ pub enum RPCError<NID: NodeId, N: Node, E: Error> {
#[error(transparent)]
Timeout(#[from] Timeout<NID>),

/// The node is temporarily unreachable; the sender should back off before retrying.
#[error(transparent)]
Unreachable(#[from] Unreachable),

/// Failed to send the RPC request and should retry immediately.
#[error(transparent)]
Network(#[from] NetworkError),

@@ -265,6 +275,7 @@
where E: TryAsRef<ForwardToLeader<NID, N>> {
match self {
RPCError::Timeout(_) => None,
RPCError::Unreachable(_) => None,
RPCError::Network(_) => None,
RPCError::RemoteError(remote_err) => remote_err.source.forward_to_leader(),
}
@@ -331,6 +342,27 @@ impl NetworkError {
}
}

/// Error that indicates a node is unreachable and that the sender should not retry sending
/// anything to it immediately.
///
/// It is similar to [`NetworkError`] but indicates that a backoff is required;
/// when a [`NetworkError`] is returned, Openraft will retry immediately.
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
#[error("Unreachable node: {source}")]
pub struct Unreachable {
#[from]
source: AnyError,
}

impl Unreachable {
pub fn new<E: Error + 'static>(e: &E) -> Self {
Self {
source: AnyError::new(e),
}
}
}

#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
#[error("timeout after {timeout:?} when {action} {id}->{target}")]
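
For illustration only, a sketch of how an application's `RaftNetwork` implementation
might choose between the new `Unreachable` error (triggering the backoff) and the
existing `NetworkError` (immediate retry). The `Classified` enum and `classify`
helper are hypothetical; in practice the two values would be wrapped into
`RPCError::Unreachable` and `RPCError::Network` respectively.

```rust
use std::io;

use openraft::error::NetworkError;
use openraft::error::Unreachable;

/// Hypothetical classification result for a failed send.
enum Classified {
    /// Ask Openraft to back off before the next RPC to this target.
    BackOff(Unreachable),
    /// Let Openraft retry immediately.
    RetryNow(NetworkError),
}

fn classify(e: &io::Error) -> Classified {
    match e.kind() {
        // The peer refused the connection: treat it as unreachable.
        io::ErrorKind::ConnectionRefused => Classified::BackOff(Unreachable::new(e)),
        // Anything else keeps the immediate-retry behaviour.
        _ => Classified::RetryNow(NetworkError::new(e)),
    }
}

fn main() {
    let refused = io::Error::new(io::ErrorKind::ConnectionRefused, "connect failed");
    match classify(&refused) {
        Classified::BackOff(e) => println!("back off: {}", e),
        Classified::RetryNow(e) => println!("retry now: {}", e),
    }
}
```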
2 changes: 1 addition & 1 deletion openraft/src/membership/stored_membership.rs
@@ -60,6 +60,6 @@ where
NID: NodeId,
{
fn summary(&self) -> String {
format!("{{log_id:{:?} membership:{}}}", self.log_id, self.membership.summary())
format!("{{log_id:{}, {}}}", self.log_id.summary(), self.membership.summary())
}
}
3 changes: 2 additions & 1 deletion openraft/src/metrics/raft_metrics.rs
@@ -62,13 +62,14 @@
NID: NodeId,
N: Node,
{
// TODO: make this more readable
fn summary(&self) -> String {
format!("Metrics{{id:{},{:?}, term:{}, last_log:{:?}, last_applied:{:?}, leader:{:?}, membership:{}, snapshot:{:?}, replication:{{{}}}",
self.id,
self.state,
self.current_term,
self.last_log_index,
self.last_applied,
self.last_applied.summary(),
self.current_leader,
self.membership_config.summary(),
self.snapshot,
36 changes: 36 additions & 0 deletions openraft/src/network.rs
@@ -1,6 +1,7 @@
//! The Raft network interface.

use std::fmt::Formatter;
use std::time::Duration;

use async_trait::async_trait;

@@ -63,6 +64,41 @@ where C: RaftTypeConfig
&mut self,
rpc: VoteRequest<C::NodeId>,
) -> Result<VoteResponse<C::NodeId>, RPCError<C::NodeId, C::Node, RaftError<C::NodeId>>>;

/// Build a backoff instance if the target node is temporarily (or permanently) unreachable.
///
/// When an [`Unreachable`](`crate::error::Unreachable`) error is returned from one of the `Network`
/// methods, Openraft does not retry connecting to the node immediately. Instead, it sleeps
/// for a while and retries. The duration of the sleep is determined by the backoff
/// instance.
///
/// The backoff is an infinite iterator that returns the ith sleep interval before the ith
/// retry. The returned instance will be dropped if a successful RPC is made.
///
/// By default it returns a constant backoff of 500 ms.
fn backoff(&self) -> Backoff {
Backoff::new(std::iter::repeat(Duration::from_millis(500)))
}
}

/// A backoff instance that is an infinite iterator of durations to sleep before the next retry, when an
/// [`Unreachable`](`crate::error::Unreachable`) error occurs.
pub struct Backoff {
inner: Box<dyn Iterator<Item = Duration> + Send + 'static>,
}

impl Backoff {
pub fn new(iter: impl Iterator<Item = Duration> + Send + 'static) -> Self {
Self { inner: Box::new(iter) }
}
}

impl Iterator for Backoff {
type Item = Duration;

fn next(&mut self) -> Option<Self::Item> {
self.inner.next()
}
}

/// A trait defining the interface for a Raft network factory to create connections between cluster
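
A hedged sketch of the consumer side of `Backoff`: take the i-th interval before the
i-th retry, and fall back to a default interval if the iterator is unexpectedly finite,
mirroring what the replication task does. `retry_with_backoff` is a hypothetical helper
and the `openraft::network::Backoff` path is an assumption.

```rust
use std::future::Future;
use std::time::Duration;

use openraft::network::Backoff;

/// Retry `attempt` until it reports success, sleeping between attempts
/// according to the given backoff iterator.
async fn retry_with_backoff<F, Fut>(mut backoff: Backoff, mut attempt: F)
where
    F: FnMut() -> Fut,
    Fut: Future<Output = bool>, // `true` means the RPC succeeded
{
    loop {
        if attempt().await {
            // On success, Openraft drops its backoff instance entirely.
            return;
        }
        // An exhausted iterator falls back to the 500 ms default,
        // as the replication task does.
        let interval = backoff.next().unwrap_or(Duration::from_millis(500));
        tokio::time::sleep(interval).await;
    }
}

// Usage sketch (placeholder names, inside some async context):
// retry_with_backoff(network.backoff(), || async { send_heartbeat().await.is_ok() }).await;
```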
107 changes: 81 additions & 26 deletions openraft/src/replication/mod.rs
@@ -14,21 +14,25 @@ use tokio::io::AsyncRead;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncSeek;
use tokio::io::AsyncSeekExt;
use tokio::select;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tokio::time::sleep;
use tokio::time::timeout;
use tokio::time::Duration;
use tokio::time::Instant;
use tracing_futures::Instrument;

use crate::config::Config;
use crate::error::HigherVote;
use crate::error::RPCError;
use crate::error::ReplicationClosed;
use crate::error::ReplicationError;
use crate::error::Timeout;
use crate::log_id::LogIdOptionExt;
use crate::log_id_range::LogIdRange;
use crate::network::Backoff;
use crate::raft::AppendEntriesRequest;
use crate::raft::AppendEntriesResponse;
use crate::raft::InstallSnapshotRequest;
@@ -59,7 +63,7 @@
S: AsyncRead + AsyncSeek + Send + Unpin + 'static,
{
/// The spawn handle the `ReplicationCore` task.
pub(crate) join_handle: JoinHandle<()>,
pub(crate) join_handle: JoinHandle<Result<(), ReplicationClosed>>,

/// The channel used for communicating with the replication task.
pub(crate) tx_repl: mpsc::UnboundedSender<Replicate<NID, N, S>>,
@@ -93,6 +97,10 @@
/// The `RaftNetwork` interface.
network: N::Network,

/// The backoff policy if an [`Unreachable`] error is returned.
/// It will be reset to `None` when a successful response is received.
backoff: Option<Backoff>,

/// The `RaftLogReader` of a `RaftStorage` interface.
log_reader: LS::LogReader,

@@ -146,6 +154,7 @@
target,
session_id,
network,
backoff: None,
log_reader,
config,
committed,
@@ -161,7 +170,7 @@
}

#[tracing::instrument(level="debug", skip(self), fields(session=%self.session_id, target=display(self.target), cluster=%self.config.cluster_name))]
async fn main(mut self) {
async fn main(mut self) -> Result<(), ReplicationClosed> {
loop {
let action = std::mem::replace(&mut self.next_action, None);

@@ -179,28 +188,31 @@
};

match res {
Ok(_x) => {}
Ok(_) => {
// reset backoff
self.backoff = None;
}
Err(err) => {
tracing::warn!(error=%err, "error replication to target={}", self.target);

match err {
ReplicationError::Closed => {
return;
ReplicationError::Closed(closed) => {
return Err(closed);
}
ReplicationError::HigherVote(h) => {
let _ = self.tx_raft_core.send(RaftMsg::HigherVote {
target: self.target,
higher: h.higher,
vote: self.session_id.vote,
});
return;
return Ok(());
}
ReplicationError::StorageError(err) => {
tracing::error!(error=%err, "error replication to target={}", self.target);

// TODO: report this error
let _ = self.tx_raft_core.send(RaftMsg::ReplicationFatal);
return;
return Ok(());
}
ReplicationError::RPCError(err) => {
tracing::error!(err = display(&err), "RPCError");
@@ -210,24 +222,29 @@
result: Err(err.to_string()),
session_id: self.session_id,
});

// If there is an [`Unreachable`] error, back off for a period of time.
// The backoff is reset when a successful RPC is sent.
if let RPCError::Unreachable(_unreachable) = err {
if self.backoff.is_none() {
self.backoff = Some(self.network.backoff());
}
}
}
};
}
};

let res = self.drain_events().await;
match res {
Ok(_x) => {}
Err(err) => match err {
ReplicationError::Closed => {
return;
}
if let Some(b) = &mut self.backoff {
let duration = b.next().unwrap_or_else(|| {
tracing::warn!("backoff exhausted, using default");
Duration::from_millis(500)
});

_ => {
unreachable!("no other error expected but: {:?}", err);
}
},
self.backoff_drain_events(Instant::now() + duration).await?;
}

self.drain_events().await?;
}
}

@@ -367,14 +384,49 @@
}
}

/// Drain all events in the channel in backoff mode, i.e., an error occurred that should not be
/// retried at once, and nothing should be sent out before the backoff interval expires.
///
/// During the backoff period no RPCs are sent, but events are still received so that,
/// if the channel is closed, this task quits at once.
#[tracing::instrument(level = "trace", skip(self))]
pub async fn backoff_drain_events(&mut self, until: Instant) -> Result<(), ReplicationClosed> {
let d = until - Instant::now();
tracing::warn!(
interval = debug(d),
"{} backoff mode: drain events without processing them",
func_name!()
);

loop {
let sleep_duration = until - Instant::now();
let sleep = sleep(sleep_duration);

let recv = self.rx_repl.recv();

tracing::debug!("backoff timeout: {:?}", sleep_duration);

select! {
_ = sleep => {
tracing::debug!("backoff timeout");
return Ok(());
}
recv_res = recv => {
let event = recv_res.ok_or(ReplicationClosed{})?;
self.process_event(event);
}
}
}
}

/// Receive and process events from RaftCore, until `next_action` is filled.
///
/// It blocks until at least one event is received.
#[tracing::instrument(level = "trace", skip_all)]
pub async fn drain_events(&mut self) -> Result<(), ReplicationError<C::NodeId, C::Node>> {
pub async fn drain_events(&mut self) -> Result<(), ReplicationClosed> {
tracing::debug!("drain_events");

let event = self.rx_repl.recv().await.ok_or(ReplicationError::Closed)?;
let event = self.rx_repl.recv().await.ok_or(ReplicationClosed {})?;
self.process_event(event);

self.try_drain_events().await?;
@@ -399,10 +451,13 @@
}

#[tracing::instrument(level = "trace", skip(self))]
pub async fn try_drain_events(&mut self) -> Result<(), ReplicationError<C::NodeId, C::Node>> {
pub async fn try_drain_events(&mut self) -> Result<(), ReplicationClosed> {
tracing::debug!("try_drain_raft_rx");

while self.next_action.is_none() {
// Just drain all events in the channel.
// There should not be more than one `Replicate::Data` event in the channel.
// Looping here just collects all commit events and heartbeat events.
loop {
let maybe_res = self.rx_repl.recv().now_or_never();

let recv_res = match maybe_res {
@@ -413,12 +468,10 @@
Some(x) => x,
};

let event = recv_res.ok_or(ReplicationError::Closed)?;
let event = recv_res.ok_or(ReplicationClosed {})?;

self.process_event(event);
}

Ok(())
}

#[tracing::instrument(level = "trace", skip_all)]
@@ -445,7 +498,9 @@
//- If self.next_action is not None, next_action will serve as a heartbeat.
}
Replicate::Data(d) => {
debug_assert!(self.next_action.is_none(),);
// TODO: Currently there is at most one data action in flight. In the future, RaftCore may send the
// next data action without waiting for the previous one to finish.
debug_assert!(self.next_action.is_none(), "there cannot be two data actions in flight");
self.next_action = Some(d);
}
}
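
The `backoff_drain_events` change above combines a sleep with the replication channel
via `select!`. A standalone sketch of that pattern, using placeholder `Event` and
`Closed` types rather than Openraft's own, might look like this:

```rust
use tokio::select;
use tokio::sync::mpsc;
use tokio::time::sleep_until;
use tokio::time::Instant;

/// Placeholder event type, standing in for `Replicate<..>`.
struct Event;

/// Placeholder for `ReplicationClosed`.
struct Closed;

/// Wait out a deadline without sending anything, while still consuming queued
/// events so that a closed channel is noticed immediately.
async fn drain_until(rx: &mut mpsc::UnboundedReceiver<Event>, until: Instant) -> Result<(), Closed> {
    loop {
        select! {
            // The backoff interval is over: resume normal operation.
            _ = sleep_until(until) => return Ok(()),
            // Keep draining events; `None` means the sender side is gone.
            maybe_event = rx.recv() => {
                let _event = maybe_event.ok_or(Closed)?;
                // A real implementation would hand the event to `process_event()`.
            }
        }
    }
}
```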