Different async mechanism: MPI message buffer #111

Merged
8 commits merged on Jun 16, 2021
Changes from 5 commits
71 changes: 71 additions & 0 deletions include/faabric/scheduler/MpiMessageBuffer.h
@@ -0,0 +1,71 @@
#include <faabric/mpi/mpi.h>
#include <faabric/proto/faabric.pb.h>

#include <iterator>
#include <list>

namespace faabric::scheduler {
/* The MPI message buffer (MMB) keeps track of the asynchronous
 * messages that we have issued an irecv call for but have not yet
 * waited on (acknowledged). Messages are acknowledged either through a
 * call to recv or a call to await. A call to recv will acknowledge
 * (i.e. synchronously read from transport buffers) as many
 * unacknowledged messages as there are. A call to await with a request
 * id as a parameter will acknowledge all unacknowledged messages up to
 * said request id.
 */
class MpiMessageBuffer
{
public:
/* This structure holds the metadata for each MPI message we keep in the
 * buffer. Note that the msg field will point to null if the message is
 * unacknowledged, or to a valid message otherwise.
 */
struct Arguments
csegarragonz marked this conversation as resolved.
Show resolved Hide resolved
{
int requestId;
std::shared_ptr<faabric::MPIMessage> msg;
int sendRank;
int recvRank;
uint8_t* buffer;
faabric_datatype_t* dataType;
int count;
faabric::MPIMessage::MPIMessageType messageType;
};

/* Interface to query the buffer size */

bool isEmpty();

int size();

/* Interface to add and delete messages to the buffer */

void addMessage(Arguments arg);

void deleteMessage(const std::list<Arguments>::iterator& argIt);

/* Interface to get a pointer to a message in the MMB */

// Pointer to a message given its request id
std::list<Arguments>::iterator getRequestArguments(int requestId);

// Pointer to the first null-pointing (unacknowledged) message
std::list<Arguments>::iterator getFirstNullMsg();

/* Interface to ask for the number of unacknowledged messages */

// Unacknowledged messages until an iterator (used in await)
int getTotalUnackedMessagesUntil(
const std::list<Arguments>::iterator& argIt);

// Unacknowledged messages in the whole buffer (used in recv)
int getTotalUnackedMessages();

private:
std::list<Arguments> args;

std::list<Arguments>::iterator getFirstNullMsgUntil(
const std::list<Arguments>::iterator& argIt);
};
}
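
To make the acknowledgement model described in the header comment above concrete, here is a minimal usage sketch. It is not part of this PR: the free functions onIrecv and onAwait are hypothetical stand-ins for the real call sites in MpiWorld, and only the fields relevant to acknowledgement are populated.

// Illustrative sketch only (not part of the PR). An irecv records an entry
// whose msg field is null (unacknowledged); a later await locates the entry,
// works out how many older requests must be read first, and removes it.
#include <faabric/scheduler/MpiMessageBuffer.h>

using faabric::scheduler::MpiMessageBuffer;

void onIrecv(MpiMessageBuffer& mmb, int requestId)
{
    // Posting an irecv only records the request; msg stays null until the
    // message is acknowledged
    MpiMessageBuffer::Arguments args{};
    args.requestId = requestId;
    args.msg = nullptr;
    mmb.addMessage(args);
}

void onAwait(MpiMessageBuffer& mmb, int requestId)
{
    // The entry must exist, since an irecv always precedes an await
    auto argIt = mmb.getRequestArguments(requestId);

    // Older requests that are still unacknowledged would have to be read
    // synchronously from the transport before this one
    int pendingBefore = mmb.getTotalUnackedMessagesUntil(argIt);
    (void)pendingBefore;

    // Once acknowledged, the entry is dropped from the buffer
    mmb.deleteMessage(argIt);
}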
40 changes: 0 additions & 40 deletions include/faabric/scheduler/MpiThreadPool.h

This file was deleted.

27 changes: 19 additions & 8 deletions include/faabric/scheduler/MpiWorld.h
@@ -3,15 +3,13 @@
#include <faabric/mpi/mpi.h>

#include <faabric/proto/faabric.pb.h>
#include <faabric/scheduler/FunctionCallClient.h>
#include <faabric/scheduler/InMemoryMessageQueue.h>
#include <faabric/scheduler/MpiThreadPool.h>
#include <faabric/scheduler/MpiMessageBuffer.h>
#include <faabric/transport/MpiMessageEndpoint.h>
#include <faabric/util/logging.h>
#include <faabric/util/timing.h>

#include <atomic>
#include <thread>

namespace faabric::scheduler {
typedef faabric::util::Queue<std::shared_ptr<faabric::MPIMessage>>
@@ -41,8 +39,6 @@ class MpiWorld

void destroy();

void shutdownThreadPool();

void getCartesianRank(int rank,
int maxDims,
const int* dims,
@@ -196,9 +192,6 @@ class MpiWorld
std::string user;
std::string function;

std::shared_ptr<faabric::scheduler::MpiAsyncThreadPool> threadPool;
int getMpiThreadPoolSize();

std::vector<int> cartProcsPerDim;

/* MPI internal messaging layer */
@@ -221,6 +214,24 @@
int recvRank);
void closeMpiMessageEndpoints();

// Support for asynchronous communications
std::shared_ptr<faabric::scheduler::MpiMessageBuffer>
getUnackedMessageBuffer(int sendRank, int recvRank);
std::shared_ptr<faabric::MPIMessage> recvBatchReturnLast(int sendRank,
int recvRank,
int batchSize = 0);

/* Helper methods */

void checkRanksRange(int sendRank, int recvRank);

// Abstraction of the bulk of the recv work, shared among various functions
void doRecv(std::shared_ptr<faabric::MPIMessage> m,
uint8_t* buffer,
faabric_datatype_t* dataType,
int count,
MPI_Status* status,
faabric::MPIMessage::MPIMessageType messageType =
faabric::MPIMessage::NORMAL);
};
}
4 changes: 2 additions & 2 deletions include/faabric/transport/MpiMessageEndpoint.h
@@ -14,8 +14,8 @@ faabric::MpiHostsToRanksMessage recvMpiHostRankMsg();
void sendMpiHostRankMsg(const std::string& hostIn,
const faabric::MpiHostsToRanksMessage msg);

/* This class abstracts the notion of a communication channel between two MPI
* ranks. There will always be one rank local to this host, and one remote.
/* This class abstracts the notion of a communication channel between two remote
* MPI ranks. There will always be one rank local to this host, and one remote.
* Note that the port is unique per (user, function, sendRank, recvRank) tuple.
*/
class MpiMessageEndpoint
2 changes: 1 addition & 1 deletion src/scheduler/CMakeLists.txt
@@ -10,7 +10,7 @@ set(LIB_FILES
SnapshotServer.cpp
SnapshotClient.cpp
MpiContext.cpp
MpiThreadPool.cpp
MpiMessageBuffer.cpp
MpiWorldRegistry.cpp
MpiWorld.cpp
${HEADERS}
69 changes: 69 additions & 0 deletions src/scheduler/MpiMessageBuffer.cpp
@@ -0,0 +1,69 @@
#include <faabric/scheduler/MpiMessageBuffer.h>
#include <faabric/util/logging.h>

#include <algorithm>

namespace faabric::scheduler {
typedef std::list<MpiMessageBuffer::Arguments>::iterator ArgListIterator;
bool MpiMessageBuffer::isEmpty()
{
return args.empty();
}

int MpiMessageBuffer::size()
{
return args.size();
}

void MpiMessageBuffer::addMessage(Arguments arg)
{
args.push_back(arg);
}

void MpiMessageBuffer::deleteMessage(const ArgListIterator& argIt)
{
args.erase(argIt);
}

ArgListIterator MpiMessageBuffer::getRequestArguments(int requestId)
{
// The request id must be in the MMB, as an irecv must happen before an
// await
ArgListIterator argIt =
std::find_if(args.begin(), args.end(), [requestId](Arguments args) {
return args.requestId == requestId;
});

// If it's not there, error out
if (argIt == args.end()) {
SPDLOG_ERROR("Asynchronous request id not in buffer: {}", requestId);
throw std::runtime_error("Async request not in buffer");
}

return argIt;
}

ArgListIterator MpiMessageBuffer::getFirstNullMsgUntil(
const ArgListIterator& argItEnd)
{
return std::find_if(args.begin(), argItEnd, [](Arguments args) {
return args.msg == nullptr;
});
}

ArgListIterator MpiMessageBuffer::getFirstNullMsg()
{
return getFirstNullMsgUntil(args.end());
}

int MpiMessageBuffer::getTotalUnackedMessagesUntil(
const ArgListIterator& argItEnd)
{
ArgListIterator firstNull = getFirstNullMsgUntil(argItEnd);
return std::distance(firstNull, argItEnd);
}

int MpiMessageBuffer::getTotalUnackedMessages()
{
ArgListIterator firstNull = getFirstNullMsg();
return std::distance(firstNull, args.end());
}
}
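
To show the counting semantics implemented above, here is a small self-contained example. It is a sketch rather than part of the PR's test suite, and it assumes the common case in which entries are acknowledged in FIFO order, so every entry after the first null one is also unacknowledged.

// Illustrative sketch only (not part of the PR); assumes the header above is
// on the include path and links against faabric's protobuf definitions.
#include <faabric/scheduler/MpiMessageBuffer.h>

#include <cassert>
#include <memory>

int main()
{
    using faabric::scheduler::MpiMessageBuffer;

    MpiMessageBuffer mmb;

    // Request 1 has already been acknowledged: its msg field points to a
    // valid message
    MpiMessageBuffer::Arguments acked{};
    acked.requestId = 1;
    acked.msg = std::make_shared<faabric::MPIMessage>();
    mmb.addMessage(acked);

    // Requests 2 and 3 are still unacknowledged: msg is null
    for (int id : { 2, 3 }) {
        MpiMessageBuffer::Arguments pending{};
        pending.requestId = id;
        mmb.addMessage(pending);
    }

    assert(mmb.size() == 3);
    assert(!mmb.isEmpty());

    // Everything from the first null entry onwards counts as unacknowledged
    assert(mmb.getTotalUnackedMessages() == 2);

    // Awaiting request 3 would first have to acknowledge request 2
    auto it = mmb.getRequestArguments(3);
    assert(mmb.getTotalUnackedMessagesUntil(it) == 1);

    return 0;
}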
67 changes: 0 additions & 67 deletions src/scheduler/MpiThreadPool.cpp

This file was deleted.
