Commit 6123721: migration working
csegarragonz committed Jan 12, 2022
2 parents: 07709f0 + fefbe12
Showing 10 changed files with 68 additions and 245 deletions.
14 changes: 6 additions & 8 deletions include/faabric/scheduler/MpiWorld.h
@@ -210,6 +210,12 @@ class MpiWorld

void setMsgForRank(faabric::Message& msg);

/* Function Migration */

void prepareMigration(
int thisRank,
std::shared_ptr<faabric::PendingMigrations> pendingMigrations);

private:
int id = -1;
int size = -1;
@@ -274,14 +280,6 @@ class MpiWorld
int recvRank,
int batchSize = 0);

/* Function Migration */

void tryMigrate(int thisRank);

void prepareMigration(
int thisRank,
std::shared_ptr<faabric::PendingMigrations> pendingMigrations);

void finishMigration(int thisRank);

/* Helper methods */
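Note: prepareMigration moves from the private section to the public API here because tryMigrate, which used to drive it internally, is deleted in this commit (see src/scheduler/MpiWorld.cpp below), so an external caller now has to invoke it. A rough sketch of such a caller, with the surrounding locals (msg, pendingMigrations) assumed rather than taken from this commit:

auto& registry = faabric::scheduler::getMpiWorldRegistry();
faabric::scheduler::MpiWorld& world = registry.getWorld(msg.mpiworldid());
// Rewire the world's rank-to-host mappings for this rank
world.prepareMigration(msg.mpirank(), pendingMigrations);
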
5 changes: 3 additions & 2 deletions include/faabric/scheduler/Scheduler.h
@@ -90,16 +90,17 @@ class Executor
faabric::Message& msg,
bool createIfNotExists = false);

// TODO - maybe remove me
bool doMigration(
std::shared_ptr<faabric::PendingMigrations> pendingMigrations);

virtual std::span<uint8_t> getMemoryView();

protected:
virtual void restore(const std::string& snapshotKey);

virtual void postFinish();

virtual std::span<uint8_t> getMemoryView();

virtual void setMemorySize(size_t newSize);

faabric::Message boundMessage;
8 changes: 8 additions & 0 deletions include/faabric/util/func.h
@@ -7,6 +7,14 @@

namespace faabric::util {

class FunctionMigratedException : public faabric::util::FaabricException
{
public:
explicit FunctionMigratedException(std::string message)
: FaabricException(std::move(message))
{}
};

std::string funcToString(const faabric::Message& msg, bool includeId);

std::string funcToString(
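FunctionMigratedException is the signal a task uses to abandon local execution once it has been migrated; the catch side is in src/scheduler/Executor.cpp below. A minimal sketch of the throwing side, in which the task body and the migration check are invented for illustration:

#include <faabric/util/func.h>

// Hypothetical task body: on reaching a migration point, give up local
// execution by throwing; the executor's catch block handles shutdown.
int32_t runTask(faabric::Message& msg)
{
    if (checkMigrationPoint(msg)) { // hypothetical check
        throw faabric::util::FunctionMigratedException(
          "Migrating message " + std::to_string(msg.id()));
    }
    return 0; // normal completion
}
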
7 changes: 0 additions & 7 deletions include/faabric/util/message.h

This file was deleted.

70 changes: 25 additions & 45 deletions src/scheduler/Executor.cpp
@@ -1,3 +1,4 @@
#include <faabric/scheduler/MpiWorldRegistry.h>
#include <faabric/proto/faabric.pb.h>
#include <faabric/scheduler/Scheduler.h>
#include <faabric/snapshot/SnapshotRegistry.h>
@@ -14,7 +15,6 @@
#include <faabric/util/logging.h>
#include <faabric/util/macros.h>
#include <faabric/util/memory.h>
#include <faabric/util/message.h>
#include <faabric/util/queue.h>
#include <faabric/util/snapshot.h>
#include <faabric/util/string_tools.h>
@@ -529,15 +529,28 @@ void Executor::threadPoolThread(int threadPoolIdx)

// Execute the task
int32_t returnValue;
bool migrated = false;
try {
returnValue =
executeTask(threadPoolIdx, task.messageIndex, task.req);
} catch (const faabric::scheduler::ExecutorMigratedException& ex) {
SPDLOG_TRACE("Task {} has been migrated.", msg.id());
} catch (const faabric::util::FunctionMigratedException& ex) {
SPDLOG_DEBUG(
"Task {} migrated, shutting down executor {}", msg.id(), id);

returnValue = 0;
msg.set_outputdata(
"The execution of this message has been migrated");
// Note that when a task has been migrated we still need to run the
// normal executor shutdown, but we must NOT set the result for the
// call.
migrated = true;
selfShutdown = true;
returnValue = -99;

// MPI migration
if (msg.ismpi()) {
// TODO - when should we delete the pending migration?
auto& mpiWorld =
faabric::scheduler::getMpiWorldRegistry().getWorld(msg.mpiworldid());
mpiWorld.destroy();
}
} catch (const std::exception& ex) {
returnValue = 1;

@@ -674,6 +687,12 @@ void Executor::threadPoolThread(int threadPoolIdx)
// executor.
sch.vacateSlot();

// If the function has been migrated, we drop out here and shut down the
// executor
if (migrated) {
break;
}

// Finally set the result of the task, this will allow anything
// waiting on its result to continue execution, therefore must be
// done once the executor has been reset, otherwise the executor may
@@ -742,45 +761,6 @@ void Executor::releaseClaim()
claimed.store(false);
}

bool Executor::doMigration(
std::shared_ptr<faabric::PendingMigrations> pendingMigrations)
{
for (int i = 0; i < pendingMigrations->migrations_size(); i++) {
auto m = pendingMigrations->mutable_migrations()->at(i);
if (m.msg().id() == boundMessage.id()) {
migrateFunction(m.msg(), m.dsthost());
return true;
}
}

return false;
}

void Executor::migrateFunction(const faabric::Message& msg,
const std::string& host)
{
SPDLOG_DEBUG("Executor received request to migrate message {} from host {}"
" to host {}",
msg.id(),
msg.executedhost(),
host);

// Take snapshot and push to recipient host
// TODO - could we just push the snapshot diffs here?
auto snap = std::make_shared<faabric::util::SnapshotData>(getMemoryView());
std::string snapKey = fmt::format("{}_migration_{}",
faabric::util::funcToString(msg, false),
faabric::util::generateGid());
sch.getSnapshotClient(host).pushSnapshot(snapKey, snap);

// Create request execution for migrated function
auto req = faabric::util::batchExecFactory(msg.user(), msg.function(), 1);
faabric::util::copyMessage(&msg, req->mutable_messages(0));
req->set_type(faabric::BatchExecuteRequest::MIGRATION);
req->mutable_messages(0)->set_snapshotkey(snapKey);
sch.getFunctionCallClient(host).executeFunctions(req);
}

// ------------------------------------------
// HOOKS
// ------------------------------------------
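Taken together, the two hunks above give roughly the following migration path through threadPoolThread (a condensed sketch of this diff, not verbatim source; msg, task, selfShutdown and sch are the surrounding locals):

bool migrated = false;
int32_t returnValue;
try {
    returnValue = executeTask(threadPoolIdx, task.messageIndex, task.req);
} catch (const faabric::util::FunctionMigratedException& ex) {
    // Shut this executor down, but do NOT publish a result: the restored
    // instance on the destination host now owns the call.
    migrated = true;
    selfShutdown = true;
    returnValue = -99;

    // For MPI functions, also tear down the local world state
    if (msg.ismpi()) {
        faabric::scheduler::getMpiWorldRegistry()
          .getWorld(msg.mpiworldid())
          .destroy();
    }
}

// ... per-task cleanup, including sch.vacateSlot() ...

if (migrated) {
    break; // drop out before setting the function result
}
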
84 changes: 21 additions & 63 deletions src/scheduler/MpiWorld.cpp
@@ -235,6 +235,7 @@ void MpiWorld::create(faabric::Message& call, int newId, int newSize)
if (thisRankMsg != nullptr) {
// Set message fields to allow for function migration
msg.set_appid(thisRankMsg->appid());
msg.set_cmdline(thisRankMsg->cmdline());
msg.set_inputdata(thisRankMsg->inputdata());
msg.set_migrationcheckperiod(thisRankMsg->migrationcheckperiod());
// Log chained functions to generate execution graphs
@@ -409,6 +410,9 @@ void MpiWorld::initLocalRemoteLeaders()
{
// First, group the ranks by the host they belong to, for convenience
assert(hostForRank.size() == size);
// Clear the existing map in case we are calling this method during a
// migration
ranksForHost.clear();

for (int rank = 0; rank < hostForRank.size(); rank++) {
std::string host = hostForRank.at(rank);
@@ -1311,11 +1315,6 @@ void MpiWorld::allReduce(int rank,
// Second, 0 broadcasts the result to all ranks
broadcast(
0, rank, recvBuffer, datatype, count, faabric::MPIMessage::ALLREDUCE);

// All reduce triggers a migration point in MPI
if (thisRankMsg != nullptr && thisRankMsg->migrationcheckperiod() > 0) {
tryMigrate(rank);
}
}

void MpiWorld::op_reduce(faabric_op_t* operation,
@@ -1563,11 +1562,6 @@ void MpiWorld::barrier(int thisRank)
broadcast(
0, thisRank, nullptr, MPI_INT, 0, faabric::MPIMessage::BARRIER_DONE);
SPDLOG_TRACE("MPI - barrier done {}", thisRank);

// Barrier triggers a migration point
if (thisRankMsg != nullptr && thisRankMsg->migrationcheckperiod() > 0) {
tryMigrate(thisRank);
}
}

std::shared_ptr<InMemoryMpiQueue> MpiWorld::getLocalQueue(int sendRank,
@@ -1588,9 +1582,9 @@ void MpiWorld::initLocalQueues()
// Assert we only allocate queues once
assert(localQueues.size() == 0);
localQueues.resize(size * size);
for (int recvRank = 0; recvRank < size; recvRank++) {
if (getHostForRank(recvRank) == thisHost) {
for (int sendRank = 0; sendRank < size; sendRank++) {
for (const int sendRank : ranksForHost[thisHost]) {
for (const int recvRank : ranksForHost[thisHost]) {
if (localQueues[getIndexForRanks(sendRank, recvRank)] == nullptr) {
localQueues[getIndexForRanks(sendRank, recvRank)] =
std::make_shared<InMemoryMpiQueue>();
}
@@ -1764,45 +1758,6 @@ void MpiWorld::checkRanksRange(int sendRank, int recvRank)
}
}

// This method will either (i) do nothing or (ii) run the whole migration.
// Note that every rank will call this method.
void MpiWorld::tryMigrate(int thisRank)
{
auto pendingMigrations =
getScheduler().getPendingAppMigrations(thisRankMsg->appid());

if (pendingMigrations == nullptr) {
return;
}

prepareMigration(thisRank, pendingMigrations);

bool mustShutdown = false;
if (faabric::util::isMockMode()) {
// When mocking in the tests, the call to get the current executing
// executor may fail
faabric::util::UniqueLock lock(mockMutex);
mpiMockedPendingMigrations.push_back(pendingMigrations);
} else {
// An executing thread may never return from the next call. However,
// the restored executing thread will know it is an MPI function, its
// rank, and its world id, and will therefore call the method to finish
// the migration from the snapshot server.
mustShutdown = getExecutingExecutor()->doMigration(pendingMigrations);
}

if (mustShutdown) {
SPDLOG_INFO("MPI rank {} is being migrated", thisRank);
destroy();
throw faabric::scheduler::ExecutorMigratedException(
"Executor has been migrated");
} else {
SPDLOG_INFO("MPI rank {} is part of a migration, but not migrated",
thisRank);
finishMigration(thisRank);
}
}

// Once per host we modify the per-world accounting of rank-to-host mappings
// and the corresponding ports for cross-host messaging.
void MpiWorld::prepareMigration(
@@ -1842,18 +1797,21 @@ void MpiWorld::prepareMigration(
// Reset the internal mappings.
initLocalBasePorts(hostForRank);
initLocalRemoteLeaders();
}
}

// This method will be called from two different points:
// (i) snapshot server when restoring an MPI function after migration
// (ii) after running the migration from a rank that is not migrated
void MpiWorld::finishMigration(int thisRank)
{
if (thisRank == localLeader) {
getScheduler().removePendingMigration(thisRankMsg->appid());
}
// Add the necessary new local messaging queues
// TODO - merge with initLocalQueues
for (const int sendRank : ranksForHost[thisHost]) {
for (const int recvRank : ranksForHost[thisHost]) {
if (localQueues[getIndexForRanks(sendRank, recvRank)] == nullptr) {
localQueues[getIndexForRanks(sendRank, recvRank)] =
std::make_shared<InMemoryMpiQueue>();
}
}
}

barrier(thisRank);
// Lastly, remove the migrations from the pending migrations map
// TODO - when should we remove the pending migration from the map?
// getScheduler().removePendingMigration(thisRankMsg->appid());
}
}
}
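The queue-creation double loop now appears verbatim in both initLocalQueues and finishMigration, which is what the TODO above refers to. One way to factor it out, sketched as a suggestion rather than code from this commit:

// Hypothetical shared helper: lazily create a local queue for every
// (sendRank, recvRank) pair co-located on this host.
void MpiWorld::createLocalQueues()
{
    for (const int sendRank : ranksForHost[thisHost]) {
        for (const int recvRank : ranksForHost[thisHost]) {
            int idx = getIndexForRanks(sendRank, recvRank);
            if (localQueues[idx] == nullptr) {
                localQueues[idx] = std::make_shared<InMemoryMpiQueue>();
            }
        }
    }
}
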
10 changes: 5 additions & 5 deletions src/scheduler/Scheduler.cpp
@@ -11,7 +11,6 @@
#include <faabric/util/locks.h>
#include <faabric/util/logging.h>
#include <faabric/util/memory.h>
#include <faabric/util/message.h>
#include <faabric/util/network.h>
#include <faabric/util/random.h>
#include <faabric/util/scheduling.h>
@@ -487,8 +486,8 @@ faabric::util::SchedulingDecision Scheduler::doCallFunctions(
for (int i = 0; i < firstMsg.mpiworldsize() - 1; i++) {
// Append message to original request
auto* newMsgPtr = originalReq->add_messages();
faabric::util::copyMessage(&req->messages().at(i),
newMsgPtr);
*newMsgPtr = req->messages().at(i);

// Append message to original decision
originalDecision->addMessage(decision.hosts.at(i),
req->messages().at(i));
@@ -575,7 +574,7 @@ faabric::util::SchedulingDecision Scheduler::doCallFunctions(
snapshotKey = firstMsg.snapshotkey();
}

if (!snapshotKey.empty() && !isMigration) {
if (!snapshotKey.empty()) {
auto snap =
faabric::snapshot::getSnapshotRegistry().getSnapshot(snapshotKey);

@@ -1372,7 +1371,8 @@
&(*(req->mutable_messages()->begin() +
std::distance(originalDecision.hosts.begin(), right)));
auto* migrationMsgPtr = migration->mutable_msg();
faabric::util::copyMessage(msgPtr, migrationMsgPtr);
// faabric::util::copyMessage(msgPtr, migrationMsgPtr);
*migrationMsgPtr = *msgPtr;
// Decrement by one the availability, and check for more
// possible sources of migration
claimSlot();
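Both call sites above replace the deleted faabric::util::copyMessage helper with protobuf's generated assignment operator, which deep-copies the message. A standalone illustration of the equivalence (assumes only the generated faabric::Message type):

#include <cassert>
#include <faabric/proto/faabric.pb.h>

int main()
{
    faabric::Message src;
    src.set_id(1234);

    faabric::Message dst;
    dst = src;            // generated operator=, deep copy of all fields
    // dst.CopyFrom(src); // equivalent explicit form

    assert(dst.id() == src.id());
    return 0;
}
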
1 change: 0 additions & 1 deletion src/util/CMakeLists.txt
@@ -17,7 +17,6 @@ faabric_lib(util
locks.cpp
logging.cpp
memory.cpp
message.cpp
network.cpp
queue.cpp
random.cpp