Skip to content

Commit

Permalink
Merge pull request #12277 from smorovic/exception-catch-fix-80X
Browse files Browse the repository at this point in the history
Fix race condition in DAQ modules for 80X (make new PR)
  • Loading branch information
cmsbuild committed Nov 12, 2015
2 parents 877f2dd + 70c8373 commit 8912b6f
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 49 deletions.
4 changes: 4 additions & 0 deletions EventFilter/Utilities/BuildFile.xml
@@ -1,6 +1,10 @@
<use name="FWCore/MessageLogger"/>
<use name="FWCore/ServiceRegistry"/>
<use name="FWCore/Framework"/>
<use name="FWCore/Sources"/>
<use name="EventFilter/FEDInterface"/>
<use name="DataFormats/FEDRawData"/>
<use name="IOPool/Streamer"/>
<use name="curl"/>
<export>
<lib name="1"/>
Expand Down
6 changes: 3 additions & 3 deletions EventFilter/Utilities/interface/FastMonitoringService.h
Expand Up @@ -45,6 +45,7 @@
which should continue to use the concrete class interface) will be defined
*/
class FedRawDataInputSource;

namespace edm {
class ConfigurationDescriptions;
Expand Down Expand Up @@ -151,7 +152,6 @@ namespace evf{
void setMicroState(MicroStateService::Microstate);
void setMicroState(edm::StreamID, MicroStateService::Microstate);

void reportEventsThisLumiInSource(unsigned int lumi,unsigned int events);
void accumulateFileSize(unsigned int lumi, unsigned long fileSize);
void startedLookingForFile();
void stoppedLookingForFile(unsigned int lumi);
Expand All @@ -165,6 +165,7 @@ namespace evf{
return !getAbortFlagForLumi(lumi) && (processed || emptyLumisectionMode_);
}
std::string getRunDirName() const { return runDirectory_.stem().string(); }
void setInputSource(FedRawDataInputSource *inputSource) {inputSource_=inputSource;}

private:

Expand Down Expand Up @@ -206,6 +207,7 @@ namespace evf{
FastMonitoringThread fmt_;
Encoding encModule_;
std::vector<Encoding> encPath_;
FedRawDataInputSource * inputSource_ = nullptr;

unsigned int nStreams_;
unsigned int nThreads_;
Expand Down Expand Up @@ -256,8 +258,6 @@ namespace evf{

boost::filesystem::path workingDirectory_, runDirectory_;

std::map<unsigned int,unsigned int> sourceEventsReport_;

bool threadIDAvailable_ = false;

std::atomic<unsigned long> totalEventsProcessed_;
Expand Down
Expand Up @@ -50,6 +50,7 @@ friend class InputChunk;
virtual ~FedRawDataInputSource();
static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);

std::pair<bool,unsigned int> getEventReport(unsigned int lumi, bool erase);
protected:
virtual bool checkNextEvent() override;
virtual void read(edm::EventPrincipal& eventPrincipal) override;
Expand All @@ -75,6 +76,9 @@ friend class InputChunk;
//functions for single buffered reader
void readNextChunkIntoBuffer(InputFile *file);

//monitoring
void reportEventsThisLumiInSource(unsigned int lumi,unsigned int events);

//variables
evf::FastMonitoringService* fms_=nullptr;
evf::EvFDaqDirector* daqDirector_=nullptr;
Expand Down Expand Up @@ -166,6 +170,8 @@ friend class InputChunk;

std::atomic<bool> threadInit_;

std::map<unsigned int,unsigned int> sourceEventsReport_;
std::mutex monlock_;
};


Expand Down
2 changes: 1 addition & 1 deletion EventFilter/Utilities/plugins/BuildFile.xml
Expand Up @@ -5,8 +5,8 @@
<use name="FWCore/Sources"/>
<use name="FWCore/Utilities"/>
<use name="IOPool/Streamer"/>
<use name="EventFilter/Utilities"/>
<use name="DataFormats/FEDRawData"/>
<use name="EventFilter/Utilities"/>
<use name="root"/>
<use name="boost"/>
<use name="CLHEP"/>
Expand Down
2 changes: 1 addition & 1 deletion EventFilter/Utilities/src/EvFDaqDirector.cc
Expand Up @@ -10,7 +10,7 @@

#include "EventFilter/Utilities/interface/EvFDaqDirector.h"
#include "EventFilter/Utilities/interface/FastMonitoringService.h"
#include "EventFilter/Utilities/plugins/FedRawDataInputSource.h"
#include "EventFilter/Utilities/interface/FedRawDataInputSource.h"
#include "EventFilter/Utilities/interface/DataPointDefinition.h"
#include "EventFilter/Utilities/interface/DataPoint.h"

Expand Down
67 changes: 28 additions & 39 deletions EventFilter/Utilities/src/FastMonitoringService.cc
Expand Up @@ -11,6 +11,7 @@
#include "FWCore/ServiceRegistry/interface/StreamContext.h"
#include "FWCore/ServiceRegistry/interface/PathContext.h"
#include "EventFilter/Utilities/interface/EvFDaqDirector.h"
#include "EventFilter/Utilities/interface/FedRawDataInputSource.h"
#include "EventFilter/Utilities/interface/FileIO.h"
#include "FWCore/MessageLogger/interface/MessageLogger.h"
#include "FWCore/Utilities/interface/UnixSignalHandlers.h"
Expand Down Expand Up @@ -260,7 +261,7 @@ namespace evf{
if (to==edm::TerminationOrigin::ExceptionFromThisContext) context = " FromThisContext ";
if (to==edm::TerminationOrigin::ExceptionFromAnotherContext) context = " FromAnotherContext";
if (to==edm::TerminationOrigin::ExternalSignal) context = " FromExternalSignal";
edm::LogInfo("FastMonitoringService") << " STREAM " << sc.streamID().value() << " earlyTermination -: ID:"<< sc.eventID()
edm::LogWarning("FastMonitoringService") << " STREAM " << sc.streamID().value() << " earlyTermination -: ID:"<< sc.eventID()
<< " LS:" << sc.eventID().luminosityBlock() << " " << context;
std::lock_guard<std::mutex> lock(fmt_.monlock_);
exceptionInLS_.push_back(sc.eventID().luminosityBlock());
Expand All @@ -272,7 +273,7 @@ namespace evf{
if (to==edm::TerminationOrigin::ExceptionFromThisContext) context = " FromThisContext ";
if (to==edm::TerminationOrigin::ExceptionFromAnotherContext) context = " FromAnotherContext";
if (to==edm::TerminationOrigin::ExternalSignal) context = " FromExternalSignal";
edm::LogInfo("FastMonitoringService") << " GLOBAL " << "earlyTermination -: LS:"
edm::LogWarning("FastMonitoringService") << " GLOBAL " << "earlyTermination -: LS:"
<< gc.luminosityBlockID().luminosityBlock() << " " << context;
std::lock_guard<std::mutex> lock(fmt_.monlock_);
exceptionInLS_.push_back(gc.luminosityBlockID().luminosityBlock());
Expand All @@ -284,7 +285,7 @@ namespace evf{
if (to==edm::TerminationOrigin::ExceptionFromThisContext) context = " FromThisContext ";
if (to==edm::TerminationOrigin::ExceptionFromAnotherContext) context = " FromAnotherContext";
if (to==edm::TerminationOrigin::ExternalSignal) context = " FromExternalSignal";
edm::LogInfo("FastMonitoringService") << " SOURCE " << "earlyTermination -: " << context;
edm::LogWarning("FastMonitoringService") << " SOURCE " << "earlyTermination -: " << context;
std::lock_guard<std::mutex> lock(fmt_.monlock_);
exception_detected_=true;
}
Expand Down Expand Up @@ -400,35 +401,34 @@ namespace evf{
throw cms::Exception("FastMonitoringService") << "Internal error: got null pointer from FastMonitor";
processedEventsPerLumi_[lumi] = std::pair<unsigned int,bool>(lumiProcessedJptr->value(),false);

{
auto itr = sourceEventsReport_.find(lumi);
if (itr==sourceEventsReport_.end()) {
//check if exception has been thrown (in case of Global/Stream early termination, for this LS)
bool exception_detected = exception_detected_;
for (auto ex : exceptionInLS_)
if (lumi == ex) exception_detected=true;

if (edm::shutdown_flag || exception_detected) {
edm::LogInfo("FastMonitoringService") << "Run interrupted. Skip writing EoL information -: "
<< processedEventsPerLumi_[lumi].first << " events were processed in LUMI " << lumi;
//this will prevent output modules from producing json file for possibly incomplete lumi
processedEventsPerLumi_[lumi].first=0;
processedEventsPerLumi_[lumi].second=true;
return;
}
//disable this exception, so service can be used standalone (will be thrown if output module asks for this information)
//throw cms::Exception("FastMonitoringService") << "SOURCE did not send update for lumi block. LUMI -:" << lumi;
}
else {
if (itr->second!=processedEventsPerLumi_[lumi].first) {
throw cms::Exception("FastMonitoringService") << "MISMATCH with SOURCE update. LUMI -: "
//checking if exception has been thrown (in case of Global/Stream early termination, for this LS)
bool exception_detected = exception_detected_;
for (auto ex : exceptionInLS_)
if (lumi == ex) exception_detected=true;

if (edm::shutdown_flag || exception_detected) {
edm::LogInfo("FastMonitoringService") << "Run interrupted. Skip writing EoL information -: "
<< processedEventsPerLumi_[lumi].first << " events were processed in LUMI " << lumi;
//this will prevent output modules from producing json file for possibly incomplete lumi
processedEventsPerLumi_[lumi].first=0;
processedEventsPerLumi_[lumi].second=true;
//disable this exception, so service can be used standalone (will be thrown if output module asks for this information)
//throw cms::Exception("FastMonitoringService") << "SOURCE did not send update for lumi block. LUMI -:" << lumi;
return;

}

if (inputSource_) {
auto sourceReport = inputSource_->getEventReport(lumi, true);
if (sourceReport.first) {
if (sourceReport.second!=processedEventsPerLumi_[lumi].first) {
throw cms::Exception("FastMonitoringService") << "MISMATCH with SOURCE update. LUMI -: "
<< lumi
<< ", events(processed):" << processedEventsPerLumi_[lumi].first
<< " events(source):" << itr->second;
<< " events(source):" << sourceReport.second;
}
sourceEventsReport_.erase(itr);
}
}
}
edm::LogInfo("FastMonitoringService") << "Statistics for lumisection -: lumi = " << lumi << " events = "
<< lumiProcessedJptr->value() << " time = " << usecondsForLumi/1000000
<< " size = " << accuSize << " thr = " << throughput;
Expand Down Expand Up @@ -719,16 +719,5 @@ namespace evf{
fmt_.jsonMonitor_->snap(ls);
}

// Adds `events` to the count stored for `lumi` in sourceEventsReport_,
// creating the entry when this is the first report for that lumisection.
// fmt_.monlock_ is held for the duration of the map update.
void FastMonitoringService::reportEventsThisLumiInSource(unsigned int lumi,unsigned int events)
{
  std::lock_guard<std::mutex> guard(fmt_.monlock_);
  // operator[] value-initializes a missing entry to 0, so a single += covers
  // both the "first report" and the "accumulate" case.
  sourceEventsReport_[lumi] += events;
}
} //end namespace evf

Expand Up @@ -31,7 +31,7 @@
#include "EventFilter/FEDInterface/interface/fed_header.h"
#include "EventFilter/FEDInterface/interface/fed_trailer.h"

#include "EventFilter/Utilities/plugins/FedRawDataInputSource.h"
#include "EventFilter/Utilities/interface/FedRawDataInputSource.h"

#include "EventFilter/Utilities/interface/FastMonitoringService.h"
#include "EventFilter/Utilities/interface/DataPointDefinition.h"
Expand Down Expand Up @@ -129,6 +129,7 @@ FedRawDataInputSource::FedRawDataInputSource(edm::ParameterSet const& pset,
daqDirector_->setDeleteTracking(&fileDeleteLock_,&filesToDelete_);
if (fms_) daqDirector_->setFMS(fms_);

fms_->setInputSource(this);
//should delete chunks when run stops
for (unsigned int i=0;i<numBuffers_;i++) {
freeChunks_.push(new InputChunk(i,eventChunkSize_));
Expand Down Expand Up @@ -238,7 +239,7 @@ bool FedRawDataInputSource::checkNextEvent()
int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
close(eor_fd);
}
if (fms_) fms_->reportEventsThisLumiInSource(currentLumiSection_,eventsThisLumi_);
reportEventsThisLumiInSource(currentLumiSection_,eventsThisLumi_);
eventsThisLumi_=0;
resetLuminosityBlockAuxiliary();
edm::LogInfo("FedRawDataInputSource") << "----------------RUN ENDED----------------";
Expand All @@ -256,7 +257,7 @@ bool FedRawDataInputSource::checkNextEvent()
if (!getLSFromFilename_) {
//get new lumi from file header
if (event_->lumi() > currentLumiSection_) {
if (fms_) fms_->reportEventsThisLumiInSource(currentLumiSection_,eventsThisLumi_);
reportEventsThisLumiInSource(currentLumiSection_,eventsThisLumi_);
eventsThisLumi_=0;
maybeOpenNewLumiSection( event_->lumi() );
}
Expand Down Expand Up @@ -368,7 +369,7 @@ inline evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent()
{
if (getLSFromFilename_) {
if (currentFile_->lumi_ > currentLumiSection_) {
if (fms_) fms_->reportEventsThisLumiInSource(currentLumiSection_,eventsThisLumi_);
reportEventsThisLumiInSource(currentLumiSection_,eventsThisLumi_);
eventsThisLumi_=0;
maybeOpenNewLumiSection(currentFile_->lumi_);
}
Expand Down Expand Up @@ -396,7 +397,7 @@ inline evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent()
assert(currentFile_->nChunks_==0);
if (getLSFromFilename_)
if (currentFile_->lumi_ > currentLumiSection_) {
if (fms_) fms_->reportEventsThisLumiInSource(currentLumiSection_,eventsThisLumi_);
reportEventsThisLumiInSource(currentLumiSection_,eventsThisLumi_);
eventsThisLumi_=0;
maybeOpenNewLumiSection(currentFile_->lumi_);
}
Expand Down Expand Up @@ -1255,5 +1256,32 @@ void FedRawDataInputSource::readNextChunkIntoBuffer(InputFile *file)
}
}


// Adds `events` to the count stored for `lumi` in sourceEventsReport_,
// creating the entry when this is the first report for that lumisection.
// monlock_ is held while the map is modified; getEventReport() takes the
// same mutex when reading the map.
void FedRawDataInputSource::reportEventsThisLumiInSource(unsigned int lumi,unsigned int events)
{
  std::lock_guard<std::mutex> guard(monlock_);
  // operator[] value-initializes a missing entry to 0, so a single += covers
  // both the "first report" and the "accumulate" case.
  sourceEventsReport_[lumi] += events;
}

// Looks up the event count recorded for `lumi` in sourceEventsReport_.
//
// Returns {true, count} when an entry exists and {false, 0} otherwise.
// When `erase` is true and an entry was found, that entry is removed from
// the map before returning. monlock_ is held for the whole lookup/erase.
std::pair<bool,unsigned int> FedRawDataInputSource::getEventReport(unsigned int lumi, bool erase)
{
  std::lock_guard<std::mutex> guard(monlock_);

  const auto entry = sourceEventsReport_.find(lumi);
  if (entry == sourceEventsReport_.end())
    return std::pair<bool,unsigned int>(false,0);

  // Copy the count out first: the iterator is invalid once the entry is erased.
  const unsigned int nEvents = entry->second;
  if (erase)
    sourceEventsReport_.erase(entry);
  return std::pair<bool,unsigned int>(true,nEvents);
}


// define this class as an input source
DEFINE_FWK_INPUT_SOURCE( FedRawDataInputSource);

0 comments on commit 8912b6f

Please sign in to comment.