Skip to content

Commit

Permalink
Refactor brokers to use the mailbox, relates #343
Browse files Browse the repository at this point in the history
  • Loading branch information
Neverlord committed Sep 28, 2015
1 parent 4feb541 commit e11edaf
Show file tree
Hide file tree
Showing 24 changed files with 339 additions and 265 deletions.
2 changes: 1 addition & 1 deletion libcaf_core/caf/abstract_actor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ class abstract_actor : public abstract_channel {

/// Called by the runtime system to perform cleanup actions for this actor.
/// Subtypes should always call this member function when overriding it.
void cleanup(uint32_t reason);
virtual void cleanup(uint32_t reason);

/// Returns `exit_reason() != exit_reason::not_exited`.
inline bool exited() const {
Expand Down
2 changes: 1 addition & 1 deletion libcaf_core/caf/detail/single_reader_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ class single_reader_queue {
template <class F>
void close(const F& f) {
clear_cached_elements(f);
if (fetch_new_data(nullptr)) {
if (! blocked() && fetch_new_data(nullptr)) {
clear_cached_elements(f);
}
cache_.clear(f);
Expand Down
14 changes: 13 additions & 1 deletion libcaf_core/caf/local_actor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,12 @@ class local_actor : public abstract_actor, public resumable {

/// @cond PRIVATE

// handle `ptr` in an event-based actor
resumable::resume_result exec_event(mailbox_element_ptr& ptr);

// handle `ptr` in an event-based actor, not suitable to be called in a loop
void exec_single_event(mailbox_element_ptr& ptr);

local_actor();

template <class ActorHandle>
Expand Down Expand Up @@ -563,7 +569,11 @@ class local_actor : public abstract_actor, public resumable {

virtual void initialize() = 0;

void cleanup(uint32_t reason);
// clear behavior stack and call cleanup if actor either has no
// valid behavior left or has set a planned exit reason
bool finalize();

void cleanup(uint32_t reason) override;

// an actor can have multiple pending timeouts, but only
// the latest one is active (i.e. the pending_timeouts_.back())
Expand All @@ -584,6 +594,8 @@ class local_actor : public abstract_actor, public resumable {
behavior& fun,
message_id awaited_response);

//invoke_message_result invoke_message(mailbox_element_ptr& node);

using pending_response = std::pair<message_id, behavior>;

message_id new_request_id(message_priority mp);
Expand Down
16 changes: 12 additions & 4 deletions libcaf_core/caf/scheduler/abstract_coordinator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ class abstract_coordinator {
public:
friend class detail::singletons;

explicit abstract_coordinator(size_t num_worker_threads);
explicit abstract_coordinator(size_t num_worker_threads,
size_t max_throughput_param);

virtual ~abstract_coordinator();

Expand All @@ -64,6 +65,10 @@ class abstract_coordinator {
nullptr);
}

inline size_t max_throughput() const {
return max_throughput_;
}

inline size_t num_workers() const {
return num_workers_;
}
Expand All @@ -84,13 +89,16 @@ class abstract_coordinator {
delete this;
}

actor timer_;
actor printer_;

// ID of the worker receiving the next enqueue
std::atomic<size_t> next_worker_;

// number of messages each actor is allowed to consume per resume
size_t max_throughput_;

size_t num_workers_;

actor timer_;
actor printer_;
};

} // namespace scheduler
Expand Down
5 changes: 1 addition & 4 deletions libcaf_core/caf/scheduler/coordinator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ class coordinator : public abstract_coordinator {

coordinator(size_t nw = std::max(std::thread::hardware_concurrency(), 4u),
size_t mt = std::numeric_limits<size_t>::max())
: super(nw),
max_throughput_(mt) {
: super(nw, mt) {
// nop
}

Expand Down Expand Up @@ -145,8 +144,6 @@ class coordinator : public abstract_coordinator {
policy_data data_;
// instance of our policy object
Policy policy_;
// number of messages each actor is allowed to consume per resume
size_t max_throughput_;
};

} // namespace scheduler
Expand Down
3 changes: 3 additions & 0 deletions libcaf_core/caf/stateful_actor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#include <type_traits>

#include "caf/fwd.hpp"

#include "caf/detail/logging.hpp"
#include "caf/detail/type_traits.hpp"

namespace caf {
Expand Down Expand Up @@ -60,6 +62,7 @@ class stateful_actor : public Base {

/// Destroys the state of this actor (no further overriding allowed).
void on_exit() override final {
CAF_LOG_TRACE("");
state_.~State();
}

Expand Down
3 changes: 2 additions & 1 deletion libcaf_core/src/abstract_coordinator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -345,8 +345,9 @@ void abstract_coordinator::stop_actors() {
);
}

abstract_coordinator::abstract_coordinator(size_t nw)
abstract_coordinator::abstract_coordinator(size_t nw, size_t mt)
: next_worker_(0),
max_throughput_(mt),
num_workers_(nw) {
// nop
}
Expand Down
156 changes: 103 additions & 53 deletions libcaf_core/src/local_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,17 @@ invoke_message_result local_actor::invoke_message(mailbox_element_ptr& ptr,
CAF_CRITICAL("invalid message type");
}

/*
invoke_message_result local_actor::invoke_message(mailbox_element_ptr& node) {
if (! awaits_response() && bhvr_stack().empty())
return im_dropped;
auto& bhvr = awaits_response() ? awaited_response_handler()
: bhvr_stack().back();
auto mid = awaited_response_id();
return invoke_message(node, bhvr, mid);
}
*/

struct pending_response_predicate {
public:
explicit pending_response_predicate(message_id mid) : mid_(mid) {
Expand Down Expand Up @@ -682,33 +693,21 @@ resumable::resume_result local_actor::resume(execution_unit* eu,
}
// actor is cooperatively scheduled
host(eu);
auto actor_done = [&]() -> bool {
if (! has_behavior() || planned_exit_reason() != exit_reason::not_exited) {
CAF_LOG_DEBUG("actor either has no behavior or has set an exit reason");
on_exit();
bhvr_stack().clear();
bhvr_stack().cleanup();
auto rsn = planned_exit_reason();
if (rsn == exit_reason::not_exited) {
rsn = exit_reason::normal;
planned_exit_reason(rsn);
}
cleanup(rsn);
return true;
}
return false;
};
// actors without behavior or that have already defined
// an exit reason must not be resumed
CAF_ASSERT(! is_initialized()
|| (has_behavior()
&& planned_exit_reason() == exit_reason::not_exited));
if (is_initialized()
&& (! has_behavior()
|| planned_exit_reason() != exit_reason::not_exited)) {
CAF_LOG_DEBUG_IF(! has_behavior(),
"resume called on an actor without behavior");
CAF_LOG_DEBUG_IF(planned_exit_reason() != exit_reason::not_exited,
"resume called on an actor with exit reason");
return resumable::done;
}
std::exception_ptr eptr = nullptr;
try {
if (! is_initialized()) {
CAF_LOG_DEBUG("initialize actor");
initialize();
if (actor_done()) {
if (finalize()) {
CAF_LOG_DEBUG("actor_done() returned true right "
<< "after make_behavior()");
return resumable::resume_result::done;
Expand All @@ -723,36 +722,9 @@ resumable::resume_result local_actor::resume(execution_unit* eu,
for (size_t i = 0; i < max_throughput; ++i) {
auto ptr = next_message();
if (ptr) {
auto& bhvr = awaits_response()
? awaited_response_handler()
: bhvr_stack().back();
auto mid = awaited_response_id();
switch (invoke_message(ptr, bhvr, mid)) {
case im_success:
bhvr_stack().cleanup();
++handled_msgs;
if (actor_done()) {
CAF_LOG_DEBUG("actor exited");
return resumable::resume_result::done;
}
// continue from cache if current message was
// handled, because the actor might have changed
// its behavior to match 'old' messages now
while (invoke_from_cache()) {
if (actor_done()) {
CAF_LOG_DEBUG("actor exited");
return resumable::resume_result::done;
}
}
break;
case im_skipped:
CAF_ASSERT(ptr != nullptr);
push_to_cache(std::move(ptr));
break;
case im_dropped:
// destroy msg
break;
}
if (exec_event(ptr) == resumable::resume_result::done)
return resumable::resume_result::done;
++handled_msgs;
} else {
CAF_LOG_DEBUG("no more element in mailbox; going to block");
reset_timeout_if_needed();
Expand Down Expand Up @@ -797,13 +769,75 @@ resumable::resume_result local_actor::resume(execution_unit* eu,
planned_exit_reason(*opt_reason);
}
}
if (! actor_done()) {
if (! finalize()) {
// actor has been "revived", try running it again later
return resumable::resume_later;
}
return resumable::done;
}

resumable::resume_result local_actor::exec_event(mailbox_element_ptr& ptr) {
auto& bhvr = awaits_response() ? awaited_response_handler()
: bhvr_stack().back();
auto mid = awaited_response_id();
switch (invoke_message(ptr, bhvr, mid)) {
case im_success:
bhvr_stack().cleanup();
if (finalize()) {
CAF_LOG_DEBUG("actor exited");
return resumable::resume_result::done;
}
// continue from cache if current message was
// handled, because the actor might have changed
// its behavior to match 'old' messages now
while (invoke_from_cache()) {
if (finalize()) {
CAF_LOG_DEBUG("actor exited");
return resumable::resume_result::done;
}
}
break;
case im_skipped:
CAF_ASSERT(ptr != nullptr);
push_to_cache(std::move(ptr));
break;
case im_dropped:
// destroy msg
break;
}
return resumable::resume_result::resume_later;
}

void local_actor::exec_single_event(mailbox_element_ptr& ptr) {
if (! is_initialized()) {
CAF_LOG_DEBUG("initialize actor");
initialize();
if (finalize()) {
CAF_LOG_DEBUG("actor_done() returned true right "
<< "after make_behavior()");
return;
}
}
if (! has_behavior() || planned_exit_reason() != exit_reason::not_exited) {
CAF_LOG_DEBUG_IF(! has_behavior(),
"resume called on an actor without behavior");
CAF_LOG_DEBUG_IF(planned_exit_reason() != exit_reason::not_exited,
"resume called on an actor with exit reason");
return;
}
try {
exec_event(ptr);
}
catch (...) {
CAF_LOG_INFO("broker died because of an exception");
auto eptr = std::current_exception();
auto opt_reason = this->handle(eptr);
if (opt_reason)
planned_exit_reason(*opt_reason);
finalize();
}
}

mailbox_element_ptr local_actor::next_message() {
if (! is_priority_aware()) {
return mailbox_element_ptr{mailbox().try_pop()};
Expand Down Expand Up @@ -934,6 +968,22 @@ behavior& local_actor::get_behavior() {
: pending_responses_.front().second;
}

bool local_actor::finalize() {
if (has_behavior() && planned_exit_reason() == exit_reason::not_exited)
return false;
CAF_LOG_DEBUG("actor either has no behavior or has set an exit reason");
on_exit();
bhvr_stack().clear();
bhvr_stack().cleanup();
auto rsn = planned_exit_reason();
if (rsn == exit_reason::not_exited) {
rsn = exit_reason::normal;
planned_exit_reason(rsn);
}
cleanup(rsn);
return true;
}

void local_actor::cleanup(uint32_t reason) {
CAF_LOG_TRACE(CAF_ARG(reason));
current_mailbox_element().reset();
Expand Down
Loading

0 comments on commit e11edaf

Please sign in to comment.