diff --git a/hpx/runtime/threads/executors/service_executors.hpp b/hpx/runtime/threads/executors/service_executors.hpp index 2bb79ba7e39a..fab4d3171c4e 100644 --- a/hpx/runtime/threads/executors/service_executors.hpp +++ b/hpx/runtime/threads/executors/service_executors.hpp @@ -68,6 +68,9 @@ namespace hpx { namespace threads { namespace executors void add_no_count(closure_type&& f); void thread_wrapper(closure_type&& f); + // detaches this object from the underlying thread pool object + void detach(); + protected: // Return the requested policy element std::size_t get_policy_element( @@ -79,6 +82,7 @@ namespace hpx { namespace threads { namespace executors mutex_type mtx_; hpx::util::atomic_count task_count_; compat::condition_variable shutdown_cv_; + bool blocking_; }; } @@ -179,4 +183,4 @@ namespace hpx { namespace threads { namespace executors #include -#endif /*HPX_RUNTIME_THREADS_EXECUTORS_SERVICE_EXECUTOR_HPP*/ +#endif /* HPX_RUNTIME_THREADS_EXECUTORS_SERVICE_EXECUTOR_HPP */ diff --git a/hpx/runtime/threads/run_as_os_thread.hpp b/hpx/runtime/threads/run_as_os_thread.hpp index 6d1559591f9f..c5205217bda1 100644 --- a/hpx/runtime/threads/run_as_os_thread.hpp +++ b/hpx/runtime/threads/run_as_os_thread.hpp @@ -1,4 +1,4 @@ -// Copyright (c) 2016 Hartmut Kaiser +// Copyright (c) 2016-2017 Hartmut Kaiser // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -27,8 +27,10 @@ namespace hpx { namespace threads HPX_ASSERT(get_self_ptr() != nullptr); parallel::execution::io_pool_executor scheduler; - return parallel::execution::async_execute(scheduler, + auto result = parallel::execution::async_execute(scheduler, std::forward(f), std::forward(vs)...); + scheduler.detach(); + return result; } }} diff --git a/hpx/runtime/threads/thread_executor.hpp b/hpx/runtime/threads/thread_executor.hpp index 8e60682608ee..b313372c4a8c 100644 --- a/hpx/runtime/threads/thread_executor.hpp +++ b/hpx/runtime/threads/thread_executor.hpp @@ -196,6 +196,11 @@ namespace hpx { namespace threads return create_id(reinterpret_cast(this)); } + virtual void detach() + { + // by default, do nothing + } + protected: static executor_id create_id(std::size_t id) { @@ -429,6 +434,11 @@ namespace hpx { namespace threads stacksize, ec); } + void detach() + { + executor_data_->detach(); + } + /// Return a reference to the default executor for this process. static scheduled_executor& default_executor(); }; diff --git a/src/runtime/threads/executors/service_executor.cpp b/src/runtime/threads/executors/service_executor.cpp index ab15d9b94b9d..9d271e344676 100644 --- a/src/runtime/threads/executors/service_executor.cpp +++ b/src/runtime/threads/executors/service_executor.cpp @@ -34,7 +34,8 @@ namespace hpx { namespace threads { namespace executors { namespace detail service_executor::service_executor( char const* pool_name, char const* pool_name_suffix) : pool_(get_thread_pool(pool_name, pool_name_suffix)), - task_count_(0) + task_count_(0), + blocking_(true) { if (!pool_) { HPX_THROW_EXCEPTION(bad_parameter, @@ -46,18 +47,27 @@ namespace hpx { namespace threads { namespace executors { namespace detail service_executor::~service_executor() { std::unique_lock l(mtx_); - while (task_count_ > 0) + if (blocking_) { - // We need to cancel the wait process here, since we might block - // other running HPX threads. - shutdown_cv_.wait_for(l, std::chrono::seconds(1)); - if (hpx::threads::get_self_ptr()) + while (task_count_ > 0) { - hpx::this_thread::suspend(); + // We need to cancel the wait process here, since we might block + // other running HPX threads. + shutdown_cv_.wait_for(l, std::chrono::seconds(1)); + if (hpx::threads::get_self_ptr()) + { + hpx::this_thread::suspend(); + } } } } + void service_executor::detach() + { + std::unique_lock l(mtx_); + blocking_ = false; + } + void service_executor::thread_wrapper(closure_type&& f) //-V669 { f(); // execute the actual thread function @@ -79,15 +89,17 @@ namespace hpx { namespace threads { namespace executors { namespace detail typedef void result_type; thread_wrapper_helper( - service_executor* exec - , service_executor::closure_type&& f - ) : exec_(exec) + service_executor* exec, service_executor::closure_type&& f) + : exec_(exec) , f_(std::move(f)) - {} + { + intrusive_ptr_add_ref(exec); + } result_type invoke() { exec_->thread_wrapper(std::move(f_)); + intrusive_ptr_release(exec_); } service_executor* exec_; @@ -128,19 +140,21 @@ namespace hpx { namespace threads { namespace executors { namespace detail { typedef void result_type; - delayed_add_helper( - service_executor* exec - , service_executor::closure_type&& f - , boost::asio::io_service& io_service - , util::steady_clock::time_point const& abs_time - ) : exec_(exec) + delayed_add_helper(service_executor* exec, + service_executor::closure_type&& f, + boost::asio::io_service& io_service, + util::steady_clock::time_point const& abs_time) + : exec_(exec) , f_(std::move(f)) , timer_(io_service, abs_time) - {} + { + intrusive_ptr_add_ref(exec); + } result_type invoke() { exec_->add_no_count(std::move(f_)); + intrusive_ptr_release(exec_); } service_executor* exec_; diff --git a/tests/regressions/threads/CMakeLists.txt b/tests/regressions/threads/CMakeLists.txt index 4e53b5ba8522..5487a4b18828 100644 --- a/tests/regressions/threads/CMakeLists.txt +++ b/tests/regressions/threads/CMakeLists.txt @@ -7,6 +7,7 @@ set(tests block_os_threads_1036 resume_priority + run_as_os_thread_lockup_2991 thread_data_1111 thread_pool_executor_1112 thread_rescheduling diff --git a/tests/regressions/threads/run_as_os_thread_lockup_2991.cpp b/tests/regressions/threads/run_as_os_thread_lockup_2991.cpp new file mode 100644 index 000000000000..7ed9f438eb6e --- /dev/null +++ b/tests/regressions/threads/run_as_os_thread_lockup_2991.cpp @@ -0,0 +1,43 @@ +// Copyright (c) 2017 Maciej Brodowicz +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#include +#include +#include + +#include +#include +#include + +std::mutex mtx; + +void locker() +{ + std::cout << std::this_thread::get_id() << ": about to lock mutex\n"; + std::lock_guard lock(mtx); + std::cout << std::this_thread::get_id() << ": mutex locked\n"; +} + +int hpx_main() +{ + { + std::cout << std::this_thread::get_id() << ": about to lock mutex\n"; + std::lock_guard lock(mtx); + std::cout << std::this_thread::get_id() << ": mutex locked\n"; + + std::cout << std::this_thread::get_id() << ": about to run on io thread\n"; + hpx::threads::run_as_os_thread(locker); + //sleep(2); + } + std::cout << std::this_thread::get_id() << ": exiting\n"; + + return hpx::finalize(); +} + +int main(int argc, char **argv) +{ + return hpx::init(argc, argv); +} +