Skip to content

Commit

Permalink
Added a ContextState struct to store a Child's messages and updated C…
Browse files Browse the repository at this point in the history
…hild to store received message in it and Context to allow to get messages from it
  • Loading branch information
r3v2d0g committed Oct 17, 2019
1 parent 4278240 commit 57a4ff6
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 37 deletions.
76 changes: 41 additions & 35 deletions bastion/src/children.rs
@@ -1,10 +1,11 @@
use crate::bastion::REGISTRY;
use crate::broadcast::{BastionMessage, Broadcast, Sender};
use crate::context::{BastionContext, BastionId};
use crate::context::{BastionContext, BastionId, ContextState};
use futures::future::CatchUnwind;
use futures::pending;
use futures::poll;
use futures::prelude::*;
use qutex::Qutex;
use runtime::task::JoinHandle;
use std::any::Any;
use std::fmt::Debug;
Expand Down Expand Up @@ -45,6 +46,7 @@ pub(super) struct Children {
pub(super) struct Child {
exec: CatchUnwind<Pin<Box<dyn Fut>>>,
bcast: Broadcast,
state: Qutex<ContextState>,
}

impl Children {
Expand Down Expand Up @@ -115,16 +117,19 @@ impl Children {
let bcast = self.bcast.new_child();
let id = bcast.id().clone();

let state = ContextState::new();
let state = Qutex::new(state);

let thunk = objekt::clone_box(&*self.thunk);
let msg = objekt::clone_box(&*self.msg);

let parent = self.bcast.sender().clone();
let ctx = BastionContext::new(id, parent);
let ctx = BastionContext::new(id, parent, state.clone());

let exec = thunk(ctx, msg)
.catch_unwind();

let child = Child { exec, bcast };
let child = Child { exec, bcast, state };
runtime::spawn(child.run());
}

Expand All @@ -141,51 +146,52 @@ impl Child {
self.bcast.sender()
}

async fn run(mut self) {
REGISTRY.add_child(&self);
fn dead(mut self) {
REGISTRY.remove_child(&self);

loop {
if let Poll::Ready(res) = poll!(&mut self.exec) {
REGISTRY.remove_child(&self);
self.bcast.dead();
}

match res {
Ok(Ok(())) => self.bcast.dead(),
Ok(Err(())) | Err(_) => self.bcast.faulted(),
}
fn faulted(mut self) {
REGISTRY.remove_child(&self);

return;
}
self.bcast.faulted();
}

async fn run(mut self) {
REGISTRY.add_child(&self);

loop {
match poll!(&mut self.bcast.next()) {
Poll::Ready(Some(msg)) => {
match msg {
BastionMessage::PoisonPill => {
REGISTRY.remove_child(&self);

self.bcast.dead();

return;
}
BastionMessage::Dead { .. } | BastionMessage::Faulted { .. } => {
REGISTRY.remove_child(&self);

self.bcast.faulted();
// FIXME: Err(Error)
let mut state = self.state.clone().lock_async().await.unwrap();

return;
}
match msg {
BastionMessage::PoisonPill => return self.dead(),
// FIXME
BastionMessage::Message(_) => unimplemented!(),
BastionMessage::Dead { .. } => unimplemented!(),
// FIXME
BastionMessage::Faulted { .. } => unimplemented!(),
BastionMessage::Message(msg) => {
state.push_msg(msg);

continue;
},
}
}
Poll::Ready(None) => {
REGISTRY.remove_child(&self);

self.bcast.faulted();
Poll::Ready(None) => return self.faulted(),
Poll::Pending => (),
}

return;
if let Poll::Ready(res) = poll!(&mut self.exec) {
match res {
Ok(Ok(())) => return self.dead(),
Ok(Err(())) | Err(_) => return self.faulted(),
}
Poll::Pending => pending!(),
}

pending!();
}
}
}
48 changes: 46 additions & 2 deletions bastion/src/context.rs
@@ -1,6 +1,9 @@
use crate::bastion::REGISTRY;
use crate::broadcast::{BastionMessage, Sender};
use crate::children::Message;
use futures::pending;
use qutex::{Guard, Qutex};
use std::collections::VecDeque;
use uuid::Uuid;

#[derive(Hash, Eq, PartialEq, Debug, Clone)]
Expand All @@ -9,6 +12,11 @@ pub struct BastionId(Uuid);
pub struct BastionContext {
id: BastionId,
parent: Sender,
state: Qutex<ContextState>,
}

pub(super) struct ContextState {
msgs: VecDeque<Box<dyn Message>>,
}

impl BastionId {
Expand All @@ -20,8 +28,8 @@ impl BastionId {
}

impl BastionContext {
pub(super) fn new(id: BastionId, parent: Sender) -> Self {
BastionContext { id, parent }
pub(super) fn new(id: BastionId, parent: Sender, state: Qutex<ContextState>) -> Self {
BastionContext { id, parent, state }
}

pub fn id(&self) -> &BastionId {
Expand All @@ -31,6 +39,42 @@ impl BastionContext {
pub fn send_msg(&self, id: &BastionId, msg: Box<dyn Message>) -> Result<(), Box<dyn Message>> {
let msg = BastionMessage::msg(msg);

// TODO: Err(Error)
REGISTRY.send_child(id, msg).map_err(|msg| msg.into_msg().unwrap())
}

// TODO: Err(Error)
pub async fn recv(&self) -> Result<Box<dyn Message>, ()> {
loop {
// TODO: Err(Error)
let mut state = self.state.clone().lock_async().await.unwrap();

if let Some(msg) = state.msgs.pop_front() {
return Ok(msg);
}

Guard::unlock(state);

pending!();
}
}

pub async fn try_recv(&self) -> Option<Box<dyn Message>> {
// TODO: Err(Error)
let mut state = self.state.clone().lock_async().await.ok()?;

state.msgs.pop_front()
}
}

impl ContextState {
pub(super) fn new() -> Self {
let msgs = VecDeque::new();

ContextState { msgs }
}

pub(super) fn push_msg(&mut self, msg: Box<dyn Message>) {
self.msgs.push_back(msg)
}
}

0 comments on commit 57a4ff6

Please sign in to comment.