Skip to content

Commit

Permalink
Fix relative metadata paths in SequentialCompressionWriter (ros2#613)
Browse files Browse the repository at this point in the history
* Fix relative metadata path writing in compression by deduplicating business logic

Signed-off-by: Emerson Knapp <eknapp@amazon.com>
  • Loading branch information
emersonknapp committed Jan 27, 2021
1 parent f0e5744 commit c4912cd
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 79 deletions.
Expand Up @@ -123,10 +123,17 @@ class ROSBAG2_COMPRESSION_PUBLIC SequentialCompressionWriter
/**
* Compress a file and update the metadata file path.
*
* Note: this may log an error without raising an exception in the case that the input file
* could not be deleted after compressing. This is an error and should never happen, but given
* that the desired output is created, execution will not be halted.
*
* \param compressor An initialized compression context.
* \param message The URI of the file to compress.
* \param file_relative_to_bag Relative path of the file to compress, as stored in metadata -
* meaning the path is relative to the bag base folder.
*/
virtual void compress_file(BaseCompressorInterface & compressor, const std::string & file);
virtual void compress_file(
BaseCompressorInterface & compressor,
const std::string & file_relative_to_bag);

/**
* Checks if the compression by message option is specified and a compressor exists.
Expand Down
Expand Up @@ -77,19 +77,24 @@ void SequentialCompressionWriter::compression_thread_fn()
std::shared_ptr<rosbag2_storage::SerializedBagMessage> message;
std::string file;
{
// This mutex synchronizes both the condition and the running_ boolean, so it has to be
// held when dealing with either/both
std::unique_lock<std::mutex> lock(compressor_queue_mutex_);
if (!compression_is_running_) {
break;
}
compressor_condition_.wait(lock);
compressor_condition_.wait(
lock,
[&] {
return !compression_is_running_ ||
!compressor_message_queue_.empty() ||
!compressor_file_queue_.empty();
});

if (!compressor_message_queue_.empty()) {
message = compressor_message_queue_.front();
compressor_message_queue_.pop();
} else if (!compressor_file_queue_.empty()) {
file = compressor_file_queue_.front();
compressor_file_queue_.pop();
} else if (!compression_is_running_) {
// I woke up, all work queues are empty, and the main thread has stopped execution. Exit.
break;
}
}

Expand All @@ -111,11 +116,7 @@ void SequentialCompressionWriter::compression_thread_fn()
void SequentialCompressionWriter::init_metadata()
{
std::lock_guard<std::recursive_mutex> lock(storage_mutex_);
metadata_ = rosbag2_storage::BagMetadata{};
metadata_.storage_identifier = storage_->get_storage_identifier();
metadata_.starting_time = std::chrono::time_point<std::chrono::high_resolution_clock>{
std::chrono::nanoseconds::max()};
metadata_.relative_file_paths = {storage_->get_relative_file_path()};
SequentialWriter::init_metadata();
metadata_.compression_format = compression_options_.compression_format;
metadata_.compression_mode =
rosbag2_compression::compression_mode_to_string(compression_options_.compression_mode);
Expand Down Expand Up @@ -240,38 +241,43 @@ void SequentialCompressionWriter::remove_topic(

void SequentialCompressionWriter::compress_file(
BaseCompressorInterface & compressor,
const std::string & file)
const std::string & file_relative_to_bag)
{
ROSBAG2_COMPRESSION_LOG_DEBUG("Compressing file: %s", file.c_str());
const auto to_compress = rcpputils::fs::path{file};
using rcpputils::fs::path;

const auto file_relative_to_pwd = path(base_folder_) / file_relative_to_bag;
ROSBAG2_COMPRESSION_LOG_INFO_STREAM("Compressing file: " << file_relative_to_pwd.string());

if (to_compress.exists() && to_compress.file_size() > 0u) {
const auto compressed_uri = compressor.compress_uri(to_compress.string());
if (file_relative_to_pwd.exists() && file_relative_to_pwd.file_size() > 0u) {
const auto compressed_uri = compressor.compress_uri(file_relative_to_pwd.string());
const auto relative_compressed_uri = path(compressed_uri).filename();
{
// After we've compressed the file, replace the name in the file list with the new name.
// Must search for the entry because other threads may have changed the order of the vector
// and invalidated any index or iterator we held to it.
std::lock_guard<std::recursive_mutex> lock(storage_mutex_);
auto iter = std::find(
const auto iter = std::find(
metadata_.relative_file_paths.begin(),
metadata_.relative_file_paths.end(),
file);
file_relative_to_bag);
if (iter != metadata_.relative_file_paths.end()) {
*iter = compressed_uri;
*iter = relative_compressed_uri.string();
} else {
ROSBAG2_COMPRESSION_LOG_ERROR_STREAM(
"Failed to find path to uncompressed bag: \"" << file <<
"Failed to find path to uncompressed bag: \"" << file_relative_to_pwd.string() <<
"\"; this shouldn't happen.");
}
}

if (!rcpputils::fs::remove(to_compress)) {
if (!rcpputils::fs::remove(file_relative_to_pwd)) {
ROSBAG2_COMPRESSION_LOG_ERROR_STREAM(
"Failed to remove uncompressed bag: \"" << to_compress.string() << "\"");
"Failed to remove original pre-compressed bag file: \"" <<
file_relative_to_pwd.string() << "\". This should never happen - but execution " <<
"will not be halted because the compressed output was successfully created.");
}
} else {
ROSBAG2_COMPRESSION_LOG_DEBUG_STREAM(
"Removing last file: \"" << to_compress.string() <<
"Removing last file: \"" << file_relative_to_pwd.string() <<
"\" because it either is empty or does not exist.");
}
}
Expand All @@ -281,13 +287,14 @@ void SequentialCompressionWriter::split_bagfile()
std::lock_guard<std::recursive_mutex> lock(storage_mutex_);
std::lock_guard<std::mutex> compressor_lock(compressor_queue_mutex_);

switch_to_next_storage();
// Grab last file before calling common splitting logic, which pushes the new filename
const auto last_file = metadata_.relative_file_paths.back();
SequentialWriter::split_bagfile();

// If we're in FILE compression mode, push this file's name on to the queue so another
// thread will handle compressing it. If not, we can just carry on.
if (compression_options_.compression_mode == rosbag2_compression::CompressionMode::FILE) {
std::string file = metadata_.relative_file_paths.back();
compressor_file_queue_.push(file);
compressor_file_queue_.push(last_file);
compressor_condition_.notify_one();
}

Expand All @@ -296,8 +303,6 @@ void SequentialCompressionWriter::split_bagfile()
// storage plugin.
should_compress_last_file_ = false;
}

metadata_.relative_file_paths.push_back(storage_->get_relative_file_path());
}

void SequentialCompressionWriter::compress_message(
Expand Down
Expand Up @@ -14,6 +14,7 @@

#include <gmock/gmock.h>

#include <fstream>
#include <memory>
#include <string>
#include <utility>
Expand Down Expand Up @@ -50,19 +51,84 @@ class SequentialCompressionWriterTest : public Test
serialization_format_{"rmw_format"}
{
tmp_dir_storage_options_.uri = tmp_dir_.string();
rcpputils::fs::remove(tmp_dir_);
rcpputils::fs::remove_all(tmp_dir_);
ON_CALL(*storage_factory_, open_read_write(_)).WillByDefault(Return(storage_));
EXPECT_CALL(*storage_factory_, open_read_write(_)).Times(AtLeast(0));
// intercept the metadata write so we can analyze it.
ON_CALL(*metadata_io_, write_metadata).WillByDefault(
[this](const std::string &, const rosbag2_storage::BagMetadata & metadata) {
intercepted_metadata_ = metadata;
});
}

~SequentialCompressionWriterTest()
{
rcpputils::fs::remove_all(tmp_dir_);
}

void initializeFakeFileStorage()
{
// Create mock implementation of the storage, using files and a size of 1 per message
// initialize values when opening a new bagfile
ON_CALL(*storage_factory_, open_read_write(_)).WillByDefault(
DoAll(
Invoke(
[this](const rosbag2_storage::StorageOptions & storage_options) {
fake_storage_size_ = 0;
fake_storage_uri_ = storage_options.uri;
// Touch the file
std::ofstream output(storage_options.uri);
ASSERT_TRUE(output.is_open());
// Put some arbitrary bytes in the file so it isn't interpreted as being empty
output << "Fake storage data" << std::endl;
output.close();
}),
Return(storage_)));
ON_CALL(
*storage_,
write(An<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>>())).WillByDefault(
[this](std::shared_ptr<const rosbag2_storage::SerializedBagMessage>) {
fake_storage_size_ += 1;
});
ON_CALL(*storage_, get_bagfile_size).WillByDefault(
[this]() {
return fake_storage_size_;
});
ON_CALL(*storage_, get_relative_file_path).WillByDefault(
[this]() {
return fake_storage_uri_;
});
}

void initializeWriter(
const rosbag2_compression::CompressionOptions & compression_options,
std::unique_ptr<rosbag2_compression::CompressionFactory> custom_compression_factory = nullptr)
{
auto compression_factory = std::move(custom_compression_factory);
if (!compression_factory) {
compression_factory = std::make_unique<rosbag2_compression::CompressionFactory>();
}
auto sequential_writer = std::make_unique<rosbag2_compression::SequentialCompressionWriter>(
compression_options,
std::move(compression_factory),
std::move(storage_factory_),
converter_factory_,
std::move(metadata_io_));
writer_ = std::make_unique<rosbag2_cpp::Writer>(std::move(sequential_writer));
}

const std::string bag_name_ = "SequentialCompressionWriterTest";
std::unique_ptr<StrictMock<MockStorageFactory>> storage_factory_;
std::shared_ptr<NiceMock<MockStorage>> storage_;
std::shared_ptr<StrictMock<MockConverterFactory>> converter_factory_;
std::unique_ptr<MockMetadataIo> metadata_io_;
std::unique_ptr<rosbag2_cpp::Writer> writer_;
rcpputils::fs::path tmp_dir_;
rosbag2_storage::StorageOptions tmp_dir_storage_options_;
rosbag2_storage::BagMetadata intercepted_metadata_;
std::string serialization_format_;
uint64_t fake_storage_size_;
std::string fake_storage_uri_;

const uint64_t kDefaultCompressionQueueSize = 1;
const uint64_t kDefaultCompressionQueueThreads = 4;
Expand All @@ -73,15 +139,7 @@ TEST_F(SequentialCompressionWriterTest, open_throws_on_empty_storage_options_uri
rosbag2_compression::CompressionOptions compression_options{
"zstd", rosbag2_compression::CompressionMode::FILE,
kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads};
auto compression_factory = std::make_unique<rosbag2_compression::CompressionFactory>();

auto sequential_writer = std::make_unique<rosbag2_compression::SequentialCompressionWriter>(
compression_options,
std::move(compression_factory),
std::move(storage_factory_),
converter_factory_,
std::move(metadata_io_));
writer_ = std::make_unique<rosbag2_cpp::Writer>(std::move(sequential_writer));
initializeWriter(compression_options);

EXPECT_THROW(
writer_->open(
Expand All @@ -95,29 +153,18 @@ TEST_F(SequentialCompressionWriterTest, open_throws_on_bad_compression_format)
rosbag2_compression::CompressionOptions compression_options{
"bad_format", rosbag2_compression::CompressionMode::FILE,
kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads};
auto compression_factory = std::make_unique<rosbag2_compression::CompressionFactory>();

auto sequential_writer = std::make_unique<rosbag2_compression::SequentialCompressionWriter>(
compression_options,
std::move(compression_factory),
std::move(storage_factory_),
converter_factory_,
std::move(metadata_io_));
writer_ = std::make_unique<rosbag2_cpp::Writer>(std::move(sequential_writer));
initializeWriter(compression_options);

EXPECT_THROW(
writer_->open(tmp_dir_storage_options_, {serialization_format_, serialization_format_}),
std::invalid_argument);

EXPECT_TRUE(rcpputils::fs::remove(tmp_dir_));
}

TEST_F(SequentialCompressionWriterTest, open_throws_on_invalid_splitting_size)
{
rosbag2_compression::CompressionOptions compression_options{
"zstd", rosbag2_compression::CompressionMode::FILE,
kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads};
auto compression_factory = std::make_unique<rosbag2_compression::CompressionFactory>();

// Set minimum file size greater than max bagfile size option
const uint64_t min_split_file_size = 10;
Expand All @@ -127,13 +174,7 @@ TEST_F(SequentialCompressionWriterTest, open_throws_on_invalid_splitting_size)
storage_options.max_bagfile_size = max_bagfile_size;
storage_options.uri = "foo.bar";

auto sequential_writer = std::make_unique<rosbag2_compression::SequentialCompressionWriter>(
compression_options,
std::move(compression_factory),
std::move(storage_factory_),
converter_factory_,
std::move(metadata_io_));
writer_ = std::make_unique<rosbag2_cpp::Writer>(std::move(sequential_writer));
initializeWriter(compression_options);

EXPECT_THROW(
writer_->open(storage_options, {serialization_format_, serialization_format_}),
Expand All @@ -145,24 +186,14 @@ TEST_F(SequentialCompressionWriterTest, open_succeeds_on_supported_compression_f
rosbag2_compression::CompressionOptions compression_options{
"zstd", rosbag2_compression::CompressionMode::FILE,
kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads};
auto compression_factory = std::make_unique<rosbag2_compression::CompressionFactory>();

auto sequential_writer = std::make_unique<rosbag2_compression::SequentialCompressionWriter>(
compression_options,
std::move(compression_factory),
std::move(storage_factory_),
converter_factory_,
std::move(metadata_io_));
writer_ = std::make_unique<rosbag2_cpp::Writer>(std::move(sequential_writer));
initializeWriter(compression_options);

auto tmp_dir = rcpputils::fs::temp_directory_path() / "path_not_empty";
auto storage_options = rosbag2_storage::StorageOptions();
storage_options.uri = tmp_dir.string();

EXPECT_NO_THROW(
writer_->open(tmp_dir_storage_options_, {serialization_format_, serialization_format_}));

EXPECT_TRUE(rcpputils::fs::remove(tmp_dir_));
}

TEST_F(SequentialCompressionWriterTest, writer_calls_create_compressor)
Expand All @@ -173,19 +204,55 @@ TEST_F(SequentialCompressionWriterTest, writer_calls_create_compressor)
auto compression_factory = std::make_unique<StrictMock<MockCompressionFactory>>();
EXPECT_CALL(*compression_factory, create_compressor(_)).Times(1);

auto sequential_writer = std::make_unique<rosbag2_compression::SequentialCompressionWriter>(
compression_options,
std::move(compression_factory),
std::move(storage_factory_),
converter_factory_,
std::move(metadata_io_));
writer_ = std::make_unique<rosbag2_cpp::Writer>(std::move(sequential_writer));
initializeWriter(compression_options, std::move(compression_factory));

// This will throw an exception because the MockCompressionFactory does not actually create
// a compressor.
EXPECT_THROW(
writer_->open(tmp_dir_storage_options_, {serialization_format_, serialization_format_}),
std::runtime_error);
}

EXPECT_TRUE(rcpputils::fs::remove(tmp_dir_));
TEST_F(SequentialCompressionWriterTest, writer_creates_correct_metadata_relative_filepaths)
{
// In this test, check that the SequentialCompressionWriter creates relative filepaths correctly
// Check both the first path, which is created in init_metadata,
// and subsequent paths, which are created in the splitting logic
const std::string test_topic_name = "test_topic";
const std::string test_topic_type = "test_msgs/BasicTypes";
rosbag2_compression::CompressionOptions compression_options {
"zstd",
rosbag2_compression::CompressionMode::FILE,
kDefaultCompressionQueueSize,
kDefaultCompressionQueueThreads
};

initializeFakeFileStorage();
initializeWriter(compression_options);

tmp_dir_storage_options_.max_bagfile_size = 1;
writer_->open(tmp_dir_storage_options_);
writer_->create_topic({test_topic_name, test_topic_type, "", ""});

auto message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
message->topic_name = test_topic_name;

writer_->write(message);
// bag size == max_bafile_size, no split yet
writer_->write(message);
// bag size > max_bagfile_size, split
writer_->write(message);
writer_.reset();

EXPECT_EQ(
intercepted_metadata_.relative_file_paths.size(), 2u);

const auto base_path = tmp_dir_storage_options_.uri;
int counter = 0;
for (const auto & path : intercepted_metadata_.relative_file_paths) {
std::stringstream ss;
ss << bag_name_ << "_" << counter << ".zstd";
counter++;
EXPECT_EQ(ss.str(), path);
}
}

0 comments on commit c4912cd

Please sign in to comment.