Skip to content

Commit

Permalink
Merge pull request #1701 from STEllAR-GROUP/distribution_policy_execu…
Browse files Browse the repository at this point in the history
…tors

Add distribution_policy_executor
  • Loading branch information
hkaiser committed Aug 9, 2015
2 parents aaa8eb5 + fb9de3a commit 9806a56
Show file tree
Hide file tree
Showing 9 changed files with 375 additions and 48 deletions.
9 changes: 8 additions & 1 deletion docs/manual/parallel_algorithms.qbk
Expand Up @@ -428,7 +428,14 @@ In __hpx__ we have implemented the following executor types:
creates groups of parallel execution agents which execute in one of the
kernel threads associated with a given pool category (I/O, parcel, or timer
pool, or on the main thread of the application).
* [classref hpx::parallel::v3::local_priority_queue_executor `hpx::parallel::local_priority_queue_executor`]
* [classref hpx::parallel::v3::local_priority_queue_executor `hpx::parallel::local_priority_queue_executor`],
[classref hpx::parallel::v3::local_queue_executor `hpx::parallel::local_queue_executor`]
[classref hpx::parallel::v3::static_priority_queue_executor `hpx::parallel::static_priority_queue_executor`]
create executors on top of the corresponding __hpx__ schedulers.
* [classref hpx::parallel::v3::distribution_policy_executor `hpx::parallel::distribution_policy_executor`]
creates executors using any of the existing distribution policies (like
[classref hpx::components::colocating_distribution_policy `hpx::components::colocating_distribution_policy]
et.al.).

[endsect]

Expand Down
2 changes: 2 additions & 0 deletions hpx/include/components.hpp
Expand Up @@ -26,6 +26,8 @@
#include <hpx/runtime/components/server/create_component.hpp>
#include <hpx/runtime/components/server/destroy_component.hpp>

#include <hpx/runtime/components/server/invoke_function.hpp>

#include <hpx/runtime/components/stubs/runtime_support.hpp>
#include <hpx/runtime/components/client_base.hpp>

Expand Down
1 change: 1 addition & 0 deletions hpx/parallel/executors.hpp
Expand Up @@ -12,6 +12,7 @@
#include <hpx/parallel/executors/parallel_fork_executor.hpp>
#include <hpx/parallel/executors/sequential_executor.hpp>
#include <hpx/parallel/executors/service_executors.hpp>
#include <hpx/parallel/executors/distribution_policy_executor.hpp>
#include <hpx/parallel/executors/thread_pool_executors.hpp>
#include <hpx/parallel/executors/this_thread_executors.hpp>

Expand Down
170 changes: 170 additions & 0 deletions hpx/parallel/executors/distribution_policy_executor.hpp
@@ -0,0 +1,170 @@
// Copyright (c) 2007-2015 Hartmut Kaiser
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

/// \file parallel/executors/distribution_policy_executor.hpp

#if !defined(HPX_PARALLEL_EXECUTORS_DISTRIBUTION_POLICY_EXECUTOR_JUL_21_2015_0404PM)
#define HPX_PARALLEL_EXECUTORS_DISTRIBUTION_POLICY_EXECUTOR_JUL_21_2015_0404PM

#include <hpx/config.hpp>
#include <hpx/traits/is_action.hpp>
#include <hpx/lcos/future.hpp>
#include <hpx/runtime/components/server/invoke_function.hpp>

#include <hpx/parallel/config/inline_namespace.hpp>
#include <hpx/parallel/executors/executor_traits.hpp>

#include <hpx/util/decay.hpp>
#include <hpx/util/move.hpp>
#include <hpx/util/result_of.hpp>

#include <type_traits>

#include <boost/static_assert.hpp>

namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v3)
{
///////////////////////////////////////////////////////////////////////////
namespace detail
{
template <typename F, typename Enable = void>
struct async_execute_result
{
typedef typename hpx::util::result_of<
typename hpx::util::decay<F>::type()
>::type type;
};

template <typename Action>
struct async_execute_result<Action,
typename std::enable_if<hpx::traits::is_action<Action>::value>::type>
{
typedef typename Action::local_result_type type;
};
}

/// A \a distribution_policy_executor creates groups of parallel execution
/// agents which execute in threads implicitly created by the executor and
/// placed on any of the associated localities.
///
/// \tparam DistPolicy The distribution policy type for which an
/// executor should be created. The expression
/// \a hpx::traits::is_distribution_policy<DistPolicy>::value must
/// evaluate to true.
///
template <typename DistPolicy>
class distribution_policy_executor : public executor_tag
{
private:
/// \cond NOINTERNAL
BOOST_STATIC_ASSERT_MSG(
hpx::traits::is_distribution_policy<DistPolicy>::value,
"distribution_policy_executor needs to be instantiated with a "
"distribution policy type");

// apply_execute implementations
template <typename F>
typename std::enable_if<
!hpx::traits::is_action<F>::value
>::type
apply_execute_impl(F && f) const
{
typedef components::server::invoke_function_action<F> action_type;
policy_.template apply<action_type>(actions::continuation_type(),
threads::thread_priority_default, std::forward<F>(f));
}

template <typename Action>
typename std::enable_if<
hpx::traits::is_action<Action>::value
>::type
apply_execute_impl(Action && /*act*/) const
{
policy_.template apply<Action>(actions::continuation_type(),
threads::thread_priority_default);
}

// async_execute implementations
template <typename F>
typename std::enable_if<
!hpx::traits::is_action<F>::value,
hpx::future<typename detail::async_execute_result<F>::type>
>::type
async_execute_impl(F && f) const
{
typedef components::server::invoke_function_action<F> action_type;
return policy_.template async<action_type>(launch::async,
std::forward<F>(f));
}

template <typename Action>
typename std::enable_if<
hpx::traits::is_action<Action>::value,
hpx::future<typename Action::local_result_type>
>::type
async_execute_impl(Action && /*act*/) const
{
return policy_.template async<Action>(launch::async);
}
/// \endcond

public:
/// Create a new distribution_policy executor from the given
/// distribution policy
///
/// \param policy The distribution_policy to create an executor from
///
template <typename DistPolicy_>
distribution_policy_executor(DistPolicy_ && policy)
: policy_(std::forward<DistPolicy_>(policy))
{}

/// \cond NOINTERNAL
typedef parallel_execution_tag execution_category;

template <typename F>
void apply_execute(F && f) const
{
return apply_execute_impl(std::forward<F>(f));
}

template <typename F>
hpx::future<typename detail::async_execute_result<F>::type>
async_execute(F && f) const
{
return async_execute_impl(std::forward<F>(f));
}
/// \endcond

private:
DistPolicy policy_;
};

/// Create a new distribution_policy_executor from the given
/// distribution_policy.
///
/// \param policy The distribution_policy to create an executor from
///
template <typename DistPolicy>
distribution_policy_executor<typename hpx::util::decay<DistPolicy>::type>
make_distribution_policy_executor(DistPolicy && policy)
{
typedef typename hpx::util::decay<DistPolicy>::type dist_policy_type;
return distribution_policy_executor<dist_policy_type>(
std::forward<DistPolicy>(policy));
}

namespace detail
{
/// \cond NOINTERNAL
template <typename DistPolicy>
struct is_executor<distribution_policy_executor<DistPolicy> >
: std::true_type
{};
/// \endcond
}
}}}

#endif

0 comments on commit 9806a56

Please sign in to comment.