Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initialise point-to-point mappings from scheduler output (groundwork) #160

Merged
merged 14 commits into from
Oct 25, 2021
5 changes: 3 additions & 2 deletions include/faabric/scheduler/Scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <faabric/util/config.h>
#include <faabric/util/func.h>
#include <faabric/util/queue.h>
#include <faabric/util/scheduling.h>
#include <faabric/util/snapshot.h>
#include <faabric/util/timing.h>

Expand Down Expand Up @@ -99,7 +100,7 @@ class Scheduler

void callFunction(faabric::Message& msg, bool forceLocal = false);

std::vector<std::string> callFunctions(
faabric::util::SchedulingDecision callFunctions(
std::shared_ptr<faabric::BatchExecuteRequest> req,
bool forceLocal = false);

Expand Down Expand Up @@ -233,7 +234,7 @@ class Scheduler
int scheduleFunctionsOnHost(
const std::string& host,
std::shared_ptr<faabric::BatchExecuteRequest> req,
std::vector<std::string>& records,
faabric::util::SchedulingDecision& decision,
int offset,
faabric::util::SnapshotData* snapshot);
};
Expand Down
13 changes: 10 additions & 3 deletions include/faabric/transport/PointToPointBroker.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <faabric/scheduler/Scheduler.h>
#include <faabric/transport/PointToPointClient.h>
#include <faabric/util/scheduling.h>

#include <set>
#include <shared_mutex>
Expand All @@ -17,11 +18,13 @@ class PointToPointBroker

std::string getHostForReceiver(int appId, int recvIdx);

void setHostForReceiver(int appId, int recvIdx, const std::string& host);
std::set<std::string> setUpLocalMappingsFromSchedulingDecision(
const faabric::util::SchedulingDecision& decision);

void broadcastMappings(int appId);
void setAndSendMappingsFromSchedulingDecision(
const faabric::util::SchedulingDecision& decision);

void sendMappings(int appId, const std::string& host);
void waitForMappingsOnThisHost(int appId);

std::set<int> getIdxsRegisteredForApp(int appId);

Expand All @@ -43,6 +46,10 @@ class PointToPointBroker
std::unordered_map<int, std::set<int>> appIdxs;
std::unordered_map<std::string, std::string> mappings;

std::unordered_map<int, bool> appMappingsFlags;
std::unordered_map<int, std::mutex> appMappingMutexes;
std::unordered_map<int, std::condition_variable> appMappingCvs;

std::shared_ptr<PointToPointClient> getClient(const std::string& host);

faabric::scheduler::Scheduler& sch;
Expand Down
35 changes: 35 additions & 0 deletions include/faabric/util/scheduling.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#pragma once

#include <cstdint>
#include <string>
#include <vector>

#include <faabric/proto/faabric.pb.h>

namespace faabric::util {

// Records the outcome of scheduling a batch of messages: for each message in
// the batch, which host it was assigned to. Returned by
// Scheduler::callFunctions and consumed by the point-to-point broker to set
// up receiver mappings.
class SchedulingDecision
{
  public:
    // Reconstructs a decision from PointToPointMappings received over the
    // wire (see the PointToPointMappings message in faabric.proto).
    static SchedulingDecision fromPointToPointMappings(
      faabric::PointToPointMappings& mappings);

    SchedulingDecision(uint32_t appIdIn);

    // Id of the application this decision belongs to
    uint32_t appId = 0;

    // Number of messages scheduled in this decision
    int32_t nFunctions = 0;

    // Per-message records — presumably parallel vectors kept in lock-step by
    // addMessage/addDecision (implementation not visible here): entry i is
    // the message id / chosen host / app index of the i-th scheduled message
    std::vector<int32_t> messageIds;
    std::vector<std::string> hosts;

    std::vector<int32_t> appIdxs;

    // Set to the master host when the request is forwarded there rather than
    // scheduled locally (see Scheduler::callFunctions)
    std::string returnHost;

    // Appends a record assigning the given message to the given host
    void addMessage(const std::string& host, const faabric::Message& msg);

    // Appends a record from its raw components
    void addDecision(const std::string& host,
                     int32_t messageId,
                     int32_t appIdx);
};
}
11 changes: 7 additions & 4 deletions src/proto/faabric.proto
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,13 @@ message PointToPointMessage {
}

// Broadcast payload describing, for one application, which host holds each
// point-to-point receiver. The scraped diff had merged the removed and added
// lines into one invalid message (duplicate `host` and `mappings` fields);
// this is the reconstructed post-change definition.
message PointToPointMappings {
    // Application these mappings belong to (hoisted out of the nested
    // message so it is sent once per batch rather than once per mapping)
    int32 appId = 1;

    // One receiver mapping: the host holding the receiver, the id of the
    // message scheduled there, and the receiver's app index
    message PointToPointMapping {
        string host = 1;
        int32 messageId = 2;
        int32 recvIdx = 3;
    }

    repeated PointToPointMapping mappings = 2;
}
4 changes: 3 additions & 1 deletion src/scheduler/MpiWorld.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <faabric/util/func.h>
#include <faabric/util/gids.h>
#include <faabric/util/macros.h>
#include <faabric/util/scheduling.h>
#include <faabric/util/testing.h>

// Each MPI rank runs in a separate thread, thus we use TLS to maintain the
Expand Down Expand Up @@ -199,7 +200,8 @@ void MpiWorld::create(const faabric::Message& call, int newId, int newSize)
std::vector<std::string> executedAt;
if (size > 1) {
// Send the init messages (note that message i corresponds to rank i+1)
executedAt = sch.callFunctions(req);
faabric::util::SchedulingDecision decision = sch.callFunctions(req);
executedAt = decision.hosts;
}
assert(executedAt.size() == size - 1);

Expand Down
28 changes: 16 additions & 12 deletions src/scheduler/Scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <faabric/util/logging.h>
#include <faabric/util/memory.h>
#include <faabric/util/random.h>
#include <faabric/util/scheduling.h>
#include <faabric/util/snapshot.h>
#include <faabric/util/testing.h>
#include <faabric/util/timing.h>
Expand Down Expand Up @@ -202,14 +203,13 @@ void Scheduler::notifyExecutorShutdown(Executor* exec,
}
}

std::vector<std::string> Scheduler::callFunctions(
faabric::util::SchedulingDecision Scheduler::callFunctions(
std::shared_ptr<faabric::BatchExecuteRequest> req,
bool forceLocal)
{
// Extract properties of the request
int nMessages = req->messages_size();
bool isThreads = req->type() == faabric::BatchExecuteRequest::THREADS;
std::vector<std::string> executed(nMessages);

// Note, we assume all the messages are for the same function and have the
// same master host
Expand All @@ -222,6 +222,9 @@ std::vector<std::string> Scheduler::callFunctions(
throw std::runtime_error("Message with no master host");
}

// Set up scheduling decision
SchedulingDecision decision(firstMsg.appid());

// TODO - more granular locking, this is incredibly conservative
faabric::util::FullLock lock(mx);

Expand All @@ -233,14 +236,15 @@ std::vector<std::string> Scheduler::callFunctions(
"Forwarding {} {} back to master {}", nMessages, funcStr, masterHost);

getFunctionCallClient(masterHost).executeFunctions(req);
return executed;
decision.returnHost = masterHost;
return decision;
}

if (forceLocal) {
// We're forced to execute locally here so we do all the messages
for (int i = 0; i < nMessages; i++) {
localMessageIdxs.emplace_back(i);
executed.at(i) = thisHost;
decision.addMessage(thisHost, req->messages().at(i));
}
} else {
// At this point we know we're the master host, and we've not been
Expand Down Expand Up @@ -315,7 +319,7 @@ std::vector<std::string> Scheduler::callFunctions(
"Executing {}/{} {} locally", nLocally, nMessages, funcStr);
for (int i = 0; i < nLocally; i++) {
localMessageIdxs.emplace_back(i);
executed.at(i) = thisHost;
decision.addMessage(thisHost, req->messages().at(i));
}
}

Expand All @@ -325,7 +329,7 @@ std::vector<std::string> Scheduler::callFunctions(
// Schedule first to already registered hosts
for (const auto& h : thisRegisteredHosts) {
int nOnThisHost = scheduleFunctionsOnHost(
h, req, executed, offset, &snapshotData);
h, req, decision, offset, &snapshotData);

offset += nOnThisHost;
if (offset >= nMessages) {
Expand All @@ -347,7 +351,7 @@ std::vector<std::string> Scheduler::callFunctions(

// Schedule functions on the host
int nOnThisHost = scheduleFunctionsOnHost(
h, req, executed, offset, &snapshotData);
h, req, decision, offset, &snapshotData);

// Register the host if it has executed a function
if (nOnThisHost > 0) {
Expand All @@ -372,7 +376,7 @@ std::vector<std::string> Scheduler::callFunctions(

for (; offset < nMessages; offset++) {
localMessageIdxs.emplace_back(offset);
executed.at(offset) = thisHost;
decision.addMessage(thisHost, req->messages().at(offset));
}
}

Expand Down Expand Up @@ -438,7 +442,7 @@ std::vector<std::string> Scheduler::callFunctions(
// Records for tests
if (faabric::util::isTestMode()) {
for (int i = 0; i < nMessages; i++) {
std::string executedHost = executed.at(i);
std::string executedHost = decision.hosts.at(i);
faabric::Message msg = req->messages().at(i);

// Log results if in test mode
Expand All @@ -451,7 +455,7 @@ std::vector<std::string> Scheduler::callFunctions(
}
}

return executed;
return decision;
}

std::vector<std::string> Scheduler::getUnregisteredHosts(
Expand Down Expand Up @@ -498,7 +502,7 @@ void Scheduler::broadcastSnapshotDelete(const faabric::Message& msg,
int Scheduler::scheduleFunctionsOnHost(
const std::string& host,
std::shared_ptr<faabric::BatchExecuteRequest> req,
std::vector<std::string>& records,
SchedulingDecision& decision,
int offset,
faabric::util::SnapshotData* snapshot)
{
Expand Down Expand Up @@ -532,7 +536,7 @@ int Scheduler::scheduleFunctionsOnHost(
auto* newMsg = hostRequest->add_messages();
*newMsg = req->messages().at(i);
newMsg->set_executeslocally(false);
records.at(i) = host;
decision.addMessage(host, req->messages().at(i));
}

SPDLOG_DEBUG(
Expand Down
Loading