Skip to content

Commit

Permalink
Add barrier tests + Change queue timeout (#123)
Browse files Browse the repository at this point in the history
* adding local barrier test

* making queue timeout larger

* adding remote barrier test + formatting
  • Loading branch information
csegarragonz committed Jun 23, 2021
1 parent 2d7fb18 commit 608967e
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 1 deletion.
2 changes: 1 addition & 1 deletion include/faabric/util/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

#include <queue>

#define DEFAULT_QUEUE_TIMEOUT_MS 500
#define DEFAULT_QUEUE_TIMEOUT_MS 5000

namespace faabric::util {
class QueueTimeoutException : public faabric::util::FaabricException
Expand Down
1 change: 1 addition & 0 deletions src/scheduler/MpiWorld.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1155,6 +1155,7 @@ void MpiWorld::barrier(int thisRank)
{
if (thisRank == 0) {
// This is the root, hence just does the waiting
SPDLOG_TRACE("MPI - barrier init {}", thisRank);

// Await messages from all others
for (int r = 1; r < size; r++) {
Expand Down
37 changes: 37 additions & 0 deletions tests/test/scheduler/test_mpi_world.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
#include <faabric/util/random.h>
#include <faabric_utils.h>

#include <thread>

using namespace faabric::scheduler;

namespace tests {
Expand Down Expand Up @@ -171,6 +173,41 @@ TEST_CASE_METHOD(MpiBaseTestFixture, "Test cartesian communicator", "[mpi]")
world.destroy();
}

TEST_CASE_METHOD(MpiBaseTestFixture, "Test local barrier", "[mpi]")
{
// Create the world
int worldSize = 2;
MpiWorld world;
world.create(msg, worldId, worldSize);

int rankA1 = 0;
int rankA2 = 1;
std::vector<int> sendData = { 0, 1, 2 };
std::vector<int> recvData = { -1, -1, -1 };

std::thread senderThread([&world, rankA1, rankA2, &sendData, &recvData] {
world.send(
rankA1, rankA2, BYTES(sendData.data()), MPI_INT, sendData.size());

world.barrier(rankA1);
assert(sendData == recvData);
});

world.recv(rankA1,
rankA2,
BYTES(recvData.data()),
MPI_INT,
recvData.size(),
MPI_STATUS_IGNORE);

REQUIRE(recvData == sendData);

world.barrier(rankA2);

senderThread.join();
world.destroy();
}

void checkMessage(faabric::MPIMessage& actualMessage,
int worldId,
int senderRank,
Expand Down
43 changes: 43 additions & 0 deletions tests/test/scheduler/test_remote_mpi_worlds.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,49 @@ TEST_CASE_METHOD(RemoteMpiTestFixture,
localWorld.destroy();
}

TEST_CASE_METHOD(RemoteMpiTestFixture, "Test barrier across hosts", "[mpi]")
{
// Register two ranks (one on each host)
this->setWorldsSizes(2, 1, 1);
int rankA = 0;
int rankB = 1;
std::vector<int> sendData = { 0, 1, 2 };
std::vector<int> recvData = { -1, -1, -1 };

// Init worlds
MpiWorld& localWorld = getMpiWorldRegistry().createWorld(msg, worldId);
faabric::util::setMockMode(false);

std::thread senderThread([this, rankA, rankB, &sendData, &recvData] {
remoteWorld.initialiseFromMsg(msg);

remoteWorld.send(
rankB, rankA, BYTES(sendData.data()), MPI_INT, sendData.size());

// Barrier on this rank
remoteWorld.barrier(rankB);
assert(sendData == recvData);

remoteWorld.destroy();
});

// Receive the message for the given rank
localWorld.recv(rankB,
rankA,
BYTES(recvData.data()),
MPI_INT,
recvData.size(),
MPI_STATUS_IGNORE);
REQUIRE(recvData == sendData);

// Call barrier to synchronise remote host
localWorld.barrier(rankA);

// Destroy worlds
senderThread.join();
localWorld.destroy();
}

TEST_CASE_METHOD(RemoteMpiTestFixture,
"Test sending many messages across host",
"[mpi]")
Expand Down

0 comments on commit 608967e

Please sign in to comment.