Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Converted partitioners and some algorithms to use executors #1564

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
85 changes: 55 additions & 30 deletions hpx/parallel/algorithms/detail/dispatch.hpp
Expand Up @@ -59,96 +59,121 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v1) { namespace detail
: name_(name)
{}

///////////////////////////////////////////////////////////////////////
template <typename ExPolicy, typename... Args>
typename parallel::util::detail::algorithm_result<
ExPolicy, local_result_type
>::type
call(ExPolicy const& policy, boost::mpl::true_, Args&&... args) const
call(ExPolicy policy, boost::mpl::true_, Args&&... args) const
{
try {
return parallel::util::detail::algorithm_result<
ExPolicy, local_result_type
>::get(Derived::sequential(policy, std::forward<Args>(args)...));
>::get(Derived::sequential(policy,
std::forward<Args>(args)...));
}
catch (...) {
detail::handle_exception<ExPolicy, local_result_type>::call();
}
}

template <typename... Args>
///////////////////////////////////////////////////////////////////////
template <typename ExPolicy, typename... Args>
typename parallel::util::detail::algorithm_result<
sequential_task_execution_policy, local_result_type
ExPolicy, local_result_type
>::type
operator()(sequential_task_execution_policy const& policy,
Args&&... args) const
operator()(ExPolicy policy, Args&&... args) const
{
try {
return parallel::util::detail::algorithm_result<
sequential_task_execution_policy, local_result_type
>::get(Derived::sequential(policy, std::forward<Args>(args)...));
ExPolicy, local_result_type
>::get(Derived::sequential(policy,
std::forward<Args>(args)...));
}
catch (...) {
return detail::handle_exception<
sequential_task_execution_policy, local_result_type
ExPolicy, local_result_type
>::call();
}
}

template <typename... Args>
///////////////////////////////////////////////////////////////////////
template <typename ExPolicy, typename... Args>
typename parallel::util::detail::algorithm_result<
sequential_task_execution_policy, local_result_type
ExPolicy, local_result_type
>::type
call(sequential_task_execution_policy const& policy, boost::mpl::true_,
Args&&... args) const
call_sequential(ExPolicy policy, Args&&... args) const
{
try {
hpx::future<local_result_type> result =
hpx::async(derived(), policy, std::forward<Args>(args)...);

return parallel::util::detail::algorithm_result<
sequential_task_execution_policy, local_result_type
ExPolicy, local_result_type
>::get(std::move(result));
}
catch (...) {
return detail::handle_exception<
sequential_task_execution_policy, local_result_type
ExPolicy, local_result_type
>::call();
}
}


template <typename... Args>
typename parallel::util::detail::algorithm_result<
sequential_task_execution_policy, local_result_type
>::type
call(sequential_task_execution_policy policy, boost::mpl::true_,
Args&&... args) const
{
return call_sequential(policy, std::forward<Args>(args)...);
}

template <typename Executor, typename... Args>
typename parallel::util::detail::algorithm_result<
sequential_task_execution_policy_shim<Executor>, local_result_type
>::type
call(sequential_task_execution_policy_shim<Executor> policy,
boost::mpl::true_, Args&&... args) const
{
return call_sequential(policy, std::forward<Args>(args)...);
}

///////////////////////////////////////////////////////////////////////
template <typename... Args>
typename parallel::util::detail::algorithm_result<
parallel_task_execution_policy, local_result_type
>::type
call(parallel_task_execution_policy const& policy, boost::mpl::true_,
call(parallel_task_execution_policy policy, boost::mpl::true_,
Args&&... args) const
{
try {
return parallel::util::detail::algorithm_result<
parallel_task_execution_policy, local_result_type
>::get(Derived::sequential(policy,
std::forward<Args>(args)...));
}
catch (...) {
return detail::handle_exception<
parallel_task_execution_policy, local_result_type
>::call();
}
return call_sequential(policy, std::forward<Args>(args)...);
}

template <typename Executor, typename... Args>
typename parallel::util::detail::algorithm_result<
parallel_task_execution_policy_shim<Executor>, local_result_type
>::type
call(parallel_task_execution_policy_shim<Executor> policy,
boost::mpl::true_, Args&&... args) const
{
return call_sequential(policy, std::forward<Args>(args)...);
}

template <typename ExPolicy, typename... Args>
typename parallel::util::detail::algorithm_result<
ExPolicy, local_result_type
>::type
call(ExPolicy const& policy, boost::mpl::false_, Args&&... args) const
call(ExPolicy policy, boost::mpl::false_, Args&&... args) const
{
return Derived::parallel(policy, std::forward<Args>(args)...);
}

///////////////////////////////////////////////////////////////////////////
template <typename... Args>
local_result_type
call(parallel::execution_policy const& policy, boost::mpl::false_,
call(parallel::execution_policy policy, boost::mpl::false_,
Args&&... args) const
{
// this implementation is not nice, however we don't have variadic
Expand Down Expand Up @@ -196,7 +221,7 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v1) { namespace detail

template <typename... Args>
local_result_type
call(parallel::execution_policy const& policy, boost::mpl::true_,
call(parallel::execution_policy policy, boost::mpl::true_,
Args&&... args) const
{
return call(seq, boost::mpl::true_(), std::forward<Args>(args)...);
Expand Down
8 changes: 4 additions & 4 deletions hpx/parallel/algorithms/for_each.hpp
Expand Up @@ -46,7 +46,7 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v1)

template <typename ExPolicy, typename F>
static Iter
sequential(ExPolicy const&, Iter first, std::size_t count, F && f)
sequential(ExPolicy, Iter first, std::size_t count, F && f)
{
return util::loop_n(first, count,
[f](Iter const& curr)
Expand All @@ -57,7 +57,7 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v1)

template <typename ExPolicy, typename F>
static typename util::detail::algorithm_result<ExPolicy, Iter>::type
parallel(ExPolicy const& policy, Iter first, std::size_t count,
parallel(ExPolicy policy, Iter first, std::size_t count,
F && f)
{
if (count != 0)
Expand Down Expand Up @@ -195,15 +195,15 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v1)

template <typename ExPolicy, typename InIter, typename F>
static hpx::util::unused_type
sequential(ExPolicy const&, InIter first, InIter last, F && f)
sequential(ExPolicy, InIter first, InIter last, F && f)
{
std::for_each(first, last, std::forward<F>(f));
return hpx::util::unused;
}

template <typename ExPolicy, typename FwdIter, typename F>
static typename util::detail::algorithm_result<ExPolicy>::type
parallel(ExPolicy const& policy, FwdIter first, FwdIter last, F && f)
parallel(ExPolicy policy, FwdIter first, FwdIter last, F && f)
{
typedef
typename util::detail::algorithm_result<ExPolicy>::type
Expand Down
6 changes: 3 additions & 3 deletions hpx/parallel/algorithms/inclusive_scan.hpp
Expand Up @@ -67,7 +67,7 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v1)
///////////////////////////////////////////////////////////////////////
template <typename ExPolicy, typename T, typename OutIter, typename Op>
typename util::detail::algorithm_result<ExPolicy, OutIter>::type
scan_copy_helper(ExPolicy const& policy,
scan_copy_helper(ExPolicy policy,
std::vector<hpx::shared_future<T> >&& r,
boost::shared_array<T> data, std::size_t count,
OutIter dest, Op && op, std::vector<std::size_t> const& chunk_sizes)
Expand Down Expand Up @@ -112,7 +112,7 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v1)

template <typename ExPolicy, typename InIter, typename T, typename Op>
static OutIter
sequential(ExPolicy const&, InIter first, InIter last,
sequential(ExPolicy, InIter first, InIter last,
OutIter dest, T && init, Op && op)
{
return sequential_inclusive_scan(first, last, dest,
Expand All @@ -123,7 +123,7 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v1)
static typename util::detail::algorithm_result<
ExPolicy, OutIter
>::type
parallel(ExPolicy const& policy, FwdIter first, FwdIter last,
parallel(ExPolicy policy, FwdIter first, FwdIter last,
OutIter dest, T && init, Op && op)
{
typedef util::detail::algorithm_result<ExPolicy, OutIter> result;
Expand Down
34 changes: 32 additions & 2 deletions hpx/parallel/exception_list.hpp
Expand Up @@ -15,8 +15,14 @@

namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v1)
{
///////////////////////////////////////////////////////////////////////////
// forward declarations, see execution_policy.hpp
struct sequential_task_execution_policy;
template <typename Executor> struct sequential_task_execution_policy_shim;

struct parallel_task_execution_policy;
template <typename Executor> struct parallel_task_execution_policy_shim;

struct parallel_vector_execution_policy;

namespace detail
Expand All @@ -25,6 +31,8 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v1)
template <typename ExPolicy, typename Result = void>
struct handle_exception
{
typedef void type;

HPX_ATTRIBUTE_NORETURN static void call()
{
try {
Expand All @@ -44,6 +52,8 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v1)
template <typename Result>
struct handle_exception<sequential_task_execution_policy, Result>
{
typedef future<Result> type;

static future<Result> call()
{
try {
Expand All @@ -60,14 +70,24 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v1)
}
}
catch (...) {
return hpx::make_exceptional_future<Result>(boost::current_exception());
return hpx::make_exceptional_future<Result>(
boost::current_exception());
}
}
};

template <typename Executor, typename Result>
struct handle_exception<
sequential_task_execution_policy_shim<Executor>, Result>
: handle_exception<sequential_task_execution_policy, Result>
{};

///////////////////////////////////////////////////////////////////////
template <typename Result>
struct handle_exception<parallel_task_execution_policy, Result>
{
typedef future<Result> type;

static future<Result> call()
{
try {
Expand All @@ -84,14 +104,24 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v1)
}
}
catch (...) {
return hpx::make_exceptional_future<Result>(boost::current_exception());
return hpx::make_exceptional_future<Result>(
boost::current_exception());
}
}
};

template <typename Executor, typename Result>
struct handle_exception<
parallel_task_execution_policy_shim<Executor>, Result>
: handle_exception<parallel_task_execution_policy, Result>
{};

///////////////////////////////////////////////////////////////////////
template <typename Result>
struct handle_exception<parallel_vector_execution_policy, Result>
{
typedef void type;

HPX_ATTRIBUTE_NORETURN static void call()
{
// any exceptions thrown by algorithms executed with the
Expand Down