Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Assume publisher qos depth of 1 if the middleware reports the qos history as unknown #239

Merged
merged 4 commits into from
Jul 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ Parameters are provided to configure the behavior of the bridge. These parameter
* (ROS 1) __max_update_ms__: The maximum number of milliseconds to wait in between polling `roscore` for new topics, services, or parameters. Defaults to `5000`.
* (ROS 2) __num_threads__: The number of threads to use for the ROS node executor. This controls the number of subscriptions that can be processed in parallel. 0 means one thread per CPU core. Defaults to `0`.
* (ROS 2) __min_qos_depth__: Minimum depth used for the QoS profile of subscriptions. Defaults to `1`. This is to set a lower limit for a subscriber's QoS depth which is computed by summing up depths of all publishers. See also [#208](https://github.com/foxglove/ros-foxglove-bridge/issues/208).
* (ROS 2) __max_qos_depth__: Maximum depth used for the QoS profile of subscriptions. Defaults to `10`.
* (ROS 2) __max_qos_depth__: Maximum depth used for the QoS profile of subscriptions. Defaults to `25`.
* (ROS 2) __include_hidden__: Include hidden topics and services. Defaults to `false`.

## Building from source
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ std::future<ServiceResponse> waitForServiceResponse(std::shared_ptr<ClientInterf
std::future<Service> waitForService(std::shared_ptr<ClientInterface> client,
const std::string& serviceName);

std::future<Channel> waitForChannel(std::shared_ptr<ClientInterface> client,
const std::string& topicName);

extern template class Client<websocketpp::config::asio_client>;

} // namespace foxglove
54 changes: 36 additions & 18 deletions foxglove_bridge_base/src/test/test_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,43 +12,37 @@ namespace foxglove {
constexpr auto DEFAULT_TIMEOUT = std::chrono::seconds(5);

std::vector<uint8_t> connectClientAndReceiveMsg(const std::string& uri,
const std::string& topic_name) {
const std::string& topicName) {
// Set up text message handler to resolve the promise when the topic is advertised
foxglove::Client<websocketpp::config::asio_client> wsClient;
auto wsClient = std::make_shared<foxglove::Client<websocketpp::config::asio_client>>();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this need to be a shared_ptr? Would unique_ptr work? I don't see the ownership being actually shared anywhere?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, will see what I can do. Can just pass a raw pointer to waitForChannel then

std::promise<nlohmann::json> channelPromise;
auto channelFuture = channelPromise.get_future();
wsClient.setTextMessageHandler([&topic_name, &channelPromise](const std::string& payload) {
const auto msg = nlohmann::json::parse(payload);
const auto& op = msg.at("op").get<std::string>();
if (op == "advertise") {
for (const auto& channel : msg.at("channels")) {
if (topic_name == channel.at("topic")) {
channelPromise.set_value(channel);
}
}
}
});
auto channelFuture = waitForChannel(wsClient, topicName);

// Connect the client and wait for the channel future
if (std::future_status::ready != wsClient.connect(uri).wait_for(DEFAULT_TIMEOUT)) {
if (std::future_status::ready != wsClient->connect(uri).wait_for(DEFAULT_TIMEOUT)) {
throw std::runtime_error("Client failed to connect");
} else if (std::future_status::ready != channelFuture.wait_for(DEFAULT_TIMEOUT)) {
throw std::runtime_error("Client failed to receive channel");
}

const auto channel = channelFuture.get();
const SubscriptionId subscriptionId = 1;

// Set up binary message handler to resolve when a binary message has been received
std::promise<std::vector<uint8_t>> msgPromise;
auto msgFuture = msgPromise.get_future();
wsClient.setBinaryMessageHandler([&msgPromise](const uint8_t* data, size_t dataLength) {
wsClient->setBinaryMessageHandler([&msgPromise](const uint8_t* data, size_t dataLength) {
if (ReadUint32LE(data + 1) != subscriptionId) {
return;
}
const size_t offset = 1 + 4 + 8;
std::vector<uint8_t> dataCopy(dataLength - offset);
std::memcpy(dataCopy.data(), data + offset, dataLength - offset);
msgPromise.set_value(std::move(dataCopy));
});

// Subscribe to the channel that corresponds to the topic
const auto channelId = channelFuture.get().at("id").get<foxglove::ChannelId>();
wsClient.subscribe({{1, channelId}});
wsClient->subscribe({{subscriptionId, channel.id}});

// Wait until we have received a binary message
if (std::future_status::ready != msgFuture.wait_for(DEFAULT_TIMEOUT)) {
Expand Down Expand Up @@ -118,6 +112,30 @@ std::future<Service> waitForService(std::shared_ptr<ClientInterface> client,
return future;
}

std::future<Channel> waitForChannel(std::shared_ptr<ClientInterface> client,
const std::string& topicName) {
auto promise = std::make_shared<std::promise<Channel>>();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why shared_ptr? The std::promise example code seems to allocate a promise on the stack and still move it into a thread or lambda.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I remember that John and I had some issues with that but don't remember exactly why. Will see what I can do once I created a new release

auto future = promise->get_future();

client->setTextMessageHandler(
[promise = std::move(promise), topicName](const std::string& payload) mutable {
const auto msg = nlohmann::json::parse(payload);
const auto& op = msg["op"].get<std::string>();

if (op == "advertise") {
const auto channels = msg["channels"].get<std::vector<Channel>>();
for (const auto& channel : channels) {
if (channel.topic == topicName) {
promise->set_value(channel);
break;
}
}
}
});

return future;
}

// Explicit template instantiation
template class Client<websocketpp::config::asio_client>;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ constexpr int64_t DEFAULT_PORT = 8765;
constexpr char DEFAULT_ADDRESS[] = "0.0.0.0";
constexpr int64_t DEFAULT_SEND_BUFFER_LIMIT = 10000000;
constexpr int64_t DEFAULT_MIN_QOS_DEPTH = 1;
constexpr int64_t DEFAULT_MAX_QOS_DEPTH = 10;
constexpr int64_t DEFAULT_MAX_QOS_DEPTH = 25;

void declareParameters(rclcpp::Node* node);

Expand Down
25 changes: 21 additions & 4 deletions ros2_foxglove_bridge/src/ros2_foxglove_bridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -457,10 +457,27 @@ void FoxgloveBridge::subscribe(foxglove::ChannelId channelId, ConnectionHandle c
if (qos.durability() == rclcpp::DurabilityPolicy::TransientLocal) {
++durabilityTransientLocalEndpointsCount;
}
depth = std::min(_maxQosDepth, depth + qos.depth());
}

rclcpp::QoS qos{rclcpp::KeepLast(std::max(depth, _minQosDepth))};
// Some RMWs do not retrieve history information of the publisher endpoint in which case the
// history depth is 0. We use a lower limit of 1 here, so that the history depth is at least
// equal to the number of publishers. This covers the case where there are multiple
// transient_local publishers with a depth of 1 (e.g. multiple tf_static transform
// broadcasters). See also
// https://github.com/foxglove/ros-foxglove-bridge/issues/238 and
// https://github.com/foxglove/ros-foxglove-bridge/issues/208
const size_t publisherHistoryDepth = std::max(1ul, qos.depth());
depth = depth + publisherHistoryDepth;
}

depth = std::max(depth, _minQosDepth);
if (depth > _maxQosDepth) {
RCLCPP_WARN(this->get_logger(),
"Limiting history depth for topic '%s' to %zu (was %zu). You may want to increase "
"the max_qos_depth parameter value.",
topic.c_str(), _maxQosDepth, depth);
depth = _maxQosDepth;
}

rclcpp::QoS qos{rclcpp::KeepLast(depth)};

// If all endpoints are reliable, ask for reliable
if (reliabilityReliableEndpointsCount == publisherInfo.size()) {
Expand Down
47 changes: 47 additions & 0 deletions ros2_foxglove_bridge/tests/smoke_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,53 @@ TEST_F(ServiceTest, testCallServiceParallel) {
}
}

TEST(SmokeTest, receiveMessagesOfMultipleTransientLocalPublishers) {
const std::string topicName = "/latched";
auto node = rclcpp::Node::make_shared("node");
rclcpp::QoS qos = rclcpp::QoS(rclcpp::KeepLast(1));
qos.transient_local();
qos.reliable();

rclcpp::executors::SingleThreadedExecutor executor;
executor.add_node(node);
auto spinnerThread = std::thread([&executor]() {
executor.spin();
});

constexpr size_t nPubs = 15;
std::vector<rclcpp::Publisher<std_msgs::msg::String>::SharedPtr> pubs;
for (size_t i = 0; i < nPubs; ++i) {
auto pub = pubs.emplace_back(node->create_publisher<std_msgs::msg::String>(topicName, qos));
std_msgs::msg::String msg;
msg.data = "Hello";
pub->publish(msg);
}

// Set up a client and subscribe to the channel.
auto client = std::make_shared<foxglove::Client<websocketpp::config::asio_client>>();
auto channelFuture = foxglove::waitForChannel(client, topicName);
ASSERT_EQ(std::future_status::ready, client->connect(URI).wait_for(ONE_SECOND));
ASSERT_EQ(std::future_status::ready, channelFuture.wait_for(ONE_SECOND));
const foxglove::Channel channel = channelFuture.get();
const foxglove::SubscriptionId subscriptionId = 1;

// Set up binary message handler to resolve the promise when all nPub message have been received
std::promise<void> promise;
size_t nReceivedMessages = 0;
client->setBinaryMessageHandler([&promise, &nReceivedMessages](const uint8_t*, size_t) {
if (++nReceivedMessages == nPubs) {
promise.set_value();
}
});

// Subscribe to the channel and confirm that the promise resolves
client->subscribe({{subscriptionId, channel.id}});
ASSERT_EQ(std::future_status::ready, promise.get_future().wait_for(ONE_SECOND));

executor.cancel();
spinnerThread.join();
}

// Run all the tests that were declared with TEST()
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
Expand Down