Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

daq updates #6718

Merged
merged 10 commits into from Dec 3, 2014
7 changes: 4 additions & 3 deletions EventFilter/Utilities/interface/EvFDaqDirector.h
Expand Up @@ -39,7 +39,7 @@ namespace evf{
{
public:

enum FileStatus { noFile, sameFile, newFile, newLumi, runEnded };
enum FileStatus { noFile, sameFile, newFile, newLumi, runEnded, runAbort };

explicit EvFDaqDirector( const edm::ParameterSet &pset, edm::ActivityRegistry& reg );
~EvFDaqDirector();
Expand Down Expand Up @@ -99,6 +99,7 @@ namespace evf{
void lockFULocal2();
void unlockFULocal2();
void createRunOpendirMaybe();
int readLastLSEntry(std::string const& file);
void setDeleteTracking( std::mutex* fileDeleteLock,std::list<std::pair<int,InputFile*>> *filesToDelete) {
fileDeleteLockPtr_=fileDeleteLock;
filesToDeletePtr_ = filesToDelete;
Expand All @@ -108,7 +109,7 @@ namespace evf{
private:
//bool bulock();
//bool fulock();
bool bumpFile(unsigned int& ls, unsigned int& index, std::string& nextFile, uint32_t& fsize);
bool bumpFile(unsigned int& ls, unsigned int& index, std::string& nextFile, uint32_t& fsize, int maxLS);
void openFULockfileStream(std::string& fuLockFilePath, bool create);
std::string inputFileNameStem(const unsigned int ls, const unsigned int index) const;
std::string outputFileNameStem(const unsigned int ls, std::string const& stream) const;
Expand Down Expand Up @@ -177,7 +178,7 @@ namespace evf{

bool readEolsDefinition_ = true;
unsigned int eolsNFilesIndex_ = 1;

std::string stopFilePath_;
};
}

Expand Down
5 changes: 5 additions & 0 deletions EventFilter/Utilities/interface/FastMonitoringService.h
Expand Up @@ -137,6 +137,9 @@ namespace evf{
void postSourceEvent(edm::StreamID);
void preModuleEvent(edm::StreamContext const&, edm::ModuleCallingContext const&);
void postModuleEvent(edm::StreamContext const&, edm::ModuleCallingContext const&);
void preStreamEarlyTermination(edm::StreamContext const&, edm::TerminationOrigin);
void preGlobalEarlyTermination(edm::GlobalContext const&, edm::TerminationOrigin);
void preSourceEarlyTermination(edm::TerminationOrigin);

//this is still needed for use in special functions like DQM which are in turn framework services
void setMicroState(MicroStateService::Microstate);
Expand Down Expand Up @@ -249,6 +252,8 @@ namespace evf{
bool pathLegendWritten_ = false;

std::atomic<bool> monInit_;
bool exception_detected_ = false;
std::vector<unsigned int> exceptionInLS_;
};

}
Expand Down
73 changes: 38 additions & 35 deletions EventFilter/Utilities/plugins/FedRawDataInputSource.cc
Expand Up @@ -81,7 +81,7 @@ FedRawDataInputSource::FedRawDataInputSource(edm::ParameterSet const& pset,
edm::LogInfo("FedRawDataInputSource") << "Test mode is ON!";

processHistoryID_ = daqProvenanceHelper_.daqInit(productRegistryUpdate(), processHistoryRegistryForUpdate());
setNewRun();
setNewRun();
setRunAuxiliary(new edm::RunAuxiliary(runNumber_, edm::Timestamp::beginOfTime(),
edm::Timestamp::invalidTimestamp()));

Expand All @@ -98,7 +98,7 @@ FedRawDataInputSource::FedRawDataInputSource(edm::ParameterSet const& pset,
if (!numBuffers_)
throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") <<
"no reading enabled with numBuffers parameter 0";

numConcurrentReads_=numBuffers_-1;
singleBufferMode_ = !(numBuffers_>1);

Expand Down Expand Up @@ -298,14 +298,14 @@ inline evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent()
{
const size_t headerSize[4] = {0,2*sizeof(uint32),(4 + 1024) * sizeof(uint32),7*sizeof(uint32)}; //size per version of FRDEventHeader

if (setExceptionState_) threadError();
if (setExceptionState_) threadError();
if (!currentFile_)
{
if (!streamFileTrackerPtr_) {
streamFileTrackerPtr_ = daqDirector_->getStreamFileTracker();
nStreams_ = streamFileTrackerPtr_->size();
if (nStreams_>10) checkEvery_=nStreams_;
}
}

evf::EvFDaqDirector::FileStatus status = evf::EvFDaqDirector::noFile;
if (!fileQueue_.try_pop(currentFile_))
Expand All @@ -322,8 +322,11 @@ inline evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent()
currentFile_=nullptr;
return status;
}

else if (status == evf::EvFDaqDirector::newLumi)
else if ( status == evf::EvFDaqDirector::runAbort)
{
throw cms::Exception("FedRawDataInputSource::getNextEvent") << "Run has been aborted by the input source reader thread";
}
else if (status == evf::EvFDaqDirector::newLumi)
{
if (getLSFromFilename_) {
if (currentFile_->lumi_ > currentLumiSection_) {
Expand Down Expand Up @@ -371,7 +374,7 @@ inline evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent()
freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_]);
if (currentFile_->nEvents_!=currentFile_->nProcessed_)
{
throw cms::Exception("RuntimeError")
throw cms::Exception("FedRawDataInputSource::getNextEvent")
<< "Fully processed " << currentFile_->nProcessed_
<< " from the file " << currentFile_->fileName_
<< " but according to BU JSON there should be "
Expand Down Expand Up @@ -401,7 +404,7 @@ inline evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent()
//file is too short
if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < headerSize[detectedFRDversion_])
{
throw cms::Exception("FedRawDataInputSource::cacheNextEvent") <<
throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
"Premature end of input file while reading event header";
}
if (singleBufferMode_) {
Expand All @@ -415,7 +418,7 @@ inline evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent()
unsigned char *dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;

//conditions when read amount is not sufficient for the header to fit
if (!bufferInputRead_ || bufferInputRead_ < headerSize[detectedFRDversion_]
if (!bufferInputRead_ || bufferInputRead_ < headerSize[detectedFRDversion_]
|| eventChunkSize_ - currentFile_->chunkPosition_ < headerSize[detectedFRDversion_])
{
readNextChunkIntoBuffer(currentFile_);
Expand All @@ -429,24 +432,24 @@ inline evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent()
dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
if ( bufferInputRead_ < headerSize[detectedFRDversion_])
{
throw cms::Exception("FedRawDataInputSource::cacheNextEvent") <<
throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
"Premature end of input file while reading event header";
}
}

event_.reset( new FRDEventMsgView(dataPosition) );
if (event_->size()>eventChunkSize_) {
throw cms::Exception("FedRawDataInputSource::nextEvent")
throw cms::Exception("FedRawDataInputSource::getNextEvent")
<< " event id:"<< event_->event()<< " lumi:" << event_->lumi()
<< " run:" << event_->run() << " of size:" << event_->size()
<< " run:" << event_->run() << " of size:" << event_->size()
<< " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
}

const uint32_t msgSize = event_->size()-headerSize[detectedFRDversion_];

if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < msgSize)
{
throw cms::Exception("FedRawDataInputSource::nextEvent") <<
throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
"Premature end of input file while reading event data";
}
if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
Expand All @@ -466,7 +469,7 @@ inline evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent()
//wait for the current chunk to become added to the vector
while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
usleep(10000);
if (setExceptionState_) threadError();
if (setExceptionState_) threadError();
}

//check if header is at the boundary of two chunks
Expand All @@ -478,17 +481,17 @@ inline evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent()

event_.reset( new FRDEventMsgView(dataPosition) );
if (event_->size()>eventChunkSize_) {
throw cms::Exception("FedRawDataInputSource::nextEvent")
throw cms::Exception("FedRawDataInputSource::getNextEvent")
<< " event id:"<< event_->event()<< " lumi:" << event_->lumi()
<< " run:" << event_->run() << " of size:" << event_->size()
<< " run:" << event_->run() << " of size:" << event_->size()
<< " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
}

const uint32_t msgSize = event_->size()-headerSize[detectedFRDversion_];

if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < msgSize)
{
throw cms::Exception("FedRawDataInputSource::nextEvent") <<
throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
"Premature end of input file while reading event data";
}

Expand Down Expand Up @@ -524,7 +527,7 @@ inline evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent()
adler = adler32(adler,(Bytef*)event_->payload(),event_->eventSize());

if ( adler != event_->adler32() ) {
throw cms::Exception("FedRawDataInputSource::nextEvent") <<
throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
"Found a wrong Adler32 checksum: expected 0x" << std::hex << event_->adler32() <<
" but calculated 0x" << adler;
}
Expand Down Expand Up @@ -574,7 +577,7 @@ void FedRawDataInputSource::read(edm::EventPrincipal& eventPrincipal)
edm::Timestamp tstamp = fillFEDRawDataCollection(*rawData);

if (useL1EventID_){
eventID_ = edm::EventID(eventRunNumber_, currentLumiSection_, L1EventID_);
eventID_ = edm::EventID(eventRunNumber_, currentLumiSection_, L1EventID_);
edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
edm::EventAuxiliary::PhysicsTrigger);
aux.setProcessHistoryID(processHistoryID_);
Expand All @@ -590,21 +593,21 @@ void FedRawDataInputSource::read(edm::EventPrincipal& eventPrincipal)
}
else{
evf::evtn::TCDSRecord record((unsigned char *)(tcds_pointer_));
edm::EventAuxiliary aux = evf::evtn::makeEventAuxiliary(&record,
edm::EventAuxiliary aux = evf::evtn::makeEventAuxiliary(&record,
eventRunNumber_,currentLumiSection_,
processGUID());
processGUID());
aux.setProcessHistoryID(processHistoryID_);
makeEvent(eventPrincipal, aux);
}



std::unique_ptr<edm::WrapperBase> edp(new edm::Wrapper<FEDRawDataCollection>(std::move(rawData)));

//FWCore/Sources DaqProvenanceHelper before 7_1_0_pre3
//eventPrincipal.put(daqProvenanceHelper_.constBranchDescription_, edp,
// daqProvenanceHelper_.dummyProvenance_);

eventPrincipal.put(daqProvenanceHelper_.branchDescription(), std::move(edp),
daqProvenanceHelper_.dummyProvenance());

Expand Down Expand Up @@ -768,19 +771,17 @@ int FedRawDataInputSource::grabNextJsonFile(boost::filesystem::path const& jsonS
{
// Input dir gone?
daqDirector_->unlockFULocal();
edm::LogError("FedRawDataInputSource") << "grabNextFile - BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
<< " - Maybe the BU run dir disappeared? Ending process with code 0...";
_exit(-1);
edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
}
catch (std::runtime_error e)
{
// Another process grabbed the file and NFS did not register this
daqDirector_->unlockFULocal();
edm::LogError("FedRawDataInputSource") << "grabNextFile - runtime Exception -: " << e.what();
edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - runtime Exception -: " << e.what();
}

catch( boost::bad_lexical_cast const& ) {
edm::LogError("FedRawDataInputSource") << "grabNextFile - error parsing number of events from BU JSON. "
edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - error parsing number of events from BU JSON. "
<< "Input value is -: " << data;
}

Expand Down Expand Up @@ -826,7 +827,7 @@ void FedRawDataInputSource::readSupervisor()
unsigned int currentLumiSection = 0;
//threadInit_.exchange(true,std::memory_order_acquire);

{
{
std::unique_lock<std::mutex> lk(startupLock_);
startupCv_.notify_one();
}
Expand Down Expand Up @@ -883,7 +884,9 @@ void FedRawDataInputSource::readSupervisor()

if( getLSFromFilename_ && currentLumiSection>0 && ls < currentLumiSection) {
edm::LogError("FedRawDataInputSource") << "Got old LS ("<<ls<<") file from EvFDAQDirector! Expected LS:" << currentLumiSection<<". Aborting execution."<<std::endl;
_exit(-1);
fileQueue_.push(new InputFile(evf::EvFDaqDirector::runAbort, 0));
stop=true;
break;
}

int dbgcount=0;
Expand Down Expand Up @@ -1082,18 +1085,18 @@ inline bool InputFile::advance(unsigned char* & dataPosition, const size_t size)
//wait for chunk
while (!waitForChunk(currentChunk_)) {
usleep(100000);
if (parent_->exceptionState()) parent_->threadError();
if (parent_->exceptionState()) parent_->threadError();
}

dataPosition = chunks_[currentChunk_]->buf_+ chunkPosition_;
size_t currentLeft = chunks_[currentChunk_]->size_ - chunkPosition_;

if (currentLeft < size) {

//we need next chunk
while (!waitForChunk(currentChunk_+1)) {
usleep(100000);
if (parent_->exceptionState()) parent_->threadError();
if (parent_->exceptionState()) parent_->threadError();
}
//copy everything to beginning of the first chunk
dataPosition-=chunkPosition_;
Expand Down Expand Up @@ -1136,7 +1139,7 @@ void FedRawDataInputSource::readNextChunkIntoBuffer(InputFile *file)
fileDescriptor_ = open(file->fileName_.c_str(), O_RDONLY);
bufferInputRead_ = 0;
//off_t pos = lseek(fileDescriptor,0,SEEK_SET);
if (fileDescriptor_>=0)
if (fileDescriptor_>=0)
LogDebug("FedRawDataInputSource") << "opened file -: " << std::endl << file->fileName_;
else
{
Expand Down Expand Up @@ -1173,7 +1176,7 @@ void FedRawDataInputSource::readNextChunkIntoBuffer(InputFile *file)
}
file->chunkPosition_=0;//data was moved to beginning of the chunk
}
if (bufferInputRead_ == file->fileSize_) { // no more data in this file
if (bufferInputRead_ == file->fileSize_) { // no more data in this file
if (fileDescriptor_!=-1)
{
LogDebug("FedRawDataInputSource") << "Closing input file -: " << std::endl << file->fileName_;
Expand Down
7 changes: 4 additions & 3 deletions EventFilter/Utilities/src/AuxiliaryMakers.cc
Expand Up @@ -5,15 +5,15 @@
namespace evf{
namespace evtn{

edm::EventAuxiliary makeEventAuxiliary(TCDSRecord *record,
edm::EventAuxiliary makeEventAuxiliary(TCDSRecord *record,
unsigned int runNumber,
unsigned int lumiSection,
std::string const &processGUID){
edm::EventID eventId(runNumber, // check that runnumber from record is consistent
//record->getHeader().getData().header.lumiSection,//+1
lumiSection,
record->getHeader().getData().header.eventNumber);

uint64_t gpsh = record->getBST().getBST().gpstimehigh;
uint32_t gpsl = record->getBST().getBST().gpstimelow;
edm::TimeValue_t time = static_cast<edm::TimeValue_t> ((gpsh << 32) + gpsl);
Expand All @@ -23,14 +23,15 @@ namespace evf{
time = stv.tv_sec;
time = (time << 32) + stv.tv_usec;
}
int64_t orbitnr = (((uint64_t)record->getHeader().getData().header.orbitHigh) << 16) + record->getHeader().getData().header.orbitLow;
return edm::EventAuxiliary(eventId,
processGUID,
edm::Timestamp(time),
true,
(edm::EventAuxiliary::ExperimentType)(FED_EVTY_EXTRACT(record->getFEDHeader().getData().header.eventid)),
(int)record->getHeader().getData().header.bcid,
edm::EventAuxiliary::invalidStoreNumber,
record->getHeader().getData().header.orbitLow);
(int)(orbitnr&0xefffffffU));//framework supports only 32-bit signed
}
}
}