Skip to content

Commit

Permalink
Merge #6258
Browse files Browse the repository at this point in the history
6258: Rewriting wait_some to circumvent data races causing hangs r=hkaiser a=hkaiser

- flyby: fixing possible race in latch

`@m-diers` Could you please try this patch to see if it fixes your issue?

Co-authored-by: Hartmut Kaiser <hartmut.kaiser@gmail.com>
  • Loading branch information
StellarBot and hkaiser committed May 26, 2023
2 parents 691dfc5 + 973cb1b commit 5b092fe
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 64 deletions.
2 changes: 1 addition & 1 deletion libs/core/async_combinators/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2019-2022 The STE||AR-Group
# Copyright (c) 2019-2023 The STE||AR-Group
#
# SPDX-License-Identifier: BSL-1.0
# Distributed under the Boost Software License, Version 1.0. (See accompanying
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,24 +144,21 @@ namespace hpx {
#include <hpx/assert.hpp>
#include <hpx/async_combinators/detail/throw_if_exceptional.hpp>
#include <hpx/datastructures/tuple.hpp>
#include <hpx/functional/deferred_call.hpp>
#include <hpx/functional/tag_invoke.hpp>
#include <hpx/futures/future.hpp>
#include <hpx/futures/traits/acquire_shared_state.hpp>
#include <hpx/futures/traits/detail/future_traits.hpp>
#include <hpx/futures/traits/future_access.hpp>
#include <hpx/futures/traits/is_future.hpp>
#include <hpx/iterator_support/traits/is_iterator.hpp>
#include <hpx/modules/errors.hpp>
#include <hpx/preprocessor/strip_parens.hpp>
#include <hpx/modules/memory.hpp>
#include <hpx/thread_support/atomic_count.hpp>
#include <hpx/type_support/pack.hpp>

#include <algorithm>
#include <array>
#include <atomic>
#include <cstddef>
#include <iterator>
#include <memory>
#include <type_traits>
#include <utility>
#include <vector>
Expand All @@ -187,50 +184,42 @@ namespace hpx {
template <typename SharedState>
void operator()(SharedState const& shared_state) const
{
hpx::intrusive_ptr<wait_some<Sequence>> this_ = &wait_;
if constexpr (!traits::is_shared_state_v<SharedState>)
{
apply(shared_state);
}
else
{
std::size_t counter =
wait_.count_.load(std::memory_order_acquire);
wait_.count_.load(std::memory_order_relaxed);

if (counter < wait_.needed_count_ && shared_state)
{
if (!shared_state->is_ready(std::memory_order_relaxed))
{
// handle future only if not enough futures are
// ready yet also, do not touch any futures which
// are already ready
// ready yet also, do not touch any futures that are
// already ready
shared_state->execute_deferred();

// execute_deferred might have made the future ready
if (!shared_state->is_ready(
std::memory_order_relaxed))
{
auto state = shared_state;
shared_state->set_on_completed(
util::deferred_call(
&wait_some<Sequence>::on_future_ready,
wait_.shared_from_this(),
hpx::execution_base::this_thread::
agent()));
[this_ = HPX_MOVE(this_),
state = HPX_MOVE(state)] {
this_->on_future_ready(
state->has_exception());
});
return;
}
}

// check whether the current future is exceptional
if (!wait_.has_exceptional_results_ &&
shared_state->has_exception())
{
wait_.has_exceptional_results_ = true;
}
}

if (wait_.count_.fetch_add(1) + 1 == wait_.needed_count_)
{
wait_.goal_reached_on_calling_thread_ = true;
}
wait_.on_future_ready(shared_state->has_exception());
}
}

Expand Down Expand Up @@ -263,24 +252,25 @@ namespace hpx {
callback.apply(wait.values_);
}

///////////////////////////////////////////////////////////////////////
template <typename Sequence>
struct wait_some
: std::enable_shared_from_this<wait_some<Sequence>> //-V690
{
public:
void on_future_ready(hpx::execution_base::agent_ref ctx)
void on_future_ready(bool has_exception)
{
std::unique_lock l(mtx_.data_);

// check whether the current future is exceptional
if (has_exception)
{
has_exceptional_results_ = true;
}

if (count_.fetch_add(1) + 1 == needed_count_)
{
// reactivate waiting thread only if it's not us
if (ctx != hpx::execution_base::this_thread::agent())
{
ctx.resume();
}
else
{
goal_reached_on_calling_thread_ = true;
}
HPX_ASSERT_LOCKED(l, !notified_);
notified_ = true;
cond_.data_.notify_all(HPX_MOVE(l));
}
}

Expand All @@ -292,28 +282,35 @@ namespace hpx {
wait_some& operator=(wait_some&&) = delete;

public:
using argument_type = Sequence;

wait_some(argument_type const& values, std::size_t n) noexcept
: values_(values)
template <typename S>
wait_some(S&& values, std::size_t n) noexcept
: values_(HPX_FORWARD(S, values))
, count_(0)
, needed_count_(n)
, refcount_(1)
{
HPX_ASSERT_MSG(n > 0,
"wait_some should have to wait for at least one future to "
"become ready");
}

~wait_some() = default;

bool operator()()
{
// set callback functions to be executed when a future becomes ready
set_on_completed_callback(*this);

// if all of the requested futures are already set, our
// callback above has already been called often enough, otherwise
// we suspend ourselves
if (!goal_reached_on_calling_thread_)
// if all of the requested futures are already set, our callback
// above has already been called often enough, otherwise we
// suspend ourselves
{
// wait until the requested number of futures has become ready
hpx::execution_base::this_thread::suspend(
"hpx::detail::wait_some::operator()");
std::unique_lock l(mtx_.data_);
if (count_.load(std::memory_order_acquire) < needed_count_)
{
HPX_ASSERT_LOCKED(l, !notified_);
cond_.data_.wait(l, "hpx::wait_some::operator()()");
}
}

// at least N futures should be ready
Expand All @@ -323,17 +320,44 @@ namespace hpx {
return has_exceptional_results_;
}

argument_type const& values_;
using argument_type =
std::conditional_t<std::is_reference_v<Sequence>, Sequence,
std::decay_t<Sequence>>;

argument_type values_;
std::atomic<std::size_t> count_;
std::size_t const needed_count_;
bool goal_reached_on_calling_thread_ = false;
bool has_exceptional_results_ = false;
bool notified_ = false;

mutable util::cache_line_data<hpx::spinlock> mtx_;
mutable util::cache_line_data<
hpx::lcos::local::detail::condition_variable>
cond_;

private:
friend void intrusive_ptr_add_ref(wait_some* p) noexcept
{
++p->refcount_;
}

friend void intrusive_ptr_release(wait_some* p) noexcept
{
if (0 == --p->refcount_)
{
delete p;
}
}

hpx::util::atomic_count refcount_;
};

template <typename T>
auto get_wait_some_frame(T const& values, std::size_t n)
template <typename Sequence>
auto get_wait_some_frame(Sequence&& values, std::size_t n)
{
return std::make_shared<hpx::detail::wait_some<T>>(values, n);
return hpx::intrusive_ptr<wait_some<Sequence>>(
new wait_some<Sequence>(HPX_FORWARD(Sequence, values), n),
false);
}
} // namespace detail

Expand All @@ -358,11 +382,10 @@ namespace hpx {
{
HPX_THROW_EXCEPTION(hpx::error::bad_parameter, "hpx::wait_some",
"number of results to wait for is out of bounds");
return false;
}

auto lazy_values = traits::acquire_shared_state_disp()(values);
auto f = detail::get_wait_some_frame(lazy_values, n);
auto f = detail::get_wait_some_frame(HPX_MOVE(lazy_values), n);

return (*f)();
}
Expand Down Expand Up @@ -406,11 +429,10 @@ namespace hpx {
{
HPX_THROW_EXCEPTION(hpx::error::bad_parameter, "hpx::wait_some",
"number of results to wait for is out of bounds");
return false;
}

auto lazy_values = traits::acquire_shared_state_disp()(values);
auto f = detail::get_wait_some_frame(lazy_values, n);
auto f = detail::get_wait_some_frame(HPX_MOVE(lazy_values), n);

return (*f)();
}
Expand Down Expand Up @@ -445,7 +467,7 @@ namespace hpx {
wait_some_nothrow_t, std::size_t n, Iterator begin, Iterator end)
{
auto values = traits::acquire_shared_state<Iterator>()(begin, end);
auto f = detail::get_wait_some_frame(values, n);
auto f = detail::get_wait_some_frame(HPX_MOVE(values), n);

return (*f)();
}
Expand All @@ -469,7 +491,6 @@ namespace hpx {
{
HPX_THROW_EXCEPTION(hpx::error::bad_parameter, "hpx::wait_some",
"number of results to wait for is out of bounds");
return false;
}

f.wait();
Expand All @@ -484,7 +505,6 @@ namespace hpx {
{
HPX_THROW_EXCEPTION(hpx::error::bad_parameter, "hpx::wait_some",
"number of results to wait for is out of bounds");
return false;
}

f.wait();
Expand All @@ -504,14 +524,13 @@ namespace hpx {
HPX_THROW_EXCEPTION(hpx::error::bad_parameter,
"hpx::lcos::wait_some",
"number of results to wait for is out of bounds");
return false;
}

using result_type =
hpx::tuple<traits::detail::shared_state_ptr_for_t<Ts>...>;

result_type values(traits::detail::get_shared_state(ts)...);
auto f = detail::get_wait_some_frame(values, n);
auto f = detail::get_wait_some_frame(HPX_MOVE(values), n);

return (*f)();
}
Expand Down Expand Up @@ -652,7 +671,7 @@ namespace hpx {
{
auto values =
traits::acquire_shared_state<Iterator>()(begin, count);
auto f = detail::get_wait_some_frame(values, n);
auto f = detail::get_wait_some_frame(HPX_MOVE(values), n);

return (*f)();
}
Expand Down Expand Up @@ -758,7 +777,7 @@ namespace hpx::lcos {
Iterator wait_some_n(
std::size_t n, Iterator begin, std::size_t count, error_code& = throws)
{
hpx::wait_some(n, begin, count);
return hpx::wait_some(n, begin, count);
}

HPX_DEPRECATED_V(
Expand Down

0 comments on commit 5b092fe

Please sign in to comment.