Skip to content

Commit

Permalink
Add monitor loop to supervisor logic (#9)
Browse files Browse the repository at this point in the history
* add monitor loop to supervisor logic

* add run method to supervisor::Spec

* add SupervisorResult enum type

Co-authored-by: Zephyr Shannon <geoffpshannon@gmail.com>
  • Loading branch information
roman and RadicalZephyr committed Oct 15, 2020
1 parent 963a23f commit 21282c1
Show file tree
Hide file tree
Showing 7 changed files with 377 additions and 99 deletions.
11 changes: 8 additions & 3 deletions src/events.rs
Expand Up @@ -31,7 +31,7 @@ pub struct NodeData {

/// NotifyFn is used by the supervision API to send events to an interested
/// listener.
type NotifyFn = Box<dyn Fn(Event) -> BoxFuture<'static, ()>>;
type NotifyFn = Box<dyn (Fn(Event) -> BoxFuture<'static, ()>) + Send + Sync>;

/// EventNotifier is used by the internal supervision API to send events about a
/// running supervision tree
Expand All @@ -41,7 +41,7 @@ pub struct EventNotifier(Arc<NotifyFn>);
impl EventNotifier {
pub fn new<F, O>(notify0: F) -> Self
where
F: Fn(Event) -> O + 'static,
F: (Fn(Event) -> O) + Send + Sync + 'static,
O: Future<Output = ()> + FutureExt + Send + 'static,
{
let notify = move |ev| {
Expand Down Expand Up @@ -219,7 +219,9 @@ impl EventBufferCollector {
// we finished the loop, we have to wait until the next push and try
// again. if we wait too long, fail with a timeout error
if let Err(_) = timeout_at(Instant::now() + wait_duration, self.on_push.recv()).await {
return Err("Expected assertion after timeout, did not happen".to_owned());
return Err(
"wait_till: Expected assertion after timeout, did not happen".to_owned(),
);
}
}
}
Expand Down Expand Up @@ -398,6 +400,9 @@ async fn run_event_collector(
mut receiver: mpsc::Receiver<Event>,
) {
while let Some(ev) = receiver.recv().await {
// IMPORTANT: DO NOT REMOVE DEBUG LINE COMMENT BELLOW
// println!("{:?}", ev);

let mut ev_vec = events.lock().await;
ev_vec.push(ev);

Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Expand Up @@ -5,6 +5,7 @@ extern crate lazy_static;
#[allow(clippy::new_without_default)]
mod context;
mod events;
mod notifier;
mod supervisor;
mod worker;

Expand Down
29 changes: 29 additions & 0 deletions src/notifier.rs
@@ -0,0 +1,29 @@
use tokio::sync::oneshot;

/// StartNotifier offers a convenient way to notify a supervising task that this
/// task got started or that it failed to start.
pub struct StartNotifier<T, E>(Box<dyn FnOnce(Result<T, E>) + Send>);

impl<T, E> StartNotifier<T, E> {
pub fn from_oneshot(sender: oneshot::Sender<Result<T, E>>) -> Self
where
T: Send + 'static,
E: Send + 'static,
{
StartNotifier(Box::new(move |err| {
sender.send(err).ok();
}))
}

fn call(self, err: Result<T, E>) {
self.0(err)
}

pub fn success(self, result: T) {
self.call(Ok(result))
}

pub fn failed(self, err: E) {
self.call(Err(err))
}
}

0 comments on commit 21282c1

Please sign in to comment.