Skip to content

Commit

Permalink
player owns the reader (ros2#725)
Browse files Browse the repository at this point in the history
* player owns the reader

Signed-off-by: Karsten Knese <Karsten1987@users.noreply.github.com>

* adapt rosbag2_py

Signed-off-by: Karsten Knese <Karsten1987@users.noreply.github.com>

* expose rosbag2_cpp::Reader::reset

Signed-off-by: Karsten Knese <Karsten1987@users.noreply.github.com>

* release reader from player to play in loop

Signed-off-by: Karsten Knese <Karsten1987@users.noreply.github.com>

* linters

Signed-off-by: Karsten Knese <Karsten1987@users.noreply.github.com>

* introduce timeout for subscription manager

Signed-off-by: Karsten Knese <Karsten1987@users.noreply.github.com>
  • Loading branch information
Karsten1987 committed Apr 19, 2021
1 parent 62fe091 commit 28ed9d5
Show file tree
Hide file tree
Showing 14 changed files with 276 additions and 159 deletions.
5 changes: 5 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ class ROSBAG2_CPP_PUBLIC Reader final
const rosbag2_storage::StorageOptions & storage_options,
const ConverterOptions & converter_options = ConverterOptions());

/**
* Closing the reader instance.
*/
void reset();

/**
* Ask whether the underlying bagfile contains at least one more message.
*
Expand Down
5 changes: 5 additions & 0 deletions rosbag2_cpp/src/rosbag2_cpp/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ void Reader::open(
reader_impl_->open(storage_options, converter_options);
}

void Reader::reset()
{
reader_impl_->reset();
}

bool Reader::has_next()
{
return reader_impl_->has_next();
Expand Down
8 changes: 4 additions & 4 deletions rosbag2_py/src/rosbag2_py/_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,25 +102,25 @@ class Player
const rosbag2_storage::StorageOptions & storage_options,
PlayOptions & play_options)
{
std::shared_ptr<rosbag2_cpp::Reader> reader = nullptr;
std::unique_ptr<rosbag2_cpp::Reader> reader = nullptr;
// Determine whether to build compression or regular reader
{
rosbag2_storage::MetadataIo metadata_io{};
rosbag2_storage::BagMetadata metadata{};
if (metadata_io.metadata_file_exists(storage_options.uri)) {
metadata = metadata_io.read_metadata(storage_options.uri);
if (!metadata.compression_format.empty()) {
reader = std::make_shared<rosbag2_cpp::Reader>(
reader = std::make_unique<rosbag2_cpp::Reader>(
std::make_unique<rosbag2_compression::SequentialCompressionReader>());
}
}
if (reader == nullptr) {
reader = std::make_shared<rosbag2_cpp::Reader>(
reader = std::make_unique<rosbag2_cpp::Reader>(
std::make_unique<rosbag2_cpp::readers::SequentialReader>());
}
}

rosbag2_transport::Player impl(reader);
rosbag2_transport::Player impl(std::move(reader));
impl.play(storage_options, play_options);
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,23 @@ class SubscriptionManager
return async(
std::launch::async, [this]() {
rclcpp::executors::SingleThreadedExecutor exec;
while (rclcpp::ok() && continue_spinning(expected_topics_with_size_)) {
auto start = std::chrono::high_resolution_clock::now();
while (rclcpp::ok() && continue_spinning(expected_topics_with_size_, start)) {
exec.spin_node_some(subscriber_node_);
}
});
}

private:
bool continue_spinning(std::unordered_map<std::string, size_t> expected_topics_with_sizes)
bool continue_spinning(
std::unordered_map<std::string, size_t> expected_topics_with_sizes,
std::chrono::time_point<std::chrono::high_resolution_clock> start_time)
{
auto current = std::chrono::high_resolution_clock::now();
if (current - start_time > std::chrono::seconds(10)) {
return false;
}

for (const auto & topic_expected : expected_topics_with_sizes) {
if (subscribed_messages_[topic_expected.first].size() < topic_expected.second) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ namespace rosbag2_transport
struct PlayOptions
{
public:
size_t read_ahead_queue_size;
size_t read_ahead_queue_size = 1000;
std::string node_prefix = "";
float rate = 1.0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class Player
Player();

ROSBAG2_TRANSPORT_PUBLIC
explicit Player(std::shared_ptr<rosbag2_cpp::Reader> reader);
explicit Player(std::unique_ptr<rosbag2_cpp::Reader> reader);

ROSBAG2_TRANSPORT_PUBLIC
virtual ~Player();
Expand All @@ -59,7 +59,7 @@ class Player
const PlayOptions & play_options);

protected:
std::shared_ptr<rosbag2_cpp::Reader> reader_;
std::unique_ptr<rosbag2_cpp::Reader> reader_;
};

class Recorder
Expand Down
17 changes: 14 additions & 3 deletions rosbag2_transport/src/rosbag2_transport/player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,22 +86,30 @@ Player::Player(const std::string & node_name, const rclcpp::NodeOptions & node_o
}

Player::Player(
std::shared_ptr<rosbag2_cpp::Reader> reader,
std::unique_ptr<rosbag2_cpp::Reader> reader,
const rosbag2_storage::StorageOptions & storage_options,
const rosbag2_transport::PlayOptions & play_options,
const std::string & node_name,
const rclcpp::NodeOptions & node_options)
: rclcpp::Node(
node_name,
rclcpp::NodeOptions(node_options).arguments(play_options.topic_remapping_options)),
reader_(reader),
reader_(std::move(reader)),
storage_options_(storage_options),
play_options_(play_options)
{}

Player::~Player()
{
// reader_->reset();
if (reader_) {
reader_->reset();
}
}

rosbag2_cpp::Reader * Player::release_reader()
{
reader_->reset();
return reader_.release();
}

const std::chrono::milliseconds
Expand Down Expand Up @@ -218,6 +226,9 @@ void Player::prepare_publishers()

auto topics = reader_->get_all_topics_and_types();
for (const auto & topic : topics) {
if (publishers_.find(topic.name) != publishers_.end()) {
continue;
}
// filter topics to add publishers if necessary
auto & filter_topics = storage_filter.topics;
if (!filter_topics.empty()) {
Expand Down
6 changes: 4 additions & 2 deletions rosbag2_transport/src/rosbag2_transport/player.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class Player : public rclcpp::Node
const rclcpp::NodeOptions & node_options = rclcpp::NodeOptions());

Player(
std::shared_ptr<rosbag2_cpp::Reader> reader,
std::unique_ptr<rosbag2_cpp::Reader> reader,
const rosbag2_storage::StorageOptions & storage_options,
const rosbag2_transport::PlayOptions & play_options,
const std::string & node_name = "rosbag2_player",
Expand All @@ -62,6 +62,8 @@ class Player : public rclcpp::Node

void play();

rosbag2_cpp::Reader * release_reader();

private:
void load_storage_content();
bool is_storage_completely_loaded() const;
Expand All @@ -74,7 +76,7 @@ class Player : public rclcpp::Node
static constexpr double read_ahead_lower_bound_percentage_ = 0.9;
static const std::chrono::milliseconds queue_read_wait_period_;

std::shared_ptr<rosbag2_cpp::Reader> reader_;
std::unique_ptr<rosbag2_cpp::Reader> reader_;
rosbag2_storage::StorageOptions storage_options_;
rosbag2_transport::PlayOptions play_options_;
moodycamel::ReaderWriterQueue<rosbag2_storage::SerializedBagMessageSharedPtr> message_queue_;
Expand Down
12 changes: 7 additions & 5 deletions rosbag2_transport/src/rosbag2_transport/rosbag2_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@
namespace rosbag2_transport
{

Player::Player(std::shared_ptr<rosbag2_cpp::Reader> reader)
Player::Player(std::unique_ptr<rosbag2_cpp::Reader> reader)
: reader_(std::move(reader))
{}

Player::Player()
: Player(std::make_shared<rosbag2_cpp::Reader>(
: Player(std::make_unique<rosbag2_cpp::Reader>(
std::make_unique<rosbag2_cpp::readers::SequentialReader>()))
{}

Expand All @@ -53,17 +53,18 @@ Player::~Player()
void Player::play(
const rosbag2_storage::StorageOptions & storage_options, const PlayOptions & play_options)
{
auto player = std::make_shared<impl::Player>(reader_, storage_options, play_options);
auto player = std::make_shared<impl::Player>(std::move(reader_), storage_options, play_options);
rclcpp::executors::SingleThreadedExecutor exec;
exec.add_node(player);
auto spin_thread = std::thread(
[&exec]() {
exec.spin();
});
auto exit = rcpputils::scope_exit(
[&exec, &spin_thread]() {
[&player, &exec, &spin_thread, this]() {
exec.cancel();
spin_thread.join();
reader_ = std::unique_ptr<rosbag2_cpp::Reader>(player->release_reader());
});
try {
do {
Expand All @@ -89,7 +90,8 @@ Recorder::~Recorder()
void Recorder::record(
const rosbag2_storage::StorageOptions & storage_options, const RecordOptions & record_options)
{
auto recorder = std::make_shared<impl::Recorder>(writer_, storage_options, record_options);
auto recorder = std::make_shared<impl::Recorder>(
writer_, storage_options, record_options);
try {
recorder->record();
rclcpp::executors::SingleThreadedExecutor exec;
Expand Down
Loading

0 comments on commit 28ed9d5

Please sign in to comment.