diff --git a/include/paimon/utils/roaring_bitmap64.h b/include/paimon/utils/roaring_bitmap64.h index f170d66e1..3162f3912 100644 --- a/include/paimon/utils/roaring_bitmap64.h +++ b/include/paimon/utils/roaring_bitmap64.h @@ -74,6 +74,19 @@ class PAIMON_EXPORT RoaringBitmap64 { /// @param x value added to bitmap void Add(int64_t x); + /// Bulk-insert `n` values into the bitmap. + /// + /// Compared to repeatedly calling `Add`, this implementation: + /// 1. Buckets the input values by their high-32 bits in a single pass. + /// 2. Feeds each bucket to the inner 32-bit Roaring's true-batch + /// `addMany(uint32_t*)` path, which performs container-level bulk + /// insertion. + /// + /// This avoids the per-value `std::map` lookup of the 64-bit wrapper and + /// the per-value insertion overhead inside the 32-bit array container. + /// Values may be unsorted; ordering is handled internally. + void AddMany(size_t n, const int64_t* values); + /// @param x value added to bitmap /// @return false if contain x; true if not contain x bool CheckedAdd(int64_t x); diff --git a/src/paimon/common/global_index/btree/btree_defs.h b/src/paimon/common/global_index/btree/btree_defs.h index 2a880d516..0818d7980 100644 --- a/src/paimon/common/global_index/btree/btree_defs.h +++ b/src/paimon/common/global_index/btree/btree_defs.h @@ -41,6 +41,16 @@ struct BtreeDefs { static inline const char kBtreeIndexHighPriorityPoolRatio[] = "btree-index.high-priority-pool-ratio"; + /// "btree-index.read-buffer-size" - Optional. Specifies the read buffer size for the B-tree + /// index. This setting can be tuned based on query patterns: + /// - For range queries (e.g., `VisitLessThan`, `VisitGreaterOrEqual`), increasing the buffer + /// size (e.g., to 1MB) may improve I/O bandwidth and sequential read performance. + /// - For point queries (e.g., `VisitEqual`), buffering can introduce negative effects due to + /// read amplification; it is recommended to leave this option unset. + /// + /// If specified, read block with `BufferedInputStream`. + static inline const char kBtreeIndexReadBufferSize[] = "btree-index.read-buffer-size"; + static inline const char kDefaultBtreeIndexBlockSize[] = "64KB"; static inline const char kDefaultBtreeIndexCompression[] = "none"; static inline const char kDefaultBtreeIndexCacheSize[] = "128MB"; diff --git a/src/paimon/common/global_index/btree/btree_global_index_integration_test.cpp b/src/paimon/common/global_index/btree/btree_global_index_integration_test.cpp index 6454a0045..3d1e5d2e2 100644 --- a/src/paimon/common/global_index/btree/btree_global_index_integration_test.cpp +++ b/src/paimon/common/global_index/btree/btree_global_index_integration_test.cpp @@ -20,6 +20,8 @@ #include "paimon/common/factories/io_hook.h" #include "paimon/common/global_index/btree/btree_global_index_writer.h" #include "paimon/common/global_index/btree/btree_global_indexer.h" +#include "paimon/common/global_index/btree/lazy_filtered_btree_reader.h" +#include "paimon/common/options/memory_size.h" #include "paimon/common/utils/scope_guard.h" #include "paimon/data/decimal.h" #include "paimon/data/timestamp.h" @@ -27,6 +29,7 @@ #include "paimon/global_index/bitmap_global_index_result.h" #include "paimon/global_index/io/global_index_file_reader.h" #include "paimon/global_index/io/global_index_file_writer.h" +#include "paimon/io/buffered_input_stream.h" #include "paimon/memory/memory_pool.h" #include "paimon/predicate/literal.h" #include "paimon/testing/utils/io_exception_helper.h" @@ -1506,11 +1509,6 @@ TEST_P(BTreeGlobalIndexIntegrationTest, WriteAndReadLargeDataWithSmallBlocks) { ASSERT_OK_AND_ASSIGN(auto metas, writer->Finish()); ASSERT_EQ(metas.size(), 1); - auto file_reader = std::make_shared(fs_, base_path_); - c_schema = CreateArrowSchema(field); - ASSERT_OK_AND_ASSIGN(auto reader, - indexer->CreateReader(c_schema.get(), file_reader, metas, pool_)); - // Helper lambda: given a predicate on value, collect matching row ids auto collect_rows = [total_rows = total_rows](std::function predicate) { std::vector result; @@ -1529,69 +1527,110 @@ TEST_P(BTreeGlobalIndexIntegrationTest, WriteAndReadLargeDataWithSmallBlocks) { return result; }; - // --- VisitIsNull --- - { - ASSERT_OK_AND_ASSIGN(auto result, reader->VisitIsNull()); - CheckResult(result, null_row_ids); - } + // Read back with different read-buffer-size configurations: + // "" -> no buffer (original path) + // "128" -> very small buffer (smaller than block size 256) + // "512" -> moderate buffer (larger than block size) + // "64KB" -> large buffer + // "1MB" -> very large buffer (likely covers entire file) + std::vector buffer_sizes = {"", "128", "512", "64KB", "1MB"}; + + for (const auto& buf_size : buffer_sizes) { + std::map read_options = { + {BtreeDefs::kBtreeIndexBlockSize, "256"}, {BtreeDefs::kBtreeIndexCacheSize, "1024"}}; + if (!buf_size.empty()) { + read_options[BtreeDefs::kBtreeIndexReadBufferSize] = buf_size; + } + ASSERT_OK_AND_ASSIGN(auto read_indexer, BTreeGlobalIndexer::Create(read_options)); - // --- VisitIsNotNull --- - { - ASSERT_OK_AND_ASSIGN(auto result, reader->VisitIsNotNull()); - CheckResult(result, non_null_row_ids); - } + auto file_reader = std::make_shared(fs_, base_path_); + c_schema = CreateArrowSchema(field); + ASSERT_OK_AND_ASSIGN(auto reader, + read_indexer->CreateReader(c_schema.get(), file_reader, metas, pool_)); + + // check input stream and read buffer size + { + auto lazy_reader = std::dynamic_pointer_cast(reader); + ASSERT_TRUE(lazy_reader); + ASSERT_OK_AND_ASSIGN(auto tmp_reader, lazy_reader->GetOrCreateReader(metas[0])); + auto btree_reader = std::dynamic_pointer_cast(tmp_reader); + ASSERT_TRUE(btree_reader); + auto input_stream = btree_reader->sst_file_reader_->block_cache_->in_; + auto buffered_stream = std::dynamic_pointer_cast(input_stream); + if (!buf_size.empty()) { + ASSERT_TRUE(buffered_stream); + ASSERT_OK_AND_ASSIGN(int32_t expected_buffer_size, + MemorySize::ParseBytes(buf_size)); + ASSERT_EQ(buffered_stream->buffer_size_, expected_buffer_size); + } else { + ASSERT_FALSE(buffered_stream); + } + } - // --- VisitEqual for value 0 --- - { - Literal lit_0(0); - ASSERT_OK_AND_ASSIGN(auto result, reader->VisitEqual(lit_0)); - CheckResult(result, collect_rows([](int32_t v) { return v == 0; })); - } + // --- VisitIsNull --- + { + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitIsNull()); + CheckResult(result, null_row_ids); + } - // --- VisitEqual for a value in the middle --- - { - Literal lit_100(100); - ASSERT_OK_AND_ASSIGN(auto result, reader->VisitEqual(lit_100)); - CheckResult(result, collect_rows([](int32_t v) { return v == 100; })); - } + // --- VisitIsNotNull --- + { + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitIsNotNull()); + CheckResult(result, non_null_row_ids); + } - // --- VisitEqual for non-existent value --- - { - Literal lit_neg(static_cast(-1)); - ASSERT_OK_AND_ASSIGN(auto result, reader->VisitEqual(lit_neg)); - CheckResult(result, {}); - } + // --- VisitEqual for value 0 --- + { + Literal lit_0(0); + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitEqual(lit_0)); + CheckResult(result, collect_rows([](int32_t v) { return v == 0; })); + } - // --- VisitLessThan for value 5 --- - { - Literal lit_5(5); - ASSERT_OK_AND_ASSIGN(auto result, reader->VisitLessThan(lit_5)); - CheckResult(result, collect_rows([](int32_t v) { return v < 5; })); - } + // --- VisitEqual for a value in the middle --- + { + Literal lit_100(100); + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitEqual(lit_100)); + CheckResult(result, collect_rows([](int32_t v) { return v == 100; })); + } - // --- VisitGreaterOrEqual for a high value near max --- - { - int32_t max_val = - static_cast(collect_rows([](int32_t) { return true; }).size()) / 3; - int32_t threshold = max_val - 2; - Literal lit_threshold(threshold); - ASSERT_OK_AND_ASSIGN(auto result, reader->VisitGreaterOrEqual(lit_threshold)); - CheckResult(result, collect_rows([threshold](int32_t v) { return v >= threshold; })); - } + // --- VisitEqual for non-existent value --- + { + Literal lit_neg(static_cast(-1)); + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitEqual(lit_neg)); + CheckResult(result, {}); + } - // --- VisitIn for scattered values --- - { - std::vector in_literals = {Literal(0), Literal(500), Literal(10000)}; - ASSERT_OK_AND_ASSIGN(auto result, reader->VisitIn(in_literals)); - CheckResult(result, - collect_rows([](int32_t v) { return v == 0 || v == 500 || v == 10000; })); - } + // --- VisitLessThan for value 5 --- + { + Literal lit_5(5); + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitLessThan(lit_5)); + CheckResult(result, collect_rows([](int32_t v) { return v < 5; })); + } - // --- VisitNotEqual for value 0 --- - { - Literal lit_0(0); - ASSERT_OK_AND_ASSIGN(auto result, reader->VisitNotEqual(lit_0)); - CheckResult(result, collect_rows([](int32_t v) { return v != 0; })); + // --- VisitGreaterOrEqual for a high value near max --- + { + int32_t max_val = + static_cast(collect_rows([](int32_t) { return true; }).size()) / 3; + int32_t threshold = max_val - 2; + Literal lit_threshold(threshold); + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitGreaterOrEqual(lit_threshold)); + CheckResult(result, collect_rows([threshold](int32_t v) { return v >= threshold; })); + } + + // --- VisitIn for scattered values --- + { + std::vector in_literals = {Literal(0), Literal(500), Literal(10000)}; + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitIn(in_literals)); + CheckResult(result, + collect_rows([](int32_t v) { return v == 0 || v == 500 || v == 10000; })); + } + + // --- VisitNotEqual for value 0 --- + { + Literal lit_0(0); + ASSERT_OK_AND_ASSIGN(auto result, reader->VisitNotEqual(lit_0)); + CheckResult(result, collect_rows([](int32_t v) { return v != 0; })); + } } } @@ -1722,6 +1761,22 @@ TEST_P(BTreeGlobalIndexIntegrationTest, FinishWithEmptyData) { ASSERT_NOK_WITH_MSG(writer->Finish(), "Should never write an empty btree index file"); } +TEST_P(BTreeGlobalIndexIntegrationTest, InvalidReadOptions) { + auto file_writer = std::make_shared(fs_, base_path_); + auto field = arrow::field("int_field", arrow::int32()); + auto c_schema = CreateArrowSchema(field); + + std::map options = {{BtreeDefs::kBtreeIndexBlockSize, "4096"}, + {BtreeDefs::kBtreeIndexCompression, GetParam()}, + {BtreeDefs::kBtreeIndexReadBufferSize, "4GB"}}; + ASSERT_OK_AND_ASSIGN(auto indexer, BTreeGlobalIndexer::Create(options)); + + auto file_reader = std::make_shared(fs_, base_path_); + ASSERT_NOK_WITH_MSG(indexer->CreateReader(c_schema.get(), file_reader, /*files=*/{}, pool_), + "In BTreeGlobalIndexer::CreateReader: option btree-index.read-buffer-size " + "is 4GB, exceed INT_MAX or less than 0"); +} + TEST_P(BTreeGlobalIndexIntegrationTest, TestIOException) { bool run_complete = false; auto io_hook = paimon::IOHook::GetInstance(); diff --git a/src/paimon/common/global_index/btree/btree_global_index_reader.cpp b/src/paimon/common/global_index/btree/btree_global_index_reader.cpp index 9adadd48f..b47ae0e87 100644 --- a/src/paimon/common/global_index/btree/btree_global_index_reader.cpp +++ b/src/paimon/common/global_index/btree/btree_global_index_reader.cpp @@ -24,15 +24,39 @@ #include "paimon/memory/bytes.h" #include "paimon/predicate/literal.h" namespace paimon { + +Result> BTreeGlobalIndexReader::Create( + const std::shared_ptr& sst_file_reader, RoaringBitmap64&& null_bitmap, + const std::optional& min_key_slice, + const std::optional& max_key_slice, + const std::shared_ptr& key_type, const std::shared_ptr& pool) { + std::optional min_key; + std::optional max_key; + if (min_key_slice) { + PAIMON_ASSIGN_OR_RAISE( + min_key, KeySerializer::DeserializeKey(min_key_slice.value(), key_type, pool.get())); + } + if (max_key_slice) { + PAIMON_ASSIGN_OR_RAISE( + max_key, KeySerializer::DeserializeKey(max_key_slice.value(), key_type, pool.get())); + } + return std::shared_ptr(new BTreeGlobalIndexReader( + sst_file_reader, std::move(null_bitmap), std::move(min_key), std::move(max_key), + min_key_slice, max_key_slice, key_type, pool)); +} + BTreeGlobalIndexReader::BTreeGlobalIndexReader( const std::shared_ptr& sst_file_reader, RoaringBitmap64&& null_bitmap, - const std::optional& min_key, const std::optional& max_key, + std::optional min_key, std::optional max_key, + std::optional min_key_slice, std::optional max_key_slice, const std::shared_ptr& key_type, const std::shared_ptr& pool) : pool_(pool), sst_file_reader_(sst_file_reader), null_bitmap_(std::move(null_bitmap)), - min_key_(min_key), - max_key_(max_key), + min_key_(std::move(min_key)), + max_key_(std::move(max_key)), + min_key_slice_(std::move(min_key_slice)), + max_key_slice_(std::move(max_key_slice)), key_type_(key_type), comparator_(KeySerializer::CreateComparator(key_type, pool)) {} @@ -253,9 +277,8 @@ Result> BTreeGlobalIndexReader::VisitFullText Result BTreeGlobalIndexReader::RangeQuery(const std::optional& from, const std::optional& to, bool from_inclusive, bool to_inclusive) { - RoaringBitmap64 result; if (!from || !to) { - return result; + return RoaringBitmap64(); } // Create an index block iterator to iterate through data blocks @@ -266,10 +289,35 @@ Result BTreeGlobalIndexReader::RangeQuery(const std::optional= from, so skip lower bound comparison. + // When to == max_key_, all entries are <= to, so skip upper bound comparison. + bool skip_from_check = false; + bool skip_to_check = false; + if (min_key_slice_) { + PAIMON_ASSIGN_OR_RAISE(int32_t cmp_min, comparator_(from_slice, min_key_slice_.value())); + skip_from_check = (cmp_min == 0) && from_inclusive; + } + if (max_key_slice_) { + PAIMON_ASSIGN_OR_RAISE(int32_t cmp_max, comparator_(to_slice, max_key_slice_.value())); + skip_to_check = (cmp_max == 0) && to_inclusive; + } + + RoaringBitmap64 result; + + // Per-data-block row-id buffer. Collect row-ids within a single data + // block and flush them with one AddMany call. + std::vector block_row_ids; + auto index_iterator = sst_file_reader_->CreateIndexIterator(); PAIMON_ASSIGN_OR_RAISE([[maybe_unused]] bool seek_result, index_iterator->SeekTo(from_slice)); bool first_block = true; + // After SeekTo positions at the first entry >= from, only entries in the first + // block before the seek position could be < from. Once we pass them, all subsequent + // entries are guaranteed >= from, so we can skip the lower bound check. + bool passed_from_bound = skip_from_check; + while (index_iterator->HasNext()) { // Get the next data block PAIMON_ASSIGN_OR_RAISE(std::unique_ptr data_iterator, @@ -285,26 +333,83 @@ Result BTreeGlobalIndexReader::RangeQuery(const std::optionalHasNext()) { - PAIMON_ASSIGN_OR_RAISE(BlockEntry entry, data_iterator->Next()); + block_row_ids.clear(); - // Compare entry key against from bound - PAIMON_ASSIGN_OR_RAISE(int32_t cmp_from, comparator_(entry.key, from_slice)); - - // Check lower bound - if (!from_inclusive && cmp_from == 0) { - continue; + if (skip_to_check && passed_from_bound) { + // Fast path: no boundary checks needed, skip key parsing entirely + while (data_iterator->HasNext()) { + PAIMON_ASSIGN_OR_RAISE(MemorySlice value, data_iterator->SkipKeyAndReadValue()); + PAIMON_RETURN_NOT_OK(DeserializeRowIds(value, &block_row_ids)); } - - // Compare entry key against to bound - PAIMON_ASSIGN_OR_RAISE(int32_t cmp_to, comparator_(entry.key, to_slice)); - - if (cmp_to > 0 || (!to_inclusive && cmp_to == 0)) { + } else if (skip_to_check) { + // Only need to check lower bound (first few entries in first block) + while (data_iterator->HasNext()) { + PAIMON_ASSIGN_OR_RAISE(BlockEntry entry, data_iterator->Next()); + if (!passed_from_bound) { + PAIMON_ASSIGN_OR_RAISE(int32_t cmp_from, comparator_(entry.key, from_slice)); + if (!from_inclusive && cmp_from == 0) { + continue; + } + if (cmp_from > 0) { + passed_from_bound = true; + } + } + PAIMON_RETURN_NOT_OK(DeserializeRowIds(entry.value, &block_row_ids)); + } + passed_from_bound = true; + } else if (passed_from_bound) { + // Only need to check upper bound + bool reached_end = false; + while (data_iterator->HasNext()) { + PAIMON_ASSIGN_OR_RAISE(BlockEntry entry, data_iterator->Next()); + PAIMON_ASSIGN_OR_RAISE(int32_t cmp_to, comparator_(entry.key, to_slice)); + if (cmp_to > 0 || (!to_inclusive && cmp_to == 0)) { + reached_end = true; + break; + } + PAIMON_RETURN_NOT_OK(DeserializeRowIds(entry.value, &block_row_ids)); + } + if (!block_row_ids.empty()) { + result.AddMany(block_row_ids.size(), block_row_ids.data()); + } + if (reached_end) { + return result; + } + continue; + } else { + // Need to check both bounds + bool reached_end = false; + while (data_iterator->HasNext()) { + PAIMON_ASSIGN_OR_RAISE(BlockEntry entry, data_iterator->Next()); + if (!passed_from_bound) { + PAIMON_ASSIGN_OR_RAISE(int32_t cmp_from, comparator_(entry.key, from_slice)); + if (!from_inclusive && cmp_from == 0) { + continue; + } + if (cmp_from > 0) { + passed_from_bound = true; + } + } + PAIMON_ASSIGN_OR_RAISE(int32_t cmp_to, comparator_(entry.key, to_slice)); + if (cmp_to > 0 || (!to_inclusive && cmp_to == 0)) { + reached_end = true; + break; + } + PAIMON_RETURN_NOT_OK(DeserializeRowIds(entry.value, &block_row_ids)); + } + passed_from_bound = true; + if (!block_row_ids.empty()) { + result.AddMany(block_row_ids.size(), block_row_ids.data()); + } + if (reached_end) { return result; } + continue; + } - PAIMON_RETURN_NOT_OK(DeserializeRowIds(entry.value, &result)); + // Flush this block's row-ids in one batched call + if (!block_row_ids.empty()) { + result.AddMany(block_row_ids.size(), block_row_ids.data()); } } @@ -312,7 +417,7 @@ Result BTreeGlobalIndexReader::RangeQuery(const std::optional* out) const { auto input = slice.ToInput(); PAIMON_ASSIGN_OR_RAISE(int32_t num_row_ids, input.ReadVarLenInt()); if (num_row_ids <= 0) { @@ -320,9 +425,10 @@ Status BTreeGlobalIndexReader::DeserializeRowIds(const MemorySlice& slice, "Invalid row id length {} in DeserializeRowIds for BTreeGlobalIndexReader, must > 0", num_row_ids)); } + out->reserve(out->size() + static_cast(num_row_ids)); for (int32_t i = 0; i < num_row_ids; i++) { PAIMON_ASSIGN_OR_RAISE(int64_t row_id, input.ReadVarLenLong()); - result->Add(row_id); + out->push_back(row_id); } return Status::OK(); } diff --git a/src/paimon/common/global_index/btree/btree_global_index_reader.h b/src/paimon/common/global_index/btree/btree_global_index_reader.h index fc39450d1..204da21a8 100644 --- a/src/paimon/common/global_index/btree/btree_global_index_reader.h +++ b/src/paimon/common/global_index/btree/btree_global_index_reader.h @@ -37,11 +37,11 @@ namespace paimon { class BTreeGlobalIndexReader : public GlobalIndexReader, public std::enable_shared_from_this { public: - BTreeGlobalIndexReader(const std::shared_ptr& sst_file_reader, - RoaringBitmap64&& null_bitmap, const std::optional& min_key, - const std::optional& max_key, - const std::shared_ptr& key_type, - const std::shared_ptr& pool); + static Result> Create( + const std::shared_ptr& sst_file_reader, RoaringBitmap64&& null_bitmap, + const std::optional& min_key_slice, + const std::optional& max_key_slice, + const std::shared_ptr& key_type, const std::shared_ptr& pool); Result> VisitIsNotNull() override; @@ -88,11 +88,18 @@ class BTreeGlobalIndexReader : public GlobalIndexReader, } private: + BTreeGlobalIndexReader(const std::shared_ptr& sst_file_reader, + RoaringBitmap64&& null_bitmap, std::optional min_key, + std::optional max_key, std::optional min_key_slice, + std::optional max_key_slice, + const std::shared_ptr& key_type, + const std::shared_ptr& pool); + Result RangeQuery(const std::optional& from, const std::optional& to, bool from_inclusive, bool to_inclusive); - Status DeserializeRowIds(const MemorySlice& slice, RoaringBitmap64* result) const; + Status DeserializeRowIds(const MemorySlice& slice, std::vector* out) const; Result AllNonNullRows(); @@ -101,6 +108,10 @@ class BTreeGlobalIndexReader : public GlobalIndexReader, RoaringBitmap64 null_bitmap_; std::optional min_key_; std::optional max_key_; + /// Cached serialized min/max key slices to avoid repeated serialization in RangeQuery. + std::optional min_key_slice_; + std::optional max_key_slice_; + std::shared_ptr key_type_; MemorySlice::SliceComparator comparator_; }; diff --git a/src/paimon/common/global_index/btree/btree_global_indexer.cpp b/src/paimon/common/global_index/btree/btree_global_indexer.cpp index a21edf166..fdad901b7 100644 --- a/src/paimon/common/global_index/btree/btree_global_indexer.cpp +++ b/src/paimon/common/global_index/btree/btree_global_indexer.cpp @@ -101,10 +101,22 @@ Result> BTreeGlobalIndexer::CreateReader( "invalid schema for BTreeGlobalIndexReader, supposed to have single field."); } auto key_type = schema->field(0)->type(); + + std::optional read_buffer_size; + if (auto iter = options_.find(BtreeDefs::kBtreeIndexReadBufferSize); iter != options_.end()) { + PAIMON_ASSIGN_OR_RAISE(int64_t tmp_buffer_size, MemorySize::ParseBytes(iter->second)); + if (tmp_buffer_size <= 0 || tmp_buffer_size > INT_MAX) { + return Status::Invalid( + fmt::format("In BTreeGlobalIndexer::CreateReader: option {} is {}, exceed INT_MAX " + "or less than 0", + BtreeDefs::kBtreeIndexReadBufferSize, iter->second)); + } + read_buffer_size = static_cast(tmp_buffer_size); + } // TODO(lisizhuo.lsz): Allow users to specify an executor std::shared_ptr executor = CreateDefaultExecutor(); - return std::make_shared(files, key_type, file_reader, cache_manager_, - pool, executor); + return std::make_shared(read_buffer_size, files, key_type, file_reader, + cache_manager_, pool, executor); } } // namespace paimon diff --git a/src/paimon/common/global_index/btree/lazy_filtered_btree_reader.cpp b/src/paimon/common/global_index/btree/lazy_filtered_btree_reader.cpp index f2ae0338e..06bad81f6 100644 --- a/src/paimon/common/global_index/btree/lazy_filtered_btree_reader.cpp +++ b/src/paimon/common/global_index/btree/lazy_filtered_btree_reader.cpp @@ -31,15 +31,18 @@ #include "paimon/common/sst/sst_file_reader.h" #include "paimon/common/utils/crc32c.h" #include "paimon/global_index/bitmap_global_index_result.h" +#include "paimon/io/buffered_input_stream.h" #include "paimon/utils/roaring_bitmap64.h" namespace paimon { LazyFilteredBTreeReader::LazyFilteredBTreeReader( - const std::vector& files, const std::shared_ptr& key_type, + std::optional read_buffer_size, const std::vector& files, + const std::shared_ptr& key_type, const std::shared_ptr& file_reader, const std::shared_ptr& cache_manager, const std::shared_ptr& pool, const std::shared_ptr& executor) - : pool_(pool), + : read_buffer_size_(read_buffer_size), + pool_(pool), file_selector_(files, key_type, pool), key_type_(key_type), file_reader_(file_reader), @@ -187,6 +190,7 @@ Result> LazyFilteredBTreeReader::CreateUnionR PAIMON_ASSIGN_OR_RAISE(std::shared_ptr reader, GetOrCreateReader(meta)); readers.push_back(std::move(reader)); } + return std::make_shared(std::move(readers), executor_); } @@ -206,24 +210,25 @@ Result> LazyFilteredBTreeReader::CreateSingle // Create comparator based on field type auto comparator = KeySerializer::CreateComparator(key_type_, pool_); - // Deserialize min/max keys from meta + // Get min/max key slices from meta data (keep as slices; Create() will deserialize) auto index_meta = BTreeIndexMeta::Deserialize(meta.metadata, pool_.get()); - std::optional min_key; - std::optional max_key; + std::optional min_key_slice; + std::optional max_key_slice; if (index_meta->FirstKey()) { - PAIMON_ASSIGN_OR_RAISE( - min_key, KeySerializer::DeserializeKey(MemorySlice::Wrap(index_meta->FirstKey()), - key_type_, pool_.get())); + min_key_slice = MemorySlice::Wrap(index_meta->FirstKey()); } if (index_meta->LastKey()) { - PAIMON_ASSIGN_OR_RAISE( - max_key, KeySerializer::DeserializeKey(MemorySlice::Wrap(index_meta->LastKey()), - key_type_, pool_.get())); + max_key_slice = MemorySlice::Wrap(index_meta->LastKey()); } // Open input stream and create block cache PAIMON_ASSIGN_OR_RAISE(std::shared_ptr input_stream, file_reader_->GetInputStream(meta.file_path)); + if (read_buffer_size_) { + input_stream = std::make_shared( + input_stream, read_buffer_size_.value(), pool_.get()); + } + auto block_cache = std::make_shared(meta.file_path, input_stream, cache_manager_, pool_); @@ -247,8 +252,8 @@ Result> LazyFilteredBTreeReader::CreateSingle SstFileReader::Create(footer->GetIndexBlockHandle(), footer->GetBloomFilterHandle(), comparator, block_cache, pool_)); - return std::make_shared(sst_file_reader, std::move(null_bitmap), - min_key, max_key, key_type_, pool_); + return BTreeGlobalIndexReader::Create(sst_file_reader, std::move(null_bitmap), min_key_slice, + max_key_slice, key_type_, pool_); } Result LazyFilteredBTreeReader::ReadNullBitmap( diff --git a/src/paimon/common/global_index/btree/lazy_filtered_btree_reader.h b/src/paimon/common/global_index/btree/lazy_filtered_btree_reader.h index 4889b1e4e..229b01c6b 100644 --- a/src/paimon/common/global_index/btree/lazy_filtered_btree_reader.h +++ b/src/paimon/common/global_index/btree/lazy_filtered_btree_reader.h @@ -37,7 +37,8 @@ namespace paimon { class LazyFilteredBTreeReader : public GlobalIndexReader { public: - LazyFilteredBTreeReader(const std::vector& files, + LazyFilteredBTreeReader(std::optional read_buffer_size, + const std::vector& files, const std::shared_ptr& key_type, const std::shared_ptr& file_reader, const std::shared_ptr& cache_manager, @@ -90,6 +91,7 @@ class LazyFilteredBTreeReader : public GlobalIndexReader { const std::optional& block_handle); private: + std::optional read_buffer_size_; std::shared_ptr pool_; BTreeFileMetaSelector file_selector_; std::shared_ptr key_type_; diff --git a/src/paimon/common/global_index/btree/lazy_filtered_btree_reader_test.cpp b/src/paimon/common/global_index/btree/lazy_filtered_btree_reader_test.cpp index 183526ec6..f95d4c0f5 100644 --- a/src/paimon/common/global_index/btree/lazy_filtered_btree_reader_test.cpp +++ b/src/paimon/common/global_index/btree/lazy_filtered_btree_reader_test.cpp @@ -135,7 +135,8 @@ class LazyFilteredBTreeReaderTest : public ::testing::Test { const std::shared_ptr& executor = nullptr) const { auto file_reader = std::make_shared(fs_, base_path_); auto cache_manager = std::make_shared(1024 * 1024, 0.5); - return std::make_shared(all_metas_, arrow::int32(), file_reader, + return std::make_shared(/*read_buffer_size=*/std::nullopt, + all_metas_, arrow::int32(), file_reader, cache_manager, pool_, executor); } @@ -360,7 +361,8 @@ TEST_F(LazyFilteredBTreeReaderTest, TestEmptyFilesList) { auto file_reader = std::make_shared(fs_, base_path_); auto cache_manager = std::make_shared(1024 * 1024, 0.5); auto reader = std::make_shared( - empty_metas, arrow::int32(), file_reader, cache_manager, pool_, /*executor=*/nullptr); + /*read_buffer_size=*/std::nullopt, empty_metas, arrow::int32(), file_reader, cache_manager, + pool_, /*executor=*/nullptr); // Any query on empty files should return empty bitmap Literal literal_1(1); @@ -451,7 +453,8 @@ TEST_F(LazyFilteredBTreeReaderTest, TestParallelEmptyFilesList) { auto file_reader = std::make_shared(fs_, base_path_); auto cache_manager = std::make_shared(1024 * 1024, 0.5); auto reader = std::make_shared( - empty_metas, arrow::int32(), file_reader, cache_manager, pool_, executor); + /*read_buffer_size=*/std::nullopt, empty_metas, arrow::int32(), file_reader, cache_manager, + pool_, executor); Literal literal_1(1); ASSERT_OK_AND_ASSIGN(auto result, reader->VisitEqual(literal_1)); CheckEmpty(result); diff --git a/src/paimon/common/io/buffered_input_stream.cpp b/src/paimon/common/io/buffered_input_stream.cpp index 7739b1d14..ff9006c5a 100644 --- a/src/paimon/common/io/buffered_input_stream.cpp +++ b/src/paimon/common/io/buffered_input_stream.cpp @@ -37,15 +37,34 @@ BufferedInputStream::BufferedInputStream(const std::shared_ptr& in, BufferedInputStream::~BufferedInputStream() noexcept = default; Status BufferedInputStream::Seek(int64_t offset, SeekOrigin origin) { + // Convert all seek origins to an absolute offset so the buffer-hit fast + // path below can work uniformly on absolute positions. + int64_t target_abs_offset = offset; if (origin == SeekOrigin::FS_SEEK_CUR) { PAIMON_ASSIGN_OR_RAISE(int64_t cur_pos, GetPos()); - offset = offset + cur_pos; - PAIMON_RETURN_NOT_OK(in_->Seek(offset, FS_SEEK_SET)); - } else { - PAIMON_RETURN_NOT_OK(in_->Seek(offset, origin)); + target_abs_offset = cur_pos + offset; + } else if (origin == SeekOrigin::FS_SEEK_END) { + PAIMON_ASSIGN_OR_RAISE(int64_t length, in_->Length()); + target_abs_offset = length + offset; + } + // else: FS_SEEK_SET — target_abs_offset is already absolute. + + // Fast path: if the new absolute offset still falls into the bytes already + // cached in buffer_ (i.e. the window from buf_start_abs to buf_end_abs), just + // adjust pos_ without touching the underlying stream. + if (count_ > 0) { + PAIMON_ASSIGN_OR_RAISE(int64_t in_pos, in_->GetPos()); + const int64_t buf_start_abs = in_pos - count_; + const int64_t buf_end_abs = in_pos; + if (target_abs_offset >= buf_start_abs && target_abs_offset <= buf_end_abs) { + pos_ = static_cast(target_abs_offset - buf_start_abs); + return Status::OK(); + } } - // after seek, reset buffer_ + // Slow path: the target is outside the current buffer window, fall back to + // a real seek on the underlying stream and invalidate the buffer. + PAIMON_RETURN_NOT_OK(in_->Seek(target_abs_offset, FS_SEEK_SET)); pos_ = 0; count_ = 0; return Status::OK(); diff --git a/src/paimon/common/io/buffered_input_stream_test.cpp b/src/paimon/common/io/buffered_input_stream_test.cpp index 8b9cceae0..14f2e85fb 100644 --- a/src/paimon/common/io/buffered_input_stream_test.cpp +++ b/src/paimon/common/io/buffered_input_stream_test.cpp @@ -93,4 +93,175 @@ TEST(BufferedInputStreamTest, TestSimple) { ASSERT_OK(input_stream->Close()); } + +TEST(BufferedInputStreamTest, TestSeek) { + // Data: "0123456789abcdef" (16 bytes), buffer_size = 8 + auto pool = GetDefaultPool(); + std::string data = "0123456789abcdef"; + auto in = std::make_shared(reinterpret_cast(data.data()), + data.size()); + auto stream = std::make_shared(in, /*buffer_size=*/8, pool.get()); + + // Helper: verify buffer_ content matches expected substring of data + auto check_buffer = [&](const BufferedInputStream& s, const std::string& expected_content, + int32_t expected_pos, int32_t expected_count) { + ASSERT_EQ(s.pos_, expected_pos); + ASSERT_EQ(s.count_, expected_count); + std::string actual(s.buffer_->data(), s.buffer_->data() + expected_count); + ASSERT_EQ(actual, expected_content) << "buffer content mismatch: actual=\"" << actual + << "\", expected=\"" << expected_content << "\""; + }; + + std::string buf(4, '\0'); + + // Initial state: buffer empty + ASSERT_EQ(stream->pos_, 0); + ASSERT_EQ(stream->count_, 0); + + // FS_SEEK_SET slow path (buffer empty, count_==0): seek to pos 4, read "4567" + // First Read calls Fill: reads 8 bytes from pos 4 -> buffer = "456789ab", count_=8. + // Then consumes 4 bytes -> pos_=4. + { + ASSERT_OK(stream->Seek(4, SeekOrigin::FS_SEEK_SET)); + // After seek: slow path, buffer invalidated + ASSERT_EQ(stream->pos_, 0); + ASSERT_EQ(stream->count_, 0); + ASSERT_EQ(4, stream->GetPos().value()); + + ASSERT_EQ(4, stream->Read(buf.data(), 4).value()); + ASSERT_EQ("4567", buf); + check_buffer(*stream, "456789ab", 4, 8); + } + + // FS_SEEK_CUR buffer hit: pos=8, seek -4 -> target=4, inside buffer [4..12) + // Fast path: only adjusts pos_ to 0, count_ stays 8, buffer unchanged. + { + ASSERT_OK(stream->Seek(-4, SeekOrigin::FS_SEEK_CUR)); + check_buffer(*stream, "456789ab", 0, 8); + ASSERT_EQ(4, stream->GetPos().value()); + + ASSERT_EQ(4, stream->Read(buf.data(), 4).value()); + ASSERT_EQ("4567", buf); + check_buffer(*stream, "456789ab", 4, 8); + } + + // FS_SEEK_SET buffer hit: pos=8, seek to 5, inside buffer [4..12) + // Fast path: pos_ = 5 - 4 = 1, count_ stays 8, buffer unchanged. + { + ASSERT_OK(stream->Seek(5, SeekOrigin::FS_SEEK_SET)); + check_buffer(*stream, "456789ab", 1, 8); + ASSERT_EQ(5, stream->GetPos().value()); + + ASSERT_EQ(4, stream->Read(buf.data(), 4).value()); + ASSERT_EQ("5678", buf); + check_buffer(*stream, "456789ab", 5, 8); + } + + // FS_SEEK_END buffer miss: target = 16 + (-6) = 10 + // Current buffer covers [4..12). pos 10 is inside [4..12) -> actually buffer hit! + // Fast path: pos_ = 10 - 4 = 6, count_ stays 8. + { + ASSERT_OK(stream->Seek(-6, SeekOrigin::FS_SEEK_END)); + check_buffer(*stream, "456789ab", 6, 8); + ASSERT_EQ(10, stream->GetPos().value()); + + // Read 4 bytes from pos 10: 2 left in buffer ("ab"), then refill. + ASSERT_EQ(4, stream->Read(buf.data(), 4).value()); + ASSERT_EQ("abcd", buf); + // After consuming "ab" (pos_==count_==8), InnerRead calls Fill from in_ pos 12, + // reads [12..16) -> buffer = "cdef", count_=4, then consumes 2 -> pos_=2. + check_buffer(*stream, "cdef", 2, 4); + } + + // Buffer miss slow path: seek to pos 0, current buffer covers [12..16). + // Target 0 is outside [12..16) -> slow path, buffer invalidated. + { + ASSERT_OK(stream->Seek(0, SeekOrigin::FS_SEEK_SET)); + ASSERT_EQ(stream->pos_, 0); + ASSERT_EQ(stream->count_, 0); + ASSERT_EQ(0, stream->GetPos().value()); + + ASSERT_EQ(4, stream->Read(buf.data(), 4).value()); + ASSERT_EQ("0123", buf); + // Fill reads [0..8) -> buffer = "01234567", count_=8, pos_=4 + check_buffer(*stream, "01234567", 4, 8); + } + + // Buffer hit at exact boundary start: seek to pos 0 (= buf_start_abs=0) + // Fast path: pos_ = 0, count_ stays 8, buffer unchanged. + { + ASSERT_OK(stream->Seek(0, SeekOrigin::FS_SEEK_SET)); + check_buffer(*stream, "01234567", 0, 8); + + ASSERT_EQ(4, stream->Read(buf.data(), 4).value()); + ASSERT_EQ("0123", buf); + check_buffer(*stream, "01234567", 4, 8); + } + + // Buffer hit at exact boundary end: seek to pos 8 (= buf_end_abs=8) + // Fast path: pos_ = 8 == count_, buffer unchanged but next Read triggers refill. + { + ASSERT_OK(stream->Seek(8, SeekOrigin::FS_SEEK_SET)); + check_buffer(*stream, "01234567", 8, 8); + ASSERT_EQ(8, stream->GetPos().value()); + + ASSERT_EQ(4, stream->Read(buf.data(), 4).value()); + ASSERT_EQ("89ab", buf); + // pos_==count_ triggered Fill: buffer = "89abcdef", count_=8, pos_=4 + check_buffer(*stream, "89abcdef", 4, 8); + } + + // FS_SEEK_CUR buffer miss: pos=12, seek -12 -> target=0, outside [8..16) + { + ASSERT_OK(stream->Seek(-12, SeekOrigin::FS_SEEK_CUR)); + ASSERT_EQ(stream->pos_, 0); + ASSERT_EQ(stream->count_, 0); + + ASSERT_EQ(4, stream->Read(buf.data(), 4).value()); + ASSERT_EQ("0123", buf); + check_buffer(*stream, "01234567", 4, 8); + } + + // FS_SEEK_END buffer hit: fill buffer near file end, then seek within via FS_SEEK_END. + { + ASSERT_OK(stream->Seek(12, SeekOrigin::FS_SEEK_SET)); + // pos 12 is outside current buffer [0..8) -> slow path + ASSERT_EQ(stream->pos_, 0); + ASSERT_EQ(stream->count_, 0); + + ASSERT_EQ(4, stream->Read(buf.data(), 4).value()); + ASSERT_EQ("cdef", buf); + // Fill from 12: buffer = "cdef", count_=4, pos_=4 + check_buffer(*stream, "cdef", 4, 4); + + // Seek -4 from end -> target = 12, buffer covers [12..16), 12 is inside. + // Fast path: pos_ = 12 - 12 = 0 + ASSERT_OK(stream->Seek(-4, SeekOrigin::FS_SEEK_END)); + check_buffer(*stream, "cdef", 0, 4); + ASSERT_EQ(12, stream->GetPos().value()); + + ASSERT_EQ(4, stream->Read(buf.data(), 4).value()); + ASSERT_EQ("cdef", buf); + check_buffer(*stream, "cdef", 4, 4); + } + + // Empty buffer (count_==0): fresh stream, seek always takes slow path. + { + auto in2 = std::make_shared( + reinterpret_cast(data.data()), data.size()); + auto fresh = std::make_shared(in2, /*buffer_size=*/8, pool.get()); + ASSERT_EQ(fresh->pos_, 0); + ASSERT_EQ(fresh->count_, 0); + + ASSERT_OK(fresh->Seek(10, SeekOrigin::FS_SEEK_SET)); + ASSERT_EQ(fresh->pos_, 0); + ASSERT_EQ(fresh->count_, 0); + ASSERT_EQ(10, fresh->GetPos().value()); + + ASSERT_EQ(4, fresh->Read(buf.data(), 4).value()); + ASSERT_EQ("abcd", buf); + // Fill from 10: buffer = "abcdef", count_=6, pos_=4 + check_buffer(*fresh, "abcdef", 4, 6); + } +} } // namespace paimon::test diff --git a/src/paimon/common/sst/block_iterator.cpp b/src/paimon/common/sst/block_iterator.cpp index e1a0b074b..18c3b9618 100644 --- a/src/paimon/common/sst/block_iterator.cpp +++ b/src/paimon/common/sst/block_iterator.cpp @@ -46,6 +46,17 @@ Result BlockIterator::ReadEntry() { return BlockEntry(key, value); } +Result BlockIterator::SkipKeyAndReadValue() { + if (polled_position_ >= 0) { + PAIMON_RETURN_NOT_OK(input_.SetPosition(polled_position_)); + polled_position_ = -1; + } + PAIMON_ASSIGN_OR_RAISE(int32_t key_length, input_.ReadVarLenInt()); + PAIMON_RETURN_NOT_OK(input_.SetPosition(input_.Position() + key_length)); + PAIMON_ASSIGN_OR_RAISE(int32_t value_length, input_.ReadVarLenInt()); + return input_.ReadSliceView(value_length); +} + Result BlockIterator::ReadKeyAndSkipValue() { PAIMON_ASSIGN_OR_RAISE(int32_t key_length, input_.ReadVarLenInt()); auto key = input_.ReadSliceView(key_length); diff --git a/src/paimon/common/sst/block_iterator.h b/src/paimon/common/sst/block_iterator.h index 10ee7a43d..078ef4c73 100644 --- a/src/paimon/common/sst/block_iterator.h +++ b/src/paimon/common/sst/block_iterator.h @@ -30,15 +30,20 @@ class PAIMON_EXPORT BlockIterator { Result Next(); - Result ReadEntry(); + /// Read only the value MemorySlice from the current position, skipping the key. + /// Used in fast-path iteration where no key comparison is needed. + /// @note Public function, conceptually similar to `Next()` but optimized to skip key parsing. + Result SkipKeyAndReadValue(); Result SeekTo(const MemorySlice& target_key); private: /// Read only the key MemorySlice from the current position, skipping the value. - /// This avoids creating a value MemorySlice and BlockEntry during binary search. + /// @note Inner function, only called by `SeekTo`. Result ReadKeyAndSkipValue(); + Result ReadEntry(); + MemorySliceInput input_; /// Position of the entry that should be returned by Next() after SeekTo. /// -1 means no pending entry. diff --git a/src/paimon/common/utils/roaring_bitmap64.cpp b/src/paimon/common/utils/roaring_bitmap64.cpp index e22672c75..ff807e3f0 100644 --- a/src/paimon/common/utils/roaring_bitmap64.cpp +++ b/src/paimon/common/utils/roaring_bitmap64.cpp @@ -173,6 +173,57 @@ void RoaringBitmap64::Add(int64_t x) { GetRoaringBitmap(roaring_bitmap_).add(static_cast(x)); } +void RoaringBitmap64::AddMany(size_t n, const int64_t* values) { + if (n == 0 || !values) { + return; + } + auto& bitmap = GetRoaringBitmap(roaring_bitmap_); + + // Bucket values by their high-32 bits in a single pass. K (the number of + // distinct buckets) is typically very small (often 1, rarely > a handful). + struct Bucket { + uint32_t high; + std::vector lows; + }; + std::vector buckets; + buckets.reserve(4); + + for (size_t i = 0; i < n; ++i) { + const auto v = static_cast(values[i]); + const auto high = static_cast(v >> 32); + const auto low = static_cast(v); + + Bucket* target = nullptr; + // Fast path: the previous bucket is overwhelmingly likely to match for + // sequential row-id streams produced by a B-tree scan. + if (!buckets.empty() && buckets.back().high == high) { + target = &buckets.back(); + } else { + for (auto& b : buckets) { + if (b.high == high) { + target = &b; + break; + } + } + if (target == nullptr) { + buckets.push_back({high, {}}); + // Reserve generously on first encounter; we may end up using + // more memory than necessary if K turns out to be > 1, but + // this avoids repeated grow-and-copy in the common K == 1 case. + buckets.back().lows.reserve(buckets.size() == 1 ? n : n / 4); + target = &buckets.back(); + } + } + target->lows.push_back(low); + } + + // Hand each bucket to the inner 32-bit Roaring's true-batch addMany, + // which performs container-level bulk insertion. + for (const auto& b : buckets) { + bitmap.getOrCreateInner(b.high).addMany(b.lows.size(), b.lows.data()); + } +} + void RoaringBitmap64::AddRange(int64_t min, int64_t max) { GetRoaringBitmap(roaring_bitmap_).addRange(min, max); } diff --git a/src/paimon/common/utils/roaring_bitmap64_test.cpp b/src/paimon/common/utils/roaring_bitmap64_test.cpp index bc70cee89..b9eb80258 100644 --- a/src/paimon/common/utils/roaring_bitmap64_test.cpp +++ b/src/paimon/common/utils/roaring_bitmap64_test.cpp @@ -518,4 +518,114 @@ TEST(RoaringBitmap64Test, TestAddRangeLargeValues) { ASSERT_EQ(values[0], start); ASSERT_EQ(values[100], end); } + +TEST(RoaringBitmap64Test, TestAddMany) { + // empty input + { + RoaringBitmap64 bitmap; + bitmap.AddMany(0, nullptr); + ASSERT_TRUE(bitmap.IsEmpty()); + } + // single element + { + RoaringBitmap64 bitmap; + int64_t value = 999999999999ll; + bitmap.AddMany(1, &value); + ASSERT_EQ(bitmap.Cardinality(), 1); + ASSERT_TRUE(bitmap.Contains(999999999999ll)); + } + // single bucket (all values share the same high-32 bits) + { + RoaringBitmap64 bitmap; + std::vector values = {100000000001ll, 100000000005ll, 100000000003ll, + 100000000002ll}; + bitmap.AddMany(values.size(), values.data()); + ASSERT_EQ(bitmap.Cardinality(), 4); + ASSERT_TRUE(bitmap.Contains(100000000001ll)); + ASSERT_TRUE(bitmap.Contains(100000000002ll)); + ASSERT_TRUE(bitmap.Contains(100000000003ll)); + ASSERT_TRUE(bitmap.Contains(100000000005ll)); + ASSERT_FALSE(bitmap.Contains(100000000004ll)); + } + // multiple buckets (values span different high-32 bit buckets) + { + RoaringBitmap64 bitmap; + std::vector values = {1ll, 100000000000ll, 200000000000ll, 2ll, 100000000001ll}; + bitmap.AddMany(values.size(), values.data()); + ASSERT_EQ(bitmap.Cardinality(), 5); + ASSERT_TRUE(bitmap.Contains(1ll)); + ASSERT_TRUE(bitmap.Contains(2ll)); + ASSERT_TRUE(bitmap.Contains(100000000000ll)); + ASSERT_TRUE(bitmap.Contains(100000000001ll)); + ASSERT_TRUE(bitmap.Contains(200000000000ll)); + } + // duplicates + { + RoaringBitmap64 bitmap; + std::vector values = {42ll, 42ll, 42ll, 100000000000ll, 100000000000ll}; + bitmap.AddMany(values.size(), values.data()); + ASSERT_EQ(bitmap.Cardinality(), 2); + ASSERT_TRUE(bitmap.Contains(42ll)); + ASSERT_TRUE(bitmap.Contains(100000000000ll)); + } + // unsorted input (reverse order) + { + std::vector values; + for (int64_t i = 99; i >= 0; --i) { + values.push_back(i + 100000000000ll); + } + RoaringBitmap64 bitmap; + bitmap.AddMany(values.size(), values.data()); + ASSERT_EQ(bitmap.Cardinality(), 100); + for (int64_t i = 0; i < 100; ++i) { + ASSERT_TRUE(bitmap.Contains(i + 100000000000ll)); + } + } + // incremental insert (multiple AddMany calls accumulate) + { + RoaringBitmap64 bitmap; + std::vector batch1 = {10ll, 20ll, 30ll}; + std::vector batch2 = {20ll, 40ll, 100000000000ll}; + bitmap.AddMany(batch1.size(), batch1.data()); + ASSERT_EQ(bitmap.Cardinality(), 3); + bitmap.AddMany(batch2.size(), batch2.data()); + ASSERT_EQ(bitmap.Cardinality(), 5); + ASSERT_TRUE(bitmap.Contains(10ll)); + ASSERT_TRUE(bitmap.Contains(20ll)); + ASSERT_TRUE(bitmap.Contains(30ll)); + ASSERT_TRUE(bitmap.Contains(40ll)); + ASSERT_TRUE(bitmap.Contains(100000000000ll)); + } + // large values near MAX_VALUE + { + std::vector values; + for (int64_t i = 0; i < 50; ++i) { + values.push_back(RoaringBitmap64::MAX_VALUE - 1 - i); + } + RoaringBitmap64 bitmap; + bitmap.AddMany(values.size(), values.data()); + ASSERT_EQ(bitmap.Cardinality(), 50); + for (auto v : values) { + ASSERT_TRUE(bitmap.Contains(v)); + } + } + // consistency with Add: AddMany must produce the same bitmap as repeated Add + { + std::vector values; + for (int64_t i = 0; i < 100; ++i) { + values.push_back(i + 100000000000ll); + } + for (int64_t i = 0; i < 50; ++i) { + values.push_back(i + 200000000000ll); + } + RoaringBitmap64 bitmap_add; + for (auto v : values) { + bitmap_add.Add(v); + } + RoaringBitmap64 bitmap_add_many; + bitmap_add_many.AddMany(values.size(), values.data()); + ASSERT_EQ(bitmap_add, bitmap_add_many); + } +} + } // namespace paimon::test diff --git a/third_party/roaring_bitmap/roaring.hh b/third_party/roaring_bitmap/roaring.hh index ff0c17b13..2bb158fdc 100644 --- a/third_party/roaring_bitmap/roaring.hh +++ b/third_party/roaring_bitmap/roaring.hh @@ -2552,6 +2552,10 @@ public: friend class Roaring64MapSetBitForwardIterator; friend class Roaring64MapSetBitBiDirectionalIterator; + + Roaring &getOrCreateInner(uint32_t high) { + return lookupOrCreateInner(high); + } typedef Roaring64MapSetBitForwardIterator const_iterator; typedef Roaring64MapSetBitBiDirectionalIterator const_bidirectional_iterator;