From b4845a3f50478fd257e04a9de9511e7e39bbbb51 Mon Sep 17 00:00:00 2001 From: Andreas Holzner Date: Tue, 18 Sep 2018 14:59:10 +0200 Subject: [PATCH] GH-8 Refactor future usage in player Make the future a class member of player to avoid having to hand it into several functions which is difficult with a move-only type. --- rosbag2/src/rosbag2/player.cpp | 27 +++++++++++++++------------ rosbag2/src/rosbag2/player.hpp | 7 ++++--- 2 files changed, 19 insertions(+), 15 deletions(-) diff --git a/rosbag2/src/rosbag2/player.cpp b/rosbag2/src/rosbag2/player.cpp index 182de23856..4c73ea1786 100644 --- a/rosbag2/src/rosbag2/player.cpp +++ b/rosbag2/src/rosbag2/player.cpp @@ -41,29 +41,33 @@ Player::Player(std::shared_ptr("rosbag2_node")) {} -bool is_pending(const std::future & future) +bool Player::is_storage_completely_loaded() const { - return !(future.valid() && future.wait_for(std::chrono::seconds(0)) == std::future_status::ready); + if (storage_loading_future_.valid() && + storage_loading_future_.wait_for(std::chrono::seconds(0)) == std::future_status::ready) + { + storage_loading_future_.get(); + } + return !storage_loading_future_.valid(); } void Player::play(const Rosbag2PlayOptions & options) { prepare_publishers(); - auto storage_loading_future = std::async(std::launch::async, + storage_loading_future_ = std::async(std::launch::async, [this, options]() {load_storage_content(options);}); - wait_for_filled_queue(options, storage_loading_future); + wait_for_filled_queue(options); - play_messages_from_queue(std::move(storage_loading_future)); + play_messages_from_queue(); } -void Player::wait_for_filled_queue( - const Rosbag2PlayOptions & options, const std::future & storage_loading_future) const +void Player::wait_for_filled_queue(const Rosbag2PlayOptions & options) const { while ( - message_queue_.size_approx() < options.queue_buffer_length_ && - is_pending(storage_loading_future)) + message_queue_.size_approx() < options.read_ahead_queue_size && + !is_storage_completely_loaded()) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); } @@ -109,18 +113,17 @@ void Player::enqueue_up_to_boundary(const TimePoint & time_first_message, uint64 } } -void Player::play_messages_from_queue(std::future storage_loading_future) +void Player::play_messages_from_queue() { auto start_time = std::chrono::high_resolution_clock::now(); - while (message_queue_.size_approx() != 0 || is_pending(storage_loading_future)) { + while (message_queue_.size_approx() != 0 || !is_storage_completely_loaded()) { ReplayableMessage message; if (message_queue_.try_dequeue(message)) { std::this_thread::sleep_until(start_time + message.time_since_start); publishers_[message.message->topic_name]->publish(message.message->serialized_data); } } - storage_loading_future.get(); } void Player::prepare_publishers() diff --git a/rosbag2/src/rosbag2/player.hpp b/rosbag2/src/rosbag2/player.hpp index 5e794d92dc..6ed27096f5 100644 --- a/rosbag2/src/rosbag2/player.hpp +++ b/rosbag2/src/rosbag2/player.hpp @@ -44,16 +44,17 @@ class Player private: void load_storage_content(const Rosbag2PlayOptions & options); + bool is_storage_completely_loaded() const; void enqueue_up_to_boundary(const TimePoint & time_first_message, uint64_t boundary); - void wait_for_filled_queue( - const Rosbag2PlayOptions & options, const std::future & storage_loading_future) const; - void play_messages_from_queue(std::future storage_loading_future); + void wait_for_filled_queue(const Rosbag2PlayOptions & options) const; + void play_messages_from_queue(); void prepare_publishers(); static constexpr float read_ahead_lower_bound_percentage_ = 0.9; std::shared_ptr storage_; moodycamel::ReaderWriterQueue message_queue_; + mutable std::future storage_loading_future_; std::shared_ptr node_; std::map> publishers_; };