diff --git a/Detectors/TPC/workflow/src/ClusterDecoderRawSpec.cxx b/Detectors/TPC/workflow/src/ClusterDecoderRawSpec.cxx index f171dd04bb6db..dfd7aa446d24d 100644 --- a/Detectors/TPC/workflow/src/ClusterDecoderRawSpec.cxx +++ b/Detectors/TPC/workflow/src/ClusterDecoderRawSpec.cxx @@ -147,7 +147,7 @@ DataProcessorSpec getClusterDecoderRawSpec(bool sendMC) // containers are created for clusters and MC labels per (sector,globalPadRow) address char* outputBuffer = nullptr; auto outputAllocator = [&pc, &fanSpec, &outputBuffer, &rawHeaderStack](size_t size) -> char* { - outputBuffer = pc.outputs().newChunk(Output{ gDataOriginTPC, DataDescription("CLUSTERNATIVE"), fanSpec, Lifetime::Timeframe, std::move(rawHeaderStack) }, size).data; + outputBuffer = pc.outputs().newChunk(Output{ gDataOriginTPC, DataDescription("CLUSTERNATIVE"), fanSpec, Lifetime::Timeframe, std::move(rawHeaderStack) }, size).data(); return outputBuffer; }; std::vector mcoutList; diff --git a/Framework/Core/include/Framework/DataAllocator.h b/Framework/Core/include/Framework/DataAllocator.h index a830dfc3a3310..d7134eacffe18 100644 --- a/Framework/Core/include/Framework/DataAllocator.h +++ b/Framework/Core/include/Framework/DataAllocator.h @@ -78,11 +78,11 @@ class DataAllocator ContextRegistry* contextes, const AllowedOutputRoutes& routes); - DataChunk newChunk(const Output&, size_t); + DataChunk& newChunk(const Output&, size_t); - inline DataChunk newChunk(OutputRef&& ref, size_t size) { return newChunk(getOutputByBind(std::move(ref)), size); } + inline DataChunk& newChunk(OutputRef&& ref, size_t size) { return newChunk(getOutputByBind(std::move(ref)), size); } - DataChunk adoptChunk(const Output&, char*, size_t, fairmq_free_fn*, void*); + void adoptChunk(const Output&, char*, size_t, fairmq_free_fn*, void*); // In case no extra argument is provided and the passed type is trivially // copyable and non polymorphic, the most likely wanted behavior is to create @@ -91,12 +91,12 @@ class DataAllocator typename std::enable_if::value == true, T&>::type make(const Output& spec) { - DataChunk chunk = newChunk(spec, sizeof(T)); - return *reinterpret_cast(chunk.data); + return *reinterpret_cast(newChunk(spec, sizeof(T)).data()); } // In case an extra argument is provided, we consider this an array / // collection elements of that type + // FIXME: once the vector functionality with polymorphic allocator is fully in place, this might be dropped template typename std::enable_if::value == true, gsl::span&>::type make(const Output& spec, size_t nElements) diff --git a/Framework/Core/include/Framework/DataChunk.h b/Framework/Core/include/Framework/DataChunk.h index 208460d9dd405..9e172029a99ca 100644 --- a/Framework/Core/include/Framework/DataChunk.h +++ b/Framework/Core/include/Framework/DataChunk.h @@ -10,19 +10,38 @@ #ifndef FRAMEWORK_DATACHUNK_H #define FRAMEWORK_DATACHUNK_H +#include "MemoryResources/MemoryResources.h" + namespace o2 { namespace framework { +/// @class DataChunk A resizable buffer used with DPL's DataAllocator +/// DataChunk derives from std::vector with polymorphic allocator and forbids copying, the underlying +/// buffer is of type char and is through DPL and polymorphic memory resource directly allocated in the +/// message memory. +/// Since MessageContext returns the object by reference, the forbidden copy and assignment makes sure that +/// the code can not accidentally use a copy instead reference. +class DataChunk : public std::vector> +{ + public: + // FIXME: want to have a general forwarding, but then the copy constructor is not deleted any more despite + // it's declared deleted + //template + //DataChunk(T&& arg, Args&&... args) : std::vector>(std::forward(args)...) + //{ + //} -/// Simple struct to hold a pointer to the actual FairMQMessage. -/// In principle this could be an iovec... -struct DataChunk { - char *data; - size_t size; + // DataChunk is special and for the moment it's enough to declare the constructor with size and allocator + DataChunk(size_t size, const o2::pmr::polymorphic_allocator& allocator) : std::vector>(size, allocator) + { + } + DataChunk(const DataChunk&) = delete; + DataChunk& operator=(const DataChunk&) = delete; + DataChunk(DataChunk&&) = default; + DataChunk& operator=(DataChunk&&) = default; }; } // namespace framework } // namespace o2 - #endif // FRAMEWORK_DATACHUNK_H diff --git a/Framework/Core/include/Framework/MessageContext.h b/Framework/Core/include/Framework/MessageContext.h index 70ace6583ac9a..73bbf3f3c79b6 100644 --- a/Framework/Core/include/Framework/MessageContext.h +++ b/Framework/Core/include/Framework/MessageContext.h @@ -12,6 +12,8 @@ #include "Framework/FairMQDeviceProxy.h" #include "Framework/TypeTraits.h" +#include "MemoryResources/MemoryResources.h" +#include "Headers/DataHeader.h" #include #include @@ -20,6 +22,7 @@ #include #include #include +#include class FairMQDevice; @@ -35,25 +38,56 @@ class MessageContext { { } - // thhis is the virtual interface for context objects + // this is the virtual interface for context objects class ContextObject { public: ContextObject() = default; ContextObject(FairMQMessagePtr&& headerMsg, FairMQMessagePtr&& payloadMsg, const std::string& bindingChannel) - : parts{}, channel{ bindingChannel } + : mParts{}, mChannel{ bindingChannel } { - parts.AddPart(std::move(headerMsg)); - parts.AddPart(std::move(payloadMsg)); + mParts.AddPart(std::move(headerMsg)); + mParts.AddPart(std::move(payloadMsg)); + } + ContextObject(FairMQMessagePtr&& headerMsg, const std::string& bindingChannel) + : mParts{}, mChannel{ bindingChannel } + { + mParts.AddPart(std::move(headerMsg)); } virtual ~ContextObject() = default; - // TODO: we keep this public for the moment to keep the current interface - // the send-handler loops over all objects and can acces the message parts - // directly, needs to be changed when the context is generalized, then the - // information needs to be stored in the derived class - FairMQParts parts; - std::string channel; + /// @brief Finalize the object and return the parts by move + /// This is the default method and can be overloaded by other implmentations to carry out other + /// tasks before returning the parts objects + virtual FairMQParts finalize() + { + FairMQParts parts = std::move(mParts); + assert(parts.Size() == 2); + auto* header = o2::header::get(parts.At(0)->GetData()); + if (header == nullptr) { + throw std::logic_error("No valid header message found"); + } else { + // o2::header::get returns const pointer, but here we can change the message + const_cast(header)->payloadSize = parts.At(1)->GetSize(); + } + // return value optimization returns by move + return parts; + } + + /// @brief return the channel name + const std::string& channel() const + { + return mChannel; + } + + bool empty() const + { + return mParts.Size() == 0; + } + + protected: + FairMQParts mParts; + std::string mChannel; }; /// TrivialObject handles a message object @@ -78,13 +112,101 @@ class MessageContext { auto* data() { - assert(parts.Size() == 2); - return parts[1].GetData(); + assert(mParts.Size() == 2); + return mParts[1].GetData(); + } + }; + + /// ContainerRefObject handles a message object holding an instance of type T + /// The allocator type is required to be o2::pmr::polymorphic_allocator + /// can not adopt an existing message, because the polymorphic_allocator will call type constructor, + /// so this works only with new messages + /// FIXME: not sure if we want to have this for all container types + template + class ContainerRefObject : public ContextObject + { + public: + using value_type = typename T::value_type; + using return_type = T; + using buffer_type = return_type; + static_assert(std::is_same>::value, "container must have polymorphic allocator"); + /// default contructor forbidden, object always has to control message instances + ContainerRefObject() = delete; + /// constructor taking header message by move and creating the paypload message + template + ContainerRefObject(ContextType* context, FairMQMessagePtr&& headerMsg, const std::string& bindingChannel, int index, size_t size) + : ContextObject(std::forward(headerMsg), bindingChannel), + // backup of initially allocated size + mAllocatedSize{ size * sizeof(value_type) }, + // the transport factory + mFactory{ context->proxy().getTransport(bindingChannel, index) }, + // the memory resource takes ownership of the message + mResource{ mFactory ? mFactory->GetMemoryResource() : nullptr }, + // create the vector with apropriate underlying memory resource for the message + mData{ size, pmr::polymorphic_allocator(mResource) } + { + // FIXME: drop this repeated check and make sure at initial setup of devices that everything is fine + // introduce error policy + if (mFactory == nullptr) { + throw std::runtime_error(std::string("failed to get transport factory for channel ") + bindingChannel); + } + if (mResource == nullptr) { + throw std::runtime_error(std::string("no memory resource for channel ") + bindingChannel); + } + } + ~ContainerRefObject() override = default; + + /// @brief Finalize object and return parts by move + /// This retrieves the actual message from the vector object and moves it to the parts + FairMQParts finalize() final + { + assert(mParts.Size() == 1); + auto payloadMsg = o2::pmr::getMessage(std::move(mData)); + mParts.AddPart(std::move(payloadMsg)); + return ContextObject::finalize(); + } + + /// @brief return reference to the handled vector object + operator return_type&() + { + return mData; + } + + /// @brief return reference to the handled vector object + return_type& get() + { + return mData; + } + + /// @brief return data pointer of the handled vector object + value_type* data() + { + return mData.data(); + } + + private: + size_t mAllocatedSize; /// backup of initially allocated size + FairMQTransportFactory* mFactory = nullptr; /// pointer to transport factory + pmr::FairMQMemoryResource* mResource = nullptr; /// message resource + buffer_type mData; /// the data buffer + }; + + /// VectorObject handles a message object holding std::vector with polymorphic_allocator + /// can not adopt an existing message, because the polymorphic_allocator will call the element constructor, + /// so this works only with new messages + template >>> + class VectorObject : public _BASE + { + public: + template + VectorObject(Args&&... args) : _BASE(std::forward(args)...) + { } }; // SpanObject creates a trivial binary object for an array of elements of // type T and holds a span over the elements + // FIXME: probably obsolete after introducing of vector with polymorphic_allocator template class SpanObject : public ContextObject { @@ -101,9 +223,9 @@ class MessageContext { // TODO: we probably also want to check consistency of the header message, i.e. payloadSize member auto payloadMsg = context->createMessage(bindingChannel, index, nElements * sizeof(T)); mValue = value_type(reinterpret_cast(payloadMsg->GetData()), nElements); - parts.AddPart(std::move(headerMsg)); - parts.AddPart(std::move(payloadMsg)); - channel = bindingChannel; + mParts.AddPart(std::move(headerMsg)); + mParts.AddPart(std::move(payloadMsg)); + mChannel = bindingChannel; } ~SpanObject() override = default; @@ -152,7 +274,7 @@ class MessageContext { { // Verify that everything has been sent on clear. for (auto &m : mMessages) { - assert(m->parts.Size() == 0); + assert(m->empty()); } mMessages.clear(); } diff --git a/Framework/Core/src/DataAllocator.cxx b/Framework/Core/src/DataAllocator.cxx index 6b608f5bf4f5f..76371ce3274d4 100644 --- a/Framework/Core/src/DataAllocator.cxx +++ b/Framework/Core/src/DataAllocator.cxx @@ -61,8 +61,8 @@ DataAllocator::matchDataHeader(const Output& spec, size_t timeslice) { throw std::runtime_error(str.str()); } -DataChunk -DataAllocator::newChunk(const Output& spec, size_t size) { +DataChunk& DataAllocator::newChunk(const Output& spec, size_t size) +{ std::string channel = matchDataHeader(spec, mTimingInfo->timeslice); auto context = mContextRegistry->get(); @@ -70,12 +70,12 @@ DataAllocator::newChunk(const Output& spec, size_t size) { o2::header::gSerializationMethodNone, // size // ); - auto& co = context->add(std::move(headerMessage), channel, 0, size); - return DataChunk{ reinterpret_cast(co.data()), size }; + auto& co = context->add>(std::move(headerMessage), channel, 0, size); + return co; } -DataChunk -DataAllocator::adoptChunk(const Output& spec, char *buffer, size_t size, fairmq_free_fn *freefn, void *hint = nullptr) { +void DataAllocator::adoptChunk(const Output& spec, char* buffer, size_t size, fairmq_free_fn* freefn, void* hint = nullptr) +{ // Find a matching channel, create a new message for it and put it in the // queue to be sent at the end of the processing std::string channel = matchDataHeader(spec, mTimingInfo->timeslice); @@ -87,8 +87,7 @@ DataAllocator::adoptChunk(const Output& spec, char *buffer, size_t size, fairmq_ // FIXME: how do we want to use subchannels? time based parallelism? auto context = mContextRegistry->get(); - auto& co = context->add(std::move(headerMessage), channel, 0, buffer, size, freefn, hint); - return DataChunk{ reinterpret_cast(co.data()), size }; + context->add(std::move(headerMessage), channel, 0, buffer, size, freefn, hint); } FairMQMessagePtr DataAllocator::headerMessageFromOutput(Output const& spec, // diff --git a/Framework/Core/src/DataProcessor.cxx b/Framework/Core/src/DataProcessor.cxx index e5a2c67643d1c..1e77bf741236c 100644 --- a/Framework/Core/src/DataProcessor.cxx +++ b/Framework/Core/src/DataProcessor.cxx @@ -34,11 +34,10 @@ namespace framework void DataProcessor::doSend(FairMQDevice &device, MessageContext &context) { for (auto &message : context) { // monitoringService.send({ message->parts.Size(), "outputs/total" }); - assert(message->parts.Size() == 2); - FairMQParts parts = std::move(message->parts); - assert(message->parts.Size() == 0); + FairMQParts parts = std::move(message->finalize()); + assert(message->empty()); assert(parts.Size() == 2); - device.Send(parts, message->channel, 0); + device.Send(parts, message->channel(), 0); assert(parts.Size() == 2); } } diff --git a/Framework/Core/src/Dispatcher.cxx b/Framework/Core/src/Dispatcher.cxx index e61d6b8b35443..97fdf967553a0 100644 --- a/Framework/Core/src/Dispatcher.cxx +++ b/Framework/Core/src/Dispatcher.cxx @@ -84,8 +84,8 @@ void Dispatcher::send(DataAllocator& dataAllocator, const DataRef& inputData, co dataAllocator.adopt(output, DataRefUtils::as(inputData).release()); } else { // POD // todo: do it non-copy, when API is available - auto outputMessage = dataAllocator.newChunk(output, inputHeader->payloadSize); - memcpy(outputMessage.data, inputData.payload, inputHeader->payloadSize); + auto& outputMessage = dataAllocator.newChunk(output, inputHeader->payloadSize); + memcpy(outputMessage.data(), inputData.payload, inputHeader->payloadSize); } } diff --git a/Framework/Core/test/test_DataAllocator.cxx b/Framework/Core/test/test_DataAllocator.cxx index eb33585551184..ded144610f988 100644 --- a/Framework/Core/test/test_DataAllocator.cxx +++ b/Framework/Core/test/test_DataAllocator.cxx @@ -107,6 +107,14 @@ DataProcessorSpec getSourceSpec() auto freefct = [](void* data, void* hint) {}; // simply ignore the cleanup for the test static std::string teststring = "adoptchunk"; pc.outputs().adoptChunk(Output{ "TST", "ADOPTCHUNK", 0, Lifetime::Timeframe }, teststring.data(), teststring.length(), freefct, nullptr); + // test resizable data chunk, initial size 0 and grow + auto& growchunk = pc.outputs().newChunk(OutputRef{ "growchunk", 0 }, 0); + growchunk.resize(sizeof(o2::test::TriviallyCopyable)); + memcpy(growchunk.data(), &a, sizeof(o2::test::TriviallyCopyable)); + // test resizable data chunk, large initial size and shrink + auto& shrinkchunk = pc.outputs().newChunk(OutputRef{ "shrinkchunk", 0 }, 1000000); + shrinkchunk.resize(sizeof(o2::test::TriviallyCopyable)); + memcpy(shrinkchunk.data(), &a, sizeof(o2::test::TriviallyCopyable)); }; return DataProcessorSpec{ "source", // name of the processor @@ -114,6 +122,8 @@ DataProcessorSpec getSourceSpec() { OutputSpec{ "TST", "MESSAGEABLE", 0, Lifetime::Timeframe }, OutputSpec{ { "makesingle" }, "TST", "MAKESINGLE", 0, Lifetime::Timeframe }, OutputSpec{ { "makespan" }, "TST", "MAKESPAN", 0, Lifetime::Timeframe }, + OutputSpec{ { "growchunk" }, "TST", "GROWCHUNK", 0, Lifetime::Timeframe }, + OutputSpec{ { "shrinkchunk" }, "TST", "SHRINKCHUNK", 0, Lifetime::Timeframe }, OutputSpec{ "TST", "ADOPTCHUNK", 0, Lifetime::Timeframe }, OutputSpec{ "TST", "MSGBLEROOTSRLZ", 0, Lifetime::Timeframe }, OutputSpec{ "TST", "ROOTNONTOBJECT", 0, Lifetime::Timeframe }, @@ -143,8 +153,10 @@ DataProcessorSpec getSinkSpec() dumpStack(dh); } // plain, unserialized object in input1 channel + LOG(INFO) << "extracting o2::test::TriviallyCopyable from input1"; auto object1 = pc.inputs().get("input1"); ASSERT_ERROR(object1 == o2::test::TriviallyCopyable(42, 23, 0xdead)); + LOG(INFO) << "extracting span of o2::test::TriviallyCopyable from input1"; auto object1span = pc.inputs().get>("input1"); ASSERT_ERROR(object1span.size() == 1); ASSERT_ERROR(sizeof(typename decltype(object1span)::value_type) == sizeof(o2::test::TriviallyCopyable)); @@ -157,50 +169,67 @@ DataProcessorSpec getSinkSpec() ASSERT_ERROR(metaHeader2 != nullptr && metaHeader2->secret == 23); // ROOT-serialized messageable object in input2 channel + LOG(INFO) << "extracting o2::test::TriviallyCopyable pointer from input2"; auto object2 = pc.inputs().get("input2"); ASSERT_ERROR(object2 != nullptr); ASSERT_ERROR(*object2 == o2::test::TriviallyCopyable(42, 23, 0xdead)); // ROOT-serialized, non-messageable object in input3 channel + LOG(INFO) << "extracting o2::test::Polymorphic pointer from input3"; auto object3 = pc.inputs().get("input3"); ASSERT_ERROR(object3 != nullptr); ASSERT_ERROR(*object3 == o2::test::Polymorphic(0xbeef)); // container of objects + LOG(INFO) << "extracting vector of o2::test::Polymorphic from input4"; auto object4 = pc.inputs().get>("input4"); ASSERT_ERROR(object4.size() == 2); ASSERT_ERROR(object4[0] == o2::test::Polymorphic(0xaffe)); ASSERT_ERROR(object4[1] == o2::test::Polymorphic(0xd00f)); // container of objects + LOG(INFO) << "extracting vector of o2::test::Polymorphic from input5"; auto object5 = pc.inputs().get>("input5"); ASSERT_ERROR(object5.size() == 2); ASSERT_ERROR(object5[0] == o2::test::Polymorphic(0xaffe)); ASSERT_ERROR(object5[1] == o2::test::Polymorphic(0xd00f)); // container of objects + LOG(INFO) << "extracting vector of o2::test::Polymorphic from input6"; auto object6 = pc.inputs().get>("input6"); ASSERT_ERROR(object6.size() == 2); ASSERT_ERROR(object6[0] == o2::test::Polymorphic(0xaffe)); ASSERT_ERROR(object6[1] == o2::test::Polymorphic(0xd00f)); // checking retrieving buffer as raw char*, and checking content by cast + LOG(INFO) << "extracting raw char* from input1"; auto rawchar = pc.inputs().get("input1"); const auto& data1 = *reinterpret_cast(rawchar); ASSERT_ERROR(data1 == o2::test::TriviallyCopyable(42, 23, 0xdead)); + LOG(INFO) << "extracting o2::test::TriviallyCopyable from input7"; auto object7 = pc.inputs().get("input7"); ASSERT_ERROR(object1 == o2::test::TriviallyCopyable(42, 23, 0xdead)); + LOG(INFO) << "extracting span of o2::test::TriviallyCopyable from input8"; auto objectspan8 = DataRefUtils::as(pc.inputs().get("input8")); ASSERT_ERROR(objectspan8.size() == 3); for (auto const& object8 : objectspan8) { ASSERT_ERROR(object8 == o2::test::TriviallyCopyable(42, 23, 0xdead)); } + LOG(INFO) << "extracting std::string from input9"; auto object9 = pc.inputs().get("input9"); ASSERT_ERROR(object9 == "adoptchunk"); + LOG(INFO) << "extracting o2::test::TriviallyCopyable from input10"; + auto object10 = pc.inputs().get("input10"); + ASSERT_ERROR(object10 == o2::test::TriviallyCopyable(42, 23, 0xdead)); + + LOG(INFO) << "extracting o2::test::TriviallyCopyable from input11"; + auto object11 = pc.inputs().get("input11"); + ASSERT_ERROR(object11 == o2::test::TriviallyCopyable(42, 23, 0xdead)); + pc.services().get().readyToQuit(true); }; @@ -213,7 +242,9 @@ DataProcessorSpec getSinkSpec() InputSpec{ "input6", "TST", "ROOTSERLZDVEC2", 0, Lifetime::Timeframe }, InputSpec{ "input7", "TST", "MAKESINGLE", 0, Lifetime::Timeframe }, InputSpec{ "input8", "TST", "MAKESPAN", 0, Lifetime::Timeframe }, - InputSpec{ "input9", "TST", "ADOPTCHUNK", 0, Lifetime::Timeframe } }, + InputSpec{ "input9", "TST", "ADOPTCHUNK", 0, Lifetime::Timeframe }, + InputSpec{ "input10", "TST", "GROWCHUNK", 0, Lifetime::Timeframe }, + InputSpec{ "input11", "TST", "SHRINKCHUNK", 0, Lifetime::Timeframe } }, Outputs{}, AlgorithmSpec(processingFct) }; } diff --git a/Framework/Core/test/test_SimpleStatefulProcessing01.cxx b/Framework/Core/test/test_SimpleStatefulProcessing01.cxx index 50283a314dea8..17b1eb054e7e3 100644 --- a/Framework/Core/test/test_SimpleStatefulProcessing01.cxx +++ b/Framework/Core/test/test_SimpleStatefulProcessing01.cxx @@ -52,8 +52,8 @@ WorkflowSpec defineDataProcessing(ConfigContext const&) { callbacks.set(CallbackService::Id::Stop, stopcb); callbacks.set(CallbackService::Id::Reset, resetcb); return adaptStateless([](DataAllocator& outputs) { - auto out = outputs.newChunk({ "TES", "STATEFUL", 0 }, sizeof(int)); - auto outI = reinterpret_cast(out.data); + auto& out = outputs.newChunk({ "TES", "STATEFUL", 0 }, sizeof(int)); + auto outI = reinterpret_cast(out.data()); outI[0] = foo++; }); }) // diff --git a/Framework/Utils/src/DPLBroadcaster.cxx b/Framework/Utils/src/DPLBroadcaster.cxx index 95679f1cac78c..d29b272036e4e 100644 --- a/Framework/Utils/src/DPLBroadcaster.cxx +++ b/Framework/Utils/src/DPLBroadcaster.cxx @@ -47,8 +47,8 @@ o2f::DataProcessorSpec defineBroadcaster(std::string devName, o2f::InputSpec usr auto msgSize = (*funcPtr)(inputMsg); // Iterating over the OutputSpecs to push the input message to all the output destinations for (const auto& itOutputs : (*outputsPtr)) { - auto fwdMsg = ctx.outputs().newChunk(itOutputs, msgSize); - std::memcpy(fwdMsg.data, inputMsg.payload, msgSize); + auto& fwdMsg = ctx.outputs().newChunk(itOutputs, msgSize); + std::memcpy(fwdMsg.data(), inputMsg.payload, msgSize); } }; } } }; diff --git a/Framework/Utils/src/DPLGatherer.cxx b/Framework/Utils/src/DPLGatherer.cxx index f0057a025b807..e8bbff5a53568 100644 --- a/Framework/Utils/src/DPLGatherer.cxx +++ b/Framework/Utils/src/DPLGatherer.cxx @@ -42,9 +42,9 @@ o2f::DataProcessorSpec defineGatherer(std::string devName, o2f::Inputs usrInputs // Retrieving message size from API auto msgSize = (o2::header::get(itInputs.header))->payloadSize; // Allocating new chunk - auto fwdMsg = ctx.outputs().newChunk((*outputPtr), msgSize); + auto& fwdMsg = ctx.outputs().newChunk((*outputPtr), msgSize); // Moving the input to the output chunk - std::memmove(fwdMsg.data, itInputs.payload, msgSize); + std::memmove(fwdMsg.data(), itInputs.payload, msgSize); } }; } } }; diff --git a/Framework/Utils/src/DPLRouter.cxx b/Framework/Utils/src/DPLRouter.cxx index 278c2a67279ae..f485eb83a2afb 100644 --- a/Framework/Utils/src/DPLRouter.cxx +++ b/Framework/Utils/src/DPLRouter.cxx @@ -42,8 +42,8 @@ o2f::DataProcessorSpec defineRouter(std::string devName, o2f::Inputs usrInput, o auto msgSize = (o2::header::get(inputMsg.header))->payloadSize; auto& outputCh = (*outputsPtr)[(*mappingFuncPtr)(inputMsg)]; - auto fwdMsg = ctx.outputs().newChunk(outputCh, msgSize); - std::memcpy(fwdMsg.data, inputMsg.payload, msgSize); + auto& fwdMsg = ctx.outputs().newChunk(outputCh, msgSize); + std::memcpy(fwdMsg.data(), inputMsg.payload, msgSize); }; } } }; } diff --git a/Framework/Utils/test/DPLBroadcasterMerger.cxx b/Framework/Utils/test/DPLBroadcasterMerger.cxx index 8b75178f2a3af..067efec2ac022 100644 --- a/Framework/Utils/test/DPLBroadcasterMerger.cxx +++ b/Framework/Utils/test/DPLBroadcasterMerger.cxx @@ -49,12 +49,12 @@ o2f::DataProcessorSpec defineGenerator(o2f::OutputSpec usrOutput) LOG(INFO) << ">>> Preparing MSG:" << msgIndex; - auto outputMsg = + auto& outputMsg = ctx.outputs().newChunk(*usrOutput_shptr, (msgIndex + 1) * sizeof(uint32_t) / sizeof(char)); LOG(INFO) << ">>> Preparing1 MSG:" << msgIndex; - auto payload = reinterpret_cast(outputMsg.data); + auto payload = reinterpret_cast(outputMsg.data()); payload[0] = msgIndex; @@ -82,8 +82,8 @@ o2f::DataProcessorSpec definePipeline(std::string devName, o2f::InputSpec usrInp auto inputMsg = ctx.inputs().getByPos(0); auto msgSize = (o2::header::get(inputMsg.header))->payloadSize; - auto fwdMsg = ctx.outputs().newChunk((*output_sharedptr), msgSize); - std::memcpy(fwdMsg.data, inputMsg.payload, msgSize); + auto& fwdMsg = ctx.outputs().newChunk((*output_sharedptr), msgSize); + std::memcpy(fwdMsg.data(), inputMsg.payload, msgSize); }; } } }; } diff --git a/Framework/Utils/test/DPLOutputTest.cxx b/Framework/Utils/test/DPLOutputTest.cxx index 59699c625b35e..af87b983fb038 100644 --- a/Framework/Utils/test/DPLOutputTest.cxx +++ b/Framework/Utils/test/DPLOutputTest.cxx @@ -43,12 +43,12 @@ o2f::DataProcessorSpec defineTestGenerator() LOG(INFO) << ">>> Preparing MSG:" << msgIndex; - auto outputMsg = ctx.outputs().newChunk({ "TST", "ToSink", 0, o2f::Lifetime::Timeframe }, - (31 + 1) * sizeof(uint32_t) / sizeof(char)); + auto& outputMsg = ctx.outputs().newChunk({ "TST", "ToSink", 0, o2f::Lifetime::Timeframe }, + (31 + 1) * sizeof(uint32_t) / sizeof(char)); LOG(INFO) << ">>> Preparing1 MSG:" << msgIndex; - auto payload = reinterpret_cast(outputMsg.data); + auto payload = reinterpret_cast(outputMsg.data()); payload[0] = msgIndex;