From fd719fff3b3cce25de6f02be3ba0cb8d5819069b Mon Sep 17 00:00:00 2001 From: lxy264173 Date: Tue, 10 Mar 2026 15:50:07 +0800 Subject: [PATCH 1/7] feat(compaction): adapt to sst reader/writer --- include/paimon/defs.h | 15 + src/paimon/CMakeLists.txt | 3 + .../compression/block_compression_factory.cpp | 20 +- .../compression/block_compression_factory.h | 7 +- .../serializer/row_compacted_serializer.cpp | 44 +++ .../serializer/row_compacted_serializer.h | 4 + .../row_compacted_serializer_test.cpp | 278 ++++++++++++++++++ src/paimon/common/defs.cpp | 5 + .../rangebitmap/dictionary/key_factory.cpp | 31 +- src/paimon/common/io/cache/cache_manager.cpp | 2 +- src/paimon/common/io/cache/cache_manager.h | 2 +- .../common/lookup/lookup_store_factory.cpp | 42 +++ .../common/lookup/lookup_store_factory.h | 68 +++++ .../lookup/sort/sort_lookup_store_factory.cpp | 55 ++++ .../lookup/sort/sort_lookup_store_factory.h | 89 ++++++ .../lookup/sort/sort_lookup_store_test.cpp | 118 ++++++++ src/paimon/common/memory/memory_slice.cpp | 6 +- src/paimon/common/memory/memory_slice.h | 8 +- src/paimon/common/sst/block_cache.h | 6 + src/paimon/common/sst/block_entry.h | 11 +- src/paimon/common/sst/block_iterator.cpp | 21 +- src/paimon/common/sst/block_iterator.h | 6 +- src/paimon/common/sst/block_reader.cpp | 22 +- src/paimon/common/sst/block_reader.h | 40 +-- src/paimon/common/sst/sst_file_io_test.cpp | 45 ++- src/paimon/common/sst/sst_file_reader.cpp | 84 +++--- src/paimon/common/sst/sst_file_reader.h | 34 +-- src/paimon/common/sst/sst_file_utils.h | 10 +- src/paimon/common/utils/bloom_filter.cpp | 9 +- src/paimon/core/core_options.cpp | 42 +++ src/paimon/core/core_options.h | 7 + src/paimon/core/core_options_test.cpp | 17 +- src/paimon/core/options/compress_options.h | 27 ++ src/paimon/core/utils/fields_comparator.cpp | 125 ++++++++ src/paimon/core/utils/fields_comparator.h | 33 +++ 35 files changed, 1140 insertions(+), 196 deletions(-) create mode 100644 src/paimon/common/lookup/lookup_store_factory.cpp create mode 100644 src/paimon/common/lookup/lookup_store_factory.h create mode 100644 src/paimon/common/lookup/sort/sort_lookup_store_factory.cpp create mode 100644 src/paimon/common/lookup/sort/sort_lookup_store_factory.h create mode 100644 src/paimon/common/lookup/sort/sort_lookup_store_test.cpp create mode 100644 src/paimon/core/options/compress_options.h diff --git a/include/paimon/defs.h b/include/paimon/defs.h index 297d122a..89550a80 100644 --- a/include/paimon/defs.h +++ b/include/paimon/defs.h @@ -329,6 +329,21 @@ struct PAIMON_EXPORT Options { /// ratio (e.g. `compaction.offpeak-ratio=20`) to enable more aggressive data compaction. /// Default is 0. static const char COMPACTION_OFFPEAK_RATIO[]; + /// "lookup.cache.bloom.filter.enabled" - Whether to enable the bloom filter for lookup cache. + /// Default value is true. + static const char LOOKUP_CACHE_BLOOM_FILTER_ENABLED[]; + /// "lookup.cache.bloom.filter.fpp" - Define the default false positive probability for lookup + /// cache bloom filters. Default value is 0.05. + static const char LOOKUP_CACHE_BLOOM_FILTER_FPP[]; + /// "lookup.cache-spill-compression" - Spill compression for lookup cache, currently zstd, none, + /// lz4 and lzo are supported. Default value is zstd. + static const char LOOKUP_CACHE_SPILL_COMPRESSION[]; + /// "spill-compression.zstd-level" - Default spill compression zstd level. For higher + /// compression rates, it can be configured to 9, but the read and write speed will + /// significantly decrease. Default value is 1. + static const char SPILL_COMPRESSION_ZSTD_LEVEL[]; + /// "cache-page-size" - Memory page size for caching. Default value is 64 kb. + static const char CACHE_PAGE_SIZE[]; }; static constexpr int64_t BATCH_WRITE_COMMIT_IDENTIFIER = std::numeric_limits::max(); diff --git a/src/paimon/CMakeLists.txt b/src/paimon/CMakeLists.txt index afad9bb1..6d3f2198 100644 --- a/src/paimon/CMakeLists.txt +++ b/src/paimon/CMakeLists.txt @@ -68,6 +68,8 @@ set(PAIMON_COMMON_SRCS common/io/cache/cache_key.cpp common/io/cache/cache_manager.cpp common/logging/logging.cpp + common/lookup/sort/sort_lookup_store_factory.cpp + common/lookup/lookup_store_factory.cpp common/memory/bytes.cpp common/memory/memory_pool.cpp common/memory/memory_segment.cpp @@ -385,6 +387,7 @@ if(PAIMON_BUILD_TESTS) common/io/buffered_input_stream_test.cpp common/io/memory_segment_output_stream_test.cpp common/io/offset_input_stream_test.cpp + common/lookup/sort/sort_lookup_store_test.cpp common/logging/logging_test.cpp common/metrics/metrics_impl_test.cpp common/options/memory_size_test.cpp diff --git a/src/paimon/common/compression/block_compression_factory.cpp b/src/paimon/common/compression/block_compression_factory.cpp index c11b3e48..ea0f2104 100644 --- a/src/paimon/common/compression/block_compression_factory.cpp +++ b/src/paimon/common/compression/block_compression_factory.cpp @@ -19,9 +19,24 @@ #include "paimon/common/compression/lz4/lz4_block_compression_factory.h" #include "paimon/common/compression/none_block_compression_factory.h" #include "paimon/common/compression/zstd/zstd_block_compression_factory.h" +#include "paimon/common/utils/string_utils.h" namespace paimon { +Result> BlockCompressionFactory::Create( + const CompressOptions& compression) { + auto compress = StringUtils::ToLowerCase(compression.compress); + if (compress == "none") { + return std::make_shared(); + } else if (compress == "zstd") { + return std::make_shared(compression.zstd_level); + } else if (compress == "lz4") { + return std::make_shared(); + } + // TODO(liangzi): LZO support + return Status::Invalid(fmt::format("Unsupported compression type: {}", compress)); +} + Result> BlockCompressionFactory::Create( BlockCompressionType compression) { switch (compression) { @@ -33,9 +48,8 @@ Result> BlockCompressionFactory::Create return std::make_shared(ZSTD_COMPRESSION_LEVEL); default: // TODO(liangzi): LZO support - return Status::Invalid("Unsupported compression type: " + - std::to_string(static_cast(compression))); + return Status::Invalid( + fmt::format("Unsupported compression type: {}", static_cast(compression))); } } - } // namespace paimon diff --git a/src/paimon/common/compression/block_compression_factory.h b/src/paimon/common/compression/block_compression_factory.h index d58f41a5..7154b67d 100644 --- a/src/paimon/common/compression/block_compression_factory.h +++ b/src/paimon/common/compression/block_compression_factory.h @@ -21,8 +21,8 @@ #include "paimon/common/compression/block_compression_type.h" #include "paimon/common/compression/block_compressor.h" #include "paimon/common/compression/block_decompressor.h" +#include "paimon/core/options/compress_options.h" #include "paimon/result.h" - namespace paimon { /// Each compression codec has an implementation of {@link BlockCompressionFactory} to create @@ -30,7 +30,10 @@ namespace paimon { class BlockCompressionFactory { public: static Result> Create( - BlockCompressionType compression); + const CompressOptions& compression); + + static Result> Create( + BlockCompressionType compress_type); BlockCompressionFactory() = default; virtual ~BlockCompressionFactory() = default; diff --git a/src/paimon/common/data/serializer/row_compacted_serializer.cpp b/src/paimon/common/data/serializer/row_compacted_serializer.cpp index 6fa11afd..19fd8774 100644 --- a/src/paimon/common/data/serializer/row_compacted_serializer.cpp +++ b/src/paimon/common/data/serializer/row_compacted_serializer.cpp @@ -23,6 +23,8 @@ #include "paimon/common/data/generic_row.h" #include "paimon/common/data/serializer/binary_serializer_utils.h" #include "paimon/common/utils/date_time_utils.h" +#include "paimon/core/utils/fields_comparator.h" + namespace paimon { Result> RowCompactedSerializer::Create( const std::shared_ptr& schema, const std::shared_ptr& pool) { @@ -41,6 +43,48 @@ Result> RowCompactedSerializer::Create( schema, std::move(getters), std::move(writers), std::move(readers), pool)); } +Result RowCompactedSerializer::CreateSliceComparator( + const std::shared_ptr& schema, const std::shared_ptr& pool) { + int32_t bit_set_in_bytes = RowCompactedSerializer::CalculateBitSetInBytes(schema->num_fields()); + auto row_reader1 = std::make_shared(bit_set_in_bytes, pool); + auto row_reader2 = std::make_shared(bit_set_in_bytes, pool); + std::vector readers(schema->num_fields()); + std::vector comparators(schema->num_fields()); + for (int32_t i = 0; i < schema->num_fields(); i++) { + auto field_type = schema->field(i)->type(); + PAIMON_ASSIGN_OR_RAISE(readers[i], CreateFieldReader(field_type, pool)); + PAIMON_ASSIGN_OR_RAISE(comparators[i], + FieldsComparator::CompareVariant(i, field_type, /*use_view=*/false)); + } + auto comparator = [row_reader1, row_reader2, readers, comparators]( + const std::shared_ptr& slice1, + const std::shared_ptr& slice2) -> Result { + row_reader1->PointTo(*slice1->GetSegment(), slice1->Offset()); + row_reader2->PointTo(*slice2->GetSegment(), slice2->Offset()); + for (int32_t i = 0; i < static_cast(readers.size()); i++) { + bool is_null1 = row_reader1->IsNullAt(i); + bool is_null2 = row_reader2->IsNullAt(i); + if (!is_null1 || !is_null2) { + if (is_null1) { + return -1; + } else if (is_null2) { + return 1; + } else { + PAIMON_ASSIGN_OR_RAISE(VariantType field1, readers[i](i, row_reader1.get())); + PAIMON_ASSIGN_OR_RAISE(VariantType field2, readers[i](i, row_reader2.get())); + int32_t comp = comparators[i](field1, field2); + if (comp != 0) { + return comp; + } + } + } + } + return 0; + }; + return std::function(const std::shared_ptr&, + const std::shared_ptr&)>(comparator); +} + Result> RowCompactedSerializer::SerializeToBytes(const InternalRow& row) { if (!row_writer_) { row_writer_ = std::make_unique(CalculateBitSetInBytes(getters_.size()), pool_); diff --git a/src/paimon/common/data/serializer/row_compacted_serializer.h b/src/paimon/common/data/serializer/row_compacted_serializer.h index 0c3db3d4..13be8ef6 100644 --- a/src/paimon/common/data/serializer/row_compacted_serializer.h +++ b/src/paimon/common/data/serializer/row_compacted_serializer.h @@ -21,6 +21,7 @@ #include "paimon/common/data/binary_writer.h" #include "paimon/common/memory/memory_segment.h" #include "paimon/common/memory/memory_segment_utils.h" +#include "paimon/common/memory/memory_slice.h" #include "paimon/common/utils/var_length_int_utils.h" namespace paimon { class RowCompactedSerializer { @@ -36,6 +37,9 @@ class RowCompactedSerializer { Result> Deserialize(const std::shared_ptr& bytes); + static Result CreateSliceComparator( + const std::shared_ptr& schema, const std::shared_ptr& pool); + private: class RowWriter { public: diff --git a/src/paimon/common/data/serializer/row_compacted_serializer_test.cpp b/src/paimon/common/data/serializer/row_compacted_serializer_test.cpp index 95b857da..3189b291 100644 --- a/src/paimon/common/data/serializer/row_compacted_serializer_test.cpp +++ b/src/paimon/common/data/serializer/row_compacted_serializer_test.cpp @@ -505,4 +505,282 @@ TEST(RowCompactedSerializerTest, TestNestedNullWithTimestampAndDecimal2) { ASSERT_EQ(java_bytes, cpp_bytes); } } + +TEST(RowCompactedSerializerTest, TestSliceComparator) { + auto pool = GetDefaultPool(); + arrow::FieldVector fields = { + arrow::field("f1", arrow::boolean()), + arrow::field("f2", arrow::int8()), + arrow::field("f3", arrow::int16()), + arrow::field("f4", arrow::int32()), + arrow::field("f5", arrow::int64()), + arrow::field("f6", arrow::float32()), + arrow::field("f7", arrow::float64()), + arrow::field("f8", arrow::utf8()), + arrow::field("f9", arrow::binary()), + arrow::field("f10", arrow::date32()), + arrow::field("f11", arrow::timestamp(arrow::TimeUnit::NANO)), + arrow::field("f12", arrow::timestamp(arrow::TimeUnit::SECOND)), + arrow::field("f13", arrow::decimal128(5, 2)), + arrow::field("f14", arrow::decimal128(30, 5)), + }; + auto check_result = [&](const std::shared_ptr& type, + const std::shared_ptr& array) { + auto struct_array = std::dynamic_pointer_cast(array); + ASSERT_TRUE(struct_array); + ASSERT_OK_AND_ASSIGN(auto serializer, + RowCompactedSerializer::Create(arrow::schema(type->fields()), pool)); + auto columnar_row1 = std::make_shared( + /*struct_array=*/nullptr, struct_array->fields(), pool, /*row_id=*/0); + auto columnar_row2 = std::make_shared( + /*struct_array=*/nullptr, struct_array->fields(), pool, /*row_id=*/1); + + ASSERT_OK_AND_ASSIGN(auto bytes1, serializer->SerializeToBytes(*columnar_row1)); + auto slice1 = MemorySlice::Wrap(bytes1); + ASSERT_OK_AND_ASSIGN(auto bytes2, serializer->SerializeToBytes(*columnar_row2)); + auto slice2 = MemorySlice::Wrap(bytes2); + + ASSERT_OK_AND_ASSIGN(auto comparator, RowCompactedSerializer::CreateSliceComparator( + arrow::schema(type->fields()), pool)); + ASSERT_EQ(-1, comparator(slice1, slice2).value()); + ASSERT_EQ(1, comparator(slice2, slice1).value()); + ASSERT_EQ(0, comparator(slice1, slice1).value()); + ASSERT_EQ(0, comparator(slice2, slice2).value()); + }; + // test various type + { + auto type = arrow::struct_({fields[0], fields[1], fields[2], fields[3]}); + std::shared_ptr array = arrow::ipc::internal::json::ArrayFromJSON(type, + R"([ + [false, 0, 32767, 2147483647], + [true, 0, 32767, 2147483647] + ])") + .ValueOrDie(); + check_result(type, array); + } + { + auto type = arrow::struct_({fields[0], fields[1], fields[2], fields[3]}); + std::shared_ptr array = arrow::ipc::internal::json::ArrayFromJSON(type, + R"([ + [true, 0, 32767, 2147483647], + [true, 1, 32767, 2147483647] + ])") + .ValueOrDie(); + check_result(type, array); + } + { + auto type = arrow::struct_({fields[0], fields[1], fields[2], fields[3]}); + std::shared_ptr array = arrow::ipc::internal::json::ArrayFromJSON(type, + R"([ + [true, 0, 32766, 2147483647], + [true, 0, 32767, 2147483647] + ])") + .ValueOrDie(); + check_result(type, array); + } + { + auto type = arrow::struct_({fields[0], fields[1], fields[2], fields[3]}); + std::shared_ptr array = arrow::ipc::internal::json::ArrayFromJSON(type, + R"([ + [true, 0, 32767, 2147483646], + [true, 0, 32767, 2147483647] + ])") + .ValueOrDie(); + check_result(type, array); + } + { + auto type = arrow::struct_({fields[4], fields[5], fields[6]}); + std::shared_ptr array = arrow::ipc::internal::json::ArrayFromJSON(type, + R"([ + [4294967295, 10.1, 100.123], + [4294967296, 10.1, 100.123] + ])") + .ValueOrDie(); + check_result(type, array); + } + { + auto type = arrow::struct_({fields[4], fields[5], fields[6]}); + std::shared_ptr array = arrow::ipc::internal::json::ArrayFromJSON(type, + R"([ + [4294967295, 10.1, 100.123], + [4294967295, 10.11, 100.123] + ])") + .ValueOrDie(); + check_result(type, array); + } + { + auto type = arrow::struct_({fields[4], fields[5], fields[6]}); + std::shared_ptr array = arrow::ipc::internal::json::ArrayFromJSON(type, + R"([ + [4294967295, 10.1, 100.123], + [4294967295, 10.1, 100.124] + ])") + .ValueOrDie(); + check_result(type, array); + } + { + auto type = arrow::struct_({fields[7], fields[8], fields[9]}); + std::shared_ptr array = arrow::ipc::internal::json::ArrayFromJSON(type, + R"([ + ["Alice", "这是一个中文", 10], + ["Bob", "这是一个中文", 10] + ])") + .ValueOrDie(); + check_result(type, array); + } + { + auto type = arrow::struct_({fields[7], fields[8], fields[9]}); + std::shared_ptr array = arrow::ipc::internal::json::ArrayFromJSON(type, + R"([ + ["Alice", "这是一个中文", 10], + ["Alice", "这是一个中文!", 10] + ])") + .ValueOrDie(); + check_result(type, array); + } + { + auto type = arrow::struct_({fields[7], fields[8], fields[9]}); + std::shared_ptr array = arrow::ipc::internal::json::ArrayFromJSON(type, + R"([ + ["Alice", "这是一个中文", 10], + ["Alice", "这是一个中文", 20] + ])") + .ValueOrDie(); + check_result(type, array); + } + { + auto type = arrow::struct_({fields[10], fields[11], fields[12], fields[13]}); + std::shared_ptr array = arrow::ipc::internal::json::ArrayFromJSON(type, + R"([ + [1732603136054000054, 11, "55.02", "-123456789987654321.45678"], + [1732603136054000055, 11, "55.02", "-123456789987654321.45678"] + ])") + .ValueOrDie(); + check_result(type, array); + } + { + auto type = arrow::struct_({fields[10], fields[11], fields[12], fields[13]}); + std::shared_ptr array = arrow::ipc::internal::json::ArrayFromJSON(type, + R"([ + [1732603136054000054, 11, "55.02", "-123456789987654321.45678"], + [1732603136054000054, 12, "55.02", "-123456789987654321.45678"] + ])") + .ValueOrDie(); + check_result(type, array); + } + { + auto type = arrow::struct_({fields[10], fields[11], fields[12], fields[13]}); + std::shared_ptr array = arrow::ipc::internal::json::ArrayFromJSON(type, + R"([ + [1732603136054000054, 11, "55.02", "-123456789987654321.45678"], + [1732603136054000054, 11, "55.03", "-123456789987654321.45678"] + ])") + .ValueOrDie(); + check_result(type, array); + } + { + auto type = arrow::struct_({fields[10], fields[11], fields[12], fields[13]}); + std::shared_ptr array = arrow::ipc::internal::json::ArrayFromJSON(type, + R"([ + [1732603136054000054, 11, "55.02", "-123456789987654321.45678"], + [1732603136054000054, 11, "55.02", "-123456789987654321.45670"] + ])") + .ValueOrDie(); + check_result(type, array); + } + // test null + { + auto type = arrow::struct_({fields[0], fields[1]}); + std::shared_ptr array = arrow::ipc::internal::json::ArrayFromJSON(type, + R"([ + [false, null], + [false, 20] + ])") + .ValueOrDie(); + check_result(type, array); + } + { + auto type = arrow::struct_({fields[0], fields[1]}); + std::shared_ptr array = arrow::ipc::internal::json::ArrayFromJSON(type, + R"([ + [null, 20], + [null, 21] + ])") + .ValueOrDie(); + check_result(type, array); + } + { + auto type = arrow::struct_({fields[0], fields[1]}); + std::shared_ptr array = arrow::ipc::internal::json::ArrayFromJSON(type, + R"([ + [false, null], + [true, 20] + ])") + .ValueOrDie(); + check_result(type, array); + } + { + auto type = arrow::struct_({fields[0], fields[1]}); + std::shared_ptr array = arrow::ipc::internal::json::ArrayFromJSON(type, + R"([ + [null, 21], + [false, 20] + ])") + .ValueOrDie(); + check_result(type, array); + } + { + auto type = arrow::struct_({fields[0], fields[1]}); + std::shared_ptr array = arrow::ipc::internal::json::ArrayFromJSON(type, + R"([ + [null, null], + [null, 20] + ])") + .ValueOrDie(); + check_result(type, array); + } + // test float & double + // -infinity < -0.0 < +0.0 < +infinity < NaN == NaN + { + auto type = arrow::struct_({fields[5], fields[5], fields[5], fields[6], fields[6]}); + std::shared_ptr array = arrow::ipc::internal::json::ArrayFromJSON(type, + R"([ + [-Inf, -0.0, 0.0, Inf, NaN], + [-0.0, -0.0, 0.0, Inf, NaN] + ])") + .ValueOrDie(); + check_result(type, array); + } + { + auto type = arrow::struct_({fields[5], fields[5], fields[5], fields[6], fields[6]}); + std::shared_ptr array = arrow::ipc::internal::json::ArrayFromJSON(type, + R"([ + [-Inf, -0.0, 0.0, Inf, NaN], + [-Inf, 0.0, 0.0, Inf, NaN] + ])") + .ValueOrDie(); + check_result(type, array); + } + { + auto type = arrow::struct_({fields[5], fields[5], fields[5], fields[6], fields[6]}); + std::shared_ptr array = arrow::ipc::internal::json::ArrayFromJSON(type, + R"([ + [-Inf, -0.0, 0.0, 0.0, NaN], + [-Inf, -0.0, 0.0, Inf, NaN] + ])") + .ValueOrDie(); + check_result(type, array); + } + { + auto type = arrow::struct_({fields[5], fields[5], fields[5], fields[6], fields[6]}); + std::shared_ptr array = arrow::ipc::internal::json::ArrayFromJSON(type, + R"([ + [-Inf, -0.0, 0.0, Inf, Inf], + [-Inf, -0.0, 0.0, Inf, NaN] + ])") + .ValueOrDie(); + check_result(type, array); + } +} + } // namespace paimon::test diff --git a/src/paimon/common/defs.cpp b/src/paimon/common/defs.cpp index 91697085..91e1ee0e 100644 --- a/src/paimon/common/defs.cpp +++ b/src/paimon/common/defs.cpp @@ -93,4 +93,9 @@ const char Options::COMPACTION_INCREMENTAL_SIZE_THRESHOLD[] = const char Options::COMPACT_OFFPEAK_START_HOUR[] = "compaction.offpeak.start.hour"; const char Options::COMPACT_OFFPEAK_END_HOUR[] = "compaction.offpeak.end.hour"; const char Options::COMPACTION_OFFPEAK_RATIO[] = "compaction.offpeak-ratio"; +const char Options::LOOKUP_CACHE_BLOOM_FILTER_ENABLED[] = "lookup.cache.bloom.filter.enabled"; +const char Options::LOOKUP_CACHE_BLOOM_FILTER_FPP[] = "lookup.cache.bloom.filter.fpp"; +const char Options::LOOKUP_CACHE_SPILL_COMPRESSION[] = "lookup.cache-spill-compression"; +const char Options::SPILL_COMPRESSION_ZSTD_LEVEL[] = "spill-compression.zstd-level"; +const char Options::CACHE_PAGE_SIZE[] = "cache-page-size"; } // namespace paimon diff --git a/src/paimon/common/file_index/rangebitmap/dictionary/key_factory.cpp b/src/paimon/common/file_index/rangebitmap/dictionary/key_factory.cpp index eab86b37..19fc3728 100644 --- a/src/paimon/common/file_index/rangebitmap/dictionary/key_factory.cpp +++ b/src/paimon/common/file_index/rangebitmap/dictionary/key_factory.cpp @@ -23,35 +23,10 @@ #include "paimon/common/file_index/rangebitmap/dictionary/fixed_length_chunk.h" #include "paimon/common/file_index/rangebitmap/utils/literal_serialization_utils.h" #include "paimon/common/utils/field_type_utils.h" +#include "paimon/core/utils/fields_comparator.h" namespace paimon { -/// Java-compatible ordering for floating-point types: -/// -infinity < -0.0 < +0.0 < +infinity < NaN == NaN -template -static int32_t CompareFloatingPoint(T a, T b) { - const bool a_nan = std::isnan(a); - const bool b_nan = std::isnan(b); - if (a_nan && b_nan) { - return 0; - } - if (a_nan) { - return 1; - } - if (b_nan) { - return -1; - } - if (a == b) { - const bool a_neg = std::signbit(a); - const bool b_neg = std::signbit(b); - if (a_neg == b_neg) { - return 0; - } - return a_neg ? -1 : 1; // -0.0 < +0.0 - } - return a < b ? -1 : 1; -} - Result> KeyFactory::Create(FieldType field_type) { // todo: support timestamp switch (field_type) { @@ -126,14 +101,14 @@ Result> VariableLengthKeyFactory::MmapChunk( Result FloatKeyFactory::CompareLiteral(const Literal& lhs, const Literal& rhs) const { const auto a = lhs.GetValue(); const auto b = rhs.GetValue(); - return CompareFloatingPoint(a, b); + return FieldsComparator::CompareFloatingPoint(a, b); } /// Java-compatible ordering for doubles Result DoubleKeyFactory::CompareLiteral(const Literal& lhs, const Literal& rhs) const { const auto a = lhs.GetValue(); const auto b = rhs.GetValue(); - return CompareFloatingPoint(a, b); + return FieldsComparator::CompareFloatingPoint(a, b); } } // namespace paimon diff --git a/src/paimon/common/io/cache/cache_manager.cpp b/src/paimon/common/io/cache/cache_manager.cpp index 14342a5d..0e3e3ea4 100644 --- a/src/paimon/common/io/cache/cache_manager.cpp +++ b/src/paimon/common/io/cache/cache_manager.cpp @@ -34,7 +34,7 @@ std::shared_ptr CacheManager::GetPage( return cache->Get(key, supplier)->GetSegment(); } -void CacheManager::InvalidPage(std::shared_ptr& key) { +void CacheManager::InvalidPage(const std::shared_ptr& key) { if (key->IsIndex()) { index_cache_->Invalidate(key); } else { diff --git a/src/paimon/common/io/cache/cache_manager.h b/src/paimon/common/io/cache/cache_manager.h index 5e92f9cb..5a379b30 100644 --- a/src/paimon/common/io/cache/cache_manager.h +++ b/src/paimon/common/io/cache/cache_manager.h @@ -38,7 +38,7 @@ class CacheManager { std::shared_ptr& key, std::function(const std::shared_ptr&)> reader); - void InvalidPage(std::shared_ptr& key); + void InvalidPage(const std::shared_ptr& key); private: std::shared_ptr data_cache_; diff --git a/src/paimon/common/lookup/lookup_store_factory.cpp b/src/paimon/common/lookup/lookup_store_factory.cpp new file mode 100644 index 00000000..0257cd86 --- /dev/null +++ b/src/paimon/common/lookup/lookup_store_factory.cpp @@ -0,0 +1,42 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/common/lookup/lookup_store_factory.h" + +#include "paimon/common/lookup/sort/sort_lookup_store_factory.h" +namespace paimon { +Result> LookupStoreFactory::Create( + MemorySlice::SliceComparator comparator, const CoreOptions& options) { + const auto& compress_options = options.GetLookupCompressOptions(); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr compression_factory, + BlockCompressionFactory::Create(compress_options)); + return std::make_shared( + std::move(comparator), options.GetCachePageSize(), compression_factory); +} + +Result> LookupStoreFactory::BfGenerator(int64_t row_count, + const CoreOptions& options, + MemoryPool* pool) { + if (row_count <= 0 || !options.LookupCacheBloomFilterEnabled()) { + return std::shared_ptr(); + } + auto bloom_filter = BloomFilter::Create(row_count, options.GetLookupCacheBloomFilterFpp()); + auto bytes_for_bf = MemorySegment::AllocateHeapMemory(bloom_filter->ByteLength(), pool); + auto memory_segment = std::make_shared(bytes_for_bf); + PAIMON_RETURN_NOT_OK(bloom_filter->SetMemorySegment(memory_segment)); + return bloom_filter; +} +} // namespace paimon diff --git a/src/paimon/common/lookup/lookup_store_factory.h b/src/paimon/common/lookup/lookup_store_factory.h new file mode 100644 index 00000000..2acf15be --- /dev/null +++ b/src/paimon/common/lookup/lookup_store_factory.h @@ -0,0 +1,68 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include "paimon/common/memory/memory_slice.h" +#include "paimon/common/utils/bloom_filter.h" +#include "paimon/core/core_options.h" +#include "paimon/fs/file_system.h" +#include "paimon/memory/bytes.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/result.h" +namespace paimon { +/// Reader, lookup value by key bytes. +class LookupStoreReader { + public: + virtual ~LookupStoreReader() = default; + /// Lookup value by key. + virtual Result> Lookup(const std::shared_ptr& key) const = 0; + virtual Status Close() = 0; +}; + +/// Writer to prepare binary file. +class LookupStoreWriter { + public: + virtual ~LookupStoreWriter() = default; + virtual Status Put(std::shared_ptr&& key, std::shared_ptr&& value) = 0; + virtual Status Close() = 0; +}; + +/// A key-value store for lookup, key-value store should be single binary file written once and +/// ready to be used. This factory provide two interfaces: +/// +/// Writer: written once to prepare binary file. +/// Reader: lookup value by key bytes. +class LookupStoreFactory { + public: + virtual ~LookupStoreFactory() = default; + + virtual Result> CreateWriter( + const std::shared_ptr& fs, const std::string& file_path, + const std::shared_ptr& bloom_filter, + const std::shared_ptr& pool) const = 0; + + virtual Result> CreateReader( + const std::shared_ptr& fs, const std::string& file_path, + const std::shared_ptr& pool) const = 0; + + static Result> Create( + MemorySlice::SliceComparator comparator, const CoreOptions& options); + + static Result> BfGenerator(int64_t row_count, + const CoreOptions& options, + MemoryPool* pool); +}; +} // namespace paimon diff --git a/src/paimon/common/lookup/sort/sort_lookup_store_factory.cpp b/src/paimon/common/lookup/sort/sort_lookup_store_factory.cpp new file mode 100644 index 00000000..19e26851 --- /dev/null +++ b/src/paimon/common/lookup/sort/sort_lookup_store_factory.cpp @@ -0,0 +1,55 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/common/lookup/sort/sort_lookup_store_factory.h" +namespace paimon { +Result> SortLookupStoreFactory::CreateWriter( + const std::shared_ptr& fs, const std::string& file_path, + const std::shared_ptr& bloom_filter, + const std::shared_ptr& pool) const { + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr out, + fs->Create(file_path, /*overwrite=*/false)); + return std::make_unique( + out, std::make_shared(out, pool, bloom_filter, block_size_, + compression_factory_)); +} + +Result> SortLookupStoreFactory::CreateReader( + const std::shared_ptr& fs, const std::string& file_path, + const std::shared_ptr& pool) const { + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr in, fs->Open(file_path)); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr reader, + SstFileReader::Create(pool, in, comparator_)); + return std::make_unique(in, reader); +} + +Status SortLookupStoreReader::Close() { + PAIMON_RETURN_NOT_OK(reader_->Close()); + return in_->Close(); +} + +Status SortLookupStoreWriter::Close() { + PAIMON_RETURN_NOT_OK(writer_->Flush()); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr bloom_filter_handle, + writer_->WriteBloomFilter()); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr index_block_handle, + writer_->WriteIndexBlock()); + PAIMON_RETURN_NOT_OK(writer_->WriteFooter(index_block_handle, bloom_filter_handle)); + PAIMON_RETURN_NOT_OK(out_->Close()); + return Status::OK(); +} + +} // namespace paimon diff --git a/src/paimon/common/lookup/sort/sort_lookup_store_factory.h b/src/paimon/common/lookup/sort/sort_lookup_store_factory.h new file mode 100644 index 00000000..6284879d --- /dev/null +++ b/src/paimon/common/lookup/sort/sort_lookup_store_factory.h @@ -0,0 +1,89 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "paimon/common/lookup/lookup_store_factory.h" +#include "paimon/common/memory/memory_slice.h" +#include "paimon/common/sst/sst_file_reader.h" +#include "paimon/common/sst/sst_file_writer.h" +#include "paimon/common/utils/bloom_filter.h" +#include "paimon/fs/file_system.h" +#include "paimon/memory/bytes.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/result.h" +namespace paimon { +/// Reader, lookup value by key bytes. +class SortLookupStoreReader : public LookupStoreReader { + public: + SortLookupStoreReader(const std::shared_ptr& in, + const std::shared_ptr& reader) + : in_(in), reader_(reader) {} + + Result> Lookup(const std::shared_ptr& key) const override { + return reader_->Lookup(key); + } + + Status Close() override; + + private: + std::shared_ptr in_; + std::shared_ptr reader_; +}; + +/// Writer to prepare binary file. +class SortLookupStoreWriter : public LookupStoreWriter { + public: + SortLookupStoreWriter(const std::shared_ptr& out, + const std::shared_ptr& writer) + : out_(out), writer_(writer) {} + + Status Put(std::shared_ptr&& key, std::shared_ptr&& value) override { + return writer_->Write(std::move(key), std::move(value)); + } + + Status Close() override; + + private: + std::shared_ptr out_; + std::shared_ptr writer_; +}; + +/// A `LookupStoreFactory` which uses hash to lookup records on disk. +class SortLookupStoreFactory : public LookupStoreFactory { + public: + SortLookupStoreFactory(MemorySlice::SliceComparator comparator, int32_t block_size, + const std::shared_ptr& compression_factory) + : block_size_(block_size), + comparator_(std::move(comparator)), + compression_factory_(compression_factory) {} + + Result> CreateWriter( + const std::shared_ptr& fs, const std::string& file_path, + const std::shared_ptr& bloom_filter, + const std::shared_ptr& pool) const; + + Result> CreateReader( + const std::shared_ptr& fs, const std::string& file_path, + const std::shared_ptr& pool) const; + + private: + int32_t block_size_; + // TODO(xinyu.lxy): support CacheManager + MemorySlice::SliceComparator comparator_; + std::shared_ptr compression_factory_; +}; +} // namespace paimon diff --git a/src/paimon/common/lookup/sort/sort_lookup_store_test.cpp b/src/paimon/common/lookup/sort/sort_lookup_store_test.cpp new file mode 100644 index 00000000..82387ff5 --- /dev/null +++ b/src/paimon/common/lookup/sort/sort_lookup_store_test.cpp @@ -0,0 +1,118 @@ +/* + * Copyright 2024-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "arrow/api.h" +#include "arrow/ipc/api.h" +#include "gtest/gtest.h" +#include "paimon/common/data/columnar/columnar_row.h" +#include "paimon/common/data/serializer/row_compacted_serializer.h" +#include "paimon/common/lookup/lookup_store_factory.h" +#include "paimon/common/lookup/sort/sort_lookup_store_factory.h" +#include "paimon/status.h" +#include "paimon/testing/utils/testharness.h" +namespace paimon::test { +TEST(SortLookupStoreTest, TestSimple) { + auto pool = GetDefaultPool(); + auto dir = UniqueTestDirectory::Create("local"); + std::string file_path = dir->Str() + "/test.lookup"; + auto fs = dir->GetFileSystem(); + + arrow::FieldVector fields = { + arrow::field("key", arrow::utf8()), + arrow::field("value", arrow::int32()), + }; + auto key_type = arrow::struct_({fields[0]}); + auto value_type = arrow::struct_({fields[1]}); + ASSERT_OK_AND_ASSIGN(auto comparator, RowCompactedSerializer::CreateSliceComparator( + arrow::schema(key_type->fields()), pool)); + ASSERT_OK_AND_ASSIGN(CoreOptions options, CoreOptions::FromMap({})); + + ASSERT_OK_AND_ASSIGN(auto factory, LookupStoreFactory::Create(comparator, options)); + int64_t row_count = 6; + ASSERT_OK_AND_ASSIGN(auto bloom_filter, + LookupStoreFactory::BfGenerator(row_count, options, pool.get())); + ASSERT_OK_AND_ASSIGN(auto writer, factory->CreateWriter(fs, file_path, bloom_filter, pool)); + + std::shared_ptr key_array = arrow::ipc::internal::json::ArrayFromJSON(key_type, + R"([ + ["Alex"], + ["Alice"], + ["Bob"], + ["David"], + ["Lily"], + ["Lucy"] + ])") + .ValueOrDie(); + std::shared_ptr value_array = + arrow::ipc::internal::json::ArrayFromJSON(value_type, + R"([ + [0], + [10], + [20], + [30], + [40], + [100] + ])") + .ValueOrDie(); + + ASSERT_OK_AND_ASSIGN(auto key_serializer, + RowCompactedSerializer::Create(arrow::schema(key_type->fields()), pool)); + ASSERT_OK_AND_ASSIGN(auto value_serializer, + RowCompactedSerializer::Create(arrow::schema(value_type->fields()), pool)); + // write data + std::vector, std::shared_ptr>> key_value_bytes_vec; + for (int64_t i = 0; i < row_count; i++) { + auto key_struct_array = std::dynamic_pointer_cast(key_array); + auto value_struct_array = std::dynamic_pointer_cast(value_array); + ASSERT_TRUE(key_struct_array); + ASSERT_TRUE(value_struct_array); + auto key_row = std::make_shared( + /*struct_array=*/nullptr, key_struct_array->fields(), pool, /*row_id=*/i); + auto value_row = std::make_shared( + /*struct_array=*/nullptr, value_struct_array->fields(), pool, /*row_id=*/i); + ASSERT_OK_AND_ASSIGN(auto key_bytes, key_serializer->SerializeToBytes(*key_row)); + ASSERT_OK_AND_ASSIGN(auto value_bytes, value_serializer->SerializeToBytes(*value_row)); + key_value_bytes_vec.push_back({key_bytes, value_bytes}); + writer->Put(std::move(key_bytes), std::move(value_bytes)); + } + ASSERT_OK(writer->Close()); + + // read data + ASSERT_TRUE(fs->Exists(file_path).value()); + ASSERT_OK_AND_ASSIGN(auto reader, factory->CreateReader(fs, file_path, pool)); + for (int64_t i = 0; i < row_count; i++) { + const auto& [key, value] = key_value_bytes_vec[i]; + ASSERT_OK_AND_ASSIGN(auto value_bytes, reader->Lookup(key)); + ASSERT_TRUE(value_bytes); + // test value bytes + ASSERT_EQ(*value_bytes, *value); + + // test deserialize data + ASSERT_OK_AND_ASSIGN(auto de_row, value_serializer->Deserialize(value_bytes)); + auto value_struct_array = std::dynamic_pointer_cast(value_array); + ASSERT_TRUE(value_struct_array); + auto typed_value_array = + std::dynamic_pointer_cast(value_struct_array->field(0)); + ASSERT_TRUE(typed_value_array); + ASSERT_EQ(de_row->GetInt(0), typed_value_array->Value(i)); + } + // test non-exist key + auto non_exist_key = std::make_shared("non-exist", pool.get()); + ASSERT_OK_AND_ASSIGN(auto value_bytes, reader->Lookup(non_exist_key)); + ASSERT_FALSE(value_bytes); + ASSERT_OK(reader->Close()); +} +} // namespace paimon::test diff --git a/src/paimon/common/memory/memory_slice.cpp b/src/paimon/common/memory/memory_slice.cpp index 6747dbd1..8e1b7669 100644 --- a/src/paimon/common/memory/memory_slice.cpp +++ b/src/paimon/common/memory/memory_slice.cpp @@ -48,10 +48,14 @@ int32_t MemorySlice::Offset() const { return offset_; } -std::shared_ptr MemorySlice::GetHeapMemory() { +std::shared_ptr MemorySlice::GetHeapMemory() const { return segment_->GetHeapMemory(); } +std::shared_ptr MemorySlice::GetSegment() const { + return segment_; +} + int8_t MemorySlice::ReadByte(int32_t position) { return segment_->GetValue(offset_ + position); } diff --git a/src/paimon/common/memory/memory_slice.h b/src/paimon/common/memory/memory_slice.h index 26a7198b..0dfe5f6c 100644 --- a/src/paimon/common/memory/memory_slice.h +++ b/src/paimon/common/memory/memory_slice.h @@ -24,8 +24,8 @@ #include "paimon/common/memory/memory_segment.h" #include "paimon/memory/bytes.h" +#include "paimon/result.h" #include "paimon/visibility.h" - namespace paimon { class MemoryPool; class MemorySliceInput; @@ -36,6 +36,9 @@ class PAIMON_EXPORT MemorySlice : public std::enable_shared_from_this Wrap(const std::shared_ptr& bytes); static std::shared_ptr Wrap(const std::shared_ptr& segment); + using SliceComparator = std::function(const std::shared_ptr&, + const std::shared_ptr&)>; + public: MemorySlice() = default; @@ -44,7 +47,8 @@ class PAIMON_EXPORT MemorySlice : public std::enable_shared_from_this GetHeapMemory(); + std::shared_ptr GetHeapMemory() const; + std::shared_ptr GetSegment() const; int8_t ReadByte(int32_t position); int32_t ReadInt(int32_t position); diff --git a/src/paimon/common/sst/block_cache.h b/src/paimon/common/sst/block_cache.h index f2db0e67..0d95fa05 100644 --- a/src/paimon/common/sst/block_cache.h +++ b/src/paimon/common/sst/block_cache.h @@ -53,6 +53,12 @@ class BlockCache { return it->second->GetSegment(); } + void Close() { + for (const auto& [key, _] : blocks_) { + cache_manager_->InvalidPage(key); + } + } + private: Result ReadFrom(int64_t offset, int length) { PAIMON_RETURN_NOT_OK(in_->Seek(offset, SeekOrigin::FS_SEEK_SET)); diff --git a/src/paimon/common/sst/block_entry.h b/src/paimon/common/sst/block_entry.h index a5fdc26a..cc833e68 100644 --- a/src/paimon/common/sst/block_entry.h +++ b/src/paimon/common/sst/block_entry.h @@ -24,13 +24,10 @@ namespace paimon { struct BlockEntry { - public: - BlockEntry(std::shared_ptr& key, std::shared_ptr& value) - : key_(key), value_(value) {} + BlockEntry(const std::shared_ptr& _key, const std::shared_ptr& _value) + : key(_key), value(_value) {} - ~BlockEntry() = default; - - std::shared_ptr key_; - std::shared_ptr value_; + std::shared_ptr key; + std::shared_ptr value; }; } // namespace paimon diff --git a/src/paimon/common/sst/block_iterator.cpp b/src/paimon/common/sst/block_iterator.cpp index 6fb471dd..901e7c3f 100644 --- a/src/paimon/common/sst/block_iterator.cpp +++ b/src/paimon/common/sst/block_iterator.cpp @@ -19,7 +19,7 @@ #include "paimon/common/sst/block_reader.h" namespace paimon { -BlockIterator::BlockIterator(std::shared_ptr& reader) : reader_(reader) { +BlockIterator::BlockIterator(const std::shared_ptr& reader) : reader_(reader) { input_ = reader->BlockInput(); } @@ -38,26 +38,23 @@ Result> BlockIterator::Next() { } std::unique_ptr BlockIterator::ReadEntry() { - int key_length = input_->ReadVarLenInt(); + int32_t key_length = input_->ReadVarLenInt(); auto key = input_->ReadSlice(key_length); - int value_length = input_->ReadVarLenInt(); + int32_t value_length = input_->ReadVarLenInt(); auto value = input_->ReadSlice(value_length); return std::make_unique(key, value); } -bool BlockIterator::SeekTo(std::shared_ptr target_key) { - int left = 0; - int right = reader_->RecordCount() - 1; +Result BlockIterator::SeekTo(const std::shared_ptr& target_key) { + int32_t left = 0; + int32_t right = reader_->RecordCount() - 1; while (left <= right) { - int mid = left + (right - left) / 2; + int32_t mid = left + (right - left) / 2; - auto status = input_->SetPosition(reader_->SeekTo(mid)); - if (!status.ok()) { - return false; - } + PAIMON_RETURN_NOT_OK(input_->SetPosition(reader_->SeekTo(mid))); auto mid_entry = ReadEntry(); - int compare = reader_->Comparator()(mid_entry->key_, target_key); + PAIMON_ASSIGN_OR_RAISE(int32_t compare, reader_->Comparator()(mid_entry->key, target_key)); if (compare == 0) { polled_ = std::move(mid_entry); diff --git a/src/paimon/common/sst/block_iterator.h b/src/paimon/common/sst/block_iterator.h index 284857c5..73a89300 100644 --- a/src/paimon/common/sst/block_iterator.h +++ b/src/paimon/common/sst/block_iterator.h @@ -26,9 +26,7 @@ class BlockReader; class BlockIterator { public: - explicit BlockIterator(std::shared_ptr& reader); - - ~BlockIterator() = default; + explicit BlockIterator(const std::shared_ptr& reader); bool HasNext() const; @@ -36,7 +34,7 @@ class BlockIterator { std::unique_ptr ReadEntry(); - bool SeekTo(std::shared_ptr target_key); + Result SeekTo(const std::shared_ptr& target_key); private: std::shared_ptr input_; diff --git a/src/paimon/common/sst/block_reader.cpp b/src/paimon/common/sst/block_reader.cpp index d4bdb843..9eeefd52 100644 --- a/src/paimon/common/sst/block_reader.cpp +++ b/src/paimon/common/sst/block_reader.cpp @@ -17,26 +17,19 @@ #include "paimon/common/sst/block_reader.h" #include "paimon/common/sst/block_trailer.h" - namespace paimon { -std::shared_ptr BlockReader::Create( - std::shared_ptr block, - std::function&, const std::shared_ptr&)> - comparator) { - auto ret = From(block->ReadByte(block->Length() - 1)); - if (!ret.ok()) { - return nullptr; - } - BlockAlignedType type = ret.value(); +Result> BlockReader::Create(const std::shared_ptr& block, + MemorySlice::SliceComparator comparator) { + PAIMON_ASSIGN_OR_RAISE(BlockAlignedType type, From(block->ReadByte(block->Length() - 1))); const auto trailer_len = BlockTrailer::ENCODED_LENGTH; - int size = block->ReadInt(block->Length() - trailer_len); + int32_t size = block->ReadInt(block->Length() - trailer_len); if (type == BlockAlignedType::ALIGNED) { auto data = block->Slice(0, block->Length() - trailer_len); return std::make_shared(data, size, std::move(comparator)); } else { - int index_length = size * 4; - int index_offset = block->Length() - trailer_len - index_length; + int32_t index_length = size * 4; + int32_t index_offset = block->Length() - trailer_len - index_length; auto data = block->Slice(0, index_offset); auto index = block->Slice(index_offset, index_length); return std::make_shared(data, index, std::move(comparator)); @@ -56,8 +49,7 @@ int32_t BlockReader::RecordCount() const { return record_count_; } -std::function&, const std::shared_ptr&)> -BlockReader::Comparator() { +MemorySlice::SliceComparator BlockReader::Comparator() const { return comparator_; } diff --git a/src/paimon/common/sst/block_reader.h b/src/paimon/common/sst/block_reader.h index 1998fad7..ec417105 100644 --- a/src/paimon/common/sst/block_reader.h +++ b/src/paimon/common/sst/block_reader.h @@ -26,51 +26,40 @@ #include "paimon/memory/bytes.h" #include "paimon/reader/batch_reader.h" #include "paimon/result.h" - namespace paimon { class BlockIterator; /// Reader for a block. class BlockReader : public std::enable_shared_from_this { public: - static std::shared_ptr Create( - std::shared_ptr block, - std::function&, - const std::shared_ptr&)> - comparator); - virtual ~BlockReader() = default; - std::unique_ptr Iterator(); - virtual int32_t SeekTo(int32_t record_position) = 0; + static Result> Create(const std::shared_ptr& block, + MemorySlice::SliceComparator comparator); - std::shared_ptr BlockInput(); + virtual int32_t SeekTo(int32_t record_position) = 0; int32_t RecordCount() const; + MemorySlice::SliceComparator Comparator() const; - std::function&, const std::shared_ptr&)> - Comparator(); + std::unique_ptr Iterator(); + std::shared_ptr BlockInput(); protected: - BlockReader(std::shared_ptr& block, int32_t record_count, - std::function&, - const std::shared_ptr&)> - comparator) + BlockReader(const std::shared_ptr& block, int32_t record_count, + MemorySlice::SliceComparator comparator) : block_(block), comparator_(std::move(comparator)), record_count_(record_count) {} private: std::shared_ptr block_; - std::function&, const std::shared_ptr&)> - comparator_; + MemorySlice::SliceComparator comparator_; int32_t record_count_; }; class AlignedBlockReader : public BlockReader { public: - AlignedBlockReader(std::shared_ptr& block, int32_t record_size, - std::function&, - const std::shared_ptr&)> - comparator) + AlignedBlockReader(const std::shared_ptr& block, int32_t record_size, + MemorySlice::SliceComparator comparator) : BlockReader(block, block->Length() / record_size, std::move(comparator)), record_size_(record_size) {} @@ -84,10 +73,9 @@ class AlignedBlockReader : public BlockReader { class UnAlignedBlockReader : public BlockReader { public: - UnAlignedBlockReader(std::shared_ptr& data, std::shared_ptr& index, - std::function&, - const std::shared_ptr&)> - comparator) + UnAlignedBlockReader(const std::shared_ptr& data, + std::shared_ptr& index, + MemorySlice::SliceComparator comparator) : BlockReader(data, index->Length() / 4, std::move(comparator)), index_(index) {} int32_t SeekTo(int32_t record_position) override { diff --git a/src/paimon/common/sst/sst_file_io_test.cpp b/src/paimon/common/sst/sst_file_io_test.cpp index a782ec26..edad7c93 100644 --- a/src/paimon/common/sst/sst_file_io_test.cpp +++ b/src/paimon/common/sst/sst_file_io_test.cpp @@ -36,7 +36,6 @@ #include "paimon/testing/mock/mock_file_batch_reader.h" #include "paimon/testing/utils/read_result_collector.h" #include "paimon/testing/utils/testharness.h" - namespace paimon { class Predicate; } // namespace paimon @@ -54,7 +53,7 @@ class SstFileIOTest : public ::testing::TestWithParam { fs_ = dir_->GetFileSystem(); pool_ = GetDefaultPool(); comparator_ = [](const std::shared_ptr& a, - const std::shared_ptr& b) -> int32_t { + const std::shared_ptr& b) -> Result { std::string_view va = a->ReadStringView(); std::string_view vb = b->ReadStringView(); if (va == vb) { @@ -73,8 +72,7 @@ class SstFileIOTest : public ::testing::TestWithParam { std::shared_ptr fs_; std::shared_ptr pool_; - std::function&, const std::shared_ptr&)> - comparator_; + MemorySlice::SliceComparator comparator_; }; TEST_P(SstFileIOTest, TestSimple) { @@ -117,12 +115,8 @@ TEST_P(SstFileIOTest, TestSimple) { ASSERT_EQ(6, writer->IndexWriter()->Size()); - auto bloom_filter_handle_ret = writer->WriteBloomFilter(); - ASSERT_OK(bloom_filter_handle_ret); - auto index_block_handle_ret = writer->WriteIndexBlock(); - ASSERT_OK(index_block_handle_ret); - auto index_block_handle = index_block_handle_ret.value(); - auto bloom_filter_handle = bloom_filter_handle_ret.value(); + ASSERT_OK_AND_ASSIGN(auto bloom_filter_handle, writer->WriteBloomFilter()); + ASSERT_OK_AND_ASSIGN(auto index_block_handle, writer->WriteIndexBlock()); ASSERT_OK(writer->WriteFooter(index_block_handle, bloom_filter_handle)); ASSERT_OK(out->Flush()); @@ -145,27 +139,27 @@ TEST_P(SstFileIOTest, TestSimple) { } // test read - auto reader_ret = SstFileReader::Create(pool_, fs_, index_path, comparator_); - ASSERT_OK(reader_ret); - auto reader = reader_ret.value(); + ASSERT_OK_AND_ASSIGN(in, fs_->Open(index_path)); + ASSERT_OK_AND_ASSIGN(auto reader, SstFileReader::Create(pool_, in, comparator_)); + // not exist key std::string k0 = "k0"; - ASSERT_EQ(nullptr, reader->Lookup(std::make_shared(k0, pool_.get()))); + ASSERT_FALSE(reader->Lookup(std::make_shared(k0, pool_.get())).value()); // k4 std::string k4 = "k4"; - auto v4 = reader->Lookup(std::make_shared(k4, pool_.get())); + ASSERT_OK_AND_ASSIGN(auto v4, reader->Lookup(std::make_shared(k4, pool_.get()))); ASSERT_TRUE(v4); std::string string4{v4->data(), v4->size()}; ASSERT_EQ("4", string4); // not exist key std::string k55 = "k55"; - ASSERT_EQ(nullptr, reader->Lookup(std::make_shared(k55, pool_.get()))); + ASSERT_FALSE(reader->Lookup(std::make_shared(k55, pool_.get())).value()); // k915 std::string k915 = "k915"; - auto v15 = reader->Lookup(std::make_shared(k915, pool_.get())); + ASSERT_OK_AND_ASSIGN(auto v15, reader->Lookup(std::make_shared(k915, pool_.get()))); ASSERT_TRUE(v15); std::string string15{v15->data(), v15->size()}; ASSERT_EQ("looooooooooong-值-15", string15); @@ -181,32 +175,33 @@ TEST_P(SstFileIOTest, TestJavaCompatibility) { std::make_shared(file, in, pool_, std::make_unique()); // test read - auto reader_ret = SstFileReader::Create(pool_, fs_, file, comparator_); - ASSERT_OK(reader_ret); - auto reader = reader_ret.value(); + ASSERT_OK_AND_ASSIGN(auto reader, SstFileReader::Create(pool_, in, comparator_)); // not exist key std::string k0 = "10000"; - ASSERT_EQ(nullptr, reader->Lookup(std::make_shared(k0, pool_.get()))); + ASSERT_FALSE(reader->Lookup(std::make_shared(k0, pool_.get())).value()); // k1314520 std::string k1314520 = "1314520"; - auto v1314520 = reader->Lookup(std::make_shared(k1314520, pool_.get())); + ASSERT_OK_AND_ASSIGN(auto v1314520, + reader->Lookup(std::make_shared(k1314520, pool_.get()))); ASSERT_TRUE(v1314520); std::string string1314520{v1314520->data(), v1314520->size()}; ASSERT_EQ("1314520", string1314520); // not exist key std::string k13145200 = "13145200"; - ASSERT_EQ(nullptr, reader->Lookup(std::make_shared(k13145200, pool_.get()))); + ASSERT_FALSE(reader->Lookup(std::make_shared(k13145200, pool_.get())).value()); std::string k1314521 = "1314521"; - auto v1314521 = reader->Lookup(std::make_shared(k1314521, pool_.get())); + ASSERT_OK_AND_ASSIGN(auto v1314521, + reader->Lookup(std::make_shared(k1314521, pool_.get()))); ASSERT_TRUE(v1314521); std::string string1314521{v1314521->data(), v1314521->size()}; ASSERT_EQ("1314521", string1314521); std::string k1999999 = "1999999"; - auto v1999999 = reader->Lookup(std::make_shared(k1999999, pool_.get())); + ASSERT_OK_AND_ASSIGN(auto v1999999, + reader->Lookup(std::make_shared(k1999999, pool_.get()))); ASSERT_TRUE(v1999999); std::string string1999999{v1999999->data(), v1999999->size()}; ASSERT_EQ("1999999", string1999999); diff --git a/src/paimon/common/sst/sst_file_reader.cpp b/src/paimon/common/sst/sst_file_reader.cpp index f337d790..8436da40 100644 --- a/src/paimon/common/sst/sst_file_reader.cpp +++ b/src/paimon/common/sst/sst_file_reader.cpp @@ -22,12 +22,10 @@ namespace paimon { Result> SstFileReader::Create( - const std::shared_ptr& pool, const std::shared_ptr& fs, - const std::string& file_path, - std::function&, const std::shared_ptr&)> - comparator) { - PAIMON_ASSIGN_OR_RAISE(std::shared_ptr in, fs->Open(file_path)); + const std::shared_ptr& pool, const std::shared_ptr& in, + MemorySlice::SliceComparator comparator) { PAIMON_ASSIGN_OR_RAISE(uint64_t file_len, in->Length()); + PAIMON_ASSIGN_OR_RAISE(std::string file_path, in->GetUri()); auto block_cache = std::make_shared(file_path, in, pool, std::make_unique()); @@ -39,12 +37,12 @@ Result> SstFileReader::Create( } auto slice = MemorySlice::Wrap(segment); auto input = slice->ToInput(); - PAIMON_ASSIGN_OR_RAISE(std::unique_ptr footer, + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr footer, BlockFooter::ReadBlockFooter(input)); // read bloom filter directly now auto bloom_filter_handle = footer->GetBloomFilterHandle(); - std::shared_ptr bloom_filter = nullptr; + std::shared_ptr bloom_filter = nullptr; if (bloom_filter_handle->ExpectedEntries() || bloom_filter_handle->Size() || bloom_filter_handle->Offset()) { bloom_filter = std::make_shared(bloom_filter_handle->ExpectedEntries(), @@ -62,19 +60,19 @@ Result> SstFileReader::Create( auto trailer = BlockTrailer::ReadBlockTrailer(trailer_input); auto block_data = block_cache->GetBlock(index_block_handle->Offset(), index_block_handle->Size(), true); - PAIMON_ASSIGN_OR_RAISE(std::shared_ptr uncompressed_data, + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr uncompressed_data, DecompressBlock(block_data, trailer, pool)); - auto reader = BlockReader::Create(MemorySlice::Wrap(uncompressed_data), comparator); - - return std::make_shared(pool, block_cache, bloom_filter, reader, comparator); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr reader, + BlockReader::Create(MemorySlice::Wrap(uncompressed_data), comparator)); + return std::shared_ptr( + new SstFileReader(pool, block_cache, bloom_filter, reader, comparator)); } -SstFileReader::SstFileReader( - const std::shared_ptr& pool, const std::shared_ptr& block_cache, - const std::shared_ptr& bloom_filter, - const std::shared_ptr& index_block_reader, - std::function&, const std::shared_ptr&)> - comparator) +SstFileReader::SstFileReader(const std::shared_ptr& pool, + const std::shared_ptr& block_cache, + const std::shared_ptr& bloom_filter, + const std::shared_ptr& index_block_reader, + MemorySlice::SliceComparator comparator) : pool_(pool), block_cache_(block_cache), bloom_filter_(bloom_filter), @@ -85,37 +83,32 @@ std::unique_ptr SstFileReader::CreateIterator() { return std::make_unique(this, index_block_reader_->Iterator()); } -std::shared_ptr SstFileReader::Lookup(std::shared_ptr key) { +Result> SstFileReader::Lookup(const std::shared_ptr& key) { if (bloom_filter_.get() && !bloom_filter_->TestHash(MurmurHashUtils::HashBytes(key))) { - return nullptr; + return std::shared_ptr(); } auto key_slice = MemorySlice::Wrap(key); // seek the index to the block containing the key auto index_block_iterator = index_block_reader_->Iterator(); - index_block_iterator->SeekTo(key_slice); + PAIMON_ASSIGN_OR_RAISE(bool _, index_block_iterator->SeekTo(key_slice)); // if indexIterator does not have a next, it means the key does not exist in this iterator if (index_block_iterator->HasNext()) { // seek the current iterator to the key - auto current = GetNextBlock(index_block_iterator); - if (!current.ok() || !current.value()) { - return nullptr; - } - auto& it = current.value(); - if (it->SeekTo(key_slice)) { - auto ret = it->Next(); - if (!ret.ok()) { - return nullptr; - } - return ret.value()->value_->CopyBytes(pool_.get()); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr current, + GetNextBlock(index_block_iterator)); + PAIMON_ASSIGN_OR_RAISE(bool success, current->SeekTo(key_slice)); + if (success) { + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr ret, current->Next()); + return ret->value->CopyBytes(pool_.get()); } } - return nullptr; + return std::shared_ptr(); } Result> SstFileReader::GetNextBlock( std::unique_ptr& index_iterator) { PAIMON_ASSIGN_OR_RAISE(auto ret, index_iterator->Next()); - auto& slice = ret->value_; + auto& slice = ret->value; auto input = slice->ToInput(); PAIMON_ASSIGN_OR_RAISE(auto reader, ReadBlock(BlockHandle::ReadBlockHandle(input), false)); return reader->Iterator(); @@ -134,13 +127,13 @@ Result> SstFileReader::ReadBlock( auto trailer_input = MemorySlice::Wrap(trailer_data)->ToInput(); auto trailer = BlockTrailer::ReadBlockTrailer(trailer_input); auto block_data = block_cache_->GetBlock(handle->Offset(), handle->Size(), index); - PAIMON_ASSIGN_OR_RAISE(std::shared_ptr uncompressed_data, + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr uncompressed_data, DecompressBlock(block_data, trailer, pool_)); return BlockReader::Create(MemorySlice::Wrap(uncompressed_data), comparator_); } -Result> SstFileReader::DecompressBlock( - const std::shared_ptr& compressed_data, +Result> SstFileReader::DecompressBlock( + const std::shared_ptr& compressed_data, const std::unique_ptr& trailer, const std::shared_ptr& pool) { auto input_memory = compressed_data->GetHeapMemory(); @@ -156,8 +149,10 @@ Result> SstFileReader::DecompressBlock( } // decompress data - PAIMON_ASSIGN_OR_RAISE(auto factory, BlockCompressionFactory::Create( - SstFileUtils::From(trailer->CompressionType()))); + PAIMON_ASSIGN_OR_RAISE(BlockCompressionType compress_type, + SstFileUtils::From(trailer->CompressionType())); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr factory, + BlockCompressionFactory::Create(compress_type)); if (!factory || factory->GetCompressionType() == BlockCompressionType::NONE) { return compressed_data; } else { @@ -176,20 +171,27 @@ Result> SstFileReader::DecompressBlock( } } +Status SstFileReader::Close() { + // TODO(xinyu.lxy): support close FileBasedBloomFilter + block_cache_->Close(); + return Status::OK(); +} + SstFileIterator::SstFileIterator(SstFileReader* reader, std::unique_ptr index_iterator) : reader_(reader), index_iterator_(std::move(index_iterator)) {} -Status SstFileIterator::SeekTo(std::shared_ptr& key) { +Status SstFileIterator::SeekTo(const std::shared_ptr& key) { auto key_slice = MemorySlice::Wrap(key); - index_iterator_->SeekTo(key_slice); + PAIMON_ASSIGN_OR_RAISE([[maybe_unused]] bool index_success, index_iterator_->SeekTo(key_slice)); if (index_iterator_->HasNext()) { PAIMON_ASSIGN_OR_RAISE(data_iterator_, reader_->GetNextBlock(index_iterator_)); // The index block entry key is the last key of the corresponding data block. // If there is some index entry key >= target key, the related data block must // also contain some key >= target key, which means seeked_data_block.HasNext() // must be true - data_iterator_->SeekTo(key_slice); + PAIMON_ASSIGN_OR_RAISE([[maybe_unused]] bool data_success, + data_iterator_->SeekTo(key_slice)); } else { data_iterator_.reset(); } diff --git a/src/paimon/common/sst/sst_file_reader.h b/src/paimon/common/sst/sst_file_reader.h index 616a85ef..f1fd36fd 100644 --- a/src/paimon/common/sst/sst_file_reader.h +++ b/src/paimon/common/sst/sst_file_reader.h @@ -40,21 +40,9 @@ class SstFileIterator; /// queries. Note that this class is NOT thread-safe. class SstFileReader { public: - static Result> Create( - const std::shared_ptr& pool, const std::shared_ptr& fs, - const std::string& file_path, - std::function&, - const std::shared_ptr&)> - comparator); - - SstFileReader(const std::shared_ptr& pool, - const std::shared_ptr& block_cache, - const std::shared_ptr& bloom_filter, - const std::shared_ptr& index_block_reader, - std::function&, - const std::shared_ptr&)> - comparator); - ~SstFileReader() = default; + static Result> Create(const std::shared_ptr& pool, + const std::shared_ptr& input, + MemorySlice::SliceComparator comparator); std::unique_ptr CreateIterator(); @@ -64,7 +52,7 @@ class SstFileReader { * @param key serialized key * @return corresponding serialized value, nullptr if not found. */ - std::shared_ptr Lookup(std::shared_ptr key); + Result> Lookup(const std::shared_ptr& key); Result> GetNextBlock( std::unique_ptr& index_iterator); @@ -85,30 +73,36 @@ class SstFileReader { Result> ReadBlock(const std::shared_ptr& handle, bool index); + Status Close(); + private: static Result> DecompressBlock( const std::shared_ptr& compressed_data, const std::unique_ptr& trailer, const std::shared_ptr& pool); + SstFileReader(const std::shared_ptr& pool, + const std::shared_ptr& block_cache, + const std::shared_ptr& bloom_filter, + const std::shared_ptr& index_block_reader, + MemorySlice::SliceComparator comparator); + private: std::shared_ptr pool_; std::shared_ptr block_cache_; std::shared_ptr bloom_filter_; std::shared_ptr index_block_reader_; - std::function&, const std::shared_ptr&)> - comparator_; + MemorySlice::SliceComparator comparator_; }; class SstFileIterator { public: - SstFileIterator() = default; SstFileIterator(SstFileReader* reader, std::unique_ptr index_iterator); /** * Seek to the position of the record whose key is exactly equal to or greater than the * specified key. */ - Status SeekTo(std::shared_ptr& key); + Status SeekTo(const std::shared_ptr& key); private: SstFileReader* reader_; diff --git a/src/paimon/common/sst/sst_file_utils.h b/src/paimon/common/sst/sst_file_utils.h index 0aaec6f3..e84651db 100644 --- a/src/paimon/common/sst/sst_file_utils.h +++ b/src/paimon/common/sst/sst_file_utils.h @@ -17,6 +17,7 @@ #pragma once #include +#include "fmt/format.h" #include "paimon/common/compression/block_compression_type.h" #include "paimon/common/memory/memory_slice.h" @@ -25,13 +26,16 @@ namespace paimon { /// Utils for sst file. class SstFileUtils { public: - static BlockCompressionType From(int8_t v) { - if (v == 1) { + static Result From(int8_t v) { + if (v == 0) { + return BlockCompressionType::NONE; + } else if (v == 1) { return BlockCompressionType::ZSTD; } else if (v == 2) { return BlockCompressionType::LZ4; } - return BlockCompressionType::NONE; + return Status::Invalid( + fmt::format("not support compression type code {}", static_cast(v))); } static std::string ToHexString(int32_t crc32c) { diff --git a/src/paimon/common/utils/bloom_filter.cpp b/src/paimon/common/utils/bloom_filter.cpp index f46f1b8d..9edf702f 100644 --- a/src/paimon/common/utils/bloom_filter.cpp +++ b/src/paimon/common/utils/bloom_filter.cpp @@ -39,11 +39,12 @@ int32_t BloomFilter::OptimalNumOfHashFunctions(int64_t expect_entries, int64_t b } double ratio = static_cast(bit_size) / static_cast(expect_entries); double result = ratio * std::log(2.0); - return std::max(1, static_cast(std::round(result))); + return std::max(1, static_cast(std::round(result))); } std::shared_ptr BloomFilter::Create(int64_t expect_entries, double fpp) { - int bytes = static_cast(ceil(BloomFilter::OptimalNumOfBits(expect_entries, fpp) / 8.0)); + int32_t bytes = + static_cast(ceil(BloomFilter::OptimalNumOfBits(expect_entries, fpp) / 8.0)); return std::make_shared(expect_entries, bytes); } @@ -55,7 +56,7 @@ BloomFilter::BloomFilter(int64_t expected_entries, int32_t byte_length) } Status BloomFilter::AddHash(int32_t hash1) { - int hash2 = hash1 >> 16; + int32_t hash2 = hash1 >> 16; for (int32_t i = 1; i <= num_hash_functions_; i++) { int32_t combined_hash = hash1 + (i * hash2); @@ -72,7 +73,7 @@ Status BloomFilter::AddHash(int32_t hash1) { bool BloomFilter::TestHash(int32_t hash1) const { int32_t hash2 = hash1 >> 16; - for (int i = 1; i <= num_hash_functions_; i++) { + for (int32_t i = 1; i <= num_hash_functions_; i++) { int32_t combined_hash = hash1 + (i * hash2); // hashcode should be positive, flip all the bits if it's negative if (combined_hash < 0) { diff --git a/src/paimon/core/core_options.cpp b/src/paimon/core/core_options.cpp index 0dd5c93b..150cce7f 100644 --- a/src/paimon/core/core_options.cpp +++ b/src/paimon/core/core_options.cpp @@ -330,6 +330,10 @@ struct CoreOptions::Impl { int32_t compact_off_peak_start_hour = -1; int32_t compact_off_peak_end_hour = -1; int32_t compact_off_peak_ratio = 0; + bool lookup_cache_bloom_filter = true; + double lookup_cache_bloom_filter_fpp = 0.05; + CompressOptions lookup_compress_options{"zstd", 1}; + int64_t cache_page_size = 64 * 1024; // 64KB }; // Parse configurations from a map and return a populated CoreOptions object @@ -561,6 +565,29 @@ Result CoreOptions::FromMap( PAIMON_RETURN_NOT_OK( parser.Parse(Options::COMPACTION_OFFPEAK_RATIO, &impl->compact_off_peak_ratio)); + // Parse lookup.cache.bloom.filter.enabled + PAIMON_RETURN_NOT_OK(parser.Parse(Options::LOOKUP_CACHE_BLOOM_FILTER_ENABLED, + &impl->lookup_cache_bloom_filter)); + + // Parse lookup.cache.bloom.filter.fpp + PAIMON_RETURN_NOT_OK(parser.Parse(Options::LOOKUP_CACHE_BLOOM_FILTER_FPP, + &impl->lookup_cache_bloom_filter_fpp)); + + // Parse lookup.cache-spill-compression + std::string lookup_compress_options_compression_str; + PAIMON_RETURN_NOT_OK(parser.ParseString(Options::LOOKUP_CACHE_SPILL_COMPRESSION, + &lookup_compress_options_compression_str)); + if (!lookup_compress_options_compression_str.empty()) { + impl->lookup_compress_options.compress = lookup_compress_options_compression_str; + } + + // Parse spill-compression.zstd-level + PAIMON_RETURN_NOT_OK(parser.Parse(Options::SPILL_COMPRESSION_ZSTD_LEVEL, + &(impl->lookup_compress_options.zstd_level))); + + // Parse cache-page-size + PAIMON_RETURN_NOT_OK(parser.ParseMemorySize(Options::CACHE_PAGE_SIZE, &impl->cache_page_size)); + return options; } @@ -909,4 +936,19 @@ int32_t CoreOptions::GetCompactOffPeakRatio() const { return impl_->compact_off_peak_ratio; } +bool CoreOptions::LookupCacheBloomFilterEnabled() const { + return impl_->lookup_cache_bloom_filter; +} + +double CoreOptions::GetLookupCacheBloomFilterFpp() const { + return impl_->lookup_cache_bloom_filter_fpp; +} + +const CompressOptions& CoreOptions::GetLookupCompressOptions() const { + return impl_->lookup_compress_options; +} + +int32_t CoreOptions::GetCachePageSize() const { + return static_cast(impl_->cache_page_size); +} } // namespace paimon diff --git a/src/paimon/core/core_options.h b/src/paimon/core/core_options.h index 112c1d7e..7765b53a 100644 --- a/src/paimon/core/core_options.h +++ b/src/paimon/core/core_options.h @@ -25,6 +25,7 @@ #include #include "paimon/core/options/changelog_producer.h" +#include "paimon/core/options/compress_options.h" #include "paimon/core/options/external_path_strategy.h" #include "paimon/core/options/merge_engine.h" #include "paimon/core/options/sort_engine.h" @@ -133,6 +134,12 @@ class PAIMON_EXPORT CoreOptions { int32_t GetCompactOffPeakEndHour() const; int32_t GetCompactOffPeakRatio() const; + bool LookupCacheBloomFilterEnabled() const; + double GetLookupCacheBloomFilterFpp() const; + + const CompressOptions& GetLookupCompressOptions() const; + int32_t GetCachePageSize() const; + const std::map& ToMap() const; private: diff --git a/src/paimon/core/core_options_test.cpp b/src/paimon/core/core_options_test.cpp index 3acc538f..50ced4c0 100644 --- a/src/paimon/core/core_options_test.cpp +++ b/src/paimon/core/core_options_test.cpp @@ -103,6 +103,11 @@ TEST(CoreOptionsTest, TestDefaultValue) { ASSERT_EQ(-1, core_options.GetCompactOffPeakStartHour()); ASSERT_EQ(-1, core_options.GetCompactOffPeakEndHour()); ASSERT_EQ(0, core_options.GetCompactOffPeakRatio()); + ASSERT_TRUE(core_options.LookupCacheBloomFilterEnabled()); + ASSERT_EQ(0.05, core_options.GetLookupCacheBloomFilterFpp()); + ASSERT_EQ("zstd", core_options.GetLookupCompressOptions().compress); + ASSERT_EQ(1, core_options.GetLookupCompressOptions().zstd_level); + ASSERT_EQ(64 * 1024, core_options.GetCachePageSize()); } TEST(CoreOptionsTest, TestFromMap) { @@ -171,7 +176,12 @@ TEST(CoreOptionsTest, TestFromMap) { {Options::COMPACTION_INCREMENTAL_SIZE_THRESHOLD, "12 kB"}, {Options::COMPACT_OFFPEAK_START_HOUR, "3"}, {Options::COMPACT_OFFPEAK_END_HOUR, "16"}, - {Options::COMPACTION_OFFPEAK_RATIO, "8"}}; + {Options::COMPACTION_OFFPEAK_RATIO, "8"}, + {Options::LOOKUP_CACHE_BLOOM_FILTER_ENABLED, "false"}, + {Options::LOOKUP_CACHE_BLOOM_FILTER_FPP, "0.5"}, + {Options::LOOKUP_CACHE_SPILL_COMPRESSION, "lz4"}, + {Options::SPILL_COMPRESSION_ZSTD_LEVEL, "2"}, + {Options::CACHE_PAGE_SIZE, "6MB"}}; ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap(options)); auto fs = core_options.GetFileSystem(); @@ -256,6 +266,11 @@ TEST(CoreOptionsTest, TestFromMap) { ASSERT_EQ(3, core_options.GetCompactOffPeakStartHour()); ASSERT_EQ(16, core_options.GetCompactOffPeakEndHour()); ASSERT_EQ(8, core_options.GetCompactOffPeakRatio()); + ASSERT_FALSE(core_options.LookupCacheBloomFilterEnabled()); + ASSERT_EQ(0.5, core_options.GetLookupCacheBloomFilterFpp()); + ASSERT_EQ("lz4", core_options.GetLookupCompressOptions().compress); + ASSERT_EQ(2, core_options.GetLookupCompressOptions().zstd_level); + ASSERT_EQ(6 * 1024 * 1024, core_options.GetCachePageSize()); } TEST(CoreOptionsTest, TestInvalidCase) { diff --git a/src/paimon/core/options/compress_options.h b/src/paimon/core/options/compress_options.h new file mode 100644 index 00000000..428d950d --- /dev/null +++ b/src/paimon/core/options/compress_options.h @@ -0,0 +1,27 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include +#include + +namespace paimon { +/// Options of compression. +struct CompressOptions { + std::string compress; + int32_t zstd_level; +}; +} // namespace paimon diff --git a/src/paimon/core/utils/fields_comparator.cpp b/src/paimon/core/utils/fields_comparator.cpp index 4b18ec22..9ca32aa0 100644 --- a/src/paimon/core/utils/fields_comparator.cpp +++ b/src/paimon/core/utils/fields_comparator.cpp @@ -126,6 +126,8 @@ Result FieldsComparator::CompareField( return lvalue == rvalue ? 0 : (lvalue < rvalue ? -1 : 1); }); case arrow::Type::type::FLOAT: + // TODO(xinyu.lxy): currently in java KeyComparatorSupplier: -inf < -0.0 == +0.0 < +inf + // = nan paimon-cpp: -inf < -0.0 == +0.0 < +inf and nan cannot be compared return FieldsComparator::FieldComparatorFunc( [field_idx](const InternalRow& lhs, const InternalRow& rhs) -> int32_t { float lvalue = lhs.GetFloat(field_idx); @@ -211,4 +213,127 @@ Result FieldsComparator::CompareField( } } +Result FieldsComparator::CompareVariant( + int32_t field_idx, const std::shared_ptr& input_type, bool use_view) { + arrow::Type::type type = input_type->id(); + switch (type) { + case arrow::Type::type::BOOL: + return FieldsComparator::VariantComparatorFunc( + [field_idx](const VariantType& lhs, const VariantType& rhs) -> int32_t { + auto lvalue = DataDefine::GetVariantValue(lhs); + auto rvalue = DataDefine::GetVariantValue(rhs); + return lvalue == rvalue ? 0 : (lvalue < rvalue ? -1 : 1); + }); + case arrow::Type::type::INT8: + return FieldsComparator::VariantComparatorFunc( + [field_idx](const VariantType& lhs, const VariantType& rhs) -> int32_t { + auto lvalue = DataDefine::GetVariantValue(lhs); + auto rvalue = DataDefine::GetVariantValue(rhs); + return lvalue == rvalue ? 0 : (lvalue < rvalue ? -1 : 1); + }); + case arrow::Type::type::INT16: + return FieldsComparator::VariantComparatorFunc( + [field_idx](const VariantType& lhs, const VariantType& rhs) -> int32_t { + auto lvalue = DataDefine::GetVariantValue(lhs); + auto rvalue = DataDefine::GetVariantValue(rhs); + return lvalue == rvalue ? 0 : (lvalue < rvalue ? -1 : 1); + }); + case arrow::Type::type::DATE32: + return FieldsComparator::VariantComparatorFunc( + [field_idx](const VariantType& lhs, const VariantType& rhs) -> int32_t { + auto lvalue = DataDefine::GetVariantValue(lhs); + auto rvalue = DataDefine::GetVariantValue(rhs); + return lvalue == rvalue ? 0 : (lvalue < rvalue ? -1 : 1); + }); + + case arrow::Type::type::INT32: + return FieldsComparator::VariantComparatorFunc( + [field_idx](const VariantType& lhs, const VariantType& rhs) -> int32_t { + auto lvalue = DataDefine::GetVariantValue(lhs); + auto rvalue = DataDefine::GetVariantValue(rhs); + return lvalue == rvalue ? 0 : (lvalue < rvalue ? -1 : 1); + }); + case arrow::Type::type::INT64: + return FieldsComparator::VariantComparatorFunc( + [field_idx](const VariantType& lhs, const VariantType& rhs) -> int32_t { + auto lvalue = DataDefine::GetVariantValue(lhs); + auto rvalue = DataDefine::GetVariantValue(rhs); + return lvalue == rvalue ? 0 : (lvalue < rvalue ? -1 : 1); + }); + case arrow::Type::type::FLOAT: + return FieldsComparator::VariantComparatorFunc( + [field_idx](const VariantType& lhs, const VariantType& rhs) -> int32_t { + auto lvalue = DataDefine::GetVariantValue(lhs); + auto rvalue = DataDefine::GetVariantValue(rhs); + return CompareFloatingPoint(lvalue, rvalue); + }); + case arrow::Type::type::DOUBLE: + return FieldsComparator::VariantComparatorFunc( + [field_idx](const VariantType& lhs, const VariantType& rhs) -> int32_t { + auto lvalue = DataDefine::GetVariantValue(lhs); + auto rvalue = DataDefine::GetVariantValue(rhs); + return CompareFloatingPoint(lvalue, rvalue); + }); + case arrow::Type::type::STRING: { + if (use_view) { + return FieldsComparator::VariantComparatorFunc( + [field_idx](const VariantType& lhs, const VariantType& rhs) -> int32_t { + auto lvalue = DataDefine::GetVariantValue(lhs); + auto rvalue = DataDefine::GetVariantValue(rhs); + int32_t cmp = lvalue.compare(rvalue); + return cmp == 0 ? 0 : (cmp > 0 ? 1 : -1); + }); + } else { + return FieldsComparator::VariantComparatorFunc( + [field_idx](const VariantType& lhs, const VariantType& rhs) -> int32_t { + auto lvalue = DataDefine::GetVariantValue(lhs); + auto rvalue = DataDefine::GetVariantValue(rhs); + int32_t cmp = lvalue.CompareTo(rvalue); + return cmp == 0 ? 0 : (cmp > 0 ? 1 : -1); + }); + } + } + case arrow::Type::type::BINARY: { + // TODO(xinyu.lxy): may use 64byte compare + if (use_view) { + return FieldsComparator::VariantComparatorFunc( + [field_idx](const VariantType& lhs, const VariantType& rhs) -> int32_t { + auto lvalue = DataDefine::GetVariantValue(lhs); + auto rvalue = DataDefine::GetVariantValue(rhs); + int32_t cmp = lvalue.compare(rvalue); + return cmp == 0 ? 0 : (cmp > 0 ? 1 : -1); + }); + } else { + return FieldsComparator::VariantComparatorFunc( + [field_idx](const VariantType& lhs, const VariantType& rhs) -> int32_t { + auto lvalue = DataDefine::GetVariantValue>(lhs); + auto rvalue = DataDefine::GetVariantValue>(rhs); + int32_t cmp = lvalue->compare(*rvalue); + return cmp == 0 ? 0 : (cmp > 0 ? 1 : -1); + }); + } + } + case arrow::Type::type::TIMESTAMP: { + return FieldsComparator::VariantComparatorFunc( + [field_idx](const VariantType& lhs, const VariantType& rhs) -> int32_t { + auto lvalue = DataDefine::GetVariantValue(lhs); + auto rvalue = DataDefine::GetVariantValue(rhs); + return lvalue == rvalue ? 0 : (lvalue < rvalue ? -1 : 1); + }); + } + case arrow::Type::type::DECIMAL: { + return FieldsComparator::VariantComparatorFunc( + [field_idx](const VariantType& lhs, const VariantType& rhs) -> int32_t { + auto lvalue = DataDefine::GetVariantValue(lhs); + auto rvalue = DataDefine::GetVariantValue(rhs); + int32_t cmp = lvalue.CompareTo(rvalue); + return cmp == 0 ? 0 : (cmp > 0 ? 1 : -1); + }); + } + default: + return Status::NotImplemented(fmt::format("Do not support comparing {} type in idx {}", + input_type->ToString(), field_idx)); + } +} + } // namespace paimon diff --git a/src/paimon/core/utils/fields_comparator.h b/src/paimon/core/utils/fields_comparator.h index bba56593..4691482d 100644 --- a/src/paimon/core/utils/fields_comparator.h +++ b/src/paimon/core/utils/fields_comparator.h @@ -51,9 +51,42 @@ class FieldsComparator { return sort_fields_; } + using VariantComparatorFunc = + std::function; + + static Result CompareVariant( + int32_t field_idx, const std::shared_ptr& input_type, bool use_view); + + /// Java-compatible ordering for floating-point types: + /// -infinity < -0.0 < +0.0 < +infinity < NaN == NaN + template + static int32_t CompareFloatingPoint(T a, T b) { + const bool a_nan = std::isnan(a); + const bool b_nan = std::isnan(b); + if (a_nan && b_nan) { + return 0; + } + if (a_nan) { + return 1; + } + if (b_nan) { + return -1; + } + if (a == b) { + const bool a_neg = std::signbit(a); + const bool b_neg = std::signbit(b); + if (a_neg == b_neg) { + return 0; + } + return a_neg ? -1 : 1; // -0.0 < +0.0 + } + return a < b ? -1 : 1; + } + private: using FieldComparatorFunc = std::function; + FieldsComparator(bool is_ascending_order, const std::vector& sort_fields, std::vector&& comparators) : is_ascending_order_(is_ascending_order), From 2d9d0e865af42d830dd6280fa6d0c845dc736cab Mon Sep 17 00:00:00 2001 From: lxy264173 Date: Tue, 10 Mar 2026 16:43:36 +0800 Subject: [PATCH 2/7] fix --- src/paimon/CMakeLists.txt | 2 +- .../lookup/sort/sort_lookup_store_test.cpp | 148 ++++++++++-------- src/paimon/common/sst/sst_file_reader.cpp | 4 +- src/paimon/common/sst/sst_file_writer.cpp | 4 +- src/paimon/common/sst/sst_file_writer.h | 1 + src/paimon/core/utils/fields_comparator.cpp | 33 ++-- src/paimon/core/utils/fields_comparator.h | 1 + 7 files changed, 104 insertions(+), 89 deletions(-) diff --git a/src/paimon/CMakeLists.txt b/src/paimon/CMakeLists.txt index 6d3f2198..1707124a 100644 --- a/src/paimon/CMakeLists.txt +++ b/src/paimon/CMakeLists.txt @@ -387,7 +387,7 @@ if(PAIMON_BUILD_TESTS) common/io/buffered_input_stream_test.cpp common/io/memory_segment_output_stream_test.cpp common/io/offset_input_stream_test.cpp - common/lookup/sort/sort_lookup_store_test.cpp + common/lookup/sort/sort_lookup_store_test.cpp common/logging/logging_test.cpp common/metrics/metrics_impl_test.cpp common/options/memory_size_test.cpp diff --git a/src/paimon/common/lookup/sort/sort_lookup_store_test.cpp b/src/paimon/common/lookup/sort/sort_lookup_store_test.cpp index 82387ff5..b017ebbc 100644 --- a/src/paimon/common/lookup/sort/sort_lookup_store_test.cpp +++ b/src/paimon/common/lookup/sort/sort_lookup_store_test.cpp @@ -26,28 +26,30 @@ namespace paimon::test { TEST(SortLookupStoreTest, TestSimple) { auto pool = GetDefaultPool(); - auto dir = UniqueTestDirectory::Create("local"); - std::string file_path = dir->Str() + "/test.lookup"; - auto fs = dir->GetFileSystem(); + auto check_result = [&](const std::map& options_map) { + auto dir = UniqueTestDirectory::Create("local"); + std::string file_path = dir->Str() + "/test.lookup"; + auto fs = dir->GetFileSystem(); - arrow::FieldVector fields = { - arrow::field("key", arrow::utf8()), - arrow::field("value", arrow::int32()), - }; - auto key_type = arrow::struct_({fields[0]}); - auto value_type = arrow::struct_({fields[1]}); - ASSERT_OK_AND_ASSIGN(auto comparator, RowCompactedSerializer::CreateSliceComparator( - arrow::schema(key_type->fields()), pool)); - ASSERT_OK_AND_ASSIGN(CoreOptions options, CoreOptions::FromMap({})); + arrow::FieldVector fields = { + arrow::field("key", arrow::utf8()), + arrow::field("value", arrow::int32()), + }; + auto key_type = arrow::struct_({fields[0]}); + auto value_type = arrow::struct_({fields[1]}); + ASSERT_OK_AND_ASSIGN(auto comparator, RowCompactedSerializer::CreateSliceComparator( + arrow::schema(key_type->fields()), pool)); + ASSERT_OK_AND_ASSIGN(CoreOptions options, CoreOptions::FromMap(options_map)); - ASSERT_OK_AND_ASSIGN(auto factory, LookupStoreFactory::Create(comparator, options)); - int64_t row_count = 6; - ASSERT_OK_AND_ASSIGN(auto bloom_filter, - LookupStoreFactory::BfGenerator(row_count, options, pool.get())); - ASSERT_OK_AND_ASSIGN(auto writer, factory->CreateWriter(fs, file_path, bloom_filter, pool)); + ASSERT_OK_AND_ASSIGN(auto factory, LookupStoreFactory::Create(comparator, options)); + int64_t row_count = 6; + ASSERT_OK_AND_ASSIGN(auto bloom_filter, + LookupStoreFactory::BfGenerator(row_count, options, pool.get())); + ASSERT_OK_AND_ASSIGN(auto writer, factory->CreateWriter(fs, file_path, bloom_filter, pool)); - std::shared_ptr key_array = arrow::ipc::internal::json::ArrayFromJSON(key_type, - R"([ + std::shared_ptr key_array = + arrow::ipc::internal::json::ArrayFromJSON(key_type, + R"([ ["Alex"], ["Alice"], ["Bob"], @@ -55,10 +57,10 @@ TEST(SortLookupStoreTest, TestSimple) { ["Lily"], ["Lucy"] ])") - .ValueOrDie(); - std::shared_ptr value_array = - arrow::ipc::internal::json::ArrayFromJSON(value_type, - R"([ + .ValueOrDie(); + std::shared_ptr value_array = + arrow::ipc::internal::json::ArrayFromJSON(value_type, + R"([ [0], [10], [20], @@ -66,53 +68,63 @@ TEST(SortLookupStoreTest, TestSimple) { [40], [100] ])") - .ValueOrDie(); + .ValueOrDie(); + + ASSERT_OK_AND_ASSIGN(auto key_serializer, RowCompactedSerializer::Create( + arrow::schema(key_type->fields()), pool)); + ASSERT_OK_AND_ASSIGN(auto value_serializer, RowCompactedSerializer::Create( + arrow::schema(value_type->fields()), pool)); + // write data + std::vector, std::shared_ptr>> key_value_bytes_vec; + for (int64_t i = 0; i < row_count; i++) { + auto key_struct_array = std::dynamic_pointer_cast(key_array); + auto value_struct_array = std::dynamic_pointer_cast(value_array); + ASSERT_TRUE(key_struct_array); + ASSERT_TRUE(value_struct_array); + auto key_row = std::make_shared( + /*struct_array=*/nullptr, key_struct_array->fields(), pool, /*row_id=*/i); + auto value_row = std::make_shared( + /*struct_array=*/nullptr, value_struct_array->fields(), pool, /*row_id=*/i); + ASSERT_OK_AND_ASSIGN(auto key_bytes, key_serializer->SerializeToBytes(*key_row)); + ASSERT_OK_AND_ASSIGN(auto value_bytes, value_serializer->SerializeToBytes(*value_row)); + key_value_bytes_vec.push_back({key_bytes, value_bytes}); + writer->Put(std::move(key_bytes), std::move(value_bytes)); + } + ASSERT_OK(writer->Close()); - ASSERT_OK_AND_ASSIGN(auto key_serializer, - RowCompactedSerializer::Create(arrow::schema(key_type->fields()), pool)); - ASSERT_OK_AND_ASSIGN(auto value_serializer, - RowCompactedSerializer::Create(arrow::schema(value_type->fields()), pool)); - // write data - std::vector, std::shared_ptr>> key_value_bytes_vec; - for (int64_t i = 0; i < row_count; i++) { - auto key_struct_array = std::dynamic_pointer_cast(key_array); - auto value_struct_array = std::dynamic_pointer_cast(value_array); - ASSERT_TRUE(key_struct_array); - ASSERT_TRUE(value_struct_array); - auto key_row = std::make_shared( - /*struct_array=*/nullptr, key_struct_array->fields(), pool, /*row_id=*/i); - auto value_row = std::make_shared( - /*struct_array=*/nullptr, value_struct_array->fields(), pool, /*row_id=*/i); - ASSERT_OK_AND_ASSIGN(auto key_bytes, key_serializer->SerializeToBytes(*key_row)); - ASSERT_OK_AND_ASSIGN(auto value_bytes, value_serializer->SerializeToBytes(*value_row)); - key_value_bytes_vec.push_back({key_bytes, value_bytes}); - writer->Put(std::move(key_bytes), std::move(value_bytes)); - } - ASSERT_OK(writer->Close()); + // read data + ASSERT_TRUE(fs->Exists(file_path).value()); + ASSERT_OK_AND_ASSIGN(auto reader, factory->CreateReader(fs, file_path, pool)); + for (int64_t i = 0; i < row_count; i++) { + const auto& [key, value] = key_value_bytes_vec[i]; + ASSERT_OK_AND_ASSIGN(auto value_bytes, reader->Lookup(key)); + ASSERT_TRUE(value_bytes); + // test value bytes + ASSERT_EQ(*value_bytes, *value); - // read data - ASSERT_TRUE(fs->Exists(file_path).value()); - ASSERT_OK_AND_ASSIGN(auto reader, factory->CreateReader(fs, file_path, pool)); - for (int64_t i = 0; i < row_count; i++) { - const auto& [key, value] = key_value_bytes_vec[i]; - ASSERT_OK_AND_ASSIGN(auto value_bytes, reader->Lookup(key)); - ASSERT_TRUE(value_bytes); - // test value bytes - ASSERT_EQ(*value_bytes, *value); + // test deserialize data + ASSERT_OK_AND_ASSIGN(auto de_row, value_serializer->Deserialize(value_bytes)); + auto value_struct_array = std::dynamic_pointer_cast(value_array); + ASSERT_TRUE(value_struct_array); + auto typed_value_array = + std::dynamic_pointer_cast(value_struct_array->field(0)); + ASSERT_TRUE(typed_value_array); + ASSERT_EQ(de_row->GetInt(0), typed_value_array->Value(i)); + } + // test non-exist key + auto non_exist_key = std::make_shared("non-exist", pool.get()); + ASSERT_OK_AND_ASSIGN(auto value_bytes, reader->Lookup(non_exist_key)); + ASSERT_FALSE(value_bytes); + ASSERT_OK(reader->Close()); + }; - // test deserialize data - ASSERT_OK_AND_ASSIGN(auto de_row, value_serializer->Deserialize(value_bytes)); - auto value_struct_array = std::dynamic_pointer_cast(value_array); - ASSERT_TRUE(value_struct_array); - auto typed_value_array = - std::dynamic_pointer_cast(value_struct_array->field(0)); - ASSERT_TRUE(typed_value_array); - ASSERT_EQ(de_row->GetInt(0), typed_value_array->Value(i)); - } - // test non-exist key - auto non_exist_key = std::make_shared("non-exist", pool.get()); - ASSERT_OK_AND_ASSIGN(auto value_bytes, reader->Lookup(non_exist_key)); - ASSERT_FALSE(value_bytes); - ASSERT_OK(reader->Close()); + check_result({}); + check_result(std::map( + {{Options::LOOKUP_CACHE_BLOOM_FILTER_ENABLED, "false"}})); + check_result( + std::map({{Options::LOOKUP_CACHE_SPILL_COMPRESSION, "lz4"}})); + check_result( + std::map({{Options::SPILL_COMPRESSION_ZSTD_LEVEL, "3"}})); + check_result(std::map({{Options::CACHE_PAGE_SIZE, "4"}})); } } // namespace paimon::test diff --git a/src/paimon/common/sst/sst_file_reader.cpp b/src/paimon/common/sst/sst_file_reader.cpp index 8436da40..c197b4cb 100644 --- a/src/paimon/common/sst/sst_file_reader.cpp +++ b/src/paimon/common/sst/sst_file_reader.cpp @@ -43,8 +43,8 @@ Result> SstFileReader::Create( // read bloom filter directly now auto bloom_filter_handle = footer->GetBloomFilterHandle(); std::shared_ptr bloom_filter = nullptr; - if (bloom_filter_handle->ExpectedEntries() || bloom_filter_handle->Size() || - bloom_filter_handle->Offset()) { + if (bloom_filter_handle && (bloom_filter_handle->ExpectedEntries() || + bloom_filter_handle->Size() || bloom_filter_handle->Offset())) { bloom_filter = std::make_shared(bloom_filter_handle->ExpectedEntries(), bloom_filter_handle->Size()); PAIMON_RETURN_NOT_OK(bloom_filter->SetMemorySegment(block_cache->GetBlock( diff --git a/src/paimon/common/sst/sst_file_writer.cpp b/src/paimon/common/sst/sst_file_writer.cpp index 96f95c6f..088ac24b 100644 --- a/src/paimon/common/sst/sst_file_writer.cpp +++ b/src/paimon/common/sst/sst_file_writer.cpp @@ -70,8 +70,8 @@ Result> SstFileWriter::WriteIndexBlock() { } Result> SstFileWriter::WriteBloomFilter() { - if (!bloom_filter_.get()) { - return Status::Invalid("bloom_filter_ should be set before writing"); + if (!bloom_filter_) { + return std::shared_ptr(); } auto data = bloom_filter_->GetBitSet()->ToSlice()->ReadStringView(); auto handle = std::make_shared(out_->GetPos().value(), data.size(), diff --git a/src/paimon/common/sst/sst_file_writer.h b/src/paimon/common/sst/sst_file_writer.h index 75888010..fb75fc3d 100644 --- a/src/paimon/common/sst/sst_file_writer.h +++ b/src/paimon/common/sst/sst_file_writer.h @@ -55,6 +55,7 @@ class SstFileWriter { Result> WriteIndexBlock(); + // When bloom-filter is disabled, return nullptr. Result> WriteBloomFilter(); Status WriteFooter(const std::shared_ptr& index_block_handle, diff --git a/src/paimon/core/utils/fields_comparator.cpp b/src/paimon/core/utils/fields_comparator.cpp index 9ca32aa0..cb74ffc9 100644 --- a/src/paimon/core/utils/fields_comparator.cpp +++ b/src/paimon/core/utils/fields_comparator.cpp @@ -126,8 +126,9 @@ Result FieldsComparator::CompareField( return lvalue == rvalue ? 0 : (lvalue < rvalue ? -1 : 1); }); case arrow::Type::type::FLOAT: - // TODO(xinyu.lxy): currently in java KeyComparatorSupplier: -inf < -0.0 == +0.0 < +inf - // = nan paimon-cpp: -inf < -0.0 == +0.0 < +inf and nan cannot be compared + // TODO(xinyu.lxy): + // currently in java KeyComparatorSupplier: -inf < -0.0 == +0.0 < +inf = nan + // paimon-cpp: -inf < -0.0 == +0.0 < +inf and nan cannot be compared return FieldsComparator::FieldComparatorFunc( [field_idx](const InternalRow& lhs, const InternalRow& rhs) -> int32_t { float lvalue = lhs.GetFloat(field_idx); @@ -219,28 +220,28 @@ Result FieldsComparator::CompareVariant switch (type) { case arrow::Type::type::BOOL: return FieldsComparator::VariantComparatorFunc( - [field_idx](const VariantType& lhs, const VariantType& rhs) -> int32_t { + [](const VariantType& lhs, const VariantType& rhs) -> int32_t { auto lvalue = DataDefine::GetVariantValue(lhs); auto rvalue = DataDefine::GetVariantValue(rhs); return lvalue == rvalue ? 0 : (lvalue < rvalue ? -1 : 1); }); case arrow::Type::type::INT8: return FieldsComparator::VariantComparatorFunc( - [field_idx](const VariantType& lhs, const VariantType& rhs) -> int32_t { + [](const VariantType& lhs, const VariantType& rhs) -> int32_t { auto lvalue = DataDefine::GetVariantValue(lhs); auto rvalue = DataDefine::GetVariantValue(rhs); return lvalue == rvalue ? 0 : (lvalue < rvalue ? -1 : 1); }); case arrow::Type::type::INT16: return FieldsComparator::VariantComparatorFunc( - [field_idx](const VariantType& lhs, const VariantType& rhs) -> int32_t { + [](const VariantType& lhs, const VariantType& rhs) -> int32_t { auto lvalue = DataDefine::GetVariantValue(lhs); auto rvalue = DataDefine::GetVariantValue(rhs); return lvalue == rvalue ? 0 : (lvalue < rvalue ? -1 : 1); }); case arrow::Type::type::DATE32: return FieldsComparator::VariantComparatorFunc( - [field_idx](const VariantType& lhs, const VariantType& rhs) -> int32_t { + [](const VariantType& lhs, const VariantType& rhs) -> int32_t { auto lvalue = DataDefine::GetVariantValue(lhs); auto rvalue = DataDefine::GetVariantValue(rhs); return lvalue == rvalue ? 0 : (lvalue < rvalue ? -1 : 1); @@ -248,28 +249,28 @@ Result FieldsComparator::CompareVariant case arrow::Type::type::INT32: return FieldsComparator::VariantComparatorFunc( - [field_idx](const VariantType& lhs, const VariantType& rhs) -> int32_t { + [](const VariantType& lhs, const VariantType& rhs) -> int32_t { auto lvalue = DataDefine::GetVariantValue(lhs); auto rvalue = DataDefine::GetVariantValue(rhs); return lvalue == rvalue ? 0 : (lvalue < rvalue ? -1 : 1); }); case arrow::Type::type::INT64: return FieldsComparator::VariantComparatorFunc( - [field_idx](const VariantType& lhs, const VariantType& rhs) -> int32_t { + [](const VariantType& lhs, const VariantType& rhs) -> int32_t { auto lvalue = DataDefine::GetVariantValue(lhs); auto rvalue = DataDefine::GetVariantValue(rhs); return lvalue == rvalue ? 0 : (lvalue < rvalue ? -1 : 1); }); case arrow::Type::type::FLOAT: return FieldsComparator::VariantComparatorFunc( - [field_idx](const VariantType& lhs, const VariantType& rhs) -> int32_t { + [](const VariantType& lhs, const VariantType& rhs) -> int32_t { auto lvalue = DataDefine::GetVariantValue(lhs); auto rvalue = DataDefine::GetVariantValue(rhs); return CompareFloatingPoint(lvalue, rvalue); }); case arrow::Type::type::DOUBLE: return FieldsComparator::VariantComparatorFunc( - [field_idx](const VariantType& lhs, const VariantType& rhs) -> int32_t { + [](const VariantType& lhs, const VariantType& rhs) -> int32_t { auto lvalue = DataDefine::GetVariantValue(lhs); auto rvalue = DataDefine::GetVariantValue(rhs); return CompareFloatingPoint(lvalue, rvalue); @@ -277,7 +278,7 @@ Result FieldsComparator::CompareVariant case arrow::Type::type::STRING: { if (use_view) { return FieldsComparator::VariantComparatorFunc( - [field_idx](const VariantType& lhs, const VariantType& rhs) -> int32_t { + [](const VariantType& lhs, const VariantType& rhs) -> int32_t { auto lvalue = DataDefine::GetVariantValue(lhs); auto rvalue = DataDefine::GetVariantValue(rhs); int32_t cmp = lvalue.compare(rvalue); @@ -285,7 +286,7 @@ Result FieldsComparator::CompareVariant }); } else { return FieldsComparator::VariantComparatorFunc( - [field_idx](const VariantType& lhs, const VariantType& rhs) -> int32_t { + [](const VariantType& lhs, const VariantType& rhs) -> int32_t { auto lvalue = DataDefine::GetVariantValue(lhs); auto rvalue = DataDefine::GetVariantValue(rhs); int32_t cmp = lvalue.CompareTo(rvalue); @@ -297,7 +298,7 @@ Result FieldsComparator::CompareVariant // TODO(xinyu.lxy): may use 64byte compare if (use_view) { return FieldsComparator::VariantComparatorFunc( - [field_idx](const VariantType& lhs, const VariantType& rhs) -> int32_t { + [](const VariantType& lhs, const VariantType& rhs) -> int32_t { auto lvalue = DataDefine::GetVariantValue(lhs); auto rvalue = DataDefine::GetVariantValue(rhs); int32_t cmp = lvalue.compare(rvalue); @@ -305,7 +306,7 @@ Result FieldsComparator::CompareVariant }); } else { return FieldsComparator::VariantComparatorFunc( - [field_idx](const VariantType& lhs, const VariantType& rhs) -> int32_t { + [](const VariantType& lhs, const VariantType& rhs) -> int32_t { auto lvalue = DataDefine::GetVariantValue>(lhs); auto rvalue = DataDefine::GetVariantValue>(rhs); int32_t cmp = lvalue->compare(*rvalue); @@ -315,7 +316,7 @@ Result FieldsComparator::CompareVariant } case arrow::Type::type::TIMESTAMP: { return FieldsComparator::VariantComparatorFunc( - [field_idx](const VariantType& lhs, const VariantType& rhs) -> int32_t { + [](const VariantType& lhs, const VariantType& rhs) -> int32_t { auto lvalue = DataDefine::GetVariantValue(lhs); auto rvalue = DataDefine::GetVariantValue(rhs); return lvalue == rvalue ? 0 : (lvalue < rvalue ? -1 : 1); @@ -323,7 +324,7 @@ Result FieldsComparator::CompareVariant } case arrow::Type::type::DECIMAL: { return FieldsComparator::VariantComparatorFunc( - [field_idx](const VariantType& lhs, const VariantType& rhs) -> int32_t { + [](const VariantType& lhs, const VariantType& rhs) -> int32_t { auto lvalue = DataDefine::GetVariantValue(lhs); auto rvalue = DataDefine::GetVariantValue(rhs); int32_t cmp = lvalue.CompareTo(rvalue); diff --git a/src/paimon/core/utils/fields_comparator.h b/src/paimon/core/utils/fields_comparator.h index 4691482d..95c4d706 100644 --- a/src/paimon/core/utils/fields_comparator.h +++ b/src/paimon/core/utils/fields_comparator.h @@ -59,6 +59,7 @@ class FieldsComparator { /// Java-compatible ordering for floating-point types: /// -infinity < -0.0 < +0.0 < +infinity < NaN == NaN + /// for range index and sst key comparator template static int32_t CompareFloatingPoint(T a, T b) { const bool a_nan = std::isnan(a); From 61d2ad270937674c6390edaa473678aeafdb481b Mon Sep 17 00:00:00 2001 From: lxy264173 Date: Tue, 10 Mar 2026 16:50:12 +0800 Subject: [PATCH 3/7] fix --- src/paimon/common/lookup/sort/sort_lookup_store_test.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/paimon/common/lookup/sort/sort_lookup_store_test.cpp b/src/paimon/common/lookup/sort/sort_lookup_store_test.cpp index b017ebbc..732ea892 100644 --- a/src/paimon/common/lookup/sort/sort_lookup_store_test.cpp +++ b/src/paimon/common/lookup/sort/sort_lookup_store_test.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2024-present Alibaba Inc. + * Copyright 2026-present Alibaba Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. From 6925d4c5136c570fddee7db9bbdb61e62457e2f1 Mon Sep 17 00:00:00 2001 From: lxy264173 Date: Tue, 10 Mar 2026 16:56:17 +0800 Subject: [PATCH 4/7] fix --- include/paimon/defs.h | 1 + src/paimon/common/compression/block_compression_factory.cpp | 1 + src/paimon/common/sst/block_cache.h | 1 + 3 files changed, 3 insertions(+) diff --git a/include/paimon/defs.h b/include/paimon/defs.h index 89550a80..6a102948 100644 --- a/include/paimon/defs.h +++ b/include/paimon/defs.h @@ -337,6 +337,7 @@ struct PAIMON_EXPORT Options { static const char LOOKUP_CACHE_BLOOM_FILTER_FPP[]; /// "lookup.cache-spill-compression" - Spill compression for lookup cache, currently zstd, none, /// lz4 and lzo are supported. Default value is zstd. + /// Noted that paimon-cpp does not support lzo for now. static const char LOOKUP_CACHE_SPILL_COMPRESSION[]; /// "spill-compression.zstd-level" - Default spill compression zstd level. For higher /// compression rates, it can be configured to 9, but the read and write speed will diff --git a/src/paimon/common/compression/block_compression_factory.cpp b/src/paimon/common/compression/block_compression_factory.cpp index ea0f2104..36d450d7 100644 --- a/src/paimon/common/compression/block_compression_factory.cpp +++ b/src/paimon/common/compression/block_compression_factory.cpp @@ -16,6 +16,7 @@ #include "paimon/common/compression/block_compression_factory.h" +#include "fmt/format.h" #include "paimon/common/compression/lz4/lz4_block_compression_factory.h" #include "paimon/common/compression/none_block_compression_factory.h" #include "paimon/common/compression/zstd/zstd_block_compression_factory.h" diff --git a/src/paimon/common/sst/block_cache.h b/src/paimon/common/sst/block_cache.h index 0d95fa05..03a6ef94 100644 --- a/src/paimon/common/sst/block_cache.h +++ b/src/paimon/common/sst/block_cache.h @@ -57,6 +57,7 @@ class BlockCache { for (const auto& [key, _] : blocks_) { cache_manager_->InvalidPage(key); } + blocks_.clear(); } private: From 588da2c071ccdfd0719b9bdaae89905a66c61be8 Mon Sep 17 00:00:00 2001 From: lxy264173 Date: Tue, 10 Mar 2026 18:24:57 +0800 Subject: [PATCH 5/7] fix --- src/paimon/common/lookup/sort/sort_lookup_store_test.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/paimon/common/lookup/sort/sort_lookup_store_test.cpp b/src/paimon/common/lookup/sort/sort_lookup_store_test.cpp index 732ea892..83e5a6cf 100644 --- a/src/paimon/common/lookup/sort/sort_lookup_store_test.cpp +++ b/src/paimon/common/lookup/sort/sort_lookup_store_test.cpp @@ -88,7 +88,7 @@ TEST(SortLookupStoreTest, TestSimple) { ASSERT_OK_AND_ASSIGN(auto key_bytes, key_serializer->SerializeToBytes(*key_row)); ASSERT_OK_AND_ASSIGN(auto value_bytes, value_serializer->SerializeToBytes(*value_row)); key_value_bytes_vec.push_back({key_bytes, value_bytes}); - writer->Put(std::move(key_bytes), std::move(value_bytes)); + ASSERT_OK(writer->Put(std::move(key_bytes), std::move(value_bytes))); } ASSERT_OK(writer->Close()); From 9e9f488f85d51a4c3c7778987fbdfea5e8e555d7 Mon Sep 17 00:00:00 2001 From: lxy264173 Date: Tue, 10 Mar 2026 20:55:08 +0800 Subject: [PATCH 6/7] fix --- src/paimon/common/lookup/sort/sort_lookup_store_factory.h | 4 ++-- src/paimon/common/lookup/sort/sort_lookup_store_test.cpp | 2 +- src/paimon/common/utils/bloom_filter.cpp | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/paimon/common/lookup/sort/sort_lookup_store_factory.h b/src/paimon/common/lookup/sort/sort_lookup_store_factory.h index 6284879d..3917a496 100644 --- a/src/paimon/common/lookup/sort/sort_lookup_store_factory.h +++ b/src/paimon/common/lookup/sort/sort_lookup_store_factory.h @@ -74,11 +74,11 @@ class SortLookupStoreFactory : public LookupStoreFactory { Result> CreateWriter( const std::shared_ptr& fs, const std::string& file_path, const std::shared_ptr& bloom_filter, - const std::shared_ptr& pool) const; + const std::shared_ptr& pool) const override; Result> CreateReader( const std::shared_ptr& fs, const std::string& file_path, - const std::shared_ptr& pool) const; + const std::shared_ptr& pool) const override; private: int32_t block_size_; diff --git a/src/paimon/common/lookup/sort/sort_lookup_store_test.cpp b/src/paimon/common/lookup/sort/sort_lookup_store_test.cpp index 83e5a6cf..4138d34a 100644 --- a/src/paimon/common/lookup/sort/sort_lookup_store_test.cpp +++ b/src/paimon/common/lookup/sort/sort_lookup_store_test.cpp @@ -87,7 +87,7 @@ TEST(SortLookupStoreTest, TestSimple) { /*struct_array=*/nullptr, value_struct_array->fields(), pool, /*row_id=*/i); ASSERT_OK_AND_ASSIGN(auto key_bytes, key_serializer->SerializeToBytes(*key_row)); ASSERT_OK_AND_ASSIGN(auto value_bytes, value_serializer->SerializeToBytes(*value_row)); - key_value_bytes_vec.push_back({key_bytes, value_bytes}); + key_value_bytes_vec.emplace_back(key_bytes, value_bytes); ASSERT_OK(writer->Put(std::move(key_bytes), std::move(value_bytes))); } ASSERT_OK(writer->Close()); diff --git a/src/paimon/common/utils/bloom_filter.cpp b/src/paimon/common/utils/bloom_filter.cpp index 9edf702f..607f1aa6 100644 --- a/src/paimon/common/utils/bloom_filter.cpp +++ b/src/paimon/common/utils/bloom_filter.cpp @@ -43,7 +43,7 @@ int32_t BloomFilter::OptimalNumOfHashFunctions(int64_t expect_entries, int64_t b } std::shared_ptr BloomFilter::Create(int64_t expect_entries, double fpp) { - int32_t bytes = + auto bytes = static_cast(ceil(BloomFilter::OptimalNumOfBits(expect_entries, fpp) / 8.0)); return std::make_shared(expect_entries, bytes); } From 3148d05aceaf6ae6a46f1821cc76513f99ff40e8 Mon Sep 17 00:00:00 2001 From: lxy264173 Date: Wed, 11 Mar 2026 09:17:25 +0800 Subject: [PATCH 7/7] fix comments --- include/paimon/defs.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/include/paimon/defs.h b/include/paimon/defs.h index 6a102948..f542605c 100644 --- a/include/paimon/defs.h +++ b/include/paimon/defs.h @@ -336,8 +336,8 @@ struct PAIMON_EXPORT Options { /// cache bloom filters. Default value is 0.05. static const char LOOKUP_CACHE_BLOOM_FILTER_FPP[]; /// "lookup.cache-spill-compression" - Spill compression for lookup cache, currently zstd, none, - /// lz4 and lzo are supported. Default value is zstd. - /// Noted that paimon-cpp does not support lzo for now. + /// lz4 are supported. Default value is zstd. + /// Noted that java paimon also supports lzo which paimon-cpp does not support for now. static const char LOOKUP_CACHE_SPILL_COMPRESSION[]; /// "spill-compression.zstd-level" - Default spill compression zstd level. For higher /// compression rates, it can be configured to 9, but the read and write speed will