Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
#include <memory>
#include <vector>

#include "paimon/bucket/bucket_function_type.h"
#include "paimon/memory/memory_pool.h"
#include "paimon/result.h"
#include "paimon/status.h"
#include "paimon/utils/bucket_function_type.h"
#include "paimon/visibility.h"

struct ArrowSchema;
Expand Down
10 changes: 6 additions & 4 deletions src/paimon/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,10 @@ set(PAIMON_COMMON_SRCS
common/utils/bit_set.cpp
common/utils/bloom_filter.cpp
common/utils/bloom_filter64.cpp
common/utils/bucket_id_calculator.cpp
common/utils/crc32c.cpp
common/utils/decimal_utils.cpp
common/utils/delta_varint_compressor.cpp
common/utils/fields_comparator.cpp
common/utils/path_util.cpp
common/utils/range.cpp
common/utils/read_ahead_cache.cpp
Expand All @@ -151,6 +151,7 @@ set(PAIMON_CORE_SRCS
core/append/bucketed_append_compact_manager.cpp
core/bucket/hive_bucket_function.cpp
core/bucket/mod_bucket_function.cpp
core/bucket/bucket_id_calculator.cpp
core/casting/binary_to_string_cast_executor.cpp
core/casting/boolean_to_decimal_cast_executor.cpp
core/casting/boolean_to_numeric_cast_executor.cpp
Expand Down Expand Up @@ -304,7 +305,6 @@ set(PAIMON_CORE_SRCS
core/table/source/data_evolution_batch_scan.cpp
core/tag/tag.cpp
core/utils/field_mapping.cpp
core/utils/fields_comparator.cpp
core/utils/file_store_path_factory.cpp
core/utils/file_utils.cpp
core/utils/manifest_meta_reader.cpp
Expand Down Expand Up @@ -350,6 +350,7 @@ if(PAIMON_BUILD_TESTS)
common/memory/bytes_test.cpp
common/memory/memory_segment_test.cpp
common/memory/memory_segment_utils_test.cpp
common/memory/memory_slice_test.cpp
STATIC_LINK_LIBS
paimon_shared
test_utils_static
Expand Down Expand Up @@ -417,6 +418,7 @@ if(PAIMON_BUILD_TESTS)
common/io/data_input_output_stream_test.cpp
common/io/buffered_input_stream_test.cpp
common/io/memory_segment_output_stream_test.cpp
common/io/cache_input_stream_test.cpp
common/io/offset_input_stream_test.cpp
common/lookup/sort/sort_lookup_store_test.cpp
common/logging/logging_test.cpp
Expand Down Expand Up @@ -455,13 +457,13 @@ if(PAIMON_BUILD_TESTS)
common/utils/bloom_filter_test.cpp
common/utils/bloom_filter64_test.cpp
common/utils/xxhash_test.cpp
common/utils/bucket_id_calculator_test.cpp
common/utils/binary_row_partition_computer_test.cpp
common/utils/bin_packing_test.cpp
common/utils/data_converter_utils_test.cpp
common/utils/date_time_utils_test.cpp
common/utils/delta_varint_compressor_test.cpp
common/utils/field_type_utils_test.cpp
common/utils/fields_comparator_test.cpp
common/utils/internal_row_utils_test.cpp
common/utils/jsonizable_test.cpp
common/utils/linked_hash_map_test.cpp
Expand Down Expand Up @@ -524,6 +526,7 @@ if(PAIMON_BUILD_TESTS)
core/append/bucketed_append_compact_manager_test.cpp
core/bucket/default_bucket_function_test.cpp
core/bucket/hive_bucket_function_test.cpp
core/bucket/bucket_id_calculator_test.cpp
core/bucket/mod_bucket_function_test.cpp
core/casting/cast_executor_factory_test.cpp
core/casting/cast_executor_test.cpp
Expand Down Expand Up @@ -666,7 +669,6 @@ if(PAIMON_BUILD_TESTS)
core/utils/branch_manager_test.cpp
core/utils/file_store_path_factory_cache_test.cpp
core/utils/field_mapping_test.cpp
core/utils/fields_comparator_test.cpp
core/utils/file_store_path_factory_test.cpp
core/utils/file_utils_test.cpp
core/utils/manifest_meta_reader_test.cpp
Expand Down
2 changes: 1 addition & 1 deletion src/paimon/common/compression/block_decompressor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ int32_t BlockDecompressor::ReadIntLE(const char* buf) {
Status BlockDecompressor::ValidateLength(int32_t compressed_len, int32_t original_len) {
if (original_len < 0 || compressed_len < 0 || (original_len == 0 && compressed_len != 0) ||
(original_len != 0 && compressed_len == 0)) {
return Status::IOError(
return Status::Invalid(
fmt::format("Input is corrupted, compressed_len={}, , original_len={}", compressed_len,
original_len));
}
Expand Down
2 changes: 1 addition & 1 deletion src/paimon/common/compression/lz4/lz4_block_compressor.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class Lz4BlockCompressor : public BlockCompressor {
LZ4_compress_default(src, dst + BlockCompressor::HEADER_LENGTH, src_length,
dst_length - BlockCompressor::HEADER_LENGTH);
if (compressed_size < 0) {
return Status::IOError(fmt::format("Compression failed with code {}", compressed_size));
return Status::Invalid(fmt::format("Compression failed with code {}", compressed_size));
}
WriteIntLE(compressed_size, dst);
WriteIntLE(src_length, dst + 4);
Expand Down
6 changes: 3 additions & 3 deletions src/paimon/common/compression/lz4/lz4_block_decompressor.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,19 @@ class Lz4BlockDecompressor : public BlockDecompressor {
PAIMON_RETURN_NOT_OK(ValidateLength(compressed_len, original_len));

if (dst_length < original_len) {
return Status::IOError(
return Status::Invalid(
fmt::format("Buffer length too small, compressed_len= {}, original_len={}",
compressed_len, original_len));
}

if (src_length - HEADER_LENGTH < compressed_len) {
return Status::IOError("Source data is not integral for decompression.");
return Status::Invalid("Source data is not integral for decompression.");
}

int32_t decompressed_size =
LZ4_decompress_safe(src + HEADER_LENGTH, dst, src_length - HEADER_LENGTH, dst_length);
if (decompressed_size != original_len) {
return Status::IOError(fmt::format("Input is corrupted, expected {}, but got {}",
return Status::Invalid(fmt::format("Input is corrupted, expected {}, but got {}",
original_len, decompressed_size));
}
return decompressed_size;
Expand Down
2 changes: 1 addition & 1 deletion src/paimon/common/compression/zstd/zstd_block_compressor.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class ZstdBlockCompressor : public BlockCompressor {
int32_t dst_length) override {
size_t const compressed_size = ZSTD_compress(dst, dst_length, src, src_length, level_);
if (ZSTD_isError(compressed_size)) {
return Status::IOError(fmt::format("Compression failed with code {}", compressed_size));
return Status::Invalid(fmt::format("Compression failed with code {}", compressed_size));
}
return compressed_size;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class ZstdBlockDecompressor : public BlockDecompressor {
int32_t dst_length) override {
int32_t decompressed_size = ZSTD_decompress(dst, dst_length, src, src_length);
if (ZSTD_isError(decompressed_size)) {
return Status::IOError(
return Status::Invalid(
fmt::format("Input is corrupted with return code {}", decompressed_size));
}
return decompressed_size;
Expand Down
4 changes: 2 additions & 2 deletions src/paimon/common/data/binary_row_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ TEST_F(BinaryRowTest, TestZeroOutPaddingString) {
writer.Reset();
writer.WriteString(0, BinaryString::FromString("wahahah", pool.get()));
writer.Complete();
int hash1 = row.HashCode();
int32_t hash1 = row.HashCode();

writer.Reset();
for (int32_t i = 0; i < bytes_size; i++) {
Expand All @@ -423,7 +423,7 @@ TEST_F(BinaryRowTest, TestZeroOutPaddingString) {
writer.Reset();
writer.WriteString(0, BinaryString::FromString("wahahah", pool.get()));
writer.Complete();
int hash2 = row.HashCode();
int32_t hash2 = row.HashCode();

ASSERT_EQ(hash2, hash1);
}
Expand Down
6 changes: 3 additions & 3 deletions src/paimon/common/data/binary_section_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class BinarySectionTest : public ::testing::Test {
pool_ = GetDefaultPool();

segment_ = MemorySegment::AllocateHeapMemory(2048, pool_.get());
for (int i = 0; i < 2048; ++i) {
for (int32_t i = 0; i < 2048; ++i) {
segment_.Put(i, static_cast<uint8_t>(i % 128));
}
offset_ = 0;
Expand All @@ -53,7 +53,7 @@ TEST_F(BinarySectionTest, EqualityOperator) {
TEST_F(BinarySectionTest, ToBytes) {
auto bytes = binary_section_.ToBytes(pool_.get());
ASSERT_EQ(bytes->size(), size_in_bytes_);
for (int i = 0; i < size_in_bytes_; ++i) {
for (int32_t i = 0; i < size_in_bytes_; ++i) {
EXPECT_EQ(static_cast<uint8_t>(bytes->data()[i]), static_cast<uint8_t>(i % 128));
}
}
Expand All @@ -68,7 +68,7 @@ TEST_F(BinarySectionTest, ReadBinary) {
auto bytes =
BinarySection::ReadBinary(segment_, 0, offset_, variable_part_offset_and_len, pool_.get());
ASSERT_EQ(bytes->size(), size_in_bytes_);
for (int i = 0; i < size_in_bytes_; ++i) {
for (int32_t i = 0; i < size_in_bytes_; ++i) {
EXPECT_EQ(static_cast<uint8_t>(bytes->data()[i]), static_cast<uint8_t>(i % 128));
}
}
Expand Down
16 changes: 8 additions & 8 deletions src/paimon/common/data/columnar/columnar_array.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@

namespace paimon {
Status ColumnarArray::CheckNoNull() const {
for (int i = 0; i < length_; i++) {
for (int32_t i = 0; i < length_; i++) {
if (IsNullAt(i)) {
return Status::Invalid(fmt::format("row {} is null", i));
}
Expand Down Expand Up @@ -92,7 +92,7 @@ std::shared_ptr<InternalRow> ColumnarArray::GetRow(int32_t pos, int32_t num_fiel
Result<std::vector<char>> ColumnarArray::ToBooleanArray() const {
PAIMON_RETURN_NOT_OK(CheckNoNull());
std::vector<char> res(length_);
for (int i = 0; i < length_; i++) {
for (int32_t i = 0; i < length_; i++) {
bool element = GetBoolean(i);
res[i] = element ? static_cast<char>(1) : static_cast<char>(0);
}
Expand All @@ -102,7 +102,7 @@ Result<std::vector<char>> ColumnarArray::ToBooleanArray() const {
Result<std::vector<char>> ColumnarArray::ToByteArray() const {
PAIMON_RETURN_NOT_OK(CheckNoNull());
std::vector<char> res(length_);
for (int i = 0; i < length_; i++) {
for (int32_t i = 0; i < length_; i++) {
res[i] = GetByte(i);
}
return res;
Expand All @@ -111,7 +111,7 @@ Result<std::vector<char>> ColumnarArray::ToByteArray() const {
Result<std::vector<int16_t>> ColumnarArray::ToShortArray() const {
PAIMON_RETURN_NOT_OK(CheckNoNull());
std::vector<int16_t> res(length_);
for (int i = 0; i < length_; i++) {
for (int32_t i = 0; i < length_; i++) {
res[i] = GetShort(i);
}
return res;
Expand All @@ -120,7 +120,7 @@ Result<std::vector<int16_t>> ColumnarArray::ToShortArray() const {
Result<std::vector<int32_t>> ColumnarArray::ToIntArray() const {
PAIMON_RETURN_NOT_OK(CheckNoNull());
std::vector<int32_t> res(length_);
for (int i = 0; i < length_; i++) {
for (int32_t i = 0; i < length_; i++) {
res[i] = GetInt(i);
}
return res;
Expand All @@ -129,7 +129,7 @@ Result<std::vector<int32_t>> ColumnarArray::ToIntArray() const {
Result<std::vector<int64_t>> ColumnarArray::ToLongArray() const {
PAIMON_RETURN_NOT_OK(CheckNoNull());
std::vector<int64_t> res(length_);
for (int i = 0; i < length_; i++) {
for (int32_t i = 0; i < length_; i++) {
res[i] = GetLong(i);
}
return res;
Expand All @@ -138,7 +138,7 @@ Result<std::vector<int64_t>> ColumnarArray::ToLongArray() const {
Result<std::vector<float>> ColumnarArray::ToFloatArray() const {
PAIMON_RETURN_NOT_OK(CheckNoNull());
std::vector<float> res(length_);
for (int i = 0; i < length_; i++) {
for (int32_t i = 0; i < length_; i++) {
res[i] = GetFloat(i);
}
return res;
Expand All @@ -147,7 +147,7 @@ Result<std::vector<float>> ColumnarArray::ToFloatArray() const {
Result<std::vector<double>> ColumnarArray::ToDoubleArray() const {
PAIMON_RETURN_NOT_OK(CheckNoNull());
std::vector<double> res(length_);
for (int i = 0; i < length_; i++) {
for (int32_t i = 0; i < length_; i++) {
res[i] = GetDouble(i);
}
return res;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
#include "paimon/common/data/generic_row.h"
#include "paimon/common/data/serializer/binary_serializer_utils.h"
#include "paimon/common/utils/date_time_utils.h"
#include "paimon/core/utils/fields_comparator.h"
#include "paimon/common/utils/fields_comparator.h"

namespace paimon {
Result<std::unique_ptr<RowCompactedSerializer>> RowCompactedSerializer::Create(
Expand Down
6 changes: 3 additions & 3 deletions src/paimon/common/executor/default_executor_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ TEST(DefaultExecutorTest, TestViaVoidFunc) {
auto executor = GetGlobalDefaultExecutor();
std::atomic<int64_t> sum = {0};
std::vector<std::future<void>> futures;
for (int i = 0; i < 10; ++i) {
for (int32_t i = 0; i < 10; ++i) {
futures.push_back(Via(executor.get(), [&sum]() { sum++; }));
}
Wait(futures);
Expand All @@ -46,8 +46,8 @@ TEST(DefaultExecutorTest, TestVia) {
auto executor = GetGlobalDefaultExecutor();
std::atomic<int64_t> sum = {0};
std::vector<std::future<int>> futures;
for (int i = 0; i < 10; ++i) {
futures.push_back(Via(executor.get(), [i, &sum]() -> int {
for (int32_t i = 0; i < 10; ++i) {
futures.push_back(Via(executor.get(), [i, &sum]() -> int32_t {
sum++;
return i * 2;
}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,17 @@

#include "paimon/common/file_index/bitmap/bitmap_file_index_meta.h"

#include <map>
#include <memory>
#include <string>
#include <vector>

#include "gtest/gtest.h"
#include "paimon/common/file_index/bitmap/bitmap_file_index.h"
#include "paimon/common/file_index/bitmap/bitmap_file_index_meta_v1.h"
#include "paimon/common/file_index/bitmap/bitmap_file_index_meta_v2.h"
#include "paimon/common/io/memory_segment_output_stream.h"
#include "paimon/common/memory/memory_segment_utils.h"
#include "paimon/defs.h"
#include "paimon/fs/file_system.h"
#include "paimon/io/byte_array_input_stream.h"
Expand Down Expand Up @@ -110,4 +115,70 @@ TEST(BitmapFileIndexMetaTest, TestInvalidType) {
"not support field type DECIMAL in BitmapIndex");
}

// Round-trips a BitmapFileIndexMetaV2 through Serialize/Deserialize with INDEX_BLOCK_SIZE set to
// "20". The small block size is intended to make the entries overflow a single block so that
// additional blocks get created during serialization; the test then checks that every entry
// (including the null entry) is still found with its original offset/length.
TEST(BitmapFileIndexMetaTest, TestSmallBlockSizeForcesMultipleBlocks) {
    auto pool = GetDefaultPool();

    int32_t num_entries = 5;
    int32_t num_rows = 100;

    // Entry i is keyed by the INT literal i and covers [i * 10, i * 10 + 10).
    std::vector<BitmapFileIndexMeta::Entry> entries_to_write;
    entries_to_write.reserve(num_entries);
    for (int32_t idx = 0; idx < num_entries; idx++) {
        Literal entry_key(FieldType::INT, idx);
        entries_to_write.emplace_back(entry_key, /*offset=*/idx * 10, /*length=*/10);
    }

    // A deliberately tiny block size, meant to trigger block overflow.
    std::map<std::string, std::string> index_options;
    index_options[BitmapFileIndex::INDEX_BLOCK_SIZE] = "20";

    BitmapFileIndexMeta::Entry entry_for_null(Literal(FieldType::INT), /*offset=*/0,
                                              /*length=*/5);

    BitmapFileIndexMetaV2 meta_to_write(FieldType::INT, num_rows, /*has_null_value=*/true,
                                        entry_for_null, std::move(entries_to_write),
                                        index_options, pool);

    // Write the meta into an in-memory segment stream.
    auto sink = std::make_shared<MemorySegmentOutputStream>(/*segment_size=*/1024, pool);
    ASSERT_OK(meta_to_write.Serialize(sink));

    // Flatten the written segments into one contiguous byte buffer.
    auto raw_bytes =
        MemorySegmentUtils::CopyToBytes(sink->Segments(), 0, sink->CurrentSize(), pool.get());
    ASSERT_TRUE(raw_bytes != nullptr);

    // Read the meta back from that buffer.
    auto source = std::make_shared<ByteArrayInputStream>(raw_bytes->data(), raw_bytes->size());
    BitmapFileIndexMetaV2 meta_read_back(FieldType::INT, raw_bytes->size(), pool);
    ASSERT_OK(meta_read_back.Deserialize(source));

    ASSERT_EQ(meta_read_back.GetRowCount(), num_rows);

    // The null literal must resolve to the null entry.
    Literal null_key(FieldType::INT);
    ASSERT_OK_AND_ASSIGN(const BitmapFileIndexMeta::Entry* null_hit,
                         meta_read_back.FindEntry(null_key));
    ASSERT_TRUE(null_hit != nullptr);
    ASSERT_EQ(null_hit->offset, 0);
    ASSERT_EQ(null_hit->length, 5);

    // Every written key must resolve to an entry carrying its original offset/length.
    for (int32_t idx = 0; idx < num_entries; idx++) {
        Literal lookup_key(FieldType::INT, idx);
        ASSERT_OK_AND_ASSIGN(const BitmapFileIndexMeta::Entry* hit,
                             meta_read_back.FindEntry(lookup_key));
        ASSERT_TRUE(hit != nullptr);
        ASSERT_EQ(hit->offset, idx * 10);
        ASSERT_EQ(hit->length, 10);
    }

    // A key that was never written must yield a null entry pointer.
    Literal absent_key(FieldType::INT, 999);
    ASSERT_OK_AND_ASSIGN(const BitmapFileIndexMeta::Entry* absent_hit,
                         meta_read_back.FindEntry(absent_key));
    ASSERT_FALSE(absent_hit);
}

} // namespace paimon::test
Loading
Loading