diff --git a/.gitignore b/.gitignore index da0bc95d5..3fdf4508e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ build/ +work/ conan-cache/ # Clang .clangd diff --git a/include/faabric/snapshot/SnapshotRegistry.h b/include/faabric/snapshot/SnapshotRegistry.h index 99e4b738f..0c3e97f8c 100644 --- a/include/faabric/snapshot/SnapshotRegistry.h +++ b/include/faabric/snapshot/SnapshotRegistry.h @@ -1,10 +1,11 @@ #pragma once -#include +#include #include #include #include +#include #include namespace faabric::snapshot { @@ -14,7 +15,8 @@ class SnapshotRegistry public: SnapshotRegistry() = default; - faabric::util::SnapshotData& getSnapshot(const std::string& key); + std::shared_ptr getSnapshot( + const std::string& key); bool snapshotExists(const std::string& key); @@ -35,11 +37,14 @@ class SnapshotRegistry void clear(); private: - std::unordered_map snapshotMap; + std::unordered_map> + snapshotMap; - std::mutex snapshotsMx; + std::shared_mutex snapshotsMx; - int writeSnapshotToFd(const std::string& key); + int writeSnapshotToFd(const std::string& key, + faabric::util::SnapshotData& data); void doTakeSnapshot(const std::string& key, faabric::util::SnapshotData data, diff --git a/include/faabric/util/bytes.h b/include/faabric/util/bytes.h index ef47afeeb..a24876289 100644 --- a/include/faabric/util/bytes.h +++ b/include/faabric/util/bytes.h @@ -25,6 +25,21 @@ int safeCopyToBuffer(const uint8_t* dataIn, uint8_t* buffer, int bufferLen); +template +T unalignedRead(const std::byte* bytes) +{ + T value; + std::copy_n(bytes, sizeof(T), reinterpret_cast(&value)); + return value; +} + +template +void unalignedWrite(const T& value, std::byte* destination) +{ + std::copy_n( + reinterpret_cast(&value), sizeof(T), destination); +} + template void appendBytesOf(std::vector& container, T value) { diff --git a/src/scheduler/Executor.cpp b/src/scheduler/Executor.cpp index 025270d73..11b9b1733 100644 --- a/src/scheduler/Executor.cpp +++ b/src/scheduler/Executor.cpp @@ -282,9 +282,9 @@ void Executor::threadPoolThread(int threadPoolIdx) // Handle snapshot diffs _before_ we reset the executor if (isLastInBatch && task.needsSnapshotPush) { // Get diffs between original snapshot and after execution - faabric::util::SnapshotData snapshotPostExecution = snapshot(); + auto snapshotPostExecution = snapshot(); - faabric::util::SnapshotData snapshotPreExecution = + auto snapshotPreExecution = faabric::snapshot::getSnapshotRegistry().getSnapshot( msg.snapshotkey()); @@ -292,8 +292,8 @@ void Executor::threadPoolThread(int threadPoolIdx) msg.snapshotkey()); std::vector diffs = - snapshotPreExecution.getChangeDiffs(snapshotPostExecution.data, - snapshotPostExecution.size); + snapshotPreExecution->getChangeDiffs(snapshotPostExecution.data, + snapshotPostExecution.size); sch.pushSnapshotDiffs(msg, diffs); diff --git a/src/scheduler/Scheduler.cpp b/src/scheduler/Scheduler.cpp index e491da212..e5d259240 100644 --- a/src/scheduler/Scheduler.cpp +++ b/src/scheduler/Scheduler.cpp @@ -497,18 +497,18 @@ faabric::util::SchedulingDecision Scheduler::doCallFunctions( if (!snapshotKey.empty()) { for (const auto& host : getFunctionRegisteredHosts(firstMsg, false)) { SnapshotClient& c = getSnapshotClient(host); - faabric::util::SnapshotData snapshotData = + auto snapshotData = faabric::snapshot::getSnapshotRegistry().getSnapshot(snapshotKey); // See if we've already pushed this snapshot to the given host, // if so, just push the diffs if (pushedSnapshotsMap[snapshotKey].contains(host)) { std::vector snapshotDiffs = - snapshotData.getDirtyPages(); + snapshotData->getDirtyPages(); c.pushSnapshotDiffs( snapshotKey, firstMsg.groupid(), snapshotDiffs); } else { - c.pushSnapshot(snapshotKey, firstMsg.groupid(), snapshotData); + c.pushSnapshot(snapshotKey, firstMsg.groupid(), *snapshotData); pushedSnapshotsMap[snapshotKey].insert(host); } } @@ -909,18 +909,21 @@ void Scheduler::pushSnapshotDiffs( void Scheduler::setThreadResultLocally(uint32_t msgId, int32_t returnValue) { + faabric::util::SharedLock lock(mx); SPDLOG_DEBUG("Setting result for thread {} to {}", msgId, returnValue); - threadResults[msgId].set_value(returnValue); + threadResults.at(msgId).set_value(returnValue); } int32_t Scheduler::awaitThreadResult(uint32_t messageId) { - if (threadResults.count(messageId) == 0) { + faabric::util::SharedLock lock(mx); + auto it = threadResults.find(messageId); + if (it == threadResults.end()) { SPDLOG_ERROR("Thread {} not registered on this host", messageId); throw std::runtime_error("Awaiting unregistered thread"); } - - return threadResults[messageId].get_future().get(); + lock.unlock(); + return it->second.get_future().get(); } faabric::Message Scheduler::getFunctionResult(unsigned int messageId, diff --git a/src/snapshot/SnapshotRegistry.cpp b/src/snapshot/SnapshotRegistry.cpp index 1cde5e621..1fa343ec8 100644 --- a/src/snapshot/SnapshotRegistry.cpp +++ b/src/snapshot/SnapshotRegistry.cpp @@ -7,9 +7,11 @@ #include namespace faabric::snapshot { -faabric::util::SnapshotData& SnapshotRegistry::getSnapshot( +std::shared_ptr SnapshotRegistry::getSnapshot( const std::string& key) { + faabric::util::SharedLock lock(snapshotsMx); + if (key.empty()) { SPDLOG_ERROR("Attempting to get snapshot with empty key"); throw std::runtime_error("Getting snapshot with empty key"); @@ -30,7 +32,7 @@ bool SnapshotRegistry::snapshotExists(const std::string& key) void SnapshotRegistry::mapSnapshot(const std::string& key, uint8_t* target) { - faabric::util::SnapshotData d = getSnapshot(key); + auto d = getSnapshot(key); if (!faabric::util::isPageAligned((void*)target)) { SPDLOG_ERROR( @@ -39,13 +41,13 @@ void SnapshotRegistry::mapSnapshot(const std::string& key, uint8_t* target) "Mapping snapshot to non page-aligned address"); } - if (d.fd == 0) { + if (d->fd == 0) { SPDLOG_ERROR("Attempting to map non-restorable snapshot"); throw std::runtime_error("Mapping non-restorable snapshot"); } void* mmapRes = - mmap(target, d.size, PROT_WRITE, MAP_PRIVATE | MAP_FIXED, d.fd, 0); + mmap(target, d->size, PROT_WRITE, MAP_PRIVATE | MAP_FIXED, d->fd, 0); if (mmapRes == MAP_FAILED) { SPDLOG_ERROR( @@ -78,7 +80,7 @@ void SnapshotRegistry::doTakeSnapshot(const std::string& key, throw std::runtime_error("Taking snapshot size zero"); } - faabric::util::UniqueLock lock(snapshotsMx); + faabric::util::FullLock lock(snapshotsMx); if (snapshotExists(key) && !overwrite) { SPDLOG_TRACE("Skipping already existing snapshot {}", key); @@ -92,29 +94,32 @@ void SnapshotRegistry::doTakeSnapshot(const std::string& key, // Note - we only preserve the snapshot in the in-memory file, and do not // take ownership for the original data referenced in SnapshotData - snapshotMap[key] = data; + auto shared_data = + std::make_shared(std::move(data)); + snapshotMap[key] = shared_data; // Write to fd to be locally restorable if (locallyRestorable) { - writeSnapshotToFd(key); + writeSnapshotToFd(key, *shared_data); } } void SnapshotRegistry::deleteSnapshot(const std::string& key) { - faabric::util::UniqueLock lock(snapshotsMx); + faabric::util::FullLock lock(snapshotsMx); if (snapshotMap.count(key) == 0) { return; } - faabric::util::SnapshotData d = snapshotMap[key]; + auto d = snapshotMap[key]; // Note - the data referenced by the SnapshotData object is not owned by the // snapshot registry so we don't delete it here. We only remove the file // descriptor used for mapping memory - if (d.fd > 0) { - ::close(d.fd); + if (d->fd > 0) { + ::close(d->fd); + d->fd = 0; } snapshotMap.erase(key); @@ -122,7 +127,7 @@ void SnapshotRegistry::deleteSnapshot(const std::string& key) size_t SnapshotRegistry::getSnapshotCount() { - faabric::util::UniqueLock lock(snapshotsMx); + faabric::util::FullLock lock(snapshotsMx); return snapshotMap.size(); } @@ -134,36 +139,37 @@ SnapshotRegistry& getSnapshotRegistry() void SnapshotRegistry::clear() { + faabric::util::FullLock lock(snapshotsMx); for (auto p : snapshotMap) { - if (p.second.fd > 0) { - ::close(p.second.fd); + if (p.second->fd > 0) { + ::close(p.second->fd); } } snapshotMap.clear(); } -int SnapshotRegistry::writeSnapshotToFd(const std::string& key) +int SnapshotRegistry::writeSnapshotToFd(const std::string& key, + faabric::util::SnapshotData& data) { int fd = ::memfd_create(key.c_str(), 0); - faabric::util::SnapshotData snapData = getSnapshot(key); // Make the fd big enough - int ferror = ::ftruncate(fd, snapData.size); + int ferror = ::ftruncate(fd, data.size); if (ferror) { SPDLOG_ERROR("ferror call failed with error {}", ferror); throw std::runtime_error("Failed writing memory to fd (ftruncate)"); } // Write the data - ssize_t werror = ::write(fd, snapData.data, snapData.size); + ssize_t werror = ::write(fd, data.data, data.size); if (werror == -1) { SPDLOG_ERROR("Write call failed with error {}", werror); throw std::runtime_error("Failed writing memory to fd (write)"); } // Record the fd - getSnapshot(key).fd = fd; + data.fd = fd; SPDLOG_DEBUG("Wrote snapshot {} to fd {}", key, fd); return fd; diff --git a/src/snapshot/SnapshotServer.cpp b/src/snapshot/SnapshotServer.cpp index e864d9f6b..9e800c2ed 100644 --- a/src/snapshot/SnapshotServer.cpp +++ b/src/snapshot/SnapshotServer.cpp @@ -127,7 +127,7 @@ SnapshotServer::recvPushSnapshotDiffs(const uint8_t* buffer, size_t bufferSize) // Get the snapshot faabric::snapshot::SnapshotRegistry& reg = faabric::snapshot::getSnapshotRegistry(); - faabric::util::SnapshotData& snap = reg.getSnapshot(r->key()->str()); + auto snap = reg.getSnapshot(r->key()->str()); // Lock the function group if it exists if (groupId > 0 && @@ -138,7 +138,7 @@ SnapshotServer::recvPushSnapshotDiffs(const uint8_t* buffer, size_t bufferSize) // Iterate through the chunks passed in the request for (const auto* chunk : *r->chunks()) { - uint8_t* dest = snap.data + chunk->offset(); + uint8_t* dest = snap->data + chunk->offset(); SPDLOG_TRACE("Applying snapshot diff to {} at {}-{}", r->key()->str(), diff --git a/tests/dist/DistTestExecutor.cpp b/tests/dist/DistTestExecutor.cpp index 7f2782dbe..f1130c21b 100644 --- a/tests/dist/DistTestExecutor.cpp +++ b/tests/dist/DistTestExecutor.cpp @@ -66,12 +66,12 @@ void DistTestExecutor::restore(faabric::Message& msg) // Initialise the dummy memory and map to snapshot faabric::snapshot::SnapshotRegistry& reg = faabric::snapshot::getSnapshotRegistry(); - faabric::util::SnapshotData& snap = reg.getSnapshot(msg.snapshotkey()); + auto snap = reg.getSnapshot(msg.snapshotkey()); // Note this has to be mmapped to be page-aligned snapshotMemory = (uint8_t*)mmap( - nullptr, snap.size, PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); - snapshotSize = snap.size; + nullptr, snap->size, PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); + snapshotSize = snap->size; reg.mapSnapshot(msg.snapshotkey(), snapshotMemory); } diff --git a/tests/dist/scheduler/functions.cpp b/tests/dist/scheduler/functions.cpp index a91ad0405..8680964ee 100644 --- a/tests/dist/scheduler/functions.cpp +++ b/tests/dist/scheduler/functions.cpp @@ -67,7 +67,7 @@ int handleFakeDiffsFunction(faabric::scheduler::Executor* exec, faabric::snapshot::SnapshotRegistry& reg = faabric::snapshot::getSnapshotRegistry(); - faabric::util::SnapshotData& originalSnap = reg.getSnapshot(snapshotKey); + auto originalSnap = reg.getSnapshot(snapshotKey); faabric::util::SnapshotData updatedSnap = exec->snapshot(); // Add a single merge region to catch both diffs @@ -75,7 +75,7 @@ int handleFakeDiffsFunction(faabric::scheduler::Executor* exec, int offsetB = 100; std::vector inputBytes = faabric::util::stringToBytes(msgInput); - originalSnap.addMergeRegion( + originalSnap->addMergeRegion( 0, offsetB + inputBytes.size() + 10, faabric::util::SnapshotDataType::Raw, @@ -209,13 +209,12 @@ int handleFakeDiffsThreadedFunction( std::vector inputBytes = faabric::util::stringToBytes(msgInput); - faabric::util::SnapshotData& originalSnap = - reg.getSnapshot(snapshotKey); + auto originalSnap = reg.getSnapshot(snapshotKey); faabric::util::SnapshotData updatedSnap = exec->snapshot(); // Make sure it's captured by the region int regionLength = 20 + inputBytes.size(); - originalSnap.addMergeRegion( + originalSnap->addMergeRegion( regionOffset, regionLength, faabric::util::SnapshotDataType::Raw, diff --git a/tests/test/scheduler/test_executor.cpp b/tests/test/scheduler/test_executor.cpp index 022683996..ad8f67481 100644 --- a/tests/test/scheduler/test_executor.cpp +++ b/tests/test/scheduler/test_executor.cpp @@ -56,12 +56,12 @@ void TestExecutor::restore(faabric::Message& msg) // Initialise the dummy memory and map to snapshot faabric::snapshot::SnapshotRegistry& reg = faabric::snapshot::getSnapshotRegistry(); - faabric::util::SnapshotData& snap = reg.getSnapshot(msg.snapshotkey()); + auto snap = reg.getSnapshot(msg.snapshotkey()); // Note this has to be mmapped to be page-aligned - dummyMemorySize = snap.size; + dummyMemorySize = snap->size; dummyMemory = (uint8_t*)mmap( - nullptr, snap.size, PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); + nullptr, snap->size, PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); reg.mapSnapshot(msg.snapshotkey(), dummyMemory); } @@ -196,9 +196,8 @@ int32_t TestExecutor::executeTask( // Modify a page of the dummy memory uint8_t pageIdx = threadPoolIdx; - faabric::util::SnapshotData& snapData = - faabric::snapshot::getSnapshotRegistry().getSnapshot( - msg.snapshotkey()); + auto snapData = faabric::snapshot::getSnapshotRegistry().getSnapshot( + msg.snapshotkey()); // Avoid writing a zero here as the memory is already zeroed hence // it's not a change @@ -208,10 +207,10 @@ int32_t TestExecutor::executeTask( // Set up a merge region that should catch the diff size_t offset = (pageIdx * faabric::util::HOST_PAGE_SIZE); - snapData.addMergeRegion(offset, - data.size() + 10, - SnapshotDataType::Raw, - SnapshotMergeOperation::Overwrite); + snapData->addMergeRegion(offset, + data.size() + 10, + SnapshotDataType::Raw, + SnapshotMergeOperation::Overwrite); SPDLOG_DEBUG("TestExecutor modifying page {} of memory ({}-{})", pageIdx, @@ -777,17 +776,17 @@ TEST_CASE_METHOD(TestExecutorFixture, REQUIRE(faabric::snapshot::getSnapshotDiffPushes().empty()); // Check that we're not registering any dirty pages on the snapshot - faabric::util::SnapshotData& snap = reg.getSnapshot(snapshotKey); - REQUIRE(snap.getDirtyPages().empty()); + auto snap = reg.getSnapshot(snapshotKey); + REQUIRE(snap->getDirtyPages().empty()); // Now reset snapshot pushes of all kinds faabric::snapshot::clearMockSnapshotRequests(); // Make an edit to the snapshot memory and get the expected diffs - snap.data[0] = 9; - snap.data[(2 * faabric::util::HOST_PAGE_SIZE) + 1] = 9; + snap->data[0] = 9; + snap->data[(2 * faabric::util::HOST_PAGE_SIZE) + 1] = 9; std::vector expectedDiffs = - snap.getDirtyPages(); + snap->getDirtyPages(); REQUIRE(expectedDiffs.size() == 2); // Set up another function diff --git a/tests/test/snapshot/test_snapshot_client_server.cpp b/tests/test/snapshot/test_snapshot_client_server.cpp index a2097e1c2..7a9a55d99 100644 --- a/tests/test/snapshot/test_snapshot_client_server.cpp +++ b/tests/test/snapshot/test_snapshot_client_server.cpp @@ -99,14 +99,16 @@ TEST_CASE_METHOD(SnapshotClientServerFixture, // Check snapshots created in registry REQUIRE(reg.getSnapshotCount() == 2); - const SnapshotData& actualA = reg.getSnapshot(snapKeyA); - const SnapshotData& actualB = reg.getSnapshot(snapKeyB); + const auto actualA = reg.getSnapshot(snapKeyA); + const auto actualB = reg.getSnapshot(snapKeyB); - REQUIRE(actualA.size == snapA.size); - REQUIRE(actualB.size == snapB.size); + REQUIRE(actualA->size == snapA.size); + REQUIRE(actualB->size == snapB.size); - std::vector actualDataA(actualA.data, actualA.data + dataA.size()); - std::vector actualDataB(actualB.data, actualB.data + dataB.size()); + std::vector actualDataA(actualA->data, + actualA->data + dataA.size()); + std::vector actualDataB(actualB->data, + actualB->data + dataB.size()); REQUIRE(actualDataA == dataA); REQUIRE(actualDataB == dataB); diff --git a/tests/test/snapshot/test_snapshot_registry.cpp b/tests/test/snapshot/test_snapshot_registry.cpp index b68219628..2ae4e14cb 100644 --- a/tests/test/snapshot/test_snapshot_registry.cpp +++ b/tests/test/snapshot/test_snapshot_registry.cpp @@ -52,22 +52,22 @@ TEST_CASE_METHOD(SnapshotTestFixture, reg.takeSnapshot(keyB, snapB, false); reg.takeSnapshot(keyC, snapC); - SnapshotData actualA = reg.getSnapshot(keyA); - SnapshotData actualB = reg.getSnapshot(keyB); - SnapshotData actualC = reg.getSnapshot(keyC); + auto actualA = reg.getSnapshot(keyA); + auto actualB = reg.getSnapshot(keyB); + auto actualC = reg.getSnapshot(keyC); - REQUIRE(actualA.size == snapA.size); - REQUIRE(actualB.size == snapB.size); - REQUIRE(actualC.size == snapC.size); + REQUIRE(actualA->size == snapA.size); + REQUIRE(actualB->size == snapB.size); + REQUIRE(actualC->size == snapC.size); // Pointer equality here is good enough - REQUIRE(actualA.data == snapA.data); - REQUIRE(actualB.data == snapB.data); - REQUIRE(actualC.data == snapC.data); + REQUIRE(actualA->data == snapA.data); + REQUIRE(actualB->data == snapB.data); + REQUIRE(actualC->data == snapC.data); - REQUIRE(actualA.fd > 0); - REQUIRE(actualB.fd == 0); - REQUIRE(actualC.fd > 0); + REQUIRE(actualA->fd > 0); + REQUIRE(actualB->fd == 0); + REQUIRE(actualC->fd > 0); // Create regions onto which we will map the snapshots uint8_t* actualDataA = allocatePages(1); @@ -146,9 +146,9 @@ TEST_CASE_METHOD(SnapshotTestFixture, // Check existing snapshot is not overwritten reg.takeSnapshotIfNotExists(keyA, snapUpdateA, true); - SnapshotData snapAfterA = reg.getSnapshot(keyA); - REQUIRE(snapAfterA.data == snapBefore.data); - REQUIRE(snapAfterA.size == snapBefore.size); + auto snapAfterA = reg.getSnapshot(keyA); + REQUIRE(snapAfterA->data == snapBefore.data); + REQUIRE(snapAfterA->size == snapBefore.size); // Check new snapshot is still created reg.takeSnapshotIfNotExists(keyB, snapUpdateB, true); @@ -157,9 +157,9 @@ TEST_CASE_METHOD(SnapshotTestFixture, REQUIRE(reg.snapshotExists(keyB)); REQUIRE(reg.getSnapshotCount() == 2); - SnapshotData snapAfterB = reg.getSnapshot(keyB); - REQUIRE(snapAfterB.data == otherDataB.data()); - REQUIRE(snapAfterB.size == otherDataB.size()); + auto snapAfterB = reg.getSnapshot(keyB); + REQUIRE(snapAfterB->data == otherDataB.data()); + REQUIRE(snapAfterB->size == otherDataB.size()); } TEST_CASE_METHOD(SnapshotTestFixture, diff --git a/tests/test/transport/test_message_endpoint_client.cpp b/tests/test/transport/test_message_endpoint_client.cpp index 09ce5abda..80a11470f 100644 --- a/tests/test/transport/test_message_endpoint_client.cpp +++ b/tests/test/transport/test_message_endpoint_client.cpp @@ -13,6 +13,9 @@ using namespace faabric::transport; namespace tests { +// These tests are unstable under ThreadSanitizer +#if !(defined(__has_feature) && __has_feature(thread_sanitizer)) + TEST_CASE_METHOD(SchedulerTestFixture, "Test send/recv one message", "[transport]") @@ -142,8 +145,6 @@ TEST_CASE_METHOD(SchedulerTestFixture, } } -// This test hangs ThreadSanitizer -#if !(defined(__has_feature) && __has_feature(thread_sanitizer)) TEST_CASE_METHOD(SchedulerTestFixture, "Test send/recv many messages from many clients", "[transport]") @@ -183,7 +184,6 @@ TEST_CASE_METHOD(SchedulerTestFixture, } } } -#endif TEST_CASE_METHOD(SchedulerTestFixture, "Test can't set invalid send/recv timeouts", @@ -223,4 +223,7 @@ TEST_CASE_METHOD(SchedulerTestFixture, REQUIRE_THROWS(SyncSendMessageEndpoint(LOCALHOST, TEST_PORT + 10, -1)); } } + +#endif + } diff --git a/tests/utils/fixtures.h b/tests/utils/fixtures.h index e19973b94..c75a67963 100644 --- a/tests/utils/fixtures.h +++ b/tests/utils/fixtures.h @@ -145,8 +145,8 @@ class SnapshotTestFixture void removeSnapshot(const std::string& key, int nPages) { - faabric::util::SnapshotData snap = reg.getSnapshot(key); - deallocatePages(snap.data, nPages); + auto snap = reg.getSnapshot(key); + deallocatePages(snap->data, nPages); reg.deleteSnapshot(key); }