Skip to content

Commit

Permalink
Merge pull request #13147 from bangerth/mpi
Browse files Browse the repository at this point in the history
Add a bit of documentation to MPI::compute_point_to_point_communication_pattern()
  • Loading branch information
drwells committed Jan 6, 2022
2 parents ae00080 + 8940b65 commit 07a2f3d
Showing 1 changed file with 176 additions and 145 deletions.
321 changes: 176 additions & 145 deletions source/base/mpi.cc
Original file line number Diff line number Diff line change
Expand Up @@ -405,136 +405,155 @@ namespace Utilities
}

# if DEAL_II_MPI_VERSION_GTE(3, 0)
// check if destinations contain duplicates
auto destinations_unique = destinations;
std::sort(destinations_unique.begin(), destinations_unique.end());
destinations_unique.erase(std::unique(destinations_unique.begin(),
destinations_unique.end()),
destinations_unique.end());

if (Utilities::MPI::min<unsigned int>(destinations.size() ==
destinations_unique.size(),
mpi_comm))

// Have a little function that checks if destinations provided
// to the current process are unique:
const bool my_destinations_are_unique = [destinations]() {
std::vector<unsigned int> my_destinations = destinations;
const unsigned int n_destinations = my_destinations.size();
std::sort(my_destinations.begin(), my_destinations.end());
my_destinations.erase(std::unique(my_destinations.begin(),
my_destinations.end()),
my_destinations.end());
return (my_destinations.size() == n_destinations);
}();

// If all processes report that they have unique destinations,
// then we can short-cut the process using a consensus algorithm (which
// is implemented only for the case of unique destinations):
if (Utilities::MPI::min((my_destinations_are_unique ? 1 : 0), mpi_comm) ==
1)
{
ConsensusAlgorithms::AnonymousProcess<char, char> process(
[&]() { return destinations; });
ConsensusAlgorithms::NBX<char, char> consensus_algorithm(process,
mpi_comm);
return consensus_algorithm.run();
}
// If that was not the case, we need to use the remainder of the code
// below, i.e., just fall through the if condition above.
# endif
{


// So we need to run a different algorithm, specifically one that
// requires more memory -- MPI_Reduce_scatter_block will require memory
// proportional to the number of processes involved; that function is
// also only available for MPI 2.2 or later:
# if DEAL_II_MPI_VERSION_GTE(2, 2)
static CollectiveMutex mutex;
CollectiveMutex::ScopedLock lock(mutex, mpi_comm);

static CollectiveMutex mutex;
CollectiveMutex::ScopedLock lock(mutex, mpi_comm);
const int mpi_tag =
internal::Tags::compute_point_to_point_communication_pattern;

const int mpi_tag =
internal::Tags::compute_point_to_point_communication_pattern;
// Calculate the number of messages to send to each process
std::vector<unsigned int> dest_vector(n_procs);
for (const auto &el : destinations)
++dest_vector[el];

// Calculate the number of messages to send to each process
std::vector<unsigned int> dest_vector(n_procs);
for (const auto &el : destinations)
++dest_vector[el];
// Find how many processes will send to this one
// by reducing with sum and then scattering the
// results over all processes
unsigned int n_recv_from;
const int ierr = MPI_Reduce_scatter_block(
dest_vector.data(), &n_recv_from, 1, MPI_UNSIGNED, MPI_SUM, mpi_comm);

// Find how many processes will send to this one
// by reducing with sum and then scattering the
// results over all processes
unsigned int n_recv_from;
const int ierr = MPI_Reduce_scatter_block(
dest_vector.data(), &n_recv_from, 1, MPI_UNSIGNED, MPI_SUM, mpi_comm);
AssertThrowMPI(ierr);

AssertThrowMPI(ierr);
// Send myid to every process in `destinations` vector...
std::vector<MPI_Request> send_requests(destinations.size());
for (const auto &el : destinations)
{
const int ierr =
MPI_Isend(&myid,
1,
MPI_UNSIGNED,
el,
mpi_tag,
mpi_comm,
send_requests.data() + (&el - destinations.data()));
AssertThrowMPI(ierr);
}

// Send myid to every process in `destinations` vector...
std::vector<MPI_Request> send_requests(destinations.size());
for (const auto &el : destinations)
{
const int ierr =
MPI_Isend(&myid,
1,
MPI_UNSIGNED,
el,
mpi_tag,
mpi_comm,
send_requests.data() + (&el - destinations.data()));
AssertThrowMPI(ierr);
}

// Receive `n_recv_from` times from the processes
// who communicate with this one. Store the obtained id's
// in the resulting vector
std::vector<unsigned int> origins(n_recv_from);
for (auto &el : origins)
{
const int ierr = MPI_Recv(&el,
1,
MPI_UNSIGNED,
MPI_ANY_SOURCE,
mpi_tag,
mpi_comm,
MPI_STATUS_IGNORE);
AssertThrowMPI(ierr);
}

// Receive `n_recv_from` times from the processes
// who communicate with this one. Store the obtained id's
// in the resulting vector
std::vector<unsigned int> origins(n_recv_from);
for (auto &el : origins)
{
const int ierr = MPI_Recv(&el,
1,
MPI_UNSIGNED,
MPI_ANY_SOURCE,
mpi_tag,
mpi_comm,
MPI_STATUS_IGNORE);
AssertThrowMPI(ierr);
}
if (destinations.size() > 0)
{
const int ierr = MPI_Waitall(destinations.size(),
send_requests.data(),
MPI_STATUSES_IGNORE);
AssertThrowMPI(ierr);
}

if (destinations.size() > 0)
{
const int ierr = MPI_Waitall(destinations.size(),
send_requests.data(),
MPI_STATUSES_IGNORE);
AssertThrowMPI(ierr);
}
return origins;

return origins;
# else
// let all processors communicate the maximal number of destinations
// they have
const unsigned int max_n_destinations =
Utilities::MPI::max(destinations.size(), mpi_comm);

if (max_n_destinations == 0)
// all processes have nothing to send/receive:
return std::vector<unsigned int>();

// now that we know the number of data packets every processor wants to
// send, set up a buffer with the maximal size and copy our destinations
// in there, padded with -1's
std::vector<unsigned int> my_destinations(
max_n_destinations, numbers::invalid_unsigned_int);
std::copy(destinations.begin(),
destinations.end(),
my_destinations.begin());

// now exchange these (we could communicate less data if we used
// MPI_Allgatherv, but we'd have to communicate my_n_destinations to all
// processors in this case, which is more expensive than the reduction
// operation above in MPI_Allreduce)
std::vector<unsigned int> all_destinations(max_n_destinations *
n_procs);
const int ierr = MPI_Allgather(my_destinations.data(),
max_n_destinations,
MPI_UNSIGNED,
all_destinations.data(),
max_n_destinations,
MPI_UNSIGNED,
mpi_comm);
AssertThrowMPI(ierr);

// now we know who is going to communicate with whom. collect who is
// going to communicate with us!
std::vector<unsigned int> origins;
for (unsigned int i = 0; i < n_procs; ++i)
for (unsigned int j = 0; j < max_n_destinations; ++j)
if (all_destinations[i * max_n_destinations + j] == myid)
origins.push_back(i);
else if (all_destinations[i * max_n_destinations + j] ==
numbers::invalid_unsigned_int)
break;

return origins;
// If we don't have MPI_Reduce_scatter_block available, fall back to
// a different algorithm that requires even more memory: the number
// of processes times the max over the number of destinations they
// each have:

// Start by letting all processors communicate the maximal number of
// destinations they have:
const unsigned int max_n_destinations =
Utilities::MPI::max(destinations.size(), mpi_comm);

if (max_n_destinations == 0)
// all processes have nothing to send/receive:
return std::vector<unsigned int>();

// now that we know the number of data packets every processor wants to
// send, set up a buffer with the maximal size and copy our destinations
// in there, padded with -1's
std::vector<unsigned int> my_destinations(max_n_destinations,
numbers::invalid_unsigned_int);
std::copy(destinations.begin(),
destinations.end(),
my_destinations.begin());

// now exchange these (we could communicate less data if we used
// MPI_Allgatherv, but we'd have to communicate my_n_destinations to all
// processors in this case, which is more expensive than the reduction
// operation above in MPI_Allreduce)
std::vector<unsigned int> all_destinations(max_n_destinations * n_procs);
const int ierr = MPI_Allgather(my_destinations.data(),
max_n_destinations,
MPI_UNSIGNED,
all_destinations.data(),
max_n_destinations,
MPI_UNSIGNED,
mpi_comm);
AssertThrowMPI(ierr);

// now we know who is going to communicate with whom. collect who is
// going to communicate with us!
std::vector<unsigned int> origins;
for (unsigned int i = 0; i < n_procs; ++i)
for (unsigned int j = 0; j < max_n_destinations; ++j)
if (all_destinations[i * max_n_destinations + j] == myid)
origins.push_back(i);
else if (all_destinations[i * max_n_destinations + j] ==
numbers::invalid_unsigned_int)
break;

return origins;
# endif
}
}


Expand All @@ -544,22 +563,34 @@ namespace Utilities
const MPI_Comm & mpi_comm,
const std::vector<unsigned int> &destinations)
{
// check if destinations contain duplicates
auto destinations_unique = destinations;
std::sort(destinations_unique.begin(), destinations_unique.end());
destinations_unique.erase(std::unique(destinations_unique.begin(),
destinations_unique.end()),
destinations_unique.end());

if (Utilities::MPI::min<unsigned int>(destinations.size() ==
destinations_unique.size(),
mpi_comm))
// Have a little function that checks if destinations provided
// to the current process are unique:
const bool my_destinations_are_unique = [destinations]() {
std::vector<unsigned int> my_destinations = destinations;
const unsigned int n_destinations = my_destinations.size();
std::sort(my_destinations.begin(), my_destinations.end());
my_destinations.erase(std::unique(my_destinations.begin(),
my_destinations.end()),
my_destinations.end());
return (my_destinations.size() == n_destinations);
}();

// If all processes report that they have unique destinations,
// then we can short-cut the process using a consensus algorithm (which
// is implemented only for the case of unique destinations, and also only
// for MPI 3 and later):
# if DEAL_II_MPI_VERSION_GTE(3, 0)
if (Utilities::MPI::min((my_destinations_are_unique ? 1 : 0), mpi_comm) ==
1)
{
return compute_point_to_point_communication_pattern(mpi_comm,
destinations)
.size();
ConsensusAlgorithms::AnonymousProcess<char, char> process(
[&]() { return destinations; });
ConsensusAlgorithms::NBX<char, char> consensus_algorithm(process,
mpi_comm);
return consensus_algorithm.run().size();
}
else
# endif
{
const unsigned int n_procs =
Utilities::MPI::n_mpi_processes(mpi_comm);
Expand Down Expand Up @@ -594,31 +625,31 @@ namespace Utilities

return n_recv_from;
# else
// Find out how many processes will send to this one
// by reducing with sum and then scattering the
// results over all processes
std::vector<unsigned int> buffer(dest_vector.size());
unsigned int n_recv_from = 0;

int ierr = MPI_Reduce(dest_vector.data(),
buffer.data(),
dest_vector.size(),
MPI_UNSIGNED,
MPI_SUM,
0,
mpi_comm);
AssertThrowMPI(ierr);
ierr = MPI_Scatter(buffer.data(),
1,
MPI_UNSIGNED,
&n_recv_from,
1,
MPI_UNSIGNED,
0,
mpi_comm);
AssertThrowMPI(ierr);
// Find out how many processes will send to this one
// by reducing with sum and then scattering the
// results over all processes
std::vector<unsigned int> buffer(dest_vector.size());
unsigned int n_recv_from = 0;

int ierr = MPI_Reduce(dest_vector.data(),
buffer.data(),
dest_vector.size(),
MPI_UNSIGNED,
MPI_SUM,
0,
mpi_comm);
AssertThrowMPI(ierr);
ierr = MPI_Scatter(buffer.data(),
1,
MPI_UNSIGNED,
&n_recv_from,
1,
MPI_UNSIGNED,
0,
mpi_comm);
AssertThrowMPI(ierr);

return n_recv_from;
return n_recv_from;
# endif
}
}
Expand Down

0 comments on commit 07a2f3d

Please sign in to comment.