Skip to content

Commit

Permalink
turn player into a node (ros2#723)
Browse files Browse the repository at this point in the history
Signed-off-by: Karsten Knese <Karsten1987@users.noreply.github.com>
  • Loading branch information
Karsten1987 committed Apr 9, 2021
1 parent 6325857 commit f3f4f10
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 62 deletions.
79 changes: 52 additions & 27 deletions rosbag2_transport/src/rosbag2_transport/player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,37 @@ namespace rosbag2_transport
namespace impl
{

const std::chrono::milliseconds
Player::queue_read_wait_period_ = std::chrono::milliseconds(100);
Player::Player(const std::string & node_name, const rclcpp::NodeOptions & node_options)
: rclcpp::Node(node_name, node_options)
{
// TODO(karsten1987): Use this constructor later with parameter parsing.
// The reader, storage_options as well as play_options can be loaded via parameter.
// That way, the player can be used as a simple component in a component manager.
throw rclcpp::exceptions::UnimplementedError();
}

Player::Player(
std::shared_ptr<rosbag2_cpp::Reader> reader, std::shared_ptr<rclcpp::Node> transport_node)
: reader_(std::move(reader)), transport_node_(transport_node)
std::shared_ptr<rosbag2_cpp::Reader> reader,
const rosbag2_storage::StorageOptions & storage_options,
const rosbag2_transport::PlayOptions & play_options,
const std::string & node_name,
const rclcpp::NodeOptions & node_options)
: rclcpp::Node(
node_name,
rclcpp::NodeOptions(node_options).arguments(play_options.topic_remapping_options)),
reader_(reader),
storage_options_(storage_options),
play_options_(play_options)
{}

Player::~Player()
{
// reader_->reset();
}

const std::chrono::milliseconds
Player::queue_read_wait_period_ = std::chrono::milliseconds(100);

bool Player::is_storage_completely_loaded() const
{
if (storage_loading_future_.valid() &&
Expand All @@ -94,46 +117,48 @@ bool Player::is_storage_completely_loaded() const
return !storage_loading_future_.valid();
}

void Player::play(const PlayOptions & options)
void Player::play()
{
// TODO(karsten1987): Expose `Reader::reset` in rosbag2_cpp
reader_->open(storage_options_, {"", rmw_get_serialization_format()});
if (reader_->has_next()) {
// Reader does not have "peek", so we must "pop" the first message to see its timestamp
auto message = reader_->read_next();
prepare_clock(options, message->time_stamp);
prepare_clock(message->time_stamp);
// Make sure that first message gets played by putting it into the play queue
message_queue_.enqueue(message);
} else {
// The bag contains no messages - there is nothing to play
return;
}

topic_qos_profile_overrides_ = options.topic_qos_profile_overrides;
prepare_publishers(options);
topic_qos_profile_overrides_ = play_options_.topic_qos_profile_overrides;
prepare_publishers();

storage_loading_future_ = std::async(
std::launch::async,
[this, options]() {load_storage_content(options);});
[this]() {load_storage_content();});

wait_for_filled_queue(options);
wait_for_filled_queue();

play_messages_from_queue();
}

void Player::wait_for_filled_queue(const PlayOptions & options) const
void Player::wait_for_filled_queue() const
{
while (
message_queue_.size_approx() < options.read_ahead_queue_size &&
message_queue_.size_approx() < play_options_.read_ahead_queue_size &&
!is_storage_completely_loaded() && rclcpp::ok())
{
std::this_thread::sleep_for(queue_read_wait_period_);
}
}

void Player::load_storage_content(const PlayOptions & options)
void Player::load_storage_content()
{
auto queue_lower_boundary =
static_cast<size_t>(options.read_ahead_queue_size * read_ahead_lower_bound_percentage_);
auto queue_upper_boundary = options.read_ahead_queue_size;
static_cast<size_t>(play_options_.read_ahead_queue_size * read_ahead_lower_bound_percentage_);
auto queue_upper_boundary = play_options_.read_ahead_queue_size;

while (reader_->has_next() && rclcpp::ok()) {
if (message_queue_.size_approx() < queue_lower_boundary) {
Expand Down Expand Up @@ -162,7 +187,7 @@ void Player::play_messages_from_queue()
play_messages_until_queue_empty();
if (!is_storage_completely_loaded() && rclcpp::ok()) {
RCLCPP_WARN(
transport_node_->get_logger(),
this->get_logger(),
"Message queue starved. Messages will be delayed. Consider "
"increasing the --read-ahead-queue-size option.");
}
Expand All @@ -185,10 +210,10 @@ void Player::play_messages_until_queue_empty()
}
}

void Player::prepare_publishers(const PlayOptions & options)
void Player::prepare_publishers()
{
rosbag2_storage::StorageFilter storage_filter;
storage_filter.topics = options.topics_to_filter;
storage_filter.topics = play_options_.topics_to_filter;
reader_->set_filter(storage_filter);

auto topics = reader_->get_all_topics_and_types();
Expand All @@ -204,36 +229,36 @@ void Player::prepare_publishers(const PlayOptions & options)

auto topic_qos = publisher_qos_for_topic(
topic, topic_qos_profile_overrides_,
transport_node_->get_logger());
this->get_logger());
try {
publishers_.insert(
std::make_pair(
topic.name, transport_node_->create_generic_publisher(
topic.name, this->create_generic_publisher(
topic.name, topic.type, topic_qos)));
} catch (const std::runtime_error & e) {
// using a warning log seems better than adding a new option
// to ignore some unknown message type library
RCLCPP_WARN(
transport_node_->get_logger(),
this->get_logger(),
"Ignoring a topic '%s', reason: %s.", topic.name.c_str(), e.what());
}
}
}

void Player::prepare_clock(const PlayOptions & options, rcutils_time_point_value_t starting_time)
void Player::prepare_clock(rcutils_time_point_value_t starting_time)
{
double rate = options.rate > 0.0 ? options.rate : 1.0;
double rate = play_options_.rate > 0.0 ? play_options_.rate : 1.0;
clock_ = std::make_unique<rosbag2_cpp::TimeControllerClock>(starting_time, rate);

// Create /clock publisher
if (options.clock_publish_frequency > 0.f) {
if (play_options_.clock_publish_frequency > 0.f) {
const auto publish_period = std::chrono::nanoseconds(
static_cast<uint64_t>(RCUTILS_S_TO_NS(1) / options.clock_publish_frequency));
static_cast<uint64_t>(RCUTILS_S_TO_NS(1) / play_options_.clock_publish_frequency));
// NOTE: PlayerClock does not own this publisher because rosbag2_cpp
// should not own transport-based functionality
clock_publisher_ = transport_node_->create_publisher<rosgraph_msgs::msg::Clock>(
clock_publisher_ = this->create_publisher<rosgraph_msgs::msg::Clock>(
"/clock", rclcpp::ClockQoS());
clock_publish_timer_ = transport_node_->create_wall_timer(
clock_publish_timer_ = this->create_wall_timer(
publish_period, [this]() {
auto msg = rosgraph_msgs::msg::Clock();
msg.clock = rclcpp::Time(clock_->now());
Expand Down
27 changes: 19 additions & 8 deletions rosbag2_transport/src/rosbag2_transport/player.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

#include "rosbag2_cpp/clocks/player_clock.hpp"
#include "rosbag2_storage/serialized_bag_message.hpp"
#include "rosbag2_storage/storage_options.hpp"
#include "rosbag2_transport/play_options.hpp"
#include "rosgraph_msgs/msg/clock.hpp"

Expand All @@ -43,31 +44,41 @@ namespace rosbag2_transport
namespace impl
{

class Player
class Player : public rclcpp::Node
{
public:
explicit Player(
const std::string & node_name = "rosbag2_player",
const rclcpp::NodeOptions & node_options = rclcpp::NodeOptions());

Player(
std::shared_ptr<rosbag2_cpp::Reader> reader,
std::shared_ptr<rclcpp::Node> transport_node);
const rosbag2_storage::StorageOptions & storage_options,
const rosbag2_transport::PlayOptions & play_options,
const std::string & node_name = "rosbag2_player",
const rclcpp::NodeOptions & node_options = rclcpp::NodeOptions());

virtual ~Player();

void play(const PlayOptions & options);
void play();

private:
void load_storage_content(const PlayOptions & options);
void load_storage_content();
bool is_storage_completely_loaded() const;
void enqueue_up_to_boundary(uint64_t boundary);
void wait_for_filled_queue(const PlayOptions & options) const;
void wait_for_filled_queue() const;
void play_messages_from_queue();
void play_messages_until_queue_empty();
void prepare_publishers(const PlayOptions & options);
void prepare_clock(const PlayOptions & options, rcutils_time_point_value_t starting_time);
void prepare_publishers();
void prepare_clock(rcutils_time_point_value_t starting_time);
static constexpr double read_ahead_lower_bound_percentage_ = 0.9;
static const std::chrono::milliseconds queue_read_wait_period_;

std::shared_ptr<rosbag2_cpp::Reader> reader_;
rosbag2_storage::StorageOptions storage_options_;
rosbag2_transport::PlayOptions play_options_;
moodycamel::ReaderWriterQueue<rosbag2_storage::SerializedBagMessageSharedPtr> message_queue_;
mutable std::future<void> storage_loading_future_;
std::shared_ptr<rclcpp::Node> transport_node_;
std::unordered_map<std::string, std::shared_ptr<rclcpp::GenericPublisher>> publishers_;
std::unordered_map<std::string, rclcpp::QoS> topic_qos_profile_overrides_;
std::unique_ptr<rosbag2_cpp::PlayerClock> clock_;
Expand Down
51 changes: 24 additions & 27 deletions rosbag2_transport/src/rosbag2_transport/rosbag2_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,30 @@ Player::Player()
Player::~Player()
{}

void Player::play(
const rosbag2_storage::StorageOptions & storage_options, const PlayOptions & play_options)
{
auto player = std::make_shared<impl::Player>(reader_, storage_options, play_options);
rclcpp::executors::SingleThreadedExecutor exec;
exec.add_node(player);
auto spin_thread = std::thread(
[&exec]() {
exec.spin();
});
auto exit = rcpputils::scope_exit(
[&exec, &spin_thread]() {
exec.cancel();
spin_thread.join();
});
try {
do {
player->play();
} while (rclcpp::ok() && play_options.loop);
} catch (std::runtime_error & e) {
RCLCPP_ERROR(player->get_logger(), "Failed to play: %s", e.what());
}
}

Recorder::Recorder(std::shared_ptr<rosbag2_cpp::Writer> writer)
: writer_(std::move(writer))
{}
Expand Down Expand Up @@ -89,31 +113,4 @@ void Recorder::record(
}
}

void Player::play(
const rosbag2_storage::StorageOptions & storage_options, const PlayOptions & play_options)
{
auto transport_node =
setup_node(play_options.node_prefix, play_options.topic_remapping_options);
rclcpp::executors::SingleThreadedExecutor exec;
exec.add_node(transport_node);
auto spin_thread = std::thread(
[&exec]() {
exec.spin();
});
auto exit = rcpputils::scope_exit(
[&exec, &spin_thread]() {
exec.cancel();
spin_thread.join();
});
try {
impl::Player player(reader_, transport_node);
do {
reader_->open(storage_options, {"", rmw_get_serialization_format()});
player.play(play_options);
} while (rclcpp::ok() && play_options.loop);
} catch (std::runtime_error & e) {
RCLCPP_ERROR(transport_node->get_logger(), "Failed to play: %s", e.what());
}
}

} // namespace rosbag2_transport

0 comments on commit f3f4f10

Please sign in to comment.