Skip to content

Commit

Permalink
Make unpublished topics unrecorded by default (ros2#968)
Browse files Browse the repository at this point in the history
* Make unpublished topics unrecorded by default

Topics with no publishers do not offer any QoS profiles. This causes the
recorder to subscribe with a default QoS, which is often disadvantageous.

This change makes subscribing to unpublished topics optional, off by
default. Once a publisher is discovered for a topic, discovery picks it
up and adds it to the bag.

This addresses ros2#967

Signed-off-by: Sean Kelly <skelly@seegrid.com>

* Feedback from review

Signed-off-by: Sean Kelly <skelly@seegrid.com>

* Additional review feedback

Signed-off-by: Sean Kelly <skelly@seegrid.com>

* Narrow scope of tests and make more deterministic

Signed-off-by: Sean Kelly <skelly@seegrid.com>

* Fix test expectation for unpublished topics

Signed-off-by: Sean Kelly <skelly@seegrid.com>

* Add ROSBAG2_TRANSPORT_EXPORT for get_requested_or_available_topics()

This is needed to try to fix Windows build

Co-authored-by: Sean Kelly <skelly@seegrid.com>
Co-authored-by: Michael Orlov <morlovmr@gmail.com>
  • Loading branch information
3 people committed May 10, 2022
1 parent 4e4619b commit c6e5290
Show file tree
Hide file tree
Showing 12 changed files with 202 additions and 7 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ output_bags:
compression_queue_size: 1
compression_threads: 0
include_hidden_topics: false
include_unpublished_topics: false
```

Example merge:
Expand Down
6 changes: 6 additions & 0 deletions ros2bag/ros2bag/verb/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ def add_arguments(self, parser, cli_name): # noqa: D102
'-x', '--exclude', default='',
help='Exclude topics containing provided regular expression. '
'Works on top of --all, --regex, or topics list.')
parser.add_argument(
'--include-unpublished-topics', action='store_true',
help='Discover and record topics which have no publisher. '
'Subscriptions on such topics will be made with default QoS unless otherwise '
'specified in a QoS overrides file.')
parser.add_argument(
'--include-hidden-topics', action='store_true',
help='Discover and record hidden topics as well. '
Expand Down Expand Up @@ -228,6 +233,7 @@ def main(self, *, args): # noqa: D102
record_options.compression_threads = args.compression_threads
record_options.topic_qos_profile_overrides = qos_profile_overrides
record_options.include_hidden_topics = args.include_hidden_topics
record_options.include_unpublished_topics = args.include_unpublished_topics
record_options.start_paused = args.start_paused
record_options.ignore_leaf_topics = args.ignore_leaf_topics

Expand Down
4 changes: 0 additions & 4 deletions ros2bag/test/test_record.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,6 @@ def test_output(self, record_all_process, proc_output):
"Subscribed to topic '/rosout'",
process=record_all_process
)
proc_output.assertWaitFor(
"Subscribed to topic '/parameter_events'",
process=record_all_process
)


@launch_testing.post_shutdown_test()
Expand Down
1 change: 1 addition & 0 deletions rosbag2_py/src/rosbag2_py/_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ PYBIND11_MODULE(_transport, m) {
&RecordOptions::getTopicQoSProfileOverrides,
&RecordOptions::setTopicQoSProfileOverrides)
.def_readwrite("include_hidden_topics", &RecordOptions::include_hidden_topics)
.def_readwrite("include_unpublished_topics", &RecordOptions::include_unpublished_topics)
.def_readwrite("start_paused", &RecordOptions::start_paused)
.def_readwrite("ignore_leaf_topics", &RecordOptions::ignore_leaf_topics)
;
Expand Down
5 changes: 5 additions & 0 deletions rosbag2_transport/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,11 @@ function(create_tests_for_rmw_implementation)
LINK_LIBS rosbag2_transport
AMENT_DEPS test_msgs rosbag2_test_common)

rosbag2_transport_add_gmock(test_record_all_include_unpublished_topics
test/rosbag2_transport/test_record_all_include_unpublished_topics.cpp
LINK_LIBS rosbag2_transport
AMENT_DEPS test_msgs rosbag2_test_common)

rosbag2_transport_add_gmock(test_record_all_no_discovery
test/rosbag2_transport/test_record_all_no_discovery.cpp
LINK_LIBS rosbag2_transport
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ struct RecordOptions
uint64_t compression_threads = 0;
std::unordered_map<std::string, rclcpp::QoS> topic_qos_profile_overrides{};
bool include_hidden_topics = false;
bool include_unpublished_topics = false;
bool ignore_leaf_topics = false;
bool start_paused = false;
};
Expand Down
7 changes: 4 additions & 3 deletions rosbag2_transport/include/rosbag2_transport/recorder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,13 @@ class Recorder : public rclcpp::Node

inline constexpr static const auto kPauseResumeToggleKey = KeyboardHandler::KeyCode::SPACE;

protected:
ROSBAG2_TRANSPORT_EXPORT
std::unordered_map<std::string, std::string> get_requested_or_available_topics();

private:
void topics_discovery();

std::unordered_map<std::string, std::string>
get_requested_or_available_topics();

std::unordered_map<std::string, std::string>
get_missing_topics(const std::unordered_map<std::string, std::string> & all_topics);

Expand Down
4 changes: 4 additions & 0 deletions rosbag2_transport/src/rosbag2_transport/record_options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ Node convert<rosbag2_transport::RecordOptions>::encode(
record_options.topic_qos_profile_overrides.end());
node["topic_qos_profile_overrides"] = qos_overrides;
node["include_hidden_topics"] = record_options.include_hidden_topics;
node["include_unpublished_topics"] = record_options.include_unpublished_topics;
return node;
}

Expand Down Expand Up @@ -87,6 +88,9 @@ bool convert<rosbag2_transport::RecordOptions>::decode(
record_options.topic_qos_profile_overrides.insert(qos_overrides.begin(), qos_overrides.end());

optional_assign<bool>(node, "include_hidden_topics", record_options.include_hidden_topics);
optional_assign<bool>(
node, "include_unpublished_topics",
record_options.include_unpublished_topics);
return true;
}

Expand Down
14 changes: 14 additions & 0 deletions rosbag2_transport/src/rosbag2_transport/topic_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,14 @@ bool topic_in_list(const std::string & topic_name, const std::vector<std::string
return it != topics.end();
}

bool
topic_is_unpublished(
const std::string & topic_name, rclcpp::node_interfaces::NodeGraphInterface & node_graph)
{
auto publishers_info = node_graph.get_publishers_info_by_topic(topic_name);
return publishers_info.size() == 0;
}

bool
is_leaf_topic(
const std::string & topic_name, rclcpp::node_interfaces::NodeGraphInterface & node_graph)
Expand Down Expand Up @@ -124,6 +132,12 @@ bool TopicFilter::take_topic(
return false;
}

if (!record_options_.include_unpublished_topics && node_graph_ &&
topic_is_unpublished(topic_name, *node_graph_))
{
return false;
}

if (record_options_.ignore_leaf_topics && node_graph_ &&
is_leaf_topic(topic_name, *node_graph_))
{
Expand Down
70 changes: 70 additions & 0 deletions rosbag2_transport/test/rosbag2_transport/mock_recorder.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2022, Apex.AI
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef ROSBAG2_TRANSPORT__MOCK_RECORDER_HPP_
#define ROSBAG2_TRANSPORT__MOCK_RECORDER_HPP_

#include <chrono>
#include <memory>
#include <string>
#include <ratio>

#include "rosbag2_transport/recorder.hpp"

class MockRecorder : public rosbag2_transport::Recorder
{
public:
MockRecorder(
std::shared_ptr<rosbag2_cpp::Writer> writer,
const rosbag2_storage::StorageOptions & storage_options,
const rosbag2_transport::RecordOptions & record_options)
: Recorder(writer, storage_options, record_options)
{}

template<typename DurationRepT = int64_t, typename DurationT = std::milli>
bool wait_for_topic_to_be_discovered(
const std::string & topic_name_to_wait_for,
std::chrono::duration<DurationRepT, DurationT> timeout = std::chrono::seconds(10))
{
bool discovered = false;
using clock = std::chrono::steady_clock;
auto start = clock::now();
do {
auto topic_names_and_types = this->get_topic_names_and_types();
for (const auto &[topic_name, topic_types] : topic_names_and_types) {
if (topic_name_to_wait_for == topic_name) {
discovered = true;
break;
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(20));
} while (!discovered && (clock::now() - start) < timeout);
return discovered;
}

bool topic_available_for_recording(const std::string & topic_name)
{
bool available_for_recording = false;
auto topics_to_subscribe = this->get_requested_or_available_topics();
for (const auto & topic_and_type : topics_to_subscribe) {
if (topic_and_type.first == topic_name) {
available_for_recording = true;
break;
}
}
return available_for_recording;
}
};

#endif // ROSBAG2_TRANSPORT__MOCK_RECORDER_HPP_
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright 2022 Seegrid Corporation. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <gmock/gmock.h>

#include <chrono>
#include <memory>
#include <string>
#include <utility>

#include "mock_recorder.hpp"
#include "test_msgs/msg/basic_types.hpp"
#include "test_msgs/message_fixtures.hpp"
#include "rosbag2_test_common/publication_manager.hpp"
#include "record_integration_fixture.hpp"

using namespace std::chrono_literals; // NOLINT

TEST_F(RecordIntegrationTestFixture, record_all_include_unpublished_false_ignores_unpublished)
{
const std::string string_topic = "/string_topic";
auto node = std::make_shared<rclcpp::Node>("test_string_msg_listener_node");
auto string_msgs_sub = node->create_subscription<test_msgs::msg::Strings>(
string_topic, 10, [](test_msgs::msg::Strings::ConstSharedPtr) {});

rosbag2_transport::RecordOptions record_options = {true, false, {}, "rmw_format", 100ms};
record_options.include_unpublished_topics = false;
auto recorder = std::make_shared<MockRecorder>(writer_, storage_options_, record_options);
recorder->record();
start_async_spin(recorder);

ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered(string_topic));
ASSERT_FALSE(recorder->topic_available_for_recording(string_topic));
}

TEST_F(RecordIntegrationTestFixture, record_all_include_unpublished_true_includes_unpublished)
{
const std::string string_topic = "/string_topic";
auto node = std::make_shared<rclcpp::Node>("test_string_msg_listener_node");
auto string_msgs_sub = node->create_subscription<test_msgs::msg::Strings>(
string_topic, 10, [](test_msgs::msg::Strings::ConstSharedPtr) {});

rosbag2_transport::RecordOptions record_options = {true, false, {}, "rmw_format", 100ms};
record_options.include_unpublished_topics = true;
auto recorder = std::make_shared<MockRecorder>(writer_, storage_options_, record_options);
recorder->record();
start_async_spin(recorder);

ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered(string_topic));
ASSERT_TRUE(recorder->topic_available_for_recording(string_topic));
}

TEST_F(
RecordIntegrationTestFixture,
record_all_include_unpublished_false_includes_later_published)
{
const std::string string_topic = "/string_topic";
auto node = std::make_shared<rclcpp::Node>("test_string_msg_listener_node");
auto string_msgs_sub = node->create_subscription<test_msgs::msg::Strings>(
string_topic, 10, [](test_msgs::msg::Strings::ConstSharedPtr) {});

rosbag2_transport::RecordOptions record_options = {true, false, {}, "rmw_format", 100ms};
record_options.include_unpublished_topics = false;
auto recorder = std::make_shared<MockRecorder>(writer_, storage_options_, record_options);
recorder->record();
start_async_spin(recorder);

ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered(string_topic));
ASSERT_FALSE(recorder->topic_available_for_recording(string_topic));

// Start up a publisher on our topic *after* the recording has started
auto string_message = get_messages_strings()[0];
string_message->string_value = "Hello World";
rosbag2_test_common::PublicationManager pub_manager;

// Publish 10 messages at a 30ms interval for a steady 300 milliseconds worth of data
pub_manager.setup_publisher(
string_topic, string_message, 10, rclcpp::QoS{rclcpp::KeepAll()}, 30ms);

ASSERT_TRUE(pub_manager.wait_for_matched(string_topic.c_str()));
pub_manager.run_publishers();

ASSERT_TRUE(recorder->topic_available_for_recording(string_topic));
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ TEST(record_options, test_yaml_serialization)
original.compression_threads = 123;
original.topic_qos_profile_overrides.emplace("topic", rclcpp::QoS(10).transient_local());
original.include_hidden_topics = true;
original.include_unpublished_topics = true;

auto node = YAML::convert<rosbag2_transport::RecordOptions>().encode(original);

Expand Down

0 comments on commit c6e5290

Please sign in to comment.