From c6e5290f3f426312d9ba4bdd5a56d0f791a3f079 Mon Sep 17 00:00:00 2001 From: Sean Kelly Date: Tue, 10 May 2022 19:37:20 -0400 Subject: [PATCH] Make unpublished topics unrecorded by default (#968) * Make unpublished topics unrecorded by default Topics with no publishers do not offer any QoS profiles. This causes the recorder to subscribe with a default QoS, which is often disadvantageous. This change makes subscribing to unpublished topics optional, off by default. Once a publisher is discovered for a topic, discovery picks it up and adds it to the bag. This addresses #967 Signed-off-by: Sean Kelly * Feedback from review Signed-off-by: Sean Kelly * Additional review feedback Signed-off-by: Sean Kelly * Narrow scope of tests and make more deterministic Signed-off-by: Sean Kelly * Fix test expectation for unpublished topics Signed-off-by: Sean Kelly * Add ROSBAG2_TRANSPORT_EXPORT for get_requested_or_available_topics() This is needed to try to fix Windows build Co-authored-by: Sean Kelly Co-authored-by: Michael Orlov --- README.md | 1 + ros2bag/ros2bag/verb/record.py | 6 ++ ros2bag/test/test_record.py | 4 - rosbag2_py/src/rosbag2_py/_transport.cpp | 1 + rosbag2_transport/CMakeLists.txt | 5 + .../rosbag2_transport/record_options.hpp | 1 + .../include/rosbag2_transport/recorder.hpp | 7 +- .../src/rosbag2_transport/record_options.cpp | 4 + .../src/rosbag2_transport/topic_filter.cpp | 14 +++ .../test/rosbag2_transport/mock_recorder.hpp | 70 ++++++++++++++ ..._record_all_include_unpublished_topics.cpp | 95 +++++++++++++++++++ .../rosbag2_transport/test_record_options.cpp | 1 + 12 files changed, 202 insertions(+), 7 deletions(-) create mode 100644 rosbag2_transport/test/rosbag2_transport/mock_recorder.hpp create mode 100644 rosbag2_transport/test/rosbag2_transport/test_record_all_include_unpublished_topics.cpp diff --git a/README.md b/README.md index 45fae53210..bd6a0bf7fd 100644 --- a/README.md +++ b/README.md @@ -222,6 +222,7 @@ output_bags: compression_queue_size: 1 compression_threads: 0 include_hidden_topics: false + include_unpublished_topics: false ``` Example merge: diff --git a/ros2bag/ros2bag/verb/record.py b/ros2bag/ros2bag/verb/record.py index a67c54fdab..51c4fce982 100644 --- a/ros2bag/ros2bag/verb/record.py +++ b/ros2bag/ros2bag/verb/record.py @@ -60,6 +60,11 @@ def add_arguments(self, parser, cli_name): # noqa: D102 '-x', '--exclude', default='', help='Exclude topics containing provided regular expression. ' 'Works on top of --all, --regex, or topics list.') + parser.add_argument( + '--include-unpublished-topics', action='store_true', + help='Discover and record topics which have no publisher. ' + 'Subscriptions on such topics will be made with default QoS unless otherwise ' + 'specified in a QoS overrides file.') parser.add_argument( '--include-hidden-topics', action='store_true', help='Discover and record hidden topics as well. ' @@ -228,6 +233,7 @@ def main(self, *, args): # noqa: D102 record_options.compression_threads = args.compression_threads record_options.topic_qos_profile_overrides = qos_profile_overrides record_options.include_hidden_topics = args.include_hidden_topics + record_options.include_unpublished_topics = args.include_unpublished_topics record_options.start_paused = args.start_paused record_options.ignore_leaf_topics = args.ignore_leaf_topics diff --git a/ros2bag/test/test_record.py b/ros2bag/test/test_record.py index 51cb400b7e..bb6222c238 100644 --- a/ros2bag/test/test_record.py +++ b/ros2bag/test/test_record.py @@ -57,10 +57,6 @@ def test_output(self, record_all_process, proc_output): "Subscribed to topic '/rosout'", process=record_all_process ) - proc_output.assertWaitFor( - "Subscribed to topic '/parameter_events'", - process=record_all_process - ) @launch_testing.post_shutdown_test() diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index ba514e97af..54a9adf717 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -270,6 +270,7 @@ PYBIND11_MODULE(_transport, m) { &RecordOptions::getTopicQoSProfileOverrides, &RecordOptions::setTopicQoSProfileOverrides) .def_readwrite("include_hidden_topics", &RecordOptions::include_hidden_topics) + .def_readwrite("include_unpublished_topics", &RecordOptions::include_unpublished_topics) .def_readwrite("start_paused", &RecordOptions::start_paused) .def_readwrite("ignore_leaf_topics", &RecordOptions::ignore_leaf_topics) ; diff --git a/rosbag2_transport/CMakeLists.txt b/rosbag2_transport/CMakeLists.txt index 7800141804..a83e553ebd 100644 --- a/rosbag2_transport/CMakeLists.txt +++ b/rosbag2_transport/CMakeLists.txt @@ -173,6 +173,11 @@ function(create_tests_for_rmw_implementation) LINK_LIBS rosbag2_transport AMENT_DEPS test_msgs rosbag2_test_common) + rosbag2_transport_add_gmock(test_record_all_include_unpublished_topics + test/rosbag2_transport/test_record_all_include_unpublished_topics.cpp + LINK_LIBS rosbag2_transport + AMENT_DEPS test_msgs rosbag2_test_common) + rosbag2_transport_add_gmock(test_record_all_no_discovery test/rosbag2_transport/test_record_all_no_discovery.cpp LINK_LIBS rosbag2_transport diff --git a/rosbag2_transport/include/rosbag2_transport/record_options.hpp b/rosbag2_transport/include/rosbag2_transport/record_options.hpp index bda88db7fb..caed6a22f4 100644 --- a/rosbag2_transport/include/rosbag2_transport/record_options.hpp +++ b/rosbag2_transport/include/rosbag2_transport/record_options.hpp @@ -44,6 +44,7 @@ struct RecordOptions uint64_t compression_threads = 0; std::unordered_map topic_qos_profile_overrides{}; bool include_hidden_topics = false; + bool include_unpublished_topics = false; bool ignore_leaf_topics = false; bool start_paused = false; }; diff --git a/rosbag2_transport/include/rosbag2_transport/recorder.hpp b/rosbag2_transport/include/rosbag2_transport/recorder.hpp index d4c8be5c6f..d880c57353 100644 --- a/rosbag2_transport/include/rosbag2_transport/recorder.hpp +++ b/rosbag2_transport/include/rosbag2_transport/recorder.hpp @@ -109,12 +109,13 @@ class Recorder : public rclcpp::Node inline constexpr static const auto kPauseResumeToggleKey = KeyboardHandler::KeyCode::SPACE; +protected: + ROSBAG2_TRANSPORT_EXPORT + std::unordered_map get_requested_or_available_topics(); + private: void topics_discovery(); - std::unordered_map - get_requested_or_available_topics(); - std::unordered_map get_missing_topics(const std::unordered_map & all_topics); diff --git a/rosbag2_transport/src/rosbag2_transport/record_options.cpp b/rosbag2_transport/src/rosbag2_transport/record_options.cpp index 6047a1464d..9f94ac68a3 100644 --- a/rosbag2_transport/src/rosbag2_transport/record_options.cpp +++ b/rosbag2_transport/src/rosbag2_transport/record_options.cpp @@ -59,6 +59,7 @@ Node convert::encode( record_options.topic_qos_profile_overrides.end()); node["topic_qos_profile_overrides"] = qos_overrides; node["include_hidden_topics"] = record_options.include_hidden_topics; + node["include_unpublished_topics"] = record_options.include_unpublished_topics; return node; } @@ -87,6 +88,9 @@ bool convert::decode( record_options.topic_qos_profile_overrides.insert(qos_overrides.begin(), qos_overrides.end()); optional_assign(node, "include_hidden_topics", record_options.include_hidden_topics); + optional_assign( + node, "include_unpublished_topics", + record_options.include_unpublished_topics); return true; } diff --git a/rosbag2_transport/src/rosbag2_transport/topic_filter.cpp b/rosbag2_transport/src/rosbag2_transport/topic_filter.cpp index e036ba9920..01379ce7f3 100644 --- a/rosbag2_transport/src/rosbag2_transport/topic_filter.cpp +++ b/rosbag2_transport/src/rosbag2_transport/topic_filter.cpp @@ -70,6 +70,14 @@ bool topic_in_list(const std::string & topic_name, const std::vector +#include +#include +#include + +#include "rosbag2_transport/recorder.hpp" + +class MockRecorder : public rosbag2_transport::Recorder +{ +public: + MockRecorder( + std::shared_ptr writer, + const rosbag2_storage::StorageOptions & storage_options, + const rosbag2_transport::RecordOptions & record_options) + : Recorder(writer, storage_options, record_options) + {} + + template + bool wait_for_topic_to_be_discovered( + const std::string & topic_name_to_wait_for, + std::chrono::duration timeout = std::chrono::seconds(10)) + { + bool discovered = false; + using clock = std::chrono::steady_clock; + auto start = clock::now(); + do { + auto topic_names_and_types = this->get_topic_names_and_types(); + for (const auto &[topic_name, topic_types] : topic_names_and_types) { + if (topic_name_to_wait_for == topic_name) { + discovered = true; + break; + } + } + std::this_thread::sleep_for(std::chrono::milliseconds(20)); + } while (!discovered && (clock::now() - start) < timeout); + return discovered; + } + + bool topic_available_for_recording(const std::string & topic_name) + { + bool available_for_recording = false; + auto topics_to_subscribe = this->get_requested_or_available_topics(); + for (const auto & topic_and_type : topics_to_subscribe) { + if (topic_and_type.first == topic_name) { + available_for_recording = true; + break; + } + } + return available_for_recording; + } +}; + +#endif // ROSBAG2_TRANSPORT__MOCK_RECORDER_HPP_ diff --git a/rosbag2_transport/test/rosbag2_transport/test_record_all_include_unpublished_topics.cpp b/rosbag2_transport/test/rosbag2_transport/test_record_all_include_unpublished_topics.cpp new file mode 100644 index 0000000000..809a1c6ad5 --- /dev/null +++ b/rosbag2_transport/test/rosbag2_transport/test_record_all_include_unpublished_topics.cpp @@ -0,0 +1,95 @@ +// Copyright 2022 Seegrid Corporation. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include +#include + +#include "mock_recorder.hpp" +#include "test_msgs/msg/basic_types.hpp" +#include "test_msgs/message_fixtures.hpp" +#include "rosbag2_test_common/publication_manager.hpp" +#include "record_integration_fixture.hpp" + +using namespace std::chrono_literals; // NOLINT + +TEST_F(RecordIntegrationTestFixture, record_all_include_unpublished_false_ignores_unpublished) +{ + const std::string string_topic = "/string_topic"; + auto node = std::make_shared("test_string_msg_listener_node"); + auto string_msgs_sub = node->create_subscription( + string_topic, 10, [](test_msgs::msg::Strings::ConstSharedPtr) {}); + + rosbag2_transport::RecordOptions record_options = {true, false, {}, "rmw_format", 100ms}; + record_options.include_unpublished_topics = false; + auto recorder = std::make_shared(writer_, storage_options_, record_options); + recorder->record(); + start_async_spin(recorder); + + ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered(string_topic)); + ASSERT_FALSE(recorder->topic_available_for_recording(string_topic)); +} + +TEST_F(RecordIntegrationTestFixture, record_all_include_unpublished_true_includes_unpublished) +{ + const std::string string_topic = "/string_topic"; + auto node = std::make_shared("test_string_msg_listener_node"); + auto string_msgs_sub = node->create_subscription( + string_topic, 10, [](test_msgs::msg::Strings::ConstSharedPtr) {}); + + rosbag2_transport::RecordOptions record_options = {true, false, {}, "rmw_format", 100ms}; + record_options.include_unpublished_topics = true; + auto recorder = std::make_shared(writer_, storage_options_, record_options); + recorder->record(); + start_async_spin(recorder); + + ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered(string_topic)); + ASSERT_TRUE(recorder->topic_available_for_recording(string_topic)); +} + +TEST_F( + RecordIntegrationTestFixture, + record_all_include_unpublished_false_includes_later_published) +{ + const std::string string_topic = "/string_topic"; + auto node = std::make_shared("test_string_msg_listener_node"); + auto string_msgs_sub = node->create_subscription( + string_topic, 10, [](test_msgs::msg::Strings::ConstSharedPtr) {}); + + rosbag2_transport::RecordOptions record_options = {true, false, {}, "rmw_format", 100ms}; + record_options.include_unpublished_topics = false; + auto recorder = std::make_shared(writer_, storage_options_, record_options); + recorder->record(); + start_async_spin(recorder); + + ASSERT_TRUE(recorder->wait_for_topic_to_be_discovered(string_topic)); + ASSERT_FALSE(recorder->topic_available_for_recording(string_topic)); + + // Start up a publisher on our topic *after* the recording has started + auto string_message = get_messages_strings()[0]; + string_message->string_value = "Hello World"; + rosbag2_test_common::PublicationManager pub_manager; + + // Publish 10 messages at a 30ms interval for a steady 300 milliseconds worth of data + pub_manager.setup_publisher( + string_topic, string_message, 10, rclcpp::QoS{rclcpp::KeepAll()}, 30ms); + + ASSERT_TRUE(pub_manager.wait_for_matched(string_topic.c_str())); + pub_manager.run_publishers(); + + ASSERT_TRUE(recorder->topic_available_for_recording(string_topic)); +} diff --git a/rosbag2_transport/test/rosbag2_transport/test_record_options.cpp b/rosbag2_transport/test/rosbag2_transport/test_record_options.cpp index 08dd6a8849..e305229152 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record_options.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record_options.cpp @@ -35,6 +35,7 @@ TEST(record_options, test_yaml_serialization) original.compression_threads = 123; original.topic_qos_profile_overrides.emplace("topic", rclcpp::QoS(10).transient_local()); original.include_hidden_topics = true; + original.include_unpublished_topics = true; auto node = YAML::convert().encode(original);