Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added the ability to explicitly set type names and some QoS settings #61

Merged
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 26 additions & 18 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -275,58 +275,66 @@ client:

```yaml
bridge:
ros2mqtt: # Array specifying which ROS topics to map to which MQTT topics
ros2mqtt: # Array specifying which ROS topics to map to which MQTT topics
- ros_topic: # ROS topic whose messages are transformed to MQTT messages
mqtt_topic: # MQTT topic on which the corresponding ROS messages are sent to the broker
primitive: # [false] whether to publish as primitive message
inject_timestamp: # [false] whether to attach a timestamp to a ROS2MQTT payload (for latency computation on receiver side)
advanced:
ros:
queue_size: # [1] ROS subscriber queue size
queue_size: # [1] ROS subscriber queue size
mqtt:
qos: # [0] MQTT QoS value
retained: # [false] whether to retain MQTT message
mqtt2ros: # Array specifying which MQTT topics to map to which ROS topics
qos: # [0] MQTT QoS value
retained: # [false] whether to retain MQTT message
mqtt2ros: # Array specifying which MQTT topics to map to which ROS topics
- mqtt_topic: # MQTT topic on which messages are received from the broker
ros_topic: # ROS topic on which corresponding MQTT messages are published
primitive: # [false] whether to publish as primitive message (if coming from non-ROS MQTT client)
advanced:
mqtt:
qos: # [0] MQTT QoS value
qos: # [0] MQTT QoS value
ros:
queue_size: # [1] ROS publisher queue size
latched: # [false] whether to latch ROS message
queue_size: # [1] ROS publisher queue size
latched: # [false] whether to latch ROS message
```

##### ROS 2

```yaml
bridge:
ros2mqtt: # Object specifying which ROS topics to map to which MQTT topics
ros_topics: # Array specifying which ROS topics to bridge
ros2mqtt: # Object specifying which ROS topics to map to which MQTT topics
ros_topics: # Array specifying which ROS topics to bridge
- {{ ros_topic_name }} # The ROS topic that should be bridged, corresponds to the sub-object in the YAML
{{ ros_topic_name }}:
mqtt_topic: # MQTT topic on which the corresponding ROS messages are sent to the broker
primitive: # [false] whether to publish as primitive message
ros_type: # [*empty*] If set, the ROS msg type provided will be used. If empty, the type is automatically deduced via the publisher
inject_timestamp: # [false] whether to attach a timestamp to a ROS2MQTT payload (for latency computation on receiver side)
advanced:
ros:
queue_size: # [1] ROS subscriber queue size
queue_size: # [1] ROS subscriber queue size
qos:
reliability: # [auto] One of "auto", "system_default", "reliable", "best_effort". If auto, the QoS is automatically determined via the publisher
durability: # [auto] One of "auto", "system_default", "volatile", "transient_local". If auto, the QoS is automatically determined via the publisher
mqtt:
qos: # [0] MQTT QoS value
retained: # [false] whether to retain MQTT message
mqtt2ros: # Object specifying which MQTT topics to map to which ROS topics
mqtt_topics: # Array specifying which ROS topics to bridge
qos: # [0] MQTT QoS value
retained: # [false] whether to retain MQTT message
mqtt2ros: # Object specifying which MQTT topics to map to which ROS topics
mqtt_topics: # Array specifying which ROS topics to bridge
- {{ mqtt_topic_name }} # The MQTT topic that should be bridged, corresponds to the sub-object in the YAML
{{ mqtt_topic_name }}:
ros_topic: # ROS topic on which corresponding MQTT messages are published
ros_type: # [*empty*] If set, the ROS msg type provided will be used. If empty, the type is automatically deduced via the MQTT message
primitive: # [false] whether to publish as primitive message (if coming from non-ROS MQTT client)
advanced:
mqtt:
qos: # [0] MQTT QoS value
qos: # [0] MQTT QoS value
ros:
queue_size: # [1] ROS publisher queue size
latched: # [false] whether to latch ROS message
queue_size: # [1] ROS publisher queue size
latched: # [false] whether to latch ROS message
qos:
reliability: # [system_default] One of "system_default", "reliable", "best_effort".
durability: # [system_default] One of "system_default", "volatile", "transient_local".
```

## Primitive Messages
Expand Down
34 changes: 34 additions & 0 deletions mqtt_client/config/params.ros2.fixed.yaml
JayHerpin marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/**/*:
ros__parameters:
broker:
host: localhost
port: 1883
bridge:
ros2mqtt:
ros_topics:
- /ping/primitive
/ping/primitive:
mqtt_topic: pingpong/primitive
# This is implied by the explicit ros type
#primitive: true
ros_type: std_msgs/msg/Bool
lreiher marked this conversation as resolved.
Show resolved Hide resolved
advanced:
ros:
queue_size: 10
qos:
durability: auto
reliability: auto
mqtt2ros:
mqtt_topics:
- pingpong/primitive
pingpong/primitive:
ros_topic: /pong/primitive
#This is implied by the explicit ros type
#primitive: true
ros_type: std_msgs/msg/Bool
advanced:
ros:
queue_size: 10
qos:
durability: transient_local
reliability: reliable
103 changes: 78 additions & 25 deletions mqtt_client/include/mqtt_client/MqttClient.ros2.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,18 @@ SOFTWARE.
#include <filesystem>
#include <map>
#include <memory>
#include <optional>
#include <string>

#define FMT_HEADER_ONLY
#include <fmt/format.h>
#include <mqtt/async_client.h>
#include <mqtt_client_interfaces/srv/is_connected.hpp>
#include <mqtt_client_interfaces/srv/new_mqtt2_ros_bridge.hpp>
#include <mqtt_client_interfaces/srv/new_ros2_mqtt_bridge.hpp>
#include <mqtt/async_client.h>
#include <rclcpp/rclcpp.hpp>
#include <rclcpp/serialization.hpp>
#include <rclcpp/qos.hpp>
#include <std_msgs/msg/float64.hpp>


Expand Down Expand Up @@ -71,6 +73,9 @@ class MqttClient : public rclcpp::Node,
explicit MqttClient(const rclcpp::NodeOptions& options);

protected:
class Ros2MqttInterface;
class Mqtt2RosInterface;
JayHerpin marked this conversation as resolved.
Show resolved Hide resolved

/**
* @brief Loads ROS parameters from parameter server.
*/
Expand Down Expand Up @@ -98,8 +103,7 @@ class MqttClient : public rclcpp::Node,
* @return true if parameter was successfully retrieved
* @return false if parameter was not found or default was used
*/
bool loadParameter(const std::string& key, std::string& value,
const std::string& default_value);
bool loadParameter(const std::string& key, std::string& value, const std::string& default_value);

/**
* @brief Loads requested ROS parameter from parameter server.
Expand Down Expand Up @@ -180,10 +184,42 @@ class MqttClient : public rclcpp::Node,
void setup();

/**
* @brief Checks all active ROS topics in order to set up generic subscribers.
* @brief Get the resolved compatible QOS from the interface and the endpoint
*
* This uses the two endpoints to decide upon a compatible QoS, resolving any "auto" QoS settings
*
* @param ros_topic the ROS topic we are looking on
* @param tei Topic endpoint info
* @param ros2mqtt the ROS to MQTT interface spec
*
* @returns The compatible QoS or nullopt if no compatible combination is found
*/
std::optional<rclcpp::QoS> getCompatibleQoS(
const std::string& ros_topic, const rclcpp::TopicEndpointInfo& tei,
const Ros2MqttInterface& ros2mqtt) const;

/**
* @brief Get the candiate topic endpoints for subscription matching
*
* @param ros2mqtt the ROS to MQTT interface spec
*
* @returns The compatible QoS or nullopt if no compatible combination is found
*/
std::vector<rclcpp::TopicEndpointInfo> getCandidatePublishers(
const std::string& ros_topic, const Ros2MqttInterface& ros2mqtt) const;

/**
* @brief Setup any subscriptions we can.
*
* These may be fixed type/QoS, or dynamically matched against active publisher
*/
void setupSubscriptions();

/**
* @brief Setup any publishers that we can
*/
void setupPublishers();

/**
* @brief Sets up the client connection options and initializes the client
* object.
Expand Down Expand Up @@ -242,6 +278,15 @@ class MqttClient : public rclcpp::Node,
*/
void mqtt2primitive(mqtt::const_message_ptr mqtt_msg);

/**
* @brief Publishes a primitive message received via MQTT to ROS.
*
* Currently not implemented.
*
* @param mqtt_msg MQTT message
*/
void mqtt2fixed(mqtt::const_message_ptr mqtt_msg);

JayHerpin marked this conversation as resolved.
Show resolved Hide resolved
/**
* @brief Callback for when the client has successfully connected to the
* broker.
Expand Down Expand Up @@ -351,10 +396,9 @@ class MqttClient : public rclcpp::Node,
std::string user; ///< username
std::string pass; ///< password
struct {
bool enabled; ///< whether to connect via SSL/TLS
std::filesystem::path
ca_certificate; ///< public CA certificate trusted by client
} tls; ///< SSL/TLS-related variables
bool enabled; ///< whether to connect via SSL/TLS
std::filesystem::path ca_certificate; ///< public CA certificate trusted by client
} tls; ///< SSL/TLS-related variables
};

/**
Expand All @@ -377,14 +421,14 @@ class MqttClient : public rclcpp::Node,
double keep_alive_interval; ///< keep-alive interval
int max_inflight; ///< maximum number of inflight messages
struct {
std::filesystem::path certificate; ///< client certificate
std::filesystem::path key; ///< client private keyfile
std::string password; ///< decryption password for private key
int version; ///< TLS version (https://github.com/eclipse/paho.mqtt.cpp/blob/master/src/mqtt/ssl_options.h#L305)
bool verify; ///< Verify the client should conduct
///< post-connect checks
std::vector<std::string> alpn_protos; ///< list of ALPN protocols
} tls; ///< SSL/TLS-related variables
std::filesystem::path certificate; ///< client certificate
std::filesystem::path key; ///< client private keyfile
std::string password; ///< decryption password for private key
int version; ///< TLS version (https://github.com/eclipse/paho.mqtt.cpp/blob/master/src/mqtt/ssl_options.h#L305)
bool verify; ///< Verify the client should conduct
///< post-connect checks
std::vector<std::string> alpn_protos; ///< list of ALPN protocols
} tls; ///< SSL/TLS-related variables
};

/**
Expand All @@ -397,12 +441,18 @@ class MqttClient : public rclcpp::Node,
std::string msg_type; ///< message type of subscriber
int queue_size = 1; ///< ROS subscriber queue size
bool is_stale = false; ///< whether a new generic publisher/subscriber is required
struct {
// If these are set to nullopt then that part of the QoS is determine automatically based on discovery
std::optional<rclcpp::ReliabilityPolicy> reliability;
std::optional<rclcpp::DurabilityPolicy> durability;
} qos;
} ros; ///< ROS-related variables
struct {
std::string topic; ///< MQTT topic
int qos = 0; ///< MQTT QoS value
bool retained = false; ///< whether to retain MQTT message
} mqtt; ///< MQTT-related variables
bool fixed_type = false; ///< whether the published message type is specified explicitly
bool primitive = false; ///< whether to publish as primitive message
bool stamped = false; ///< whether to inject timestamp in MQTT message
};
Expand All @@ -412,18 +462,23 @@ class MqttClient : public rclcpp::Node,
*/
struct Mqtt2RosInterface {
struct {
int qos = 0; ///< MQTT QoS value
} mqtt; ///< MQTT-related variables
int qos = 0; ///< MQTT QoS value
} mqtt; ///< MQTT-related variables
struct {
std::string topic; ///< ROS topic
std::string msg_type; ///< message type of publisher
rclcpp::GenericPublisher::SharedPtr publisher; ///< generic ROS publisher
rclcpp::Publisher<std_msgs::msg::Float64>::SharedPtr
latency_publisher; ///< ROS publisher for latency
int queue_size = 1; ///< ROS publisher queue size
struct {
rclcpp::ReliabilityPolicy reliability = rclcpp::ReliabilityPolicy::SystemDefault;
rclcpp::DurabilityPolicy durability = rclcpp::DurabilityPolicy::SystemDefault;
} qos;
bool latched = false; ///< whether to latch ROS message
bool is_stale = false; ///< whether a new generic publisher/subscriber is required
} ros; ///< ROS-related variables
} ros; ///< ROS-related variables
bool fixed_type = false; ///< whether the published ros message type is specified explicitly
bool primitive = false; ///< whether to publish as primitive message (if
///< coming from non-ROS MQTT client)
bool stamped = false; ///< whether timestamp is injected
Expand Down Expand Up @@ -535,27 +590,25 @@ bool MqttClient::loadParameter(const std::string& key, T& value,


template <typename T>
bool MqttClient::loadParameter(const std::string& key, std::vector<T>& value)
{
bool MqttClient::loadParameter(const std::string& key, std::vector<T>& value) {
const bool found = get_parameter(key, value);
if (found)
RCLCPP_WARN(get_logger(), "Retrieved parameter '%s' = '[%s]'", key.c_str(),
fmt::format("{}", fmt::join(value, ", ")).c_str());
fmt::format("{}", fmt::join(value, ", ")).c_str());
return found;
}


template <typename T>
bool MqttClient::loadParameter(const std::string& key, std::vector<T>& value,
const std::vector<T>& default_value)
{
const std::vector<T>& default_value) {
const bool found = get_parameter_or(key, value, default_value);
if (!found)
RCLCPP_WARN(get_logger(), "Parameter '%s' not set, defaulting to '%s'",
key.c_str(), fmt::format("{}", fmt::join(value, ", ")).c_str());
if (found)
RCLCPP_DEBUG(get_logger(), "Retrieved parameter '%s' = '%s'", key.c_str(),
fmt::format("{}", fmt::join(value, ", ")).c_str());
fmt::format("{}", fmt::join(value, ", ")).c_str());
return found;
}

Expand Down
Loading