diff --git a/hpx/config.hpp b/hpx/config.hpp index 5f4cd0be3bc4..e6c08d9d07ff 100644 --- a/hpx/config.hpp +++ b/hpx/config.hpp @@ -438,7 +438,7 @@ // This limits how deep the internal recursion of future continuations will go // before a new operation is re-spawned. #if !defined(HPX_CONTINUATION_MAX_RECURSION_DEPTH) -#define HPX_CONTINUATION_MAX_RECURSION_DEPTH 50 +#define HPX_CONTINUATION_MAX_RECURSION_DEPTH 12 #endif /////////////////////////////////////////////////////////////////////////////// diff --git a/hpx/lcos/detail/future_data.hpp b/hpx/lcos/detail/future_data.hpp index 3b3b06a89eb6..edf63f5c9add 100644 --- a/hpx/lcos/detail/future_data.hpp +++ b/hpx/lcos/detail/future_data.hpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include @@ -143,6 +144,22 @@ namespace detail return result_type(std::forward(f1), std::forward(f2)); } + /////////////////////////////////////////////////////////////////////////// + struct handle_continuation_recursion_count + { + handle_continuation_recursion_count() + : count_(threads::get_continuation_recursion_count()) + { + ++count_; + } + ~handle_continuation_recursion_count() + { + --count_; + } + + std::size_t& count_; + }; + /////////////////////////////////////////////////////////////////////////// template struct future_data : future_data_refcnt_base @@ -219,6 +236,17 @@ namespace detail return data_; } + // deferred execution of a given continuation + void run_on_completed(completed_callback_type const& on_completed) + { + try { + on_completed(); + } + catch (hpx::exception const&) { + set_result(boost::current_exception()); + } + } + /// Set the result of the requested action. template void set_result(Target && data, error_code& ec = throws) @@ -249,7 +277,32 @@ namespace detail // invoke the callback (continuation) function if (on_completed) + { + handle_continuation_recursion_count cnt; + + if (cnt.count_ > HPX_CONTINUATION_MAX_RECURSION_DEPTH) + { + // re-spawn continuation on a new thread + boost::intrusive_ptr this_(this); + + error_code ec; + threads::register_thread_nullary( + util::deferred_call(&future_data::run_on_completed, + std::move(this_), std::move(on_completed)), + "future_data::set_result", threads::pending, + true, threads::thread_priority_normal, std::size_t(-1), + threads::thread_stacksize_default, ec); + + if (ec) { + // thread creation failed, report error to the future + this->set_exception(hpx::detail::access_exception(ec)); + } + return; + } + + // directly execute continuation on this thread on_completed(); + } } // helper functions for setting data (if successful) or the error (if @@ -259,7 +312,7 @@ namespace detail { // set the received result, reset error status try { - typedef typename util::decay::type naked_type; + typedef typename util::decay::type naked_type; typedef traits::get_remote_result< result_type, naked_type @@ -431,7 +484,7 @@ namespace detail error_code ec; threads::thread_id_type id = threads::register_thread_nullary( util::bind(util::one_shot(&timed_future_data::set_data), - this_, std::forward(init)), + std::move(this_), std::forward(init)), "timed_future_data::timed_future_data", threads::suspended, true, threads::thread_priority_normal, std::size_t(-1), threads::thread_stacksize_default, ec); @@ -571,20 +624,20 @@ namespace detail hpx::threads::get_self_id()); if (sched_) { - sched_->add(util::bind(&task_base::run_impl, this_), + sched_->add(util::bind(&task_base::run_impl, std::move(this_)), desc ? desc : "task_base::apply", threads::pending, false, stacksize, ec); } else if (policy == launch::fork) { threads::register_thread_plain( - util::bind(&task_base::run_impl, this_), + util::bind(&task_base::run_impl, std::move(this_)), desc ? desc : "task_base::apply", threads::pending, false, threads::thread_priority_boost, get_worker_thread_num(), stacksize, ec); } else { threads::register_thread_plain( - util::bind(&task_base::run_impl, this_), + util::bind(&task_base::run_impl, std::move(this_)), desc ? desc : "task_base::apply", threads::pending, false, priority, std::size_t(-1), stacksize, ec); } diff --git a/hpx/runtime/threads/thread_helpers.hpp b/hpx/runtime/threads/thread_helpers.hpp index 2259f55e3311..2e3a1345eb26 100644 --- a/hpx/runtime/threads/thread_helpers.hpp +++ b/hpx/runtime/threads/thread_helpers.hpp @@ -399,6 +399,8 @@ namespace hpx { namespace threads std::size_t data, error_code& ec = throws); #endif + HPX_API_EXPORT std::size_t& get_continuation_recursion_count(); + /// Returns a non-null pointer to the executor which was used to create /// the given thread. /// diff --git a/hpx/util/coroutine/detail/context_base.hpp b/hpx/util/coroutine/detail/context_base.hpp index 31c926e8b4bd..1f53b4f46d31 100644 --- a/hpx/util/coroutine/detail/context_base.hpp +++ b/hpx/util/coroutine/detail/context_base.hpp @@ -115,7 +115,8 @@ namespace hpx { namespace util { namespace coroutines { namespace detail m_thread_data(0), #endif m_type_info(), - m_thread_id(id) + m_thread_id(id), + continuation_recursion_count_(0) {} friend void intrusive_ptr_add_ref(type * ctx) @@ -474,6 +475,11 @@ namespace hpx { namespace util { namespace coroutines { namespace detail } #endif + std::size_t& get_continuation_recursion_count() + { + return continuation_recursion_count_; + } + static boost::uint64_t get_allocation_count_all(bool reset) { boost::uint64_t count = 0; @@ -614,6 +620,8 @@ namespace hpx { namespace util { namespace coroutines { namespace detail // This is used to generate a meaningful exception trace. boost::exception_ptr m_type_info; thread_id_repr_type m_thread_id; + + std::size_t continuation_recursion_count_; }; // initialize static allocation counter diff --git a/hpx/util/coroutine/detail/self.hpp b/hpx/util/coroutine/detail/self.hpp index c8c5c9d63ed6..279a33bcb861 100644 --- a/hpx/util/coroutine/detail/self.hpp +++ b/hpx/util/coroutine/detail/self.hpp @@ -221,6 +221,12 @@ namespace hpx { namespace util { namespace coroutines { namespace detail } #endif + std::size_t& get_continuation_recursion_count() + { + HPX_ASSERT(m_pimpl); + return m_pimpl->get_continuation_recursion_count(); + } + #if defined(HPX_HAVE_GENERIC_CONTEXT_COROUTINES) yield_result_type yield_impl( diff --git a/src/runtime/threads/thread_helpers.cpp b/src/runtime/threads/thread_helpers.cpp index 7c0617880499..73633720173b 100644 --- a/src/runtime/threads/thread_helpers.cpp +++ b/src/runtime/threads/thread_helpers.cpp @@ -240,6 +240,12 @@ namespace hpx { namespace threads } #endif + //////////////////////////////////////////////////////////////////////////// + std::size_t& get_continuation_recursion_count() + { + return get_self().get_continuation_recursion_count(); + } + /////////////////////////////////////////////////////////////////////////// void run_thread_exit_callbacks(thread_id_type const& id, error_code& ec) { diff --git a/tests/regressions/lcos/CMakeLists.txt b/tests/regressions/lcos/CMakeLists.txt index 726293c5b2fc..ac1d843eb36f 100644 --- a/tests/regressions/lcos/CMakeLists.txt +++ b/tests/regressions/lcos/CMakeLists.txt @@ -38,6 +38,7 @@ if(HPX_WITH_CXX11_LAMBDAS) set(tests ${tests} dataflow_const_functor_773 dataflow_future_swap2 + dataflow_recursion_1613 dataflow_using_774 ) endif() diff --git a/tests/regressions/lcos/dataflow_recursion_1613.cpp b/tests/regressions/lcos/dataflow_recursion_1613.cpp new file mode 100644 index 000000000000..8ef2f784d526 --- /dev/null +++ b/tests/regressions/lcos/dataflow_recursion_1613.cpp @@ -0,0 +1,49 @@ +// Copyright (c) 2015 Hartmut Kaiser +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +// This test case demonstrates the issue described in #1613: Dataflow causes +// stack overflow + +#include +#include +#include +#include + +#include + +#define NUM_FUTURES std::size_t(1000) + +int main() +{ + hpx::lcos::local::promise first_promise; + + std::vector > results; + results.reserve(NUM_FUTURES+1); + + boost::atomic executed_dataflow(0); + + results.push_back(first_promise.get_future()); + for (std::size_t i = 0; i != NUM_FUTURES; ++i) + { + results.push_back( + hpx::lcos::local::dataflow( + hpx::launch::sync, + [&](hpx::shared_future &&) + { + ++executed_dataflow; + }, + results.back() + ) + ); + } + + // make futures ready in backwards sequence + hpx::apply([&first_promise]() { first_promise.set_value(); }); + + hpx::wait_all(results); + HPX_TEST_EQ(executed_dataflow.load(), NUM_FUTURES); + + return hpx::util::report_errors(); +}