From aace59d07d40c5203efd9cdb398ce61f33c2fe1e Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Wed, 27 Oct 2021 16:08:48 +0000 Subject: [PATCH 01/15] add call records and tests --- include/faabric/util/tracing.h | 37 ++++++++++ src/proto/faabric.proto | 13 ++++ src/scheduler/Executor.cpp | 7 ++ src/scheduler/MpiWorld.cpp | 11 +++ src/util/CMakeLists.txt | 1 + src/util/tracing.cpp | 116 +++++++++++++++++++++++++++++++ tests/test/util/test_tracing.cpp | 44 ++++++++++++ 7 files changed, 229 insertions(+) create mode 100644 include/faabric/util/tracing.h create mode 100644 src/util/tracing.cpp create mode 100644 tests/test/util/test_tracing.cpp diff --git a/include/faabric/util/tracing.h b/include/faabric/util/tracing.h new file mode 100644 index 000000000..0a09e78f0 --- /dev/null +++ b/include/faabric/util/tracing.h @@ -0,0 +1,37 @@ +#pragma once + +#include + +#include +#include + +namespace faabric::util::tracing { +enum RecordType +{ + MpiPerRankMessageCount +}; + +class CallRecords +{ + public: + void startRecording(const faabric::Message& msg); + + void stopRecording(faabric::Message& msg); + + void addRecord(int msgId, RecordType recordType, int idToIncrement); + + private: + std::shared_ptr<faabric::Message> linkedMsg = nullptr; + + std::list<RecordType> onGoingRecordings; + + void loadRecordsToMessage(faabric::CallRecords& callRecords, + const RecordType& recordType); + + // ----- Per record type data structures ----- + + std::map<int, int> perRankMsgCount; +}; + +CallRecords& getCallRecords(); +} diff --git a/src/proto/faabric.proto b/src/proto/faabric.proto index 65b2d800c..74874da75 100644 --- a/src/proto/faabric.proto +++ b/src/proto/faabric.proto @@ -95,6 +95,19 @@ message MpiHostsToRanksMessage { repeated int32 basePorts = 2; } +// --------------------------------------------- +// PROFILING +// --------------------------------------------- + +message MpiPerRankMessageCount { + repeated int32 ranks = 1; + repeated int32 numMessages = 2; +} + +message CallRecords { + MpiPerRankMessageCount mpiMsgCount = 1; +} + message Message { int32 id = 1; int32 appId = 2; diff --git a/src/scheduler/Executor.cpp b/src/scheduler/Executor.cpp index 0a2c77c67..3b95dbd65 100644 --- a/src/scheduler/Executor.cpp +++ b/src/scheduler/Executor.cpp @@ -11,6 +11,7 @@ #include #include #include +#include #include #define POOL_SHUTDOWN -1 @@ -226,6 +227,9 @@ void Executor::threadPoolThread(int threadPoolIdx) msg.id(), isThreads); + // Start recording calls in non-release builds + faabric::util::tracing::getCallRecords().startRecording(msg); + int32_t returnValue; try { returnValue = @@ -242,6 +246,9 @@ void Executor::threadPoolThread(int threadPoolIdx) // Set the return value msg.set_returnvalue(returnValue); + // Stop recording calls + faabric::util::tracing::getCallRecords().stopRecording(msg); + // Decrement the task count int oldTaskCount = task.batchCounter->fetch_sub(1); assert(oldTaskCount >= 0); diff --git a/src/scheduler/MpiWorld.cpp b/src/scheduler/MpiWorld.cpp index 79908a3ae..7abc336c2 100644 --- a/src/scheduler/MpiWorld.cpp +++ b/src/scheduler/MpiWorld.cpp @@ -6,6 +6,7 @@ #include #include #include +#include // Each MPI rank runs in a separate thread, thus we use TLS to maintain the // per-rank data structures @@ -33,6 +34,9 @@ static thread_local std::unordered_map< std::unique_ptr> ranksSendEndpoints; +// Id of the message that created this thread-local instance +static thread_local int thisMsgId; + // This is used for mocking in tests static std::vector rankMessages; @@ -177,6 +181,7 @@ void MpiWorld::create(const 
faabric::Message& call, int newId, int newSize) id = newId; user = call.user(); function = call.function(); + thisMsgId = call.id(); size = newSize; @@ -245,6 +250,7 @@ void MpiWorld::destroy() SPDLOG_TRACE("Destroying MPI world {}", id); // Note that all ranks will call this function. + thisMsgId = 0; // We must force the destructors for all message endpoints to run here // rather than at the end of their global thread-local lifespan. If we @@ -292,6 +298,7 @@ void MpiWorld::initialiseFromMsg(const faabric::Message& msg) user = msg.user(); function = msg.function(); size = msg.mpiworldsize(); + thisMsgId = msg.id(); // Block until we receive faabric::MpiHostsToRanksMessage hostRankMsg = recvMpiHostRankMsg(); @@ -572,6 +579,10 @@ void MpiWorld::send(int sendRank, SPDLOG_TRACE("MPI - send remote {} -> {}", sendRank, recvRank); sendRemoteMpiMessage(sendRank, recvRank, m); } + + // In non-release builds, track that we have sent this message + faabric::util::tracing::getCallRecords().addRecord(thisMsgId, + faabric::util::tracing::RecordType::MpiPerRankMessageCount, recvRank); } void MpiWorld::recv(int sendRank, diff --git a/src/util/CMakeLists.txt b/src/util/CMakeLists.txt index bee41ed71..7d1110cdf 100644 --- a/src/util/CMakeLists.txt +++ b/src/util/CMakeLists.txt @@ -21,6 +21,7 @@ faabric_lib(util snapshot.cpp state.cpp string_tools.cpp + tracing.cpp timing.cpp testing.cpp ) diff --git a/src/util/tracing.cpp b/src/util/tracing.cpp new file mode 100644 index 000000000..af31c4104 --- /dev/null +++ b/src/util/tracing.cpp @@ -0,0 +1,116 @@ +#include +#include + +namespace faabric::util::tracing { +void CallRecords::startRecording(const faabric::Message& msg) +{ +#ifndef NDEBUG + if (linkedMsg != nullptr && linkedMsg->id() != msg.id()) { + SPDLOG_ERROR("CallRecords already linked to a different message: (linked: {} != provided: {})", + linkedMsg->id(), msg.id()); + throw std::runtime_error("CallRecords linked to a different message"); + } else if (linkedMsg == nullptr) { + linkedMsg = std::make_shared(msg); + } +#else + ; +#endif +} + +void CallRecords::stopRecording(faabric::Message& msg) +{ +#ifndef NDEBUG + if (linkedMsg == nullptr || linkedMsg->id() != msg.id()) { + SPDLOG_ERROR("CallRecords not linked to the right message: (linked: {} != provided: {})", + linkedMsg->id(), msg.id()); + throw std::runtime_error("CallRecords linked to a different message"); + } + + linkedMsg = nullptr; + + // Update the actual faabric message + faabric::CallRecords recordsMsg; + for (const auto& recordType : onGoingRecordings) { + loadRecordsToMessage(recordsMsg, recordType); + } + + // Update the original message + *msg.mutable_records() = recordsMsg; +#else + ; +#endif +} + +void CallRecords::loadRecordsToMessage(faabric::CallRecords& callRecords, + const RecordType& recordType) +{ +#ifndef NDEBUG + switch (recordType) { + case (faabric::util::tracing::RecordType::MpiPerRankMessageCount): { + faabric::MpiPerRankMessageCount msgCount; + + for (const auto& it : perRankMsgCount) { + msgCount.add_ranks(it.first); + msgCount.add_nummessages(it.second); + } + + *callRecords.mutable_mpimsgcount() = msgCount; + break; + } + default: { + SPDLOG_ERROR("Unsupported record type: {}", recordType); + throw std::runtime_error("Unsupported record type"); + } + } +#else + ; +#endif +} + +void CallRecords::addRecord(int msgId, RecordType recordType, int idToIncrement) +{ +#ifndef NDEBUG + // Check message id + if (linkedMsg == nullptr || linkedMsg->id() != msgId) { + SPDLOG_ERROR("CallRecords not linked to the right message: 
(linked: {} != provided: {})", + linkedMsg->id(), msgId); + throw std::runtime_error("CallRecords linked to a different message"); + } + + // Add the record to the list of on going records if it is not there + bool mustInit = false; + auto it = std::find(onGoingRecordings.begin(), onGoingRecordings.end(), recordType); + if (it == onGoingRecordings.end()) { + onGoingRecordings.push_back(recordType); + mustInit = true; + } + + // Finally increment the corresponding record list + switch (recordType) { + case (faabric::util::tracing::RecordType::MpiPerRankMessageCount): { + if (mustInit) { + for (int i = 0; i < linkedMsg->mpiworldsize(); i++) { + perRankMsgCount[i] = 0; + } + } + + ++perRankMsgCount.at(idToIncrement); + break; + } + default: { + SPDLOG_ERROR("Unsupported record type: {}", recordType); + throw std::runtime_error("Unsupported record type"); + } + } +#else + ; +#endif +} + + +CallRecords& getCallRecords() +{ + static thread_local CallRecords callRecords; + return callRecords; +} +} diff --git a/tests/test/util/test_tracing.cpp b/tests/test/util/test_tracing.cpp new file mode 100644 index 000000000..bb55a2963 --- /dev/null +++ b/tests/test/util/test_tracing.cpp @@ -0,0 +1,44 @@ +#include + +#include "faabric_utils.h" + +#include +#include +#include // DELETE MEE +#include +#include + +namespace tests { +TEST_CASE_METHOD(MpiTestFixture, + "Test tracing the number of MPI messages", + "[util][tracing]") +{ + faabric::util::tracing::getCallRecords().startRecording(msg); + + // Send one message + int rankA1 = 0; + int rankA2 = 1; + MPI_Status status{}; + + std::vector messageData = { 0, 1, 2 }; + auto buffer = new int[messageData.size()]; + + world.send( + rankA1, rankA2, BYTES(messageData.data()), MPI_INT, messageData.size()); + world.recv( + rankA1, rankA2, BYTES(buffer), MPI_INT, messageData.size(), &status); + + // Stop recording and check we have only recorded one message + faabric::util::tracing::getCallRecords().stopRecording(msg); + REQUIRE(msg.has_records()); + REQUIRE(msg.records().has_mpimsgcount()); + REQUIRE(msg.records().mpimsgcount().ranks_size() == worldSize); + for (int i = 0; i < worldSize; i++) { + if (i == rankA2) { + REQUIRE(msg.records().mpimsgcount().nummessages(i) == 1); + } else { + REQUIRE(msg.records().mpimsgcount().nummessages(i) == 0); + } + } +} +} From 7624ff9427769390c76822843998c3606b832c94 Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Wed, 27 Oct 2021 16:13:01 +0000 Subject: [PATCH 02/15] formatting --- src/proto/faabric.proto | 2 +- src/scheduler/Executor.cpp | 2 +- src/scheduler/MpiWorld.cpp | 6 ++++-- src/util/tracing.cpp | 22 ++++++++++++++-------- tests/test/util/test_tracing.cpp | 19 +++++++++++++------ 5 files changed, 33 insertions(+), 18 deletions(-) diff --git a/src/proto/faabric.proto b/src/proto/faabric.proto index 74874da75..60e92f789 100644 --- a/src/proto/faabric.proto +++ b/src/proto/faabric.proto @@ -96,7 +96,7 @@ message MpiHostsToRanksMessage { } // --------------------------------------------- -// PROFILING +// TRACING // --------------------------------------------- message MpiPerRankMessageCount { diff --git a/src/scheduler/Executor.cpp b/src/scheduler/Executor.cpp index 3b95dbd65..f9ac1e6b0 100644 --- a/src/scheduler/Executor.cpp +++ b/src/scheduler/Executor.cpp @@ -11,8 +11,8 @@ #include #include #include -#include #include +#include #define POOL_SHUTDOWN -1 diff --git a/src/scheduler/MpiWorld.cpp b/src/scheduler/MpiWorld.cpp index 7abc336c2..e58740d2c 100644 --- a/src/scheduler/MpiWorld.cpp +++ 
b/src/scheduler/MpiWorld.cpp @@ -581,8 +581,10 @@ void MpiWorld::send(int sendRank, } // In non-release builds, track that we have sent this message - faabric::util::tracing::getCallRecords().addRecord(thisMsgId, - faabric::util::tracing::RecordType::MpiPerRankMessageCount, recvRank); + faabric::util::tracing::getCallRecords().addRecord( + thisMsgId, + faabric::util::tracing::RecordType::MpiPerRankMessageCount, + recvRank); } void MpiWorld::recv(int sendRank, diff --git a/src/util/tracing.cpp b/src/util/tracing.cpp index af31c4104..52777f6b9 100644 --- a/src/util/tracing.cpp +++ b/src/util/tracing.cpp @@ -6,8 +6,10 @@ void CallRecords::startRecording(const faabric::Message& msg) { #ifndef NDEBUG if (linkedMsg != nullptr && linkedMsg->id() != msg.id()) { - SPDLOG_ERROR("CallRecords already linked to a different message: (linked: {} != provided: {})", - linkedMsg->id(), msg.id()); + SPDLOG_ERROR("Error starting recording, records not linked to the right" + " message: (linked: {} != provided: {})", + linkedMsg->id(), + msg.id()); throw std::runtime_error("CallRecords linked to a different message"); } else if (linkedMsg == nullptr) { linkedMsg = std::make_shared(msg); @@ -21,8 +23,10 @@ void CallRecords::stopRecording(faabric::Message& msg) { #ifndef NDEBUG if (linkedMsg == nullptr || linkedMsg->id() != msg.id()) { - SPDLOG_ERROR("CallRecords not linked to the right message: (linked: {} != provided: {})", - linkedMsg->id(), msg.id()); + SPDLOG_ERROR("Error stopping recording, records not linked to the right" + " message: (linked: {} != provided: {})", + linkedMsg->id(), + msg.id()); throw std::runtime_error("CallRecords linked to a different message"); } @@ -72,14 +76,17 @@ void CallRecords::addRecord(int msgId, RecordType recordType, int idToIncrement) #ifndef NDEBUG // Check message id if (linkedMsg == nullptr || linkedMsg->id() != msgId) { - SPDLOG_ERROR("CallRecords not linked to the right message: (linked: {} != provided: {})", - linkedMsg->id(), msgId); + SPDLOG_ERROR("CallRecords not linked to the right message: (linked: {} " + "!= provided: {})", + linkedMsg->id(), + msgId); throw std::runtime_error("CallRecords linked to a different message"); } // Add the record to the list of on going records if it is not there bool mustInit = false; - auto it = std::find(onGoingRecordings.begin(), onGoingRecordings.end(), recordType); + auto it = + std::find(onGoingRecordings.begin(), onGoingRecordings.end(), recordType); if (it == onGoingRecordings.end()) { onGoingRecordings.push_back(recordType); mustInit = true; @@ -107,7 +114,6 @@ void CallRecords::addRecord(int msgId, RecordType recordType, int idToIncrement) #endif } - CallRecords& getCallRecords() { static thread_local CallRecords callRecords; diff --git a/tests/test/util/test_tracing.cpp b/tests/test/util/test_tracing.cpp index bb55a2963..440b01629 100644 --- a/tests/test/util/test_tracing.cpp +++ b/tests/test/util/test_tracing.cpp @@ -1,4 +1,4 @@ -#include +#include #include "faabric_utils.h" @@ -23,10 +23,17 @@ TEST_CASE_METHOD(MpiTestFixture, std::vector messageData = { 0, 1, 2 }; auto buffer = new int[messageData.size()]; - world.send( - rankA1, rankA2, BYTES(messageData.data()), MPI_INT, messageData.size()); - world.recv( - rankA1, rankA2, BYTES(buffer), MPI_INT, messageData.size(), &status); + int numToSend = 10; + + for (int i = 0; i < numToSend; i++) { + world.send(rankA1, + rankA2, + BYTES(messageData.data()), + MPI_INT, + messageData.size()); + world.recv( + rankA1, rankA2, BYTES(buffer), MPI_INT, messageData.size(), &status); + 
} // Stop recording and check we have only recorded one message faabric::util::tracing::getCallRecords().stopRecording(msg); @@ -35,7 +42,7 @@ TEST_CASE_METHOD(MpiTestFixture, REQUIRE(msg.records().mpimsgcount().ranks_size() == worldSize); for (int i = 0; i < worldSize; i++) { if (i == rankA2) { - REQUIRE(msg.records().mpimsgcount().nummessages(i) == 1); + REQUIRE(msg.records().mpimsgcount().nummessages(i) == numToSend); } else { REQUIRE(msg.records().mpimsgcount().nummessages(i) == 0); } From 7ba6cfe37b5c42e70afe2c18569da936eae7b2ef Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Wed, 27 Oct 2021 18:53:01 +0000 Subject: [PATCH 03/15] disable tracing by default during tests --- include/faabric/util/tracing.h | 4 +++ src/util/tracing.cpp | 62 +++++++++++++++++++++----------- tests/test/util/test_tracing.cpp | 6 +++- 3 files changed, 50 insertions(+), 22 deletions(-) diff --git a/include/faabric/util/tracing.h b/include/faabric/util/tracing.h index 0a09e78f0..9d1b66c10 100644 --- a/include/faabric/util/tracing.h +++ b/include/faabric/util/tracing.h @@ -25,6 +25,10 @@ class CallRecords std::list onGoingRecordings; + void checkMessageLinked(int msgId); + + void checkMessageNotLinked(); + void loadRecordsToMessage(faabric::CallRecords& callRecords, const RecordType& recordType); diff --git a/src/util/tracing.cpp b/src/util/tracing.cpp index 52777f6b9..5f8b53b32 100644 --- a/src/util/tracing.cpp +++ b/src/util/tracing.cpp @@ -1,19 +1,20 @@ #include +#include #include namespace faabric::util::tracing { void CallRecords::startRecording(const faabric::Message& msg) { #ifndef NDEBUG - if (linkedMsg != nullptr && linkedMsg->id() != msg.id()) { - SPDLOG_ERROR("Error starting recording, records not linked to the right" - " message: (linked: {} != provided: {})", - linkedMsg->id(), - msg.id()); - throw std::runtime_error("CallRecords linked to a different message"); - } else if (linkedMsg == nullptr) { - linkedMsg = std::make_shared(msg); + // In the tests there's not a thread to message mapping as we sometimes + // spawn extra threads to mock work. Thus, we skip this check here. + if (faabric::util::isTestMode()) { + return; } + + checkMessageNotLinked(); + + linkedMsg = std::make_shared(msg); #else ; #endif @@ -22,14 +23,14 @@ void CallRecords::startRecording(const faabric::Message& msg) void CallRecords::stopRecording(faabric::Message& msg) { #ifndef NDEBUG - if (linkedMsg == nullptr || linkedMsg->id() != msg.id()) { - SPDLOG_ERROR("Error stopping recording, records not linked to the right" - " message: (linked: {} != provided: {})", - linkedMsg->id(), - msg.id()); - throw std::runtime_error("CallRecords linked to a different message"); + // In the tests there's not a thread to message mapping as we sometimes + // spawn extra threads to mock work. Thus, we skip this check here. + if (faabric::util::isTestMode()) { + return; } + checkMessageLinked(msg.id()); + linkedMsg = nullptr; // Update the actual faabric message @@ -45,6 +46,28 @@ void CallRecords::stopRecording(faabric::Message& msg) #endif } +void CallRecords::checkMessageLinked(int msgId) +{ + if (linkedMsg == nullptr || linkedMsg->id() != msgId) { + SPDLOG_ERROR("Error during recording, records not linked to the right" + " message: (linked: {} != provided: {})", + linkedMsg == nullptr ? 
"nullptr" + : std::to_string(linkedMsg->id()), + msgId); + throw std::runtime_error("CallRecords linked to a different message"); + } +} + +void CallRecords::checkMessageNotLinked() +{ + if (linkedMsg != nullptr) { + SPDLOG_ERROR("Error starting recording, record already linked to" + "another message: {}", + linkedMsg->id()); + throw std::runtime_error("CallRecords linked to a different message"); + } +} + void CallRecords::loadRecordsToMessage(faabric::CallRecords& callRecords, const RecordType& recordType) { @@ -74,15 +97,12 @@ void CallRecords::loadRecordsToMessage(faabric::CallRecords& callRecords, void CallRecords::addRecord(int msgId, RecordType recordType, int idToIncrement) { #ifndef NDEBUG - // Check message id - if (linkedMsg == nullptr || linkedMsg->id() != msgId) { - SPDLOG_ERROR("CallRecords not linked to the right message: (linked: {} " - "!= provided: {})", - linkedMsg->id(), - msgId); - throw std::runtime_error("CallRecords linked to a different message"); + if (faabric::util::isTestMode()) { + return; } + checkMessageLinked(msgId); + // Add the record to the list of on going records if it is not there bool mustInit = false; auto it = diff --git a/tests/test/util/test_tracing.cpp b/tests/test/util/test_tracing.cpp index 440b01629..ee997dd7c 100644 --- a/tests/test/util/test_tracing.cpp +++ b/tests/test/util/test_tracing.cpp @@ -4,7 +4,6 @@ #include #include -#include // DELETE MEE #include #include @@ -13,6 +12,9 @@ TEST_CASE_METHOD(MpiTestFixture, "Test tracing the number of MPI messages", "[util][tracing]") { + // Disable test mode to test tracing + faabric::util::setTestMode(false); + faabric::util::tracing::getCallRecords().startRecording(msg); // Send one message @@ -47,5 +49,7 @@ TEST_CASE_METHOD(MpiTestFixture, REQUIRE(msg.records().mpimsgcount().nummessages(i) == 0); } } + + faabric::util::setTestMode(true); } } From 5755bf9d84bb87ba15b7cc1ddd57dd1d8e90a8d9 Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Thu, 28 Oct 2021 10:05:21 +0000 Subject: [PATCH 04/15] re-factor to be used depending on message flag, change the message layout and rename to ExecGraphDetail --- include/faabric/util/exec_graph.h | 56 +++++++++ include/faabric/util/tracing.h | 41 ------- src/proto/faabric.proto | 14 +-- src/scheduler/Executor.cpp | 6 +- src/scheduler/MpiWorld.cpp | 8 +- src/util/CMakeLists.txt | 2 +- src/util/exec_graph.cpp | 150 +++++++++++++++++++++++ src/util/func.cpp | 2 + src/util/tracing.cpp | 142 --------------------- tests/test/scheduler/test_exec_graph.cpp | 83 +++++++++++++ tests/test/util/test_tracing.cpp | 55 --------- 11 files changed, 302 insertions(+), 257 deletions(-) create mode 100644 include/faabric/util/exec_graph.h delete mode 100644 include/faabric/util/tracing.h create mode 100644 src/util/exec_graph.cpp delete mode 100644 src/util/tracing.cpp delete mode 100644 tests/test/util/test_tracing.cpp diff --git a/include/faabric/util/exec_graph.h b/include/faabric/util/exec_graph.h new file mode 100644 index 000000000..4da411a6f --- /dev/null +++ b/include/faabric/util/exec_graph.h @@ -0,0 +1,56 @@ +#pragma once + +#include + +#include +#include +#include + +namespace faabric::util { +class ExecGraphDetail +{ + public: + void startRecording(const faabric::Message& msg); + + void stopRecording(faabric::Message& msg); + + void addDetail(const int msgId, + const std::string& key, + const std::string& value); + + void incrementCounter(const int msgId, + const std::string& key, + const int valueToIncrement = 1); + + static inline std::string const mpiMsgCountPrefix 
= "mpi-msgcount-torank-"; + + private: + std::shared_ptr linkedMsg = nullptr; + + std::map detailsMap; + + std::map intDetailsMap; + + void checkMessageLinked(const int msgId); + + void checkMessageNotLinked(); + + // ----- Wrappers to no-op the functions if not recording ----- + + std::function + doAddDetail; + + void addDetailInternal(const int msgId, + const std::string& key, + const std::string& value); + + std::function + doIncrementCounter; + + void incrementCounterInternal(const int msgId, + const std::string& key, + const int valueToIncrement); +}; + +ExecGraphDetail& getExecGraphDetail(); +} diff --git a/include/faabric/util/tracing.h b/include/faabric/util/tracing.h deleted file mode 100644 index 9d1b66c10..000000000 --- a/include/faabric/util/tracing.h +++ /dev/null @@ -1,41 +0,0 @@ -#pragma once - -#include - -#include -#include - -namespace faabric::util::tracing { -enum RecordType -{ - MpiPerRankMessageCount -}; - -class CallRecords -{ - public: - void startRecording(const faabric::Message& msg); - - void stopRecording(faabric::Message& msg); - - void addRecord(int msgId, RecordType recordType, int idToIncrement); - - private: - std::shared_ptr linkedMsg = nullptr; - - std::list onGoingRecordings; - - void checkMessageLinked(int msgId); - - void checkMessageNotLinked(); - - void loadRecordsToMessage(faabric::CallRecords& callRecords, - const RecordType& recordType); - - // ----- Per record type data structures ----- - - std::map perRankMsgCount; -}; - -CallRecords& getCallRecords(); -} diff --git a/src/proto/faabric.proto b/src/proto/faabric.proto index 60e92f789..3f11d0e55 100644 --- a/src/proto/faabric.proto +++ b/src/proto/faabric.proto @@ -95,17 +95,9 @@ message MpiHostsToRanksMessage { repeated int32 basePorts = 2; } -// --------------------------------------------- -// TRACING -// --------------------------------------------- - -message MpiPerRankMessageCount { - repeated int32 ranks = 1; - repeated int32 numMessages = 2; -} - -message CallRecords { - MpiPerRankMessageCount mpiMsgCount = 1; +message ExecGraphDetail { + string key = 1; + string value = 2; } message Message { diff --git a/src/scheduler/Executor.cpp b/src/scheduler/Executor.cpp index f9ac1e6b0..713f2be0c 100644 --- a/src/scheduler/Executor.cpp +++ b/src/scheduler/Executor.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -12,7 +13,6 @@ #include #include #include -#include #define POOL_SHUTDOWN -1 @@ -228,7 +228,7 @@ void Executor::threadPoolThread(int threadPoolIdx) isThreads); // Start recording calls in non-release builds - faabric::util::tracing::getCallRecords().startRecording(msg); + faabric::util::getExecGraphDetail().startRecording(msg); int32_t returnValue; try { @@ -247,7 +247,7 @@ void Executor::threadPoolThread(int threadPoolIdx) msg.set_returnvalue(returnValue); // Stop recording calls - faabric::util::tracing::getCallRecords().stopRecording(msg); + faabric::util::getExecGraphDetail().stopRecording(msg); // Decrement the task count int oldTaskCount = task.batchCounter->fetch_sub(1); diff --git a/src/scheduler/MpiWorld.cpp b/src/scheduler/MpiWorld.cpp index e58740d2c..653060e40 100644 --- a/src/scheduler/MpiWorld.cpp +++ b/src/scheduler/MpiWorld.cpp @@ -1,12 +1,12 @@ #include #include #include +#include #include #include #include #include #include -#include // Each MPI rank runs in a separate thread, thus we use TLS to maintain the // per-rank data structures @@ -581,10 +581,10 @@ void MpiWorld::send(int sendRank, } // In non-release builds, track that we 
have sent this message - faabric::util::tracing::getCallRecords().addRecord( + faabric::util::getExecGraphDetail().incrementCounter( thisMsgId, - faabric::util::tracing::RecordType::MpiPerRankMessageCount, - recvRank); + faabric::util::ExecGraphDetail::mpiMsgCountPrefix + + std::to_string(recvRank)); } void MpiWorld::recv(int sendRank, diff --git a/src/util/CMakeLists.txt b/src/util/CMakeLists.txt index 7d1110cdf..d155313b4 100644 --- a/src/util/CMakeLists.txt +++ b/src/util/CMakeLists.txt @@ -7,6 +7,7 @@ faabric_lib(util crash.cpp delta.cpp environment.cpp + exec_graph.cpp files.cpp func.cpp gids.cpp @@ -21,7 +22,6 @@ faabric_lib(util snapshot.cpp state.cpp string_tools.cpp - tracing.cpp timing.cpp testing.cpp ) diff --git a/src/util/exec_graph.cpp b/src/util/exec_graph.cpp new file mode 100644 index 000000000..8bbff232e --- /dev/null +++ b/src/util/exec_graph.cpp @@ -0,0 +1,150 @@ +#include +#include +#include + +namespace faabric::util { +void ExecGraphDetail::startRecording(const faabric::Message& msg) +{ + // In the tests there's not a thread to message mapping as we sometimes + // spawn extra threads to mock work. Thus, we skip this check here. + if (faabric::util::isTestMode()) { + return; + } + + checkMessageNotLinked(); + + linkedMsg = std::make_shared(msg); + + if (!msg.recordexecgraph()) { + doAddDetail = + [](const int, const std::string&, const std::string&) -> void { ; }; + doIncrementCounter = + [](const int, const std::string&, const int) -> void { ; }; + } else { + doAddDetail = [this](const int msgId, + const std::string& key, + const std::string& value) -> void { + this->addDetailInternal(msgId, key, value); + }; + doIncrementCounter = [this](const int msgId, + const std::string& key, + const int valueToIncrement) -> void { + this->incrementCounterInternal(msgId, key, valueToIncrement); + }; + } +} + +void ExecGraphDetail::stopRecording(faabric::Message& msg) +{ + // In the tests there's not a thread to message mapping as we sometimes + // spawn extra threads to mock work. Thus, we skip this check here. + if (faabric::util::isTestMode()) { + return; + } + + checkMessageLinked(msg.id()); + + for (const auto& it : detailsMap) { + faabric::ExecGraphDetail detail; + detail.set_key(it.first); + detail.set_value(it.second); + *msg.add_execgraphdetails() = detail; + SPDLOG_TRACE("Adding exec. graph detail to message. id: {} ; {}->{}", + linkedMsg->id(), + it.first, + it.second); + } + + for (const auto& it : intDetailsMap) { + if (detailsMap.find(it.first) != detailsMap.end()) { + SPDLOG_WARN( + "Replicated key in the exec graph details: {}->{} and {}->{}", + it.first, + detailsMap.at(it.first), + it.first, + it.second); + } + + faabric::ExecGraphDetail detail; + detail.set_key(it.first); + detail.set_value(std::to_string(it.second)); + *msg.add_execgraphdetails() = detail; + SPDLOG_TRACE("Adding exec. graph detail to message. id: {} ; {}->{}", + linkedMsg->id(), + it.first, + it.second); + } + + linkedMsg = nullptr; + detailsMap.clear(); + intDetailsMap.clear(); +} + +void ExecGraphDetail::checkMessageLinked(const int msgId) +{ + if (linkedMsg == nullptr || linkedMsg->id() != msgId) { + SPDLOG_ERROR("Error during recording, records not linked to the right" + " message: (linked: {} != provided: {})", + linkedMsg == nullptr ? 
"nullptr" + : std::to_string(linkedMsg->id()), + msgId); + throw std::runtime_error("CallRecords linked to a different message"); + } +} + +void ExecGraphDetail::checkMessageNotLinked() +{ + if (linkedMsg != nullptr) { + SPDLOG_ERROR("Error starting recording, record already linked to" + "another message: {}", + linkedMsg->id()); + throw std::runtime_error("CallRecords linked to a different message"); + } +} + +void ExecGraphDetail::addDetail(const int msgId, + const std::string& key, + const std::string& value) +{ + doAddDetail(msgId, key, value); +} + +void ExecGraphDetail::addDetailInternal(const int msgId, + const std::string& key, + const std::string& value) +{ + if (faabric::util::isTestMode()) { + return; + } + + checkMessageLinked(msgId); + + detailsMap[key] = value; +} + +void ExecGraphDetail::incrementCounter(const int msgId, + const std::string& key, + const int valueToIncrement) +{ + doIncrementCounter(msgId, key, valueToIncrement); +} + +void ExecGraphDetail::incrementCounterInternal(const int msgId, + const std::string& key, + const int valueToIncrement) +{ + if (faabric::util::isTestMode()) { + return; + } + + checkMessageLinked(msgId); + + intDetailsMap[key] += valueToIncrement; +} + +ExecGraphDetail& getExecGraphDetail() +{ + static thread_local ExecGraphDetail graphDetail; + return graphDetail; +} +} diff --git a/src/util/func.cpp b/src/util/func.cpp index 0bef897d7..58659af18 100644 --- a/src/util/func.cpp +++ b/src/util/func.cpp @@ -103,6 +103,8 @@ faabric::Message messageFactory(const std::string& user, std::string thisHost = faabric::util::getSystemConfig().endpointHost; msg.set_masterhost(thisHost); + msg.set_recordexecgraph(false); + return msg; } diff --git a/src/util/tracing.cpp b/src/util/tracing.cpp deleted file mode 100644 index 5f8b53b32..000000000 --- a/src/util/tracing.cpp +++ /dev/null @@ -1,142 +0,0 @@ -#include -#include -#include - -namespace faabric::util::tracing { -void CallRecords::startRecording(const faabric::Message& msg) -{ -#ifndef NDEBUG - // In the tests there's not a thread to message mapping as we sometimes - // spawn extra threads to mock work. Thus, we skip this check here. - if (faabric::util::isTestMode()) { - return; - } - - checkMessageNotLinked(); - - linkedMsg = std::make_shared(msg); -#else - ; -#endif -} - -void CallRecords::stopRecording(faabric::Message& msg) -{ -#ifndef NDEBUG - // In the tests there's not a thread to message mapping as we sometimes - // spawn extra threads to mock work. Thus, we skip this check here. - if (faabric::util::isTestMode()) { - return; - } - - checkMessageLinked(msg.id()); - - linkedMsg = nullptr; - - // Update the actual faabric message - faabric::CallRecords recordsMsg; - for (const auto& recordType : onGoingRecordings) { - loadRecordsToMessage(recordsMsg, recordType); - } - - // Update the original message - *msg.mutable_records() = recordsMsg; -#else - ; -#endif -} - -void CallRecords::checkMessageLinked(int msgId) -{ - if (linkedMsg == nullptr || linkedMsg->id() != msgId) { - SPDLOG_ERROR("Error during recording, records not linked to the right" - " message: (linked: {} != provided: {})", - linkedMsg == nullptr ? 
"nullptr" - : std::to_string(linkedMsg->id()), - msgId); - throw std::runtime_error("CallRecords linked to a different message"); - } -} - -void CallRecords::checkMessageNotLinked() -{ - if (linkedMsg != nullptr) { - SPDLOG_ERROR("Error starting recording, record already linked to" - "another message: {}", - linkedMsg->id()); - throw std::runtime_error("CallRecords linked to a different message"); - } -} - -void CallRecords::loadRecordsToMessage(faabric::CallRecords& callRecords, - const RecordType& recordType) -{ -#ifndef NDEBUG - switch (recordType) { - case (faabric::util::tracing::RecordType::MpiPerRankMessageCount): { - faabric::MpiPerRankMessageCount msgCount; - - for (const auto& it : perRankMsgCount) { - msgCount.add_ranks(it.first); - msgCount.add_nummessages(it.second); - } - - *callRecords.mutable_mpimsgcount() = msgCount; - break; - } - default: { - SPDLOG_ERROR("Unsupported record type: {}", recordType); - throw std::runtime_error("Unsupported record type"); - } - } -#else - ; -#endif -} - -void CallRecords::addRecord(int msgId, RecordType recordType, int idToIncrement) -{ -#ifndef NDEBUG - if (faabric::util::isTestMode()) { - return; - } - - checkMessageLinked(msgId); - - // Add the record to the list of on going records if it is not there - bool mustInit = false; - auto it = - std::find(onGoingRecordings.begin(), onGoingRecordings.end(), recordType); - if (it == onGoingRecordings.end()) { - onGoingRecordings.push_back(recordType); - mustInit = true; - } - - // Finally increment the corresponding record list - switch (recordType) { - case (faabric::util::tracing::RecordType::MpiPerRankMessageCount): { - if (mustInit) { - for (int i = 0; i < linkedMsg->mpiworldsize(); i++) { - perRankMsgCount[i] = 0; - } - } - - ++perRankMsgCount.at(idToIncrement); - break; - } - default: { - SPDLOG_ERROR("Unsupported record type: {}", recordType); - throw std::runtime_error("Unsupported record type"); - } - } -#else - ; -#endif -} - -CallRecords& getCallRecords() -{ - static thread_local CallRecords callRecords; - return callRecords; -} -} diff --git a/tests/test/scheduler/test_exec_graph.cpp b/tests/test/scheduler/test_exec_graph.cpp index 755053ce6..060382f85 100644 --- a/tests/test/scheduler/test_exec_graph.cpp +++ b/tests/test/scheduler/test_exec_graph.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include using namespace scheduler; @@ -142,4 +143,86 @@ TEST_CASE_METHOD(MpiBaseTestFixture, checkExecGraphEquality(expected, actual); } + +TEST_CASE_METHOD(MpiTestFixture, + "Test tracing the number of MPI messages", + "[util][exec-graph]") +{ + // Disable test mode and set message flag to true + faabric::util::setTestMode(false); + msg.set_recordexecgraph(true); + + faabric::util::getExecGraphDetail().startRecording(msg); + + // Send one message + int rankA1 = 0; + int rankA2 = 1; + MPI_Status status{}; + + std::vector messageData = { 0, 1, 2 }; + auto buffer = new int[messageData.size()]; + + int numToSend = 10; + std::string expectedKey = + faabric::util::ExecGraphDetail::mpiMsgCountPrefix + + std::to_string(rankA2); + + for (int i = 0; i < numToSend; i++) { + world.send(rankA1, + rankA2, + BYTES(messageData.data()), + MPI_INT, + messageData.size()); + world.recv( + rankA1, rankA2, BYTES(buffer), MPI_INT, messageData.size(), &status); + } + + // Stop recording and check we have only recorded one message + faabric::util::getExecGraphDetail().stopRecording(msg); + REQUIRE(msg.execgraphdetails_size() == 1); + REQUIRE(msg.execgraphdetails(0).key() == expectedKey); + 
REQUIRE(msg.execgraphdetails(0).value() == std::to_string(numToSend)); + + faabric::util::setTestMode(true); +} + +TEST_CASE_METHOD(MpiTestFixture, + "Test tracing is disabled if flag in message not set", + "[util][exec-graph]") +{ + // Disable test mode and set message flag to true + faabric::util::setTestMode(false); + msg.set_recordexecgraph(false); + + faabric::util::getExecGraphDetail().startRecording(msg); + + // Send one message + int rankA1 = 0; + int rankA2 = 1; + MPI_Status status{}; + + std::vector messageData = { 0, 1, 2 }; + auto buffer = new int[messageData.size()]; + + int numToSend = 10; + std::string expectedKey = + faabric::util::ExecGraphDetail::mpiMsgCountPrefix + + std::to_string(rankA2); + + for (int i = 0; i < numToSend; i++) { + world.send(rankA1, + rankA2, + BYTES(messageData.data()), + MPI_INT, + messageData.size()); + world.recv( + rankA1, rankA2, BYTES(buffer), MPI_INT, messageData.size(), &status); + } + + // Stop recording and check we have only recorded one message + faabric::util::getExecGraphDetail().stopRecording(msg); + REQUIRE(msg.execgraphdetails_size() == 0); + + faabric::util::setTestMode(true); +} } diff --git a/tests/test/util/test_tracing.cpp b/tests/test/util/test_tracing.cpp deleted file mode 100644 index ee997dd7c..000000000 --- a/tests/test/util/test_tracing.cpp +++ /dev/null @@ -1,55 +0,0 @@ -#include - -#include "faabric_utils.h" - -#include -#include -#include -#include - -namespace tests { -TEST_CASE_METHOD(MpiTestFixture, - "Test tracing the number of MPI messages", - "[util][tracing]") -{ - // Disable test mode to test tracing - faabric::util::setTestMode(false); - - faabric::util::tracing::getCallRecords().startRecording(msg); - - // Send one message - int rankA1 = 0; - int rankA2 = 1; - MPI_Status status{}; - - std::vector messageData = { 0, 1, 2 }; - auto buffer = new int[messageData.size()]; - - int numToSend = 10; - - for (int i = 0; i < numToSend; i++) { - world.send(rankA1, - rankA2, - BYTES(messageData.data()), - MPI_INT, - messageData.size()); - world.recv( - rankA1, rankA2, BYTES(buffer), MPI_INT, messageData.size(), &status); - } - - // Stop recording and check we have only recorded one message - faabric::util::tracing::getCallRecords().stopRecording(msg); - REQUIRE(msg.has_records()); - REQUIRE(msg.records().has_mpimsgcount()); - REQUIRE(msg.records().mpimsgcount().ranks_size() == worldSize); - for (int i = 0; i < worldSize; i++) { - if (i == rankA2) { - REQUIRE(msg.records().mpimsgcount().nummessages(i) == numToSend); - } else { - REQUIRE(msg.records().mpimsgcount().nummessages(i) == 0); - } - } - - faabric::util::setTestMode(true); -} -} From a46a5e99dc1fcbe2b9150d11211404e928731da5 Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Thu, 28 Oct 2021 10:31:29 +0000 Subject: [PATCH 05/15] update comments --- src/scheduler/Executor.cpp | 2 +- src/util/exec_graph.cpp | 2 ++ tests/test/scheduler/test_exec_graph.cpp | 2 +- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/scheduler/Executor.cpp b/src/scheduler/Executor.cpp index 713f2be0c..a730f8817 100644 --- a/src/scheduler/Executor.cpp +++ b/src/scheduler/Executor.cpp @@ -227,7 +227,7 @@ void Executor::threadPoolThread(int threadPoolIdx) msg.id(), isThreads); - // Start recording calls in non-release builds + // Start recording calls faabric::util::getExecGraphDetail().startRecording(msg); int32_t returnValue; diff --git a/src/util/exec_graph.cpp b/src/util/exec_graph.cpp index 8bbff232e..c0fc44709 100644 --- a/src/util/exec_graph.cpp +++ 
b/src/util/exec_graph.cpp @@ -15,6 +15,8 @@ void ExecGraphDetail::startRecording(const faabric::Message& msg) linkedMsg = std::make_shared(msg); + // If message flag is not set, no-op the increment functions for minimal + // overhead if (!msg.recordexecgraph()) { doAddDetail = [](const int, const std::string&, const std::string&) -> void { ; }; diff --git a/tests/test/scheduler/test_exec_graph.cpp b/tests/test/scheduler/test_exec_graph.cpp index 060382f85..4f1b02d9e 100644 --- a/tests/test/scheduler/test_exec_graph.cpp +++ b/tests/test/scheduler/test_exec_graph.cpp @@ -219,7 +219,7 @@ TEST_CASE_METHOD(MpiTestFixture, rankA1, rankA2, BYTES(buffer), MPI_INT, messageData.size(), &status); } - // Stop recording and check we have only recorded one message + // Stop recording and check we have recorded no message faabric::util::getExecGraphDetail().stopRecording(msg); REQUIRE(msg.execgraphdetails_size() == 0); From 2a668073f4be8d5a2e79f910d95d4e66b386e549 Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Fri, 29 Oct 2021 08:31:34 +0000 Subject: [PATCH 06/15] refactor after offline discussion --- include/faabric/scheduler/MpiContext.h | 4 +- include/faabric/scheduler/MpiWorld.h | 4 +- include/faabric/scheduler/MpiWorldRegistry.h | 4 +- include/faabric/util/exec_graph.h | 53 +------ src/proto/faabric.proto | 10 +- src/scheduler/Executor.cpp | 6 - src/scheduler/MpiContext.cpp | 4 +- src/scheduler/MpiWorld.cpp | 25 ++- src/scheduler/MpiWorldRegistry.cpp | 4 +- src/util/exec_graph.cpp | 151 ++----------------- src/util/func.cpp | 3 + tests/test/scheduler/test_exec_graph.cpp | 64 +++++--- 12 files changed, 92 insertions(+), 240 deletions(-) diff --git a/include/faabric/scheduler/MpiContext.h b/include/faabric/scheduler/MpiContext.h index d898793f1..7fedc3de7 100644 --- a/include/faabric/scheduler/MpiContext.h +++ b/include/faabric/scheduler/MpiContext.h @@ -9,9 +9,9 @@ class MpiContext public: MpiContext(); - int createWorld(const faabric::Message& msg); + int createWorld(faabric::Message& msg); - void joinWorld(const faabric::Message& msg); + void joinWorld(faabric::Message& msg); bool getIsMpi(); diff --git a/include/faabric/scheduler/MpiWorld.h b/include/faabric/scheduler/MpiWorld.h index d927bb5c6..119abf28e 100644 --- a/include/faabric/scheduler/MpiWorld.h +++ b/include/faabric/scheduler/MpiWorld.h @@ -21,11 +21,11 @@ class MpiWorld public: MpiWorld(); - void create(const faabric::Message& call, int newId, int newSize); + void create(faabric::Message& call, int newId, int newSize); void broadcastHostsToRanks(); - void initialiseFromMsg(const faabric::Message& msg); + void initialiseFromMsg(faabric::Message& msg); std::string getHostForRank(int rank); diff --git a/include/faabric/scheduler/MpiWorldRegistry.h b/include/faabric/scheduler/MpiWorldRegistry.h index 303c099df..73ae0788a 100644 --- a/include/faabric/scheduler/MpiWorldRegistry.h +++ b/include/faabric/scheduler/MpiWorldRegistry.h @@ -8,11 +8,11 @@ class MpiWorldRegistry public: MpiWorldRegistry() = default; - scheduler::MpiWorld& createWorld(const faabric::Message& msg, + scheduler::MpiWorld& createWorld(faabric::Message& msg, int worldId, std::string hostOverride = ""); - scheduler::MpiWorld& getOrInitialiseWorld(const faabric::Message& msg); + scheduler::MpiWorld& getOrInitialiseWorld(faabric::Message& msg); scheduler::MpiWorld& getWorld(int worldId); diff --git a/include/faabric/util/exec_graph.h b/include/faabric/util/exec_graph.h index 4da411a6f..bc000a3ad 100644 --- a/include/faabric/util/exec_graph.h +++ 
b/include/faabric/util/exec_graph.h @@ -6,51 +6,14 @@ #include #include -namespace faabric::util { -class ExecGraphDetail -{ -  public: -    void startRecording(const faabric::Message& msg); +namespace faabric::util::exec_graph { +void addDetail(faabric::Message& msg, +               const std::string& key, +               const std::string& value); -    void stopRecording(faabric::Message& msg); +void incrementCounter(faabric::Message& msg, +                      const std::string& key, +                      const int valueToIncrement = 1); -    void addDetail(const int msgId, -                   const std::string& key, -                   const std::string& value); - -    void incrementCounter(const int msgId, -                          const std::string& key, -                          const int valueToIncrement = 1); - -    static inline std::string const mpiMsgCountPrefix = "mpi-msgcount-torank-"; - -  private: -    std::shared_ptr<faabric::Message> linkedMsg = nullptr; - -    std::map<std::string, std::string> detailsMap; - -    std::map<std::string, int> intDetailsMap; - -    void checkMessageLinked(const int msgId); - -    void checkMessageNotLinked(); - -    // ----- Wrappers to no-op the functions if not recording ----- - -    std::function<void(const int, const std::string&, const std::string&)> -      doAddDetail; - -    void addDetailInternal(const int msgId, -                           const std::string& key, -                           const std::string& value); - -    std::function<void(const int, const std::string&, const int)> -      doIncrementCounter; - -    void incrementCounterInternal(const int msgId, -                                  const std::string& key, -                                  const int valueToIncrement); -}; - -ExecGraphDetail& getExecGraphDetail(); +static inline std::string const mpiMsgCountPrefix = "mpi-msgcount-torank-"; } diff --git a/src/proto/faabric.proto b/src/proto/faabric.proto index 3f11d0e55..20e89e65c 100644 --- a/src/proto/faabric.proto +++ b/src/proto/faabric.proto @@ -95,11 +95,6 @@ message MpiHostsToRanksMessage { repeated int32 basePorts = 2; } -message ExecGraphDetail { - string key = 1; - string value = 2; -} - message Message { int32 id = 1; int32 appId = 2; @@ -163,6 +158,11 @@ message Message { string sgxTag = 38; bytes sgxPolicy = 39; bytes sgxResult = 40; + + // Exec-graph utils + bool recordExecGraph = 41; + map<string, int32> intExecGraphDetails = 42; + map<string, string> execGraphDetails = 43; } // --------------------------------------------- diff --git a/src/scheduler/Executor.cpp b/src/scheduler/Executor.cpp index a730f8817..875a62eb1 100644 --- a/src/scheduler/Executor.cpp +++ b/src/scheduler/Executor.cpp @@ -227,9 +227,6 @@ void Executor::threadPoolThread(int threadPoolIdx) msg.id(), isThreads); - // Start recording calls - faabric::util::getExecGraphDetail().startRecording(msg); - int32_t returnValue; try { returnValue = @@ -246,9 +243,6 @@ void Executor::threadPoolThread(int threadPoolIdx) // Set the return value msg.set_returnvalue(returnValue); - // Stop recording calls - faabric::util::getExecGraphDetail().stopRecording(msg); - // Decrement the task count int oldTaskCount = task.batchCounter->fetch_sub(1); assert(oldTaskCount >= 0); diff --git a/src/scheduler/MpiContext.cpp b/src/scheduler/MpiContext.cpp index 4674da0b1..ba895f3c9 100644 --- a/src/scheduler/MpiContext.cpp +++ b/src/scheduler/MpiContext.cpp @@ -11,7 +11,7 @@ MpiContext::MpiContext() , worldId(-1) {} -int MpiContext::createWorld(const faabric::Message& msg) +int MpiContext::createWorld(faabric::Message& msg) { if (msg.mpirank() > 0) { @@ -38,7 +38,7 @@ int MpiContext::createWorld(const faabric::Message& msg) return worldId; } -void MpiContext::joinWorld(const faabric::Message& msg) +void MpiContext::joinWorld(faabric::Message& msg) { if (!msg.ismpi()) { // Not an MPI call diff --git a/src/scheduler/MpiWorld.cpp b/src/scheduler/MpiWorld.cpp index 653060e40..956424db5 100644 --- a/src/scheduler/MpiWorld.cpp +++ b/src/scheduler/MpiWorld.cpp @@ -35,7 +35,7 @@ static 
thread_local std::unordered_map< ranksSendEndpoints; // Id of the message that created this thread-local instance -static thread_local int thisMsgId; +static thread_local faabric::Message* thisMsg = nullptr; // This is used for mocking in tests static std::vector rankMessages; @@ -176,12 +176,12 @@ MpiWorld::getUnackedMessageBuffer(int sendRank, int recvRank) return unackedMessageBuffers[index]; } -void MpiWorld::create(const faabric::Message& call, int newId, int newSize) +void MpiWorld::create(faabric::Message& call, int newId, int newSize) { id = newId; user = call.user(); function = call.function(); - thisMsgId = call.id(); + thisMsg = &call; size = newSize; @@ -249,9 +249,6 @@ void MpiWorld::destroy() { SPDLOG_TRACE("Destroying MPI world {}", id); - // Note that all ranks will call this function. - thisMsgId = 0; - // We must force the destructors for all message endpoints to run here // rather than at the end of their global thread-local lifespan. If we // don't, the ZMQ shutdown can hang. @@ -292,13 +289,13 @@ void MpiWorld::destroy() } } -void MpiWorld::initialiseFromMsg(const faabric::Message& msg) +void MpiWorld::initialiseFromMsg(faabric::Message& msg) { id = msg.mpiworldid(); user = msg.user(); function = msg.function(); size = msg.mpiworldsize(); - thisMsgId = msg.id(); + thisMsg = &msg; // Block until we receive faabric::MpiHostsToRanksMessage hostRankMsg = recvMpiHostRankMsg(); @@ -580,11 +577,13 @@ void MpiWorld::send(int sendRank, sendRemoteMpiMessage(sendRank, recvRank, m); } - // In non-release builds, track that we have sent this message - faabric::util::getExecGraphDetail().incrementCounter( - thisMsgId, - faabric::util::ExecGraphDetail::mpiMsgCountPrefix + - std::to_string(recvRank)); + // If the message is set and recording on, track we have sent this message + if (thisMsg != nullptr && thisMsg->recordexecgraph()) { + faabric::util::exec_graph::incrementCounter( + *thisMsg, + faabric::util::exec_graph::mpiMsgCountPrefix + + std::to_string(recvRank)); + } } void MpiWorld::recv(int sendRank, diff --git a/src/scheduler/MpiWorldRegistry.cpp b/src/scheduler/MpiWorldRegistry.cpp index 37a93866a..2a89647a8 100644 --- a/src/scheduler/MpiWorldRegistry.cpp +++ b/src/scheduler/MpiWorldRegistry.cpp @@ -10,7 +10,7 @@ MpiWorldRegistry& getMpiWorldRegistry() return r; } -scheduler::MpiWorld& MpiWorldRegistry::createWorld(const faabric::Message& msg, +scheduler::MpiWorld& MpiWorldRegistry::createWorld(faabric::Message& msg, int worldId, std::string hostOverride) { @@ -37,7 +37,7 @@ scheduler::MpiWorld& MpiWorldRegistry::createWorld(const faabric::Message& msg, return worldMap[worldId]; } -MpiWorld& MpiWorldRegistry::getOrInitialiseWorld(const faabric::Message& msg) +MpiWorld& MpiWorldRegistry::getOrInitialiseWorld(faabric::Message& msg) { // Create world locally if not exists int worldId = msg.mpiworldid(); diff --git a/src/util/exec_graph.cpp b/src/util/exec_graph.cpp index c0fc44709..79a6ab944 100644 --- a/src/util/exec_graph.cpp +++ b/src/util/exec_graph.cpp @@ -2,151 +2,22 @@ #include #include -namespace faabric::util { -void ExecGraphDetail::startRecording(const faabric::Message& msg) +namespace faabric::util::exec_graph { +void addDetail(faabric::Message& msg, + const std::string& key, + const std::string& value) { - // In the tests there's not a thread to message mapping as we sometimes - // spawn extra threads to mock work. Thus, we skip this check here. 
- if (faabric::util::isTestMode()) { - return; - } + auto& stringMap = *msg.mutable_execgraphdetails(); - checkMessageNotLinked(); - - linkedMsg = std::make_shared(msg); - - // If message flag is not set, no-op the increment functions for minimal - // overhead - if (!msg.recordexecgraph()) { - doAddDetail = - [](const int, const std::string&, const std::string&) -> void { ; }; - doIncrementCounter = - [](const int, const std::string&, const int) -> void { ; }; - } else { - doAddDetail = [this](const int msgId, - const std::string& key, - const std::string& value) -> void { - this->addDetailInternal(msgId, key, value); - }; - doIncrementCounter = [this](const int msgId, - const std::string& key, - const int valueToIncrement) -> void { - this->incrementCounterInternal(msgId, key, valueToIncrement); - }; - } -} - -void ExecGraphDetail::stopRecording(faabric::Message& msg) -{ - // In the tests there's not a thread to message mapping as we sometimes - // spawn extra threads to mock work. Thus, we skip this check here. - if (faabric::util::isTestMode()) { - return; - } - - checkMessageLinked(msg.id()); - - for (const auto& it : detailsMap) { - faabric::ExecGraphDetail detail; - detail.set_key(it.first); - detail.set_value(it.second); - *msg.add_execgraphdetails() = detail; - SPDLOG_TRACE("Adding exec. graph detail to message. id: {} ; {}->{}", - linkedMsg->id(), - it.first, - it.second); - } - - for (const auto& it : intDetailsMap) { - if (detailsMap.find(it.first) != detailsMap.end()) { - SPDLOG_WARN( - "Replicated key in the exec graph details: {}->{} and {}->{}", - it.first, - detailsMap.at(it.first), - it.first, - it.second); - } - - faabric::ExecGraphDetail detail; - detail.set_key(it.first); - detail.set_value(std::to_string(it.second)); - *msg.add_execgraphdetails() = detail; - SPDLOG_TRACE("Adding exec. graph detail to message. id: {} ; {}->{}", - linkedMsg->id(), - it.first, - it.second); - } - - linkedMsg = nullptr; - detailsMap.clear(); - intDetailsMap.clear(); -} - -void ExecGraphDetail::checkMessageLinked(const int msgId) -{ - if (linkedMsg == nullptr || linkedMsg->id() != msgId) { - SPDLOG_ERROR("Error during recording, records not linked to the right" - " message: (linked: {} != provided: {})", - linkedMsg == nullptr ? 
"nullptr" - : std::to_string(linkedMsg->id()), - msgId); - throw std::runtime_error("CallRecords linked to a different message"); - } + stringMap[key] = value; } -void ExecGraphDetail::checkMessageNotLinked() +void incrementCounter(faabric::Message& msg, + const std::string& key, + const int valueToIncrement) { - if (linkedMsg != nullptr) { - SPDLOG_ERROR("Error starting recording, record already linked to" - "another message: {}", - linkedMsg->id()); - throw std::runtime_error("CallRecords linked to a different message"); - } -} + auto& stringMap = *msg.mutable_intexecgraphdetails(); -void ExecGraphDetail::addDetail(const int msgId, - const std::string& key, - const std::string& value) -{ - doAddDetail(msgId, key, value); -} - -void ExecGraphDetail::addDetailInternal(const int msgId, - const std::string& key, - const std::string& value) -{ - if (faabric::util::isTestMode()) { - return; - } - - checkMessageLinked(msgId); - - detailsMap[key] = value; -} - -void ExecGraphDetail::incrementCounter(const int msgId, - const std::string& key, - const int valueToIncrement) -{ - doIncrementCounter(msgId, key, valueToIncrement); -} - -void ExecGraphDetail::incrementCounterInternal(const int msgId, - const std::string& key, - const int valueToIncrement) -{ - if (faabric::util::isTestMode()) { - return; - } - - checkMessageLinked(msgId); - - intDetailsMap[key] += valueToIncrement; -} - -ExecGraphDetail& getExecGraphDetail() -{ - static thread_local ExecGraphDetail graphDetail; - return graphDetail; + stringMap[key] += valueToIncrement; } } diff --git a/src/util/func.cpp b/src/util/func.cpp index 58659af18..a08c220a0 100644 --- a/src/util/func.cpp +++ b/src/util/func.cpp @@ -88,6 +88,9 @@ std::shared_ptr messageFactoryShared( std::string thisHost = faabric::util::getSystemConfig().endpointHost; ptr->set_masterhost(thisHost); + + ptr->set_recordexecgraph(false); + return ptr; } diff --git a/tests/test/scheduler/test_exec_graph.cpp b/tests/test/scheduler/test_exec_graph.cpp index 4f1b02d9e..b115b6544 100644 --- a/tests/test/scheduler/test_exec_graph.cpp +++ b/tests/test/scheduler/test_exec_graph.cpp @@ -144,16 +144,48 @@ TEST_CASE_METHOD(MpiBaseTestFixture, checkExecGraphEquality(expected, actual); } +TEST_CASE("Test exec graph details", "[util][exec-graph]") +{ + faabric::Message msg = faabric::util::messageFactory("foo", "bar"); + std::string expectedKey = "foo"; + std::string expectedStringValue = "bar"; + int expectedIntValue = 1; + + // By default, recording is disabled + REQUIRE(msg.recordexecgraph() == false); + + // If we add a recording while disabled, nothing changes + faabric::util::exec_graph::incrementCounter( + msg, expectedKey, expectedIntValue); + faabric::util::exec_graph::addDetail(msg, expectedKey, expectedStringValue); + REQUIRE(msg.intexecgraphdetails_size() == 0); + REQUIRE(msg.execgraphdetails_size() == 0); + + // We can turn it on + msg.set_recordexecgraph(true); + + // We can add records either to a string or to an int map + faabric::util::exec_graph::incrementCounter( + msg, expectedKey, expectedIntValue); + faabric::util::exec_graph::addDetail(msg, expectedKey, expectedStringValue); + + // Both change the behaviour of the underlying message + REQUIRE(msg.intexecgraphdetails_size() == 1); + REQUIRE(msg.execgraphdetails_size() == 1); + REQUIRE(msg.intexecgraphdetails().count(expectedKey) == 1); + REQUIRE(msg.intexecgraphdetails().at(expectedKey) == expectedIntValue); + REQUIRE(msg.execgraphdetails().count(expectedKey) == 1); + REQUIRE(msg.execgraphdetails().at(expectedKey) == 
expectedStringValue); +} + +// TODO - test we can only get exec graph if msg set + TEST_CASE_METHOD(MpiTestFixture, "Test tracing the number of MPI messages", "[util][exec-graph]") { - // Disable test mode and set message flag to true - faabric::util::setTestMode(false); msg.set_recordexecgraph(true); - faabric::util::getExecGraphDetail().startRecording(msg); - // Send one message int rankA1 = 0; int rankA2 = 1; @@ -164,8 +196,7 @@ TEST_CASE_METHOD(MpiTestFixture, int numToSend = 10; std::string expectedKey = - faabric::util::ExecGraphDetail::mpiMsgCountPrefix + - std::to_string(rankA2); + faabric::util::exec_graph::mpiMsgCountPrefix + std::to_string(rankA2); for (int i = 0; i < numToSend; i++) { world.send(rankA1, @@ -177,13 +208,10 @@ TEST_CASE_METHOD(MpiTestFixture, rankA1, rankA2, BYTES(buffer), MPI_INT, messageData.size(), &status); } - // Stop recording and check we have only recorded one message - faabric::util::getExecGraphDetail().stopRecording(msg); - REQUIRE(msg.execgraphdetails_size() == 1); - REQUIRE(msg.execgraphdetails(0).key() == expectedKey); - REQUIRE(msg.execgraphdetails(0).value() == std::to_string(numToSend)); - - faabric::util::setTestMode(true); + REQUIRE(msg.intexecgraphdetails_size() == 1); + REQUIRE(msg.execgraphdetails_size() == 0); + REQUIRE(msg.intexecgraphdetails().count(expectedKey) == 1); + REQUIRE(msg.intexecgraphdetails().at(expectedKey) == numToSend); } TEST_CASE_METHOD(MpiTestFixture, @@ -191,11 +219,8 @@ TEST_CASE_METHOD(MpiTestFixture, "[util][exec-graph]") { // Disable test mode and set message flag to true - faabric::util::setTestMode(false); msg.set_recordexecgraph(false); - faabric::util::getExecGraphDetail().startRecording(msg); - // Send one message int rankA1 = 0; int rankA2 = 1; @@ -206,8 +231,7 @@ TEST_CASE_METHOD(MpiTestFixture, int numToSend = 10; std::string expectedKey = - faabric::util::ExecGraphDetail::mpiMsgCountPrefix + - std::to_string(rankA2); + faabric::util::exec_graph::mpiMsgCountPrefix + std::to_string(rankA2); for (int i = 0; i < numToSend; i++) { world.send(rankA1, @@ -220,9 +244,7 @@ TEST_CASE_METHOD(MpiTestFixture, } // Stop recording and check we have recorded no message - faabric::util::getExecGraphDetail().stopRecording(msg); + REQUIRE(msg.intexecgraphdetails_size() == 0); REQUIRE(msg.execgraphdetails_size() == 0); - - faabric::util::setTestMode(true); } } From 46ab804514bd017876e408291339f599a437906c Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Fri, 29 Oct 2021 08:54:06 +0000 Subject: [PATCH 07/15] don't log chained calls if recording exec graph is not set --- src/scheduler/MpiWorld.cpp | 4 +++- tests/test/scheduler/test_exec_graph.cpp | 2 -- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/scheduler/MpiWorld.cpp b/src/scheduler/MpiWorld.cpp index 956424db5..c60eca206 100644 --- a/src/scheduler/MpiWorld.cpp +++ b/src/scheduler/MpiWorld.cpp @@ -199,7 +199,9 @@ void MpiWorld::create(faabric::Message& call, int newId, int newSize) msg.set_mpirank(i + 1); msg.set_mpiworldsize(size); // Log chained functions to generate execution graphs - sch.logChainedFunction(call.id(), msg.id()); + if (thisMsg != nullptr && thisMsg->recordexecgraph()) { + sch.logChainedFunction(call.id(), msg.id()); + } } std::vector executedAt; diff --git a/tests/test/scheduler/test_exec_graph.cpp b/tests/test/scheduler/test_exec_graph.cpp index b115b6544..e70e692d9 100644 --- a/tests/test/scheduler/test_exec_graph.cpp +++ b/tests/test/scheduler/test_exec_graph.cpp @@ -178,8 +178,6 @@ TEST_CASE("Test exec graph details", 
"[util][exec-graph]") REQUIRE(msg.execgraphdetails().at(expectedKey) == expectedStringValue); } -// TODO - test we can only get exec graph if msg set - TEST_CASE_METHOD(MpiTestFixture, "Test tracing the number of MPI messages", "[util][exec-graph]") From 1b64990f9221850908bcfd57c69d8fa4644dc4ba Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Fri, 29 Oct 2021 08:57:31 +0000 Subject: [PATCH 08/15] quick test fix --- src/util/exec_graph.cpp | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/util/exec_graph.cpp b/src/util/exec_graph.cpp index 79a6ab944..6f9774f74 100644 --- a/src/util/exec_graph.cpp +++ b/src/util/exec_graph.cpp @@ -7,6 +7,10 @@ void addDetail(faabric::Message& msg, const std::string& key, const std::string& value) { + if (!msg.recordexecgraph()) { + return; + } + auto& stringMap = *msg.mutable_execgraphdetails(); stringMap[key] = value; @@ -16,6 +20,10 @@ void incrementCounter(faabric::Message& msg, const std::string& key, const int valueToIncrement) { + if (!msg.recordexecgraph()) { + return; + } + auto& stringMap = *msg.mutable_intexecgraphdetails(); stringMap[key] += valueToIncrement; From 02661a9427c0dc430e347d0c3a99718248e91f3e Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Fri, 29 Oct 2021 09:26:00 +0000 Subject: [PATCH 09/15] add serialisation for maps --- src/util/json.cpp | 28 ++++++++++++++++++++++++++++ tests/test/util/test_json.cpp | 8 ++++++++ 2 files changed, 36 insertions(+) diff --git a/src/util/json.cpp b/src/util/json.cpp index 605f933b6..007272c86 100644 --- a/src/util/json.cpp +++ b/src/util/json.cpp @@ -183,6 +183,34 @@ std::string messageToJson(const faabric::Message& msg) a); } + if (msg.recordexecgraph()) { + d.AddMember("record_exec_graph", msg.recordexecgraph(), a); + + if (msg.execgraphdetails_size() > 0) { + std::string out = ""; + const auto& map = msg.execgraphdetails(); + for (const auto& it : map) { + out = fmt::format("{},{}:{}", out, it.first, it.second); + } + + d.AddMember( + "exec_graph_detail", Value(out.c_str(), out.size()).Move(), a); + } + + if (msg.intexecgraphdetails_size() > 0) { + std::string out = ""; + const auto& map = msg.intexecgraphdetails(); + for (const auto& it : map) { + out = fmt::format( + "{},{}:{}", out, it.first, std::to_string(it.second)); + } + + d.AddMember("int_exec_graph_detail", + Value(out.c_str(), out.size()).Move(), + a); + } + } + StringBuffer sb; Writer writer(sb); d.Accept(writer); diff --git a/tests/test/util/test_json.cpp b/tests/test/util/test_json.cpp index 2e2ecfa5a..f8ac1766d 100644 --- a/tests/test/util/test_json.cpp +++ b/tests/test/util/test_json.cpp @@ -4,6 +4,8 @@ #include +#include + using namespace faabric::util; namespace tests { @@ -39,6 +41,12 @@ TEST_CASE("Test message to JSON round trip", "[util]") msg.set_sgxpolicy("test policy string"); msg.set_sgxresult("test result string"); + msg.set_recordexecgraph(true); + auto& map = *msg.mutable_execgraphdetails(); + map["foo"] = "bar"; + auto& intMap = *msg.mutable_intexecgraphdetails(); + intMap["foo"] = 0; + SECTION("Dodgy characters") { msg.set_inputdata("[0], %$ 2233 9"); } SECTION("Bytes") From 0103cbc2cb8ea91f88907c4fb77712c93f751601 Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Fri, 29 Oct 2021 09:28:56 +0000 Subject: [PATCH 10/15] fix tests --- tests/test/scheduler/test_exec_graph.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/test/scheduler/test_exec_graph.cpp b/tests/test/scheduler/test_exec_graph.cpp index e70e692d9..3011b4f5b 100644 --- 
+++ b/tests/test/scheduler/test_exec_graph.cpp
@@ -13,7 +13,7 @@ using namespace scheduler;
 
 namespace tests {
 
-TEST_CASE("Test execution graph", "[scheduler]")
+TEST_CASE("Test execution graph", "[scheduler][exec-graph]")
 {
     faabric::Message msgA = faabric::util::messageFactory("demo", "echo");
     faabric::Message msgB1 = faabric::util::messageFactory("demo", "echo");
@@ -74,10 +74,11 @@ TEST_CASE("Test execution graph", "[scheduler]")
 
 TEST_CASE_METHOD(MpiBaseTestFixture,
                  "Test MPI execution graph",
-                 "[mpi][scheduler]")
+                 "[mpi][scheduler][exec-graph]")
 {
     faabric::scheduler::MpiWorld world;
     msg.set_ismpi(true);
+    msg.set_recordexecgraph(true);
 
     // Update the result for the master message
     sch.setFunctionResult(msg);

From 534ae9c08b405202968457e4993c095360e4a836 Mon Sep 17 00:00:00 2001
From: Carlos Segarra
Date: Fri, 29 Oct 2021 10:18:08 +0000
Subject: [PATCH 11/15] self-review

---
 src/scheduler/MpiWorld.cpp | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/src/scheduler/MpiWorld.cpp b/src/scheduler/MpiWorld.cpp
index c60eca206..e36ac22c1 100644
--- a/src/scheduler/MpiWorld.cpp
+++ b/src/scheduler/MpiWorld.cpp
@@ -251,6 +251,8 @@ void MpiWorld::destroy()
 {
     SPDLOG_TRACE("Destroying MPI world {}", id);
 
+    // Note that all ranks will call this function.
+
     // We must force the destructors for all message endpoints to run here
     // rather than at the end of their global thread-local lifespan. If we
     // don't, the ZMQ shutdown can hang.

From 163ea2819a299dd78dfff3196fd03a4d6aebca38 Mon Sep 17 00:00:00 2001
From: Carlos Segarra
Date: Fri, 29 Oct 2021 14:59:20 +0000
Subject: [PATCH 12/15] refactor thread local message's name

---
 src/scheduler/MpiWorld.cpp | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/src/scheduler/MpiWorld.cpp b/src/scheduler/MpiWorld.cpp
index e36ac22c1..e88ab3386 100644
--- a/src/scheduler/MpiWorld.cpp
+++ b/src/scheduler/MpiWorld.cpp
@@ -35,7 +35,7 @@ static thread_local std::unordered_map<
   ranksSendEndpoints;
 
 // Id of the message that created this thread-local instance
-static thread_local faabric::Message* thisMsg = nullptr;
+static thread_local faabric::Message* thisRankMsg = nullptr;
 
 // This is used for mocking in tests
 static std::vector rankMessages;
@@ -181,7 +181,7 @@ void MpiWorld::create(faabric::Message& call, int newId, int newSize)
     id = newId;
     user = call.user();
     function = call.function();
-    thisMsg = &call;
+    thisRankMsg = &call;
 
     size = newSize;
 
@@ -199,7 +199,7 @@ void MpiWorld::create(faabric::Message& call, int newId, int newSize)
         msg.set_mpirank(i + 1);
         msg.set_mpiworldsize(size);
         // Log chained functions to generate execution graphs
-        if (thisMsg != nullptr && thisMsg->recordexecgraph()) {
+        if (thisRankMsg != nullptr && thisRankMsg->recordexecgraph()) {
             sch.logChainedFunction(call.id(), msg.id());
         }
     }
@@ -299,7 +299,7 @@ void MpiWorld::initialiseFromMsg(faabric::Message& msg)
     user = msg.user();
     function = msg.function();
     size = msg.mpiworldsize();
-    thisMsg = &msg;
+    thisRankMsg = &msg;
 
     // Block until we receive
     faabric::MpiHostsToRanksMessage hostRankMsg = recvMpiHostRankMsg();
@@ -582,9 +582,9 @@ void MpiWorld::send(int sendRank,
     }
 
     // If the message is set and recording on, track we have sent this message
-    if (thisMsg != nullptr && thisMsg->recordexecgraph()) {
+    if (thisRankMsg != nullptr && thisRankMsg->recordexecgraph()) {
         faabric::util::exec_graph::incrementCounter(
-          *thisMsg,
+          *thisRankMsg,
           faabric::util::exec_graph::mpiMsgCountPrefix +
            std::to_string(recvRank));
     }
From d6c3b1aed8c443ec21cd27bad49a5365268b1a00 Mon Sep 17 00:00:00 2001
From: Carlos Segarra
Date: Fri, 29 Oct 2021 15:08:22 +0000
Subject: [PATCH 13/15] move mpi exec graph tests to separate file

---
 tests/test/scheduler/test_exec_graph.cpp     | 68 -----------------
 tests/test/scheduler/test_mpi_exec_graph.cpp | 77 ++++++++++++++++++++
 2 files changed, 77 insertions(+), 68 deletions(-)
 create mode 100644 tests/test/scheduler/test_mpi_exec_graph.cpp

diff --git a/tests/test/scheduler/test_exec_graph.cpp b/tests/test/scheduler/test_exec_graph.cpp
index 3011b4f5b..e6a7e5bef 100644
--- a/tests/test/scheduler/test_exec_graph.cpp
+++ b/tests/test/scheduler/test_exec_graph.cpp
@@ -178,72 +178,4 @@ TEST_CASE("Test exec graph details", "[util][exec-graph]")
     REQUIRE(msg.execgraphdetails().count(expectedKey) == 1);
     REQUIRE(msg.execgraphdetails().at(expectedKey) == expectedStringValue);
 }
-
-TEST_CASE_METHOD(MpiTestFixture,
-                 "Test tracing the number of MPI messages",
-                 "[util][exec-graph]")
-{
-    msg.set_recordexecgraph(true);
-
-    // Send one message
-    int rankA1 = 0;
-    int rankA2 = 1;
-    MPI_Status status{};
-
-    std::vector<int> messageData = { 0, 1, 2 };
-    auto buffer = new int[messageData.size()];
-
-    int numToSend = 10;
-    std::string expectedKey =
-      faabric::util::exec_graph::mpiMsgCountPrefix + std::to_string(rankA2);
-
-    for (int i = 0; i < numToSend; i++) {
-        world.send(rankA1,
-                   rankA2,
-                   BYTES(messageData.data()),
-                   MPI_INT,
-                   messageData.size());
-        world.recv(
-          rankA1, rankA2, BYTES(buffer), MPI_INT, messageData.size(), &status);
-    }
-
-    REQUIRE(msg.intexecgraphdetails_size() == 1);
-    REQUIRE(msg.execgraphdetails_size() == 0);
-    REQUIRE(msg.intexecgraphdetails().count(expectedKey) == 1);
-    REQUIRE(msg.intexecgraphdetails().at(expectedKey) == numToSend);
-}
-
-TEST_CASE_METHOD(MpiTestFixture,
-                 "Test tracing is disabled if flag in message not set",
-                 "[util][exec-graph]")
-{
-    // Disable test mode and set message flag to true
-    msg.set_recordexecgraph(false);
-
-    // Send one message
-    int rankA1 = 0;
-    int rankA2 = 1;
-    MPI_Status status{};
-
-    std::vector<int> messageData = { 0, 1, 2 };
-    auto buffer = new int[messageData.size()];
-
-    int numToSend = 10;
-    std::string expectedKey =
-      faabric::util::exec_graph::mpiMsgCountPrefix + std::to_string(rankA2);
-
-    for (int i = 0; i < numToSend; i++) {
-        world.send(rankA1,
-                   rankA2,
-                   BYTES(messageData.data()),
-                   MPI_INT,
-                   messageData.size());
-        world.recv(
-          rankA1, rankA2, BYTES(buffer), MPI_INT, messageData.size(), &status);
-    }
-
-    // Stop recording and check we have recorded no message
-    REQUIRE(msg.intexecgraphdetails_size() == 0);
-    REQUIRE(msg.execgraphdetails_size() == 0);
-}
 }
diff --git a/tests/test/scheduler/test_mpi_exec_graph.cpp b/tests/test/scheduler/test_mpi_exec_graph.cpp
new file mode 100644
index 000000000..4bfe17b0b
--- /dev/null
+++ b/tests/test/scheduler/test_mpi_exec_graph.cpp
@@ -0,0 +1,77 @@
+#include
+
+#include "faabric_utils.h"
+
+#include
+#include
+#include
+
+namespace tests {
+TEST_CASE_METHOD(MpiTestFixture,
+                 "Test tracing the number of MPI messages",
+                 "[util][exec-graph]")
+{
+    msg.set_recordexecgraph(true);
+
+    // Send one message
+    int rankA1 = 0;
+    int rankA2 = 1;
+    MPI_Status status{};
+
+    std::vector<int> messageData = { 0, 1, 2 };
+    auto buffer = new int[messageData.size()];
+
+    int numToSend = 10;
+    std::string expectedKey =
+      faabric::util::exec_graph::mpiMsgCountPrefix + std::to_string(rankA2);
+
+    for (int i = 0; i < numToSend; i++) {
+        world.send(rankA1,
+                   rankA2,
+                   BYTES(messageData.data()),
+                   MPI_INT,
+                   messageData.size());
+        world.recv(
+          rankA1, rankA2, BYTES(buffer), MPI_INT, messageData.size(), &status);
+    }
+
+    REQUIRE(msg.intexecgraphdetails_size() == 1);
+    REQUIRE(msg.execgraphdetails_size() == 0);
+    REQUIRE(msg.intexecgraphdetails().count(expectedKey) == 1);
+    REQUIRE(msg.intexecgraphdetails().at(expectedKey) == numToSend);
+}
+
+TEST_CASE_METHOD(MpiTestFixture,
+                 "Test tracing is disabled if flag in message not set",
+                 "[util][exec-graph]")
+{
+    // Make sure recording is disabled in the message
+    msg.set_recordexecgraph(false);
+
+    // Send one message
+    int rankA1 = 0;
+    int rankA2 = 1;
+    MPI_Status status{};
+
+    std::vector<int> messageData = { 0, 1, 2 };
+    auto buffer = new int[messageData.size()];
+
+    int numToSend = 10;
+    std::string expectedKey =
+      faabric::util::exec_graph::mpiMsgCountPrefix + std::to_string(rankA2);
+
+    for (int i = 0; i < numToSend; i++) {
+        world.send(rankA1,
+                   rankA2,
+                   BYTES(messageData.data()),
+                   MPI_INT,
+                   messageData.size());
+        world.recv(
+          rankA1, rankA2, BYTES(buffer), MPI_INT, messageData.size(), &status);
+    }
+
+    // Check that no messages have been recorded
+    REQUIRE(msg.intexecgraphdetails_size() == 0);
+    REQUIRE(msg.execgraphdetails_size() == 0);
+}
+}

From 11ec3fc7199e5e92779ec8bc25bbab6ff80d5ed4 Mon Sep 17 00:00:00 2001
From: Carlos Segarra
Date: Fri, 29 Oct 2021 15:09:06 +0000
Subject: [PATCH 14/15] add checks for serialisation/deserialisation

---
 src/util/json.cpp             | 97 ++++++++++++++++++++++++++++---
 tests/test/util/test_json.cpp |  1 +
 tests/utils/faabric_utils.h   | 16 ++++++
 tests/utils/message_utils.cpp |  5 ++
 4 files changed, 112 insertions(+), 7 deletions(-)

diff --git a/src/util/json.cpp b/src/util/json.cpp
index 007272c86..bdcb06676 100644
--- a/src/util/json.cpp
+++ b/src/util/json.cpp
@@ -8,6 +8,8 @@
 
 #include
 
+#include
+
 using namespace rapidjson;
 
 namespace faabric::util {
@@ -187,24 +189,35 @@ std::string messageToJson(const faabric::Message& msg)
         d.AddMember("record_exec_graph", msg.recordexecgraph(), a);
 
         if (msg.execgraphdetails_size() > 0) {
-            std::string out = "";
+            std::stringstream ss;
             const auto& map = msg.execgraphdetails();
-            for (const auto& it : map) {
-                out = fmt::format("{},{}:{}", out, it.first, it.second);
+            auto it = map.begin();
+            while (it != map.end()) {
+                ss << fmt::format("{}:{}", it->first, it->second);
+                ++it;
+                if (it != map.end()) {
+                    ss << ",";
+                }
             }
+            std::string out = ss.str();
 
             d.AddMember(
              "exec_graph_detail", Value(out.c_str(), out.size()).Move(), a);
        }
 
        if (msg.intexecgraphdetails_size() > 0) {
-            std::string out = "";
+            std::stringstream ss;
             const auto& map = msg.intexecgraphdetails();
-            for (const auto& it : map) {
-                out = fmt::format(
-                  "{},{}:{}", out, it.first, std::to_string(it.second));
+            auto it = map.begin();
+            while (it != map.end()) {
+                ss << fmt::format("{}:{}", it->first, it->second);
+                ++it;
+                if (it != map.end()) {
+                    ss << ",";
+                }
             }
+            std::string out = ss.str();
 
             d.AddMember("int_exec_graph_detail",
                         Value(out.c_str(), out.size()).Move(),
                         a);
@@ -294,6 +307,54 @@ std::string getStringFromJson(Document& doc,
     return std::string(valuePtr, valuePtr + it->value.GetStringLength());
 }
 
+std::map<std::string, std::string> getStringStringMapFromJson(
+  Document& doc,
+  const std::string& key)
+{
+    std::map<std::string, std::string> map;
+
+    Value::MemberIterator it = doc.FindMember(key.c_str());
+    if (it == doc.MemberEnd()) {
+        return map;
+    }
+
+    const char* valuePtr = it->value.GetString();
+    std::stringstream ss(
+      std::string(valuePtr, valuePtr + it->value.GetStringLength()));
+    std::string keyVal;
+    while (std::getline(ss, keyVal, ',')) {
+        auto pos = keyVal.find(":");
+        std::string key = keyVal.substr(0, pos);
+        map[key] = keyVal.erase(0, pos + sizeof(char));
+    }
+
+    return map;
+}
+
+std::map<std::string, int> getStringIntMapFromJson(Document& doc,
+                                                   const std::string& key)
+{
+    std::map<std::string, int> map;
+
+    Value::MemberIterator it = doc.FindMember(key.c_str());
+    if (it == doc.MemberEnd()) {
+        return map;
+    }
+
+    const char* valuePtr = it->value.GetString();
+    std::stringstream ss(
+      std::string(valuePtr, valuePtr + it->value.GetStringLength()));
+    std::string keyVal;
+    while (std::getline(ss, keyVal, ',')) {
+        auto pos = keyVal.find(":");
+        std::string key = keyVal.substr(0, pos);
+        int val = std::stoi(keyVal.erase(0, pos + sizeof(char)));
+        map[key] = val;
+    }
+
+    return map;
+}
+
 faabric::Message jsonToMessage(const std::string& jsonIn)
 {
     PROF_START(jsonDecode)
@@ -352,6 +413,28 @@ faabric::Message jsonToMessage(const std::string& jsonIn)
     msg.set_sgxpolicy(getStringFromJson(d, "sgxpolicy", ""));
     msg.set_sgxresult(getStringFromJson(d, "sgxresult", ""));
 
+    msg.set_recordexecgraph(getBoolFromJson(d, "record_exec_graph", false));
+
+    // By default, clear the map
+    msg.clear_execgraphdetails();
+    // Fill keypairs if found
+    auto& msgStrMap = *msg.mutable_execgraphdetails();
+    std::map<std::string, std::string> strMap =
+      getStringStringMapFromJson(d, "exec_graph_detail");
+    for (auto& it : strMap) {
+        msgStrMap[it.first] = it.second;
+    }
+
+    // By default, clear the map
+    msg.clear_intexecgraphdetails();
+    // Fill keypairs if found
+    auto& msgIntMap = *msg.mutable_intexecgraphdetails();
+    std::map<std::string, int> intMap =
+      getStringIntMapFromJson(d, "int_exec_graph_detail");
+    for (auto& it : intMap) {
+        msgIntMap[it.first] = it.second;
+    }
+
     PROF_END(jsonDecode)
 
     return msg;
diff --git a/tests/test/util/test_json.cpp b/tests/test/util/test_json.cpp
index f8ac1766d..df3248ae3 100644
--- a/tests/test/util/test_json.cpp
+++ b/tests/test/util/test_json.cpp
@@ -61,6 +61,7 @@ TEST_CASE("Test message to JSON round trip", "[util]")
     REQUIRE(msg.timestamp() > 0);
 
     std::string jsonString = faabric::util::messageToJson(msg);
+    SPDLOG_INFO("{}", jsonString);
 
     faabric::Message actual = faabric::util::jsonToMessage(jsonString);
 
diff --git a/tests/utils/faabric_utils.h b/tests/utils/faabric_utils.h
index 3a9ea939c..b6c50c5a2 100644
--- a/tests/utils/faabric_utils.h
+++ b/tests/utils/faabric_utils.h
@@ -1,5 +1,7 @@
 #pragma once
 
+#include
+
 #include "fixtures.h"
 
 #include
@@ -63,6 +65,20 @@ using namespace faabric;
 namespace tests {
 void cleanFaabric();
 
+template <class T>
+void checkMessageMapEquality(T mapA, T mapB)
+{
+    REQUIRE(mapA.size() == mapB.size());
+    auto itA = mapA.begin();
+    auto itB = mapB.begin();
+    while (itA != mapA.end() && itB != mapB.end()) {
+        REQUIRE(itA->first == itB->first);
+        REQUIRE(itA->second == itB->second);
+        itA++;
+        itB++;
+    }
+}
+
 void checkMessageEquality(const faabric::Message& msgA,
                           const faabric::Message& msgB);
 
diff --git a/tests/utils/message_utils.cpp b/tests/utils/message_utils.cpp
index d441d26b8..58924958c 100644
--- a/tests/utils/message_utils.cpp
+++ b/tests/utils/message_utils.cpp
@@ -47,5 +47,10 @@ void checkMessageEquality(const faabric::Message& msgA,
     REQUIRE(msgA.sgxtag() == msgB.sgxtag());
     REQUIRE(msgA.sgxpolicy() == msgB.sgxpolicy());
     REQUIRE(msgA.sgxresult() == msgB.sgxresult());
+
+    REQUIRE(msgA.recordexecgraph() == msgB.recordexecgraph());
+    checkMessageMapEquality(msgA.execgraphdetails(), msgB.execgraphdetails());
+    checkMessageMapEquality(msgA.intexecgraphdetails(),
+                            msgB.intexecgraphdetails());
 }
 }

From 80e55215110a9ef468868bfe1eba801719d080bc Mon Sep 17 00:00:00 2001
From: Carlos Segarra
Date: Fri, 29 Oct 2021 15:37:16 +0000
Subject: [PATCH 15/15] cleanup

---
 src/scheduler/MpiWorld.cpp               | 1 +
 tests/test/scheduler/test_exec_graph.cpp | 1 +
 tests/test/util/test_json.cpp            | 1 -
 3 files changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/scheduler/MpiWorld.cpp b/src/scheduler/MpiWorld.cpp
index e88ab3386..ab814f4d5 100644
--- a/src/scheduler/MpiWorld.cpp
+++ b/src/scheduler/MpiWorld.cpp
@@ -201,6 +201,7 @@ void MpiWorld::create(faabric::Message& call, int newId, int newSize)
         // Log chained functions to generate execution graphs
         if (thisRankMsg != nullptr && thisRankMsg->recordexecgraph()) {
             sch.logChainedFunction(call.id(), msg.id());
+            msg.set_recordexecgraph(true);
         }
     }
 
diff --git a/tests/test/scheduler/test_exec_graph.cpp b/tests/test/scheduler/test_exec_graph.cpp
index e6a7e5bef..b1619b171 100644
--- a/tests/test/scheduler/test_exec_graph.cpp
+++ b/tests/test/scheduler/test_exec_graph.cpp
@@ -98,6 +98,7 @@ TEST_CASE_METHOD(MpiBaseTestFixture,
         messages.at(rank).set_mpiworldid(worldId);
         messages.at(rank).set_mpirank(rank);
         messages.at(rank).set_mpiworldsize(worldSize);
+        messages.at(rank).set_recordexecgraph(true);
     }
 
     world.create(msg, worldId, worldSize);
diff --git a/tests/test/util/test_json.cpp b/tests/test/util/test_json.cpp
index df3248ae3..f8ac1766d 100644
--- a/tests/test/util/test_json.cpp
+++ b/tests/test/util/test_json.cpp
@@ -61,7 +61,6 @@ TEST_CASE("Test message to JSON round trip", "[util]")
     REQUIRE(msg.timestamp() > 0);
 
     std::string jsonString = faabric::util::messageToJson(msg);
-    SPDLOG_INFO("{}", jsonString);
 
     faabric::Message actual = faabric::util::jsonToMessage(jsonString);