From cdbee9bc3dfe23bada4b5a7276b595dc17a8fc3a Mon Sep 17 00:00:00 2001 From: Emerson Knapp <537409+emersonknapp@users.noreply.github.com> Date: Thu, 22 Apr 2021 17:27:15 -0700 Subject: [PATCH] Expose pause/resume related services on the Player (#729) * Pause/Resume services on Player Signed-off-by: Emerson Knapp --- .../clocks/time_controller_clock.cpp | 5 + rosbag2_transport/CMakeLists.txt | 14 +- .../include/rosbag2_transport/player.hpp | 27 +- rosbag2_transport/package.xml | 1 + .../src/rosbag2_transport/player.cpp | 73 +++++- .../rosbag2_transport/test_play_services.cpp | 236 ++++++++++++++++++ 6 files changed, 353 insertions(+), 3 deletions(-) create mode 100644 rosbag2_transport/test/rosbag2_transport/test_play_services.cpp diff --git a/rosbag2_cpp/src/rosbag2_cpp/clocks/time_controller_clock.cpp b/rosbag2_cpp/src/rosbag2_cpp/clocks/time_controller_clock.cpp index 062bc78d67..8e2dc6d4f5 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/clocks/time_controller_clock.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/clocks/time_controller_clock.cpp @@ -150,6 +150,11 @@ bool TimeControllerClock::sleep_until(rcutils_time_point_value_t until) const auto steady_until = impl_->ros_to_steady(until); impl_->cv.wait_until(lock, steady_until); } + if (impl_->paused) { + // Don't allow publishing any messages while paused + // even if the time was technically reached by the time of wakeup + return false; + } } return now() >= until; } diff --git a/rosbag2_transport/CMakeLists.txt b/rosbag2_transport/CMakeLists.txt index d6dc99537e..59ade9645d 100644 --- a/rosbag2_transport/CMakeLists.txt +++ b/rosbag2_transport/CMakeLists.txt @@ -28,6 +28,7 @@ find_package(rcutils REQUIRED) find_package(rmw REQUIRED) find_package(rosbag2_compression REQUIRED) find_package(rosbag2_cpp REQUIRED) +find_package(rosbag2_interfaces REQUIRED) find_package(rosbag2_storage REQUIRED) find_package(rmw_implementation_cmake REQUIRED) find_package(shared_queues_vendor REQUIRED) @@ -48,6 +49,7 @@ ament_target_dependencies(${PROJECT_NAME} rmw rosbag2_compression rosbag2_cpp + rosbag2_interfaces rosbag2_storage shared_queues_vendor yaml_cpp_vendor @@ -72,7 +74,12 @@ install( ament_export_include_directories(include) ament_export_libraries(${PROJECT_NAME}) ament_export_targets(export_${PROJECT_NAME}) -ament_export_dependencies(rosbag2_cpp rosbag2_compression shared_queues_vendor yaml_cpp_vendor) +ament_export_dependencies( + rosbag2_cpp + rosbag2_compression + rosbag2_interfaces + shared_queues_vendor + yaml_cpp_vendor) function(create_tests_for_rmw_implementation) # disable the following tests for connext @@ -107,6 +114,11 @@ function(create_tests_for_rmw_implementation) LINK_LIBS rosbag2_transport AMENT_DEPS test_msgs rosbag2_test_common) + rosbag2_transport_add_gmock(test_play_services + test/rosbag2_transport/test_play_services.cpp + LINK_LIBS rosbag2_transport + AMENT_DEPS test_msgs rosbag2_test_common) + rosbag2_transport_add_gmock(test_play_topic_remap test/rosbag2_transport/test_play_topic_remap.cpp LINK_LIBS rosbag2_transport diff --git a/rosbag2_transport/include/rosbag2_transport/player.hpp b/rosbag2_transport/include/rosbag2_transport/player.hpp index 8241d64e46..552b780a8b 100644 --- a/rosbag2_transport/include/rosbag2_transport/player.hpp +++ b/rosbag2_transport/include/rosbag2_transport/player.hpp @@ -29,7 +29,10 @@ #include "rclcpp/qos.hpp" #include "rosbag2_cpp/clocks/player_clock.hpp" - +#include "rosbag2_interfaces/srv/pause.hpp" +#include "rosbag2_interfaces/srv/resume.hpp" +#include "rosbag2_interfaces/srv/toggle_paused.hpp" +#include "rosbag2_interfaces/srv/is_paused.hpp" #include "rosbag2_storage/serialized_bag_message.hpp" #include "rosbag2_storage/storage_options.hpp" @@ -71,6 +74,23 @@ class Player : public rclcpp::Node ROSBAG2_TRANSPORT_PUBLIC rosbag2_cpp::Reader * release_reader(); + // Playback control interface + /// Pause the flow of time for playback. + ROSBAG2_TRANSPORT_PUBLIC + void pause(); + + /// Start the flow of time for playback. + ROSBAG2_TRANSPORT_PUBLIC + void resume(); + + /// Pause if time running, resume if paused. + ROSBAG2_TRANSPORT_PUBLIC + void toggle_paused(); + + /// Return whether the playback is currently paused. + ROSBAG2_TRANSPORT_PUBLIC + bool is_paused() const; + private: void load_storage_content(); bool is_storage_completely_loaded() const; @@ -93,6 +113,11 @@ class Player : public rclcpp::Node std::unique_ptr clock_; rclcpp::Publisher::SharedPtr clock_publisher_; std::shared_ptr clock_publish_timer_; + + rclcpp::Service::SharedPtr srv_pause_; + rclcpp::Service::SharedPtr srv_resume_; + rclcpp::Service::SharedPtr srv_toggle_paused_; + rclcpp::Service::SharedPtr srv_is_paused_; }; } // namespace rosbag2_transport diff --git a/rosbag2_transport/package.xml b/rosbag2_transport/package.xml index 2b3594ab01..57c5c20cc2 100644 --- a/rosbag2_transport/package.xml +++ b/rosbag2_transport/package.xml @@ -15,6 +15,7 @@ rclcpp rosbag2_compression rosbag2_cpp + rosbag2_interfaces rosbag2_storage rmw shared_queues_vendor diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index 9b05d9c161..102bbb1b59 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -95,7 +95,44 @@ Player::Player( reader_(std::move(reader)), storage_options_(storage_options), play_options_(play_options) -{} +{ + srv_pause_ = create_service( + "~/pause", + [this]( + const std::shared_ptr/* request_header */, + const std::shared_ptr/* request */, + const std::shared_ptr/* response */) + { + pause(); + }); + srv_resume_ = create_service( + "~/resume", + [this]( + const std::shared_ptr/* request_header */, + const std::shared_ptr/* request */, + const std::shared_ptr/* response */) + { + resume(); + }); + srv_toggle_paused_ = create_service( + "~/toggle_paused", + [this]( + const std::shared_ptr/* request_header */, + const std::shared_ptr/* request */, + const std::shared_ptr/* response */) + { + toggle_paused(); + }); + srv_is_paused_ = create_service( + "~/is_paused", + [this]( + const std::shared_ptr/* request_header */, + const std::shared_ptr/* request */, + const std::shared_ptr response) + { + response->paused = is_paused(); + }); +} Player::~Player() { @@ -157,6 +194,39 @@ void Player::play() } } +void Player::pause() +{ + if (clock_) { + clock_->pause(); + } +} + +void Player::resume() +{ + if (clock_) { + clock_->resume(); + } +} + +void Player::toggle_paused() +{ + if (clock_) { + if (clock_->is_paused()) { + clock_->resume(); + } else { + clock_->pause(); + } + } +} + +bool Player::is_paused() const +{ + if (clock_) { + return clock_->is_paused(); + } + return true; +} + void Player::wait_for_filled_queue() const { while ( @@ -263,6 +333,7 @@ void Player::prepare_publishers() void Player::prepare_clock(rcutils_time_point_value_t starting_time) { + // TODO(emersonknapp) move clock setup into the constructor double rate = play_options_.rate > 0.0 ? play_options_.rate : 1.0; clock_ = std::make_unique(starting_time, rate); diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_services.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_services.cpp new file mode 100644 index 0000000000..f9dadd40b3 --- /dev/null +++ b/rosbag2_transport/test/rosbag2_transport/test_play_services.cpp @@ -0,0 +1,236 @@ +// Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include +#include + +#include "rclcpp/rclcpp.hpp" + +#include "rosbag2_interfaces/srv/is_paused.hpp" +#include "rosbag2_interfaces/srv/pause.hpp" +#include "rosbag2_interfaces/srv/resume.hpp" +#include "rosbag2_interfaces/srv/toggle_paused.hpp" +#include "rosbag2_transport/player.hpp" +#include "test_msgs/msg/basic_types.hpp" +#include "test_msgs/message_fixtures.hpp" + +#include "rosbag2_play_test_fixture.hpp" + +using namespace ::testing; // NOLINT + +class PlaySrvsTest : public RosBag2PlayTestFixture +{ +public: + using Pause = rosbag2_interfaces::srv::Pause; + using Resume = rosbag2_interfaces::srv::Resume; + using TogglePaused = rosbag2_interfaces::srv::TogglePaused; + using IsPaused = rosbag2_interfaces::srv::IsPaused; + + PlaySrvsTest() + : RosBag2PlayTestFixture(), + client_node_(std::make_shared("test_play_client")) + {} + + ~PlaySrvsTest() override + { + exec_.cancel(); + rclcpp::shutdown(); + spin_thread_.join(); + play_thread_.join(); + } + + /// Use SetUp instead of ctor because we want to ASSERT some preconditions for the tests + void SetUp() override + { + setup_player(); + + const std::string ns = "/" + player_name_; + cli_pause_ = client_node_->create_client(ns + "/pause"); + cli_resume_ = client_node_->create_client(ns + "/resume"); + cli_toggle_paused_ = client_node_->create_client(ns + "/toggle_paused"); + cli_is_paused_ = client_node_->create_client(ns + "/is_paused"); + topic_sub_ = client_node_->create_subscription( + test_topic_, 10, + std::bind(&PlaySrvsTest::topic_callback, this, std::placeholders::_1)); + + exec_.add_node(player_); + exec_.add_node(client_node_); + spin_thread_ = std::thread( + [this]() { + exec_.spin(); + }); + + // Make sure all expected services are present before starting any test + ASSERT_TRUE(cli_resume_->wait_for_service(service_wait_timeout_)); + ASSERT_TRUE(cli_pause_->wait_for_service(service_wait_timeout_)); + ASSERT_TRUE(cli_is_paused_->wait_for_service(service_wait_timeout_)); + ASSERT_TRUE(cli_toggle_paused_->wait_for_service(service_wait_timeout_)); + + // Wait for paused == false to know that playback has begun + bool is_playing = false; + IsPaused::Response::SharedPtr is_paused_result; + for (size_t retry = 0; retry < 3 && rclcpp::ok(); retry++) { + is_paused_result = successful_call(cli_is_paused_); + if (is_paused_result->paused == false) { + is_playing = true; + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + ASSERT_TRUE(is_playing); + } + + /// Call a service client, and expect it to successfully return within a reasonable timeout + template + typename T::Response::SharedPtr successful_call(typename rclcpp::Client::SharedPtr cli) + { + auto request = std::make_shared(); + auto future = cli->async_send_request(request); + EXPECT_EQ(future.wait_for(service_call_timeout_), std::future_status::ready); + auto result = future.get(); + EXPECT_TRUE(result); + return result; + } + + bool is_paused() + { + auto result = successful_call(cli_is_paused_); + return result->paused; + } + + /// EXPECT to receive (or not receive) any messages for a period + void expect_message(bool messages_should_arrive) + { + // Not too worried about the exact timing in this test, give a lot of leeway + const auto condition_clear_time = std::chrono::milliseconds(ms_between_msgs_ * 10); + std::unique_lock lock(got_msg_mutex_); + if (!messages_should_arrive) { + // Allow for a single unprocessed message to be handled before expecting nothing to arrive + got_msg_.wait_for(lock, condition_clear_time); + EXPECT_EQ(got_msg_.wait_for(lock, condition_clear_time), std::cv_status::timeout); + } else { + EXPECT_EQ(got_msg_.wait_for(lock, condition_clear_time), std::cv_status::no_timeout); + } + } + +private: + /// Create a player with some messages to play back, and start it on loop + void setup_player() + { + rosbag2_storage::StorageOptions storage_options; + rosbag2_transport::PlayOptions play_options; + play_options.loop = true; + + const size_t num_msgs_to_publish = 200; + auto message = get_messages_basic_types()[0]; + message->int32_value = 42; + + auto topic_types = std::vector{ + {test_topic_, "test_msgs/BasicTypes", "", ""}, + }; + std::vector> messages; + for (size_t i = 0; i < num_msgs_to_publish; i++) { + messages.push_back(serialize_test_message(test_topic_, i * ms_between_msgs_, message)); + } + + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, topic_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); + player_ = std::make_shared( + std::move(reader), storage_options, play_options, player_name_); + play_thread_ = std::thread( + [this]() { + player_->play(); + }); + } + + void topic_callback(const test_msgs::msg::BasicTypes::SharedPtr /* msg */) + { + got_msg_.notify_all(); + } + +public: + // Basic configuration + const std::string player_name_ = "rosbag2_player_for_test_srvs"; + const std::chrono::seconds service_wait_timeout_ {2}; + const std::chrono::seconds service_call_timeout_ {1}; + const std::string test_topic_ = "/player_srvs_test_topic"; + // publishing at 50hz + const size_t ms_between_msgs_ = 20; + + // Orchestration + std::thread spin_thread_; + std::thread play_thread_; + rclcpp::executors::SingleThreadedExecutor exec_; + std::shared_ptr player_; + + // Service clients + rclcpp::Node::SharedPtr client_node_; + rclcpp::Client::SharedPtr cli_pause_; + rclcpp::Client::SharedPtr cli_resume_; + rclcpp::Client::SharedPtr cli_toggle_paused_; + rclcpp::Client::SharedPtr cli_is_paused_; + + // Mechanism to check on playback status + rclcpp::Subscription::SharedPtr topic_sub_; + std::mutex got_msg_mutex_; + std::condition_variable got_msg_; +}; + +TEST_F(PlaySrvsTest, pause_resume) +{ + // No matter how many times we call pause, it's paused + for (size_t i = 0; i < 3; i++) { + successful_call(cli_pause_); + ASSERT_TRUE(is_paused()); + } + expect_message(false); + + // No matter how many times we call resume, it's resumed + for (size_t i = 0; i < 3; i++) { + successful_call(cli_resume_); + ASSERT_FALSE(is_paused()); + } + expect_message(true); + + // Let's do pause again to make sure back-and-forth works + for (size_t i = 0; i < 3; i++) { + successful_call(cli_pause_); + ASSERT_TRUE(is_paused()); + } + expect_message(false); +} + +TEST_F(PlaySrvsTest, toggle_paused) +{ + successful_call(cli_toggle_paused_); + ASSERT_TRUE(is_paused()); + expect_message(false); + + successful_call(cli_toggle_paused_); + ASSERT_FALSE(is_paused()); + expect_message(true); + + successful_call(cli_toggle_paused_); + ASSERT_TRUE(is_paused()); + expect_message(false); + + successful_call(cli_toggle_paused_); + ASSERT_FALSE(is_paused()); + expect_message(true); +}