From 406349bb8acc9c26ebc996b98dc1ff0a6465ec80 Mon Sep 17 00:00:00 2001 From: Hans-Joachim Krauch Date: Wed, 22 Feb 2023 19:48:55 +0000 Subject: [PATCH] Avoid re-advertising existing channels when advertising new channels --- .../foxglove_bridge/server_interface.hpp | 5 +- .../foxglove_bridge/websocket_server.hpp | 77 ++++---- .../src/ros1_foxglove_bridge_nodelet.cpp | 164 +++++++--------- .../src/ros2_foxglove_bridge.cpp | 179 ++++++++---------- 4 files changed, 197 insertions(+), 228 deletions(-) diff --git a/foxglove_bridge_base/include/foxglove_bridge/server_interface.hpp b/foxglove_bridge_base/include/foxglove_bridge/server_interface.hpp index 358b826..44797f1 100644 --- a/foxglove_bridge_base/include/foxglove_bridge/server_interface.hpp +++ b/foxglove_bridge_base/include/foxglove_bridge/server_interface.hpp @@ -51,9 +51,8 @@ class ServerInterface { virtual void start(const std::string& host, uint16_t port) = 0; virtual void stop() = 0; - virtual ChannelId addChannel(ChannelWithoutId channel) = 0; - virtual void removeChannel(ChannelId chanId) = 0; - virtual void broadcastChannels() = 0; + virtual std::vector addChannels(const std::vector& channels) = 0; + virtual void removeChannels(const std::vector& channelIds) = 0; virtual void publishParameterValues(ConnectionHandle clientHandle, const std::vector& parameters, const std::optional& requestId) = 0; diff --git a/foxglove_bridge_base/include/foxglove_bridge/websocket_server.hpp b/foxglove_bridge_base/include/foxglove_bridge/websocket_server.hpp index f72f5c1..1c610a8 100644 --- a/foxglove_bridge_base/include/foxglove_bridge/websocket_server.hpp +++ b/foxglove_bridge_base/include/foxglove_bridge/websocket_server.hpp @@ -95,9 +95,8 @@ class Server final : public ServerInterface { void start(const std::string& host, uint16_t port) override; void stop() override; - ChannelId addChannel(ChannelWithoutId channel) override; - void removeChannel(ChannelId chanId) override; - void broadcastChannels() override; + std::vector addChannels(const std::vector& channels) override; + void removeChannels(const std::vector& channelIds) override; void publishParameterValues(ConnHandle clientHandle, const std::vector& parameters, const std::optional& requestId = std::nullopt) override; void updateParameterValues(const std::vector& parameters) override; @@ -804,46 +803,60 @@ inline void Server::handleBinaryMessage(ConnHandle hdl, con } template -inline ChannelId Server::addChannel(ChannelWithoutId channel) { - std::unique_lock lock(_channelsMutex); - const auto newId = ++_nextChannelId; - Channel newChannel{newId, std::move(channel)}; - _channels.emplace(newId, std::move(newChannel)); - return newId; -} +inline std::vector Server::addChannels( + const std::vector& channels) { + if (channels.empty()) { + return {}; + } -template -inline void Server::removeChannel(ChannelId chanId) { - std::unique_lock channelsLock(_channelsMutex); - std::unique_lock clientsLock(_clientsMutex); - _channels.erase(chanId); - for (auto& [hdl, clientInfo] : _clients) { - if (const auto it = clientInfo.subscriptionsByChannel.find(chanId); - it != clientInfo.subscriptionsByChannel.end()) { - clientInfo.subscriptionsByChannel.erase(it); + std::vector channelIds; + channelIds.reserve(channels.size()); + json::array_t channelsJson; + + { + std::unique_lock lock(_channelsMutex); + for (const auto& channelWithoutId : channels) { + const auto newId = ++_nextChannelId; + channelIds.push_back(newId); + Channel newChannel{newId, channelWithoutId}; + channelsJson.push_back(newChannel); + _channels.emplace(newId, std::move(newChannel)); } - sendJson(hdl, {{"op", "unadvertise"}, {"channelIds", {chanId}}}); } -} -template -inline void Server::broadcastChannels() { + const auto msg = json{{"op", "advertise"}, {"channels", channelsJson}}.dump(); std::shared_lock clientsLock(_clientsMutex); - std::shared_lock channelsLock(_channelsMutex); + for (const auto& [hdl, clientInfo] : _clients) { + (void)clientInfo; + sendJsonRaw(hdl, msg); + } + + return channelIds; +} - if (_clients.empty()) { +template +inline void Server::removeChannels(const std::vector& channelIds) { + if (channelIds.empty()) { return; } - json channels; - for (const auto& [id, channel] : _channels) { - (void)id; - channels.push_back(channel); + { + std::unique_lock channelsLock(_channelsMutex); + for (auto channelId : channelIds) { + _channels.erase(channelId); + } } - std::string msg = json{{"op", "advertise"}, {"channels", std::move(channels)}}.dump(); - for (const auto& [hdl, clientInfo] : _clients) { - (void)clientInfo; + const auto msg = json{{"op", "unadvertise"}, {"channelIds", channelIds}}.dump(); + + std::unique_lock clientsLock(_clientsMutex); + for (auto& [hdl, clientInfo] : _clients) { + for (auto channelId : channelIds) { + if (const auto it = clientInfo.subscriptionsByChannel.find(channelId); + it != clientInfo.subscriptionsByChannel.end()) { + clientInfo.subscriptionsByChannel.erase(it); + } + } sendJsonRaw(hdl, msg); } } diff --git a/ros1_foxglove_bridge/src/ros1_foxglove_bridge_nodelet.cpp b/ros1_foxglove_bridge/src/ros1_foxglove_bridge_nodelet.cpp index 21babe2..6e778a4 100644 --- a/ros1_foxglove_bridge/src/ros1_foxglove_bridge_nodelet.cpp +++ b/ros1_foxglove_bridge/src/ros1_foxglove_bridge_nodelet.cpp @@ -233,21 +233,15 @@ class FoxgloveBridge : public nodelet::Nodelet { void subscribe(foxglove::ChannelId channelId, ConnectionHandle clientHandle) { std::lock_guard lock(_subscriptionsMutex); - auto it = _channelToTopicAndDatatype.find(channelId); - if (it == _channelToTopicAndDatatype.end()) { + auto it = _advertisedTopics.find(channelId); + if (it == _advertisedTopics.end()) { ROS_WARN("Received subscribe request for unknown channel %d", channelId); return; } - auto& topicAndDatatype = it->second; - auto topic = topicAndDatatype.first; - auto datatype = topicAndDatatype.second; - auto it2 = _advertisedTopics.find(topicAndDatatype); - if (it2 == _advertisedTopics.end()) { - ROS_ERROR("Channel %d for topic \"%s\" (%s) is not advertised", channelId, topic.c_str(), - datatype.c_str()); - return; - } - const auto& channel = it2->second; + + const auto& channel = it->second; + const auto& topic = channel.topic; + const auto& datatype = channel.schemaName; // Get client subscriptions for this channel or insert an empty map. auto [subscriptionsIt, firstSubscription] = @@ -264,7 +258,7 @@ class FoxgloveBridge : public nodelet::Nodelet { subscriptionsByClient.emplace( clientHandle, getMTNodeHandle().subscribe( topic, SUBSCRIPTION_QUEUE_LENGTH, - std::bind(&FoxgloveBridge::rosMessageHandler, this, channel, clientHandle, + std::bind(&FoxgloveBridge::rosMessageHandler, this, channelId, clientHandle, std::placeholders::_1))); if (firstSubscription) { ROS_INFO("Subscribed to topic \"%s\" (%s) on channel %d", topic.c_str(), datatype.c_str(), @@ -283,17 +277,12 @@ class FoxgloveBridge : public nodelet::Nodelet { void unsubscribe(foxglove::ChannelId channelId, ConnectionHandle clientHandle) { std::lock_guard lock(_subscriptionsMutex); - auto it = _channelToTopicAndDatatype.find(channelId); - TopicAndDatatype topicAndDatatype = - it != _channelToTopicAndDatatype.end() - ? it->second - : std::make_pair("[Unknown]", "[Unknown]"); - - auto it2 = _subscriptions.find(channelId); - if (it2 == _subscriptions.end()) { + const auto channelIt = _advertisedTopics.find(channelId); + if (channelIt == _advertisedTopics.end()) { ROS_WARN("Received unsubscribe request for unknown channel %d", channelId); return; } + const auto& channel = channelIt->second; auto subscriptionsIt = _subscriptions.find(channelId); if (subscriptionsIt == _subscriptions.end()) { @@ -313,9 +302,9 @@ class FoxgloveBridge : public nodelet::Nodelet { subscriptionsByClient.erase(clientSubscription); if (subscriptionsByClient.empty()) { - ROS_INFO("Unsubscribing from topic \"%s\" (%s) on channel %d", topicAndDatatype.first.c_str(), - topicAndDatatype.second.c_str(), channelId); - _subscriptions.erase(it2); + ROS_INFO("Unsubscribing from topic \"%s\" (%s) on channel %d", channel.topic.c_str(), + channel.schemaName.c_str(), channelId); + _subscriptions.erase(subscriptionsIt); } else { ROS_INFO("Removed one subscription from channel %d (%zu subscription(s) left)", channelId, subscriptionsByClient.size()); @@ -480,80 +469,70 @@ class FoxgloveBridge : public nodelet::Nodelet { numIgnoredTopics); } - // Create a list of topics that are new to us - std::vector newTopics; - for (const auto& topic : latestTopics) { - if (_advertisedTopics.find(topic) == _advertisedTopics.end()) { - newTopics.push_back(topic); - } - } + std::lock_guard lock(_subscriptionsMutex); - // Create a list of topics that have been removed - std::vector removedTopics; - for (const auto& [topic, channel] : _advertisedTopics) { - (void)channel; - if (latestTopics.find(topic) == latestTopics.end()) { - removedTopics.push_back(topic); + // Remove channels for which the topic does not exist anymore + std::vector channelIdsToRemove; + for (auto channelIt = _advertisedTopics.begin(); channelIt != _advertisedTopics.end();) { + const TopicAndDatatype topicAndDatatype = {channelIt->second.topic, + channelIt->second.schemaName}; + if (latestTopics.find(topicAndDatatype) == latestTopics.end()) { + const auto channelId = channelIt->first; + channelIdsToRemove.push_back(channelId); + _subscriptions.erase(channelId); + ROS_DEBUG("Removed channel %d for topic \"%s\" (%s)", channelId, + topicAndDatatype.first.c_str(), topicAndDatatype.second.c_str()); + channelIt = _advertisedTopics.erase(channelIt); + } else { + channelIt++; } } - - // Remove advertisements for topics that have been removed - { - std::lock_guard lock(_subscriptionsMutex); - for (const auto& topicAndDatatype : removedTopics) { - auto& channel = _advertisedTopics.at(topicAndDatatype); - - // Stop tracking this channel in the WebSocket server - _server->removeChannel(channel.id); - - // Remove the subscription for this topic, if any - _subscriptions.erase(channel.id); - - // Remove this topic+datatype tuple - _channelToTopicAndDatatype.erase(channel.id); - _advertisedTopics.erase(topicAndDatatype); - - ROS_DEBUG("Removed channel %d for topic \"%s\" (%s)", channel.id, - topicAndDatatype.first.c_str(), topicAndDatatype.second.c_str()); + _server->removeChannels(channelIdsToRemove); + + // Add new channels for new topics + std::vector channelsToAdd; + for (const auto& topicAndDatatype : latestTopics) { + if (std::find_if(_advertisedTopics.begin(), _advertisedTopics.end(), + [topicAndDatatype](const auto& channelIdAndChannel) { + const auto& channel = channelIdAndChannel.second; + return channel.topic == topicAndDatatype.first && + channel.schemaName == topicAndDatatype.second; + }) != _advertisedTopics.end()) { + continue; // Topic already advertised } - // Advertise new topics - for (const auto& topicAndDatatype : newTopics) { - foxglove::ChannelWithoutId newChannel{}; - newChannel.topic = topicAndDatatype.first; - newChannel.schemaName = topicAndDatatype.second; - newChannel.encoding = ROS1_CHANNEL_ENCODING; - - try { - const auto msgDescription = - _rosTypeInfoProvider.getMessageDescription(topicAndDatatype.second); - if (msgDescription) { - newChannel.schema = msgDescription->message_definition; - } else { - ROS_WARN("Could not find definition for type %s", topicAndDatatype.second.c_str()); - - // We still advertise the channel, but with an emtpy schema - newChannel.schema = ""; - } - } catch (const std::exception& err) { - ROS_WARN("Failed to add channel for topic \"%s\" (%s): %s", - topicAndDatatype.first.c_str(), topicAndDatatype.second.c_str(), err.what()); - continue; - } - - auto channel = foxglove::Channel{_server->addChannel(newChannel), newChannel}; - ROS_DEBUG("Advertising channel %d for topic \"%s\" (%s)", channel.id, channel.topic.c_str(), - channel.schemaName.c_str()); + foxglove::ChannelWithoutId newChannel{}; + newChannel.topic = topicAndDatatype.first; + newChannel.schemaName = topicAndDatatype.second; + newChannel.encoding = ROS1_CHANNEL_ENCODING; - // Add a mapping from the topic+datatype tuple to the channel, and channel ID to the - // topic+datatype tuple - _advertisedTopics.emplace(topicAndDatatype, std::move(channel)); - _channelToTopicAndDatatype.emplace(channel.id, topicAndDatatype); + try { + const auto msgDescription = + _rosTypeInfoProvider.getMessageDescription(topicAndDatatype.second); + if (msgDescription) { + newChannel.schema = msgDescription->message_definition; + } else { + ROS_WARN("Could not find definition for type %s", topicAndDatatype.second.c_str()); + + // We still advertise the channel, but with an emtpy schema + newChannel.schema = ""; + } + } catch (const std::exception& err) { + ROS_WARN("Failed to add channel for topic \"%s\" (%s): %s", topicAndDatatype.first.c_str(), + topicAndDatatype.second.c_str(), err.what()); + continue; } + + channelsToAdd.push_back(newChannel); } - if (newTopics.size() > 0) { - _server->broadcastChannels(); + const auto channelIds = _server->addChannels(channelsToAdd); + for (size_t i = 0; i < channelsToAdd.size(); ++i) { + const auto channelId = channelIds[i]; + const auto& channel = channelsToAdd[i]; + _advertisedTopics.emplace(channelId, channel); + ROS_DEBUG("Advertising channel %d for topic \"%s\" (%s)", channelId, channel.topic.c_str(), + channel.schemaName.c_str()); } } @@ -774,11 +753,11 @@ class FoxgloveBridge : public nodelet::Nodelet { } void rosMessageHandler( - const foxglove::Channel& channel, ConnectionHandle clientHandle, + const foxglove::ChannelId channelId, ConnectionHandle clientHandle, const ros::MessageEvent& msgEvent) { const auto& msg = msgEvent.getConstMessage(); const auto receiptTimeNs = msgEvent.getReceiptTime().toNSec(); - _server->sendMessage(clientHandle, channel.id, receiptTimeNs, msg->buffer(), msg->size()); + _server->sendMessage(clientHandle, channelId, receiptTimeNs, msg->buffer(), msg->size()); } void serviceRequest(const foxglove::ServiceRequest& request, ConnectionHandle clientHandle) { @@ -830,8 +809,7 @@ class FoxgloveBridge : public nodelet::Nodelet { std::vector _paramWhitelistPatterns; std::vector _serviceWhitelistPatterns; ros::XMLRPCManager xmlrpcServer; - std::unordered_map _advertisedTopics; - std::unordered_map _channelToTopicAndDatatype; + std::unordered_map _advertisedTopics; std::unordered_map _subscriptions; std::unordered_map _advertisedServices; PublicationsByClient _clientAdvertisedTopics; diff --git a/ros2_foxglove_bridge/src/ros2_foxglove_bridge.cpp b/ros2_foxglove_bridge/src/ros2_foxglove_bridge.cpp index 0d1fd80..c5f80d2 100644 --- a/ros2_foxglove_bridge/src/ros2_foxglove_bridge.cpp +++ b/ros2_foxglove_bridge/src/ros2_foxglove_bridge.cpp @@ -190,89 +190,80 @@ class FoxgloveBridge : public rclcpp::Node { numIgnoredTopics); } - // Create a list of topics that are new to us - std::vector newTopics; - for (const auto& topic : latestTopics) { - if (_advertisedTopics.find(topic) == _advertisedTopics.end()) { - newTopics.push_back(topic); - } - } + std::lock_guard lock(_subscriptionsMutex); - // Create a list of topics that have been removed - std::vector removedTopics; - for (const auto& [topic, channel] : _advertisedTopics) { - if (latestTopics.find(topic) == latestTopics.end()) { - removedTopics.push_back(topic); + // Remove channels for which the topic does not exist anymore + std::vector channelIdsToRemove; + for (auto channelIt = _advertisedTopics.begin(); channelIt != _advertisedTopics.end();) { + const TopicAndDatatype topicAndDatatype = {channelIt->second.topic, + channelIt->second.schemaName}; + if (latestTopics.find(topicAndDatatype) == latestTopics.end()) { + const auto channelId = channelIt->first; + channelIdsToRemove.push_back(channelId); + _subscriptions.erase(channelId); + RCLCPP_INFO(this->get_logger(), "Removed channel %d for topic \"%s\" (%s)", channelId, + topicAndDatatype.first.c_str(), topicAndDatatype.second.c_str()); + channelIt = _advertisedTopics.erase(channelIt); + } else { + channelIt++; } } - - // Remove advertisements for topics that have been removed - { - std::lock_guard lock(_subscriptionsMutex); - for (const auto& topicAndDatatype : removedTopics) { - auto& channel = _advertisedTopics.at(topicAndDatatype); - - // Stop tracking this channel in the WebSocket server - _server->removeChannel(channel.id); - - // Remove the subscription for this topic, if any - _subscriptions.erase(channel.id); - - // Remove this topic+datatype tuple - _channelToTopicAndDatatype.erase(channel.id); - _advertisedTopics.erase(topicAndDatatype); - - RCLCPP_INFO(this->get_logger(), "Removed channel %d for topic \"%s\" (%s)", channel.id, - topicAndDatatype.first.c_str(), topicAndDatatype.second.c_str()); + _server->removeChannels(channelIdsToRemove); + + // Add new channels for new topics + std::vector channelsToAdd; + for (const auto& topicAndDatatype : latestTopics) { + if (std::find_if(_advertisedTopics.begin(), _advertisedTopics.end(), + [topicAndDatatype](const auto& channelIdAndChannel) { + const auto& channel = channelIdAndChannel.second; + return channel.topic == topicAndDatatype.first && + channel.schemaName == topicAndDatatype.second; + }) != _advertisedTopics.end()) { + continue; // Topic already advertised } - // Advertise new topics - for (const auto& topicAndDatatype : newTopics) { - foxglove::ChannelWithoutId newChannel{}; - newChannel.topic = topicAndDatatype.first; - newChannel.schemaName = topicAndDatatype.second; - - try { - auto [format, schema] = _messageDefinitionCache.get_full_text(topicAndDatatype.second); - switch (format) { - case foxglove::MessageDefinitionFormat::MSG: - newChannel.encoding = "cdr"; - newChannel.schema = schema; - break; - case foxglove::MessageDefinitionFormat::IDL: - RCLCPP_WARN(this->get_logger(), - "IDL message definition format cannot be communicated over ws-protocol. " - "Topic \"%s\" (%s) may not decode correctly in clients", - topicAndDatatype.first.c_str(), topicAndDatatype.second.c_str()); - newChannel.encoding = "cdr"; - newChannel.schema = schema; - break; - } - - } catch (const foxglove::DefinitionNotFoundError& err) { - RCLCPP_WARN(this->get_logger(), "Could not find definition for type %s: %s", - topicAndDatatype.second.c_str(), err.what()); - // We still advertise the channel, but with an emtpy schema - newChannel.schema = ""; - } catch (const std::exception& err) { - RCLCPP_WARN(this->get_logger(), "Failed to add channel for topic \"%s\" (%s): %s", - topicAndDatatype.first.c_str(), topicAndDatatype.second.c_str(), err.what()); - continue; - } + foxglove::ChannelWithoutId newChannel{}; + newChannel.topic = topicAndDatatype.first; + newChannel.schemaName = topicAndDatatype.second; - auto channel = foxglove::Channel{_server->addChannel(newChannel), newChannel}; - RCLCPP_DEBUG(this->get_logger(), "Advertising channel %d for topic \"%s\" (%s)", channel.id, - channel.topic.c_str(), channel.schemaName.c_str()); + try { + auto [format, schema] = _messageDefinitionCache.get_full_text(topicAndDatatype.second); + switch (format) { + case foxglove::MessageDefinitionFormat::MSG: + newChannel.encoding = "cdr"; + newChannel.schema = schema; + break; + case foxglove::MessageDefinitionFormat::IDL: + RCLCPP_WARN(this->get_logger(), + "IDL message definition format cannot be communicated over ws-protocol. " + "Topic \"%s\" (%s) may not decode correctly in clients", + topicAndDatatype.first.c_str(), topicAndDatatype.second.c_str()); + newChannel.encoding = "cdr"; + newChannel.schema = schema; + break; + } - // Add a mapping from the topic+datatype tuple to the channel, and channel ID to the - // topic+datatype tuple - _advertisedTopics.emplace(topicAndDatatype, std::move(channel)); - _channelToTopicAndDatatype.emplace(channel.id, topicAndDatatype); + } catch (const foxglove::DefinitionNotFoundError& err) { + RCLCPP_WARN(this->get_logger(), "Could not find definition for type %s: %s", + topicAndDatatype.second.c_str(), err.what()); + // We still advertise the channel, but with an emtpy schema + newChannel.schema = ""; + } catch (const std::exception& err) { + RCLCPP_WARN(this->get_logger(), "Failed to add channel for topic \"%s\" (%s): %s", + topicAndDatatype.first.c_str(), topicAndDatatype.second.c_str(), err.what()); + continue; } + + channelsToAdd.push_back(newChannel); } - if (newTopics.size() > 0) { - _server->broadcastChannels(); + const auto channelIds = _server->addChannels(channelsToAdd); + for (size_t i = 0; i < channelsToAdd.size(); ++i) { + const auto channelId = channelIds[i]; + const auto& channel = channelsToAdd[i]; + _advertisedTopics.emplace(channelId, channel); + RCLCPP_DEBUG(this->get_logger(), "Advertising channel %d for topic \"%s\" (%s)", channelId, + channel.topic.c_str(), channel.schemaName.c_str()); } } @@ -381,9 +372,8 @@ class FoxgloveBridge : public rclcpp::Node { std::vector _topicWhitelistPatterns; std::vector _serviceWhitelistPatterns; std::shared_ptr _paramInterface; - std::unordered_map _advertisedTopics; + std::unordered_map _advertisedTopics; std::unordered_map _advertisedServices; - std::unordered_map _channelToTopicAndDatatype; std::unordered_map _subscriptions; PublicationsByClient _clientAdvertisedTopics; std::unordered_map _serviceClients; @@ -448,22 +438,16 @@ class FoxgloveBridge : public rclcpp::Node { void subscribe(foxglove::ChannelId channelId, ConnectionHandle clientHandle) { std::lock_guard lock(_subscriptionsMutex); - auto it = _channelToTopicAndDatatype.find(channelId); - if (it == _channelToTopicAndDatatype.end()) { + auto it = _advertisedTopics.find(channelId); + if (it == _advertisedTopics.end()) { RCLCPP_WARN(this->get_logger(), "Received subscribe request for unknown channel %d", channelId); return; } - auto& topicAndDatatype = it->second; - auto topic = topicAndDatatype.first; - auto datatype = topicAndDatatype.second; - auto it2 = _advertisedTopics.find(topicAndDatatype); - if (it2 == _advertisedTopics.end()) { - RCLCPP_ERROR(this->get_logger(), "Channel %d for topic \"%s\" (%s) is not advertised", - channelId, topic.c_str(), datatype.c_str()); - return; - } - const auto& channel = it2->second; + + const auto& channel = it->second; + const auto& topic = channel.topic; + const auto& datatype = channel.schemaName; // Get client subscriptions for this channel or insert an empty map. auto [subscriptionsIt, firstSubscription] = @@ -547,7 +531,7 @@ class FoxgloveBridge : public rclcpp::Node { try { auto subscriber = this->create_generic_subscription( topic, datatype, qos, - std::bind(&FoxgloveBridge::rosMessageHandler, this, channel, clientHandle, _1), + std::bind(&FoxgloveBridge::rosMessageHandler, this, channelId, clientHandle, _1), subscriptionOptions); subscriptionsByClient.emplace(clientHandle, std::move(subscriber)); } catch (const std::exception& ex) { @@ -559,18 +543,13 @@ class FoxgloveBridge : public rclcpp::Node { void unsubscribe(foxglove::ChannelId channelId, ConnectionHandle clientHandle) { std::lock_guard lock(_subscriptionsMutex); - auto it = _channelToTopicAndDatatype.find(channelId); - TopicAndDatatype topicAndDatatype = - it != _channelToTopicAndDatatype.end() - ? it->second - : std::make_pair("[Unknown]", "[Unknown]"); - - auto it2 = _subscriptions.find(channelId); - if (it2 == _subscriptions.end()) { + const auto channelIt = _advertisedTopics.find(channelId); + if (channelIt == _advertisedTopics.end()) { RCLCPP_WARN(this->get_logger(), "Received unsubscribe request for unknown channel %d", channelId); return; } + const auto& channel = channelIt->second; auto subscriptionsIt = _subscriptions.find(channelId); if (subscriptionsIt == _subscriptions.end()) { @@ -594,8 +573,8 @@ class FoxgloveBridge : public rclcpp::Node { subscriptionsByClient.erase(clientSubscription); if (subscriptionsByClient.empty()) { RCLCPP_INFO(this->get_logger(), "Unsubscribing from topic \"%s\" (%s) on channel %d", - topicAndDatatype.first.c_str(), topicAndDatatype.second.c_str(), channelId); - _subscriptions.erase(it2); + channel.topic.c_str(), channel.schemaName.c_str(), channelId); + _subscriptions.erase(subscriptionsIt); } else { RCLCPP_INFO(this->get_logger(), "Removed one subscription from channel %d (%zu subscription(s) left)", channelId, @@ -767,14 +746,14 @@ class FoxgloveBridge : public rclcpp::Node { } } - void rosMessageHandler(const foxglove::Channel& channel, ConnectionHandle clientHandle, + void rosMessageHandler(const foxglove::ChannelId& channelId, ConnectionHandle clientHandle, std::shared_ptr msg) { // NOTE: Do not call any RCLCPP_* logging functions from this function. Otherwise, subscribing // to `/rosout` will cause a feedback loop const auto timestamp = this->now().nanoseconds(); assert(timestamp >= 0 && "Timestamp is negative"); const auto rclSerializedMsg = msg->get_rcl_serialized_message(); - _server->sendMessage(clientHandle, channel.id, static_cast(timestamp), + _server->sendMessage(clientHandle, channelId, static_cast(timestamp), rclSerializedMsg.buffer, rclSerializedMsg.buffer_length); }