Skip to content

Commit

Permalink
Merge pull request #35221 from smorovic/dev-120X-locking-opt
Browse files Browse the repository at this point in the history
DAQ optimized usage of FU file locking (12_0_X backport)
  • Loading branch information
cmsbuild committed Sep 11, 2021
2 parents e3e9504 + 6ee7254 commit df3d5fa
Show file tree
Hide file tree
Showing 10 changed files with 100 additions and 25 deletions.
3 changes: 2 additions & 1 deletion EventFilter/Utilities/interface/AuxiliaryMakers.h
Expand Up @@ -12,7 +12,8 @@ namespace evf {
bool isRealData,
const edm::EventAuxiliary::ExperimentType&,
const std::string& processGUID,
bool verifyLumiSection);
bool verifyLumiSection,
bool suppressWarning);
}
} // namespace evf
#endif
3 changes: 2 additions & 1 deletion EventFilter/Utilities/interface/EvFDaqDirector.h
Expand Up @@ -127,7 +127,8 @@ namespace evf {
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const;
void createLumiSectionFiles(const uint32_t lumiSection,
const uint32_t currentLumiSection,
bool doCreateBoLS = true);
bool doCreateBoLS,
bool doCreateEoLS);
static int parseFRDFileHeader(std::string const& rawSourcePath,
int& rawFd,
uint16_t& rawHeaderSize,
Expand Down
2 changes: 1 addition & 1 deletion EventFilter/Utilities/interface/FedRawDataInputSource.h
Expand Up @@ -60,7 +60,7 @@ class FedRawDataInputSource : public edm::RawInputSource {
void maybeOpenNewLumiSection(const uint32_t lumiSection);
evf::EvFDaqDirector::FileStatus nextEvent();
evf::EvFDaqDirector::FileStatus getNextEvent();
edm::Timestamp fillFEDRawDataCollection(FEDRawDataCollection&);
edm::Timestamp fillFEDRawDataCollection(FEDRawDataCollection& rawData, bool& tcdsInRange);

void readSupervisor();
void readWorker(unsigned int tid);
Expand Down
33 changes: 33 additions & 0 deletions EventFilter/Utilities/plugins/DaqFakeReader.cc
Expand Up @@ -20,6 +20,8 @@
#include <cmath>
#include <sys/time.h>
#include <cstring>
#include <cstdlib>
#include <chrono>

using namespace std;
using namespace edm;
Expand All @@ -33,6 +35,7 @@ DaqFakeReader::DaqFakeReader(const edm::ParameterSet& pset)
: runNum(1),
eventNum(1),
empty_events(pset.getUntrackedParameter<bool>("emptyEvents", false)),
fillRandom_(pset.getUntrackedParameter<bool>("fillRandom", false)),
meansize(pset.getUntrackedParameter<unsigned int>("meanSize", 1024)),
width(pset.getUntrackedParameter<unsigned int>("width", 1024)),
injected_errors_per_million_events(pset.getUntrackedParameter<unsigned int>("injectErrPpm", 0)),
Expand All @@ -43,6 +46,12 @@ DaqFakeReader::DaqFakeReader(const edm::ParameterSet& pset)
if (tcdsFEDID_ < FEDNumbering::MINTCDSuTCAFEDID)
throw cms::Exception("DaqFakeReader::DaqFakeReader")
<< " TCDS FED ID lower than " << FEDNumbering::MINTCDSuTCAFEDID;
if (fillRandom_) {
//intialize random seed
auto time_count =
static_cast<long unsigned int>(std::chrono::high_resolution_clock::now().time_since_epoch().count());
srand(time_count & 0xffffffff);
}
produces<FEDRawDataCollection>();
}

Expand Down Expand Up @@ -102,6 +111,18 @@ void DaqFakeReader::fillFEDs(
// Allocate space for header+trailer+payload
feddata.resize(size + 16);

if (fillRandom_) {
//fill FED with random values
size_t size_ui = size - size % sizeof(unsigned int);
for (size_t i = 0; i < size_ui; i += sizeof(unsigned int)) {
*((unsigned int*)(feddata.data() + i)) = (unsigned int)rand();
}
//remainder
for (size_t i = size_ui; i < size; i++) {
*(feddata.data() + i) = rand() & 0xff;
}
}

// Generate header
FEDHeader::set(feddata.data(),
1, // Trigger type
Expand Down Expand Up @@ -164,3 +185,15 @@ void DaqFakeReader::beginLuminosityBlock(LuminosityBlock const& iL, EventSetup c
std::cout << "DaqFakeReader begin Lumi " << iL.luminosityBlock() << std::endl;
fakeLs_ = iL.luminosityBlock();
}

void DaqFakeReader::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
edm::ParameterSetDescription desc;
desc.setComment("Injector of generated raw FED data for DAQ testing");
desc.addUntracked<bool>("emptyEvents", false);
desc.addUntracked<bool>("fillRandom", false);
desc.addUntracked<unsigned int>("meanSize", 1024);
desc.addUntracked<unsigned int>("width", 1024);
desc.addUntracked<unsigned int>("injectErrPpm", 1024);
desc.addUntracked<unsigned int>("tcdsFEDID", 1024);
descriptions.add("DaqFakeReader", desc);
}
4 changes: 4 additions & 0 deletions EventFilter/Utilities/plugins/DaqFakeReader.h
Expand Up @@ -7,6 +7,7 @@
* \author N. Amapane - CERN
*/

#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
#include "FWCore/Framework/interface/one/EDProducer.h"
#include "FWCore/Framework/interface/Event.h"
#include "FWCore/Framework/interface/EventSetup.h"
Expand All @@ -33,6 +34,8 @@ class DaqFakeReader : public edm::one::EDProducer<> {

void produce(edm::Event&, edm::EventSetup const&) override;

static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);

private:
//
// private member functions
Expand All @@ -48,6 +51,7 @@ class DaqFakeReader : public edm::one::EDProducer<> {
edm::RunNumber_t runNum;
edm::EventNumber_t eventNum;
bool empty_events;
bool fillRandom_;
unsigned int meansize; // in bytes
unsigned int width;
unsigned int injected_errors_per_million_events;
Expand Down
5 changes: 3 additions & 2 deletions EventFilter/Utilities/src/AuxiliaryMakers.cc
Expand Up @@ -12,7 +12,8 @@ namespace evf {
bool isRealData,
const edm::EventAuxiliary::ExperimentType& eventType,
const std::string& processGUID,
bool verifyLumiSection) {
bool verifyLumiSection,
bool suppressWarning) {
edm::EventID eventId(runNumber, // check that runnumber from record is consistent
lumiSection,
tcds->header.eventNumber);
Expand All @@ -29,7 +30,7 @@ namespace evf {
const uint64_t orbitnr = ((uint64_t)tcds->header.orbitHigh << 16) | tcds->header.orbitLow;
const uint32_t recordLumiSection = tcds->header.lumiSection;

if (isRealData) {
if (isRealData && !suppressWarning) {
//warnings are disabled for generated data
if (verifyLumiSection && recordLumiSection != lumiSection)
edm::LogWarning("AuxiliaryMakers")
Expand Down
58 changes: 43 additions & 15 deletions EventFilter/Utilities/src/EvFDaqDirector.cc
Expand Up @@ -942,14 +942,18 @@ namespace evf {

void EvFDaqDirector::createLumiSectionFiles(const uint32_t lumiSection,
const uint32_t currentLumiSection,
bool doCreateBoLS) {
bool doCreateBoLS,
bool doCreateEoLS) {
if (currentLumiSection > 0) {
const std::string fuEoLS = getEoLSFilePathOnFU(currentLumiSection);
struct stat buf;
bool found = (stat(fuEoLS.c_str(), &buf) == 0);
if (!found) {
int eol_fd = open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
close(eol_fd);
if (doCreateEoLS) {
int eol_fd =
open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
close(eol_fd);
}
if (doCreateBoLS)
createBoLSFile(lumiSection, false);
}
Expand Down Expand Up @@ -1677,8 +1681,10 @@ namespace evf {
serverError = true;
}
}

break;
}

} catch (std::exception const& e) {
edm::LogWarning("EvFDaqDirector") << "Exception in socket handling";
serverError = true;
Expand All @@ -1704,6 +1710,7 @@ namespace evf {
fileStatus = noFile;
sleep(1); //back-off if error detected
}

return fileStatus;
}

Expand Down Expand Up @@ -1760,7 +1767,7 @@ namespace evf {

//local lock to force index json and EoLS files to appear in order
if (fileBrokerUseLocalLock_)
lockFULocal2();
lockFULocal();

int maxLS = stopFileLS < 0 ? -1 : std::max(stopFileLS, (int)currentLumiSection);
bool rawHeader = false;
Expand All @@ -1770,22 +1777,21 @@ namespace evf {
if (serverError) {
//do not update anything
if (fileBrokerUseLocalLock_)
unlockFULocal2();
unlockFULocal();
return noFile;
}

//handle creation of EoLS and BoLS files if lumisection has changed
//handle creation of BoLS files if lumisection has changed
if (currentLumiSection == 0) {
if (fileStatus == runEnded) {
createLumiSectionFiles(closedServerLS, 0);
createLumiSectionFiles(serverLS, closedServerLS, false); // +1
} else
createLumiSectionFiles(serverLS, 0);
if (fileStatus == runEnded)
createLumiSectionFiles(closedServerLS, 0, true, false);
else
createLumiSectionFiles(serverLS, 0, true, false);
} else {
//loop over and create any EoLS files missing
if (closedServerLS >= currentLumiSection) {
//only BoLS files
for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
createLumiSectionFiles(i + 1, i);
createLumiSectionFiles(i + 1, i, true, false);
}
}

Expand All @@ -1803,6 +1809,11 @@ namespace evf {
close(rawFd);
rawFd = -1;
}

//can unlock because all files have been created locally
if (fileBrokerUseLocalLock_)
unlockFULocal();

if (!fileFound) {
//catch condition where directory got deleted
fileStatus = noFile;
Expand All @@ -1813,9 +1824,26 @@ namespace evf {
}
}

//can unlock because all files have been created locally
if (fileBrokerUseLocalLock_)
//handle creation of EoLS files if lumisection has changed, this needs to be locked exclusively
//so that EoLS files can not appear locally before index files
if (currentLumiSection == 0) {
lockFULocal2();
if (fileStatus == runEnded) {
createLumiSectionFiles(closedServerLS, 0, false, true);
createLumiSectionFiles(serverLS, closedServerLS, false, true); // +1
} else {
createLumiSectionFiles(serverLS, 0, false, true);
}
unlockFULocal2();
} else {
if (closedServerLS >= currentLumiSection) {
//lock exclusive to create EoLS files
lockFULocal2();
for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
createLumiSectionFiles(i + 1, i, false, true);
unlockFULocal2();
}
}

if (fileStatus == runEnded)
ls = std::max(currentLumiSection, serverLS);
Expand Down
14 changes: 10 additions & 4 deletions EventFilter/Utilities/src/FedRawDataInputSource.cc
Expand Up @@ -613,7 +613,8 @@ inline evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent() {
void FedRawDataInputSource::read(edm::EventPrincipal& eventPrincipal) {
setMonState(inReadEvent);
std::unique_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
edm::Timestamp tstamp = fillFEDRawDataCollection(*rawData);
bool tcdsInRange;
edm::Timestamp tstamp = fillFEDRawDataCollection(*rawData, tcdsInRange);

if (useL1EventID_) {
eventID_ = edm::EventID(eventRunNumber_, currentLumiSection_, L1EventID_);
Expand All @@ -639,7 +640,8 @@ void FedRawDataInputSource::read(edm::EventPrincipal& eventPrincipal) {
event_->isRealData(),
static_cast<edm::EventAuxiliary::ExperimentType>(fedHeader.triggerType()),
processGUID(),
!fileListLoopMode_);
!fileListLoopMode_,
!tcdsInRange);
aux.setProcessHistoryID(processHistoryID_);
makeEvent(eventPrincipal, aux);
}
Expand Down Expand Up @@ -684,7 +686,7 @@ void FedRawDataInputSource::read(edm::EventPrincipal& eventPrincipal) {
return;
}

edm::Timestamp FedRawDataInputSource::fillFEDRawDataCollection(FEDRawDataCollection& rawData) {
edm::Timestamp FedRawDataInputSource::fillFEDRawDataCollection(FEDRawDataCollection& rawData, bool& tcdsInRange) {
edm::TimeValue_t time;
timeval stv;
gettimeofday(&stv, nullptr);
Expand All @@ -696,6 +698,7 @@ edm::Timestamp FedRawDataInputSource::fillFEDRawDataCollection(FEDRawDataCollect
unsigned char* event = (unsigned char*)event_->payload();
GTPEventID_ = 0;
tcds_pointer_ = nullptr;
tcdsInRange = false;
uint16_t selectedTCDSFed = 0;
while (eventSize > 0) {
assert(eventSize >= FEDTrailer::length);
Expand All @@ -712,6 +715,9 @@ edm::Timestamp FedRawDataInputSource::fillFEDRawDataCollection(FEDRawDataCollect
if (!selectedTCDSFed) {
selectedTCDSFed = fedId;
tcds_pointer_ = event + eventSize;
if (fedId >= FEDNumbering::MINTCDSuTCAFEDID && fedId <= FEDNumbering::MAXTCDSuTCAFEDID) {
tcdsInRange = true;
}
} else
throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection")
<< "Second TCDS FED ID " << fedId << " found. First ID: " << selectedTCDSFed;
Expand Down Expand Up @@ -810,7 +816,6 @@ void FedRawDataInputSource::readSupervisor() {
if (fms_) {
setMonStateSup(inSupBusy);
fms_->startedLookingForFile();
setMonStateSup(inSupLockPolling);
}

evf::EvFDaqDirector::FileStatus status = evf::EvFDaqDirector::noFile;
Expand All @@ -830,6 +835,7 @@ void FedRawDataInputSource::readSupervisor() {

assert(rawFd == -1);
uint64_t thisLockWaitTimeUs = 0.;
setMonStateSup(inSupLockPolling);
if (fileListMode_) {
//return LS if LS not set, otherwise return file
status = getFile(ls, nextFile, fileSizeIndex, thisLockWaitTimeUs);
Expand Down
2 changes: 1 addition & 1 deletion EventFilter/Utilities/test/LocalRunBUFU.sh
Expand Up @@ -19,7 +19,7 @@ RC=0
P=$$
PREFIX=results_${USER}${P}
OUTDIR=${LOCAL_TMP_DIR}/${PREFIX}

echo "Output will be temporarily written to ${OUTDIR}"

mkdir ${OUTDIR}
cp ${SCRIPTDIR}/startBU.py ${OUTDIR}
Expand Down
1 change: 1 addition & 0 deletions EventFilter/Utilities/test/startBU.py
Expand Up @@ -121,6 +121,7 @@
defaultQualifier = cms.untracked.int32(0))

process.s = cms.EDProducer("DaqFakeReader",
fillRandom = cms.untracked.bool(True),
meanSize = cms.untracked.uint32(options.fedMeanSize),
width = cms.untracked.uint32(int(math.ceil(options.fedMeanSize/2.))),
tcdsFEDID = cms.untracked.uint32(1024),
Expand Down

0 comments on commit df3d5fa

Please sign in to comment.