diff --git a/HLTrigger/Timer/plugins/FastTimerService.cc b/HLTrigger/Timer/plugins/FastTimerService.cc index 62abb7a78f1fe..7d9eaefdfe613 100644 --- a/HLTrigger/Timer/plugins/FastTimerService.cc +++ b/HLTrigger/Timer/plugins/FastTimerService.cc @@ -34,6 +34,7 @@ using json = nlohmann::json; #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h" #include "FWCore/ParameterSet/interface/ParameterSet.h" #include "FWCore/ParameterSet/interface/ParameterSetDescription.h" +#include "FWCore/Utilities/interface/Exception.h" #include "FWCore/Utilities/interface/StreamID.h" #include "HLTrigger/Timer/interface/memory_usage.h" #include "HLTrigger/Timer/interface/processor_model.h" @@ -766,7 +767,8 @@ void FastTimerService::PlotsPerJob::fill_lumi(AtomicResources const& data, unsig /////////////////////////////////////////////////////////////////////////////// FastTimerService::FastTimerService(const edm::ParameterSet& config, edm::ActivityRegistry& registry) - : // configuration + : tbb::task_scheduler_observer(true), + // configuration callgraph_(), // job configuration concurrent_lumis_(0), @@ -1099,6 +1101,7 @@ void FastTimerService::postSourceLumi(edm::LuminosityBlockIndex index) { } void FastTimerService::postEndJob() { + guard_.finalize(); if (print_job_summary_) { edm::LogVerbatim out("FastReport"); printSummary(out, job_summary_, "Job"); @@ -1662,17 +1665,68 @@ void FastTimerService::postModuleStreamEndLumi(edm::StreamContext const& sc, edm thread().measure_and_accumulate(lumi_transition_[index]); } -void FastTimerService::on_scheduler_entry(bool worker) { - // initialise the measurement point for a thread that has newly joining the TBB pool - thread().measure(); +FastTimerService::ThreadGuard::ThreadGuard() { + auto err = ::pthread_key_create(&key_, retire_thread); + if (err) { + throw cms::Exception("FastTimerService") << "ThreadGuard key creation failed: " << ::strerror(err); + } } -void FastTimerService::on_scheduler_exit(bool worker) { - // account any resources used or freed by the thread before leaving the TBB pool - thread().measure_and_accumulate(overhead_); +// If this is a new thread, register it and return true +bool FastTimerService::ThreadGuard::register_thread(FastTimerService::AtomicResources& r) { + auto ptr = ::pthread_getspecific(key_); + + if (not ptr) { + auto p = thread_resources_.emplace_back(std::make_shared(r)); + auto pp = new std::shared_ptr(*p); + auto err = ::pthread_setspecific(key_, pp); + if (err) { + throw cms::Exception("FastTimerService") << "ThreadGuard pthread_setspecific failed: " << ::strerror(err); + } + return true; + } + return false; +} + +std::shared_ptr* FastTimerService::ThreadGuard::ptr(void* p) { + return static_cast*>(p); +} + +// called when a thread exits +void FastTimerService::ThreadGuard::retire_thread(void* p) { + auto ps = ptr(p); + auto expected = true; + if ((*ps)->live_.compare_exchange_strong(expected, false)) { + // account any resources used or freed by the thread before leaving the TBB pool + (*ps)->measurement_.measure_and_accumulate((*ps)->resource_); + } + delete ps; } -FastTimerService::Measurement& FastTimerService::thread() { return threads_.local(); } +// finalize all threads that have not retired +void FastTimerService::ThreadGuard::finalize() { + for (auto& p : thread_resources_) { + auto expected = true; + if (p->live_.compare_exchange_strong(expected, false)) { + p->measurement_.measure_and_accumulate(p->resource_); + } + } +} + +FastTimerService::Measurement& FastTimerService::ThreadGuard::thread() { + return (*ptr(::pthread_getspecific(key_)))->measurement_; +} + +void FastTimerService::on_scheduler_entry(bool worker) { + if (guard_.register_thread(overhead_)) { + // initialise the measurement point for a thread that has newly joined the TBB pool + thread().measure(); + } +} + +void FastTimerService::on_scheduler_exit(bool worker) {} + +FastTimerService::Measurement& FastTimerService::thread() { return guard_.thread(); } // describe the module's configuration void FastTimerService::fillDescriptions(edm::ConfigurationDescriptions& descriptions) { diff --git a/HLTrigger/Timer/plugins/FastTimerService.h b/HLTrigger/Timer/plugins/FastTimerService.h index ac8dc872ec89b..f7616173e5ebd 100644 --- a/HLTrigger/Timer/plugins/FastTimerService.h +++ b/HLTrigger/Timer/plugins/FastTimerService.h @@ -3,6 +3,7 @@ // system headers #include +#include // C++ headers #include @@ -455,9 +456,33 @@ class FastTimerService : public tbb::task_scheduler_observer { std::vector run_summary_; // whole event time accounting per-run std::mutex summary_mutex_; // synchronise access to the summary objects across different threads - // per-thread quantities, lazily allocated - tbb::enumerable_thread_specific, tbb::ets_key_per_instance> - threads_; + // + struct ThreadGuard { + struct specific_t { + specific_t(AtomicResources& r) : resource_(r), live_(true) {} + ~specific_t() = default; + + Measurement measurement_; + AtomicResources& resource_; + std::atomic live_; + }; + + ThreadGuard(); + ~ThreadGuard() = default; + + static void retire_thread(void* t); + static std::shared_ptr* ptr(void* p); + + bool register_thread(FastTimerService::AtomicResources& r); + Measurement& thread(); + void finalize(); + + tbb::concurrent_vector> thread_resources_; + pthread_key_t key_; + }; + + // + ThreadGuard guard_; // atomic variables to keep track of the completion of each step, process by process std::unique_ptr[]> subprocess_event_check_;