Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add distribution_policy_executor #1701

Merged
merged 2 commits into from Aug 9, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 8 additions & 1 deletion docs/manual/parallel_algorithms.qbk
Expand Up @@ -428,7 +428,14 @@ In __hpx__ we have implemented the following executor types:
creates groups of parallel execution agents which execute in one of the
kernel threads associated with a given pool category (I/O, parcel, or timer
pool, or on the main thread of the application).
* [classref hpx::parallel::v3::local_priority_queue_executor `hpx::parallel::local_priority_queue_executor`]
* [classref hpx::parallel::v3::local_priority_queue_executor `hpx::parallel::local_priority_queue_executor`],
[classref hpx::parallel::v3::local_queue_executor `hpx::parallel::local_queue_executor`]
[classref hpx::parallel::v3::static_priority_queue_executor `hpx::parallel::static_priority_queue_executor`]
create executors on top of the corresponding __hpx__ schedulers.
* [classref hpx::parallel::v3::distribution_policy_executor `hpx::parallel::distribution_policy_executor`]
creates executors using any of the existing distribution policies (like
[classref hpx::components::colocating_distribution_policy `hpx::components::colocating_distribution_policy]
et.al.).

[endsect]

Expand Down
2 changes: 2 additions & 0 deletions hpx/include/components.hpp
Expand Up @@ -26,6 +26,8 @@
#include <hpx/runtime/components/server/create_component.hpp>
#include <hpx/runtime/components/server/destroy_component.hpp>

#include <hpx/runtime/components/server/invoke_function.hpp>

#include <hpx/runtime/components/stubs/runtime_support.hpp>
#include <hpx/runtime/components/client_base.hpp>

Expand Down
1 change: 1 addition & 0 deletions hpx/parallel/executors.hpp
Expand Up @@ -12,6 +12,7 @@
#include <hpx/parallel/executors/parallel_fork_executor.hpp>
#include <hpx/parallel/executors/sequential_executor.hpp>
#include <hpx/parallel/executors/service_executors.hpp>
#include <hpx/parallel/executors/distribution_policy_executor.hpp>
#include <hpx/parallel/executors/thread_pool_executors.hpp>
#include <hpx/parallel/executors/this_thread_executors.hpp>

Expand Down
170 changes: 170 additions & 0 deletions hpx/parallel/executors/distribution_policy_executor.hpp
@@ -0,0 +1,170 @@
// Copyright (c) 2007-2015 Hartmut Kaiser
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

/// \file parallel/executors/distribution_policy_executor.hpp

#if !defined(HPX_PARALLEL_EXECUTORS_DISTRIBUTION_POLICY_EXECUTOR_JUL_21_2015_0404PM)
#define HPX_PARALLEL_EXECUTORS_DISTRIBUTION_POLICY_EXECUTOR_JUL_21_2015_0404PM

#include <hpx/config.hpp>
#include <hpx/traits/is_action.hpp>
#include <hpx/lcos/future.hpp>
#include <hpx/runtime/components/server/invoke_function.hpp>

#include <hpx/parallel/config/inline_namespace.hpp>
#include <hpx/parallel/executors/executor_traits.hpp>

#include <hpx/util/decay.hpp>
#include <hpx/util/move.hpp>
#include <hpx/util/result_of.hpp>

#include <type_traits>

#include <boost/static_assert.hpp>

namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v3)
{
///////////////////////////////////////////////////////////////////////////
namespace detail
{
template <typename F, typename Enable = void>
struct async_execute_result
{
typedef typename hpx::util::result_of<
typename hpx::util::decay<F>::type()
>::type type;
};

template <typename Action>
struct async_execute_result<Action,
typename std::enable_if<hpx::traits::is_action<Action>::value>::type>
{
typedef typename Action::local_result_type type;
};
}

/// A \a distribution_policy_executor creates groups of parallel execution
/// agents which execute in threads implicitly created by the executor and
/// placed on any of the associated localities.
///
/// \tparam DistPolicy The distribution policy type for which an
/// executor should be created. The expression
/// \a hpx::traits::is_distribution_policy<DistPolicy>::value must
/// evaluate to true.
///
template <typename DistPolicy>
class distribution_policy_executor : public executor_tag
{
private:
/// \cond NOINTERNAL
BOOST_STATIC_ASSERT_MSG(
hpx::traits::is_distribution_policy<DistPolicy>::value,
"distribution_policy_executor needs to be instantiated with a "
"distribution policy type");

// apply_execute implementations
template <typename F>
typename std::enable_if<
!hpx::traits::is_action<F>::value
>::type
apply_execute_impl(F && f) const
{
typedef components::server::invoke_function_action<F> action_type;
policy_.template apply<action_type>(actions::continuation_type(),
threads::thread_priority_default, std::forward<F>(f));
}

template <typename Action>
typename std::enable_if<
hpx::traits::is_action<Action>::value
>::type
apply_execute_impl(Action && /*act*/) const
{
policy_.template apply<Action>(actions::continuation_type(),
threads::thread_priority_default);
}

// async_execute implementations
template <typename F>
typename std::enable_if<
!hpx::traits::is_action<F>::value,
hpx::future<typename detail::async_execute_result<F>::type>
>::type
async_execute_impl(F && f) const
{
typedef components::server::invoke_function_action<F> action_type;
return policy_.template async<action_type>(launch::async,
std::forward<F>(f));
}

template <typename Action>
typename std::enable_if<
hpx::traits::is_action<Action>::value,
hpx::future<typename Action::local_result_type>
>::type
async_execute_impl(Action && /*act*/) const
{
return policy_.template async<Action>(launch::async);
}
/// \endcond

public:
/// Create a new distribution_policy executor from the given
/// distribution policy
///
/// \param policy The distribution_policy to create an executor from
///
template <typename DistPolicy_>
distribution_policy_executor(DistPolicy_ && policy)
: policy_(std::forward<DistPolicy_>(policy))
{}

/// \cond NOINTERNAL
typedef parallel_execution_tag execution_category;

template <typename F>
void apply_execute(F && f) const
{
return apply_execute_impl(std::forward<F>(f));
}

template <typename F>
hpx::future<typename detail::async_execute_result<F>::type>
async_execute(F && f) const
{
return async_execute_impl(std::forward<F>(f));
}
/// \endcond

private:
DistPolicy policy_;
};

/// Create a new distribution_policy_executor from the given
/// distribution_policy.
///
/// \param policy The distribution_policy to create an executor from
///
template <typename DistPolicy>
distribution_policy_executor<typename hpx::util::decay<DistPolicy>::type>
make_distribution_policy_executor(DistPolicy && policy)
{
typedef typename hpx::util::decay<DistPolicy>::type dist_policy_type;
return distribution_policy_executor<dist_policy_type>(
std::forward<DistPolicy>(policy));
}

namespace detail
{
/// \cond NOINTERNAL
template <typename DistPolicy>
struct is_executor<distribution_policy_executor<DistPolicy> >
: std::true_type
{};
/// \endcond
}
}}}

#endif