Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Publish binary time data when use_sim_time parameter is true #114

Merged
merged 4 commits into from
Dec 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions foxglove_bridge_base/include/foxglove_bridge/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@
namespace foxglove {

constexpr char SUPPORTED_SUBPROTOCOL[] = "foxglove.websocket.v1";
constexpr char CAPABILITY_CLIENT_PUBLISH[] = "clientPublish";
constexpr char CAPABILITY_TIME[] = "time";

using ChannelId = uint32_t;
using ClientChannelId = uint32_t;
using SubscriptionId = uint32_t;

enum class BinaryOpcode : uint8_t {
MESSAGE_DATA = 1,
TIME_DATA = 2,
};

enum class ClientBinaryOpcode : uint8_t {
Expand Down
23 changes: 21 additions & 2 deletions foxglove_bridge_base/include/foxglove_bridge/websocket_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ class ServerInterface {

virtual void sendMessage(ConnHandle clientHandle, ChannelId chanId, uint64_t timestamp,
std::string_view data) = 0;
virtual void broadcastTime(uint64_t timestamp) = 0;

virtual std::optional<Tcp::endpoint> localEndpoint() = 0;
virtual std::string remoteEndpointString(ConnHandle clientHandle) = 0;
Expand All @@ -164,7 +165,8 @@ class Server final : public ServerInterface {

static bool USES_TLS;

explicit Server(std::string name, LogCallback logger, const std::string& certfile = "",
explicit Server(std::string name, LogCallback logger,
const std::vector<std::string>& capabilities, const std::string& certfile = "",
const std::string& keyfile = "");
virtual ~Server();

Expand All @@ -188,6 +190,7 @@ class Server final : public ServerInterface {

void sendMessage(ConnHandle clientHandle, ChannelId chanId, uint64_t timestamp,
std::string_view data) override;
void broadcastTime(uint64_t timestamp) override;

std::optional<Tcp::endpoint> localEndpoint() override;
std::string remoteEndpointString(ConnHandle clientHandle) override;
Expand All @@ -208,6 +211,7 @@ class Server final : public ServerInterface {

std::string _name;
LogCallback _logger;
std::vector<std::string> _capabilities;
std::string _certfile;
std::string _keyfile;
ServerType _server;
Expand Down Expand Up @@ -242,9 +246,11 @@ class Server final : public ServerInterface {

template <typename ServerConfiguration>
inline Server<ServerConfiguration>::Server(std::string name, LogCallback logger,
const std::vector<std::string>& capabilities,
const std::string& certfile, const std::string& keyfile)
: _name(std::move(name))
, _logger(logger)
, _capabilities(capabilities)
, _certfile(certfile)
, _keyfile(keyfile) {
// Redirect logging
Expand Down Expand Up @@ -310,7 +316,7 @@ inline void Server<ServerConfiguration>::handleConnectionOpened(ConnHandle hdl)
con->send(json({
{"op", "serverInfo"},
{"name", _name},
{"capabilities", json::array({"clientPublish"})},
{"capabilities", _capabilities},
})
.dump());

Expand Down Expand Up @@ -814,6 +820,19 @@ inline void Server<ServerConfiguration>::sendMessage(ConnHandle clientHandle, Ch
sendBinary(clientHandle, message);
}

template <typename ServerConfiguration>
inline void Server<ServerConfiguration>::broadcastTime(uint64_t timestamp) {
std::vector<uint8_t> message(1 + 8);
message[0] = uint8_t(BinaryOpcode::TIME_DATA);
foxglove::WriteUint64LE(message.data() + 1, timestamp);

std::shared_lock<std::shared_mutex> lock(_clientsChannelMutex);
for (const auto& [hdl, clientInfo] : _clients) {
(void)clientInfo;
sendBinary(hdl, message);
}
}

template <typename ServerConfiguration>
inline std::optional<asio::ip::tcp::endpoint> Server<ServerConfiguration>::localEndpoint() {
std::error_code ec;
Expand Down
43 changes: 19 additions & 24 deletions ros1_foxglove_bridge/src/ros1_foxglove_bridge_nodelet.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#define ASIO_STANDALONE

#include <atomic>
#include <functional>
#include <memory>
#include <mutex>
Expand Down Expand Up @@ -46,6 +45,7 @@ class FoxgloveBridge : public nodelet::Nodelet {
const auto certfile = nhp.param<std::string>("certfile", "");
const auto keyfile = nhp.param<std::string>("keyfile", "");
_maxUpdateMs = static_cast<size_t>(nhp.param<int>("max_update_ms", DEFAULT_MAX_UPDATE_MS));
_useSimTime = nhp.param<bool>("/use_sim_time", false);

const auto regexPatterns = nhp.param<std::vector<std::string>>("topic_whitelist", {".*"});
_topicWhitelistPatterns.reserve(regexPatterns.size());
Expand All @@ -62,14 +62,21 @@ class FoxgloveBridge : public nodelet::Nodelet {
foxglove::WebSocketUserAgent());

try {
std::vector<std::string> serverCapabilities = {
foxglove::CAPABILITY_CLIENT_PUBLISH,
};
if (_useSimTime) {
serverCapabilities.push_back(foxglove::CAPABILITY_TIME);
}

const auto logHandler =
std::bind(&FoxgloveBridge::logHandler, this, std::placeholders::_1, std::placeholders::_2);
if (useTLS) {
_server = std::make_unique<foxglove::Server<foxglove::WebSocketTls>>(
"foxglove_bridge", std::move(logHandler), certfile, keyfile);
"foxglove_bridge", std::move(logHandler), serverCapabilities, certfile, keyfile);
} else {
_server = std::make_unique<foxglove::Server<foxglove::WebSocketNoTls>>(
"foxglove_bridge", std::move(logHandler));
"foxglove_bridge", std::move(logHandler), serverCapabilities);
}

_server->setSubscribeHandler(std::bind(&FoxgloveBridge::subscribeHandler, this,
Expand All @@ -86,6 +93,13 @@ class FoxgloveBridge : public nodelet::Nodelet {
_server->start(address, static_cast<uint16_t>(port));

updateAdvertisedTopics(ros::TimerEvent());

if (_useSimTime) {
_clockSubscription = getMTNodeHandle().subscribe<rosgraph_msgs::Clock>(
"/clock", 10, [&](const rosgraph_msgs::Clock::ConstPtr msg) {
_server->broadcastTime(msg->clock.toNSec());
});
}
} catch (const std::exception& err) {
ROS_ERROR("Failed to start websocket server: %s", err.what());
// Rethrow exception such that the nodelet is unloaded.
Expand Down Expand Up @@ -329,14 +343,10 @@ class FoxgloveBridge : public nodelet::Nodelet {

std::unordered_set<TopicAndDatatype, PairHash> latestTopics;
latestTopics.reserve(topicNamesAndTypes.size());
bool hasClockTopic = false;
for (const auto& topicNameAndType : topicNamesAndTypes) {
const auto& topicName = topicNameAndType.name;
const auto& datatype = topicNameAndType.datatype;

// Check if a /clock topic is published
hasClockTopic = hasClockTopic || (topicName == "/clock" && datatype == "rosgraph_msgs/Clock");

// Ignore the topic if it is not on the topic whitelist
if (std::find_if(_topicWhitelistPatterns.begin(), _topicWhitelistPatterns.end(),
[&topicName](const auto& regex) {
Expand All @@ -352,20 +362,6 @@ class FoxgloveBridge : public nodelet::Nodelet {
numIgnoredTopics);
}

// Enable or disable simulated time based on the presence of a /clock topic
if (!_useSimTime && hasClockTopic) {
ROS_INFO("/clock topic found, using simulated time");
_useSimTime = true;
_clockSubscription = getMTNodeHandle().subscribe<rosgraph_msgs::Clock>(
"/clock", 10, [&](const rosgraph_msgs::Clock::ConstPtr msg) {
_simTimeNs = msg->clock.toNSec();
});
} else if (_useSimTime && !hasClockTopic) {
ROS_WARN("/clock topic disappeared");
_useSimTime = false;
_clockSubscription.shutdown();
}

// Create a list of topics that are new to us
std::vector<TopicAndDatatype> newTopics;
for (const auto& topic : latestTopics) {
Expand Down Expand Up @@ -475,7 +471,7 @@ class FoxgloveBridge : public nodelet::Nodelet {
const foxglove::Channel& channel, foxglove::ConnHandle clientHandle,
const ros::MessageEvent<ros_babel_fish::BabelFishMessage const>& msgEvent) {
const auto& msg = msgEvent.getConstMessage();
const auto receiptTimeNs = _useSimTime ? _simTimeNs.load() : msgEvent.getReceiptTime().toNSec();
const auto receiptTimeNs = msgEvent.getReceiptTime().toNSec();
_server->sendMessage(
clientHandle, channel.id, receiptTimeNs,
std::string_view(reinterpret_cast<const char*>(msg->buffer()), msg->size()));
Expand All @@ -494,8 +490,7 @@ class FoxgloveBridge : public nodelet::Nodelet {
size_t _maxUpdateMs = size_t(DEFAULT_MAX_UPDATE_MS);
size_t _updateCount = 0;
ros::Subscriber _clockSubscription;
std::atomic<uint64_t> _simTimeNs = 0;
std::atomic<bool> _useSimTime = false;
bool _useSimTime = false;
};

} // namespace foxglove_bridge
Expand Down
58 changes: 25 additions & 33 deletions ros2_foxglove_bridge/src/ros2_foxglove_bridge.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
#include <atomic>
#include <chrono>
#include <memory>
#include <regex>
Expand Down Expand Up @@ -116,14 +115,22 @@ class FoxgloveBridge : public rclcpp::Node {
const auto useTLS = this->get_parameter("tls").as_bool();
const auto certfile = this->get_parameter("certfile").as_string();
const auto keyfile = this->get_parameter("keyfile").as_string();
_useSimTime = this->get_parameter("use_sim_time").as_bool();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For ROS 1 we retrieve /use_sim_time. I'm not familiar enough with ROS 2 params to know if this is correct or if we should be using the / prefix here.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In ROS 2 the use_sim_time is not global anymore but every node declares that paramter (automatically). From what I read, the main challenge is setting this parameter for all nodes when e.g. launching multiple nodes in a launch file

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From what I read, the main challenge is setting this parameter for all nodes when e.g. launching multiple nodes in a launch file

I happened to see this comment by mistake and want to share a tip; You can set the use_sim_time for all your Nodes inside a GroupAction in this way:

from launch.actions import GroupAction
from launch_ros.actions import SetParameter

GroupAction(
        actions=[
            SetParameter("use_sim_time", True),
            Node(...),
            Node(...)
       ]
)

const auto logHandler = std::bind(&FoxgloveBridge::logHandler, this, _1, _2);

std::vector<std::string> serverCapabilities = {
foxglove::CAPABILITY_CLIENT_PUBLISH,
};
if (_useSimTime) {
serverCapabilities.push_back(foxglove::CAPABILITY_TIME);
}

if (useTLS) {
_server = std::make_unique<foxglove::Server<foxglove::WebSocketTls>>(
"foxglove_bridge", std::move(logHandler), certfile, keyfile);
"foxglove_bridge", std::move(logHandler), serverCapabilities, certfile, keyfile);
} else {
_server = std::make_unique<foxglove::Server<foxglove::WebSocketNoTls>>("foxglove_bridge",
std::move(logHandler));
_server = std::make_unique<foxglove::Server<foxglove::WebSocketNoTls>>(
"foxglove_bridge", std::move(logHandler), serverCapabilities);
}
_server->setSubscribeHandler(std::bind(&FoxgloveBridge::subscribeHandler, this, _1, _2));
_server->setUnsubscribeHandler(std::bind(&FoxgloveBridge::unsubscribeHandler, this, _1, _2));
Expand Down Expand Up @@ -154,6 +161,16 @@ class FoxgloveBridge : public rclcpp::Node {

_clientPublishCallbackGroup =
this->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);

if (_useSimTime) {
_clockSubscription = this->create_subscription<rosgraph_msgs::msg::Clock>(
"/clock", rclcpp::QoS{rclcpp::KeepLast(1)}.best_effort(),
[&](std::shared_ptr<rosgraph_msgs::msg::Clock> msg) {
const auto timestamp = rclcpp::Time{msg->clock}.nanoseconds();
assert(timestamp >= 0 && "Timestamp is negative");
_server->broadcastTime(static_cast<uint64_t>(timestamp));
});
}
}

~FoxgloveBridge() {
Expand Down Expand Up @@ -192,16 +209,10 @@ class FoxgloveBridge : public rclcpp::Node {
auto topicNamesAndTypes = this->get_node_graph_interface()->get_topic_names_and_types();
std::unordered_set<TopicAndDatatype, PairHash> latestTopics;
latestTopics.reserve(topicNamesAndTypes.size());
bool hasClockTopic = false;
for (const auto& topicNamesAndType : topicNamesAndTypes) {
const auto& topicName = topicNamesAndType.first;
const auto& datatypes = topicNamesAndType.second;

// Check if a /clock topic is published
hasClockTopic = hasClockTopic || (topicName == "/clock" &&
std::find(datatypes.begin(), datatypes.end(),
"rosgraph_msgs/msg/Clock") != datatypes.end());

// Ignore the topic if it is not on the topic whitelist
if (std::find_if(_topicWhitelistPatterns.begin(), _topicWhitelistPatterns.end(),
[&topicName](const auto& regex) {
Expand All @@ -220,21 +231,6 @@ class FoxgloveBridge : public rclcpp::Node {
numIgnoredTopics);
}

// Enable or disable simulated time based on the presence of a /clock topic
if (!_useSimTime && hasClockTopic) {
RCLCPP_INFO(this->get_logger(), "/clock topic found, using simulated time");
_useSimTime = true;
_clockSubscription = this->create_subscription<rosgraph_msgs::msg::Clock>(
"/clock", rclcpp::QoS{rclcpp::KeepLast(1)}.best_effort(),
[&](std::shared_ptr<rosgraph_msgs::msg::Clock> msg) {
_simTimeNs = uint64_t(rclcpp::Time{msg->clock}.nanoseconds());
});
} else if (_useSimTime && !hasClockTopic) {
RCLCPP_WARN(this->get_logger(), "/clock topic disappeared");
_useSimTime = false;
_clockSubscription.reset();
}

// Create a list of topics that are new to us
std::vector<TopicAndDatatype> newTopics;
for (const auto& topic : latestTopics) {
Expand Down Expand Up @@ -343,8 +339,7 @@ class FoxgloveBridge : public rclcpp::Node {
std::unique_ptr<std::thread> _rosgraphPollThread;
size_t _maxQosDepth = DEFAULT_MAX_QOS_DEPTH;
std::shared_ptr<rclcpp::Subscription<rosgraph_msgs::msg::Clock>> _clockSubscription;
std::atomic<uint64_t> _simTimeNs = 0;
std::atomic<bool> _useSimTime = false;
bool _useSimTime = false;

void subscribeHandler(foxglove::ChannelId channelId, foxglove::ConnHandle clientHandle) {
std::lock_guard<std::mutex> lock(_subscriptionsMutex);
Expand Down Expand Up @@ -638,15 +633,12 @@ class FoxgloveBridge : public rclcpp::Node {
std::shared_ptr<rclcpp::SerializedMessage> msg) {
// NOTE: Do not call any RCLCPP_* logging functions from this function. Otherwise, subscribing
// to `/rosout` will cause a feedback loop
const auto timestamp = currentTimeNs();
const auto timestamp = this->now().nanoseconds();
assert(timestamp >= 0 && "Timestamp is negative");
const auto payload =
std::string_view{reinterpret_cast<const char*>(msg->get_rcl_serialized_message().buffer),
msg->get_rcl_serialized_message().buffer_length};
_server->sendMessage(clientHandle, channel.id, timestamp, payload);
}

uint64_t currentTimeNs() {
return _useSimTime ? _simTimeNs.load() : uint64_t(this->now().nanoseconds());
_server->sendMessage(clientHandle, channel.id, static_cast<uint64_t>(timestamp), payload);
}
};

Expand Down