Skip to content

Commit

Permalink
fix: First round of using new non-namespaced typenames
Browse files Browse the repository at this point in the history
  • Loading branch information
dennisklein committed Sep 7, 2021
1 parent 0bf765e commit 4e8f247
Show file tree
Hide file tree
Showing 14 changed files with 326 additions and 317 deletions.
22 changes: 11 additions & 11 deletions docs/Transport.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,34 +27,34 @@ The next table shows the supported address types for each transport implementati

## 2.1 Message

Devices transport data between each other in form of `FairMQMessage`s. These can be filled with arbitrary content. Message can be initialized in three different ways by calling `NewMessage()`:
Devices transport data between each other in form of `fair::mq::Message`s. These can be filled with arbitrary content. Message can be initialized in three different ways by calling `NewMessage()`:

```cpp
FairMQMessagePtr NewMessage() const;
fair::mq::MessagePtr NewMessage() const;
```
**with no parameters**: Initializes an empty message (typically used for receiving).

```cpp
FairMQMessagePtr NewMessage(const size_t size) const;
fair::mq::MessagePtr NewMessage(const size_t size) const;
```
**given message size**: Initializes message body with a given size. Fill the created contents via buffer pointer.
```cpp
using fairmq_free_fn = void(void* data, void* hint);
FairMQMessagePtr NewMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) const;
fair::mq::MessagePtr NewMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) const;
```
**given existing buffer and a size**: Initialize the message from an existing buffer. In case of ZeroMQ this is a zero-copy operation.

Additionally, FairMQ provides two more message factories for convenience:
```cpp
template<typename T>
FairMQMessagePtr NewSimpleMessage(const T& data) const
fair::mq::MessagePtr NewSimpleMessage(const T& data) const
```
**copy and own**: Copy the `data` argument into the returned message and take ownership (free memory after message is sent). This interface is useful for small, [trivially copyable](http://en.cppreference.com/w/cpp/concept/TriviallyCopyable) data.
```cpp
template<typename T>
FairMQMessagePtr NewStaticMessage(const T& data) const
fair::mq::MessagePtr NewStaticMessage(const T& data) const
```
**point to existing memory**: The returned message will point to the `data` argument, but not take ownership (someone else must destruct this variable). Make sure that `data` lives long enough to be successfully sent. This interface is most useful for third party managed, contiguous memory (Be aware of shallow types with internal pointer references! These will not be sent.)

Expand All @@ -65,19 +65,19 @@ The component of a program, that is reponsible for the allocation or destruction
After queuing a message for sending in FairMQ, the transport takes ownership over the message body and will free it with `free()` after it is no longer used. A callback can be passed to the message object, to be called instead of the destruction with `free()` (for initialization via buffer+size).

```cpp
static void FairMQNoCleanup(void* /*data*/, void* /*obj*/) {}
static void fair::mq::NoCleanup(void* /*data*/, void* /*obj*/) {}

template<typename T>
static void FairMQSimpleMsgCleanup(void* /*data*/, void* obj) { delete static_cast<T*>(obj); }
static void fair::mq::SimpleMsgCleanup(void* /*data*/, void* obj) { delete static_cast<T*>(obj); }
```
For convenience, two common deleter callbacks are already defined in the `FairMQTransportFactory` class to aid the user in controlling ownership of the data.
For convenience, two common deleter callbacks are already defined in the `fair::mq::TransportFactory` class to aid the user in controlling ownership of the data.
## 2.2 Channel
A channel represents a communication endpoint in FairMQ. Usage is similar to a traditional Unix network socket. A device usually contains a number of channels that can either listen for incoming connections from channels of other devices or they can connect to other listening channels. Channels are organized by a channel name and a subchannel index.
```cpp
const FairMQChannel& GetChannel(const std::string& channelName, const int index = 0) const;
const fair::mq::Channel& GetChannel(const std::string& channelName, const int index = 0) const;
```

All subchannels with a common channel name need to be of the same transport type.
Expand All @@ -87,7 +87,7 @@ All subchannels with a common channel name need to be of the same transport type
A poller allows to wait on multiple channels either to receive or send a message.

```cpp
FairMQPollerPtr NewPoller(const std::vector<const FairMQChannel*>& channels)
fair::mq::PollerPtr NewPoller(const std::vector<const fair::mq::Channel*>& channels)
```
**list channels**: This poller waits on all supplied channels. Currently, it is limited to channels of the same transport type only.
Expand Down
29 changes: 17 additions & 12 deletions examples/readout/receiver.cxx
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
Expand All @@ -8,10 +8,15 @@

#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
#include <memory>

using namespace std;
using namespace fair::mq;
namespace bpo = boost::program_options;

struct Receiver : fair::mq::Device
namespace {

struct Receiver : Device
{
void InitTask() override
{
Expand All @@ -21,15 +26,14 @@ struct Receiver : fair::mq::Device

void Run() override
{
FairMQChannel& dataInChannel = fChannels.at("sr").at(0);
Channel& dataInChannel = fChannels.at("sr").at(0);

while (!NewStatePending()) {
FairMQMessagePtr msg(dataInChannel.Transport()->CreateMessage());
auto msg(dataInChannel.NewMessage());
dataInChannel.Receive(msg);
// void* ptr = msg->GetData();

if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
LOG(info) << "Configured max number of iterations reached. Leaving RUNNING state.";
break;
}
}
Expand All @@ -40,13 +44,14 @@ struct Receiver : fair::mq::Device
uint64_t fNumIterations = 0;
};

} // namespace

void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
options.add_options()(
"max-iterations",
bpo::value<uint64_t>()->default_value(0),
"Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}

std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<Receiver>();
}
unique_ptr<Device> getDevice(ProgOptions& /*config*/) { return make_unique<Receiver>(); }
39 changes: 22 additions & 17 deletions examples/region/sink.cxx
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
Expand All @@ -8,42 +8,47 @@

#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
#include <memory>

namespace bpo = boost::program_options;
using namespace std;
using namespace fair::mq;

struct Sink : fair::mq::Device
namespace {

struct Sink : Device
{
void InitTask() override
{
// Get the fMaxIterations value from the command line options (via fConfig)
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
fChannels.at("data").at(0).Transport()->SubscribeToRegionEvents([](FairMQRegionInfo info) {
fChannels.at("data").at(0).Transport()->SubscribeToRegionEvents([](RegionInfo info) {
LOG(info) << "Region event: " << info.event << ": "
<< (info.managed ? "managed" : "unmanaged")
<< ", id: " << info.id
<< ", ptr: " << info.ptr
<< ", size: " << info.size
<< ", flags: " << info.flags;
<< (info.managed ? "managed" : "unmanaged") << ", id: " << info.id
<< ", ptr: " << info.ptr << ", size: " << info.size
<< ", flags: " << info.flags;
});
}

void Run() override
{
FairMQChannel& dataInChannel = fChannels.at("data").at(0);
Channel& dataInChannel = fChannels.at("data").at(0);

while (!NewStatePending()) {
FairMQMessagePtr msg(dataInChannel.Transport()->CreateMessage());
auto msg(dataInChannel.Transport()->CreateMessage());
dataInChannel.Receive(msg);

// void* ptr = msg->GetData();
// char* cptr = static_cast<char*>(ptr);
// LOG(info) << "check: " << cptr[3];

if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
LOG(info) << "Configured max number of iterations reached. Leaving RUNNING state.";
break;
}
}
}

void ResetTask() override
{
fChannels.at("data").at(0).Transport()->UnsubscribeFromRegionEvents();
Expand All @@ -54,14 +59,14 @@ struct Sink : fair::mq::Device
uint64_t fNumIterations = 0;
};

} // namespace

void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
options.add_options()(
"max-iterations",
bpo::value<uint64_t>()->default_value(0),
"Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}

std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<Sink>();
}
unique_ptr<Device> getDevice(ProgOptions& /*config*/) { return make_unique<Sink>(); }
11 changes: 6 additions & 5 deletions fairmq/JSONParser.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,19 @@
*/

#include "JSONParser.h"
#include "FairMQChannel.h"
#include <fairmq/PropertyOutput.h>
#include <fairmq/tools/Strings.h>

#include <fairlogger/Logger.h>

#include <boost/any.hpp>
#define BOOST_BIND_GLOBAL_PLACEHOLDERS
#include <boost/property_tree/json_parser.hpp>
#undef BOOST_BIND_GLOBAL_PLACEHOLDERS
#include <boost/property_tree/ptree.hpp>
#include <boost/any.hpp>

#include <fairlogger/Logger.h>
#include <fairmq/Channel.h>
#include <fairmq/JSONParser.h>
#include <fairmq/PropertyOutput.h>
#include <fairmq/tools/Strings.h>
#include <iomanip>

using namespace std;
Expand Down
10 changes: 5 additions & 5 deletions fairmq/PluginServices.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/********************************************************************************
* Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2017-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
Expand All @@ -9,8 +9,8 @@
#ifndef FAIR_MQ_PLUGINSERVICES_H
#define FAIR_MQ_PLUGINSERVICES_H

#include <fairmq/Device.h>
#include <fairmq/States.h>
#include <FairMQDevice.h>
#include <fairmq/ProgOptions.h>
#include <fairmq/Properties.h>

Expand Down Expand Up @@ -40,7 +40,7 @@ class PluginServices
{
public:
PluginServices() = delete;
PluginServices(ProgOptions& config, FairMQDevice& device)
PluginServices(ProgOptions& config, Device& device)
: fConfig(config)
, fDevice(device)
{}
Expand Down Expand Up @@ -117,7 +117,7 @@ class PluginServices
/// The state transition may not happen immediately, but when the current state evaluates the
/// pending transition event and terminates. In other words, the device states are scheduled cooperatively.
/// If the device control role has not been taken yet, calling this function will take over control implicitely.
auto ChangeDeviceState(const std::string& controller, const DeviceStateTransition next) -> bool;
auto ChangeDeviceState(const std::string& controller, DeviceStateTransition next) -> bool;

/// @brief Subscribe with a callback to device state changes
/// @param subscriber id
Expand Down Expand Up @@ -269,7 +269,7 @@ class PluginServices

private:
fair::mq::ProgOptions& fConfig;
FairMQDevice& fDevice;
Device& fDevice;
boost::optional<std::string> fDeviceController;
mutable std::mutex fDeviceControllerMutex;
std::condition_variable fReleaseDeviceControlCondition;
Expand Down
18 changes: 8 additions & 10 deletions fairmq/ProgOptions.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/********************************************************************************
* Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
Expand All @@ -9,22 +9,20 @@
#ifndef FAIR_MQ_PROGOPTIONS_H
#define FAIR_MQ_PROGOPTIONS_H

#include "FairMQChannel.h"
#include "FairMQLogger.h"
#include <boost/program_options.hpp>
#include <fairlogger/Logger.h>
#include <fairmq/Channel.h>
#include <fairmq/EventManager.h>
#include <fairmq/ProgOptionsFwd.h>
#include <fairmq/Properties.h>
#include <fairmq/tools/Strings.h>

#include <boost/program_options.hpp>

#include <functional>
#include <map>
#include <mutex>
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <vector>
#include <stdexcept>

namespace fair::mq
{
Expand All @@ -38,10 +36,10 @@ class ProgOptions
virtual ~ProgOptions() = default;

void ParseAll(const std::vector<std::string>& cmdArgs, bool allowUnregistered);
void ParseAll(const int argc, char const* const* argv, bool allowUnregistered = true);
void ParseAll(int argc, char const* const* argv, bool allowUnregistered = true);
void Notify();

void AddToCmdLineOptions(const boost::program_options::options_description optDesc, bool visible = true);
void AddToCmdLineOptions(boost::program_options::options_description optDesc, bool visible = true);
boost::program_options::options_description& GetCmdLineOptions();

/// @brief Checks a property with the given key exist in the configuration
Expand Down Expand Up @@ -174,7 +172,7 @@ class ProgOptions
/// @brief Takes the provided channel and creates properties based on it
/// @param name channel name
/// @param channel FairMQChannel reference
void AddChannel(const std::string& name, const FairMQChannel& channel);
void AddChannel(const std::string& name, const Channel& channel);

/// @brief Subscribe to property updates of type T
/// @param subscriber
Expand Down
12 changes: 6 additions & 6 deletions fairmq/TransportFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,21 +59,21 @@ class TransportFactory
/// @brief Create new Message of specified size
/// @param size message size
/// @return pointer to Message
virtual MessagePtr CreateMessage(const size_t size) = 0;
virtual MessagePtr CreateMessage(size_t size) = 0;
/// @brief Create new Message of specified size and alignment
/// @param size message size
/// @param alignment message alignment
/// @return pointer to Message
virtual MessagePtr CreateMessage(const size_t size, Alignment alignment) = 0;
virtual MessagePtr CreateMessage(size_t size, Alignment alignment) = 0;
/// @brief Create new Message with user provided buffer and size
/// @param data pointer to user provided buffer
/// @param size size of the user provided buffer
/// @param ffn callback, called when the message is transfered (and can be deleted)
/// @param obj optional helper pointer that can be used in the callback
/// @return pointer to Message
virtual MessagePtr CreateMessage(void* data,
const size_t size,
fairmq_free_fn* ffn,
size_t size,
FreeFn* ffn,
void* hint = nullptr) = 0;
/// @brief create a message with the buffer located within the corresponding unmanaged region
/// @param unmanagedRegion the unmanaged region that this message buffer belongs to
Expand All @@ -82,8 +82,8 @@ class TransportFactory
/// @param hint optional parameter, returned to the user in the RegionCallback
virtual MessagePtr CreateMessage(UnmanagedRegionPtr& unmanagedRegion,
void* data,
const size_t size,
void* hint = 0) = 0;
size_t size,
void* hint = nullptr) = 0;

/// @brief Create a socket
virtual SocketPtr CreateSocket(const std::string& type, const std::string& name) = 0;
Expand Down
Loading

0 comments on commit 4e8f247

Please sign in to comment.