Permalink
Browse files

refactored MONITOR and KILL_PROXY message handling; fixes #61

  • Loading branch information...
1 parent b519309 commit 809a198929ddf1059be6d58467f32129b03ca9f3 @Neverlord Neverlord committed Aug 22, 2012
Showing with 51 additions and 19 deletions.
  1. +51 −19 src/middleman.cpp
View
@@ -52,6 +52,7 @@
#include "cppa/util/output_stream.hpp"
#include "cppa/detail/middleman.hpp"
+#include "cppa/detail/actor_registry.hpp"
#include "cppa/detail/addressed_message.hpp"
#include "cppa/detail/actor_proxy_cache.hpp"
@@ -418,44 +419,75 @@ bool peer_connection::continue_reading() {
auto& content = msg.content();
DEBUG("<-- " << to_string(msg));
match(content) (
- on(atom("MONITOR")) >> [&]() {
- auto receiver = msg.receiver().downcast<actor>();
- // an empty receiver usually means the actor
- // has finished execution and is therefore no longer
- // available in the actor registry
- //CPPA_REQUIRE(receiver.get() != nullptr);
- if (!receiver) {
- DEBUG("MONITOR received for an empty receiver");
+ // monitor messages are sent automatically whenever
+ // actor_proxy_cache creates a new proxy
+ // note: aid is the *original* actor id
+ on(atom("MONITOR"), arg_match) >> [&](const process_information_ptr& peer, actor_id aid) {
+ if (!peer) {
+ DEBUG("MONITOR received from invalid peer");
+ return;
+ }
+ auto ar = singleton_manager::get_actor_registry();
+ auto reg_entry = ar->get_entry(aid);
+ auto pself = parent()->pself();
+ auto send_kp = [=](uint32_t reason) {
+ middleman_enqueue(peer,
+ nullptr,
+ nullptr,
+ make_any_tuple(
+ atom("KILL_PROXY"),
+ pself,
+ aid,
+ reason
+ ));
+ };
+ if (reg_entry.first == nullptr) {
+ if (reg_entry.second == exit_reason::not_exited) {
+ // invalid entry
+ DEBUG("MONITOR for an unknown actor received");
+ }
+ else {
+ // this actor already finished execution;
+ // reply with KILL_PROXY message
+ send_kp(reg_entry.second);
+ }
+ }
+ else {
+ reg_entry.first->attach_functor(send_kp);
}
- else if (receiver->parent_process() == *process_information::get()) {
- auto mpeer = m_peer;
- // this message was send from a proxy
- receiver->attach_functor([mpeer, receiver](uint32_t reason) {
- addressed_message kmsg{receiver, receiver, make_any_tuple(atom("KILL_PROXY"), reason)};
- middleman_enqueue(mpeer, kmsg);
- });
+ },
+ on(atom("KILL_PROXY"), arg_match) >> [&](const process_information_ptr& peer, actor_id aid, std::uint32_t reason) {
+ auto& cache = get_actor_proxy_cache();
+ auto proxy = cache.get(aid,
+ peer->process_id(),
+ peer->node_id());
+ if (proxy) {
+ proxy->enqueue(nullptr,
+ make_any_tuple(
+ atom("KILL_PROXY"), reason));
}
else {
- DEBUG("MONITOR received for a remote actor");
+ DEBUG("received KILL_PROXY message but didn't "
+ "found matching instance in cache");
}
},
- on(atom("LINK"), arg_match) >> [&](actor_ptr ptr) {
+ on(atom("LINK"), arg_match) >> [&](const actor_ptr& ptr) {
if (msg.sender()->is_proxy() == false) {
DEBUG("msg.sender() is not a proxy");
return;
}
auto whom = msg.sender().downcast<actor_proxy>();
if ((whom) && (ptr)) whom->local_link_to(ptr);
},
- on(atom("UNLINK"), arg_match) >> [](actor_ptr ptr) {
+ on(atom("UNLINK"), arg_match) >> [](const actor_ptr& ptr) {
if (ptr->is_proxy() == false) {
DEBUG("msg.sender() is not a proxy");
return;
}
auto whom = ptr.downcast<actor_proxy>();
if ((whom) && (ptr)) whom->local_unlink_from(ptr);
},
- others() >> [&]() {
+ others() >> [&] {
auto receiver = msg.receiver().get();
if (receiver) {
if (msg.id().valid()) {

0 comments on commit 809a198

Please sign in to comment.