From 46e647595dce2a9d178ea3906ce05747d2a8b7b7 Mon Sep 17 00:00:00 2001 From: Giulio Eulisse Date: Fri, 16 Jun 2017 15:27:53 +0200 Subject: [PATCH 1/2] Introduce PayloadMerger helper class This is now done by introducing a new generic class PayloadMerger which can be used to merge together multiple payloads into a given buffer, which can then be sent via O2. The PayloadMerger class can be configured w.r.t.: - Given a Payload, determine is the unique identifier for the class of equivalence it belongs to. - Given an id, determine wether or not the class of equivalence it represents is complete. - Given a payload, extract the parts you are interested from it. --- Utilities/DataFlow/CMakeLists.txt | 4 +- .../DataFlow/include/DataFlow/PayloadMerger.h | 123 ++++++++++++++++++ .../DataFlow/include/DataFlow/SubframeUtils.h | 45 +++++++ .../DataFlow/test/test_PayloadMerger01.cxx | 82 ++++++++++++ .../DataFlow/test/test_SubframeUtils01.cxx | 59 +++++++++ 5 files changed, 311 insertions(+), 2 deletions(-) create mode 100644 Utilities/DataFlow/include/DataFlow/PayloadMerger.h create mode 100644 Utilities/DataFlow/include/DataFlow/SubframeUtils.h create mode 100644 Utilities/DataFlow/test/test_PayloadMerger01.cxx create mode 100644 Utilities/DataFlow/test/test_SubframeUtils01.cxx diff --git a/Utilities/DataFlow/CMakeLists.txt b/Utilities/DataFlow/CMakeLists.txt index 0f891fcd1bd39..ea510647701b6 100644 --- a/Utilities/DataFlow/CMakeLists.txt +++ b/Utilities/DataFlow/CMakeLists.txt @@ -85,6 +85,8 @@ O2_GENERATE_EXECUTABLE( set(TEST_SRCS test/test_TimeframeParser.cxx + test/test_SubframeUtils01.cxx + test/test_PayloadMerger01.cxx ) O2_GENERATE_TESTS( @@ -94,5 +96,3 @@ O2_GENERATE_TESTS( O2_GENERATE_MAN(NAME TimeframeReaderDevice) O2_GENERATE_MAN(NAME TimeframeWriterDevice) - -target_compile_options(test_TimeframeParser PUBLIC -O0 -g) diff --git a/Utilities/DataFlow/include/DataFlow/PayloadMerger.h b/Utilities/DataFlow/include/DataFlow/PayloadMerger.h new file mode 100644 index 0000000000000..0e60b53c5ce6f --- /dev/null +++ b/Utilities/DataFlow/include/DataFlow/PayloadMerger.h @@ -0,0 +1,123 @@ +// Copyright CERN and copyright holders of ALICE O2. This software is +// distributed under the terms of the GNU General Public License v3 (GPL +// Version 3), copied verbatim in the file "COPYING". +// +// See https://alice-o2.web.cern.ch/ for full licensing information. +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +#ifndef PAYLOAD_MERGER_H +#define PAYLOAD_MERGER_H + +#include +#include +#include +#include +#include + +#include + +namespace o2 { namespace dataflow { +/// Helper class that given a set of FairMQMessage, merges (part of) their +/// payload into a separate memory area. +/// +/// - Append multiple messages via the aggregate method +/// - Finalise buffer creation with the finalise call. +template +class PayloadMerger { + static int64_t fullPayloadExtractor(char **payload, + char *buffer, + size_t bufferSize) { + *payload = buffer; + return bufferSize; + } +public: + using MergeableId = ID; + using MessageMap = std::multimap>; + using PayloadExtractor = std::function; + using IdExtractor = std::function&)>; + using MergeCompletionCheker = std::function; + + /// Helper class to merge FairMQMessages sharing a user defined class of equivalence, + /// specified by @makeId. Completeness of the class of equivalence can be asserted by + /// the @checkIfComplete policy. It's also possible to specify a user defined way of + /// extracting the parts of the payload to be merged via the extractPayload method. + PayloadMerger(IdExtractor makeId, + MergeCompletionCheker checkIfComplete, + PayloadExtractor extractPayload = fullPayloadExtractor) + : + mMakeId{makeId}, + mCheckIfComplete{checkIfComplete}, + mExtractPayload{extractPayload} + { + } + + /// Aggregates @payload to all the ones with the same id. + /// @return the id extracted from the payload via the constructor + /// specified id policy (mMakeId callback). + MergeableId aggregate(std::unique_ptr &payload) { + auto id = mMakeId(payload); + mPartsMap.emplace(std::make_pair(id, std::move(payload))); + return id; + } + + /// This merges a set of messages sharing the same id @id to a unique buffer + /// @out, so that it can be either consumed or sent as a message itself. + /// The decision on whether the merge must happen is done by the constructor + /// specified policy mCheckIfComplete which can, for example, decide + /// to merge when a certain number of subparts are reached. + /// Merging at the moment requires an extra copy, but in principle this could + /// be easily extended to support scatter - gather. + size_t finalise(char **out, MergeableId &id) { + *out = nullptr; + if (mCheckIfComplete(id, mPartsMap) == false) { + return 0; + } + // If we are here, it means we can send the messages that belong + // to some predefined class of equivalence, identified by the MERGEABLE_ID, + // to the receiver. This is done by the following process: + // + // - Extract what we actually want to send (this might be data embedded inside the message itself) + // - Calculate the aggregate size of all the payloads. + // - Copy all the parts into a final payload + // - Create the header part + // - Create the payload part + // - Send + std::vector> parts; + + size_t sum = 0; + auto range = mPartsMap.equal_range(id); + for (auto hi = range.first, he = range.second; hi != he; ++hi) { + std::unique_ptr &payload = hi->second; + std::pair part; + part.second = mExtractPayload(&part.first, reinterpret_cast(payload->GetData()), payload->GetSize()); + parts.push_back(part); + sum += part.second; + } + + auto *payload = new char[sum](); + size_t offset = 0; + for (auto &part : parts) { + // Right now this does a copy. In principle this could be done with some sort of + // vectorized I/O + memcpy(payload + offset, part.first, part.second); + offset += part.second; + } + + mPartsMap.erase(id); + *out = payload; + return sum; + } + +private: + IdExtractor mMakeId; + MergeCompletionCheker mCheckIfComplete; + PayloadExtractor mExtractPayload; + + MessageMap mPartsMap; +}; +} /* dataflow */ +} /* o2 */ + +#endif // PAYLOAD_MERGER_H diff --git a/Utilities/DataFlow/include/DataFlow/SubframeUtils.h b/Utilities/DataFlow/include/DataFlow/SubframeUtils.h new file mode 100644 index 0000000000000..889dc4c158379 --- /dev/null +++ b/Utilities/DataFlow/include/DataFlow/SubframeUtils.h @@ -0,0 +1,45 @@ +// Copyright CERN and copyright holders of ALICE O2. This software is +// distributed under the terms of the GNU General Public License v3 (GPL +// Version 3), copied verbatim in the file "COPYING". +// +// See https://alice-o2.web.cern.ch/ for full licensing information. +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +#ifndef DATAFLOW_SUBFRAMEUTILS_H +#define DATAFLOW_SUBFRAMEUTILS_H + +#include +#include +#include "Headers/HeartbeatFrame.h" + +namespace o2 { namespace dataflow { + +int64_t extractDetectorPayloadStrip(char **payload, char *buffer, size_t bufferSize) { + *payload = buffer + sizeof(o2::Header::HeartbeatHeader); + return bufferSize - sizeof(o2::Header::HeartbeatHeader) - sizeof(o2::Header::HeartbeatTrailer); +} + + +struct SubframeId { + size_t timeframeId; + size_t socketId; + + // operator needed for the equal_range algorithm/ multimap method + bool operator<(const SubframeId& rhs) const { + return std::tie(timeframeId, socketId) < std::tie(rhs.timeframeId, rhs.socketId); + } +}; + +SubframeId makeIdFromHeartbeatHeader(const Header::HeartbeatHeader &header, size_t socketId, size_t orbitsPerTimeframe) { + SubframeId id = { + .timeframeId = header.orbit / orbitsPerTimeframe, + .socketId = socketId + }; + return id; +} + +} /* namespace dataflow */ } /* namespace o2 */ + +#endif // DATAFLOW_SUBFRAMEUTILS_H diff --git a/Utilities/DataFlow/test/test_PayloadMerger01.cxx b/Utilities/DataFlow/test/test_PayloadMerger01.cxx new file mode 100644 index 0000000000000..cd951e1229e2d --- /dev/null +++ b/Utilities/DataFlow/test/test_PayloadMerger01.cxx @@ -0,0 +1,82 @@ +// Copyright CERN and copyright holders of ALICE O2. This software is +// distributed under the terms of the GNU General Public License v3 (GPL +// Version 3), copied verbatim in the file "COPYING". +// +// See https://alice-o2.web.cern.ch/ for full licensing information. +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +#define BOOST_TEST_MODULE Test Utilities DataFlowTest +#define BOOST_TEST_MAIN +#define BOOST_TEST_DYN_LINK + +#include + +#include +#include +#include "DataFlow/PayloadMerger.h" +#include "DataFlow/FakeTimeframeBuilder.h" +#include "DataFlow/TimeframeParser.h" +#include "DataFlow/SubframeUtils.h" +#include "fairmq/FairMQTransportFactory.h" +#include "fairmq/FairMQParts.h" +#include "fairmq/FairMQDevice.h" + +using SubframeId = o2::dataflow::SubframeId; +using HeartbeatHeader = o2::Header::HeartbeatHeader; +using HeartbeatTrailer = o2::Header::HeartbeatTrailer; + +SubframeId fakeAddition(o2::dataflow::PayloadMerger &merger, + std::unique_ptr &transport, + int64_t orbit) { + // Create a message + // + // We set orbit to be always the same and the actual contents to be 127 + static size_t dummyMessageSize = 1000; + auto msg = transport->CreateMessage(dummyMessageSize); + char *b = reinterpret_cast(msg->GetData()) + sizeof(HeartbeatHeader); + for (size_t i = 0; i < (dummyMessageSize - sizeof(HeartbeatHeader)); ++i) { + b[i] = orbit; + } + b[0] = 127; + HeartbeatHeader *header = reinterpret_cast(msg->GetData()); + header->orbit = orbit; + return merger.aggregate(msg); +} + +BOOST_AUTO_TEST_CASE(PayloadMergerTest) { + std::unique_ptr zmq(FairMQDevice::MakeTransport("zeromq")); + + // Needs three subtimeframes to merge them + auto checkIfComplete = [](SubframeId id, o2::dataflow::PayloadMerger::MessageMap &m) -> bool { + return m.count(id) >= 3; + }; + + // Id is given by the orbit, 2 orbits per timeframe + auto makeId = [](std::unique_ptr &msg) { + auto header = reinterpret_cast(msg->GetData()); + return o2::dataflow::makeIdFromHeartbeatHeader(*header, 0, 2); + }; + + o2::dataflow::PayloadMerger merger(makeId, checkIfComplete, o2::dataflow::extractDetectorPayloadStrip); + char *finalBuf = new char[3000]; + size_t finalSize = 0; + auto id = fakeAddition(merger, zmq, 1); + finalSize = merger.finalise(&finalBuf, id); + BOOST_CHECK(finalSize == 0); // Not enough parts, not merging yet. + id = fakeAddition(merger, zmq, 1); + finalSize = merger.finalise(&finalBuf, id); + BOOST_CHECK(finalSize == 0); // Not enough parts, not merging yet. + id = fakeAddition(merger, zmq, 2); + finalSize = merger.finalise(&finalBuf, id); + BOOST_CHECK(finalSize == 0); // Different ID, not merging yet. + id = fakeAddition(merger, zmq, 1); + finalSize = merger.finalise(&finalBuf, id); + BOOST_CHECK(finalSize); // Now we merge! + size_t partSize = (1000-sizeof(HeartbeatHeader) - sizeof(HeartbeatTrailer)); + BOOST_CHECK(finalSize == 3*partSize); // This should be the calculated size + for (size_t i = 0; i < finalSize; ++i) { + BOOST_CHECK(finalBuf[i] == ((i % partSize) == 0 ? 127 : 1)); + } +} diff --git a/Utilities/DataFlow/test/test_SubframeUtils01.cxx b/Utilities/DataFlow/test/test_SubframeUtils01.cxx new file mode 100644 index 0000000000000..30b4065592e4a --- /dev/null +++ b/Utilities/DataFlow/test/test_SubframeUtils01.cxx @@ -0,0 +1,59 @@ +// Copyright CERN and copyright holders of ALICE O2. This software is +// distributed under the terms of the GNU General Public License v3 (GPL +// Version 3), copied verbatim in the file "COPYING". +// +// See https://alice-o2.web.cern.ch/ for full licensing information. +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +#define BOOST_TEST_MODULE Test Utilities DataFlowTest +#define BOOST_TEST_MAIN +#define BOOST_TEST_DYN_LINK + +#include "DataFlow/SubframeUtils.h" +#include +#include + +BOOST_AUTO_TEST_CASE(SubframeUtils01) { + o2::dataflow::SubframeId a; + a.timeframeId = 0; + a.socketId = 1; + o2::dataflow::SubframeId b; + b.timeframeId = 1; + b.socketId = 0; + BOOST_CHECK(a < b); + char *buf = new char[1000]; + memset(buf, 126, 1000); + for (size_t i = sizeof(o2::Header::HeartbeatHeader); i < 1000 - sizeof(o2::Header::HeartbeatHeader); ++i) + { + buf[i] = 0; + } + BOOST_CHECK(buf[0] == 126); + BOOST_CHECK(buf[sizeof(o2::Header::HeartbeatHeader)] == 0); + BOOST_CHECK(buf[sizeof(o2::Header::HeartbeatHeader)-1] == 126); + char *realPayload = nullptr; + size_t realSize = o2::dataflow::extractDetectorPayloadStrip(&realPayload, buf, 1000); + BOOST_CHECK(realPayload != nullptr); + BOOST_CHECK(realSize == 1000 - sizeof(o2::Header::HeartbeatHeader) - sizeof(o2::Header::HeartbeatTrailer)); + BOOST_CHECK(realPayload == buf + sizeof(o2::Header::HeartbeatHeader)); + BOOST_CHECK(realPayload[0] == 0); + + o2::Header::HeartbeatHeader header1; + header1.orbit = 0; + o2::Header::HeartbeatHeader header2; + header2.orbit = 255; + o2::Header::HeartbeatHeader header3; + header3.orbit = 256; + + auto id1 = o2::dataflow::makeIdFromHeartbeatHeader(header1, 1, 256); + auto id2 = o2::dataflow::makeIdFromHeartbeatHeader(header2, 1, 256); + auto id3 = o2::dataflow::makeIdFromHeartbeatHeader(header3, 1, 256); + BOOST_CHECK(!(id1 < id2)); // Maybe we should provide an == operator + BOOST_CHECK(!(id2 < id1)); + BOOST_CHECK(id1 < id3); + BOOST_CHECK(id2 < id3); + BOOST_CHECK(id1.timeframeId == 0); + BOOST_CHECK(id3.timeframeId == 1); +} From d0b798317997af99f115388521a4ff1f702155ae Mon Sep 17 00:00:00 2001 From: Giulio Eulisse Date: Fri, 16 Jun 2017 15:21:50 +0200 Subject: [PATCH 2/2] Adapt SubframeBuilderDevice to use PayloadMerger --- Utilities/DataFlow/CMakeLists.txt | 2 + .../DataFlow/doc/SubframeBuilderDevice.1.in | 48 ++++++ .../DataFlow/include/DataFlow/PayloadMerger.h | 13 +- .../include/DataFlow/SubframeBuilderDevice.h | 29 ++-- .../DataFlow/src/SubframeBuilderDevice.cxx | 140 ++++++++---------- .../DataFlow/src/runSubframeBuilderDevice.cxx | 23 ++- doc/o2.1.in | 2 +- 7 files changed, 157 insertions(+), 100 deletions(-) create mode 100644 Utilities/DataFlow/doc/SubframeBuilderDevice.1.in diff --git a/Utilities/DataFlow/CMakeLists.txt b/Utilities/DataFlow/CMakeLists.txt index ea510647701b6..b3074a9f848d2 100644 --- a/Utilities/DataFlow/CMakeLists.txt +++ b/Utilities/DataFlow/CMakeLists.txt @@ -76,6 +76,7 @@ ForEach (_file RANGE 0 ${_length}) ) EndForEach (_file RANGE 0 ${_length}) + O2_GENERATE_EXECUTABLE( EXE_NAME TimeframeValidationTool SOURCES src/TimeframeValidationTool @@ -96,3 +97,4 @@ O2_GENERATE_TESTS( O2_GENERATE_MAN(NAME TimeframeReaderDevice) O2_GENERATE_MAN(NAME TimeframeWriterDevice) +O2_GENERATE_MAN(NAME SubframeBuilderDevice) diff --git a/Utilities/DataFlow/doc/SubframeBuilderDevice.1.in b/Utilities/DataFlow/doc/SubframeBuilderDevice.1.in new file mode 100644 index 0000000000000..e6e0fafde1347 --- /dev/null +++ b/Utilities/DataFlow/doc/SubframeBuilderDevice.1.in @@ -0,0 +1,48 @@ +.\" Manpage for SubframeBuilderDevice. +.TH AliceO2 1 "12 May 2017" "1.0" "SubframeBuilderDevice man page" + +.SH NAME + +SubframeBuilderDevice - aggregate HBF in input as a single STF + +.SH SYNOPSIS + +SubframeBuilderDevice [options] + +.SH DESCRIPTION + +SubframeBuilderDevice will take in input a number of HeartBeat Frames +(HBF) and merge them in a single STF, with a policy defined by the +passed options. + +.SH OPTIONS + +--self-triggered Time frame duration + +.TP 5 + +--in-chan-name [NAME] Name of the input channel + +.TP 5 + +--out-chan-name [NAME] Name of the output channel + +.TP 5 + +--detector-name [NAME] Name of detector as data source + +.TP 5 + +--flp-id arg [NAME] ID of the FLP used as data source + +.TP 5 + +--strip-hbf Strip HeartBeatHeader (HBH) & HeartBeatTrailer (HBT) from each HBF + +.SH SEE ALSO + +FLPSenderDEvice(1), EPNReceiverDevice(1), HeartbeatSampler(1), TimeframeValidator(1) + +.SH BUGS + +Lots of bugs diff --git a/Utilities/DataFlow/include/DataFlow/PayloadMerger.h b/Utilities/DataFlow/include/DataFlow/PayloadMerger.h index 0e60b53c5ce6f..a68ba09d24f85 100644 --- a/Utilities/DataFlow/include/DataFlow/PayloadMerger.h +++ b/Utilities/DataFlow/include/DataFlow/PayloadMerger.h @@ -26,12 +26,6 @@ namespace o2 { namespace dataflow { /// - Finalise buffer creation with the finalise call. template class PayloadMerger { - static int64_t fullPayloadExtractor(char **payload, - char *buffer, - size_t bufferSize) { - *payload = buffer; - return bufferSize; - } public: using MergeableId = ID; using MessageMap = std::multimap>; @@ -110,6 +104,13 @@ class PayloadMerger { return sum; } + // Helper method which leaves the payload untouched + static int64_t fullPayloadExtractor(char **payload, + char *buffer, + size_t bufferSize) { + *payload = buffer; + return bufferSize; + } private: IdExtractor mMakeId; MergeCompletionCheker mCheckIfComplete; diff --git a/Utilities/DataFlow/include/DataFlow/SubframeBuilderDevice.h b/Utilities/DataFlow/include/DataFlow/SubframeBuilderDevice.h index c6f6e570d60b9..9bbb0d1482af5 100644 --- a/Utilities/DataFlow/include/DataFlow/SubframeBuilderDevice.h +++ b/Utilities/DataFlow/include/DataFlow/SubframeBuilderDevice.h @@ -21,7 +21,10 @@ #include "Headers/DataHeader.h" #include "Headers/HeartbeatFrame.h" #include "O2Device/O2Device.h" +#include "DataFlow/PayloadMerger.h" +#include "DataFlow/SubframeUtils.h" #include +#include class FairMQParts; @@ -51,22 +54,26 @@ namespace DataFlow { class SubframeBuilderDevice : public Base::O2Device { public: - typedef o2::Base::O2Message O2Message; + using O2Message = o2::Base::O2Message; + using SubframeId = o2::dataflow::SubframeId; + using Merger = dataflow::PayloadMerger; static constexpr const char* OptionKeyInputChannelName = "in-chan-name"; static constexpr const char* OptionKeyOutputChannelName = "out-chan-name"; - static constexpr const char* OptionKeyDuration = "duration"; - static constexpr const char* OptionKeySelfTriggered = "self-triggered"; + static constexpr const char* OptionKeyOrbitDuration = "orbit-duration"; + static constexpr const char* OptionKeyOrbitsPerTimeframe = "orbits-per-timeframe"; static constexpr const char* OptionKeyInDataFile = "indatafile-name"; static constexpr const char* OptionKeyDetector = "detector-name"; + static constexpr const char* OptionKeyFLPId = "flp-id"; + static constexpr const char* OptionKeyStripHBF = "strip-hbf"; // TODO: this is just a first mockup, remove it - // Default duration is for now harcoded to 22 milliseconds. // Default start time for all the producers is 8/4/1977 // Timeframe start time will be ((N * duration) + start time) where // N is the incremental number of timeframes being sent out. // TODO: replace this with a unique Heartbeat from a common device. - static constexpr uint32_t DefaultDuration = 22000000; + static constexpr uint32_t DefaultOrbitDuration = 88924; + static constexpr uint32_t DefaultOrbitsPerTimeframe = 256; static constexpr uint64_t DefaultHeartbeatStart = 229314600000000000LL; /// Default constructor @@ -93,13 +100,15 @@ class SubframeBuilderDevice : public Base::O2Device bool BuildAndSendFrame(FairMQParts &parts); private: - unsigned mFrameNumber = 0; - constexpr static uint32_t mOrbitsPerTimeframe = 1; - constexpr static uint32_t mOrbitDuration = 1000000000; - constexpr static uint32_t mDuration = mOrbitsPerTimeframe * mOrbitDuration; + uint32_t mOrbitsPerTimeframe; + // FIXME: lookup the actual value + uint32_t mOrbitDuration; std::string mInputChannelName = ""; std::string mOutputChannelName = ""; - bool mIsSelfTriggered = false; + size_t mFLPId = 0; + bool mStripHBF = false; + std::unique_ptr mMerger; + uint64_t mHeartbeatStart = DefaultHeartbeatStart; template diff --git a/Utilities/DataFlow/src/SubframeBuilderDevice.cxx b/Utilities/DataFlow/src/SubframeBuilderDevice.cxx index 8c9aa43a99b6c..974fae8b921a0 100644 --- a/Utilities/DataFlow/src/SubframeBuilderDevice.cxx +++ b/Utilities/DataFlow/src/SubframeBuilderDevice.cxx @@ -18,27 +18,20 @@ #include #include "DataFlow/SubframeBuilderDevice.h" +#include "DataFlow/SubframeUtils.h" #include "Headers/SubframeMetadata.h" #include "Headers/HeartbeatFrame.h" #include "Headers/DataHeader.h" #include -// From C++11 on, constexpr static data members are implicitly inlined. Redeclaration -// is still permitted, but deprecated. Some compilers do not implement this standard -// correctly. It also has to be noticed that this error does not occur for all the -// other public constexpr members -constexpr uint32_t o2::DataFlow::SubframeBuilderDevice::mOrbitsPerTimeframe; -constexpr uint32_t o2::DataFlow::SubframeBuilderDevice::mOrbitDuration; -constexpr uint32_t o2::DataFlow::SubframeBuilderDevice::mDuration; - using HeartbeatHeader = o2::Header::HeartbeatHeader; using HeartbeatTrailer = o2::Header::HeartbeatTrailer; using DataHeader = o2::Header::DataHeader; +using SubframeId = o2::dataflow::SubframeId; o2::DataFlow::SubframeBuilderDevice::SubframeBuilderDevice() : O2Device() { - LOG(INFO) << "o2::DataFlow::SubframeBuilderDevice::SubframeBuilderDevice " << mDuration << "\n"; } o2::DataFlow::SubframeBuilderDevice::~SubframeBuilderDevice() @@ -46,35 +39,57 @@ o2::DataFlow::SubframeBuilderDevice::~SubframeBuilderDevice() void o2::DataFlow::SubframeBuilderDevice::InitTask() { -// mDuration = GetConfig()->GetValue(OptionKeyDuration); - mIsSelfTriggered = GetConfig()->GetValue(OptionKeySelfTriggered); + mOrbitDuration = GetConfig()->GetValue(OptionKeyOrbitDuration); + mOrbitsPerTimeframe = GetConfig()->GetValue(OptionKeyOrbitsPerTimeframe); mInputChannelName = GetConfig()->GetValue(OptionKeyInputChannelName); mOutputChannelName = GetConfig()->GetValue(OptionKeyOutputChannelName); - - if (!mIsSelfTriggered) { - // depending on whether the device is self-triggered or expects input, - // the handler function needs to be registered or not. - // ConditionalRun is not called anymore from the base class if the - // callback is registered - LOG(INFO) << "Obtaining data from DataPublisher\n"; - OnData(mInputChannelName.c_str(), &o2::DataFlow::SubframeBuilderDevice::HandleData); - } else { - LOG(INFO) << "Self triggered mode. Doing nothing for now.\n"; - } - LOG(INFO) << "o2::DataFlow::SubframeBuilderDevice::InitTask " << mDuration << "\n"; -} - -// FIXME: how do we actually find out the payload size??? -int64_t extractDetectorPayload(char **payload, char *buffer, size_t bufferSize) { - *payload = buffer + sizeof(HeartbeatHeader); - return bufferSize - sizeof(HeartbeatHeader) - sizeof(HeartbeatTrailer); + mFLPId= GetConfig()->GetValue(OptionKeyFLPId); + mStripHBF= GetConfig()->GetValue(OptionKeyStripHBF); + + LOG(INFO) << "Obtaining data from DataPublisher\n"; + // Now that we have all the information lets create the policies to do the + // payload extraction and merging and create the actual PayloadMerger. + + // We extract the timeframeId from the number of orbits. + // FIXME: handle multiple socket ids + Merger::IdExtractor makeId = [this](std::unique_ptr &msg) -> SubframeId { + HeartbeatHeader *hbh = reinterpret_cast(msg->GetData()); + SubframeId id = {.timeframeId = hbh->orbit / this->mOrbitsPerTimeframe, + .socketId = 0 }; + return id; + }; + + // We extract the payload differently depending on wether we want to strip + // the header or not. + Merger::PayloadExtractor payloadExtractor = [this](char **out, char *in, size_t inSize) -> size_t { + if (!this->mStripHBF) { + return Merger::fullPayloadExtractor(out, in, inSize); + } + return o2::dataflow::extractDetectorPayloadStrip(out, in, inSize); + }; + + // Whether a given timeframe is complete depends on how many orbits per + // timeframe we want. + Merger::MergeCompletionCheker checkIfComplete = + [this](Merger::MergeableId id, Merger::MessageMap &map) { + return map.count(id) < this->mOrbitsPerTimeframe; + }; + + mMerger.reset(new Merger(makeId, checkIfComplete, payloadExtractor)); + OnData(mInputChannelName.c_str(), &o2::DataFlow::SubframeBuilderDevice::HandleData); } bool o2::DataFlow::SubframeBuilderDevice::BuildAndSendFrame(FairMQParts &inParts) { - LOG(INFO) << "o2::DataFlow::SubframeBuilderDevice::BuildAndSendFrame" << mDuration << "\n"; - char *incomingBuffer = (char *)inParts.At(1)->GetData(); - HeartbeatHeader *hbh = reinterpret_cast(incomingBuffer); + auto id = mMerger->aggregate(inParts.At(1)); + + char **outBuffer; + size_t outSize = mMerger->finalise(outBuffer, id); + // In this case we do not have enough subtimeframes for id, + // so we simply return. + if (outSize == 0) + return true; + // If we reach here, it means we do have enough subtimeframes. // top level subframe header, the DataHeader is going to be used with // description "SUBTIMEFRAMEMD" @@ -83,60 +98,33 @@ bool o2::DataFlow::SubframeBuilderDevice::BuildAndSendFrame(FairMQParts &inParts // all CRUs of a FLP in all cases serve a single detector o2::Header::DataHeader dh; dh.dataDescription = o2::Header::DataDescription("SUBTIMEFRAMEMD"); - dh.dataOrigin = o2::Header::DataOrigin("TEST"); - dh.subSpecification = 0; + dh.dataOrigin = o2::Header::DataOrigin("FLP"); + dh.subSpecification = mFLPId; dh.payloadSize = sizeof(SubframeMetadata); + DataHeader payloadheader(*o2::Header::get((byte*)inParts.At(0)->GetData())); + // subframe meta information as payload SubframeMetadata md; - // md.startTime = (hbh->orbit / mOrbitsPerTimeframe) * mDuration; - md.startTime = static_cast(hbh->orbit) * static_cast(mOrbitDuration); - md.duration = mDuration; - LOG(INFO) << "Start time for subframe (" << hbh->orbit << ", " - << mDuration - << ") " << timeframeIdFromTimestamp(md.startTime, mDuration) << " " << md.startTime<< "\n"; - - // send an empty subframe (no detector payload), only the data header - // and the subframe meta data are added to the sub timeframe - // TODO: this is going to be changed as soon as the device implements - // handling of the input data + // id is really the first orbit in the timeframe. + md.startTime = id.timeframeId * mOrbitsPerTimeframe * static_cast(mOrbitDuration); + md.duration = mOrbitDuration*mOrbitsPerTimeframe; + LOG(INFO) << "Start time for subframe (" << md.startTime << ", " + << md.duration + << ")"; + + // Add the metadata about the merged subtimeframes + // FIXME: do we really need this? O2Message outgoing; - - // build multipart message from header and payload AddMessage(outgoing, dh, NewSimpleMessage(md)); - char *sourcePayload = nullptr; - auto payloadSize = extractDetectorPayload(&sourcePayload, - incomingBuffer, - inParts.At(1)->GetSize()); - LOG(INFO) << "Got " << inParts.Size() << " parts\n"; - for (auto pi = 0; pi < inParts.Size(); ++pi) - { - LOG(INFO) << " Part " << pi << ": " << inParts.At(pi)->GetSize() << " bytes \n"; - } - if (payloadSize <= 0) - { - LOG(ERROR) << "Payload is too small: " << payloadSize << "\n"; - return true; - } - else - { - LOG(INFO) << "Payload of size " << payloadSize << "received\n"; - } - - auto *payload = new char[payloadSize](); - memcpy(payload, sourcePayload, payloadSize); - DataHeader payloadheader(*o2::Header::get((byte*)inParts.At(0)->GetData())); - - payloadheader.subSpecification = 0; - payloadheader.payloadSize = payloadSize; - - // FIXME: take care of multiple HBF per SubtimeFrame + // Add the actual merged payload. AddMessage(outgoing, payloadheader, - NewMessage(payload, payloadSize, - [](void* data, void* hint) { delete[] reinterpret_cast(hint); }, payload)); + NewMessage(*outBuffer, outSize, + [](void* data, void* hint) { delete[] reinterpret_cast(hint); }, *outBuffer)); // send message Send(outgoing, mOutputChannelName.c_str()); + // FIXME: do we actually need this? outgoing should go out of scope outgoing.fParts.clear(); return true; diff --git a/Utilities/DataFlow/src/runSubframeBuilderDevice.cxx b/Utilities/DataFlow/src/runSubframeBuilderDevice.cxx index 6bbf45adce8ef..a3ed59e406ef4 100644 --- a/Utilities/DataFlow/src/runSubframeBuilderDevice.cxx +++ b/Utilities/DataFlow/src/runSubframeBuilderDevice.cxx @@ -13,15 +13,18 @@ namespace bpo = boost::program_options; +constexpr uint32_t o2::DataFlow::SubframeBuilderDevice::DefaultOrbitDuration; +constexpr uint32_t o2::DataFlow::SubframeBuilderDevice::DefaultOrbitsPerTimeframe; + void addCustomOptions(bpo::options_description& options) { options.add_options() -// (AliceO2::DataFlow::SubframeBuilderDevice::OptionKeyDuration, -// bpo::value()->default_value(10000), -// "Time frame duration") - (o2::DataFlow::SubframeBuilderDevice::OptionKeySelfTriggered, - bpo::bool_switch()->default_value(false), - "Time frame duration") + (o2::DataFlow::SubframeBuilderDevice::OptionKeyOrbitDuration, + bpo::value()->default_value(o2::DataFlow::SubframeBuilderDevice::DefaultOrbitDuration), + "Orbit duration") + (o2::DataFlow::SubframeBuilderDevice::OptionKeyOrbitsPerTimeframe, + bpo::value()->default_value(o2::DataFlow::SubframeBuilderDevice::DefaultOrbitsPerTimeframe), + "Orbits per timeframe") (o2::DataFlow::SubframeBuilderDevice::OptionKeyInputChannelName, bpo::value()->default_value("input"), "Name of the input channel") @@ -30,7 +33,13 @@ void addCustomOptions(bpo::options_description& options) "Name of the output channel") (o2::DataFlow::SubframeBuilderDevice::OptionKeyDetector, bpo::value()->default_value("TPC"), - "Name of detector as data source"); + "Name of detector as data source") + (o2::DataFlow::SubframeBuilderDevice::OptionKeyFLPId, + bpo::value()->default_value(0), + "ID of the FLP used as data source") + (o2::DataFlow::SubframeBuilderDevice::OptionKeyStripHBF, + bpo::bool_switch()->default_value(false), + "Strip HBH & HBT from each HBF"); } FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) diff --git a/doc/o2.1.in b/doc/o2.1.in index 2a91c5a9908ef..c1818a3cf64f9 100644 --- a/doc/o2.1.in +++ b/doc/o2.1.in @@ -7,7 +7,7 @@ O2 is Alice next generation software framework to be used for RUN3. .SH DEVICES -AliceHLTWrapperDevice(1) +AliceHLTWrapperDevice(1), SubframeBuilderDevice(1) .\.SH TOOLS