diff --git a/hpx/lcos/detail/future_data.hpp b/hpx/lcos/detail/future_data.hpp index 8b9ac06b4873..7d7074569a9f 100644 --- a/hpx/lcos/detail/future_data.hpp +++ b/hpx/lcos/detail/future_data.hpp @@ -10,7 +10,6 @@ #include #include #include -#include #include #include #include @@ -24,7 +23,6 @@ #include #include #include -#include #include #include #include @@ -34,7 +32,6 @@ #include #include #include -#include #include #include #include @@ -264,16 +261,12 @@ namespace detail return result_type(std::forward(f1), std::forward(f2)); } - /////////////////////////////////////////////////////////////////////////// - HPX_EXPORT bool run_on_completed_on_new_thread( - util::unique_function_nonser && f, error_code& ec); - /////////////////////////////////////////////////////////////////////////// template struct future_data_base; template <> - struct future_data_base + struct HPX_EXPORT future_data_base : future_data_refcnt_base { future_data_base() @@ -286,30 +279,10 @@ namespace detail typedef lcos::local::spinlock mutex_type; typedef util::unused_type result_type; + typedef util::unique_function_nonser completed_callback_type; typedef future_data_refcnt_base::init_no_addref init_no_addref; - virtual ~future_data_base() noexcept {} - virtual void execute_deferred(error_code& = throws) = 0; - virtual bool cancelable() const = 0; - virtual void cancel() = 0; - virtual result_type* get_result_void(error_code& = throws) = 0; - virtual void wait(error_code& = throws) = 0; - virtual future_status wait_until(util::steady_clock::time_point const&, - error_code& = throws) = 0; - virtual std::exception_ptr get_exception_ptr() const = 0; - - virtual std::string const& get_registered_name() const - { - HPX_THROW_EXCEPTION(invalid_status, - "future_data_base::get_registered_name", - "this future does not support name registration"); - } - virtual void register_as(std::string const& name, bool manage_lifetime) - { - HPX_THROW_EXCEPTION(invalid_status, - "future_data_base::set_registered_name", - "this future does not support name registration"); - } + virtual ~future_data_base(); enum state { @@ -346,9 +319,66 @@ namespace detail return state_ == exception; } + virtual void execute_deferred(error_code& /*ec*/ = throws) {} + + // cancellation is disabled by default + virtual bool cancelable() const + { + return false; + } + virtual void cancel() + { + HPX_THROW_EXCEPTION(future_does_not_support_cancellation, + "future_data_base::cancel", + "this future does not support cancellation"); + } + + result_type* get_result_void(void const* storage, error_code& ec = throws); + virtual result_type* get_result_void(error_code& ec = throws) = 0; + + virtual void set_exception(std::exception_ptr data, + error_code& ec = throws) = 0; + + // continuation support + + // deferred execution of a given continuation + bool run_on_completed(completed_callback_type && on_completed, + std::exception_ptr& ptr); + + // make sure continuation invocation does not recurse deeper than + // allowed + void handle_on_completed(completed_callback_type && on_completed); + + /// Set the callback which needs to be invoked when the future becomes + /// ready. If the future is ready the function will be invoked + /// immediately. + void set_on_completed(completed_callback_type data_sink); + + virtual void wait(error_code& ec = throws); + + virtual future_status wait_until( + util::steady_clock::time_point const& abs_time, error_code& ec = throws); + + virtual std::exception_ptr get_exception_ptr() const = 0; + + virtual std::string const& get_registered_name() const + { + HPX_THROW_EXCEPTION(invalid_status, + "future_data_base::get_registered_name", + "this future does not support name registration"); + } + virtual void register_as(std::string const& name, bool manage_lifetime) + { + HPX_THROW_EXCEPTION(invalid_status, + "future_data_base::set_registered_name", + "this future does not support name registration"); + } + protected: - mutable mutex_type mtx_; - state state_; // current state + mutable mutex_type mtx_; + state state_; // current state + completed_callback_type on_completed_; + local::detail::condition_variable cond_; // threads waiting in read }; template @@ -359,20 +389,19 @@ namespace detail typedef typename future_data_result::type result_type; typedef util::unique_function_nonser completed_callback_type; + typedef future_data_base base_type; typedef lcos::local::spinlock mutex_type; - typedef typename future_data_base< - traits::detail::future_data_void - >::init_no_addref init_no_addref; + typedef typename base_type::init_no_addref init_no_addref; future_data_base() = default; future_data_base(init_no_addref no_addref) - : future_data_base(no_addref) + : base_type(no_addref) {} template future_data_base(Target && data, init_no_addref no_addref) - : future_data_base(no_addref) + : base_type(no_addref) { result_type* value_ptr = reinterpret_cast(&storage_); @@ -382,7 +411,7 @@ namespace detail } future_data_base(std::exception_ptr const& e, init_no_addref no_addref) - : future_data_base(no_addref) + : base_type(no_addref) { std::exception_ptr* exception_ptr = reinterpret_cast(&storage_); @@ -390,7 +419,7 @@ namespace detail state_ = exception; } future_data_base(std::exception_ptr && e, init_no_addref no_addref) - : future_data_base(no_addref) + : base_type(no_addref) { std::exception_ptr* exception_ptr = reinterpret_cast(&storage_); @@ -403,20 +432,6 @@ namespace detail reset(); } - virtual void execute_deferred(error_code& ec = throws) {} - - // cancellation is disabled by default - virtual bool cancelable() const - { - return false; - } - virtual void cancel() - { - HPX_THROW_EXCEPTION(future_does_not_support_cancellation, - "future_data_base::cancel", - "this future does not support cancellation"); - } - /// Get the result of the requested action. This call blocks (yields /// control) if the result is not ready. As soon as the result has been /// returned and the waiting thread has been re-scheduled by the thread @@ -435,162 +450,21 @@ namespace detail /// error description if &ec == &throws. virtual result_type* get_result(error_code& ec = throws) { - // yields control if needed - wait(ec); - if (ec) return nullptr; - - // No locking is required. Once a future has been made ready, which - // is a postcondition of wait, either: - // - // - there is only one writer (future), or - // - there are multiple readers only (shared_future, lock hurts - // concurrency) - - if (state_ == empty) { - // the value has already been moved out of this future - HPX_THROWS_IF(ec, no_state, - "future_data_base::get_result", - "this future has no valid shared state"); + if (!get_result_void(ec)) return nullptr; - } - - // the thread has been re-activated by one of the actions - // supported by this promise (see promise::set_event - // and promise::set_exception). - if (state_ == exception) - { - std::exception_ptr* exception_ptr = - reinterpret_cast(&storage_); - // an error has been reported in the meantime, throw or set - // the error code - if (&ec == &throws) { - std::rethrow_exception(*exception_ptr); - // never reached - } - else { - ec = make_error_code(*exception_ptr); - } - return nullptr; - } return reinterpret_cast(&storage_); } virtual util::unused_type* get_result_void(error_code& ec = throws) { - // yields control if needed - wait(ec); - if (ec) return nullptr; - - // No locking is required. Once a future has been made ready, which - // is a postcondition of wait, either: - // - // - there is only one writer (future), or - // - there are multiple readers only (shared_future, lock hurts - // concurrency) - - if (state_ == empty) { - // the value has already been moved out of this future - HPX_THROWS_IF(ec, no_state, - "future_data_base::get_result", - "this future has no valid shared state"); - return nullptr; - } - - // the thread has been re-activated by one of the actions - // supported by this promise (see promise::set_event - // and promise::set_exception). - if (state_ == exception) - { - std::exception_ptr* exception_ptr = - reinterpret_cast(&storage_); - // an error has been reported in the meantime, throw or set - // the error code - if (&ec == &throws) { - std::rethrow_exception(*exception_ptr); - // never reached - } - else { - ec = make_error_code(*exception_ptr); - } - return nullptr; - } - - static util::unused_type unused_; - return &unused_; - } - - // deferred execution of a given continuation - bool run_on_completed(completed_callback_type && on_completed, - std::exception_ptr& ptr) - { - try { - hpx::util::annotate_function annotate(on_completed); - on_completed(); - } - catch (...) { - ptr = std::current_exception(); - return false; - } - return true; - } - - // make sure continuation invocation does not recurse deeper than - // allowed - void handle_on_completed(completed_callback_type && on_completed) - { - // We need to run the completion on a new thread if we are on a - // non HPX thread. - bool recurse_asynchronously = hpx::threads::get_self_ptr() == nullptr; -#if defined(HPX_HAVE_THREADS_GET_STACK_POINTER) - recurse_asynchronously = - !this_thread::has_sufficient_stack_space(); -#else - handle_continuation_recursion_count cnt; - recurse_asynchronously = recurse_asynchronously || - cnt.count_ > HPX_CONTINUATION_MAX_RECURSION_DEPTH; -#endif - if (!recurse_asynchronously) - { - // directly execute continuation on this thread - std::exception_ptr ptr; - if (!run_on_completed(std::move(on_completed), ptr)) - { - error_code ec(lightweight); - set_exception(hpx::detail::access_exception(ec)); - } - } - else - { - // re-spawn continuation on a new thread - boost::intrusive_ptr this_(this); - - error_code ec(lightweight); - std::exception_ptr ptr; - if (!run_on_completed_on_new_thread( - util::deferred_call( - &future_data_base::run_on_completed, - std::move(this_), std::move(on_completed), - std::ref(ptr)), - ec)) - { - // thread creation went wrong - if (ec) { - set_exception(hpx::detail::access_exception(ec)); - return; - } - - // re-throw exception in this context - HPX_ASSERT(ptr); // exception should have been set - std::rethrow_exception(ptr); - } - } + return base_type::get_result_void(&storage_, ec); } /// Set the result of the requested action. template void set_value(Target && data, error_code& ec = throws) { - std::unique_lock l(this->mtx_); + std::unique_lock l(mtx_); // check whether the data has already been set if (is_ready_locked(l)) { @@ -601,7 +475,7 @@ namespace detail return; } - completed_callback_type on_completed = std::move(this->on_completed_); + completed_callback_type on_completed = std::move(on_completed_); // set the data result_type* value_ptr = @@ -621,7 +495,7 @@ namespace detail // re-lock the mutex while exiting from condition_variable::wait while (cond_.notify_one(std::move(l), threads::thread_priority_boost, ec)) { - l = std::unique_lock(this->mtx_); + l = std::unique_lock(mtx_); } // Note: cv.notify_one() above 'consumes' the lock 'l' and leaves @@ -632,10 +506,9 @@ namespace detail handle_on_completed(std::move(on_completed)); } - template - void set_exception(Target && data, error_code& ec = throws) + void set_exception(std::exception_ptr data, error_code& ec = throws) { - std::unique_lock l(this->mtx_); + std::unique_lock l(mtx_); // check whether the data has already been set if (is_ready_locked(l)) { @@ -646,13 +519,12 @@ namespace detail return; } - completed_callback_type on_completed = std::move(this->on_completed_); + completed_callback_type on_completed = std::move(on_completed_); // set the data std::exception_ptr* exception_ptr = reinterpret_cast(&storage_); - ::new ((void*)exception_ptr) std::exception_ptr( - std::forward(data)); + ::new ((void*)exception_ptr) std::exception_ptr(std::move(data)); state_ = exception; // handle all threads waiting for the future to become ready @@ -666,7 +538,7 @@ namespace detail // re-lock the mutex while exiting from condition_variable::wait while (cond_.notify_one(std::move(l), threads::thread_priority_boost, ec)) { - l = std::unique_lock(this->mtx_); + l = std::unique_lock(mtx_); } // Note: cv.notify_one() above 'consumes' the lock 'l' and leaves @@ -742,74 +614,6 @@ namespace detail on_completed_ = completed_callback_type(); } - // continuation support - - /// Set the callback which needs to be invoked when the future becomes - /// ready. If the future is ready the function will be invoked - /// immediately. - void set_on_completed(completed_callback_type data_sink) - { - if (!data_sink) return; - - std::unique_lock l(this->mtx_); - - if (is_ready_locked(l)) { - - HPX_ASSERT(!on_completed_); - - // invoke the callback (continuation) function right away - l.unlock(); - - handle_on_completed(std::move(data_sink)); - } - else { - // store a combined callback wrapping the old and the new one - // make sure continuations are evaluated in the order they are - // attached - this->on_completed_ = compose_cb( - std::move(on_completed_), std::move(data_sink)); - } - } - - virtual void wait(error_code& ec = throws) - { - std::unique_lock l(mtx_); - - // block if this entry is empty - if (state_ == empty) { - cond_.wait(l, "future_data_base::wait", ec); - if (ec) return; - } - - if (&ec != &throws) - ec = make_success_code(); - } - - virtual future_status - wait_until(util::steady_clock::time_point const& abs_time, - error_code& ec = throws) - { - std::unique_lock l(mtx_); - - // block if this entry is empty - if (state_ == empty) { - threads::thread_state_ex_enum const reason = - cond_.wait_until(l, abs_time, - "future_data_base::wait_until", ec); - if (ec) return future_status::uninitialized; - - if (reason == threads::wait_timeout) - return future_status::timeout; - - return future_status::ready; - } - - if (&ec != &throws) - ec = make_success_code(); - - return future_status::ready; //-V110 - } - std::exception_ptr get_exception_ptr() const { HPX_ASSERT(state_ == exception); @@ -817,10 +621,12 @@ namespace detail } protected: - completed_callback_type on_completed_; + using base_type::mtx_; + using base_type::state_; + using base_type::on_completed_; private: - local::detail::condition_variable cond_; // threads waiting in read + using base_type::cond_; typename future_data_storage::type storage_; }; @@ -1083,9 +889,9 @@ namespace detail } void set_exception( - std::exception_ptr const& e, error_code& ec = throws) + std::exception_ptr e, error_code& ec = throws) { - this->future_data::set_exception(e, ec); + this->future_data::set_exception(std::move(e), ec); } virtual void do_run() diff --git a/hpx/lcos/local/futures_factory.hpp b/hpx/lcos/local/futures_factory.hpp index 8cdbd5edb8eb..60f8b709700c 100644 --- a/hpx/lcos/local/futures_factory.hpp +++ b/hpx/lcos/local/futures_factory.hpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include diff --git a/hpx/lcos/local/promise.hpp b/hpx/lcos/local/promise.hpp index 306e6083a6dc..82843a36bb03 100644 --- a/hpx/lcos/local/promise.hpp +++ b/hpx/lcos/local/promise.hpp @@ -316,9 +316,9 @@ namespace hpx { namespace lcos { namespace local // - promise_already_satisfied if its shared state already has a // stored value or exception. // - no_state if *this has no shared state. - void set_exception(std::exception_ptr const& e, error_code& ec = throws) + void set_exception(std::exception_ptr e, error_code& ec = throws) { - base_type::set_exception(e, ec); + base_type::set_exception(std::move(e), ec); } }; @@ -412,9 +412,9 @@ namespace hpx { namespace lcos { namespace local // - promise_already_satisfied if its shared state already has a // stored value or exception. // - no_state if *this has no shared state. - void set_exception(std::exception_ptr const& e, error_code& ec = throws) + void set_exception(std::exception_ptr e, error_code& ec = throws) { - base_type::set_exception(e, ec); + base_type::set_exception(std::move(e), ec); } }; @@ -510,9 +510,9 @@ namespace hpx { namespace lcos { namespace local // - promise_already_satisfied if its shared state already has a // stored value or exception. // - no_state if *this has no shared state. - void set_exception(std::exception_ptr const& e, error_code& ec = throws) + void set_exception(std::exception_ptr e, error_code& ec = throws) { - base_type::set_exception(e, ec); + base_type::set_exception(std::move(e), ec); } }; diff --git a/src/lcos/detail/future_data.cpp b/src/lcos/detail/future_data.cpp index 40b1ae0b3d14..8f4ab0efcd1f 100644 --- a/src/lcos/detail/future_data.cpp +++ b/src/lcos/detail/future_data.cpp @@ -3,21 +3,30 @@ // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +#include + #include #include -#include +#include +#include +#include #include +#include #include -#include #include #include #include +#include + +#include +#include +#include #include namespace hpx { namespace lcos { namespace detail { - bool run_on_completed_on_new_thread( + static bool run_on_completed_on_new_thread( util::unique_function_nonser && f, error_code& ec) { lcos::local::futures_factory p(std::move(f)); @@ -49,4 +58,190 @@ namespace hpx { namespace lcos { namespace detail return true; } } + + /////////////////////////////////////////////////////////////////////////// + future_data_base:: + ~future_data_base() + {} + + util::unused_type* future_data_base:: + get_result_void(void const* storage, error_code& ec) + { + // yields control if needed + wait(ec); + if (ec) return nullptr; + + // No locking is required. Once a future has been made ready, which + // is a postcondition of wait, either: + // + // - there is only one writer (future), or + // - there are multiple readers only (shared_future, lock hurts + // concurrency) + + if (state_ == empty) { + // the value has already been moved out of this future + HPX_THROWS_IF(ec, no_state, + "future_data_base::get_result", + "this future has no valid shared state"); + return nullptr; + } + + // the thread has been re-activated by one of the actions + // supported by this promise (see promise::set_event + // and promise::set_exception). + if (state_ == exception) + { + std::exception_ptr const* exception_ptr = + static_cast(storage); + // an error has been reported in the meantime, throw or set + // the error code + if (&ec == &throws) { + std::rethrow_exception(*exception_ptr); + // never reached + } + else { + ec = make_error_code(*exception_ptr); + } + return nullptr; + } + + static util::unused_type unused_; + return &unused_; + } + + // deferred execution of a given continuation + bool future_data_base:: + run_on_completed(completed_callback_type && on_completed, + std::exception_ptr& ptr) + { + try { + hpx::util::annotate_function annotate(on_completed); + on_completed(); + } + catch (...) { + ptr = std::current_exception(); + return false; + } + return true; + } + + // make sure continuation invocation does not recurse deeper than + // allowed + void future_data_base:: + handle_on_completed(completed_callback_type && on_completed) + { + // We need to run the completion on a new thread if we are on a + // non HPX thread. + bool recurse_asynchronously = hpx::threads::get_self_ptr() == nullptr; +#if defined(HPX_HAVE_THREADS_GET_STACK_POINTER) + recurse_asynchronously = + !this_thread::has_sufficient_stack_space(); +#else + handle_continuation_recursion_count cnt; + recurse_asynchronously = recurse_asynchronously || + cnt.count_ > HPX_CONTINUATION_MAX_RECURSION_DEPTH; +#endif + if (!recurse_asynchronously) + { + // directly execute continuation on this thread + std::exception_ptr ptr; + if (!run_on_completed(std::move(on_completed), ptr)) + { + error_code ec(lightweight); + set_exception(hpx::detail::access_exception(ec)); + } + } + else + { + // re-spawn continuation on a new thread + boost::intrusive_ptr this_(this); + + error_code ec(lightweight); + std::exception_ptr ptr; + if (!run_on_completed_on_new_thread( + util::deferred_call( + &future_data_base::run_on_completed, + std::move(this_), std::move(on_completed), + std::ref(ptr)), + ec)) + { + // thread creation went wrong + if (ec) { + set_exception(hpx::detail::access_exception(ec)); + return; + } + + // re-throw exception in this context + HPX_ASSERT(ptr); // exception should have been set + std::rethrow_exception(ptr); + } + } + } + + /// Set the callback which needs to be invoked when the future becomes + /// ready. If the future is ready the function will be invoked + /// immediately. + void future_data_base:: + set_on_completed(completed_callback_type data_sink) + { + if (!data_sink) return; + + std::unique_lock l(mtx_); + + if (is_ready_locked(l)) { + + HPX_ASSERT(!on_completed_); + + // invoke the callback (continuation) function right away + l.unlock(); + + handle_on_completed(std::move(data_sink)); + } + else { + // store a combined callback wrapping the old and the new one + // make sure continuations are evaluated in the order they are + // attached + on_completed_ = compose_cb( + std::move(on_completed_), std::move(data_sink)); + } + } + + void future_data_base:: + wait(error_code& ec) + { + std::unique_lock l(mtx_); + + // block if this entry is empty + if (state_ == empty) { + cond_.wait(l, "future_data_base::wait", ec); + if (ec) return; + } + + if (&ec != &throws) + ec = make_success_code(); + } + + future_status future_data_base:: + wait_until(util::steady_clock::time_point const& abs_time, error_code& ec) + { + std::unique_lock l(mtx_); + + // block if this entry is empty + if (state_ == empty) { + threads::thread_state_ex_enum const reason = + cond_.wait_until(l, abs_time, + "future_data_base::wait_until", ec); + if (ec) return future_status::uninitialized; + + if (reason == threads::wait_timeout) + return future_status::timeout; + + return future_status::ready; + } + + if (&ec != &throws) + ec = make_success_code(); + + return future_status::ready; //-V110 + } }}}