From a564f794078186df314e99c4868a833df60a743a Mon Sep 17 00:00:00 2001 From: Simon Shillaker Date: Mon, 5 Jul 2021 16:58:43 +0000 Subject: [PATCH 01/15] Check if we have registered hosts before doing snapshot pushes --- src/scheduler/Scheduler.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/scheduler/Scheduler.cpp b/src/scheduler/Scheduler.cpp index 0869a9be0..f179ff063 100644 --- a/src/scheduler/Scheduler.cpp +++ b/src/scheduler/Scheduler.cpp @@ -268,7 +268,7 @@ std::vector Scheduler::callFunctions( "Empty snapshot for distributed threads/ processes"); } - if (snapshotNeeded) { + if (snapshotNeeded && !registeredHosts.empty()) { snapshotData = faabric::snapshot::getSnapshotRegistry().getSnapshot(snapshotKey); snapshotDiffs = snapshotData.getDirtyPages(); From 4beab684474881cbcfcdd477c10881ba4e6efb35 Mon Sep 17 00:00:00 2001 From: Simon Shillaker Date: Mon, 5 Jul 2021 17:05:51 +0000 Subject: [PATCH 02/15] Started splitting thread results and snapshot diffs --- include/faabric/scheduler/Scheduler.h | 3 +-- include/faabric/snapshot/SnapshotClient.h | 6 ------ src/flat/faabric.fbs | 2 -- src/scheduler/Executor.cpp | 10 ++-------- src/scheduler/Scheduler.cpp | 24 +++++++++++------------ 5 files changed, 14 insertions(+), 31 deletions(-) diff --git a/include/faabric/scheduler/Scheduler.h b/include/faabric/scheduler/Scheduler.h index 423e1f860..a0c4b9d5d 100644 --- a/include/faabric/scheduler/Scheduler.h +++ b/include/faabric/scheduler/Scheduler.h @@ -117,8 +117,7 @@ class Scheduler void setThreadResult(const faabric::Message& msg, int32_t returnValue); - void setThreadResult(const faabric::Message& msg, - int32_t returnValue, + void pushSnapshotDiffs(const faabric::Message& msg, const std::vector& diffs); void setThreadResultLocally(uint32_t msgId, int32_t returnValue); diff --git a/include/faabric/snapshot/SnapshotClient.h b/include/faabric/snapshot/SnapshotClient.h index 3129755ce..e6a32d23e 100644 --- 
a/include/faabric/snapshot/SnapshotClient.h +++ b/include/faabric/snapshot/SnapshotClient.h @@ -50,12 +50,6 @@ class SnapshotClient final : public faabric::transport::MessageEndpointClient void pushThreadResult(uint32_t messageId, int returnValue); - void pushThreadResult( - uint32_t messageId, - int returnValue, - const std::string& snapshotKey, - const std::vector& diffs); - private: void sendHeader(faabric::snapshot::SnapshotCalls call); }; diff --git a/src/flat/faabric.fbs b/src/flat/faabric.fbs index 80bf93568..9a9509f88 100644 --- a/src/flat/faabric.fbs +++ b/src/flat/faabric.fbs @@ -20,6 +20,4 @@ table SnapshotDiffPushRequest { table ThreadResultRequest { message_id:int; return_value:int; - key:string; - chunks:[SnapshotDiffChunk]; } diff --git a/src/scheduler/Executor.cpp b/src/scheduler/Executor.cpp index 8fb228cee..5ba7e3eb9 100644 --- a/src/scheduler/Executor.cpp +++ b/src/scheduler/Executor.cpp @@ -233,6 +233,7 @@ void Executor::threadPoolThread(int threadPoolIdx) // Get diffs faabric::util::SnapshotData d = snapshot(); diffs = d.getDirtyPages(); + sch.pushSnapshotDiffs(msg, diffs); // Reset dirty page tracking now that we've got the diffs faabric::util::resetDirtyTracking(); @@ -255,14 +256,7 @@ void Executor::threadPoolThread(int threadPoolIdx) // on its result to continue execution, therefore must be done once the // executor has been reset, otherwise the executor may not be reused for // a repeat invocation. 
- if (isLastTask && isThreads) { - // Send diffs along with thread result - SPDLOG_DEBUG("Task {} finished, returning {} snapshot diffs", - msg.id(), - diffs.size()); - - sch.setThreadResult(msg, returnValue, diffs); - } else if (isThreads) { + if (isThreads) { // Set non-final thread result sch.setThreadResult(msg, returnValue); } else { diff --git a/src/scheduler/Scheduler.cpp b/src/scheduler/Scheduler.cpp index f179ff063..5516577c9 100644 --- a/src/scheduler/Scheduler.cpp +++ b/src/scheduler/Scheduler.cpp @@ -699,27 +699,25 @@ void Scheduler::registerThread(uint32_t msgId) void Scheduler::setThreadResult(const faabric::Message& msg, int32_t returnValue) { - std::vector empty; - setThreadResult(msg, returnValue, empty); + bool isMaster = msg.masterhost() == conf.endpointHost; + + if (isMaster) { + setThreadResultLocally(msg.id(), returnValue); + } else { + SnapshotClient& c = getSnapshotClient(msg.masterhost()); + c.pushThreadResult(msg.id(), returnValue); + } } -void Scheduler::setThreadResult( +void Scheduler::pushSnapshotDiffs( const faabric::Message& msg, - int32_t returnValue, const std::vector& diffs) { bool isMaster = msg.masterhost() == conf.endpointHost; - if (isMaster) { - setThreadResultLocally(msg.id(), returnValue); - } else { + if (!isMaster && !diffs.empty()) { SnapshotClient& c = getSnapshotClient(msg.masterhost()); - - if (diffs.empty()) { - c.pushThreadResult(msg.id(), returnValue); - } else { - c.pushThreadResult(msg.id(), returnValue, msg.snapshotkey(), diffs); - } + c.pushSnapshotDiffs(msg.snapshotkey(), diffs); } } From f00ef03cbd77fc449a9539a4a535300fde902e99 Mon Sep 17 00:00:00 2001 From: Simon Shillaker Date: Tue, 6 Jul 2021 06:20:51 +0000 Subject: [PATCH 03/15] Fix up compilation and tests --- include/faabric/scheduler/Scheduler.h | 5 +- include/faabric/snapshot/SnapshotClient.h | 6 +- include/faabric/snapshot/SnapshotServer.h | 5 -- src/snapshot/SnapshotClient.cpp | 66 +++------------- src/snapshot/SnapshotServer.cpp | 24 ++---- 
tests/test/scheduler/test_executor.cpp | 27 ++----- tests/test/scheduler/test_scheduler.cpp | 29 +------ .../test_snapshot_client_server.cpp | 76 ++++++++----------- 8 files changed, 64 insertions(+), 174 deletions(-) rename tests/test/{scheduler => snapshot}/test_snapshot_client_server.cpp (78%) diff --git a/include/faabric/scheduler/Scheduler.h b/include/faabric/scheduler/Scheduler.h index a0c4b9d5d..bb2f86d56 100644 --- a/include/faabric/scheduler/Scheduler.h +++ b/include/faabric/scheduler/Scheduler.h @@ -117,8 +117,9 @@ class Scheduler void setThreadResult(const faabric::Message& msg, int32_t returnValue); - void pushSnapshotDiffs(const faabric::Message& msg, - const std::vector& diffs); + void pushSnapshotDiffs( + const faabric::Message& msg, + const std::vector& diffs); void setThreadResultLocally(uint32_t msgId, int32_t returnValue); diff --git a/include/faabric/snapshot/SnapshotClient.h b/include/faabric/snapshot/SnapshotClient.h index e6a32d23e..77225cd54 100644 --- a/include/faabric/snapshot/SnapshotClient.h +++ b/include/faabric/snapshot/SnapshotClient.h @@ -20,11 +20,7 @@ getSnapshotDiffPushes(); std::vector> getSnapshotDeletes(); -std::vector>>> +std::vector>> getThreadResults(); void clearMockSnapshotRequests(); diff --git a/include/faabric/snapshot/SnapshotServer.h b/include/faabric/snapshot/SnapshotServer.h index 9919c282b..6cb6a88ed 100644 --- a/include/faabric/snapshot/SnapshotServer.h +++ b/include/faabric/snapshot/SnapshotServer.h @@ -30,10 +30,5 @@ class SnapshotServer final : public faabric::transport::MessageEndpointServer void recvDeleteSnapshot(const uint8_t* buffer, size_t bufferSize); void recvThreadResult(const uint8_t* buffer, size_t bufferSize); - - private: - void applyDiffsToSnapshot( - const std::string& snapshotKey, - const flatbuffers::Vector>* diffs); }; } diff --git a/src/snapshot/SnapshotClient.cpp b/src/snapshot/SnapshotClient.cpp index deff086dd..4d874c9a4 100644 --- a/src/snapshot/SnapshotClient.cpp +++ 
b/src/snapshot/SnapshotClient.cpp @@ -23,12 +23,7 @@ static std::vector< static std::vector> snapshotDeletes; -static std::vector< - std::pair>>> +static std::vector>> threadResults; std::vector> @@ -48,12 +43,7 @@ std::vector> getSnapshotDeletes() return snapshotDeletes; } -std::vector>>> -getThreadResults() +std::vector>> getThreadResults() { return threadResults; } @@ -154,57 +144,23 @@ void SnapshotClient::deleteSnapshot(const std::string& key) } void SnapshotClient::pushThreadResult(uint32_t messageId, int returnValue) -{ - std::vector empty; - pushThreadResult(messageId, returnValue, "", empty); -} - -void SnapshotClient::pushThreadResult( - uint32_t messageId, - int returnValue, - const std::string& snapshotKey, - const std::vector& diffs) { if (faabric::util::isMockMode()) { faabric::util::UniqueLock lock(mockMutex); - threadResults.emplace_back(std::make_pair( - host, std::make_tuple(messageId, returnValue, snapshotKey, diffs))); + threadResults.emplace_back( + std::make_pair(host, std::make_pair(messageId, returnValue))); } else { flatbuffers::FlatBufferBuilder mb; flatbuffers::Offset requestOffset; - if (!diffs.empty()) { - SPDLOG_DEBUG( - "Sending thread result for {} to {} (plus {} snapshot diffs)", - messageId, - host, - diffs.size()); - - // Create objects for the diffs - std::vector> diffsFbVector; - for (const auto& d : diffs) { - auto dataOffset = mb.CreateVector(d.data, d.size); - auto chunk = CreateSnapshotDiffChunk(mb, d.offset, dataOffset); - diffsFbVector.push_back(chunk); - } - - // Create message with diffs - auto diffsOffset = mb.CreateVector(diffsFbVector); - - auto keyOffset = mb.CreateString(snapshotKey); - requestOffset = CreateThreadResultRequest( - mb, messageId, returnValue, keyOffset, diffsOffset); - } else { - SPDLOG_DEBUG( - "Sending thread result for {} to {} (with no snapshot diffs)", - messageId, - host); - - // Create message without diffs - requestOffset = - CreateThreadResultRequest(mb, messageId, returnValue); - } + 
SPDLOG_DEBUG( + "Sending thread result for {} to {} (with no snapshot diffs)", + messageId, + host); + + // Create message without diffs + requestOffset = CreateThreadResultRequest(mb, messageId, returnValue); mb.Finish(requestOffset); SEND_FB_MSG_ASYNC(SnapshotCalls::ThreadResult, mb) diff --git a/src/snapshot/SnapshotServer.cpp b/src/snapshot/SnapshotServer.cpp index be6bf086f..d8b011be8 100644 --- a/src/snapshot/SnapshotServer.cpp +++ b/src/snapshot/SnapshotServer.cpp @@ -90,12 +90,6 @@ void SnapshotServer::recvThreadResult(const uint8_t* buffer, size_t bufferSize) const ThreadResultRequest* r = flatbuffers::GetRoot(buffer); - // Apply snapshot diffs *first* (these must be applied before other threads - // can continue) - if (r->chunks() != nullptr && r->chunks()->size() > 0) { - applyDiffsToSnapshot(r->key()->str(), r->chunks()); - } - SPDLOG_DEBUG("Receiving thread result {} for message {}", r->return_value(), r->message_id()); @@ -110,30 +104,22 @@ SnapshotServer::recvPushSnapshotDiffs(const uint8_t* buffer, size_t bufferSize) const SnapshotDiffPushRequest* r = flatbuffers::GetRoot(buffer); - applyDiffsToSnapshot(r->key()->str(), r->chunks()); - - // Send response - return std::make_unique(); -} - -void SnapshotServer::applyDiffsToSnapshot( - const std::string& snapshotKey, - const flatbuffers::Vector>* diffs) -{ SPDLOG_DEBUG( - "Applying {} diffs to snapshot {}", diffs->size(), snapshotKey); + "Applying {} diffs to snapshot {}", r->chunks()->size(), r->key()->str()); // Get the snapshot faabric::snapshot::SnapshotRegistry& reg = faabric::snapshot::getSnapshotRegistry(); - faabric::util::SnapshotData& snap = reg.getSnapshot(snapshotKey); + faabric::util::SnapshotData& snap = reg.getSnapshot(r->key()->str()); // Copy diffs to snapshot - for (const auto* r : *diffs) { + for (const auto* r : *r->chunks()) { const uint8_t* chunkPtr = r->data()->data(); uint8_t* dest = snap.data + r->offset(); std::memcpy(dest, chunkPtr, r->data()->size()); } + // Send response + 
return std::make_unique(); } void SnapshotServer::recvDeleteSnapshot(const uint8_t* buffer, diff --git a/tests/test/scheduler/test_executor.cpp b/tests/test/scheduler/test_executor.cpp index 806bf3cc7..16c8ba620 100644 --- a/tests/test/scheduler/test_executor.cpp +++ b/tests/test/scheduler/test_executor.cpp @@ -489,7 +489,7 @@ TEST_CASE_METHOD(TestExecutorFixture, for (auto& r : results) { REQUIRE(r.first == thisHost); auto args = r.second; - sch.setThreadResultLocally(std::get<0>(args), std::get<1>(args)); + sch.setThreadResultLocally(args.first, args.second); } // Rejoin the other thread @@ -531,8 +531,8 @@ TEST_CASE_METHOD(TestExecutorFixture, std::vector actualMessageIds; for (auto& p : actual) { REQUIRE(p.first == otherHost); - uint32_t messageId = std::get<0>(p.second); - int32_t returnValue = std::get<1>(p.second); + uint32_t messageId = p.second.first; + int32_t returnValue = p.second.second; REQUIRE(returnValue == messageId / 100); actualMessageIds.push_back(messageId); @@ -720,22 +720,11 @@ TEST_CASE_METHOD(TestExecutorFixture, auto actualResults = faabric::snapshot::getThreadResults(); REQUIRE(actualResults.size() == nThreads); - // Check only one has diffs attached - std::vector diffList; - for (const auto& res : actualResults) { - REQUIRE(res.first == otherHost); - - std::vector thisDiffs = - std::get<3>(res.second); - if (thisDiffs.empty()) { - continue; - } - - if (!diffList.empty()) { - FAIL("Found more than one thread result with diffs"); - } - diffList = thisDiffs; - } + // Check diffs also sent + auto diffs = faabric::snapshot::getSnapshotDiffPushes(); + REQUIRE(diffs.size() == 1); + REQUIRE(diffs.at(0).first == otherHost); + std::vector diffList = diffs.at(0).second; // Each thread should have edited one page, check diffs are correct REQUIRE(diffList.size() == nThreads); diff --git a/tests/test/scheduler/test_scheduler.cpp b/tests/test/scheduler/test_scheduler.cpp index 2a0e41c44..a5f86b05d 100644 --- a/tests/test/scheduler/test_scheduler.cpp 
+++ b/tests/test/scheduler/test_scheduler.cpp @@ -773,37 +773,14 @@ TEST_CASE_METHOD(SlowExecutorFixture, sch.setThreadResult(msg, returnValue); } - SECTION("With diffs") - { - snapshotKey = "blahblah"; - msg.set_snapshotkey(snapshotKey); - - std::vector diffDataA(10, 1); - std::vector diffDataB(20, 2); - - diffs = { - { 0, diffDataA.data(), diffDataA.size() }, - { 50, diffDataB.data(), diffDataB.size() }, - }; - - // Set the thread result - sch.setThreadResult(msg, returnValue, diffs); - } - - // Check the results have been pushed along with the thread result auto actualResults = faabric::snapshot::getThreadResults(); REQUIRE(actualResults.size() == 1); REQUIRE(actualResults.at(0).first == "otherHost"); - auto actualTuple = actualResults.at(0).second; - REQUIRE(std::get<0>(actualTuple) == msg.id()); - REQUIRE(std::get<1>(actualTuple) == returnValue); - REQUIRE(std::get<2>(actualTuple) == snapshotKey); - - std::vector actualDiffs = - std::get<3>(actualTuple); - REQUIRE(actualDiffs.size() == diffs.size()); + auto actualPair = actualResults.at(0).second; + REQUIRE(actualPair.first == msg.id()); + REQUIRE(actualPair.second == returnValue); } TEST_CASE_METHOD(DummyExecutorFixture, "Test executor reuse", "[scheduler]") diff --git a/tests/test/scheduler/test_snapshot_client_server.cpp b/tests/test/snapshot/test_snapshot_client_server.cpp similarity index 78% rename from tests/test/scheduler/test_snapshot_client_server.cpp rename to tests/test/snapshot/test_snapshot_client_server.cpp index b6b3ef8db..72bddd6a4 100644 --- a/tests/test/scheduler/test_snapshot_client_server.cpp +++ b/tests/test/snapshot/test_snapshot_client_server.cpp @@ -37,7 +37,7 @@ class SnapshotClientServerFixture TEST_CASE_METHOD(SnapshotClientServerFixture, "Test pushing and deleting snapshots", - "[scheduler]") + "[snapshot]") { // Check nothing to start with REQUIRE(reg.getSnapshotCount() == 0); @@ -91,17 +91,9 @@ void checkDiffsApplied(const uint8_t* snapBase, } 
TEST_CASE_METHOD(SnapshotClientServerFixture, - "Test set thread result", - "[scheduler]") + "Test push snapshot diffs", + "[snapshot]") { - // Register threads on this host - int threadIdA = 123; - int threadIdB = 345; - int returnValueA = 88; - int returnValueB = 99; - sch.registerThread(threadIdA); - sch.registerThread(threadIdB); - // Set up a snapshot faabric::util::SnapshotData snap; snap.size = 5 * faabric::util::HOST_PAGE_SIZE; @@ -119,34 +111,38 @@ TEST_CASE_METHOD(SnapshotClientServerFixture, std::vector diffsA; std::vector diffsB; - SECTION("Without diffs") - { - cli.pushThreadResult(threadIdA, returnValueA); - cli.pushThreadResult(threadIdB, returnValueB); - } + faabric::util::SnapshotDiff diffA1(5, diffDataA1.data(), diffDataA1.size()); + faabric::util::SnapshotDiff diffA2( + 2 * faabric::util::HOST_PAGE_SIZE, diffDataA2.data(), diffDataA2.size()); + diffsA = { diffA1, diffA2 }; + cli.pushSnapshotDiffs(snapKey, diffsA); - SECTION("Empty diffs") - { - cli.pushThreadResult(threadIdA, returnValueA, snapKey, diffsA); - cli.pushThreadResult(threadIdB, returnValueB, snapKey, diffsB); - } + faabric::util::SnapshotDiff diffB( + 3 * faabric::util::HOST_PAGE_SIZE, diffDataB.data(), diffDataB.size()); + diffsB = { diffB }; + cli.pushSnapshotDiffs(snapKey, diffsB); - SECTION("With diffs") - { - faabric::util::SnapshotDiff diffA1( - 5, diffDataA1.data(), diffDataA1.size()); - faabric::util::SnapshotDiff diffA2(2 * faabric::util::HOST_PAGE_SIZE, - diffDataA2.data(), - diffDataA2.size()); - diffsA = { diffA1, diffA2 }; - cli.pushThreadResult(threadIdA, returnValueA, snapKey, diffsA); - - faabric::util::SnapshotDiff diffB(3 * faabric::util::HOST_PAGE_SIZE, - diffDataB.data(), - diffDataB.size()); - diffsB = { diffB }; - cli.pushThreadResult(threadIdB, returnValueB, snapKey, diffsB); - } + // Check changes have been applied + checkDiffsApplied(snap.data, diffsA); + checkDiffsApplied(snap.data, diffsB); + + munmap(snap.data, snap.size); +} + 
+TEST_CASE_METHOD(SnapshotClientServerFixture, + "Test set thread result", + "[snapshot]") +{ + // Register threads on this host + int threadIdA = 123; + int threadIdB = 345; + int returnValueA = 88; + int returnValueB = 99; + sch.registerThread(threadIdA); + sch.registerThread(threadIdB); + + cli.pushThreadResult(threadIdA, returnValueA); + cli.pushThreadResult(threadIdB, returnValueB); // Set up two threads to await the results std::thread tA([threadIdA, returnValueA] { @@ -168,11 +164,5 @@ TEST_CASE_METHOD(SnapshotClientServerFixture, if (tB.joinable()) { tB.join(); } - - // Check changes have been applied - checkDiffsApplied(snap.data, diffsA); - checkDiffsApplied(snap.data, diffsB); - - munmap(snap.data, snap.size); } } From 684875d20738e52100dd9b0b2db8910b81a9ea06 Mon Sep 17 00:00:00 2001 From: Simon Shillaker Date: Tue, 6 Jul 2021 06:29:08 +0000 Subject: [PATCH 04/15] Small rearrange --- src/scheduler/Executor.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/scheduler/Executor.cpp b/src/scheduler/Executor.cpp index 5ba7e3eb9..acbc46d71 100644 --- a/src/scheduler/Executor.cpp +++ b/src/scheduler/Executor.cpp @@ -161,7 +161,7 @@ void Executor::threadPoolThread(int threadPoolIdx) SPDLOG_DEBUG("Thread pool thread {}:{} starting up", id, threadPoolIdx); auto& sch = faabric::scheduler::getScheduler(); - auto& conf = faabric::util::getSystemConfig(); + const auto& conf = faabric::util::getSystemConfig(); bool selfShutdown = false; @@ -226,13 +226,13 @@ void Executor::threadPoolThread(int threadPoolIdx) threadPoolIdx, executingTaskCount); - // Get snapshot diffs _before_ we reset the executor + // Handle snapshot diffs _before_ we reset the executor bool isThreads = req->type() == faabric::BatchExecuteRequest::THREADS; - std::vector diffs; if (isLastTask && isThreads) { // Get diffs faabric::util::SnapshotData d = snapshot(); - diffs = d.getDirtyPages(); + + std::vector diffs = d.getDirtyPages(); sch.pushSnapshotDiffs(msg, diffs); // 
Reset dirty page tracking now that we've got the diffs From 378f0c1955547246f72557449dda1bf633846d28 Mon Sep 17 00:00:00 2001 From: Simon Shillaker Date: Tue, 6 Jul 2021 06:48:57 +0000 Subject: [PATCH 05/15] Update comment --- src/snapshot/SnapshotClient.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/snapshot/SnapshotClient.cpp b/src/snapshot/SnapshotClient.cpp index 4d874c9a4..bfa97e1a5 100644 --- a/src/snapshot/SnapshotClient.cpp +++ b/src/snapshot/SnapshotClient.cpp @@ -155,7 +155,7 @@ void SnapshotClient::pushThreadResult(uint32_t messageId, int returnValue) flatbuffers::Offset requestOffset; SPDLOG_DEBUG( - "Sending thread result for {} to {} (with no snapshot diffs)", + "Sending thread result for {} to {}", messageId, host); From 3f77e3b7813bc12701b59d410cfb66062e068c64 Mon Sep 17 00:00:00 2001 From: Simon Shillaker Date: Tue, 6 Jul 2021 08:58:49 +0000 Subject: [PATCH 06/15] Push snapshot diffs whenever pending --- include/faabric/scheduler/Scheduler.h | 2 ++ src/scheduler/Executor.cpp | 14 ++++++++------ src/scheduler/Scheduler.cpp | 2 +- src/util/memory.cpp | 2 ++ 4 files changed, 13 insertions(+), 7 deletions(-) diff --git a/include/faabric/scheduler/Scheduler.h b/include/faabric/scheduler/Scheduler.h index bb2f86d56..f151a1e0a 100644 --- a/include/faabric/scheduler/Scheduler.h +++ b/include/faabric/scheduler/Scheduler.h @@ -65,6 +65,8 @@ class Executor std::atomic claimed = false; + std::atomic pendingSnapshotPush = false; + std::atomic executingTaskCount = 0; std::mutex threadsMutex; diff --git a/src/scheduler/Executor.cpp b/src/scheduler/Executor.cpp index acbc46d71..6fbeeefab 100644 --- a/src/scheduler/Executor.cpp +++ b/src/scheduler/Executor.cpp @@ -124,6 +124,7 @@ void Executor::executeTasks(std::vector msgIdxs, // Note this must be done after the restore has happened if (isThreads && isSnapshot) { faabric::util::resetDirtyTracking(); + pendingSnapshotPush = true; } // Set executing task count @@ -196,11 +197,13 @@ void 
Executor::threadPoolThread(int threadPoolIdx) assert(req->messages_size() >= msgIdx + 1); faabric::Message& msg = req->mutable_messages()->at(msgIdx); - SPDLOG_TRACE("Thread {}:{} executing task {} ({})", + bool isThreads = req->type() == faabric::BatchExecuteRequest::THREADS; + SPDLOG_TRACE("Thread {}:{} executing task {} ({}, thread={})", id, threadPoolIdx, msgIdx, - msg.id()); + msg.id(), + isThreads); int32_t returnValue; try { @@ -224,19 +227,18 @@ void Executor::threadPoolThread(int threadPoolIdx) msg.id(), id, threadPoolIdx, - executingTaskCount); + oldTaskCount - 1); // Handle snapshot diffs _before_ we reset the executor - bool isThreads = req->type() == faabric::BatchExecuteRequest::THREADS; - if (isLastTask && isThreads) { + if (isLastTask && pendingSnapshotPush) { // Get diffs faabric::util::SnapshotData d = snapshot(); - std::vector diffs = d.getDirtyPages(); sch.pushSnapshotDiffs(msg, diffs); // Reset dirty page tracking now that we've got the diffs faabric::util::resetDirtyTracking(); + pendingSnapshotPush = false; } // If this batch is finished, reset the executor and release its claim. 
diff --git a/src/scheduler/Scheduler.cpp b/src/scheduler/Scheduler.cpp index 5516577c9..331ecd138 100644 --- a/src/scheduler/Scheduler.cpp +++ b/src/scheduler/Scheduler.cpp @@ -268,7 +268,7 @@ std::vector Scheduler::callFunctions( "Empty snapshot for distributed threads/ processes"); } - if (snapshotNeeded && !registeredHosts.empty()) { + if (snapshotNeeded && !thisRegisteredHosts.empty()) { snapshotData = faabric::snapshot::getSnapshotRegistry().getSnapshot(snapshotKey); snapshotDiffs = snapshotData.getDirtyPages(); diff --git a/src/util/memory.cpp b/src/util/memory.cpp index 7f756ec81..9b9e8d8d0 100644 --- a/src/util/memory.cpp +++ b/src/util/memory.cpp @@ -97,6 +97,8 @@ void resetDirtyTracking() throw std::runtime_error("Failed to write to clear_refs"); } + SPDLOG_TRACE("Reset dirty page tracking"); + fclose(fd); } From 997a2e2ab2fbd338b0e7f2fd5682e3db9dc91127 Mon Sep 17 00:00:00 2001 From: Simon Shillaker Date: Tue, 6 Jul 2021 09:39:19 +0000 Subject: [PATCH 07/15] Add checks for zero size snapshots --- src/snapshot/SnapshotClient.cpp | 7 ++++++- src/snapshot/SnapshotRegistry.cpp | 5 +++++ src/snapshot/SnapshotServer.cpp | 5 +++++ 3 files changed, 16 insertions(+), 1 deletion(-) diff --git a/src/snapshot/SnapshotClient.cpp b/src/snapshot/SnapshotClient.cpp index bfa97e1a5..1fb971eaa 100644 --- a/src/snapshot/SnapshotClient.cpp +++ b/src/snapshot/SnapshotClient.cpp @@ -69,7 +69,12 @@ SnapshotClient::SnapshotClient(const std::string& hostIn) void SnapshotClient::pushSnapshot(const std::string& key, const faabric::util::SnapshotData& data) { - SPDLOG_DEBUG("Pushing snapshot {} to {}", key, host); + if(data.size == 0) { + SPDLOG_ERROR("Cannot push snapshot {} with size zero to {}", key, host); + throw std::runtime_error("Pushing snapshot with zero size"); + } + + SPDLOG_DEBUG("Pushing snapshot {} to {} ({} bytes)", key, host, data.size); if (faabric::util::isMockMode()) { faabric::util::UniqueLock lock(mockMutex); diff --git a/src/snapshot/SnapshotRegistry.cpp 
b/src/snapshot/SnapshotRegistry.cpp index b5e7a53df..a651032f8 100644 --- a/src/snapshot/SnapshotRegistry.cpp +++ b/src/snapshot/SnapshotRegistry.cpp @@ -50,6 +50,11 @@ void SnapshotRegistry::takeSnapshot(const std::string& key, faabric::util::SnapshotData data, bool locallyRestorable) { + if(data.size == 0) { + SPDLOG_ERROR("Cannot take snapshot {} of size zero", key); + throw std::runtime_error("Taking snapshot size zero"); + } + // Note - we only preserve the snapshot in the in-memory file, and do not // take ownership for the original data referenced in SnapshotData faabric::util::UniqueLock lock(snapshotsMx); diff --git a/src/snapshot/SnapshotServer.cpp b/src/snapshot/SnapshotServer.cpp index d8b011be8..aaf9a2f8a 100644 --- a/src/snapshot/SnapshotServer.cpp +++ b/src/snapshot/SnapshotServer.cpp @@ -60,6 +60,11 @@ std::unique_ptr SnapshotServer::recvPushSnapshot( const SnapshotPushRequest* r = flatbuffers::GetRoot(buffer); + if (r->contents()->size() == 0) { + SPDLOG_ERROR("Received shapshot {} with zero size", r->key()->c_str()); + throw std::runtime_error("Received snapshot with zero size"); + } + SPDLOG_DEBUG("Receiving shapshot {} (size {})", r->key()->c_str(), r->contents()->size()); From 9a21b627ea3b66985f3cb94f8a47fa60b629d58f Mon Sep 17 00:00:00 2001 From: Simon Shillaker Date: Tue, 6 Jul 2021 10:11:12 +0000 Subject: [PATCH 08/15] Slight rework of conditional around snapshots in scheduler --- src/scheduler/Scheduler.cpp | 53 ++++++++++++++++--------------- src/snapshot/SnapshotClient.cpp | 7 ++-- src/snapshot/SnapshotRegistry.cpp | 7 +++- 3 files changed, 36 insertions(+), 31 deletions(-) diff --git a/src/scheduler/Scheduler.cpp b/src/scheduler/Scheduler.cpp index 331ecd138..7f025cbc9 100644 --- a/src/scheduler/Scheduler.cpp +++ b/src/scheduler/Scheduler.cpp @@ -257,40 +257,43 @@ std::vector Scheduler::callFunctions( // This ensures everything is up to date, and we don't have to // maintain different records of which hosts hold which updates. 
faabric::util::SnapshotData snapshotData; - std::vector snapshotDiffs; std::string snapshotKey = firstMsg.snapshotkey(); bool snapshotNeeded = req->type() == req->THREADS || req->type() == req->PROCESSES; - if (snapshotNeeded && snapshotKey.empty()) { - SPDLOG_ERROR("No snapshot provided for {}", funcStr); - throw std::runtime_error( - "Empty snapshot for distributed threads/ processes"); - } + if (snapshotNeeded) { + if (snapshotKey.empty()) { + SPDLOG_ERROR("No snapshot provided for {}", funcStr); + throw std::runtime_error( + "Empty snapshot for distributed threads/ processes"); + } - if (snapshotNeeded && !thisRegisteredHosts.empty()) { snapshotData = faabric::snapshot::getSnapshotRegistry().getSnapshot(snapshotKey); - snapshotDiffs = snapshotData.getDirtyPages(); - - // Do the snapshot diff pushing - if (!snapshotDiffs.empty()) { - for (const auto& h : thisRegisteredHosts) { - SPDLOG_DEBUG("Pushing {} snapshot diffs for {} to {}", - snapshotDiffs.size(), - funcStr, - h); - SnapshotClient& c = getSnapshotClient(h); - c.pushSnapshotDiffs(snapshotKey, snapshotDiffs); + + if (!thisRegisteredHosts.empty()) { + std::vector snapshotDiffs = + snapshotData.getDirtyPages(); + + // Do the snapshot diff pushing + if (!snapshotDiffs.empty()) { + for (const auto& h : thisRegisteredHosts) { + SPDLOG_DEBUG("Pushing {} snapshot diffs for {} to {}", + snapshotDiffs.size(), + funcStr, + h); + SnapshotClient& c = getSnapshotClient(h); + c.pushSnapshotDiffs(snapshotKey, snapshotDiffs); + } } - } - // Now reset the dirty page tracking, as we want the next batch of - // diffs to contain everything from now on (including the updates - // sent back from all the threads) - SPDLOG_DEBUG("Resetting dirty tracking after pushing diffs {}", - funcStr); - faabric::util::resetDirtyTracking(); + // Now reset the dirty page tracking, as we want the next batch + // of diffs to contain everything from now on (including the + // updates sent back from all the threads) + SPDLOG_DEBUG("Resetting 
dirty tracking after pushing diffs {}", + funcStr); + faabric::util::resetDirtyTracking(); + } } // Work out how many we can handle locally diff --git a/src/snapshot/SnapshotClient.cpp b/src/snapshot/SnapshotClient.cpp index 1fb971eaa..c16176cd6 100644 --- a/src/snapshot/SnapshotClient.cpp +++ b/src/snapshot/SnapshotClient.cpp @@ -69,7 +69,7 @@ SnapshotClient::SnapshotClient(const std::string& hostIn) void SnapshotClient::pushSnapshot(const std::string& key, const faabric::util::SnapshotData& data) { - if(data.size == 0) { + if (data.size == 0) { SPDLOG_ERROR("Cannot push snapshot {} with size zero to {}", key, host); throw std::runtime_error("Pushing snapshot with zero size"); } @@ -159,10 +159,7 @@ void SnapshotClient::pushThreadResult(uint32_t messageId, int returnValue) flatbuffers::FlatBufferBuilder mb; flatbuffers::Offset requestOffset; - SPDLOG_DEBUG( - "Sending thread result for {} to {}", - messageId, - host); + SPDLOG_DEBUG("Sending thread result for {} to {}", messageId, host); // Create message without diffs requestOffset = CreateThreadResultRequest(mb, messageId, returnValue); diff --git a/src/snapshot/SnapshotRegistry.cpp b/src/snapshot/SnapshotRegistry.cpp index a651032f8..e754a6a58 100644 --- a/src/snapshot/SnapshotRegistry.cpp +++ b/src/snapshot/SnapshotRegistry.cpp @@ -50,11 +50,16 @@ void SnapshotRegistry::takeSnapshot(const std::string& key, faabric::util::SnapshotData data, bool locallyRestorable) { - if(data.size == 0) { + if (data.size == 0) { SPDLOG_ERROR("Cannot take snapshot {} of size zero", key); throw std::runtime_error("Taking snapshot size zero"); } + SPDLOG_TRACE("Registering snapshot {} size {} (restorable={})", + key, + data.size, + locallyRestorable); + // Note - we only preserve the snapshot in the in-memory file, and do not // take ownership for the original data referenced in SnapshotData faabric::util::UniqueLock lock(snapshotsMx); From acaed2ae8771c77924c45d819cd9b91208e8534d Mon Sep 17 00:00:00 2001 From: Simon Shillaker 
Date: Tue, 6 Jul 2021 12:06:37 +0000 Subject: [PATCH 09/15] Added test for snapshots from child threads --- src/scheduler/Executor.cpp | 2 +- tests/dist/scheduler/functions.cpp | 108 ++++++++++++++++++++++++ tests/dist/scheduler/test_snapshots.cpp | 32 ++++++- 3 files changed, 139 insertions(+), 3 deletions(-) diff --git a/src/scheduler/Executor.cpp b/src/scheduler/Executor.cpp index 6fbeeefab..59b66929f 100644 --- a/src/scheduler/Executor.cpp +++ b/src/scheduler/Executor.cpp @@ -224,7 +224,7 @@ void Executor::threadPoolThread(int threadPoolIdx) bool isLastTask = oldTaskCount == 1; SPDLOG_TRACE("Task {} finished by thread {}:{} ({} left)", - msg.id(), + faabric::util::funcToString(msg, true), id, threadPoolIdx, oldTaskCount - 1); diff --git a/tests/dist/scheduler/functions.cpp b/tests/dist/scheduler/functions.cpp index 2ff080c50..f0dec0075 100644 --- a/tests/dist/scheduler/functions.cpp +++ b/tests/dist/scheduler/functions.cpp @@ -4,12 +4,15 @@ #include "DistTestExecutor.h" #include "init.h" +#include + #include #include #include #include #include #include +#include namespace tests { @@ -74,6 +77,108 @@ int handleFakeDiffsFunction(faabric::scheduler::Executor* exec, return 123; } +int handleFakeDiffsThreadedFunction( + faabric::scheduler::Executor* exec, + int threadPoolIdx, + int msgIdx, + std::shared_ptr req) +{ + bool isThread = req->type() == faabric::BatchExecuteRequest::THREADS; + faabric::Message& msg = req->mutable_messages()->at(msgIdx); + std::string snapshotKey = "fake-diffs-threaded-snap"; + std::string msgInput = msg.inputdata(); + + // This function creates a snapshot, then spawns some child threads that + // will modify the shared memory. It then awaits the results and checks that + // the modifications are synced back to the original host. 
+ if (!isThread) { + int nThreads = std::stoi(msgInput); + + // Set up the snapshot + size_t snapSize = (nThreads * 4) * faabric::util::HOST_PAGE_SIZE; + uint8_t* snapMemory = (uint8_t*)mmap( + nullptr, snapSize, PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); + + faabric::util::SnapshotData snap; + snap.data = snapMemory; + snap.size = snapSize; + + faabric::snapshot::SnapshotRegistry& reg = + faabric::snapshot::getSnapshotRegistry(); + reg.takeSnapshot(snapshotKey, snap); + + auto req = + faabric::util::batchExecFactory(msg.user(), msg.function(), nThreads); + req->set_type(faabric::BatchExecuteRequest::THREADS); + + for (int i = 0; i < nThreads; i++) { + auto& m = req->mutable_messages()->at(i); + m.set_appindex(i); + m.set_inputdata(std::string("thread_" + std::to_string(i))); + m.set_snapshotkey(snapshotKey); + } + + // Dispatch the message, we expect them all to be executed on other hosts + std::string thisHost = faabric::util::getSystemConfig().endpointHost; + faabric::scheduler::Scheduler& sch = faabric::scheduler::getScheduler(); + std::vector executedHosts = sch.callFunctions(req); + + bool rightHosts = true; + for (auto& h : executedHosts) { + if (h == thisHost) { + SPDLOG_ERROR("Expected child threads to be executed on other " + "hosts (this host {}, actual host {})", + thisHost, + h); + rightHosts = false; + } + } + + if (!rightHosts) { + return 111; + } + + // Wait for the threads + for (auto& m : req->messages()) { + sch.awaitThreadResult(m.id()); + } + + // Check that the changes have been made to the snapshot memory + bool diffsApplied = true; + for (int i = 0; i < nThreads; i++) { + uint32_t offset = 2 * i * faabric::util::HOST_PAGE_SIZE; + std::string expectedData("thread_" + std::to_string(i)); + auto* charPtr = reinterpret_cast(snapMemory + offset); + std::string actual(charPtr); + + if (actual != expectedData) { + SPDLOG_ERROR( + "Diff not as expected. 
{} != {}", actual, expectedData); + diffsApplied = false; + } + } + + if (!diffsApplied) { + return 222; + } + + } else { + int idx = msg.appindex(); + uint32_t offset = 2 * idx * faabric::util::HOST_PAGE_SIZE; + + // Modify the executor's memory + std::vector inputBytes = + faabric::util::stringToBytes(msgInput); + + faabric::util::SnapshotData snap = exec->snapshot(); + std::memcpy(snap.data + offset, inputBytes.data(), inputBytes.size()); + + return 0; + } + + return 333; +} + void registerSchedulerTestFunctions() { registerDistTestExecutorCallback("threads", "simple", handleSimpleThread); @@ -82,5 +187,8 @@ void registerSchedulerTestFunctions() registerDistTestExecutorCallback( "snapshots", "fake-diffs", handleFakeDiffsFunction); + + registerDistTestExecutorCallback( + "snapshots", "fake-diffs-threaded", handleFakeDiffsThreadedFunction); } } diff --git a/tests/dist/scheduler/test_snapshots.cpp b/tests/dist/scheduler/test_snapshots.cpp index 9298c5ce7..d7ee5417b 100644 --- a/tests/dist/scheduler/test_snapshots.cpp +++ b/tests/dist/scheduler/test_snapshots.cpp @@ -22,7 +22,8 @@ TEST_CASE_METHOD(DistTestsFixture, "Check snapshots sent back from worker are applied", "[snapshots]") { - // Set up the snapshot + std::string user = "snapshots"; + std::string function = "fake-diffs"; std::string snapshotKey = "dist-snap-check"; size_t snapSize = 2 * faabric::util::HOST_PAGE_SIZE; @@ -38,7 +39,7 @@ TEST_CASE_METHOD(DistTestsFixture, // Invoke the function that ought to send back some snapshot diffs that // should be applied std::shared_ptr req = - faabric::util::batchExecFactory("snapshots", "fake-diffs", 1); + faabric::util::batchExecFactory(user, function, 1); req->set_type(faabric::BatchExecuteRequest::THREADS); // Set up some input data @@ -73,4 +74,31 @@ TEST_CASE_METHOD(DistTestsFixture, REQUIRE(actualA == expectedA); REQUIRE(actualB == expectedB); } + +TEST_CASE_METHOD(DistTestsFixture, + "Check snapshots sent back from child threads", + "[snapshots]") +{ + 
std::string user = "snapshots"; + std::string function = "fake-diffs-threaded"; + int nThreads = 3; + + std::shared_ptr req = + faabric::util::batchExecFactory(user, function, 1); + faabric::Message& m = req->mutable_messages()->at(0); + m.set_inputdata(std::to_string(nThreads)); + + // Force the function itself to be executed on this host, but its child + // threads on another host + faabric::HostResources res; + res.set_slots(1); + sch.setThisHostResources(res); + + std::vector expectedHosts = { MASTER_IP }; + std::vector executedHosts = sch.callFunctions(req); + REQUIRE(expectedHosts == executedHosts); + + int actualResult = sch.awaitThreadResult(m.id()); + REQUIRE(actualResult == 333); +} } From 9af666b3ad8692b0835dede9980db768e2b6388b Mon Sep 17 00:00:00 2001 From: Simon Shillaker Date: Tue, 6 Jul 2021 12:09:43 +0000 Subject: [PATCH 10/15] Correct wait statement in test --- tests/dist/scheduler/test_snapshots.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/dist/scheduler/test_snapshots.cpp b/tests/dist/scheduler/test_snapshots.cpp index d7ee5417b..f408378ad 100644 --- a/tests/dist/scheduler/test_snapshots.cpp +++ b/tests/dist/scheduler/test_snapshots.cpp @@ -98,7 +98,7 @@ TEST_CASE_METHOD(DistTestsFixture, std::vector executedHosts = sch.callFunctions(req); REQUIRE(expectedHosts == executedHosts); - int actualResult = sch.awaitThreadResult(m.id()); - REQUIRE(actualResult == 333); + faabric::Message actualResult = sch.getFunctionResult(m.id(), 10000); + REQUIRE(actualResult.returnvalue() == 333); } } From b82189192d91684f07c292640b040d170693963b Mon Sep 17 00:00:00 2001 From: Simon Shillaker Date: Tue, 6 Jul 2021 15:01:29 +0000 Subject: [PATCH 11/15] Small comment update --- src/scheduler/Executor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/scheduler/Executor.cpp b/src/scheduler/Executor.cpp index 59b66929f..d104487c0 100644 --- a/src/scheduler/Executor.cpp +++ b/src/scheduler/Executor.cpp @@ -236,7 
+236,7 @@ void Executor::threadPoolThread(int threadPoolIdx) std::vector diffs = d.getDirtyPages(); sch.pushSnapshotDiffs(msg, diffs); - // Reset dirty page tracking now that we've got the diffs + // Reset dirty page tracking now that we've pushed the diffs faabric::util::resetDirtyTracking(); pendingSnapshotPush = false; } From d352faac8f8fc1ade8deb1e3b6be9dee02b1fbfe Mon Sep 17 00:00:00 2001 From: Simon Shillaker Date: Wed, 7 Jul 2021 16:22:40 +0000 Subject: [PATCH 12/15] Started experimenting with other pagetable entries --- include/faabric/util/memory.h | 2 + src/util/memory.cpp | 29 +++++-- tests/test/snapshot/test_snapshot_dirty.cpp | 83 +++++++++++++++++++ .../test/snapshot/test_snapshot_registry.cpp | 5 +- 4 files changed, 111 insertions(+), 8 deletions(-) create mode 100644 tests/test/snapshot/test_snapshot_dirty.cpp diff --git a/include/faabric/util/memory.h b/include/faabric/util/memory.h index f87e71097..919c61516 100644 --- a/include/faabric/util/memory.h +++ b/include/faabric/util/memory.h @@ -38,4 +38,6 @@ AlignedChunk getPageAlignedChunk(long offset, long length); void resetDirtyTracking(); std::vector getDirtyPages(const uint8_t* ptr, int nPages); + +std::vector getDirtyPagesForMappedMemory(const uint8_t* ptr, int nPages); } diff --git a/src/util/memory.cpp b/src/util/memory.cpp index 9b9e8d8d0..52f111969 100644 --- a/src/util/memory.cpp +++ b/src/util/memory.cpp @@ -12,8 +12,13 @@ #define CLEAR_REFS "/proc/self/clear_refs" #define PAGEMAP "/proc/self/pagemap" +// See docs: https://www.kernel.org/doc/html/v5.4/admin-guide/mm/pagemap.html +// and source (grep PM_SOFT_DIRTY): +// https://github.com/torvalds/linux/blob/master/fs/proc/task_mmu.c #define PAGEMAP_ENTRY_BYTES 8 #define PAGEMAP_SOFT_DIRTY (1Ull << 55) +#define PAGEMAP_EXCLUSIVE_MAP (1Ull << 56) +#define PAGEMAP_FILE (1Ull << 61) namespace faabric::util { @@ -134,21 +139,33 @@ std::vector readPagemapEntries(uintptr_t ptr, int nEntries) return entries; } -std::vector getDirtyPages(const 
uint8_t* ptr, int nPages) +std::vector getPagemapFlags(const uint8_t* ptr, + int nPages, + uint64_t flag, + bool foundFlag) { - uintptr_t vptr = (uintptr_t)ptr; - // Get the pagemap entries + uintptr_t vptr = (uintptr_t)ptr; std::vector entries = readPagemapEntries(vptr, nPages); // Iterate through to get boolean flags - std::vector flags(nPages, false); + std::vector flags(nPages, !foundFlag); for (int i = 0; i < nPages; i++) { - if (entries.at(i) & PAGEMAP_SOFT_DIRTY) { - flags.at(i) = true; + if (entries.at(i) & flag) { + flags.at(i) = foundFlag; } } return flags; } + +std::vector getDirtyPagesForMappedMemory(const uint8_t* ptr, int nPages) +{ + return getPagemapFlags(ptr, nPages, PAGEMAP_FILE, false); +} + +std::vector getDirtyPages(const uint8_t* ptr, int nPages) +{ + return getPagemapFlags(ptr, nPages, PAGEMAP_SOFT_DIRTY, true); +} } diff --git a/tests/test/snapshot/test_snapshot_dirty.cpp b/tests/test/snapshot/test_snapshot_dirty.cpp new file mode 100644 index 000000000..e3dc4e07b --- /dev/null +++ b/tests/test/snapshot/test_snapshot_dirty.cpp @@ -0,0 +1,83 @@ +#include + +#include "faabric/util/snapshot.h" +#include "faabric_utils.h" + +#include + +#include +#include + +using namespace faabric::snapshot; + +namespace tests { + +static uint8_t* allocatePages(int nPages) +{ + return (uint8_t*)mmap(nullptr, + nPages * faabric::util::HOST_PAGE_SIZE, + PROT_WRITE, + MAP_SHARED | MAP_ANONYMOUS, + -1, + 0); +} + +static void deallocatePages(uint8_t* base, int nPages) +{ + munmap(base, nPages * faabric::util::HOST_PAGE_SIZE); +} + +TEST_CASE_METHOD(SnapshotTestFixture, + "Test snapshot dirty page checks", + "[snapshot]") +{ + + std::string snapKey = "dirty-check-snap"; + + int nPages = 100; + + faabric::util::SnapshotData snap; + snap.data = allocatePages(100); + snap.size = nPages * faabric::util::HOST_PAGE_SIZE; + reg.takeSnapshot(snapKey, snap, true); + + uint8_t* sharedMem = allocatePages(nPages); + + // Reset tracking before mapping + 
faabric::util::resetDirtyTracking(); + + // Map the memory + reg.mapSnapshot(snapKey, sharedMem); + + // Mapping will cause dirty pages on only the mapped region + std::vector snapDiffs = snap.getDirtyPages(); + REQUIRE(snapDiffs.empty()); + + std::vector sharedMemDiffs = + faabric::util::getDirtyPages(sharedMem, nPages); + REQUIRE(sharedMemDiffs.size() == nPages); + + // Reset standard dirty page check and check nothing changes + faabric::util::resetDirtyTracking(); + REQUIRE(snap.getDirtyPages().empty()); + REQUIRE(faabric::util::getDirtyPages(sharedMem, nPages).size() == nPages); + + // Check that the mapped region is "clean" + REQUIRE( + faabric::util::getDirtyPagesForMappedMemory(sharedMem, nPages).empty()); + + // Write to shared mem, check only shared mem is dirty + sharedMem[2 * faabric::util::HOST_PAGE_SIZE + 2] = 1; + sharedMem[5 * faabric::util::HOST_PAGE_SIZE + 2] = 1; + sharedMem[20 * faabric::util::HOST_PAGE_SIZE + 2] = 1; + + REQUIRE(snap.getDirtyPages().empty()); + + REQUIRE( + faabric::util::getDirtyPagesForMappedMemory(sharedMem, nPages).size() == + 3); + + // Tidy up + deallocatePages(sharedMem, nPages); +} +} diff --git a/tests/test/snapshot/test_snapshot_registry.cpp b/tests/test/snapshot/test_snapshot_registry.cpp index d5b08a25a..617e732d3 100644 --- a/tests/test/snapshot/test_snapshot_registry.cpp +++ b/tests/test/snapshot/test_snapshot_registry.cpp @@ -11,7 +11,7 @@ using namespace faabric::util; namespace tests { -uint8_t* allocatePages(int nPages) +static uint8_t* allocatePages(int nPages) { return (uint8_t*)mmap(nullptr, nPages * HOST_PAGE_SIZE, @@ -20,7 +20,8 @@ uint8_t* allocatePages(int nPages) -1, 0); } -void deallocatePages(uint8_t* base, int nPages) + +static void deallocatePages(uint8_t* base, int nPages) { munmap(base, nPages * HOST_PAGE_SIZE); } From aaca105d7311a013df82341641377eefa37048c5 Mon Sep 17 00:00:00 2001 From: Simon Shillaker Date: Mon, 12 Jul 2021 09:20:32 +0000 Subject: [PATCH 13/15] Reverting changes moved to other 
PR --- include/faabric/util/memory.h | 2 - src/util/memory.cpp | 31 ++----- tests/test/snapshot/test_snapshot_dirty.cpp | 83 ------------------- .../test/snapshot/test_snapshot_registry.cpp | 5 +- 4 files changed, 8 insertions(+), 113 deletions(-) delete mode 100644 tests/test/snapshot/test_snapshot_dirty.cpp diff --git a/include/faabric/util/memory.h b/include/faabric/util/memory.h index 919c61516..f87e71097 100644 --- a/include/faabric/util/memory.h +++ b/include/faabric/util/memory.h @@ -38,6 +38,4 @@ AlignedChunk getPageAlignedChunk(long offset, long length); void resetDirtyTracking(); std::vector getDirtyPages(const uint8_t* ptr, int nPages); - -std::vector getDirtyPagesForMappedMemory(const uint8_t* ptr, int nPages); } diff --git a/src/util/memory.cpp b/src/util/memory.cpp index 52f111969..7f756ec81 100644 --- a/src/util/memory.cpp +++ b/src/util/memory.cpp @@ -12,13 +12,8 @@ #define CLEAR_REFS "/proc/self/clear_refs" #define PAGEMAP "/proc/self/pagemap" -// See docs: https://www.kernel.org/doc/html/v5.4/admin-guide/mm/pagemap.html -// and source (grep PM_SOFT_DIRTY): -// https://github.com/torvalds/linux/blob/master/fs/proc/task_mmu.c #define PAGEMAP_ENTRY_BYTES 8 #define PAGEMAP_SOFT_DIRTY (1Ull << 55) -#define PAGEMAP_EXCLUSIVE_MAP (1Ull << 56) -#define PAGEMAP_FILE (1Ull << 61) namespace faabric::util { @@ -102,8 +97,6 @@ void resetDirtyTracking() throw std::runtime_error("Failed to write to clear_refs"); } - SPDLOG_TRACE("Reset dirty page tracking"); - fclose(fd); } @@ -139,33 +132,21 @@ std::vector readPagemapEntries(uintptr_t ptr, int nEntries) return entries; } -std::vector getPagemapFlags(const uint8_t* ptr, - int nPages, - uint64_t flag, - bool foundFlag) +std::vector getDirtyPages(const uint8_t* ptr, int nPages) { - // Get the pagemap entries uintptr_t vptr = (uintptr_t)ptr; + + // Get the pagemap entries std::vector entries = readPagemapEntries(vptr, nPages); // Iterate through to get boolean flags - std::vector flags(nPages, !foundFlag); + 
std::vector flags(nPages, false); for (int i = 0; i < nPages; i++) { - if (entries.at(i) & flag) { - flags.at(i) = foundFlag; + if (entries.at(i) & PAGEMAP_SOFT_DIRTY) { + flags.at(i) = true; } } return flags; } - -std::vector getDirtyPagesForMappedMemory(const uint8_t* ptr, int nPages) -{ - return getPagemapFlags(ptr, nPages, PAGEMAP_FILE, false); -} - -std::vector getDirtyPages(const uint8_t* ptr, int nPages) -{ - return getPagemapFlags(ptr, nPages, PAGEMAP_SOFT_DIRTY, true); -} } diff --git a/tests/test/snapshot/test_snapshot_dirty.cpp b/tests/test/snapshot/test_snapshot_dirty.cpp deleted file mode 100644 index e3dc4e07b..000000000 --- a/tests/test/snapshot/test_snapshot_dirty.cpp +++ /dev/null @@ -1,83 +0,0 @@ -#include - -#include "faabric/util/snapshot.h" -#include "faabric_utils.h" - -#include - -#include -#include - -using namespace faabric::snapshot; - -namespace tests { - -static uint8_t* allocatePages(int nPages) -{ - return (uint8_t*)mmap(nullptr, - nPages * faabric::util::HOST_PAGE_SIZE, - PROT_WRITE, - MAP_SHARED | MAP_ANONYMOUS, - -1, - 0); -} - -static void deallocatePages(uint8_t* base, int nPages) -{ - munmap(base, nPages * faabric::util::HOST_PAGE_SIZE); -} - -TEST_CASE_METHOD(SnapshotTestFixture, - "Test snapshot dirty page checks", - "[snapshot]") -{ - - std::string snapKey = "dirty-check-snap"; - - int nPages = 100; - - faabric::util::SnapshotData snap; - snap.data = allocatePages(100); - snap.size = nPages * faabric::util::HOST_PAGE_SIZE; - reg.takeSnapshot(snapKey, snap, true); - - uint8_t* sharedMem = allocatePages(nPages); - - // Reset tracking before mapping - faabric::util::resetDirtyTracking(); - - // Map the memory - reg.mapSnapshot(snapKey, sharedMem); - - // Mapping will cause dirty pages on only the mapped region - std::vector snapDiffs = snap.getDirtyPages(); - REQUIRE(snapDiffs.empty()); - - std::vector sharedMemDiffs = - faabric::util::getDirtyPages(sharedMem, nPages); - REQUIRE(sharedMemDiffs.size() == nPages); - - // Reset 
standard dirty page check and check nothing changes - faabric::util::resetDirtyTracking(); - REQUIRE(snap.getDirtyPages().empty()); - REQUIRE(faabric::util::getDirtyPages(sharedMem, nPages).size() == nPages); - - // Check that the mapped region is "clean" - REQUIRE( - faabric::util::getDirtyPagesForMappedMemory(sharedMem, nPages).empty()); - - // Write to shared mem, check only shared mem is dirty - sharedMem[2 * faabric::util::HOST_PAGE_SIZE + 2] = 1; - sharedMem[5 * faabric::util::HOST_PAGE_SIZE + 2] = 1; - sharedMem[20 * faabric::util::HOST_PAGE_SIZE + 2] = 1; - - REQUIRE(snap.getDirtyPages().empty()); - - REQUIRE( - faabric::util::getDirtyPagesForMappedMemory(sharedMem, nPages).size() == - 3); - - // Tidy up - deallocatePages(sharedMem, nPages); -} -} diff --git a/tests/test/snapshot/test_snapshot_registry.cpp b/tests/test/snapshot/test_snapshot_registry.cpp index 617e732d3..d5b08a25a 100644 --- a/tests/test/snapshot/test_snapshot_registry.cpp +++ b/tests/test/snapshot/test_snapshot_registry.cpp @@ -11,7 +11,7 @@ using namespace faabric::util; namespace tests { -static uint8_t* allocatePages(int nPages) +uint8_t* allocatePages(int nPages) { return (uint8_t*)mmap(nullptr, nPages * HOST_PAGE_SIZE, @@ -20,8 +20,7 @@ static uint8_t* allocatePages(int nPages) -1, 0); } - -static void deallocatePages(uint8_t* base, int nPages) +void deallocatePages(uint8_t* base, int nPages) { munmap(base, nPages * HOST_PAGE_SIZE); } From 3a3401b2492b2066ca9cd9b005fb7c83e50d374e Mon Sep 17 00:00:00 2001 From: Simon Shillaker Date: Mon, 12 Jul 2021 13:19:48 +0000 Subject: [PATCH 14/15] Add missing subtype and context to chained batch call --- src/scheduler/Executor.cpp | 9 +++++---- src/scheduler/FunctionCallServer.cpp | 1 + src/scheduler/Scheduler.cpp | 2 ++ tests/test/scheduler/test_scheduler.cpp | 14 ++++++++++++++ 4 files changed, 22 insertions(+), 4 deletions(-) diff --git a/src/scheduler/Executor.cpp b/src/scheduler/Executor.cpp index d104487c0..4b93d0034 100644 --- 
a/src/scheduler/Executor.cpp +++ b/src/scheduler/Executor.cpp @@ -106,8 +106,7 @@ void Executor::executeTasks(std::vector msgIdxs, if (isSnapshot && !alreadyRestored) { if ((!isMaster && isThreads) || !isThreads) { - SPDLOG_DEBUG( - "Performing snapshot restore {} [{}]", funcStr, snapshotKey); + SPDLOG_DEBUG("Restoring {} from snapshot {}", funcStr, snapshotKey); lastSnapshot = snapshotKey; restore(firstMsg); } else { @@ -211,8 +210,10 @@ void Executor::threadPoolThread(int threadPoolIdx) } catch (const std::exception& ex) { returnValue = 1; - msg.set_outputdata(fmt::format( - "Task {} threw exception. What: {}", msg.id(), ex.what())); + std::string errorMessage = fmt::format( + "Task {} threw exception. What: {}", msg.id(), ex.what()); + SPDLOG_ERROR(errorMessage); + msg.set_outputdata(errorMessage); } // Set the return value diff --git a/src/scheduler/FunctionCallServer.cpp b/src/scheduler/FunctionCallServer.cpp index dcede0518..5b33627ff 100644 --- a/src/scheduler/FunctionCallServer.cpp +++ b/src/scheduler/FunctionCallServer.cpp @@ -72,6 +72,7 @@ void FunctionCallServer::recvExecuteFunctions(const uint8_t* buffer, PARSE_MSG(faabric::BatchExecuteRequest, buffer, bufferSize) // This host has now been told to execute these functions no matter what + // TODO - avoid this copy scheduler.callFunctions(std::make_shared(msg), true); } diff --git a/src/scheduler/Scheduler.cpp b/src/scheduler/Scheduler.cpp index 7f025cbc9..4b13f1200 100644 --- a/src/scheduler/Scheduler.cpp +++ b/src/scheduler/Scheduler.cpp @@ -516,6 +516,8 @@ int Scheduler::scheduleFunctionsOnHost( faabric::util::batchExecFactory(); hostRequest->set_snapshotkey(req->snapshotkey()); hostRequest->set_type(req->type()); + hostRequest->set_subtype(req->subtype()); + hostRequest->set_contextdata(req->contextdata()); // Add messages int nOnThisHost = std::min(available, remainder); diff --git a/tests/test/scheduler/test_scheduler.cpp b/tests/test/scheduler/test_scheduler.cpp index a5f86b05d..62919831d 100644 
--- a/tests/test/scheduler/test_scheduler.cpp +++ b/tests/test/scheduler/test_scheduler.cpp @@ -182,17 +182,25 @@ TEST_CASE_METHOD(SlowExecutorFixture, "Test batch scheduling", "[scheduler]") { std::string expectedSnapshot; faabric::BatchExecuteRequest::BatchExecuteType execMode; + int32_t expectedSubType; + std::string expectedContextData; SECTION("Threads") { execMode = faabric::BatchExecuteRequest::THREADS; expectedSnapshot = "threadSnap"; + + expectedSubType = 123; + expectedContextData = "thread context"; } SECTION("Processes") { execMode = faabric::BatchExecuteRequest::PROCESSES; expectedSnapshot = "procSnap"; + + expectedSubType = 345; + expectedContextData = "proc context"; } SECTION("Functions") @@ -245,6 +253,8 @@ TEST_CASE_METHOD(SlowExecutorFixture, "Test batch scheduling", "[scheduler]") std::shared_ptr reqOne = faabric::util::batchExecFactory("foo", "bar", nCallsOne); reqOne->set_type(execMode); + reqOne->set_subtype(expectedSubType); + reqOne->set_contextdata(expectedContextData); for (int i = 0; i < nCallsOne; i++) { // Set snapshot key @@ -307,9 +317,13 @@ TEST_CASE_METHOD(SlowExecutorFixture, "Test batch scheduling", "[scheduler]") // Check the message is dispatched to the other host auto batchRequestsOne = faabric::scheduler::getBatchRequests(); REQUIRE(batchRequestsOne.size() == 1); + auto batchRequestOne = batchRequestsOne.at(0); REQUIRE(batchRequestOne.first == otherHost); REQUIRE(batchRequestOne.second->messages_size() == nCallsOffloadedOne); + REQUIRE(batchRequestOne.second->type() == execMode); + REQUIRE(batchRequestOne.second->subtype() == expectedSubType); + REQUIRE(batchRequestOne.second->contextdata() == expectedContextData); // Clear mocks faabric::scheduler::clearMockRequests(); From 037cf81fc3ca4295167716326977ba0d68e30693 Mon Sep 17 00:00:00 2001 From: Simon Shillaker Date: Mon, 12 Jul 2021 13:28:24 +0000 Subject: [PATCH 15/15] Force slots on dist test server --- tests/dist/scheduler/test_snapshots.cpp | 5 ++--- 
tests/dist/server.cpp | 6 ++++++ 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/tests/dist/scheduler/test_snapshots.cpp b/tests/dist/scheduler/test_snapshots.cpp index f408378ad..88285aecf 100644 --- a/tests/dist/scheduler/test_snapshots.cpp +++ b/tests/dist/scheduler/test_snapshots.cpp @@ -30,14 +30,13 @@ TEST_CASE_METHOD(DistTestsFixture, uint8_t* snapMemory = (uint8_t*)mmap( nullptr, snapSize, PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); + // Set up snapshot faabric::util::SnapshotData snap; snap.data = snapMemory; snap.size = snapSize; - reg.takeSnapshot(snapshotKey, snap); - // Invoke the function that ought to send back some snapshot diffs that - // should be applied + // Set up the message std::shared_ptr req = faabric::util::batchExecFactory(user, function, 1); req->set_type(faabric::BatchExecuteRequest::THREADS); diff --git a/tests/dist/server.cpp b/tests/dist/server.cpp index 64adbdda6..733111946 100644 --- a/tests/dist/server.cpp +++ b/tests/dist/server.cpp @@ -15,6 +15,12 @@ int main() faabric::transport::initGlobalMessageContext(); tests::initDistTests(); + int slots = 4; + SPDLOG_INFO("Forcing distributed test server to have {} slots", slots); + faabric::HostResources res; + res.set_slots(slots); + faabric::scheduler::getScheduler().setThisHostResources(res); + // WARNING: All 0MQ operations must be contained within their own scope so // that all sockets are destructed before the context is closed. {