diff --git a/cppa/detail/proper_actor.hpp b/cppa/detail/proper_actor.hpp index ea8ec5daf7..638054269e 100644 --- a/cppa/detail/proper_actor.hpp +++ b/cppa/detail/proper_actor.hpp @@ -188,7 +188,7 @@ class proper_actor : public proper_actor_base proper_actor(Ts&&... args) : super(std::forward(args)...) { } - inline void launch(bool is_hidden) { + inline void launch(bool is_hidden, execution_unit* host) { CPPA_LOG_TRACE(""); this->hidden(is_hidden); auto bhvr = this->make_behavior(); @@ -196,7 +196,7 @@ class proper_actor : public proper_actor_basebhvr_stack().empty(), "actor did not set a behavior"); if (!this->bhvr_stack().empty()) { - this->scheduling_policy().launch(this); + this->scheduling_policy().launch(this, host); } } @@ -256,9 +256,9 @@ class proper_actor : public proper_actor_baseresume_policy().await_ready(this); } - inline void launch(bool is_hidden) { + inline void launch(bool is_hidden, execution_unit* host) { this->hidden(is_hidden); - this->scheduling_policy().launch(this); + this->scheduling_policy().launch(this, host); } // implement blocking_actor::dequeue_response diff --git a/cppa/local_actor.hpp b/cppa/local_actor.hpp index dc632d7c76..9b9a87354c 100644 --- a/cppa/local_actor.hpp +++ b/cppa/local_actor.hpp @@ -52,7 +52,6 @@ #include "cppa/message_header.hpp" #include "cppa/abstract_actor.hpp" #include "cppa/abstract_group.hpp" -#include "cppa/execution_unit.hpp" #include "cppa/mailbox_element.hpp" #include "cppa/response_promise.hpp" #include "cppa/message_priority.hpp" @@ -68,7 +67,6 @@ namespace cppa { // forward declarations -class execution_unit; class sync_handle_helper; /** @@ -94,28 +92,32 @@ class local_actor : public extend::with { template actor spawn(Ts&&... args) { constexpr auto os = make_unbound(Os); - auto res = cppa::spawn(std::forward(args)...); + auto res = spawn_class(m_host, empty_before_launch_callback{}, + std::forward(args)...); return eval_opts(Os, std::move(res)); } template actor spawn(Ts&&... args) { constexpr auto os = make_unbound(Os); - auto res = cppa::spawn(std::forward(args)...); + auto res = spawn_functor(m_host, empty_before_launch_callback{}, + std::forward(args)...); return eval_opts(Os, std::move(res)); } - template + template actor spawn_in_group(const group& grp, Ts&&... args) { constexpr auto os = make_unbound(Os); - auto res = cppa::spawn_in_group(grp, std::forward(args)...); + auto res = spawn_class(m_host, group_subscriber{grp}, + std::forward(args)...); return eval_opts(Os, std::move(res)); } - template + template actor spawn_in_group(const group& grp, Ts&&... args) { constexpr auto os = make_unbound(Os); - auto res = cppa::spawn_in_group(grp, std::forward(args)...); + auto res = spawn_functor(m_host, group_subscriber{grp}, + std::forward(args)...); return eval_opts(Os, std::move(res)); } @@ -123,6 +125,17 @@ class local_actor : public extend::with { * spawn typed actors * **************************************************************************/ + template + typename detail::actor_handle_from_signature_list< + typename C::signatures + >::type + spawn_typed(Ts&&... args) { + constexpr auto os = make_unbound(Os); + auto res = spawn_class(m_host, empty_before_launch_callback{}, + std::forward(args)...); + return eval_opts(Os, std::move(res)); + } + template typename detail::infer_typed_actor_handle< typename util::get_callable_trait::result_type, @@ -132,18 +145,10 @@ class local_actor : public extend::with { >::type spawn_typed(F fun, Ts&&... args) { constexpr auto os = make_unbound(Os); - auto res = cppa::spawn_typed(std::move(fun), - std::forward(args)...); - return eval_opts(Os, std::move(res)); - } - - template - typename detail::actor_handle_from_signature_list< - typename C::signatures - >::type - spawn_typed(Ts&&... args) { - constexpr auto os = make_unbound(Os); - auto res = cppa::spawn_typed(std::forward(args)...); + auto res = cppa::spawn_typed_functor(m_host, + empty_before_launch_callback{}, + std::move(fun), + std::forward(args)...); return eval_opts(Os, std::move(res)); } @@ -472,10 +477,10 @@ class local_actor : public extend::with { template inline ActorHandle eval_opts(spawn_options opts, ActorHandle res) { if (has_monitor_flag(opts)) { - monitor(res.address()); + monitor(res->address()); } if (has_link_flag(opts)) { - link_to(res.address()); + link_to(res->address()); } return res; } diff --git a/cppa/logging.hpp b/cppa/logging.hpp index d03a68e0e0..18ab77a41a 100644 --- a/cppa/logging.hpp +++ b/cppa/logging.hpp @@ -70,7 +70,7 @@ class logging { // associates given actor id with this thread, // returns the previously set actor id - virtual actor_id set_aid(actor_id aid) = 0; + actor_id set_aid(actor_id aid); virtual void log(const char* level, const char* class_name, @@ -176,6 +176,7 @@ oss_wr operator<<(oss_wr&& lhs, T rhs) { # define CPPA_LOG_IMPL(lvlname, classname, funname, message) \ CPPA_PRINT_ERROR_IMPL(lvlname, classname, funname, message) # define CPPA_PUSH_AID(unused) static_cast(0) +# define CPPA_PUSH_AID_FROM_PTR(unused) static_cast(0) # define CPPA_SET_AID(unused) cppa_set_aid_dummy() # define CPPA_LOG_LEVEL 1 #else @@ -190,6 +191,9 @@ oss_wr operator<<(oss_wr&& lhs, T rhs) { auto aid_pop_guard = ::cppa::util::make_scope_guard([=] { \ ::cppa::get_logger()->set_aid(prev_aid_in_scope); \ }) +# define CPPA_PUSH_AID_FROM_PTR(some_ptr) \ + auto aid_ptr_argument = some_ptr; \ + CPPA_PUSH_AID(aid_ptr_argument ? aid_ptr_argument->id() : 0) # define CPPA_SET_AID(aid_arg) \ ::cppa::get_logger()->set_aid(aid_arg) #endif diff --git a/cppa/policy/context_switching_resume.hpp b/cppa/policy/context_switching_resume.hpp index fd6e4b3eee..3902bfba38 100644 --- a/cppa/policy/context_switching_resume.hpp +++ b/cppa/policy/context_switching_resume.hpp @@ -80,8 +80,8 @@ class context_switching_resume { resumable::resume_result resume(detail::cs_thread* from, execution_unit* host) override { - CPPA_REQUIRE(from != nullptr); - CPPA_PUSH_AID(this->id()); + CPPA_REQUIRE(from != nullptr && host != nullptr); + CPPA_LOG_TRACE(""); this->m_host = host; using namespace detail; for (;;) { diff --git a/cppa/policy/cooperative_scheduling.hpp b/cppa/policy/cooperative_scheduling.hpp index d29c9f52db..cf41ba2929 100644 --- a/cppa/policy/cooperative_scheduling.hpp +++ b/cppa/policy/cooperative_scheduling.hpp @@ -82,10 +82,11 @@ class cooperative_scheduling { } template - inline void launch(Actor* self) { + inline void launch(Actor* self, execution_unit* host) { // detached in scheduler::worker::run self->attach_to_scheduler(); - get_scheduling_coordinator()->enqueue(self); + if (host) host->exec_later(self); + else get_scheduling_coordinator()->enqueue(self); } template diff --git a/cppa/policy/event_based_resume.hpp b/cppa/policy/event_based_resume.hpp index 5f66dca374..f6887ebe39 100644 --- a/cppa/policy/event_based_resume.hpp +++ b/cppa/policy/event_based_resume.hpp @@ -39,6 +39,7 @@ #include "cppa/config.hpp" #include "cppa/extend.hpp" +#include "cppa/logging.hpp" #include "cppa/behavior.hpp" #include "cppa/scheduler.hpp" #include "cppa/actor_state.hpp" @@ -74,13 +75,13 @@ class event_based_resume { resumable::resume_result resume(detail::cs_thread*, execution_unit* host) override { + CPPA_REQUIRE(host != nullptr); auto d = dptr(); d->m_host = host; CPPA_LOG_TRACE("id = " << d->id() << ", state = " << static_cast(d->state())); CPPA_REQUIRE( d->state() == actor_state::ready || d->state() == actor_state::pending); - CPPA_PUSH_AID(d->id()); auto done_cb = [&]() -> bool { CPPA_LOG_TRACE(""); d->bhvr_stack().clear(); diff --git a/cppa/policy/no_scheduling.hpp b/cppa/policy/no_scheduling.hpp index 7a17bfecd4..b2a38e000b 100644 --- a/cppa/policy/no_scheduling.hpp +++ b/cppa/policy/no_scheduling.hpp @@ -105,7 +105,7 @@ class no_scheduling { } template - void launch(Actor* self) { + void launch(Actor* self, execution_unit*) { CPPA_PUSH_AID(self->id()); CPPA_LOG_TRACE(CPPA_ARG(self)); CPPA_REQUIRE(self != nullptr); diff --git a/cppa/spawn.hpp b/cppa/spawn.hpp index cc708705ae..67a6577a9f 100644 --- a/cppa/spawn.hpp +++ b/cppa/spawn.hpp @@ -52,10 +52,14 @@ namespace cppa { +class execution_unit; + namespace detail { template -intrusive_ptr spawn_impl(BeforeLaunch before_launch_fun, Ts&&... args) { +intrusive_ptr spawn_impl(execution_unit* host, + BeforeLaunch before_launch_fun, + Ts&&... args) { static_assert(!std::is_base_of::value || has_blocking_api_flag(Os), "C is derived type of blocking_actor but " @@ -69,7 +73,7 @@ intrusive_ptr spawn_impl(BeforeLaunch before_launch_fun, Ts&&... args) { if (has_blocking_api_flag(Os) && !has_detach_flag(Os) && detail::cs_thread::is_disabled_feature) { - return spawn_impl(before_launch_fun, + return spawn_impl(host, before_launch_fun, std::forward(args)...); } using scheduling_policy = typename std::conditional< @@ -102,9 +106,10 @@ intrusive_ptr spawn_impl(BeforeLaunch before_launch_fun, Ts&&... args) { invoke_policy>; using proper_impl = detail::proper_actor; auto ptr = make_counted(std::forward(args)...); + CPPA_LOGF_DEBUG("spawned actor with ID " << ptr->id()); CPPA_PUSH_AID(ptr->id()); before_launch_fun(ptr.get()); - ptr->launch(has_hide_flag(Os)); + ptr->launch(has_hide_flag(Os), host); return ptr; } @@ -127,17 +132,51 @@ struct spawn_fwd { static inline actor fwd(T& arg) { return arg; } }; +} // namespace detail + // forwards the arguments to spawn_impl, replacing pointers // to actors with instances of 'actor' template -intrusive_ptr spawn_fwd_args(BeforeLaunch before_launch_fun, Ts&&... args) { - return spawn_impl( - before_launch_fun, - spawn_fwd::type>::fwd( +intrusive_ptr spawn_class(execution_unit* host, + BeforeLaunch before_launch_fun, + Ts&&... args) { + return detail::spawn_impl(host, + before_launch_fun, + detail::spawn_fwd::type>::fwd( std::forward(args))...); } -} // namespace detail +template +actor spawn_functor(execution_unit* eu, + BeforeLaunch cb, + F fun, + Ts&&... args) { + typedef typename util::get_callable_trait::type trait; + typedef typename trait::arg_types arg_types; + typedef typename util::tl_head::type first_arg; + constexpr bool is_blocking = has_blocking_api_flag(Os); + constexpr bool has_ptr_arg = std::is_pointer::value; + constexpr bool has_blocking_self = std::is_same< + first_arg, + blocking_actor* + >::value; + constexpr bool has_nonblocking_self = std::is_same< + first_arg, + event_based_actor* + >::value; + static_assert(!is_blocking || has_blocking_self || !has_ptr_arg, + "functor-based actors with blocking_actor* as first " + "argument need to be spawned using the blocking_api flag"); + static_assert(is_blocking || has_nonblocking_self || !has_ptr_arg, + "functor-based actors with event_based_actor* as first " + "argument cannot be spawned using the blocking_api flag"); + using base_class = typename std::conditional< + is_blocking, + detail::functor_based_blocking_actor, + detail::functor_based_actor + >::type; + return spawn_class(eu, cb, fun, std::forward(args)...); +} /** * @ingroup ActorCreation @@ -151,11 +190,10 @@ intrusive_ptr spawn_fwd_args(BeforeLaunch before_launch_fun, Ts&&... args) { * @tparam Os Optional flags to modify spawn's behavior. * @returns An {@link actor} to the spawned {@link actor}. */ -template +template actor spawn(Ts&&... args) { - return detail::spawn_fwd_args( - [](C*) { /* no-op as BeforeLaunch callback */ }, - std::forward(args)...); + return spawn_class(nullptr, empty_before_launch_callback{}, + std::forward(args)...); } /** @@ -164,52 +202,40 @@ actor spawn(Ts&&... args) { * @tparam Os Optional flags to modify spawn's behavior. * @returns An {@link actor} to the spawned {@link actor}. */ -//template -template +template actor spawn(Ts&&... args) { static_assert(sizeof...(Ts) > 0, "too few arguments provided"); - using base_class = typename std::conditional< - has_blocking_api_flag(Os), - detail::functor_based_blocking_actor, - detail::functor_based_actor - >::type; - return spawn(std::forward(args)...); + return spawn_functor(nullptr, empty_before_launch_callback{}, + std::forward(args)...); } /** - * @brief Spawns a new actor that evaluates given arguments and - * immediately joins @p grp. - * @param args A functor followed by its arguments. + * @brief Spawns an actor of type @p C that immediately joins @p grp. + * @param args Constructor arguments. + * @tparam C Subtype of {@link event_based_actor} or {@link sb_actor}. * @tparam Os Optional flags to modify spawn's behavior. * @returns An {@link actor} to the spawned {@link actor}. * @note The spawned has joined the group before this function returns. */ -template +template actor spawn_in_group(const group& grp, Ts&&... args) { - static_assert(sizeof...(Ts) > 0, "too few arguments provided"); - using base_class = typename std::conditional< - has_blocking_api_flag(Os), - detail::functor_based_blocking_actor, - detail::functor_based_actor - >::type; - return detail::spawn_fwd_args( - [&](base_class* ptr) { ptr->join(grp); }, - std::forward(args)...); + return spawn_class(nullptr, group_subscriber{grp}, + std::forward(args)...); } /** - * @brief Spawns an actor of type @p C that immediately joins @p grp. - * @param args Constructor arguments. - * @tparam C Subtype of {@link event_based_actor} or {@link sb_actor}. + * @brief Spawns a new actor that evaluates given arguments and + * immediately joins @p grp. + * @param args A functor followed by its arguments. * @tparam Os Optional flags to modify spawn's behavior. * @returns An {@link actor} to the spawned {@link actor}. * @note The spawned has joined the group before this function returns. */ -template +template actor spawn_in_group(const group& grp, Ts&&... args) { - return detail::spawn_fwd_args( - [&](C* ptr) { ptr->join(grp); }, - std::forward(args)...); + static_assert(sizeof...(Ts) > 0, "too few arguments provided"); + return spawn_functor(nullptr, group_subscriber{grp}, + std::forward(args)...); } namespace detail { @@ -318,30 +344,23 @@ struct infer_typed_actor_base*> { * @tparam Os Optional flags to modify spawn's behavior. * @returns A {@link typed_actor} handle to the spawned actor. */ -template +template typename detail::actor_handle_from_signature_list< typename C::signatures >::type spawn_typed(Ts&&... args) { - return detail::spawn_fwd_args( - [&](C*) { }, - std::forward(args)...); + return spawn_class(nullptr, empty_before_launch_callback{}, + std::forward(args)...); } -/** - * @brief Spawns a typed actor from a functor. - * @param args A functor followed by its arguments. - * @tparam Os Optional flags to modify spawn's behavior. - * @returns An {@link actor} to the spawned {@link actor}. - */ -template +template typename detail::infer_typed_actor_handle< typename util::get_callable_trait::result_type, typename util::tl_head< typename util::get_callable_trait::arg_types >::type >::type -spawn_typed(F fun, Ts&&... args) { +spawn_typed_functor(execution_unit* eu, BeforeLaunch bl, F fun, Ts&&... args) { typedef typename detail::infer_typed_actor_base< typename util::get_callable_trait::result_type, typename util::tl_head< @@ -349,9 +368,25 @@ spawn_typed(F fun, Ts&&... args) { >::type >::type impl; - return detail::spawn_fwd_args( - [&](impl*) { }, - std::move(fun), std::forward(args)...); + return spawn_class(eu, bl, fun, std::forward(args)...); +} + +/** + * @brief Spawns a typed actor from a functor. + * @param args A functor followed by its arguments. + * @tparam Os Optional flags to modify spawn's behavior. + * @returns An {@link actor} to the spawned {@link actor}. + */ +template +typename detail::infer_typed_actor_handle< + typename util::get_callable_trait::result_type, + typename util::tl_head< + typename util::get_callable_trait::arg_types + >::type +>::type +spawn_typed(F fun, Ts&&... args) { + return spawn_typed_functor(nullptr, empty_before_launch_callback{}, + std::move(fun), std::forward(args)...); } /** @} */ diff --git a/cppa/spawn_fwd.hpp b/cppa/spawn_fwd.hpp index 5b4b4426af..dd903565f8 100644 --- a/cppa/spawn_fwd.hpp +++ b/cppa/spawn_fwd.hpp @@ -34,6 +34,7 @@ #ifndef CPPA_SPAWN_FWD_HPP #define CPPA_SPAWN_FWD_HPP +#include "cppa/group.hpp" #include "cppa/typed_actor.hpp" #include "cppa/spawn_options.hpp" @@ -41,21 +42,42 @@ namespace cppa { -/****************************************************************************** - * untyped actors * - ******************************************************************************/ +template +intrusive_ptr spawn_class(execution_unit* host, + BeforeLaunch before_launch_fun, + Ts&&... args); + +template +actor spawn_functor(execution_unit* host, + BeforeLaunch before_launch_fun, + F fun, + Ts&&... args); + +class group_subscriber { + + public: + + inline group_subscriber(const group& grp) : m_grp(grp) { } + + template + inline void operator()(T* ptr) const { + ptr->join(m_grp); + } -template -actor spawn(Ts&&... args); + private: -template -actor spawn(Ts&&... args); + group m_grp; -template -actor spawn_in_group(const group&, Ts&&... args); +}; + +class empty_before_launch_callback { -template -actor spawn_in_group(const group&, Ts&&... args); + public: + + template + inline void operator()(T*) const { } + +}; /****************************************************************************** * typed actors * @@ -88,20 +110,14 @@ struct actor_handle_from_signature_list> { } // namespace detail -template -typename detail::actor_handle_from_signature_list< - typename Impl::signatures ->::type -spawn_typed(Ts&&... args); - -template +template typename detail::infer_typed_actor_handle< typename util::get_callable_trait::result_type, typename util::tl_head< typename util::get_callable_trait::arg_types >::type >::type -spawn_typed(F fun, Ts&&... args); +spawn_typed_functor(execution_unit*, BeforeLaunch bl, F fun, Ts&&... args); } // namespace cppa diff --git a/src/logging.cpp b/src/logging.cpp index 8fe3a8b240..3ab6e0c7d0 100644 --- a/src/logging.cpp +++ b/src/logging.cpp @@ -104,12 +104,6 @@ class logging_impl : public logging { } } - actor_id set_aid(actor_id aid) override { - actor_id prev = t_self_id; - t_self_id = aid; - return prev; - } - void log(const char* level, const char* c_class_name, const char* function_name, @@ -170,4 +164,10 @@ logging::~logging() { } logging* logging::create_singleton() { return new logging_impl; } +actor_id logging::set_aid(actor_id aid) { + actor_id prev = t_self_id; + t_self_id = aid; + return prev; +} + } // namespace cppa diff --git a/src/scheduler.cpp b/src/scheduler.cpp index 6a0ed029dc..1aadda9452 100644 --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -242,6 +242,7 @@ class coordinator::shutdown_helper : public resumable { resumable::resume_result resume(detail::cs_thread*, execution_unit* ptr) { auto w = dynamic_cast(ptr); CPPA_REQUIRE(w != nullptr); + CPPA_LOG_DEBUG("shutdown_helper::resume => shutdown worker"); w->m_running = false; std::unique_lock guard(mtx); last_worker = w; @@ -328,7 +329,7 @@ void coordinator::enqueue(resumable* what) { ******************************************************************************/ worker::worker(size_t id, coordinator* parent) - : m_running(false), m_id(id), m_last_victim(id), m_parent(parent) { } + : m_running(true), m_id(id), m_last_victim(id), m_parent(parent) { } void worker::start() { @@ -389,15 +390,20 @@ void worker::run() { } }; // scheduling loop - m_running = true; while (m_running) { local_poll() || aggressive_poll() || moderate_poll() || relaxed_poll(); CPPA_LOG_DEBUG("dequeued new job"); + CPPA_PUSH_AID_FROM_PTR(dynamic_cast(job)); if (job->resume(&fself, this) == resumable::done) { // was attached in policy::cooperative_scheduling::launch job->detach_from_scheduler(); } job = nullptr; + // give others the opportunity to steal from us + if (m_job_list.size() > 1 && m_exposed_queue.empty()) { + m_exposed_queue.push_back(m_job_list.front()); + m_job_list.erase(m_job_list.begin()); + } } } @@ -412,7 +418,12 @@ worker::job_ptr worker::raid() { m_last_victim = (m_last_victim + 1) % n; if (m_last_victim != m_id) { auto job = m_parent->worker_by_id(m_last_victim)->try_steal(); - if (job) return job; + if (job) { + CPPA_LOG_DEBUG("worker " << m_id + << " has successfully stolen a job from " + << m_last_victim); + return job; + } } } return nullptr;