diff --git a/bastion/examples/callbacks.rs b/bastion/examples/callbacks.rs index 3ae3500d..ac0ff02d 100644 --- a/bastion/examples/callbacks.rs +++ b/bastion/examples/callbacks.rs @@ -3,116 +3,173 @@ use bastion::prelude::*; fn main() { Bastion::init(); - Bastion::supervisor(|supervisor| { - let children_ref = supervisor.children_ref(|children| { - let callbacks = Callbacks::new() - .with_before_start(|| { - // This children group is first in order, so its `before_start` will - // get called before the other one's... - println!("1: before_start"); - }) - .with_before_restart(|| { - // This children group is first in order and the other children group - // stopped (thus, its `after_stop` has already been called and - // `before_restart` won't be)... - println!("9: before_restart"); - }) - .with_after_restart(|| { - // This children group is first in order, so its `after_restart` will - // get called before the other one's... - println!("10: after_restart"); - - // The other children group has not been restarted yet... - println!("11: stop"); - // This will stop both children group... - Bastion::stop(); - }) - .with_after_stop(|| { - // This will get called after the other children group has been restarted - // (because the supervisor will then get a message from the system saying - // that it needs to stop running)... - println!("13: after_stop"); - }); - - children - .with_exec(|ctx| { - async move { - // This might not get called before the other child's `stop` - // (this is up to the executor) and/or `recv`... - println!("3|4|5: recv"); - // This will await for the other child to be stopped - // and to call `tell`... - ctx.recv().await.expect("Couldn't receive the message."); - - // The other child has stopped and sent a message to stop - // this child from awaiting... - println!("8: err"); - // This will make both children group get restarted... - Err(()) - } - }) - .with_callbacks(callbacks) - }); - - supervisor - .with_strategy(SupervisionStrategy::OneForAll) - .children(|children| { - let callbacks = Callbacks::new() - .with_before_start(|| { - // This children group is second in order, so its `before_start` will - // get called after the other one's... - println!("2: before_start"); - }) - .with_before_restart(|| { - // This won't happen because this child never faults... - unreachable!(); - }) - .with_after_restart(|| { - // This children group is second in order, so its `after_restart` will - // get called after the other one's... - println!("12: after_restart"); - }) - .with_after_stop(move || { - // This will get called both after this child's `recv` (see there why) - // and after the other children group's `stop` (which stops the system)... - println!("6|14: after_stop"); - - // Nothing will get printed in between because the other child's `exec` - // future is pending... - println!("7|15: tell"); - // This will "tell" a message to the other child, making it finish - // `await`ing on `ctx.recv()` and return an error... - children_ref.elems()[0].tell(()).ok(); - }); - - children - .with_exec(|ctx| { - async move { - // This might not get called before the other child's `recv` - // (this is up to the executor)... - println!("3|4|5: stop"); - // This will stop this children gruop once the future becomes - // pending... - ctx.parent() - .stop() - .expect("Couldn't stop the children group."); - - // This might not get called before the other child's `recv` - // (this is up to the executor)... - println!("4|5: recv"); - // This will make the future pending and allow it to stop (because - // `ctx.current().stop()` was called earlier)... - ctx.recv().await.expect("Couldn't receive the message."); - - // Note that this will never get there... - Ok(()) - } - }) - .with_callbacks(callbacks) - }) - }) - .expect("Couldn't create the supervisor."); + Bastion::supervisor(sp).expect("Couldn't create the supervisor."); Bastion::start(); Bastion::block_until_stopped(); } + +fn sp(supervisor: Supervisor) -> Supervisor { + let callbacks = Callbacks::new() + .with_before_start(|| { + // This will get called before others `before_start`s of this example + // because the others all are defined for elements supervised by + // this supervisor... + println!("(sp ) before_start"); + }) + .with_after_stop(|| { + // This will get called after others `after_stop`s of this example + // because the others all are defined for elements supervised by + // this supervisor... + println!("(sp ) after_stop"); + }); + + + let children_ref = supervisor.children_ref(sp_ch); + + supervisor + .supervisor(|sp| sp_sp(sp, children_ref)) + .with_strategy(SupervisionStrategy::OneForAll) + .with_callbacks(callbacks) +} + +fn sp_ch(children: Children) -> Children { + let callbacks = Callbacks::new() + .with_before_start(|| { + // This will be the first `before_start` callback to get called after + // "sp"'s because this is its first supervised element in order... + println!("(sp.ch ) before_start"); + }) + .with_before_restart(|| { + // This will be the first `before_restart` callback to get called + // after "sp" restarts its supervised elements because this is its + // first supervised element in order... + println!("(sp.ch ) before_restart"); + }) + .with_after_restart(|| { + // This might get called before, after or in-between "sp.sp.ch"'s + // `before_start` and "sp.sp"'s `after_restart`... + println!("(sp.ch ) after_restart"); + + // This might get called before, after or in-between "sp.sp.ch"'s + // `before_start` and "sp.sp"'s `after_restart`... + println!("(sp.ch ) stop"); + // This will stop the whole system (once "sp" finished restarting its + // supervised elements and both "sp.ch" and "sp.sp.ch"'s futures + // become pending)... + Bastion::stop(); + }) + .with_after_stop(|| { + // This will get called after `recv` but might get called before, + // after or in-between "sp.sp"'s `after_stop` and "sp.sp.ch"'s + // `after_stop`... + println!("(sp.ch ) after_stop"); + }); + + children + .with_exec(|ctx| { + async move { + // This will get called two times: + // - a first time after `before_start` but before, after or + // in-between "sp.sp"'s `before_start and "sp.sp.ch"'s `before_start`, + // `stop`, `recv` and `after_stop`... + // - a second time after `after_restart` but before, after or + // in-between "sp.sp"'s `before_restart`, `after_restart` and + // `after_stop` and "sp.sp.ch"'s `before_start`, `stop`, `recv` and + // `after_stop`... + println!("(sp.ch ) recv"); + // This will wait for the message sent by "sp.sp.ch"'s `tell` (when + // its `after_stop` gets called)... + ctx.recv().await.expect("Couldn't receive the message."); + + // Once the message has been received, the future will return an + // error to make "sp" restart "sp.ch" and "sp.sp" (because its + // supervision strategy is "one-for-all")... + println!("(sp.ch ) err"); + Err(()) + } + }) + .with_callbacks(callbacks) +} + +fn sp_sp(supervisor: Supervisor, children_ref: ChildrenRef) -> Supervisor { + let callbacks = Callbacks::new() + .with_before_start(|| { + // This will get called after "sp.ch"'s `before_start` and might get + // called before or after its `recv`... + println!("(sp.sp ) before_start"); + }) + .with_before_restart(|| { + // This will get called after "sp.ch"'s `before_restart`... + println!("(sp.sp ) before_restart"); + }) + .with_after_restart(|| { + // This will get called after "sp.sp.ch"'s `before_start` and might + // get called before, after or in-between its `recv` and "sp.ch"'s + // `recv`... + println!("(sp.sp ) after_restart"); + }) + .with_after_stop(|| { + // This will get called after "sp.sp.ch"'s `after_stop` but might get + // called before or after "sp.ch"'s `recv`... + println!("(sp.sp ) after_stop"); + }); + + supervisor + .children(|sp| sp_sp_ch(sp, children_ref)) + .with_callbacks(callbacks) +} + +fn sp_sp_ch(children: Children, children_ref: ChildrenRef) -> Children { + let callbacks = Callbacks::new() + .with_before_start(|| { + // This will get called two times: + // - a first time after "sp.sp"'s `before_start` and before or after + // "sp.ch"'s `recv` + // - a second time after "sp.sp"'s `before_restart` and before, after + // or in-between "sp.ch"'s `after_restart` and `stop`... + println!("(sp.sp.ch) before_start"); + }) + // This won't get called because this children group only stop itself + // (thus, `after_stop` would have already been called)... + .with_before_restart(|| unreachable!()) + // This won't get called because this children group only stops itself + // (thus, `before_start` will get called instead)... + .with_after_restart(|| unreachable!()) + .with_after_stop(move || { + // This will get called two times, both after `recv` but before or + // after "sp.ch"'s `recv`... + println!("(sp.sp.ch) after_stop"); + + // This might get called before or after "sp.ch"'s `recv`... + println!("(sp.sp.ch) tell"); + // This will send a message to "sp.ch", making it fault and making + // "sp" restart "sp.ch" and "sp.sp" (because its supervision strategy + // is "one-for-one")... + children_ref.elems()[0].tell(()).ok(); + }); + + children + .with_exec(|ctx| { + async move { + // This will get called two times, both after `before_start` and + // before or after "sp.ch"'s `recv`... + println!("(sp.sp.ch) stop"); + // This will stop this children group once this future becomes + // pending... + ctx.parent() + .stop() + .expect("Couldn't stop the children group."); + + // This might get called before or after "sp.ch"'s `recv`... + println!("(sp.sp.ch) recv"); + // This will make this future pending, thus allowing the children + // group to stop... + ctx.recv().await.expect("Couldn't receive the message."); + + // Note that this future will never get there... + Ok(()) + } + }) + .with_callbacks(callbacks) +} diff --git a/bastion/src/supervisor.rs b/bastion/src/supervisor.rs index f0dc1da9..2983606e 100644 --- a/bastion/src/supervisor.rs +++ b/bastion/src/supervisor.rs @@ -58,8 +58,11 @@ pub struct Supervisor { // supervision strategy is not "one-for-one". stopped: FxHashMap, // TODO: doc + // TODO: killed should be empty before calling `kill` and after restarting killed: FxHashMap, strategy: SupervisionStrategy, + // TODO: doc + callbacks: Callbacks, // Messages that were received before the supervisor was // started. Those will be "replayed" once a start message // is received. @@ -117,6 +120,7 @@ impl Supervisor { let stopped = FxHashMap::default(); let killed = FxHashMap::default(); let strategy = SupervisionStrategy::default(); + let callbacks = Callbacks::new(); let pre_start_msgs = Vec::new(); let started = false; @@ -127,6 +131,7 @@ impl Supervisor { stopped, killed, strategy, + callbacks, pre_start_msgs, started, } @@ -138,53 +143,63 @@ impl Supervisor { } pub(crate) async fn reset(&mut self, bcast: Broadcast) { - let stopped = self.stopped.keys().cloned().collect::>(); // TODO: stop or kill? - let supervised = self.kill(0..).await; + let killed = self.kill(0..).await; self.bcast = bcast; self.pre_start_msgs.clear(); self.pre_start_msgs.shrink_to_fit(); + let parent = Parent::supervisor(self.as_ref()); let mut reset = FuturesOrdered::new(); - for supervised in supervised { - if !stopped.contains(supervised.id()) { + for id in self.order.drain(..) { + let supervised = if let Some(supervised) = self.stopped.remove(&id) { + supervised + } else if let Some(supervised) = self.killed.remove(&id) { + supervised + } else { + // FIXME + unimplemented!(); + }; + + let killed = killed.contains(supervised.id()); + if killed { supervised.callbacks().call_before_restart(); } - let parent = Parent::supervisor(self.as_ref()); - let bcast = Broadcast::new(parent); - - reset.push(async { + let bcast = Broadcast::new(parent.clone()); + reset.push(async move { // FIXME: panics? - supervised.reset(bcast).await.unwrap() + let supervised = supervised.reset(bcast).await.unwrap(); + // FIXME: might not keep order + if killed { + supervised.callbacks().call_after_restart(); + } else { + supervised.callbacks().call_before_start(); + } + + supervised }) } while let Some(supervised) = reset.next().await { - let id = supervised.id().clone(); - if stopped.contains(supervised.id()) { - supervised.callbacks().call_before_start(); - } else { - supervised.callbacks().call_after_restart(); - } - self.bcast.register(supervised.bcast()); + // TODO: remove and set the supervised element as started instead? if self.started { let msg = BastionMessage::start(); - self.bcast.send_child(&id, msg); + self.bcast.send_child(supervised.id(), msg); } + let id = supervised.id().clone(); let launched = supervised.launch(); self.launched .insert(id.clone(), (self.order.len(), launched)); self.order.push(id); } - if self.started { - let msg = BastionMessage::start(); - self.bcast.send_children(msg); - } + // TODO: should be empty + self.stopped.shrink_to_fit(); + self.killed.shrink_to_fit(); } pub(crate) fn id(&self) -> &BastionId { @@ -195,6 +210,10 @@ impl Supervisor { &self.bcast } + pub(crate) fn callbacks(&self) -> &Callbacks { + &self.callbacks + } + pub(crate) fn as_ref(&self) -> SupervisorRef { // TODO: clone or ref? let id = self.bcast.id().clone(); @@ -240,8 +259,8 @@ impl Supervisor { /// [`SupervisorRef`]: ../struct.SupervisorRef.html /// [`supervisor_ref`]: #method.supervisor_ref pub fn supervisor(self, init: S) -> Self - where - S: FnOnce(Supervisor) -> Supervisor, + where + S: FnOnce(Supervisor) -> Supervisor, { let parent = Parent::supervisor(self.as_ref()); let bcast = Broadcast::new(parent); @@ -293,8 +312,8 @@ impl Supervisor { /// [`SupervisorRef`]: ../struct.SupervisorRef.html /// [`supervisor`]: #method.supervisor pub fn supervisor_ref(&mut self, init: S) -> SupervisorRef - where - S: FnOnce(Supervisor) -> Supervisor, + where + S: FnOnce(Supervisor) -> Supervisor, { let parent = Parent::supervisor(self.as_ref()); let bcast = Broadcast::new(parent); @@ -355,8 +374,8 @@ impl Supervisor { /// [`ChildrenRef`]: children/struct.ChildrenRef.html /// [`children_ref`]: #method.children_ref pub fn children(self, init: C) -> Self - where - C: FnOnce(Children) -> Children, + where + C: FnOnce(Children) -> Children, { let parent = Parent::supervisor(self.as_ref()); let bcast = Broadcast::new(parent); @@ -418,8 +437,8 @@ impl Supervisor { /// [`ChildrenRef`]: children/struct.ChildrenRef.html /// [`children`]: #method.children pub fn children_ref(&self, init: C) -> ChildrenRef - where - C: FnOnce(Children) -> Children, + where + C: FnOnce(Children) -> Children, { let parent = Parent::supervisor(self.as_ref()); let bcast = Broadcast::new(parent); @@ -488,7 +507,64 @@ impl Supervisor { self } - async fn stop(&mut self, range: RangeFrom) -> Vec { + // TODO: doc + pub fn with_callbacks(mut self, callbacks: Callbacks) -> Self { + self.callbacks = callbacks; + self + } + + async fn restart(&mut self, range: RangeFrom) { + // TODO: stop or kill? + let killed = self.kill(range.clone()).await; + + let parent = Parent::supervisor(self.as_ref()); + let mut reset = FuturesOrdered::new(); + for id in self.order.drain(range) { + let supervised = if let Some(supervised) = self.stopped.remove(&id) { + supervised + } else if let Some(supervised) = self.killed.remove(&id) { + supervised + } else { + // FIXME + unimplemented!(); + }; + + let killed = killed.contains(supervised.id()); + if killed { + supervised.callbacks().call_before_restart(); + } + + let bcast = Broadcast::new(parent.clone()); + reset.push(async move { + // FIXME: panics? + let supervised = supervised.reset(bcast).await.unwrap(); + // FIXME: might not keep order + if killed { + supervised.callbacks().call_after_restart(); + } else { + supervised.callbacks().call_before_start(); + } + + supervised + }) + } + + while let Some(supervised) = reset.next().await { + self.bcast.register(supervised.bcast()); + if self.started { + let msg = BastionMessage::start(); + self.bcast.send_child(supervised.id(), msg); + } + + let id = supervised.id().clone(); + let launched = supervised.launch(); + self.launched + .insert(id.clone(), (self.order.len(), launched)); + self.order.push(id); + } + } + + async fn stop(&mut self, range: RangeFrom) { if range.start == 0 { self.bcast.stop_children(); } else { @@ -498,10 +574,31 @@ impl Supervisor { } } - self.collect(range).await + let mut supervised = FuturesOrdered::new(); + // FIXME: panics? + for id in self.order.get(range.clone()).unwrap() { + // TODO: Err if None? + if let Some((_, launched)) = self.launched.remove(&id) { + // TODO: add a "stopped" list and poll from it instead of awaiting + supervised.push(launched); + } + } + + while let Some(supervised) = supervised.next().await { + match supervised { + Some(supervised) => { + let id = supervised.id().clone(); + + supervised.callbacks().call_after_stop(); + self.stopped.insert(id, supervised); + } + // FIXME + None => unimplemented!(), + } + } } - async fn kill(&mut self, range: RangeFrom) -> Vec { + async fn kill(&mut self, range: RangeFrom) -> Vec { if range.start == 0 { self.bcast.kill_children(); } else { @@ -511,19 +608,7 @@ impl Supervisor { } } - self.collect(range).await - } - - fn stopped(&mut self) { - self.bcast.stopped(); - } - - fn faulted(&mut self) { - self.bcast.faulted(); - } - - async fn collect(&mut self, range: RangeFrom) -> Vec { - let mut supervised = Vec::new(); + let mut supervised = FuturesOrdered::new(); // FIXME: panics? for id in self.order.get(range.clone()).unwrap() { // TODO: Err if None? @@ -533,37 +618,28 @@ impl Supervisor { } } - let supervised = FuturesOrdered::from_iter(supervised.into_iter().rev()); - let mut supervised = supervised.collect::>().await; - - let mut collected = Vec::with_capacity(supervised.len()); - for id in self.order.drain(range) { - if let Some(supervised) = self.stopped.remove(&id) { - collected.push(supervised); - - continue; - } - - if let Some(supervised) = self.killed.remove(&id) { - collected.push(supervised); - - continue; - } - - match supervised.pop() { - Some(Some(supervised)) if supervised.id() == &id => { - collected.push(supervised); + let mut killed = Vec::with_capacity(supervised.len()); + while let Some(supervised) = supervised.next().await { + match supervised { + Some(supervised) => { + let id = supervised.id().clone(); + killed.push(id.clone()); + self.killed.insert(id, supervised); } // FIXME - Some(Some(_)) => unimplemented!(), - // FIXME - Some(None) => unimplemented!(), - // FIXME None => unimplemented!(), } } - collected + killed + } + + fn stopped(&mut self) { + self.bcast.stopped(); + } + + fn faulted(&mut self) { + self.bcast.faulted(); } async fn recover(&mut self, id: BastionId) -> Result<(), ()> { @@ -573,6 +649,7 @@ impl Supervisor { // TODO: add a "waiting" list and poll from it instead of awaiting // FIXME: panics? let supervised = launched.await.unwrap(); + dbg!(); supervised.callbacks().call_before_restart(); self.bcast.unregister(supervised.id()); @@ -595,75 +672,17 @@ impl Supervisor { self.order[order] = id; } SupervisionStrategy::OneForAll => { - let stopped = self.stopped.keys().cloned().collect::>(); - - // TODO: stop or kill? - for supervised in self.kill(0..).await { - if !stopped.contains(supervised.id()) { - supervised.callbacks().call_before_restart(); - } - - self.bcast.unregister(supervised.id()); - - let parent = Parent::supervisor(self.as_ref()); - let bcast = Broadcast::new(parent); - let id = bcast.id().clone(); - // FIXME: panics. - let supervised = supervised.reset(bcast).await.unwrap(); - if stopped.contains(supervised.id()) { - supervised.callbacks().call_before_start(); - } else { - supervised.callbacks().call_after_restart(); - } + self.restart(0..).await; - self.bcast.register(supervised.bcast()); - - let launched = supervised.launch(); - self.launched - .insert(id.clone(), (self.order.len(), launched)); - self.order.push(id); - } - - if self.started { - let msg = BastionMessage::start(); - self.bcast.send_children(msg); - } + // TODO: should be empty + self.stopped.shrink_to_fit(); + self.killed.shrink_to_fit(); } SupervisionStrategy::RestForOne => { - let stopped = self.stopped.keys().cloned().collect::>(); - let (order, _) = self.launched.get(&id).ok_or(())?; - let order = *order; - - // TODO: stop or kill? - for supervised in self.kill(order..).await { - if !stopped.contains(supervised.id()) { - supervised.callbacks().call_before_restart(); - } - - self.bcast.unregister(supervised.id()); + let (start, _) = self.launched.get(&id).ok_or(())?; + let start = *start; - let parent = Parent::supervisor(self.as_ref()); - let bcast = Broadcast::new(parent); - let id = bcast.id().clone(); - // FIXME: panics? - let supervised = supervised.reset(bcast).await.unwrap(); - if stopped.contains(supervised.id()) { - supervised.callbacks().call_before_start(); - } else { - supervised.callbacks().call_after_restart(); - } - - self.bcast.register(supervised.bcast()); - if self.started { - let msg = BastionMessage::start(); - self.bcast.send_child(&id, msg); - } - - let launched = supervised.launch(); - self.launched - .insert(id.clone(), (self.order.len(), launched)); - self.order.push(id); - } + self.restart(start..).await; } } @@ -674,59 +693,40 @@ impl Supervisor { match msg { BastionMessage::Start => unreachable!(), BastionMessage::Stop => { - for supervised in self.stop(0..).await.into_iter() { - supervised.callbacks().call_after_stop(); - - self.stopped.insert(supervised.id().clone(), supervised); - } - + self.stop(0..).await; self.stopped(); return Err(()); } BastionMessage::Kill => { - for supervised in self.kill(0..).await.into_iter() { - self.killed.insert(supervised.id().clone(), supervised); - } - + self.kill(0..).await; self.stopped(); return Err(()); } - BastionMessage::Deploy(deployment) => match deployment { - Deployment::Supervisor(supervisor) => { - self.bcast.register(&supervisor.bcast); - if self.started { - let msg = BastionMessage::start(); - self.bcast.send_child(supervisor.id(), msg); + BastionMessage::Deploy(deployment) => { + let supervised = match deployment { + Deployment::Supervisor(supervisor) => { + supervisor.callbacks().call_before_start(); + Supervised::supervisor(supervisor) } - - let id = supervisor.id().clone(); - let supervised = Supervised::supervisor(supervisor); - //supervised.callbacks().call_before_start(); - - let launched = supervised.launch(); - self.launched - .insert(id.clone(), (self.order.len(), launched)); - self.order.push(id); - } - Deployment::Children(children) => { - children.callbacks().call_before_start(); - - self.bcast.register(children.bcast()); - if self.started { - let msg = BastionMessage::start(); - self.bcast.send_child(children.id(), msg); + Deployment::Children(children) => { + children.callbacks().call_before_start(); + Supervised::children(children) } + }; - let id = children.id().clone(); - let supervised = Supervised::children(children); - - let launched = supervised.launch(); - self.launched - .insert(id.clone(), (self.order.len(), launched)); - self.order.push(id); + self.bcast.register(supervised.bcast()); + if self.started { + let msg = BastionMessage::start(); + self.bcast.send_child(supervised.id(), msg); } + + let id = supervised.id().clone(); + let launched = supervised.launch(); + self.launched + .insert(id.clone(), (self.order.len(), launched)); + self.order.push(id); }, // FIXME BastionMessage::Prune { .. } => unimplemented!(), @@ -750,10 +750,8 @@ impl Supervisor { } BastionMessage::Faulted { id } => { if self.recover(id).await.is_err() { - for supervised in self.kill(0..).await.into_iter() { - self.killed.insert(supervised.id().clone(), supervised); - } - + // TODO: stop or kill? + self.kill(0..).await; self.faulted(); return Err(()); @@ -793,10 +791,7 @@ impl Supervisor { } Poll::Ready(None) => { // TODO: stop or kill? - for supervised in self.kill(0..).await.into_iter() { - self.killed.insert(supervised.id().clone(), supervised); - } - + self.kill(0..).await; self.faulted(); return self; @@ -1130,7 +1125,7 @@ impl Supervised { fn callbacks(&self) -> &Callbacks { match self { - Supervised::Supervisor(supervisor) => unimplemented!(), + Supervised::Supervisor(supervisor) => supervisor.callbacks(), Supervised::Children(children) => children.callbacks(), } } diff --git a/bastion/src/system.rs b/bastion/src/system.rs index 3d2c864a..9183a189 100644 --- a/bastion/src/system.rs +++ b/bastion/src/system.rs @@ -76,6 +76,8 @@ impl System { // TODO: set a limit? async fn recover(&mut self, mut supervisor: Supervisor) { + supervisor.callbacks().call_before_restart(); + let parent = Parent::system(); let bcast = if supervisor.id() == &NIL_ID { Broadcast::with_id(parent, NIL_ID) @@ -84,31 +86,34 @@ impl System { }; let id = bcast.id().clone(); - supervisor.reset(bcast).await; + supervisor.callbacks().call_after_restart(); + self.bcast.register(supervisor.bcast()); let launched = supervisor.launch(); self.launched.insert(id, launched); } - async fn stop(&mut self) { + async fn stop(&mut self) -> Vec { self.bcast.stop_children(); for (_, launched) in self.launched.drain() { self.waiting.push(launched); } + let mut supervisors = Vec::new(); loop { match poll!(&mut self.waiting.next()) { - Poll::Ready(Some(_)) => (), - Poll::Ready(None) => return, + Poll::Ready(Some(Some(supervisor))) => supervisors.push(supervisor), + Poll::Ready(Some(None)) => (), + Poll::Ready(None) => return supervisors, Poll::Pending => pending!(), } } } - async fn kill(&mut self) { + async fn kill(&mut self) -> Vec { self.bcast.kill_children(); for launched in self.waiting.iter_mut() { @@ -121,10 +126,12 @@ impl System { self.waiting.push(launched); } + let mut supervisors = Vec::new(); loop { match poll!(&mut self.waiting.next()) { - Poll::Ready(Some(_)) => (), - Poll::Ready(None) => return, + Poll::Ready(Some(Some(supervisor))) => supervisors.push(supervisor), + Poll::Ready(Some(None)) => (), + Poll::Ready(None) => return supervisors, Poll::Pending => pending!(), } } @@ -136,7 +143,9 @@ impl System { BastionMessage::Stop => { self.started = false; - self.stop().await; + for supervisor in self.stop().await { + supervisor.callbacks().call_after_stop(); + } return Err(()); } @@ -149,6 +158,8 @@ impl System { } BastionMessage::Deploy(deployment) => match deployment { Deployment::Supervisor(supervisor) => { + supervisor.callbacks().call_before_start(); + self.bcast.register(supervisor.bcast()); if self.started { let msg = BastionMessage::start(); @@ -202,6 +213,8 @@ impl System { if self.restart.remove(&id) { self.recover(supervisor).await; + } else { + supervisor.callbacks().call_after_stop(); } continue;