Skip to content

Commit

Permalink
ros2GH-8 Refactor future usage in player
Browse files Browse the repository at this point in the history
Make the future a class member of player to avoid having to hand it
into several functions which is difficult with a move-only type.
  • Loading branch information
anhosi committed Sep 18, 2018
1 parent 1555e04 commit b4845a3
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 15 deletions.
27 changes: 15 additions & 12 deletions rosbag2/src/rosbag2/player.cpp
Expand Up @@ -41,29 +41,33 @@ Player::Player(std::shared_ptr<rosbag2_storage::storage_interfaces::ReadOnlyInte
: storage_(storage), node_(std::make_shared<Rosbag2Node>("rosbag2_node"))
{}

bool is_pending(const std::future<void> & future)
bool Player::is_storage_completely_loaded() const
{
return !(future.valid() && future.wait_for(std::chrono::seconds(0)) == std::future_status::ready);
if (storage_loading_future_.valid() &&
storage_loading_future_.wait_for(std::chrono::seconds(0)) == std::future_status::ready)
{
storage_loading_future_.get();
}
return !storage_loading_future_.valid();
}

void Player::play(const Rosbag2PlayOptions & options)
{
prepare_publishers();

auto storage_loading_future = std::async(std::launch::async,
storage_loading_future_ = std::async(std::launch::async,
[this, options]() {load_storage_content(options);});

wait_for_filled_queue(options, storage_loading_future);
wait_for_filled_queue(options);

play_messages_from_queue(std::move(storage_loading_future));
play_messages_from_queue();
}

void Player::wait_for_filled_queue(
const Rosbag2PlayOptions & options, const std::future<void> & storage_loading_future) const
void Player::wait_for_filled_queue(const Rosbag2PlayOptions & options) const
{
while (
message_queue_.size_approx() < options.queue_buffer_length_ &&
is_pending(storage_loading_future))
message_queue_.size_approx() < options.read_ahead_queue_size &&
!is_storage_completely_loaded())
{
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
Expand Down Expand Up @@ -109,18 +113,17 @@ void Player::enqueue_up_to_boundary(const TimePoint & time_first_message, uint64
}
}

void Player::play_messages_from_queue(std::future<void> storage_loading_future)
void Player::play_messages_from_queue()
{
auto start_time = std::chrono::high_resolution_clock::now();

while (message_queue_.size_approx() != 0 || is_pending(storage_loading_future)) {
while (message_queue_.size_approx() != 0 || !is_storage_completely_loaded()) {
ReplayableMessage message;
if (message_queue_.try_dequeue(message)) {
std::this_thread::sleep_until(start_time + message.time_since_start);
publishers_[message.message->topic_name]->publish(message.message->serialized_data);
}
}
storage_loading_future.get();
}

void Player::prepare_publishers()
Expand Down
7 changes: 4 additions & 3 deletions rosbag2/src/rosbag2/player.hpp
Expand Up @@ -44,16 +44,17 @@ class Player

private:
void load_storage_content(const Rosbag2PlayOptions & options);
bool is_storage_completely_loaded() const;
void enqueue_up_to_boundary(const TimePoint & time_first_message, uint64_t boundary);
void wait_for_filled_queue(
const Rosbag2PlayOptions & options, const std::future<void> & storage_loading_future) const;
void play_messages_from_queue(std::future<void> storage_loading_future);
void wait_for_filled_queue(const Rosbag2PlayOptions & options) const;
void play_messages_from_queue();
void prepare_publishers();

static constexpr float read_ahead_lower_bound_percentage_ = 0.9;

std::shared_ptr<rosbag2_storage::storage_interfaces::ReadOnlyInterface> storage_;
moodycamel::ReaderWriterQueue<ReplayableMessage> message_queue_;
mutable std::future<void> storage_loading_future_;
std::shared_ptr<Rosbag2Node> node_;
std::map<std::string, std::shared_ptr<GenericPublisher>> publishers_;
};
Expand Down

0 comments on commit b4845a3

Please sign in to comment.