From e3bc10cc60390cd2f7807a9942305d6876511cb9 Mon Sep 17 00:00:00 2001
From: Carlos Segarra
Date: Wed, 24 Nov 2021 11:14:17 +0000
Subject: [PATCH 1/9] add scheduling topology hint and extensive testing

---
 include/faabric/scheduler/Scheduler.h |  12 +-
 include/faabric/util/scheduling.h     |  12 ++
 src/scheduler/Scheduler.cpp           |  58 ++++--
 tests/test/util/test_scheduling.cpp   | 256 +++++++++++++++++++++++++-
 4 files changed, 323 insertions(+), 15 deletions(-)

diff --git a/include/faabric/scheduler/Scheduler.h b/include/faabric/scheduler/Scheduler.h
index 48da69b86..2f79e5df6 100644
--- a/include/faabric/scheduler/Scheduler.h
+++ b/include/faabric/scheduler/Scheduler.h
@@ -102,7 +102,9 @@ class Scheduler
     faabric::util::SchedulingDecision callFunctions(
       std::shared_ptr<faabric::BatchExecuteRequest> req,
-      bool forceLocal = false);
+      bool forceLocal = false,
+      faabric::util::SchedulingTopologyHint =
+        faabric::util::SchedulingTopologyHint::NORMAL);
 
     faabric::util::SchedulingDecision callFunctions(
       std::shared_ptr<faabric::BatchExecuteRequest> req,
@@ -177,6 +179,11 @@ class Scheduler
 
     void clearRecordedMessages();
 
+    faabric::util::SchedulingDecision publicMakeSchedulingDecision(
+      std::shared_ptr<faabric::BatchExecuteRequest> req,
+      bool forceLocal,
+      faabric::util::SchedulingTopologyHint topologyHint);
+
     // ----------------------------------
     // Exec graph
     // ----------------------------------
@@ -233,7 +240,8 @@ class Scheduler
 
     faabric::util::SchedulingDecision makeSchedulingDecision(
       std::shared_ptr<faabric::BatchExecuteRequest> req,
-      bool forceLocal);
+      bool forceLocal,
+      faabric::util::SchedulingTopologyHint topologyHint);
 
     faabric::util::SchedulingDecision doCallFunctions(
       std::shared_ptr<faabric::BatchExecuteRequest> req,

diff --git a/include/faabric/util/scheduling.h b/include/faabric/util/scheduling.h
index 4c8d8a5ce..1c84a453e 100644
--- a/include/faabric/util/scheduling.h
+++ b/include/faabric/util/scheduling.h
@@ -41,4 +41,16 @@ class SchedulingDecision
       int32_t appIdx,
       int32_t groupIdx);
 };
+
+// Scheduling topology hints help the scheduler decide to which host to assign
+// new requests in a batch.
+// - NORMAL: bin-packs requests to slots in hosts starting from the master
+// host, and overloads the master if it runs out of resources.
+// - PAIRS: never allocates a single (non-master) request to a host without
+// other requests of the batch.
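// Illustrative caller-side sketch (hypothetical function names, not part of
// this patch): opting in to the PAIRS behaviour through the callFunctions
// parameter added above would look roughly like:
//
//   auto req = faabric::util::batchExecFactory("demo", "func", 4);
//   faabric::util::SchedulingDecision decision =
//     faabric::scheduler::getScheduler().callFunctions(
//       req, false, faabric::util::SchedulingTopologyHint::PAIRS);
//
// The second argument is the pre-existing forceLocal flag, which this patch
// leaves in place alongside the new hint.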
+enum SchedulingTopologyHint
+{
+    NORMAL,
+    PAIRS
+};
 }

diff --git a/src/scheduler/Scheduler.cpp b/src/scheduler/Scheduler.cpp
index 946700919..abb9e6477 100644
--- a/src/scheduler/Scheduler.cpp
+++ b/src/scheduler/Scheduler.cpp
@@ -210,7 +210,8 @@ void Scheduler::notifyExecutorShutdown(Executor* exec,
 
 faabric::util::SchedulingDecision Scheduler::callFunctions(
   std::shared_ptr<faabric::BatchExecuteRequest> req,
-  bool forceLocal)
+  bool forceLocal,
+  faabric::util::SchedulingTopologyHint topologyHint)
 {
     // Note, we assume all the messages are for the same function and have the
     // same master host
@@ -236,7 +237,8 @@ faabric::util::SchedulingDecision Scheduler::callFunctions(
 
     faabric::util::FullLock lock(mx);
 
-    SchedulingDecision decision = makeSchedulingDecision(req, forceLocal);
+    SchedulingDecision decision =
+      makeSchedulingDecision(req, forceLocal, topologyHint);
 
     // Send out point-to-point mappings if necessary (unless being forced to
     // execute locally, in which case they will be transmitted from the
@@ -249,9 +251,22 @@ faabric::util::SchedulingDecision Scheduler::callFunctions(
     return doCallFunctions(req, decision, lock);
 }
 
+faabric::util::SchedulingDecision Scheduler::publicMakeSchedulingDecision(
+  std::shared_ptr<faabric::BatchExecuteRequest> req,
+  bool forceLocal,
+  faabric::util::SchedulingTopologyHint topologyHint)
+{
+    if (!faabric::util::isTestMode()) {
+        throw std::runtime_error("This function must only be called in tests");
+    }
+
+    return makeSchedulingDecision(req, forceLocal, topologyHint);
+}
+
 faabric::util::SchedulingDecision Scheduler::makeSchedulingDecision(
   std::shared_ptr<faabric::BatchExecuteRequest> req,
-  bool forceLocal)
+  bool forceLocal,
+  faabric::util::SchedulingTopologyHint topologyHint)
 {
     int nMessages = req->messages_size();
     faabric::Message& firstMsg = req->mutable_messages()->at(0);
@@ -296,8 +311,20 @@ faabric::util::SchedulingDecision Scheduler::makeSchedulingDecision(
             int available = r.slots() - r.usedslots();
             int nOnThisHost = std::min(available, remainder);
 
-            for (int i = 0; i < nOnThisHost; i++) {
-                hosts.push_back(h);
+            // Under the pairs topology hint, we never allocate a single
+            // non-master request (id != 0) to a host without other
+            // requests of the batch
+            bool stickToPreviousHost =
+              (topologyHint ==
+                 faabric::util::SchedulingTopologyHint::PAIRS &&
+               nOnThisHost == 1 && hosts.size() > 0);
+
+            if (stickToPreviousHost) {
+                hosts.push_back(hosts.back());
+            } else {
+                for (int i = 0; i < nOnThisHost; i++) {
+                    hosts.push_back(h);
+                }
             }
 
             remainder -= nOnThisHost;
@@ -323,13 +350,22 @@ faabric::util::SchedulingDecision Scheduler::makeSchedulingDecision(
                 int available = r.slots() - r.usedslots();
                 int nOnThisHost = std::min(available, remainder);
 
-                // Register the host if it's exected a function
-                if (nOnThisHost > 0) {
-                    registeredHosts[funcStr].insert(h);
-                }
+                bool stickToPreviousHost =
+                  (topologyHint ==
+                     faabric::util::SchedulingTopologyHint::PAIRS &&
+                   nOnThisHost == 1 && hosts.size() > 0);
 
-                for (int i = 0; i < nOnThisHost; i++) {
-                    hosts.push_back(h);
+                if (stickToPreviousHost) {
+                    hosts.push_back(hosts.back());
+                } else {
+                    // Register the host if it's executed a function
+                    if (nOnThisHost > 0) {
+                        registeredHosts[funcStr].insert(h);
+                    }
+
+                    for (int i = 0; i < nOnThisHost; i++) {
+                        hosts.push_back(h);
+                    }
                 }
 
                 remainder -= nOnThisHost;

diff --git a/tests/test/util/test_scheduling.cpp b/tests/test/util/test_scheduling.cpp
index d3b45be34..e6e025e67 100644
--- a/tests/test/util/test_scheduling.cpp
+++ b/tests/test/util/test_scheduling.cpp
@@ -3,6 +3,8 @@
 #include "faabric_utils.h"
 #include "fixtures.h"
 
+#include
+#include
#include #include @@ -10,7 +12,7 @@ using namespace faabric::util; namespace tests { -TEST_CASE("Test building scheduling decisions", "[util]") +TEST_CASE("Test building scheduling decisions", "[util][scheduling-decisions]") { int appId = 123; int groupId = 345; @@ -46,7 +48,7 @@ TEST_CASE("Test building scheduling decisions", "[util]") } TEST_CASE("Test converting point-to-point mappings to scheduling decisions", - "[util]") + "[util][scheduling-decisions]") { int appId = 123; int groupId = 345; @@ -93,4 +95,254 @@ TEST_CASE("Test converting point-to-point mappings to scheduling decisions", REQUIRE(actual.messageIds == expectedMessageIds); REQUIRE(actual.hosts == expectedHosts); } + +class SchedulingDecisionTestFixture : public SchedulerTestFixture +{ + public: + SchedulingDecisionTestFixture() { faabric::util::setMockMode(true); } + + ~SchedulingDecisionTestFixture() { faabric::util::setMockMode(false); } + + protected: + int appId = 123; + int groupId = 456; + std::string masterHost = faabric::util::getSystemConfig().endpointHost; + + // Helper struct to configure one scheduling decision + struct SchedulingConfig + { + std::vector hosts; + std::vector slots; + int numReqs; + bool forceLocal; + SchedulingTopologyHint topologyHint; + std::vector expectedHosts; + }; + + // Helper method to set the available hosts and slots per host prior to + // making a scheduling decision + void setHostResources(std::vector registeredHosts, + std::vector slotsPerHost) + { + assert(registeredHosts.size() == slotsPerHost.size()); + auto& sch = faabric::scheduler::getScheduler(); + sch.clearRecordedMessages(); + + for (int i = 0; i < registeredHosts.size(); i++) { + faabric::HostResources resources; + resources.set_slots(slotsPerHost.at(i)); + resources.set_usedslots(0); + + sch.addHostToGlobalSet(registeredHosts.at(i)); + + // If setting resources for the master host, update the scheduler. + // Otherwise, queue the resource response + if (i == 0) { + sch.setThisHostResources(resources); + } else { + faabric::scheduler::queueResourceResponse(registeredHosts.at(i), + resources); + } + } + } + + // We test the scheduling decision twice: the first one will follow the + // unregistered hosts path, the second one the registerd hosts one. 
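    // (The two paths differ because a successful decision also registers the
    // non-master hosts that execute the function, via
    // registeredHosts[funcStr].insert(h) in Scheduler.cpp above, so the
    // second decision iterates the registered set for the function rather
    // than the full list of available hosts.)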
+ void testActualSchedulingDecision( + std::shared_ptr req, + const SchedulingConfig& config) + { + auto& sch = faabric::scheduler::getScheduler(); + SchedulingDecision actualDecision(appId, groupId); + + // Set resources for all hosts + setHostResources(config.hosts, config.slots); + + // The first time we request the scheduling decision, we will follow the + // unregistered hosts path + actualDecision = sch.publicMakeSchedulingDecision( + req, config.forceLocal, config.topologyHint); + REQUIRE(actualDecision.hosts == config.expectedHosts); + + // Set resources again to reset the used slots + setHostResources(config.hosts, config.slots); + + // The second time we request the scheduling decision, we will follow + // the registered hosts path + actualDecision = sch.publicMakeSchedulingDecision( + req, config.forceLocal, config.topologyHint); + REQUIRE(actualDecision.hosts == config.expectedHosts); + } +}; + +TEST_CASE_METHOD(SchedulingDecisionTestFixture, + "Test basic scheduling decision", + "[util][scheduling-decision]") +{ + SchedulingConfig config = { + .hosts = { masterHost, "hostA" }, + .slots = { 1, 1 }, + .numReqs = 2, + .forceLocal = false, + .topologyHint = SchedulingTopologyHint::NORMAL, + .expectedHosts = { masterHost, "hostA" }, + }; + + auto req = batchExecFactory("foo", "bar", config.numReqs); + + testActualSchedulingDecision(req, config); +} + +TEST_CASE_METHOD(SchedulingDecisionTestFixture, + "Test overloading all resources defaults to master", + "[util][scheduling-decision]") +{ + SchedulingConfig config = { + .hosts = { masterHost, "hostA" }, + .slots = { 1, 1 }, + .numReqs = 3, + .forceLocal = false, + .topologyHint = SchedulingTopologyHint::NORMAL, + .expectedHosts = { masterHost, "hostA", masterHost }, + }; + + auto req = batchExecFactory("foo", "bar", config.numReqs); + + testActualSchedulingDecision(req, config); +} + +TEST_CASE_METHOD(SchedulingDecisionTestFixture, + "Test force local forces executing at master", + "[util][scheduling-decision]") +{ + SchedulingConfig config = { + .hosts = { masterHost, "hostA" }, + .slots = { 1, 1 }, + .numReqs = 2, + .forceLocal = false, + .topologyHint = SchedulingTopologyHint::NORMAL, + .expectedHosts = { masterHost, "hostA" }, + }; + + auto req = batchExecFactory("foo", "bar", config.numReqs); + + SECTION("Force local off") + { + config.forceLocal = false; + config.expectedHosts = { masterHost, "hostA" }; + } + + SECTION("Force local on") + { + config.forceLocal = true; + config.expectedHosts = { masterHost, masterHost }; + } + + testActualSchedulingDecision(req, config); +} + +TEST_CASE_METHOD(SchedulingDecisionTestFixture, + "Test master running out of resources", + "[util][scheduling-decision]") +{ + SchedulingConfig config = { + .hosts = { masterHost, "hostA" }, + .slots = { 0, 2 }, + .numReqs = 2, + .forceLocal = false, + .topologyHint = SchedulingTopologyHint::NORMAL, + .expectedHosts = { "hostA", "hostA" }, + }; + + auto req = batchExecFactory("foo", "bar", config.numReqs); + + testActualSchedulingDecision(req, config); +} + +TEST_CASE_METHOD(SchedulingDecisionTestFixture, + "Test scheduling decision with many requests", + "[util][scheduling-decision]") +{ + SchedulingConfig config = { + .hosts = { masterHost, "hostA", "hostB", "hostC" }, + .slots = { 0, 0, 0, 0 }, + .numReqs = 8, + .forceLocal = false, + .topologyHint = SchedulingTopologyHint::NORMAL, + .expectedHosts = { masterHost, masterHost, masterHost, masterHost }, + }; + + auto req = batchExecFactory("foo", "bar", config.numReqs); + + SECTION("Even slot 
distribution across hosts") + { + config.slots = { 2, 2, 2, 2 }; + config.expectedHosts = { masterHost, masterHost, "hostA", "hostA", + "hostB", "hostB", "hostC", "hostC" }; + } + + SECTION("Uneven slot ditribution across hosts") + { + config.slots = { 3, 2, 2, 1 }; + config.expectedHosts = { masterHost, masterHost, masterHost, "hostA", + "hostA", "hostB", "hostB", "hostC" }; + } + + SECTION("Very uneven slot distribution across hosts") + { + config.slots = { 1, 0, 0, 0 }; + config.expectedHosts = { + masterHost, masterHost, masterHost, masterHost, + masterHost, masterHost, masterHost, masterHost + }; + } + + testActualSchedulingDecision(req, config); +} + +TEST_CASE_METHOD(SchedulingDecisionTestFixture, + "Test sticky pairs scheduling topology hint", + "[util][scheduling-decision]") +{ + SchedulingConfig config = { + .hosts = { masterHost, "hostA" }, + .slots = { 1, 1 }, + .numReqs = 2, + .forceLocal = false, + .topologyHint = SchedulingTopologyHint::PAIRS, + .expectedHosts = { masterHost, "hostA" }, + }; + + std::shared_ptr req; + + SECTION("Test with hint we only schedule to new hosts pairs of requests") + { + config.expectedHosts = { masterHost, masterHost }; + req = batchExecFactory("foo", "bar", config.numReqs); + } + + SECTION("Test with hint we may overload remote hosts") + { + config.hosts = { masterHost, "hostA", "hostB" }; + config.numReqs = 5; + config.slots = { 2, 2, 1 }; + config.expectedHosts = { + masterHost, masterHost, "hostA", "hostA", "hostA" + }; + req = batchExecFactory("foo", "bar", config.numReqs); + } + + SECTION("Test with hint we still overload master if running out of slots") + { + config.hosts = { masterHost, "hostA" }; + config.numReqs = 5; + config.slots = { 2, 2 }; + config.expectedHosts = { + masterHost, masterHost, "hostA", "hostA", masterHost + }; + req = batchExecFactory("foo", "bar", config.numReqs); + } + + testActualSchedulingDecision(req, config); +} } From b9c12e516f1f903c57184d79b681a14780d650aa Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Wed, 24 Nov 2021 11:37:03 +0000 Subject: [PATCH 2/9] adding a couple more tests --- tests/test/util/test_scheduling.cpp | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/tests/test/util/test_scheduling.cpp b/tests/test/util/test_scheduling.cpp index e6e025e67..f4f6b4a57 100644 --- a/tests/test/util/test_scheduling.cpp +++ b/tests/test/util/test_scheduling.cpp @@ -321,6 +321,14 @@ TEST_CASE_METHOD(SchedulingDecisionTestFixture, req = batchExecFactory("foo", "bar", config.numReqs); } + SECTION("Test hint does not apply for master requests") + { + config.slots = { 0, 1 }; + config.numReqs = 1; + config.expectedHosts = { "hostA" }; + req = batchExecFactory("foo", "bar", config.numReqs); + } + SECTION("Test with hint we may overload remote hosts") { config.hosts = { masterHost, "hostA", "hostB" }; @@ -343,6 +351,27 @@ TEST_CASE_METHOD(SchedulingDecisionTestFixture, req = batchExecFactory("foo", "bar", config.numReqs); } + SECTION("Test hint with uneven slot distribution") + { + config.hosts = { masterHost, "hostA" }; + config.numReqs = 5; + config.slots = { 2, 3 }; + config.expectedHosts = { + masterHost, masterHost, "hostA", "hostA", "hostA" + }; + req = batchExecFactory("foo", "bar", config.numReqs); + } + + SECTION("Test hint with uneven slot distribution and overload") + { + config.hosts = { masterHost, "hostA", "hostB" }; + config.numReqs = 6; + config.slots = { 2, 3, 1 }; + config.expectedHosts = { masterHost, masterHost, "hostA", + "hostA", "hostA", "hostA" }; + req = 
batchExecFactory("foo", "bar", config.numReqs); + } + testActualSchedulingDecision(req, config); } } From 0402ceff0f9229dbfb7fbd5882f5017eabdfe88f Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Wed, 24 Nov 2021 14:55:12 +0000 Subject: [PATCH 3/9] move tests to scheduler folder and change tags --- .../scheduler/test_scheduling_decisions.cpp | 288 ++++++++++++++++++ tests/test/util/test_scheduling.cpp | 285 +---------------- 2 files changed, 290 insertions(+), 283 deletions(-) create mode 100644 tests/test/scheduler/test_scheduling_decisions.cpp diff --git a/tests/test/scheduler/test_scheduling_decisions.cpp b/tests/test/scheduler/test_scheduling_decisions.cpp new file mode 100644 index 000000000..167554eb6 --- /dev/null +++ b/tests/test/scheduler/test_scheduling_decisions.cpp @@ -0,0 +1,288 @@ +#include + +#include "fixtures.h" + +#include + +using namespace faabric::scheduler; + +namespace tests { + +class SchedulingDecisionTestFixture : public SchedulerTestFixture +{ + public: + SchedulingDecisionTestFixture() { faabric::util::setMockMode(true); } + + ~SchedulingDecisionTestFixture() { faabric::util::setMockMode(false); } + + protected: + int appId = 123; + int groupId = 456; + std::string masterHost = faabric::util::getSystemConfig().endpointHost; + + // Helper struct to configure one scheduling decision + struct SchedulingConfig + { + std::vector hosts; + std::vector slots; + int numReqs; + bool forceLocal; + faabric::util::SchedulingTopologyHint topologyHint; + std::vector expectedHosts; + }; + + // Helper method to set the available hosts and slots per host prior to + // making a scheduling decision + void setHostResources(std::vector registeredHosts, + std::vector slotsPerHost) + { + assert(registeredHosts.size() == slotsPerHost.size()); + auto& sch = getScheduler(); + sch.clearRecordedMessages(); + + for (int i = 0; i < registeredHosts.size(); i++) { + faabric::HostResources resources; + resources.set_slots(slotsPerHost.at(i)); + resources.set_usedslots(0); + + sch.addHostToGlobalSet(registeredHosts.at(i)); + + // If setting resources for the master host, update the scheduler. + // Otherwise, queue the resource response + if (i == 0) { + sch.setThisHostResources(resources); + } else { + queueResourceResponse(registeredHosts.at(i), resources); + } + } + } + + // We test the scheduling decision twice: the first one will follow the + // unregistered hosts path, the second one the registerd hosts one. 
+ void testActualSchedulingDecision( + std::shared_ptr req, + const SchedulingConfig& config) + { + auto& sch = getScheduler(); + faabric::util::SchedulingDecision actualDecision(appId, groupId); + + // Set resources for all hosts + setHostResources(config.hosts, config.slots); + + // The first time we request the scheduling decision, we will follow the + // unregistered hosts path + actualDecision = sch.publicMakeSchedulingDecision( + req, config.forceLocal, config.topologyHint); + REQUIRE(actualDecision.hosts == config.expectedHosts); + + // Set resources again to reset the used slots + setHostResources(config.hosts, config.slots); + + // The second time we request the scheduling decision, we will follow + // the registered hosts path + actualDecision = sch.publicMakeSchedulingDecision( + req, config.forceLocal, config.topologyHint); + REQUIRE(actualDecision.hosts == config.expectedHosts); + } +}; + +TEST_CASE_METHOD(SchedulingDecisionTestFixture, + "Test basic scheduling decision", + "[scheduler]") +{ + SchedulingConfig config = { + .hosts = { masterHost, "hostA" }, + .slots = { 1, 1 }, + .numReqs = 2, + .forceLocal = false, + .topologyHint = faabric::util::SchedulingTopologyHint::NORMAL, + .expectedHosts = { masterHost, "hostA" }, + }; + + auto req = faabric::util::batchExecFactory("foo", "bar", config.numReqs); + + testActualSchedulingDecision(req, config); +} + +TEST_CASE_METHOD(SchedulingDecisionTestFixture, + "Test overloading all resources defaults to master", + "[scheduler]") +{ + SchedulingConfig config = { + .hosts = { masterHost, "hostA" }, + .slots = { 1, 1 }, + .numReqs = 3, + .forceLocal = false, + .topologyHint = faabric::util::SchedulingTopologyHint::NORMAL, + .expectedHosts = { masterHost, "hostA", masterHost }, + }; + + auto req = faabric::util::batchExecFactory("foo", "bar", config.numReqs); + + testActualSchedulingDecision(req, config); +} + +TEST_CASE_METHOD(SchedulingDecisionTestFixture, + "Test force local forces executing at master", + "[scheduler]") +{ + SchedulingConfig config = { + .hosts = { masterHost, "hostA" }, + .slots = { 1, 1 }, + .numReqs = 2, + .forceLocal = false, + .topologyHint = faabric::util::SchedulingTopologyHint::NORMAL, + .expectedHosts = { masterHost, "hostA" }, + }; + + auto req = faabric::util::batchExecFactory("foo", "bar", config.numReqs); + + SECTION("Force local off") + { + config.forceLocal = false; + config.expectedHosts = { masterHost, "hostA" }; + } + + SECTION("Force local on") + { + config.forceLocal = true; + config.expectedHosts = { masterHost, masterHost }; + } + + testActualSchedulingDecision(req, config); +} + +TEST_CASE_METHOD(SchedulingDecisionTestFixture, + "Test master running out of resources", + "[scheduler]") +{ + SchedulingConfig config = { + .hosts = { masterHost, "hostA" }, + .slots = { 0, 2 }, + .numReqs = 2, + .forceLocal = false, + .topologyHint = faabric::util::SchedulingTopologyHint::NORMAL, + .expectedHosts = { "hostA", "hostA" }, + }; + + auto req = faabric::util::batchExecFactory("foo", "bar", config.numReqs); + + testActualSchedulingDecision(req, config); +} + +TEST_CASE_METHOD(SchedulingDecisionTestFixture, + "Test scheduling decision with many requests", + "[scheduler]") +{ + SchedulingConfig config = { + .hosts = { masterHost, "hostA", "hostB", "hostC" }, + .slots = { 0, 0, 0, 0 }, + .numReqs = 8, + .forceLocal = false, + .topologyHint = faabric::util::SchedulingTopologyHint::NORMAL, + .expectedHosts = { masterHost, masterHost, masterHost, masterHost }, + }; + + auto req = 
faabric::util::batchExecFactory("foo", "bar", config.numReqs); + + SECTION("Even slot distribution across hosts") + { + config.slots = { 2, 2, 2, 2 }; + config.expectedHosts = { masterHost, masterHost, "hostA", "hostA", + "hostB", "hostB", "hostC", "hostC" }; + } + + SECTION("Uneven slot ditribution across hosts") + { + config.slots = { 3, 2, 2, 1 }; + config.expectedHosts = { masterHost, masterHost, masterHost, "hostA", + "hostA", "hostB", "hostB", "hostC" }; + } + + SECTION("Very uneven slot distribution across hosts") + { + config.slots = { 1, 0, 0, 0 }; + config.expectedHosts = { + masterHost, masterHost, masterHost, masterHost, + masterHost, masterHost, masterHost, masterHost + }; + } + + testActualSchedulingDecision(req, config); +} + +TEST_CASE_METHOD(SchedulingDecisionTestFixture, + "Test sticky pairs scheduling topology hint", + "[scheduler]") +{ + SchedulingConfig config = { + .hosts = { masterHost, "hostA" }, + .slots = { 1, 1 }, + .numReqs = 2, + .forceLocal = false, + .topologyHint = faabric::util::SchedulingTopologyHint::PAIRS, + .expectedHosts = { masterHost, "hostA" }, + }; + + std::shared_ptr req; + + SECTION("Test with hint we only schedule to new hosts pairs of requests") + { + config.expectedHosts = { masterHost, masterHost }; + req = faabric::util::batchExecFactory("foo", "bar", config.numReqs); + } + + SECTION("Test hint does not apply for master requests") + { + config.slots = { 0, 1 }; + config.numReqs = 1; + config.expectedHosts = { "hostA" }; + req = faabric::util::batchExecFactory("foo", "bar", config.numReqs); + } + + SECTION("Test with hint we may overload remote hosts") + { + config.hosts = { masterHost, "hostA", "hostB" }; + config.numReqs = 5; + config.slots = { 2, 2, 1 }; + config.expectedHosts = { + masterHost, masterHost, "hostA", "hostA", "hostA" + }; + req = faabric::util::batchExecFactory("foo", "bar", config.numReqs); + } + + SECTION("Test with hint we still overload master if running out of slots") + { + config.hosts = { masterHost, "hostA" }; + config.numReqs = 5; + config.slots = { 2, 2 }; + config.expectedHosts = { + masterHost, masterHost, "hostA", "hostA", masterHost + }; + req = faabric::util::batchExecFactory("foo", "bar", config.numReqs); + } + + SECTION("Test hint with uneven slot distribution") + { + config.hosts = { masterHost, "hostA" }; + config.numReqs = 5; + config.slots = { 2, 3 }; + config.expectedHosts = { + masterHost, masterHost, "hostA", "hostA", "hostA" + }; + req = faabric::util::batchExecFactory("foo", "bar", config.numReqs); + } + + SECTION("Test hint with uneven slot distribution and overload") + { + config.hosts = { masterHost, "hostA", "hostB" }; + config.numReqs = 6; + config.slots = { 2, 3, 1 }; + config.expectedHosts = { masterHost, masterHost, "hostA", + "hostA", "hostA", "hostA" }; + req = faabric::util::batchExecFactory("foo", "bar", config.numReqs); + } + + testActualSchedulingDecision(req, config); +} +} diff --git a/tests/test/util/test_scheduling.cpp b/tests/test/util/test_scheduling.cpp index f4f6b4a57..d3b45be34 100644 --- a/tests/test/util/test_scheduling.cpp +++ b/tests/test/util/test_scheduling.cpp @@ -3,8 +3,6 @@ #include "faabric_utils.h" #include "fixtures.h" -#include -#include #include #include @@ -12,7 +10,7 @@ using namespace faabric::util; namespace tests { -TEST_CASE("Test building scheduling decisions", "[util][scheduling-decisions]") +TEST_CASE("Test building scheduling decisions", "[util]") { int appId = 123; int groupId = 345; @@ -48,7 +46,7 @@ TEST_CASE("Test building scheduling decisions", 
"[util][scheduling-decisions]") } TEST_CASE("Test converting point-to-point mappings to scheduling decisions", - "[util][scheduling-decisions]") + "[util]") { int appId = 123; int groupId = 345; @@ -95,283 +93,4 @@ TEST_CASE("Test converting point-to-point mappings to scheduling decisions", REQUIRE(actual.messageIds == expectedMessageIds); REQUIRE(actual.hosts == expectedHosts); } - -class SchedulingDecisionTestFixture : public SchedulerTestFixture -{ - public: - SchedulingDecisionTestFixture() { faabric::util::setMockMode(true); } - - ~SchedulingDecisionTestFixture() { faabric::util::setMockMode(false); } - - protected: - int appId = 123; - int groupId = 456; - std::string masterHost = faabric::util::getSystemConfig().endpointHost; - - // Helper struct to configure one scheduling decision - struct SchedulingConfig - { - std::vector hosts; - std::vector slots; - int numReqs; - bool forceLocal; - SchedulingTopologyHint topologyHint; - std::vector expectedHosts; - }; - - // Helper method to set the available hosts and slots per host prior to - // making a scheduling decision - void setHostResources(std::vector registeredHosts, - std::vector slotsPerHost) - { - assert(registeredHosts.size() == slotsPerHost.size()); - auto& sch = faabric::scheduler::getScheduler(); - sch.clearRecordedMessages(); - - for (int i = 0; i < registeredHosts.size(); i++) { - faabric::HostResources resources; - resources.set_slots(slotsPerHost.at(i)); - resources.set_usedslots(0); - - sch.addHostToGlobalSet(registeredHosts.at(i)); - - // If setting resources for the master host, update the scheduler. - // Otherwise, queue the resource response - if (i == 0) { - sch.setThisHostResources(resources); - } else { - faabric::scheduler::queueResourceResponse(registeredHosts.at(i), - resources); - } - } - } - - // We test the scheduling decision twice: the first one will follow the - // unregistered hosts path, the second one the registerd hosts one. 
- void testActualSchedulingDecision( - std::shared_ptr req, - const SchedulingConfig& config) - { - auto& sch = faabric::scheduler::getScheduler(); - SchedulingDecision actualDecision(appId, groupId); - - // Set resources for all hosts - setHostResources(config.hosts, config.slots); - - // The first time we request the scheduling decision, we will follow the - // unregistered hosts path - actualDecision = sch.publicMakeSchedulingDecision( - req, config.forceLocal, config.topologyHint); - REQUIRE(actualDecision.hosts == config.expectedHosts); - - // Set resources again to reset the used slots - setHostResources(config.hosts, config.slots); - - // The second time we request the scheduling decision, we will follow - // the registered hosts path - actualDecision = sch.publicMakeSchedulingDecision( - req, config.forceLocal, config.topologyHint); - REQUIRE(actualDecision.hosts == config.expectedHosts); - } -}; - -TEST_CASE_METHOD(SchedulingDecisionTestFixture, - "Test basic scheduling decision", - "[util][scheduling-decision]") -{ - SchedulingConfig config = { - .hosts = { masterHost, "hostA" }, - .slots = { 1, 1 }, - .numReqs = 2, - .forceLocal = false, - .topologyHint = SchedulingTopologyHint::NORMAL, - .expectedHosts = { masterHost, "hostA" }, - }; - - auto req = batchExecFactory("foo", "bar", config.numReqs); - - testActualSchedulingDecision(req, config); -} - -TEST_CASE_METHOD(SchedulingDecisionTestFixture, - "Test overloading all resources defaults to master", - "[util][scheduling-decision]") -{ - SchedulingConfig config = { - .hosts = { masterHost, "hostA" }, - .slots = { 1, 1 }, - .numReqs = 3, - .forceLocal = false, - .topologyHint = SchedulingTopologyHint::NORMAL, - .expectedHosts = { masterHost, "hostA", masterHost }, - }; - - auto req = batchExecFactory("foo", "bar", config.numReqs); - - testActualSchedulingDecision(req, config); -} - -TEST_CASE_METHOD(SchedulingDecisionTestFixture, - "Test force local forces executing at master", - "[util][scheduling-decision]") -{ - SchedulingConfig config = { - .hosts = { masterHost, "hostA" }, - .slots = { 1, 1 }, - .numReqs = 2, - .forceLocal = false, - .topologyHint = SchedulingTopologyHint::NORMAL, - .expectedHosts = { masterHost, "hostA" }, - }; - - auto req = batchExecFactory("foo", "bar", config.numReqs); - - SECTION("Force local off") - { - config.forceLocal = false; - config.expectedHosts = { masterHost, "hostA" }; - } - - SECTION("Force local on") - { - config.forceLocal = true; - config.expectedHosts = { masterHost, masterHost }; - } - - testActualSchedulingDecision(req, config); -} - -TEST_CASE_METHOD(SchedulingDecisionTestFixture, - "Test master running out of resources", - "[util][scheduling-decision]") -{ - SchedulingConfig config = { - .hosts = { masterHost, "hostA" }, - .slots = { 0, 2 }, - .numReqs = 2, - .forceLocal = false, - .topologyHint = SchedulingTopologyHint::NORMAL, - .expectedHosts = { "hostA", "hostA" }, - }; - - auto req = batchExecFactory("foo", "bar", config.numReqs); - - testActualSchedulingDecision(req, config); -} - -TEST_CASE_METHOD(SchedulingDecisionTestFixture, - "Test scheduling decision with many requests", - "[util][scheduling-decision]") -{ - SchedulingConfig config = { - .hosts = { masterHost, "hostA", "hostB", "hostC" }, - .slots = { 0, 0, 0, 0 }, - .numReqs = 8, - .forceLocal = false, - .topologyHint = SchedulingTopologyHint::NORMAL, - .expectedHosts = { masterHost, masterHost, masterHost, masterHost }, - }; - - auto req = batchExecFactory("foo", "bar", config.numReqs); - - SECTION("Even slot 
distribution across hosts") - { - config.slots = { 2, 2, 2, 2 }; - config.expectedHosts = { masterHost, masterHost, "hostA", "hostA", - "hostB", "hostB", "hostC", "hostC" }; - } - - SECTION("Uneven slot ditribution across hosts") - { - config.slots = { 3, 2, 2, 1 }; - config.expectedHosts = { masterHost, masterHost, masterHost, "hostA", - "hostA", "hostB", "hostB", "hostC" }; - } - - SECTION("Very uneven slot distribution across hosts") - { - config.slots = { 1, 0, 0, 0 }; - config.expectedHosts = { - masterHost, masterHost, masterHost, masterHost, - masterHost, masterHost, masterHost, masterHost - }; - } - - testActualSchedulingDecision(req, config); -} - -TEST_CASE_METHOD(SchedulingDecisionTestFixture, - "Test sticky pairs scheduling topology hint", - "[util][scheduling-decision]") -{ - SchedulingConfig config = { - .hosts = { masterHost, "hostA" }, - .slots = { 1, 1 }, - .numReqs = 2, - .forceLocal = false, - .topologyHint = SchedulingTopologyHint::PAIRS, - .expectedHosts = { masterHost, "hostA" }, - }; - - std::shared_ptr req; - - SECTION("Test with hint we only schedule to new hosts pairs of requests") - { - config.expectedHosts = { masterHost, masterHost }; - req = batchExecFactory("foo", "bar", config.numReqs); - } - - SECTION("Test hint does not apply for master requests") - { - config.slots = { 0, 1 }; - config.numReqs = 1; - config.expectedHosts = { "hostA" }; - req = batchExecFactory("foo", "bar", config.numReqs); - } - - SECTION("Test with hint we may overload remote hosts") - { - config.hosts = { masterHost, "hostA", "hostB" }; - config.numReqs = 5; - config.slots = { 2, 2, 1 }; - config.expectedHosts = { - masterHost, masterHost, "hostA", "hostA", "hostA" - }; - req = batchExecFactory("foo", "bar", config.numReqs); - } - - SECTION("Test with hint we still overload master if running out of slots") - { - config.hosts = { masterHost, "hostA" }; - config.numReqs = 5; - config.slots = { 2, 2 }; - config.expectedHosts = { - masterHost, masterHost, "hostA", "hostA", masterHost - }; - req = batchExecFactory("foo", "bar", config.numReqs); - } - - SECTION("Test hint with uneven slot distribution") - { - config.hosts = { masterHost, "hostA" }; - config.numReqs = 5; - config.slots = { 2, 3 }; - config.expectedHosts = { - masterHost, masterHost, "hostA", "hostA", "hostA" - }; - req = batchExecFactory("foo", "bar", config.numReqs); - } - - SECTION("Test hint with uneven slot distribution and overload") - { - config.hosts = { masterHost, "hostA", "hostB" }; - config.numReqs = 6; - config.slots = { 2, 3, 1 }; - config.expectedHosts = { masterHost, masterHost, "hostA", - "hostA", "hostA", "hostA" }; - req = batchExecFactory("foo", "bar", config.numReqs); - } - - testActualSchedulingDecision(req, config); -} } From 850801e50c79b158c26ff45e033ec02780aef83b Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Wed, 24 Nov 2021 15:59:59 +0000 Subject: [PATCH 4/9] move to callFunctions and remove publicMakeSchedulingDecision --- include/faabric/scheduler/Scheduler.h | 5 - src/scheduler/Scheduler.cpp | 12 - tests/test/scheduler/test_executor.cpp | 370 +++++++++--------- .../scheduler/test_scheduling_decisions.cpp | 30 +- tests/utils/fixtures.h | 32 ++ 5 files changed, 232 insertions(+), 217 deletions(-) diff --git a/include/faabric/scheduler/Scheduler.h b/include/faabric/scheduler/Scheduler.h index 2f79e5df6..6bdc92a62 100644 --- a/include/faabric/scheduler/Scheduler.h +++ b/include/faabric/scheduler/Scheduler.h @@ -179,11 +179,6 @@ class Scheduler void clearRecordedMessages(); - 
faabric::util::SchedulingDecision publicMakeSchedulingDecision( - std::shared_ptr req, - bool forceLocal, - faabric::util::SchedulingTopologyHint topologyHint); - // ---------------------------------- // Exec graph // ---------------------------------- diff --git a/src/scheduler/Scheduler.cpp b/src/scheduler/Scheduler.cpp index abb9e6477..bc76c9206 100644 --- a/src/scheduler/Scheduler.cpp +++ b/src/scheduler/Scheduler.cpp @@ -251,18 +251,6 @@ faabric::util::SchedulingDecision Scheduler::callFunctions( return doCallFunctions(req, decision, lock); } -faabric::util::SchedulingDecision Scheduler::publicMakeSchedulingDecision( - std::shared_ptr req, - bool forceLocal, - faabric::util::SchedulingTopologyHint topologyHint) -{ - if (!faabric::util::isTestMode()) { - throw std::runtime_error("This function must only be called in tests"); - } - - return makeSchedulingDecision(req, forceLocal, topologyHint); -} - faabric::util::SchedulingDecision Scheduler::makeSchedulingDecision( std::shared_ptr req, bool forceLocal, diff --git a/tests/test/scheduler/test_executor.cpp b/tests/test/scheduler/test_executor.cpp index 573a08522..43d33736b 100644 --- a/tests/test/scheduler/test_executor.cpp +++ b/tests/test/scheduler/test_executor.cpp @@ -29,238 +29,224 @@ namespace tests { std::atomic restoreCount = 0; std::atomic resetCount = 0; -class TestExecutor final : public Executor +TestExecutor::TestExecutor(faabric::Message& msg) + : Executor(msg) +{} + +TestExecutor::~TestExecutor() {} + +void TestExecutor::postFinish() { - public: - TestExecutor(faabric::Message& msg) - : Executor(msg) - {} + if (dummyMemory != nullptr) { + munmap(dummyMemory, dummyMemorySize); + } +} - ~TestExecutor() {} +void TestExecutor::reset(faabric::Message& msg) +{ + SPDLOG_DEBUG("Resetting TestExecutor"); + resetCount += 1; +} - uint8_t* dummyMemory = nullptr; - size_t dummyMemorySize = 0; +void TestExecutor::restore(faabric::Message& msg) +{ + SPDLOG_DEBUG("Restoring TestExecutor"); + restoreCount += 1; - void postFinish() override - { - if (dummyMemory != nullptr) { - munmap(dummyMemory, dummyMemorySize); - } - } + // Initialise the dummy memory and map to snapshot + faabric::snapshot::SnapshotRegistry& reg = + faabric::snapshot::getSnapshotRegistry(); + faabric::util::SnapshotData& snap = reg.getSnapshot(msg.snapshotkey()); - void reset(faabric::Message& msg) override - { - SPDLOG_DEBUG("Resetting TestExecutor"); - resetCount += 1; - } + // Note this has to be mmapped to be page-aligned + dummyMemorySize = snap.size; + dummyMemory = (uint8_t*)mmap( + nullptr, snap.size, PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); - void restore(faabric::Message& msg) override - { - SPDLOG_DEBUG("Restoring TestExecutor"); - restoreCount += 1; + reg.mapSnapshot(msg.snapshotkey(), dummyMemory); +} - // Initialise the dummy memory and map to snapshot - faabric::snapshot::SnapshotRegistry& reg = - faabric::snapshot::getSnapshotRegistry(); - faabric::util::SnapshotData& snap = reg.getSnapshot(msg.snapshotkey()); +faabric::util::SnapshotData TestExecutor::snapshot() +{ + faabric::util::SnapshotData snap; + snap.data = dummyMemory; + snap.size = dummyMemorySize; + return snap; +} - // Note this has to be mmapped to be page-aligned - dummyMemorySize = snap.size; - dummyMemory = (uint8_t*)mmap( - nullptr, snap.size, PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); +int32_t TestExecutor::executeTask( + int threadPoolIdx, + int msgIdx, + std::shared_ptr reqOrig) +{ - reg.mapSnapshot(msg.snapshotkey(), dummyMemory); - } + faabric::Message& msg = 
reqOrig->mutable_messages()->at(msgIdx); - faabric::util::SnapshotData snapshot() override - { - faabric::util::SnapshotData snap; - snap.data = dummyMemory; - snap.size = dummyMemorySize; - return snap; - } + std::string funcStr = faabric::util::funcToString(msg, true); - int32_t executeTask( - int threadPoolIdx, - int msgIdx, - std::shared_ptr reqOrig) override - { + bool isThread = reqOrig->type() == faabric::BatchExecuteRequest::THREADS; - faabric::Message& msg = reqOrig->mutable_messages()->at(msgIdx); + // Check we're being asked to execute the function we've bound to + assert(msg.user() == boundMessage.user()); + assert(msg.function() == boundMessage.function()); - std::string funcStr = faabric::util::funcToString(msg, true); + // Custom thread-check function + if (msg.function() == "thread-check" && !isThread) { + msg.set_outputdata( + fmt::format("Threaded function {} executed successfully", msg.id())); - bool isThread = - reqOrig->type() == faabric::BatchExecuteRequest::THREADS; + // Set up the request + int nThreads = 5; + if (!msg.inputdata().empty()) { + nThreads = std::stoi(msg.inputdata()); + } - // Check we're being asked to execute the function we've bound to - assert(msg.user() == boundMessage.user()); - assert(msg.function() == boundMessage.function()); + SPDLOG_DEBUG("TestExecutor spawning {} threads", nThreads); - // Custom thread-check function - if (msg.function() == "thread-check" && !isThread) { - msg.set_outputdata(fmt::format( - "Threaded function {} executed successfully", msg.id())); + std::shared_ptr chainedReq = + faabric::util::batchExecFactory("dummy", "thread-check", nThreads); + chainedReq->set_type(faabric::BatchExecuteRequest::THREADS); - // Set up the request - int nThreads = 5; - if (!msg.inputdata().empty()) { - nThreads = std::stoi(msg.inputdata()); - } + // Create a dummy snapshot + std::string snapKey = funcStr + "-snap"; + faabric::snapshot::SnapshotRegistry& reg = + faabric::snapshot::getSnapshotRegistry(); + faabric::util::SnapshotData snap; + snap.data = new uint8_t[10]; + snap.size = 10; - SPDLOG_DEBUG("TestExecutor spawning {} threads", nThreads); + reg.takeSnapshot(snapKey, snap); - std::shared_ptr chainedReq = - faabric::util::batchExecFactory( - "dummy", "thread-check", nThreads); - chainedReq->set_type(faabric::BatchExecuteRequest::THREADS); + for (int i = 0; i < chainedReq->messages_size(); i++) { + faabric::Message& m = chainedReq->mutable_messages()->at(i); + m.set_snapshotkey(snapKey); + m.set_appidx(i + 1); + } - // Create a dummy snapshot - std::string snapKey = funcStr + "-snap"; - faabric::snapshot::SnapshotRegistry& reg = - faabric::snapshot::getSnapshotRegistry(); - faabric::util::SnapshotData snap; - snap.data = new uint8_t[10]; - snap.size = 10; + // Call the threads + Scheduler& sch = getScheduler(); + sch.callFunctions(chainedReq); - reg.takeSnapshot(snapKey, snap); + // Await the results + for (const auto& msg : chainedReq->messages()) { + uint32_t mid = msg.id(); + int threadRes = sch.awaitThreadResult(mid); - for (int i = 0; i < chainedReq->messages_size(); i++) { - faabric::Message& m = chainedReq->mutable_messages()->at(i); - m.set_snapshotkey(snapKey); - m.set_appidx(i + 1); + if (threadRes != mid / 100) { + SPDLOG_ERROR("TestExecutor got invalid thread result, {} != {}", + threadRes, + mid / 100); + return 1; } + } - // Call the threads - Scheduler& sch = getScheduler(); - sch.callFunctions(chainedReq); - - // Await the results - for (const auto& msg : chainedReq->messages()) { - uint32_t mid = msg.id(); - int 
threadRes = sch.awaitThreadResult(mid); - - if (threadRes != mid / 100) { - SPDLOG_ERROR( - "TestExecutor got invalid thread result, {} != {}", - threadRes, - mid / 100); - return 1; - } - } + // Delete the snapshot + delete[] snap.data; + reg.deleteSnapshot(snapKey); - // Delete the snapshot - delete[] snap.data; - reg.deleteSnapshot(snapKey); + SPDLOG_TRACE("TestExecutor got {} thread results", + chainedReq->messages_size()); + return 0; + } - SPDLOG_TRACE("TestExecutor got {} thread results", - chainedReq->messages_size()); - return 0; - } + if (msg.function() == "ret-one") { + return 1; + } - if (msg.function() == "ret-one") { - return 1; - } + if (msg.function() == "chain-check-a") { + if (msg.inputdata() == "chained") { + // Set up output data for the chained call + msg.set_outputdata("chain-check-a successful"); + } else { + // Chain this function and another + std::shared_ptr reqThis = + faabric::util::batchExecFactory("dummy", "chain-check-a", 1); + reqThis->mutable_messages()->at(0).set_inputdata("chained"); - if (msg.function() == "chain-check-a") { - if (msg.inputdata() == "chained") { - // Set up output data for the chained call - msg.set_outputdata("chain-check-a successful"); - } else { - // Chain this function and another - std::shared_ptr reqThis = - faabric::util::batchExecFactory("dummy", "chain-check-a", 1); - reqThis->mutable_messages()->at(0).set_inputdata("chained"); - - std::shared_ptr reqOther = - faabric::util::batchExecFactory("dummy", "chain-check-b", 1); - - Scheduler& sch = getScheduler(); - sch.callFunctions(reqThis); - sch.callFunctions(reqOther); - - for (const auto& m : reqThis->messages()) { - faabric::Message res = - sch.getFunctionResult(m.id(), SHORT_TEST_TIMEOUT_MS); - assert(res.outputdata() == "chain-check-a successful"); - } - - for (const auto& m : reqOther->messages()) { - faabric::Message res = - sch.getFunctionResult(m.id(), SHORT_TEST_TIMEOUT_MS); - assert(res.outputdata() == "chain-check-b successful"); - } - - msg.set_outputdata("All chain checks successful"); - } - return 0; - } + std::shared_ptr reqOther = + faabric::util::batchExecFactory("dummy", "chain-check-b", 1); - if (msg.function() == "chain-check-b") { - msg.set_outputdata("chain-check-b successful"); - return 0; - } + Scheduler& sch = getScheduler(); + sch.callFunctions(reqThis); + sch.callFunctions(reqOther); - if (msg.function() == "snap-check") { - // Modify a page of the dummy memory - uint8_t pageIdx = threadPoolIdx; - - faabric::util::SnapshotData& snapData = - faabric::snapshot::getSnapshotRegistry().getSnapshot( - msg.snapshotkey()); - - // Avoid writing a zero here as the memory is already zeroed hence - // it's not a change - std::vector data = { (uint8_t)(pageIdx + 1), - (uint8_t)(pageIdx + 2), - (uint8_t)(pageIdx + 3) }; - - // Set up a merge region that should catch the diff - size_t offset = (pageIdx * faabric::util::HOST_PAGE_SIZE); - snapData.addMergeRegion(offset, - data.size() + 10, - SnapshotDataType::Raw, - SnapshotMergeOperation::Overwrite); - - SPDLOG_DEBUG("TestExecutor modifying page {} of memory ({}-{})", - pageIdx, - offset, - offset + data.size()); - - uint8_t* offsetPtr = dummyMemory + offset; - std::memcpy(offsetPtr, data.data(), data.size()); - } + for (const auto& m : reqThis->messages()) { + faabric::Message res = + sch.getFunctionResult(m.id(), SHORT_TEST_TIMEOUT_MS); + assert(res.outputdata() == "chain-check-a successful"); + } - if (msg.function() == "echo") { - msg.set_outputdata(msg.inputdata()); - return 0; - } + for (const auto& m : 
reqOther->messages()) { + faabric::Message res = + sch.getFunctionResult(m.id(), SHORT_TEST_TIMEOUT_MS); + assert(res.outputdata() == "chain-check-b successful"); + } - if (msg.function() == "error") { - throw std::runtime_error("This is a test error"); + msg.set_outputdata("All chain checks successful"); } + return 0; + } - if (reqOrig->type() == faabric::BatchExecuteRequest::THREADS) { - SPDLOG_DEBUG("TestExecutor executing simple thread {}", msg.id()); - return msg.id() / 100; - } + if (msg.function() == "chain-check-b") { + msg.set_outputdata("chain-check-b successful"); + return 0; + } - // Default - msg.set_outputdata( - fmt::format("Simple function {} executed", msg.id())); + if (msg.function() == "snap-check") { + // Modify a page of the dummy memory + uint8_t pageIdx = threadPoolIdx; + + faabric::util::SnapshotData& snapData = + faabric::snapshot::getSnapshotRegistry().getSnapshot( + msg.snapshotkey()); + + // Avoid writing a zero here as the memory is already zeroed hence + // it's not a change + std::vector data = { (uint8_t)(pageIdx + 1), + (uint8_t)(pageIdx + 2), + (uint8_t)(pageIdx + 3) }; + + // Set up a merge region that should catch the diff + size_t offset = (pageIdx * faabric::util::HOST_PAGE_SIZE); + snapData.addMergeRegion(offset, + data.size() + 10, + SnapshotDataType::Raw, + SnapshotMergeOperation::Overwrite); + + SPDLOG_DEBUG("TestExecutor modifying page {} of memory ({}-{})", + pageIdx, + offset, + offset + data.size()); + + uint8_t* offsetPtr = dummyMemory + offset; + std::memcpy(offsetPtr, data.data(), data.size()); + } + if (msg.function() == "echo") { + msg.set_outputdata(msg.inputdata()); return 0; } -}; -class TestExecutorFactory : public ExecutorFactory -{ - protected: - std::shared_ptr createExecutor(faabric::Message& msg) override - { - return std::make_shared(msg); + if (msg.function() == "error") { + throw std::runtime_error("This is a test error"); } -}; + + if (reqOrig->type() == faabric::BatchExecuteRequest::THREADS) { + SPDLOG_DEBUG("TestExecutor executing simple thread {}", msg.id()); + return msg.id() / 100; + } + + // Default + msg.set_outputdata(fmt::format("Simple function {} executed", msg.id())); + + return 0; +} + +std::shared_ptr TestExecutorFactory::createExecutor( + faabric::Message& msg) +{ + return std::make_shared(msg); +} class TestExecutorFixture : public SchedulerTestFixture diff --git a/tests/test/scheduler/test_scheduling_decisions.cpp b/tests/test/scheduler/test_scheduling_decisions.cpp index 167554eb6..c9a4b5b81 100644 --- a/tests/test/scheduler/test_scheduling_decisions.cpp +++ b/tests/test/scheduler/test_scheduling_decisions.cpp @@ -11,7 +11,14 @@ namespace tests { class SchedulingDecisionTestFixture : public SchedulerTestFixture { public: - SchedulingDecisionTestFixture() { faabric::util::setMockMode(true); } + SchedulingDecisionTestFixture() + { + faabric::util::setMockMode(true); + + std::shared_ptr fac = + std::make_shared(); + setExecutorFactory(fac); + } ~SchedulingDecisionTestFixture() { faabric::util::setMockMode(false); } @@ -63,25 +70,32 @@ class SchedulingDecisionTestFixture : public SchedulerTestFixture std::shared_ptr req, const SchedulingConfig& config) { - auto& sch = getScheduler(); faabric::util::SchedulingDecision actualDecision(appId, groupId); // Set resources for all hosts setHostResources(config.hosts, config.slots); - // The first time we request the scheduling decision, we will follow the + // The first time we run the batch request, we will follow the // unregistered hosts path - actualDecision = 
sch.publicMakeSchedulingDecision( - req, config.forceLocal, config.topologyHint); + actualDecision = + sch.callFunctions(req, config.forceLocal, config.topologyHint); REQUIRE(actualDecision.hosts == config.expectedHosts); + // We wait for the execution to finish and the scheduler to vacate + // the slots. We can't wait on the function result, as sometimes + // functions won't be executed at all (e.g. master running out of + // resources). + SLEEP_MS(100); + // Set resources again to reset the used slots + auto reqCopy = + faabric::util::batchExecFactory("foo", "baz", req->messages_size()); setHostResources(config.hosts, config.slots); - // The second time we request the scheduling decision, we will follow + // The second time we run the batch request, we will follow // the registered hosts path - actualDecision = sch.publicMakeSchedulingDecision( - req, config.forceLocal, config.topologyHint); + actualDecision = + sch.callFunctions(reqCopy, config.forceLocal, config.topologyHint); REQUIRE(actualDecision.hosts == config.expectedHosts); } }; diff --git a/tests/utils/fixtures.h b/tests/utils/fixtures.h index ebd5c60f8..e19973b94 100644 --- a/tests/utils/fixtures.h +++ b/tests/utils/fixtures.h @@ -312,4 +312,36 @@ class PointToPointClientServerFixture faabric::transport::PointToPointClient cli; faabric::transport::PointToPointServer server; }; + +class TestExecutor final : public faabric::scheduler::Executor +{ + public: + TestExecutor(faabric::Message& msg); + + ~TestExecutor(); + + uint8_t* dummyMemory = nullptr; + + size_t dummyMemorySize = 0; + + void postFinish() override; + + void reset(faabric::Message& msg) override; + + void restore(faabric::Message& msg) override; + + faabric::util::SnapshotData snapshot() override; + + int32_t executeTask( + int threadPoolIdx, + int msgIdx, + std::shared_ptr reqOrig) override; +}; + +class TestExecutorFactory : public faabric::scheduler::ExecutorFactory +{ + protected: + std::shared_ptr createExecutor( + faabric::Message& msg) override; +}; } From 941804e3aa5a4001e3ca6933ee4f319b72ebcfe4 Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Wed, 24 Nov 2021 16:20:24 +0000 Subject: [PATCH 5/9] add check for the recorded messages --- .../scheduler/test_scheduling_decisions.cpp | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/tests/test/scheduler/test_scheduling_decisions.cpp b/tests/test/scheduler/test_scheduling_decisions.cpp index c9a4b5b81..bc812dc9b 100644 --- a/tests/test/scheduler/test_scheduling_decisions.cpp +++ b/tests/test/scheduler/test_scheduling_decisions.cpp @@ -64,6 +64,29 @@ class SchedulingDecisionTestFixture : public SchedulerTestFixture } } + void checkRecordedBatchMessages( + faabric::util::SchedulingDecision actualDecision, + const SchedulingConfig& config) + { + auto batchMessages = faabric::scheduler::getBatchRequests(); + + // First, turn our expected list of hosts to a map with frequency count + // and exclude the master host as no message is sent + std::map expectedHostCount; + for (const auto& h : config.expectedHosts) { + if (h != masterHost) { + ++expectedHostCount[h]; + } + } + + // Then check that the count matches the size of the batch sent + for (const auto& hostReqPair : batchMessages) { + REQUIRE(expectedHostCount.contains(hostReqPair.first)); + REQUIRE(expectedHostCount.at(hostReqPair.first) == + hostReqPair.second->messages_size()); + } + } + // We test the scheduling decision twice: the first one will follow the // unregistered hosts path, the second one the registerd hosts one. 
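    // (Worked example for checkRecordedBatchMessages above: with
    // expectedHosts = { masterHost, "hostA", "hostA" } the frequency map is
    // { { "hostA", 2 } }; the master host is excluded because no batch
    // message is dispatched to it, so the batch recorded for "hostA" must
    // contain exactly two messages.)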
void testActualSchedulingDecision(
      std::shared_ptr<faabric::BatchExecuteRequest> req,
      const SchedulingConfig& config)
@@ -80,6 +103,7 @@ class SchedulingDecisionTestFixture : public SchedulerTestFixture
         actualDecision =
           sch.callFunctions(req, config.forceLocal, config.topologyHint);
         REQUIRE(actualDecision.hosts == config.expectedHosts);
+        checkRecordedBatchMessages(actualDecision, config);
 
         // We wait for the execution to finish and the scheduler to vacate
         // the slots. We can't wait on the function result, as sometimes
         // functions won't be executed at all (e.g. master running out of
@@ -97,6 +121,7 @@ class SchedulingDecisionTestFixture : public SchedulerTestFixture
         actualDecision =
           sch.callFunctions(reqCopy, config.forceLocal, config.topologyHint);
         REQUIRE(actualDecision.hosts == config.expectedHosts);
+        checkRecordedBatchMessages(actualDecision, config);
     }
 };

From 1a91cfc49f951da275f331c9813916bd50684ce6 Mon Sep 17 00:00:00 2001
From: Carlos Segarra
Date: Wed, 24 Nov 2021 16:23:04 +0000
Subject: [PATCH 6/9] refactor hint from PAIRS to NEVER_ALONE

---
 include/faabric/util/scheduling.h                  | 7 ++++---
 src/scheduler/Scheduler.cpp                        | 4 ++--
 tests/test/scheduler/test_scheduling_decisions.cpp | 2 +-
 3 files changed, 7 insertions(+), 6 deletions(-)

diff --git a/include/faabric/util/scheduling.h b/include/faabric/util/scheduling.h
index 1c84a453e..db9aa3c86 100644
--- a/include/faabric/util/scheduling.h
+++ b/include/faabric/util/scheduling.h
@@ -46,11 +46,12 @@ class SchedulingDecision
 // new requests in a batch.
 // - NORMAL: bin-packs requests to slots in hosts starting from the master
 // host, and overloads the master if it runs out of resources.
-// - PAIRS: never allocates a single (non-master) request to a host without
-// other requests of the batch.
+// - NEVER_ALONE: never allocates a single (non-master) request to a host
+// without other requests of the batch.
 enum SchedulingTopologyHint
 {
     NORMAL,
-    PAIRS
+    NEVER_ALONE
 };
 }

diff --git a/src/scheduler/Scheduler.cpp b/src/scheduler/Scheduler.cpp
index bc76c9206..ab9b4330e 100644
--- a/src/scheduler/Scheduler.cpp
+++ b/src/scheduler/Scheduler.cpp
@@ -304,7 +304,7 @@ faabric::util::SchedulingDecision Scheduler::makeSchedulingDecision(
             // requests of the batch
             bool stickToPreviousHost =
               (topologyHint ==
-                 faabric::util::SchedulingTopologyHint::PAIRS &&
+                 faabric::util::SchedulingTopologyHint::NEVER_ALONE &&
                nOnThisHost == 1 && hosts.size() > 0);
 
             if (stickToPreviousHost) {
@@ -340,7 +340,7 @@ faabric::util::SchedulingDecision Scheduler::makeSchedulingDecision(
 
                 bool stickToPreviousHost =
                   (topologyHint ==
-                     faabric::util::SchedulingTopologyHint::PAIRS &&
+                     faabric::util::SchedulingTopologyHint::NEVER_ALONE &&
                    nOnThisHost == 1 && hosts.size() > 0);
 
                 if (stickToPreviousHost) {

diff --git a/tests/test/scheduler/test_scheduling_decisions.cpp b/tests/test/scheduler/test_scheduling_decisions.cpp
index bc812dc9b..36288e945 100644
--- a/tests/test/scheduler/test_scheduling_decisions.cpp
+++ b/tests/test/scheduler/test_scheduling_decisions.cpp
@@ -259,7 +259,7 @@ TEST_CASE_METHOD(SchedulingDecisionTestFixture,
         .slots = { 1, 1 },
         .numReqs = 2,
         .forceLocal = false,
-        .topologyHint = faabric::util::SchedulingTopologyHint::PAIRS,
+        .topologyHint = faabric::util::SchedulingTopologyHint::NEVER_ALONE,
         .expectedHosts = { masterHost, "hostA" },
     };

From 79862abac7241a282a8c903a0b5fc83192ee72ae Mon Sep 17 00:00:00 2001
From: Carlos Segarra
Date: Wed, 24 Nov 2021 16:40:00 +0000
Subject: [PATCH 7/9] set force local as a topology hint

---
 include/faabric/scheduler/Scheduler.h |  2 --
 include/faabric/util/scheduling.h     |  1 +
 src/scheduler/FunctionCallServer.cpp  |  2 +-
 src/scheduler/Scheduler.cpp           | 19
++++++++++-------- tests/test/scheduler/test_executor.cpp | 16 ++++++++++----- tests/test/scheduler/test_scheduler.cpp | 10 +++++++++- .../scheduler/test_scheduling_decisions.cpp | 20 ++++++------------- 7 files changed, 39 insertions(+), 31 deletions(-) diff --git a/include/faabric/scheduler/Scheduler.h b/include/faabric/scheduler/Scheduler.h index 6bdc92a62..f1355f02b 100644 --- a/include/faabric/scheduler/Scheduler.h +++ b/include/faabric/scheduler/Scheduler.h @@ -102,7 +102,6 @@ class Scheduler faabric::util::SchedulingDecision callFunctions( std::shared_ptr req, - bool forceLocal = false, faabric::util::SchedulingTopologyHint = faabric::util::SchedulingTopologyHint::NORMAL); @@ -235,7 +234,6 @@ class Scheduler faabric::util::SchedulingDecision makeSchedulingDecision( std::shared_ptr req, - bool forceLocal, faabric::util::SchedulingTopologyHint topologyHint); faabric::util::SchedulingDecision doCallFunctions( diff --git a/include/faabric/util/scheduling.h b/include/faabric/util/scheduling.h index db9aa3c86..1ef289331 100644 --- a/include/faabric/util/scheduling.h +++ b/include/faabric/util/scheduling.h @@ -52,6 +52,7 @@ class SchedulingDecision enum SchedulingTopologyHint { NORMAL, + FORCE_LOCAL, NEVER_ALONE }; } diff --git a/src/scheduler/FunctionCallServer.cpp b/src/scheduler/FunctionCallServer.cpp index e8b6b35e0..2e0604baf 100644 --- a/src/scheduler/FunctionCallServer.cpp +++ b/src/scheduler/FunctionCallServer.cpp @@ -77,7 +77,7 @@ void FunctionCallServer::recvExecuteFunctions(const uint8_t* buffer, // This host has now been told to execute these functions no matter what // TODO - avoid this copy scheduler.callFunctions(std::make_shared(msg), - true); + faabric::util::SchedulingTopologyHint::FORCE_LOCAL); } void FunctionCallServer::recvUnregister(const uint8_t* buffer, diff --git a/src/scheduler/Scheduler.cpp b/src/scheduler/Scheduler.cpp index ab9b4330e..15000193d 100644 --- a/src/scheduler/Scheduler.cpp +++ b/src/scheduler/Scheduler.cpp @@ -210,7 +210,6 @@ void Scheduler::notifyExecutorShutdown(Executor* exec, faabric::util::SchedulingDecision Scheduler::callFunctions( std::shared_ptr req, - bool forceLocal, faabric::util::SchedulingTopologyHint topologyHint) { // Note, we assume all the messages are for the same function and have the @@ -225,7 +224,8 @@ faabric::util::SchedulingDecision Scheduler::callFunctions( // If we're not the master host, we need to forward the request back to the // master host. This will only happen if a nested batch execution happens. 
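// (With this patch the boolean flag is folded into the hint enum: passing
// faabric::util::SchedulingTopologyHint::FORCE_LOCAL reproduces the old
// forceLocal == true behaviour, since the check below and the early-exit
// branch in makeSchedulingDecision now test the hint instead of the removed
// flag.)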
diff --git a/src/scheduler/Scheduler.cpp b/src/scheduler/Scheduler.cpp
index ab9b4330e..15000193d 100644
--- a/src/scheduler/Scheduler.cpp
+++ b/src/scheduler/Scheduler.cpp
@@ -210,7 +210,6 @@ void Scheduler::notifyExecutorShutdown(Executor* exec,
 
 faabric::util::SchedulingDecision Scheduler::callFunctions(
   std::shared_ptr<faabric::BatchExecuteRequest> req,
-  bool forceLocal,
   faabric::util::SchedulingTopologyHint topologyHint)
 {
     // Note, we assume all the messages are for the same function and have the
     // same master host
@@ -225,7 +224,8 @@ faabric::util::SchedulingDecision Scheduler::callFunctions(
 
     // If we're not the master host, we need to forward the request back to the
     // master host. This will only happen if a nested batch execution happens.
-    if (!forceLocal && masterHost != thisHost) {
+    if (topologyHint != faabric::util::SchedulingTopologyHint::FORCE_LOCAL &&
+        masterHost != thisHost) {
         std::string funcStr = faabric::util::funcToString(firstMsg, false);
         SPDLOG_DEBUG("Forwarding {} back to master {}", funcStr, masterHost);
 
@@ -237,13 +237,13 @@ faabric::util::SchedulingDecision Scheduler::callFunctions(
 
     faabric::util::FullLock lock(mx);
 
-    SchedulingDecision decision =
-      makeSchedulingDecision(req, forceLocal, topologyHint);
+    SchedulingDecision decision = makeSchedulingDecision(req, topologyHint);
 
     // Send out point-to-point mappings if necessary (unless being forced to
     // execute locally, in which case they will be transmitted from the
     // master)
-    if (!forceLocal && (firstMsg.groupid() > 0)) {
+    if (topologyHint != faabric::util::SchedulingTopologyHint::FORCE_LOCAL &&
+        (firstMsg.groupid() > 0)) {
         broker.setAndSendMappingsFromSchedulingDecision(decision);
     }
 
@@ -253,7 +253,6 @@ faabric::util::SchedulingDecision Scheduler::makeSchedulingDecision(
   std::shared_ptr<faabric::BatchExecuteRequest> req,
-  bool forceLocal,
   faabric::util::SchedulingTopologyHint topologyHint)
 {
     int nMessages = req->messages_size();
@@ -261,7 +260,7 @@ faabric::util::SchedulingDecision Scheduler::makeSchedulingDecision(
     std::string funcStr = faabric::util::funcToString(firstMsg, false);
     std::vector<std::string> hosts;
 
-    if (forceLocal) {
+    if (topologyHint == faabric::util::SchedulingTopologyHint::FORCE_LOCAL) {
         // We're forced to execute locally here so we do all the messages
         for (int i = 0; i < nMessages; i++) {
             hosts.push_back(thisHost);
@@ -660,7 +659,11 @@ void Scheduler::callFunction(faabric::Message& msg, bool forceLocal)
     req->set_type(req->FUNCTIONS);
 
     // Make the call
-    callFunctions(req, forceLocal);
+    if (forceLocal) {
+        callFunctions(req, faabric::util::SchedulingTopologyHint::FORCE_LOCAL);
+    } else {
+        callFunctions(req);
+    }
 }
 
 void Scheduler::clearRecordedMessages()
diff --git a/tests/test/scheduler/test_executor.cpp b/tests/test/scheduler/test_executor.cpp
index 43d33736b..9c79ebff3 100644
--- a/tests/test/scheduler/test_executor.cpp
+++ b/tests/test/scheduler/test_executor.cpp
@@ -281,8 +281,14 @@ class TestExecutorFixture
     {
         conf.overrideCpuCount = 10;
         conf.boundTimeout = SHORT_TEST_TIMEOUT_MS;
+        faabric::util::SchedulingTopologyHint topologyHint =
+          faabric::util::SchedulingTopologyHint::NORMAL;
 
-        return sch.callFunctions(req, forceLocal).hosts;
+        if (forceLocal) {
+            topologyHint = faabric::util::SchedulingTopologyHint::FORCE_LOCAL;
+        }
+
+        return sch.callFunctions(req, topologyHint).hosts;
     }
 
   private:
@@ -544,9 +550,9 @@ TEST_CASE_METHOD(TestExecutorFixture,
     conf.boundTimeout = SHORT_TEST_TIMEOUT_MS;
 
     // Execute all the functions
-    sch.callFunctions(reqA, false);
-    sch.callFunctions(reqB, false);
-    sch.callFunctions(reqC, false);
+    sch.callFunctions(reqA);
+    sch.callFunctions(reqB);
+    sch.callFunctions(reqC);
 
     faabric::Message resA1 =
       sch.getFunctionResult(reqA->messages().at(0).id(), SHORT_TEST_TIMEOUT_MS);
@@ -830,7 +836,7 @@ TEST_CASE_METHOD(TestExecutorFixture,
     }
 
     // Call functions and force to execute locally
-    sch.callFunctions(req, true);
+    sch.callFunctions(req, faabric::util::SchedulingTopologyHint::FORCE_LOCAL);
 
     // As we're faking a non-master execution results will be sent back to
     // the fake master so we can't wait on them, thus have to sleep
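The forceLocal-to-hint translation above recurs in several of the test changes. A small helper along these lines (hypothetical, not part of the patch) would capture the mapping in one place:

#include <faabric/util/scheduling.h>

// Hypothetical helper: translate the old boolean flag into the new hint
faabric::util::SchedulingTopologyHint hintFromForceLocal(bool forceLocal)
{
    return forceLocal ? faabric::util::SchedulingTopologyHint::FORCE_LOCAL
                      : faabric::util::SchedulingTopologyHint::NORMAL;
}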
diff --git a/tests/test/scheduler/test_scheduler.cpp b/tests/test/scheduler/test_scheduler.cpp
index 69c683119..861db0af8 100644
--- a/tests/test/scheduler/test_scheduler.cpp
+++ b/tests/test/scheduler/test_scheduler.cpp
@@ -932,9 +932,17 @@ TEST_CASE_METHOD(DummyExecutorFixture,
         expectedDecision.addMessage(expectedHosts.at(i), req->messages().at(i));
     }
 
+    // Set topology hint
+    faabric::util::SchedulingTopologyHint topologyHint =
+      faabric::util::SchedulingTopologyHint::NORMAL;
+
+    if (forceLocal) {
+        topologyHint = faabric::util::SchedulingTopologyHint::FORCE_LOCAL;
+    }
+
     // Schedule and check decision
     faabric::util::SchedulingDecision actualDecision =
-      sch.callFunctions(req, forceLocal);
+      sch.callFunctions(req, topologyHint);
     checkSchedulingDecisionEquality(expectedDecision, actualDecision);
 
     // Check mappings set up locally or not
diff --git a/tests/test/scheduler/test_scheduling_decisions.cpp b/tests/test/scheduler/test_scheduling_decisions.cpp
index 36288e945..9e20438a9 100644
--- a/tests/test/scheduler/test_scheduling_decisions.cpp
+++ b/tests/test/scheduler/test_scheduling_decisions.cpp
@@ -33,7 +33,6 @@ class SchedulingDecisionTestFixture : public SchedulerTestFixture
         std::vector<std::string> hosts;
         std::vector<int> slots;
         int numReqs;
-        bool forceLocal;
         faabric::util::SchedulingTopologyHint topologyHint;
         std::vector<std::string> expectedHosts;
     };
 
@@ -100,8 +99,7 @@ class SchedulingDecisionTestFixture : public SchedulerTestFixture
         // The first time we run the batch request, we will follow the
         // unregistered hosts path
-        actualDecision =
-          sch.callFunctions(req, config.forceLocal, config.topologyHint);
+        actualDecision = sch.callFunctions(req, config.topologyHint);
         REQUIRE(actualDecision.hosts == config.expectedHosts);
         checkRecordedBatchMessages(actualDecision, config);
 
@@ -118,8 +116,7 @@ class SchedulingDecisionTestFixture : public SchedulerTestFixture
         // The second time we run the batch request, we will follow
         // the registered hosts path
-        actualDecision =
-          sch.callFunctions(reqCopy, config.forceLocal, config.topologyHint);
+        actualDecision = sch.callFunctions(reqCopy, config.topologyHint);
         REQUIRE(actualDecision.hosts == config.expectedHosts);
         checkRecordedBatchMessages(actualDecision, config);
     }
 
@@ -133,7 +130,6 @@ TEST_CASE_METHOD(SchedulingDecisionTestFixture,
         .hosts = { masterHost, "hostA" },
         .slots = { 1, 1 },
         .numReqs = 2,
-        .forceLocal = false,
         .topologyHint = faabric::util::SchedulingTopologyHint::NORMAL,
         .expectedHosts = { masterHost, "hostA" },
     };
 
@@ -151,7 +147,6 @@ TEST_CASE_METHOD(SchedulingDecisionTestFixture,
         .hosts = { masterHost, "hostA" },
         .slots = { 1, 1 },
         .numReqs = 3,
-        .forceLocal = false,
         .topologyHint = faabric::util::SchedulingTopologyHint::NORMAL,
         .expectedHosts = { masterHost, "hostA", masterHost },
     };
 
@@ -169,8 +164,7 @@ TEST_CASE_METHOD(SchedulingDecisionTestFixture,
         .hosts = { masterHost, "hostA" },
         .slots = { 1, 1 },
         .numReqs = 2,
-        .forceLocal = false,
-        .topologyHint = faabric::util::SchedulingTopologyHint::NORMAL,
+        .topologyHint = faabric::util::SchedulingTopologyHint::FORCE_LOCAL,
         .expectedHosts = { masterHost, "hostA" },
     };
 
     SECTION("Force local off")
     {
-        config.forceLocal = false;
+        config.topologyHint = faabric::util::SchedulingTopologyHint::NORMAL;
         config.expectedHosts = { masterHost, "hostA" };
     }
 
     SECTION("Force local on")
     {
-        config.forceLocal = true;
+        config.topologyHint =
+          faabric::util::SchedulingTopologyHint::FORCE_LOCAL;
         config.expectedHosts = { masterHost, masterHost };
     }
 
@@ -199,7 +194,6 @@ TEST_CASE_METHOD(SchedulingDecisionTestFixture,
         .hosts = { masterHost, "hostA" },
         .slots = { 0, 2 },
         .numReqs = 2,
-        .forceLocal = false,
         .topologyHint = faabric::util::SchedulingTopologyHint::NORMAL,
         .expectedHosts = { "hostA", "hostA" },
     };
 
@@ -217,7 +211,6 @@ TEST_CASE_METHOD(SchedulingDecisionTestFixture,
         .hosts = { masterHost, "hostA", "hostB", "hostC" },
         .slots = { 0, 0, 0, 0 },
         .numReqs = 8,
-        .forceLocal = false,
         .topologyHint = faabric::util::SchedulingTopologyHint::NORMAL,
         .expectedHosts = { masterHost, masterHost, masterHost, masterHost },
     };
 
@@ -258,7 +251,6 @@ TEST_CASE_METHOD(SchedulingDecisionTestFixture,
         .hosts = { masterHost, "hostA" },
         .slots = { 1, 1 },
         .numReqs = 2,
-        .forceLocal = false,
         .topologyHint = faabric::util::SchedulingTopologyHint::NEVER_ALONE,
         .expectedHosts = { masterHost, "hostA" },
     };
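To make the behavioural difference between NORMAL and NEVER_ALONE concrete before the next patch revises it, here is a worked check built on the assignHosts sketch given after PATCH 6. The same caveats apply: it illustrates the rule, not the Scheduler itself.

#include <cassert>
#include <string>
#include <vector>

// Declared in the sketch after PATCH 6
std::vector<std::string> assignHosts(const std::vector<std::string>&,
                                     const std::vector<int>&,
                                     int,
                                     bool);

void compareHints()
{
    // Two worker hosts offering {2, 2} free slots, batch of three requests
    std::vector<std::string> names = { "hostA", "hostB" };
    std::vector<int> slots = { 2, 2 };

    // NORMAL bin-packs: the third request sits alone on hostB
    assert(assignHosts(names, slots, 3, false) ==
           (std::vector<std::string>{ "hostA", "hostA", "hostB" }));

    // NEVER_ALONE keeps it with the previous host instead
    assert(assignHosts(names, slots, 3, true) ==
           (std::vector<std::string>{ "hostA", "hostA", "hostA" }));
}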
From 93fedb72a06ba7f27f08ef3ea555ff104bb320a6 Mon Sep 17 00:00:00 2001
From: Carlos Segarra
Date: Wed, 24 Nov 2021 17:25:15 +0000
Subject: [PATCH 8/9] change overloading logic as discussed offline

---
 src/scheduler/Scheduler.cpp                        | 68 ++++++++++---------
 .../scheduler/test_scheduling_decisions.cpp        | 63 ++++++++++++++---
 2 files changed, 90 insertions(+), 41 deletions(-)

diff --git a/src/scheduler/Scheduler.cpp b/src/scheduler/Scheduler.cpp
index 15000193d..09bad3d56 100644
--- a/src/scheduler/Scheduler.cpp
+++ b/src/scheduler/Scheduler.cpp
@@ -298,20 +298,16 @@ faabric::util::SchedulingDecision Scheduler::makeSchedulingDecision(
             int available = r.slots() - r.usedslots();
             int nOnThisHost = std::min(available, remainder);
 
-            // Under the pairs topology hint, we never allocate a single
-            // non-master request (id != 0) to a host without other
-            // requests of the batch
-            bool stickToPreviousHost =
-              (topologyHint ==
-                 faabric::util::SchedulingTopologyHint::NEVER_ALONE &&
-               nOnThisHost == 1 && hosts.size() > 0);
-
-            if (stickToPreviousHost) {
-                hosts.push_back(hosts.back());
-            } else {
-                for (int i = 0; i < nOnThisHost; i++) {
-                    hosts.push_back(h);
-                }
+            // Under the NEVER_ALONE topology hint, we never choose a host
+            // unless we can schedule at least two requests in it.
+            if (topologyHint ==
+                  faabric::util::SchedulingTopologyHint::NEVER_ALONE &&
+                nOnThisHost < 2) {
+                continue;
+            }
+
+            for (int i = 0; i < nOnThisHost; i++) {
+                hosts.push_back(h);
             }
 
             remainder -= nOnThisHost;
@@ -337,22 +333,19 @@ faabric::util::SchedulingDecision Scheduler::makeSchedulingDecision(
             int available = r.slots() - r.usedslots();
             int nOnThisHost = std::min(available, remainder);
 
-            bool stickToPreviousHost =
-              (topologyHint ==
-                 faabric::util::SchedulingTopologyHint::NEVER_ALONE &&
-               nOnThisHost == 1 && hosts.size() > 0);
+            if (topologyHint ==
+                  faabric::util::SchedulingTopologyHint::NEVER_ALONE &&
+                nOnThisHost < 2) {
+                continue;
+            }
 
-            if (stickToPreviousHost) {
-                hosts.push_back(hosts.back());
-            } else {
-                // Register the host if it's executed a function
-                if (nOnThisHost > 0) {
-                    registeredHosts[funcStr].insert(h);
-                }
+            // Register the host if it's executed a function
+            if (nOnThisHost > 0) {
+                registeredHosts[funcStr].insert(h);
+            }
 
-                for (int i = 0; i < nOnThisHost; i++) {
-                    hosts.push_back(h);
-                }
+            for (int i = 0; i < nOnThisHost; i++) {
+                hosts.push_back(h);
             }
 
             remainder -= nOnThisHost;
@@ -365,11 +358,24 @@ faabric::util::SchedulingDecision Scheduler::makeSchedulingDecision(
         // At this point there's no more capacity in the system, so we
         // just need to overload locally
         if (remainder > 0) {
-            SPDLOG_DEBUG(
-              "Overloading {}/{} {} locally", remainder, nMessages, funcStr);
+            std::string overloadedHost = thisHost;
+
+            if (topologyHint ==
+                  faabric::util::SchedulingTopologyHint::NEVER_ALONE &&
+                hosts.size() > 0) {
+                overloadedHost = hosts.back();
+            }
+
+            SPDLOG_DEBUG("Overloading {}/{} {} {}",
+                         remainder,
+                         nMessages,
+                         funcStr,
+                         overloadedHost == thisHost
+                           ? "locally"
+                           : "to host " + overloadedHost);
 
             for (int i = 0; i < remainder; i++) {
-                hosts.push_back(thisHost);
+                hosts.push_back(overloadedHost);
             }
         }
     }
diff --git a/tests/test/scheduler/test_scheduling_decisions.cpp b/tests/test/scheduler/test_scheduling_decisions.cpp
index 9e20438a9..a016698bb 100644
--- a/tests/test/scheduler/test_scheduling_decisions.cpp
+++ b/tests/test/scheduler/test_scheduling_decisions.cpp
@@ -203,6 +203,34 @@ TEST_CASE_METHOD(SchedulingDecisionTestFixture,
     testActualSchedulingDecision(req, config);
 }
 
+TEST_CASE_METHOD(SchedulingDecisionTestFixture,
+                 "Test scheduling decision skips fully occupied worker hosts",
+                 "[scheduler]")
+{
+    SchedulingConfig config = {
+        .hosts = { masterHost, "hostA", "hostB" },
+        .slots = { 2, 0, 2 },
+        .numReqs = 4,
+        .topologyHint = faabric::util::SchedulingTopologyHint::NORMAL,
+        .expectedHosts = { masterHost, masterHost, "hostB", "hostB" },
+    };
+
+    auto req = faabric::util::batchExecFactory("foo", "bar", config.numReqs);
+
+    SECTION("No topology hint")
+    {
+        config.topologyHint = faabric::util::SchedulingTopologyHint::NORMAL;
+    }
+
+    SECTION("Never alone topology hint")
+    {
+        config.topologyHint =
+          faabric::util::SchedulingTopologyHint::NEVER_ALONE;
+    }
+
+    testActualSchedulingDecision(req, config);
+}
+
 TEST_CASE_METHOD(SchedulingDecisionTestFixture,
                  "Test scheduling decision with many requests",
                  "[scheduler]")
@@ -240,6 +268,28 @@ TEST_CASE_METHOD(SchedulingDecisionTestFixture,
         };
     }
 
+    SECTION("Decreasing to one and increasing slot distribution")
+    {
+        config.slots = { 2, 2, 1, 2 };
+
+        SECTION("No topology hint")
+        {
+            config.topologyHint = faabric::util::SchedulingTopologyHint::NORMAL;
+            config.expectedHosts = {
+                masterHost, masterHost, "hostA", "hostA",
+                "hostB",    "hostC",    "hostC", masterHost
+            };
+        }
+
+        SECTION("Never alone topology hint")
+        {
+            config.topologyHint =
+              faabric::util::SchedulingTopologyHint::NEVER_ALONE;
+            config.expectedHosts = { masterHost, masterHost, "hostA", "hostA",
+                                     "hostC",    "hostC",    "hostC", "hostC" };
+        }
+    }
+
     testActualSchedulingDecision(req, config);
 }
 
@@ -263,14 +313,6 @@ TEST_CASE_METHOD(SchedulingDecisionTestFixture,
         req = faabric::util::batchExecFactory("foo", "bar", config.numReqs);
     }
 
-    SECTION("Test hint does not apply for master requests")
-    {
-        config.slots = { 0, 1 };
-        config.numReqs = 1;
-        config.expectedHosts = { "hostA" };
-        req = faabric::util::batchExecFactory("foo", "bar", config.numReqs);
-    }
-
     SECTION("Test with hint we may overload remote hosts")
     {
         config.hosts = { masterHost, "hostA", "hostB" };
         config.numReqs = 5;
         config.slots = { 2, 2, 1 };
         config.expectedHosts = {
             masterHost, masterHost, "hostA", "hostA", "hostA"
         };
         req = faabric::util::batchExecFactory("foo", "bar", config.numReqs);
     }
 
-    SECTION("Test with hint we still overload master if running out of slots")
+    SECTION(
+      "Test with hint we still overload correctly if running out of slots")
     {
         config.hosts = { masterHost, "hostA" };
         config.numReqs = 5;
         config.slots = { 2, 2 };
         config.expectedHosts = {
-            masterHost, masterHost, "hostA", "hostA", masterHost
+            masterHost, masterHost, "hostA", "hostA", "hostA"
         };
         req = faabric::util::batchExecFactory("foo", "bar", config.numReqs);
     }
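The overflow rule above moves from always overloading the master to overloading the last host that received part of the batch when NEVER_ALONE is set. Restated as a standalone function for illustration (the parameters mirror, but are not, the Scheduler's members):

#include <string>
#include <vector>

// Mirrors the overflow choice in Scheduler::makeSchedulingDecision after
// this patch: with NEVER_ALONE, leftover requests pile onto the last host
// that already received part of the batch; otherwise onto this host
std::string chooseOverloadedHost(const std::vector<std::string>& hosts,
                                 const std::string& thisHost,
                                 bool neverAlone)
{
    if (neverAlone && !hosts.empty()) {
        return hosts.back();
    }

    return thisHost;
}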
From a325bc6abdb9134ab410e1da4f0ed129a3039cc0 Mon Sep 17 00:00:00 2001
From: Carlos Segarra
Date: Thu, 25 Nov 2021 09:54:39 +0000
Subject: [PATCH 9/9] pr comments

---
 include/faabric/util/scheduling.h      | 3 +--
 src/scheduler/Scheduler.cpp            | 4 +++-
 tests/test/scheduler/test_executor.cpp | 2 +-
 3 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/include/faabric/util/scheduling.h b/include/faabric/util/scheduling.h
index 1ef289331..0f2a6a64d 100644
--- a/include/faabric/util/scheduling.h
+++ b/include/faabric/util/scheduling.h
@@ -47,8 +47,7 @@ class SchedulingDecision
 // - NORMAL: bin-packs requests to slots in hosts starting from the master
 // host, and overloads the master if it runs out of resources.
 // - NEVER_ALONE: never allocates a single (non-master) request to a host
-// without
-// other requests of the batch.
+// without other requests of the batch.
 enum SchedulingTopologyHint
 {
     NORMAL,
diff --git a/src/scheduler/Scheduler.cpp b/src/scheduler/Scheduler.cpp
index 09bad3d56..97f50d556 100644
--- a/src/scheduler/Scheduler.cpp
+++ b/src/scheduler/Scheduler.cpp
@@ -360,9 +360,11 @@ faabric::util::SchedulingDecision Scheduler::makeSchedulingDecision(
         if (remainder > 0) {
             std::string overloadedHost = thisHost;
 
+            // Under the NEVER_ALONE scheduling topology hint we want to
+            // overload the last host we assigned requests to.
             if (topologyHint ==
                   faabric::util::SchedulingTopologyHint::NEVER_ALONE &&
-                hosts.size() > 0) {
+                !hosts.empty()) {
                 overloadedHost = hosts.back();
             }
 
diff --git a/tests/test/scheduler/test_executor.cpp b/tests/test/scheduler/test_executor.cpp
index 9c79ebff3..5ef4f1ef5 100644
--- a/tests/test/scheduler/test_executor.cpp
+++ b/tests/test/scheduler/test_executor.cpp
@@ -33,7 +33,7 @@ TestExecutor::TestExecutor(faabric::Message& msg)
   : Executor(msg)
 {}
 
-TestExecutor::~TestExecutor() {}
+TestExecutor::~TestExecutor() = default;
 
 void TestExecutor::postFinish()
 {