diff --git a/include/faabric/util/exec_graph.h b/include/faabric/util/exec_graph.h new file mode 100644 index 000000000..4da411a6f --- /dev/null +++ b/include/faabric/util/exec_graph.h @@ -0,0 +1,56 @@ +#pragma once + +#include + +#include +#include +#include + +namespace faabric::util { +class ExecGraphDetail +{ + public: + void startRecording(const faabric::Message& msg); + + void stopRecording(faabric::Message& msg); + + void addDetail(const int msgId, + const std::string& key, + const std::string& value); + + void incrementCounter(const int msgId, + const std::string& key, + const int valueToIncrement = 1); + + static inline std::string const mpiMsgCountPrefix = "mpi-msgcount-torank-"; + + private: + std::shared_ptr linkedMsg = nullptr; + + std::map detailsMap; + + std::map intDetailsMap; + + void checkMessageLinked(const int msgId); + + void checkMessageNotLinked(); + + // ----- Wrappers to no-op the functions if not recording ----- + + std::function + doAddDetail; + + void addDetailInternal(const int msgId, + const std::string& key, + const std::string& value); + + std::function + doIncrementCounter; + + void incrementCounterInternal(const int msgId, + const std::string& key, + const int valueToIncrement); +}; + +ExecGraphDetail& getExecGraphDetail(); +} diff --git a/include/faabric/util/tracing.h b/include/faabric/util/tracing.h deleted file mode 100644 index 9d1b66c10..000000000 --- a/include/faabric/util/tracing.h +++ /dev/null @@ -1,41 +0,0 @@ -#pragma once - -#include - -#include -#include - -namespace faabric::util::tracing { -enum RecordType -{ - MpiPerRankMessageCount -}; - -class CallRecords -{ - public: - void startRecording(const faabric::Message& msg); - - void stopRecording(faabric::Message& msg); - - void addRecord(int msgId, RecordType recordType, int idToIncrement); - - private: - std::shared_ptr linkedMsg = nullptr; - - std::list onGoingRecordings; - - void checkMessageLinked(int msgId); - - void checkMessageNotLinked(); - - void loadRecordsToMessage(faabric::CallRecords& callRecords, - const RecordType& recordType); - - // ----- Per record type data structures ----- - - std::map perRankMsgCount; -}; - -CallRecords& getCallRecords(); -} diff --git a/src/scheduler/Executor.cpp b/src/scheduler/Executor.cpp index bae9a933c..97c3d1569 100644 --- a/src/scheduler/Executor.cpp +++ b/src/scheduler/Executor.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -12,7 +13,6 @@ #include #include #include -#include #define POOL_SHUTDOWN -1 @@ -228,7 +228,7 @@ void Executor::threadPoolThread(int threadPoolIdx) isThreads); // Start recording calls in non-release builds - faabric::util::tracing::getCallRecords().startRecording(msg); + faabric::util::getExecGraphDetail().startRecording(msg); int32_t returnValue; try { @@ -247,7 +247,7 @@ void Executor::threadPoolThread(int threadPoolIdx) msg.set_returnvalue(returnValue); // Stop recording calls - faabric::util::tracing::getCallRecords().stopRecording(msg); + faabric::util::getExecGraphDetail().stopRecording(msg); // Decrement the task count int oldTaskCount = task.batchCounter->fetch_sub(1); diff --git a/src/scheduler/MpiWorld.cpp b/src/scheduler/MpiWorld.cpp index e58740d2c..653060e40 100644 --- a/src/scheduler/MpiWorld.cpp +++ b/src/scheduler/MpiWorld.cpp @@ -1,12 +1,12 @@ #include #include #include +#include #include #include #include #include #include -#include // Each MPI rank runs in a separate thread, thus we use TLS to maintain the // per-rank data structures @@ -581,10 +581,10 @@ void MpiWorld::send(int sendRank, } // In non-release builds, track that we have sent this message - faabric::util::tracing::getCallRecords().addRecord( + faabric::util::getExecGraphDetail().incrementCounter( thisMsgId, - faabric::util::tracing::RecordType::MpiPerRankMessageCount, - recvRank); + faabric::util::ExecGraphDetail::mpiMsgCountPrefix + + std::to_string(recvRank)); } void MpiWorld::recv(int sendRank, diff --git a/src/util/CMakeLists.txt b/src/util/CMakeLists.txt index 7d1110cdf..d155313b4 100644 --- a/src/util/CMakeLists.txt +++ b/src/util/CMakeLists.txt @@ -7,6 +7,7 @@ faabric_lib(util crash.cpp delta.cpp environment.cpp + exec_graph.cpp files.cpp func.cpp gids.cpp @@ -21,7 +22,6 @@ faabric_lib(util snapshot.cpp state.cpp string_tools.cpp - tracing.cpp timing.cpp testing.cpp ) diff --git a/src/util/exec_graph.cpp b/src/util/exec_graph.cpp new file mode 100644 index 000000000..8bbff232e --- /dev/null +++ b/src/util/exec_graph.cpp @@ -0,0 +1,150 @@ +#include +#include +#include + +namespace faabric::util { +void ExecGraphDetail::startRecording(const faabric::Message& msg) +{ + // In the tests there's not a thread to message mapping as we sometimes + // spawn extra threads to mock work. Thus, we skip this check here. + if (faabric::util::isTestMode()) { + return; + } + + checkMessageNotLinked(); + + linkedMsg = std::make_shared(msg); + + if (!msg.recordexecgraph()) { + doAddDetail = + [](const int, const std::string&, const std::string&) -> void { ; }; + doIncrementCounter = + [](const int, const std::string&, const int) -> void { ; }; + } else { + doAddDetail = [this](const int msgId, + const std::string& key, + const std::string& value) -> void { + this->addDetailInternal(msgId, key, value); + }; + doIncrementCounter = [this](const int msgId, + const std::string& key, + const int valueToIncrement) -> void { + this->incrementCounterInternal(msgId, key, valueToIncrement); + }; + } +} + +void ExecGraphDetail::stopRecording(faabric::Message& msg) +{ + // In the tests there's not a thread to message mapping as we sometimes + // spawn extra threads to mock work. Thus, we skip this check here. + if (faabric::util::isTestMode()) { + return; + } + + checkMessageLinked(msg.id()); + + for (const auto& it : detailsMap) { + faabric::ExecGraphDetail detail; + detail.set_key(it.first); + detail.set_value(it.second); + *msg.add_execgraphdetails() = detail; + SPDLOG_TRACE("Adding exec. graph detail to message. id: {} ; {}->{}", + linkedMsg->id(), + it.first, + it.second); + } + + for (const auto& it : intDetailsMap) { + if (detailsMap.find(it.first) != detailsMap.end()) { + SPDLOG_WARN( + "Replicated key in the exec graph details: {}->{} and {}->{}", + it.first, + detailsMap.at(it.first), + it.first, + it.second); + } + + faabric::ExecGraphDetail detail; + detail.set_key(it.first); + detail.set_value(std::to_string(it.second)); + *msg.add_execgraphdetails() = detail; + SPDLOG_TRACE("Adding exec. graph detail to message. id: {} ; {}->{}", + linkedMsg->id(), + it.first, + it.second); + } + + linkedMsg = nullptr; + detailsMap.clear(); + intDetailsMap.clear(); +} + +void ExecGraphDetail::checkMessageLinked(const int msgId) +{ + if (linkedMsg == nullptr || linkedMsg->id() != msgId) { + SPDLOG_ERROR("Error during recording, records not linked to the right" + " message: (linked: {} != provided: {})", + linkedMsg == nullptr ? "nullptr" + : std::to_string(linkedMsg->id()), + msgId); + throw std::runtime_error("CallRecords linked to a different message"); + } +} + +void ExecGraphDetail::checkMessageNotLinked() +{ + if (linkedMsg != nullptr) { + SPDLOG_ERROR("Error starting recording, record already linked to" + "another message: {}", + linkedMsg->id()); + throw std::runtime_error("CallRecords linked to a different message"); + } +} + +void ExecGraphDetail::addDetail(const int msgId, + const std::string& key, + const std::string& value) +{ + doAddDetail(msgId, key, value); +} + +void ExecGraphDetail::addDetailInternal(const int msgId, + const std::string& key, + const std::string& value) +{ + if (faabric::util::isTestMode()) { + return; + } + + checkMessageLinked(msgId); + + detailsMap[key] = value; +} + +void ExecGraphDetail::incrementCounter(const int msgId, + const std::string& key, + const int valueToIncrement) +{ + doIncrementCounter(msgId, key, valueToIncrement); +} + +void ExecGraphDetail::incrementCounterInternal(const int msgId, + const std::string& key, + const int valueToIncrement) +{ + if (faabric::util::isTestMode()) { + return; + } + + checkMessageLinked(msgId); + + intDetailsMap[key] += valueToIncrement; +} + +ExecGraphDetail& getExecGraphDetail() +{ + static thread_local ExecGraphDetail graphDetail; + return graphDetail; +} +} diff --git a/src/util/func.cpp b/src/util/func.cpp index 0bef897d7..58659af18 100644 --- a/src/util/func.cpp +++ b/src/util/func.cpp @@ -103,6 +103,8 @@ faabric::Message messageFactory(const std::string& user, std::string thisHost = faabric::util::getSystemConfig().endpointHost; msg.set_masterhost(thisHost); + msg.set_recordexecgraph(false); + return msg; } diff --git a/src/util/tracing.cpp b/src/util/tracing.cpp deleted file mode 100644 index 5f8b53b32..000000000 --- a/src/util/tracing.cpp +++ /dev/null @@ -1,142 +0,0 @@ -#include -#include -#include - -namespace faabric::util::tracing { -void CallRecords::startRecording(const faabric::Message& msg) -{ -#ifndef NDEBUG - // In the tests there's not a thread to message mapping as we sometimes - // spawn extra threads to mock work. Thus, we skip this check here. - if (faabric::util::isTestMode()) { - return; - } - - checkMessageNotLinked(); - - linkedMsg = std::make_shared(msg); -#else - ; -#endif -} - -void CallRecords::stopRecording(faabric::Message& msg) -{ -#ifndef NDEBUG - // In the tests there's not a thread to message mapping as we sometimes - // spawn extra threads to mock work. Thus, we skip this check here. - if (faabric::util::isTestMode()) { - return; - } - - checkMessageLinked(msg.id()); - - linkedMsg = nullptr; - - // Update the actual faabric message - faabric::CallRecords recordsMsg; - for (const auto& recordType : onGoingRecordings) { - loadRecordsToMessage(recordsMsg, recordType); - } - - // Update the original message - *msg.mutable_records() = recordsMsg; -#else - ; -#endif -} - -void CallRecords::checkMessageLinked(int msgId) -{ - if (linkedMsg == nullptr || linkedMsg->id() != msgId) { - SPDLOG_ERROR("Error during recording, records not linked to the right" - " message: (linked: {} != provided: {})", - linkedMsg == nullptr ? "nullptr" - : std::to_string(linkedMsg->id()), - msgId); - throw std::runtime_error("CallRecords linked to a different message"); - } -} - -void CallRecords::checkMessageNotLinked() -{ - if (linkedMsg != nullptr) { - SPDLOG_ERROR("Error starting recording, record already linked to" - "another message: {}", - linkedMsg->id()); - throw std::runtime_error("CallRecords linked to a different message"); - } -} - -void CallRecords::loadRecordsToMessage(faabric::CallRecords& callRecords, - const RecordType& recordType) -{ -#ifndef NDEBUG - switch (recordType) { - case (faabric::util::tracing::RecordType::MpiPerRankMessageCount): { - faabric::MpiPerRankMessageCount msgCount; - - for (const auto& it : perRankMsgCount) { - msgCount.add_ranks(it.first); - msgCount.add_nummessages(it.second); - } - - *callRecords.mutable_mpimsgcount() = msgCount; - break; - } - default: { - SPDLOG_ERROR("Unsupported record type: {}", recordType); - throw std::runtime_error("Unsupported record type"); - } - } -#else - ; -#endif -} - -void CallRecords::addRecord(int msgId, RecordType recordType, int idToIncrement) -{ -#ifndef NDEBUG - if (faabric::util::isTestMode()) { - return; - } - - checkMessageLinked(msgId); - - // Add the record to the list of on going records if it is not there - bool mustInit = false; - auto it = - std::find(onGoingRecordings.begin(), onGoingRecordings.end(), recordType); - if (it == onGoingRecordings.end()) { - onGoingRecordings.push_back(recordType); - mustInit = true; - } - - // Finally increment the corresponding record list - switch (recordType) { - case (faabric::util::tracing::RecordType::MpiPerRankMessageCount): { - if (mustInit) { - for (int i = 0; i < linkedMsg->mpiworldsize(); i++) { - perRankMsgCount[i] = 0; - } - } - - ++perRankMsgCount.at(idToIncrement); - break; - } - default: { - SPDLOG_ERROR("Unsupported record type: {}", recordType); - throw std::runtime_error("Unsupported record type"); - } - } -#else - ; -#endif -} - -CallRecords& getCallRecords() -{ - static thread_local CallRecords callRecords; - return callRecords; -} -} diff --git a/tests/test/scheduler/test_exec_graph.cpp b/tests/test/scheduler/test_exec_graph.cpp index 755053ce6..060382f85 100644 --- a/tests/test/scheduler/test_exec_graph.cpp +++ b/tests/test/scheduler/test_exec_graph.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include using namespace scheduler; @@ -142,4 +143,86 @@ TEST_CASE_METHOD(MpiBaseTestFixture, checkExecGraphEquality(expected, actual); } + +TEST_CASE_METHOD(MpiTestFixture, + "Test tracing the number of MPI messages", + "[util][exec-graph]") +{ + // Disable test mode and set message flag to true + faabric::util::setTestMode(false); + msg.set_recordexecgraph(true); + + faabric::util::getExecGraphDetail().startRecording(msg); + + // Send one message + int rankA1 = 0; + int rankA2 = 1; + MPI_Status status{}; + + std::vector messageData = { 0, 1, 2 }; + auto buffer = new int[messageData.size()]; + + int numToSend = 10; + std::string expectedKey = + faabric::util::ExecGraphDetail::mpiMsgCountPrefix + + std::to_string(rankA2); + + for (int i = 0; i < numToSend; i++) { + world.send(rankA1, + rankA2, + BYTES(messageData.data()), + MPI_INT, + messageData.size()); + world.recv( + rankA1, rankA2, BYTES(buffer), MPI_INT, messageData.size(), &status); + } + + // Stop recording and check we have only recorded one message + faabric::util::getExecGraphDetail().stopRecording(msg); + REQUIRE(msg.execgraphdetails_size() == 1); + REQUIRE(msg.execgraphdetails(0).key() == expectedKey); + REQUIRE(msg.execgraphdetails(0).value() == std::to_string(numToSend)); + + faabric::util::setTestMode(true); +} + +TEST_CASE_METHOD(MpiTestFixture, + "Test tracing is disabled if flag in message not set", + "[util][exec-graph]") +{ + // Disable test mode and set message flag to true + faabric::util::setTestMode(false); + msg.set_recordexecgraph(false); + + faabric::util::getExecGraphDetail().startRecording(msg); + + // Send one message + int rankA1 = 0; + int rankA2 = 1; + MPI_Status status{}; + + std::vector messageData = { 0, 1, 2 }; + auto buffer = new int[messageData.size()]; + + int numToSend = 10; + std::string expectedKey = + faabric::util::ExecGraphDetail::mpiMsgCountPrefix + + std::to_string(rankA2); + + for (int i = 0; i < numToSend; i++) { + world.send(rankA1, + rankA2, + BYTES(messageData.data()), + MPI_INT, + messageData.size()); + world.recv( + rankA1, rankA2, BYTES(buffer), MPI_INT, messageData.size(), &status); + } + + // Stop recording and check we have only recorded one message + faabric::util::getExecGraphDetail().stopRecording(msg); + REQUIRE(msg.execgraphdetails_size() == 0); + + faabric::util::setTestMode(true); +} } diff --git a/tests/test/util/test_tracing.cpp b/tests/test/util/test_tracing.cpp deleted file mode 100644 index ee997dd7c..000000000 --- a/tests/test/util/test_tracing.cpp +++ /dev/null @@ -1,55 +0,0 @@ -#include - -#include "faabric_utils.h" - -#include -#include -#include -#include - -namespace tests { -TEST_CASE_METHOD(MpiTestFixture, - "Test tracing the number of MPI messages", - "[util][tracing]") -{ - // Disable test mode to test tracing - faabric::util::setTestMode(false); - - faabric::util::tracing::getCallRecords().startRecording(msg); - - // Send one message - int rankA1 = 0; - int rankA2 = 1; - MPI_Status status{}; - - std::vector messageData = { 0, 1, 2 }; - auto buffer = new int[messageData.size()]; - - int numToSend = 10; - - for (int i = 0; i < numToSend; i++) { - world.send(rankA1, - rankA2, - BYTES(messageData.data()), - MPI_INT, - messageData.size()); - world.recv( - rankA1, rankA2, BYTES(buffer), MPI_INT, messageData.size(), &status); - } - - // Stop recording and check we have only recorded one message - faabric::util::tracing::getCallRecords().stopRecording(msg); - REQUIRE(msg.has_records()); - REQUIRE(msg.records().has_mpimsgcount()); - REQUIRE(msg.records().mpimsgcount().ranks_size() == worldSize); - for (int i = 0; i < worldSize; i++) { - if (i == rankA2) { - REQUIRE(msg.records().mpimsgcount().nummessages(i) == numToSend); - } else { - REQUIRE(msg.records().mpimsgcount().nummessages(i) == 0); - } - } - - faabric::util::setTestMode(true); -} -}