Skip to content

Commit

Permalink
Merge #6249
Browse files Browse the repository at this point in the history
6249: Implement the send immediate optimization for the MPI parcelport. r=hkaiser a=JiakunYan

Users can use the option `hpx.parcel.mpi.sendimm` to enable or disable it.

When HPX wants to send a parcel, the default behavior is to enqueue this parcel into the parcel queue of the target locality, dequeue all the parcels from that parcel queue, aggregate them, and pass them in one `parcel_buffer` structure. It will also try to get an existing connection from the connection cache. These two data structures are protected by locks and can be a source of thread contention.

With the send immediate optimization, the parcelport will bypass the connection cache and parcel queues and directly send the parcel.

This option is mainly for research purposes. The default is off since it will slow down the MPI parcelport in most cases.

Co-authored-by: Jiakun Yan <jiakunyan1998@gmail.com>
  • Loading branch information
StellarBot and JiakunYan committed May 27, 2023
2 parents d854c09 + 203a832 commit f1f7078
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ namespace hpx::traits {
"${HPX_HAVE_PARCELPORT_LCI_MAX_CONNECTIONS:8192}\n"
"log_level = none\n"
"log_outfile = stderr\n"
"sendimm = 1\n"
"backlog_queue = 0\n"
"use_two_device = 0\n"
"prg_thread_core = -1\n"
Expand Down
21 changes: 20 additions & 1 deletion libs/full/parcelport_mpi/include/hpx/parcelport_mpi/sender.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

namespace hpx::parcelset::policies::mpi {

Expand Down Expand Up @@ -75,7 +76,9 @@ namespace hpx::parcelset::policies::mpi {
postprocess_handler;
std::swap(
postprocess_handler, connection->postprocess_handler_);
postprocess_handler(ec, connection->destination(), connection);
if (postprocess_handler)
postprocess_handler(
ec, connection->destination(), connection);
}
else
{
Expand Down Expand Up @@ -106,6 +109,22 @@ namespace hpx::parcelset::policies::mpi {
return has_work;
}

        // Types used by the send-immediate path: a parcel is serialized into
        // a plain byte buffer plus out-of-band serialization chunks.
        using buffer_type = std::vector<char>;
        using chunk_type = serialization::serialization_chunk;
        using parcel_buffer_type = parcel_buffer<buffer_type, chunk_type>;
        using callback_fn_type =
            hpx::move_only_function<void(error_code const&)>;
        // Send a parcel buffer directly to its destination, bypassing the
        // connection cache and the per-locality parcel queues (the
        // hpx.parcel.mpi.sendimm optimization). The buffer is consumed;
        // callbackFn is invoked once the write completes. Always returns
        // true: the send is unconditionally initiated.
        bool send_immediate(parcelset::parcelport* pp,
            parcelset::locality const& dest, parcel_buffer_type buffer,
            callback_fn_type&& callbackFn)
        {
            // Resolve the MPI rank of the destination locality.
            int dest_rank = dest.get<locality>().rank();
            // A fresh connection per send - no connection-cache lookup.
            auto connection = create_connection(dest_rank, pp);
            connection->buffer_ = HPX_MOVE(buffer);
            // nullptr: no postprocess handler; async_write checks the
            // handler before invoking it, so this is safe.
            connection->async_write(HPX_MOVE(callbackFn), nullptr);
            return true;
        }

private:
tag_provider tag_provider_;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,14 @@ namespace hpx::parcelset::policies::mpi {
{
}

template <typename Handler, typename ParcelPostprocess>
using handler_type = hpx::move_only_function<void(error_code const&)>;
using post_handler_type = hpx::move_only_function<void(
error_code const&, parcelset::locality const&,
std::shared_ptr<sender_connection>)>;
void async_write(
Handler&& handler, ParcelPostprocess&& parcel_postprocess)
handler_type&& handler, post_handler_type&& parcel_postprocess)
{
HPX_ASSERT(!handler_);
HPX_ASSERT(!postprocess_handler_);
HPX_ASSERT(!buffer_.data_.empty());

#if defined(HPX_HAVE_PARCELPORT_COUNTERS)
Expand All @@ -118,7 +120,8 @@ namespace hpx::parcelset::policies::mpi {
{
HPX_ASSERT(!handler_);
error_code ec;
parcel_postprocess(ec, there_, shared_from_this());
if (parcel_postprocess)
parcel_postprocess(ec, there_, shared_from_this());
}
}

Expand Down Expand Up @@ -307,12 +310,7 @@ namespace hpx::parcelset::policies::mpi {
int tag_;
int dst_;

using handler_type = hpx::move_only_function<void(error_code const&)>;
handler_type handler_;

using post_handler_type = hpx::move_only_function<void(
error_code const&, parcelset::locality const&,
std::shared_ptr<sender_connection>)>;
post_handler_type postprocess_handler_;

header header_;
Expand Down
36 changes: 33 additions & 3 deletions libs/full/parcelport_mpi/src/parcelport_mpi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ namespace hpx::parcelset {
using connection_type = policies::mpi::sender_connection;
using send_early_parcel = std::true_type;
using do_background_work = std::true_type;
using send_immediate_parcels = std::false_type;
using is_connectionless = std::false_type;
using send_immediate_parcels = std::true_type;
using is_connectionless = std::true_type;

static constexpr const char* type() noexcept
{
Expand Down Expand Up @@ -127,14 +127,27 @@ namespace hpx::parcelset {
return false;
}

static bool enable_send_immediate(
util::runtime_configuration const& ini)
{
if (hpx::util::get_entry_as<std::size_t>(
ini, "hpx.parcel.mpi.sendimm", 0) != 0)
{
return true;
}
return false;
}

public:
using sender_type = sender;
            // Construct the MPI parcelport. Reads the background-thread
            // count, multi-threaded-MPI flag, and send-immediate flag from
            // the runtime configuration; the receiver is bound to this
            // parcelport instance.
            parcelport(util::runtime_configuration const& ini,
                threads::policies::callback_notifier const& notifier)
              : base_type(ini, here(), notifier)
              , stopped_(false)
              , receiver_(*this)
              , background_threads_(background_threads(ini))
              , multi_threaded_mpi_(multi_threaded_mpi(ini))
              , enable_send_immediate_(enable_send_immediate(ini))
            {
            }

Expand Down Expand Up @@ -232,6 +245,21 @@ namespace hpx::parcelset {
return has_work;
}

            // Whether the send-immediate optimization is enabled
            // (hpx.parcel.mpi.sendimm); queried by the parcel layer before
            // it bypasses the parcel queues and connection cache.
            bool can_send_immediate()
            {
                return enable_send_immediate_;
            }

bool send_immediate(parcelset::parcelport* pp,
parcelset::locality const& dest,
sender::parcel_buffer_type buffer,
sender::callback_fn_type&& callbackFn)
{
return sender_.send_immediate(pp, dest,
HPX_FORWARD(sender_base::parcel_buffer_type, buffer),
HPX_FORWARD(sender_base::callback_fn_type, callbackFn));
}

template <typename F>
bool reschedule_on_thread(F&& f,
threads::thread_schedule_state state, char const* funcname)
Expand Down Expand Up @@ -298,6 +326,7 @@ namespace hpx::parcelset {

std::size_t background_threads_;
bool multi_threaded_mpi_;
bool enable_send_immediate_;
};
} // namespace policies::mpi
} // namespace hpx::parcelset
Expand Down Expand Up @@ -359,7 +388,8 @@ struct hpx::traits::plugin_config_data<

// number of cores that do background work, default: all
"background_threads = "
"${HPX_HAVE_PARCELPORT_MPI_BACKGROUND_THREADS:-1}\n";
"${HPX_HAVE_PARCELPORT_MPI_BACKGROUND_THREADS:-1}\n"
"sendimm = 0\n";
}
}; // namespace hpx::traits

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,6 @@ namespace hpx::plugins {
fillini.emplace_back("priority = ${HPX_PARCEL_" + name_uc +
"_PRIORITY:" +
traits::plugin_config_data<Parcelport>::priority() + "}");
fillini.emplace_back(
"sendimm = ${HPX_PARCEL_" + name_uc + "_SENDIMM:1}");

// get the parcelport specific information ...
char const* more = traits::plugin_config_data<Parcelport>::call();
Expand Down

0 comments on commit f1f7078

Please sign in to comment.