diff --git a/hpx/lcos/base_lco.hpp b/hpx/lcos/base_lco.hpp index 8be4c2cc821d..0f46aeb66ef0 100644 --- a/hpx/lcos/base_lco.hpp +++ b/hpx/lcos/base_lco.hpp @@ -23,7 +23,7 @@ namespace hpx { namespace lcos /// implementing a simple set_event action class HPX_API_EXPORT base_lco { - protected: + public: virtual void set_event () = 0; virtual void set_exception (boost::exception_ptr const& e); @@ -34,7 +34,6 @@ namespace hpx { namespace lcos // noop by default virtual void disconnect(naming::id_type const &); - public: // components must contain a typedef for wrapping_type defining the // managed_component type used to encapsulate instances of this // component diff --git a/hpx/lcos/detail/async_implementations.hpp b/hpx/lcos/detail/async_implementations.hpp index 7469c678de35..08ca17f9626a 100644 --- a/hpx/lcos/detail/async_implementations.hpp +++ b/hpx/lcos/detail/async_implementations.hpp @@ -15,6 +15,8 @@ #include #include +#include + namespace hpx { namespace detail { /// \cond NOINTERNAL @@ -41,31 +43,21 @@ namespace hpx { namespace detail struct sync_local_invoke { template - static lcos::future call( - naming::id_type const& id, naming::address && addr, - Ts &&... vs) + static lcos::future + call(naming::id_type const& id, naming::address && addr, Ts &&... vs) { - lcos::packaged_action p; bool target_is_managed = false; + naming::id_type id1; if (id.get_management_type() == naming::id_type::managed) { - naming::id_type id1(id.get_gid(), naming::id_type::unmanaged); - if (addr) - { - p.apply(launch::sync, std::move(addr), id1, - std::forward(vs)...); - } - else - { - p.apply(launch::sync, id1, std::forward(vs)...); - } + id1 = naming::id_type(id.get_gid(), naming::id_type::unmanaged); target_is_managed = true; } - else - { - p.apply(launch::sync, id, std::forward(vs)...); - } + + lcos::packaged_action p; + p.apply(std::move(addr), target_is_managed ? id1 : id, + std::forward(vs)...); // keep id alive, if needed - this allows to send the destination // as an unmanaged id @@ -85,54 +77,44 @@ namespace hpx { namespace detail } }; - template - struct sync_local_invoke > + template + struct sync_local_invoke > { template - HPX_FORCEINLINE static lcos::future call( - boost::mpl::true_, naming::id_type const&, - naming::address && addr, Ts &&... vs) + HPX_FORCEINLINE static lcos::future + call(naming::id_type const&, naming::address && addr, Ts &&... vs) { + HPX_ASSERT(!!addr); HPX_ASSERT(traits::component_type_is_compatible< - typename Action::component_type>::call(addr)); + typename Action::component_type + >::call(addr)); return Action::execute_function(addr.address_, std::forward(vs)...); } }; - /////////////////////////////////////////////////////////////////////// + /////////////////////////////////////////////////////////////////////////// template struct sync_local_invoke_cb { template - static lcos::future call( - naming::id_type const& id, naming::address && addr, - Callback && cb, Ts &&... vs) + static lcos::future + call(naming::id_type const& id, naming::address && addr, Callback && cb, + Ts &&... vs) { - lcos::packaged_action p; bool target_is_managed = false; + naming::id_type id1; if (id.get_management_type() == naming::id_type::managed) { - naming::id_type id1(id.get_gid(), naming::id_type::unmanaged); - if (addr) - { - p.apply_cb(launch::sync, std::move(addr), id1, - std::forward(cb), std::forward(vs)...); - } - else - { - p.apply_cb(launch::sync, id1, std::forward(cb), - std::forward(vs)...); - } + id1 = naming::id_type(id.get_gid(), naming::id_type::unmanaged); target_is_managed = true; } - else - { - p.apply_cb(launch::sync, id, std::forward(cb), - std::forward(vs)...); - } + + lcos::packaged_action p; + p.apply_cb(std::move(addr), target_is_managed ? id1 : id, + std::forward(cb), std::forward(vs)...); // keep id alive, if needed - this allows to send the destination // as an unmanaged id @@ -152,19 +134,21 @@ namespace hpx { namespace detail } }; - template - struct sync_local_invoke_cb > + template + struct sync_local_invoke_cb > { template - HPX_FORCEINLINE static lcos::future call( - boost::mpl::true_, naming::id_type const&, - naming::address && addr, Callback && cb, Ts&&... vs) + HPX_FORCEINLINE static lcos::future + call(naming::id_type const&, naming::address && addr, Callback && cb, + Ts &&... vs) { + HPX_ASSERT(!!addr); HPX_ASSERT(traits::component_type_is_compatible< - typename Action::component_type>::call(addr)); + typename Action::component_type + >::call(addr)); - lcos::future f = Action::execute_function(addr.address_, - std::forward(vs)...); + lcos::future f = Action::execute_function( + addr.address_, std::forward(vs)...); // invoke callback cb(boost::system::error_code(), parcelset::parcel()); @@ -194,35 +178,39 @@ namespace hpx { namespace detail call(id, std::move(addr), std::forward(vs)...); } - lcos::packaged_action p; - bool target_is_managed = false; + naming::id_type id1; + future f; + + if (id.get_management_type() == naming::id_type::managed) + { + id1 = naming::id_type(id.get_gid(), naming::id_type::unmanaged); + target_is_managed = true; + } + if (policy == launch::sync || hpx::detail::has_async_policy(policy)) { - if (id.get_management_type() == naming::id_type::managed) - { - naming::id_type id1(id.get_gid(), naming::id_type::unmanaged); - if (addr) - { - p.apply(policy, std::move(addr), id1, - std::forward(vs)...); - } - else - { - p.apply(policy, id1, std::forward(vs)...); - } - target_is_managed = true; - } - else - { - p.apply(policy, id, std::forward(vs)...); - } + lcos::packaged_action p; + p.apply(std::move(addr), target_is_managed ? id1 : id, + std::forward(vs)...); + f = p.get_future(); + } + else if (policy == launch::deferred) + { + lcos::packaged_action p; + p.apply_deferred(std::move(addr), target_is_managed ? id1 : id, + std::forward(vs)...); + f = p.get_future(); + } + else + { + HPX_THROW_EXCEPTION(bad_parameter, + "async_impl", "unknown launch policy"); + return f; } // keep id alive, if needed - this allows to send the destination as an // unmanaged id - future f = p.get_future(); - if (target_is_managed) { typedef typename traits::detail::shared_state_ptr_for< @@ -260,37 +248,39 @@ namespace hpx { namespace detail std::forward(vs)...); } - lcos::packaged_action p; - + future f; bool target_is_managed = false; + naming::id_type id1; + + if (id.get_management_type() == naming::id_type::managed) + { + id1 = naming::id_type(id.get_gid(), naming::id_type::unmanaged); + target_is_managed = true; + } + if (policy == launch::sync || hpx::detail::has_async_policy(policy)) { - if (id.get_management_type() == naming::id_type::managed) - { - naming::id_type id1(id.get_gid(), naming::id_type::unmanaged); - if (addr) - { - p.apply_cb(policy, std::move(addr), id1, - std::forward(cb), std::forward(vs)...); - } - else - { - p.apply_cb(policy, id1, std::forward(cb), - std::forward(vs)...); - } - target_is_managed = true; - } - else - { - p.apply_cb(policy, id, std::forward(cb), - std::forward(vs)...); - } + lcos::packaged_action p; + p.apply_cb(std::move(addr), target_is_managed ? id1 : id, + std::forward(cb), std::forward(vs)...); + f = p.get_future(); + } + else if (policy == launch::deferred) + { + lcos::packaged_action p; + p.apply_deferred_cb(std::move(addr), target_is_managed ? id1 : id, + std::forward(cb), std::forward(vs)...); + f = p.get_future(); + } + else + { + HPX_THROW_EXCEPTION(bad_parameter, + "async_cb_impl", "unknown launch policy"); + return f; } // keep id alive, if needed - this allows to send the destination // as an unmanaged id - future f = p.get_future(); - if (target_is_managed) { typedef typename traits::detail::shared_state_ptr_for< diff --git a/hpx/lcos/detail/future_data.hpp b/hpx/lcos/detail/future_data.hpp index 70f111753ca6..f66ac0926910 100644 --- a/hpx/lcos/detail/future_data.hpp +++ b/hpx/lcos/detail/future_data.hpp @@ -689,12 +689,12 @@ namespace detail protected: typedef typename future_data::result_type result_type; - threads::thread_id_type get_id() const + threads::thread_id_type get_thread_id() const { boost::lock_guard l(this->mtx_); return id_; } - void set_id(threads::thread_id_type id) + void set_thread_id(threads::thread_id_type id) { boost::lock_guard l(this->mtx_); id_ = id; @@ -729,8 +729,7 @@ namespace detail { if (!started_test_and_set()) this->do_run(); - else - this->future_data::wait(ec); + this->future_data::wait(ec); } virtual BOOST_SCOPED_ENUM(future_status) @@ -817,11 +816,11 @@ namespace detail reset_id(task_base& target) : target_(target) { - target.set_id(threads::get_self_id()); + target.set_thread_id(threads::get_self_id()); } ~reset_id() { - target_.set_id(threads::invalid_thread_id); + target_.set_thread_id(threads::invalid_thread_id); } task_base& target_; }; @@ -838,13 +837,11 @@ namespace detail template void set_data(T && result) { - HPX_ASSERT(started_); - this->future_data::set_value(std::forward(result)); + this->future_data::set_data(std::forward(result)); } void set_exception(boost::exception_ptr const& e) { - HPX_ASSERT(started_); this->future_data::set_exception(e); } diff --git a/hpx/lcos/packaged_action.hpp b/hpx/lcos/packaged_action.hpp index 3a61434c3142..445e7625988d 100644 --- a/hpx/lcos/packaged_action.hpp +++ b/hpx/lcos/packaged_action.hpp @@ -1,4 +1,4 @@ -// Copyright (c) 2007-2013 Hartmut Kaiser +// Copyright (c) 2007-2015 Hartmut Kaiser // Copyright (c) 2011 Bryce Lelbach // // Distributed under the Boost Software License, Version 1.0. (See accompanying @@ -9,15 +9,10 @@ #include #include -#include #include -#include -#include -#include +#include #include #include -#include -#include #include #include #include @@ -65,18 +60,17 @@ namespace hpx { namespace lcos : public promise::remote_result_type> { - private: + protected: typedef typename hpx::actions::extract_action::type action_type; typedef promise base_type; - struct profiler_tag {}; - static void parcel_write_handler( boost::intrusive_ptr impl, boost::system::error_code const& ec, parcelset::parcel const& p) { // any error in the parcel layer will be stored in the future object - if (ec) { + if (ec) + { boost::exception_ptr exception = hpx::detail::get_exception(hpx::exception(ec), "packaged_action::parcel_write_handler", @@ -91,7 +85,8 @@ namespace hpx { namespace lcos boost::system::error_code const& ec, parcelset::parcel const& p) { // any error in the parcel layer will be stored in the future object - if (ec) { + if (ec) + { boost::exception_ptr exception = hpx::detail::get_exception(hpx::exception(ec), "packaged_action::parcel_write_handler", @@ -103,276 +98,265 @@ namespace hpx { namespace lcos cb(ec, p); } - public: - /// Construct a (non-functional) instance of an \a packaged_action. To use - /// this instance its member function \a apply needs to be directly - /// called. - packaged_action() - : apply_logger_("packaged_action") + /////////////////////////////////////////////////////////////////////// + template + void do_apply(naming::address && addr, naming::id_type const& id, + threads::thread_priority priority, Ts&&... vs) { - LLCO_(info) << "packaged_action::packaged_action(" + LLCO_(info) << "packaged_action::do_apply(" //-V128 << hpx::actions::detail::get_action_name() - << ") args(0)"; - } - - /// The apply function starts the asynchronous operations encapsulated - /// by this eager future. - /// - /// \param gid [in] The global id of the target component to use to - /// apply the action. - template - void apply(BOOST_SCOPED_ENUM(launch) policy, naming::id_type const& gid, - Ts&&... vs) - { - util::block_profiler_wrapper bp(apply_logger_); + << ", " << id << ") args(" << sizeof...(Ts) << ")"; naming::id_type cont_id(this->get_id()); naming::detail::set_dont_store_in_cache(cont_id); - hpx::apply_c_cb(cont_id, gid, - util::bind(&packaged_action::parcel_write_handler, - this->impl_, util::placeholders::_1, util::placeholders::_2), - std::forward(vs)...); + using util::placeholders::_1; + using util::placeholders::_2; + + auto f = util::bind(&packaged_action::parcel_write_handler, + this->impl_, _1, _2); + + if (addr) + { + hpx::apply_c_p_cb( + cont_id, std::move(addr), id, priority, std::move(f), + std::forward(vs)...); + } + else + { + hpx::apply_c_p_cb( + cont_id, id, priority, std::move(f), + std::forward(vs)...); + } } - template - void apply(BOOST_SCOPED_ENUM(launch) policy, naming::address&& addr, - naming::id_type const& gid, Ts&&... vs) + template + void do_apply(naming::id_type const& id, + threads::thread_priority priority, Ts&&... vs) { - util::block_profiler_wrapper bp(apply_logger_); + LLCO_(info) << "packaged_action::do_apply(" //-V128 + << hpx::actions::detail::get_action_name() + << ", " << id << ") args(" << sizeof...(Ts) << ")"; naming::id_type cont_id(this->get_id()); naming::detail::set_dont_store_in_cache(cont_id); - hpx::apply_c_cb(cont_id, std::move(addr), gid, - util::bind(&packaged_action::parcel_write_handler, - this->impl_, util::placeholders::_1, util::placeholders::_2), + using util::placeholders::_1; + using util::placeholders::_2; + + hpx::apply_c_p_cb( + cont_id, id, priority, + util::bind( + &packaged_action::parcel_write_handler, + this->impl_, _1, _2), std::forward(vs)...); } - template - void apply_cb(BOOST_SCOPED_ENUM(launch) policy, - naming::id_type const& gid, Callback && cb, Ts&&... vs) + template + void do_apply_cb(naming::address&& addr, naming::id_type const& id, + threads::thread_priority priority, Callback && cb, Ts&&... vs) { - util::block_profiler_wrapper bp(apply_logger_); - - typedef typename util::decay::type callback_type; + LLCO_(info) << "packaged_action::do_apply_cb(" //-V128 + << hpx::actions::detail::get_action_name() + << ", " << id << ") args(" << sizeof...(Ts) << ")"; naming::id_type cont_id(this->get_id()); naming::detail::set_dont_store_in_cache(cont_id); - hpx::apply_c_cb(cont_id, gid, - util::bind( - &packaged_action::parcel_write_handler_cb, - util::protect(std::forward(cb)), this->impl_, - util::placeholders::_1, util::placeholders::_2), - std::forward(vs)...); + using util::placeholders::_1; + using util::placeholders::_2; + + typedef typename util::decay::type callback_type; + + auto f = util::bind( + &packaged_action::parcel_write_handler_cb, + util::protect(std::forward(cb)), this->impl_, _1, _2); + + if (addr) + { + hpx::apply_c_p_cb( + cont_id, std::move(addr), id, priority, std::move(cb), + std::forward(vs)...); + } + else + { + hpx::apply_c_p_cb( + cont_id, id, priority, std::move(cb), + std::forward(vs)...); + } } - template - void apply_cb(BOOST_SCOPED_ENUM(launch) policy, naming::address&& addr, - naming::id_type const& gid, Callback && cb, Ts&&... vs) + template + void do_apply_cb(naming::id_type const& id, + threads::thread_priority priority, Callback && cb, Ts&&... vs) { - util::block_profiler_wrapper bp(apply_logger_); - - typedef typename util::decay::type callback_type; + LLCO_(info) << "packaged_action::do_apply_cb(" //-V128 + << hpx::actions::detail::get_action_name() + << ", " << id << ") args(" << sizeof...(Ts) << ")"; naming::id_type cont_id(this->get_id()); naming::detail::set_dont_store_in_cache(cont_id); - hpx::apply_c_cb(cont_id, std::move(addr), gid, + using util::placeholders::_1; + using util::placeholders::_2; + + typedef typename util::decay::type callback_type; + + hpx::apply_c_p_cb( + cont_id, id, priority, util::bind( &packaged_action::parcel_write_handler_cb, - util::protect(std::forward(cb)), this->impl_, - util::placeholders::_1, util::placeholders::_2), + util::protect(std::forward(cb)), + this->impl_, _1, _2), std::forward(vs)...); } + public: + // Construct a (non-functional) instance of an \a packaged_action. To + // use this instance its member function \a apply needs to be directly + // called. + packaged_action() {} + + /////////////////////////////////////////////////////////////////////// template - void apply_p(BOOST_SCOPED_ENUM(launch) policy, naming::id_type const& gid, - threads::thread_priority priority, Ts&&... vs) + void apply(naming::id_type const& id, Ts &&... vs) { - util::block_profiler_wrapper bp(apply_logger_); - - naming::id_type cont_id(this->get_id()); - naming::detail::set_dont_store_in_cache(cont_id); + do_apply(id, actions::action_priority(), + std::forward(vs)...); + } - hpx::apply_c_p_cb(cont_id, gid, priority, - util::bind(&packaged_action::parcel_write_handler, - this->impl_, util::placeholders::_1, util::placeholders::_2), + template + void apply(naming::address && addr, naming::id_type const& id, + Ts &&... vs) + { + do_apply(std::move(addr), id, + actions::action_priority(), std::forward(vs)...); } + template + void apply_cb(naming::id_type const& id, Callback && cb, Ts &&... vs) + { + do_apply_cb(id, actions::action_priority(), + std::forward(cb), std::forward(vs)...); + } + + template + void apply_cb(naming::address && addr, naming::id_type const& id, + Callback && cb, Ts &&... vs) + { + do_apply_cb(std::move(addr), id, + actions::action_priority(), + std::forward(cb), std::forward(vs)...); + } + template - void apply_p(BOOST_SCOPED_ENUM(launch) policy, naming::address&& addr, - naming::id_type const& gid, threads::thread_priority priority, - Ts&&... vs) + void apply_p(naming::id_type const& id, + threads::thread_priority priority, Ts &&... vs) { - util::block_profiler_wrapper bp(apply_logger_); + do_apply(id, priority, std::forward(vs)...); + } - naming::id_type cont_id(this->get_id()); - naming::detail::set_dont_store_in_cache(cont_id); + template + void apply_p(naming::address && addr, naming::id_type const& id, + threads::thread_priority priority, Ts &&... vs) + { + do_apply(std::move(addr), id, priority, std::forward(vs)...); + } - hpx::apply_c_p_cb(cont_id, std::move(addr), - gid, priority, - util::bind(&packaged_action::parcel_write_handler, - this->impl_, util::placeholders::_1, util::placeholders::_2), + template + void apply_p_cb(naming::id_type const& id, + threads::thread_priority priority, Callback && cb, Ts &&... vs) + { + do_apply_cb(id, priority, std::forward(cb), std::forward(vs)...); } template - void apply_p_cb(BOOST_SCOPED_ENUM(launch) policy, naming::id_type const& gid, + void apply_p_cb(naming::address&& addr, naming::id_type const& id, threads::thread_priority priority, Callback && cb, Ts&&... vs) { - util::block_profiler_wrapper bp(apply_logger_); + do_apply_cb(std::move(addr), id, priority, + std::forward(cb), std::forward(vs)...); + } - typedef typename util::decay::type callback_type; + /////////////////////////////////////////////////////////////////////// + template + void apply_deferred(naming::address && addr, + naming::id_type const& id, Ts &&... vs) + { + LLCO_(info) << "packaged_action::apply_deferred(" //-V128 + << hpx::actions::detail::get_action_name() + << ", " << id << ") args(" << sizeof...(Ts) << ")"; naming::id_type cont_id(this->get_id()); naming::detail::set_dont_store_in_cache(cont_id); - hpx::apply_c_p_cb(cont_id, gid, priority, + using util::placeholders::_1; + using util::placeholders::_2; + + auto f = hpx::functional::apply_c_p_cb( + cont_id, std::move(addr), id, + actions::action_priority(), util::bind( - &packaged_action::parcel_write_handler_cb, - util::protect(std::forward(cb)), this->impl_, - util::placeholders::_1, util::placeholders::_2), + &packaged_action::parcel_write_handler, this->impl_, _1, _2 + ), std::forward(vs)...); + + this->base_type::set_task(std::move(f)); } template - void apply_p_cb(BOOST_SCOPED_ENUM(launch) policy, naming::address&& addr, - naming::id_type const& gid, threads::thread_priority priority, - Callback && cb, Ts&&... vs) + void apply_deferred_cb(naming::address && addr, + naming::id_type const& id, Callback && cb, Ts &&... vs) { - util::block_profiler_wrapper bp(apply_logger_); - - typedef typename util::decay::type callback_type; + LLCO_(info) << "packaged_action::apply_deferred(" //-V128 + << hpx::actions::detail::get_action_name() + << ", " << id << ") args(" << sizeof...(Ts) << ")"; naming::id_type cont_id(this->get_id()); naming::detail::set_dont_store_in_cache(cont_id); - hpx::apply_c_p_cb(cont_id, std::move(addr), - gid, priority, + using util::placeholders::_1; + using util::placeholders::_2; + + typedef typename util::decay::type callback_type; + + auto f = hpx::functional::apply_c_p_cb( + cont_id, std::move(addr), id, + actions::action_priority(), util::bind( - &packaged_action::parcel_write_handler_cb, - util::protect(std::forward(cb)), this->impl_, - util::placeholders::_1, util::placeholders::_2), + &packaged_action::parcel_write_handler_cb, + util::protect(std::forward(cb)), + this->impl_, _1, _2 + ), std::forward(vs)...); - } - /// Construct a new \a packaged_action instance. The \a thread - /// supplied to the function \a packaged_action#get will be - /// notified as soon as the result of the operation associated with - /// this packaged_action instance has been returned. - /// - /// \param gid [in] The global id of the target component to use to - /// apply the action. - /// - /// \note The result of the requested operation is expected to - /// be returned as the first parameter using a - /// \a base_lco#set_value action. Any error has to be - /// reported using a \a base_lco::set_exception action. The - /// target for either of these actions has to be this - /// packaged_action instance (as it has to be sent along - /// with the action as the continuation parameter). - template - packaged_action(naming::id_type const& gid, Ts&&... vs) - : apply_logger_("packaged_action::apply") - { - LLCO_(info) << "packaged_action::packaged_action(" //-V128 - << hpx::actions::detail::get_action_name() - << ", " - << gid - << ") args(" << sizeof...(Ts) << ")"; - apply(launch::all, gid, std::forward(vs)...); - } - - template - packaged_action(naming::gid_type const& gid, - threads::thread_priority priority, Ts&&... vs) - : apply_logger_("packaged_action::apply") - { - LLCO_(info) << "packaged_action::packaged_action(" //-V128 - << hpx::actions::detail::get_action_name() - << ", " - << gid - << ") args(" << sizeof...(Ts) << ")"; - apply_p(launch::all, naming::id_type(gid, naming::id_type::unmanaged), - priority, std::forward(vs)...); + this->base_type::set_task(std::move(f)); } - - util::block_profiler apply_logger_; }; /////////////////////////////////////////////////////////////////////////// template class packaged_action - : public promise::remote_result_type> + : public packaged_action { - private: - typedef typename hpx::actions::extract_action::type action_type; - typedef promise base_type; - - struct profiler_tag {}; - - static void parcel_write_handler( - boost::intrusive_ptr impl, - boost::system::error_code const& ec, parcelset::parcel const& p) - { - // any error in the parcel layer will be stored in the future object - if (ec) { - boost::exception_ptr exception = - hpx::detail::get_exception(hpx::exception(ec), - "packaged_action::parcel_write_handler", - __FILE__, __LINE__, parcelset::dump_parcel(p)); - (*impl)->set_exception(exception); - } - } - - template - static void parcel_write_handler_cb(Callback const& cb, - boost::intrusive_ptr impl, - boost::system::error_code const& ec, parcelset::parcel const& p) - { - // any error in the parcel layer will be stored in the future object - if (ec) { - boost::exception_ptr exception = - hpx::detail::get_exception(hpx::exception(ec), - "packaged_action::parcel_write_handler", - __FILE__, __LINE__, parcelset::dump_parcel(p)); - (*impl)->set_exception(exception); - } - - // invoke user supplied callback - cb(ec, p); - } + typedef typename packaged_action< + Action, Result, boost::mpl::false_ + >::action_type action_type; public: - /// Construct a (non-functional) instance of an \a packaged_action. To use - /// this instance its member function \a apply needs to be directly + /// Construct a (non-functional) instance of an \a packaged_action. To + /// use this instance its member function \a apply needs to be directly /// called. - packaged_action() - : apply_logger_("packaged_action_direct::apply") - { - LLCO_(info) << "packaged_action::packaged_action(" - << hpx::actions::detail::get_action_name() - << ") args(0)"; - } + packaged_action() {} - /// The apply function starts the asynchronous operations encapsulated - /// by this eager future. - /// - /// \param gid [in] The global id of the target component to use to - /// apply the action. + /////////////////////////////////////////////////////////////////////// template - void apply(BOOST_SCOPED_ENUM(launch) /*policy*/, - naming::id_type const& gid, Ts&&... vs) + void apply(naming::id_type const& id, Ts &&... vs) { - util::block_profiler_wrapper bp(apply_logger_); - naming::address addr; - if (agas::is_local_address_cached(gid, addr)) { + if (agas::is_local_address_cached(id, addr)) { // local, direct execution HPX_ASSERT(traits::component_type_is_compatible< typename Action::component_type>::call(addr)); @@ -382,23 +366,15 @@ namespace hpx { namespace lcos } else { // remote execution - naming::id_type cont_id(this->get_id()); - naming::detail::set_dont_store_in_cache(cont_id); - - hpx::applier::detail::apply_c_cb( - std::move(addr), cont_id, gid, - util::bind(&packaged_action::parcel_write_handler, - this->impl_, util::placeholders::_1, util::placeholders::_2), + this->do_apply(id, actions::action_priority(), std::forward(vs)...); } } template - void apply(BOOST_SCOPED_ENUM(launch) /*policy*/, naming::address&& addr, - naming::id_type const& gid, Ts&&... vs) + void apply(naming::address && addr, naming::id_type const& id, + Ts &&... vs) { - util::block_profiler_wrapper bp(apply_logger_); - if (addr.locality_ == hpx::get_locality()) { // local, direct execution HPX_ASSERT(traits::component_type_is_compatible< @@ -409,25 +385,17 @@ namespace hpx { namespace lcos } else { // remote execution - naming::id_type cont_id(this->get_id()); - naming::detail::set_dont_store_in_cache(cont_id); - - hpx::applier::detail::apply_c_cb( - std::move(addr), cont_id, gid, - util::bind(&packaged_action::parcel_write_handler, - this->impl_, util::placeholders::_1, util::placeholders::_2), + this->do_apply(std::move(addr), id, + actions::action_priority(), std::forward(vs)...); } } template - void apply_cb(BOOST_SCOPED_ENUM(launch) /*policy*/, - naming::id_type const& gid, Callback && cb, Ts&&... vs) + void apply_cb(naming::id_type const& id, Callback && cb, Ts&&... vs) { - util::block_profiler_wrapper bp(apply_logger_); - naming::address addr; - if (agas::is_local_address_cached(gid, addr)) { + if (agas::is_local_address_cached(id, addr)) { // local, direct execution HPX_ASSERT(traits::component_type_is_compatible< typename Action::component_type>::call(addr)); @@ -440,27 +408,15 @@ namespace hpx { namespace lcos } else { // remote execution - typedef typename util::decay::type callback_type; - - naming::id_type cont_id(this->get_id()); - naming::detail::set_dont_store_in_cache(cont_id); - - hpx::applier::detail::apply_c_cb( - std::move(addr), cont_id, gid, - util::bind( - &packaged_action::parcel_write_handler_cb, - util::protect(std::forward(cb)), this->impl_, - util::placeholders::_1, util::placeholders::_2), - std::forward(vs)...); + this->do_apply_cb(id, actions::action_priority(), + std::forward(cb), std::forward(vs)...); } } template - void apply_cb(BOOST_SCOPED_ENUM(launch) /*policy*/, naming::address&& addr, - naming::id_type const& gid, Callback && cb, Ts&&... vs) + void apply_cb(naming::address && addr, naming::id_type const& id, + Callback && cb, Ts &&... vs) { - util::block_profiler_wrapper bp(apply_logger_); - if (addr.locality_ == hpx::get_locality()) { // local, direct execution HPX_ASSERT(traits::component_type_is_compatible< @@ -474,49 +430,11 @@ namespace hpx { namespace lcos } else { // remote execution - typedef typename util::decay::type callback_type; - - naming::id_type cont_id(this->get_id()); - naming::detail::set_dont_store_in_cache(cont_id); - - hpx::applier::detail::apply_c_cb( - std::move(addr), cont_id, gid, - util::bind( - &packaged_action::parcel_write_handler_cb, - util::protect(std::forward(cb)), this->impl_, - util::placeholders::_1, util::placeholders::_2), - std::forward(vs)...); + this->do_apply_cb(std::move(addr), id, + actions::action_priority(), + std::forward(cb), std::forward(vs)...); } } - - /// Construct a new \a packaged_action instance. The \a thread - /// supplied to the function \a packaged_action#get will be - /// notified as soon as the result of the operation associated with - /// this packaged_action instance has been returned. - /// - /// \param gid [in] The global id of the target component to use to - /// apply the action. - /// - /// \note The result of the requested operation is expected to - /// be returned as the first parameter using a - /// \a base_lco#set_value action. Any error has to be - /// reported using a \a base_lco::set_exception action. The - /// target for either of these actions has to be this - /// packaged_action instance (as it has to be sent along - /// with the action as the continuation parameter). - template - packaged_action(naming::id_type const& gid, Ts&&... vs) - : apply_logger_("packaged_action_direct::apply") - { - LLCO_(info) << "packaged_action::packaged_action(" //-V128 - << hpx::actions::detail::get_action_name() - << ", " - << gid - << ") args(" << sizeof...(Ts) << ")"; - apply(launch::all, gid, std::forward(vs)...); - } - - util::block_profiler apply_logger_; }; }} diff --git a/hpx/lcos/promise.hpp b/hpx/lcos/promise.hpp index e7a7ac13ed0b..9f504ddc1c91 100644 --- a/hpx/lcos/promise.hpp +++ b/hpx/lcos/promise.hpp @@ -202,25 +202,40 @@ namespace hpx { namespace lcos { namespace detail /////////////////////////////////////////////////////////////////////////// template - class promise; + class promise_common; template - void intrusive_ptr_add_ref(promise* p); + void intrusive_ptr_add_ref(promise_common* p); template - void intrusive_ptr_release(promise* p); + void intrusive_ptr_release(promise_common* p); /////////////////////////////////////////////////////////////////////////// /// A promise can be used by a single thread to invoke a (remote) /// action and wait for the result. template - class promise + class promise_common : public promise_base, - public lcos::detail::future_data + public task_base { - protected: - typedef lcos::detail::future_data future_data_type; - typedef typename future_data_type::result_type result_type; + private: + void do_run() + { + if (!f_) + return; // do nothing if no deferred task is given + + try + { + f_(); // trigger action + this->wait(); // wait for value to come back + } + catch(...) + { + this->set_exception(boost::current_exception()); + } + } + + util::unique_function_nonser f_; public: // This is the component id. Every component needs to have an embedded @@ -228,22 +243,18 @@ namespace hpx { namespace lcos { namespace detail // to associate this component with a given action. enum { value = components::component_promise }; - promise() - {} + promise_common() {} // The implementation of the component is responsible for deleting the // actual managed component object - ~promise() + ~promise_common() { this->finalize(); } - // helper functions for setting data (if successful) or the error (if - // non-successful) - template - void set_local_data(T && result) + void set_task(util::unique_function_nonser && f) { - return this->set_data(std::forward(result)); + f_ = std::move(f); } /////////////////////////////////////////////////////////////////////// @@ -258,16 +269,7 @@ namespace hpx { namespace lcos { namespace detail void set_exception(boost::exception_ptr const& e) { - return this->future_data_type::set_exception(e); - } - - Result get_value(error_code& ec = throws) - { - typedef typename future_data_type::result_type result_type; - result_type* result = this->get_result(); - - // no error has been reported, return the result - return std::move(*result); + return this->task_base::set_exception(e); } void add_ref() @@ -312,17 +314,33 @@ namespace hpx { namespace lcos { namespace detail } // disambiguate reference counting - friend void intrusive_ptr_add_ref(promise* p) + friend void intrusive_ptr_add_ref(promise_common* p) { ++p->count_; } - friend void intrusive_ptr_release(promise* p) + friend void intrusive_ptr_release(promise_common* p) { if (p->requires_delete()) delete p; } + }; + + /////////////////////////////////////////////////////////////////////////// + template + class promise : public promise_common + { + public: + Result get_value(error_code& /*ec*/ = throws) + { + typedef typename task_base::result_type result_type; + result_type* result = this->get_result(); + + // no error has been reported, return the result + return std::move(*result); + } + private: template friend struct components::detail_adl_barrier::init; @@ -334,121 +352,25 @@ namespace hpx { namespace lcos { namespace detail } }; - /////////////////////////////////////////////////////////////////////////// template <> class promise - : public promise_base, - public lcos::detail::future_data + : public promise_common { - protected: - typedef lcos::detail::future_data future_data_type; - typedef future_data_type::result_type result_type; - public: - // This is the component id. Every component needs to have an embedded - // enumerator 'value' which is used by the generic action implementation - // to associate this component with a given action. - enum { value = components::component_promise }; - - promise() - {} - - // The implementation of the component is responsible for deleting the - // actual managed component object - ~promise() - { - this->finalize(); - } - - // helper functions for setting data (if successful) or the error (if - // non-successful) - template - void set_local_data(T && result) - { - return set_data(std::forward(result)); - } - - /////////////////////////////////////////////////////////////////////// - // exposed functionality of this component - - // trigger the future, set the result - void set_value (util::unused_type && result) - { - // set the received result, reset error status - set_data(std::move(result)); - } - - void set_exception(boost::exception_ptr const& e) - { - return this->future_data_type::set_exception(e); - } - util::unused_type get_value(error_code& /*ec*/ = throws) { this->get_result(); return util::unused; } - void add_ref() - { - intrusive_ptr_add_ref(this); - } - - void release() - { - intrusive_ptr_release(this); - } - - long count() const - { - return this->count_; - } - private: - bool requires_delete() - { - boost::unique_lock l(this->gid_.get_mutex()); - long counter = --this->count_; - - // special precautions for it to go out of scope - if (1 == counter && naming::detail::has_credits(this->gid_)) - { - // At this point, the remaining count has to be held by AGAS - // for this reason, we break the self-reference to allow for - // proper destruction - - // move all credits to a temporary id_type - naming::gid_type gid = this->gid_; - naming::detail::strip_credits_from_gid(this->gid_); - - naming::id_type id (gid, id_type::managed); - l.unlock(); - - return false; - } - - return 0 == counter; - } - - // disambiguate reference counting - friend void intrusive_ptr_add_ref(promise* p) - { - ++p->count_; - } - - friend void intrusive_ptr_release(promise* p) - { - if (p->requires_delete()) - delete p; - } - template friend struct components::detail_adl_barrier::init; void set_back_ptr(components::managed_component* bp) { HPX_ASSERT(bp); - HPX_ASSERT(gid_ == naming::invalid_gid); + HPX_ASSERT(this->gid_ == naming::invalid_gid); this->gid_ = bp->get_base_gid(); } }; @@ -590,6 +512,11 @@ namespace hpx { namespace lcos return *this; } + void set_task(util::unique_function_nonser && f) + { + (*impl_)->set_task(std::move(f)); + } + protected: template promise(Impl* impl) @@ -673,12 +600,6 @@ namespace hpx { namespace lcos (*impl_)->set_exception(e); // set the received error } - template - void set_local_data(T && result) - { - (*impl_)->set_local_data(std::forward(result)); - } - protected: boost::intrusive_ptr impl_; bool future_obtained_; @@ -694,7 +615,7 @@ namespace hpx { namespace lcos typedef detail::promise wrapped_type; typedef components::managed_component wrapping_type; - /// Construct a new \a future instance. The supplied + /// Construct a new \a promise instance. The supplied /// \a thread will be notified as soon as the result of the /// operation associated with this future instance has been /// returned. @@ -722,8 +643,7 @@ namespace hpx { namespace lcos rhs.future_obtained_ = false; } - virtual ~promise() - {} + virtual ~promise() {} promise& operator=(promise && rhs) { @@ -736,6 +656,11 @@ namespace hpx { namespace lcos return *this; } + void set_task(util::unique_function_nonser && f) + { + (*impl_)->set_task(std::move(f)); + } + protected: template promise(Impl* impl) diff --git a/hpx/runtime/applier/apply_callback.hpp b/hpx/runtime/applier/apply_callback.hpp index e394a7bbdc35..6413fe90ab8c 100644 --- a/hpx/runtime/applier/apply_callback.hpp +++ b/hpx/runtime/applier/apply_callback.hpp @@ -8,6 +8,8 @@ #include #include +#include +#include #include @@ -341,6 +343,101 @@ namespace hpx std::move(addr), gid, actions::action_priority(), std::forward(cb), std::forward(vs)...); } + + namespace functional + { + template + struct apply_c_p_cb_impl + { + private: + HPX_MOVABLE_BUT_NOT_COPYABLE(apply_c_p_cb_impl) + + public: + typedef util::tuple tuple_type; + + template + apply_c_p_cb_impl(naming::id_type const& contid, + naming::address && addr, naming::id_type const& id, + threads::thread_priority p, Callback && cb, Ts_ &&... vs) + : contid_(contid), addr_(std::move(addr)), id_(id), p_(p), + cb_(std::move(cb)), + args_(std::forward(vs)...) + {} + + apply_c_p_cb_impl(apply_c_p_cb_impl && rhs) + : contid_(std::move(rhs.contid_)), + addr_(std::move(rhs.addr_)), + id_(std::move(rhs.id_)), + p_(std::move(rhs.p_)), + cb_(std::move(rhs.cb_)), + args_(std::move(rhs.args_)) + {} + + apply_c_p_cb_impl& operator=(apply_c_p_cb_impl && rhs) + { + contid_ = std::move(rhs.contid_); + addr_ = std::move(rhs.addr_); + id_ = std::move(rhs.id_); + p_ = std::move(rhs.p_); + cb_ = std::move(rhs.cb_); + args_ = std::move(rhs.args_); + return *this; + } + + void operator()() + { + apply_action( + typename util::detail::make_index_pack< + sizeof...(Ts) + >::type()); + } + + protected: + template + void apply_action(util::detail::pack_c) + { + if (addr_) + { + hpx::apply_c_p_cb( + contid_, std::move(addr_), id_, p_, std::move(cb_), + util::get(std::forward(args_))...); + } + else + { + hpx::apply_c_p_cb( + contid_, id_, p_, std::move(cb_), + util::get(std::forward(args_))...); + } + } + + private: + naming::id_type contid_; + naming::address addr_; + naming::id_type id_; + threads::thread_priority p_; + Callback cb_; + tuple_type args_; + }; + + template + apply_c_p_cb_impl< + Action, typename util::decay::type, + typename util::decay::type... + > + apply_c_p_cb(naming::id_type const& contid, naming::address && addr, + naming::id_type const& id, threads::thread_priority p, + Callback && cb, Ts &&... vs) + { + typedef apply_c_p_cb_impl< + Action, typename util::decay::type, + typename util::decay::type... + > result_type; + + return result_type( + contid, std::move(addr), id, p, std::forward(cb), + std::forward(vs)...); + } + } } #endif diff --git a/src/runtime/agas/stubs/component_namespace_stubs.cpp b/src/runtime/agas/stubs/component_namespace_stubs.cpp index e8d64d686b9f..eb4b8ff6ce4a 100644 --- a/src/runtime/agas/stubs/component_namespace_stubs.cpp +++ b/src/runtime/agas/stubs/component_namespace_stubs.cpp @@ -24,7 +24,7 @@ lcos::future component_namespace::service_async( typedef server_type::service_action action_type; lcos::packaged_action p; - p.apply_p(launch::async, gid, priority, req); + p.apply_p(gid, priority, req); return p.get_future(); } @@ -66,7 +66,7 @@ lcos::future > component_namespace::bulk_service_async( typedef server_type::bulk_service_action action_type; lcos::packaged_action p; - p.apply_p(launch::async, gid, priority, reqs); + p.apply_p(gid, priority, reqs); return p.get_future(); } diff --git a/src/runtime/agas/stubs/locality_namespace_stubs.cpp b/src/runtime/agas/stubs/locality_namespace_stubs.cpp index 9c03cd7ec9bd..b69af826df36 100644 --- a/src/runtime/agas/stubs/locality_namespace_stubs.cpp +++ b/src/runtime/agas/stubs/locality_namespace_stubs.cpp @@ -21,7 +21,7 @@ lcos::future locality_namespace::service_async( typedef server_type::service_action action_type; lcos::packaged_action p; - p.apply_p(launch::async, gid, priority, req); + p.apply_p(gid, priority, req); return p.get_future(); } @@ -79,7 +79,7 @@ lcos::future > locality_namespace::bulk_service_async( typedef server_type::bulk_service_action action_type; lcos::packaged_action p; - p.apply_p(launch::async, gid, priority, reqs); + p.apply_p(gid, priority, reqs); return p.get_future(); } diff --git a/src/runtime/agas/stubs/primary_namespace_stubs.cpp b/src/runtime/agas/stubs/primary_namespace_stubs.cpp index 2341837b8edd..11182f0713ec 100644 --- a/src/runtime/agas/stubs/primary_namespace_stubs.cpp +++ b/src/runtime/agas/stubs/primary_namespace_stubs.cpp @@ -21,7 +21,7 @@ lcos::future primary_namespace::service_async( typedef server_type::service_action action_type; lcos::packaged_action p; - p.apply_p(launch::async, gid, priority, req); + p.apply_p(gid, priority, req); return p.get_future(); } @@ -92,7 +92,7 @@ lcos::future > typedef server_type::bulk_service_action action_type; lcos::packaged_action p; - p.apply_p(launch::async, gid, priority, reqs); + p.apply_p(gid, priority, reqs); return p.get_future(); } diff --git a/src/runtime/agas/stubs/symbol_namespace_stubs.cpp b/src/runtime/agas/stubs/symbol_namespace_stubs.cpp index 3c4ef08eac80..98ac5b90bdb0 100644 --- a/src/runtime/agas/stubs/symbol_namespace_stubs.cpp +++ b/src/runtime/agas/stubs/symbol_namespace_stubs.cpp @@ -21,7 +21,7 @@ lcos::future symbol_namespace::service_async( typedef server_type::service_action action_type; lcos::packaged_action p; - p.apply_p(launch::async, gid, priority, req); + p.apply_p(gid, priority, req); return p.get_future(); } @@ -62,7 +62,7 @@ lcos::future > symbol_namespace::bulk_service_async( typedef server_type::bulk_service_action action_type; lcos::packaged_action p; - p.apply_p(launch::async, gid, priority, reqs); + p.apply_p(gid, priority, reqs); return p.get_future(); } diff --git a/src/runtime/components/stubs/runtime_support_stubs.cpp b/src/runtime/components/stubs/runtime_support_stubs.cpp index 5f68914f0cb1..49d30166c9d8 100644 --- a/src/runtime/components/stubs/runtime_support_stubs.cpp +++ b/src/runtime/components/stubs/runtime_support_stubs.cpp @@ -144,7 +144,7 @@ namespace hpx { namespace components { namespace stubs naming::id_type(gid, naming::id_type::unmanaged)); lcos::packaged_action p; - p.apply(launch::async, id, g, gid, count); + p.apply(id, g, gid, count); p.get_future().get(); } } diff --git a/tests/regressions/actions/CMakeLists.txt b/tests/regressions/actions/CMakeLists.txt index 6ed5972cf0c1..60c7b53f6146 100644 --- a/tests/regressions/actions/CMakeLists.txt +++ b/tests/regressions/actions/CMakeLists.txt @@ -6,6 +6,7 @@ add_subdirectory(components) set(tests + async_deferred_1523 make_continuation_1615 plain_action_1330 plain_action_1550 diff --git a/tests/regressions/actions/async_deferred_1523.cpp b/tests/regressions/actions/async_deferred_1523.cpp new file mode 100644 index 000000000000..b1cb9a8327cb --- /dev/null +++ b/tests/regressions/actions/async_deferred_1523.cpp @@ -0,0 +1,54 @@ +// Copyright (c) 2015 Agustin Berge +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +// This demonstrates the issue as reported by #1523: Remote async with deferred +// launch policy never executes + +#include +#include +#include +#include + +bool null_action_executed = false; +bool int_action_executed = false; + +void null_thread() +{ + null_action_executed = true; +} +HPX_PLAIN_ACTION(null_thread, null_action); + +int int_thread() +{ + int_action_executed = true; + return 42; +} +HPX_PLAIN_ACTION(int_thread, int_action); + +hpx::id_type get_locality() +{ + return hpx::find_here(); +} +HPX_PLAIN_ACTION(get_locality, get_locality_action); + +int main() +{ + hpx::async(hpx::launch::deferred, hpx::find_here()).get(); + HPX_TEST(null_action_executed); + + int result = hpx::async( + hpx::launch::deferred, hpx::find_here()).get(); + HPX_TEST(int_action_executed); + HPX_TEST_EQ(result, 42); + + for (hpx::id_type const& loc : hpx::find_all_localities()) + { + hpx::id_type id = hpx::async( + hpx::launch::deferred, loc).get(); + HPX_TEST_EQ(loc, id); + } + + return 0; +}