Skip to content

Commit

Permalink
Merge pull request #1245 from Dr15Jones/threadSafeTiming
Browse files Browse the repository at this point in the history
Multithreaded fixes -- Modified Timing service to be thread-safe
  • Loading branch information
ktf committed Oct 31, 2013
2 parents bbcec13 + 3933a85 commit 243d040
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 122 deletions.
43 changes: 18 additions & 25 deletions FWCore/Services/interface/Timing.h
Expand Up @@ -8,23 +8,17 @@
// Original Author: Jim Kowalkowski
//

/*
Changes Log 1: 2009/01/14 10:29:00, Natalia Garcia Nebot
Modified the service to add some new measurements:
- Average time per event (cpu and wallclock)
- Fastest time per event (cpu and wallclock)
- Slowest time per event (cpu and wallclock)
*/

#include "DataFormats/Provenance/interface/EventID.h"
#include "DataFormats/Provenance/interface/ProvenanceFwd.h"
#include <atomic>

namespace edm {
class ActivityRegistry;
class Event;
class EventSetup;
class ParameterSet;
class ConfigurationDescriptions;
class StreamContext;
class ModuleCallingContext;

namespace service {
class Timing {
Expand All @@ -35,33 +29,32 @@ namespace edm {
static void fillDescriptions(edm::ConfigurationDescriptions & descriptions);

private:

void postBeginJob();
void postEndJob();

void preEventProcessing(EventID const&, Timestamp const&);
void postEventProcessing(Event const&, EventSetup const&);
void preEvent(StreamContext const&);
void postEvent(StreamContext const&);

void preModule(ModuleDescription const&);
void postModule(ModuleDescription const&);
void preModule(StreamContext const&, ModuleCallingContext const&);
void postModule(StreamContext const&, ModuleCallingContext const&);

EventID curr_event_;
double curr_job_time_; // seconds
double curr_job_cpu_; // seconds
double curr_event_time_; // seconds
double curr_event_cpu_; // seconds
double curr_module_time_; // seconds
double total_event_cpu_; // seconds
std::vector<double> curr_events_time_; // seconds
std::vector<double> curr_events_cpu_; // seconds
std::vector<double> total_events_cpu_; // seconds
bool summary_only_;
bool report_summary_;

//
// Min Max and average event times for summary
// at end of job
double max_event_time_; // seconds
double max_event_cpu_; // seconds
double min_event_time_; // seconds
double min_event_cpu_; // seconds
int total_event_count_;
// Min Max and average event times for each Stream.
// Used for summary at end of job
std::vector<double> max_events_time_; // seconds
std::vector<double> max_events_cpu_; // seconds
std::vector<double> min_events_time_; // seconds
std::vector<double> min_events_cpu_; // seconds
std::atomic<unsigned long> total_event_count_;
};
}
}
Expand Down
200 changes: 103 additions & 97 deletions FWCore/Services/src/Timing.cc
Expand Up @@ -7,32 +7,6 @@
//
// Original Author: Jim Kowalkowski
//
// Change Log
//
// 1 - mf 4/22/08 Facilitate summary output to job report and logs:
// In Timing ctor, default for report_summary_ changed to true
// In postEndJob, add output to logger
//
// 2 - 2009/01/14 10:29:00, Natalia Garcia Nebot
// Modified the service to add some new measurements to report:
// - Average time per event (cpu and wallclock)
// - Fastest time per event (cpu and wallclock)
// - Slowest time per event (cpu and wallclock)
//
// 3 - mf 3/18/09 Change use of LogAbsolute to LogImportant
// so that users can throttle the messages
// for selected destinations. LogImportant
// is treated at the same level as LogError, so
// by default the behavior will not change, but
// there will now be a way to control the verbosity.
//
// 4 - mf 3/18/09 The per-event output TimeEvent is changed to LogPrint.
// The per-module output TimeModule is changed to LogPrint.
//
// 5 - wmtan 2010/03/16
// Fixed the constructor to initialize all data members!
// Standardized coding style
//

#include "FWCore/Services/interface/Timing.h"

Expand All @@ -44,6 +18,9 @@
#include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
#include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
#include "FWCore/ServiceRegistry/interface/Service.h"
#include "FWCore/ServiceRegistry/interface/StreamContext.h"
#include "FWCore/ServiceRegistry/interface/ModuleCallingContext.h"
#include "FWCore/ServiceRegistry/interface/SystemBounds.h"
#include "FWCore/Utilities/interface/Exception.h"

#include <iostream>
Expand Down Expand Up @@ -78,32 +55,50 @@ namespace edm {
totalCPUTime += (double)usage.ru_stime.tv_sec + (double(usage.ru_stime.tv_usec) * 1E-6);
return totalCPUTime;
}

// Returns the calling thread's stack of module start times.
// A stack per thread is used (rather than a single value) because
// unscheduled execution or tbb task spawning can cause a module to run
// on the same thread as a module that is already running.
static std::vector<double>& moduleTimeStack() {
  // thread_local at block scope: one independent vector per thread.
  thread_local std::vector<double> perThreadStartTimes;
  return perThreadStartTimes;
}

// Constructs the Timing service: reads the "summaryOnly" and "useJobReport"
// untracked parameters and registers this object's callbacks with iRegistry.
// NOTE(review): this text is a rendered diff without +/- markers, so scalar
// members (curr_event_time_, max_event_time_, ...) and their per-stream vector
// counterparts (curr_events_time_, max_events_time_, ...) both appear in the
// initializer list, and both the watchPre/PostProcessEvent and
// watchPre/PostEvent registrations appear below. Presumably the scalar/first
// variants are the removed lines -- confirm against the real Timing.cc.
Timing::Timing(ParameterSet const& iPS, ActivityRegistry& iRegistry) :
curr_event_(),
curr_job_time_(0.),
curr_job_cpu_(0.),
curr_event_time_(0.),
curr_event_cpu_(0.),
curr_module_time_(0.),
total_event_cpu_(0.),
curr_events_time_(),
curr_events_cpu_(),
total_events_cpu_(),
summary_only_(iPS.getUntrackedParameter<bool>("summaryOnly")),
report_summary_(iPS.getUntrackedParameter<bool>("useJobReport")),
max_event_time_(0.),
max_event_cpu_(0.),
min_event_time_(0.),
min_event_cpu_(0.),
max_events_time_(),
max_events_cpu_(),
min_events_time_(),
min_events_cpu_(),
total_event_count_(0) {
// Job-level callbacks are always registered.
iRegistry.watchPostBeginJob(this, &Timing::postBeginJob);
iRegistry.watchPostEndJob(this, &Timing::postEndJob);

// Per-event callbacks are always registered, regardless of summary_only_.
iRegistry.watchPreProcessEvent(this, &Timing::preEventProcessing);
iRegistry.watchPostProcessEvent(this, &Timing::postEventProcessing);
iRegistry.watchPreEvent(this, &Timing::preEvent);
iRegistry.watchPostEvent(this, &Timing::postEvent);

// Per-module callbacks are only registered when per-module output is wanted.
if(not summary_only_) {
iRegistry.watchPreModule(this, &Timing::preModule);
iRegistry.watchPostModule(this, &Timing::postModule);
iRegistry.watchPreModuleEvent(this, &Timing::preModule);
iRegistry.watchPostModuleEvent(this, &Timing::postModule);
}

// Size every per-stream vector to maxNumberOfStreams() when the preallocate
// signal fires. Max slots start at 0. and min slots at 1.E6.
// NOTE(review): the lambda captures `this`; presumably the service outlives
// the signal connection -- confirm.
iRegistry.preallocateSignal_.connect([this](service::SystemBounds const& iBounds){
auto nStreams = iBounds.maxNumberOfStreams();
curr_events_time_.resize(nStreams,0.);
curr_events_cpu_.resize(nStreams,0.);
total_events_cpu_.resize(nStreams,0.);
max_events_time_.resize(nStreams,0.);
max_events_cpu_.resize(nStreams,0.);
min_events_time_.resize(nStreams,1.E6);
min_events_cpu_.resize(nStreams,1.E6);
});
}

Timing::~Timing() {
Expand All @@ -123,11 +118,9 @@ namespace edm {
void Timing::postBeginJob() {
curr_job_time_ = getTime();
curr_job_cpu_ = getCPU();
total_event_cpu_ = 0.0;

//LogInfo("TimeReport")
if(not summary_only_) {
LogImportant("TimeReport") // ChangeLog 3
LogImportant("TimeReport")
<< "TimeReport> Report activated" << "\n"
<< "TimeReport> Report columns headings for events: "
<< "eventnum runnum timetaken\n"
Expand All @@ -142,97 +135,110 @@ namespace edm {
double average_event_time = total_job_time / total_event_count_;

double total_job_cpu = getCPU() - curr_job_cpu_;
double average_event_cpu = total_event_cpu_ / total_event_count_;

//LogAbsolute("FwkJob")
//LogAbsolute("TimeReport") // Changelog 1
LogImportant("TimeReport") // Changelog 3
double total_event_cpu = 0.;
for(auto t : total_events_cpu_) {
total_event_cpu +=t;
}
double average_event_cpu = total_event_cpu / total_event_count_;

double min_event_time = *(std::min_element(min_events_time_.begin(),
min_events_time_.end()));
double max_event_time = *(std::max_element(max_events_time_.begin(),
max_events_time_.end()));
double min_event_cpu = *(std::min_element(min_events_cpu_.begin(),
min_events_cpu_.end()));
double max_event_cpu = *(std::max_element(max_events_cpu_.begin(),
max_events_cpu_.end()));
LogImportant("TimeReport")
<< "TimeReport> Time report complete in "
<< total_job_time << " seconds"
<< "\n"
<< " Time Summary: \n"
<< " - Min event: " << min_event_time_ << "\n"
<< " - Max event: " << max_event_time_ << "\n"
<< " - Min event: " << min_event_time << "\n"
<< " - Max event: " << max_event_time << "\n"
<< " - Avg event: " << average_event_time << "\n"
<< " - Total job: " << total_job_time << "\n"
<< " CPU Summary: \n"
<< " - Min event: " << min_event_cpu_ << "\n"
<< " - Max event: " << max_event_cpu_ << "\n"
<< " - Min event: " << min_event_cpu << "\n"
<< " - Max event: " << max_event_cpu << "\n"
<< " - Avg event: " << average_event_cpu << "\n"
<< " - Total job: " << total_job_cpu << "\n"
<< " - Total event: " << total_event_cpu_ << "\n";
<< " - Total event: " << total_event_cpu << "\n";

if(report_summary_) {
Service<JobReport> reportSvc;
// std::map<std::string, double> reportData;
std::map<std::string, std::string> reportData;

reportData.insert(std::make_pair("MinEventTime", d2str(min_event_time_)));
reportData.insert(std::make_pair("MaxEventTime", d2str(max_event_time_)));
reportData.insert(std::make_pair("MinEventTime", d2str(min_event_time)));
reportData.insert(std::make_pair("MaxEventTime", d2str(max_event_time)));
reportData.insert(std::make_pair("AvgEventTime", d2str(average_event_time)));
reportData.insert(std::make_pair("TotalJobTime", d2str(total_job_time)));
reportData.insert(std::make_pair("MinEventCPU", d2str(min_event_cpu_)));
reportData.insert(std::make_pair("MaxEventCPU", d2str(max_event_cpu_)));
reportData.insert(std::make_pair("MinEventCPU", d2str(min_event_cpu)));
reportData.insert(std::make_pair("MaxEventCPU", d2str(max_event_cpu)));
reportData.insert(std::make_pair("AvgEventCPU", d2str(average_event_cpu)));
reportData.insert(std::make_pair("TotalJobCPU", d2str(total_job_cpu)));
reportData.insert(std::make_pair("TotalEventCPU", d2str(total_event_cpu_)));
reportData.insert(std::make_pair("TotalEventCPU", d2str(total_event_cpu)));

reportSvc->reportPerformanceSummary("Timing", reportData);
// reportSvc->reportTimingInfo(reportData);
}
}

// NOTE(review): rendered diff without +/- markers. The preEventProcessing
// header and its three assignments to scalar members look like the removed
// version; the preEvent lines after them look like the replacement -- confirm
// against the real Timing.cc.
void Timing::preEventProcessing(EventID const& iID, Timestamp const&) {
curr_event_ = iID;
curr_event_time_ = getTime();
curr_event_cpu_ = getCPU();
// Records the getTime() and getCPU() readings at the start of an event,
// stored in the slot selected by the stream's ID value.
void Timing::preEvent(StreamContext const& iStream) {
// The per-stream vectors are resized to maxNumberOfStreams() in the
// constructor's preallocate callback.
auto index = iStream.streamID().value();
curr_events_time_[index] = getTime();
curr_events_cpu_[index] = getCPU();
}

// NOTE(review): rendered diff without +/- markers. Lines that use the scalar
// members (curr_event_cpu_, total_event_cpu_, max_event_time_, ...) look like
// the removed postEventProcessing version; lines that index per-stream vectors
// with `index` look like the replacement -- confirm against the real Timing.cc.
void Timing::postEventProcessing(Event const&, EventSetup const&) {
curr_event_cpu_ = getCPU() - curr_event_cpu_;
total_event_cpu_ += curr_event_cpu_;
// Finishes the measurement started in preEvent for iStream: computes the
// getCPU()/getTime() deltas since preEvent, adds the CPU delta to the stream's
// running total, optionally prints a "TimeEvent>" line, updates the stream's
// min/max values and increments the event counter.
void Timing::postEvent(StreamContext const& iStream) {
auto index = iStream.streamID().value();
double curr_event_cpu = getCPU() - curr_events_cpu_[index];
total_events_cpu_[index] += curr_event_cpu;

curr_event_time_ = getTime() - curr_event_time_;
double curr_event_time = getTime() - curr_events_time_[index];

if(not summary_only_) {
LogPrint("TimeEvent") // ChangeLog 3
auto const & eventID = iStream.eventID();
LogPrint("TimeEvent")
<< "TimeEvent> "
<< curr_event_.event() << " "
<< curr_event_.run() << " "
<< curr_event_time_ << " "
<< curr_event_cpu_ << " "
<< total_event_cpu_;
}
if(total_event_count_ == 0) {
max_event_time_ = curr_event_time_;
min_event_time_ = curr_event_time_;
max_event_cpu_ = curr_event_cpu_;
min_event_cpu_ = curr_event_cpu_;
<< eventID.event() << " "
<< eventID.run() << " "
<< curr_event_time << " "
<< curr_event_cpu << " "
<< total_events_cpu_[index];
}

if(curr_event_time_ > max_event_time_) max_event_time_ = curr_event_time_;
if(curr_event_time_ < min_event_time_) min_event_time_ = curr_event_time_;
if(curr_event_cpu_ > max_event_cpu_) max_event_cpu_ = curr_event_cpu_;
if(curr_event_cpu_ < min_event_cpu_) min_event_cpu_ = curr_event_cpu_;
total_event_count_ = total_event_count_ + 1;
// Per-stream extrema. Their starting values come from the resize() calls in
// the constructor's preallocate callback (0. for max, 1.E6 for min).
if(curr_event_time > max_events_time_[index]) max_events_time_[index] = curr_event_time;
if(curr_event_time < min_events_time_[index]) min_events_time_[index] = curr_event_time;
if(curr_event_cpu > max_events_cpu_[index]) max_events_cpu_[index] = curr_event_cpu;
if(curr_event_cpu < min_events_cpu_[index]) min_events_cpu_[index] = curr_event_cpu;
// total_event_count_ is declared std::atomic<unsigned long> in Timing.h, so
// this is an atomic increment.
++total_event_count_;
}

// NOTE(review): rendered diff without +/- markers. The ModuleDescription
// overload that writes curr_module_time_ looks like the removed version; the
// StreamContext/ModuleCallingContext overload looks like the replacement --
// confirm against the real Timing.cc.
void Timing::preModule(ModuleDescription const&) {
curr_module_time_ = getTime();
// Pushes the current getTime() reading onto the stack returned by
// moduleTimeStack(); postModule pops it to compute the module's elapsed time.
void Timing::preModule(StreamContext const&, ModuleCallingContext const&) {
auto & modStack = moduleTimeStack();
modStack.push_back(getTime());
}

// NOTE(review): rendered diff without +/- markers. The ModuleDescription
// overload and the lines that read curr_event_ / curr_module_time_ look like
// the removed version; the StreamContext/ModuleCallingContext overload and the
// lines that use modStack / eventID look like the replacement -- confirm
// against the real Timing.cc.
void Timing::postModule(ModuleDescription const& desc) {
double t = getTime() - curr_module_time_;
// Pops the start time pushed by preModule, computes the elapsed getTime()
// delta, and prints a "TimeModule>" line with the event number, run number,
// module label, module name and elapsed time.
void Timing::postModule(StreamContext const& iStream, ModuleCallingContext const& iModule) {
//LogInfo("TimeModule")
// The constructor only registers this callback when summary_only_ is false;
// the same condition is checked again here.
if(not summary_only_) {
LogPrint("TimeModule") << "TimeModule> " // ChangeLog 4
<< curr_event_.event() << " "
<< curr_event_.run() << " "
<< desc.moduleLabel() << " "
<< desc.moduleName() << " "
<< t;
auto& modStack = moduleTimeStack();
// preModule pushes one entry per module start, so an empty stack here would
// mean the pre/post calls on this thread were unbalanced.
assert(modStack.size() > 0);
double curr_module_time = modStack.back();
modStack.pop_back();
double t = getTime() - curr_module_time;
//move waiting module start times forward to account
// for the fact that they were paused while this module ran
for(auto& waitingModuleStartTime : modStack) {
waitingModuleStartTime +=t;
}

auto const & eventID = iStream.eventID();
// moduleDescription() is dereferenced without a null check.
// NOTE(review): presumably it is never null inside a module callback -- confirm.
auto const & desc = *(iModule.moduleDescription());

LogPrint("TimeModule") << "TimeModule> "
<< eventID.event() << " "
<< eventID.run() << " "
<< desc.moduleLabel() << " "
<< desc.moduleName() << " "
<< t;
}
}
}
Expand Down

0 comments on commit 243d040

Please sign in to comment.