Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/paimon/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ set(PAIMON_COMMON_SRCS
common/memory/memory_segment.cpp
common/memory/memory_segment_utils.cpp
common/memory/memory_slice.cpp
common/memory/memory_slice_input.cpp
common/memory/memory_slice_output.cpp
common/metrics/metrics_impl.cpp
common/metrics/histogram.cpp
Expand Down
3 changes: 2 additions & 1 deletion src/paimon/common/data/abstract_binary_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,8 @@ void AbstractBinaryWriter::Grow(int32_t min_capacity) {
if (new_capacity - min_capacity < 0) {
new_capacity = min_capacity;
}
std::shared_ptr<Bytes> new_bytes = Bytes::CopyOf(*(segment_.GetArray()), new_capacity, pool_);
std::shared_ptr<Bytes> new_bytes =
Bytes::CopyOf(*(segment_.GetOrCreateHeapMemory(pool_)), new_capacity, pool_);
segment_ = MemorySegment::Wrap(new_bytes);
AfterGrow();
}
Expand Down
4 changes: 1 addition & 3 deletions src/paimon/common/data/binary_string.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,6 @@ bool BinaryString::MatchAtOneSeg(const BinaryString& s, int32_t pos) const {
}

std::string_view BinaryString::GetStringView() const {
const auto* bytes = segment_.GetArray();
assert(bytes);
return std::string_view(bytes->data() + offset_, size_in_bytes_);
return std::string_view(segment_.Data() + offset_, size_in_bytes_);
}
} // namespace paimon
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ std::shared_ptr<Bytes> RowCompactedSerializer::RowWriter::CopyBuffer() const {
Status RowCompactedSerializer::RowWriter::WriteUnsignedInt(int32_t value) {
EnsureCapacity(VarLengthIntUtils::kMaxVarIntSize);
PAIMON_ASSIGN_OR_RAISE(int32_t len,
VarLengthIntUtils::EncodeInt(position_, value, buffer_.get()));
VarLengthIntUtils::EncodeInt(value, buffer_->data() + position_));
position_ += len;
return Status::OK();
}
Expand Down Expand Up @@ -577,7 +577,7 @@ Result<const RowKind*> RowCompactedSerializer::RowReader::ReadRowKind() const {

Result<std::string_view> RowCompactedSerializer::RowReader::ReadStringView() {
PAIMON_ASSIGN_OR_RAISE(int32_t length, ReadUnsignedInt());
std::string_view str(segment_.GetArray()->data() + position_, length);
std::string_view str(segment_.Data() + position_, length);
position_ += length;
return str;
}
Expand Down
3 changes: 1 addition & 2 deletions src/paimon/common/data/serializer/row_compacted_serializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,7 @@ class RowCompactedSerializer {

private:
Result<int32_t> ReadUnsignedInt() {
const auto* bytes = segment_.GetArray();
return VarLengthIntUtils::DecodeInt(bytes, &position_);
return VarLengthIntUtils::DecodeInt(segment_.Data(), &position_);
}

private:
Expand Down
29 changes: 17 additions & 12 deletions src/paimon/common/global_index/btree/btree_global_index_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ BTreeGlobalIndexReader::BTreeGlobalIndexReader(
null_bitmap_(std::move(null_bitmap)),
min_key_(min_key),
max_key_(max_key),
key_type_(key_type) {}
key_type_(key_type),
comparator_(KeySerializer::CreateComparator(key_type, pool)) {}

Result<std::shared_ptr<GlobalIndexResult>> BTreeGlobalIndexReader::VisitIsNotNull() {
return std::make_shared<BitmapGlobalIndexResult>(
Expand Down Expand Up @@ -260,9 +261,13 @@ Result<RoaringBitmap64> BTreeGlobalIndexReader::RangeQuery(const std::optional<L
// Create an index block iterator to iterate through data blocks
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<Bytes> from_bytes,
KeySerializer::SerializeKey(from.value(), key_type_, pool_.get()));
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<Bytes> to_bytes,
KeySerializer::SerializeKey(to.value(), key_type_, pool_.get()));
MemorySlice from_slice = MemorySlice::Wrap(from_bytes);
MemorySlice to_slice = MemorySlice::Wrap(to_bytes);

auto index_iterator = sst_file_reader_->CreateIndexIterator();
PAIMON_ASSIGN_OR_RAISE([[maybe_unused]] bool seek_result,
index_iterator->SeekTo(MemorySlice::Wrap(from_bytes)));
PAIMON_ASSIGN_OR_RAISE([[maybe_unused]] bool seek_result, index_iterator->SeekTo(from_slice));

bool first_block = true;
while (index_iterator->HasNext()) {
Expand All @@ -276,33 +281,33 @@ Result<RoaringBitmap64> BTreeGlobalIndexReader::RangeQuery(const std::optional<L

// For the first block, we need to seek within the block to the exact position
if (first_block) {
PAIMON_ASSIGN_OR_RAISE([[maybe_unused]] bool found,
data_iterator->SeekTo(MemorySlice::Wrap(from_bytes)));
PAIMON_ASSIGN_OR_RAISE([[maybe_unused]] bool found, data_iterator->SeekTo(from_slice));
first_block = false;
}

// Iterate through entries in the data block
while (data_iterator->HasNext()) {
PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<BlockEntry> entry, data_iterator->Next());
PAIMON_ASSIGN_OR_RAISE(
Literal key, KeySerializer::DeserializeKey(entry->key, key_type_, pool_.get()));
PAIMON_ASSIGN_OR_RAISE(int32_t cmp_from, key.CompareTo(from.value()));
PAIMON_ASSIGN_OR_RAISE(BlockEntry entry, data_iterator->Next());

// Compare entry key against from bound
PAIMON_ASSIGN_OR_RAISE(int32_t cmp_from, comparator_(entry.key, from_slice));

// Check lower bound
if (!from_inclusive && cmp_from == 0) {
continue;
}

// Check upper bound
PAIMON_ASSIGN_OR_RAISE(int32_t cmp_to, key.CompareTo(to.value()));
// Compare entry key against to bound
PAIMON_ASSIGN_OR_RAISE(int32_t cmp_to, comparator_(entry.key, to_slice));

if (cmp_to > 0 || (!to_inclusive && cmp_to == 0)) {
return result;
}

PAIMON_RETURN_NOT_OK(DeserializeRowIds(entry->value, &result));
PAIMON_RETURN_NOT_OK(DeserializeRowIds(entry.value, &result));
}
}

return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#include "arrow/api.h"
#include "paimon/common/global_index/btree/btree_defs.h"
#include "paimon/common/global_index/btree/key_serializer.h"
#include "paimon/common/sst/sst_file_reader.h"
#include "paimon/global_index/global_index_io_meta.h"
#include "paimon/global_index/global_index_reader.h"
Expand Down Expand Up @@ -101,6 +102,7 @@ class BTreeGlobalIndexReader : public GlobalIndexReader,
std::optional<Literal> min_key_;
std::optional<Literal> max_key_;
std::shared_ptr<arrow::DataType> key_type_;
MemorySlice::SliceComparator comparator_;
};

} // namespace paimon
6 changes: 3 additions & 3 deletions src/paimon/common/global_index/btree/btree_index_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ std::shared_ptr<BTreeIndexMeta> BTreeIndexMeta::Deserialize(const std::shared_pt
auto first_key_len = input.ReadInt();
std::shared_ptr<Bytes> first_key;
if (first_key_len) {
first_key = input.ReadSlice(first_key_len).CopyBytes(pool);
first_key = input.ReadSliceView(first_key_len).CopyBytes(pool);
}
auto last_key_len = input.ReadInt();
std::shared_ptr<Bytes> last_key;
if (last_key_len) {
last_key = input.ReadSlice(last_key_len).CopyBytes(pool);
last_key = input.ReadSliceView(last_key_len).CopyBytes(pool);
}
auto has_nulls = input.ReadByte() == static_cast<int8_t>(1);
return std::make_shared<BTreeIndexMeta>(first_key, last_key, has_nulls);
Expand Down Expand Up @@ -62,7 +62,7 @@ std::shared_ptr<Bytes> BTreeIndexMeta::Serialize(paimon::MemoryPool* pool) const
// Write has_nulls
output.WriteValue(static_cast<int8_t>(has_nulls_ ? 1 : 0));

return output.ToSlice().GetHeapMemory();
return output.ToSlice().GetOrCreateHeapMemory(pool);
}

} // namespace paimon
61 changes: 61 additions & 0 deletions src/paimon/common/global_index/btree/key_serializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "paimon/common/memory/memory_slice_output.h"
#include "paimon/common/utils/date_time_utils.h"
#include "paimon/common/utils/field_type_utils.h"
#include "paimon/common/utils/fields_comparator.h"
#include "paimon/common/utils/preconditions.h"
#include "paimon/data/decimal.h"
#include "paimon/data/timestamp.h"
Expand Down Expand Up @@ -195,6 +196,66 @@ Result<Literal> KeySerializer::DeserializeKey(const MemorySlice& slice,

MemorySlice::SliceComparator KeySerializer::CreateComparator(
const std::shared_ptr<arrow::DataType>& type, const std::shared_ptr<MemoryPool>& pool) {
// Fast paths for integer and string types: direct value comparison without Literal
// deserialization, avoiding heap allocations entirely.
switch (type->id()) {
case arrow::Type::type::STRING:
return [](const MemorySlice& a, const MemorySlice& b) -> Result<int32_t> {
std::string_view sv_a = a.ReadStringView();
std::string_view sv_b = b.ReadStringView();
int32_t cmp = sv_a.compare(sv_b);
return cmp == 0 ? 0 : (cmp > 0 ? 1 : -1);
};
case arrow::Type::type::BOOL:
case arrow::Type::type::INT8:
return [](const MemorySlice& a, const MemorySlice& b) -> Result<int32_t> {
int8_t va = a.ReadByte(0);
int8_t vb = b.ReadByte(0);
return (va < vb) ? -1 : (va > vb ? 1 : 0);
};
case arrow::Type::type::INT16:
return [](const MemorySlice& a, const MemorySlice& b) -> Result<int32_t> {
int16_t va = a.ReadShort(0);
int16_t vb = b.ReadShort(0);
return (va < vb) ? -1 : (va > vb ? 1 : 0);
};
case arrow::Type::type::INT32:
case arrow::Type::type::DATE32:
return [](const MemorySlice& a, const MemorySlice& b) -> Result<int32_t> {
int32_t va = a.ReadInt(0);
int32_t vb = b.ReadInt(0);
return (va < vb) ? -1 : (va > vb ? 1 : 0);
};
case arrow::Type::type::INT64:
return [](const MemorySlice& a, const MemorySlice& b) -> Result<int32_t> {
int64_t va = a.ReadLong(0);
int64_t vb = b.ReadLong(0);
return (va < vb) ? -1 : (va > vb ? 1 : 0);
};
case arrow::Type::type::FLOAT:
return [](const MemorySlice& a, const MemorySlice& b) -> Result<int32_t> {
int32_t ia = a.ReadInt(0);
int32_t ib = b.ReadInt(0);
float fa, fb;
memcpy(&fa, &ia, sizeof(float));
memcpy(&fb, &ib, sizeof(float));
return FieldsComparator::CompareFloatingPoint(fa, fb);
};
case arrow::Type::type::DOUBLE:
return [](const MemorySlice& a, const MemorySlice& b) -> Result<int32_t> {
int64_t ia = a.ReadLong(0);
int64_t ib = b.ReadLong(0);
double da, db;
memcpy(&da, &ia, sizeof(double));
memcpy(&db, &ib, sizeof(double));
return FieldsComparator::CompareFloatingPoint(da, db);
};
Comment thread
lxy-9602 marked this conversation as resolved.
default:
break;
}

// Fallback for complex types (TIMESTAMP, DECIMAL, etc.):
// deserialize to Literal and compare.
return
[pool = pool, type = type](const MemorySlice& a, const MemorySlice& b) -> Result<int32_t> {
PAIMON_ASSIGN_OR_RAISE(Literal la, DeserializeKey(a, type, pool.get()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ Result<std::shared_ptr<GlobalIndexReader>> LazyFilteredBTreeReader::GetOrCreateR

Result<std::shared_ptr<GlobalIndexReader>> LazyFilteredBTreeReader::CreateSingleReader(
const GlobalIndexIOMeta& meta) {
// Create comparator based on field type
auto comparator = KeySerializer::CreateComparator(key_type_, pool_);

// Deserialize min/max keys from meta
Expand Down Expand Up @@ -298,7 +299,7 @@ Result<RoaringBitmap64> LazyFilteredBTreeReader::ReadNullBitmap(
auto slice_input = slice.ToInput();

// Read null bitmap data
auto null_bitmap_bytes = slice_input.ReadSlice(block_handle->Size()).CopyBytes(pool_.get());
auto null_bitmap_bytes = slice_input.ReadSliceView(block_handle->Size()).CopyBytes(pool_.get());

// Calculate and verify CRC32C checksum
uint32_t calculated_crc =
Expand Down
2 changes: 1 addition & 1 deletion src/paimon/common/io/cache/lru_cache_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class LruCacheTest : public ::testing::Test {
std::shared_ptr<CacheValue> MakeValue(int32_t size, char fill_byte = 0,
CacheCallback callback = {}) const {
auto segment = MemorySegment::AllocateHeapMemory(size, pool_.get());
std::memset(segment.GetHeapMemory()->data(), fill_byte, size);
std::memset(segment.MutableData(), fill_byte, size);
return std::make_shared<CacheValue>(segment, std::move(callback));
}

Expand Down
27 changes: 0 additions & 27 deletions src/paimon/common/memory/memory_segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,33 +58,6 @@ int32_t MemorySegment::Compare(const MemorySegment& seg2, int32_t offset1, int32
return c == 0 ? (len1 - len2) : c;
}

void MemorySegment::SwapBytes(Bytes* temp_buffer, MemorySegment* seg2, int32_t offset1,
int32_t offset2, int32_t len) {
if ((offset1 | offset2 | len | (temp_buffer->size() - len) |
(heap_memory_->size() - (offset1 + len)) |
(seg2->heap_memory_->size() - (offset2 + len))) >= 0) {
// this -> temp buffer
std::memcpy(temp_buffer->data(), heap_memory_->data() + offset1, len);

// other -> this
std::memcpy(heap_memory_->data() + offset1, seg2->heap_memory_->data() + offset2, len);

// temp buffer -> other
std::memcpy(seg2->heap_memory_->data() + offset2, temp_buffer->data(), len);
return;
}
assert(false);

// index is in fact invalid
// return Status::InternalError(
// "IndexOutOfBoundsException",
// fmt::format("offset1={}, offset2={}, len={}, temp buffer size={}, heap
// mem 1 "
// "size={}, heap mem 2 size={}",
// offset1, offset2, len, temp_buffer->size(),
// heap_memory_->size(), seg2->heap_memory_->size()));
}

bool MemorySegment::EqualTo(const MemorySegment& seg2, int32_t offset1, int32_t offset2,
int32_t length) const {
int32_t i = 0;
Expand Down
Loading
Loading