Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DAQ optimized usage of FU file locking (12_0_X backport) #35221

Merged
merged 3 commits into from Sep 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion EventFilter/Utilities/interface/AuxiliaryMakers.h
Expand Up @@ -12,7 +12,8 @@ namespace evf {
bool isRealData,
const edm::EventAuxiliary::ExperimentType&,
const std::string& processGUID,
bool verifyLumiSection);
bool verifyLumiSection,
bool suppressWarning);
}
} // namespace evf
#endif
3 changes: 2 additions & 1 deletion EventFilter/Utilities/interface/EvFDaqDirector.h
Expand Up @@ -127,7 +127,8 @@ namespace evf {
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const;
void createLumiSectionFiles(const uint32_t lumiSection,
const uint32_t currentLumiSection,
bool doCreateBoLS = true);
bool doCreateBoLS,
bool doCreateEoLS);
static int parseFRDFileHeader(std::string const& rawSourcePath,
int& rawFd,
uint16_t& rawHeaderSize,
Expand Down
2 changes: 1 addition & 1 deletion EventFilter/Utilities/interface/FedRawDataInputSource.h
Expand Up @@ -60,7 +60,7 @@ class FedRawDataInputSource : public edm::RawInputSource {
void maybeOpenNewLumiSection(const uint32_t lumiSection);
evf::EvFDaqDirector::FileStatus nextEvent();
evf::EvFDaqDirector::FileStatus getNextEvent();
edm::Timestamp fillFEDRawDataCollection(FEDRawDataCollection&);
edm::Timestamp fillFEDRawDataCollection(FEDRawDataCollection& rawData, bool& tcdsInRange);

void readSupervisor();
void readWorker(unsigned int tid);
Expand Down
33 changes: 33 additions & 0 deletions EventFilter/Utilities/plugins/DaqFakeReader.cc
Expand Up @@ -20,6 +20,8 @@
#include <cmath>
#include <sys/time.h>
#include <cstring>
#include <cstdlib>
#include <chrono>

using namespace std;
using namespace edm;
Expand All @@ -33,6 +35,7 @@ DaqFakeReader::DaqFakeReader(const edm::ParameterSet& pset)
: runNum(1),
eventNum(1),
empty_events(pset.getUntrackedParameter<bool>("emptyEvents", false)),
fillRandom_(pset.getUntrackedParameter<bool>("fillRandom", false)),
meansize(pset.getUntrackedParameter<unsigned int>("meanSize", 1024)),
width(pset.getUntrackedParameter<unsigned int>("width", 1024)),
injected_errors_per_million_events(pset.getUntrackedParameter<unsigned int>("injectErrPpm", 0)),
Expand All @@ -43,6 +46,12 @@ DaqFakeReader::DaqFakeReader(const edm::ParameterSet& pset)
if (tcdsFEDID_ < FEDNumbering::MINTCDSuTCAFEDID)
throw cms::Exception("DaqFakeReader::DaqFakeReader")
<< " TCDS FED ID lower than " << FEDNumbering::MINTCDSuTCAFEDID;
if (fillRandom_) {
//intialize random seed
auto time_count =
static_cast<long unsigned int>(std::chrono::high_resolution_clock::now().time_since_epoch().count());
srand(time_count & 0xffffffff);
}
produces<FEDRawDataCollection>();
}

Expand Down Expand Up @@ -102,6 +111,18 @@ void DaqFakeReader::fillFEDs(
// Allocate space for header+trailer+payload
feddata.resize(size + 16);

if (fillRandom_) {
//fill FED with random values
size_t size_ui = size - size % sizeof(unsigned int);
for (size_t i = 0; i < size_ui; i += sizeof(unsigned int)) {
*((unsigned int*)(feddata.data() + i)) = (unsigned int)rand();
}
//remainder
for (size_t i = size_ui; i < size; i++) {
*(feddata.data() + i) = rand() & 0xff;
}
}

// Generate header
FEDHeader::set(feddata.data(),
1, // Trigger type
Expand Down Expand Up @@ -164,3 +185,15 @@ void DaqFakeReader::beginLuminosityBlock(LuminosityBlock const& iL, EventSetup c
std::cout << "DaqFakeReader begin Lumi " << iL.luminosityBlock() << std::endl;
fakeLs_ = iL.luminosityBlock();
}

void DaqFakeReader::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
edm::ParameterSetDescription desc;
desc.setComment("Injector of generated raw FED data for DAQ testing");
desc.addUntracked<bool>("emptyEvents", false);
desc.addUntracked<bool>("fillRandom", false);
desc.addUntracked<unsigned int>("meanSize", 1024);
desc.addUntracked<unsigned int>("width", 1024);
desc.addUntracked<unsigned int>("injectErrPpm", 1024);
desc.addUntracked<unsigned int>("tcdsFEDID", 1024);
descriptions.add("DaqFakeReader", desc);
}
4 changes: 4 additions & 0 deletions EventFilter/Utilities/plugins/DaqFakeReader.h
Expand Up @@ -7,6 +7,7 @@
* \author N. Amapane - CERN
*/

#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
#include "FWCore/Framework/interface/one/EDProducer.h"
#include "FWCore/Framework/interface/Event.h"
#include "FWCore/Framework/interface/EventSetup.h"
Expand All @@ -33,6 +34,8 @@ class DaqFakeReader : public edm::one::EDProducer<> {

void produce(edm::Event&, edm::EventSetup const&) override;

static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);

private:
//
// private member functions
Expand All @@ -48,6 +51,7 @@ class DaqFakeReader : public edm::one::EDProducer<> {
edm::RunNumber_t runNum;
edm::EventNumber_t eventNum;
bool empty_events;
bool fillRandom_;
unsigned int meansize; // in bytes
unsigned int width;
unsigned int injected_errors_per_million_events;
Expand Down
5 changes: 3 additions & 2 deletions EventFilter/Utilities/src/AuxiliaryMakers.cc
Expand Up @@ -12,7 +12,8 @@ namespace evf {
bool isRealData,
const edm::EventAuxiliary::ExperimentType& eventType,
const std::string& processGUID,
bool verifyLumiSection) {
bool verifyLumiSection,
bool suppressWarning) {
edm::EventID eventId(runNumber, // check that runnumber from record is consistent
lumiSection,
tcds->header.eventNumber);
Expand All @@ -29,7 +30,7 @@ namespace evf {
const uint64_t orbitnr = ((uint64_t)tcds->header.orbitHigh << 16) | tcds->header.orbitLow;
const uint32_t recordLumiSection = tcds->header.lumiSection;

if (isRealData) {
if (isRealData && !suppressWarning) {
//warnings are disabled for generated data
if (verifyLumiSection && recordLumiSection != lumiSection)
edm::LogWarning("AuxiliaryMakers")
Expand Down
58 changes: 43 additions & 15 deletions EventFilter/Utilities/src/EvFDaqDirector.cc
Expand Up @@ -942,14 +942,18 @@ namespace evf {

void EvFDaqDirector::createLumiSectionFiles(const uint32_t lumiSection,
const uint32_t currentLumiSection,
bool doCreateBoLS) {
bool doCreateBoLS,
bool doCreateEoLS) {
if (currentLumiSection > 0) {
const std::string fuEoLS = getEoLSFilePathOnFU(currentLumiSection);
struct stat buf;
bool found = (stat(fuEoLS.c_str(), &buf) == 0);
if (!found) {
int eol_fd = open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
close(eol_fd);
if (doCreateEoLS) {
int eol_fd =
open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
close(eol_fd);
}
if (doCreateBoLS)
createBoLSFile(lumiSection, false);
}
Expand Down Expand Up @@ -1677,8 +1681,10 @@ namespace evf {
serverError = true;
}
}

break;
}

} catch (std::exception const& e) {
edm::LogWarning("EvFDaqDirector") << "Exception in socket handling";
serverError = true;
Expand All @@ -1704,6 +1710,7 @@ namespace evf {
fileStatus = noFile;
sleep(1); //back-off if error detected
}

return fileStatus;
}

Expand Down Expand Up @@ -1760,7 +1767,7 @@ namespace evf {

//local lock to force index json and EoLS files to appear in order
if (fileBrokerUseLocalLock_)
lockFULocal2();
lockFULocal();

int maxLS = stopFileLS < 0 ? -1 : std::max(stopFileLS, (int)currentLumiSection);
bool rawHeader = false;
Expand All @@ -1770,22 +1777,21 @@ namespace evf {
if (serverError) {
//do not update anything
if (fileBrokerUseLocalLock_)
unlockFULocal2();
unlockFULocal();
return noFile;
}

//handle creation of EoLS and BoLS files if lumisection has changed
//handle creation of BoLS files if lumisection has changed
if (currentLumiSection == 0) {
if (fileStatus == runEnded) {
createLumiSectionFiles(closedServerLS, 0);
createLumiSectionFiles(serverLS, closedServerLS, false); // +1
} else
createLumiSectionFiles(serverLS, 0);
if (fileStatus == runEnded)
createLumiSectionFiles(closedServerLS, 0, true, false);
else
createLumiSectionFiles(serverLS, 0, true, false);
} else {
//loop over and create any EoLS files missing
if (closedServerLS >= currentLumiSection) {
//only BoLS files
for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
createLumiSectionFiles(i + 1, i);
createLumiSectionFiles(i + 1, i, true, false);
}
}

Expand All @@ -1803,6 +1809,11 @@ namespace evf {
close(rawFd);
rawFd = -1;
}

//can unlock because all files have been created locally
if (fileBrokerUseLocalLock_)
unlockFULocal();

if (!fileFound) {
//catch condition where directory got deleted
fileStatus = noFile;
Expand All @@ -1813,9 +1824,26 @@ namespace evf {
}
}

//can unlock because all files have been created locally
if (fileBrokerUseLocalLock_)
//handle creation of EoLS files if lumisection has changed, this needs to be locked exclusively
//so that EoLS files can not appear locally before index files
if (currentLumiSection == 0) {
lockFULocal2();
if (fileStatus == runEnded) {
createLumiSectionFiles(closedServerLS, 0, false, true);
createLumiSectionFiles(serverLS, closedServerLS, false, true); // +1
} else {
createLumiSectionFiles(serverLS, 0, false, true);
}
unlockFULocal2();
} else {
if (closedServerLS >= currentLumiSection) {
//lock exclusive to create EoLS files
lockFULocal2();
for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
createLumiSectionFiles(i + 1, i, false, true);
unlockFULocal2();
}
}

if (fileStatus == runEnded)
ls = std::max(currentLumiSection, serverLS);
Expand Down
14 changes: 10 additions & 4 deletions EventFilter/Utilities/src/FedRawDataInputSource.cc
Expand Up @@ -613,7 +613,8 @@ inline evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent() {
void FedRawDataInputSource::read(edm::EventPrincipal& eventPrincipal) {
setMonState(inReadEvent);
std::unique_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
edm::Timestamp tstamp = fillFEDRawDataCollection(*rawData);
bool tcdsInRange;
edm::Timestamp tstamp = fillFEDRawDataCollection(*rawData, tcdsInRange);

if (useL1EventID_) {
eventID_ = edm::EventID(eventRunNumber_, currentLumiSection_, L1EventID_);
Expand All @@ -639,7 +640,8 @@ void FedRawDataInputSource::read(edm::EventPrincipal& eventPrincipal) {
event_->isRealData(),
static_cast<edm::EventAuxiliary::ExperimentType>(fedHeader.triggerType()),
processGUID(),
!fileListLoopMode_);
!fileListLoopMode_,
!tcdsInRange);
aux.setProcessHistoryID(processHistoryID_);
makeEvent(eventPrincipal, aux);
}
Expand Down Expand Up @@ -684,7 +686,7 @@ void FedRawDataInputSource::read(edm::EventPrincipal& eventPrincipal) {
return;
}

edm::Timestamp FedRawDataInputSource::fillFEDRawDataCollection(FEDRawDataCollection& rawData) {
edm::Timestamp FedRawDataInputSource::fillFEDRawDataCollection(FEDRawDataCollection& rawData, bool& tcdsInRange) {
edm::TimeValue_t time;
timeval stv;
gettimeofday(&stv, nullptr);
Expand All @@ -696,6 +698,7 @@ edm::Timestamp FedRawDataInputSource::fillFEDRawDataCollection(FEDRawDataCollect
unsigned char* event = (unsigned char*)event_->payload();
GTPEventID_ = 0;
tcds_pointer_ = nullptr;
tcdsInRange = false;
uint16_t selectedTCDSFed = 0;
while (eventSize > 0) {
assert(eventSize >= FEDTrailer::length);
Expand All @@ -712,6 +715,9 @@ edm::Timestamp FedRawDataInputSource::fillFEDRawDataCollection(FEDRawDataCollect
if (!selectedTCDSFed) {
selectedTCDSFed = fedId;
tcds_pointer_ = event + eventSize;
if (fedId >= FEDNumbering::MINTCDSuTCAFEDID && fedId <= FEDNumbering::MAXTCDSuTCAFEDID) {
tcdsInRange = true;
}
} else
throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection")
<< "Second TCDS FED ID " << fedId << " found. First ID: " << selectedTCDSFed;
Expand Down Expand Up @@ -810,7 +816,6 @@ void FedRawDataInputSource::readSupervisor() {
if (fms_) {
setMonStateSup(inSupBusy);
fms_->startedLookingForFile();
setMonStateSup(inSupLockPolling);
}

evf::EvFDaqDirector::FileStatus status = evf::EvFDaqDirector::noFile;
Expand All @@ -830,6 +835,7 @@ void FedRawDataInputSource::readSupervisor() {

assert(rawFd == -1);
uint64_t thisLockWaitTimeUs = 0.;
setMonStateSup(inSupLockPolling);
if (fileListMode_) {
//return LS if LS not set, otherwise return file
status = getFile(ls, nextFile, fileSizeIndex, thisLockWaitTimeUs);
Expand Down
2 changes: 1 addition & 1 deletion EventFilter/Utilities/test/LocalRunBUFU.sh
Expand Up @@ -19,7 +19,7 @@ RC=0
P=$$
PREFIX=results_${USER}${P}
OUTDIR=${LOCAL_TMP_DIR}/${PREFIX}

echo "Output will be temporarily written to ${OUTDIR}"

mkdir ${OUTDIR}
cp ${SCRIPTDIR}/startBU.py ${OUTDIR}
Expand Down
1 change: 1 addition & 0 deletions EventFilter/Utilities/test/startBU.py
Expand Up @@ -121,6 +121,7 @@
defaultQualifier = cms.untracked.int32(0))

process.s = cms.EDProducer("DaqFakeReader",
fillRandom = cms.untracked.bool(True),
meanSize = cms.untracked.uint32(options.fedMeanSize),
width = cms.untracked.uint32(int(math.ceil(options.fedMeanSize/2.))),
tcdsFEDID = cms.untracked.uint32(1024),
Expand Down