Skip to content

Commit

Permalink
Merge pull request #9385 from smorovic/filelocking-75X
Browse files Browse the repository at this point in the history
Online file-locking improvements (75X)
  • Loading branch information
cmsbuild committed Jun 2, 2015
2 parents c73a0af + 34b38b6 commit e2c5480
Show file tree
Hide file tree
Showing 10 changed files with 120 additions and 24 deletions.
3 changes: 2 additions & 1 deletion EventFilter/Utilities/interface/EvFDaqDirector.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ namespace evf{
void removeFile(unsigned int ls, unsigned int index);
void removeFile(std::string );

FileStatus updateFuLock(unsigned int& ls, std::string& nextFile, uint32_t& fsize);
FileStatus updateFuLock(unsigned int& ls, std::string& nextFile, uint32_t& fsize, uint64_t& lockWaitTime);
void tryInitializeFuLockFile();
unsigned int getRunNumber() const { return run_; }
unsigned int getJumpLS() const { return jumpLS_; }
Expand Down Expand Up @@ -136,6 +136,7 @@ namespace evf{
bool requireTSPSet_;
std::string selectedTransferMode_;
std::string hltSourceDirectory_;
unsigned int fuLockPollInterval_;

std::string hostname_;
std::string run_string_;
Expand Down
3 changes: 3 additions & 0 deletions EventFilter/Utilities/interface/FastMonitoringService.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ namespace evf{
void preStreamEarlyTermination(edm::StreamContext const&, edm::TerminationOrigin);
void preGlobalEarlyTermination(edm::GlobalContext const&, edm::TerminationOrigin);
void preSourceEarlyTermination(edm::TerminationOrigin);
void setExceptionDetected(unsigned int ls);

//this is still needed for use in special functions like DQM which are in turn framework services
void setMicroState(MicroStateService::Microstate);
Expand All @@ -149,6 +150,7 @@ namespace evf{
void accumulateFileSize(unsigned int lumi, unsigned long fileSize);
void startedLookingForFile();
void stoppedLookingForFile(unsigned int lumi);
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount);
unsigned int getEventsProcessedForLumi(unsigned int lumi);
std::string getRunDirName() const { return runDirectory_.stem().string(); }

Expand Down Expand Up @@ -226,6 +228,7 @@ namespace evf{
//helpers for source statistics:
std::map<unsigned int, unsigned long> accuSize_;
std::vector<double> leadTimes_;
std::map<unsigned int, std::pair<double,unsigned int>> lockStatsDuringLumi_;

//for output module
std::map<unsigned int, unsigned int> processedEventsPerLumi_;
Expand Down
8 changes: 8 additions & 0 deletions EventFilter/Utilities/interface/FastMonitoringThread.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ namespace evf{
DoubleJ fastThroughputJ_;
DoubleJ fastAvgLeadTimeJ_;
IntJ fastFilesProcessedJ_;
DoubleJ fastLockWaitJ_;
IntJ fastLockCountJ_;

unsigned int varIndexThrougput_;

Expand All @@ -52,10 +54,14 @@ namespace evf{
fastThroughputJ_ = 0;
fastAvgLeadTimeJ_ = 0;
fastFilesProcessedJ_ = 0;
fastLockWaitJ_ = 0;
fastLockCountJ_ = 0;
fastMacrostateJ_.setName("Macrostate");
fastThroughputJ_.setName("Throughput");
fastAvgLeadTimeJ_.setName("AverageLeadTime");
fastFilesProcessedJ_.setName("FilesProcessed");
fastLockWaitJ_.setName("LockWaitUs");
fastLockCountJ_.setName("LockCount");

fastPathProcessedJ_ = 0;
fastPathProcessedJ_.setName("Processed");
Expand All @@ -68,6 +74,8 @@ namespace evf{
fm->registerGlobalMonitorable(&fastThroughputJ_,false);
fm->registerGlobalMonitorable(&fastAvgLeadTimeJ_,false);
fm->registerGlobalMonitorable(&fastFilesProcessedJ_,false);
fm->registerGlobalMonitorable(&fastLockWaitJ_,false);
fm->registerGlobalMonitorable(&fastLockCountJ_,false);

for (unsigned int i=0;i<nStreams;i++) {
AtomicMonUInt * p = new AtomicMonUInt;
Expand Down
25 changes: 22 additions & 3 deletions EventFilter/Utilities/plugins/FedRawDataInputSource.cc
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,7 @@ inline evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent()
uint32_t crc=0;
crc = crc32c(crc,(const unsigned char*)event_->payload(),event_->eventSize());
if ( crc != event_->crc32c() ) {
if (fms_) fms_->setExceptionDetected(currentLumiSection_);
throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
"Found a wrong crc32c checksum: expected 0x" << std::hex << event_->crc32c() <<
" but calculated 0x" << crc;
Expand All @@ -556,6 +557,7 @@ inline evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent()
adler = adler32(adler,(Bytef*)event_->payload(),event_->eventSize());

if ( adler != event_->adler32() ) {
if (fms_) fms_->setExceptionDetected(currentLumiSection_);
throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
"Found a wrong Adler32 checksum: expected 0x" << std::hex << event_->adler32() <<
" but calculated 0x" << adler;
Expand Down Expand Up @@ -895,6 +897,10 @@ void FedRawDataInputSource::readSupervisor()
uint32_t ls;
uint32_t fileSize;

uint32_t monLS=1;
uint32_t lockCount=0;
uint64_t sumLockWaitTimeUs=0.;

if (fms_) fms_->startedLookingForFile();

evf::EvFDaqDirector::FileStatus status = evf::EvFDaqDirector::noFile;
Expand All @@ -904,14 +910,27 @@ void FedRawDataInputSource::readSupervisor()
stop=true;
break;
}

status = daqDirector_->updateFuLock(ls,nextFile,fileSize);

uint64_t thisLockWaitTimeUs=0.;
status = daqDirector_->updateFuLock(ls,nextFile,fileSize,thisLockWaitTimeUs);

//monitoring of lock wait time
if (thisLockWaitTimeUs>0.)
sumLockWaitTimeUs+=thisLockWaitTimeUs;
lockCount++;
if (ls>monLS) {
monLS=ls;
if (lockCount)
if (fms_) fms_->reportLockWait(monLS,sumLockWaitTimeUs,lockCount);
lockCount=0;
sumLockWaitTimeUs=0;
}

//check again for any remaining index/EoLS files after EoR file is seen
if ( status == evf::EvFDaqDirector::runEnded) {
usleep(100000);
//now all files should have appeared in ramdisk, check again if any raw files were left behind
status = daqDirector_->updateFuLock(ls,nextFile,fileSize);
status = daqDirector_->updateFuLock(ls,nextFile,fileSize,thisLockWaitTimeUs);
}

if ( status == evf::EvFDaqDirector::runEnded) {
Expand Down
9 changes: 9 additions & 0 deletions EventFilter/Utilities/plugins/microstatedef.jsd
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,15 @@
{
"name" : "FilesProcessed",
"operation" : "sum"
},
{
"name" : "LockWaitUs",
"operation" : "sum"
},
{
"name" : "LockCount",
"operation" : "sum"
}

]
}
9 changes: 9 additions & 0 deletions EventFilter/Utilities/plugins/microstatedeffast.jsd
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,15 @@
{
"name" : "FilesProcessed",
"operation" : "sum"
},
{
"name" : "LockWaitUs",
"operation" : "sum"
},
{
"name" : "LockCount",
"operation" : "sum"
}

]
}
3 changes: 2 additions & 1 deletion EventFilter/Utilities/python/EvFDaqDirector_cfi.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
runNumber = cms.untracked.uint32(0),
outputAdler32Recheck=cms.untracked.bool(False),
requireTransfersPSet=cms.untracked.bool(False),
selectedTransferMode=cms.untracked.string("")
selectedTransferMode=cms.untracked.string(""),
fuLockPollInterval = cms.untracked.uint32(2000)
)

10 changes: 9 additions & 1 deletion EventFilter/Utilities/src/AuxiliaryMakers.cc
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <sys/time.h>

#include "EventFilter/Utilities/interface/AuxiliaryMakers.h"
#include "FWCore/MessageLogger/interface/MessageLogger.h"

namespace evf{
namespace evtn{
Expand All @@ -23,7 +24,14 @@ namespace evf{
time = stv.tv_sec;
time = (time << 32) + stv.tv_usec;
}
int64_t orbitnr = (((uint64_t)record->getHeader().getData().header.orbitHigh) << 16) + record->getHeader().getData().header.orbitLow;
uint64_t orbitnr = (((uint64_t)record->getHeader().getData().header.orbitHigh) << 16) + record->getHeader().getData().header.orbitLow;
uint32_t recordLumiSection = record->getHeader().getData().header.lumiSection;

if (recordLumiSection != lumiSection)
edm::LogWarning("AuxiliaryMakers") << "Lumisection mismatch, external : "<<lumiSection << ", record : " << recordLumiSection;
if ((orbitnr >> 18) + 1 != recordLumiSection)
edm::LogWarning("AuxiliaryMakers") << "Lumisection and orbit number mismatch, LS : " << lumiSection << ", LS from orbit: " << ((orbitnr >> 18) + 1) << ", orbit:" << orbitnr;

return edm::EventAuxiliary(eventId,
processGUID,
edm::Timestamp(time),
Expand Down
50 changes: 32 additions & 18 deletions EventFilter/Utilities/src/EvFDaqDirector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ namespace evf {
requireTSPSet_(pset.getUntrackedParameter<bool>("requireTransfersPSet",false)),
selectedTransferMode_(pset.getUntrackedParameter<std::string>("selectedTransferMode","")),
hltSourceDirectory_(pset.getUntrackedParameter<std::string>("hltSourceDirectory","")),
fuLockPollInterval_(pset.getUntrackedParameter<unsigned int>("fuLockPollInterval",2000)),
hostname_(""),
bu_readlock_fd_(-1),
bu_writelock_fd_(-1),
Expand Down Expand Up @@ -106,6 +107,16 @@ namespace evf {
char hostname[33];
gethostname(hostname,32);
hostname_ = hostname;

char * fuLockPollIntervalPtr = getenv("FFF_LOCKPOLLINTERVAL");
if (fuLockPollIntervalPtr) {
try {
fuLockPollInterval_=boost::lexical_cast<unsigned int>(std::string(fuLockPollIntervalPtr));
edm::LogInfo("Setting fu lock poll interval by environment string: ") << fuLockPollInterval_ << " us";
}
catch (...) {edm::LogWarning("Unable to parse environment string: ") << std::string(fuLockPollIntervalPtr);}
}

// check if base dir exists or create it accordingly
int retval = mkdir(base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
if (retval != 0 && errno != EEXIST) {
Expand Down Expand Up @@ -422,7 +433,7 @@ namespace evf {
removeFile(getRawFilePath(ls,index));
}

EvFDaqDirector::FileStatus EvFDaqDirector::updateFuLock(unsigned int& ls, std::string& nextFile, uint32_t& fsize) {
EvFDaqDirector::FileStatus EvFDaqDirector::updateFuLock(unsigned int& ls, std::string& nextFile, uint32_t& fsize, uint64_t& lockWaitTime) {
EvFDaqDirector::FileStatus fileStatus = noFile;

int retval = -1;
Expand All @@ -436,13 +447,16 @@ namespace evf {
//return runEnded;
}

timeval ts_lockbegin;
gettimeofday(&ts_lockbegin,0);

while (retval==-1) {
retval = fcntl(fu_readwritelock_fd_, F_SETLK, &fu_rw_flk);
if (retval==-1) usleep(50000);
if (retval==-1) usleep(fuLockPollInterval_);
else continue;

lock_attempts++;
if (lock_attempts>100 || errno==116) {
lock_attempts+=fuLockPollInterval_;
if (lock_attempts>5000000 || errno==116) {
if (errno==116)
edm::LogWarning("EvFDaqDirector") << "Stale lock file handle. Checking if run directory and fu.lock file are present" << std::endl;
else
Expand All @@ -461,6 +475,14 @@ namespace evf {
lock_attempts=0;
}
}

timeval ts_lockend;
gettimeofday(&ts_lockend,0);
long deltat = (ts_lockend.tv_usec-ts_lockbegin.tv_usec) + (ts_lockend.tv_sec-ts_lockbegin.tv_sec)*1000000;
if (deltat>0.) lockWaitTime=deltat;



if(retval!=0) return fileStatus;

#ifdef DEBUG
Expand Down Expand Up @@ -490,17 +512,10 @@ namespace evf {
ls = readLs;
// there is a new file to grab or lumisection ended
if (bumpedOk) {
// rewind and clear
check = fseek(fu_rw_lock_stream, 0, SEEK_SET);
if (check == 0) {
ftruncate(fu_readwritelock_fd_, 0);
fflush(fu_rw_lock_stream); //this should not be needed ???
} else
edm::LogError("EvFDaqDirector") << "seek on fu read/write lock for updating failed with error "
<< strerror(errno);
// write new data
check = fseek(fu_rw_lock_stream, 0, SEEK_SET);
if (check == 0) {
ftruncate(fu_readwritelock_fd_, 0);
// write next index in the file, which is the file the next process should take
if (testModeNoBuilderUnit_) {
fprintf(fu_rw_lock_stream, "%u %u %u %u", readLs,
Expand All @@ -511,9 +526,8 @@ namespace evf {
fprintf(fu_rw_lock_stream, "%u %u", readLs,
readIndex + 1);
}
fflush(fu_rw_lock_stream);
fsync(fu_readwritelock_fd_);

fflush(fu_rw_lock_stream);
fsync(fu_readwritelock_fd_);
fileStatus = newFile;

if (testModeNoBuilderUnit_)
Expand All @@ -525,8 +539,8 @@ namespace evf {
<< readIndex + 1;

} else
edm::LogError("EvFDaqDirector") << "seek on fu read/write lock for updating failed with error "
<< strerror(errno);
throw cms::Exception("EvFDaqDirector") << "seek on fu read/write lock for updating failed with error "
<< strerror(errno);
}
} else
edm::LogError("EvFDaqDirector") << "seek on fu read/write lock for reading failed with error "
Expand Down Expand Up @@ -785,7 +799,7 @@ namespace evf {

void EvFDaqDirector::lockFULocal() {
//fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_flk);
flock(fulocal_rwlock_fd_,LOCK_EX);
flock(fulocal_rwlock_fd_,LOCK_SH);
}

void EvFDaqDirector::unlockFULocal() {
Expand Down
24 changes: 24 additions & 0 deletions EventFilter/Utilities/src/FastMonitoringService.cc
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,11 @@ namespace evf{
exception_detected_=true;
}

// Records that an exception was detected.
// A non-zero lumisection number is appended to exceptionInLS_; a zero value
// sets the exception_detected_ flag instead.
void FastMonitoringService::setExceptionDetected(unsigned int ls) {
  if (ls != 0) {
    exceptionInLS_.push_back(ls);
    return;
  }
  exception_detected_ = true;
}

void FastMonitoringService::jobFailure()
{
macrostate_ = FastMonitoringThread::sError;
Expand Down Expand Up @@ -307,6 +312,7 @@ namespace evf{
avgLeadTime_.erase(oldLumi);
filesProcessedDuringLumi_.erase(oldLumi);
accuSize_.erase(oldLumi);
lockStatsDuringLumi_.erase(oldLumi);
processedEventsPerLumi_.erase(oldLumi);
}
lastGlobalLumi_= newLumi;
Expand Down Expand Up @@ -583,6 +589,13 @@ namespace evf{
}
}

// Stores the lock statistics (wait time, lock count) reported for lumisection
// `ls`, overwriting any entry already present for that lumisection.
// The update is performed while holding fmt_.monlock_.
void FastMonitoringService::reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
{
  std::lock_guard<std::mutex> guard(fmt_.monlock_);
  std::pair<double,unsigned int> stats(waitTime, lockCount);
  lockStatsDuringLumi_[ls] = stats;
}

//for the output module
unsigned int FastMonitoringService::getEventsProcessedForLumi(unsigned int lumi) {
std::lock_guard<std::mutex> lock(fmt_.monlock_);
Expand Down Expand Up @@ -615,6 +628,17 @@ namespace evf{
if (iti != filesProcessedDuringLumi_.end())
fmt_.m_data.fastFilesProcessedJ_ = iti->second;
else fmt_.m_data.fastFilesProcessedJ_=0;

auto itrd = lockStatsDuringLumi_.find(ls);
if (itrd != lockStatsDuringLumi_.end()) {
fmt_.m_data.fastLockWaitJ_ = itrd->second.first;
fmt_.m_data.fastLockCountJ_ = itrd->second.second;
}
else {
fmt_.m_data.fastLockWaitJ_=0.;
fmt_.m_data.fastLockCountJ_=0.;
}

}
else return;

Expand Down

0 comments on commit e2c5480

Please sign in to comment.