Different async mechanism: MPI message buffer #111

Merged
8 commits merged on Jun 16, 2021
Changes from 7 commits
82 changes: 82 additions & 0 deletions include/faabric/scheduler/MpiMessageBuffer.h
@@ -0,0 +1,82 @@
#include <faabric/mpi/mpi.h>
#include <faabric/proto/faabric.pb.h>

#include <iterator>
#include <list>

namespace faabric::scheduler {
/* The MPI message buffer (MMB) keeps track of the asynchronous
* messages that we must have received (i.e. through an irecv call) but
* have not yet waited on (acknowledged). Messages are acknowledged either
* through a call to recv or a call to await. A call to recv will
* acknowledge (i.e. synchronously read from transport buffers) as many
* unacknowledged messages as there are. A call to await with a request
* id as a parameter will acknowledge all unacknowledged messages up to
* said request id.
*/
class MpiMessageBuffer
{
public:
/* This structure holds the metadata for each MPI message we keep in the
* buffer. Note that the message field will point to null if the message is
* unacknowledged, or to a valid message otherwise.
*/
class PendingAsyncMpiMessage
{
public:
int requestId = -1;
std::shared_ptr<faabric::MPIMessage> msg = nullptr;
int sendRank = -1;
int recvRank = -1;
uint8_t* buffer = nullptr;
faabric_datatype_t* dataType = nullptr;
int count = -1;
faabric::MPIMessage::MPIMessageType messageType =
faabric::MPIMessage::NORMAL;

bool isAcknowledged() { return msg != nullptr; }

void acknowledge(std::shared_ptr<faabric::MPIMessage> msgIn)
{
msg = msgIn;
}
};

/* Interface to query the buffer size */

bool isEmpty();

int size();

/* Interface to add and delete messages to the buffer */

void addMessage(PendingAsyncMpiMessage msg);

void deleteMessage(
const std::list<PendingAsyncMpiMessage>::iterator& msgIt);

/* Interface to get a pointer to a message in the MMB */

// Pointer to a message given its request id
std::list<PendingAsyncMpiMessage>::iterator getRequestPendingMsg(
int requestId);

// Pointer to the first null-pointing (unacknowledged) message
std::list<PendingAsyncMpiMessage>::iterator getFirstNullMsg();

/* Interface to query the number of unacknowledged messages */

// Unacknowledged messages until an iterator (used in await)
int getTotalUnackedMessagesUntil(
const std::list<PendingAsyncMpiMessage>::iterator& msgIt);

// Unacknowledged messages in the whole buffer (used in recv)
int getTotalUnackedMessages();

private:
std::list<PendingAsyncMpiMessage> pendingMsgs;

std::list<PendingAsyncMpiMessage>::iterator getFirstNullMsgUntil(
const std::list<PendingAsyncMpiMessage>::iterator& msgIt);
};
}
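
To make the recv/await semantics described in the class comment concrete, here is a minimal standalone sketch of the acknowledgement logic. It models pending messages with a plain struct rather than the faabric types, so all names in it are illustrative only:

#include <algorithm>
#include <cassert>
#include <iterator>
#include <list>
#include <memory>

// Simplified stand-in for MpiMessageBuffer::PendingAsyncMpiMessage
struct Pending
{
    int requestId;
    std::shared_ptr<int> msg; // null until acknowledged
};

int main()
{
    std::list<Pending> buffer;

    // Three irecv calls register three unacknowledged messages
    buffer.push_back({ 1, nullptr });
    buffer.push_back({ 2, nullptr });
    buffer.push_back({ 3, nullptr });

    // await(2): acknowledge every unacknowledged message up to request id 2
    auto target = std::find_if(
      buffer.begin(), buffer.end(), [](const Pending& p) { return p.requestId == 2; });
    for (auto it = buffer.begin(); it != std::next(target); ++it) {
        if (it->msg == nullptr) {
            it->msg = std::make_shared<int>(0); // synchronous read from transport
        }
    }

    // Requests 1 and 2 are now acknowledged; request 3 is still pending
    assert(buffer.front().msg != nullptr);
    assert(std::next(buffer.begin())->msg != nullptr);
    assert(buffer.back().msg == nullptr);

    return 0;
}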
40 changes: 0 additions & 40 deletions include/faabric/scheduler/MpiThreadPool.h

This file was deleted.

36 changes: 25 additions & 11 deletions include/faabric/scheduler/MpiWorld.h
@@ -3,15 +3,13 @@
#include <faabric/mpi/mpi.h>

#include <faabric/proto/faabric.pb.h>
#include <faabric/scheduler/FunctionCallClient.h>
#include <faabric/scheduler/InMemoryMessageQueue.h>
#include <faabric/scheduler/MpiThreadPool.h>
#include <faabric/scheduler/MpiMessageBuffer.h>
#include <faabric/transport/MpiMessageEndpoint.h>
#include <faabric/util/logging.h>
#include <faabric/util/timing.h>

#include <atomic>
#include <thread>

namespace faabric::scheduler {
typedef faabric::util::Queue<std::shared_ptr<faabric::MPIMessage>>
@@ -29,7 +27,7 @@ class MpiWorld

std::string getHostForRank(int rank);

void setAllRankHosts(const faabric::MpiHostsToRanksMessage& msg);
void setAllRankHostsPorts(const faabric::MpiHostsToRanksMessage& msg);

std::string getUser();

@@ -41,8 +39,6 @@

void destroy();

void shutdownThreadPool();

void getCartesianRank(int rank,
int maxDims,
const int* dims,
@@ -196,9 +192,6 @@
std::string user;
std::string function;

std::shared_ptr<faabric::scheduler::MpiAsyncThreadPool> threadPool;
int getMpiThreadPoolSize();

std::vector<int> cartProcsPerDim;

/* MPI internal messaging layer */
@@ -212,15 +205,36 @@
void initLocalQueues();

// Rank-to-rank sockets for remote messaging
void initRemoteMpiEndpoint(int sendRank, int recvRank);
int getMpiPort(int sendRank, int recvRank);
std::vector<int> basePorts;
Collaborator Author:
I have changed these to accommodate the new port offset per world.

std::vector<int> initLocalBasePorts(
const std::vector<std::string>& executedAt);
void initRemoteMpiEndpoint(int localRank, int remoteRank);
std::pair<int, int> getPortForRanks(int localRank, int remoteRank);
void sendRemoteMpiMessage(int sendRank,
int recvRank,
const std::shared_ptr<faabric::MPIMessage>& msg);
std::shared_ptr<faabric::MPIMessage> recvRemoteMpiMessage(int sendRank,
int recvRank);
void closeMpiMessageEndpoints();

// Support for asynchronous communications
std::shared_ptr<MpiMessageBuffer> getUnackedMessageBuffer(int sendRank,
int recvRank);
std::shared_ptr<faabric::MPIMessage> recvBatchReturnLast(int sendRank,
int recvRank,
int batchSize = 0);

/* Helper methods */

void checkRanksRange(int sendRank, int recvRank);

// Abstraction of the bulk of the recv work, shared among various functions
void doRecv(std::shared_ptr<faabric::MPIMessage> m,
uint8_t* buffer,
faabric_datatype_t* dataType,
int count,
MPI_Status* status,
faabric::MPIMessage::MPIMessageType messageType =
faabric::MPIMessage::NORMAL);
};
}
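
The private helpers above (initLocalBasePorts, getPortForRanks) replace the single-port scheme with a pair of ports per rank pair, derived from a per-world base port. The diff does not show the actual computation, so the following is only a hypothetical sketch of how such a mapping could look; the formula and the example numbers are assumptions, not faabric's implementation:

#include <cstdio>
#include <utility>

// Hypothetical: derive one port per direction for a rank pair from a
// per-world base port and the world size. Not the real faabric code.
std::pair<int, int> getPortForRanksSketch(int basePort,
                                          int worldSize,
                                          int localRank,
                                          int remoteRank)
{
    // Give every ordered (sender, receiver) pair its own offset
    int sendPort = basePort + localRank * worldSize + remoteRank;
    int recvPort = basePort + remoteRank * worldSize + localRank;
    return { sendPort, recvPort };
}

int main()
{
    // With base port 10000 and a 4-rank world, the channel between
    // ranks 1 and 2 would use ports 10006 and 10009
    auto [sendPort, recvPort] = getPortForRanksSketch(10000, 4, 1, 2);
    std::printf("send on %d, recv on %d\n", sendPort, recvPort);
    return 0;
}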
2 changes: 0 additions & 2 deletions include/faabric/transport/MessageEndpoint.h
@@ -83,8 +83,6 @@ class RecvMessageEndpoint : public MessageEndpoint
public:
RecvMessageEndpoint(int portIn);

RecvMessageEndpoint(int portIn, const std::string& overrideHost);

void open(MessageContext& context);

void close();
8 changes: 3 additions & 5 deletions include/faabric/transport/MpiMessageEndpoint.h
@@ -14,18 +14,16 @@ faabric::MpiHostsToRanksMessage recvMpiHostRankMsg();
void sendMpiHostRankMsg(const std::string& hostIn,
const faabric::MpiHostsToRanksMessage msg);

/* This class abstracts the notion of a communication channel between two MPI
* ranks. There will always be one rank local to this host, and one remote.
/* This class abstracts the notion of a communication channel between two remote
* MPI ranks. There will always be one rank local to this host, and one remote.
* Note that the port is unique per (user, function, sendRank, recvRank) tuple.
*/
class MpiMessageEndpoint
{
public:
MpiMessageEndpoint(const std::string& hostIn, int portIn);

MpiMessageEndpoint(const std::string& hostIn,
int portIn,
const std::string& overrideRecvHost);
MpiMessageEndpoint(const std::string& hostIn, int sendPort, int recvPort);

void sendMpiMessage(const std::shared_ptr<faabric::MPIMessage>& msg);

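
The new three-argument constructor takes separate send and receive ports, matching the pair returned by MpiWorld::getPortForRanks. A minimal construction sketch follows; the host name and port numbers are placeholders, and the surrounding setup (message context, opening the sockets) is omitted:

#include <faabric/transport/MpiMessageEndpoint.h>

#include <string>

// Hypothetical wiring for the channel between a local and a remote rank
void openChannelSketch()
{
    std::string remoteHost = "hostB"; // placeholder remote host
    int sendPort = 10006;             // port this rank sends on for the pair
    int recvPort = 10009;             // port this rank receives on for the pair

    faabric::transport::MpiMessageEndpoint endpoint(remoteHost, sendPort, recvPort);

    // endpoint.sendMpiMessage(msg) would then push a
    // std::shared_ptr<faabric::MPIMessage> to the remote rank
}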
1 change: 1 addition & 0 deletions src/proto/faabric.proto
@@ -88,6 +88,7 @@ message MPIMessage {
// fields.
message MpiHostsToRanksMessage {
repeated string hosts = 1;
repeated int32 basePorts = 2;
}

message Message {
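
The new basePorts field travels alongside the existing hosts list, one entry per rank, so a receiver can derive rank-to-rank ports locally. A short sketch of how the broadcasting side might populate the message, assuming the standard protobuf-generated C++ accessors (the real call sites are not part of this excerpt):

#include <faabric/proto/faabric.pb.h>

// Hypothetical helper: build the host/port broadcast for a two-rank world
faabric::MpiHostsToRanksMessage buildHostRankMsgSketch()
{
    faabric::MpiHostsToRanksMessage msg;

    // Rank 0 runs on hostA, rank 1 on hostB
    msg.add_hosts("hostA");
    msg.add_hosts("hostB");

    // One base port per rank's host; per-rank-pair ports are offsets
    // from these values
    msg.add_baseports(10000);
    msg.add_baseports(10000);

    return msg;
}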
2 changes: 1 addition & 1 deletion src/scheduler/CMakeLists.txt
@@ -10,7 +10,7 @@ set(LIB_FILES
SnapshotServer.cpp
SnapshotClient.cpp
MpiContext.cpp
MpiThreadPool.cpp
MpiMessageBuffer.cpp
MpiWorldRegistry.cpp
MpiWorld.cpp
${HEADERS}
73 changes: 73 additions & 0 deletions src/scheduler/MpiMessageBuffer.cpp
@@ -0,0 +1,73 @@
#include <faabric/scheduler/MpiMessageBuffer.h>
#include <faabric/util/logging.h>

namespace faabric::scheduler {
typedef std::list<MpiMessageBuffer::PendingAsyncMpiMessage>::iterator
MpiMessageIterator;
bool MpiMessageBuffer::isEmpty()
{
return pendingMsgs.empty();
}

int MpiMessageBuffer::size()
{
return pendingMsgs.size();
}

void MpiMessageBuffer::addMessage(PendingAsyncMpiMessage msg)
{
pendingMsgs.push_back(msg);
}

void MpiMessageBuffer::deleteMessage(const MpiMessageIterator& msgIt)
{
pendingMsgs.erase(msgIt);
}

MpiMessageIterator MpiMessageBuffer::getRequestPendingMsg(int requestId)
{
// The request id must be in the MMB, as an irecv must happen before an
// await
MpiMessageIterator msgIt =
std::find_if(pendingMsgs.begin(),
pendingMsgs.end(),
[requestId](PendingAsyncMpiMessage pendingMsg) {
return pendingMsg.requestId == requestId;
});

// If it's not there, error out
if (msgIt == pendingMsgs.end()) {
SPDLOG_ERROR("Asynchronous request id not in buffer: {}", requestId);
throw std::runtime_error("Async request not in buffer");
}

return msgIt;
}

MpiMessageIterator MpiMessageBuffer::getFirstNullMsgUntil(
const MpiMessageIterator& msgItEnd)
{
return std::find_if(
pendingMsgs.begin(), msgItEnd, [](PendingAsyncMpiMessage pendingMsg) {
return pendingMsg.msg == nullptr;
});
}

MpiMessageIterator MpiMessageBuffer::getFirstNullMsg()
{
return getFirstNullMsgUntil(pendingMsgs.end());
}

int MpiMessageBuffer::getTotalUnackedMessagesUntil(
const MpiMessageIterator& msgItEnd)
{
MpiMessageIterator firstNull = getFirstNullMsgUntil(msgItEnd);
return std::distance(firstNull, msgItEnd);
}

int MpiMessageBuffer::getTotalUnackedMessages()
{
MpiMessageIterator firstNull = getFirstNullMsg();
return std::distance(firstNull, pendingMsgs.end());
}
}
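
Putting the interface together, an await-style caller would look up the pending entry by request id, work out how many earlier messages still need to be read, and finally remove the entry. The sketch below compiles against the class above, but the surrounding logic (the actual transport reads) is illustrative, not the real MpiWorld code:

#include <faabric/scheduler/MpiMessageBuffer.h>

// Illustrative await-style flow; error handling and transport reads omitted
void awaitRequestSketch(faabric::scheduler::MpiMessageBuffer& mmb, int requestId)
{
    // Find the pending entry registered by the earlier irecv
    auto msgIt = mmb.getRequestPendingMsg(requestId);

    // Unacknowledged messages queued before this request; the real code
    // would synchronously receive them in order
    int toAcknowledge = mmb.getTotalUnackedMessagesUntil(msgIt);
    (void)toAcknowledge;

    // Once handled, the entry is removed from the buffer
    mmb.deleteMessage(msgIt);
}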
67 changes: 0 additions & 67 deletions src/scheduler/MpiThreadPool.cpp

This file was deleted.
