Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid re-advertising existing channels when advertising new channels #172

Merged
merged 1 commit into from
Feb 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,8 @@ class ServerInterface {
virtual void start(const std::string& host, uint16_t port) = 0;
virtual void stop() = 0;

virtual ChannelId addChannel(ChannelWithoutId channel) = 0;
virtual void removeChannel(ChannelId chanId) = 0;
virtual void broadcastChannels() = 0;
virtual std::vector<ChannelId> addChannels(const std::vector<ChannelWithoutId>& channels) = 0;
virtual void removeChannels(const std::vector<ChannelId>& channelIds) = 0;
virtual void publishParameterValues(ConnectionHandle clientHandle,
const std::vector<Parameter>& parameters,
const std::optional<std::string>& requestId) = 0;
Expand Down
77 changes: 45 additions & 32 deletions foxglove_bridge_base/include/foxglove_bridge/websocket_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,8 @@ class Server final : public ServerInterface<ConnHandle> {
void start(const std::string& host, uint16_t port) override;
void stop() override;

ChannelId addChannel(ChannelWithoutId channel) override;
void removeChannel(ChannelId chanId) override;
void broadcastChannels() override;
std::vector<ChannelId> addChannels(const std::vector<ChannelWithoutId>& channels) override;
void removeChannels(const std::vector<ChannelId>& channelIds) override;
void publishParameterValues(ConnHandle clientHandle, const std::vector<Parameter>& parameters,
const std::optional<std::string>& requestId = std::nullopt) override;
void updateParameterValues(const std::vector<Parameter>& parameters) override;
Expand Down Expand Up @@ -804,46 +803,60 @@ inline void Server<ServerConfiguration>::handleBinaryMessage(ConnHandle hdl, con
}

template <typename ServerConfiguration>
inline ChannelId Server<ServerConfiguration>::addChannel(ChannelWithoutId channel) {
std::unique_lock<std::shared_mutex> lock(_channelsMutex);
const auto newId = ++_nextChannelId;
Channel newChannel{newId, std::move(channel)};
_channels.emplace(newId, std::move(newChannel));
return newId;
}
inline std::vector<ChannelId> Server<ServerConfiguration>::addChannels(
const std::vector<ChannelWithoutId>& channels) {
if (channels.empty()) {
return {};
}

template <typename ServerConfiguration>
inline void Server<ServerConfiguration>::removeChannel(ChannelId chanId) {
std::unique_lock<std::shared_mutex> channelsLock(_channelsMutex);
std::unique_lock<std::shared_mutex> clientsLock(_clientsMutex);
_channels.erase(chanId);
for (auto& [hdl, clientInfo] : _clients) {
if (const auto it = clientInfo.subscriptionsByChannel.find(chanId);
it != clientInfo.subscriptionsByChannel.end()) {
clientInfo.subscriptionsByChannel.erase(it);
std::vector<ChannelId> channelIds;
channelIds.reserve(channels.size());
json::array_t channelsJson;

{
std::unique_lock<std::shared_mutex> lock(_channelsMutex);
for (const auto& channelWithoutId : channels) {
const auto newId = ++_nextChannelId;
channelIds.push_back(newId);
Channel newChannel{newId, channelWithoutId};
channelsJson.push_back(newChannel);
_channels.emplace(newId, std::move(newChannel));
}
sendJson(hdl, {{"op", "unadvertise"}, {"channelIds", {chanId}}});
}
}

template <typename ServerConfiguration>
inline void Server<ServerConfiguration>::broadcastChannels() {
const auto msg = json{{"op", "advertise"}, {"channels", channelsJson}}.dump();
std::shared_lock<std::shared_mutex> clientsLock(_clientsMutex);
std::shared_lock<std::shared_mutex> channelsLock(_channelsMutex);
for (const auto& [hdl, clientInfo] : _clients) {
(void)clientInfo;
sendJsonRaw(hdl, msg);
}

return channelIds;
}

if (_clients.empty()) {
template <typename ServerConfiguration>
inline void Server<ServerConfiguration>::removeChannels(const std::vector<ChannelId>& channelIds) {
if (channelIds.empty()) {
return;
}

json channels;
for (const auto& [id, channel] : _channels) {
(void)id;
channels.push_back(channel);
{
std::unique_lock<std::shared_mutex> channelsLock(_channelsMutex);
for (auto channelId : channelIds) {
_channels.erase(channelId);
}
}
std::string msg = json{{"op", "advertise"}, {"channels", std::move(channels)}}.dump();

for (const auto& [hdl, clientInfo] : _clients) {
(void)clientInfo;
const auto msg = json{{"op", "unadvertise"}, {"channelIds", channelIds}}.dump();

std::unique_lock<std::shared_mutex> clientsLock(_clientsMutex);
for (auto& [hdl, clientInfo] : _clients) {
for (auto channelId : channelIds) {
if (const auto it = clientInfo.subscriptionsByChannel.find(channelId);
it != clientInfo.subscriptionsByChannel.end()) {
clientInfo.subscriptionsByChannel.erase(it);
}
}
sendJsonRaw(hdl, msg);
}
}
Expand Down
164 changes: 71 additions & 93 deletions ros1_foxglove_bridge/src/ros1_foxglove_bridge_nodelet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,21 +233,15 @@ class FoxgloveBridge : public nodelet::Nodelet {
void subscribe(foxglove::ChannelId channelId, ConnectionHandle clientHandle) {
std::lock_guard<std::mutex> lock(_subscriptionsMutex);

auto it = _channelToTopicAndDatatype.find(channelId);
if (it == _channelToTopicAndDatatype.end()) {
auto it = _advertisedTopics.find(channelId);
if (it == _advertisedTopics.end()) {
ROS_WARN("Received subscribe request for unknown channel %d", channelId);
return;
}
auto& topicAndDatatype = it->second;
auto topic = topicAndDatatype.first;
auto datatype = topicAndDatatype.second;
auto it2 = _advertisedTopics.find(topicAndDatatype);
if (it2 == _advertisedTopics.end()) {
ROS_ERROR("Channel %d for topic \"%s\" (%s) is not advertised", channelId, topic.c_str(),
datatype.c_str());
return;
}
const auto& channel = it2->second;

const auto& channel = it->second;
const auto& topic = channel.topic;
const auto& datatype = channel.schemaName;

// Get client subscriptions for this channel or insert an empty map.
auto [subscriptionsIt, firstSubscription] =
Expand All @@ -264,7 +258,7 @@ class FoxgloveBridge : public nodelet::Nodelet {
subscriptionsByClient.emplace(
clientHandle, getMTNodeHandle().subscribe<ros_babel_fish::BabelFishMessage>(
topic, SUBSCRIPTION_QUEUE_LENGTH,
std::bind(&FoxgloveBridge::rosMessageHandler, this, channel, clientHandle,
std::bind(&FoxgloveBridge::rosMessageHandler, this, channelId, clientHandle,
std::placeholders::_1)));
if (firstSubscription) {
ROS_INFO("Subscribed to topic \"%s\" (%s) on channel %d", topic.c_str(), datatype.c_str(),
Expand All @@ -283,17 +277,12 @@ class FoxgloveBridge : public nodelet::Nodelet {
void unsubscribe(foxglove::ChannelId channelId, ConnectionHandle clientHandle) {
std::lock_guard<std::mutex> lock(_subscriptionsMutex);

auto it = _channelToTopicAndDatatype.find(channelId);
TopicAndDatatype topicAndDatatype =
it != _channelToTopicAndDatatype.end()
? it->second
: std::make_pair<std::string, std::string>("[Unknown]", "[Unknown]");

auto it2 = _subscriptions.find(channelId);
if (it2 == _subscriptions.end()) {
const auto channelIt = _advertisedTopics.find(channelId);
if (channelIt == _advertisedTopics.end()) {
ROS_WARN("Received unsubscribe request for unknown channel %d", channelId);
return;
}
const auto& channel = channelIt->second;

auto subscriptionsIt = _subscriptions.find(channelId);
if (subscriptionsIt == _subscriptions.end()) {
Expand All @@ -313,9 +302,9 @@ class FoxgloveBridge : public nodelet::Nodelet {

subscriptionsByClient.erase(clientSubscription);
if (subscriptionsByClient.empty()) {
ROS_INFO("Unsubscribing from topic \"%s\" (%s) on channel %d", topicAndDatatype.first.c_str(),
topicAndDatatype.second.c_str(), channelId);
_subscriptions.erase(it2);
ROS_INFO("Unsubscribing from topic \"%s\" (%s) on channel %d", channel.topic.c_str(),
channel.schemaName.c_str(), channelId);
_subscriptions.erase(subscriptionsIt);
} else {
ROS_INFO("Removed one subscription from channel %d (%zu subscription(s) left)", channelId,
subscriptionsByClient.size());
Expand Down Expand Up @@ -480,80 +469,70 @@ class FoxgloveBridge : public nodelet::Nodelet {
numIgnoredTopics);
}

// Create a list of topics that are new to us
std::vector<TopicAndDatatype> newTopics;
for (const auto& topic : latestTopics) {
if (_advertisedTopics.find(topic) == _advertisedTopics.end()) {
newTopics.push_back(topic);
}
}
std::lock_guard<std::mutex> lock(_subscriptionsMutex);

// Create a list of topics that have been removed
std::vector<TopicAndDatatype> removedTopics;
for (const auto& [topic, channel] : _advertisedTopics) {
(void)channel;
if (latestTopics.find(topic) == latestTopics.end()) {
removedTopics.push_back(topic);
// Remove channels for which the topic does not exist anymore
std::vector<foxglove::ChannelId> channelIdsToRemove;
for (auto channelIt = _advertisedTopics.begin(); channelIt != _advertisedTopics.end();) {
const TopicAndDatatype topicAndDatatype = {channelIt->second.topic,
channelIt->second.schemaName};
if (latestTopics.find(topicAndDatatype) == latestTopics.end()) {
const auto channelId = channelIt->first;
channelIdsToRemove.push_back(channelId);
_subscriptions.erase(channelId);
ROS_DEBUG("Removed channel %d for topic \"%s\" (%s)", channelId,
topicAndDatatype.first.c_str(), topicAndDatatype.second.c_str());
channelIt = _advertisedTopics.erase(channelIt);
} else {
channelIt++;
}
}

// Remove advertisements for topics that have been removed
{
std::lock_guard<std::mutex> lock(_subscriptionsMutex);
for (const auto& topicAndDatatype : removedTopics) {
auto& channel = _advertisedTopics.at(topicAndDatatype);

// Stop tracking this channel in the WebSocket server
_server->removeChannel(channel.id);

// Remove the subscription for this topic, if any
_subscriptions.erase(channel.id);

// Remove this topic+datatype tuple
_channelToTopicAndDatatype.erase(channel.id);
_advertisedTopics.erase(topicAndDatatype);

ROS_DEBUG("Removed channel %d for topic \"%s\" (%s)", channel.id,
topicAndDatatype.first.c_str(), topicAndDatatype.second.c_str());
_server->removeChannels(channelIdsToRemove);

// Add new channels for new topics
std::vector<foxglove::ChannelWithoutId> channelsToAdd;
for (const auto& topicAndDatatype : latestTopics) {
if (std::find_if(_advertisedTopics.begin(), _advertisedTopics.end(),
[topicAndDatatype](const auto& channelIdAndChannel) {
const auto& channel = channelIdAndChannel.second;
return channel.topic == topicAndDatatype.first &&
channel.schemaName == topicAndDatatype.second;
}) != _advertisedTopics.end()) {
continue; // Topic already advertised
}

// Advertise new topics
for (const auto& topicAndDatatype : newTopics) {
foxglove::ChannelWithoutId newChannel{};
newChannel.topic = topicAndDatatype.first;
newChannel.schemaName = topicAndDatatype.second;
newChannel.encoding = ROS1_CHANNEL_ENCODING;

try {
const auto msgDescription =
_rosTypeInfoProvider.getMessageDescription(topicAndDatatype.second);
if (msgDescription) {
newChannel.schema = msgDescription->message_definition;
} else {
ROS_WARN("Could not find definition for type %s", topicAndDatatype.second.c_str());

// We still advertise the channel, but with an emtpy schema
newChannel.schema = "";
}
} catch (const std::exception& err) {
ROS_WARN("Failed to add channel for topic \"%s\" (%s): %s",
topicAndDatatype.first.c_str(), topicAndDatatype.second.c_str(), err.what());
continue;
}

auto channel = foxglove::Channel{_server->addChannel(newChannel), newChannel};
ROS_DEBUG("Advertising channel %d for topic \"%s\" (%s)", channel.id, channel.topic.c_str(),
channel.schemaName.c_str());
foxglove::ChannelWithoutId newChannel{};
newChannel.topic = topicAndDatatype.first;
newChannel.schemaName = topicAndDatatype.second;
newChannel.encoding = ROS1_CHANNEL_ENCODING;

// Add a mapping from the topic+datatype tuple to the channel, and channel ID to the
// topic+datatype tuple
_advertisedTopics.emplace(topicAndDatatype, std::move(channel));
_channelToTopicAndDatatype.emplace(channel.id, topicAndDatatype);
try {
const auto msgDescription =
_rosTypeInfoProvider.getMessageDescription(topicAndDatatype.second);
if (msgDescription) {
newChannel.schema = msgDescription->message_definition;
} else {
ROS_WARN("Could not find definition for type %s", topicAndDatatype.second.c_str());

// We still advertise the channel, but with an emtpy schema
newChannel.schema = "";
}
} catch (const std::exception& err) {
ROS_WARN("Failed to add channel for topic \"%s\" (%s): %s", topicAndDatatype.first.c_str(),
topicAndDatatype.second.c_str(), err.what());
continue;
}

channelsToAdd.push_back(newChannel);
}

if (newTopics.size() > 0) {
_server->broadcastChannels();
const auto channelIds = _server->addChannels(channelsToAdd);
for (size_t i = 0; i < channelsToAdd.size(); ++i) {
const auto channelId = channelIds[i];
const auto& channel = channelsToAdd[i];
_advertisedTopics.emplace(channelId, channel);
ROS_DEBUG("Advertising channel %d for topic \"%s\" (%s)", channelId, channel.topic.c_str(),
channel.schemaName.c_str());
}
}

Expand Down Expand Up @@ -774,11 +753,11 @@ class FoxgloveBridge : public nodelet::Nodelet {
}

void rosMessageHandler(
const foxglove::Channel& channel, ConnectionHandle clientHandle,
const foxglove::ChannelId channelId, ConnectionHandle clientHandle,
const ros::MessageEvent<ros_babel_fish::BabelFishMessage const>& msgEvent) {
const auto& msg = msgEvent.getConstMessage();
const auto receiptTimeNs = msgEvent.getReceiptTime().toNSec();
_server->sendMessage(clientHandle, channel.id, receiptTimeNs, msg->buffer(), msg->size());
_server->sendMessage(clientHandle, channelId, receiptTimeNs, msg->buffer(), msg->size());
}

void serviceRequest(const foxglove::ServiceRequest& request, ConnectionHandle clientHandle) {
Expand Down Expand Up @@ -830,8 +809,7 @@ class FoxgloveBridge : public nodelet::Nodelet {
std::vector<std::regex> _paramWhitelistPatterns;
std::vector<std::regex> _serviceWhitelistPatterns;
ros::XMLRPCManager xmlrpcServer;
std::unordered_map<TopicAndDatatype, foxglove::Channel, PairHash> _advertisedTopics;
std::unordered_map<foxglove::ChannelId, TopicAndDatatype> _channelToTopicAndDatatype;
std::unordered_map<foxglove::ChannelId, foxglove::ChannelWithoutId> _advertisedTopics;
std::unordered_map<foxglove::ChannelId, SubscriptionsByClient> _subscriptions;
std::unordered_map<foxglove::ServiceId, foxglove::ServiceWithoutId> _advertisedServices;
PublicationsByClient _clientAdvertisedTopics;
Expand Down
Loading