Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit recursion-depth in dataflow to a configurable constant #1611

Merged
merged 8 commits into from Jun 24, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 7 additions & 0 deletions hpx/config.hpp
Expand Up @@ -434,6 +434,13 @@
# define HPX_HUGE_STACK_SIZE 0x2000000 // 32MByte
#endif

///////////////////////////////////////////////////////////////////////////////
// This limits how deep the internal recursion of future continuations will go
// before a new operation is re-spawned.
#if !defined(HPX_CONTINUATION_MAX_RECURSION_DEPTH)
#define HPX_CONTINUATION_MAX_RECURSION_DEPTH 20
#endif

///////////////////////////////////////////////////////////////////////////////
// Older Boost versions do not have BOOST_NOEXCEPT defined
#if !defined(BOOST_NOEXCEPT)
Expand Down
72 changes: 65 additions & 7 deletions hpx/lcos/detail/future_data.hpp
Expand Up @@ -17,6 +17,7 @@
#include <hpx/util/move.hpp>
#include <hpx/util/unused.hpp>
#include <hpx/util/unique_function.hpp>
#include <hpx/util/deferred_call.hpp>
#include <hpx/util/detail/value_or_error.hpp>

#include <boost/intrusive_ptr.hpp>
Expand Down Expand Up @@ -143,6 +144,22 @@ namespace detail
return result_type(std::forward<F1>(f1), std::forward<F2>(f2));
}

///////////////////////////////////////////////////////////////////////////
struct handle_continuation_recursion_count
{
handle_continuation_recursion_count()
: count_(threads::get_continuation_recursion_count())
{
++count_;
}
~handle_continuation_recursion_count()
{
--count_;
}

std::size_t& count_;
};

///////////////////////////////////////////////////////////////////////////
template <typename Result>
struct future_data : future_data_refcnt_base
Expand Down Expand Up @@ -219,6 +236,47 @@ namespace detail
return data_;
}

// deferred execution of a given continuation
void run_on_completed(completed_callback_type const& on_completed)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to take the callback by value or rvalue reference here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What benefit would that give us?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably keeping constness/non-constness of the passed function. Other than that probably non.

{
try {
on_completed();
}
catch (hpx::exception const&) {
set_result(boost::current_exception());
}
}

// make sure continuation invocation does not recurse deeper than
// allowed
void handle_on_completed(completed_callback_type && on_completed)
{
handle_continuation_recursion_count cnt;

if (cnt.count_ > HPX_CONTINUATION_MAX_RECURSION_DEPTH)
{
// re-spawn continuation on a new thread
boost::intrusive_ptr<future_data> this_(this);

error_code ec;
threads::register_thread_nullary(
util::deferred_call(&future_data::run_on_completed,
std::move(this_), std::move(on_completed)),
"future_data<Result>::set_result", threads::pending,
true, threads::thread_priority_normal, std::size_t(-1),
threads::thread_stacksize_default, ec);

if (ec) {
// thread creation failed, report error to the future
this->set_exception(hpx::detail::access_exception(ec));
}
return;
}

// directly execute continuation on this thread
on_completed();
}

/// Set the result of the requested action.
template <typename Target>
void set_result(Target && data, error_code& ec = throws)
Expand Down Expand Up @@ -249,7 +307,7 @@ namespace detail

// invoke the callback (continuation) function
if (on_completed)
on_completed();
handle_on_completed(std::move(on_completed));
}

// helper functions for setting data (if successful) or the error (if
Expand All @@ -259,7 +317,7 @@ namespace detail
{
// set the received result, reset error status
try {
typedef typename util::decay<T>::type naked_type;
typedef typename util::decay<T>::type naked_type;

typedef traits::get_remote_result<
result_type, naked_type
Expand Down Expand Up @@ -323,7 +381,7 @@ namespace detail
// invoke the callback (continuation) function right away
l.unlock();

data_sink();
handle_on_completed(std::move(data_sink));
}
else {
// store a combined callback wrapping the old and the new one
Expand Down Expand Up @@ -431,7 +489,7 @@ namespace detail
error_code ec;
threads::thread_id_type id = threads::register_thread_nullary(
util::bind(util::one_shot(&timed_future_data::set_data),
this_, std::forward<Result_>(init)),
std::move(this_), std::forward<Result_>(init)),
"timed_future_data<Result>::timed_future_data",
threads::suspended, true, threads::thread_priority_normal,
std::size_t(-1), threads::thread_stacksize_default, ec);
Expand Down Expand Up @@ -571,20 +629,20 @@ namespace detail
hpx::threads::get_self_id());

if (sched_) {
sched_->add(util::bind(&task_base::run_impl, this_),
sched_->add(util::bind(&task_base::run_impl, std::move(this_)),
desc ? desc : "task_base::apply", threads::pending, false,
stacksize, ec);
}
else if (policy == launch::fork) {
threads::register_thread_plain(
util::bind(&task_base::run_impl, this_),
util::bind(&task_base::run_impl, std::move(this_)),
desc ? desc : "task_base::apply", threads::pending, false,
threads::thread_priority_boost, get_worker_thread_num(),
stacksize, ec);
}
else {
threads::register_thread_plain(
util::bind(&task_base::run_impl, this_),
util::bind(&task_base::run_impl, std::move(this_)),
desc ? desc : "task_base::apply", threads::pending, false,
priority, std::size_t(-1), stacksize, ec);
}
Expand Down