Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add this_thread_executors #1669

Merged
merged 4 commits into from Jul 26, 2015
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions hpx/parallel/executors.hpp
Expand Up @@ -10,5 +10,7 @@
#include <hpx/parallel/executors/parallel_executor.hpp>
#include <hpx/parallel/executors/parallel_fork_executor.hpp>
#include <hpx/parallel/executors/sequential_executor.hpp>
#include <hpx/parallel/executors/thread_pool_executors.hpp>
#include <hpx/parallel/executors/this_thread_executors.hpp>

#endif
5 changes: 5 additions & 0 deletions hpx/parallel/executors/detail/thread_executor.hpp
Expand Up @@ -42,6 +42,11 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v3)
return hpx::get_os_thread_count(exec_);
}

bool has_pending_closures() const
{
return exec_.num_pending_closures() != 0;
}

private:
threads::executor exec_;
};
Expand Down
36 changes: 36 additions & 0 deletions hpx/parallel/executors/executor_traits.hpp
Expand Up @@ -312,6 +312,29 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v3)
{
return os_thread_count_helper::call(0, exec);
}

///////////////////////////////////////////////////////////////////////
struct has_pending_closures_helper
{
template <typename Executor>
static auto call(wrap_int, Executor& exec) -> bool
{
return hpx::get_os_thread_count();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are the pending closures really the number of OS threads? wouldn't this rather return false for stateless executors?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, that's a bug (copy&paste problem).

}

template <typename Executor>
static auto call(int, Executor& exec)
-> decltype(exec.has_pending_closures())
{
return exec.has_pending_closures();
}
};

template <typename Executor>
bool call_has_pending_closures(Executor& exec)
{
return has_pending_closures_helper::call(0, exec);
}
/// \endcond
}

Expand Down Expand Up @@ -520,6 +543,19 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v3)
{
return detail::call_os_thread_count(exec);
}

        /// Retrieve whether this executor has operations pending or not.
        ///
        /// \param exec [in] The executor object to query for pending
        ///             closures.
        ///
        /// \note If the executor does not expose this information, this call
        ///       will always return \a false
        ///
        static bool has_pending_closures(executor_type& exec)
        {
            return detail::call_has_pending_closures(exec);
        }
};

///////////////////////////////////////////////////////////////////////////
Expand Down
85 changes: 85 additions & 0 deletions hpx/parallel/executors/this_thread_executors.hpp
@@ -0,0 +1,85 @@
// Copyright (c) 2007-2015 Hartmut Kaiser
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

/// \file parallel/executors/this_thread_executors.hpp

#if !defined(HPX_PARALLEL_EXECUTORS_THIS_THREAD_EXECUTORS_JUL_16_2015_0809PM)
#define HPX_PARALLEL_EXECUTORS_THIS_THREAD_EXECUTORS_JUL_16_2015_0809PM

#include <hpx/config.hpp>
#include <hpx/parallel/config/inline_namespace.hpp>
#include <hpx/parallel/executors/executor_traits.hpp>
#include <hpx/parallel/executors/detail/thread_executor.hpp>
#include <hpx/runtime/threads/executors/this_thread_executors.hpp>
#include <hpx/util/move.hpp>

#include <type_traits>

namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v3)
{
///////////////////////////////////////////////////////////////////////////
#if defined(HPX_HAVE_LOCAL_SCHEDULER)
struct this_thread_local_queue_executor
#if !defined(DOXYGEN)
: detail::threads_executor
#endif
{
/// Creates a new local_queue_executor
///
/// \param max_punits [in] The maximum number of processing units to
/// associate with the newly created executor.
/// \param min_punits [in] The minimum number of processing units to
/// associate with the newly created executor
/// (default: 1).
///
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like a copy and paste error.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, forgot to update the comments

explicit this_thread_local_queue_executor()
: threads_executor(
threads::executors::this_thread_local_queue_executor())
{}
};
#endif

#if defined(HPX_HAVE_STATIC_PRIORITY_SCHEDULER)
struct this_thread_static_priority_queue_executor
#if !defined(DOXYGEN)
: detail::threads_executor
#endif
{
/// Creates a new static_priority_queue_executor
///
/// \param max_punits [in] The maximum number of processing units to
/// associate with the newly created executor.
/// \param min_punits [in] The minimum number of processing units to
/// associate with the newly created executor
/// (default: 1).
///
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here.

explicit this_thread_static_priority_queue_executor()
: threads_executor(
threads::executors::this_thread_static_priority_queue_executor())
{}
};
#endif

namespace detail
{
/// \cond NOINTERNAL
#if defined(HPX_HAVE_LOCAL_SCHEDULER)
template <>
struct is_executor<this_thread_local_queue_executor>
: std::true_type
{};
#endif

#if defined(HPX_HAVE_STATIC_PRIORITY_SCHEDULER)
template <>
struct is_executor<this_thread_static_priority_queue_executor>
: std::true_type
{};
#endif
/// \endcond
}
}}}

#endif
39 changes: 32 additions & 7 deletions hpx/runtime/threads/detail/scheduling_loop.hpp
Expand Up @@ -183,11 +183,26 @@ namespace hpx { namespace threads { namespace detail
#endif

///////////////////////////////////////////////////////////////////////////
struct scheduling_counters
{
scheduling_counters(boost::int64_t& executed_threads,
boost::int64_t& executed_thread_phases,
boost::uint64_t& tfunc_time, boost::uint64_t& exec_time)
: executed_threads_(executed_threads),
executed_thread_phases_(executed_thread_phases),
tfunc_time_(tfunc_time),
exec_time_(exec_time)
{}

boost::int64_t& executed_threads_;
boost::int64_t& executed_thread_phases_;
boost::uint64_t& tfunc_time_;
boost::uint64_t& exec_time_;
};

template <typename SchedulingPolicy>
void scheduling_loop(std::size_t num_thread, SchedulingPolicy& scheduler,
boost::atomic<hpx::state>& global_state, boost::int64_t& executed_threads,
boost::int64_t& executed_thread_phases, boost::uint64_t& tfunc_time,
boost::uint64_t& exec_time,
boost::atomic<hpx::state>& global_state, scheduling_counters& counters,
util::function_nonser<void()> const& cb_outer = util::function_nonser<void()>(),
util::function_nonser<void()> const& cb_inner = util::function_nonser<void()>())
{
Expand All @@ -199,7 +214,7 @@ namespace hpx { namespace threads { namespace detail
boost::int64_t idle_loop_count = 0;
boost::int64_t busy_loop_count = 0;

idle_collect_rate idle_rate(tfunc_time, exec_time);
idle_collect_rate idle_rate(counters.tfunc_time_, counters.exec_time_);
tfunc_time_wrapper tfunc_time_collector(idle_rate);

scheduler.SchedulingPolicy::start_periodic_maintenance(global_state);
Expand Down Expand Up @@ -255,7 +270,7 @@ namespace hpx { namespace threads { namespace detail
}

#ifdef HPX_HAVE_THREAD_CUMULATIVE_COUNTS
++executed_thread_phases;
++counters.executed_thread_phases_;
#endif
}
else {
Expand Down Expand Up @@ -323,7 +338,7 @@ namespace hpx { namespace threads { namespace detail
if (state_val == depleted || state_val == terminated)
{
#ifdef HPX_HAVE_THREAD_CUMULATIVE_COUNTS
++executed_threads;
++counters.executed_threads_;
#endif
scheduler.SchedulingPolicy::destroy_thread(thrd, busy_loop_count);
}
Expand All @@ -339,7 +354,14 @@ namespace hpx { namespace threads { namespace detail
                // clean up terminated threads one more time before exiting
if (scheduler.SchedulingPolicy::cleanup_terminated(true))
{
// keep idling for some time
// if this is an inner scheduler, exit immediately
if (!cb_inner.empty())
{
global_state.store(state_stopped);
break;
}

// otherwise, keep idling for some time
if (!may_exit)
idle_loop_count = 0;
may_exit = true;
Expand Down Expand Up @@ -390,7 +412,10 @@ namespace hpx { namespace threads { namespace detail
if (may_exit)
{
if (scheduler.SchedulingPolicy::cleanup_terminated(true))
{
global_state.store(state_stopped);
break;
}
may_exit = false;
}
else
Expand Down
57 changes: 57 additions & 0 deletions hpx/runtime/threads/detail/thread_num_tss.hpp
@@ -0,0 +1,57 @@
// Copyright (c) 2007-2015 Hartmut Kaiser
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#if !defined(HPX_RUNTIME_THREADS_DETAIL_THREAD_NUM_TSS_JUL_17_2015_0811PM)
#define HPX_RUNTIME_THREADS_DETAIL_THREAD_NUM_TSS_JUL_17_2015_0811PM

#include <hpx/config.hpp>
#include <hpx/util/thread_specific_ptr.hpp>

#include <cstdarg>

namespace hpx { namespace threads { namespace detail
{
///////////////////////////////////////////////////////////////////////////
    ///////////////////////////////////////////////////////////////////////////
    // Manages the thread-local association between an OS thread and its
    // worker thread number.
    class thread_num_tss
    {
    public:
        // Store \a num for the calling OS thread and return the previously
        // stored value (used by reset_tss_helper below to restore it later).
        std::size_t set_tss_threadnum(std::size_t num);
        // Initialize/tear down the TSS slot for the calling OS thread.
        void init_tss(std::size_t num);
        void deinit_tss();

        // Return the number currently associated with the calling OS thread.
        std::size_t get_worker_thread_num() const;

    private:
        // the TSS holds the number associated with a given OS thread
        struct tls_tag {};
        static hpx::util::thread_specific_ptr<std::size_t, tls_tag> thread_num_;
    };

// the TSS holds the number associated with a given OS thread
extern thread_num_tss thread_num_tss_;

///////////////////////////////////////////////////////////////////////////
struct reset_tss_helper
{
reset_tss_helper(std::size_t thread_num)
: thread_num_(thread_num_tss_.set_tss_threadnum(thread_num))
{}

~reset_tss_helper()
{
thread_num_tss_.set_tss_threadnum(thread_num_);
}

std::size_t previous_thread_num() const
{
return thread_num_;
}

private:
std::size_t thread_num_;
};
}}}

#endif
5 changes: 1 addition & 4 deletions hpx/runtime/threads/detail/thread_pool.hpp
Expand Up @@ -8,6 +8,7 @@

#include <hpx/config.hpp>
#include <hpx/state.hpp>
#include <hpx/runtime/threads/detail/thread_num_tss.hpp>
#include <hpx/runtime/threads/policies/affinity_data.hpp>
#include <hpx/runtime/threads/policies/scheduler_base.hpp>
#include <hpx/util/thread_specific_ptr.hpp>
Expand Down Expand Up @@ -165,10 +166,6 @@ namespace hpx { namespace threads { namespace detail
// Stores the mask identifying all processing units used by this
// thread manager.
threads::mask_type used_processing_units_;

// the TSS holds the number associated with a given OS thread
struct tls_tag {};
static hpx::util::thread_specific_ptr<std::size_t, tls_tag> thread_num_;
};
}}}

Expand Down
59 changes: 59 additions & 0 deletions hpx/runtime/threads/executors/manage_thread_executor.hpp
@@ -0,0 +1,59 @@
// Copyright (c) 2015 Hartmut Kaiser
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#if !defined(HPX_RUNTIME_THREADS_EXECUTORS_MANAGE_THREAD_EXECUTOR_JUL_16_2015_0745PM)
#define HPX_RUNTIME_THREADS_EXECUTORS_MANAGE_THREAD_EXECUTOR_JUL_16_2015_0745PM

#include <hpx/config.hpp>
#include <hpx/error.hpp>
#include <hpx/runtime/threads/thread_executor.hpp>

#include <cstdarg>

namespace hpx { namespace threads { namespace executors { namespace detail
{
///////////////////////////////////////////////////////////////////////////
template <typename ExecutorImpl>
class manage_thread_executor
: public threads::detail::manage_executor
{
public:
manage_thread_executor(ExecutorImpl& sched)
: sched_(sched)
{}

protected:
// Return the requested policy element.
std::size_t get_policy_element(threads::detail::executor_parameter p,
error_code& ec) const
{
return sched_.get_policy_element(p, ec);
}

// Return statistics collected by this scheduler
void get_statistics(executor_statistics& stats, error_code& ec) const
{
sched_.get_statistics(stats, ec);
}

// Provide the given processing unit to the scheduler.
void add_processing_unit(std::size_t virt_core, std::size_t thread_num,
error_code& ec)
{
sched_.add_processing_unit(virt_core, thread_num, ec);
}

// Remove the given processing unit from the scheduler.
void remove_processing_unit(std::size_t thread_num, error_code& ec)
{
sched_.remove_processing_unit(thread_num, ec);
}

private:
ExecutorImpl& sched_;
};
}}}}

#endif