Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add this_thread_executors #1669

Merged
merged 4 commits into from Jul 26, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 5 additions & 1 deletion CMakeLists.txt
Expand Up @@ -371,7 +371,7 @@ hpx_option(HPX_WITH_SWAP_CONTEXT_EMULATION BOOL
# Scheduler configuration
################################################################################
# Declares the user-facing option selecting which thread schedulers get built.
# Defect fixed: help text read "are build" (grammar) — now "are built".
hpx_option(HPX_WITH_THREAD_SCHEDULERS STRING
"Which thread schedulers are built. Options are: all, abp-priority, local, static-priority, static, hierarchy, and periodic-priority. For multiple enabled schedulers, separate with a semicolon (default: all)"
"all"
CATEGORY "Thread Manager" ADVANCED)

Expand All @@ -393,6 +393,10 @@ foreach(_scheduler ${HPX_WITH_THREAD_SCHEDULERS_UC})
hpx_add_config_define(HPX_HAVE_STATIC_PRIORITY_SCHEDULER)
set(HPX_HAVE_STATIC_PRIORITY_SCHEDULER ON CACHE INTERNAL "")
endif()
# Enable the plain static scheduler (selected at runtime via
# --hpx:queuing=static, per docs/manual/scheduling_policies.qbk).
if(_scheduler STREQUAL "STATIC" OR _all)
hpx_add_config_define(HPX_HAVE_STATIC_SCHEDULER)
set(HPX_HAVE_STATIC_SCHEDULER ON CACHE INTERNAL "")
endif()
if(_scheduler STREQUAL "HIERARCHY" OR _all)
hpx_add_config_define(HPX_HAVE_HIERARCHY_SCHEDULER)
set(HPX_HAVE_HIERARCHY_SCHEDULER ON CACHE INTERNAL "")
Expand Down
12 changes: 11 additions & 1 deletion docs/manual/scheduling_policies.qbk
Expand Up @@ -40,7 +40,7 @@ This scheduler is enabled at build time by default and will be available always.

[heading Static Priority Scheduling Policy]

* invoke using: [hpx_cmdline `--hpx:queuing=static`] (or `-qs`)
* invoke using: [hpx_cmdline `--hpx:queuing=static-priority`] (or `-qs`)
* flag to turn on for build: `HPX_THREAD_SCHEDULERS=all` or
`HPX_THREAD_SCHEDULERS=static-priority`

Expand All @@ -57,6 +57,16 @@ robin fashion. There is no thread stealing in this policy.
The local scheduling policy maintains one queue per OS thread from which each
OS thread pulls its tasks (user threads).

[heading Static Scheduling Policy]

* invoke using: [hpx_cmdline `--hpx:queuing=static`]
* flag to turn on for build: `HPX_THREAD_SCHEDULERS=all` or
`HPX_THREAD_SCHEDULERS=static`

The static scheduling policy maintains one queue per OS thread from which each
OS thread pulls its tasks (user threads). Threads are distributed in a
round-robin fashion. There is no thread stealing in this policy.

[heading Priority ABP Scheduling Policy]

* invoke using: [hpx_cmdline `--hpx:queuing=abp-priority`]
Expand Down
13 changes: 13 additions & 0 deletions hpx/hpx_fwd.hpp
Expand Up @@ -183,13 +183,15 @@ namespace hpx
struct lockfree_fifo;
struct lockfree_lifo;

// multi priority scheduler with work-stealing
template <typename Mutex = boost::mutex
, typename PendingQueuing = lockfree_fifo
, typename StagedQueuing = lockfree_fifo
, typename TerminatedQueuing = lockfree_lifo
>
class HPX_EXPORT local_priority_queue_scheduler;

// single priority scheduler with work-stealing
template <typename Mutex = boost::mutex
, typename PendingQueuing = lockfree_fifo
, typename StagedQueuing = lockfree_fifo
Expand All @@ -207,6 +209,7 @@ namespace hpx
#endif

#if defined(HPX_HAVE_STATIC_PRIORITY_SCHEDULER)
// multi priority scheduler with no work-stealing
template <typename Mutex = boost::mutex
, typename PendingQueuing = lockfree_fifo
, typename StagedQueuing = lockfree_fifo
Expand All @@ -215,6 +218,16 @@ namespace hpx
class HPX_EXPORT static_priority_queue_scheduler;
#endif

#if defined(HPX_HAVE_STATIC_SCHEDULER)
// single priority scheduler with no work-stealing
// (forward declaration only; the template defaults mirror the other
// scheduler declarations in this header)
template <typename Mutex = boost::mutex
, typename PendingQueuing = lockfree_fifo
, typename StagedQueuing = lockfree_fifo
, typename TerminatedQueuing = lockfree_lifo
>
class HPX_EXPORT static_queue_scheduler;
#endif

#if defined(HPX_HAVE_HIERARCHY_SCHEDULER)
template <typename Mutex = boost::mutex
, typename PendingQueuing = lockfree_fifo
Expand Down
2 changes: 2 additions & 0 deletions hpx/parallel/executors.hpp
Expand Up @@ -10,5 +10,7 @@
#include <hpx/parallel/executors/parallel_executor.hpp>
#include <hpx/parallel/executors/parallel_fork_executor.hpp>
#include <hpx/parallel/executors/sequential_executor.hpp>
#include <hpx/parallel/executors/thread_pool_executors.hpp>
#include <hpx/parallel/executors/this_thread_executors.hpp>

#endif
5 changes: 5 additions & 0 deletions hpx/parallel/executors/detail/thread_executor.hpp
Expand Up @@ -42,6 +42,11 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v3)
return hpx::get_os_thread_count(exec_);
}

// Reports whether the wrapped thread executor still has closures
// queued which have not been executed yet.
bool has_pending_closures() const
{
    return 0 != exec_.num_pending_closures();
}

private:
threads::executor exec_;
};
Expand Down
36 changes: 36 additions & 0 deletions hpx/parallel/executors/executor_traits.hpp
Expand Up @@ -312,6 +312,29 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v3)
{
return os_thread_count_helper::call(0, exec);
}

///////////////////////////////////////////////////////////////////////
// Dispatches to Executor::has_pending_closures() when such a member
// exists; otherwise selects the fallback, which reports 'false'.
struct has_pending_closures_helper
{
    // fallback overload — chosen only when the member overload is
    // not viable (wrap_int ranks below int in overload resolution)
    template <typename Executor>
    static bool call(wrap_int, Executor&)
    {
        return false; // assume stateless scheduling
    }

    // preferred overload — enabled via expression SFINAE on the
    // executor actually providing has_pending_closures()
    template <typename Executor>
    static auto call(int, Executor& exec)
        -> decltype(exec.has_pending_closures())
    {
        return exec.has_pending_closures();
    }
};

// Queries an executor for pending closures through the dispatch
// helper above; yields false for executors which do not expose
// this information.
template <typename Executor>
bool call_has_pending_closures(Executor& exec)
{
return has_pending_closures_helper::call(0, exec);
}
/// \endcond
}

Expand Down Expand Up @@ -520,6 +543,19 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v3)
{
return detail::call_os_thread_count(exec);
}

/// Retrieve whether this executor has operations pending or not.
///
/// \param exec [in] The executor object to query for pending
///             closures.
///
/// \note If the executor does not expose this information, this call
///       will always return \a false
///
static bool has_pending_closures(executor_type& exec)
{
return detail::call_has_pending_closures(exec);
}
};

///////////////////////////////////////////////////////////////////////////
Expand Down
73 changes: 73 additions & 0 deletions hpx/parallel/executors/this_thread_executors.hpp
@@ -0,0 +1,73 @@
// Copyright (c) 2007-2015 Hartmut Kaiser
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

/// \file parallel/executors/this_thread_executors.hpp

#if !defined(HPX_PARALLEL_EXECUTORS_THIS_THREAD_EXECUTORS_JUL_16_2015_0809PM)
#define HPX_PARALLEL_EXECUTORS_THIS_THREAD_EXECUTORS_JUL_16_2015_0809PM

#include <hpx/config.hpp>
#include <hpx/parallel/config/inline_namespace.hpp>
#include <hpx/parallel/executors/executor_traits.hpp>
#include <hpx/parallel/executors/detail/thread_executor.hpp>
#include <hpx/runtime/threads/executors/this_thread_executors.hpp>
#include <hpx/util/move.hpp>

#include <type_traits>

namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v3)
{
///////////////////////////////////////////////////////////////////////////
#if defined(HPX_HAVE_STATIC_SCHEDULER)
// Parallel executor backed by
// threads::executors::this_thread_static_queue_executor
// (presumably runs scheduled work on the calling thread — name-derived,
// confirm against the threads-layer executor).
struct this_thread_static_queue_executor
#if !defined(DOXYGEN)
: detail::threads_executor
#endif
{
/// Creates a new this_thread_static_queue_executor
///
explicit this_thread_static_queue_executor()
: threads_executor(
threads::executors::this_thread_static_queue_executor())
{}
};
#endif

#if defined(HPX_HAVE_STATIC_PRIORITY_SCHEDULER)
// Parallel executor backed by
// threads::executors::this_thread_static_priority_queue_executor.
struct this_thread_static_priority_queue_executor
#if !defined(DOXYGEN)
: detail::threads_executor
#endif
{
/// Creates a new this_thread_static_priority_queue_executor
///
explicit this_thread_static_priority_queue_executor()
: threads_executor(
threads::executors::this_thread_static_priority_queue_executor())
{}
};
#endif

namespace detail
{
/// \cond NOINTERNAL
// Register the this-thread executors as conforming executor types so
// that executor_traits accepts them.
#if defined(HPX_HAVE_STATIC_SCHEDULER)
template <>
struct is_executor<this_thread_static_queue_executor>
: std::true_type
{};
#endif

#if defined(HPX_HAVE_STATIC_PRIORITY_SCHEDULER)
template <>
struct is_executor<this_thread_static_priority_queue_executor>
: std::true_type
{};
#endif
/// \endcond
}
}}}

#endif
29 changes: 29 additions & 0 deletions hpx/parallel/executors/thread_pool_executors.hpp
Expand Up @@ -42,6 +42,28 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v3)
};
#endif

#if defined(HPX_HAVE_STATIC_SCHEDULER)
// Parallel executor backed by threads::executors::static_queue_executor.
struct static_queue_executor
#if !defined(DOXYGEN)
: detail::threads_executor
#endif
{
/// Creates a new static_queue_executor
///
/// \param max_punits [in] The maximum number of processing units to
/// associate with the newly created executor.
/// \param min_punits [in] The minimum number of processing units to
/// associate with the newly created executor
/// (default: 1).
///
explicit static_queue_executor(std::size_t max_punits,
std::size_t min_punits = 1)
: threads_executor(threads::executors::static_queue_executor(
max_punits, min_punits))
{}
};
#endif

///////////////////////////////////////////////////////////////////////////
struct local_priority_queue_executor
#if !defined(DOXYGEN)
Expand Down Expand Up @@ -95,6 +117,13 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v3)
{};
#endif

#if defined(HPX_HAVE_STATIC_SCHEDULER)
// Register static_queue_executor as a conforming executor type.
template <>
struct is_executor<static_queue_executor>
: std::true_type
{};
#endif

template <>
struct is_executor<local_priority_queue_executor>
: std::true_type
Expand Down
39 changes: 32 additions & 7 deletions hpx/runtime/threads/detail/scheduling_loop.hpp
Expand Up @@ -183,11 +183,26 @@ namespace hpx { namespace threads { namespace detail
#endif

///////////////////////////////////////////////////////////////////////////
// Bundles (by reference) the statistics counters a scheduling loop
// updates, so they travel as a single argument. The referenced
// variables are owned by the caller and must outlive any scheduling
// loop using this object.
struct scheduling_counters
{
scheduling_counters(boost::int64_t& executed_threads,
boost::int64_t& executed_thread_phases,
boost::uint64_t& tfunc_time, boost::uint64_t& exec_time)
: executed_threads_(executed_threads),
executed_thread_phases_(executed_thread_phases),
tfunc_time_(tfunc_time),
exec_time_(exec_time)
{}

boost::int64_t& executed_threads_; // ++ when a thread reaches depleted/terminated
boost::int64_t& executed_thread_phases_; // ++ per executed thread phase
boost::uint64_t& tfunc_time_; // consumed by idle_collect_rate
boost::uint64_t& exec_time_; // consumed by idle_collect_rate
};

template <typename SchedulingPolicy>
void scheduling_loop(std::size_t num_thread, SchedulingPolicy& scheduler,
boost::atomic<hpx::state>& global_state, boost::int64_t& executed_threads,
boost::int64_t& executed_thread_phases, boost::uint64_t& tfunc_time,
boost::uint64_t& exec_time,
boost::atomic<hpx::state>& global_state, scheduling_counters& counters,
util::function_nonser<void()> const& cb_outer = util::function_nonser<void()>(),
util::function_nonser<void()> const& cb_inner = util::function_nonser<void()>())
{
Expand All @@ -199,7 +214,7 @@ namespace hpx { namespace threads { namespace detail
boost::int64_t idle_loop_count = 0;
boost::int64_t busy_loop_count = 0;

idle_collect_rate idle_rate(tfunc_time, exec_time);
idle_collect_rate idle_rate(counters.tfunc_time_, counters.exec_time_);
tfunc_time_wrapper tfunc_time_collector(idle_rate);

scheduler.SchedulingPolicy::start_periodic_maintenance(global_state);
Expand Down Expand Up @@ -255,7 +270,7 @@ namespace hpx { namespace threads { namespace detail
}

#ifdef HPX_HAVE_THREAD_CUMULATIVE_COUNTS
++executed_thread_phases;
++counters.executed_thread_phases_;
#endif
}
else {
Expand Down Expand Up @@ -323,7 +338,7 @@ namespace hpx { namespace threads { namespace detail
if (state_val == depleted || state_val == terminated)
{
#ifdef HPX_HAVE_THREAD_CUMULATIVE_COUNTS
++executed_threads;
++counters.executed_threads_;
#endif
scheduler.SchedulingPolicy::destroy_thread(thrd, busy_loop_count);
}
Expand All @@ -339,7 +354,14 @@ namespace hpx { namespace threads { namespace detail
// clean up terminated threads one more time before existing
if (scheduler.SchedulingPolicy::cleanup_terminated(true))
{
// keep idling for some time
// if this is an inner scheduler, exit immediately
if (!cb_inner.empty())
{
global_state.store(state_stopped);
break;
}

// otherwise, keep idling for some time
if (!may_exit)
idle_loop_count = 0;
may_exit = true;
Expand Down Expand Up @@ -390,7 +412,10 @@ namespace hpx { namespace threads { namespace detail
if (may_exit)
{
if (scheduler.SchedulingPolicy::cleanup_terminated(true))
{
global_state.store(state_stopped);
break;
}
may_exit = false;
}
else
Expand Down