diff --git a/include/paimon/io/byte_order.h b/include/paimon/io/byte_order.h new file mode 100644 index 0000000..299244c --- /dev/null +++ b/include/paimon/io/byte_order.h @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +namespace paimon { + +#if defined(__s390x__) +#define PAIMON_LITTLEENDIAN 0 +#endif // __s390x__ +#if !defined(PAIMON_LITTLEENDIAN) +#if defined(__GNUC__) || defined(__clang__) || defined(__ICCARM__) +#if (defined(__BIG_ENDIAN__) || (defined(__BYTE_ORDER__) && __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__)) +#define PAIMON_LITTLEENDIAN 0 +#else +#define PAIMON_LITTLEENDIAN 1 +#endif // __BIG_ENDIAN__ +#elif defined(_MSC_VER) +#if defined(_M_PPC) +#define PAIMON_LITTLEENDIAN 0 +#else +#define PAIMON_LITTLEENDIAN 1 +#endif +#else +#error Unable to determine endianness, define PAIMON_LITTLEENDIAN. +#endif +#endif // !defined(PAIMON_LITTLEENDIAN) + +enum class ByteOrder : int8_t { PAIMON_BIG_ENDIAN = 1, PAIMON_LITTLE_ENDIAN = 2 }; + +/// Get the byte order of the system. +constexpr ByteOrder SystemByteOrder() { + if (PAIMON_LITTLEENDIAN) { + return ByteOrder::PAIMON_LITTLE_ENDIAN; + } else { + return ByteOrder::PAIMON_BIG_ENDIAN; + } +} + +} // namespace paimon diff --git a/src/paimon/common/memory/memory_segment.cpp b/src/paimon/common/memory/memory_segment.cpp new file mode 100644 index 0000000..be4a8d5 --- /dev/null +++ b/src/paimon/common/memory/memory_segment.cpp @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/common/memory/memory_segment.h" + +#include + +namespace paimon { + +int32_t MemorySegment::Compare(const MemorySegment& seg2, int32_t offset1, int32_t offset2, + int32_t len) const { + while (len >= 8) { + uint64_t l1 = GetLongBigEndian(offset1); + uint64_t l2 = seg2.GetLongBigEndian(offset2); + + if (l1 != l2) { + return (l1 < l2) ? -1 : 1; + } + + offset1 += 8; + offset2 += 8; + len -= 8; + } + while (len > 0) { + int32_t b1 = Get(offset1) & 0xff; + int32_t b2 = seg2.Get(offset2) & 0xff; + int32_t cmp = b1 - b2; + if (cmp != 0) { + return cmp; + } + offset1++; + offset2++; + len--; + } + return 0; +} + +int32_t MemorySegment::Compare(const MemorySegment& seg2, int32_t offset1, int32_t offset2, + int32_t len1, int32_t len2) const { + const int32_t min_length = std::min(len1, len2); + int32_t c = Compare(seg2, offset1, offset2, min_length); + return c == 0 ? (len1 - len2) : c; +} + +bool MemorySegment::EqualTo(const MemorySegment& seg2, int32_t offset1, int32_t offset2, + int32_t length) const { + int32_t i = 0; + // we assume unaligned accesses are supported. + // Compare 8 bytes at a time. + while (i <= length - 8) { + if (GetValue(offset1 + i) != seg2.GetValue(offset2 + i)) { + return false; + } + i += 8; + } + + // cover the last (length % 8) elements. + while (i < length) { + if (Get(offset1 + i) != seg2.Get(offset2 + i)) { + return false; + } + i += 1; + } + + return true; +} + +} // namespace paimon diff --git a/src/paimon/common/memory/memory_segment.h b/src/paimon/common/memory/memory_segment.h new file mode 100644 index 0000000..a98e5d8 --- /dev/null +++ b/src/paimon/common/memory/memory_segment.h @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include + +#include "paimon/common/utils/math.h" +#include "paimon/io/byte_order.h" +#include "paimon/memory/bytes.h" +#include "paimon/visibility.h" + +namespace paimon { +class MemoryPool; + +/// This class represents a piece of memory. +/// +/// Supports two modes: +/// - Owning mode: holds a shared_ptr for lifetime management. +/// - Non-owning (view) mode: holds a raw pointer to external data. +/// The caller must ensure the underlying memory outlives this segment. +class PAIMON_EXPORT MemorySegment { + public: + MemorySegment() : data_(nullptr), size_(0) {} + + /// Wrap a shared_ptr to create an owning segment. + static MemorySegment Wrap(const std::shared_ptr& buffer) { + return MemorySegment(buffer); + } + + /// Create a non-owning segment that references external memory. + /// The caller must guarantee that `data` remains valid for the lifetime of this segment. + static MemorySegment WrapView(const char* data, int32_t size) { + return MemorySegment(data, size); + } + + static MemorySegment AllocateHeapMemory(int32_t size, MemoryPool* pool) { + assert(pool); + return Wrap(Bytes::AllocateBytes(size, pool)); + } + + MemorySegment(const MemorySegment& other) = default; + MemorySegment& operator=(const MemorySegment& other) = default; + + bool operator==(const MemorySegment& other) const { + if (this == &other) { + return true; + } + if (data_ == other.data_ && size_ == other.size_) { + return true; + } + if (!data_ || !other.data_) { + return false; + } + if (size_ != other.size_) { + return false; + } + return std::memcmp(data_, other.data_, size_) == 0; + } + + inline int32_t Size() const { + return size_; + } + + /// Returns the raw data pointer (valid for both owning and non-owning segments). + inline const char* Data() const { + return data_; + } + + inline char Get(int32_t index) const { + return *(data_ + index); + } + + /// Returns a mutable pointer to the data. Use with caution on non-owning segments. + inline char* MutableData() { + return const_cast(data_); + } + + inline void Put(int32_t index, char b) { + MutableData()[index] = b; + } + + inline void Get(int32_t index, Bytes* dst) const { + return Get(index, dst, /*offset=*/0, dst->size()); + } + + inline void Put(int32_t index, const Bytes& src) { + return Put(index, src, /*offset=*/0, src.size()); + } + + template + inline void Get(int32_t index, T* dst, int32_t offset, int32_t length) const { + assert(static_cast(dst->size()) >= (offset + length)); + assert(size_ >= (index + length)); + std::memcpy(const_cast(dst->data()) + offset, data_ + index, length); + } + + template + inline void Put(int32_t index, const T& src, int32_t offset, int32_t length) { + assert(static_cast(src.size()) >= (offset + length)); + assert(size_ >= (index + length)); + std::memcpy(MutableData() + index, src.data() + offset, length); + } + + template + T GetValue(int32_t index) const { + static_assert(std::is_trivially_copyable_v, "T must be trivially copyable"); + T value; + std::memcpy(&value, data_ + index, sizeof(T)); + return value; + } + + template + void PutValue(int32_t index, const T& value) { + static_assert(std::is_trivially_copyable_v, "T must be trivially copyable"); + + std::memcpy(MutableData() + index, &value, sizeof(T)); + } + + inline uint64_t GetLongBigEndian(int32_t index) const { + auto value = GetValue(index); + if constexpr (SystemByteOrder() == ByteOrder::PAIMON_LITTLE_ENDIAN) { + return EndianSwapValue(value); + } + return value; + } + + void CopyTo(int32_t offset, MemorySegment* target, int32_t target_offset, + int32_t num_bytes) const { + assert(offset >= 0); + assert(target_offset >= 0); + assert(num_bytes >= 0); + + std::memcpy(target->MutableData() + target_offset, data_ + offset, num_bytes); + } + + void CopyToUnsafe(int32_t offset, void* target, int32_t target_offset, + int32_t num_bytes) const { + std::memcpy(static_cast(target) + target_offset, data_ + offset, num_bytes); + } + + int32_t Compare(const MemorySegment& seg2, int32_t offset1, int32_t offset2, int32_t len) const; + + int32_t Compare(const MemorySegment& seg2, int32_t offset1, int32_t offset2, int32_t len1, + int32_t len2) const; + + bool EqualTo(const MemorySegment& seg2, int32_t offset1, int32_t offset2, int32_t length) const; + + std::shared_ptr GetOrCreateHeapMemory(MemoryPool* pool) const { + if (heap_memory_) { + return heap_memory_; + } + if (!data_) { + return nullptr; + } + auto copy = std::make_shared(size_, pool); + std::memcpy(const_cast(copy->data()), data_, size_); + return copy; + } + + private: + /// Owning constructor. + explicit MemorySegment(const std::shared_ptr& heap_memory) + : heap_memory_(heap_memory), + data_(heap_memory->data()), + size_(static_cast(heap_memory->size())) { + assert(heap_memory_); + } + + /// Non-owning constructor. + MemorySegment(const char* data, int32_t size) + : heap_memory_(nullptr), data_(data), size_(size) { + assert(data != nullptr || size == 0); + } + + std::shared_ptr heap_memory_; + const char* data_; + int32_t size_; +}; + +template <> +inline bool MemorySegment::GetValue(int32_t index) const { + return Get(index) != 0; +} + +template <> +inline void MemorySegment::PutValue(int32_t index, const bool& value) { + Put(index, static_cast(value ? 1 : 0)); +} + +} // namespace paimon diff --git a/src/paimon/common/memory/memory_segment_test.cpp b/src/paimon/common/memory/memory_segment_test.cpp new file mode 100644 index 0000000..0501295 --- /dev/null +++ b/src/paimon/common/memory/memory_segment_test.cpp @@ -0,0 +1,760 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/common/memory/memory_segment.h" + +#include +#include +#include +#include + +#include "gtest/gtest.h" +#include "paimon/common/utils/date_time_utils.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { + +TEST(MemorySegmentTest, TestByteAccess) { + auto pool = paimon::GetDefaultPool(); + int32_t page_size = 64 * 1024; + MemorySegment segment = MemorySegment::AllocateHeapMemory(page_size, pool.get()); + + int64_t seed = DateTimeUtils::GetCurrentUTCTimeUs(); + std::srand(seed); + for (int32_t i = 0; i < page_size; i++) { + segment.Put(i, static_cast(std::rand())); + } + std::srand(seed); + for (int32_t i = 0; i < page_size; i++) { + ASSERT_EQ(segment.Get(i), static_cast(std::rand())) + << "seed: " << seed << ", idx: " << i; + } + + // test expected correct behavior, random access + + std::srand(seed); + bool* occupied = new bool[page_size]; + std::memset(occupied, 0, page_size * sizeof(bool)); + for (int32_t i = 0; i < 1000; i++) { + int32_t pos = std::rand() % page_size; + if (occupied[pos]) { + continue; + } else { + occupied[pos] = true; + } + segment.Put(pos, static_cast(std::rand())); + } + delete[] occupied; + + std::srand(seed); + occupied = new bool[page_size]; + std::memset(occupied, 0, page_size * sizeof(bool)); + + for (int32_t i = 0; i < 1000; i++) { + int32_t pos = std::rand() % page_size; + + if (occupied[pos]) { + continue; + } else { + occupied[pos] = true; + } + + ASSERT_EQ(segment.Get(pos), static_cast(std::rand())) + << "seed: " << seed << ", idx: " << pos; + } + delete[] occupied; +} + +TEST(MemorySegmentTest, TestBooleanAccess) { + auto pool = paimon::GetDefaultPool(); + int32_t page_size = 64 * 1024; + MemorySegment segment = MemorySegment::AllocateHeapMemory(page_size, pool.get()); + + int64_t seed = DateTimeUtils::GetCurrentUTCTimeUs(); + std::srand(seed); + bool* occupied = new bool[page_size]; + std::memset(occupied, 0, page_size * sizeof(bool)); + for (int32_t i = 0; i < 1000; i++) { + int32_t pos = std::rand() % page_size; + if (occupied[pos]) { + continue; + } else { + occupied[pos] = true; + } + segment.PutValue(pos, static_cast(std::rand() % 2)); + } + delete[] occupied; + + std::srand(seed); + occupied = new bool[page_size]; + std::memset(occupied, 0, page_size * sizeof(bool)); + + for (int32_t i = 0; i < 1000; i++) { + int32_t pos = std::rand() % page_size; + if (occupied[pos]) { + continue; + } else { + occupied[pos] = true; + } + + ASSERT_EQ(segment.GetValue(pos), static_cast(std::rand() % 2)) + << "seed: " << seed << ", idx: " << pos; + } + delete[] occupied; +} + +TEST(MemorySegmentTest, TestEqualTo) { + auto pool = paimon::GetDefaultPool(); + int32_t page_size = 64 * 1024; + MemorySegment seg1 = MemorySegment::AllocateHeapMemory(page_size, pool.get()); + MemorySegment seg2 = MemorySegment::AllocateHeapMemory(page_size, pool.get()); + + Bytes reference_array(page_size, pool.get()); + seg1.Put(0, reference_array); + seg2.Put(0, reference_array); + + int32_t i = paimon::test::RandomNumber(0, (page_size - 8) - 1); + seg1.Put(i, static_cast(10)); + ASSERT_FALSE(seg1.EqualTo(seg2, i, i, 9)) << "rand value:" << i; + + seg1.Put(i, static_cast(0)); + ASSERT_TRUE(seg1.EqualTo(seg2, i, i, 9)) << "rand value:" << i; + + seg1.Put(i + 8, static_cast(10)); + ASSERT_FALSE(seg1.EqualTo(seg2, i, i, 9)) << "rand value:" << i; +} + +TEST(MemorySegmentTest, TestCompare) { + auto pool = paimon::GetDefaultPool(); + int32_t page_size = 64 * 1024; + MemorySegment seg1 = MemorySegment::AllocateHeapMemory(page_size, pool.get()); + MemorySegment seg2 = MemorySegment::AllocateHeapMemory(page_size, pool.get()); + + Bytes reference_array(page_size, pool.get()); + seg1.Put(0, reference_array); + seg2.Put(0, reference_array); + + int32_t i = paimon::test::RandomNumber(0, (page_size - 8) - 1); + seg1.Put(i, static_cast(10)); + ASSERT_GT(seg1.Compare(seg2, i, i, 9, 9), 0); + + seg1.Put(i, static_cast(0)); + ASSERT_EQ(seg1.Compare(seg2, i, i, 9, 9), 0); + + seg2.Put(i + 8, static_cast(10)); + ASSERT_EQ(seg1.Compare(seg2, i, i, 7, 7), 0); + ASSERT_LT(seg1.Compare(seg2, i, i, 9, 9), 0); + + // Verify big-endian byte-order comparison semantics within a single 8-byte block. + // On little-endian machines, naive native-endian uint64 comparison would give wrong results. + MemorySegment seg3 = MemorySegment::AllocateHeapMemory(16, pool.get()); + MemorySegment seg4 = MemorySegment::AllocateHeapMemory(16, pool.get()); + Bytes zeros(16, pool.get()); + seg3.Put(0, zeros); + seg4.Put(0, zeros); + + // seg3: [0x00, 0x01, 0, 0, 0, 0, 0, 0] at offset 0 + // seg4: [0x01, 0x00, 0, 0, 0, 0, 0, 0] at offset 0 + // Lexicographic (byte-order) comparison: first byte 0x00 < 0x01, so seg3 < seg4. + seg3.Put(1, static_cast(0x01)); + seg4.Put(0, static_cast(0x01)); + ASSERT_LT(seg3.Compare(seg4, 0, 0, 8), 0); + ASSERT_GT(seg4.Compare(seg3, 0, 0, 8), 0); + + // seg3: [0x01, 0x02, 0, 0, 0, 0, 0, 0] + // seg4: [0x01, 0x01, 0, 0, 0, 0, 0, 0] + // First bytes equal (0x01 == 0x01), second byte 0x02 > 0x01, so seg3 > seg4. + seg3.Put(0, static_cast(0x01)); + seg3.Put(1, static_cast(0x02)); + seg4.Put(0, static_cast(0x01)); + seg4.Put(1, static_cast(0x01)); + ASSERT_GT(seg3.Compare(seg4, 0, 0, 8), 0); + ASSERT_LT(seg4.Compare(seg3, 0, 0, 8), 0); +} + +TEST(MemorySegmentTest, TestCharAccess) { + auto pool = paimon::GetDefaultPool(); + int32_t page_size = 64 * 1024; + MemorySegment segment = MemorySegment::AllocateHeapMemory(page_size, pool.get()); + + int64_t seed = DateTimeUtils::GetCurrentUTCTimeUs(); + std::srand(seed); + + for (int32_t i = 0; i <= page_size - 2; i += 2) { + segment.PutValue(i, static_cast(std::rand() % (CHAR_MAX))); + } + + std::srand(seed); + for (int32_t i = 0; i <= page_size - 2; i += 2) { + ASSERT_EQ(segment.GetValue(i), static_cast(std::rand() % (CHAR_MAX))) + << "seed: " << seed << ", idx: " << i; + } + + // test expected correct behavior, random access + + std::srand(seed); + bool* occupied = new bool[page_size]; + std::memset(occupied, 0, page_size * sizeof(bool)); + for (int32_t i = 0; i < 1000; i++) { + int32_t pos = std::rand() % (page_size - 1); + if (occupied[pos] || occupied[pos + 1]) { + continue; + } else { + occupied[pos] = true; + occupied[pos + 1] = true; + } + segment.PutValue(pos, static_cast(std::rand() % (CHAR_MAX))); + } + delete[] occupied; + + std::srand(seed); + occupied = new bool[page_size]; + std::memset(occupied, 0, page_size * sizeof(bool)); + + for (int32_t i = 0; i < 1000; i++) { + int32_t pos = std::rand() % (page_size - 1); + if (occupied[pos] || occupied[pos + 1]) { + continue; + } else { + occupied[pos] = true; + occupied[pos + 1] = true; + } + + ASSERT_EQ(segment.GetValue(pos), static_cast(std::rand() % (CHAR_MAX))) + << "seed: " << seed << ", idx:" << pos; + } + delete[] occupied; +} + +TEST(MemorySegmentTest, TestShortAccess) { + auto pool = paimon::GetDefaultPool(); + int32_t page_size = 64 * 1024; + MemorySegment segment = MemorySegment::AllocateHeapMemory(page_size, pool.get()); + + int64_t seed = DateTimeUtils::GetCurrentUTCTimeUs(); + std::srand(seed); + + for (int32_t i = 0; i <= page_size - 2; i += 2) { + segment.PutValue(i, static_cast(std::rand())); + } + + std::srand(seed); + for (int32_t i = 0; i <= page_size - 2; i += 2) { + ASSERT_EQ(segment.GetValue(i), static_cast(std::rand())) + << "seed: " << seed << ", idx:" << i; + } + + // test expected correct behavior, random access + + std::srand(seed); + bool* occupied = new bool[page_size]; + std::memset(occupied, 0, page_size * sizeof(bool)); + for (int32_t i = 0; i < 1000; i++) { + int32_t pos = std::rand() % (page_size - 1); + if (occupied[pos] || occupied[pos + 1]) { + continue; + } else { + occupied[pos] = true; + occupied[pos + 1] = true; + } + segment.PutValue(pos, static_cast(std::rand())); + } + delete[] occupied; + + std::srand(seed); + occupied = new bool[page_size]; + std::memset(occupied, 0, page_size * sizeof(bool)); + + for (int32_t i = 0; i < 1000; i++) { + int32_t pos = std::rand() % (page_size - 1); + if (occupied[pos] || occupied[pos + 1]) { + continue; + } else { + occupied[pos] = true; + occupied[pos + 1] = true; + } + + ASSERT_EQ(segment.GetValue(pos), static_cast(std::rand())) + << "seed: " << seed << ", idx:" << pos; + } + delete[] occupied; +} + +TEST(MemorySegmentTest, TestIntAccess) { + auto pool = paimon::GetDefaultPool(); + int32_t page_size = 64 * 1024; + MemorySegment segment = MemorySegment::AllocateHeapMemory(page_size, pool.get()); + + int64_t seed = DateTimeUtils::GetCurrentUTCTimeUs(); + std::srand(seed); + + for (int32_t i = 0; i <= page_size - 4; i += 4) { + segment.PutValue(i, static_cast(std::rand())); + } + + std::srand(seed); + for (int32_t i = 0; i <= page_size - 4; i += 4) { + ASSERT_EQ(segment.GetValue(i), static_cast(std::rand())) + << "seed: " << seed << ", idx:" << i; + } + + // test expected correct behavior, random access + + std::srand(seed); + bool* occupied = new bool[page_size]; + std::memset(occupied, 0, page_size * sizeof(bool)); + for (int32_t i = 0; i < 1000; i++) { + int32_t pos = std::rand() % (page_size - 3); + if (occupied[pos] || occupied[pos + 1] || occupied[pos + 2] || occupied[pos + 3]) { + continue; + } else { + occupied[pos] = true; + occupied[pos + 1] = true; + occupied[pos + 2] = true; + occupied[pos + 3] = true; + } + segment.PutValue(pos, static_cast(std::rand())); + } + delete[] occupied; + + std::srand(seed); + occupied = new bool[page_size]; + std::memset(occupied, 0, page_size * sizeof(bool)); + + for (int32_t i = 0; i < 1000; i++) { + int32_t pos = std::rand() % (page_size - 3); + if (occupied[pos] || occupied[pos + 1] || occupied[pos + 2] || occupied[pos + 3]) { + continue; + } else { + occupied[pos] = true; + occupied[pos + 1] = true; + occupied[pos + 2] = true; + occupied[pos + 3] = true; + } + + ASSERT_EQ(segment.GetValue(pos), static_cast(std::rand())) + << "seed: " << seed << ", idx:" << pos; + } + delete[] occupied; +} + +TEST(MemorySegmentTest, TestLongAccess) { + auto pool = paimon::GetDefaultPool(); + auto lrand = []() -> int64_t { + return (static_cast(std::rand()) << (sizeof(int32_t) * 8)) | std::rand(); + }; + int32_t page_size = 64 * 1024; + MemorySegment segment = MemorySegment::AllocateHeapMemory(page_size, pool.get()); + + int64_t seed = DateTimeUtils::GetCurrentUTCTimeUs(); + std::srand(seed); + + for (int32_t i = 0; i <= page_size - 8; i += 8) { + segment.PutValue(i, lrand()); + } + + std::srand(seed); + for (int32_t i = 0; i <= page_size - 8; i += 8) { + ASSERT_EQ(segment.GetValue(i), lrand()) << "seed: " << seed << ", idx:" << i; + } + + // test expected correct behavior, random access + + std::srand(seed); + bool* occupied = new bool[page_size]; + std::memset(occupied, 0, page_size * sizeof(bool)); + for (int32_t i = 0; i < 1000; i++) { + int32_t pos = std::rand() % (page_size - 7); + if (occupied[pos] || occupied[pos + 1] || occupied[pos + 2] || occupied[pos + 3] || + occupied[pos + 4] || occupied[pos + 5] || occupied[pos + 6] || occupied[pos + 7]) { + continue; + } else { + occupied[pos] = true; + occupied[pos + 1] = true; + occupied[pos + 2] = true; + occupied[pos + 3] = true; + occupied[pos + 4] = true; + occupied[pos + 5] = true; + occupied[pos + 6] = true; + occupied[pos + 7] = true; + } + segment.PutValue(pos, lrand()); + } + delete[] occupied; + + std::srand(seed); + occupied = new bool[page_size]; + std::memset(occupied, 0, page_size * sizeof(bool)); + + for (int32_t i = 0; i < 1000; i++) { + int32_t pos = std::rand() % (page_size - 7); + if (occupied[pos] || occupied[pos + 1] || occupied[pos + 2] || occupied[pos + 3] || + occupied[pos + 4] || occupied[pos + 5] || occupied[pos + 6] || occupied[pos + 7]) { + continue; + } else { + occupied[pos] = true; + occupied[pos + 1] = true; + occupied[pos + 2] = true; + occupied[pos + 3] = true; + occupied[pos + 4] = true; + occupied[pos + 5] = true; + occupied[pos + 6] = true; + occupied[pos + 7] = true; + } + + ASSERT_EQ(segment.GetValue(pos), lrand()) << "seed: " << seed << ", idx:" << pos; + } + delete[] occupied; +} + +TEST(MemorySegmentTest, TestFloatAccess) { + auto pool = paimon::GetDefaultPool(); + auto frand = []() -> int64_t { + return (static_cast(std::rand()) / static_cast(RAND_MAX)); + }; + + int32_t page_size = 64 * 1024; + MemorySegment segment = MemorySegment::AllocateHeapMemory(page_size, pool.get()); + + int64_t seed = DateTimeUtils::GetCurrentUTCTimeUs(); + std::srand(seed); + + for (int32_t i = 0; i <= page_size - 4; i += 4) { + segment.PutValue(i, frand()); + } + + std::srand(seed); + for (int32_t i = 0; i <= page_size - 4; i += 4) { + ASSERT_EQ(segment.GetValue(i), frand()) << "seed: " << seed << ", idx:" << i; + } + + // test expected correct behavior, random access + + std::srand(seed); + bool* occupied = new bool[page_size]; + std::memset(occupied, 0, page_size * sizeof(bool)); + for (int32_t i = 0; i < 1000; i++) { + int32_t pos = std::rand() % (page_size - 3); + if (occupied[pos] || occupied[pos + 1] || occupied[pos + 2] || occupied[pos + 3]) { + continue; + } else { + occupied[pos] = true; + occupied[pos + 1] = true; + occupied[pos + 2] = true; + occupied[pos + 3] = true; + } + segment.PutValue(pos, frand()); + } + delete[] occupied; + + std::srand(seed); + occupied = new bool[page_size]; + std::memset(occupied, 0, page_size * sizeof(bool)); + + for (int32_t i = 0; i < 1000; i++) { + int32_t pos = std::rand() % (page_size - 3); + if (occupied[pos] || occupied[pos + 1] || occupied[pos + 2] || occupied[pos + 3]) { + continue; + } else { + occupied[pos] = true; + occupied[pos + 1] = true; + occupied[pos + 2] = true; + occupied[pos + 3] = true; + } + + ASSERT_EQ(segment.GetValue(pos), frand()) << "seed: " << seed << ", idx:" << pos; + } + delete[] occupied; +} + +TEST(MemorySegmentTest, TestDoubleAccess) { + auto pool = paimon::GetDefaultPool(); + auto lrand = []() -> int64_t { + return (static_cast(std::rand()) << (sizeof(int32_t) * 8)) | std::rand(); + }; + auto drand = [&]() -> int64_t { + return (static_cast(lrand()) / + static_cast(std::numeric_limits::max())); + }; + + int32_t page_size = 64 * 1024; + MemorySegment segment = MemorySegment::AllocateHeapMemory(page_size, pool.get()); + + int64_t seed = DateTimeUtils::GetCurrentUTCTimeUs(); + std::srand(seed); + + for (int32_t i = 0; i <= page_size - 8; i += 8) { + segment.PutValue(i, drand()); + } + + std::srand(seed); + for (int32_t i = 0; i <= page_size - 8; i += 8) { + ASSERT_EQ(segment.GetValue(i), drand()) << "seed: " << seed << ", idx:" << i; + } + + // test expected correct behavior, random access + + std::srand(seed); + bool* occupied = new bool[page_size]; + std::memset(occupied, 0, page_size * sizeof(bool)); + for (int32_t i = 0; i < 1000; i++) { + int32_t pos = std::rand() % (page_size - 7); + if (occupied[pos] || occupied[pos + 1] || occupied[pos + 2] || occupied[pos + 3] || + occupied[pos + 4] || occupied[pos + 5] || occupied[pos + 6] || occupied[pos + 7]) { + continue; + } else { + occupied[pos] = true; + occupied[pos + 1] = true; + occupied[pos + 2] = true; + occupied[pos + 3] = true; + occupied[pos + 4] = true; + occupied[pos + 5] = true; + occupied[pos + 6] = true; + occupied[pos + 7] = true; + } + segment.PutValue(pos, drand()); + } + delete[] occupied; + + std::srand(seed); + occupied = new bool[page_size]; + std::memset(occupied, 0, page_size * sizeof(bool)); + + for (int32_t i = 0; i < 1000; i++) { + int32_t pos = std::rand() % (page_size - 7); + if (occupied[pos] || occupied[pos + 1] || occupied[pos + 2] || occupied[pos + 3] || + occupied[pos + 4] || occupied[pos + 5] || occupied[pos + 6] || occupied[pos + 7]) { + continue; + } else { + occupied[pos] = true; + occupied[pos + 1] = true; + occupied[pos + 2] = true; + occupied[pos + 3] = true; + occupied[pos + 4] = true; + occupied[pos + 5] = true; + occupied[pos + 6] = true; + occupied[pos + 7] = true; + } + + ASSERT_EQ(segment.GetValue(pos), drand()) << "seed: " << seed << ", idx:" << pos; + } + delete[] occupied; +} + +// ------------------------------------------------------------------------ +// Bulk Byte Movements +// ------------------------------------------------------------------------ + +TEST(MemorySegmentTest, TestBulkByteAccess) { + auto pool = paimon::GetDefaultPool(); + // test expected correct behavior with default offset / length + auto rand_bytes = [&](int32_t size) -> std::shared_ptr { + auto bytes = Bytes::AllocateBytes(size, pool.get()); + for (int32_t i = 0; i < static_cast(bytes->size()); i++) { + (*bytes)[i] = static_cast(std::rand()); + } + return bytes; + }; + { + int32_t page_size = 64 * 1024; + MemorySegment segment = MemorySegment::AllocateHeapMemory(page_size, pool.get()); + + int64_t seed = DateTimeUtils::GetCurrentUTCTimeUs(); + std::srand(seed); + for (int32_t i = 0; i < 8; i++) { + auto src = rand_bytes(page_size / 8); + segment.Put(i * (page_size / 8), *src); + } + + std::srand(seed); + + for (int32_t i = 0; i < 8; i++) { + std::shared_ptr expected = rand_bytes(page_size / 8); + std::shared_ptr actual = Bytes::AllocateBytes(page_size / 8, pool.get()); + segment.Get(i * (page_size / 8), actual.get()); + ASSERT_EQ(expected->size(), actual->size()) << "seed: " << seed << ", i:" << i; + ASSERT_EQ(std::memcmp(expected->data(), actual->data(), expected->size()), 0) + << "seed: " << seed << ", i:" << i; + } + } + + // test expected correct behavior with specific offset / length + { + int32_t page_size = 64 * 1024; + MemorySegment segment = MemorySegment::AllocateHeapMemory(page_size, pool.get()); + + int64_t seed = DateTimeUtils::GetCurrentUTCTimeUs(); + std::srand(seed); + std::shared_ptr expected = rand_bytes(page_size); + + for (int32_t i = 0; i < 16; i++) { + segment.Put(i * (page_size / 16), *expected, i * (page_size / 16), page_size / 16); + } + auto actual = Bytes::AllocateBytes(page_size, pool.get()); + for (int32_t i = 0; i < 16; i++) { + segment.Get(i * (page_size / 16), actual.get(), i * (page_size / 16), page_size / 16); + } + ASSERT_EQ(expected->size(), actual->size()) << "seed: " << seed; + ASSERT_EQ(std::memcmp(expected->data(), actual->data(), expected->size()), 0) + << "seed: " << seed; + } + // put segments of various lengths to various positions + { + int64_t seed = DateTimeUtils::GetCurrentUTCTimeUs(); + std::srand(seed); + int32_t page_size = 64 * 1024; + MemorySegment segment = MemorySegment::AllocateHeapMemory(page_size, pool.get()); + auto expected = Bytes::AllocateBytes(page_size, pool.get()); + segment.Put(0, *expected, 0, page_size); + for (int32_t i = 0; i < 200; i++) { + int32_t num_bytes = std::rand() % (page_size - 10) + 1; + int32_t pos = std::rand() % (page_size - num_bytes + 1); + std::shared_ptr data = rand_bytes((std::rand() % 3 + 1) * num_bytes); + int32_t data_start_pos = std::rand() % (data->size() - num_bytes + 1); + + // copy to the expected + std::memcpy(expected->data() + pos, data->data() + data_start_pos, num_bytes); + + // put to the memory segment + segment.Put(pos, *data, data_start_pos, num_bytes); + } + + auto validation = Bytes::AllocateBytes(page_size, pool.get()); + segment.Get(0, validation.get()); + ASSERT_EQ(std::memcmp(validation->data(), expected->data(), expected->size()), 0) + << "seed: " << seed; + } + // get segments with various contents + { + int64_t seed = DateTimeUtils::GetCurrentUTCTimeUs(); + std::srand(seed); + int32_t page_size = 64 * 1024; + MemorySegment segment = MemorySegment::AllocateHeapMemory(page_size, pool.get()); + std::shared_ptr contents = rand_bytes(page_size); + segment.Put(0, *contents); + + for (int32_t i = 0; i < 200; i++) { + int32_t num_bytes = std::rand() % (page_size / 8) + 1; + int32_t pos = std::rand() % (page_size - num_bytes + 1); + std::shared_ptr data = rand_bytes((std::rand() % 3 + 1) * num_bytes); + int32_t data_start_pos = std::rand() % (data->size() - num_bytes + 1); + + segment.Get(pos, data.get(), data_start_pos, num_bytes); + Bytes expected(num_bytes, pool.get()); + std::memcpy(expected.data(), contents->data() + pos, num_bytes); + Bytes validation(num_bytes, pool.get()); + std::memcpy(validation.data(), data->data() + data_start_pos, num_bytes); + ASSERT_EQ(std::memcmp(validation.data(), expected.data(), expected.size()), 0) + << "seed: " << seed << ", i:" << i; + } + } +} + +TEST(MemorySegmentTest, TestEqual) { + auto pool = paimon::GetDefaultPool(); + auto seg1 = MemorySegment::Wrap(std::make_shared("abcd", pool.get())); + auto seg2 = MemorySegment::Wrap(std::make_shared("abce", pool.get())); + auto seg3 = MemorySegment::Wrap(std::make_shared("abcd", pool.get())); + ASSERT_EQ(seg1, seg1); + ASSERT_EQ(seg1, seg3); + ASSERT_FALSE(seg1 == seg2); + ASSERT_FALSE(seg2 == seg1); +} + +TEST(MemorySegmentTest, TestNonOwningWrapView) { + // Prepare owning data as source + auto pool = paimon::GetDefaultPool(); + std::string source_data = "Hello, WrapView MemorySegment!"; + auto owning_bytes = std::make_shared(source_data, pool.get()); + const char* raw_ptr = owning_bytes->data(); + auto raw_size = static_cast(owning_bytes->size()); + + // Create non-owning segment via WrapView + auto seg = MemorySegment::WrapView(raw_ptr, raw_size); + + // --- Data() / Size() --- + ASSERT_EQ(seg.Data(), raw_ptr); + ASSERT_EQ(seg.Size(), raw_size); + + // --- Get(index) --- + for (int32_t i = 0; i < raw_size; ++i) { + ASSERT_EQ(seg.Get(i), source_data[i]); + } + + // --- GetValue --- + // Read first 4 bytes as int32 + int32_t expected_int; + std::memcpy(&expected_int, raw_ptr, sizeof(int32_t)); + ASSERT_EQ(seg.GetValue(0), expected_int); + + // Read first 8 bytes as int64 + int64_t expected_long; + std::memcpy(&expected_long, raw_ptr, sizeof(int64_t)); + ASSERT_EQ(seg.GetValue(0), expected_long); + + // --- MutableData() + Put(index, char) --- + char original_char = seg.Get(0); + seg.Put(0, 'X'); + ASSERT_EQ(seg.Get(0), 'X'); + seg.Put(0, original_char); // restore + ASSERT_EQ(seg.Get(0), original_char); + + // --- Put(index, src, offset, length) --- + std::string patch = "AB"; + seg.Put(0, patch, 0, 2); + ASSERT_EQ(seg.Get(0), 'A'); + ASSERT_EQ(seg.Get(1), 'B'); + + // --- PutValue --- + int16_t val16 = 0x1234; + seg.PutValue(0, val16); + ASSERT_EQ(seg.GetValue(0), val16); + + // --- Compare --- + auto seg2 = MemorySegment::WrapView(raw_ptr, raw_size); + // Both point to same data (we mutated seg's underlying data, seg2 sees it too) + ASSERT_EQ(seg.Compare(seg2, 0, 0, raw_size), 0); + + // --- EqualTo --- + ASSERT_TRUE(seg.EqualTo(seg2, 2, 2, raw_size - 2)); + + // --- CopyTo owning target --- + auto target = MemorySegment::AllocateHeapMemory(raw_size, pool.get()); + seg.CopyTo(0, &target, 0, raw_size); + ASSERT_EQ(seg.Compare(target, 0, 0, raw_size), 0); + + // --- CopyToUnsafe --- + std::vector buf(raw_size); + seg.CopyToUnsafe(0, buf.data(), 0, raw_size); + ASSERT_EQ(std::memcmp(buf.data(), seg.Data(), raw_size), 0); + + // --- GetOrCreateHeapMemory on non-owning: should copy --- + auto heap = seg.GetOrCreateHeapMemory(pool.get()); + ASSERT_NE(heap, nullptr); + ASSERT_EQ(static_cast(heap->size()), raw_size); + // Returned copy should be independent: modifying seg shouldn't affect heap + seg.Put(0, 'Z'); + ASSERT_NE((*heap)[0], 'Z'); + + // --- operator== --- + auto seg3 = MemorySegment::WrapView(seg.Data(), seg.Size()); + ASSERT_EQ(seg, seg3); +} +} // namespace paimon::test diff --git a/src/paimon/common/memory/memory_segment_utils.cpp b/src/paimon/common/memory/memory_segment_utils.cpp new file mode 100644 index 0000000..2801854 --- /dev/null +++ b/src/paimon/common/memory/memory_segment_utils.cpp @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/common/memory/memory_segment_utils.h" + +#include + +#include "paimon/common/utils/murmurhash_utils.h" + +namespace paimon { +std::shared_ptr MemorySegmentUtils::AllocateBytes(int32_t length, MemoryPool* pool) { + return Bytes::AllocateBytes(length, pool); +} + +void MemorySegmentUtils::CopyFromBytes(std::vector* segments, int32_t offset, + const Bytes& bytes, int32_t bytes_offset, + int32_t num_bytes) { + if (segments->size() == 1) { + (*segments)[0].Put(offset, bytes, bytes_offset, num_bytes); + } else { + CopyMultiSegmentsFromBytes(segments, offset, bytes, bytes_offset, num_bytes); + } +} + +void MemorySegmentUtils::CopyMultiSegmentsFromBytes(std::vector* segments, + int32_t offset, const Bytes& bytes, + int32_t bytes_offset, int32_t num_bytes) { + int32_t remain_size = num_bytes; + for (auto& segment : (*segments)) { + int32_t remain = segment.Size() - offset; + if (remain > 0) { + int32_t n_copy = std::min(remain, remain_size); + segment.Put(offset, bytes, num_bytes - remain_size + bytes_offset, n_copy); + remain_size -= n_copy; + // next new segment. + offset = 0; + if (remain_size == 0) { + return; + } + } else { + // remain is negative, let's advance to next segment + // now the offset = offset - segmentSize (-remain) + offset = -remain; + } + } +} + +PAIMON_UNIQUE_PTR MemorySegmentUtils::CopyToBytes(const std::vector& segments, + int32_t offset, int32_t num_bytes, + MemoryPool* pool) { + assert(pool); + auto bytes = Bytes::AllocateBytes(num_bytes, pool); + CopyToBytes(segments, offset, bytes.get(), 0, num_bytes); + return bytes; +} + +void MemorySegmentUtils::CopyToUnsafe(const std::vector& segments, int32_t offset, + void* target, int32_t num_bytes) { + if (InFirstSegment(segments, offset, num_bytes)) { + segments[0].CopyToUnsafe(offset, target, 0, num_bytes); + } else { + CopyMultiSegmentsToUnsafe(segments, offset, target, num_bytes); + } +} + +void MemorySegmentUtils::CopyMultiSegmentsToUnsafe(const std::vector& segments, + int32_t offset, void* target, + int32_t num_bytes) { + int32_t remain_size = num_bytes; + for (const auto& segment : segments) { + int32_t remain = segment.Size() - offset; + if (remain > 0) { + int32_t n_copy = std::min(remain, remain_size); + segment.CopyToUnsafe(offset, target, num_bytes - remain_size, n_copy); + remain_size -= n_copy; + // next new segment. + offset = 0; + if (remain_size == 0) { + return; + } + } else { + // remain is negative, let's advance to next segment + // now the offset = offset - segmentSize (-remain) + offset = -remain; + } + } +} + +std::shared_ptr MemorySegmentUtils::GetBytes(const std::vector& segments, + int32_t base_offset, int32_t size_in_bytes, + MemoryPool* pool) { + // avoid copy if `base` is `byte[]` + if (segments.size() == 1) { + std::shared_ptr heap_memory = segments[0].GetOrCreateHeapMemory(pool); + if (base_offset == 0 && heap_memory != nullptr && + static_cast(heap_memory->size()) == size_in_bytes) { + return heap_memory; + } else { + std::shared_ptr bytes = Bytes::AllocateBytes(size_in_bytes, pool); + segments[0].Get(base_offset, bytes.get(), 0, size_in_bytes); + return bytes; + } + } else { + std::shared_ptr bytes = Bytes::AllocateBytes(size_in_bytes, pool); + CopyMultiSegmentsToBytes(segments, base_offset, bytes.get(), 0, size_in_bytes); + return bytes; + } +} + +bool MemorySegmentUtils::InFirstSegment(const std::vector& segments, int32_t offset, + int32_t num_bytes) { + return num_bytes + offset <= segments[0].Size(); +} + +int32_t MemorySegmentUtils::ByteIndex(int32_t bit_index) { + return (static_cast(bit_index)) >> ADDRESS_BITS_PER_WORD; +} + +void MemorySegmentUtils::BitUnSet(MemorySegment* segment, int32_t base_offset, int32_t index) { + int32_t offset = base_offset + ByteIndex(index); + char current = segment->Get(offset); + current &= ~(1 << (index & BIT_BYTE_INDEX_MASK)); + segment->Put(offset, current); +} + +void MemorySegmentUtils::BitSet(MemorySegment* segment, int32_t base_offset, int32_t index) { + int32_t offset = base_offset + ByteIndex(index); + char current = segment->Get(offset); + current |= (1 << (index & BIT_BYTE_INDEX_MASK)); + segment->Put(offset, current); +} + +bool MemorySegmentUtils::BitGet(const MemorySegment& segment, int32_t base_offset, int32_t index) { + int32_t offset = base_offset + ByteIndex(index); + char current = segment.Get(offset); + return (current & (1 << (index & BIT_BYTE_INDEX_MASK))) != 0; +} + +void MemorySegmentUtils::BitSet(std::vector* segments, int32_t base_offset, + int32_t index) { + if (segments->size() == 1) { + int32_t offset = base_offset + ByteIndex(index); + MemorySegment& segment = (*segments)[0]; + char current = segment.Get(offset); + current |= (1 << (index & BIT_BYTE_INDEX_MASK)); + segment.Put(offset, current); + } else { + BitSetMultiSegments(segments, base_offset, index); + } +} + +void MemorySegmentUtils::BitSetMultiSegments(std::vector* segments, + int32_t base_offset, int32_t index) { + int32_t offset = base_offset + ByteIndex(index); + int32_t seg_size = (*segments)[0].Size(); + int32_t seg_index = offset / seg_size; + int32_t seg_offset = offset - seg_index * seg_size; // equal to % + MemorySegment& segment = (*segments)[seg_index]; + + char current = segment.Get(seg_offset); + current |= (1 << (index & BIT_BYTE_INDEX_MASK)); + segment.Put(seg_offset, current); +} + +bool MemorySegmentUtils::BitGet(const std::vector& segments, int32_t base_offset, + int32_t index) { + int32_t offset = base_offset + ByteIndex(index); + char current = GetValue(segments, offset); + return (current & (1 << (index & BIT_BYTE_INDEX_MASK))) != 0; +} + +void MemorySegmentUtils::BitUnSet(std::vector* segments, int32_t base_offset, + int32_t index) { + if (segments->size() == 1) { + MemorySegment& segment = (*segments)[0]; + int32_t offset = base_offset + ByteIndex(index); + char current = segment.Get(offset); + current &= ~(1 << (index & BIT_BYTE_INDEX_MASK)); + segment.Put(offset, current); + } else { + BitUnSetMultiSegments(segments, base_offset, index); + } +} + +void MemorySegmentUtils::BitUnSetMultiSegments(std::vector* segments, + int32_t base_offset, int32_t index) { + int32_t offset = base_offset + ByteIndex(index); + int32_t seg_size = (*segments)[0].Size(); + int32_t seg_index = offset / seg_size; + int32_t seg_offset = offset - seg_index * seg_size; // equal to % + MemorySegment& segment = (*segments)[seg_index]; + + char current = segment.Get(seg_offset); + current &= ~(1 << (index & BIT_BYTE_INDEX_MASK)); + segment.Put(seg_offset, current); +} + +bool MemorySegmentUtils::Equals(const std::vector& segments1, int32_t offset1, + const std::vector& segments2, int32_t offset2, + int32_t len) { + if (InFirstSegment(segments1, offset1, len) && InFirstSegment(segments2, offset2, len)) { + return segments1[0].EqualTo(segments2[0], offset1, offset2, len); + } else { + return EqualsMultiSegments(segments1, offset1, segments2, offset2, len); + } +} + +bool MemorySegmentUtils::EqualsMultiSegments(const std::vector& segments1, + int32_t offset1, + const std::vector& segments2, + int32_t offset2, int32_t len) { + if (len == 0) { + // quick way and avoid seg_size is zero. + return true; + } + + int32_t seg_size1 = segments1[0].Size(); + int32_t seg_size2 = segments2[0].Size(); + + // find first seg_index and seg_offset of segments. + int32_t seg_index1 = offset1 / seg_size1; + int32_t seg_index2 = offset2 / seg_size2; + int32_t seg_offset1 = offset1 - seg_size1 * seg_index1; // equal to % + int32_t seg_offset2 = offset2 - seg_size2 * seg_index2; // equal to % + + while (len > 0) { + int32_t equal_len = + std::min(std::min(len, seg_size1 - seg_offset1), seg_size2 - seg_offset2); + if (!segments1[seg_index1].EqualTo(segments2[seg_index2], seg_offset1, seg_offset2, + equal_len)) { + return false; + } + len -= equal_len; + seg_offset1 += equal_len; + if (seg_offset1 == seg_size1) { + seg_offset1 = 0; + seg_index1++; + } + seg_offset2 += equal_len; + if (seg_offset2 == seg_size2) { + seg_offset2 = 0; + seg_index2++; + } + } + return true; +} + +int32_t MemorySegmentUtils::Find(const std::vector& segments1, int32_t offset1, + int32_t num_bytes1, const std::vector& segments2, + int32_t offset2, int32_t num_bytes2) { + if (num_bytes2 == 0) { // quick way 1. + return offset1; + } + if (InFirstSegment(segments1, offset1, num_bytes1) && + InFirstSegment(segments2, offset2, num_bytes2)) { + char first = segments2[0].Get(offset2); + int32_t end = num_bytes1 - num_bytes2 + offset1; + for (int32_t i = offset1; i <= end; i++) { + // quick way 2: equal first byte. + if (segments1[0].Get(i) == first && + segments1[0].EqualTo(segments2[0], i, offset2, num_bytes2)) { + return i; + } + } + return -1; + } else { + return FindInMultiSegments(segments1, offset1, num_bytes1, segments2, offset2, num_bytes2); + } +} +int32_t MemorySegmentUtils::FindInMultiSegments(const std::vector& segments1, + int32_t offset1, int32_t num_bytes1, + const std::vector& segments2, + int32_t offset2, int32_t num_bytes2) { + int32_t end = num_bytes1 - num_bytes2 + offset1; + for (int32_t i = offset1; i <= end; i++) { + if (EqualsMultiSegments(segments1, i, segments2, offset2, num_bytes2)) { + return i; + } + } + return -1; +} + +int32_t MemorySegmentUtils::Hash(const std::vector& segments, int32_t offset, + int32_t num_bytes, MemoryPool* pool) { + if (InFirstSegment(segments, offset, num_bytes)) { + return MurmurHashUtils::HashBytes(segments[0], offset, num_bytes); + } else { + return HashMultiSeg(segments, offset, num_bytes, pool); + } +} + +int32_t MemorySegmentUtils::HashByWords(const std::vector& segments, int32_t offset, + int32_t num_bytes, MemoryPool* pool) { + if (InFirstSegment(segments, offset, num_bytes)) { + return MurmurHashUtils::HashBytesByWords(segments[0], offset, num_bytes); + } else { + return HashMultiSegByWords(segments, offset, num_bytes, pool); + } +} + +int32_t MemorySegmentUtils::HashMultiSegByWords(const std::vector& segments, + int32_t offset, int32_t num_bytes, + MemoryPool* pool) { + std::shared_ptr bytes = AllocateBytes(num_bytes, pool); + CopyMultiSegmentsToBytes(segments, offset, bytes.get(), 0, num_bytes); + return MurmurHashUtils::HashUnsafeBytesByWords(reinterpret_cast(bytes->data()), 0, + num_bytes); +} + +int32_t MemorySegmentUtils::HashMultiSeg(const std::vector& segments, int32_t offset, + int32_t num_bytes, MemoryPool* pool) { + std::shared_ptr bytes = AllocateBytes(num_bytes, pool); + CopyMultiSegmentsToBytes(segments, offset, bytes.get(), 0, num_bytes); + + return MurmurHashUtils::HashUnsafeBytes(reinterpret_cast(bytes->data()), 0, num_bytes); +} + +} // namespace paimon diff --git a/src/paimon/common/memory/memory_segment_utils.h b/src/paimon/common/memory/memory_segment_utils.h new file mode 100644 index 0000000..6208ff5 --- /dev/null +++ b/src/paimon/common/memory/memory_segment_utils.h @@ -0,0 +1,480 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include + +#include "fmt/format.h" +#include "paimon/common/io/memory_segment_output_stream.h" +#include "paimon/common/memory/memory_segment.h" +#include "paimon/io/byte_order.h" +#include "paimon/memory/bytes.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/status.h" +#include "paimon/type_fwd.h" +#include "paimon/visibility.h" + +namespace paimon { + +class PAIMON_EXPORT MemorySegmentUtils { + public: + MemorySegmentUtils() = delete; + ~MemorySegmentUtils() = delete; + + /// Allocate bytes in pool + static std::shared_ptr AllocateBytes(int32_t length, MemoryPool* pool); + + /// Copy target segments from source byte[]. + /// + /// @param segments target segments. + /// @param offset target segments offset. + /// @param bytes source byte[]. + /// @param bytes_offset source byte[] offset. + /// @param num_bytes the number bytes to copy. + static void CopyFromBytes(std::vector* segments, int32_t offset, + const Bytes& bytes, int32_t bytes_offset, int32_t num_bytes); + + /// Copy segments to a new byte[]. + /// + /// @param segments Source segments. + /// @param offset Source segments offset. + /// @param num_bytes the number bytes to copy. + static PAIMON_UNIQUE_PTR CopyToBytes(const std::vector& segments, + int32_t offset, int32_t num_bytes, + MemoryPool* pool); + + /// Copy segments to target byte[]. + /// + /// @param segments Source segments. + /// @param offset Source segments offset. + /// @param bytes target byte[]. + /// @param bytes_offset target byte[] offset. + /// @param num_bytes the number bytes to copy. + template + static void CopyToBytes(const std::vector& segments, int32_t offset, T* bytes, + int32_t bytes_offset, int32_t num_bytes); + + /// Copy bytes of segments to output stream. + /// + /// @note It just copies the data in, not include the length. + /// + /// @param segments source segments + /// @param offset offset for segments + /// @param size_in_bytes size in bytes + /// @param target target output stream + static Status CopyToStream(const std::vector& segments, int32_t offset, + int32_t size_in_bytes, MemorySegmentOutputStream* target); + + /// Copy segments to target unsafe pointer. + /// + /// @param segments Source segments. + /// @param offset The position where the bytes are started to be read from these memory + /// segments. + /// @param target The unsafe memory to copy the bytes to. + /// @param num_bytes the number bytes to copy. + static void CopyToUnsafe(const std::vector& segments, int32_t offset, + void* target, int32_t num_bytes); + + template + static void CopyMultiSegmentsToBytes(const std::vector& segments, int32_t offset, + T* bytes, int32_t bytes_offset, int32_t num_bytes); + + static std::shared_ptr GetBytes(const std::vector& segments, + int32_t base_offset, int32_t size_in_bytes, + MemoryPool* pool); + + /// Is it just in first MemorySegment, we use quick way to do something. + static bool InFirstSegment(const std::vector& segments, int32_t offset, + int32_t num_bytes); + + /// unset bit. + /// + /// @param segment target segment. + /// @param base_offset bits base offset. + /// @param index bit index from base offset. + static void BitUnSet(MemorySegment* segment, int32_t base_offset, int32_t index); + + /// set bit. + /// + /// @param segment target segment. + /// @param base_offset bits base offset. + /// @param index bit index from base offset. + static void BitSet(MemorySegment* segment, int32_t base_offset, int32_t index); + + /// read bit. + /// + /// @param segment target segment. + /// @param base_offset bits base offset. + /// @param index bit index from base offset. + static bool BitGet(const MemorySegment& segment, int32_t base_offset, int32_t index); + + /// set bit from segments. + /// + /// @param segments target segments. + /// @param base_offset bits base offset. + /// @param index bit index from base offset. + static void BitSet(std::vector* segments, int32_t base_offset, int32_t index); + + /// read bit from segments. + /// + /// @param segments target segments. + /// @param base_offset bits base offset. + /// @param index bit index from base offset. + static bool BitGet(const std::vector& segments, int32_t base_offset, + int32_t index); + + /// unset bit from segments. + /// + /// @param segments target segments. + /// @param base_offset bits base offset. + /// @param index bit index from base offset. + static void BitUnSet(std::vector* segments, int32_t base_offset, int32_t index); + + /// get value from segments. Only support: bool, char, int16_t, int32_t, int64_t, double, float + /// + /// @param segments target segments. + /// @param offset value offset. + template + static T GetValue(const std::vector& segments, int32_t offset); + + /// set value to segments. Only support: bool, char, int16_t, int32_t, int64_t, double, float + /// + /// @param segments target segments. + /// @param offset value offset. + template + static void SetValue(std::vector* segments, int32_t offset, const T& value); + + /* + * Equals two memory segments regions. + * + * @param segments1 Segments 1 + * @param offset1 Offset of segments1 to start equaling + * @param segments2 Segments 2 + * @param offset2 Offset of segments2 to start equaling + * @param len Length of the equaled memory region + * @return true if equal, false otherwise + */ + static bool Equals(const std::vector& segments1, int32_t offset1, + const std::vector& segments2, int32_t offset2, int32_t len); + + static bool EqualsMultiSegments(const std::vector& segments1, int32_t offset1, + const std::vector& segments2, int32_t offset2, + int32_t len); + + /// Find equal segments2 in segments1. + /// + /// @param segments1 segs to find. + /// @param segments2 sub segs. + /// @return Return the found offset, return -1 if not find. + static int32_t Find(const std::vector& segments1, int32_t offset1, + int32_t num_bytes1, const std::vector& segments2, + int32_t offset2, int32_t num_bytes2); + + /// hash segments to int, num_bytes must be aligned to 4 bytes. + /// + /// @param segments Source segments. + /// @param offset Source segments offset. + /// @param num_bytes the number bytes to hash. + static int32_t HashByWords(const std::vector& segments, int32_t offset, + int32_t num_bytes, MemoryPool* pool); + + /// hash segments to int. + /// + /// @param segments Source segments. + /// @param offset Source segments offset. + /// @param num_bytes the number bytes to hash. + static int32_t Hash(const std::vector& segments, int32_t offset, + int32_t num_bytes, MemoryPool* pool); + + private: + static constexpr int32_t ADDRESS_BITS_PER_WORD = 3; + static constexpr int32_t BIT_BYTE_INDEX_MASK = 7; + static constexpr int32_t MAX_BYTES_LENGTH = 1024 * 64; + static constexpr int32_t MAX_CHARS_LENGTH = 1024 * 32; + + private: + template + static void SetValueToMultiSegments(std::vector* segments, int32_t offset, + const T& value); + + template + static void SetValueSlowly(std::vector* segments, int32_t seg_size, + int32_t seg_num, int32_t seg_offset, const T& value); + template + static T GetValueFromMultiSegments(const std::vector& segments, int32_t offset); + + template + static T GetValueSlowly(const std::vector& segments, int32_t seg_size, + int32_t seg_num, int32_t seg_offset); + + static void CopyMultiSegmentsFromBytes(std::vector* segments, int32_t offset, + const Bytes& bytes, int32_t bytes_offset, + int32_t num_bytes); + + static void CopyMultiSegmentsToUnsafe(const std::vector& segments, + int32_t offset, void* target, int32_t num_bytes); + + static int32_t FindInMultiSegments(const std::vector& segments1, int32_t offset1, + int32_t num_bytes1, + const std::vector& segments2, int32_t offset2, + int32_t num_bytes2); + + static int32_t HashMultiSeg(const std::vector& segments, int32_t offset, + int32_t num_bytes, MemoryPool* pool); + + static int32_t HashMultiSegByWords(const std::vector& segments, int32_t offset, + int32_t num_bytes, MemoryPool* pool); + + /// Given a bit index, return the byte index containing it. + /// + /// @param bit_index the bit index. + /// @return the byte index. + static int32_t ByteIndex(int32_t bit_index); + + static void BitSetMultiSegments(std::vector* segments, int32_t base_offset, + int32_t index); + + static void BitUnSetMultiSegments(std::vector* segments, int32_t base_offset, + int32_t index); +}; + +template +inline void MemorySegmentUtils::CopyToBytes(const std::vector& segments, + int32_t offset, T* bytes, int32_t bytes_offset, + int32_t num_bytes) { + if (InFirstSegment(segments, offset, num_bytes)) { + segments[0].Get(offset, bytes, bytes_offset, num_bytes); + } else { + CopyMultiSegmentsToBytes(segments, offset, bytes, bytes_offset, num_bytes); + } +} + +template +inline void MemorySegmentUtils::CopyMultiSegmentsToBytes(const std::vector& segments, + int32_t offset, T* bytes, + int32_t bytes_offset, int32_t num_bytes) { + int32_t remain_size = num_bytes; + for (const auto& segment : segments) { + int32_t remain = segment.Size() - offset; + if (remain > 0) { + int32_t n_copy = std::min(remain, remain_size); + segment.Get(offset, bytes, num_bytes - remain_size + bytes_offset, n_copy); + remain_size -= n_copy; + // next new segment. + offset = 0; + if (remain_size == 0) { + return; + } + } else { + // remain is negative, let's advance to next segment + // now the offset = offset - segment_size (-remain) + offset = -remain; + } + } +} + +template +inline T MemorySegmentUtils::GetValue(const std::vector& segments, int32_t offset) { + static_assert(std::is_trivially_copyable_v, "T must be trivially copyable"); + if (InFirstSegment(segments, offset, sizeof(T))) { + return segments[0].GetValue(offset); + } else { + return GetValueFromMultiSegments(segments, offset); + } +} + +template +inline void MemorySegmentUtils::SetValue(std::vector* segments, int32_t offset, + const T& value) { + static_assert(std::is_trivially_copyable_v, "T must be trivially copyable"); + if (InFirstSegment(*segments, offset, sizeof(T))) { + (*segments)[0].PutValue(offset, value); + } else { + SetValueToMultiSegments(segments, offset, value); + } +} + +template +inline void MemorySegmentUtils::SetValueToMultiSegments(std::vector* segments, + int32_t offset, const T& value) { + int32_t seg_size = (*segments)[0].Size(); + int32_t seg_index = offset / seg_size; + int32_t seg_offset = offset - seg_index * seg_size; // equal to % + + if (seg_offset <= seg_size - static_cast(sizeof(T))) { + (*segments)[seg_index].PutValue(seg_offset, value); + } else { + SetValueSlowly(segments, seg_size, seg_index, seg_offset, value); + } +} + +template <> +inline void MemorySegmentUtils::SetValueToMultiSegments(std::vector* segments, + int32_t offset, const float& value) { + int32_t seg_size = (*segments)[0].Size(); + int32_t seg_index = offset / seg_size; + int32_t seg_offset = offset - seg_index * seg_size; // equal to % + + if (seg_offset <= seg_size - static_cast(sizeof(float))) { + (*segments)[seg_index].PutValue(seg_offset, value); + } else { + int32_t int_value; + std::memcpy(&int_value, &value, sizeof(float)); + SetValueSlowly(segments, seg_size, seg_index, seg_offset, int_value); + } +} + +template <> +inline void MemorySegmentUtils::SetValueToMultiSegments(std::vector* segments, + int32_t offset, const double& value) { + int32_t seg_size = (*segments)[0].Size(); + int32_t seg_index = offset / seg_size; + int32_t seg_offset = offset - seg_index * seg_size; // equal to % + + if (seg_offset <= seg_size - static_cast(sizeof(double))) { + (*segments)[seg_index].PutValue(seg_offset, value); + } else { + int64_t long_value; + std::memcpy(&long_value, &value, sizeof(double)); + SetValueSlowly(segments, seg_size, seg_index, seg_offset, long_value); + } +} + +template +inline void MemorySegmentUtils::SetValueSlowly(std::vector* segments, + int32_t seg_size, int32_t seg_num, + int32_t seg_offset, const T& value) { + MemorySegment segment = (*segments)[seg_num]; + for (size_t i = 0; i < sizeof(T); i++) { + if (seg_offset == seg_size) { + segment = (*segments)[++seg_num]; + seg_offset = 0; + } + T unsigned_byte; + if (SystemByteOrder() == ByteOrder::PAIMON_LITTLE_ENDIAN) { + unsigned_byte = value >> (i * 8); + } else { + int32_t shift_count = sizeof(T) - 1; + unsigned_byte = value >> ((shift_count - i) * 8); + } + segment.Put(seg_offset, static_cast(unsigned_byte)); + seg_offset++; + } +} + +template +inline T MemorySegmentUtils::GetValueFromMultiSegments(const std::vector& segments, + int32_t offset) { + int32_t seg_size = segments[0].Size(); + int32_t seg_index = offset / seg_size; + int32_t seg_offset = offset - seg_index * seg_size; // equal to % + if (seg_offset <= seg_size - static_cast(sizeof(T))) { + return segments[seg_index].GetValue(seg_offset); + } else { + return GetValueSlowly(segments, seg_size, seg_index, seg_offset); + } +} + +template <> +inline float MemorySegmentUtils::GetValueFromMultiSegments( + const std::vector& segments, int32_t offset) { + int32_t seg_size = segments[0].Size(); + int32_t seg_index = offset / seg_size; + int32_t seg_offset = offset - seg_index * seg_size; // equal to % + if (seg_offset <= seg_size - static_cast(sizeof(float))) { + return segments[seg_index].GetValue(seg_offset); + } else { + auto int_value = GetValueSlowly(segments, seg_size, seg_index, seg_offset); + float float_value; + std::memcpy(&float_value, &int_value, sizeof(float)); + return float_value; + } +} + +template <> +inline double MemorySegmentUtils::GetValueFromMultiSegments( + const std::vector& segments, int32_t offset) { + int32_t seg_size = segments[0].Size(); + int32_t seg_index = offset / seg_size; + int32_t seg_offset = offset - seg_index * seg_size; // equal to % + if (seg_offset <= seg_size - static_cast(sizeof(double))) { + return segments[seg_index].GetValue(seg_offset); + } else { + auto long_value = GetValueSlowly(segments, seg_size, seg_index, seg_offset); + double double_value; + std::memcpy(&double_value, &long_value, sizeof(double)); + return double_value; + } +} + +template +inline T MemorySegmentUtils::GetValueSlowly(const std::vector& segments, + int32_t seg_size, int32_t seg_num, int32_t seg_offset) { + MemorySegment segment = segments[seg_num]; + T ret = 0; + for (size_t i = 0; i < sizeof(T); i++) { + if (seg_offset == seg_size) { + segment = segments[++seg_num]; + seg_offset = 0; + } + T unsigned_byte = segment.Get(seg_offset) & 0xff; + if (SystemByteOrder() == ByteOrder::PAIMON_LITTLE_ENDIAN) { + ret |= (unsigned_byte << (i * 8)); + } else { + int32_t shift_count = sizeof(T) - 1; + ret |= (unsigned_byte << ((shift_count - i) * 8)); + } + seg_offset++; + } + return ret; +} + +inline Status MemorySegmentUtils::CopyToStream(const std::vector& segments, + int32_t offset, int32_t size_in_bytes, + MemorySegmentOutputStream* target) { + for (const auto& source_segment : segments) { + int32_t cur_seg_remain = source_segment.Size() - offset; + if (cur_seg_remain > 0) { + int32_t copy_size = std::min(cur_seg_remain, size_in_bytes); + target->Write(source_segment, offset, copy_size); + size_in_bytes -= copy_size; + offset = 0; + } else { + offset -= source_segment.Size(); + } + + if (size_in_bytes == 0) { + return Status::OK(); + } + } + if (size_in_bytes != 0) { + return Status::Invalid( + fmt::format("No copy finished, this should be a bug, " + "The remaining length is: {}", + size_in_bytes)); + } + return Status::OK(); +} + +} // namespace paimon diff --git a/src/paimon/common/memory/memory_segment_utils_test.cpp b/src/paimon/common/memory/memory_segment_utils_test.cpp new file mode 100644 index 0000000..e4320b2 --- /dev/null +++ b/src/paimon/common/memory/memory_segment_utils_test.cpp @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/common/memory/memory_segment_utils.h" + +#include +#include + +#include "gtest/gtest.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { +TEST(MemorySegmentUtilsTest, TestSetAndGetValue) { + auto pool = GetDefaultPool(); + for (auto single_segment_size : {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 128}) { + std::vector segments; + segments.reserve(50); + for (size_t i = 0; i < 50; i++) { + segments.push_back( + MemorySegment::Wrap(Bytes::AllocateBytes(single_segment_size, pool.get()))); + } + int32_t offset = 0; + MemorySegmentUtils::SetValue(&segments, offset, 233); + ASSERT_EQ(233, MemorySegmentUtils::GetValue(segments, offset)); + + offset += sizeof(int32_t); + MemorySegmentUtils::SetValue(&segments, offset, 23333333333); + ASSERT_EQ(23333333333, MemorySegmentUtils::GetValue(segments, offset)); + + offset += sizeof(int64_t); + MemorySegmentUtils::SetValue(&segments, offset, 233.3); + ASSERT_NEAR(233.3, MemorySegmentUtils::GetValue(segments, offset), 0.001); + + offset += sizeof(float); + MemorySegmentUtils::SetValue(&segments, offset, 244.3); + ASSERT_NEAR(244.3, MemorySegmentUtils::GetValue(segments, offset), 0.001); + + offset += sizeof(double); + MemorySegmentUtils::SetValue(&segments, offset, 5564); + ASSERT_EQ(5564, MemorySegmentUtils::GetValue(segments, offset)); + + offset += sizeof(int16_t); + MemorySegmentUtils::SetValue(&segments, offset, 123); + ASSERT_EQ(123, MemorySegmentUtils::GetValue(segments, offset)); + + offset += sizeof(char); + MemorySegmentUtils::SetValue(&segments, offset, true); + ASSERT_EQ(true, MemorySegmentUtils::GetValue(segments, offset)); + + ASSERT_EQ(233, MemorySegmentUtils::GetValue(segments, 0)); + } +} + +TEST(MemorySegmentUtilsTest, TestCopyFromBytesAndGetBytes) { + auto pool = GetDefaultPool(); + int32_t str_size = 1024; + std::string test_string1, test_string2; + test_string1.reserve(str_size); + test_string2.reserve(str_size); + for (int32_t j = 0; j < str_size; j++) { + test_string1 += static_cast(paimon::test::RandomNumber(0, 25) + 'a'); + test_string2 += static_cast(paimon::test::RandomNumber(0, 25) + 'a'); + } + test_string2 += test_string1; + std::shared_ptr bytes1 = Bytes::AllocateBytes(test_string1, pool.get()); + std::shared_ptr bytes2 = Bytes::AllocateBytes(test_string2, pool.get()); + + std::vector segs = {MemorySegment::AllocateHeapMemory(str_size, pool.get())}; + MemorySegmentUtils::CopyFromBytes(&segs, 0, *bytes1, 0, test_string1.size()); + PAIMON_UNIQUE_PTR bytes_serialize = + MemorySegmentUtils::CopyToBytes(segs, 0, test_string1.size(), pool.get()); + auto bytes_get = MemorySegmentUtils::GetBytes(segs, /*base_offset=*/0, + /*size_in_bytes=*/str_size, pool.get()); + ASSERT_EQ(*bytes_get, *bytes1); + std::string str_serialize = std::string(bytes_serialize->data(), bytes_serialize->size()); + ASSERT_EQ(test_string1, str_serialize); + + // test MultiSegments + std::vector segs2 = {MemorySegment::AllocateHeapMemory(str_size, pool.get()), + MemorySegment::AllocateHeapMemory(str_size, pool.get())}; + MemorySegmentUtils::CopyFromBytes(&segs2, 0, *bytes2, 0, test_string2.size()); + PAIMON_UNIQUE_PTR bytes_serialize2 = + MemorySegmentUtils::CopyToBytes(segs2, 0, test_string2.size(), pool.get()); + std::string str_serialize2 = std::string(bytes_serialize2->data(), bytes_serialize2->size()); + ASSERT_EQ(test_string2, str_serialize2); + + std::shared_ptr bytes_serialize3 = MemorySegmentUtils::GetBytes( + segs2, test_string2.size() - test_string1.size(), test_string1.size(), pool.get()); + std::string str_serialize3 = std::string(bytes_serialize3->data(), bytes_serialize3->size()); + ASSERT_EQ(test_string1, str_serialize3); +} + +TEST(MemorySegmentUtilsTest, TestCopyToUnsafe) { + auto pool = GetDefaultPool(); + int32_t str_size = 1024; + std::string test_string1, test_string2; + test_string1.reserve(str_size); + test_string2.reserve(str_size); + for (int32_t j = 0; j < str_size; j++) { + test_string1 += static_cast(paimon::test::RandomNumber(0, 25) + 'a'); + test_string2 += static_cast(paimon::test::RandomNumber(0, 25) + 'a'); + } + test_string2 += test_string1; + std::shared_ptr bytes1 = Bytes::AllocateBytes(test_string1, pool.get()); + std::shared_ptr bytes2 = Bytes::AllocateBytes(test_string2, pool.get()); + + std::vector segs = {MemorySegment::AllocateHeapMemory(str_size, pool.get())}; + MemorySegmentUtils::CopyFromBytes(&segs, 0, *bytes1, 0, test_string1.size()); + std::shared_ptr bytes_serialize = Bytes::AllocateBytes(test_string1.size(), pool.get()); + MemorySegmentUtils::CopyToUnsafe(segs, 0, bytes_serialize->data(), test_string1.size()); + ASSERT_EQ(test_string1, std::string(bytes_serialize->data(), bytes_serialize->size())); + + // test MultiSegments + std::vector segs2 = {MemorySegment::AllocateHeapMemory(str_size, pool.get()), + MemorySegment::AllocateHeapMemory(str_size, pool.get())}; + MemorySegmentUtils::CopyFromBytes(&segs2, 0, *bytes2, 0, test_string2.size()); + std::shared_ptr bytes_serialize2 = Bytes::AllocateBytes(test_string1.size(), pool.get()); + MemorySegmentUtils::CopyToUnsafe(segs2, test_string2.size() - test_string1.size(), + bytes_serialize2->data(), test_string1.size()); + ASSERT_EQ(test_string1, std::string(bytes_serialize2->data(), bytes_serialize2->size())); +} + +TEST(MemorySegmentUtilsTest, TestSetAndUnSet) { + auto pool = GetDefaultPool(); + int32_t str_size = 1024; + std::string test_string1, test_string2; + test_string1.reserve(str_size); + test_string2.reserve(str_size); + for (int32_t j = 0; j < str_size; j++) { + test_string1 += static_cast(paimon::test::RandomNumber(0, 25) + 'a'); + test_string2 += static_cast(paimon::test::RandomNumber(0, 25) + 'a'); + } + test_string2 += test_string1; + std::shared_ptr bytes2 = Bytes::AllocateBytes(test_string2, pool.get()); + + std::vector segs = {MemorySegment::AllocateHeapMemory(str_size, pool.get()), + MemorySegment::AllocateHeapMemory(str_size, pool.get())}; + MemorySegmentUtils::CopyFromBytes(&segs, 0, *bytes2, 0, test_string2.size()); + int32_t index = paimon::test::RandomNumber(0, str_size - 1); + MemorySegmentUtils::BitUnSet(&segs, str_size, index); + ASSERT_FALSE(MemorySegmentUtils::BitGet(segs, str_size, index)); + MemorySegmentUtils::BitSet(&segs, str_size, index); + ASSERT_TRUE(MemorySegmentUtils::BitGet(segs, str_size, index)); + MemorySegmentUtils::BitSet(&segs[0], /*base_offset=*/0, index); + ASSERT_TRUE(MemorySegmentUtils::BitGet(segs[0], /*base_offset=*/0, index)); + MemorySegmentUtils::BitUnSet(&segs[0], /*base_offset=*/0, index); + ASSERT_FALSE(MemorySegmentUtils::BitGet(segs[0], /*base_offset=*/0, index)); +} + +TEST(MemorySegmentUtilsTest, TestBitSetAndUnSetSingleSegment) { + auto pool = GetDefaultPool(); + int32_t segment_size = 128; + std::vector segs = {MemorySegment::AllocateHeapMemory(segment_size, pool.get())}; + + // initially all bits should be 0 after allocation + for (int32_t i = 0; i < 16; i++) { + ASSERT_FALSE(MemorySegmentUtils::BitGet(segs, /*base_offset=*/0, i)); + } + + // set bits at various indices and verify + MemorySegmentUtils::BitSet(&segs, /*base_offset=*/0, /*index=*/0); + ASSERT_TRUE(MemorySegmentUtils::BitGet(segs, /*base_offset=*/0, 0)); + + MemorySegmentUtils::BitSet(&segs, /*base_offset=*/0, /*index=*/7); + ASSERT_TRUE(MemorySegmentUtils::BitGet(segs, /*base_offset=*/0, 7)); + + MemorySegmentUtils::BitSet(&segs, /*base_offset=*/0, /*index=*/15); + ASSERT_TRUE(MemorySegmentUtils::BitGet(segs, /*base_offset=*/0, 15)); + + // unset and verify + MemorySegmentUtils::BitUnSet(&segs, /*base_offset=*/0, /*index=*/0); + ASSERT_FALSE(MemorySegmentUtils::BitGet(segs, /*base_offset=*/0, 0)); + + MemorySegmentUtils::BitUnSet(&segs, /*base_offset=*/0, /*index=*/7); + ASSERT_FALSE(MemorySegmentUtils::BitGet(segs, /*base_offset=*/0, 7)); + + // bit 15 should still be set + ASSERT_TRUE(MemorySegmentUtils::BitGet(segs, /*base_offset=*/0, 15)); + + MemorySegmentUtils::BitUnSet(&segs, /*base_offset=*/0, /*index=*/15); + ASSERT_FALSE(MemorySegmentUtils::BitGet(segs, /*base_offset=*/0, 15)); + + // test with non-zero base_offset + int32_t base_offset = 10; + MemorySegmentUtils::BitSet(&segs, base_offset, /*index=*/3); + ASSERT_TRUE(MemorySegmentUtils::BitGet(segs, base_offset, 3)); + ASSERT_FALSE(MemorySegmentUtils::BitGet(segs, base_offset, 2)); + + MemorySegmentUtils::BitUnSet(&segs, base_offset, /*index=*/3); + ASSERT_FALSE(MemorySegmentUtils::BitGet(segs, base_offset, 3)); +} + +TEST(MemorySegmentUtilsTest, TestCopyMultiSegmentsFromBytes) { + auto pool = GetDefaultPool(); + std::shared_ptr bytes = Bytes::AllocateBytes("abcdef", pool.get()); + int32_t segment_size = 10; + std::vector segs = {MemorySegment::AllocateHeapMemory(segment_size, pool.get()), + MemorySegment::AllocateHeapMemory(segment_size, pool.get())}; + { + MemorySegmentUtils::CopyMultiSegmentsFromBytes(&segs, /*offset=*/3, *bytes, + /*bytes_offset=*/0, + /*num_bytes=*/bytes->size()); + auto result_bytes = + MemorySegmentUtils::CopyToBytes(segs, /*offset=*/3, + /*num_bytes=*/bytes->size(), pool.get()); + ASSERT_EQ(*bytes, *result_bytes); + } + { + MemorySegmentUtils::CopyMultiSegmentsFromBytes(&segs, /*offset=*/12, *bytes, + /*bytes_offset=*/0, + /*num_bytes=*/bytes->size()); + auto result_bytes = + MemorySegmentUtils::CopyToBytes(segs, /*offset=*/12, + /*num_bytes=*/bytes->size(), pool.get()); + ASSERT_EQ(*bytes, *result_bytes); + } +} + +TEST(MemorySegmentUtilsTest, TestCopyToStream) { + auto pool = GetDefaultPool(); + int32_t segment_size = 3; + std::shared_ptr bytes1 = Bytes::AllocateBytes("abc", pool.get()); + std::shared_ptr bytes2 = Bytes::AllocateBytes("def", pool.get()); + std::vector segs = {MemorySegment::Wrap(bytes1), MemorySegment::Wrap(bytes2)}; + { + MemorySegmentOutputStream out(/*segment_size=*/segment_size, pool); + ASSERT_OK(MemorySegmentUtils::CopyToStream(segs, /*offset=*/0, + /*size_in_bytes=*/segment_size * 2, &out)); + std::shared_ptr expected_bytes = Bytes::AllocateBytes("abcdef", pool.get()); + auto bytes = + MemorySegmentUtils::CopyToBytes(out.Segments(), 0, out.CurrentSize(), pool.get()); + ASSERT_EQ(*expected_bytes, *bytes); + } + { + MemorySegmentOutputStream out(/*segment_size=*/segment_size, pool); + ASSERT_OK(MemorySegmentUtils::CopyToStream(segs, /*offset=*/2, /*size_in_bytes=*/4, &out)); + std::shared_ptr expected_bytes = Bytes::AllocateBytes("cdef", pool.get()); + auto bytes = + MemorySegmentUtils::CopyToBytes(out.Segments(), 0, out.CurrentSize(), pool.get()); + ASSERT_EQ(*expected_bytes, *bytes); + } + { + MemorySegmentOutputStream out(/*segment_size=*/segment_size, pool); + ASSERT_OK(MemorySegmentUtils::CopyToStream(segs, /*offset=*/4, /*size_in_bytes=*/2, &out)); + std::shared_ptr expected_bytes = Bytes::AllocateBytes("ef", pool.get()); + auto bytes = + MemorySegmentUtils::CopyToBytes(out.Segments(), 0, out.CurrentSize(), pool.get()); + ASSERT_EQ(*expected_bytes, *bytes); + } +} + +TEST(MemorySegmentUtilsTest, TestFind) { + auto pool = GetDefaultPool(); + std::shared_ptr bytes1 = Bytes::AllocateBytes("abc", pool.get()); + std::shared_ptr bytes2 = Bytes::AllocateBytes("def", pool.get()); + std::shared_ptr bytes3 = Bytes::AllocateBytes("adef", pool.get()); + std::shared_ptr bytes4 = Bytes::AllocateBytes("ghi", pool.get()); + std::vector segs1 = {MemorySegment::Wrap(bytes1), + MemorySegment::Wrap(bytes2)}; // abcdef + std::vector segs2 = {MemorySegment::Wrap(bytes3), + MemorySegment::Wrap(bytes4)}; // adefghi + // find "" + ASSERT_EQ(1, MemorySegmentUtils::Find(segs1, /*offset1=*/1, /*num_bytes1=*/5, segs2, + /*offset2=*/0, /*num_bytes2=*/0)); + // find "def" + ASSERT_EQ(3, MemorySegmentUtils::Find(segs1, /*offset1=*/0, /*num_bytes1=*/6, segs2, + /*offset2=*/1, /*num_bytes2=*/3)); + // find "defg" + ASSERT_EQ(-1, MemorySegmentUtils::Find(segs1, /*offset1=*/0, /*num_bytes1=*/6, segs2, + /*offset2=*/1, /*num_bytes2=*/4)); + // find "de" in "abc" of segs1 + ASSERT_EQ(-1, MemorySegmentUtils::Find(segs1, /*offset1=*/0, /*num_bytes1=*/3, segs2, + /*offset2=*/1, /*num_bytes2=*/2)); + // find "a" in "abc" of segs1 + ASSERT_EQ(0, MemorySegmentUtils::Find(segs1, /*offset1=*/0, /*num_bytes1=*/3, segs2, + /*offset2=*/0, /*num_bytes2=*/1)); +} + +} // namespace paimon::test