From 183e81ce9dc6ea0137ceb829532ba7e7792d905a Mon Sep 17 00:00:00 2001 From: Hartmut Kaiser Date: Tue, 21 Jul 2015 20:38:10 -0500 Subject: [PATCH 1/2] Add distribution_policy_executor - integrates executors with distribution policies - adding test --- docs/manual/parallel_algorithms.qbk | 9 +- hpx/include/components.hpp | 2 + hpx/parallel/executors.hpp | 1 + .../distribution_policy_executor.hpp | 170 ++++++++++++++++++ hpx/parallel/executors/executor_traits.hpp | 132 +++++++++----- .../components/server/invoke_function.hpp | 46 +++++ hpx/traits/is_action.hpp | 2 +- tests/unit/component/CMakeLists.txt | 5 + .../distribution_policy_executor.cpp | 56 ++++++ 9 files changed, 375 insertions(+), 48 deletions(-) create mode 100644 hpx/parallel/executors/distribution_policy_executor.hpp create mode 100644 hpx/runtime/components/server/invoke_function.hpp create mode 100644 tests/unit/component/distribution_policy_executor.cpp diff --git a/docs/manual/parallel_algorithms.qbk b/docs/manual/parallel_algorithms.qbk index a9dc284d8e5e..3b0d6560571e 100644 --- a/docs/manual/parallel_algorithms.qbk +++ b/docs/manual/parallel_algorithms.qbk @@ -428,7 +428,14 @@ In __hpx__ we have implemented the following executor types: creates groups of parallel execution agents which execute in one of the kernel threads associated with a given pool category (I/O, parcel, or timer pool, or on the main thread of the application). -* [classref hpx::parallel::v3::local_priority_queue_executor `hpx::parallel::local_priority_queue_executor`] +* [classref hpx::parallel::v3::local_priority_queue_executor `hpx::parallel::local_priority_queue_executor`], + [classref hpx::parallel::v3::local_queue_executor `hpx::parallel::local_queue_executor`] + [classref hpx::parallel::v3::static_priority_queue_executor `hpx::parallel::static_priority_queue_executor`] + create executors on top of the corresponding __hpx__ schedulers. +* [classref hpx::parallel::v3::distribution_policy_executor `hpx::parallel::distribution_policy_executor`] + creates executors using any of the existing distribution policies (like + [classref hpx::components::colocating_distribution_policy `hpx::components::colocating_distribution_policy] + et.al.). [endsect] diff --git a/hpx/include/components.hpp b/hpx/include/components.hpp index 18c4ca39a628..9d78976339d3 100644 --- a/hpx/include/components.hpp +++ b/hpx/include/components.hpp @@ -26,6 +26,8 @@ #include #include +#include + #include #include diff --git a/hpx/parallel/executors.hpp b/hpx/parallel/executors.hpp index f9cf223d18d5..3b1e53b34f33 100644 --- a/hpx/parallel/executors.hpp +++ b/hpx/parallel/executors.hpp @@ -12,6 +12,7 @@ #include #include #include +#include #include #include diff --git a/hpx/parallel/executors/distribution_policy_executor.hpp b/hpx/parallel/executors/distribution_policy_executor.hpp new file mode 100644 index 000000000000..0fdeb5797ba9 --- /dev/null +++ b/hpx/parallel/executors/distribution_policy_executor.hpp @@ -0,0 +1,170 @@ +// Copyright (c) 2007-2015 Hartmut Kaiser +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +/// \file parallel/executors/distribution_policy_executor.hpp + +#if !defined(HPX_PARALLEL_EXECUTORS_DISTRIBUTION_POLICY_EXECUTOR_JUL_21_2015_0404PM) +#define HPX_PARALLEL_EXECUTORS_DISTRIBUTION_POLICY_EXECUTOR_JUL_21_2015_0404PM + +#include +#include +#include +#include + +#include +#include + +#include +#include +#include + +#include + +#include + +namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v3) +{ + /////////////////////////////////////////////////////////////////////////// + namespace detail + { + template + struct async_execute_result + { + typedef typename hpx::util::result_of< + typename hpx::util::decay::type() + >::type type; + }; + + template + struct async_execute_result::value>::type> + { + typedef typename Action::local_result_type type; + }; + } + + /// A \a distribution_policy_executor creates groups of parallel execution + /// agents which execute in threads implicitly created by the executor and + /// placed on any of the associated localities. + /// + /// \tparam DistPolicy The distribution policy type for which an + /// executor should be created. The expression + /// \a hpx::traits::is_distribution_policy::value must + /// evaluate to true. + /// + template + class distribution_policy_executor : public executor_tag + { + private: + /// \cond NOINTERNAL + BOOST_STATIC_ASSERT_MSG( + hpx::traits::is_distribution_policy::value, + "distribution_policy_executor needs to be instantiated with a " + "distribution policy type"); + + // apply_execute implementations + template + typename std::enable_if< + !hpx::traits::is_action::value + >::type + apply_execute(F && f) const + { + typedef components::server::invoke_function_action action_type; + policy_.template apply(actions::continuation_type(), + threads::thread_priority_default, std::forward(f)); + } + + template + typename std::enable_if< + hpx::traits::is_action::value + >::type + apply_execute(Action && act) const + { + policy_.template apply(actions::continuation_type(), + threads::thread_priority_default); + } + + // async_execute implementations + template + typename std::enable_if< + !hpx::traits::is_action::value, + hpx::future::type> + >::type + async_execute_impl(F && f) const + { + typedef components::server::invoke_function_action action_type; + return policy_.template async(launch::async, + std::forward(f)); + } + + template + typename std::enable_if< + hpx::traits::is_action::value, + hpx::future + >::type + async_execute_impl(Action && act) const + { + return policy_.template async(launch::async); + } + /// \endcond + + public: + /// Create a new distribution_policy executor from the given + /// distribution policy + /// + /// \param policy The distribution_policy to create an executor from + /// + template + distribution_policy_executor(DistPolicy_ && policy) + : policy_(std::forward(policy)) + {} + + /// \cond NOINTERNAL + typedef parallel_execution_tag execution_category; + + template + void apply_execute(F && f) const + { + return apply_execute_impl(std::forward(f)); + } + + template + hpx::future::type> + async_execute(F && f) const + { + return async_execute_impl(std::forward(f)); + } + /// \endcond + + private: + DistPolicy policy_; + }; + + /// Create a new distribution_policy_executor from the given + /// distribution_policy. + /// + /// \param policy The distribution_policy to create an executor from + /// + template + distribution_policy_executor::type> + make_distribution_policy_executor(DistPolicy && policy) + { + typedef typename hpx::util::decay::type dist_policy_type; + return distribution_policy_executor( + std::forward(policy)); + } + + namespace detail + { + /// \cond NOINTERNAL + template + struct is_executor > + : std::true_type + {}; + /// \endcond + } +}}} + +#endif diff --git a/hpx/parallel/executors/executor_traits.hpp b/hpx/parallel/executors/executor_traits.hpp index 001fb255300a..67fe20ce1002 100644 --- a/hpx/parallel/executors/executor_traits.hpp +++ b/hpx/parallel/executors/executor_traits.hpp @@ -11,6 +11,7 @@ #include #include +#include #include #include #include @@ -24,9 +25,11 @@ #include #include +#include #include #include +#include namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v3) { @@ -129,14 +132,14 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v3) struct apply_helper { template - static auto call(wrap_int, Executor& exec, F && f) -> void + static void call(wrap_int, Executor& exec, F && f) { exec.async_execute(std::forward(f)); } template static auto call(int, Executor& exec, F && f) - -> decltype(exec.apply_execute(std::forward(f))) + -> decltype(exec.apply_execute(std::forward(f))) { exec.apply_execute(std::forward(f)); } @@ -145,34 +148,42 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v3) template void call_apply_execute(Executor& exec, F && f) { - return apply_helper::call(0, exec, std::forward(f)); + apply_helper::call(0, exec, std::forward(f)); } /////////////////////////////////////////////////////////////////////// struct execute_helper { template - static typename hpx::util::result_of< - typename hpx::util::decay::type() - >::type + static auto call(wrap_int, Executor& exec, F && f) + -> decltype(exec.async_execute(std::forward(f)).get()) { - return exec.async_execute(std::forward(f)).get(); + try { + return exec.async_execute(std::forward(f)).get(); + } + catch (std::bad_alloc const& ba) { + boost::throw_exception(ba); + } + catch (...) { + boost::throw_exception( + exception_list(boost::current_exception()) + ); + } } template static auto call(int, Executor& exec, F && f) - -> decltype(exec.execute(std::forward(f))) + -> decltype(exec.execute(std::forward(f))) { return exec.execute(std::forward(f)); } }; template - typename hpx::util::result_of< - typename hpx::util::decay::type() - >::type + auto call_execute(Executor& exec, F && f) + -> decltype(execute_helper::call(0, exec, std::forward(f))) { return execute_helper::call(0, exec, std::forward(f)); } @@ -192,18 +203,35 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v3) >::type type; }; + template + struct bulk_result_helper + { + typedef std::vector type; + }; + + template <> + struct bulk_result_helper + { + typedef void type; + }; + + /////////////////////////////////////////////////////////////////////// struct bulk_async_execute_helper { template - static std::vector::type - >::type> + static auto call(wrap_int, Executor& exec, F && f, S const& shape) + -> typename bulk_result_helper::type { std::vector::type >::type> results; + results.reserve(boost::size(shape)); for (auto const& elem: shape) { @@ -217,17 +245,19 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v3) template static auto call(int, Executor& exec, F && f, S const& shape) - -> decltype(exec.bulk_async_execute(std::forward(f), shape)) + -> decltype(exec.bulk_async_execute(std::forward(f), shape)) { return exec.bulk_async_execute(std::forward(f), shape); } }; template - std::vector::type - >::type> + auto call_bulk_async_execute(Executor& exec, F && f, S const& shape) + -> decltype( + bulk_async_execute_helper::call(0, exec, std::forward(f), + shape) + ) { return bulk_async_execute_helper::call( 0, exec, std::forward(f), shape); @@ -257,35 +287,50 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v3) { // returns void if F returns void template - static typename detail::bulk_execute_result::type + static auto call(wrap_int, Executor& exec, F && f, S const& shape) + -> typename bulk_result_helper::type { std::vector::type >::type> results; - - for (auto const& elem: shape) - { - results.push_back( - exec.async_execute(hpx::util::deferred_call(f, elem)) + results.reserve(boost::size(shape)); + + try { + for (auto const& elem: shape) + { + results.push_back( + exec.async_execute(hpx::util::deferred_call(f, elem)) + ); + } + return hpx::util::unwrapped(results); + } + catch (std::bad_alloc const& ba) { + boost::throw_exception(ba); + } + catch (...) { + boost::throw_exception( + exception_list(boost::current_exception()) ); } - - return hpx::util::unwrapped(results); } template static auto call(int, Executor& exec, F && f, S const& shape) - -> decltype(exec.bulk_execute(std::forward(f), shape)) + -> decltype(exec.bulk_execute(std::forward(f), shape)) { return exec.bulk_execute(std::forward(f), shape); } }; template - typename detail::bulk_execute_result::type - call_bulk_execute(Executor& exec, F && f, S const& shape) + auto call_bulk_execute(Executor& exec, F && f, S const& shape) + -> decltype(bulk_execute_helper::call(0, exec, std::forward(f), shape)) { return bulk_execute_helper::call(0, exec, std::forward(f), shape); } @@ -294,14 +339,14 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v3) struct os_thread_count_helper { template - static auto call(wrap_int, Executor& exec) -> std::size_t + static std::size_t call(wrap_int, Executor& exec) { return hpx::get_os_thread_count(); } template static auto call(int, Executor& exec) - -> decltype(exec.os_thread_count()) + -> decltype(exec.os_thread_count()) { return exec.os_thread_count(); } @@ -424,12 +469,8 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v3) /// \returns f()'s result through a future /// template - static typename future< - typename hpx::util::result_of< - typename hpx::util::decay::type() - >::type - >::type - async_execute(executor_type& exec, F && f) + static auto async_execute(executor_type& exec, F && f) + -> decltype(exec.async_execute(std::forward(f))) { return exec.async_execute(std::forward(f)); } @@ -451,10 +492,8 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v3) /// otherwise hpx::async(f).get() /// template - static typename hpx::util::result_of< - typename hpx::util::decay::type() - >::type - execute(executor_type& exec, F && f) + static auto execute(executor_type& exec, F && f) + -> decltype(detail::call_execute(exec, std::forward(f))) { return detail::call_execute(exec, std::forward(f)); } @@ -486,10 +525,11 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v3) /// otherwise it executes hpx::async(f, i) as often as needed. /// template - static std::vector::type - >::type> + static auto async_execute(executor_type& exec, F && f, Shape const& shape) + -> decltype( + detail::call_bulk_async_execute(exec, std::forward(f), shape) + ) { return detail::call_bulk_async_execute( exec, std::forward(f), shape); @@ -523,8 +563,8 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v3) /// otherwise it executes hpx::async(f, i) as often as needed. /// template - static typename detail::bulk_execute_result::type - execute(executor_type& exec, F && f, Shape const& shape) + static auto execute(executor_type& exec, F && f, Shape const& shape) + -> decltype(detail::call_bulk_execute(exec, std::forward(f), shape)) { return detail::call_bulk_execute(exec, std::forward(f), shape); } diff --git a/hpx/runtime/components/server/invoke_function.hpp b/hpx/runtime/components/server/invoke_function.hpp new file mode 100644 index 000000000000..168d03c1ebe0 --- /dev/null +++ b/hpx/runtime/components/server/invoke_function.hpp @@ -0,0 +1,46 @@ +// Copyright (c) 2015 Hartmut Kaiser +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#if !defined(HPX_COMPONENTS_INVOKE_FUNCTION_JUL_21_2015_0521PM) +#define HPX_COMPONENTS_INVOKE_FUNCTION_JUL_21_2015_0521PM + +#include +#include +#include +#include +#include +#include +#include + +namespace hpx { namespace components { namespace server +{ + namespace detail + { + /////////////////////////////////////////////////////////////////////// + // simple utility action which invoke an arbitrary function + template + struct invoke_function + { + static typename util::result_of::type + call (F f, Ts... ts) + { + return f(std::move(ts)...); + } + }; + } + + /////////////////////////////////////////////////////////////////////////// + // action definition exposing invoke_function<> + template + struct invoke_function_action + : ::hpx::actions::action< + typename util::result_of::type(*)(F, Ts...), + &detail::invoke_function::call, + invoke_function_action > + {}; +}}} + +#endif + diff --git a/hpx/traits/is_action.hpp b/hpx/traits/is_action.hpp index 6a83ac68bfad..a760cd75044a 100644 --- a/hpx/traits/is_action.hpp +++ b/hpx/traits/is_action.hpp @@ -28,7 +28,7 @@ namespace hpx { namespace traits template struct is_action - : detail::is_action_impl + : detail::is_action_impl::type> {}; /////////////////////////////////////////////////////////////////////////// diff --git a/tests/unit/component/CMakeLists.txt b/tests/unit/component/CMakeLists.txt index a402e68a936c..1ecc64a8d52a 100644 --- a/tests/unit/component/CMakeLists.txt +++ b/tests/unit/component/CMakeLists.txt @@ -6,6 +6,7 @@ set(tests action_invoke_no_more_than copy_component + distribution_policy_executor get_gid get_ptr inheritance_2_classes_abstract @@ -34,6 +35,10 @@ set(action_invoke_no_more_than_PARAMETERS set(action_invoke_no_more_than_FLAGS DEPENDENCIES iostreams_component) +set(colocated_distribution_policy_PARAMETERS + LOCALITIES 2 + THREADS_PER_LOCALITY 2) + set(copy_component_PARAMETERS LOCALITIES 2 THREADS_PER_LOCALITY 2) diff --git a/tests/unit/component/distribution_policy_executor.cpp b/tests/unit/component/distribution_policy_executor.cpp new file mode 100644 index 000000000000..d49180114db4 --- /dev/null +++ b/tests/unit/component/distribution_policy_executor.cpp @@ -0,0 +1,56 @@ +// Copyright (c) 2015 Hartmut Kaiser +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#include +#include +#include +#include +#include +#include +#include + +/////////////////////////////////////////////////////////////////////////////// +hpx::id_type call() { return hpx::find_here(); } + +HPX_PLAIN_ACTION(call, call_action); + +struct call_pfo +{ + hpx::id_type operator()() const + { + return hpx::find_here(); + } +}; + +/////////////////////////////////////////////////////////////////////////////// +void test_distribution_policy_executor() +{ + using namespace hpx::parallel; + + for (hpx::id_type const& loc: hpx::find_all_localities()) + { + auto exec = make_distribution_policy_executor(hpx::colocated(loc)); + typedef executor_traits executor_traits; + + HPX_TEST_EQ(executor_traits::async_execute(exec, call_pfo()).get(), loc); + HPX_TEST_EQ(executor_traits::async_execute(exec, call_action()).get(), loc); + } + + for (hpx::id_type const& loc: hpx::find_all_localities()) + { + auto exec = make_distribution_policy_executor(hpx::colocated(loc)); + typedef executor_traits executor_traits; + + HPX_TEST_EQ(executor_traits::execute(exec, call_pfo()), loc); + HPX_TEST_EQ(executor_traits::execute(exec, call_action()), loc); + } +} + +int main() +{ + test_distribution_policy_executor(); + return 0; +} + From fb9de3a674e64dda7b18479644fdd785c2f41d3f Mon Sep 17 00:00:00 2001 From: Hartmut Kaiser Date: Wed, 5 Aug 2015 07:08:52 -0500 Subject: [PATCH 2/2] Fixing unused parameter warnings --- hpx/parallel/executors/distribution_policy_executor.hpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/hpx/parallel/executors/distribution_policy_executor.hpp b/hpx/parallel/executors/distribution_policy_executor.hpp index 0fdeb5797ba9..9ee284cbcbba 100644 --- a/hpx/parallel/executors/distribution_policy_executor.hpp +++ b/hpx/parallel/executors/distribution_policy_executor.hpp @@ -69,7 +69,7 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v3) typename std::enable_if< !hpx::traits::is_action::value >::type - apply_execute(F && f) const + apply_execute_impl(F && f) const { typedef components::server::invoke_function_action action_type; policy_.template apply(actions::continuation_type(), @@ -80,7 +80,7 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v3) typename std::enable_if< hpx::traits::is_action::value >::type - apply_execute(Action && act) const + apply_execute_impl(Action && /*act*/) const { policy_.template apply(actions::continuation_type(), threads::thread_priority_default); @@ -104,7 +104,7 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v3) hpx::traits::is_action::value, hpx::future >::type - async_execute_impl(Action && act) const + async_execute_impl(Action && /*act*/) const { return policy_.template async(launch::async); }