Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
csegarragonz committed Feb 2, 2022
1 parent 6123721 commit 446e90a
Show file tree
Hide file tree
Showing 6 changed files with 7 additions and 41 deletions.
2 changes: 0 additions & 2 deletions include/faabric/scheduler/MpiWorld.h
Original file line number Diff line number Diff line change
Expand Up @@ -280,8 +280,6 @@ class MpiWorld
int recvRank,
int batchSize = 0);

void finishMigration(int thisRank);

/* Helper methods */

void checkRanksRange(int sendRank, int recvRank);
Expand Down
4 changes: 0 additions & 4 deletions include/faabric/scheduler/Scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,6 @@ class Executor
faabric::Message& msg,
bool createIfNotExists = false);

// TODO - maybe remove me
bool doMigration(
std::shared_ptr<faabric::PendingMigrations> pendingMigrations);

virtual std::span<uint8_t> getMemoryView();

protected:
Expand Down
7 changes: 4 additions & 3 deletions src/scheduler/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -545,10 +545,11 @@ void Executor::threadPoolThread(int threadPoolIdx)
returnValue = -99;

// MPI migration
if(msg.ismpi()) {
// TODO - when should we delete the pending migration?
if (msg.ismpi()) {
// TODO - delete the pending migration
auto& mpiWorld =
faabric::scheduler::getMpiWorldRegistry().getWorld(msg.mpiworldid());
faabric::scheduler::getMpiWorldRegistry().getWorld(
msg.mpiworldid());
mpiWorld.destroy();
}
} catch (const std::exception& ex) {
Expand Down
9 changes: 2 additions & 7 deletions src/scheduler/MpiWorld.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1758,8 +1758,6 @@ void MpiWorld::checkRanksRange(int sendRank, int recvRank)
}
}

// Once per host we modify the per-world accounting of rank-to-hosts mappings
// and the corresponding ports for cross-host messaging.
void MpiWorld::prepareMigration(
int thisRank,
std::shared_ptr<faabric::PendingMigrations> pendingMigrations)
Expand Down Expand Up @@ -1802,16 +1800,13 @@ void MpiWorld::prepareMigration(
// TODO - merge with initLocalQueues
for (const int sendRank : ranksForHost[thisHost]) {
for (const int recvRank : ranksForHost[thisHost]) {
if (localQueues[getIndexForRanks(sendRank, recvRank)] == nullptr) {
if (localQueues[getIndexForRanks(sendRank, recvRank)] ==
nullptr) {
localQueues[getIndexForRanks(sendRank, recvRank)] =
std::make_shared<InMemoryMpiQueue>();
}
}
}

// Lastly, remove the migrations from the pending migrations map
// TODO - when should we remove the pending migration from the map?
// getScheduler().removePendingMigration(thisRankMsg->appid());
}
}
}
1 change: 0 additions & 1 deletion src/scheduler/Scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1371,7 +1371,6 @@ Scheduler::doCheckForMigrationOpportunities(
&(*(req->mutable_messages()->begin() +
std::distance(originalDecision.hosts.begin(), right)));
auto* migrationMsgPtr = migration->mutable_msg();
// faabric::util::copyMessage(msgPtr, migrationMsgPtr);
*migrationMsgPtr = *msgPtr;
// Decrement by one the availability, and check for more
// possible sources of migration
Expand Down
25 changes: 1 addition & 24 deletions tests/test/scheduler/test_function_migration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
#include <faabric/scheduler/FunctionMigrationThread.h>
#include <faabric/scheduler/Scheduler.h>
#include <faabric/util/config.h>
#include <faabric/util/message.h>
#include <faabric/util/testing.h>

using namespace faabric::scheduler;
Expand Down Expand Up @@ -86,8 +85,7 @@ class FunctionMigrationTestFixture : public SchedulerTestFixture
for (auto pair : migrations) {
auto* migration = expected.add_migrations();
auto* migrationMsg = migration->mutable_msg();
faabric::util::copyMessage(&req->mutable_messages()->at(pair.first),
migrationMsg);
*migrationMsg = req->mutable_messages()->at(pair.first);
migration->set_srchost(hosts.at(pair.first));
migration->set_dsthost(hosts.at(pair.second));
}
Expand Down Expand Up @@ -449,32 +447,11 @@ TEST_CASE_METHOD(FunctionMigrationTestFixture,
checkPendingMigrationsExpectation(
expectedMigrations, actualMigrations, hosts, true);

// Check that certain MPI calls actually do the migration
SECTION("MPI barrier triggers a migration point") { world.barrier(0); }

SECTION("MPI all reduce triggers a migration point")
{
std::vector<int> messageData = { 0, 1, 2 };
world.allReduce(0,
BYTES(messageData.data()),
BYTES(messageData.data()),
MPI_INT,
messageData.size(),
MPI_SUM);
}

// When performing the migration, MPI will remove it from the pending
// migrations map
REQUIRE(sch.getPendingAppMigrations(appId) == nullptr);
checkPendingMigrationsExpectation(
expectedMigrations, getMpiMockedPendingMigrations().front(), hosts, true);

faabric::Message res =
sch.getFunctionResult(firstMsg->id(), 2 * timeToSleep);
REQUIRE(res.returnvalue() == 0);

// Clean up
world.destroy();
}

}

0 comments on commit 446e90a

Please sign in to comment.