From d6dcdf0b644b2f32286261c3fdc31cb09bfbabe3 Mon Sep 17 00:00:00 2001 From: Valeryi Savich Date: Mon, 20 Jan 2020 18:51:27 +0200 Subject: [PATCH] Implemented exponential back off recovery strategy --- bastion/Cargo.toml | 1 + bastion/src/lib.rs | 2 +- bastion/src/supervisor.rs | 126 +++++++++++++++++++++++++++++++++++--- 3 files changed, 118 insertions(+), 11 deletions(-) diff --git a/bastion/Cargo.toml b/bastion/Cargo.toml index 079c072e..0e57735b 100644 --- a/bastion/Cargo.toml +++ b/bastion/Cargo.toml @@ -37,6 +37,7 @@ unstable = ["bastion-executor/unstable"] [dependencies] bastion-executor = { version = "= 0.3.2", path = "../bastion-executor" } futures = { version = "0.3", features = ["async-await"] } +futures-timer = "2.0.2" fxhash = "0.2" lazy_static = "1.4" lightproc = { version = "= 0.3.3", path = "../lightproc" } diff --git a/bastion/src/lib.rs b/bastion/src/lib.rs index 81b15326..bcc18315 100644 --- a/bastion/src/lib.rs +++ b/bastion/src/lib.rs @@ -96,6 +96,6 @@ pub mod prelude { pub use crate::message::{Answer, AnswerSender, Message, Msg}; pub use crate::msg; pub use crate::path::{BastionPath, BastionPathElement}; - pub use crate::supervisor::{SupervisionStrategy, Supervisor, SupervisorRef}; + pub use crate::supervisor::{ActorRestartStrategy, SupervisionStrategy, Supervisor, SupervisorRef}; pub use crate::{blocking, children, run, spawn, supervisor}; } diff --git a/bastion/src/supervisor.rs b/bastion/src/supervisor.rs index e78392b9..7a43890f 100644 --- a/bastion/src/supervisor.rs +++ b/bastion/src/supervisor.rs @@ -13,6 +13,7 @@ use bastion_executor::pool; use futures::prelude::*; use futures::stream::FuturesOrdered; use futures::{pending, poll}; +use futures_timer::Delay; use fxhash::FxHashMap; use lightproc::prelude::*; use log::Level; @@ -20,6 +21,7 @@ use std::cmp::{Eq, PartialEq}; use std::ops::RangeFrom; use std::sync::Arc; use std::task::Poll; +use std::time::Duration; #[derive(Debug)] /// A supervisor that can supervise both [`Children`] and other @@ -64,8 +66,9 @@ pub struct Supervisor { // The order in which children and supervisors were added. // It is only updated when at least one of those is resat. order: Vec, - // The currently launched supervised children and supervisors. - launched: FxHashMap)>, + // The currently launched supervised children and supervisors. The + // last value stores amount of restarts of the certain actor. + launched: FxHashMap, u64)>, // Supervised children and supervisors that are stopped. // This is used when resetting or recovering when the // supervision strategy is not "one-for-one". @@ -74,6 +77,7 @@ pub struct Supervisor { // This is used when resetting only. killed: FxHashMap, strategy: SupervisionStrategy, + restart_strategy: ActorRestartStrategy, // The callbacks called at the supervisor's different // lifecycle events. callbacks: Callbacks, @@ -132,6 +136,25 @@ enum Supervised { Children(Children), } +#[derive(Debug, Clone)] +/// The strategy for restating an actor as far as it +/// returned an failure. +/// +/// The default strategy is `Instantly`. +pub enum ActorRestartStrategy { + /// Restart an actor as soon as possible, since the moment + /// the actor finished with a failure. + Instantly, + /// Restart an actor after with the timeout. Each next timeout + /// is increasing exponentially. + ExponentialBackOff { + /// An initial delay before the restarting an actor. + timeout: Duration, + /// Defines a multiplier how fast the timeout will be increasing. + multiplier: u64, + }, +} + impl Supervisor { pub(crate) fn new(bcast: Broadcast) -> Self { debug!("Supervisor({}): Initializing.", bcast.id()); @@ -140,6 +163,7 @@ impl Supervisor { let stopped = FxHashMap::default(); let killed = FxHashMap::default(); let strategy = SupervisionStrategy::default(); + let restart_strategy = ActorRestartStrategy::default(); let callbacks = Callbacks::new(); let is_system_supervisor = false; let pre_start_msgs = Vec::new(); @@ -152,6 +176,7 @@ impl Supervisor { stopped, killed, strategy, + restart_strategy, callbacks, is_system_supervisor, pre_start_msgs, @@ -621,6 +646,52 @@ impl Supervisor { self } + /// Sets the actor restart strategy the supervisor should use + /// of its supervised children groups or supervisors dies to + /// restore in the correct state. + /// + /// The default strategy is + /// [`ActorRestartStrategy::Instanly`]. + /// + /// # Arguments + /// + /// * `restart_strategy` - The strategy to use: + /// - [`ActorRestartStrategy::Instantly`] would restart the + /// failed actor as soon as possible. + /// - [`ActorRestartStrategy::ExponentialBackOff`] would restart the + /// failed actor with the delay (in milliseconds), multiplied on the + /// some coefficient. + /// + /// # Example + /// + /// ```rust + /// # use bastion::prelude::*; + /// # + /// # fn main() { + /// # Bastion::init(); + /// # + /// Bastion::supervisor(|sp| { + /// sp.with_restart_strategy(ActorRestartStrategy::Instantly) + /// }).expect("Couldn't create the supervisor"); + /// # + /// # Bastion::start(); + /// # Bastion::stop(); + /// # Bastion::block_until_stopped(); + /// # } + /// ``` + /// + /// [`ActorRestartStrategy::Instantly`]: supervisor/enum.ActorRestartStrategy.html#variant.Instantly + /// [`ActorRestartStrategy::ExponentialBackOff`]: supervisor/enum.ActorRestartStrategy.html#variant.ExponentialBackOff + pub fn with_restart_strategy(mut self, restart_strategy: ActorRestartStrategy) -> Self { + trace!( + "Supervisor({}): Setting actor restart strategy: {:?}", + self.id(), + restart_strategy + ); + self.restart_strategy = restart_strategy; + self + } + /// Sets the callbacks that will get called at this supervisor's /// different lifecycle events. /// @@ -670,6 +741,7 @@ impl Supervisor { // TODO: stop or kill? self.kill(range.clone()).await; + let restart_strategy = self.restart_strategy.clone(); let supervisor_id = &self.id().clone(); let parent = Parent::supervisor(self.as_ref()); let mut reset = FuturesOrdered::new(); @@ -692,6 +764,12 @@ impl Supervisor { supervised.elem().clone().with_id(BastionId::new()), ); + let restart_count = match self.launched.get(&id.clone()) { + Some((_, _, count)) => *count, + None => 0, + }; + + let restart_strategy_inner = restart_strategy.clone(); reset.push(async move { debug!( "Supervisor({}): Resetting Supervised({}) (killed={}) to Supervised({}).", @@ -700,6 +778,19 @@ impl Supervisor { killed, bcast.id() ); + + match restart_strategy_inner { + ActorRestartStrategy::ExponentialBackOff { + timeout, + multiplier, + } => { + let start_in = + timeout.as_secs() + (timeout.as_secs() * multiplier * restart_count); + Delay::new(Duration::from_secs(start_in)).await; + } + _ => {} + }; + // FIXME: panics? let supervised = supervised.reset(bcast).await.unwrap(); // FIXME: might not keep order @@ -733,9 +824,13 @@ impl Supervisor { supervised.id() ); let id = supervised.id().clone(); + let restart_count = match self.launched.get(&id.clone()) { + Some((_, _, count)) => *count, + None => 0, + }; let launched = supervised.launch(); self.launched - .insert(id.clone(), (self.order.len(), launched)); + .insert(id.clone(), (self.order.len(), launched, restart_count)); self.order.push(id); } } @@ -756,7 +851,7 @@ impl Supervisor { // FIXME: panics? for id in self.order.get(range.clone()).unwrap() { // TODO: Err if None? - if let Some((_, launched)) = self.launched.remove(&id) { + if let Some((_, launched, _)) = self.launched.remove(&id) { // TODO: add a "stopped" list and poll from it instead of awaiting supervised.push(launched); } @@ -797,7 +892,7 @@ impl Supervisor { // FIXME: panics? for id in self.order.get(range.clone()).unwrap() { // TODO: Err if None? - if let Some((_, launched)) = self.launched.remove(&id) { + if let Some((_, launched, _)) = self.launched.remove(&id) { // TODO: add a "stopped" list and poll from it instead of awaiting supervised.push(launched); } @@ -838,7 +933,7 @@ impl Supervisor { ); match self.strategy { SupervisionStrategy::OneForOne => { - let (order, launched) = self.launched.remove(&id).ok_or(())?; + let (order, launched, _) = self.launched.remove(&id).ok_or(())?; // TODO: add a "waiting" list and poll from it instead of awaiting // FIXME: panics? let supervised = launched.await.unwrap(); @@ -873,8 +968,13 @@ impl Supervisor { self.id(), supervised.id() ); + let restart_count = match self.launched.get(&id.clone()) { + Some((_, _, count)) => count + 1, + None => 0, + }; let launched = supervised.launch(); - self.launched.insert(id.clone(), (order, launched)); + self.launched + .insert(id.clone(), (order, launched, restart_count)); self.order[order] = id; } SupervisionStrategy::OneForAll => { @@ -885,7 +985,7 @@ impl Supervisor { self.killed.shrink_to_fit(); } SupervisionStrategy::RestForOne => { - let (start, _) = self.launched.get(&id).ok_or(())?; + let (start, _, _) = self.launched.get(&id).ok_or(())?; let start = *start; self.restart(start..).await; @@ -960,7 +1060,7 @@ impl Supervisor { let id = supervised.id().clone(); let launched = supervised.launch(); self.launched - .insert(id.clone(), (self.order.len(), launched)); + .insert(id.clone(), (self.order.len(), launched, 0)); self.order.push(id); } // FIXME @@ -995,7 +1095,7 @@ impl Supervisor { .. } => { // FIXME: Err if None? - if let Some((_, launched)) = self.launched.remove(&id) { + if let Some((_, launched, _)) = self.launched.remove(&id) { debug!("Supervisor({}): Supervised({}) stopped.", self.id(), id); // TODO: add a "waiting" list an poll from it instead of awaiting // FIXME: panics? @@ -1579,6 +1679,12 @@ impl Default for SupervisionStrategy { } } +impl Default for ActorRestartStrategy { + fn default() -> Self { + ActorRestartStrategy::Instantly + } +} + impl PartialEq for SupervisorRef { fn eq(&self, other: &Self) -> bool { self.id == other.id