Skip to content

Commit

Permalink
Merge pull request #2072 from STEllAR-GROUP/local-cv
Browse files Browse the repository at this point in the history
Refactor local::condition_variable
  • Loading branch information
sithhell committed Apr 6, 2016
2 parents 4bc8028 + 51dbd77 commit e1328ca
Show file tree
Hide file tree
Showing 34 changed files with 644 additions and 416 deletions.
6 changes: 3 additions & 3 deletions examples/quickstart/init_globally.cpp
Expand Up @@ -98,7 +98,7 @@ struct manage_global_runtime
{
// notify hpx_main above to tear down the runtime
{
std::lock_guard<hpx::lcos::local::spinlock> lk(mtx_);
std::lock_guard<hpx::lcos::local::mutex> lk(mtx_);
rts_ = 0; // reset pointer
}

Expand Down Expand Up @@ -137,7 +137,7 @@ struct manage_global_runtime

// Now, wait for destructor to be called.
{
std::unique_lock<hpx::lcos::local::spinlock> lk(mtx_);
boost::unique_lock<hpx::lcos::local::mutex> lk(mtx_);
if (rts_ != 0)
cond_.wait(lk);
}
Expand All @@ -147,7 +147,7 @@ struct manage_global_runtime
}

private:
hpx::lcos::local::spinlock mtx_;
hpx::lcos::local::mutex mtx_;
hpx::lcos::local::condition_variable cond_;

std::mutex startup_mtx_;
Expand Down
1 change: 0 additions & 1 deletion hpx/exception_list.hpp
Expand Up @@ -10,7 +10,6 @@

#include <hpx/exception.hpp>
#include <hpx/lcos/local/spinlock.hpp>
#include <hpx/util/unlock_guard.hpp>

#include <boost/thread/locks.hpp>
#include <boost/exception_ptr.hpp>
Expand Down
8 changes: 3 additions & 5 deletions hpx/lcos/detail/future_data.hpp
Expand Up @@ -577,10 +577,8 @@ namespace detail

// block if this entry is empty
if (state_ == empty) {
cond_.wait(l, "future_data::wait", ec);
cond_.wait(std::move(l), "future_data::wait", ec);
if (ec) return;

HPX_ASSERT(state_ != empty);
}

if (&ec != &throws)
Expand All @@ -596,13 +594,13 @@ namespace detail
// block if this entry is empty
if (state_ == empty) {
threads::thread_state_ex_enum const reason =
cond_.wait_until(l, abs_time, "future_data::wait_until", ec);
cond_.wait_until(std::move(l), abs_time,
"future_data::wait_until", ec);
if (ec) return future_status::uninitialized;

if (reason == threads::wait_timeout)
return future_status::timeout;

HPX_ASSERT(state_ != empty);
return future_status::ready;
}

Expand Down
1 change: 1 addition & 0 deletions hpx/lcos/local/and_gate.hpp
Expand Up @@ -12,6 +12,7 @@
#include <hpx/lcos/local/no_mutex.hpp>
#include <hpx/util/assert.hpp>
#include <hpx/util/assert_owns_lock.hpp>
#include <hpx/util/unlock_guard.hpp>

#include <boost/dynamic_bitset.hpp>
#include <boost/thread/locks.hpp>
Expand Down
2 changes: 1 addition & 1 deletion hpx/lcos/local/barrier.hpp
Expand Up @@ -40,7 +40,7 @@ namespace hpx { namespace lcos { namespace local
{
boost::unique_lock<mutex_type> l(mtx_);
if (cond_.size(l) < number_of_threads_-1) {
cond_.wait(l, "barrier::wait");
cond_.wait(std::move(l), "barrier::wait");
}
else {
// release the threads
Expand Down
117 changes: 111 additions & 6 deletions hpx/lcos/local/condition_variable.hpp
Expand Up @@ -8,10 +8,13 @@
#define HPX_LCOS_LOCAL_CONDITION_VARIABLE_DEC_4_2013_0130PM

#include <hpx/config.hpp>
#include <hpx/exception_fwd.hpp>
#include <hpx/lcos/local/detail/condition_variable.hpp>
#include <hpx/lcos/local/mutex.hpp>
#include <hpx/lcos/local/spinlock.hpp>
#include <hpx/util/unlock_guard.hpp>
#include <hpx/util/assert_owns_lock.hpp>
#include <hpx/util/date_time_chrono.hpp>
#include <hpx/util/unlock_guard.hpp>

#include <boost/thread/locks.hpp>

Expand Down Expand Up @@ -43,21 +46,120 @@ namespace hpx { namespace lcos { namespace local
cond_.notify_all(std::move(l), ec);
}

void wait(boost::unique_lock<mutex>& lock, error_code& ec = throws)
{
HPX_ASSERT_OWNS_LOCK(lock);

util::ignore_all_while_checking ignore_lock;
boost::unique_lock<mutex_type> l(mtx_);
util::unlock_guard<boost::unique_lock<mutex> > unlock(lock);

cond_.wait(std::move(l), ec);
}

template <class Predicate>
void wait(boost::unique_lock<mutex>& lock, Predicate pred,
error_code& ec = throws)
{
HPX_ASSERT_OWNS_LOCK(lock);

while (!pred())
{
wait(lock);
}
}

cv_status wait_until(boost::unique_lock<mutex>& lock,
util::steady_time_point const& abs_time,
error_code& ec = throws)
{
HPX_ASSERT_OWNS_LOCK(lock);

util::ignore_all_while_checking ignore_lock;
boost::unique_lock<mutex_type> l(mtx_);
util::unlock_guard<boost::unique_lock<mutex> > unlock(lock);

threads::thread_state_ex_enum const reason =
cond_.wait_until(std::move(l), abs_time, ec);

if (ec) return cv_status::error;

// if the timer has hit, the waiting period timed out
return (reason == threads::wait_timeout) ? //-V110
cv_status::timeout : cv_status::no_timeout;
}

template <typename Predicate>
bool wait_until(boost::unique_lock<mutex>& lock,
util::steady_time_point const& abs_time, Predicate pred,
error_code& ec = throws)
{
HPX_ASSERT_OWNS_LOCK(lock);

while (!pred())
{
if (wait_until(lock, abs_time, ec) == cv_status::timeout)
return pred();
}
return true;
}

cv_status wait_for(boost::unique_lock<mutex>& lock,
util::steady_duration const& rel_time,
error_code& ec = throws)
{
return wait_until(lock, rel_time.from_now(), ec);
}

template <typename Predicate>
bool wait_for(boost::unique_lock<mutex>& lock,
util::steady_duration const& rel_time, Predicate pred,
error_code& ec = throws)
{
return wait_until(lock, rel_time.from_now(), std::move(pred), ec);
}

private:
mutable mutex_type mtx_;
detail::condition_variable cond_;
};

class condition_variable_any
{
private:
typedef lcos::local::spinlock mutex_type;

public:
void notify_one(error_code& ec = throws)
{
boost::unique_lock<mutex_type> l(mtx_);
cond_.notify_one(std::move(l), ec);
}

void notify_all(error_code& ec = throws)
{
util::ignore_all_while_checking ignore_lock;
boost::unique_lock<mutex_type> l(mtx_);
cond_.notify_all(std::move(l), ec);
}

template <class Lock>
void wait(Lock& lock, error_code& ec = throws)
{
HPX_ASSERT_OWNS_LOCK(lock);

util::ignore_all_while_checking ignore_lock;
boost::unique_lock<mutex_type> l(mtx_);
util::unlock_guard<Lock> unlock(lock);

cond_.wait(l, ec);

l.unlock();
cond_.wait(std::move(l), ec);
}

template <class Lock, class Predicate>
void wait(Lock& lock, Predicate pred, error_code& ec = throws)
{
HPX_ASSERT_OWNS_LOCK(lock);

while (!pred())
{
wait(lock);
Expand All @@ -69,14 +171,15 @@ namespace hpx { namespace lcos { namespace local
wait_until(Lock& lock, util::steady_time_point const& abs_time,
error_code& ec = throws)
{
HPX_ASSERT_OWNS_LOCK(lock);

util::ignore_all_while_checking ignore_lock;
boost::unique_lock<mutex_type> l(mtx_);
util::unlock_guard<Lock> unlock(lock);

threads::thread_state_ex_enum const reason =
cond_.wait_until(l, abs_time, ec);
cond_.wait_until(std::move(l), abs_time, ec);

l.unlock();
if (ec) return cv_status::error;

// if the timer has hit, the waiting period timed out
Expand All @@ -88,6 +191,8 @@ namespace hpx { namespace lcos { namespace local
bool wait_until(Lock& lock, util::steady_time_point const& abs_time,
Predicate pred, error_code& ec = throws)
{
HPX_ASSERT_OWNS_LOCK(lock);

while (!pred())
{
if (wait_until(lock, abs_time, ec) == cv_status::timeout)
Expand Down

0 comments on commit e1328ca

Please sign in to comment.