Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adding hpx support #217

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ option(SHAD_ENABLE_PERFORMANCE_TEST "Enable the compilation of the Performance T

set(
SHAD_RUNTIME_SYSTEM "CPP_SIMPLE" CACHE STRING
"(Default) Runtime system to be used as backend of the Abstract Runtime API (Default=CPP_SIMPLE, Supported=CPP_SIMPLE | TBB | GMT)")
"(Default) Runtime system to be used as backend of the Abstract Runtime API (Default=CPP_SIMPLE, Supported=CPP_SIMPLE | TBB | GMT | HPX)")


include(config)
Expand Down
6 changes: 6 additions & 0 deletions cmake/config.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ elseif (SHAD_RUNTIME_SYSTEM STREQUAL "GMT")
include_directories(${GMT_INCLUDE_DIR})
set(HAVE_GMT 1)
set(SHAD_RUNTIME_LIB ${GMT_LIBRARIES})
elseif (SHAD_RUNTIME_SYSTEM STREQUAL "HPX")
message(STATUS "Using HPX as backend of the Abstract Runtime API.")
find_package(HPX REQUIRED)
include_directories(${HPX_INCLUDE_DIR})
set(HAVE_HPX 1)
set(SHAD_RUNTIME_LIB HPX::hpx)
else()
message(FATAL_ERROR "${SHAD_RUNTIME_SYSTEM} is not a supported runtime system.")
endif()
Expand Down
84 changes: 84 additions & 0 deletions include/shad/core/impl/impl_patterns.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,21 @@ S distributed_folding_map(ForwardIt first, ForwardIt last, MapF&& map_kernel,
using itr_traits = distributed_iterator_traits<ForwardIt>;
auto localities = itr_traits::localities(first, last);
auto res = init_sol;
#ifdef HAVE_HPX
for (auto locality = localities.begin(), end = localities.end();
locality != end; ++locality) {
auto d_args = std::make_tuple(shad::rt::lambda_wrapper<std::decay_t<MapF>>(
std::forward<MapF>(map_kernel)),
first, last, res, args...);
rt::executeAtWithRet(
locality,
[](const typeof(d_args)& d_args, S* result) {
*result = apply_from<1>(::std::get<0>(d_args),
::std::forward<typeof(d_args)>(d_args));
},
d_args, &res);
}
#else
for (auto locality = localities.begin(), end = localities.end();
locality != end; ++locality) {
auto d_args = std::make_tuple(map_kernel, first, last, res, args...);
Expand All @@ -103,6 +118,7 @@ S distributed_folding_map(ForwardIt first, ForwardIt last, MapF&& map_kernel,
},
d_args, &res);
}
#endif
return res;
}

Expand All @@ -112,6 +128,21 @@ void distributed_folding_map_void(ForwardIt first, ForwardIt last,
MapF&& map_kernel, Args&&... args) {
using itr_traits = distributed_iterator_traits<ForwardIt>;
auto localities = itr_traits::localities(first, last);
#ifdef HAVE_HPX
for (auto locality = localities.begin(), end = localities.end();
locality != end; ++locality) {
auto d_args = std::make_tuple(shad::rt::lambda_wrapper<std::decay_t<MapF>>(
std::forward<MapF>(map_kernel)),
first, last, args...);
rt::executeAt(
locality,
[](const typeof(d_args)& d_args) {
apply_from<1>(::std::get<0>(d_args),
::std::forward<typeof(d_args)>(d_args));
},
d_args);
}
#else
for (auto locality = localities.begin(), end = localities.end();
locality != end; ++locality) {
auto d_args = std::make_tuple(map_kernel, first, last, args...);
Expand All @@ -123,6 +154,7 @@ void distributed_folding_map_void(ForwardIt first, ForwardIt last,
},
d_args);
}
#endif
}

// distributed_folding_map variant testing for early termination
Expand All @@ -134,6 +166,22 @@ S distributed_folding_map_early_termination(ForwardIt first, ForwardIt last,
using itr_traits = distributed_iterator_traits<ForwardIt>;
auto localities = itr_traits::localities(first, last);
auto res = init_sol;
#ifdef HAVE_HPX
for (auto locality = localities.begin(), end = localities.end();
locality != end; ++locality) {
auto d_args = std::make_tuple(shad::rt::lambda_wrapper<std::decay_t<MapF>>(
std::forward<MapF>(map_kernel)),
first, last, res, args...);
rt::executeAtWithRet(
locality,
[](const typeof(d_args)& d_args, S* result) {
*result = apply_from<1>(::std::get<0>(d_args),
::std::forward<typeof(d_args)>(d_args));
},
d_args, &res);
if (halt(res)) return res;
}
#else
for (auto locality = localities.begin(), end = localities.end();
locality != end; ++locality) {
auto d_args = std::make_tuple(map_kernel, first, last, res, args...);
Expand All @@ -146,6 +194,8 @@ S distributed_folding_map_early_termination(ForwardIt first, ForwardIt last,
d_args, &res);
if (halt(res)) return res;
}
#endif

return res;
}

Expand Down Expand Up @@ -205,7 +255,13 @@ distributed_map_init(
auto localities = itr_traits::localities(first, last);
size_t i = 0;
rt::Handle h;
#ifdef HAVE_HPX
auto d_args = std::make_tuple(shad::rt::lambda_wrapper<std::decay_t<MapF>>(
std::forward<MapF>(map_kernel)),
first, last, args...);
#else
auto d_args = std::make_tuple(map_kernel, first, last, args...);
#endif
optional_vector<mapped_t> opt_res(localities.size(), init);
for (auto locality = localities.begin(), end = localities.end();
locality != end; ++locality, ++i) {
Expand Down Expand Up @@ -255,7 +311,13 @@ void distributed_map_void(ForwardIt first, ForwardIt last, MapF&& map_kernel,
auto localities = itr_traits::localities(first, last);
size_t i = 0;
rt::Handle h;
#ifdef HAVE_HPX
auto d_args = std::make_tuple(shad::rt::lambda_wrapper<std::decay_t<MapF>>(
std::forward<MapF>(map_kernel)),
first, last, args...);
#else
auto d_args = std::make_tuple(map_kernel, first, last, args...);
#endif
for (auto locality = localities.begin(), end = localities.end();
locality != end; ++locality, ++i) {
rt::asyncExecuteAt(
Expand Down Expand Up @@ -301,7 +363,15 @@ local_map_init(
std::vector<mapped_t> map_res(parts.size(), init);

if (parts.size()) {
#ifdef HAVE_HPX
auto map_args =
std::make_tuple(parts.data(),
shad::rt::lambda_wrapper<std::decay_t<MapF>>(
std::forward<MapF>(map_kernel)),
map_res.data());
#else
auto map_args = std::make_tuple(parts.data(), map_kernel, map_res.data());
#endif
shad::rt::forEachAt(
rt::thisLocality(),
[](const typeof(map_args)& map_args, size_t iter) {
Expand Down Expand Up @@ -339,7 +409,13 @@ void local_map_void(ForwardIt first, ForwardIt last, MapF&& map_kernel) {
first, last, rt::impl::getConcurrency());

if (parts.size()) {
#ifdef HAVE_HPX
auto map_args = std::make_tuple(
parts.data(), shad::rt::lambda_wrapper<std::decay_t<MapF>>(
std::forward<MapF>(map_kernel)));
#else
auto map_args = std::make_tuple(parts.data(), map_kernel);
#endif
shad::rt::forEachAt(
rt::thisLocality(),
[](const typeof(map_args)& map_args, size_t iter) {
Expand All @@ -361,7 +437,15 @@ void local_map_void_offset(ForwardIt first, ForwardIt last, MapF&& map_kernel) {
first, last, rt::impl::getConcurrency());

if (parts.size()) {
#ifdef HAVE_HPX
auto map_args =
std::make_tuple(parts.data(),
shad::rt::lambda_wrapper<std::decay_t<MapF>>(
std::forward<MapF>(map_kernel)),
first);
#else
auto map_args = std::make_tuple(parts.data(), map_kernel, first);
#endif
shad::rt::forEachAt(
rt::thisLocality(),
[](const typeof(map_args)& map_args, size_t iter) {
Expand Down
85 changes: 83 additions & 2 deletions include/shad/core/impl/modifyng_sequence_ops.h
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,32 @@ template <typename ForwardIt, typename Generator>
void generate(distributed_parallel_tag&& policy, ForwardIt first,
ForwardIt last, Generator generator) {
using itr_traits = distributed_iterator_traits<ForwardIt>;
#ifdef HAVE_HPX
auto new_generator = shad::rt::lambda_wrapper(generator);

// distributed map
distributed_map_void(
// range
first, last,
// kernel
[](ForwardIt first, ForwardIt last,
shad::rt::lambda_wrapper<typeof(new_generator)> new_generator) {
using local_iterator_t = typename itr_traits::local_iterator_type;

// local map
auto lrange = itr_traits::local_range(first, last);

local_map_void(
// range
lrange.begin(), lrange.end(),
// kernel
[&](local_iterator_t b, local_iterator_t e) {
std::generate(b, e, new_generator);
});
},
// map arguments
new_generator);
#else
// distributed map
distributed_map_void(
// range
Expand All @@ -435,13 +460,28 @@ void generate(distributed_parallel_tag&& policy, ForwardIt first,
},
// map arguments
generator);
#endif
}

template <typename ForwardIt, typename Generator>
void generate(distributed_sequential_tag&& policy, ForwardIt first,
ForwardIt last, Generator generator) {
using itr_traits = distributed_iterator_traits<ForwardIt>;

#ifdef HAVE_HPX
auto new_generator = shad::rt::lambda_wrapper(generator);
distributed_folding_map_void(
// range
first, last,
// kernel
[](ForwardIt first, ForwardIt last,
shad::rt::lambda_wrapper<typeof(new_generator)> new_generator) {
// local processing
auto lrange = itr_traits::local_range(first, last);
std::generate(lrange.begin(), lrange.end(), new_generator);
},
// map arguments
new_generator);
#else
distributed_folding_map_void(
// range
first, last,
Expand All @@ -453,6 +493,7 @@ void generate(distributed_sequential_tag&& policy, ForwardIt first,
},
// map arguments
generator);
#endif
}

template <typename ForwardIt, typename T>
Expand Down Expand Up @@ -507,7 +548,31 @@ template <typename ForwardIt, typename UnaryPredicate, typename T>
void replace_if(distributed_parallel_tag&& policy, ForwardIt first,
ForwardIt last, UnaryPredicate p, const T& new_value) {
using itr_traits = distributed_iterator_traits<ForwardIt>;
#ifdef HAVE_HPX
auto new_p = shad::rt::lambda_wrapper(p);
// distributed map
distributed_map_void(
// range
first, last,
// kernel
[](ForwardIt first, ForwardIt last,
shad::rt::lambda_wrapper<typeof(new_p)> new_p, const T& new_value) {
using local_iterator_t = typename itr_traits::local_iterator_type;

// local map
auto lrange = itr_traits::local_range(first, last);

local_map_void(
// range
lrange.begin(), lrange.end(),
// kernel
[&](local_iterator_t b, local_iterator_t e) {
std::replace_if(b, e, new_p, new_value);
});
},
// map arguments
new_p, new_value);
#else
// distributed map
distributed_map_void(
// range
Expand All @@ -530,13 +595,28 @@ void replace_if(distributed_parallel_tag&& policy, ForwardIt first,
},
// map arguments
p, new_value);
#endif
}

template <typename ForwardIt, typename UnaryPredicate, typename T>
void replace_if(distributed_sequential_tag&& policy, ForwardIt first,
ForwardIt last, UnaryPredicate p, const T& new_value) {
using itr_traits = distributed_iterator_traits<ForwardIt>;

#ifdef HAVE_HPX
auto new_p = shad::rt::lambda_wrapper(p);
distributed_folding_map_void(
// range
first, last,
// kernel
[](ForwardIt first, ForwardIt last,
shad::rt::lambda_wrapper<typeof(new_p)> new_p, const T& new_value) {
// local processing
auto lrange = itr_traits::local_range(first, last);
std::replace_if(lrange.begin(), lrange.end(), new_p, new_value);
},
// map arguments
new_p, new_value);
#else
distributed_folding_map_void(
// range
first, last,
Expand All @@ -549,6 +629,7 @@ void replace_if(distributed_sequential_tag&& policy, ForwardIt first,
},
// map arguments
p, new_value);
#endif
}

} // namespace impl
Expand Down
21 changes: 20 additions & 1 deletion include/shad/core/impl/numeric_ops.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,25 @@ void iota(ForwardIterator first, ForwardIterator last, const T& value) {
template <class InputIt, class T, class BinaryOperation>
T accumulate(InputIt first, InputIt last, T init, BinaryOperation op) {
using itr_traits = distributed_iterator_traits<InputIt>;

#ifdef HAVE_HPX
auto new_op = shad::rt::lambda_wrapper(op);
return distributed_folding_map(
// range
first, last,
// kernel
[](InputIt first, InputIt last, T res,
shad::rt::lambda_wrapper<typeof(new_op)> new_op) {
// local processing
auto lrange = itr_traits::local_range(first, last);
res = std::accumulate(lrange.begin(), lrange.end(), res, new_op);
// update the partial solution
return res;
},
// initial solution
init,
// map arguments
new_op);
#else
return distributed_folding_map(
// range
first, last,
Expand All @@ -79,6 +97,7 @@ T accumulate(InputIt first, InputIt last, T init, BinaryOperation op) {
init,
// map arguments
op);
#endif
}

template <class InputIt1, class InputIt2, class T>
Expand Down