Skip to content

Commit

Permalink
Fixed #1613: Dataflow causes stack overflow
Browse files Browse the repository at this point in the history
- Applying general fix to limit recursion depth while executing continuations
- Adding test triggering stack overflows (without this patch)
- Fly-by changes in future_data avoiding unneeded refcnt incref/decref
  • Loading branch information
hkaiser committed Jun 16, 2015
1 parent 3cd367f commit dddf8c5
Show file tree
Hide file tree
Showing 8 changed files with 132 additions and 7 deletions.
2 changes: 1 addition & 1 deletion hpx/config.hpp
Expand Up @@ -438,7 +438,7 @@
// This limits how deep the internal recursion of future continuations will go
// before a new operation is re-spawned.
#if !defined(HPX_CONTINUATION_MAX_RECURSION_DEPTH)
#define HPX_CONTINUATION_MAX_RECURSION_DEPTH 50
#define HPX_CONTINUATION_MAX_RECURSION_DEPTH 4
#endif

///////////////////////////////////////////////////////////////////////////////
Expand Down
63 changes: 58 additions & 5 deletions hpx/lcos/detail/future_data.hpp
Expand Up @@ -17,6 +17,7 @@
#include <hpx/util/move.hpp>
#include <hpx/util/unused.hpp>
#include <hpx/util/unique_function.hpp>
#include <hpx/util/deferred_call.hpp>
#include <hpx/util/detail/value_or_error.hpp>

#include <boost/intrusive_ptr.hpp>
Expand Down Expand Up @@ -143,6 +144,22 @@ namespace detail
return result_type(std::forward<F1>(f1), std::forward<F2>(f2));
}

///////////////////////////////////////////////////////////////////////////
struct handle_continuation_recursion_count
{
handle_continuation_recursion_count()
: count_(threads::get_continuation_recursion_count())
{
++count_;
}
~handle_continuation_recursion_count()
{
--count_;
}

std::size_t& count_;
};

///////////////////////////////////////////////////////////////////////////
template <typename Result>
struct future_data : future_data_refcnt_base
Expand Down Expand Up @@ -219,6 +236,17 @@ namespace detail
return data_;
}

// deferred execution of a given continuation
void run_on_completed(completed_callback_type const& on_completed)
{
try {
on_completed();
}
catch (hpx::exception const&) {
set_result(boost::current_exception());
}
}

/// Set the result of the requested action.
template <typename Target>
void set_result(Target && data, error_code& ec = throws)
Expand Down Expand Up @@ -249,7 +277,32 @@ namespace detail

// invoke the callback (continuation) function
if (on_completed)
{
handle_continuation_recursion_count cnt;

if (cnt.count_ > HPX_CONTINUATION_MAX_RECURSION_DEPTH)
{
// re-spawn continuation on a new thread
boost::intrusive_ptr<future_data> this_(this);

error_code ec;
threads::register_thread_nullary(
util::deferred_call(&future_data::run_on_completed,
std::move(this_), std::move(on_completed)),
"future_data<Result>::set_result", threads::pending,
true, threads::thread_priority_normal, std::size_t(-1),
threads::thread_stacksize_default, ec);

if (ec) {
// thread creation failed, report error to the future
this->set_exception(hpx::detail::access_exception(ec));
}
return;
}

// directly execute continuation on this thread
on_completed();
}
}

// helper functions for setting data (if successful) or the error (if
Expand All @@ -259,7 +312,7 @@ namespace detail
{
// set the received result, reset error status
try {
typedef typename util::decay<T>::type naked_type;
typedef typename util::decay<T>::type naked_type;

typedef traits::get_remote_result<
result_type, naked_type
Expand Down Expand Up @@ -431,7 +484,7 @@ namespace detail
error_code ec;
threads::thread_id_type id = threads::register_thread_nullary(
util::bind(util::one_shot(&timed_future_data::set_data),
this_, std::forward<Result_>(init)),
std::move(this_), std::forward<Result_>(init)),
"timed_future_data<Result>::timed_future_data",
threads::suspended, true, threads::thread_priority_normal,
std::size_t(-1), threads::thread_stacksize_default, ec);
Expand Down Expand Up @@ -571,20 +624,20 @@ namespace detail
hpx::threads::get_self_id());

if (sched_) {
sched_->add(util::bind(&task_base::run_impl, this_),
sched_->add(util::bind(&task_base::run_impl, std::move(this_)),
desc ? desc : "task_base::apply", threads::pending, false,
stacksize, ec);
}
else if (policy == launch::fork) {
threads::register_thread_plain(
util::bind(&task_base::run_impl, this_),
util::bind(&task_base::run_impl, std::move(this_)),
desc ? desc : "task_base::apply", threads::pending, false,
threads::thread_priority_boost, get_worker_thread_num(),
stacksize, ec);
}
else {
threads::register_thread_plain(
util::bind(&task_base::run_impl, this_),
util::bind(&task_base::run_impl, std::move(this_)),
desc ? desc : "task_base::apply", threads::pending, false,
priority, std::size_t(-1), stacksize, ec);
}
Expand Down
2 changes: 2 additions & 0 deletions hpx/runtime/threads/thread_helpers.hpp
Expand Up @@ -399,6 +399,8 @@ namespace hpx { namespace threads
std::size_t data, error_code& ec = throws);
#endif

HPX_API_EXPORT std::size_t& get_continuation_recursion_count();

/// Returns a non-null pointer to the executor which was used to create
/// the given thread.
///
Expand Down
10 changes: 9 additions & 1 deletion hpx/util/coroutine/detail/context_base.hpp
Expand Up @@ -115,7 +115,8 @@ namespace hpx { namespace util { namespace coroutines { namespace detail
m_thread_data(0),
#endif
m_type_info(),
m_thread_id(id)
m_thread_id(id),
continuation_recursion_count_(0)
{}

friend void intrusive_ptr_add_ref(type * ctx)
Expand Down Expand Up @@ -474,6 +475,11 @@ namespace hpx { namespace util { namespace coroutines { namespace detail
}
#endif

std::size_t& get_continuation_recursion_count()
{
return continuation_recursion_count_;
}

static boost::uint64_t get_allocation_count_all(bool reset)
{
boost::uint64_t count = 0;
Expand Down Expand Up @@ -614,6 +620,8 @@ namespace hpx { namespace util { namespace coroutines { namespace detail
// This is used to generate a meaningful exception trace.
boost::exception_ptr m_type_info;
thread_id_repr_type m_thread_id;

std::size_t continuation_recursion_count_;
};

// initialize static allocation counter
Expand Down
6 changes: 6 additions & 0 deletions hpx/util/coroutine/detail/self.hpp
Expand Up @@ -221,6 +221,12 @@ namespace hpx { namespace util { namespace coroutines { namespace detail
}
#endif

std::size_t& get_continuation_recursion_count()
{
HPX_ASSERT(m_pimpl);
return m_pimpl->get_continuation_recursion_count();
}

#if defined(HPX_HAVE_GENERIC_CONTEXT_COROUTINES)

yield_result_type yield_impl(
Expand Down
6 changes: 6 additions & 0 deletions src/runtime/threads/thread_helpers.cpp
Expand Up @@ -240,6 +240,12 @@ namespace hpx { namespace threads
}
#endif

////////////////////////////////////////////////////////////////////////////
std::size_t& get_continuation_recursion_count()
{
return get_self().get_continuation_recursion_count();
}

///////////////////////////////////////////////////////////////////////////
void run_thread_exit_callbacks(thread_id_type const& id, error_code& ec)
{
Expand Down
1 change: 1 addition & 0 deletions tests/regressions/lcos/CMakeLists.txt
Expand Up @@ -38,6 +38,7 @@ if(HPX_WITH_CXX11_LAMBDAS)
set(tests ${tests}
dataflow_const_functor_773
dataflow_future_swap2
dataflow_recursion_1613
dataflow_using_774
)
endif()
Expand Down
49 changes: 49 additions & 0 deletions tests/regressions/lcos/dataflow_recursion_1613.cpp
@@ -0,0 +1,49 @@
// Copyright (c) 2015 Hartmut Kaiser
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

// This test case demonstrates the issue described in #1613: Dataflow causes
// stack overflow

#include <hpx/hpx.hpp>
#include <hpx/hpx_main.hpp>
#include <hpx/lcos/future.hpp>
#include <hpx/util/lightweight_test.hpp>

#include <boost/atomic.hpp>

#define NUM_FUTURES std::size_t(1000)

int main()
{
hpx::lcos::local::promise<void> first_promise;

std::vector<hpx::shared_future<void> > results;
results.reserve(NUM_FUTURES+1);

boost::atomic<std::size_t> executed_dataflow(0);

results.push_back(first_promise.get_future());
for (std::size_t i = 0; i != NUM_FUTURES; ++i)
{
results.push_back(
hpx::lcos::local::dataflow(
hpx::launch::sync,
[&](hpx::shared_future<void> &&)
{
++executed_dataflow;
},
results.back()
)
);
}

// make futures ready in backwards sequence
hpx::apply([&first_promise]() { first_promise.set_value(); });

hpx::wait_all(results);
HPX_TEST_EQ(executed_dataflow.load(), NUM_FUTURES);

return hpx::util::report_errors();
}

0 comments on commit dddf8c5

Please sign in to comment.