Skip to content

Commit

Permalink
ros2GH-27 Minor refactoring for readability
Browse files Browse the repository at this point in the history
  • Loading branch information
Martin-Idel-SI authored and anhosi committed Nov 30, 2018
1 parent 81050be commit 7592b16
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 30 deletions.
Expand Up @@ -27,7 +27,7 @@ struct RecordOptions
bool all;
std::vector<std::string> topics;
std::string rmw_serialization_format;
std::chrono::milliseconds topic_polling_frequency;
std::chrono::milliseconds topic_polling_interval;
};

} // namespace rosbag2_transport
Expand Down
77 changes: 50 additions & 27 deletions rosbag2_transport/src/rosbag2_transport/recorder.cpp
Expand Up @@ -16,6 +16,7 @@

#include <algorithm>
#include <future>
#include <map>
#include <memory>
#include <string>
#include <utility>
Expand All @@ -35,8 +36,10 @@ void Recorder::record(const RecordOptions & record_options)
{
ROSBAG2_TRANSPORT_LOG_INFO("Setup complete. Listening for topics...");
auto discovery_future = launch_topics_discovery(
record_options.topic_polling_frequency, record_options.topics);
rclcpp::spin(node_);
record_options.topic_polling_interval, record_options.topics);

record_messages();

discovery_future.wait();
subscriptions_.clear();
}
Expand Down Expand Up @@ -66,42 +69,62 @@ Recorder::create_subscription(
}

std::future<void> Recorder::launch_topics_discovery(
std::chrono::milliseconds topic_polling_frequency, std::vector<std::string> topics)
std::chrono::milliseconds topic_polling_interval,
const std::vector<std::string> & topics_to_record)
{
auto subscribe_to_topics = [this, topics, topic_polling_frequency] {
auto subscribe_to_topics = [this, topics_to_record, topic_polling_interval] {
while (rclcpp::ok()) {
auto all_topics_and_types = topics.empty() ?
auto all_topics_and_types_to_subscribe = topics_to_record.empty() ?
node_->get_all_topics_with_types() :
node_->get_topics_with_types(topics);
node_->get_topics_with_types(topics_to_record);

if (!topics.empty() && subscriptions_.size() == topics.size()) {
if (is_every_topic_subscribed(topics_to_record)) {
return;
}

for (const auto & topic_with_type : all_topics_and_types) {
std::string topic_name = topic_with_type.first;
bool already_subscribed = std::find(
subscribed_topics_.begin(),
subscribed_topics_.end(),
topic_with_type.first) != subscribed_topics_.end();

if (!already_subscribed) {
std::string topic_type = topic_with_type.second;
auto subscription = create_subscription(topic_name, topic_type);
if (subscription) {
subscribed_topics_.push_back(topic_name);
subscriptions_.push_back(subscription);
writer_->create_topic({topic_name, topic_type});
ROSBAG2_TRANSPORT_LOG_INFO_STREAM(
"Subscribed to topic '" << topic_name << "'");
}
}
}
std::this_thread::sleep_for(topic_polling_frequency);
subscribe_all_missing_topics(all_topics_and_types_to_subscribe);
std::this_thread::sleep_for(topic_polling_interval);
}
};

return std::async(std::launch::async, subscribe_to_topics);
}

bool Recorder::is_every_topic_subscribed(
const std::vector<std::string> & topics_to_record) const
{
return !topics_to_record.empty() && subscriptions_.size() == topics_to_record.size();
}

void Recorder::subscribe_all_missing_topics(
const std::map<std::string, std::string> & all_topics_and_types)
{
for (const auto & topic_with_type : all_topics_and_types) {
bool already_subscribed = find(
subscribed_topics_.begin(),
subscribed_topics_.end(),
topic_with_type.first) != subscribed_topics_.end();

if (!already_subscribed) {
subscribe_topic({topic_with_type.first, topic_with_type.second});
}
}
}

void Recorder::subscribe_topic(const rosbag2::TopicWithType & topic)
{
auto subscription = create_subscription(topic.name, topic.type);
if (subscription) {
subscribed_topics_.push_back(topic.name);
subscriptions_.push_back(subscription);
writer_->create_topic(topic);
ROSBAG2_TRANSPORT_LOG_INFO_STREAM("Subscribed to topic '" << topic.name << "'");
}
}

void Recorder::record_messages() const
{
spin(node_);
}

} // namespace rosbag2_transport
12 changes: 11 additions & 1 deletion rosbag2_transport/src/rosbag2_transport/recorder.hpp
Expand Up @@ -16,10 +16,14 @@
#define ROSBAG2_TRANSPORT__RECORDER_HPP_

#include <future>
#include <map>
#include <memory>
#include <string>
#include <vector>
#include <utility>

#include "rosbag2/types.hpp"
#include "rosbag2/writer.hpp"
#include "rosbag2_transport/record_options.hpp"

namespace rosbag2
Expand All @@ -44,7 +48,13 @@ class Recorder
std::shared_ptr<GenericSubscription> create_subscription(
const std::string & topic_name, const std::string & topic_type);
std::future<void> launch_topics_discovery(
std::chrono::milliseconds topic_polling_frequency, std::vector<std::string> topics = {});
std::chrono::milliseconds topic_polling_interval,
const std::vector<std::string> & topics_to_record = {});
void subscribe_all_missing_topics(
const std::map<std::string, std::string> & all_topics_and_types);
void subscribe_topic(const rosbag2::TopicWithType & topic_with_type);
bool is_every_topic_subscribed(const std::vector<std::string> & topics_to_record) const;
void record_messages() const;

std::shared_ptr<rosbag2::Writer> writer_;
std::shared_ptr<Rosbag2Node> node_;
Expand Down
Expand Up @@ -49,7 +49,7 @@ rosbag2_transport_record(PyObject * Py_UNUSED(self), PyObject * args, PyObject *
storage_options.uri = std::string(uri);
storage_options.storage_id = std::string(storage_id);
record_options.all = all;
record_options.topic_polling_frequency = std::chrono::milliseconds(100);
record_options.topic_polling_interval = std::chrono::milliseconds(100);

if (topics) {
PyObject * topic_iterator = PyObject_GetIter(topics);
Expand Down

0 comments on commit 7592b16

Please sign in to comment.