diff --git a/include/faabric/scheduler/Scheduler.h b/include/faabric/scheduler/Scheduler.h
index 48da69b86..2f79e5df6 100644
--- a/include/faabric/scheduler/Scheduler.h
+++ b/include/faabric/scheduler/Scheduler.h
@@ -102,7 +102,9 @@ class Scheduler
     faabric::util::SchedulingDecision callFunctions(
       std::shared_ptr<faabric::BatchExecuteRequest> req,
-      bool forceLocal = false);
+      bool forceLocal = false,
+      faabric::util::SchedulingTopologyHint =
+        faabric::util::SchedulingTopologyHint::NORMAL);
 
     faabric::util::SchedulingDecision callFunctions(
       std::shared_ptr<faabric::BatchExecuteRequest> req,
@@ -177,6 +179,11 @@ class Scheduler
 
     void clearRecordedMessages();
 
+    faabric::util::SchedulingDecision publicMakeSchedulingDecision(
+      std::shared_ptr<faabric::BatchExecuteRequest> req,
+      bool forceLocal,
+      faabric::util::SchedulingTopologyHint topologyHint);
+
     // ----------------------------------
     // Exec graph
     // ----------------------------------
@@ -233,7 +240,8 @@ class Scheduler
 
     faabric::util::SchedulingDecision makeSchedulingDecision(
       std::shared_ptr<faabric::BatchExecuteRequest> req,
-      bool forceLocal);
+      bool forceLocal,
+      faabric::util::SchedulingTopologyHint topologyHint);
 
     faabric::util::SchedulingDecision doCallFunctions(
       std::shared_ptr<faabric::BatchExecuteRequest> req,
diff --git a/include/faabric/util/scheduling.h b/include/faabric/util/scheduling.h
index 4c8d8a5ce..1c84a453e 100644
--- a/include/faabric/util/scheduling.h
+++ b/include/faabric/util/scheduling.h
@@ -41,4 +41,16 @@ class SchedulingDecision
       int32_t appIdx,
       int32_t groupIdx);
 };
+
+// Scheduling topology hints help the scheduler decide which host to assign
+// each request in a batch to.
+// - NORMAL: bin-packs requests into the slots of each host, starting from
+//   the master host, and overloads the master if it runs out of resources.
+// - PAIRS: never allocates a single (non-master) request to a host on its
+//   own; a host gets either no requests of the batch, or at least two.
+enum SchedulingTopologyHint
+{
+    NORMAL,
+    PAIRS
+};
 }
diff --git a/src/scheduler/Scheduler.cpp b/src/scheduler/Scheduler.cpp
index 946700919..abb9e6477 100644
--- a/src/scheduler/Scheduler.cpp
+++ b/src/scheduler/Scheduler.cpp
@@ -210,7 +210,8 @@ void Scheduler::notifyExecutorShutdown(Executor* exec,
 
 faabric::util::SchedulingDecision Scheduler::callFunctions(
   std::shared_ptr<faabric::BatchExecuteRequest> req,
-  bool forceLocal)
+  bool forceLocal,
+  faabric::util::SchedulingTopologyHint topologyHint)
 {
     // Note, we assume all the messages are for the same function and have the
     // same master host
@@ -236,7 +237,8 @@ faabric::util::SchedulingDecision Scheduler::callFunctions(
 
     faabric::util::FullLock lock(mx);
 
-    SchedulingDecision decision = makeSchedulingDecision(req, forceLocal);
+    SchedulingDecision decision =
+      makeSchedulingDecision(req, forceLocal, topologyHint);
 
     // Send out point-to-point mappings if necessary (unless being forced to
     // execute locally, in which case they will be transmitted from the
@@ -249,9 +251,22 @@ faabric::util::SchedulingDecision Scheduler::callFunctions(
     return doCallFunctions(req, decision, lock);
 }
 
+faabric::util::SchedulingDecision Scheduler::publicMakeSchedulingDecision(
+  std::shared_ptr<faabric::BatchExecuteRequest> req,
+  bool forceLocal,
+  faabric::util::SchedulingTopologyHint topologyHint)
+{
+    if (!faabric::util::isTestMode()) {
+        throw std::runtime_error("This function must only be called in tests");
+    }
+
+    return makeSchedulingDecision(req, forceLocal, topologyHint);
+}
+
 faabric::util::SchedulingDecision Scheduler::makeSchedulingDecision(
   std::shared_ptr<faabric::BatchExecuteRequest> req,
-  bool forceLocal)
+  bool forceLocal,
+  faabric::util::SchedulingTopologyHint topologyHint)
 {
     int nMessages = req->messages_size();
     faabric::Message& firstMsg = req->mutable_messages()->at(0);
@@ -296,8 +311,20 @@ faabric::util::SchedulingDecision Scheduler::makeSchedulingDecision(
         int available = r.slots() - r.usedslots();
         int nOnThisHost = std::min(available, remainder);
 
-        for (int i = 0; i < nOnThisHost; i++) {
-            hosts.push_back(h);
+        // Under the pairs topology hint, we never allocate a single
+        // non-master request to a host that gets no other requests
+        // of the batch
+        bool stickToPreviousHost =
+          (topologyHint == faabric::util::SchedulingTopologyHint::PAIRS &&
+           nOnThisHost == 1 && hosts.size() > 0);
+
+        if (stickToPreviousHost) {
+            hosts.push_back(hosts.back());
+        } else {
+            for (int i = 0; i < nOnThisHost; i++) {
+                hosts.push_back(h);
+            }
         }
 
         remainder -= nOnThisHost;
@@ -323,13 +350,22 @@ faabric::util::SchedulingDecision Scheduler::makeSchedulingDecision(
             int available = r.slots() - r.usedslots();
             int nOnThisHost = std::min(available, remainder);
 
-            // Register the host if it's exected a function
-            if (nOnThisHost > 0) {
-                registeredHosts[funcStr].insert(h);
-            }
+            bool stickToPreviousHost =
+              (topologyHint == faabric::util::SchedulingTopologyHint::PAIRS &&
+               nOnThisHost == 1 && hosts.size() > 0);
 
-            for (int i = 0; i < nOnThisHost; i++) {
-                hosts.push_back(h);
+            if (stickToPreviousHost) {
+                hosts.push_back(hosts.back());
+            } else {
+                // Register the host if it's executed a function
+                if (nOnThisHost > 0) {
+                    registeredHosts[funcStr].insert(h);
+                }
+
+                for (int i = 0; i < nOnThisHost; i++) {
+                    hosts.push_back(h);
+                }
             }
 
             remainder -= nOnThisHost;
diff --git a/tests/test/util/test_scheduling.cpp b/tests/test/util/test_scheduling.cpp
index d3b45be34..2d52b6208 100644
--- a/tests/test/util/test_scheduling.cpp
+++ b/tests/test/util/test_scheduling.cpp
@@ -3,6 +3,8 @@
 #include "faabric_utils.h"
 #include "fixtures.h"
 
+#include <faabric/proto/faabric.pb.h>
+#include <faabric/scheduler/Scheduler.h>
 #include <faabric/util/func.h>
 #include <faabric/util/scheduling.h>
 
@@ -10,7 +12,7 @@ using namespace faabric::util;
 
 namespace tests {
 
-TEST_CASE("Test building scheduling decisions", "[util]")
+TEST_CASE("Test building scheduling decisions", "[util][scheduling-decision]")
 {
     int appId = 123;
     int groupId = 345;
@@ -46,7 +48,7 @@ TEST_CASE("Test building scheduling decisions", "[util]")
 }
 
 TEST_CASE("Test converting point-to-point mappings to scheduling decisions",
-          "[util]")
+          "[util][scheduling-decision]")
 {
     int appId = 123;
     int groupId = 345;
@@ -93,4 +95,255 @@ TEST_CASE("Test converting point-to-point mappings to scheduling decisions",
     REQUIRE(actual.messageIds == expectedMessageIds);
     REQUIRE(actual.hosts == expectedHosts);
 }
+
+class SchedulingDecisionTestFixture
+  : public SchedulerTestFixture
+{
+  public:
+    SchedulingDecisionTestFixture() { faabric::util::setMockMode(true); }
+
+    ~SchedulingDecisionTestFixture() { faabric::util::setMockMode(false); }
+
+  protected:
+    int appId = 123;
+    int groupId = 456;
+    std::string masterHost = faabric::util::getSystemConfig().endpointHost;
+
+    // Helper struct to configure one scheduling decision
+    struct SchedulingConfig
+    {
+        std::vector<std::string> hosts;
+        std::vector<int> slots;
+        int numReqs;
+        bool forceLocal;
+        SchedulingTopologyHint topologyHint;
+        std::vector<std::string> expectedHosts;
+    };
+
+    // Helper method to set the available hosts and slots per host prior to
+    // making a scheduling decision
+    void setHostResources(std::vector<std::string> registeredHosts,
+                          std::vector<int> slotsPerHost)
+    {
+        assert(registeredHosts.size() == slotsPerHost.size());
+        auto& sch = faabric::scheduler::getScheduler();
+        sch.clearRecordedMessages();
+
+        for (int i = 0; i < registeredHosts.size(); i++) {
+            faabric::HostResources resources;
+            resources.set_slots(slotsPerHost.at(i));
+            resources.set_usedslots(0);
+
+            sch.addHostToGlobalSet(registeredHosts.at(i));
+
+            // If setting resources for the master host, update the scheduler.
+            // Otherwise, queue the resource response
+            if (i == 0) {
+                sch.setThisHostResources(resources);
+            } else {
+                faabric::scheduler::queueResourceResponse(registeredHosts.at(i),
+                                                          resources);
+            }
+        }
+    }
+
+    // We request each scheduling decision twice: the first run exercises the
+    // unregistered hosts code path, the second the registered hosts one.
+    void testActualSchedulingDecision(
+      std::shared_ptr<faabric::BatchExecuteRequest> req,
+      const SchedulingConfig& config)
+    {
+        auto& sch = faabric::scheduler::getScheduler();
+        SchedulingDecision actualDecision(appId, groupId);
+
+        // Set resources for all hosts
+        setHostResources(config.hosts, config.slots);
+
+        // The first time we request the scheduling decision, we follow the
+        // unregistered hosts path
+        actualDecision = sch.publicMakeSchedulingDecision(
+          req, config.forceLocal, config.topologyHint);
+        REQUIRE(actualDecision.hosts == config.expectedHosts);
+
+        // Set resources again to reset the used slots
+        setHostResources(config.hosts, config.slots);
+
+        // The second time we request the scheduling decision, we follow the
+        // registered hosts path
+        actualDecision = sch.publicMakeSchedulingDecision(
+          req, config.forceLocal, config.topologyHint);
+        REQUIRE(actualDecision.hosts == config.expectedHosts);
+    }
+};
+
+TEST_CASE_METHOD(SchedulingDecisionTestFixture,
+                 "Test basic scheduling decision",
+                 "[util][scheduling-decision]")
+{
+    SchedulingConfig config = {
+        .hosts = { masterHost, "hostA" },
+        .slots = { 1, 1 },
+        .numReqs = 2,
+        .forceLocal = false,
+        .topologyHint = SchedulingTopologyHint::NORMAL,
+        .expectedHosts = { masterHost, "hostA" },
+    };
+
+    auto req = batchExecFactory("foo", "bar", config.numReqs);
+
+    testActualSchedulingDecision(req, config);
+}
+
+TEST_CASE_METHOD(SchedulingDecisionTestFixture,
+                 "Test overloading all resources defaults to master",
+                 "[util][scheduling-decision]")
+{
+    SchedulingConfig config = {
+        .hosts = { masterHost, "hostA" },
+        .slots = { 1, 1 },
+        .numReqs = 3,
+        .forceLocal = false,
+        .topologyHint = SchedulingTopologyHint::NORMAL,
+        .expectedHosts = { masterHost, "hostA", masterHost },
+    };
+
+    auto req = batchExecFactory("foo", "bar", config.numReqs);
+
+    testActualSchedulingDecision(req, config);
+}
+
+TEST_CASE_METHOD(SchedulingDecisionTestFixture,
+                 "Test force local forces execution at master",
+                 "[util][scheduling-decision]")
+{
+    SchedulingConfig config = {
+        .hosts = { masterHost, "hostA" },
+        .slots = { 1, 1 },
+        .numReqs = 2,
+        .forceLocal = false,
+        .topologyHint = SchedulingTopologyHint::NORMAL,
+        .expectedHosts = { masterHost, "hostA" },
+    };
+
+    auto req = batchExecFactory("foo", "bar", config.numReqs);
+
+    SECTION("Force local off")
+    {
+        config.forceLocal = false;
+        config.expectedHosts = { masterHost, "hostA" };
+    }
+
+    SECTION("Force local on")
+    {
+        config.forceLocal = true;
+        config.expectedHosts = { masterHost, masterHost };
+    }
+
+    testActualSchedulingDecision(req, config);
+}
+
+TEST_CASE_METHOD(SchedulingDecisionTestFixture,
+                 "Test master running out of resources",
+                 "[util][scheduling-decision]")
+{
+    SchedulingConfig config = {
+        .hosts = { masterHost, "hostA" },
+        .slots = { 0, 2 },
+        .numReqs = 2,
+        .forceLocal = false,
+        .topologyHint = SchedulingTopologyHint::NORMAL,
+        .expectedHosts = { "hostA", "hostA" },
+    };
+
+    auto req = batchExecFactory("foo", "bar", config.numReqs);
+
+    testActualSchedulingDecision(req, config);
+}
+
+TEST_CASE_METHOD(SchedulingDecisionTestFixture,
+                 "Test scheduling decision with many requests",
+                 "[util][scheduling-decision]")
+{
+    SchedulingConfig config = {
+        .hosts = { masterHost, "hostA", "hostB", "hostC" },
+        .slots = { 0, 0, 0, 0 },
+        .numReqs = 8,
+        .forceLocal = false,
+        .topologyHint = SchedulingTopologyHint::NORMAL,
+        .expectedHosts = { masterHost, masterHost, masterHost, masterHost,
+                           masterHost, masterHost, masterHost, masterHost },
+    };
+
+    auto req = batchExecFactory("foo", "bar", config.numReqs);
+
+    SECTION("Even slot distribution across hosts")
+    {
+        config.slots = { 2, 2, 2, 2 };
+        config.expectedHosts = { masterHost, masterHost, "hostA", "hostA",
+                                 "hostB",    "hostB",    "hostC", "hostC" };
+    }
+
+    SECTION("Uneven slot distribution across hosts")
+    {
+        config.slots = { 3, 2, 2, 1 };
+        config.expectedHosts = { masterHost, masterHost, masterHost, "hostA",
+                                 "hostA",    "hostB",    "hostB",    "hostC" };
+    }
+
+    SECTION("Very uneven slot distribution across hosts")
+    {
+        config.slots = { 1, 0, 0, 0 };
+        config.expectedHosts = {
+            masterHost, masterHost, masterHost, masterHost,
+            masterHost, masterHost, masterHost, masterHost
+        };
+    }
+
+    testActualSchedulingDecision(req, config);
+}
+
+TEST_CASE_METHOD(SchedulingDecisionTestFixture,
+                 "Test sticky pairs scheduling topology hint",
+                 "[util][scheduling-decision]")
+{
+    SchedulingConfig config = {
+        .hosts = { masterHost, "hostA" },
+        .slots = { 1, 1 },
+        .numReqs = 2,
+        .forceLocal = false,
+        .topologyHint = SchedulingTopologyHint::PAIRS,
+        .expectedHosts = { masterHost, "hostA" },
+    };
+
+    std::shared_ptr<faabric::BatchExecuteRequest> req;
+
+    SECTION("With the hint, we only schedule full pairs to new hosts")
+    {
+        config.expectedHosts = { masterHost, masterHost };
+        req = batchExecFactory("foo", "bar", config.numReqs);
+    }
+
+    SECTION("With the hint, we may overload remote hosts")
+    {
+        config.hosts = { masterHost, "hostA", "hostB" };
+        config.numReqs = 5;
+        config.slots = { 2, 2, 1 };
+        config.expectedHosts = {
+            masterHost, masterHost, "hostA", "hostA", "hostA"
+        };
+        req = batchExecFactory("foo", "bar", config.numReqs);
+    }
+
+    SECTION("With the hint, we still overload the master if we run out of slots")
+    {
+        config.hosts = { masterHost, "hostA" };
+        config.numReqs = 5;
+        config.slots = { 2, 2 };
+        config.expectedHosts = {
+            masterHost, masterHost, "hostA", "hostA", masterHost
+        };
+        req = batchExecFactory("foo", "bar", config.numReqs);
+    }
+
+    testActualSchedulingDecision(req, config);
+}
 }
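
Usage sketch (not part of the diff): how a caller might exercise the new
topology hint through Scheduler::callFunctions, assuming a running scheduler.
The user/function names, batch sizes, and host layout are illustrative
assumptions, not values from the change itself.

    #include <faabric/scheduler/Scheduler.h>
    #include <faabric/util/func.h>
    #include <faabric/util/scheduling.h>

    void dispatchWithTopologyHints()
    {
        auto& sch = faabric::scheduler::getScheduler();

        // Two identical batches of five messages for the same function
        // ("demo/echo" is a made-up function for illustration)
        auto reqA = faabric::util::batchExecFactory("demo", "echo", 5);
        auto reqB = faabric::util::batchExecFactory("demo", "echo", 5);

        // Default NORMAL hint: bin-pack from the master host outwards,
        // overloading the master once all remote slots are used
        faabric::util::SchedulingDecision normal = sch.callFunctions(reqA);

        // PAIRS hint: a remote host never receives a lone request; an odd
        // trailing request is co-located with the previous one (or the
        // master) rather than stranded alone on a new host
        faabric::util::SchedulingDecision pairs = sch.callFunctions(
          reqB, false, faabric::util::SchedulingTopologyHint::PAIRS);
    }

The hints only diverge when a remote host would otherwise receive exactly one
request: with slots { 1, 1 } and a batch of two, NORMAL yields { master,
hostA } while PAIRS keeps the pair together as { master, master }, as the
sticky-pairs test above asserts.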
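To trace the expected placements by hand, the following standalone sketch
models the PAIRS packing loop added in Scheduler.cpp. It is a simplified
assumption-laden model (single pass over hosts with free-slot counts, master
first), not scheduler code, and packPairs is a hypothetical helper.

    #include <algorithm>
    #include <cassert>
    #include <string>
    #include <vector>

    std::vector<std::string> packPairs(const std::vector<std::string>& hostNames,
                                       const std::vector<int>& freeSlots,
                                       int nRequests)
    {
        std::vector<std::string> placement;
        int remainder = nRequests;

        for (size_t i = 0; i < hostNames.size() && remainder > 0; i++) {
            int nOnThisHost = std::min(freeSlots[i], remainder);

            // Never strand a single request on a non-master host: double
            // up on the previously used host instead
            if (nOnThisHost == 1 && !placement.empty()) {
                placement.push_back(placement.back());
            } else {
                for (int j = 0; j < nOnThisHost; j++) {
                    placement.push_back(hostNames[i]);
                }
            }
            remainder -= nOnThisHost;
        }

        // Overload the master (first host) with anything left over
        for (; remainder > 0; remainder--) {
            placement.push_back(hostNames.front());
        }
        return placement;
    }

    int main()
    {
        // Mirrors "we may overload remote hosts": slots { 2, 2, 1 }, five
        // requests; hostB's lone slot is skipped in favour of hostA
        auto p = packPairs({ "master", "hostA", "hostB" }, { 2, 2, 1 }, 5);
        assert((p == std::vector<std::string>{
                  "master", "master", "hostA", "hostA", "hostA" }));
    }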