diff --git a/include/faabric/scheduler/Scheduler.h b/include/faabric/scheduler/Scheduler.h index 423e1f860..f151a1e0a 100644 --- a/include/faabric/scheduler/Scheduler.h +++ b/include/faabric/scheduler/Scheduler.h @@ -65,6 +65,8 @@ class Executor std::atomic claimed = false; + std::atomic pendingSnapshotPush = false; + std::atomic executingTaskCount = 0; std::mutex threadsMutex; @@ -117,9 +119,9 @@ class Scheduler void setThreadResult(const faabric::Message& msg, int32_t returnValue); - void setThreadResult(const faabric::Message& msg, - int32_t returnValue, - const std::vector& diffs); + void pushSnapshotDiffs( + const faabric::Message& msg, + const std::vector& diffs); void setThreadResultLocally(uint32_t msgId, int32_t returnValue); diff --git a/include/faabric/snapshot/SnapshotClient.h b/include/faabric/snapshot/SnapshotClient.h index 3129755ce..77225cd54 100644 --- a/include/faabric/snapshot/SnapshotClient.h +++ b/include/faabric/snapshot/SnapshotClient.h @@ -20,11 +20,7 @@ getSnapshotDiffPushes(); std::vector> getSnapshotDeletes(); -std::vector>>> +std::vector>> getThreadResults(); void clearMockSnapshotRequests(); @@ -50,12 +46,6 @@ class SnapshotClient final : public faabric::transport::MessageEndpointClient void pushThreadResult(uint32_t messageId, int returnValue); - void pushThreadResult( - uint32_t messageId, - int returnValue, - const std::string& snapshotKey, - const std::vector& diffs); - private: void sendHeader(faabric::snapshot::SnapshotCalls call); }; diff --git a/include/faabric/snapshot/SnapshotServer.h b/include/faabric/snapshot/SnapshotServer.h index 9919c282b..6cb6a88ed 100644 --- a/include/faabric/snapshot/SnapshotServer.h +++ b/include/faabric/snapshot/SnapshotServer.h @@ -30,10 +30,5 @@ class SnapshotServer final : public faabric::transport::MessageEndpointServer void recvDeleteSnapshot(const uint8_t* buffer, size_t bufferSize); void recvThreadResult(const uint8_t* buffer, size_t bufferSize); - - private: - void applyDiffsToSnapshot( - const std::string& snapshotKey, - const flatbuffers::Vector>* diffs); }; } diff --git a/src/flat/faabric.fbs b/src/flat/faabric.fbs index 80bf93568..9a9509f88 100644 --- a/src/flat/faabric.fbs +++ b/src/flat/faabric.fbs @@ -20,6 +20,4 @@ table SnapshotDiffPushRequest { table ThreadResultRequest { message_id:int; return_value:int; - key:string; - chunks:[SnapshotDiffChunk]; } diff --git a/src/scheduler/Executor.cpp b/src/scheduler/Executor.cpp index 8fb228cee..4b93d0034 100644 --- a/src/scheduler/Executor.cpp +++ b/src/scheduler/Executor.cpp @@ -106,8 +106,7 @@ void Executor::executeTasks(std::vector msgIdxs, if (isSnapshot && !alreadyRestored) { if ((!isMaster && isThreads) || !isThreads) { - SPDLOG_DEBUG( - "Performing snapshot restore {} [{}]", funcStr, snapshotKey); + SPDLOG_DEBUG("Restoring {} from snapshot {}", funcStr, snapshotKey); lastSnapshot = snapshotKey; restore(firstMsg); } else { @@ -124,6 +123,7 @@ void Executor::executeTasks(std::vector msgIdxs, // Note this must be done after the restore has happened if (isThreads && isSnapshot) { faabric::util::resetDirtyTracking(); + pendingSnapshotPush = true; } // Set executing task count @@ -161,7 +161,7 @@ void Executor::threadPoolThread(int threadPoolIdx) SPDLOG_DEBUG("Thread pool thread {}:{} starting up", id, threadPoolIdx); auto& sch = faabric::scheduler::getScheduler(); - auto& conf = faabric::util::getSystemConfig(); + const auto& conf = faabric::util::getSystemConfig(); bool selfShutdown = false; @@ -196,11 +196,13 @@ void Executor::threadPoolThread(int threadPoolIdx) assert(req->messages_size() >= msgIdx + 1); faabric::Message& msg = req->mutable_messages()->at(msgIdx); - SPDLOG_TRACE("Thread {}:{} executing task {} ({})", + bool isThreads = req->type() == faabric::BatchExecuteRequest::THREADS; + SPDLOG_TRACE("Thread {}:{} executing task {} ({}, thread={})", id, threadPoolIdx, msgIdx, - msg.id()); + msg.id(), + isThreads); int32_t returnValue; try { @@ -208,8 +210,10 @@ void Executor::threadPoolThread(int threadPoolIdx) } catch (const std::exception& ex) { returnValue = 1; - msg.set_outputdata(fmt::format( - "Task {} threw exception. What: {}", msg.id(), ex.what())); + std::string errorMessage = fmt::format( + "Task {} threw exception. What: {}", msg.id(), ex.what()); + SPDLOG_ERROR(errorMessage); + msg.set_outputdata(errorMessage); } // Set the return value @@ -221,21 +225,21 @@ void Executor::threadPoolThread(int threadPoolIdx) bool isLastTask = oldTaskCount == 1; SPDLOG_TRACE("Task {} finished by thread {}:{} ({} left)", - msg.id(), + faabric::util::funcToString(msg, true), id, threadPoolIdx, - executingTaskCount); + oldTaskCount - 1); - // Get snapshot diffs _before_ we reset the executor - bool isThreads = req->type() == faabric::BatchExecuteRequest::THREADS; - std::vector diffs; - if (isLastTask && isThreads) { + // Handle snapshot diffs _before_ we reset the executor + if (isLastTask && pendingSnapshotPush) { // Get diffs faabric::util::SnapshotData d = snapshot(); - diffs = d.getDirtyPages(); + std::vector diffs = d.getDirtyPages(); + sch.pushSnapshotDiffs(msg, diffs); - // Reset dirty page tracking now that we've got the diffs + // Reset dirty page tracking now that we've pushed the diffs faabric::util::resetDirtyTracking(); + pendingSnapshotPush = false; } // If this batch is finished, reset the executor and release its claim. @@ -255,14 +259,7 @@ void Executor::threadPoolThread(int threadPoolIdx) // on its result to continue execution, therefore must be done once the // executor has been reset, otherwise the executor may not be reused for // a repeat invocation. - if (isLastTask && isThreads) { - // Send diffs along with thread result - SPDLOG_DEBUG("Task {} finished, returning {} snapshot diffs", - msg.id(), - diffs.size()); - - sch.setThreadResult(msg, returnValue, diffs); - } else if (isThreads) { + if (isThreads) { // Set non-final thread result sch.setThreadResult(msg, returnValue); } else { diff --git a/src/scheduler/FunctionCallServer.cpp b/src/scheduler/FunctionCallServer.cpp index dcede0518..5b33627ff 100644 --- a/src/scheduler/FunctionCallServer.cpp +++ b/src/scheduler/FunctionCallServer.cpp @@ -72,6 +72,7 @@ void FunctionCallServer::recvExecuteFunctions(const uint8_t* buffer, PARSE_MSG(faabric::BatchExecuteRequest, buffer, bufferSize) // This host has now been told to execute these functions no matter what + // TODO - avoid this copy scheduler.callFunctions(std::make_shared(msg), true); } diff --git a/src/scheduler/Scheduler.cpp b/src/scheduler/Scheduler.cpp index 0869a9be0..4b13f1200 100644 --- a/src/scheduler/Scheduler.cpp +++ b/src/scheduler/Scheduler.cpp @@ -257,40 +257,43 @@ std::vector Scheduler::callFunctions( // This ensures everything is up to date, and we don't have to // maintain different records of which hosts hold which updates. faabric::util::SnapshotData snapshotData; - std::vector snapshotDiffs; std::string snapshotKey = firstMsg.snapshotkey(); bool snapshotNeeded = req->type() == req->THREADS || req->type() == req->PROCESSES; - if (snapshotNeeded && snapshotKey.empty()) { - SPDLOG_ERROR("No snapshot provided for {}", funcStr); - throw std::runtime_error( - "Empty snapshot for distributed threads/ processes"); - } - if (snapshotNeeded) { + if (snapshotKey.empty()) { + SPDLOG_ERROR("No snapshot provided for {}", funcStr); + throw std::runtime_error( + "Empty snapshot for distributed threads/ processes"); + } + snapshotData = faabric::snapshot::getSnapshotRegistry().getSnapshot(snapshotKey); - snapshotDiffs = snapshotData.getDirtyPages(); - - // Do the snapshot diff pushing - if (!snapshotDiffs.empty()) { - for (const auto& h : thisRegisteredHosts) { - SPDLOG_DEBUG("Pushing {} snapshot diffs for {} to {}", - snapshotDiffs.size(), - funcStr, - h); - SnapshotClient& c = getSnapshotClient(h); - c.pushSnapshotDiffs(snapshotKey, snapshotDiffs); + + if (!thisRegisteredHosts.empty()) { + std::vector snapshotDiffs = + snapshotData.getDirtyPages(); + + // Do the snapshot diff pushing + if (!snapshotDiffs.empty()) { + for (const auto& h : thisRegisteredHosts) { + SPDLOG_DEBUG("Pushing {} snapshot diffs for {} to {}", + snapshotDiffs.size(), + funcStr, + h); + SnapshotClient& c = getSnapshotClient(h); + c.pushSnapshotDiffs(snapshotKey, snapshotDiffs); + } } - } - // Now reset the dirty page tracking, as we want the next batch of - // diffs to contain everything from now on (including the updates - // sent back from all the threads) - SPDLOG_DEBUG("Resetting dirty tracking after pushing diffs {}", - funcStr); - faabric::util::resetDirtyTracking(); + // Now reset the dirty page tracking, as we want the next batch + // of diffs to contain everything from now on (including the + // updates sent back from all the threads) + SPDLOG_DEBUG("Resetting dirty tracking after pushing diffs {}", + funcStr); + faabric::util::resetDirtyTracking(); + } } // Work out how many we can handle locally @@ -513,6 +516,8 @@ int Scheduler::scheduleFunctionsOnHost( faabric::util::batchExecFactory(); hostRequest->set_snapshotkey(req->snapshotkey()); hostRequest->set_type(req->type()); + hostRequest->set_subtype(req->subtype()); + hostRequest->set_contextdata(req->contextdata()); // Add messages int nOnThisHost = std::min(available, remainder); @@ -699,27 +704,25 @@ void Scheduler::registerThread(uint32_t msgId) void Scheduler::setThreadResult(const faabric::Message& msg, int32_t returnValue) { - std::vector empty; - setThreadResult(msg, returnValue, empty); + bool isMaster = msg.masterhost() == conf.endpointHost; + + if (isMaster) { + setThreadResultLocally(msg.id(), returnValue); + } else { + SnapshotClient& c = getSnapshotClient(msg.masterhost()); + c.pushThreadResult(msg.id(), returnValue); + } } -void Scheduler::setThreadResult( +void Scheduler::pushSnapshotDiffs( const faabric::Message& msg, - int32_t returnValue, const std::vector& diffs) { bool isMaster = msg.masterhost() == conf.endpointHost; - if (isMaster) { - setThreadResultLocally(msg.id(), returnValue); - } else { + if (!isMaster && !diffs.empty()) { SnapshotClient& c = getSnapshotClient(msg.masterhost()); - - if (diffs.empty()) { - c.pushThreadResult(msg.id(), returnValue); - } else { - c.pushThreadResult(msg.id(), returnValue, msg.snapshotkey(), diffs); - } + c.pushSnapshotDiffs(msg.snapshotkey(), diffs); } } diff --git a/src/snapshot/SnapshotClient.cpp b/src/snapshot/SnapshotClient.cpp index deff086dd..c16176cd6 100644 --- a/src/snapshot/SnapshotClient.cpp +++ b/src/snapshot/SnapshotClient.cpp @@ -23,12 +23,7 @@ static std::vector< static std::vector> snapshotDeletes; -static std::vector< - std::pair>>> +static std::vector>> threadResults; std::vector> @@ -48,12 +43,7 @@ std::vector> getSnapshotDeletes() return snapshotDeletes; } -std::vector>>> -getThreadResults() +std::vector>> getThreadResults() { return threadResults; } @@ -79,7 +69,12 @@ SnapshotClient::SnapshotClient(const std::string& hostIn) void SnapshotClient::pushSnapshot(const std::string& key, const faabric::util::SnapshotData& data) { - SPDLOG_DEBUG("Pushing snapshot {} to {}", key, host); + if (data.size == 0) { + SPDLOG_ERROR("Cannot push snapshot {} with size zero to {}", key, host); + throw std::runtime_error("Pushing snapshot with zero size"); + } + + SPDLOG_DEBUG("Pushing snapshot {} to {} ({} bytes)", key, host, data.size); if (faabric::util::isMockMode()) { faabric::util::UniqueLock lock(mockMutex); @@ -154,57 +149,20 @@ void SnapshotClient::deleteSnapshot(const std::string& key) } void SnapshotClient::pushThreadResult(uint32_t messageId, int returnValue) -{ - std::vector empty; - pushThreadResult(messageId, returnValue, "", empty); -} - -void SnapshotClient::pushThreadResult( - uint32_t messageId, - int returnValue, - const std::string& snapshotKey, - const std::vector& diffs) { if (faabric::util::isMockMode()) { faabric::util::UniqueLock lock(mockMutex); - threadResults.emplace_back(std::make_pair( - host, std::make_tuple(messageId, returnValue, snapshotKey, diffs))); + threadResults.emplace_back( + std::make_pair(host, std::make_pair(messageId, returnValue))); } else { flatbuffers::FlatBufferBuilder mb; flatbuffers::Offset requestOffset; - if (!diffs.empty()) { - SPDLOG_DEBUG( - "Sending thread result for {} to {} (plus {} snapshot diffs)", - messageId, - host, - diffs.size()); - - // Create objects for the diffs - std::vector> diffsFbVector; - for (const auto& d : diffs) { - auto dataOffset = mb.CreateVector(d.data, d.size); - auto chunk = CreateSnapshotDiffChunk(mb, d.offset, dataOffset); - diffsFbVector.push_back(chunk); - } - - // Create message with diffs - auto diffsOffset = mb.CreateVector(diffsFbVector); - - auto keyOffset = mb.CreateString(snapshotKey); - requestOffset = CreateThreadResultRequest( - mb, messageId, returnValue, keyOffset, diffsOffset); - } else { - SPDLOG_DEBUG( - "Sending thread result for {} to {} (with no snapshot diffs)", - messageId, - host); - - // Create message without diffs - requestOffset = - CreateThreadResultRequest(mb, messageId, returnValue); - } + SPDLOG_DEBUG("Sending thread result for {} to {}", messageId, host); + + // Create message without diffs + requestOffset = CreateThreadResultRequest(mb, messageId, returnValue); mb.Finish(requestOffset); SEND_FB_MSG_ASYNC(SnapshotCalls::ThreadResult, mb) diff --git a/src/snapshot/SnapshotRegistry.cpp b/src/snapshot/SnapshotRegistry.cpp index b5e7a53df..e754a6a58 100644 --- a/src/snapshot/SnapshotRegistry.cpp +++ b/src/snapshot/SnapshotRegistry.cpp @@ -50,6 +50,16 @@ void SnapshotRegistry::takeSnapshot(const std::string& key, faabric::util::SnapshotData data, bool locallyRestorable) { + if (data.size == 0) { + SPDLOG_ERROR("Cannot take snapshot {} of size zero", key); + throw std::runtime_error("Taking snapshot size zero"); + } + + SPDLOG_TRACE("Registering snapshot {} size {} (restorable={})", + key, + data.size, + locallyRestorable); + // Note - we only preserve the snapshot in the in-memory file, and do not // take ownership for the original data referenced in SnapshotData faabric::util::UniqueLock lock(snapshotsMx); diff --git a/src/snapshot/SnapshotServer.cpp b/src/snapshot/SnapshotServer.cpp index be6bf086f..aaf9a2f8a 100644 --- a/src/snapshot/SnapshotServer.cpp +++ b/src/snapshot/SnapshotServer.cpp @@ -60,6 +60,11 @@ std::unique_ptr SnapshotServer::recvPushSnapshot( const SnapshotPushRequest* r = flatbuffers::GetRoot(buffer); + if (r->contents()->size() == 0) { + SPDLOG_ERROR("Received shapshot {} with zero size", r->key()->c_str()); + throw std::runtime_error("Received snapshot with zero size"); + } + SPDLOG_DEBUG("Receiving shapshot {} (size {})", r->key()->c_str(), r->contents()->size()); @@ -90,12 +95,6 @@ void SnapshotServer::recvThreadResult(const uint8_t* buffer, size_t bufferSize) const ThreadResultRequest* r = flatbuffers::GetRoot(buffer); - // Apply snapshot diffs *first* (these must be applied before other threads - // can continue) - if (r->chunks() != nullptr && r->chunks()->size() > 0) { - applyDiffsToSnapshot(r->key()->str(), r->chunks()); - } - SPDLOG_DEBUG("Receiving thread result {} for message {}", r->return_value(), r->message_id()); @@ -110,30 +109,22 @@ SnapshotServer::recvPushSnapshotDiffs(const uint8_t* buffer, size_t bufferSize) const SnapshotDiffPushRequest* r = flatbuffers::GetRoot(buffer); - applyDiffsToSnapshot(r->key()->str(), r->chunks()); - - // Send response - return std::make_unique(); -} - -void SnapshotServer::applyDiffsToSnapshot( - const std::string& snapshotKey, - const flatbuffers::Vector>* diffs) -{ SPDLOG_DEBUG( - "Applying {} diffs to snapshot {}", diffs->size(), snapshotKey); + "Applying {} diffs to snapshot {}", r->chunks()->size(), r->key()->str()); // Get the snapshot faabric::snapshot::SnapshotRegistry& reg = faabric::snapshot::getSnapshotRegistry(); - faabric::util::SnapshotData& snap = reg.getSnapshot(snapshotKey); + faabric::util::SnapshotData& snap = reg.getSnapshot(r->key()->str()); // Copy diffs to snapshot - for (const auto* r : *diffs) { + for (const auto* r : *r->chunks()) { const uint8_t* chunkPtr = r->data()->data(); uint8_t* dest = snap.data + r->offset(); std::memcpy(dest, chunkPtr, r->data()->size()); } + // Send response + return std::make_unique(); } void SnapshotServer::recvDeleteSnapshot(const uint8_t* buffer, diff --git a/tests/dist/scheduler/functions.cpp b/tests/dist/scheduler/functions.cpp index 2ff080c50..f0dec0075 100644 --- a/tests/dist/scheduler/functions.cpp +++ b/tests/dist/scheduler/functions.cpp @@ -4,12 +4,15 @@ #include "DistTestExecutor.h" #include "init.h" +#include + #include #include #include #include #include #include +#include namespace tests { @@ -74,6 +77,108 @@ int handleFakeDiffsFunction(faabric::scheduler::Executor* exec, return 123; } +int handleFakeDiffsThreadedFunction( + faabric::scheduler::Executor* exec, + int threadPoolIdx, + int msgIdx, + std::shared_ptr req) +{ + bool isThread = req->type() == faabric::BatchExecuteRequest::THREADS; + faabric::Message& msg = req->mutable_messages()->at(msgIdx); + std::string snapshotKey = "fake-diffs-threaded-snap"; + std::string msgInput = msg.inputdata(); + + // This function creates a snapshot, then spawns some child threads that + // will modify the shared memory. It then awaits the results and checks that + // the modifications are synced back to the original host. + if (!isThread) { + int nThreads = std::stoi(msgInput); + + // Set up the snapshot + size_t snapSize = (nThreads * 4) * faabric::util::HOST_PAGE_SIZE; + uint8_t* snapMemory = (uint8_t*)mmap( + nullptr, snapSize, PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); + + faabric::util::SnapshotData snap; + snap.data = snapMemory; + snap.size = snapSize; + + faabric::snapshot::SnapshotRegistry& reg = + faabric::snapshot::getSnapshotRegistry(); + reg.takeSnapshot(snapshotKey, snap); + + auto req = + faabric::util::batchExecFactory(msg.user(), msg.function(), nThreads); + req->set_type(faabric::BatchExecuteRequest::THREADS); + + for (int i = 0; i < nThreads; i++) { + auto& m = req->mutable_messages()->at(i); + m.set_appindex(i); + m.set_inputdata(std::string("thread_" + std::to_string(i))); + m.set_snapshotkey(snapshotKey); + } + + // Dispatch the message, we expect them all to be executed other hosts + std::string thisHost = faabric::util::getSystemConfig().endpointHost; + faabric::scheduler::Scheduler& sch = faabric::scheduler::getScheduler(); + std::vector executedHosts = sch.callFunctions(req); + + bool rightHosts = true; + for (auto& h : executedHosts) { + if (h == thisHost) { + SPDLOG_ERROR("Expected child threads to be executed on other " + "hosts (this host {}, actual host {})", + thisHost, + h); + rightHosts = false; + } + } + + if (!rightHosts) { + return 111; + } + + // Wait for the threads + for (auto& m : req->messages()) { + sch.awaitThreadResult(m.id()); + } + + // Check that the changes have been made to the snapshot memory + bool diffsApplied = true; + for (int i = 0; i < nThreads; i++) { + uint32_t offset = 2 * i * faabric::util::HOST_PAGE_SIZE; + std::string expectedData("thread_" + std::to_string(i)); + auto* charPtr = reinterpret_cast(snapMemory + offset); + std::string actual(charPtr); + + if (actual != expectedData) { + SPDLOG_ERROR( + "Diff not as expected. {} != {}", actual, expectedData); + diffsApplied = false; + } + } + + if (!diffsApplied) { + return 222; + } + + } else { + int idx = msg.appindex(); + uint32_t offset = 2 * idx * faabric::util::HOST_PAGE_SIZE; + + // Modify the executor's memory + std::vector inputBytes = + faabric::util::stringToBytes(msgInput); + + faabric::util::SnapshotData snap = exec->snapshot(); + std::memcpy(snap.data + offset, inputBytes.data(), inputBytes.size()); + + return 0; + } + + return 333; +} + void registerSchedulerTestFunctions() { registerDistTestExecutorCallback("threads", "simple", handleSimpleThread); @@ -82,5 +187,8 @@ void registerSchedulerTestFunctions() registerDistTestExecutorCallback( "snapshots", "fake-diffs", handleFakeDiffsFunction); + + registerDistTestExecutorCallback( + "snapshots", "fake-diffs-threaded", handleFakeDiffsThreadedFunction); } } diff --git a/tests/dist/scheduler/test_snapshots.cpp b/tests/dist/scheduler/test_snapshots.cpp index 9298c5ce7..88285aecf 100644 --- a/tests/dist/scheduler/test_snapshots.cpp +++ b/tests/dist/scheduler/test_snapshots.cpp @@ -22,23 +22,23 @@ TEST_CASE_METHOD(DistTestsFixture, "Check snapshots sent back from worker are applied", "[snapshots]") { - // Set up the snapshot + std::string user = "snapshots"; + std::string function = "fake-diffs"; std::string snapshotKey = "dist-snap-check"; size_t snapSize = 2 * faabric::util::HOST_PAGE_SIZE; uint8_t* snapMemory = (uint8_t*)mmap( nullptr, snapSize, PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); + // Set up snapshot faabric::util::SnapshotData snap; snap.data = snapMemory; snap.size = snapSize; - reg.takeSnapshot(snapshotKey, snap); - // Invoke the function that ought to send back some snapshot diffs that - // should be applied + // Set up the message std::shared_ptr req = - faabric::util::batchExecFactory("snapshots", "fake-diffs", 1); + faabric::util::batchExecFactory(user, function, 1); req->set_type(faabric::BatchExecuteRequest::THREADS); // Set up some input data @@ -73,4 +73,31 @@ TEST_CASE_METHOD(DistTestsFixture, REQUIRE(actualA == expectedA); REQUIRE(actualB == expectedB); } + +TEST_CASE_METHOD(DistTestsFixture, + "Check snapshots sent back from child threads", + "[snapshots]") +{ + std::string user = "snapshots"; + std::string function = "fake-diffs-threaded"; + int nThreads = 3; + + std::shared_ptr req = + faabric::util::batchExecFactory(user, function, 1); + faabric::Message& m = req->mutable_messages()->at(0); + m.set_inputdata(std::to_string(nThreads)); + + // Force the function itself to be executed on this host, but its child + // threads on another host + faabric::HostResources res; + res.set_slots(1); + sch.setThisHostResources(res); + + std::vector expectedHosts = { MASTER_IP }; + std::vector executedHosts = sch.callFunctions(req); + REQUIRE(expectedHosts == executedHosts); + + faabric::Message actualResult = sch.getFunctionResult(m.id(), 10000); + REQUIRE(actualResult.returnvalue() == 333); +} } diff --git a/tests/dist/server.cpp b/tests/dist/server.cpp index 64adbdda6..733111946 100644 --- a/tests/dist/server.cpp +++ b/tests/dist/server.cpp @@ -15,6 +15,12 @@ int main() faabric::transport::initGlobalMessageContext(); tests::initDistTests(); + int slots = 4; + SPDLOG_INFO("Forcing distributed test server to have {} slots", slots); + faabric::HostResources res; + res.set_slots(slots); + faabric::scheduler::getScheduler().setThisHostResources(res); + // WARNING: All 0MQ operations must be contained within their own scope so // that all sockets are destructed before the context is closed. { diff --git a/tests/test/scheduler/test_executor.cpp b/tests/test/scheduler/test_executor.cpp index 806bf3cc7..16c8ba620 100644 --- a/tests/test/scheduler/test_executor.cpp +++ b/tests/test/scheduler/test_executor.cpp @@ -489,7 +489,7 @@ TEST_CASE_METHOD(TestExecutorFixture, for (auto& r : results) { REQUIRE(r.first == thisHost); auto args = r.second; - sch.setThreadResultLocally(std::get<0>(args), std::get<1>(args)); + sch.setThreadResultLocally(args.first, args.second); } // Rejoin the other thread @@ -531,8 +531,8 @@ TEST_CASE_METHOD(TestExecutorFixture, std::vector actualMessageIds; for (auto& p : actual) { REQUIRE(p.first == otherHost); - uint32_t messageId = std::get<0>(p.second); - int32_t returnValue = std::get<1>(p.second); + uint32_t messageId = p.second.first; + int32_t returnValue = p.second.second; REQUIRE(returnValue == messageId / 100); actualMessageIds.push_back(messageId); @@ -720,22 +720,11 @@ TEST_CASE_METHOD(TestExecutorFixture, auto actualResults = faabric::snapshot::getThreadResults(); REQUIRE(actualResults.size() == nThreads); - // Check only one has diffs attached - std::vector diffList; - for (const auto& res : actualResults) { - REQUIRE(res.first == otherHost); - - std::vector thisDiffs = - std::get<3>(res.second); - if (thisDiffs.empty()) { - continue; - } - - if (!diffList.empty()) { - FAIL("Found more than one thread result with diffs"); - } - diffList = thisDiffs; - } + // Check diffs also sent + auto diffs = faabric::snapshot::getSnapshotDiffPushes(); + REQUIRE(diffs.size() == 1); + REQUIRE(diffs.at(0).first == otherHost); + std::vector diffList = diffs.at(0).second; // Each thread should have edited one page, check diffs are correct REQUIRE(diffList.size() == nThreads); diff --git a/tests/test/scheduler/test_scheduler.cpp b/tests/test/scheduler/test_scheduler.cpp index 2a0e41c44..62919831d 100644 --- a/tests/test/scheduler/test_scheduler.cpp +++ b/tests/test/scheduler/test_scheduler.cpp @@ -182,17 +182,25 @@ TEST_CASE_METHOD(SlowExecutorFixture, "Test batch scheduling", "[scheduler]") { std::string expectedSnapshot; faabric::BatchExecuteRequest::BatchExecuteType execMode; + int32_t expectedSubType; + std::string expectedContextData; SECTION("Threads") { execMode = faabric::BatchExecuteRequest::THREADS; expectedSnapshot = "threadSnap"; + + expectedSubType = 123; + expectedContextData = "thread context"; } SECTION("Processes") { execMode = faabric::BatchExecuteRequest::PROCESSES; expectedSnapshot = "procSnap"; + + expectedSubType = 345; + expectedContextData = "proc context"; } SECTION("Functions") @@ -245,6 +253,8 @@ TEST_CASE_METHOD(SlowExecutorFixture, "Test batch scheduling", "[scheduler]") std::shared_ptr reqOne = faabric::util::batchExecFactory("foo", "bar", nCallsOne); reqOne->set_type(execMode); + reqOne->set_subtype(expectedSubType); + reqOne->set_contextdata(expectedContextData); for (int i = 0; i < nCallsOne; i++) { // Set snapshot key @@ -307,9 +317,13 @@ TEST_CASE_METHOD(SlowExecutorFixture, "Test batch scheduling", "[scheduler]") // Check the message is dispatched to the other host auto batchRequestsOne = faabric::scheduler::getBatchRequests(); REQUIRE(batchRequestsOne.size() == 1); + auto batchRequestOne = batchRequestsOne.at(0); REQUIRE(batchRequestOne.first == otherHost); REQUIRE(batchRequestOne.second->messages_size() == nCallsOffloadedOne); + REQUIRE(batchRequestOne.second->type() == execMode); + REQUIRE(batchRequestOne.second->subtype() == expectedSubType); + REQUIRE(batchRequestOne.second->contextdata() == expectedContextData); // Clear mocks faabric::scheduler::clearMockRequests(); @@ -773,37 +787,14 @@ TEST_CASE_METHOD(SlowExecutorFixture, sch.setThreadResult(msg, returnValue); } - SECTION("With diffs") - { - snapshotKey = "blahblah"; - msg.set_snapshotkey(snapshotKey); - - std::vector diffDataA(10, 1); - std::vector diffDataB(20, 2); - - diffs = { - { 0, diffDataA.data(), diffDataA.size() }, - { 50, diffDataB.data(), diffDataB.size() }, - }; - - // Set the thread result - sch.setThreadResult(msg, returnValue, diffs); - } - - // Check the results have been pushed along with the thread result auto actualResults = faabric::snapshot::getThreadResults(); REQUIRE(actualResults.size() == 1); REQUIRE(actualResults.at(0).first == "otherHost"); - auto actualTuple = actualResults.at(0).second; - REQUIRE(std::get<0>(actualTuple) == msg.id()); - REQUIRE(std::get<1>(actualTuple) == returnValue); - REQUIRE(std::get<2>(actualTuple) == snapshotKey); - - std::vector actualDiffs = - std::get<3>(actualTuple); - REQUIRE(actualDiffs.size() == diffs.size()); + auto actualPair = actualResults.at(0).second; + REQUIRE(actualPair.first == msg.id()); + REQUIRE(actualPair.second == returnValue); } TEST_CASE_METHOD(DummyExecutorFixture, "Test executor reuse", "[scheduler]") diff --git a/tests/test/scheduler/test_snapshot_client_server.cpp b/tests/test/snapshot/test_snapshot_client_server.cpp similarity index 78% rename from tests/test/scheduler/test_snapshot_client_server.cpp rename to tests/test/snapshot/test_snapshot_client_server.cpp index b6b3ef8db..72bddd6a4 100644 --- a/tests/test/scheduler/test_snapshot_client_server.cpp +++ b/tests/test/snapshot/test_snapshot_client_server.cpp @@ -37,7 +37,7 @@ class SnapshotClientServerFixture TEST_CASE_METHOD(SnapshotClientServerFixture, "Test pushing and deleting snapshots", - "[scheduler]") + "[snapshot]") { // Check nothing to start with REQUIRE(reg.getSnapshotCount() == 0); @@ -91,17 +91,9 @@ void checkDiffsApplied(const uint8_t* snapBase, } TEST_CASE_METHOD(SnapshotClientServerFixture, - "Test set thread result", - "[scheduler]") + "Test push snapshot diffs", + "[snapshot]") { - // Register threads on this host - int threadIdA = 123; - int threadIdB = 345; - int returnValueA = 88; - int returnValueB = 99; - sch.registerThread(threadIdA); - sch.registerThread(threadIdB); - // Set up a snapshot faabric::util::SnapshotData snap; snap.size = 5 * faabric::util::HOST_PAGE_SIZE; @@ -119,34 +111,38 @@ TEST_CASE_METHOD(SnapshotClientServerFixture, std::vector diffsA; std::vector diffsB; - SECTION("Without diffs") - { - cli.pushThreadResult(threadIdA, returnValueA); - cli.pushThreadResult(threadIdB, returnValueB); - } + faabric::util::SnapshotDiff diffA1(5, diffDataA1.data(), diffDataA1.size()); + faabric::util::SnapshotDiff diffA2( + 2 * faabric::util::HOST_PAGE_SIZE, diffDataA2.data(), diffDataA2.size()); + diffsA = { diffA1, diffA2 }; + cli.pushSnapshotDiffs(snapKey, diffsA); - SECTION("Empty diffs") - { - cli.pushThreadResult(threadIdA, returnValueA, snapKey, diffsA); - cli.pushThreadResult(threadIdB, returnValueB, snapKey, diffsB); - } + faabric::util::SnapshotDiff diffB( + 3 * faabric::util::HOST_PAGE_SIZE, diffDataB.data(), diffDataB.size()); + diffsB = { diffB }; + cli.pushSnapshotDiffs(snapKey, diffsB); - SECTION("With diffs") - { - faabric::util::SnapshotDiff diffA1( - 5, diffDataA1.data(), diffDataA1.size()); - faabric::util::SnapshotDiff diffA2(2 * faabric::util::HOST_PAGE_SIZE, - diffDataA2.data(), - diffDataA2.size()); - diffsA = { diffA1, diffA2 }; - cli.pushThreadResult(threadIdA, returnValueA, snapKey, diffsA); - - faabric::util::SnapshotDiff diffB(3 * faabric::util::HOST_PAGE_SIZE, - diffDataB.data(), - diffDataB.size()); - diffsB = { diffB }; - cli.pushThreadResult(threadIdB, returnValueB, snapKey, diffsB); - } + // Check changes have been applied + checkDiffsApplied(snap.data, diffsA); + checkDiffsApplied(snap.data, diffsB); + + munmap(snap.data, snap.size); +} + +TEST_CASE_METHOD(SnapshotClientServerFixture, + "Test set thread result", + "[snapshot]") +{ + // Register threads on this host + int threadIdA = 123; + int threadIdB = 345; + int returnValueA = 88; + int returnValueB = 99; + sch.registerThread(threadIdA); + sch.registerThread(threadIdB); + + cli.pushThreadResult(threadIdA, returnValueA); + cli.pushThreadResult(threadIdB, returnValueB); // Set up two threads to await the results std::thread tA([threadIdA, returnValueA] { @@ -168,11 +164,5 @@ TEST_CASE_METHOD(SnapshotClientServerFixture, if (tB.joinable()) { tB.join(); } - - // Check changes have been applied - checkDiffsApplied(snap.data, diffsA); - checkDiffsApplied(snap.data, diffsB); - - munmap(snap.data, snap.size); } }