diff --git a/include/faabric/scheduler/Scheduler.h b/include/faabric/scheduler/Scheduler.h
index 48da69b86..f1355f02b 100644
--- a/include/faabric/scheduler/Scheduler.h
+++ b/include/faabric/scheduler/Scheduler.h
@@ -102,7 +102,8 @@ class Scheduler
 
     faabric::util::SchedulingDecision callFunctions(
       std::shared_ptr<faabric::BatchExecuteRequest> req,
-      bool forceLocal = false);
+      faabric::util::SchedulingTopologyHint =
+        faabric::util::SchedulingTopologyHint::NORMAL);
 
     faabric::util::SchedulingDecision callFunctions(
       std::shared_ptr<faabric::BatchExecuteRequest> req,
@@ -233,7 +234,7 @@ class Scheduler
 
     faabric::util::SchedulingDecision makeSchedulingDecision(
       std::shared_ptr<faabric::BatchExecuteRequest> req,
-      bool forceLocal);
+      faabric::util::SchedulingTopologyHint topologyHint);
 
     faabric::util::SchedulingDecision doCallFunctions(
       std::shared_ptr<faabric::BatchExecuteRequest> req,
diff --git a/include/faabric/util/scheduling.h b/include/faabric/util/scheduling.h
index 4c8d8a5ce..0f2a6a64d 100644
--- a/include/faabric/util/scheduling.h
+++ b/include/faabric/util/scheduling.h
@@ -41,4 +41,17 @@ class SchedulingDecision
                     int32_t appIdx,
                     int32_t groupIdx);
 };
+
+// Scheduling topology hints help the scheduler decide which hosts to assign
+// new requests in a batch to.
+// - NORMAL: bin-packs requests to slots in hosts starting from the master
+// host, and overloads the master if it runs out of resources.
+// - NEVER_ALONE: never allocates a single (non-master) request to a host
+// without other requests of the batch.
+enum SchedulingTopologyHint
+{
+    NORMAL,
+    FORCE_LOCAL,
+    NEVER_ALONE
+};
 }
diff --git a/src/scheduler/FunctionCallServer.cpp b/src/scheduler/FunctionCallServer.cpp
index e8b6b35e0..2e0604baf 100644
--- a/src/scheduler/FunctionCallServer.cpp
+++ b/src/scheduler/FunctionCallServer.cpp
@@ -77,7 +77,7 @@ void FunctionCallServer::recvExecuteFunctions(const uint8_t* buffer,
     // This host has now been told to execute these functions no matter what
     // TODO - avoid this copy
     scheduler.callFunctions(std::make_shared<faabric::BatchExecuteRequest>(msg),
-                            true);
+                            faabric::util::SchedulingTopologyHint::FORCE_LOCAL);
 }
 
 void FunctionCallServer::recvUnregister(const uint8_t* buffer,
diff --git a/src/scheduler/Scheduler.cpp b/src/scheduler/Scheduler.cpp
index 946700919..97f50d556 100644
--- a/src/scheduler/Scheduler.cpp
+++ b/src/scheduler/Scheduler.cpp
@@ -210,7 +210,7 @@ void Scheduler::notifyExecutorShutdown(Executor* exec,
 
 faabric::util::SchedulingDecision Scheduler::callFunctions(
   std::shared_ptr<faabric::BatchExecuteRequest> req,
-  bool forceLocal)
+  faabric::util::SchedulingTopologyHint topologyHint)
 {
     // Note, we assume all the messages are for the same function and have the
     // same master host
@@ -224,7 +224,8 @@ faabric::util::SchedulingDecision Scheduler::callFunctions(
 
     // If we're not the master host, we need to forward the request back to the
     // master host. This will only happen if a nested batch execution happens.
-    if (!forceLocal && masterHost != thisHost) {
+    if (topologyHint != faabric::util::SchedulingTopologyHint::FORCE_LOCAL &&
+        masterHost != thisHost) {
         std::string funcStr = faabric::util::funcToString(firstMsg, false);
         SPDLOG_DEBUG("Forwarding {} back to master {}", funcStr, masterHost);
 
@@ -236,12 +237,13 @@ faabric::util::SchedulingDecision Scheduler::callFunctions(
 
     faabric::util::FullLock lock(mx);
 
-    SchedulingDecision decision = makeSchedulingDecision(req, forceLocal);
+    SchedulingDecision decision = makeSchedulingDecision(req, topologyHint);
 
     // Send out point-to-point mappings if necessary (unless being forced to
     // execute locally, in which case they will be transmitted from the
     // master)
-    if (!forceLocal && (firstMsg.groupid() > 0)) {
+    if (topologyHint != faabric::util::SchedulingTopologyHint::FORCE_LOCAL &&
+        (firstMsg.groupid() > 0)) {
         broker.setAndSendMappingsFromSchedulingDecision(decision);
     }
 
@@ -251,14 +253,14 @@ faabric::util::SchedulingDecision Scheduler::callFunctions(
 
 faabric::util::SchedulingDecision Scheduler::makeSchedulingDecision(
   std::shared_ptr<faabric::BatchExecuteRequest> req,
-  bool forceLocal)
+  faabric::util::SchedulingTopologyHint topologyHint)
 {
     int nMessages = req->messages_size();
     faabric::Message& firstMsg = req->mutable_messages()->at(0);
     std::string funcStr = faabric::util::funcToString(firstMsg, false);
 
     std::vector<std::string> hosts;
-    if (forceLocal) {
+    if (topologyHint == faabric::util::SchedulingTopologyHint::FORCE_LOCAL) {
         // We're forced to execute locally here so we do all the messages
         for (int i = 0; i < nMessages; i++) {
             hosts.push_back(thisHost);
@@ -296,6 +298,14 @@ faabric::util::SchedulingDecision Scheduler::makeSchedulingDecision(
             int available = r.slots() - r.usedslots();
             int nOnThisHost = std::min(available, remainder);
 
+            // Under the NEVER_ALONE topology hint, we never choose a host
+            // unless we can schedule at least two requests in it.
+            if (topologyHint ==
+                  faabric::util::SchedulingTopologyHint::NEVER_ALONE &&
+                nOnThisHost < 2) {
+                continue;
+            }
+
             for (int i = 0; i < nOnThisHost; i++) {
                 hosts.push_back(h);
             }
@@ -323,6 +333,12 @@ faabric::util::SchedulingDecision Scheduler::makeSchedulingDecision(
             int available = r.slots() - r.usedslots();
             int nOnThisHost = std::min(available, remainder);
 
+            if (topologyHint ==
+                  faabric::util::SchedulingTopologyHint::NEVER_ALONE &&
+                nOnThisHost < 2) {
+                continue;
+            }
+
             // Register the host if it's executed a function
             if (nOnThisHost > 0) {
                 registeredHosts[funcStr].insert(h);
@@ -342,11 +358,26 @@ faabric::util::SchedulingDecision Scheduler::makeSchedulingDecision(
         // At this point there's no more capacity in the system, so we
         // just need to overload locally
         if (remainder > 0) {
-            SPDLOG_DEBUG(
-              "Overloading {}/{} {} locally", remainder, nMessages, funcStr);
+            std::string overloadedHost = thisHost;
+
+            // Under the NEVER_ALONE scheduling topology hint we want to
+            // overload the last host we assigned requests to.
+            if (topologyHint ==
+                  faabric::util::SchedulingTopologyHint::NEVER_ALONE &&
+                !hosts.empty()) {
+                overloadedHost = hosts.back();
+            }
+
+            SPDLOG_DEBUG("Overloading {}/{} {} {}",
+                         remainder,
+                         nMessages,
+                         funcStr,
+                         overloadedHost == thisHost
+                           ?
"locally" + : "to host " + overloadedHost); for (int i = 0; i < remainder; i++) { - hosts.push_back(thisHost); + hosts.push_back(overloadedHost); } } } @@ -636,7 +667,11 @@ void Scheduler::callFunction(faabric::Message& msg, bool forceLocal) req->set_type(req->FUNCTIONS); // Make the call - callFunctions(req, forceLocal); + if (forceLocal) { + callFunctions(req, faabric::util::SchedulingTopologyHint::FORCE_LOCAL); + } else { + callFunctions(req); + } } void Scheduler::clearRecordedMessages() diff --git a/tests/test/scheduler/test_executor.cpp b/tests/test/scheduler/test_executor.cpp index 573a08522..5ef4f1ef5 100644 --- a/tests/test/scheduler/test_executor.cpp +++ b/tests/test/scheduler/test_executor.cpp @@ -29,238 +29,224 @@ namespace tests { std::atomic restoreCount = 0; std::atomic resetCount = 0; -class TestExecutor final : public Executor +TestExecutor::TestExecutor(faabric::Message& msg) + : Executor(msg) +{} + +TestExecutor::~TestExecutor() = default; + +void TestExecutor::postFinish() { - public: - TestExecutor(faabric::Message& msg) - : Executor(msg) - {} + if (dummyMemory != nullptr) { + munmap(dummyMemory, dummyMemorySize); + } +} - ~TestExecutor() {} +void TestExecutor::reset(faabric::Message& msg) +{ + SPDLOG_DEBUG("Resetting TestExecutor"); + resetCount += 1; +} - uint8_t* dummyMemory = nullptr; - size_t dummyMemorySize = 0; +void TestExecutor::restore(faabric::Message& msg) +{ + SPDLOG_DEBUG("Restoring TestExecutor"); + restoreCount += 1; - void postFinish() override - { - if (dummyMemory != nullptr) { - munmap(dummyMemory, dummyMemorySize); - } - } + // Initialise the dummy memory and map to snapshot + faabric::snapshot::SnapshotRegistry& reg = + faabric::snapshot::getSnapshotRegistry(); + faabric::util::SnapshotData& snap = reg.getSnapshot(msg.snapshotkey()); - void reset(faabric::Message& msg) override - { - SPDLOG_DEBUG("Resetting TestExecutor"); - resetCount += 1; - } + // Note this has to be mmapped to be page-aligned + dummyMemorySize = snap.size; + dummyMemory = (uint8_t*)mmap( + nullptr, snap.size, PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); - void restore(faabric::Message& msg) override - { - SPDLOG_DEBUG("Restoring TestExecutor"); - restoreCount += 1; + reg.mapSnapshot(msg.snapshotkey(), dummyMemory); +} - // Initialise the dummy memory and map to snapshot - faabric::snapshot::SnapshotRegistry& reg = - faabric::snapshot::getSnapshotRegistry(); - faabric::util::SnapshotData& snap = reg.getSnapshot(msg.snapshotkey()); +faabric::util::SnapshotData TestExecutor::snapshot() +{ + faabric::util::SnapshotData snap; + snap.data = dummyMemory; + snap.size = dummyMemorySize; + return snap; +} - // Note this has to be mmapped to be page-aligned - dummyMemorySize = snap.size; - dummyMemory = (uint8_t*)mmap( - nullptr, snap.size, PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); +int32_t TestExecutor::executeTask( + int threadPoolIdx, + int msgIdx, + std::shared_ptr reqOrig) +{ - reg.mapSnapshot(msg.snapshotkey(), dummyMemory); - } + faabric::Message& msg = reqOrig->mutable_messages()->at(msgIdx); - faabric::util::SnapshotData snapshot() override - { - faabric::util::SnapshotData snap; - snap.data = dummyMemory; - snap.size = dummyMemorySize; - return snap; - } + std::string funcStr = faabric::util::funcToString(msg, true); - int32_t executeTask( - int threadPoolIdx, - int msgIdx, - std::shared_ptr reqOrig) override - { + bool isThread = reqOrig->type() == faabric::BatchExecuteRequest::THREADS; - faabric::Message& msg = reqOrig->mutable_messages()->at(msgIdx); + // Check 
we're being asked to execute the function we've bound to + assert(msg.user() == boundMessage.user()); + assert(msg.function() == boundMessage.function()); - std::string funcStr = faabric::util::funcToString(msg, true); + // Custom thread-check function + if (msg.function() == "thread-check" && !isThread) { + msg.set_outputdata( + fmt::format("Threaded function {} executed successfully", msg.id())); - bool isThread = - reqOrig->type() == faabric::BatchExecuteRequest::THREADS; + // Set up the request + int nThreads = 5; + if (!msg.inputdata().empty()) { + nThreads = std::stoi(msg.inputdata()); + } - // Check we're being asked to execute the function we've bound to - assert(msg.user() == boundMessage.user()); - assert(msg.function() == boundMessage.function()); + SPDLOG_DEBUG("TestExecutor spawning {} threads", nThreads); - // Custom thread-check function - if (msg.function() == "thread-check" && !isThread) { - msg.set_outputdata(fmt::format( - "Threaded function {} executed successfully", msg.id())); + std::shared_ptr chainedReq = + faabric::util::batchExecFactory("dummy", "thread-check", nThreads); + chainedReq->set_type(faabric::BatchExecuteRequest::THREADS); - // Set up the request - int nThreads = 5; - if (!msg.inputdata().empty()) { - nThreads = std::stoi(msg.inputdata()); - } + // Create a dummy snapshot + std::string snapKey = funcStr + "-snap"; + faabric::snapshot::SnapshotRegistry& reg = + faabric::snapshot::getSnapshotRegistry(); + faabric::util::SnapshotData snap; + snap.data = new uint8_t[10]; + snap.size = 10; - SPDLOG_DEBUG("TestExecutor spawning {} threads", nThreads); + reg.takeSnapshot(snapKey, snap); - std::shared_ptr chainedReq = - faabric::util::batchExecFactory( - "dummy", "thread-check", nThreads); - chainedReq->set_type(faabric::BatchExecuteRequest::THREADS); + for (int i = 0; i < chainedReq->messages_size(); i++) { + faabric::Message& m = chainedReq->mutable_messages()->at(i); + m.set_snapshotkey(snapKey); + m.set_appidx(i + 1); + } - // Create a dummy snapshot - std::string snapKey = funcStr + "-snap"; - faabric::snapshot::SnapshotRegistry& reg = - faabric::snapshot::getSnapshotRegistry(); - faabric::util::SnapshotData snap; - snap.data = new uint8_t[10]; - snap.size = 10; + // Call the threads + Scheduler& sch = getScheduler(); + sch.callFunctions(chainedReq); - reg.takeSnapshot(snapKey, snap); + // Await the results + for (const auto& msg : chainedReq->messages()) { + uint32_t mid = msg.id(); + int threadRes = sch.awaitThreadResult(mid); - for (int i = 0; i < chainedReq->messages_size(); i++) { - faabric::Message& m = chainedReq->mutable_messages()->at(i); - m.set_snapshotkey(snapKey); - m.set_appidx(i + 1); + if (threadRes != mid / 100) { + SPDLOG_ERROR("TestExecutor got invalid thread result, {} != {}", + threadRes, + mid / 100); + return 1; } + } - // Call the threads - Scheduler& sch = getScheduler(); - sch.callFunctions(chainedReq); - - // Await the results - for (const auto& msg : chainedReq->messages()) { - uint32_t mid = msg.id(); - int threadRes = sch.awaitThreadResult(mid); - - if (threadRes != mid / 100) { - SPDLOG_ERROR( - "TestExecutor got invalid thread result, {} != {}", - threadRes, - mid / 100); - return 1; - } - } + // Delete the snapshot + delete[] snap.data; + reg.deleteSnapshot(snapKey); - // Delete the snapshot - delete[] snap.data; - reg.deleteSnapshot(snapKey); + SPDLOG_TRACE("TestExecutor got {} thread results", + chainedReq->messages_size()); + return 0; + } - SPDLOG_TRACE("TestExecutor got {} thread results", - 
chainedReq->messages_size()); - return 0; - } + if (msg.function() == "ret-one") { + return 1; + } - if (msg.function() == "ret-one") { - return 1; - } + if (msg.function() == "chain-check-a") { + if (msg.inputdata() == "chained") { + // Set up output data for the chained call + msg.set_outputdata("chain-check-a successful"); + } else { + // Chain this function and another + std::shared_ptr reqThis = + faabric::util::batchExecFactory("dummy", "chain-check-a", 1); + reqThis->mutable_messages()->at(0).set_inputdata("chained"); - if (msg.function() == "chain-check-a") { - if (msg.inputdata() == "chained") { - // Set up output data for the chained call - msg.set_outputdata("chain-check-a successful"); - } else { - // Chain this function and another - std::shared_ptr reqThis = - faabric::util::batchExecFactory("dummy", "chain-check-a", 1); - reqThis->mutable_messages()->at(0).set_inputdata("chained"); - - std::shared_ptr reqOther = - faabric::util::batchExecFactory("dummy", "chain-check-b", 1); - - Scheduler& sch = getScheduler(); - sch.callFunctions(reqThis); - sch.callFunctions(reqOther); - - for (const auto& m : reqThis->messages()) { - faabric::Message res = - sch.getFunctionResult(m.id(), SHORT_TEST_TIMEOUT_MS); - assert(res.outputdata() == "chain-check-a successful"); - } - - for (const auto& m : reqOther->messages()) { - faabric::Message res = - sch.getFunctionResult(m.id(), SHORT_TEST_TIMEOUT_MS); - assert(res.outputdata() == "chain-check-b successful"); - } - - msg.set_outputdata("All chain checks successful"); - } - return 0; - } + std::shared_ptr reqOther = + faabric::util::batchExecFactory("dummy", "chain-check-b", 1); - if (msg.function() == "chain-check-b") { - msg.set_outputdata("chain-check-b successful"); - return 0; - } + Scheduler& sch = getScheduler(); + sch.callFunctions(reqThis); + sch.callFunctions(reqOther); - if (msg.function() == "snap-check") { - // Modify a page of the dummy memory - uint8_t pageIdx = threadPoolIdx; - - faabric::util::SnapshotData& snapData = - faabric::snapshot::getSnapshotRegistry().getSnapshot( - msg.snapshotkey()); - - // Avoid writing a zero here as the memory is already zeroed hence - // it's not a change - std::vector data = { (uint8_t)(pageIdx + 1), - (uint8_t)(pageIdx + 2), - (uint8_t)(pageIdx + 3) }; - - // Set up a merge region that should catch the diff - size_t offset = (pageIdx * faabric::util::HOST_PAGE_SIZE); - snapData.addMergeRegion(offset, - data.size() + 10, - SnapshotDataType::Raw, - SnapshotMergeOperation::Overwrite); - - SPDLOG_DEBUG("TestExecutor modifying page {} of memory ({}-{})", - pageIdx, - offset, - offset + data.size()); - - uint8_t* offsetPtr = dummyMemory + offset; - std::memcpy(offsetPtr, data.data(), data.size()); - } + for (const auto& m : reqThis->messages()) { + faabric::Message res = + sch.getFunctionResult(m.id(), SHORT_TEST_TIMEOUT_MS); + assert(res.outputdata() == "chain-check-a successful"); + } - if (msg.function() == "echo") { - msg.set_outputdata(msg.inputdata()); - return 0; - } + for (const auto& m : reqOther->messages()) { + faabric::Message res = + sch.getFunctionResult(m.id(), SHORT_TEST_TIMEOUT_MS); + assert(res.outputdata() == "chain-check-b successful"); + } - if (msg.function() == "error") { - throw std::runtime_error("This is a test error"); + msg.set_outputdata("All chain checks successful"); } + return 0; + } - if (reqOrig->type() == faabric::BatchExecuteRequest::THREADS) { - SPDLOG_DEBUG("TestExecutor executing simple thread {}", msg.id()); - return msg.id() / 100; - } + if (msg.function() 
== "chain-check-b") { + msg.set_outputdata("chain-check-b successful"); + return 0; + } - // Default - msg.set_outputdata( - fmt::format("Simple function {} executed", msg.id())); + if (msg.function() == "snap-check") { + // Modify a page of the dummy memory + uint8_t pageIdx = threadPoolIdx; + + faabric::util::SnapshotData& snapData = + faabric::snapshot::getSnapshotRegistry().getSnapshot( + msg.snapshotkey()); + + // Avoid writing a zero here as the memory is already zeroed hence + // it's not a change + std::vector data = { (uint8_t)(pageIdx + 1), + (uint8_t)(pageIdx + 2), + (uint8_t)(pageIdx + 3) }; + + // Set up a merge region that should catch the diff + size_t offset = (pageIdx * faabric::util::HOST_PAGE_SIZE); + snapData.addMergeRegion(offset, + data.size() + 10, + SnapshotDataType::Raw, + SnapshotMergeOperation::Overwrite); + + SPDLOG_DEBUG("TestExecutor modifying page {} of memory ({}-{})", + pageIdx, + offset, + offset + data.size()); + + uint8_t* offsetPtr = dummyMemory + offset; + std::memcpy(offsetPtr, data.data(), data.size()); + } + if (msg.function() == "echo") { + msg.set_outputdata(msg.inputdata()); return 0; } -}; -class TestExecutorFactory : public ExecutorFactory -{ - protected: - std::shared_ptr createExecutor(faabric::Message& msg) override - { - return std::make_shared(msg); + if (msg.function() == "error") { + throw std::runtime_error("This is a test error"); + } + + if (reqOrig->type() == faabric::BatchExecuteRequest::THREADS) { + SPDLOG_DEBUG("TestExecutor executing simple thread {}", msg.id()); + return msg.id() / 100; } -}; + + // Default + msg.set_outputdata(fmt::format("Simple function {} executed", msg.id())); + + return 0; +} + +std::shared_ptr TestExecutorFactory::createExecutor( + faabric::Message& msg) +{ + return std::make_shared(msg); +} class TestExecutorFixture : public SchedulerTestFixture @@ -295,8 +281,14 @@ class TestExecutorFixture { conf.overrideCpuCount = 10; conf.boundTimeout = SHORT_TEST_TIMEOUT_MS; + faabric::util::SchedulingTopologyHint topologyHint = + faabric::util::SchedulingTopologyHint::NORMAL; + + if (forceLocal) { + topologyHint = faabric::util::SchedulingTopologyHint::FORCE_LOCAL; + } - return sch.callFunctions(req, forceLocal).hosts; + return sch.callFunctions(req, topologyHint).hosts; } private: @@ -558,9 +550,9 @@ TEST_CASE_METHOD(TestExecutorFixture, conf.boundTimeout = SHORT_TEST_TIMEOUT_MS; // Execute all the functions - sch.callFunctions(reqA, false); - sch.callFunctions(reqB, false); - sch.callFunctions(reqC, false); + sch.callFunctions(reqA); + sch.callFunctions(reqB); + sch.callFunctions(reqC); faabric::Message resA1 = sch.getFunctionResult(reqA->messages().at(0).id(), SHORT_TEST_TIMEOUT_MS); @@ -844,7 +836,7 @@ TEST_CASE_METHOD(TestExecutorFixture, } // Call functions and force to execute locally - sch.callFunctions(req, true); + sch.callFunctions(req, faabric::util::SchedulingTopologyHint::FORCE_LOCAL); // As we're faking a non-master execution results will be sent back to // the fake master so we can't wait on them, thus have to sleep diff --git a/tests/test/scheduler/test_scheduler.cpp b/tests/test/scheduler/test_scheduler.cpp index 69c683119..861db0af8 100644 --- a/tests/test/scheduler/test_scheduler.cpp +++ b/tests/test/scheduler/test_scheduler.cpp @@ -932,9 +932,17 @@ TEST_CASE_METHOD(DummyExecutorFixture, expectedDecision.addMessage(expectedHosts.at(i), req->messages().at(i)); } + // Set topology hint + faabric::util::SchedulingTopologyHint topologyHint = + faabric::util::SchedulingTopologyHint::NORMAL; + + if 
(forceLocal) { + topologyHint = faabric::util::SchedulingTopologyHint::FORCE_LOCAL; + } + // Schedule and check decision faabric::util::SchedulingDecision actualDecision = - sch.callFunctions(req, forceLocal); + sch.callFunctions(req, topologyHint); checkSchedulingDecisionEquality(expectedDecision, actualDecision); // Check mappings set up locally or not diff --git a/tests/test/scheduler/test_scheduling_decisions.cpp b/tests/test/scheduler/test_scheduling_decisions.cpp new file mode 100644 index 000000000..a016698bb --- /dev/null +++ b/tests/test/scheduler/test_scheduling_decisions.cpp @@ -0,0 +1,362 @@ +#include + +#include "fixtures.h" + +#include + +using namespace faabric::scheduler; + +namespace tests { + +class SchedulingDecisionTestFixture : public SchedulerTestFixture +{ + public: + SchedulingDecisionTestFixture() + { + faabric::util::setMockMode(true); + + std::shared_ptr fac = + std::make_shared(); + setExecutorFactory(fac); + } + + ~SchedulingDecisionTestFixture() { faabric::util::setMockMode(false); } + + protected: + int appId = 123; + int groupId = 456; + std::string masterHost = faabric::util::getSystemConfig().endpointHost; + + // Helper struct to configure one scheduling decision + struct SchedulingConfig + { + std::vector hosts; + std::vector slots; + int numReqs; + faabric::util::SchedulingTopologyHint topologyHint; + std::vector expectedHosts; + }; + + // Helper method to set the available hosts and slots per host prior to + // making a scheduling decision + void setHostResources(std::vector registeredHosts, + std::vector slotsPerHost) + { + assert(registeredHosts.size() == slotsPerHost.size()); + auto& sch = getScheduler(); + sch.clearRecordedMessages(); + + for (int i = 0; i < registeredHosts.size(); i++) { + faabric::HostResources resources; + resources.set_slots(slotsPerHost.at(i)); + resources.set_usedslots(0); + + sch.addHostToGlobalSet(registeredHosts.at(i)); + + // If setting resources for the master host, update the scheduler. + // Otherwise, queue the resource response + if (i == 0) { + sch.setThisHostResources(resources); + } else { + queueResourceResponse(registeredHosts.at(i), resources); + } + } + } + + void checkRecordedBatchMessages( + faabric::util::SchedulingDecision actualDecision, + const SchedulingConfig& config) + { + auto batchMessages = faabric::scheduler::getBatchRequests(); + + // First, turn our expected list of hosts to a map with frequency count + // and exclude the master host as no message is sent + std::map expectedHostCount; + for (const auto& h : config.expectedHosts) { + if (h != masterHost) { + ++expectedHostCount[h]; + } + } + + // Then check that the count matches the size of the batch sent + for (const auto& hostReqPair : batchMessages) { + REQUIRE(expectedHostCount.contains(hostReqPair.first)); + REQUIRE(expectedHostCount.at(hostReqPair.first) == + hostReqPair.second->messages_size()); + } + } + + // We test the scheduling decision twice: the first one will follow the + // unregistered hosts path, the second one the registerd hosts one. 
+ void testActualSchedulingDecision( + std::shared_ptr req, + const SchedulingConfig& config) + { + faabric::util::SchedulingDecision actualDecision(appId, groupId); + + // Set resources for all hosts + setHostResources(config.hosts, config.slots); + + // The first time we run the batch request, we will follow the + // unregistered hosts path + actualDecision = sch.callFunctions(req, config.topologyHint); + REQUIRE(actualDecision.hosts == config.expectedHosts); + checkRecordedBatchMessages(actualDecision, config); + + // We wait for the execution to finish and the scheduler to vacate + // the slots. We can't wait on the function result, as sometimes + // functions won't be executed at all (e.g. master running out of + // resources). + SLEEP_MS(100); + + // Set resources again to reset the used slots + auto reqCopy = + faabric::util::batchExecFactory("foo", "baz", req->messages_size()); + setHostResources(config.hosts, config.slots); + + // The second time we run the batch request, we will follow + // the registered hosts path + actualDecision = sch.callFunctions(reqCopy, config.topologyHint); + REQUIRE(actualDecision.hosts == config.expectedHosts); + checkRecordedBatchMessages(actualDecision, config); + } +}; + +TEST_CASE_METHOD(SchedulingDecisionTestFixture, + "Test basic scheduling decision", + "[scheduler]") +{ + SchedulingConfig config = { + .hosts = { masterHost, "hostA" }, + .slots = { 1, 1 }, + .numReqs = 2, + .topologyHint = faabric::util::SchedulingTopologyHint::NORMAL, + .expectedHosts = { masterHost, "hostA" }, + }; + + auto req = faabric::util::batchExecFactory("foo", "bar", config.numReqs); + + testActualSchedulingDecision(req, config); +} + +TEST_CASE_METHOD(SchedulingDecisionTestFixture, + "Test overloading all resources defaults to master", + "[scheduler]") +{ + SchedulingConfig config = { + .hosts = { masterHost, "hostA" }, + .slots = { 1, 1 }, + .numReqs = 3, + .topologyHint = faabric::util::SchedulingTopologyHint::NORMAL, + .expectedHosts = { masterHost, "hostA", masterHost }, + }; + + auto req = faabric::util::batchExecFactory("foo", "bar", config.numReqs); + + testActualSchedulingDecision(req, config); +} + +TEST_CASE_METHOD(SchedulingDecisionTestFixture, + "Test force local forces executing at master", + "[scheduler]") +{ + SchedulingConfig config = { + .hosts = { masterHost, "hostA" }, + .slots = { 1, 1 }, + .numReqs = 2, + .topologyHint = faabric::util::SchedulingTopologyHint::FORCE_LOCAL, + .expectedHosts = { masterHost, "hostA" }, + }; + + auto req = faabric::util::batchExecFactory("foo", "bar", config.numReqs); + + SECTION("Force local off") + { + config.topologyHint = faabric::util::SchedulingTopologyHint::NORMAL, + config.expectedHosts = { masterHost, "hostA" }; + } + + SECTION("Force local on") + { + config.topologyHint = + faabric::util::SchedulingTopologyHint::FORCE_LOCAL, + config.expectedHosts = { masterHost, masterHost }; + } + + testActualSchedulingDecision(req, config); +} + +TEST_CASE_METHOD(SchedulingDecisionTestFixture, + "Test master running out of resources", + "[scheduler]") +{ + SchedulingConfig config = { + .hosts = { masterHost, "hostA" }, + .slots = { 0, 2 }, + .numReqs = 2, + .topologyHint = faabric::util::SchedulingTopologyHint::NORMAL, + .expectedHosts = { "hostA", "hostA" }, + }; + + auto req = faabric::util::batchExecFactory("foo", "bar", config.numReqs); + + testActualSchedulingDecision(req, config); +} + +TEST_CASE_METHOD(SchedulingDecisionTestFixture, + "Test scheduling decision skips fully occupied worker hosts", + "[scheduler]") +{ + 
SchedulingConfig config = { + .hosts = { masterHost, "hostA", "hostB" }, + .slots = { 2, 0, 2 }, + .numReqs = 4, + .topologyHint = faabric::util::SchedulingTopologyHint::NORMAL, + .expectedHosts = { masterHost, masterHost, "hostB", "hostB" }, + }; + + auto req = faabric::util::batchExecFactory("foo", "bar", config.numReqs); + + SECTION("No topology hint") + { + config.topologyHint = faabric::util::SchedulingTopologyHint::NORMAL; + } + + SECTION("Never alone topology hint") + { + config.topologyHint = + faabric::util::SchedulingTopologyHint::NEVER_ALONE; + } + + testActualSchedulingDecision(req, config); +} + +TEST_CASE_METHOD(SchedulingDecisionTestFixture, + "Test scheduling decision with many requests", + "[scheduler]") +{ + SchedulingConfig config = { + .hosts = { masterHost, "hostA", "hostB", "hostC" }, + .slots = { 0, 0, 0, 0 }, + .numReqs = 8, + .topologyHint = faabric::util::SchedulingTopologyHint::NORMAL, + .expectedHosts = { masterHost, masterHost, masterHost, masterHost }, + }; + + auto req = faabric::util::batchExecFactory("foo", "bar", config.numReqs); + + SECTION("Even slot distribution across hosts") + { + config.slots = { 2, 2, 2, 2 }; + config.expectedHosts = { masterHost, masterHost, "hostA", "hostA", + "hostB", "hostB", "hostC", "hostC" }; + } + + SECTION("Uneven slot ditribution across hosts") + { + config.slots = { 3, 2, 2, 1 }; + config.expectedHosts = { masterHost, masterHost, masterHost, "hostA", + "hostA", "hostB", "hostB", "hostC" }; + } + + SECTION("Very uneven slot distribution across hosts") + { + config.slots = { 1, 0, 0, 0 }; + config.expectedHosts = { + masterHost, masterHost, masterHost, masterHost, + masterHost, masterHost, masterHost, masterHost + }; + } + + SECTION("Decreasing to one and increasing slot distribution") + { + config.slots = { 2, 2, 1, 2 }; + + SECTION("No topology hint") + { + config.topologyHint = faabric::util::SchedulingTopologyHint::NORMAL; + config.expectedHosts = { + masterHost, masterHost, "hostA", "hostA", + "hostB", "hostC", "hostC", masterHost + }; + } + + SECTION("Never alone topology hint") + { + config.topologyHint = + faabric::util::SchedulingTopologyHint::NEVER_ALONE; + config.expectedHosts = { masterHost, masterHost, "hostA", "hostA", + "hostC", "hostC", "hostC", "hostC" }; + } + } + + testActualSchedulingDecision(req, config); +} + +TEST_CASE_METHOD(SchedulingDecisionTestFixture, + "Test sticky pairs scheduling topology hint", + "[scheduler]") +{ + SchedulingConfig config = { + .hosts = { masterHost, "hostA" }, + .slots = { 1, 1 }, + .numReqs = 2, + .topologyHint = faabric::util::SchedulingTopologyHint::NEVER_ALONE, + .expectedHosts = { masterHost, "hostA" }, + }; + + std::shared_ptr req; + + SECTION("Test with hint we only schedule to new hosts pairs of requests") + { + config.expectedHosts = { masterHost, masterHost }; + req = faabric::util::batchExecFactory("foo", "bar", config.numReqs); + } + + SECTION("Test with hint we may overload remote hosts") + { + config.hosts = { masterHost, "hostA", "hostB" }; + config.numReqs = 5; + config.slots = { 2, 2, 1 }; + config.expectedHosts = { + masterHost, masterHost, "hostA", "hostA", "hostA" + }; + req = faabric::util::batchExecFactory("foo", "bar", config.numReqs); + } + + SECTION( + "Test with hint we still overload correctly if running out of slots") + { + config.hosts = { masterHost, "hostA" }; + config.numReqs = 5; + config.slots = { 2, 2 }; + config.expectedHosts = { + masterHost, masterHost, "hostA", "hostA", "hostA" + }; + req = faabric::util::batchExecFactory("foo", "bar", 
config.numReqs); + } + + SECTION("Test hint with uneven slot distribution") + { + config.hosts = { masterHost, "hostA" }; + config.numReqs = 5; + config.slots = { 2, 3 }; + config.expectedHosts = { + masterHost, masterHost, "hostA", "hostA", "hostA" + }; + req = faabric::util::batchExecFactory("foo", "bar", config.numReqs); + } + + SECTION("Test hint with uneven slot distribution and overload") + { + config.hosts = { masterHost, "hostA", "hostB" }; + config.numReqs = 6; + config.slots = { 2, 3, 1 }; + config.expectedHosts = { masterHost, masterHost, "hostA", + "hostA", "hostA", "hostA" }; + req = faabric::util::batchExecFactory("foo", "bar", config.numReqs); + } + + testActualSchedulingDecision(req, config); +} +} diff --git a/tests/utils/fixtures.h b/tests/utils/fixtures.h index ebd5c60f8..e19973b94 100644 --- a/tests/utils/fixtures.h +++ b/tests/utils/fixtures.h @@ -312,4 +312,36 @@ class PointToPointClientServerFixture faabric::transport::PointToPointClient cli; faabric::transport::PointToPointServer server; }; + +class TestExecutor final : public faabric::scheduler::Executor +{ + public: + TestExecutor(faabric::Message& msg); + + ~TestExecutor(); + + uint8_t* dummyMemory = nullptr; + + size_t dummyMemorySize = 0; + + void postFinish() override; + + void reset(faabric::Message& msg) override; + + void restore(faabric::Message& msg) override; + + faabric::util::SnapshotData snapshot() override; + + int32_t executeTask( + int threadPoolIdx, + int msgIdx, + std::shared_ptr reqOrig) override; +}; + +class TestExecutorFactory : public faabric::scheduler::ExecutorFactory +{ + protected: + std::shared_ptr createExecutor( + faabric::Message& msg) override; +}; }
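
As a usage illustration (not part of the patch itself), here is a minimal sketch of how a caller might pass the new topology hint when dispatching a batch. The faabric/util/func.h include for batchExecFactory and the "demo"/"sleep" function names are assumptions; the rest follows the signatures introduced above.

#include <faabric/scheduler/Scheduler.h>
#include <faabric/util/func.h>      // assumed location of batchExecFactory
#include <faabric/util/scheduling.h>

// Dispatch a batch of four calls, asking the scheduler never to leave a
// single request alone on a non-master host.
void dispatchExample()
{
    auto& sch = faabric::scheduler::getScheduler();

    // Hypothetical user/function pair, four messages in the batch
    auto req = faabric::util::batchExecFactory("demo", "sleep", 4);

    faabric::util::SchedulingDecision decision = sch.callFunctions(
      req, faabric::util::SchedulingTopologyHint::NEVER_ALONE);

    // decision.hosts holds the host chosen for each message, in order
    for (const auto& host : decision.hosts) {
        (void)host; // placement could be logged or asserted here
    }
}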