Add Scheduling Topology Hints #180

Merged · 9 commits · Nov 25, 2021
5 changes: 3 additions & 2 deletions include/faabric/scheduler/Scheduler.h
@@ -102,7 +102,8 @@ class Scheduler

faabric::util::SchedulingDecision callFunctions(
std::shared_ptr<faabric::BatchExecuteRequest> req,
bool forceLocal = false);
faabric::util::SchedulingTopologyHint =
faabric::util::SchedulingTopologyHint::NORMAL);

faabric::util::SchedulingDecision callFunctions(
std::shared_ptr<faabric::BatchExecuteRequest> req,
@@ -233,7 +234,7 @@ class Scheduler

faabric::util::SchedulingDecision makeSchedulingDecision(
std::shared_ptr<faabric::BatchExecuteRequest> req,
bool forceLocal);
faabric::util::SchedulingTopologyHint topologyHint);

faabric::util::SchedulingDecision doCallFunctions(
std::shared_ptr<faabric::BatchExecuteRequest> req,
13 changes: 13 additions & 0 deletions include/faabric/util/scheduling.h
@@ -41,4 +41,17 @@ class SchedulingDecision
int32_t appIdx,
int32_t groupIdx);
};

// Scheduling topology hints help the scheduler decide which hosts to assign
// new requests in a batch to.
// - NORMAL: bin-packs requests to slots in hosts starting from the master
// host, and overloads the master if it runs out of resources.
// - FORCE_LOCAL: executes all requests in the batch on this host, skipping
// the usual scheduling decision.
// - NEVER_ALONE: never allocates a single (non-master) request to a host
// without other requests of the batch.
enum SchedulingTopologyHint
{
NORMAL,
FORCE_LOCAL,
NEVER_ALONE
};
}
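
For reference, a minimal sketch of how a caller might pass one of these hints when dispatching a batch. The `dispatchBatch` helper and its request set-up are hypothetical; `callFunctions`, `SchedulingTopologyHint`, and the `getScheduler()` accessor are assumed from faabric's public headers:

#include <memory>

#include <faabric/scheduler/Scheduler.h>
#include <faabric/util/scheduling.h>

// Hypothetical helper: dispatch a batch while asking the scheduler to keep
// every request of the batch co-located with at least one sibling
faabric::util::SchedulingDecision dispatchBatch(
  std::shared_ptr<faabric::BatchExecuteRequest> req)
{
    auto& sch = faabric::scheduler::getScheduler();

    // Omitting the second argument falls back to the NORMAL default
    return sch.callFunctions(
      req, faabric::util::SchedulingTopologyHint::NEVER_ALONE);
}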
2 changes: 1 addition & 1 deletion src/scheduler/FunctionCallServer.cpp
@@ -77,7 +77,7 @@ void FunctionCallServer::recvExecuteFunctions(const uint8_t* buffer,
// This host has now been told to execute these functions no matter what
// TODO - avoid this copy
scheduler.callFunctions(std::make_shared<faabric::BatchExecuteRequest>(msg),
true);
faabric::util::SchedulingTopologyHint::FORCE_LOCAL);
}

void FunctionCallServer::recvUnregister(const uint8_t* buffer,
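The literal `true` replaced above was the old `forceLocal` flag. Spelled as a hint, the call site now documents itself; a before/after comparison reconstructed from this diff:

// Before: an opaque boolean meaning "don't forward back to the master"
scheduler.callFunctions(std::make_shared<faabric::BatchExecuteRequest>(msg),
                        true);

// After: the same intent, explicit at the call site
scheduler.callFunctions(std::make_shared<faabric::BatchExecuteRequest>(msg),
                        faabric::util::SchedulingTopologyHint::FORCE_LOCAL);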
55 changes: 45 additions & 10 deletions src/scheduler/Scheduler.cpp
@@ -210,7 +210,7 @@ void Scheduler::notifyExecutorShutdown(Executor* exec,

faabric::util::SchedulingDecision Scheduler::callFunctions(
std::shared_ptr<faabric::BatchExecuteRequest> req,
bool forceLocal)
faabric::util::SchedulingTopologyHint topologyHint)
{
// Note, we assume all the messages are for the same function and have the
// same master host
@@ -224,7 +224,8 @@ faabric::util::SchedulingDecision Scheduler::callFunctions(

// If we're not the master host, we need to forward the request back to the
// master host. This will only happen if a nested batch execution happens.
if (!forceLocal && masterHost != thisHost) {
if (topologyHint != faabric::util::SchedulingTopologyHint::FORCE_LOCAL &&
masterHost != thisHost) {
std::string funcStr = faabric::util::funcToString(firstMsg, false);
SPDLOG_DEBUG("Forwarding {} back to master {}", funcStr, masterHost);

@@ -236,12 +237,13 @@ faabric::util::SchedulingDecision Scheduler::callFunctions(

faabric::util::FullLock lock(mx);

SchedulingDecision decision = makeSchedulingDecision(req, forceLocal);
SchedulingDecision decision = makeSchedulingDecision(req, topologyHint);

// Send out point-to-point mappings if necessary (unless being forced to
// execute locally, in which case they will be transmitted from the
// master)
if (!forceLocal && (firstMsg.groupid() > 0)) {
if (topologyHint != faabric::util::SchedulingTopologyHint::FORCE_LOCAL &&
(firstMsg.groupid() > 0)) {
broker.setAndSendMappingsFromSchedulingDecision(decision);
}

@@ -251,14 +253,14 @@ faabric::util::SchedulingDecision Scheduler::callFunctions(

faabric::util::SchedulingDecision Scheduler::makeSchedulingDecision(
std::shared_ptr<faabric::BatchExecuteRequest> req,
bool forceLocal)
faabric::util::SchedulingTopologyHint topologyHint)
{
int nMessages = req->messages_size();
faabric::Message& firstMsg = req->mutable_messages()->at(0);
std::string funcStr = faabric::util::funcToString(firstMsg, false);

std::vector<std::string> hosts;
if (forceLocal) {
if (topologyHint == faabric::util::SchedulingTopologyHint::FORCE_LOCAL) {
// We're forced to execute locally here, so all messages run on this host
for (int i = 0; i < nMessages; i++) {
hosts.push_back(thisHost);
@@ -296,6 +298,14 @@ faabric::util::SchedulingDecision Scheduler::makeSchedulingDecision(
int available = r.slots() - r.usedslots();
int nOnThisHost = std::min(available, remainder);

// Under the NEVER_ALONE topology hint, we never choose a host
// unless we can schedule at least two requests on it.
if (topologyHint ==
faabric::util::SchedulingTopologyHint::NEVER_ALONE &&
nOnThisHost < 2) {
continue;
}

for (int i = 0; i < nOnThisHost; i++) {
hosts.push_back(h);
}
@@ -323,6 +333,12 @@ faabric::util::SchedulingDecision Scheduler::makeSchedulingDecision(
int available = r.slots() - r.usedslots();
int nOnThisHost = std::min(available, remainder);

if (topologyHint ==
faabric::util::SchedulingTopologyHint::NEVER_ALONE &&
nOnThisHost < 2) {
continue;
}

// Register the host if it's expected to execute a function
if (nOnThisHost > 0) {
registeredHosts[funcStr].insert(h);
@@ -342,11 +358,26 @@ faabric::util::SchedulingDecision Scheduler::makeSchedulingDecision(
// At this point there's no more capacity in the system, so we
// just need to overload one of the hosts
if (remainder > 0) {
SPDLOG_DEBUG(
"Overloading {}/{} {} locally", remainder, nMessages, funcStr);
std::string overloadedHost = thisHost;

// Under the NEVER_ALONE scheduling topology hint we want to
// overload the last host we assigned requests to.
if (topologyHint ==
faabric::util::SchedulingTopologyHint::NEVER_ALONE &&
!hosts.empty()) {
overloadedHost = hosts.back();
}

SPDLOG_DEBUG("Overloading {}/{} {} {}",
remainder,
nMessages,
funcStr,
overloadedHost == thisHost
? "locally"
: "to host " + overloadedHost);

for (int i = 0; i < remainder; i++) {
hosts.push_back(thisHost);
hosts.push_back(overloadedHost);
}
}
}
@@ -636,7 +667,11 @@ void Scheduler::callFunction(faabric::Message& msg, bool forceLocal)
req->set_type(req->FUNCTIONS);

// Make the call
callFunctions(req, forceLocal);
if (forceLocal) {
callFunctions(req, faabric::util::SchedulingTopologyHint::FORCE_LOCAL);
} else {
callFunctions(req);
}
}

void Scheduler::clearRecordedMessages()
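
To make the NEVER_ALONE path above concrete, here is a self-contained sketch of the placement loop. The `Host` struct, host names, and slot counts are invented for illustration; the real scheduler also distinguishes the master host, registered hosts, and snapshot state:

#include <algorithm>
#include <string>
#include <vector>

struct Host
{
    std::string name;
    int freeSlots;
};

// Sketch of NEVER_ALONE placement: skip any host that could only take a
// single request, then push the leftover requests onto the last host we
// chose rather than the master
std::vector<std::string> placeNeverAlone(int nMessages,
                                         const std::vector<Host>& candidates,
                                         const std::string& masterHost)
{
    std::vector<std::string> placement;
    int remainder = nMessages;

    for (const auto& h : candidates) {
        int nOnThisHost = std::min(h.freeSlots, remainder);

        // Never leave a single request of the batch alone on a host
        if (nOnThisHost < 2) {
            continue;
        }

        for (int i = 0; i < nOnThisHost; i++) {
            placement.push_back(h.name);
        }

        remainder -= nOnThisHost;
        if (remainder == 0) {
            break;
        }
    }

    // No capacity left: overload the last host used, falling back to the
    // master if nothing was scheduled at all (copy by value, since pushing
    // into the vector below may invalidate references into it)
    std::string overloadedHost =
      placement.empty() ? masterHost : placement.back();
    for (int i = 0; i < remainder; i++) {
        placement.push_back(overloadedHost);
    }

    return placement;
}

For example, with three hosts offering {3, 1, 2} free slots and a batch of 7, the single-slot host is skipped, five requests fill the first and third hosts, and the remaining two overload the third.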