diff --git a/FWCore/Concurrency/scripts/edmStreamStallGrapher.py b/FWCore/Concurrency/scripts/edmStreamStallGrapher.py index a370c88ceefb2..23f10658813d0 100755 --- a/FWCore/Concurrency/scripts/edmStreamStallGrapher.py +++ b/FWCore/Concurrency/scripts/edmStreamStallGrapher.py @@ -137,6 +137,7 @@ def processingStepsFromStallMonitorOutput(f,moduleNames): class StallMonitorParser(object): def __init__(self,f): numStreams = 0 + numStreamsFromSource = 0 moduleNames = {} for rawl in f: l = rawl.strip() @@ -146,11 +147,17 @@ def __init__(self,f): #found global begin run numStreams = int(i[1])+1 break + if numStreams == 0 and l and l[0] == 'S': + s = int(l.split(' ')[1]) + if s > numStreamsFromSource: + numStreamsFromSource = s if len(l) > 5 and l[0:2] == "#M": (id,name)=tuple(l[2:].split()) moduleNames[id] = name continue self._f = f + if numStreams == 0: + numStreams = numStreamsFromSource +1 self.numStreams =numStreams self._moduleNames = moduleNames self.maxNameSize =0 diff --git a/FWCore/Modules/src/TimeStudyModules.cc b/FWCore/Modules/src/TimeStudyModules.cc new file mode 100644 index 0000000000000..8b6a32544b934 --- /dev/null +++ b/FWCore/Modules/src/TimeStudyModules.cc @@ -0,0 +1,374 @@ +// -*- C++ -*- +// +// Package: FWCore/Modules +// Class : TimeStudyModules +// +// Implementation: +// [Notes on implementation] +// +// Original Author: Chris Jones +// Created: Thu, 22 Mar 2018 16:23:48 GMT +// + +// system include files +#include +#include +#include +#include +#include +#include + +// user include files +#include "FWCore/Framework/interface/global/EDProducer.h" +#include "FWCore/Framework/interface/one/EDProducer.h" +#include "FWCore/Framework/interface/one/EDAnalyzer.h" +#include "FWCore/Framework/interface/Event.h" +#include "FWCore/Framework/interface/ConsumesCollector.h" + +#include "FWCore/ParameterSet/interface/ParameterSet.h" +#include "FWCore/ParameterSet/interface/ParameterSetDescription.h" +#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h" +#include "FWCore/Utilities/interface/EDGetToken.h" +#include "FWCore/Utilities/interface/InputTag.h" + +#include "FWCore/ServiceRegistry/interface/SystemBounds.h" + +#include "FWCore/Framework/interface/MakerMacros.h" +#include "FWCore/ServiceRegistry/interface/ServiceMaker.h" +#include "FWCore/ServiceRegistry/interface/Service.h" + + +namespace timestudy { + namespace { + struct Sleeper { + Sleeper(edm::ParameterSet const& p, edm::ConsumesCollector&& iCol ) { + auto const& cv = p.getParameter>("consumes"); + tokens_.reserve(cv.size()); + for(auto const& c: cv) { + tokens_.emplace_back( iCol.consumes(c)); + } + + auto const& tv = p.getParameter>("eventTimes"); + eventTimes_.reserve(tv.size()); + for(auto t: tv) { + eventTimes_.push_back( static_cast(t*1E6)); + } + } + + void + getAndSleep(edm::Event const& e) const { + edm::Handle h; + for(auto const&t: tokens_) { + e.getByToken(t,h); + } + //Event number minimum value is 1 + usleep( eventTimes_[ (e.id().event()-1) % eventTimes_.size()]); + } + + static void fillDescription(edm::ParameterSetDescription& desc) { + desc.add>("consumes", {})->setComment("What event int data products to consume"); + desc.add>("eventTimes")->setComment("The time, in seconds, for how long the module should sleep each event. The index to use is based on a modulo of size of the list applied to the Event ID number."); + } + + private: + std::vector> tokens_; + std::vector eventTimes_; + + }; + } +//-------------------------------------------------------------------- +// +// Produces an IntProduct instance. +// +class SleepingProducer : public edm::global::EDProducer<> { +public: + explicit SleepingProducer(edm::ParameterSet const& p) : + value_(p.getParameter("ivalue")), + sleeper_(p, consumesCollector()) + { + produces(); + } + void produce(edm::StreamID, edm::Event& e, edm::EventSetup const& c) const override; + + static void fillDescriptions(edm::ConfigurationDescriptions & descriptions); + +private: + int value_; + Sleeper sleeper_; +}; + +void +SleepingProducer::produce(edm::StreamID, edm::Event& e, edm::EventSetup const&) const { + // EventSetup is not used. + sleeper_.getAndSleep(e); + + e.put(std::make_unique(value_)); +} + +void +SleepingProducer::fillDescriptions(edm::ConfigurationDescriptions & descriptions) { + edm::ParameterSetDescription desc; + + desc.add("ivalue")->setComment("Value to put into Event"); + Sleeper::fillDescription(desc); + + descriptions.addDefault(desc); +} + + class OneSleepingProducer : public edm::one::EDProducer { + public: + explicit OneSleepingProducer(edm::ParameterSet const& p) : + value_(p.getParameter("ivalue")), + sleeper_(p, consumesCollector()) + { + produces(); + usesResource(p.getParameter("resource")); + } + void produce( edm::Event& e, edm::EventSetup const& c) override; + + static void fillDescriptions(edm::ConfigurationDescriptions & descriptions); + + private: + int value_; + Sleeper sleeper_; + }; + + void + OneSleepingProducer::produce(edm::Event& e, edm::EventSetup const&) { + // EventSetup is not used. + sleeper_.getAndSleep(e); + + e.put(std::make_unique(value_)); + } + + void + OneSleepingProducer::fillDescriptions(edm::ConfigurationDescriptions & descriptions) { + edm::ParameterSetDescription desc; + + desc.add("ivalue")->setComment("Value to put into Event"); + desc.add("resource",std::string())->setComment("The name of the resource that is being shared"); + Sleeper::fillDescription(desc); + + descriptions.addDefault(desc); + } + + class OneSleepingAnalyzer : public edm::one::EDAnalyzer<> { + public: + explicit OneSleepingAnalyzer(edm::ParameterSet const& p) : + sleeper_(p, consumesCollector()) + { + } + void analyze( edm::Event const& e, edm::EventSetup const& c) override; + + static void fillDescriptions(edm::ConfigurationDescriptions & descriptions); + + private: + int value_; + Sleeper sleeper_; + }; + + void + OneSleepingAnalyzer::analyze(edm::Event const& e, edm::EventSetup const&) { + // EventSetup is not used. + sleeper_.getAndSleep(e); + } + + void + OneSleepingAnalyzer::fillDescriptions(edm::ConfigurationDescriptions & descriptions) { + edm::ParameterSetDescription desc; + + Sleeper::fillDescription(desc); + + descriptions.addDefault(desc); + } + + /* + The SleepingServer is configured to wait to accumulate X events before starting to run. + On a call to asyncWork + -the data will be added to the streams' slot then the waiting thread will be informed + -if the server is waiting on threads + - it wakes up and sleeps for 'initTime' + - it then checks to see if another event was pushed and if it does it continues to do the sleep loop + - once all sleep are done, it checks to see if enough events have contacted it and if so it sleeps for the longest 'workTime' duration given + - when done, it sleeps for each event 'finishTime' and when it wakes it sends the callback + - when all calledback, it goes back to check if there are waiting events + - if there are not enough waiting events, it goes back to waiting on a condition variable + + The SleepingServer keeps track of the number of active Streams by counting the number of streamBeginLumi and streamEndLumi calls have taken place. If there are insufficient active Lumis compared to the number of events it wants to wait for, the Server thread is told to start processing without further waiting. + + */ + class SleepingServer { + public: + SleepingServer(edm::ParameterSet const& iPS, edm::ActivityRegistry& iAR): + nWaitingEvents_(iPS.getUntrackedParameter("nWaitingEvents")) + { + iAR.watchPreallocate([this](edm::service::SystemBounds const& iBounds) { + auto const nStreams =iBounds.maxNumberOfStreams(); + waitingStreams_.reserve(nStreams); + waitTimesPerStream_.resize(nStreams); + waitingTaskPerStream_.resize(nStreams); + }); + + iAR.watchPreEndJob([this]() { + stopProcessing_ = true; + condition_.notify_one(); + serverThread_->join(); + }); + iAR.watchPreStreamBeginLumi([this](edm::StreamContext const&) { + ++activeStreams_; + }); + iAR.watchPreStreamEndLumi([this](edm::StreamContext const&) { + --activeStreams_; + condition_.notify_one(); + }); + + serverThread_ = std::make_unique([this]() { threadWork(); } ); + } + + void asyncWork(edm::StreamID id, edm::WaitingTaskWithArenaHolder iTask, long initTime, long workTime, long finishTime) { + waitTimesPerStream_[id.value()]={{initTime,workTime,finishTime}}; + waitingTaskPerStream_[id.value()]=std::move(iTask); + { + std::lock_guard lk{mutex_}; + waitingStreams_.push_back(id.value()); + } + condition_.notify_one(); + } + + private: + bool readyToDoSomething() { + if(stopProcessing_) { + return true; + } + if(waitingStreams_.size() >= nWaitingEvents_) { + return true; + } + //every running stream is now waiting + return waitingStreams_.size() == activeStreams_; + } + + void threadWork() { + while(not stopProcessing_.load()) { + std::vector streamsToProcess; + { + std::unique_lock lk(mutex_); + condition_.wait(lk, [this]() { + return readyToDoSomething(); + }); + swap(streamsToProcess,waitingStreams_); + } + if(stopProcessing_) { + break; + } + long longestTime = 0; + //simulate filling the external device + for(auto i: streamsToProcess) { + auto const& v=waitTimesPerStream_[i]; + if(v[1]>longestTime) { + longestTime = v[1]; + } + usleep(v[0]); + } + //simulate running external device + usleep(longestTime); + + //simulate copying data back + for(auto i: streamsToProcess) { + auto const& v=waitTimesPerStream_[i]; + usleep(v[2]); + waitingTaskPerStream_[i].doneWaiting(std::exception_ptr()); + } + } + waitingTaskPerStream_.clear(); + } + const unsigned int nWaitingEvents_; + std::unique_ptr serverThread_; + std::vector waitingStreams_; + std::vector> waitTimesPerStream_; + std::vector waitingTaskPerStream_; + std::mutex mutex_; + std::condition_variable condition_; + std::atomic activeStreams_{0}; + std::atomic stopProcessing_{false}; + }; + + class ExternalWorkSleepingProducer : public edm::global::EDProducer { + public: + explicit ExternalWorkSleepingProducer(edm::ParameterSet const& p) : + value_(p.getParameter("ivalue")), + sleeper_(p, consumesCollector()) + { + { + auto const& tv = p.getParameter>("serviceInitTimes"); + initTimes_.reserve(tv.size()); + for(auto t: tv) { + initTimes_.push_back( static_cast(t*1E6)); + } + } + { + auto const& tv = p.getParameter>("serviceWorkTimes"); + workTimes_.reserve(tv.size()); + for(auto t: tv) { + workTimes_.push_back( static_cast(t*1E6)); + } + } + { + auto const& tv = p.getParameter>("serviceFinishTimes"); + finishTimes_.reserve(tv.size()); + for(auto t: tv) { + finishTimes_.push_back( static_cast(t*1E6)); + } + } + assert(finishTimes_.size() == initTimes_.size()); + assert(workTimes_.size() == initTimes_.size()); + + produces(); + } + void acquire(edm::StreamID, edm::Event const & e, edm::EventSetup const& c, edm::WaitingTaskWithArenaHolder holder) const override; + + void produce(edm::StreamID, edm::Event& e, edm::EventSetup const& c) const override; + + static void fillDescriptions(edm::ConfigurationDescriptions & descriptions); + + private: + std::vector initTimes_; + std::vector workTimes_; + std::vector finishTimes_; + int value_; + Sleeper sleeper_; + }; + + void + ExternalWorkSleepingProducer::acquire(edm::StreamID id, edm::Event const& e, edm::EventSetup const&, edm::WaitingTaskWithArenaHolder holder) const { + // EventSetup is not used. + sleeper_.getAndSleep(e); + edm::Service server; + auto index = (e.id().event()-1) % initTimes_.size(); + server->asyncWork(id, std::move(holder), initTimes_[index], workTimes_[index], finishTimes_[index]); + } + + void + ExternalWorkSleepingProducer::produce(edm::StreamID, edm::Event& e, edm::EventSetup const&) const { + e.put(std::make_unique(value_)); + } + + void + ExternalWorkSleepingProducer::fillDescriptions(edm::ConfigurationDescriptions & descriptions) { + edm::ParameterSetDescription desc; + + desc.add("ivalue")->setComment("Value to put into Event"); + desc.add>("serviceInitTimes"); + desc.add>("serviceWorkTimes"); + desc.add>("serviceFinishTimes"); + Sleeper::fillDescription(desc); + + descriptions.addDefault(desc); + } + +} +DEFINE_FWK_SERVICE(timestudy::SleepingServer); +DEFINE_FWK_MODULE(timestudy::SleepingProducer); +DEFINE_FWK_MODULE(timestudy::OneSleepingProducer); +DEFINE_FWK_MODULE(timestudy::ExternalWorkSleepingProducer); +DEFINE_FWK_MODULE(timestudy::OneSleepingAnalyzer); + diff --git a/FWCore/Modules/test/FWCoreModulesTest.sh b/FWCore/Modules/test/FWCoreModulesTest.sh index 6b1ac1515e6b4..606e4a4fa1106 100755 --- a/FWCore/Modules/test/FWCoreModulesTest.sh +++ b/FWCore/Modules/test/FWCoreModulesTest.sh @@ -14,5 +14,7 @@ echo checkcacheidentifier cmsRun ${LOCAL_TEST_DIR}/checkcacheidentifier_cfg.py || die 'failed running cmsRun checkcacheidentifier_cfg.py' $? echo testPathStatusFilter cmsRun ${LOCAL_TEST_DIR}/testPathStatusFilter_cfg.py || die 'failed running cmsRun testPathStatusFilter_cfg.py' $? +echo sleepingModules +cmsRun ${LOCAL_TEST_DIR}/sleepingModules_cfg.py || die 'failed running cmsRun sleepingModules_cfg.py' $? popd diff --git a/FWCore/Modules/test/sleepingModules_cfg.py b/FWCore/Modules/test/sleepingModules_cfg.py new file mode 100644 index 0000000000000..e76e6f67df11e --- /dev/null +++ b/FWCore/Modules/test/sleepingModules_cfg.py @@ -0,0 +1,67 @@ +import FWCore.ParameterSet.Config as cms + +process = cms.Process("Test") + +process.source =cms.Source("EmptySource") + +process.maxEvents = cms.untracked.PSet(input = cms.untracked.int32(10)) + +process.options = cms.untracked.PSet(numberOfThreads = cms.untracked.uint32(4), + numberOfStreams = cms.untracked.uint32(0)) + +#allows something like simulation of source +process.s1 = cms.EDProducer("timestudy::OneSleepingProducer", + resource = cms.string("source"), + ivalue = cms.int32(1), + consumes = cms.VInputTag(), + eventTimes = cms.vdouble(0.01,0.005)) + +process.s2 = cms.EDProducer("timestudy::OneSleepingProducer", + resource = cms.string("source"), + ivalue = cms.int32(2), + consumes = cms.VInputTag(), + eventTimes = cms.vdouble(0.02,0.03)) + +process.p1 = cms.EDProducer("timestudy::SleepingProducer", + ivalue = cms.int32(3), + consumes = cms.VInputTag("s1","s2"), + eventTimes = cms.vdouble(0.05)) + + +process.p2 = cms.EDProducer("timestudy::SleepingProducer", + ivalue = cms.int32(3), + consumes = cms.VInputTag("s2"), + eventTimes = cms.vdouble(0.03)) + +process.p3 = cms.EDProducer("timestudy::SleepingProducer", + ivalue = cms.int32(3), + consumes = cms.VInputTag("p1","p2"), + eventTimes = cms.vdouble(0.03)) + +#external work +process.add_(cms.Service("timestudy::SleepingServer", + nWaitingEvents = cms.untracked.uint32(4))) + +process.e = cms.EDProducer("timestudy::ExternalWorkSleepingProducer", + consumes = cms.VInputTag("p2","p3"), + ivalue = cms.int32(10), + eventTimes = cms.vdouble(0.01), + serviceInitTimes = cms.vdouble(0.,0.), + serviceWorkTimes = cms.vdouble(0.1,0.15), + serviceFinishTimes = cms.vdouble(0.,0.) +) + +#approximates an OutputModule +process.out = cms.EDAnalyzer("timestudy::OneSleepingAnalyzer", + consumes = cms.VInputTag("s1","s2", "p1", "p2", "p3","e"), + eventTimes = cms.vdouble(0.02,0.03) +) + + +process.o = cms.EndPath(process.out, cms.Task(process.s1,process.s2,process.p1,process.p2,process.p3,process.e)) + +#process.add_(cms.Service("Tracer")) + +#process.add_(cms.Service("StallMonitor", fileName = cms.untracked.string("stall_sleep.log"))) + +process.add_(cms.Service("ZombieKillerService", secondsBetweenChecks = cms.untracked.uint32(10))) \ No newline at end of file