Skip to content

Commit

Permalink
Adding non-blocking (on destruction) service executors/future_then_as…
Browse files Browse the repository at this point in the history
…ync_executor.cpp

- implement run_as_os_thread in terms of non-blocking io_pool executor
  • Loading branch information
hkaiser committed Nov 6, 2017
1 parent f7b4663 commit 0c40763
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 33 deletions.
68 changes: 66 additions & 2 deletions hpx/parallel/executors/service_executors.hpp
Expand Up @@ -32,34 +32,98 @@ namespace hpx { namespace parallel { namespace execution
/// one of the OS-threads dedicated for the IO thread pool. The
/// tasks will not run as HPX-threads.
///
/// \note The destructor of this executor will block waiting for all
/// scheduled tasks to finish running.
///
using io_pool_executor = threads::executors::io_pool_executor;

/// A \a io_pool_executor exposes the predefined HPX parcel thread pool
/// A \a io_pool_executor_non_blocking exposes the predefined HPX IO thread
/// pool through an executor interface.
///
/// \note All tasks executed by one of these executors will run on
/// one of the OS-threads dedicated for the IO thread pool. The
/// tasks will not run as HPX-threads.
///
/// \note The destructor of this executor will not block waiting for all
/// scheduled tasks to finish running.
///
using io_pool_executor_non_blocking =
threads::executors::io_pool_executor_non_blocking;

/// A \a parcel_pool_executor exposes the predefined HPX parcel thread pool
/// through an executor interface.
///
/// \note All tasks executed by one of these executors will run on
/// one of the OS-threads dedicated for the parcel thread pool. The
/// tasks will not run as HPX-threads.
///
/// \note The destructor of this executor will block waiting for all
/// scheduled tasks to finish running.
///
using parcel_pool_executor = threads::executors::parcel_pool_executor;

/// A \a parcel_pool_executor_non_blocking exposes the predefined HPX
/// parcel thread pool through an executor interface.
///
/// \note All tasks executed by one of these executors will run on
/// one of the OS-threads dedicated for the parcel thread pool. The
/// tasks will not run as HPX-threads.
///
/// \note The destructor of this executor will not block waiting for all
/// scheduled tasks to finish running.
///
using parcel_pool_executor_non_blocking =
threads::executors::parcel_pool_executor_non_blocking;

/// A \a io_pool_executor exposes the predefined HPX timer thread pool
/// through an executor interface.
///
/// \note All tasks executed by one of these executors will run on
/// one of the OS-threads dedicated for the timer thread pool. The
/// tasks will not run as HPX-threads.
///
/// \note The destructor of this executor will block waiting for all
/// scheduled tasks to finish running.
///
using timer_pool_executor = threads::executors::timer_pool_executor;

/// A \a io_pool_executor exposes the predefined HPX main thread pool
/// A \a timer_pool_executor_non_blocking exposes the predefined HPX timer
/// thread pool through an executor interface.
///
/// \note All tasks executed by one of these executors will run on
/// one of the OS-threads dedicated for the timer thread pool. The
/// tasks will not run as HPX-threads.
///
/// \note The destructor of this executor will not block waiting for all
/// scheduled tasks to finish running.
///
using timer_pool_executor_non_blocking =
threads::executors::timer_pool_executor_non_blocking;

/// A \a main_pool_executor exposes the predefined HPX main thread pool
/// through an executor interface.
///
/// \note All tasks executed by one of these executors will run on
/// one of the OS-threads dedicated for the main thread pool. The
/// tasks will not run as HPX-threads.
///
/// \note The destructor of this executor will block waiting for all
/// scheduled tasks to finish running.
///
using main_pool_executor = threads::executors::main_pool_executor;

/// A \a main_pool_executor_non_blocking exposes the predefined HPX main
/// thread pool through an executor interface.
///
/// \note All tasks executed by one of these executors will run on
/// one of the OS-threads dedicated for the main thread pool. The
/// tasks will not run as HPX-threads.
///
/// \note The destructor of this executor will not block waiting for all
/// scheduled tasks to finish running.
///
using main_pool_executor_non_blocking =
threads::executors::main_pool_executor_non_blocking;
}}}

#if defined(HPX_HAVE_EXECUTOR_COMPATIBILITY)
Expand Down
59 changes: 49 additions & 10 deletions hpx/runtime/threads/executors/service_executors.hpp
Expand Up @@ -34,7 +34,7 @@ namespace hpx { namespace threads { namespace executors
{
public:
service_executor(char const* pool_name,
char const* pool_name_suffix = "");
char const* pool_name_suffix = "", bool blocking = true);
~service_executor();

// Schedule the specified function for execution in this executor.
Expand Down Expand Up @@ -79,6 +79,7 @@ namespace hpx { namespace threads { namespace executors
mutex_type mtx_;
hpx::util::atomic_count task_count_;
compat::condition_variable shutdown_cv_;
bool blocking_;
};
}

Expand All @@ -104,24 +105,28 @@ namespace hpx { namespace threads { namespace executors
/// \cond NOINTERNAL
inline threads::detail::scheduled_executor_base*
get_service_executor(service_executor_type t,
char const* name_suffix = "")
char const* name_suffix = "", bool blocking = true)
{
switch(t)
{
case service_executor_type::io_thread_pool:
return new detail::service_executor("io-pool");
return new detail::service_executor(
"io-pool", name_suffix, blocking);

case service_executor_type::parcel_thread_pool:
{
char const* suffix = *name_suffix ? name_suffix : "-tcp";
return new detail::service_executor("parcel-pool", suffix);
return new detail::service_executor(
"parcel-pool", suffix, blocking);
}

case service_executor_type::timer_thread_pool:
return new detail::service_executor("timer-pool");
return new detail::service_executor(
"timer-pool", name_suffix, blocking);

case service_executor_type::main_thread:
return new detail::service_executor("main-pool");
return new detail::service_executor(
"main-pool", name_suffix, blocking);

default:
break;
Expand All @@ -138,9 +143,10 @@ namespace hpx { namespace threads { namespace executors
///////////////////////////////////////////////////////////////////////////
struct service_executor : public scheduled_executor
{
service_executor(service_executor_type t,
char const* name_suffix = "")
: scheduled_executor(detail::get_service_executor(t, name_suffix))
service_executor(service_executor_type t, char const* name_suffix = "",
bool blocking = true)
: scheduled_executor(
detail::get_service_executor(t, name_suffix, blocking))
{}
};

Expand Down Expand Up @@ -175,8 +181,41 @@ namespace hpx { namespace threads { namespace executors
service_executor_type::main_thread))
{}
};

// same as above, just not blocking during destruction
struct io_pool_executor_non_blocking : public scheduled_executor
{
io_pool_executor_non_blocking()
: scheduled_executor(detail::get_service_executor(
service_executor_type::io_thread_pool, "", false))
{}
};

struct parcel_pool_executor_non_blocking : public scheduled_executor
{
parcel_pool_executor_non_blocking(char const* name_suffix = "-tcp")
: scheduled_executor(detail::get_service_executor(
service_executor_type::parcel_thread_pool, name_suffix, false))
{}
};

struct timer_pool_executor_non_blocking : public scheduled_executor
{
timer_pool_executor_non_blocking()
: scheduled_executor(detail::get_service_executor(
service_executor_type::timer_thread_pool, "", false))
{}
};

struct main_pool_executor_non_blocking : public scheduled_executor
{
main_pool_executor_non_blocking()
: scheduled_executor(detail::get_service_executor(
service_executor_type::main_thread, "", false))
{}
};
}}}

#include <hpx/config/warnings_suffix.hpp>

#endif /*HPX_RUNTIME_THREADS_EXECUTORS_SERVICE_EXECUTOR_HPP*/
#endif /* HPX_RUNTIME_THREADS_EXECUTORS_SERVICE_EXECUTOR_HPP */
4 changes: 2 additions & 2 deletions hpx/runtime/threads/run_as_os_thread.hpp
@@ -1,4 +1,4 @@
// Copyright (c) 2016 Hartmut Kaiser
// Copyright (c) 2016-2017 Hartmut Kaiser
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
Expand Down Expand Up @@ -26,7 +26,7 @@ namespace hpx { namespace threads
{
HPX_ASSERT(get_self_ptr() != nullptr);

parallel::execution::io_pool_executor scheduler;
parallel::execution::io_pool_executor_non_blocking scheduler;
return parallel::execution::async_execute(scheduler,
std::forward<F>(f), std::forward<Ts>(vs)...);
}
Expand Down
46 changes: 27 additions & 19 deletions src/runtime/threads/executors/service_executor.cpp
Expand Up @@ -32,9 +32,10 @@ namespace hpx { namespace threads { namespace executors { namespace detail
{
///////////////////////////////////////////////////////////////////////////
service_executor::service_executor(
char const* pool_name, char const* pool_name_suffix)
char const* pool_name, char const* pool_name_suffix, bool blocking)
: pool_(get_thread_pool(pool_name, pool_name_suffix)),
task_count_(0)
task_count_(0),
blocking_(blocking)
{
if (!pool_) {
HPX_THROW_EXCEPTION(bad_parameter,
Expand All @@ -46,14 +47,17 @@ namespace hpx { namespace threads { namespace executors { namespace detail
service_executor::~service_executor()
{
std::unique_lock<mutex_type> l(mtx_);
while (task_count_ > 0)
if (blocking_)
{
// We need to cancel the wait process here, since we might block
// other running HPX threads.
shutdown_cv_.wait_for(l, std::chrono::seconds(1));
if (hpx::threads::get_self_ptr())
while (task_count_ > 0)
{
hpx::this_thread::suspend();
// We need to cancel the wait process here, since we might block
// other running HPX threads.
shutdown_cv_.wait_for(l, std::chrono::seconds(1));
if (hpx::threads::get_self_ptr())
{
hpx::this_thread::suspend();
}
}
}
}
Expand All @@ -79,15 +83,17 @@ namespace hpx { namespace threads { namespace executors { namespace detail
typedef void result_type;

thread_wrapper_helper(
service_executor* exec
, service_executor::closure_type&& f
) : exec_(exec)
service_executor* exec, service_executor::closure_type&& f)
: exec_(exec)
, f_(std::move(f))
{}
{
intrusive_ptr_add_ref(exec);
}

result_type invoke()
{
exec_->thread_wrapper(std::move(f_));
intrusive_ptr_release(exec_);
}

service_executor* exec_;
Expand Down Expand Up @@ -128,19 +134,21 @@ namespace hpx { namespace threads { namespace executors { namespace detail
{
typedef void result_type;

delayed_add_helper(
service_executor* exec
, service_executor::closure_type&& f
, boost::asio::io_service& io_service
, util::steady_clock::time_point const& abs_time
) : exec_(exec)
delayed_add_helper(service_executor* exec,
service_executor::closure_type&& f,
boost::asio::io_service& io_service,
util::steady_clock::time_point const& abs_time)
: exec_(exec)
, f_(std::move(f))
, timer_(io_service, abs_time)
{}
{
intrusive_ptr_add_ref(exec);
}

result_type invoke()
{
exec_->add_no_count(std::move(f_));
intrusive_ptr_release(exec_);
}

service_executor* exec_;
Expand Down
1 change: 1 addition & 0 deletions tests/regressions/threads/CMakeLists.txt
Expand Up @@ -7,6 +7,7 @@
set(tests
block_os_threads_1036
resume_priority
run_as_os_thread_lockup_2991
thread_data_1111
thread_pool_executor_1112
thread_rescheduling
Expand Down
43 changes: 43 additions & 0 deletions tests/regressions/threads/run_as_os_thread_lockup_2991.cpp
@@ -0,0 +1,43 @@
// Copyright (c) 2017 Maciej Brodowicz
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#include <hpx/hpx_init.hpp>
#include <hpx/hpx.hpp>
#include <hpx/runtime/threads/run_as_os_thread.hpp>

#include <iostream>
#include <mutex>
#include <thread>

std::mutex mtx;

void locker()
{
std::cout << std::this_thread::get_id() << ": about to lock mutex\n";
std::lock_guard<std::mutex> lock(mtx);
std::cout << std::this_thread::get_id() << ": mutex locked\n";
}

int hpx_main()
{
{
std::cout << std::this_thread::get_id() << ": about to lock mutex\n";
std::lock_guard<std::mutex> lock(mtx);
std::cout << std::this_thread::get_id() << ": mutex locked\n";

std::cout << std::this_thread::get_id() << ": about to run on io thread\n";
hpx::threads::run_as_os_thread(locker);
//sleep(2);
}
std::cout << std::this_thread::get_id() << ": exiting\n";

return hpx::finalize();
}

int main(int argc, char **argv)
{
return hpx::init(argc, argv);
}

0 comments on commit 0c40763

Please sign in to comment.