From d049aab7f642e3baf378477678c8d6b4ba028253 Mon Sep 17 00:00:00 2001 From: Matthias Richter Date: Tue, 2 Apr 2019 10:43:35 +0200 Subject: [PATCH 1/2] [DPL] Support for resizable DataChunk Change DataChunk to be of type std::vector Interface changes: - makeChunk returns reference to std::vector object with polymorphic_allocator - return value of adoptChunk is dropped, not possible to preserve this, not been used anyhow This goes in line with the general idea of DataAllocator's `make` methods, where the object is booked with one single call and is send automatically after processing ends. Requires return by reference and to use a reference variable in the code. The disadvantage is that accidental copy is possible and impossible to detect at compile time. Details: Introducing VectorObject to MessageContext, holding an instance of a vector. The memory is directly allocated in the message memory using the pmr functionality. In order to avoid that objects go out of scope and the allocated buffer is available at send, the created vector can not be returned by move, but is kept in the context and returned by reference. This requires changes in the code to use the reference instead of a copy. The return value of adoptChunk must be dropped because a buffer can only be adopted to a vector object using a special allocator. With this it can not simply be returned as it is a different type. Using polymorphic_allocator even for trivially default constructible types still invokes the constructor for the elements, thus changing the memory of the underlying resource. --- .../workflow/src/ClusterDecoderRawSpec.cxx | 2 +- .../Core/include/Framework/DataAllocator.h | 10 +- Framework/Core/include/Framework/DataChunk.h | 12 +- .../Core/include/Framework/MessageContext.h | 138 ++++++++++++++++-- Framework/Core/src/DataAllocator.cxx | 15 +- Framework/Core/src/DataProcessor.cxx | 7 +- Framework/Core/src/Dispatcher.cxx | 4 +- Framework/Core/test/test_DataAllocator.cxx | 33 ++++- .../test/test_SimpleStatefulProcessing01.cxx | 4 +- Framework/Utils/src/DPLBroadcaster.cxx | 4 +- Framework/Utils/src/DPLGatherer.cxx | 4 +- Framework/Utils/src/DPLRouter.cxx | 4 +- Framework/Utils/test/DPLBroadcasterMerger.cxx | 8 +- Framework/Utils/test/DPLOutputTest.cxx | 6 +- 14 files changed, 192 insertions(+), 59 deletions(-) diff --git a/Detectors/TPC/workflow/src/ClusterDecoderRawSpec.cxx b/Detectors/TPC/workflow/src/ClusterDecoderRawSpec.cxx index f171dd04bb6db..dfd7aa446d24d 100644 --- a/Detectors/TPC/workflow/src/ClusterDecoderRawSpec.cxx +++ b/Detectors/TPC/workflow/src/ClusterDecoderRawSpec.cxx @@ -147,7 +147,7 @@ DataProcessorSpec getClusterDecoderRawSpec(bool sendMC) // containers are created for clusters and MC labels per (sector,globalPadRow) address char* outputBuffer = nullptr; auto outputAllocator = [&pc, &fanSpec, &outputBuffer, &rawHeaderStack](size_t size) -> char* { - outputBuffer = pc.outputs().newChunk(Output{ gDataOriginTPC, DataDescription("CLUSTERNATIVE"), fanSpec, Lifetime::Timeframe, std::move(rawHeaderStack) }, size).data; + outputBuffer = pc.outputs().newChunk(Output{ gDataOriginTPC, DataDescription("CLUSTERNATIVE"), fanSpec, Lifetime::Timeframe, std::move(rawHeaderStack) }, size).data(); return outputBuffer; }; std::vector mcoutList; diff --git a/Framework/Core/include/Framework/DataAllocator.h b/Framework/Core/include/Framework/DataAllocator.h index a830dfc3a3310..d7134eacffe18 100644 --- a/Framework/Core/include/Framework/DataAllocator.h +++ b/Framework/Core/include/Framework/DataAllocator.h @@ -78,11 +78,11 @@ class DataAllocator ContextRegistry* contextes, const AllowedOutputRoutes& routes); - DataChunk newChunk(const Output&, size_t); + DataChunk& newChunk(const Output&, size_t); - inline DataChunk newChunk(OutputRef&& ref, size_t size) { return newChunk(getOutputByBind(std::move(ref)), size); } + inline DataChunk& newChunk(OutputRef&& ref, size_t size) { return newChunk(getOutputByBind(std::move(ref)), size); } - DataChunk adoptChunk(const Output&, char*, size_t, fairmq_free_fn*, void*); + void adoptChunk(const Output&, char*, size_t, fairmq_free_fn*, void*); // In case no extra argument is provided and the passed type is trivially // copyable and non polymorphic, the most likely wanted behavior is to create @@ -91,12 +91,12 @@ class DataAllocator typename std::enable_if::value == true, T&>::type make(const Output& spec) { - DataChunk chunk = newChunk(spec, sizeof(T)); - return *reinterpret_cast(chunk.data); + return *reinterpret_cast(newChunk(spec, sizeof(T)).data()); } // In case an extra argument is provided, we consider this an array / // collection elements of that type + // FIXME: once the vector functionality with polymorphic allocator is fully in place, this might be dropped template typename std::enable_if::value == true, gsl::span&>::type make(const Output& spec, size_t nElements) diff --git a/Framework/Core/include/Framework/DataChunk.h b/Framework/Core/include/Framework/DataChunk.h index 208460d9dd405..27dc95a2deff4 100644 --- a/Framework/Core/include/Framework/DataChunk.h +++ b/Framework/Core/include/Framework/DataChunk.h @@ -10,17 +10,15 @@ #ifndef FRAMEWORK_DATACHUNK_H #define FRAMEWORK_DATACHUNK_H +#include "MemoryResources/MemoryResources.h" + namespace o2 { namespace framework { - -/// Simple struct to hold a pointer to the actual FairMQMessage. -/// In principle this could be an iovec... -struct DataChunk { - char *data; - size_t size; -}; +// FIXME: make sure that a DataChunk can not be copied or assigned, because the context returns the +// object by reference and we have to make sure that the code is using the reference instead of a copy +using DataChunk = std::vector>; } // namespace framework } // namespace o2 diff --git a/Framework/Core/include/Framework/MessageContext.h b/Framework/Core/include/Framework/MessageContext.h index 70ace6583ac9a..1d6383f88286a 100644 --- a/Framework/Core/include/Framework/MessageContext.h +++ b/Framework/Core/include/Framework/MessageContext.h @@ -12,6 +12,8 @@ #include "Framework/FairMQDeviceProxy.h" #include "Framework/TypeTraits.h" +#include "MemoryResources/MemoryResources.h" +#include "Headers/DataHeader.h" #include #include @@ -20,6 +22,7 @@ #include #include #include +#include class FairMQDevice; @@ -35,25 +38,56 @@ class MessageContext { { } - // thhis is the virtual interface for context objects + // this is the virtual interface for context objects class ContextObject { public: ContextObject() = default; ContextObject(FairMQMessagePtr&& headerMsg, FairMQMessagePtr&& payloadMsg, const std::string& bindingChannel) - : parts{}, channel{ bindingChannel } + : mParts{}, mChannel{ bindingChannel } { - parts.AddPart(std::move(headerMsg)); - parts.AddPart(std::move(payloadMsg)); + mParts.AddPart(std::move(headerMsg)); + mParts.AddPart(std::move(payloadMsg)); + } + ContextObject(FairMQMessagePtr&& headerMsg, const std::string& bindingChannel) + : mParts{}, mChannel{ bindingChannel } + { + mParts.AddPart(std::move(headerMsg)); } virtual ~ContextObject() = default; - // TODO: we keep this public for the moment to keep the current interface - // the send-handler loops over all objects and can acces the message parts - // directly, needs to be changed when the context is generalized, then the - // information needs to be stored in the derived class - FairMQParts parts; - std::string channel; + /// @brief Finalize the object and return the parts by move + /// This is the default method and can be overloaded by other implmentations to carry out other + /// tasks before returning the parts objects + virtual FairMQParts finalize() + { + FairMQParts parts = std::move(mParts); + assert(parts.Size() == 2); + auto* header = o2::header::get(parts.At(0)->GetData()); + if (header == nullptr) { + throw std::logic_error("No valid header message found"); + } else { + // o2::header::get returns const pointer, but here we can change the message + const_cast(header)->payloadSize = parts.At(1)->GetSize(); + } + // return value optimization returns by move + return parts; + } + + /// @brief return the channel name + const std::string& channel() const + { + return mChannel; + } + + bool empty() const + { + return mParts.Size() == 0; + } + + protected: + FairMQParts mParts; + std::string mChannel; }; /// TrivialObject handles a message object @@ -78,13 +112,85 @@ class MessageContext { auto* data() { - assert(parts.Size() == 2); - return parts[1].GetData(); + assert(mParts.Size() == 2); + return mParts[1].GetData(); } }; + /// VectorObject handles a message object holding std::vector with polymorphic_allocator + /// can not adopt an existing message, because the polymorphic_allocator will call the element constructor, + /// so this works only with new messages + template + class VectorObject : public ContextObject + { + public: + using value_type = T; + using return_type = std::vector>; + using buffer_type = return_type; + /// default contructor forbidden, object always has to control message instances + VectorObject() = delete; + /// constructor taking header message by move and creating the paypload message + template + VectorObject(ContextType* context, FairMQMessagePtr&& headerMsg, const std::string& bindingChannel, int index, size_t size) + : ContextObject(std::forward(headerMsg), bindingChannel), + // backup of initially allocated size + mAllocatedSize{ size * sizeof(value_type) }, + // the transport factory + mFactory{ context->proxy().getTransport(bindingChannel, index) }, + // the memory resource takes ownership of the message + mResource{ mFactory ? mFactory->GetMemoryResource() : nullptr }, + // create the vector with apropriate underlying memory resource for the message + mData{ size, pmr::polymorphic_allocator(mResource) } + { + // FIXME: drop this repeated check and make sure at initial setup of devices that everything is fine + // introduce error policy + if (mFactory == nullptr) { + throw std::runtime_error(std::string("failed to get transport factory for channel ") + bindingChannel); + } + if (mResource == nullptr) { + throw std::runtime_error(std::string("no memory resource for channel ") + bindingChannel); + } + } + ~VectorObject() override = default; + + /// @brief Finalize object and return parts by move + /// This retrieves the actual message from the vector object and moves it to the parts + FairMQParts finalize() final + { + assert(mParts.Size() == 1); + auto payloadMsg = o2::pmr::getMessage(std::move(mData)); + mParts.AddPart(std::move(payloadMsg)); + return ContextObject::finalize(); + } + + /// @brief return reference to the handled vector object + operator return_type&() + { + return mData; + } + + /// @brief return reference to the handled vector object + return_type& get() + { + return mData; + } + + /// @brief return data pointer of the handled vector object + value_type* data() + { + return mData.data(); + } + + private: + size_t mAllocatedSize; /// backup of initially allocated size + FairMQTransportFactory* mFactory = nullptr; /// pointer to transport factory + pmr::FairMQMemoryResource* mResource = nullptr; /// message resource + buffer_type mData; /// the data buffer + }; + // SpanObject creates a trivial binary object for an array of elements of // type T and holds a span over the elements + // FIXME: probably obsolete after introducing of vector with polymorphic_allocator template class SpanObject : public ContextObject { @@ -101,9 +207,9 @@ class MessageContext { // TODO: we probably also want to check consistency of the header message, i.e. payloadSize member auto payloadMsg = context->createMessage(bindingChannel, index, nElements * sizeof(T)); mValue = value_type(reinterpret_cast(payloadMsg->GetData()), nElements); - parts.AddPart(std::move(headerMsg)); - parts.AddPart(std::move(payloadMsg)); - channel = bindingChannel; + mParts.AddPart(std::move(headerMsg)); + mParts.AddPart(std::move(payloadMsg)); + mChannel = bindingChannel; } ~SpanObject() override = default; @@ -152,7 +258,7 @@ class MessageContext { { // Verify that everything has been sent on clear. for (auto &m : mMessages) { - assert(m->parts.Size() == 0); + assert(m->empty()); } mMessages.clear(); } diff --git a/Framework/Core/src/DataAllocator.cxx b/Framework/Core/src/DataAllocator.cxx index 6b608f5bf4f5f..6721a03ba54bf 100644 --- a/Framework/Core/src/DataAllocator.cxx +++ b/Framework/Core/src/DataAllocator.cxx @@ -61,8 +61,8 @@ DataAllocator::matchDataHeader(const Output& spec, size_t timeslice) { throw std::runtime_error(str.str()); } -DataChunk -DataAllocator::newChunk(const Output& spec, size_t size) { +DataChunk& DataAllocator::newChunk(const Output& spec, size_t size) +{ std::string channel = matchDataHeader(spec, mTimingInfo->timeslice); auto context = mContextRegistry->get(); @@ -70,12 +70,12 @@ DataAllocator::newChunk(const Output& spec, size_t size) { o2::header::gSerializationMethodNone, // size // ); - auto& co = context->add(std::move(headerMessage), channel, 0, size); - return DataChunk{ reinterpret_cast(co.data()), size }; + auto& co = context->add>(std::move(headerMessage), channel, 0, size); + return co; } -DataChunk -DataAllocator::adoptChunk(const Output& spec, char *buffer, size_t size, fairmq_free_fn *freefn, void *hint = nullptr) { +void DataAllocator::adoptChunk(const Output& spec, char* buffer, size_t size, fairmq_free_fn* freefn, void* hint = nullptr) +{ // Find a matching channel, create a new message for it and put it in the // queue to be sent at the end of the processing std::string channel = matchDataHeader(spec, mTimingInfo->timeslice); @@ -87,8 +87,7 @@ DataAllocator::adoptChunk(const Output& spec, char *buffer, size_t size, fairmq_ // FIXME: how do we want to use subchannels? time based parallelism? auto context = mContextRegistry->get(); - auto& co = context->add(std::move(headerMessage), channel, 0, buffer, size, freefn, hint); - return DataChunk{ reinterpret_cast(co.data()), size }; + context->add(std::move(headerMessage), channel, 0, buffer, size, freefn, hint); } FairMQMessagePtr DataAllocator::headerMessageFromOutput(Output const& spec, // diff --git a/Framework/Core/src/DataProcessor.cxx b/Framework/Core/src/DataProcessor.cxx index e5a2c67643d1c..1e77bf741236c 100644 --- a/Framework/Core/src/DataProcessor.cxx +++ b/Framework/Core/src/DataProcessor.cxx @@ -34,11 +34,10 @@ namespace framework void DataProcessor::doSend(FairMQDevice &device, MessageContext &context) { for (auto &message : context) { // monitoringService.send({ message->parts.Size(), "outputs/total" }); - assert(message->parts.Size() == 2); - FairMQParts parts = std::move(message->parts); - assert(message->parts.Size() == 0); + FairMQParts parts = std::move(message->finalize()); + assert(message->empty()); assert(parts.Size() == 2); - device.Send(parts, message->channel, 0); + device.Send(parts, message->channel(), 0); assert(parts.Size() == 2); } } diff --git a/Framework/Core/src/Dispatcher.cxx b/Framework/Core/src/Dispatcher.cxx index e61d6b8b35443..97fdf967553a0 100644 --- a/Framework/Core/src/Dispatcher.cxx +++ b/Framework/Core/src/Dispatcher.cxx @@ -84,8 +84,8 @@ void Dispatcher::send(DataAllocator& dataAllocator, const DataRef& inputData, co dataAllocator.adopt(output, DataRefUtils::as(inputData).release()); } else { // POD // todo: do it non-copy, when API is available - auto outputMessage = dataAllocator.newChunk(output, inputHeader->payloadSize); - memcpy(outputMessage.data, inputData.payload, inputHeader->payloadSize); + auto& outputMessage = dataAllocator.newChunk(output, inputHeader->payloadSize); + memcpy(outputMessage.data(), inputData.payload, inputHeader->payloadSize); } } diff --git a/Framework/Core/test/test_DataAllocator.cxx b/Framework/Core/test/test_DataAllocator.cxx index eb33585551184..ded144610f988 100644 --- a/Framework/Core/test/test_DataAllocator.cxx +++ b/Framework/Core/test/test_DataAllocator.cxx @@ -107,6 +107,14 @@ DataProcessorSpec getSourceSpec() auto freefct = [](void* data, void* hint) {}; // simply ignore the cleanup for the test static std::string teststring = "adoptchunk"; pc.outputs().adoptChunk(Output{ "TST", "ADOPTCHUNK", 0, Lifetime::Timeframe }, teststring.data(), teststring.length(), freefct, nullptr); + // test resizable data chunk, initial size 0 and grow + auto& growchunk = pc.outputs().newChunk(OutputRef{ "growchunk", 0 }, 0); + growchunk.resize(sizeof(o2::test::TriviallyCopyable)); + memcpy(growchunk.data(), &a, sizeof(o2::test::TriviallyCopyable)); + // test resizable data chunk, large initial size and shrink + auto& shrinkchunk = pc.outputs().newChunk(OutputRef{ "shrinkchunk", 0 }, 1000000); + shrinkchunk.resize(sizeof(o2::test::TriviallyCopyable)); + memcpy(shrinkchunk.data(), &a, sizeof(o2::test::TriviallyCopyable)); }; return DataProcessorSpec{ "source", // name of the processor @@ -114,6 +122,8 @@ DataProcessorSpec getSourceSpec() { OutputSpec{ "TST", "MESSAGEABLE", 0, Lifetime::Timeframe }, OutputSpec{ { "makesingle" }, "TST", "MAKESINGLE", 0, Lifetime::Timeframe }, OutputSpec{ { "makespan" }, "TST", "MAKESPAN", 0, Lifetime::Timeframe }, + OutputSpec{ { "growchunk" }, "TST", "GROWCHUNK", 0, Lifetime::Timeframe }, + OutputSpec{ { "shrinkchunk" }, "TST", "SHRINKCHUNK", 0, Lifetime::Timeframe }, OutputSpec{ "TST", "ADOPTCHUNK", 0, Lifetime::Timeframe }, OutputSpec{ "TST", "MSGBLEROOTSRLZ", 0, Lifetime::Timeframe }, OutputSpec{ "TST", "ROOTNONTOBJECT", 0, Lifetime::Timeframe }, @@ -143,8 +153,10 @@ DataProcessorSpec getSinkSpec() dumpStack(dh); } // plain, unserialized object in input1 channel + LOG(INFO) << "extracting o2::test::TriviallyCopyable from input1"; auto object1 = pc.inputs().get("input1"); ASSERT_ERROR(object1 == o2::test::TriviallyCopyable(42, 23, 0xdead)); + LOG(INFO) << "extracting span of o2::test::TriviallyCopyable from input1"; auto object1span = pc.inputs().get>("input1"); ASSERT_ERROR(object1span.size() == 1); ASSERT_ERROR(sizeof(typename decltype(object1span)::value_type) == sizeof(o2::test::TriviallyCopyable)); @@ -157,50 +169,67 @@ DataProcessorSpec getSinkSpec() ASSERT_ERROR(metaHeader2 != nullptr && metaHeader2->secret == 23); // ROOT-serialized messageable object in input2 channel + LOG(INFO) << "extracting o2::test::TriviallyCopyable pointer from input2"; auto object2 = pc.inputs().get("input2"); ASSERT_ERROR(object2 != nullptr); ASSERT_ERROR(*object2 == o2::test::TriviallyCopyable(42, 23, 0xdead)); // ROOT-serialized, non-messageable object in input3 channel + LOG(INFO) << "extracting o2::test::Polymorphic pointer from input3"; auto object3 = pc.inputs().get("input3"); ASSERT_ERROR(object3 != nullptr); ASSERT_ERROR(*object3 == o2::test::Polymorphic(0xbeef)); // container of objects + LOG(INFO) << "extracting vector of o2::test::Polymorphic from input4"; auto object4 = pc.inputs().get>("input4"); ASSERT_ERROR(object4.size() == 2); ASSERT_ERROR(object4[0] == o2::test::Polymorphic(0xaffe)); ASSERT_ERROR(object4[1] == o2::test::Polymorphic(0xd00f)); // container of objects + LOG(INFO) << "extracting vector of o2::test::Polymorphic from input5"; auto object5 = pc.inputs().get>("input5"); ASSERT_ERROR(object5.size() == 2); ASSERT_ERROR(object5[0] == o2::test::Polymorphic(0xaffe)); ASSERT_ERROR(object5[1] == o2::test::Polymorphic(0xd00f)); // container of objects + LOG(INFO) << "extracting vector of o2::test::Polymorphic from input6"; auto object6 = pc.inputs().get>("input6"); ASSERT_ERROR(object6.size() == 2); ASSERT_ERROR(object6[0] == o2::test::Polymorphic(0xaffe)); ASSERT_ERROR(object6[1] == o2::test::Polymorphic(0xd00f)); // checking retrieving buffer as raw char*, and checking content by cast + LOG(INFO) << "extracting raw char* from input1"; auto rawchar = pc.inputs().get("input1"); const auto& data1 = *reinterpret_cast(rawchar); ASSERT_ERROR(data1 == o2::test::TriviallyCopyable(42, 23, 0xdead)); + LOG(INFO) << "extracting o2::test::TriviallyCopyable from input7"; auto object7 = pc.inputs().get("input7"); ASSERT_ERROR(object1 == o2::test::TriviallyCopyable(42, 23, 0xdead)); + LOG(INFO) << "extracting span of o2::test::TriviallyCopyable from input8"; auto objectspan8 = DataRefUtils::as(pc.inputs().get("input8")); ASSERT_ERROR(objectspan8.size() == 3); for (auto const& object8 : objectspan8) { ASSERT_ERROR(object8 == o2::test::TriviallyCopyable(42, 23, 0xdead)); } + LOG(INFO) << "extracting std::string from input9"; auto object9 = pc.inputs().get("input9"); ASSERT_ERROR(object9 == "adoptchunk"); + LOG(INFO) << "extracting o2::test::TriviallyCopyable from input10"; + auto object10 = pc.inputs().get("input10"); + ASSERT_ERROR(object10 == o2::test::TriviallyCopyable(42, 23, 0xdead)); + + LOG(INFO) << "extracting o2::test::TriviallyCopyable from input11"; + auto object11 = pc.inputs().get("input11"); + ASSERT_ERROR(object11 == o2::test::TriviallyCopyable(42, 23, 0xdead)); + pc.services().get().readyToQuit(true); }; @@ -213,7 +242,9 @@ DataProcessorSpec getSinkSpec() InputSpec{ "input6", "TST", "ROOTSERLZDVEC2", 0, Lifetime::Timeframe }, InputSpec{ "input7", "TST", "MAKESINGLE", 0, Lifetime::Timeframe }, InputSpec{ "input8", "TST", "MAKESPAN", 0, Lifetime::Timeframe }, - InputSpec{ "input9", "TST", "ADOPTCHUNK", 0, Lifetime::Timeframe } }, + InputSpec{ "input9", "TST", "ADOPTCHUNK", 0, Lifetime::Timeframe }, + InputSpec{ "input10", "TST", "GROWCHUNK", 0, Lifetime::Timeframe }, + InputSpec{ "input11", "TST", "SHRINKCHUNK", 0, Lifetime::Timeframe } }, Outputs{}, AlgorithmSpec(processingFct) }; } diff --git a/Framework/Core/test/test_SimpleStatefulProcessing01.cxx b/Framework/Core/test/test_SimpleStatefulProcessing01.cxx index 50283a314dea8..17b1eb054e7e3 100644 --- a/Framework/Core/test/test_SimpleStatefulProcessing01.cxx +++ b/Framework/Core/test/test_SimpleStatefulProcessing01.cxx @@ -52,8 +52,8 @@ WorkflowSpec defineDataProcessing(ConfigContext const&) { callbacks.set(CallbackService::Id::Stop, stopcb); callbacks.set(CallbackService::Id::Reset, resetcb); return adaptStateless([](DataAllocator& outputs) { - auto out = outputs.newChunk({ "TES", "STATEFUL", 0 }, sizeof(int)); - auto outI = reinterpret_cast(out.data); + auto& out = outputs.newChunk({ "TES", "STATEFUL", 0 }, sizeof(int)); + auto outI = reinterpret_cast(out.data()); outI[0] = foo++; }); }) // diff --git a/Framework/Utils/src/DPLBroadcaster.cxx b/Framework/Utils/src/DPLBroadcaster.cxx index 95679f1cac78c..d29b272036e4e 100644 --- a/Framework/Utils/src/DPLBroadcaster.cxx +++ b/Framework/Utils/src/DPLBroadcaster.cxx @@ -47,8 +47,8 @@ o2f::DataProcessorSpec defineBroadcaster(std::string devName, o2f::InputSpec usr auto msgSize = (*funcPtr)(inputMsg); // Iterating over the OutputSpecs to push the input message to all the output destinations for (const auto& itOutputs : (*outputsPtr)) { - auto fwdMsg = ctx.outputs().newChunk(itOutputs, msgSize); - std::memcpy(fwdMsg.data, inputMsg.payload, msgSize); + auto& fwdMsg = ctx.outputs().newChunk(itOutputs, msgSize); + std::memcpy(fwdMsg.data(), inputMsg.payload, msgSize); } }; } } }; diff --git a/Framework/Utils/src/DPLGatherer.cxx b/Framework/Utils/src/DPLGatherer.cxx index f0057a025b807..e8bbff5a53568 100644 --- a/Framework/Utils/src/DPLGatherer.cxx +++ b/Framework/Utils/src/DPLGatherer.cxx @@ -42,9 +42,9 @@ o2f::DataProcessorSpec defineGatherer(std::string devName, o2f::Inputs usrInputs // Retrieving message size from API auto msgSize = (o2::header::get(itInputs.header))->payloadSize; // Allocating new chunk - auto fwdMsg = ctx.outputs().newChunk((*outputPtr), msgSize); + auto& fwdMsg = ctx.outputs().newChunk((*outputPtr), msgSize); // Moving the input to the output chunk - std::memmove(fwdMsg.data, itInputs.payload, msgSize); + std::memmove(fwdMsg.data(), itInputs.payload, msgSize); } }; } } }; diff --git a/Framework/Utils/src/DPLRouter.cxx b/Framework/Utils/src/DPLRouter.cxx index 278c2a67279ae..f485eb83a2afb 100644 --- a/Framework/Utils/src/DPLRouter.cxx +++ b/Framework/Utils/src/DPLRouter.cxx @@ -42,8 +42,8 @@ o2f::DataProcessorSpec defineRouter(std::string devName, o2f::Inputs usrInput, o auto msgSize = (o2::header::get(inputMsg.header))->payloadSize; auto& outputCh = (*outputsPtr)[(*mappingFuncPtr)(inputMsg)]; - auto fwdMsg = ctx.outputs().newChunk(outputCh, msgSize); - std::memcpy(fwdMsg.data, inputMsg.payload, msgSize); + auto& fwdMsg = ctx.outputs().newChunk(outputCh, msgSize); + std::memcpy(fwdMsg.data(), inputMsg.payload, msgSize); }; } } }; } diff --git a/Framework/Utils/test/DPLBroadcasterMerger.cxx b/Framework/Utils/test/DPLBroadcasterMerger.cxx index 8b75178f2a3af..067efec2ac022 100644 --- a/Framework/Utils/test/DPLBroadcasterMerger.cxx +++ b/Framework/Utils/test/DPLBroadcasterMerger.cxx @@ -49,12 +49,12 @@ o2f::DataProcessorSpec defineGenerator(o2f::OutputSpec usrOutput) LOG(INFO) << ">>> Preparing MSG:" << msgIndex; - auto outputMsg = + auto& outputMsg = ctx.outputs().newChunk(*usrOutput_shptr, (msgIndex + 1) * sizeof(uint32_t) / sizeof(char)); LOG(INFO) << ">>> Preparing1 MSG:" << msgIndex; - auto payload = reinterpret_cast(outputMsg.data); + auto payload = reinterpret_cast(outputMsg.data()); payload[0] = msgIndex; @@ -82,8 +82,8 @@ o2f::DataProcessorSpec definePipeline(std::string devName, o2f::InputSpec usrInp auto inputMsg = ctx.inputs().getByPos(0); auto msgSize = (o2::header::get(inputMsg.header))->payloadSize; - auto fwdMsg = ctx.outputs().newChunk((*output_sharedptr), msgSize); - std::memcpy(fwdMsg.data, inputMsg.payload, msgSize); + auto& fwdMsg = ctx.outputs().newChunk((*output_sharedptr), msgSize); + std::memcpy(fwdMsg.data(), inputMsg.payload, msgSize); }; } } }; } diff --git a/Framework/Utils/test/DPLOutputTest.cxx b/Framework/Utils/test/DPLOutputTest.cxx index 59699c625b35e..af87b983fb038 100644 --- a/Framework/Utils/test/DPLOutputTest.cxx +++ b/Framework/Utils/test/DPLOutputTest.cxx @@ -43,12 +43,12 @@ o2f::DataProcessorSpec defineTestGenerator() LOG(INFO) << ">>> Preparing MSG:" << msgIndex; - auto outputMsg = ctx.outputs().newChunk({ "TST", "ToSink", 0, o2f::Lifetime::Timeframe }, - (31 + 1) * sizeof(uint32_t) / sizeof(char)); + auto& outputMsg = ctx.outputs().newChunk({ "TST", "ToSink", 0, o2f::Lifetime::Timeframe }, + (31 + 1) * sizeof(uint32_t) / sizeof(char)); LOG(INFO) << ">>> Preparing1 MSG:" << msgIndex; - auto payload = reinterpret_cast(outputMsg.data); + auto payload = reinterpret_cast(outputMsg.data()); payload[0] = msgIndex; From 5fb8d96f88f4a1c27ed74454bdf87c05ef0f6a92 Mon Sep 17 00:00:00 2001 From: Matthias Richter Date: Wed, 10 Apr 2019 09:38:18 +0200 Subject: [PATCH 2/2] Forbid copy constructor and assignment for DataChunk Make DataChunk a class deriving from std::vector instead of an alias. This allows to delete the copy constructor and assignment operator. DPL's output allocator returns the created DataChunk by reference and thus it's important not to make a copy in the code. Now it's ensured to detect this at compile time. --- Framework/Core/include/Framework/DataChunk.h | 29 +++++++++++++--- .../Core/include/Framework/MessageContext.h | 34 ++++++++++++++----- Framework/Core/src/DataAllocator.cxx | 2 +- 3 files changed, 51 insertions(+), 14 deletions(-) diff --git a/Framework/Core/include/Framework/DataChunk.h b/Framework/Core/include/Framework/DataChunk.h index 27dc95a2deff4..9e172029a99ca 100644 --- a/Framework/Core/include/Framework/DataChunk.h +++ b/Framework/Core/include/Framework/DataChunk.h @@ -16,11 +16,32 @@ namespace o2 { namespace framework { -// FIXME: make sure that a DataChunk can not be copied or assigned, because the context returns the -// object by reference and we have to make sure that the code is using the reference instead of a copy -using DataChunk = std::vector>; +/// @class DataChunk A resizable buffer used with DPL's DataAllocator +/// DataChunk derives from std::vector with polymorphic allocator and forbids copying, the underlying +/// buffer is of type char and is through DPL and polymorphic memory resource directly allocated in the +/// message memory. +/// Since MessageContext returns the object by reference, the forbidden copy and assignment makes sure that +/// the code can not accidentally use a copy instead reference. +class DataChunk : public std::vector> +{ + public: + // FIXME: want to have a general forwarding, but then the copy constructor is not deleted any more despite + // it's declared deleted + //template + //DataChunk(T&& arg, Args&&... args) : std::vector>(std::forward(args)...) + //{ + //} + + // DataChunk is special and for the moment it's enough to declare the constructor with size and allocator + DataChunk(size_t size, const o2::pmr::polymorphic_allocator& allocator) : std::vector>(size, allocator) + { + } + DataChunk(const DataChunk&) = delete; + DataChunk& operator=(const DataChunk&) = delete; + DataChunk(DataChunk&&) = default; + DataChunk& operator=(DataChunk&&) = default; +}; } // namespace framework } // namespace o2 - #endif // FRAMEWORK_DATACHUNK_H diff --git a/Framework/Core/include/Framework/MessageContext.h b/Framework/Core/include/Framework/MessageContext.h index 1d6383f88286a..73bbf3f3c79b6 100644 --- a/Framework/Core/include/Framework/MessageContext.h +++ b/Framework/Core/include/Framework/MessageContext.h @@ -117,21 +117,24 @@ class MessageContext { } }; - /// VectorObject handles a message object holding std::vector with polymorphic_allocator - /// can not adopt an existing message, because the polymorphic_allocator will call the element constructor, + /// ContainerRefObject handles a message object holding an instance of type T + /// The allocator type is required to be o2::pmr::polymorphic_allocator + /// can not adopt an existing message, because the polymorphic_allocator will call type constructor, /// so this works only with new messages + /// FIXME: not sure if we want to have this for all container types template - class VectorObject : public ContextObject + class ContainerRefObject : public ContextObject { public: - using value_type = T; - using return_type = std::vector>; + using value_type = typename T::value_type; + using return_type = T; using buffer_type = return_type; + static_assert(std::is_same>::value, "container must have polymorphic allocator"); /// default contructor forbidden, object always has to control message instances - VectorObject() = delete; + ContainerRefObject() = delete; /// constructor taking header message by move and creating the paypload message - template - VectorObject(ContextType* context, FairMQMessagePtr&& headerMsg, const std::string& bindingChannel, int index, size_t size) + template + ContainerRefObject(ContextType* context, FairMQMessagePtr&& headerMsg, const std::string& bindingChannel, int index, size_t size) : ContextObject(std::forward(headerMsg), bindingChannel), // backup of initially allocated size mAllocatedSize{ size * sizeof(value_type) }, @@ -151,7 +154,7 @@ class MessageContext { throw std::runtime_error(std::string("no memory resource for channel ") + bindingChannel); } } - ~VectorObject() override = default; + ~ContainerRefObject() override = default; /// @brief Finalize object and return parts by move /// This retrieves the actual message from the vector object and moves it to the parts @@ -188,6 +191,19 @@ class MessageContext { buffer_type mData; /// the data buffer }; + /// VectorObject handles a message object holding std::vector with polymorphic_allocator + /// can not adopt an existing message, because the polymorphic_allocator will call the element constructor, + /// so this works only with new messages + template >>> + class VectorObject : public _BASE + { + public: + template + VectorObject(Args&&... args) : _BASE(std::forward(args)...) + { + } + }; + // SpanObject creates a trivial binary object for an array of elements of // type T and holds a span over the elements // FIXME: probably obsolete after introducing of vector with polymorphic_allocator diff --git a/Framework/Core/src/DataAllocator.cxx b/Framework/Core/src/DataAllocator.cxx index 6721a03ba54bf..76371ce3274d4 100644 --- a/Framework/Core/src/DataAllocator.cxx +++ b/Framework/Core/src/DataAllocator.cxx @@ -70,7 +70,7 @@ DataChunk& DataAllocator::newChunk(const Output& spec, size_t size) o2::header::gSerializationMethodNone, // size // ); - auto& co = context->add>(std::move(headerMessage), channel, 0, size); + auto& co = context->add>(std::move(headerMessage), channel, 0, size); return co; }