Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race condition in DAQ modules for 80X (make new PR) #12277

Merged
merged 5 commits into from Nov 12, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions EventFilter/Utilities/BuildFile.xml
@@ -1,6 +1,10 @@
<use name="FWCore/MessageLogger"/>
<use name="FWCore/ServiceRegistry"/>
<use name="FWCore/Framework"/>
<use name="FWCore/Sources"/>
<use name="EventFilter/FEDInterface"/>
<use name="DataFormats/FEDRawData"/>
<use name="IOPool/Streamer"/>
<use name="curl"/>
<export>
<lib name="1"/>
Expand Down
6 changes: 3 additions & 3 deletions EventFilter/Utilities/interface/FastMonitoringService.h
Expand Up @@ -45,6 +45,7 @@
which should continue to use the concrete class interface) will be defined

*/
class FedRawDataInputSource;

namespace edm {
class ConfigurationDescriptions;
Expand Down Expand Up @@ -151,7 +152,6 @@ namespace evf{
void setMicroState(MicroStateService::Microstate);
void setMicroState(edm::StreamID, MicroStateService::Microstate);

void reportEventsThisLumiInSource(unsigned int lumi,unsigned int events);
void accumulateFileSize(unsigned int lumi, unsigned long fileSize);
void startedLookingForFile();
void stoppedLookingForFile(unsigned int lumi);
Expand All @@ -165,6 +165,7 @@ namespace evf{
return !getAbortFlagForLumi(lumi) && (processed || emptyLumisectionMode_);
}
// Returns the stem of runDirectory_ (as produced by path::stem()) converted
// to a std::string.
std::string getRunDirName() const
{
  const auto dirStem = runDirectory_.stem();
  return dirStem.string();
}
// Stores the given input source pointer in inputSource_ so the service can
// later query it. The pointer is stored as-is; no null check is made here.
void setInputSource(FedRawDataInputSource *inputSource)
{
  inputSource_ = inputSource;
}

private:

Expand Down Expand Up @@ -206,6 +207,7 @@ namespace evf{
FastMonitoringThread fmt_;
Encoding encModule_;
std::vector<Encoding> encPath_;
FedRawDataInputSource * inputSource_ = nullptr;

unsigned int nStreams_;
unsigned int nThreads_;
Expand Down Expand Up @@ -256,8 +258,6 @@ namespace evf{

boost::filesystem::path workingDirectory_, runDirectory_;

std::map<unsigned int,unsigned int> sourceEventsReport_;

bool threadIDAvailable_ = false;

std::atomic<unsigned long> totalEventsProcessed_;
Expand Down
Expand Up @@ -50,6 +50,7 @@ friend class InputChunk;
virtual ~FedRawDataInputSource();
static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);

std::pair<bool,unsigned int> getEventReport(unsigned int lumi, bool erase);
protected:
virtual bool checkNextEvent() override;
virtual void read(edm::EventPrincipal& eventPrincipal) override;
Expand All @@ -75,6 +76,9 @@ friend class InputChunk;
//functions for single buffered reader
void readNextChunkIntoBuffer(InputFile *file);

//monitoring
void reportEventsThisLumiInSource(unsigned int lumi,unsigned int events);

//variables
evf::FastMonitoringService* fms_=nullptr;
evf::EvFDaqDirector* daqDirector_=nullptr;
Expand Down Expand Up @@ -166,6 +170,8 @@ friend class InputChunk;

std::atomic<bool> threadInit_;

std::map<unsigned int,unsigned int> sourceEventsReport_;
std::mutex monlock_;
};


Expand Down
2 changes: 1 addition & 1 deletion EventFilter/Utilities/plugins/BuildFile.xml
Expand Up @@ -5,8 +5,8 @@
<use name="FWCore/Sources"/>
<use name="FWCore/Utilities"/>
<use name="IOPool/Streamer"/>
<use name="EventFilter/Utilities"/>
<use name="DataFormats/FEDRawData"/>
<use name="EventFilter/Utilities"/>
<use name="root"/>
<use name="boost"/>
<use name="CLHEP"/>
Expand Down
2 changes: 1 addition & 1 deletion EventFilter/Utilities/src/EvFDaqDirector.cc
Expand Up @@ -10,7 +10,7 @@

#include "EventFilter/Utilities/interface/EvFDaqDirector.h"
#include "EventFilter/Utilities/interface/FastMonitoringService.h"
#include "EventFilter/Utilities/plugins/FedRawDataInputSource.h"
#include "EventFilter/Utilities/interface/FedRawDataInputSource.h"
#include "EventFilter/Utilities/interface/DataPointDefinition.h"
#include "EventFilter/Utilities/interface/DataPoint.h"

Expand Down
67 changes: 28 additions & 39 deletions EventFilter/Utilities/src/FastMonitoringService.cc
Expand Up @@ -11,6 +11,7 @@
#include "FWCore/ServiceRegistry/interface/StreamContext.h"
#include "FWCore/ServiceRegistry/interface/PathContext.h"
#include "EventFilter/Utilities/interface/EvFDaqDirector.h"
#include "EventFilter/Utilities/interface/FedRawDataInputSource.h"
#include "EventFilter/Utilities/interface/FileIO.h"
#include "FWCore/MessageLogger/interface/MessageLogger.h"
#include "FWCore/Utilities/interface/UnixSignalHandlers.h"
Expand Down Expand Up @@ -260,7 +261,7 @@ namespace evf{
if (to==edm::TerminationOrigin::ExceptionFromThisContext) context = " FromThisContext ";
if (to==edm::TerminationOrigin::ExceptionFromAnotherContext) context = " FromAnotherContext";
if (to==edm::TerminationOrigin::ExternalSignal) context = " FromExternalSignal";
edm::LogInfo("FastMonitoringService") << " STREAM " << sc.streamID().value() << " earlyTermination -: ID:"<< sc.eventID()
edm::LogWarning("FastMonitoringService") << " STREAM " << sc.streamID().value() << " earlyTermination -: ID:"<< sc.eventID()
<< " LS:" << sc.eventID().luminosityBlock() << " " << context;
std::lock_guard<std::mutex> lock(fmt_.monlock_);
exceptionInLS_.push_back(sc.eventID().luminosityBlock());
Expand All @@ -272,7 +273,7 @@ namespace evf{
if (to==edm::TerminationOrigin::ExceptionFromThisContext) context = " FromThisContext ";
if (to==edm::TerminationOrigin::ExceptionFromAnotherContext) context = " FromAnotherContext";
if (to==edm::TerminationOrigin::ExternalSignal) context = " FromExternalSignal";
edm::LogInfo("FastMonitoringService") << " GLOBAL " << "earlyTermination -: LS:"
edm::LogWarning("FastMonitoringService") << " GLOBAL " << "earlyTermination -: LS:"
<< gc.luminosityBlockID().luminosityBlock() << " " << context;
std::lock_guard<std::mutex> lock(fmt_.monlock_);
exceptionInLS_.push_back(gc.luminosityBlockID().luminosityBlock());
Expand All @@ -284,7 +285,7 @@ namespace evf{
if (to==edm::TerminationOrigin::ExceptionFromThisContext) context = " FromThisContext ";
if (to==edm::TerminationOrigin::ExceptionFromAnotherContext) context = " FromAnotherContext";
if (to==edm::TerminationOrigin::ExternalSignal) context = " FromExternalSignal";
edm::LogInfo("FastMonitoringService") << " SOURCE " << "earlyTermination -: " << context;
edm::LogWarning("FastMonitoringService") << " SOURCE " << "earlyTermination -: " << context;
std::lock_guard<std::mutex> lock(fmt_.monlock_);
exception_detected_=true;
}
Expand Down Expand Up @@ -400,35 +401,34 @@ namespace evf{
throw cms::Exception("FastMonitoringService") << "Internal error: got null pointer from FastMonitor";
processedEventsPerLumi_[lumi] = std::pair<unsigned int,bool>(lumiProcessedJptr->value(),false);

{
auto itr = sourceEventsReport_.find(lumi);
if (itr==sourceEventsReport_.end()) {
//check if exception has been thrown (in case of Global/Stream early termination, for this LS)
bool exception_detected = exception_detected_;
for (auto ex : exceptionInLS_)
if (lumi == ex) exception_detected=true;

if (edm::shutdown_flag || exception_detected) {
edm::LogInfo("FastMonitoringService") << "Run interrupted. Skip writing EoL information -: "
<< processedEventsPerLumi_[lumi].first << " events were processed in LUMI " << lumi;
//this will prevent output modules from producing json file for possibly incomplete lumi
processedEventsPerLumi_[lumi].first=0;
processedEventsPerLumi_[lumi].second=true;
return;
}
//disable this exception, so service can be used standalone (will be thrown if output module asks for this information)
//throw cms::Exception("FastMonitoringService") << "SOURCE did not send update for lumi block. LUMI -:" << lumi;
}
else {
if (itr->second!=processedEventsPerLumi_[lumi].first) {
throw cms::Exception("FastMonitoringService") << "MISMATCH with SOURCE update. LUMI -: "
//checking if exception has been thrown (in case of Global/Stream early termination, for this LS)
bool exception_detected = exception_detected_;
for (auto ex : exceptionInLS_)
if (lumi == ex) exception_detected=true;

if (edm::shutdown_flag || exception_detected) {
edm::LogInfo("FastMonitoringService") << "Run interrupted. Skip writing EoL information -: "
<< processedEventsPerLumi_[lumi].first << " events were processed in LUMI " << lumi;
//this will prevent output modules from producing json file for possibly incomplete lumi
processedEventsPerLumi_[lumi].first=0;
processedEventsPerLumi_[lumi].second=true;
//disable this exception, so service can be used standalone (will be thrown if output module asks for this information)
//throw cms::Exception("FastMonitoringService") << "SOURCE did not send update for lumi block. LUMI -:" << lumi;
return;

}

if (inputSource_) {
auto sourceReport = inputSource_->getEventReport(lumi, true);
if (sourceReport.first) {
if (sourceReport.second!=processedEventsPerLumi_[lumi].first) {
throw cms::Exception("FastMonitoringService") << "MISMATCH with SOURCE update. LUMI -: "
<< lumi
<< ", events(processed):" << processedEventsPerLumi_[lumi].first
<< " events(source):" << itr->second;
<< " events(source):" << sourceReport.second;
}
sourceEventsReport_.erase(itr);
}
}
}
edm::LogInfo("FastMonitoringService") << "Statistics for lumisection -: lumi = " << lumi << " events = "
<< lumiProcessedJptr->value() << " time = " << usecondsForLumi/1000000
<< " size = " << accuSize << " thr = " << throughput;
Expand Down Expand Up @@ -719,16 +719,5 @@ namespace evf{
fmt_.jsonMonitor_->snap(ls);
}

// Adds `events` to the number of events the source has reported for
// lumisection `lumi`.
//
//   lumi   - lumisection number, used as the key into sourceEventsReport_
//   events - number of events to add to that lumisection's count
//
// The map is updated while holding fmt_.monlock_.
void FastMonitoringService::reportEventsThisLumiInSource(unsigned int lumi,unsigned int events)
{
  std::lock_guard<std::mutex> lock(fmt_.monlock_);
  // std::map::operator[] value-initializes a missing entry to 0, so a single
  // += covers both the first report for a lumisection and later ones without
  // a separate find().
  sourceEventsReport_[lumi] += events;
}
} //end namespace evf

Expand Up @@ -31,7 +31,7 @@
#include "EventFilter/FEDInterface/interface/fed_header.h"
#include "EventFilter/FEDInterface/interface/fed_trailer.h"

#include "EventFilter/Utilities/plugins/FedRawDataInputSource.h"
#include "EventFilter/Utilities/interface/FedRawDataInputSource.h"

#include "EventFilter/Utilities/interface/FastMonitoringService.h"
#include "EventFilter/Utilities/interface/DataPointDefinition.h"
Expand Down Expand Up @@ -129,6 +129,7 @@ FedRawDataInputSource::FedRawDataInputSource(edm::ParameterSet const& pset,
daqDirector_->setDeleteTracking(&fileDeleteLock_,&filesToDelete_);
if (fms_) daqDirector_->setFMS(fms_);

fms_->setInputSource(this);
//should delete chunks when run stops
for (unsigned int i=0;i<numBuffers_;i++) {
freeChunks_.push(new InputChunk(i,eventChunkSize_));
Expand Down Expand Up @@ -238,7 +239,7 @@ bool FedRawDataInputSource::checkNextEvent()
int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
close(eor_fd);
}
if (fms_) fms_->reportEventsThisLumiInSource(currentLumiSection_,eventsThisLumi_);
reportEventsThisLumiInSource(currentLumiSection_,eventsThisLumi_);
eventsThisLumi_=0;
resetLuminosityBlockAuxiliary();
edm::LogInfo("FedRawDataInputSource") << "----------------RUN ENDED----------------";
Expand All @@ -256,7 +257,7 @@ bool FedRawDataInputSource::checkNextEvent()
if (!getLSFromFilename_) {
//get new lumi from file header
if (event_->lumi() > currentLumiSection_) {
if (fms_) fms_->reportEventsThisLumiInSource(currentLumiSection_,eventsThisLumi_);
reportEventsThisLumiInSource(currentLumiSection_,eventsThisLumi_);
eventsThisLumi_=0;
maybeOpenNewLumiSection( event_->lumi() );
}
Expand Down Expand Up @@ -368,7 +369,7 @@ inline evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent()
{
if (getLSFromFilename_) {
if (currentFile_->lumi_ > currentLumiSection_) {
if (fms_) fms_->reportEventsThisLumiInSource(currentLumiSection_,eventsThisLumi_);
reportEventsThisLumiInSource(currentLumiSection_,eventsThisLumi_);
eventsThisLumi_=0;
maybeOpenNewLumiSection(currentFile_->lumi_);
}
Expand Down Expand Up @@ -396,7 +397,7 @@ inline evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent()
assert(currentFile_->nChunks_==0);
if (getLSFromFilename_)
if (currentFile_->lumi_ > currentLumiSection_) {
if (fms_) fms_->reportEventsThisLumiInSource(currentLumiSection_,eventsThisLumi_);
reportEventsThisLumiInSource(currentLumiSection_,eventsThisLumi_);
eventsThisLumi_=0;
maybeOpenNewLumiSection(currentFile_->lumi_);
}
Expand Down Expand Up @@ -1255,5 +1256,32 @@ void FedRawDataInputSource::readNextChunkIntoBuffer(InputFile *file)
}
}


// Adds `events` to the number of events this source has recorded for
// lumisection `lumi`.
//
//   lumi   - lumisection number, used as the key into sourceEventsReport_
//   events - number of events to add to that lumisection's count
//
// The map is updated while holding monlock_.
void FedRawDataInputSource::reportEventsThisLumiInSource(unsigned int lumi,unsigned int events)
{
  std::lock_guard<std::mutex> lock(monlock_);
  // std::map::operator[] value-initializes a missing entry to 0, so a single
  // += covers both the first report for a lumisection and later ones without
  // a separate find().
  sourceEventsReport_[lumi] += events;
}

// Looks up the event count recorded for lumisection `lumi`.
//
//   lumi  - lumisection number to look up in sourceEventsReport_
//   erase - when true, the entry is removed from the map after being read
//
// Returns (true, count) when an entry for `lumi` exists and (false, 0) when
// it does not. The lookup and the optional removal are done while holding
// monlock_.
std::pair<bool,unsigned int> FedRawDataInputSource::getEventReport(unsigned int lumi, bool erase)
{
  std::lock_guard<std::mutex> guard(monlock_);

  const auto entry = sourceEventsReport_.find(lumi);
  if (entry == sourceEventsReport_.end())
    return std::pair<bool,unsigned int>(false,0);

  // Copy the count out before the entry is possibly erased.
  const unsigned int nEvents = entry->second;
  if (erase)
    sourceEventsReport_.erase(entry);
  return std::pair<bool,unsigned int>(true,nEvents);
}


// define this class as an input source
DEFINE_FWK_INPUT_SOURCE( FedRawDataInputSource);