Push snapshot diffs before reset, and thread results after #126

Merged · 16 commits · Jul 13, 2021
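The change reorders the final steps of a threads batch: snapshot diffs are pushed to the master before the executor is reset, and the thread result is published only afterwards. Below is a minimal, self-contained sketch of that ordering; every function in it is a stand-in written for illustration, not a Faabric API (the real logic lives in `Executor::threadPoolThread` and `Scheduler::pushSnapshotDiffs` in the diff).

```cpp
#include <cstdint>
#include <cstdio>
#include <vector>

// Stand-ins for the Faabric types and calls involved; names and bodies are
// illustrative only, chosen to make the ordering concrete.
struct Diff
{
    uint32_t offset;
    std::vector<uint8_t> data;
};

std::vector<Diff> getDirtyPages() { return { { 0, { 1, 2, 3 } } }; }
void pushSnapshotDiffs(const std::vector<Diff>& diffs)
{
    std::printf("pushed %zu diffs to master\n", diffs.size());
}
void resetDirtyTracking() { std::printf("dirty tracking reset\n"); }
void resetExecutor() { std::printf("executor reset, claim released\n"); }
void setThreadResult(int returnValue)
{
    std::printf("thread result %d published\n", returnValue);
}

int main()
{
    int returnValue = 0;

    // 1. Push diffs while the dirty pages still reflect this batch's writes
    pushSnapshotDiffs(getDirtyPages());
    // 2. Start a clean dirty-page epoch for the next batch
    resetDirtyTracking();
    // 3. Reset the executor and release its claim
    resetExecutor();
    // 4. Only now publish the result, so a caller waiting on it cannot
    //    re-invoke the executor before it is reusable
    setThreadResult(returnValue);

    return 0;
}
```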
8 changes: 5 additions & 3 deletions include/faabric/scheduler/Scheduler.h
@@ -65,6 +65,8 @@ class Executor

std::atomic<bool> claimed = false;

std::atomic<bool> pendingSnapshotPush = false;

std::atomic<int> executingTaskCount = 0;

std::mutex threadsMutex;
@@ -117,9 +119,9 @@ class Scheduler

void setThreadResult(const faabric::Message& msg, int32_t returnValue);

void setThreadResult(const faabric::Message& msg,
int32_t returnValue,
const std::vector<faabric::util::SnapshotDiff>& diffs);
void pushSnapshotDiffs(
const faabric::Message& msg,
const std::vector<faabric::util::SnapshotDiff>& diffs);

void setThreadResultLocally(uint32_t msgId, int32_t returnValue);

12 changes: 1 addition & 11 deletions include/faabric/snapshot/SnapshotClient.h
@@ -20,11 +20,7 @@ getSnapshotDiffPushes();

std::vector<std::pair<std::string, std::string>> getSnapshotDeletes();

std::vector<std::pair<std::string,
std::tuple<uint32_t,
int,
std::string,
std::vector<faabric::util::SnapshotDiff>>>>
std::vector<std::pair<std::string, std::pair<uint32_t, int>>>
getThreadResults();

void clearMockSnapshotRequests();
@@ -50,12 +46,6 @@ class SnapshotClient final : public faabric::transport::MessageEndpointClient

void pushThreadResult(uint32_t messageId, int returnValue);

void pushThreadResult(
uint32_t messageId,
int returnValue,
const std::string& snapshotKey,
const std::vector<faabric::util::SnapshotDiff>& diffs);

private:
void sendHeader(faabric::snapshot::SnapshotCalls call);
};
5 changes: 0 additions & 5 deletions include/faabric/snapshot/SnapshotServer.h
@@ -30,10 +30,5 @@ class SnapshotServer final : public faabric::transport::MessageEndpointServer
void recvDeleteSnapshot(const uint8_t* buffer, size_t bufferSize);

void recvThreadResult(const uint8_t* buffer, size_t bufferSize);

private:
void applyDiffsToSnapshot(
const std::string& snapshotKey,
const flatbuffers::Vector<flatbuffers::Offset<SnapshotDiffChunk>>* diffs);
Collaborator Author

This was never implemented or used.

};
}
2 changes: 0 additions & 2 deletions src/flat/faabric.fbs
@@ -20,6 +20,4 @@ table SnapshotDiffPushRequest {
table ThreadResultRequest {
message_id:int;
return_value:int;
key:string;
chunks:[SnapshotDiffChunk];
}
43 changes: 20 additions & 23 deletions src/scheduler/Executor.cpp
@@ -106,8 +106,7 @@ void Executor::executeTasks(std::vector<int> msgIdxs,

if (isSnapshot && !alreadyRestored) {
if ((!isMaster && isThreads) || !isThreads) {
SPDLOG_DEBUG(
"Performing snapshot restore {} [{}]", funcStr, snapshotKey);
SPDLOG_DEBUG("Restoring {} from snapshot {}", funcStr, snapshotKey);
lastSnapshot = snapshotKey;
restore(firstMsg);
} else {
@@ -124,6 +123,7 @@ void Executor::executeTasks(std::vector<int> msgIdxs,
// Note this must be done after the restore has happened
if (isThreads && isSnapshot) {
faabric::util::resetDirtyTracking();
pendingSnapshotPush = true;
}

// Set executing task count
@@ -161,7 +161,7 @@ void Executor::threadPoolThread(int threadPoolIdx)
SPDLOG_DEBUG("Thread pool thread {}:{} starting up", id, threadPoolIdx);

auto& sch = faabric::scheduler::getScheduler();
auto& conf = faabric::util::getSystemConfig();
const auto& conf = faabric::util::getSystemConfig();

bool selfShutdown = false;

@@ -196,20 +196,24 @@ void Executor::threadPoolThread(int threadPoolIdx)
assert(req->messages_size() >= msgIdx + 1);
faabric::Message& msg = req->mutable_messages()->at(msgIdx);

SPDLOG_TRACE("Thread {}:{} executing task {} ({})",
bool isThreads = req->type() == faabric::BatchExecuteRequest::THREADS;
SPDLOG_TRACE("Thread {}:{} executing task {} ({}, thread={})",
id,
threadPoolIdx,
msgIdx,
msg.id());
msg.id(),
isThreads);

int32_t returnValue;
try {
returnValue = executeTask(threadPoolIdx, msgIdx, req);
} catch (const std::exception& ex) {
returnValue = 1;

msg.set_outputdata(fmt::format(
"Task {} threw exception. What: {}", msg.id(), ex.what()));
std::string errorMessage = fmt::format(
"Task {} threw exception. What: {}", msg.id(), ex.what());
SPDLOG_ERROR(errorMessage);
msg.set_outputdata(errorMessage);
}

// Set the return value
@@ -221,21 +225,21 @@
bool isLastTask = oldTaskCount == 1;

SPDLOG_TRACE("Task {} finished by thread {}:{} ({} left)",
msg.id(),
faabric::util::funcToString(msg, true),
id,
threadPoolIdx,
executingTaskCount);
oldTaskCount - 1);
Collaborator Author

This logging wasn't thread-safe: it read executingTaskCount directly rather than using the value returned by the atomic decrement.
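For context, a minimal sketch of the pattern the fix relies on: `fetch_sub` returns the pre-decrement value, so the same number can drive both the last-task check and the log line without re-reading the atomic. The function name and log format below are illustrative, not taken from the PR.

```cpp
#include <atomic>
#include <cstdio>

std::atomic<int> executingTaskCount{ 0 };

void finishTask(int taskId)
{
    // fetch_sub returns the value held *before* this thread's decrement, so
    // it is a consistent snapshot usable for both the check and the log.
    int oldTaskCount = executingTaskCount.fetch_sub(1);
    bool isLastTask = (oldTaskCount == 1);

    // Reading executingTaskCount again here could observe decrements made by
    // other threads in the meantime; oldTaskCount - 1 is deterministic.
    std::printf("Task %d finished (%d left, last=%d)\n",
                taskId,
                oldTaskCount - 1,
                static_cast<int>(isLastTask));
}
```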


// Get snapshot diffs _before_ we reset the executor
bool isThreads = req->type() == faabric::BatchExecuteRequest::THREADS;
std::vector<faabric::util::SnapshotDiff> diffs;
if (isLastTask && isThreads) {
// Handle snapshot diffs _before_ we reset the executor
if (isLastTask && pendingSnapshotPush) {
// Get diffs
faabric::util::SnapshotData d = snapshot();
diffs = d.getDirtyPages();
std::vector<faabric::util::SnapshotDiff> diffs = d.getDirtyPages();
sch.pushSnapshotDiffs(msg, diffs);

// Reset dirty page tracking now that we've got the diffs
// Reset dirty page tracking now that we've pushed the diffs
faabric::util::resetDirtyTracking();
pendingSnapshotPush = false;
}

// If this batch is finished, reset the executor and release its claim.
Expand All @@ -255,14 +259,7 @@ void Executor::threadPoolThread(int threadPoolIdx)
// on its result to continue execution, therefore must be done once the
// executor has been reset, otherwise the executor may not be reused for
// a repeat invocation.
if (isLastTask && isThreads) {
// Send diffs along with thread result
SPDLOG_DEBUG("Task {} finished, returning {} snapshot diffs",
msg.id(),
diffs.size());

sch.setThreadResult(msg, returnValue, diffs);
} else if (isThreads) {
if (isThreads) {
// Set non-final thread result
sch.setThreadResult(msg, returnValue);
} else {
1 change: 1 addition & 0 deletions src/scheduler/FunctionCallServer.cpp
@@ -72,6 +72,7 @@ void FunctionCallServer::recvExecuteFunctions(const uint8_t* buffer,
PARSE_MSG(faabric::BatchExecuteRequest, buffer, bufferSize)

// This host has now been told to execute these functions no matter what
// TODO - avoid this copy
scheduler.callFunctions(std::make_shared<faabric::BatchExecuteRequest>(msg),
true);
}
79 changes: 41 additions & 38 deletions src/scheduler/Scheduler.cpp
@@ -257,40 +257,43 @@ std::vector<std::string> Scheduler::callFunctions(
// This ensures everything is up to date, and we don't have to
// maintain different records of which hosts hold which updates.
faabric::util::SnapshotData snapshotData;
std::vector<faabric::util::SnapshotDiff> snapshotDiffs;
std::string snapshotKey = firstMsg.snapshotkey();
bool snapshotNeeded =
req->type() == req->THREADS || req->type() == req->PROCESSES;

if (snapshotNeeded && snapshotKey.empty()) {
SPDLOG_ERROR("No snapshot provided for {}", funcStr);
throw std::runtime_error(
"Empty snapshot for distributed threads/ processes");
}

if (snapshotNeeded) {
if (snapshotKey.empty()) {
SPDLOG_ERROR("No snapshot provided for {}", funcStr);
throw std::runtime_error(
"Empty snapshot for distributed threads/ processes");
}

snapshotData =
faabric::snapshot::getSnapshotRegistry().getSnapshot(snapshotKey);
snapshotDiffs = snapshotData.getDirtyPages();

// Do the snapshot diff pushing
if (!snapshotDiffs.empty()) {
for (const auto& h : thisRegisteredHosts) {
SPDLOG_DEBUG("Pushing {} snapshot diffs for {} to {}",
snapshotDiffs.size(),
funcStr,
h);
SnapshotClient& c = getSnapshotClient(h);
c.pushSnapshotDiffs(snapshotKey, snapshotDiffs);

if (!thisRegisteredHosts.empty()) {
std::vector<faabric::util::SnapshotDiff> snapshotDiffs =
snapshotData.getDirtyPages();

// Do the snapshot diff pushing
if (!snapshotDiffs.empty()) {
for (const auto& h : thisRegisteredHosts) {
SPDLOG_DEBUG("Pushing {} snapshot diffs for {} to {}",
snapshotDiffs.size(),
funcStr,
h);
SnapshotClient& c = getSnapshotClient(h);
c.pushSnapshotDiffs(snapshotKey, snapshotDiffs);
}
}
}

// Now reset the dirty page tracking, as we want the next batch of
// diffs to contain everything from now on (including the updates
// sent back from all the threads)
SPDLOG_DEBUG("Resetting dirty tracking after pushing diffs {}",
funcStr);
faabric::util::resetDirtyTracking();
// Now reset the dirty page tracking, as we want the next batch
// of diffs to contain everything from now on (including the
// updates sent back from all the threads)
SPDLOG_DEBUG("Resetting dirty tracking after pushing diffs {}",
funcStr);
faabric::util::resetDirtyTracking();
}
}

// Work out how many we can handle locally
@@ -513,6 +516,8 @@ int Scheduler::scheduleFunctionsOnHost(
faabric::util::batchExecFactory();
hostRequest->set_snapshotkey(req->snapshotkey());
hostRequest->set_type(req->type());
hostRequest->set_subtype(req->subtype());
hostRequest->set_contextdata(req->contextdata());

// Add messages
int nOnThisHost = std::min<int>(available, remainder);
@@ -699,27 +704,25 @@ void Scheduler::registerThread(uint32_t msgId)
void Scheduler::setThreadResult(const faabric::Message& msg,
int32_t returnValue)
{
std::vector<faabric::util::SnapshotDiff> empty;
setThreadResult(msg, returnValue, empty);
bool isMaster = msg.masterhost() == conf.endpointHost;

if (isMaster) {
setThreadResultLocally(msg.id(), returnValue);
} else {
SnapshotClient& c = getSnapshotClient(msg.masterhost());
c.pushThreadResult(msg.id(), returnValue);
}
}

void Scheduler::setThreadResult(
void Scheduler::pushSnapshotDiffs(
const faabric::Message& msg,
int32_t returnValue,
const std::vector<faabric::util::SnapshotDiff>& diffs)
{
bool isMaster = msg.masterhost() == conf.endpointHost;

if (isMaster) {
setThreadResultLocally(msg.id(), returnValue);
} else {
if (!isMaster && !diffs.empty()) {
SnapshotClient& c = getSnapshotClient(msg.masterhost());

if (diffs.empty()) {
c.pushThreadResult(msg.id(), returnValue);
} else {
c.pushThreadResult(msg.id(), returnValue, msg.snapshotkey(), diffs);
}
c.pushSnapshotDiffs(msg.snapshotkey(), diffs);
}
}

70 changes: 14 additions & 56 deletions src/snapshot/SnapshotClient.cpp
@@ -23,12 +23,7 @@ static std::vector<

static std::vector<std::pair<std::string, std::string>> snapshotDeletes;

static std::vector<
std::pair<std::string,
std::tuple<uint32_t,
int,
std::string,
std::vector<faabric::util::SnapshotDiff>>>>
static std::vector<std::pair<std::string, std::pair<uint32_t, int>>>
threadResults;

std::vector<std::pair<std::string, faabric::util::SnapshotData>>
@@ -48,12 +43,7 @@ std::vector<std::pair<std::string, std::string>> getSnapshotDeletes()
return snapshotDeletes;
}

std::vector<std::pair<std::string,
std::tuple<uint32_t,
int,
std::string,
std::vector<faabric::util::SnapshotDiff>>>>
getThreadResults()
std::vector<std::pair<std::string, std::pair<uint32_t, int>>> getThreadResults()
{
return threadResults;
}
@@ -79,7 +69,12 @@ SnapshotClient::SnapshotClient(const std::string& hostIn)
void SnapshotClient::pushSnapshot(const std::string& key,
const faabric::util::SnapshotData& data)
{
SPDLOG_DEBUG("Pushing snapshot {} to {}", key, host);
if (data.size == 0) {
SPDLOG_ERROR("Cannot push snapshot {} with size zero to {}", key, host);
throw std::runtime_error("Pushing snapshot with zero size");
}

SPDLOG_DEBUG("Pushing snapshot {} to {} ({} bytes)", key, host, data.size);

if (faabric::util::isMockMode()) {
faabric::util::UniqueLock lock(mockMutex);
@@ -154,57 +149,20 @@ void SnapshotClient::deleteSnapshot(const std::string& key)
}

void SnapshotClient::pushThreadResult(uint32_t messageId, int returnValue)
{
std::vector<faabric::util::SnapshotDiff> empty;
pushThreadResult(messageId, returnValue, "", empty);
}

void SnapshotClient::pushThreadResult(
uint32_t messageId,
int returnValue,
const std::string& snapshotKey,
const std::vector<faabric::util::SnapshotDiff>& diffs)
{
if (faabric::util::isMockMode()) {
faabric::util::UniqueLock lock(mockMutex);
threadResults.emplace_back(std::make_pair(
host, std::make_tuple(messageId, returnValue, snapshotKey, diffs)));
threadResults.emplace_back(
std::make_pair(host, std::make_pair(messageId, returnValue)));

} else {
flatbuffers::FlatBufferBuilder mb;
flatbuffers::Offset<ThreadResultRequest> requestOffset;

if (!diffs.empty()) {
SPDLOG_DEBUG(
"Sending thread result for {} to {} (plus {} snapshot diffs)",
messageId,
host,
diffs.size());

// Create objects for the diffs
std::vector<flatbuffers::Offset<SnapshotDiffChunk>> diffsFbVector;
for (const auto& d : diffs) {
auto dataOffset = mb.CreateVector<uint8_t>(d.data, d.size);
auto chunk = CreateSnapshotDiffChunk(mb, d.offset, dataOffset);
diffsFbVector.push_back(chunk);
}

// Create message with diffs
auto diffsOffset = mb.CreateVector(diffsFbVector);

auto keyOffset = mb.CreateString(snapshotKey);
requestOffset = CreateThreadResultRequest(
mb, messageId, returnValue, keyOffset, diffsOffset);
} else {
SPDLOG_DEBUG(
"Sending thread result for {} to {} (with no snapshot diffs)",
messageId,
host);

// Create message without diffs
requestOffset =
CreateThreadResultRequest(mb, messageId, returnValue);
}
SPDLOG_DEBUG("Sending thread result for {} to {}", messageId, host);

// Create message without diffs
requestOffset = CreateThreadResultRequest(mb, messageId, returnValue);

mb.Finish(requestOffset);
SEND_FB_MSG_ASYNC(SnapshotCalls::ThreadResult, mb)