Skip to content

Commit

Permalink
Fix incorrect boundary check for playback_duration and `play_until_…
Browse files Browse the repository at this point in the history
…timestamp` (ros2#1032)

* Add initialization for `metadata  starting time` in MockSequentialReader

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Fix incorrect boundary check for `playback_duration` and `play_until`

- Properly handle cases when playback_duration = 0 or
play_until_timestamp_ = 0.
- Rewrite `play_for_none_are_played_due_to_duration` test to use
`on_playback` callbacks to decrease runtime from 10 seconds down to
100 milliseconds.

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Rewrite false positive `play_until_none_are_played_due_to_timestamp`

Using on_play_callbacks instead of expectation for 0 messages on
subscription.

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Address flakiness in `playback_duration_overrides_play_until` test

Change wrong expectations for number of messages to arrive.
It was expecting to arrive 1 but actually should expect 2.

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Address code style issue in if-else statement

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
  • Loading branch information
MichaelOrlov committed Jun 28, 2022
1 parent 3a6a68a commit 832dea8
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 78 deletions.
4 changes: 0 additions & 4 deletions ros2bag/ros2bag/verb/play.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,6 @@ def add_arguments(self, parser, cli_name): # noqa: D102
'By default, if loaned message can be used, messages are published as loaned '
'message. It can help to reduce the number of data copies, so there is a greater '
'benefit for sending big data.')
parser.add_argument(
'-f', '--duration', type=float, default=None,
help='Play for SEC seconds. Default is None, meaning that playback will continue '
'until the end of the bag. Valid range > 0.0')

def get_playback_until_from_arg_group(self, playback_until_sec, playback_until_nsec) -> int:
nano_scale = 1000 * 1000 * 1000
Expand Down
17 changes: 8 additions & 9 deletions rosbag2_transport/include/rosbag2_transport/player.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,22 +253,21 @@ class Player : public rclcpp::Node
void prepare_publishers();
bool publish_message(rosbag2_storage::SerializedBagMessageSharedPtr message);
static callback_handle_t get_new_on_play_msg_callback_handle();
static constexpr double read_ahead_lower_bound_percentage_ = 0.9;
static const std::chrono::milliseconds queue_read_wait_period_;
std::atomic_bool cancel_wait_for_next_message_{false};

std::mutex reader_mutex_;
std::unique_ptr<rosbag2_cpp::Reader> reader_ RCPPUTILS_TSA_GUARDED_BY(reader_mutex_);

void add_key_callback(
KeyboardHandler::KeyCode key,
const std::function<void()> & cb,
const std::string & op_name);
void add_keyboard_callbacks();

void create_control_services();

void configure_play_until_timestamp();
bool shall_stop_at_timestamp(const rcutils_time_point_value_t & msg_timestamp) const;

static constexpr double read_ahead_lower_bound_percentage_ = 0.9;
static const std::chrono::milliseconds queue_read_wait_period_;
std::atomic_bool cancel_wait_for_next_message_{false};

std::mutex reader_mutex_;
std::unique_ptr<rosbag2_cpp::Reader> reader_ RCPPUTILS_TSA_GUARDED_BY(reader_mutex_);

rosbag2_storage::StorageOptions storage_options_;
rosbag2_transport::PlayOptions play_options_;
Expand Down
45 changes: 26 additions & 19 deletions rosbag2_transport/src/rosbag2_transport/player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -363,16 +363,12 @@ bool Player::play_next()
rosbag2_storage::SerializedBagMessageSharedPtr message_ptr = peek_next_message_from_queue();

bool next_message_published = false;
while (message_ptr != nullptr && !next_message_published) {
if (play_until_timestamp_ >= starting_time_ &&
message_ptr->time_stamp > play_until_timestamp_)
{
break;
}
{
next_message_published = publish_message(message_ptr);
clock_->jump(message_ptr->time_stamp);
}
while (rclcpp::ok() && !next_message_published &&
message_ptr != nullptr && !shall_stop_at_timestamp(message_ptr->time_stamp))
{
next_message_published = publish_message(message_ptr);
clock_->jump(message_ptr->time_stamp);

message_queue_.pop();
message_ptr = peek_next_message_from_queue();
}
Expand Down Expand Up @@ -529,12 +525,9 @@ void Player::play_messages_from_queue()
is_ready_to_play_from_queue_ = true;
ready_to_play_from_queue_cv_.notify_all();
}
while (message_ptr != nullptr && rclcpp::ok()) {
if (play_until_timestamp_ >= starting_time_ &&
message_ptr->time_stamp > play_until_timestamp_)
{
break;
}
while (rclcpp::ok() &&
message_ptr != nullptr && !shall_stop_at_timestamp(message_ptr->time_stamp))
{
// Do not move on until sleep_until returns true
// It will always sleep, so this is not a tight busy loop on pause
while (rclcpp::ok() && !clock_->sleep_until(message_ptr->time_stamp)) {
Expand Down Expand Up @@ -833,12 +826,26 @@ void Player::configure_play_until_timestamp()
if (play_options_.playback_duration >= rclcpp::Duration(0, 0) ||
play_options_.playback_until_timestamp >= rcutils_time_point_value_t{0})
{
play_until_timestamp_ = std::max(
starting_time_ + play_options_.playback_duration.nanoseconds(),
play_options_.playback_until_timestamp);
// Handling special case when playback_duration = 0
auto play_until_from_duration = (play_options_.playback_duration == rclcpp::Duration(0, 0)) ?
0 : starting_time_ + play_options_.playback_duration.nanoseconds();

play_until_timestamp_ =
std::max(play_until_from_duration, play_options_.playback_until_timestamp);
} else {
play_until_timestamp_ = -1;
}
}

inline bool Player::shall_stop_at_timestamp(const rcutils_time_point_value_t & msg_timestamp) const
{
if ((play_until_timestamp_ > -1 && msg_timestamp > play_until_timestamp_) ||
play_until_timestamp_ == 0)
{
return true;
} else {
return false;
}
}

} // namespace rosbag2_transport
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,12 @@ class MockSequentialReader : public rosbag2_cpp::reader_interfaces::BaseReaderIn
std::vector<std::shared_ptr<rosbag2_storage::SerializedBagMessage>> messages,
std::vector<rosbag2_storage::TopicMetadata> topics)
{
metadata_.message_count = messages.size();
if (!messages.empty()) {
const auto message_timestamp = std::chrono::time_point<std::chrono::high_resolution_clock>(
std::chrono::nanoseconds(messages[0]->time_stamp));
metadata_.starting_time = message_timestamp;
}
messages_ = std::move(messages);
topics_ = std::move(topics);
}
Expand Down
38 changes: 18 additions & 20 deletions rosbag2_transport/test/rosbag2_transport/test_play_for.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,29 +174,27 @@ TEST_F(RosBag2PlayForTestFixture, play_for_none_are_played_due_to_duration)
prepared_mock_reader->prepare(messages, topic_types);
auto reader = std::make_unique<rosbag2_cpp::Reader>(std::move(prepared_mock_reader));

// Expect to receive messages only from play_next in second round
sub_->add_subscription<test_msgs::msg::BasicTypes>(kTopic1_, messages.size());
play_options_.playback_duration = rclcpp::Duration(std::chrono::milliseconds(49));
play_options_.playback_duration = rclcpp::Duration(std::chrono::milliseconds(0));

std::shared_ptr<MockPlayer> player_ = std::make_shared<MockPlayer>(
std::shared_ptr<MockPlayer> player = std::make_shared<MockPlayer>(
std::move(reader), storage_options_, play_options_);

// Wait for discovery to match publishers with subscribers
ASSERT_TRUE(sub_->spin_and_wait_for_matched(player_->get_list_of_publishers(), 5s));
using SerializedBagMessage = rosbag2_storage::SerializedBagMessage;
testing::MockFunction<void(std::shared_ptr<SerializedBagMessage>)> mock_pre_callback;
EXPECT_CALL(mock_pre_callback, Call(_)).Times(Exactly(0));

auto await_received_messages = sub_->spin_subscriptions();
ASSERT_TRUE(player_->play());
testing::MockFunction<void(std::shared_ptr<SerializedBagMessage>)> mock_post_callback;
EXPECT_CALL(mock_post_callback, Call(_)).Times(Exactly(0));

// Playing one more time with play_next to save time and count messages
player_->pause();
auto player_future = std::async(std::launch::async, [&player_]() -> void {player_->play();});
auto pre_callback_handle =
player->add_on_play_message_pre_callback(mock_pre_callback.AsStdFunction());
ASSERT_NE(pre_callback_handle, Player::invalid_callback_handle);

EXPECT_FALSE(player_->play_next());
player_->resume();
player_future.get();
await_received_messages.get();
auto replayed_topic1 = sub_->get_received_messages<test_msgs::msg::BasicTypes>(kTopic1_);
EXPECT_THAT(replayed_topic1, SizeIs(0));
auto post_callback_handle =
player->add_on_play_message_post_callback(mock_post_callback.AsStdFunction());
ASSERT_NE(post_callback_handle, Player::invalid_callback_handle);

ASSERT_TRUE(player->play());
}

TEST_F(RosBag2PlayForTestFixture, play_for_less_than_the_total_duration)
Expand All @@ -219,9 +217,9 @@ TEST_F(RosBag2PlayForTestFixture, play_for_less_than_the_total_duration)
prepared_mock_reader->prepare(messages, topic_types);
auto reader = std::make_unique<rosbag2_cpp::Reader>(std::move(prepared_mock_reader));

// Expect to receive 1 message from play() and 2 messages from play_next in second round
sub_->add_subscription<test_msgs::msg::BasicTypes>(kTopic1_, messages.size() + 1);
play_options_.playback_duration = rclcpp::Duration(std::chrono::milliseconds(49));
// Expect to receive 1 message from play() and 1 messages from play_next in second round
sub_->add_subscription<test_msgs::msg::BasicTypes>(kTopic1_, 2);
play_options_.playback_duration = rclcpp::Duration(std::chrono::milliseconds(39));

std::shared_ptr<MockPlayer> player_ = std::make_shared<MockPlayer>(
std::move(reader), storage_options_, play_options_);
Expand Down
47 changes: 21 additions & 26 deletions rosbag2_transport/test/rosbag2_transport/test_play_until.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,31 +184,27 @@ TEST_F(RosBag2PlayUntilTestFixture, play_until_none_are_played_due_to_timestamp)
prepared_mock_reader->prepare(messages, topic_types);
auto reader = std::make_unique<rosbag2_cpp::Reader>(std::move(prepared_mock_reader));

// Expect to receive no messages.
sub_->add_subscription<test_msgs::msg::BasicTypes>(kTopic1_, 0u);
play_options_.playback_until_timestamp = RCL_MS_TO_NS(50) - 1;

std::shared_ptr<MockPlayer> player_ = std::make_shared<MockPlayer>(
std::shared_ptr<MockPlayer> player = std::make_shared<MockPlayer>(
std::move(reader), storage_options_, play_options_);

// Wait for discovery to match publishers with subscribers
ASSERT_TRUE(sub_->spin_and_wait_for_matched(player_->get_list_of_publishers(), 5s));
using SerializedBagMessage = rosbag2_storage::SerializedBagMessage;
testing::MockFunction<void(std::shared_ptr<SerializedBagMessage>)> mock_pre_callback;
EXPECT_CALL(mock_pre_callback, Call(_)).Times(Exactly(0));

auto await_received_messages = sub_->spin_subscriptions();
ASSERT_TRUE(player_->play());
testing::MockFunction<void(std::shared_ptr<SerializedBagMessage>)> mock_post_callback;
EXPECT_CALL(mock_post_callback, Call(_)).Times(Exactly(0));

// Playing one more time with play_next() to save time and count messages. Note
// that none of the following play() and play_next() functions will make any of
// the messages to be played.
player_->pause();
auto player_future = std::async(std::launch::async, [&player_]() -> void {player_->play();});
auto pre_callback_handle =
player->add_on_play_message_pre_callback(mock_pre_callback.AsStdFunction());
ASSERT_NE(pre_callback_handle, Player::invalid_callback_handle);

EXPECT_FALSE(player_->play_next());
player_->resume();
player_future.get();
await_received_messages.get();
auto replayed_topic1 = sub_->get_received_messages<test_msgs::msg::BasicTypes>(kTopic1_);
EXPECT_THAT(replayed_topic1, SizeIs(0));
auto post_callback_handle =
player->add_on_play_message_post_callback(mock_post_callback.AsStdFunction());
ASSERT_NE(post_callback_handle, Player::invalid_callback_handle);

ASSERT_TRUE(player->play());
}

TEST_F(RosBag2PlayUntilTestFixture, play_until_less_than_the_total_duration)
Expand Down Expand Up @@ -341,17 +337,16 @@ TEST_F(RosBag2PlayUntilTestFixture, play_until_overrides_playback_duration)
TEST_F(RosBag2PlayUntilTestFixture, playback_duration_overrides_play_until)
{
InitPlayerWithPlaybackUntilAndPlay(
50 /* playback_until_timestamp_millis */, 1 /* num messages topic 1 */,
1 /* num messages topic 2 */, 150 /* playback_duration_millis */);
50 /* playback_until_timestamp_millis */, 2 /* num messages topic 1 */,
2 /* num messages topic 2 */, 150 /* playback_duration_millis */);

auto replayed_test_primitives = sub_->get_received_messages<test_msgs::msg::BasicTypes>(
kTopic1_);
EXPECT_THAT(replayed_test_primitives, SizeIs(Eq(1u)));
auto replayed_test_primitives =
sub_->get_received_messages<test_msgs::msg::BasicTypes>(kTopic1_);
EXPECT_THAT(replayed_test_primitives, SizeIs(Eq(2u)));
EVAL_REPLAYED_PRIMITIVES(replayed_test_primitives);

auto replayed_test_arrays = sub_->get_received_messages<test_msgs::msg::Arrays>(
kTopic2_);
EXPECT_THAT(replayed_test_arrays, SizeIs(Eq(1u)));
auto replayed_test_arrays = sub_->get_received_messages<test_msgs::msg::Arrays>(kTopic2_);
EXPECT_THAT(replayed_test_arrays, SizeIs(Eq(2u)));
EVAL_REPLAYED_BOOL_ARRAY_PRIMITIVES(replayed_test_arrays);
EVAL_REPLAYED_FLOAT_ARRAY_PRIMITIVES(replayed_test_arrays);
}
Expand Down

0 comments on commit 832dea8

Please sign in to comment.