Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new signal to ActivityRegistry around when a #14360

Merged
merged 2 commits into from May 6, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 14 additions & 2 deletions FWCore/Framework/interface/DelayedReader.h
Expand Up @@ -13,14 +13,26 @@ uses input sources to retrieve EDProducts from external storage.
#include <memory>

namespace edm {

class BranchKey;
class EDProductGetter;
class ModuleCallingContext;
class SharedResourcesAcquirer;
class StreamContext;

namespace signalslot {
template <typename T> class Signal;
}

class DelayedReader {
public:
virtual ~DelayedReader();
virtual std::unique_ptr<WrapperBase> getProduct(BranchKey const& k, EDProductGetter const* ep);

std::unique_ptr<WrapperBase> getProduct(BranchKey const& k,
EDProductGetter const* ep,
signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> const* preReadFromSourceSignal = nullptr,
signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> const* postReadFromSourceSignal = nullptr,
ModuleCallingContext const* mcc = nullptr);

void mergeReaders(DelayedReader* other) {mergeReaders_(other);}
void reset() {reset_();}

Expand Down
6 changes: 4 additions & 2 deletions FWCore/Framework/interface/EventPrincipal.h
Expand Up @@ -173,11 +173,13 @@ namespace edm {
}

using Base::getProvenance;

signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> preModuleDelayedGetSignal_;
signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> postModuleDelayedGetSignal_;


signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> preReadFromSourceSignal_;
signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> postReadFromSourceSignal_;

private:

BranchID pidToBid(ProductID const& pid) const;
Expand Down
22 changes: 20 additions & 2 deletions FWCore/Framework/src/DelayedReader.cc
@@ -1,5 +1,8 @@
#include "FWCore/Framework/interface/DelayedReader.h"
#include "FWCore/Framework/interface/SharedResourcesAcquirer.h"
#include "FWCore/ServiceRegistry/interface/ModuleCallingContext.h"
#include "FWCore/ServiceRegistry/interface/StreamContext.h"
#include "FWCore/Utilities/interface/Signal.h"

#include <mutex>
#include <cassert>
Expand All @@ -12,13 +15,28 @@
namespace edm {
DelayedReader::~DelayedReader() {}

std::unique_ptr<WrapperBase>
DelayedReader::getProduct(BranchKey const& k, EDProductGetter const* ep) {
std::unique_ptr<WrapperBase>
DelayedReader::getProduct(BranchKey const& k,
EDProductGetter const* ep,
signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> const* preReadFromSourceSignal,
signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> const* postReadFromSourceSignal,
ModuleCallingContext const* mcc) {

auto sr = sharedResources_();
std::unique_lock<SharedResourcesAcquirer> guard;
if(sr) {
guard =std::unique_lock<SharedResourcesAcquirer>(*sr);
}

if(mcc) {
preReadFromSourceSignal->emit(*(mcc->getStreamContext()),*mcc);
}
std::shared_ptr<void> guardForSignal(nullptr,[&postReadFromSourceSignal,mcc](const void*){
if(mcc) {
postReadFromSourceSignal->emit(*(mcc->getStreamContext()),*mcc);
}
});

return getProduct_(k, ep);
}

Expand Down
4 changes: 2 additions & 2 deletions FWCore/Framework/src/EventPrincipal.cc
Expand Up @@ -197,8 +197,8 @@ namespace edm {
}
});

std::unique_ptr<WrapperBase> edp(reader()->getProduct(bk, this));
std::unique_ptr<WrapperBase> edp(reader()->getProduct(bk, this, &preReadFromSourceSignal_, &postReadFromSourceSignal_, mcc));

// Now fix up the ProductResolver
phb.putProduct(std::move(edp));
}
Expand Down
2 changes: 2 additions & 0 deletions FWCore/Framework/src/EventProcessor.cc
Expand Up @@ -546,6 +546,8 @@ namespace edm {
thinnedAssociationsHelper(), *processConfiguration_, historyAppender_.get(), index);
ep->preModuleDelayedGetSignal_.connect(std::cref(actReg_->preModuleEventDelayedGetSignal_));
ep->postModuleDelayedGetSignal_.connect(std::cref(actReg_->postModuleEventDelayedGetSignal_));
ep->preReadFromSourceSignal_.connect(std::cref(actReg_->preEventReadFromSourceSignal_));
ep->postReadFromSourceSignal_.connect(std::cref(actReg_->postEventReadFromSourceSignal_));
principalCache_.insert(ep);
}
// initialize the subprocesses, if there are any
Expand Down
2 changes: 2 additions & 0 deletions FWCore/Framework/src/SubProcess.cc
Expand Up @@ -172,6 +172,8 @@ namespace edm {
false /*not primary process*/);
ep->preModuleDelayedGetSignal_.connect(std::cref(items.actReg_->preModuleEventDelayedGetSignal_));
ep->postModuleDelayedGetSignal_.connect(std::cref(items.actReg_->postModuleEventDelayedGetSignal_));
ep->preReadFromSourceSignal_.connect(std::cref(items.actReg_->preEventReadFromSourceSignal_));
ep->postReadFromSourceSignal_.connect(std::cref(items.actReg_->postEventReadFromSourceSignal_));
principalCache_.insert(ep);
}
if(hasSubProcesses) {
Expand Down
36 changes: 36 additions & 0 deletions FWCore/Integration/test/unit_test_outputs/testGetBy2.log
Expand Up @@ -311,20 +311,32 @@ PathContext: pathName = e pathID = 0 (EndPath)
ProcessContext: PROD2 7da3661f4f7dead5e42f07cf3ddf5a59

++++++++ starting: delayed processing event for module: stream = 0 label = 'out' id = 3
++++++++++ starting: event delayed read from source: stream = 0 label = 'out' id = 3
++++++++++ finished: event delayed read from source: stream = 0 label = 'out' id = 3
++++++++ finished: delayed processing event for module: stream = 0 label = 'out' id = 3
++++++++ starting: delayed processing event for module: stream = 0 label = 'out' id = 3
++++++++++ starting: event delayed read from source: stream = 0 label = 'out' id = 3
++++++++++ finished: event delayed read from source: stream = 0 label = 'out' id = 3
++++++++ finished: delayed processing event for module: stream = 0 label = 'out' id = 3
++++++++ starting: delayed processing event for module: stream = 0 label = 'out' id = 3
++++++++++ starting: event delayed read from source: stream = 0 label = 'out' id = 3
++++++++++ finished: event delayed read from source: stream = 0 label = 'out' id = 3
++++++++ finished: delayed processing event for module: stream = 0 label = 'out' id = 3
++++++++ starting: delayed processing event for module: stream = 0 label = 'out' id = 3
++++++++++ starting: event delayed read from source: stream = 0 label = 'out' id = 3
++++++++++ finished: event delayed read from source: stream = 0 label = 'out' id = 3
++++++++ finished: delayed processing event for module: stream = 0 label = 'out' id = 3
++++++++ starting: delayed processing event for module: stream = 0 label = 'out' id = 3
++++++++++ starting: processing event for module: stream = 0 label = 'intProducerU' id = 4
++++++++++ finished: processing event for module: stream = 0 label = 'intProducerU' id = 4
++++++++ finished: delayed processing event for module: stream = 0 label = 'out' id = 3
++++++++ starting: delayed processing event for module: stream = 0 label = 'out' id = 3
++++++++++ starting: event delayed read from source: stream = 0 label = 'out' id = 3
++++++++++ finished: event delayed read from source: stream = 0 label = 'out' id = 3
++++++++ finished: delayed processing event for module: stream = 0 label = 'out' id = 3
++++++++ starting: delayed processing event for module: stream = 0 label = 'out' id = 3
++++++++++ starting: event delayed read from source: stream = 0 label = 'out' id = 3
++++++++++ finished: event delayed read from source: stream = 0 label = 'out' id = 3
++++++++ finished: delayed processing event for module: stream = 0 label = 'out' id = 3
++++++++ starting: delayed processing event for module: stream = 0 label = 'out' id = 3
++++++++++ starting: processing event for module: stream = 0 label = 'intVectorProducer' id = 5
Expand Down Expand Up @@ -421,20 +433,32 @@ PathContext: pathName = e pathID = 0 (EndPath)
ProcessContext: PROD2 7da3661f4f7dead5e42f07cf3ddf5a59

++++++++ starting: delayed processing event for module: stream = 0 label = 'out' id = 3
++++++++++ starting: event delayed read from source: stream = 0 label = 'out' id = 3
++++++++++ finished: event delayed read from source: stream = 0 label = 'out' id = 3
++++++++ finished: delayed processing event for module: stream = 0 label = 'out' id = 3
++++++++ starting: delayed processing event for module: stream = 0 label = 'out' id = 3
++++++++++ starting: event delayed read from source: stream = 0 label = 'out' id = 3
++++++++++ finished: event delayed read from source: stream = 0 label = 'out' id = 3
++++++++ finished: delayed processing event for module: stream = 0 label = 'out' id = 3
++++++++ starting: delayed processing event for module: stream = 0 label = 'out' id = 3
++++++++++ starting: event delayed read from source: stream = 0 label = 'out' id = 3
++++++++++ finished: event delayed read from source: stream = 0 label = 'out' id = 3
++++++++ finished: delayed processing event for module: stream = 0 label = 'out' id = 3
++++++++ starting: delayed processing event for module: stream = 0 label = 'out' id = 3
++++++++++ starting: event delayed read from source: stream = 0 label = 'out' id = 3
++++++++++ finished: event delayed read from source: stream = 0 label = 'out' id = 3
++++++++ finished: delayed processing event for module: stream = 0 label = 'out' id = 3
++++++++ starting: delayed processing event for module: stream = 0 label = 'out' id = 3
++++++++++ starting: processing event for module: stream = 0 label = 'intProducerU' id = 4
++++++++++ finished: processing event for module: stream = 0 label = 'intProducerU' id = 4
++++++++ finished: delayed processing event for module: stream = 0 label = 'out' id = 3
++++++++ starting: delayed processing event for module: stream = 0 label = 'out' id = 3
++++++++++ starting: event delayed read from source: stream = 0 label = 'out' id = 3
++++++++++ finished: event delayed read from source: stream = 0 label = 'out' id = 3
++++++++ finished: delayed processing event for module: stream = 0 label = 'out' id = 3
++++++++ starting: delayed processing event for module: stream = 0 label = 'out' id = 3
++++++++++ starting: event delayed read from source: stream = 0 label = 'out' id = 3
++++++++++ finished: event delayed read from source: stream = 0 label = 'out' id = 3
++++++++ finished: delayed processing event for module: stream = 0 label = 'out' id = 3
++++++++ starting: delayed processing event for module: stream = 0 label = 'out' id = 3
++++++++++ starting: processing event for module: stream = 0 label = 'intVectorProducer' id = 5
Expand Down Expand Up @@ -531,20 +555,32 @@ PathContext: pathName = e pathID = 0 (EndPath)
ProcessContext: PROD2 7da3661f4f7dead5e42f07cf3ddf5a59

++++++++ starting: delayed processing event for module: stream = 0 label = 'out' id = 3
++++++++++ starting: event delayed read from source: stream = 0 label = 'out' id = 3
++++++++++ finished: event delayed read from source: stream = 0 label = 'out' id = 3
++++++++ finished: delayed processing event for module: stream = 0 label = 'out' id = 3
++++++++ starting: delayed processing event for module: stream = 0 label = 'out' id = 3
++++++++++ starting: event delayed read from source: stream = 0 label = 'out' id = 3
++++++++++ finished: event delayed read from source: stream = 0 label = 'out' id = 3
++++++++ finished: delayed processing event for module: stream = 0 label = 'out' id = 3
++++++++ starting: delayed processing event for module: stream = 0 label = 'out' id = 3
++++++++++ starting: event delayed read from source: stream = 0 label = 'out' id = 3
++++++++++ finished: event delayed read from source: stream = 0 label = 'out' id = 3
++++++++ finished: delayed processing event for module: stream = 0 label = 'out' id = 3
++++++++ starting: delayed processing event for module: stream = 0 label = 'out' id = 3
++++++++++ starting: event delayed read from source: stream = 0 label = 'out' id = 3
++++++++++ finished: event delayed read from source: stream = 0 label = 'out' id = 3
++++++++ finished: delayed processing event for module: stream = 0 label = 'out' id = 3
++++++++ starting: delayed processing event for module: stream = 0 label = 'out' id = 3
++++++++++ starting: processing event for module: stream = 0 label = 'intProducerU' id = 4
++++++++++ finished: processing event for module: stream = 0 label = 'intProducerU' id = 4
++++++++ finished: delayed processing event for module: stream = 0 label = 'out' id = 3
++++++++ starting: delayed processing event for module: stream = 0 label = 'out' id = 3
++++++++++ starting: event delayed read from source: stream = 0 label = 'out' id = 3
++++++++++ finished: event delayed read from source: stream = 0 label = 'out' id = 3
++++++++ finished: delayed processing event for module: stream = 0 label = 'out' id = 3
++++++++ starting: delayed processing event for module: stream = 0 label = 'out' id = 3
++++++++++ starting: event delayed read from source: stream = 0 label = 'out' id = 3
++++++++++ finished: event delayed read from source: stream = 0 label = 'out' id = 3
++++++++ finished: delayed processing event for module: stream = 0 label = 'out' id = 3
++++++++ starting: delayed processing event for module: stream = 0 label = 'out' id = 3
++++++++++ starting: processing event for module: stream = 0 label = 'intVectorProducer' id = 5
Expand Down
48 changes: 32 additions & 16 deletions FWCore/ServiceRegistry/interface/ActivityRegistry.h
Expand Up @@ -684,22 +684,38 @@ namespace edm {
}
AR_WATCH_USING_METHOD_2(watchPostModuleEvent)

/// signal is emitted after the module starts processing the Event and before a delayed get has started
typedef signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> PreModuleEventDelayedGet;
PreModuleEventDelayedGet preModuleEventDelayedGetSignal_;
void watchPreModuleEventDelayedGet(PreModuleEventDelayedGet::slot_type const& iSlot) {
preModuleEventDelayedGetSignal_.connect(iSlot);
}
AR_WATCH_USING_METHOD_2(watchPreModuleEventDelayedGet)

/// signal is emitted after the module starts processing the Event and after a delayed get has finished
typedef signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> PostModuleEventDelayedGet;
PostModuleEventDelayedGet postModuleEventDelayedGetSignal_;
void watchPostModuleEventDelayedGet(PostModuleEventDelayedGet::slot_type const& iSlot) {
postModuleEventDelayedGetSignal_.connect_front(iSlot);
}
AR_WATCH_USING_METHOD_2(watchPostModuleEventDelayedGet)

/// signal is emitted after the module starts processing the Event and before a delayed get has started
typedef signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> PreModuleEventDelayedGet;
PreModuleEventDelayedGet preModuleEventDelayedGetSignal_;
void watchPreModuleEventDelayedGet(PreModuleEventDelayedGet::slot_type const& iSlot) {
preModuleEventDelayedGetSignal_.connect(iSlot);
}
AR_WATCH_USING_METHOD_2(watchPreModuleEventDelayedGet)

/// signal is emitted after the module starts processing the Event and after a delayed get has finished
typedef signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> PostModuleEventDelayedGet;
PostModuleEventDelayedGet postModuleEventDelayedGetSignal_;
void watchPostModuleEventDelayedGet(PostModuleEventDelayedGet::slot_type const& iSlot) {
postModuleEventDelayedGetSignal_.connect_front(iSlot);
}
AR_WATCH_USING_METHOD_2(watchPostModuleEventDelayedGet)

/// signal is emitted after the module starts processing the Event, after a delayed get has started, and before a source read
typedef signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> PreEventReadFromSource;
PreEventReadFromSource preEventReadFromSourceSignal_;
void watchPreEventReadFromSource(PreEventReadFromSource::slot_type const& iSlot) {
preEventReadFromSourceSignal_.connect(iSlot);
}
AR_WATCH_USING_METHOD_2(watchPreEventReadFromSource)

/// signal is emitted after the module starts processing the Event, after a delayed get has started, and after a source read
typedef signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> PostEventReadFromSource;
PostEventReadFromSource postEventReadFromSourceSignal_;
void watchPostEventReadFromSource(PostEventReadFromSource::slot_type const& iSlot) {
postEventReadFromSourceSignal_.connect_front(iSlot);
}
AR_WATCH_USING_METHOD_2(watchPostEventReadFromSource)

typedef signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> PreModuleStreamBeginRun;
PreModuleStreamBeginRun preModuleStreamBeginRunSignal_;
void watchPreModuleStreamBeginRun(PreModuleStreamBeginRun::slot_type const& iSlot) {
Expand Down
5 changes: 5 additions & 0 deletions FWCore/ServiceRegistry/src/ActivityRegistry.cc
Expand Up @@ -214,6 +214,8 @@ namespace edm {
preModuleEventDelayedGetSignal_.connect(std::cref(iOther.preModuleEventDelayedGetSignal_));
postModuleEventDelayedGetSignal_.connect(std::cref(iOther.postModuleEventDelayedGetSignal_));

preEventReadFromSourceSignal_.connect(std::cref(iOther.preEventReadFromSourceSignal_));
postEventReadFromSourceSignal_.connect(std::cref(iOther.postEventReadFromSourceSignal_));

preModuleStreamBeginRunSignal_.connect(std::cref(iOther.preModuleStreamBeginRunSignal_));
postModuleStreamBeginRunSignal_.connect(std::cref(iOther.postModuleStreamBeginRunSignal_));
Expand Down Expand Up @@ -378,6 +380,9 @@ namespace edm {
copySlotsToFrom(preModuleEventDelayedGetSignal_, iOther.preModuleEventDelayedGetSignal_);
copySlotsToFromReverse(postModuleEventDelayedGetSignal_, iOther.postModuleEventDelayedGetSignal_);

copySlotsToFrom(preEventReadFromSourceSignal_, iOther.preEventReadFromSourceSignal_);
copySlotsToFromReverse(postEventReadFromSourceSignal_, iOther.postEventReadFromSourceSignal_);

copySlotsToFrom(preModuleStreamBeginRunSignal_, iOther.preModuleStreamBeginRunSignal_);
copySlotsToFromReverse(postModuleStreamBeginRunSignal_, iOther.postModuleStreamBeginRunSignal_);

Expand Down