Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented linear/exponential back off recovery strategy #156

Merged
merged 3 commits into from
Jan 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions bastion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ unstable = ["bastion-executor/unstable"]
[dependencies]
bastion-executor = { version = "= 0.3.2", path = "../bastion-executor" }
futures = { version = "0.3", features = ["async-await"] }
futures-timer = "2.0.2"
fxhash = "0.2"
lazy_static = "1.4"
lightproc = { version = "= 0.3.3", path = "../lightproc" }
Expand Down
4 changes: 3 additions & 1 deletion bastion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ pub mod prelude {
pub use crate::message::{Answer, AnswerSender, Message, Msg};
pub use crate::msg;
pub use crate::path::{BastionPath, BastionPathElement};
pub use crate::supervisor::{SupervisionStrategy, Supervisor, SupervisorRef};
pub use crate::supervisor::{
ActorRestartStrategy, SupervisionStrategy, Supervisor, SupervisorRef,
};
pub use crate::{blocking, children, run, spawn, supervisor};
}
140 changes: 131 additions & 9 deletions bastion/src/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ use bastion_executor::pool;
use futures::prelude::*;
use futures::stream::FuturesOrdered;
use futures::{pending, poll};
use futures_timer::Delay;
use fxhash::FxHashMap;
use lightproc::prelude::*;
use log::Level;
use std::cmp::{Eq, PartialEq};
use std::ops::RangeFrom;
use std::sync::Arc;
use std::task::Poll;
use std::time::Duration;

#[derive(Debug)]
/// A supervisor that can supervise both [`Children`] and other
Expand Down Expand Up @@ -65,7 +67,8 @@ pub struct Supervisor {
// It is only updated when at least one of those is resat.
order: Vec<BastionId>,
// The currently launched supervised children and supervisors.
launched: FxHashMap<BastionId, (usize, RecoverableHandle<Supervised>)>,
// The last value is the amount of times a given actor has restarted.
launched: FxHashMap<BastionId, (usize, RecoverableHandle<Supervised>, usize)>,
// Supervised children and supervisors that are stopped.
// This is used when resetting or recovering when the
// supervision strategy is not "one-for-one".
Expand All @@ -74,6 +77,7 @@ pub struct Supervisor {
// This is used when resetting only.
killed: FxHashMap<BastionId, Supervised>,
strategy: SupervisionStrategy,
restart_strategy: ActorRestartStrategy,
// The callbacks called at the supervisor's different
// lifecycle events.
callbacks: Callbacks,
Expand Down Expand Up @@ -132,6 +136,34 @@ enum Supervised {
Children(Children),
}

#[derive(Debug, Clone)]
/// The strategy for restating an actor as far as it
/// returned an failure.
///
/// The default strategy is `Immediate`.
pub enum ActorRestartStrategy {
/// Restart an actor as soon as possible, since the moment
/// the actor finished with a failure.
Immediate,
/// Restart an actor after with the timeout. Each next restart
/// is increasing on the given duration.
LinearBackOff {
/// An initial delay before the restarting an actor.
timeout: Duration,
},
/// Restart an actor after with the timeout. Each next timeout
/// is increasing exponentially.
/// When passed a multiplier that equals to 1, the strategy works as the
/// linear back off strategy. Passing the multiplier that equals to 0 leads
/// to constant restart delays which is equal to the given timeout.
ExponentialBackOff {
o0Ignition0o marked this conversation as resolved.
Show resolved Hide resolved
/// An initial delay before the restarting an actor.
timeout: Duration,
/// Defines a multiplier how fast the timeout will be increasing.
multiplier: u64,
},
}

impl Supervisor {
pub(crate) fn new(bcast: Broadcast) -> Self {
debug!("Supervisor({}): Initializing.", bcast.id());
Expand All @@ -140,6 +172,7 @@ impl Supervisor {
let stopped = FxHashMap::default();
let killed = FxHashMap::default();
let strategy = SupervisionStrategy::default();
let restart_strategy = ActorRestartStrategy::default();
let callbacks = Callbacks::new();
let is_system_supervisor = false;
let pre_start_msgs = Vec::new();
Expand All @@ -152,6 +185,7 @@ impl Supervisor {
stopped,
killed,
strategy,
restart_strategy,
callbacks,
is_system_supervisor,
pre_start_msgs,
Expand Down Expand Up @@ -621,6 +655,54 @@ impl Supervisor {
self
}

/// Sets the actor restart strategy the supervisor should use
/// of its supervised children groups or supervisors dies to
/// restore in the correct state.
///
/// The default strategy is
/// [`ActorRestartStrategy::Instanly`].
///
/// # Arguments
///
/// * `restart_strategy` - The strategy to use:
/// - [`ActorRestartStrategy::Instantly`] would restart the
/// failed actor as soon as possible.
/// - [`ActorRestartStrategy::LinearBackOff`] would restart the
/// failed actor with the delay increasing linearly.
/// - [`ActorRestartStrategy::ExponentialBackOff`] would restart the
/// failed actor with the delay, multiplied by given coefficient.
///
/// # Example
///
/// ```rust
/// # use bastion::prelude::*;
/// #
/// # fn main() {
/// # Bastion::init();
/// #
/// Bastion::supervisor(|sp| {
/// sp.with_restart_strategy(ActorRestartStrategy::Immediate)
/// }).expect("Couldn't create the supervisor");
/// #
/// # Bastion::start();
/// # Bastion::stop();
/// # Bastion::block_until_stopped();
/// # }
/// ```
///
/// [`ActorRestartStrategy::Instantly`]: supervisor/enum.ActorRestartStrategy.html#variant.Instantly
/// [`ActorRestartStrategy::LinearBackOff`]: supervisor/enum.ActorRestartStrategy.html#variant.LinearBackOff
/// [`ActorRestartStrategy::ExponentialBackOff`]: supervisor/enum.ActorRestartStrategy.html#variant.ExponentialBackOff
pub fn with_restart_strategy(mut self, restart_strategy: ActorRestartStrategy) -> Self {
trace!(
"Supervisor({}): Setting actor restart strategy: {:?}",
self.id(),
restart_strategy
);
self.restart_strategy = restart_strategy;
self
}

/// Sets the callbacks that will get called at this supervisor's
/// different lifecycle events.
///
Expand Down Expand Up @@ -670,6 +752,7 @@ impl Supervisor {
// TODO: stop or kill?
self.kill(range.clone()).await;

let restart_strategy = self.restart_strategy.clone();
let supervisor_id = &self.id().clone();
let parent = Parent::supervisor(self.as_ref());
let mut reset = FuturesOrdered::new();
Expand All @@ -692,6 +775,12 @@ impl Supervisor {
supervised.elem().clone().with_id(BastionId::new()),
);

let restart_count = match self.launched.get(&id.clone()) {
Some((_, _, count)) => *count,
None => 0,
};

let restart_strategy_inner = restart_strategy.clone();
reset.push(async move {
debug!(
"Supervisor({}): Resetting Supervised({}) (killed={}) to Supervised({}).",
Expand All @@ -700,6 +789,24 @@ impl Supervisor {
killed,
bcast.id()
);

match restart_strategy_inner {
ActorRestartStrategy::LinearBackOff { timeout } => {
let start_in =
timeout.as_secs() + (timeout.as_secs() * restart_count as u64);
Delay::new(Duration::from_secs(start_in)).await;
}
ActorRestartStrategy::ExponentialBackOff {
timeout,
multiplier,
} => {
let start_in = timeout.as_secs()
+ (timeout.as_secs() * multiplier * restart_count as u64);
Delay::new(Duration::from_secs(start_in)).await;
}
_ => {}
};

// FIXME: panics?
let supervised = supervised.reset(bcast).await.unwrap();
// FIXME: might not keep order
Expand Down Expand Up @@ -733,9 +840,13 @@ impl Supervisor {
supervised.id()
);
let id = supervised.id().clone();
let restart_count = match self.launched.get(&id.clone()) {
Some((_, _, count)) => *count,
None => 0,
};
let launched = supervised.launch();
self.launched
.insert(id.clone(), (self.order.len(), launched));
.insert(id.clone(), (self.order.len(), launched, restart_count));
self.order.push(id);
}
}
Expand All @@ -756,7 +867,7 @@ impl Supervisor {
// FIXME: panics?
for id in self.order.get(range.clone()).unwrap() {
// TODO: Err if None?
if let Some((_, launched)) = self.launched.remove(&id) {
if let Some((_, launched, _)) = self.launched.remove(&id) {
// TODO: add a "stopped" list and poll from it instead of awaiting
supervised.push(launched);
}
Expand Down Expand Up @@ -797,7 +908,7 @@ impl Supervisor {
// FIXME: panics?
for id in self.order.get(range.clone()).unwrap() {
// TODO: Err if None?
if let Some((_, launched)) = self.launched.remove(&id) {
if let Some((_, launched, _)) = self.launched.remove(&id) {
// TODO: add a "stopped" list and poll from it instead of awaiting
supervised.push(launched);
}
Expand Down Expand Up @@ -838,7 +949,7 @@ impl Supervisor {
);
match self.strategy {
SupervisionStrategy::OneForOne => {
let (order, launched) = self.launched.remove(&id).ok_or(())?;
let (order, launched, _) = self.launched.remove(&id).ok_or(())?;
// TODO: add a "waiting" list and poll from it instead of awaiting
// FIXME: panics?
let supervised = launched.await.unwrap();
Expand Down Expand Up @@ -873,8 +984,13 @@ impl Supervisor {
self.id(),
supervised.id()
);
let restart_count = match self.launched.get(&id.clone()) {
Some((_, _, count)) => count + 1,
None => 0,
};
let launched = supervised.launch();
self.launched.insert(id.clone(), (order, launched));
self.launched
.insert(id.clone(), (order, launched, restart_count));
self.order[order] = id;
}
SupervisionStrategy::OneForAll => {
Expand All @@ -885,7 +1001,7 @@ impl Supervisor {
self.killed.shrink_to_fit();
}
SupervisionStrategy::RestForOne => {
let (start, _) = self.launched.get(&id).ok_or(())?;
let (start, _, _) = self.launched.get(&id).ok_or(())?;
let start = *start;

self.restart(start..).await;
Expand Down Expand Up @@ -960,7 +1076,7 @@ impl Supervisor {
let id = supervised.id().clone();
let launched = supervised.launch();
self.launched
.insert(id.clone(), (self.order.len(), launched));
.insert(id.clone(), (self.order.len(), launched, 0));
self.order.push(id);
}
// FIXME
Expand Down Expand Up @@ -995,7 +1111,7 @@ impl Supervisor {
..
} => {
// FIXME: Err if None?
if let Some((_, launched)) = self.launched.remove(&id) {
if let Some((_, launched, _)) = self.launched.remove(&id) {
debug!("Supervisor({}): Supervised({}) stopped.", self.id(), id);
// TODO: add a "waiting" list an poll from it instead of awaiting
// FIXME: panics?
Expand Down Expand Up @@ -1579,6 +1695,12 @@ impl Default for SupervisionStrategy {
}
}

impl Default for ActorRestartStrategy {
fn default() -> Self {
ActorRestartStrategy::Immediate
}
}

impl PartialEq for SupervisorRef {
fn eq(&self, other: &Self) -> bool {
self.id == other.id
Expand Down