Skip to content

Commit

Permalink
RPE::evaluate_and_process(): pack only if needed
Browse files Browse the repository at this point in the history
  • Loading branch information
peterrum committed May 2, 2023
1 parent 11a5abd commit 8993018
Showing 1 changed file with 159 additions and 80 deletions.
239 changes: 159 additions & 80 deletions include/deal.II/base/mpi_remote_point_evaluation.h
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,130 @@ namespace Utilities
std::vector<unsigned int> send_ptrs;
};

namespace internal
{
#ifdef DEAL_II_WITH_MPI
/**
* Pack @p data and send it via MPI_Isend.
*/
template <typename T>
std::enable_if_t<Utilities::MPI::is_mpi_type<T> == false, void>
pack_and_isend(T * data,
const unsigned int size,
const unsigned int rank,
const unsigned int tag,
const MPI_Comm comm,
std::vector<std::vector<char>> &buffers,
std::vector<MPI_Request> & requests)
{
requests.emplace_back(MPI_Request());

buffers.emplace_back(
Utilities::pack(std::vector<T>(data, data + size), false));

const int ierr = MPI_Isend(buffers.back().data(),
buffers.back().size(),
MPI_CHAR,
rank,
tag,
comm,
&requests.back());
AssertThrowMPI(ierr);
}



/**
* Above function specialized for data types supported by MPI
* so that one can skip packing.
*/
template <typename T>
std::enable_if_t<Utilities::MPI::is_mpi_type<T> == true, void>
pack_and_isend(T * data,
const unsigned int size,
const unsigned int rank,
const unsigned int tag,
const MPI_Comm comm,
std::vector<std::vector<char>> &buffers,
std::vector<MPI_Request> & requests)
{
(void)buffers;

requests.emplace_back(MPI_Request());

const int ierr = MPI_Isend(data,
size,
Utilities::MPI::mpi_type_id_for_type<T>,
rank,
tag,
comm,
&requests.back());
AssertThrowMPI(ierr);
}


/**
* Receive message, unpack it, and store the result in @p data.
*/
template <typename T>
std::enable_if_t<Utilities::MPI::is_mpi_type<T> == false, void>
recv_and_upack(std::vector<T> & data,
const unsigned int size,
const MPI_Comm comm,
const MPI_Status & status,
std::vector<char> &buffer)
{
int message_length;
int ierr = MPI_Get_count(&status, MPI_CHAR, &message_length);
AssertThrowMPI(ierr);

buffer.resize(message_length);

ierr = MPI_Recv(buffer.data(),
buffer.size(),
MPI_CHAR,
status.MPI_SOURCE,
internal::Tags::remote_point_evaluation,
comm,
MPI_STATUS_IGNORE);
AssertThrowMPI(ierr);

// unpack data
data = Utilities::unpack<std::vector<T>>(buffer, false);

AssertDimension(data.size(), size);
}



/**
* Above function specialized for data types supported by MPI
* so that one can skip unpacking.
*/
template <typename T>
std::enable_if_t<Utilities::MPI::is_mpi_type<T> == true, void>
recv_and_upack(std::vector<T> & data,
const unsigned int size,
const MPI_Comm comm,
const MPI_Status & status,
std::vector<char> &buffer)
{
(void)buffer;

data.resize(size);

const auto ierr = MPI_Recv(data.data(),
data.size(),
Utilities::MPI::mpi_type_id_for_type<T>,
status.MPI_SOURCE,
internal::Tags::remote_point_evaluation,
comm,
MPI_STATUS_IGNORE);
AssertThrowMPI(ierr);
}
#endif
} // namespace internal


template <int dim, int spacedim>
template <typename T>
Expand Down Expand Up @@ -366,8 +490,8 @@ namespace Utilities
}

// send data
std::vector<std::vector<char>> send_buffer;
send_buffer.reserve(send_ranks.size());
std::vector<std::vector<char>> send_buffers_packed;
send_buffers_packed.reserve(send_ranks.size());

std::vector<MPI_Request> send_requests;
send_requests.reserve(send_ranks.size());
Expand All @@ -377,25 +501,18 @@ namespace Utilities
if (send_ranks[i] == my_rank)
continue;

send_requests.emplace_back(MPI_Request());

send_buffer.emplace_back(Utilities::pack(
std::vector<T>(buffer_comm.begin() + send_ptrs[i],
buffer_comm.begin() + send_ptrs[i + 1]),
false));

const int ierr = MPI_Isend(send_buffer.back().data(),
send_buffer.back().size(),
MPI_CHAR,
send_ranks[i],
internal::Tags::remote_point_evaluation,
tria->get_communicator(),
&send_requests.back());
AssertThrowMPI(ierr);
internal::pack_and_isend(buffer_comm.begin() + send_ptrs[i],
send_ptrs[i + 1] - send_ptrs[i],
send_ranks[i],
internal::Tags::remote_point_evaluation,
tria->get_communicator(),
send_buffers_packed,
send_requests);
}

// receive data
std::vector<char> buffer_char;
std::vector<T> recv_buffer;
std::vector<char> recv_buffer_packed;

for (unsigned int i = 0; i < recv_ranks.size(); ++i)
{
Expand All @@ -410,38 +527,23 @@ namespace Utilities
&status);
AssertThrowMPI(ierr);

int message_length;
ierr = MPI_Get_count(&status, MPI_CHAR, &message_length);
AssertThrowMPI(ierr);

buffer_char.resize(message_length);

ierr = MPI_Recv(buffer_char.data(),
buffer_char.size(),
MPI_CHAR,
status.MPI_SOURCE,
internal::Tags::remote_point_evaluation,
tria->get_communicator(),
MPI_STATUS_IGNORE);
AssertThrowMPI(ierr);

// unpack data
const auto buffer =
Utilities::unpack<std::vector<T>>(buffer_char, false);

// write data into output vector
const auto ptr =
std::find(recv_ranks.begin(), recv_ranks.end(), status.MPI_SOURCE);

Assert(ptr != recv_ranks.end(), ExcNotImplemented());

const unsigned int j = std::distance(recv_ranks.begin(), ptr);

AssertDimension(buffer.size(), recv_ptrs[j + 1] - recv_ptrs[j]);
internal::recv_and_upack(recv_buffer,
recv_ptrs[j + 1] - recv_ptrs[j],
tria->get_communicator(),
status,
recv_buffer_packed);

// write data into output vector
for (unsigned int i = recv_ptrs[j], c = 0; i < recv_ptrs[j + 1];
++i, ++c)
output[recv_permutation[i]] = buffer[c];
output[recv_permutation[i]] = recv_buffer[c];
}

// make sure all messages have been sent
Expand Down Expand Up @@ -534,8 +636,8 @@ namespace Utilities
}

// send data
std::vector<std::vector<char>> send_buffer;
send_buffer.reserve(recv_ranks.size());
std::vector<std::vector<char>> send_buffers_packed;
send_buffers_packed.reserve(recv_ranks.size());

std::vector<MPI_Request> send_requests;
send_requests.reserve(recv_ranks.size());
Expand All @@ -545,25 +647,18 @@ namespace Utilities
if (recv_ranks[i] == my_rank)
continue;

send_requests.push_back(MPI_Request());

send_buffer.emplace_back(Utilities::pack(
std::vector<T>(buffer_comm.begin() + recv_ptrs[i],
buffer_comm.begin() + recv_ptrs[i + 1]),
false));

const int ierr = MPI_Isend(send_buffer.back().data(),
send_buffer.back().size(),
MPI_CHAR,
recv_ranks[i],
internal::Tags::remote_point_evaluation,
tria->get_communicator(),
&send_requests.back());
AssertThrowMPI(ierr);
internal::pack_and_isend(buffer_comm.begin() + recv_ptrs[i],
recv_ptrs[i + 1] - recv_ptrs[i],
recv_ranks[i],
internal::Tags::remote_point_evaluation,
tria->get_communicator(),
send_buffers_packed,
send_requests);
}

// receive data
std::vector<char> recv_buffer;
std::vector<T> recv_buffer;
std::vector<char> recv_buffer_packed;

for (unsigned int i = 0; i < send_ranks.size(); ++i)
{
Expand All @@ -578,25 +673,6 @@ namespace Utilities
&status);
AssertThrowMPI(ierr);

int message_length;
ierr = MPI_Get_count(&status, MPI_CHAR, &message_length);
AssertThrowMPI(ierr);

recv_buffer.resize(message_length);

ierr = MPI_Recv(recv_buffer.data(),
recv_buffer.size(),
MPI_CHAR,
status.MPI_SOURCE,
internal::Tags::remote_point_evaluation,
tria->get_communicator(),
MPI_STATUS_IGNORE);
AssertThrowMPI(ierr);

// unpack data
const auto recv_buffer_unpacked =
Utilities::unpack<std::vector<T>>(recv_buffer, false);

// write data into buffer vector
const auto ptr =
std::find(send_ranks.begin(), send_ranks.end(), status.MPI_SOURCE);
Expand All @@ -605,12 +681,15 @@ namespace Utilities

const unsigned int j = std::distance(send_ranks.begin(), ptr);

AssertDimension(recv_buffer_unpacked.size(),
send_ptrs[j + 1] - send_ptrs[j]);
internal::recv_and_upack(recv_buffer,
send_ptrs[j + 1] - send_ptrs[j],
tria->get_communicator(),
status,
recv_buffer_packed);

for (unsigned int i = send_ptrs[j], c = 0; i < send_ptrs[j + 1];
++i, ++c)
buffer_eval[send_permutation_inv[i]] = recv_buffer_unpacked[c];
buffer_eval[send_permutation_inv[i]] = recv_buffer[c];
}

const int ierr = MPI_Waitall(send_requests.size(),
Expand Down

0 comments on commit 8993018

Please sign in to comment.