Merge pull request #4119 from STEllAR-GROUP/fixing_4118

Making sure chunk_size and max_chunk are actually applied to parallel algorithms if specified
hkaiser committed Oct 5, 2019
2 parents 866b178 + 6f03150 commit 336b57dca01ef5c5f988b4b2fcc225ffda14c789
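
For context, this is roughly how a caller hands a chunk-size hint to a parallel algorithm so that the code below has something to apply. It is a minimal sketch assuming the HPX API of this era (hpx::parallel::execution::static_chunk_size attached to a policy via .with()); the header names are assumptions, not taken from this diff.

#include <hpx/include/parallel_sort.hpp>
#include <hpx/include/parallel_executor_parameters.hpp>

#include <vector>

void sort_with_chunk_hint(std::vector<int>& v)
{
    // Ask the scheduler to hand out work in chunks of 4096 elements.
    // This commit makes sure such hints are actually honored by the
    // partition/sort code paths touched below.
    hpx::parallel::execution::static_chunk_size scs(4096);

    hpx::parallel::sort(
        hpx::parallel::execution::par.with(scs), v.begin(), v.end());
}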
@@ -30,6 +30,7 @@
#include <hpx/parallel/tagspec.hpp>
#include <hpx/parallel/traits/projected.hpp>
#include <hpx/parallel/util/detail/algorithm_result.hpp>
#include <hpx/parallel/util/detail/chunk_size.hpp>
#include <hpx/parallel/util/detail/handle_local_exceptions.hpp>
#include <hpx/parallel/util/invoke_projected.hpp>
#include <hpx/parallel/util/loop.hpp>
@@ -150,10 +151,10 @@ namespace hpx { namespace parallel { inline namespace v1 {
parallel(ExPolicy&& policy, RandIter first, RandIter last, F&& f,
Proj&& proj)
{
typedef util::detail::algorithm_result<ExPolicy, RandIter>
algorithm_result;
typedef typename std::iterator_traits<RandIter>::difference_type
difference_type;
using algorithm_result =
util::detail::algorithm_result<ExPolicy, RandIter>;
using difference_type =
typename std::iterator_traits<RandIter>::difference_type;

future<RandIter> result;

@@ -167,23 +168,29 @@ namespace hpx { namespace parallel { inline namespace v1 {
}
else
{
typedef typename hpx::util::decay<ExPolicy>::type::
executor_parameters_type parameters_type;
using parameters_type = typename hpx::util::decay<
ExPolicy>::type::executor_parameters_type;

std::size_t const cores =
execution::processing_units_count(
policy.executor(), policy.parameters());

std::size_t chunk_size = execution::get_chunk_size(
policy.parameters(), policy.executor(),
[] { return 0; }, cores, size);

std::size_t max_chunks =
execution::maximal_number_of_chunks(
policy.parameters(), policy.executor(), cores,
size);

util::detail::adjust_chunk_size_and_max_chunks(
cores, size, chunk_size, max_chunks);

result = stable_partition_helper()(
std::forward<ExPolicy>(policy), first, last, size,
std::forward<F>(f), std::forward<Proj>(proj),
size == 1 ?
1 :
(std::min)(std::size_t(size), max_chunks));
max_chunks);
}
}
catch (...)
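
The util::detail::adjust_chunk_size_and_max_chunks call introduced above factors out the inline clamping that this same commit removes from the sort hunk further down (clamp the number of chunks to the element count, then round the chunk size up so that max_chunks chunks still cover all elements). A standalone sketch of that logic, written here as a hypothetical free function rather than the real helper from hpx/parallel/util/detail/chunk_size.hpp, whose exact signature is not shown in this diff:

#include <algorithm>
#include <cstddef>

// Hypothetical stand-in mirroring the clamping this commit factors out.
// Callers guarantee count != 0 and max_chunks != 0 (note the
// HPX_ASSERT(0 != max_chunks) in the sort code below), so the division
// is safe.
void adjust_chunks_sketch(
    std::size_t count, std::size_t& chunk_size, std::size_t& max_chunks)
{
    // we should not consider more chunks than we have elements
    max_chunks = (std::min)(max_chunks, count);

    // we should not make chunks smaller than what is implied by the
    // maximal number of chunks
    chunk_size =
        (std::max)(chunk_size, (count + max_chunks - 1) / max_chunks);
}

With the adjustment done up front, the stable_partition_helper invocation can pass max_chunks directly instead of re-deriving (std::min)(std::size_t(size), max_chunks) at the call site, which is the simplification visible at the end of this hunk.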
@@ -288,10 +295,9 @@ namespace hpx { namespace parallel { inline namespace v1 {
(hpx::traits::is_bidirectional_iterator<BidirIter>::value),
"Requires at least bidirectional iterator.");

typedef std::integral_constant<bool,
using is_seq = std::integral_constant<bool,
execution::is_sequenced_execution_policy<ExPolicy>::value ||
!hpx::traits::is_random_access_iterator<BidirIter>::value>
is_seq;
!hpx::traits::is_random_access_iterator<BidirIter>::value>;

return detail::stable_partition<BidirIter>().call(
std::forward<ExPolicy>(policy), is_seq(), first, last,
@@ -817,13 +823,13 @@ namespace hpx { namespace parallel { inline namespace v1 {

if (remaining_blocks.front().block_no < 0)
{
// when blocks are placed in leftside of boundary.
// when blocks are placed in left side of boundary.
return merge_leftside_remaining_blocks(
remaining_blocks, boundary, first);
}
else
{
// when blocks are placed in rightside of boundary.
// when blocks are placed in right side of boundary.
FwdIter boundary_end = boundary;
for (auto& block : remaining_blocks)
{
@@ -846,8 +852,8 @@ namespace hpx { namespace parallel { inline namespace v1 {
static FwdIter call(ExPolicy&& policy, FwdIter first, FwdIter last,
Pred&& pred, Proj&& proj)
{
typedef typename hpx::util::decay<
ExPolicy>::type::executor_parameters_type parameters_type;
using parameters_type = typename hpx::util::decay<
ExPolicy>::type::executor_parameters_type;

if (first == last)
return first;
@@ -967,8 +973,8 @@ namespace hpx { namespace parallel { inline namespace v1 {
parallel(ExPolicy&& policy, FwdIter first, FwdIter last,
Pred&& pred, Proj&& proj)
{
typedef util::detail::algorithm_result<ExPolicy, FwdIter>
algorithm_result;
using algorithm_result =
util::detail::algorithm_result<ExPolicy, FwdIter>;

try
{
@@ -1064,7 +1070,7 @@ namespace hpx { namespace parallel { inline namespace v1 {
static_assert((hpx::traits::is_forward_iterator<FwdIter>::value),
"Required at least forward iterator.");

typedef execution::is_sequenced_execution_policy<ExPolicy> is_seq;
using is_seq = execution::is_sequenced_execution_policy<ExPolicy>;

return detail::partition<FwdIter>().call(std::forward<ExPolicy>(policy),
is_seq(), first, last, std::forward<Pred>(pred),
@@ -1125,14 +1131,13 @@ namespace hpx { namespace parallel { inline namespace v1 {
FwdIter2 dest_true, FwdIter3 dest_false, Pred&& pred,
Proj&& proj)
{
typedef hpx::util::zip_iterator<FwdIter1, bool*> zip_iterator;
typedef util::detail::algorithm_result<ExPolicy,
hpx::util::tuple<FwdIter1, FwdIter2, FwdIter3>>
result;
typedef typename std::iterator_traits<FwdIter1>::difference_type
difference_type;
typedef std::pair<std::size_t, std::size_t>
output_iterator_offset;
using zip_iterator = hpx::util::zip_iterator<FwdIter1, bool*>;
using result = util::detail::algorithm_result<ExPolicy,
hpx::util::tuple<FwdIter1, FwdIter2, FwdIter3>>;
using difference_type =
typename std::iterator_traits<FwdIter1>::difference_type;
using output_iterator_offset =
std::pair<std::size_t, std::size_t>;

if (first == last)
return result::get(
@@ -1145,10 +1150,9 @@ namespace hpx { namespace parallel { inline namespace v1 {

using hpx::util::get;
using hpx::util::make_zip_iterator;
typedef util::scan_partitioner<ExPolicy,
using scan_partitioner_type = util::scan_partitioner<ExPolicy,
hpx::util::tuple<FwdIter1, FwdIter2, FwdIter3>,
output_iterator_offset>
scan_partitioner_type;
output_iterator_offset>;

auto f1 = [HPX_CAPTURE_FORWARD(pred),
HPX_CAPTURE_FORWARD(proj)](
@@ -1345,8 +1349,8 @@ namespace hpx { namespace parallel { inline namespace v1 {
static_assert((hpx::traits::is_forward_iterator<FwdIter3>::value),
"Requires at least forward iterator.");

typedef execution::is_sequenced_execution_policy<ExPolicy> is_seq;
typedef hpx::util::tuple<FwdIter1, FwdIter2, FwdIter3> result_type;
using is_seq = execution::is_sequenced_execution_policy<ExPolicy>;
using result_type = hpx::util::tuple<FwdIter1, FwdIter2, FwdIter3>;

return hpx::util::make_tagged_tuple<tag::in, tag::out1, tag::out2>(
detail::partition_copy<result_type>().call(
@@ -28,6 +28,7 @@
#include <hpx/parallel/traits/projected.hpp>
#include <hpx/parallel/util/compare_projected.hpp>
#include <hpx/parallel/util/detail/algorithm_result.hpp>
#include <hpx/parallel/util/detail/chunk_size.hpp>
#include <hpx/parallel/util/projection_identity.hpp>

#include <algorithm>
@@ -114,8 +115,8 @@ namespace hpx { namespace parallel { inline namespace v1 {

std::iter_swap(first, it_b);

typedef
typename std::iterator_traits<RandomIt>::reference reference;
using reference =
typename std::iterator_traits<RandomIt>::reference;

reference val = *first;
RandomIt c_first = first + 2, c_last = last - 2;
@@ -184,19 +185,13 @@ namespace hpx { namespace parallel { inline namespace v1 {

std::size_t max_chunks = execution::maximal_number_of_chunks(
policy.parameters(), policy.executor(), cores, count);
HPX_ASSERT(0 != max_chunks);

std::size_t chunk_size = execution::get_chunk_size(
policy.parameters(), policy.executor(), [] { return 0; }, cores,
count);

// we should not consider more chunks than we have elements
max_chunks = (std::min)(max_chunks, count);

// we should not make chunks smaller than what's determined by the
// max chunk size
chunk_size =
(std::max)(chunk_size, (count + max_chunks - 1) / max_chunks);
util::detail::adjust_chunk_size_and_max_chunks(
cores, count, max_chunks, chunk_size);

// we should not get smaller than our sort_limit_per_task
chunk_size = (std::max)(chunk_size, sort_limit_per_task);

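To make the combined effect of this last hunk concrete (it appears to come from the introspective-sort driver, given sort_limit_per_task), here is a purely illustrative walk-through; every number below is assumed for the example, none comes from HPX defaults.

// Assumed inputs, illustration only:
//   count      = 1'000'000 elements to sort
//   cores      = 8
//   max_chunks = 32   (whatever maximal_number_of_chunks returned)
//   chunk_size = 0    (no chunk-size hint supplied by the caller)
//
// adjust_chunk_size_and_max_chunks then clamps
//   max_chunks = min(32, 1'000'000)            = 32
//   chunk_size = max(0, (1'000'000 + 31) / 32) = 31'250
//
// and the final line keeps each task at least as large as
// sort_limit_per_task, the per-task lower bound used by the sort code:
//   chunk_size = max(31'250, sort_limit_per_task)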