Skip to content

Commit

Permalink
Added logs to System.
Browse files Browse the repository at this point in the history
  • Loading branch information
r3v2d0g committed Nov 12, 2019
1 parent 5b66bff commit ef4bccf
Showing 1 changed file with 47 additions and 17 deletions.
64 changes: 47 additions & 17 deletions bastion/src/system.rs
Expand Up @@ -33,6 +33,7 @@ pub(crate) struct System {

impl System {
fn init() -> Sender {
info!("System: Initializing.");
let parent = Parent::none();
let bcast = Broadcast::with_id(parent, NIL_ID);
let launched = FxHashMap::default();
Expand All @@ -52,6 +53,7 @@ impl System {
started,
};

debug!("System: Creating the system supervisor.");
let parent = Parent::system();
let bcast = Broadcast::with_id(parent, NIL_ID);

Expand All @@ -61,11 +63,11 @@ impl System {
let msg = BastionMessage::deploy_supervisor(supervisor);
system.bcast.send_self(msg);

// FIXME: with_id
let stack = ProcStack::default();
debug!("System: Launching.");
let stack = system.stack();
let handle = pool::spawn(system.run(), stack);

// FIXME: pancis?
// FIXME: panics?
let mut system = SYSTEM.clone().lock().wait().unwrap();
*system = Some(handle);

Expand All @@ -75,12 +77,18 @@ impl System {
sender
}

fn stack(&self) -> ProcStack {
// FIXME: with_id
ProcStack::default()
}

pub(crate) fn root_supervisor() -> Option<&'static SupervisorRef> {
unsafe { ROOT_SPV.as_ref() }
}

// TODO: set a limit?
async fn recover(&mut self, mut supervisor: Supervisor) {
warn!("System: Recovering Supervisor({}).", supervisor.id());
supervisor.callbacks().before_restart();

let parent = Parent::system();
Expand All @@ -95,6 +103,7 @@ impl System {

self.bcast.register(supervisor.bcast());

info!("System: Launching Supervisor({}).", supervisor.id());
let id = supervisor.id().clone();
let launched = supervisor.launch();
self.launched.insert(id, launched);
Expand All @@ -110,15 +119,20 @@ impl System {
let mut supervisors = Vec::new();
loop {
match poll!(&mut self.waiting.next()) {
Poll::Ready(Some(Some(supervisor))) => supervisors.push(supervisor),
Poll::Ready(Some(None)) => (),
Poll::Ready(Some(Some(supervisor))) => {
debug!("System: Supervisor({}) stopped.", supervisor.id());
supervisors.push(supervisor);
}
Poll::Ready(Some(None)) => {
error!("System: Unknown supervisor cancelled instead of stopped.");
},
Poll::Ready(None) => return supervisors,
Poll::Pending => pending!(),
}
}
}

async fn kill(&mut self) -> Vec<Supervisor> {
async fn kill(&mut self) {
self.bcast.kill_children();

for launched in self.waiting.iter_mut() {
Expand All @@ -131,12 +145,15 @@ impl System {
self.waiting.push(launched);
}

let mut supervisors = Vec::new();
loop {
match poll!(&mut self.waiting.next()) {
Poll::Ready(Some(Some(supervisor))) => supervisors.push(supervisor),
Poll::Ready(Some(None)) => (),
Poll::Ready(None) => return supervisors,
Poll::Ready(Some(Some(supervisor))) => {
debug!("System: Supervisor({}) killed.", supervisor.id());
}
Poll::Ready(Some(None)) => {
debug!("System: Unknown Supervisor killed.");
}
Poll::Ready(None) => return,
Poll::Pending => pending!(),
}
}
Expand All @@ -146,23 +163,22 @@ impl System {
match msg {
BastionMessage::Start => unreachable!(),
BastionMessage::Stop => {
self.started = false;

info!("System: Stopping.");
for supervisor in self.stop().await {
supervisor.callbacks().after_stop();
}

return Err(());
}
BastionMessage::Kill => {
self.started = false;

info!("System: Killing.");
self.kill().await;

return Err(());
}
BastionMessage::Deploy(deployment) => match deployment {
Deployment::Supervisor(supervisor) => {
debug!("System: Deploying Supervisor({}).", supervisor.id());
supervisor.callbacks().before_start();

self.bcast.register(supervisor.bcast());
Expand All @@ -171,6 +187,7 @@ impl System {
self.bcast.send_child(supervisor.id(), msg);
}

info!("System: Launching Supervisor({}).", supervisor.id());
let id = supervisor.id().clone();
let launched = supervisor.launch();
self.launched.insert(id, launched);
Expand All @@ -189,17 +206,22 @@ impl System {
}
// FIXME
BastionMessage::SuperviseWith(_) => unimplemented!(),
BastionMessage::Message { .. } => self.bcast.send_children(msg),
BastionMessage::Message(ref message) => {
debug!("System: Broadcasting a message: {:?}", message);
self.bcast.send_children(msg);
}
BastionMessage::Stopped { id } => {
// TODO: Err if None?
if let Some(launched) = self.launched.remove(&id) {
info!("System: Supervisor({}) stopped.", id);
self.waiting.push(launched);
self.restart.remove(&id);
}
}
BastionMessage::Faulted { id } => {
// TODO: Err if None?
if let Some(launched) = self.launched.remove(&id) {
warn!("System: Supervisor({}) faulted.", id);
self.waiting.push(launched);
self.restart.insert(id);
}
Expand All @@ -210,6 +232,7 @@ impl System {
}

async fn run(mut self) {
info!("System: Launched.");
loop {
match poll!(&mut self.waiting.next()) {
Poll::Ready(Some(Some(supervisor))) => {
Expand All @@ -232,6 +255,8 @@ impl System {
match poll!(&mut self.bcast.next()) {
// TODO: Err if started == true?
Poll::Ready(Some(BastionMessage::Start)) => {
trace!("System: Received a new message (started=false): {:?}", BastionMessage::Start);
info!("System: Starting.");
self.started = true;

let msg = BastionMessage::start();
Expand All @@ -240,7 +265,9 @@ impl System {
let msgs = self.pre_start_msgs.drain(..).collect::<Vec<_>>();
self.pre_start_msgs.shrink_to_fit();

debug!("System: Replaying messages received before starting.");
for msg in msgs {
trace!("System: Replaying message: {:?}", msg);
// FIXME: Err(Error)?
if self.handle(msg).await.is_err() {
// FIXME: panics?
Expand All @@ -251,9 +278,12 @@ impl System {
}
}
}
Poll::Ready(Some(msg)) if !self.started => self.pre_start_msgs.push(msg),
Poll::Ready(Some(msg)) if !self.started => {
trace!("System: Received a new message (started=false): {:?}", msg);
self.pre_start_msgs.push(msg);
}
Poll::Ready(Some(msg)) => {
// FIXME: Err(Error)?
trace!("System: Received a new message (started=true): {:?}", msg);
if self.handle(msg).await.is_err() {
// FIXME: panics?
let mut system = SYSTEM.clone().lock_async().await.unwrap();
Expand Down

0 comments on commit ef4bccf

Please sign in to comment.