Skip to content

Commit

Permalink
Cleaning up Parcel awaiting once more
Browse files Browse the repository at this point in the history
 - Removed put_parcels from parcelhandler since we don't really need it
 - Moved parcel awaiting back into parcelport_impl for easier handling
   of archive flags
  • Loading branch information
Thomas Heller committed Aug 30, 2016
1 parent 6a7dda0 commit c593d5c
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 348 deletions.
40 changes: 0 additions & 40 deletions hpx/runtime/parcelset/parcelhandler.hpp
Expand Up @@ -207,46 +207,6 @@ namespace hpx { namespace parcelset
&parcelhandler::invoke_write_handler, this, _1, _2));
}

/// A parcel is submitted for transport at the source locality site to
/// the parcel set of the locality with the put-parcel command
//
/// \note The function \a put_parcel() is asynchronous, the provided
/// function or function object gets invoked on completion of the send
/// operation or on any error.
///
/// \param p [in] The parcels to send.
/// \param f [in] The function objects to be invoked on
/// successful completion or on errors. The signature
/// of these function object are expected to be:
///
/// \code
/// void f (boost::system::error_code const& err, std::size_t );
/// \endcode
///
/// where \a err is the status code of the operation and
/// \a size is the number of successfully
/// transferred bytes.
void put_parcels(std::vector<parcel> p, std::vector<write_handler_type> f);

/// This put_parcel() function overload is asynchronous, but no
/// callback functor is provided by the user.
///
/// \note The function \a put_parcel() is asynchronous.
///
/// \param p [in, out] A reference to the parcel to send. The
/// parcel \a p will be modified in place, as it will
/// get set the resolved destination address and parcel
/// id (if not already set).
void put_parcels(std::vector<parcel> parcels)
{
using util::placeholders::_1;
using util::placeholders::_2;
std::vector<write_handler_type> handlers(parcels.size(),
util::bind(&parcelhandler::invoke_write_handler, this, _1, _2));

put_parcels(std::move(parcels), std::move(handlers));
}

double get_current_time() const
{
return util::high_resolution_timer::now();
Expand Down
58 changes: 33 additions & 25 deletions hpx/runtime/parcelset/parcelport_impl.hpp
Expand Up @@ -14,6 +14,7 @@
#include <hpx/error_code.hpp>
#include <hpx/runtime/get_config_entry.hpp>
#include <hpx/runtime/parcelset/detail/call_for_each.hpp>
#include <hpx/runtime/parcelset/detail/parcel_await.hpp>
#include <hpx/runtime/parcelset/encode_parcels.hpp>
#include <hpx/runtime/parcelset/parcelport.hpp>
#include <hpx/runtime/threads/thread.hpp>
Expand Down Expand Up @@ -197,22 +198,40 @@ namespace hpx { namespace parcelset
}

public:
void put_parcel(locality const & dest, parcel p, write_handler_type f)
struct parcel_await_handler
{
HPX_ASSERT(dest.type() == type());
parcelport_impl& this_;
locality dest_;
write_handler_type f_;

if (!connection_handler_traits<ConnectionHandler>::
use_connection_cache::value)
{
send_parcel_immediate(dest, std::move(p), std::move(f));
}
else
void operator()(parcel p)
{
// enqueue the outgoing parcel ...
enqueue_parcel(dest, std::move(p), std::move(f));
if (!connection_handler_traits<ConnectionHandler>::
use_connection_cache::value)
{
this_.send_parcel_immediate(dest_, std::move(p), std::move(f_));
}
else
{
// enqueue the outgoing parcel ...
this_.enqueue_parcel(dest_, std::move(p), std::move(f_));

get_connection_and_send_parcels(dest);
this_.get_connection_and_send_parcels(dest_);
}
}
};

void put_parcel(locality const & dest, parcel p, write_handler_type f)
{
HPX_ASSERT(dest.type() == type());

typedef
detail::parcel_await<parcel_await_handler>
parcel_await;

std::make_shared<parcel_await>(
std::move(p),
parcel_await_handler{*this, dest, std::move(f)})->apply();
}

void put_parcels(locality const& dest, std::vector<parcel> parcels,
Expand All @@ -234,21 +253,10 @@ namespace hpx { namespace parcelset
parcels[i].destination_locality());
}
#endif
if (!connection_handler_traits<ConnectionHandler>::
use_connection_cache::value)
for (std::size_t i = 0; i != parcels.size(); ++i)
{
for (std::size_t i = 0; i != parcels.size(); ++i)
{
send_parcel_immediate(dest, std::move(parcels[i]),
std::move(handlers[i]));
}
}
else
{
// enqueue the outgoing parcels ...
enqueue_parcels(dest, std::move(parcels), std::move(handlers));

get_connection_and_send_parcels(dest);
put_parcel(dest, std::move(parcels[i]),
std::move(handlers[i]));
}
}

Expand Down
141 changes: 54 additions & 87 deletions hpx/runtime/parcelset/put_parcel.hpp
Expand Up @@ -14,13 +14,10 @@
#include <hpx/runtime/naming/split_gid.hpp>
#include <hpx/runtime/parcelset/parcel.hpp>
#include <hpx/runtime/parcelset/parcelhandler.hpp>
#include <hpx/runtime/serialization/detail/preprocess.hpp>
#include <hpx/runtime/serialization/output_archive.hpp>
#include <hpx/traits/is_action.hpp>
#include <hpx/traits/is_continuation.hpp>
#include <hpx/util/bind.hpp>
#include <hpx/util/decay.hpp>
#include <hpx/util/protect.hpp>
#include <hpx/util/detail/pack.hpp>

#include <cstddef>
Expand Down Expand Up @@ -105,118 +102,88 @@ namespace hpx { namespace parcelset {
}
};

template <typename PutParcel>
struct parcel_await
: std::enable_shared_from_this<parcel_await<PutParcel>>
{
template <typename PutParcel_, typename... Args>
parcel_await(PutParcel_&& pp,
naming::address&& addr, Args&&... args)
: put_parcel_(std::forward<PutParcel_>(pp)),
p_(
create_parcel::call(
// is the first parameter of args a continuation?
std::integral_constant<bool,
traits::is_continuation<
typename util::detail::at_index<0, Args...>::type
>::value &&
// we need to treat unique pointers to continuations
// differently
!std::is_same<
std::unique_ptr<actions::continuation>,
typename util::detail::at_index<0, Args...>::type
>::value
>(),
// is the second parameter of args a action?
traits::is_action<
typename util::detail::at_index<1, Args...>::type
>(),
naming::gid_type(), std::move(addr),
std::forward<Args>(args)...
)
),
size_(0)
{
}

void apply(naming::gid_type&& gid)
{
p_.set_destination_id(std::move(gid));
(*this)();
}

void operator()()
{
preprocess_.reset();
typedef hpx::serialization::output_archive archive_type;
std::shared_ptr<archive_type> archive(new archive_type(preprocess_));
(*archive) << p_;

// We are doing a fixed point iteration until we are sure that the
// serialization process requires nothing more to wait on ...
// Things where we need waiting:
// - (shared_)future<id_type>: when the future wasn't ready yet, we
// need to do another await round for the id splitting
// - id_type: we need to await, if and only if, the credit of the
// needs to split.
if(preprocess_.has_futures())
{
auto this_ = this->shared_from_this();
preprocess_([this_, archive](){ (*this_)(); });
return;
}
HPX_ASSERT(preprocess_.size() == archive->bytes_written());
p_.size() = preprocess_.size();
p_.set_splitted_gids(std::move(preprocess_.splitted_gids_));
put_parcel_(std::move(p_));
}

typename hpx::util::decay<PutParcel>::type put_parcel_;
parcel p_;
hpx::serialization::detail::preprocess preprocess_;
std::size_t size_;
};

template <typename PutParcel, typename... Args>
void put_parcel_impl(PutParcel&& pp,
naming::id_type dest, naming::address&& addr, Args&&... args)
{
typedef parcel_await<PutParcel> parcel_awaiter_type;
std::shared_ptr<parcel_awaiter_type> parcel_awaiter(
new parcel_awaiter_type(
std::forward<PutParcel>(pp), std::move(addr),
std::forward<Args>(args)...));
typedef
typename util::detail::at_index<0, Args...>::type
arg0_type;
typedef
typename util::detail::at_index<1, Args...>::type
arg1_type;

// Is the first argument a continuation?
std::integral_constant<bool,
traits::is_continuation<arg0_type>::value &&
// We need to tread unique pointers to continuations
// differently
!std::is_same<
std::unique_ptr<actions::continuation>, arg0_type
>::value
> is_continuation;

// Is the second paramter a action?
traits::is_action<arg1_type> is_action;

if (dest.get_management_type() == naming::id_type::unmanaged)
{
naming::gid_type gid = dest.get_gid();
naming::detail::strip_credits_from_gid(gid);
HPX_ASSERT(gid);

parcel_awaiter->apply(std::move(gid));
pp(detail::create_parcel::call(
is_continuation, is_action,
std::move(gid), std::move(addr),
std::forward<Args>(args)...
));
}
else if (dest.get_management_type() == naming::id_type::managed_move_credit)
{
naming::gid_type gid = naming::detail::move_gid(dest.get_gid());
HPX_ASSERT(gid);
parcel_awaiter->apply(std::move(gid));

pp(detail::create_parcel::call(
is_continuation, is_action,
std::move(gid), std::move(addr),
std::forward<Args>(args)...
));
}
else
{
future<naming::gid_type> splitted_gid =
naming::detail::split_gid_if_needed(dest.get_gid());
if (splitted_gid.is_ready())
{
parcel_awaiter->apply(splitted_gid.get());
pp(detail::create_parcel::call(
is_continuation, is_action,
splitted_gid.get(), std::move(addr),
std::forward<Args>(args)...
));
}
else
{
splitted_gid.then(
[dest, parcel_awaiter]
(hpx::future<naming::gid_type> f)
{
parcel_awaiter->apply(f.get());
}
hpx::util::bind(
hpx::util::one_shot(
[is_continuation, is_action, dest]
(hpx::future<naming::gid_type> f,
typename util::decay<PutParcel>::type&& pp_,
naming::address&& addr_,
typename util::decay<Args>::type&&... args_)
{
pp_(detail::create_parcel::call(
is_continuation, is_action,
f.get(), std::move(addr_),
std::move(args_)...
));
}
),
hpx::util::placeholders::_1,
std::forward<PutParcel>(pp), std::move(addr),
std::forward<Args>(args)...
)
);
}
}
Expand Down
1 change: 1 addition & 0 deletions hpx/runtime/serialization/detail/preprocess.hpp
Expand Up @@ -91,6 +91,7 @@ namespace hpx { namespace serialization { namespace detail

void reset()
{
size_ = 0;
done_ = false;
num_futures_ = 0;
triggered_futures_ = 0;
Expand Down

0 comments on commit c593d5c

Please sign in to comment.