Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test how to do blocking hand-off between TBB and module thread #34084

Merged
merged 14 commits into from Jun 15, 2021
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 9 additions & 1 deletion FWCore/Integration/test/BuildFile.xml
Expand Up @@ -427,7 +427,15 @@
<use name="boost"/>
<use name="clhep"/>
</library>

<library file="TestServicesOnNonFrameworkThreadsAnalyzer.cc" name = "TestServicesOnNonFrameworkThreadsAnalyzer">
<flags EDM_PLUGIN="1"/>
<use name="FWCore/Framework"/>
<use name="FWCore/ParameterSet"/>
<use name="FWCore/MessageLogger"/>
<use name="clhep"/>
</library>
<test name="TestFWCoreIntegrationModuleThread" command="cmsRun ${LOCALTOP}/src/FWCore/Integration/test/moduleThread_test_cfg.py"/>

<bin file="RandomIntProducer_t.cpp">
<use name="FWCore/Framework"/>
<use name="FWCore/ParameterSet"/>
Expand Down
119 changes: 119 additions & 0 deletions FWCore/Integration/test/TestServicesOnNonFrameworkThreadsAnalyzer.cc
@@ -0,0 +1,119 @@
#include "FWCore/Framework/interface/stream/EDAnalyzer.h"
#include "FWCore/Framework/interface/MakerMacros.h"
#include "FWCore/Framework/interface/Event.h"
#include "FWCore/Framework/interface/ModuleContextSentry.h"
#include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
#include "FWCore/ServiceRegistry/interface/Service.h"

#include "FWCore/Utilities/interface/RandomNumberGenerator.h"
#include "FWCore/MessageLogger/interface/MessageLogger.h"
#include "FWCore/MessageLogger/interface/edm_MessageLogger.h"
#include <thread>
#include <mutex>
#include <condition_variable>
#include <memory>
#include <iostream>
#include <exception>

#include "CLHEP/Random/RandFlat.h"

namespace edmtest {
class TestServicesOnNonFrameworkThreadsAnalyzer : public edm::stream::EDAnalyzer<> {
public:
TestServicesOnNonFrameworkThreadsAnalyzer(edm::ParameterSet const&);
~TestServicesOnNonFrameworkThreadsAnalyzer() override;

void analyze(edm::Event const&, edm::EventSetup const&) final;

private:
void runOnOtherThread();
void shutdownThread();
std::unique_ptr<std::thread> m_thread;
std::mutex m_mutex;
std::condition_variable m_condVar;

bool m_managerThreadReady = false;
bool m_continueProcessing = false;
bool m_eventWorkDone = false;

//context info
edm::ModuleCallingContext const* m_moduleCallingContext = nullptr;
edm::ServiceToken* m_serviceToken = nullptr;
edm::StreamID m_streamID;
std::exception_ptr m_except;
};

TestServicesOnNonFrameworkThreadsAnalyzer::TestServicesOnNonFrameworkThreadsAnalyzer(edm::ParameterSet const&)
: m_streamID(edm::StreamID::invalidStreamID()) {
m_thread = std::make_unique<std::thread>([this]() { this->runOnOtherThread(); });

m_mutex.lock();
m_managerThreadReady = true;
m_continueProcessing = true;
}

TestServicesOnNonFrameworkThreadsAnalyzer::~TestServicesOnNonFrameworkThreadsAnalyzer() {
if (m_thread) {
shutdownThread();
}
}

void TestServicesOnNonFrameworkThreadsAnalyzer::analyze(edm::Event const& iEvent, edm::EventSetup const&) {
m_eventWorkDone = false;
m_moduleCallingContext = iEvent.moduleCallingContext();
edm::ServiceToken token = edm::ServiceRegistry::instance().presentToken();
m_serviceToken = &token;
m_streamID = iEvent.streamID();
{ edm::LogSystem("FrameworkThread") << "new Event"; }
m_mutex.unlock();
{
std::unique_lock<std::mutex> lk(m_mutex);
m_condVar.notify_one();
m_condVar.wait(lk, [this] { return this->m_eventWorkDone; });
lk.release();
}
edm::LogSystem("FrameworkThread") << " done";
m_managerThreadReady = true;
if (m_except) {
std::rethrow_exception(m_except);
}
}

void TestServicesOnNonFrameworkThreadsAnalyzer::runOnOtherThread() {
std::unique_lock<std::mutex> lk(m_mutex);

do {
m_condVar.wait(lk, [this] { return m_managerThreadReady; });
if (m_continueProcessing) {
edm::ModuleCallingContext newContext(*m_moduleCallingContext);
edm::ModuleContextSentry sentry(&newContext, m_moduleCallingContext->parent());

edm::ServiceRegistry::Operate srSentry(*m_serviceToken);
try {
edm::Service<edm::RandomNumberGenerator> rng;
edm::Service<edm::MessageLogger> ml;
ml->setThreadContext(*m_moduleCallingContext);
edm::LogSystem("ModuleThread") << " ++running with rng "
<< CLHEP::RandFlat::shootInt(&rng->getEngine(m_streamID), 10);
} catch (...) {
m_except = std::current_exception();
}
}
m_eventWorkDone = true;
m_managerThreadReady = false;
lk.unlock();
m_condVar.notify_one();
lk.lock();
} while (m_continueProcessing);
}

void TestServicesOnNonFrameworkThreadsAnalyzer::shutdownThread() {
m_continueProcessing = false;
m_mutex.unlock();
m_condVar.notify_one();
m_thread->join();
}

} // namespace edmtest

DEFINE_FWK_MODULE(edmtest::TestServicesOnNonFrameworkThreadsAnalyzer);
15 changes: 15 additions & 0 deletions FWCore/Integration/test/moduleThread_test_cfg.py
@@ -0,0 +1,15 @@
import FWCore.ParameterSet.Config as cms

process = cms.Process("Test")

process.source = cms.Source("EmptySource")

process.maxEvents.input = 10

process.test = cms.EDAnalyzer("edmtest::TestServicesOnNonFrameworkThreadsAnalyzer")

process.p = cms.EndPath(process.test)

process.add_(cms.Service("RandomNumberGeneratorService",
test = cms.PSet(initialSeed = cms.untracked.uint32(12345))
))
41 changes: 41 additions & 0 deletions FWCore/MessageLogger/interface/edm_MessageLogger.h
@@ -0,0 +1,41 @@
#ifndef FWCore_MessageService_MessageLogger_h
#define FWCore_MessageService_MessageLogger_h

// -*- C++ -*-
//
// Package: MessageService
// Class : MessageLogger
//
/**\class edm::MessageLogger MessageLogger.h FWCore/MessageService/plugins/MessageLogger.h

Description: Abstract interface for MessageLogger Service

Usage:
<usage>

*/
//

// system include files

// user include files

// forward declarations

namespace edm {
class ModuleCallingContext;

class MessageLogger {
public:
virtual ~MessageLogger();

virtual void setThreadContext(ModuleCallingContext const&) = 0;

protected:
MessageLogger() = default;

}; // MessageLogger

} // namespace edm

#endif // FWCore_MessageService_MessageLogger_h
10 changes: 10 additions & 0 deletions FWCore/MessageLogger/src/edm_MessageLogger.cc
@@ -0,0 +1,10 @@
//
// MessageLogger.cc
// CMSSW
//
// Created by Chris Jones on 6/10/21.
//

#include "FWCore/MessageLogger/interface/edm_MessageLogger.h"

edm::MessageLogger::~MessageLogger() = default;
2 changes: 0 additions & 2 deletions FWCore/MessageService/BuildFile.xml
@@ -1,7 +1,5 @@
<use name="DataFormats/Provenance"/>
<use name="FWCore/MessageLogger"/>
<use name="FWCore/ParameterSet"/>
<use name="FWCore/ServiceRegistry"/>
<use name="FWCore/Utilities"/>
<use name="tbb"/>
<export>
Expand Down
2 changes: 2 additions & 0 deletions FWCore/MessageService/plugins/BuildFile.xml
@@ -1,4 +1,6 @@
<library file="*.cc" name="FWCoreMessageServicePlugins">
<flags EDM_PLUGIN="1"/>
<use name="FWCore/MessageService"/>
<use name="FWCore/ServiceRegistry"/>
<use name="DataFormats/Provenance"/>
</library>
Expand Up @@ -15,8 +15,8 @@

#include "FWCore/ParameterSet/interface/ParameterSet.h"

#include "FWCore/MessageService/interface/MessageLogger.h"
#include "FWCore/MessageService/src/MessageServicePSetValidation.h"
#include "MessageLogger.h"
#include "MessageServicePSetValidation.h"

#include "FWCore/MessageLogger/interface/MessageLoggerQ.h"
#include "FWCore/MessageLogger/interface/MessageDrop.h"
Expand Down Expand Up @@ -325,6 +325,29 @@ namespace edm {
nonModule_errorEnabled = messageDrop->errorEnabled;
} // ctor

void MessageLogger::setThreadContext(ModuleCallingContext const& iModContext) {
//need to know if we are in a global or stream context
auto top = iModContext.getTopModuleCallingContext();
assert(nullptr != top);
if (ParentContext::Type::kGlobal == top->type()) {
auto globalContext = iModContext.getGlobalContext();
auto tran = globalContext->transition();
if (tran == GlobalContext::Transition::kBeginLuminosityBlock or
tran == GlobalContext::Transition::kEndLuminosityBlock) {
establishModule(lumiInfoBegin_ + globalContext->luminosityBlockIndex(),
iModContext,
s_globalTransitionNames[static_cast<int>(tran)]);
} else {
establishModule(
runInfoBegin_ + globalContext->runIndex(), iModContext, s_globalTransitionNames[static_cast<int>(tran)]);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would this need any specific treatment for ProcessBlock?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know. I didn't look to see if David extended the message logger for that case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wddgit Could you comment?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. I will take a look at this one after lunch. There were a few MessageLogger changes I vaguely remember making in the PR, but not many. It might have actually been in the first PR...

I've been trying to finish up this documentation and some slides for next week's Core meeting, before I start addressing the PR comments.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like ProcessBlock doesn't work with the interface. That is because, in my opinion, the ProcessBlock transitions in the MessageLogger service are calling the wrong establishModule and unestablishModule calls of the messaging system. They should be calling the ones using the ModuleCallingContext but they are using the ModuleDescription instead.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One comment. For ProcessBlock it always a global context. There are no stream transitions. I am not sure if that is relevant here yet.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or was it incorrect a year ago also?

I believe it was incorrect a year ago. I would expect the ProcessBlock transitions to work like the Run or LuminosityBlock transitions, not like the constructor or beginJob transitions.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am still looking and trying to remember why I did it this way. There is a unit test and at least for beginProcessBlock and endProcessBlock it seems to be working:

https://cmssdt.cern.ch/lxr/source/FWCore/MessageService/test/unit_test_outputs/u33_all.log
lines 7 and 93

There is no accessInputProcessBlock transition test.

At some level, I intuitively see why you would it expect it to be like Run and LuminosityBlock. Although there are a lot of similarities to beginJob and endJob also. The ProcessBlock transitions are always global, no stream transitions. The modules can run concurrently but otherwise no concurrency, I am not sure how that plays into it. Still looking...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see three differences in the establishModule/unestablishModule functions.

  1. Setting runEvent. This seems irrelevant to the ProcessBlock transitions.
  2. Setting streamID. Also irrelevant for the ProcessBlock transitions.
  3. Different actions if previousModuleOnThread is set on unestablishModule. I'm less sure, but I think that is also irrelevant for ProcessBlock transitions. Is there a way for the previousModuleOnThread to be set for ProcessBlock transitions?

Everything else in the function overloads seems the same. Unless I am missing something, the ModuleDescription overloads are correct and all is OK.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The establishModule gets a ModuleCallingContext which is needed if a module can get data from another module therby cause the other module to be run. The context allows one to trace that.

} else {
auto stream = iModContext.getStreamContext();
establishModule(
stream->streamID().value(), iModContext, s_streamTransitionNames[static_cast<int>(stream->transition())]);
}
}

//
// Shared helper routines for establishing module name and enabling behavior
//
Expand Down Expand Up @@ -867,22 +890,22 @@ namespace edm {
}

void MessageLogger::postEndJob() {
SummarizeInJobReport(); // Put summary info into Job Rep // change log 10
summarizeInJobReport(); // Put summary info into Job Rep // change log 10
MessageLoggerQ::MLqSUM(); // trigger summary info. // change log 9
}

void MessageLogger::jobFailure() {
MessageDrop* messageDrop = MessageDrop::instance();
messageDrop->setSinglet("jobFailure");
SummarizeInJobReport(); // Put summary info into Job Rep // change log 10
summarizeInJobReport(); // Put summary info into Job Rep // change log 10
MessageLoggerQ::MLqSUM(); // trigger summary info. // change log 9
}

//
// Other methods
//

void MessageLogger::SummarizeInJobReport() {
void MessageLogger::summarizeInJobReport() {
if (fjrSummaryRequested_) {
std::map<std::string, double>* smp = new std::map<std::string, double>();
MessageLoggerQ::MLqJRS(smp);
Expand Down
@@ -1,12 +1,12 @@
#ifndef FWCore_MessageService_MessageLogger_h
#define FWCore_MessageService_MessageLogger_h
#ifndef FWCore_MessageService_plugins_MessageLogger_h
#define FWCore_MessageService_plugins_MessageLogger_h

// -*- C++ -*-
//
// Package: Services
// Package: MessageService
// Class : MessageLogger
//
/**\class MessageLogger MessageLogger.h FWCore/MessageService/interface/MessageLogger.h
/**\class edm::service::MessageLogger MessageLogger.h FWCore/MessageService/plugins/MessageLogger.h

Description: <one line class summary>

Expand All @@ -33,7 +33,7 @@

#include "DataFormats/Provenance/interface/EventID.h"
#include "FWCore/MessageLogger/interface/ELseverityLevel.h"
#include "FWCore/MessageLogger/interface/ErrorObj.h"
#include "FWCore/MessageLogger/interface/edm_MessageLogger.h"
#include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
#include "FWCore/Utilities/interface/thread_safety_macros.h"

Expand All @@ -44,18 +44,15 @@ namespace edm {
class ParameterSet;
namespace service {

class MessageLogger {
class MessageLogger : public edm::MessageLogger {
public:
MessageLogger(ParameterSet const&, ActivityRegistry&);

void fillErrorObj(edm::ErrorObj& obj) const;
bool debugEnabled() const { return debugEnabled_; }

static bool anyDebugEnabled() { return anyDebugEnabled_; }

static void SummarizeInJobReport();
void setThreadContext(ModuleCallingContext const&) final;

private:
static void summarizeInJobReport();

void postBeginJob();
void preEndJob();
void postEndJob();
Expand Down Expand Up @@ -193,4 +190,4 @@ namespace edm {

} // namespace edm

#endif // FWCore_MessageService_MessageLogger_h
#endif // FWCore_MessageService_plugins_MessageLogger_h
Expand Up @@ -17,7 +17,7 @@

// user include files

#include "FWCore/MessageService/src/MessageServicePSetValidation.h"
#include "FWCore/MessageService/plugins/MessageServicePSetValidation.h"
Dr15Jones marked this conversation as resolved.
Show resolved Hide resolved

using namespace edm;
using namespace edm::service;
Expand Down
7 changes: 5 additions & 2 deletions FWCore/MessageService/plugins/Module.cc
@@ -1,11 +1,14 @@
#include "FWCore/PluginManager/interface/PresenceMacros.h"
#include "FWCore/MessageService/interface/MessageLogger.h"
#include "MessageLogger.h"
#include "FWCore/MessageService/interface/SingleThreadMSPresence.h"
#include "FWCore/ServiceRegistry/interface/ServiceMaker.h"

#pragma GCC visibility push(hidden)
using edm::service::MessageLogger;
using edm::service::SingleThreadMSPresence;
DEFINE_FWK_SERVICE(MessageLogger);

using MessageLoggerMaker = edm::serviceregistry::AllArgsMaker<edm::MessageLogger, MessageLogger>;
DEFINE_FWK_SERVICE_MAKER(MessageLogger, MessageLoggerMaker);

DEFINE_FWK_PRESENCE(SingleThreadMSPresence);
#pragma GCC visibility pop
2 changes: 0 additions & 2 deletions HLTrigger/special/plugins/HLTCountNumberOfObject.h
Expand Up @@ -17,8 +17,6 @@

#include "FWCore/ParameterSet/interface/ParameterSet.h"

#include "FWCore/MessageService/interface/MessageLogger.h"

#include "HLTrigger/HLTcore/interface/HLTFilter.h"
#include "DataFormats/HLTReco/interface/TriggerFilterObjectWithRefs.h"
#include "HLTrigger/HLTcore/interface/defaultModuleLabel.h"
Expand Down
2 changes: 0 additions & 2 deletions HLTrigger/special/plugins/HLTTrackWithHits.h
Expand Up @@ -17,8 +17,6 @@

#include "FWCore/ParameterSet/interface/ParameterSet.h"

#include "FWCore/MessageService/interface/MessageLogger.h"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this no longer needed for e.g. LogDebug ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Things like LogDebug have always come from FWCore/MessageLogger not from FWCore/MessageService. It was unfortunate that both packages had a file named MessageLogger.h in their include directory.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah OK, thanks. I guess those files are getting LogDebug via some other intermediate include.


#include "HLTrigger/HLTcore/interface/HLTFilter.h"
#include "DataFormats/HLTReco/interface/TriggerFilterObjectWithRefs.h"
#include "DataFormats/TrackReco/interface/Track.h"
Expand Down
1 change: 0 additions & 1 deletion L1Trigger/TrackFindingTMTT/plugins/TMTrackProducer.cc
Expand Up @@ -13,7 +13,6 @@
#include "L1Trigger/TrackFindingTMTT/interface/Array2D.h"
#include "L1Trigger/TrackFindingTMTT/interface/PrintL1trk.h"

#include "FWCore/MessageService/interface/MessageLogger.h"
#include "FWCore/Framework/interface/ESHandle.h"

#include <iostream>
Expand Down