Skip to content

Commit

Permalink
Merge pull request #1723
Browse files Browse the repository at this point in the history
Remove the class monitorable_actor
  • Loading branch information
Neverlord committed Jan 20, 2024
2 parents 1183333 + 42b5420 commit 5125196
Show file tree
Hide file tree
Showing 18 changed files with 367 additions and 482 deletions.
1 change: 0 additions & 1 deletion libcaf_core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,6 @@ caf_add_component(
caf/message_builder.test.cpp
caf/message_handler.cpp
caf/message_id.test.cpp
caf/monitorable_actor.cpp
caf/mtl.test.cpp
caf/node_id.cpp
caf/policy/select_all.test.cpp
Expand Down
198 changes: 188 additions & 10 deletions libcaf_core/caf/abstract_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "caf/logger.hpp"
#include "caf/mailbox_element.hpp"
#include "caf/message.hpp"
#include "caf/scheduler/abstract_coordinator.hpp"
#include "caf/system_messages.hpp"

#include <atomic>
Expand All @@ -24,29 +25,94 @@

namespace caf {

// exit_state_ is guaranteed to be set to 0, i.e., exit_reason::not_exited,
// by std::atomic<> constructor
// -- constructors, destructors, and assignment operators ----------------------

actor_control_block* abstract_actor::ctrl() const {
return actor_control_block::from(this);
abstract_actor::abstract_actor(actor_config& cfg) : flags_(cfg.flags) {
// nop
}

abstract_actor::~abstract_actor() {
// nop
}

void abstract_actor::on_destroy() {
// nop
// -- attachables ------------------------------------------------------------

void abstract_actor::attach(attachable_ptr ptr) {
CAF_LOG_TRACE("");
CAF_ASSERT(ptr != nullptr);
error fail_state;
auto attached = exclusive_critical_section([&] {
if (getf(is_terminated_flag)) {
fail_state = fail_state_;
return false;
}
attach_impl(ptr);
return true;
});
if (!attached) {
CAF_LOG_DEBUG(
"cannot attach functor to terminated actor: call immediately");
ptr->actor_exited(fail_state, nullptr);
}
}

abstract_actor::abstract_actor(actor_config& cfg) : flags_(cfg.flags) {
// nop
size_t abstract_actor::detach(const attachable::token& what) {
CAF_LOG_TRACE("");
std::unique_lock<std::mutex> guard{mtx_};
return detach_impl(what);
}

actor_addr abstract_actor::address() const noexcept {
return actor_addr{actor_control_block::from(this)};
void abstract_actor::attach_impl(attachable_ptr& ptr) {
ptr->next.swap(attachables_head_);
attachables_head_.swap(ptr);
}

size_t abstract_actor::detach_impl(const attachable::token& what,
bool stop_on_hit, bool dry_run) {
CAF_LOG_TRACE(CAF_ARG(stop_on_hit) << CAF_ARG(dry_run));
size_t count = 0;
auto i = &attachables_head_;
while (*i != nullptr) {
if ((*i)->matches(what)) {
++count;
if (!dry_run) {
CAF_LOG_DEBUG("removed element");
attachable_ptr next;
next.swap((*i)->next);
(*i).swap(next);
} else {
i = &((*i)->next);
}
if (stop_on_hit)
return count;
} else {
i = &((*i)->next);
}
}
return count;
}

// -- linking ------------------------------------------------------------------

void abstract_actor::link_to(const actor_addr& other) {
CAF_LOG_TRACE(CAF_ARG(other));
link_to(actor_cast<strong_actor_ptr>(other));
}

void abstract_actor::unlink_from(const actor_addr& other) {
CAF_LOG_TRACE(CAF_ARG(other));
if (!other)
return;
if (auto hdl = actor_cast<strong_actor_ptr>(other)) {
unlink_from(hdl);
return;
}
default_attachable::observe_token tk{other, default_attachable::link};
exclusive_critical_section([&] { detach_impl(tk, true); });
}

// -- properties ---------------------------------------------------------------

std::set<std::string> abstract_actor::message_types() const {
// defaults to untyped
return std::set<std::string>{};
Expand All @@ -64,6 +130,56 @@ actor_system& abstract_actor::home_system() const noexcept {
return *(actor_control_block::from(this)->home_system);
}

actor_control_block* abstract_actor::ctrl() const {
return actor_control_block::from(this);
}

actor_addr abstract_actor::address() const noexcept {
return actor_addr{actor_control_block::from(this)};
}

// -- callbacks ----------------------------------------------------------------

void abstract_actor::on_destroy() {
// nop
}

void abstract_actor::on_cleanup(const error&) {
// nop
}

bool abstract_actor::cleanup(error&& reason, execution_unit* host) {
CAF_LOG_TRACE(CAF_ARG(reason));
attachable_ptr head;
bool set_fail_state = exclusive_critical_section([&]() -> bool {
if (!getf(is_cleaned_up_flag)) {
// local actors pass fail_state_ as first argument
if (&fail_state_ != &reason)
fail_state_ = std::move(reason);
attachables_head_.swap(head);
flags(flags() | is_terminated_flag | is_cleaned_up_flag);
on_cleanup(fail_state_);
return true;
}
return false;
});
if (!set_fail_state)
return false;
CAF_LOG_DEBUG("cleanup" << CAF_ARG(id()) << CAF_ARG(node())
<< CAF_ARG(fail_state_));
// send exit messages
for (attachable* i = head.get(); i != nullptr; i = i->next.get())
i->actor_exited(fail_state_, host);
// tell printer to purge its state for us if we ever used aout()
if (getf(abstract_actor::has_used_aout_flag)) {
auto pr = home_system().scheduler().printer();
pr->enqueue(make_mailbox_element(nullptr, make_message_id(), delete_atom_v,
id()),
nullptr);
}
return true;
}

mailbox_element* abstract_actor::peek_at_next_mailbox_element() {
return nullptr;
}
Expand All @@ -84,4 +200,66 @@ void abstract_actor::unregister_from_system() {
CAF_LOG_DEBUG("actor" << id() << "decreased running count to" << count);
}

void abstract_actor::add_link(abstract_actor* x) {
// Add backlink on `x` first and add the local attachable only on success.
CAF_LOG_TRACE(CAF_ARG(x));
CAF_ASSERT(x != nullptr);
error fail_state;
bool send_exit_immediately = false;
auto tmp = default_attachable::make_link(address(), x->address());
joined_exclusive_critical_section(this, x, [&] {
if (getf(is_terminated_flag)) {
fail_state = fail_state_;
send_exit_immediately = true;
} else if (x->add_backlink(this)) {
attach_impl(tmp);
}
});
if (send_exit_immediately) {
auto ptr = make_mailbox_element(nullptr, make_message_id(),
exit_msg{address(), fail_state});
x->enqueue(std::move(ptr), nullptr);
}
}

void abstract_actor::remove_link(abstract_actor* x) {
CAF_LOG_TRACE(CAF_ARG(x));
default_attachable::observe_token tk{x->address(), default_attachable::link};
joined_exclusive_critical_section(this, x, [&] {
x->remove_backlink(this);
detach_impl(tk, true);
});
}

bool abstract_actor::add_backlink(abstract_actor* x) {
// Called in an exclusive critical section.
CAF_LOG_TRACE(CAF_ARG(x));
CAF_ASSERT(x);
error fail_state;
bool send_exit_immediately = false;
default_attachable::observe_token tk{x->address(), default_attachable::link};
auto tmp = default_attachable::make_link(address(), x->address());
auto success = false;
if (getf(is_terminated_flag)) {
fail_state = fail_state_;
send_exit_immediately = true;
} else if (detach_impl(tk, true, true) == 0) {
attach_impl(tmp);
success = true;
}
if (send_exit_immediately) {
auto ptr = make_mailbox_element(nullptr, make_message_id(),
exit_msg{address(), fail_state});
x->enqueue(std::move(ptr), nullptr);
}
return success;
}

bool abstract_actor::remove_backlink(abstract_actor* x) {
// Called in an exclusive critical section.
CAF_LOG_TRACE(CAF_ARG(x));
default_attachable::observe_token tk{x->address(), default_attachable::link};
return detach_impl(tk, true) > 0;
}

} // namespace caf

0 comments on commit 5125196

Please sign in to comment.