Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented linear/exponential back off recovery strategy #156

Merged
merged 3 commits into from
Jan 21, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions bastion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ unstable = ["bastion-executor/unstable"]
[dependencies]
bastion-executor = { version = "= 0.3.2", path = "../bastion-executor" }
futures = { version = "0.3", features = ["async-await"] }
futures-timer = "2.0.2"
fxhash = "0.2"
lazy_static = "1.4"
lightproc = { version = "= 0.3.3", path = "../lightproc" }
Expand Down
2 changes: 1 addition & 1 deletion bastion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,6 @@ pub mod prelude {
pub use crate::message::{Answer, AnswerSender, Message, Msg};
pub use crate::msg;
pub use crate::path::{BastionPath, BastionPathElement};
pub use crate::supervisor::{SupervisionStrategy, Supervisor, SupervisorRef};
pub use crate::supervisor::{ActorRestartStrategy, SupervisionStrategy, Supervisor, SupervisorRef};
pub use crate::{blocking, children, run, spawn, supervisor};
}
126 changes: 116 additions & 10 deletions bastion/src/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ use bastion_executor::pool;
use futures::prelude::*;
use futures::stream::FuturesOrdered;
use futures::{pending, poll};
use futures_timer::Delay;
use fxhash::FxHashMap;
use lightproc::prelude::*;
use log::Level;
use std::cmp::{Eq, PartialEq};
use std::ops::RangeFrom;
use std::sync::Arc;
use std::task::Poll;
use std::time::Duration;

#[derive(Debug)]
/// A supervisor that can supervise both [`Children`] and other
Expand Down Expand Up @@ -64,8 +66,9 @@ pub struct Supervisor {
// The order in which children and supervisors were added.
// It is only updated when at least one of those is resat.
order: Vec<BastionId>,
// The currently launched supervised children and supervisors.
launched: FxHashMap<BastionId, (usize, RecoverableHandle<Supervised>)>,
// The currently launched supervised children and supervisors. The
o0Ignition0o marked this conversation as resolved.
Show resolved Hide resolved
// last value stores amount of restarts of the certain actor.
o0Ignition0o marked this conversation as resolved.
Show resolved Hide resolved
launched: FxHashMap<BastionId, (usize, RecoverableHandle<Supervised>, u64)>,
o0Ignition0o marked this conversation as resolved.
Show resolved Hide resolved
// Supervised children and supervisors that are stopped.
// This is used when resetting or recovering when the
// supervision strategy is not "one-for-one".
Expand All @@ -74,6 +77,7 @@ pub struct Supervisor {
// This is used when resetting only.
killed: FxHashMap<BastionId, Supervised>,
strategy: SupervisionStrategy,
restart_strategy: ActorRestartStrategy,
// The callbacks called at the supervisor's different
// lifecycle events.
callbacks: Callbacks,
Expand Down Expand Up @@ -132,6 +136,25 @@ enum Supervised {
Children(Children),
}

#[derive(Debug, Clone)]
/// The strategy for restating an actor as far as it
/// returned an failure.
///
/// The default strategy is `Instantly`.
pub enum ActorRestartStrategy {
/// Restart an actor as soon as possible, since the moment
/// the actor finished with a failure.
Instantly,
o0Ignition0o marked this conversation as resolved.
Show resolved Hide resolved
/// Restart an actor after with the timeout. Each next timeout
/// is increasing exponentially.
ExponentialBackOff {
o0Ignition0o marked this conversation as resolved.
Show resolved Hide resolved
/// An initial delay before the restarting an actor.
timeout: Duration,
/// Defines a multiplier how fast the timeout will be increasing.
multiplier: u64,
},
}

impl Supervisor {
pub(crate) fn new(bcast: Broadcast) -> Self {
debug!("Supervisor({}): Initializing.", bcast.id());
Expand All @@ -140,6 +163,7 @@ impl Supervisor {
let stopped = FxHashMap::default();
let killed = FxHashMap::default();
let strategy = SupervisionStrategy::default();
let restart_strategy = ActorRestartStrategy::default();
let callbacks = Callbacks::new();
let is_system_supervisor = false;
let pre_start_msgs = Vec::new();
Expand All @@ -152,6 +176,7 @@ impl Supervisor {
stopped,
killed,
strategy,
restart_strategy,
callbacks,
is_system_supervisor,
pre_start_msgs,
Expand Down Expand Up @@ -621,6 +646,52 @@ impl Supervisor {
self
}

/// Sets the actor restart strategy the supervisor should use
/// of its supervised children groups or supervisors dies to
/// restore in the correct state.
///
/// The default strategy is
/// [`ActorRestartStrategy::Instanly`].
///
/// # Arguments
///
/// * `restart_strategy` - The strategy to use:
/// - [`ActorRestartStrategy::Instantly`] would restart the
/// failed actor as soon as possible.
/// - [`ActorRestartStrategy::ExponentialBackOff`] would restart the
/// failed actor with the delay (in milliseconds), multiplied on the
o0Ignition0o marked this conversation as resolved.
Show resolved Hide resolved
/// some coefficient.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// some coefficient.
/// given coefficient.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

///
/// # Example
///
/// ```rust
/// # use bastion::prelude::*;
/// #
/// # fn main() {
/// # Bastion::init();
/// #
/// Bastion::supervisor(|sp| {
/// sp.with_restart_strategy(ActorRestartStrategy::Instantly)
/// }).expect("Couldn't create the supervisor");
/// #
/// # Bastion::start();
/// # Bastion::stop();
/// # Bastion::block_until_stopped();
/// # }
/// ```
///
/// [`ActorRestartStrategy::Instantly`]: supervisor/enum.ActorRestartStrategy.html#variant.Instantly
/// [`ActorRestartStrategy::ExponentialBackOff`]: supervisor/enum.ActorRestartStrategy.html#variant.ExponentialBackOff
pub fn with_restart_strategy(mut self, restart_strategy: ActorRestartStrategy) -> Self {
trace!(
"Supervisor({}): Setting actor restart strategy: {:?}",
self.id(),
restart_strategy
);
self.restart_strategy = restart_strategy;
self
}

/// Sets the callbacks that will get called at this supervisor's
/// different lifecycle events.
///
Expand Down Expand Up @@ -670,6 +741,7 @@ impl Supervisor {
// TODO: stop or kill?
self.kill(range.clone()).await;

let restart_strategy = self.restart_strategy.clone();
let supervisor_id = &self.id().clone();
let parent = Parent::supervisor(self.as_ref());
let mut reset = FuturesOrdered::new();
Expand All @@ -692,6 +764,12 @@ impl Supervisor {
supervised.elem().clone().with_id(BastionId::new()),
);

let restart_count = match self.launched.get(&id.clone()) {
Some((_, _, count)) => *count,
None => 0,
};

let restart_strategy_inner = restart_strategy.clone();
reset.push(async move {
debug!(
"Supervisor({}): Resetting Supervised({}) (killed={}) to Supervised({}).",
Expand All @@ -700,6 +778,19 @@ impl Supervisor {
killed,
bcast.id()
);

match restart_strategy_inner {
ActorRestartStrategy::ExponentialBackOff {
timeout,
multiplier,
} => {
let start_in =
timeout.as_secs() + (timeout.as_secs() * multiplier * restart_count);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That means that if we someday want to declare a LinearBackOff, It can be build by declaring an ExponentialBackOff with the expected timeout and a multiplier of 0. Pretty cool ! 🎉

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhapse it make a sense to mention about it in the docs as well, what do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would! :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to add linear backoff too? If we can do that inside this PR that would be nice.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could add the LinearBackOff struct into the ActorRestartStrategy enum with the similar signature (that will contain only the timeout). Also, I'd like to say that the if we pass the multiplayer: 0 and any timeout value for the exponential back off, the supervisor will try to restart the failed actor at regular intervals. It's just a corner use case for this type of exponential back off :)

Copy link
Member Author

@Relrin Relrin Jan 20, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another thing that I though about is to add the limit/attempts option to the ExponentialBackOff struct that has the Option<u32> type. It gives a way to stop recovering failed actor if we actually can't do.
However, I'm not sure, is it supported this feature right now (to stop recovering after N attempts)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, in other frameworks and langs it is working like that.
https://github.com/trendmicro/backoff-python#backoffon_exception
It would be nice to have max_retries. Which enables us to resolve #105 :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could add the LinearBackOff struct into the ActorRestartStrategy enum with the similar signature (that will contain only the timeout).

Yes please, that will be clear what is the difference. It is a corner case but let's be explicit rather than implicit. :)

Copy link
Member Author

@Relrin Relrin Jan 21, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vertexclique I'm thinking about the adding the max_retries in code. Does it make a sense to append the max_restarts for the Supervisor/Children types instead of re-ogranizing it in the struct? (like in the code below)

pub struct RestartStrategy {
    max_restarts: Option<usize>
    strategy: ActorRestartStrategy
}

Any suggestions, better names for those things are welcome :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Relrin Actually what you are suggesting is better since they are logically separate. Naming also looks good max_restarts. If you want to do that in this PR just ping us. We can close two issues at the same time. Or we can implement this in other PR. This is your call…

Delay::new(Duration::from_secs(start_in)).await;
}
_ => {}
};

// FIXME: panics?
let supervised = supervised.reset(bcast).await.unwrap();
// FIXME: might not keep order
Expand Down Expand Up @@ -733,9 +824,13 @@ impl Supervisor {
supervised.id()
);
let id = supervised.id().clone();
let restart_count = match self.launched.get(&id.clone()) {
Some((_, _, count)) => *count,
None => 0,
};
let launched = supervised.launch();
self.launched
.insert(id.clone(), (self.order.len(), launched));
.insert(id.clone(), (self.order.len(), launched, restart_count));
self.order.push(id);
}
}
Expand All @@ -756,7 +851,7 @@ impl Supervisor {
// FIXME: panics?
for id in self.order.get(range.clone()).unwrap() {
// TODO: Err if None?
if let Some((_, launched)) = self.launched.remove(&id) {
if let Some((_, launched, _)) = self.launched.remove(&id) {
// TODO: add a "stopped" list and poll from it instead of awaiting
supervised.push(launched);
}
Expand Down Expand Up @@ -797,7 +892,7 @@ impl Supervisor {
// FIXME: panics?
for id in self.order.get(range.clone()).unwrap() {
// TODO: Err if None?
if let Some((_, launched)) = self.launched.remove(&id) {
if let Some((_, launched, _)) = self.launched.remove(&id) {
// TODO: add a "stopped" list and poll from it instead of awaiting
supervised.push(launched);
}
Expand Down Expand Up @@ -838,7 +933,7 @@ impl Supervisor {
);
match self.strategy {
SupervisionStrategy::OneForOne => {
let (order, launched) = self.launched.remove(&id).ok_or(())?;
let (order, launched, _) = self.launched.remove(&id).ok_or(())?;
// TODO: add a "waiting" list and poll from it instead of awaiting
// FIXME: panics?
let supervised = launched.await.unwrap();
Expand Down Expand Up @@ -873,8 +968,13 @@ impl Supervisor {
self.id(),
supervised.id()
);
let restart_count = match self.launched.get(&id.clone()) {
Some((_, _, count)) => count + 1,
None => 0,
};
let launched = supervised.launch();
self.launched.insert(id.clone(), (order, launched));
self.launched
.insert(id.clone(), (order, launched, restart_count));
self.order[order] = id;
}
SupervisionStrategy::OneForAll => {
Expand All @@ -885,7 +985,7 @@ impl Supervisor {
self.killed.shrink_to_fit();
}
SupervisionStrategy::RestForOne => {
let (start, _) = self.launched.get(&id).ok_or(())?;
let (start, _, _) = self.launched.get(&id).ok_or(())?;
let start = *start;

self.restart(start..).await;
Expand Down Expand Up @@ -960,7 +1060,7 @@ impl Supervisor {
let id = supervised.id().clone();
let launched = supervised.launch();
self.launched
.insert(id.clone(), (self.order.len(), launched));
.insert(id.clone(), (self.order.len(), launched, 0));
self.order.push(id);
}
// FIXME
Expand Down Expand Up @@ -995,7 +1095,7 @@ impl Supervisor {
..
} => {
// FIXME: Err if None?
if let Some((_, launched)) = self.launched.remove(&id) {
if let Some((_, launched, _)) = self.launched.remove(&id) {
debug!("Supervisor({}): Supervised({}) stopped.", self.id(), id);
// TODO: add a "waiting" list an poll from it instead of awaiting
// FIXME: panics?
Expand Down Expand Up @@ -1579,6 +1679,12 @@ impl Default for SupervisionStrategy {
}
}

impl Default for ActorRestartStrategy {
fn default() -> Self {
ActorRestartStrategy::Instantly
}
}

impl PartialEq for SupervisorRef {
fn eq(&self, other: &Self) -> bool {
self.id == other.id
Expand Down