Skip to content

Commit

Permalink
Assume publisher qos depth of 1 if the middleware reports the qos his…
Browse files Browse the repository at this point in the history
…tory as unknown (#239)

### Public-Facing Changes

Assume publisher qos depth of 1 if the middleware reports the qos
history as unknown

### Description
Some RMWs do not report history information of a publisher endpoint in
which case the history depth is reported as 0. We internally use this
depth to calculate an appropriate depth for the subscription, which uses
the `KeepLast(depth)` history policy.
In case a publisher's history depth is reported as 0, we assume a depth
of 1 so that the final history depth is at least equal to the number of
publishers. This covers the case where there are multiple
transient_local publishers with a depth of 1 (e.g. multiple tf_static
transform broadcasters). Before this PR, a user would have to manually
specify a `min_qos_depth` of N when having N tf_static broadcasters.

This PR also increases the max qos depth default value, 10 seemed very
low. I'm not sure anymore why we added an upper limit in the first
place. Additionally, a warning is logged when the upper limit is
reached.

Fixes #238 
See also #208
  • Loading branch information
achim-k committed Jul 5, 2023
1 parent 6281aec commit 6660b0d
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 24 deletions.
2 changes: 1 addition & 1 deletion README.md
Expand Up @@ -80,7 +80,7 @@ Parameters are provided to configure the behavior of the bridge. These parameter
* (ROS 1) __max_update_ms__: The maximum number of milliseconds to wait in between polling `roscore` for new topics, services, or parameters. Defaults to `5000`.
* (ROS 2) __num_threads__: The number of threads to use for the ROS node executor. This controls the number of subscriptions that can be processed in parallel. 0 means one thread per CPU core. Defaults to `0`.
* (ROS 2) __min_qos_depth__: Minimum depth used for the QoS profile of subscriptions. Defaults to `1`. This is to set a lower limit for a subscriber's QoS depth which is computed by summing up depths of all publishers. See also [#208](https://github.com/foxglove/ros-foxglove-bridge/issues/208).
* (ROS 2) __max_qos_depth__: Maximum depth used for the QoS profile of subscriptions. Defaults to `10`.
* (ROS 2) __max_qos_depth__: Maximum depth used for the QoS profile of subscriptions. Defaults to `25`.
* (ROS 2) __include_hidden__: Include hidden topics and services. Defaults to `false`.

## Building from source
Expand Down
Expand Up @@ -22,6 +22,9 @@ std::future<ServiceResponse> waitForServiceResponse(std::shared_ptr<ClientInterf
std::future<Service> waitForService(std::shared_ptr<ClientInterface> client,
const std::string& serviceName);

std::future<Channel> waitForChannel(std::shared_ptr<ClientInterface> client,
const std::string& topicName);

extern template class Client<websocketpp::config::asio_client>;

} // namespace foxglove
54 changes: 36 additions & 18 deletions foxglove_bridge_base/src/test/test_client.cpp
Expand Up @@ -12,43 +12,37 @@ namespace foxglove {
constexpr auto DEFAULT_TIMEOUT = std::chrono::seconds(5);

std::vector<uint8_t> connectClientAndReceiveMsg(const std::string& uri,
const std::string& topic_name) {
const std::string& topicName) {
// Set up text message handler to resolve the promise when the topic is advertised
foxglove::Client<websocketpp::config::asio_client> wsClient;
auto wsClient = std::make_shared<foxglove::Client<websocketpp::config::asio_client>>();
std::promise<nlohmann::json> channelPromise;
auto channelFuture = channelPromise.get_future();
wsClient.setTextMessageHandler([&topic_name, &channelPromise](const std::string& payload) {
const auto msg = nlohmann::json::parse(payload);
const auto& op = msg.at("op").get<std::string>();
if (op == "advertise") {
for (const auto& channel : msg.at("channels")) {
if (topic_name == channel.at("topic")) {
channelPromise.set_value(channel);
}
}
}
});
auto channelFuture = waitForChannel(wsClient, topicName);

// Connect the client and wait for the channel future
if (std::future_status::ready != wsClient.connect(uri).wait_for(DEFAULT_TIMEOUT)) {
if (std::future_status::ready != wsClient->connect(uri).wait_for(DEFAULT_TIMEOUT)) {
throw std::runtime_error("Client failed to connect");
} else if (std::future_status::ready != channelFuture.wait_for(DEFAULT_TIMEOUT)) {
throw std::runtime_error("Client failed to receive channel");
}

const auto channel = channelFuture.get();
const SubscriptionId subscriptionId = 1;

// Set up binary message handler to resolve when a binary message has been received
std::promise<std::vector<uint8_t>> msgPromise;
auto msgFuture = msgPromise.get_future();
wsClient.setBinaryMessageHandler([&msgPromise](const uint8_t* data, size_t dataLength) {
wsClient->setBinaryMessageHandler([&msgPromise](const uint8_t* data, size_t dataLength) {
if (ReadUint32LE(data + 1) != subscriptionId) {
return;
}
const size_t offset = 1 + 4 + 8;
std::vector<uint8_t> dataCopy(dataLength - offset);
std::memcpy(dataCopy.data(), data + offset, dataLength - offset);
msgPromise.set_value(std::move(dataCopy));
});

// Subscribe to the channel that corresponds to the topic
const auto channelId = channelFuture.get().at("id").get<foxglove::ChannelId>();
wsClient.subscribe({{1, channelId}});
wsClient->subscribe({{subscriptionId, channel.id}});

// Wait until we have received a binary message
if (std::future_status::ready != msgFuture.wait_for(DEFAULT_TIMEOUT)) {
Expand Down Expand Up @@ -118,6 +112,30 @@ std::future<Service> waitForService(std::shared_ptr<ClientInterface> client,
return future;
}

std::future<Channel> waitForChannel(std::shared_ptr<ClientInterface> client,
const std::string& topicName) {
auto promise = std::make_shared<std::promise<Channel>>();
auto future = promise->get_future();

client->setTextMessageHandler(
[promise = std::move(promise), topicName](const std::string& payload) mutable {
const auto msg = nlohmann::json::parse(payload);
const auto& op = msg["op"].get<std::string>();

if (op == "advertise") {
const auto channels = msg["channels"].get<std::vector<Channel>>();
for (const auto& channel : channels) {
if (channel.topic == topicName) {
promise->set_value(channel);
break;
}
}
}
});

return future;
}

// Explicit template instantiation
template class Client<websocketpp::config::asio_client>;

Expand Down
Expand Up @@ -28,7 +28,7 @@ constexpr int64_t DEFAULT_PORT = 8765;
constexpr char DEFAULT_ADDRESS[] = "0.0.0.0";
constexpr int64_t DEFAULT_SEND_BUFFER_LIMIT = 10000000;
constexpr int64_t DEFAULT_MIN_QOS_DEPTH = 1;
constexpr int64_t DEFAULT_MAX_QOS_DEPTH = 10;
constexpr int64_t DEFAULT_MAX_QOS_DEPTH = 25;

void declareParameters(rclcpp::Node* node);

Expand Down
25 changes: 21 additions & 4 deletions ros2_foxglove_bridge/src/ros2_foxglove_bridge.cpp
Expand Up @@ -457,10 +457,27 @@ void FoxgloveBridge::subscribe(foxglove::ChannelId channelId, ConnectionHandle c
if (qos.durability() == rclcpp::DurabilityPolicy::TransientLocal) {
++durabilityTransientLocalEndpointsCount;
}
depth = std::min(_maxQosDepth, depth + qos.depth());
}

rclcpp::QoS qos{rclcpp::KeepLast(std::max(depth, _minQosDepth))};
// Some RMWs do not retrieve history information of the publisher endpoint in which case the
// history depth is 0. We use a lower limit of 1 here, so that the history depth is at least
// equal to the number of publishers. This covers the case where there are multiple
// transient_local publishers with a depth of 1 (e.g. multiple tf_static transform
// broadcasters). See also
// https://github.com/foxglove/ros-foxglove-bridge/issues/238 and
// https://github.com/foxglove/ros-foxglove-bridge/issues/208
const size_t publisherHistoryDepth = std::max(1ul, qos.depth());
depth = depth + publisherHistoryDepth;
}

depth = std::max(depth, _minQosDepth);
if (depth > _maxQosDepth) {
RCLCPP_WARN(this->get_logger(),
"Limiting history depth for topic '%s' to %zu (was %zu). You may want to increase "
"the max_qos_depth parameter value.",
topic.c_str(), _maxQosDepth, depth);
depth = _maxQosDepth;
}

rclcpp::QoS qos{rclcpp::KeepLast(depth)};

// If all endpoints are reliable, ask for reliable
if (reliabilityReliableEndpointsCount == publisherInfo.size()) {
Expand Down
47 changes: 47 additions & 0 deletions ros2_foxglove_bridge/tests/smoke_test.cpp
Expand Up @@ -496,6 +496,53 @@ TEST_F(ServiceTest, testCallServiceParallel) {
}
}

TEST(SmokeTest, receiveMessagesOfMultipleTransientLocalPublishers) {
const std::string topicName = "/latched";
auto node = rclcpp::Node::make_shared("node");
rclcpp::QoS qos = rclcpp::QoS(rclcpp::KeepLast(1));
qos.transient_local();
qos.reliable();

rclcpp::executors::SingleThreadedExecutor executor;
executor.add_node(node);
auto spinnerThread = std::thread([&executor]() {
executor.spin();
});

constexpr size_t nPubs = 15;
std::vector<rclcpp::Publisher<std_msgs::msg::String>::SharedPtr> pubs;
for (size_t i = 0; i < nPubs; ++i) {
auto pub = pubs.emplace_back(node->create_publisher<std_msgs::msg::String>(topicName, qos));
std_msgs::msg::String msg;
msg.data = "Hello";
pub->publish(msg);
}

// Set up a client and subscribe to the channel.
auto client = std::make_shared<foxglove::Client<websocketpp::config::asio_client>>();
auto channelFuture = foxglove::waitForChannel(client, topicName);
ASSERT_EQ(std::future_status::ready, client->connect(URI).wait_for(ONE_SECOND));
ASSERT_EQ(std::future_status::ready, channelFuture.wait_for(ONE_SECOND));
const foxglove::Channel channel = channelFuture.get();
const foxglove::SubscriptionId subscriptionId = 1;

// Set up binary message handler to resolve the promise when all nPub message have been received
std::promise<void> promise;
size_t nReceivedMessages = 0;
client->setBinaryMessageHandler([&promise, &nReceivedMessages](const uint8_t*, size_t) {
if (++nReceivedMessages == nPubs) {
promise.set_value();
}
});

// Subscribe to the channel and confirm that the promise resolves
client->subscribe({{subscriptionId, channel.id}});
ASSERT_EQ(std::future_status::ready, promise.get_future().wait_for(ONE_SECOND));

executor.cancel();
spinnerThread.join();
}

// Run all the tests that were declared with TEST()
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
Expand Down

0 comments on commit 6660b0d

Please sign in to comment.