Skip to content

Commit

Permalink
Merge pull request #4583 from Dr15Jones/makeSystemTimingCorrectWithUn…
Browse files Browse the repository at this point in the history
…scheduled

Make system timing correct with unscheduled execution
  • Loading branch information
davidlange6 committed Jul 9, 2014
2 parents 5386b88 + 4ab074f commit d9b7918
Show file tree
Hide file tree
Showing 22 changed files with 492 additions and 271 deletions.
2 changes: 2 additions & 0 deletions FWCore/Framework/interface/Schedule.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
#include "FWCore/Framework/src/WorkerRegistry.h"
#include "FWCore/Framework/src/GlobalSchedule.h"
#include "FWCore/Framework/src/StreamSchedule.h"
#include "FWCore/Framework/src/SystemTimeKeeper.h"
#include "FWCore/Framework/src/PreallocationConfiguration.h"
#include "FWCore/MessageLogger/interface/ExceptionMessages.h"
#include "FWCore/MessageLogger/interface/JobReport.h"
Expand Down Expand Up @@ -249,6 +250,7 @@ namespace edm {
AllOutputModuleCommunicators all_output_communicators_;
PreallocationConfiguration preallocConfig_;

std::unique_ptr<SystemTimeKeeper> summaryTimeKeeper_;

bool wantSummary_;

Expand Down
6 changes: 2 additions & 4 deletions FWCore/Framework/interface/UnscheduledCallProducer.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,9 @@ namespace edm {
for(std::map<std::string, Worker*>::iterator it = labelToWorkers_.begin(), itEnd=labelToWorkers_.end();
it != itEnd;
++it) {
CPUTimer timer;
try {
ParentContext parentContext(context);
it->second->doWork<T>(p, es, &timer,streamID, parentContext, topContext);
it->second->doWork<T>(p, es, streamID, parentContext, topContext);
}
catch (cms::Exception & ex) {
std::ostringstream ost;
Expand Down Expand Up @@ -79,11 +78,10 @@ namespace edm {
std::map<std::string, Worker*>::const_iterator itFound =
labelToWorkers_.find(moduleLabel);
if(itFound != labelToWorkers_.end()) {
CPUTimer timer;
try {
ParentContext parentContext(mcc);
itFound->second->doWork<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin> >(event,
eventSetup, &timer,event.streamID(), parentContext, mcc->getStreamContext());
eventSetup, event.streamID(), parentContext, mcc->getStreamContext());
}
catch (cms::Exception & ex) {
std::ostringstream ost;
Expand Down
3 changes: 1 addition & 2 deletions FWCore/Framework/interface/WorkerManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ namespace edm {
PreallocationConfiguration const* prealloc,
std::shared_ptr<ProcessConfiguration> processConfiguration,
std::string label,
bool useStopwatch,
std::set<std::string>& unscheduledLabels,
std::vector<std::string>& shouldBeUsedLabels);

Expand All @@ -63,7 +62,7 @@ namespace edm {

AllWorkers const& allWorkers() const {return allWorkers_;}

void addToAllWorkers(Worker* w, bool useStopwatch);
void addToAllWorkers(Worker* w);

ExceptionToActionTable const& actionTable() const {return *actionTable_;}

Expand Down
2 changes: 1 addition & 1 deletion FWCore/Framework/src/GlobalSchedule.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ namespace edm {

void
GlobalSchedule::addToAllWorkers(Worker* w) {
workerManager_.addToAllWorkers(w, false);
workerManager_.addToAllWorkers(w);
}

}
2 changes: 1 addition & 1 deletion FWCore/Framework/src/GlobalSchedule.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ namespace edm {
for(auto & worker: allWorkers()) {
try {
ParentContext parentContext(context);
worker->doWork<T>(p, es, nullptr,StreamID::invalidStreamID(), parentContext, context);
worker->doWork<T>(p, es,StreamID::invalidStreamID(), parentContext, context);
}
catch (cms::Exception & ex) {
std::ostringstream ost;
Expand Down
12 changes: 0 additions & 12 deletions FWCore/Framework/src/Path.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ namespace edm {
std::shared_ptr<ActivityRegistry> areg,
StreamContext const* streamContext,
PathContext::PathType pathType) :
stopwatch_(),
timesRun_(),
timesPassed_(),
timesFailed_(),
Expand All @@ -35,7 +34,6 @@ namespace edm {
}

Path::Path(Path const& r) :
stopwatch_(r.stopwatch_),
timesRun_(r.timesRun_),
timesPassed_(r.timesPassed_),
timesFailed_(r.timesFailed_),
Expand Down Expand Up @@ -163,16 +161,6 @@ namespace edm {
for_all(workers_, std::bind(&WorkerInPath::clearCounters, _1));
}

void
Path::useStopwatch() {
stopwatch_.reset(new RunStopwatch::StopwatchPointer::element_type);
for(WorkersInPath::iterator it=workers_.begin(), itEnd = workers_.end();
it != itEnd;
++it) {
it->useStopwatch();
}
}

void
Path::setEarlyDeleteHelpers(std::map<const Worker*,EarlyDeleteHelper*> const& iWorkerToDeleter) {
//we use a temp so we can overset the size but then when moving to earlyDeleteHelpers we only
Expand Down
20 changes: 0 additions & 20 deletions FWCore/Framework/src/Path.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
#include <exception>
#include <sstream>

#include "FWCore/Framework/src/RunStopwatch.h"

namespace edm {
class EventPrincipal;
class ModuleDescription;
Expand Down Expand Up @@ -64,17 +62,6 @@ namespace edm {
int bitPosition() const { return bitpos_; }
std::string const& name() const { return pathContext_.pathName(); }

std::pair<double, double> timeCpuReal() const {
if(stopwatch_) {
return std::pair<double, double>(stopwatch_->cpuTime(), stopwatch_->realTime());
}
return std::pair<double, double>(0., 0.);
}

std::pair<double, double> timeCpuReal(unsigned int const i) const {
return workers_.at(i).timeCpuReal();
}

void clearCounters();

int timesRun() const { return timesRun_; }
Expand All @@ -93,14 +80,12 @@ namespace edm {

void setEarlyDeleteHelpers(std::map<const Worker*,EarlyDeleteHelper*> const&);

void useStopwatch();
private:

// If you define this be careful about the pointer in the
// PlaceInPathContext object in the contained WorkerInPath objects.
Path const& operator=(Path const&) = delete; // stop default

RunStopwatch::StopwatchPointer stopwatch_;
int timesRun_;
int timesPassed_;
int timesFailed_;
Expand Down Expand Up @@ -169,14 +154,9 @@ namespace edm {
void Path::processOneOccurrence(typename T::MyPrincipal& ep, EventSetup const& es,
StreamID const& streamID, typename T::Context const* context) {

//Create the PathSignalSentry before the RunStopwatch so that
// we only record the time spent in the path not from the signal
int nwrwue = -1;
PathSignalSentry<T> signaler(actReg_.get(), nwrwue, state_, &pathContext_);

// A RunStopwatch, but only if we are processing an event.
RunStopwatch stopwatch(T::isEvent_ ? stopwatch_ : RunStopwatch::StopwatchPointer());

if (T::isEvent_) {
++timesRun_;
}
Expand Down
65 changes: 0 additions & 65 deletions FWCore/Framework/src/RunStopwatch.h

This file was deleted.

35 changes: 32 additions & 3 deletions FWCore/Framework/src/Schedule.cc
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,37 @@ namespace edm {
c->setEventSelectionInfo(outputModulePathPositions, preg.anyProductProduced());
c->selectProducts(preg);
}

if(wantSummary_) {
std::vector<const ModuleDescription*> modDesc;
const auto& workers = allWorkers();
modDesc.reserve(workers.size());

std::transform(workers.begin(),workers.end(),
std::back_inserter(modDesc),
[](const Worker* iWorker) -> const ModuleDescription* {
return iWorker->descPtr();
});

summaryTimeKeeper_.reset(new SystemTimeKeeper(prealloc.numberOfStreams(),
modDesc,
tns));
auto timeKeeperPtr = summaryTimeKeeper_.get();

areg->watchPreModuleEvent(timeKeeperPtr, &SystemTimeKeeper::startModuleEvent);
areg->watchPostModuleEvent(timeKeeperPtr, &SystemTimeKeeper::stopModuleEvent);
areg->watchPreModuleEventDelayedGet(timeKeeperPtr, &SystemTimeKeeper::pauseModuleEvent);
areg->watchPostModuleEventDelayedGet(timeKeeperPtr,&SystemTimeKeeper::restartModuleEvent);

areg->watchPreSourceEvent(timeKeeperPtr, &SystemTimeKeeper::startEvent);
areg->watchPostEvent(timeKeeperPtr, &SystemTimeKeeper::stopEvent);

areg->watchPrePathEvent(timeKeeperPtr, &SystemTimeKeeper::startPath);
areg->watchPostPathEvent(timeKeeperPtr, &SystemTimeKeeper::stopPath);
//areg->preModuleEventSignal_.connect([timeKeeperPtr](StreamContext const& iContext, ModuleCallingContext const& iMod) {
//timeKeeperPtr->startModuleEvent(iContext,iMod);
//});
}

} // Schedule::Schedule

Expand Down Expand Up @@ -1005,9 +1036,7 @@ namespace edm {
rep.eventSummary.totalEvents = 0;
rep.eventSummary.cpuTime = 0.;
rep.eventSummary.realTime = 0.;
for(auto& s: streamSchedules_) {
s->getTriggerTimingReport(rep);
}
summaryTimeKeeper_->fillTriggerTimingReport(rep);
}

int
Expand Down
71 changes: 2 additions & 69 deletions FWCore/Framework/src/StreamSchedule.cc
Original file line number Diff line number Diff line change
Expand Up @@ -151,13 +151,11 @@ namespace edm {
results_inserter_(),
trig_paths_(),
end_paths_(),
stopwatch_(tns.wantSummary() ? new RunStopwatch::StopwatchPointer::element_type : static_cast<RunStopwatch::StopwatchPointer::element_type*> (nullptr)),
total_events_(),
total_passed_(),
number_of_unscheduled_modules_(0),
streamID_(streamID),
streamContext_(streamID_, processContext),
wantSummary_(tns.wantSummary()),
endpathsAreActive_(true) {

ParameterSet const& opts = proc_pset.getUntrackedParameterSet("options", ParameterSet());
Expand Down Expand Up @@ -214,7 +212,7 @@ namespace edm {
ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked));
assert(isTracked);
assert(modulePSet != nullptr);
workerManager_.addToUnscheduledWorkers(*modulePSet, preg, &prealloc, processConfiguration, label, wantSummary_, unscheduledLabels, shouldBeUsedLabels);
workerManager_.addToUnscheduledWorkers(*modulePSet, preg, &prealloc, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
} else {
//everthing is marked are unused so no 'on demand' allowed
shouldBeUsedLabels.push_back(label);
Expand Down Expand Up @@ -479,9 +477,6 @@ namespace edm {
// an empty path will cause an extra bit that is not used
if (!tmpworkers.empty()) {
trig_paths_.emplace_back(bitpos, name, tmpworkers, trptr, actionTable(), actReg_, &streamContext_, PathContext::PathType::kPath);
if (wantSummary_) {
trig_paths_.back().useStopwatch();
}
} else {
empty_trig_paths_.push_back(bitpos);
empty_trig_path_names_.push_back(name);
Expand All @@ -505,9 +500,6 @@ namespace edm {

if (!tmpworkers.empty()) {
end_paths_.emplace_back(bitpos, name, tmpworkers, TrigResPtr(), actionTable(), actReg_, &streamContext_, PathContext::PathType::kEndPath);
if (wantSummary_) {
end_paths_.back().useStopwatch();
}
}
for_all(holder, std::bind(&StreamSchedule::addToAllWorkers, this, _1));
}
Expand Down Expand Up @@ -646,65 +638,6 @@ namespace edm {
fill_summary(allWorkers(), rep.workerSummaries, &fillWorkerSummary);
}

static void
fillModuleInPathTimingSummary(Path const& path,
size_t which,
ModuleInPathTimingSummary& sum) {
sum.timesVisited = +path.timesVisited(which);
auto times = path.timeCpuReal(which);
sum.cpuTime += times.first;
sum.realTime += path.timesFailed(which);
sum.moduleLabel = path.getWorker(which)->description().moduleLabel();
}

static void
fillPathTimingSummary(Path const& path, PathTimingSummary& sum) {
sum.name = path.name();
sum.bitPosition = path.bitPosition();
sum.timesRun += path.timesRun();
auto times = path.timeCpuReal();
sum.cpuTime += times.first;
sum.realTime += times.second;

Path::size_type sz = path.size();
if(sum.moduleInPathSummaries.size()==0) {
std::vector<ModuleInPathTimingSummary> temp(sz);
for (size_t i = 0; i != sz; ++i) {
fillModuleInPathTimingSummary(path, i, temp[i]);
}
sum.moduleInPathSummaries.swap(temp);
} else {
assert(sz == sum.moduleInPathSummaries.size());
for (size_t i = 0; i != sz; ++i) {
fillModuleInPathTimingSummary(path, i, sum.moduleInPathSummaries[i]);
}
}
}

static void
fillWorkerTimingSummaryAux(Worker const& w, WorkerTimingSummary& sum) {
sum.timesVisited += w.timesVisited();
sum.timesRun += w.timesRun();
auto times = w.timeCpuReal();
sum.cpuTime += times.first;
sum.realTime += times.second;
sum.moduleLabel = w.description().moduleLabel();
}

static void
fillWorkerTimingSummary(Worker const* pw, WorkerTimingSummary& sum) {
fillWorkerTimingSummaryAux(*pw, sum);
}

void
StreamSchedule::getTriggerTimingReport(TriggerTimingReport& rep) const {
rep.eventSummary.totalEvents += totalEvents();

fill_summary(trig_paths_, rep.trigPathSummaries, &fillPathTimingSummary);
fill_summary(end_paths_, rep.endPathSummaries, &fillPathTimingSummary);
fill_summary(allWorkers(), rep.workerSummaries, &fillWorkerTimingSummary);
}

void
StreamSchedule::clearCounters() {
using std::placeholders::_1;
Expand All @@ -721,7 +654,7 @@ namespace edm {

void
StreamSchedule::addToAllWorkers(Worker* w) {
workerManager_.addToAllWorkers(w, wantSummary_);
workerManager_.addToAllWorkers(w);
}

void
Expand Down

0 comments on commit d9b7918

Please sign in to comment.