diff --git a/IOPool/Input/src/EmbeddedRootSource.cc b/IOPool/Input/src/EmbeddedRootSource.cc index bc9bf251666f1..f71854d29ad2f 100644 --- a/IOPool/Input/src/EmbeddedRootSource.cc +++ b/IOPool/Input/src/EmbeddedRootSource.cc @@ -45,7 +45,7 @@ namespace edm { InputFile::reportReadBranches(); } - void EmbeddedRootSource::closeFile_() { fileSequence_->closeFile_(); } + void EmbeddedRootSource::closeFile_() { fileSequence_->closeFile(); } bool EmbeddedRootSource::readOneEvent(EventPrincipal& cache, size_t& fileNameHash, diff --git a/IOPool/Input/src/PoolSource.cc b/IOPool/Input/src/PoolSource.cc index db20150593168..03d5772e82aed 100644 --- a/IOPool/Input/src/PoolSource.cc +++ b/IOPool/Input/src/PoolSource.cc @@ -162,7 +162,7 @@ namespace edm { return fb; } - void PoolSource::closeFile_() { primaryFileSequence_->closeFile_(); } + void PoolSource::closeFile_() { primaryFileSequence_->closeFile(); } std::shared_ptr PoolSource::readRunAuxiliary_() { return primaryFileSequence_->readRunAuxiliary_(); } diff --git a/IOPool/Input/src/RootEmbeddedFileSequence.cc b/IOPool/Input/src/RootEmbeddedFileSequence.cc index e3c9217158480..b5fce11b6dfe0 100644 --- a/IOPool/Input/src/RootEmbeddedFileSequence.cc +++ b/IOPool/Input/src/RootEmbeddedFileSequence.cc @@ -108,7 +108,7 @@ namespace edm { RootEmbeddedFileSequence::~RootEmbeddedFileSequence() {} - void RootEmbeddedFileSequence::endJob() { closeFile_(); } + void RootEmbeddedFileSequence::endJob() { closeFile(); } void RootEmbeddedFileSequence::closeFile_() { // delete the RootFile object. diff --git a/IOPool/Input/src/RootEmbeddedFileSequence.h b/IOPool/Input/src/RootEmbeddedFileSequence.h index 2480ef8e19d43..1c95097cd8468 100644 --- a/IOPool/Input/src/RootEmbeddedFileSequence.h +++ b/IOPool/Input/src/RootEmbeddedFileSequence.h @@ -39,7 +39,6 @@ namespace edm { RootEmbeddedFileSequence(RootEmbeddedFileSequence const&) = delete; // Disallow copying and moving RootEmbeddedFileSequence& operator=(RootEmbeddedFileSequence const&) = delete; // Disallow copying and moving - void closeFile_() override; void endJob(); void skipEntries(unsigned int offset); bool readOneEvent( @@ -56,6 +55,7 @@ namespace edm { static void fillDescription(ParameterSetDescription& desc); private: + void closeFile_() override; void initFile_(bool skipBadFiles) override; RootFileSharedPtr makeRootFile(std::shared_ptr filePtr) override; diff --git a/IOPool/Input/src/RootInputFileSequence.cc b/IOPool/Input/src/RootInputFileSequence.cc index 4cd78e40909cd..6bba8eb2a2fd9 100644 --- a/IOPool/Input/src/RootInputFileSequence.cc +++ b/IOPool/Input/src/RootInputFileSequence.cc @@ -10,6 +10,8 @@ #include "FWCore/ParameterSet/interface/ParameterSet.h" #include "FWCore/ParameterSet/interface/ParameterSetDescription.h" #include "Utilities/StorageFactory/interface/StorageFactory.h" +#include "Utilities/StorageFactory/interface/StatisticsSenderService.h" +#include "FWCore/ServiceRegistry/interface/Service.h" #include "TSystem.h" @@ -192,7 +194,7 @@ namespace edm { } fileIterLastOpened_ = fileIterEnd_; } - closeFile_(); + closeFile(); if (noMoreFiles()) { // No files specified @@ -224,58 +226,61 @@ namespace edm { std::shared_ptr filePtr; std::list originalInfo; - try { + { std::unique_ptr sentry( - input ? std::make_unique(*input, lfn_, usedFallback_) : nullptr); - std::unique_ptr name(gSystem->ExpandPathName(fileName().c_str())); - ; - filePtr = std::make_shared(name.get(), " Initiating request to open file ", inputType); - } catch (cms::Exception const& e) { - if (!skipBadFiles) { - if (hasFallbackUrl) { - std::ostringstream out; - out << e.explainSelf(); - - std::unique_ptr name(gSystem->ExpandPathName(fallbackFileName().c_str())); - std::string pfn(name.get()); - InputFile::reportFallbackAttempt(pfn, logicalFileName(), out.str()); - originalInfo = e.additionalInfo(); - } else { - InputFile::reportSkippedFile(fileName(), logicalFileName()); - Exception ex(errors::FileOpenError, "", e); - ex.addContext("Calling RootFileSequenceBase::initTheFile()"); - std::ostringstream out; - out << "Input file " << fileName() << " could not be opened."; - ex.addAdditionalInfo(out.str()); - throw ex; - } + input ? std::make_unique(*input, lfn_, false) : nullptr); + edm::Service service; + if (service.isAvailable()) { + service->openingFile(lfn(), inputType, -1); } - } - if (!filePtr && (hasFallbackUrl)) { try { - usedFallback_ = true; - std::unique_ptr sentry( - input ? std::make_unique(*input, lfn_, usedFallback_) : nullptr); - std::unique_ptr fallbackFullName(gSystem->ExpandPathName(fallbackFileName().c_str())); - filePtr.reset(new InputFile(fallbackFullName.get(), " Fallback request to file ", inputType)); + std::unique_ptr name(gSystem->ExpandPathName(fileName().c_str())); + filePtr = std::make_shared(name.get(), " Initiating request to open file ", inputType); } catch (cms::Exception const& e) { if (!skipBadFiles) { - InputFile::reportSkippedFile(fileName(), logicalFileName()); - Exception ex(errors::FallbackFileOpenError, "", e); - ex.addContext("Calling RootFileSequenceBase::initTheFile()"); - std::ostringstream out; - out << "Input file " << fileName() << " could not be opened.\n"; - out << "Fallback Input file " << fallbackFileName() << " also could not be opened."; - if (!originalInfo.empty()) { - out << std::endl << "Original exception info is above; fallback exception info is below."; - ex.addAdditionalInfo(out.str()); - for (auto const& s : originalInfo) { - ex.addAdditionalInfo(s); - } + if (hasFallbackUrl) { + std::ostringstream out; + out << e.explainSelf(); + + std::unique_ptr name(gSystem->ExpandPathName(fallbackFileName().c_str())); + std::string pfn(name.get()); + InputFile::reportFallbackAttempt(pfn, logicalFileName(), out.str()); + originalInfo = e.additionalInfo(); } else { + InputFile::reportSkippedFile(fileName(), logicalFileName()); + Exception ex(errors::FileOpenError, "", e); + ex.addContext("Calling RootFileSequenceBase::initTheFile()"); + std::ostringstream out; + out << "Input file " << fileName() << " could not be opened."; ex.addAdditionalInfo(out.str()); + throw ex; + } + } + } + if (!filePtr && (hasFallbackUrl)) { + try { + usedFallback_ = true; + std::unique_ptr fallbackFullName(gSystem->ExpandPathName(fallbackFileName().c_str())); + filePtr.reset(new InputFile(fallbackFullName.get(), " Fallback request to file ", inputType)); + } catch (cms::Exception const& e) { + if (!skipBadFiles) { + InputFile::reportSkippedFile(fileName(), logicalFileName()); + Exception ex(errors::FallbackFileOpenError, "", e); + ex.addContext("Calling RootFileSequenceBase::initTheFile()"); + std::ostringstream out; + out << "Input file " << fileName() << " could not be opened.\n"; + out << "Fallback Input file " << fallbackFileName() << " also could not be opened."; + if (!originalInfo.empty()) { + out << std::endl << "Original exception info is above; fallback exception info is below."; + ex.addAdditionalInfo(out.str()); + for (auto const& s : originalInfo) { + ex.addAdditionalInfo(s); + } + } else { + ex.addAdditionalInfo(out.str()); + } + throw ex; } - throw ex; } } } @@ -299,6 +304,14 @@ namespace edm { } } + void RootInputFileSequence::closeFile() { + edm::Service service; + if (rootFile() and service.isAvailable()) { + service->closedFile(lfn(), usedFallback()); + } + closeFile_(); + } + void RootInputFileSequence::setIndexIntoFile(size_t index) { indexesIntoFiles_[index] = rootFile()->indexIntoFileSharedPtr(); } diff --git a/IOPool/Input/src/RootInputFileSequence.h b/IOPool/Input/src/RootInputFileSequence.h index bf91576e3a4fb..ae132f6971eea 100644 --- a/IOPool/Input/src/RootInputFileSequence.h +++ b/IOPool/Input/src/RootInputFileSequence.h @@ -48,6 +48,8 @@ namespace edm { std::shared_ptr fileProductRegistry() const; std::shared_ptr fileBranchIDListHelper() const; + void closeFile(); + protected: typedef std::shared_ptr RootFileSharedPtr; void initFile(bool skipBadFiles) { initFile_(skipBadFiles); } diff --git a/IOPool/Input/src/RootPrimaryFileSequence.cc b/IOPool/Input/src/RootPrimaryFileSequence.cc index 48ecd0115df87..7492316373ee6 100644 --- a/IOPool/Input/src/RootPrimaryFileSequence.cc +++ b/IOPool/Input/src/RootPrimaryFileSequence.cc @@ -68,7 +68,7 @@ namespace edm { RootPrimaryFileSequence::~RootPrimaryFileSequence() {} - void RootPrimaryFileSequence::endJob() { closeFile_(); } + void RootPrimaryFileSequence::endJob() { closeFile(); } std::unique_ptr RootPrimaryFileSequence::readFile_() { if (firstFile_) { @@ -212,7 +212,7 @@ namespace edm { // Rewind to before the first event that was read. void RootPrimaryFileSequence::rewind_() { if (!atFirstFile()) { - closeFile_(); + closeFile(); setAtFirstFile(); } if (!rootFile()) { diff --git a/IOPool/Input/src/RootPrimaryFileSequence.h b/IOPool/Input/src/RootPrimaryFileSequence.h index 903e1001c82f5..f2390be10df64 100644 --- a/IOPool/Input/src/RootPrimaryFileSequence.h +++ b/IOPool/Input/src/RootPrimaryFileSequence.h @@ -40,7 +40,6 @@ namespace edm { RootPrimaryFileSequence& operator=(RootPrimaryFileSequence const&) = delete; // Disallow copying and moving std::unique_ptr readFile_(); - void closeFile_() override; void endJob(); InputSource::ItemType getNextItemType(RunNumber_t& run, LuminosityBlockNumber_t& lumi, EventNumber_t& event); bool skipEvents(int offset); @@ -56,6 +55,7 @@ namespace edm { bool nextFile(); bool previousFile(); void rewindFile(); + void closeFile_() override; int remainingEvents() const; int remainingLuminosityBlocks() const; diff --git a/IOPool/Input/src/RootSecondaryFileSequence.cc b/IOPool/Input/src/RootSecondaryFileSequence.cc index e0f14147c48c0..9d70a6c787199 100644 --- a/IOPool/Input/src/RootSecondaryFileSequence.cc +++ b/IOPool/Input/src/RootSecondaryFileSequence.cc @@ -52,7 +52,7 @@ namespace edm { RootSecondaryFileSequence::~RootSecondaryFileSequence() {} - void RootSecondaryFileSequence::endJob() { closeFile_(); } + void RootSecondaryFileSequence::endJob() { closeFile(); } void RootSecondaryFileSequence::closeFile_() { // close the currently open file, if any, and delete the RootFile object. diff --git a/IOPool/Input/src/RootSecondaryFileSequence.h b/IOPool/Input/src/RootSecondaryFileSequence.h index 6ff9be7c5f884..5ad6e3516f4a5 100644 --- a/IOPool/Input/src/RootSecondaryFileSequence.h +++ b/IOPool/Input/src/RootSecondaryFileSequence.h @@ -33,11 +33,11 @@ namespace edm { RootSecondaryFileSequence(RootSecondaryFileSequence const&) = delete; // Disallow copying and moving RootSecondaryFileSequence& operator=(RootSecondaryFileSequence const&) = delete; // Disallow copying and moving - void closeFile_() override; void endJob(); void initAssociationsFromSecondary(std::set const&); private: + void closeFile_() override; void initFile_(bool skipBadFiles) override; RootFileSharedPtr makeRootFile(std::shared_ptr filePtr) override; diff --git a/IOPool/SecondaryInput/test/SecondaryProducer.h b/IOPool/SecondaryInput/test/SecondaryProducer.h index 7d3086649deb3..22cb7ee1832d6 100644 --- a/IOPool/SecondaryInput/test/SecondaryProducer.h +++ b/IOPool/SecondaryInput/test/SecondaryProducer.h @@ -9,7 +9,7 @@ ************************************************************/ #include "DataFormats/Provenance/interface/EventID.h" -#include "FWCore/Framework/interface/EDProducer.h" +#include "FWCore/Framework/interface/one/EDProducer.h" #include "FWCore/Utilities/interface/get_underlying_safe.h" #include @@ -19,7 +19,7 @@ namespace edm { class ProcessConfiguration; class VectorInputSource; - class SecondaryProducer : public EDProducer { + class SecondaryProducer : public one::EDProducer<> { public: /** standard constructor*/ explicit SecondaryProducer(ParameterSet const& pset); diff --git a/IOPool/TFileAdaptor/src/TStorageFactoryFile.cc b/IOPool/TFileAdaptor/src/TStorageFactoryFile.cc index 5a8622d75d755..405453c42b876 100644 --- a/IOPool/TFileAdaptor/src/TStorageFactoryFile.cc +++ b/IOPool/TFileAdaptor/src/TStorageFactoryFile.cc @@ -195,7 +195,7 @@ void TStorageFactoryFile::Initialize(const char *path, Option_t *option /* = "" try { edm::Service statsService; if (statsService.isAvailable()) { - statsService->setSize(storage_->size()); + statsService->setSize(path, storage_->size()); } } catch (edm::Exception const &e) { if (e.categoryCode() != edm::errors::NotFound) { diff --git a/Utilities/StorageFactory/interface/StatisticsSenderService.h b/Utilities/StorageFactory/interface/StatisticsSenderService.h index 24651dcc9f9c2..25d715e035abb 100644 --- a/Utilities/StorageFactory/interface/StatisticsSenderService.h +++ b/Utilities/StorageFactory/interface/StatisticsSenderService.h @@ -6,6 +6,8 @@ #include #include #include +#include +#include "FWCore/Utilities/interface/InputType.h" namespace edm { @@ -16,19 +18,25 @@ namespace edm { class StatisticsSenderService { public: - StatisticsSenderService(edm::ParameterSet const &pset, edm::ActivityRegistry &ar); + StatisticsSenderService(edm::ParameterSet const& pset, edm::ActivityRegistry& ar); - void setSize(size_t size); - void setCurrentServer(const std::string &servername); - void filePreCloseEvent(std::string const &lfn, bool usedFallback); - static const char *getJobID(); - static bool getX509Subject(std::string &); + void setSize(const std::string& urlOrLfn, size_t size); + void setCurrentServer(const std::string& urlOrLfn, const std::string& servername); + static const char* getJobID(); + static bool getX509Subject(std::string&); + + void openingFile(std::string const& lfn, edm::InputType type, size_t size = -1); + void closedFile(std::string const& lfn, bool usedFallback); private: + void filePostCloseEvent(std::string const& lfn, bool usedFallback); + + std::string const* matchedLfn(std::string const& iURL); //updates its internal cache class FileStatistics { public: FileStatistics(); - void fillUDP(std::ostringstream &os); + void fillUDP(std::ostringstream& os) const; + void update(); private: ssize_t m_read_single_operations; @@ -42,19 +50,40 @@ namespace edm { time_t m_start_time; }; - void determineHostnames(void); - void fillUDP(const std::string &, bool, std::string &); + struct FileInfo { + explicit FileInfo(std::string const& iLFN, edm::InputType); + + FileInfo(FileInfo&& iInfo) + : m_filelfn(std::move(iInfo.m_filelfn)), + m_serverhost(std::move(iInfo.m_serverhost)), + m_serverdomain(std::move(iInfo.m_serverdomain)), + m_type(iInfo.m_type), + m_size(iInfo.m_size.load()), + m_id(iInfo.m_id), + m_openCount(iInfo.m_openCount.load()) {} + std::string m_filelfn; + std::string m_serverhost; + std::string m_serverdomain; + edm::InputType m_type; + std::atomic m_size; + size_t m_id; //from m_counter + std::atomic m_openCount; + }; + + void determineHostnames(); + void fillUDP(const std::string& site, const FileInfo& fileinfo, bool, std::string&) const; + void cleanupOldFiles(); + std::string m_clienthost; std::string m_clientdomain; - std::string m_serverhost; - std::string m_serverdomain; - std::string m_filelfn; + tbb::concurrent_unordered_map m_lfnToFileInfo; + tbb::concurrent_unordered_map m_urlToLfn; FileStatistics m_filestats; std::string m_guid; size_t m_counter; - std::atomic m_size; std::string m_userdn; std::mutex m_servermutex; + const bool m_debug; }; } // namespace storage diff --git a/Utilities/StorageFactory/src/StatisticsSenderService.cc b/Utilities/StorageFactory/src/StatisticsSenderService.cc index b627a8a5067b0..0e96624fe1ef0 100644 --- a/Utilities/StorageFactory/src/StatisticsSenderService.cc +++ b/Utilities/StorageFactory/src/StatisticsSenderService.cc @@ -4,6 +4,7 @@ #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h" #include "FWCore/Catalog/interface/SiteLocalConfig.h" #include "FWCore/ServiceRegistry/interface/Service.h" +#include "FWCore/MessageLogger/interface/MessageLogger.h" #include "FWCore/Utilities/src/Guid.h" #include @@ -15,11 +16,7 @@ #include #include -#define UPDATE_STATISTIC(x) m_##x = x; - -#define UPDATE_AND_OUTPUT_STATISTIC(x) \ - os << "\"" #x "\":" << (x - m_##x) << ", "; \ - UPDATE_STATISTIC(x) +#define OUTPUT_STATISTIC(x) os << "\"" #x "\":" << (x - m_##x) << ", "; // Simple hack to define HOST_NAME_MAX on Mac. // Allows arrays to be statically allocated @@ -27,8 +24,8 @@ #define HOST_NAME_MAX 128 #endif -#define JOB_UNIQUE_ID_ENV "CRAB_UNIQUE_JOB_ID" -#define JOB_UNIQUE_ID_ENV_V2 "DashboardJobId" +static constexpr char const *const JOB_UNIQUE_ID_ENV = "CRAB_UNIQUE_JOB_ID"; +static constexpr char const *const JOB_UNIQUE_ID_ENV_V2 = "DashboardJobId"; using namespace edm::storage; @@ -43,7 +40,7 @@ StatisticsSenderService::FileStatistics::FileStatistics() m_read_vector_count_square(0), m_start_time(time(nullptr)) {} -void StatisticsSenderService::FileStatistics::fillUDP(std::ostringstream &os) { +void StatisticsSenderService::FileStatistics::fillUDP(std::ostringstream &os) const { const StorageAccount::StorageStats &stats = StorageAccount::summary(); ssize_t read_single_operations = 0; ssize_t read_single_bytes = 0; @@ -77,35 +74,31 @@ void StatisticsSenderService::FileStatistics::fillUDP(std::ostringstream &os) { double single_sum = read_single_bytes - m_read_single_bytes; double single_average = single_sum / static_cast(single_op_count); os << "\"read_single_sigma\":" - << sqrt((static_cast(read_single_square - m_read_single_square) - - single_average * single_average * single_op_count) / - static_cast(single_op_count)) + << sqrt(std::abs((static_cast(read_single_square - m_read_single_square) - + single_average * single_average * single_op_count) / + static_cast(single_op_count))) << ", "; os << "\"read_single_average\":" << single_average << ", "; } - m_read_single_square = read_single_square; int64_t vector_op_count = read_vector_operations - m_read_vector_operations; if (vector_op_count > 0) { double vector_average = static_cast(read_vector_bytes - m_read_vector_bytes) / static_cast(vector_op_count); os << "\"read_vector_average\":" << vector_average << ", "; os << "\"read_vector_sigma\":" - << sqrt((static_cast(read_vector_square - m_read_vector_square) - - vector_average * vector_average * vector_op_count) / - static_cast(vector_op_count)) + << sqrt(std::abs((static_cast(read_vector_square - m_read_vector_square) - + vector_average * vector_average * vector_op_count) / + static_cast(vector_op_count))) << ", "; double vector_count_average = static_cast(read_vector_count_sum - m_read_vector_count_sum) / static_cast(vector_op_count); os << "\"read_vector_count_average\":" << vector_count_average << ", "; os << "\"read_vector_count_sigma\":" - << sqrt((static_cast(read_vector_count_square - m_read_vector_count_square) - - vector_count_average * vector_count_average * vector_op_count) / - static_cast(vector_op_count)) + << sqrt(std::abs((static_cast(read_vector_count_square - m_read_vector_count_square) - + vector_count_average * vector_count_average * vector_op_count) / + static_cast(vector_op_count))) << ", "; } - m_read_vector_square = read_vector_square; - m_read_vector_count_square = read_vector_count_square; - m_read_vector_count_sum = read_vector_count_sum; os << "\"read_bytes\":" << (read_vector_bytes + read_single_bytes - m_read_vector_bytes - m_read_single_bytes) << ", "; @@ -113,30 +106,75 @@ void StatisticsSenderService::FileStatistics::fillUDP(std::ostringstream &os) { << (read_vector_bytes + read_single_bytes - m_read_vector_bytes - m_read_single_bytes) << ", "; // See top of file for macros; not complex, just avoiding copy/paste - UPDATE_AND_OUTPUT_STATISTIC(read_single_operations) - UPDATE_AND_OUTPUT_STATISTIC(read_single_bytes) - UPDATE_AND_OUTPUT_STATISTIC(read_vector_operations) - UPDATE_AND_OUTPUT_STATISTIC(read_vector_bytes) + OUTPUT_STATISTIC(read_single_operations) + OUTPUT_STATISTIC(read_single_bytes) + OUTPUT_STATISTIC(read_vector_operations) + OUTPUT_STATISTIC(read_vector_bytes) os << "\"start_time\":" << m_start_time << ", "; - m_start_time = time(nullptr); // NOTE: last entry doesn't have the trailing comma. - os << "\"end_time\":" << m_start_time; + os << "\"end_time\":" << time(nullptr); } -StatisticsSenderService::StatisticsSenderService(edm::ParameterSet const & /*pset*/, edm::ActivityRegistry &ar) - : m_clienthost("unknown"), - m_clientdomain("unknown"), +void StatisticsSenderService::FileStatistics::update() { + const StorageAccount::StorageStats &stats = StorageAccount::summary(); + ssize_t read_single_operations = 0; + ssize_t read_single_bytes = 0; + ssize_t read_single_square = 0; + ssize_t read_vector_operations = 0; + ssize_t read_vector_bytes = 0; + ssize_t read_vector_square = 0; + ssize_t read_vector_count_sum = 0; + ssize_t read_vector_count_square = 0; + auto token = StorageAccount::tokenForStorageClassName("tstoragefile"); + for (StorageAccount::StorageStats::const_iterator i = stats.begin(); i != stats.end(); ++i) { + if (i->first == token.value()) { + continue; + } + for (StorageAccount::OperationStats::const_iterator j = i->second.begin(); j != i->second.end(); ++j) { + if (j->first == static_cast(StorageAccount::Operation::readv)) { + read_vector_operations += j->second.attempts; + read_vector_bytes += j->second.amount; + read_vector_count_square += j->second.vector_square; + read_vector_square += j->second.amount_square; + read_vector_count_sum += j->second.vector_count; + } else if (j->first == static_cast(StorageAccount::Operation::read)) { + read_single_operations += j->second.attempts; + read_single_bytes += j->second.amount; + read_single_square += j->second.amount_square; + } + } + } + + m_read_single_square = read_single_square; + m_read_vector_square = read_vector_square; + m_read_vector_count_square = read_vector_count_square; + m_read_vector_count_sum = read_vector_count_sum; + m_read_single_operations = read_single_operations; + m_read_single_bytes = read_single_bytes; + m_read_vector_operations = read_vector_operations; + m_read_vector_bytes = read_vector_bytes; + m_start_time = time(nullptr); +} +StatisticsSenderService::FileInfo::FileInfo(std::string const &iLFN, edm::InputType iType) + : m_filelfn(iLFN), m_serverhost("unknown"), m_serverdomain("unknown"), - m_filelfn("unknown"), + m_type(iType), + m_size(-1), + m_id(0), + m_openCount(1) {} + +StatisticsSenderService::StatisticsSenderService(edm::ParameterSet const &iPSet, edm::ActivityRegistry &ar) + : m_clienthost("unknown"), + m_clientdomain("unknown"), m_filestats(), m_guid(Guid().toString()), m_counter(0), - m_size(-1), - m_userdn("unknown") { + m_userdn("unknown"), + m_debug(iPSet.getUntrackedParameter("debug", false)) { determineHostnames(); - ar.watchPreCloseFile(this, &StatisticsSenderService::filePreCloseEvent); + ar.watchPostCloseFile(this, &StatisticsSenderService::filePostCloseEvent); if (!getX509Subject(m_userdn)) { m_userdn = "unknown"; } @@ -148,8 +186,36 @@ const char *StatisticsSenderService::getJobID() { return id ? id : std::getenv(JOB_UNIQUE_ID_ENV_V2); } -void StatisticsSenderService::setCurrentServer(const std::string &servername) { - size_t dot_pos = servername.find("."); +std::string const *StatisticsSenderService::matchedLfn(std::string const &iURL) { + auto found = m_urlToLfn.find(iURL); + if (found != m_urlToLfn.end()) { + return &found->second; + } + for (auto const &v : m_lfnToFileInfo) { + if (v.first.size() < iURL.size()) { + if (v.first == iURL.substr(iURL.size() - v.first.size())) { + m_urlToLfn.emplace(iURL, v.first); + return &m_urlToLfn.find(iURL)->second; + } + } + } + //does the lfn have a protocol and the iURL not? + if (std::string::npos == iURL.find(':')) { + for (auto const &v : m_lfnToFileInfo) { + if ((std::string::npos != v.first.find(':')) and (v.first.size() > iURL.size())) { + if (iURL == v.first.substr(v.first.size() - iURL.size())) { + m_urlToLfn.emplace(iURL, v.first); + return &m_urlToLfn.find(iURL)->second; + } + } + } + } + + return nullptr; +} + +void StatisticsSenderService::setCurrentServer(const std::string &url, const std::string &servername) { + size_t dot_pos = servername.find('.'); std::string serverhost; std::string serverdomain; if (dot_pos == std::string::npos) { @@ -163,24 +229,41 @@ void StatisticsSenderService::setCurrentServer(const std::string &servername) { } } { + auto lfn = matchedLfn(url); std::lock_guard sentry(m_servermutex); - m_serverhost = std::move(serverhost); - m_serverdomain = std::move(serverdomain); + if (nullptr != lfn) { + auto found = m_lfnToFileInfo.find(*lfn); + if (found != m_lfnToFileInfo.end()) { + found->second.m_serverhost = std::move(serverhost); + found->second.m_serverdomain = std::move(serverdomain); + } + } else if (m_debug) { + edm::LogWarning("StatisticsSenderService") << "setCurrentServer: unknown url name " << url << "\n"; + } } } -void StatisticsSenderService::setSize(size_t size) { m_size = size; } - -void StatisticsSenderService::filePreCloseEvent(std::string const &lfn, bool usedFallback) { - m_filelfn = lfn; +void StatisticsSenderService::openingFile(std::string const &lfn, edm::InputType type, size_t size) { + m_urlToLfn.emplace(lfn, lfn); + auto attempt = m_lfnToFileInfo.emplace(lfn, FileInfo{lfn, type}); + if (attempt.second) { + attempt.first->second.m_size = size; + attempt.first->second.m_id = m_counter++; + edm::LogInfo("StatisticsSenderService") << "openingFile: opening " << lfn << "\n"; + } else { + ++(attempt.first->second.m_openCount); + edm::LogInfo("StatisticsSenderService") << "openingFile: re-opening" << lfn << "\n"; + } +} +void StatisticsSenderService::closedFile(std::string const &url, bool usedFallback) { edm::Service pSLC; if (!pSLC.isAvailable()) { return; } const struct addrinfo *addresses = pSLC->statisticsDestination(); - if (!addresses) { + if (!addresses and !m_debug) { return; } @@ -190,22 +273,86 @@ void StatisticsSenderService::filePreCloseEvent(std::string const &lfn, bool use m_userdn = "not reported"; } - std::string results; - fillUDP(pSLC->siteName(), usedFallback, results); + auto lfn = matchedLfn(url); + if (nullptr != lfn) { + auto found = m_lfnToFileInfo.find(*lfn); + assert(found != m_lfnToFileInfo.end()); - for (const struct addrinfo *address = addresses; address != nullptr; address = address->ai_next) { - int sock = socket(address->ai_family, address->ai_socktype, address->ai_protocol); - if (sock < 0) { - continue; + std::string results; + fillUDP(pSLC->siteName(), found->second, usedFallback, results); + if (m_debug) { + edm::LogSystem("StatisticSenderService") << "\n" << results << "\n"; } - auto close_del = [](int *iSocket) { close(*iSocket); }; - std::unique_ptr guard(&sock, close_del); - if (sendto(sock, results.c_str(), results.size(), 0, address->ai_addr, address->ai_addrlen) >= 0) { - break; + + for (const struct addrinfo *address = addresses; address != nullptr; address = address->ai_next) { + int sock = socket(address->ai_family, address->ai_socktype, address->ai_protocol); + if (sock < 0) { + continue; + } + auto close_del = [](int *iSocket) { close(*iSocket); }; + std::unique_ptr guard(&sock, close_del); + if (sendto(sock, results.c_str(), results.size(), 0, address->ai_addr, address->ai_addrlen) >= 0) { + break; + } } + + auto c = --found->second.m_openCount; + if (m_debug) { + if (c == 0) { + edm::LogWarning("StatisticsSenderService") << "fully closed: " << *lfn << "\n"; + } else { + edm::LogWarning("StatisticsSenderService") << "partially closed: " << *lfn << "\n"; + } + } + } else if (m_debug) { + edm::LogWarning("StatisticsSenderService") << "closed: unknown url name " << url << "\n"; + } +} + +void StatisticsSenderService::cleanupOldFiles() { + //remove entries with openCount of 0 + bool moreToTest = false; + do { + moreToTest = false; + for (auto it = m_lfnToFileInfo.begin(); it != m_lfnToFileInfo.end(); ++it) { + if (it->second.m_openCount == 0) { + auto lfn = it->first; + bool moreToTest2 = false; + do { + moreToTest2 = false; + for (auto it2 = m_urlToLfn.begin(); it2 != m_urlToLfn.end(); ++it2) { + if (it2->second == lfn) { + m_urlToLfn.unsafe_erase(it2); + moreToTest2 = true; + break; + } + } + } while (moreToTest2); + + m_lfnToFileInfo.unsafe_erase(it); + moreToTest = true; + break; + } + } + } while (moreToTest); +} + +void StatisticsSenderService::setSize(const std::string &url, size_t size) { + auto lfn = matchedLfn(url); + if (nullptr != lfn) { + auto itFound = m_lfnToFileInfo.find(*lfn); + if (itFound != m_lfnToFileInfo.end()) { + itFound->second.m_size = size; + } + } else if (m_debug) { + edm::LogWarning("StatisticsSenderService") << "setSize: unknown url name " << url << "\n"; } +} - m_counter++; +void StatisticsSenderService::filePostCloseEvent(std::string const &lfn, bool usedFallback) { + //we are at a sync point in the framwework so no new files are being opened + cleanupOldFiles(); + m_filestats.update(); } void StatisticsSenderService::determineHostnames(void) { @@ -225,7 +372,10 @@ void StatisticsSenderService::determineHostnames(void) { } } -void StatisticsSenderService::fillUDP(const std::string &siteName, bool usedFallback, std::string &udpinfo) { +void StatisticsSenderService::fillUDP(const std::string &siteName, + const FileInfo &fileinfo, + bool usedFallback, + std::string &udpinfo) const { std::ostringstream os; // Header - same for all IO accesses @@ -235,22 +385,34 @@ void StatisticsSenderService::fillUDP(const std::string &siteName, bool usedFall } if (usedFallback) { os << "\"fallback\": true, "; + } else { + os << "\"fallback\": false, "; } - std::string serverhost; - std::string serverdomain; - { - std::lock_guard sentry(m_servermutex); - serverhost = m_serverhost; - serverdomain = m_serverdomain; + os << "\"type\": "; + switch (fileinfo.m_type) { + case edm::InputType::Primary: { + os << "\"primary\", "; + break; + } + case edm::InputType::SecondaryFile: { + os << "\"secondary\", "; + break; + } + case edm::InputType::SecondarySource: { + os << "\"embedded\", "; + break; + } } + auto serverhost = fileinfo.m_serverhost; + auto serverdomain = fileinfo.m_serverdomain; os << "\"user_dn\":\"" << m_userdn << "\", "; os << "\"client_host\":\"" << m_clienthost << "\", "; os << "\"client_domain\":\"" << m_clientdomain << "\", "; os << "\"server_host\":\"" << serverhost << "\", "; os << "\"server_domain\":\"" << serverdomain << "\", "; - os << "\"unique_id\":\"" << m_guid << "-" << m_counter << "\", "; - os << "\"file_lfn\":\"" << m_filelfn << "\", "; + os << "\"unique_id\":\"" << m_guid << "-" << fileinfo.m_id << "\", "; + os << "\"file_lfn\":\"" << fileinfo.m_filelfn << "\", "; // Dashboard devs requested that we send out no app_info if a job ID // is not present in the environment. const char *jobId = getJobID(); @@ -258,8 +420,8 @@ void StatisticsSenderService::fillUDP(const std::string &siteName, bool usedFall os << "\"app_info\":\"" << jobId << "\", "; } - if (m_size >= 0) { - os << "\"file_size\":" << m_size << ", "; + if (fileinfo.m_size >= 0) { + os << "\"file_size\":" << fileinfo.m_size << ", "; } m_filestats.fillUDP(os); diff --git a/Utilities/StorageFactory/test/BuildFile.xml b/Utilities/StorageFactory/test/BuildFile.xml index 27329acd2a6d8..2ce22f617536c 100644 --- a/Utilities/StorageFactory/test/BuildFile.xml +++ b/Utilities/StorageFactory/test/BuildFile.xml @@ -37,3 +37,4 @@ # to wait until the framework decides on a threading model to implement a fix. # file="threadsafe.cpp" name="test_StorageFactory_threadsafe" + diff --git a/Utilities/StorageFactory/test/make_2nd_file_cfg.py b/Utilities/StorageFactory/test/make_2nd_file_cfg.py new file mode 100644 index 0000000000000..65da742632ca9 --- /dev/null +++ b/Utilities/StorageFactory/test/make_2nd_file_cfg.py @@ -0,0 +1,10 @@ +import FWCore.ParameterSet.Config as cms + +process = cms.Process("SECOND") + +process.source = cms.Source("PoolSource", fileNames = cms.untracked.vstring("file:stat_sender_first.root")) + +process.o = cms.OutputModule("PoolOutputModule", fileName = cms.untracked.string("stat_sender_second.root"), outputCommands = cms.untracked.vstring("drop *")) + +process.ep = cms.EndPath(process.o) + diff --git a/Utilities/StorageFactory/test/make_test_files_cfg.py b/Utilities/StorageFactory/test/make_test_files_cfg.py new file mode 100644 index 0000000000000..60f3944f256bc --- /dev/null +++ b/Utilities/StorageFactory/test/make_test_files_cfg.py @@ -0,0 +1,20 @@ +import FWCore.ParameterSet.Config as cms + +process = cms.Process("FIRST") + +process.source = cms.Source("EmptySource") + +process.first = cms.OutputModule("PoolOutputModule", fileName = cms.untracked.string("stat_sender_first.root")) +process.b = cms.OutputModule("PoolOutputModule", fileName = cms.untracked.string("stat_sender_b.root")) +process.c = cms.OutputModule("PoolOutputModule", fileName = cms.untracked.string("stat_sender_c.root")) +process.d = cms.OutputModule("PoolOutputModule", fileName = cms.untracked.string("stat_sender_d.root")) +process.e = cms.OutputModule("PoolOutputModule", fileName = cms.untracked.string("stat_sender_e.root")) + +process.Thing = cms.EDProducer("ThingProducer") +process.OtherThing = cms.EDProducer("OtherThingProducer") +process.EventNumber = cms.EDProducer("EventNumberIntProducer") + + +process.o = cms.EndPath(process.first+process.b+process.c+process.d+process.e, cms.Task(process.Thing, process.OtherThing, process.EventNumber)) + +process.maxEvents.input = 10 diff --git a/Utilities/StorageFactory/test/test_file_statistics_sender.sh b/Utilities/StorageFactory/test/test_file_statistics_sender.sh new file mode 100755 index 0000000000000..523da45696e34 --- /dev/null +++ b/Utilities/StorageFactory/test/test_file_statistics_sender.sh @@ -0,0 +1,41 @@ +#!/bin/bash + +# Pass in name and status +function die { echo $1: status $2 ; exit $2; } + +LOCAL_TEST_DIR=${CMSSW_BASE}/src/Utilities/StorageFactory/test +LOCAL_TMP_DIR=${CMSSW_BASE}/tmp/${SCRAM_ARCH} + +pushd ${LOCAL_TMP_DIR} + +#setup files used in tests +cmsRun ${LOCAL_TEST_DIR}/make_test_files_cfg.py &> make_test_files.log || die "cmsRun make_test_files_cfg.py" $? +rm make_test_files.log +cmsRun ${LOCAL_TEST_DIR}/make_2nd_file_cfg.py &> make_2nd_file.log || die "cmsRun make_2nd_file_cfg.py" $? +rm make_2nd_file.log + +cmsRun ${LOCAL_TEST_DIR}/test_single_file_statistics_sender_cfg.py &> test_single_file_statistics_sender.log || die "cmsRun test_single_file_statistics_sender_cfg.py" $? +grep -q '"file_lfn":"file:stat_sender_first.root"' test_single_file_statistics_sender.log || die "no StatisticsSenderService output for single file" 1 +rm test_single_file_statistics_sender.log + +cmsRun ${LOCAL_TEST_DIR}/test_multiple_files_file_statistics_sender_cfg.py &> test_multiple_files_file_statistics_sender.log || die "cmsRun test_multiple_files_file_statistics_sender_cfg.py" $? +grep -q '"file_lfn":"file:stat_sender_b.root"' test_multiple_files_file_statistics_sender.log || die "no StatisticsSenderService output for file b in multiple files" 1 +grep -q '"file_lfn":"file:stat_sender_c.root"' test_multiple_files_file_statistics_sender.log || die "no StatisticsSenderService output for file c in multiple files" 1 +grep -q '"file_lfn":"file:stat_sender_d.root"' test_multiple_files_file_statistics_sender.log || die "no StatisticsSenderService output for file d in multiple files" 1 +grep -q '"file_lfn":"file:stat_sender_e.root"' test_multiple_files_file_statistics_sender.log || die "no StatisticsSenderService output for file e in multiple files" 1 +rm test_multiple_files_file_statistics_sender.log + +cmsRun ${LOCAL_TEST_DIR}/test_multi_file_statistics_sender_cfg.py &> test_multi_file_statistics_sender.log || die "cmsRun test_multi_file_statistics_sender_cfg.py" $? +grep -q '"file_lfn":"file:stat_sender_first.root"' test_multi_file_statistics_sender.log || die "no StatisticsSenderService output for file first in multi file" 1 +grep -q '"file_lfn":"file:stat_sender_second.root"' test_multi_file_statistics_sender.log || die "no StatisticsSenderService output for file second in multi file" 1 +rm test_multi_file_statistics_sender.log + +cmsRun ${LOCAL_TEST_DIR}/test_secondary_file_statistics_sender_cfg.py &> test_secondary_file_statistics_sender.log || die "cmsRun test_secondary_file_statistics_sender_cfg.py" $? +grep -q '"file_lfn":"file:stat_sender_first.root"' test_secondary_file_statistics_sender.log || die "no StatisticsSenderService output for file 'first' in secondary files" 1 +grep -q '"file_lfn":"file:stat_sender_b.root"' test_secondary_file_statistics_sender.log || die "no StatisticsSenderService output for file 'b' in secondary files" 1 +grep -q '"file_lfn":"file:stat_sender_c.root"' test_secondary_file_statistics_sender.log || die "no StatisticsSenderService output for file 'c' in secondary files" 1 +grep -q '"file_lfn":"file:stat_sender_d.root"' test_secondary_file_statistics_sender.log || die "no StatisticsSenderService output for file 'd' in secondary files" 1 +grep -q '"file_lfn":"file:stat_sender_e.root"' test_secondary_file_statistics_sender.log || die "no StatisticsSenderService output for file 'e' in secondary files" 1 +rm test_secondary_file_statistics_sender.log + +popd \ No newline at end of file diff --git a/Utilities/StorageFactory/test/test_multi_file_statistics_sender_cfg.py b/Utilities/StorageFactory/test/test_multi_file_statistics_sender_cfg.py new file mode 100644 index 0000000000000..19815adc72c35 --- /dev/null +++ b/Utilities/StorageFactory/test/test_multi_file_statistics_sender_cfg.py @@ -0,0 +1,10 @@ +import FWCore.ParameterSet.Config as cms + +process = cms.Process("TEST") + +process.source = cms.Source("PoolSource", + fileNames = cms.untracked.vstring("file:stat_sender_second.root"), + secondaryFileNames = cms.untracked.vstring("file:stat_sender_first.root") +) + +process.add_(cms.Service("StatisticsSenderService", debug = cms.untracked.bool(True))) \ No newline at end of file diff --git a/Utilities/StorageFactory/test/test_multiple_files_file_statistics_sender_cfg.py b/Utilities/StorageFactory/test/test_multiple_files_file_statistics_sender_cfg.py new file mode 100644 index 0000000000000..a734285bd2487 --- /dev/null +++ b/Utilities/StorageFactory/test/test_multiple_files_file_statistics_sender_cfg.py @@ -0,0 +1,10 @@ +import FWCore.ParameterSet.Config as cms + +process = cms.Process("TEST") + +process.source = cms.Source("PoolSource", fileNames = cms.untracked.vstring("file:stat_sender_b.root", "file:stat_sender_c.root", "file:stat_sender_d.root", "file:stat_sender_e.root"), +duplicateCheckMode = cms.untracked.string('noDuplicateCheck'), +inputCommands = cms.untracked.vstring('drop *_*_beginRun_*', 'drop *_*_endRun_*', 'drop *_*_beginLumi_*', 'drop *_*_endLumi_*') +) + +process.add_(cms.Service("StatisticsSenderService", debug = cms.untracked.bool(True))) \ No newline at end of file diff --git a/Utilities/StorageFactory/test/test_secondary_file_statistics_sender_cfg.py b/Utilities/StorageFactory/test/test_secondary_file_statistics_sender_cfg.py new file mode 100644 index 0000000000000..250d575c7f39e --- /dev/null +++ b/Utilities/StorageFactory/test/test_secondary_file_statistics_sender_cfg.py @@ -0,0 +1,44 @@ +import FWCore.ParameterSet.Config as cms + +process = cms.Process("TEST") + +process.source = cms.Source("PoolSource", fileNames = cms.untracked.vstring("file:stat_sender_first.root")) + +process.b = cms.EDProducer("SecondaryProducer", + seq = cms.untracked.bool(True), + input = cms.SecSource("EmbeddedRootSource", + sequential = cms.untracked.bool(True), + fileNames = cms.untracked.vstring('file:stat_sender_b.root') + ) +) + +process.c = cms.EDProducer("SecondaryProducer", + seq = cms.untracked.bool(True), + input = cms.SecSource("EmbeddedRootSource", + sequential = cms.untracked.bool(True), + fileNames = cms.untracked.vstring('file:stat_sender_c.root') + ) +) + +process.d = cms.EDProducer("SecondaryProducer", + seq = cms.untracked.bool(True), + input = cms.SecSource("EmbeddedRootSource", + sequential = cms.untracked.bool(True), + fileNames = cms.untracked.vstring('file:stat_sender_d.root') + ) +) + +process.e = cms.EDProducer("SecondaryProducer", + seq = cms.untracked.bool(True), + input = cms.SecSource("EmbeddedRootSource", + sequential = cms.untracked.bool(True), + fileNames = cms.untracked.vstring('file:stat_sender_e.root') + ) +) + +process.pB = cms.Path(process.b) +process.pC = cms.Path(process.c) +process.pD = cms.Path(process.d) +process.pE = cms.Path(process.e) + +process.add_(cms.Service("StatisticsSenderService", debug = cms.untracked.bool(True))) \ No newline at end of file diff --git a/Utilities/StorageFactory/test/test_single_file_statistics_sender_cfg.py b/Utilities/StorageFactory/test/test_single_file_statistics_sender_cfg.py new file mode 100644 index 0000000000000..0576542a5e934 --- /dev/null +++ b/Utilities/StorageFactory/test/test_single_file_statistics_sender_cfg.py @@ -0,0 +1,11 @@ +import FWCore.ParameterSet.Config as cms + +process = cms.Process("TEST") + +process.source = cms.Source("PoolSource", fileNames = cms.untracked.vstring("file:stat_sender_first.root")) + +process.add_(cms.Service("StatisticsSenderService", debug = cms.untracked.bool(True))) + +process.load("FWCore.MessageService.MessageLogger_cfi") + +process.MessageLogger.cerr.INFO.limit = 1000 \ No newline at end of file diff --git a/Utilities/XrdAdaptor/src/XrdRequestManager.cc b/Utilities/XrdAdaptor/src/XrdRequestManager.cc index 61b1751eabda0..d25b67db750fa 100644 --- a/Utilities/XrdAdaptor/src/XrdRequestManager.cc +++ b/Utilities/XrdAdaptor/src/XrdRequestManager.cc @@ -243,7 +243,7 @@ void RequestManager::updateCurrentServer() { std::unique_ptr hostname(hostname_ptr); edm::Service statsService; if (statsService.isAvailable()) { - statsService->setCurrentServer(*hostname_ptr); + statsService->setCurrentServer(m_name, *hostname_ptr); } } }