Skip to content

Commit

Permalink
Don't use to-be-deprecated names
Browse files Browse the repository at this point in the history
  • Loading branch information
rbx authored and dennisklein committed Jan 24, 2022
1 parent f15f669 commit bfd08bb
Show file tree
Hide file tree
Showing 90 changed files with 416 additions and 396 deletions.
2 changes: 1 addition & 1 deletion examples/1-1/sampler.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ struct Sampler : fair::mq::Device
// create message object with a pointer to the data buffer, its size,
// custom deletion function (called when transfer is done),
// and pointer to the object managing the data buffer
FairMQMessagePtr msg(NewMessage(
fair::mq::MessagePtr msg(NewMessage(
const_cast<char*>(text->c_str()),
text->length(),
[](void* /*data*/, void* object) { delete static_cast<std::string*>(object); },
Expand Down
2 changes: 1 addition & 1 deletion examples/1-1/sink.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ struct Sink : fair::mq::Device
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}

bool HandleData(FairMQMessagePtr& msg, int)
bool HandleData(fair::mq::MessagePtr& msg, int)
{
LOG(info) << "Received: \"" << std::string(static_cast<char*>(msg->GetData()), msg->GetSize()) << "\"";

Expand Down
4 changes: 2 additions & 2 deletions examples/1-n-1/processor.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ struct Processor : fair::mq::Device
OnData("data1", &Processor::HandleData);
}

bool HandleData(FairMQMessagePtr& msg, int)
bool HandleData(fair::mq::MessagePtr& msg, int)
{
LOG(info) << "Received data, processing...";

Expand All @@ -32,7 +32,7 @@ struct Processor : fair::mq::Device
// its size,
// custom deletion function (called when transfer is done),
// and pointer to the object managing the data buffer
FairMQMessagePtr msg2(NewMessage(const_cast<char*>(text->c_str()),
fair::mq::MessagePtr msg2(NewMessage(const_cast<char*>(text->c_str()),
text->length(),
[](void* /*data*/, void* object) { delete static_cast<std::string*>(object); },
text));
Expand Down
2 changes: 1 addition & 1 deletion examples/1-n-1/sampler.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ struct Sampler : fair::mq::Device
{
// Initializing message with NewStaticMessage will avoid copy
// but won't delete the data after the sending is completed.
FairMQMessagePtr msg(NewStaticMessage(fText));
fair::mq::MessagePtr msg(NewStaticMessage(fText));

LOG(info) << "Sending \"" << fText << "\"";

Expand Down
2 changes: 1 addition & 1 deletion examples/1-n-1/sink.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ struct Sink : fair::mq::Device
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}

bool HandleData(FairMQMessagePtr& msg, int)
bool HandleData(fair::mq::MessagePtr& msg, int)
{
LOG(info) << "Received: \"" << std::string(static_cast<char*>(msg->GetData()), msg->GetSize()) << "\"";

Expand Down
4 changes: 2 additions & 2 deletions examples/copypush/sampler.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ struct Sampler : fair::mq::Device
{
// NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
// Should only be used for small data because of the cost of an additional copy
FairMQMessagePtr msg(NewSimpleMessage(fCounter++));
fair::mq::MessagePtr msg(NewSimpleMessage(fCounter++));

for (int i = 0; i < fNumDataChannels - 1; ++i) {
FairMQMessagePtr msgCopy(NewMessage());
fair::mq::MessagePtr msgCopy(NewMessage());
msgCopy->Copy(*msg);
Send(msgCopy, "data", i);
}
Expand Down
2 changes: 1 addition & 1 deletion examples/copypush/sink.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ struct Sink : fair::mq::Device
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}

bool HandleData(FairMQMessagePtr& msg, int)
bool HandleData(fair::mq::MessagePtr& msg, int)
{
LOG(info) << "Received message: \"" << *(static_cast<uint64_t*>(msg->GetData())) << "\"";

Expand Down
4 changes: 2 additions & 2 deletions examples/dds/processor.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ struct Processor : fair::mq::Device
OnData("data1", &Processor::HandleData);
}

bool HandleData(FairMQMessagePtr& msg, int)
bool HandleData(fair::mq::MessagePtr& msg, int)
{
LOG(info) << "Received data, processing...";

Expand All @@ -32,7 +32,7 @@ struct Processor : fair::mq::Device
// its size,
// custom deletion function (called when transfer is done),
// and pointer to the object managing the data buffer
FairMQMessagePtr msg2(NewMessage(const_cast<char*>(text->c_str()),
fair::mq::MessagePtr msg2(NewMessage(const_cast<char*>(text->c_str()),
text->length(),
[](void* /*data*/, void* object) { delete static_cast<std::string*>(object); },
text));
Expand Down
2 changes: 1 addition & 1 deletion examples/dds/sampler.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ struct Sampler : fair::mq::Device
{
// NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
// Should only be used for small data because of the cost of an additional copy
FairMQMessagePtr msg(NewSimpleMessage("Data"));
fair::mq::MessagePtr msg(NewSimpleMessage("Data"));

LOG(info) << "Sending \"Data\"";

Expand Down
2 changes: 1 addition & 1 deletion examples/dds/sink.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ struct Sink : fair::mq::Device
fIterations = fConfig->GetValue<uint64_t>("iterations");
}

bool HandleData(FairMQMessagePtr& msg, int)
bool HandleData(fair::mq::MessagePtr& msg, int)
{
LOG(info) << "Received: \"" << std::string(static_cast<char*>(msg->GetData()), msg->GetSize()) << "\"";

Expand Down
2 changes: 1 addition & 1 deletion examples/multipart/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ A topology of two devices - Sampler and Sink, communicating with PUSH-PULL patte

The Sampler sends a multipart message to the Sink, consisting of two message parts - header and body.

Each message part is a regular FairMQMessage. To combine them into a multi-part message use `FairMQParts`. Add messages to `FairMQParts` with `AddPart` method.
Each message part is a regular fair::mq::Message. To combine them into a multi-part message use `fair::mq::Parts`. Add messages to `fair::mq::Parts` with `AddPart` method.

All parts are guaranteed to be delivered together. The Receive call in the sink will recive the entire parts structure.

Expand Down
6 changes: 3 additions & 3 deletions examples/multipart/sampler.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@ struct Sampler : fair::mq::Device
}
LOG(info) << "Sending header with stopFlag: " << header.stopFlag;

FairMQParts parts;
fair::mq::Parts parts;

// NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
// Should only be used for small data because of the cost of an additional copy
parts.AddPart(NewSimpleMessage(header));
parts.AddPart(NewMessage(1000));

// create more data parts, testing the FairMQParts in-place constructor
FairMQParts auxData{ NewMessage(500), NewMessage(600), NewMessage(700) };
// create more data parts, testing the fair::mq::Parts in-place constructor
fair::mq::Parts auxData{ NewMessage(500), NewMessage(600), NewMessage(700) };
assert(auxData.Size() == 3);
parts.AddPart(std::move(auxData));
assert(auxData.Size() == 0);
Expand Down
2 changes: 1 addition & 1 deletion examples/multipart/sink.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ struct Sink : fair::mq::Device
OnData("data", &Sink::HandleData);
}

bool HandleData(FairMQParts& parts, int)
bool HandleData(fair::mq::Parts& parts, int)
{
LOG(info) << "Received message with " << parts.Size() << " parts";

Expand Down
2 changes: 1 addition & 1 deletion examples/multiple-channels/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ This example demonstrates how to work with multiple channels and multiplex betwe

A topology of three devices - **Sampler**, **Sink** and **Broadcaster**. The Sampler sends data to the Sink via the PUSH-PULL pattern. The Broadcaster device sends a message to both Sampler and Sink containing a string "OK" every second. The Broadcaster sends the message via PUB pattern. Both Sampler and Sink, besides doing their PUSH-PULL job, listen via SUB to the Broadcaster.

The multiplexing between their data channels and the broadcast channels happens with `FairMQPoller`.
The multiplexing between their data channels and the broadcast channels happens with `fair::mq::Poller`.
2 changes: 1 addition & 1 deletion examples/multiple-channels/broadcaster.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ struct Broadcaster : fair::mq::Device

// NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
// Should only be used for small data because of the cost of an additional copy
FairMQMessagePtr msg(NewSimpleMessage("OK"));
fair::mq::MessagePtr msg(NewSimpleMessage("OK"));

LOG(info) << "Sending OK";

Expand Down
8 changes: 4 additions & 4 deletions examples/multiple-channels/sampler.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
********************************************************************************/

#include <fairmq/Device.h>
#include <FairMQPoller.h>
#include <fairmq/Poller.h>
#include <fairmq/runDevice.h>

#include <chrono>
Expand All @@ -26,13 +26,13 @@ struct Sampler : fair::mq::Device

void Run() override
{
FairMQPollerPtr poller(NewPoller("data", "broadcast"));
fair::mq::PollerPtr poller(NewPoller("data", "broadcast"));

while (!NewStatePending()) {
poller->Poll(100);

if (poller->CheckInput("broadcast", 0)) {
FairMQMessagePtr msg(NewMessage());
fair::mq::MessagePtr msg(NewMessage());

if (Receive(msg, "broadcast") > 0) {
LOG(info) << "Received broadcast: \"" << std::string(static_cast<char*>(msg->GetData()), msg->GetSize()) << "\"";
Expand All @@ -42,7 +42,7 @@ struct Sampler : fair::mq::Device
if (poller->CheckOutput("data", 0)) {
std::this_thread::sleep_for(std::chrono::seconds(1));

FairMQMessagePtr msg(NewSimpleMessage(fText));
fair::mq::MessagePtr msg(NewSimpleMessage(fText));

if (Send(msg, "data") > 0) {
LOG(info) << "Sent \"" << fText << "\"";
Expand Down
4 changes: 2 additions & 2 deletions examples/multiple-channels/sink.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ struct Sink : fair::mq::Device
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}

bool HandleBroadcast(FairMQMessagePtr& msg, int /*index*/)
bool HandleBroadcast(fair::mq::MessagePtr& msg, int /*index*/)
{
LOG(info) << "Received broadcast: \"" << std::string(static_cast<char*>(msg->GetData()), msg->GetSize()) << "\"";
fReceivedBroadcast = true;

return CheckIterations();
}

bool HandleData(FairMQMessagePtr& msg, int /*index*/)
bool HandleData(fair::mq::MessagePtr& msg, int /*index*/)
{
LOG(info) << "Received message: \"" << std::string(static_cast<char*>(msg->GetData()), msg->GetSize()) << "\"";
fReceivedData = true;
Expand Down
4 changes: 2 additions & 2 deletions examples/multiple-transports/sampler1.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ struct Sampler1 : fair::mq::Device
bool ConditionalRun() override
{
// Creates a message using the transport of channel data1
FairMQMessagePtr msg(NewMessageFor("data1", 0, 1000000));
fair::mq::MessagePtr msg(NewMessageFor("data1", 0, 1000000));

// in case of error or transfer interruption, return false to go to IDLE state
// successfull transfer will return number of bytes transfered (can be 0 if sending an empty message).
Expand All @@ -54,7 +54,7 @@ struct Sampler1 : fair::mq::Device
uint64_t numAcks = 0;

while (!NewStatePending()) {
FairMQMessagePtr ack(NewMessageFor("ack", 0));
fair::mq::MessagePtr ack(NewMessageFor("ack", 0));
if (Receive(ack, "ack") < 0) {
break;
}
Expand Down
2 changes: 1 addition & 1 deletion examples/multiple-transports/sampler2.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ struct Sampler2 : fair::mq::Device

bool ConditionalRun() override
{
FairMQMessagePtr msg(NewMessage(1000));
fair::mq::MessagePtr msg(NewMessage(1000));

// in case of error or transfer interruption, return false to go to IDLE state
// successfull transfer will return number of bytes transfered (can be 0 if sending an empty message).
Expand Down
6 changes: 3 additions & 3 deletions examples/multiple-transports/sink.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ struct Sink : fair::mq::Device
}

// handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0)
bool HandleData1(FairMQMessagePtr& /*msg*/, int /*index*/)
bool HandleData1(fair::mq::MessagePtr& /*msg*/, int /*index*/)
{
fNumIterations1++;
// Creates a message using the transport of channel ack
FairMQMessagePtr ack(NewMessageFor("ack", 0));
fair::mq::MessagePtr ack(NewMessageFor("ack", 0));
if (Send(ack, "ack") < 0) {
return false;
}
Expand All @@ -40,7 +40,7 @@ struct Sink : fair::mq::Device
}

// handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0)
bool HandleData2(FairMQMessagePtr& /*msg*/, int /*index*/)
bool HandleData2(fair::mq::MessagePtr& /*msg*/, int /*index*/)
{
fNumIterations2++;
// return true if want to be called again (otherwise go to IDLE state)
Expand Down
6 changes: 3 additions & 3 deletions examples/n-m/receiver.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ namespace bpo = boost::program_options;

struct TFBuffer
{
FairMQParts parts;
fair::mq::Parts parts;
chrono::steady_clock::time_point start;
chrono::steady_clock::time_point end;
};
Expand All @@ -43,7 +43,7 @@ struct Receiver : fair::mq::Device
fMaxTimeframes = GetConfig()->GetValue<int>("max-timeframes");
}

bool HandleData(FairMQParts& parts, int /* index */)
bool HandleData(fair::mq::Parts& parts, int /* index */)
{
Header& h = *(static_cast<Header*>(parts.At(0)->GetData()));
// LOG(info) << "Received sub-time frame #" << h.id << " from Sender" << h.senderIndex;
Expand Down Expand Up @@ -107,7 +107,7 @@ void addCustomOptions(bpo::options_description& options)
("max-timeframes", bpo::value<int>()->default_value(0), "Maximum number of timeframes to receive (0 - unlimited)");
}

std::unique_ptr<fair::mq::Device> getDevice(FairMQProgOptions& /* config */)
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /* config */)
{
return std::make_unique<Receiver>();
}
8 changes: 4 additions & 4 deletions examples/n-m/sender.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,19 @@ struct Sender : fair::mq::Device

void Run() override
{
FairMQChannel& dataInChannel = GetChannel("sync", 0);
fair::mq::Channel& dataInChannel = GetChannel("sync", 0);

while (!NewStatePending()) {
Header h;
FairMQMessagePtr id(NewMessage());
fair::mq::MessagePtr id(NewMessage());
if (dataInChannel.Receive(id) > 0) {
h.id = *(static_cast<uint16_t*>(id->GetData()));
h.senderIndex = fIndex;
} else {
continue;
}

FairMQParts parts;
fair::mq::Parts parts;
parts.AddPart(NewSimpleMessage(h));
parts.AddPart(NewMessage(fSubtimeframeSize));

Expand All @@ -66,7 +66,7 @@ void addCustomOptions(bpo::options_description& options)
("subtimeframe-size", bpo::value<int>()->default_value(1000), "Subtimeframe size in bytes")
("num-receivers", bpo::value<int>()->required(), "Number of EPNs");
}
std::unique_ptr<fair::mq::Device> getDevice(FairMQProgOptions& /* config */)
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /* config */)
{
return std::make_unique<Sender>();
}
4 changes: 2 additions & 2 deletions examples/n-m/synchronizer.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ struct Synchronizer : fair::mq::Device
{
bool ConditionalRun() override
{
FairMQMessagePtr msg(NewSimpleMessage(fTimeframeId));
fair::mq::MessagePtr msg(NewSimpleMessage(fTimeframeId));

if (Send(msg, "sync") > 0) {
if (++fTimeframeId == UINT16_MAX - 1) {
Expand All @@ -37,7 +37,7 @@ struct Synchronizer : fair::mq::Device
};

void addCustomOptions(bpo::options_description& /* options */) {}
std::unique_ptr<fair::mq::Device> getDevice(FairMQProgOptions& /* config */)
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /* config */)
{
return std::make_unique<Synchronizer>();
}
4 changes: 2 additions & 2 deletions examples/qc/qCDispatcher.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ struct QCDispatcher : fair::mq::Device
});
}

bool HandleData(FairMQMessagePtr& msg, int)
bool HandleData(fair::mq::MessagePtr& msg, int)
{
if (fDoQC.load() == true) {
FairMQMessagePtr msgCopy(NewMessage());
fair::mq::MessagePtr msgCopy(NewMessage());
msgCopy->Copy(*msg);
if (Send(msg, "qc") < 0) {
return false;
Expand Down
4 changes: 2 additions & 2 deletions examples/qc/qCTask.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>

class QCTask : public FairMQDevice
class QCTask : public fair::mq::Device
{
public:
QCTask()
{
OnData("qc", [](FairMQMessagePtr& /*msg*/, int) {
OnData("qc", [](fair::mq::MessagePtr& /*msg*/, int) {
LOG(info) << "received data";
return false;
});
Expand Down
2 changes: 1 addition & 1 deletion examples/qc/sampler.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ struct Sampler : fair::mq::Device
{
bool ConditionalRun() override
{
FairMQMessagePtr msg(NewMessage(1000));
fair::mq::MessagePtr msg(NewMessage(1000));

if (Send(msg, "data1") < 0) {
return false;
Expand Down
2 changes: 1 addition & 1 deletion examples/qc/sink.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
struct Sink : fair::mq::Device
{
Sink() { OnData("data2", &Sink::HandleData); }
bool HandleData(FairMQMessagePtr& /*msg*/, int /*index*/) { return true; }
bool HandleData(fair::mq::MessagePtr& /*msg*/, int /*index*/) { return true; }
};

namespace bpo = boost::program_options;
Expand Down
Loading

0 comments on commit bfd08bb

Please sign in to comment.