From 65879e88605218c3d68bd6bdb61bb298db5b8a8d Mon Sep 17 00:00:00 2001 From: shahoian Date: Sat, 16 May 2026 22:13:07 +0200 Subject: [PATCH 1/4] Repair raw tf part counters / headers Can be disabled by --ignore-repair-headers. --- .../include/TFReaderDD/SubTimeFrameFile.h | 18 +++++++++-- .../TFReaderDD/SubTimeFrameFileReader.h | 2 +- .../TFReaderDD/src/SubTimeFrameFileReader.cxx | 32 +++++++++++++++++-- Detectors/Raw/TFReaderDD/src/TFReaderSpec.cxx | 5 ++- Detectors/Raw/TFReaderDD/src/TFReaderSpec.h | 1 + .../Raw/TFReaderDD/src/tf-reader-workflow.cxx | 2 +- 6 files changed, 53 insertions(+), 7 deletions(-) diff --git a/Detectors/Raw/TFReaderDD/include/TFReaderDD/SubTimeFrameFile.h b/Detectors/Raw/TFReaderDD/include/TFReaderDD/SubTimeFrameFile.h index 340027642b74c..eeabf8e8d4117 100644 --- a/Detectors/Raw/TFReaderDD/include/TFReaderDD/SubTimeFrameFile.h +++ b/Detectors/Raw/TFReaderDD/include/TFReaderDD/SubTimeFrameFile.h @@ -21,6 +21,8 @@ #include #include +#include "Framework/DataSpecUtils.h" +#include "Framework/OutputSpec.h" #include "Framework/Logger.h" namespace o2 @@ -151,13 +153,13 @@ struct SubTimeFrameFileMeta { /// std::uint64_t mWriteTimeMs; - auto getTimePoint() + auto getTimePoint() const { using namespace std::chrono; return time_point{milliseconds{mWriteTimeMs}}; } - std::string getTimeString() + std::string getTimeString() const { using namespace std::chrono; std::time_t lTime = system_clock::to_time_t(getTimePoint()); @@ -167,6 +169,11 @@ struct SubTimeFrameFileMeta { return lTimeStream.str(); } + const std::string info() const + { + return fmt::format("Size in file: {} Time: {} Version: {}", mStfSizeInFile, getTimeString(), mStfFileVersion); + } + SubTimeFrameFileMeta(const std::uint64_t pStfSize) : SubTimeFrameFileMeta() { @@ -220,6 +227,11 @@ struct SubTimeFrameFileDataIndex { static_assert(sizeof(DataIndexElem) == 48, "DataIndexElem changed -> Binary compatibility is lost!"); } + + const std::string info() const + { + return fmt::format("DH: {} Cnt:{} Size:{} Offset:{}", o2::framework::DataSpecUtils::describe(o2::framework::OutputSpec{mDataOrigin, mDataDescription, mSubSpecification}), mDataBlockCnt, mSize, mOffset); + } }; SubTimeFrameFileDataIndex() = default; @@ -240,6 +252,8 @@ struct SubTimeFrameFileDataIndex { return sizeof(o2::header::DataHeader) + (sizeof(DataIndexElem) * mDataIndex.size()); } + const std::vector& getDataIndex() const { return mDataIndex; } + friend std::ostream& operator<<(std::ostream& pStream, const SubTimeFrameFileDataIndex& pIndex); private: diff --git a/Detectors/Raw/TFReaderDD/include/TFReaderDD/SubTimeFrameFileReader.h b/Detectors/Raw/TFReaderDD/include/TFReaderDD/SubTimeFrameFileReader.h index 3b926e0a79206..7fba597252ba8 100644 --- a/Detectors/Raw/TFReaderDD/include/TFReaderDD/SubTimeFrameFileReader.h +++ b/Detectors/Raw/TFReaderDD/include/TFReaderDD/SubTimeFrameFileReader.h @@ -50,7 +50,7 @@ class SubTimeFrameFileReader ~SubTimeFrameFileReader(); /// Read a single TF from the file - std::unique_ptr read(fair::mq::Device* device, const std::vector& outputRoutes, const std::string& rawChannel, size_t slice, bool sup0xccdb, int verbosity); + std::unique_ptr read(fair::mq::Device* device, const std::vector& outputRoutes, const std::string& rawChannel, size_t slice, bool sup0xccdb, bool repairHeaders, int verbosity); /// Tell the current position of the file inline std::uint64_t position() const { return mFileMapOffset; } diff --git a/Detectors/Raw/TFReaderDD/src/SubTimeFrameFileReader.cxx b/Detectors/Raw/TFReaderDD/src/SubTimeFrameFileReader.cxx index 5f862dffe512f..c0de91d623c3a 100644 --- a/Detectors/Raw/TFReaderDD/src/SubTimeFrameFileReader.cxx +++ b/Detectors/Raw/TFReaderDD/src/SubTimeFrameFileReader.cxx @@ -184,7 +184,7 @@ std::uint64_t sCreationTime = 0; std::mutex stfMtx; std::unique_ptr SubTimeFrameFileReader::read(fair::mq::Device* device, const std::vector& outputRoutes, - const std::string& rawChannel, size_t slice, bool sup0xccdb, int verbosity) + const std::string& rawChannel, size_t slice, bool sup0xccdb, bool repaireHeaders, int verbosity) { std::unique_ptr messagesPerRoute = std::make_unique(); auto& msgMap = *messagesPerRoute.get(); @@ -252,10 +252,15 @@ std::unique_ptr SubTimeFrameFileReader::read(fair::mq::Device* return nullptr; } lStfMetaDataHdr = o2::header::DataHeader::Get(lMetaHdrStack.first()); - LOGP(debug, "read filemeta, pos = {}, size = {}", position(), sizeof(SubTimeFrameFileMeta)); + if (verbosity > 0) { + LOGP(info, "read filemeta, pos = {}, size = {}", position(), sizeof(SubTimeFrameFileMeta)); + } if (!read_advance(&lStfFileMeta, sizeof(SubTimeFrameFileMeta))) { return nullptr; } + if (verbosity > 0) { + LOGP(info, "TFMeta : {}", lStfFileMeta.info()); + } if (lStfFileMeta.mWriteTimeMs == 0 && creationFallBack != 0) { if (!creation0Notified) { creation0Notified = true; @@ -319,6 +324,7 @@ std::unique_ptr SubTimeFrameFileReader::read(fair::mq::Device* std::int64_t lLeftToRead = lStfDataSize; STFHeader stfHeader{tfID, -1u, -1u}; + DataHeader prevHeader; // read pairs while (lLeftToRead > 0) { // allocate and read the Headers @@ -335,6 +341,28 @@ std::unique_ptr SubTimeFrameFileReader::read(fair::mq::Device* return nullptr; } DataHeader locDataHeader(*lDataHeader); + + if (repaireHeaders) { + auto descHeader = [&locDataHeader]() { + return o2f::DataSpecUtils::describe(o2f::OutputSpec{locDataHeader.dataOrigin, locDataHeader.dataDescription, locDataHeader.subSpecification}); + }; + if (locDataHeader == prevHeader) { + if (prevHeader.tfCounter == locDataHeader.tfCounter && (prevHeader.splitPayloadIndex + 1) != locDataHeader.splitPayloadIndex) { + if (verbosity > 3) { + LOGP(warn, "Repairing wrong index {}/{} to {} for {}", locDataHeader.splitPayloadIndex, locDataHeader.splitPayloadParts, (prevHeader.splitPayloadIndex + 1) % prevHeader.splitPayloadParts, descHeader()); + } + locDataHeader.splitPayloadIndex = (++prevHeader.splitPayloadIndex) % prevHeader.splitPayloadParts; + } + } else { // new header + if (locDataHeader.splitPayloadIndex != 0) { + if (verbosity > 2) { + LOGP(warn, "Repairing wrong index {}/{} to 0 for new {}", locDataHeader.splitPayloadIndex, locDataHeader.splitPayloadParts, descHeader()); + } + locDataHeader.splitPayloadIndex = 0; + } + } + prevHeader = locDataHeader; + } // sanity check if (int(locDataHeader.firstTForbit) == -1) { if (!negativeOrbitNotified) { diff --git a/Detectors/Raw/TFReaderDD/src/TFReaderSpec.cxx b/Detectors/Raw/TFReaderDD/src/TFReaderSpec.cxx index 919e76083f595..f94edf8540ac8 100644 --- a/Detectors/Raw/TFReaderDD/src/TFReaderSpec.cxx +++ b/Detectors/Raw/TFReaderDD/src/TFReaderSpec.cxx @@ -118,6 +118,8 @@ void TFReaderSpec::init(o2f::InitContext& ic) mInput.maxTFsPerFile = mInput.maxTFsPerFile > 0 ? mInput.maxTFsPerFile : 0x7fffffff; mInput.maxTFCache = std::max(1, ic.options().get("max-cached-tf")); mInput.maxFileCache = std::max(1, ic.options().get("max-cached-files")); + mInput.repairHeaders = !ic.options().get("ignore-repair-headers"); + if (!mInput.fileRunTimeSpans.empty()) { loadRunTimeSpans(mInput.fileRunTimeSpans); } @@ -421,7 +423,7 @@ void TFReaderSpec::TFBuilder() std::this_thread::sleep_for(sleepTime); continue; } - auto tf = reader.read(mDevice, mOutputRoutes, mInput.rawChannelConfig, mAccTFCounter, mInput.sup0xccdb, mInput.verbosity); + auto tf = reader.read(mDevice, mOutputRoutes, mInput.rawChannelConfig, mAccTFCounter, mInput.sup0xccdb, mInput.repairHeaders, mInput.verbosity); bool acceptTF = true; if (tf) { if (mRunTimeRanges.size()) { @@ -675,6 +677,7 @@ o2f::DataProcessorSpec o2::rawdd::getTFReaderSpec(o2::rawdd::TFReaderInp& rinp) } spec.options.emplace_back(o2f::ConfigParamSpec{"select-tf-ids", o2f::VariantType::String, "", {"comma-separated list TF IDs to inject (from cumulative counter of TFs seen)"}}); spec.options.emplace_back(o2f::ConfigParamSpec{"fetch-failure-threshold", o2f::VariantType::Float, 0.f, {"Fatil if too many failures( >0: fraction, <0: abs number, 0: no threshold)"}}); + spec.options.emplace_back(o2f::ConfigParamSpec{"ignore-repair-headers", o2f::VariantType::Bool, false, {"do not check/repair headers"}}); spec.options.emplace_back(o2f::ConfigParamSpec{"max-tf", o2f::VariantType::Int, -1, {"max TF ID to process (<= 0 : infinite)"}}); spec.options.emplace_back(o2f::ConfigParamSpec{"max-tf-per-file", o2f::VariantType::Int, -1, {"max TFs to process per raw-tf file (<= 0 : infinite)"}}); spec.options.emplace_back(o2f::ConfigParamSpec{"max-cached-tf", o2f::VariantType::Int, 3, {"max TFs to cache in memory"}}); diff --git a/Detectors/Raw/TFReaderDD/src/TFReaderSpec.h b/Detectors/Raw/TFReaderDD/src/TFReaderSpec.h index 2c1c62ecbb414..57dd283d9a1d5 100644 --- a/Detectors/Raw/TFReaderDD/src/TFReaderSpec.h +++ b/Detectors/Raw/TFReaderDD/src/TFReaderSpec.h @@ -49,6 +49,7 @@ struct TFReaderInp { bool sendDummyForMissing = true; bool sup0xccdb = false; bool invertIRFramesSelection = false; + bool repairHeaders = true; std::vector hdVec; std::vector tfIDs{}; }; diff --git a/Detectors/Raw/TFReaderDD/src/tf-reader-workflow.cxx b/Detectors/Raw/TFReaderDD/src/tf-reader-workflow.cxx index b424353531de7..a29b4dadfdb25 100644 --- a/Detectors/Raw/TFReaderDD/src/tf-reader-workflow.cxx +++ b/Detectors/Raw/TFReaderDD/src/tf-reader-workflow.cxx @@ -34,7 +34,7 @@ void customize(std::vector& workflowOptions) options.push_back(ConfigParamSpec{"copy-dir", VariantType::String, "/tmp/", {"copy base directory for remote files"}}); options.push_back(ConfigParamSpec{"tf-file-regex", VariantType::String, ".+\\.tf$", {"regex string to identify TF files"}}); options.push_back(ConfigParamSpec{"remote-regex", VariantType::String, "^(alien://|)/alice/data/.+", {"regex string to identify remote files"}}); // Use "^/eos/aliceo2/.+" for direct EOS access - options.push_back(ConfigParamSpec{"tf-reader-verbosity", VariantType::Int, 0, {"verbosity level (1 or 2: check RDH, print DH/DPH for 1st or all slices, >2 print RDH)"}}); + options.push_back(ConfigParamSpec{"tf-reader-verbosity", VariantType::Int, 0, {"verbosity level (1 or 2: check RDH, print DH/DPH for 1st or all slices, >2 print RDH), report repairs"}}); options.push_back(ConfigParamSpec{"raw-channel-config", VariantType::String, "", {"optional raw FMQ channel for non-DPL output"}}); options.push_back(ConfigParamSpec{"send-diststf-0xccdb", VariantType::Bool, false, {"send explicit FLP/DISTSUBTIMEFRAME/0xccdb output"}}); options.push_back(ConfigParamSpec{"disable-dummy-output", VariantType::Bool, false, {"Disable sending empty output if corresponding data is not found in the data"}}); From bd17148419b8bd397125b009bd2cacaee895f6bf Mon Sep 17 00:00:00 2001 From: shahoian Date: Sun, 17 May 2026 02:06:05 +0200 Subject: [PATCH 2/4] By default do not store FLP/DISTSUBTIMEFRAME in the rawTF --- Detectors/Raw/TFReaderDD/src/RawTFDumpSpec.cxx | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/Detectors/Raw/TFReaderDD/src/RawTFDumpSpec.cxx b/Detectors/Raw/TFReaderDD/src/RawTFDumpSpec.cxx index 64c39fa7ef75a..513edf236b0fe 100644 --- a/Detectors/Raw/TFReaderDD/src/RawTFDumpSpec.cxx +++ b/Detectors/Raw/TFReaderDD/src/RawTFDumpSpec.cxx @@ -97,6 +97,7 @@ class RawTFDump : public Task bool mCreateRunEnvDir = true; bool mAcceptCurrentTF = false; bool mRejectDEADBEEF = false; + bool mRejectDistSTF = true; int mVerbose = 0; std::vector mTFOrbits{}; // 1st orbits of TF accumulated in current file o2::framework::DataTakingContext mDataTakingContext{}; @@ -185,6 +186,7 @@ void RawTFDump::init(InitContext& ic) mWriteTF = false; mStoreMetaFile = false; } + mRejectDistSTF = !ic.options().get("include-dist-stf"); mRejectDEADBEEF = !ic.options().get("include-deadbeef"); mCreateRunEnvDir = !ic.options().get("ignore-partition-run-dir"); mMinFileSize = ic.options().get("min-file-size"); @@ -520,6 +522,9 @@ void RawTFDump::prepareTFForWriting(ProcessingContext& pc) if (dh->subSpecification == 0xdeadbeef && mRejectDEADBEEF) { continue; } + if (dh->dataOrigin == o2::header::gDataOriginFLP && dh->dataDescription == o2::header::gDataDescriptionDISTSTF && mRejectDistSTF) { + continue; + } const auto lHdrDataSize = sizeof(DataHeader) + dh->payloadSize; mTFSize += lHdrDataSize; @@ -577,6 +582,7 @@ DataProcessorSpec getRawTFDumpSpec(const std::string& inpconfig, const std::stri AlgorithmSpec{adaptFromTask(trigger)}, Options{ {"include-deadbeef", VariantType::Bool, false, {"Include DPL-generated 0xdeadbeef subspecs for missing data"}}, + {"include-dist-stf", VariantType::Bool, false, {"Include FLP/DISTSUBTIMEFRAME input"}}, {"exclude-trigger-specs", VariantType::String, "", {"Ignore trigger seen in these inputs of triggerspec"}}, {"max-dump-rate", VariantType::Float, 0.f, {"%-age of TFs to dump. W/o external trigger: random(>0) or periodic(<0) rejection, with: max limit"}}, {"rate-est-conf-limit", VariantType::Float, 0.05f, {"quantile for the lowest rate estimate confidence limit"}}, From 63c42d3ee7b5e7909d0d0d59adf0bbe1dc561db9 Mon Sep 17 00:00:00 2001 From: shahoian Date: Sun, 17 May 2026 19:45:36 +0200 Subject: [PATCH 3/4] Fix payloadIndex, ignore writing DistSTF by default, extra verbose output --- .../Raw/TFReaderDD/src/RawTFDumpSpec.cxx | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/Detectors/Raw/TFReaderDD/src/RawTFDumpSpec.cxx b/Detectors/Raw/TFReaderDD/src/RawTFDumpSpec.cxx index 513edf236b0fe..03bd26ae0deb9 100644 --- a/Detectors/Raw/TFReaderDD/src/RawTFDumpSpec.cxx +++ b/Detectors/Raw/TFReaderDD/src/RawTFDumpSpec.cxx @@ -255,6 +255,7 @@ void RawTFDump::run(ProcessingContext& pc) try { size_t lTFSizeInFile = getTFSizeInFile(); SubTimeFrameFileMeta lTFFileMeta(lTFSizeInFile); + lTFFileMeta.mWriteTimeMs = mTimingInfo.creation; mFile << lTFFileMeta; // Write DataHeader + SubTimeFrameFileMeta mFile << mTFDataIndex; // Write DataHeader + SubTimeFrameFileDataIndex @@ -265,6 +266,10 @@ void RawTFDump::run(ProcessingContext& pc) const auto& dataPtr = mTFData[lEntry + part]; DataHeader hdToWrite = *reinterpret_cast(dataPtr.first); // make a local DataHeader copy to clear flagsNextHeader bit hdToWrite.flagsNextHeader = 0; + hdToWrite.splitPayloadIndex = part; + if (mVerbose > 2) { + LOGP(info, "Writing part:{}/{} of {} | TFCounter:{} part{}/{}", part, lCnt, DataSpecUtils::describe(OutputSpec{hdToWrite.dataOrigin, hdToWrite.dataDescription, hdToWrite.subSpecification}), hdToWrite.firstTForbit, hdToWrite.splitPayloadIndex, hdToWrite.splitPayloadParts); + } buffered_write(reinterpret_cast(&hdToWrite), sizeof(DataHeader)); buffered_write(dataPtr.second, hdToWrite.payloadSize); } @@ -519,10 +524,11 @@ void RawTFDump::prepareTFForWriting(ProcessingContext& pc) LOGP(error, "Failed to extract header"); continue; } - if (dh->subSpecification == 0xdeadbeef && mRejectDEADBEEF) { - continue; - } - if (dh->dataOrigin == o2::header::gDataOriginFLP && dh->dataDescription == o2::header::gDataDescriptionDISTSTF && mRejectDistSTF) { + if ((dh->subSpecification == 0xdeadbeef && mRejectDEADBEEF) || + (dh->dataOrigin == o2::header::gDataOriginFLP && dh->dataDescription == o2::header::gDataDescriptionDISTSTF && mRejectDistSTF)) { + if (mVerbose > 2) { + LOGP(info, "Rejecting {}", DataSpecUtils::describe(OutputSpec{dh->dataOrigin, dh->dataDescription, dh->subSpecification})); + } continue; } const auto lHdrDataSize = sizeof(DataHeader) + dh->payloadSize; @@ -536,9 +542,10 @@ void RawTFDump::prepareTFForWriting(ProcessingContext& pc) lCnt++; mTFData.push_back({ref.header, ref.payload}); if (mVerbose > 2) { - LOGP(info, "{}, part: {} of {}, payload {}, 1stTFOrbit: {} TF: {}", + const auto* dph = DataRefUtils::getHeader(ref); + LOGP(info, "{}, part: {} of {}, payload {}, 1stTFOrbit: {} TF: {}, creation: {} | counter:{} size:{} entry:{}", DataSpecUtils::describe(OutputSpec{dh->dataOrigin, dh->dataDescription, dh->subSpecification}), - dh->splitPayloadIndex, dh->splitPayloadParts, dh->payloadSize, dh->firstTForbit, dh->tfCounter); + dh->splitPayloadIndex, dh->splitPayloadParts, dh->payloadSize, dh->firstTForbit, dh->tfCounter, dph ? dph->creation : -1UL, lCnt, lSize, lEntry); } } @@ -553,7 +560,7 @@ void RawTFDump::prepareTFForWriting(ProcessingContext& pc) OutputSpec spec{eq.mDataOrigin, eq.mDataDescription, eq.mSubSpecification}; if (mVerbose > 1) { - LOGP(info, "{} : {} parts of size {} | offset: {}", DataSpecUtils::describe(spec), lCnt, lSize, lCurrOff); + LOGP(info, "{} : {} parts of size {} entry {}| offset: {}", DataSpecUtils::describe(spec), lCnt, lSize, lEntry, lCurrOff); } mTFDataIndex.AddStfElement(eq, lCnt, lCurrOff, lSize); lCurrOff += lSize; From dcc0b555c23f8f7a4cada3fd1ba28e8a52f093bf Mon Sep 17 00:00:00 2001 From: shahoian Date: Sun, 17 May 2026 19:47:00 +0200 Subject: [PATCH 4/4] Ignore stored DistSTF by default --- .../TFReaderDD/SubTimeFrameFileReader.h | 11 +++- .../TFReaderDD/src/SubTimeFrameFileReader.cxx | 63 ++++++++++++------- Detectors/Raw/TFReaderDD/src/TFReaderSpec.cxx | 10 ++- Detectors/Raw/TFReaderDD/src/TFReaderSpec.h | 1 + 4 files changed, 60 insertions(+), 25 deletions(-) diff --git a/Detectors/Raw/TFReaderDD/include/TFReaderDD/SubTimeFrameFileReader.h b/Detectors/Raw/TFReaderDD/include/TFReaderDD/SubTimeFrameFileReader.h index 7fba597252ba8..2b7d2b7ab8e74 100644 --- a/Detectors/Raw/TFReaderDD/include/TFReaderDD/SubTimeFrameFileReader.h +++ b/Detectors/Raw/TFReaderDD/include/TFReaderDD/SubTimeFrameFileReader.h @@ -46,11 +46,11 @@ class SubTimeFrameFileReader public: SubTimeFrameFileReader() = delete; - SubTimeFrameFileReader(const std::string& pFileName, o2::detectors::DetID::mask_t detMask); + SubTimeFrameFileReader(const std::string& pFileName, o2::detectors::DetID::mask_t detMask, int verb, bool sup0xccdb, bool repaireHeaders, bool rejectDistSTF); ~SubTimeFrameFileReader(); /// Read a single TF from the file - std::unique_ptr read(fair::mq::Device* device, const std::vector& outputRoutes, const std::string& rawChannel, size_t slice, bool sup0xccdb, bool repairHeaders, int verbosity); + std::unique_ptr read(fair::mq::Device* device, const std::vector& outputRoutes, const std::string& rawChannel, size_t slice); /// Tell the current position of the file inline std::uint64_t position() const { return mFileMapOffset; } @@ -76,6 +76,13 @@ class SubTimeFrameFileReader std::uint64_t mFileMapOffset = 0; std::uint64_t mFileSize = 0; + int mVerbosity = 0; + bool mSup0xccdb = true; + bool mRepaireHeaders = true; + bool mRejectDistSTF = true; + + const std::string describeHeader(const o2::header::DataHeader& hd, bool full = false) const; + // helper to make sure written chunks are buffered, only allow pointers template ::value>> diff --git a/Detectors/Raw/TFReaderDD/src/SubTimeFrameFileReader.cxx b/Detectors/Raw/TFReaderDD/src/SubTimeFrameFileReader.cxx index c0de91d623c3a..c8bc6ff374ead 100644 --- a/Detectors/Raw/TFReaderDD/src/SubTimeFrameFileReader.cxx +++ b/Detectors/Raw/TFReaderDD/src/SubTimeFrameFileReader.cxx @@ -45,8 +45,8 @@ namespace o2f = o2::framework; /// SubTimeFrameFileReader //////////////////////////////////////////////////////////////////////////////// -SubTimeFrameFileReader::SubTimeFrameFileReader(const std::string& pFileName, o2::detectors::DetID::mask_t detMask) - : mFileName(pFileName) +SubTimeFrameFileReader::SubTimeFrameFileReader(const std::string& pFileName, o2::detectors::DetID::mask_t detMask, int verb, bool sup0xccdb, bool repaireHeaders, bool rejectDistSTF) + : mFileName(pFileName), mVerbosity(verb), mSup0xccdb(sup0xccdb), mRepaireHeaders(repaireHeaders), mRejectDistSTF(rejectDistSTF) { mFileMap.open(mFileName); if (!mFileMap.is_open()) { @@ -178,13 +178,21 @@ Stack SubTimeFrameFileReader::getHeaderStack(std::size_t& pOrigsize) return Stack(lStackMem); } +const std::string SubTimeFrameFileReader::describeHeader(const o2::header::DataHeader& hd, bool full) const +{ + std::string res = fmt::format("{}", o2f::DataSpecUtils::describe(o2::framework::OutputSpec{hd.dataOrigin, hd.dataDescription, hd.subSpecification})); + if (full) { + res += fmt::format(" part:{}/{} sz:{} TF:{} Orb:{} Run:{}", hd.splitPayloadIndex, hd.splitPayloadParts, hd.payloadSize, hd.tfCounter, hd.firstTForbit, hd.runNumber); + } + return res; +} + std::uint32_t sRunNumber = 0; // TODO: add id to files metadata std::uint32_t sFirstTForbit = 0; // TODO: add id to files metadata std::uint64_t sCreationTime = 0; std::mutex stfMtx; -std::unique_ptr SubTimeFrameFileReader::read(fair::mq::Device* device, const std::vector& outputRoutes, - const std::string& rawChannel, size_t slice, bool sup0xccdb, bool repaireHeaders, int verbosity) +std::unique_ptr SubTimeFrameFileReader::read(fair::mq::Device* device, const std::vector& outputRoutes, const std::string& rawChannel, size_t slice) { std::unique_ptr messagesPerRoute = std::make_unique(); auto& msgMap = *messagesPerRoute.get(); @@ -252,13 +260,13 @@ std::unique_ptr SubTimeFrameFileReader::read(fair::mq::Device* return nullptr; } lStfMetaDataHdr = o2::header::DataHeader::Get(lMetaHdrStack.first()); - if (verbosity > 0) { + if (mVerbosity > 0) { LOGP(info, "read filemeta, pos = {}, size = {}", position(), sizeof(SubTimeFrameFileMeta)); } if (!read_advance(&lStfFileMeta, sizeof(SubTimeFrameFileMeta))) { return nullptr; } - if (verbosity > 0) { + if (mVerbosity > 0) { LOGP(info, "TFMeta : {}", lStfFileMeta.info()); } if (lStfFileMeta.mWriteTimeMs == 0 && creationFallBack != 0) { @@ -342,21 +350,18 @@ std::unique_ptr SubTimeFrameFileReader::read(fair::mq::Device* } DataHeader locDataHeader(*lDataHeader); - if (repaireHeaders) { - auto descHeader = [&locDataHeader]() { - return o2f::DataSpecUtils::describe(o2f::OutputSpec{locDataHeader.dataOrigin, locDataHeader.dataDescription, locDataHeader.subSpecification}); - }; + if (mRepaireHeaders) { if (locDataHeader == prevHeader) { if (prevHeader.tfCounter == locDataHeader.tfCounter && (prevHeader.splitPayloadIndex + 1) != locDataHeader.splitPayloadIndex) { - if (verbosity > 3) { - LOGP(warn, "Repairing wrong index {}/{} to {} for {}", locDataHeader.splitPayloadIndex, locDataHeader.splitPayloadParts, (prevHeader.splitPayloadIndex + 1) % prevHeader.splitPayloadParts, descHeader()); + if (mVerbosity > 3) { + LOGP(warn, "Repairing wrong part index for {} to {}", describeHeader(locDataHeader, true), (prevHeader.splitPayloadIndex + 1) % prevHeader.splitPayloadParts); } locDataHeader.splitPayloadIndex = (++prevHeader.splitPayloadIndex) % prevHeader.splitPayloadParts; } } else { // new header if (locDataHeader.splitPayloadIndex != 0) { - if (verbosity > 2) { - LOGP(warn, "Repairing wrong index {}/{} to 0 for new {}", locDataHeader.splitPayloadIndex, locDataHeader.splitPayloadParts, descHeader()); + if (mVerbosity > 2) { + LOGP(warn, "Repairing wrong part index for new {} to {}", describeHeader(locDataHeader, true), (prevHeader.splitPayloadIndex + 1) % prevHeader.splitPayloadParts); } locDataHeader.splitPayloadIndex = 0; } @@ -378,6 +383,18 @@ std::unique_ptr SubTimeFrameFileReader::read(fair::mq::Device* } locDataHeader.runNumber = runNumberFallBack; } + const std::uint64_t lDataSize = locDataHeader.payloadSize; + + if (locDataHeader.dataOrigin == o2::header::gDataOriginFLP && locDataHeader.dataDescription == o2::header::gDataDescriptionDISTSTF && mRejectDistSTF) { + if (mVerbosity > 0) { + LOGP(warn, "Ignoring stored {}", describeHeader(locDataHeader)); + } + if (!ignore_nbytes(lDataSize)) { + return nullptr; + } + lLeftToRead -= (lDataHeaderStackSize + lDataSize); // update the counter + continue; + } o2::header::Stack headerStack{locDataHeader, o2f::DataProcessingHeader{tfID, 1, lStfFileMeta.mWriteTimeMs}}; if (stfHeader.runNumber == -1) { stfHeader.id = locDataHeader.tfCounter; @@ -387,8 +404,6 @@ std::unique_ptr SubTimeFrameFileReader::read(fair::mq::Device* sRunNumber = stfHeader.runNumber; sFirstTForbit = stfHeader.firstOrbit; } - - const std::uint64_t lDataSize = locDataHeader.payloadSize; // do we accept these data? auto detOrigStatus = mDetOrigMap.find(locDataHeader.dataOrigin); if (detOrigStatus != mDetOrigMap.end() && !detOrigStatus->second) { // this is a detector data and we don't want to read it @@ -431,10 +446,10 @@ std::unique_ptr SubTimeFrameFileReader::read(fair::mq::Device* if (!read_advance(lDataMsg->GetData(), lDataSize)) { return nullptr; } - if (verbosity > 0) { - if (verbosity > 1 || locDataHeader.splitPayloadIndex == 0) { + if (mVerbosity > 0) { + if (mVerbosity > 1 || locDataHeader.splitPayloadIndex == 0) { printStack(headerStack); - if (o2::raw::RDHUtils::checkRDH(lDataMsg->GetData()) && verbosity > 2) { + if (o2::raw::RDHUtils::checkRDH(lDataMsg->GetData()) && mVerbosity > 2) { o2::raw::RDHUtils::printRDH(lDataMsg->GetData()); } } @@ -442,6 +457,9 @@ std::unique_ptr SubTimeFrameFileReader::read(fair::mq::Device* #ifdef _RUN_TIMING_MEASUREMENT_ addPartSW.Start(false); #endif + if (mVerbosity > 2) { + LOGP(info, "addPart {} to {} | HdrSize:{} DataSize:{}", describeHeader(locDataHeader, true), fmqChannel, lHdrStackMsg->GetSize(), lDataMsg->GetSize()); + } addPart(std::move(lHdrStackMsg), std::move(lDataMsg), fmqChannel); #ifdef _RUN_TIMING_MEASUREMENT_ addPartSW.Stop(); @@ -463,7 +481,7 @@ std::unique_ptr SubTimeFrameFileReader::read(fair::mq::Device* } unsigned stfSS[2] = {0, 0xccdb}; - for (int iss = 0; iss < (sup0xccdb ? 1 : 2); iss++) { + for (int iss = 0; iss < (mSup0xccdb ? 1 : 2); iss++) { o2::header::DataHeader stfDistDataHeader(o2::header::gDataDescriptionDISTSTF, o2::header::gDataOriginFLP, stfSS[iss], sizeof(STFHeader), 0, 1); stfDistDataHeader.payloadSerializationMethod = o2::header::gSerializationMethodNone; stfDistDataHeader.firstTForbit = stfHeader.firstOrbit; @@ -473,7 +491,7 @@ std::unique_ptr SubTimeFrameFileReader::read(fair::mq::Device* if (!fmqChannel.empty()) { // no output channel auto fmqFactory = device->GetChannel(fmqChannel, 0).Transport(); o2::header::Stack headerStackSTF{stfDistDataHeader, o2f::DataProcessingHeader{tfID, 1, lStfFileMeta.mWriteTimeMs}}; - if (verbosity > 0) { + if (mVerbosity > 0) { printStack(headerStackSTF); } auto hdMessageSTF = fmqFactory->CreateMessage(headerStackSTF.size(), fair::mq::Alignment{64}); @@ -483,6 +501,9 @@ std::unique_ptr SubTimeFrameFileReader::read(fair::mq::Device* #ifdef _RUN_TIMING_MEASUREMENT_ addPartSW.Start(false); #endif + if (mVerbosity > 2) { + LOGP(info, "addPart forced {} to {} | HdrSize:{} DataSize:{}", describeHeader(stfDistDataHeader, true), fmqChannel, hdMessageSTF->GetSize(), plMessageSTF->GetSize()); + } addPart(std::move(hdMessageSTF), std::move(plMessageSTF), fmqChannel); #ifdef _RUN_TIMING_MEASUREMENT_ addPartSW.Stop(); diff --git a/Detectors/Raw/TFReaderDD/src/TFReaderSpec.cxx b/Detectors/Raw/TFReaderDD/src/TFReaderSpec.cxx index f94edf8540ac8..d0de5fb893e3d 100644 --- a/Detectors/Raw/TFReaderDD/src/TFReaderSpec.cxx +++ b/Detectors/Raw/TFReaderDD/src/TFReaderSpec.cxx @@ -119,6 +119,7 @@ void TFReaderSpec::init(o2f::InitContext& ic) mInput.maxTFCache = std::max(1, ic.options().get("max-cached-tf")); mInput.maxFileCache = std::max(1, ic.options().get("max-cached-files")); mInput.repairHeaders = !ic.options().get("ignore-repair-headers"); + mInput.rejectDistSTF = !ic.options().get("read-dist-stf"); if (!mInput.fileRunTimeSpans.empty()) { loadRunTimeSpans(mInput.fileRunTimeSpans); @@ -265,7 +266,11 @@ void TFReaderSpec::run(o2f::ProcessingContext& ctx) setTimingInfo(*tfPtr.get()); size_t nparts = 0, dataSize = 0; if (mInput.sendDummyForMissing) { + int cntAck = 0; for (auto& msgIt : *tfPtr.get()) { // complete with empty output for the specs which were requested but were not seen in the data + if (mInput.verbosity > 0) { + LOGP(info, "acknowledgeOutput {}", cntAck++); + } acknowledgeOutput(*msgIt.second.get(), true); } addMissingParts(*tfPtr.get()); @@ -411,7 +416,7 @@ void TFReaderSpec::TFBuilder() } LOG(info) << "Processing file " << tfFileName; - SubTimeFrameFileReader reader(tfFileName, mInput.detMask); + SubTimeFrameFileReader reader(tfFileName, mInput.detMask, mInput.verbosity, mInput.sup0xccdb, mInput.repairHeaders, mInput.rejectDistSTF); size_t locID = 0; // try { @@ -423,7 +428,7 @@ void TFReaderSpec::TFBuilder() std::this_thread::sleep_for(sleepTime); continue; } - auto tf = reader.read(mDevice, mOutputRoutes, mInput.rawChannelConfig, mAccTFCounter, mInput.sup0xccdb, mInput.repairHeaders, mInput.verbosity); + auto tf = reader.read(mDevice, mOutputRoutes, mInput.rawChannelConfig, mAccTFCounter); bool acceptTF = true; if (tf) { if (mRunTimeRanges.size()) { @@ -678,6 +683,7 @@ o2f::DataProcessorSpec o2::rawdd::getTFReaderSpec(o2::rawdd::TFReaderInp& rinp) spec.options.emplace_back(o2f::ConfigParamSpec{"select-tf-ids", o2f::VariantType::String, "", {"comma-separated list TF IDs to inject (from cumulative counter of TFs seen)"}}); spec.options.emplace_back(o2f::ConfigParamSpec{"fetch-failure-threshold", o2f::VariantType::Float, 0.f, {"Fatil if too many failures( >0: fraction, <0: abs number, 0: no threshold)"}}); spec.options.emplace_back(o2f::ConfigParamSpec{"ignore-repair-headers", o2f::VariantType::Bool, false, {"do not check/repair headers"}}); + spec.options.emplace_back(o2f::ConfigParamSpec{"read-dist-stf", o2f::VariantType::Bool, false, {"do not ignore stored FLP/DISTSUBTIMEFRAME (will clash with injected one)"}}); spec.options.emplace_back(o2f::ConfigParamSpec{"max-tf", o2f::VariantType::Int, -1, {"max TF ID to process (<= 0 : infinite)"}}); spec.options.emplace_back(o2f::ConfigParamSpec{"max-tf-per-file", o2f::VariantType::Int, -1, {"max TFs to process per raw-tf file (<= 0 : infinite)"}}); spec.options.emplace_back(o2f::ConfigParamSpec{"max-cached-tf", o2f::VariantType::Int, 3, {"max TFs to cache in memory"}}); diff --git a/Detectors/Raw/TFReaderDD/src/TFReaderSpec.h b/Detectors/Raw/TFReaderDD/src/TFReaderSpec.h index 57dd283d9a1d5..6ecce0d032c06 100644 --- a/Detectors/Raw/TFReaderDD/src/TFReaderSpec.h +++ b/Detectors/Raw/TFReaderDD/src/TFReaderSpec.h @@ -50,6 +50,7 @@ struct TFReaderInp { bool sup0xccdb = false; bool invertIRFramesSelection = false; bool repairHeaders = true; + bool rejectDistSTF = true; std::vector hdVec; std::vector tfIDs{}; };