diff --git a/rosbag2_compression/test/rosbag2_compression/mock_storage.hpp b/rosbag2_compression/test/rosbag2_compression/mock_storage.hpp index 39df735d41..7b72649597 100644 --- a/rosbag2_compression/test/rosbag2_compression/mock_storage.hpp +++ b/rosbag2_compression/test/rosbag2_compression/mock_storage.hpp @@ -45,6 +45,7 @@ class MockStorage : public rosbag2_storage::storage_interfaces::ReadWriteInterfa MOCK_METHOD0(get_metadata, rosbag2_storage::BagMetadata()); MOCK_METHOD0(reset_filter, void()); MOCK_METHOD1(set_filter, void(const rosbag2_storage::StorageFilter &)); + MOCK_METHOD1(seek, void(const rcutils_time_point_value_t &)); MOCK_CONST_METHOD0(get_bagfile_size, uint64_t()); MOCK_CONST_METHOD0(get_relative_file_path, std::string()); MOCK_CONST_METHOD0(get_storage_identifier, std::string()); diff --git a/rosbag2_cpp/include/rosbag2_cpp/reader.hpp b/rosbag2_cpp/include/rosbag2_cpp/reader.hpp index bc787e63ae..13373ac9b0 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/reader.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/reader.hpp @@ -169,6 +169,11 @@ class ROSBAG2_CPP_PUBLIC Reader final */ void reset_filter(); + /** + * Skip to a specific timestamp for reading. + */ + void seek(const rcutils_time_point_value_t & timestamp); + reader_interfaces::BaseReaderInterface & get_implementation_handle() const { return *reader_impl_; diff --git a/rosbag2_cpp/include/rosbag2_cpp/reader_interfaces/base_reader_interface.hpp b/rosbag2_cpp/include/rosbag2_cpp/reader_interfaces/base_reader_interface.hpp index 951f73fafe..d330cd3dd9 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/reader_interfaces/base_reader_interface.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/reader_interfaces/base_reader_interface.hpp @@ -18,6 +18,7 @@ #include #include +#include "rcutils/types.h" #include "rosbag2_cpp/converter_options.hpp" #include "rosbag2_cpp/visibility_control.hpp" @@ -54,6 +55,8 @@ class ROSBAG2_CPP_PUBLIC BaseReaderInterface virtual void set_filter(const rosbag2_storage::StorageFilter & storage_filter) = 0; virtual void reset_filter() = 0; + + virtual void seek(const rcutils_time_point_value_t & timestamp) = 0; }; } // namespace reader_interfaces diff --git a/rosbag2_cpp/include/rosbag2_cpp/readers/sequential_reader.hpp b/rosbag2_cpp/include/rosbag2_cpp/readers/sequential_reader.hpp index 9266d6ec8b..e1ffb2a3ef 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/readers/sequential_reader.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/readers/sequential_reader.hpp @@ -76,6 +76,12 @@ class ROSBAG2_CPP_PUBLIC SequentialReader void reset_filter() override; + /** + * seek(t) will cause subsequent reads to return messages that satisfy + * timestamp >= time t. + */ + void seek(const rcutils_time_point_value_t & timestamp) override; + /** * Ask whether there is another database file to read from the list of relative * file paths. @@ -95,9 +101,15 @@ class ROSBAG2_CPP_PUBLIC SequentialReader virtual std::string get_current_uri() const; protected: + /** + * Opens the current file and sets up the filters in the new storage. + * + */ + virtual void load_current_file(); + /** * Increment the current file iterator to point to the next file in the list of relative file - * paths. + * paths, and opens the next file by calling open_current_file() * * Expected usage: * if (has_next_file()) load_next_file(); @@ -144,6 +156,7 @@ class ROSBAG2_CPP_PUBLIC SequentialReader std::unique_ptr converter_{}; std::unique_ptr metadata_io_{}; rosbag2_storage::BagMetadata metadata_{}; + rcutils_time_point_value_t seek_time_ = 0; rosbag2_storage::StorageFilter topics_filter_{}; std::vector topics_metadata_{}; std::vector file_paths_{}; // List of database files. diff --git a/rosbag2_cpp/src/rosbag2_cpp/reader.cpp b/rosbag2_cpp/src/rosbag2_cpp/reader.cpp index 2b7997209e..5acb51440c 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/reader.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/reader.cpp @@ -87,4 +87,9 @@ void Reader::reset_filter() reader_impl_->reset_filter(); } +void Reader::seek(const rcutils_time_point_value_t & timestamp) +{ + reader_impl_->seek(timestamp); +} + } // namespace rosbag2_cpp diff --git a/rosbag2_cpp/src/rosbag2_cpp/readers/sequential_reader.cpp b/rosbag2_cpp/src/rosbag2_cpp/readers/sequential_reader.cpp index 050f0b3161..ba70aa40e2 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/readers/sequential_reader.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/readers/sequential_reader.cpp @@ -142,10 +142,6 @@ bool SequentialReader::has_next() bool current_storage_has_next = storage_->has_next(); if (!current_storage_has_next && has_next_file()) { load_next_file(); - storage_options_.uri = get_current_file(); - - storage_ = storage_factory_->open_read_only(storage_options_); - storage_->set_filter(topics_filter_); // recursively call has_next again after rollover return has_next(); } @@ -154,13 +150,18 @@ bool SequentialReader::has_next() throw std::runtime_error("Bag is not open. Call open() before reading."); } +// Note: if files in the bag are not time +// normalized, it's possible to read messages that have a timestamp +// before the timestamp of the last read upon a file roll-over. std::shared_ptr SequentialReader::read_next() { if (storage_) { // performs rollover if necessary - has_next(); - auto message = storage_->read_next(); - return converter_ ? converter_->convert(message) : message; + if (has_next()) { + auto message = storage_->read_next(); + return converter_ ? converter_->convert(message) : message; + } + throw std::runtime_error("Bag is at end. No next message."); } throw std::runtime_error("Bag is not open. Call open() before reading."); } @@ -191,13 +192,20 @@ void SequentialReader::set_filter( void SequentialReader::reset_filter() { - topics_filter_ = rosbag2_storage::StorageFilter(); + set_filter(rosbag2_storage::StorageFilter()); +} + +void SequentialReader::seek(const rcutils_time_point_value_t & timestamp) +{ + seek_time_ = timestamp; if (storage_) { - storage_->reset_filter(); + // reset to the first file + current_file_iterator_ = file_paths_.begin(); + load_current_file(); return; } throw std::runtime_error( - "Bag is not open. Call open() before resetting filter."); + "Bag is not open. Call open() before seeking time."); } bool SequentialReader::has_next_file() const @@ -205,11 +213,21 @@ bool SequentialReader::has_next_file() const return current_file_iterator_ + 1 != file_paths_.end(); } +void SequentialReader::load_current_file() +{ + preprocess_current_file(); + storage_options_.uri = get_current_file(); + // open and set filters + storage_ = storage_factory_->open_read_only(storage_options_); + storage_->seek(seek_time_); + set_filter(topics_filter_); +} + void SequentialReader::load_next_file() { assert(current_file_iterator_ != file_paths_.end()); current_file_iterator_++; - preprocess_current_file(); + load_current_file(); } std::string SequentialReader::get_current_file() const diff --git a/rosbag2_cpp/test/rosbag2_cpp/mock_storage.hpp b/rosbag2_cpp/test/rosbag2_cpp/mock_storage.hpp index 303ddf9907..5cbfdfcc34 100644 --- a/rosbag2_cpp/test/rosbag2_cpp/mock_storage.hpp +++ b/rosbag2_cpp/test/rosbag2_cpp/mock_storage.hpp @@ -46,6 +46,7 @@ class MockStorage : public rosbag2_storage::storage_interfaces::ReadWriteInterfa MOCK_METHOD0(get_metadata, rosbag2_storage::BagMetadata()); MOCK_METHOD0(reset_filter, void()); MOCK_METHOD1(set_filter, void(const rosbag2_storage::StorageFilter &)); + MOCK_METHOD1(seek, void(const rcutils_time_point_value_t &)); MOCK_CONST_METHOD0(get_bagfile_size, uint64_t()); MOCK_CONST_METHOD0(get_relative_file_path, std::string()); MOCK_CONST_METHOD0(get_storage_identifier, std::string()); diff --git a/rosbag2_cpp/test/rosbag2_cpp/test_multifile_reader.cpp b/rosbag2_cpp/test/rosbag2_cpp/test_multifile_reader.cpp index af60a266d7..ec9c5a9249 100644 --- a/rosbag2_cpp/test/rosbag2_cpp/test_multifile_reader.cpp +++ b/rosbag2_cpp/test/rosbag2_cpp/test_multifile_reader.cpp @@ -222,3 +222,14 @@ TEST_F(MultifileReaderTest, get_all_topics_and_types_returns_from_io_metadata) const auto all_topics_and_types = reader_->get_all_topics_and_types(); EXPECT_FALSE(all_topics_and_types.empty()); } + +TEST_F(MultifileReaderTest, seek_bag) +{ + init(); + reader_->open(default_storage_options_, {"", storage_serialization_format_}); + EXPECT_CALL(*storage_, has_next()).Times(3).WillRepeatedly(Return(false)); + EXPECT_CALL(*storage_, seek(_)).Times(3); + EXPECT_CALL(*storage_, set_filter(_)).Times(3); + reader_->seek(9999999999999); + reader_->has_next(); +} diff --git a/rosbag2_cpp/test/rosbag2_cpp/test_sequential_reader.cpp b/rosbag2_cpp/test/rosbag2_cpp/test_sequential_reader.cpp index 95bd16e78c..423eb61de7 100644 --- a/rosbag2_cpp/test/rosbag2_cpp/test_sequential_reader.cpp +++ b/rosbag2_cpp/test/rosbag2_cpp/test_sequential_reader.cpp @@ -68,6 +68,7 @@ class SequentialReaderTest : public Test EXPECT_CALL(*storage_, get_all_topics_and_types()) .Times(AtMost(1)).WillRepeatedly(Return(topics_and_types)); + EXPECT_CALL(*storage_, has_next()).WillRepeatedly(Return(true)); EXPECT_CALL(*storage_, read_next()).WillRepeatedly(Return(message)); EXPECT_CALL(*storage_factory, open_read_only(_)); @@ -138,7 +139,7 @@ TEST_F(SequentialReaderTest, set_filter_calls_storage) { EXPECT_ANY_THROW(reader_->get_implementation_handle().set_filter(storage_filter)); EXPECT_ANY_THROW(reader_->get_implementation_handle().reset_filter()); - EXPECT_CALL(*storage_, set_filter(_)).Times(2); + EXPECT_CALL(*storage_, set_filter(_)).Times(3); reader_->open(default_storage_options_, {"", storage_serialization_format_}); reader_->get_implementation_handle().set_filter(storage_filter); reader_->read_next(); @@ -146,8 +147,6 @@ TEST_F(SequentialReaderTest, set_filter_calls_storage) { storage_filter.topics.push_back("topic2"); reader_->get_implementation_handle().set_filter(storage_filter); reader_->read_next(); - - EXPECT_CALL(*storage_, reset_filter()).Times(1); reader_->get_implementation_handle().reset_filter(); reader_->read_next(); } diff --git a/rosbag2_py/CMakeLists.txt b/rosbag2_py/CMakeLists.txt index a5284cad2d..7646b8ea75 100644 --- a/rosbag2_py/CMakeLists.txt +++ b/rosbag2_py/CMakeLists.txt @@ -160,6 +160,12 @@ if(BUILD_TESTING) PYTHONPATH=${CMAKE_CURRENT_SOURCE_DIR} ${other_environment_vars} ) + ament_add_pytest_test(test_sequential_reader_multiple_files_py "test/test_sequential_reader_multiple_files.py" + PYTHON_EXECUTABLE "${_PYTHON_EXECUTABLE}" + APPEND_ENV + PYTHONPATH=${CMAKE_CURRENT_SOURCE_DIR} + ${other_environment_vars} + ) ament_add_pytest_test(test_sequential_writer_py "test/test_sequential_writer.py" PYTHON_EXECUTABLE "${_PYTHON_EXECUTABLE}" diff --git a/rosbag2_py/resources/wbag/metadata.yaml b/rosbag2_py/resources/wbag/metadata.yaml new file mode 100644 index 0000000000..a93a83a219 --- /dev/null +++ b/rosbag2_py/resources/wbag/metadata.yaml @@ -0,0 +1,65 @@ +rosbag2_bagfile_information: + version: 4 + storage_identifier: sqlite3 + relative_file_paths: + - wbag_0.db3 + - wbag_1.db3 + - wbag_2.db3 + - wbag_3.db3 + - wbag_4.db3 + duration: + nanoseconds: 411 + starting_time: + nanoseconds_since_epoch: 2623 + message_count: 6074 + topics_with_message_count: + - topic_metadata: + name: HHH + type: std_msgs/msg/String + serialization_format: cdr + offered_qos_profiles: "" + message_count: 726 + - topic_metadata: + name: GGG + type: std_msgs/msg/String + serialization_format: cdr + offered_qos_profiles: "" + message_count: 731 + - topic_metadata: + name: FFF + type: std_msgs/msg/String + serialization_format: cdr + offered_qos_profiles: "" + message_count: 772 + - topic_metadata: + name: EEE + type: std_msgs/msg/String + serialization_format: cdr + offered_qos_profiles: "" + message_count: 804 + - topic_metadata: + name: CCC + type: std_msgs/msg/String + serialization_format: cdr + offered_qos_profiles: "" + message_count: 742 + - topic_metadata: + name: DDD + type: std_msgs/msg/String + serialization_format: cdr + offered_qos_profiles: "" + message_count: 753 + - topic_metadata: + name: BBB + type: std_msgs/msg/String + serialization_format: cdr + offered_qos_profiles: "" + message_count: 742 + - topic_metadata: + name: AAA + type: std_msgs/msg/String + serialization_format: cdr + offered_qos_profiles: "" + message_count: 804 + compression_format: "" + compression_mode: "" \ No newline at end of file diff --git a/rosbag2_py/resources/wbag/wbag_0.db3 b/rosbag2_py/resources/wbag/wbag_0.db3 new file mode 100644 index 0000000000..f05324d949 Binary files /dev/null and b/rosbag2_py/resources/wbag/wbag_0.db3 differ diff --git a/rosbag2_py/resources/wbag/wbag_1.db3 b/rosbag2_py/resources/wbag/wbag_1.db3 new file mode 100644 index 0000000000..1c53ce23b6 Binary files /dev/null and b/rosbag2_py/resources/wbag/wbag_1.db3 differ diff --git a/rosbag2_py/resources/wbag/wbag_2.db3 b/rosbag2_py/resources/wbag/wbag_2.db3 new file mode 100644 index 0000000000..65c01acdd3 Binary files /dev/null and b/rosbag2_py/resources/wbag/wbag_2.db3 differ diff --git a/rosbag2_py/resources/wbag/wbag_3.db3 b/rosbag2_py/resources/wbag/wbag_3.db3 new file mode 100644 index 0000000000..e2bb437ec7 Binary files /dev/null and b/rosbag2_py/resources/wbag/wbag_3.db3 differ diff --git a/rosbag2_py/resources/wbag/wbag_4.db3 b/rosbag2_py/resources/wbag/wbag_4.db3 new file mode 100644 index 0000000000..dbc9cf6b29 Binary files /dev/null and b/rosbag2_py/resources/wbag/wbag_4.db3 differ diff --git a/rosbag2_py/src/rosbag2_py/_reader.cpp b/rosbag2_py/src/rosbag2_py/_reader.cpp index acacfe7df7..c8f8445e06 100644 --- a/rosbag2_py/src/rosbag2_py/_reader.cpp +++ b/rosbag2_py/src/rosbag2_py/_reader.cpp @@ -83,6 +83,11 @@ class Reader reader_->reset_filter(); } + void seek(const rcutils_time_point_value_t & timestamp) + { + reader_->seek(timestamp); + } + protected: std::unique_ptr reader_; }; @@ -116,7 +121,8 @@ PYBIND11_MODULE(_reader, m) { "get_all_topics_and_types", &rosbag2_py::Reader::get_all_topics_and_types) .def("set_filter", &rosbag2_py::Reader::set_filter) - .def("reset_filter", &rosbag2_py::Reader::reset_filter); + .def("reset_filter", &rosbag2_py::Reader::reset_filter) + .def("seek", &rosbag2_py::Reader::seek); pybind11::class_>( m, "SequentialCompressionReader") @@ -135,7 +141,10 @@ PYBIND11_MODULE(_reader, m) { &rosbag2_py::Reader::set_filter) .def( "reset_filter", - &rosbag2_py::Reader::reset_filter); + &rosbag2_py::Reader::reset_filter) + .def( + "seek", + &rosbag2_py::Reader::seek); m.def( "get_registered_readers", diff --git a/rosbag2_py/test/test_sequential_reader.py b/rosbag2_py/test/test_sequential_reader.py index fb545c9f5f..2caa0416a7 100644 --- a/rosbag2_py/test/test_sequential_reader.py +++ b/rosbag2_py/test/test_sequential_reader.py @@ -80,6 +80,68 @@ def test_sequential_reader(): msg_counter += 1 +def test_sequential_reader_seek(): + bag_path = str(Path(__file__).parent.parent / 'resources' / 'talker') + storage_options, converter_options = get_rosbag_options(bag_path) + + reader = rosbag2_py.SequentialReader() + reader.open(storage_options, converter_options) + + topic_types = reader.get_all_topics_and_types() + + # Create a map for quicker lookup + type_map = {topic_types[i].name: topic_types[i].type for i in range(len(topic_types))} + + # Seek No Filter + reader = rosbag2_py.SequentialReader() + reader.open(storage_options, converter_options) + reader.seek(1585866237113147888) + + msg_counter = 5 + + (topic, data, t) = reader.read_next() + msg_type = get_message(type_map[topic]) + msg = deserialize_message(data, msg_type) + + assert isinstance(msg, Log) + + (topic, data, t) = reader.read_next() + msg_type = get_message(type_map[topic]) + msg = deserialize_message(data, msg_type) + + isinstance(msg, String) + assert msg.data == f'Hello, world! {msg_counter}' + msg_counter += 1 + + # Set Filter will continue + storage_filter = rosbag2_py.StorageFilter(topics=['/topic']) + reader.set_filter(storage_filter) + + (topic, data, t) = reader.read_next() + msg_type = get_message(type_map[topic]) + msg = deserialize_message(data, msg_type) + isinstance(msg, String) + assert msg.data == f'Hello, world! {msg_counter}' + + # Seek will keep filter + reader.seek(1585866239113147888) + + msg_counter = 8 + + (topic, data, t) = reader.read_next() + msg_type = get_message(type_map[topic]) + msg = deserialize_message(data, msg_type) + isinstance(msg, String) + assert msg.data == f'Hello, world! {msg_counter}' + msg_counter += 1 + + (topic, data, t) = reader.read_next() + msg_type = get_message(type_map[topic]) + msg = deserialize_message(data, msg_type) + isinstance(msg, String) + assert msg.data == f'Hello, world! {msg_counter}' + + def test_plugin_list(): reader_plugins = rosbag2_py.get_registered_readers() assert 'my_read_only_test_plugin' in reader_plugins diff --git a/rosbag2_py/test/test_sequential_reader_multiple_files.py b/rosbag2_py/test/test_sequential_reader_multiple_files.py new file mode 100644 index 0000000000..f75a0da6e2 --- /dev/null +++ b/rosbag2_py/test/test_sequential_reader_multiple_files.py @@ -0,0 +1,140 @@ +# Copyright 2020 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +from pathlib import Path +import sys + + +if os.environ.get('ROSBAG2_PY_TEST_WITH_RTLD_GLOBAL', None) is not None: + # This is needed on Linux when compiling with clang/libc++. + # TL;DR This makes class_loader work when using a python extension compiled with libc++. + # + # For the fun RTTI ABI details, see https://whatofhow.wordpress.com/2015/03/17/odr-rtti-dso/. + sys.setdlopenflags(os.RTLD_GLOBAL | os.RTLD_LAZY) + +from common import get_rosbag_options # noqa +import rosbag2_py # noqa + + +def test_reset_filter(): + bag_path = str(Path(__file__).parent.parent / 'resources' / 'wbag') + storage_options, converter_options = get_rosbag_options(bag_path) + + reader = rosbag2_py.SequentialReader() + reader.open(storage_options, converter_options) + + # Set filter for topic of string type + storage_filter = rosbag2_py.StorageFilter(topics=['AAA', 'CCC', 'DDD']) + reader.set_filter(storage_filter) + + (topic, data, t) = reader.read_next() + + assert topic == 'AAA' + assert t == 1001 + + (topic, data, t) = reader.read_next() + + assert topic == 'CCC' + assert t == 1002 + + (topic, data, t) = reader.read_next() + + assert topic == 'AAA' + assert t == 1004 + + # No filter and bag continues same location + reader.reset_filter() + + (topic, data, t) = reader.read_next() + + assert topic == 'FFF' + assert t == 1004 + + (topic, data, t) = reader.read_next() + + assert topic == 'BBB' + assert t == 1004 + + (topic, data, t) = reader.read_next() + + assert topic == 'EEE' + assert t == 1005 + + +def test_seek_forward(): + bag_path = str(Path(__file__).parent.parent / 'resources' / 'wbag') + storage_options, converter_options = get_rosbag_options(bag_path) + + reader = rosbag2_py.SequentialReader() + reader.open(storage_options, converter_options) + + # seek forward + reader.seek(1822) + + (topic, data, t) = reader.read_next() + + assert topic == 'CCC' + assert t == 1822 + + # set filter continues in same location + storage_filter = rosbag2_py.StorageFilter(topics=['BBB', 'GGG']) + reader.set_filter(storage_filter) + + (topic, data, t) = reader.read_next() + + assert topic == 'GGG' + assert t == 1822 + + (topic, data, t) = reader.read_next() + + assert topic == 'GGG' + assert t == 1822 + + (topic, data, t) = reader.read_next() + + assert topic == 'BBB' + assert t == 1826 + + +def test_seek_backward(): + bag_path = str(Path(__file__).parent.parent / 'resources' / 'wbag') + storage_options, converter_options = get_rosbag_options(bag_path) + + reader = rosbag2_py.SequentialReader() + reader.open(storage_options, converter_options) + + # seek forward first + reader.seek(1822) + storage_filter = rosbag2_py.StorageFilter(topics=['BBB', 'GGG']) + reader.set_filter(storage_filter) + (topic, data, t) = reader.read_next() + + # seek backwards & filter preserved + reader.seek(1408) + + (topic, data, t) = reader.read_next() + + assert topic == 'BBB' + assert t == 1408 + + (topic, data, t) = reader.read_next() + + assert topic == 'GGG' + assert t == 1408 + + (topic, data, t) = reader.read_next() + + assert topic == 'BBB' + assert t == 1413 diff --git a/rosbag2_storage/include/rosbag2_storage/storage_interfaces/read_only_interface.hpp b/rosbag2_storage/include/rosbag2_storage/storage_interfaces/read_only_interface.hpp index 3d5e9edd85..7aece26e8a 100644 --- a/rosbag2_storage/include/rosbag2_storage/storage_interfaces/read_only_interface.hpp +++ b/rosbag2_storage/include/rosbag2_storage/storage_interfaces/read_only_interface.hpp @@ -17,6 +17,8 @@ #include +#include "rcutils/types.h" + #include "rosbag2_storage/storage_filter.hpp" #include "rosbag2_storage/storage_interfaces/base_info_interface.hpp" #include "rosbag2_storage/storage_interfaces/base_io_interface.hpp" @@ -43,8 +45,18 @@ class ROSBAG2_STORAGE_PUBLIC ReadOnlyInterface std::string get_storage_identifier() const override = 0; + /** + Sets filters on messages. This occurs in place, meaning that messages satisfying + the filter that were already read before applying the filter will not be re-read + by read_next() unless seek(t) is also called to an earlier timestamp t. + */ virtual void set_filter(const StorageFilter & storage_filter) = 0; + /** + Removes any previously set storage filter. This occurs in place, meaning that + after a reset, read_next() will not return either previously read or unread + messages that occur before the timestamp of the last-read message. + */ virtual void reset_filter() = 0; static std::string get_package_name() @@ -55,6 +67,24 @@ class ROSBAG2_STORAGE_PUBLIC ReadOnlyInterface { return StorageTraits::name; } + + /** + Seeks to a given timestamp. Running read_next() after seek(t) + will return a message that is equal to or after time t. Running read_next() + repeatedly until the end of the storage should return all messages equal to + or after time t. + + If a filter has been previously set, it will persist, meaning + that the next message returned will need to both satisfy the filter, + and satisfy the seek time requirement. + + seek(t) can jump forward or backward in time. If t is earlier than the + first message message timestamp that satisfies the storage filter, then read_next() + will return that first message. If t is later than the last message timestamp, then + read_next() will behave as if the end of the file has been reached, and has_next() + will return false. + */ + virtual void seek(const rcutils_time_point_value_t & timestamp) = 0; }; } // namespace storage_interfaces diff --git a/rosbag2_storage/include/rosbag2_storage/storage_interfaces/read_write_interface.hpp b/rosbag2_storage/include/rosbag2_storage/storage_interfaces/read_write_interface.hpp index 3778d490b1..c435391c36 100644 --- a/rosbag2_storage/include/rosbag2_storage/storage_interfaces/read_write_interface.hpp +++ b/rosbag2_storage/include/rosbag2_storage/storage_interfaces/read_write_interface.hpp @@ -56,6 +56,8 @@ class ROSBAG2_STORAGE_PUBLIC ReadWriteInterface { return StorageTraits::name; } + + void seek(const rcutils_time_point_value_t & timestamp) override = 0; }; } // namespace storage_interfaces diff --git a/rosbag2_storage/test/rosbag2_storage/test_plugin.cpp b/rosbag2_storage/test/rosbag2_storage/test_plugin.cpp index 60ab7c7870..ead567f836 100644 --- a/rosbag2_storage/test/rosbag2_storage/test_plugin.cpp +++ b/rosbag2_storage/test/rosbag2_storage/test_plugin.cpp @@ -124,4 +124,9 @@ void TestPlugin::reset_filter() std::cout << "\nresetting storage filter\n"; } +void TestPlugin::seek(const rcutils_time_point_value_t & /*timestamp*/) +{ + std::cout << "\nseeking\n"; +} + PLUGINLIB_EXPORT_CLASS(TestPlugin, rosbag2_storage::storage_interfaces::ReadWriteInterface) diff --git a/rosbag2_storage/test/rosbag2_storage/test_plugin.hpp b/rosbag2_storage/test/rosbag2_storage/test_plugin.hpp index 3322b35d5c..bd547876d1 100644 --- a/rosbag2_storage/test/rosbag2_storage/test_plugin.hpp +++ b/rosbag2_storage/test/rosbag2_storage/test_plugin.hpp @@ -61,6 +61,8 @@ class TestPlugin : public rosbag2_storage::storage_interfaces::ReadWriteInterfac void set_filter(const rosbag2_storage::StorageFilter & storage_filter) override; void reset_filter() override; + + void seek(const rcutils_time_point_value_t & timestamp) override; }; #endif // ROSBAG2_STORAGE__TEST_PLUGIN_HPP_ diff --git a/rosbag2_storage/test/rosbag2_storage/test_read_only_plugin.cpp b/rosbag2_storage/test/rosbag2_storage/test_read_only_plugin.cpp index ae0488ca95..a88e11baa6 100644 --- a/rosbag2_storage/test/rosbag2_storage/test_read_only_plugin.cpp +++ b/rosbag2_storage/test/rosbag2_storage/test_read_only_plugin.cpp @@ -92,4 +92,9 @@ void TestReadOnlyPlugin::reset_filter() std::cout << "\nresetting storage filter\n"; } +void TestReadOnlyPlugin::seek(const rcutils_time_point_value_t & /*timestamp*/) +{ + std::cout << "\nseeking\n"; +} + PLUGINLIB_EXPORT_CLASS(TestReadOnlyPlugin, rosbag2_storage::storage_interfaces::ReadOnlyInterface) diff --git a/rosbag2_storage/test/rosbag2_storage/test_read_only_plugin.hpp b/rosbag2_storage/test/rosbag2_storage/test_read_only_plugin.hpp index 49caaa993d..3a3dc34f8a 100644 --- a/rosbag2_storage/test/rosbag2_storage/test_read_only_plugin.hpp +++ b/rosbag2_storage/test/rosbag2_storage/test_read_only_plugin.hpp @@ -48,6 +48,8 @@ class TestReadOnlyPlugin : public rosbag2_storage::storage_interfaces::ReadOnlyI void set_filter(const rosbag2_storage::StorageFilter & storage_filter) override; void reset_filter() override; + + void seek(const rcutils_time_point_value_t & timestamp) override; }; #endif // ROSBAG2_STORAGE__TEST_READ_ONLY_PLUGIN_HPP_ diff --git a/rosbag2_storage_default_plugins/include/rosbag2_storage_default_plugins/sqlite/sqlite_storage.hpp b/rosbag2_storage_default_plugins/include/rosbag2_storage_default_plugins/sqlite/sqlite_storage.hpp index ff22ec24cc..d7ca75da9e 100644 --- a/rosbag2_storage_default_plugins/include/rosbag2_storage_default_plugins/sqlite/sqlite_storage.hpp +++ b/rosbag2_storage_default_plugins/include/rosbag2_storage_default_plugins/sqlite/sqlite_storage.hpp @@ -23,7 +23,6 @@ #include #include "rcpputils/thread_safety_annotations.hpp" -#include "rcutils/types.h" #include "rosbag2_storage/storage_interfaces/read_write_interface.hpp" #include "rosbag2_storage/serialized_bag_message.hpp" #include "rosbag2_storage/storage_filter.hpp" @@ -85,6 +84,8 @@ class ROSBAG2_STORAGE_DEFAULT_PLUGINS_PUBLIC SqliteStorage void reset_filter() override; + void seek(const rcutils_time_point_value_t & timestamp) override; + std::string get_storage_setting(const std::string & key); private: @@ -98,7 +99,7 @@ class ROSBAG2_STORAGE_DEFAULT_PLUGINS_PUBLIC SqliteStorage RCPPUTILS_TSA_REQUIRES(database_write_mutex_); using ReadQueryResult = SqliteStatementWrapper::QueryResult< - std::shared_ptr, rcutils_time_point_value_t, std::string>; + std::shared_ptr, rcutils_time_point_value_t, std::string, int>; std::shared_ptr database_ RCPPUTILS_TSA_GUARDED_BY(database_write_mutex_); SqliteStatement write_statement_ {}; @@ -110,6 +111,9 @@ class ROSBAG2_STORAGE_DEFAULT_PLUGINS_PUBLIC SqliteStorage std::vector all_topics_and_types_; std::string relative_path_; std::atomic_bool active_transaction_ {false}; + + rcutils_time_point_value_t seek_time_ = 0; + int seek_row_id_ = 0; rosbag2_storage::StorageFilter storage_filter_ {}; // This mutex is necessary to protect: diff --git a/rosbag2_storage_default_plugins/src/rosbag2_storage_default_plugins/sqlite/sqlite_storage.cpp b/rosbag2_storage_default_plugins/src/rosbag2_storage_default_plugins/sqlite/sqlite_storage.cpp index 20063af55a..c31bbe6091 100644 --- a/rosbag2_storage_default_plugins/src/rosbag2_storage_default_plugins/sqlite/sqlite_storage.cpp +++ b/rosbag2_storage_default_plugins/src/rosbag2_storage_default_plugins/sqlite/sqlite_storage.cpp @@ -301,6 +301,11 @@ std::shared_ptr SqliteStorage::read_next( bag_message->time_stamp = std::get<1>(*current_message_row_); bag_message->topic_name = std::get<2>(*current_message_row_); + // set start time to current time + // and set seek_row_id to the new row id up + seek_time_ = bag_message->time_stamp; + seek_row_id_ = std::get<3>(*current_message_row_) + 1; + ++current_message_row_; return bag_message; } @@ -376,6 +381,10 @@ void SqliteStorage::prepare_for_writing() void SqliteStorage::prepare_for_reading() { + std::string statement_str = "SELECT data, timestamp, topics.name, messages.id " + "FROM messages JOIN topics ON messages.topic_id = topics.id WHERE "; + + // add topic filter if (!storage_filter_.topics.empty()) { // Construct string for selected topics std::string topic_list{""}; @@ -385,20 +394,19 @@ void SqliteStorage::prepare_for_reading() topic_list += ","; } } - - read_statement_ = database_->prepare_statement( - "SELECT data, timestamp, topics.name " - "FROM messages JOIN topics ON messages.topic_id = topics.id " - "WHERE topics.name IN (" + topic_list + ")" - "ORDER BY messages.timestamp;"); - } else { - read_statement_ = database_->prepare_statement( - "SELECT data, timestamp, topics.name " - "FROM messages JOIN topics ON messages.topic_id = topics.id " - "ORDER BY messages.timestamp;"); + statement_str += "(topics.name IN (" + topic_list + ")) AND "; } + // add start time filter + statement_str += "(((timestamp = " + std::to_string(seek_time_) + ") " + "AND (messages.id >= " + std::to_string(seek_row_id_) + ")) " + "OR (timestamp > " + std::to_string(seek_time_) + ")) "; + + // add order by time then id + statement_str += "ORDER BY messages.timestamp, messages.id;"; + + read_statement_ = database_->prepare_statement(statement_str); message_result_ = read_statement_->execute_query< - std::shared_ptr, rcutils_time_point_value_t, std::string>(); + std::shared_ptr, rcutils_time_point_value_t, std::string, int>(); current_message_row_ = message_result_.begin(); } @@ -477,12 +485,24 @@ rosbag2_storage::BagMetadata SqliteStorage::get_metadata() void SqliteStorage::set_filter( const rosbag2_storage::StorageFilter & storage_filter) { + // keep current start time and start row_id + // set topic filter and reset read statement for re-read storage_filter_ = storage_filter; + read_statement_ = nullptr; } void SqliteStorage::reset_filter() { - storage_filter_ = rosbag2_storage::StorageFilter(); + set_filter(rosbag2_storage::StorageFilter()); +} + +void SqliteStorage::seek(const rcutils_time_point_value_t & timestamp) +{ + // reset row id to 0 and set start time to input + // keep topic filter and reset read statement for re-read + seek_row_id_ = 0; + seek_time_ = timestamp; + read_statement_ = nullptr; } std::string SqliteStorage::get_storage_setting(const std::string & key) diff --git a/rosbag2_transport/test/rosbag2_transport/mock_sequential_reader.hpp b/rosbag2_transport/test/rosbag2_transport/mock_sequential_reader.hpp index 5e6b2bf1ef..02c04c2979 100644 --- a/rosbag2_transport/test/rosbag2_transport/mock_sequential_reader.hpp +++ b/rosbag2_transport/test/rosbag2_transport/mock_sequential_reader.hpp @@ -79,6 +79,11 @@ class MockSequentialReader : public rosbag2_cpp::reader_interfaces::BaseReaderIn filter_ = rosbag2_storage::StorageFilter(); } + void seek(const rcutils_time_point_value_t & timestamp) override + { + seek_time_ = timestamp; + } + void prepare( std::vector> messages, std::vector topics) @@ -92,6 +97,7 @@ class MockSequentialReader : public rosbag2_cpp::reader_interfaces::BaseReaderIn rosbag2_storage::BagMetadata metadata_; std::vector topics_; size_t num_read_; + rcutils_time_point_value_t seek_time_ = 0; rosbag2_storage::StorageFilter filter_; };