Skip to content

Commit

Permalink
Implemented exponential back off recovery strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
Relrin committed Jan 20, 2020
1 parent 2db4f94 commit d6dcdf0
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 11 deletions.
1 change: 1 addition & 0 deletions bastion/Cargo.toml
Expand Up @@ -37,6 +37,7 @@ unstable = ["bastion-executor/unstable"]
[dependencies]
bastion-executor = { version = "= 0.3.2", path = "../bastion-executor" }
futures = { version = "0.3", features = ["async-await"] }
futures-timer = "2.0.2"
fxhash = "0.2"
lazy_static = "1.4"
lightproc = { version = "= 0.3.3", path = "../lightproc" }
Expand Down
2 changes: 1 addition & 1 deletion bastion/src/lib.rs
Expand Up @@ -96,6 +96,6 @@ pub mod prelude {
pub use crate::message::{Answer, AnswerSender, Message, Msg};
pub use crate::msg;
pub use crate::path::{BastionPath, BastionPathElement};
pub use crate::supervisor::{SupervisionStrategy, Supervisor, SupervisorRef};
pub use crate::supervisor::{ActorRestartStrategy, SupervisionStrategy, Supervisor, SupervisorRef};
pub use crate::{blocking, children, run, spawn, supervisor};
}
126 changes: 116 additions & 10 deletions bastion/src/supervisor.rs
Expand Up @@ -13,13 +13,15 @@ use bastion_executor::pool;
use futures::prelude::*;
use futures::stream::FuturesOrdered;
use futures::{pending, poll};
use futures_timer::Delay;
use fxhash::FxHashMap;
use lightproc::prelude::*;
use log::Level;
use std::cmp::{Eq, PartialEq};
use std::ops::RangeFrom;
use std::sync::Arc;
use std::task::Poll;
use std::time::Duration;

#[derive(Debug)]
/// A supervisor that can supervise both [`Children`] and other
Expand Down Expand Up @@ -64,8 +66,9 @@ pub struct Supervisor {
// The order in which children and supervisors were added.
// It is only updated when at least one of those is resat.
order: Vec<BastionId>,
// The currently launched supervised children and supervisors.
launched: FxHashMap<BastionId, (usize, RecoverableHandle<Supervised>)>,
// The currently launched supervised children and supervisors. The
// last value stores amount of restarts of the certain actor.
launched: FxHashMap<BastionId, (usize, RecoverableHandle<Supervised>, u64)>,
// Supervised children and supervisors that are stopped.
// This is used when resetting or recovering when the
// supervision strategy is not "one-for-one".
Expand All @@ -74,6 +77,7 @@ pub struct Supervisor {
// This is used when resetting only.
killed: FxHashMap<BastionId, Supervised>,
strategy: SupervisionStrategy,
restart_strategy: ActorRestartStrategy,
// The callbacks called at the supervisor's different
// lifecycle events.
callbacks: Callbacks,
Expand Down Expand Up @@ -132,6 +136,25 @@ enum Supervised {
Children(Children),
}

#[derive(Debug, Clone)]
/// The strategy for restating an actor as far as it
/// returned an failure.
///
/// The default strategy is `Instantly`.
pub enum ActorRestartStrategy {
/// Restart an actor as soon as possible, since the moment
/// the actor finished with a failure.
Instantly,
/// Restart an actor after with the timeout. Each next timeout
/// is increasing exponentially.
ExponentialBackOff {
/// An initial delay before the restarting an actor.
timeout: Duration,
/// Defines a multiplier how fast the timeout will be increasing.
multiplier: u64,
},
}

impl Supervisor {
pub(crate) fn new(bcast: Broadcast) -> Self {
debug!("Supervisor({}): Initializing.", bcast.id());
Expand All @@ -140,6 +163,7 @@ impl Supervisor {
let stopped = FxHashMap::default();
let killed = FxHashMap::default();
let strategy = SupervisionStrategy::default();
let restart_strategy = ActorRestartStrategy::default();
let callbacks = Callbacks::new();
let is_system_supervisor = false;
let pre_start_msgs = Vec::new();
Expand All @@ -152,6 +176,7 @@ impl Supervisor {
stopped,
killed,
strategy,
restart_strategy,
callbacks,
is_system_supervisor,
pre_start_msgs,
Expand Down Expand Up @@ -621,6 +646,52 @@ impl Supervisor {
self
}

/// Sets the actor restart strategy the supervisor should use
/// of its supervised children groups or supervisors dies to
/// restore in the correct state.
///
/// The default strategy is
/// [`ActorRestartStrategy::Instanly`].
///
/// # Arguments
///
/// * `restart_strategy` - The strategy to use:
/// - [`ActorRestartStrategy::Instantly`] would restart the
/// failed actor as soon as possible.
/// - [`ActorRestartStrategy::ExponentialBackOff`] would restart the
/// failed actor with the delay (in milliseconds), multiplied on the
/// some coefficient.
///
/// # Example
///
/// ```rust
/// # use bastion::prelude::*;
/// #
/// # fn main() {
/// # Bastion::init();
/// #
/// Bastion::supervisor(|sp| {
/// sp.with_restart_strategy(ActorRestartStrategy::Instantly)
/// }).expect("Couldn't create the supervisor");
/// #
/// # Bastion::start();
/// # Bastion::stop();
/// # Bastion::block_until_stopped();
/// # }
/// ```
///
/// [`ActorRestartStrategy::Instantly`]: supervisor/enum.ActorRestartStrategy.html#variant.Instantly
/// [`ActorRestartStrategy::ExponentialBackOff`]: supervisor/enum.ActorRestartStrategy.html#variant.ExponentialBackOff
pub fn with_restart_strategy(mut self, restart_strategy: ActorRestartStrategy) -> Self {
trace!(
"Supervisor({}): Setting actor restart strategy: {:?}",
self.id(),
restart_strategy
);
self.restart_strategy = restart_strategy;
self
}

/// Sets the callbacks that will get called at this supervisor's
/// different lifecycle events.
///
Expand Down Expand Up @@ -670,6 +741,7 @@ impl Supervisor {
// TODO: stop or kill?
self.kill(range.clone()).await;

let restart_strategy = self.restart_strategy.clone();
let supervisor_id = &self.id().clone();
let parent = Parent::supervisor(self.as_ref());
let mut reset = FuturesOrdered::new();
Expand All @@ -692,6 +764,12 @@ impl Supervisor {
supervised.elem().clone().with_id(BastionId::new()),
);

let restart_count = match self.launched.get(&id.clone()) {
Some((_, _, count)) => *count,
None => 0,
};

let restart_strategy_inner = restart_strategy.clone();
reset.push(async move {
debug!(
"Supervisor({}): Resetting Supervised({}) (killed={}) to Supervised({}).",
Expand All @@ -700,6 +778,19 @@ impl Supervisor {
killed,
bcast.id()
);

match restart_strategy_inner {
ActorRestartStrategy::ExponentialBackOff {
timeout,
multiplier,
} => {
let start_in =
timeout.as_secs() + (timeout.as_secs() * multiplier * restart_count);
Delay::new(Duration::from_secs(start_in)).await;
}
_ => {}
};

// FIXME: panics?
let supervised = supervised.reset(bcast).await.unwrap();
// FIXME: might not keep order
Expand Down Expand Up @@ -733,9 +824,13 @@ impl Supervisor {
supervised.id()
);
let id = supervised.id().clone();
let restart_count = match self.launched.get(&id.clone()) {
Some((_, _, count)) => *count,
None => 0,
};
let launched = supervised.launch();
self.launched
.insert(id.clone(), (self.order.len(), launched));
.insert(id.clone(), (self.order.len(), launched, restart_count));
self.order.push(id);
}
}
Expand All @@ -756,7 +851,7 @@ impl Supervisor {
// FIXME: panics?
for id in self.order.get(range.clone()).unwrap() {
// TODO: Err if None?
if let Some((_, launched)) = self.launched.remove(&id) {
if let Some((_, launched, _)) = self.launched.remove(&id) {
// TODO: add a "stopped" list and poll from it instead of awaiting
supervised.push(launched);
}
Expand Down Expand Up @@ -797,7 +892,7 @@ impl Supervisor {
// FIXME: panics?
for id in self.order.get(range.clone()).unwrap() {
// TODO: Err if None?
if let Some((_, launched)) = self.launched.remove(&id) {
if let Some((_, launched, _)) = self.launched.remove(&id) {
// TODO: add a "stopped" list and poll from it instead of awaiting
supervised.push(launched);
}
Expand Down Expand Up @@ -838,7 +933,7 @@ impl Supervisor {
);
match self.strategy {
SupervisionStrategy::OneForOne => {
let (order, launched) = self.launched.remove(&id).ok_or(())?;
let (order, launched, _) = self.launched.remove(&id).ok_or(())?;
// TODO: add a "waiting" list and poll from it instead of awaiting
// FIXME: panics?
let supervised = launched.await.unwrap();
Expand Down Expand Up @@ -873,8 +968,13 @@ impl Supervisor {
self.id(),
supervised.id()
);
let restart_count = match self.launched.get(&id.clone()) {
Some((_, _, count)) => count + 1,
None => 0,
};
let launched = supervised.launch();
self.launched.insert(id.clone(), (order, launched));
self.launched
.insert(id.clone(), (order, launched, restart_count));
self.order[order] = id;
}
SupervisionStrategy::OneForAll => {
Expand All @@ -885,7 +985,7 @@ impl Supervisor {
self.killed.shrink_to_fit();
}
SupervisionStrategy::RestForOne => {
let (start, _) = self.launched.get(&id).ok_or(())?;
let (start, _, _) = self.launched.get(&id).ok_or(())?;
let start = *start;

self.restart(start..).await;
Expand Down Expand Up @@ -960,7 +1060,7 @@ impl Supervisor {
let id = supervised.id().clone();
let launched = supervised.launch();
self.launched
.insert(id.clone(), (self.order.len(), launched));
.insert(id.clone(), (self.order.len(), launched, 0));
self.order.push(id);
}
// FIXME
Expand Down Expand Up @@ -995,7 +1095,7 @@ impl Supervisor {
..
} => {
// FIXME: Err if None?
if let Some((_, launched)) = self.launched.remove(&id) {
if let Some((_, launched, _)) = self.launched.remove(&id) {
debug!("Supervisor({}): Supervised({}) stopped.", self.id(), id);
// TODO: add a "waiting" list an poll from it instead of awaiting
// FIXME: panics?
Expand Down Expand Up @@ -1579,6 +1679,12 @@ impl Default for SupervisionStrategy {
}
}

impl Default for ActorRestartStrategy {
fn default() -> Self {
ActorRestartStrategy::Instantly
}
}

impl PartialEq for SupervisorRef {
fn eq(&self, other: &Self) -> bool {
self.id == other.id
Expand Down

0 comments on commit d6dcdf0

Please sign in to comment.