diff --git a/plotjuggler_plugins/DataStreamZMQ/datastream_zmq.cpp b/plotjuggler_plugins/DataStreamZMQ/datastream_zmq.cpp index ec23ebd95..c6657b657 100644 --- a/plotjuggler_plugins/DataStreamZMQ/datastream_zmq.cpp +++ b/plotjuggler_plugins/DataStreamZMQ/datastream_zmq.cpp @@ -1,13 +1,14 @@ #include "datastream_zmq.h" #include "ui_datastream_zmq.h" -#include +#include "PlotJuggler/messageparser_base.h" #include -#include #include #include +#include +#include #include -#include "PlotJuggler/messageparser_base.h" +#include using namespace PJ; @@ -73,7 +74,7 @@ bool DataStreamZMQ::start(QStringList*) QString address = settings.value("ZMQ_Subscriber::address", "localhost").toString(); QString protocol = settings.value("ZMQ_Subscriber::protocol", "JSON").toString(); QString topics = settings.value("ZMQ_Subscriber::topics", "").toString(); - bool is_connect = settings.value("ZMQ_Subscriber::is_connect", true).toBool(); + _is_connect = settings.value("ZMQ_Subscriber::is_connect", true).toBool(); QString previous_address = address; @@ -86,11 +87,11 @@ bool DataStreamZMQ::start(QStringList*) else { previous_address = dialog->ui->lineEditAddress->text(); - dialog->ui->lineEditAddress->setText("*"); + dialog->ui->lineEditAddress->setText("0.0.0.0"); } }); - if (is_connect) + if (_is_connect) { dialog->ui->radioConnect->setChecked(true); } @@ -105,21 +106,19 @@ bool DataStreamZMQ::start(QStringList*) dialog->ui->lineEditPort->setText(QString::number(port)); dialog->ui->lineEditTopics->setText(topics); - ParserFactoryPlugin::Ptr parser_creator; - connect(dialog->ui->comboBoxProtocol, qOverload(&QComboBox::currentIndexChanged), this, [&](const QString& selected_protocol) { - if (parser_creator) + if (_parser_creator) { - if (auto prev_widget = parser_creator->optionsWidget()) + if (auto prev_widget = _parser_creator->optionsWidget()) { prev_widget->setVisible(false); } } - parser_creator = parserFactories()->at(selected_protocol); + _parser_creator = parserFactories()->at(selected_protocol); - if (auto widget = parser_creator->optionsWidget()) + if (auto widget = _parser_creator->optionsWidget()) { widget->setVisible(true); } @@ -138,21 +137,21 @@ bool DataStreamZMQ::start(QStringList*) port = dialog->ui->lineEditPort->text().toUShort(&ok); protocol = dialog->ui->comboBoxProtocol->currentText(); topics = dialog->ui->lineEditTopics->text(); - is_connect = dialog->ui->radioConnect->isChecked(); + _is_connect = dialog->ui->radioConnect->isChecked(); - _parser = parser_creator->createParser({}, {}, {}, dataMap()); + _parser = _parser_creator->createParser({}, {}, {}, dataMap()); // save back to service settings.setValue("ZMQ_Subscriber::address", address); settings.setValue("ZMQ_Subscriber::protocol", protocol); settings.setValue("ZMQ_Subscriber::port", port); settings.setValue("ZMQ_Subscriber::topics", topics); - settings.setValue("ZMQ_Subscriber::is_connect", is_connect); + settings.setValue("ZMQ_Subscriber::is_connect", _is_connect); QString addr = dialog->ui->comboBox->currentText() + address + ":" + QString::number(port); _socket_address = addr.toStdString(); - if (is_connect) + if (_is_connect) { _zmq_socket.connect(_socket_address.c_str()); } @@ -164,6 +163,12 @@ bool DataStreamZMQ::start(QStringList*) parseTopicFilters(topics); subscribeTopics(); + // Add a parser for each topic + for (const auto& topic : _topic_filters) + { + _parsers[topic] = _parser_creator->createParser(topic, {}, {}, dataMap()); + } + _zmq_socket.set(zmq::sockopt::rcvtimeo, 100); qDebug() << "ZMQ listening on address" << QString::fromStdString(_socket_address); @@ -188,7 +193,14 @@ void DataStreamZMQ::shutdown() unsubscribeTopics(); - _zmq_socket.disconnect(_socket_address.c_str()); + if (_is_connect) + { + _zmq_socket.disconnect(_socket_address.c_str()); + } + else + { + _zmq_socket.unbind(_socket_address.c_str()); + } } } @@ -199,18 +211,79 @@ void DataStreamZMQ::receiveLoop() zmq::message_t recv_msg; zmq::recv_result_t result = _zmq_socket.recv(recv_msg); - if (recv_msg.size() > 0) + // If we did not receive anything, continue + if (recv_msg.size() <= 0) + { + continue; + } + + // If there are more parts, then it is the topic + std::string topic = ""; + if (recv_msg.more()) { - using namespace std::chrono; - auto ts = high_resolution_clock::now().time_since_epoch(); - double timestamp = 1e-6 * double(duration_cast(ts).count()); + topic = + std::string(reinterpret_cast(recv_msg.data()), recv_msg.size()); - PJ::MessageRef msg(reinterpret_cast(recv_msg.data()), recv_msg.size()); + // Then it is the payload + recv_msg.rebuild(); + result = _zmq_socket.recv(recv_msg); + + // If we did not receive anything, continue + if (recv_msg.size() <= 0) + { + continue; + } + } + + PJ::MessageRef msg{ PJ::MessageRef(reinterpret_cast(recv_msg.data()), + recv_msg.size()) }; + + // If there are more parts, then it is the timestamp + double timestamp = 0.0; + if (recv_msg.more()) + { + recv_msg.rebuild(); + result = _zmq_socket.recv(recv_msg); + + if (recv_msg.size() > 0) + { + // The timestamp is the seconds since the epoch as a string + timestamp = std::stod( + std::string(reinterpret_cast(recv_msg.data()), recv_msg.size())); + } + } + else + { + // If there are no more parts, the timestamp is the current time + timestamp = std::chrono::duration_cast( + std::chrono::high_resolution_clock::now().time_since_epoch()) + .count() * + 1e-6; + } + + // Parse the message without a topic if it is empty + if (topic.empty()) + { if (parseMessage(msg, timestamp)) { emit this->dataReceived(); } } + // Otherwise, parse the message with the topic + else + { + if (parseMessage(topic, msg, timestamp)) + { + emit this->dataReceived(); + } + } + + // Extinguish remaining parts (if any) + while (recv_msg.more()) + { + recv_msg.rebuild(); + result = _zmq_socket.recv(recv_msg); + } } } @@ -228,6 +301,27 @@ bool DataStreamZMQ::parseMessage(const PJ::MessageRef& msg, double& timestamp) } } +bool DataStreamZMQ::parseMessage(const std::string& topic, const PJ::MessageRef& msg, + double& timestamp) +{ + try + { + std::lock_guard lock(mutex()); + // If the topic is not in the map keys, create a new parser + if (_parsers.find(topic) == _parsers.end()) + { + _parsers[topic] = _parser_creator->createParser(topic, {}, {}, dataMap()); + } + + _parsers[topic]->parseMessage(msg, timestamp); + return true; + } + catch (...) + { + return false; + } +} + void DataStreamZMQ::parseTopicFilters(const QString& topic_filters) { const QRegExp regex("(,{0,1}\\s+)|(;\\s*)"); diff --git a/plotjuggler_plugins/DataStreamZMQ/datastream_zmq.h b/plotjuggler_plugins/DataStreamZMQ/datastream_zmq.h index c028337fd..53a5c64f5 100644 --- a/plotjuggler_plugins/DataStreamZMQ/datastream_zmq.h +++ b/plotjuggler_plugins/DataStreamZMQ/datastream_zmq.h @@ -1,12 +1,14 @@ #pragma once #include -#include -#include #include "PlotJuggler/datastreamer_base.h" #include "PlotJuggler/messageparser_base.h" #include "ui_datastream_zmq.h" #include "zmq.hpp" +#include +#include +#include +#include class StreamZMQDialog : public QDialog { @@ -57,9 +59,13 @@ class DataStreamZMQ : public PJ::DataStreamer std::string _socket_address; std::thread _receive_thread; std::vector _topic_filters; - + std::map _parsers; + PJ::ParserFactoryPlugin::Ptr _parser_creator; + bool _is_connect = false; void receiveLoop(); bool parseMessage(const PJ::MessageRef& msg, double& timestamp); + bool parseMessage(const std::string& topic, const PJ::MessageRef& msg, + double& timestamp); void parseTopicFilters(const QString& filters); void subscribeTopics(); void unsubscribeTopics(); diff --git a/plotjuggler_plugins/DataStreamZMQ/utilities/start_test_publisher.py b/plotjuggler_plugins/DataStreamZMQ/utilities/start_test_publisher.py index b6818de40..93abc2edc 100755 --- a/plotjuggler_plugins/DataStreamZMQ/utilities/start_test_publisher.py +++ b/plotjuggler_plugins/DataStreamZMQ/utilities/start_test_publisher.py @@ -5,21 +5,31 @@ import json import argparse -from time import sleep +from time import sleep, time_ns import numpy as np PORT = 9872 parser = argparse.ArgumentParser("start_test_publisher") -parser.add_argument("--topic|-t", - dest="topic", - help="Topic on which messages will be published", - type=str, - required=False) +parser.add_argument( + "--topic|-t", + dest="topic", + help="Topic on which messages will be published", + type=str, + required=False, +) +parser.add_argument( + "--timestamp", + dest="timestamp", + help="Send timestamp as message part, requires topic to work properly", + required=False, + action="store_true", +) args = parser.parse_args() topic = args.topic +timestamp = args.timestamp def main(): @@ -29,28 +39,37 @@ def main(): ticks = 0 while True: + out_str = [] + packet = [] data = { "ticks": ticks, "data": { "cos": math.cos(ticks), "sin": math.sin(ticks), "floor": np.floor(np.cos(ticks)), - "ceil": np.ceil(np.cos(ticks)) - } + "ceil": np.ceil(np.cos(ticks)), + }, } if topic: - print(f"[{topic}] - " + json.dumps(data)) - server_socket.send_multipart( - [topic.encode(), json.dumps(data).encode()]) - else: - print(json.dumps(data)) - server_socket.send(json.dumps(data).encode()) + out_str.append(f"[{topic}] - ") + packet.append(topic.encode()) + + out_str.append(json.dumps(data)) + packet.append(out_str[-1].encode()) + + if timestamp: + timestamp_s = str(time_ns() * 1e-9) + out_str.append(" - timestamp: " + timestamp_s) + packet.append(timestamp_s.encode()) + + print("".join(out_str)) + server_socket.send_multipart(packet) ticks += 1 sleep(0.1) -if __name__ == '__main__': +if __name__ == "__main__": main()