Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rebased and pre-commit checked #951

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
138 changes: 116 additions & 22 deletions plotjuggler_plugins/DataStreamZMQ/datastream_zmq.cpp
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
#include "datastream_zmq.h"
#include "ui_datastream_zmq.h"

#include <QMessageBox>
#include "PlotJuggler/messageparser_base.h"
#include <QDebug>
#include <QSettings>
#include <QDialog>
#include <QIntValidator>
#include <QMessageBox>
#include <QSettings>
#include <chrono>
#include "PlotJuggler/messageparser_base.h"
#include <iostream>

using namespace PJ;

Expand Down Expand Up @@ -73,7 +74,7 @@ bool DataStreamZMQ::start(QStringList*)
QString address = settings.value("ZMQ_Subscriber::address", "localhost").toString();
QString protocol = settings.value("ZMQ_Subscriber::protocol", "JSON").toString();
QString topics = settings.value("ZMQ_Subscriber::topics", "").toString();
bool is_connect = settings.value("ZMQ_Subscriber::is_connect", true).toBool();
_is_connect = settings.value("ZMQ_Subscriber::is_connect", true).toBool();

QString previous_address = address;

Expand All @@ -86,11 +87,11 @@ bool DataStreamZMQ::start(QStringList*)
else
{
previous_address = dialog->ui->lineEditAddress->text();
dialog->ui->lineEditAddress->setText("*");
dialog->ui->lineEditAddress->setText("0.0.0.0");
}
});

if (is_connect)
if (_is_connect)
{
dialog->ui->radioConnect->setChecked(true);
}
Expand All @@ -105,21 +106,19 @@ bool DataStreamZMQ::start(QStringList*)
dialog->ui->lineEditPort->setText(QString::number(port));
dialog->ui->lineEditTopics->setText(topics);

ParserFactoryPlugin::Ptr parser_creator;

connect(dialog->ui->comboBoxProtocol,
qOverload<const QString&>(&QComboBox::currentIndexChanged), this,
[&](const QString& selected_protocol) {
if (parser_creator)
if (_parser_creator)
{
if (auto prev_widget = parser_creator->optionsWidget())
if (auto prev_widget = _parser_creator->optionsWidget())
{
prev_widget->setVisible(false);
}
}
parser_creator = parserFactories()->at(selected_protocol);
_parser_creator = parserFactories()->at(selected_protocol);

if (auto widget = parser_creator->optionsWidget())
if (auto widget = _parser_creator->optionsWidget())
{
widget->setVisible(true);
}
Expand All @@ -138,21 +137,21 @@ bool DataStreamZMQ::start(QStringList*)
port = dialog->ui->lineEditPort->text().toUShort(&ok);
protocol = dialog->ui->comboBoxProtocol->currentText();
topics = dialog->ui->lineEditTopics->text();
is_connect = dialog->ui->radioConnect->isChecked();
_is_connect = dialog->ui->radioConnect->isChecked();

_parser = parser_creator->createParser({}, {}, {}, dataMap());
_parser = _parser_creator->createParser({}, {}, {}, dataMap());

// save back to service
settings.setValue("ZMQ_Subscriber::address", address);
settings.setValue("ZMQ_Subscriber::protocol", protocol);
settings.setValue("ZMQ_Subscriber::port", port);
settings.setValue("ZMQ_Subscriber::topics", topics);
settings.setValue("ZMQ_Subscriber::is_connect", is_connect);
settings.setValue("ZMQ_Subscriber::is_connect", _is_connect);

QString addr =
dialog->ui->comboBox->currentText() + address + ":" + QString::number(port);
_socket_address = addr.toStdString();
if (is_connect)
if (_is_connect)
{
_zmq_socket.connect(_socket_address.c_str());
}
Expand All @@ -164,6 +163,12 @@ bool DataStreamZMQ::start(QStringList*)
parseTopicFilters(topics);
subscribeTopics();

// Add a parser for each topic
for (const auto& topic : _topic_filters)
{
_parsers[topic] = _parser_creator->createParser(topic, {}, {}, dataMap());
}

_zmq_socket.set(zmq::sockopt::rcvtimeo, 100);

qDebug() << "ZMQ listening on address" << QString::fromStdString(_socket_address);
Expand All @@ -188,7 +193,14 @@ void DataStreamZMQ::shutdown()

unsubscribeTopics();

_zmq_socket.disconnect(_socket_address.c_str());
if (_is_connect)
{
_zmq_socket.disconnect(_socket_address.c_str());
}
else
{
_zmq_socket.unbind(_socket_address.c_str());
}
}
}

Expand All @@ -199,18 +211,79 @@ void DataStreamZMQ::receiveLoop()
zmq::message_t recv_msg;
zmq::recv_result_t result = _zmq_socket.recv(recv_msg);

if (recv_msg.size() > 0)
// If we did not receive anything, continue
if (recv_msg.size() <= 0)
{
continue;
}

// If there are more parts, then it is the topic
std::string topic = "";
if (recv_msg.more())
{
using namespace std::chrono;
auto ts = high_resolution_clock::now().time_since_epoch();
double timestamp = 1e-6 * double(duration_cast<microseconds>(ts).count());
topic =
std::string(reinterpret_cast<const char*>(recv_msg.data()), recv_msg.size());

PJ::MessageRef msg(reinterpret_cast<uint8_t*>(recv_msg.data()), recv_msg.size());
// Then it is the payload
recv_msg.rebuild();
result = _zmq_socket.recv(recv_msg);

// If we did not receive anything, continue
if (recv_msg.size() <= 0)
{
continue;
}
}

PJ::MessageRef msg{ PJ::MessageRef(reinterpret_cast<uint8_t*>(recv_msg.data()),
recv_msg.size()) };

// If there are more parts, then it is the timestamp
double timestamp = 0.0;
if (recv_msg.more())
{
recv_msg.rebuild();
result = _zmq_socket.recv(recv_msg);

if (recv_msg.size() > 0)
{
// The timestamp is the seconds since the epoch as a string
timestamp = std::stod(
std::string(reinterpret_cast<const char*>(recv_msg.data()), recv_msg.size()));
}
}
else
{
// If there are no more parts, the timestamp is the current time
timestamp = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::high_resolution_clock::now().time_since_epoch())
.count() *
1e-6;
}

// Parse the message without a topic if it is empty
if (topic.empty())
{
if (parseMessage(msg, timestamp))
{
emit this->dataReceived();
}
}
// Otherwise, parse the message with the topic
else
{
if (parseMessage(topic, msg, timestamp))
{
emit this->dataReceived();
}
}

// Extinguish remaining parts (if any)
while (recv_msg.more())
{
recv_msg.rebuild();
result = _zmq_socket.recv(recv_msg);
}
}
}

Expand All @@ -228,6 +301,27 @@ bool DataStreamZMQ::parseMessage(const PJ::MessageRef& msg, double& timestamp)
}
}

bool DataStreamZMQ::parseMessage(const std::string& topic, const PJ::MessageRef& msg,
double& timestamp)
{
try
{
std::lock_guard<std::mutex> lock(mutex());
// If the topic is not in the map keys, create a new parser
if (_parsers.find(topic) == _parsers.end())
{
_parsers[topic] = _parser_creator->createParser(topic, {}, {}, dataMap());
}

_parsers[topic]->parseMessage(msg, timestamp);
return true;
}
catch (...)
{
return false;
}
}

void DataStreamZMQ::parseTopicFilters(const QString& topic_filters)
{
const QRegExp regex("(,{0,1}\\s+)|(;\\s*)");
Expand Down
12 changes: 9 additions & 3 deletions plotjuggler_plugins/DataStreamZMQ/datastream_zmq.h
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
#pragma once
#include <QDialog>

#include <QtPlugin>
#include <thread>
#include "PlotJuggler/datastreamer_base.h"
#include "PlotJuggler/messageparser_base.h"
#include "ui_datastream_zmq.h"
#include "zmq.hpp"
#include <QtPlugin>
#include <map>
#include <string>
#include <thread>

class StreamZMQDialog : public QDialog
{
Expand Down Expand Up @@ -57,9 +59,13 @@ class DataStreamZMQ : public PJ::DataStreamer
std::string _socket_address;
std::thread _receive_thread;
std::vector<std::string> _topic_filters;

std::map<std::string, PJ::MessageParserPtr> _parsers;
PJ::ParserFactoryPlugin::Ptr _parser_creator;
bool _is_connect = false;
void receiveLoop();
bool parseMessage(const PJ::MessageRef& msg, double& timestamp);
bool parseMessage(const std::string& topic, const PJ::MessageRef& msg,
double& timestamp);
void parseTopicFilters(const QString& filters);
void subscribeTopics();
void unsubscribeTopics();
Expand Down
49 changes: 34 additions & 15 deletions plotjuggler_plugins/DataStreamZMQ/utilities/start_test_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,31 @@
import json
import argparse

from time import sleep
from time import sleep, time_ns
import numpy as np

PORT = 9872

parser = argparse.ArgumentParser("start_test_publisher")

parser.add_argument("--topic|-t",
dest="topic",
help="Topic on which messages will be published",
type=str,
required=False)
parser.add_argument(
"--topic|-t",
dest="topic",
help="Topic on which messages will be published",
type=str,
required=False,
)
parser.add_argument(
"--timestamp",
dest="timestamp",
help="Send timestamp as message part, requires topic to work properly",
required=False,
action="store_true",
)

args = parser.parse_args()
topic = args.topic
timestamp = args.timestamp


def main():
Expand All @@ -29,28 +39,37 @@ def main():
ticks = 0

while True:
out_str = []
packet = []
data = {
"ticks": ticks,
"data": {
"cos": math.cos(ticks),
"sin": math.sin(ticks),
"floor": np.floor(np.cos(ticks)),
"ceil": np.ceil(np.cos(ticks))
}
"ceil": np.ceil(np.cos(ticks)),
},
}

if topic:
print(f"[{topic}] - " + json.dumps(data))
server_socket.send_multipart(
[topic.encode(), json.dumps(data).encode()])
else:
print(json.dumps(data))
server_socket.send(json.dumps(data).encode())
out_str.append(f"[{topic}] - ")
packet.append(topic.encode())

out_str.append(json.dumps(data))
packet.append(out_str[-1].encode())

if timestamp:
timestamp_s = str(time_ns() * 1e-9)
out_str.append(" - timestamp: " + timestamp_s)
packet.append(timestamp_s.encode())

print("".join(out_str))
server_socket.send_multipart(packet)

ticks += 1

sleep(0.1)


if __name__ == '__main__':
if __name__ == "__main__":
main()