diff --git a/docs/Transport.md b/docs/Transport.md index 29c28db15..69347861b 100644 --- a/docs/Transport.md +++ b/docs/Transport.md @@ -27,34 +27,34 @@ The next table shows the supported address types for each transport implementati ## 2.1 Message -Devices transport data between each other in form of `FairMQMessage`s. These can be filled with arbitrary content. Message can be initialized in three different ways by calling `NewMessage()`: +Devices transport data between each other in form of `fair::mq::Message`s. These can be filled with arbitrary content. Message can be initialized in three different ways by calling `NewMessage()`: ```cpp -FairMQMessagePtr NewMessage() const; +fair::mq::MessagePtr NewMessage() const; ``` **with no parameters**: Initializes an empty message (typically used for receiving). ```cpp -FairMQMessagePtr NewMessage(const size_t size) const; +fair::mq::MessagePtr NewMessage(const size_t size) const; ``` **given message size**: Initializes message body with a given size. Fill the created contents via buffer pointer. ```cpp using fairmq_free_fn = void(void* data, void* hint); -FairMQMessagePtr NewMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) const; +fair::mq::MessagePtr NewMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) const; ``` **given existing buffer and a size**: Initialize the message from an existing buffer. In case of ZeroMQ this is a zero-copy operation. Additionally, FairMQ provides two more message factories for convenience: ```cpp template -FairMQMessagePtr NewSimpleMessage(const T& data) const +fair::mq::MessagePtr NewSimpleMessage(const T& data) const ``` **copy and own**: Copy the `data` argument into the returned message and take ownership (free memory after message is sent). This interface is useful for small, [trivially copyable](http://en.cppreference.com/w/cpp/concept/TriviallyCopyable) data. ```cpp template -FairMQMessagePtr NewStaticMessage(const T& data) const +fair::mq::MessagePtr NewStaticMessage(const T& data) const ``` **point to existing memory**: The returned message will point to the `data` argument, but not take ownership (someone else must destruct this variable). Make sure that `data` lives long enough to be successfully sent. This interface is most useful for third party managed, contiguous memory (Be aware of shallow types with internal pointer references! These will not be sent.) @@ -65,19 +65,19 @@ The component of a program, that is reponsible for the allocation or destruction After queuing a message for sending in FairMQ, the transport takes ownership over the message body and will free it with `free()` after it is no longer used. A callback can be passed to the message object, to be called instead of the destruction with `free()` (for initialization via buffer+size). ```cpp -static void FairMQNoCleanup(void* /*data*/, void* /*obj*/) {} +static void fair::mq::NoCleanup(void* /*data*/, void* /*obj*/) {} template -static void FairMQSimpleMsgCleanup(void* /*data*/, void* obj) { delete static_cast(obj); } +static void fair::mq::SimpleMsgCleanup(void* /*data*/, void* obj) { delete static_cast(obj); } ``` -For convenience, two common deleter callbacks are already defined in the `FairMQTransportFactory` class to aid the user in controlling ownership of the data. +For convenience, two common deleter callbacks are already defined in the `fair::mq::TransportFactory` class to aid the user in controlling ownership of the data. ## 2.2 Channel A channel represents a communication endpoint in FairMQ. Usage is similar to a traditional Unix network socket. A device usually contains a number of channels that can either listen for incoming connections from channels of other devices or they can connect to other listening channels. Channels are organized by a channel name and a subchannel index. ```cpp -const FairMQChannel& GetChannel(const std::string& channelName, const int index = 0) const; +const fair::mq::Channel& GetChannel(const std::string& channelName, const int index = 0) const; ``` All subchannels with a common channel name need to be of the same transport type. @@ -87,7 +87,7 @@ All subchannels with a common channel name need to be of the same transport type A poller allows to wait on multiple channels either to receive or send a message. ```cpp -FairMQPollerPtr NewPoller(const std::vector& channels) +fair::mq::PollerPtr NewPoller(const std::vector& channels) ``` **list channels**: This poller waits on all supplied channels. Currently, it is limited to channels of the same transport type only. diff --git a/examples/readout/receiver.cxx b/examples/readout/receiver.cxx index 8a82f3f2e..59df99baf 100644 --- a/examples/readout/receiver.cxx +++ b/examples/readout/receiver.cxx @@ -1,5 +1,5 @@ /******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * @@ -8,10 +8,15 @@ #include #include +#include +using namespace std; +using namespace fair::mq; namespace bpo = boost::program_options; -struct Receiver : fair::mq::Device +namespace { + +struct Receiver : Device { void InitTask() override { @@ -21,15 +26,14 @@ struct Receiver : fair::mq::Device void Run() override { - FairMQChannel& dataInChannel = fChannels.at("sr").at(0); + Channel& dataInChannel = fChannels.at("sr").at(0); while (!NewStatePending()) { - FairMQMessagePtr msg(dataInChannel.Transport()->CreateMessage()); + auto msg(dataInChannel.NewMessage()); dataInChannel.Receive(msg); - // void* ptr = msg->GetData(); if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) { - LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state."; + LOG(info) << "Configured max number of iterations reached. Leaving RUNNING state."; break; } } @@ -40,13 +44,14 @@ struct Receiver : fair::mq::Device uint64_t fNumIterations = 0; }; +} // namespace + void addCustomOptions(bpo::options_description& options) { - options.add_options() - ("max-iterations", bpo::value()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)"); + options.add_options()( + "max-iterations", + bpo::value()->default_value(0), + "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)"); } -std::unique_ptr getDevice(fair::mq::ProgOptions& /*config*/) -{ - return std::make_unique(); -} +unique_ptr getDevice(ProgOptions& /*config*/) { return make_unique(); } diff --git a/examples/region/sink.cxx b/examples/region/sink.cxx index ee942930d..c271dc74a 100644 --- a/examples/region/sink.cxx +++ b/examples/region/sink.cxx @@ -1,5 +1,5 @@ /******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * @@ -8,30 +8,34 @@ #include #include +#include namespace bpo = boost::program_options; +using namespace std; +using namespace fair::mq; -struct Sink : fair::mq::Device +namespace { + +struct Sink : Device { void InitTask() override { // Get the fMaxIterations value from the command line options (via fConfig) fMaxIterations = fConfig->GetProperty("max-iterations"); - fChannels.at("data").at(0).Transport()->SubscribeToRegionEvents([](FairMQRegionInfo info) { + fChannels.at("data").at(0).Transport()->SubscribeToRegionEvents([](RegionInfo info) { LOG(info) << "Region event: " << info.event << ": " - << (info.managed ? "managed" : "unmanaged") - << ", id: " << info.id - << ", ptr: " << info.ptr - << ", size: " << info.size - << ", flags: " << info.flags; + << (info.managed ? "managed" : "unmanaged") << ", id: " << info.id + << ", ptr: " << info.ptr << ", size: " << info.size + << ", flags: " << info.flags; }); } + void Run() override { - FairMQChannel& dataInChannel = fChannels.at("data").at(0); + Channel& dataInChannel = fChannels.at("data").at(0); while (!NewStatePending()) { - FairMQMessagePtr msg(dataInChannel.Transport()->CreateMessage()); + auto msg(dataInChannel.Transport()->CreateMessage()); dataInChannel.Receive(msg); // void* ptr = msg->GetData(); @@ -39,11 +43,12 @@ struct Sink : fair::mq::Device // LOG(info) << "check: " << cptr[3]; if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) { - LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state."; + LOG(info) << "Configured max number of iterations reached. Leaving RUNNING state."; break; } } } + void ResetTask() override { fChannels.at("data").at(0).Transport()->UnsubscribeFromRegionEvents(); @@ -54,14 +59,14 @@ struct Sink : fair::mq::Device uint64_t fNumIterations = 0; }; +} // namespace void addCustomOptions(bpo::options_description& options) { - options.add_options() - ("max-iterations", bpo::value()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)"); + options.add_options()( + "max-iterations", + bpo::value()->default_value(0), + "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)"); } -std::unique_ptr getDevice(fair::mq::ProgOptions& /*config*/) -{ - return std::make_unique(); -} +unique_ptr getDevice(ProgOptions& /*config*/) { return make_unique(); } diff --git a/fairmq/JSONParser.cxx b/fairmq/JSONParser.cxx index dc6c7b96b..b7f5a4f1e 100644 --- a/fairmq/JSONParser.cxx +++ b/fairmq/JSONParser.cxx @@ -13,18 +13,19 @@ */ #include "JSONParser.h" -#include "FairMQChannel.h" -#include -#include #include +#include #define BOOST_BIND_GLOBAL_PLACEHOLDERS #include #undef BOOST_BIND_GLOBAL_PLACEHOLDERS #include -#include - +#include +#include +#include +#include +#include #include using namespace std; diff --git a/fairmq/PluginServices.h b/fairmq/PluginServices.h index f884a7cc2..6f62b748c 100644 --- a/fairmq/PluginServices.h +++ b/fairmq/PluginServices.h @@ -1,5 +1,5 @@ /******************************************************************************** - * Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * Copyright (C) 2017-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * @@ -9,8 +9,8 @@ #ifndef FAIR_MQ_PLUGINSERVICES_H #define FAIR_MQ_PLUGINSERVICES_H +#include #include -#include #include #include @@ -40,7 +40,7 @@ class PluginServices { public: PluginServices() = delete; - PluginServices(ProgOptions& config, FairMQDevice& device) + PluginServices(ProgOptions& config, Device& device) : fConfig(config) , fDevice(device) {} @@ -117,7 +117,7 @@ class PluginServices /// The state transition may not happen immediately, but when the current state evaluates the /// pending transition event and terminates. In other words, the device states are scheduled cooperatively. /// If the device control role has not been taken yet, calling this function will take over control implicitely. - auto ChangeDeviceState(const std::string& controller, const DeviceStateTransition next) -> bool; + auto ChangeDeviceState(const std::string& controller, DeviceStateTransition next) -> bool; /// @brief Subscribe with a callback to device state changes /// @param subscriber id @@ -269,7 +269,7 @@ class PluginServices private: fair::mq::ProgOptions& fConfig; - FairMQDevice& fDevice; + Device& fDevice; boost::optional fDeviceController; mutable std::mutex fDeviceControllerMutex; std::condition_variable fReleaseDeviceControlCondition; diff --git a/fairmq/ProgOptions.h b/fairmq/ProgOptions.h index 09dd5f6f2..bec9042c5 100644 --- a/fairmq/ProgOptions.h +++ b/fairmq/ProgOptions.h @@ -1,5 +1,5 @@ /******************************************************************************** - * Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * @@ -9,22 +9,20 @@ #ifndef FAIR_MQ_PROGOPTIONS_H #define FAIR_MQ_PROGOPTIONS_H -#include "FairMQChannel.h" -#include "FairMQLogger.h" +#include +#include +#include #include #include #include #include - -#include - #include #include #include +#include #include #include #include -#include namespace fair::mq { @@ -38,10 +36,10 @@ class ProgOptions virtual ~ProgOptions() = default; void ParseAll(const std::vector& cmdArgs, bool allowUnregistered); - void ParseAll(const int argc, char const* const* argv, bool allowUnregistered = true); + void ParseAll(int argc, char const* const* argv, bool allowUnregistered = true); void Notify(); - void AddToCmdLineOptions(const boost::program_options::options_description optDesc, bool visible = true); + void AddToCmdLineOptions(boost::program_options::options_description optDesc, bool visible = true); boost::program_options::options_description& GetCmdLineOptions(); /// @brief Checks a property with the given key exist in the configuration @@ -174,7 +172,7 @@ class ProgOptions /// @brief Takes the provided channel and creates properties based on it /// @param name channel name /// @param channel FairMQChannel reference - void AddChannel(const std::string& name, const FairMQChannel& channel); + void AddChannel(const std::string& name, const Channel& channel); /// @brief Subscribe to property updates of type T /// @param subscriber diff --git a/fairmq/TransportFactory.h b/fairmq/TransportFactory.h index 8873fa08c..03892856e 100644 --- a/fairmq/TransportFactory.h +++ b/fairmq/TransportFactory.h @@ -59,12 +59,12 @@ class TransportFactory /// @brief Create new Message of specified size /// @param size message size /// @return pointer to Message - virtual MessagePtr CreateMessage(const size_t size) = 0; + virtual MessagePtr CreateMessage(size_t size) = 0; /// @brief Create new Message of specified size and alignment /// @param size message size /// @param alignment message alignment /// @return pointer to Message - virtual MessagePtr CreateMessage(const size_t size, Alignment alignment) = 0; + virtual MessagePtr CreateMessage(size_t size, Alignment alignment) = 0; /// @brief Create new Message with user provided buffer and size /// @param data pointer to user provided buffer /// @param size size of the user provided buffer @@ -72,8 +72,8 @@ class TransportFactory /// @param obj optional helper pointer that can be used in the callback /// @return pointer to Message virtual MessagePtr CreateMessage(void* data, - const size_t size, - fairmq_free_fn* ffn, + size_t size, + FreeFn* ffn, void* hint = nullptr) = 0; /// @brief create a message with the buffer located within the corresponding unmanaged region /// @param unmanagedRegion the unmanaged region that this message buffer belongs to @@ -82,8 +82,8 @@ class TransportFactory /// @param hint optional parameter, returned to the user in the RegionCallback virtual MessagePtr CreateMessage(UnmanagedRegionPtr& unmanagedRegion, void* data, - const size_t size, - void* hint = 0) = 0; + size_t size, + void* hint = nullptr) = 0; /// @brief Create a socket virtual SocketPtr CreateSocket(const std::string& type, const std::string& name) = 0; diff --git a/fairmq/shmem/Poller.h b/fairmq/shmem/Poller.h index 396fca3cf..814d0023a 100644 --- a/fairmq/shmem/Poller.h +++ b/fairmq/shmem/Poller.h @@ -5,19 +5,18 @@ * GNU Lesser General Public Licence (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ + #ifndef FAIR_MQ_SHMEM_POLLER_H_ #define FAIR_MQ_SHMEM_POLLER_H_ -#include "Socket.h" +#include +#include +#include +#include #include -#include -#include -#include - -#include - #include #include +#include namespace fair::mq::shmem { @@ -25,7 +24,7 @@ namespace fair::mq::shmem class Poller final : public fair::mq::Poller { public: - Poller(const std::vector& channels) + Poller(const std::vector& channels) : fItems() , fNumItems(0) { @@ -45,7 +44,7 @@ class Poller final : public fair::mq::Poller } } - Poller(const std::vector& channels) + Poller(const std::vector& channels) : fItems() , fNumItems(0) { @@ -65,7 +64,7 @@ class Poller final : public fair::mq::Poller } } - Poller(const std::unordered_map>& channelsMap, const std::vector& channelList) + Poller(const std::unordered_map>& channelsMap, const std::vector& channelList) : fItems() , fNumItems(0) { diff --git a/fairmq/zeromq/Poller.h b/fairmq/zeromq/Poller.h index 92cb1eee0..d63d6bcbd 100644 --- a/fairmq/zeromq/Poller.h +++ b/fairmq/zeromq/Poller.h @@ -1,5 +1,5 @@ /******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * @@ -9,16 +9,14 @@ #ifndef FAIR_MQ_ZMQ_POLLER_H #define FAIR_MQ_ZMQ_POLLER_H -#include +#include +#include +#include #include -#include -#include -#include - -#include - +#include #include #include +#include namespace fair::mq::zmq { @@ -26,7 +24,7 @@ namespace fair::mq::zmq class Poller final : public fair::mq::Poller { public: - Poller(const std::vector& channels) + Poller(const std::vector& channels) : fItems() , fNumItems(0) { @@ -46,7 +44,7 @@ class Poller final : public fair::mq::Poller } } - Poller(const std::vector& channels) + Poller(const std::vector& channels) : fItems() , fNumItems(0) { @@ -66,14 +64,14 @@ class Poller final : public fair::mq::Poller } } - Poller(const std::unordered_map>& channelsMap, const std::vector& channelList) + Poller(const std::unordered_map>& channelsMap, const std::vector& channelList) : fItems() , fNumItems(0) { try { int offset = 0; // calculate offsets and the total size of the poll item set - for (std::string channel : channelList) { + for (std::string const & channel : channelList) { fOffsetMap[channel] = offset; offset += channelsMap.at(channel).size(); fNumItems += channelsMap.at(channel).size(); diff --git a/test/channel/_channel.cxx b/test/channel/_channel.cxx index d13475e62..b953142aa 100644 --- a/test/channel/_channel.cxx +++ b/test/channel/_channel.cxx @@ -1,15 +1,13 @@ /******************************************************************************** - * Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * Copyright (C) 2018-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ -#include - +#include #include - #include namespace @@ -20,15 +18,15 @@ using namespace fair::mq; TEST(Channel, Validation) { - FairMQChannel channel; - ASSERT_THROW(channel.Validate(), FairMQChannel::ChannelConfigurationError); + Channel channel; + ASSERT_THROW(channel.Validate(), Channel::ChannelConfigurationError); channel.UpdateType("pair"); ASSERT_EQ(channel.Validate(), false); ASSERT_EQ(channel.IsValid(), false); channel.UpdateAddress("bla"); - ASSERT_THROW(channel.Validate(), FairMQChannel::ChannelConfigurationError); + ASSERT_THROW(channel.Validate(), Channel::ChannelConfigurationError); channel.UpdateMethod("connect"); ASSERT_EQ(channel.Validate(), false); @@ -55,31 +53,31 @@ TEST(Channel, Validation) ASSERT_EQ(channel.IsValid(), true); channel.UpdateSndBufSize(-1); - ASSERT_THROW(channel.Validate(), FairMQChannel::ChannelConfigurationError); + ASSERT_THROW(channel.Validate(), Channel::ChannelConfigurationError); channel.UpdateSndBufSize(1000); ASSERT_NO_THROW(channel.Validate()); channel.UpdateRcvBufSize(-1); - ASSERT_THROW(channel.Validate(), FairMQChannel::ChannelConfigurationError); + ASSERT_THROW(channel.Validate(), Channel::ChannelConfigurationError); channel.UpdateRcvBufSize(1000); ASSERT_NO_THROW(channel.Validate()); channel.UpdateSndKernelSize(-1); - ASSERT_THROW(channel.Validate(), FairMQChannel::ChannelConfigurationError); + ASSERT_THROW(channel.Validate(), Channel::ChannelConfigurationError); channel.UpdateSndKernelSize(1000); ASSERT_NO_THROW(channel.Validate()); channel.UpdateRcvKernelSize(-1); - ASSERT_THROW(channel.Validate(), FairMQChannel::ChannelConfigurationError); + ASSERT_THROW(channel.Validate(), Channel::ChannelConfigurationError); channel.UpdateRcvKernelSize(1000); ASSERT_NO_THROW(channel.Validate()); channel.UpdateRateLogging(-1); - ASSERT_THROW(channel.Validate(), FairMQChannel::ChannelConfigurationError); + ASSERT_THROW(channel.Validate(), Channel::ChannelConfigurationError); channel.UpdateRateLogging(1); ASSERT_NO_THROW(channel.Validate()); - FairMQChannel channel2 = channel; + Channel channel2 = channel; ASSERT_NO_THROW(channel2.Validate()); ASSERT_EQ(channel2.Validate(), true); ASSERT_EQ(channel2.IsValid(), true); diff --git a/test/memory_resources/_memory_resources.cxx b/test/memory_resources/_memory_resources.cxx index cd5c34d0a..f7a177dff 100644 --- a/test/memory_resources/_memory_resources.cxx +++ b/test/memory_resources/_memory_resources.cxx @@ -73,9 +73,10 @@ int TestData::ndeallocations = 0; TEST(MemoryResources, transportAllocatorMap) { - size_t session{tools::UuidHash()}; + // size_t session{tools::UuidHash()}; ProgOptions config; - config.SetProperty("session", to_string(session)); + // config.SetProperty("session", to_string(session)); + config.SetProperty("session", "default"); FactoryType factoryZMQ = FairMQTransportFactory::CreateTransportFactory("zeromq", fair::mq::tools::Uuid(), &config); FactoryType factorySHM = FairMQTransportFactory::CreateTransportFactory("shmem", fair::mq::tools::Uuid(), &config); diff --git a/test/message/_message.cxx b/test/message/_message.cxx index 73c4e53ec..7480e3767 100644 --- a/test/message/_message.cxx +++ b/test/message/_message.cxx @@ -1,223 +1,232 @@ /******************************************************************************** - * Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * Copyright (C) 2017-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ -#include -#include -#include +#include +#include +#include +#include +#include #include -#include +#include #include - +#include #include - +#include #include -#include +#include +#include namespace { using namespace std; +using namespace fair::mq; -void RunPushPullWithMsgResize(const string& transport, const string& _address) +auto AsStringView(Message const& msg) -> string_view { - size_t session{fair::mq::tools::UuidHash()}; - std::string address(fair::mq::tools::ToString(_address, "_", transport)); - - fair::mq::ProgOptions config; - config.SetProperty("session", to_string(session)); + return {static_cast(msg.GetData()), msg.GetSize()}; +} - auto factory = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config); +auto RunPushPullWithMsgResize(string const & transport, string const & _address) -> void +{ + ProgOptions config; + config.SetProperty("session", tools::Uuid()); + auto factory(TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config)); - FairMQChannel push{"Push", "push", factory}; + Channel push{"Push", "push", factory}; + Channel pull{"Pull", "pull", factory}; + auto const address(tools::ToString(_address, "_", transport)); push.Bind(address); - - FairMQChannel pull{"Pull", "pull", factory}; pull.Connect(address); { - FairMQMessagePtr outMsg(push.NewMessage(6)); - ASSERT_EQ(outMsg->GetSize(), 6); - memcpy(outMsg->GetData(), "ABCDEF", 6); - ASSERT_EQ(outMsg->SetUsedSize(5), true); - ASSERT_EQ(outMsg->SetUsedSize(5), true); - ASSERT_EQ(outMsg->SetUsedSize(7), false); + size_t const size{6}; + auto outMsg(push.NewMessage(size)); + ASSERT_EQ(outMsg->GetSize(), size); + memcpy(outMsg->GetData(), "ABCDEF", size); + ASSERT_TRUE(outMsg->SetUsedSize(5)); + ASSERT_TRUE(outMsg->SetUsedSize(5)); + ASSERT_FALSE(outMsg->SetUsedSize(7)); ASSERT_EQ(outMsg->GetSize(), 5); + // check if the data is still intact - ASSERT_EQ(static_cast(outMsg->GetData())[0], 'A'); - ASSERT_EQ(static_cast(outMsg->GetData())[1], 'B'); - ASSERT_EQ(static_cast(outMsg->GetData())[2], 'C'); - ASSERT_EQ(static_cast(outMsg->GetData())[3], 'D'); - ASSERT_EQ(static_cast(outMsg->GetData())[4], 'E'); - ASSERT_EQ(outMsg->SetUsedSize(2), true); + ASSERT_EQ(AsStringView(*outMsg)[0], 'A'); + ASSERT_EQ(AsStringView(*outMsg)[1], 'B'); + ASSERT_EQ(AsStringView(*outMsg)[2], 'C'); + ASSERT_EQ(AsStringView(*outMsg)[3], 'D'); + ASSERT_EQ(AsStringView(*outMsg)[4], 'E'); + ASSERT_TRUE(outMsg->SetUsedSize(2)); ASSERT_EQ(outMsg->GetSize(), 2); - ASSERT_EQ(static_cast(outMsg->GetData())[0], 'A'); - ASSERT_EQ(static_cast(outMsg->GetData())[1], 'B'); - FairMQMessagePtr msgCopy(push.NewMessage()); + ASSERT_EQ(AsStringView(*outMsg)[0], 'A'); + ASSERT_EQ(AsStringView(*outMsg)[1], 'B'); + + auto msgCopy(push.NewMessage()); msgCopy->Copy(*outMsg); ASSERT_EQ(msgCopy->GetSize(), 2); ASSERT_EQ(push.Send(outMsg), 2); - FairMQMessagePtr inMsg(pull.NewMessage()); + auto inMsg(pull.NewMessage()); ASSERT_EQ(pull.Receive(inMsg), 2); ASSERT_EQ(inMsg->GetSize(), 2); - ASSERT_EQ(static_cast(inMsg->GetData())[0], 'A'); - ASSERT_EQ(static_cast(inMsg->GetData())[1], 'B'); + ASSERT_EQ(AsStringView(*inMsg)[0], 'A'); + ASSERT_EQ(AsStringView(*inMsg)[1], 'B'); } { - FairMQMessagePtr outMsg(push.NewMessage(1000)); - ASSERT_EQ(outMsg->SetUsedSize(0), true); + size_t const size{1000}; + auto outMsg(push.NewMessage(size)); + ASSERT_TRUE(outMsg->SetUsedSize(0)); ASSERT_EQ(outMsg->GetSize(), 0); - FairMQMessagePtr msgCopy(push.NewMessage()); + auto msgCopy(push.NewMessage()); msgCopy->Copy(*outMsg); ASSERT_EQ(msgCopy->GetSize(), 0); ASSERT_EQ(push.Send(outMsg), 0); - FairMQMessagePtr inMsg(pull.NewMessage()); + auto inMsg(pull.NewMessage()); ASSERT_EQ(pull.Receive(inMsg), 0); ASSERT_EQ(inMsg->GetSize(), 0); } } -void RunMsgRebuild(const string& transport) +auto RunMsgRebuild(const string& transport) -> void { - size_t session{fair::mq::tools::UuidHash()}; + ProgOptions config; + config.SetProperty("session", tools::Uuid()); + auto factory(TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config)); - fair::mq::ProgOptions config; - config.SetProperty("session", to_string(session)); - - auto factory = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config); - - FairMQMessagePtr msg(factory->CreateMessage()); + size_t const msgSize{100}; + string const expectedStr{"asdf"}; + auto msg(factory->CreateMessage()); EXPECT_EQ(msg->GetSize(), 0); - msg->Rebuild(100); - EXPECT_EQ(msg->GetSize(), 100); - string* str = new string("asdf"); - msg->Rebuild(const_cast(str->c_str()), - str->length(), - [](void* /*data*/, void* obj) { delete static_cast(obj); }, - str); - EXPECT_NE(msg->GetSize(), 100); - EXPECT_EQ(msg->GetSize(), string("asdf").length()); - EXPECT_EQ(string(static_cast(msg->GetData()), msg->GetSize()), string("asdf")); + msg->Rebuild(msgSize); + EXPECT_EQ(msg->GetSize(), msgSize); + + auto str(make_unique(expectedStr)); + void* data(str->data()); + auto const size(str->length()); + msg->Rebuild( + data, + size, + [](void* /*data*/, void* obj) { delete static_cast(obj); }, // NOLINT + str.release()); + EXPECT_NE(msg->GetSize(), msgSize); + EXPECT_EQ(msg->GetSize(), expectedStr.length()); + EXPECT_EQ(AsStringView(*msg), expectedStr); } -void Alignment(const string& transport, const string& _address) +auto CheckMsgAlignment(Message const& msg, fair::mq::Alignment alignment) -> bool { - size_t session{fair::mq::tools::UuidHash()}; - std::string address(fair::mq::tools::ToString(_address, "_", transport)); - - fair::mq::ProgOptions config; - config.SetProperty("session", to_string(session)); + assert(static_cast(alignment) > 0); // NOLINT + return (reinterpret_cast(msg.GetData()) % static_cast(alignment)) == 0; // NOLINT +} - auto factory = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config); +auto RunPushPullWithAlignment(string const& transport, string const& _address) -> void +{ + ProgOptions config; + config.SetProperty("session", tools::Uuid()); + auto factory(TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config)); - FairMQChannel push{"Push", "push", factory}; + Channel push{"Push", "push", factory}; + Channel pull{"Pull", "pull", factory}; + auto const address(tools::ToString(_address, "_", transport)); push.Bind(address); - - FairMQChannel pull{"Pull", "pull", factory}; pull.Connect(address); - size_t alignment = 64; - - FairMQMessagePtr outMsg1(push.NewMessage(100, fair::mq::Alignment{alignment})); - ASSERT_EQ(reinterpret_cast(outMsg1->GetData()) % alignment, 0); - ASSERT_EQ(push.Send(outMsg1), 100); - - FairMQMessagePtr inMsg1(pull.NewMessage(fair::mq::Alignment{alignment})); - ASSERT_EQ(pull.Receive(inMsg1), 100); - ASSERT_EQ(reinterpret_cast(inMsg1->GetData()) % alignment, 0); - - FairMQMessagePtr outMsg2(push.NewMessage(32, fair::mq::Alignment{alignment})); - ASSERT_EQ(reinterpret_cast(outMsg2->GetData()) % alignment, 0); - ASSERT_EQ(push.Send(outMsg2), 32); - - FairMQMessagePtr inMsg2(pull.NewMessage(fair::mq::Alignment{alignment})); - ASSERT_EQ(pull.Receive(inMsg2), 32); - ASSERT_EQ(reinterpret_cast(inMsg2->GetData()) % alignment, 0); + { + Alignment const align{64}; + for (size_t const size : {100, 32}) { + auto outMsg(push.NewMessage(size, align)); + auto inMsg(pull.NewMessage(align)); + + ASSERT_TRUE(CheckMsgAlignment(*outMsg, align)); + ASSERT_EQ(push.Send(outMsg), size); + ASSERT_EQ(pull.Receive(inMsg), size); + ASSERT_TRUE(CheckMsgAlignment(*inMsg, align)); + } + } - FairMQMessagePtr outMsg3(push.NewMessage(100, fair::mq::Alignment{0})); - ASSERT_EQ(push.Send(outMsg3), 100); + { + Alignment const align{0}; + size_t const size{100}; + auto outMsg(push.NewMessage(size, align)); + auto inMsg(pull.NewMessage(align)); - FairMQMessagePtr inMsg3(pull.NewMessage(fair::mq::Alignment{0})); - ASSERT_EQ(pull.Receive(inMsg3), 100); + ASSERT_EQ(push.Send(outMsg), size); + ASSERT_EQ(pull.Receive(inMsg), size); + } - FairMQMessagePtr msg1(push.NewMessage(25)); - msg1->Rebuild(50, fair::mq::Alignment{alignment}); - ASSERT_EQ(reinterpret_cast(msg1->GetData()) % alignment, 0); + for (auto const align : {Alignment{64}, Alignment{32}}) { + size_t const size25{25}; + size_t const size50{50}; - size_t alignment2 = 32; - FairMQMessagePtr msg2(push.NewMessage(25, fair::mq::Alignment{alignment})); - msg2->Rebuild(50, fair::mq::Alignment{alignment2}); - ASSERT_EQ(reinterpret_cast(msg2->GetData()) % alignment2, 0); + auto msg(push.NewMessage(size25)); + msg->Rebuild(size50, align); + ASSERT_TRUE(CheckMsgAlignment(*msg, align)); + } } -void EmptyMessage(const string& transport, const string& _address) +auto EmptyMessage(string const& transport, string const& _address) -> void { - size_t session{fair::mq::tools::UuidHash()}; - std::string address(fair::mq::tools::ToString(_address, "_", transport)); + ProgOptions config; + config.SetProperty("session", tools::Uuid()); + auto factory(TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config)); - fair::mq::ProgOptions config; - config.SetProperty("session", to_string(session)); - - auto factory = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config); - - FairMQChannel push{"Push", "push", factory}; + Channel push{"Push", "push", factory}; + Channel pull{"Pull", "pull", factory}; + auto const address(tools::ToString(_address, "_", transport)); push.Bind(address); - - FairMQChannel pull{"Pull", "pull", factory}; pull.Connect(address); - FairMQMessagePtr outMsg(push.NewMessage()); + auto outMsg(push.NewMessage()); ASSERT_EQ(outMsg->GetData(), nullptr); ASSERT_EQ(push.Send(outMsg), 0); - FairMQMessagePtr inMsg(pull.NewMessage()); + auto inMsg(pull.NewMessage()); ASSERT_EQ(pull.Receive(inMsg), 0); ASSERT_EQ(inMsg->GetData(), nullptr); } -TEST(Resize, zeromq) +TEST(Resize, zeromq) // NOLINT { RunPushPullWithMsgResize("zeromq", "ipc://test_message_resize"); } -TEST(Resize, shmem) +TEST(Resize, shmem) // NOLINT { RunPushPullWithMsgResize("shmem", "ipc://test_message_resize"); } -TEST(Rebuild, zeromq) +TEST(Rebuild, zeromq) // NOLINT { RunMsgRebuild("zeromq"); } -TEST(Rebuild, shmem) +TEST(Rebuild, shmem) // NOLINT { RunMsgRebuild("shmem"); } -TEST(Alignment, shmem) +TEST(Alignment, shmem) // NOLINT { - Alignment("shmem", "ipc://test_message_alignment"); + RunPushPullWithAlignment("shmem", "ipc://test_message_alignment"); } -TEST(Alignment, zeromq) +TEST(Alignment, zeromq) // NOLINT { - Alignment("zeromq", "ipc://test_message_alignment"); + RunPushPullWithAlignment("zeromq", "ipc://test_message_alignment"); } -TEST(EmptyMessage, zeromq) +TEST(EmptyMessage, zeromq) // NOLINT { EmptyMessage("zeromq", "ipc://test_empty_message"); } -TEST(EmptyMessage, shmem) +TEST(EmptyMessage, shmem) // NOLINT { EmptyMessage("shmem", "ipc://test_empty_message"); } diff --git a/test/protocols/_push_pull_multipart.cxx b/test/protocols/_push_pull_multipart.cxx index 0f38406fc..3a6806b3d 100644 --- a/test/protocols/_push_pull_multipart.cxx +++ b/test/protocols/_push_pull_multipart.cxx @@ -1,20 +1,19 @@ /******************************************************************************** - * Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * Copyright (C) 2017-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ -#include -#include -#include -#include -#include -#include -#include - #include +#include +#include +#include +#include +#include +#include +#include #include #include #include @@ -24,23 +23,22 @@ namespace { using namespace std; +using namespace fair::mq; auto RunSingleThreadedMultipart(string transport, string address1, string address2) -> void { - size_t session{fair::mq::tools::UuidHash()}; - fair::mq::ProgOptions config; - config.SetProperty("session", std::to_string(session)); + config.SetProperty("session", tools::Uuid()); - auto factory = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config); + auto factory = TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config); - FairMQChannel push1("Push1", "push", factory); + Channel push1("Push1", "push", factory); ASSERT_TRUE(push1.Bind(address1)); - FairMQChannel pull1("Pull1", "pull", factory); + Channel pull1("Pull1", "pull", factory); ASSERT_TRUE(pull1.Connect(address1)); - FairMQChannel push2("Push2", "push", factory); + Channel push2("Push2", "push", factory); ASSERT_TRUE(push2.Bind(address2)); - FairMQChannel pull2("Pull2", "pull", factory); + Channel pull2("Pull2", "pull", factory); ASSERT_TRUE(pull2.Connect(address2)); // TODO validate that fTransportFactory is not nullptr @@ -51,32 +49,32 @@ auto RunSingleThreadedMultipart(string transport, string address1, string addres ASSERT_TRUE(pull2.Validate()); { - FairMQParts multiplePartsOut; + Parts multiplePartsOut; multiplePartsOut.AddPart(push1.NewSimpleMessage("1")); multiplePartsOut.AddPart(push1.NewSimpleMessage("2")); multiplePartsOut.AddPart(push1.NewSimpleMessage("3")); ASSERT_GE(push1.Send(multiplePartsOut), 0); - FairMQParts singlePartOut; + Parts singlePartOut; singlePartOut.AddPart(push1.NewSimpleMessage("4")); ASSERT_GE(push1.Send(singlePartOut), 0); } - FairMQParts multipleParts; + Parts multipleParts; ASSERT_GE(pull1.Receive(multipleParts), 0); stringstream multiple; - for_each(multipleParts.cbegin(), multipleParts.cend(), [&multiple, &factory](const FairMQMessagePtr& part) { + for_each(multipleParts.cbegin(), multipleParts.cend(), [&multiple, &factory](const MessagePtr& part) { multiple << string{static_cast(part->GetData()), part->GetSize()}; ASSERT_EQ(part->GetTransport(), factory.get()); }); ASSERT_EQ(multiple.str(), "123"); - FairMQParts singlePart; + Parts singlePart; ASSERT_GE(pull1.Receive(singlePart), 0); stringstream single; - for_each(singlePart.cbegin(), singlePart.cend(), [&single](const FairMQMessagePtr& part) { + for_each(singlePart.cbegin(), singlePart.cend(), [&single](const MessagePtr& part) { single << string{static_cast(part->GetData()), part->GetSize()}; }); ASSERT_EQ(single.str(), "4"); @@ -85,18 +83,18 @@ auto RunSingleThreadedMultipart(string transport, string address1, string addres ASSERT_GE(push2.Send(multipleParts), 0); { - FairMQParts singlePartIn; + Parts singlePartIn; ASSERT_GE(pull2.Receive(singlePartIn), 0); stringstream singleIn; - for_each(singlePartIn.cbegin(), singlePartIn.cend(), [&singleIn](const FairMQMessagePtr& part) { + for_each(singlePartIn.cbegin(), singlePartIn.cend(), [&singleIn](const MessagePtr& part) { singleIn << string{static_cast(part->GetData()), part->GetSize()}; }); ASSERT_EQ(singleIn.str(), "4"); - FairMQParts multiplePartsIn; + Parts multiplePartsIn; ASSERT_GE(pull2.Receive(multiplePartsIn), 0); stringstream multipleIn; - for_each(multiplePartsIn.cbegin(), multiplePartsIn.cend(), [&multipleIn, &factory](const FairMQMessagePtr& part) { + for_each(multiplePartsIn.cbegin(), multiplePartsIn.cend(), [&multipleIn, &factory](const MessagePtr& part) { multipleIn << string{static_cast(part->GetData()), part->GetSize()}; ASSERT_EQ(part->GetTransport(), factory.get()); }); @@ -106,24 +104,22 @@ auto RunSingleThreadedMultipart(string transport, string address1, string addres auto RunMultiThreadedMultipart(string transport, string address1) -> void { - size_t session{fair::mq::tools::UuidHash()}; - - fair::mq::ProgOptions config; - config.SetProperty("session", std::to_string(session)); + ProgOptions config; + config.SetProperty("session", tools::Uuid()); config.SetProperty("io-threads", 1); - config.SetProperty("shm-segment-size", 20000000); + config.SetProperty("shm-segment-size", 20000000); // NOLINT - auto factory = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config); + auto factory = TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config); - FairMQChannel push1("Push1", "push", factory); + Channel push1("Push1", "push", factory); ASSERT_TRUE(push1.Bind(address1)); - FairMQChannel pull1("Pull1", "pull", factory); + Channel pull1("Pull1", "pull", factory); ASSERT_TRUE(pull1.Connect(address1)); auto pusher = thread{[&push1](){ ASSERT_TRUE(push1.Validate()); - FairMQParts sent; + Parts sent; sent.AddPart(push1.NewSimpleMessage("1")); sent.AddPart(push1.NewSimpleMessage("2")); sent.AddPart(push1.NewSimpleMessage("3")); @@ -134,11 +130,11 @@ auto RunMultiThreadedMultipart(string transport, string address1) -> void auto puller = thread{[&pull1](){ ASSERT_TRUE(pull1.Validate()); - FairMQParts received; + Parts received; ASSERT_GE(pull1.Receive(received), 0); stringstream out; - for_each(received.cbegin(), received.cend(), [&out](const FairMQMessagePtr& part) { + for_each(received.cbegin(), received.cend(), [&out](const MessagePtr& part) { out << string{static_cast(part->GetData()), part->GetSize()}; }); ASSERT_EQ(out.str(), "123"); @@ -148,42 +144,42 @@ auto RunMultiThreadedMultipart(string transport, string address1) -> void puller.join(); } -TEST(PushPull, Multipart_ST_inproc_zeromq) +TEST(PushPull, Multipart_ST_inproc_zeromq) // NOLINT { RunSingleThreadedMultipart("zeromq", "inproc://test1", "inproc://test2"); } -TEST(PushPull, Multipart_ST_inproc_shmem) +TEST(PushPull, Multipart_ST_inproc_shmem) // NOLINT { RunSingleThreadedMultipart("shmem", "inproc://test1", "inproc://test2"); } -TEST(PushPull, Multipart_ST_ipc_zeromq) +TEST(PushPull, Multipart_ST_ipc_zeromq) // NOLINT { RunSingleThreadedMultipart("zeromq", "ipc://test_Multipart_ST_ipc_zeromq_1", "ipc://test_Multipart_ST_ipc_zeromq_2"); } -TEST(PushPull, Multipart_ST_ipc_shmem) +TEST(PushPull, Multipart_ST_ipc_shmem) // NOLINT { RunSingleThreadedMultipart("shmem", "ipc://test_Multipart_ST_ipc_shmem_1", "ipc://test_Multipart_ST_ipc_shmem_2"); } -TEST(PushPull, Multipart_MT_inproc_zeromq) +TEST(PushPull, Multipart_MT_inproc_zeromq) // NOLINT { RunMultiThreadedMultipart("zeromq", "inproc://test_1"); } -TEST(PushPull, Multipart_MT_inproc_shmem) +TEST(PushPull, Multipart_MT_inproc_shmem) // NOLINT { RunMultiThreadedMultipart("shmem", "inproc://test_1"); } -TEST(PushPull, Multipart_MT_ipc_zeromq) +TEST(PushPull, Multipart_MT_ipc_zeromq) // NOLINT { RunMultiThreadedMultipart("zeromq", "ipc://test_Multipart_MT_ipc_zeromq_1"); } -TEST(PushPull, Multipart_MT_ipc_shmem) +TEST(PushPull, Multipart_MT_ipc_shmem) // NOLINT { RunMultiThreadedMultipart("shmem", "ipc://test_Multipart_MT_ipc_shmem_1"); } diff --git a/test/transport/_options.cxx b/test/transport/_options.cxx index e603ebec0..01aef8d07 100644 --- a/test/transport/_options.cxx +++ b/test/transport/_options.cxx @@ -1,20 +1,20 @@ /******************************************************************************** - * Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * Copyright (C) 2017-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ -#include -#include -#include -#include -#include -#include -#include - #include +#include +#include +#include +#include +#include +#include +#include +#include #include #include #include @@ -24,25 +24,25 @@ namespace { using namespace std; +using namespace fair::mq; -void CheckOldOptionInterface(FairMQChannel& channel, const string& option) +void CheckOldOptionInterface(Channel& channel, const string& option) { - int value = 500; + int const expectedValue{500}; + int value = expectedValue; channel.GetSocket().SetOption(option, &value, sizeof(value)); value = 0; size_t valueSize = sizeof(value); channel.GetSocket().GetOption(option, &value, &valueSize); - ASSERT_EQ(value, 500); + ASSERT_EQ(value, expectedValue); } void RunOptionsTest(const string& transport) { - size_t session{fair::mq::tools::UuidHash()}; - fair::mq::ProgOptions config; - config.SetProperty("session", to_string(session)); - auto factory = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config); - FairMQChannel channel("Push", "push", factory); + config.SetProperty("session", tools::Uuid()); + auto factory = TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config); + Channel channel("Push", "push", factory); CheckOldOptionInterface(channel, "linger"); CheckOldOptionInterface(channel, "snd-hwm"); @@ -50,38 +50,37 @@ void RunOptionsTest(const string& transport) CheckOldOptionInterface(channel, "snd-size"); CheckOldOptionInterface(channel, "rcv-size"); - channel.GetSocket().SetLinger(300); - ASSERT_EQ(channel.GetSocket().GetLinger(), 300); - - channel.GetSocket().SetSndBufSize(500); - ASSERT_EQ(channel.GetSocket().GetSndBufSize(), 500); - - channel.GetSocket().SetRcvBufSize(500); - ASSERT_EQ(channel.GetSocket().GetRcvBufSize(), 500); - - channel.GetSocket().SetSndKernelSize(8000); - ASSERT_EQ(channel.GetSocket().GetSndKernelSize(), 8000); - - channel.GetSocket().SetRcvKernelSize(8000); - ASSERT_EQ(channel.GetSocket().GetRcvKernelSize(), 8000); + size_t const linger{300}; + channel.GetSocket().SetLinger(linger); + ASSERT_EQ(channel.GetSocket().GetLinger(), linger); + + size_t const bufSize{500}; + channel.GetSocket().SetSndBufSize(bufSize); + ASSERT_EQ(channel.GetSocket().GetSndBufSize(), bufSize); + channel.GetSocket().SetRcvBufSize(bufSize); + ASSERT_EQ(channel.GetSocket().GetRcvBufSize(), bufSize); + + size_t const kernelSize{8000}; + channel.GetSocket().SetSndKernelSize(kernelSize); + ASSERT_EQ(channel.GetSocket().GetSndKernelSize(), kernelSize); + channel.GetSocket().SetRcvKernelSize(kernelSize); + ASSERT_EQ(channel.GetSocket().GetRcvKernelSize(), kernelSize); } void ZeroingAndMlock(const string& transport) { - size_t session{fair::mq::tools::UuidHash()}; - - fair::mq::ProgOptions config; - config.SetProperty("session", to_string(session)); - config.SetProperty("shm-segment-size", 16384); + ProgOptions config; + config.SetProperty("session", tools::Uuid()); + config.SetProperty("shm-segment-size", 16384); // NOLINT config.SetProperty("shm-zero-segment", true); config.SetProperty("shm-mlock-segment", true); - auto factory = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config); + auto factory = TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config); - FairMQMessagePtr outMsg(factory->CreateMessage(10000)); - char test[10000]; - memset(test, 0, sizeof(test)); - ASSERT_EQ(memcmp(test, outMsg->GetData(), outMsg->GetSize()), 0); + constexpr size_t size{10000}; + auto outMsg(factory->CreateMessage(size)); + array test{0}; + ASSERT_EQ(memcmp(test.data(), outMsg->GetData(), outMsg->GetSize()), 0); } void ZeroingAndMlockOnCreation(const string& transport) @@ -90,29 +89,29 @@ void ZeroingAndMlockOnCreation(const string& transport) fair::mq::ProgOptions config; config.SetProperty("session", to_string(session)); - config.SetProperty("shm-segment-size", 16384); + config.SetProperty("shm-segment-size", 16384); // NOLINT config.SetProperty("shm-mlock-segment-on-creation", true); config.SetProperty("shm-zero-segment-on-creation", true); auto factory = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config); - FairMQMessagePtr outMsg(factory->CreateMessage(10000)); - char test[10000]; - memset(test, 0, sizeof(test)); - ASSERT_EQ(memcmp(test, outMsg->GetData(), outMsg->GetSize()), 0); + constexpr size_t size{10000}; + auto outMsg(factory->CreateMessage(size)); + array test{0}; + ASSERT_EQ(memcmp(test.data(), outMsg->GetData(), outMsg->GetSize()), 0); } -TEST(Options, zeromq) +TEST(Options, zeromq) // NOLINT { RunOptionsTest("zeromq"); } -TEST(Options, shmem) +TEST(Options, shmem) // NOLINT { RunOptionsTest("shmem"); } -TEST(ZeroingAndMlock, shmem) +TEST(ZeroingAndMlock, shmem) // NOLINT { ZeroingAndMlock("shmem"); }