Skip to content

Commit

Permalink
Fixing race condition in condition_variable
Browse files Browse the repository at this point in the history
We need to reaquire the lock in lcos::detail::condition_variable::wait(_until)
in order to avoid a race condition between setting the thread id to invalid
and resetting the queue entry
  • Loading branch information
Thomas Heller committed Oct 13, 2016
1 parent c1b2842 commit e267a95
Show file tree
Hide file tree
Showing 7 changed files with 17 additions and 85 deletions.
4 changes: 2 additions & 2 deletions hpx/lcos/detail/future_data.hpp
Expand Up @@ -564,7 +564,7 @@ namespace detail

// block if this entry is empty
if (state_ == empty) {
cond_.wait(std::move(l), "future_data::wait", ec);
cond_.wait(l, "future_data::wait", ec);
if (ec) return;
}

Expand All @@ -581,7 +581,7 @@ namespace detail
// block if this entry is empty
if (state_ == empty) {
threads::thread_state_ex_enum const reason =
cond_.wait_until(std::move(l), abs_time,
cond_.wait_until(l, abs_time,
"future_data::wait_until", ec);
if (ec) return future_status::uninitialized;

Expand Down
8 changes: 4 additions & 4 deletions hpx/lcos/local/condition_variable.hpp
Expand Up @@ -57,7 +57,7 @@ namespace hpx { namespace lcos { namespace local
std::unique_lock<mutex_type> l(mtx_);
util::unlock_guard<std::unique_lock<mutex> > unlock(lock);

cond_.wait(std::move(l), ec);
cond_.wait(l, ec);
}

template <class Predicate>
Expand All @@ -83,7 +83,7 @@ namespace hpx { namespace lcos { namespace local
util::unlock_guard<std::unique_lock<mutex> > unlock(lock);

threads::thread_state_ex_enum const reason =
cond_.wait_until(std::move(l), abs_time, ec);
cond_.wait_until(l, abs_time, ec);

if (ec) return cv_status::error;

Expand Down Expand Up @@ -155,7 +155,7 @@ namespace hpx { namespace lcos { namespace local
std::unique_lock<mutex_type> l(mtx_);
util::unlock_guard<Lock> unlock(lock);

cond_.wait(std::move(l), ec);
cond_.wait(l, ec);
}

template <class Lock, class Predicate>
Expand All @@ -181,7 +181,7 @@ namespace hpx { namespace lcos { namespace local
util::unlock_guard<Lock> unlock(lock);

threads::thread_state_ex_enum const reason =
cond_.wait_until(std::move(l), abs_time, ec);
cond_.wait_until(l, abs_time, ec);

if (ec) return cv_status::error;

Expand Down
72 changes: 2 additions & 70 deletions hpx/lcos/local/detail/condition_variable.hpp
Expand Up @@ -30,24 +30,6 @@ namespace hpx { namespace lcos { namespace local { namespace detail
private:
typedef lcos::local::spinlock mutex_type;

class relock_guard
{
HPX_NON_COPYABLE(relock_guard);

public:
explicit relock_guard(std::unique_lock<mutex_type>& l)
: l_(l)
{}

~relock_guard()
{
l_.lock();
}

private:
std::unique_lock<mutex_type>& l_;
};

private:
// define data structures needed for intrusive slist container used for
// the queues
Expand Down Expand Up @@ -119,23 +101,8 @@ namespace hpx { namespace lcos { namespace local { namespace detail
std::unique_lock<mutex_type> lock);

HPX_EXPORT threads::thread_state_ex_enum wait(
std::unique_lock<mutex_type>&& lock,
char const* description, error_code& ec = throws);

threads::thread_state_ex_enum wait(
std::unique_lock<mutex_type>& lock,
char const* description, error_code& ec = throws)
{
relock_guard rl(lock);
return wait(std::move(lock), description, ec);
}

threads::thread_state_ex_enum wait(
std::unique_lock<mutex_type>&& lock,
error_code& ec = throws)
{
return wait(std::move(lock), "condition_variable::wait", ec);
}
char const* description, error_code& ec = throws);

threads::thread_state_ex_enum wait(
std::unique_lock<mutex_type>& lock,
Expand All @@ -145,27 +112,9 @@ namespace hpx { namespace lcos { namespace local { namespace detail
}

HPX_EXPORT threads::thread_state_ex_enum wait_until(
std::unique_lock<mutex_type>&& lock,
util::steady_time_point const& abs_time,
char const* description, error_code& ec = throws);

threads::thread_state_ex_enum wait_until(
std::unique_lock<mutex_type>& lock,
util::steady_time_point const& abs_time,
char const* description, error_code& ec = throws)
{
relock_guard rl(lock);
return wait_until(std::move(lock), abs_time, description, ec);
}

threads::thread_state_ex_enum wait_until(
std::unique_lock<mutex_type>&& lock,
util::steady_time_point const& abs_time,
error_code& ec = throws)
{
return wait_until(std::move(lock), abs_time,
"condition_variable::wait_until", ec);
}
char const* description, error_code& ec = throws);

threads::thread_state_ex_enum wait_until(
std::unique_lock<mutex_type>& lock,
Expand All @@ -176,14 +125,6 @@ namespace hpx { namespace lcos { namespace local { namespace detail
"condition_variable::wait_until", ec);
}

threads::thread_state_ex_enum wait_for(
std::unique_lock<mutex_type>&& lock,
util::steady_duration const& rel_time,
char const* description, error_code& ec = throws)
{
return wait_until(std::move(lock), rel_time.from_now(), description, ec);
}

threads::thread_state_ex_enum wait_for(
std::unique_lock<mutex_type>& lock,
util::steady_duration const& rel_time,
Expand All @@ -192,15 +133,6 @@ namespace hpx { namespace lcos { namespace local { namespace detail
return wait_until(lock, rel_time.from_now(), description, ec);
}

threads::thread_state_ex_enum wait_for(
std::unique_lock<mutex_type>&& lock,
util::steady_duration const& rel_time,
error_code& ec = throws)
{
return wait_until(std::move(lock), rel_time.from_now(),
"condition_variable::wait_for", ec);
}

threads::thread_state_ex_enum wait_for(
std::unique_lock<mutex_type>& lock,
util::steady_duration const& rel_time,
Expand Down
4 changes: 2 additions & 2 deletions hpx/lcos/local/latch.hpp
Expand Up @@ -88,7 +88,7 @@ namespace hpx { namespace lcos { namespace local
if (--counter_ == 0)
cond_.notify_all(std::move(l)); // release the threads
else
cond_.wait(std::move(l), "hpx::local::latch::count_down_and_wait");
cond_.wait(l, "hpx::local::latch::count_down_and_wait");
}

/// Decrements counter_ by n. Does not block.
Expand Down Expand Up @@ -131,7 +131,7 @@ namespace hpx { namespace lcos { namespace local
{
std::unique_lock<mutex_type> l(mtx_);
if (counter_ > 0)
cond_.wait(std::move(l), "hpx::local::latch::wait");
cond_.wait(l, "hpx::local::latch::wait");
}

void abort_all()
Expand Down
2 changes: 1 addition & 1 deletion hpx/lcos/server/barrier.hpp
Expand Up @@ -75,7 +75,7 @@ namespace hpx { namespace lcos { namespace server
{
std::unique_lock<mutex_type> l(mtx_);
if (cond_.size(l) < number_of_threads_-1) {
cond_.wait(std::move(l), "barrier::set_event");
cond_.wait(l, "barrier::set_event");
}
else {
cond_.notify_all(std::move(l));
Expand Down
8 changes: 4 additions & 4 deletions src/lcos/local/detail/condition_variable.cpp
Expand Up @@ -172,7 +172,7 @@ namespace hpx { namespace lcos { namespace local { namespace detail
}

threads::thread_state_ex_enum condition_variable::wait(
std::unique_lock<mutex_type>&& lock,
std::unique_lock<mutex_type>& lock,
char const* description, error_code& ec)
{
HPX_ASSERT(threads::get_self_ptr() != nullptr);
Expand All @@ -186,7 +186,7 @@ namespace hpx { namespace lcos { namespace local { namespace detail
threads::thread_state_ex_enum reason = threads::wait_unknown;
{
// yield this thread
lock.unlock();
util::unlock_guard<std::unique_lock<mutex_type> > ul(lock);
reason = this_thread::suspend(threads::suspended, description, ec);
if (ec) return threads::wait_unknown;
}
Expand All @@ -196,7 +196,7 @@ namespace hpx { namespace lcos { namespace local { namespace detail
}

threads::thread_state_ex_enum condition_variable::wait_until(
std::unique_lock<mutex_type>&& lock,
std::unique_lock<mutex_type>& lock,
util::steady_time_point const& abs_time,
char const* description, error_code& ec)
{
Expand All @@ -211,7 +211,7 @@ namespace hpx { namespace lcos { namespace local { namespace detail
threads::thread_state_ex_enum reason = threads::wait_unknown;
{
// yield this thread
lock.unlock();
util::unlock_guard<std::unique_lock<mutex_type> > ul(lock);
reason = this_thread::suspend(abs_time, description, ec);
if (ec) return threads::wait_unknown;
}
Expand Down
4 changes: 2 additions & 2 deletions tests/unit/lcos/counting_semaphore.cpp
Expand Up @@ -22,8 +22,8 @@ boost::atomic<int> count(0);

void worker(hpx::lcos::local::counting_semaphore& sem)
{
sem.signal(); // signal main thread
++count;
sem.signal(); // signal main thread
}

///////////////////////////////////////////////////////////////////////////////
Expand All @@ -37,7 +37,7 @@ int hpx_main()
// Wait for all threads to finish executing.
sem.wait(10);

HPX_TEST(count == 10);
HPX_TEST_EQ(count, 10);

return hpx::finalize();
}
Expand Down

0 comments on commit e267a95

Please sign in to comment.