Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding non-blocking (on destruction) service executors #2992

Merged
merged 1 commit into from Nov 11, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 5 additions & 1 deletion hpx/runtime/threads/executors/service_executors.hpp
Expand Up @@ -68,6 +68,9 @@ namespace hpx { namespace threads { namespace executors
void add_no_count(closure_type&& f);
void thread_wrapper(closure_type&& f);

// detaches this object from the underlying thread pool object
void detach();

protected:
// Return the requested policy element
std::size_t get_policy_element(
Expand All @@ -79,6 +82,7 @@ namespace hpx { namespace threads { namespace executors
mutex_type mtx_;
hpx::util::atomic_count task_count_;
compat::condition_variable shutdown_cv_;
bool blocking_;
};
}

Expand Down Expand Up @@ -179,4 +183,4 @@ namespace hpx { namespace threads { namespace executors

#include <hpx/config/warnings_suffix.hpp>

#endif /*HPX_RUNTIME_THREADS_EXECUTORS_SERVICE_EXECUTOR_HPP*/
#endif /* HPX_RUNTIME_THREADS_EXECUTORS_SERVICE_EXECUTOR_HPP */
6 changes: 4 additions & 2 deletions hpx/runtime/threads/run_as_os_thread.hpp
@@ -1,4 +1,4 @@
// Copyright (c) 2016 Hartmut Kaiser
// Copyright (c) 2016-2017 Hartmut Kaiser
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
Expand Down Expand Up @@ -27,8 +27,10 @@ namespace hpx { namespace threads
HPX_ASSERT(get_self_ptr() != nullptr);

parallel::execution::io_pool_executor scheduler;
return parallel::execution::async_execute(scheduler,
auto result = parallel::execution::async_execute(scheduler,
std::forward<F>(f), std::forward<Ts>(vs)...);
scheduler.detach();
return result;
}
}}

Expand Down
10 changes: 10 additions & 0 deletions hpx/runtime/threads/thread_executor.hpp
Expand Up @@ -196,6 +196,11 @@ namespace hpx { namespace threads
return create_id(reinterpret_cast<std::size_t>(this));
}

virtual void detach()
{
// by default, do nothing
}

protected:
static executor_id create_id(std::size_t id)
{
Expand Down Expand Up @@ -429,6 +434,11 @@ namespace hpx { namespace threads
stacksize, ec);
}

void detach()
{
executor_data_->detach();
}

/// Return a reference to the default executor for this process.
static scheduled_executor& default_executor();
};
Expand Down
50 changes: 32 additions & 18 deletions src/runtime/threads/executors/service_executor.cpp
Expand Up @@ -34,7 +34,8 @@ namespace hpx { namespace threads { namespace executors { namespace detail
service_executor::service_executor(
char const* pool_name, char const* pool_name_suffix)
: pool_(get_thread_pool(pool_name, pool_name_suffix)),
task_count_(0)
task_count_(0),
blocking_(true)
{
if (!pool_) {
HPX_THROW_EXCEPTION(bad_parameter,
Expand All @@ -46,18 +47,27 @@ namespace hpx { namespace threads { namespace executors { namespace detail
service_executor::~service_executor()
{
std::unique_lock<mutex_type> l(mtx_);
while (task_count_ > 0)
if (blocking_)
{
// We need to cancel the wait process here, since we might block
// other running HPX threads.
shutdown_cv_.wait_for(l, std::chrono::seconds(1));
if (hpx::threads::get_self_ptr())
while (task_count_ > 0)
{
hpx::this_thread::suspend();
// We need to cancel the wait process here, since we might block
// other running HPX threads.
shutdown_cv_.wait_for(l, std::chrono::seconds(1));
if (hpx::threads::get_self_ptr())
{
hpx::this_thread::suspend();
}
}
}
}

void service_executor::detach()
{
std::unique_lock<mutex_type> l(mtx_);
blocking_ = false;
}

void service_executor::thread_wrapper(closure_type&& f) //-V669
{
f(); // execute the actual thread function
Expand All @@ -79,15 +89,17 @@ namespace hpx { namespace threads { namespace executors { namespace detail
typedef void result_type;

thread_wrapper_helper(
service_executor* exec
, service_executor::closure_type&& f
) : exec_(exec)
service_executor* exec, service_executor::closure_type&& f)
: exec_(exec)
, f_(std::move(f))
{}
{
intrusive_ptr_add_ref(exec);
}

result_type invoke()
{
exec_->thread_wrapper(std::move(f_));
intrusive_ptr_release(exec_);
}

service_executor* exec_;
Expand Down Expand Up @@ -128,19 +140,21 @@ namespace hpx { namespace threads { namespace executors { namespace detail
{
typedef void result_type;

delayed_add_helper(
service_executor* exec
, service_executor::closure_type&& f
, boost::asio::io_service& io_service
, util::steady_clock::time_point const& abs_time
) : exec_(exec)
delayed_add_helper(service_executor* exec,
service_executor::closure_type&& f,
boost::asio::io_service& io_service,
util::steady_clock::time_point const& abs_time)
: exec_(exec)
, f_(std::move(f))
, timer_(io_service, abs_time)
{}
{
intrusive_ptr_add_ref(exec);
}

result_type invoke()
{
exec_->add_no_count(std::move(f_));
intrusive_ptr_release(exec_);
}

service_executor* exec_;
Expand Down
1 change: 1 addition & 0 deletions tests/regressions/threads/CMakeLists.txt
Expand Up @@ -7,6 +7,7 @@
set(tests
block_os_threads_1036
resume_priority
run_as_os_thread_lockup_2991
thread_data_1111
thread_pool_executor_1112
thread_rescheduling
Expand Down
43 changes: 43 additions & 0 deletions tests/regressions/threads/run_as_os_thread_lockup_2991.cpp
@@ -0,0 +1,43 @@
// Copyright (c) 2017 Maciej Brodowicz
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#include <hpx/hpx_init.hpp>
#include <hpx/hpx.hpp>
#include <hpx/runtime/threads/run_as_os_thread.hpp>

#include <iostream>
#include <mutex>
#include <thread>

std::mutex mtx;

void locker()
{
std::cout << std::this_thread::get_id() << ": about to lock mutex\n";
std::lock_guard<std::mutex> lock(mtx);
std::cout << std::this_thread::get_id() << ": mutex locked\n";
}

int hpx_main()
{
{
std::cout << std::this_thread::get_id() << ": about to lock mutex\n";
std::lock_guard<std::mutex> lock(mtx);
std::cout << std::this_thread::get_id() << ": mutex locked\n";

std::cout << std::this_thread::get_id() << ": about to run on io thread\n";
hpx::threads::run_as_os_thread(locker);
//sleep(2);
}
std::cout << std::this_thread::get_id() << ": exiting\n";

return hpx::finalize();
}

int main(int argc, char **argv)
{
return hpx::init(argc, argv);
}