Skip to content

Commit

Permalink
Remove some code duplication between SequentialWriter and SequentialC…
Browse files Browse the repository at this point in the history
…ompressionWriter (ros2#527)

Remove code duplication between SequentialWriter and SequentialCompressionWriter

Signed-off-by: Jaison Titus <jaisontj@amazon.com>
  • Loading branch information
jaisontj committed Oct 2, 2020
1 parent 9b72b4a commit d745972
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 203 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
#include "rosbag2_cpp/serialization_format_converter_factory.hpp"
#include "rosbag2_cpp/serialization_format_converter_factory_interface.hpp"
#include "rosbag2_cpp/storage_options.hpp"
#include "rosbag2_cpp/writer_interfaces/base_writer_interface.hpp"
#include "rosbag2_cpp/writers/sequential_writer.hpp"

#include "rosbag2_storage/metadata_io.hpp"
#include "rosbag2_storage/storage_factory.hpp"
Expand All @@ -48,7 +48,7 @@ namespace rosbag2_compression
{

class ROSBAG2_COMPRESSION_PUBLIC SequentialCompressionWriter
: public rosbag2_cpp::writer_interfaces::BaseWriterInterface
: public rosbag2_cpp::writers::SequentialWriter
{
public:
explicit SequentialCompressionWriter(
Expand Down Expand Up @@ -81,33 +81,6 @@ class ROSBAG2_COMPRESSION_PUBLIC SequentialCompressionWriter
*/
void reset() override;

/**
* Create a new topic in the underlying storage. Needs to be called for every topic used within
* a message which is passed to write(...).
*
* \param topic_with_type name and type identifier of topic to be created
* \throws runtime_error if the Writer is not open.
*/
void create_topic(const rosbag2_storage::TopicMetadata & topic_with_type) override;

/**
* Remove a new topic in the underlying storage.
* If creation of subscription fails remove the topic
* from the db (more of cleanup)
*
* \param topic_with_type name and type identifier of topic to be created
* \throws runtime_error if the Writer is not open.
*/
void remove_topic(const rosbag2_storage::TopicMetadata & topic_with_type) override;

/**
* Write a message to a bagfile. The topic needs to have been created before writing is possible.
*
* \param message to be written to the bagfile
* \throws runtime_error if the Writer is not open.
*/
void write(std::shared_ptr<rosbag2_storage::SerializedBagMessage> message) override;

protected:
/**
* Compress the most recent file and update the metadata file path.
Expand All @@ -132,38 +105,27 @@ class ROSBAG2_COMPRESSION_PUBLIC SequentialCompressionWriter
virtual void setup_compression();

private:
std::string base_folder_;
std::unique_ptr<rosbag2_storage::StorageFactoryInterface> storage_factory_{};
std::shared_ptr<rosbag2_cpp::SerializationFormatConverterFactoryInterface> converter_factory_{};
std::shared_ptr<rosbag2_storage::storage_interfaces::ReadWriteInterface> storage_{};
std::unique_ptr<rosbag2_storage::MetadataIo> metadata_io_{};
std::unique_ptr<rosbag2_cpp::Converter> converter_{};
std::unique_ptr<rosbag2_compression::BaseCompressorInterface> compressor_{};
std::unique_ptr<rosbag2_compression::CompressionFactory> compression_factory_{};

// Used in bagfile splitting; specifies the best-effort maximum sub-section of a bagfile in bytes.
uint64_t max_bagfile_size_{rosbag2_storage::storage_interfaces::MAX_BAGFILE_SIZE_NO_SPLIT};

// Used to track topic -> message count
std::unordered_map<std::string, rosbag2_storage::TopicInformation> topics_names_to_info_{};

rosbag2_storage::BagMetadata metadata_{};

rosbag2_compression::CompressionOptions compression_options_{};

bool should_compress_last_file_{true};

// Closes the current backed storage and opens the next bagfile.
void split_bagfile();

// Checks if the current recording bagfile needs to be split and rolled over to a new file.
bool should_split_bagfile() const;
void split_bagfile() override;

// Prepares the metadata by setting initial values.
void init_metadata();

// Record TopicInformation into metadata
void finalize_metadata();
void init_metadata() override;

// Helper method used by write to get the message in a format that is ready to be written.
// Common use cases include converting the message using the converter or
// performing other operations like compression on it
std::shared_ptr<rosbag2_storage::SerializedBagMessage>
get_writeable_message(
std::shared_ptr<rosbag2_storage::SerializedBagMessage> message) override;
};

} // namespace rosbag2_compression

#endif // ROSBAG2_COMPRESSION__SEQUENTIAL_COMPRESSION_WRITER_HPP_
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,7 @@ std::string format_storage_uri(const std::string & base_folder, uint64_t storage

SequentialCompressionWriter::SequentialCompressionWriter(
const rosbag2_compression::CompressionOptions & compression_options)
: storage_factory_{std::make_unique<rosbag2_storage::StorageFactory>()},
converter_factory_{std::make_shared<rosbag2_cpp::SerializationFormatConverterFactory>()},
metadata_io_{std::make_unique<rosbag2_storage::MetadataIo>()},
: SequentialWriter(),
compression_factory_{std::make_unique<rosbag2_compression::CompressionFactory>()},
compression_options_{compression_options}
{}
Expand All @@ -66,9 +64,7 @@ SequentialCompressionWriter::SequentialCompressionWriter(
std::unique_ptr<rosbag2_storage::StorageFactoryInterface> storage_factory,
std::shared_ptr<rosbag2_cpp::SerializationFormatConverterFactoryInterface> converter_factory,
std::unique_ptr<rosbag2_storage::MetadataIo> metadata_io)
: storage_factory_{std::move(storage_factory)},
converter_factory_{std::move(converter_factory)},
metadata_io_{std::move(metadata_io)},
: SequentialWriter(std::move(storage_factory), converter_factory, std::move(metadata_io)),
compression_factory_{std::move(compression_factory)},
compression_options_{compression_options}
{}
Expand Down Expand Up @@ -103,50 +99,11 @@ void SequentialCompressionWriter::open(
const rosbag2_cpp::StorageOptions & storage_options,
const rosbag2_cpp::ConverterOptions & converter_options)
{
max_bagfile_size_ = storage_options.max_bagfile_size;
base_folder_ = storage_options.uri;

if (converter_options.output_serialization_format !=
converter_options.input_serialization_format)
{
converter_ = std::make_unique<rosbag2_cpp::Converter>(converter_options, converter_factory_);
}

rcpputils::fs::path db_path(base_folder_);
if (db_path.is_directory()) {
std::stringstream error;
error << "Database directory already exists (" << db_path.string() <<
"), can't overwrite existing database";
throw std::runtime_error{error.str()};
}

bool dir_created = rcpputils::fs::create_directories(db_path);
if (!dir_created) {
std::stringstream error;
error << "Failed to create database directory (" << db_path.string() << ").";
throw std::runtime_error{error.str()};
}

const auto storage_uri = format_storage_uri(base_folder_, 0);
storage_ = storage_factory_->open_read_write(storage_uri, storage_options.storage_id);
if (!storage_) {
throw std::runtime_error{"No storage could be initialized. Abort"};
}

if (max_bagfile_size_ != 0 &&
max_bagfile_size_ < storage_->get_minimum_split_file_size())
{
std::stringstream error;
error << "Invalid bag splitting size given. Please provide a value greater than " <<
storage_->get_minimum_split_file_size() << ". Specified value of " <<
storage_options.max_bagfile_size;
throw std::runtime_error{error.str()};
}

SequentialWriter::open(storage_options, converter_options);
setup_compression();
init_metadata();
}


void SequentialCompressionWriter::reset()
{
if (!base_folder_.empty() && compressor_) {
Expand All @@ -170,52 +127,6 @@ void SequentialCompressionWriter::reset()
storage_factory_.reset();
}

void SequentialCompressionWriter::create_topic(
const rosbag2_storage::TopicMetadata & topic_with_type)
{
if (!storage_) {
throw std::runtime_error{"Bag is not open. Call open() before writing."};
}

if (converter_) {
converter_->add_topic(topic_with_type.name, topic_with_type.type);
}

if (topics_names_to_info_.find(topic_with_type.name) ==
topics_names_to_info_.end())
{
rosbag2_storage::TopicInformation info{};
info.topic_metadata = topic_with_type;

const auto insert_res = topics_names_to_info_.insert(
std::make_pair(topic_with_type.name, info));

if (!insert_res.second) {
std::stringstream errmsg;
errmsg << "Failed to insert topic \"" << topic_with_type.name << "\"!";
throw std::runtime_error{errmsg.str()};
}

storage_->create_topic(topic_with_type);
}
}

void SequentialCompressionWriter::remove_topic(
const rosbag2_storage::TopicMetadata & topic_with_type)
{
if (!storage_) {
throw std::runtime_error{"Bag is not open. Call open() before removing."};
}

if (topics_names_to_info_.erase(topic_with_type.name) > 0) {
storage_->remove_topic(topic_with_type);
} else {
std::stringstream errmsg;
errmsg << "Failed to remove the non-existing topic \"" << topic_with_type.name << "\"!";
throw std::runtime_error{errmsg.str()};
}
}

void SequentialCompressionWriter::compress_last_file()
{
if (!compressor_) {
Expand Down Expand Up @@ -282,64 +193,15 @@ void SequentialCompressionWriter::compress_message(
compressor_->compress_serialized_bag_message(message.get());
}

void SequentialCompressionWriter::write(
std::shared_ptr<rosbag2_storage::SerializedBagMessage>
SequentialCompressionWriter::get_writeable_message(
std::shared_ptr<rosbag2_storage::SerializedBagMessage> message)
{
if (!storage_) {
throw std::runtime_error{"Bag is not open. Call open() before writing."};
}

// Update the message count for the Topic.
++topics_names_to_info_.at(message->topic_name).message_count;

if (should_split_bagfile()) {
split_bagfile();
}

const auto message_timestamp = std::chrono::time_point<std::chrono::high_resolution_clock>{
std::chrono::nanoseconds(message->time_stamp)};
metadata_.starting_time = std::min(metadata_.starting_time, message_timestamp);

const auto duration = message_timestamp - metadata_.starting_time;
metadata_.duration = std::max(metadata_.duration, duration);

auto converted_message = converter_ ? converter_->convert(message) : message;
auto writeable_msg = SequentialWriter::get_writeable_message(message);
if (compression_options_.compression_mode == rosbag2_compression::CompressionMode::MESSAGE) {
compress_message(converted_message);
}

storage_->write(converted_message);
}

bool SequentialCompressionWriter::should_split_bagfile() const
{
if (max_bagfile_size_ == rosbag2_storage::storage_interfaces::MAX_BAGFILE_SIZE_NO_SPLIT) {
return false;
} else {
return storage_->get_bagfile_size() > max_bagfile_size_;
}
}

void SequentialCompressionWriter::finalize_metadata()
{
metadata_.bag_size = 0;

for (const auto & path : metadata_.relative_file_paths) {
const auto bag_path = rcpputils::fs::path{path};

if (bag_path.exists()) {
metadata_.bag_size += bag_path.file_size();
}
}

metadata_.topics_with_message_count.clear();
metadata_.topics_with_message_count.reserve(topics_names_to_info_.size());
metadata_.message_count = 0;

for (const auto & topic : topics_names_to_info_) {
metadata_.topics_with_message_count.push_back(topic.second);
metadata_.message_count += topic.second.message_count;
compress_message(writeable_msg);
}
return writeable_msg;
}

} // namespace rosbag2_compression
13 changes: 10 additions & 3 deletions rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter
*/
void write(std::shared_ptr<rosbag2_storage::SerializedBagMessage> message) override;

private:
protected:
std::string base_folder_;
std::unique_ptr<rosbag2_storage::StorageFactoryInterface> storage_factory_;
std::shared_ptr<SerializationFormatConverterFactoryInterface> converter_factory_;
Expand All @@ -128,16 +128,23 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter
rosbag2_storage::BagMetadata metadata_;

// Closes the current backed storage and opens the next bagfile.
void split_bagfile();
virtual void split_bagfile();

// Checks if the current recording bagfile needs to be split and rolled over to a new file.
bool should_split_bagfile() const;

// Prepares the metadata by setting initial values.
void init_metadata();
virtual void init_metadata();

// Record TopicInformation into metadata
void finalize_metadata();

// Helper method used by write to get the message in a format that is ready to be written.
// Common use cases include converting the message using the converter or
// performing other operations like compression on it
virtual std::shared_ptr<rosbag2_storage::SerializedBagMessage>
get_writeable_message(
std::shared_ptr<rosbag2_storage::SerializedBagMessage> message);
};

} // namespace writers
Expand Down
12 changes: 10 additions & 2 deletions rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,11 +243,12 @@ void SequentialWriter::write(std::shared_ptr<rosbag2_storage::SerializedBagMessa
const auto duration = message_timestamp - metadata_.starting_time;
metadata_.duration = std::max(metadata_.duration, duration);

auto converted_msg = get_writeable_message(message);
// if cache size is set to zero, we directly call write
if (max_cache_size_ == 0u) {
storage_->write(converter_ ? converter_->convert(message) : message);
storage_->write(converted_msg);
} else {
cache_.push_back(converter_ ? converter_->convert(message) : message);
cache_.push_back(converted_msg);
if (cache_.size() >= max_cache_size_) {
storage_->write(cache_);
// reset cache
Expand All @@ -257,6 +258,13 @@ void SequentialWriter::write(std::shared_ptr<rosbag2_storage::SerializedBagMessa
}
}

std::shared_ptr<rosbag2_storage::SerializedBagMessage>
SequentialWriter::get_writeable_message(
std::shared_ptr<rosbag2_storage::SerializedBagMessage> message)
{
return converter_ ? converter_->convert(message) : message;
}

bool SequentialWriter::should_split_bagfile() const
{
// Assume we aren't splitting
Expand Down

0 comments on commit d745972

Please sign in to comment.