Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing stable_partition #2369

Merged
merged 2 commits into from Oct 27, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
171 changes: 104 additions & 67 deletions hpx/parallel/algorithms/partition.hpp
Expand Up @@ -24,9 +24,12 @@
#include <hpx/parallel/util/invoke_projected.hpp>
#include <hpx/parallel/util/projection_identity.hpp>

#include <boost/exception_ptr.hpp>

#include <algorithm>
#include <cstddef>
#include <iterator>
#include <list>
#include <type_traits>
#include <utility>

Expand All @@ -40,59 +43,76 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v1)
struct stable_partition_helper
{
template <typename ExPolicy, typename RandIter, typename F, typename Proj>
typename util::detail::algorithm_result<ExPolicy, RandIter>::type
hpx::future<RandIter>
operator()(ExPolicy && policy, RandIter first, RandIter last,
std::size_t size, F && f, Proj && proj, std::size_t chunks)
std::size_t size, F f, Proj proj, std::size_t chunks)
{
typedef util::detail::algorithm_result<ExPolicy, RandIter> result;
try {
if (chunks < 2)
{
return result::get(std::stable_partition(
first, last,
util::invoke_projected<F, Proj>(
std::forward<F>(f), std::forward<Proj>(proj)
)));
}
typedef typename hpx::util::decay<ExPolicy>::type::executor_type
executor_type;
typedef typename hpx::parallel::executor_traits<executor_type>
executor_traits;
if (chunks < 2)
{
return executor_traits::async_execute(
policy.executor(),
[first, last, f, proj]() -> RandIter
{
return std::stable_partition(
first, last,
util::invoke_projected<F, Proj>(f, proj));
});
}

std::size_t mid_point = size / 2;
chunks /= 2;
std::size_t mid_point = size / 2;
chunks /= 2;

RandIter mid = first;
std::advance(mid, mid_point);
RandIter mid = first;
std::advance(mid, mid_point);

hpx::future<RandIter> left = hpx::async(
policy.executor(), *this, policy, first, mid, mid_point,
f, proj, chunks);
hpx::future<RandIter> right = hpx::async(
policy.executor(), *this, policy, mid, last, size - mid_point,
std::forward<F>(f), std::forward<Proj>(proj), chunks);
hpx::future<RandIter> left = executor_traits::async_execute(
policy.executor(), *this, policy, first, mid, mid_point,
f, proj, chunks);
hpx::future<RandIter> right = executor_traits::async_execute(
policy.executor(), *this, policy, mid, last, size - mid_point,
f, proj, chunks);

return result::get(
dataflow(
policy.executor(),
[mid](
hpx::future<RandIter> && left,
hpx::future<RandIter> && right
) -> RandIter
return
dataflow(
policy.executor(),
[mid](
hpx::future<RandIter> && left,
hpx::future<RandIter> && right
) -> RandIter
{
if (left.has_exception() || right.has_exception())
{
RandIter first = left.get();
RandIter last = right.get();
std::list<boost::exception_ptr> errors;
if(left.has_exception())
hpx::parallel::util::detail::
handle_local_exceptions<ExPolicy>::call(
left.get_exception_ptr(), errors);
if(right.has_exception())
hpx::parallel::util::detail::
handle_local_exceptions<ExPolicy>::call(
right.get_exception_ptr(), errors);

std::rotate(first, mid, last);
if (!errors.empty())
{
boost::throw_exception(
exception_list(std::move(errors)));
}
}
RandIter first = left.get();
RandIter last = right.get();

// for some library implementations std::rotate
// does not return the new middle point
std::advance(first, std::distance(mid, last));
return first;
},
std::move(left), std::move(right)));
}
catch (...) {
return result::get(detail::handle_exception<
ExPolicy, RandIter
>::call(boost::current_exception()));
}
std::rotate(first, mid, last);

// for some library implementations std::rotate
// does not return the new middle point
std::advance(first, std::distance(mid, last));
return first;
},
std::move(left), std::move(right));
}
};

Expand Down Expand Up @@ -124,37 +144,54 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v1)
parallel(ExPolicy && policy, RandIter first, RandIter last,
F && f, Proj && proj)
{
typedef util::detail::algorithm_result<ExPolicy, RandIter>
algorithm_result;
typedef typename std::iterator_traits<RandIter>::difference_type
difference_type;

difference_type size = std::distance(first, last);
future<RandIter> result;

if (size == 0)
{
return util::detail::algorithm_result<
ExPolicy, RandIter
>::get(std::move(last));
}
try {
difference_type size = std::distance(first, last);

typedef typename hpx::util::decay<ExPolicy>::type::executor_type
executor_type;
typedef typename
hpx::util::decay<ExPolicy>::type::executor_parameters_type
parameters_type;
if (size == 0)
{
result = hpx::make_ready_future(std::move(last));
}

typedef typename hpx::util::decay<ExPolicy>::type::executor_type
executor_type;
typedef typename
hpx::util::decay<ExPolicy>::type::executor_parameters_type
parameters_type;

typedef executor_parameter_traits<parameters_type> traits;
typedef executor_information_traits<executor_type> info_traits;

typedef executor_parameter_traits<parameters_type> traits;
typedef executor_information_traits<executor_type> info_traits;
std::size_t const cores =
info_traits::processing_units_count(policy.executor(),
policy.parameters());
std::size_t max_chunks = traits::maximal_number_of_chunks(
policy.parameters(), policy.executor(), cores, size);

std::size_t const cores =
info_traits::processing_units_count(policy.executor(),
policy.parameters());
std::size_t max_chunks = traits::maximal_number_of_chunks(
policy.parameters(), policy.executor(), cores, size);
result = stable_partition_helper()(
std::forward<ExPolicy>(policy), first, last, size,
std::forward<F>(f), std::forward<Proj>(proj),
size == 1 ? 1 : (std::min)(std::size_t(size), max_chunks));
}
catch (...) {
result = hpx::make_exceptional_future<RandIter>(
boost::current_exception());
}

if (result.has_exception())
{
return algorithm_result::get(
detail::handle_exception<ExPolicy, RandIter>::call(
std::move(result)));
}

return stable_partition_helper()(
std::forward<ExPolicy>(policy), first, last, size,
std::forward<F>(f), std::forward<Proj>(proj),
size == 1 ? 1 : (std::min)(std::size_t(size), max_chunks));
return algorithm_result::get(std::move(result));
}
};
/// \endcond
Expand Down
5 changes: 2 additions & 3 deletions src/exception_list.cpp
Expand Up @@ -67,7 +67,7 @@ namespace hpx
{}

exception_list::exception_list(exception_list const& l)
: hpx::exception(l.size() ? hpx::get_error(l.exceptions_.front()) : success)
: hpx::exception(static_cast<hpx::exception const&>(l))
, exceptions_(l.exceptions_)
{}

Expand All @@ -80,8 +80,7 @@ namespace hpx
{
if (this != &l)
{
*static_cast<hpx::exception*>(this) = hpx::exception(
l.size() ? hpx::get_error(l.exceptions_.front()) : success);
*static_cast<hpx::exception*>(this) = static_cast<hpx::exception const&>(l);
exceptions_ = l.exceptions_;
}
return *this;
Expand Down