Skip to content

Commit

Permalink
Reenabling parcelhandler::put_parcels again
Browse files Browse the repository at this point in the history
  • Loading branch information
Thomas Heller committed Sep 13, 2016
1 parent af7a408 commit d75ce09
Show file tree
Hide file tree
Showing 5 changed files with 236 additions and 28 deletions.
40 changes: 40 additions & 0 deletions hpx/runtime/parcelset/parcelhandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,46 @@ namespace hpx { namespace parcelset
&parcelhandler::invoke_write_handler, this, _1, _2));
}

/// A parcel is submitted for transport at the source locality site to
/// the parcel set of the locality with the put-parcel command
//
/// \note The function \a put_parcel() is asynchronous, the provided
/// function or function object gets invoked on completion of the send
/// operation or on any error.
///
/// \param p [in] The parcels to send.
/// \param f [in] The function objects to be invoked on
/// successful completion or on errors. The signature
/// of these function object are expected to be:
///
/// \code
/// void f (boost::system::error_code const& err, std::size_t );
/// \endcode
///
/// where \a err is the status code of the operation and
/// \a size is the number of successfully
/// transferred bytes.
void put_parcels(std::vector<parcel> p, std::vector<write_handler_type> f);

/// This put_parcel() function overload is asynchronous, but no
/// callback functor is provided by the user.
///
/// \note The function \a put_parcel() is asynchronous.
///
/// \param p [in, out] A reference to the parcel to send. The
/// parcel \a p will be modified in place, as it will
/// get set the resolved destination address and parcel
/// id (if not already set).
void put_parcels(std::vector<parcel> parcels)
{
using util::placeholders::_1;
using util::placeholders::_2;
std::vector<write_handler_type> handlers(parcels.size(),
util::bind(&parcelhandler::invoke_write_handler, this, _1, _2));

put_parcels(std::move(parcels), std::move(handlers));
}

double get_current_time() const
{
return util::high_resolution_timer::now();
Expand Down
147 changes: 145 additions & 2 deletions src/runtime/parcelset/parcelhandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -453,8 +453,7 @@ namespace hpx { namespace parcelset
}
}

dest.first->put_parcel(
std::move(dest.second), std::move(p), std::move(wrapped_f));
dest.first->put_parcel(dest.second, std::move(p), std::move(wrapped_f));
return;
}

Expand All @@ -465,6 +464,150 @@ namespace hpx { namespace parcelset
resolver_->route(std::move(p), std::move(wrapped_f));
}

void parcelhandler::put_parcels(std::vector<parcel> parcels,
std::vector<write_handler_type> handlers)
{
HPX_ASSERT(resolver_);

if (parcels.size() != handlers.size())
{
HPX_THROW_EXCEPTION(bad_parameter,
"parcelhandler::put_parcels",
"mismatched number of parcels and handlers");
return;
}

// if this isn't an HPX thread, the stack space check will return false
if (!this_thread::has_sufficient_stack_space() &&
hpx::threads::threadmanager_is(hpx::state::state_running))
{
// naming::gid_type locality = naming::get_locality_from_gid(
// parcels[0].destination());
// if (!resolver_->has_resolved_locality(locality))
{
// reschedule request as an HPX thread to avoid hangs
void (parcelhandler::*put_parcels_ptr) (
std::vector<parcel>, std::vector<write_handler_type>
) = &parcelhandler::put_parcels;

threads::register_thread_nullary(
util::deferred_call(put_parcels_ptr, this,
std::move(parcels), std::move(handlers)),
"parcelhandler::put_parcels", threads::pending, true,
threads::thread_priority_boost, std::size_t(-1),
threads::thread_stacksize_medium);
return;
}
}

// partition parcels depending on whether their destination can be
// resolved locally
std::size_t num_parcels = parcels.size();

std::vector<parcel> resolved_parcels;
resolved_parcels.reserve(num_parcels);
std::vector<write_handler_type> resolved_handlers;
resolved_handlers.reserve(num_parcels);

typedef std::pair<std::shared_ptr<parcelport>, locality>
destination_pair;

destination_pair resolved_dest;

std::vector<parcel> nonresolved_parcels;
nonresolved_parcels.reserve(num_parcels);
std::vector<write_handler_type> nonresolved_handlers;
nonresolved_handlers.reserve(num_parcels);

for (std::size_t i = 0; i != num_parcels; ++i)
{
parcel& p = parcels[i];

// make sure all parcels go to the same locality
if (parcels[0].destination_locality() !=
p.destination_locality())
{
HPX_THROW_EXCEPTION(bad_parameter,
"parcelhandler::put_parcels",
"mismatched destinations, all parcels are expected to "
"target the same locality");
return;
}

// properly initialize parcel
init_parcel(p);

bool resolved_locally = true;
naming::address& addr = p.addr();

if (!addr)
{
resolved_locally = resolver_->resolve_local(
p.destination(), addr);
}

using util::placeholders::_1;
using util::placeholders::_2;
write_handler_type f = util::bind(&detail::parcel_sent_handler,
std::move(handlers[i]), _1, _2);

// If we were able to resolve the address(es) locally we would send
// the parcel directly to the destination.
if (resolved_locally)
{
// dispatch to the message handler which is associated with the
// encapsulated action
destination_pair dest = find_appropriate_destination(
addr.locality_);

if (load_message_handlers_)
{
policies::message_handler* mh = p.get_message_handler(
this, dest.second);

if (mh) {
mh->put_parcel(dest.second, std::move(p), std::move(f));
continue;
}
}

resolved_parcels.push_back(std::move(p));
resolved_handlers.push_back(std::move(f));
if (!resolved_dest.second)
{
resolved_dest = dest;
}
else
{
HPX_ASSERT(resolved_dest == dest);
}
}
else
{
nonresolved_parcels.push_back(std::move(p));
nonresolved_handlers.push_back(std::move(f));
}
}

// handle parcel which have been locally resolved
if (!resolved_parcels.empty())
{
HPX_ASSERT(!!resolved_dest.first && !!resolved_dest.second);
resolved_dest.first->put_parcels(resolved_dest.second,
std::move(resolved_parcels),
std::move(resolved_handlers));
}

// At least one of the addresses is locally unknown, route the
// parcel to the AGAS managing the destination.
for (std::size_t i = 0; i != nonresolved_parcels.size(); ++i)
{
++count_routed_;
resolver_->route(std::move(nonresolved_parcels[i]),
std::move(nonresolved_handlers[i]));
}
}

std::int64_t parcelhandler::get_outgoing_queue_length(bool reset) const
{
std::int64_t parcel_count = 0;
Expand Down
24 changes: 16 additions & 8 deletions tests/unit/parcelset/put_parcels.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,17 +55,20 @@ void test_plain_argument(hpx::id_type const& id)
results.reserve(numparcels_default);

// create parcels
std::vector<hpx::parcelset::parcel> parcels;
for (std::size_t i = 0; i != numparcels_default; ++i)
{
hpx::lcos::promise<hpx::id_type> p;
auto f = p.get_future();
// send parcel
hpx::get_runtime().get_parcel_handler().put_parcel(
parcels.push_back(
generate_parcel<test1_action>(id, p.get_id(), data)
);
results.push_back(std::move(f));
}

// send parcels
hpx::get_runtime().get_parcel_handler().put_parcels(std::move(parcels));

// verify all messages got actually sent to the correct locality
hpx::wait_all(results);

Expand All @@ -91,14 +94,14 @@ void test_future_argument(hpx::id_type const& id)
results.reserve(numparcels_default);

// create parcels
std::vector<hpx::parcelset::parcel> parcels;
for (std::size_t i = 0; i != numparcels_default; ++i)
{
hpx::lcos::local::promise<double> p_arg;
hpx::lcos::promise<hpx::id_type> p_cont;
auto f_cont = p_cont.get_future();

// send parcel
hpx::get_runtime().get_parcel_handler().put_parcel(
parcels.push_back(
generate_parcel<test2_action>(id, p_cont.get_id(),
p_arg.get_future())
);
Expand All @@ -107,6 +110,9 @@ void test_future_argument(hpx::id_type const& id)
results.push_back(std::move(f_cont));
}

// send parcels
hpx::get_runtime().get_parcel_handler().put_parcels(std::move(parcels));

// now make the futures ready
for (hpx::lcos::local::promise<double>& arg : args)
{
Expand Down Expand Up @@ -134,24 +140,23 @@ void test_mixed_arguments(hpx::id_type const& id)
results.reserve(numparcels_default);

// create parcels
std::vector<hpx::parcelset::parcel> parcels;
for (std::size_t i = 0; i != numparcels_default; ++i)
{
hpx::lcos::promise<hpx::id_type> p_cont;
auto f_cont = p_cont.get_future();

if (std::rand() % 2)
{
// send parcel
hpx::get_runtime().get_parcel_handler().put_parcel(
parcels.push_back(
generate_parcel<test1_action>(id, p_cont.get_id(), data)
);
}
else
{
hpx::lcos::local::promise<double> p_arg;

// send parcel
hpx::get_runtime().get_parcel_handler().put_parcel(
parcels.push_back(
generate_parcel<test2_action>(id, p_cont.get_id(),
p_arg.get_future())
);
Expand All @@ -162,6 +167,9 @@ void test_mixed_arguments(hpx::id_type const& id)
results.push_back(std::move(f_cont));
}

// send parcels
hpx::get_runtime().get_parcel_handler().put_parcels(std::move(parcels));

// now make the futures ready
for (hpx::lcos::local::promise<double>& arg : args)
{
Expand Down
29 changes: 19 additions & 10 deletions tests/unit/parcelset/put_parcels_with_coalescing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,22 @@ void test_plain_argument(hpx::id_type const& id)
hpx::components::client<test_server> c = hpx::new_<test_server>(id);

// create parcels
std::vector<hpx::parcelset::parcel> parcels;
for (std::size_t i = 0; i != numparcels_default; ++i)
{
hpx::lcos::promise<hpx::id_type> p;
auto f = p.get_future();

// send parcel
hpx::get_runtime().get_parcel_handler().put_parcel(
generate_parcel<test1_action>(c.get_id(), p.get_id(), data));
parcels.push_back(
generate_parcel<test1_action>(c.get_id(), p.get_id(), data)
);

results.push_back(std::move(f));
}

// send parcels
hpx::get_runtime().get_parcel_handler().put_parcels(std::move(parcels));

// verify all messages got actually sent to the correct locality
hpx::wait_all(results);

Expand Down Expand Up @@ -116,21 +120,24 @@ void test_future_argument(hpx::id_type const& id)
results.reserve(numparcels_default);

// create parcels
std::vector<hpx::parcelset::parcel> parcels;
for (std::size_t i = 0; i != numparcels_default; ++i)
{
hpx::lcos::local::promise<double> p_arg;
hpx::lcos::promise<hpx::id_type> p_cont;
auto f_cont = p_cont.get_future();

// send parcel
hpx::get_runtime().get_parcel_handler().put_parcel(
parcels.push_back(
generate_parcel<test2_action>(id, p_cont.get_id(),
p_arg.get_future()));
p_arg.get_future())
);

args.push_back(std::move(p_arg));
results.push_back(std::move(f_cont));
}

// send parcels
hpx::get_runtime().get_parcel_handler().put_parcels(std::move(parcels));

// now make the futures ready
for (hpx::lcos::local::promise<double>& arg : args)
Expand Down Expand Up @@ -161,24 +168,23 @@ void test_mixed_arguments(hpx::id_type const& id)
hpx::components::client<test_server> c = hpx::new_<test_server>(id);

// create parcels
std::vector<hpx::parcelset::parcel> parcels;
for (std::size_t i = 0; i != numparcels_default; ++i)
{
hpx::lcos::promise<hpx::id_type> p_cont;
auto f_cont = p_cont.get_future();

if (std::rand() % 2)
{
// send parcel
hpx::get_runtime().get_parcel_handler().put_parcel(
parcels.push_back(
generate_parcel<test1_action>(c.get_id(), p_cont.get_id(), data)
);
}
else
{
hpx::lcos::local::promise<double> p_arg;

// send parcel
hpx::get_runtime().get_parcel_handler().put_parcel(
parcels.push_back(
generate_parcel<test2_action>(id, p_cont.get_id(),
p_arg.get_future())
);
Expand All @@ -189,6 +195,9 @@ void test_mixed_arguments(hpx::id_type const& id)
results.push_back(std::move(f_cont));
}

// send parcels
hpx::get_runtime().get_parcel_handler().put_parcels(std::move(parcels));

// now make the futures ready
for (hpx::lcos::local::promise<double>& arg : args)
{
Expand Down
Loading

0 comments on commit d75ce09

Please sign in to comment.