diff --git a/SimG4Core/Application/interface/RunManagerMTWorker.h b/SimG4Core/Application/interface/RunManagerMTWorker.h index f42895f449894..0442175117865 100644 --- a/SimG4Core/Application/interface/RunManagerMTWorker.h +++ b/SimG4Core/Application/interface/RunManagerMTWorker.h @@ -86,7 +86,8 @@ class RunManagerMTWorker { void DumpMagneticField(const G4Field*, const std::string&) const; - static void resetTLS(); + void resetTLS(); + int getThreadIndex() const { return m_thread_index; } Generator m_generator; edm::EDGetTokenT m_InToken; @@ -111,12 +112,14 @@ class RunManagerMTWorker { edm::ParameterSet m_p; struct TLSData; - static thread_local TLSData* m_tls; - static thread_local bool dumpMF; + TLSData* m_tls{nullptr}; + bool dumpMF{false}; G4SimEvent* m_simEvent; std::unique_ptr m_sVerbose; std::unordered_map> m_sdMakers; + + const int m_thread_index{-1}; }; #endif diff --git a/SimG4Core/Application/interface/ThreadHandoff.h b/SimG4Core/Application/interface/ThreadHandoff.h new file mode 100644 index 0000000000000..e54aa32f6e8ab --- /dev/null +++ b/SimG4Core/Application/interface/ThreadHandoff.h @@ -0,0 +1,98 @@ +#ifndef SimG4Core_Application_ThreadHandoff_h +#define SimG4Core_Application_ThreadHandoff_h +// -*- C++ -*- +// +// Package: SimG4Core/Application +// Class : ThreadHandoff +// +/**\class ThreadHandoff ThreadHandoff.h "SimG4Core/Application/interface/ThreadHandoff.h" + + Description: [one line class summary] + + Usage: + + +*/ +// +// Original Author: Christopher Jones +// Created: Mon, 16 Aug 2021 13:51:53 GMT +// + +// system include files +#include +#include //strerror_r +#include +#include +#include + +// user include files + +// forward declarations + +namespace omt { + class ThreadHandoff { + public: + explicit ThreadHandoff(int stackSize); + ~ThreadHandoff(); + + ThreadHandoff(const ThreadHandoff&) = delete; // stop default + const ThreadHandoff& operator=(const ThreadHandoff&) = delete; // stop default + + template + void runAndWait(F&& iF) { + Functor f{std::move(iF)}; + + std::unique_lock lck(m_mutex); + m_loopReady = false; + m_toRun = &f; + + m_threadHandoff.notify_one(); + + m_threadHandoff.wait(lck, [this]() { return m_loopReady; }); + auto e = f.exception(); + if (e) { + std::rethrow_exception(e); + } + } + + void stopThread() { + runAndWait([this]() { m_stopThread = true; }); + } + + private: + class FunctorBase { + public: + virtual ~FunctorBase() {} + virtual void execute() = 0; + }; + template + class Functor : public FunctorBase { + public: + explicit Functor(F&& iF) : m_f(std::move(iF)) {} + void execute() final { + try { + m_f(); + } catch (...) { + m_except = std::current_exception(); + } + } + std::exception_ptr exception() { return m_except; } + + private: + F m_f; + std::exception_ptr m_except; + }; + + static void* threadLoop(void* iArgs); + + // ---------- member data -------------------------------- + pthread_t m_thread; + std::mutex m_mutex; + std::condition_variable m_threadHandoff; + + FunctorBase* m_toRun{nullptr}; + bool m_loopReady{false}; + bool m_stopThread{false}; + }; +} // namespace omt +#endif diff --git a/SimG4Core/Application/plugins/OscarMTProducer.cc b/SimG4Core/Application/plugins/OscarMTProducer.cc index d03a59bfd909d..d33504f82f66e 100644 --- a/SimG4Core/Application/plugins/OscarMTProducer.cc +++ b/SimG4Core/Application/plugins/OscarMTProducer.cc @@ -14,6 +14,7 @@ #include "FWCore/MessageLogger/interface/MessageLogger.h" #include "FWCore/ServiceRegistry/interface/Service.h" +#include "FWCore/ServiceRegistry/interface/ServiceRegistry.h" #include "FWCore/Utilities/interface/RandomNumberGenerator.h" #include "FWCore/Utilities/interface/Exception.h" @@ -34,6 +35,8 @@ #include "SimDataFormats/TrackingHit/interface/PSimHitContainer.h" #include "SimDataFormats/CaloHit/interface/PCaloHitContainer.h" +#include "SimG4Core/Application/interface/ThreadHandoff.h" + #include "Randomize.hh" // for some reason void doesn't compile @@ -56,6 +59,7 @@ class OscarMTProducer : public edm::stream::EDProducer m_runManagerWorker; const OscarMTMasterThread* m_masterThread = nullptr; }; @@ -87,17 +91,25 @@ namespace { explicit StaticRandomEngineSetUnset(CLHEP::HepRandomEngine* engine); ~StaticRandomEngineSetUnset(); + CLHEP::HepRandomEngine* currentEngine() { return m_currentEngine; } + private: CLHEP::HepRandomEngine* m_currentEngine; CLHEP::HepRandomEngine* m_previousEngine; }; } // namespace -OscarMTProducer::OscarMTProducer(edm::ParameterSet const& p, const OscarMTMasterThread* ms) { +OscarMTProducer::OscarMTProducer(edm::ParameterSet const& p, const OscarMTMasterThread* ms) + : m_handoff{p.getUntrackedParameter("workerThreadStackSize", 10 * 1024 * 1024)} { // Random number generation not allowed here StaticRandomEngineSetUnset random(nullptr); - m_runManagerWorker = std::make_unique(p, consumesCollector()); + auto token = edm::ServiceRegistry::instance().presentToken(); + m_handoff.runAndWait([this, &p, token]() { + edm::ServiceRegistry::Operate guard{token}; + StaticRandomEngineSetUnset random(nullptr); + m_runManagerWorker = std::make_unique(p, consumesCollector()); + }); m_masterThread = ms; m_masterThread->callConsumes(consumesCollector()); @@ -165,7 +177,13 @@ OscarMTProducer::OscarMTProducer(edm::ParameterSet const& p, const OscarMTMaster edm::LogVerbatim("SimG4CoreApplication") << "OscarMTProducer is constructed"; } -OscarMTProducer::~OscarMTProducer() {} +OscarMTProducer::~OscarMTProducer() { + auto token = edm::ServiceRegistry::instance().presentToken(); + m_handoff.runAndWait([this, token]() { + edm::ServiceRegistry::Operate guard{token}; + m_runManagerWorker.reset(); + }); +} std::unique_ptr OscarMTProducer::initializeGlobalCache(const edm::ParameterSet& iConfig) { // Random number generation not allowed here @@ -198,8 +216,12 @@ void OscarMTProducer::globalEndJob(OscarMTMasterThread* masterThread) { void OscarMTProducer::beginRun(const edm::Run&, const edm::EventSetup& es) { edm::LogVerbatim("SimG4CoreApplication") << "OscarMTProducer::beginRun"; - m_runManagerWorker->beginRun(es); - m_runManagerWorker->initializeG4(m_masterThread->runManagerMasterPtr(), es); + auto token = edm::ServiceRegistry::instance().presentToken(); + m_handoff.runAndWait([this, &es, token]() { + edm::ServiceRegistry::Operate guard{token}; + m_runManagerWorker->beginRun(es); + m_runManagerWorker->initializeG4(m_masterThread->runManagerMasterPtr(), es); + }); edm::LogVerbatim("SimG4CoreApplication") << "OscarMTProducer::beginRun done"; } @@ -207,12 +229,18 @@ void OscarMTProducer::endRun(const edm::Run&, const edm::EventSetup&) { // Random number generation not allowed here StaticRandomEngineSetUnset random(nullptr); edm::LogVerbatim("SimG4CoreApplication") << "OscarMTProducer::endRun"; - m_runManagerWorker->endRun(); + auto token = edm::ServiceRegistry::instance().presentToken(); + m_handoff.runAndWait([this, token]() { + StaticRandomEngineSetUnset random(nullptr); + edm::ServiceRegistry::Operate guard{token}; + m_runManagerWorker->endRun(); + }); edm::LogVerbatim("SimG4CoreApplication") << "OscarMTProducer::endRun done"; } void OscarMTProducer::produce(edm::Event& e, const edm::EventSetup& es) { StaticRandomEngineSetUnset random(e.streamID()); + auto engine = random.currentEngine(); edm::LogVerbatim("SimG4CoreApplication") << "Produce event " << e.id() << " stream " << e.streamID(); //edm::LogVerbatim("SimG4CoreApplication") << " rand= " << G4UniformRand(); @@ -221,7 +249,12 @@ void OscarMTProducer::produce(edm::Event& e, const edm::EventSetup& es) { std::unique_ptr evt; try { - evt = m_runManagerWorker->produce(e, es, globalCache()->runManagerMaster()); + auto token = edm::ServiceRegistry::instance().presentToken(); + m_handoff.runAndWait([this, &e, &es, &evt, token, engine]() { + edm::ServiceRegistry::Operate guard{token}; + StaticRandomEngineSetUnset random(engine); + evt = m_runManagerWorker->produce(e, es, globalCache()->runManagerMaster()); + }); } catch (const SimG4Exception& simg4ex) { edm::LogWarning("SimG4CoreApplication") << "SimG4Exception caght! " << simg4ex.what(); diff --git a/SimG4Core/Application/src/RunManagerMTWorker.cc b/SimG4Core/Application/src/RunManagerMTWorker.cc index 6e240cae85086..0f557b23a6cee 100644 --- a/SimG4Core/Application/src/RunManagerMTWorker.cc +++ b/SimG4Core/Application/src/RunManagerMTWorker.cc @@ -64,7 +64,6 @@ #include "tbb/task_arena.h" static std::once_flag applyOnce; -thread_local bool RunManagerMTWorker::dumpMF = false; // from https://hypernews.cern.ch/HyperNews/CMS/get/edmFramework/3302/2.html namespace { @@ -72,10 +71,6 @@ namespace { int get_new_thread_index() { return thread_counter++; } - thread_local int s_thread_index = get_new_thread_index(); - - int getThreadIndex() { return s_thread_index; } - void createWatchers(const edm::ParameterSet& iP, SimActivityRegistry* iReg, std::vector>& oWatchers, @@ -102,10 +97,6 @@ namespace { } } } - - std::atomic active_tlsdata{0}; - std::atomic tls_shutdown_timeout{false}; - std::atomic n_tls_shutdown_task{0}; } // namespace struct RunManagerMTWorker::TLSData { @@ -127,9 +118,9 @@ struct RunManagerMTWorker::TLSData { bool threadInitialized = false; bool runTerminated = false; - TLSData() { ++active_tlsdata; } + TLSData() {} - ~TLSData() { --active_tlsdata; } + ~TLSData() {} }; //This can not be a smart pointer since we must delete some of the members @@ -137,7 +128,7 @@ struct RunManagerMTWorker::TLSData { // other 'singletons' after those singletons have been deleted. Instead we // atempt to delete all TLS at RunManagerMTWorker destructor. If that fails for // some reason, it is better to leak than cause a crash. -thread_local RunManagerMTWorker::TLSData* RunManagerMTWorker::m_tls{nullptr}; +//thread_local RunManagerMTWorker::TLSData* RunManagerMTWorker::m_tls{nullptr}; RunManagerMTWorker::RunManagerMTWorker(const edm::ParameterSet& iConfig, edm::ConsumesCollector&& iC) : m_generator(iConfig.getParameter("Generator")), @@ -158,7 +149,8 @@ RunManagerMTWorker::RunManagerMTWorker(const edm::ParameterSet& iConfig, edm::Co m_pCustomUIsession(iConfig.getUntrackedParameter("CustomUIsession")), m_p(iConfig), m_simEvent(nullptr), - m_sVerbose(nullptr) { + m_sVerbose(nullptr), + m_thread_index{get_new_thread_index()} { std::vector onlySDs = iConfig.getParameter>("OnlySDs"); m_sdMakers = sim::sensitiveDetectorMakers(m_p, iC, onlySDs); std::vector watchers = iConfig.getParameter>("Watchers"); @@ -180,47 +172,9 @@ RunManagerMTWorker::RunManagerMTWorker(const edm::ParameterSet& iConfig, edm::Co edm::LogVerbatim("SimG4CoreApplication") << "SD[" << k << "] " << itr->first; } -RunManagerMTWorker::~RunManagerMTWorker() { - ++n_tls_shutdown_task; - resetTLS(); - - { - //make sure all tasks are done before continuing - timespec s; - s.tv_sec = 0; - s.tv_nsec = 10000; - while (n_tls_shutdown_task != 0) { - nanosleep(&s, nullptr); - } - } -} - -void RunManagerMTWorker::resetTLS() { - m_tls = nullptr; +RunManagerMTWorker::~RunManagerMTWorker() { resetTLS(); } - if (active_tlsdata != 0 and not tls_shutdown_timeout) { - ++n_tls_shutdown_task; - //need to run tasks on each thread which has set the tls - { - tbb::task_arena arena(tbb::task_arena::attach{}); - arena.enqueue([]() { RunManagerMTWorker::resetTLS(); }); - } - timespec s; - s.tv_sec = 0; - s.tv_nsec = 10000; - //we do not want this thread to be used for a new task since it - // has already cleared its structures. In order to fill all TBB - // threads we wait for all TLSes to clear - int count = 0; - while (active_tlsdata.load() != 0 and ++count < 1000) { - nanosleep(&s, nullptr); - } - if (count >= 1000) { - tls_shutdown_timeout = true; - } - } - --n_tls_shutdown_task; -} +void RunManagerMTWorker::resetTLS() { m_tls = nullptr; } void RunManagerMTWorker::beginRun(edm::EventSetup const& es) { for (auto& maker : m_sdMakers) { @@ -323,7 +277,7 @@ void RunManagerMTWorker::initializeG4(RunManagerMT* runManagerMaster, const edm: std::string fieldFile = m_p.getUntrackedParameter("FileNameField", ""); if (!fieldFile.empty()) { - std::call_once(applyOnce, []() { dumpMF = true; }); + std::call_once(applyOnce, [this]() { dumpMF = true; }); if (dumpMF) { edm::LogVerbatim("SimG4CoreApplication") << "RunManagerMTWorker: Dump magnetic field to file " << fieldFile; DumpMagneticField(tM->GetFieldManager()->GetDetectorField(), fieldFile); @@ -494,13 +448,7 @@ std::unique_ptr RunManagerMTWorker::produce(const edm::Event& inpevt // We have to do the per-thread initialization, and per-thread // per-run initialization here by ourselves. - if (nullptr == m_tls || !m_tls->threadInitialized) { - edm::LogVerbatim("SimG4CoreApplication") - << "RunManagerMTWorker::produce(): stream " << inpevt.streamID() << " thread " << getThreadIndex() - << " Geant4 initialisation for this thread"; - initializeG4(&runManagerMaster, es); - m_tls->threadInitialized = true; - } + assert(m_tls != nullptr and m_tls->threadInitialized); // Initialize run if (inpevt.id().run() != m_tls->currentRunNumber) { edm::LogVerbatim("SimG4CoreApplication") diff --git a/SimG4Core/Application/src/ThreadHandoff.cc b/SimG4Core/Application/src/ThreadHandoff.cc new file mode 100644 index 0000000000000..fda990f21f02b --- /dev/null +++ b/SimG4Core/Application/src/ThreadHandoff.cc @@ -0,0 +1,116 @@ +// -*- C++ -*- +// +// Package: SimG4Core/Application +// Class : ThreadHandoff +// +// Implementation: +// [Notes on implementation] +// +// Original Author: Christopher Jones +// Created: Mon, 16 Aug 2021 14:37:29 GMT +// + +// system include files + +// user include files +#include "SimG4Core/Application/interface/ThreadHandoff.h" +#include "FWCore/Utilities/interface/Exception.h" + +// +// constants, enums and typedefs +// + +namespace { + std::string errorMessage(int erno) { + std::array buffer; + strerror_r(erno, &buffer[0], buffer.size()); + buffer.back() = '\0'; + return std::string(&buffer[0]); + } +} // namespace +// +// static data member definitions +// + +// +// constructors and destructor +// +using namespace omt; + +ThreadHandoff::ThreadHandoff(int stackSize) { + pthread_attr_t attr; + int erno; + if (0 != (erno = pthread_attr_init(&attr))) { + throw cms::Exception("ThreadInitFailed") + << "Failed to initialize thread attributes (" << erno << ") " << errorMessage(erno); + } + + if (0 != (erno = pthread_attr_setstacksize(&attr, stackSize))) { + throw cms::Exception("ThreadStackSizeFailed") + << "Failed to set stack size " << stackSize << " " << errorMessage(erno); + } + std::unique_lock lk(m_mutex); + + erno = pthread_create(&m_thread, &attr, threadLoop, this); + if (0 != erno) { + throw cms::Exception("ThreadCreateFailed") << " failed to create a pthread (" << erno << ") " << errorMessage(erno); + } + m_loopReady = false; + m_threadHandoff.wait(lk, [this]() { return m_loopReady; }); +} + +// ThreadHandoff::ThreadHandoff(const ThreadHandoff& rhs) +// { +// // do actual copying here; +// } + +ThreadHandoff::~ThreadHandoff() { + if (not m_stopThread) { + stopThread(); + } + void* ret; + pthread_join(m_thread, &ret); +} + +// +// assignment operators +// +// const ThreadHandoff& ThreadHandoff::operator=(const ThreadHandoff& rhs) +// { +// //An exception safe implementation is +// ThreadHandoff temp(rhs); +// swap(rhs); +// +// return *this; +// } + +// +// member functions +// + +// +// const member functions +// + +// +// static member functions +// +void* ThreadHandoff::threadLoop(void* iArgs) { + auto theThis = reinterpret_cast(iArgs); + { + std::unique_lock lck(theThis->m_mutex); + theThis->m_loopReady = true; + } + theThis->m_threadHandoff.notify_one(); + + std::unique_lock lck(theThis->m_mutex); + do { + theThis->m_toRun = nullptr; + theThis->m_threadHandoff.wait(lck, [theThis]() { return nullptr != theThis->m_toRun; }); + theThis->m_toRun->execute(); + theThis->m_loopReady = true; + theThis->m_threadHandoff.notify_one(); + } while (not theThis->m_stopThread); + theThis->m_loopReady = true; + return nullptr; +} diff --git a/SimG4Core/Application/test/test_catch2_ThreadHandoff.cc b/SimG4Core/Application/test/test_catch2_ThreadHandoff.cc new file mode 100644 index 0000000000000..5f930d811cba0 --- /dev/null +++ b/SimG4Core/Application/test/test_catch2_ThreadHandoff.cc @@ -0,0 +1,20 @@ +#include "SimG4Core/Application/interface/ThreadHandoff.h" +#include "FWCore/Utilities/interface/Exception.h" +#define CATCH_CONFIG_MAIN +#include "catch.hpp" + +using namespace omt; +TEST_CASE("Test omt::ThreadHandoff", "[ThreadHandoff]") { + SECTION("Do nothing") { ThreadHandoff th; } + SECTION("Simple") { + ThreadHandoff th; + bool value = false; + th.runAndWait([&value]() { value = true; }); + REQUIRE(value == true); + } + + SECTION("Exception") { + ThreadHandoff th; + REQUIRE_THROWS_AS(th.runAndWait([]() { throw cms::Exception("Test"); }), cms::Exception); + } +}