From d96e900cae61f2d8f0764cb33b990466f596cefe Mon Sep 17 00:00:00 2001 From: Chris Jones Date: Tue, 21 Sep 2021 15:17:14 -0500 Subject: [PATCH 1/8] Protect storage accounting UDP messages from NaN NaN's were being reported from the values computed using sqrt. This most likely was from the different variables not being updated atomically together. --- .../src/StatisticsSenderService.cc | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/Utilities/StorageFactory/src/StatisticsSenderService.cc b/Utilities/StorageFactory/src/StatisticsSenderService.cc index b627a8a5067b0..660b8fd43bc22 100644 --- a/Utilities/StorageFactory/src/StatisticsSenderService.cc +++ b/Utilities/StorageFactory/src/StatisticsSenderService.cc @@ -77,9 +77,9 @@ void StatisticsSenderService::FileStatistics::fillUDP(std::ostringstream &os) { double single_sum = read_single_bytes - m_read_single_bytes; double single_average = single_sum / static_cast(single_op_count); os << "\"read_single_sigma\":" - << sqrt((static_cast(read_single_square - m_read_single_square) - - single_average * single_average * single_op_count) / - static_cast(single_op_count)) + << sqrt(std::abs((static_cast(read_single_square - m_read_single_square) - + single_average * single_average * single_op_count) / + static_cast(single_op_count))) << ", "; os << "\"read_single_average\":" << single_average << ", "; } @@ -90,17 +90,17 @@ void StatisticsSenderService::FileStatistics::fillUDP(std::ostringstream &os) { static_cast(read_vector_bytes - m_read_vector_bytes) / static_cast(vector_op_count); os << "\"read_vector_average\":" << vector_average << ", "; os << "\"read_vector_sigma\":" - << sqrt((static_cast(read_vector_square - m_read_vector_square) - - vector_average * vector_average * vector_op_count) / - static_cast(vector_op_count)) + << sqrt(std::abs((static_cast(read_vector_square - m_read_vector_square) - + vector_average * vector_average * vector_op_count) / + static_cast(vector_op_count))) << 
", "; double vector_count_average = static_cast(read_vector_count_sum - m_read_vector_count_sum) / static_cast(vector_op_count); os << "\"read_vector_count_average\":" << vector_count_average << ", "; os << "\"read_vector_count_sigma\":" - << sqrt((static_cast(read_vector_count_square - m_read_vector_count_square) - - vector_count_average * vector_count_average * vector_op_count) / - static_cast(vector_op_count)) + << sqrt(std::abs((static_cast(read_vector_count_square - m_read_vector_count_square) - + vector_count_average * vector_count_average * vector_op_count) / + static_cast(vector_op_count))) << ", "; } m_read_vector_square = read_vector_square; From 751e6a0dd15d9ed30cb80851595aa572af264c80 Mon Sep 17 00:00:00 2001 From: Chris Jones Date: Thu, 30 Sep 2021 11:18:22 -0500 Subject: [PATCH 2/8] Report only once when trying to open file Previously, each try to open the file using a different PFN would report an open attempt for the same LFN. This meant we could have multiple opens but only one close for a given LFN. --- IOPool/Input/src/RootInputFileSequence.cc | 89 +++++++++++------------ 1 file changed, 44 insertions(+), 45 deletions(-) diff --git a/IOPool/Input/src/RootInputFileSequence.cc b/IOPool/Input/src/RootInputFileSequence.cc index 4cd78e40909cd..6520a4e4ddb6e 100644 --- a/IOPool/Input/src/RootInputFileSequence.cc +++ b/IOPool/Input/src/RootInputFileSequence.cc @@ -224,58 +224,57 @@ namespace edm { std::shared_ptr filePtr; std::list originalInfo; - try { + { std::unique_ptr sentry( - input ? 
std::make_unique(*input, lfn_, usedFallback_) : nullptr); - std::unique_ptr name(gSystem->ExpandPathName(fileName().c_str())); - ; - filePtr = std::make_shared(name.get(), " Initiating request to open file ", inputType); - } catch (cms::Exception const& e) { - if (!skipBadFiles) { - if (hasFallbackUrl) { - std::ostringstream out; - out << e.explainSelf(); - - std::unique_ptr name(gSystem->ExpandPathName(fallbackFileName().c_str())); - std::string pfn(name.get()); - InputFile::reportFallbackAttempt(pfn, logicalFileName(), out.str()); - originalInfo = e.additionalInfo(); - } else { - InputFile::reportSkippedFile(fileName(), logicalFileName()); - Exception ex(errors::FileOpenError, "", e); - ex.addContext("Calling RootFileSequenceBase::initTheFile()"); - std::ostringstream out; - out << "Input file " << fileName() << " could not be opened."; - ex.addAdditionalInfo(out.str()); - throw ex; - } - } - } - if (!filePtr && (hasFallbackUrl)) { + input ? std::make_unique(*input, lfn_, false) : nullptr); try { - usedFallback_ = true; - std::unique_ptr sentry( - input ? 
std::make_unique(*input, lfn_, usedFallback_) : nullptr); - std::unique_ptr fallbackFullName(gSystem->ExpandPathName(fallbackFileName().c_str())); - filePtr.reset(new InputFile(fallbackFullName.get(), " Fallback request to file ", inputType)); + std::unique_ptr name(gSystem->ExpandPathName(fileName().c_str())); + filePtr = std::make_shared(name.get(), " Initiating request to open file ", inputType); } catch (cms::Exception const& e) { if (!skipBadFiles) { - InputFile::reportSkippedFile(fileName(), logicalFileName()); - Exception ex(errors::FallbackFileOpenError, "", e); - ex.addContext("Calling RootFileSequenceBase::initTheFile()"); - std::ostringstream out; - out << "Input file " << fileName() << " could not be opened.\n"; - out << "Fallback Input file " << fallbackFileName() << " also could not be opened."; - if (!originalInfo.empty()) { - out << std::endl << "Original exception info is above; fallback exception info is below."; - ex.addAdditionalInfo(out.str()); - for (auto const& s : originalInfo) { - ex.addAdditionalInfo(s); - } + if (hasFallbackUrl) { + std::ostringstream out; + out << e.explainSelf(); + + std::unique_ptr name(gSystem->ExpandPathName(fallbackFileName().c_str())); + std::string pfn(name.get()); + InputFile::reportFallbackAttempt(pfn, logicalFileName(), out.str()); + originalInfo = e.additionalInfo(); } else { + InputFile::reportSkippedFile(fileName(), logicalFileName()); + Exception ex(errors::FileOpenError, "", e); + ex.addContext("Calling RootFileSequenceBase::initTheFile()"); + std::ostringstream out; + out << "Input file " << fileName() << " could not be opened."; ex.addAdditionalInfo(out.str()); + throw ex; + } + } + } + if (!filePtr && (hasFallbackUrl)) { + try { + usedFallback_ = true; + std::unique_ptr fallbackFullName(gSystem->ExpandPathName(fallbackFileName().c_str())); + filePtr.reset(new InputFile(fallbackFullName.get(), " Fallback request to file ", inputType)); + } catch (cms::Exception const& e) { + if (!skipBadFiles) { + 
InputFile::reportSkippedFile(fileName(), logicalFileName()); + Exception ex(errors::FallbackFileOpenError, "", e); + ex.addContext("Calling RootFileSequenceBase::initTheFile()"); + std::ostringstream out; + out << "Input file " << fileName() << " could not be opened.\n"; + out << "Fallback Input file " << fallbackFileName() << " also could not be opened."; + if (!originalInfo.empty()) { + out << std::endl << "Original exception info is above; fallback exception info is below."; + ex.addAdditionalInfo(out.str()); + for (auto const& s : originalInfo) { + ex.addAdditionalInfo(s); + } + } else { + ex.addAdditionalInfo(out.str()); + } + throw ex; } - throw ex; } } } From f9ef3a687f1481e6a8a474aaafc2b41e6a4c5302 Mon Sep 17 00:00:00 2001 From: Chris Jones Date: Thu, 30 Sep 2021 11:25:22 -0500 Subject: [PATCH 3/8] StatisticsSenderService now requires LFN or URL When sending information to the StatisticsSenderService, the file LFN or URL must be supplied. --- .../TFileAdaptor/src/TStorageFactoryFile.cc | 2 +- .../interface/StatisticsSenderService.h | 43 ++-- .../src/StatisticsSenderService.cc | 184 ++++++++++++++---- Utilities/XrdAdaptor/src/XrdRequestManager.cc | 2 +- 4 files changed, 178 insertions(+), 53 deletions(-) diff --git a/IOPool/TFileAdaptor/src/TStorageFactoryFile.cc b/IOPool/TFileAdaptor/src/TStorageFactoryFile.cc index 5a8622d75d755..405453c42b876 100644 --- a/IOPool/TFileAdaptor/src/TStorageFactoryFile.cc +++ b/IOPool/TFileAdaptor/src/TStorageFactoryFile.cc @@ -195,7 +195,7 @@ void TStorageFactoryFile::Initialize(const char *path, Option_t *option /* = "" try { edm::Service statsService; if (statsService.isAvailable()) { - statsService->setSize(storage_->size()); + statsService->setSize(path, storage_->size()); } } catch (edm::Exception const &e) { if (e.categoryCode() != edm::errors::NotFound) { diff --git a/Utilities/StorageFactory/interface/StatisticsSenderService.h b/Utilities/StorageFactory/interface/StatisticsSenderService.h index 
24651dcc9f9c2..940defc861283 100644 --- a/Utilities/StorageFactory/interface/StatisticsSenderService.h +++ b/Utilities/StorageFactory/interface/StatisticsSenderService.h @@ -6,6 +6,7 @@ #include #include #include +#include namespace edm { @@ -16,19 +17,24 @@ namespace edm { class StatisticsSenderService { public: - StatisticsSenderService(edm::ParameterSet const &pset, edm::ActivityRegistry &ar); + StatisticsSenderService(edm::ParameterSet const& pset, edm::ActivityRegistry& ar); - void setSize(size_t size); - void setCurrentServer(const std::string &servername); - void filePreCloseEvent(std::string const &lfn, bool usedFallback); - static const char *getJobID(); - static bool getX509Subject(std::string &); + void setSize(const std::string& urlOrLfn, size_t size); + void setCurrentServer(const std::string& urlOrLfn, const std::string& servername); + static const char* getJobID(); + static bool getX509Subject(std::string&); + + void openingFile(std::string const& lfn, size_t size = -1); + void closedFile(std::string const& lfn, bool usedFallback); private: + void filePreCloseEvent(std::string const& lfn, bool usedFallback); + + std::string const* matchedLfn(std::string const& iURL); //updates its internal cache class FileStatistics { public: FileStatistics(); - void fillUDP(std::ostringstream &os); + void fillUDP(std::ostringstream& os); private: ssize_t m_read_single_operations; @@ -42,19 +48,30 @@ namespace edm { time_t m_start_time; }; - void determineHostnames(void); - void fillUDP(const std::string &, bool, std::string &); + struct FileInfo { + explicit FileInfo(std::string const& iLFN); + std::string m_filelfn; + std::string m_serverhost; + std::string m_serverdomain; + ssize_t m_size; + size_t m_id; //from m_counter + std::atomic m_openCount; + }; + + void determineHostnames(); + void fillUDP(const std::string& site, const FileInfo& fileinfo, bool, std::string&); + void cleanupOldFiles(); + std::string m_clienthost; std::string m_clientdomain; - std::string 
m_serverhost; - std::string m_serverdomain; - std::string m_filelfn; + tbb::concurrent_unordered_map m_lfnToFileInfo; + tbb::concurrent_unordered_map m_urlToLfn; FileStatistics m_filestats; std::string m_guid; size_t m_counter; - std::atomic m_size; std::string m_userdn; std::mutex m_servermutex; + const bool m_debug; }; } // namespace storage diff --git a/Utilities/StorageFactory/src/StatisticsSenderService.cc b/Utilities/StorageFactory/src/StatisticsSenderService.cc index 660b8fd43bc22..194eac87c6b6c 100644 --- a/Utilities/StorageFactory/src/StatisticsSenderService.cc +++ b/Utilities/StorageFactory/src/StatisticsSenderService.cc @@ -4,6 +4,7 @@ #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h" #include "FWCore/Catalog/interface/SiteLocalConfig.h" #include "FWCore/ServiceRegistry/interface/Service.h" +#include "FWCore/MessageLogger/interface/MessageLogger.h" #include "FWCore/Utilities/src/Guid.h" #include @@ -124,19 +125,20 @@ void StatisticsSenderService::FileStatistics::fillUDP(std::ostringstream &os) { os << "\"end_time\":" << m_start_time; } -StatisticsSenderService::StatisticsSenderService(edm::ParameterSet const & /*pset*/, edm::ActivityRegistry &ar) +StatisticsSenderService::FileInfo::FileInfo(std::string const &iLFN) + : m_filelfn(iLFN), m_serverhost("unknown"), m_serverdomain("unknown"), m_size(-1), m_id(0), m_openCount(1) {} + +StatisticsSenderService::StatisticsSenderService(edm::ParameterSet const &iPSet, edm::ActivityRegistry &ar) : m_clienthost("unknown"), m_clientdomain("unknown"), - m_serverhost("unknown"), - m_serverdomain("unknown"), - m_filelfn("unknown"), m_filestats(), m_guid(Guid().toString()), m_counter(0), - m_size(-1), - m_userdn("unknown") { + m_userdn("unknown"), + m_debug(iPSet.getUntrackedParameter("debug", false)) { determineHostnames(); ar.watchPreCloseFile(this, &StatisticsSenderService::filePreCloseEvent); + ar.watchPreOpenFile([this](auto const &iLFN, bool) { openingFile(iLFN, -1); }); if (!getX509Subject(m_userdn)) { 
m_userdn = "unknown"; } @@ -148,8 +150,36 @@ const char *StatisticsSenderService::getJobID() { return id ? id : std::getenv(JOB_UNIQUE_ID_ENV_V2); } -void StatisticsSenderService::setCurrentServer(const std::string &servername) { - size_t dot_pos = servername.find("."); +std::string const *StatisticsSenderService::matchedLfn(std::string const &iURL) { + auto found = m_urlToLfn.find(iURL); + if (found != m_urlToLfn.end()) { + return &found->second; + } + for (auto const &v : m_lfnToFileInfo) { + if (v.first.size() < iURL.size()) { + if (v.first == iURL.substr(iURL.size() - v.first.size())) { + m_urlToLfn.emplace(iURL, v.first); + return &m_urlToLfn.find(iURL)->second; + } + } + } + //does the lfn have a protocol and the iURL not? + if (std::string::npos == iURL.find(':')) { + for (auto const &v : m_lfnToFileInfo) { + if ((std::string::npos != v.first.find(':')) and (v.first.size() > iURL.size())) { + if (iURL == v.first.substr(v.first.size() - iURL.size())) { + m_urlToLfn.emplace(iURL, v.first); + return &m_urlToLfn.find(iURL)->second; + } + } + } + } + + return nullptr; +} + +void StatisticsSenderService::setCurrentServer(const std::string &url, const std::string &servername) { + size_t dot_pos = servername.find('.'); std::string serverhost; std::string serverdomain; if (dot_pos == std::string::npos) { @@ -163,24 +193,41 @@ void StatisticsSenderService::setCurrentServer(const std::string &servername) { } } { + auto lfn = matchedLfn(url); std::lock_guard sentry(m_servermutex); - m_serverhost = std::move(serverhost); - m_serverdomain = std::move(serverdomain); + if (nullptr != lfn) { + auto found = m_lfnToFileInfo.find(*lfn); + if (found != m_lfnToFileInfo.end()) { + found->second.m_serverhost = std::move(serverhost); + found->second.m_serverdomain = std::move(serverdomain); + } + } else if (m_debug) { + edm::LogWarning("StatisticsSenderService") << "setCurrentServer: unknown url name " << url << "\n"; + } } } -void StatisticsSenderService::setSize(size_t size) { 
m_size = size; } - -void StatisticsSenderService::filePreCloseEvent(std::string const &lfn, bool usedFallback) { - m_filelfn = lfn; +void StatisticsSenderService::openingFile(std::string const &lfn, size_t size) { + m_urlToLfn.emplace(lfn, lfn); + auto attempt = m_lfnToFileInfo.emplace(lfn, lfn); + if (attempt.second) { + attempt.first->second.m_size = size; + attempt.first->second.m_id = m_counter++; + edm::LogInfo("StatisticsSenderService") << "openingFile: opening " << lfn << "\n"; + } else { + ++(attempt.first->second.m_openCount); + edm::LogInfo("StatisticsSenderService") << "openingFile: re-opening" << lfn << "\n"; + } +} +void StatisticsSenderService::closedFile(std::string const &url, bool usedFallback) { edm::Service pSLC; if (!pSLC.isAvailable()) { return; } const struct addrinfo *addresses = pSLC->statisticsDestination(); - if (!addresses) { + if (!addresses and !m_debug) { return; } @@ -190,22 +237,85 @@ void StatisticsSenderService::filePreCloseEvent(std::string const &lfn, bool use m_userdn = "not reported"; } - std::string results; - fillUDP(pSLC->siteName(), usedFallback, results); + auto lfn = matchedLfn(url); + if (nullptr != lfn) { + auto found = m_lfnToFileInfo.find(*lfn); + assert(found != m_lfnToFileInfo.end()); - for (const struct addrinfo *address = addresses; address != nullptr; address = address->ai_next) { - int sock = socket(address->ai_family, address->ai_socktype, address->ai_protocol); - if (sock < 0) { - continue; + std::string results; + fillUDP(pSLC->siteName(), found->second, usedFallback, results); + if (m_debug) { + edm::LogSystem("StatisticSenderService") << "\n" << results << "\n"; } - auto close_del = [](int *iSocket) { close(*iSocket); }; - std::unique_ptr guard(&sock, close_del); - if (sendto(sock, results.c_str(), results.size(), 0, address->ai_addr, address->ai_addrlen) >= 0) { - break; + + for (const struct addrinfo *address = addresses; address != nullptr; address = address->ai_next) { + int sock = 
socket(address->ai_family, address->ai_socktype, address->ai_protocol); + if (sock < 0) { + continue; + } + auto close_del = [](int *iSocket) { close(*iSocket); }; + std::unique_ptr guard(&sock, close_del); + if (sendto(sock, results.c_str(), results.size(), 0, address->ai_addr, address->ai_addrlen) >= 0) { + break; + } + } + + auto c = --found->second.m_openCount; + if (c == 0) { + edm::LogWarning("StatisticsSenderService") << "fully closed: " << *lfn << "\n"; + } else { + edm::LogWarning("StatisticsSenderService") << "partially closed: " << *lfn << "\n"; + } + } else if (m_debug) { + edm::LogWarning("StatisticsSenderService") << "closed: unknown url name " << url << "\n"; + } +} + +void StatisticsSenderService::cleanupOldFiles() { + //remove entries with openCount of 0 + bool moreToTest = false; + do { + moreToTest = false; + for (auto it = m_lfnToFileInfo.begin(); it != m_lfnToFileInfo.end(); ++it) { + if (it->second.m_openCount == 0) { + auto lfn = it->first; + bool moreToTest2 = false; + do { + moreToTest2 = false; + for (auto it2 = m_urlToLfn.begin(); it2 != m_urlToLfn.end(); ++it2) { + if (it2->second == lfn) { + m_urlToLfn.unsafe_erase(it2); + moreToTest2 = true; + break; + } + } + } while (moreToTest2); + + m_lfnToFileInfo.unsafe_erase(it); + moreToTest = true; + break; + } } + } while (moreToTest); +} + +void StatisticsSenderService::setSize(const std::string &url, size_t size) { + auto lfn = matchedLfn(url); + if (nullptr != lfn) { + auto itFound = m_lfnToFileInfo.find(*lfn); + if (itFound != m_lfnToFileInfo.end()) { + //do I need to synchronize? 
+ itFound->second.m_size = size; + } + } else if (m_debug) { + edm::LogWarning("StatisticsSenderService") << "setSize: unknown url name " << url << "\n"; } +} - m_counter++; +void StatisticsSenderService::filePreCloseEvent(std::string const &lfn, bool usedFallback) { + closedFile(lfn, usedFallback); + //we are at a sync point in the framwework so no new files are being opened + cleanupOldFiles(); } void StatisticsSenderService::determineHostnames(void) { @@ -225,7 +335,10 @@ void StatisticsSenderService::determineHostnames(void) { } } -void StatisticsSenderService::fillUDP(const std::string &siteName, bool usedFallback, std::string &udpinfo) { +void StatisticsSenderService::fillUDP(const std::string &siteName, + const FileInfo &fileinfo, + bool usedFallback, + std::string &udpinfo) { std::ostringstream os; // Header - same for all IO accesses @@ -236,21 +349,16 @@ void StatisticsSenderService::fillUDP(const std::string &siteName, bool usedFall if (usedFallback) { os << "\"fallback\": true, "; } - std::string serverhost; - std::string serverdomain; - { - std::lock_guard sentry(m_servermutex); - serverhost = m_serverhost; - serverdomain = m_serverdomain; - } + auto serverhost = fileinfo.m_serverhost; + auto serverdomain = fileinfo.m_serverdomain; os << "\"user_dn\":\"" << m_userdn << "\", "; os << "\"client_host\":\"" << m_clienthost << "\", "; os << "\"client_domain\":\"" << m_clientdomain << "\", "; os << "\"server_host\":\"" << serverhost << "\", "; os << "\"server_domain\":\"" << serverdomain << "\", "; - os << "\"unique_id\":\"" << m_guid << "-" << m_counter << "\", "; - os << "\"file_lfn\":\"" << m_filelfn << "\", "; + os << "\"unique_id\":\"" << m_guid << "-" << fileinfo.m_id << "\", "; + os << "\"file_lfn\":\"" << fileinfo.m_filelfn << "\", "; // Dashboard devs requested that we send out no app_info if a job ID // is not present in the environment. 
const char *jobId = getJobID(); @@ -258,8 +366,8 @@ void StatisticsSenderService::fillUDP(const std::string &siteName, bool usedFall os << "\"app_info\":\"" << jobId << "\", "; } - if (m_size >= 0) { - os << "\"file_size\":" << m_size << ", "; + if (fileinfo.m_size >= 0) { + os << "\"file_size\":" << fileinfo.m_size << ", "; } m_filestats.fillUDP(os); diff --git a/Utilities/XrdAdaptor/src/XrdRequestManager.cc b/Utilities/XrdAdaptor/src/XrdRequestManager.cc index 61b1751eabda0..d25b67db750fa 100644 --- a/Utilities/XrdAdaptor/src/XrdRequestManager.cc +++ b/Utilities/XrdAdaptor/src/XrdRequestManager.cc @@ -243,7 +243,7 @@ void RequestManager::updateCurrentServer() { std::unique_ptr hostname(hostname_ptr); edm::Service statsService; if (statsService.isAvailable()) { - statsService->setCurrentServer(*hostname_ptr); + statsService->setCurrentServer(m_name, *hostname_ptr); } } } From 34043820d37b43b3e4b128321decd4c1fe1f1c9b Mon Sep 17 00:00:00 2001 From: Chris Jones Date: Thu, 30 Sep 2021 16:03:08 -0500 Subject: [PATCH 4/8] Report all framework files to StatisticsSenderService Send statistics for primary, secondary, and embedded files. The aggregate file statistics are only reset on primary file close boundaries to keep the behavior the same as before. 
Changed all calls to closeFile_() to be the new closeFile() --- IOPool/Input/src/EmbeddedRootSource.cc | 2 +- IOPool/Input/src/PoolSource.cc | 2 +- IOPool/Input/src/RootEmbeddedFileSequence.cc | 2 +- IOPool/Input/src/RootEmbeddedFileSequence.h | 2 +- IOPool/Input/src/RootInputFileSequence.cc | 16 +++- IOPool/Input/src/RootInputFileSequence.h | 2 + IOPool/Input/src/RootPrimaryFileSequence.cc | 4 +- IOPool/Input/src/RootPrimaryFileSequence.h | 2 +- IOPool/Input/src/RootSecondaryFileSequence.cc | 2 +- IOPool/Input/src/RootSecondaryFileSequence.h | 2 +- .../interface/StatisticsSenderService.h | 9 ++- .../src/StatisticsSenderService.cc | 77 +++++++++++++------ 12 files changed, 84 insertions(+), 38 deletions(-) diff --git a/IOPool/Input/src/EmbeddedRootSource.cc b/IOPool/Input/src/EmbeddedRootSource.cc index bc9bf251666f1..f71854d29ad2f 100644 --- a/IOPool/Input/src/EmbeddedRootSource.cc +++ b/IOPool/Input/src/EmbeddedRootSource.cc @@ -45,7 +45,7 @@ namespace edm { InputFile::reportReadBranches(); } - void EmbeddedRootSource::closeFile_() { fileSequence_->closeFile_(); } + void EmbeddedRootSource::closeFile_() { fileSequence_->closeFile(); } bool EmbeddedRootSource::readOneEvent(EventPrincipal& cache, size_t& fileNameHash, diff --git a/IOPool/Input/src/PoolSource.cc b/IOPool/Input/src/PoolSource.cc index db20150593168..03d5772e82aed 100644 --- a/IOPool/Input/src/PoolSource.cc +++ b/IOPool/Input/src/PoolSource.cc @@ -162,7 +162,7 @@ namespace edm { return fb; } - void PoolSource::closeFile_() { primaryFileSequence_->closeFile_(); } + void PoolSource::closeFile_() { primaryFileSequence_->closeFile(); } std::shared_ptr PoolSource::readRunAuxiliary_() { return primaryFileSequence_->readRunAuxiliary_(); } diff --git a/IOPool/Input/src/RootEmbeddedFileSequence.cc b/IOPool/Input/src/RootEmbeddedFileSequence.cc index e3c9217158480..b5fce11b6dfe0 100644 --- a/IOPool/Input/src/RootEmbeddedFileSequence.cc +++ b/IOPool/Input/src/RootEmbeddedFileSequence.cc @@ -108,7 +108,7 @@ 
namespace edm { RootEmbeddedFileSequence::~RootEmbeddedFileSequence() {} - void RootEmbeddedFileSequence::endJob() { closeFile_(); } + void RootEmbeddedFileSequence::endJob() { closeFile(); } void RootEmbeddedFileSequence::closeFile_() { // delete the RootFile object. diff --git a/IOPool/Input/src/RootEmbeddedFileSequence.h b/IOPool/Input/src/RootEmbeddedFileSequence.h index 2480ef8e19d43..1c95097cd8468 100644 --- a/IOPool/Input/src/RootEmbeddedFileSequence.h +++ b/IOPool/Input/src/RootEmbeddedFileSequence.h @@ -39,7 +39,6 @@ namespace edm { RootEmbeddedFileSequence(RootEmbeddedFileSequence const&) = delete; // Disallow copying and moving RootEmbeddedFileSequence& operator=(RootEmbeddedFileSequence const&) = delete; // Disallow copying and moving - void closeFile_() override; void endJob(); void skipEntries(unsigned int offset); bool readOneEvent( @@ -56,6 +55,7 @@ namespace edm { static void fillDescription(ParameterSetDescription& desc); private: + void closeFile_() override; void initFile_(bool skipBadFiles) override; RootFileSharedPtr makeRootFile(std::shared_ptr filePtr) override; diff --git a/IOPool/Input/src/RootInputFileSequence.cc b/IOPool/Input/src/RootInputFileSequence.cc index 6520a4e4ddb6e..2666db1602afd 100644 --- a/IOPool/Input/src/RootInputFileSequence.cc +++ b/IOPool/Input/src/RootInputFileSequence.cc @@ -10,6 +10,8 @@ #include "FWCore/ParameterSet/interface/ParameterSet.h" #include "FWCore/ParameterSet/interface/ParameterSetDescription.h" #include "Utilities/StorageFactory/interface/StorageFactory.h" +#include "Utilities/StorageFactory/interface/StatisticsSenderService.h" +#include "FWCore/ServiceRegistry/interface/Service.h" #include "TSystem.h" @@ -192,7 +194,7 @@ namespace edm { } fileIterLastOpened_ = fileIterEnd_; } - closeFile_(); + closeFile(); if (noMoreFiles()) { // No files specified @@ -227,6 +229,10 @@ namespace edm { { std::unique_ptr sentry( input ? 
std::make_unique(*input, lfn_, false) : nullptr); + edm::Service service; + if (service.isAvailable()) { + service->openingFile(lfn(), -1); + } try { std::unique_ptr name(gSystem->ExpandPathName(fileName().c_str())); filePtr = std::make_shared(name.get(), " Initiating request to open file ", inputType); @@ -298,6 +304,14 @@ namespace edm { } } + void RootInputFileSequence::closeFile() { + edm::Service service; + if (rootFile() and service.isAvailable()) { + service->closedFile(lfn(), usedFallback()); + } + closeFile_(); + } + void RootInputFileSequence::setIndexIntoFile(size_t index) { indexesIntoFiles_[index] = rootFile()->indexIntoFileSharedPtr(); } diff --git a/IOPool/Input/src/RootInputFileSequence.h b/IOPool/Input/src/RootInputFileSequence.h index bf91576e3a4fb..ae132f6971eea 100644 --- a/IOPool/Input/src/RootInputFileSequence.h +++ b/IOPool/Input/src/RootInputFileSequence.h @@ -48,6 +48,8 @@ namespace edm { std::shared_ptr fileProductRegistry() const; std::shared_ptr fileBranchIDListHelper() const; + void closeFile(); + protected: typedef std::shared_ptr RootFileSharedPtr; void initFile(bool skipBadFiles) { initFile_(skipBadFiles); } diff --git a/IOPool/Input/src/RootPrimaryFileSequence.cc b/IOPool/Input/src/RootPrimaryFileSequence.cc index 48ecd0115df87..7492316373ee6 100644 --- a/IOPool/Input/src/RootPrimaryFileSequence.cc +++ b/IOPool/Input/src/RootPrimaryFileSequence.cc @@ -68,7 +68,7 @@ namespace edm { RootPrimaryFileSequence::~RootPrimaryFileSequence() {} - void RootPrimaryFileSequence::endJob() { closeFile_(); } + void RootPrimaryFileSequence::endJob() { closeFile(); } std::unique_ptr RootPrimaryFileSequence::readFile_() { if (firstFile_) { @@ -212,7 +212,7 @@ namespace edm { // Rewind to before the first event that was read. 
void RootPrimaryFileSequence::rewind_() { if (!atFirstFile()) { - closeFile_(); + closeFile(); setAtFirstFile(); } if (!rootFile()) { diff --git a/IOPool/Input/src/RootPrimaryFileSequence.h b/IOPool/Input/src/RootPrimaryFileSequence.h index 903e1001c82f5..f2390be10df64 100644 --- a/IOPool/Input/src/RootPrimaryFileSequence.h +++ b/IOPool/Input/src/RootPrimaryFileSequence.h @@ -40,7 +40,6 @@ namespace edm { RootPrimaryFileSequence& operator=(RootPrimaryFileSequence const&) = delete; // Disallow copying and moving std::unique_ptr readFile_(); - void closeFile_() override; void endJob(); InputSource::ItemType getNextItemType(RunNumber_t& run, LuminosityBlockNumber_t& lumi, EventNumber_t& event); bool skipEvents(int offset); @@ -56,6 +55,7 @@ namespace edm { bool nextFile(); bool previousFile(); void rewindFile(); + void closeFile_() override; int remainingEvents() const; int remainingLuminosityBlocks() const; diff --git a/IOPool/Input/src/RootSecondaryFileSequence.cc b/IOPool/Input/src/RootSecondaryFileSequence.cc index e0f14147c48c0..9d70a6c787199 100644 --- a/IOPool/Input/src/RootSecondaryFileSequence.cc +++ b/IOPool/Input/src/RootSecondaryFileSequence.cc @@ -52,7 +52,7 @@ namespace edm { RootSecondaryFileSequence::~RootSecondaryFileSequence() {} - void RootSecondaryFileSequence::endJob() { closeFile_(); } + void RootSecondaryFileSequence::endJob() { closeFile(); } void RootSecondaryFileSequence::closeFile_() { // close the currently open file, if any, and delete the RootFile object. 
diff --git a/IOPool/Input/src/RootSecondaryFileSequence.h b/IOPool/Input/src/RootSecondaryFileSequence.h index 6ff9be7c5f884..5ad6e3516f4a5 100644 --- a/IOPool/Input/src/RootSecondaryFileSequence.h +++ b/IOPool/Input/src/RootSecondaryFileSequence.h @@ -33,11 +33,11 @@ namespace edm { RootSecondaryFileSequence(RootSecondaryFileSequence const&) = delete; // Disallow copying and moving RootSecondaryFileSequence& operator=(RootSecondaryFileSequence const&) = delete; // Disallow copying and moving - void closeFile_() override; void endJob(); void initAssociationsFromSecondary(std::set const&); private: + void closeFile_() override; void initFile_(bool skipBadFiles) override; RootFileSharedPtr makeRootFile(std::shared_ptr filePtr) override; diff --git a/Utilities/StorageFactory/interface/StatisticsSenderService.h b/Utilities/StorageFactory/interface/StatisticsSenderService.h index 940defc861283..61bfd55517136 100644 --- a/Utilities/StorageFactory/interface/StatisticsSenderService.h +++ b/Utilities/StorageFactory/interface/StatisticsSenderService.h @@ -28,13 +28,14 @@ namespace edm { void closedFile(std::string const& lfn, bool usedFallback); private: - void filePreCloseEvent(std::string const& lfn, bool usedFallback); + void filePostCloseEvent(std::string const& lfn, bool usedFallback); std::string const* matchedLfn(std::string const& iURL); //updates its internal cache class FileStatistics { public: FileStatistics(); - void fillUDP(std::ostringstream& os); + void fillUDP(std::ostringstream& os) const; + void update(); private: ssize_t m_read_single_operations; @@ -53,13 +54,13 @@ namespace edm { std::string m_filelfn; std::string m_serverhost; std::string m_serverdomain; - ssize_t m_size; + std::atomic m_size; size_t m_id; //from m_counter std::atomic m_openCount; }; void determineHostnames(); - void fillUDP(const std::string& site, const FileInfo& fileinfo, bool, std::string&); + void fillUDP(const std::string& site, const FileInfo& fileinfo, bool, std::string&) const; 
void cleanupOldFiles(); std::string m_clienthost; diff --git a/Utilities/StorageFactory/src/StatisticsSenderService.cc b/Utilities/StorageFactory/src/StatisticsSenderService.cc index 194eac87c6b6c..eb5401431d815 100644 --- a/Utilities/StorageFactory/src/StatisticsSenderService.cc +++ b/Utilities/StorageFactory/src/StatisticsSenderService.cc @@ -16,11 +16,7 @@ #include #include -#define UPDATE_STATISTIC(x) m_##x = x; - -#define UPDATE_AND_OUTPUT_STATISTIC(x) \ - os << "\"" #x "\":" << (x - m_##x) << ", "; \ - UPDATE_STATISTIC(x) +#define OUTPUT_STATISTIC(x) os << "\"" #x "\":" << (x - m_##x) << ", "; // Simple hack to define HOST_NAME_MAX on Mac. // Allows arrays to be statically allocated @@ -28,8 +24,8 @@ #define HOST_NAME_MAX 128 #endif -#define JOB_UNIQUE_ID_ENV "CRAB_UNIQUE_JOB_ID" -#define JOB_UNIQUE_ID_ENV_V2 "DashboardJobId" +static constexpr char const *const JOB_UNIQUE_ID_ENV = "CRAB_UNIQUE_JOB_ID"; +static constexpr char const *const JOB_UNIQUE_ID_ENV_V2 = "DashboardJobId"; using namespace edm::storage; @@ -44,7 +40,7 @@ StatisticsSenderService::FileStatistics::FileStatistics() m_read_vector_count_square(0), m_start_time(time(nullptr)) {} -void StatisticsSenderService::FileStatistics::fillUDP(std::ostringstream &os) { +void StatisticsSenderService::FileStatistics::fillUDP(std::ostringstream &os) const { const StorageAccount::StorageStats &stats = StorageAccount::summary(); ssize_t read_single_operations = 0; ssize_t read_single_bytes = 0; @@ -84,7 +80,6 @@ void StatisticsSenderService::FileStatistics::fillUDP(std::ostringstream &os) { << ", "; os << "\"read_single_average\":" << single_average << ", "; } - m_read_single_square = read_single_square; int64_t vector_op_count = read_vector_operations - m_read_vector_operations; if (vector_op_count > 0) { double vector_average = @@ -104,9 +99,6 @@ void StatisticsSenderService::FileStatistics::fillUDP(std::ostringstream &os) { static_cast(vector_op_count))) << ", "; } - m_read_vector_square = 
read_vector_square; - m_read_vector_count_square = read_vector_count_square; - m_read_vector_count_sum = read_vector_count_sum; os << "\"read_bytes\":" << (read_vector_bytes + read_single_bytes - m_read_vector_bytes - m_read_single_bytes) << ", "; @@ -114,17 +106,56 @@ void StatisticsSenderService::FileStatistics::fillUDP(std::ostringstream &os) { << (read_vector_bytes + read_single_bytes - m_read_vector_bytes - m_read_single_bytes) << ", "; // See top of file for macros; not complex, just avoiding copy/paste - UPDATE_AND_OUTPUT_STATISTIC(read_single_operations) - UPDATE_AND_OUTPUT_STATISTIC(read_single_bytes) - UPDATE_AND_OUTPUT_STATISTIC(read_vector_operations) - UPDATE_AND_OUTPUT_STATISTIC(read_vector_bytes) + OUTPUT_STATISTIC(read_single_operations) + OUTPUT_STATISTIC(read_single_bytes) + OUTPUT_STATISTIC(read_vector_operations) + OUTPUT_STATISTIC(read_vector_bytes) os << "\"start_time\":" << m_start_time << ", "; - m_start_time = time(nullptr); // NOTE: last entry doesn't have the trailing comma. 
- os << "\"end_time\":" << m_start_time; + os << "\"end_time\":" << time(nullptr); } +void StatisticsSenderService::FileStatistics::update() { + const StorageAccount::StorageStats &stats = StorageAccount::summary(); + ssize_t read_single_operations = 0; + ssize_t read_single_bytes = 0; + ssize_t read_single_square = 0; + ssize_t read_vector_operations = 0; + ssize_t read_vector_bytes = 0; + ssize_t read_vector_square = 0; + ssize_t read_vector_count_sum = 0; + ssize_t read_vector_count_square = 0; + auto token = StorageAccount::tokenForStorageClassName("tstoragefile"); + for (StorageAccount::StorageStats::const_iterator i = stats.begin(); i != stats.end(); ++i) { + if (i->first == token.value()) { + continue; + } + for (StorageAccount::OperationStats::const_iterator j = i->second.begin(); j != i->second.end(); ++j) { + if (j->first == static_cast(StorageAccount::Operation::readv)) { + read_vector_operations += j->second.attempts; + read_vector_bytes += j->second.amount; + read_vector_count_square += j->second.vector_square; + read_vector_square += j->second.amount_square; + read_vector_count_sum += j->second.vector_count; + } else if (j->first == static_cast(StorageAccount::Operation::read)) { + read_single_operations += j->second.attempts; + read_single_bytes += j->second.amount; + read_single_square += j->second.amount_square; + } + } + } + + m_read_single_square = read_single_square; + m_read_vector_square = read_vector_square; + m_read_vector_count_square = read_vector_count_square; + m_read_vector_count_sum = read_vector_count_sum; + m_read_single_operations = read_single_operations; + m_read_single_bytes = read_single_bytes; + m_read_vector_operations = read_vector_operations; + m_read_vector_bytes = read_vector_bytes; + m_start_time = time(nullptr); +} StatisticsSenderService::FileInfo::FileInfo(std::string const &iLFN) : m_filelfn(iLFN), m_serverhost("unknown"), m_serverdomain("unknown"), m_size(-1), m_id(0), m_openCount(1) {} @@ -137,8 +168,7 @@ 
StatisticsSenderService::StatisticsSenderService(edm::ParameterSet const &iPSet, m_userdn("unknown"), m_debug(iPSet.getUntrackedParameter("debug", false)) { determineHostnames(); - ar.watchPreCloseFile(this, &StatisticsSenderService::filePreCloseEvent); - ar.watchPreOpenFile([this](auto const &iLFN, bool) { openingFile(iLFN, -1); }); + ar.watchPostCloseFile(this, &StatisticsSenderService::filePostCloseEvent); if (!getX509Subject(m_userdn)) { m_userdn = "unknown"; } @@ -304,7 +334,6 @@ void StatisticsSenderService::setSize(const std::string &url, size_t size) { if (nullptr != lfn) { auto itFound = m_lfnToFileInfo.find(*lfn); if (itFound != m_lfnToFileInfo.end()) { - //do I need to synchronize? itFound->second.m_size = size; } } else if (m_debug) { @@ -312,10 +341,10 @@ void StatisticsSenderService::setSize(const std::string &url, size_t size) { } } -void StatisticsSenderService::filePreCloseEvent(std::string const &lfn, bool usedFallback) { - closedFile(lfn, usedFallback); +void StatisticsSenderService::filePostCloseEvent(std::string const &lfn, bool usedFallback) { //we are at a sync point in the framwework so no new files are being opened cleanupOldFiles(); + m_filestats.update(); } void StatisticsSenderService::determineHostnames(void) { @@ -338,7 +367,7 @@ void StatisticsSenderService::determineHostnames(void) { void StatisticsSenderService::fillUDP(const std::string &siteName, const FileInfo &fileinfo, bool usedFallback, - std::string &udpinfo) { + std::string &udpinfo) const { std::ostringstream os; // Header - same for all IO accesses From 421b4a25031f31381a874b062f84b40849d85448 Mon Sep 17 00:00:00 2001 From: Chris Jones Date: Fri, 1 Oct 2021 09:46:20 -0500 Subject: [PATCH 5/8] Changed SecondaryProducer to be a one::EDProducer --- IOPool/SecondaryInput/test/SecondaryProducer.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/IOPool/SecondaryInput/test/SecondaryProducer.h b/IOPool/SecondaryInput/test/SecondaryProducer.h index 
7d3086649deb3..22cb7ee1832d6 100644 --- a/IOPool/SecondaryInput/test/SecondaryProducer.h +++ b/IOPool/SecondaryInput/test/SecondaryProducer.h @@ -9,7 +9,7 @@ ************************************************************/ #include "DataFormats/Provenance/interface/EventID.h" -#include "FWCore/Framework/interface/EDProducer.h" +#include "FWCore/Framework/interface/one/EDProducer.h" #include "FWCore/Utilities/interface/get_underlying_safe.h" #include @@ -19,7 +19,7 @@ namespace edm { class ProcessConfiguration; class VectorInputSource; - class SecondaryProducer : public EDProducer { + class SecondaryProducer : public one::EDProducer<> { public: /** standard constructor*/ explicit SecondaryProducer(ParameterSet const& pset); From 51a1bd440427f4d8b0b839bd769b28758fa130bf Mon Sep 17 00:00:00 2001 From: Chris Jones Date: Fri, 1 Oct 2021 09:47:24 -0500 Subject: [PATCH 6/8] Added unit test for StatisticsSenderService --- Utilities/StorageFactory/test/BuildFile.xml | 1 + .../StorageFactory/test/make_2nd_file_cfg.py | 10 +++++ .../test/make_test_files_cfg.py | 20 +++++++++ .../test/test_file_statistics_sender.sh | 41 +++++++++++++++++ .../test_multi_file_statistics_sender_cfg.py | 10 +++++ ...ltiple_files_file_statistics_sender_cfg.py | 10 +++++ ...st_secondary_file_statistics_sender_cfg.py | 44 +++++++++++++++++++ .../test_single_file_statistics_sender_cfg.py | 11 +++++ 8 files changed, 147 insertions(+) create mode 100644 Utilities/StorageFactory/test/make_2nd_file_cfg.py create mode 100644 Utilities/StorageFactory/test/make_test_files_cfg.py create mode 100755 Utilities/StorageFactory/test/test_file_statistics_sender.sh create mode 100644 Utilities/StorageFactory/test/test_multi_file_statistics_sender_cfg.py create mode 100644 Utilities/StorageFactory/test/test_multiple_files_file_statistics_sender_cfg.py create mode 100644 Utilities/StorageFactory/test/test_secondary_file_statistics_sender_cfg.py create mode 100644 
Utilities/StorageFactory/test/test_single_file_statistics_sender_cfg.py diff --git a/Utilities/StorageFactory/test/BuildFile.xml b/Utilities/StorageFactory/test/BuildFile.xml index 27329acd2a6d8..2ce22f617536c 100644 --- a/Utilities/StorageFactory/test/BuildFile.xml +++ b/Utilities/StorageFactory/test/BuildFile.xml @@ -37,3 +37,4 @@ # to wait until the framework decides on a threading model to implement a fix. # file="threadsafe.cpp" name="test_StorageFactory_threadsafe" + diff --git a/Utilities/StorageFactory/test/make_2nd_file_cfg.py b/Utilities/StorageFactory/test/make_2nd_file_cfg.py new file mode 100644 index 0000000000000..65da742632ca9 --- /dev/null +++ b/Utilities/StorageFactory/test/make_2nd_file_cfg.py @@ -0,0 +1,10 @@ +import FWCore.ParameterSet.Config as cms + +process = cms.Process("SECOND") + +process.source = cms.Source("PoolSource", fileNames = cms.untracked.vstring("file:stat_sender_first.root")) + +process.o = cms.OutputModule("PoolOutputModule", fileName = cms.untracked.string("stat_sender_second.root"), outputCommands = cms.untracked.vstring("drop *")) + +process.ep = cms.EndPath(process.o) + diff --git a/Utilities/StorageFactory/test/make_test_files_cfg.py b/Utilities/StorageFactory/test/make_test_files_cfg.py new file mode 100644 index 0000000000000..60f3944f256bc --- /dev/null +++ b/Utilities/StorageFactory/test/make_test_files_cfg.py @@ -0,0 +1,20 @@ +import FWCore.ParameterSet.Config as cms + +process = cms.Process("FIRST") + +process.source = cms.Source("EmptySource") + +process.first = cms.OutputModule("PoolOutputModule", fileName = cms.untracked.string("stat_sender_first.root")) +process.b = cms.OutputModule("PoolOutputModule", fileName = cms.untracked.string("stat_sender_b.root")) +process.c = cms.OutputModule("PoolOutputModule", fileName = cms.untracked.string("stat_sender_c.root")) +process.d = cms.OutputModule("PoolOutputModule", fileName = cms.untracked.string("stat_sender_d.root")) +process.e = cms.OutputModule("PoolOutputModule", 
fileName = cms.untracked.string("stat_sender_e.root")) + +process.Thing = cms.EDProducer("ThingProducer") +process.OtherThing = cms.EDProducer("OtherThingProducer") +process.EventNumber = cms.EDProducer("EventNumberIntProducer") + + +process.o = cms.EndPath(process.first+process.b+process.c+process.d+process.e, cms.Task(process.Thing, process.OtherThing, process.EventNumber)) + +process.maxEvents.input = 10 diff --git a/Utilities/StorageFactory/test/test_file_statistics_sender.sh b/Utilities/StorageFactory/test/test_file_statistics_sender.sh new file mode 100755 index 0000000000000..523da45696e34 --- /dev/null +++ b/Utilities/StorageFactory/test/test_file_statistics_sender.sh @@ -0,0 +1,41 @@ +#!/bin/bash + +# Pass in name and status +function die { echo $1: status $2 ; exit $2; } + +LOCAL_TEST_DIR=${CMSSW_BASE}/src/Utilities/StorageFactory/test +LOCAL_TMP_DIR=${CMSSW_BASE}/tmp/${SCRAM_ARCH} + +pushd ${LOCAL_TMP_DIR} + +#setup files used in tests +cmsRun ${LOCAL_TEST_DIR}/make_test_files_cfg.py &> make_test_files.log || die "cmsRun make_test_files_cfg.py" $? +rm make_test_files.log +cmsRun ${LOCAL_TEST_DIR}/make_2nd_file_cfg.py &> make_2nd_file.log || die "cmsRun make_2nd_file_cfg.py" $? +rm make_2nd_file.log + +cmsRun ${LOCAL_TEST_DIR}/test_single_file_statistics_sender_cfg.py &> test_single_file_statistics_sender.log || die "cmsRun test_single_file_statistics_sender_cfg.py" $? +grep -q '"file_lfn":"file:stat_sender_first.root"' test_single_file_statistics_sender.log || die "no StatisticsSenderService output for single file" 1 +rm test_single_file_statistics_sender.log + +cmsRun ${LOCAL_TEST_DIR}/test_multiple_files_file_statistics_sender_cfg.py &> test_multiple_files_file_statistics_sender.log || die "cmsRun test_multiple_files_file_statistics_sender_cfg.py" $? 
+grep -q '"file_lfn":"file:stat_sender_b.root"' test_multiple_files_file_statistics_sender.log || die "no StatisticsSenderService output for file b in multiple files" 1 +grep -q '"file_lfn":"file:stat_sender_c.root"' test_multiple_files_file_statistics_sender.log || die "no StatisticsSenderService output for file c in multiple files" 1 +grep -q '"file_lfn":"file:stat_sender_d.root"' test_multiple_files_file_statistics_sender.log || die "no StatisticsSenderService output for file d in multiple files" 1 +grep -q '"file_lfn":"file:stat_sender_e.root"' test_multiple_files_file_statistics_sender.log || die "no StatisticsSenderService output for file e in multiple files" 1 +rm test_multiple_files_file_statistics_sender.log + +cmsRun ${LOCAL_TEST_DIR}/test_multi_file_statistics_sender_cfg.py &> test_multi_file_statistics_sender.log || die "cmsRun test_multi_file_statistics_sender_cfg.py" $? +grep -q '"file_lfn":"file:stat_sender_first.root"' test_multi_file_statistics_sender.log || die "no StatisticsSenderService output for file first in multi file" 1 +grep -q '"file_lfn":"file:stat_sender_second.root"' test_multi_file_statistics_sender.log || die "no StatisticsSenderService output for file second in multi file" 1 +rm test_multi_file_statistics_sender.log + +cmsRun ${LOCAL_TEST_DIR}/test_secondary_file_statistics_sender_cfg.py &> test_secondary_file_statistics_sender.log || die "cmsRun test_secondary_file_statistics_sender_cfg.py" $? 
+grep -q '"file_lfn":"file:stat_sender_first.root"' test_secondary_file_statistics_sender.log || die "no StatisticsSenderService output for file 'first' in secondary files" 1 +grep -q '"file_lfn":"file:stat_sender_b.root"' test_secondary_file_statistics_sender.log || die "no StatisticsSenderService output for file 'b' in secondary files" 1 +grep -q '"file_lfn":"file:stat_sender_c.root"' test_secondary_file_statistics_sender.log || die "no StatisticsSenderService output for file 'c' in secondary files" 1 +grep -q '"file_lfn":"file:stat_sender_d.root"' test_secondary_file_statistics_sender.log || die "no StatisticsSenderService output for file 'd' in secondary files" 1 +grep -q '"file_lfn":"file:stat_sender_e.root"' test_secondary_file_statistics_sender.log || die "no StatisticsSenderService output for file 'e' in secondary files" 1 +rm test_secondary_file_statistics_sender.log + +popd \ No newline at end of file diff --git a/Utilities/StorageFactory/test/test_multi_file_statistics_sender_cfg.py b/Utilities/StorageFactory/test/test_multi_file_statistics_sender_cfg.py new file mode 100644 index 0000000000000..19815adc72c35 --- /dev/null +++ b/Utilities/StorageFactory/test/test_multi_file_statistics_sender_cfg.py @@ -0,0 +1,10 @@ +import FWCore.ParameterSet.Config as cms + +process = cms.Process("TEST") + +process.source = cms.Source("PoolSource", + fileNames = cms.untracked.vstring("file:stat_sender_second.root"), + secondaryFileNames = cms.untracked.vstring("file:stat_sender_first.root") +) + +process.add_(cms.Service("StatisticsSenderService", debug = cms.untracked.bool(True))) \ No newline at end of file diff --git a/Utilities/StorageFactory/test/test_multiple_files_file_statistics_sender_cfg.py b/Utilities/StorageFactory/test/test_multiple_files_file_statistics_sender_cfg.py new file mode 100644 index 0000000000000..a734285bd2487 --- /dev/null +++ b/Utilities/StorageFactory/test/test_multiple_files_file_statistics_sender_cfg.py @@ -0,0 +1,10 @@ +import 
FWCore.ParameterSet.Config as cms + +process = cms.Process("TEST") + +process.source = cms.Source("PoolSource", fileNames = cms.untracked.vstring("file:stat_sender_b.root", "file:stat_sender_c.root", "file:stat_sender_d.root", "file:stat_sender_e.root"), +duplicateCheckMode = cms.untracked.string('noDuplicateCheck'), +inputCommands = cms.untracked.vstring('drop *_*_beginRun_*', 'drop *_*_endRun_*', 'drop *_*_beginLumi_*', 'drop *_*_endLumi_*') +) + +process.add_(cms.Service("StatisticsSenderService", debug = cms.untracked.bool(True))) \ No newline at end of file diff --git a/Utilities/StorageFactory/test/test_secondary_file_statistics_sender_cfg.py b/Utilities/StorageFactory/test/test_secondary_file_statistics_sender_cfg.py new file mode 100644 index 0000000000000..250d575c7f39e --- /dev/null +++ b/Utilities/StorageFactory/test/test_secondary_file_statistics_sender_cfg.py @@ -0,0 +1,44 @@ +import FWCore.ParameterSet.Config as cms + +process = cms.Process("TEST") + +process.source = cms.Source("PoolSource", fileNames = cms.untracked.vstring("file:stat_sender_first.root")) + +process.b = cms.EDProducer("SecondaryProducer", + seq = cms.untracked.bool(True), + input = cms.SecSource("EmbeddedRootSource", + sequential = cms.untracked.bool(True), + fileNames = cms.untracked.vstring('file:stat_sender_b.root') + ) +) + +process.c = cms.EDProducer("SecondaryProducer", + seq = cms.untracked.bool(True), + input = cms.SecSource("EmbeddedRootSource", + sequential = cms.untracked.bool(True), + fileNames = cms.untracked.vstring('file:stat_sender_c.root') + ) +) + +process.d = cms.EDProducer("SecondaryProducer", + seq = cms.untracked.bool(True), + input = cms.SecSource("EmbeddedRootSource", + sequential = cms.untracked.bool(True), + fileNames = cms.untracked.vstring('file:stat_sender_d.root') + ) +) + +process.e = cms.EDProducer("SecondaryProducer", + seq = cms.untracked.bool(True), + input = cms.SecSource("EmbeddedRootSource", + sequential = cms.untracked.bool(True), + fileNames = 
cms.untracked.vstring('file:stat_sender_e.root') + ) +) + +process.pB = cms.Path(process.b) +process.pC = cms.Path(process.c) +process.pD = cms.Path(process.d) +process.pE = cms.Path(process.e) + +process.add_(cms.Service("StatisticsSenderService", debug = cms.untracked.bool(True))) \ No newline at end of file diff --git a/Utilities/StorageFactory/test/test_single_file_statistics_sender_cfg.py b/Utilities/StorageFactory/test/test_single_file_statistics_sender_cfg.py new file mode 100644 index 0000000000000..0576542a5e934 --- /dev/null +++ b/Utilities/StorageFactory/test/test_single_file_statistics_sender_cfg.py @@ -0,0 +1,11 @@ +import FWCore.ParameterSet.Config as cms + +process = cms.Process("TEST") + +process.source = cms.Source("PoolSource", fileNames = cms.untracked.vstring("file:stat_sender_first.root")) + +process.add_(cms.Service("StatisticsSenderService", debug = cms.untracked.bool(True))) + +process.load("FWCore.MessageService.MessageLogger_cfi") + +process.MessageLogger.cerr.INFO.limit = 1000 \ No newline at end of file From e96e62204e62f2d27dc38c4248a04ff708816f26 Mon Sep 17 00:00:00 2001 From: Chris Jones Date: Mon, 18 Oct 2021 13:19:14 -0500 Subject: [PATCH 7/8] Pass InputType to StatisticsSenderService Now broadcasts how the file is used. --- IOPool/Input/src/RootInputFileSequence.cc | 2 +- .../interface/StatisticsSenderService.h | 15 +++++++-- .../src/StatisticsSenderService.cc | 31 ++++++++++++++++--- 3 files changed, 41 insertions(+), 7 deletions(-) diff --git a/IOPool/Input/src/RootInputFileSequence.cc b/IOPool/Input/src/RootInputFileSequence.cc index 2666db1602afd..6bba8eb2a2fd9 100644 --- a/IOPool/Input/src/RootInputFileSequence.cc +++ b/IOPool/Input/src/RootInputFileSequence.cc @@ -231,7 +231,7 @@ namespace edm { input ? 
std::make_unique(*input, lfn_, false) : nullptr); edm::Service service; if (service.isAvailable()) { - service->openingFile(lfn(), -1); + service->openingFile(lfn(), inputType, -1); } try { std::unique_ptr name(gSystem->ExpandPathName(fileName().c_str())); diff --git a/Utilities/StorageFactory/interface/StatisticsSenderService.h b/Utilities/StorageFactory/interface/StatisticsSenderService.h index 61bfd55517136..25d715e035abb 100644 --- a/Utilities/StorageFactory/interface/StatisticsSenderService.h +++ b/Utilities/StorageFactory/interface/StatisticsSenderService.h @@ -7,6 +7,7 @@ #include #include #include +#include "FWCore/Utilities/interface/InputType.h" namespace edm { @@ -24,7 +25,7 @@ namespace edm { static const char* getJobID(); static bool getX509Subject(std::string&); - void openingFile(std::string const& lfn, size_t size = -1); + void openingFile(std::string const& lfn, edm::InputType type, size_t size = -1); void closedFile(std::string const& lfn, bool usedFallback); private: @@ -50,10 +51,20 @@ namespace edm { }; struct FileInfo { - explicit FileInfo(std::string const& iLFN); + explicit FileInfo(std::string const& iLFN, edm::InputType); + + FileInfo(FileInfo&& iInfo) + : m_filelfn(std::move(iInfo.m_filelfn)), + m_serverhost(std::move(iInfo.m_serverhost)), + m_serverdomain(std::move(iInfo.m_serverdomain)), + m_type(iInfo.m_type), + m_size(iInfo.m_size.load()), + m_id(iInfo.m_id), + m_openCount(iInfo.m_openCount.load()) {} std::string m_filelfn; std::string m_serverhost; std::string m_serverdomain; + edm::InputType m_type; std::atomic m_size; size_t m_id; //from m_counter std::atomic m_openCount; diff --git a/Utilities/StorageFactory/src/StatisticsSenderService.cc b/Utilities/StorageFactory/src/StatisticsSenderService.cc index eb5401431d815..febcaf98f31a1 100644 --- a/Utilities/StorageFactory/src/StatisticsSenderService.cc +++ b/Utilities/StorageFactory/src/StatisticsSenderService.cc @@ -156,8 +156,14 @@ void 
StatisticsSenderService::FileStatistics::update() { m_read_vector_bytes = read_vector_bytes; m_start_time = time(nullptr); } -StatisticsSenderService::FileInfo::FileInfo(std::string const &iLFN) - : m_filelfn(iLFN), m_serverhost("unknown"), m_serverdomain("unknown"), m_size(-1), m_id(0), m_openCount(1) {} +StatisticsSenderService::FileInfo::FileInfo(std::string const &iLFN, edm::InputType iType) + : m_filelfn(iLFN), + m_serverhost("unknown"), + m_serverdomain("unknown"), + m_type(iType), + m_size(-1), + m_id(0), + m_openCount(1) {} StatisticsSenderService::StatisticsSenderService(edm::ParameterSet const &iPSet, edm::ActivityRegistry &ar) : m_clienthost("unknown"), @@ -237,9 +243,9 @@ void StatisticsSenderService::setCurrentServer(const std::string &url, const std } } -void StatisticsSenderService::openingFile(std::string const &lfn, size_t size) { +void StatisticsSenderService::openingFile(std::string const &lfn, edm::InputType type, size_t size) { m_urlToLfn.emplace(lfn, lfn); - auto attempt = m_lfnToFileInfo.emplace(lfn, lfn); + auto attempt = m_lfnToFileInfo.emplace(lfn, FileInfo{lfn, type}); if (attempt.second) { attempt.first->second.m_size = size; attempt.first->second.m_id = m_counter++; @@ -377,6 +383,23 @@ void StatisticsSenderService::fillUDP(const std::string &siteName, } if (usedFallback) { os << "\"fallback\": true, "; + } else { + os << "\"fallback\": false, "; + } + os << "\"type\": "; + switch (fileinfo.m_type) { + case edm::InputType::Primary: { + os << "\"primary\", "; + break; + } + case edm::InputType::SecondaryFile: { + os << "\"secondary\", "; + break; + } + case edm::InputType::SecondarySource: { + os << "\"embedded\", "; + break; + } } auto serverhost = fileinfo.m_serverhost; auto serverdomain = fileinfo.m_serverdomain; From ed0609fd63b7750d9a0ee27bf4e03434ecebf5db Mon Sep 17 00:00:00 2001 From: Matti Kortelainen Date: Tue, 7 Dec 2021 21:49:00 +0100 Subject: [PATCH 8/8] Silence file-closing printouts in StatisticsSenderService by default
--- .../StorageFactory/src/StatisticsSenderService.cc | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/Utilities/StorageFactory/src/StatisticsSenderService.cc b/Utilities/StorageFactory/src/StatisticsSenderService.cc index febcaf98f31a1..0e96624fe1ef0 100644 --- a/Utilities/StorageFactory/src/StatisticsSenderService.cc +++ b/Utilities/StorageFactory/src/StatisticsSenderService.cc @@ -297,10 +297,12 @@ void StatisticsSenderService::closedFile(std::string const &url, bool usedFallba } auto c = --found->second.m_openCount; - if (c == 0) { - edm::LogWarning("StatisticsSenderService") << "fully closed: " << *lfn << "\n"; - } else { - edm::LogWarning("StatisticsSenderService") << "partially closed: " << *lfn << "\n"; + if (m_debug) { + if (c == 0) { + edm::LogWarning("StatisticsSenderService") << "fully closed: " << *lfn << "\n"; + } else { + edm::LogWarning("StatisticsSenderService") << "partially closed: " << *lfn << "\n"; + } } } else if (m_debug) { edm::LogWarning("StatisticsSenderService") << "closed: unknown url name " << url << "\n";