
Locality aware gather #198

Merged
csegarragonz merged 6 commits into master from locality-aware-gather on Dec 16, 2021

Conversation

csegarragonz (Collaborator) commented Dec 13, 2021

In this PR I change the implementation of the gather collective communication algorithm to minimise cross-host messaging, as we previously did with broadcast and reduce.

Gather is slightly trickier than broadcast and reduce because every rank contributes a different piece of data, and the order of these pieces must be preserved. Each remote leader therefore sends a single message with the to-be-gathered data from its local ranks coalesced, and the receiver must re-map each piece to the correct position in the receive buffer.

To implement gather, I found it very useful to have a dict where the key is the host and the value a vector of all the ranks assigned to that host. As there was already an overload of methods and variables tracking rank-to-host information, I refactor the three collective communication algorithms to use the same dictionary. Further, I set the remote leader for each host to be the first element in said vector.

This means I can get rid of the localRanks and remoteLeaders vectors. I keep the local leader variable, but it could also be removed. Lastly, it felt natural to name this dict ranksForHost; however, that is very close to the rankHosts vector we use to broadcast the mappings upon world creation. Given that the latter actually stores the host for a given rank, I rename it to hostForRank.

Note that it is preferable to keep the hostForRank vector (even though we mostly use the dict) as it is easier to broadcast and receive.
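
For illustration, a minimal sketch of the two data structures described above; the variable names follow the PR description, and the exact faabric code may differ:

// Sketch only, not the exact faabric implementation
#include <map>
#include <string>
#include <vector>

std::map<std::string, std::vector<int>> buildRanksForHost(
  const std::vector<std::string>& hostForRank)
{
    // host -> all ranks scheduled on that host, in ascending rank order
    std::map<std::string, std::vector<int>> ranksForHost;
    for (int rank = 0; rank < (int)hostForRank.size(); rank++) {
        ranksForHost[hostForRank.at(rank)].push_back(rank);
    }
    return ranksForHost;
}

// Example: hostForRank = {"A", "A", "B", "A", "B"} gives
// ranksForHost["A"] == {0, 1, 3} and ranksForHost["B"] == {2, 4};
// the leader for each host is ranksForHost[host].front()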

@csegarragonz csegarragonz self-assigned this Dec 13, 2021
@csegarragonz csegarragonz added the mpi label (Related to the MPI implementation) Dec 13, 2021
SPDLOG_TRACE("MPI - reduce ({}) all -> {}", operation->id, recvRank);

// Work out the list of all the ranks we need to wait for
Collaborator Author:

We no longer need this after the refactor.

ranksForHost.push_back(i);
}
for (int rank = 0; rank < hostForRank.size(); rank++) {
std::string host = hostForRank.at(rank);
Collaborator Author:

Wrote this in two lines for the sake of clarity; reading hostForRank and ranksForHost on the same line can be a bit of a tongue-twister. Happy to merge them into one though.
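
For reference, merging them into one line would presumably look something like this (sketch only, assuming the following line pushes the rank into the dict):

for (int rank = 0; rank < hostForRank.size(); rank++) {
    ranksForHost[hostForRank.at(rank)].push_back(rank);
}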

for (auto it : ranksForHost) {
// Persist the local leader in this host for further use
if (it.first == thisHost) {
localLeader = *std::min_element(it.second.begin(), it.second.end());
Collaborator Author:

As mentioned in the PR description, I keep the localLeader variable to avoid having to write ranksForHost[thisHost].front(), but I could do so.

nullptr,
faabric::MPIMessage::GATHER);

// Copy each received chunk to its offset
for (int r = 0; r < it.second.size(); r++) {
Collaborator Author:

This is necessary as we cannot assume that ranks on the same host are always consecutive numbers.

The way MPI works at the moment, this only happens if we are overloading the master host after running out of resources.

Collaborator:

We construct this vector at world creation time, don't we? Can we sort it at that point to simplify this code by knowing that ranks are consecutive?

Collaborator Author (@csegarragonz, Dec 16, 2021):

What I was trying to say here is that if rank X, the local leader of host Y, sends the contribution of all its local ranks including itself [X, X1, X2, X3], we can't assume X, X1, X2, X3 to be consecutive numbers.

As a consequence, the receiver of the gather (which has allocated a buffer with worldSize slots) can't just copy the four coalesced pieces of data starting at X's offset.

However, as the receiver knows the ranks local to X and their order, it copies the chunks one by one to the right offset.
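
A hedged sketch of that re-mapping on the receiving side; all names here are illustrative stand-ins rather than the exact faabric variables:

#include <cstdint>
#include <cstring>
#include <vector>

// Copy the coalesced payload received from host Y's leader into the full
// receive buffer, one chunk per originating rank
void remapChunks(const std::vector<int>& remoteRanks, // e.g. {X, X1, X2, X3}
                 const uint8_t* coalesced,            // payload from the remote leader
                 uint8_t* recvBuffer,                 // worldSize * chunkBytes in size
                 size_t chunkBytes)
{
    for (size_t r = 0; r < remoteRanks.size(); r++) {
        // Chunk r was contributed by remoteRanks[r]; its destination offset is
        // determined by that rank, not by r, since ranks need not be consecutive
        std::memcpy(recvBuffer + remoteRanks[r] * chunkBytes,
                    coalesced + r * chunkBytes,
                    chunkBytes);
    }
}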


for (int r = 0; r < ranksForHost[thisHost].size(); r++) {
if (ranksForHost[thisHost].at(r) == sendRank) {
// If gathering in-place from a rank that is not-receiving
Collaborator Author:

Gather has an added factor of grimness because the offset at which the to-be-sent data lives in the send buffer depends on the situation (see the sketch after this list):

  • If doing an MPI_Allgather with MPI_IN_PLACE, the send buffer already contains space for the gather result, and the send data is at the offset corresponding to the sending rank.
  • In any other situation it is directly at offset zero.
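
A minimal sketch of that offset logic, assuming illustrative variable names rather than the exact ones in MpiWorld::gather:

bool isInPlace = sendBuffer == recvBuffer;

// With MPI_IN_PLACE the send data sits at this rank's slot in the
// recv-sized buffer; otherwise it starts at the beginning of sendBuffer
size_t sendOffset = isInPlace ? sendRank * sendCount * elemSize : 0;
const uint8_t* sendData = sendBuffer + sendOffset;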

sendOffset);
}

continue;
Collaborator:

This structure is equivalent to a single if-else but hard to read with the nested ifs:

for(...) {
    if(a) {
        if(b) {
            c();
        }
        continue;
    }
    
    d();
}

Is equivalent to (what I think is more readable):

for(...) {
    if(a && b) {
        c();
    } else if(!a) {
        d();
    }
}

@@ -626,11 +626,26 @@ TEST_CASE_METHOD(RemoteCollectiveTestFixture,
std::vector<int> actual(thisWorldSize * nPerRank, -1);

// Call gather for each rank other than the root (out of order)
int root = thisHostRankA;
int root;
Collaborator:

I would have expected more testing for this change. Are we checking how many cross-host messages are sent in the remote case (i.e. that the change is actually doing what we need it to)?
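
A hypothetical shape for such a check; getCrossHostMessageCount and nRemoteHosts are stand-ins for whatever message-counting hook the tests expose, not existing faabric helpers:

TEST_CASE_METHOD(RemoteCollectiveTestFixture,
                 "Test locality-aware gather sends one message per remote host",
                 "[mpi]")
{
    int before = getCrossHostMessageCount(); // hypothetical counter

    // ... perform the gather across two hosts as in the existing tests ...

    // With locality-aware gather, each remote host should contribute exactly
    // one coalesced cross-host message to the root's host
    REQUIRE(getCrossHostMessageCount() - before == nRemoteHosts);
}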

@@ -912,53 +918,138 @@ void MpiWorld::gather(int sendRank,

bool isInPlace = sendBuffer == recvBuffer;

// If we're the root, do the gathering
Collaborator (@Shillaker, Dec 14, 2021):

The code is now lacking comments like this that say what it's doing at a high level. At the start of the function you could put a comment summarising the four (?) different scenarios, then make it clearer by factoring out some of the booleans to make the big conditional easier to parse:

// This does a gather via local leader ranks. There are four scenarios:
// - this rank is the root so ...
// - this rank is the local leader but the root is on this host so ...
// - this rank is the local leader so ...
// - this rank is neither the local leader, nor the root, so ...

bool isGatherRoot = sendRank == recvRank;
bool isLocalLeader = sendRank == localLeader;
bool rootOnThisHost = getHostForRank(recvRank) == thisHost;

if(isGatherRoot) {
    // This is the root rank, do xyz
    ...
} else if(isLocalLeader && rootOnThisHost) {
    // This is the local leader and root rank is on this host, do xyz
} else if(isLocalLeader) {
    // This rank is the local leader and root is on another host, do xyz
} else {
    // This rank is not local leader or root, do xyz
}

Collaborator Author:

Yes, I actually think this was a very good idea and it makes the code much more readable (imo).

sendCount * ranksForHost[thisHost].size(),
faabric::MPIMessage::GATHER);

} else if (isLocalLeader && isLocalGather) {
Collaborator Author:

Scenarios 3 and 5 are, after the refactor, identical. However, they are semantically different situations.

I have decided to keep them as separate branches for readability, and so the code can be followed using the comment at the beginning.

@csegarragonz csegarragonz merged commit 1e9edf6 into master Dec 16, 2021
@csegarragonz csegarragonz deleted the locality-aware-gather branch December 16, 2021 14:31