From 55d8ddfee133c207e4ee88569cf77660ad2b98ab Mon Sep 17 00:00:00 2001 From: Simon Shillaker Date: Mon, 20 Dec 2021 17:21:40 +0000 Subject: [PATCH 01/24] Client/ server for pushing snapshot merge regions --- include/faabric/snapshot/SnapshotClient.h | 11 ++- src/flat/faabric.fbs | 19 +++-- src/scheduler/Executor.cpp | 4 +- src/scheduler/Scheduler.cpp | 5 +- src/snapshot/SnapshotClient.cpp | 84 +++++++++++++++---- src/snapshot/SnapshotServer.cpp | 44 ++++++---- .../snapshot/test_snapshot_client_server.cpp | 83 +++++++++++++++--- 7 files changed, 199 insertions(+), 51 deletions(-) diff --git a/include/faabric/snapshot/SnapshotClient.h b/include/faabric/snapshot/SnapshotClient.h index 5c88b7cee..f840a76cb 100644 --- a/include/faabric/snapshot/SnapshotClient.h +++ b/include/faabric/snapshot/SnapshotClient.h @@ -38,9 +38,13 @@ class SnapshotClient final : public faabric::transport::MessageEndpointClient void pushSnapshot(const std::string& key, std::shared_ptr data); + void pushSnapshotUpdate( + std::string snapshotKey, + std::shared_ptr data, + const std::vector& diffs); + void pushSnapshotDiffs( std::string snapshotKey, - bool force, const std::vector& diffs); void deleteSnapshot(const std::string& key); @@ -49,5 +53,10 @@ class SnapshotClient final : public faabric::transport::MessageEndpointClient private: void sendHeader(faabric::snapshot::SnapshotCalls call); + + void doPushSnapshotDiffs( + const std::string& snapshotKey, + std::shared_ptr data, + const std::vector& diffs); }; } diff --git a/src/flat/faabric.fbs b/src/flat/faabric.fbs index 1f7a7ef5e..938d2160e 100644 --- a/src/flat/faabric.fbs +++ b/src/flat/faabric.fbs @@ -1,24 +1,33 @@ +table SnapshotMergeRegionRequest { + offset:int; + length:ulong; + data_type:int; + merge_op:int; +} + table SnapshotPushRequest { key:string; - maxSize:ulong; + max_size:ulong; contents:[ubyte]; + merge_regions:[SnapshotMergeRegionRequest]; } table SnapshotDeleteRequest { key:string; } -table SnapshotDiffChunk { +table SnapshotDiffRequest { offset:int; - dataType:int; - mergeOp:int; + data_type:int; + merge_op:int; data:[ubyte]; } table SnapshotDiffPushRequest { key:string; force:bool; - chunks:[SnapshotDiffChunk]; + merge_regions:[SnapshotMergeRegionRequest]; + diffs:[SnapshotDiffRequest]; } table ThreadResultRequest { diff --git a/src/scheduler/Executor.cpp b/src/scheduler/Executor.cpp index dbe5767d8..48451d790 100644 --- a/src/scheduler/Executor.cpp +++ b/src/scheduler/Executor.cpp @@ -313,14 +313,12 @@ void Executor::threadPoolThread(int threadPoolIdx) snap->queueDiffs(diffs); } else { + // Push diffs back to master sch.pushSnapshotDiffs(msg, diffs); // Reset dirty page tracking on non-master faabric::util::resetDirtyTracking(); } - - SPDLOG_DEBUG("Clearing merge regions for {}", msg.snapshotkey()); - snap->clearMergeRegions(); } // If this batch is finished, reset the executor and release its claim. diff --git a/src/scheduler/Scheduler.cpp b/src/scheduler/Scheduler.cpp index 403395e95..be2cfc09c 100644 --- a/src/scheduler/Scheduler.cpp +++ b/src/scheduler/Scheduler.cpp @@ -508,7 +508,8 @@ faabric::util::SchedulingDecision Scheduler::doCallFunctions( std::vector snapshotDiffs = snapMemView.getDirtyRegions(); - c.pushSnapshotDiffs(snapshotKey, true, snapshotDiffs); + c.pushSnapshotUpdate( + snapshotKey, std::move(snap), snapshotDiffs); } else { c.pushSnapshot(snapshotKey, snap); pushedSnapshotsMap[snapshotKey].insert(host); @@ -916,7 +917,7 @@ void Scheduler::pushSnapshotDiffs( } SnapshotClient& c = getSnapshotClient(msg.masterhost()); - c.pushSnapshotDiffs(snapKey, false, diffs); + c.pushSnapshotDiffs(snapKey, diffs); } void Scheduler::setThreadResultLocally(uint32_t msgId, int32_t returnValue) diff --git a/src/snapshot/SnapshotClient.cpp b/src/snapshot/SnapshotClient.cpp index dd8108c4d..95c34a804 100644 --- a/src/snapshot/SnapshotClient.cpp +++ b/src/snapshot/SnapshotClient.cpp @@ -93,11 +93,24 @@ void SnapshotClient::pushSnapshot( // Set up the main request // TODO - avoid copying data here? flatbuffers::FlatBufferBuilder mb; + + std::vector> + mrsFbVector; + for (const auto& m : data->getMergeRegions()) { + auto mr = CreateSnapshotMergeRegionRequest(mb, + m.second.offset, + m.second.length, + m.second.dataType, + m.second.operation); + mrsFbVector.push_back(mr); + } + auto keyOffset = mb.CreateString(key); auto dataOffset = mb.CreateVector(data->getDataPtr(), data->getSize()); + auto mrsOffset = mb.CreateVector(mrsFbVector); auto requestOffset = CreateSnapshotPushRequest( - mb, keyOffset, data->getMaxSize(), dataOffset); + mb, keyOffset, data->getMaxSize(), dataOffset, mrsOffset); mb.Finish(requestOffset); // Send it @@ -105,39 +118,82 @@ void SnapshotClient::pushSnapshot( } } +void SnapshotClient::pushSnapshotUpdate( + std::string snapshotKey, + std::shared_ptr data, + const std::vector& diffs) +{ + SPDLOG_DEBUG("Pushing update to snapshot {} to {} ({} diffs, {} regions)", + snapshotKey, + host, + diffs.size(), + data->getMergeRegions().size()); + + doPushSnapshotDiffs(snapshotKey, data, diffs); +} + void SnapshotClient::pushSnapshotDiffs( std::string snapshotKey, - bool force, + const std::vector& diffs) +{ + SPDLOG_DEBUG("Pushing {} diffs for snapshot {} to {}", + diffs.size(), + snapshotKey, + host); + + doPushSnapshotDiffs(snapshotKey, nullptr, diffs); +} + +void SnapshotClient::doPushSnapshotDiffs( + const std::string& snapshotKey, + std::shared_ptr data, const std::vector& diffs) { if (faabric::util::isMockMode()) { faabric::util::UniqueLock lock(mockMutex); snapshotDiffPushes.emplace_back(host, diffs); } else { - SPDLOG_DEBUG("Pushing {} diffs for snapshot {} to {}", - diffs.size(), - snapshotKey, - host); - flatbuffers::FlatBufferBuilder mb; - // Create objects for all the chunks - std::vector> diffsFbVector; + // Create objects for all the diffs + std::vector> diffsFbVector; for (const auto& d : diffs) { std::span diffData = d.getData(); auto dataOffset = mb.CreateVector(diffData.data(), diffData.size()); - auto chunk = CreateSnapshotDiffChunk( + auto diff = CreateSnapshotDiffRequest( mb, d.getOffset(), d.getDataType(), d.getOperation(), dataOffset); - diffsFbVector.push_back(chunk); + diffsFbVector.push_back(diff); + } + + // If we have snapshot data, we need to include the merge regions and + // force too. + std::vector> + mrsFbVector; + bool force = false; + if (data != nullptr) { + for (const auto& m : data->getMergeRegions()) { + auto mr = CreateSnapshotMergeRegionRequest(mb, + m.second.offset, + m.second.length, + m.second.dataType, + m.second.operation); + mrsFbVector.push_back(mr); + } + + force = true; + } else { + force = false; } - // Set up the request auto keyOffset = mb.CreateString(snapshotKey); auto diffsOffset = mb.CreateVector(diffsFbVector); - auto requestOffset = - CreateSnapshotDiffPushRequest(mb, keyOffset, force, diffsOffset); + auto mrsOffset = mb.CreateVector(mrsFbVector); + + auto requestOffset = CreateSnapshotDiffPushRequest( + mb, keyOffset, force, mrsOffset, diffsOffset); + mb.Finish(requestOffset); SEND_FB_MSG(SnapshotCalls::PushSnapshotDiffs, mb); diff --git a/src/snapshot/SnapshotServer.cpp b/src/snapshot/SnapshotServer.cpp index 21a7e529c..47dd0ea12 100644 --- a/src/snapshot/SnapshotServer.cpp +++ b/src/snapshot/SnapshotServer.cpp @@ -76,7 +76,7 @@ std::unique_ptr SnapshotServer::recvPushSnapshot( SPDLOG_DEBUG("Receiving snapshot {} (size {}, max {})", r->key()->c_str(), r->contents()->size(), - r->maxSize()); + r->max_size()); faabric::snapshot::SnapshotRegistry& reg = faabric::snapshot::getSnapshotRegistry(); @@ -84,11 +84,20 @@ std::unique_ptr SnapshotServer::recvPushSnapshot( // Set up the snapshot size_t snapSize = r->contents()->size(); std::string snapKey = r->key()->str(); - auto d = std::make_shared( - std::span((uint8_t*)r->contents()->Data(), snapSize), r->maxSize()); + auto snap = std::make_shared( + std::span((uint8_t*)r->contents()->Data(), snapSize), r->max_size()); + + // Add the merge regions + for (const auto* mr : *r->merge_regions()) { + snap->addMergeRegion( + mr->offset(), + mr->length(), + static_cast(mr->data_type()), + static_cast(mr->merge_op())); + } // Register snapshot - reg.registerSnapshot(snapKey, d); + reg.registerSnapshot(snapKey, snap); // Send response return std::make_unique(); @@ -114,31 +123,38 @@ SnapshotServer::recvPushSnapshotDiffs(const uint8_t* buffer, size_t bufferSize) flatbuffers::GetRoot(buffer); SPDLOG_DEBUG( - "Applying {} diffs to snapshot {}", r->chunks()->size(), r->key()->str()); + "Applying {} diffs to snapshot {}", r->diffs()->size(), r->key()->str()); // Get the snapshot faabric::snapshot::SnapshotRegistry& reg = faabric::snapshot::getSnapshotRegistry(); auto snap = reg.getSnapshot(r->key()->str()); - // Convert chunks to snapshot diff objects + // Convert diffs to snapshot diff objects std::vector diffs; - diffs.reserve(r->chunks()->size()); - for (const auto* chunk : *r->chunks()) { + diffs.reserve(r->diffs()->size()); + for (const auto* diff : *r->diffs()) { diffs.emplace_back( - static_cast(chunk->dataType()), - static_cast(chunk->mergeOp()), - chunk->offset(), - std::span(chunk->data()->data(), - chunk->data()->size())); + static_cast(diff->data_type()), + static_cast(diff->merge_op()), + diff->offset(), + std::span(diff->data()->data(), diff->data()->size())); } // Queue on the snapshot snap->queueDiffs(diffs); - // Write if necessary + // Write diffs and set merge regions if necessary if (r->force()) { snap->writeQueuedDiffs(); + + for (const auto* mr : *r->merge_regions()) { + snap->addMergeRegion( + mr->offset(), + mr->length(), + static_cast(mr->data_type()), + static_cast(mr->merge_op())); + } } // Send response diff --git a/tests/test/snapshot/test_snapshot_client_server.cpp b/tests/test/snapshot/test_snapshot_client_server.cpp index 25abd5545..0acd747e0 100644 --- a/tests/test/snapshot/test_snapshot_client_server.cpp +++ b/tests/test/snapshot/test_snapshot_client_server.cpp @@ -80,9 +80,20 @@ TEST_CASE_METHOD(SnapshotClientServerFixture, std::vector dataA(snapSizeA, 1); std::vector dataB(snapSizeB, 2); + // Set up snapshots auto snapA = std::make_shared(dataA); auto snapB = std::make_shared(dataB); + // Add merge regions to one + std::vector mergeRegions = { + { 123, 1234, SnapshotDataType::Int, SnapshotMergeOperation::Sum }, + { 345, 3456, SnapshotDataType::Raw, SnapshotMergeOperation::Overwrite } + }; + + for (const auto& m : mergeRegions) { + snapA->addMergeRegion(m.offset, m.length, m.dataType, m.operation); + } + REQUIRE(reg.getSnapshotCount() == 0); // Send the messages @@ -97,6 +108,21 @@ TEST_CASE_METHOD(SnapshotClientServerFixture, REQUIRE(actualA->getSize() == snapA->getSize()); REQUIRE(actualB->getSize() == snapB->getSize()); + // Check merge regions + REQUIRE(actualA->getMergeRegions().size() == mergeRegions.size()); + REQUIRE(actualB->getMergeRegions().empty()); + + for (int i = 0; i < mergeRegions.size(); i++) { + SnapshotMergeRegion expected = mergeRegions.at(i); + SnapshotMergeRegion actual = snapA->getMergeRegions()[expected.offset]; + + REQUIRE(actual.offset == expected.offset); + REQUIRE(actual.dataType == expected.dataType); + REQUIRE(actual.length == expected.length); + REQUIRE(actual.operation == expected.operation); + } + + // Check data contents std::vector actualDataA = actualA->getDataCopy(); std::vector actualDataB = actualB->getDataCopy(); @@ -134,6 +160,20 @@ TEST_CASE_METHOD(SnapshotClientServerFixture, // Set up the snapshot reg.registerSnapshot(snapKey, snap); + // Set up another snapshot with some merge regions to check they're added + // on an update + auto otherSnap = + std::make_shared(initialSnapSize, expandedSnapSize); + + std::vector mergeRegions = { + { 123, 1234, SnapshotDataType::Int, SnapshotMergeOperation::Sum }, + { 345, 3456, SnapshotDataType::Raw, SnapshotMergeOperation::Overwrite } + }; + + for (const auto& m : mergeRegions) { + otherSnap->addMergeRegion(m.offset, m.length, m.dataType, m.operation); + } + // Set up some diffs for the initial request uint32_t offsetA1 = 5; uint32_t offsetA2 = 2 * HOST_PAGE_SIZE; @@ -153,7 +193,7 @@ TEST_CASE_METHOD(SnapshotClientServerFixture, diffDataA2); std::vector diffsA = { diffA1, diffA2 }; - cli.pushSnapshotDiffs(snapKey, false, diffsA); + cli.pushSnapshotDiffs(snapKey, diffsA); REQUIRE(snap->getQueuedDiffsCount() == 2); // Submit some more diffs, some larger than the original snapshot (to check @@ -183,22 +223,41 @@ TEST_CASE_METHOD(SnapshotClientServerFixture, std::vector diffsB = { diffB1, diffB2, diffB3 }; - bool force = false; - SECTION("Force") { force = true; } - - SECTION("Don't force") { force = false; } - - // Make the request - cli.pushSnapshotDiffs(snapKey, force, diffsB); + SECTION("Full update") + { + // Make the request + cli.pushSnapshotUpdate(snapKey, otherSnap, diffsB); - if (force) { // Check nothing queued REQUIRE(snap->getQueuedDiffsCount() == 0); - } else { + + // Check merge regions from other snap pushed + REQUIRE(snap->getMergeRegions().size() == mergeRegions.size()); + + for (int i = 0; i < mergeRegions.size(); i++) { + SnapshotMergeRegion expected = mergeRegions.at(i); + SnapshotMergeRegion actual = + snap->getMergeRegions()[expected.offset]; + + REQUIRE(actual.offset == expected.offset); + REQUIRE(actual.dataType == expected.dataType); + REQUIRE(actual.length == expected.length); + REQUIRE(actual.operation == expected.operation); + } + } + + SECTION("Just diffs") + { + // Make the request + cli.pushSnapshotDiffs(snapKey, diffsB); + // Check and write queued diffs REQUIRE(snap->getQueuedDiffsCount() == 5); snap->writeQueuedDiffs(); + + // Check no merge regions sent + REQUIRE(snap->getMergeRegions().empty()); } // Check diffs have been applied @@ -243,7 +302,7 @@ TEST_CASE_METHOD(SnapshotClientServerFixture, size_t originalDiffsApplied = snap->getQueuedDiffsCount(); diffs = { diffA1, diffA2 }; - cli.pushSnapshotDiffs(snapKey, false, diffs); + cli.pushSnapshotDiffs(snapKey, diffs); // Ensure the right number of diffs is applied REQUIRE(snap->getQueuedDiffsCount() == originalDiffsApplied + 2); @@ -358,7 +417,7 @@ TEST_CASE_METHOD(SnapshotClientServerFixture, size_t originalDiffsApplied = snap->getQueuedDiffsCount(); std::vector diffs = { diff }; - cli.pushSnapshotDiffs(snapKey, false, diffs); + cli.pushSnapshotDiffs(snapKey, diffs); // Ensure the right number of diffs is applied REQUIRE(snap->getQueuedDiffsCount() == originalDiffsApplied + 1); From ff0aaafd6c01bbc8b5b0ac8ec431045040978878 Mon Sep 17 00:00:00 2001 From: Simon Shillaker Date: Mon, 20 Dec 2021 17:36:11 +0000 Subject: [PATCH 02/24] Reinstate clearing of merge regions --- src/scheduler/Executor.cpp | 3 +++ src/snapshot/SnapshotServer.cpp | 2 ++ 2 files changed, 5 insertions(+) diff --git a/src/scheduler/Executor.cpp b/src/scheduler/Executor.cpp index 48451d790..69da7b652 100644 --- a/src/scheduler/Executor.cpp +++ b/src/scheduler/Executor.cpp @@ -319,6 +319,9 @@ void Executor::threadPoolThread(int threadPoolIdx) // Reset dirty page tracking on non-master faabric::util::resetDirtyTracking(); } + + SPDLOG_DEBUG("Clearing merge regions for {}", msg.snapshotkey()); + snap->clearMergeRegions(); } // If this batch is finished, reset the executor and release its claim. diff --git a/src/snapshot/SnapshotServer.cpp b/src/snapshot/SnapshotServer.cpp index 47dd0ea12..a466e8f9a 100644 --- a/src/snapshot/SnapshotServer.cpp +++ b/src/snapshot/SnapshotServer.cpp @@ -146,8 +146,10 @@ SnapshotServer::recvPushSnapshotDiffs(const uint8_t* buffer, size_t bufferSize) // Write diffs and set merge regions if necessary if (r->force()) { + // Write queued diffs snap->writeQueuedDiffs(); + // Add merge regions from request for (const auto* mr : *r->merge_regions()) { snap->addMergeRegion( mr->offset(), From 02f7daad7ac727330b452d9210347027235ce876 Mon Sep 17 00:00:00 2001 From: Simon Shillaker Date: Tue, 21 Dec 2021 10:52:27 +0000 Subject: [PATCH 03/24] Dist tests and test --- tests/dist/scheduler/functions.cpp | 85 +++++++++++-------------- tests/dist/scheduler/test_snapshots.cpp | 10 ++- tests/test/scheduler/test_scheduler.cpp | 14 ++++ 3 files changed, 60 insertions(+), 49 deletions(-) diff --git a/tests/dist/scheduler/functions.cpp b/tests/dist/scheduler/functions.cpp index f0b755767..c2b4f5140 100644 --- a/tests/dist/scheduler/functions.cpp +++ b/tests/dist/scheduler/functions.cpp @@ -72,20 +72,13 @@ int handleFakeDiffsFunction(tests::DistTestExecutor* exec, auto originalSnap = reg.getSnapshot(snapshotKey); - // Add a single merge region to catch both diffs - int offsetA = 10; - int offsetB = 100; std::vector inputBytes = faabric::util::stringToBytes(msgInput); - originalSnap->addMergeRegion( - 0, - offsetB + inputBytes.size() + 10, - faabric::util::SnapshotDataType::Raw, - faabric::util::SnapshotMergeOperation::Overwrite); - // Modify the executor's memory std::vector keyBytes = faabric::util::stringToBytes(snapshotKey); + int offsetA = 10; + int offsetB = 100; std::memcpy(exec->getDummyMemory().data() + offsetA, keyBytes.data(), keyBytes.size()); @@ -136,6 +129,16 @@ int handleFakeDiffsThreadedFunction( std::vector localChange(3, i); int offset = 2 * i * faabric::util::HOST_PAGE_SIZE; snap->copyInData(localChange, offset); + + // Make sure changes made by this message are covered by a merge + // region + int regionOffset = 2 * i * faabric::util::HOST_PAGE_SIZE; + int regionLength = 20 + msg.inputdata().size(); + snap->addMergeRegion( + regionOffset, + regionLength, + faabric::util::SnapshotDataType::Raw, + faabric::util::SnapshotMergeOperation::Overwrite); } // Dispatch the message, expecting them all to execute on other hosts @@ -201,7 +204,6 @@ int handleFakeDiffsThreadedFunction( } else { // This is the code that will be executed by the remote threads. - // Add a merge region to catch the modification int idx = msg.appidx(); int regionOffset = 2 * idx * faabric::util::HOST_PAGE_SIZE; @@ -211,16 +213,6 @@ int handleFakeDiffsThreadedFunction( std::vector inputBytes = faabric::util::stringToBytes(msgInput); - auto originalSnap = reg.getSnapshot(snapshotKey); - - // Make sure it's captured by the region - int regionLength = 20 + inputBytes.size(); - originalSnap->addMergeRegion( - regionOffset, - regionLength, - faabric::util::SnapshotDataType::Raw, - faabric::util::SnapshotMergeOperation::Overwrite); - // Now modify the memory std::memcpy(exec->getDummyMemory().data() + changeOffset, inputBytes.data(), @@ -258,18 +250,18 @@ int handleReductionFunction(tests::DistTestExecutor* exec, uint32_t reductionBOffset = 2 * HOST_PAGE_SIZE; uint32_t arrayOffset = HOST_PAGE_SIZE + 10 * sizeof(int32_t); - // Initialise message bool isThread = req->type() == faabric::BatchExecuteRequest::THREADS; - // Set up snapshot - std::string snapKey = "dist-reduction-" + std::to_string(generateGid()); - std::shared_ptr snap = - std::make_shared(snapSize); - reg.registerSnapshot(snapKey, snap); - // Main function will set up the snapshot and merge regions, while the child // threads will modify an array and perform a reduction operation if (!isThread) { + // Set up snapshot + std::string snapKey = "dist-reduction-" + std::to_string(generateGid()); + std::shared_ptr snap = + std::make_shared(snapSize); + reg.registerSnapshot(snapKey, snap); + + // Perform operations in a loop for (int r = 0; r < nRepeats; r++) { // Set up thread request auto req = faabric::util::batchExecFactory( @@ -285,6 +277,25 @@ int handleReductionFunction(tests::DistTestExecutor* exec, m.set_appidx(i); } + // Set merge regions + snap->addMergeRegion(reductionAOffset, + sizeof(int32_t), + SnapshotDataType::Int, + SnapshotMergeOperation::Sum, + true); + + snap->addMergeRegion(reductionBOffset, + sizeof(int32_t), + SnapshotDataType::Int, + SnapshotMergeOperation::Sum, + true); + + snap->addMergeRegion(arrayOffset, + sizeof(int32_t) * nThreads, + SnapshotDataType::Raw, + SnapshotMergeOperation::Overwrite, + true); + // Make the request faabric::scheduler::Scheduler& sch = faabric::scheduler::getScheduler(); @@ -386,26 +397,6 @@ int handleReductionFunction(tests::DistTestExecutor* exec, uint32_t thisIdx = msg.appidx(); uint8_t* thisArrayPtr = arrayPtr + (sizeof(int32_t) * thisIdx); - // Set merge regions - std::shared_ptr snap = reg.getSnapshot(msg.snapshotkey()); - snap->addMergeRegion(reductionAOffset, - sizeof(int32_t), - SnapshotDataType::Int, - SnapshotMergeOperation::Sum, - true); - - snap->addMergeRegion(reductionBOffset, - sizeof(int32_t), - SnapshotDataType::Int, - SnapshotMergeOperation::Sum, - true); - - snap->addMergeRegion(arrayOffset, - sizeof(int32_t) * nThreads, - SnapshotDataType::Raw, - SnapshotMergeOperation::Overwrite, - true); - // Lock group locally while doing reduction std::shared_ptr group = faabric::transport::PointToPointGroup::getGroup(groupId); diff --git a/tests/dist/scheduler/test_snapshots.cpp b/tests/dist/scheduler/test_snapshots.cpp index 1d983deda..794f2d120 100644 --- a/tests/dist/scheduler/test_snapshots.cpp +++ b/tests/dist/scheduler/test_snapshots.cpp @@ -26,11 +26,18 @@ TEST_CASE_METHOD(DistTestsFixture, std::string user = "snapshots"; std::string function = "fake-diffs"; std::string snapshotKey = "dist-snap-check"; + std::vector inputData = { 0, 1, 2, 3, 4, 5, 6 }; + // Set up snapshot size_t snapSize = 2 * faabric::util::HOST_PAGE_SIZE; auto snap = std::make_shared(snapSize); - // Set up snapshot + // Add a merge region to catch all changes + snap->addMergeRegion(0, + snapSize, + faabric::util::SnapshotDataType::Raw, + faabric::util::SnapshotMergeOperation::Overwrite); + reg.registerSnapshot(snapshotKey, snap); // Set up the message @@ -40,7 +47,6 @@ TEST_CASE_METHOD(DistTestsFixture, // Set up some input data faabric::Message& m = req->mutable_messages()->at(0); - std::vector inputData = { 0, 1, 2, 3, 4, 5, 6 }; m.set_inputdata(inputData.data(), inputData.size()); m.set_snapshotkey(snapshotKey); diff --git a/tests/test/scheduler/test_scheduler.cpp b/tests/test/scheduler/test_scheduler.cpp index 98820cfca..8cc3b23f9 100644 --- a/tests/test/scheduler/test_scheduler.cpp +++ b/tests/test/scheduler/test_scheduler.cpp @@ -226,8 +226,20 @@ TEST_CASE_METHOD(SlowExecutorFixture, "Test batch scheduling", "[scheduler]") faabric::snapshot::getSnapshotRegistry(); std::unique_ptr snapshotDataAllocation; + std::vector snapshotMergeRegions; if (!expectedSnapshot.empty()) { auto snap = std::make_shared(1234); + + snap->addMergeRegion(123, + 1234, + faabric::util::SnapshotDataType::Int, + faabric::util::SnapshotMergeOperation::Sum); + + snap->addMergeRegion(345, + 3456, + faabric::util::SnapshotDataType::Raw, + faabric::util::SnapshotMergeOperation::Overwrite); + snapRegistry.registerSnapshot(expectedSnapshot, snap); } @@ -305,6 +317,8 @@ TEST_CASE_METHOD(SlowExecutorFixture, "Test batch scheduling", "[scheduler]") REQUIRE(pushedSnapshot.first == otherHost); REQUIRE(pushedSnapshot.second->getSize() == snapshot->getSize()); REQUIRE(pushedSnapshot.second->getDataPtr() == snapshot->getDataPtr()); + REQUIRE(pushedSnapshot.second->getMergeRegions().size() == + snapshot->getMergeRegions().size()); } // Check the executor counts on this host From 5355bf9da793ec5d340a42993902e502da9b6fa0 Mon Sep 17 00:00:00 2001 From: Simon Shillaker Date: Tue, 21 Dec 2021 11:28:23 +0000 Subject: [PATCH 04/24] Add reserves where possible --- src/snapshot/SnapshotClient.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/snapshot/SnapshotClient.cpp b/src/snapshot/SnapshotClient.cpp index 95c34a804..2ce472417 100644 --- a/src/snapshot/SnapshotClient.cpp +++ b/src/snapshot/SnapshotClient.cpp @@ -96,6 +96,7 @@ void SnapshotClient::pushSnapshot( std::vector> mrsFbVector; + mrsFbVector.reserve(data->getMergeRegions().size()); for (const auto& m : data->getMergeRegions()) { auto mr = CreateSnapshotMergeRegionRequest(mb, m.second.offset, @@ -171,6 +172,7 @@ void SnapshotClient::doPushSnapshotDiffs( // force too. std::vector> mrsFbVector; + mrsFbVector.reserve(data->getMergeRegions().size()); bool force = false; if (data != nullptr) { for (const auto& m : data->getMergeRegions()) { From 2066535dde68faf0037c8d2705656779e27d305f Mon Sep 17 00:00:00 2001 From: Simon Shillaker Date: Tue, 21 Dec 2021 12:57:21 +0000 Subject: [PATCH 05/24] Support more merge ops --- include/faabric/util/snapshot.h | 101 ++++++++- src/snapshot/SnapshotClient.cpp | 2 +- src/util/snapshot.cpp | 388 ++++++++++++-------------------- 3 files changed, 247 insertions(+), 244 deletions(-) diff --git a/include/faabric/util/snapshot.h b/include/faabric/util/snapshot.h index 7cd6e9130..7afcf3363 100644 --- a/include/faabric/util/snapshot.h +++ b/include/faabric/util/snapshot.h @@ -17,7 +17,10 @@ namespace faabric::util { enum SnapshotDataType { Raw, - Int + Bool, + Int, + Float, + Double }; enum SnapshotMergeOperation @@ -66,13 +69,101 @@ class SnapshotMergeRegion SnapshotMergeOperation operation = SnapshotMergeOperation::Overwrite; void addDiffs(std::vector& diffs, - const uint8_t* original, - uint32_t originalSize, + std::span original, const uint8_t* updated, - uint32_t dirtyRegionStart, - uint32_t dirtyRegionEnd); + std::pair dirtyRange); + + private: + void addOverwriteDiff(std::vector& diffs, + std::span original, + const uint8_t* updated, + std::pair dirtyRange); + + void addMergeDiff(std::vector& diffs, + std::span original, + const uint8_t* updated, + std::pair dirtyRange); }; +template +T calculateDiffValue(const uint8_t* original, + const uint8_t* updated, + SnapshotMergeOperation operation) +{ + // Cast to value + T updatedValue = unalignedRead(updated); + T originalValue = unalignedRead(original); + + // Skip if no change + if (originalValue == updatedValue) { + return; + } + + // Work out final result + switch (operation) { + case (SnapshotMergeOperation::Sum): { + // Sums must send the value to be _added_, and + // not the final result + updatedValue -= originalValue; + break; + } + case (SnapshotMergeOperation::Subtract): { + // Subtractions must send the value to be + // subtracted, not the result + updatedValue = originalValue - updatedValue; + break; + } + case (SnapshotMergeOperation::Product): { + // Products must send the value to be + // multiplied, not the result + updatedValue /= originalValue; + break; + } + case (SnapshotMergeOperation::Max): + case (SnapshotMergeOperation::Min): + // Min and max don't need to change + break; + default: { + SPDLOG_ERROR("Can't calculate diff for operation: {}", operation); + throw std::runtime_error("Can't calculate diff"); + } + } + + return updatedValue; +} + +template +T applyDiffValue(const uint8_t* original, + const uint8_t* diff, + SnapshotMergeOperation operation) +{ + + auto diffValue = unalignedRead(diff); + T originalValue = unalignedRead(original); + + switch (operation) { + case (SnapshotMergeOperation::Sum): { + return diffValue + originalValue; + } + case (SnapshotMergeOperation::Subtract): { + return originalValue - diffValue; + } + case (SnapshotMergeOperation::Product): { + return originalValue * diffValue; + } + case (SnapshotMergeOperation::Max): { + return std::max(originalValue, diffValue); + } + case (SnapshotMergeOperation::Min): { + return std::min(originalValue, diffValue); + } + default: { + SPDLOG_ERROR("Can't apply merge operation: {}", operation); + throw std::runtime_error("Can't apply merge operation"); + } + } +} + class SnapshotData { public: diff --git a/src/snapshot/SnapshotClient.cpp b/src/snapshot/SnapshotClient.cpp index 2ce472417..5ef23b2dd 100644 --- a/src/snapshot/SnapshotClient.cpp +++ b/src/snapshot/SnapshotClient.cpp @@ -172,9 +172,9 @@ void SnapshotClient::doPushSnapshotDiffs( // force too. std::vector> mrsFbVector; - mrsFbVector.reserve(data->getMergeRegions().size()); bool force = false; if (data != nullptr) { + mrsFbVector.reserve(data->getMergeRegions().size()); for (const auto& m : data->getMergeRegions()) { auto mr = CreateSnapshotMergeRegionRequest(mb, m.second.offset, diff --git a/src/util/snapshot.cpp b/src/util/snapshot.cpp index 16e6db1fe..b3c0cfc86 100644 --- a/src/util/snapshot.cpp +++ b/src/util/snapshot.cpp @@ -232,94 +232,44 @@ void SnapshotData::writeQueuedDiffs() // Iterate through diffs for (auto& diff : queuedDiffs) { + if (diff.getOperation() == + faabric::util::SnapshotMergeOperation::Overwrite) { + + SPDLOG_TRACE("Copying snapshot diff into {}-{}", + diff.getOffset(), + diff.getOffset() + diff.getData().size()); + + writeData(diff.getData(), diff.getOffset()); + } + switch (diff.getDataType()) { - case (faabric::util::SnapshotDataType::Raw): { - switch (diff.getOperation()) { - case (faabric::util::SnapshotMergeOperation::Overwrite): { - SPDLOG_TRACE("Copying raw snapshot diff into {}-{}", - diff.getOffset(), - diff.getOffset() + diff.getData().size()); - - writeData(diff.getData(), diff.getOffset()); - break; - } - default: { - SPDLOG_ERROR("Unsupported raw merge operation: {}", - diff.getOperation()); - throw std::runtime_error( - "Unsupported raw merge operation"); - } - } - break; - } case (faabric::util::SnapshotDataType::Int): { - auto diffValue = unalignedRead(diff.getData().data()); - - auto original = faabric::util::unalignedRead( - validatedOffsetPtr(diff.getOffset())); - - int32_t finalValue = 0; - switch (diff.getOperation()) { - case (faabric::util::SnapshotMergeOperation::Sum): { - finalValue = original + diffValue; - - SPDLOG_TRACE("Applying int sum diff {}: {} = {} + {}", - diff.getOffset(), - finalValue, - original, - diffValue); - break; - } - case (faabric::util::SnapshotMergeOperation::Subtract): { - finalValue = original - diffValue; - - SPDLOG_TRACE("Applying int sub diff {}: {} = {} - {}", - diff.getOffset(), - finalValue, - original, - diffValue); - break; - } - case (faabric::util::SnapshotMergeOperation::Product): { - finalValue = original * diffValue; - - SPDLOG_TRACE("Applying int mult diff {}: {} = {} * {}", - diff.getOffset(), - finalValue, - original, - diffValue); - break; - } - case (faabric::util::SnapshotMergeOperation::Min): { - finalValue = std::min(original, diffValue); - - SPDLOG_TRACE("Applying int min diff {}: min({}, {})", - diff.getOffset(), - original, - diffValue); - break; - } - case (faabric::util::SnapshotMergeOperation::Max): { - finalValue = std::max(original, diffValue); - - SPDLOG_TRACE("Applying int max diff {}: max({}, {})", - diff.getOffset(), - original, - diffValue); - break; - } - default: { - SPDLOG_ERROR("Unsupported int merge operation: {}", - diff.getOperation()); - throw std::runtime_error( - "Unsupported int merge operation"); - } - } - + int32_t finalValue = + applyDiffValue(validatedOffsetPtr(diff.getOffset()), + diff.getData().data(), + diff.getOperation()); writeData({ BYTES(&finalValue), sizeof(int32_t) }, diff.getOffset()); break; } + case (faabric::util::SnapshotDataType::Float): { + float finalValue = + applyDiffValue(validatedOffsetPtr(diff.getOffset()), + diff.getData().data(), + diff.getOperation()); + writeData({ BYTES(&finalValue), sizeof(float) }, + diff.getOffset()); + break; + } + case (faabric::util::SnapshotDataType::Double): { + double finalValue = + applyDiffValue(validatedOffsetPtr(diff.getOffset()), + diff.getData().data(), + diff.getOperation()); + writeData({ BYTES(&finalValue), sizeof(double) }, + diff.getOffset()); + break; + } default: { SPDLOG_ERROR("Unsupported data type: {}", diff.getDataType()); throw std::runtime_error("Unsupported merge data type"); @@ -398,11 +348,9 @@ std::vector MemoryView::diffWithSnapshot( for (auto& dirtyRegion : dirtyRegions) { // Add the diffs mr.addDiffs(diffs, - snap->getDataPtr(), - snap->getSize(), + { snap->getDataPtr(), snap->getSize() }, data.data(), - dirtyRegion.first, - dirtyRegion.second); + dirtyRegion); } } @@ -415,9 +363,18 @@ std::string snapshotDataTypeStr(SnapshotDataType dt) case (SnapshotDataType::Raw): { return "Raw"; } + case (SnapshotDataType::Bool): { + return "Bool"; + } case (SnapshotDataType::Int): { return "Int"; } + case (SnapshotDataType::Float): { + return "Float"; + } + case (SnapshotDataType::Double): { + return "Double"; + } default: { SPDLOG_ERROR("Cannot convert snapshot data type to string: {}", dt); throw std::runtime_error("Cannot convert data type to string"); @@ -453,19 +410,84 @@ std::string snapshotMergeOpStr(SnapshotMergeOperation op) } } +void SnapshotMergeRegion::addOverwriteDiff( + std::vector& diffs, + std::span original, + const uint8_t* updated, + std::pair dirtyRange) +{ + auto operation = SnapshotMergeOperation::Overwrite; + + // Work out bounds of region we're checking + uint32_t checkStart = std::max(dirtyRange.first, offset); + + uint32_t checkEnd; + if (length == 0) { + checkEnd = dirtyRange.second; + } else { + checkEnd = std::min(dirtyRange.second, offset + length); + } + + bool diffInProgress = false; + int diffStart = 0; + for (int b = checkStart; b <= checkEnd; b++) { + // If this byte is outside the original region, we can't + // compare (i.e. always dirty) + bool isDirtyByte = + (b > original.size()) || (*(original.data() + b) != *(updated + b)); + + if (isDirtyByte && !diffInProgress) { + // Diff starts here if it's different and diff + // not in progress + diffInProgress = true; + diffStart = b; + } else if (!isDirtyByte && diffInProgress) { + // Diff ends if it's not different and diff is + // in progress + int diffLength = b - diffStart; + SPDLOG_TRACE("Found {} overwrite diff at {}-{}", + snapshotDataTypeStr(dataType), + diffStart, + diffStart + diffLength); + + diffInProgress = false; + diffs.emplace_back( + dataType, + operation, + diffStart, + std::span(updated + diffStart, diffLength)); + } + } + + // If we've reached the end of this region with a diff + // in progress, we need to close it off + if (diffInProgress) { + int finalDiffLength = checkEnd - diffStart; + SPDLOG_TRACE("Adding {} {} diff at {}-{} (end of region)", + snapshotDataTypeStr(dataType), + snapshotMergeOpStr(operation), + diffStart, + diffStart + finalDiffLength); + + diffs.emplace_back( + dataType, + operation, + diffStart, + std::span(updated + diffStart, finalDiffLength)); + } +} + void SnapshotMergeRegion::addDiffs(std::vector& diffs, - const uint8_t* original, - uint32_t originalSize, + std::span original, const uint8_t* updated, - uint32_t dirtyRegionStart, - uint32_t dirtyRegionEnd) + std::pair dirtyRange) { // If the region has zero length, it signifies that it goes to the // end of the memory, so we go all the way to the end of the dirty region. // For all other regions, we just check if the dirty range is within the // merge region. - bool isInRange = (dirtyRegionEnd > offset) && - ((length == 0) || (dirtyRegionStart < offset + length)); + bool isInRange = (dirtyRange.second > offset) && + ((length == 0) || (dirtyRange.first < offset + length)); if (!isInRange) { return; @@ -476,162 +498,52 @@ void SnapshotMergeRegion::addDiffs(std::vector& diffs, snapshotMergeOpStr(operation), offset, offset + length, - dirtyRegionStart, - dirtyRegionEnd); - - switch (dataType) { - case (SnapshotDataType::Int): { - // Check if the value has changed - const uint8_t* updatedValue = updated + offset; - int updatedInt = *(reinterpret_cast(updatedValue)); - - if (originalSize < offset) { - throw std::runtime_error( - "Do not support int operations outside original snapshot"); - } - - const uint8_t* originalValue = original + offset; - int originalInt = *(reinterpret_cast(originalValue)); - - // Skip if no change - if (originalInt == updatedInt) { - return; - } + dirtyRange.first, + dirtyRange.second); - // Potentially modify the original in place depending on the - // operation - switch (operation) { - case (SnapshotMergeOperation::Sum): { - // Sums must send the value to be _added_, and - // not the final result - updatedInt -= originalInt; - break; - } - case (SnapshotMergeOperation::Subtract): { - // Subtractions must send the value to be - // subtracted, not the result - updatedInt = originalInt - updatedInt; - break; - } - case (SnapshotMergeOperation::Product): { - // Products must send the value to be - // multiplied, not the result - updatedInt /= originalInt; - break; - } - case (SnapshotMergeOperation::Max): - case (SnapshotMergeOperation::Min): - // Min and max don't need to change - break; - default: { - SPDLOG_ERROR("Unhandled integer merge operation: {}", - operation); - throw std::runtime_error( - "Unhandled integer merge operation"); - } - } - - // TODO - somehow avoid casting away the const here? - // Modify the memory in-place here - std::memcpy( - (uint8_t*)updatedValue, BYTES(&updatedInt), sizeof(int32_t)); + if (operation == SnapshotMergeOperation::Overwrite) { + addOverwriteDiff(diffs, original, updated, dirtyRange); + return; + } - // Add the diff - diffs.emplace_back(dataType, - operation, - offset, - std::span(updatedValue, length)); + if (original.size() < offset) { + throw std::runtime_error( + "Do not support non-overwrite operations outside original snapshot"); + } - SPDLOG_TRACE("Found {} {} diff at {}-{} ({})", - snapshotDataTypeStr(dataType), - snapshotMergeOpStr(operation), - offset, - offset + length, - updatedInt); + uint8_t* updatedValue = (uint8_t*)updated + offset; + const uint8_t* originalValue = original.data() + offset; + switch (dataType) { + case (SnapshotDataType::Int): { + int merged = + calculateDiffValue(originalValue, updatedValue, operation); + unalignedWrite(merged, updatedValue); break; } - case (SnapshotDataType::Raw): { - switch (operation) { - case (SnapshotMergeOperation::Overwrite): { - // Work out bounds of region we're checking - uint32_t checkStart = - std::max(dirtyRegionStart, offset); - - uint32_t checkEnd; - if (length == 0) { - checkEnd = dirtyRegionEnd; - } else { - checkEnd = - std::min(dirtyRegionEnd, offset + length); - } - - bool diffInProgress = false; - int diffStart = 0; - for (int b = checkStart; b <= checkEnd; b++) { - // If this byte is outside the original region, we can't - // compare (i.e. always dirty) - bool isDirtyByte = (b > originalSize) || - (*(original + b) != *(updated + b)); - - if (isDirtyByte && !diffInProgress) { - // Diff starts here if it's different and diff - // not in progress - diffInProgress = true; - diffStart = b; - } else if (!isDirtyByte && diffInProgress) { - // Diff ends if it's not different and diff is - // in progress - int diffLength = b - diffStart; - SPDLOG_TRACE("Found {} {} diff at {}-{}", - snapshotDataTypeStr(dataType), - snapshotMergeOpStr(operation), - diffStart, - diffStart + diffLength); - - diffInProgress = false; - diffs.emplace_back( - dataType, - operation, - diffStart, - std::span(updated + diffStart, - diffLength)); - } - } - - // If we've reached the end of this region with a diff - // in progress, we need to close it off - if (diffInProgress) { - int finalDiffLength = checkEnd - diffStart; - SPDLOG_TRACE( - "Adding {} {} diff at {}-{} (end of region)", - snapshotDataTypeStr(dataType), - snapshotMergeOpStr(operation), - diffStart, - diffStart + finalDiffLength); - - diffs.emplace_back( - dataType, - operation, - diffStart, - std::span(updated + diffStart, - finalDiffLength)); - } - break; - } - default: { - SPDLOG_ERROR("Unhandled raw merge operation: {}", - operation); - throw std::runtime_error("Unhandled raw merge operation"); - } - } - + case (SnapshotDataType::Float): { + float merged = + calculateDiffValue(originalValue, updatedValue, operation); + unalignedWrite(merged, updatedValue); + break; + } + case (SnapshotDataType::Double): { + double merged = calculateDiffValue( + originalValue, updatedValue, operation); + unalignedWrite(merged, updatedValue); break; } default: { - SPDLOG_ERROR("Merge region for unhandled data type: {}", dataType); - throw std::runtime_error("Merge region for unhandled data type"); + SPDLOG_ERROR( + "Unsupported merge op combination {} {}", dataType, operation); + throw std::runtime_error("Unsupported merge op combination"); } } + + // Add the diff + diffs.emplace_back(dataType, + operation, + offset, + std::span(updatedValue, length)); } } From 2fe3cc0bc22893e64db4de808233bb81baa2d41c Mon Sep 17 00:00:00 2001 From: Simon Shillaker Date: Tue, 21 Dec 2021 14:47:12 +0000 Subject: [PATCH 06/24] Tests for more datatypes --- include/faabric/util/snapshot.h | 22 +++-- src/util/snapshot.cpp | 45 +++++---- tests/test/util/test_snapshot.cpp | 159 ++++++++++++++++++++++++++++-- 3 files changed, 187 insertions(+), 39 deletions(-) diff --git a/include/faabric/util/snapshot.h b/include/faabric/util/snapshot.h index 7afcf3363..77256c664 100644 --- a/include/faabric/util/snapshot.h +++ b/include/faabric/util/snapshot.h @@ -69,8 +69,8 @@ class SnapshotMergeRegion SnapshotMergeOperation operation = SnapshotMergeOperation::Overwrite; void addDiffs(std::vector& diffs, - std::span original, - const uint8_t* updated, + std::span originalData, + const uint8_t* updatedData, std::pair dirtyRange); private: @@ -86,9 +86,9 @@ class SnapshotMergeRegion }; template -T calculateDiffValue(const uint8_t* original, - const uint8_t* updated, - SnapshotMergeOperation operation) +inline bool calculateDiffValue(const uint8_t* original, + uint8_t* updated, + SnapshotMergeOperation operation) { // Cast to value T updatedValue = unalignedRead(updated); @@ -96,7 +96,7 @@ T calculateDiffValue(const uint8_t* original, // Skip if no change if (originalValue == updatedValue) { - return; + return false; } // Work out final result @@ -129,13 +129,15 @@ T calculateDiffValue(const uint8_t* original, } } - return updatedValue; + unalignedWrite(updatedValue, updated); + + return true; } template -T applyDiffValue(const uint8_t* original, - const uint8_t* diff, - SnapshotMergeOperation operation) +inline T applyDiffValue(const uint8_t* original, + const uint8_t* diff, + SnapshotMergeOperation operation) { auto diffValue = unalignedRead(diff); diff --git a/src/util/snapshot.cpp b/src/util/snapshot.cpp index b3c0cfc86..f61d3867c 100644 --- a/src/util/snapshot.cpp +++ b/src/util/snapshot.cpp @@ -240,6 +240,8 @@ void SnapshotData::writeQueuedDiffs() diff.getOffset() + diff.getData().size()); writeData(diff.getData(), diff.getOffset()); + + continue; } switch (diff.getDataType()) { @@ -255,8 +257,8 @@ void SnapshotData::writeQueuedDiffs() case (faabric::util::SnapshotDataType::Float): { float finalValue = applyDiffValue(validatedOffsetPtr(diff.getOffset()), - diff.getData().data(), - diff.getOperation()); + diff.getData().data(), + diff.getOperation()); writeData({ BYTES(&finalValue), sizeof(float) }, diff.getOffset()); break; @@ -264,8 +266,8 @@ void SnapshotData::writeQueuedDiffs() case (faabric::util::SnapshotDataType::Double): { double finalValue = applyDiffValue(validatedOffsetPtr(diff.getOffset()), - diff.getData().data(), - diff.getOperation()); + diff.getData().data(), + diff.getOperation()); writeData({ BYTES(&finalValue), sizeof(double) }, diff.getOffset()); break; @@ -478,8 +480,8 @@ void SnapshotMergeRegion::addOverwriteDiff( } void SnapshotMergeRegion::addDiffs(std::vector& diffs, - std::span original, - const uint8_t* updated, + std::span originalData, + const uint8_t* updatedData, std::pair dirtyRange) { // If the region has zero length, it signifies that it goes to the @@ -502,35 +504,30 @@ void SnapshotMergeRegion::addDiffs(std::vector& diffs, dirtyRange.second); if (operation == SnapshotMergeOperation::Overwrite) { - addOverwriteDiff(diffs, original, updated, dirtyRange); + addOverwriteDiff(diffs, originalData, updatedData, dirtyRange); return; } - if (original.size() < offset) { + if (originalData.size() < offset) { throw std::runtime_error( "Do not support non-overwrite operations outside original snapshot"); } - uint8_t* updatedValue = (uint8_t*)updated + offset; - const uint8_t* originalValue = original.data() + offset; + uint8_t* updated = (uint8_t*)updatedData + offset; + const uint8_t* original = originalData.data() + offset; + bool changed = false; switch (dataType) { case (SnapshotDataType::Int): { - int merged = - calculateDiffValue(originalValue, updatedValue, operation); - unalignedWrite(merged, updatedValue); + changed = calculateDiffValue(original, updated, operation); break; } case (SnapshotDataType::Float): { - float merged = - calculateDiffValue(originalValue, updatedValue, operation); - unalignedWrite(merged, updatedValue); + changed = calculateDiffValue(original, updated, operation); break; } case (SnapshotDataType::Double): { - double merged = calculateDiffValue( - originalValue, updatedValue, operation); - unalignedWrite(merged, updatedValue); + changed = calculateDiffValue(original, updated, operation); break; } default: { @@ -541,9 +538,11 @@ void SnapshotMergeRegion::addDiffs(std::vector& diffs, } // Add the diff - diffs.emplace_back(dataType, - operation, - offset, - std::span(updatedValue, length)); + if (changed) { + diffs.emplace_back(dataType, + operation, + offset, + std::span(updated, length)); + } } } diff --git a/tests/test/util/test_snapshot.cpp b/tests/test/util/test_snapshot.cpp index 2fcf9d169..930e41d17 100644 --- a/tests/test/util/test_snapshot.cpp +++ b/tests/test/util/test_snapshot.cpp @@ -590,6 +590,153 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, expectedData = faabric::util::valueToBytes(diffValue); } + SECTION("Float") + { + float originalValue = 0.0; + float finalValue = 0.0; + float diffValue = 0.0; + + dataType = faabric::util::SnapshotDataType::Float; + dataLength = sizeof(float); + regionLength = sizeof(float); + + // Note - imprecision in float arithmetic makes it difficult to test + // the floating point types here unless we use integer values. + SECTION("Float sum") + { + originalValue = 513; + finalValue = 945; + diffValue = 432; + + operation = faabric::util::SnapshotMergeOperation::Sum; + } + + SECTION("Float subtract") + { + originalValue = 945; + finalValue = 513; + diffValue = 432; + + operation = faabric::util::SnapshotMergeOperation::Subtract; + } + + SECTION("Float product") + { + originalValue = 5; + finalValue = 655; + diffValue = 131; + + operation = faabric::util::SnapshotMergeOperation::Product; + } + + SECTION("Float max") + { + originalValue = 555.55; + finalValue = 666.66; + diffValue = 666.66; + + operation = faabric::util::SnapshotMergeOperation::Max; + } + + SECTION("Float min") + { + originalValue = 222.22; + finalValue = 333.33; + diffValue = 333.33; + + operation = faabric::util::SnapshotMergeOperation::Min; + } + + originalData = faabric::util::valueToBytes(originalValue); + updatedData = faabric::util::valueToBytes(finalValue); + expectedData = faabric::util::valueToBytes(diffValue); + } + + SECTION("Double") + { + double originalValue = 0.0; + double finalValue = 0.0; + double diffValue = 0.0; + + dataType = faabric::util::SnapshotDataType::Double; + dataLength = sizeof(double); + regionLength = sizeof(double); + + // Note - imprecision in float arithmetic makes it difficult to test + // the floating point types here unless we use integer values. + SECTION("Double sum") + { + originalValue = 513; + finalValue = 945; + diffValue = 432; + + operation = faabric::util::SnapshotMergeOperation::Sum; + } + + SECTION("Double subtract") + { + originalValue = 945; + finalValue = 513; + diffValue = 432; + + operation = faabric::util::SnapshotMergeOperation::Subtract; + } + + SECTION("Double product") + { + originalValue = 5; + finalValue = 655; + diffValue = 131; + + operation = faabric::util::SnapshotMergeOperation::Product; + } + + SECTION("Double max") + { + originalValue = 555.55; + finalValue = 666.66; + diffValue = 666.66; + + operation = faabric::util::SnapshotMergeOperation::Max; + } + + SECTION("Double min") + { + originalValue = 222.22; + finalValue = 333.33; + diffValue = 333.33; + + operation = faabric::util::SnapshotMergeOperation::Min; + } + + originalData = faabric::util::valueToBytes(originalValue); + updatedData = faabric::util::valueToBytes(finalValue); + expectedData = faabric::util::valueToBytes(diffValue); + } + SECTION("Bool") + { + bool originalValue = false; + bool finalValue = false; + bool diffValue = false; + + dataType = faabric::util::SnapshotDataType::Bool; + dataLength = sizeof(bool); + regionLength = sizeof(bool); + + SECTION("Bool overwrite") + { + originalValue = true; + finalValue = false; + diffValue = false; + + operation = faabric::util::SnapshotMergeOperation::Overwrite; + } + + originalData = faabric::util::valueToBytes(originalValue); + updatedData = faabric::util::valueToBytes(finalValue); + expectedData = faabric::util::valueToBytes(diffValue); + } + SECTION("Raw") { dataLength = 100; @@ -668,12 +815,12 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, std::string expectedMsg; - SECTION("Integer overwrite") + SECTION("Bool product") { - dataType = faabric::util::SnapshotDataType::Int; - operation = faabric::util::SnapshotMergeOperation::Overwrite; - dataLength = sizeof(int32_t); - expectedMsg = "Unhandled integer merge operation"; + dataType = faabric::util::SnapshotDataType::Bool; + operation = faabric::util::SnapshotMergeOperation::Product; + dataLength = sizeof(bool); + expectedMsg = "Unsupported merge op combination"; } SECTION("Raw sum") @@ -681,7 +828,7 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, dataType = faabric::util::SnapshotDataType::Raw; operation = faabric::util::SnapshotMergeOperation::Sum; dataLength = 123; - expectedMsg = "Unhandled raw merge operation"; + expectedMsg = "Unsupported merge op combination"; } // Take the snapshot From 832982ad09fa81615618e6f84e903de029a3b241 Mon Sep 17 00:00:00 2001 From: Simon Shillaker Date: Tue, 21 Dec 2021 16:35:55 +0000 Subject: [PATCH 07/24] Added long type --- include/faabric/util/snapshot.h | 1 + src/util/snapshot.cpp | 18 ++++++++++ tests/test/util/test_snapshot.cpp | 60 +++++++++++++++++++++++++++++++ 3 files changed, 79 insertions(+) diff --git a/include/faabric/util/snapshot.h b/include/faabric/util/snapshot.h index 77256c664..036c28ea3 100644 --- a/include/faabric/util/snapshot.h +++ b/include/faabric/util/snapshot.h @@ -19,6 +19,7 @@ enum SnapshotDataType Raw, Bool, Int, + Long, Float, Double }; diff --git a/src/util/snapshot.cpp b/src/util/snapshot.cpp index f61d3867c..894f2988b 100644 --- a/src/util/snapshot.cpp +++ b/src/util/snapshot.cpp @@ -230,6 +230,8 @@ void SnapshotData::writeQueuedDiffs() { faabric::util::FullLock lock(snapMx); + SPDLOG_DEBUG("Writing {} queued diffs to snapshot", queuedDiffs.size()); + // Iterate through diffs for (auto& diff : queuedDiffs) { if (diff.getOperation() == @@ -254,6 +256,15 @@ void SnapshotData::writeQueuedDiffs() diff.getOffset()); break; } + case (faabric::util::SnapshotDataType::Long): { + long finalValue = + applyDiffValue(validatedOffsetPtr(diff.getOffset()), + diff.getData().data(), + diff.getOperation()); + writeData({ BYTES(&finalValue), sizeof(long) }, + diff.getOffset()); + break; + } case (faabric::util::SnapshotDataType::Float): { float finalValue = applyDiffValue(validatedOffsetPtr(diff.getOffset()), @@ -371,6 +382,9 @@ std::string snapshotDataTypeStr(SnapshotDataType dt) case (SnapshotDataType::Int): { return "Int"; } + case (SnapshotDataType::Long): { + return "Long"; + } case (SnapshotDataType::Float): { return "Float"; } @@ -522,6 +536,10 @@ void SnapshotMergeRegion::addDiffs(std::vector& diffs, changed = calculateDiffValue(original, updated, operation); break; } + case (SnapshotDataType::Long): { + changed = calculateDiffValue(original, updated, operation); + break; + } case (SnapshotDataType::Float): { changed = calculateDiffValue(original, updated, operation); break; diff --git a/tests/test/util/test_snapshot.cpp b/tests/test/util/test_snapshot.cpp index 930e41d17..01949455a 100644 --- a/tests/test/util/test_snapshot.cpp +++ b/tests/test/util/test_snapshot.cpp @@ -590,6 +590,66 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, expectedData = faabric::util::valueToBytes(diffValue); } + SECTION("Long") + { + long originalValue = 0; + long finalValue = 0; + long diffValue = 0; + + dataType = faabric::util::SnapshotDataType::Long; + dataLength = sizeof(long); + regionLength = sizeof(long); + + SECTION("Long sum") + { + originalValue = 100; + finalValue = 150; + diffValue = 50; + + operation = faabric::util::SnapshotMergeOperation::Sum; + } + + SECTION("Long subtract") + { + originalValue = 150; + finalValue = 100; + diffValue = 50; + + operation = faabric::util::SnapshotMergeOperation::Subtract; + } + + SECTION("Long product") + { + originalValue = 3; + finalValue = 150; + diffValue = 50; + + operation = faabric::util::SnapshotMergeOperation::Product; + } + + SECTION("Long max") + { + originalValue = 10; + finalValue = 200; + diffValue = 200; + + operation = faabric::util::SnapshotMergeOperation::Max; + } + + SECTION("Long min") + { + originalValue = 30; + finalValue = 10; + diffValue = 10; + + operation = faabric::util::SnapshotMergeOperation::Max; + } + + originalData = faabric::util::valueToBytes(originalValue); + updatedData = faabric::util::valueToBytes(finalValue); + expectedData = faabric::util::valueToBytes(diffValue); + } + SECTION("Float") { float originalValue = 0.0; From 19cc7d37b664e85852361a2961e9d35394f8e08f Mon Sep 17 00:00:00 2001 From: Simon Shillaker Date: Wed, 22 Dec 2021 09:13:55 +0000 Subject: [PATCH 08/24] Add trace logging to difsf --- src/util/snapshot.cpp | 64 +++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 61 insertions(+), 3 deletions(-) diff --git a/src/util/snapshot.cpp b/src/util/snapshot.cpp index 894f2988b..8f70602d3 100644 --- a/src/util/snapshot.cpp +++ b/src/util/snapshot.cpp @@ -237,7 +237,7 @@ void SnapshotData::writeQueuedDiffs() if (diff.getOperation() == faabric::util::SnapshotMergeOperation::Overwrite) { - SPDLOG_TRACE("Copying snapshot diff into {}-{}", + SPDLOG_TRACE("Copying overwrite diff into {}-{}", diff.getOffset(), diff.getOffset() + diff.getData().size()); @@ -246,12 +246,20 @@ void SnapshotData::writeQueuedDiffs() continue; } + uint8_t* copyTarget = validatedOffsetPtr(diff.getOffset()); switch (diff.getDataType()) { case (faabric::util::SnapshotDataType::Int): { int32_t finalValue = applyDiffValue(validatedOffsetPtr(diff.getOffset()), diff.getData().data(), diff.getOperation()); + + SPDLOG_TRACE("Writing int {} diff: {} {} -> {}", + snapshotMergeOpStr(diff.getOperation()), + unalignedRead(copyTarget), + unalignedRead(diff.getData().data()), + finalValue); + writeData({ BYTES(&finalValue), sizeof(int32_t) }, diff.getOffset()); break; @@ -261,6 +269,13 @@ void SnapshotData::writeQueuedDiffs() applyDiffValue(validatedOffsetPtr(diff.getOffset()), diff.getData().data(), diff.getOperation()); + + SPDLOG_TRACE("Writing long {} diff: {} {} -> {}", + snapshotMergeOpStr(diff.getOperation()), + unalignedRead(copyTarget), + unalignedRead(diff.getData().data()), + finalValue); + writeData({ BYTES(&finalValue), sizeof(long) }, diff.getOffset()); break; @@ -270,6 +285,13 @@ void SnapshotData::writeQueuedDiffs() applyDiffValue(validatedOffsetPtr(diff.getOffset()), diff.getData().data(), diff.getOperation()); + + SPDLOG_TRACE("Writing float {} diff: {} {} -> {}", + snapshotMergeOpStr(diff.getOperation()), + unalignedRead(copyTarget), + unalignedRead(diff.getData().data()), + finalValue); + writeData({ BYTES(&finalValue), sizeof(float) }, diff.getOffset()); break; @@ -279,6 +301,13 @@ void SnapshotData::writeQueuedDiffs() applyDiffValue(validatedOffsetPtr(diff.getOffset()), diff.getData().data(), diff.getOperation()); + + SPDLOG_TRACE("Writing double {} diff: {} {} -> {}", + snapshotMergeOpStr(diff.getOperation()), + unalignedRead(copyTarget), + unalignedRead(diff.getData().data()), + finalValue); + writeData({ BYTES(&finalValue), sizeof(double) }, diff.getOffset()); break; @@ -533,24 +562,53 @@ void SnapshotMergeRegion::addDiffs(std::vector& diffs, bool changed = false; switch (dataType) { case (SnapshotDataType::Int): { + int preUpdate = unalignedRead(updated); changed = calculateDiffValue(original, updated, operation); + + SPDLOG_TRACE("Calculated int {} merge: {} {} -> {}", + snapshotMergeOpStr(operation), + preUpdate, + unalignedRead(original), + unalignedRead(updated)); break; } case (SnapshotDataType::Long): { + long preUpdate = unalignedRead(updated); changed = calculateDiffValue(original, updated, operation); + + SPDLOG_TRACE("Calculated long {} merge: {} {} -> {}", + snapshotMergeOpStr(operation), + preUpdate, + unalignedRead(original), + unalignedRead(updated)); break; } case (SnapshotDataType::Float): { + float preUpdate = unalignedRead(updated); changed = calculateDiffValue(original, updated, operation); + + SPDLOG_TRACE("Calculated float {} merge: {} {} -> {}", + snapshotMergeOpStr(operation), + preUpdate, + unalignedRead(original), + unalignedRead(updated)); break; } case (SnapshotDataType::Double): { + double preUpdate = unalignedRead(updated); changed = calculateDiffValue(original, updated, operation); + + SPDLOG_TRACE("Calculated double {} merge: {} {} -> {}", + snapshotMergeOpStr(operation), + preUpdate, + unalignedRead(original), + unalignedRead(updated)); break; } default: { - SPDLOG_ERROR( - "Unsupported merge op combination {} {}", dataType, operation); + SPDLOG_ERROR("Unsupported merge op combination {} {}", + snapshotDataTypeStr(dataType), + snapshotMergeOpStr(operation)); throw std::runtime_error("Unsupported merge op combination"); } } From 95d32b8c22ecdeb9890dec4e412af85db980020c Mon Sep 17 00:00:00 2001 From: Simon Shillaker Date: Wed, 22 Dec 2021 15:06:21 +0000 Subject: [PATCH 09/24] Remove register snapshot if exists --- include/faabric/snapshot/SnapshotRegistry.h | 8 ---- src/scheduler/Executor.cpp | 6 +-- src/snapshot/SnapshotRegistry.cpp | 24 +++-------- src/snapshot/SnapshotServer.cpp | 3 ++ src/util/snapshot.cpp | 6 ++- .../test/snapshot/test_snapshot_registry.cpp | 42 ------------------- 6 files changed, 15 insertions(+), 74 deletions(-) diff --git a/include/faabric/snapshot/SnapshotRegistry.h b/include/faabric/snapshot/SnapshotRegistry.h index 7d6952a57..bf8318d70 100644 --- a/include/faabric/snapshot/SnapshotRegistry.h +++ b/include/faabric/snapshot/SnapshotRegistry.h @@ -25,10 +25,6 @@ class SnapshotRegistry void registerSnapshot(const std::string& key, std::shared_ptr data); - void registerSnapshotIfNotExists( - const std::string& key, - std::shared_ptr data); - void deleteSnapshot(const std::string& key); size_t getSnapshotCount(); @@ -44,10 +40,6 @@ class SnapshotRegistry int writeSnapshotToFd(const std::string& key, faabric::util::SnapshotData& data); - - void doRegisterSnapshot(const std::string& key, - std::shared_ptr data, - bool overwrite); }; SnapshotRegistry& getSnapshotRegistry(); diff --git a/src/scheduler/Executor.cpp b/src/scheduler/Executor.cpp index 69da7b652..c4cbfc55b 100644 --- a/src/scheduler/Executor.cpp +++ b/src/scheduler/Executor.cpp @@ -312,6 +312,9 @@ void Executor::threadPoolThread(int threadPoolIdx) msg.groupid()); snap->queueDiffs(diffs); + + // Reset dirty page tracking on master + faabric::util::resetDirtyTracking(); } else { // Push diffs back to master sch.pushSnapshotDiffs(msg, diffs); @@ -319,9 +322,6 @@ void Executor::threadPoolThread(int threadPoolIdx) // Reset dirty page tracking on non-master faabric::util::resetDirtyTracking(); } - - SPDLOG_DEBUG("Clearing merge regions for {}", msg.snapshotkey()); - snap->clearMergeRegions(); } // If this batch is finished, reset the executor and release its claim. diff --git a/src/snapshot/SnapshotRegistry.cpp b/src/snapshot/SnapshotRegistry.cpp index 9175cdcdb..bec466f8a 100644 --- a/src/snapshot/SnapshotRegistry.cpp +++ b/src/snapshot/SnapshotRegistry.cpp @@ -36,37 +36,23 @@ void SnapshotRegistry::mapSnapshot(const std::string& key, uint8_t* target) { auto d = getSnapshot(key); d->mapToMemory(target); -} -void SnapshotRegistry::registerSnapshotIfNotExists( - const std::string& key, - std::shared_ptr data) -{ - doRegisterSnapshot(key, std::move(data), false); + // Reset dirty tracking otherwise whole mapped region is marked dirty + faabric::util::resetDirtyTracking(); } void SnapshotRegistry::registerSnapshot( const std::string& key, std::shared_ptr data) -{ - doRegisterSnapshot(key, std::move(data), true); -} - -void SnapshotRegistry::doRegisterSnapshot( - const std::string& key, - std::shared_ptr data, - bool overwrite) { faabric::util::FullLock lock(snapshotsMx); - if (!overwrite && (snapshotMap.find(key) != snapshotMap.end())) { - SPDLOG_TRACE("Skipping already existing snapshot {}", key); - return; - } - SPDLOG_TRACE("Registering snapshot {} size {}", key, data->getSize()); snapshotMap.insert_or_assign(key, std::move(data)); + + // Reset dirty tracking + faabric::util::resetDirtyTracking(); } void SnapshotRegistry::deleteSnapshot(const std::string& key) diff --git a/src/snapshot/SnapshotServer.cpp b/src/snapshot/SnapshotServer.cpp index a466e8f9a..d53392f09 100644 --- a/src/snapshot/SnapshotServer.cpp +++ b/src/snapshot/SnapshotServer.cpp @@ -149,6 +149,9 @@ SnapshotServer::recvPushSnapshotDiffs(const uint8_t* buffer, size_t bufferSize) // Write queued diffs snap->writeQueuedDiffs(); + // Clear merge regions + snap->clearMergeRegions(); + // Add merge regions from request for (const auto* mr : *r->merge_regions()) { snap->addMergeRegion( diff --git a/src/util/snapshot.cpp b/src/util/snapshot.cpp index 8f70602d3..1344009a3 100644 --- a/src/util/snapshot.cpp +++ b/src/util/snapshot.cpp @@ -338,6 +338,9 @@ std::vector MemoryView::getDirtyRegions() std::vector dirtyPageNumbers = getDirtyPageNumbers(data.data(), nPages); + SPDLOG_DEBUG( + "Memory view has {}/{} dirty pages", dirtyPageNumbers.size(), nPages); + std::vector> regions = faabric::util::getDirtyRegions(data.data(), nPages); @@ -345,14 +348,13 @@ std::vector MemoryView::getDirtyRegions() std::vector diffs; diffs.reserve(regions.size()); for (auto [regionBegin, regionEnd] : regions) { + SPDLOG_TRACE("Memory view dirty {}-{}", regionBegin, regionEnd); diffs.emplace_back(SnapshotDataType::Raw, SnapshotMergeOperation::Overwrite, regionBegin, data.subspan(regionBegin, regionEnd - regionBegin)); } - SPDLOG_DEBUG("Memory view has {}/{} dirty pages", diffs.size(), nPages); - return diffs; } diff --git a/tests/test/snapshot/test_snapshot_registry.cpp b/tests/test/snapshot/test_snapshot_registry.cpp index f919618d8..dd3c4ed0e 100644 --- a/tests/test/snapshot/test_snapshot_registry.cpp +++ b/tests/test/snapshot/test_snapshot_registry.cpp @@ -103,48 +103,6 @@ TEST_CASE_METHOD(SnapshotTestFixture, REQUIRE(reg.getSnapshotCount() == 0); } -TEST_CASE_METHOD(SnapshotTestFixture, - "Test register snapshot if not exists", - "[snapshot]") -{ - REQUIRE(reg.getSnapshotCount() == 0); - - std::string keyA = "snapA"; - std::string keyB = "snapB"; - - REQUIRE(!reg.snapshotExists(keyA)); - REQUIRE(!reg.snapshotExists(keyB)); - - // Take one of the snapshots - auto snapBefore = setUpSnapshot(keyA, 1); - - REQUIRE(reg.snapshotExists(keyA)); - REQUIRE(!reg.snapshotExists(keyB)); - REQUIRE(reg.getSnapshotCount() == 1); - - auto otherSnap = - std::make_shared(snapBefore->getSize() + 10); - std::vector otherData(snapBefore->getSize() + 10, 1); - otherSnap->copyInData(otherData); - - // Check existing snapshot is not overwritten - reg.registerSnapshotIfNotExists(keyA, otherSnap); - auto snapAfterA = reg.getSnapshot(keyA); - REQUIRE(snapAfterA->getDataPtr() == snapBefore->getDataPtr()); - REQUIRE(snapAfterA->getSize() == snapBefore->getSize()); - - // Check new snapshot is still created - reg.registerSnapshotIfNotExists(keyB, otherSnap); - - REQUIRE(reg.snapshotExists(keyA)); - REQUIRE(reg.snapshotExists(keyB)); - REQUIRE(reg.getSnapshotCount() == 2); - - auto snapAfterB = reg.getSnapshot(keyB); - REQUIRE(snapAfterB->getDataPtr() == otherSnap->getDataPtr()); - REQUIRE(snapAfterB->getSize() == otherSnap->getSize()); -} - TEST_CASE_METHOD(SnapshotTestFixture, "Test can't get snapshot with empty key", "[snapshot]") From 1903f428620455419fc774fa74ace4b740b6b832 Mon Sep 17 00:00:00 2001 From: Simon Shillaker Date: Wed, 22 Dec 2021 17:04:57 +0000 Subject: [PATCH 10/24] Formatting --- include/faabric/util/snapshot.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/include/faabric/util/snapshot.h b/include/faabric/util/snapshot.h index 036c28ea3..b827061e5 100644 --- a/include/faabric/util/snapshot.h +++ b/include/faabric/util/snapshot.h @@ -88,8 +88,8 @@ class SnapshotMergeRegion template inline bool calculateDiffValue(const uint8_t* original, - uint8_t* updated, - SnapshotMergeOperation operation) + uint8_t* updated, + SnapshotMergeOperation operation) { // Cast to value T updatedValue = unalignedRead(updated); From 94ed106b67ac69f6686341fd4e9c5b4483c0cb98 Mon Sep 17 00:00:00 2001 From: Simon Shillaker Date: Wed, 22 Dec 2021 18:08:13 +0000 Subject: [PATCH 11/24] More snapshot logging --- src/util/snapshot.cpp | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/util/snapshot.cpp b/src/util/snapshot.cpp index 1344009a3..fab8d8064 100644 --- a/src/util/snapshot.cpp +++ b/src/util/snapshot.cpp @@ -537,6 +537,13 @@ void SnapshotMergeRegion::addDiffs(std::vector& diffs, ((length == 0) || (dirtyRange.first < offset + length)); if (!isInRange) { + SPDLOG_TRACE("{} {} merge region {}-{} not in dirty region {}-{}", + snapshotDataTypeStr(dataType), + snapshotMergeOpStr(operation), + offset, + offset + length, + dirtyRange.first, + dirtyRange.second); return; } From f1e5bf8cc39d3ba04c766f347026a92f9410a023 Mon Sep 17 00:00:00 2001 From: Simon Shillaker Date: Thu, 23 Dec 2021 12:28:04 +0000 Subject: [PATCH 12/24] Add filling in gaps capacity --- include/faabric/util/snapshot.h | 9 +++ src/util/snapshot.cpp | 58 +++++++++++++- tests/test/util/test_snapshot.cpp | 124 ++++++++++++++++++++++++++++++ 3 files changed, 187 insertions(+), 4 deletions(-) diff --git a/include/faabric/util/snapshot.h b/include/faabric/util/snapshot.h index b827061e5..33185fa40 100644 --- a/include/faabric/util/snapshot.h +++ b/include/faabric/util/snapshot.h @@ -69,6 +69,13 @@ class SnapshotMergeRegion SnapshotDataType dataType = SnapshotDataType::Raw; SnapshotMergeOperation operation = SnapshotMergeOperation::Overwrite; + SnapshotMergeRegion() = default; + + SnapshotMergeRegion(uint32_t offsetIn, + size_t lengthIn, + SnapshotDataType dataTypeIn, + SnapshotMergeOperation operationIn); + void addDiffs(std::vector& diffs, std::span originalData, const uint8_t* updatedData, @@ -202,6 +209,8 @@ class SnapshotData SnapshotMergeOperation operation, bool overwrite = false); + void fillGapsWithOverwriteRegions(); + void clearMergeRegions(); std::map getMergeRegions(); diff --git a/src/util/snapshot.cpp b/src/util/snapshot.cpp index fab8d8064..5ef7a0dc4 100644 --- a/src/util/snapshot.cpp +++ b/src/util/snapshot.cpp @@ -151,10 +151,7 @@ void SnapshotData::addMergeRegion(uint32_t offset, { faabric::util::FullLock lock(snapMx); - SnapshotMergeRegion region{ .offset = offset, - .length = length, - .dataType = dataType, - .operation = operation }; + SnapshotMergeRegion region(offset, length, dataType, operation); if (mergeRegions.find(region.offset) != mergeRegions.end()) { if (!overwrite) { @@ -187,6 +184,49 @@ void SnapshotData::addMergeRegion(uint32_t offset, mergeRegions[region.offset] = region; } +void SnapshotData::fillGapsWithOverwriteRegions() +{ + faabric::util::FullLock lock(snapMx); + + // If there's no merge regions, just do one big one (note, zero length means + // fill all space + if (mergeRegions.empty()) { + mergeRegions.emplace(std::pair( + 0, + { 0, 0, SnapshotDataType::Raw, SnapshotMergeOperation::Overwrite })); + + return; + } + + uint32_t lastRegionEnd = 0; + for (auto [offset, region] : mergeRegions) { + if (offset == 0) { + // Zeroth byte is in a merge region + lastRegionEnd = region.length; + continue; + } + + mergeRegions.emplace(std::pair( + lastRegionEnd, + { lastRegionEnd, + region.offset - lastRegionEnd, + SnapshotDataType::Raw, + SnapshotMergeOperation::Overwrite })); + + lastRegionEnd = region.offset + region.length; + } + + if (lastRegionEnd < size) { + // Add a final region at the end of the snapshot + mergeRegions.emplace(std::pair( + lastRegionEnd, + { lastRegionEnd, + 0, + SnapshotDataType::Raw, + SnapshotMergeOperation::Overwrite })); + } +} + void SnapshotData::mapToMemory(uint8_t* target) { faabric::util::FullLock lock(snapMx); @@ -524,6 +564,16 @@ void SnapshotMergeRegion::addOverwriteDiff( } } +SnapshotMergeRegion::SnapshotMergeRegion(uint32_t offsetIn, + size_t lengthIn, + SnapshotDataType dataTypeIn, + SnapshotMergeOperation operationIn) + : offset(offsetIn) + , length(lengthIn) + , dataType(dataTypeIn) + , operation(operationIn) +{} + void SnapshotMergeRegion::addDiffs(std::vector& diffs, std::span originalData, const uint8_t* updatedData, diff --git a/tests/test/util/test_snapshot.cpp b/tests/test/util/test_snapshot.cpp index 01949455a..f768d0d88 100644 --- a/tests/test/util/test_snapshot.cpp +++ b/tests/test/util/test_snapshot.cpp @@ -1041,6 +1041,130 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, checkDiffs(actualDiffs, expectedDiffs); } +TEST_CASE_METHOD(SnapshotMergeTestFixture, + "Test filling gaps in regions with overwrite", + "[snapshot][util]") +{ + int snapPages = 3; + size_t snapSize = snapPages * HOST_PAGE_SIZE; + + std::shared_ptr snap = + std::make_shared(snapSize); + + std::map expectedRegions; + + SECTION("No existing regions") + { + expectedRegions[0] = { + 0, 0, SnapshotDataType::Raw, SnapshotMergeOperation::Overwrite + }; + } + + SECTION("One region at start") + { + snap->addMergeRegion( + 0, 100, SnapshotDataType::Raw, SnapshotMergeOperation::Overwrite); + expectedRegions[0] = { + 0, 100, SnapshotDataType::Raw, SnapshotMergeOperation::Overwrite + }; + expectedRegions[100] = { + 100, 0, SnapshotDataType::Raw, SnapshotMergeOperation::Overwrite + }; + } + + SECTION("One region at end") + { + snap->addMergeRegion(snapSize - 100, + 100, + SnapshotDataType::Raw, + SnapshotMergeOperation::Overwrite); + + expectedRegions[0] = { 0, + snapSize - 100, + SnapshotDataType::Raw, + SnapshotMergeOperation::Overwrite }; + + expectedRegions[snapSize - 100] = { (uint32_t)snapSize - 100, + 100, + SnapshotDataType::Raw, + SnapshotMergeOperation::Overwrite }; + } + + SECTION("Multiple regions") + { + // Deliberately add out of order + snap->addMergeRegion(HOST_PAGE_SIZE, + sizeof(double), + SnapshotDataType::Double, + SnapshotMergeOperation::Product); + + snap->addMergeRegion( + 100, sizeof(int), SnapshotDataType::Int, SnapshotMergeOperation::Sum); + + snap->addMergeRegion(HOST_PAGE_SIZE + 200, + HOST_PAGE_SIZE, + SnapshotDataType::Raw, + SnapshotMergeOperation::Overwrite); + + expectedRegions[0] = { + 0, 100, SnapshotDataType::Raw, SnapshotMergeOperation::Overwrite + }; + + expectedRegions[100] = { + 100, sizeof(int), SnapshotDataType::Int, SnapshotMergeOperation::Sum + }; + + expectedRegions[100 + sizeof(int)] = { + 100 + sizeof(int), + HOST_PAGE_SIZE - (100 + sizeof(int)), + SnapshotDataType::Raw, + SnapshotMergeOperation::Overwrite + }; + + expectedRegions[HOST_PAGE_SIZE] = { (uint32_t)HOST_PAGE_SIZE, + sizeof(double), + SnapshotDataType::Double, + SnapshotMergeOperation::Product }; + + expectedRegions[HOST_PAGE_SIZE + sizeof(double)] = { + (uint32_t)(HOST_PAGE_SIZE + sizeof(double)), + 200 - sizeof(double), + SnapshotDataType::Raw, + SnapshotMergeOperation::Overwrite + }; + + expectedRegions[HOST_PAGE_SIZE + 200] = { + (uint32_t)HOST_PAGE_SIZE + 200, + (uint32_t)HOST_PAGE_SIZE, + SnapshotDataType::Raw, + SnapshotMergeOperation::Overwrite + }; + + expectedRegions[(2 * HOST_PAGE_SIZE) + 200] = { + (uint32_t)(2 * HOST_PAGE_SIZE) + 200, + 0, + SnapshotDataType::Raw, + SnapshotMergeOperation::Overwrite + }; + } + + snap->fillGapsWithOverwriteRegions(); + + std::map actualRegions = + snap->getMergeRegions(); + + REQUIRE(actualRegions.size() == expectedRegions.size()); + for (auto [expectedOffset, expectedRegion] : expectedRegions) { + REQUIRE(actualRegions.find(expectedOffset) != actualRegions.end()); + + SnapshotMergeRegion actualRegion = actualRegions[expectedOffset]; + REQUIRE(actualRegion.offset == expectedRegion.offset); + REQUIRE(actualRegion.dataType == expectedRegion.dataType); + REQUIRE(actualRegion.length == expectedRegion.length); + REQUIRE(actualRegion.operation == expectedRegion.operation); + } +} + TEST_CASE_METHOD(SnapshotMergeTestFixture, "Test mix of applicable and non-applicable merge regions", "[snapshot][util]") From 1c2ed8949442a3f3bfabc8705591516a94f9ba4d Mon Sep 17 00:00:00 2001 From: Simon Shillaker Date: Thu, 23 Dec 2021 14:26:45 +0000 Subject: [PATCH 13/24] Default to shared --- src/scheduler/Executor.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/scheduler/Executor.cpp b/src/scheduler/Executor.cpp index c4cbfc55b..a7fa0c882 100644 --- a/src/scheduler/Executor.cpp +++ b/src/scheduler/Executor.cpp @@ -298,6 +298,9 @@ void Executor::threadPoolThread(int threadPoolIdx) SPDLOG_TRACE("Diffing memory with pre-execution snapshot for {}", msg.snapshotkey()); + // Fill gaps with overwrites + snap->fillGapsWithOverwriteRegions(); + // If we're on master, we write the diffs straight to the snapshot // otherwise we push them to the master. std::vector diffs = From 69094d7b28a12d831b165452d6bb214e5d939476 Mon Sep 17 00:00:00 2001 From: Simon Shillaker Date: Thu, 23 Dec 2021 14:46:42 +0000 Subject: [PATCH 14/24] Use spans where possible --- include/faabric/util/snapshot.h | 4 ++-- src/util/snapshot.cpp | 32 ++++++++++++++++---------------- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/include/faabric/util/snapshot.h b/include/faabric/util/snapshot.h index 33185fa40..44c9e972d 100644 --- a/include/faabric/util/snapshot.h +++ b/include/faabric/util/snapshot.h @@ -78,13 +78,13 @@ class SnapshotMergeRegion void addDiffs(std::vector& diffs, std::span originalData, - const uint8_t* updatedData, + std::span updatedData, std::pair dirtyRange); private: void addOverwriteDiff(std::vector& diffs, std::span original, - const uint8_t* updated, + std::span updated, std::pair dirtyRange); void addMergeDiff(std::vector& diffs, diff --git a/src/util/snapshot.cpp b/src/util/snapshot.cpp index 5ef7a0dc4..2e72691fb 100644 --- a/src/util/snapshot.cpp +++ b/src/util/snapshot.cpp @@ -433,7 +433,7 @@ std::vector MemoryView::diffWithSnapshot( // Add the diffs mr.addDiffs(diffs, { snap->getDataPtr(), snap->getSize() }, - data.data(), + data, dirtyRegion); } } @@ -500,7 +500,7 @@ std::string snapshotMergeOpStr(SnapshotMergeOperation op) void SnapshotMergeRegion::addOverwriteDiff( std::vector& diffs, std::span original, - const uint8_t* updated, + std::span updated, std::pair dirtyRange) { auto operation = SnapshotMergeOperation::Overwrite; @@ -508,6 +508,8 @@ void SnapshotMergeRegion::addOverwriteDiff( // Work out bounds of region we're checking uint32_t checkStart = std::max(dirtyRange.first, offset); + // Here we need to make sure we don't overrun the original or the updated + // data uint32_t checkEnd; if (length == 0) { checkEnd = dirtyRange.second; @@ -520,8 +522,8 @@ void SnapshotMergeRegion::addOverwriteDiff( for (int b = checkStart; b <= checkEnd; b++) { // If this byte is outside the original region, we can't // compare (i.e. always dirty) - bool isDirtyByte = - (b > original.size()) || (*(original.data() + b) != *(updated + b)); + bool isDirtyByte = (b > original.size()) || + (*(original.data() + b) != *(updated.data() + b)); if (isDirtyByte && !diffInProgress) { // Diff starts here if it's different and diff @@ -538,11 +540,10 @@ void SnapshotMergeRegion::addOverwriteDiff( diffStart + diffLength); diffInProgress = false; - diffs.emplace_back( - dataType, - operation, - diffStart, - std::span(updated + diffStart, diffLength)); + diffs.emplace_back(dataType, + operation, + diffStart, + updated.subspan(diffStart, diffLength)); } } @@ -556,11 +557,10 @@ void SnapshotMergeRegion::addOverwriteDiff( diffStart, diffStart + finalDiffLength); - diffs.emplace_back( - dataType, - operation, - diffStart, - std::span(updated + diffStart, finalDiffLength)); + diffs.emplace_back(dataType, + operation, + diffStart, + updated.subspan(diffStart, finalDiffLength)); } } @@ -576,7 +576,7 @@ SnapshotMergeRegion::SnapshotMergeRegion(uint32_t offsetIn, void SnapshotMergeRegion::addDiffs(std::vector& diffs, std::span originalData, - const uint8_t* updatedData, + std::span updatedData, std::pair dirtyRange) { // If the region has zero length, it signifies that it goes to the @@ -615,7 +615,7 @@ void SnapshotMergeRegion::addDiffs(std::vector& diffs, "Do not support non-overwrite operations outside original snapshot"); } - uint8_t* updated = (uint8_t*)updatedData + offset; + uint8_t* updated = (uint8_t*)updatedData.data() + offset; const uint8_t* original = originalData.data() + offset; bool changed = false; From 6dcf8ce76e60b01e8f46a31cdbad764e59173051 Mon Sep 17 00:00:00 2001 From: Simon Shillaker Date: Mon, 27 Dec 2021 10:13:02 +0000 Subject: [PATCH 15/24] Logging and reinstate MR clearing --- src/scheduler/Executor.cpp | 3 +++ src/util/snapshot.cpp | 9 ++++++++- tests/test/scheduler/test_executor.cpp | 10 +++------- 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/src/scheduler/Executor.cpp b/src/scheduler/Executor.cpp index a7fa0c882..3adcd3f72 100644 --- a/src/scheduler/Executor.cpp +++ b/src/scheduler/Executor.cpp @@ -325,6 +325,9 @@ void Executor::threadPoolThread(int threadPoolIdx) // Reset dirty page tracking on non-master faabric::util::resetDirtyTracking(); } + + SPDLOG_DEBUG("Clearing merge regions for {}", msg.snapshotkey()); + snap->clearMergeRegions(); } // If this batch is finished, reset the executor and release its claim. diff --git a/src/util/snapshot.cpp b/src/util/snapshot.cpp index 2e72691fb..ed1ed3268 100644 --- a/src/util/snapshot.cpp +++ b/src/util/snapshot.cpp @@ -191,6 +191,7 @@ void SnapshotData::fillGapsWithOverwriteRegions() // If there's no merge regions, just do one big one (note, zero length means // fill all space if (mergeRegions.empty()) { + SPDLOG_TRACE("Filling gap with single overwrite merge region"); mergeRegions.emplace(std::pair( 0, { 0, 0, SnapshotDataType::Raw, SnapshotMergeOperation::Overwrite })); @@ -206,10 +207,16 @@ void SnapshotData::fillGapsWithOverwriteRegions() continue; } + uint32_t regionLen = region.offset - lastRegionEnd; + + SPDLOG_TRACE("Filling gap with overwrite merge region {}-{}", + lastRegionEnd, + lastRegionEnd + regionLen); + mergeRegions.emplace(std::pair( lastRegionEnd, { lastRegionEnd, - region.offset - lastRegionEnd, + regionLen, SnapshotDataType::Raw, SnapshotMergeOperation::Overwrite })); diff --git a/tests/test/scheduler/test_executor.cpp b/tests/test/scheduler/test_executor.cpp index ae92cbf9a..250b5f733 100644 --- a/tests/test/scheduler/test_executor.cpp +++ b/tests/test/scheduler/test_executor.cpp @@ -178,19 +178,15 @@ int32_t TestExecutor::executeTask( auto snapData = faabric::snapshot::getSnapshotRegistry().getSnapshot( msg.snapshotkey()); - // Avoid writing a zero here as the memory is already zeroed hence + // Set up the data. + // Note, avoid writing a zero here as the memory is already zeroed hence // it's not a change std::vector data = { (uint8_t)(pageIdx + 1), (uint8_t)(pageIdx + 2), (uint8_t)(pageIdx + 3) }; - // Set up a merge region that should catch the diff + // Copy in the data size_t offset = (pageIdx * faabric::util::HOST_PAGE_SIZE); - snapData->addMergeRegion(offset, - data.size() + 10, - SnapshotDataType::Raw, - SnapshotMergeOperation::Overwrite); - SPDLOG_DEBUG("TestExecutor modifying page {} of memory ({}-{})", pageIdx, offset, From 0406a3f69e5babbc5a565f710c46067f69398652 Mon Sep 17 00:00:00 2001 From: Simon Shillaker Date: Mon, 27 Dec 2021 10:59:22 +0000 Subject: [PATCH 16/24] Add ignore diffs --- include/faabric/util/snapshot.h | 3 ++- src/util/snapshot.cpp | 16 +++++++++++++++ tests/test/util/test_snapshot.cpp | 33 +++++++++++++++++++++++-------- 3 files changed, 43 insertions(+), 9 deletions(-) diff --git a/include/faabric/util/snapshot.h b/include/faabric/util/snapshot.h index 44c9e972d..83baf24a5 100644 --- a/include/faabric/util/snapshot.h +++ b/include/faabric/util/snapshot.h @@ -31,7 +31,8 @@ enum SnapshotMergeOperation Product, Subtract, Max, - Min + Min, + Ignore }; class SnapshotDiff diff --git a/src/util/snapshot.cpp b/src/util/snapshot.cpp index ed1ed3268..318c42b47 100644 --- a/src/util/snapshot.cpp +++ b/src/util/snapshot.cpp @@ -281,6 +281,15 @@ void SnapshotData::writeQueuedDiffs() // Iterate through diffs for (auto& diff : queuedDiffs) { + if (diff.getOperation() == + faabric::util::SnapshotMergeOperation::Ignore) { + + SPDLOG_TRACE("Ignoring region {}-{}", + diff.getOffset(), + diff.getOffset() + diff.getData().size()); + + continue; + } if (diff.getOperation() == faabric::util::SnapshotMergeOperation::Overwrite) { @@ -479,6 +488,9 @@ std::string snapshotDataTypeStr(SnapshotDataType dt) std::string snapshotMergeOpStr(SnapshotMergeOperation op) { switch (op) { + case (SnapshotMergeOperation::Ignore): { + return "Ignore"; + } case (SnapshotMergeOperation::Max): { return "Max"; } @@ -617,6 +629,10 @@ void SnapshotMergeRegion::addDiffs(std::vector& diffs, return; } + if (operation == SnapshotMergeOperation::Ignore) { + return; + } + if (originalData.size() < offset) { throw std::runtime_error( "Do not support non-overwrite operations outside original snapshot"); diff --git a/tests/test/util/test_snapshot.cpp b/tests/test/util/test_snapshot.cpp index f768d0d88..28f3945de 100644 --- a/tests/test/util/test_snapshot.cpp +++ b/tests/test/util/test_snapshot.cpp @@ -530,6 +530,8 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, size_t dataLength = 0; size_t regionLength = 0; + bool expectNoDiff = false; + SECTION("Integer") { int originalValue = 0; @@ -773,6 +775,7 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, updatedData = faabric::util::valueToBytes(finalValue); expectedData = faabric::util::valueToBytes(diffValue); } + SECTION("Bool") { bool originalValue = false; @@ -817,6 +820,15 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, regionLength = 0; operation = faabric::util::SnapshotMergeOperation::Overwrite; } + + SECTION("Ignore") + { + regionLength = dataLength; + operation = faabric::util::SnapshotMergeOperation::Ignore; + + expectedData = originalData; + expectNoDiff = true; + } } // Write the original data into place @@ -845,15 +857,20 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, std::vector actualDiffs = MemoryView({ sharedMem.get(), sharedMemSize }).diffWithSnapshot(snap); - // Check diff - REQUIRE(actualDiffs.size() == 1); - std::vector expectedDiffs = { { dataType, - operation, - offset, - { expectedData.data(), - expectedData.size() } } }; + if (expectNoDiff) { + REQUIRE(actualDiffs.empty()); + } else { + // Check diff + REQUIRE(actualDiffs.size() == 1); + std::vector expectedDiffs = { + { dataType, + operation, + offset, + { expectedData.data(), expectedData.size() } } + }; - checkDiffs(actualDiffs, expectedDiffs); + checkDiffs(actualDiffs, expectedDiffs); + } } TEST_CASE_METHOD(SnapshotMergeTestFixture, From 8e21f283b3f92cdc95cb3ddae6b4e97bbee3c0c8 Mon Sep 17 00:00:00 2001 From: Simon Shillaker Date: Mon, 27 Dec 2021 11:55:48 +0000 Subject: [PATCH 17/24] Fix bug in overflowing original snap data --- src/util/snapshot.cpp | 31 +++++++++++++++++++++++-------- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/src/util/snapshot.cpp b/src/util/snapshot.cpp index 318c42b47..d7a0de910 100644 --- a/src/util/snapshot.cpp +++ b/src/util/snapshot.cpp @@ -224,6 +224,9 @@ void SnapshotData::fillGapsWithOverwriteRegions() } if (lastRegionEnd < size) { + SPDLOG_TRACE("Filling final gap with merge region starting at {}", + lastRegionEnd); + // Add a final region at the end of the snapshot mergeRegions.emplace(std::pair( lastRegionEnd, @@ -536,12 +539,22 @@ void SnapshotMergeRegion::addOverwriteDiff( checkEnd = std::min(dirtyRange.second, offset + length); } + // If the region is outside the original data, automatically add a diff for + // the whole region + if (checkStart >= original.size()) { + diffs.emplace_back(dataType, + operation, + checkStart, + updated.subspan(checkStart, checkEnd - checkStart)); + return; + } + bool diffInProgress = false; int diffStart = 0; for (int b = checkStart; b <= checkEnd; b++) { // If this byte is outside the original region, we can't // compare (i.e. always dirty) - bool isDirtyByte = (b > original.size()) || + bool isDirtyByte = (b >= original.size()) || (*(original.data() + b) != *(updated.data() + b)); if (isDirtyByte && !diffInProgress) { @@ -616,13 +629,15 @@ void SnapshotMergeRegion::addDiffs(std::vector& diffs, return; } - SPDLOG_TRACE("{} {} merge region {}-{} aligns with dirty region {}-{}", - snapshotDataTypeStr(dataType), - snapshotMergeOpStr(operation), - offset, - offset + length, - dirtyRange.first, - dirtyRange.second); + SPDLOG_TRACE( + "{} {} merge region {}-{}, dirty region {}-{}, original size {}", + snapshotDataTypeStr(dataType), + snapshotMergeOpStr(operation), + offset, + offset + length, + dirtyRange.first, + dirtyRange.second, + originalData.size()); if (operation == SnapshotMergeOperation::Overwrite) { addOverwriteDiff(diffs, originalData, updatedData, dirtyRange); From 4ec33cee54a799a60fa1c2f7b33cd6b07431fe72 Mon Sep 17 00:00:00 2001 From: Simon Shillaker Date: Mon, 27 Dec 2021 15:02:41 +0000 Subject: [PATCH 18/24] Remove unnecessary merge regions in dist tests --- tests/dist/scheduler/functions.cpp | 16 ---------------- tests/dist/scheduler/test_snapshots.cpp | 6 ------ 2 files changed, 22 deletions(-) diff --git a/tests/dist/scheduler/functions.cpp b/tests/dist/scheduler/functions.cpp index c2b4f5140..c46f85002 100644 --- a/tests/dist/scheduler/functions.cpp +++ b/tests/dist/scheduler/functions.cpp @@ -129,16 +129,6 @@ int handleFakeDiffsThreadedFunction( std::vector localChange(3, i); int offset = 2 * i * faabric::util::HOST_PAGE_SIZE; snap->copyInData(localChange, offset); - - // Make sure changes made by this message are covered by a merge - // region - int regionOffset = 2 * i * faabric::util::HOST_PAGE_SIZE; - int regionLength = 20 + msg.inputdata().size(); - snap->addMergeRegion( - regionOffset, - regionLength, - faabric::util::SnapshotDataType::Raw, - faabric::util::SnapshotMergeOperation::Overwrite); } // Dispatch the message, expecting them all to execute on other hosts @@ -290,12 +280,6 @@ int handleReductionFunction(tests::DistTestExecutor* exec, SnapshotMergeOperation::Sum, true); - snap->addMergeRegion(arrayOffset, - sizeof(int32_t) * nThreads, - SnapshotDataType::Raw, - SnapshotMergeOperation::Overwrite, - true); - // Make the request faabric::scheduler::Scheduler& sch = faabric::scheduler::getScheduler(); diff --git a/tests/dist/scheduler/test_snapshots.cpp b/tests/dist/scheduler/test_snapshots.cpp index 794f2d120..cd93df8ad 100644 --- a/tests/dist/scheduler/test_snapshots.cpp +++ b/tests/dist/scheduler/test_snapshots.cpp @@ -32,12 +32,6 @@ TEST_CASE_METHOD(DistTestsFixture, size_t snapSize = 2 * faabric::util::HOST_PAGE_SIZE; auto snap = std::make_shared(snapSize); - // Add a merge region to catch all changes - snap->addMergeRegion(0, - snapSize, - faabric::util::SnapshotDataType::Raw, - faabric::util::SnapshotMergeOperation::Overwrite); - reg.registerSnapshot(snapshotKey, snap); // Set up the message From 5d6b7bbd10b3ce7889bd6d4fb8f71c24bb9a9b1f Mon Sep 17 00:00:00 2001 From: Simon Shillaker Date: Tue, 28 Dec 2021 10:00:27 +0000 Subject: [PATCH 19/24] More snapshot tests --- tests/test/util/test_snapshot.cpp | 43 ++++++++++++++++++++++++++----- 1 file changed, 37 insertions(+), 6 deletions(-) diff --git a/tests/test/util/test_snapshot.cpp b/tests/test/util/test_snapshot.cpp index 28f3945de..3a14ea719 100644 --- a/tests/test/util/test_snapshot.cpp +++ b/tests/test/util/test_snapshot.cpp @@ -1261,15 +1261,16 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, } TEST_CASE_METHOD(SnapshotMergeTestFixture, - "Test overwrite region to end of memory", + "Test merge regions past end of original memory", "[snapshot][util]") { int snapPages = 6; int sharedMemPages = 10; + size_t snapSize = snapPages * HOST_PAGE_SIZE; size_t sharedMemSize = sharedMemPages * HOST_PAGE_SIZE; std::shared_ptr snap = - std::make_shared(snapPages * HOST_PAGE_SIZE); + std::make_shared(snapSize); reg.registerSnapshot(snapKey, snap); // Map the snapshot @@ -1279,13 +1280,43 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, // Make an edit somewhere in the extended memory, outside the original // snapshot - uint32_t diffPageStart = (snapPages + 2) * HOST_PAGE_SIZE; - uint32_t diffOffset = diffPageStart + 100; + uint32_t diffPageStart = 0; + uint32_t diffOffset = 0; + uint32_t mergeRegionStart = snapSize; + + SECTION("Diff at end of original data, overlapping merge region") + { + diffPageStart = snapSize; + diffOffset = diffPageStart + 100; + mergeRegionStart = snapSize; + } + + SECTION("Diff and merge region aligned at end of original data") + { + diffPageStart = snapSize; + diffOffset = diffPageStart; + mergeRegionStart = snapSize; + } + + SECTION("Diff Past end of original data, overlapping merge region") + { + diffPageStart = (snapPages + 2) * HOST_PAGE_SIZE; + diffOffset = diffPageStart + 100; + mergeRegionStart = diffPageStart; + } + + SECTION("Diff and merge region aligned past end of original data") + { + diffPageStart = (snapPages + 2) * HOST_PAGE_SIZE; + diffOffset = diffPageStart; + mergeRegionStart = diffPageStart; + } + std::vector diffData(120, 2); std::memcpy(sharedMem.get() + diffOffset, diffData.data(), diffData.size()); - // Add a merge region from near end of original snapshot upwards - snap->addMergeRegion(snap->getSize() - 120, + // Add a merge region + snap->addMergeRegion(mergeRegionStart, 0, faabric::util::SnapshotDataType::Raw, faabric::util::SnapshotMergeOperation::Overwrite); From 3753ae244629891666d65cd203a4a6dded562a48 Mon Sep 17 00:00:00 2001 From: Simon Shillaker Date: Tue, 28 Dec 2021 12:00:01 +0000 Subject: [PATCH 20/24] Fix uint mismatch --- src/util/snapshot.cpp | 37 ++++++++++++---- tests/test/util/test_snapshot.cpp | 72 ++++++++++++++++++++----------- 2 files changed, 77 insertions(+), 32 deletions(-) diff --git a/src/util/snapshot.cpp b/src/util/snapshot.cpp index d7a0de910..5dc2bf0cf 100644 --- a/src/util/snapshot.cpp +++ b/src/util/snapshot.cpp @@ -542,6 +542,10 @@ void SnapshotMergeRegion::addOverwriteDiff( // If the region is outside the original data, automatically add a diff for // the whole region if (checkStart >= original.size()) { + SPDLOG_TRACE("Single extension {} overwrite diff at {}-{}", + snapshotDataTypeStr(dataType), + checkStart, + checkEnd - checkStart); diffs.emplace_back(dataType, operation, checkStart, @@ -550,13 +554,30 @@ void SnapshotMergeRegion::addOverwriteDiff( } bool diffInProgress = false; - int diffStart = 0; - for (int b = checkStart; b <= checkEnd; b++) { - // If this byte is outside the original region, we can't - // compare (i.e. always dirty) - bool isDirtyByte = (b >= original.size()) || - (*(original.data() + b) != *(updated.data() + b)); + uint32_t diffStart = 0; + for (uint32_t b = checkStart; b <= checkEnd; b++) { + // If this byte is outside the original region, everything from here on + // is dirty, so we can add a single region to go from here to the end + if (b >= original.size()) { + if (!diffInProgress) { + diffStart = b; + } + + uint32_t diffLength = checkEnd - diffStart; + + SPDLOG_TRACE("Extension {} overwrite diff at {}-{}", + snapshotDataTypeStr(dataType), + diffStart, + diffStart + diffLength); + + diffs.emplace_back(dataType, + operation, + diffStart, + updated.subspan(diffStart, diffLength)); + return; + } + bool isDirtyByte = (*(original.data() + b) != *(updated.data() + b)); if (isDirtyByte && !diffInProgress) { // Diff starts here if it's different and diff // not in progress @@ -565,7 +586,7 @@ void SnapshotMergeRegion::addOverwriteDiff( } else if (!isDirtyByte && diffInProgress) { // Diff ends if it's not different and diff is // in progress - int diffLength = b - diffStart; + uint32_t diffLength = b - diffStart; SPDLOG_TRACE("Found {} overwrite diff at {}-{}", snapshotDataTypeStr(dataType), diffStart, @@ -582,7 +603,7 @@ void SnapshotMergeRegion::addOverwriteDiff( // If we've reached the end of this region with a diff // in progress, we need to close it off if (diffInProgress) { - int finalDiffLength = checkEnd - diffStart; + uint32_t finalDiffLength = checkEnd - diffStart; SPDLOG_TRACE("Adding {} {} diff at {}-{} (end of region)", snapshotDataTypeStr(dataType), snapshotMergeOpStr(operation), diff --git a/tests/test/util/test_snapshot.cpp b/tests/test/util/test_snapshot.cpp index 3a14ea719..67382b1bd 100644 --- a/tests/test/util/test_snapshot.cpp +++ b/tests/test/util/test_snapshot.cpp @@ -1,9 +1,9 @@ #include -#include "faabric/snapshot/SnapshotRegistry.h" #include "faabric_utils.h" #include "fixtures.h" +#include #include #include #include @@ -1278,42 +1278,66 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, reg.mapSnapshot(snapKey, sharedMem.get()); faabric::util::resetDirtyTracking(); - // Make an edit somewhere in the extended memory, outside the original - // snapshot - uint32_t diffPageStart = 0; - uint32_t diffOffset = 0; + uint32_t changeStartPage = 0; + uint32_t changeOffset = 0; uint32_t mergeRegionStart = snapSize; + size_t changeLength = 123; - SECTION("Diff at end of original data, overlapping merge region") + uint32_t expectedDiffStart = 0; + uint32_t expectedDiffSize = 0; + + // When memory has changed at or past the end of the original data, the diff + // will start at the end of the original data and round up to the next page + // boundary. If the change starts before the end, it will start at the + // beginning of the change and continue into the page boundary past the + // original data. + + SECTION("Change at end of original data, overlapping merge region") { - diffPageStart = snapSize; - diffOffset = diffPageStart + 100; + changeStartPage = snapSize; + changeOffset = changeStartPage + 100; mergeRegionStart = snapSize; + expectedDiffStart = changeStartPage; + expectedDiffSize = HOST_PAGE_SIZE; } - SECTION("Diff and merge region aligned at end of original data") + SECTION("Change and merge region aligned at end of original data") { - diffPageStart = snapSize; - diffOffset = diffPageStart; + changeStartPage = snapSize; + changeOffset = changeStartPage; mergeRegionStart = snapSize; + expectedDiffStart = changeStartPage; + expectedDiffSize = HOST_PAGE_SIZE; } - SECTION("Diff Past end of original data, overlapping merge region") + SECTION("Change after end of original data, overlapping merge region") { - diffPageStart = (snapPages + 2) * HOST_PAGE_SIZE; - diffOffset = diffPageStart + 100; - mergeRegionStart = diffPageStart; + changeStartPage = (snapPages + 2) * HOST_PAGE_SIZE; + changeOffset = changeStartPage + 100; + mergeRegionStart = changeStartPage; + expectedDiffStart = changeStartPage; + expectedDiffSize = HOST_PAGE_SIZE; } - SECTION("Diff and merge region aligned past end of original data") + SECTION("Merge region and change crossing end of original data") { - diffPageStart = (snapPages + 2) * HOST_PAGE_SIZE; - diffOffset = diffPageStart; - mergeRegionStart = diffPageStart; + // Merge region starts before diff + changeStartPage = (snapPages - 1) * HOST_PAGE_SIZE; + changeOffset = changeStartPage + 100; + mergeRegionStart = (snapPages - 2) * HOST_PAGE_SIZE; + + // Change goes from inside original data to overshoot the end + changeLength = 2 * HOST_PAGE_SIZE; + + // Diff will cover from the start of the change to round up to the + // nearest page in the overshoot region. + expectedDiffStart = changeOffset; + expectedDiffSize = changeLength + (HOST_PAGE_SIZE - 100); } - std::vector diffData(120, 2); - std::memcpy(sharedMem.get() + diffOffset, diffData.data(), diffData.size()); + std::vector diffData(changeLength, 2); + std::memcpy( + sharedMem.get() + changeOffset, diffData.data(), diffData.size()); // Add a merge region snap->addMergeRegion(mergeRegionStart, @@ -1324,12 +1348,12 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, std::vector actualDiffs = MemoryView({ sharedMem.get(), sharedMemSize }).diffWithSnapshot(snap); - // Make sure the whole page containing the diff is included + // Set up expected diff std::vector expectedDiffs = { { faabric::util::SnapshotDataType::Raw, faabric::util::SnapshotMergeOperation::Overwrite, - diffPageStart, - { sharedMem.get() + diffPageStart, (size_t)HOST_PAGE_SIZE } }, + expectedDiffStart, + { sharedMem.get() + expectedDiffStart, expectedDiffSize } }, }; checkDiffs(actualDiffs, expectedDiffs); From 6f2ed5b5aa02ebf331cb53eab6e0122a855cb3f3 Mon Sep 17 00:00:00 2001 From: Simon Shillaker Date: Tue, 28 Dec 2021 12:24:37 +0000 Subject: [PATCH 21/24] Self review --- include/faabric/snapshot/SnapshotClient.h | 4 ++-- include/faabric/util/snapshot.h | 5 ----- src/scheduler/Executor.cpp | 16 +++++++--------- src/scheduler/Scheduler.cpp | 3 +-- src/snapshot/SnapshotClient.cpp | 5 +++-- 5 files changed, 13 insertions(+), 20 deletions(-) diff --git a/include/faabric/snapshot/SnapshotClient.h b/include/faabric/snapshot/SnapshotClient.h index f840a76cb..af81dc2bf 100644 --- a/include/faabric/snapshot/SnapshotClient.h +++ b/include/faabric/snapshot/SnapshotClient.h @@ -40,7 +40,7 @@ class SnapshotClient final : public faabric::transport::MessageEndpointClient void pushSnapshotUpdate( std::string snapshotKey, - std::shared_ptr data, + const std::shared_ptr& data, const std::vector& diffs); void pushSnapshotDiffs( @@ -56,7 +56,7 @@ class SnapshotClient final : public faabric::transport::MessageEndpointClient void doPushSnapshotDiffs( const std::string& snapshotKey, - std::shared_ptr data, + const std::shared_ptr& data, const std::vector& diffs); }; } diff --git a/include/faabric/util/snapshot.h b/include/faabric/util/snapshot.h index 83baf24a5..03ee5df8a 100644 --- a/include/faabric/util/snapshot.h +++ b/include/faabric/util/snapshot.h @@ -87,11 +87,6 @@ class SnapshotMergeRegion std::span original, std::span updated, std::pair dirtyRange); - - void addMergeDiff(std::vector& diffs, - std::span original, - const uint8_t* updated, - std::pair dirtyRange); }; template diff --git a/src/scheduler/Executor.cpp b/src/scheduler/Executor.cpp index 3adcd3f72..1c9c00a03 100644 --- a/src/scheduler/Executor.cpp +++ b/src/scheduler/Executor.cpp @@ -301,11 +301,12 @@ void Executor::threadPoolThread(int threadPoolIdx) // Fill gaps with overwrites snap->fillGapsWithOverwriteRegions(); - // If we're on master, we write the diffs straight to the snapshot - // otherwise we push them to the master. + // Work out the diffs std::vector diffs = funcMemory.diffWithSnapshot(snap); + // On master we queue the diffs locally directly, on a remote host + // we push them back to master if (isMaster) { SPDLOG_DEBUG("Queueing {} diffs for {} to snapshot {} on " "master (group {})", @@ -315,17 +316,14 @@ void Executor::threadPoolThread(int threadPoolIdx) msg.groupid()); snap->queueDiffs(diffs); - - // Reset dirty page tracking on master - faabric::util::resetDirtyTracking(); } else { - // Push diffs back to master sch.pushSnapshotDiffs(msg, diffs); - - // Reset dirty page tracking on non-master - faabric::util::resetDirtyTracking(); } + // Reset dirty page tracking + faabric::util::resetDirtyTracking(); + + // Clear merge regions SPDLOG_DEBUG("Clearing merge regions for {}", msg.snapshotkey()); snap->clearMergeRegions(); } diff --git a/src/scheduler/Scheduler.cpp b/src/scheduler/Scheduler.cpp index be2cfc09c..23a5d30ed 100644 --- a/src/scheduler/Scheduler.cpp +++ b/src/scheduler/Scheduler.cpp @@ -508,8 +508,7 @@ faabric::util::SchedulingDecision Scheduler::doCallFunctions( std::vector snapshotDiffs = snapMemView.getDirtyRegions(); - c.pushSnapshotUpdate( - snapshotKey, std::move(snap), snapshotDiffs); + c.pushSnapshotUpdate(snapshotKey, snap, snapshotDiffs); } else { c.pushSnapshot(snapshotKey, snap); pushedSnapshotsMap[snapshotKey].insert(host); diff --git a/src/snapshot/SnapshotClient.cpp b/src/snapshot/SnapshotClient.cpp index 5ef23b2dd..92fb6126d 100644 --- a/src/snapshot/SnapshotClient.cpp +++ b/src/snapshot/SnapshotClient.cpp @@ -121,7 +121,7 @@ void SnapshotClient::pushSnapshot( void SnapshotClient::pushSnapshotUpdate( std::string snapshotKey, - std::shared_ptr data, + const std::shared_ptr& data, const std::vector& diffs) { SPDLOG_DEBUG("Pushing update to snapshot {} to {} ({} diffs, {} regions)", @@ -147,7 +147,7 @@ void SnapshotClient::pushSnapshotDiffs( void SnapshotClient::doPushSnapshotDiffs( const std::string& snapshotKey, - std::shared_ptr data, + const std::shared_ptr& data, const std::vector& diffs) { if (faabric::util::isMockMode()) { @@ -158,6 +158,7 @@ void SnapshotClient::doPushSnapshotDiffs( // Create objects for all the diffs std::vector> diffsFbVector; + diffsFbVector.reserve(diffs.size()); for (const auto& d : diffs) { std::span diffData = d.getData(); auto dataOffset = From 086359d668746e279688f30644bff1102a78a041 Mon Sep 17 00:00:00 2001 From: Simon Shillaker Date: Tue, 28 Dec 2021 17:29:06 +0000 Subject: [PATCH 22/24] Profiling --- CMakeLists.txt | 6 ++++++ include/faabric/util/snapshot.h | 2 ++ src/runner/FaabricMain.cpp | 4 ++++ src/snapshot/SnapshotRegistry.cpp | 7 +++++++ src/util/snapshot.cpp | 9 +++++++++ tasks/dev.py | 5 ++++- 6 files changed, 32 insertions(+), 1 deletion(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 765ec9a58..cd0cd9e91 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -3,6 +3,7 @@ project(faabric) option(FAABRIC_WASM_BUILD "Build Faabric wasm library" OFF) option(FAABRIC_BUILD_TESTS "Build Faabric tests" ON) +option(FAABRIC_SELF_TRACING "Turn on system tracing using the logger" OFF) # Enable colorized compiler output if("${CMAKE_CXX_COMPILER_ID}" STREQUAL "GNU") @@ -22,6 +23,11 @@ set(CMAKE_EXE_LINKER_FLAGS "-fuse-ld=lld") # Compile comamnds for clang tools set(CMAKE_EXPORT_COMPILE_COMMANDS ON) +if(${FAABRIC_SELF_TRACING}) + message("-- Activated Faabric self-tracing") + add_definitions(-DTRACE_ALL=1) +endif() + # Set-up use of sanitisers if (FAABRIC_USE_SANITISER STREQUAL "Address") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=address") diff --git a/include/faabric/util/snapshot.h b/include/faabric/util/snapshot.h index 03ee5df8a..6e7ec2be4 100644 --- a/include/faabric/util/snapshot.h +++ b/include/faabric/util/snapshot.h @@ -197,6 +197,8 @@ class SnapshotData std::vector getDataCopy(uint32_t offset, size_t dataSize); + void remapToMemory(uint8_t* target); + void mapToMemory(uint8_t* target); void addMergeRegion(uint32_t offset, diff --git a/src/runner/FaabricMain.cpp b/src/runner/FaabricMain.cpp index 6517c53c5..e66152460 100644 --- a/src/runner/FaabricMain.cpp +++ b/src/runner/FaabricMain.cpp @@ -25,6 +25,8 @@ void FaabricMain::startBackground() // Crash handler faabric::util::setUpCrashHandler(); + PROF_BEGIN + // Start basics startRunner(); @@ -39,6 +41,8 @@ void FaabricMain::startBackground() // Work sharing startFunctionCallServer(); + + PROF_SUMMARY } void FaabricMain::startRunner() diff --git a/src/snapshot/SnapshotRegistry.cpp b/src/snapshot/SnapshotRegistry.cpp index bec466f8a..28f3b9925 100644 --- a/src/snapshot/SnapshotRegistry.cpp +++ b/src/snapshot/SnapshotRegistry.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include @@ -11,6 +12,7 @@ namespace faabric::snapshot { std::shared_ptr SnapshotRegistry::getSnapshot( const std::string& key) { + PROF_START(GetSnapshot) faabric::util::SharedLock lock(snapshotsMx); if (key.empty()) { @@ -23,6 +25,7 @@ std::shared_ptr SnapshotRegistry::getSnapshot( throw std::runtime_error("Snapshot doesn't exist"); } + PROF_END(GetSnapshot) return snapshotMap[key]; } @@ -34,17 +37,20 @@ bool SnapshotRegistry::snapshotExists(const std::string& key) void SnapshotRegistry::mapSnapshot(const std::string& key, uint8_t* target) { + PROF_START(MapSnapshot) auto d = getSnapshot(key); d->mapToMemory(target); // Reset dirty tracking otherwise whole mapped region is marked dirty faabric::util::resetDirtyTracking(); + PROF_END(MapSnapshot) } void SnapshotRegistry::registerSnapshot( const std::string& key, std::shared_ptr data) { + PROF_START(RegisterSnapshot) faabric::util::FullLock lock(snapshotsMx); SPDLOG_TRACE("Registering snapshot {} size {}", key, data->getSize()); @@ -53,6 +59,7 @@ void SnapshotRegistry::registerSnapshot( // Reset dirty tracking faabric::util::resetDirtyTracking(); + PROF_END(RegisterSnapshot) } void SnapshotRegistry::deleteSnapshot(const std::string& key) diff --git a/src/util/snapshot.cpp b/src/util/snapshot.cpp index 5dc2bf0cf..9578e709b 100644 --- a/src/util/snapshot.cpp +++ b/src/util/snapshot.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include @@ -239,6 +240,7 @@ void SnapshotData::fillGapsWithOverwriteRegions() void SnapshotData::mapToMemory(uint8_t* target) { + PROF_START(MapToMemory) faabric::util::FullLock lock(snapMx); if (fd <= 0) { @@ -248,6 +250,7 @@ void SnapshotData::mapToMemory(uint8_t* target) } mapMemoryPrivate({ target, size }, fd); + PROF_END(MapToMemory) } std::map SnapshotData::getMergeRegions() @@ -278,6 +281,7 @@ void SnapshotData::queueDiffs(const std::span diffs) void SnapshotData::writeQueuedDiffs() { + PROF_START(WriteQueuedDiffs) faabric::util::FullLock lock(snapMx); SPDLOG_DEBUG("Writing {} queued diffs to snapshot", queuedDiffs.size()); @@ -380,6 +384,7 @@ void SnapshotData::writeQueuedDiffs() // Clear queue queuedDiffs.clear(); + PROF_END(WriteQueuedDiffs) } MemoryView::MemoryView(std::span dataIn) @@ -388,6 +393,7 @@ MemoryView::MemoryView(std::span dataIn) std::vector MemoryView::getDirtyRegions() { + PROF_START(GetDirtyRegions) if (data.empty()) { return {}; } @@ -414,12 +420,14 @@ std::vector MemoryView::getDirtyRegions() data.subspan(regionBegin, regionEnd - regionBegin)); } + PROF_END(GetDirtyRegions) return diffs; } std::vector MemoryView::diffWithSnapshot( std::shared_ptr snap) { + PROF_START(DiffWithSnapshot) std::vector diffs; std::map mergeRegions = snap->getMergeRegions(); @@ -457,6 +465,7 @@ std::vector MemoryView::diffWithSnapshot( } } + PROF_END(DiffWithSnapshot) return diffs; } diff --git a/tasks/dev.py b/tasks/dev.py index 7a8601ef8..3559e1f1b 100644 --- a/tasks/dev.py +++ b/tasks/dev.py @@ -14,7 +14,9 @@ @task -def cmake(ctx, clean=False, shared=False, build="Debug", sanitiser="None"): +def cmake( + ctx, clean=False, shared=False, build="Debug", sanitiser="None", prof=False +): """ Configures the build """ @@ -41,6 +43,7 @@ def cmake(ctx, clean=False, shared=False, build="Debug", sanitiser="None"): "-DCMAKE_CXX_COMPILER=/usr/bin/clang++-13", "-DCMAKE_C_COMPILER=/usr/bin/clang-13", "-DFAABRIC_USE_SANITISER={}".format(sanitiser), + "-DFAABRIC_SELF_TRACING=ON" if prof else "", PROJ_ROOT, ] From 54b440346774c9d883cf282de80094b3db00f660 Mon Sep 17 00:00:00 2001 From: Simon Shillaker Date: Wed, 29 Dec 2021 11:07:53 +0000 Subject: [PATCH 23/24] Use spans --- include/faabric/snapshot/SnapshotRegistry.h | 2 - include/faabric/util/memory.h | 2 + include/faabric/util/snapshot.h | 4 +- src/snapshot/SnapshotRegistry.cpp | 13 ---- src/util/memory.cpp | 5 ++ src/util/snapshot.cpp | 22 ++++--- tests/dist/DistTestExecutor.cpp | 4 +- tests/dist/scheduler/functions.cpp | 2 +- tests/test/scheduler/test_executor.cpp | 4 +- tests/test/snapshot/test_snapshot_diffs.cpp | 13 ++-- .../test/snapshot/test_snapshot_registry.cpp | 12 ++-- tests/test/util/test_memory.cpp | 44 +++++++++++++- tests/test/util/test_snapshot.cpp | 60 +++++++++---------- 13 files changed, 112 insertions(+), 75 deletions(-) diff --git a/include/faabric/snapshot/SnapshotRegistry.h b/include/faabric/snapshot/SnapshotRegistry.h index bf8318d70..901de02b0 100644 --- a/include/faabric/snapshot/SnapshotRegistry.h +++ b/include/faabric/snapshot/SnapshotRegistry.h @@ -20,8 +20,6 @@ class SnapshotRegistry bool snapshotExists(const std::string& key); - void mapSnapshot(const std::string& key, uint8_t* target); - void registerSnapshot(const std::string& key, std::shared_ptr data); diff --git a/include/faabric/util/memory.h b/include/faabric/util/memory.h index 510c7acb5..59000ca1f 100644 --- a/include/faabric/util/memory.h +++ b/include/faabric/util/memory.h @@ -51,6 +51,8 @@ std::vector> getDirtyRegions(const uint8_t* ptr, // ------------------------- typedef std::unique_ptr> MemoryRegion; +MemoryRegion allocatePrivateMemory(size_t size); + MemoryRegion allocateSharedMemory(size_t size); MemoryRegion allocateVirtualMemory(size_t size); diff --git a/include/faabric/util/snapshot.h b/include/faabric/util/snapshot.h index 6e7ec2be4..43b50a5cf 100644 --- a/include/faabric/util/snapshot.h +++ b/include/faabric/util/snapshot.h @@ -197,9 +197,7 @@ class SnapshotData std::vector getDataCopy(uint32_t offset, size_t dataSize); - void remapToMemory(uint8_t* target); - - void mapToMemory(uint8_t* target); + void mapToMemory(std::span target); void addMergeRegion(uint32_t offset, size_t length, diff --git a/src/snapshot/SnapshotRegistry.cpp b/src/snapshot/SnapshotRegistry.cpp index 28f3b9925..50b69990f 100644 --- a/src/snapshot/SnapshotRegistry.cpp +++ b/src/snapshot/SnapshotRegistry.cpp @@ -35,22 +35,10 @@ bool SnapshotRegistry::snapshotExists(const std::string& key) return snapshotMap.find(key) != snapshotMap.end(); } -void SnapshotRegistry::mapSnapshot(const std::string& key, uint8_t* target) -{ - PROF_START(MapSnapshot) - auto d = getSnapshot(key); - d->mapToMemory(target); - - // Reset dirty tracking otherwise whole mapped region is marked dirty - faabric::util::resetDirtyTracking(); - PROF_END(MapSnapshot) -} - void SnapshotRegistry::registerSnapshot( const std::string& key, std::shared_ptr data) { - PROF_START(RegisterSnapshot) faabric::util::FullLock lock(snapshotsMx); SPDLOG_TRACE("Registering snapshot {} size {}", key, data->getSize()); @@ -59,7 +47,6 @@ void SnapshotRegistry::registerSnapshot( // Reset dirty tracking faabric::util::resetDirtyTracking(); - PROF_END(RegisterSnapshot) } void SnapshotRegistry::deleteSnapshot(const std::string& key) diff --git a/src/util/memory.cpp b/src/util/memory.cpp index 35ef062a6..bf2df6ada 100644 --- a/src/util/memory.cpp +++ b/src/util/memory.cpp @@ -198,6 +198,11 @@ MemoryRegion doAlloc(size_t size, int prot, int flags) return mem; } +MemoryRegion allocatePrivateMemory(size_t size) +{ + return doAlloc(size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS); +} + MemoryRegion allocateSharedMemory(size_t size) { return doAlloc(size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS); diff --git a/src/util/snapshot.cpp b/src/util/snapshot.cpp index 9578e709b..06b2ec868 100644 --- a/src/util/snapshot.cpp +++ b/src/util/snapshot.cpp @@ -238,19 +238,23 @@ void SnapshotData::fillGapsWithOverwriteRegions() } } -void SnapshotData::mapToMemory(uint8_t* target) +void SnapshotData::mapToMemory(std::span target) { - PROF_START(MapToMemory) - faabric::util::FullLock lock(snapMx); + // Note we only need a shared lock here as we are not modifying data and the + // OS will handle synchronisation of the mapping itself + PROF_START(MapSnapshot) + faabric::util::SharedLock lock(snapMx); - if (fd <= 0) { - std::string msg = "Attempting to map memory of non-restorable snapshot"; - SPDLOG_ERROR(msg); - throw std::runtime_error(msg); + if (target.size() > size) { + SPDLOG_ERROR("Mapping target memory larger than snapshot ({} > {})", + target.size(), + size); + throw std::runtime_error("Target memory larger than snapshot"); } - mapMemoryPrivate({ target, size }, fd); - PROF_END(MapToMemory) + faabric::util::mapMemoryPrivate(target, fd); + + PROF_END(MapSnapshot) } std::map SnapshotData::getMergeRegions() diff --git a/tests/dist/DistTestExecutor.cpp b/tests/dist/DistTestExecutor.cpp index 12622f49c..c7f337a40 100644 --- a/tests/dist/DistTestExecutor.cpp +++ b/tests/dist/DistTestExecutor.cpp @@ -72,7 +72,7 @@ void DistTestExecutor::restore(faabric::Message& msg) setUpDummyMemory(snap->getSize()); - reg.mapSnapshot(msg.snapshotkey(), dummyMemory.get()); + snap->mapToMemory({ dummyMemory.get(), dummyMemorySize }); } faabric::util::MemoryView DistTestExecutor::getMemoryView() @@ -89,7 +89,7 @@ void DistTestExecutor::setUpDummyMemory(size_t memSize) { if (dummyMemory.get() == nullptr) { SPDLOG_DEBUG("Dist test executor initialising memory size {}", memSize); - dummyMemory = faabric::util::allocateSharedMemory(memSize); + dummyMemory = faabric::util::allocatePrivateMemory(memSize); dummyMemorySize = memSize; } } diff --git a/tests/dist/scheduler/functions.cpp b/tests/dist/scheduler/functions.cpp index c46f85002..23765fd0d 100644 --- a/tests/dist/scheduler/functions.cpp +++ b/tests/dist/scheduler/functions.cpp @@ -323,7 +323,7 @@ int handleReductionFunction(tests::DistTestExecutor* exec, snap->writeQueuedDiffs(); // Remap memory to snapshot - snap->mapToMemory(exec->getDummyMemory().data()); + snap->mapToMemory(exec->getDummyMemory()); uint8_t* reductionAPtr = exec->getDummyMemory().data() + reductionAOffset; diff --git a/tests/test/scheduler/test_executor.cpp b/tests/test/scheduler/test_executor.cpp index 250b5f733..3cda020ec 100644 --- a/tests/test/scheduler/test_executor.cpp +++ b/tests/test/scheduler/test_executor.cpp @@ -49,8 +49,8 @@ void TestExecutor::restore(faabric::Message& msg) auto snap = reg.getSnapshot(msg.snapshotkey()); dummyMemorySize = snap->getSize(); - dummyMemory = faabric::util::allocateSharedMemory(snap->getSize()); - reg.mapSnapshot(msg.snapshotkey(), dummyMemory.get()); + dummyMemory = faabric::util::allocatePrivateMemory(snap->getSize()); + snap->mapToMemory({ dummyMemory.get(), dummyMemorySize }); } faabric::util::MemoryView TestExecutor::getMemoryView() diff --git a/tests/test/snapshot/test_snapshot_diffs.cpp b/tests/test/snapshot/test_snapshot_diffs.cpp index 05ac925f8..066b01ad3 100644 --- a/tests/test/snapshot/test_snapshot_diffs.cpp +++ b/tests/test/snapshot/test_snapshot_diffs.cpp @@ -36,9 +36,13 @@ TEST_CASE_METHOD(SnapshotTestFixture, int sharedMemPages = 8; size_t sharedMemSize = sharedMemPages * HOST_PAGE_SIZE; - MemoryRegion sharedMem = allocateSharedMemory(sharedMemSize); + MemoryRegion sharedMem = allocatePrivateMemory(sharedMemSize); - reg.mapSnapshot(snapKey, sharedMem.get()); + // Check we can write to shared mem + sharedMem[0] = 1; + + // Map to the snapshot + snap->mapToMemory({ sharedMem.get(), snapSize }); // Make various changes sharedMem[0] = 1; @@ -64,11 +68,10 @@ TEST_CASE_METHOD(SnapshotTestFixture, "Test snapshot diffs", "[snapshot]") // Make shared memory larger than original snapshot int sharedMemPages = 8; size_t sharedMemSize = sharedMemPages * HOST_PAGE_SIZE; - MemoryRegion sharedMem = - allocateSharedMemory(sharedMemPages * HOST_PAGE_SIZE); + MemoryRegion sharedMem = allocatePrivateMemory(sharedMemSize); // Map the snapshot to the start of the memory - reg.mapSnapshot(snapKey, sharedMem.get()); + snap->mapToMemory({ sharedMem.get(), snapSize }); // Reset dirty tracking faabric::util::resetDirtyTracking(); diff --git a/tests/test/snapshot/test_snapshot_registry.cpp b/tests/test/snapshot/test_snapshot_registry.cpp index dd3c4ed0e..1b15d2c73 100644 --- a/tests/test/snapshot/test_snapshot_registry.cpp +++ b/tests/test/snapshot/test_snapshot_registry.cpp @@ -67,13 +67,15 @@ TEST_CASE_METHOD(SnapshotTestFixture, REQUIRE(actualC->getDataPtr() == snapC->getDataPtr()); // Create regions onto which we will map the snapshots - MemoryRegion actualDataA = allocateSharedMemory(1 * HOST_PAGE_SIZE); - MemoryRegion actualDataB = allocateSharedMemory(2 * HOST_PAGE_SIZE); - MemoryRegion actualDataC = allocateSharedMemory(3 * HOST_PAGE_SIZE); + size_t sizeA = HOST_PAGE_SIZE; + size_t sizeC = 3 * HOST_PAGE_SIZE; + MemoryRegion actualDataA = allocatePrivateMemory(sizeA); + MemoryRegion actualDataB = allocatePrivateMemory(2 * HOST_PAGE_SIZE); + MemoryRegion actualDataC = allocatePrivateMemory(sizeC); // Map two of them - reg.mapSnapshot(keyA, actualDataA.get()); - reg.mapSnapshot(keyC, actualDataC.get()); + snapA->mapToMemory({ actualDataA.get(), sizeA }); + snapC->mapToMemory({ actualDataC.get(), sizeC }); // Here we need to check the actual data after mapping std::vector vecDataA = snapA->getDataCopy(); diff --git a/tests/test/util/test_memory.cpp b/tests/test/util/test_memory.cpp index d49c3684b..d23ffe9cd 100644 --- a/tests/test/util/test_memory.cpp +++ b/tests/test/util/test_memory.cpp @@ -402,7 +402,7 @@ TEST_CASE("Test mapping memory", "[util]") writeToFd(fd, 0, { vMem.get(), chunk.size() }); // Map some new memory to this fd - MemoryRegion memA = allocateSharedMemory(chunk.size()); + MemoryRegion memA = allocatePrivateMemory(chunk.size()); mapMemoryPrivate({ memA.get(), chunk.size() }, fd); std::vector memAData(memA.get(), memA.get() + chunk.size()); @@ -417,7 +417,7 @@ TEST_CASE("Test mapping memory", "[util]") appendDataToFd(fd, { chunkB.data(), chunkB.size() }); // Map a region to both chunks - MemoryRegion memB = allocateSharedMemory(chunk.size() + chunkB.size()); + MemoryRegion memB = allocatePrivateMemory(chunk.size() + chunkB.size()); mapMemoryPrivate({ memB.get(), chunk.size() + chunkB.size() }, fd); // Check region now contains both bits of data @@ -433,7 +433,7 @@ TEST_CASE("Test mapping memory", "[util]") TEST_CASE("Test mapping memory fails with invalid fd", "[util]") { size_t memSize = 10 * HOST_PAGE_SIZE; - MemoryRegion sharedMem = allocateSharedMemory(memSize); + MemoryRegion sharedMem = allocatePrivateMemory(memSize); int fd = 0; SECTION("Zero fd") { fd = 0; } @@ -442,4 +442,42 @@ TEST_CASE("Test mapping memory fails with invalid fd", "[util]") REQUIRE_THROWS(mapMemoryPrivate({ sharedMem.get(), memSize }, fd)); } + +TEST_CASE("Test remapping memory", "[util]") +{ + // Set up some data + size_t dataSize = 10 * HOST_PAGE_SIZE; + std::vector expectedData(dataSize, 3); + + // Write this to a file descriptor + int fd = createFd(expectedData.size(), "foobar"); + writeToFd(fd, 0, { expectedData.data(), expectedData.size() }); + + // Map some new memory to this fd + MemoryRegion mappedMem = allocatePrivateMemory(dataSize); + mapMemoryPrivate({ mappedMem.get(), dataSize }, fd); + + std::vector actualData(mappedMem.get(), + mappedMem.get() + dataSize); + REQUIRE(actualData == expectedData); + + // Modify the memory + std::vector update(100, 4); + size_t updateOffset = HOST_PAGE_SIZE + 10; + std::memcpy(mappedMem.get() + updateOffset, update.data(), update.size()); + + // Spot check to make sure update has been made + REQUIRE(*(mappedMem.get() + (updateOffset + 5)) == (uint8_t)4); + + // Remap + mapMemoryPrivate({ mappedMem.get(), dataSize }, fd); + + // Spot check to make sure update has been removed + REQUIRE(*(mappedMem.get() + (updateOffset + 5)) == (uint8_t)3); + + // Check all data + std::vector actualDataAfter(mappedMem.get(), + mappedMem.get() + dataSize); + REQUIRE(actualDataAfter == expectedData); +} } diff --git a/tests/test/util/test_snapshot.cpp b/tests/test/util/test_snapshot.cpp index 67382b1bd..837d0606b 100644 --- a/tests/test/util/test_snapshot.cpp +++ b/tests/test/util/test_snapshot.cpp @@ -62,7 +62,7 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, std::vector dataA(100, 1); std::vector dataB(2 * HOST_PAGE_SIZE, 2); - MemoryRegion memB = allocateSharedMemory(dataB.size()); + MemoryRegion memB = allocatePrivateMemory(dataB.size()); std::memcpy(memB.get(), dataB.data(), dataB.size()); std::vector dataC(sizeof(int), 3); @@ -141,8 +141,8 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, // Set up shared memory int sharedMemPages = 20; - MemoryRegion sharedMem = - allocateSharedMemory(sharedMemPages * HOST_PAGE_SIZE); + size_t sharedMemSize = sharedMemPages * HOST_PAGE_SIZE; + MemoryRegion sharedMem = allocatePrivateMemory(sharedMemSize); // Check it's zeroed std::vector expectedInitial(snap->getSize(), 0); @@ -151,7 +151,7 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, REQUIRE(actualSharedMemBefore == expectedInitial); // Map the snapshot and check again - snap->mapToMemory(sharedMem.get()); + snap->mapToMemory({ sharedMem.get(), snap->getSize() }); std::vector actualSharedMemAfter( sharedMem.get(), sharedMem.get() + snap->getSize()); REQUIRE(actualSharedMemAfter == actualSnapMem); @@ -179,12 +179,12 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, std::vector expectedSnapMem = snap->getDataCopy(); // Set up two shared mem regions - MemoryRegion sharedMemA = allocateSharedMemory(snapSize); - MemoryRegion sharedMemB = allocateSharedMemory(snapSize); + MemoryRegion sharedMemA = allocatePrivateMemory(snapSize); + MemoryRegion sharedMemB = allocatePrivateMemory(snapSize); // Map the snapshot and both regions reflect the change - snap->mapToMemory(sharedMemA.get()); - snap->mapToMemory(sharedMemB.get()); + snap->mapToMemory({ sharedMemA.get(), snapSize }); + snap->mapToMemory({ sharedMemB.get(), snapSize }); REQUIRE(std::vector(sharedMemA.get(), sharedMemA.get() + snapSize) == expectedSnapMem); @@ -272,8 +272,8 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, REQUIRE(originalData.size() == originalSize); // Map to some other region of memory large enough for the extended version - MemoryRegion sharedMem = allocateSharedMemory(expandedSize); - snap->mapToMemory(sharedMem.get()); + MemoryRegion sharedMem = allocatePrivateMemory(expandedSize); + snap->mapToMemory({ sharedMem.get(), originalSize }); // Add some data to the extended region. Check the snapshot extends to fit std::vector dataC(300, 5); @@ -290,7 +290,7 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, REQUIRE(snap->getSize() == expectedSizeB); // Remap to shared memory - snap->mapToMemory(sharedMem.get()); + snap->mapToMemory({ sharedMem.get(), snap->getSize() }); // Check mapped region matches std::vector actualData = snap->getDataCopy(); @@ -331,8 +331,8 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, reg.registerSnapshot(snapKey, snap); // Map the snapshot to some memory - MemoryRegion sharedMem = allocateSharedMemory(memSize); - reg.mapSnapshot(snapKey, sharedMem.get()); + MemoryRegion sharedMem = allocatePrivateMemory(memSize); + snap->mapToMemory({ sharedMem.get(), memSize }); // Check mapping works int* intA = (int*)(sharedMem.get() + intAOffset); @@ -449,9 +449,9 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, // Map the snapshot to some memory size_t sharedMemSize = snapPages * HOST_PAGE_SIZE; - MemoryRegion sharedMem = allocateSharedMemory(snapPages * HOST_PAGE_SIZE); + MemoryRegion sharedMem = allocatePrivateMemory(sharedMemSize); - reg.mapSnapshot(snapKey, sharedMem.get()); + snap->mapToMemory({ sharedMem.get(), sharedMemSize }); // Reset dirty tracking faabric::util::resetDirtyTracking(); @@ -839,9 +839,9 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, // Map the snapshot to some memory size_t sharedMemSize = snapPages * HOST_PAGE_SIZE; - MemoryRegion sharedMem = allocateSharedMemory(snapPages * HOST_PAGE_SIZE); + MemoryRegion sharedMem = allocatePrivateMemory(sharedMemSize); - reg.mapSnapshot(snapKey, sharedMem.get()); + snap->mapToMemory({ sharedMem.get(), sharedMemSize }); // Reset dirty tracking faabric::util::resetDirtyTracking(); @@ -913,8 +913,8 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, // Map the snapshot size_t sharedMemSize = snapPages * HOST_PAGE_SIZE; - MemoryRegion sharedMem = allocateSharedMemory(snapPages * HOST_PAGE_SIZE); - reg.mapSnapshot(snapKey, sharedMem.get()); + MemoryRegion sharedMem = allocatePrivateMemory(sharedMemSize); + snap->mapToMemory({ sharedMem.get(), sharedMemSize }); // Reset dirty tracking faabric::util::resetDirtyTracking(); @@ -1003,8 +1003,8 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, reg.registerSnapshot(snapKey, snap); // Map the snapshot - MemoryRegion sharedMem = allocateSharedMemory(sharedMemSize); - reg.mapSnapshot(snapKey, sharedMem.get()); + MemoryRegion sharedMem = allocatePrivateMemory(sharedMemSize); + snap->mapToMemory({ sharedMem.get(), sharedMemSize }); // Reset dirty tracking faabric::util::resetDirtyTracking(); @@ -1194,8 +1194,8 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, reg.registerSnapshot(snapKey, snap); // Map the snapshot - MemoryRegion sharedMem = allocateSharedMemory(sharedMemSize); - reg.mapSnapshot(snapKey, sharedMem.get()); + MemoryRegion sharedMem = allocatePrivateMemory(sharedMemSize); + snap->mapToMemory({ sharedMem.get(), sharedMemSize }); // Reset dirty tracking faabric::util::resetDirtyTracking(); @@ -1274,8 +1274,8 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, reg.registerSnapshot(snapKey, snap); // Map the snapshot - MemoryRegion sharedMem = allocateSharedMemory(sharedMemSize); - reg.mapSnapshot(snapKey, sharedMem.get()); + MemoryRegion sharedMem = allocatePrivateMemory(sharedMemSize); + snap->mapToMemory({ sharedMem.get(), snapSize }); faabric::util::resetDirtyTracking(); uint32_t changeStartPage = 0; @@ -1450,8 +1450,8 @@ TEST_CASE("Test snapshot mapped memory diffs", "[snapshot][util]") snap->copyInData(dataA); // Map some memory - MemoryRegion memA = allocateSharedMemory(snapSize); - snap->mapToMemory(memA.get()); + MemoryRegion memA = allocatePrivateMemory(snapSize); + snap->mapToMemory({ memA.get(), snapSize }); faabric::util::resetDirtyTracking(); @@ -1501,8 +1501,8 @@ TEST_CASE("Test snapshot mapped memory diffs", "[snapshot][util]") REQUIRE(dirtyRegionData == expectedDirtyRegionData); // Map more memory from the snapshot, check it contains all updates - MemoryRegion memB = allocateSharedMemory(snapSize); - snap->mapToMemory(memB.get()); + MemoryRegion memB = allocatePrivateMemory(snapSize); + snap->mapToMemory({ memB.get(), snapSize }); std::vector expectedFinal(snapSize, 0); std::memcpy(expectedFinal.data() + offsetA, dataA.data(), dataA.size()); std::memcpy(expectedFinal.data() + offsetB, dataB.data(), dataB.size()); @@ -1512,7 +1512,7 @@ TEST_CASE("Test snapshot mapped memory diffs", "[snapshot][util]") REQUIRE(actualMemB == expectedFinal); // Remap first memory and check this also contains all updates - snap->mapToMemory(memA.get()); + snap->mapToMemory({ memA.get(), snapSize }); std::vector remappedMemA(memB.get(), memB.get() + snapSize); REQUIRE(remappedMemA == expectedFinal); } From 58ca527c42292c2c3e261bf4523fc1ce02145e53 Mon Sep 17 00:00:00 2001 From: Simon Shillaker Date: Wed, 29 Dec 2021 11:34:36 +0000 Subject: [PATCH 24/24] Use static initialised for resetting dirty pages --- src/util/memory.cpp | 56 ++++++++++++++++++++++++++++++------------- src/util/snapshot.cpp | 3 +++ 2 files changed, 43 insertions(+), 16 deletions(-) diff --git a/src/util/memory.cpp b/src/util/memory.cpp index bf2df6ada..bbcd6b1c9 100644 --- a/src/util/memory.cpp +++ b/src/util/memory.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include @@ -80,27 +81,50 @@ AlignedChunk getPageAlignedChunk(long offset, long length) // Dirty page tracking // ------------------------- -void resetDirtyTracking() +class ClearRefsWrapper { - SPDLOG_DEBUG("Resetting dirty tracking"); - - FILE* fd = fopen(CLEAR_REFS, "w"); - if (fd == nullptr) { - SPDLOG_ERROR("Could not open clear_refs ({})", strerror(errno)); - throw std::runtime_error("Could not open clear_refs"); + public: + ClearRefsWrapper() + { + f = ::fopen(CLEAR_REFS, "w"); + if (f == nullptr) { + SPDLOG_ERROR("Could not open clear_refs ({})", strerror(errno)); + throw std::runtime_error("Could not open clear_refs"); + } } - // Write 4 to the file to track from now on - // https://www.kernel.org/doc/html/v5.4/admin-guide/mm/soft-dirty.html - char value[] = "4"; - size_t nWritten = fwrite(value, sizeof(char), 1, fd); - if (nWritten != 1) { - SPDLOG_ERROR("Failed to write to clear_refs ({})", nWritten); - fclose(fd); - throw std::runtime_error("Failed to write to clear_refs"); + ~ClearRefsWrapper() { ::fclose(f); } + + void reset() + { + // Write 4 to the file to track from now on + // https://www.kernel.org/doc/html/v5.4/admin-guide/mm/soft-dirty.html + char value[] = "4"; + size_t nWritten = ::fwrite(value, sizeof(char), 1, f); + + if (nWritten != 1) { + SPDLOG_ERROR("Failed to write to clear_refs ({})", nWritten); + ::fclose(f); + throw std::runtime_error("Failed to write to clear_refs"); + } + + ::rewind(f); } - fclose(fd); + private: + FILE* f = nullptr; +}; + +void resetDirtyTracking() +{ + static ClearRefsWrapper wrap; + + PROF_START(ResetDirty) + + SPDLOG_DEBUG("Resetting dirty tracking"); + wrap.reset(); + + PROF_END(ResetDirty) } std::vector readPagemapEntries(uintptr_t ptr, int nEntries) diff --git a/src/util/snapshot.cpp b/src/util/snapshot.cpp index 06b2ec868..8b8bf378e 100644 --- a/src/util/snapshot.cpp +++ b/src/util/snapshot.cpp @@ -254,6 +254,9 @@ void SnapshotData::mapToMemory(std::span target) faabric::util::mapMemoryPrivate(target, fd); + // Reset dirty tracking otherwise whole mapped region is marked dirty + faabric::util::resetDirtyTracking(); + PROF_END(MapSnapshot) }