diff --git a/CMakeLists.txt b/CMakeLists.txt index 765ec9a58..cd0cd9e91 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -3,6 +3,7 @@ project(faabric) option(FAABRIC_WASM_BUILD "Build Faabric wasm library" OFF) option(FAABRIC_BUILD_TESTS "Build Faabric tests" ON) +option(FAABRIC_SELF_TRACING "Turn on system tracing using the logger" OFF) # Enable colorized compiler output if("${CMAKE_CXX_COMPILER_ID}" STREQUAL "GNU") @@ -22,6 +23,11 @@ set(CMAKE_EXE_LINKER_FLAGS "-fuse-ld=lld") # Compile comamnds for clang tools set(CMAKE_EXPORT_COMPILE_COMMANDS ON) +if(${FAABRIC_SELF_TRACING}) + message("-- Activated Faabric self-tracing") + add_definitions(-DTRACE_ALL=1) +endif() + # Set-up use of sanitisers if (FAABRIC_USE_SANITISER STREQUAL "Address") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=address") diff --git a/include/faabric/snapshot/SnapshotClient.h b/include/faabric/snapshot/SnapshotClient.h index 5c88b7cee..af81dc2bf 100644 --- a/include/faabric/snapshot/SnapshotClient.h +++ b/include/faabric/snapshot/SnapshotClient.h @@ -38,9 +38,13 @@ class SnapshotClient final : public faabric::transport::MessageEndpointClient void pushSnapshot(const std::string& key, std::shared_ptr data); + void pushSnapshotUpdate( + std::string snapshotKey, + const std::shared_ptr& data, + const std::vector& diffs); + void pushSnapshotDiffs( std::string snapshotKey, - bool force, const std::vector& diffs); void deleteSnapshot(const std::string& key); @@ -49,5 +53,10 @@ class SnapshotClient final : public faabric::transport::MessageEndpointClient private: void sendHeader(faabric::snapshot::SnapshotCalls call); + + void doPushSnapshotDiffs( + const std::string& snapshotKey, + const std::shared_ptr& data, + const std::vector& diffs); }; } diff --git a/include/faabric/snapshot/SnapshotRegistry.h b/include/faabric/snapshot/SnapshotRegistry.h index 7d6952a57..901de02b0 100644 --- a/include/faabric/snapshot/SnapshotRegistry.h +++ b/include/faabric/snapshot/SnapshotRegistry.h @@ -20,15 +20,9 
@@ class SnapshotRegistry bool snapshotExists(const std::string& key); - void mapSnapshot(const std::string& key, uint8_t* target); - void registerSnapshot(const std::string& key, std::shared_ptr data); - void registerSnapshotIfNotExists( - const std::string& key, - std::shared_ptr data); - void deleteSnapshot(const std::string& key); size_t getSnapshotCount(); @@ -44,10 +38,6 @@ class SnapshotRegistry int writeSnapshotToFd(const std::string& key, faabric::util::SnapshotData& data); - - void doRegisterSnapshot(const std::string& key, - std::shared_ptr data, - bool overwrite); }; SnapshotRegistry& getSnapshotRegistry(); diff --git a/include/faabric/util/memory.h b/include/faabric/util/memory.h index 510c7acb5..59000ca1f 100644 --- a/include/faabric/util/memory.h +++ b/include/faabric/util/memory.h @@ -51,6 +51,8 @@ std::vector> getDirtyRegions(const uint8_t* ptr, // ------------------------- typedef std::unique_ptr> MemoryRegion; +MemoryRegion allocatePrivateMemory(size_t size); + MemoryRegion allocateSharedMemory(size_t size); MemoryRegion allocateVirtualMemory(size_t size); diff --git a/include/faabric/util/snapshot.h b/include/faabric/util/snapshot.h index 7cd6e9130..43b50a5cf 100644 --- a/include/faabric/util/snapshot.h +++ b/include/faabric/util/snapshot.h @@ -17,7 +17,11 @@ namespace faabric::util { enum SnapshotDataType { Raw, - Int + Bool, + Int, + Long, + Float, + Double }; enum SnapshotMergeOperation @@ -27,7 +31,8 @@ enum SnapshotMergeOperation Product, Subtract, Max, - Min + Min, + Ignore }; class SnapshotDiff @@ -65,14 +70,106 @@ class SnapshotMergeRegion SnapshotDataType dataType = SnapshotDataType::Raw; SnapshotMergeOperation operation = SnapshotMergeOperation::Overwrite; + SnapshotMergeRegion() = default; + + SnapshotMergeRegion(uint32_t offsetIn, + size_t lengthIn, + SnapshotDataType dataTypeIn, + SnapshotMergeOperation operationIn); + void addDiffs(std::vector& diffs, - const uint8_t* original, - uint32_t originalSize, - const uint8_t* updated, - 
uint32_t dirtyRegionStart, - uint32_t dirtyRegionEnd); + std::span originalData, + std::span updatedData, + std::pair dirtyRange); + + private: + void addOverwriteDiff(std::vector& diffs, + std::span original, + std::span updated, + std::pair dirtyRange); }; +template +inline bool calculateDiffValue(const uint8_t* original, + uint8_t* updated, + SnapshotMergeOperation operation) +{ + // Cast to value + T updatedValue = unalignedRead(updated); + T originalValue = unalignedRead(original); + + // Skip if no change + if (originalValue == updatedValue) { + return false; + } + + // Work out final result + switch (operation) { + case (SnapshotMergeOperation::Sum): { + // Sums must send the value to be _added_, and + // not the final result + updatedValue -= originalValue; + break; + } + case (SnapshotMergeOperation::Subtract): { + // Subtractions must send the value to be + // subtracted, not the result + updatedValue = originalValue - updatedValue; + break; + } + case (SnapshotMergeOperation::Product): { + // Products must send the value to be + // multiplied, not the result + updatedValue /= originalValue; + break; + } + case (SnapshotMergeOperation::Max): + case (SnapshotMergeOperation::Min): + // Min and max don't need to change + break; + default: { + SPDLOG_ERROR("Can't calculate diff for operation: {}", operation); + throw std::runtime_error("Can't calculate diff"); + } + } + + unalignedWrite(updatedValue, updated); + + return true; +} + +template +inline T applyDiffValue(const uint8_t* original, + const uint8_t* diff, + SnapshotMergeOperation operation) +{ + + auto diffValue = unalignedRead(diff); + T originalValue = unalignedRead(original); + + switch (operation) { + case (SnapshotMergeOperation::Sum): { + return diffValue + originalValue; + } + case (SnapshotMergeOperation::Subtract): { + return originalValue - diffValue; + } + case (SnapshotMergeOperation::Product): { + return originalValue * diffValue; + } + case (SnapshotMergeOperation::Max): { + return 
std::max(originalValue, diffValue); + } + case (SnapshotMergeOperation::Min): { + return std::min(originalValue, diffValue); + } + default: { + SPDLOG_ERROR("Can't apply merge operation: {}", operation); + throw std::runtime_error("Can't apply merge operation"); + } + } +} + class SnapshotData { public: @@ -100,7 +197,7 @@ class SnapshotData std::vector getDataCopy(uint32_t offset, size_t dataSize); - void mapToMemory(uint8_t* target); + void mapToMemory(std::span target); void addMergeRegion(uint32_t offset, size_t length, @@ -108,6 +205,8 @@ class SnapshotData SnapshotMergeOperation operation, bool overwrite = false); + void fillGapsWithOverwriteRegions(); + void clearMergeRegions(); std::map getMergeRegions(); diff --git a/src/flat/faabric.fbs b/src/flat/faabric.fbs index 1f7a7ef5e..938d2160e 100644 --- a/src/flat/faabric.fbs +++ b/src/flat/faabric.fbs @@ -1,24 +1,33 @@ +table SnapshotMergeRegionRequest { + offset:int; + length:ulong; + data_type:int; + merge_op:int; +} + table SnapshotPushRequest { key:string; - maxSize:ulong; + max_size:ulong; contents:[ubyte]; + merge_regions:[SnapshotMergeRegionRequest]; } table SnapshotDeleteRequest { key:string; } -table SnapshotDiffChunk { +table SnapshotDiffRequest { offset:int; - dataType:int; - mergeOp:int; + data_type:int; + merge_op:int; data:[ubyte]; } table SnapshotDiffPushRequest { key:string; force:bool; - chunks:[SnapshotDiffChunk]; + merge_regions:[SnapshotMergeRegionRequest]; + diffs:[SnapshotDiffRequest]; } table ThreadResultRequest { diff --git a/src/runner/FaabricMain.cpp b/src/runner/FaabricMain.cpp index 6517c53c5..e66152460 100644 --- a/src/runner/FaabricMain.cpp +++ b/src/runner/FaabricMain.cpp @@ -25,6 +25,8 @@ void FaabricMain::startBackground() // Crash handler faabric::util::setUpCrashHandler(); + PROF_BEGIN + // Start basics startRunner(); @@ -39,6 +41,8 @@ void FaabricMain::startBackground() // Work sharing startFunctionCallServer(); + + PROF_SUMMARY } void FaabricMain::startRunner() diff --git 
a/src/scheduler/Executor.cpp b/src/scheduler/Executor.cpp index dbe5767d8..1c9c00a03 100644 --- a/src/scheduler/Executor.cpp +++ b/src/scheduler/Executor.cpp @@ -298,11 +298,15 @@ void Executor::threadPoolThread(int threadPoolIdx) SPDLOG_TRACE("Diffing memory with pre-execution snapshot for {}", msg.snapshotkey()); - // If we're on master, we write the diffs straight to the snapshot - // otherwise we push them to the master. + // Fill gaps with overwrites + snap->fillGapsWithOverwriteRegions(); + + // Work out the diffs std::vector diffs = funcMemory.diffWithSnapshot(snap); + // On master we queue the diffs locally directly, on a remote host + // we push them back to master if (isMaster) { SPDLOG_DEBUG("Queueing {} diffs for {} to snapshot {} on " "master (group {})", @@ -314,11 +318,12 @@ void Executor::threadPoolThread(int threadPoolIdx) snap->queueDiffs(diffs); } else { sch.pushSnapshotDiffs(msg, diffs); - - // Reset dirty page tracking on non-master - faabric::util::resetDirtyTracking(); } + // Reset dirty page tracking + faabric::util::resetDirtyTracking(); + + // Clear merge regions SPDLOG_DEBUG("Clearing merge regions for {}", msg.snapshotkey()); snap->clearMergeRegions(); } diff --git a/src/scheduler/Scheduler.cpp b/src/scheduler/Scheduler.cpp index 403395e95..23a5d30ed 100644 --- a/src/scheduler/Scheduler.cpp +++ b/src/scheduler/Scheduler.cpp @@ -508,7 +508,7 @@ faabric::util::SchedulingDecision Scheduler::doCallFunctions( std::vector snapshotDiffs = snapMemView.getDirtyRegions(); - c.pushSnapshotDiffs(snapshotKey, true, snapshotDiffs); + c.pushSnapshotUpdate(snapshotKey, snap, snapshotDiffs); } else { c.pushSnapshot(snapshotKey, snap); pushedSnapshotsMap[snapshotKey].insert(host); @@ -916,7 +916,7 @@ void Scheduler::pushSnapshotDiffs( } SnapshotClient& c = getSnapshotClient(msg.masterhost()); - c.pushSnapshotDiffs(snapKey, false, diffs); + c.pushSnapshotDiffs(snapKey, diffs); } void Scheduler::setThreadResultLocally(uint32_t msgId, int32_t returnValue) 
diff --git a/src/snapshot/SnapshotClient.cpp b/src/snapshot/SnapshotClient.cpp index dd8108c4d..92fb6126d 100644 --- a/src/snapshot/SnapshotClient.cpp +++ b/src/snapshot/SnapshotClient.cpp @@ -93,11 +93,25 @@ void SnapshotClient::pushSnapshot( // Set up the main request // TODO - avoid copying data here? flatbuffers::FlatBufferBuilder mb; + + std::vector> + mrsFbVector; + mrsFbVector.reserve(data->getMergeRegions().size()); + for (const auto& m : data->getMergeRegions()) { + auto mr = CreateSnapshotMergeRegionRequest(mb, + m.second.offset, + m.second.length, + m.second.dataType, + m.second.operation); + mrsFbVector.push_back(mr); + } + auto keyOffset = mb.CreateString(key); auto dataOffset = mb.CreateVector(data->getDataPtr(), data->getSize()); + auto mrsOffset = mb.CreateVector(mrsFbVector); auto requestOffset = CreateSnapshotPushRequest( - mb, keyOffset, data->getMaxSize(), dataOffset); + mb, keyOffset, data->getMaxSize(), dataOffset, mrsOffset); mb.Finish(requestOffset); // Send it @@ -105,39 +119,84 @@ void SnapshotClient::pushSnapshot( } } +void SnapshotClient::pushSnapshotUpdate( + std::string snapshotKey, + const std::shared_ptr& data, + const std::vector& diffs) +{ + SPDLOG_DEBUG("Pushing update to snapshot {} to {} ({} diffs, {} regions)", + snapshotKey, + host, + diffs.size(), + data->getMergeRegions().size()); + + doPushSnapshotDiffs(snapshotKey, data, diffs); +} + void SnapshotClient::pushSnapshotDiffs( std::string snapshotKey, - bool force, + const std::vector& diffs) +{ + SPDLOG_DEBUG("Pushing {} diffs for snapshot {} to {}", + diffs.size(), + snapshotKey, + host); + + doPushSnapshotDiffs(snapshotKey, nullptr, diffs); +} + +void SnapshotClient::doPushSnapshotDiffs( + const std::string& snapshotKey, + const std::shared_ptr& data, const std::vector& diffs) { if (faabric::util::isMockMode()) { faabric::util::UniqueLock lock(mockMutex); snapshotDiffPushes.emplace_back(host, diffs); } else { - SPDLOG_DEBUG("Pushing {} diffs for snapshot {} to {}", - 
diffs.size(), - snapshotKey, - host); - flatbuffers::FlatBufferBuilder mb; - // Create objects for all the chunks - std::vector> diffsFbVector; + // Create objects for all the diffs + std::vector> diffsFbVector; + diffsFbVector.reserve(diffs.size()); for (const auto& d : diffs) { std::span diffData = d.getData(); auto dataOffset = mb.CreateVector(diffData.data(), diffData.size()); - auto chunk = CreateSnapshotDiffChunk( + auto diff = CreateSnapshotDiffRequest( mb, d.getOffset(), d.getDataType(), d.getOperation(), dataOffset); - diffsFbVector.push_back(chunk); + diffsFbVector.push_back(diff); + } + + // If we have snapshot data, we need to include the merge regions and + // force too. + std::vector> + mrsFbVector; + bool force = false; + if (data != nullptr) { + mrsFbVector.reserve(data->getMergeRegions().size()); + for (const auto& m : data->getMergeRegions()) { + auto mr = CreateSnapshotMergeRegionRequest(mb, + m.second.offset, + m.second.length, + m.second.dataType, + m.second.operation); + mrsFbVector.push_back(mr); + } + + force = true; + } else { + force = false; } - // Set up the request auto keyOffset = mb.CreateString(snapshotKey); auto diffsOffset = mb.CreateVector(diffsFbVector); - auto requestOffset = - CreateSnapshotDiffPushRequest(mb, keyOffset, force, diffsOffset); + auto mrsOffset = mb.CreateVector(mrsFbVector); + + auto requestOffset = CreateSnapshotDiffPushRequest( + mb, keyOffset, force, mrsOffset, diffsOffset); + mb.Finish(requestOffset); SEND_FB_MSG(SnapshotCalls::PushSnapshotDiffs, mb); diff --git a/src/snapshot/SnapshotRegistry.cpp b/src/snapshot/SnapshotRegistry.cpp index 9175cdcdb..50b69990f 100644 --- a/src/snapshot/SnapshotRegistry.cpp +++ b/src/snapshot/SnapshotRegistry.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include @@ -11,6 +12,7 @@ namespace faabric::snapshot { std::shared_ptr SnapshotRegistry::getSnapshot( const std::string& key) { + PROF_START(GetSnapshot) faabric::util::SharedLock lock(snapshotsMx); if 
(key.empty()) { @@ -23,6 +25,7 @@ std::shared_ptr SnapshotRegistry::getSnapshot( throw std::runtime_error("Snapshot doesn't exist"); } + PROF_END(GetSnapshot) return snapshotMap[key]; } @@ -32,41 +35,18 @@ bool SnapshotRegistry::snapshotExists(const std::string& key) return snapshotMap.find(key) != snapshotMap.end(); } -void SnapshotRegistry::mapSnapshot(const std::string& key, uint8_t* target) -{ - auto d = getSnapshot(key); - d->mapToMemory(target); -} - -void SnapshotRegistry::registerSnapshotIfNotExists( - const std::string& key, - std::shared_ptr data) -{ - doRegisterSnapshot(key, std::move(data), false); -} - void SnapshotRegistry::registerSnapshot( const std::string& key, std::shared_ptr data) -{ - doRegisterSnapshot(key, std::move(data), true); -} - -void SnapshotRegistry::doRegisterSnapshot( - const std::string& key, - std::shared_ptr data, - bool overwrite) { faabric::util::FullLock lock(snapshotsMx); - if (!overwrite && (snapshotMap.find(key) != snapshotMap.end())) { - SPDLOG_TRACE("Skipping already existing snapshot {}", key); - return; - } - SPDLOG_TRACE("Registering snapshot {} size {}", key, data->getSize()); snapshotMap.insert_or_assign(key, std::move(data)); + + // Reset dirty tracking + faabric::util::resetDirtyTracking(); } void SnapshotRegistry::deleteSnapshot(const std::string& key) diff --git a/src/snapshot/SnapshotServer.cpp b/src/snapshot/SnapshotServer.cpp index 21a7e529c..d53392f09 100644 --- a/src/snapshot/SnapshotServer.cpp +++ b/src/snapshot/SnapshotServer.cpp @@ -76,7 +76,7 @@ std::unique_ptr SnapshotServer::recvPushSnapshot( SPDLOG_DEBUG("Receiving snapshot {} (size {}, max {})", r->key()->c_str(), r->contents()->size(), - r->maxSize()); + r->max_size()); faabric::snapshot::SnapshotRegistry& reg = faabric::snapshot::getSnapshotRegistry(); @@ -84,11 +84,20 @@ std::unique_ptr SnapshotServer::recvPushSnapshot( // Set up the snapshot size_t snapSize = r->contents()->size(); std::string snapKey = r->key()->str(); - auto d = 
std::make_shared( - std::span((uint8_t*)r->contents()->Data(), snapSize), r->maxSize()); + auto snap = std::make_shared( + std::span((uint8_t*)r->contents()->Data(), snapSize), r->max_size()); + + // Add the merge regions + for (const auto* mr : *r->merge_regions()) { + snap->addMergeRegion( + mr->offset(), + mr->length(), + static_cast(mr->data_type()), + static_cast(mr->merge_op())); + } // Register snapshot - reg.registerSnapshot(snapKey, d); + reg.registerSnapshot(snapKey, snap); // Send response return std::make_unique(); @@ -114,31 +123,43 @@ SnapshotServer::recvPushSnapshotDiffs(const uint8_t* buffer, size_t bufferSize) flatbuffers::GetRoot(buffer); SPDLOG_DEBUG( - "Applying {} diffs to snapshot {}", r->chunks()->size(), r->key()->str()); + "Applying {} diffs to snapshot {}", r->diffs()->size(), r->key()->str()); // Get the snapshot faabric::snapshot::SnapshotRegistry& reg = faabric::snapshot::getSnapshotRegistry(); auto snap = reg.getSnapshot(r->key()->str()); - // Convert chunks to snapshot diff objects + // Convert diffs to snapshot diff objects std::vector diffs; - diffs.reserve(r->chunks()->size()); - for (const auto* chunk : *r->chunks()) { + diffs.reserve(r->diffs()->size()); + for (const auto* diff : *r->diffs()) { diffs.emplace_back( - static_cast(chunk->dataType()), - static_cast(chunk->mergeOp()), - chunk->offset(), - std::span(chunk->data()->data(), - chunk->data()->size())); + static_cast(diff->data_type()), + static_cast(diff->merge_op()), + diff->offset(), + std::span(diff->data()->data(), diff->data()->size())); } // Queue on the snapshot snap->queueDiffs(diffs); - // Write if necessary + // Write diffs and set merge regions if necessary if (r->force()) { + // Write queued diffs snap->writeQueuedDiffs(); + + // Clear merge regions + snap->clearMergeRegions(); + + // Add merge regions from request + for (const auto* mr : *r->merge_regions()) { + snap->addMergeRegion( + mr->offset(), + mr->length(), + static_cast(mr->data_type()), + 
static_cast(mr->merge_op())); + } } // Send response diff --git a/src/util/memory.cpp b/src/util/memory.cpp index 35ef062a6..bbcd6b1c9 100644 --- a/src/util/memory.cpp +++ b/src/util/memory.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include @@ -80,27 +81,50 @@ AlignedChunk getPageAlignedChunk(long offset, long length) // Dirty page tracking // ------------------------- -void resetDirtyTracking() +class ClearRefsWrapper { - SPDLOG_DEBUG("Resetting dirty tracking"); - - FILE* fd = fopen(CLEAR_REFS, "w"); - if (fd == nullptr) { - SPDLOG_ERROR("Could not open clear_refs ({})", strerror(errno)); - throw std::runtime_error("Could not open clear_refs"); + public: + ClearRefsWrapper() + { + f = ::fopen(CLEAR_REFS, "w"); + if (f == nullptr) { + SPDLOG_ERROR("Could not open clear_refs ({})", strerror(errno)); + throw std::runtime_error("Could not open clear_refs"); + } } - // Write 4 to the file to track from now on - // https://www.kernel.org/doc/html/v5.4/admin-guide/mm/soft-dirty.html - char value[] = "4"; - size_t nWritten = fwrite(value, sizeof(char), 1, fd); - if (nWritten != 1) { - SPDLOG_ERROR("Failed to write to clear_refs ({})", nWritten); - fclose(fd); - throw std::runtime_error("Failed to write to clear_refs"); + ~ClearRefsWrapper() { ::fclose(f); } + + void reset() + { + // Write 4 to the file to track from now on + // https://www.kernel.org/doc/html/v5.4/admin-guide/mm/soft-dirty.html + char value[] = "4"; + size_t nWritten = ::fwrite(value, sizeof(char), 1, f); + + if (nWritten != 1) { + SPDLOG_ERROR("Failed to write to clear_refs ({})", nWritten); + ::fclose(f); + throw std::runtime_error("Failed to write to clear_refs"); + } + + ::rewind(f); } - fclose(fd); + private: + FILE* f = nullptr; +}; + +void resetDirtyTracking() +{ + static ClearRefsWrapper wrap; + + PROF_START(ResetDirty) + + SPDLOG_DEBUG("Resetting dirty tracking"); + wrap.reset(); + + PROF_END(ResetDirty) } std::vector readPagemapEntries(uintptr_t ptr, int nEntries) @@ -198,6 
+222,11 @@ MemoryRegion doAlloc(size_t size, int prot, int flags) return mem; } +MemoryRegion allocatePrivateMemory(size_t size) +{ + return doAlloc(size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS); +} + MemoryRegion allocateSharedMemory(size_t size) { return doAlloc(size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS); diff --git a/src/util/snapshot.cpp b/src/util/snapshot.cpp index 16e6db1fe..8b8bf378e 100644 --- a/src/util/snapshot.cpp +++ b/src/util/snapshot.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include @@ -151,10 +152,7 @@ void SnapshotData::addMergeRegion(uint32_t offset, { faabric::util::FullLock lock(snapMx); - SnapshotMergeRegion region{ .offset = offset, - .length = length, - .dataType = dataType, - .operation = operation }; + SnapshotMergeRegion region(offset, length, dataType, operation); if (mergeRegions.find(region.offset) != mergeRegions.end()) { if (!overwrite) { @@ -187,17 +185,79 @@ void SnapshotData::addMergeRegion(uint32_t offset, mergeRegions[region.offset] = region; } -void SnapshotData::mapToMemory(uint8_t* target) +void SnapshotData::fillGapsWithOverwriteRegions() { faabric::util::FullLock lock(snapMx); - if (fd <= 0) { - std::string msg = "Attempting to map memory of non-restorable snapshot"; - SPDLOG_ERROR(msg); - throw std::runtime_error(msg); + // If there's no merge regions, just do one big one (note, zero length means + // fill all space + if (mergeRegions.empty()) { + SPDLOG_TRACE("Filling gap with single overwrite merge region"); + mergeRegions.emplace(std::pair( + 0, + { 0, 0, SnapshotDataType::Raw, SnapshotMergeOperation::Overwrite })); + + return; } - mapMemoryPrivate({ target, size }, fd); + uint32_t lastRegionEnd = 0; + for (auto [offset, region] : mergeRegions) { + if (offset == 0) { + // Zeroth byte is in a merge region + lastRegionEnd = region.length; + continue; + } + + uint32_t regionLen = region.offset - lastRegionEnd; + + SPDLOG_TRACE("Filling gap with overwrite merge region {}-{}", + 
lastRegionEnd, + lastRegionEnd + regionLen); + + mergeRegions.emplace(std::pair( + lastRegionEnd, + { lastRegionEnd, + regionLen, + SnapshotDataType::Raw, + SnapshotMergeOperation::Overwrite })); + + lastRegionEnd = region.offset + region.length; + } + + if (lastRegionEnd < size) { + SPDLOG_TRACE("Filling final gap with merge region starting at {}", + lastRegionEnd); + + // Add a final region at the end of the snapshot + mergeRegions.emplace(std::pair( + lastRegionEnd, + { lastRegionEnd, + 0, + SnapshotDataType::Raw, + SnapshotMergeOperation::Overwrite })); + } +} + +void SnapshotData::mapToMemory(std::span target) +{ + // Note we only need a shared lock here as we are not modifying data and the + // OS will handle synchronisation of the mapping itself + PROF_START(MapSnapshot) + faabric::util::SharedLock lock(snapMx); + + if (target.size() > size) { + SPDLOG_ERROR("Mapping target memory larger than snapshot ({} > {})", + target.size(), + size); + throw std::runtime_error("Target memory larger than snapshot"); + } + + faabric::util::mapMemoryPrivate(target, fd); + + // Reset dirty tracking otherwise whole mapped region is marked dirty + faabric::util::resetDirtyTracking(); + + PROF_END(MapSnapshot) } std::map SnapshotData::getMergeRegions() @@ -228,98 +288,100 @@ void SnapshotData::queueDiffs(const std::span diffs) void SnapshotData::writeQueuedDiffs() { + PROF_START(WriteQueuedDiffs) faabric::util::FullLock lock(snapMx); + SPDLOG_DEBUG("Writing {} queued diffs to snapshot", queuedDiffs.size()); + // Iterate through diffs for (auto& diff : queuedDiffs) { + if (diff.getOperation() == + faabric::util::SnapshotMergeOperation::Ignore) { + + SPDLOG_TRACE("Ignoring region {}-{}", + diff.getOffset(), + diff.getOffset() + diff.getData().size()); + + continue; + } + if (diff.getOperation() == + faabric::util::SnapshotMergeOperation::Overwrite) { + + SPDLOG_TRACE("Copying overwrite diff into {}-{}", + diff.getOffset(), + diff.getOffset() + diff.getData().size()); + + 
writeData(diff.getData(), diff.getOffset()); + + continue; + } + + uint8_t* copyTarget = validatedOffsetPtr(diff.getOffset()); switch (diff.getDataType()) { - case (faabric::util::SnapshotDataType::Raw): { - switch (diff.getOperation()) { - case (faabric::util::SnapshotMergeOperation::Overwrite): { - SPDLOG_TRACE("Copying raw snapshot diff into {}-{}", - diff.getOffset(), - diff.getOffset() + diff.getData().size()); - - writeData(diff.getData(), diff.getOffset()); - break; - } - default: { - SPDLOG_ERROR("Unsupported raw merge operation: {}", - diff.getOperation()); - throw std::runtime_error( - "Unsupported raw merge operation"); - } - } - break; - } case (faabric::util::SnapshotDataType::Int): { - auto diffValue = unalignedRead(diff.getData().data()); - - auto original = faabric::util::unalignedRead( - validatedOffsetPtr(diff.getOffset())); - - int32_t finalValue = 0; - switch (diff.getOperation()) { - case (faabric::util::SnapshotMergeOperation::Sum): { - finalValue = original + diffValue; - - SPDLOG_TRACE("Applying int sum diff {}: {} = {} + {}", - diff.getOffset(), - finalValue, - original, - diffValue); - break; - } - case (faabric::util::SnapshotMergeOperation::Subtract): { - finalValue = original - diffValue; - - SPDLOG_TRACE("Applying int sub diff {}: {} = {} - {}", - diff.getOffset(), - finalValue, - original, - diffValue); - break; - } - case (faabric::util::SnapshotMergeOperation::Product): { - finalValue = original * diffValue; - - SPDLOG_TRACE("Applying int mult diff {}: {} = {} * {}", - diff.getOffset(), - finalValue, - original, - diffValue); - break; - } - case (faabric::util::SnapshotMergeOperation::Min): { - finalValue = std::min(original, diffValue); - - SPDLOG_TRACE("Applying int min diff {}: min({}, {})", - diff.getOffset(), - original, - diffValue); - break; - } - case (faabric::util::SnapshotMergeOperation::Max): { - finalValue = std::max(original, diffValue); - - SPDLOG_TRACE("Applying int max diff {}: max({}, {})", - diff.getOffset(), - 
original, - diffValue); - break; - } - default: { - SPDLOG_ERROR("Unsupported int merge operation: {}", - diff.getOperation()); - throw std::runtime_error( - "Unsupported int merge operation"); - } - } + int32_t finalValue = + applyDiffValue(validatedOffsetPtr(diff.getOffset()), + diff.getData().data(), + diff.getOperation()); + + SPDLOG_TRACE("Writing int {} diff: {} {} -> {}", + snapshotMergeOpStr(diff.getOperation()), + unalignedRead(copyTarget), + unalignedRead(diff.getData().data()), + finalValue); writeData({ BYTES(&finalValue), sizeof(int32_t) }, diff.getOffset()); break; } + case (faabric::util::SnapshotDataType::Long): { + long finalValue = + applyDiffValue(validatedOffsetPtr(diff.getOffset()), + diff.getData().data(), + diff.getOperation()); + + SPDLOG_TRACE("Writing long {} diff: {} {} -> {}", + snapshotMergeOpStr(diff.getOperation()), + unalignedRead(copyTarget), + unalignedRead(diff.getData().data()), + finalValue); + + writeData({ BYTES(&finalValue), sizeof(long) }, + diff.getOffset()); + break; + } + case (faabric::util::SnapshotDataType::Float): { + float finalValue = + applyDiffValue(validatedOffsetPtr(diff.getOffset()), + diff.getData().data(), + diff.getOperation()); + + SPDLOG_TRACE("Writing float {} diff: {} {} -> {}", + snapshotMergeOpStr(diff.getOperation()), + unalignedRead(copyTarget), + unalignedRead(diff.getData().data()), + finalValue); + + writeData({ BYTES(&finalValue), sizeof(float) }, + diff.getOffset()); + break; + } + case (faabric::util::SnapshotDataType::Double): { + double finalValue = + applyDiffValue(validatedOffsetPtr(diff.getOffset()), + diff.getData().data(), + diff.getOperation()); + + SPDLOG_TRACE("Writing double {} diff: {} {} -> {}", + snapshotMergeOpStr(diff.getOperation()), + unalignedRead(copyTarget), + unalignedRead(diff.getData().data()), + finalValue); + + writeData({ BYTES(&finalValue), sizeof(double) }, + diff.getOffset()); + break; + } default: { SPDLOG_ERROR("Unsupported data type: {}", diff.getDataType()); 
throw std::runtime_error("Unsupported merge data type"); @@ -329,6 +391,7 @@ void SnapshotData::writeQueuedDiffs() // Clear queue queuedDiffs.clear(); + PROF_END(WriteQueuedDiffs) } MemoryView::MemoryView(std::span dataIn) @@ -337,6 +400,7 @@ MemoryView::MemoryView(std::span dataIn) std::vector MemoryView::getDirtyRegions() { + PROF_START(GetDirtyRegions) if (data.empty()) { return {}; } @@ -346,6 +410,9 @@ std::vector MemoryView::getDirtyRegions() std::vector dirtyPageNumbers = getDirtyPageNumbers(data.data(), nPages); + SPDLOG_DEBUG( + "Memory view has {}/{} dirty pages", dirtyPageNumbers.size(), nPages); + std::vector> regions = faabric::util::getDirtyRegions(data.data(), nPages); @@ -353,20 +420,21 @@ std::vector MemoryView::getDirtyRegions() std::vector diffs; diffs.reserve(regions.size()); for (auto [regionBegin, regionEnd] : regions) { + SPDLOG_TRACE("Memory view dirty {}-{}", regionBegin, regionEnd); diffs.emplace_back(SnapshotDataType::Raw, SnapshotMergeOperation::Overwrite, regionBegin, data.subspan(regionBegin, regionEnd - regionBegin)); } - SPDLOG_DEBUG("Memory view has {}/{} dirty pages", diffs.size(), nPages); - + PROF_END(GetDirtyRegions) return diffs; } std::vector MemoryView::diffWithSnapshot( std::shared_ptr snap) { + PROF_START(DiffWithSnapshot) std::vector diffs; std::map mergeRegions = snap->getMergeRegions(); @@ -398,14 +466,13 @@ std::vector MemoryView::diffWithSnapshot( for (auto& dirtyRegion : dirtyRegions) { // Add the diffs mr.addDiffs(diffs, - snap->getDataPtr(), - snap->getSize(), - data.data(), - dirtyRegion.first, - dirtyRegion.second); + { snap->getDataPtr(), snap->getSize() }, + data, + dirtyRegion); } } + PROF_END(DiffWithSnapshot) return diffs; } @@ -415,9 +482,21 @@ std::string snapshotDataTypeStr(SnapshotDataType dt) case (SnapshotDataType::Raw): { return "Raw"; } + case (SnapshotDataType::Bool): { + return "Bool"; + } case (SnapshotDataType::Int): { return "Int"; } + case (SnapshotDataType::Long): { + return "Long"; + } + case 
(SnapshotDataType::Float): { + return "Float"; + } + case (SnapshotDataType::Double): { + return "Double"; + } default: { SPDLOG_ERROR("Cannot convert snapshot data type to string: {}", dt); throw std::runtime_error("Cannot convert data type to string"); @@ -428,6 +507,9 @@ std::string snapshotDataTypeStr(SnapshotDataType dt) std::string snapshotMergeOpStr(SnapshotMergeOperation op) { switch (op) { + case (SnapshotMergeOperation::Ignore): { + return "Ignore"; + } case (SnapshotMergeOperation::Max): { return "Max"; } @@ -453,185 +535,224 @@ std::string snapshotMergeOpStr(SnapshotMergeOperation op) } } +void SnapshotMergeRegion::addOverwriteDiff( + std::vector& diffs, + std::span original, + std::span updated, + std::pair dirtyRange) +{ + auto operation = SnapshotMergeOperation::Overwrite; + + // Work out bounds of region we're checking + uint32_t checkStart = std::max(dirtyRange.first, offset); + + // Here we need to make sure we don't overrun the original or the updated + // data + uint32_t checkEnd; + if (length == 0) { + checkEnd = dirtyRange.second; + } else { + checkEnd = std::min(dirtyRange.second, offset + length); + } + + // If the region is outside the original data, automatically add a diff for + // the whole region + if (checkStart >= original.size()) { + SPDLOG_TRACE("Single extension {} overwrite diff at {}-{}", + snapshotDataTypeStr(dataType), + checkStart, + checkEnd - checkStart); + diffs.emplace_back(dataType, + operation, + checkStart, + updated.subspan(checkStart, checkEnd - checkStart)); + return; + } + + bool diffInProgress = false; + uint32_t diffStart = 0; + for (uint32_t b = checkStart; b <= checkEnd; b++) { + // If this byte is outside the original region, everything from here on + // is dirty, so we can add a single region to go from here to the end + if (b >= original.size()) { + if (!diffInProgress) { + diffStart = b; + } + + uint32_t diffLength = checkEnd - diffStart; + + SPDLOG_TRACE("Extension {} overwrite diff at {}-{}", + 
snapshotDataTypeStr(dataType), + diffStart, + diffStart + diffLength); + + diffs.emplace_back(dataType, + operation, + diffStart, + updated.subspan(diffStart, diffLength)); + return; + } + + bool isDirtyByte = (*(original.data() + b) != *(updated.data() + b)); + if (isDirtyByte && !diffInProgress) { + // Diff starts here if it's different and diff + // not in progress + diffInProgress = true; + diffStart = b; + } else if (!isDirtyByte && diffInProgress) { + // Diff ends if it's not different and diff is + // in progress + uint32_t diffLength = b - diffStart; + SPDLOG_TRACE("Found {} overwrite diff at {}-{}", + snapshotDataTypeStr(dataType), + diffStart, + diffStart + diffLength); + + diffInProgress = false; + diffs.emplace_back(dataType, + operation, + diffStart, + updated.subspan(diffStart, diffLength)); + } + } + + // If we've reached the end of this region with a diff + // in progress, we need to close it off + if (diffInProgress) { + uint32_t finalDiffLength = checkEnd - diffStart; + SPDLOG_TRACE("Adding {} {} diff at {}-{} (end of region)", + snapshotDataTypeStr(dataType), + snapshotMergeOpStr(operation), + diffStart, + diffStart + finalDiffLength); + + diffs.emplace_back(dataType, + operation, + diffStart, + updated.subspan(diffStart, finalDiffLength)); + } +} + +SnapshotMergeRegion::SnapshotMergeRegion(uint32_t offsetIn, + size_t lengthIn, + SnapshotDataType dataTypeIn, + SnapshotMergeOperation operationIn) + : offset(offsetIn) + , length(lengthIn) + , dataType(dataTypeIn) + , operation(operationIn) +{} + void SnapshotMergeRegion::addDiffs(std::vector& diffs, - const uint8_t* original, - uint32_t originalSize, - const uint8_t* updated, - uint32_t dirtyRegionStart, - uint32_t dirtyRegionEnd) + std::span originalData, + std::span updatedData, + std::pair dirtyRange) { // If the region has zero length, it signifies that it goes to the // end of the memory, so we go all the way to the end of the dirty region. 
// For all other regions, we just check if the dirty range is within the // merge region. - bool isInRange = (dirtyRegionEnd > offset) && - ((length == 0) || (dirtyRegionStart < offset + length)); + bool isInRange = (dirtyRange.second > offset) && + ((length == 0) || (dirtyRange.first < offset + length)); if (!isInRange) { + SPDLOG_TRACE("{} {} merge region {}-{} not in dirty region {}-{}", + snapshotDataTypeStr(dataType), + snapshotMergeOpStr(operation), + offset, + offset + length, + dirtyRange.first, + dirtyRange.second); return; } - SPDLOG_TRACE("{} {} merge region {}-{} aligns with dirty region {}-{}", - snapshotDataTypeStr(dataType), - snapshotMergeOpStr(operation), - offset, - offset + length, - dirtyRegionStart, - dirtyRegionEnd); - - switch (dataType) { - case (SnapshotDataType::Int): { - // Check if the value has changed - const uint8_t* updatedValue = updated + offset; - int updatedInt = *(reinterpret_cast(updatedValue)); - - if (originalSize < offset) { - throw std::runtime_error( - "Do not support int operations outside original snapshot"); - } + SPDLOG_TRACE( + "{} {} merge region {}-{}, dirty region {}-{}, original size {}", + snapshotDataTypeStr(dataType), + snapshotMergeOpStr(operation), + offset, + offset + length, + dirtyRange.first, + dirtyRange.second, + originalData.size()); + + if (operation == SnapshotMergeOperation::Overwrite) { + addOverwriteDiff(diffs, originalData, updatedData, dirtyRange); + return; + } - const uint8_t* originalValue = original + offset; - int originalInt = *(reinterpret_cast(originalValue)); + if (operation == SnapshotMergeOperation::Ignore) { + return; + } - // Skip if no change - if (originalInt == updatedInt) { - return; - } + if (originalData.size() < offset) { + throw std::runtime_error( + "Do not support non-overwrite operations outside original snapshot"); + } - // Potentially modify the original in place depending on the - // operation - switch (operation) { - case (SnapshotMergeOperation::Sum): { - // Sums 
must send the value to be _added_, and - // not the final result - updatedInt -= originalInt; - break; - } - case (SnapshotMergeOperation::Subtract): { - // Subtractions must send the value to be - // subtracted, not the result - updatedInt = originalInt - updatedInt; - break; - } - case (SnapshotMergeOperation::Product): { - // Products must send the value to be - // multiplied, not the result - updatedInt /= originalInt; - break; - } - case (SnapshotMergeOperation::Max): - case (SnapshotMergeOperation::Min): - // Min and max don't need to change - break; - default: { - SPDLOG_ERROR("Unhandled integer merge operation: {}", - operation); - throw std::runtime_error( - "Unhandled integer merge operation"); - } - } + uint8_t* updated = (uint8_t*)updatedData.data() + offset; + const uint8_t* original = originalData.data() + offset; - // TODO - somehow avoid casting away the const here? - // Modify the memory in-place here - std::memcpy( - (uint8_t*)updatedValue, BYTES(&updatedInt), sizeof(int32_t)); + bool changed = false; + switch (dataType) { + case (SnapshotDataType::Int): { + int preUpdate = unalignedRead(updated); + changed = calculateDiffValue(original, updated, operation); - // Add the diff - diffs.emplace_back(dataType, - operation, - offset, - std::span(updatedValue, length)); + SPDLOG_TRACE("Calculated int {} merge: {} {} -> {}", + snapshotMergeOpStr(operation), + preUpdate, + unalignedRead(original), + unalignedRead(updated)); + break; + } + case (SnapshotDataType::Long): { + long preUpdate = unalignedRead(updated); + changed = calculateDiffValue(original, updated, operation); - SPDLOG_TRACE("Found {} {} diff at {}-{} ({})", - snapshotDataTypeStr(dataType), + SPDLOG_TRACE("Calculated long {} merge: {} {} -> {}", snapshotMergeOpStr(operation), - offset, - offset + length, - updatedInt); + preUpdate, + unalignedRead(original), + unalignedRead(updated)); + break; + } + case (SnapshotDataType::Float): { + float preUpdate = unalignedRead(updated); + changed = 
calculateDiffValue(original, updated, operation); + SPDLOG_TRACE("Calculated float {} merge: {} {} -> {}", + snapshotMergeOpStr(operation), + preUpdate, + unalignedRead(original), + unalignedRead(updated)); break; } - case (SnapshotDataType::Raw): { - switch (operation) { - case (SnapshotMergeOperation::Overwrite): { - // Work out bounds of region we're checking - uint32_t checkStart = - std::max(dirtyRegionStart, offset); - - uint32_t checkEnd; - if (length == 0) { - checkEnd = dirtyRegionEnd; - } else { - checkEnd = - std::min(dirtyRegionEnd, offset + length); - } - - bool diffInProgress = false; - int diffStart = 0; - for (int b = checkStart; b <= checkEnd; b++) { - // If this byte is outside the original region, we can't - // compare (i.e. always dirty) - bool isDirtyByte = (b > originalSize) || - (*(original + b) != *(updated + b)); - - if (isDirtyByte && !diffInProgress) { - // Diff starts here if it's different and diff - // not in progress - diffInProgress = true; - diffStart = b; - } else if (!isDirtyByte && diffInProgress) { - // Diff ends if it's not different and diff is - // in progress - int diffLength = b - diffStart; - SPDLOG_TRACE("Found {} {} diff at {}-{}", - snapshotDataTypeStr(dataType), - snapshotMergeOpStr(operation), - diffStart, - diffStart + diffLength); - - diffInProgress = false; - diffs.emplace_back( - dataType, - operation, - diffStart, - std::span(updated + diffStart, - diffLength)); - } - } - - // If we've reached the end of this region with a diff - // in progress, we need to close it off - if (diffInProgress) { - int finalDiffLength = checkEnd - diffStart; - SPDLOG_TRACE( - "Adding {} {} diff at {}-{} (end of region)", - snapshotDataTypeStr(dataType), - snapshotMergeOpStr(operation), - diffStart, - diffStart + finalDiffLength); - - diffs.emplace_back( - dataType, - operation, - diffStart, - std::span(updated + diffStart, - finalDiffLength)); - } - break; - } - default: { - SPDLOG_ERROR("Unhandled raw merge operation: {}", - 
operation); - throw std::runtime_error("Unhandled raw merge operation"); - } - } + case (SnapshotDataType::Double): { + double preUpdate = unalignedRead(updated); + changed = calculateDiffValue(original, updated, operation); + SPDLOG_TRACE("Calculated double {} merge: {} {} -> {}", + snapshotMergeOpStr(operation), + preUpdate, + unalignedRead(original), + unalignedRead(updated)); break; } default: { - SPDLOG_ERROR("Merge region for unhandled data type: {}", dataType); - throw std::runtime_error("Merge region for unhandled data type"); + SPDLOG_ERROR("Unsupported merge op combination {} {}", + snapshotDataTypeStr(dataType), + snapshotMergeOpStr(operation)); + throw std::runtime_error("Unsupported merge op combination"); } } + + // Add the diff + if (changed) { + diffs.emplace_back(dataType, + operation, + offset, + std::span(updated, length)); + } } } diff --git a/tasks/dev.py b/tasks/dev.py index 7a8601ef8..3559e1f1b 100644 --- a/tasks/dev.py +++ b/tasks/dev.py @@ -14,7 +14,9 @@ @task -def cmake(ctx, clean=False, shared=False, build="Debug", sanitiser="None"): +def cmake( + ctx, clean=False, shared=False, build="Debug", sanitiser="None", prof=False +): """ Configures the build """ @@ -41,6 +43,7 @@ def cmake(ctx, clean=False, shared=False, build="Debug", sanitiser="None"): "-DCMAKE_CXX_COMPILER=/usr/bin/clang++-13", "-DCMAKE_C_COMPILER=/usr/bin/clang-13", "-DFAABRIC_USE_SANITISER={}".format(sanitiser), + "-DFAABRIC_SELF_TRACING=ON" if prof else "", PROJ_ROOT, ] diff --git a/tests/dist/DistTestExecutor.cpp b/tests/dist/DistTestExecutor.cpp index 12622f49c..c7f337a40 100644 --- a/tests/dist/DistTestExecutor.cpp +++ b/tests/dist/DistTestExecutor.cpp @@ -72,7 +72,7 @@ void DistTestExecutor::restore(faabric::Message& msg) setUpDummyMemory(snap->getSize()); - reg.mapSnapshot(msg.snapshotkey(), dummyMemory.get()); + snap->mapToMemory({ dummyMemory.get(), dummyMemorySize }); } faabric::util::MemoryView DistTestExecutor::getMemoryView() @@ -89,7 +89,7 @@ void 
DistTestExecutor::setUpDummyMemory(size_t memSize) { if (dummyMemory.get() == nullptr) { SPDLOG_DEBUG("Dist test executor initialising memory size {}", memSize); - dummyMemory = faabric::util::allocateSharedMemory(memSize); + dummyMemory = faabric::util::allocatePrivateMemory(memSize); dummyMemorySize = memSize; } } diff --git a/tests/dist/scheduler/functions.cpp b/tests/dist/scheduler/functions.cpp index f0b755767..23765fd0d 100644 --- a/tests/dist/scheduler/functions.cpp +++ b/tests/dist/scheduler/functions.cpp @@ -72,20 +72,13 @@ int handleFakeDiffsFunction(tests::DistTestExecutor* exec, auto originalSnap = reg.getSnapshot(snapshotKey); - // Add a single merge region to catch both diffs - int offsetA = 10; - int offsetB = 100; std::vector inputBytes = faabric::util::stringToBytes(msgInput); - originalSnap->addMergeRegion( - 0, - offsetB + inputBytes.size() + 10, - faabric::util::SnapshotDataType::Raw, - faabric::util::SnapshotMergeOperation::Overwrite); - // Modify the executor's memory std::vector keyBytes = faabric::util::stringToBytes(snapshotKey); + int offsetA = 10; + int offsetB = 100; std::memcpy(exec->getDummyMemory().data() + offsetA, keyBytes.data(), keyBytes.size()); @@ -201,7 +194,6 @@ int handleFakeDiffsThreadedFunction( } else { // This is the code that will be executed by the remote threads. 
- // Add a merge region to catch the modification int idx = msg.appidx(); int regionOffset = 2 * idx * faabric::util::HOST_PAGE_SIZE; @@ -211,16 +203,6 @@ int handleFakeDiffsThreadedFunction( std::vector inputBytes = faabric::util::stringToBytes(msgInput); - auto originalSnap = reg.getSnapshot(snapshotKey); - - // Make sure it's captured by the region - int regionLength = 20 + inputBytes.size(); - originalSnap->addMergeRegion( - regionOffset, - regionLength, - faabric::util::SnapshotDataType::Raw, - faabric::util::SnapshotMergeOperation::Overwrite); - // Now modify the memory std::memcpy(exec->getDummyMemory().data() + changeOffset, inputBytes.data(), @@ -258,18 +240,18 @@ int handleReductionFunction(tests::DistTestExecutor* exec, uint32_t reductionBOffset = 2 * HOST_PAGE_SIZE; uint32_t arrayOffset = HOST_PAGE_SIZE + 10 * sizeof(int32_t); - // Initialise message bool isThread = req->type() == faabric::BatchExecuteRequest::THREADS; - // Set up snapshot - std::string snapKey = "dist-reduction-" + std::to_string(generateGid()); - std::shared_ptr snap = - std::make_shared(snapSize); - reg.registerSnapshot(snapKey, snap); - // Main function will set up the snapshot and merge regions, while the child // threads will modify an array and perform a reduction operation if (!isThread) { + // Set up snapshot + std::string snapKey = "dist-reduction-" + std::to_string(generateGid()); + std::shared_ptr snap = + std::make_shared(snapSize); + reg.registerSnapshot(snapKey, snap); + + // Perform operations in a loop for (int r = 0; r < nRepeats; r++) { // Set up thread request auto req = faabric::util::batchExecFactory( @@ -285,6 +267,19 @@ int handleReductionFunction(tests::DistTestExecutor* exec, m.set_appidx(i); } + // Set merge regions + snap->addMergeRegion(reductionAOffset, + sizeof(int32_t), + SnapshotDataType::Int, + SnapshotMergeOperation::Sum, + true); + + snap->addMergeRegion(reductionBOffset, + sizeof(int32_t), + SnapshotDataType::Int, + SnapshotMergeOperation::Sum, + 
true); + // Make the request faabric::scheduler::Scheduler& sch = faabric::scheduler::getScheduler(); @@ -328,7 +323,7 @@ int handleReductionFunction(tests::DistTestExecutor* exec, snap->writeQueuedDiffs(); // Remap memory to snapshot - snap->mapToMemory(exec->getDummyMemory().data()); + snap->mapToMemory(exec->getDummyMemory()); uint8_t* reductionAPtr = exec->getDummyMemory().data() + reductionAOffset; @@ -386,26 +381,6 @@ int handleReductionFunction(tests::DistTestExecutor* exec, uint32_t thisIdx = msg.appidx(); uint8_t* thisArrayPtr = arrayPtr + (sizeof(int32_t) * thisIdx); - // Set merge regions - std::shared_ptr snap = reg.getSnapshot(msg.snapshotkey()); - snap->addMergeRegion(reductionAOffset, - sizeof(int32_t), - SnapshotDataType::Int, - SnapshotMergeOperation::Sum, - true); - - snap->addMergeRegion(reductionBOffset, - sizeof(int32_t), - SnapshotDataType::Int, - SnapshotMergeOperation::Sum, - true); - - snap->addMergeRegion(arrayOffset, - sizeof(int32_t) * nThreads, - SnapshotDataType::Raw, - SnapshotMergeOperation::Overwrite, - true); - // Lock group locally while doing reduction std::shared_ptr group = faabric::transport::PointToPointGroup::getGroup(groupId); diff --git a/tests/dist/scheduler/test_snapshots.cpp b/tests/dist/scheduler/test_snapshots.cpp index 1d983deda..cd93df8ad 100644 --- a/tests/dist/scheduler/test_snapshots.cpp +++ b/tests/dist/scheduler/test_snapshots.cpp @@ -26,11 +26,12 @@ TEST_CASE_METHOD(DistTestsFixture, std::string user = "snapshots"; std::string function = "fake-diffs"; std::string snapshotKey = "dist-snap-check"; + std::vector inputData = { 0, 1, 2, 3, 4, 5, 6 }; + // Set up snapshot size_t snapSize = 2 * faabric::util::HOST_PAGE_SIZE; auto snap = std::make_shared(snapSize); - // Set up snapshot reg.registerSnapshot(snapshotKey, snap); // Set up the message @@ -40,7 +41,6 @@ TEST_CASE_METHOD(DistTestsFixture, // Set up some input data faabric::Message& m = req->mutable_messages()->at(0); - std::vector inputData = { 0, 1, 2, 3, 
4, 5, 6 }; m.set_inputdata(inputData.data(), inputData.size()); m.set_snapshotkey(snapshotKey); diff --git a/tests/test/scheduler/test_executor.cpp b/tests/test/scheduler/test_executor.cpp index ae92cbf9a..3cda020ec 100644 --- a/tests/test/scheduler/test_executor.cpp +++ b/tests/test/scheduler/test_executor.cpp @@ -49,8 +49,8 @@ void TestExecutor::restore(faabric::Message& msg) auto snap = reg.getSnapshot(msg.snapshotkey()); dummyMemorySize = snap->getSize(); - dummyMemory = faabric::util::allocateSharedMemory(snap->getSize()); - reg.mapSnapshot(msg.snapshotkey(), dummyMemory.get()); + dummyMemory = faabric::util::allocatePrivateMemory(snap->getSize()); + snap->mapToMemory({ dummyMemory.get(), dummyMemorySize }); } faabric::util::MemoryView TestExecutor::getMemoryView() @@ -178,19 +178,15 @@ int32_t TestExecutor::executeTask( auto snapData = faabric::snapshot::getSnapshotRegistry().getSnapshot( msg.snapshotkey()); - // Avoid writing a zero here as the memory is already zeroed hence + // Set up the data. 
+ // Note, avoid writing a zero here as the memory is already zeroed hence // it's not a change std::vector data = { (uint8_t)(pageIdx + 1), (uint8_t)(pageIdx + 2), (uint8_t)(pageIdx + 3) }; - // Set up a merge region that should catch the diff + // Copy in the data size_t offset = (pageIdx * faabric::util::HOST_PAGE_SIZE); - snapData->addMergeRegion(offset, - data.size() + 10, - SnapshotDataType::Raw, - SnapshotMergeOperation::Overwrite); - SPDLOG_DEBUG("TestExecutor modifying page {} of memory ({}-{})", pageIdx, offset, diff --git a/tests/test/scheduler/test_scheduler.cpp b/tests/test/scheduler/test_scheduler.cpp index 98820cfca..8cc3b23f9 100644 --- a/tests/test/scheduler/test_scheduler.cpp +++ b/tests/test/scheduler/test_scheduler.cpp @@ -226,8 +226,20 @@ TEST_CASE_METHOD(SlowExecutorFixture, "Test batch scheduling", "[scheduler]") faabric::snapshot::getSnapshotRegistry(); std::unique_ptr snapshotDataAllocation; + std::vector snapshotMergeRegions; if (!expectedSnapshot.empty()) { auto snap = std::make_shared(1234); + + snap->addMergeRegion(123, + 1234, + faabric::util::SnapshotDataType::Int, + faabric::util::SnapshotMergeOperation::Sum); + + snap->addMergeRegion(345, + 3456, + faabric::util::SnapshotDataType::Raw, + faabric::util::SnapshotMergeOperation::Overwrite); + snapRegistry.registerSnapshot(expectedSnapshot, snap); } @@ -305,6 +317,8 @@ TEST_CASE_METHOD(SlowExecutorFixture, "Test batch scheduling", "[scheduler]") REQUIRE(pushedSnapshot.first == otherHost); REQUIRE(pushedSnapshot.second->getSize() == snapshot->getSize()); REQUIRE(pushedSnapshot.second->getDataPtr() == snapshot->getDataPtr()); + REQUIRE(pushedSnapshot.second->getMergeRegions().size() == + snapshot->getMergeRegions().size()); } // Check the executor counts on this host diff --git a/tests/test/snapshot/test_snapshot_client_server.cpp b/tests/test/snapshot/test_snapshot_client_server.cpp index 25abd5545..0acd747e0 100644 --- a/tests/test/snapshot/test_snapshot_client_server.cpp +++ 
b/tests/test/snapshot/test_snapshot_client_server.cpp @@ -80,9 +80,20 @@ TEST_CASE_METHOD(SnapshotClientServerFixture, std::vector dataA(snapSizeA, 1); std::vector dataB(snapSizeB, 2); + // Set up snapshots auto snapA = std::make_shared(dataA); auto snapB = std::make_shared(dataB); + // Add merge regions to one + std::vector mergeRegions = { + { 123, 1234, SnapshotDataType::Int, SnapshotMergeOperation::Sum }, + { 345, 3456, SnapshotDataType::Raw, SnapshotMergeOperation::Overwrite } + }; + + for (const auto& m : mergeRegions) { + snapA->addMergeRegion(m.offset, m.length, m.dataType, m.operation); + } + REQUIRE(reg.getSnapshotCount() == 0); // Send the messages @@ -97,6 +108,21 @@ TEST_CASE_METHOD(SnapshotClientServerFixture, REQUIRE(actualA->getSize() == snapA->getSize()); REQUIRE(actualB->getSize() == snapB->getSize()); + // Check merge regions + REQUIRE(actualA->getMergeRegions().size() == mergeRegions.size()); + REQUIRE(actualB->getMergeRegions().empty()); + + for (int i = 0; i < mergeRegions.size(); i++) { + SnapshotMergeRegion expected = mergeRegions.at(i); + SnapshotMergeRegion actual = snapA->getMergeRegions()[expected.offset]; + + REQUIRE(actual.offset == expected.offset); + REQUIRE(actual.dataType == expected.dataType); + REQUIRE(actual.length == expected.length); + REQUIRE(actual.operation == expected.operation); + } + + // Check data contents std::vector actualDataA = actualA->getDataCopy(); std::vector actualDataB = actualB->getDataCopy(); @@ -134,6 +160,20 @@ TEST_CASE_METHOD(SnapshotClientServerFixture, // Set up the snapshot reg.registerSnapshot(snapKey, snap); + // Set up another snapshot with some merge regions to check they're added + // on an update + auto otherSnap = + std::make_shared(initialSnapSize, expandedSnapSize); + + std::vector mergeRegions = { + { 123, 1234, SnapshotDataType::Int, SnapshotMergeOperation::Sum }, + { 345, 3456, SnapshotDataType::Raw, SnapshotMergeOperation::Overwrite } + }; + + for (const auto& m : mergeRegions) { + 
otherSnap->addMergeRegion(m.offset, m.length, m.dataType, m.operation); + } + // Set up some diffs for the initial request uint32_t offsetA1 = 5; uint32_t offsetA2 = 2 * HOST_PAGE_SIZE; @@ -153,7 +193,7 @@ TEST_CASE_METHOD(SnapshotClientServerFixture, diffDataA2); std::vector diffsA = { diffA1, diffA2 }; - cli.pushSnapshotDiffs(snapKey, false, diffsA); + cli.pushSnapshotDiffs(snapKey, diffsA); REQUIRE(snap->getQueuedDiffsCount() == 2); // Submit some more diffs, some larger than the original snapshot (to check @@ -183,22 +223,41 @@ TEST_CASE_METHOD(SnapshotClientServerFixture, std::vector diffsB = { diffB1, diffB2, diffB3 }; - bool force = false; - SECTION("Force") { force = true; } - - SECTION("Don't force") { force = false; } - - // Make the request - cli.pushSnapshotDiffs(snapKey, force, diffsB); + SECTION("Full update") + { + // Make the request + cli.pushSnapshotUpdate(snapKey, otherSnap, diffsB); - if (force) { // Check nothing queued REQUIRE(snap->getQueuedDiffsCount() == 0); - } else { + + // Check merge regions from other snap pushed + REQUIRE(snap->getMergeRegions().size() == mergeRegions.size()); + + for (int i = 0; i < mergeRegions.size(); i++) { + SnapshotMergeRegion expected = mergeRegions.at(i); + SnapshotMergeRegion actual = + snap->getMergeRegions()[expected.offset]; + + REQUIRE(actual.offset == expected.offset); + REQUIRE(actual.dataType == expected.dataType); + REQUIRE(actual.length == expected.length); + REQUIRE(actual.operation == expected.operation); + } + } + + SECTION("Just diffs") + { + // Make the request + cli.pushSnapshotDiffs(snapKey, diffsB); + // Check and write queued diffs REQUIRE(snap->getQueuedDiffsCount() == 5); snap->writeQueuedDiffs(); + + // Check no merge regions sent + REQUIRE(snap->getMergeRegions().empty()); } // Check diffs have been applied @@ -243,7 +302,7 @@ TEST_CASE_METHOD(SnapshotClientServerFixture, size_t originalDiffsApplied = snap->getQueuedDiffsCount(); diffs = { diffA1, diffA2 }; - 
cli.pushSnapshotDiffs(snapKey, false, diffs); + cli.pushSnapshotDiffs(snapKey, diffs); // Ensure the right number of diffs is applied REQUIRE(snap->getQueuedDiffsCount() == originalDiffsApplied + 2); @@ -358,7 +417,7 @@ TEST_CASE_METHOD(SnapshotClientServerFixture, size_t originalDiffsApplied = snap->getQueuedDiffsCount(); std::vector diffs = { diff }; - cli.pushSnapshotDiffs(snapKey, false, diffs); + cli.pushSnapshotDiffs(snapKey, diffs); // Ensure the right number of diffs is applied REQUIRE(snap->getQueuedDiffsCount() == originalDiffsApplied + 1); diff --git a/tests/test/snapshot/test_snapshot_diffs.cpp b/tests/test/snapshot/test_snapshot_diffs.cpp index 05ac925f8..066b01ad3 100644 --- a/tests/test/snapshot/test_snapshot_diffs.cpp +++ b/tests/test/snapshot/test_snapshot_diffs.cpp @@ -36,9 +36,13 @@ TEST_CASE_METHOD(SnapshotTestFixture, int sharedMemPages = 8; size_t sharedMemSize = sharedMemPages * HOST_PAGE_SIZE; - MemoryRegion sharedMem = allocateSharedMemory(sharedMemSize); + MemoryRegion sharedMem = allocatePrivateMemory(sharedMemSize); - reg.mapSnapshot(snapKey, sharedMem.get()); + // Check we can write to shared mem + sharedMem[0] = 1; + + // Map to the snapshot + snap->mapToMemory({ sharedMem.get(), snapSize }); // Make various changes sharedMem[0] = 1; @@ -64,11 +68,10 @@ TEST_CASE_METHOD(SnapshotTestFixture, "Test snapshot diffs", "[snapshot]") // Make shared memory larger than original snapshot int sharedMemPages = 8; size_t sharedMemSize = sharedMemPages * HOST_PAGE_SIZE; - MemoryRegion sharedMem = - allocateSharedMemory(sharedMemPages * HOST_PAGE_SIZE); + MemoryRegion sharedMem = allocatePrivateMemory(sharedMemSize); // Map the snapshot to the start of the memory - reg.mapSnapshot(snapKey, sharedMem.get()); + snap->mapToMemory({ sharedMem.get(), snapSize }); // Reset dirty tracking faabric::util::resetDirtyTracking(); diff --git a/tests/test/snapshot/test_snapshot_registry.cpp b/tests/test/snapshot/test_snapshot_registry.cpp index 
f919618d8..1b15d2c73 100644 --- a/tests/test/snapshot/test_snapshot_registry.cpp +++ b/tests/test/snapshot/test_snapshot_registry.cpp @@ -67,13 +67,15 @@ TEST_CASE_METHOD(SnapshotTestFixture, REQUIRE(actualC->getDataPtr() == snapC->getDataPtr()); // Create regions onto which we will map the snapshots - MemoryRegion actualDataA = allocateSharedMemory(1 * HOST_PAGE_SIZE); - MemoryRegion actualDataB = allocateSharedMemory(2 * HOST_PAGE_SIZE); - MemoryRegion actualDataC = allocateSharedMemory(3 * HOST_PAGE_SIZE); + size_t sizeA = HOST_PAGE_SIZE; + size_t sizeC = 3 * HOST_PAGE_SIZE; + MemoryRegion actualDataA = allocatePrivateMemory(sizeA); + MemoryRegion actualDataB = allocatePrivateMemory(2 * HOST_PAGE_SIZE); + MemoryRegion actualDataC = allocatePrivateMemory(sizeC); // Map two of them - reg.mapSnapshot(keyA, actualDataA.get()); - reg.mapSnapshot(keyC, actualDataC.get()); + snapA->mapToMemory({ actualDataA.get(), sizeA }); + snapC->mapToMemory({ actualDataC.get(), sizeC }); // Here we need to check the actual data after mapping std::vector vecDataA = snapA->getDataCopy(); @@ -103,48 +105,6 @@ TEST_CASE_METHOD(SnapshotTestFixture, REQUIRE(reg.getSnapshotCount() == 0); } -TEST_CASE_METHOD(SnapshotTestFixture, - "Test register snapshot if not exists", - "[snapshot]") -{ - REQUIRE(reg.getSnapshotCount() == 0); - - std::string keyA = "snapA"; - std::string keyB = "snapB"; - - REQUIRE(!reg.snapshotExists(keyA)); - REQUIRE(!reg.snapshotExists(keyB)); - - // Take one of the snapshots - auto snapBefore = setUpSnapshot(keyA, 1); - - REQUIRE(reg.snapshotExists(keyA)); - REQUIRE(!reg.snapshotExists(keyB)); - REQUIRE(reg.getSnapshotCount() == 1); - - auto otherSnap = - std::make_shared(snapBefore->getSize() + 10); - std::vector otherData(snapBefore->getSize() + 10, 1); - otherSnap->copyInData(otherData); - - // Check existing snapshot is not overwritten - reg.registerSnapshotIfNotExists(keyA, otherSnap); - auto snapAfterA = reg.getSnapshot(keyA); - REQUIRE(snapAfterA->getDataPtr() 
== snapBefore->getDataPtr()); - REQUIRE(snapAfterA->getSize() == snapBefore->getSize()); - - // Check new snapshot is still created - reg.registerSnapshotIfNotExists(keyB, otherSnap); - - REQUIRE(reg.snapshotExists(keyA)); - REQUIRE(reg.snapshotExists(keyB)); - REQUIRE(reg.getSnapshotCount() == 2); - - auto snapAfterB = reg.getSnapshot(keyB); - REQUIRE(snapAfterB->getDataPtr() == otherSnap->getDataPtr()); - REQUIRE(snapAfterB->getSize() == otherSnap->getSize()); -} - TEST_CASE_METHOD(SnapshotTestFixture, "Test can't get snapshot with empty key", "[snapshot]") diff --git a/tests/test/util/test_memory.cpp b/tests/test/util/test_memory.cpp index d49c3684b..d23ffe9cd 100644 --- a/tests/test/util/test_memory.cpp +++ b/tests/test/util/test_memory.cpp @@ -402,7 +402,7 @@ TEST_CASE("Test mapping memory", "[util]") writeToFd(fd, 0, { vMem.get(), chunk.size() }); // Map some new memory to this fd - MemoryRegion memA = allocateSharedMemory(chunk.size()); + MemoryRegion memA = allocatePrivateMemory(chunk.size()); mapMemoryPrivate({ memA.get(), chunk.size() }, fd); std::vector memAData(memA.get(), memA.get() + chunk.size()); @@ -417,7 +417,7 @@ TEST_CASE("Test mapping memory", "[util]") appendDataToFd(fd, { chunkB.data(), chunkB.size() }); // Map a region to both chunks - MemoryRegion memB = allocateSharedMemory(chunk.size() + chunkB.size()); + MemoryRegion memB = allocatePrivateMemory(chunk.size() + chunkB.size()); mapMemoryPrivate({ memB.get(), chunk.size() + chunkB.size() }, fd); // Check region now contains both bits of data @@ -433,7 +433,7 @@ TEST_CASE("Test mapping memory", "[util]") TEST_CASE("Test mapping memory fails with invalid fd", "[util]") { size_t memSize = 10 * HOST_PAGE_SIZE; - MemoryRegion sharedMem = allocateSharedMemory(memSize); + MemoryRegion sharedMem = allocatePrivateMemory(memSize); int fd = 0; SECTION("Zero fd") { fd = 0; } @@ -442,4 +442,42 @@ TEST_CASE("Test mapping memory fails with invalid fd", "[util]") REQUIRE_THROWS(mapMemoryPrivate({ 
sharedMem.get(), memSize }, fd)); } + +TEST_CASE("Test remapping memory", "[util]") +{ + // Set up some data + size_t dataSize = 10 * HOST_PAGE_SIZE; + std::vector expectedData(dataSize, 3); + + // Write this to a file descriptor + int fd = createFd(expectedData.size(), "foobar"); + writeToFd(fd, 0, { expectedData.data(), expectedData.size() }); + + // Map some new memory to this fd + MemoryRegion mappedMem = allocatePrivateMemory(dataSize); + mapMemoryPrivate({ mappedMem.get(), dataSize }, fd); + + std::vector actualData(mappedMem.get(), + mappedMem.get() + dataSize); + REQUIRE(actualData == expectedData); + + // Modify the memory + std::vector update(100, 4); + size_t updateOffset = HOST_PAGE_SIZE + 10; + std::memcpy(mappedMem.get() + updateOffset, update.data(), update.size()); + + // Spot check to make sure update has been made + REQUIRE(*(mappedMem.get() + (updateOffset + 5)) == (uint8_t)4); + + // Remap + mapMemoryPrivate({ mappedMem.get(), dataSize }, fd); + + // Spot check to make sure update has been removed + REQUIRE(*(mappedMem.get() + (updateOffset + 5)) == (uint8_t)3); + + // Check all data + std::vector actualDataAfter(mappedMem.get(), + mappedMem.get() + dataSize); + REQUIRE(actualDataAfter == expectedData); +} } diff --git a/tests/test/util/test_snapshot.cpp b/tests/test/util/test_snapshot.cpp index 2fcf9d169..837d0606b 100644 --- a/tests/test/util/test_snapshot.cpp +++ b/tests/test/util/test_snapshot.cpp @@ -1,9 +1,9 @@ #include -#include "faabric/snapshot/SnapshotRegistry.h" #include "faabric_utils.h" #include "fixtures.h" +#include #include #include #include @@ -62,7 +62,7 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, std::vector dataA(100, 1); std::vector dataB(2 * HOST_PAGE_SIZE, 2); - MemoryRegion memB = allocateSharedMemory(dataB.size()); + MemoryRegion memB = allocatePrivateMemory(dataB.size()); std::memcpy(memB.get(), dataB.data(), dataB.size()); std::vector dataC(sizeof(int), 3); @@ -141,8 +141,8 @@ 
TEST_CASE_METHOD(SnapshotMergeTestFixture, // Set up shared memory int sharedMemPages = 20; - MemoryRegion sharedMem = - allocateSharedMemory(sharedMemPages * HOST_PAGE_SIZE); + size_t sharedMemSize = sharedMemPages * HOST_PAGE_SIZE; + MemoryRegion sharedMem = allocatePrivateMemory(sharedMemSize); // Check it's zeroed std::vector expectedInitial(snap->getSize(), 0); @@ -151,7 +151,7 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, REQUIRE(actualSharedMemBefore == expectedInitial); // Map the snapshot and check again - snap->mapToMemory(sharedMem.get()); + snap->mapToMemory({ sharedMem.get(), snap->getSize() }); std::vector actualSharedMemAfter( sharedMem.get(), sharedMem.get() + snap->getSize()); REQUIRE(actualSharedMemAfter == actualSnapMem); @@ -179,12 +179,12 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, std::vector expectedSnapMem = snap->getDataCopy(); // Set up two shared mem regions - MemoryRegion sharedMemA = allocateSharedMemory(snapSize); - MemoryRegion sharedMemB = allocateSharedMemory(snapSize); + MemoryRegion sharedMemA = allocatePrivateMemory(snapSize); + MemoryRegion sharedMemB = allocatePrivateMemory(snapSize); // Map the snapshot and both regions reflect the change - snap->mapToMemory(sharedMemA.get()); - snap->mapToMemory(sharedMemB.get()); + snap->mapToMemory({ sharedMemA.get(), snapSize }); + snap->mapToMemory({ sharedMemB.get(), snapSize }); REQUIRE(std::vector(sharedMemA.get(), sharedMemA.get() + snapSize) == expectedSnapMem); @@ -272,8 +272,8 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, REQUIRE(originalData.size() == originalSize); // Map to some other region of memory large enough for the extended version - MemoryRegion sharedMem = allocateSharedMemory(expandedSize); - snap->mapToMemory(sharedMem.get()); + MemoryRegion sharedMem = allocatePrivateMemory(expandedSize); + snap->mapToMemory({ sharedMem.get(), originalSize }); // Add some data to the extended region. 
Check the snapshot extends to fit std::vector dataC(300, 5); @@ -290,7 +290,7 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, REQUIRE(snap->getSize() == expectedSizeB); // Remap to shared memory - snap->mapToMemory(sharedMem.get()); + snap->mapToMemory({ sharedMem.get(), snap->getSize() }); // Check mapped region matches std::vector actualData = snap->getDataCopy(); @@ -331,8 +331,8 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, reg.registerSnapshot(snapKey, snap); // Map the snapshot to some memory - MemoryRegion sharedMem = allocateSharedMemory(memSize); - reg.mapSnapshot(snapKey, sharedMem.get()); + MemoryRegion sharedMem = allocatePrivateMemory(memSize); + snap->mapToMemory({ sharedMem.get(), memSize }); // Check mapping works int* intA = (int*)(sharedMem.get() + intAOffset); @@ -449,9 +449,9 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, // Map the snapshot to some memory size_t sharedMemSize = snapPages * HOST_PAGE_SIZE; - MemoryRegion sharedMem = allocateSharedMemory(snapPages * HOST_PAGE_SIZE); + MemoryRegion sharedMem = allocatePrivateMemory(sharedMemSize); - reg.mapSnapshot(snapKey, sharedMem.get()); + snap->mapToMemory({ sharedMem.get(), sharedMemSize }); // Reset dirty tracking faabric::util::resetDirtyTracking(); @@ -530,6 +530,8 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, size_t dataLength = 0; size_t regionLength = 0; + bool expectNoDiff = false; + SECTION("Integer") { int originalValue = 0; @@ -590,6 +592,214 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, expectedData = faabric::util::valueToBytes(diffValue); } + SECTION("Long") + { + long originalValue = 0; + long finalValue = 0; + long diffValue = 0; + + dataType = faabric::util::SnapshotDataType::Long; + dataLength = sizeof(long); + regionLength = sizeof(long); + + SECTION("Long sum") + { + originalValue = 100; + finalValue = 150; + diffValue = 50; + + operation = faabric::util::SnapshotMergeOperation::Sum; + } + + SECTION("Long subtract") + { + originalValue = 150; + finalValue = 100; + diffValue = 
50; + + operation = faabric::util::SnapshotMergeOperation::Subtract; + } + + SECTION("Long product") + { + originalValue = 3; + finalValue = 150; + diffValue = 50; + + operation = faabric::util::SnapshotMergeOperation::Product; + } + + SECTION("Long max") + { + originalValue = 10; + finalValue = 200; + diffValue = 200; + + operation = faabric::util::SnapshotMergeOperation::Max; + } + + SECTION("Long min") + { + originalValue = 30; + finalValue = 10; + diffValue = 10; + + operation = faabric::util::SnapshotMergeOperation::Min; + } + + originalData = faabric::util::valueToBytes(originalValue); + updatedData = faabric::util::valueToBytes(finalValue); + expectedData = faabric::util::valueToBytes(diffValue); + } + + SECTION("Float") + { + float originalValue = 0.0; + float finalValue = 0.0; + float diffValue = 0.0; + + dataType = faabric::util::SnapshotDataType::Float; + dataLength = sizeof(float); + regionLength = sizeof(float); + + // Note - imprecision in float arithmetic makes it difficult to test + // the floating point types here unless we use integer values. 
+ SECTION("Float sum") + { + originalValue = 513; + finalValue = 945; + diffValue = 432; + + operation = faabric::util::SnapshotMergeOperation::Sum; + } + + SECTION("Float subtract") + { + originalValue = 945; + finalValue = 513; + diffValue = 432; + + operation = faabric::util::SnapshotMergeOperation::Subtract; + } + + SECTION("Float product") + { + originalValue = 5; + finalValue = 655; + diffValue = 131; + + operation = faabric::util::SnapshotMergeOperation::Product; + } + + SECTION("Float max") + { + originalValue = 555.55; + finalValue = 666.66; + diffValue = 666.66; + + operation = faabric::util::SnapshotMergeOperation::Max; + } + + SECTION("Float min") + { + originalValue = 222.22; + finalValue = 333.33; + diffValue = 333.33; + + operation = faabric::util::SnapshotMergeOperation::Min; + } + + originalData = faabric::util::valueToBytes(originalValue); + updatedData = faabric::util::valueToBytes(finalValue); + expectedData = faabric::util::valueToBytes(diffValue); + } + + SECTION("Double") + { + double originalValue = 0.0; + double finalValue = 0.0; + double diffValue = 0.0; + + dataType = faabric::util::SnapshotDataType::Double; + dataLength = sizeof(double); + regionLength = sizeof(double); + + // Note - imprecision in float arithmetic makes it difficult to test + // the floating point types here unless we use integer values. 
+ SECTION("Double sum") + { + originalValue = 513; + finalValue = 945; + diffValue = 432; + + operation = faabric::util::SnapshotMergeOperation::Sum; + } + + SECTION("Double subtract") + { + originalValue = 945; + finalValue = 513; + diffValue = 432; + + operation = faabric::util::SnapshotMergeOperation::Subtract; + } + + SECTION("Double product") + { + originalValue = 5; + finalValue = 655; + diffValue = 131; + + operation = faabric::util::SnapshotMergeOperation::Product; + } + + SECTION("Double max") + { + originalValue = 555.55; + finalValue = 666.66; + diffValue = 666.66; + + operation = faabric::util::SnapshotMergeOperation::Max; + } + + SECTION("Double min") + { + originalValue = 222.22; + finalValue = 333.33; + diffValue = 333.33; + + operation = faabric::util::SnapshotMergeOperation::Min; + } + + originalData = faabric::util::valueToBytes(originalValue); + updatedData = faabric::util::valueToBytes(finalValue); + expectedData = faabric::util::valueToBytes(diffValue); + } + + SECTION("Bool") + { + bool originalValue = false; + bool finalValue = false; + bool diffValue = false; + + dataType = faabric::util::SnapshotDataType::Bool; + dataLength = sizeof(bool); + regionLength = sizeof(bool); + + SECTION("Bool overwrite") + { + originalValue = true; + finalValue = false; + diffValue = false; + + operation = faabric::util::SnapshotMergeOperation::Overwrite; + } + + originalData = faabric::util::valueToBytes(originalValue); + updatedData = faabric::util::valueToBytes(finalValue); + expectedData = faabric::util::valueToBytes(diffValue); + } + SECTION("Raw") { dataLength = 100; @@ -610,6 +820,15 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, regionLength = 0; operation = faabric::util::SnapshotMergeOperation::Overwrite; } + + SECTION("Ignore") + { + regionLength = dataLength; + operation = faabric::util::SnapshotMergeOperation::Ignore; + + expectedData = originalData; + expectNoDiff = true; + } } // Write the original data into place @@ -620,9 +839,9 @@ 
TEST_CASE_METHOD(SnapshotMergeTestFixture, // Map the snapshot to some memory size_t sharedMemSize = snapPages * HOST_PAGE_SIZE; - MemoryRegion sharedMem = allocateSharedMemory(snapPages * HOST_PAGE_SIZE); + MemoryRegion sharedMem = allocatePrivateMemory(sharedMemSize); - reg.mapSnapshot(snapKey, sharedMem.get()); + snap->mapToMemory({ sharedMem.get(), sharedMemSize }); // Reset dirty tracking faabric::util::resetDirtyTracking(); @@ -638,15 +857,20 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, std::vector actualDiffs = MemoryView({ sharedMem.get(), sharedMemSize }).diffWithSnapshot(snap); - // Check diff - REQUIRE(actualDiffs.size() == 1); - std::vector expectedDiffs = { { dataType, - operation, - offset, - { expectedData.data(), - expectedData.size() } } }; - - checkDiffs(actualDiffs, expectedDiffs); + if (expectNoDiff) { + REQUIRE(actualDiffs.empty()); + } else { + // Check diff + REQUIRE(actualDiffs.size() == 1); + std::vector expectedDiffs = { + { dataType, + operation, + offset, + { expectedData.data(), expectedData.size() } } + }; + + checkDiffs(actualDiffs, expectedDiffs); + } } TEST_CASE_METHOD(SnapshotMergeTestFixture, @@ -668,12 +892,12 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, std::string expectedMsg; - SECTION("Integer overwrite") + SECTION("Bool product") { - dataType = faabric::util::SnapshotDataType::Int; - operation = faabric::util::SnapshotMergeOperation::Overwrite; - dataLength = sizeof(int32_t); - expectedMsg = "Unhandled integer merge operation"; + dataType = faabric::util::SnapshotDataType::Bool; + operation = faabric::util::SnapshotMergeOperation::Product; + dataLength = sizeof(bool); + expectedMsg = "Unsupported merge op combination"; } SECTION("Raw sum") @@ -681,7 +905,7 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, dataType = faabric::util::SnapshotDataType::Raw; operation = faabric::util::SnapshotMergeOperation::Sum; dataLength = 123; - expectedMsg = "Unhandled raw merge operation"; + expectedMsg = "Unsupported merge op combination"; } 
// Take the snapshot @@ -689,8 +913,8 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, // Map the snapshot size_t sharedMemSize = snapPages * HOST_PAGE_SIZE; - MemoryRegion sharedMem = allocateSharedMemory(snapPages * HOST_PAGE_SIZE); - reg.mapSnapshot(snapKey, sharedMem.get()); + MemoryRegion sharedMem = allocatePrivateMemory(sharedMemSize); + snap->mapToMemory({ sharedMem.get(), sharedMemSize }); // Reset dirty tracking faabric::util::resetDirtyTracking(); @@ -779,8 +1003,8 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, reg.registerSnapshot(snapKey, snap); // Map the snapshot - MemoryRegion sharedMem = allocateSharedMemory(sharedMemSize); - reg.mapSnapshot(snapKey, sharedMem.get()); + MemoryRegion sharedMem = allocatePrivateMemory(sharedMemSize); + snap->mapToMemory({ sharedMem.get(), sharedMemSize }); // Reset dirty tracking faabric::util::resetDirtyTracking(); @@ -834,6 +1058,130 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, checkDiffs(actualDiffs, expectedDiffs); } +TEST_CASE_METHOD(SnapshotMergeTestFixture, + "Test filling gaps in regions with overwrite", + "[snapshot][util]") +{ + int snapPages = 3; + size_t snapSize = snapPages * HOST_PAGE_SIZE; + + std::shared_ptr snap = + std::make_shared(snapSize); + + std::map expectedRegions; + + SECTION("No existing regions") + { + expectedRegions[0] = { + 0, 0, SnapshotDataType::Raw, SnapshotMergeOperation::Overwrite + }; + } + + SECTION("One region at start") + { + snap->addMergeRegion( + 0, 100, SnapshotDataType::Raw, SnapshotMergeOperation::Overwrite); + expectedRegions[0] = { + 0, 100, SnapshotDataType::Raw, SnapshotMergeOperation::Overwrite + }; + expectedRegions[100] = { + 100, 0, SnapshotDataType::Raw, SnapshotMergeOperation::Overwrite + }; + } + + SECTION("One region at end") + { + snap->addMergeRegion(snapSize - 100, + 100, + SnapshotDataType::Raw, + SnapshotMergeOperation::Overwrite); + + expectedRegions[0] = { 0, + snapSize - 100, + SnapshotDataType::Raw, + SnapshotMergeOperation::Overwrite }; + + 
expectedRegions[snapSize - 100] = { (uint32_t)snapSize - 100, + 100, + SnapshotDataType::Raw, + SnapshotMergeOperation::Overwrite }; + } + + SECTION("Multiple regions") + { + // Deliberately add out of order + snap->addMergeRegion(HOST_PAGE_SIZE, + sizeof(double), + SnapshotDataType::Double, + SnapshotMergeOperation::Product); + + snap->addMergeRegion( + 100, sizeof(int), SnapshotDataType::Int, SnapshotMergeOperation::Sum); + + snap->addMergeRegion(HOST_PAGE_SIZE + 200, + HOST_PAGE_SIZE, + SnapshotDataType::Raw, + SnapshotMergeOperation::Overwrite); + + expectedRegions[0] = { + 0, 100, SnapshotDataType::Raw, SnapshotMergeOperation::Overwrite + }; + + expectedRegions[100] = { + 100, sizeof(int), SnapshotDataType::Int, SnapshotMergeOperation::Sum + }; + + expectedRegions[100 + sizeof(int)] = { + 100 + sizeof(int), + HOST_PAGE_SIZE - (100 + sizeof(int)), + SnapshotDataType::Raw, + SnapshotMergeOperation::Overwrite + }; + + expectedRegions[HOST_PAGE_SIZE] = { (uint32_t)HOST_PAGE_SIZE, + sizeof(double), + SnapshotDataType::Double, + SnapshotMergeOperation::Product }; + + expectedRegions[HOST_PAGE_SIZE + sizeof(double)] = { + (uint32_t)(HOST_PAGE_SIZE + sizeof(double)), + 200 - sizeof(double), + SnapshotDataType::Raw, + SnapshotMergeOperation::Overwrite + }; + + expectedRegions[HOST_PAGE_SIZE + 200] = { + (uint32_t)HOST_PAGE_SIZE + 200, + (uint32_t)HOST_PAGE_SIZE, + SnapshotDataType::Raw, + SnapshotMergeOperation::Overwrite + }; + + expectedRegions[(2 * HOST_PAGE_SIZE) + 200] = { + (uint32_t)(2 * HOST_PAGE_SIZE) + 200, + 0, + SnapshotDataType::Raw, + SnapshotMergeOperation::Overwrite + }; + } + + snap->fillGapsWithOverwriteRegions(); + + std::map actualRegions = + snap->getMergeRegions(); + + REQUIRE(actualRegions.size() == expectedRegions.size()); + for (auto [expectedOffset, expectedRegion] : expectedRegions) { + REQUIRE(actualRegions.find(expectedOffset) != actualRegions.end()); + + SnapshotMergeRegion actualRegion = actualRegions[expectedOffset]; + 
REQUIRE(actualRegion.offset == expectedRegion.offset); + REQUIRE(actualRegion.dataType == expectedRegion.dataType); + REQUIRE(actualRegion.length == expectedRegion.length); + REQUIRE(actualRegion.operation == expectedRegion.operation); + } +} + TEST_CASE_METHOD(SnapshotMergeTestFixture, "Test mix of applicable and non-applicable merge regions", "[snapshot][util]") @@ -846,8 +1194,8 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, reg.registerSnapshot(snapKey, snap); // Map the snapshot - MemoryRegion sharedMem = allocateSharedMemory(sharedMemSize); - reg.mapSnapshot(snapKey, sharedMem.get()); + MemoryRegion sharedMem = allocatePrivateMemory(sharedMemSize); + snap->mapToMemory({ sharedMem.get(), sharedMemSize }); // Reset dirty tracking faabric::util::resetDirtyTracking(); @@ -913,31 +1261,86 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, } TEST_CASE_METHOD(SnapshotMergeTestFixture, - "Test overwrite region to end of memory", + "Test merge regions past end of original memory", "[snapshot][util]") { int snapPages = 6; int sharedMemPages = 10; + size_t snapSize = snapPages * HOST_PAGE_SIZE; size_t sharedMemSize = sharedMemPages * HOST_PAGE_SIZE; std::shared_ptr snap = - std::make_shared(snapPages * HOST_PAGE_SIZE); + std::make_shared(snapSize); reg.registerSnapshot(snapKey, snap); // Map the snapshot - MemoryRegion sharedMem = allocateSharedMemory(sharedMemSize); - reg.mapSnapshot(snapKey, sharedMem.get()); + MemoryRegion sharedMem = allocatePrivateMemory(sharedMemSize); + snap->mapToMemory({ sharedMem.get(), snapSize }); faabric::util::resetDirtyTracking(); - // Make an edit somewhere in the extended memory, outside the original - // snapshot - uint32_t diffPageStart = (snapPages + 2) * HOST_PAGE_SIZE; - uint32_t diffOffset = diffPageStart + 100; - std::vector diffData(120, 2); - std::memcpy(sharedMem.get() + diffOffset, diffData.data(), diffData.size()); + uint32_t changeStartPage = 0; + uint32_t changeOffset = 0; + uint32_t mergeRegionStart = snapSize; + size_t 
changeLength = 123; + + uint32_t expectedDiffStart = 0; + uint32_t expectedDiffSize = 0; + + // When memory has changed at or past the end of the original data, the diff + // will start at the end of the original data and round up to the next page + // boundary. If the change starts before the end, it will start at the + // beginning of the change and continue into the page boundary past the + // original data. + + SECTION("Change at end of original data, overlapping merge region") + { + changeStartPage = snapSize; + changeOffset = changeStartPage + 100; + mergeRegionStart = snapSize; + expectedDiffStart = changeStartPage; + expectedDiffSize = HOST_PAGE_SIZE; + } + + SECTION("Change and merge region aligned at end of original data") + { + changeStartPage = snapSize; + changeOffset = changeStartPage; + mergeRegionStart = snapSize; + expectedDiffStart = changeStartPage; + expectedDiffSize = HOST_PAGE_SIZE; + } + + SECTION("Change after end of original data, overlapping merge region") + { + changeStartPage = (snapPages + 2) * HOST_PAGE_SIZE; + changeOffset = changeStartPage + 100; + mergeRegionStart = changeStartPage; + expectedDiffStart = changeStartPage; + expectedDiffSize = HOST_PAGE_SIZE; + } + + SECTION("Merge region and change crossing end of original data") + { + // Merge region starts before diff + changeStartPage = (snapPages - 1) * HOST_PAGE_SIZE; + changeOffset = changeStartPage + 100; + mergeRegionStart = (snapPages - 2) * HOST_PAGE_SIZE; + + // Change goes from inside original data to overshoot the end + changeLength = 2 * HOST_PAGE_SIZE; + + // Diff will cover from the start of the change to round up to the + // nearest page in the overshoot region. 
+ expectedDiffStart = changeOffset; + expectedDiffSize = changeLength + (HOST_PAGE_SIZE - 100); + } + + std::vector diffData(changeLength, 2); + std::memcpy( + sharedMem.get() + changeOffset, diffData.data(), diffData.size()); - // Add a merge region from near end of original snapshot upwards - snap->addMergeRegion(snap->getSize() - 120, + // Add a merge region + snap->addMergeRegion(mergeRegionStart, 0, faabric::util::SnapshotDataType::Raw, faabric::util::SnapshotMergeOperation::Overwrite); @@ -945,12 +1348,12 @@ TEST_CASE_METHOD(SnapshotMergeTestFixture, std::vector actualDiffs = MemoryView({ sharedMem.get(), sharedMemSize }).diffWithSnapshot(snap); - // Make sure the whole page containing the diff is included + // Set up expected diff std::vector expectedDiffs = { { faabric::util::SnapshotDataType::Raw, faabric::util::SnapshotMergeOperation::Overwrite, - diffPageStart, - { sharedMem.get() + diffPageStart, (size_t)HOST_PAGE_SIZE } }, + expectedDiffStart, + { sharedMem.get() + expectedDiffStart, expectedDiffSize } }, }; checkDiffs(actualDiffs, expectedDiffs); @@ -1047,8 +1450,8 @@ TEST_CASE("Test snapshot mapped memory diffs", "[snapshot][util]") snap->copyInData(dataA); // Map some memory - MemoryRegion memA = allocateSharedMemory(snapSize); - snap->mapToMemory(memA.get()); + MemoryRegion memA = allocatePrivateMemory(snapSize); + snap->mapToMemory({ memA.get(), snapSize }); faabric::util::resetDirtyTracking(); @@ -1098,8 +1501,8 @@ TEST_CASE("Test snapshot mapped memory diffs", "[snapshot][util]") REQUIRE(dirtyRegionData == expectedDirtyRegionData); // Map more memory from the snapshot, check it contains all updates - MemoryRegion memB = allocateSharedMemory(snapSize); - snap->mapToMemory(memB.get()); + MemoryRegion memB = allocatePrivateMemory(snapSize); + snap->mapToMemory({ memB.get(), snapSize }); std::vector expectedFinal(snapSize, 0); std::memcpy(expectedFinal.data() + offsetA, dataA.data(), dataA.size()); std::memcpy(expectedFinal.data() + offsetB, 
dataB.data(), dataB.size()); @@ -1109,7 +1512,7 @@ TEST_CASE("Test snapshot mapped memory diffs", "[snapshot][util]") REQUIRE(actualMemB == expectedFinal); // Remap first memory and check this also contains all updates - snap->mapToMemory(memA.get()); + snap->mapToMemory({ memA.get(), snapSize }); std::vector remappedMemA(memB.get(), memB.get() + snapSize); REQUIRE(remappedMemA == expectedFinal); }