diff --git a/bastion/src/children.rs b/bastion/src/children.rs index ef4b73ec..111a6d26 100644 --- a/bastion/src/children.rs +++ b/bastion/src/children.rs @@ -1,10 +1,11 @@ use crate::bastion::REGISTRY; use crate::broadcast::{BastionMessage, Broadcast, Sender}; -use crate::context::{BastionContext, BastionId}; +use crate::context::{BastionContext, BastionId, ContextState}; use futures::future::CatchUnwind; use futures::pending; use futures::poll; use futures::prelude::*; +use qutex::Qutex; use runtime::task::JoinHandle; use std::any::Any; use std::fmt::Debug; @@ -45,6 +46,7 @@ pub(super) struct Children { pub(super) struct Child { exec: CatchUnwind>>, bcast: Broadcast, + state: Qutex, } impl Children { @@ -115,16 +117,19 @@ impl Children { let bcast = self.bcast.new_child(); let id = bcast.id().clone(); + let state = ContextState::new(); + let state = Qutex::new(state); + let thunk = objekt::clone_box(&*self.thunk); let msg = objekt::clone_box(&*self.msg); let parent = self.bcast.sender().clone(); - let ctx = BastionContext::new(id, parent); + let ctx = BastionContext::new(id, parent, state.clone()); let exec = thunk(ctx, msg) .catch_unwind(); - let child = Child { exec, bcast }; + let child = Child { exec, bcast, state }; runtime::spawn(child.run()); } @@ -141,51 +146,52 @@ impl Child { self.bcast.sender() } - async fn run(mut self) { - REGISTRY.add_child(&self); + fn dead(mut self) { + REGISTRY.remove_child(&self); - loop { - if let Poll::Ready(res) = poll!(&mut self.exec) { - REGISTRY.remove_child(&self); + self.bcast.dead(); + } - match res { - Ok(Ok(())) => self.bcast.dead(), - Ok(Err(())) | Err(_) => self.bcast.faulted(), - } + fn faulted(mut self) { + REGISTRY.remove_child(&self); - return; - } + self.bcast.faulted(); + } + + async fn run(mut self) { + REGISTRY.add_child(&self); + loop { match poll!(&mut self.bcast.next()) { Poll::Ready(Some(msg)) => { - match msg { - BastionMessage::PoisonPill => { - REGISTRY.remove_child(&self); - - self.bcast.dead(); - - return; - } - BastionMessage::Dead { .. } | BastionMessage::Faulted { .. } => { - REGISTRY.remove_child(&self); - - self.bcast.faulted(); + // FIXME: Err(Error) + let mut state = self.state.clone().lock_async().await.unwrap(); - return; - } + match msg { + BastionMessage::PoisonPill => return self.dead(), // FIXME - BastionMessage::Message(_) => unimplemented!(), + BastionMessage::Dead { .. } => unimplemented!(), + // FIXME + BastionMessage::Faulted { .. } => unimplemented!(), + BastionMessage::Message(msg) => { + state.push_msg(msg); + + continue; + }, } } - Poll::Ready(None) => { - REGISTRY.remove_child(&self); - - self.bcast.faulted(); + Poll::Ready(None) => return self.faulted(), + Poll::Pending => (), + } - return; + if let Poll::Ready(res) = poll!(&mut self.exec) { + match res { + Ok(Ok(())) => return self.dead(), + Ok(Err(())) | Err(_) => return self.faulted(), } - Poll::Pending => pending!(), } + + pending!(); } } } diff --git a/bastion/src/context.rs b/bastion/src/context.rs index 6423dfd2..e88f4657 100644 --- a/bastion/src/context.rs +++ b/bastion/src/context.rs @@ -1,6 +1,9 @@ use crate::bastion::REGISTRY; use crate::broadcast::{BastionMessage, Sender}; use crate::children::Message; +use futures::pending; +use qutex::{Guard, Qutex}; +use std::collections::VecDeque; use uuid::Uuid; #[derive(Hash, Eq, PartialEq, Debug, Clone)] @@ -9,6 +12,11 @@ pub struct BastionId(Uuid); pub struct BastionContext { id: BastionId, parent: Sender, + state: Qutex, +} + +pub(super) struct ContextState { + msgs: VecDeque>, } impl BastionId { @@ -20,8 +28,8 @@ impl BastionId { } impl BastionContext { - pub(super) fn new(id: BastionId, parent: Sender) -> Self { - BastionContext { id, parent } + pub(super) fn new(id: BastionId, parent: Sender, state: Qutex) -> Self { + BastionContext { id, parent, state } } pub fn id(&self) -> &BastionId { @@ -31,6 +39,42 @@ impl BastionContext { pub fn send_msg(&self, id: &BastionId, msg: Box) -> Result<(), Box> { let msg = BastionMessage::msg(msg); + // TODO: Err(Error) REGISTRY.send_child(id, msg).map_err(|msg| msg.into_msg().unwrap()) } + + // TODO: Err(Error) + pub async fn recv(&self) -> Result, ()> { + loop { + // TODO: Err(Error) + let mut state = self.state.clone().lock_async().await.unwrap(); + + if let Some(msg) = state.msgs.pop_front() { + return Ok(msg); + } + + Guard::unlock(state); + + pending!(); + } + } + + pub async fn try_recv(&self) -> Option> { + // TODO: Err(Error) + let mut state = self.state.clone().lock_async().await.ok()?; + + state.msgs.pop_front() + } +} + +impl ContextState { + pub(super) fn new() -> Self { + let msgs = VecDeque::new(); + + ContextState { msgs } + } + + pub(super) fn push_msg(&mut self, msg: Box) { + self.msgs.push_back(msg) + } }