Skip to content

Commit

Permalink
function call server: add call to add a pending migration to remote h…
Browse files Browse the repository at this point in the history
…osts
  • Loading branch information
csegarragonz committed Jan 9, 2022
1 parent 4efe4ca commit 13560b3
Show file tree
Hide file tree
Showing 10 changed files with 315 additions and 223 deletions.
3 changes: 2 additions & 1 deletion include/faabric/scheduler/FunctionCallApi.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ enum FunctionCalls
ExecuteFunctions = 1,
Flush = 2,
Unregister = 3,
GetResources = 4
GetResources = 4,
AddPendingMigration = 5
};
}
6 changes: 6 additions & 0 deletions include/faabric/scheduler/FunctionCallClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ getBatchRequests();
std::vector<std::pair<std::string, faabric::EmptyRequest>>
getResourceRequests();

std::vector<std::pair<std::string, std::shared_ptr<faabric::PendingMigrations>>>
getAddPendingMigrationRequests();

std::vector<std::pair<std::string, faabric::UnregisterRequest>>
getUnregisterRequests();

Expand All @@ -42,6 +45,9 @@ class FunctionCallClient : public faabric::transport::MessageEndpointClient

faabric::HostResources getResources();

void sendAddPendingMigration(
std::shared_ptr<faabric::PendingMigrations> req);

void executeFunctions(std::shared_ptr<faabric::BatchExecuteRequest> req);

void unregister(faabric::UnregisterRequest& req);
Expand Down
4 changes: 4 additions & 0 deletions include/faabric/scheduler/FunctionCallServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ class FunctionCallServer final
const uint8_t* buffer,
size_t bufferSize);

std::unique_ptr<google::protobuf::Message> recvAddPendingMigration(
const uint8_t* buffer,
size_t bufferSize);

void recvExecuteFunctions(const uint8_t* buffer, size_t bufferSize);

void recvUnregister(const uint8_t* buffer, size_t bufferSize);
Expand Down
16 changes: 13 additions & 3 deletions include/faabric/scheduler/Scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -228,13 +228,15 @@ class Scheduler
// ----------------------------------
// Function Migration
// ----------------------------------
bool checkForMigrationOpportunities(
faabric::util::MigrationStrategy =
faabric::util::MigrationStrategy::BIN_PACK);
void checkForMigrationOpportunities();

std::shared_ptr<faabric::PendingMigrations> canAppBeMigrated(
uint32_t appId);

void addPendingMigration(std::shared_ptr<faabric::PendingMigrations> msg);

void removePendingMigration(uint32_t appId);

private:
std::string thisHost;

Expand Down Expand Up @@ -311,6 +313,14 @@ class Scheduler
std::unordered_map<uint32_t, InFlightPair> inFlightRequests;
std::unordered_map<uint32_t, std::shared_ptr<faabric::PendingMigrations>>
pendingMigrations;

std::vector<std::shared_ptr<faabric::PendingMigrations>>
doCheckForMigrationOpportunities(
faabric::util::MigrationStrategy migrationStrategy =
faabric::util::MigrationStrategy::BIN_PACK);

void broadcastAddPendingMigration(
std::shared_ptr<faabric::PendingMigrations> pendingMigrations);
};

}
2 changes: 1 addition & 1 deletion src/proto/faabric.proto
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ message PendingMigrations {
int32 groupId = 2;

message PendingMigration {
int32 messageId = 1;
Message msg = 1;
string srcHost = 2;
string dstHost = 3;
}
Expand Down
28 changes: 28 additions & 0 deletions src/scheduler/FunctionCallClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ static std::unordered_map<std::string,
faabric::util::Queue<faabric::HostResources>>
queuedResourceResponses;

static std::vector<
std::pair<std::string, std::shared_ptr<faabric::PendingMigrations>>>
addPendingMigrationRequests;

static std::vector<std::pair<std::string, faabric::UnregisterRequest>>
unregisterRequests;

Expand Down Expand Up @@ -57,6 +61,13 @@ std::vector<std::pair<std::string, faabric::EmptyRequest>> getResourceRequests()
return resourceRequests;
}

std::vector<std::pair<std::string, std::shared_ptr<faabric::PendingMigrations>>>
getAddPendingMigrationRequests()
{
faabric::util::UniqueLock lock(mockMutex);
return addPendingMigrationRequests;
}

std::vector<std::pair<std::string, faabric::UnregisterRequest>>
getUnregisterRequests()
{
Expand All @@ -76,6 +87,7 @@ void clearMockRequests()
functionCalls.clear();
batchMessages.clear();
resourceRequests.clear();
addPendingMigrationRequests.clear();
unregisterRequests.clear();

for (auto& p : queuedResourceResponses) {
Expand Down Expand Up @@ -128,6 +140,22 @@ faabric::HostResources FunctionCallClient::getResources()
return response;
}

void FunctionCallClient::sendAddPendingMigration(
std::shared_ptr<PendingMigrations> req)
{
faabric::PendingMigrations request;
faabric::EmptyResponse response;

if (faabric::util::isMockMode()) {
faabric::util::UniqueLock lock(mockMutex);
addPendingMigrationRequests.emplace_back(host, req);
} else {
syncSend(faabric::scheduler::FunctionCalls::AddPendingMigration,
req.get(),
&response);
}
}

void FunctionCallClient::executeFunctions(
const std::shared_ptr<faabric::BatchExecuteRequest> req)
{
Expand Down
16 changes: 16 additions & 0 deletions src/scheduler/FunctionCallServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ std::unique_ptr<google::protobuf::Message> FunctionCallServer::doSyncRecv(
case faabric::scheduler::FunctionCalls::GetResources: {
return recvGetResources(buffer, bufferSize);
}
case faabric::scheduler::FunctionCalls::AddPendingMigration: {
return recvAddPendingMigration(buffer, bufferSize);
}
default: {
throw std::runtime_error(
fmt::format("Unrecognized sync call header: {}", header));
Expand Down Expand Up @@ -100,4 +103,17 @@ std::unique_ptr<google::protobuf::Message> FunctionCallServer::recvGetResources(
scheduler.getThisHostResources());
return response;
}

std::unique_ptr<google::protobuf::Message>
FunctionCallServer::recvAddPendingMigration(const uint8_t* buffer,
size_t bufferSize)
{
PARSE_MSG(faabric::PendingMigrations, buffer, bufferSize);

auto msgPtr = std::make_shared<faabric::PendingMigrations>(msg);

scheduler.addPendingMigration(msgPtr);

return std::make_unique<faabric::EmptyResponse>();
}
}
9 changes: 2 additions & 7 deletions src/scheduler/FunctionMigrationThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,8 @@ void FunctionMigrationThread::start(int wakeUpPeriodSecondsIn)
if (returnVal == std::cv_status::timeout) {
SPDLOG_DEBUG(
"Migration thread checking for migration opportunities");
// If there are no more apps in-flight to be checked-for, the
// scheduler will return false, so we can shut down
bool shutdown = faabric::scheduler::getScheduler()
.checkForMigrationOpportunities();
if (shutdown) {
isShutdown.store(true, std::memory_order_release);
}
faabric::scheduler::getScheduler()
.checkForMigrationOpportunities();
}
};

Expand Down
Loading

0 comments on commit 13560b3

Please sign in to comment.