Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changed return type of bulk executors and added test #1538

Merged
merged 4 commits into from May 22, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
52 changes: 37 additions & 15 deletions hpx/parallel/executors/executor_traits.hpp
@@ -1,4 +1,5 @@
// Copyright (c) 2007-2015 Hartmut Kaiser
// Copyright (c) 2015 Daniel Bourgeois
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
Expand Down Expand Up @@ -176,16 +177,20 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v3)
{
template <typename Executor, typename F, typename S>
static auto call(wrap_int, Executor& exec, F && f, S const& shape)
-> typename future_type<Executor, void>::type
-> std::vector<
decltype(exec.async_execute(
util::deferred_call(f, *std::begin(shape))))
>
{
std::vector<hpx::future<void> > results;
std::vector<decltype(exec.async_execute(
util::deferred_call(f, *std::begin(shape))))> results;
for (auto const& elem: shape)
{
results.push_back(
exec.async_execute(util::deferred_call(f, elem))
);
}
return hpx::when_all(results);
return std::move(results);
}

template <typename Executor, typename F, typename S>
Expand All @@ -197,8 +202,9 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v3)
};

template <typename Executor, typename F, typename S>
typename future_type<Executor, void>::type
call_bulk_async_execute(Executor& exec, F && f, S const& shape)
auto call_bulk_async_execute(Executor& exec, F && f, S const& shape)
-> decltype(bulk_async_execute_helper::call(
0, exec, std::forward<F>(f), shape))
{
return bulk_async_execute_helper::call(
0, exec, std::forward<F>(f), shape);
Expand All @@ -209,16 +215,19 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v3)
{
template <typename Executor, typename F, typename S>
static auto call(wrap_int, Executor& exec, F && f, S const& shape)
-> void
-> decltype(hpx::util::unwrapped(exec.async_execute(
util::deferred_call(f, *std::begin(shape)))))
//returns void if F returns void
{
std::vector<hpx::future<void> > results;
std::vector<decltype(exec.async_execute(
util::deferred_call(f, *std::begin(shape))))> results;
for (auto const& elem: shape)
{
results.push_back(
exec.async_execute(util::deferred_call(f, elem))
);
}
hpx::when_all(results).get();
return hpx::util::unwrapped(results);
}

template <typename Executor, typename F, typename S>
Expand All @@ -230,7 +239,9 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v3)
};

template <typename Executor, typename F, typename S>
void call_bulk_execute(Executor& exec, F && f, S const& shape)
auto call_bulk_execute(Executor& exec, F && f, S const& shape)
-> decltype(bulk_execute_helper::call(
0, exec, std::forward<F>(f), shape))
{
return bulk_execute_helper::call(0, exec, std::forward<F>(f), shape);
}
Expand Down Expand Up @@ -374,8 +385,7 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v3)
/// otherwise hpx::async(f).get()
///
template <typename F>
static
typename hpx::util::result_of<
static typename hpx::util::result_of<
typename hpx::util::decay<F>::type()
>::type
execute(executor_type& exec, F && f)
Expand All @@ -401,15 +411,19 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v3)
/// \param shape [in] The shape objects which defines the iteration
/// boundaries for the arguments to be passed to \a f.
///
/// \returns A future object representing which becomes ready once all
/// scheduled functions have finished executing.
/// \returns The return type of \a executor_type::async_execute if
/// defined by \a executor_type. Otherwise a vector
/// of futures holding the returned value of each invocation
/// of \a f.
///
/// \note This calls exec.async_execute(f, shape) if it exists;
/// otherwise it executes hpx::async(f, i) as often as needed.
///
template <typename F, typename Shape>
static typename future<void>::type
static auto
async_execute(executor_type& exec, F && f, Shape const& shape)
-> decltype(detail::call_bulk_async_execute(
exec, std::forward<F>(f), shape))
{
return detail::call_bulk_async_execute(
exec, std::forward<F>(f), shape);
Expand All @@ -434,11 +448,19 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v3)
/// \param shape [in] The shape objects which defines the iteration
/// boundaries for the arguments to be passed to \a f.
///
/// \returns The return type of \a executor_type::execute if defined
/// by \a executor_type. Otherwise a vector holding the
/// returned value of each invocation of \a f except when
/// \a f returns void, which case void is returned.
///
/// \note This calls exec.execute(f, shape) if it exists;
/// otherwise it executes hpx::async(f, i) as often as needed.
///
template <typename F, typename Shape>
static void execute(executor_type& exec, F && f, Shape const& shape)
static auto
execute(executor_type& exec, F && f, Shape const& shape)
-> decltype(detail::call_bulk_execute(
exec, std::forward<F>(f), shape))
{
return detail::call_bulk_execute(exec, std::forward<F>(f), shape);
}
Expand Down
4 changes: 2 additions & 2 deletions hpx/parallel/executors/parallel_executor.hpp
Expand Up @@ -34,13 +34,13 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v3)

/// \cond NOINTERNAL
template <typename F>
void apply_execute(F && f)
static void apply_execute(F && f)
{
hpx::apply(std::forward<F>(f));
}

template <typename F>
hpx::future<typename hpx::util::result_of<
static hpx::future<typename hpx::util::result_of<
typename hpx::util::decay<F>::type()
>::type>
async_execute(F && f)
Expand Down
4 changes: 2 additions & 2 deletions hpx/parallel/executors/parallel_fork_executor.hpp
Expand Up @@ -34,13 +34,13 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v3)

/// \cond NOINTERNAL
template <typename F>
void apply_execute(F && f)
static void apply_execute(F && f)
{
return hpx::apply(std::forward<F>(f));
}

template <typename F>
hpx::future<typename hpx::util::result_of<
static hpx::future<typename hpx::util::result_of<
typename hpx::util::decay<F>::type()
>::type>
async_execute(F && f)
Expand Down
27 changes: 16 additions & 11 deletions hpx/parallel/executors/sequential_executor.hpp
Expand Up @@ -71,11 +71,24 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v3)
}

template <typename F, typename Shape>
static void bulk_execute(F && f, Shape const& shape)
static auto bulk_execute(F && f, Shape const& shape)
-> decltype(hpx::util::unwrapped(
bulk_async_execute(std::forward<F>(f), shape)))
{
return hpx::util::unwrapped(
bulk_async_execute(std::forward<F>(f), shape));
}

template <typename F, typename Shape>
static auto bulk_async_execute(F && f, Shape const& shape)
-> std::vector<decltype(
hpx::async(hpx::launch::sync, f, *std::begin(shape)))>
{
std::vector<decltype(
hpx::async(hpx::launch::sync, f, *std::begin(shape)))> results;
try {
for (auto const& elem: shape)
f(elem);
results.push_back(hpx::async(hpx::launch::sync, f, elem));
}
catch (std::bad_alloc const& ba) {
boost::throw_exception(ba);
Expand All @@ -85,15 +98,7 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v3)
exception_list(boost::current_exception())
);
}
}

template <typename F, typename Shape>
static hpx::future<void>
bulk_async_execute(F && f, Shape const& shape)
{
return hpx::async(hpx::launch::deferred,
&sequential_executor::bulk_execute<F, Shape>,
std::forward<F>(f), shape);
return std::move(results);
}

std::size_t os_thread_count()
Expand Down
2 changes: 2 additions & 0 deletions tests/unit/parallel/executors/CMakeLists.txt
Expand Up @@ -4,6 +4,8 @@
# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

set(tests
bulk_async
created_executor
minimal_async_executor
minimal_sync_executor
parallel_executor
Expand Down
74 changes: 74 additions & 0 deletions tests/unit/parallel/executors/bulk_async.cpp
@@ -0,0 +1,74 @@
// Copyright (c) 2015 Daniel Bourgeois
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#include <hpx/hpx_init.hpp>
#include <hpx/hpx.hpp>
#include <hpx/include/parallel_executors.hpp>
#include <hpx/parallel/executors/executor_traits.hpp>
#include <hpx/util/lightweight_test.hpp>
#include <hpx/util/deferred_call.hpp>

#include <algorithm>

////////////////////////////////////////////////////////////////////////////////
int bulk_test(hpx::thread::id tid, int value, bool is_par)
{
HPX_TEST(is_par == (tid != hpx::this_thread::get_id()));
return value;
}

template <typename Executor>
void test_bulk_async(Executor& exec, bool is_par = true)
{
typedef hpx::parallel::executor_traits<Executor> traits;

hpx::thread::id tid = hpx::this_thread::get_id();

std::vector<int> v(107);
std::iota(std::begin(v), std::end(v), 0);

using hpx::util::placeholders::_1;

std::vector<hpx::future<int> > results = traits::async_execute
(
exec, hpx::util::bind(&bulk_test, tid, _1, is_par), v
);

HPX_TEST(std::equal(std::begin(results), std::end(results), std::begin(v),
[](hpx::future<int>& lhs, const int& rhs)
{
return lhs.get() == rhs;
}));
}

////////////////////////////////////////////////////////////////////////////////
int hpx_main(int argc, char* argv[])
{
using namespace hpx::parallel;

sequential_executor seq_exec;
parallel_executor par_exec;
parallel_fork_executor par_fork_exec;

test_bulk_async(seq_exec, false);
test_bulk_async(par_exec);
test_bulk_async(par_fork_exec);

return hpx::finalize();
}

int main(int argc, char* argv[])
{
// By default this test should run on all available cores
std::vector<std::string> cfg;
cfg.push_back("hpx.os_threads=" +
boost::lexical_cast<std::string>(hpx::threads::hardware_concurrency()));

// Initialize and run HPX
HPX_TEST_EQ_MSG(hpx::init(argc, argv, cfg), 0,
"HPX main exited with non-zero status");

return hpx::util::report_errors();
}