diff --git a/dist-test/dev_server.sh b/dist-test/dev_server.sh index ed710b18d..0c1d26516 100755 --- a/dist-test/dev_server.sh +++ b/dist-test/dev_server.sh @@ -6,14 +6,18 @@ PROJ_ROOT=${THIS_DIR}/.. pushd ${PROJ_ROOT} > /dev/null if [[ -z "$1" ]]; then - docker-compose \ - up \ - -d \ - dist-test-server + docker-compose up -d dist-test-server +elif [[ "$1" == "restart" ]]; then + docker-compose restart dist-test-server +elif [[ "$1" == "stop" ]]; then + docker-compose stop dist-test-server else - docker-compose \ - restart \ - dist-test-server + echo "Unrecognised argument: $1" + echo "" + echo "Usage:" + echo "" + echo "./dist-test/dev_server.sh [restart|stop]" + exit 1 fi popd > /dev/null diff --git a/docs/development.md b/docs/development.md index 4b1d18a80..533a6614c 100644 --- a/docs/development.md +++ b/docs/development.md @@ -91,10 +91,14 @@ inv dev.cc faabric_dist_tests inv dev.cc faabric_dist_test_server ``` -In another terminal, start the server: +In another terminal, (re)start the server: ```bash +# Start ./dist-tests/dev_server.sh + +# Restart +./dist-tests/dev_server.sh restart ``` Back in the CLI, you can then run the tests: diff --git a/include/faabric/scheduler/FunctionCallApi.h b/include/faabric/scheduler/FunctionCallApi.h index 0da78587c..eb9b83449 100644 --- a/include/faabric/scheduler/FunctionCallApi.h +++ b/include/faabric/scheduler/FunctionCallApi.h @@ -7,7 +7,6 @@ enum FunctionCalls ExecuteFunctions = 1, Flush = 2, Unregister = 3, - GetResources = 4, - SetThreadResult = 5, + GetResources = 4 }; } diff --git a/include/faabric/scheduler/FunctionCallClient.h b/include/faabric/scheduler/FunctionCallClient.h index be86d98a1..921139c86 100644 --- a/include/faabric/scheduler/FunctionCallClient.h +++ b/include/faabric/scheduler/FunctionCallClient.h @@ -38,14 +38,11 @@ class FunctionCallClient : public faabric::transport::MessageEndpointClient public: explicit FunctionCallClient(const std::string& hostIn); - /* Function call client external API */ - void sendFlush(); faabric::HostResources getResources(); - void executeFunctions( - const std::shared_ptr req); + void executeFunctions(std::shared_ptr req); void unregister(faabric::UnregisterRequest& req); diff --git a/include/faabric/scheduler/Scheduler.h b/include/faabric/scheduler/Scheduler.h index 0b1b41e74..f95219959 100644 --- a/include/faabric/scheduler/Scheduler.h +++ b/include/faabric/scheduler/Scheduler.h @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -188,48 +189,49 @@ class Scheduler faabric::util::SystemConfig& conf; + std::shared_mutex mx; + + // ---- Executors ---- std::vector> deadExecutors; std::unordered_map>> executors; - std::shared_mutex mx; - + // ---- Threads ---- std::unordered_map> threadResults; + std::unordered_map>> + localResults; + + std::mutex localResultsMutex; + + // ---- Clients ---- faabric::scheduler::FunctionCallClient& getFunctionCallClient( const std::string& otherHost); faabric::snapshot::SnapshotClient& getSnapshotClient( const std::string& otherHost); + // ---- Host resources and hosts ---- faabric::HostResources thisHostResources; std::atomic thisHostUsedSlots; - std::set availableHostsCache; - std::unordered_map> registeredHosts; - std::unordered_map>> - localResults; - std::mutex localResultsMutex; + void updateHostResources(); - std::vector recordedMessagesAll; - std::vector recordedMessagesLocal; - std::vector> - recordedMessagesShared; + faabric::HostResources getHostResources(const std::string& host); - std::vector getUnregisteredHosts(const 
std::string& funcStr, - bool noCache = false); + // ---- Actual scheduling ---- + std::set availableHostsCache; + + std::unordered_map> registeredHosts; std::shared_ptr claimExecutor( faabric::Message& msg, faabric::util::FullLock& schedulerLock); - faabric::HostResources getHostResources(const std::string& host); - - ExecGraphNode getFunctionExecGraphNode(unsigned int msgId); - - void updateHostResources(); + std::vector getUnregisteredHosts(const std::string& funcStr, + bool noCache = false); int scheduleFunctionsOnHost( const std::string& host, @@ -237,6 +239,17 @@ class Scheduler faabric::util::SchedulingDecision& decision, int offset, faabric::util::SnapshotData* snapshot); + + // ---- Accounting and debugging ---- + std::vector recordedMessagesAll; + std::vector recordedMessagesLocal; + std::vector> + recordedMessagesShared; + + ExecGraphNode getFunctionExecGraphNode(unsigned int msgId); + + // ---- Point-to-point ---- + faabric::transport::PointToPointBroker& broker; }; } diff --git a/include/faabric/snapshot/SnapshotClient.h b/include/faabric/snapshot/SnapshotClient.h index 77225cd54..2c1aedded 100644 --- a/include/faabric/snapshot/SnapshotClient.h +++ b/include/faabric/snapshot/SnapshotClient.h @@ -26,7 +26,7 @@ getThreadResults(); void clearMockSnapshotRequests(); // ----------------------------------- -// gRPC client +// Client // ----------------------------------- class SnapshotClient final : public faabric::transport::MessageEndpointClient @@ -34,12 +34,12 @@ class SnapshotClient final : public faabric::transport::MessageEndpointClient public: explicit SnapshotClient(const std::string& hostIn); - /* Snapshot client external API */ - void pushSnapshot(const std::string& key, + int32_t groupId, const faabric::util::SnapshotData& data); void pushSnapshotDiffs(std::string snapshotKey, + int32_t groupId, std::vector diffs); void deleteSnapshot(const std::string& key); diff --git a/include/faabric/snapshot/SnapshotServer.h b/include/faabric/snapshot/SnapshotServer.h index 6cb6a88ed..3d79cb98d 100644 --- a/include/faabric/snapshot/SnapshotServer.h +++ b/include/faabric/snapshot/SnapshotServer.h @@ -4,6 +4,7 @@ #include #include #include +#include namespace faabric::snapshot { class SnapshotServer final : public faabric::transport::MessageEndpointServer @@ -30,5 +31,8 @@ class SnapshotServer final : public faabric::transport::MessageEndpointServer void recvDeleteSnapshot(const uint8_t* buffer, size_t bufferSize); void recvThreadResult(const uint8_t* buffer, size_t bufferSize); + + private: + faabric::transport::PointToPointBroker& broker; }; } diff --git a/include/faabric/transport/MessageEndpoint.h b/include/faabric/transport/MessageEndpoint.h index b8ca3c212..515b6be39 100644 --- a/include/faabric/transport/MessageEndpoint.h +++ b/include/faabric/transport/MessageEndpoint.h @@ -13,9 +13,10 @@ #define ANY_HOST "0.0.0.0" // These timeouts should be long enough to permit sending and receiving large -// messages, but short enough not to hang around when something has gone wrong. -#define DEFAULT_RECV_TIMEOUT_MS 20000 -#define DEFAULT_SEND_TIMEOUT_MS 20000 +// messages, note that they also determine the period on which endpoints will +// re-poll. 
+#define DEFAULT_RECV_TIMEOUT_MS 60000 +#define DEFAULT_SEND_TIMEOUT_MS 60000 // How long undelivered messages will hang around when the socket is closed, // which also determines how long the context will hang for when closing if diff --git a/include/faabric/transport/PointToPointBroker.h b/include/faabric/transport/PointToPointBroker.h index ebdd1866f..cb4bd33c8 100644 --- a/include/faabric/transport/PointToPointBroker.h +++ b/include/faabric/transport/PointToPointBroker.h @@ -1,22 +1,94 @@ #pragma once -#include #include +#include #include +#include +#include #include #include +#include #include #include #include +#define DEFAULT_DISTRIBUTED_TIMEOUT_MS 30000 + +#define POINT_TO_POINT_MASTER_IDX 0 + namespace faabric::transport { + +class PointToPointBroker; + +class PointToPointGroup +{ + public: + static std::shared_ptr getGroup(int groupId); + + static bool groupExists(int groupId); + + static void addGroup(int appId, int groupId, int groupSize); + + static void clear(); + + PointToPointGroup(int appId, int groupIdIn, int groupSizeIn); + + void lock(int groupIdx, bool recursive); + + void unlock(int groupIdx, bool recursive); + + int getLockOwner(bool recursive); + + void localLock(); + + void localUnlock(); + + bool localTryLock(); + + void barrier(int groupIdx); + + void notify(int groupIdx); + + int getNotifyCount(); + + private: + faabric::util::SystemConfig& conf; + + int timeoutMs = DEFAULT_DISTRIBUTED_TIMEOUT_MS; + + std::string masterHost; + int appId = 0; + int groupId = 0; + int groupSize = 0; + + std::mutex mx; + + // Transport + faabric::transport::PointToPointBroker& ptpBroker; + + // Local lock + std::timed_mutex localMx; + std::recursive_timed_mutex localRecursiveMx; + + // Distributed lock + std::stack recursiveLockOwners; + int lockOwnerIdx = -1; + std::queue lockWaiters; + + void notifyLocked(int groupIdx); + + void masterLock(int groupIdx, bool recursive); + + void masterUnlock(int groupIdx, bool recursive); +}; + class PointToPointBroker { public: PointToPointBroker(); - std::string getHostForReceiver(int appId, int recvIdx); + std::string getHostForReceiver(int groupId, int recvIdx); std::set setUpLocalMappingsFromSchedulingDecision( const faabric::util::SchedulingDecision& decision); @@ -24,17 +96,17 @@ class PointToPointBroker void setAndSendMappingsFromSchedulingDecision( const faabric::util::SchedulingDecision& decision); - void waitForMappingsOnThisHost(int appId); + void waitForMappingsOnThisHost(int groupId); - std::set getIdxsRegisteredForApp(int appId); + std::set getIdxsRegisteredForGroup(int groupId); - void sendMessage(int appId, + void sendMessage(int groupId, int sendIdx, int recvIdx, const uint8_t* buffer, size_t bufferSize); - std::vector recvMessage(int appId, int sendIdx, int recvIdx); + std::vector recvMessage(int groupId, int sendIdx, int recvIdx); void clear(); @@ -43,16 +115,14 @@ class PointToPointBroker private: std::shared_mutex brokerMutex; - std::unordered_map> appIdxs; + std::unordered_map> groupIdIdxsMap; std::unordered_map mappings; - std::unordered_map appMappingsFlags; - std::unordered_map appMappingMutexes; - std::unordered_map appMappingCvs; - - std::shared_ptr getClient(const std::string& host); + std::unordered_map groupMappingsFlags; + std::unordered_map groupMappingMutexes; + std::unordered_map groupMappingCvs; - faabric::scheduler::Scheduler& sch; + faabric::util::SystemConfig& conf; }; PointToPointBroker& getPointToPointBroker(); diff --git a/include/faabric/transport/PointToPointCall.h 
b/include/faabric/transport/PointToPointCall.h index a636b0b39..a719e60cc 100644 --- a/include/faabric/transport/PointToPointCall.h +++ b/include/faabric/transport/PointToPointCall.h @@ -5,6 +5,10 @@ namespace faabric::transport { enum PointToPointCall { MAPPING = 0, - MESSAGE = 1 + MESSAGE = 1, + LOCK_GROUP = 2, + LOCK_GROUP_RECURSIVE = 3, + UNLOCK_GROUP = 4, + UNLOCK_GROUP_RECURSIVE = 5, }; } diff --git a/include/faabric/transport/PointToPointClient.h b/include/faabric/transport/PointToPointClient.h index 806592426..231d40a59 100644 --- a/include/faabric/transport/PointToPointClient.h +++ b/include/faabric/transport/PointToPointClient.h @@ -2,6 +2,7 @@ #include #include +#include namespace faabric::transport { @@ -11,6 +12,11 @@ getSentMappings(); std::vector> getSentPointToPointMessages(); +std::vector> +getSentLockMessages(); + void clearSentMessages(); class PointToPointClient : public faabric::transport::MessageEndpointClient @@ -21,5 +27,21 @@ class PointToPointClient : public faabric::transport::MessageEndpointClient void sendMappings(faabric::PointToPointMappings& mappings); void sendMessage(faabric::PointToPointMessage& msg); + + void groupLock(int appId, + int groupId, + int groupIdx, + bool recursive = false); + + void groupUnlock(int appId, + int groupId, + int groupIdx, + bool recursive = false); + + private: + void makeCoordinationRequest(int appId, + int groupId, + int groupIdx, + faabric::transport::PointToPointCall call); }; } diff --git a/include/faabric/transport/PointToPointServer.h b/include/faabric/transport/PointToPointServer.h index 2c961b908..29a15e77c 100644 --- a/include/faabric/transport/PointToPointServer.h +++ b/include/faabric/transport/PointToPointServer.h @@ -25,5 +25,13 @@ class PointToPointServer final : public MessageEndpointServer std::unique_ptr doRecvMappings( const uint8_t* buffer, size_t bufferSize); + + void recvGroupLock(const uint8_t* buffer, + size_t bufferSize, + bool recursive); + + void recvGroupUnlock(const uint8_t* buffer, + size_t bufferSize, + bool recursive); }; } diff --git a/include/faabric/util/scheduling.h b/include/faabric/util/scheduling.h index 670067345..4c8d8a5ce 100644 --- a/include/faabric/util/scheduling.h +++ b/include/faabric/util/scheduling.h @@ -14,22 +14,31 @@ class SchedulingDecision static SchedulingDecision fromPointToPointMappings( faabric::PointToPointMappings& mappings); - SchedulingDecision(uint32_t appIdIn); + SchedulingDecision(uint32_t appIdIn, int32_t groupIdIn); uint32_t appId = 0; - int32_t nFunctions = 0; + int32_t groupId = 0; - std::vector messageIds; + int32_t nFunctions = 0; std::vector hosts; + std::vector messageIds; + std::vector appIdxs; + std::vector groupIdxs; + std::string returnHost; void addMessage(const std::string& host, const faabric::Message& msg); void addMessage(const std::string& host, int32_t messageId, int32_t appIdx); + + void addMessage(const std::string& host, + int32_t messageId, + int32_t appIdx, + int32_t groupIdx); }; } diff --git a/include/faabric/util/string_tools.h b/include/faabric/util/string_tools.h index 5b3a0ee1b..fe257434f 100644 --- a/include/faabric/util/string_tools.h +++ b/include/faabric/util/string_tools.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include @@ -15,4 +16,22 @@ bool contains(const std::string& input, const std::string& subStr); std::string removeSubstr(const std::string& input, const std::string& toErase); bool stringIsInt(const std::string& input); + +template +std::string vectorToString(std::vector vec) +{ + std::stringstream ss; + + ss << 
"["; + for (int i = 0; i < vec.size(); i++) { + ss << vec.at(i); + + if (i < vec.size() - 1) { + ss << ", "; + } + } + ss << "]"; + + return ss.str(); +} } diff --git a/src/flat/faabric.fbs b/src/flat/faabric.fbs index 725e5bffd..db2e6d3f5 100644 --- a/src/flat/faabric.fbs +++ b/src/flat/faabric.fbs @@ -1,5 +1,6 @@ table SnapshotPushRequest { key:string; + groupid:int; contents:[ubyte]; } @@ -16,6 +17,7 @@ table SnapshotDiffChunk { table SnapshotDiffPushRequest { key:string; + groupid:int; chunks:[SnapshotDiffChunk]; } diff --git a/src/proto/faabric.proto b/src/proto/faabric.proto index 026ad14f5..65b2d800c 100644 --- a/src/proto/faabric.proto +++ b/src/proto/faabric.proto @@ -57,6 +57,10 @@ message FunctionStatusResponse { FunctionStatus status = 1; } +// --------------------------------------------- +// MPI +// --------------------------------------------- + message MPIMessage { enum MPIMessageType { NORMAL = 0; @@ -94,7 +98,7 @@ message MpiHostsToRanksMessage { message Message { int32 id = 1; int32 appId = 2; - int32 appIndex = 3; + int32 appIdx = 3; string masterHost = 4; enum MessageType { @@ -133,21 +137,27 @@ message Message { string pythonFunction = 25; string pythonEntry = 26; - bool isMpi = 27; - int32 mpiWorldId = 28; - int32 mpiRank = 29; - int32 mpiWorldSize = 30; + // Function groups + int32 groupId = 27; + int32 groupIdx = 28; + int32 groupSize = 29; - string cmdline = 31; + // MPI + bool isMpi = 30; + int32 mpiWorldId = 31; + int32 mpiRank = 32; + int32 mpiWorldSize = 33; + + string cmdline = 34; // SGX - bool isSgx = 32; + bool isSgx = 35; - string sgxSid = 33; - string sgxNonce = 34; - string sgxTag = 35; - bytes sgxPolicy = 36; - bytes sgxResult = 37; + string sgxSid = 36; + string sgxNonce = 37; + string sgxTag = 38; + bytes sgxPolicy = 39; + bytes sgxResult = 40; } // --------------------------------------------- @@ -207,20 +217,22 @@ message StateAppendedResponse { message PointToPointMessage { int32 appId = 1; - int32 sendIdx = 2; - int32 recvIdx = 3; - - bytes data = 4; + int32 groupId = 2; + int32 sendIdx = 3; + int32 recvIdx = 4; + bytes data = 5; } message PointToPointMappings { int32 appId = 1; + int32 groupId = 2; message PointToPointMapping { string host = 1; int32 messageId = 2; - int32 recvIdx = 3; + int32 appIdx = 3; + int32 groupIdx = 4; } - repeated PointToPointMapping mappings = 2; + repeated PointToPointMapping mappings = 3; } diff --git a/src/scheduler/CMakeLists.txt b/src/scheduler/CMakeLists.txt index fa02a1c8c..f3772c10a 100644 --- a/src/scheduler/CMakeLists.txt +++ b/src/scheduler/CMakeLists.txt @@ -1,15 +1,14 @@ - faabric_lib(scheduler + ExecGraph.cpp ExecutorFactory.cpp Executor.cpp - ExecGraph.cpp FunctionCallClient.cpp FunctionCallServer.cpp - Scheduler.cpp MpiContext.cpp MpiMessageBuffer.cpp - MpiWorldRegistry.cpp MpiWorld.cpp + MpiWorldRegistry.cpp + Scheduler.cpp ) target_link_libraries(scheduler PRIVATE diff --git a/src/scheduler/Executor.cpp b/src/scheduler/Executor.cpp index edd511a72..0a2c77c67 100644 --- a/src/scheduler/Executor.cpp +++ b/src/scheduler/Executor.cpp @@ -159,14 +159,14 @@ void Executor::executeTasks(std::vector msgIdxs, int threadPoolIdx; if (isThreads) { assert(threadPoolSize > 1); - threadPoolIdx = (msg.appindex() % (threadPoolSize - 1)) + 1; + threadPoolIdx = (msg.appidx() % (threadPoolSize - 1)) + 1; } else { - threadPoolIdx = msg.appindex() % threadPoolSize; + threadPoolIdx = msg.appidx() % threadPoolSize; } // Enqueue the task SPDLOG_TRACE( - "Assigning app index {} to thread {}", msg.appindex(), threadPoolIdx); + 
"Assigning app index {} to thread {}", msg.appidx(), threadPoolIdx); threadTaskQueues[threadPoolIdx].enqueue(ExecutorTask( msgIdx, req, batchCounter, needsSnapshotPush, skipReset)); diff --git a/src/scheduler/FunctionCallClient.cpp b/src/scheduler/FunctionCallClient.cpp index 3492e04f2..c37f3c174 100644 --- a/src/scheduler/FunctionCallClient.cpp +++ b/src/scheduler/FunctionCallClient.cpp @@ -10,7 +10,8 @@ namespace faabric::scheduler { // ----------------------------------- // Mocking // ----------------------------------- -std::mutex mockMutex; + +static std::mutex mockMutex; static std::vector> functionCalls; diff --git a/src/scheduler/Scheduler.cpp b/src/scheduler/Scheduler.cpp index e65267cd9..263ac7d87 100644 --- a/src/scheduler/Scheduler.cpp +++ b/src/scheduler/Scheduler.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -45,6 +46,7 @@ Scheduler& getScheduler() Scheduler::Scheduler() : thisHost(faabric::util::getSystemConfig().endpointHost) , conf(faabric::util::getSystemConfig()) + , broker(faabric::transport::getPointToPointBroker()) { // Set up the initial resources int cores = faabric::util::getUsableCores(); @@ -223,9 +225,9 @@ faabric::util::SchedulingDecision Scheduler::callFunctions( } // Set up scheduling decision - SchedulingDecision decision(firstMsg.appid()); + SchedulingDecision decision(firstMsg.appid(), firstMsg.groupid()); - // TODO - more granular locking, this is incredibly conservative + // TODO - more granular locking, this is conservative faabric::util::FullLock lock(mx); // If we're not the master host, we need to forward the request back to the @@ -286,7 +288,8 @@ faabric::util::SchedulingDecision Scheduler::callFunctions( funcStr, h); SnapshotClient& c = getSnapshotClient(h); - c.pushSnapshotDiffs(snapshotKey, snapshotDiffs); + c.pushSnapshotDiffs( + snapshotKey, firstMsg.groupid(), snapshotDiffs); } } @@ -439,6 +442,12 @@ faabric::util::SchedulingDecision Scheduler::callFunctions( } } + // Send out point-to-point mappings if necessary (unless being forced to + // execute locally, in which case they will be transmitted from the master) + if (!forceLocal && (firstMsg.groupid() > 0)) { + broker.setAndSendMappingsFromSchedulingDecision(decision); + } + // Records for tests if (faabric::util::isTestMode()) { for (int i = 0; i < nMessages; i++) { @@ -546,7 +555,7 @@ int Scheduler::scheduleFunctionsOnHost( std::string snapshotKey = firstMsg.snapshotkey(); if (snapshot != nullptr && !snapshotKey.empty()) { SnapshotClient& c = getSnapshotClient(host); - c.pushSnapshot(snapshotKey, *snapshot); + c.pushSnapshot(snapshotKey, firstMsg.groupid(), *snapshot); } getFunctionCallClient(host).executeFunctions(hostRequest); @@ -746,7 +755,7 @@ void Scheduler::pushSnapshotDiffs( if (!isMaster && !diffs.empty()) { SnapshotClient& c = getSnapshotClient(msg.masterhost()); - c.pushSnapshotDiffs(msg.snapshotkey(), diffs); + c.pushSnapshotDiffs(msg.snapshotkey(), msg.groupid(), diffs); } } diff --git a/src/snapshot/SnapshotClient.cpp b/src/snapshot/SnapshotClient.cpp index 90fcbe4a8..0f2009b58 100644 --- a/src/snapshot/SnapshotClient.cpp +++ b/src/snapshot/SnapshotClient.cpp @@ -67,6 +67,7 @@ SnapshotClient::SnapshotClient(const std::string& hostIn) {} void SnapshotClient::pushSnapshot(const std::string& key, + int groupId, const faabric::util::SnapshotData& data) { if (data.size == 0) { @@ -86,7 +87,7 @@ void SnapshotClient::pushSnapshot(const std::string& key, auto keyOffset = mb.CreateString(key); auto dataOffset = mb.CreateVector(data.data, 
data.size); auto requestOffset = - CreateSnapshotPushRequest(mb, keyOffset, dataOffset); + CreateSnapshotPushRequest(mb, keyOffset, groupId, dataOffset); mb.Finish(requestOffset); // Send it @@ -96,6 +97,7 @@ void SnapshotClient::pushSnapshot(const std::string& key, void SnapshotClient::pushSnapshotDiffs( std::string snapshotKey, + int groupId, std::vector diffs) { if (faabric::util::isMockMode()) { @@ -124,7 +126,7 @@ void SnapshotClient::pushSnapshotDiffs( auto keyOffset = mb.CreateString(snapshotKey); auto diffsOffset = mb.CreateVector(diffsFbVector); auto requestOffset = - CreateSnapshotDiffPushRequest(mb, keyOffset, diffsOffset); + CreateSnapshotDiffPushRequest(mb, keyOffset, groupId, diffsOffset); mb.Finish(requestOffset); SEND_FB_MSG(SnapshotCalls::PushSnapshotDiffs, mb); diff --git a/src/snapshot/SnapshotServer.cpp b/src/snapshot/SnapshotServer.cpp index 7d0d87b36..7cb778be7 100644 --- a/src/snapshot/SnapshotServer.cpp +++ b/src/snapshot/SnapshotServer.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -18,6 +19,7 @@ SnapshotServer::SnapshotServer() SNAPSHOT_SYNC_PORT, SNAPSHOT_INPROC_LABEL, faabric::util::getSystemConfig().snapshotServerThreads) + , broker(faabric::transport::getPointToPointBroker()) {} void SnapshotServer::doAsyncRecv(int header, @@ -69,9 +71,10 @@ std::unique_ptr SnapshotServer::recvPushSnapshot( throw std::runtime_error("Received snapshot with zero size"); } - SPDLOG_DEBUG("Receiving shapshot {} (size {})", + SPDLOG_DEBUG("Receiving snapshot {} (size {}, lock {})", r->key()->c_str(), - r->contents()->size()); + r->contents()->size(), + r->groupid()); faabric::snapshot::SnapshotRegistry& reg = faabric::snapshot::getSnapshotRegistry(); @@ -80,6 +83,12 @@ std::unique_ptr SnapshotServer::recvPushSnapshot( faabric::util::SnapshotData data; data.size = r->contents()->size(); + // Lock the function group if necessary + if (r->groupid() > 0) { + faabric::transport::PointToPointGroup::getGroup(r->groupid()) + ->localLock(); + } + // TODO - avoid this copy by changing server superclass to allow subclasses // to provide a buffer to receive data. 
// TODO - work out snapshot ownership here, how do we know when to delete @@ -90,6 +99,12 @@ std::unique_ptr SnapshotServer::recvPushSnapshot( reg.takeSnapshot(r->key()->str(), data, true); + // Unlock the application + if (r->groupid() > 0) { + faabric::transport::PointToPointGroup::getGroup(r->groupid()) + ->localUnlock(); + } + // Send response return std::make_unique(); } @@ -121,6 +136,12 @@ SnapshotServer::recvPushSnapshotDiffs(const uint8_t* buffer, size_t bufferSize) faabric::snapshot::getSnapshotRegistry(); faabric::util::SnapshotData& snap = reg.getSnapshot(r->key()->str()); + // Lock the function group + if (r->groupid() > 0) { + faabric::transport::PointToPointGroup::getGroup(r->groupid()) + ->localLock(); + } + // Apply diffs to snapshot for (const auto* r : *r->chunks()) { uint8_t* dest = snap.data + r->offset(); @@ -181,6 +202,12 @@ SnapshotServer::recvPushSnapshotDiffs(const uint8_t* buffer, size_t bufferSize) } } + // Unlock + if (r->groupid() > 0) { + faabric::transport::PointToPointGroup::getGroup(r->groupid()) + ->localUnlock(); + } + // Send response return std::make_unique(); } diff --git a/src/transport/MessageEndpoint.cpp b/src/transport/MessageEndpoint.cpp index 6eec1c457..119324885 100644 --- a/src/transport/MessageEndpoint.cpp +++ b/src/transport/MessageEndpoint.cpp @@ -256,7 +256,10 @@ std::optional MessageEndpoint::recvBuffer(zmq::socket_t& socket, auto res = socket.recv(zmq::buffer(msg.udata(), msg.size())); if (!res.has_value()) { - SPDLOG_TRACE("Timed out receiving message of size {}", size); + SPDLOG_TRACE("Did not receive message size {} within {}ms on {}", + size, + timeoutMs, + address); return std::nullopt; } @@ -288,7 +291,9 @@ std::optional MessageEndpoint::recvNoBuffer(zmq::socket_t& socket) try { auto res = socket.recv(msg); if (!res.has_value()) { - SPDLOG_TRACE("Timed out receiving message with no size"); + SPDLOG_TRACE("Did not receive message within {}ms on {}", + timeoutMs, + address); return std::nullopt; } } catch (zmq::error_t& e) { diff --git a/src/transport/PointToPointBroker.cpp b/src/transport/PointToPointBroker.cpp index 5a9b35f60..38e9e4921 100644 --- a/src/transport/PointToPointBroker.cpp +++ b/src/transport/PointToPointBroker.cpp @@ -6,10 +6,22 @@ #include #include -#define MAPPING_TIMEOUT_MS 10000 +#define NO_LOCK_OWNER_IDX -1 + +#define LOCK_TIMEOUT(mx, ms) \ + auto timePoint = \ + std::chrono::system_clock::now() + std::chrono::milliseconds(ms); \ + bool success = mx.try_lock_until(timePoint); \ + if (!success) { \ + throw std::runtime_error("Distributed coordination timeout"); \ + } + +#define MAPPING_TIMEOUT_MS 20000 namespace faabric::transport { +static std::unordered_map> groups; + // NOTE: Keeping 0MQ sockets in TLS is usually a bad idea, as they _must_ be // closed before the global context. 
However, in this case it's worth it // to cache the sockets across messages, as otherwise we'd be creating and @@ -28,29 +40,283 @@ thread_local std::unordered_map> clients; -std::string getPointToPointKey(int appId, int sendIdx, int recvIdx) +static std::shared_ptr getClient(const std::string& host) +{ + // Note - this map is thread-local so no locking required + if (clients.find(host) == clients.end()) { + clients.insert( + std::pair>( + host, std::make_shared(host))); + + SPDLOG_TRACE("Created new point-to-point client {}", host); + } + + return clients.at(host); +} + +std::string getPointToPointKey(int groupId, int sendIdx, int recvIdx) +{ + return fmt::format("{}-{}-{}", groupId, sendIdx, recvIdx); +} + +std::string getPointToPointKey(int groupId, int recvIdx) +{ + return fmt::format("{}-{}", groupId, recvIdx); +} + +std::shared_ptr PointToPointGroup::getGroup(int groupId) +{ + if (groups.find(groupId) == groups.end()) { + SPDLOG_ERROR("Did not find group ID {} on this host", groupId); + throw std::runtime_error("Group ID not found on host"); + } + + return groups.at(groupId); +} + +bool PointToPointGroup::groupExists(int groupId) +{ + return groups.find(groupId) != groups.end(); +} + +void PointToPointGroup::addGroup(int appId, int groupId, int groupSize) +{ + groups.emplace(std::make_pair( + groupId, std::make_shared(appId, groupId, groupSize))); +} + +void PointToPointGroup::clear() +{ + groups.clear(); +} + +PointToPointGroup::PointToPointGroup(int appIdIn, + int groupIdIn, + int groupSizeIn) + : conf(faabric::util::getSystemConfig()) + , appId(appIdIn) + , groupId(groupIdIn) + , groupSize(groupSizeIn) + , ptpBroker(faabric::transport::getPointToPointBroker()) +{} + +void PointToPointGroup::lock(int groupIdx, bool recursive) +{ + std::string host = + ptpBroker.getHostForReceiver(groupId, POINT_TO_POINT_MASTER_IDX); + + if (host == conf.endpointHost) { + masterLock(groupIdx, recursive); + } else { + auto cli = getClient(host); + faabric::PointToPointMessage msg; + msg.set_groupid(groupId); + msg.set_sendidx(groupIdx); + msg.set_recvidx(POINT_TO_POINT_MASTER_IDX); + + SPDLOG_TRACE("Remote lock {}:{}:{} to {}", + groupId, + groupIdx, + POINT_TO_POINT_MASTER_IDX, + host); + + cli->groupLock(appId, groupId, groupIdx, recursive); + + // Await ptp response + ptpBroker.recvMessage(groupId, POINT_TO_POINT_MASTER_IDX, groupIdx); + } +} + +void PointToPointGroup::masterLock(int groupIdx, bool recursive) +{ + SPDLOG_TRACE("Master lock {}:{}", groupId, groupIdx); + + bool success = false; + { + faabric::util::UniqueLock lock(mx); + if (recursive) { + bool isFree = recursiveLockOwners.empty(); + + bool lockOwnedByThisIdx = + !isFree && (recursiveLockOwners.top() == groupIdx); + + if (isFree || lockOwnedByThisIdx) { + // Recursive and either free, or already locked by this idx + SPDLOG_TRACE("Group idx {} recursively locked {} ({})", + groupIdx, + groupId, + lockWaiters.size()); + recursiveLockOwners.push(groupIdx); + success = true; + } else { + SPDLOG_TRACE("Group idx {} unable to recursively lock {} ({})", + groupIdx, + groupId, + lockWaiters.size()); + } + } else if (lockOwnerIdx == NO_LOCK_OWNER_IDX) { + // Non-recursive and free + SPDLOG_TRACE("Group idx {} locked {}", groupIdx, groupId); + lockOwnerIdx = groupIdx; + success = true; + } else { + // Unable to lock, wait in queue + SPDLOG_TRACE("Group idx {} unable to lock {}", groupIdx, groupId); + lockWaiters.push(groupIdx); + } + } + + if (success) { + notifyLocked(groupIdx); + } +} + +void PointToPointGroup::localLock() +{ + 
LOCK_TIMEOUT(localMx, timeoutMs); +} + +bool PointToPointGroup::localTryLock() +{ + SPDLOG_TRACE("Trying local lock on {}", groupId); + return localMx.try_lock(); +} + +void PointToPointGroup::unlock(int groupIdx, bool recursive) { - return fmt::format("{}-{}-{}", appId, sendIdx, recvIdx); + std::string host = + ptpBroker.getHostForReceiver(groupId, POINT_TO_POINT_MASTER_IDX); + + if (host == conf.endpointHost) { + masterUnlock(groupIdx, recursive); + } else { + auto cli = getClient(host); + faabric::PointToPointMessage msg; + msg.set_groupid(groupId); + msg.set_sendidx(groupIdx); + msg.set_recvidx(POINT_TO_POINT_MASTER_IDX); + + SPDLOG_TRACE("Remote unlock {}:{}:{} to {}", + groupId, + groupIdx, + POINT_TO_POINT_MASTER_IDX, + host); + + cli->groupUnlock(appId, groupId, groupIdx, recursive); + } } -std::string getPointToPointKey(int appId, int recvIdx) +void PointToPointGroup::masterUnlock(int groupIdx, bool recursive) { - return fmt::format("{}-{}", appId, recvIdx); + faabric::util::UniqueLock lock(mx); + + if (recursive) { + recursiveLockOwners.pop(); + + if (!recursiveLockOwners.empty()) { + return; + } + + if (!lockWaiters.empty()) { + recursiveLockOwners.push(lockWaiters.front()); + notifyLocked(lockWaiters.front()); + lockWaiters.pop(); + } + } else { + lockOwnerIdx = NO_LOCK_OWNER_IDX; + + if (!lockWaiters.empty()) { + lockOwnerIdx = lockWaiters.front(); + notifyLocked(lockWaiters.front()); + lockWaiters.pop(); + } + } +} + +void PointToPointGroup::localUnlock() +{ + localMx.unlock(); +} + +void PointToPointGroup::notifyLocked(int groupIdx) +{ + std::vector data(1, 0); + + ptpBroker.sendMessage( + groupId, POINT_TO_POINT_MASTER_IDX, groupIdx, data.data(), data.size()); +} + +void PointToPointGroup::barrier(int groupIdx) +{ + // TODO more efficient barrier implementation to avoid load on the master + if (groupIdx == POINT_TO_POINT_MASTER_IDX) { + // Receive from all + for (int i = 1; i < groupSize; i++) { + ptpBroker.recvMessage(groupId, i, POINT_TO_POINT_MASTER_IDX); + } + + // Reply to all + std::vector data(1, 0); + for (int i = 1; i < groupSize; i++) { + ptpBroker.sendMessage( + groupId, POINT_TO_POINT_MASTER_IDX, i, data.data(), data.size()); + } + } else { + // Do the send + std::vector data(1, 0); + ptpBroker.sendMessage(groupId, + groupIdx, + POINT_TO_POINT_MASTER_IDX, + data.data(), + data.size()); + + // Await the response + ptpBroker.recvMessage(groupId, POINT_TO_POINT_MASTER_IDX, groupIdx); + } +} + +void PointToPointGroup::notify(int groupIdx) +{ + if (groupIdx == POINT_TO_POINT_MASTER_IDX) { + for (int i = 1; i < groupSize; i++) { + ptpBroker.recvMessage(groupId, i, POINT_TO_POINT_MASTER_IDX); + } + } else { + std::vector data(1, 0); + ptpBroker.sendMessage(groupId, + groupIdx, + POINT_TO_POINT_MASTER_IDX, + data.data(), + data.size()); + } +} + +int PointToPointGroup::getLockOwner(bool recursive) +{ + if (recursive) { + if (!recursiveLockOwners.empty()) { + return recursiveLockOwners.top(); + } + + return NO_LOCK_OWNER_IDX; + } + + return lockOwnerIdx; } PointToPointBroker::PointToPointBroker() - : sch(faabric::scheduler::getScheduler()) + : conf(faabric::util::getSystemConfig()) {} -std::string PointToPointBroker::getHostForReceiver(int appId, int recvIdx) +std::string PointToPointBroker::getHostForReceiver(int groupId, int recvIdx) { faabric::util::SharedLock lock(brokerMutex); - std::string key = getPointToPointKey(appId, recvIdx); + std::string key = getPointToPointKey(groupId, recvIdx); if (mappings.find(key) == mappings.end()) { SPDLOG_ERROR( - "No point-to-point 
mapping for app {} idx {}", appId, recvIdx); + "No point-to-point mapping for group {} idx {}", groupId, recvIdx); throw std::runtime_error("No point-to-point mapping found"); } @@ -61,7 +327,9 @@ std::set PointToPointBroker::setUpLocalMappingsFromSchedulingDecision( const faabric::util::SchedulingDecision& decision) { - int appId = decision.appId; + int groupId = decision.groupId; + + // Prepare set of hosts in these mappings std::set hosts; { @@ -69,37 +337,45 @@ PointToPointBroker::setUpLocalMappingsFromSchedulingDecision( // Set up the mappings for (int i = 0; i < decision.nFunctions; i++) { - int recvIdx = decision.appIdxs.at(i); + int groupIdx = decision.groupIdxs.at(i); const std::string& host = decision.hosts.at(i); - SPDLOG_DEBUG("Setting point-to-point mapping {}:{} to {}", - appId, - recvIdx, + SPDLOG_DEBUG("Setting point-to-point mapping {}:{}:{} on {}", + decision.appId, + groupId, + groupIdx, host); - // Record this index for this app - appIdxs[appId].insert(recvIdx); + // Record this index for this group + groupIdIdxsMap[groupId].insert(groupIdx); // Add host mapping - std::string key = getPointToPointKey(appId, recvIdx); + std::string key = getPointToPointKey(groupId, groupIdx); mappings[key] = host; // If it's not this host, add to set of returned hosts - if (host != faabric::util::getSystemConfig().endpointHost) { + if (host != conf.endpointHost) { hosts.insert(host); } } + + // Register the group + PointToPointGroup::addGroup( + decision.appId, groupId, decision.nFunctions); } { - // Lock this app - std::unique_lock lock(appMappingMutexes[appId]); + // Lock this group + faabric::util::UniqueLock lock(groupMappingMutexes[groupId]); - // Enable the app - appMappingsFlags[appId] = true; + SPDLOG_TRACE( + "Enabling point-to-point mapping for {}:{}", decision.appId, groupId); + + // Enable the group + groupMappingsFlags[groupId] = true; // Notify waiters - appMappingCvs[appId].notify_all(); + groupMappingCvs[groupId].notify_all(); } return hosts; @@ -108,8 +384,6 @@ PointToPointBroker::setUpLocalMappingsFromSchedulingDecision( void PointToPointBroker::setAndSendMappingsFromSchedulingDecision( const faabric::util::SchedulingDecision& decision) { - int appId = decision.appId; - // Set up locally std::set otherHosts = setUpLocalMappingsFromSchedulingDecision(decision); @@ -117,20 +391,22 @@ void PointToPointBroker::setAndSendMappingsFromSchedulingDecision( // Send out to other hosts for (const auto& host : otherHosts) { faabric::PointToPointMappings msg; - msg.set_appid(appId); + msg.set_appid(decision.appId); + msg.set_groupid(decision.groupId); - std::set& indexes = appIdxs[appId]; + std::set& indexes = groupIdIdxsMap[decision.groupId]; for (int i = 0; i < decision.nFunctions; i++) { auto* mapping = msg.add_mappings(); mapping->set_host(decision.hosts.at(i)); mapping->set_messageid(decision.messageIds.at(i)); - mapping->set_recvidx(decision.appIdxs.at(i)); + mapping->set_appidx(decision.appIdxs.at(i)); + mapping->set_groupidx(decision.groupIdxs.at(i)); } SPDLOG_DEBUG("Sending {} point-to-point mappings for {} to {}", indexes.size(), - appId, + decision.groupId, host); auto cli = getClient(host); @@ -138,45 +414,54 @@ void PointToPointBroker::setAndSendMappingsFromSchedulingDecision( } } -void PointToPointBroker::waitForMappingsOnThisHost(int appId) +void PointToPointBroker::waitForMappingsOnThisHost(int groupId) { // Check if it's been enabled - if (!appMappingsFlags[appId]) { - // Lock this app - std::unique_lock lock(appMappingMutexes[appId]); - - // Wait for app to be 
enabled - auto timePoint = std::chrono::system_clock::now() + - std::chrono::milliseconds(MAPPING_TIMEOUT_MS); - - if (!appMappingCvs[appId].wait_until(lock, timePoint, [this, appId] { - return appMappingsFlags[appId]; - })) { + if (!groupMappingsFlags[groupId]) { + + // Lock this group + faabric::util::UniqueLock lock(groupMappingMutexes[groupId]); + + // Check again + if (!groupMappingsFlags[groupId]) { + // Wait for group to be enabled + auto timePoint = std::chrono::system_clock::now() + + std::chrono::milliseconds(MAPPING_TIMEOUT_MS); + + if (!groupMappingCvs[groupId].wait_until( + lock, timePoint, [this, groupId] { + return groupMappingsFlags[groupId]; + })) { + + SPDLOG_ERROR("Timed out waiting for group mappings {}", + groupId); + throw std::runtime_error( + "Timed out waiting for group mappings"); + } - SPDLOG_ERROR("Timed out waiting for app mappings {}", appId); - throw std::runtime_error("Timed out waiting for app mappings"); + SPDLOG_TRACE("Point-to-point mappings for {} ready", groupId); } } } -std::set PointToPointBroker::getIdxsRegisteredForApp(int appId) +std::set PointToPointBroker::getIdxsRegisteredForGroup(int groupId) { faabric::util::SharedLock lock(brokerMutex); - return appIdxs[appId]; + return groupIdIdxsMap[groupId]; } -void PointToPointBroker::sendMessage(int appId, +void PointToPointBroker::sendMessage(int groupId, int sendIdx, int recvIdx, const uint8_t* buffer, size_t bufferSize) { - waitForMappingsOnThisHost(appId); + waitForMappingsOnThisHost(groupId); - std::string host = getHostForReceiver(appId, recvIdx); + std::string host = getHostForReceiver(groupId, recvIdx); - if (host == faabric::util::getSystemConfig().endpointHost) { - std::string label = getPointToPointKey(appId, sendIdx, recvIdx); + if (host == conf.endpointHost) { + std::string label = getPointToPointKey(groupId, sendIdx, recvIdx); // Note - this map is thread-local so no locking required if (sendEndpoints.find(label) == sendEndpoints.end()) { @@ -188,7 +473,7 @@ void PointToPointBroker::sendMessage(int appId, } SPDLOG_TRACE("Local point-to-point message {}:{}:{} to {}", - appId, + groupId, sendIdx, recvIdx, sendEndpoints[label]->getAddress()); @@ -198,13 +483,13 @@ void PointToPointBroker::sendMessage(int appId, } else { auto cli = getClient(host); faabric::PointToPointMessage msg; - msg.set_appid(appId); + msg.set_groupid(groupId); msg.set_sendidx(sendIdx); msg.set_recvidx(recvIdx); msg.set_data(buffer, bufferSize); SPDLOG_TRACE("Remote point-to-point message {}:{}:{} to {}", - appId, + groupId, sendIdx, recvIdx, host); @@ -213,11 +498,11 @@ void PointToPointBroker::sendMessage(int appId, } } -std::vector PointToPointBroker::recvMessage(int appId, +std::vector PointToPointBroker::recvMessage(int groupId, int sendIdx, int recvIdx) { - std::string label = getPointToPointKey(appId, sendIdx, recvIdx); + std::string label = getPointToPointKey(groupId, sendIdx, recvIdx); // Note: this map is thread-local so no locking required if (recvEndpoints.find(label) == recvEndpoints.end()) { @@ -235,29 +520,18 @@ std::vector PointToPointBroker::recvMessage(int appId, return messageData.dataCopy(); } -std::shared_ptr PointToPointBroker::getClient( - const std::string& host) -{ - // Note - this map is thread-local so no locking required - if (clients.find(host) == clients.end()) { - clients[host] = std::make_shared(host); - - SPDLOG_TRACE("Created new point-to-point client {}", host); - } - - return clients[host]; -} - void PointToPointBroker::clear() { faabric::util::SharedLock lock(brokerMutex); - 
appIdxs.clear(); + groupIdIdxsMap.clear(); mappings.clear(); - appMappingMutexes.clear(); - appMappingsFlags.clear(); - appMappingCvs.clear(); + PointToPointGroup::clear(); + + groupMappingMutexes.clear(); + groupMappingsFlags.clear(); + groupMappingCvs.clear(); } void PointToPointBroker::resetThreadLocalCache() @@ -271,7 +545,7 @@ void PointToPointBroker::resetThreadLocalCache() PointToPointBroker& getPointToPointBroker() { - static PointToPointBroker reg; - return reg; + static PointToPointBroker broker; + return broker; } } diff --git a/src/transport/PointToPointClient.cpp b/src/transport/PointToPointClient.cpp index 872b82d3d..a89a728de 100644 --- a/src/transport/PointToPointClient.cpp +++ b/src/transport/PointToPointClient.cpp @@ -8,12 +8,19 @@ namespace faabric::transport { +static std::mutex mockMutex; + static std::vector> sentMappings; static std::vector> sentMessages; +static std::vector> + sentLockMessages; + std::vector> getSentMappings() { @@ -26,10 +33,19 @@ getSentPointToPointMessages() return sentMessages; } +std::vector> +getSentLockMessages() +{ + return sentLockMessages; +} + void clearSentMessages() { sentMappings.clear(); sentMessages.clear(); + sentLockMessages.clear(); } PointToPointClient::PointToPointClient(const std::string& hostIn) @@ -56,4 +72,73 @@ void PointToPointClient::sendMessage(faabric::PointToPointMessage& msg) asyncSend(PointToPointCall::MESSAGE, &msg); } } + +void PointToPointClient::makeCoordinationRequest( + int appId, + int groupId, + int groupIdx, + faabric::transport::PointToPointCall call) +{ + faabric::PointToPointMessage req; + req.set_appid(appId); + req.set_groupid(groupId); + req.set_sendidx(groupIdx); + req.set_recvidx(POINT_TO_POINT_MASTER_IDX); + + switch (call) { + case (faabric::transport::PointToPointCall::LOCK_GROUP): { + SPDLOG_TRACE("Requesting lock on {} at {}", groupId, host); + break; + } + case (faabric::transport::PointToPointCall::LOCK_GROUP_RECURSIVE): { + SPDLOG_TRACE( + "Requesting recursive lock on {} at {}", groupId, host); + break; + } + case (faabric::transport::PointToPointCall::UNLOCK_GROUP): { + SPDLOG_TRACE("Requesting unlock on {} at {}", groupId, host); + break; + } + case (faabric::transport::PointToPointCall::UNLOCK_GROUP_RECURSIVE): { + SPDLOG_TRACE( + "Requesting recursive unlock on {} at {}", groupId, host); + break; + } + default: { + SPDLOG_ERROR("Invalid function group call {}", call); + throw std::runtime_error("Invalid function group call"); + } + } + + if (faabric::util::isMockMode()) { + faabric::util::UniqueLock lock(mockMutex); + sentLockMessages.emplace_back(host, call, req); + } else { + asyncSend(call, &req); + } +} + +void PointToPointClient::groupLock(int appId, + int groupId, + int groupIdx, + bool recursive) +{ + makeCoordinationRequest(appId, + groupId, + groupIdx, + recursive ? PointToPointCall::LOCK_GROUP_RECURSIVE + : PointToPointCall::LOCK_GROUP); +} + +void PointToPointClient::groupUnlock(int appId, + int groupId, + int groupIdx, + bool recursive) +{ + makeCoordinationRequest(appId, + groupId, + groupIdx, + recursive ? 
PointToPointCall::UNLOCK_GROUP_RECURSIVE + : PointToPointCall::UNLOCK_GROUP); +} } diff --git a/src/transport/PointToPointServer.cpp b/src/transport/PointToPointServer.cpp index d037bc67f..7791f0b0f 100644 --- a/src/transport/PointToPointServer.cpp +++ b/src/transport/PointToPointServer.cpp @@ -28,13 +28,29 @@ void PointToPointServer::doAsyncRecv(int header, PARSE_MSG(faabric::PointToPointMessage, buffer, bufferSize) // Send the message locally to the downstream socket - reg.sendMessage(msg.appid(), + reg.sendMessage(msg.groupid(), msg.sendidx(), msg.recvidx(), BYTES_CONST(msg.data().c_str()), msg.data().size()); break; } + case faabric::transport::PointToPointCall::LOCK_GROUP: { + recvGroupLock(buffer, bufferSize, false); + break; + } + case faabric::transport::PointToPointCall::LOCK_GROUP_RECURSIVE: { + recvGroupLock(buffer, bufferSize, true); + break; + } + case faabric::transport::PointToPointCall::UNLOCK_GROUP: { + recvGroupUnlock(buffer, bufferSize, false); + break; + } + case faabric::transport::PointToPointCall::UNLOCK_GROUP_RECURSIVE: { + recvGroupUnlock(buffer, bufferSize, true); + break; + } default: { SPDLOG_ERROR("Invalid aync point-to-point header: {}", header); throw std::runtime_error("Invalid async point-to-point message"); @@ -67,15 +83,44 @@ std::unique_ptr PointToPointServer::doRecvMappings( faabric::util::SchedulingDecision decision = faabric::util::SchedulingDecision::fromPointToPointMappings(msg); + SPDLOG_DEBUG("Receiving {} point-to-point mappings", decision.nFunctions); + reg.setUpLocalMappingsFromSchedulingDecision(decision); return std::make_unique(); } +void PointToPointServer::recvGroupLock(const uint8_t* buffer, + size_t bufferSize, + bool recursive) +{ + PARSE_MSG(faabric::PointToPointMessage, buffer, bufferSize) + SPDLOG_TRACE("Receiving lock on {} for idx {} (recursive {})", + msg.groupid(), + msg.sendidx(), + recursive); + + PointToPointGroup::getGroup(msg.groupid())->lock(msg.sendidx(), recursive); +} + +void PointToPointServer::recvGroupUnlock(const uint8_t* buffer, + size_t bufferSize, + bool recursive) +{ + PARSE_MSG(faabric::PointToPointMessage, buffer, bufferSize) + + SPDLOG_TRACE("Receiving unlock on {} for idx {} (recursive {})", + msg.groupid(), + msg.sendidx(), + recursive); + + PointToPointGroup::getGroup(msg.groupid()) + ->unlock(msg.sendidx(), recursive); +} + void PointToPointServer::onWorkerStop() { // Clear any thread-local cached sockets reg.resetThreadLocalCache(); } - } diff --git a/src/util/latch.cpp b/src/util/latch.cpp index 8d90e007d..c44962beb 100644 --- a/src/util/latch.cpp +++ b/src/util/latch.cpp @@ -21,6 +21,7 @@ void Latch::wait() waiters++; if (waiters > count) { + SPDLOG_ERROR("Latch already used: {} > {}", waiters, count); throw std::runtime_error("Latch already used"); } diff --git a/src/util/scheduling.cpp b/src/util/scheduling.cpp index 42c8e1bed..01a75e43a 100644 --- a/src/util/scheduling.cpp +++ b/src/util/scheduling.cpp @@ -1,34 +1,38 @@ #include namespace faabric::util { -SchedulingDecision::SchedulingDecision(uint32_t appIdIn) + +SchedulingDecision::SchedulingDecision(uint32_t appIdIn, int32_t groupIdIn) : appId(appIdIn) + , groupId(groupIdIn) {} void SchedulingDecision::addMessage(const std::string& host, const faabric::Message& msg) { - addMessage(host, msg.id(), msg.appindex()); + addMessage(host, msg.id(), msg.appidx(), msg.groupidx()); } void SchedulingDecision::addMessage(const std::string& host, int32_t messageId, - int32_t appIdx) + int32_t appIdx, + int32_t groupIdx) { nFunctions++; 
hosts.emplace_back(host); messageIds.emplace_back(messageId); appIdxs.emplace_back(appIdx); + groupIdxs.emplace_back(groupIdx); } SchedulingDecision SchedulingDecision::fromPointToPointMappings( faabric::PointToPointMappings& mappings) { - SchedulingDecision decision(mappings.appid()); + SchedulingDecision decision(mappings.appid(), mappings.groupid()); for (const auto& m : mappings.mappings()) { - decision.addMessage(m.host(), m.messageid(), m.recvidx()); + decision.addMessage(m.host(), m.messageid(), m.appidx(), m.groupidx()); } return decision; diff --git a/tests/dist/scheduler/functions.cpp b/tests/dist/scheduler/functions.cpp index b862cbb4d..28436f7f4 100644 --- a/tests/dist/scheduler/functions.cpp +++ b/tests/dist/scheduler/functions.cpp @@ -8,6 +8,7 @@ #include #include +#include #include #include #include @@ -113,7 +114,7 @@ int handleFakeDiffsThreadedFunction( for (int i = 0; i < nThreads; i++) { auto& m = req->mutable_messages()->at(i); - m.set_appindex(i); + m.set_appidx(i); m.set_inputdata(std::string("thread_" + std::to_string(i))); m.set_snapshotkey(snapshotKey); @@ -184,7 +185,7 @@ int handleFakeDiffsThreadedFunction( } } else { - int idx = msg.appindex(); + int idx = msg.appidx(); uint32_t offset = 2 * idx * faabric::util::HOST_PAGE_SIZE + 10; // Modify the executor's memory diff --git a/tests/dist/scheduler/test_funcs.cpp b/tests/dist/scheduler/test_funcs.cpp index 191aa547d..bc24963b1 100644 --- a/tests/dist/scheduler/test_funcs.cpp +++ b/tests/dist/scheduler/test_funcs.cpp @@ -1,6 +1,6 @@ -#include "faabric_utils.h" #include +#include "faabric_utils.h" #include "fixtures.h" #include "init.h" @@ -32,8 +32,9 @@ TEST_CASE_METHOD(DistTestsFixture, faabric::util::batchExecFactory("funcs", "simple", nFuncs); // Set up the expectation - faabric::util::SchedulingDecision expectedDecision( - req->messages().at(0).appid()); + const faabric::Message firstMsg = req->messages().at(0); + faabric::util::SchedulingDecision expectedDecision(firstMsg.appid(), + firstMsg.groupid()); expectedDecision.addMessage(thisHost, req->messages().at(0)); expectedDecision.addMessage(thisHost, req->messages().at(1)); expectedDecision.addMessage(otherHost, req->messages().at(2)); diff --git a/tests/dist/server.cpp b/tests/dist/server.cpp index 733111946..676723228 100644 --- a/tests/dist/server.cpp +++ b/tests/dist/server.cpp @@ -30,6 +30,10 @@ int main() faabric::runner::FaabricMain m(fac); m.startBackground(); + SPDLOG_INFO("---------------------------------"); + SPDLOG_INFO("Distributed test server started"); + SPDLOG_INFO("---------------------------------"); + // Note, endpoint will block until killed SPDLOG_INFO("Starting HTTP endpoint on worker"); faabric::endpoint::FaabricEndpoint endpoint; diff --git a/tests/dist/transport/functions.cpp b/tests/dist/transport/functions.cpp index 6a519113d..559fc75cd 100644 --- a/tests/dist/transport/functions.cpp +++ b/tests/dist/transport/functions.cpp @@ -7,6 +7,7 @@ #include #include #include +#include using namespace faabric::util; @@ -20,29 +21,30 @@ int handlePointToPointFunction( { faabric::Message& msg = req->mutable_messages()->at(msgIdx); - uint8_t appIdx = (uint8_t)msg.appindex(); + int groupId = msg.groupid(); + uint8_t groupIdx = (uint8_t)msg.groupidx(); faabric::transport::PointToPointBroker& broker = faabric::transport::getPointToPointBroker(); // Send to next index in ring and recv from previous in ring. - uint8_t minIdx = 1; + uint8_t minIdx = 0; uint8_t maxIdx = 3; - uint8_t sendToIdx = appIdx < maxIdx ? 
appIdx + 1 : minIdx; - uint8_t recvFromIdx = appIdx > minIdx ? appIdx - 1 : maxIdx; + uint8_t sendToIdx = groupIdx < maxIdx ? groupIdx + 1 : minIdx; + uint8_t recvFromIdx = groupIdx > minIdx ? groupIdx - 1 : maxIdx; // Send a series of our own index, expect to receive the same from other // senders - std::vector sendData(10, appIdx); + std::vector sendData(10, groupIdx); std::vector expectedRecvData(10, recvFromIdx); // Do the sending broker.sendMessage( - msg.appid(), appIdx, sendToIdx, sendData.data(), sendData.size()); + groupId, groupIdx, sendToIdx, sendData.data(), sendData.size()); // Do the receiving std::vector actualRecvData = - broker.recvMessage(msg.appid(), recvFromIdx, appIdx); + broker.recvMessage(groupId, recvFromIdx, groupIdx); // Check data is as expected if (actualRecvData != expectedRecvData) { @@ -55,9 +57,274 @@ int handlePointToPointFunction( return 0; } +class DistributedCoordinationTestRunner +{ + public: + DistributedCoordinationTestRunner(faabric::Message& msgIn, + const std::string& statePrefixIn, + int nChainedIn) + : msg(msgIn) + , statePrefix(statePrefixIn) + , nChained(nChainedIn) + , state(state::getGlobalState()) + { + for (int i = 0; i < nChained; i++) { + stateKeys.emplace_back(statePrefix + std::to_string(i)); + } + } + + std::vector getStateKeys() { return stateKeys; } + + std::vector setUpStateKeys() + { + for (int i = 0; i < nChained; i++) { + int initialValue = -1; + state.getKV(msg.user(), stateKeys.at(i), sizeof(int32_t)) + ->set(BYTES(&initialValue)); + } + + return stateKeys; + } + + void writeResultForIndex() + { + int idx = msg.groupidx(); + + faabric::state::State& state = state::getGlobalState(); + std::string stateKey = stateKeys.at(idx); + SPDLOG_DEBUG("{}/{} {} writing result to {}", + msg.user(), + msg.function(), + idx, + stateKey); + + std::shared_ptr kv = + state.getKV(msg.user(), stateKey, sizeof(int32_t)); + kv->set(BYTES(&idx)); + kv->pushFull(); + } + + int callChainedFunc(const std::string& func) + { + // Set up chained messages + auto chainReq = + faabric::util::batchExecFactory(msg.user(), func, nChained); + + for (int i = 0; i < nChained; i++) { + auto& m = chainReq->mutable_messages()->at(i); + + // Set app index and group data + m.set_appid(msg.appid()); + m.set_appidx(i); + + m.set_groupid(groupId); + m.set_groupidx(i); + m.set_groupsize(nChained); + + m.set_inputdata(msg.inputdata()); + } + + faabric::scheduler::Scheduler& sch = faabric::scheduler::getScheduler(); + sch.callFunctions(chainReq); + + bool success = true; + for (const auto& m : chainReq->messages()) { + faabric::Message result = sch.getFunctionResult(m.id(), 10000); + if (result.returnvalue() != 0) { + SPDLOG_ERROR("Distributed coordination check call failed: {}", + m.id()); + + success = false; + } + } + + return success ? 
0 : 1; + } + + int checkResults(std::vector expectedResults) + { + std::vector actualResults(expectedResults.size(), 0); + + // Load all results + for (int i = 0; i < expectedResults.size(); i++) { + auto idxKv = + state.getKV(msg.user(), stateKeys.at(i), sizeof(int32_t)); + idxKv->pull(); + + uint8_t* idxRawValue = idxKv->get(); + int actualIdxValue = *(int*)idxRawValue; + actualResults.at(i) = actualIdxValue; + } + + // Check them + if (actualResults != expectedResults) { + SPDLOG_ERROR("{}/{} {} check failed on host {} ({} != {})", + msg.user(), + msg.function(), + msg.groupidx(), + faabric::util::getSystemConfig().endpointHost, + faabric::util::vectorToString(expectedResults), + faabric::util::vectorToString(actualResults)); + return 1; + } + + SPDLOG_DEBUG("{} results for {}/{} ok", + expectedResults.size(), + msg.user(), + msg.function()); + + return 0; + } + + private: + faabric::Message& msg; + const std::string statePrefix; + int nChained = 0; + faabric::state::State& state; + + std::vector stateKeys; + + int groupId = 123; +}; + +int handleDistributedBarrier(faabric::scheduler::Executor* exec, + int threadPoolIdx, + int msgIdx, + std::shared_ptr req) +{ + faabric::Message& msg = req->mutable_messages()->at(msgIdx); + int nChainedFuncs = std::stoi(msg.inputdata()); + + DistributedCoordinationTestRunner runner( + msg, "barrier-test-", nChainedFuncs); + + runner.setUpStateKeys(); + + // Make request and wait for results + return runner.callChainedFunc("barrier-worker"); +} + +int handleDistributedBarrierWorker( + faabric::scheduler::Executor* exec, + int threadPoolIdx, + int msgIdx, + std::shared_ptr req) +{ + faabric::Message& msg = req->mutable_messages()->at(msgIdx); + + DistributedCoordinationTestRunner runner( + msg, "barrier-test-", msg.groupsize()); + + // Sleep for some time + int groupIdx = msg.groupidx(); + int waitMs = 500 * groupIdx; + SPDLOG_DEBUG("barrier-worker {} sleeping for {}ms", groupIdx, waitMs); + SLEEP_MS(waitMs); + + // Write result for this thread + runner.writeResultForIndex(); + + // Wait on a barrier + SPDLOG_DEBUG("barrier-worker {} waiting on barrier (size {})", + groupIdx, + msg.groupsize()); + + faabric::transport::PointToPointGroup::getGroup(msg.groupid()) + ->barrier(msg.groupidx()); + + // At this point all workers should have completed (i.e. 
everyone has had to + // wait on the barrier) + std::vector expectedResults; + for (int i = 0; i < msg.groupsize(); i++) { + expectedResults.push_back(i); + } + return runner.checkResults(expectedResults); +} + +int handleDistributedNotify(faabric::scheduler::Executor* exec, + int threadPoolIdx, + int msgIdx, + std::shared_ptr req) +{ + faabric::Message& msg = req->mutable_messages()->at(msgIdx); + int nChainedFuncs = std::stoi(msg.inputdata()); + + DistributedCoordinationTestRunner runner( + msg, "notify-test-", nChainedFuncs); + + runner.setUpStateKeys(); + + // Make request and wait for results + return runner.callChainedFunc("notify-worker"); +} + +int handleDistributedNotifyWorker( + faabric::scheduler::Executor* exec, + int threadPoolIdx, + int msgIdx, + std::shared_ptr req) +{ + faabric::Message& msg = req->mutable_messages()->at(msgIdx); + + DistributedCoordinationTestRunner runner( + msg, "notify-test-", msg.groupsize()); + + // Sleep for some time + int groupIdx = msg.groupidx(); + int waitMs = 1000 * groupIdx; + SPDLOG_DEBUG("notify-worker {} sleeping for {}ms", groupIdx, waitMs); + SLEEP_MS(waitMs); + + int returnValue = 0; + std::vector expectedResults; + if (msg.groupidx() == 0) { + // Master should wait until it's been notified + faabric::transport::PointToPointGroup::getGroup(msg.groupid()) + ->notify(msg.groupidx()); + + // Write result + runner.writeResultForIndex(); + + // Check that all other workers have finished + for (int i = 0; i < msg.groupsize(); i++) { + expectedResults.push_back(i); + } + returnValue = runner.checkResults(expectedResults); + + } else { + // Write the result for this worker + runner.writeResultForIndex(); + + // Check results before notifying + expectedResults = std::vector(msg.groupsize(), -1); + for (int i = 1; i <= msg.groupidx(); i++) { + expectedResults.at(i) = i; + } + + returnValue = runner.checkResults(expectedResults); + + // Notify + faabric::transport::PointToPointGroup::getGroup(msg.groupid()) + ->notify(msg.groupidx()); + } + + return returnValue; +} + void registerTransportTestFunctions() { registerDistTestExecutorCallback( "ptp", "simple", handlePointToPointFunction); + + registerDistTestExecutorCallback( + "ptp", "barrier", handleDistributedBarrier); + + registerDistTestExecutorCallback( + "ptp", "barrier-worker", handleDistributedBarrierWorker); + + registerDistTestExecutorCallback("ptp", "notify", handleDistributedNotify); + + registerDistTestExecutorCallback( + "ptp", "notify-worker", handleDistributedNotifyWorker); } } diff --git a/tests/dist/transport/test_point_to_point.cpp b/tests/dist/transport/test_point_to_point.cpp index 7d5e488ba..023d15e50 100644 --- a/tests/dist/transport/test_point_to_point.cpp +++ b/tests/dist/transport/test_point_to_point.cpp @@ -15,16 +15,19 @@ namespace tests { TEST_CASE_METHOD(DistTestsFixture, "Test point-to-point messaging on multiple hosts", - "[ptp]") + "[ptp][transport]") { std::set actualAvailable = sch.getAvailableHosts(); std::set expectedAvailable = { getMasterIP(), getWorkerIP() }; REQUIRE(actualAvailable == expectedAvailable); + int appId = 222; + int groupId = 333; + // Set up this host's resources // Make sure some functions execute remotely, some locally int nLocalSlots = 1; - int nFuncs = 3; + int nFuncs = 4; faabric::HostResources res; res.set_slots(nLocalSlots); @@ -34,24 +37,20 @@ TEST_CASE_METHOD(DistTestsFixture, std::shared_ptr req = faabric::util::batchExecFactory("ptp", "simple", nFuncs); - // Double check app id - int appId = req->messages().at(0).appid(); - 
REQUIRE(appId > 0); - - faabric::transport::PointToPointBroker& broker = - faabric::transport::getPointToPointBroker(); - - std::vector expectedHosts = { getMasterIP(), - getWorkerIP(), - getWorkerIP() }; + // Prepare expected decision + faabric::util::SchedulingDecision expectedDecision(appId, groupId); + std::vector expectedHosts = { + getMasterIP(), getWorkerIP(), getWorkerIP(), getWorkerIP() + }; // Set up individual messages - // Note that this thread is acting as app index 0 - faabric::util::SchedulingDecision expectedDecision(appId); for (int i = 0; i < nFuncs; i++) { faabric::Message& msg = req->mutable_messages()->at(i); - msg.set_appindex(i + 1); + msg.set_appid(appId); + msg.set_appidx(i); + msg.set_groupid(groupId); + msg.set_groupidx(i); // Add to expected decision expectedDecision.addMessage(expectedHosts.at(i), req->messages().at(i)); @@ -61,9 +60,6 @@ TEST_CASE_METHOD(DistTestsFixture, faabric::util::SchedulingDecision actualDecision = sch.callFunctions(req); checkSchedulingDecisionEquality(actualDecision, expectedDecision); - // Set up point-to-point mappings - broker.setAndSendMappingsFromSchedulingDecision(actualDecision); - // Check functions executed successfully for (int i = 0; i < nFuncs; i++) { faabric::Message& m = req->mutable_messages()->at(i); @@ -72,4 +68,39 @@ TEST_CASE_METHOD(DistTestsFixture, REQUIRE(m.returnvalue() == 0); } } + +TEST_CASE_METHOD(DistTestsFixture, + "Test distributed coordination", + "[ptp][transport]") +{ + // Set up this host's resources, force execution across hosts + int nChainedFuncs = 4; + int nLocalSlots = 2; + + faabric::HostResources res; + res.set_slots(nLocalSlots); + sch.setThisHostResources(res); + + std::string function; + SECTION("Barrier") { function = "barrier"; } + + SECTION("Notify") { function = "notify"; } + + // Set up the message + std::shared_ptr req = + faabric::util::batchExecFactory("ptp", function, 1); + + // Set number of chained funcs + faabric::Message& m = req->mutable_messages()->at(0); + m.set_inputdata(std::to_string(nChainedFuncs)); + + // Call the function + std::vector expectedHosts = { getMasterIP() }; + std::vector executedHosts = sch.callFunctions(req).hosts; + REQUIRE(expectedHosts == executedHosts); + + // Get result + faabric::Message result = sch.getFunctionResult(m.id(), 10000); + REQUIRE(result.returnvalue() == 0); +} } diff --git a/tests/test/scheduler/test_executor.cpp b/tests/test/scheduler/test_executor.cpp index 02028c551..4dbf2983b 100644 --- a/tests/test/scheduler/test_executor.cpp +++ b/tests/test/scheduler/test_executor.cpp @@ -1,6 +1,7 @@ -#include "faabric_utils.h" #include +#include "faabric_utils.h" + #include #include @@ -126,7 +127,7 @@ class TestExecutor final : public Executor for (int i = 0; i < chainedReq->messages_size(); i++) { faabric::Message& m = chainedReq->mutable_messages()->at(i); m.set_snapshotkey(snapKey); - m.set_appindex(i + 1); + m.set_appidx(i + 1); } // Call the threads @@ -349,7 +350,7 @@ TEST_CASE_METHOD(TestExecutorFixture, for (int i = 0; i < nThreads; i++) { faabric::Message& msg = req->mutable_messages()->at(i); msg.set_snapshotkey(snapshotKey); - msg.set_appindex(i); + msg.set_appidx(i); messageIds.emplace_back(req->messages().at(i).id()); } @@ -723,7 +724,7 @@ TEST_CASE_METHOD(TestExecutorFixture, faabric::Message& msg = req->mutable_messages()->at(i); msg.set_snapshotkey(snapshotKey); msg.set_masterhost(otherHost); - msg.set_appindex(i); + msg.set_appidx(i); messageIds.emplace_back(msg.id()); } diff --git 
a/tests/test/scheduler/test_function_client_server.cpp b/tests/test/scheduler/test_function_client_server.cpp index c7e982540..747ccd34d 100644 --- a/tests/test/scheduler/test_function_client_server.cpp +++ b/tests/test/scheduler/test_function_client_server.cpp @@ -1,5 +1,8 @@ #include +#include "faabric_utils.h" +#include "fixtures.h" + #include #include @@ -16,7 +19,6 @@ #include #include #include -#include using namespace faabric::scheduler; @@ -25,12 +27,18 @@ class ClientServerFixture : public RedisTestFixture , public SchedulerTestFixture , public StateTestFixture + , public PointToPointTestFixture + , public ConfTestFixture { protected: FunctionCallServer server; FunctionCallClient cli; + std::shared_ptr executorFactory; + int groupId = 123; + int groupSize = 2; + public: ClientServerFixture() : cli(LOCALHOST) diff --git a/tests/test/scheduler/test_scheduler.cpp b/tests/test/scheduler/test_scheduler.cpp index 19b2feadf..edf9f83b8 100644 --- a/tests/test/scheduler/test_scheduler.cpp +++ b/tests/test/scheduler/test_scheduler.cpp @@ -2,6 +2,7 @@ #include "DummyExecutorFactory.h" #include "faabric_utils.h" +#include "fixtures.h" #include #include @@ -10,6 +11,8 @@ #include #include #include +#include +#include #include #include #include @@ -77,6 +80,7 @@ class DummyExecutorFixture : public RedisTestFixture , public SchedulerTestFixture , public ConfTestFixture + , public PointToPointTestFixture { public: DummyExecutorFixture() @@ -257,15 +261,16 @@ TEST_CASE_METHOD(SlowExecutorFixture, "Test batch scheduling", "[scheduler]") reqOne->set_subtype(expectedSubType); reqOne->set_contextdata(expectedContextData); - faabric::util::SchedulingDecision expectedDecisionOne( - reqOne->messages().at(0).appid()); + const faabric::Message firstMsg = reqOne->messages().at(0); + faabric::util::SchedulingDecision expectedDecisionOne(firstMsg.appid(), + firstMsg.groupid()); for (int i = 0; i < nCallsOne; i++) { // Set snapshot key faabric::Message& msg = reqOne->mutable_messages()->at(i); msg.set_snapshotkey(expectedSnapshot); // Set app index - msg.set_appindex(i); + msg.set_appidx(i); // Expect this host to handle up to its number of cores bool isThisHost = i < thisCores; @@ -336,8 +341,9 @@ TEST_CASE_METHOD(SlowExecutorFixture, "Test batch scheduling", "[scheduler]") // Now schedule a second batch and check they're all sent to the other host std::shared_ptr reqTwo = faabric::util::batchExecFactory("foo", "bar", nCallsTwo); - faabric::util::SchedulingDecision expectedDecisionTwo( - reqTwo->messages().at(0).appid()); + const faabric::Message& firstMsg2 = reqTwo->messages().at(0); + faabric::util::SchedulingDecision expectedDecisionTwo(firstMsg2.appid(), + firstMsg2.groupid()); for (int i = 0; i < nCallsTwo; i++) { faabric::Message& msg = reqTwo->mutable_messages()->at(i); msg.set_snapshotkey(expectedSnapshot); @@ -434,8 +440,9 @@ TEST_CASE_METHOD(SlowExecutorFixture, faabric::util::batchExecFactory("foo", "bar", nCalls); req->set_type(execMode); - faabric::util::SchedulingDecision expectedDecision( - req->messages().at(0).appid()); + const faabric::Message firstMsg = req->messages().at(0); + faabric::util::SchedulingDecision expectedDecision(firstMsg.appid(), + firstMsg.groupid()); for (int i = 0; i < nCalls; i++) { faabric::Message& msg = req->mutable_messages()->at(i); msg.set_snapshotkey(expectedSnapshot); @@ -460,7 +467,6 @@ TEST_CASE_METHOD(SlowExecutorFixture, expectedExecutors = expectedLocalCalls; } - faabric::Message firstMsg = req->messages().at(0); 
REQUIRE(sch.getFunctionExecutorCount(firstMsg) == expectedExecutors); } @@ -828,4 +834,114 @@ TEST_CASE_METHOD(DummyExecutorFixture, "Test executor reuse", "[scheduler]") // Check executor count is still the same REQUIRE(sch.getFunctionExecutorCount(msgA) == 2); } + +TEST_CASE_METHOD(DummyExecutorFixture, + "Test point-to-point mappings sent from scheduler", + "[scheduler]") +{ + faabric::util::setMockMode(true); + + std::string thisHost = conf.endpointHost; + std::string otherHost = "foobar"; + + sch.addHostToGlobalSet(otherHost); + + // Set resources for this host + int nSlotsThisHost = 2; + faabric::HostResources resourcesThisHost; + resourcesThisHost.set_slots(nSlotsThisHost); + sch.setThisHostResources(resourcesThisHost); + + // Set resources for other host + int nSlotsOtherHost = 5; + faabric::HostResources resourcesOtherHost; + resourcesOtherHost.set_slots(nSlotsOtherHost); + faabric::scheduler::queueResourceResponse(otherHost, resourcesOtherHost); + + // Set up request + auto req = faabric::util::batchExecFactory("foo", "bar", 4); + faabric::Message& firstMsg = req->mutable_messages()->at(0); + + int appId = firstMsg.appid(); + int groupId = 0; + bool forceLocal = false; + bool expectMappingsSent = false; + + SECTION("No group ID") + { + groupId = 0; + + SECTION("Force local") + { + forceLocal = true; + expectMappingsSent = false; + } + + SECTION("No force local") + { + forceLocal = false; + expectMappingsSent = false; + } + } + + SECTION("With group ID") + { + groupId = 123; + + SECTION("Force local") + { + forceLocal = true; + expectMappingsSent = false; + } + + SECTION("No force local") + { + forceLocal = false; + expectMappingsSent = true; + } + } + + std::vector expectedHosts = { + thisHost, thisHost, otherHost, otherHost + }; + if (forceLocal) { + expectedHosts = { thisHost, thisHost, thisHost, thisHost }; + } + + faabric::util::SchedulingDecision expectedDecision(appId, groupId); + + for (int i = 0; i < req->messages().size(); i++) { + faabric::Message& m = req->mutable_messages()->at(i); + m.set_groupid(groupId); + m.set_groupidx(i); + + expectedDecision.addMessage(expectedHosts.at(i), req->messages().at(i)); + } + + // Schedule and check decision + faabric::util::SchedulingDecision actualDecision = + sch.callFunctions(req, forceLocal); + checkSchedulingDecisionEquality(expectedDecision, actualDecision); + + // Check mappings set up locally or not + faabric::transport::PointToPointBroker& broker = + faabric::transport::getPointToPointBroker(); + std::set registeredIdxs = broker.getIdxsRegisteredForGroup(groupId); + if (expectMappingsSent) { + REQUIRE(registeredIdxs.size() == 4); + } else { + REQUIRE(registeredIdxs.empty()); + } + + // Check mappings sent or not + std::vector> + sentMappings = faabric::transport::getSentMappings(); + + if (expectMappingsSent) { + REQUIRE(sentMappings.size() == 1); + REQUIRE(sentMappings.at(0).first == otherHost); + } else { + REQUIRE(sentMappings.empty()); + } +} } diff --git a/tests/test/snapshot/test_snapshot_client_server.cpp b/tests/test/snapshot/test_snapshot_client_server.cpp index aaf1cfa7f..fa86b703e 100644 --- a/tests/test/snapshot/test_snapshot_client_server.cpp +++ b/tests/test/snapshot/test_snapshot_client_server.cpp @@ -1,6 +1,7 @@ +#include + #include "faabric_utils.h" #include "fixtures.h" -#include #include @@ -22,6 +23,7 @@ class SnapshotClientServerFixture : public SchedulerTestFixture , public RedisTestFixture , public SnapshotTestFixture + , public PointToPointTestFixture { protected: faabric::snapshot::SnapshotServer 
server; @@ -35,6 +37,17 @@ class SnapshotClientServerFixture } ~SnapshotClientServerFixture() { server.stop(); } + + void setUpFunctionGroup(int appId, int groupId) + { + faabric::util::SchedulingDecision decision(appId, groupId); + faabric::Message msg = faabric::util::messageFactory("foo", "bar"); + msg.set_appid(appId); + msg.set_groupid(groupId); + + decision.addMessage(LOCALHOST, msg); + broker.setUpLocalMappingsFromSchedulingDecision(decision); + } }; TEST_CASE_METHOD(ConfTestFixture, @@ -71,9 +84,16 @@ TEST_CASE_METHOD(SnapshotClientServerFixture, snapA.data = dataA.data(); snapB.data = dataB.data(); + // One request with no group + int appId = 111; + int groupIdA = 0; + int groupIdB = 123; + + setUpFunctionGroup(appId, groupIdB); + // Send the message - cli.pushSnapshot(snapKeyA, snapA); - cli.pushSnapshot(snapKeyB, snapB); + cli.pushSnapshot(snapKeyA, groupIdA, snapA); + cli.pushSnapshot(snapKeyB, groupIdB, snapB); // Check snapshots created in registry REQUIRE(reg.getSnapshotCount() == 2); @@ -107,6 +127,15 @@ TEST_CASE_METHOD(SnapshotClientServerFixture, "Test push snapshot diffs", "[snapshot]") { + std::string thisHost = faabric::util::getSystemConfig().endpointHost; + + // One request with no group, another with a group we must initialise + int appId = 111; + int groupIdA = 0; + int groupIdB = 234; + + setUpFunctionGroup(appId, groupIdB); + // Set up a snapshot std::string snapKey = std::to_string(faabric::util::generateGid()); faabric::util::SnapshotData snap = takeSnapshot(snapKey, 5, true); @@ -123,12 +152,12 @@ TEST_CASE_METHOD(SnapshotClientServerFixture, faabric::util::SnapshotDiff diffA2( 2 * faabric::util::HOST_PAGE_SIZE, diffDataA2.data(), diffDataA2.size()); diffsA = { diffA1, diffA2 }; - cli.pushSnapshotDiffs(snapKey, diffsA); + cli.pushSnapshotDiffs(snapKey, groupIdA, diffsA); faabric::util::SnapshotDiff diffB( 3 * faabric::util::HOST_PAGE_SIZE, diffDataB.data(), diffDataB.size()); diffsB = { diffB }; - cli.pushSnapshotDiffs(snapKey, diffsB); + cli.pushSnapshotDiffs(snapKey, groupIdB, diffsB); // Check changes have been applied checkDiffsApplied(snap.data, diffsA); @@ -178,7 +207,7 @@ TEST_CASE_METHOD(SnapshotClientServerFixture, diffA2.dataType = faabric::util::SnapshotDataType::Int; diffs = { diffA1, diffA2 }; - cli.pushSnapshotDiffs(snapKey, diffs); + cli.pushSnapshotDiffs(snapKey, 0, diffs); // Check diffs have been applied according to the merge operations REQUIRE(*basePtrA1 == baseA1 + diffIntA1); @@ -290,7 +319,7 @@ TEST_CASE_METHOD(SnapshotClientServerFixture, diff.dataType = dataType; std::vector diffs = { diff }; - cli.pushSnapshotDiffs(snapKey, diffs); + cli.pushSnapshotDiffs(snapKey, 0, diffs); // Check data is as expected std::vector actualData(snap.data + offset, diff --git a/tests/test/transport/test_point_to_point.cpp b/tests/test/transport/test_point_to_point.cpp index 5cfb9c1f5..01d58340d 100644 --- a/tests/test/transport/test_point_to_point.cpp +++ b/tests/test/transport/test_point_to_point.cpp @@ -17,68 +17,63 @@ using namespace faabric::util; namespace tests { -class PointToPointClientServerFixture - : public PointToPointTestFixture - , SchedulerTestFixture -{ - public: - PointToPointClientServerFixture() - : cli(LOCALHOST) - { - server.start(); - } - - ~PointToPointClientServerFixture() { server.stop(); } - - protected: - faabric::transport::PointToPointClient cli; - faabric::transport::PointToPointServer server; -}; - TEST_CASE_METHOD(PointToPointClientServerFixture, "Test sending point-to-point mappings from client", "[transport][ptp]") { 
int appIdA = 123; + int groupIdA = 321; int appIdB = 345; + int groupIdB = 543; - int idxA1 = 1; - int idxA2 = 2; - int idxB1 = 1; + // Deliberately overlap these indexes to check that the app and group IDs + // matter + int appIdxA1 = 1; + int appIdxA2 = 2; + int appIdxB1 = 1; + + int groupIdxA1 = 3; + int groupIdxA2 = 4; + int groupIdxB1 = 3; std::string hostA = "host-a"; std::string hostB = "host-b"; - REQUIRE(broker.getIdxsRegisteredForApp(appIdA).empty()); - REQUIRE(broker.getIdxsRegisteredForApp(appIdB).empty()); + REQUIRE(broker.getIdxsRegisteredForGroup(appIdA).empty()); + REQUIRE(broker.getIdxsRegisteredForGroup(appIdB).empty()); faabric::PointToPointMappings mappingsA; mappingsA.set_appid(appIdA); + mappingsA.set_groupid(groupIdA); faabric::PointToPointMappings mappingsB; mappingsB.set_appid(appIdB); + mappingsB.set_groupid(groupIdB); auto* mappingA1 = mappingsA.add_mappings(); - mappingA1->set_recvidx(idxA1); + mappingA1->set_appidx(appIdxA1); + mappingA1->set_groupidx(groupIdxA1); mappingA1->set_host(hostA); auto* mappingA2 = mappingsA.add_mappings(); - mappingA2->set_recvidx(idxA2); + mappingA2->set_appidx(appIdxA2); + mappingA2->set_groupidx(groupIdxA2); mappingA2->set_host(hostB); auto* mappingB1 = mappingsB.add_mappings(); - mappingB1->set_recvidx(idxB1); + mappingB1->set_appidx(appIdxB1); + mappingB1->set_groupidx(groupIdxB1); mappingB1->set_host(hostA); cli.sendMappings(mappingsA); cli.sendMappings(mappingsB); - REQUIRE(broker.getIdxsRegisteredForApp(appIdA).size() == 2); - REQUIRE(broker.getIdxsRegisteredForApp(appIdB).size() == 1); + REQUIRE(broker.getIdxsRegisteredForGroup(groupIdA).size() == 2); + REQUIRE(broker.getIdxsRegisteredForGroup(groupIdB).size() == 1); - REQUIRE(broker.getHostForReceiver(appIdA, idxA1) == hostA); - REQUIRE(broker.getHostForReceiver(appIdA, idxA2) == hostB); - REQUIRE(broker.getHostForReceiver(appIdB, idxB1) == hostA); + REQUIRE(broker.getHostForReceiver(groupIdA, groupIdxA1) == hostA); + REQUIRE(broker.getHostForReceiver(groupIdA, groupIdxA2) == hostB); + REQUIRE(broker.getHostForReceiver(groupIdB, groupIdxB1) == hostA); } TEST_CASE_METHOD(PointToPointClientServerFixture, @@ -86,6 +81,7 @@ TEST_CASE_METHOD(PointToPointClientServerFixture, "[transport][ptp]") { int appId = 123; + int groupId = 345; int idxA = 5; int idxB = 10; @@ -94,19 +90,22 @@ TEST_CASE_METHOD(PointToPointClientServerFixture, conf.endpointHost = LOCALHOST; // Register both indexes on this host - faabric::util::SchedulingDecision decision(appId); + faabric::util::SchedulingDecision decision(appId, groupId); faabric::Message msgA = faabric::util::messageFactory("foo", "bar"); msgA.set_appid(appId); - msgA.set_appindex(idxA); + msgA.set_groupid(groupId); + msgA.set_groupidx(idxA); faabric::Message msgB = faabric::util::messageFactory("foo", "bar"); msgB.set_appid(appId); - msgB.set_appindex(idxB); + msgB.set_groupid(groupId); + msgB.set_groupidx(idxB); decision.addMessage(LOCALHOST, msgA); decision.addMessage(LOCALHOST, msgB); + // Set up the mappings broker.setAndSendMappingsFromSchedulingDecision(decision); std::vector sentDataA = { 0, 1, 2, 3 }; @@ -116,23 +115,23 @@ TEST_CASE_METHOD(PointToPointClientServerFixture, // Make sure we send the message before a receiver is available to check // async handling - broker.sendMessage(appId, idxA, idxB, sentDataA.data(), sentDataA.size()); + broker.sendMessage(groupId, idxA, idxB, sentDataA.data(), sentDataA.size()); - std::thread t([appId, idxA, idxB, &receivedDataA, &sentDataB] { + std::thread t([groupId, idxA, idxB, 
&receivedDataA, &sentDataB] { PointToPointBroker& broker = getPointToPointBroker(); // Receive the first message - receivedDataA = broker.recvMessage(appId, idxA, idxB); + receivedDataA = broker.recvMessage(groupId, idxA, idxB); // Send a message back broker.sendMessage( - appId, idxB, idxA, sentDataB.data(), sentDataB.size()); + groupId, idxB, idxA, sentDataB.data(), sentDataB.size()); broker.resetThreadLocalCache(); }); // Receive the message sent back - receivedDataB = broker.recvMessage(appId, idxB, idxA); + receivedDataB = broker.recvMessage(groupId, idxB, idxA); if (t.joinable()) { t.join(); @@ -151,6 +150,9 @@ TEST_CASE_METHOD( { faabric::util::setMockMode(true); + int appId = 111; + int groupId = 222; + std::string hostA = "hostA"; std::string hostB = "hostB"; std::string hostC = "hostC"; @@ -158,7 +160,14 @@ TEST_CASE_METHOD( int nMessages = 6; auto req = batchExecFactory("foo", "bar", nMessages); for (int i = 0; i < nMessages; i++) { - req->mutable_messages()->at(i).set_appindex(i); + faabric::Message& m = req->mutable_messages()->at(i); + + m.set_appid(appId); + m.set_groupid(groupId); + + // Deliberately don't share app and group idxs + m.set_appidx(i + 10); + m.set_groupidx(i); } faabric::Message& msgA = req->mutable_messages()->at(0); @@ -168,8 +177,7 @@ TEST_CASE_METHOD( faabric::Message& msgE = req->mutable_messages()->at(4); faabric::Message& msgF = req->mutable_messages()->at(5); - int appId = msgA.appid(); - SchedulingDecision decision(appId); + SchedulingDecision decision(appId, groupId); decision.addMessage(hostB, msgA); decision.addMessage(hostA, msgB); decision.addMessage(hostC, msgC); @@ -181,12 +189,12 @@ TEST_CASE_METHOD( broker.setAndSendMappingsFromSchedulingDecision(decision); // Check locally - REQUIRE(broker.getHostForReceiver(appId, msgA.appindex()) == hostB); - REQUIRE(broker.getHostForReceiver(appId, msgB.appindex()) == hostA); - REQUIRE(broker.getHostForReceiver(appId, msgC.appindex()) == hostC); - REQUIRE(broker.getHostForReceiver(appId, msgD.appindex()) == hostB); - REQUIRE(broker.getHostForReceiver(appId, msgE.appindex()) == hostB); - REQUIRE(broker.getHostForReceiver(appId, msgF.appindex()) == hostC); + REQUIRE(broker.getHostForReceiver(groupId, msgA.groupidx()) == hostB); + REQUIRE(broker.getHostForReceiver(groupId, msgB.groupidx()) == hostA); + REQUIRE(broker.getHostForReceiver(groupId, msgC.groupidx()) == hostC); + REQUIRE(broker.getHostForReceiver(groupId, msgD.groupidx()) == hostB); + REQUIRE(broker.getHostForReceiver(groupId, msgE.groupidx()) == hostB); + REQUIRE(broker.getHostForReceiver(groupId, msgF.groupidx()) == hostC); // Check the mappings have been sent out to the relevant hosts auto actualSent = getSentMappings(); @@ -200,36 +208,56 @@ TEST_CASE_METHOD( -> bool { return a.first < b.first; }); std::vector expectedHosts = { hostA, hostB, hostC }; - std::set expectedHostAIdxs = { msgB.appindex() }; - std::set expectedHostBIdxs = { msgA.appindex(), - msgD.appindex(), - msgE.appindex() }; - std::set expectedHostCIdxs = { msgC.appindex(), msgF.appindex() }; + std::set expectedAppIdxsA = { msgB.appidx() }; + std::set expectedAppIdxsB = { msgA.appidx(), + msgD.appidx(), + msgE.appidx() }; + std::set expectedAppIdxsC = { msgC.appidx(), msgF.appidx() }; + + std::set expectedGroupIdxsA = { msgB.groupidx() }; + std::set expectedGroupIdxsB = { msgA.groupidx(), + msgD.groupidx(), + msgE.groupidx() }; + std::set expectedGroupIdxsC = { msgC.groupidx(), msgF.groupidx() }; // Check all mappings are the same for (int i = 0; i < 3; i++) { 
REQUIRE(actualSent.at(i).first == expectedHosts.at(i)); faabric::PointToPointMappings actual = actualSent.at(i).second; - std::set hostAIdxs; - std::set hostBIdxs; - std::set hostCIdxs; + REQUIRE(actual.appid() == appId); + REQUIRE(actual.groupid() == groupId); + + std::set appIdxsA; + std::set appIdxsB; + std::set appIdxsC; + + std::set groupIdxsA; + std::set groupIdxsB; + std::set groupIdxsC; for (const auto& m : actual.mappings()) { if (m.host() == hostA) { - hostAIdxs.insert(m.recvidx()); + appIdxsA.insert(m.appidx()); + groupIdxsA.insert(m.groupidx()); } else if (m.host() == hostB) { - hostBIdxs.insert(m.recvidx()); + appIdxsB.insert(m.appidx()); + groupIdxsB.insert(m.groupidx()); } else if (m.host() == hostC) { - hostCIdxs.insert(m.recvidx()); + appIdxsC.insert(m.appidx()); + groupIdxsC.insert(m.groupidx()); } else { FAIL(); } } - REQUIRE(hostAIdxs == expectedHostAIdxs); - REQUIRE(hostBIdxs == expectedHostBIdxs); - REQUIRE(hostCIdxs == expectedHostCIdxs); + REQUIRE(appIdxsA == expectedAppIdxsA); + REQUIRE(appIdxsB == expectedAppIdxsB); + REQUIRE(appIdxsC == expectedAppIdxsC); + + REQUIRE(groupIdxsA == expectedGroupIdxsA); + REQUIRE(groupIdxsB == expectedGroupIdxsB); + REQUIRE(groupIdxsC == expectedGroupIdxsC); } } @@ -238,10 +266,15 @@ TEST_CASE_METHOD(PointToPointClientServerFixture, "[transport][ptp]") { int appId = 123; + int groupId = 345; std::atomic sharedInt = 5; - faabric::util::SchedulingDecision decision(appId); + faabric::util::SchedulingDecision decision(appId, groupId); + faabric::Message msg = faabric::util::messageFactory("foo", "bar"); + msg.set_appid(appId); + msg.set_groupid(groupId); + decision.addMessage(faabric::util::getSystemConfig().endpointHost, msg); // Background thread that will eventually enable the app and change the @@ -253,16 +286,85 @@ TEST_CASE_METHOD(PointToPointClientServerFixture, sharedInt.fetch_add(100); }); - broker.waitForMappingsOnThisHost(appId); + broker.waitForMappingsOnThisHost(groupId); // The sum won't have happened yet if this thread hasn't been forced to wait REQUIRE(sharedInt == 105); // Call again and check it doesn't block - broker.waitForMappingsOnThisHost(appId); + broker.waitForMappingsOnThisHost(groupId); if (t.joinable()) { t.join(); } } + +TEST_CASE_METHOD(PointToPointClientServerFixture, + "Test distributed lock/ unlock", + "[transport][ptp]") +{ + int appId = 999; + int groupId = 888; + int groupSize = 2; + int groupIdx = 1; + + std::string thisHost = faabric::util::getSystemConfig().endpointHost; + + // Set up mappings + faabric::Message msg = faabric::util::messageFactory("foo", "bar"); + msg.set_appid(appId); + msg.set_groupsize(groupSize); + msg.set_groupid(groupId); + msg.set_groupidx(groupIdx); + + faabric::Message rootMsg = faabric::util::messageFactory("foo", "bar"); + rootMsg.set_appid(appId); + rootMsg.set_groupsize(groupSize); + rootMsg.set_groupid(groupId); + rootMsg.set_groupidx(POINT_TO_POINT_MASTER_IDX); + + faabric::util::SchedulingDecision decision(appId, groupId); + decision.addMessage(thisHost, msg); + decision.addMessage(thisHost, rootMsg); + + broker.setUpLocalMappingsFromSchedulingDecision(decision); + + // Do both recursive and non-recursive + bool recursive = false; + int nCalls = 1; + + SECTION("Recursive") + { + // Make sure we have enough calls here to flush out any issues + recursive = true; + nCalls = 1000; + } + + SECTION("Non-recursive") + { + recursive = false; + nCalls = 1; + } + + auto group = PointToPointGroup::getGroup(groupId); + REQUIRE(group->getLockOwner(recursive) == -1); + + for 
(int i = 0; i < nCalls; i++) { + server.setRequestLatch(); + cli.groupLock(appId, groupId, groupIdx, recursive); + server.awaitRequestLatch(); + + broker.recvMessage(groupId, POINT_TO_POINT_MASTER_IDX, groupIdx); + } + + REQUIRE(group->getLockOwner(recursive) == groupIdx); + + for (int i = 0; i < nCalls; i++) { + server.setRequestLatch(); + cli.groupUnlock(appId, groupId, groupIdx, recursive); + server.awaitRequestLatch(); + } + + REQUIRE(group->getLockOwner(recursive) == -1); +} } diff --git a/tests/test/transport/test_point_to_point_groups.cpp b/tests/test/transport/test_point_to_point_groups.cpp new file mode 100644 index 000000000..d0c4bfb33 --- /dev/null +++ b/tests/test/transport/test_point_to_point_groups.cpp @@ -0,0 +1,333 @@ +#include + +#include "faabric_utils.h" +#include "fixtures.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace faabric::transport; + +#define CAPTURE_ERR_MSG(msgVar, op) \ + try { \ + op; \ + } catch (std::runtime_error & ex) { \ + errMsg = ex.what(); \ + } + +namespace tests { + +class PointToPointGroupFixture + : public ConfTestFixture + , public PointToPointClientServerFixture +{ + public: + PointToPointGroupFixture() + : thisHost(conf.endpointHost) + { + faabric::util::setMockMode(true); + } + + ~PointToPointGroupFixture() + { + faabric::scheduler::clearMockRequests(); + faabric::util::setMockMode(false); + } + + std::shared_ptr setUpGroup(int appId, + int groupId, + int groupSize) + { + req = faabric::util::batchExecFactory("foo", "bar", groupSize); + + faabric::util::SchedulingDecision decision(appId, groupId); + + for (int i = 0; i < groupSize; i++) { + auto& msg = req->mutable_messages()->at(i); + msg.set_appid(appId); + msg.set_groupid(groupId); + msg.set_appidx(i); + msg.set_groupidx(i); + + decision.addMessage(thisHost, msg); + } + + broker.setUpLocalMappingsFromSchedulingDecision(decision); + + return PointToPointGroup::getGroup(groupId); + } + + protected: + std::string thisHost; + + std::shared_ptr req = nullptr; +}; + +TEST_CASE_METHOD(PointToPointGroupFixture, + "Test lock requests", + "[ptp][transport]") +{ + std::string otherHost = "other"; + + int appId = 123; + int groupId = 345; + int groupIdx = 1; + + faabric::util::SchedulingDecision decision(appId, groupId); + + faabric::Message msgA = faabric::util::messageFactory("foo", "bar"); + msgA.set_appid(appId); + msgA.set_groupid(groupId); + msgA.set_appidx(0); + msgA.set_groupidx(0); + decision.addMessage(otherHost, msgA); + + faabric::Message msgB = faabric::util::messageFactory("foo", "bar"); + msgB.set_appid(appId); + msgB.set_groupid(groupId); + msgB.set_appidx(groupIdx); + msgB.set_groupidx(groupIdx); + decision.addMessage(thisHost, msgB); + + broker.setUpLocalMappingsFromSchedulingDecision(decision); + auto group = PointToPointGroup::getGroup(groupId); + + PointToPointCall op; + + std::vector data(1, 0); + + bool recursive = false; + + SECTION("Lock") + { + op = PointToPointCall::LOCK_GROUP; + + // Prepare response + broker.sendMessage(groupId, + POINT_TO_POINT_MASTER_IDX, + groupIdx, + data.data(), + data.size()); + + group->lock(groupIdx, false); + } + + SECTION("Lock recursive") + { + op = PointToPointCall::LOCK_GROUP_RECURSIVE; + recursive = true; + + // Prepare response + broker.sendMessage(groupId, + POINT_TO_POINT_MASTER_IDX, + groupIdx, + data.data(), + data.size()); + + group->lock(groupIdx, recursive); + } + + SECTION("Unlock") + { + op = PointToPointCall::UNLOCK_GROUP; + group->unlock(groupIdx, false); + } + + 
SECTION("Unlock recursive") + { + op = PointToPointCall::UNLOCK_GROUP_RECURSIVE; + recursive = true; + group->unlock(groupIdx, recursive); + } + + std::vector< + std::tuple> + actualRequests = getSentLockMessages(); + + REQUIRE(actualRequests.size() == 1); + REQUIRE(std::get<0>(actualRequests.at(0)) == otherHost); + + PointToPointCall actualOp = std::get<1>(actualRequests.at(0)); + REQUIRE(actualOp == op); + + faabric::PointToPointMessage req = std::get<2>(actualRequests.at(0)); + REQUIRE(req.appid() == appId); + REQUIRE(req.groupid() == groupId); + REQUIRE(req.sendidx() == groupIdx); + REQUIRE(req.recvidx() == POINT_TO_POINT_MASTER_IDX); +} + +TEST_CASE_METHOD(PointToPointGroupFixture, + "Test local locking and unlocking", + "[ptp][transport]") +{ + std::atomic sharedInt = 0; + int appId = 123; + int groupId = 234; + + // Arbitrary group size, local locks don't care + auto group = setUpGroup(appId, groupId, 3); + + group->localLock(); + + std::thread tA([&group, &sharedInt] { + group->localLock(); + + assert(sharedInt == 99); + sharedInt = 88; + + group->localUnlock(); + }); + + // Main thread sleep for a while, make sure the other can't run and update + // the counter + SLEEP_MS(1000); + + REQUIRE(sharedInt == 0); + sharedInt.store(99); + + group->localUnlock(); + + if (tA.joinable()) { + tA.join(); + } + + REQUIRE(sharedInt == 88); +} + +TEST_CASE_METHOD(PointToPointGroupFixture, + "Test distributed coordination barrier", + "[ptp][transport]") +{ + int nThreads = 5; + int appId = 123; + int groupId = 555; + + auto group = setUpGroup(appId, groupId, nThreads); + + int nSums = 2; + SECTION("Single operation") { nSums = 1; } + + SECTION("Lots of operations") + { + // We want to do this as many times as possible to deliberately create + // contention + nSums = 1000; + } + + // Spawn n-1 child threads to add to shared sums over several barriers so + // that the main thread can check all threads have completed after each. 
+ std::vector> sharedSums(nSums); + std::vector threads; + for (int i = 1; i < nThreads; i++) { + threads.emplace_back([&group, i, nSums, &sharedSums] { + for (int s = 0; s < nSums; s++) { + sharedSums.at(s).fetch_add(s + 1); + group->barrier(i); + } + }); + } + + for (int i = 0; i < nSums; i++) { + group->barrier(POINT_TO_POINT_MASTER_IDX); + REQUIRE(sharedSums.at(i).load() == (i + 1) * (nThreads - 1)); + } + + // Join all child threads + for (auto& t : threads) { + if (t.joinable()) { + t.join(); + } + } +} + +TEST_CASE_METHOD(PointToPointGroupFixture, + "Test local try lock", + "[ptp][transport]") +{ + // Set up one group + int nThreads = 5; + int appId = 11; + int groupId = 111; + + auto group = setUpGroup(appId, groupId, nThreads); + + // Set up another group + int otherAppId = 22; + int otherGroupId = 222; + + auto otherGroup = setUpGroup(otherAppId, otherGroupId, nThreads); + + // Should work for un-acquired lock + REQUIRE(group->localTryLock()); + + // Should also work for another lock + REQUIRE(otherGroup->localTryLock()); + + // Should not work for already-acquired locks + REQUIRE(!group->localTryLock()); + REQUIRE(!otherGroup->localTryLock()); + + // Should work again after unlock + group->localUnlock(); + + REQUIRE(group->localTryLock()); + REQUIRE(!otherGroup->localTryLock()); + + // Running again should have no effect + group->localUnlock(); + + // Unlock other group + otherGroup->localUnlock(); + + REQUIRE(group->localTryLock()); + REQUIRE(otherGroup->localTryLock()); + + group->localUnlock(); + otherGroup->localUnlock(); +} + +TEST_CASE_METHOD(PointToPointGroupFixture, + "Test notify and await", + "[ptp][transport]") +{ + int nThreads = 4; + int actual[4] = { 0, 0, 0, 0 }; + + int appId = 11; + int groupId = 111; + + auto group = setUpGroup(appId, groupId, nThreads); + + // Run threads in background to force a wait from the master + std::vector threads; + for (int i = 1; i < nThreads; i++) { + threads.emplace_back([&group, i, &actual] { + SLEEP_MS(1000); + actual[i] = i; + + group->notify(i); + }); + } + + // Master thread to await, should only go through once all threads have + // finished + group->notify(POINT_TO_POINT_MASTER_IDX); + + for (int i = 0; i < nThreads; i++) { + REQUIRE(actual[i] == i); + } + + for (auto& t : threads) { + if (t.joinable()) { + t.join(); + } + } +} +} diff --git a/tests/test/util/test_scheduling.cpp b/tests/test/util/test_scheduling.cpp index c4180cf94..d3b45be34 100644 --- a/tests/test/util/test_scheduling.cpp +++ b/tests/test/util/test_scheduling.cpp @@ -13,6 +13,7 @@ namespace tests { TEST_CASE("Test building scheduling decisions", "[util]") { int appId = 123; + int groupId = 345; std::string hostA = "hostA"; std::string hostB = "hostB"; @@ -20,7 +21,7 @@ TEST_CASE("Test building scheduling decisions", "[util]") auto req = batchExecFactory("foo", "bar", 3); - SchedulingDecision decision(appId); + SchedulingDecision decision(appId, groupId); faabric::Message msgA = req->mutable_messages()->at(0); faabric::Message msgB = req->mutable_messages()->at(1); @@ -32,11 +33,12 @@ TEST_CASE("Test building scheduling decisions", "[util]") std::vector expectedMsgIds = { msgA.id(), msgB.id(), msgC.id() }; std::vector expectedHosts = { hostB, hostA, hostC }; - std::vector expectedAppIdxs = { msgA.appindex(), - msgB.appindex(), - msgC.appindex() }; + std::vector expectedAppIdxs = { msgA.appidx(), + msgB.appidx(), + msgC.appidx() }; REQUIRE(decision.appId == appId); + REQUIRE(decision.groupId == groupId); REQUIRE(decision.nFunctions == 3); 
REQUIRE(decision.messageIds == expectedMsgIds); REQUIRE(decision.hosts == expectedHosts); @@ -47,31 +49,38 @@ TEST_CASE("Test converting point-to-point mappings to scheduling decisions", "[util]") { int appId = 123; + int groupId = 345; - int idxA = 22; + int appIdxA = 2; + int groupIdxA = 22; int msgIdA = 222; std::string hostA = "foobar"; - int idxB = 33; + int appIdxB = 3; + int groupIdxB = 33; int msgIdB = 333; std::string hostB = "bazbaz"; - std::vector expectedIdxs = { idxA, idxB }; + std::vector expectedAppIdxs = { appIdxA, appIdxB }; + std::vector expectedGroupIdxs = { groupIdxA, groupIdxB }; std::vector expectedMessageIds = { msgIdA, msgIdB }; std::vector expectedHosts = { hostA, hostB }; faabric::PointToPointMappings mappings; mappings.set_appid(appId); + mappings.set_groupid(groupId); auto* mappingA = mappings.add_mappings(); mappingA->set_host(hostA); mappingA->set_messageid(msgIdA); - mappingA->set_recvidx(idxA); + mappingA->set_appidx(appIdxA); + mappingA->set_groupidx(groupIdxA); auto* mappingB = mappings.add_mappings(); mappingB->set_host(hostB); mappingB->set_messageid(msgIdB); - mappingB->set_recvidx(idxB); + mappingB->set_appidx(appIdxB); + mappingB->set_groupidx(groupIdxB); auto actual = faabric::util::SchedulingDecision::fromPointToPointMappings(mappings); @@ -79,7 +88,8 @@ TEST_CASE("Test converting point-to-point mappings to scheduling decisions", REQUIRE(actual.appId == appId); REQUIRE(actual.nFunctions == 2); - REQUIRE(actual.appIdxs == expectedIdxs); + REQUIRE(actual.appIdxs == expectedAppIdxs); + REQUIRE(actual.groupIdxs == expectedGroupIdxs); REQUIRE(actual.messageIds == expectedMessageIds); REQUIRE(actual.hosts == expectedHosts); } diff --git a/tests/test/util/test_strings.cpp b/tests/test/util/test_strings.cpp index 610c2a216..b0b44fc81 100644 --- a/tests/test/util/test_strings.cpp +++ b/tests/test/util/test_strings.cpp @@ -58,4 +58,17 @@ TEST_CASE("Test string is int", "[util]") REQUIRE(!stringIsInt("abcd")); REQUIRE(!stringIsInt("12a33")); } + +TEST_CASE("Test vector to string for ints", "[util]") +{ + std::vector vec = { -1, 1, -2, 3 }; + REQUIRE(faabric::util::vectorToString(vec) == "[-1, 1, -2, 3]"); +} + +TEST_CASE("Test vector to string for strings", "[util]") +{ + std::vector vec = { "foo", "blah", "baz" }; + REQUIRE(faabric::util::vectorToString(vec) == + "[foo, blah, baz]"); +} } diff --git a/tests/utils/fixtures.h b/tests/utils/fixtures.h index 65e490cf8..ebd5c60f8 100644 --- a/tests/utils/fixtures.h +++ b/tests/utils/fixtures.h @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -293,4 +294,22 @@ class PointToPointTestFixture protected: faabric::transport::PointToPointBroker& broker; }; + +class PointToPointClientServerFixture + : public PointToPointTestFixture + , SchedulerTestFixture +{ + public: + PointToPointClientServerFixture() + : cli(LOCALHOST) + { + server.start(); + } + + ~PointToPointClientServerFixture() { server.stop(); } + + protected: + faabric::transport::PointToPointClient cli; + faabric::transport::PointToPointServer server; +}; }
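
For reference, a minimal sketch of how the group-based coordination API exercised by the tests in this patch fits together: build a `SchedulingDecision` carrying both an app ID and a group ID, register the mappings with the `PointToPointBroker`, then coordinate through `PointToPointGroup`. This is a sketch based on the test code above, assuming a single host, an initialised faabric runtime, and indicative (not guaranteed) header paths; `"demo"`/`"worker"` are placeholder function names.

```cpp
#include <faabric/transport/PointToPointBroker.h>
#include <faabric/util/config.h>
#include <faabric/util/func.h>
#include <faabric/util/scheduling.h>

#include <string>
#include <thread>
#include <vector>

void exampleGroupCoordination()
{
    int appId = 123;
    int groupId = 456;
    int groupSize = 3;

    std::string thisHost = faabric::util::getSystemConfig().endpointHost;

    // Build a batch request and a scheduling decision that places every
    // group member on this host (mirrors setUpGroup in the new fixture).
    auto req = faabric::util::batchExecFactory("demo", "worker", groupSize);
    faabric::util::SchedulingDecision decision(appId, groupId);

    for (int i = 0; i < groupSize; i++) {
        auto& msg = req->mutable_messages()->at(i);
        msg.set_appid(appId);
        msg.set_groupid(groupId);
        msg.set_appidx(i);
        msg.set_groupidx(i);

        decision.addMessage(thisHost, msg);
    }

    // Register the mappings so group members can find each other; the
    // setAndSendMappingsFromSchedulingDecision variant would also push
    // them to remote hosts.
    faabric::transport::PointToPointBroker& broker =
      faabric::transport::getPointToPointBroker();
    broker.setUpLocalMappingsFromSchedulingDecision(decision);

    // Each non-master group member (one thread per index here) waits on
    // the group barrier, as in the distributed barrier test.
    std::vector<std::thread> members;
    for (int i = 1; i < groupSize; i++) {
        members.emplace_back([groupId, i] {
            auto group =
              faabric::transport::PointToPointGroup::getGroup(groupId);
            group->barrier(i); // blocks until every group index has arrived
        });
    }

    // The master index joins the same barrier, after which all members
    // are known to have reached this point.
    auto group = faabric::transport::PointToPointGroup::getGroup(groupId);
    group->barrier(POINT_TO_POINT_MASTER_IDX);

    for (auto& t : members) {
        t.join();
    }
}
```

The same group object also exposes `notify()`, `localLock()`/`localUnlock()`, and the distributed `lock()`/`unlock()` calls shown in the new `test_point_to_point_groups.cpp` tests; the key point is that all of them are keyed on the group ID rather than the app ID.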