Skip to content

Commit

Permalink
Implement snapshot mechanism and corresponding ROS Service (ros2#850)
Browse files Browse the repository at this point in the history
* Add snapshot service to recorder node
* Simplify and clarify double buffering patterns

Co-authored-by: Cameron Miller <cammlle@amazon.com>
Co-authored-by: Emerson Knapp <emerson.b.knapp@gmail.com>
  • Loading branch information
3 people authored and 兰陈昕 committed Nov 18, 2022
1 parent 1bf6ee4 commit 031e824
Show file tree
Hide file tree
Showing 24 changed files with 481 additions and 193 deletions.
3 changes: 0 additions & 3 deletions rosbag2_cpp/include/rosbag2_cpp/cache/cache_consumer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,6 @@ class ROSBAG2_CPP_PUBLIC CacheConsumer
/// shut down the consumer thread
void close();

/// Set new consume callback, restart thread if necessary
void change_consume_callback(consume_callback_function_t callback);

private:
std::shared_ptr<MessageCacheInterface> message_cache_;
consume_callback_function_t consume_callback_;
Expand Down
27 changes: 17 additions & 10 deletions rosbag2_cpp/include/rosbag2_cpp/cache/circular_message_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ namespace rosbag2_cpp
namespace cache
{

/// Provides a "deferred-consumption" implementation of the MessageCacheInterface.
/// When a consumer asks for a buffer, it will not receive a new buffer until some control
/// source calls `swap_buffers` manually.
/// This is useful for a snapshot mode, where no data is written to disk until asked for,
/// then the full circular buffer is dumped all at once, giving historical context.
class ROSBAG2_CPP_PUBLIC CircularMessageCache
: public MessageCacheInterface
{
Expand All @@ -48,22 +53,24 @@ class ROSBAG2_CPP_PUBLIC CircularMessageCache
/// Puts msg into circular buffer, replacing the oldest msg when buffer is full
void push(std::shared_ptr<const rosbag2_storage::SerializedBagMessage> msg) override;

/// get current buffer to consume
/// Get current buffer to consume.
/// Locks consumer_buffer and swap_buffers until release_consumer_buffer is called.
/// This may be repeatedly empty if `swap_buffers` has not been called.
std::shared_ptr<CacheBufferInterface> consumer_buffer() override;

/// Unlock access to the consumer buffer.
void release_consumer_buffer() override;

/// Swap the primary and secondary buffer before consumption.
/// NOTE: consumer_buffer() should be called sequentially after
/// swap_buffer() to ensure expected behavior. Calling swap_buffer()
/// while accessing consumer_buffer() will be undefined behavior.
/// NOTE: If swap_buffers is called again before consuming via consumer_buffer,
/// that data will be cleared for use by the producer.
void swap_buffers() override;

private:
/// Double buffers
std::shared_ptr<MessageCacheCircularBuffer> primary_buffer_;
std::shared_ptr<MessageCacheCircularBuffer> secondary_buffer_;

/// Double buffers sync
std::mutex cache_mutex_;
std::shared_ptr<MessageCacheCircularBuffer> producer_buffer_;
std::mutex producer_buffer_mutex_;
std::shared_ptr<MessageCacheCircularBuffer> consumer_buffer_;
std::mutex consumer_buffer_mutex_;
};

} // namespace cache
Expand Down
40 changes: 24 additions & 16 deletions rosbag2_cpp/include/rosbag2_cpp/cache/message_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ namespace cache
* Double buffering is a part of producer-consumer pattern and optimizes for
* the consumer performance (which can be a bottleneck, e.g. disk writes).
*
* This is a "greedy consumer" implementation - every time the consumer asks
* for a buffer to consume, the buffers are swapped so that the latest data
* goes to the consumer right away.
*
* Two instances of MessageCacheBuffer are used, one for producer and one for
* the consumer. Buffers are switched through swap_buffers function, which
* involves synchronization and a simple pointer switch.
Expand All @@ -72,14 +76,12 @@ class ROSBAG2_CPP_PUBLIC MessageCache
/// Puts msg into primary buffer. With full cache, msg is ignored and counted as lost
void push(std::shared_ptr<const rosbag2_storage::SerializedBagMessage> msg) override;

/// Summarize dropped/remaining messages
void log_dropped() override;

/// Set the cache to consume-only mode for final buffer flush before closing
void finalize() override;
/// Gets a consumer buffer.
/// In this greedy implementation, swap buffers before providing the buffer.
std::shared_ptr<CacheBufferInterface> consumer_buffer() override;

/// Notify that flushing is complete
void notify_flushing_done() override;
/// Notify that consumer_buffer has been fully used. Unlock.
void release_consumer_buffer() override;

/**
* Consumer API: wait until primary buffer is ready and swap it with consumer buffer.
Expand All @@ -90,27 +92,33 @@ class ROSBAG2_CPP_PUBLIC MessageCache
**/
void swap_buffers() override;

/// Consumer API: get current buffer to consume
std::shared_ptr<CacheBufferInterface> consumer_buffer() override;
/// Set the cache to consume-only mode for final buffer flush before closing
void begin_flushing() override;

/// Exposes counts of messages dropped per topic
std::unordered_map<std::string, uint32_t> messages_dropped() const;
/// Notify that flushing is complete
void done_flushing() override;

/// Summarize dropped/remaining messages
void log_dropped() override;

protected:
/// Dropped messages per topic. Used for printing in alphabetic order
std::unordered_map<std::string, uint32_t> messages_dropped_per_topic_;

private:
/// Producer API: notify consumer to wake-up (primary buffer has data)
void notify_buffer_consumer();

/// Double buffers
std::shared_ptr<MessageCacheBuffer> primary_buffer_;
std::shared_ptr<MessageCacheBuffer> secondary_buffer_;
std::shared_ptr<MessageCacheBuffer> producer_buffer_;
std::mutex producer_buffer_mutex_;
std::shared_ptr<MessageCacheBuffer> consumer_buffer_;
std::recursive_mutex consumer_buffer_mutex_;

/// Dropped messages per topic. Used for printing in alphabetic order
std::unordered_map<std::string, uint32_t> messages_dropped_per_topic_;

/// Double buffers sync (following cpp core guidelines for condition variables)
bool primary_buffer_can_be_swapped_ {false};
std::condition_variable cache_condition_var_;
std::mutex cache_mutex_;

/// Cache is no longer accepting messages and is in the process of flushing
std::atomic_bool flushing_ {false};
Expand Down
45 changes: 18 additions & 27 deletions rosbag2_cpp/include/rosbag2_cpp/cache/message_cache_interface.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,41 +40,32 @@ class ROSBAG2_CPP_PUBLIC MessageCacheInterface
public:
virtual ~MessageCacheInterface() {}

/**
* Push a pointer to a bag message into the primary buffer.
*/
/// Push a bag message into the producer buffer.
virtual void push(std::shared_ptr<const rosbag2_storage::SerializedBagMessage> msg) = 0;

/**
* Get a pointer to the buffer that can be used for consuming the
* cached messages. In a double buffer implementation, this should
* always be called after swap_buffers().
*
* \return a pointer to the consumer buffer interface.
*/
/// Get a pointer to the buffer that can be used for consuming the cached messages.
/// This call locks access to the buffer, `swap_buffers` and `consumer_buffer` will block until
/// `release_consumer_buffer` is called to unlock access to the buffer.
/// Consumer should call `release_consumer_buffer` when they are done consuming the messages.
/// \return a pointer to the consumer buffer interface.
virtual std::shared_ptr<CacheBufferInterface> consumer_buffer() = 0;

/**
* Swap primary and secondary buffers when using a double buffer
* configuration.
*/
/// Signals that tne consumer is done consuming, unlocking the buffer so it may be swapped.
virtual void release_consumer_buffer() = 0;

/// Swap producer and consumer buffers.
/// Note: this will block if `consumer_buffer` has been called but `release_consumer_buffer`
/// has not been called yet to signal end of consuming.
virtual void swap_buffers() = 0;

/**
* Mark message cache as having finished flushing
* remaining messages to the bagfile.
*/
virtual void notify_flushing_done() {}
/// Go into a read-only state to drain the final consumer buffer without letting in new data.
virtual void begin_flushing() {}

/**
* Print a log message with details of any dropped messages.
*/
virtual void log_dropped() {}
/// Call after `begin_flushing`, once the final consumer buffer is emptied.
virtual void done_flushing() {}

/**
* Perform any cleanup or final tasks before exitting.
*/
virtual void finalize() {}
/// Print a log message with details of any dropped messages.
virtual void log_dropped() {}
};

} // namespace cache
Expand Down
6 changes: 6 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ class ROSBAG2_CPP_PUBLIC Writer final
*/
void create_topic(const rosbag2_storage::TopicMetadata & topic_with_type);

/**
* Trigger a snapshot when snapshot mode is enabled.
* \returns true if snapshot is successful, false if snapshot fails or is not supported
*/
bool take_snapshot();

/**
* Remove a new topic in the underlying storage.
* If creation of subscription fails remove the topic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ class ROSBAG2_CPP_PUBLIC BaseWriterInterface
virtual void remove_topic(const rosbag2_storage::TopicMetadata & topic_with_type) = 0;

virtual void write(std::shared_ptr<rosbag2_storage::SerializedBagMessage> message) = 0;

/**
* Triggers a snapshot for writers that support it.
* \returns true if snapshot is successful, false if snapshot fails or is not supported
*/
virtual bool take_snapshot() = 0;
};

} // namespace writer_interfaces
Expand Down
12 changes: 12 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <vector>

#include "rosbag2_cpp/cache/cache_consumer.hpp"
#include "rosbag2_cpp/cache/circular_message_cache.hpp"
#include "rosbag2_cpp/cache/message_cache.hpp"
#include "rosbag2_cpp/cache/message_cache_interface.hpp"
#include "rosbag2_cpp/converter.hpp"
Expand Down Expand Up @@ -107,6 +108,12 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter
*/
void write(std::shared_ptr<rosbag2_storage::SerializedBagMessage> message) override;

/**
* Take a snapshot by triggering a circular buffer flip, writing data to disk.
* *\returns true if snapshot is successful
*/
bool take_snapshot() override;

protected:
std::string base_folder_;
std::unique_ptr<rosbag2_storage::StorageFactoryInterface> storage_factory_;
Expand Down Expand Up @@ -154,6 +161,11 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter
virtual std::shared_ptr<rosbag2_storage::SerializedBagMessage>
get_writeable_message(
std::shared_ptr<rosbag2_storage::SerializedBagMessage> message);

private:
/// Helper method to write messages while also updating tracked metadata.
void write_messages(
const std::vector<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>> & messages);
};

} // namespace writers
Expand Down
24 changes: 6 additions & 18 deletions rosbag2_cpp/src/rosbag2_cpp/cache/cache_consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ CacheConsumer::~CacheConsumer()

void CacheConsumer::close()
{
message_cache_->finalize();
message_cache_->begin_flushing();
is_stop_issued_ = true;

ROSBAG2_CPP_LOG_INFO_STREAM(
Expand All @@ -47,36 +47,24 @@ void CacheConsumer::close()
if (consumer_thread_.joinable()) {
consumer_thread_.join();
}
message_cache_->notify_flushing_done();
}

void CacheConsumer::change_consume_callback(
CacheConsumer::consume_callback_function_t consume_callback)
{
consume_callback_ = consume_callback;
if (!consumer_thread_.joinable()) {
is_stop_issued_ = false;
consumer_thread_ = std::thread(&CacheConsumer::exec_consuming, this);
}
message_cache_->done_flushing();
}

void CacheConsumer::exec_consuming()
{
bool exit_flag = false;
bool flushing = false;
while (!exit_flag) {
// Invariant at loop start: consumer buffer is empty

// swap producer buffer with consumer buffer
message_cache_->swap_buffers();

// make sure to use consistent callback for each iteration
auto callback_for_this_loop = consume_callback_;

// consume all the data from consumer buffer
// Get the current consumer buffer.
// Depending on the cache implementation, this may swap buffers now, or could
// provide an empty buffer if swapping is handled via other conditions.
auto consumer_buffer = message_cache_->consumer_buffer();
callback_for_this_loop(consumer_buffer->data());
consumer_buffer->clear();
message_cache_->release_consumer_buffer();

if (flushing) {exit_flag = true;} // this was the final run
if (is_stop_issued_) {flushing = true;} // run one final time to flush
Expand Down
23 changes: 15 additions & 8 deletions rosbag2_cpp/src/rosbag2_cpp/cache/circular_message_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,26 +29,33 @@ namespace cache

CircularMessageCache::CircularMessageCache(size_t max_buffer_size)
{
primary_buffer_ = std::make_shared<MessageCacheCircularBuffer>(max_buffer_size);
secondary_buffer_ = std::make_shared<MessageCacheCircularBuffer>(max_buffer_size);
producer_buffer_ = std::make_shared<MessageCacheCircularBuffer>(max_buffer_size);
consumer_buffer_ = std::make_shared<MessageCacheCircularBuffer>(max_buffer_size);
}

void CircularMessageCache::push(std::shared_ptr<const rosbag2_storage::SerializedBagMessage> msg)
{
std::lock_guard<std::mutex> cache_lock(cache_mutex_);
primary_buffer_->push(msg);
std::lock_guard<std::mutex> cache_lock(producer_buffer_mutex_);
producer_buffer_->push(msg);
}

std::shared_ptr<CacheBufferInterface> CircularMessageCache::consumer_buffer()
{
return secondary_buffer_;
consumer_buffer_mutex_.lock();
return consumer_buffer_;
}

void CircularMessageCache::release_consumer_buffer()
{
consumer_buffer_mutex_.unlock();
}

void CircularMessageCache::swap_buffers()
{
std::lock_guard<std::mutex> cache_lock(cache_mutex_);
secondary_buffer_->clear();
std::swap(primary_buffer_, secondary_buffer_);
std::lock_guard<std::mutex> producer_lock(producer_buffer_mutex_);
std::lock_guard<std::mutex> consumer_lock(consumer_buffer_mutex_);
consumer_buffer_->clear();
std::swap(producer_buffer_, consumer_buffer_);
}

} // namespace cache
Expand Down
Loading

0 comments on commit 031e824

Please sign in to comment.