Skip to content

Commit

Permalink
Refactoring thread manager, mainly extracting thread pool
Browse files Browse the repository at this point in the history
  • Loading branch information
hkaiser committed Jun 14, 2015
1 parent a0383fe commit daf0711
Show file tree
Hide file tree
Showing 20 changed files with 1,588 additions and 1,000 deletions.
8 changes: 2 additions & 6 deletions hpx/hpx_fwd.hpp
Expand Up @@ -253,9 +253,7 @@ namespace hpx
class HPX_EXPORT thread_data_base;
class HPX_EXPORT thread_data;

template <
typename SchedulingPolicy,
typename NotificationPolicy = threads::policies::callback_notifier>
template <typename SchedulingPolicy>
class HPX_EXPORT threadmanager_impl;

///////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -504,9 +502,7 @@ namespace hpx
std::size_t dflt);

///////////////////////////////////////////////////////////////////////////
template <
typename SchedulingPolicy,
typename NotificationPolicy = threads::policies::callback_notifier>
template <typename SchedulingPolicy>
class HPX_API_EXPORT runtime_impl;

/// The function \a get_runtime returns a reference to the (thread
Expand Down
4 changes: 2 additions & 2 deletions hpx/runtime.hpp
Expand Up @@ -59,7 +59,7 @@ namespace hpx
int pre_main(runtime_mode);

///////////////////////////////////////////////////////////////////////////
template <typename SchedulingPolicy, typename NotificationPolicy>
template <typename SchedulingPolicy>
class HPX_EXPORT runtime_impl;

#if defined(HPX_HAVE_SECURITY)
Expand All @@ -72,7 +72,7 @@ namespace hpx
class HPX_EXPORT runtime
{
public:

state get_state() const { return state_.load(); }

/// The \a hpx_main_function_type is the default function type usable
Expand Down
96 changes: 96 additions & 0 deletions hpx/runtime/threads/detail/periodic_maintenance.hpp
@@ -0,0 +1,96 @@
// Copyright (c) 2007-2015 Hartmut Kaiser
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#if !defined(HPX_RUNTIME_THREADS_DETAIL_PERIODIC_MAINTENANCE_JAN_11_2015_0626PM)
#define HPX_RUNTIME_THREADS_DETAIL_PERIODIC_MAINTENANCE_JAN_11_2015_0626PM

#include <hpx/config.hpp>
#include <hpx/state.hpp>
#include <hpx/util/io_service_pool.hpp>
#include <hpx/util/bind.hpp>
#include <hpx/util/date_time_chrono.hpp>

#include <boost/cstdint.hpp>
#include <boost/mpl/bool.hpp>
#include <boost/ref.hpp>
#include <boost/asio/basic_deadline_timer.hpp>
#include <boost/atomic.hpp>
#include <boost/chrono/system_clocks.hpp>

namespace hpx { namespace threads { namespace detail
{
///////////////////////////////////////////////////////////////////////////
inline bool is_running_state(hpx::state state)
{
return state == state_running || state == state_suspended;
}

///////////////////////////////////////////////////////////////////////////
template <typename SchedulingPolicy>
inline void periodic_maintenance_handler(SchedulingPolicy& scheduler,
boost::atomic<hpx::state>& global_state, boost::mpl::false_)
{
}

template <typename SchedulingPolicy>
inline void periodic_maintenance_handler(SchedulingPolicy& scheduler,
boost::atomic<hpx::state>& global_state, boost::mpl::true_)
{
bool running = is_running_state(global_state.load());
scheduler.periodic_maintenance(running);

if (running)
{
// create timer firing in correspondence with given time
typedef boost::asio::basic_deadline_timer<
boost::chrono::steady_clock
, util::chrono_traits<boost::chrono::steady_clock>
> deadline_timer;

deadline_timer t(
get_thread_pool("timer-thread")->get_io_service(),
boost::chrono::milliseconds(1000));

void (*handler)(SchedulingPolicy&, boost::atomic<hpx::state>&, boost::mpl::true_) =
&periodic_maintenance_handler<SchedulingPolicy>;

t.async_wait(boost::bind(handler, boost::ref(scheduler),
boost::ref(global_state), boost::mpl::true_()));
}
}

template <typename SchedulingPolicy>
inline void start_periodic_maintenance(SchedulingPolicy&,
boost::atomic<hpx::state>& global_state, boost::mpl::false_)
{
}

template <typename SchedulingPolicy>
inline void start_periodic_maintenance(SchedulingPolicy& scheduler,
boost::atomic<hpx::state>& global_state, boost::mpl::true_)
{
scheduler.periodic_maintenance(is_running_state(global_state.load()));

// create timer firing in correspondence with given time
typedef boost::asio::basic_deadline_timer<
boost::chrono::steady_clock
, util::chrono_traits<boost::chrono::steady_clock>
> deadline_timer;

deadline_timer t (
get_thread_pool("io-thread")->get_io_service(),
boost::chrono::milliseconds(1000));

void (*handler)(SchedulingPolicy&, boost::atomic<hpx::state>&, boost::mpl::true_) =
&periodic_maintenance_handler<SchedulingPolicy>;

t.async_wait(util::bind(handler, boost::ref(scheduler),
boost::ref(global_state), boost::mpl::true_()));
}
}}}

#endif


76 changes: 2 additions & 74 deletions hpx/runtime/threads/detail/scheduling_loop.hpp
Expand Up @@ -9,86 +9,15 @@
#include <hpx/hpx_fwd.hpp>
#include <hpx/state.hpp>
#include <hpx/runtime/threads/thread_data.hpp>
#include <hpx/runtime/threads/detail/periodic_maintenance.hpp>
#include <hpx/runtime/agas/interface.hpp>
#include <hpx/util/itt_notify.hpp>
#include <hpx/util/hardware/timestamp.hpp>

#include <boost/cstdint.hpp>
#include <boost/mpl/bool.hpp>
#include <boost/bind.hpp>
#include <boost/ref.hpp>
#include <boost/asio/basic_deadline_timer.hpp>

namespace hpx { namespace threads { namespace detail
{
inline bool is_running_state(hpx::state state)
{
return state == state_running || state == state_suspended;
}

///////////////////////////////////////////////////////////////////////
template <typename SchedulingPolicy>
inline void periodic_maintenance_handler(SchedulingPolicy& scheduler,
boost::atomic<hpx::state>& global_state, boost::mpl::false_)
{
}

template <typename SchedulingPolicy>
inline void periodic_maintenance_handler(SchedulingPolicy& scheduler,
boost::atomic<hpx::state>& global_state, boost::mpl::true_)
{
bool running = is_running_state(global_state.load());
scheduler.periodic_maintenance(running);

if (running)
{
// create timer firing in correspondence with given time
typedef boost::asio::basic_deadline_timer<
boost::chrono::steady_clock
, util::chrono_traits<boost::chrono::steady_clock>
> deadline_timer;

deadline_timer t(
get_thread_pool("timer-thread")->get_io_service(),
boost::chrono::milliseconds(1000));

void (*handler)(SchedulingPolicy&, boost::atomic<hpx::state>&, boost::mpl::true_) =
&periodic_maintenance_handler<SchedulingPolicy>;

t.async_wait(boost::bind(handler, boost::ref(scheduler),
boost::ref(global_state), boost::mpl::true_()));
}
}

template <typename SchedulingPolicy>
inline void start_periodic_maintenance(SchedulingPolicy&,
boost::atomic<hpx::state>& global_state, boost::mpl::false_)
{
}

template <typename SchedulingPolicy>
inline void start_periodic_maintenance(SchedulingPolicy& scheduler,
boost::atomic<hpx::state>& global_state, boost::mpl::true_)
{
scheduler.periodic_maintenance(is_running_state(global_state.load()));

// create timer firing in correspondence with given time
typedef boost::asio::basic_deadline_timer<
boost::chrono::steady_clock
, util::chrono_traits<boost::chrono::steady_clock>
> deadline_timer;

deadline_timer t (
get_thread_pool("io-thread")->get_io_service(),
boost::chrono::milliseconds(1000));

void (*handler)(SchedulingPolicy&, boost::atomic<hpx::state>&, boost::mpl::true_) =
&periodic_maintenance_handler<SchedulingPolicy>;

t.async_wait(boost::bind(handler, boost::ref(scheduler),
boost::ref(global_state), boost::mpl::true_()));
}

///////////////////////////////////////////////////////////////////////
inline void write_new_state_log_debug(std::size_t num_thread,
thread_data_base* thrd, thread_state_enum state, char const* info)
Expand Down Expand Up @@ -273,8 +202,7 @@ namespace hpx { namespace threads { namespace detail
idle_collect_rate idle_rate(tfunc_time, exec_time);
tfunc_time_wrapper tfunc_time_collector(idle_rate);

typedef typename SchedulingPolicy::has_periodic_maintenance pred;
detail::start_periodic_maintenance(scheduler, global_state, pred());
scheduler.SchedulingPolicy::start_periodic_maintenance(global_state);

// spin for some time after queues have become empty
bool may_exit = false;
Expand Down

0 comments on commit daf0711

Please sign in to comment.