Skip to content

Commit

Permalink
turn recorder into a node (ros2#724)
Browse files Browse the repository at this point in the history
* turn recorder into a node

Signed-off-by: Karsten Knese <Karsten1987@users.noreply.github.com>

* remove unused function

Signed-off-by: Karsten Knese <Karsten1987@users.noreply.github.com>
  • Loading branch information
Karsten1987 committed Apr 9, 2021
1 parent f3f4f10 commit cc12254
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 81 deletions.
126 changes: 72 additions & 54 deletions rosbag2_transport/src/rosbag2_transport/recorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,92 +48,115 @@ namespace rosbag2_transport
{
namespace impl
{
Recorder::Recorder(
const std::string & node_name,
const rclcpp::NodeOptions & node_options)
: rclcpp::Node(node_name, node_options)
{
// TODO(karsten1987): Use this constructor later with parameter parsing.
// The reader, storage_options as well as record_options can be loaded via parameter.
// That way, the recorder can be used as a simple component in a component manager.
throw rclcpp::exceptions::UnimplementedError();
}

Recorder::Recorder(
std::shared_ptr<rosbag2_cpp::Writer> writer,
std::shared_ptr<rclcpp::Node> transport_node)
: writer_(std::move(writer)), transport_node_(std::move(transport_node)) {}
const rosbag2_storage::StorageOptions & storage_options,
const rosbag2_transport::RecordOptions & record_options,
const std::string & node_name,
const rclcpp::NodeOptions & node_options)
: rclcpp::Node(node_name, rclcpp::NodeOptions(node_options).start_parameter_event_publisher(false)),
writer_(std::move(writer)),
storage_options_(storage_options),
record_options_(record_options),
stop_discovery_(record_options_.is_discovery_disabled)
{
}

void Recorder::record(const RecordOptions & record_options)
Recorder::~Recorder()
{
topic_qos_profile_overrides_ = record_options.topic_qos_profile_overrides;
if (record_options.rmw_serialization_format.empty()) {
throw std::runtime_error("No serialization format specified!");
}
serialization_format_ = record_options.rmw_serialization_format;
RCLCPP_INFO(transport_node_->get_logger(), "Listening for topics...");
subscribe_topics(get_requested_or_available_topics(record_options));

std::future<void> discovery_future;
if (!record_options.is_discovery_disabled) {
auto discovery = std::bind(
&Recorder::topics_discovery, this,
record_options);
discovery_future = std::async(std::launch::async, discovery);
stop_discovery_ = true;
if (discovery_future_.valid()) {
discovery_future_.wait();
}

record_messages();
subscriptions_.clear();
}

if (discovery_future.valid()) {
discovery_future.wait();
void Recorder::record()
{
topic_qos_profile_overrides_ = record_options_.topic_qos_profile_overrides;
if (record_options_.rmw_serialization_format.empty()) {
throw std::runtime_error("No serialization format specified!");
}

subscriptions_.clear();
writer_->open(
storage_options_,
{rmw_get_serialization_format(), record_options_.rmw_serialization_format});

serialization_format_ = record_options_.rmw_serialization_format;
RCLCPP_INFO(this->get_logger(), "Listening for topics...");
subscribe_topics(get_requested_or_available_topics());

if (!record_options_.is_discovery_disabled) {
discovery_future_ =
std::async(std::launch::async, std::bind(&Recorder::topics_discovery, this));
}
}

void Recorder::topics_discovery(const RecordOptions & record_options)
void Recorder::topics_discovery()
{
while (rclcpp::ok()) {
while (rclcpp::ok() && stop_discovery_ == false) {
auto topics_to_subscribe =
get_requested_or_available_topics(record_options);
get_requested_or_available_topics();
for (const auto & topic_and_type : topics_to_subscribe) {
warn_if_new_qos_for_subscribed_topic(topic_and_type.first);
}
auto missing_topics = get_missing_topics(topics_to_subscribe);
subscribe_topics(missing_topics);

if (!record_options.topics.empty() && subscriptions_.size() == record_options.topics.size()) {
if (!record_options_.topics.empty() && subscriptions_.size() == record_options_.topics.size()) {
RCLCPP_INFO(
transport_node_->get_logger(),
this->get_logger(),
"All requested topics are subscribed. Stopping discovery...");
return;
}
std::this_thread::sleep_for(record_options.topic_polling_interval);
std::this_thread::sleep_for(record_options_.topic_polling_interval);
}
}

std::unordered_map<std::string, std::string>
Recorder::get_requested_or_available_topics(const RecordOptions & record_options)
Recorder::get_requested_or_available_topics()
{
auto all_topics_and_types = transport_node_->get_topic_names_and_types();
auto all_topics_and_types = this->get_topic_names_and_types();
auto filtered_topics_and_types = topic_filter::filter_topics_with_more_than_one_type(
all_topics_and_types, record_options.include_hidden_topics);
all_topics_and_types, record_options_.include_hidden_topics);

if (!record_options.topics.empty()) {
if (!record_options_.topics.empty()) {
// expand specified topics
std::vector<std::string> expanded_topics;
expanded_topics.reserve(record_options.topics.size());
for (const auto & topic : record_options.topics) {
expanded_topics.reserve(record_options_.topics.size());
for (const auto & topic : record_options_.topics) {
expanded_topics.push_back(
rclcpp::expand_topic_or_service_name(
topic, transport_node_->get_name(), transport_node_->get_namespace(), false));
topic, this->get_name(), this->get_namespace(), false));
}
filtered_topics_and_types = topic_filter::filter_topics(
expanded_topics, filtered_topics_and_types);
}

if (record_options.regex.empty() && record_options.exclude.empty()) {
if (record_options_.regex.empty() && record_options_.exclude.empty()) {
return filtered_topics_and_types;
}

std::unordered_map<std::string, std::string> filtered_by_regex;

std::regex topic_regex(record_options.regex);
std::regex exclude_regex(record_options.exclude);
bool take = record_options.all;
// for (const auto & kv : all_topics_and_types) {
std::regex topic_regex(record_options_.regex);
std::regex exclude_regex(record_options_.exclude);
bool take = record_options_.all;
for (const auto & kv : filtered_topics_and_types) {
// regex_match returns false for 'empty' regex
if (!record_options.regex.empty()) {
if (!record_options_.regex.empty()) {
take = std::regex_match(kv.first, topic_regex);
}
if (take) {
Expand Down Expand Up @@ -185,7 +208,7 @@ void Recorder::subscribe_topic(const rosbag2_storage::TopicMetadata & topic)
if (subscription) {
subscriptions_.insert({topic.name, subscription});
RCLCPP_INFO_STREAM(
transport_node_->get_logger(),
this->get_logger(),
"Subscribed to topic '" << topic.name << "'");
} else {
writer_->remove_topic(topic);
Expand All @@ -197,7 +220,7 @@ std::shared_ptr<rclcpp::GenericSubscription>
Recorder::create_subscription(
const std::string & topic_name, const std::string & topic_type, const rclcpp::QoS & qos)
{
auto subscription = transport_node_->create_generic_subscription(
auto subscription = this->create_generic_subscription(
topic_name,
topic_type,
qos,
Expand All @@ -222,7 +245,7 @@ Recorder::create_subscription(
int error = rcutils_system_time_now(&time_stamp);
if (error != RCUTILS_RET_OK) {
RCLCPP_ERROR_STREAM(
transport_node_->get_logger(),
this->get_logger(),
"Error getting current time. Error:" << rcutils_get_error_string().str);
}
bag_message->time_stamp = time_stamp;
Expand All @@ -232,15 +255,10 @@ Recorder::create_subscription(
return subscription;
}

void Recorder::record_messages() const
{
spin(transport_node_);
}

std::string Recorder::serialized_offered_qos_profiles_for_topic(const std::string & topic_name)
{
YAML::Node offered_qos_profiles;
auto endpoints = transport_node_->get_publishers_info_by_topic(topic_name);
auto endpoints = this->get_publishers_info_by_topic(topic_name);
for (const auto & info : endpoints) {
offered_qos_profiles.push_back(Rosbag2QoS(info.qos_profile()));
}
Expand All @@ -251,12 +269,12 @@ rclcpp::QoS Recorder::subscription_qos_for_topic(const std::string & topic_name)
{
if (topic_qos_profile_overrides_.count(topic_name)) {
RCLCPP_INFO_STREAM(
transport_node_->get_logger(),
this->get_logger(),
"Overriding subscription profile for " << topic_name);
return topic_qos_profile_overrides_.at(topic_name);
}
return Rosbag2QoS::adapt_request_to_offers(
topic_name, transport_node_->get_publishers_info_by_topic(topic_name));
topic_name, this->get_publishers_info_by_topic(topic_name));
}

void Recorder::warn_if_new_qos_for_subscribed_topic(const std::string & topic_name)
Expand All @@ -271,7 +289,7 @@ void Recorder::warn_if_new_qos_for_subscribed_topic(const std::string & topic_na
return;
}
const auto & used_profile = existing_subscription->second->get_actual_qos().get_rmw_qos_profile();
auto publishers_info = transport_node_->get_publishers_info_by_topic(topic_name);
auto publishers_info = this->get_publishers_info_by_topic(topic_name);
for (const auto & info : publishers_info) {
auto new_profile = info.qos_profile().get_rmw_qos_profile();
bool incompatible_reliability =
Expand All @@ -283,15 +301,15 @@ void Recorder::warn_if_new_qos_for_subscribed_topic(const std::string & topic_na

if (incompatible_reliability) {
RCLCPP_WARN_STREAM(
transport_node_->get_logger(),
this->get_logger(),
"A new publisher for subscribed topic " << topic_name << " "
"was found offering RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT, "
"but rosbag already subscribed requesting RMW_QOS_POLICY_RELIABILITY_RELIABLE. "
"Messages from this new publisher will not be recorded.");
topics_warned_about_incompatibility_.insert(topic_name);
} else if (incompatible_durability) {
RCLCPP_WARN_STREAM(
transport_node_->get_logger(),
this->get_logger(),
"A new publisher for susbcribed topic " << topic_name << " "
"was found offering RMW_QOS_POLICY_DURABILITY_VOLATILE, "
"but rosbag2 already subscribed requesting RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL. "
Expand Down
27 changes: 19 additions & 8 deletions rosbag2_transport/src/rosbag2_transport/recorder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <utility>
#include <vector>

#include "rclcpp/node.hpp"
#include "rclcpp/qos.hpp"

#include "rosbag2_cpp/writer.hpp"
Expand All @@ -41,14 +42,23 @@ namespace rosbag2_transport
namespace impl
{

class Recorder
class Recorder : public rclcpp::Node
{
public:
explicit Recorder(
const std::string & node_name = "rosbag2_recorder",
const rclcpp::NodeOptions & node_options = rclcpp::NodeOptions());

Recorder(
std::shared_ptr<rosbag2_cpp::Writer> writer,
std::shared_ptr<rclcpp::Node> transport_node);
const rosbag2_storage::StorageOptions & storage_options,
const rosbag2_transport::RecordOptions & record_options,
const std::string & node_name = "rosbag2_recorder",
const rclcpp::NodeOptions & node_options = rclcpp::NodeOptions());

virtual ~Recorder();

void record(const RecordOptions & record_options);
void record();

const std::unordered_set<std::string> &
topics_using_fallback_qos() const
Expand All @@ -63,10 +73,10 @@ class Recorder
}

private:
void topics_discovery(const RecordOptions & record_options);
void topics_discovery();

std::unordered_map<std::string, std::string>
get_requested_or_available_topics(const RecordOptions & record_options);
get_requested_or_available_topics();

std::unordered_map<std::string, std::string>
get_missing_topics(const std::unordered_map<std::string, std::string> & all_topics);
Expand All @@ -79,8 +89,6 @@ class Recorder
std::shared_ptr<rclcpp::GenericSubscription> create_subscription(
const std::string & topic_name, const std::string & topic_type, const rclcpp::QoS & qos);

void record_messages() const;

/**
* Find the QoS profile that should be used for subscribing.
*
Expand All @@ -98,7 +106,10 @@ class Recorder
void warn_if_new_qos_for_subscribed_topic(const std::string & topic_name);

std::shared_ptr<rosbag2_cpp::Writer> writer_;
std::shared_ptr<rclcpp::Node> transport_node_;
rosbag2_storage::StorageOptions storage_options_;
rosbag2_transport::RecordOptions record_options_;
std::atomic<bool> stop_discovery_;
std::future<void> discovery_future_;
std::unordered_map<std::string, std::shared_ptr<rclcpp::GenericSubscription>> subscriptions_;
std::unordered_set<std::string> topics_warned_about_incompatibility_;
std::string serialization_format_;
Expand Down
33 changes: 14 additions & 19 deletions rosbag2_transport/src/rosbag2_transport/rosbag2_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,6 @@
#include "player.hpp"
#include "recorder.hpp"

namespace
{
std::shared_ptr<rclcpp::Node> setup_node(
std::string node_prefix = "",
const std::vector<std::string> & topic_remapping_options = {})
{
auto node_options = rclcpp::NodeOptions().arguments(topic_remapping_options);
return std::make_shared<rclcpp::Node>(node_prefix + "_rosbag2", node_options);
}
} // namespace

namespace rosbag2_transport
{

Expand Down Expand Up @@ -100,16 +89,22 @@ Recorder::~Recorder()
void Recorder::record(
const rosbag2_storage::StorageOptions & storage_options, const RecordOptions & record_options)
{
auto recorder = std::make_shared<impl::Recorder>(writer_, storage_options, record_options);
try {
writer_->open(
storage_options, {rmw_get_serialization_format(), record_options.rmw_serialization_format});

auto transport_node = setup_node(record_options.node_prefix);

impl::Recorder recorder(writer_, transport_node);
recorder.record(record_options);
recorder->record();
rclcpp::executors::SingleThreadedExecutor exec;
exec.add_node(recorder);
auto spin_thread = std::thread(
[&exec]() {
exec.spin();
});
auto exit = rcpputils::scope_exit(
[&exec, &spin_thread]() {
exec.cancel();
spin_thread.join();
});
} catch (std::runtime_error & e) {
RCLCPP_ERROR(rclcpp::get_logger("rosbag2_transport"), "Failed to record: %s", e.what());
RCLCPP_ERROR(recorder->get_logger(), "Failed to record: %s", e.what());
}
}

Expand Down

0 comments on commit cc12254

Please sign in to comment.