Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 81 additions & 55 deletions Detectors/MUON/MCH/Workflow/src/cru-page-reader-workflow.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

///
///
/// \file cru-page-reader-workflow.cxx
/// \author Andrea Ferrero
Expand Down Expand Up @@ -62,6 +63,7 @@ class FileReaderTask
LOG(INFO) << "initializing file reader";
mFrameMax = ic.options().get<int>("nframes");
mPrint = ic.options().get<bool>("print");
mFullHBF = ic.options().get<bool>("full-hbf");

auto inputFileName = ic.options().get<std::string>("infile");
mInputFile.open(inputFileName, std::ios::binary);
Expand All @@ -82,80 +84,103 @@ class FileReaderTask
{
/// send one RDH block via DPL
RDH rdh;
char* buf{nullptr};
size_t bufSize{0};

// stop if the required number of frames has been reached
if (mFrameMax == 0) {
pc.services().get<ControlService>().endOfStream();
return;
}
while (true) {

if (mPrint) {
printf("mFrameMax: %d\n", mFrameMax);
}
if (mFrameMax > 0) {
mFrameMax -= 1;
}
// stop if the required number of frames has been reached
if (mFrameMax == 0) {
pc.services().get<ControlService>().endOfStream();
return;
}

// read the next RDH, stop if no more data is available
mInputFile.read((char*)(&rdh), sizeof(RDH));
if (mInputFile.fail()) {
if (mPrint) {
std::cout << "end of file reached" << std::endl;
printf("mFrameMax: %d\n", mFrameMax);
}
if (mFrameMax > 0) {
mFrameMax -= 1;
}
pc.services().get<ControlService>().endOfStream();
return; // probably reached eof
}

// check that the RDH version is ok (only RDH versions from 4 to 6 are supported at the moment)
auto rdhVersion = o2::raw::RDHUtils::getVersion(rdh);
auto rdhHeaderSize = o2::raw::RDHUtils::getHeaderSize(rdh);
if (mPrint) {
std::cout << "header_version=" << (int)rdhVersion << std::endl;
}
if (rdhVersion < 4 || rdhVersion > 6 || rdhHeaderSize != 64) {
return;
}
// read the next RDH, stop if no more data is available
mInputFile.read((char*)(&rdh), sizeof(RDH));
if (mInputFile.fail()) {
if (mPrint) {
std::cout << "end of file reached" << std::endl;
}
pc.services().get<ControlService>().endOfStream();
return; // probably reached eof
}

// get the frame size from the RDH offsetToNext field
auto frameSize = o2::raw::RDHUtils::getOffsetToNext(rdh);
if (mPrint) {
std::cout << "frameSize=" << frameSize << std::endl;
}
// check that the RDH version is ok (only RDH versions from 4 to 6 are supported at the moment)
auto rdhVersion = o2::raw::RDHUtils::getVersion(rdh);
auto rdhHeaderSize = o2::raw::RDHUtils::getHeaderSize(rdh);
if (mPrint) {
std::cout << "header_version=" << (int)rdhVersion << std::endl;
}
if (rdhVersion < 4 || rdhVersion > 6 || rdhHeaderSize != 64) {
return;
}

// stop if the frame size is too small
if (frameSize < rdhHeaderSize) {
std::cout << mFrameMax << " - frameSize too small: " << frameSize << std::endl;
pc.services().get<ControlService>().endOfStream();
return;
}
// get the frame size from the RDH offsetToNext field
auto frameSize = o2::raw::RDHUtils::getOffsetToNext(rdh);
if (mPrint) {
std::cout << "frameSize=" << frameSize << std::endl;
}

// allocate the output buffer
char* buf = (char*)malloc(frameSize);
// stop if the frame size is too small
if (frameSize < rdhHeaderSize) {
std::cout << mFrameMax << " - frameSize too small: " << frameSize << std::endl;
pc.services().get<ControlService>().endOfStream();
return;
}

// copy the RDH into the output buffer
memcpy(buf, &rdh, rdhHeaderSize);
// allocate the output buffer
buf = (char*)realloc(buf, bufSize + frameSize);
if (buf == nullptr) {
std::cout << mFrameMax << " - failed to allocate buffer" << std::endl;
pc.services().get<ControlService>().endOfStream();
return;
}

// read the frame payload into the output buffer
mInputFile.read(buf + rdhHeaderSize, frameSize - rdhHeaderSize);
// copy the RDH into the output buffer
memcpy(buf + bufSize, &rdh, rdhHeaderSize);

// stop if data cannot be read completely
if (mInputFile.fail()) {
if (mPrint) {
std::cout << "end of file reached" << std::endl;
// read the frame payload into the output buffer
mInputFile.read(buf + bufSize + rdhHeaderSize, frameSize - rdhHeaderSize);

// stop if data cannot be read completely
if (mInputFile.fail()) {
if (mPrint) {
std::cout << "end of file reached" << std::endl;
}
free(buf);
pc.services().get<ControlService>().endOfStream();
return; // probably reached eof
}
free(buf);
pc.services().get<ControlService>().endOfStream();
return; // probably reached eof
}

// create the output message
auto freefct = [](void* data, void* /*hint*/) { free(data); };
pc.outputs().adoptChunk(Output{"ROUT", "RAWDATA"}, buf, frameSize, freefct, nullptr);
// increment the total buffer size
bufSize += frameSize;

auto stopBit = o2::raw::RDHUtils::getStop(rdh);

// when requesting full HBframes, the output message is sent only when the stop RDH is reached
// otherwise we send one message for each CRU page
if ((stopBit != 0) || (mFullHBF == false)) {
// create the output message
auto freefct = [](void* data, void* /*hint*/) { free(data); };
pc.outputs().adoptChunk(Output{"ROUT", "RAWDATA"}, buf, bufSize, freefct, nullptr);

// stop the readout loop
break;
}
} // while (true)
}

private:
std::ifstream mInputFile{}; ///< input file
int mFrameMax; ///< number of frames to process
bool mFullHBF; ///< send full HeartBeat frames
bool mPrint = false; ///< print debug messages
};

Expand All @@ -170,6 +195,7 @@ o2::framework::DataProcessorSpec getFileReaderSpec()
AlgorithmSpec{adaptFromTask<FileReaderTask>()},
Options{{"infile", VariantType::String, "", {"input file name"}},
{"nframes", VariantType::Int, -1, {"number of frames to process"}},
{"full-hbf", VariantType::Bool, false, {"send full HeartBeat frames"}},
{"print", VariantType::Bool, false, {"verbose output"}}}};
}
// clang-format on
Expand Down