From 23dde92ae98cc19d863514778563e38958668000 Mon Sep 17 00:00:00 2001 From: Matti Kortelainen Date: Wed, 29 Apr 2020 03:39:02 +0200 Subject: [PATCH] Add globalBeginJob() interface for stream modules using GlobalCache --- .../interface/stream/EDAnalyzerAdaptor.h | 1 + .../interface/stream/EDAnalyzerAdaptorBase.h | 2 +- .../interface/stream/ProducingModuleAdaptor.h | 1 + .../stream/ProducingModuleAdaptorBase.h | 2 +- .../interface/stream/callAbilities.h | 14 +++++++ .../src/stream/EDAnalyzerAdaptorBase.cc | 1 - .../src/stream/ProducingModuleAdaptorBase.cc | 3 -- .../Framework/test/stream_filter_t.cppunit.cc | 38 ++++++++++++++++++- .../test/stream_producer_t.cppunit.cc | 35 ++++++++++++++++- .../test/stubs/TestStreamAnalyzers.cc | 7 ++++ .../Framework/test/stubs/TestStreamFilters.cc | 7 ++++ .../test/stubs/TestStreamProducers.cc | 7 ++++ .../Framework/test/test_stream_modules_cfg.py | 6 +-- 13 files changed, 113 insertions(+), 11 deletions(-) diff --git a/FWCore/Framework/interface/stream/EDAnalyzerAdaptor.h b/FWCore/Framework/interface/stream/EDAnalyzerAdaptor.h index 2e290e2830958..f44938f34e0bd 100644 --- a/FWCore/Framework/interface/stream/EDAnalyzerAdaptor.h +++ b/FWCore/Framework/interface/stream/EDAnalyzerAdaptor.h @@ -89,6 +89,7 @@ namespace edm { m_lumiSummaries.resize(iNLumis); } + void doBeginJob() final { MyGlobal::beginJob(m_global.get()); } void doEndJob() final { MyGlobal::endJob(m_global.get()); } void setupRun(EDAnalyzerBase* iProd, RunIndex iIndex) final { MyGlobalRun::set(iProd, m_runs[iIndex].get()); } void streamEndRunSummary(EDAnalyzerBase* iProd, edm::Run const& iRun, edm::EventSetup const& iES) final { diff --git a/FWCore/Framework/interface/stream/EDAnalyzerAdaptorBase.h b/FWCore/Framework/interface/stream/EDAnalyzerAdaptorBase.h index e3cab1dd9a63c..a49ca54678d91 100644 --- a/FWCore/Framework/interface/stream/EDAnalyzerAdaptorBase.h +++ b/FWCore/Framework/interface/stream/EDAnalyzerAdaptorBase.h @@ -127,7 +127,7 @@ namespace edm { Principal const& iPrincipal) const {} virtual void setupStreamModules() = 0; - void doBeginJob(); + virtual void doBeginJob() = 0; virtual void doEndJob() = 0; void doBeginStream(StreamID id); diff --git a/FWCore/Framework/interface/stream/ProducingModuleAdaptor.h b/FWCore/Framework/interface/stream/ProducingModuleAdaptor.h index 6037280a06a65..adc80a4ba725a 100644 --- a/FWCore/Framework/interface/stream/ProducingModuleAdaptor.h +++ b/FWCore/Framework/interface/stream/ProducingModuleAdaptor.h @@ -88,6 +88,7 @@ namespace edm { m_lumis.resize(iNLumis); m_lumiSummaries.resize(iNLumis); } + void doBeginJob() final { MyGlobal::beginJob(m_global.get()); } void doEndJob() final { MyGlobal::endJob(m_global.get()); } void setupRun(M* iProd, RunIndex iIndex) final { MyGlobalRun::set(iProd, m_runs[iIndex].get()); } void streamEndRunSummary(M* iProd, edm::Run const& iRun, edm::EventSetup const& iES) final { diff --git a/FWCore/Framework/interface/stream/ProducingModuleAdaptorBase.h b/FWCore/Framework/interface/stream/ProducingModuleAdaptorBase.h index f0ca39f0839eb..a595c3a08816f 100644 --- a/FWCore/Framework/interface/stream/ProducingModuleAdaptorBase.h +++ b/FWCore/Framework/interface/stream/ProducingModuleAdaptorBase.h @@ -140,7 +140,7 @@ namespace edm { void doPreallocate(PreallocationConfiguration const&); virtual void preallocLumis(unsigned int) {} virtual void setupStreamModules() = 0; - void doBeginJob(); + virtual void doBeginJob() = 0; virtual void doEndJob() = 0; void doBeginStream(StreamID id); diff --git a/FWCore/Framework/interface/stream/callAbilities.h b/FWCore/Framework/interface/stream/callAbilities.h index 9bacaa66cddaa..ed5c3c5b5806c 100644 --- a/FWCore/Framework/interface/stream/callAbilities.h +++ b/FWCore/Framework/interface/stream/callAbilities.h @@ -20,6 +20,7 @@ // system include files #include +#include // user include files #include "FWCore/Framework/interface/stream/dummy_helpers.h" @@ -32,17 +33,30 @@ namespace edm { //******************************** // CallGlobal //******************************** + namespace callGlobalDetail { + template > + struct has_globalBeginJob : std::false_type {}; + + template + struct has_globalBeginJob> : std::true_type {}; + } // namespace callGlobalDetail template struct CallGlobalImpl { template static void set(B* iProd, typename T::GlobalCache const* iCache) { static_cast(iProd)->setGlobalCache(iCache); } + static void beginJob(typename T::GlobalCache* iCache) { + if constexpr (callGlobalDetail::has_globalBeginJob::value) { + T::globalBeginJob(iCache); + } + } static void endJob(typename T::GlobalCache* iCache) { T::globalEndJob(iCache); } }; template struct CallGlobalImpl { static void set(void* iProd, void const* iCache) {} + static void beginJob(void* iCache) {} static void endJob(void* iCache) {} }; diff --git a/FWCore/Framework/src/stream/EDAnalyzerAdaptorBase.cc b/FWCore/Framework/src/stream/EDAnalyzerAdaptorBase.cc index a1f8f75f17b58..f42b6fa3211a9 100644 --- a/FWCore/Framework/src/stream/EDAnalyzerAdaptorBase.cc +++ b/FWCore/Framework/src/stream/EDAnalyzerAdaptorBase.cc @@ -143,7 +143,6 @@ bool EDAnalyzerAdaptorBase::doEvent(EventPrincipal const& ep, mod->analyze(e, c); return true; } -void EDAnalyzerAdaptorBase::doBeginJob() {} void EDAnalyzerAdaptorBase::doBeginStream(StreamID id) { m_streamModules[id]->beginStream(id); } void EDAnalyzerAdaptorBase::doEndStream(StreamID id) { m_streamModules[id]->endStream(); } diff --git a/FWCore/Framework/src/stream/ProducingModuleAdaptorBase.cc b/FWCore/Framework/src/stream/ProducingModuleAdaptorBase.cc index d3aeb5a96eb2e..52111d0922cff 100644 --- a/FWCore/Framework/src/stream/ProducingModuleAdaptorBase.cc +++ b/FWCore/Framework/src/stream/ProducingModuleAdaptorBase.cc @@ -154,9 +154,6 @@ namespace edm { return m_streamModules[0]->indiciesForPutProducts(iBranchType); } - template - void ProducingModuleAdaptorBase::doBeginJob() {} - template void ProducingModuleAdaptorBase::doBeginStream(StreamID id) { m_streamModules[id]->beginStream(id); diff --git a/FWCore/Framework/test/stream_filter_t.cppunit.cc b/FWCore/Framework/test/stream_filter_t.cppunit.cc index c83831971d376..2bf25d78f3051 100644 --- a/FWCore/Framework/test/stream_filter_t.cppunit.cc +++ b/FWCore/Framework/test/stream_filter_t.cppunit.cc @@ -134,6 +134,36 @@ class testStreamFilter : public CppUnit::TestFixture { ++m_count; } }; + class GlobalProdWithBeginJob : public edm::stream::EDFilter> { + public: + static unsigned int m_count; + + static std::unique_ptr initializeGlobalCache(edm::ParameterSet const&) { return std::make_unique(1); } + GlobalProdWithBeginJob(edm::ParameterSet const&, const int* iGlobal) { CPPUNIT_ASSERT(*iGlobal == 1); } + + static void globalBeginJob(int* iGlobal) { + CPPUNIT_ASSERT(1 == *iGlobal); + *iGlobal = 2; + ++m_count; + } + + void beginStream(edm::StreamID) { + int* iGlobal = const_cast(globalCache()); + CPPUNIT_ASSERT(2 == *iGlobal); + *iGlobal = 3; + ++m_count; + } + + bool filter(edm::Event&, edm::EventSetup const&) override { + ++m_count; + return true; + } + + static void globalEndJob(int* iGlobal) { + CPPUNIT_ASSERT(3 == *iGlobal); + ++m_count; + } + }; class RunProd : public edm::stream::EDFilter> { public: static unsigned int m_count; @@ -356,6 +386,7 @@ class testStreamFilter : public CppUnit::TestFixture { }; unsigned int testStreamFilter::BasicProd::m_count = 0; unsigned int testStreamFilter::GlobalProd::m_count = 0; +unsigned int testStreamFilter::GlobalProdWithBeginJob::m_count = 0; unsigned int testStreamFilter::RunProd::m_count = 0; unsigned int testStreamFilter::LumiProd::m_count = 0; unsigned int testStreamFilter::RunSummaryProd::m_count = 0; @@ -422,6 +453,7 @@ testStreamFilter::testStreamFilter() m_actReg.reset(new edm::ActivityRegistry); //For each transition, bind a lambda which will call the proper method of the Worker + m_transToFunc[Trans::kBeginJob] = [](edm::Worker* iBase) { iBase->beginJob(); }; m_transToFunc[Trans::kBeginStream] = [](edm::Worker* iBase) { edm::StreamContext streamContext(s_streamID0, nullptr); iBase->beginStream(s_streamID0, streamContext); @@ -483,6 +515,7 @@ testStreamFilter::testStreamFilter() edm::StreamContext streamContext(s_streamID0, nullptr); iBase->endStream(s_streamID0, streamContext); }; + m_transToFunc[Trans::kEndJob] = [](edm::Worker* iBase) { iBase->endJob(); }; } namespace { @@ -529,7 +562,10 @@ void testStreamFilter::runTest(Expectations const& iExpect) { void testStreamFilter::basicTest() { runTest({Trans::kEvent}); } -void testStreamFilter::globalTest() { runTest({Trans::kBeginJob, Trans::kEvent, Trans::kEndJob}); } +void testStreamFilter::globalTest() { + runTest({Trans::kEvent, Trans::kEndJob}); + runTest({Trans::kBeginJob, Trans::kBeginStream, Trans::kEvent, Trans::kEndJob}); +} void testStreamFilter::runTest() { runTest({Trans::kGlobalBeginRun, Trans::kEvent, Trans::kGlobalEndRun}); } diff --git a/FWCore/Framework/test/stream_producer_t.cppunit.cc b/FWCore/Framework/test/stream_producer_t.cppunit.cc index 044d355e3043e..bcd35f4668be9 100644 --- a/FWCore/Framework/test/stream_producer_t.cppunit.cc +++ b/FWCore/Framework/test/stream_producer_t.cppunit.cc @@ -128,6 +128,33 @@ class testStreamProducer : public CppUnit::TestFixture { ++m_count; } }; + class GlobalProdWithBeginJob : public edm::stream::EDProducer> { + public: + static unsigned int m_count; + + static std::unique_ptr initializeGlobalCache(edm::ParameterSet const&) { return std::make_unique(1); } + GlobalProdWithBeginJob(edm::ParameterSet const&, const int* iGlobal) { CPPUNIT_ASSERT(*iGlobal == 1); } + + static void globalBeginJob(int* iGlobal) { + CPPUNIT_ASSERT(1 == *iGlobal); + *iGlobal = 2; + ++m_count; + } + + void beginStream(edm::StreamID) { + int* iGlobal = const_cast(globalCache()); + CPPUNIT_ASSERT(2 == *iGlobal); + *iGlobal = 3; + ++m_count; + } + + void produce(edm::Event&, edm::EventSetup const&) override { ++m_count; } + + static void globalEndJob(int* iGlobal) { + CPPUNIT_ASSERT(3 == *iGlobal); + ++m_count; + } + }; class RunProd : public edm::stream::EDProducer> { public: static unsigned int m_count; @@ -320,6 +347,7 @@ class testStreamProducer : public CppUnit::TestFixture { }; unsigned int testStreamProducer::BasicProd::m_count = 0; unsigned int testStreamProducer::GlobalProd::m_count = 0; +unsigned int testStreamProducer::GlobalProdWithBeginJob::m_count = 0; unsigned int testStreamProducer::RunProd::m_count = 0; unsigned int testStreamProducer::LumiProd::m_count = 0; unsigned int testStreamProducer::RunSummaryProd::m_count = 0; @@ -386,6 +414,7 @@ testStreamProducer::testStreamProducer() m_actReg.reset(new edm::ActivityRegistry); //For each transition, bind a lambda which will call the proper method of the Worker + m_transToFunc[Trans::kBeginJob] = [](edm::Worker* iBase) { iBase->beginJob(); }; m_transToFunc[Trans::kBeginStream] = [](edm::Worker* iBase) { edm::StreamContext streamContext(s_streamID0, nullptr); iBase->beginStream(s_streamID0, streamContext); @@ -447,6 +476,7 @@ testStreamProducer::testStreamProducer() edm::StreamContext streamContext(s_streamID0, nullptr); iBase->endStream(s_streamID0, streamContext); }; + m_transToFunc[Trans::kEndJob] = [](edm::Worker* iBase) { iBase->endJob(); }; } namespace { @@ -493,7 +523,10 @@ void testStreamProducer::runTest(Expectations const& iExpect) { void testStreamProducer::basicTest() { runTest({Trans::kEvent}); } -void testStreamProducer::globalTest() { runTest({Trans::kBeginJob, Trans::kEvent, Trans::kEndJob}); } +void testStreamProducer::globalTest() { + runTest({Trans::kEvent, Trans::kEndJob}); + runTest({Trans::kBeginJob, Trans::kBeginStream, Trans::kEvent, Trans::kEndJob}); +} void testStreamProducer::runTest() { runTest({Trans::kGlobalBeginRun, Trans::kEvent, Trans::kGlobalEndRun}); } diff --git a/FWCore/Framework/test/stubs/TestStreamAnalyzers.cc b/FWCore/Framework/test/stubs/TestStreamAnalyzers.cc index 722cfa9b837d8..0e4e9c4546fe9 100644 --- a/FWCore/Framework/test/stubs/TestStreamAnalyzers.cc +++ b/FWCore/Framework/test/stubs/TestStreamAnalyzers.cc @@ -60,6 +60,13 @@ namespace edmtest { }); } + static void globalBeginJob(Cache* iGlobal) { + ++m_count; + if (iGlobal->value != 0) { + throw cms::Exception("cache value") << iGlobal->value << " but it was supposed to be 0"; + } + } + void analyze(edm::Event const&, edm::EventSetup const&) { ++m_count; ++((globalCache())->value); diff --git a/FWCore/Framework/test/stubs/TestStreamFilters.cc b/FWCore/Framework/test/stubs/TestStreamFilters.cc index 982fe84c0bfad..c18a2a0c42d98 100644 --- a/FWCore/Framework/test/stubs/TestStreamFilters.cc +++ b/FWCore/Framework/test/stubs/TestStreamFilters.cc @@ -56,6 +56,13 @@ namespace edmtest { produces(); } + static void globalBeginJob(Cache* iGlobal) { + ++m_count; + if (iGlobal->value != 0) { + throw cms::Exception("cache value") << iGlobal->value << " but it was supposed to be 0"; + } + } + bool filter(edm::Event&, edm::EventSetup const&) override { ++m_count; ++((globalCache())->value); diff --git a/FWCore/Framework/test/stubs/TestStreamProducers.cc b/FWCore/Framework/test/stubs/TestStreamProducers.cc index 4b04cb40f5448..be19a817c3640 100644 --- a/FWCore/Framework/test/stubs/TestStreamProducers.cc +++ b/FWCore/Framework/test/stubs/TestStreamProducers.cc @@ -64,6 +64,13 @@ namespace edmtest { produces(); } + static void globalBeginJob(Cache* iGlobal) { + ++m_count; + if (iGlobal->value != 0) { + throw cms::Exception("cache value") << iGlobal->value << " but it was supposed to be 0"; + } + } + void produce(edm::Event&, edm::EventSetup const&) override { ++m_count; ++((globalCache())->value); diff --git a/FWCore/Framework/test/test_stream_modules_cfg.py b/FWCore/Framework/test/test_stream_modules_cfg.py index f9ef4a9652c0e..5d24d8de94827 100644 --- a/FWCore/Framework/test/test_stream_modules_cfg.py +++ b/FWCore/Framework/test/test_stream_modules_cfg.py @@ -29,7 +29,7 @@ process.GlobIntProd = cms.EDProducer("edmtest::stream::GlobalIntProducer", - transitions = cms.int32(nEvt+2) + transitions = cms.int32(nEvt+3) ,cachevalue = cms.int32(nEvt) ) @@ -75,7 +75,7 @@ process.GlobIntAn = cms.EDAnalyzer("edmtest::stream::GlobalIntAnalyzer", - transitions = cms.int32(nEvt+2) + transitions = cms.int32(nEvt+3) ,cachevalue = cms.int32(nEvt) ) @@ -100,7 +100,7 @@ ) process.GlobIntFil = cms.EDFilter("edmtest::stream::GlobalIntFilter", - transitions = cms.int32(nEvt+2) + transitions = cms.int32(nEvt+3) ,cachevalue = cms.int32(nEvt) )