Skip to content

Commit

Permalink
Merge pull request #6720 from smorovic/daq-fixes-72X
Browse files Browse the repository at this point in the history
Daq updates (backport of 73X #6718)
  • Loading branch information
cmsbuild committed Dec 3, 2014
2 parents e8d1129 + 83e319a commit 19d585a
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 27 deletions.
7 changes: 4 additions & 3 deletions EventFilter/Utilities/interface/EvFDaqDirector.h
Expand Up @@ -39,7 +39,7 @@ namespace evf{
{
public:

enum FileStatus { noFile, sameFile, newFile, newLumi, runEnded };
enum FileStatus { noFile, sameFile, newFile, newLumi, runEnded, runAbort };

explicit EvFDaqDirector( const edm::ParameterSet &pset, edm::ActivityRegistry& reg );
~EvFDaqDirector();
Expand Down Expand Up @@ -99,6 +99,7 @@ namespace evf{
void lockFULocal2();
void unlockFULocal2();
void createRunOpendirMaybe();
int readLastLSEntry(std::string const& file);
void setDeleteTracking( std::mutex* fileDeleteLock,std::list<std::pair<int,InputFile*>> *filesToDelete) {
fileDeleteLockPtr_=fileDeleteLock;
filesToDeletePtr_ = filesToDelete;
Expand All @@ -108,7 +109,7 @@ namespace evf{
private:
//bool bulock();
//bool fulock();
bool bumpFile(unsigned int& ls, unsigned int& index, std::string& nextFile, uint32_t& fsize);
bool bumpFile(unsigned int& ls, unsigned int& index, std::string& nextFile, uint32_t& fsize, int maxLS);
void openFULockfileStream(std::string& fuLockFilePath, bool create);
std::string inputFileNameStem(const unsigned int ls, const unsigned int index) const;
std::string outputFileNameStem(const unsigned int ls, std::string const& stream) const;
Expand Down Expand Up @@ -177,7 +178,7 @@ namespace evf{

bool readEolsDefinition_ = true;
unsigned int eolsNFilesIndex_ = 1;

std::string stopFilePath_;
};
}

Expand Down
5 changes: 5 additions & 0 deletions EventFilter/Utilities/interface/FastMonitoringService.h
Expand Up @@ -137,6 +137,9 @@ namespace evf{
void postSourceEvent(edm::StreamID);
void preModuleEvent(edm::StreamContext const&, edm::ModuleCallingContext const&);
void postModuleEvent(edm::StreamContext const&, edm::ModuleCallingContext const&);
void preStreamEarlyTermination(edm::StreamContext const&, edm::TerminationOrigin);
void preGlobalEarlyTermination(edm::GlobalContext const&, edm::TerminationOrigin);
void preSourceEarlyTermination(edm::TerminationOrigin);

//this is still needed for use in special functions like DQM which are in turn framework services
void setMicroState(MicroStateService::Microstate);
Expand Down Expand Up @@ -249,6 +252,8 @@ namespace evf{
bool pathLegendWritten_ = false;

std::atomic<bool> monInit_;
bool exception_detected_ = false;
std::vector<unsigned int> exceptionInLS_;
};

}
Expand Down
39 changes: 21 additions & 18 deletions EventFilter/Utilities/plugins/FedRawDataInputSource.cc
Expand Up @@ -322,7 +322,10 @@ inline evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent()
currentFile_=nullptr;
return status;
}

else if ( status == evf::EvFDaqDirector::runAbort)
{
throw cms::Exception("FedRawDataInputSource::getNextEvent") << "Run has been aborted by the input source reader thread";
}
else if (status == evf::EvFDaqDirector::newLumi)
{
if (getLSFromFilename_) {
Expand Down Expand Up @@ -371,10 +374,10 @@ inline evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent()
freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_]);
if (currentFile_->nEvents_!=currentFile_->nProcessed_)
{
throw cms::Exception("RuntimeError")
<< "Fully processed " << currentFile_->nProcessed_
<< " from the file " << currentFile_->fileName_
<< " but according to BU JSON there should be "
throw cms::Exception("FedRawDataInputSource::getNextEvent")
<< "Fully processed " << currentFile_->nProcessed_
<< " from the file " << currentFile_->fileName_
<< " but according to BU JSON there should be "
<< currentFile_->nEvents_ << " events";
}
//try to wake up supervisor thread which might be sleeping waiting for the free chunk
Expand All @@ -401,7 +404,7 @@ inline evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent()
//file is too short
if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < headerSize[detectedFRDversion_])
{
throw cms::Exception("FedRawDataInputSource::cacheNextEvent") <<
throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
"Premature end of input file while reading event header";
}
if (singleBufferMode_) {
Expand Down Expand Up @@ -429,14 +432,14 @@ inline evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent()
dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
if ( bufferInputRead_ < headerSize[detectedFRDversion_])
{
throw cms::Exception("FedRawDataInputSource::cacheNextEvent") <<
throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
"Premature end of input file while reading event header";
}
}

event_.reset( new FRDEventMsgView(dataPosition) );
if (event_->size()>eventChunkSize_) {
throw cms::Exception("FedRawDataInputSource::nextEvent")
throw cms::Exception("FedRawDataInputSource::getNextEvent")
<< " event id:"<< event_->event()<< " lumi:" << event_->lumi()
<< " run:" << event_->run() << " of size:" << event_->size()
<< " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
Expand All @@ -446,7 +449,7 @@ inline evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent()

if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < msgSize)
{
throw cms::Exception("FedRawDataInputSource::nextEvent") <<
throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
"Premature end of input file while reading event data";
}
if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
Expand Down Expand Up @@ -478,7 +481,7 @@ inline evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent()

event_.reset( new FRDEventMsgView(dataPosition) );
if (event_->size()>eventChunkSize_) {
throw cms::Exception("FedRawDataInputSource::nextEvent")
throw cms::Exception("FedRawDataInputSource::getNextEvent")
<< " event id:"<< event_->event()<< " lumi:" << event_->lumi()
<< " run:" << event_->run() << " of size:" << event_->size()
<< " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
Expand All @@ -488,7 +491,7 @@ inline evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent()

if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < msgSize)
{
throw cms::Exception("FedRawDataInputSource::nextEvent") <<
throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
"Premature end of input file while reading event data";
}

Expand Down Expand Up @@ -524,7 +527,7 @@ inline evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent()
adler = adler32(adler,(Bytef*)event_->payload(),event_->eventSize());

if ( adler != event_->adler32() ) {
throw cms::Exception("FedRawDataInputSource::nextEvent") <<
throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
"Found a wrong Adler32 checksum: expected 0x" << std::hex << event_->adler32() <<
" but calculated 0x" << adler;
}
Expand Down Expand Up @@ -768,19 +771,17 @@ int FedRawDataInputSource::grabNextJsonFile(boost::filesystem::path const& jsonS
{
// Input dir gone?
daqDirector_->unlockFULocal();
edm::LogError("FedRawDataInputSource") << "grabNextFile - BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
<< " - Maybe the BU run dir disappeared? Ending process with code 0...";
_exit(-1);
edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
}
catch (std::runtime_error e)
{
// Another process grabbed the file and NFS did not register this
daqDirector_->unlockFULocal();
edm::LogError("FedRawDataInputSource") << "grabNextFile - runtime Exception -: " << e.what();
edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - runtime Exception -: " << e.what();
}

catch( boost::bad_lexical_cast const& ) {
edm::LogError("FedRawDataInputSource") << "grabNextFile - error parsing number of events from BU JSON. "
edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - error parsing number of events from BU JSON. "
<< "Input value is -: " << data;
}

Expand Down Expand Up @@ -883,7 +884,9 @@ void FedRawDataInputSource::readSupervisor()

if( getLSFromFilename_ && currentLumiSection>0 && ls < currentLumiSection) {
edm::LogError("FedRawDataInputSource") << "Got old LS ("<<ls<<") file from EvFDAQDirector! Expected LS:" << currentLumiSection<<". Aborting execution."<<std::endl;
_exit(-1);
fileQueue_.push(new InputFile(evf::EvFDaqDirector::runAbort, 0));
stop=true;
break;
}

int dbgcount=0;
Expand Down
3 changes: 2 additions & 1 deletion EventFilter/Utilities/src/AuxiliaryMakers.cc
Expand Up @@ -23,14 +23,15 @@ namespace evf{
time = stv.tv_sec;
time = (time << 32) + stv.tv_usec;
}
int64_t orbitnr = (((uint64_t)record->getHeader().getData().header.orbitHigh) << 16) + record->getHeader().getData().header.orbitLow;
return edm::EventAuxiliary(eventId,
processGUID,
edm::Timestamp(time),
true,
(edm::EventAuxiliary::ExperimentType)(FED_EVTY_EXTRACT(record->getFEDHeader().getData().header.eventid)),
(int)record->getHeader().getData().header.bcid,
edm::EventAuxiliary::invalidStoreNumber,
record->getHeader().getData().header.orbitLow);
// Keep only the low 31 bits: the mask must be 0x7fffffff (0xefffffff would clear bit 28 and leave bit 31 set, so the cast to int could still go negative)
(int)(orbitnr&0x7fffffffU));//framework supports only 32-bit signed
}
}
}
39 changes: 36 additions & 3 deletions EventFilter/Utilities/src/EvFDaqDirector.cc
Expand Up @@ -196,6 +196,7 @@ namespace evf {

pthread_mutex_init(&init_lock_,NULL);

stopFilePath_ = run_dir_+"/CMSSW_STOP";
}

EvFDaqDirector::~EvFDaqDirector()
Expand Down Expand Up @@ -390,6 +391,14 @@ namespace evf {
int retval = -1;
int lock_attempts = 0;

struct stat buf;
int stopFileLS = -1;
if (stat(stopFilePath_.c_str(),&buf)==0) {
stopFileLS = readLastLSEntry(stopFilePath_);
edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: " << stopFileLS;
//return runEnded;
}

while (retval==-1) {
retval = fcntl(fu_readwritelock_fd_, F_SETLK, &fu_rw_flk);
if (retval==-1) usleep(50000);
Expand All @@ -402,7 +411,6 @@ namespace evf {
else
edm::LogWarning("EvFDaqDirector") << "Unable to obtain a lock for 5 seconds. Checking if run directory and fu.lock file are present -: errno "<< errno <<":"<< strerror(errno) << std::endl;

struct stat buf;
if (stat(bu_run_dir_.c_str(), &buf)!=0) return runEnded;
if (stat((bu_run_dir_+"/fu.lock").c_str(), &buf)!=0) return runEnded;
lock_attempts=0;
Expand Down Expand Up @@ -433,7 +441,7 @@ namespace evf {
}

// try to bump
bool bumpedOk = bumpFile(readLs, readIndex, nextFile, fsize);
bool bumpedOk = bumpFile(readLs, readIndex, nextFile, fsize, stopFileLS);
ls = readLs;
// there is a new file to grab or lumisection ended
if (bumpedOk) {
Expand Down Expand Up @@ -505,6 +513,10 @@ namespace evf {
//edm::LogInfo("EvFDaqDirector") << " looking for EoR file: " << getEoRFilePath().c_str();
if ( stat(getEoRFilePath().c_str(), &buf) == 0 || stat(bu_run_dir_.c_str(), &buf)!=0)
fileStatus = runEnded;
if (stopFileLS>=0 && (int)ls > stopFileLS) {
edm::LogInfo("EvFDaqDirector") << "Reached maximum lumisection set by hltd";
fileStatus = runEnded;
}
}
return fileStatus;
}
Expand Down Expand Up @@ -570,7 +582,7 @@ namespace evf {
return boost::lexical_cast<int>(data);
}

bool EvFDaqDirector::bumpFile(unsigned int& ls, unsigned int& index, std::string& nextFile, uint32_t& fsize) {
bool EvFDaqDirector::bumpFile(unsigned int& ls, unsigned int& index, std::string& nextFile, uint32_t& fsize, int maxLS) {

if (previousFileSize_ != 0) {
if (!fms_) {
Expand Down Expand Up @@ -627,6 +639,10 @@ namespace evf {
// this lumi ended, check for files
++ls;
index = 0;

//reached limit
if (maxLS>=0 && ls > (unsigned int)maxLS) return false;

nextFile = getInputJsonFilePath(ls,0);
if (stat(nextFile.c_str(), &buf) == 0) {
// a new file was found at new lumisection, index 0
Expand Down Expand Up @@ -751,4 +767,21 @@ namespace evf {
}
}


int EvFDaqDirector::readLastLSEntry(std::string const& file) {

boost::filesystem::ifstream ij(file);
Json::Value deserializeRoot;
Json::Reader reader;

if (!reader.parse(ij, deserializeRoot)) {
edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << file;
return -1;
}

int ret = deserializeRoot.get("lastLS","").asInt();
return ret;

}

}
48 changes: 46 additions & 2 deletions EventFilter/Utilities/src/FastMonitoringService.cc
Expand Up @@ -69,6 +69,9 @@ namespace evf{
reg.watchPreModuleEvent(this,&FastMonitoringService::preModuleEvent);//should be stream
reg.watchPostModuleEvent(this,&FastMonitoringService::postModuleEvent);//

reg.watchPreStreamEarlyTermination(this,&FastMonitoringService::preStreamEarlyTermination);
reg.watchPreGlobalEarlyTermination(this,&FastMonitoringService::preGlobalEarlyTermination);
reg.watchPreSourceEarlyTermination(this,&FastMonitoringService::preSourceEarlyTermination);
}


Expand Down Expand Up @@ -206,6 +209,43 @@ namespace evf{

}

// Callback for early termination of a stream: logs the stream, event ID and
// lumisection together with the termination origin, then records the
// lumisection in exceptionInLS_ while holding the monitoring lock.
void FastMonitoringService::preStreamEarlyTermination(edm::StreamContext const& sc, edm::TerminationOrigin to)
{
  // Label describing where the termination came from; stays empty for any other origin.
  std::string originLabel;
  switch (to) {
    case edm::TerminationOrigin::ExceptionFromThisContext:
      originLabel = " FromThisContext ";
      break;
    case edm::TerminationOrigin::ExceptionFromAnotherContext:
      originLabel = " FromAnotherContext";
      break;
    case edm::TerminationOrigin::ExternalSignal:
      originLabel = " FromExternalSignal";
      break;
    default:
      break;
  }

  const auto lumi = sc.eventID().luminosityBlock();
  edm::LogInfo("FastMonitoringService") << " STREAM " << sc.streamID().value()
                                        << " earlyTermination -: ID:" << sc.eventID()
                                        << " LS:" << lumi << " " << originLabel;

  // Only the affected lumisection is recorded; exception_detected_ is not set here.
  std::lock_guard<std::mutex> guard(fmt_.monlock_);
  exceptionInLS_.push_back(lumi);
}

// Callback for early termination in a global context: logs the lumisection
// together with the termination origin, then records the lumisection in
// exceptionInLS_ while holding the monitoring lock.
void FastMonitoringService::preGlobalEarlyTermination(edm::GlobalContext const& gc, edm::TerminationOrigin to)
{
  // Label describing where the termination came from; stays empty for any other origin.
  std::string originLabel;
  switch (to) {
    case edm::TerminationOrigin::ExceptionFromThisContext:
      originLabel = " FromThisContext ";
      break;
    case edm::TerminationOrigin::ExceptionFromAnotherContext:
      originLabel = " FromAnotherContext";
      break;
    case edm::TerminationOrigin::ExternalSignal:
      originLabel = " FromExternalSignal";
      break;
    default:
      break;
  }

  const auto lumi = gc.luminosityBlockID().luminosityBlock();
  edm::LogInfo("FastMonitoringService") << " GLOBAL " << "earlyTermination -: LS:"
                                        << lumi << " " << originLabel;

  // Only the affected lumisection is recorded; exception_detected_ is not set here.
  std::lock_guard<std::mutex> guard(fmt_.monlock_);
  exceptionInLS_.push_back(lumi);
}

// Callback for early termination of the source: logs the termination origin
// and sets exception_detected_ while holding the monitoring lock.
void FastMonitoringService::preSourceEarlyTermination(edm::TerminationOrigin to)
{
  // Label describing where the termination came from; stays empty for any other origin.
  std::string originLabel;
  switch (to) {
    case edm::TerminationOrigin::ExceptionFromThisContext:
      originLabel = " FromThisContext ";
      break;
    case edm::TerminationOrigin::ExceptionFromAnotherContext:
      originLabel = " FromAnotherContext";
      break;
    case edm::TerminationOrigin::ExternalSignal:
      originLabel = " FromExternalSignal";
      break;
    default:
      break;
  }

  edm::LogInfo("FastMonitoringService") << " SOURCE " << "earlyTermination -: " << originLabel;

  std::lock_guard<std::mutex> guard(fmt_.monlock_);
  exception_detected_ = true;
}

void FastMonitoringService::jobFailure()
{
macrostate_ = FastMonitoringThread::sError;
Expand Down Expand Up @@ -312,8 +352,12 @@ namespace evf{
{
auto itr = sourceEventsReport_.find(lumi);
if (itr==sourceEventsReport_.end()) {
//do not throw exception in case of signal termination
if (edm::shutdown_flag) {
//check if exception has been thrown (in case of Global/Stream early termination, for this LS)
bool exception_detected = exception_detected_;
for (auto ex : exceptionInLS_)
if (lumi == ex) exception_detected=true;

if (edm::shutdown_flag || exception_detected) {
edm::LogInfo("FastMonitoringService") << "Run interrupted. Skip writing EoL information -: "
<< processedEventsPerLumi_[lumi] << " events were processed in LUMI " << lumi;
//this will prevent output modules from producing json file for possibly incomplete lumi
Expand Down

0 comments on commit 19d585a

Please sign in to comment.