Skip to content

Commit

Permalink
tests: add test for function migration thread and fix bug in thread s…
Browse files Browse the repository at this point in the history
…hutdown
  • Loading branch information
csegarragonz committed Jan 8, 2022
1 parent ff175df commit 2130201
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 31 deletions.
2 changes: 1 addition & 1 deletion include/faabric/scheduler/Scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ class Scheduler
// ----------------------------------
// Function Migration
// ----------------------------------
void checkForMigrationOpportunities(
bool checkForMigrationOpportunities(
faabric::util::MigrationStrategy =
faabric::util::MigrationStrategy::BIN_PACK);

Expand Down
19 changes: 15 additions & 4 deletions src/scheduler/FunctionMigrationThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
namespace faabric::scheduler {
void FunctionMigrationThread::start(int wakeUpPeriodSecondsIn)
{
// Initialise wake up period and shutdown variable
wakeUpPeriodSeconds = wakeUpPeriodSecondsIn;
isShutdown.store(false, std::memory_order_release);

// Main work loop
workThread = std::make_unique<std::thread>([&] {
Expand All @@ -26,9 +28,16 @@ void FunctionMigrationThread::start(int wakeUpPeriodSecondsIn)
// If we hit the timeout it means we have not been notified to
// reset or stop. Thus we check for migration oportunities.
if (returnVal == std::cv_status::timeout) {
SPDLOG_DEBUG("Checking for migration oportunities");
faabric::scheduler::getScheduler()
.checkForMigrationOpportunities();
SPDLOG_DEBUG(
"Migration thread checking for migration oportunities");
// If there are no more apps in-flight to be checked-for, the
// scheduler will return false, so we can shut down
bool shutdown = faabric::scheduler::getScheduler()
.checkForMigrationOpportunities();
if (shutdown) {
SPDLOG_INFO("Shutting down ourselves...");
isShutdown.store(true, std::memory_order_release);
}
}
};

Expand All @@ -42,7 +51,7 @@ void FunctionMigrationThread::stop()
return;
}

{
if (!isShutdown.load(std::memory_order_acquire)) {
faabric::util::UniqueLock lock(mx);

// We set the flag _before_ we notify and after we acquire the lock.
Expand All @@ -55,5 +64,7 @@ void FunctionMigrationThread::stop()
if (workThread->joinable()) {
workThread->join();
}

workThread = nullptr;
}
}
24 changes: 14 additions & 10 deletions src/scheduler/Scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,7 @@ faabric::util::SchedulingDecision Scheduler::doCallFunctions(
std::make_shared<faabric::util::SchedulingDecision>(decision);
inFlightRequests[decision.appId] = std::make_pair(req, decisionPtr);
if (inFlightRequests.size() == 1) {
SPDLOG_INFO("starting migration thread");
functionMigrationThread.start(firstMsg.migrationcheckperiod());
} else if (firstMsg.migrationcheckperiod() !=
functionMigrationThread.wakeUpPeriodSeconds) {
Expand Down Expand Up @@ -916,11 +917,6 @@ void Scheduler::setFunctionResult(faabric::Message& msg)

inFlightRequests.erase(msg.appid());
pendingMigrations.erase(msg.appid());
// If there are no more apps to track, stop the thread checking for
// migration opportunities
if (inFlightRequests.size() == 0) {
functionMigrationThread.stop();
}
}

// Write the successful result to the result queue
Expand Down Expand Up @@ -1167,18 +1163,25 @@ ExecGraphNode Scheduler::getFunctionExecGraphNode(unsigned int messageId)
return node;
}

void Scheduler::checkForMigrationOpportunities(
bool Scheduler::checkForMigrationOpportunities(
faabric::util::MigrationStrategy migrationStrategy)
{
// Vector to cache all migrations we have to do, and update the shared map
// at the very end just once. This is because we need a unique lock to write
// to the shared map, but the rest of this method can do with a shared lock.
std::vector<std::shared_ptr<faabric::PendingMigrations>>
tmpPendingMigrations;
SPDLOG_INFO("Checking for migration opportunities");

{
faabric::util::SharedLock lock(mx);

// If no in-flight requests, stop the background thread
if (inFlightRequests.size() == 0) {
SPDLOG_INFO("No requests to check");
return true;
}

// For each in-flight request that has opted in to be migrated,
// check if there is an opportunity to migrate
for (const auto& app : inFlightRequests) {
Expand All @@ -1188,6 +1191,9 @@ void Scheduler::checkForMigrationOpportunities(
// If we have already recorded a pending migration for this req,
// skip
if (canAppBeMigrated(originalDecision.appId) != nullptr) {
SPDLOG_INFO("Skipping app {} as migration opportunity has "
"already been recorded",
originalDecision.appId);
continue;
}

Expand Down Expand Up @@ -1216,9 +1222,6 @@ void Scheduler::checkForMigrationOpportunities(
};
auto claimSlot = [&r]() {
int currentUsedSlots = r.usedslots();
SPDLOG_INFO("Old slots: {} - New slots: {}",
currentUsedSlots,
currentUsedSlots + 1);
r.set_usedslots(currentUsedSlots + 1);
};
while (left < right) {
Expand Down Expand Up @@ -1282,10 +1285,11 @@ void Scheduler::checkForMigrationOpportunities(
if (tmpPendingMigrations.size() > 0) {
faabric::util::FullLock lock(mx);
for (auto msgPtr : tmpPendingMigrations) {
SPDLOG_INFO("Adding app: {}", msgPtr->appid());
pendingMigrations[msgPtr->appid()] = std::move(msgPtr);
}
}

return false;
}

std::shared_ptr<faabric::PendingMigrations> Scheduler::canAppBeMigrated(
Expand Down
110 changes: 94 additions & 16 deletions tests/test/scheduler/test_function_migration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ class FunctionMigrationTestFixture : public SchedulerTestFixture

~FunctionMigrationTestFixture()
{
sch.clearRecordedMessages();
faabric::util::setMockMode(false);

// Remove all hosts from global set
Expand Down Expand Up @@ -89,7 +88,7 @@ TEST_CASE_METHOD(FunctionMigrationTestFixture,

TEST_CASE_METHOD(
FunctionMigrationTestFixture,
"Test function migration thread only works if set in the message",
"Test migration oportunities are only detected if set in the message",
"[scheduler]")
{
// First set resources before calling the functions: one will be allocated
Expand All @@ -105,6 +104,7 @@ TEST_CASE_METHOD(
req->mutable_messages()->at(0).set_inputdata(std::to_string(timeToSleep));
uint32_t appId = req->messages().at(0).appid();

// Build expected pending migrations
std::shared_ptr<faabric::PendingMigrations> expectedMigrations;
SECTION("Migration not enabled") { expectedMigrations = nullptr; }

Expand Down Expand Up @@ -157,10 +157,9 @@ TEST_CASE_METHOD(
REQUIRE(sch.canAppBeMigrated(appId) == nullptr);
}

TEST_CASE_METHOD(
FunctionMigrationTestFixture,
"Test function migration thread detects migration opportunities",
"[scheduler]")
TEST_CASE_METHOD(FunctionMigrationTestFixture,
"Test checking for migration opportunities",
"[scheduler]")
{
std::vector<std::string> hosts = { masterHost, "hostA" };
std::vector<int> slots = { 1, 1 };
Expand All @@ -170,9 +169,12 @@ TEST_CASE_METHOD(
auto req = faabric::util::batchExecFactory("foo", "sleep", 2);
int timeToSleep = SHORT_TEST_TIMEOUT_MS;
req->mutable_messages()->at(0).set_inputdata(std::to_string(timeToSleep));
req->mutable_messages()->at(0).set_migrationcheckperiod(2);
uint32_t appId = req->messages().at(0).appid();

// By setting the check period to a non-zero value, we are effectively
// opting in to be considered for migration
req->mutable_messages()->at(0).set_migrationcheckperiod(2);

auto decision = sch.callFunctions(req);

std::shared_ptr<faabric::PendingMigrations> expectedMigrations;
Expand Down Expand Up @@ -226,11 +228,11 @@ TEST_CASE_METHOD(

TEST_CASE_METHOD(
FunctionMigrationTestFixture,
"Test detecting migration opportunities with several hosts and requests",
"Test detecting migration opportunities for several messages and hosts",
"[scheduler]")
{
// First set resources before calling the functions: one will be allocated
// locally, another one in the remote host
// First set resources before calling the functions: one request will be
// allocated to each host
std::vector<std::string> hosts = { masterHost, "hostA", "hostB", "hostC" };
std::vector<int> slots = { 1, 1, 1, 1 };
std::vector<int> usedSlots = { 0, 0, 0, 0 };
Expand All @@ -239,9 +241,11 @@ TEST_CASE_METHOD(
auto req = faabric::util::batchExecFactory("foo", "sleep", 4);
int timeToSleep = SHORT_TEST_TIMEOUT_MS;
req->mutable_messages()->at(0).set_inputdata(std::to_string(timeToSleep));
req->mutable_messages()->at(0).set_migrationcheckperiod(2);
uint32_t appId = req->messages().at(0).appid();

// Opt in to be considered for migration
req->mutable_messages()->at(0).set_migrationcheckperiod(2);

auto decision = sch.callFunctions(req);

// Set up expectations
Expand All @@ -256,17 +260,17 @@ TEST_CASE_METHOD(
std::vector<int> newUsedSlots = { 1, 1, 1, 1 };
setHostResources(hosts, newSlots, newUsedSlots);

// Build expected result
// Build expected result: two migrations
faabric::PendingMigrations expected;
expected.set_appid(appId);
// Migrate last message (scheduled to last host) to first host. This
// fills up the first host.
// Migration 1: migrate last message (originally scheduled to last host)
// to first host. This fills up the first host.
auto* migration1 = expected.add_migrations();
migration1->set_messageid(req->messages().at(3).id());
migration1->set_srchost(hosts.at(3));
migration1->set_dsthost(hosts.at(0));
// Migrate penultimate message (scheduled to penultimate host) to second
// host. This fills up the second host.
// Migration 2: migrate penultimate message (originally scheduled to
// penultimate host) to second host. This fills up the second host.
auto* migration2 = expected.add_migrations();
migration2->set_messageid(req->messages().at(2).id());
migration2->set_srchost(hosts.at(2));
Expand Down Expand Up @@ -301,4 +305,78 @@ TEST_CASE_METHOD(
sch.checkForMigrationOpportunities();
REQUIRE(sch.canAppBeMigrated(appId) == nullptr);
}

TEST_CASE_METHOD(
FunctionMigrationTestFixture,
"Test function migration thread detects migration opportunities",
"[scheduler]")
{
std::vector<std::string> hosts = { masterHost, "hostA" };
std::vector<int> slots = { 1, 1 };
std::vector<int> usedSlots = { 0, 0 };
setHostResources(hosts, slots, usedSlots);

auto req = faabric::util::batchExecFactory("foo", "sleep", 2);
int checkPeriodSecs = 1;
int timeToSleep = 4 * checkPeriodSecs * 1000;
req->mutable_messages()->at(0).set_inputdata(std::to_string(timeToSleep));
uint32_t appId = req->messages().at(0).appid();

// Opt in to be migrated
req->mutable_messages()->at(0).set_migrationcheckperiod(checkPeriodSecs);

auto decision = sch.callFunctions(req);

std::shared_ptr<faabric::PendingMigrations> expectedMigrations;

SECTION("Can not migrate") { expectedMigrations = nullptr; }

// As we don't update the available resources, no migration opportunities
// will appear, even though we are checking for them
SECTION("Can migrate")
{
// Update host resources so that a migration opportunity appears
updateLocalResources(2, 1);

// Build expected result
faabric::PendingMigrations expected;
expected.set_appid(appId);
auto* migration = expected.add_migrations();
migration->set_messageid(req->messages().at(1).id());
migration->set_srchost(hosts.at(1));
migration->set_dsthost(hosts.at(0));
expectedMigrations =
std::make_shared<faabric::PendingMigrations>(expected);
}

// Instead of directly calling the scheduler function to check for migration
// opportunites, sleep for enough time (twice the check period) so that a
// migration is detected by the background thread.
SLEEP_MS(2 * checkPeriodSecs * 1000);

auto actualMigrations = sch.canAppBeMigrated(appId);
if (expectedMigrations == nullptr) {
REQUIRE(actualMigrations == expectedMigrations);
} else {
REQUIRE(actualMigrations->appid() == expectedMigrations->appid());
REQUIRE(actualMigrations->migrations_size() ==
expectedMigrations->migrations_size());
for (int i = 0; i < actualMigrations->migrations_size(); i++) {
auto actual = actualMigrations->mutable_migrations()->at(i);
auto expected = expectedMigrations->mutable_migrations()->at(i);
REQUIRE(actual.messageid() == expected.messageid());
REQUIRE(actual.srchost() == expected.srchost());
REQUIRE(actual.dsthost() == expected.dsthost());
}
}

faabric::Message res =
sch.getFunctionResult(req->messages().at(0).id(), 2 * timeToSleep);
REQUIRE(res.returnvalue() == 0);

// Check that after the result is set, the app can't be migrated no more
sch.checkForMigrationOpportunities();
REQUIRE(sch.canAppBeMigrated(appId) == nullptr);
}

}

0 comments on commit 2130201

Please sign in to comment.