Skip to content

Commit

Permalink
Merge pull request #34899 from Dr15Jones/omtWorkerThread
Browse files Browse the repository at this point in the history
Run G4 workers on their own dedicated threads
  • Loading branch information
cmsbuild committed Aug 19, 2021
2 parents 5da88e0 + bb5bf39 commit 6827de0
Show file tree
Hide file tree
Showing 6 changed files with 289 additions and 71 deletions.
9 changes: 6 additions & 3 deletions SimG4Core/Application/interface/RunManagerMTWorker.h
Expand Up @@ -86,7 +86,8 @@ class RunManagerMTWorker {

void DumpMagneticField(const G4Field*, const std::string&) const;

static void resetTLS();
void resetTLS();
int getThreadIndex() const { return m_thread_index; }

Generator m_generator;
edm::EDGetTokenT<edm::HepMCProduct> m_InToken;
Expand All @@ -111,12 +112,14 @@ class RunManagerMTWorker {
edm::ParameterSet m_p;

struct TLSData;
static thread_local TLSData* m_tls;
static thread_local bool dumpMF;
TLSData* m_tls{nullptr};
bool dumpMF{false};

G4SimEvent* m_simEvent;
std::unique_ptr<CMSSteppingVerbose> m_sVerbose;
std::unordered_map<std::string, std::unique_ptr<SensitiveDetectorMakerBase>> m_sdMakers;

const int m_thread_index{-1};
};

#endif
98 changes: 98 additions & 0 deletions SimG4Core/Application/interface/ThreadHandoff.h
@@ -0,0 +1,98 @@
#ifndef SimG4Core_Application_ThreadHandoff_h
#define SimG4Core_Application_ThreadHandoff_h
// -*- C++ -*-
//
// Package: SimG4Core/Application
// Class : ThreadHandoff
//
/**\class ThreadHandoff ThreadHandoff.h "SimG4Core/Application/interface/ThreadHandoff.h"
Description: [one line class summary]
Usage:
<usage>
*/
//
// Original Author: Christopher Jones
// Created: Mon, 16 Aug 2021 13:51:53 GMT
//

// system include files
#include <condition_variable>
#include <cstring> //strerror_r
#include <exception>
#include <mutex>
#include <pthread.h>

// user include files

// forward declarations

namespace omt {
class ThreadHandoff {
public:
explicit ThreadHandoff(int stackSize);
~ThreadHandoff();

ThreadHandoff(const ThreadHandoff&) = delete; // stop default
const ThreadHandoff& operator=(const ThreadHandoff&) = delete; // stop default

template <typename F>
void runAndWait(F&& iF) {
Functor<F> f{std::move(iF)};

std::unique_lock<std::mutex> lck(m_mutex);
m_loopReady = false;
m_toRun = &f;

m_threadHandoff.notify_one();

m_threadHandoff.wait(lck, [this]() { return m_loopReady; });
auto e = f.exception();
if (e) {
std::rethrow_exception(e);
}
}

void stopThread() {
runAndWait([this]() { m_stopThread = true; });
}

private:
class FunctorBase {
public:
virtual ~FunctorBase() {}
virtual void execute() = 0;
};
template <typename F>
class Functor : public FunctorBase {
public:
explicit Functor(F&& iF) : m_f(std::move(iF)) {}
void execute() final {
try {
m_f();
} catch (...) {
m_except = std::current_exception();
}
}
std::exception_ptr exception() { return m_except; }

private:
F m_f;
std::exception_ptr m_except;
};

static void* threadLoop(void* iArgs);

// ---------- member data --------------------------------
pthread_t m_thread;
std::mutex m_mutex;
std::condition_variable m_threadHandoff;

FunctorBase* m_toRun{nullptr};
bool m_loopReady{false};
bool m_stopThread{false};
};
} // namespace omt
#endif
47 changes: 40 additions & 7 deletions SimG4Core/Application/plugins/OscarMTProducer.cc
Expand Up @@ -14,6 +14,7 @@

#include "FWCore/MessageLogger/interface/MessageLogger.h"
#include "FWCore/ServiceRegistry/interface/Service.h"
#include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
#include "FWCore/Utilities/interface/RandomNumberGenerator.h"
#include "FWCore/Utilities/interface/Exception.h"

Expand All @@ -34,6 +35,8 @@
#include "SimDataFormats/TrackingHit/interface/PSimHitContainer.h"
#include "SimDataFormats/CaloHit/interface/PCaloHitContainer.h"

#include "SimG4Core/Application/interface/ThreadHandoff.h"

#include "Randomize.hh"

// for some reason void doesn't compile
Expand All @@ -56,6 +59,7 @@ class OscarMTProducer : public edm::stream::EDProducer<edm::GlobalCache<OscarMTM
void produce(edm::Event& e, const edm::EventSetup& c) override;

private:
omt::ThreadHandoff m_handoff;
std::unique_ptr<RunManagerMTWorker> m_runManagerWorker;
const OscarMTMasterThread* m_masterThread = nullptr;
};
Expand Down Expand Up @@ -87,17 +91,25 @@ namespace {
explicit StaticRandomEngineSetUnset(CLHEP::HepRandomEngine* engine);
~StaticRandomEngineSetUnset();

CLHEP::HepRandomEngine* currentEngine() { return m_currentEngine; }

private:
CLHEP::HepRandomEngine* m_currentEngine;
CLHEP::HepRandomEngine* m_previousEngine;
};
} // namespace

OscarMTProducer::OscarMTProducer(edm::ParameterSet const& p, const OscarMTMasterThread* ms) {
OscarMTProducer::OscarMTProducer(edm::ParameterSet const& p, const OscarMTMasterThread* ms)
: m_handoff{p.getUntrackedParameter<int>("workerThreadStackSize", 10 * 1024 * 1024)} {
// Random number generation not allowed here
StaticRandomEngineSetUnset random(nullptr);

m_runManagerWorker = std::make_unique<RunManagerMTWorker>(p, consumesCollector());
auto token = edm::ServiceRegistry::instance().presentToken();
m_handoff.runAndWait([this, &p, token]() {
edm::ServiceRegistry::Operate guard{token};
StaticRandomEngineSetUnset random(nullptr);
m_runManagerWorker = std::make_unique<RunManagerMTWorker>(p, consumesCollector());
});
m_masterThread = ms;
m_masterThread->callConsumes(consumesCollector());

Expand Down Expand Up @@ -165,7 +177,13 @@ OscarMTProducer::OscarMTProducer(edm::ParameterSet const& p, const OscarMTMaster
edm::LogVerbatim("SimG4CoreApplication") << "OscarMTProducer is constructed";
}

OscarMTProducer::~OscarMTProducer() {}
OscarMTProducer::~OscarMTProducer() {
auto token = edm::ServiceRegistry::instance().presentToken();
m_handoff.runAndWait([this, token]() {
edm::ServiceRegistry::Operate guard{token};
m_runManagerWorker.reset();
});
}

std::unique_ptr<OscarMTMasterThread> OscarMTProducer::initializeGlobalCache(const edm::ParameterSet& iConfig) {
// Random number generation not allowed here
Expand Down Expand Up @@ -198,21 +216,31 @@ void OscarMTProducer::globalEndJob(OscarMTMasterThread* masterThread) {

void OscarMTProducer::beginRun(const edm::Run&, const edm::EventSetup& es) {
edm::LogVerbatim("SimG4CoreApplication") << "OscarMTProducer::beginRun";
m_runManagerWorker->beginRun(es);
m_runManagerWorker->initializeG4(m_masterThread->runManagerMasterPtr(), es);
auto token = edm::ServiceRegistry::instance().presentToken();
m_handoff.runAndWait([this, &es, token]() {
edm::ServiceRegistry::Operate guard{token};
m_runManagerWorker->beginRun(es);
m_runManagerWorker->initializeG4(m_masterThread->runManagerMasterPtr(), es);
});
edm::LogVerbatim("SimG4CoreApplication") << "OscarMTProducer::beginRun done";
}

void OscarMTProducer::endRun(const edm::Run&, const edm::EventSetup&) {
// Random number generation not allowed here
StaticRandomEngineSetUnset random(nullptr);
edm::LogVerbatim("SimG4CoreApplication") << "OscarMTProducer::endRun";
m_runManagerWorker->endRun();
auto token = edm::ServiceRegistry::instance().presentToken();
m_handoff.runAndWait([this, token]() {
StaticRandomEngineSetUnset random(nullptr);
edm::ServiceRegistry::Operate guard{token};
m_runManagerWorker->endRun();
});
edm::LogVerbatim("SimG4CoreApplication") << "OscarMTProducer::endRun done";
}

void OscarMTProducer::produce(edm::Event& e, const edm::EventSetup& es) {
StaticRandomEngineSetUnset random(e.streamID());
auto engine = random.currentEngine();
edm::LogVerbatim("SimG4CoreApplication") << "Produce event " << e.id() << " stream " << e.streamID();
//edm::LogVerbatim("SimG4CoreApplication") << " rand= " << G4UniformRand();

Expand All @@ -221,7 +249,12 @@ void OscarMTProducer::produce(edm::Event& e, const edm::EventSetup& es) {

std::unique_ptr<G4SimEvent> evt;
try {
evt = m_runManagerWorker->produce(e, es, globalCache()->runManagerMaster());
auto token = edm::ServiceRegistry::instance().presentToken();
m_handoff.runAndWait([this, &e, &es, &evt, token, engine]() {
edm::ServiceRegistry::Operate guard{token};
StaticRandomEngineSetUnset random(engine);
evt = m_runManagerWorker->produce(e, es, globalCache()->runManagerMaster());
});
} catch (const SimG4Exception& simg4ex) {
edm::LogWarning("SimG4CoreApplication") << "SimG4Exception caght! " << simg4ex.what();

Expand Down
70 changes: 9 additions & 61 deletions SimG4Core/Application/src/RunManagerMTWorker.cc
Expand Up @@ -64,18 +64,13 @@
#include "tbb/task_arena.h"

static std::once_flag applyOnce;
thread_local bool RunManagerMTWorker::dumpMF = false;

// from https://hypernews.cern.ch/HyperNews/CMS/get/edmFramework/3302/2.html
namespace {
std::atomic<int> thread_counter{0};

int get_new_thread_index() { return thread_counter++; }

thread_local int s_thread_index = get_new_thread_index();

int getThreadIndex() { return s_thread_index; }

void createWatchers(const edm::ParameterSet& iP,
SimActivityRegistry* iReg,
std::vector<std::shared_ptr<SimWatcher>>& oWatchers,
Expand All @@ -102,10 +97,6 @@ namespace {
}
}
}

std::atomic<int> active_tlsdata{0};
std::atomic<bool> tls_shutdown_timeout{false};
std::atomic<int> n_tls_shutdown_task{0};
} // namespace

struct RunManagerMTWorker::TLSData {
Expand All @@ -127,17 +118,17 @@ struct RunManagerMTWorker::TLSData {
bool threadInitialized = false;
bool runTerminated = false;

TLSData() { ++active_tlsdata; }
TLSData() {}

~TLSData() { --active_tlsdata; }
~TLSData() {}
};

//This can not be a smart pointer since we must delete some of the members
// before leaving main() else we get a segmentation fault caused by accessing
// other 'singletons' after those singletons have been deleted. Instead we
// atempt to delete all TLS at RunManagerMTWorker destructor. If that fails for
// some reason, it is better to leak than cause a crash.
thread_local RunManagerMTWorker::TLSData* RunManagerMTWorker::m_tls{nullptr};
//thread_local RunManagerMTWorker::TLSData* RunManagerMTWorker::m_tls{nullptr};

RunManagerMTWorker::RunManagerMTWorker(const edm::ParameterSet& iConfig, edm::ConsumesCollector&& iC)
: m_generator(iConfig.getParameter<edm::ParameterSet>("Generator")),
Expand All @@ -158,7 +149,8 @@ RunManagerMTWorker::RunManagerMTWorker(const edm::ParameterSet& iConfig, edm::Co
m_pCustomUIsession(iConfig.getUntrackedParameter<edm::ParameterSet>("CustomUIsession")),
m_p(iConfig),
m_simEvent(nullptr),
m_sVerbose(nullptr) {
m_sVerbose(nullptr),
m_thread_index{get_new_thread_index()} {
std::vector<std::string> onlySDs = iConfig.getParameter<std::vector<std::string>>("OnlySDs");
m_sdMakers = sim::sensitiveDetectorMakers(m_p, iC, onlySDs);
std::vector<edm::ParameterSet> watchers = iConfig.getParameter<std::vector<edm::ParameterSet>>("Watchers");
Expand All @@ -180,47 +172,9 @@ RunManagerMTWorker::RunManagerMTWorker(const edm::ParameterSet& iConfig, edm::Co
edm::LogVerbatim("SimG4CoreApplication") << "SD[" << k << "] " << itr->first;
}

RunManagerMTWorker::~RunManagerMTWorker() {
++n_tls_shutdown_task;
resetTLS();

{
//make sure all tasks are done before continuing
timespec s;
s.tv_sec = 0;
s.tv_nsec = 10000;
while (n_tls_shutdown_task != 0) {
nanosleep(&s, nullptr);
}
}
}

void RunManagerMTWorker::resetTLS() {
m_tls = nullptr;
RunManagerMTWorker::~RunManagerMTWorker() { resetTLS(); }

if (active_tlsdata != 0 and not tls_shutdown_timeout) {
++n_tls_shutdown_task;
//need to run tasks on each thread which has set the tls
{
tbb::task_arena arena(tbb::task_arena::attach{});
arena.enqueue([]() { RunManagerMTWorker::resetTLS(); });
}
timespec s;
s.tv_sec = 0;
s.tv_nsec = 10000;
//we do not want this thread to be used for a new task since it
// has already cleared its structures. In order to fill all TBB
// threads we wait for all TLSes to clear
int count = 0;
while (active_tlsdata.load() != 0 and ++count < 1000) {
nanosleep(&s, nullptr);
}
if (count >= 1000) {
tls_shutdown_timeout = true;
}
}
--n_tls_shutdown_task;
}
void RunManagerMTWorker::resetTLS() { m_tls = nullptr; }

void RunManagerMTWorker::beginRun(edm::EventSetup const& es) {
for (auto& maker : m_sdMakers) {
Expand Down Expand Up @@ -323,7 +277,7 @@ void RunManagerMTWorker::initializeG4(RunManagerMT* runManagerMaster, const edm:

std::string fieldFile = m_p.getUntrackedParameter<std::string>("FileNameField", "");
if (!fieldFile.empty()) {
std::call_once(applyOnce, []() { dumpMF = true; });
std::call_once(applyOnce, [this]() { dumpMF = true; });
if (dumpMF) {
edm::LogVerbatim("SimG4CoreApplication") << "RunManagerMTWorker: Dump magnetic field to file " << fieldFile;
DumpMagneticField(tM->GetFieldManager()->GetDetectorField(), fieldFile);
Expand Down Expand Up @@ -494,13 +448,7 @@ std::unique_ptr<G4SimEvent> RunManagerMTWorker::produce(const edm::Event& inpevt
// We have to do the per-thread initialization, and per-thread
// per-run initialization here by ourselves.

if (nullptr == m_tls || !m_tls->threadInitialized) {
edm::LogVerbatim("SimG4CoreApplication")
<< "RunManagerMTWorker::produce(): stream " << inpevt.streamID() << " thread " << getThreadIndex()
<< " Geant4 initialisation for this thread";
initializeG4(&runManagerMaster, es);
m_tls->threadInitialized = true;
}
assert(m_tls != nullptr and m_tls->threadInitialized);
// Initialize run
if (inpevt.id().run() != m_tls->currentRunNumber) {
edm::LogVerbatim("SimG4CoreApplication")
Expand Down

0 comments on commit 6827de0

Please sign in to comment.