Skip to content

Commit

Permalink
Add globalBeginJob() interface for stream modules using GlobalCache
Browse files Browse the repository at this point in the history
  • Loading branch information
makortel committed Apr 29, 2020
1 parent b3d42e1 commit 23dde92
Show file tree
Hide file tree
Showing 13 changed files with 113 additions and 11 deletions.
1 change: 1 addition & 0 deletions FWCore/Framework/interface/stream/EDAnalyzerAdaptor.h
Expand Up @@ -89,6 +89,7 @@ namespace edm {
m_lumiSummaries.resize(iNLumis);
}

void doBeginJob() final { MyGlobal::beginJob(m_global.get()); }
void doEndJob() final { MyGlobal::endJob(m_global.get()); }
void setupRun(EDAnalyzerBase* iProd, RunIndex iIndex) final { MyGlobalRun::set(iProd, m_runs[iIndex].get()); }
void streamEndRunSummary(EDAnalyzerBase* iProd, edm::Run const& iRun, edm::EventSetup const& iES) final {
Expand Down
2 changes: 1 addition & 1 deletion FWCore/Framework/interface/stream/EDAnalyzerAdaptorBase.h
Expand Up @@ -127,7 +127,7 @@ namespace edm {
Principal const& iPrincipal) const {}

virtual void setupStreamModules() = 0;
void doBeginJob();
virtual void doBeginJob() = 0;
virtual void doEndJob() = 0;

void doBeginStream(StreamID id);
Expand Down
1 change: 1 addition & 0 deletions FWCore/Framework/interface/stream/ProducingModuleAdaptor.h
Expand Up @@ -88,6 +88,7 @@ namespace edm {
m_lumis.resize(iNLumis);
m_lumiSummaries.resize(iNLumis);
}
void doBeginJob() final { MyGlobal::beginJob(m_global.get()); }
void doEndJob() final { MyGlobal::endJob(m_global.get()); }
void setupRun(M* iProd, RunIndex iIndex) final { MyGlobalRun::set(iProd, m_runs[iIndex].get()); }
void streamEndRunSummary(M* iProd, edm::Run const& iRun, edm::EventSetup const& iES) final {
Expand Down
Expand Up @@ -140,7 +140,7 @@ namespace edm {
void doPreallocate(PreallocationConfiguration const&);
virtual void preallocLumis(unsigned int) {}
virtual void setupStreamModules() = 0;
void doBeginJob();
virtual void doBeginJob() = 0;
virtual void doEndJob() = 0;

void doBeginStream(StreamID id);
Expand Down
14 changes: 14 additions & 0 deletions FWCore/Framework/interface/stream/callAbilities.h
Expand Up @@ -20,6 +20,7 @@

// system include files
#include <memory>
#include <type_traits>
// user include files
#include "FWCore/Framework/interface/stream/dummy_helpers.h"

Expand All @@ -32,17 +33,30 @@ namespace edm {
//********************************
// CallGlobal
//********************************
namespace callGlobalDetail {
template <typename, typename = std::void_t<>>
struct has_globalBeginJob : std::false_type {};

template <typename T>
struct has_globalBeginJob<T, std::void_t<decltype(T::globalBeginJob(nullptr))>> : std::true_type {};
} // namespace callGlobalDetail
template <typename T, bool>
struct CallGlobalImpl {
template <typename B>
static void set(B* iProd, typename T::GlobalCache const* iCache) {
static_cast<T*>(iProd)->setGlobalCache(iCache);
}
static void beginJob(typename T::GlobalCache* iCache) {
if constexpr (callGlobalDetail::has_globalBeginJob<T>::value) {
T::globalBeginJob(iCache);
}
}
static void endJob(typename T::GlobalCache* iCache) { T::globalEndJob(iCache); }
};
template <typename T>
struct CallGlobalImpl<T, false> {
static void set(void* iProd, void const* iCache) {}
static void beginJob(void* iCache) {}
static void endJob(void* iCache) {}
};

Expand Down
1 change: 0 additions & 1 deletion FWCore/Framework/src/stream/EDAnalyzerAdaptorBase.cc
Expand Up @@ -143,7 +143,6 @@ bool EDAnalyzerAdaptorBase::doEvent(EventPrincipal const& ep,
mod->analyze(e, c);
return true;
}
void EDAnalyzerAdaptorBase::doBeginJob() {}

void EDAnalyzerAdaptorBase::doBeginStream(StreamID id) { m_streamModules[id]->beginStream(id); }
void EDAnalyzerAdaptorBase::doEndStream(StreamID id) { m_streamModules[id]->endStream(); }
Expand Down
3 changes: 0 additions & 3 deletions FWCore/Framework/src/stream/ProducingModuleAdaptorBase.cc
Expand Up @@ -154,9 +154,6 @@ namespace edm {
return m_streamModules[0]->indiciesForPutProducts(iBranchType);
}

template <typename T>
void ProducingModuleAdaptorBase<T>::doBeginJob() {}

template <typename T>
void ProducingModuleAdaptorBase<T>::doBeginStream(StreamID id) {
m_streamModules[id]->beginStream(id);
Expand Down
38 changes: 37 additions & 1 deletion FWCore/Framework/test/stream_filter_t.cppunit.cc
Expand Up @@ -134,6 +134,36 @@ class testStreamFilter : public CppUnit::TestFixture {
++m_count;
}
};
class GlobalProdWithBeginJob : public edm::stream::EDFilter<edm::GlobalCache<int>> {
public:
static unsigned int m_count;

static std::unique_ptr<int> initializeGlobalCache(edm::ParameterSet const&) { return std::make_unique<int>(1); }
GlobalProdWithBeginJob(edm::ParameterSet const&, const int* iGlobal) { CPPUNIT_ASSERT(*iGlobal == 1); }

static void globalBeginJob(int* iGlobal) {
CPPUNIT_ASSERT(1 == *iGlobal);
*iGlobal = 2;
++m_count;
}

void beginStream(edm::StreamID) {
int* iGlobal = const_cast<int*>(globalCache());
CPPUNIT_ASSERT(2 == *iGlobal);
*iGlobal = 3;
++m_count;
}

bool filter(edm::Event&, edm::EventSetup const&) override {
++m_count;
return true;
}

static void globalEndJob(int* iGlobal) {
CPPUNIT_ASSERT(3 == *iGlobal);
++m_count;
}
};
class RunProd : public edm::stream::EDFilter<edm::RunCache<int>> {
public:
static unsigned int m_count;
Expand Down Expand Up @@ -356,6 +386,7 @@ class testStreamFilter : public CppUnit::TestFixture {
};
unsigned int testStreamFilter::BasicProd::m_count = 0;
unsigned int testStreamFilter::GlobalProd::m_count = 0;
unsigned int testStreamFilter::GlobalProdWithBeginJob::m_count = 0;
unsigned int testStreamFilter::RunProd::m_count = 0;
unsigned int testStreamFilter::LumiProd::m_count = 0;
unsigned int testStreamFilter::RunSummaryProd::m_count = 0;
Expand Down Expand Up @@ -422,6 +453,7 @@ testStreamFilter::testStreamFilter()
m_actReg.reset(new edm::ActivityRegistry);

//For each transition, bind a lambda which will call the proper method of the Worker
m_transToFunc[Trans::kBeginJob] = [](edm::Worker* iBase) { iBase->beginJob(); };
m_transToFunc[Trans::kBeginStream] = [](edm::Worker* iBase) {
edm::StreamContext streamContext(s_streamID0, nullptr);
iBase->beginStream(s_streamID0, streamContext);
Expand Down Expand Up @@ -483,6 +515,7 @@ testStreamFilter::testStreamFilter()
edm::StreamContext streamContext(s_streamID0, nullptr);
iBase->endStream(s_streamID0, streamContext);
};
m_transToFunc[Trans::kEndJob] = [](edm::Worker* iBase) { iBase->endJob(); };
}

namespace {
Expand Down Expand Up @@ -529,7 +562,10 @@ void testStreamFilter::runTest(Expectations const& iExpect) {

void testStreamFilter::basicTest() { runTest<BasicProd>({Trans::kEvent}); }

void testStreamFilter::globalTest() { runTest<GlobalProd>({Trans::kBeginJob, Trans::kEvent, Trans::kEndJob}); }
void testStreamFilter::globalTest() {
runTest<GlobalProd>({Trans::kEvent, Trans::kEndJob});
runTest<GlobalProdWithBeginJob>({Trans::kBeginJob, Trans::kBeginStream, Trans::kEvent, Trans::kEndJob});
}

void testStreamFilter::runTest() { runTest<RunProd>({Trans::kGlobalBeginRun, Trans::kEvent, Trans::kGlobalEndRun}); }

Expand Down
35 changes: 34 additions & 1 deletion FWCore/Framework/test/stream_producer_t.cppunit.cc
Expand Up @@ -128,6 +128,33 @@ class testStreamProducer : public CppUnit::TestFixture {
++m_count;
}
};
class GlobalProdWithBeginJob : public edm::stream::EDProducer<edm::GlobalCache<int>> {
public:
static unsigned int m_count;

static std::unique_ptr<int> initializeGlobalCache(edm::ParameterSet const&) { return std::make_unique<int>(1); }
GlobalProdWithBeginJob(edm::ParameterSet const&, const int* iGlobal) { CPPUNIT_ASSERT(*iGlobal == 1); }

static void globalBeginJob(int* iGlobal) {
CPPUNIT_ASSERT(1 == *iGlobal);
*iGlobal = 2;
++m_count;
}

void beginStream(edm::StreamID) {
int* iGlobal = const_cast<int*>(globalCache());
CPPUNIT_ASSERT(2 == *iGlobal);
*iGlobal = 3;
++m_count;
}

void produce(edm::Event&, edm::EventSetup const&) override { ++m_count; }

static void globalEndJob(int* iGlobal) {
CPPUNIT_ASSERT(3 == *iGlobal);
++m_count;
}
};
class RunProd : public edm::stream::EDProducer<edm::RunCache<int>> {
public:
static unsigned int m_count;
Expand Down Expand Up @@ -320,6 +347,7 @@ class testStreamProducer : public CppUnit::TestFixture {
};
unsigned int testStreamProducer::BasicProd::m_count = 0;
unsigned int testStreamProducer::GlobalProd::m_count = 0;
unsigned int testStreamProducer::GlobalProdWithBeginJob::m_count = 0;
unsigned int testStreamProducer::RunProd::m_count = 0;
unsigned int testStreamProducer::LumiProd::m_count = 0;
unsigned int testStreamProducer::RunSummaryProd::m_count = 0;
Expand Down Expand Up @@ -386,6 +414,7 @@ testStreamProducer::testStreamProducer()
m_actReg.reset(new edm::ActivityRegistry);

//For each transition, bind a lambda which will call the proper method of the Worker
m_transToFunc[Trans::kBeginJob] = [](edm::Worker* iBase) { iBase->beginJob(); };
m_transToFunc[Trans::kBeginStream] = [](edm::Worker* iBase) {
edm::StreamContext streamContext(s_streamID0, nullptr);
iBase->beginStream(s_streamID0, streamContext);
Expand Down Expand Up @@ -447,6 +476,7 @@ testStreamProducer::testStreamProducer()
edm::StreamContext streamContext(s_streamID0, nullptr);
iBase->endStream(s_streamID0, streamContext);
};
m_transToFunc[Trans::kEndJob] = [](edm::Worker* iBase) { iBase->endJob(); };
}

namespace {
Expand Down Expand Up @@ -493,7 +523,10 @@ void testStreamProducer::runTest(Expectations const& iExpect) {

void testStreamProducer::basicTest() { runTest<BasicProd>({Trans::kEvent}); }

void testStreamProducer::globalTest() { runTest<GlobalProd>({Trans::kBeginJob, Trans::kEvent, Trans::kEndJob}); }
void testStreamProducer::globalTest() {
runTest<GlobalProd>({Trans::kEvent, Trans::kEndJob});
runTest<GlobalProdWithBeginJob>({Trans::kBeginJob, Trans::kBeginStream, Trans::kEvent, Trans::kEndJob});
}

void testStreamProducer::runTest() { runTest<RunProd>({Trans::kGlobalBeginRun, Trans::kEvent, Trans::kGlobalEndRun}); }

Expand Down
7 changes: 7 additions & 0 deletions FWCore/Framework/test/stubs/TestStreamAnalyzers.cc
Expand Up @@ -60,6 +60,13 @@ namespace edmtest {
});
}

static void globalBeginJob(Cache* iGlobal) {
++m_count;
if (iGlobal->value != 0) {
throw cms::Exception("cache value") << iGlobal->value << " but it was supposed to be 0";
}
}

void analyze(edm::Event const&, edm::EventSetup const&) {
++m_count;
++((globalCache())->value);
Expand Down
7 changes: 7 additions & 0 deletions FWCore/Framework/test/stubs/TestStreamFilters.cc
Expand Up @@ -56,6 +56,13 @@ namespace edmtest {
produces<unsigned int>();
}

static void globalBeginJob(Cache* iGlobal) {
++m_count;
if (iGlobal->value != 0) {
throw cms::Exception("cache value") << iGlobal->value << " but it was supposed to be 0";
}
}

bool filter(edm::Event&, edm::EventSetup const&) override {
++m_count;
++((globalCache())->value);
Expand Down
7 changes: 7 additions & 0 deletions FWCore/Framework/test/stubs/TestStreamProducers.cc
Expand Up @@ -64,6 +64,13 @@ namespace edmtest {
produces<unsigned int>();
}

static void globalBeginJob(Cache* iGlobal) {
++m_count;
if (iGlobal->value != 0) {
throw cms::Exception("cache value") << iGlobal->value << " but it was supposed to be 0";
}
}

void produce(edm::Event&, edm::EventSetup const&) override {
++m_count;
++((globalCache())->value);
Expand Down
6 changes: 3 additions & 3 deletions FWCore/Framework/test/test_stream_modules_cfg.py
Expand Up @@ -29,7 +29,7 @@


process.GlobIntProd = cms.EDProducer("edmtest::stream::GlobalIntProducer",
transitions = cms.int32(nEvt+2)
transitions = cms.int32(nEvt+3)
,cachevalue = cms.int32(nEvt)
)

Expand Down Expand Up @@ -75,7 +75,7 @@


process.GlobIntAn = cms.EDAnalyzer("edmtest::stream::GlobalIntAnalyzer",
transitions = cms.int32(nEvt+2)
transitions = cms.int32(nEvt+3)
,cachevalue = cms.int32(nEvt)
)

Expand All @@ -100,7 +100,7 @@
)

process.GlobIntFil = cms.EDFilter("edmtest::stream::GlobalIntFilter",
transitions = cms.int32(nEvt+2)
transitions = cms.int32(nEvt+3)
,cachevalue = cms.int32(nEvt)
)

Expand Down

0 comments on commit 23dde92

Please sign in to comment.