From 2130201aa8466a1f7b010d9b70e3e316f0c4bbe1 Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Sat, 8 Jan 2022 19:15:08 +0000 Subject: [PATCH] tests: add test for function migration thread and fix bug in thread shutdown --- include/faabric/scheduler/Scheduler.h | 2 +- src/scheduler/FunctionMigrationThread.cpp | 19 ++- src/scheduler/Scheduler.cpp | 24 ++-- .../scheduler/test_function_migration.cpp | 110 +++++++++++++++--- 4 files changed, 124 insertions(+), 31 deletions(-) diff --git a/include/faabric/scheduler/Scheduler.h b/include/faabric/scheduler/Scheduler.h index 2d78f15dd..200b805f6 100644 --- a/include/faabric/scheduler/Scheduler.h +++ b/include/faabric/scheduler/Scheduler.h @@ -228,7 +228,7 @@ class Scheduler // ---------------------------------- // Function Migration // ---------------------------------- - void checkForMigrationOpportunities( + bool checkForMigrationOpportunities( faabric::util::MigrationStrategy = faabric::util::MigrationStrategy::BIN_PACK); diff --git a/src/scheduler/FunctionMigrationThread.cpp b/src/scheduler/FunctionMigrationThread.cpp index 805f7f8a8..1fe16b472 100644 --- a/src/scheduler/FunctionMigrationThread.cpp +++ b/src/scheduler/FunctionMigrationThread.cpp @@ -6,7 +6,9 @@ namespace faabric::scheduler { void FunctionMigrationThread::start(int wakeUpPeriodSecondsIn) { + // Initialise wake up period and shutdown variable wakeUpPeriodSeconds = wakeUpPeriodSecondsIn; + isShutdown.store(false, std::memory_order_release); // Main work loop workThread = std::make_unique([&] { @@ -26,9 +28,16 @@ void FunctionMigrationThread::start(int wakeUpPeriodSecondsIn) // If we hit the timeout it means we have not been notified to // reset or stop. Thus we check for migration oportunities. if (returnVal == std::cv_status::timeout) { - SPDLOG_DEBUG("Checking for migration oportunities"); - faabric::scheduler::getScheduler() - .checkForMigrationOpportunities(); + SPDLOG_DEBUG( + "Migration thread checking for migration oportunities"); + // If there are no more apps in-flight to be checked-for, the + // scheduler will return false, so we can shut down + bool shutdown = faabric::scheduler::getScheduler() + .checkForMigrationOpportunities(); + if (shutdown) { + SPDLOG_INFO("Shutting down ourselves..."); + isShutdown.store(true, std::memory_order_release); + } } }; @@ -42,7 +51,7 @@ void FunctionMigrationThread::stop() return; } - { + if (!isShutdown.load(std::memory_order_acquire)) { faabric::util::UniqueLock lock(mx); // We set the flag _before_ we notify and after we acquire the lock. @@ -55,5 +64,7 @@ void FunctionMigrationThread::stop() if (workThread->joinable()) { workThread->join(); } + + workThread = nullptr; } } diff --git a/src/scheduler/Scheduler.cpp b/src/scheduler/Scheduler.cpp index 663fc2ac0..30512db01 100644 --- a/src/scheduler/Scheduler.cpp +++ b/src/scheduler/Scheduler.cpp @@ -463,6 +463,7 @@ faabric::util::SchedulingDecision Scheduler::doCallFunctions( std::make_shared(decision); inFlightRequests[decision.appId] = std::make_pair(req, decisionPtr); if (inFlightRequests.size() == 1) { + SPDLOG_INFO("starting migration thread"); functionMigrationThread.start(firstMsg.migrationcheckperiod()); } else if (firstMsg.migrationcheckperiod() != functionMigrationThread.wakeUpPeriodSeconds) { @@ -916,11 +917,6 @@ void Scheduler::setFunctionResult(faabric::Message& msg) inFlightRequests.erase(msg.appid()); pendingMigrations.erase(msg.appid()); - // If there are no more apps to track, stop the thread checking for - // migration opportunities - if (inFlightRequests.size() == 0) { - functionMigrationThread.stop(); - } } // Write the successful result to the result queue @@ -1167,7 +1163,7 @@ ExecGraphNode Scheduler::getFunctionExecGraphNode(unsigned int messageId) return node; } -void Scheduler::checkForMigrationOpportunities( +bool Scheduler::checkForMigrationOpportunities( faabric::util::MigrationStrategy migrationStrategy) { // Vector to cache all migrations we have to do, and update the shared map @@ -1175,10 +1171,17 @@ void Scheduler::checkForMigrationOpportunities( // to the shared map, but the rest of this method can do with a shared lock. std::vector> tmpPendingMigrations; + SPDLOG_INFO("Checking for migration opportunities"); { faabric::util::SharedLock lock(mx); + // If no in-flight requests, stop the background thread + if (inFlightRequests.size() == 0) { + SPDLOG_INFO("No requests to check"); + return true; + } + // For each in-flight request that has opted in to be migrated, // check if there is an opportunity to migrate for (const auto& app : inFlightRequests) { @@ -1188,6 +1191,9 @@ void Scheduler::checkForMigrationOpportunities( // If we have already recorded a pending migration for this req, // skip if (canAppBeMigrated(originalDecision.appId) != nullptr) { + SPDLOG_INFO("Skipping app {} as migration opportunity has " + "already been recorded", + originalDecision.appId); continue; } @@ -1216,9 +1222,6 @@ void Scheduler::checkForMigrationOpportunities( }; auto claimSlot = [&r]() { int currentUsedSlots = r.usedslots(); - SPDLOG_INFO("Old slots: {} - New slots: {}", - currentUsedSlots, - currentUsedSlots + 1); r.set_usedslots(currentUsedSlots + 1); }; while (left < right) { @@ -1282,10 +1285,11 @@ void Scheduler::checkForMigrationOpportunities( if (tmpPendingMigrations.size() > 0) { faabric::util::FullLock lock(mx); for (auto msgPtr : tmpPendingMigrations) { - SPDLOG_INFO("Adding app: {}", msgPtr->appid()); pendingMigrations[msgPtr->appid()] = std::move(msgPtr); } } + + return false; } std::shared_ptr Scheduler::canAppBeMigrated( diff --git a/tests/test/scheduler/test_function_migration.cpp b/tests/test/scheduler/test_function_migration.cpp index 7d2090a08..022b9cb49 100644 --- a/tests/test/scheduler/test_function_migration.cpp +++ b/tests/test/scheduler/test_function_migration.cpp @@ -25,7 +25,6 @@ class FunctionMigrationTestFixture : public SchedulerTestFixture ~FunctionMigrationTestFixture() { - sch.clearRecordedMessages(); faabric::util::setMockMode(false); // Remove all hosts from global set @@ -89,7 +88,7 @@ TEST_CASE_METHOD(FunctionMigrationTestFixture, TEST_CASE_METHOD( FunctionMigrationTestFixture, - "Test function migration thread only works if set in the message", + "Test migration oportunities are only detected if set in the message", "[scheduler]") { // First set resources before calling the functions: one will be allocated @@ -105,6 +104,7 @@ TEST_CASE_METHOD( req->mutable_messages()->at(0).set_inputdata(std::to_string(timeToSleep)); uint32_t appId = req->messages().at(0).appid(); + // Build expected pending migrations std::shared_ptr expectedMigrations; SECTION("Migration not enabled") { expectedMigrations = nullptr; } @@ -157,10 +157,9 @@ TEST_CASE_METHOD( REQUIRE(sch.canAppBeMigrated(appId) == nullptr); } -TEST_CASE_METHOD( - FunctionMigrationTestFixture, - "Test function migration thread detects migration opportunities", - "[scheduler]") +TEST_CASE_METHOD(FunctionMigrationTestFixture, + "Test checking for migration opportunities", + "[scheduler]") { std::vector hosts = { masterHost, "hostA" }; std::vector slots = { 1, 1 }; @@ -170,9 +169,12 @@ TEST_CASE_METHOD( auto req = faabric::util::batchExecFactory("foo", "sleep", 2); int timeToSleep = SHORT_TEST_TIMEOUT_MS; req->mutable_messages()->at(0).set_inputdata(std::to_string(timeToSleep)); - req->mutable_messages()->at(0).set_migrationcheckperiod(2); uint32_t appId = req->messages().at(0).appid(); + // By setting the check period to a non-zero value, we are effectively + // opting in to be considered for migration + req->mutable_messages()->at(0).set_migrationcheckperiod(2); + auto decision = sch.callFunctions(req); std::shared_ptr expectedMigrations; @@ -226,11 +228,11 @@ TEST_CASE_METHOD( TEST_CASE_METHOD( FunctionMigrationTestFixture, - "Test detecting migration opportunities with several hosts and requests", + "Test detecting migration opportunities for several messages and hosts", "[scheduler]") { - // First set resources before calling the functions: one will be allocated - // locally, another one in the remote host + // First set resources before calling the functions: one request will be + // allocated to each host std::vector hosts = { masterHost, "hostA", "hostB", "hostC" }; std::vector slots = { 1, 1, 1, 1 }; std::vector usedSlots = { 0, 0, 0, 0 }; @@ -239,9 +241,11 @@ TEST_CASE_METHOD( auto req = faabric::util::batchExecFactory("foo", "sleep", 4); int timeToSleep = SHORT_TEST_TIMEOUT_MS; req->mutable_messages()->at(0).set_inputdata(std::to_string(timeToSleep)); - req->mutable_messages()->at(0).set_migrationcheckperiod(2); uint32_t appId = req->messages().at(0).appid(); + // Opt in to be considered for migration + req->mutable_messages()->at(0).set_migrationcheckperiod(2); + auto decision = sch.callFunctions(req); // Set up expectations @@ -256,17 +260,17 @@ TEST_CASE_METHOD( std::vector newUsedSlots = { 1, 1, 1, 1 }; setHostResources(hosts, newSlots, newUsedSlots); - // Build expected result + // Build expected result: two migrations faabric::PendingMigrations expected; expected.set_appid(appId); - // Migrate last message (scheduled to last host) to first host. This - // fills up the first host. + // Migration 1: migrate last message (originally scheduled to last host) + // to first host. This fills up the first host. auto* migration1 = expected.add_migrations(); migration1->set_messageid(req->messages().at(3).id()); migration1->set_srchost(hosts.at(3)); migration1->set_dsthost(hosts.at(0)); - // Migrate penultimate message (scheduled to penultimate host) to second - // host. This fills up the second host. + // Migration 2: migrate penultimate message (originally scheduled to + // penultimate host) to second host. This fills up the second host. auto* migration2 = expected.add_migrations(); migration2->set_messageid(req->messages().at(2).id()); migration2->set_srchost(hosts.at(2)); @@ -301,4 +305,78 @@ TEST_CASE_METHOD( sch.checkForMigrationOpportunities(); REQUIRE(sch.canAppBeMigrated(appId) == nullptr); } + +TEST_CASE_METHOD( + FunctionMigrationTestFixture, + "Test function migration thread detects migration opportunities", + "[scheduler]") +{ + std::vector hosts = { masterHost, "hostA" }; + std::vector slots = { 1, 1 }; + std::vector usedSlots = { 0, 0 }; + setHostResources(hosts, slots, usedSlots); + + auto req = faabric::util::batchExecFactory("foo", "sleep", 2); + int checkPeriodSecs = 1; + int timeToSleep = 4 * checkPeriodSecs * 1000; + req->mutable_messages()->at(0).set_inputdata(std::to_string(timeToSleep)); + uint32_t appId = req->messages().at(0).appid(); + + // Opt in to be migrated + req->mutable_messages()->at(0).set_migrationcheckperiod(checkPeriodSecs); + + auto decision = sch.callFunctions(req); + + std::shared_ptr expectedMigrations; + + SECTION("Can not migrate") { expectedMigrations = nullptr; } + + // As we don't update the available resources, no migration opportunities + // will appear, even though we are checking for them + SECTION("Can migrate") + { + // Update host resources so that a migration opportunity appears + updateLocalResources(2, 1); + + // Build expected result + faabric::PendingMigrations expected; + expected.set_appid(appId); + auto* migration = expected.add_migrations(); + migration->set_messageid(req->messages().at(1).id()); + migration->set_srchost(hosts.at(1)); + migration->set_dsthost(hosts.at(0)); + expectedMigrations = + std::make_shared(expected); + } + + // Instead of directly calling the scheduler function to check for migration + // opportunites, sleep for enough time (twice the check period) so that a + // migration is detected by the background thread. + SLEEP_MS(2 * checkPeriodSecs * 1000); + + auto actualMigrations = sch.canAppBeMigrated(appId); + if (expectedMigrations == nullptr) { + REQUIRE(actualMigrations == expectedMigrations); + } else { + REQUIRE(actualMigrations->appid() == expectedMigrations->appid()); + REQUIRE(actualMigrations->migrations_size() == + expectedMigrations->migrations_size()); + for (int i = 0; i < actualMigrations->migrations_size(); i++) { + auto actual = actualMigrations->mutable_migrations()->at(i); + auto expected = expectedMigrations->mutable_migrations()->at(i); + REQUIRE(actual.messageid() == expected.messageid()); + REQUIRE(actual.srchost() == expected.srchost()); + REQUIRE(actual.dsthost() == expected.dsthost()); + } + } + + faabric::Message res = + sch.getFunctionResult(req->messages().at(0).id(), 2 * timeToSleep); + REQUIRE(res.returnvalue() == 0); + + // Check that after the result is set, the app can't be migrated no more + sch.checkForMigrationOpportunities(); + REQUIRE(sch.canAppBeMigrated(appId) == nullptr); +} + }