Skip to content

Commit

Permalink
Added priority-queue lifo scheduler
Browse files Browse the repository at this point in the history
- flyby: removed unneeded #includes
- flyby: removed unneeded type definitions
  • Loading branch information
hkaiser committed Feb 5, 2017
1 parent 9420de8 commit be8c54b
Show file tree
Hide file tree
Showing 12 changed files with 69 additions and 48 deletions.
5 changes: 3 additions & 2 deletions docs/manual/commandline.qbk
Expand Up @@ -85,8 +85,9 @@ described in the table below:
[[`--hpx:print-bind`] [print to the console the bit masks calculated from the
arguments specified to all `--hpx:bind` options.]]
[[`--hpx:queuing arg`] [the queue scheduling policy to use, options are
'local/l', 'local-priority/lo', 'abp/a', 'abp-priority',
'hierarchy/h', and 'periodic/pe' (default: local-priority/lo)]]
'local/l', 'local-priority-fifo/lo', 'local-priority-lifo', 'abp/a',
'abp-priority', 'hierarchy/h', and 'periodic/pe'
(default: local-priority-fifo/lo)]]
[[`--hpx:hierarchy-arity`] [the arity of the of the thread queue tree, valid for
`--hpx:queuing=hierarchy` only (default: 2)]]
[[`--hpx:high-priority-threads arg`] [the number of operating system threads
Expand Down
7 changes: 6 additions & 1 deletion docs/manual/scheduling_policies.qbk
Expand Up @@ -19,7 +19,7 @@ information).

[heading Priority Local Scheduling Policy (default policy)]

* default or invoke using: [hpx_cmdline `--hpx:queuing=local-priority`] (or `-qpr`)
* default or invoke using: [hpx_cmdline `--hpx:queuing=local-priority-fifo`] (or `-qlo`)

The priority local scheduling policy maintains one queue per operating system
(OS) thread. The OS thread pulls its work from this queue. By default the number
Expand All @@ -38,6 +38,11 @@ same NUMA domain first, only after that work is stolen from other NUMA domains.

This scheduler is enabled at build time by default and will be available always.

This scheduler can be used with two underlying queueing policies (FIFO:
first-in-first-out, and LIFO: last-in-first-out). The default is FIFO. In order
to use the LIFO policiy use the command line option
[hpx_cmdline `--hpx:queuing=local-priority-lifo`].

[heading Static Priority Scheduling Policy]

* invoke using: [hpx_cmdline `--hpx:queuing=static-priority`] (or `-qs`)
Expand Down
Expand Up @@ -934,7 +934,6 @@ namespace hpx { namespace threads { namespace policies
}
}


#ifdef HPX_HAVE_THREAD_MINIMAL_DEADLOCK_DETECTION
// no new work is available, are we deadlocked?
if (HPX_UNLIKELY(minimal_deadlock_detection && LHPX_ENABLED(error)))
Expand Down Expand Up @@ -1117,7 +1116,6 @@ namespace hpx { namespace threads { namespace policies
}

protected:
boost::mutex void_mtx_;
std::size_t max_queue_thread_count_;
std::vector<thread_queue_type*> queues_;
std::vector<thread_queue_type*> high_priority_queues_;
Expand Down
2 changes: 0 additions & 2 deletions hpx/runtime/threads/policies/static_queue_scheduler.hpp
Expand Up @@ -17,8 +17,6 @@
#include <hpx/runtime/threads_fwd.hpp>
#include <hpx/util/logging.hpp>

#include <boost/atomic.hpp>

#include <cstddef>
#include <cstdint>
#include <memory>
Expand Down
22 changes: 0 additions & 22 deletions hpx/runtime/threads_fwd.hpp
Expand Up @@ -94,28 +94,6 @@ namespace hpx
class HPX_EXPORT hierarchy_scheduler;
#endif

typedef local_priority_queue_scheduler<
boost::mutex,
lockfree_fifo, // FIFO pending queuing
lockfree_fifo, // FIFO staged queuing
lockfree_lifo // LIFO terminated queuing
> fifo_priority_queue_scheduler;

#if defined(HPX_HAVE_ABP_SCHEDULER)
struct lockfree_abp_fifo;
struct lockfree_abp_lifo;

typedef local_priority_queue_scheduler<
boost::mutex,
lockfree_abp_fifo, // FIFO + ABP pending queuing
lockfree_abp_fifo, // FIFO + ABP staged queuing
lockfree_lifo // LIFO terminated queuing
> abp_fifo_priority_queue_scheduler;
#endif

// define the default scheduler to use
typedef fifo_priority_queue_scheduler queue_scheduler;

class HPX_EXPORT callback_notifier;
}

Expand Down
36 changes: 26 additions & 10 deletions src/hpx_init.cpp
@@ -1,4 +1,4 @@
// Copyright (c) 2007-2013 Hartmut Kaiser
// Copyright (c) 2007-2017 Hartmut Kaiser
// Copyright (c) 2010-2011 Phillip LeBlanc, Dylan Stark
// Copyright (c) 2011 Bryce Lelbach
//
Expand Down Expand Up @@ -899,6 +899,7 @@ namespace hpx
///////////////////////////////////////////////////////////////////////
// local scheduler with priority queue (one queue for each OS threads
// plus one separate queue for high priority HPX-threads)
template <typename Queuing>
int run_priority_local(startup_function_type startup,
shutdown_function_type shutdown,
util::command_line_handling& cfg, bool blocking)
Expand All @@ -915,8 +916,10 @@ namespace hpx
get_affinity_description(cfg, affinity_desc);

// scheduling policy
typedef hpx::threads::policies:: local_priority_queue_scheduler<>
local_queue_policy;
typedef hpx::threads::policies::local_priority_queue_scheduler<
boost::mutex, Queuing
> local_queue_policy;

local_queue_policy::init_parameter_type init(
cfg.num_threads_, num_high_priority_queues, 1000,
numa_sensitive, "core-local_priority_queue_scheduler");
Expand All @@ -934,7 +937,7 @@ namespace hpx
}

///////////////////////////////////////////////////////////////////////
// priority abp scheduler: local priority deques for each OS thread,
// priority abp scheduler: local priority dequeues for each OS thread,
// with work stealing from the "bottom" of each.
int run_priority_abp(startup_function_type startup,
shutdown_function_type shutdown,
Expand All @@ -948,8 +951,10 @@ namespace hpx
get_num_high_priority_queues(cfg);

// scheduling policy
typedef hpx::threads::policies::abp_fifo_priority_queue_scheduler
abp_priority_queue_policy;
typedef hpx::threads::policies::local_priority_queue_scheduler<
boost::mutex, hpx::threads::policies::lockfree_fifo
> abp_priority_queue_policy;

abp_priority_queue_policy::init_parameter_type init(
cfg.num_threads_, num_high_priority_queues, 1000,
cfg.numa_sensitive_, "core-abp_fifo_priority_queue_scheduler");
Expand Down Expand Up @@ -1108,14 +1113,25 @@ namespace hpx
result = run_static(std::move(startup), std::move(shutdown),
cfg, blocking);
}
else if (0 == std::string("local-priority").find(cfg.queuing_))
else if (0 == std::string("local-priority-fifo").find(cfg.queuing_))
{
// local scheduler with priority queue (one queue for each
// OS thread plus separate dequeues for low/high priority
/// HPX-threads)
cfg.queuing_ = "local-priority";
result = run_priority_local(std::move(startup),
std::move(shutdown), cfg, blocking);
cfg.queuing_ = "local-priority-fifo";
result = run_priority_local<
hpx::threads::policies::lockfree_fifo
>(std::move(startup), std::move(shutdown), cfg, blocking);
}
else if (0 == std::string("local-priority-lifo").find(cfg.queuing_))
{
// local scheduler with priority queue (one queue for each
// OS thread plus separate dequeues for low/high priority
/// HPX-threads)
cfg.queuing_ = "local-priority-lifo";
result = run_priority_local<
hpx::threads::policies::lockfree_lifo
>(std::move(startup), std::move(shutdown), cfg, blocking);
}
else if (0 == std::string("static-priority").find(cfg.queuing_))
{
Expand Down
12 changes: 10 additions & 2 deletions src/runtime/threads/detail/thread_pool.cpp
Expand Up @@ -1434,11 +1434,19 @@ template class HPX_EXPORT hpx::threads::detail::thread_pool<

#include <hpx/runtime/threads/policies/local_priority_queue_scheduler.hpp>
template class HPX_EXPORT hpx::threads::detail::thread_pool<
hpx::threads::policies::local_priority_queue_scheduler<> >;
hpx::threads::policies::local_priority_queue_scheduler<
boost::mutex, hpx::threads::policies::lockfree_fifo
> >;
template class HPX_EXPORT hpx::threads::detail::thread_pool<
hpx::threads::policies::local_priority_queue_scheduler<
boost::mutex, hpx::threads::policies::lockfree_lifo
> >;

#if defined(HPX_HAVE_ABP_SCHEDULER)
template class HPX_EXPORT hpx::threads::detail::thread_pool<
hpx::threads::policies::abp_fifo_priority_queue_scheduler>;
hpx::threads::policies::local_priority_queue_scheduler<
boost::mutex, hpx::threads::policies::lockfree_abp_fifo
> >;
#endif

#if defined(HPX_HAVE_HIERARCHY_SCHEDULER)
Expand Down
12 changes: 10 additions & 2 deletions src/runtime/threads/threadmanager.cpp
Expand Up @@ -1313,11 +1313,19 @@ template class HPX_EXPORT hpx::threads::threadmanager_impl<

#include <hpx/runtime/threads/policies/local_priority_queue_scheduler.hpp>
template class HPX_EXPORT hpx::threads::threadmanager_impl<
hpx::threads::policies::local_priority_queue_scheduler<> >;
hpx::threads::policies::local_priority_queue_scheduler<
boost::mutex, hpx::threads::policies::lockfree_fifo
> >;
template class HPX_EXPORT hpx::threads::threadmanager_impl<
hpx::threads::policies::local_priority_queue_scheduler<
boost::mutex, hpx::threads::policies::lockfree_lifo
> >;

#if defined(HPX_HAVE_ABP_SCHEDULER)
template class HPX_EXPORT hpx::threads::threadmanager_impl<
hpx::threads::policies::abp_fifo_priority_queue_scheduler>;
hpx::threads::policies::local_priority_queue_scheduler<
boost::mutex, hpx::threads::policies::lockfree_abp_fifo
> >;
#endif

#if defined(HPX_HAVE_HIERARCHY_SCHEDULER)
Expand Down
12 changes: 10 additions & 2 deletions src/runtime_impl.cpp
Expand Up @@ -875,11 +875,19 @@ template class HPX_EXPORT hpx::runtime_impl<

#include <hpx/runtime/threads/policies/local_priority_queue_scheduler.hpp>
template class HPX_EXPORT hpx::runtime_impl<
hpx::threads::policies::local_priority_queue_scheduler<> >;
hpx::threads::policies::local_priority_queue_scheduler<
boost::mutex, hpx::threads::policies::lockfree_fifo
> >;
template class HPX_EXPORT hpx::runtime_impl<
hpx::threads::policies::local_priority_queue_scheduler<
boost::mutex, hpx::threads::policies::lockfree_lifo
> >;

#if defined(HPX_HAVE_ABP_SCHEDULER)
template class HPX_EXPORT hpx::runtime_impl<
hpx::threads::policies::abp_fifo_priority_queue_scheduler>;
hpx::threads::policies::local_priority_queue_scheduler<
boost::mutex, hpx::threads::policies::lockfree_abp_fifo
> >;
#endif

#if defined(HPX_HAVE_HIERARCHY_SCHEDULER)
Expand Down
2 changes: 1 addition & 1 deletion src/util/command_line_handling.cpp
Expand Up @@ -674,7 +674,7 @@ namespace hpx { namespace util
}

// handle setting related to schedulers
queuing_ = detail::handle_queueing(cfgmap, vm, "local-priority");
queuing_ = detail::handle_queueing(cfgmap, vm, "local-priority-fifo");
ini_config += "hpx.scheduler=" + queuing_;

affinity_domain_ = detail::handle_affinity(cfgmap, vm, "pu");
Expand Down
3 changes: 2 additions & 1 deletion src/util/parse_command_line.cpp
Expand Up @@ -451,7 +451,8 @@ namespace hpx { namespace util
"the number of total cores in the system)")
("hpx:queuing", value<std::string>(),
"the queue scheduling policy to use, options are "
"'local', 'local-priority', 'abp-priority', "
"'local', 'local-priority-fifo','local-priority-lifo', "
"'abp-priority', "
"'hierarchy', 'static', 'static-priority', and "
"'periodic-priority' (default: 'local-priority'; "
"all option values can be abbreviated)")
Expand Down
2 changes: 1 addition & 1 deletion src/util/runtime_configuration.cpp
Expand Up @@ -181,7 +181,7 @@ namespace hpx { namespace util
"localities = 1",
"first_pu = 0",
"runtime_mode = console",
"scheduler = local-priority",
"scheduler = local-priority-fifo",
"affinity = pu",
"pu_step = 1",
"pu_offset = 0",
Expand Down

0 comments on commit be8c54b

Please sign in to comment.