diff --git a/src/paimon/CMakeLists.txt b/src/paimon/CMakeLists.txt index abdb563df..71bc15b41 100644 --- a/src/paimon/CMakeLists.txt +++ b/src/paimon/CMakeLists.txt @@ -76,7 +76,6 @@ set(PAIMON_COMMON_SRCS common/memory/memory_segment.cpp common/memory/memory_segment_utils.cpp common/memory/memory_slice.cpp - common/memory/memory_slice_input.cpp common/memory/memory_slice_output.cpp common/metrics/metrics_impl.cpp common/metrics/histogram.cpp diff --git a/src/paimon/common/data/abstract_binary_writer.cpp b/src/paimon/common/data/abstract_binary_writer.cpp index 60a54f35c..c0b23194f 100644 --- a/src/paimon/common/data/abstract_binary_writer.cpp +++ b/src/paimon/common/data/abstract_binary_writer.cpp @@ -182,7 +182,8 @@ void AbstractBinaryWriter::Grow(int32_t min_capacity) { if (new_capacity - min_capacity < 0) { new_capacity = min_capacity; } - std::shared_ptr new_bytes = Bytes::CopyOf(*(segment_.GetArray()), new_capacity, pool_); + std::shared_ptr new_bytes = + Bytes::CopyOf(*(segment_.GetOrCreateHeapMemory(pool_)), new_capacity, pool_); segment_ = MemorySegment::Wrap(new_bytes); AfterGrow(); } diff --git a/src/paimon/common/data/binary_string.cpp b/src/paimon/common/data/binary_string.cpp index cd02d96c3..38e15dca6 100644 --- a/src/paimon/common/data/binary_string.cpp +++ b/src/paimon/common/data/binary_string.cpp @@ -279,8 +279,6 @@ bool BinaryString::MatchAtOneSeg(const BinaryString& s, int32_t pos) const { } std::string_view BinaryString::GetStringView() const { - const auto* bytes = segment_.GetArray(); - assert(bytes); - return std::string_view(bytes->data() + offset_, size_in_bytes_); + return std::string_view(segment_.Data() + offset_, size_in_bytes_); } } // namespace paimon diff --git a/src/paimon/common/data/serializer/row_compacted_serializer.cpp b/src/paimon/common/data/serializer/row_compacted_serializer.cpp index d4bc8479d..547d1211e 100644 --- a/src/paimon/common/data/serializer/row_compacted_serializer.cpp +++ b/src/paimon/common/data/serializer/row_compacted_serializer.cpp @@ -524,7 +524,7 @@ std::shared_ptr RowCompactedSerializer::RowWriter::CopyBuffer() const { Status RowCompactedSerializer::RowWriter::WriteUnsignedInt(int32_t value) { EnsureCapacity(VarLengthIntUtils::kMaxVarIntSize); PAIMON_ASSIGN_OR_RAISE(int32_t len, - VarLengthIntUtils::EncodeInt(position_, value, buffer_.get())); + VarLengthIntUtils::EncodeInt(value, buffer_->data() + position_)); position_ += len; return Status::OK(); } @@ -577,7 +577,7 @@ Result RowCompactedSerializer::RowReader::ReadRowKind() const { Result RowCompactedSerializer::RowReader::ReadStringView() { PAIMON_ASSIGN_OR_RAISE(int32_t length, ReadUnsignedInt()); - std::string_view str(segment_.GetArray()->data() + position_, length); + std::string_view str(segment_.Data() + position_, length); position_ += length; return str; } diff --git a/src/paimon/common/data/serializer/row_compacted_serializer.h b/src/paimon/common/data/serializer/row_compacted_serializer.h index 3b4fd2ce1..b2989933a 100644 --- a/src/paimon/common/data/serializer/row_compacted_serializer.h +++ b/src/paimon/common/data/serializer/row_compacted_serializer.h @@ -157,8 +157,7 @@ class RowCompactedSerializer { private: Result ReadUnsignedInt() { - const auto* bytes = segment_.GetArray(); - return VarLengthIntUtils::DecodeInt(bytes, &position_); + return VarLengthIntUtils::DecodeInt(segment_.Data(), &position_); } private: diff --git a/src/paimon/common/global_index/btree/btree_global_index_reader.cpp b/src/paimon/common/global_index/btree/btree_global_index_reader.cpp index d4a589a68..9adadd48f 100644 --- a/src/paimon/common/global_index/btree/btree_global_index_reader.cpp +++ b/src/paimon/common/global_index/btree/btree_global_index_reader.cpp @@ -33,7 +33,8 @@ BTreeGlobalIndexReader::BTreeGlobalIndexReader( null_bitmap_(std::move(null_bitmap)), min_key_(min_key), max_key_(max_key), - key_type_(key_type) {} + key_type_(key_type), + comparator_(KeySerializer::CreateComparator(key_type, pool)) {} Result> BTreeGlobalIndexReader::VisitIsNotNull() { return std::make_shared( @@ -260,9 +261,13 @@ Result BTreeGlobalIndexReader::RangeQuery(const std::optional from_bytes, KeySerializer::SerializeKey(from.value(), key_type_, pool_.get())); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr to_bytes, + KeySerializer::SerializeKey(to.value(), key_type_, pool_.get())); + MemorySlice from_slice = MemorySlice::Wrap(from_bytes); + MemorySlice to_slice = MemorySlice::Wrap(to_bytes); + auto index_iterator = sst_file_reader_->CreateIndexIterator(); - PAIMON_ASSIGN_OR_RAISE([[maybe_unused]] bool seek_result, - index_iterator->SeekTo(MemorySlice::Wrap(from_bytes))); + PAIMON_ASSIGN_OR_RAISE([[maybe_unused]] bool seek_result, index_iterator->SeekTo(from_slice)); bool first_block = true; while (index_iterator->HasNext()) { @@ -276,33 +281,33 @@ Result BTreeGlobalIndexReader::RangeQuery(const std::optionalSeekTo(MemorySlice::Wrap(from_bytes))); + PAIMON_ASSIGN_OR_RAISE([[maybe_unused]] bool found, data_iterator->SeekTo(from_slice)); first_block = false; } // Iterate through entries in the data block while (data_iterator->HasNext()) { - PAIMON_ASSIGN_OR_RAISE(std::unique_ptr entry, data_iterator->Next()); - PAIMON_ASSIGN_OR_RAISE( - Literal key, KeySerializer::DeserializeKey(entry->key, key_type_, pool_.get())); - PAIMON_ASSIGN_OR_RAISE(int32_t cmp_from, key.CompareTo(from.value())); + PAIMON_ASSIGN_OR_RAISE(BlockEntry entry, data_iterator->Next()); + + // Compare entry key against from bound + PAIMON_ASSIGN_OR_RAISE(int32_t cmp_from, comparator_(entry.key, from_slice)); // Check lower bound if (!from_inclusive && cmp_from == 0) { continue; } - // Check upper bound - PAIMON_ASSIGN_OR_RAISE(int32_t cmp_to, key.CompareTo(to.value())); + // Compare entry key against to bound + PAIMON_ASSIGN_OR_RAISE(int32_t cmp_to, comparator_(entry.key, to_slice)); if (cmp_to > 0 || (!to_inclusive && cmp_to == 0)) { return result; } - PAIMON_RETURN_NOT_OK(DeserializeRowIds(entry->value, &result)); + PAIMON_RETURN_NOT_OK(DeserializeRowIds(entry.value, &result)); } } + return result; } diff --git a/src/paimon/common/global_index/btree/btree_global_index_reader.h b/src/paimon/common/global_index/btree/btree_global_index_reader.h index e994c986a..fc39450d1 100644 --- a/src/paimon/common/global_index/btree/btree_global_index_reader.h +++ b/src/paimon/common/global_index/btree/btree_global_index_reader.h @@ -24,6 +24,7 @@ #include "arrow/api.h" #include "paimon/common/global_index/btree/btree_defs.h" +#include "paimon/common/global_index/btree/key_serializer.h" #include "paimon/common/sst/sst_file_reader.h" #include "paimon/global_index/global_index_io_meta.h" #include "paimon/global_index/global_index_reader.h" @@ -101,6 +102,7 @@ class BTreeGlobalIndexReader : public GlobalIndexReader, std::optional min_key_; std::optional max_key_; std::shared_ptr key_type_; + MemorySlice::SliceComparator comparator_; }; } // namespace paimon diff --git a/src/paimon/common/global_index/btree/btree_index_meta.cpp b/src/paimon/common/global_index/btree/btree_index_meta.cpp index 2b2bf8f9f..a6e9c5a1b 100644 --- a/src/paimon/common/global_index/btree/btree_index_meta.cpp +++ b/src/paimon/common/global_index/btree/btree_index_meta.cpp @@ -27,12 +27,12 @@ std::shared_ptr BTreeIndexMeta::Deserialize(const std::shared_pt auto first_key_len = input.ReadInt(); std::shared_ptr first_key; if (first_key_len) { - first_key = input.ReadSlice(first_key_len).CopyBytes(pool); + first_key = input.ReadSliceView(first_key_len).CopyBytes(pool); } auto last_key_len = input.ReadInt(); std::shared_ptr last_key; if (last_key_len) { - last_key = input.ReadSlice(last_key_len).CopyBytes(pool); + last_key = input.ReadSliceView(last_key_len).CopyBytes(pool); } auto has_nulls = input.ReadByte() == static_cast(1); return std::make_shared(first_key, last_key, has_nulls); @@ -62,7 +62,7 @@ std::shared_ptr BTreeIndexMeta::Serialize(paimon::MemoryPool* pool) const // Write has_nulls output.WriteValue(static_cast(has_nulls_ ? 1 : 0)); - return output.ToSlice().GetHeapMemory(); + return output.ToSlice().GetOrCreateHeapMemory(pool); } } // namespace paimon diff --git a/src/paimon/common/global_index/btree/key_serializer.cpp b/src/paimon/common/global_index/btree/key_serializer.cpp index dc5456907..6228a4a89 100644 --- a/src/paimon/common/global_index/btree/key_serializer.cpp +++ b/src/paimon/common/global_index/btree/key_serializer.cpp @@ -21,6 +21,7 @@ #include "paimon/common/memory/memory_slice_output.h" #include "paimon/common/utils/date_time_utils.h" #include "paimon/common/utils/field_type_utils.h" +#include "paimon/common/utils/fields_comparator.h" #include "paimon/common/utils/preconditions.h" #include "paimon/data/decimal.h" #include "paimon/data/timestamp.h" @@ -195,6 +196,66 @@ Result KeySerializer::DeserializeKey(const MemorySlice& slice, MemorySlice::SliceComparator KeySerializer::CreateComparator( const std::shared_ptr& type, const std::shared_ptr& pool) { + // Fast paths for integer and string types: direct value comparison without Literal + // deserialization, avoiding heap allocations entirely. + switch (type->id()) { + case arrow::Type::type::STRING: + return [](const MemorySlice& a, const MemorySlice& b) -> Result { + std::string_view sv_a = a.ReadStringView(); + std::string_view sv_b = b.ReadStringView(); + int32_t cmp = sv_a.compare(sv_b); + return cmp == 0 ? 0 : (cmp > 0 ? 1 : -1); + }; + case arrow::Type::type::BOOL: + case arrow::Type::type::INT8: + return [](const MemorySlice& a, const MemorySlice& b) -> Result { + int8_t va = a.ReadByte(0); + int8_t vb = b.ReadByte(0); + return (va < vb) ? -1 : (va > vb ? 1 : 0); + }; + case arrow::Type::type::INT16: + return [](const MemorySlice& a, const MemorySlice& b) -> Result { + int16_t va = a.ReadShort(0); + int16_t vb = b.ReadShort(0); + return (va < vb) ? -1 : (va > vb ? 1 : 0); + }; + case arrow::Type::type::INT32: + case arrow::Type::type::DATE32: + return [](const MemorySlice& a, const MemorySlice& b) -> Result { + int32_t va = a.ReadInt(0); + int32_t vb = b.ReadInt(0); + return (va < vb) ? -1 : (va > vb ? 1 : 0); + }; + case arrow::Type::type::INT64: + return [](const MemorySlice& a, const MemorySlice& b) -> Result { + int64_t va = a.ReadLong(0); + int64_t vb = b.ReadLong(0); + return (va < vb) ? -1 : (va > vb ? 1 : 0); + }; + case arrow::Type::type::FLOAT: + return [](const MemorySlice& a, const MemorySlice& b) -> Result { + int32_t ia = a.ReadInt(0); + int32_t ib = b.ReadInt(0); + float fa, fb; + memcpy(&fa, &ia, sizeof(float)); + memcpy(&fb, &ib, sizeof(float)); + return FieldsComparator::CompareFloatingPoint(fa, fb); + }; + case arrow::Type::type::DOUBLE: + return [](const MemorySlice& a, const MemorySlice& b) -> Result { + int64_t ia = a.ReadLong(0); + int64_t ib = b.ReadLong(0); + double da, db; + memcpy(&da, &ia, sizeof(double)); + memcpy(&db, &ib, sizeof(double)); + return FieldsComparator::CompareFloatingPoint(da, db); + }; + default: + break; + } + + // Fallback for complex types (TIMESTAMP, DECIMAL, etc.): + // deserialize to Literal and compare. return [pool = pool, type = type](const MemorySlice& a, const MemorySlice& b) -> Result { PAIMON_ASSIGN_OR_RAISE(Literal la, DeserializeKey(a, type, pool.get())); diff --git a/src/paimon/common/global_index/btree/lazy_filtered_btree_reader.cpp b/src/paimon/common/global_index/btree/lazy_filtered_btree_reader.cpp index c5f3bdf1f..de054607e 100644 --- a/src/paimon/common/global_index/btree/lazy_filtered_btree_reader.cpp +++ b/src/paimon/common/global_index/btree/lazy_filtered_btree_reader.cpp @@ -234,6 +234,7 @@ Result> LazyFilteredBTreeReader::GetOrCreateR Result> LazyFilteredBTreeReader::CreateSingleReader( const GlobalIndexIOMeta& meta) { + // Create comparator based on field type auto comparator = KeySerializer::CreateComparator(key_type_, pool_); // Deserialize min/max keys from meta @@ -298,7 +299,7 @@ Result LazyFilteredBTreeReader::ReadNullBitmap( auto slice_input = slice.ToInput(); // Read null bitmap data - auto null_bitmap_bytes = slice_input.ReadSlice(block_handle->Size()).CopyBytes(pool_.get()); + auto null_bitmap_bytes = slice_input.ReadSliceView(block_handle->Size()).CopyBytes(pool_.get()); // Calculate and verify CRC32C checksum uint32_t calculated_crc = diff --git a/src/paimon/common/io/cache/lru_cache_test.cpp b/src/paimon/common/io/cache/lru_cache_test.cpp index c853b44cb..62767079f 100644 --- a/src/paimon/common/io/cache/lru_cache_test.cpp +++ b/src/paimon/common/io/cache/lru_cache_test.cpp @@ -46,7 +46,7 @@ class LruCacheTest : public ::testing::Test { std::shared_ptr MakeValue(int32_t size, char fill_byte = 0, CacheCallback callback = {}) const { auto segment = MemorySegment::AllocateHeapMemory(size, pool_.get()); - std::memset(segment.GetHeapMemory()->data(), fill_byte, size); + std::memset(segment.MutableData(), fill_byte, size); return std::make_shared(segment, std::move(callback)); } diff --git a/src/paimon/common/memory/memory_segment.cpp b/src/paimon/common/memory/memory_segment.cpp index 836010e7d..6182349ac 100644 --- a/src/paimon/common/memory/memory_segment.cpp +++ b/src/paimon/common/memory/memory_segment.cpp @@ -58,33 +58,6 @@ int32_t MemorySegment::Compare(const MemorySegment& seg2, int32_t offset1, int32 return c == 0 ? (len1 - len2) : c; } -void MemorySegment::SwapBytes(Bytes* temp_buffer, MemorySegment* seg2, int32_t offset1, - int32_t offset2, int32_t len) { - if ((offset1 | offset2 | len | (temp_buffer->size() - len) | - (heap_memory_->size() - (offset1 + len)) | - (seg2->heap_memory_->size() - (offset2 + len))) >= 0) { - // this -> temp buffer - std::memcpy(temp_buffer->data(), heap_memory_->data() + offset1, len); - - // other -> this - std::memcpy(heap_memory_->data() + offset1, seg2->heap_memory_->data() + offset2, len); - - // temp buffer -> other - std::memcpy(seg2->heap_memory_->data() + offset2, temp_buffer->data(), len); - return; - } - assert(false); - - // index is in fact invalid - // return Status::InternalError( - // "IndexOutOfBoundsException", - // fmt::format("offset1={}, offset2={}, len={}, temp buffer size={}, heap - // mem 1 " - // "size={}, heap mem 2 size={}", - // offset1, offset2, len, temp_buffer->size(), - // heap_memory_->size(), seg2->heap_memory_->size())); -} - bool MemorySegment::EqualTo(const MemorySegment& seg2, int32_t offset1, int32_t offset2, int32_t length) const { int32_t i = 0; diff --git a/src/paimon/common/memory/memory_segment.h b/src/paimon/common/memory/memory_segment.h index 909e5fb76..d1d24c9f7 100644 --- a/src/paimon/common/memory/memory_segment.h +++ b/src/paimon/common/memory/memory_segment.h @@ -29,107 +29,121 @@ namespace paimon { class MemoryPool; /// This class represents a piece of memory. +/// +/// Supports two modes: +/// - Owning mode: holds a shared_ptr for lifetime management. +/// - Non-owning (view) mode: holds a raw pointer to external data. +/// The caller must ensure the underlying memory outlives this segment. class PAIMON_EXPORT MemorySegment { public: - MemorySegment() = default; + MemorySegment() : data_(nullptr), size_(0) {} + /// Wrap a shared_ptr to create an owning segment. static MemorySegment Wrap(const std::shared_ptr& buffer) { return MemorySegment(buffer); } + /// Create a non-owning segment that references external memory. + /// The caller must guarantee that `data` remains valid for the lifetime of this segment. + static MemorySegment WrapView(const char* data, int32_t size) { + return MemorySegment(data, size); + } + static MemorySegment AllocateHeapMemory(int32_t size, MemoryPool* pool) { assert(pool); return Wrap(Bytes::AllocateBytes(size, pool)); } - MemorySegment(const MemorySegment& other) { - heap_memory_ = other.heap_memory_; - } - + MemorySegment(const MemorySegment& other) = default; MemorySegment& operator=(const MemorySegment& other) = default; bool operator==(const MemorySegment& other) const { if (this == &other) { return true; } - if (heap_memory_ == other.heap_memory_) { + if (data_ == other.data_ && size_ == other.size_) { return true; } - if (!heap_memory_ || !other.heap_memory_) { + if (!data_ || !other.data_) { + return false; + } + if (size_ != other.size_) { return false; } - return *heap_memory_ == *other.heap_memory_; + return std::memcmp(data_, other.data_, size_) == 0; } inline int32_t Size() const { - return heap_memory_->size(); + return size_; } - inline bool IsOffHeap() const { - return false; - } - inline Bytes* GetArray() const { - return heap_memory_.get(); + + /// Returns the raw data pointer (valid for both owning and non-owning segments). + inline const char* Data() const { + return data_; } inline char Get(int32_t index) const { - return *(heap_memory_->data() + index); + return *(data_ + index); } + + /// Returns a mutable pointer to the data. Use with caution on non-owning segments. + inline char* MutableData() { + return const_cast(data_); + } + inline void Put(int32_t index, char b) { - (*heap_memory_)[index] = b; + MutableData()[index] = b; } + inline void Get(int32_t index, Bytes* dst) const { return Get(index, dst, /*offset=*/0, dst->size()); } + inline void Put(int32_t index, const Bytes& src) { return Put(index, src, /*offset=*/0, src.size()); } + template inline void Get(int32_t index, T* dst, int32_t offset, int32_t length) const { - // check the byte array offset and length and the status assert(static_cast(dst->size()) >= (offset + length)); - assert(static_cast(heap_memory_->size()) >= (index + length)); - std::memcpy(const_cast(dst->data()) + offset, heap_memory_->data() + index, length); + assert(size_ >= (index + length)); + std::memcpy(const_cast(dst->data()) + offset, data_ + index, length); } template inline void Put(int32_t index, const T& src, int32_t offset, int32_t length) { - // check the byte array offset and length assert(static_cast(src.size()) >= (offset + length)); - assert(static_cast(heap_memory_->size()) >= (index + length)); - std::memcpy(heap_memory_->data() + index, src.data() + offset, length); + assert(size_ >= (index + length)); + std::memcpy(MutableData() + index, src.data() + offset, length); } - // only support: bool, char, int16_t, int32_t, int64_t, double, float template T GetValue(int32_t index) const { static_assert(std::is_trivially_copyable_v, "T must be trivially copyable"); T value; - std::memcpy(&value, heap_memory_->data() + index, sizeof(T)); + std::memcpy(&value, data_ + index, sizeof(T)); return value; } - // only support: bool, char, int16_t, int32_t, int64_t, double, float template void PutValue(int32_t index, const T& value) { static_assert(std::is_trivially_copyable_v, "T must be trivially copyable"); - std::memcpy(heap_memory_->data() + index, &value, sizeof(T)); - } - // TODO(yonghao.fyh): support bulk read / write + std::memcpy(MutableData() + index, &value, sizeof(T)); + } void CopyTo(int32_t offset, MemorySegment* target, int32_t target_offset, int32_t num_bytes) const { assert(offset >= 0); assert(target_offset >= 0); assert(num_bytes >= 0); - std::memcpy(target->heap_memory_->data() + target_offset, heap_memory_->data() + offset, - num_bytes); + + std::memcpy(target->MutableData() + target_offset, data_ + offset, num_bytes); } void CopyToUnsafe(int32_t offset, void* target, int32_t target_offset, int32_t num_bytes) const { - std::memcpy(static_cast(target) + target_offset, heap_memory_->data() + offset, - num_bytes); + std::memcpy(static_cast(target) + target_offset, data_ + offset, num_bytes); } int32_t Compare(const MemorySegment& seg2, int32_t offset1, int32_t offset2, int32_t len) const; @@ -137,31 +151,38 @@ class PAIMON_EXPORT MemorySegment { int32_t Compare(const MemorySegment& seg2, int32_t offset1, int32_t offset2, int32_t len1, int32_t len2) const; - void SwapBytes(Bytes* temp_buffer, MemorySegment* seg2, int32_t offset1, int32_t offset2, - int32_t len); - /// Equals two memory segment regions. - /// - /// @param seg2 Segment to equal this segment with - /// @param offset1 Offset of this segment to start equaling - /// @param offset2 Offset of seg2 to start equaling - /// @param length Size of the equaled memory region - /// @return true if equal, false otherwise bool EqualTo(const MemorySegment& seg2, int32_t offset1, int32_t offset2, int32_t length) const; - /// Get the heap byte array object. - /// - /// @return Return non-null if the memory is on the heap, and return null if the - /// memory if off the heap. - std::shared_ptr GetHeapMemory() const { - return heap_memory_; + std::shared_ptr GetOrCreateHeapMemory(MemoryPool* pool) const { + if (heap_memory_) { + return heap_memory_; + } + if (!data_) { + return nullptr; + } + auto copy = std::make_shared(size_, pool); + std::memcpy(const_cast(copy->data()), data_, size_); + return copy; } private: - explicit MemorySegment(const std::shared_ptr& heap_memory) : heap_memory_(heap_memory) { + /// Owning constructor. + explicit MemorySegment(const std::shared_ptr& heap_memory) + : heap_memory_(heap_memory), + data_(heap_memory->data()), + size_(static_cast(heap_memory->size())) { assert(heap_memory_); } + /// Non-owning constructor. + MemorySegment(const char* data, int32_t size) + : heap_memory_(nullptr), data_(data), size_(size) { + assert(data != nullptr || size == 0); + } + std::shared_ptr heap_memory_; + const char* data_; + int32_t size_; }; template <> diff --git a/src/paimon/common/memory/memory_segment_test.cpp b/src/paimon/common/memory/memory_segment_test.cpp index 07149d308..c991c4944 100644 --- a/src/paimon/common/memory/memory_segment_test.cpp +++ b/src/paimon/common/memory/memory_segment_test.cpp @@ -160,31 +160,6 @@ TEST(MemorySegmentTest, TestCompare) { ASSERT_LT(seg1.Compare(seg2, i, i, 9, 9), 0); } -TEST(MemorySegmentTest, TestSwapBytes) { - auto pool = GetDefaultPool(); - int32_t str_size = 1024; - std::string test_string1, test_string2; - test_string1.reserve(str_size); - test_string2.reserve(str_size); - for (int32_t j = 0; j < str_size; j++) { - test_string1 += static_cast(paimon::test::RandomNumber(0, 25) + 'a'); - test_string2 += static_cast(paimon::test::RandomNumber(0, 25) + 'a'); - } - std::shared_ptr bytes1 = Bytes::AllocateBytes(test_string1, pool.get()); - std::shared_ptr bytes2 = Bytes::AllocateBytes(test_string2, pool.get()); - - MemorySegment seg1 = MemorySegment::Wrap(bytes1); - MemorySegment seg2 = MemorySegment::Wrap(bytes2); - - std::shared_ptr bytes1_serialize = Bytes::AllocateBytes(test_string2.size(), pool.get()); - std::shared_ptr bytes2_serialize = Bytes::AllocateBytes(test_string1.size(), pool.get()); - seg1.SwapBytes(bytes1_serialize.get(), &seg2, 0, 0, str_size); - seg1.CopyToUnsafe(0, bytes1_serialize->data(), 0, str_size); - seg2.CopyToUnsafe(0, bytes2_serialize->data(), 0, str_size); - ASSERT_EQ(test_string1, std::string(bytes2_serialize->data(), bytes2_serialize->size())); - ASSERT_EQ(test_string2, std::string(bytes1_serialize->data(), bytes1_serialize->size())); -} - TEST(MemorySegmentTest, TestCharAccess) { auto pool = paimon::GetDefaultPool(); int32_t page_size = 64 * 1024; @@ -673,4 +648,84 @@ TEST(MemorySegmentTest, TestEqual) { ASSERT_FALSE(seg1 == seg2); ASSERT_FALSE(seg2 == seg1); } + +TEST(MemorySegmentTest, TestNonOwningWrapView) { + // Prepare owning data as source + auto pool = paimon::GetDefaultPool(); + std::string source_data = "Hello, WrapView MemorySegment!"; + auto owning_bytes = std::make_shared(source_data, pool.get()); + const char* raw_ptr = owning_bytes->data(); + auto raw_size = static_cast(owning_bytes->size()); + + // Create non-owning segment via WrapView + auto seg = MemorySegment::WrapView(raw_ptr, raw_size); + + // --- Data() / Size() --- + ASSERT_EQ(seg.Data(), raw_ptr); + ASSERT_EQ(seg.Size(), raw_size); + + // --- Get(index) --- + for (int32_t i = 0; i < raw_size; ++i) { + ASSERT_EQ(seg.Get(i), source_data[i]); + } + + // --- GetValue --- + // Read first 4 bytes as int32 + int32_t expected_int; + std::memcpy(&expected_int, raw_ptr, sizeof(int32_t)); + ASSERT_EQ(seg.GetValue(0), expected_int); + + // Read first 8 bytes as int64 + int64_t expected_long; + std::memcpy(&expected_long, raw_ptr, sizeof(int64_t)); + ASSERT_EQ(seg.GetValue(0), expected_long); + + // --- MutableData() + Put(index, char) --- + char original_char = seg.Get(0); + seg.Put(0, 'X'); + ASSERT_EQ(seg.Get(0), 'X'); + seg.Put(0, original_char); // restore + ASSERT_EQ(seg.Get(0), original_char); + + // --- Put(index, src, offset, length) --- + std::string patch = "AB"; + seg.Put(0, patch, 0, 2); + ASSERT_EQ(seg.Get(0), 'A'); + ASSERT_EQ(seg.Get(1), 'B'); + + // --- PutValue --- + int16_t val16 = 0x1234; + seg.PutValue(0, val16); + ASSERT_EQ(seg.GetValue(0), val16); + + // --- Compare --- + auto seg2 = MemorySegment::WrapView(raw_ptr, raw_size); + // Both point to same data (we mutated seg's underlying data, seg2 sees it too) + ASSERT_EQ(seg.Compare(seg2, 0, 0, raw_size), 0); + + // --- EqualTo --- + ASSERT_TRUE(seg.EqualTo(seg2, 2, 2, raw_size - 2)); + + // --- CopyTo owning target --- + auto target = MemorySegment::AllocateHeapMemory(raw_size, pool.get()); + seg.CopyTo(0, &target, 0, raw_size); + ASSERT_EQ(seg.Compare(target, 0, 0, raw_size), 0); + + // --- CopyToUnsafe --- + std::vector buf(raw_size); + seg.CopyToUnsafe(0, buf.data(), 0, raw_size); + ASSERT_EQ(std::memcmp(buf.data(), seg.Data(), raw_size), 0); + + // --- GetOrCreateHeapMemory on non-owning: should copy --- + auto heap = seg.GetOrCreateHeapMemory(pool.get()); + ASSERT_NE(heap, nullptr); + ASSERT_EQ(static_cast(heap->size()), raw_size); + // Returned copy should be independent: modifying seg shouldn't affect heap + seg.Put(0, 'Z'); + ASSERT_NE((*heap)[0], 'Z'); + + // --- operator== --- + auto seg3 = MemorySegment::WrapView(seg.Data(), seg.Size()); + ASSERT_EQ(seg, seg3); +} } // namespace paimon::test diff --git a/src/paimon/common/memory/memory_segment_utils.cpp b/src/paimon/common/memory/memory_segment_utils.cpp index c2f844e73..902713d97 100644 --- a/src/paimon/common/memory/memory_segment_utils.cpp +++ b/src/paimon/common/memory/memory_segment_utils.cpp @@ -104,7 +104,7 @@ std::shared_ptr MemorySegmentUtils::GetBytes(const std::vector heap_memory = segments[0].GetHeapMemory(); + std::shared_ptr heap_memory = segments[0].GetOrCreateHeapMemory(pool); if (base_offset == 0 && heap_memory != nullptr && static_cast(heap_memory->size()) == size_in_bytes) { return heap_memory; diff --git a/src/paimon/common/memory/memory_slice.cpp b/src/paimon/common/memory/memory_slice.cpp index b8d3d0e4d..9efa63f67 100644 --- a/src/paimon/common/memory/memory_slice.cpp +++ b/src/paimon/common/memory/memory_slice.cpp @@ -46,8 +46,8 @@ int32_t MemorySlice::Offset() const { return offset_; } -std::shared_ptr MemorySlice::GetHeapMemory() const { - return segment_.GetHeapMemory(); +std::shared_ptr MemorySlice::GetOrCreateHeapMemory(MemoryPool* pool) const { + return segment_.GetOrCreateHeapMemory(pool); } const MemorySegment& MemorySlice::GetSegment() const { @@ -71,14 +71,12 @@ int64_t MemorySlice::ReadLong(int32_t position) const { } std::string_view MemorySlice::ReadStringView() const { - auto array = segment_.GetArray(); - return {array->data() + offset_, static_cast(length_)}; + return {Data(), static_cast(length_)}; } std::shared_ptr MemorySlice::CopyBytes(MemoryPool* pool) const { auto bytes = std::make_shared(length_, pool); - auto target = MemorySegment::Wrap(bytes); - segment_.CopyTo(offset_, &target, 0, length_); + std::memcpy(const_cast(bytes->data()), Data(), length_); return bytes; } diff --git a/src/paimon/common/memory/memory_slice.h b/src/paimon/common/memory/memory_slice.h index d8bca924d..4b9fcb482 100644 --- a/src/paimon/common/memory/memory_slice.h +++ b/src/paimon/common/memory/memory_slice.h @@ -30,7 +30,10 @@ namespace paimon { class MemoryPool; class MemorySliceInput; -/// Slice of a MemorySegment. +/// Slice of a MemorySegment. +/// +/// Represents a sub-range [offset_, offset_ + length_) of a MemorySegment. +/// The MemorySegment may be owning (with shared_ptr) or non-owning (view). class PAIMON_EXPORT MemorySlice { public: static MemorySlice Wrap(const std::shared_ptr& bytes); @@ -39,14 +42,21 @@ class PAIMON_EXPORT MemorySlice { using SliceComparator = std::function(const MemorySlice&, const MemorySlice&)>; public: + /// Construct a slice over a segment with given offset and length. MemorySlice(const MemorySegment& segment, int32_t offset, int32_t length); + MemorySlice Slice(int32_t index, int32_t length) const; int32_t Length() const; int32_t Offset() const; - std::shared_ptr GetHeapMemory() const; + std::shared_ptr GetOrCreateHeapMemory(MemoryPool* pool) const; const MemorySegment& GetSegment() const; + /// Returns a raw pointer to the start of this slice's data. + inline const char* Data() const { + return segment_.Data() + offset_; + } + int8_t ReadByte(int32_t position) const; int32_t ReadInt(int32_t position) const; int16_t ReadShort(int32_t position) const; diff --git a/src/paimon/common/memory/memory_slice_input.cpp b/src/paimon/common/memory/memory_slice_input.cpp deleted file mode 100644 index 3c1404d31..000000000 --- a/src/paimon/common/memory/memory_slice_input.cpp +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Copyright 2026-present Alibaba Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "paimon/common/memory/memory_slice_input.h" - -#include "fmt/format.h" -#include "paimon/common/utils/math.h" - -namespace paimon { - -MemorySliceInput::MemorySliceInput(const MemorySlice& slice) : slice_(slice), position_(0) {} - -int32_t MemorySliceInput::Position() const { - return position_; -} - -Status MemorySliceInput::SetPosition(int32_t position) { - if (position < 0 || position > slice_.Length()) { - return Status::IndexError(fmt::format("position {} index out of bounds", position)); - } - position_ = position; - return Status::OK(); -} - -bool MemorySliceInput::IsReadable() const { - return Available() > 0; -} - -int32_t MemorySliceInput::Available() const { - return slice_.Length() - position_; -} - -int8_t MemorySliceInput::ReadByte() { - return slice_.ReadByte(position_++); -} - -int8_t MemorySliceInput::ReadUnsignedByte() { - return static_cast(ReadByte() & 0xFF); -} - -int32_t MemorySliceInput::ReadInt() { - int32_t v = slice_.ReadInt(position_); - position_ += 4; - if (NeedSwap()) { - return EndianSwapValue(v); - } - return v; -} - -int64_t MemorySliceInput::ReadLong() { - int64_t v = slice_.ReadLong(position_); - position_ += 8; - if (NeedSwap()) { - return EndianSwapValue(v); - } - return v; -} - -Result MemorySliceInput::ReadVarLenInt() { - for (int32_t offset = 0, result = 0; offset < 32; offset += 7) { - int32_t b = ReadUnsignedByte(); - result |= (b & 0x7F) << offset; - if ((b & 0x80) == 0) { - return result; - } - } - return Status::Invalid("Malformed integer."); -} - -Result MemorySliceInput::ReadVarLenLong() { - int64_t result = 0; - for (int32_t offset = 0; offset < 64; offset += 7) { - int64_t b = ReadUnsignedByte(); - result |= (b & 0x7F) << offset; - if ((b & 0x80) == 0) { - return result; - } - } - return Status::Invalid("Malformed long."); -} - -void MemorySliceInput::SetOrder(ByteOrder order) { - byte_order_ = order; -} - -bool MemorySliceInput::NeedSwap() const { - return SystemByteOrder() != byte_order_; -} - -MemorySlice MemorySliceInput::ReadSlice(int32_t length) { - auto slice = slice_.Slice(position_, length); - position_ += length; - return slice; -} - -} // namespace paimon diff --git a/src/paimon/common/memory/memory_slice_input.h b/src/paimon/common/memory/memory_slice_input.h index f85de0da4..3536e375e 100644 --- a/src/paimon/common/memory/memory_slice_input.h +++ b/src/paimon/common/memory/memory_slice_input.h @@ -22,41 +22,103 @@ #include #include +#include "fmt/format.h" #include "paimon/common/memory/memory_slice.h" +#include "paimon/common/utils/math.h" +#include "paimon/common/utils/var_length_int_utils.h" #include "paimon/io/byte_order.h" #include "paimon/status.h" #include "paimon/visibility.h" - namespace paimon { class MemoryPool; -/// Slice of a MemorySegment. +/// Input stream over a MemorySlice with inline hot-path methods. class PAIMON_EXPORT MemorySliceInput { public: - explicit MemorySliceInput(const MemorySlice& slice); + explicit MemorySliceInput(const MemorySlice& slice) + : slice_(slice), data_(slice.Data()), length_(slice.Length()) {} + + inline int32_t Position() const { + return position_; + } + + inline Status SetPosition(int32_t position) { + if (position < 0 || position > length_) { + return Status::IndexError(fmt::format("position {} index out of bounds", position)); + } + position_ = position; + return Status::OK(); + } + + inline bool IsReadable() const { + return position_ < length_; + } + + inline int32_t Available() const { + return length_ - position_; + } + + inline int8_t ReadByte() { + int8_t value; + std::memcpy(&value, data_ + position_, 1); + position_++; + return value; + } + + inline int8_t ReadUnsignedByte() { + return static_cast(static_cast(data_[position_++])); + } + + inline int32_t ReadInt() { + int32_t v; + std::memcpy(&v, data_ + position_, sizeof(v)); + position_ += 4; + if (NeedSwap()) { + return EndianSwapValue(v); + } + return v; + } + + inline int64_t ReadLong() { + int64_t v; + std::memcpy(&v, data_ + position_, sizeof(v)); + position_ += 8; + if (NeedSwap()) { + return EndianSwapValue(v); + } + return v; + } - int32_t Position() const; - Status SetPosition(int32_t position); + /// Reads a varint32 from the current position. + inline Result ReadVarLenInt() { + return VarLengthIntUtils::DecodeInt(data_, &position_); + } - bool IsReadable() const; - int32_t Available() const; + /// Reads a varint64 from the current position. + inline Result ReadVarLenLong() { + return VarLengthIntUtils::DecodeLong(data_, &position_); + } - int8_t ReadByte(); - int8_t ReadUnsignedByte(); - int32_t ReadInt(); - int64_t ReadLong(); - Result ReadVarLenInt(); - Result ReadVarLenLong(); - MemorySlice ReadSlice(int32_t length); + inline MemorySlice ReadSliceView(int32_t length) { + auto view_segment = MemorySegment::WrapView(data_ + position_, length); + position_ += length; + return MemorySlice(view_segment, 0, length); + } - void SetOrder(ByteOrder order); + void SetOrder(ByteOrder order) { + byte_order_ = order; + } private: - bool NeedSwap() const; + inline bool NeedSwap() const { + return SystemByteOrder() != byte_order_; + } private: MemorySlice slice_; - int32_t position_; + const char* data_; // Cached raw pointer for fast access. + int32_t length_; // Cached length. + int32_t position_ = 0; ByteOrder byte_order_ = SystemByteOrder(); }; diff --git a/src/paimon/common/memory/memory_slice_output.cpp b/src/paimon/common/memory/memory_slice_output.cpp index 04b985ea9..7674ea211 100644 --- a/src/paimon/common/memory/memory_slice_output.cpp +++ b/src/paimon/common/memory/memory_slice_output.cpp @@ -18,6 +18,7 @@ #include "fmt/format.h" #include "paimon/common/utils/math.h" +#include "paimon/common/utils/var_length_int_utils.h" namespace paimon { MemorySliceOutput::MemorySliceOutput(int32_t estimated_size, MemoryPool* pool) { @@ -51,26 +52,18 @@ void MemorySliceOutput::WriteValue(T value) { } Status MemorySliceOutput::WriteVarLenInt(int32_t value) { - if (value < 0) { - return Status::Invalid(fmt::format("negative value: v={}", value)); - } - while ((value & ~0x7F) != 0) { - WriteValue(static_cast((value & 0x7F) | 0x80)); - value >>= 7; - } - WriteValue(static_cast(value)); + EnsureSize(size_ + VarLengthIntUtils::kMaxVarIntSize); + PAIMON_ASSIGN_OR_RAISE(int32_t bytes_written, + VarLengthIntUtils::EncodeInt(value, segment_.MutableData() + size_)); + size_ += bytes_written; return Status::OK(); } Status MemorySliceOutput::WriteVarLenLong(int64_t value) { - if (value < 0) { - return Status::Invalid(fmt::format("negative value: v={}", value)); - } - while ((value & ~0x7F) != 0) { - WriteValue(static_cast((value & 0x7F) | 0x80)); - value >>= 7; - } - WriteValue(static_cast(value)); + EnsureSize(size_ + VarLengthIntUtils::kMaxVarLongSize); + PAIMON_ASSIGN_OR_RAISE(int32_t bytes_written, + VarLengthIntUtils::EncodeLong(value, segment_.MutableData() + size_)); + size_ += bytes_written; return Status::OK(); } diff --git a/src/paimon/common/memory/memory_slice_test.cpp b/src/paimon/common/memory/memory_slice_test.cpp index 697f13ee4..ba776dd78 100644 --- a/src/paimon/common/memory/memory_slice_test.cpp +++ b/src/paimon/common/memory/memory_slice_test.cpp @@ -273,10 +273,10 @@ TEST_F(MemorySliceTest, TestSliceCopyBytes) { ASSERT_NE(bytes->data(), copied->data()); } -TEST_F(MemorySliceTest, TestSliceGetHeapMemory) { +TEST_F(MemorySliceTest, TestSliceGetOrCreateHeapMemory) { auto bytes = std::make_shared("test", pool_.get()); MemorySlice slice = MemorySlice::Wrap(bytes); - ASSERT_EQ(bytes, slice.GetHeapMemory()); + ASSERT_EQ(bytes, slice.GetOrCreateHeapMemory(pool_.get())); } TEST_F(MemorySliceTest, TestSliceToInput) { @@ -372,17 +372,17 @@ TEST_F(MemorySliceTest, TestInputReadIntAndLong) { ASSERT_EQ(INT64_MIN, input.ReadLong()); } -TEST_F(MemorySliceTest, TestInputReadSlice) { +TEST_F(MemorySliceTest, TestInputReadSliceView) { std::string data = "abcdefghij"; auto bytes = std::make_shared(data, pool_.get()); MemorySlice slice = MemorySlice::Wrap(bytes); MemorySliceInput input(slice); - MemorySlice sub = input.ReadSlice(5); + MemorySlice sub = input.ReadSliceView(5); ASSERT_EQ("abcde", sub.ReadStringView()); ASSERT_EQ(5, input.Position()); - MemorySlice sub2 = input.ReadSlice(5); + MemorySlice sub2 = input.ReadSliceView(5); ASSERT_EQ("fghij", sub2.ReadStringView()); ASSERT_EQ(10, input.Position()); } @@ -420,7 +420,7 @@ TEST_F(MemorySliceTest, TestRoundTripMixedTypes) { int32_t str_len = input.ReadInt(); ASSERT_EQ(4, str_len); - MemorySlice str_slice = input.ReadSlice(str_len); + MemorySlice str_slice = input.ReadSliceView(str_len); ASSERT_EQ("test", str_slice.ReadStringView()); } diff --git a/src/paimon/common/sst/block_cache.h b/src/paimon/common/sst/block_cache.h index a5c3554f1..6fb596ef9 100644 --- a/src/paimon/common/sst/block_cache.h +++ b/src/paimon/common/sst/block_cache.h @@ -93,7 +93,7 @@ class PAIMON_EXPORT BlockCache { Result ReadFrom(int64_t offset, int32_t length) { PAIMON_RETURN_NOT_OK(in_->Seek(offset, SeekOrigin::FS_SEEK_SET)); auto segment = MemorySegment::AllocateHeapMemory(length, pool_.get()); - PAIMON_RETURN_NOT_OK(in_->Read(segment.GetHeapMemory()->data(), length)); + PAIMON_RETURN_NOT_OK(in_->Read(segment.MutableData(), length)); return segment; } diff --git a/src/paimon/common/sst/block_cache_test.cpp b/src/paimon/common/sst/block_cache_test.cpp index a1b212cf8..75d8e0425 100644 --- a/src/paimon/common/sst/block_cache_test.cpp +++ b/src/paimon/common/sst/block_cache_test.cpp @@ -42,8 +42,8 @@ class BlockCacheTest : public ::testing::Test { PAIMON_ASSIGN_OR_RAISE(auto out, fs_->Create(path, false)); for (int32_t i = 0; i < num_blocks; i++) { auto segment = MemorySegment::AllocateHeapMemory(block_size, pool_.get()); - std::memset(segment.GetHeapMemory()->data(), i & 0xFF, block_size); - PAIMON_RETURN_NOT_OK(out->Write(segment.GetHeapMemory()->data(), block_size)); + std::memset(segment.MutableData(), i & 0xFF, block_size); + PAIMON_RETURN_NOT_OK(out->Write(segment.MutableData(), block_size)); } PAIMON_RETURN_NOT_OK(out->Flush()); PAIMON_RETURN_NOT_OK(out->Close()); diff --git a/src/paimon/common/sst/block_entry.h b/src/paimon/common/sst/block_entry.h index b96f49a0c..286d195a2 100644 --- a/src/paimon/common/sst/block_entry.h +++ b/src/paimon/common/sst/block_entry.h @@ -16,8 +16,6 @@ #pragma once -#include - #include "paimon/common/memory/memory_slice.h" #include "paimon/result.h" diff --git a/src/paimon/common/sst/block_iterator.cpp b/src/paimon/common/sst/block_iterator.cpp index db8b1564a..e1a0b074b 100644 --- a/src/paimon/common/sst/block_iterator.cpp +++ b/src/paimon/common/sst/block_iterator.cpp @@ -26,7 +26,7 @@ bool BlockIterator::HasNext() const { return polled_position_ >= 0 || input_.IsReadable(); } -Result> BlockIterator::Next() { +Result BlockIterator::Next() { if (!HasNext()) { return Status::Invalid("no such element"); } @@ -38,17 +38,17 @@ Result> BlockIterator::Next() { return ReadEntry(); } -Result> BlockIterator::ReadEntry() { +Result BlockIterator::ReadEntry() { PAIMON_ASSIGN_OR_RAISE(int32_t key_length, input_.ReadVarLenInt()); - auto key = input_.ReadSlice(key_length); + auto key = input_.ReadSliceView(key_length); PAIMON_ASSIGN_OR_RAISE(int32_t value_length, input_.ReadVarLenInt()); - auto value = input_.ReadSlice(value_length); - return std::make_unique(key, value); + auto value = input_.ReadSliceView(value_length); + return BlockEntry(key, value); } Result BlockIterator::ReadKeyAndSkipValue() { PAIMON_ASSIGN_OR_RAISE(int32_t key_length, input_.ReadVarLenInt()); - auto key = input_.ReadSlice(key_length); + auto key = input_.ReadSliceView(key_length); PAIMON_ASSIGN_OR_RAISE(int32_t value_length, input_.ReadVarLenInt()); PAIMON_RETURN_NOT_OK(input_.SetPosition(input_.Position() + value_length)); return key; diff --git a/src/paimon/common/sst/block_iterator.h b/src/paimon/common/sst/block_iterator.h index 18eb934b5..10ee7a43d 100644 --- a/src/paimon/common/sst/block_iterator.h +++ b/src/paimon/common/sst/block_iterator.h @@ -28,9 +28,9 @@ class PAIMON_EXPORT BlockIterator { bool HasNext() const; - Result> Next(); + Result Next(); - Result> ReadEntry(); + Result ReadEntry(); Result SeekTo(const MemorySlice& target_key); diff --git a/src/paimon/common/sst/sst_file_reader.cpp b/src/paimon/common/sst/sst_file_reader.cpp index ad430ceb9..8243048b2 100644 --- a/src/paimon/common/sst/sst_file_reader.cpp +++ b/src/paimon/common/sst/sst_file_reader.cpp @@ -110,8 +110,8 @@ Result> SstFileReader::Lookup(const std::shared_ptrSeekTo(key_slice)); if (success) { - PAIMON_ASSIGN_OR_RAISE(std::unique_ptr ret, current->Next()); - return ret->value.CopyBytes(pool_.get()); + PAIMON_ASSIGN_OR_RAISE(BlockEntry ret, current->Next()); + return ret.value.CopyBytes(pool_.get()); } } return std::shared_ptr(); @@ -119,8 +119,8 @@ Result> SstFileReader::Lookup(const std::shared_ptr> SstFileReader::GetNextBlock( std::unique_ptr& index_iterator) { - PAIMON_ASSIGN_OR_RAISE(std::unique_ptr block_entry, index_iterator->Next()); - auto block_input = block_entry->value.ToInput(); + PAIMON_ASSIGN_OR_RAISE(BlockEntry block_entry, index_iterator->Next()); + auto block_input = block_entry.value.ToInput(); PAIMON_ASSIGN_OR_RAISE(BlockHandle block_handle, BlockHandle::ReadBlockHandle(&block_input)); PAIMON_ASSIGN_OR_RAISE(std::shared_ptr reader, ReadBlock(block_handle, false)); return reader->Iterator(); @@ -147,10 +147,8 @@ Result> SstFileReader::ReadBlock(const BlockHandle& Result SstFileReader::DecompressBlock(const MemorySegment& compressed_data, const std::shared_ptr& trailer, const std::shared_ptr& pool) { - auto input_memory = compressed_data.GetHeapMemory(); - // check crc32c - auto crc32c_code = CRC32C::calculate(input_memory->data(), input_memory->size()); + auto crc32c_code = CRC32C::calculate(compressed_data.Data(), compressed_data.Size()); auto compression_val = static_cast(static_cast(trailer->CompressionType()) & 0xFF); crc32c_code = CRC32C::calculate(&compression_val, 1, crc32c_code); @@ -172,15 +170,15 @@ Result SstFileReader::DecompressBlock(const MemorySegment& compre auto input = slice.ToInput(); PAIMON_ASSIGN_OR_RAISE(int32_t uncompressed_size, input.ReadVarLenInt()); auto output = MemorySegment::AllocateHeapMemory(uncompressed_size, pool.get()); - auto output_memory = output.GetHeapMemory(); + PAIMON_ASSIGN_OR_RAISE( int32_t actual_uncompressed_size, - decompressor->Decompress(input_memory->data() + input.Position(), input.Available(), - output_memory->data(), output_memory->size())); - if (static_cast(actual_uncompressed_size) != output_memory->size()) { + decompressor->Decompress(compressed_data.Data() + input.Position(), input.Available(), + output.MutableData(), output.Size())); + if (actual_uncompressed_size != output.Size()) { return Status::Invalid(fmt::format( "Invalid data: expect uncompressed size {}, actual uncompressed size {}", - output_memory->size(), actual_uncompressed_size)); + output.Size(), actual_uncompressed_size)); } return output; } diff --git a/src/paimon/common/utils/var_length_int_utils.h b/src/paimon/common/utils/var_length_int_utils.h index 7f4829d8c..43c6eda28 100644 --- a/src/paimon/common/utils/var_length_int_utils.h +++ b/src/paimon/common/utils/var_length_int_utils.h @@ -16,16 +16,23 @@ #pragma once +#include +#include + #include "fmt/format.h" -#include "paimon/memory/bytes.h" +#include "paimon/macros.h" #include "paimon/result.h" - namespace paimon { -/// This file is based on source code of LongPacker from the PalDB Project -/// (https://github.com/linkedin/PalDB), licensed by the Apache Software Foundation (ASF) under the -/// Apache License, Version 2.0. See the NOTICE file distributed with this work for additional -/// information regarding copyright ownership. -/// Utils for encoding int/long to var length bytes. + +/// Variable-length integer encoding/decoding utilities. +/// +/// Encoding format (same as protobuf unsigned varint): +/// - Each byte stores 7 payload bits in bits [6:0]. +/// - Bit 7 (0x80) is the continuation flag: 1 = more bytes follow, 0 = last byte. +/// - A varint32 uses at most 5 bytes; a varint64 uses at most 9 bytes. +/// +/// Based on the LongPacker from PalDB (https://github.com/linkedin/PalDB), +/// licensed under Apache 2.0. class VarLengthIntUtils { public: VarLengthIntUtils() = delete; @@ -34,67 +41,85 @@ class VarLengthIntUtils { static constexpr int32_t kMaxVarIntSize = 5; static constexpr int32_t kMaxVarLongSize = 9; - /// Returns encoding bytes length. - static Result EncodeInt(int32_t offset, int32_t value, Bytes* bytes) { - if (value < 0) { + // ==================== Encoding (writes to char*) ==================== + + /// Encodes a non-negative int32 as varint into `dest`. + /// Returns the number of bytes written. + static Result EncodeInt(int32_t value, char* dest) { + if (PAIMON_UNLIKELY(value < 0)) { return Status::Invalid( fmt::format("negative value: v={} for VarLengthInt Encoding", value)); } - - int32_t i = 1; + int32_t num_bytes = 0; while ((value & ~0x7F) != 0) { - (*bytes)[i + offset - 1] = static_cast((value & 0x7F) | 0x80); + dest[num_bytes] = static_cast((value & 0x7F) | 0x80); + value >>= 7; + ++num_bytes; + } + dest[num_bytes] = static_cast(value); + return num_bytes + 1; + } + + /// Encodes a non-negative int64 as varint into `dest`. + /// Returns the number of bytes written. + static Result EncodeLong(int64_t value, char* dest) { + if (PAIMON_UNLIKELY(value < 0)) { + return Status::Invalid( + fmt::format("negative value: v={} for VarLengthInt Encoding", value)); + } + int32_t num_bytes = 0; + while ((value & ~0x7FLL) != 0) { + dest[num_bytes] = static_cast(static_cast(value & 0x7F) | 0x80); value >>= 7; - ++i; + ++num_bytes; } - (*bytes)[i + offset - 1] = static_cast(value); - return i; + dest[num_bytes] = static_cast(value); + return num_bytes + 1; } - static Result DecodeInt(const Bytes* bytes, int32_t* offset) { + // ==================== Decoding (reads from const char*) ==================== + + /// Decodes a varint32 from `data` at `*offset`, advancing `*offset` past the consumed bytes. + /// Inlines a 1-byte fast path (values 0-127), which is the most common case. + static inline Result DecodeInt(const char* data, int32_t* offset) { + auto first_byte = static_cast(data[*offset]); + if (PAIMON_LIKELY((first_byte & 0x80) == 0)) { + ++(*offset); + return static_cast(first_byte); + } + // Multi-byte: fall through to generic loop int32_t result = 0; for (int32_t shift = 0; shift < 32; shift += 7) { - auto b = static_cast((*bytes)[*offset]); + auto byte_val = static_cast(data[*offset]); ++(*offset); - - result |= (b & 0x7Fu) << shift; - - if ((b & 0x80u) == 0) { + result |= static_cast(byte_val & 0x7F) << shift; + if ((byte_val & 0x80) == 0) { return result; } } - return Status::Invalid("Malformed integer for VarLengthInt Decoding"); + return Status::Invalid("Malformed varint32: too many continuation bytes"); } - /// Returns encoding bytes length. - static Result EncodeLong(int64_t value, Bytes* bytes) { - if (value < 0) { - return Status::Invalid( - fmt::format("negative value: v={} for VarLengthInt Encoding", value)); - } - - int32_t i = 1; - while ((value & ~0x7FLL) != 0) { - (*bytes)[i - 1] = static_cast(static_cast(value & 0x7F) | 0x80); - value >>= 7; - ++i; + /// Decodes a varint64 from `data` at `*offset`, advancing `*offset` past the consumed bytes. + /// Inlines a 1-byte fast path (values 0-127), which is the most common case. + static inline Result DecodeLong(const char* data, int32_t* offset) { + auto first_byte = static_cast(data[*offset]); + if (PAIMON_LIKELY((first_byte & 0x80) == 0)) { + ++(*offset); + return static_cast(first_byte); } - (*bytes)[i - 1] = static_cast(value); - return i; - } - - static Result DecodeLong(const Bytes* bytes, int32_t index) { + // Multi-byte: fall through to generic loop int64_t result = 0; for (int32_t shift = 0; shift < 64; shift += 7) { - auto b = static_cast((*bytes)[index++]); - - result |= (b & 0x7FLL) << shift; - - if ((b & 0x80u) == 0) { + auto byte_val = static_cast(data[*offset]); + ++(*offset); + result |= static_cast(byte_val & 0x7F) << shift; + if ((byte_val & 0x80) == 0) { return result; } } - return Status::Invalid("Malformed long for VarLengthInt Decoding"); + return Status::Invalid("Malformed varint64: too many continuation bytes"); } }; + } // namespace paimon diff --git a/src/paimon/common/utils/var_length_int_utils_test.cpp b/src/paimon/common/utils/var_length_int_utils_test.cpp index f66e7fbb1..d1fb9a4ea 100644 --- a/src/paimon/common/utils/var_length_int_utils_test.cpp +++ b/src/paimon/common/utils/var_length_int_utils_test.cpp @@ -16,8 +16,11 @@ #include "paimon/common/utils/var_length_int_utils.h" +#include +#include +#include + #include "gtest/gtest.h" -#include "paimon/memory/memory_pool.h" #include "paimon/testing/utils/testharness.h" namespace paimon::test { @@ -26,16 +29,13 @@ TEST(VarLengthIntUtilsTest, TestEncodeAndDecodeInt) { 1000, 10000, 100000, 1000000, 10000000, 2147483647}; for (int32_t value : test_values) { - // Encode - auto buffer = - std::make_shared(VarLengthIntUtils::kMaxVarIntSize, GetDefaultPool().get()); - ASSERT_OK_AND_ASSIGN(int32_t encoded_length, - VarLengthIntUtils::EncodeInt(0, value, buffer.get())); + char buffer[VarLengthIntUtils::kMaxVarIntSize]; + std::memset(buffer, 0, sizeof(buffer)); + + ASSERT_OK_AND_ASSIGN(int32_t encoded_length, VarLengthIntUtils::EncodeInt(value, buffer)); - // Decode int32_t offset = 0; - ASSERT_OK_AND_ASSIGN(int32_t decoded_value, - VarLengthIntUtils::DecodeInt(buffer.get(), &offset)); + ASSERT_OK_AND_ASSIGN(int32_t decoded_value, VarLengthIntUtils::DecodeInt(buffer, &offset)); ASSERT_EQ(value, decoded_value) << "Encoded and decoded values don't match for: " << value; ASSERT_EQ(offset, encoded_length) << "Offset doesn't match encoded length for: " << value; @@ -58,53 +58,45 @@ TEST(VarLengthIntUtilsTest, TestEncodeAndDecodeLong) { std::numeric_limits::max()}; for (int64_t value : test_values) { - // Encode - auto buffer = - std::make_shared(VarLengthIntUtils::kMaxVarLongSize, GetDefaultPool().get()); + char buffer[VarLengthIntUtils::kMaxVarLongSize + 1]; + std::memset(buffer, 0, sizeof(buffer)); + ASSERT_OK_AND_ASSIGN([[maybe_unused]] int32_t encoded_length, - VarLengthIntUtils::EncodeLong(value, buffer.get())); + VarLengthIntUtils::EncodeLong(value, buffer)); - // Decode - ASSERT_OK_AND_ASSIGN(int64_t decoded_value, - VarLengthIntUtils::DecodeLong(buffer.get(), /*index=*/0)); + int32_t offset = 0; + ASSERT_OK_AND_ASSIGN(int64_t decoded_value, VarLengthIntUtils::DecodeLong(buffer, &offset)); ASSERT_EQ(value, decoded_value) << "Encoded and decoded values don't match for: " << value; } } TEST(VarLengthIntUtilsTest, TestEncodeNegativeValue) { - auto buffer = - std::make_shared(VarLengthIntUtils::kMaxVarIntSize, GetDefaultPool().get()); - ASSERT_NOK_WITH_MSG(VarLengthIntUtils::EncodeInt(0, -1, buffer.get()), + char buffer[VarLengthIntUtils::kMaxVarIntSize]; + ASSERT_NOK_WITH_MSG(VarLengthIntUtils::EncodeInt(-1, buffer), "negative value: v=-1 for VarLengthInt Encoding"); } TEST(VarLengthIntUtilsTest, TestEncodeNegativeLongValue) { - auto buffer = - std::make_shared(VarLengthIntUtils::kMaxVarLongSize, GetDefaultPool().get()); - ASSERT_NOK_WITH_MSG(VarLengthIntUtils::EncodeLong(-1, buffer.get()), + char buffer[VarLengthIntUtils::kMaxVarLongSize + 1]; + ASSERT_NOK_WITH_MSG(VarLengthIntUtils::EncodeLong(-1, buffer), "negative value: v=-1 for VarLengthInt Encoding"); } -TEST(VarLengthIntUtilsTest, TestEncodeIntWithOffset) { - auto buffer = - std::make_shared(VarLengthIntUtils::kMaxVarIntSize * 2, GetDefaultPool().get()); +TEST(VarLengthIntUtilsTest, TestEncodeIntSequential) { + char buffer[VarLengthIntUtils::kMaxVarIntSize * 2]; + std::memset(buffer, 0, sizeof(buffer)); int32_t value1 = 100; int32_t value2 = 200; - // Encode first value at offset 0 - ASSERT_OK_AND_ASSIGN(auto length1, VarLengthIntUtils::EncodeInt(0, value1, buffer.get())); - // Encode second value at offset length1 + ASSERT_OK_AND_ASSIGN(auto length1, VarLengthIntUtils::EncodeInt(value1, buffer)); ASSERT_OK_AND_ASSIGN([[maybe_unused]] auto length2, - VarLengthIntUtils::EncodeInt(length1, value2, buffer.get())); + VarLengthIntUtils::EncodeInt(value2, buffer + length1)); - // Decode both values int32_t offset = 0; - ASSERT_OK_AND_ASSIGN(int32_t decoded_result1, - VarLengthIntUtils::DecodeInt(buffer.get(), &offset)); + ASSERT_OK_AND_ASSIGN(int32_t decoded_result1, VarLengthIntUtils::DecodeInt(buffer, &offset)); ASSERT_EQ(value1, decoded_result1); - ASSERT_OK_AND_ASSIGN(int32_t decoded_result2, - VarLengthIntUtils::DecodeInt(buffer.get(), &offset)); + ASSERT_OK_AND_ASSIGN(int32_t decoded_result2, VarLengthIntUtils::DecodeInt(buffer, &offset)); ASSERT_EQ(value2, decoded_result2); } @@ -118,10 +110,10 @@ TEST(VarLengthIntUtilsTest, TestEncodeBytesNumber) { }; for (int32_t i = 0; i < static_cast(values.size()); ++i) { - auto buffer = - std::make_shared(VarLengthIntUtils::kMaxVarIntSize, GetDefaultPool().get()); + char buffer[VarLengthIntUtils::kMaxVarIntSize]; + std::memset(buffer, 0, sizeof(buffer)); ASSERT_OK_AND_ASSIGN(int32_t encoded_length, - VarLengthIntUtils::EncodeInt(0, values[i], buffer.get())); + VarLengthIntUtils::EncodeInt(values[i], buffer)); ASSERT_EQ(encoded_length, i + 1); } } @@ -140,10 +132,10 @@ TEST(VarLengthIntUtilsTest, TestEncodeLongBytesNumber) { }; for (int32_t i = 0; i < static_cast(values.size()); ++i) { - auto buffer = - std::make_shared(VarLengthIntUtils::kMaxVarLongSize, GetDefaultPool().get()); + char buffer[VarLengthIntUtils::kMaxVarLongSize + 1]; + std::memset(buffer, 0, sizeof(buffer)); ASSERT_OK_AND_ASSIGN(int32_t encoded_length, - VarLengthIntUtils::EncodeLong(values[i], buffer.get())); + VarLengthIntUtils::EncodeLong(values[i], buffer)); ASSERT_EQ(encoded_length, i + 1) << values[i]; } } diff --git a/src/paimon/core/mergetree/lookup/persist_position_processor.h b/src/paimon/core/mergetree/lookup/persist_position_processor.h index 7345b4677..43624b1ec 100644 --- a/src/paimon/core/mergetree/lookup/persist_position_processor.h +++ b/src/paimon/core/mergetree/lookup/persist_position_processor.h @@ -38,7 +38,7 @@ class PersistPositionProcessor : public PersistProcessor { int64_t row_position) const override { auto bytes = std::make_shared(VarLengthIntUtils::kMaxVarLongSize, pool_.get()); PAIMON_ASSIGN_OR_RAISE(int64_t len, - VarLengthIntUtils::EncodeLong(row_position, bytes.get())); + VarLengthIntUtils::EncodeLong(row_position, bytes->data())); std::shared_ptr copy_bytes = Bytes::CopyOf(*bytes, len, pool_.get()); return copy_bytes; } @@ -46,8 +46,9 @@ class PersistPositionProcessor : public PersistProcessor { Result ReadFromDisk(std::shared_ptr key, int32_t level, const std::shared_ptr& value_bytes, const std::string& file_name) const override { + int32_t decode_offset = 0; PAIMON_ASSIGN_OR_RAISE(int64_t row_position, - VarLengthIntUtils::DecodeLong(value_bytes.get(), /*index=*/0)); + VarLengthIntUtils::DecodeLong(value_bytes->data(), &decode_offset)); return FilePosition{file_name, row_position}; }