Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce use of Worker::doWork #32607

Merged
merged 3 commits into from Jan 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 4 additions & 1 deletion FWCore/Framework/src/StreamSchedule.cc
Expand Up @@ -696,7 +696,10 @@ namespace edm {
ParentContext parentContext(&streamContext_);
using Traits = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;

results_inserter_->doWork<Traits>(info, streamID_, parentContext, &streamContext_);
auto expt = results_inserter_->runModuleDirectly<Traits>(info, streamID_, parentContext, &streamContext_);
if (expt) {
std::rethrow_exception(expt);
}
} catch (cms::Exception& ex) {
if (not iExcept) {
if (ex.context().empty()) {
Expand Down
70 changes: 42 additions & 28 deletions FWCore/Framework/test/global_filter_t.cppunit.cc
Expand Up @@ -10,6 +10,7 @@
#include <vector>
#include <map>
#include <functional>
#include "tbb/global_control.h"
#include "FWCore/Framework/interface/global/EDFilter.h"
#include "FWCore/Framework/src/WorkerT.h"
#include "FWCore/Framework/src/ModuleHolder.h"
Expand All @@ -29,6 +30,25 @@

#include "cppunit/extensions/HelperMacros.h"

namespace {
struct ShadowStreamID {
constexpr ShadowStreamID() : value(0) {}
unsigned int value;
};

union IDUnion {
IDUnion() : m_shadow() {}
ShadowStreamID m_shadow;
edm::StreamID m_id;
};
} // namespace
static edm::StreamID makeID() {
IDUnion u;
assert(u.m_id.value() == 0);
return u.m_id;
}
static const edm::StreamID s_streamID0 = makeID();

class testGlobalFilter : public CppUnit::TestFixture {
CPPUNIT_TEST_SUITE(testGlobalFilter);

Expand Down Expand Up @@ -104,6 +124,18 @@ class testGlobalFilter : public CppUnit::TestFixture {
template <typename T>
void testTransitions(std::shared_ptr<T> iMod, Expectations const& iExpect);

template <typename Traits, typename Info>
void doWork(edm::Worker* iBase, Info const& info, edm::ParentContext const& iContext) {
auto task = edm::make_empty_waiting_task();
task->increment_ref_count();
iBase->doWorkAsync<Traits>(
edm::WaitingTaskHolder(task.get()), info, edm::ServiceToken(), s_streamID0, iContext, nullptr);
task->wait_for_all();
if (auto e = task->exceptionPtr()) {
std::rethrow_exception(*e);
}
}

class BasicProd : public edm::global::EDFilter<> {
public:
mutable unsigned int m_count = 0; //[[cms-thread-safe]]
Expand Down Expand Up @@ -328,25 +360,6 @@ class testGlobalFilter : public CppUnit::TestFixture {
};
};

namespace {
struct ShadowStreamID {
constexpr ShadowStreamID() : value(0) {}
unsigned int value;
};

union IDUnion {
IDUnion() : m_shadow() {}
ShadowStreamID m_shadow;
edm::StreamID m_id;
};
} // namespace
static edm::StreamID makeID() {
IDUnion u;
assert(u.m_id.value() == 0);
return u.m_id;
}
static const edm::StreamID s_streamID0 = makeID();

///registration of the test so that the runner can find it
CPPUNIT_TEST_SUITE_REGISTRATION(testGlobalFilter);

Expand Down Expand Up @@ -386,26 +399,26 @@ testGlobalFilter::testGlobalFilter()
typedef edm::OccurrenceTraits<edm::RunPrincipal, edm::BranchActionGlobalBegin> Traits;
edm::ParentContext nullParentContext;
edm::RunTransitionInfo info(*m_rp, *m_es);
iBase->doWork<Traits>(info, s_streamID0, nullParentContext, nullptr);
doWork<Traits>(iBase, info, nullParentContext);
};
m_transToFunc[Trans::kStreamBeginRun] = [this](edm::Worker* iBase) {
typedef edm::OccurrenceTraits<edm::RunPrincipal, edm::BranchActionStreamBegin> Traits;
edm::ParentContext nullParentContext;
edm::RunTransitionInfo info(*m_rp, *m_es);
iBase->doWork<Traits>(info, s_streamID0, nullParentContext, nullptr);
doWork<Traits>(iBase, info, nullParentContext);
};

m_transToFunc[Trans::kGlobalBeginLuminosityBlock] = [this](edm::Worker* iBase) {
typedef edm::OccurrenceTraits<edm::LuminosityBlockPrincipal, edm::BranchActionGlobalBegin> Traits;
edm::ParentContext nullParentContext;
edm::LumiTransitionInfo info(*m_lbp, *m_es);
iBase->doWork<Traits>(info, s_streamID0, nullParentContext, nullptr);
doWork<Traits>(iBase, info, nullParentContext);
};
m_transToFunc[Trans::kStreamBeginLuminosityBlock] = [this](edm::Worker* iBase) {
typedef edm::OccurrenceTraits<edm::LuminosityBlockPrincipal, edm::BranchActionStreamBegin> Traits;
edm::ParentContext nullParentContext;
edm::LumiTransitionInfo info(*m_lbp, *m_es);
iBase->doWork<Traits>(info, s_streamID0, nullParentContext, nullptr);
doWork<Traits>(iBase, info, nullParentContext);
};

m_transToFunc[Trans::kEvent] = [this](edm::Worker* iBase) {
Expand All @@ -414,33 +427,33 @@ testGlobalFilter::testGlobalFilter()
edm::ParentContext nullParentContext(&streamContext);
iBase->setActivityRegistry(m_actReg);
edm::EventTransitionInfo info(*m_ep, *m_es);
iBase->doWork<Traits>(info, s_streamID0, nullParentContext, nullptr);
doWork<Traits>(iBase, info, nullParentContext);
};

m_transToFunc[Trans::kStreamEndLuminosityBlock] = [this](edm::Worker* iBase) {
typedef edm::OccurrenceTraits<edm::LuminosityBlockPrincipal, edm::BranchActionStreamEnd> Traits;
edm::ParentContext nullParentContext;
edm::LumiTransitionInfo info(*m_lbp, *m_es);
iBase->doWork<Traits>(info, s_streamID0, nullParentContext, nullptr);
doWork<Traits>(iBase, info, nullParentContext);
};
m_transToFunc[Trans::kGlobalEndLuminosityBlock] = [this](edm::Worker* iBase) {
typedef edm::OccurrenceTraits<edm::LuminosityBlockPrincipal, edm::BranchActionGlobalEnd> Traits;
edm::ParentContext nullParentContext;
edm::LumiTransitionInfo info(*m_lbp, *m_es);
iBase->doWork<Traits>(info, s_streamID0, nullParentContext, nullptr);
doWork<Traits>(iBase, info, nullParentContext);
};

m_transToFunc[Trans::kStreamEndRun] = [this](edm::Worker* iBase) {
typedef edm::OccurrenceTraits<edm::RunPrincipal, edm::BranchActionStreamEnd> Traits;
edm::ParentContext nullParentContext;
edm::RunTransitionInfo info(*m_rp, *m_es);
iBase->doWork<Traits>(info, s_streamID0, nullParentContext, nullptr);
doWork<Traits>(iBase, info, nullParentContext);
};
m_transToFunc[Trans::kGlobalEndRun] = [this](edm::Worker* iBase) {
typedef edm::OccurrenceTraits<edm::RunPrincipal, edm::BranchActionGlobalEnd> Traits;
edm::ParentContext nullParentContext;
edm::RunTransitionInfo info(*m_rp, *m_es);
iBase->doWork<Traits>(info, s_streamID0, nullParentContext, nullptr);
doWork<Traits>(iBase, info, nullParentContext);
};

m_transToFunc[Trans::kEndStream] = [](edm::Worker* iBase) {
Expand Down Expand Up @@ -471,6 +484,7 @@ namespace {

template <typename T>
void testGlobalFilter::testTransitions(std::shared_ptr<T> iMod, Expectations const& iExpect) {
tbb::global_control control(tbb::global_control::max_allowed_parallelism, 1);
edm::maker::ModuleHolderT<edm::global::EDFilterBase> h(iMod, nullptr);
h.preallocate(edm::PreallocationConfiguration{});

Expand Down
24 changes: 19 additions & 5 deletions FWCore/Framework/test/global_outputmodule_t.cppunit.cc
Expand Up @@ -8,6 +8,7 @@
#include <vector>
#include <map>
#include <functional>
#include "tbb/global_control.h"
#include "FWCore/Framework/interface/global/OutputModule.h"
#include "FWCore/Framework/src/OutputModuleCommunicatorT.h"
#include "FWCore/Framework/src/TransitionInfoTypes.h"
Expand Down Expand Up @@ -93,6 +94,17 @@ class testGlobalOutputModule : public CppUnit::TestFixture {
template <typename T>
void testTransitions(std::shared_ptr<T> iMod, Expectations const& iExpect);

template <typename Traits, typename Info>
void doWork(edm::Worker* iBase, Info const& info, edm::StreamID id, edm::ParentContext const& iContext) {
auto task = edm::make_empty_waiting_task();
task->increment_ref_count();
iBase->doWorkAsync<Traits>(edm::WaitingTaskHolder(task.get()), info, edm::ServiceToken(), id, iContext, nullptr);
task->wait_for_all();
if (auto e = task->exceptionPtr()) {
std::rethrow_exception(*e);
}
}

class BasicOutputModule : public edm::global::OutputModule<> {
public:
using edm::global::OutputModuleBase::doPreallocate;
Expand Down Expand Up @@ -181,14 +193,14 @@ testGlobalOutputModule::testGlobalOutputModule()
typedef edm::OccurrenceTraits<edm::RunPrincipal, edm::BranchActionGlobalBegin> Traits;
edm::ParentContext parentContext;
edm::RunTransitionInfo info(*m_rp, *m_es);
iBase->doWork<Traits>(info, edm::StreamID::invalidStreamID(), parentContext, nullptr);
doWork<Traits>(iBase, info, edm::StreamID::invalidStreamID(), parentContext);
};

m_transToFunc[Trans::kGlobalBeginLuminosityBlock] = [this](edm::Worker* iBase, edm::OutputModuleCommunicator*) {
typedef edm::OccurrenceTraits<edm::LuminosityBlockPrincipal, edm::BranchActionGlobalBegin> Traits;
edm::ParentContext parentContext;
edm::LumiTransitionInfo info(*m_lbp, *m_es);
iBase->doWork<Traits>(info, edm::StreamID::invalidStreamID(), parentContext, nullptr);
doWork<Traits>(iBase, info, edm::StreamID::invalidStreamID(), parentContext);
};

m_transToFunc[Trans::kEvent] = [this](edm::Worker* iBase, edm::OutputModuleCommunicator*) {
Expand All @@ -197,14 +209,14 @@ testGlobalOutputModule::testGlobalOutputModule()
edm::ParentContext parentContext(&streamContext);
iBase->setActivityRegistry(m_actReg);
edm::EventTransitionInfo info(*m_ep, *m_es);
iBase->doWork<Traits>(info, s_streamID0, parentContext, nullptr);
doWork<Traits>(iBase, info, s_streamID0, parentContext);
};

m_transToFunc[Trans::kGlobalEndLuminosityBlock] = [this](edm::Worker* iBase, edm::OutputModuleCommunicator* iComm) {
typedef edm::OccurrenceTraits<edm::LuminosityBlockPrincipal, edm::BranchActionGlobalEnd> Traits;
edm::ParentContext parentContext;
edm::LumiTransitionInfo info(*m_lbp, *m_es);
iBase->doWork<Traits>(info, edm::StreamID::invalidStreamID(), parentContext, nullptr);
doWork<Traits>(iBase, info, edm::StreamID::invalidStreamID(), parentContext);
auto t = edm::make_empty_waiting_task();
t->increment_ref_count();
iComm->writeLumiAsync(edm::WaitingTaskHolder(t.get()), *m_lbp, nullptr, &activityRegistry);
Expand All @@ -218,7 +230,7 @@ testGlobalOutputModule::testGlobalOutputModule()
typedef edm::OccurrenceTraits<edm::RunPrincipal, edm::BranchActionGlobalEnd> Traits;
edm::ParentContext parentContext;
edm::RunTransitionInfo info(*m_rp, *m_es);
iBase->doWork<Traits>(info, edm::StreamID::invalidStreamID(), parentContext, nullptr);
doWork<Traits>(iBase, info, edm::StreamID::invalidStreamID(), parentContext);
auto t = edm::make_empty_waiting_task();
t->increment_ref_count();
iComm->writeRunAsync(edm::WaitingTaskHolder(t.get()), *m_rp, nullptr, &activityRegistry, nullptr);
Expand Down Expand Up @@ -279,6 +291,8 @@ namespace {

template <typename T>
void testGlobalOutputModule::testTransitions(std::shared_ptr<T> iMod, Expectations const& iExpect) {
tbb::global_control control(tbb::global_control::max_allowed_parallelism, 1);

iMod->doPreallocate(m_preallocConfig);
edm::WorkerT<edm::global::OutputModuleBase> w{iMod, m_desc, m_params.actions_};
edm::OutputModuleCommunicatorT<edm::global::OutputModuleBase> comm(iMod.get());
Expand Down