Skip to content

Commit

Permalink
Expose pause/resume related services on the Player (ros2#729)
Browse files Browse the repository at this point in the history
* Pause/Resume services on Player

Signed-off-by: Emerson Knapp <eknapp@amazon.com>
  • Loading branch information
emersonknapp committed Apr 23, 2021
1 parent beeea93 commit cdbee9b
Show file tree
Hide file tree
Showing 6 changed files with 353 additions and 3 deletions.
5 changes: 5 additions & 0 deletions rosbag2_cpp/src/rosbag2_cpp/clocks/time_controller_clock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,11 @@ bool TimeControllerClock::sleep_until(rcutils_time_point_value_t until)
const auto steady_until = impl_->ros_to_steady(until);
impl_->cv.wait_until(lock, steady_until);
}
if (impl_->paused) {
// Don't allow publishing any messages while paused
// even if the time was technically reached by the time of wakeup
return false;
}
}
return now() >= until;
}
Expand Down
14 changes: 13 additions & 1 deletion rosbag2_transport/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ find_package(rcutils REQUIRED)
find_package(rmw REQUIRED)
find_package(rosbag2_compression REQUIRED)
find_package(rosbag2_cpp REQUIRED)
find_package(rosbag2_interfaces REQUIRED)
find_package(rosbag2_storage REQUIRED)
find_package(rmw_implementation_cmake REQUIRED)
find_package(shared_queues_vendor REQUIRED)
Expand All @@ -48,6 +49,7 @@ ament_target_dependencies(${PROJECT_NAME}
rmw
rosbag2_compression
rosbag2_cpp
rosbag2_interfaces
rosbag2_storage
shared_queues_vendor
yaml_cpp_vendor
Expand All @@ -72,7 +74,12 @@ install(
ament_export_include_directories(include)
ament_export_libraries(${PROJECT_NAME})
ament_export_targets(export_${PROJECT_NAME})
ament_export_dependencies(rosbag2_cpp rosbag2_compression shared_queues_vendor yaml_cpp_vendor)
ament_export_dependencies(
rosbag2_cpp
rosbag2_compression
rosbag2_interfaces
shared_queues_vendor
yaml_cpp_vendor)

function(create_tests_for_rmw_implementation)
# disable the following tests for connext
Expand Down Expand Up @@ -107,6 +114,11 @@ function(create_tests_for_rmw_implementation)
LINK_LIBS rosbag2_transport
AMENT_DEPS test_msgs rosbag2_test_common)

rosbag2_transport_add_gmock(test_play_services
test/rosbag2_transport/test_play_services.cpp
LINK_LIBS rosbag2_transport
AMENT_DEPS test_msgs rosbag2_test_common)

rosbag2_transport_add_gmock(test_play_topic_remap
test/rosbag2_transport/test_play_topic_remap.cpp
LINK_LIBS rosbag2_transport
Expand Down
27 changes: 26 additions & 1 deletion rosbag2_transport/include/rosbag2_transport/player.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@
#include "rclcpp/qos.hpp"

#include "rosbag2_cpp/clocks/player_clock.hpp"

#include "rosbag2_interfaces/srv/pause.hpp"
#include "rosbag2_interfaces/srv/resume.hpp"
#include "rosbag2_interfaces/srv/toggle_paused.hpp"
#include "rosbag2_interfaces/srv/is_paused.hpp"
#include "rosbag2_storage/serialized_bag_message.hpp"
#include "rosbag2_storage/storage_options.hpp"

Expand Down Expand Up @@ -71,6 +74,23 @@ class Player : public rclcpp::Node
ROSBAG2_TRANSPORT_PUBLIC
rosbag2_cpp::Reader * release_reader();

// Playback control interface
/// Pause the flow of time for playback.
ROSBAG2_TRANSPORT_PUBLIC
void pause();

/// Start the flow of time for playback.
ROSBAG2_TRANSPORT_PUBLIC
void resume();

/// Pause if time running, resume if paused.
ROSBAG2_TRANSPORT_PUBLIC
void toggle_paused();

/// Return whether the playback is currently paused.
ROSBAG2_TRANSPORT_PUBLIC
bool is_paused() const;

private:
void load_storage_content();
bool is_storage_completely_loaded() const;
Expand All @@ -93,6 +113,11 @@ class Player : public rclcpp::Node
std::unique_ptr<rosbag2_cpp::PlayerClock> clock_;
rclcpp::Publisher<rosgraph_msgs::msg::Clock>::SharedPtr clock_publisher_;
std::shared_ptr<rclcpp::TimerBase> clock_publish_timer_;

rclcpp::Service<rosbag2_interfaces::srv::Pause>::SharedPtr srv_pause_;
rclcpp::Service<rosbag2_interfaces::srv::Resume>::SharedPtr srv_resume_;
rclcpp::Service<rosbag2_interfaces::srv::TogglePaused>::SharedPtr srv_toggle_paused_;
rclcpp::Service<rosbag2_interfaces::srv::IsPaused>::SharedPtr srv_is_paused_;
};

} // namespace rosbag2_transport
Expand Down
1 change: 1 addition & 0 deletions rosbag2_transport/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
<depend>rclcpp</depend>
<depend>rosbag2_compression</depend>
<depend>rosbag2_cpp</depend>
<depend>rosbag2_interfaces</depend>
<depend>rosbag2_storage</depend>
<depend>rmw</depend>
<depend>shared_queues_vendor</depend>
Expand Down
73 changes: 72 additions & 1 deletion rosbag2_transport/src/rosbag2_transport/player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,44 @@ Player::Player(
reader_(std::move(reader)),
storage_options_(storage_options),
play_options_(play_options)
{}
{
srv_pause_ = create_service<rosbag2_interfaces::srv::Pause>(
"~/pause",
[this](
const std::shared_ptr<rmw_request_id_t>/* request_header */,
const std::shared_ptr<rosbag2_interfaces::srv::Pause::Request>/* request */,
const std::shared_ptr<rosbag2_interfaces::srv::Pause::Response>/* response */)
{
pause();
});
srv_resume_ = create_service<rosbag2_interfaces::srv::Resume>(
"~/resume",
[this](
const std::shared_ptr<rmw_request_id_t>/* request_header */,
const std::shared_ptr<rosbag2_interfaces::srv::Resume::Request>/* request */,
const std::shared_ptr<rosbag2_interfaces::srv::Resume::Response>/* response */)
{
resume();
});
srv_toggle_paused_ = create_service<rosbag2_interfaces::srv::TogglePaused>(
"~/toggle_paused",
[this](
const std::shared_ptr<rmw_request_id_t>/* request_header */,
const std::shared_ptr<rosbag2_interfaces::srv::TogglePaused::Request>/* request */,
const std::shared_ptr<rosbag2_interfaces::srv::TogglePaused::Response>/* response */)
{
toggle_paused();
});
srv_is_paused_ = create_service<rosbag2_interfaces::srv::IsPaused>(
"~/is_paused",
[this](
const std::shared_ptr<rmw_request_id_t>/* request_header */,
const std::shared_ptr<rosbag2_interfaces::srv::IsPaused::Request>/* request */,
const std::shared_ptr<rosbag2_interfaces::srv::IsPaused::Response> response)
{
response->paused = is_paused();
});
}

Player::~Player()
{
Expand Down Expand Up @@ -157,6 +194,39 @@ void Player::play()
}
}

void Player::pause()
{
if (clock_) {
clock_->pause();
}
}

void Player::resume()
{
if (clock_) {
clock_->resume();
}
}

void Player::toggle_paused()
{
if (clock_) {
if (clock_->is_paused()) {
clock_->resume();
} else {
clock_->pause();
}
}
}

bool Player::is_paused() const
{
if (clock_) {
return clock_->is_paused();
}
return true;
}

void Player::wait_for_filled_queue() const
{
while (
Expand Down Expand Up @@ -263,6 +333,7 @@ void Player::prepare_publishers()

void Player::prepare_clock(rcutils_time_point_value_t starting_time)
{
// TODO(emersonknapp) move clock setup into the constructor
double rate = play_options_.rate > 0.0 ? play_options_.rate : 1.0;
clock_ = std::make_unique<rosbag2_cpp::TimeControllerClock>(starting_time, rate);

Expand Down
Loading

0 comments on commit cdbee9b

Please sign in to comment.