diff --git a/examples/quickstart/init_globally.cpp b/examples/quickstart/init_globally.cpp index d55f8a82c320..d93fa42b472f 100644 --- a/examples/quickstart/init_globally.cpp +++ b/examples/quickstart/init_globally.cpp @@ -98,7 +98,7 @@ struct manage_global_runtime { // notify hpx_main above to tear down the runtime { - std::lock_guard lk(mtx_); + std::lock_guard lk(mtx_); rts_ = 0; // reset pointer } @@ -137,7 +137,7 @@ struct manage_global_runtime // Now, wait for destructor to be called. { - std::unique_lock lk(mtx_); + boost::unique_lock lk(mtx_); if (rts_ != 0) cond_.wait(lk); } @@ -147,7 +147,7 @@ struct manage_global_runtime } private: - hpx::lcos::local::spinlock mtx_; + hpx::lcos::local::mutex mtx_; hpx::lcos::local::condition_variable cond_; std::mutex startup_mtx_; diff --git a/hpx/exception_list.hpp b/hpx/exception_list.hpp index 656f23d576cc..8164e8ebfda2 100644 --- a/hpx/exception_list.hpp +++ b/hpx/exception_list.hpp @@ -10,7 +10,6 @@ #include #include -#include #include #include diff --git a/hpx/lcos/detail/future_data.hpp b/hpx/lcos/detail/future_data.hpp index 81b13a22894d..88c618ae54ce 100644 --- a/hpx/lcos/detail/future_data.hpp +++ b/hpx/lcos/detail/future_data.hpp @@ -577,10 +577,8 @@ namespace detail // block if this entry is empty if (state_ == empty) { - cond_.wait(l, "future_data::wait", ec); + cond_.wait(std::move(l), "future_data::wait", ec); if (ec) return; - - HPX_ASSERT(state_ != empty); } if (&ec != &throws) @@ -596,13 +594,13 @@ namespace detail // block if this entry is empty if (state_ == empty) { threads::thread_state_ex_enum const reason = - cond_.wait_until(l, abs_time, "future_data::wait_until", ec); + cond_.wait_until(std::move(l), abs_time, + "future_data::wait_until", ec); if (ec) return future_status::uninitialized; if (reason == threads::wait_timeout) return future_status::timeout; - HPX_ASSERT(state_ != empty); return future_status::ready; } diff --git a/hpx/lcos/local/and_gate.hpp b/hpx/lcos/local/and_gate.hpp index da91b256cf20..56bc6e6b0c80 100644 --- a/hpx/lcos/local/and_gate.hpp +++ b/hpx/lcos/local/and_gate.hpp @@ -12,6 +12,7 @@ #include #include #include +#include #include #include diff --git a/hpx/lcos/local/barrier.hpp b/hpx/lcos/local/barrier.hpp index 0f6f8f45922d..e995a78f3b70 100644 --- a/hpx/lcos/local/barrier.hpp +++ b/hpx/lcos/local/barrier.hpp @@ -40,7 +40,7 @@ namespace hpx { namespace lcos { namespace local { boost::unique_lock l(mtx_); if (cond_.size(l) < number_of_threads_-1) { - cond_.wait(l, "barrier::wait"); + cond_.wait(std::move(l), "barrier::wait"); } else { // release the threads diff --git a/hpx/lcos/local/condition_variable.hpp b/hpx/lcos/local/condition_variable.hpp index 03a689caae3b..c57d3d4643f7 100644 --- a/hpx/lcos/local/condition_variable.hpp +++ b/hpx/lcos/local/condition_variable.hpp @@ -8,10 +8,13 @@ #define HPX_LCOS_LOCAL_CONDITION_VARIABLE_DEC_4_2013_0130PM #include +#include #include +#include #include -#include +#include #include +#include #include @@ -43,21 +46,120 @@ namespace hpx { namespace lcos { namespace local cond_.notify_all(std::move(l), ec); } + void wait(boost::unique_lock& lock, error_code& ec = throws) + { + HPX_ASSERT_OWNS_LOCK(lock); + + util::ignore_all_while_checking ignore_lock; + boost::unique_lock l(mtx_); + util::unlock_guard > unlock(lock); + + cond_.wait(std::move(l), ec); + } + + template + void wait(boost::unique_lock& lock, Predicate pred, + error_code& ec = throws) + { + HPX_ASSERT_OWNS_LOCK(lock); + + while (!pred()) + { + wait(lock); + } + } + + cv_status wait_until(boost::unique_lock& lock, + util::steady_time_point const& abs_time, + error_code& ec = throws) + { + HPX_ASSERT_OWNS_LOCK(lock); + + util::ignore_all_while_checking ignore_lock; + boost::unique_lock l(mtx_); + util::unlock_guard > unlock(lock); + + threads::thread_state_ex_enum const reason = + cond_.wait_until(std::move(l), abs_time, ec); + + if (ec) return cv_status::error; + + // if the timer has hit, the waiting period timed out + return (reason == threads::wait_timeout) ? //-V110 + cv_status::timeout : cv_status::no_timeout; + } + + template + bool wait_until(boost::unique_lock& lock, + util::steady_time_point const& abs_time, Predicate pred, + error_code& ec = throws) + { + HPX_ASSERT_OWNS_LOCK(lock); + + while (!pred()) + { + if (wait_until(lock, abs_time, ec) == cv_status::timeout) + return pred(); + } + return true; + } + + cv_status wait_for(boost::unique_lock& lock, + util::steady_duration const& rel_time, + error_code& ec = throws) + { + return wait_until(lock, rel_time.from_now(), ec); + } + + template + bool wait_for(boost::unique_lock& lock, + util::steady_duration const& rel_time, Predicate pred, + error_code& ec = throws) + { + return wait_until(lock, rel_time.from_now(), std::move(pred), ec); + } + + private: + mutable mutex_type mtx_; + detail::condition_variable cond_; + }; + + class condition_variable_any + { + private: + typedef lcos::local::spinlock mutex_type; + + public: + void notify_one(error_code& ec = throws) + { + boost::unique_lock l(mtx_); + cond_.notify_one(std::move(l), ec); + } + + void notify_all(error_code& ec = throws) + { + util::ignore_all_while_checking ignore_lock; + boost::unique_lock l(mtx_); + cond_.notify_all(std::move(l), ec); + } + template void wait(Lock& lock, error_code& ec = throws) { + HPX_ASSERT_OWNS_LOCK(lock); + util::ignore_all_while_checking ignore_lock; boost::unique_lock l(mtx_); util::unlock_guard unlock(lock); - cond_.wait(l, ec); - - l.unlock(); + cond_.wait(std::move(l), ec); } template void wait(Lock& lock, Predicate pred, error_code& ec = throws) { + HPX_ASSERT_OWNS_LOCK(lock); + while (!pred()) { wait(lock); @@ -69,14 +171,15 @@ namespace hpx { namespace lcos { namespace local wait_until(Lock& lock, util::steady_time_point const& abs_time, error_code& ec = throws) { + HPX_ASSERT_OWNS_LOCK(lock); + util::ignore_all_while_checking ignore_lock; boost::unique_lock l(mtx_); util::unlock_guard unlock(lock); threads::thread_state_ex_enum const reason = - cond_.wait_until(l, abs_time, ec); + cond_.wait_until(std::move(l), abs_time, ec); - l.unlock(); if (ec) return cv_status::error; // if the timer has hit, the waiting period timed out @@ -88,6 +191,8 @@ namespace hpx { namespace lcos { namespace local bool wait_until(Lock& lock, util::steady_time_point const& abs_time, Predicate pred, error_code& ec = throws) { + HPX_ASSERT_OWNS_LOCK(lock); + while (!pred()) { if (wait_until(lock, abs_time, ec) == cv_status::timeout) diff --git a/hpx/lcos/local/detail/condition_variable.hpp b/hpx/lcos/local/detail/condition_variable.hpp index b5ed5fe8ecca..bc193d0b5f91 100644 --- a/hpx/lcos/local/detail/condition_variable.hpp +++ b/hpx/lcos/local/detail/condition_variable.hpp @@ -1,17 +1,17 @@ // Copyright (c) 2007-2013 Hartmut Kaiser -// Copyright (c) 2013 Agustin Berge +// Copyright (c) 2013-2015 Agustin Berge // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) -#if !defined(HPX_LCOS_LOCAL_DETAIL_CONDITION_VARIABLE_DEC_4_2013_0130PM) -#define HPX_LCOS_LOCAL_DETAIL_CONDITION_VARIABLE_DEC_4_2013_0130PM +#ifndef HPX_LCOS_LOCAL_DETAIL_CONDITION_VARIABLE_HPP +#define HPX_LCOS_LOCAL_DETAIL_CONDITION_VARIABLE_HPP #include -#include -#include -#include -#include +#include +#include +#include +#include #include #include @@ -23,6 +23,27 @@ namespace hpx { namespace lcos { namespace local { namespace detail { HPX_NON_COPYABLE(condition_variable); + private: + typedef lcos::local::spinlock mutex_type; + + class relock_guard + { + HPX_NON_COPYABLE(relock_guard) + + public: + explicit relock_guard(boost::unique_lock& l) + : l_(l) + {} + + ~relock_guard() + { + l_.lock(); + } + + private: + boost::unique_lock& l_; + }; + private: // define data structures needed for intrusive slist container used for // the queues @@ -72,280 +93,78 @@ namespace hpx { namespace lcos { namespace local { namespace detail }; public: - condition_variable() - {} + HPX_EXPORT condition_variable(); - ~condition_variable() - { - if (!queue_.empty()) - { - LERR_(fatal) - << "~condition_variable: queue is not empty, " - "aborting threads"; + HPX_EXPORT ~condition_variable(); - local::no_mutex no_mtx; - boost::unique_lock lock(no_mtx); - abort_all(std::move(lock)); - } - } - - template - bool empty(boost::unique_lock const& lock) const - { - HPX_ASSERT_OWNS_LOCK(lock); - - return queue_.empty(); - } - - template - std::size_t size(boost::unique_lock const& lock) const - { - HPX_ASSERT_OWNS_LOCK(lock); + HPX_EXPORT bool empty( + boost::unique_lock const& lock) const; - return queue_.size(); - } + HPX_EXPORT std::size_t size( + boost::unique_lock const& lock) const; // Return false if no more threads are waiting (returns true if queue // is non-empty). - template - bool notify_one(boost::unique_lock lock, error_code& ec = throws) - { - HPX_ASSERT_OWNS_LOCK(lock); + HPX_EXPORT bool notify_one( + boost::unique_lock lock, error_code& ec = throws); - if (!queue_.empty()) - { - threads::thread_id_repr_type id = queue_.front().id_; + HPX_EXPORT void notify_all( + boost::unique_lock lock, error_code& ec = throws); - // remove item from queue before error handling - queue_.front().id_ = threads::invalid_thread_id_repr; - queue_.pop_front(); + HPX_EXPORT void abort_all( + boost::unique_lock lock); - if (HPX_UNLIKELY(id == threads::invalid_thread_id_repr)) - { - lock.unlock(); + HPX_EXPORT threads::thread_state_ex_enum wait( + boost::unique_lock&& lock, + char const* description, error_code& ec = throws); - HPX_THROWS_IF(ec, null_thread_id, - "condition_variable::notify_one", - "NULL thread id encountered"); - return false; - } - - bool not_empty = !queue_.empty(); - lock.unlock(); - - threads::set_thread_state(threads::thread_id_type( - reinterpret_cast(id)), - threads::pending, threads::wait_signaled, - threads::thread_priority_default, ec); - return not_empty; - } - - if (&ec != &throws) - ec = make_success_code(); - - return false; - } - - // re-add the remaining items to the original queue - template - void prepend_entries(boost::unique_lock& lock, queue_type& queue) + threads::thread_state_ex_enum wait( + boost::unique_lock& lock, + char const* description, error_code& ec = throws) { - HPX_ASSERT_OWNS_LOCK(lock); - - // splice is constant time only if it == end - queue.splice(queue.end(), queue_); - queue_.swap(queue); + relock_guard rl(lock); + return wait(std::move(lock), description, ec); } - template - void notify_all(boost::unique_lock lock, error_code& ec = throws) + threads::thread_state_ex_enum wait( + boost::unique_lock&& lock, + error_code& ec = throws) { - HPX_ASSERT_OWNS_LOCK(lock); - - // swap the list - queue_type queue; - queue.swap(queue_); - - if (!queue.empty()) - { - // update reference to queue for all queue entries - for (queue_entry& qe : queue) - qe.q_ = &queue; - - do { - threads::thread_id_repr_type id = queue.front().id_; - - // remove item from queue before error handling - queue.front().id_ = threads::invalid_thread_id_repr; - queue.pop_front(); - - if (HPX_UNLIKELY(id == threads::invalid_thread_id_repr)) - { - prepend_entries(lock, queue); - lock.unlock(); - - HPX_THROWS_IF(ec, null_thread_id, - "condition_variable::notify_all", - "NULL thread id encountered"); - return; - } - - error_code local_ec; - threads::set_thread_state(threads::thread_id_type( - reinterpret_cast(id)), - threads::pending, threads::wait_signaled, - threads::thread_priority_default, local_ec); - - if (local_ec) - { - prepend_entries(lock, queue); - lock.unlock(); - - if (&ec != &throws) - { - ec = std::move(local_ec); - } - else - { - boost::rethrow_exception( - hpx::detail::access_exception(local_ec)); - } - return; - } - - } while (!queue.empty()); - } - - if (&ec != &throws) - ec = make_success_code(); + return wait(std::move(lock), "condition_variable::wait", ec); } - template - void abort_all(boost::unique_lock lock) + threads::thread_state_ex_enum wait( + boost::unique_lock& lock, + error_code& ec = throws) { - HPX_ASSERT_OWNS_LOCK(lock); - - // new threads might have been added while we were notifying - while(!queue_.empty()) - { - // swap the list - queue_type queue; - queue.swap(queue_); - - // update reference to queue for all queue entries - for (queue_entry& qe : queue) - qe.q_ = &queue; - - while (!queue.empty()) - { - threads::thread_id_repr_type id = queue.front().id_; - - queue.front().id_ = threads::invalid_thread_id_repr; - queue.pop_front(); - - if (HPX_UNLIKELY(id == threads::invalid_thread_id_repr)) - { - LERR_(fatal) - << "condition_variable::abort_all:" - << " NULL thread id encountered"; - continue; - } - - // we know that the id is actually the pointer to the thread - threads::thread_id_type tid( - reinterpret_cast(id)); - - LERR_(fatal) - << "condition_variable::abort_all:" - << " pending thread: " - << get_thread_state_name( - threads::get_thread_state(tid)) - << "(" << tid << "): " - << threads::get_thread_description(tid); - - // unlock while notifying thread as this can suspend - util::unlock_guard > unlock(lock); - - // forcefully abort thread, do not throw - error_code ec(lightweight); - threads::set_thread_state(tid, - threads::pending, threads::wait_abort, - threads::thread_priority_default, ec); - if (ec) - { - LERR_(fatal) - << "condition_variable::abort_all:" - << " could not abort thread: " - << get_thread_state_name( - threads::get_thread_state(tid)) - << "(" << tid << "): " - << threads::get_thread_description(tid); - } - } - } + return wait(lock, "condition_variable::wait", ec); } - template - threads::thread_state_ex_enum - wait(boost::unique_lock& lock, - char const* description, error_code& ec = throws) - { - HPX_ASSERT(threads::get_self_ptr() != 0); - HPX_ASSERT_OWNS_LOCK(lock); - - // enqueue the request and block this thread - queue_entry f(threads::get_self_id().get(), &queue_); - queue_.push_back(f); - - reset_queue_entry r(f, queue_); - threads::thread_state_ex_enum reason = threads::wait_unknown; - { - // yield this thread - util::unlock_guard > unlock(lock); - reason = this_thread::suspend(threads::suspended, description, ec); - if (ec) return threads::wait_unknown; - } - - return (f.id_ != threads::invalid_thread_id_repr) ? - threads::wait_timeout : reason; - } + HPX_EXPORT threads::thread_state_ex_enum wait_until( + boost::unique_lock&& lock, + util::steady_time_point const& abs_time, + char const* description, error_code& ec = throws); - template - threads::thread_state_ex_enum - wait(boost::unique_lock& lock, error_code& ec = throws) + threads::thread_state_ex_enum wait_until( + boost::unique_lock& lock, + util::steady_time_point const& abs_time, + char const* description, error_code& ec = throws) { - return wait(lock, "condition_variable::wait", ec); + relock_guard rl(lock); + return wait_until(std::move(lock), abs_time, description, ec); } - template - threads::thread_state_ex_enum - wait_until(boost::unique_lock& lock, + threads::thread_state_ex_enum wait_until( + boost::unique_lock&& lock, util::steady_time_point const& abs_time, - char const* description, error_code& ec = throws) + error_code& ec = throws) { - HPX_ASSERT(threads::get_self_ptr() != 0); - HPX_ASSERT_OWNS_LOCK(lock); - - // enqueue the request and block this thread - queue_entry f(threads::get_self_id().get(), &queue_); - queue_.push_back(f); - - reset_queue_entry r(f, queue_); - threads::thread_state_ex_enum reason = threads::wait_unknown; - { - // yield this thread - util::unlock_guard > unlock(lock); - reason = this_thread::suspend(abs_time, description, ec); - if (ec) return threads::wait_unknown; - } - - return (f.id_ != threads::invalid_thread_id_repr) ? - threads::wait_timeout : reason; + return wait_until(std::move(lock), abs_time, + "condition_variable::wait_until", ec); } - template - threads::thread_state_ex_enum - wait_until(boost::unique_lock& lock, + threads::thread_state_ex_enum wait_until( + boost::unique_lock& lock, util::steady_time_point const& abs_time, error_code& ec = throws) { @@ -353,18 +172,33 @@ namespace hpx { namespace lcos { namespace local { namespace detail "condition_variable::wait_until", ec); } - template - threads::thread_state_ex_enum - wait_for(boost::unique_lock& lock, + threads::thread_state_ex_enum wait_for( + boost::unique_lock&& lock, + util::steady_duration const& rel_time, + char const* description, error_code& ec = throws) + { + return wait_until(std::move(lock), rel_time.from_now(), description, ec); + } + + threads::thread_state_ex_enum wait_for( + boost::unique_lock& lock, util::steady_duration const& rel_time, char const* description, error_code& ec = throws) { return wait_until(lock, rel_time.from_now(), description, ec); } - template - threads::thread_state_ex_enum - wait_for(boost::unique_lock& lock, + threads::thread_state_ex_enum wait_for( + boost::unique_lock&& lock, + util::steady_duration const& rel_time, + error_code& ec = throws) + { + return wait_until(std::move(lock), rel_time.from_now(), + "condition_variable::wait_for", ec); + } + + threads::thread_state_ex_enum wait_for( + boost::unique_lock& lock, util::steady_duration const& rel_time, error_code& ec = throws) { @@ -372,6 +206,14 @@ namespace hpx { namespace lcos { namespace local { namespace detail "condition_variable::wait_for", ec); } + private: + template + void abort_all(boost::unique_lock lock); + + // re-add the remaining items to the original queue + HPX_EXPORT void prepend_entries( + boost::unique_lock& lock, queue_type& queue); + private: queue_type queue_; }; diff --git a/hpx/lcos/local/detail/counting_semaphore.hpp b/hpx/lcos/local/detail/counting_semaphore.hpp index 9777a1081b20..aa6b4c0121da 100644 --- a/hpx/lcos/local/detail/counting_semaphore.hpp +++ b/hpx/lcos/local/detail/counting_semaphore.hpp @@ -28,13 +28,15 @@ namespace hpx { namespace lcos { namespace local { namespace detail { class counting_semaphore { + private: + typedef lcos::local::spinlock mutex_type; + public: counting_semaphore(boost::int64_t value = 0) : value_(value) {} - template - void wait(boost::unique_lock& l, boost::int64_t count) + void wait(boost::unique_lock& l, boost::int64_t count) { HPX_ASSERT_OWNS_LOCK(l); @@ -45,8 +47,7 @@ namespace hpx { namespace lcos { namespace local { namespace detail value_ -= count; } - template - bool try_wait(boost::unique_lock& l, boost::int64_t count = 1) + bool try_wait(boost::unique_lock& l, boost::int64_t count = 1) { HPX_ASSERT_OWNS_LOCK(l); @@ -59,12 +60,11 @@ namespace hpx { namespace lcos { namespace local { namespace detail return false; } - template - void signal(boost::unique_lock l, boost::int64_t count) + void signal(boost::unique_lock l, boost::int64_t count) { HPX_ASSERT_OWNS_LOCK(l); - Mutex* mtx = l.mutex(); + mutex_type* mtx = l.mutex(); // release no more threads than we get resources value_ += count; @@ -75,12 +75,11 @@ namespace hpx { namespace lcos { namespace local { namespace detail if (!cond_.notify_one(std::move(l))) break; - l = boost::unique_lock(*mtx); + l = boost::unique_lock(*mtx); } } - template - boost::int64_t signal_all(boost::unique_lock l) + boost::int64_t signal_all(boost::unique_lock l) { HPX_ASSERT_OWNS_LOCK(l); diff --git a/hpx/lcos/local/event.hpp b/hpx/lcos/local/event.hpp index 1293253cab48..12a90adee512 100644 --- a/hpx/lcos/local/event.hpp +++ b/hpx/lcos/local/event.hpp @@ -27,78 +27,72 @@ namespace hpx { namespace lcos { namespace local /// Event semaphores can be used for synchronizing multiple threads that /// need to wait for an event to occur. When the event occurs, all threads /// waiting for the event are woken up. - namespace detail + class event { - template - class event - { - private: - typedef Mutex mutex_type; + private: + typedef lcos::local::spinlock mutex_type; - public: - /// \brief Construct a new event semaphore - event() - : event_(false) - {} + public: + /// \brief Construct a new event semaphore + event() + : event_(false) + {} - /// \brief Check if the event has occurred. - bool occurred() - { - return event_.load(boost::memory_order_acquire); - } - - /// \brief Wait for the event to occur. - void wait() - { - if (event_.load(boost::memory_order_acquire)) - return; + /// \brief Check if the event has occurred. + bool occurred() + { + return event_.load(boost::memory_order_acquire); + } - boost::unique_lock l(mtx_); - wait_locked(l); - } + /// \brief Wait for the event to occur. + void wait() + { + if (event_.load(boost::memory_order_acquire)) + return; - /// \brief Release all threads waiting on this semaphore. - void set() - { - event_.store(true, boost::memory_order_release); + boost::unique_lock l(mtx_); + wait_locked(l); + } - boost::unique_lock l(mtx_); - set_locked(std::move(l)); - } + /// \brief Release all threads waiting on this semaphore. + void set() + { + event_.store(true, boost::memory_order_release); - /// \brief Reset the event - void reset() - { - event_.store(false, boost::memory_order_release); - } + boost::unique_lock l(mtx_); + set_locked(std::move(l)); + } - private: - void wait_locked(boost::unique_lock& l) - { - HPX_ASSERT(l.owns_lock()); + /// \brief Reset the event + void reset() + { + event_.store(false, boost::memory_order_release); + } - while (!event_.load(boost::memory_order_acquire)) - { - cond_.wait(l, "event::wait_locked"); - } - } + private: + void wait_locked(boost::unique_lock& l) + { + HPX_ASSERT(l.owns_lock()); - void set_locked(boost::unique_lock l) + while (!event_.load(boost::memory_order_acquire)) { - HPX_ASSERT(l.owns_lock()); - - // release the threads - cond_.notify_all(std::move(l)); + cond_.wait(l, "event::wait_locked"); } + } + + void set_locked(boost::unique_lock l) + { + HPX_ASSERT(l.owns_lock()); - mutex_type mtx_; ///< This mutex protects the queue. - local::detail::condition_variable cond_; + // release the threads + cond_.notify_all(std::move(l)); + } - boost::atomic event_; - }; - } + mutex_type mtx_; ///< This mutex protects the queue. + local::detail::condition_variable cond_; - typedef detail::event<> event; + boost::atomic event_; + }; }}} #if defined(HPX_MSVC) diff --git a/hpx/lcos/local/latch.hpp b/hpx/lcos/local/latch.hpp index bcc9b599c43c..21f75a5bd8c0 100644 --- a/hpx/lcos/local/latch.hpp +++ b/hpx/lcos/local/latch.hpp @@ -86,7 +86,7 @@ namespace hpx { namespace lcos { namespace local if (--counter_ == 0) cond_.notify_all(std::move(l)); // release the threads else - cond_.wait(l, "hpx::local::latch::count_down_and_wait"); + cond_.wait(std::move(l), "hpx::local::latch::count_down_and_wait"); } /// Decrements counter_ by n. Does not block. @@ -129,7 +129,7 @@ namespace hpx { namespace lcos { namespace local { boost::unique_lock l(mtx_); if (counter_ > 0) - cond_.wait(l, "hpx::local::latch::wait"); + cond_.wait(std::move(l), "hpx::local::latch::wait"); } void abort_all() diff --git a/hpx/lcos/local/shared_mutex.hpp b/hpx/lcos/local/shared_mutex.hpp index 6d2327c34258..e1c3077c116b 100644 --- a/hpx/lcos/local/shared_mutex.hpp +++ b/hpx/lcos/local/shared_mutex.hpp @@ -7,9 +7,9 @@ #if !defined(HPX_F0757EAC_E2A3_4F80_A1EC_8CC7EB55186F) #define HPX_F0757EAC_E2A3_4F80_A1EC_8CC7EB55186F +#include +#include #include -#include -#include #include @@ -33,21 +33,14 @@ namespace hpx { namespace lcos { namespace local state_data state; mutex_type state_change; - lcos::local::detail::counting_semaphore shared_cond; - lcos::local::detail::counting_semaphore exclusive_cond; - lcos::local::detail::counting_semaphore upgrade_cond; + lcos::local::condition_variable shared_cond; + lcos::local::condition_variable exclusive_cond; + lcos::local::condition_variable upgrade_cond; void release_waiters() { - no_mutex mtx; - { - boost::unique_lock l(mtx); - exclusive_cond.signal(std::move(l), 1); - } - { - boost::unique_lock l(mtx); - shared_cond.signal_all(std::move(l)); - } + exclusive_cond.notify_one(); + shared_cond.notify_all(); } public: @@ -63,7 +56,7 @@ namespace hpx { namespace lcos { namespace local while (state.exclusive || state.exclusive_waiting_blocked) { - shared_cond.wait(lk, 1); + shared_cond.wait(lk); } ++state.shared_count; @@ -96,9 +89,7 @@ namespace hpx { namespace lcos { namespace local state.upgrade = false; state.exclusive = true; - no_mutex mtx; - boost::unique_lock l(mtx); - upgrade_cond.signal(std::move(l), 1); + upgrade_cond.notify_one(); } else { @@ -116,7 +107,7 @@ namespace hpx { namespace lcos { namespace local while (state.shared_count || state.exclusive) { state.exclusive_waiting_blocked = true; - exclusive_cond.wait(lk, 1); + exclusive_cond.wait(lk); } state.exclusive = true; @@ -151,7 +142,7 @@ namespace hpx { namespace lcos { namespace local while (state.exclusive || state.exclusive_waiting_blocked || state.upgrade) { - shared_cond.wait(lk, 1); + shared_cond.wait(lk); } ++state.shared_count; @@ -193,7 +184,7 @@ namespace hpx { namespace lcos { namespace local while (state.shared_count) { - upgrade_cond.wait(lk, 1); + upgrade_cond.wait(lk); } state.upgrade = false; diff --git a/hpx/lcos/local/trigger.hpp b/hpx/lcos/local/trigger.hpp index e739ba9e6078..a7842c61728d 100644 --- a/hpx/lcos/local/trigger.hpp +++ b/hpx/lcos/local/trigger.hpp @@ -6,13 +6,14 @@ #if !defined(HPX_LCOS_LOCAL_TRIGGER_SEP_09_2012_1229PM) #define HPX_LCOS_LOCAL_TRIGGER_SEP_09_2012_1229PM -#include +#include #include #include #include #include #include #include +#include #include diff --git a/hpx/lcos/server/barrier.hpp b/hpx/lcos/server/barrier.hpp index a6c50e87d12b..1c76a7520013 100644 --- a/hpx/lcos/server/barrier.hpp +++ b/hpx/lcos/server/barrier.hpp @@ -71,7 +71,7 @@ namespace hpx { namespace lcos { namespace server { boost::unique_lock l(mtx_); if (cond_.size(l) < number_of_threads_-1) { - cond_.wait(l, "barrier::set_event"); + cond_.wait(std::move(l), "barrier::set_event"); } else { cond_.notify_all(std::move(l)); diff --git a/hpx/parallel/task_block.hpp b/hpx/parallel/task_block.hpp index f4744bff9461..66977308650a 100644 --- a/hpx/parallel/task_block.hpp +++ b/hpx/parallel/task_block.hpp @@ -15,7 +15,6 @@ #include #include #include -#include #include #include diff --git a/hpx/runtime/agas/server/primary_namespace.hpp b/hpx/runtime/agas/server/primary_namespace.hpp index b8106c00459c..f78139451238 100644 --- a/hpx/runtime/agas/server/primary_namespace.hpp +++ b/hpx/runtime/agas/server/primary_namespace.hpp @@ -139,14 +139,14 @@ struct HPX_EXPORT primary_namespace #if !defined(HPX_GCC_VERSION) || HPX_GCC_VERSION >= 408000 typedef std::map< naming::gid_type, - hpx::util::tuple + hpx::util::tuple > migration_table_type; #else typedef std::map< naming::gid_type, hpx::util::tuple< bool, std::size_t, - boost::shared_ptr + boost::shared_ptr > > migration_table_type; #endif diff --git a/hpx/runtime/components/server/runtime_support.hpp b/hpx/runtime/components/server/runtime_support.hpp index 885f8f35f61a..70d688e124a8 100644 --- a/hpx/runtime/components/server/runtime_support.hpp +++ b/hpx/runtime/components/server/runtime_support.hpp @@ -28,6 +28,7 @@ #include #include #include +#include #include #include @@ -425,7 +426,7 @@ namespace hpx { namespace components { namespace server typedef hpx::lcos::local::spinlock dijkstra_mtx_type; dijkstra_mtx_type dijkstra_mtx_; - lcos::local::condition_variable dijkstra_cond_; + lcos::local::condition_variable_any dijkstra_cond_; component_map_mutex_type cm_mtx_; plugin_map_mutex_type p_mtx_; diff --git a/hpx/runtime/components/server/wrapper_heap_list.hpp b/hpx/runtime/components/server/wrapper_heap_list.hpp index 5d157ba761eb..f1816a4fa798 100644 --- a/hpx/runtime/components/server/wrapper_heap_list.hpp +++ b/hpx/runtime/components/server/wrapper_heap_list.hpp @@ -11,6 +11,7 @@ #include #include #include +#include /////////////////////////////////////////////////////////////////////////////// namespace hpx { namespace components { namespace detail diff --git a/src/exception_list.cpp b/src/exception_list.cpp index cedd9f4792fc..613f0eae7b51 100644 --- a/src/exception_list.cpp +++ b/src/exception_list.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include diff --git a/src/lcos/local/detail/condition_variable.cpp b/src/lcos/local/detail/condition_variable.cpp new file mode 100644 index 000000000000..1497bfe35755 --- /dev/null +++ b/src/lcos/local/detail/condition_variable.cpp @@ -0,0 +1,289 @@ +// Copyright (c) 2007-2013 Hartmut Kaiser +// Copyright (c) 2013-2015 Agustin Berge +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +namespace hpx { namespace lcos { namespace local { namespace detail +{ + /////////////////////////////////////////////////////////////////////////// + condition_variable::condition_variable() + {} + + condition_variable::~condition_variable() + { + if (!queue_.empty()) + { + LERR_(fatal) + << "~condition_variable: queue is not empty, " + "aborting threads"; + + local::no_mutex no_mtx; + boost::unique_lock lock(no_mtx); + abort_all(std::move(lock)); + } + } + + bool condition_variable::empty( + boost::unique_lock const& lock) const + { + HPX_ASSERT(lock.owns_lock()); + + return queue_.empty(); + } + + std::size_t condition_variable::size( + boost::unique_lock const& lock) const + { + HPX_ASSERT(lock.owns_lock()); + + return queue_.size(); + } + + // Return false if no more threads are waiting (returns true if queue + // is non-empty). + bool condition_variable::notify_one( + boost::unique_lock lock, error_code& ec) + { + HPX_ASSERT(lock.owns_lock()); + + if (!queue_.empty()) + { + threads::thread_id_repr_type id = queue_.front().id_; + + // remove item from queue before error handling + queue_.front().id_ = threads::invalid_thread_id_repr; + queue_.pop_front(); + + if (HPX_UNLIKELY(id == threads::invalid_thread_id_repr)) + { + lock.unlock(); + + HPX_THROWS_IF(ec, null_thread_id, + "condition_variable::notify_one", + "NULL thread id encountered"); + return false; + } + + bool not_empty = !queue_.empty(); + lock.unlock(); + + threads::set_thread_state(threads::thread_id_type( + reinterpret_cast(id)), + threads::pending, threads::wait_signaled, + threads::thread_priority_default, ec); + return not_empty; + } + + if (&ec != &throws) + ec = make_success_code(); + + return false; + } + + void condition_variable::notify_all( + boost::unique_lock lock, error_code& ec) + { + HPX_ASSERT(lock.owns_lock()); + + // swap the list + queue_type queue; + queue.swap(queue_); + + if (!queue.empty()) + { + // update reference to queue for all queue entries + for (queue_entry& qe : queue) + qe.q_ = &queue; + + do { + threads::thread_id_repr_type id = queue.front().id_; + + // remove item from queue before error handling + queue.front().id_ = threads::invalid_thread_id_repr; + queue.pop_front(); + + if (HPX_UNLIKELY(id == threads::invalid_thread_id_repr)) + { + prepend_entries(lock, queue); + lock.unlock(); + + HPX_THROWS_IF(ec, null_thread_id, + "condition_variable::notify_all", + "NULL thread id encountered"); + return; + } + + error_code local_ec; + threads::set_thread_state(threads::thread_id_type( + reinterpret_cast(id)), + threads::pending, threads::wait_signaled, + threads::thread_priority_default, local_ec); + + if (local_ec) + { + prepend_entries(lock, queue); + lock.unlock(); + + if (&ec != &throws) + { + ec = std::move(local_ec); + } + else + { + boost::rethrow_exception( + hpx::detail::access_exception(local_ec)); + } + return; + } + + } while (!queue.empty()); + } + + if (&ec != &throws) + ec = make_success_code(); + } + + void condition_variable::abort_all(boost::unique_lock lock) + { + HPX_ASSERT(lock.owns_lock()); + + abort_all(std::move(lock)); + } + + threads::thread_state_ex_enum condition_variable::wait( + boost::unique_lock&& lock, + char const* description, error_code& ec) + { + HPX_ASSERT(threads::get_self_ptr() != 0); + HPX_ASSERT(lock.owns_lock()); + + // enqueue the request and block this thread + queue_entry f(threads::get_self_id().get(), &queue_); + queue_.push_back(f); + + reset_queue_entry r(f, queue_); + threads::thread_state_ex_enum reason = threads::wait_unknown; + { + // yield this thread + lock.unlock(); + reason = this_thread::suspend(threads::suspended, description, ec); + if (ec) return threads::wait_unknown; + } + + return (f.id_ != threads::invalid_thread_id_repr) ? + threads::wait_timeout : reason; + } + + threads::thread_state_ex_enum condition_variable::wait_until( + boost::unique_lock&& lock, + util::steady_time_point const& abs_time, + char const* description, error_code& ec) + { + HPX_ASSERT(threads::get_self_ptr() != 0); + HPX_ASSERT(lock.owns_lock()); + + // enqueue the request and block this thread + queue_entry f(threads::get_self_id().get(), &queue_); + queue_.push_back(f); + + reset_queue_entry r(f, queue_); + threads::thread_state_ex_enum reason = threads::wait_unknown; + { + // yield this thread + lock.unlock(); + reason = this_thread::suspend(abs_time, description, ec); + if (ec) return threads::wait_unknown; + } + + return (f.id_ != threads::invalid_thread_id_repr) ? + threads::wait_timeout : reason; + } + + template + void condition_variable::abort_all(boost::unique_lock lock) + { + // new threads might have been added while we were notifying + while(!queue_.empty()) + { + // swap the list + queue_type queue; + queue.swap(queue_); + + // update reference to queue for all queue entries + for (queue_entry& qe : queue) + qe.q_ = &queue; + + while (!queue.empty()) + { + threads::thread_id_repr_type id = queue.front().id_; + + queue.front().id_ = threads::invalid_thread_id_repr; + queue.pop_front(); + + if (HPX_UNLIKELY(id == threads::invalid_thread_id_repr)) + { + LERR_(fatal) + << "condition_variable::abort_all:" + << " NULL thread id encountered"; + continue; + } + + // we know that the id is actually the pointer to the thread + threads::thread_id_type tid( + reinterpret_cast(id)); + + LERR_(fatal) + << "condition_variable::abort_all:" + << " pending thread: " + << get_thread_state_name( + threads::get_thread_state(tid)) + << "(" << tid << "): " + << threads::get_thread_description(tid); + + // unlock while notifying thread as this can suspend + util::unlock_guard > unlock(lock); + + // forcefully abort thread, do not throw + error_code ec(lightweight); + threads::set_thread_state(tid, + threads::pending, threads::wait_abort, + threads::thread_priority_default, ec); + if (ec) + { + LERR_(fatal) + << "condition_variable::abort_all:" + << " could not abort thread: " + << get_thread_state_name( + threads::get_thread_state(tid)) + << "(" << tid << "): " + << threads::get_thread_description(tid); + } + } + } + } + + // re-add the remaining items to the original queue + void condition_variable::prepend_entries( + boost::unique_lock& lock, queue_type& queue) + { + HPX_ASSERT(lock.owns_lock()); + + // splice is constant time only if it == end + queue.splice(queue.end(), queue_); + queue_.swap(queue); + } +}}}} diff --git a/src/performance_counters/server/statistics_counter.cpp b/src/performance_counters/server/statistics_counter.cpp index e89a62ebeaf6..5f5fb32e4f69 100644 --- a/src/performance_counters/server/statistics_counter.cpp +++ b/src/performance_counters/server/statistics_counter.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include diff --git a/src/runtime/agas/addressing_service.cpp b/src/runtime/agas/addressing_service.cpp index 6f1f74550c11..7d9ec2f86e01 100644 --- a/src/runtime/agas/addressing_service.cpp +++ b/src/runtime/agas/addressing_service.cpp @@ -33,6 +33,7 @@ #include #include #include +#include #include #include #include diff --git a/src/runtime/agas/server/primary_namespace_server.cpp b/src/runtime/agas/server/primary_namespace_server.cpp index d777ebdf728b..bad50b3e8cb4 100644 --- a/src/runtime/agas/server/primary_namespace_server.cpp +++ b/src/runtime/agas/server/primary_namespace_server.cpp @@ -395,7 +395,7 @@ response primary_namespace::begin_migration( id, hpx::util::make_tuple( false, 0, - boost::make_shared() + boost::make_shared() ) )); #endif diff --git a/src/runtime/agas/server/symbol_namespace_server.cpp b/src/runtime/agas/server/symbol_namespace_server.cpp index 6e7b38c66ac0..d736a6be9099 100644 --- a/src/runtime/agas/server/symbol_namespace_server.cpp +++ b/src/runtime/agas/server/symbol_namespace_server.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include diff --git a/src/runtime/components/console_logging.cpp b/src/runtime/components/console_logging.cpp index 8d96088030dd..7a9708bb381e 100644 --- a/src/runtime/components/console_logging.cpp +++ b/src/runtime/components/console_logging.cpp @@ -16,6 +16,7 @@ #include #include #include +#include #include diff --git a/src/runtime/parcelset/parcelhandler.cpp b/src/runtime/parcelset/parcelhandler.cpp index 067b3bac9930..934c20026c1b 100644 --- a/src/runtime/parcelset/parcelhandler.cpp +++ b/src/runtime/parcelset/parcelhandler.cpp @@ -14,6 +14,7 @@ #include #include #include +#include #include #include #include diff --git a/src/runtime/threads/thread.cpp b/src/runtime/threads/thread.cpp index a7b74df5a243..3736170b8576 100644 --- a/src/runtime/threads/thread.cpp +++ b/src/runtime/threads/thread.cpp @@ -12,6 +12,7 @@ #include #include #include +#include #include diff --git a/src/runtime/threads/thread_data.cpp b/src/runtime/threads/thread_data.cpp index 720e040b202e..a7388746fb46 100644 --- a/src/runtime/threads/thread_data.cpp +++ b/src/runtime/threads/thread_data.cpp @@ -11,6 +11,7 @@ #include #include #include +#include // #if HPX_DEBUG // # define HPX_DEBUG_THREAD_POOL 1 diff --git a/src/runtime/threads/threadmanager.cpp b/src/runtime/threads/threadmanager.cpp index 7894085eb591..7596f1ebb229 100644 --- a/src/runtime/threads/threadmanager.cpp +++ b/src/runtime/threads/threadmanager.cpp @@ -18,7 +18,6 @@ #include #include #include -#include #include #include #include diff --git a/src/util/query_counters.cpp b/src/util/query_counters.cpp index 1386334ced89..39e7030f2c39 100644 --- a/src/util/query_counters.cpp +++ b/src/util/query_counters.cpp @@ -10,6 +10,7 @@ #include #include #include +#include #include #include #include diff --git a/tests/regressions/lcos/ignore_while_locked_1485.cpp b/tests/regressions/lcos/ignore_while_locked_1485.cpp index a73be2a5a37d..0a2f42c932cd 100644 --- a/tests/regressions/lcos/ignore_while_locked_1485.cpp +++ b/tests/regressions/lcos/ignore_while_locked_1485.cpp @@ -14,14 +14,14 @@ struct wait_for_flag { hpx::lcos::local::spinlock mutex; - hpx::lcos::local::condition_variable cond_var; + hpx::lcos::local::condition_variable_any cond_var; wait_for_flag() : flag(false), woken(0) {} void wait(hpx::lcos::local::spinlock& local_mutex, - hpx::lcos::local::condition_variable& local_cond_var, bool& running) + hpx::lcos::local::condition_variable_any& local_cond_var, bool& running) { bool first = true; while (!flag) @@ -54,7 +54,7 @@ void test_condition_with_mutex() bool running = false; hpx::lcos::local::spinlock local_mutex; - hpx::lcos::local::condition_variable local_cond_var; + hpx::lcos::local::condition_variable_any local_cond_var; hpx::thread thread(&wait_for_flag::wait, boost::ref(data), boost::ref(local_mutex), boost::ref(local_cond_var), boost::ref(running)); diff --git a/tests/unit/lcos/apply_local.cpp b/tests/unit/lcos/apply_local.cpp index 2676df06d0f0..67598e749379 100644 --- a/tests/unit/lcos/apply_local.cpp +++ b/tests/unit/lcos/apply_local.cpp @@ -11,7 +11,7 @@ /////////////////////////////////////////////////////////////////////////////// boost::atomic accumulator; -hpx::lcos::local::condition_variable result_cv; +hpx::lcos::local::condition_variable_any result_cv; void increment(boost::int32_t i) { diff --git a/tests/unit/lcos/apply_local_executor.cpp b/tests/unit/lcos/apply_local_executor.cpp index 2b24a52a8115..92724052b666 100644 --- a/tests/unit/lcos/apply_local_executor.cpp +++ b/tests/unit/lcos/apply_local_executor.cpp @@ -12,7 +12,7 @@ /////////////////////////////////////////////////////////////////////////////// boost::atomic accumulator; -hpx::lcos::local::condition_variable result_cv; +hpx::lcos::local::condition_variable_any result_cv; void increment(boost::int32_t i) { diff --git a/tests/unit/lcos/condition_variable.cpp b/tests/unit/lcos/condition_variable.cpp index d24f22e5da47..1c3135f33078 100644 --- a/tests/unit/lcos/condition_variable.cpp +++ b/tests/unit/lcos/condition_variable.cpp @@ -10,7 +10,7 @@ #include #include #include -#include +#include #include #include @@ -21,13 +21,13 @@ namespace { - hpx::lcos::local::spinlock multiple_wake_mutex; + hpx::lcos::local::mutex multiple_wake_mutex; hpx::lcos::local::condition_variable multiple_wake_cond; unsigned multiple_wake_count=0; void wait_for_condvar_and_increase_count() { - boost::unique_lock lk(multiple_wake_mutex); + boost::unique_lock lk(multiple_wake_mutex); multiple_wake_cond.wait(lk); ++multiple_wake_count; } @@ -43,7 +43,7 @@ namespace /////////////////////////////////////////////////////////////////////////////// struct wait_for_flag { - hpx::lcos::local::spinlock mutex; + hpx::lcos::local::mutex mutex; hpx::lcos::local::condition_variable cond_var; bool flag; unsigned woken; @@ -71,7 +71,7 @@ struct wait_for_flag void wait_without_predicate() { - boost::unique_lock lock(mutex); + boost::unique_lock lock(mutex); while(!flag) { cond_var.wait(lock); @@ -81,7 +81,7 @@ struct wait_for_flag void wait_with_predicate() { - boost::unique_lock lock(mutex); + boost::unique_lock lock(mutex); cond_var.wait(lock,check_flag(flag)); if(flag) { @@ -94,7 +94,7 @@ struct wait_for_flag boost::chrono::system_clock::time_point const timeout = boost::chrono::system_clock::now() + boost::chrono::milliseconds(5); - boost::unique_lock lock(mutex); + boost::unique_lock lock(mutex); while(!flag) { if(cond_var.wait_until(lock,timeout) == hpx::lcos::local::cv_status::timeout) @@ -110,7 +110,7 @@ struct wait_for_flag boost::chrono::system_clock::time_point const timeout = boost::chrono::system_clock::now() + boost::chrono::milliseconds(5); - boost::unique_lock lock(mutex); + boost::unique_lock lock(mutex); if(cond_var.wait_until(lock,timeout,check_flag(flag)) && flag) { ++woken; @@ -118,7 +118,7 @@ struct wait_for_flag } void relative_wait_until_with_predicate() { - boost::unique_lock lock(mutex); + boost::unique_lock lock(mutex); if(cond_var.wait_for(lock,boost::chrono::milliseconds(5), check_flag(flag)) && flag) { @@ -134,7 +134,7 @@ void test_condition_notify_one_wakes_from_wait() hpx::thread thread(&wait_for_flag::wait_without_predicate, boost::ref(data)); { - boost::unique_lock lock(data.mutex); + boost::unique_lock lock(data.mutex); data.flag=true; } @@ -151,7 +151,7 @@ void test_condition_notify_one_wakes_from_wait_with_predicate() hpx::thread thread(&wait_for_flag::wait_with_predicate, boost::ref(data)); { - boost::unique_lock lock(data.mutex); + boost::unique_lock lock(data.mutex); data.flag=true; } @@ -168,7 +168,7 @@ void test_condition_notify_one_wakes_from_wait_until() hpx::thread thread(&wait_for_flag::wait_until_without_predicate, boost::ref(data)); { - boost::unique_lock lock(data.mutex); + boost::unique_lock lock(data.mutex); data.flag=true; } @@ -185,7 +185,7 @@ void test_condition_notify_one_wakes_from_wait_until_with_predicate() hpx::thread thread(&wait_for_flag::wait_until_with_predicate, boost::ref(data)); { - boost::unique_lock lock(data.mutex); + boost::unique_lock lock(data.mutex); data.flag=true; } @@ -203,7 +203,7 @@ void test_condition_notify_one_wakes_from_relative_wait_until_with_predicate() boost::ref(data)); { - boost::unique_lock lock(data.mutex); + boost::unique_lock lock(data.mutex); data.flag=true; } @@ -231,7 +231,7 @@ void test_multiple_notify_one_calls_wakes_multiple_threads() hpx::this_thread::sleep_for(boost::chrono::milliseconds(200)); { - boost::unique_lock lk(multiple_wake_mutex); + boost::unique_lock lk(multiple_wake_mutex); HPX_TEST(multiple_wake_count==3); } @@ -257,7 +257,7 @@ void test_condition_notify_all_wakes_from_wait() } { - boost::unique_lock lock(data.mutex); + boost::unique_lock lock(data.mutex); data.flag=true; } @@ -288,7 +288,7 @@ void test_condition_notify_all_wakes_from_wait_with_predicate() } { - boost::unique_lock lock(data.mutex); + boost::unique_lock lock(data.mutex); data.flag=true; } @@ -319,7 +319,7 @@ void test_condition_notify_all_wakes_from_wait_until() } { - boost::unique_lock lock(data.mutex); + boost::unique_lock lock(data.mutex); data.flag=true; } @@ -350,7 +350,7 @@ void test_condition_notify_all_wakes_from_wait_until_with_predicate() } { - boost::unique_lock lock(data.mutex); + boost::unique_lock lock(data.mutex); data.flag=true; } @@ -381,7 +381,7 @@ void test_condition_notify_all_wakes_from_relative_wait_until_with_predicate() } { - boost::unique_lock lock(data.mutex); + boost::unique_lock lock(data.mutex); data.flag=true; } @@ -415,7 +415,7 @@ void test_notify_all_following_notify_one_wakes_all_threads() hpx::this_thread::sleep_for(boost::chrono::milliseconds(200)); { - boost::unique_lock lk(multiple_wake_mutex); + boost::unique_lock lk(multiple_wake_mutex); HPX_TEST(multiple_wake_count==3); } @@ -429,7 +429,7 @@ struct condition_test_data { condition_test_data() : notified(0), awoken(0) { } - hpx::lcos::local::spinlock mutex; + hpx::lcos::local::mutex mutex; hpx::lcos::local::condition_variable condition; int notified; int awoken; @@ -437,7 +437,7 @@ struct condition_test_data void condition_test_thread(condition_test_data* data) { - boost::unique_lock lock(data->mutex); + boost::unique_lock lock(data->mutex); HPX_TEST(lock ? true : false); while (!(data->notified > 0)) data->condition.wait(lock); @@ -460,7 +460,7 @@ struct cond_predicate void condition_test_waits(condition_test_data* data) { - boost::unique_lock lock(data->mutex); + boost::unique_lock lock(data->mutex); HPX_TEST(lock ? true : false); // Test wait. @@ -512,7 +512,7 @@ void condition_test_waits(condition_test_data* data) void test_condition_waits() { - typedef boost::unique_lock unique_lock; + typedef boost::unique_lock unique_lock; condition_test_data data; @@ -601,9 +601,9 @@ boost::chrono::milliseconds const timeout_resolution(100); void test_wait_until_times_out() { hpx::lcos::local::condition_variable cond; - hpx::lcos::local::spinlock m; + hpx::lcos::local::mutex m; - boost::unique_lock lock(m); + boost::unique_lock lock(m); boost::chrono::system_clock::time_point const start = boost::chrono::system_clock::now(); boost::chrono::system_clock::time_point const timeout = start + delay; @@ -618,9 +618,9 @@ void test_wait_until_times_out() void test_wait_until_with_predicate_times_out() { hpx::lcos::local::condition_variable cond; - hpx::lcos::local::spinlock m; + hpx::lcos::local::mutex m; - boost::unique_lock lock(m); + boost::unique_lock lock(m); boost::chrono::system_clock::time_point const start = boost::chrono::system_clock::now(); boost::chrono::system_clock::time_point const timeout = start + delay; @@ -636,9 +636,9 @@ void test_wait_until_with_predicate_times_out() void test_relative_wait_until_with_predicate_times_out() { hpx::lcos::local::condition_variable cond; - hpx::lcos::local::spinlock m; + hpx::lcos::local::mutex m; - boost::unique_lock lock(m); + boost::unique_lock lock(m); boost::chrono::system_clock::time_point const start = boost::chrono::system_clock::now(); @@ -653,9 +653,9 @@ void test_relative_wait_until_with_predicate_times_out() void test_wait_until_relative_times_out() { hpx::lcos::local::condition_variable cond; - hpx::lcos::local::spinlock m; + hpx::lcos::local::mutex m; - boost::unique_lock lock(m); + boost::unique_lock lock(m); boost::chrono::system_clock::time_point const start = boost::chrono::system_clock::now(); diff --git a/tests/unit/lcos/local_mutex.cpp b/tests/unit/lcos/local_mutex.cpp index c53503b25fe4..3cf38bbf4df9 100644 --- a/tests/unit/lcos/local_mutex.cpp +++ b/tests/unit/lcos/local_mutex.cpp @@ -29,7 +29,7 @@ struct test_lock void operator()() { mutex_type mutex; - hpx::lcos::local::condition_variable condition; + hpx::lcos::local::condition_variable_any condition; // Test the lock's constructors. { @@ -67,7 +67,7 @@ struct test_trylock void operator()() { mutex_type mutex; - hpx::lcos::local::condition_variable condition; + hpx::lcos::local::condition_variable_any condition; // Test the lock's constructors. { @@ -113,7 +113,7 @@ struct test_lock_times_out_if_other_thread_has_lock hpx::lcos::local::mutex done_mutex; bool done; bool locked; - hpx::lcos::local::condition_variable done_cond; + hpx::lcos::local::condition_variable_any done_cond; test_lock_times_out_if_other_thread_has_lock(): done(false),locked(false) @@ -201,7 +201,7 @@ struct test_timedlock test_lock_times_out_if_other_thread_has_lock()(); mutex_type mutex; - hpx::lcos::local::condition_variable condition; + hpx::lcos::local::condition_variable_any condition; // Test the lock's constructors. {