Skip to content

Commit

Permalink
removing thread pool and implementing the umb
Browse files Browse the repository at this point in the history
  • Loading branch information
csegarragonz committed Jun 14, 2021
1 parent faf237d commit 7165f26
Show file tree
Hide file tree
Showing 6 changed files with 282 additions and 294 deletions.
40 changes: 0 additions & 40 deletions include/faabric/scheduler/MpiThreadPool.h

This file was deleted.

52 changes: 43 additions & 9 deletions include/faabric/scheduler/MpiWorld.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,42 @@
#include <faabric/mpi/mpi.h>

#include <faabric/proto/faabric.pb.h>
#include <faabric/scheduler/FunctionCallClient.h>
#include <faabric/scheduler/InMemoryMessageQueue.h>
#include <faabric/scheduler/MpiThreadPool.h>
#include <faabric/transport/MpiMessageEndpoint.h>
#include <faabric/util/logging.h>
#include <faabric/util/timing.h>

#include <atomic>
#include <thread>
#include <list>

namespace faabric::scheduler {
typedef faabric::util::Queue<std::shared_ptr<faabric::MPIMessage>>
InMemoryMpiQueue;

/* The untracked message buffer (UMB) keeps track of the asyncrhonous
* message that we must have received (i.e. through an irecv call) but we
* still have not waited on (acknowledged). Messages are acknowledged either
* through a call to recv or a call to await. A call to recv will
* acknowledge (i.e. synchronously read from transport buffers) as many
* unacknowleged messages there are, plus one.
*/
struct UnackedMessageBuffer
{
struct Arguments
{
int sendRank;
int recvRank;
uint8_t* buffer;
faabric_datatype_t* dataType;
int count;
faabric::MPIMessage::MPIMessageType messageType;
};

std::list<int> ids;
std::list<Arguments> args;
std::list<std::shared_ptr<faabric::MPIMessage>> msgs;
};

class MpiWorld
{
public:
Expand All @@ -41,8 +63,6 @@ class MpiWorld

void destroy();

void shutdownThreadPool();

void getCartesianRank(int rank,
int maxDims,
const int* dims,
Expand Down Expand Up @@ -196,9 +216,6 @@ class MpiWorld
std::string user;
std::string function;

std::shared_ptr<faabric::scheduler::MpiAsyncThreadPool> threadPool;
int getMpiThreadPoolSize();

std::vector<int> cartProcsPerDim;

/* MPI internal messaging layer */
Expand All @@ -218,6 +235,23 @@ class MpiWorld
const std::shared_ptr<faabric::MPIMessage>& msg);
std::shared_ptr<faabric::MPIMessage> recvRemoteMpiMessage(int sendRank,
int recvRank);
void closeMpiMessageEndpoints();

// Support for asyncrhonous communications
std::shared_ptr<faabric::scheduler::UnackedMessageBuffer>
getUnackedMessageBuffer(int sendRank, int recvRank);
std::shared_ptr<faabric::MPIMessage> recvBatchReturnLast(int sendRank,
int recvRank,
int batchSize = 0);

/* Helper methods */

// Abstraction of the bulk of the recv work, shared among various functions
void doRecv(std::shared_ptr<faabric::MPIMessage> m,
uint8_t* buffer,
faabric_datatype_t* dataType,
int count,
MPI_Status* status,
faabric::MPIMessage::MPIMessageType messageType =
faabric::MPIMessage::NORMAL);
};
}
1 change: 0 additions & 1 deletion src/scheduler/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ set(LIB_FILES
SnapshotServer.cpp
SnapshotClient.cpp
MpiContext.cpp
MpiThreadPool.cpp
MpiWorldRegistry.cpp
MpiWorld.cpp
${HEADERS}
Expand Down
67 changes: 0 additions & 67 deletions src/scheduler/MpiThreadPool.cpp

This file was deleted.

Loading

0 comments on commit 7165f26

Please sign in to comment.