Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Making detail::condition_variable safe to use #1519

Merged
merged 3 commits into from May 21, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 10 additions & 14 deletions hpx/lcos/detail/future_data.hpp
Expand Up @@ -22,6 +22,7 @@
#include <boost/intrusive_ptr.hpp>
#include <boost/detail/atomic_count.hpp>
#include <boost/detail/scoped_enum_emulation.hpp>
#include <boost/thread/lock_types.hpp>

///////////////////////////////////////////////////////////////////////////////
namespace hpx { namespace lcos
Expand Down Expand Up @@ -224,7 +225,7 @@ namespace detail
{
completed_callback_type on_completed;
{
typename mutex_type::scoped_lock l(this->mtx_);
boost::unique_lock<mutex_type> l(this->mtx_);

// check whether the data already has been set
if (is_ready_locked()) {
Expand All @@ -243,12 +244,7 @@ namespace detail
state_ = full;

// handle all threads waiting for the block to become full
cond_.notify_all(l, ec);

// Note: cond_.notify_all() may cause this shared state to be
// destroyed. This is, however, safe as the scoped lock above
// will not touch the wrapped mutex if it was unlocked inside
// notify_all().
cond_.notify_all(std::move(l), ec);
}

// invoke the callback (continuation) function
Expand Down Expand Up @@ -301,7 +297,7 @@ namespace detail
/// operation. Allows any subsequent set_data operation to succeed.
void reset(error_code& /*ec*/ = throws)
{
typename mutex_type::scoped_lock l(this->mtx_);
boost::unique_lock<mutex_type> l(this->mtx_);
state_ = empty;

// release any stored data and callback functions
Expand All @@ -318,7 +314,7 @@ namespace detail
{
if (!data_sink) return;

typename mutex_type::scoped_lock l(this->mtx_);
boost::unique_lock<mutex_type> l(this->mtx_);

if (is_ready_locked()) {
// invoke the callback (continuation) function right away
Expand All @@ -337,7 +333,7 @@ namespace detail

virtual void wait(error_code& ec = throws)
{
typename mutex_type::scoped_lock l(mtx_);
boost::unique_lock<mutex_type> l(mtx_);

// block if this entry is empty
if (state_ == empty) {
Expand All @@ -355,7 +351,7 @@ namespace detail
wait_until(boost::chrono::steady_clock::time_point const& abs_time,
error_code& ec = throws)
{
typename mutex_type::scoped_lock l(mtx_);
boost::unique_lock<mutex_type> l(mtx_);

// block if this entry is empty
if (state_ == empty) {
Expand All @@ -380,7 +376,7 @@ namespace detail
/// \a future.
bool is_ready() const
{
typename mutex_type::scoped_lock l(mtx_);
boost::unique_lock<mutex_type> l(mtx_);
return is_ready_locked();
}

Expand All @@ -391,13 +387,13 @@ namespace detail

bool has_value() const
{
typename mutex_type::scoped_lock l(mtx_);
boost::unique_lock<mutex_type> l(mtx_);
return state_ != empty && data_.stores_value();
}

bool has_exception() const
{
typename mutex_type::scoped_lock l(mtx_);
boost::unique_lock<mutex_type> l(mtx_);
return state_ != empty && data_.stores_error();
}

Expand Down
6 changes: 4 additions & 2 deletions hpx/lcos/local/barrier.hpp
Expand Up @@ -9,6 +9,8 @@
#include <hpx/lcos/local/detail/condition_variable.hpp>
#include <hpx/lcos/local/spinlock.hpp>

#include <boost/thread/lock_types.hpp>

///////////////////////////////////////////////////////////////////////////////
namespace hpx { namespace lcos { namespace local
{
Expand Down Expand Up @@ -36,13 +38,13 @@ namespace hpx { namespace lcos { namespace local
/// entered this function.
void wait()
{
mutex_type::scoped_lock l(mtx_);
boost::unique_lock<mutex_type> l(mtx_);
if (cond_.size(l) < number_of_threads_-1) {
cond_.wait(l, "barrier::wait");
}
else {
// release the threads
cond_.notify_all(l);
cond_.notify_all(std::move(l));
}
}

Expand Down
13 changes: 7 additions & 6 deletions hpx/lcos/local/condition_variable.hpp
Expand Up @@ -14,6 +14,7 @@
#include <hpx/util/date_time_chrono.hpp>

#include <boost/detail/scoped_enum_emulation.hpp>
#include <boost/thread/lock_types.hpp>

///////////////////////////////////////////////////////////////////////////////
namespace hpx { namespace lcos { namespace local
Expand All @@ -32,21 +33,21 @@ namespace hpx { namespace lcos { namespace local
public:
void notify_one(error_code& ec = throws)
{
mutex_type::scoped_lock l(mtx_);
cond_.notify_one(l, ec);
boost::unique_lock<mutex_type> l(mtx_);
cond_.notify_one(std::move(l), ec);
}

void notify_all(error_code& ec = throws)
{
mutex_type::scoped_lock l(mtx_);
cond_.notify_all(l, ec);
boost::unique_lock<mutex_type> l(mtx_);
cond_.notify_all(std::move(l), ec);
}

template <class Lock>
void wait(Lock& lock, error_code& ec = throws)
{
util::ignore_all_while_checking ignore_lock;
mutex_type::scoped_lock l(mtx_);
boost::unique_lock<mutex_type> l(mtx_);
util::scoped_unlock<Lock> unlock(lock);

cond_.wait(l, ec);
Expand All @@ -69,7 +70,7 @@ namespace hpx { namespace lcos { namespace local
error_code& ec = throws)
{
util::ignore_all_while_checking ignore_lock;
mutex_type::scoped_lock l(mtx_);
boost::unique_lock<mutex_type> l(mtx_);
util::scoped_unlock<Lock> unlock(lock);

threads::thread_state_ex_enum const reason =
Expand Down
29 changes: 18 additions & 11 deletions hpx/lcos/local/counting_semaphore.hpp
Expand Up @@ -13,6 +13,7 @@
#include <hpx/util/assert.hpp>

#include <boost/cstdint.hpp>
#include <boost/thread/lock_types.hpp>

#if defined(BOOST_MSVC)
#pragma warning(push)
Expand Down Expand Up @@ -67,7 +68,7 @@ namespace hpx { namespace lcos { namespace local
/// yielded.
void wait(boost::int64_t count = 1)
{
typename mutex_type::scoped_lock l(mtx_);
boost::unique_lock<mutex_type> l(mtx_);
wait_locked(count, l);
}

Expand All @@ -84,7 +85,7 @@ namespace hpx { namespace lcos { namespace local
/// are available at this point in time.
bool try_wait(boost::int64_t count = 1)
{
typename mutex_type::scoped_lock l(mtx_);
boost::unique_lock<mutex_type> l(mtx_);
if (!(value_ < count)) {
// enter wait_locked only if there are sufficient credits
// available
Expand All @@ -99,21 +100,23 @@ namespace hpx { namespace lcos { namespace local
///
void signal(boost::int64_t count = 1)
{
typename mutex_type::scoped_lock l(mtx_);
signal_locked(count, l);
boost::unique_lock<mutex_type> l(mtx_);
signal_locked(count, std::move(l));
}

boost::int64_t signal_all()
{
typename mutex_type::scoped_lock l(mtx_);
boost::unique_lock<mutex_type> l(mtx_);
boost::int64_t count = static_cast<boost::int64_t>(cond_.size(l));
signal_locked(count, l);
signal_locked(count, std::move(l));
return count;
}

template <typename Lock>
void wait_locked(boost::int64_t count, Lock& l)
void wait_locked(boost::int64_t count,
boost::unique_lock<mutex_type>& l)
{
HPX_ASSERT(l.owns_lock());

while (value_ < count)
{
cond_.wait(l, "counting_semaphore::wait_locked");
Expand All @@ -122,17 +125,21 @@ namespace hpx { namespace lcos { namespace local
}

private:
template <typename Lock>
void signal_locked(boost::int64_t count, Lock& l)
void signal_locked(boost::int64_t count,
boost::unique_lock<mutex_type> l)
{
HPX_ASSERT(l.owns_lock());

mutex_type* mtx = l.mutex();
// release no more threads than we get resources
value_ += count;
for (boost::int64_t i = 0; value_ >= 0 && i < count; ++i)
{
// notify_one() returns false if no more threads are
// waiting
if (!cond_.notify_one(l))
if (!cond_.notify_one(std::move(l)))
break;
l = boost::unique_lock<mutex_type>(*mtx);
}
}

Expand Down