From 32bdfa1da51be0c55b4b108c389dd3f64bf61f7d Mon Sep 17 00:00:00 2001 From: Chris Jones Date: Thu, 30 Sep 2021 11:18:22 -0500 Subject: [PATCH 1/6] Report only once when trying to open file Previously, each try to open the file using a different PFN would report an open attempt for the same LFN. This meant we could have multiple opens but only one close for a given LFN. --- IOPool/Input/src/RootInputFileSequence.cc | 45 ++++++++++++----------- 1 file changed, 23 insertions(+), 22 deletions(-) diff --git a/IOPool/Input/src/RootInputFileSequence.cc b/IOPool/Input/src/RootInputFileSequence.cc index 616a67944b0ac..3d7354481e7ce 100644 --- a/IOPool/Input/src/RootInputFileSequence.cc +++ b/IOPool/Input/src/RootInputFileSequence.cc @@ -240,31 +240,32 @@ namespace edm { //this tries to open the file using multiple PFNs corresponding to different data catalogs std::list exInfo; - for (std::vector::const_iterator it = fNames.begin(); it != fNames.end(); ++it) { - try { - std::unique_ptr sentry( - input ? std::make_unique(*input, lfn_, false) : nullptr); - std::unique_ptr name(gSystem->ExpandPathName(it->c_str())); - filePtr = std::make_shared(name.get(), " Initiating request to open file ", inputType); - break; - } catch (cms::Exception const& e) { - if (!skipBadFiles && std::next(it) == fNames.end()) { - InputFile::reportSkippedFile((*it), logicalFileName()); - Exception ex(errors::FileOpenError, "", e); - ex.addContext("Calling RootInputFileSequence::initTheFile()"); - std::ostringstream out; - out << "Input file " << (*it) << " could not be opened."; - ex.addAdditionalInfo(out.str()); - //report previous exceptions when use other names to open file - for (auto const& s : exInfo) - ex.addAdditionalInfo(s); - throw ex; - } else { - exInfo.push_back("Calling RootInputFileSequence::initTheFile(): fail to open the file with name " + (*it)); + { + std::unique_ptr sentry( + input ? std::make_unique(*input, lfn_, false) : nullptr); + for (std::vector::const_iterator it = fNames.begin(); it != fNames.end(); ++it) { + try { + std::unique_ptr name(gSystem->ExpandPathName(it->c_str())); + filePtr = std::make_shared(name.get(), " Initiating request to open file ", inputType); + break; + } catch (cms::Exception const& e) { + if (!skipBadFiles && std::next(it) == fNames.end()) { + InputFile::reportSkippedFile((*it), logicalFileName()); + Exception ex(errors::FileOpenError, "", e); + ex.addContext("Calling RootInputFileSequence::initTheFile()"); + std::ostringstream out; + out << "Input file " << (*it) << " could not be opened."; + ex.addAdditionalInfo(out.str()); + //report previous exceptions when use other names to open file + for (auto const& s : exInfo) + ex.addAdditionalInfo(s); + throw ex; + } else { + exInfo.push_back("Calling RootInputFileSequence::initTheFile(): fail to open the file with name " + (*it)); + } } } } - if (filePtr) { size_t currentIndexIntoFile = fileIter_ - fileIterBegin_; rootFile_ = makeRootFile(filePtr); From bac09f47e3553d31ecdc282ed7813a3062ae3e47 Mon Sep 17 00:00:00 2001 From: Chris Jones Date: Thu, 30 Sep 2021 11:25:22 -0500 Subject: [PATCH 2/6] StatisticsSenderService now requires LFN or URL When sending information to the StatisticsSenderService, the file LFN or URL must be supplied. --- .../TFileAdaptor/src/TStorageFactoryFile.cc | 2 +- .../interface/StatisticsSenderService.h | 43 +++-- .../src/StatisticsSenderService.cc | 182 ++++++++++++++---- Utilities/XrdAdaptor/src/XrdRequestManager.cc | 2 +- 4 files changed, 177 insertions(+), 52 deletions(-) diff --git a/IOPool/TFileAdaptor/src/TStorageFactoryFile.cc b/IOPool/TFileAdaptor/src/TStorageFactoryFile.cc index 5a8622d75d755..405453c42b876 100644 --- a/IOPool/TFileAdaptor/src/TStorageFactoryFile.cc +++ b/IOPool/TFileAdaptor/src/TStorageFactoryFile.cc @@ -195,7 +195,7 @@ void TStorageFactoryFile::Initialize(const char *path, Option_t *option /* = "" try { edm::Service statsService; if (statsService.isAvailable()) { - statsService->setSize(storage_->size()); + statsService->setSize(path, storage_->size()); } } catch (edm::Exception const &e) { if (e.categoryCode() != edm::errors::NotFound) { diff --git a/Utilities/StorageFactory/interface/StatisticsSenderService.h b/Utilities/StorageFactory/interface/StatisticsSenderService.h index 24651dcc9f9c2..940defc861283 100644 --- a/Utilities/StorageFactory/interface/StatisticsSenderService.h +++ b/Utilities/StorageFactory/interface/StatisticsSenderService.h @@ -6,6 +6,7 @@ #include #include #include +#include namespace edm { @@ -16,19 +17,24 @@ namespace edm { class StatisticsSenderService { public: - StatisticsSenderService(edm::ParameterSet const &pset, edm::ActivityRegistry &ar); + StatisticsSenderService(edm::ParameterSet const& pset, edm::ActivityRegistry& ar); - void setSize(size_t size); - void setCurrentServer(const std::string &servername); - void filePreCloseEvent(std::string const &lfn, bool usedFallback); - static const char *getJobID(); - static bool getX509Subject(std::string &); + void setSize(const std::string& urlOrLfn, size_t size); + void setCurrentServer(const std::string& urlOrLfn, const std::string& servername); + static const char* getJobID(); + static bool getX509Subject(std::string&); + + void openingFile(std::string const& lfn, size_t size = -1); + void closedFile(std::string const& lfn, bool usedFallback); private: + void filePreCloseEvent(std::string const& lfn, bool usedFallback); + + std::string const* matchedLfn(std::string const& iURL); //updates its internal cache class FileStatistics { public: FileStatistics(); - void fillUDP(std::ostringstream &os); + void fillUDP(std::ostringstream& os); private: ssize_t m_read_single_operations; @@ -42,19 +48,30 @@ namespace edm { time_t m_start_time; }; - void determineHostnames(void); - void fillUDP(const std::string &, bool, std::string &); + struct FileInfo { + explicit FileInfo(std::string const& iLFN); + std::string m_filelfn; + std::string m_serverhost; + std::string m_serverdomain; + ssize_t m_size; + size_t m_id; //from m_counter + std::atomic m_openCount; + }; + + void determineHostnames(); + void fillUDP(const std::string& site, const FileInfo& fileinfo, bool, std::string&); + void cleanupOldFiles(); + std::string m_clienthost; std::string m_clientdomain; - std::string m_serverhost; - std::string m_serverdomain; - std::string m_filelfn; + tbb::concurrent_unordered_map m_lfnToFileInfo; + tbb::concurrent_unordered_map m_urlToLfn; FileStatistics m_filestats; std::string m_guid; size_t m_counter; - std::atomic m_size; std::string m_userdn; std::mutex m_servermutex; + const bool m_debug; }; } // namespace storage diff --git a/Utilities/StorageFactory/src/StatisticsSenderService.cc b/Utilities/StorageFactory/src/StatisticsSenderService.cc index d68f83bbd0576..d40162d89b514 100644 --- a/Utilities/StorageFactory/src/StatisticsSenderService.cc +++ b/Utilities/StorageFactory/src/StatisticsSenderService.cc @@ -4,6 +4,7 @@ #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h" #include "FWCore/Catalog/interface/SiteLocalConfig.h" #include "FWCore/ServiceRegistry/interface/Service.h" +#include "FWCore/MessageLogger/interface/MessageLogger.h" #include "FWCore/Utilities/interface/Guid.h" #include @@ -124,19 +125,20 @@ void StatisticsSenderService::FileStatistics::fillUDP(std::ostringstream &os) { os << "\"end_time\":" << m_start_time; } -StatisticsSenderService::StatisticsSenderService(edm::ParameterSet const & /*pset*/, edm::ActivityRegistry &ar) +StatisticsSenderService::FileInfo::FileInfo(std::string const &iLFN) + : m_filelfn(iLFN), m_serverhost("unknown"), m_serverdomain("unknown"), m_size(-1), m_id(0), m_openCount(1) {} + +StatisticsSenderService::StatisticsSenderService(edm::ParameterSet const &iPSet, edm::ActivityRegistry &ar) : m_clienthost("unknown"), m_clientdomain("unknown"), - m_serverhost("unknown"), - m_serverdomain("unknown"), - m_filelfn("unknown"), m_filestats(), m_guid(Guid().toString()), m_counter(0), - m_size(-1), - m_userdn("unknown") { + m_userdn("unknown"), + m_debug(iPSet.getUntrackedParameter("debug", false)) { determineHostnames(); ar.watchPreCloseFile(this, &StatisticsSenderService::filePreCloseEvent); + ar.watchPreOpenFile([this](auto const &iLFN, bool) { openingFile(iLFN, -1); }); if (!getX509Subject(m_userdn)) { m_userdn = "unknown"; } @@ -148,7 +150,35 @@ const char *StatisticsSenderService::getJobID() { return id ? id : std::getenv(JOB_UNIQUE_ID_ENV_V2); } -void StatisticsSenderService::setCurrentServer(const std::string &servername) { +std::string const *StatisticsSenderService::matchedLfn(std::string const &iURL) { + auto found = m_urlToLfn.find(iURL); + if (found != m_urlToLfn.end()) { + return &found->second; + } + for (auto const &v : m_lfnToFileInfo) { + if (v.first.size() < iURL.size()) { + if (v.first == iURL.substr(iURL.size() - v.first.size())) { + m_urlToLfn.emplace(iURL, v.first); + return &m_urlToLfn.find(iURL)->second; + } + } + } + //does the lfn have a protocol and the iURL not? + if (std::string::npos == iURL.find(':')) { + for (auto const &v : m_lfnToFileInfo) { + if ((std::string::npos != v.first.find(':')) and (v.first.size() > iURL.size())) { + if (iURL == v.first.substr(v.first.size() - iURL.size())) { + m_urlToLfn.emplace(iURL, v.first); + return &m_urlToLfn.find(iURL)->second; + } + } + } + } + + return nullptr; +} + +void StatisticsSenderService::setCurrentServer(const std::string &url, const std::string &servername) { size_t dot_pos = servername.find('.'); std::string serverhost; std::string serverdomain; @@ -163,24 +193,41 @@ void StatisticsSenderService::setCurrentServer(const std::string &servername) { } } { + auto lfn = matchedLfn(url); std::lock_guard sentry(m_servermutex); - m_serverhost = std::move(serverhost); - m_serverdomain = std::move(serverdomain); + if (nullptr != lfn) { + auto found = m_lfnToFileInfo.find(*lfn); + if (found != m_lfnToFileInfo.end()) { + found->second.m_serverhost = std::move(serverhost); + found->second.m_serverdomain = std::move(serverdomain); + } + } else if (m_debug) { + edm::LogWarning("StatisticsSenderService") << "setCurrentServer: unknown url name " << url << "\n"; + } } } -void StatisticsSenderService::setSize(size_t size) { m_size = size; } - -void StatisticsSenderService::filePreCloseEvent(std::string const &lfn, bool usedFallback) { - m_filelfn = lfn; +void StatisticsSenderService::openingFile(std::string const &lfn, size_t size) { + m_urlToLfn.emplace(lfn, lfn); + auto attempt = m_lfnToFileInfo.emplace(lfn, lfn); + if (attempt.second) { + attempt.first->second.m_size = size; + attempt.first->second.m_id = m_counter++; + edm::LogInfo("StatisticsSenderService") << "openingFile: opening " << lfn << "\n"; + } else { + ++(attempt.first->second.m_openCount); + edm::LogInfo("StatisticsSenderService") << "openingFile: re-opening" << lfn << "\n"; + } +} +void StatisticsSenderService::closedFile(std::string const &url, bool usedFallback) { edm::Service pSLC; if (!pSLC.isAvailable()) { return; } const struct addrinfo *addresses = pSLC->statisticsDestination(); - if (!addresses) { + if (!addresses and !m_debug) { return; } @@ -190,22 +237,85 @@ void StatisticsSenderService::filePreCloseEvent(std::string const &lfn, bool use m_userdn = "not reported"; } - std::string results; - fillUDP(pSLC->siteName(), usedFallback, results); + auto lfn = matchedLfn(url); + if (nullptr != lfn) { + auto found = m_lfnToFileInfo.find(*lfn); + assert(found != m_lfnToFileInfo.end()); - for (const struct addrinfo *address = addresses; address != nullptr; address = address->ai_next) { - int sock = socket(address->ai_family, address->ai_socktype, address->ai_protocol); - if (sock < 0) { - continue; + std::string results; + fillUDP(pSLC->siteName(), found->second, usedFallback, results); + if (m_debug) { + edm::LogSystem("StatisticSenderService") << "\n" << results << "\n"; } - auto close_del = [](int *iSocket) { close(*iSocket); }; - std::unique_ptr guard(&sock, close_del); - if (sendto(sock, results.c_str(), results.size(), 0, address->ai_addr, address->ai_addrlen) >= 0) { - break; + + for (const struct addrinfo *address = addresses; address != nullptr; address = address->ai_next) { + int sock = socket(address->ai_family, address->ai_socktype, address->ai_protocol); + if (sock < 0) { + continue; + } + auto close_del = [](int *iSocket) { close(*iSocket); }; + std::unique_ptr guard(&sock, close_del); + if (sendto(sock, results.c_str(), results.size(), 0, address->ai_addr, address->ai_addrlen) >= 0) { + break; + } + } + + auto c = --found->second.m_openCount; + if (c == 0) { + edm::LogWarning("StatisticsSenderService") << "fully closed: " << *lfn << "\n"; + } else { + edm::LogWarning("StatisticsSenderService") << "partially closed: " << *lfn << "\n"; + } + } else if (m_debug) { + edm::LogWarning("StatisticsSenderService") << "closed: unknown url name " << url << "\n"; + } +} + +void StatisticsSenderService::cleanupOldFiles() { + //remove entries with openCount of 0 + bool moreToTest = false; + do { + moreToTest = false; + for (auto it = m_lfnToFileInfo.begin(); it != m_lfnToFileInfo.end(); ++it) { + if (it->second.m_openCount == 0) { + auto lfn = it->first; + bool moreToTest2 = false; + do { + moreToTest2 = false; + for (auto it2 = m_urlToLfn.begin(); it2 != m_urlToLfn.end(); ++it2) { + if (it2->second == lfn) { + m_urlToLfn.unsafe_erase(it2); + moreToTest2 = true; + break; + } + } + } while (moreToTest2); + + m_lfnToFileInfo.unsafe_erase(it); + moreToTest = true; + break; + } } + } while (moreToTest); +} + +void StatisticsSenderService::setSize(const std::string &url, size_t size) { + auto lfn = matchedLfn(url); + if (nullptr != lfn) { + auto itFound = m_lfnToFileInfo.find(*lfn); + if (itFound != m_lfnToFileInfo.end()) { + //do I need to synchronize? + itFound->second.m_size = size; + } + } else if (m_debug) { + edm::LogWarning("StatisticsSenderService") << "setSize: unknown url name " << url << "\n"; } +} - m_counter++; +void StatisticsSenderService::filePreCloseEvent(std::string const &lfn, bool usedFallback) { + closedFile(lfn, usedFallback); + //we are at a sync point in the framwework so no new files are being opened + cleanupOldFiles(); } void StatisticsSenderService::determineHostnames(void) { @@ -225,7 +335,10 @@ void StatisticsSenderService::determineHostnames(void) { } } -void StatisticsSenderService::fillUDP(const std::string &siteName, bool usedFallback, std::string &udpinfo) { +void StatisticsSenderService::fillUDP(const std::string &siteName, + const FileInfo &fileinfo, + bool usedFallback, + std::string &udpinfo) { std::ostringstream os; // Header - same for all IO accesses @@ -236,21 +349,16 @@ void StatisticsSenderService::fillUDP(const std::string &siteName, bool usedFall if (usedFallback) { os << "\"fallback\": true, "; } - std::string serverhost; - std::string serverdomain; - { - std::lock_guard sentry(m_servermutex); - serverhost = m_serverhost; - serverdomain = m_serverdomain; - } + auto serverhost = fileinfo.m_serverhost; + auto serverdomain = fileinfo.m_serverdomain; os << "\"user_dn\":\"" << m_userdn << "\", "; os << "\"client_host\":\"" << m_clienthost << "\", "; os << "\"client_domain\":\"" << m_clientdomain << "\", "; os << "\"server_host\":\"" << serverhost << "\", "; os << "\"server_domain\":\"" << serverdomain << "\", "; - os << "\"unique_id\":\"" << m_guid << "-" << m_counter << "\", "; - os << "\"file_lfn\":\"" << m_filelfn << "\", "; + os << "\"unique_id\":\"" << m_guid << "-" << fileinfo.m_id << "\", "; + os << "\"file_lfn\":\"" << fileinfo.m_filelfn << "\", "; // Dashboard devs requested that we send out no app_info if a job ID // is not present in the environment. const char *jobId = getJobID(); @@ -258,8 +366,8 @@ void StatisticsSenderService::fillUDP(const std::string &siteName, bool usedFall os << "\"app_info\":\"" << jobId << "\", "; } - if (m_size >= 0) { - os << "\"file_size\":" << m_size << ", "; + if (fileinfo.m_size >= 0) { + os << "\"file_size\":" << fileinfo.m_size << ", "; } m_filestats.fillUDP(os); diff --git a/Utilities/XrdAdaptor/src/XrdRequestManager.cc b/Utilities/XrdAdaptor/src/XrdRequestManager.cc index 961d9787a0201..a992fbc8f4736 100644 --- a/Utilities/XrdAdaptor/src/XrdRequestManager.cc +++ b/Utilities/XrdAdaptor/src/XrdRequestManager.cc @@ -250,7 +250,7 @@ void RequestManager::updateCurrentServer() { std::unique_ptr hostname(hostname_ptr); edm::Service statsService; if (statsService.isAvailable()) { - statsService->setCurrentServer(*hostname_ptr); + statsService->setCurrentServer(m_name, *hostname_ptr); } } } From 327eb2587bd00c813189659d19581f549855d6f8 Mon Sep 17 00:00:00 2001 From: Chris Jones Date: Thu, 30 Sep 2021 16:03:08 -0500 Subject: [PATCH 3/6] Report all framework files to StatisticsSenderService Send statistics for primary, secondary, and embedded files. The aggregate file statistics are only reset on primary file close boundaries to keep the behavior the same as previous. Changed all calls to closeFile_() to be the new closeFile() --- IOPool/Input/src/EmbeddedRootSource.cc | 2 +- IOPool/Input/src/PoolSource.cc | 2 +- IOPool/Input/src/RootEmbeddedFileSequence.cc | 2 +- IOPool/Input/src/RootEmbeddedFileSequence.h | 2 +- IOPool/Input/src/RootInputFileSequence.cc | 16 +++- IOPool/Input/src/RootInputFileSequence.h | 2 + IOPool/Input/src/RootPrimaryFileSequence.cc | 4 +- IOPool/Input/src/RootPrimaryFileSequence.h | 2 +- IOPool/Input/src/RootSecondaryFileSequence.cc | 2 +- IOPool/Input/src/RootSecondaryFileSequence.h | 2 +- .../interface/StatisticsSenderService.h | 9 ++- .../src/StatisticsSenderService.cc | 77 +++++++++++++------ 12 files changed, 84 insertions(+), 38 deletions(-) diff --git a/IOPool/Input/src/EmbeddedRootSource.cc b/IOPool/Input/src/EmbeddedRootSource.cc index bc9bf251666f1..f71854d29ad2f 100644 --- a/IOPool/Input/src/EmbeddedRootSource.cc +++ b/IOPool/Input/src/EmbeddedRootSource.cc @@ -45,7 +45,7 @@ namespace edm { InputFile::reportReadBranches(); } - void EmbeddedRootSource::closeFile_() { fileSequence_->closeFile_(); } + void EmbeddedRootSource::closeFile_() { fileSequence_->closeFile(); } bool EmbeddedRootSource::readOneEvent(EventPrincipal& cache, size_t& fileNameHash, diff --git a/IOPool/Input/src/PoolSource.cc b/IOPool/Input/src/PoolSource.cc index 033c0bede04dd..380cbb1251244 100644 --- a/IOPool/Input/src/PoolSource.cc +++ b/IOPool/Input/src/PoolSource.cc @@ -162,7 +162,7 @@ namespace edm { return fb; } - void PoolSource::closeFile_() { primaryFileSequence_->closeFile_(); } + void PoolSource::closeFile_() { primaryFileSequence_->closeFile(); } std::shared_ptr PoolSource::readRunAuxiliary_() { return primaryFileSequence_->readRunAuxiliary_(); } diff --git a/IOPool/Input/src/RootEmbeddedFileSequence.cc b/IOPool/Input/src/RootEmbeddedFileSequence.cc index 27b57de0c36a5..afc96e4954eb4 100644 --- a/IOPool/Input/src/RootEmbeddedFileSequence.cc +++ b/IOPool/Input/src/RootEmbeddedFileSequence.cc @@ -121,7 +121,7 @@ namespace edm { RootEmbeddedFileSequence::~RootEmbeddedFileSequence() {} - void RootEmbeddedFileSequence::endJob() { closeFile_(); } + void RootEmbeddedFileSequence::endJob() { closeFile(); } void RootEmbeddedFileSequence::closeFile_() { // delete the RootFile object. diff --git a/IOPool/Input/src/RootEmbeddedFileSequence.h b/IOPool/Input/src/RootEmbeddedFileSequence.h index 39bb3bb6e8c1b..168e987b2778d 100644 --- a/IOPool/Input/src/RootEmbeddedFileSequence.h +++ b/IOPool/Input/src/RootEmbeddedFileSequence.h @@ -39,7 +39,6 @@ namespace edm { RootEmbeddedFileSequence(RootEmbeddedFileSequence const&) = delete; // Disallow copying and moving RootEmbeddedFileSequence& operator=(RootEmbeddedFileSequence const&) = delete; // Disallow copying and moving - void closeFile_() override; void endJob(); void skipEntries(unsigned int offset); bool readOneEvent( @@ -56,6 +55,7 @@ namespace edm { static void fillDescription(ParameterSetDescription& desc); private: + void closeFile_() override; void initFile_(bool skipBadFiles) override; RootFileSharedPtr makeRootFile(std::shared_ptr filePtr) override; diff --git a/IOPool/Input/src/RootInputFileSequence.cc b/IOPool/Input/src/RootInputFileSequence.cc index 3d7354481e7ce..152e8cf0c695d 100644 --- a/IOPool/Input/src/RootInputFileSequence.cc +++ b/IOPool/Input/src/RootInputFileSequence.cc @@ -10,6 +10,8 @@ #include "FWCore/ParameterSet/interface/ParameterSet.h" #include "FWCore/ParameterSet/interface/ParameterSetDescription.h" #include "Utilities/StorageFactory/interface/StorageFactory.h" +#include "Utilities/StorageFactory/interface/StatisticsSenderService.h" +#include "FWCore/ServiceRegistry/interface/Service.h" #include "TSystem.h" @@ -208,7 +210,7 @@ namespace edm { } fileIterLastOpened_ = fileIterEnd_; } - closeFile_(); + closeFile(); if (noMoreFiles()) { // No files specified @@ -243,6 +245,10 @@ namespace edm { { std::unique_ptr sentry( input ? std::make_unique(*input, lfn_, false) : nullptr); + edm::Service service; + if (service.isAvailable()) { + service->openingFile(lfn(), -1); + } for (std::vector::const_iterator it = fNames.begin(); it != fNames.end(); ++it) { try { std::unique_ptr name(gSystem->ExpandPathName(it->c_str())); @@ -288,6 +294,14 @@ namespace edm { } } + void RootInputFileSequence::closeFile() { + edm::Service service; + if (rootFile() and service.isAvailable()) { + service->closedFile(lfn(), usedFallback()); + } + closeFile_(); + } + void RootInputFileSequence::setIndexIntoFile(size_t index) { indexesIntoFiles_[index] = rootFile()->indexIntoFileSharedPtr(); } diff --git a/IOPool/Input/src/RootInputFileSequence.h b/IOPool/Input/src/RootInputFileSequence.h index 3fc18af5d0a21..42eb2c9aa27e4 100644 --- a/IOPool/Input/src/RootInputFileSequence.h +++ b/IOPool/Input/src/RootInputFileSequence.h @@ -54,6 +54,8 @@ namespace edm { std::shared_ptr fileProductRegistry() const; std::shared_ptr fileBranchIDListHelper() const; + void closeFile(); + protected: typedef std::shared_ptr RootFileSharedPtr; void initFile(bool skipBadFiles) { initFile_(skipBadFiles); } diff --git a/IOPool/Input/src/RootPrimaryFileSequence.cc b/IOPool/Input/src/RootPrimaryFileSequence.cc index e502314f0569b..25b647801ec71 100644 --- a/IOPool/Input/src/RootPrimaryFileSequence.cc +++ b/IOPool/Input/src/RootPrimaryFileSequence.cc @@ -69,7 +69,7 @@ namespace edm { RootPrimaryFileSequence::~RootPrimaryFileSequence() {} - void RootPrimaryFileSequence::endJob() { closeFile_(); } + void RootPrimaryFileSequence::endJob() { closeFile(); } std::shared_ptr RootPrimaryFileSequence::readFile_() { std::shared_ptr fileBlock; @@ -246,7 +246,7 @@ namespace edm { // Rewind to before the first event that was read. void RootPrimaryFileSequence::rewind_() { if (!atFirstFile()) { - closeFile_(); + closeFile(); setAtFirstFile(); } if (!rootFile()) { diff --git a/IOPool/Input/src/RootPrimaryFileSequence.h b/IOPool/Input/src/RootPrimaryFileSequence.h index 339c22561b962..81b714c29fed5 100644 --- a/IOPool/Input/src/RootPrimaryFileSequence.h +++ b/IOPool/Input/src/RootPrimaryFileSequence.h @@ -40,7 +40,6 @@ namespace edm { RootPrimaryFileSequence& operator=(RootPrimaryFileSequence const&) = delete; // Disallow copying and moving std::shared_ptr readFile_(); - void closeFile_() override; void endJob(); InputSource::ItemType getNextItemType(RunNumber_t& run, LuminosityBlockNumber_t& lumi, EventNumber_t& event); void skipEventsAtBeginning(int offset); @@ -57,6 +56,7 @@ namespace edm { bool nextFile(); bool previousFile(); void rewindFile(); + void closeFile_() override; int remainingEvents() const; int remainingLuminosityBlocks() const; diff --git a/IOPool/Input/src/RootSecondaryFileSequence.cc b/IOPool/Input/src/RootSecondaryFileSequence.cc index fc802d18e0c52..22dc51f3d1aea 100644 --- a/IOPool/Input/src/RootSecondaryFileSequence.cc +++ b/IOPool/Input/src/RootSecondaryFileSequence.cc @@ -52,7 +52,7 @@ namespace edm { RootSecondaryFileSequence::~RootSecondaryFileSequence() {} - void RootSecondaryFileSequence::endJob() { closeFile_(); } + void RootSecondaryFileSequence::endJob() { closeFile(); } void RootSecondaryFileSequence::closeFile_() { // close the currently open file, if any, and delete the RootFile object. diff --git a/IOPool/Input/src/RootSecondaryFileSequence.h b/IOPool/Input/src/RootSecondaryFileSequence.h index 6ff9be7c5f884..5ad6e3516f4a5 100644 --- a/IOPool/Input/src/RootSecondaryFileSequence.h +++ b/IOPool/Input/src/RootSecondaryFileSequence.h @@ -33,11 +33,11 @@ namespace edm { RootSecondaryFileSequence(RootSecondaryFileSequence const&) = delete; // Disallow copying and moving RootSecondaryFileSequence& operator=(RootSecondaryFileSequence const&) = delete; // Disallow copying and moving - void closeFile_() override; void endJob(); void initAssociationsFromSecondary(std::set const&); private: + void closeFile_() override; void initFile_(bool skipBadFiles) override; RootFileSharedPtr makeRootFile(std::shared_ptr filePtr) override; diff --git a/Utilities/StorageFactory/interface/StatisticsSenderService.h b/Utilities/StorageFactory/interface/StatisticsSenderService.h index 940defc861283..61bfd55517136 100644 --- a/Utilities/StorageFactory/interface/StatisticsSenderService.h +++ b/Utilities/StorageFactory/interface/StatisticsSenderService.h @@ -28,13 +28,14 @@ namespace edm { void closedFile(std::string const& lfn, bool usedFallback); private: - void filePreCloseEvent(std::string const& lfn, bool usedFallback); + void filePostCloseEvent(std::string const& lfn, bool usedFallback); std::string const* matchedLfn(std::string const& iURL); //updates its internal cache class FileStatistics { public: FileStatistics(); - void fillUDP(std::ostringstream& os); + void fillUDP(std::ostringstream& os) const; + void update(); private: ssize_t m_read_single_operations; @@ -53,13 +54,13 @@ namespace edm { std::string m_filelfn; std::string m_serverhost; std::string m_serverdomain; - ssize_t m_size; + std::atomic m_size; size_t m_id; //from m_counter std::atomic m_openCount; }; void determineHostnames(); - void fillUDP(const std::string& site, const FileInfo& fileinfo, bool, std::string&); + void fillUDP(const std::string& site, const FileInfo& fileinfo, bool, std::string&) const; void cleanupOldFiles(); std::string m_clienthost; diff --git a/Utilities/StorageFactory/src/StatisticsSenderService.cc b/Utilities/StorageFactory/src/StatisticsSenderService.cc index d40162d89b514..32fb96fb70068 100644 --- a/Utilities/StorageFactory/src/StatisticsSenderService.cc +++ b/Utilities/StorageFactory/src/StatisticsSenderService.cc @@ -16,11 +16,7 @@ #include #include -#define UPDATE_STATISTIC(x) m_##x = x; - -#define UPDATE_AND_OUTPUT_STATISTIC(x) \ - os << "\"" #x "\":" << (x - m_##x) << ", "; \ - UPDATE_STATISTIC(x) +#define OUTPUT_STATISTIC(x) os << "\"" #x "\":" << (x - m_##x) << ", "; // Simple hack to define HOST_NAME_MAX on Mac. // Allows arrays to be statically allocated @@ -28,8 +24,8 @@ #define HOST_NAME_MAX 128 #endif -#define JOB_UNIQUE_ID_ENV "CRAB_UNIQUE_JOB_ID" -#define JOB_UNIQUE_ID_ENV_V2 "DashboardJobId" +static constexpr char const *const JOB_UNIQUE_ID_ENV = "CRAB_UNIQUE_JOB_ID"; +static constexpr char const *const JOB_UNIQUE_ID_ENV_V2 = "DashboardJobId"; using namespace edm::storage; @@ -44,7 +40,7 @@ StatisticsSenderService::FileStatistics::FileStatistics() m_read_vector_count_square(0), m_start_time(time(nullptr)) {} -void StatisticsSenderService::FileStatistics::fillUDP(std::ostringstream &os) { +void StatisticsSenderService::FileStatistics::fillUDP(std::ostringstream &os) const { const StorageAccount::StorageStats &stats = StorageAccount::summary(); ssize_t read_single_operations = 0; ssize_t read_single_bytes = 0; @@ -84,7 +80,6 @@ void StatisticsSenderService::FileStatistics::fillUDP(std::ostringstream &os) { << ", "; os << "\"read_single_average\":" << single_average << ", "; } - m_read_single_square = read_single_square; int64_t vector_op_count = read_vector_operations - m_read_vector_operations; if (vector_op_count > 0) { double vector_average = @@ -104,9 +99,6 @@ void StatisticsSenderService::FileStatistics::fillUDP(std::ostringstream &os) { static_cast(vector_op_count))) << ", "; } - m_read_vector_square = read_vector_square; - m_read_vector_count_square = read_vector_count_square; - m_read_vector_count_sum = read_vector_count_sum; os << "\"read_bytes\":" << (read_vector_bytes + read_single_bytes - m_read_vector_bytes - m_read_single_bytes) << ", "; @@ -114,17 +106,56 @@ void StatisticsSenderService::FileStatistics::fillUDP(std::ostringstream &os) { << (read_vector_bytes + read_single_bytes - m_read_vector_bytes - m_read_single_bytes) << ", "; // See top of file for macros; not complex, just avoiding copy/paste - UPDATE_AND_OUTPUT_STATISTIC(read_single_operations) - UPDATE_AND_OUTPUT_STATISTIC(read_single_bytes) - UPDATE_AND_OUTPUT_STATISTIC(read_vector_operations) - UPDATE_AND_OUTPUT_STATISTIC(read_vector_bytes) + OUTPUT_STATISTIC(read_single_operations) + OUTPUT_STATISTIC(read_single_bytes) + OUTPUT_STATISTIC(read_vector_operations) + OUTPUT_STATISTIC(read_vector_bytes) os << "\"start_time\":" << m_start_time << ", "; - m_start_time = time(nullptr); // NOTE: last entry doesn't have the trailing comma. - os << "\"end_time\":" << m_start_time; + os << "\"end_time\":" << time(nullptr); } +void StatisticsSenderService::FileStatistics::update() { + const StorageAccount::StorageStats &stats = StorageAccount::summary(); + ssize_t read_single_operations = 0; + ssize_t read_single_bytes = 0; + ssize_t read_single_square = 0; + ssize_t read_vector_operations = 0; + ssize_t read_vector_bytes = 0; + ssize_t read_vector_square = 0; + ssize_t read_vector_count_sum = 0; + ssize_t read_vector_count_square = 0; + auto token = StorageAccount::tokenForStorageClassName("tstoragefile"); + for (StorageAccount::StorageStats::const_iterator i = stats.begin(); i != stats.end(); ++i) { + if (i->first == token.value()) { + continue; + } + for (StorageAccount::OperationStats::const_iterator j = i->second.begin(); j != i->second.end(); ++j) { + if (j->first == static_cast(StorageAccount::Operation::readv)) { + read_vector_operations += j->second.attempts; + read_vector_bytes += j->second.amount; + read_vector_count_square += j->second.vector_square; + read_vector_square += j->second.amount_square; + read_vector_count_sum += j->second.vector_count; + } else if (j->first == static_cast(StorageAccount::Operation::read)) { + read_single_operations += j->second.attempts; + read_single_bytes += j->second.amount; + read_single_square += j->second.amount_square; + } + } + } + + m_read_single_square = read_single_square; + m_read_vector_square = read_vector_square; + m_read_vector_count_square = read_vector_count_square; + m_read_vector_count_sum = read_vector_count_sum; + m_read_single_operations = read_single_operations; + m_read_single_bytes = read_single_bytes; + m_read_vector_operations = read_vector_operations; + m_read_vector_bytes = read_vector_bytes; + m_start_time = time(nullptr); +} StatisticsSenderService::FileInfo::FileInfo(std::string const &iLFN) : m_filelfn(iLFN), m_serverhost("unknown"), m_serverdomain("unknown"), m_size(-1), m_id(0), m_openCount(1) {} @@ -137,8 +168,7 @@ StatisticsSenderService::StatisticsSenderService(edm::ParameterSet const &iPSet, m_userdn("unknown"), m_debug(iPSet.getUntrackedParameter("debug", false)) { determineHostnames(); - ar.watchPreCloseFile(this, &StatisticsSenderService::filePreCloseEvent); - ar.watchPreOpenFile([this](auto const &iLFN, bool) { openingFile(iLFN, -1); }); + ar.watchPostCloseFile(this, &StatisticsSenderService::filePostCloseEvent); if (!getX509Subject(m_userdn)) { m_userdn = "unknown"; } @@ -304,7 +334,6 @@ void StatisticsSenderService::setSize(const std::string &url, size_t size) { if (nullptr != lfn) { auto itFound = m_lfnToFileInfo.find(*lfn); if (itFound != m_lfnToFileInfo.end()) { - //do I need to synchronize? itFound->second.m_size = size; } } else if (m_debug) { @@ -312,10 +341,10 @@ void StatisticsSenderService::setSize(const std::string &url, size_t size) { } } -void StatisticsSenderService::filePreCloseEvent(std::string const &lfn, bool usedFallback) { - closedFile(lfn, usedFallback); +void StatisticsSenderService::filePostCloseEvent(std::string const &lfn, bool usedFallback) { //we are at a sync point in the framwework so no new files are being opened cleanupOldFiles(); + m_filestats.update(); } void StatisticsSenderService::determineHostnames(void) { @@ -338,7 +367,7 @@ void StatisticsSenderService::determineHostnames(void) { void StatisticsSenderService::fillUDP(const std::string &siteName, const FileInfo &fileinfo, bool usedFallback, - std::string &udpinfo) { + std::string &udpinfo) const { std::ostringstream os; // Header - same for all IO accesses From 86edd4a1e77c803a1d981e4e443ca7efe1c50d9f Mon Sep 17 00:00:00 2001 From: Chris Jones Date: Fri, 1 Oct 2021 09:46:20 -0500 Subject: [PATCH 4/6] Changed SecondaryProducer to be a one::EDProducer --- IOPool/SecondaryInput/test/SecondaryProducer.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/IOPool/SecondaryInput/test/SecondaryProducer.h b/IOPool/SecondaryInput/test/SecondaryProducer.h index 7d3086649deb3..22cb7ee1832d6 100644 --- a/IOPool/SecondaryInput/test/SecondaryProducer.h +++ b/IOPool/SecondaryInput/test/SecondaryProducer.h @@ -9,7 +9,7 @@ ************************************************************/ #include "DataFormats/Provenance/interface/EventID.h" -#include "FWCore/Framework/interface/EDProducer.h" +#include "FWCore/Framework/interface/one/EDProducer.h" #include "FWCore/Utilities/interface/get_underlying_safe.h" #include @@ -19,7 +19,7 @@ namespace edm { class ProcessConfiguration; class VectorInputSource; - class SecondaryProducer : public EDProducer { + class SecondaryProducer : public one::EDProducer<> { public: /** standard constructor*/ explicit SecondaryProducer(ParameterSet const& pset); From 7786d20c86a6fc6576d187d8c21b61235cbaa4ea Mon Sep 17 00:00:00 2001 From: Chris Jones Date: Fri, 1 Oct 2021 09:47:24 -0500 Subject: [PATCH 5/6] Added unit test for StatisticsSenderService --- Utilities/StorageFactory/test/BuildFile.xml | 1 + .../StorageFactory/test/make_2nd_file_cfg.py | 10 +++++ .../test/make_test_files_cfg.py | 20 +++++++++ .../test/test_file_statistics_sender.sh | 41 +++++++++++++++++ .../test_multi_file_statistics_sender_cfg.py | 10 +++++ ...ltiple_files_file_statistics_sender_cfg.py | 10 +++++ ...st_secondary_file_statistics_sender_cfg.py | 44 +++++++++++++++++++ .../test_single_file_statistics_sender_cfg.py | 11 +++++ 8 files changed, 147 insertions(+) create mode 100644 Utilities/StorageFactory/test/make_2nd_file_cfg.py create mode 100644 Utilities/StorageFactory/test/make_test_files_cfg.py create mode 100755 Utilities/StorageFactory/test/test_file_statistics_sender.sh create mode 100644 Utilities/StorageFactory/test/test_multi_file_statistics_sender_cfg.py create mode 100644 Utilities/StorageFactory/test/test_multiple_files_file_statistics_sender_cfg.py create mode 100644 Utilities/StorageFactory/test/test_secondary_file_statistics_sender_cfg.py create mode 100644 Utilities/StorageFactory/test/test_single_file_statistics_sender_cfg.py diff --git a/Utilities/StorageFactory/test/BuildFile.xml b/Utilities/StorageFactory/test/BuildFile.xml index 1ef9d5b865f8f..25578023ce536 100644 --- a/Utilities/StorageFactory/test/BuildFile.xml +++ b/Utilities/StorageFactory/test/BuildFile.xml @@ -42,6 +42,7 @@ +