Switch FastTimerService to using a local thread observer #33261

Merged
merged 4 commits on Mar 26, 2021
61 changes: 54 additions & 7 deletions HLTrigger/Timer/plugins/FastTimerService.cc
@@ -34,6 +34,7 @@ using json = nlohmann::json;
#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
#include "FWCore/ParameterSet/interface/ParameterSet.h"
#include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
#include "FWCore/Utilities/interface/Exception.h"
#include "FWCore/Utilities/interface/StreamID.h"
#include "HLTrigger/Timer/interface/memory_usage.h"
#include "HLTrigger/Timer/interface/processor_model.h"
@@ -766,7 +767,8 @@ void FastTimerService::PlotsPerJob::fill_lumi(AtomicResources const& data, unsig
///////////////////////////////////////////////////////////////////////////////

FastTimerService::FastTimerService(const edm::ParameterSet& config, edm::ActivityRegistry& registry)
: // configuration
: tbb::task_scheduler_observer(true),
// configuration
callgraph_(),
// job configuration
concurrent_lumis_(0),
@@ -1099,6 +1101,7 @@ void FastTimerService::postSourceLumi(edm::LuminosityBlockIndex index) {
}

void FastTimerService::postEndJob() {
guard_.finalize();
if (print_job_summary_) {
edm::LogVerbatim out("FastReport");
printSummary(out, job_summary_, "Job");
@@ -1662,17 +1665,61 @@ void FastTimerService::postModuleStreamEndLumi(edm::StreamContext const& sc, edm
thread().measure_and_accumulate(lumi_transition_[index]);
}

void FastTimerService::on_scheduler_entry(bool worker) {
// initialise the measurement point for a thread that has newly joined the TBB pool
thread().measure();
FastTimerService::ThreadGuard::ThreadGuard() {
auto err = ::pthread_key_create(&key_, retire_thread);
Contributor:
If I understand correctly, retire_thread is called when the worker thread exits.
Why not call it from on_scheduler_exit instead?

Author:

When monitoring the primary arena, on_scheduler_entry and on_scheduler_exit get called many times as threads move between different arenas, and there's no way to tell when a call to on_scheduler_exit is the final exit. This is the only reliable way I could think of to catch a thread being deleted before the end-of-job sequence.

Contributor:

OK, so with this approach, the time (and resources) spent by a thread outside the main arena would be accounted as "overhead", right?
Which is not a bad thing.

Author:

I believe that with either the global observer or this PR, what gets measured as overhead is the time outside of any defined CMS module, irrespective of the arena. With the global observer you were getting one call to on_scheduler_entry and on_scheduler_exit per thread, and I'm trying to replicate that with the observer on the primary arena.

if (err) {
throw cms::Exception("FastTimerService") << "ThreadGuard key creation failed: " << ::strerror(err);
}
}

// If this is a new thread, register it and return true
bool FastTimerService::ThreadGuard::register_thread(FastTimerService::AtomicResources& r) {
auto ptr = ::pthread_getspecific(key_);

if (not ptr) {
auto p = thread_resources_.emplace_back(std::make_unique<specific_t>(r));
auto err = ::pthread_setspecific(key_, p->get());
if (err) {
throw cms::Exception("FastTimerService") << "ThreadGuard pthread_setspecific failed: " << ::strerror(err);
}
return true;
}
return false;
}

void FastTimerService::on_scheduler_exit(bool worker) {
// called when a thread exits
void FastTimerService::ThreadGuard::retire_thread(void* ptr) {
auto p = static_cast<specific_t*>(ptr);
// account any resources used or freed by the thread before leaving the TBB pool
thread().measure_and_accumulate(overhead_);
p->measurement_.measure_and_accumulate(p->resource_);
p->live_ = false;
}

FastTimerService::Measurement& FastTimerService::thread() { return threads_.local(); }
// finalize all threads that have not retired
void FastTimerService::ThreadGuard::finalize() {
for (auto& p : thread_resources_) {
if (p->live_) {
p->measurement_.measure_and_accumulate(p->resource_);
}
}
}

FastTimerService::Measurement& FastTimerService::ThreadGuard::thread() {
auto ptr = ::pthread_getspecific(key_);
auto p = static_cast<ThreadGuard::specific_t*>(ptr);
return p->measurement_;
}

void FastTimerService::on_scheduler_entry(bool worker) {
if (guard_.register_thread(overhead_)) {
// initialise the measurement point for a thread that has newly joined the TBB pool
thread().measure();
}
}

void FastTimerService::on_scheduler_exit(bool worker) {}

FastTimerService::Measurement& FastTimerService::thread() { return guard_.thread(); }

// describe the module's configuration
void FastTimerService::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
30 changes: 27 additions & 3 deletions HLTrigger/Timer/plugins/FastTimerService.h
@@ -3,6 +3,7 @@

// system headers
#include <unistd.h>
#include <pthread.h>

// C++ headers
#include <chrono>
@@ -455,9 +456,32 @@ class FastTimerService : public tbb::task_scheduler_observer {
std::vector<ResourcesPerJob> run_summary_; // whole event time accounting per-run
std::mutex summary_mutex_; // synchronise access to the summary objects across different threads

// per-thread quantities, lazily allocated
tbb::enumerable_thread_specific<Measurement, tbb::cache_aligned_allocator<Measurement>, tbb::ets_key_per_instance>
threads_;
//
struct ThreadGuard {
struct specific_t {
specific_t(AtomicResources& r) : resource_(r), live_(true) {}
~specific_t() = default;

Measurement measurement_;
AtomicResources& resource_;
bool live_;
};

ThreadGuard();
~ThreadGuard() = default;

static void retire_thread(void* t);

bool register_thread(FastTimerService::AtomicResources& r);
Measurement& thread();
void finalize();

tbb::concurrent_vector<std::unique_ptr<specific_t>> thread_resources_;
Contributor:

why a tbb::concurrent_vector<std::unique_ptr<specific_t>>?
wouldn't tbb::concurrent_vector<specific_t> also work, and avoid one indirection?

Author:

Actually it needs to be std::shared_ptr. Otherwise there's a potential use-after-free if the threads exit after the FastTimerService gets destructed. We've observed that TBB often doesn't destroy threads until global destructors are called (I believe using the global observer you may actually have been missing some overhead from on_scheduler_exit getting called after postEndJob).

pthread_key_t key_;
};

//
ThreadGuard guard_;

// atomic variables to keep track of the completion of each step, process by process
std::unique_ptr<std::atomic<unsigned int>[]> subprocess_event_check_;