Skip to content

Commit

Permalink
properly spawn new actors in same worker as parent
Browse files Browse the repository at this point in the history
  • Loading branch information
Neverlord committed Mar 14, 2014
1 parent 1ad4a17 commit 4126af4
Show file tree
Hide file tree
Showing 11 changed files with 189 additions and 116 deletions.
8 changes: 4 additions & 4 deletions cppa/detail/proper_actor.hpp
Expand Up @@ -188,15 +188,15 @@ class proper_actor : public proper_actor_base<Base,
template <typename... Ts>
proper_actor(Ts&&... args) : super(std::forward<Ts>(args)...) { }

inline void launch(bool is_hidden) {
inline void launch(bool is_hidden, execution_unit* host) {
CPPA_LOG_TRACE("");
this->hidden(is_hidden);
auto bhvr = this->make_behavior();
if (bhvr) this->become(std::move(bhvr));
CPPA_LOG_WARNING_IF(this->bhvr_stack().empty(),
"actor did not set a behavior");
if (!this->bhvr_stack().empty()) {
this->scheduling_policy().launch(this);
this->scheduling_policy().launch(this, host);
}
}

Expand Down Expand Up @@ -256,9 +256,9 @@ class proper_actor<Base, Policies,true> : public proper_actor_base<Base,
this->resume_policy().await_ready(this);
}

inline void launch(bool is_hidden) {
inline void launch(bool is_hidden, execution_unit* host) {
this->hidden(is_hidden);
this->scheduling_policy().launch(this);
this->scheduling_policy().launch(this, host);
}

// implement blocking_actor::dequeue_response
Expand Down
49 changes: 27 additions & 22 deletions cppa/local_actor.hpp
Expand Up @@ -52,7 +52,6 @@
#include "cppa/message_header.hpp"
#include "cppa/abstract_actor.hpp"
#include "cppa/abstract_group.hpp"
#include "cppa/execution_unit.hpp"
#include "cppa/mailbox_element.hpp"
#include "cppa/response_promise.hpp"
#include "cppa/message_priority.hpp"
Expand All @@ -68,7 +67,6 @@
namespace cppa {

// forward declarations
class execution_unit;
class sync_handle_helper;

/**
Expand All @@ -94,35 +92,50 @@ class local_actor : public extend<abstract_actor>::with<memory_cached> {
template<class C, spawn_options Os = no_spawn_options, typename... Ts>
actor spawn(Ts&&... args) {
constexpr auto os = make_unbound(Os);
auto res = cppa::spawn<C, os>(std::forward<Ts>(args)...);
auto res = spawn_class<C, os>(m_host, empty_before_launch_callback{},
std::forward<Ts>(args)...);
return eval_opts(Os, std::move(res));
}

template<spawn_options Os = no_spawn_options, typename... Ts>
actor spawn(Ts&&... args) {
constexpr auto os = make_unbound(Os);
auto res = cppa::spawn<os>(std::forward<Ts>(args)...);
auto res = spawn_functor<os>(m_host, empty_before_launch_callback{},
std::forward<Ts>(args)...);
return eval_opts(Os, std::move(res));
}

template<spawn_options Os = no_spawn_options, typename... Ts>
template<class C, spawn_options Os, typename... Ts>
actor spawn_in_group(const group& grp, Ts&&... args) {
constexpr auto os = make_unbound(Os);
auto res = cppa::spawn_in_group<os>(grp, std::forward<Ts>(args)...);
auto res = spawn_class<C, os>(m_host, group_subscriber{grp},
std::forward<Ts>(args)...);
return eval_opts(Os, std::move(res));
}

template<class C, spawn_options Os, typename... Ts>
template<spawn_options Os = no_spawn_options, typename... Ts>
actor spawn_in_group(const group& grp, Ts&&... args) {
constexpr auto os = make_unbound(Os);
auto res = cppa::spawn_in_group<C, os>(grp, std::forward<Ts>(args)...);
auto res = spawn_functor<os>(m_host, group_subscriber{grp},
std::forward<Ts>(args)...);
return eval_opts(Os, std::move(res));
}

/**************************************************************************
* spawn typed actors *
**************************************************************************/

template<class C, spawn_options Os = no_spawn_options, typename... Ts>
typename detail::actor_handle_from_signature_list<
typename C::signatures
>::type
spawn_typed(Ts&&... args) {
constexpr auto os = make_unbound(Os);
auto res = spawn_class<C, os>(m_host, empty_before_launch_callback{},
std::forward<Ts>(args)...);
return eval_opts(Os, std::move(res));
}

template<spawn_options Os = no_spawn_options, typename F, typename... Ts>
typename detail::infer_typed_actor_handle<
typename util::get_callable_trait<F>::result_type,
Expand All @@ -132,18 +145,10 @@ class local_actor : public extend<abstract_actor>::with<memory_cached> {
>::type
spawn_typed(F fun, Ts&&... args) {
constexpr auto os = make_unbound(Os);
auto res = cppa::spawn_typed<os>(std::move(fun),
std::forward<Ts>(args)...);
return eval_opts(Os, std::move(res));
}

template<class C, spawn_options Os = no_spawn_options, typename... Ts>
typename detail::actor_handle_from_signature_list<
typename C::signatures
>::type
spawn_typed(Ts&&... args) {
constexpr auto os = make_unbound(Os);
auto res = cppa::spawn_typed<C, os>(std::forward<Ts>(args)...);
auto res = cppa::spawn_typed_functor<os>(m_host,
empty_before_launch_callback{},
std::move(fun),
std::forward<Ts>(args)...);
return eval_opts(Os, std::move(res));
}

Expand Down Expand Up @@ -472,10 +477,10 @@ class local_actor : public extend<abstract_actor>::with<memory_cached> {
template<class ActorHandle>
inline ActorHandle eval_opts(spawn_options opts, ActorHandle res) {
if (has_monitor_flag(opts)) {
monitor(res.address());
monitor(res->address());
}
if (has_link_flag(opts)) {
link_to(res.address());
link_to(res->address());
}
return res;
}
Expand Down
6 changes: 5 additions & 1 deletion cppa/logging.hpp
Expand Up @@ -70,7 +70,7 @@ class logging {

// associates given actor id with this thread,
// returns the previously set actor id
virtual actor_id set_aid(actor_id aid) = 0;
actor_id set_aid(actor_id aid);

virtual void log(const char* level,
const char* class_name,
Expand Down Expand Up @@ -176,6 +176,7 @@ oss_wr operator<<(oss_wr&& lhs, T rhs) {
# define CPPA_LOG_IMPL(lvlname, classname, funname, message) \
CPPA_PRINT_ERROR_IMPL(lvlname, classname, funname, message)
# define CPPA_PUSH_AID(unused) static_cast<void>(0)
# define CPPA_PUSH_AID_FROM_PTR(unused) static_cast<void>(0)
# define CPPA_SET_AID(unused) cppa_set_aid_dummy()
# define CPPA_LOG_LEVEL 1
#else
Expand All @@ -190,6 +191,9 @@ oss_wr operator<<(oss_wr&& lhs, T rhs) {
auto aid_pop_guard = ::cppa::util::make_scope_guard([=] { \
::cppa::get_logger()->set_aid(prev_aid_in_scope); \
})
# define CPPA_PUSH_AID_FROM_PTR(some_ptr) \
auto aid_ptr_argument = some_ptr; \
CPPA_PUSH_AID(aid_ptr_argument ? aid_ptr_argument->id() : 0)
# define CPPA_SET_AID(aid_arg) \
::cppa::get_logger()->set_aid(aid_arg)
#endif
Expand Down
4 changes: 2 additions & 2 deletions cppa/policy/context_switching_resume.hpp
Expand Up @@ -80,8 +80,8 @@ class context_switching_resume {

resumable::resume_result resume(detail::cs_thread* from,
execution_unit* host) override {
CPPA_REQUIRE(from != nullptr);
CPPA_PUSH_AID(this->id());
CPPA_REQUIRE(from != nullptr && host != nullptr);
CPPA_LOG_TRACE("");
this->m_host = host;
using namespace detail;
for (;;) {
Expand Down
5 changes: 3 additions & 2 deletions cppa/policy/cooperative_scheduling.hpp
Expand Up @@ -82,10 +82,11 @@ class cooperative_scheduling {
}

template<class Actor>
inline void launch(Actor* self) {
inline void launch(Actor* self, execution_unit* host) {
// detached in scheduler::worker::run
self->attach_to_scheduler();
get_scheduling_coordinator()->enqueue(self);
if (host) host->exec_later(self);
else get_scheduling_coordinator()->enqueue(self);
}

template<class Actor>
Expand Down
3 changes: 2 additions & 1 deletion cppa/policy/event_based_resume.hpp
Expand Up @@ -39,6 +39,7 @@

#include "cppa/config.hpp"
#include "cppa/extend.hpp"
#include "cppa/logging.hpp"
#include "cppa/behavior.hpp"
#include "cppa/scheduler.hpp"
#include "cppa/actor_state.hpp"
Expand Down Expand Up @@ -74,13 +75,13 @@ class event_based_resume {

resumable::resume_result resume(detail::cs_thread*,
execution_unit* host) override {
CPPA_REQUIRE(host != nullptr);
auto d = dptr();
d->m_host = host;
CPPA_LOG_TRACE("id = " << d->id()
<< ", state = " << static_cast<int>(d->state()));
CPPA_REQUIRE( d->state() == actor_state::ready
|| d->state() == actor_state::pending);
CPPA_PUSH_AID(d->id());
auto done_cb = [&]() -> bool {
CPPA_LOG_TRACE("");
d->bhvr_stack().clear();
Expand Down
2 changes: 1 addition & 1 deletion cppa/policy/no_scheduling.hpp
Expand Up @@ -105,7 +105,7 @@ class no_scheduling {
}

template<class Actor>
void launch(Actor* self) {
void launch(Actor* self, execution_unit*) {
CPPA_PUSH_AID(self->id());
CPPA_LOG_TRACE(CPPA_ARG(self));
CPPA_REQUIRE(self != nullptr);
Expand Down

0 comments on commit 4126af4

Please sign in to comment.