From f6d5c641763ac5e947f7457bdad1c1d8b3de462e Mon Sep 17 00:00:00 2001 From: Karsten Knese Date: Wed, 21 Apr 2021 12:50:01 -0700 Subject: [PATCH] use public recorder api in tests (#741) * use public recorder api in tests Signed-off-by: Karsten Knese * fix segfault Signed-off-by: Karsten Knese * match rmw_connext* Signed-off-by: Karsten Knese --- .../include/rosbag2_test_common/wait_for.hpp | 15 + rosbag2_transport/CMakeLists.txt | 2 +- .../include/rosbag2_transport/recorder.hpp | 3 + .../src/rosbag2_transport/recorder.cpp | 5 + .../record_integration_fixture.hpp | 24 +- .../test/rosbag2_transport/test_record.cpp | 270 ++++++++++++------ .../rosbag2_transport/test_record_all.cpp | 47 ++- .../test_record_all_no_discovery.cpp | 38 ++- .../rosbag2_transport/test_record_regex.cpp | 154 ++++++---- 9 files changed, 387 insertions(+), 171 deletions(-) diff --git a/rosbag2_test_common/include/rosbag2_test_common/wait_for.hpp b/rosbag2_test_common/include/rosbag2_test_common/wait_for.hpp index 608aa4837b..d8406c286b 100644 --- a/rosbag2_test_common/include/rosbag2_test_common/wait_for.hpp +++ b/rosbag2_test_common/include/rosbag2_test_common/wait_for.hpp @@ -36,6 +36,21 @@ bool spin_and_wait_for(Timeout timeout, const Node & node, Condition condition) } return true; } + +template +bool wait_until_shutdown(Timeout timeout, Condition condition) +{ + using clock = std::chrono::system_clock; + auto start = clock::now(); + while (!condition()) { + if ((clock::now() - start) > timeout) { + return false; + } + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + rclcpp::shutdown(); + return true; +} } // namespace rosbag2_test_common #endif // ROSBAG2_TEST_COMMON__WAIT_FOR_HPP_ diff --git a/rosbag2_transport/CMakeLists.txt b/rosbag2_transport/CMakeLists.txt index c037f4588b..e6eb8ea99b 100644 --- a/rosbag2_transport/CMakeLists.txt +++ b/rosbag2_transport/CMakeLists.txt @@ -79,7 +79,7 @@ function(create_tests_for_rmw_implementation) # disable the following tests for connext # due to slower discovery of nodes set(SKIP_TEST "") - if(${rmw_implementation} MATCHES "rmw_connext(.*)_cpp") + if(${rmw_implementation} MATCHES "rmw_connext(.*)") set(SKIP_TEST "SKIP_TEST") endif() diff --git a/rosbag2_transport/include/rosbag2_transport/recorder.hpp b/rosbag2_transport/include/rosbag2_transport/recorder.hpp index fa2eb0c03f..4a8fa6f662 100644 --- a/rosbag2_transport/include/rosbag2_transport/recorder.hpp +++ b/rosbag2_transport/include/rosbag2_transport/recorder.hpp @@ -77,6 +77,9 @@ class Recorder : public rclcpp::Node return subscriptions_; } + ROSBAG2_TRANSPORT_PUBLIC + const rosbag2_cpp::Writer & get_writer_handle(); + private: void topics_discovery(); diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.cpp b/rosbag2_transport/src/rosbag2_transport/recorder.cpp index 18ec4ffe7a..240d8b4f61 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.cpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.cpp @@ -104,6 +104,11 @@ void Recorder::record() } } +const rosbag2_cpp::Writer & Recorder::get_writer_handle() +{ + return *writer_; +} + void Recorder::topics_discovery() { while (rclcpp::ok() && stop_discovery_ == false) { diff --git a/rosbag2_transport/test/rosbag2_transport/record_integration_fixture.hpp b/rosbag2_transport/test/rosbag2_transport/record_integration_fixture.hpp index b421999db6..ec7df56ff8 100644 --- a/rosbag2_transport/test/rosbag2_transport/record_integration_fixture.hpp +++ b/rosbag2_transport/test/rosbag2_transport/record_integration_fixture.hpp @@ -42,24 +42,32 @@ class RecordIntegrationTestFixture : public Rosbag2TransportTestFixture public: RecordIntegrationTestFixture() : Rosbag2TransportTestFixture() + { + } + + void SetUp() override { rclcpp::init(0, nullptr); } - void start_recording(const RecordOptions & options) + void TearDown() override + { + rclcpp::shutdown(); + } + + template + void start_async_spin(T node) { - // the future object returned from std::async needs to be stored not to block the execution future_ = std::async( - std::launch::async, [this, options]() { - rosbag2_transport::Recorder recorder(writer_); - recorder.record(storage_options_, options); - }); + std::launch::async, [node]() -> void {rclcpp::spin(node);}); } - void stop_recording() + void stop_spinning() { rclcpp::shutdown(); - future_.get(); + if (future_.valid()) { + future_.wait(); + } } void run_publishers() diff --git a/rosbag2_transport/test/rosbag2_transport/test_record.cpp b/rosbag2_transport/test/rosbag2_transport/test_record.cpp index c7b93c3b44..c9942715b6 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record.cpp @@ -17,13 +17,14 @@ #include #include #include +#include #include #include "rclcpp/rclcpp.hpp" #include "rosbag2_test_common/wait_for.hpp" -#include "rosbag2_transport/rosbag2_transport.hpp" +#include "rosbag2_transport/recorder.hpp" #include "test_msgs/msg/arrays.hpp" #include "test_msgs/message_fixtures.hpp" @@ -39,20 +40,43 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_topics_are auto string_message = get_messages_strings()[1]; std::string string_topic = "/string_topic"; - start_recording({false, false, {string_topic, array_topic}, "rmw_format", 100ms}); + // TODO(karsten1987) Refactor this into publication manager + auto pub_node = std::make_shared("rosbag2_test_record_1"); + auto array_pub = pub_node->create_publisher( + array_topic, rclcpp::QoS{rclcpp::KeepAll()}); + auto string_pub = pub_node->create_publisher( + string_topic, rclcpp::QoS{rclcpp::KeepAll()}); - pub_man_.add_publisher( - string_topic, string_message, 2); - pub_man_.add_publisher( - array_topic, array_message, 2); - run_publishers(); - stop_recording(); + rosbag2_transport::RecordOptions record_options = + {false, false, {string_topic, array_topic}, "rmw_format", 100ms}; + auto recorder = std::make_shared( + std::move(writer_), storage_options_, record_options); + recorder->record(); + + start_async_spin(recorder); + + for (auto i = 0u; i < 2; ++i) { + array_pub->publish(*array_message); + } + for (auto i = 0u; i < 2; ++i) { + string_pub->publish(*string_message); + } + + auto & writer = recorder->get_writer_handle(); + MockSequentialWriter & mock_writer = + static_cast(writer.get_implementation_handle()); - MockSequentialWriter & writer = - static_cast(writer_->get_implementation_handle()); - auto recorded_messages = writer.get_messages(); - auto recorded_topics = writer.get_topics(); + size_t expected_messages = 4; + auto ret = rosbag2_test_common::wait_until_shutdown( + std::chrono::seconds(5), + [&mock_writer, &expected_messages]() { + return mock_writer.get_messages().size() >= expected_messages; + }); + auto recorded_messages = mock_writer.get_messages(); + EXPECT_TRUE(ret) << "failed to capture expected messages in time"; + EXPECT_THAT(recorded_messages, SizeIs(expected_messages)); + auto recorded_topics = mock_writer.get_topics(); ASSERT_THAT(recorded_topics, SizeIs(2)); EXPECT_THAT(recorded_topics.at(string_topic).serialization_format, Eq("rmw_format")); EXPECT_THAT(recorded_topics.at(array_topic).serialization_format, Eq("rmw_format")); @@ -72,14 +96,38 @@ TEST_F(RecordIntegrationTestFixture, qos_is_stored_in_metadata) { auto string_message = get_messages_strings()[1]; std::string topic = "/chatter"; - start_recording({false, false, {topic}, "rmw_format", 100ms}); - pub_man_.add_publisher(topic, string_message, 2); - run_publishers(); - stop_recording(); - - MockSequentialWriter & writer = - static_cast(writer_->get_implementation_handle()); - auto recorded_topics = writer.get_topics(); + + auto pub_node = std::make_shared("rosbag2_test_record_2"); + auto string_pub = pub_node->create_publisher( + topic, rclcpp::QoS{rclcpp::KeepAll()}); + + rosbag2_transport::RecordOptions record_options = + {false, false, {topic}, "rmw_format", 100ms}; + auto recorder = std::make_shared( + std::move(writer_), storage_options_, record_options); + recorder->record(); + + start_async_spin(recorder); + + for (auto i = 0u; i < 2; ++i) { + string_pub->publish(*string_message); + } + + auto & writer = recorder->get_writer_handle(); + MockSequentialWriter & mock_writer = + static_cast(writer.get_implementation_handle()); + + size_t expected_messages = 2; + auto ret = rosbag2_test_common::wait_until_shutdown( + std::chrono::seconds(5), + [&mock_writer, &expected_messages]() { + return mock_writer.get_messages().size() >= expected_messages; + }); + auto recorded_messages = mock_writer.get_messages(); + EXPECT_TRUE(ret) << "failed to capture expected messages in time"; + EXPECT_THAT(recorded_messages, SizeIs(expected_messages)); + + auto recorded_topics = mock_writer.get_topics(); std::string serialized_profiles = recorded_topics.at(topic).offered_qos_profiles; // Basic smoke test that the profile was serialized into the metadata as a string. EXPECT_THAT(serialized_profiles, ContainsRegex("reliability: 1\n")); @@ -107,72 +155,83 @@ TEST_F(RecordIntegrationTestFixture, qos_is_stored_in_metadata) TEST_F(RecordIntegrationTestFixture, records_sensor_data) { - std::string topic = "/string_topic"; - start_recording({false, false, {topic}, "rmw_format", 100ms}); + auto string_message = get_messages_strings()[1]; + std::string topic = "/chatter"; - auto publisher_node = std::make_shared("publisher_for_qos_test"); - auto publisher = publisher_node->create_publisher( + auto pub_node = std::make_shared("rosbag2_test_record_3"); + auto string_pub = pub_node->create_publisher( topic, rclcpp::SensorDataQoS()); - auto publish_timer = publisher_node->create_wall_timer( - 50ms, [publisher]() -> void { - test_msgs::msg::Strings msg; - msg.string_value = "Hello"; - publisher->publish(msg); - } - ); - auto & writer = static_cast(writer_->get_implementation_handle()); - // Takes ~200ms in local testing, 5s chosen as a very long timeout - bool succeeded = rosbag2_test_common::spin_and_wait_for( - std::chrono::seconds(5), publisher_node, - [&writer]() { - return writer.get_messages().size() > 0; - }); - ASSERT_TRUE(succeeded); - stop_recording(); + rosbag2_transport::RecordOptions record_options = + {false, false, {topic}, "rmw_format", 100ms}; + auto recorder = std::make_shared( + std::move(writer_), storage_options_, record_options); + recorder->record(); + + start_async_spin(recorder); + + for (auto i = 0u; i < 2; ++i) { + string_pub->publish(*string_message); + } + + auto & writer = recorder->get_writer_handle(); + MockSequentialWriter & mock_writer = + static_cast(writer.get_implementation_handle()); - auto recorded_messages = writer.get_messages(); - auto recorded_topics = writer.get_topics(); + size_t expected_messages = 2; + auto ret = rosbag2_test_common::wait_until_shutdown( + std::chrono::seconds(5), + [&mock_writer, &expected_messages]() { + return mock_writer.get_messages().size() >= expected_messages; + }); + auto recorded_messages = mock_writer.get_messages(); + auto recorded_topics = mock_writer.get_topics(); + EXPECT_TRUE(ret) << "failed to capture expected messages in time"; + EXPECT_THAT(recorded_messages, SizeIs(expected_messages)); EXPECT_EQ(recorded_topics.size(), 1u); EXPECT_FALSE(recorded_messages.empty()); } TEST_F(RecordIntegrationTestFixture, receives_latched_messages) { - // Ensure rosbag2 can receive Transient Local Durability "latched messages" - const std::string topic = "/latched_topic"; - const size_t num_latched_messages = 3; + auto string_message = get_messages_strings()[1]; + std::string topic = "/chatter"; - auto publisher_node = std::make_shared("publisher_for_latched_test"); + size_t num_latched_messages = 3; + auto pub_node = std::make_shared("rosbag2_test_record_4"); auto profile_transient_local = rclcpp::QoS(num_latched_messages).transient_local(); - auto publisher_transient_local = publisher_node->create_publisher( + auto string_pub = pub_node->create_publisher( topic, profile_transient_local); // Publish messages before starting recording - test_msgs::msg::Strings msg; - msg.string_value = "Hello"; - { - // Scope the executor so that the node is removed from it before `spin_and_wait_for` - rclcpp::executors::SingleThreadedExecutor exec; - exec.add_node(publisher_node); - for (size_t i = 0; i < num_latched_messages; i++) { - publisher_transient_local->publish(msg); - exec.spin_some(); - } + for (auto i = 0u; i < num_latched_messages; ++i) { + string_pub->publish(*string_message); } - start_recording({false, false, {topic}, "rmw_format", 100ms}); - auto & writer = static_cast(writer_->get_implementation_handle()); - // Takes ~100ms in local testing, 5s chosen as a very long timeout - bool succeeded = rosbag2_test_common::spin_and_wait_for( - std::chrono::seconds(5), publisher_node, - // The = is necessary due to an implementation difference between clang and windows - // Clang claims capture is unecessary, but windows needs it - [&writer, num_latched_messages = num_latched_messages]() { - return writer.get_messages().size() == num_latched_messages; + rosbag2_transport::RecordOptions record_options = + {false, false, {topic}, "rmw_format", 100ms}; + auto recorder = std::make_shared( + std::move(writer_), storage_options_, record_options); + recorder->record(); + + start_async_spin(recorder); + + auto & writer = recorder->get_writer_handle(); + MockSequentialWriter & mock_writer = + static_cast(writer.get_implementation_handle()); + + size_t expected_messages = num_latched_messages; + auto ret = rosbag2_test_common::wait_until_shutdown( + std::chrono::seconds(5), + [&mock_writer, &expected_messages]() { + return mock_writer.get_messages().size() >= expected_messages; }); - stop_recording(); - ASSERT_TRUE(succeeded); + auto recorded_messages = mock_writer.get_messages(); + auto recorded_topics = mock_writer.get_topics(); + EXPECT_TRUE(ret) << "failed to capture expected messages in time"; + EXPECT_THAT(recorded_messages, SizeIs(expected_messages)); + EXPECT_EQ(recorded_topics.size(), 1u); + EXPECT_FALSE(recorded_messages.empty()); } TEST_F(RecordIntegrationTestFixture, mixed_qos_subscribes) { @@ -186,23 +245,26 @@ TEST_F(RecordIntegrationTestFixture, mixed_qos_subscribes) { auto profile_volatile = rclcpp::QoS(arbitrary_history).reliable().durability_volatile(); auto profile_transient_local = rclcpp::QoS(arbitrary_history).reliable().transient_local(); - auto publisher_node = std::make_shared("publisher_for_qos_test"); + auto publisher_node = std::make_shared("rosbag2_test_record_5"); auto publisher_volatile = publisher_node->create_publisher( topic, profile_volatile); auto publisher_transient_local = publisher_node->create_publisher( topic, profile_transient_local); - start_recording({false, false, {topic}, "rmw_format", 100ms}); + rosbag2_transport::RecordOptions record_options = {false, false, {topic}, "rmw_format", 100ms}; + auto recorder = std::make_shared( + std::move(writer_), storage_options_, record_options); + recorder->record(); + // Takes ~100ms in local testing, 5s chosen as a very long timeout bool succeeded = rosbag2_test_common::spin_and_wait_for( - std::chrono::seconds(5), publisher_node, + std::chrono::seconds(5), recorder, [publisher_volatile, publisher_transient_local]() { // This test is a success if rosbag2 has connected to both publishers return publisher_volatile->get_subscription_count() && publisher_transient_local->get_subscription_count(); }); - stop_recording(); ASSERT_TRUE(succeeded); } @@ -217,7 +279,7 @@ TEST_F(RecordIntegrationTestFixture, duration_and_noncompatibility_policies_mixe const auto liveliness = RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC; const rmw_time_t liveliness_lease_duration{0, 5000000}; - auto publisher_node = std::make_shared("publisher_for"); + auto publisher_node = std::make_shared("rosbag2_test_record_6"); auto create_pub = [publisher_node, topic](auto qos) { return publisher_node->create_publisher(topic, qos); }; @@ -235,10 +297,14 @@ TEST_F(RecordIntegrationTestFixture, duration_and_noncompatibility_policies_mixe .liveliness(liveliness).liveliness_lease_duration(liveliness_lease_duration); auto publisher_liveliness = create_pub(profile_liveliness); - start_recording({false, false, {topic}, "rmw_format", 100ms}); + rosbag2_transport::RecordOptions record_options = {false, false, {topic}, "rmw_format", 100ms}; + auto recorder = std::make_shared( + std::move(writer_), storage_options_, record_options); + recorder->record(); + // Takes ~200ms in local testing, 5s chosen as a very long timeout bool succeeded = rosbag2_test_common::spin_and_wait_for( - std::chrono::seconds(5), publisher_node, + std::chrono::seconds(5), recorder, [publisher_history, publisher_lifespan, publisher_deadline, publisher_liveliness]() { return publisher_history->get_subscription_count() && @@ -246,17 +312,27 @@ TEST_F(RecordIntegrationTestFixture, duration_and_noncompatibility_policies_mixe publisher_deadline->get_subscription_count() && publisher_liveliness->get_subscription_count(); }); - stop_recording(); ASSERT_TRUE(succeeded); } TEST_F(RecordIntegrationTestFixture, topic_qos_overrides) { const auto num_msgs = 3; - auto strict_msg = std::make_shared(); - strict_msg->string_value = "strict"; + test_msgs::msg::Strings strict_msg; + strict_msg.string_value = "strict"; const auto strict_topic = "/strict_topic"; + // Create two BEST_EFFORT publishers on the same topic with different Durability policies. + // If no override is specified, then the recorder cannot see any published messages. + auto profile1 = rosbag2_transport::Rosbag2QoS{}.best_effort().durability_volatile(); + auto profile2 = rosbag2_transport::Rosbag2QoS{}.best_effort().transient_local(); + // TODO(karsten1987) Refactor this into publication manager + auto pub_node = std::make_shared("rosbag2_test_record_7"); + auto pub_profile1 = pub_node->create_publisher( + strict_topic, profile1); + auto pub_profile2 = pub_node->create_publisher( + strict_topic, profile2); + rosbag2_transport::RecordOptions record_options = {false, false, {strict_topic}, "rmw_format", 100ms}; const auto profile_override = rclcpp::QoS{rclcpp::KeepAll()} @@ -266,22 +342,32 @@ TEST_F(RecordIntegrationTestFixture, topic_qos_overrides) }; record_options.topic_qos_profile_overrides = topic_qos_profile_overrides; - // Create two BEST_EFFORT publishers on the same topic with different Durability policies. - // If no override is specified, then the recorder cannot see any published messages. - auto profile1 = rosbag2_transport::Rosbag2QoS{}.best_effort().durability_volatile(); - auto profile2 = rosbag2_transport::Rosbag2QoS{}.best_effort().transient_local(); - pub_man_.add_publisher( - strict_topic, strict_msg, num_msgs, profile1); - pub_man_.add_publisher( - strict_topic, strict_msg, num_msgs, profile2); + auto recorder = std::make_shared( + std::move(writer_), storage_options_, record_options); + recorder->record(); - start_recording(record_options); - run_publishers(); - stop_recording(); + start_async_spin(recorder); - auto & writer = - static_cast(writer_->get_implementation_handle()); - auto recorded_messages = writer.get_messages(); + for (auto i = 0u; i < num_msgs; ++i) { + pub_profile1->publish(strict_msg); + } + for (auto i = 0u; i < num_msgs; ++i) { + pub_profile2->publish(strict_msg); + } + + auto & writer = recorder->get_writer_handle(); + MockSequentialWriter & mock_writer = + static_cast(writer.get_implementation_handle()); + + size_t expected_messages = 2 * num_msgs; + auto ret = rosbag2_test_common::wait_until_shutdown( + std::chrono::seconds(5), + [&mock_writer, &expected_messages]() { + return mock_writer.get_messages().size() >= expected_messages; + }); + auto recorded_messages = mock_writer.get_messages(); + EXPECT_TRUE(ret) << "failed to capture expected messages in time"; + EXPECT_THAT(recorded_messages, SizeIs(Ge(expected_messages))); ASSERT_GE(recorded_messages.size(), 0u); } diff --git a/rosbag2_transport/test/rosbag2_transport/test_record_all.cpp b/rosbag2_transport/test/rosbag2_transport/test_record_all.cpp index 0e4cf2c02b..82c23e914c 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record_all.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record_all.cpp @@ -14,12 +14,18 @@ #include +#include #include +#include #include "test_msgs/msg/arrays.hpp" #include "test_msgs/msg/basic_types.hpp" #include "test_msgs/message_fixtures.hpp" +#include "rosbag2_test_common/wait_for.hpp" + +#include "rosbag2_transport/recorder.hpp" + #include "record_integration_fixture.hpp" TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_topics_are_recorded) @@ -33,20 +39,43 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_topics_are string_message->string_value = "Hello World"; std::string string_topic = "/string_topic"; - pub_man_.add_publisher(string_topic, string_message, 2); - pub_man_.add_publisher(array_topic, array_message, 2); + // TODO(karsten1987) Refactor this into publication manager + auto pub_node = std::make_shared("rosbag2_test_record_all_1"); + auto array_pub = pub_node->create_publisher( + array_topic, rclcpp::QoS{rclcpp::KeepAll()}); + auto string_pub = pub_node->create_publisher( + string_topic, rclcpp::QoS{rclcpp::KeepAll()}); + + rosbag2_transport::RecordOptions record_options = {true, false, {}, "rmw_format", 100ms}; + auto recorder = std::make_shared( + std::move(writer_), storage_options_, record_options); + recorder->record(); - start_recording({true, false, {}, "rmw_format", 100ms}); - run_publishers(); - stop_recording(); + start_async_spin(recorder); - MockSequentialWriter & writer = - static_cast(writer_->get_implementation_handle()); - auto recorded_messages = writer.get_messages(); + for (auto i = 0u; i < 2; ++i) { + array_pub->publish(*array_message); + } + for (auto i = 0u; i < 2; ++i) { + string_pub->publish(*string_message); + } + auto & writer = recorder->get_writer_handle(); + MockSequentialWriter & mock_writer = + static_cast(writer.get_implementation_handle()); + + size_t expected_messages = 4; + auto ret = rosbag2_test_common::wait_until_shutdown( + std::chrono::seconds(5), + [&mock_writer, &expected_messages]() { + return mock_writer.get_messages().size() >= expected_messages; + }); + auto recorded_messages = mock_writer.get_messages(); // We may receive additional messages from rosout, it doesn't matter, // as long as we have received at least as many total messages as we expect - EXPECT_THAT(recorded_messages, SizeIs(Ge(4u))); + EXPECT_TRUE(ret) << "failed to capture expected messages in time"; + EXPECT_THAT(recorded_messages, SizeIs(Ge(expected_messages))); + auto string_messages = filter_messages( recorded_messages, string_topic); auto array_messages = filter_messages( diff --git a/rosbag2_transport/test/rosbag2_transport/test_record_all_no_discovery.cpp b/rosbag2_transport/test/rosbag2_transport/test_record_all_no_discovery.cpp index 7b7ca42665..b135dfde99 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record_all_no_discovery.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record_all_no_discovery.cpp @@ -16,8 +16,15 @@ #include #include +#include #include +#include "rclcpp/rclcpp.hpp" + +#include "rosbag2_test_common/wait_for.hpp" + +#include "rosbag2_transport/recorder.hpp" + #include "test_msgs/msg/basic_types.hpp" #include "test_msgs/message_fixtures.hpp" @@ -29,22 +36,35 @@ TEST_F(RecordIntegrationTestFixture, record_all_without_discovery_ignores_later_ auto string_message = get_messages_strings()[0]; string_message->string_value = "Hello World"; - auto publisher_node = std::make_shared( - "publisher_for_test", - rclcpp::NodeOptions().start_parameter_event_publisher(false)); + rosbag2_transport::RecordOptions record_options = {true, true, {}, "rmw_format", 100ms}; + auto recorder = std::make_shared( + std::move(writer_), storage_options_, record_options); + recorder->record(); - start_recording({true, true, {}, "rmw_format", 1ms}); + start_async_spin(recorder); - std::this_thread::sleep_for(100ms); + auto publisher_node = std::make_shared( + "rosbag2_test_record_all_no_discovery_1", + rclcpp::NodeOptions().start_parameter_event_publisher(false)); auto publisher = publisher_node->create_publisher(topic, 10); + for (int i = 0; i < 5; ++i) { std::this_thread::sleep_for(20ms); publisher->publish(*string_message); } - stop_recording(); - MockSequentialWriter & writer = - static_cast(writer_->get_implementation_handle()); - auto recorded_topics = writer.get_topics(); + auto & writer = recorder->get_writer_handle(); + MockSequentialWriter & mock_writer = + static_cast(writer.get_implementation_handle()); + + size_t expected_messages = 0; + rosbag2_test_common::wait_until_shutdown( + std::chrono::seconds(2), + [&mock_writer, &expected_messages]() { + return mock_writer.get_messages().size() > expected_messages; + }); + (void) expected_messages; // we can't say anything here, there might be some rosout + + auto recorded_topics = mock_writer.get_topics(); EXPECT_EQ(0u, recorded_topics.count(topic)); } diff --git a/rosbag2_transport/test/rosbag2_transport/test_record_regex.cpp b/rosbag2_transport/test/rosbag2_transport/test_record_regex.cpp index c2a2e1dc03..f67e6c909e 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record_regex.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record_regex.cpp @@ -14,8 +14,16 @@ #include +#include #include #include +#include + +#include "rclcpp/rclcpp.hpp" + +#include "rosbag2_test_common/wait_for.hpp" + +#include "rosbag2_transport/recorder.hpp" #include "test_msgs/msg/arrays.hpp" #include "test_msgs/msg/basic_types.hpp" @@ -46,32 +54,53 @@ TEST_F(RecordIntegrationTestFixture, regex_topics_recording) ASSERT_FALSE(std::regex_match(b3, re)); ASSERT_FALSE(std::regex_match(b4, re)); - RecordOptions ro{false, false, {}, "rmw_format", 10ms}; - ro.regex = regex; - - start_recording(ro); - - pub_man_.add_publisher( - v1, test_string_messages[0], 0); - - pub_man_.add_publisher( - b1, test_string_messages[0], 0); - pub_man_.add_publisher( - b2, test_string_messages[1], 0); - pub_man_.add_publisher( - b3, test_string_messages[0], 0); - pub_man_.add_publisher( - b4, test_string_messages[1], 0); - - run_publishers(); - stop_recording(); - - MockSequentialWriter & writer = - static_cast(writer_->get_implementation_handle()); - auto recorded_topics = writer.get_topics(); - + rosbag2_transport::RecordOptions record_options = {false, false, {}, "rmw_format", 10ms}; + record_options.regex = regex; + + // TODO(karsten1987) Refactor this into publication manager + auto pub_node = std::make_shared("rosbag2_test_record_regex_1"); + auto pub_v1 = pub_node->create_publisher( + v1, rclcpp::QoS{rclcpp::KeepAll()}); + auto pub_b1 = pub_node->create_publisher( + b1, rclcpp::QoS{rclcpp::KeepAll()}); + auto pub_b2 = pub_node->create_publisher( + b2, rclcpp::QoS{rclcpp::KeepAll()}); + auto pub_b3 = pub_node->create_publisher( + b3, rclcpp::QoS{rclcpp::KeepAll()}); + auto pub_b4 = pub_node->create_publisher( + b4, rclcpp::QoS{rclcpp::KeepAll()}); + + auto recorder = std::make_shared( + std::move(writer_), storage_options_, record_options); + recorder->record(); + + start_async_spin(recorder); + + for (auto i = 0u; i < 3; ++i) { + pub_v1->publish(*test_string_messages[0]); + pub_b1->publish(*test_string_messages[0]); + pub_b2->publish(*test_string_messages[1]); + pub_b3->publish(*test_string_messages[0]); + pub_b4->publish(*test_string_messages[1]); + } + + auto & writer = recorder->get_writer_handle(); + MockSequentialWriter & mock_writer = + static_cast(writer.get_implementation_handle()); + + size_t expected_messages = 3; + auto ret = rosbag2_test_common::wait_until_shutdown( + std::chrono::seconds(5), + [&mock_writer, &expected_messages]() { + return mock_writer.get_messages().size() >= expected_messages; + }); + auto recorded_messages = mock_writer.get_messages(); + // We may receive additional messages from rosout, it doesn't matter, + // as long as we have received at least as many total messages as we expect + EXPECT_TRUE(ret) << "failed to capture expected messages in time"; + EXPECT_THAT(recorded_messages, SizeIs(Ge(expected_messages))); + auto recorded_topics = mock_writer.get_topics(); EXPECT_THAT(recorded_topics, SizeIs(1)); - EXPECT_TRUE(recorded_topics.find(v1) != recorded_topics.end()); } @@ -105,34 +134,55 @@ TEST_F(RecordIntegrationTestFixture, regex_and_exclude_recording) ASSERT_TRUE(std::regex_match(e1, re)); ASSERT_TRUE(std::regex_match(e1, exclude)); - RecordOptions ro{false, false, {}, "rmw_format", 10ms}; - ro.regex = regex; - ro.exclude = regex_exclude; - - start_recording(ro); - - pub_man_.add_publisher( - v1, test_string_messages[0], 0); - pub_man_.add_publisher( - v2, test_string_messages[1], 0); - - pub_man_.add_publisher( - b1, test_string_messages[0], 0); - pub_man_.add_publisher( - b2, test_string_messages[1], 0); - - pub_man_.add_publisher( - e1, test_array_messages[0], 0); - - run_publishers(); - stop_recording(); - - MockSequentialWriter & writer = - static_cast(writer_->get_implementation_handle()); - auto recorded_topics = writer.get_topics(); - + rosbag2_transport::RecordOptions record_options = {false, false, {}, "rmw_format", 10ms}; + record_options.regex = regex; + record_options.exclude = regex_exclude; + + // TODO(karsten1987) Refactor this into publication manager + auto pub_node = std::make_shared("rosbag2_test_record_regex_2"); + auto pub_v1 = pub_node->create_publisher( + v1, rclcpp::QoS{rclcpp::KeepAll()}); + auto pub_v2 = pub_node->create_publisher( + v2, rclcpp::QoS{rclcpp::KeepAll()}); + auto pub_b1 = pub_node->create_publisher( + b1, rclcpp::QoS{rclcpp::KeepAll()}); + auto pub_b2 = pub_node->create_publisher( + b2, rclcpp::QoS{rclcpp::KeepAll()}); + auto pub_e1 = pub_node->create_publisher( + e1, rclcpp::QoS{rclcpp::KeepAll()}); + + auto recorder = std::make_shared( + std::move(writer_), storage_options_, record_options); + recorder->record(); + + start_async_spin(recorder); + + for (auto i = 0u; i < 3; ++i) { + pub_v1->publish(*test_string_messages[0]); + pub_v2->publish(*test_string_messages[1]); + pub_b1->publish(*test_string_messages[0]); + pub_b2->publish(*test_string_messages[1]); + pub_e1->publish(*test_string_messages[0]); + } + + auto & writer = recorder->get_writer_handle(); + MockSequentialWriter & mock_writer = + static_cast(writer.get_implementation_handle()); + + size_t expected_messages = 3; + auto ret = rosbag2_test_common::wait_until_shutdown( + std::chrono::seconds(5), + [&mock_writer, &expected_messages]() { + return mock_writer.get_messages().size() >= expected_messages; + }); + auto recorded_messages = mock_writer.get_messages(); + // We may receive additional messages from rosout, it doesn't matter, + // as long as we have received at least as many total messages as we expect + EXPECT_TRUE(ret) << "failed to capture expected messages in time"; + EXPECT_THAT(recorded_messages, SizeIs(Ge(expected_messages))); + + auto recorded_topics = mock_writer.get_topics(); EXPECT_THAT(recorded_topics, SizeIs(2)); - EXPECT_TRUE(recorded_topics.find(v1) != recorded_topics.end()); EXPECT_TRUE(recorded_topics.find(v2) != recorded_topics.end()); }