Skip to content

Commit

Permalink
Add Scheduling Topology Hints (#180)
Browse files Browse the repository at this point in the history
* add scheduling topology hint and extensive testing

* adding a couple more tests

* move tests to scheduler folder and change tags

* move to callFunctions and remove publicMakeSchedulingDecision

* add check for the recorded messages

* refactor hint from PAIRS to NEVER_ALONE

* set force local as a topology hint

* change overloading logic as discussed offline

* pr comments
  • Loading branch information
csegarragonz committed Nov 25, 2021
1 parent 4930c61 commit c7454fa
Show file tree
Hide file tree
Showing 8 changed files with 654 additions and 211 deletions.
5 changes: 3 additions & 2 deletions include/faabric/scheduler/Scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ class Scheduler

faabric::util::SchedulingDecision callFunctions(
std::shared_ptr<faabric::BatchExecuteRequest> req,
bool forceLocal = false);
faabric::util::SchedulingTopologyHint =
faabric::util::SchedulingTopologyHint::NORMAL);

faabric::util::SchedulingDecision callFunctions(
std::shared_ptr<faabric::BatchExecuteRequest> req,
Expand Down Expand Up @@ -233,7 +234,7 @@ class Scheduler

faabric::util::SchedulingDecision makeSchedulingDecision(
std::shared_ptr<faabric::BatchExecuteRequest> req,
bool forceLocal);
faabric::util::SchedulingTopologyHint topologyHint);

faabric::util::SchedulingDecision doCallFunctions(
std::shared_ptr<faabric::BatchExecuteRequest> req,
Expand Down
13 changes: 13 additions & 0 deletions include/faabric/util/scheduling.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,17 @@ class SchedulingDecision
int32_t appIdx,
int32_t groupIdx);
};

// Scheduling topology hints help the scheduler decide which host to assign new
// requests in a batch.
// - NORMAL: bin-packs requests to slots in hosts starting from the master
// host, and overloads the master if it runs out of resources.
// - FORCE_LOCAL: assigns every request in the batch to the host that receives
// the call, without forwarding the request back to the master host.
// - NEVER_ALONE: never allocates a single (non-master) request to a host
// without other requests of the batch. If the system runs out of capacity,
// the remaining requests overload the last host that was assigned requests
// (or the current host if no host has been assigned any yet).
enum SchedulingTopologyHint
{
// Default hint, used when callers do not pass one explicitly
NORMAL,
// Execute the whole batch on this host
FORCE_LOCAL,
// Only pick a host if at least two requests of the batch fit on it
NEVER_ALONE
};
}
2 changes: 1 addition & 1 deletion src/scheduler/FunctionCallServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ void FunctionCallServer::recvExecuteFunctions(const uint8_t* buffer,
// This host has now been told to execute these functions no matter what
// TODO - avoid this copy
scheduler.callFunctions(std::make_shared<faabric::BatchExecuteRequest>(msg),
true);
faabric::util::SchedulingTopologyHint::FORCE_LOCAL);
}

void FunctionCallServer::recvUnregister(const uint8_t* buffer,
Expand Down
55 changes: 45 additions & 10 deletions src/scheduler/Scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ void Scheduler::notifyExecutorShutdown(Executor* exec,

faabric::util::SchedulingDecision Scheduler::callFunctions(
std::shared_ptr<faabric::BatchExecuteRequest> req,
bool forceLocal)
faabric::util::SchedulingTopologyHint topologyHint)
{
// Note, we assume all the messages are for the same function and have the
// same master host
Expand All @@ -224,7 +224,8 @@ faabric::util::SchedulingDecision Scheduler::callFunctions(

// If we're not the master host, we need to forward the request back to the
// master host. This will only happen if a nested batch execution happens.
if (!forceLocal && masterHost != thisHost) {
if (topologyHint != faabric::util::SchedulingTopologyHint::FORCE_LOCAL &&
masterHost != thisHost) {
std::string funcStr = faabric::util::funcToString(firstMsg, false);
SPDLOG_DEBUG("Forwarding {} back to master {}", funcStr, masterHost);

Expand All @@ -236,12 +237,13 @@ faabric::util::SchedulingDecision Scheduler::callFunctions(

faabric::util::FullLock lock(mx);

SchedulingDecision decision = makeSchedulingDecision(req, forceLocal);
SchedulingDecision decision = makeSchedulingDecision(req, topologyHint);

// Send out point-to-point mappings if necessary (unless being forced to
// execute locally, in which case they will be transmitted from the
// master)
if (!forceLocal && (firstMsg.groupid() > 0)) {
if (topologyHint != faabric::util::SchedulingTopologyHint::FORCE_LOCAL &&
(firstMsg.groupid() > 0)) {
broker.setAndSendMappingsFromSchedulingDecision(decision);
}

Expand All @@ -251,14 +253,14 @@ faabric::util::SchedulingDecision Scheduler::callFunctions(

faabric::util::SchedulingDecision Scheduler::makeSchedulingDecision(
std::shared_ptr<faabric::BatchExecuteRequest> req,
bool forceLocal)
faabric::util::SchedulingTopologyHint topologyHint)
{
int nMessages = req->messages_size();
faabric::Message& firstMsg = req->mutable_messages()->at(0);
std::string funcStr = faabric::util::funcToString(firstMsg, false);

std::vector<std::string> hosts;
if (forceLocal) {
if (topologyHint == faabric::util::SchedulingTopologyHint::FORCE_LOCAL) {
// We're forced to execute locally here so we do all the messages
for (int i = 0; i < nMessages; i++) {
hosts.push_back(thisHost);
Expand Down Expand Up @@ -296,6 +298,14 @@ faabric::util::SchedulingDecision Scheduler::makeSchedulingDecision(
int available = r.slots() - r.usedslots();
int nOnThisHost = std::min(available, remainder);

// Under the NEVER_ALONE topology hint, we never choose a host
// unless we can schedule at least two requests in it.
if (topologyHint ==
faabric::util::SchedulingTopologyHint::NEVER_ALONE &&
nOnThisHost < 2) {
continue;
}

for (int i = 0; i < nOnThisHost; i++) {
hosts.push_back(h);
}
Expand Down Expand Up @@ -323,6 +333,12 @@ faabric::util::SchedulingDecision Scheduler::makeSchedulingDecision(
int available = r.slots() - r.usedslots();
int nOnThisHost = std::min(available, remainder);

if (topologyHint ==
faabric::util::SchedulingTopologyHint::NEVER_ALONE &&
nOnThisHost < 2) {
continue;
}

// Register the host if it's executed a function
if (nOnThisHost > 0) {
registeredHosts[funcStr].insert(h);
Expand All @@ -342,11 +358,26 @@ faabric::util::SchedulingDecision Scheduler::makeSchedulingDecision(
// At this point there's no more capacity in the system, so we
// just need to overload locally
if (remainder > 0) {
SPDLOG_DEBUG(
"Overloading {}/{} {} locally", remainder, nMessages, funcStr);
std::string overloadedHost = thisHost;

// Under the NEVER_ALONE scheduling topology hint we want to
// overload the last host we assigned requests to.
if (topologyHint ==
faabric::util::SchedulingTopologyHint::NEVER_ALONE &&
!hosts.empty()) {
overloadedHost = hosts.back();
}

SPDLOG_DEBUG("Overloading {}/{} {} {}",
remainder,
nMessages,
funcStr,
overloadedHost == thisHost
? "locally"
: "to host " + overloadedHost);

for (int i = 0; i < remainder; i++) {
hosts.push_back(thisHost);
hosts.push_back(overloadedHost);
}
}
}
Expand Down Expand Up @@ -636,7 +667,11 @@ void Scheduler::callFunction(faabric::Message& msg, bool forceLocal)
req->set_type(req->FUNCTIONS);

// Make the call
callFunctions(req, forceLocal);
if (forceLocal) {
callFunctions(req, faabric::util::SchedulingTopologyHint::FORCE_LOCAL);
} else {
callFunctions(req);
}
}

void Scheduler::clearRecordedMessages()
Expand Down
Loading

0 comments on commit c7454fa

Please sign in to comment.