
Different async mechanism: MPI message buffer #111

Merged: csegarragonz merged 8 commits into master from poller on Jun 16, 2021
Conversation

@csegarragonz csegarragonz (Collaborator) commented Jun 10, 2021

Problem

The current thread pool is shared among all ranks. Thus, any thread in the pool may perform send/recv on behalf of any other rank on the host. This has several problems:

  1. It increases the number of thread-local endpoint arrays.
  2. It throws EADDRINUSE errors when the main thread and a pool-worker thread try to recv from the same rank (into the same rank as well).
  3. In general, it breaks the point-to-point thread-local semantics desirable for MPI.
  4. Lastly, send/recv are very lightweight methods; one could argue that running them in separate threads is overkill.

In line 327 of the linked diff I introduce a test that breaks the current semantics.

Solution

Remove the thread pool. Keep track of the async messages we expect to receive per (sendRank, recvRank) pair, and perform batch reads in blocking calls.

An initial idea was to rely on ZeroMQ's asynchronous API, in particular on the poll call, which blocks until at least one message has been received (but does not actually call recv), i.e. it polls the underlying receive buffers.
After having thought it through, I think we don't need polling (which is used to multiplex input sockets), but just need to recv enough times according to our protocol.

We assume that the MPI application expects to recv messages in the order it issues recv/irecvs. That is, if the application issues irecv, irecv, and recv, then the third message entering our network buffer will correspond to the recv, and not to either of the irecvs, even in this case:

/* SENDER */
send(msg1);
send(msg2);
send(msg3);

/* RECEIVER */
int id1 = irecv();
int id2 = irecv();

// third message in buffer
msg_t msg3 = recv();

// second message in buffer
msg_t msg2 = await(id2);
// first message in buffer
msg_t msg1 = await(id1);

This is the case for OpenMPI when no tags are used (tested); note that we don't support tags.

Detail on the implementation

We introduce a thread-local unacknowledged message buffer (UMB). There is one per rank-to-rank channel local to the host; each host has localRanks * worldSize such channels.

It stores messages that have been received, but not claimed, or claimed but not received.

The struct prototype is:

struct UMB {
    // List of claims (ids returned by irecv, in issue order)
    std::list<int> ids;
    // List of messages received but not yet claimed
    std::list<msg_t> msgs;

    // How many claims have no matching message yet
    int pendingIds() {
        return std::max<int>(0, (int)ids.size() - (int)msgs.size());
    }
};
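For illustration, a hedged sketch of how a (sendRank, recvRank) pair could map to a flat index into the per-host array of channels; the helper name and the row-major formula are assumptions, the diffs below only show the size * size bound check:

// Hypothetical row-major mapping from a rank pair to a flat channel index
int getIndexForRanks(int sendRank, int recvRank)
{
    int index = sendRank * worldSize + recvRank;
    assert(index >= 0 && index < worldSize * worldSize);
    return index;
}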

There are three methods that modify the UMB: irecv -> id, recv -> msg, and await(id) -> msg. All are executed by the same thread, so we need no locks. I include the pseudo-code for each:

  1. irecv: receive a message asynchronously.
int irecv()
{
    int id = generateId();
    UMB.ids.push_back(id);
    return id;
}
  2. recv: receive a message synchronously (i.e. block until it arrives)
msg_t recv()
{
    return recvBatchReturnLast(UMB.pendingIds() + 1);
}
  3. await(id): wait until the irecv with this id has finished (i.e. its msg has arrived)
msg_t await(id)
{
    // An await always comes after the matching irecv, so the id must be there
    auto idIt = std::find(UMB.ids.begin(), UMB.ids.end(), id);
    assert(idIt != UMB.ids.end());
    int index = std::distance(UMB.ids.begin(), idIt);

    // Awaits may happen in a different order than `irecv`s; that's why we
    // can't use queues in the UMB in the first place, as here we may need to
    // pop an element from the middle of the list
    msg_t toReturn;
    if (index < (int)UMB.msgs.size()) {
        // The message has already arrived in a previous batch read
        auto msgIt = UMB.msgs.begin();
        std::advance(msgIt, index);
        toReturn = *msgIt;
        UMB.msgs.erase(msgIt);
    } else {
        // Receive enough messages to get to our index
        toReturn = recvBatchReturnLast(index - (int)UMB.msgs.size() + 1);
    }

    // Remove the claim from the list of ids
    UMB.ids.erase(idIt);
    return toReturn;
}

Lastly, here's the auxiliary method that receives a given number of messages from our inbound channel (the in-memory queue for local messages, or the ZeroMQ socket for remote ones) and returns the last one:

msg_t recvBatchReturnLast(int numMsgToRecv)
{
    // First we receive all messages for which there is an id but no msg,
    // i.e. `irecv`s that happened before our `recv`/`await`
    for (int i = 0; i < numMsgToRecv - 1; i++) {
        msg_t msg = UMB.isLocal ? localQueue.dequeue() : socket.recv();
        UMB.msgs.push_back(msg);
    }

    // This is the message we are interested in returning, so we don't add it
    // to the UMB. Note that there may be other messages ready to be
    // acknowledged in the underlying buffer, but eagerly adding them to the
    // UMB could pull in messages that correspond to a standard `recv` rather
    // than an `irecv`.
    msg_t toReturn = UMB.isLocal ? localQueue.dequeue() : socket.recv();

    return toReturn;
}
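To tie the pieces together, here is how the sender/receiver example from the top plays out against the UMB (a worked trace of the pseudo-code above, not part of the diff):

// Receiver side, starting from an empty UMB (ids = [], msgs = [])
int id1 = irecv();       // ids = [id1],      msgs = []
int id2 = irecv();       // ids = [id1, id2], msgs = []

msg_t msg3 = recv();     // pendingIds() == 2, so recvBatchReturnLast(3):
                         // msg1 and msg2 go into the UMB, msg3 is returned
                         // ids = [id1, id2], msgs = [msg1, msg2]

msg_t msg2 = await(id2); // id2 is at index 1 and msgs[1] has already arrived:
                         // return msg2 and erase both entries
                         // ids = [id1], msgs = [msg1]

msg_t msg1 = await(id1); // same for index 0; the UMB is empty again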

Possible pitfalls

  1. Is this slowing our fast path down?
  2. Can a recv get stuck behind an irecv whose message never arrives?
    • No. According to our assumptions, the sends matching those irecvs must have been issued before the send matching the recv, given that the recv is issued after the irecvs.

@csegarragonz csegarragonz self-assigned this Jun 10, 2021
@csegarragonz csegarragonz added the bug, enhancement, help wanted, and mpi labels Jun 10, 2021
@csegarragonz csegarragonz changed the title from Poller to Different async mechanism Jun 10, 2021
@Shillaker (Collaborator)

This looks great, very well thought out. I also like the iteration to avoid 0MQ's async API; the simpler we make it, the better.

Re. slowing down the fast path, it's probably much of a muchness; given that this implementation is much nicer (and actually works), I wouldn't let that stop us. We can do some benchmarking once it's done as well.

functionCallClients;
/* Each MPI rank runs in a separate thread, however they interact with faabric
* as a library. Thus, we use thread_local storage to guarantee that each rank
* sees its own version of these data structures.
Collaborator

I don't quite understand this comment, what do you mean "they interact with faabric as a library"?

Collaborator Author

Really the comment is too verbose and poorly phrased, will rephrase.

assert(index >= 0 && index < size * size);

// Lazily initialise send endpoints
if (mpiMessageEndpoints[index] == nullptr) {
Collaborator

Will these definitely be nullptr when uninitialised? If the condition for being uninitialised is that they are nullptr, I would usually explicitly set that in the constructor; however, it might be overkill and I'm not sure what the spec says.

Collaborator Author

(Currently going through the comments in #105, but this applies as well)

And yes, this will definitely be nullptr when uninitialised, as we explicitly emplace_back a nullptr at line 45.
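A minimal sketch of the lazy-initialisation pattern under discussion, assuming mpiMessageEndpoints is pre-filled with nullptr entries; the endpoint type, the sending method names, and the send call are assumptions, while initRemoteMpiEndpoint appears in the diff further down:

// The endpoint vector is pre-filled with size * size null shared pointers,
// so a null check is a valid "not yet initialised" test
std::vector<std::shared_ptr<MpiMessageEndpoint>> mpiMessageEndpoints;

void MpiWorld::sendRemoteMpiMessage(int sendRank,
                                    int recvRank,
                                    const std::shared_ptr<faabric::MPIMessage>& msg)
{
    int index = getIndexForRanks(sendRank, recvRank);
    assert(index >= 0 && index < size * size);

    // Lazily initialise the send endpoint the first time the channel is used
    if (mpiMessageEndpoints[index] == nullptr) {
        initRemoteMpiEndpoint(sendRank, recvRank);
    }

    mpiMessageEndpoints[index]->sendMpiMessage(msg);
}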

for (int i = 0; i < size * size; i++) {
unackedMessageBuffers.emplace_back(nullptr);
}
}
Collaborator

Can we do unackedMessageBuffers = std::vector(size * size, nullptr) rather than a loop here?

Collaborator Author

In this case yes, as we are using shared pointers. However, I will use resize as I think it fits better, i.e. unackedMessageBuffers.resize(size * size, nullptr).
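For reference, a minimal sketch of the alternatives discussed, assuming unackedMessageBuffers is a std::vector of shared pointers to MpiMessageBuffer (the element type is an assumption based on the header mentioned below):

// Original: push size * size null entries one at a time
for (int i = 0; i < size * size; i++) {
    unackedMessageBuffers.emplace_back(nullptr);
}

// Reviewer's suggestion, spelt out with an explicit element type
unackedMessageBuffers =
  std::vector<std::shared_ptr<MpiMessageBuffer>>(size * size, nullptr);

// Chosen alternative: resize fills the new slots with nullptr
unackedMessageBuffers.resize(size * size, nullptr);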

faabric_datatype_t* dataType;
int count;
faabric::MPIMessage::MPIMessageType messageType;
};
Collaborator

If we put the shared pointer to the message into this struct, could we get rid of one of the std::lists in this class?

Collaborator Author

Yes; in fact, after some deep refactoring I realized we could just as well use a single list (which makes iterator management much easier).
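A hypothetical sketch of that single-list shape; the field names are lifted from the snippet above, but the struct and list names are assumptions, not the final code:

// Each entry carries both the claim (request id) and the message pointer,
// which stays null until the message has actually been acknowledged
struct PendingAsyncMpiMessage {
    int requestId = -1;
    std::shared_ptr<faabric::MPIMessage> msg = nullptr;
    faabric_datatype_t* dataType = nullptr;
    int count = -1;
    faabric::MPIMessage::MPIMessageType messageType;

    bool isAcknowledged() { return msg != nullptr; }
};

// A single list per (sendRank, recvRank) channel replaces the two UMB lists
std::list<PendingAsyncMpiMessage> pendingMsgs;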

@Shillaker (Collaborator)

Ah sorry I got confused and reviewed this when you had actually requested a review on the other one 🤦

@@ -366,4 +362,187 @@ TEST_CASE_METHOD(RemoteCollectiveTestFixture,
senderThread.join();
localWorld.destroy();
}

TEST_CASE_METHOD(RemoteMpiTestFixture,
"Test sending sync and async message to same host",
Collaborator Author

This is the test that exposed the flaws in the thread pool (and would make the current master fail).

@csegarragonz csegarragonz changed the title from Different async mechanism to Different async mechanism: MPI message buffer Jun 15, 2021
@Shillaker Shillaker (Collaborator) left a comment

Nice, this is looking good; just a few style/structure changes.

SPDLOG_TRACE("MPI - pending recv {} -> {}", sendRank, recvRank);
auto _m = getLocalQueue(sendRank, recvRank)->dequeue();

assert(_m != nullptr);
Collaborator

Sorry, I should have been clearer. I think it would be more useful to put the check in the enqueue method, as that will catch the issue at the source of the problem (i.e. when it's enqueued) rather than here.
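A minimal sketch of that suggestion; the wrapper name enqueueMessage is hypothetical, while getLocalQueue is taken from the diff above:

// Check at the source: a null message should never be enqueued, so asserting
// here flags the bug where it is introduced rather than at every dequeue site
void MpiWorld::enqueueMessage(int sendRank,
                              int recvRank,
                              std::shared_ptr<faabric::MPIMessage> msg)
{
    assert(msg != nullptr);
    getLocalQueue(sendRank, recvRank)->enqueue(std::move(msg));
}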

}
}
unackedMessageBuffers.clear();
}
Collaborator

It would be good to add a couple of small tests to make sure these destroy checks work, e.g. call isend a few times, then call destroy and REQUIRE_THROWS.

Collaborator Author

Thanks for pointing this out; I had actually missed checking whether iSendRequest was empty at destruction time.
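A hedged sketch of the suggested test; the fixture name, the world member, the isend signature, and the BYTES macro are assumptions about the surrounding test code:

TEST_CASE_METHOD(MpiTestFixture,
                 "Test destroying world with outstanding async requests throws",
                 "[mpi]")
{
    std::vector<int> data = { 0, 1, 2 };

    // Issue a few asynchronous sends that are never awaited
    world.isend(0, 1, BYTES(data.data()), MPI_INT, data.size());
    world.isend(0, 1, BYTES(data.data()), MPI_INT, data.size());

    // Destroying the world with unacknowledged requests should now throw
    REQUIRE_THROWS(world.destroy());
}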

@csegarragonz csegarragonz removed the help wanted label Jun 15, 2021
@@ -212,15 +205,36 @@ class MpiWorld
void initLocalQueues();

// Rank-to-rank sockets for remote messaging
void initRemoteMpiEndpoint(int sendRank, int recvRank);
int getMpiPort(int sendRank, int recvRank);
std::vector<int> basePorts;
Collaborator Author

I have changed these to accommodate the new port offset per world.

@csegarragonz csegarragonz force-pushed the poller branch 3 times, most recently from a3287fd to bee0ef8, on June 16, 2021 at 11:45
@Shillaker Shillaker self-requested a review June 16, 2021 11:49
@@ -261,16 +262,51 @@ std::string MpiWorld::getHostForRank(int rank)
return host;
}

// Returns a pair (sendPort, recvPort)
Collaborator Author

Also worth checking this, which has changed since the last review.

Collaborator Author

(basically this commit)
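For illustration only, one way a per-world port offset could feed into the (sendPort, recvPort) pair; the names basePort and worldOffset and the arithmetic here are assumptions, not necessarily the scheme in the commit:

// Hypothetical sketch: each rank-to-rank channel gets its own pair of ports,
// shifted by a per-world offset so that concurrent worlds don't collide
std::pair<int, int> getPortsForRanks(int sendRank, int recvRank)
{
    int index = getIndexForRanks(sendRank, recvRank);
    int sendPort = basePort + worldOffset + 2 * index;
    int recvPort = sendPort + 1;
    return { sendPort, recvPort };
}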

@Shillaker Shillaker (Collaborator) left a comment

Nice, LGTM.

@csegarragonz csegarragonz merged commit b9da3c0 into master Jun 16, 2021
@csegarragonz csegarragonz deleted the poller branch June 16, 2021 13:07