Skip to content

Commit

Permalink
Unify handling of list of localities
Browse files Browse the repository at this point in the history
- rule out case where localities->empty()
  • Loading branch information
hkaiser committed Dec 30, 2017
1 parent 6e04210 commit ecf6b6f
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 48 deletions.
25 changes: 19 additions & 6 deletions hpx/components/containers/container_distribution_policy.hpp
Expand Up @@ -40,8 +40,8 @@ namespace hpx

container_distribution_policy operator()(std::size_t num_partitions) const
{
auto localities = this->base_type::ensure_localities();
return container_distribution_policy(num_partitions, *localities);
return container_distribution_policy(
num_partitions, *ensure_localities());
}

container_distribution_policy operator()(hpx::id_type const& locality) const
Expand Down Expand Up @@ -85,15 +85,29 @@ namespace hpx
///////////////////////////////////////////////////////////////////////
std::size_t get_num_partitions() const
{
auto localities = this->base_type::ensure_localities();
auto localities = ensure_localities();
std::size_t num_parts = (num_partitions_ == std::size_t(-1)) ?
localities->size() : num_partitions_;
return (std::max)(num_parts, std::size_t(1));
}

std::vector<hpx::id_type> const& get_localities() const
std::vector<hpx::id_type> get_localities() const
{
return *this->base_type::ensure_localities();
return *ensure_localities();
}

protected:
std::shared_ptr<std::vector<id_type>> ensure_localities() const
{
// use this locality, if this object was default constructed
auto localities = localities_;
if (!localities)
{
localities = std::make_shared<std::vector<id_type>>();
localities->push_back(hpx::find_here());
}
HPX_ASSERT(!localities->empty());
return localities;
}

private:
Expand All @@ -111,7 +125,6 @@ namespace hpx
num_partitions_(num_partitions)
{}


container_distribution_policy(std::size_t num_partitions,
std::vector<id_type> && localities)
: components::default_distribution_policy(std::move(localities)),
Expand Down
78 changes: 36 additions & 42 deletions hpx/runtime/components/default_distribution_policy.hpp
Expand Up @@ -55,8 +55,7 @@ namespace hpx { namespace components
public:
/// Default-construct a new instance of a \a default_distribution_policy.
/// This policy will represent one locality (the local locality).
default_distribution_policy()
{}
default_distribution_policy() = default;

/// Create a new \a default_distribution policy representing the given
/// set of localities.
Expand All @@ -75,7 +74,7 @@ namespace hpx { namespace components
/// \param locs [in] The list of localities the new instance should
/// represent
default_distribution_policy operator()(
std::vector<id_type> && locs) const
std::vector<id_type>&& locs) const
{
return default_distribution_policy(std::move(locs));
}
Expand Down Expand Up @@ -107,13 +106,15 @@ namespace hpx { namespace components
{
using components::stub_base;

auto localities = ensure_localities();
for (hpx::id_type const& loc: *localities)
if (localities_)
{
if (get_num_items(1, loc) != 0)
for (hpx::id_type const& loc: *localities_)
{
return stub_base<Component>::create_async(
loc, std::forward<Ts>(vs)...);
if (get_num_items(1, loc) != 0)
{
return stub_base<Component>::create_async(
loc, std::forward<Ts>(vs)...);
}
}
}

Expand Down Expand Up @@ -147,19 +148,19 @@ namespace hpx { namespace components
{
using components::stub_base;

auto localities = ensure_localities();
if (localities->size() > 1)
if (localities_ && localities_->size() > 1)
{
// schedule creation of all objects across given localities
std::vector<hpx::future<std::vector<hpx::id_type> > > objs;
objs.reserve(localities->size());
for (hpx::id_type const& loc: *localities)
objs.reserve(localities_->size());
for (hpx::id_type const& loc: *localities_)
{
objs.push_back(stub_base<Component>::bulk_create_async(
loc, get_num_items(count, loc), vs...));
}

// consolidate all results
auto localities = localities_;
return hpx::dataflow(hpx::launch::sync,
[localities](
std::vector<hpx::future<std::vector<hpx::id_type> > > && v
Expand Down Expand Up @@ -274,15 +275,14 @@ namespace hpx { namespace components
///
std::size_t get_num_localities() const
{
return (std::max)(std::size_t(1), localities_->size());
return !localities_ ? std::size_t(1) : localities_->size();
}

/// Returns the locality which is anticipated to be used for the next
/// async operation
hpx::id_type get_next_target() const
{
return (!localities_ || localities_->empty()) ?
hpx::find_here() : localities_->front();
return !localities_ ? hpx::find_here() : localities_->front();
}

protected:
Expand All @@ -292,20 +292,14 @@ namespace hpx { namespace components
{
// make sure the given id is known to this distribution policy
HPX_ASSERT(
!localities_ ||
(localities_->empty() && loc == hpx::find_here()) ||
localities_ &&
std::find(localities_->begin(), localities_->end(), loc) !=
localities_->end()
);

if (!localities_ || localities_->empty())
{
return std::size_t(1);
}

// this distribution policy places an equal number of items onto
// each locality
std::size_t locs = (std::max)(std::size_t(1), localities_->size());
std::size_t locs = localities_->size();

// the overall number of items to create is smaller than the number
// of localities
Expand All @@ -317,44 +311,44 @@ namespace hpx { namespace components
}

// the last locality might get less items
if (localities_->size() > 1 && loc == localities_->back())
if (locs > 1 && loc == localities_->back())
{
return items - detail::round_to_multiple(items, locs, locs-1);
}

// otherwise just distribute evenly
return (items + locs - 1) / locs;
}

std::shared_ptr<std::vector<id_type>> ensure_localities() const
{
// use this locality, if this object was default constructed
auto localities = localities_;
if (!localities || localities->empty())
{
localities = std::make_shared<std::vector<id_type>>();
localities->push_back(hpx::find_here());
}
return localities;
}
/// \endcond

protected:
/// \cond NOINTERNAL
default_distribution_policy(std::vector<id_type> const& localities)
: localities_(std::make_shared<std::vector<id_type>>(localities))
{}
{
if (localities_->empty())
{
HPX_THROW_EXCEPTION(invalid_status,
"default_distribution_policy::default_distribution_policy",
"unexpectedly empty list of localities");
}
}

default_distribution_policy(std::vector<id_type> && localities)
: localities_(std::make_shared<std::vector<id_type>>(std::move(localities)))
{}

default_distribution_policy(id_type const& locality)
: localities_(std::make_shared<std::vector<id_type>>())
{
localities_->push_back(locality);
if (localities_->empty())
{
HPX_THROW_EXCEPTION(invalid_status,
"default_distribution_policy::default_distribution_policy",
"unexpectedly empty list of localities");
}
}

default_distribution_policy(id_type const& locality)
: localities_(std::make_shared<std::vector<id_type>>(1, locality))
{}

friend class hpx::serialization::access;

template <typename Archive>
Expand Down

0 comments on commit ecf6b6f

Please sign in to comment.