Skip to content

Commit

Permalink
Merge pull request #1519 from STEllAR-GROUP/cv-locks
Browse files Browse the repository at this point in the history
Making detail::condition_variable safe to use
  • Loading branch information
K-ballo committed May 21, 2015
2 parents c48f107 + 5a7d4f6 commit 289f98a
Show file tree
Hide file tree
Showing 12 changed files with 143 additions and 140 deletions.
24 changes: 10 additions & 14 deletions hpx/lcos/detail/future_data.hpp
Expand Up @@ -22,6 +22,7 @@
#include <boost/intrusive_ptr.hpp>
#include <boost/detail/atomic_count.hpp>
#include <boost/detail/scoped_enum_emulation.hpp>
#include <boost/thread/lock_types.hpp>

///////////////////////////////////////////////////////////////////////////////
namespace hpx { namespace lcos
Expand Down Expand Up @@ -224,7 +225,7 @@ namespace detail
{
completed_callback_type on_completed;
{
typename mutex_type::scoped_lock l(this->mtx_);
boost::unique_lock<mutex_type> l(this->mtx_);

// check whether the data already has been set
if (is_ready_locked()) {
Expand All @@ -243,12 +244,7 @@ namespace detail
state_ = full;

// handle all threads waiting for the block to become full
cond_.notify_all(l, ec);

// Note: cond_.notify_all() may cause this shared state to be
// destroyed. This is, however, safe as the scoped lock above
// will not touch the wrapped mutex if it was unlocked inside
// notify_all().
cond_.notify_all(std::move(l), ec);
}

// invoke the callback (continuation) function
Expand Down Expand Up @@ -301,7 +297,7 @@ namespace detail
/// operation. Allows any subsequent set_data operation to succeed.
void reset(error_code& /*ec*/ = throws)
{
typename mutex_type::scoped_lock l(this->mtx_);
boost::unique_lock<mutex_type> l(this->mtx_);
state_ = empty;

// release any stored data and callback functions
Expand All @@ -318,7 +314,7 @@ namespace detail
{
if (!data_sink) return;

typename mutex_type::scoped_lock l(this->mtx_);
boost::unique_lock<mutex_type> l(this->mtx_);

if (is_ready_locked()) {
// invoke the callback (continuation) function right away
Expand All @@ -337,7 +333,7 @@ namespace detail

virtual void wait(error_code& ec = throws)
{
typename mutex_type::scoped_lock l(mtx_);
boost::unique_lock<mutex_type> l(mtx_);

// block if this entry is empty
if (state_ == empty) {
Expand All @@ -355,7 +351,7 @@ namespace detail
wait_until(boost::chrono::steady_clock::time_point const& abs_time,
error_code& ec = throws)
{
typename mutex_type::scoped_lock l(mtx_);
boost::unique_lock<mutex_type> l(mtx_);

// block if this entry is empty
if (state_ == empty) {
Expand All @@ -380,7 +376,7 @@ namespace detail
/// \a future.
bool is_ready() const
{
typename mutex_type::scoped_lock l(mtx_);
boost::unique_lock<mutex_type> l(mtx_);
return is_ready_locked();
}

Expand All @@ -391,13 +387,13 @@ namespace detail

bool has_value() const
{
typename mutex_type::scoped_lock l(mtx_);
boost::unique_lock<mutex_type> l(mtx_);
return state_ != empty && data_.stores_value();
}

bool has_exception() const
{
typename mutex_type::scoped_lock l(mtx_);
boost::unique_lock<mutex_type> l(mtx_);
return state_ != empty && data_.stores_error();
}

Expand Down
6 changes: 4 additions & 2 deletions hpx/lcos/local/barrier.hpp
Expand Up @@ -9,6 +9,8 @@
#include <hpx/lcos/local/detail/condition_variable.hpp>
#include <hpx/lcos/local/spinlock.hpp>

#include <boost/thread/lock_types.hpp>

///////////////////////////////////////////////////////////////////////////////
namespace hpx { namespace lcos { namespace local
{
Expand Down Expand Up @@ -36,13 +38,13 @@ namespace hpx { namespace lcos { namespace local
/// entered this function.
void wait()
{
mutex_type::scoped_lock l(mtx_);
boost::unique_lock<mutex_type> l(mtx_);
if (cond_.size(l) < number_of_threads_-1) {
cond_.wait(l, "barrier::wait");
}
else {
// release the threads
cond_.notify_all(l);
cond_.notify_all(std::move(l));
}
}

Expand Down
13 changes: 7 additions & 6 deletions hpx/lcos/local/condition_variable.hpp
Expand Up @@ -14,6 +14,7 @@
#include <hpx/util/date_time_chrono.hpp>

#include <boost/detail/scoped_enum_emulation.hpp>
#include <boost/thread/lock_types.hpp>

///////////////////////////////////////////////////////////////////////////////
namespace hpx { namespace lcos { namespace local
Expand All @@ -32,21 +33,21 @@ namespace hpx { namespace lcos { namespace local
public:
void notify_one(error_code& ec = throws)
{
mutex_type::scoped_lock l(mtx_);
cond_.notify_one(l, ec);
boost::unique_lock<mutex_type> l(mtx_);
cond_.notify_one(std::move(l), ec);
}

void notify_all(error_code& ec = throws)
{
mutex_type::scoped_lock l(mtx_);
cond_.notify_all(l, ec);
boost::unique_lock<mutex_type> l(mtx_);
cond_.notify_all(std::move(l), ec);
}

template <class Lock>
void wait(Lock& lock, error_code& ec = throws)
{
util::ignore_all_while_checking ignore_lock;
mutex_type::scoped_lock l(mtx_);
boost::unique_lock<mutex_type> l(mtx_);
util::scoped_unlock<Lock> unlock(lock);

cond_.wait(l, ec);
Expand All @@ -69,7 +70,7 @@ namespace hpx { namespace lcos { namespace local
error_code& ec = throws)
{
util::ignore_all_while_checking ignore_lock;
mutex_type::scoped_lock l(mtx_);
boost::unique_lock<mutex_type> l(mtx_);
util::scoped_unlock<Lock> unlock(lock);

threads::thread_state_ex_enum const reason =
Expand Down
29 changes: 18 additions & 11 deletions hpx/lcos/local/counting_semaphore.hpp
Expand Up @@ -13,6 +13,7 @@
#include <hpx/util/assert.hpp>

#include <boost/cstdint.hpp>
#include <boost/thread/lock_types.hpp>

#if defined(BOOST_MSVC)
#pragma warning(push)
Expand Down Expand Up @@ -67,7 +68,7 @@ namespace hpx { namespace lcos { namespace local
/// yielded.
void wait(boost::int64_t count = 1)
{
typename mutex_type::scoped_lock l(mtx_);
boost::unique_lock<mutex_type> l(mtx_);
wait_locked(count, l);
}

Expand All @@ -84,7 +85,7 @@ namespace hpx { namespace lcos { namespace local
/// are available at this point in time.
bool try_wait(boost::int64_t count = 1)
{
typename mutex_type::scoped_lock l(mtx_);
boost::unique_lock<mutex_type> l(mtx_);
if (!(value_ < count)) {
// enter wait_locked only if there are sufficient credits
// available
Expand All @@ -99,21 +100,23 @@ namespace hpx { namespace lcos { namespace local
///
void signal(boost::int64_t count = 1)
{
typename mutex_type::scoped_lock l(mtx_);
signal_locked(count, l);
boost::unique_lock<mutex_type> l(mtx_);
signal_locked(count, std::move(l));
}

boost::int64_t signal_all()
{
typename mutex_type::scoped_lock l(mtx_);
boost::unique_lock<mutex_type> l(mtx_);
boost::int64_t count = static_cast<boost::int64_t>(cond_.size(l));
signal_locked(count, l);
signal_locked(count, std::move(l));
return count;
}

template <typename Lock>
void wait_locked(boost::int64_t count, Lock& l)
void wait_locked(boost::int64_t count,
boost::unique_lock<mutex_type>& l)
{
HPX_ASSERT(l.owns_lock());

while (value_ < count)
{
cond_.wait(l, "counting_semaphore::wait_locked");
Expand All @@ -122,17 +125,21 @@ namespace hpx { namespace lcos { namespace local
}

private:
template <typename Lock>
void signal_locked(boost::int64_t count, Lock& l)
void signal_locked(boost::int64_t count,
boost::unique_lock<mutex_type> l)
{
HPX_ASSERT(l.owns_lock());

mutex_type* mtx = l.mutex();
// release no more threads than we get resources
value_ += count;
for (boost::int64_t i = 0; value_ >= 0 && i < count; ++i)
{
// notify_one() returns false if no more threads are
// waiting
if (!cond_.notify_one(l))
if (!cond_.notify_one(std::move(l)))
break;
l = boost::unique_lock<mutex_type>(*mtx);
}
}

Expand Down

0 comments on commit 289f98a

Please sign in to comment.