Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor local::condition_variable #2072

Merged
merged 6 commits into from Apr 6, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions examples/quickstart/init_globally.cpp
Expand Up @@ -98,7 +98,7 @@ struct manage_global_runtime
{
// notify hpx_main above to tear down the runtime
{
std::lock_guard<hpx::lcos::local::spinlock> lk(mtx_);
std::lock_guard<hpx::lcos::local::mutex> lk(mtx_);
rts_ = 0; // reset pointer
}

Expand Down Expand Up @@ -137,7 +137,7 @@ struct manage_global_runtime

// Now, wait for destructor to be called.
{
std::unique_lock<hpx::lcos::local::spinlock> lk(mtx_);
boost::unique_lock<hpx::lcos::local::mutex> lk(mtx_);
if (rts_ != 0)
cond_.wait(lk);
}
Expand All @@ -147,7 +147,7 @@ struct manage_global_runtime
}

private:
hpx::lcos::local::spinlock mtx_;
hpx::lcos::local::mutex mtx_;
hpx::lcos::local::condition_variable cond_;

std::mutex startup_mtx_;
Expand Down
1 change: 0 additions & 1 deletion hpx/exception_list.hpp
Expand Up @@ -10,7 +10,6 @@

#include <hpx/exception.hpp>
#include <hpx/lcos/local/spinlock.hpp>
#include <hpx/util/unlock_guard.hpp>

#include <boost/thread/locks.hpp>
#include <boost/exception_ptr.hpp>
Expand Down
8 changes: 3 additions & 5 deletions hpx/lcos/detail/future_data.hpp
Expand Up @@ -577,10 +577,8 @@ namespace detail

// block if this entry is empty
if (state_ == empty) {
cond_.wait(l, "future_data::wait", ec);
cond_.wait(std::move(l), "future_data::wait", ec);
if (ec) return;

HPX_ASSERT(state_ != empty);
}

if (&ec != &throws)
Expand All @@ -596,13 +594,13 @@ namespace detail
// block if this entry is empty
if (state_ == empty) {
threads::thread_state_ex_enum const reason =
cond_.wait_until(l, abs_time, "future_data::wait_until", ec);
cond_.wait_until(std::move(l), abs_time,
"future_data::wait_until", ec);
if (ec) return future_status::uninitialized;

if (reason == threads::wait_timeout)
return future_status::timeout;

HPX_ASSERT(state_ != empty);
return future_status::ready;
}

Expand Down
1 change: 1 addition & 0 deletions hpx/lcos/local/and_gate.hpp
Expand Up @@ -12,6 +12,7 @@
#include <hpx/lcos/local/no_mutex.hpp>
#include <hpx/util/assert.hpp>
#include <hpx/util/assert_owns_lock.hpp>
#include <hpx/util/unlock_guard.hpp>

#include <boost/dynamic_bitset.hpp>
#include <boost/thread/locks.hpp>
Expand Down
2 changes: 1 addition & 1 deletion hpx/lcos/local/barrier.hpp
Expand Up @@ -40,7 +40,7 @@ namespace hpx { namespace lcos { namespace local
{
boost::unique_lock<mutex_type> l(mtx_);
if (cond_.size(l) < number_of_threads_-1) {
cond_.wait(l, "barrier::wait");
cond_.wait(std::move(l), "barrier::wait");
}
else {
// release the threads
Expand Down
117 changes: 111 additions & 6 deletions hpx/lcos/local/condition_variable.hpp
Expand Up @@ -8,10 +8,13 @@
#define HPX_LCOS_LOCAL_CONDITION_VARIABLE_DEC_4_2013_0130PM

#include <hpx/config.hpp>
#include <hpx/exception_fwd.hpp>
#include <hpx/lcos/local/detail/condition_variable.hpp>
#include <hpx/lcos/local/mutex.hpp>
#include <hpx/lcos/local/spinlock.hpp>
#include <hpx/util/unlock_guard.hpp>
#include <hpx/util/assert_owns_lock.hpp>
#include <hpx/util/date_time_chrono.hpp>
#include <hpx/util/unlock_guard.hpp>

#include <boost/thread/locks.hpp>

Expand Down Expand Up @@ -43,21 +46,120 @@ namespace hpx { namespace lcos { namespace local
cond_.notify_all(std::move(l), ec);
}

void wait(boost::unique_lock<mutex>& lock, error_code& ec = throws)
{
HPX_ASSERT_OWNS_LOCK(lock);

util::ignore_all_while_checking ignore_lock;
boost::unique_lock<mutex_type> l(mtx_);
util::unlock_guard<boost::unique_lock<mutex> > unlock(lock);

cond_.wait(std::move(l), ec);
}

template <class Predicate>
void wait(boost::unique_lock<mutex>& lock, Predicate pred,
error_code& ec = throws)
{
HPX_ASSERT_OWNS_LOCK(lock);

while (!pred())
{
wait(lock);
}
}

cv_status wait_until(boost::unique_lock<mutex>& lock,
util::steady_time_point const& abs_time,
error_code& ec = throws)
{
HPX_ASSERT_OWNS_LOCK(lock);

util::ignore_all_while_checking ignore_lock;
boost::unique_lock<mutex_type> l(mtx_);
util::unlock_guard<boost::unique_lock<mutex> > unlock(lock);

threads::thread_state_ex_enum const reason =
cond_.wait_until(std::move(l), abs_time, ec);

if (ec) return cv_status::error;

// if the timer has hit, the waiting period timed out
return (reason == threads::wait_timeout) ? //-V110
cv_status::timeout : cv_status::no_timeout;
}

template <typename Predicate>
bool wait_until(boost::unique_lock<mutex>& lock,
util::steady_time_point const& abs_time, Predicate pred,
error_code& ec = throws)
{
HPX_ASSERT_OWNS_LOCK(lock);

while (!pred())
{
if (wait_until(lock, abs_time, ec) == cv_status::timeout)
return pred();
}
return true;
}

cv_status wait_for(boost::unique_lock<mutex>& lock,
util::steady_duration const& rel_time,
error_code& ec = throws)
{
return wait_until(lock, rel_time.from_now(), ec);
}

template <typename Predicate>
bool wait_for(boost::unique_lock<mutex>& lock,
util::steady_duration const& rel_time, Predicate pred,
error_code& ec = throws)
{
return wait_until(lock, rel_time.from_now(), std::move(pred), ec);
}

private:
mutable mutex_type mtx_;
detail::condition_variable cond_;
};

class condition_variable_any
{
private:
typedef lcos::local::spinlock mutex_type;

public:
void notify_one(error_code& ec = throws)
{
boost::unique_lock<mutex_type> l(mtx_);
cond_.notify_one(std::move(l), ec);
}

void notify_all(error_code& ec = throws)
{
util::ignore_all_while_checking ignore_lock;
boost::unique_lock<mutex_type> l(mtx_);
cond_.notify_all(std::move(l), ec);
}

template <class Lock>
void wait(Lock& lock, error_code& ec = throws)
{
HPX_ASSERT_OWNS_LOCK(lock);

util::ignore_all_while_checking ignore_lock;
boost::unique_lock<mutex_type> l(mtx_);
util::unlock_guard<Lock> unlock(lock);

cond_.wait(l, ec);

l.unlock();
cond_.wait(std::move(l), ec);
}

template <class Lock, class Predicate>
void wait(Lock& lock, Predicate pred, error_code& ec = throws)
{
HPX_ASSERT_OWNS_LOCK(lock);

while (!pred())
{
wait(lock);
Expand All @@ -69,14 +171,15 @@ namespace hpx { namespace lcos { namespace local
wait_until(Lock& lock, util::steady_time_point const& abs_time,
error_code& ec = throws)
{
HPX_ASSERT_OWNS_LOCK(lock);

util::ignore_all_while_checking ignore_lock;
boost::unique_lock<mutex_type> l(mtx_);
util::unlock_guard<Lock> unlock(lock);

threads::thread_state_ex_enum const reason =
cond_.wait_until(l, abs_time, ec);
cond_.wait_until(std::move(l), abs_time, ec);

l.unlock();
if (ec) return cv_status::error;

// if the timer has hit, the waiting period timed out
Expand All @@ -88,6 +191,8 @@ namespace hpx { namespace lcos { namespace local
bool wait_until(Lock& lock, util::steady_time_point const& abs_time,
Predicate pred, error_code& ec = throws)
{
HPX_ASSERT_OWNS_LOCK(lock);

while (!pred())
{
if (wait_until(lock, abs_time, ec) == cv_status::timeout)
Expand Down