Skip to content

Commit

Permalink
Merge pull request #1669 from STEllAR-GROUP/this_thread_executors
Browse files Browse the repository at this point in the history
Add this_thread_executors
  • Loading branch information
hkaiser committed Jul 26, 2015
2 parents f03b321 + 68f2ede commit 19ee001
Show file tree
Hide file tree
Showing 28 changed files with 1,378 additions and 97 deletions.
6 changes: 5 additions & 1 deletion CMakeLists.txt
Expand Up @@ -372,7 +372,7 @@ hpx_option(HPX_WITH_SWAP_CONTEXT_EMULATION BOOL
# Scheduler configuration
################################################################################
hpx_option(HPX_WITH_THREAD_SCHEDULERS STRING
"Which thread schedulers are build. Options are: all, abp-priority, local, static-priority, hierarchy, and periodic-priority. For multiple enabled schedulers, separate with a semicolon (default: all)"
"Which thread schedulers are build. Options are: all, abp-priority, local, static-priority, static, hierarchy, and periodic-priority. For multiple enabled schedulers, separate with a semicolon (default: all)"
"all"
CATEGORY "Thread Manager" ADVANCED)

Expand All @@ -394,6 +394,10 @@ foreach(_scheduler ${HPX_WITH_THREAD_SCHEDULERS_UC})
hpx_add_config_define(HPX_HAVE_STATIC_PRIORITY_SCHEDULER)
set(HPX_HAVE_STATIC_PRIORITY_SCHEDULER ON CACHE INTERNAL "")
endif()
if(_scheduler STREQUAL "STATIC" OR _all)
hpx_add_config_define(HPX_HAVE_STATIC_SCHEDULER)
set(HPX_HAVE_STATIC_SCHEDULER ON CACHE INTERNAL "")
endif()
if(_scheduler STREQUAL "HIERARCHY" OR _all)
hpx_add_config_define(HPX_HAVE_HIERARCHY_SCHEDULER)
set(HPX_HAVE_HIERARCHY_SCHEDULER ON CACHE INTERNAL "")
Expand Down
12 changes: 11 additions & 1 deletion docs/manual/scheduling_policies.qbk
Expand Up @@ -40,7 +40,7 @@ This scheduler is enabled at build time by default and will be available always.

[heading Static Priority Scheduling Policy]

* invoke using: [hpx_cmdline `--hpx:queuing=static`] (or `-qs`)
* invoke using: [hpx_cmdline `--hpx:queuing=static-priority`] (or `-qs`)
* flag to turn on for build: `HPX_THREAD_SCHEDULERS=all` or
`HPX_THREAD_SCHEDULERS=static-priority`

Expand All @@ -57,6 +57,16 @@ robin fashion. There is no thread stealing in this policy.
The local scheduling policy maintains one queue per OS thread from which each
OS thread pulls its tasks (user threads).

[heading Static Scheduling Policy]

* invoke using: [hpx_cmdline `--hpx:queuing=static`]
* flag to turn on for build: `HPX_THREAD_SCHEDULERS=all` or
`HPX_THREAD_SCHEDULERS=static`

The static scheduling policy maintains one queue per OS thread from which each
OS thread pulls its tasks (user threads). Threads are distributed in a round
robin fashion. There is no thread stealing in this policy.

[heading Priority ABP Scheduling Policy]

* invoke using: [hpx_cmdline `--hpx:queuing=abp-priority`]
Expand Down
13 changes: 13 additions & 0 deletions hpx/hpx_fwd.hpp
Expand Up @@ -183,13 +183,15 @@ namespace hpx
struct lockfree_fifo;
struct lockfree_lifo;

// multi priority scheduler with work-stealing
template <typename Mutex = boost::mutex
, typename PendingQueuing = lockfree_fifo
, typename StagedQueuing = lockfree_fifo
, typename TerminatedQueuing = lockfree_lifo
>
class HPX_EXPORT local_priority_queue_scheduler;

// single priority scheduler with work-stealing
template <typename Mutex = boost::mutex
, typename PendingQueuing = lockfree_fifo
, typename StagedQueuing = lockfree_fifo
Expand All @@ -207,6 +209,7 @@ namespace hpx
#endif

#if defined(HPX_HAVE_STATIC_PRIORITY_SCHEDULER)
// multi priority scheduler with no work-stealing
template <typename Mutex = boost::mutex
, typename PendingQueuing = lockfree_fifo
, typename StagedQueuing = lockfree_fifo
Expand All @@ -215,6 +218,16 @@ namespace hpx
class HPX_EXPORT static_priority_queue_scheduler;
#endif

#if defined(HPX_HAVE_STATIC_SCHEDULER)
// single priority scheduler with no work-stealing
template <typename Mutex = boost::mutex
, typename PendingQueuing = lockfree_fifo
, typename StagedQueuing = lockfree_fifo
, typename TerminatedQueuing = lockfree_lifo
>
class HPX_EXPORT static_queue_scheduler;
#endif

#if defined(HPX_HAVE_HIERARCHY_SCHEDULER)
template <typename Mutex = boost::mutex
, typename PendingQueuing = lockfree_fifo
Expand Down
2 changes: 2 additions & 0 deletions hpx/parallel/executors.hpp
Expand Up @@ -10,5 +10,7 @@
#include <hpx/parallel/executors/parallel_executor.hpp>
#include <hpx/parallel/executors/parallel_fork_executor.hpp>
#include <hpx/parallel/executors/sequential_executor.hpp>
#include <hpx/parallel/executors/thread_pool_executors.hpp>
#include <hpx/parallel/executors/this_thread_executors.hpp>

#endif
5 changes: 5 additions & 0 deletions hpx/parallel/executors/detail/thread_executor.hpp
Expand Up @@ -42,6 +42,11 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v3)
return hpx::get_os_thread_count(exec_);
}

bool has_pending_closures() const
{
return exec_.num_pending_closures() != 0;
}

private:
threads::executor exec_;
};
Expand Down
36 changes: 36 additions & 0 deletions hpx/parallel/executors/executor_traits.hpp
Expand Up @@ -312,6 +312,29 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v3)
{
return os_thread_count_helper::call(0, exec);
}

///////////////////////////////////////////////////////////////////////
struct has_pending_closures_helper
{
template <typename Executor>
static auto call(wrap_int, Executor& exec) -> bool
{
return false; // assume stateless scheduling
}

template <typename Executor>
static auto call(int, Executor& exec)
-> decltype(exec.has_pending_closures())
{
return exec.has_pending_closures();
}
};

template <typename Executor>
bool call_has_pending_closures(Executor& exec)
{
return has_pending_closures_helper::call(0, exec);
}
/// \endcond
}

Expand Down Expand Up @@ -520,6 +543,19 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v3)
{
return detail::call_os_thread_count(exec);
}

/// Retrieve whether this executor has operations pending or not.
///
/// \param exec [in] The executor object to use for scheduling of the
/// function \a f.
///
/// \note If the executor does not expose this information, this call
/// will always return \a false
///
static bool has_pending_closures(executor_type& exec)
{
return detail::call_has_pending_closures(exec);
}
};

///////////////////////////////////////////////////////////////////////////
Expand Down
73 changes: 73 additions & 0 deletions hpx/parallel/executors/this_thread_executors.hpp
@@ -0,0 +1,73 @@
// Copyright (c) 2007-2015 Hartmut Kaiser
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

/// \file parallel/executors/thread_pool_executors.hpp

#if !defined(HPX_PARALLEL_EXECUTORS_THIS_THREAD_EXECUTORS_JUL_16_2015_0809PM)
#define HPX_PARALLEL_EXECUTORS_THIS_THREAD_EXECUTORS_JUL_16_2015_0809PM

#include <hpx/config.hpp>
#include <hpx/parallel/config/inline_namespace.hpp>
#include <hpx/parallel/executors/executor_traits.hpp>
#include <hpx/parallel/executors/detail/thread_executor.hpp>
#include <hpx/runtime/threads/executors/this_thread_executors.hpp>
#include <hpx/util/move.hpp>

#include <type_traits>

namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v3)
{
///////////////////////////////////////////////////////////////////////////
#if defined(HPX_HAVE_STATIC_SCHEDULER)
struct this_thread_static_queue_executor
#if !defined(DOXYGEN)
: detail::threads_executor
#endif
{
/// Creates a new local_queue_executor
///
explicit this_thread_static_queue_executor()
: threads_executor(
threads::executors::this_thread_static_queue_executor())
{}
};
#endif

#if defined(HPX_HAVE_STATIC_PRIORITY_SCHEDULER)
struct this_thread_static_priority_queue_executor
#if !defined(DOXYGEN)
: detail::threads_executor
#endif
{
/// Creates a new static_priority_queue_executor
///
explicit this_thread_static_priority_queue_executor()
: threads_executor(
threads::executors::this_thread_static_priority_queue_executor())
{}
};
#endif

namespace detail
{
/// \cond NOINTERNAL
#if defined(HPX_HAVE_STATIC_SCHEDULER)
template <>
struct is_executor<this_thread_static_queue_executor>
: std::true_type
{};
#endif

#if defined(HPX_HAVE_STATIC_PRIORITY_SCHEDULER)
template <>
struct is_executor<this_thread_static_priority_queue_executor>
: std::true_type
{};
#endif
/// \endcond
}
}}}

#endif
29 changes: 29 additions & 0 deletions hpx/parallel/executors/thread_pool_executors.hpp
Expand Up @@ -42,6 +42,28 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v3)
};
#endif

#if defined(HPX_HAVE_STATIC_SCHEDULER)
struct static_queue_executor
#if !defined(DOXYGEN)
: detail::threads_executor
#endif
{
/// Creates a new static_queue_executor
///
/// \param max_punits [in] The maximum number of processing units to
/// associate with the newly created executor.
/// \param min_punits [in] The minimum number of processing units to
/// associate with the newly created executor
/// (default: 1).
///
explicit static_queue_executor(std::size_t max_punits,
std::size_t min_punits = 1)
: threads_executor(threads::executors::static_queue_executor(
max_punits, min_punits))
{}
};
#endif

///////////////////////////////////////////////////////////////////////////
struct local_priority_queue_executor
#if !defined(DOXYGEN)
Expand Down Expand Up @@ -95,6 +117,13 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v3)
{};
#endif

#if defined(HPX_HAVE_STATIC_SCHEDULER)
template <>
struct is_executor<static_queue_executor>
: std::true_type
{};
#endif

template <>
struct is_executor<local_priority_queue_executor>
: std::true_type
Expand Down
39 changes: 32 additions & 7 deletions hpx/runtime/threads/detail/scheduling_loop.hpp
Expand Up @@ -183,11 +183,26 @@ namespace hpx { namespace threads { namespace detail
#endif

///////////////////////////////////////////////////////////////////////////
struct scheduling_counters
{
scheduling_counters(boost::int64_t& executed_threads,
boost::int64_t& executed_thread_phases,
boost::uint64_t& tfunc_time, boost::uint64_t& exec_time)
: executed_threads_(executed_threads),
executed_thread_phases_(executed_thread_phases),
tfunc_time_(tfunc_time),
exec_time_(exec_time)
{}

boost::int64_t& executed_threads_;
boost::int64_t& executed_thread_phases_;
boost::uint64_t& tfunc_time_;
boost::uint64_t& exec_time_;
};

template <typename SchedulingPolicy>
void scheduling_loop(std::size_t num_thread, SchedulingPolicy& scheduler,
boost::atomic<hpx::state>& global_state, boost::int64_t& executed_threads,
boost::int64_t& executed_thread_phases, boost::uint64_t& tfunc_time,
boost::uint64_t& exec_time,
boost::atomic<hpx::state>& global_state, scheduling_counters& counters,
util::function_nonser<void()> const& cb_outer = util::function_nonser<void()>(),
util::function_nonser<void()> const& cb_inner = util::function_nonser<void()>())
{
Expand All @@ -199,7 +214,7 @@ namespace hpx { namespace threads { namespace detail
boost::int64_t idle_loop_count = 0;
boost::int64_t busy_loop_count = 0;

idle_collect_rate idle_rate(tfunc_time, exec_time);
idle_collect_rate idle_rate(counters.tfunc_time_, counters.exec_time_);
tfunc_time_wrapper tfunc_time_collector(idle_rate);

scheduler.SchedulingPolicy::start_periodic_maintenance(global_state);
Expand Down Expand Up @@ -255,7 +270,7 @@ namespace hpx { namespace threads { namespace detail
}

#ifdef HPX_HAVE_THREAD_CUMULATIVE_COUNTS
++executed_thread_phases;
++counters.executed_thread_phases_;
#endif
}
else {
Expand Down Expand Up @@ -323,7 +338,7 @@ namespace hpx { namespace threads { namespace detail
if (state_val == depleted || state_val == terminated)
{
#ifdef HPX_HAVE_THREAD_CUMULATIVE_COUNTS
++executed_threads;
++counters.executed_threads_;
#endif
scheduler.SchedulingPolicy::destroy_thread(thrd, busy_loop_count);
}
Expand All @@ -339,7 +354,14 @@ namespace hpx { namespace threads { namespace detail
// clean up terminated threads one more time before existing
if (scheduler.SchedulingPolicy::cleanup_terminated(true))
{
// keep idling for some time
// if this is an inner scheduler, exit immediately
if (!cb_inner.empty())
{
global_state.store(state_stopped);
break;
}

// otherwise, keep idling for some time
if (!may_exit)
idle_loop_count = 0;
may_exit = true;
Expand Down Expand Up @@ -390,7 +412,10 @@ namespace hpx { namespace threads { namespace detail
if (may_exit)
{
if (scheduler.SchedulingPolicy::cleanup_terminated(true))
{
global_state.store(state_stopped);
break;
}
may_exit = false;
}
else
Expand Down

0 comments on commit 19ee001

Please sign in to comment.