Skip to content

Commit

Permalink
Send failures to all monitoring actors
Browse files Browse the repository at this point in the history
  • Loading branch information
gamazeps committed Feb 12, 2016
1 parent 97ad313 commit ffdc6bb
Showing 1 changed file with 17 additions and 8 deletions.
25 changes: 17 additions & 8 deletions src/actors/actor_cell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ pub trait ActorContext {
/// Lifecycle monitoring, list of monitored actors.
fn monitoring(&self) -> HashMap<Arc<ActorPath>, (ActorRef, FailureHandler)>;

/// Actors monitoring this actor.
fn monitored_by(&self) -> Vec<ActorRef>;

/// Logical path to the actor, such as `/user/foo/bar/baz`
fn path(&self) -> Arc<ActorPath>;

Expand All @@ -156,9 +159,6 @@ pub trait ActorContext {
///
/// The future will have the path: `$actor/$name_request`
fn identify_actor(&self, logical_path: String, request_name: String) -> ActorRef;

///// Monitors the given actor and will treat him with the given handler.
//fn monitor(&self, actor: ActorRef, handler: FailureHandler);
}

impl ActorContext for ActorCell {
Expand Down Expand Up @@ -299,6 +299,14 @@ impl ActorContext for ActorCell {
monitoring.clone()
}

fn monitored_by(&self) -> Vec<ActorRef> {
let inner = unwrap_inner!(self.inner_cell, {
panic!("Tried to get the children from the context of a no longer existing actor");
});
let monitored_by = inner.monitored_by.lock().unwrap();
monitored_by.clone()
}

fn path(&self) -> Arc<ActorPath> {
let inner = unwrap_inner!(self.inner_cell, {
panic!("Tried to get the path from the context of a no longer existing actor");
Expand Down Expand Up @@ -352,7 +360,9 @@ impl Drop for Failsafe {
fn drop(&mut self) {
if self.active {
*self.state.write().unwrap() = ActorState::Failed;
self.context.father().receive_system_message(SystemMessage::Failure(self.context.actor_ref()));
for actor in self.context.monitored_by().iter() {
actor.receive_system_message(SystemMessage::Failure(self.context.actor_ref()));
}
}
}
}
Expand Down Expand Up @@ -414,7 +424,7 @@ struct InnerActorCell {
children: Mutex<HashMap<Arc<ActorPath>, ActorRef>>,
monitoring: Mutex<HashMap<Arc<ActorPath>, (ActorRef, FailureHandler)>>,
actor_state: Arc<RwLock<ActorState>>,
_monitored: Mutex<Vec<ActorRef>>,
monitored_by: Mutex<Vec<ActorRef>>,
actor: RwLock<Arc<Actor>>,
}

Expand All @@ -438,7 +448,7 @@ impl InnerActorCell {
children: Mutex::new(HashMap::new()),
monitoring: Mutex::new(HashMap::new()),
actor_state: Arc::new(RwLock::new(ActorState::Unstarted)),
_monitored: Mutex::new(vec![father.clone()]),
monitored_by: Mutex::new(vec![father.clone()]),
}
}

Expand All @@ -460,8 +470,7 @@ impl InnerActorCell {
fn handle_envelope(&self, context: ActorCell) {
// Now we do not want users to be able to touch current_sender while the actor is busy.
let _lock = self.busy.lock();
let failsafe = Failsafe::new(context.clone(),
self.actor_state.clone());
let failsafe = Failsafe::new(context.clone(), self.actor_state.clone());
// System messages are handled first, so that we can restart an actor if he failed without
// loosing the messages in the mailbox.
// NOTE: This does not break the fact that messages sent by the same actor are treated in
Expand Down

0 comments on commit ffdc6bb

Please sign in to comment.