Skip to content

Commit

Permalink
Adopt FairMQMessage backed memory resource collection from AliceO2
Browse files Browse the repository at this point in the history
Add a pmr interface to FairMQTransportFactory

refactor

Port the unit tests for MemoryResources

clang format
  • Loading branch information
mkrzewic authored and dennisklein committed Oct 31, 2018
1 parent 919193a commit 310b964
Show file tree
Hide file tree
Showing 9 changed files with 709 additions and 3 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ find_package(Threads REQUIRED)

if(BUILD_FAIRMQ)
find_package2(PUBLIC Boost VERSION 1.64 REQUIRED
COMPONENTS program_options thread system filesystem regex date_time signals
COMPONENTS container program_options thread system filesystem regex date_time signals
)
find_package2(PUBLIC FairLogger VERSION 1.2.0 REQUIRED)
find_package2(PRIVATE ZeroMQ VERSION 4.1.5 REQUIRED)
Expand Down
4 changes: 4 additions & 0 deletions fairmq/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ set(FAIRMQ_PUBLIC_HEADER_FILES
FairMQSocket.h
FairMQStateMachine.h
FairMQTransportFactory.h
MemoryResources.h
MemoryResourceTools.h
Tools.h
Transports.h
options/FairMQProgOptions.h
Expand Down Expand Up @@ -155,6 +157,7 @@ set(FAIRMQ_SOURCE_FILES
zeromq/FairMQUnmanagedRegionZMQ.cxx
zeromq/FairMQSocketZMQ.cxx
zeromq/FairMQTransportFactoryZMQ.cxx
MemoryResources.cxx
)

if(BUILD_NANOMSG_TRANSPORT)
Expand Down Expand Up @@ -232,6 +235,7 @@ endif()

target_link_libraries(${_target}
INTERFACE # only consumers link against interface dependencies
Boost::container

PUBLIC # libFairMQ AND consumers of libFairMQ link aginst public dependencies
Threads::Threads
Expand Down
11 changes: 9 additions & 2 deletions fairmq/FairMQTransportFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@
#ifndef FAIRMQTRANSPORTFACTORY_H_
#define FAIRMQTRANSPORTFACTORY_H_

#include <FairMQLogger.h>
#include <FairMQMessage.h>
#include <FairMQSocket.h>
#include <FairMQPoller.h>
#include <FairMQSocket.h>
#include <FairMQUnmanagedRegion.h>
#include <FairMQLogger.h>
#include <fairmq/MemoryResources.h>
#include <fairmq/Transports.h>

#include <string>
Expand All @@ -30,13 +31,19 @@ class FairMQTransportFactory
/// Topology wide unique id
const std::string fkId;

/// The polymorphic memory resource associated with the transport
fair::mq::ChannelResource fMemoryResource{this};

public:
/// ctor
/// @param id Topology wide unique id, usually the device id.
FairMQTransportFactory(const std::string& id);

auto GetId() const -> const std::string { return fkId; };

/// Get a pointer to the associated polymorphic memory resource
fair::mq::ChannelResource* GetMemoryResource() { return &fMemoryResource; }

/// @brief Create empty FairMQMessage
/// @return pointer to FairMQMessage
virtual FairMQMessagePtr CreateMessage() const = 0;
Expand Down
119 changes: 119 additions & 0 deletions fairmq/MemoryResourceTools.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/********************************************************************************
* Copyright (C) 2018 CERN and copyright holders of ALICE O2 *
* Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/

/// @brief Tools for interfacing containers to the transport via polymorphic
/// allocators
///
/// @author Mikolaj Krzewicki, mkrzewic@cern.ch

#include <fairmq/FairMQTransportFactory.h>
#include <fairmq/MemoryResources.h>

namespace fair {
namespace mq {

using ByteSpectatorAllocator = SpectatorAllocator<fair::mq::byte>;
using BytePmrAllocator = boost::container::pmr::polymorphic_allocator<fair::mq::byte>;

//_________________________________________________________________________________________________
// return the message associated with the container or nullptr if it does not
// make sense (e.g. when
// we are just watching an existing message or when the container is not using
// FairMQMemoryResource
// as backend).
template<typename ContainerT>
// typename std::enable_if<
// std::is_base_of<
// boost::container::pmr::polymorphic_allocator<typename
// ContainerT::value_type>,
// typename ContainerT::allocator_type>::value == true,
// FairMQMessagePtr>::type
FairMQMessagePtr getMessage(ContainerT &&container_, FairMQMemoryResource *targetResource = nullptr)
{
auto container = std::move(container_);
auto alloc = container.get_allocator();

auto resource = dynamic_cast<FairMQMemoryResource *>(alloc.resource());
if (!resource && !targetResource) {
throw std::runtime_error("Neither the container or target resource specified");
}
size_t containerSizeBytes = container.size() * sizeof(typename ContainerT::value_type);
if ((!targetResource && resource)
|| (resource && targetResource && resource->is_equal(*targetResource))) {
auto message = resource->getMessage(static_cast<void *>(
const_cast<typename std::remove_const<typename ContainerT::value_type>::type *>(
container.data())));
if (message)
message->SetUsedSize(containerSizeBytes);
return std::move(message);
} else {
auto message = targetResource->getTransportFactory()->CreateMessage(containerSizeBytes);
std::memcpy(static_cast<fair::mq::byte *>(message->GetData()),
container.data(),
containerSizeBytes);
return std::move(message);
}
};

//_________________________________________________________________________________________________
/// Return a vector of const ElemT, resource must be kept alive throughout the
/// lifetime of the
/// container and associated message.
template<typename ElemT>
std::vector<const ElemT, boost::container::pmr::polymorphic_allocator<const ElemT>> adoptVector(
size_t nelem,
SpectatorMessageResource *resource)
{
return std::vector<const ElemT, SpectatorAllocator<const ElemT>>(
nelem, SpectatorAllocator<ElemT>(resource));
};

//_________________________________________________________________________________________________
/// Return a vector of const ElemT, takes ownership of the message, needs an
/// upstream global
/// ChannelResource to register the message.
template<typename ElemT>
std::vector<const ElemT, OwningMessageSpectatorAllocator<const ElemT>>
adoptVector(size_t nelem, ChannelResource *upstream, FairMQMessagePtr message)
{
return std::vector<const ElemT, OwningMessageSpectatorAllocator<const ElemT>>(
nelem,
OwningMessageSpectatorAllocator<const ElemT>(
MessageResource{std::move(message), upstream}));
};

//_________________________________________________________________________________________________
// TODO: this is C++14, converting it down to C++11 is too much work atm
// This returns a unique_ptr of const vector, does not allow modifications at
// the cost of pointer
// semantics for access.
// use auto or decltype to catch the return value (or use span)
// template<typename ElemT>
// auto adoptVector(size_t nelem, FairMQMessage* message)
//{
// using DataType = std::vector<ElemT, ByteSpectatorAllocator>;
//
// struct doubleDeleter
// {
// // kids: don't do this at home! (but here it's OK)
// // this stateful deleter allows a single unique_ptr to manage 2
// resources at the same time.
// std::unique_ptr<SpectatorMessageResource> extra;
// void operator()(const DataType* ptr) { delete ptr; }
// };
//
// using OutputType = std::unique_ptr<const DataType, doubleDeleter>;
//
// auto resource = std::make_unique<SpectatorMessageResource>(message);
// auto output = new DataType(nelem, ByteSpectatorAllocator{resource.get()});
// return OutputType(output, doubleDeleter{std::move(resource)});
//}

} /* namespace mq */
} /* namespace fair */
25 changes: 25 additions & 0 deletions fairmq/MemoryResources.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/********************************************************************************
* Copyright (C) 2018 CERN and copyright holders of ALICE O2 *
* Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/

/// @brief Memory allocators and interfaces related to managing memory via the
/// trasport layer
///
/// @author Mikolaj Krzewicki, mkrzewic@cern.ch

#include <fairmq/FairMQTransportFactory.h>
#include <fairmq/MemoryResources.h>

void *fair::mq::ChannelResource::do_allocate(std::size_t bytes, std::size_t /*alignment*/)
{
FairMQMessagePtr message;
message = factory->CreateMessage(bytes);
void *addr = message->GetData();
messageMap[addr] = std::move(message);
return addr;
};
Loading

0 comments on commit 310b964

Please sign in to comment.