Skip to content

Commit

Permalink
Merge pull request #1611 from STEllAR-GROUP/limit_dataflow_recursion
Browse files Browse the repository at this point in the history
Limit recursion-depth in dataflow to a configurable constant
  • Loading branch information
hkaiser committed Jun 24, 2015
2 parents d21c8d7 + 9f7bcf4 commit 9fa57f8
Show file tree
Hide file tree
Showing 17 changed files with 582 additions and 248 deletions.
7 changes: 7 additions & 0 deletions hpx/config.hpp
Expand Up @@ -434,6 +434,13 @@
# define HPX_HUGE_STACK_SIZE 0x2000000 // 32MByte
#endif

///////////////////////////////////////////////////////////////////////////////
// This limits how deep the internal recursion of future continuations will go
// before a new operation is re-spawned.
#if !defined(HPX_CONTINUATION_MAX_RECURSION_DEPTH)
#define HPX_CONTINUATION_MAX_RECURSION_DEPTH 20
#endif

///////////////////////////////////////////////////////////////////////////////
// Older Boost versions do not have BOOST_NOEXCEPT defined
#if !defined(BOOST_NOEXCEPT)
Expand Down
72 changes: 65 additions & 7 deletions hpx/lcos/detail/future_data.hpp
Expand Up @@ -17,6 +17,7 @@
#include <hpx/util/move.hpp>
#include <hpx/util/unused.hpp>
#include <hpx/util/unique_function.hpp>
#include <hpx/util/deferred_call.hpp>
#include <hpx/util/detail/value_or_error.hpp>

#include <boost/intrusive_ptr.hpp>
Expand Down Expand Up @@ -143,6 +144,22 @@ namespace detail
return result_type(std::forward<F1>(f1), std::forward<F2>(f2));
}

///////////////////////////////////////////////////////////////////////////
struct handle_continuation_recursion_count
{
handle_continuation_recursion_count()
: count_(threads::get_continuation_recursion_count())
{
++count_;
}
~handle_continuation_recursion_count()
{
--count_;
}

std::size_t& count_;
};

///////////////////////////////////////////////////////////////////////////
template <typename Result>
struct future_data : future_data_refcnt_base
Expand Down Expand Up @@ -219,6 +236,47 @@ namespace detail
return data_;
}

// deferred execution of a given continuation
void run_on_completed(completed_callback_type const& on_completed)
{
try {
on_completed();
}
catch (hpx::exception const&) {
set_result(boost::current_exception());
}
}

// make sure continuation invocation does not recurse deeper than
// allowed
void handle_on_completed(completed_callback_type && on_completed)
{
handle_continuation_recursion_count cnt;

if (cnt.count_ > HPX_CONTINUATION_MAX_RECURSION_DEPTH)
{
// re-spawn continuation on a new thread
boost::intrusive_ptr<future_data> this_(this);

error_code ec;
threads::register_thread_nullary(
util::deferred_call(&future_data::run_on_completed,
std::move(this_), std::move(on_completed)),
"future_data<Result>::set_result", threads::pending,
true, threads::thread_priority_normal, std::size_t(-1),
threads::thread_stacksize_default, ec);

if (ec) {
// thread creation failed, report error to the future
this->set_exception(hpx::detail::access_exception(ec));
}
return;
}

// directly execute continuation on this thread
on_completed();
}

/// Set the result of the requested action.
template <typename Target>
void set_result(Target && data, error_code& ec = throws)
Expand Down Expand Up @@ -249,7 +307,7 @@ namespace detail

// invoke the callback (continuation) function
if (on_completed)
on_completed();
handle_on_completed(std::move(on_completed));
}

// helper functions for setting data (if successful) or the error (if
Expand All @@ -259,7 +317,7 @@ namespace detail
{
// set the received result, reset error status
try {
typedef typename util::decay<T>::type naked_type;
typedef typename util::decay<T>::type naked_type;

typedef traits::get_remote_result<
result_type, naked_type
Expand Down Expand Up @@ -323,7 +381,7 @@ namespace detail
// invoke the callback (continuation) function right away
l.unlock();

data_sink();
handle_on_completed(std::move(data_sink));
}
else {
// store a combined callback wrapping the old and the new one
Expand Down Expand Up @@ -431,7 +489,7 @@ namespace detail
error_code ec;
threads::thread_id_type id = threads::register_thread_nullary(
util::bind(util::one_shot(&timed_future_data::set_data),
this_, std::forward<Result_>(init)),
std::move(this_), std::forward<Result_>(init)),
"timed_future_data<Result>::timed_future_data",
threads::suspended, true, threads::thread_priority_normal,
std::size_t(-1), threads::thread_stacksize_default, ec);
Expand Down Expand Up @@ -571,20 +629,20 @@ namespace detail
hpx::threads::get_self_id());

if (sched_) {
sched_->add(util::bind(&task_base::run_impl, this_),
sched_->add(util::bind(&task_base::run_impl, std::move(this_)),
desc ? desc : "task_base::apply", threads::pending, false,
stacksize, ec);
}
else if (policy == launch::fork) {
threads::register_thread_plain(
util::bind(&task_base::run_impl, this_),
util::bind(&task_base::run_impl, std::move(this_)),
desc ? desc : "task_base::apply", threads::pending, false,
threads::thread_priority_boost, get_worker_thread_num(),
stacksize, ec);
}
else {
threads::register_thread_plain(
util::bind(&task_base::run_impl, this_),
util::bind(&task_base::run_impl, std::move(this_)),
desc ? desc : "task_base::apply", threads::pending, false,
priority, std::size_t(-1), stacksize, ec);
}
Expand Down

0 comments on commit 9fa57f8

Please sign in to comment.