Skip to content

Commit

Permalink
Merge pull request #1575 from fwyzard/add_StreamContext_to_Pre_PostSo…
Browse files Browse the repository at this point in the history
…urce

Multithreading fixes -- Add a StreamContext to the Pre/PostSource signals
  • Loading branch information
ktf committed Nov 27, 2013
2 parents 0c16833 + 9f94d72 commit 4953851
Show file tree
Hide file tree
Showing 27 changed files with 115 additions and 92 deletions.
12 changes: 10 additions & 2 deletions DQMServices/Core/src/DQMService.cc
Expand Up @@ -34,6 +34,10 @@ static void
restrictDQMAccessM(const edm::ModuleDescription &)
{ restrictDQMAccess(); }

static void
restrictDQMAccessS(edm::StreamID)
{ restrictDQMAccess(); }

/// Release access to the DQM core.
static void
releaseDQMAccess(void)
Expand All @@ -43,6 +47,10 @@ static void
releaseDQMAccessM(const edm::ModuleDescription &)
{ releaseDQMAccess(); }

static void
releaseDQMAccessS(edm::StreamID)
{ releaseDQMAccess(); }

// -------------------------------------------------------------------
DQMService::DQMService(const edm::ParameterSet &pset, edm::ActivityRegistry &ar)
: store_(&*edm::Service<DQMStore>()),
Expand All @@ -53,8 +61,8 @@ DQMService::DQMService(const edm::ParameterSet &pset, edm::ActivityRegistry &ar)
{
ar.watchPreSourceConstruction(&restrictDQMAccessM);
ar.watchPostSourceConstruction(&releaseDQMAccessM);
ar.watchPreSource(&restrictDQMAccess);
ar.watchPostSource(&releaseDQMAccess);
ar.watchPreSourceEvent(&restrictDQMAccessS);
ar.watchPostSourceEvent(&releaseDQMAccessS);
ar.watchPreModule(&restrictDQMAccessM);
ar.watchPostModule(&releaseDQMAccessM);
ar.watchPostProcessEvent(this, &DQMService::flush);
Expand Down
Expand Up @@ -50,7 +50,6 @@ class BlockWipedAllocatorService {

blockWipedPool(&pool);
if (m_useAlloc) BlockWipedPoolAllocated::usePool();
iAR.watchPreSource(this,&BlockWipedAllocatorService::preSource);
iAR.watchPreProcessEvent(this,&BlockWipedAllocatorService::preEventProcessing);
iAR.watchPostEndJob(this,&BlockWipedAllocatorService::postEndJob);
iAR.watchPreModule(this,&BlockWipedAllocatorService::preModule);
Expand All @@ -60,11 +59,6 @@ class BlockWipedAllocatorService {
// wipe the workspace before each event
void preEventProcessing(const edm::EventID&, const edm::Timestamp&) { wiper();}

// nope event-principal deleted in source
void preSource() {
// wiper();
}

void dump() {
if (m_silent) return;
std::cout << "ReferenceCounted stat"<< std::endl;
Expand Down
5 changes: 2 additions & 3 deletions EventFilter/Utilities/interface/MicroStateServiceClassic.h
Expand Up @@ -37,9 +37,8 @@ namespace evf{
void preEventProcessing(const edm::EventID&, const edm::Timestamp&);
void postEventProcessing(const edm::Event&, const edm::EventSetup&);

void preSource();
void postSource();

void preSourceEvent(edm::StreamID);
void postSourceEvent(edm::StreamID);

void preModule(const edm::ModuleDescription&);
void postModule(const edm::ModuleDescription&);
Expand Down
9 changes: 5 additions & 4 deletions EventFilter/Utilities/plugins/FastMonitoringService.cc
Expand Up @@ -44,8 +44,8 @@ namespace evf{
reg.watchPreProcessPath(this,&FastMonitoringService::preProcessPath);
reg.watchPreProcessEvent(this,&FastMonitoringService::preEventProcessing);
reg.watchPostProcessEvent(this,&FastMonitoringService::postEventProcessing);
reg.watchPreSource(this,&FastMonitoringService::preSource);
reg.watchPostSource(this,&FastMonitoringService::postSource);
reg.watchPreSourceEvent(this,&FastMonitoringService::preSourceEvent);
reg.watchPostSourceEvent(this,&FastMonitoringService::postSourceEvent);

reg.watchPreModule(this,&FastMonitoringService::preModule);
reg.watchPostModule(this,&FastMonitoringService::postModule);
Expand Down Expand Up @@ -246,13 +246,14 @@ namespace evf{
fmt_.m_data.processedJ_.value()++;
fmt_.monlock_.unlock();
}
void FastMonitoringService::preSource()

void FastMonitoringService::preSourceEvent(edm::StreamID)
{
// boost::mutex::scoped_lock sl(lock_);
fmt_.m_data.microstate_ = &reservedMicroStateNames[mIdle];
}

void FastMonitoringService::postSource()
void FastMonitoringService::postSourceEvent(edm::StreamID)
{
// boost::mutex::scoped_lock sl(lock_);
fmt_.m_data.microstate_ = &reservedMicroStateNames[mFwkOvh];
Expand Down
4 changes: 2 additions & 2 deletions EventFilter/Utilities/plugins/FastMonitoringService.h
Expand Up @@ -118,8 +118,8 @@ namespace evf{
void preEventProcessing(const edm::EventID&, const edm::Timestamp&);
void postEventProcessing(const edm::Event&, const edm::EventSetup&);

void preSource();
void postSource();
void preSourceEvent(edm::StreamID);
void postSourceEvent(edm::StreamID);

void preModule(const edm::ModuleDescription&);
void postModule(const edm::ModuleDescription&);
Expand Down
9 changes: 5 additions & 4 deletions EventFilter/Utilities/src/MicroStateServiceClassic.cc
Expand Up @@ -15,8 +15,8 @@ namespace evf{

reg.watchPreProcessEvent(this,&MicroStateServiceClassic::preEventProcessing);
reg.watchPostProcessEvent(this,&MicroStateServiceClassic::postEventProcessing);
reg.watchPreSource(this,&MicroStateServiceClassic::preSource);
reg.watchPostSource(this,&MicroStateServiceClassic::postSource);
reg.watchPreSourceEvent(this,&MicroStateServiceClassic::preSourceEvent);
reg.watchPostSourceEvent(this,&MicroStateServiceClassic::postSourceEvent);

reg.watchPreModule(this,&MicroStateServiceClassic::preModule);
reg.watchPostModule(this,&MicroStateServiceClassic::postModule);
Expand Down Expand Up @@ -53,13 +53,14 @@ namespace evf{
boost::mutex::scoped_lock sl(lock_);
microstate2_ = &input;
}
void MicroStateServiceClassic::preSource()

void MicroStateServiceClassic::preSourceEvent(edm::StreamID)
{
boost::mutex::scoped_lock sl(lock_);
microstate2_ = &input;
}

void MicroStateServiceClassic::postSource()
void MicroStateServiceClassic::postSourceEvent(edm::StreamID)
{
boost::mutex::scoped_lock sl(lock_);
microstate2_ = &fwkovh;
Expand Down
16 changes: 11 additions & 5 deletions FWCore/Framework/interface/InputSource.h
Expand Up @@ -109,10 +109,10 @@ namespace edm {
ItemType nextItemType();

/// Read next event
void readEvent(EventPrincipal& ep, StreamContext *);
void readEvent(EventPrincipal& ep, StreamContext &);

/// Read a specific event
bool readEvent(EventPrincipal& ep, EventID const&, StreamContext *);
bool readEvent(EventPrincipal& ep, EventID const&, StreamContext &);

/// Read next luminosity block Auxilary
boost::shared_ptr<LuminosityBlockAuxiliary> readLuminosityBlockAuxiliary();
Expand Down Expand Up @@ -279,9 +279,15 @@ namespace edm {

class EventSourceSentry {
public:
explicit EventSourceSentry(InputSource const& source);
EventSourceSentry(InputSource const& source, StreamContext & sc);
~EventSourceSentry();

EventSourceSentry(EventSourceSentry const&) = delete; // Disallow copying and moving
EventSourceSentry& operator=(EventSourceSentry const&) = delete; // Disallow copying and moving

private:
SourceSentry sentry_;
InputSource const& source_;
StreamContext & sc_;
};

class LumiSourceSentry {
Expand Down Expand Up @@ -386,7 +392,7 @@ namespace edm {
virtual void readRun_(RunPrincipal& runPrincipal);
virtual void readLuminosityBlock_(LuminosityBlockPrincipal& lumiPrincipal);
virtual void readEvent_(EventPrincipal& eventPrincipal) = 0;
virtual bool readIt(EventID const&, EventPrincipal& eventPrincipal);
virtual bool readIt(EventID const& id, EventPrincipal& eventPrincipal, StreamContext& streamContext);
virtual std::unique_ptr<FileBlock> readFile_();
virtual void closeFile_() {}
virtual bool goToEvent_(EventID const& eventID);
Expand Down
2 changes: 1 addition & 1 deletion FWCore/Framework/src/EventProcessor.cc
Expand Up @@ -1806,7 +1806,7 @@ namespace edm {
//TODO this will have to become per stream
auto& event = principalCache_.eventPrincipal(iStreamIndex);
StreamContext streamContext(event.streamID(), &processContext_);
input_->readEvent(event, &streamContext);
input_->readEvent(event, streamContext);
FDEBUG(1) << "\treadEvent\n";
}
void EventProcessor::processEvent(unsigned int iStreamIndex) {
Expand Down
39 changes: 27 additions & 12 deletions FWCore/Framework/src/InputSource.cc
Expand Up @@ -19,6 +19,7 @@
#include "FWCore/ParameterSet/interface/ParameterSet.h"
#include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
#include "FWCore/ServiceRegistry/interface/Service.h"
#include "FWCore/ServiceRegistry/interface/StreamContext.h"
#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
#include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
#include "FWCore/Utilities/interface/do_nothing_deleter.h"
Expand Down Expand Up @@ -329,13 +330,17 @@ namespace edm {
}

void
InputSource::readEvent(EventPrincipal& ep, StreamContext* streamContext) {
InputSource::readEvent(EventPrincipal& ep, StreamContext& streamContext) {
assert(state_ == IsEvent);
assert(!eventLimitReached());
{
// block scope, in order to issue the PostSourceEvent signal before calling postRead and issueReports
EventSourceSentry sentry(*this, streamContext);

callWithTryCatchAndPrint<void>( [this,&ep](){ readEvent_(ep); }, "Calling InputSource::readEvent_" );
if(receiver_) {
--numberOfEventsBeforeBigSkip_;
callWithTryCatchAndPrint<void>( [this,&ep](){ readEvent_(ep); }, "Calling InputSource::readEvent_" );
if(receiver_) {
--numberOfEventsBeforeBigSkip_;
}
}

Event event(ep, moduleDescription(), nullptr);
Expand All @@ -347,14 +352,17 @@ namespace edm {
}

bool
InputSource::readEvent(EventPrincipal& ep, EventID const& eventID, StreamContext* streamContext) {
InputSource::readEvent(EventPrincipal& ep, EventID const& eventID, StreamContext& streamContext) {
bool result = false;

if(!limitReached()) {
//result = callWithTryCatchAndPrint<bool>( [this,ep,&eventID](){ return readIt(eventID, ep); }, "Calling InputSource::readIt" );
result = readIt(eventID, ep);
if(result) {
if (not limitReached()) {
// the Pre/PostSourceEvent signals should be generated only if the event is actually found.
// this should be taken care of by an EventSourceSentry in the implementaion of readIt()

//result = callWithTryCatchAndPrint<bool>( [this,&eventID,&ep](){ return readIt(eventID, ep); }, "Calling InputSource::readIt" );
result = readIt(eventID, ep, streamContext);

if (result) {
Event event(ep, moduleDescription(), nullptr);
postRead(event);
if(remainingEvents_ > 0) --remainingEvents_;
Expand Down Expand Up @@ -409,7 +417,7 @@ namespace edm {
}

bool
InputSource::readIt(EventID const&, EventPrincipal&) {
InputSource::readIt(EventID const&, EventPrincipal&, StreamContext&) {
throw Exception(errors::LogicError)
<< "InputSource::readIt()\n"
<< "Random access is not implemented for this type of Input Source\n"
Expand Down Expand Up @@ -608,8 +616,15 @@ namespace edm {
post_();
}

InputSource::EventSourceSentry::EventSourceSentry(InputSource const& source) :
sentry_(source.actReg()->preSourceSignal_, source.actReg()->postSourceSignal_) {
InputSource::EventSourceSentry::EventSourceSentry(InputSource const& source, StreamContext & sc) :
source_(source),
sc_(sc)
{
source.actReg()->preSourceSignal_(sc_.streamID());
}

InputSource::EventSourceSentry::~EventSourceSentry() {
source_.actReg()->postSourceSignal_(sc_.streamID());
}

InputSource::LumiSourceSentry::LumiSourceSentry(InputSource const& source) :
Expand Down
1 change: 0 additions & 1 deletion FWCore/Integration/test/ThrowingSource.cc
Expand Up @@ -130,7 +130,6 @@ namespace edm {
ThrowingSource::readEvent_(EventPrincipal& eventPrincipal) {
if (whenToThrow_ == kReadEvent) throw cms::Exception("TestThrow") << "ThrowingSource::readEvent_";
assert(eventCached() || processingMode() != RunsLumisAndEvents);
EventSourceSentry sentry(*this);
EventAuxiliary aux(eventID(), processGUID(), Timestamp(presentTime()), false, EventAuxiliary::Undefined);
eventPrincipal.fillEventPrincipal(aux, processHistoryRegistry());
}
Expand Down
10 changes: 6 additions & 4 deletions FWCore/MessageService/interface/MessageLogger.h
Expand Up @@ -64,12 +64,14 @@ class MessageLogger {
void postEndJob();
void jobFailure();

void preSource ();
void postSource ();
void preSourceEvent ( StreamID );
void postSourceEvent ( StreamID );
void preSourceRunLumi ();
void postSourceRunLumi ();

void preFile ( std::string const &, bool );
void preFile ( std::string const&, bool );
void preFileClose ( std::string const&, bool );
void postFile ( std::string const &, bool );
void postFile ( std::string const&, bool );

void preModuleConstruction ( ModuleDescription const & );
void postModuleConstruction( ModuleDescription const & );
Expand Down
25 changes: 13 additions & 12 deletions FWCore/MessageService/src/MessageLogger.cc
Expand Up @@ -267,13 +267,13 @@ namespace edm {
iRegistry.watchPreModuleEvent(this,&MessageLogger::preModuleEvent);
iRegistry.watchPostModuleEvent(this,&MessageLogger::postModuleEvent);

iRegistry.watchPreSource(this,&MessageLogger::preSource);
iRegistry.watchPostSource(this,&MessageLogger::postSource);
iRegistry.watchPreSourceEvent(this,&MessageLogger::preSourceEvent);
iRegistry.watchPostSourceEvent(this,&MessageLogger::postSourceEvent);
// change log 14:
iRegistry.watchPreSourceRun(this,&MessageLogger::preSource);
iRegistry.watchPostSourceRun(this,&MessageLogger::postSource);
iRegistry.watchPreSourceLumi(this,&MessageLogger::preSource);
iRegistry.watchPostSourceLumi(this,&MessageLogger::postSource);
iRegistry.watchPreSourceRun(this,&MessageLogger::preSourceRunLumi);
iRegistry.watchPostSourceRun(this,&MessageLogger::postSourceRunLumi);
iRegistry.watchPreSourceLumi(this,&MessageLogger::preSourceRunLumi);
iRegistry.watchPostSourceLumi(this,&MessageLogger::postSourceRunLumi);
iRegistry.watchPreOpenFile(this,&MessageLogger::preFile);
iRegistry.watchPostOpenFile(this,&MessageLogger::postFile);
iRegistry.watchPreCloseFile(this,&MessageLogger::preFileClose);
Expand Down Expand Up @@ -692,12 +692,13 @@ namespace edm {
MessageDrop::instance()->setSinglet("AfterBeginJob"); // Change Log 17
}

void
MessageLogger::preSource()
{
establish("source");
}
void MessageLogger::postSource()
void MessageLogger::preSourceEvent(StreamID)
{ establish("source"); }
void MessageLogger::postSourceEvent(StreamID)
{ unEstablish("AfterSource"); }
void MessageLogger::preSourceRunLumi()
{ establish("source"); }
void MessageLogger::postSourceRunLumi()
{ unEstablish("AfterSource"); }

void MessageLogger::preFile( std::string const &, bool )
Expand Down
17 changes: 9 additions & 8 deletions FWCore/ServiceRegistry/interface/ActivityRegistry.h
Expand Up @@ -31,6 +31,7 @@ unscheduled execution. The tests are in FWCore/Integration/test:
// system include files
//#include "boost/signal.hpp"
#include "FWCore/Utilities/interface/Signal.h"
#include "FWCore/Utilities/interface/StreamID.h"
#include "boost/bind.hpp"
#include "boost/mem_fn.hpp"
#include "boost/utility.hpp"
Expand Down Expand Up @@ -102,20 +103,20 @@ namespace edm {
AR_WATCH_USING_METHOD_0(watchJobFailure)

/// signal is emitted before the source starts creating an Event
typedef signalslot::Signal<void()> PreSource;
PreSource preSourceSignal_;
void watchPreSource(PreSource::slot_type const& iSlot) {
typedef signalslot::Signal<void(StreamID)> PreSourceEvent;
PreSourceEvent preSourceSignal_;
void watchPreSourceEvent(PreSourceEvent::slot_type const& iSlot) {
preSourceSignal_.connect(iSlot);
}
AR_WATCH_USING_METHOD_0(watchPreSource)
AR_WATCH_USING_METHOD_1(watchPreSourceEvent)

/// signal is emitted after the source starts creating an Event
typedef signalslot::Signal<void()> PostSource;
PostSource postSourceSignal_;
void watchPostSource(PostSource::slot_type const& iSlot) {
typedef signalslot::Signal<void(StreamID)> PostSourceEvent;
PostSourceEvent postSourceSignal_;
void watchPostSourceEvent(PostSourceEvent::slot_type const& iSlot) {
postSourceSignal_.connect_front(iSlot);
}
AR_WATCH_USING_METHOD_0(watchPostSource)
AR_WATCH_USING_METHOD_1(watchPostSourceEvent)

/// signal is emitted before the source starts creating a Lumi
typedef signalslot::Signal<void()> PreSourceLumi;
Expand Down
4 changes: 2 additions & 2 deletions FWCore/Services/src/Memory.cc
Expand Up @@ -143,7 +143,7 @@ namespace edm {
if(!oncePerEventMode_) { // default, prints on increases
iReg.watchPreSourceConstruction(this, &SimpleMemoryCheck::preSourceConstruction);
iReg.watchPostSourceConstruction(this, &SimpleMemoryCheck::postSourceConstruction);
iReg.watchPostSource(this, &SimpleMemoryCheck::postSource);
iReg.watchPostSourceEvent(this, &SimpleMemoryCheck::postSourceEvent);
iReg.watchPostModuleConstruction(this, &SimpleMemoryCheck::postModuleConstruction);
iReg.watchPostModuleBeginJob(this, &SimpleMemoryCheck::postModuleBeginJob);
iReg.watchPostEvent(this, &SimpleMemoryCheck::postEvent);
Expand Down Expand Up @@ -267,7 +267,7 @@ namespace edm {
}
}

void SimpleMemoryCheck::postSource() {
void SimpleMemoryCheck::postSourceEvent(StreamID sid) {
bool expected = false;
if(measurementUnderway_.compare_exchange_strong(expected,true,std::memory_order_acq_rel) ) {
std::shared_ptr<void> guard(nullptr,[this](void const*) {
Expand Down
2 changes: 1 addition & 1 deletion FWCore/Services/src/Memory.h
Expand Up @@ -62,7 +62,7 @@ namespace edm {

void preSourceConstruction(const ModuleDescription&);
void postSourceConstruction(const ModuleDescription&);
void postSource();
void postSourceEvent(StreamID);

void postBeginJob();

Expand Down

0 comments on commit 4953851

Please sign in to comment.