Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions include/paimon/utils/roaring_bitmap64.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,19 @@ class PAIMON_EXPORT RoaringBitmap64 {
/// @param x value added to bitmap
void Add(int64_t x);

/// Bulk-insert `n` values into the bitmap.
///
/// Compared to repeatedly calling `Add`, this implementation:
/// 1. Buckets the input values by their high-32 bits in a single pass.
/// 2. Feeds each bucket to the inner 32-bit Roaring's true-batch
/// `addMany(uint32_t*)` path, which performs container-level bulk
/// insertion.
///
/// This avoids the per-value `std::map` lookup of the 64-bit wrapper and
/// the per-value insertion overhead inside the 32-bit array container.
/// Values may be unsorted; ordering is handled internally.
void AddMany(size_t n, const int64_t* values);

/// @param x value added to bitmap
/// @return false if contain x; true if not contain x
bool CheckedAdd(int64_t x);
Expand Down
10 changes: 10 additions & 0 deletions src/paimon/common/global_index/btree/btree_defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,16 @@ struct BtreeDefs {
static inline const char kBtreeIndexHighPriorityPoolRatio[] =
"btree-index.high-priority-pool-ratio";

/// "btree-index.read-buffer-size" - Optional. Specifies the read buffer size for the B-tree
/// index. This setting can be tuned based on query patterns:
/// - For range queries (e.g., `VisitLessThan`, `VisitGreaterOrEqual`), increasing the buffer
/// size (e.g., to 1MB) may improve I/O bandwidth and sequential read performance.
/// - For point queries (e.g., `VisitEqual`), buffering can introduce negative effects due to
/// read amplification; it is recommended to leave this option unset.
///
/// If specified, read block with `BufferedInputStream`.
static inline const char kBtreeIndexReadBufferSize[] = "btree-index.read-buffer-size";

static inline const char kDefaultBtreeIndexBlockSize[] = "64KB";
static inline const char kDefaultBtreeIndexCompression[] = "none";
static inline const char kDefaultBtreeIndexCacheSize[] = "128MB";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@
#include "paimon/common/factories/io_hook.h"
#include "paimon/common/global_index/btree/btree_global_index_writer.h"
#include "paimon/common/global_index/btree/btree_global_indexer.h"
#include "paimon/common/global_index/btree/lazy_filtered_btree_reader.h"
#include "paimon/common/options/memory_size.h"
#include "paimon/common/utils/scope_guard.h"
#include "paimon/data/decimal.h"
#include "paimon/data/timestamp.h"
#include "paimon/fs/file_system.h"
#include "paimon/global_index/bitmap_global_index_result.h"
#include "paimon/global_index/io/global_index_file_reader.h"
#include "paimon/global_index/io/global_index_file_writer.h"
#include "paimon/io/buffered_input_stream.h"
#include "paimon/memory/memory_pool.h"
#include "paimon/predicate/literal.h"
#include "paimon/testing/utils/io_exception_helper.h"
Expand Down Expand Up @@ -1506,11 +1509,6 @@ TEST_P(BTreeGlobalIndexIntegrationTest, WriteAndReadLargeDataWithSmallBlocks) {
ASSERT_OK_AND_ASSIGN(auto metas, writer->Finish());
ASSERT_EQ(metas.size(), 1);

auto file_reader = std::make_shared<FakeGlobalIndexFileReader>(fs_, base_path_);
c_schema = CreateArrowSchema(field);
ASSERT_OK_AND_ASSIGN(auto reader,
indexer->CreateReader(c_schema.get(), file_reader, metas, pool_));

// Helper lambda: given a predicate on value, collect matching row ids
auto collect_rows = [total_rows = total_rows](std::function<bool(int32_t)> predicate) {
std::vector<int64_t> result;
Expand All @@ -1529,69 +1527,110 @@ TEST_P(BTreeGlobalIndexIntegrationTest, WriteAndReadLargeDataWithSmallBlocks) {
return result;
};

// --- VisitIsNull ---
{
ASSERT_OK_AND_ASSIGN(auto result, reader->VisitIsNull());
CheckResult(result, null_row_ids);
}
// Read back with different read-buffer-size configurations:
// "" -> no buffer (original path)
// "128" -> very small buffer (smaller than block size 256)
// "512" -> moderate buffer (larger than block size)
// "64KB" -> large buffer
// "1MB" -> very large buffer (likely covers entire file)
std::vector<std::string> buffer_sizes = {"", "128", "512", "64KB", "1MB"};

for (const auto& buf_size : buffer_sizes) {
std::map<std::string, std::string> read_options = {
{BtreeDefs::kBtreeIndexBlockSize, "256"}, {BtreeDefs::kBtreeIndexCacheSize, "1024"}};
if (!buf_size.empty()) {
read_options[BtreeDefs::kBtreeIndexReadBufferSize] = buf_size;
}
ASSERT_OK_AND_ASSIGN(auto read_indexer, BTreeGlobalIndexer::Create(read_options));

// --- VisitIsNotNull ---
{
ASSERT_OK_AND_ASSIGN(auto result, reader->VisitIsNotNull());
CheckResult(result, non_null_row_ids);
}
auto file_reader = std::make_shared<FakeGlobalIndexFileReader>(fs_, base_path_);
c_schema = CreateArrowSchema(field);
ASSERT_OK_AND_ASSIGN(auto reader,
read_indexer->CreateReader(c_schema.get(), file_reader, metas, pool_));

// check input stream and read buffer size
{
auto lazy_reader = std::dynamic_pointer_cast<LazyFilteredBTreeReader>(reader);
ASSERT_TRUE(lazy_reader);
ASSERT_OK_AND_ASSIGN(auto tmp_reader, lazy_reader->GetOrCreateReader(metas[0]));
auto btree_reader = std::dynamic_pointer_cast<BTreeGlobalIndexReader>(tmp_reader);
ASSERT_TRUE(btree_reader);
auto input_stream = btree_reader->sst_file_reader_->block_cache_->in_;
auto buffered_stream = std::dynamic_pointer_cast<BufferedInputStream>(input_stream);
if (!buf_size.empty()) {
ASSERT_TRUE(buffered_stream);
ASSERT_OK_AND_ASSIGN(int32_t expected_buffer_size,
MemorySize::ParseBytes(buf_size));
ASSERT_EQ(buffered_stream->buffer_size_, expected_buffer_size);
} else {
ASSERT_FALSE(buffered_stream);
}
}

// --- VisitEqual for value 0 ---
{
Literal lit_0(0);
ASSERT_OK_AND_ASSIGN(auto result, reader->VisitEqual(lit_0));
CheckResult(result, collect_rows([](int32_t v) { return v == 0; }));
}
// --- VisitIsNull ---
{
ASSERT_OK_AND_ASSIGN(auto result, reader->VisitIsNull());
CheckResult(result, null_row_ids);
}

// --- VisitEqual for a value in the middle ---
{
Literal lit_100(100);
ASSERT_OK_AND_ASSIGN(auto result, reader->VisitEqual(lit_100));
CheckResult(result, collect_rows([](int32_t v) { return v == 100; }));
}
// --- VisitIsNotNull ---
{
ASSERT_OK_AND_ASSIGN(auto result, reader->VisitIsNotNull());
CheckResult(result, non_null_row_ids);
}

// --- VisitEqual for non-existent value ---
{
Literal lit_neg(static_cast<int32_t>(-1));
ASSERT_OK_AND_ASSIGN(auto result, reader->VisitEqual(lit_neg));
CheckResult(result, {});
}
// --- VisitEqual for value 0 ---
{
Literal lit_0(0);
ASSERT_OK_AND_ASSIGN(auto result, reader->VisitEqual(lit_0));
CheckResult(result, collect_rows([](int32_t v) { return v == 0; }));
}

// --- VisitLessThan for value 5 ---
{
Literal lit_5(5);
ASSERT_OK_AND_ASSIGN(auto result, reader->VisitLessThan(lit_5));
CheckResult(result, collect_rows([](int32_t v) { return v < 5; }));
}
// --- VisitEqual for a value in the middle ---
{
Literal lit_100(100);
ASSERT_OK_AND_ASSIGN(auto result, reader->VisitEqual(lit_100));
CheckResult(result, collect_rows([](int32_t v) { return v == 100; }));
}

// --- VisitGreaterOrEqual for a high value near max ---
{
int32_t max_val =
static_cast<int32_t>(collect_rows([](int32_t) { return true; }).size()) / 3;
int32_t threshold = max_val - 2;
Literal lit_threshold(threshold);
ASSERT_OK_AND_ASSIGN(auto result, reader->VisitGreaterOrEqual(lit_threshold));
CheckResult(result, collect_rows([threshold](int32_t v) { return v >= threshold; }));
}
// --- VisitEqual for non-existent value ---
{
Literal lit_neg(static_cast<int32_t>(-1));
ASSERT_OK_AND_ASSIGN(auto result, reader->VisitEqual(lit_neg));
CheckResult(result, {});
}

// --- VisitIn for scattered values ---
{
std::vector<Literal> in_literals = {Literal(0), Literal(500), Literal(10000)};
ASSERT_OK_AND_ASSIGN(auto result, reader->VisitIn(in_literals));
CheckResult(result,
collect_rows([](int32_t v) { return v == 0 || v == 500 || v == 10000; }));
}
// --- VisitLessThan for value 5 ---
{
Literal lit_5(5);
ASSERT_OK_AND_ASSIGN(auto result, reader->VisitLessThan(lit_5));
CheckResult(result, collect_rows([](int32_t v) { return v < 5; }));
}

// --- VisitNotEqual for value 0 ---
{
Literal lit_0(0);
ASSERT_OK_AND_ASSIGN(auto result, reader->VisitNotEqual(lit_0));
CheckResult(result, collect_rows([](int32_t v) { return v != 0; }));
// --- VisitGreaterOrEqual for a high value near max ---
{
int32_t max_val =
static_cast<int32_t>(collect_rows([](int32_t) { return true; }).size()) / 3;
int32_t threshold = max_val - 2;
Literal lit_threshold(threshold);
ASSERT_OK_AND_ASSIGN(auto result, reader->VisitGreaterOrEqual(lit_threshold));
CheckResult(result, collect_rows([threshold](int32_t v) { return v >= threshold; }));
}

// --- VisitIn for scattered values ---
{
std::vector<Literal> in_literals = {Literal(0), Literal(500), Literal(10000)};
ASSERT_OK_AND_ASSIGN(auto result, reader->VisitIn(in_literals));
CheckResult(result,
collect_rows([](int32_t v) { return v == 0 || v == 500 || v == 10000; }));
}

// --- VisitNotEqual for value 0 ---
{
Literal lit_0(0);
ASSERT_OK_AND_ASSIGN(auto result, reader->VisitNotEqual(lit_0));
CheckResult(result, collect_rows([](int32_t v) { return v != 0; }));
}
}
}

Expand Down Expand Up @@ -1722,6 +1761,22 @@ TEST_P(BTreeGlobalIndexIntegrationTest, FinishWithEmptyData) {
ASSERT_NOK_WITH_MSG(writer->Finish(), "Should never write an empty btree index file");
}

TEST_P(BTreeGlobalIndexIntegrationTest, InvalidReadOptions) {
auto file_writer = std::make_shared<FakeGlobalIndexFileWriter>(fs_, base_path_);
auto field = arrow::field("int_field", arrow::int32());
auto c_schema = CreateArrowSchema(field);

std::map<std::string, std::string> options = {{BtreeDefs::kBtreeIndexBlockSize, "4096"},
{BtreeDefs::kBtreeIndexCompression, GetParam()},
{BtreeDefs::kBtreeIndexReadBufferSize, "4GB"}};
ASSERT_OK_AND_ASSIGN(auto indexer, BTreeGlobalIndexer::Create(options));

auto file_reader = std::make_shared<FakeGlobalIndexFileReader>(fs_, base_path_);
ASSERT_NOK_WITH_MSG(indexer->CreateReader(c_schema.get(), file_reader, /*files=*/{}, pool_),
"In BTreeGlobalIndexer::CreateReader: option btree-index.read-buffer-size "
"is 4GB, exceed INT_MAX or less than 0");
}

TEST_P(BTreeGlobalIndexIntegrationTest, TestIOException) {
bool run_complete = false;
auto io_hook = paimon::IOHook::GetInstance();
Expand Down
Loading
Loading