From 7766a0c16522fb0d9e2b5b9324d6bdfcbae432b2 Mon Sep 17 00:00:00 2001 From: "lisizhuo.lsz" Date: Mon, 25 May 2026 11:20:50 +0800 Subject: [PATCH] feat: introduce basic data types including decimal, blob and timestamp --- include/paimon/data/blob.h | 113 +++++++ include/paimon/data/decimal.h | 146 +++++++++ include/paimon/data/timestamp.h | 139 +++++++++ include/paimon/logging.h | 73 +++++ src/paimon/common/data/blob_defs.h | 63 ++++ src/paimon/common/data/blob_descriptor.cpp | 114 +++++++ src/paimon/common/data/blob_descriptor.h | 92 ++++++ .../common/data/blob_descriptor_test.cpp | 228 ++++++++++++++ src/paimon/common/data/blob_test.cpp | 188 +++++++++++ src/paimon/common/data/blob_utils.cpp | 120 +++++++ src/paimon/common/data/blob_utils.h | 71 +++++ src/paimon/common/data/blob_utils_test.cpp | 145 +++++++++ src/paimon/common/data/decimal.cpp | 222 +++++++++++++ src/paimon/common/data/decimal_test.cpp | 292 ++++++++++++++++++ src/paimon/common/data/timestamp.cpp | 63 ++++ src/paimon/common/data/timestamp_test.cpp | 103 ++++++ src/paimon/common/logging/logging.cpp | 94 ++++++ src/paimon/common/logging/logging_test.cpp | 42 +++ 18 files changed, 2308 insertions(+) create mode 100644 include/paimon/data/blob.h create mode 100644 include/paimon/data/decimal.h create mode 100644 include/paimon/data/timestamp.h create mode 100644 include/paimon/logging.h create mode 100644 src/paimon/common/data/blob_defs.h create mode 100644 src/paimon/common/data/blob_descriptor.cpp create mode 100644 src/paimon/common/data/blob_descriptor.h create mode 100644 src/paimon/common/data/blob_descriptor_test.cpp create mode 100644 src/paimon/common/data/blob_test.cpp create mode 100644 src/paimon/common/data/blob_utils.cpp create mode 100644 src/paimon/common/data/blob_utils.h create mode 100644 src/paimon/common/data/blob_utils_test.cpp create mode 100644 src/paimon/common/data/decimal.cpp create mode 100644 src/paimon/common/data/decimal_test.cpp create mode 100644 src/paimon/common/data/timestamp.cpp create mode 100644 src/paimon/common/data/timestamp_test.cpp create mode 100644 src/paimon/common/logging/logging.cpp create mode 100644 src/paimon/common/logging/logging_test.cpp diff --git a/include/paimon/data/blob.h b/include/paimon/data/blob.h new file mode 100644 index 0000000..ff89714 --- /dev/null +++ b/include/paimon/data/blob.h @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include "paimon/fs/file_system.h" +#include "paimon/memory/bytes.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/result.h" +#include "paimon/type_fwd.h" +#include "paimon/visibility.h" + +struct ArrowSchema; + +namespace paimon { + +/// Represents a binary large object (BLOB) that can be read from various sources. +/// +/// The Blob class provides a unified interface for handling binary data from different +/// sources including file path and blob descriptor. It supports reading data through input +/// streams and provides descriptor-based serialization. +class PAIMON_EXPORT Blob { + public: + ~Blob(); + + /// Creates a Blob from a file path. + /// + /// @param path The file path to create the blob from. + /// @return A result containing the created blob or an error. + static Result> FromPath(const std::string& path); + + /// Creates a Blob from a file path with specified offset and length. + /// + /// @param path The file path to create the blob from. + /// @param offset The starting offset within the file. + /// @param length The length of data to read from the file. + /// @return A result containing the created blob or an error. + static Result> FromPath(const std::string& path, int64_t offset, + int64_t length); + + /// Creates a Blob from a blob descriptor. + /// + /// @param buffer The buffer of the blob descriptor. + /// @param length The length of the buffer. + /// @return A result containing the created blob or an error. + static Result> FromDescriptor(const char* buffer, uint64_t length); + + /// Converts the blob to a blob descriptor. + /// + /// @param pool The memory pool to use for allocation. + /// @return A blob descriptor bytes representing the blob. + PAIMON_UNIQUE_PTR ToDescriptor(const std::shared_ptr& pool) const; + + /// Gets the URI of the blob. + const std::string& Uri() const; + + /// Creates an input stream for reading the blob data. + /// + /// @param fs The file system to use for reading. + /// @return A result containing the input stream or an error. + Result> NewInputStream( + const std::shared_ptr& fs) const; + + /// Reads the blob data to bytes. + /// + /// @param fs The file system to use for reading. + /// @param pool The memory pool to use for allocation. + /// @return A result containing the blob data bytes or an error. + Result> ToData(const std::shared_ptr& fs, + const std::shared_ptr& pool) const; + + /// Creates an Arrow field definition for the Blob type. + /// + /// This function constructs an Arrow Field (internally using `arrow::large_binary()`) + /// and exports it to the C data interface structure `::ArrowSchema`. + /// It automatically injects Paimon-specific metadata to identify the field as a BLOB. + /// + /// @param field_name The name of the Arrow field. + /// @param metadata A map of key-value metadata to be attached to the field. + /// @return A result containing a unique pointer to the generated `ArrowSchema` or an error. + static Result> ArrowField( + const std::string& field_name, std::unordered_map metadata = {}); + + private: + class Impl; + + explicit Blob(std::unique_ptr&& impl); + + std::unique_ptr impl_; +}; + +} // namespace paimon diff --git a/include/paimon/data/decimal.h b/include/paimon/data/decimal.h new file mode 100644 index 0000000..707e3e5 --- /dev/null +++ b/include/paimon/data/decimal.h @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once +#include +#include +#include +#include + +#include "paimon/visibility.h" + +namespace paimon { +class Bytes; + +/// A data structure representing data of Decimal. It might be stored in a compact representation +/// (as a long value) if values are small enough. +class PAIMON_EXPORT Decimal { + public: + using int128_t = __int128_t; + using uint128_t = __uint128_t; + + Decimal(int32_t precision, int32_t scale, int128_t value) + : precision_(precision), scale_(scale), value_(value) {} + + /// Get the **precision** of this decimal. + /// + /// The precision is the number of digits in the unscaled value. + int32_t Precision() const { + return precision_; + } + + /// Get the **scale** of this decimal. + int32_t Scale() const { + return scale_; + } + + /// Get the underlying int128_t value of this decimal. + int128_t Value() const { + return value_; + } + + /// Get the low 64 bits of the decimal value. + uint64_t LowBits() const { + return static_cast(value_ & 0xFFFFFFFFFFFFFFFF); + } + + /// Get the high 64 bits of the decimal value. + uint64_t HighBits() const { + return static_cast(value_ >> 64); + } + + /// @return Whether the decimal value is small enough to be stored in a long. + bool IsCompact() const { + return precision_ <= MAX_COMPACT_PRECISION; + } + + std::string ToString() const; + + /// @return Whether the decimal value is small enough to be stored in a long. + static bool IsCompact(int32_t precision) { + return precision <= MAX_COMPACT_PRECISION; + } + + /// Creates an instance of `Decimal` from an unscaled long value and the given precision and + /// scale. + static Decimal FromUnscaledLong(int64_t unscaled_long, int32_t precision, int32_t scale) { + return {precision, scale, unscaled_long}; + } + + /// @return A long describing the **unscaled value** of this decimal. + int64_t ToUnscaledLong() const { + int64_t ret = 0; + uint64_t low_bits = (value_ & 0xFFFFFFFFFFFFFFFF); + memcpy(&ret, &low_bits, sizeof(uint64_t)); + return ret; + } + + /// @return A byte array describing the **unscaled value** of this decimal. + std::vector ToUnscaledBytes() const; + + /// Creates an instance of `Decimal` from an unscaled byte array value and the given precision + /// and scale. + static Decimal FromUnscaledBytes(int32_t precision, int32_t scale, Bytes* bytes); + + bool operator==(const Decimal& other) const { + if (this == &other) { + return true; + } + return CompareTo(other) == 0; + } + + bool operator<(const Decimal& other) const { + if (this == &other) { + return false; + } + return CompareTo(other) < 0; + } + + bool operator>(const Decimal& other) const { + if (this == &other) { + return false; + } + return CompareTo(other) > 0; + } + + int32_t CompareTo(const Decimal& other) const; + static constexpr int32_t DEFAULT_PRECISION = 10; + static constexpr int32_t DEFAULT_SCALE = 0; + static constexpr int32_t MIN_PRECISION = 1; + static constexpr int32_t MAX_PRECISION = 38; + + private: + static constexpr int32_t MAX_COMPACT_PRECISION = 18; + static const int128_t INT128_MAXIMUM_VALUE; + static const int128_t INT128_MINIMUM_VALUE; + static const int64_t POWERS_OF_TEN[MAX_COMPACT_PRECISION + 1]; + + static int32_t clz_u128(uint128_t u); + static int32_t count_leading_zero_bytes(uint128_t u); + static int32_t count_leading_all_ones_bytes(uint128_t u); + static int128_t DownScaleInt128(int128_t value, int32_t scale); + static int128_t ScaleInt128(int128_t value, int32_t scale, bool* overflow); + + private: + int32_t precision_ = 0; + int32_t scale_ = 0; + int128_t value_ = 0; +}; + +} // namespace paimon diff --git a/include/paimon/data/timestamp.h b/include/paimon/data/timestamp.h new file mode 100644 index 0000000..93dfe1b --- /dev/null +++ b/include/paimon/data/timestamp.h @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include "paimon/result.h" +#include "paimon/visibility.h" + +namespace paimon { +/// A data structure representing data of Timestamp without timezone. +/// +/// This data structure is immutable and consists of a milliseconds and nanos-of-millisecond since +/// `1970-01-01 00:00:00`. It might be stored in a compact representation (as a long value) if +/// values are small enough. Timestamp range from 0000-01-01 00:00:00.000000000 to 9999-12-31 +/// 23:59:59.999999999. +/// +class PAIMON_EXPORT Timestamp { + public: + Timestamp() : Timestamp(0, 0) {} + Timestamp(int64_t millisecond, int32_t nano_of_millisecond) + : millisecond_(millisecond), nano_of_millisecond_(nano_of_millisecond) { + assert(nano_of_millisecond >= 0 && nano_of_millisecond <= 999999ll); + } + + /// Get the number of milliseconds since `1970-01-01 00:00:00`. + int64_t GetMillisecond() const { + return millisecond_; + } + /// Get the number of nanoseconds (the nanoseconds within the milliseconds). + /// + /// The value range is from 0 to 999,999. + int32_t GetNanoOfMillisecond() const { + return nano_of_millisecond_; + } + + /// Creates an instance of `Timestamp` from milliseconds. + /// + /// The nanos-of-millisecond field will be set to zero. + /// + /// @param milliseconds The number of milliseconds since `1970-01-01 00:00:00`; a + /// negative number is the number of milliseconds before `1970-01-01 00:00:00`. + static Timestamp FromEpochMillis(int64_t milliseconds) { + return {milliseconds, 0}; + } + + /// Creates an instance of `Timestamp` from milliseconds and a nanos-of-millisecond. + /// + /// @param milliseconds The number of milliseconds since `1970-01-01 00:00:00`; a + /// negative number is the number of milliseconds before `1970-01-01 00:00:00`. + /// @param nanos_of_millisecond The nanoseconds within the millisecond, from 0 to 999,999. + static Timestamp FromEpochMillis(int64_t milliseconds, int32_t nanos_of_millisecond) { + return {milliseconds, nanos_of_millisecond}; + } + + /// Converts this `Timestamp` object to millis `Timestamp` object (ignore + /// `nanos_of_millisecond`). + Timestamp ToMillisTimestamp() const { + return FromEpochMillis(millisecond_); + } + + /// Converts this `Timestamp` object to microsecond. + int64_t ToMicrosecond() const; + + /// Converts this `Timestamp` object to nanoseconds. + int64_t ToNanosecond() const { + std::chrono::nanoseconds nanosecond = std::chrono::nanoseconds(nano_of_millisecond_) + + std::chrono::milliseconds(millisecond_); + return nanosecond.count(); + } + + /// @return Whether the timestamp data is small enough to be stored in a long of + /// milliseconds. + static bool IsCompact(int32_t precision) { + return precision <= MAX_COMPACT_PRECISION; + } + + bool operator==(const Timestamp& other) const { + if (this == &other) { + return true; + } + return millisecond_ == other.millisecond_ && + nano_of_millisecond_ == other.nano_of_millisecond_; + } + + bool operator!=(const Timestamp& other) const { + return !(*this == other); + } + + bool operator<(const Timestamp& other) const { + if (millisecond_ == other.millisecond_) { + return nano_of_millisecond_ < other.nano_of_millisecond_; + } + return millisecond_ < other.millisecond_; + } + + /// Converts the Timestamp object to a string representation in UTC (GMT). + /// + /// The format of the returned string is "YYYY-MM-DD HH:MM:SS.nnnnnnnnn", + /// where the date and time are in UTC (GMT), and the nanoseconds are derived + /// from the millisecond and nanosecond parts of the timestamp. + /// + /// @note This method uses UTC (GMT) time zone when formatting the time. + /// This is different from the Java Paimon implementation, which may convert + /// the timestamp to the local time zone of the machine running the Java process. + std::string ToString() const; + + static const int32_t DEFAULT_PRECISION; + static const int32_t MILLIS_PRECISION; + static const int32_t MIN_PRECISION; + static const int32_t MAX_PRECISION; + static const int32_t MAX_COMPACT_PRECISION; + + private: + int64_t millisecond_ = 0; + int32_t nano_of_millisecond_ = 0; +}; + +} // namespace paimon diff --git a/include/paimon/logging.h b/include/paimon/logging.h new file mode 100644 index 0000000..0859d44 --- /dev/null +++ b/include/paimon/logging.h @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include + +#include "paimon/visibility.h" + +namespace paimon { + +#define PAIMON_LOGGER_IMPL "logger_impl" + +enum PaimonLogLevel { + PAIMON_LOG_LEVEL_DEBUG = 0, + PAIMON_LOG_LEVEL_INFO = 1, + PAIMON_LOG_LEVEL_WARN = 2, + PAIMON_LOG_LEVEL_ERROR = 3, + PAIMON_LOG_LEVEL_NONE = 4, + PAIMON_LOG_LEVEL_MAX = 5 +}; + +#define PAIMON_LOG_V(level, logger, format, ...) \ + do { \ + if (logger->IsLevelEnabled(PAIMON_LOG_LEVEL_##level)) { \ + logger->LogV(PAIMON_LOG_LEVEL_##level, __FILE__, __LINE__, __FUNCTION__, format, \ + __VA_ARGS__); \ + } \ + } while (0) + +#define PAIMON_LOG_INFO(logger, fmt, ...) PAIMON_LOG_V(INFO, logger, fmt, __VA_ARGS__) +#define PAIMON_LOG_ERROR(logger, fmt, ...) PAIMON_LOG_V(ERROR, logger, fmt, __VA_ARGS__) +#define PAIMON_LOG_WARN(logger, fmt, ...) PAIMON_LOG_V(WARN, logger, fmt, __VA_ARGS__) +#define PAIMON_LOG_DEBUG(logger, fmt, ...) PAIMON_LOG_V(DEBUG, logger, fmt, __VA_ARGS__) + +class PAIMON_EXPORT Logger { + public: + using LoggerCreator = std::function(const std::string&)>; + + static void RegisterLogger(LoggerCreator creator); + + static std::unique_ptr GetLogger(const std::string& path); + + virtual void LogV(PaimonLogLevel level, const char* fname, int lineno, const char* function, + const char* fmt, ...) = 0; + + virtual bool IsLevelEnabled(PaimonLogLevel level) const = 0; + + virtual ~Logger() = default; + + protected: + Logger() = default; +}; + +} // namespace paimon diff --git a/src/paimon/common/data/blob_defs.h b/src/paimon/common/data/blob_defs.h new file mode 100644 index 0000000..6a478e7 --- /dev/null +++ b/src/paimon/common/data/blob_defs.h @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include + +namespace paimon { + +/// Blob file format constants shared between writer and reader. +/// +/// A Blob field uses the 'large_binary' type as its underlying physical storage in Apache Arrow +/// Schema, and is marked as the Paimon Blob extension type by attaching specific +/// **KeyValueMetadata**. Multiple blob fields in one paimon table are supported. +class BlobDefs { + public: + BlobDefs() = delete; + ~BlobDefs() = delete; + + /// To create a Blob field: + /// @code + /// std::unordered_map blob_metadata_map = { + /// {Blob::kExtensionTypeKey, Blob::kExtensionTypeValue} + /// }; + /// auto field = arrow::field("my_blob_field", arrow::large_binary(), false, + /// std::make_shared(blob_metadata_map)); + /// @endcode + /// Metadata key identifying a Paimon Blob extension type field. + static constexpr char kExtensionTypeKey[] = "paimon.extension.type"; + /// Metadata value identifying a Paimon Blob extension type field. + static constexpr char kExtensionTypeValue[] = "paimon.type.blob"; + + /// A bin_length value of -1 in the index indicates a null blob entry. + static constexpr int64_t kNullBinLength = -1; + /// Blob file format version. + static constexpr int8_t kFileVersion = 1; + /// Magic number identifying the start of each blob bin. + static constexpr int32_t kMagicNumber = 1481511375; + /// Offset from the start of a bin to the actual blob content (magic number size). + static constexpr int32_t kContentStartOffset = 4; + /// Total metadata length per bin: magic(4) + bin_length(8) + crc32(4) = 16. + static constexpr int32_t kTotalMetaLength = 16; + /// Blob file header length: index_len(4) + version(1) = 5. + static constexpr uint32_t kBlobFileHeaderLength = 5; +}; + +} // namespace paimon diff --git a/src/paimon/common/data/blob_descriptor.cpp b/src/paimon/common/data/blob_descriptor.cpp new file mode 100644 index 0000000..f5624c7 --- /dev/null +++ b/src/paimon/common/data/blob_descriptor.cpp @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/common/data/blob_descriptor.h" + +#include + +#include "fmt/format.h" +#include "paimon/common/io/memory_segment_output_stream.h" +#include "paimon/common/memory/memory_segment_utils.h" +#include "paimon/io/byte_array_input_stream.h" +#include "paimon/io/byte_order.h" +#include "paimon/io/data_input_stream.h" +#include "paimon/memory/bytes.h" +#include "paimon/status.h" + +namespace paimon { + +Result> BlobDescriptor::Create(const std::string& uri, + int64_t offset, int64_t length) { + return Create(kCurrentVersion, uri, offset, length); +} + +Result> BlobDescriptor::Create(int8_t version, + const std::string& uri, + int64_t offset, int64_t length) { + if (offset < 0) { + return Status::Invalid(fmt::format("offset {} is less than 0", offset)); + } + // length == -1 means it's dynamic length + if (length < -1) { + return Status::Invalid(fmt::format("length {} is less than -1", length)); + } + return std::unique_ptr(new BlobDescriptor(version, uri, offset, length)); +} + +PAIMON_UNIQUE_PTR BlobDescriptor::Serialize(const std::shared_ptr& pool) const { + MemorySegmentOutputStream out(MemorySegmentOutputStream::DEFAULT_SEGMENT_SIZE, pool); + out.SetOrder(ByteOrder::PAIMON_LITTLE_ENDIAN); + out.WriteValue(version_); + out.WriteValue(kMagic); + out.WriteValue(static_cast(uri_.size())); + + auto uri_bytes = std::make_shared(uri_, pool.get()); + out.WriteBytes(uri_bytes); + out.WriteValue(offset_); + out.WriteValue(length_); + return MemorySegmentUtils::CopyToBytes(out.Segments(), 0, out.CurrentSize(), pool.get()); +} + +Result> BlobDescriptor::Deserialize(const char* buffer, + uint64_t size) { + auto input_stream = std::make_shared(buffer, size); + DataInputStream in(std::move(input_stream)); + in.SetOrder(ByteOrder::PAIMON_LITTLE_ENDIAN); + PAIMON_ASSIGN_OR_RAISE(int8_t version, in.ReadValue()); + if (version > kCurrentVersion) { + return Status::Invalid(fmt::format( + "Expecting BlobDescriptor version to be less than or equal to {}, but found {}.", + kCurrentVersion, version)); + } + if (version > 1) { + PAIMON_ASSIGN_OR_RAISE(int64_t magic, in.ReadValue()); + if (kMagic != magic) { + return Status::Invalid(fmt::format( + "Invalid BlobDescriptor: missing magic header. Expected magic: {}, but found {}", + kMagic, magic)); + } + } + PAIMON_ASSIGN_OR_RAISE(int32_t uri_length, in.ReadValue()); + std::string uri(uri_length, '\0'); + PAIMON_RETURN_NOT_OK(in.Read(uri.data(), uri.size())); + PAIMON_ASSIGN_OR_RAISE(int64_t offset, in.ReadValue()); + PAIMON_ASSIGN_OR_RAISE(int64_t length, in.ReadValue()); + return BlobDescriptor::Create(version, uri, offset, length); +} + +Result BlobDescriptor::IsBlobDescriptor(const char* buffer, uint64_t size) { + if (size < kMinDescriptorLength) { + return false; + } + auto input_stream = std::make_shared(buffer, size); + DataInputStream in(std::move(input_stream)); + in.SetOrder(ByteOrder::PAIMON_LITTLE_ENDIAN); + PAIMON_ASSIGN_OR_RAISE(int8_t version, in.ReadValue()); + if (version > kCurrentVersion) { + return false; + } + PAIMON_ASSIGN_OR_RAISE(int64_t magic, in.ReadValue()); + return kMagic == magic; +} + +std::string BlobDescriptor::ToString() const { + return fmt::format("BlobDescriptor{{version={}, uri='{}', offset={}, length={}}}", version_, + uri_, offset_, length_); +} + +} // namespace paimon diff --git a/src/paimon/common/data/blob_descriptor.h b/src/paimon/common/data/blob_descriptor.h new file mode 100644 index 0000000..6664b85 --- /dev/null +++ b/src/paimon/common/data/blob_descriptor.h @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include + +#include "paimon/memory/bytes.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/result.h" + +namespace paimon { +/// Blob descriptor to describe a blob reference. +/// Memory Layout Description: All multi-byte numerical values (int/long) are stored using Little +/// Endian byte order. +/// +/// | Offset | Field Name | Type | Size | +/// |--------|---------------|-----------|------| +/// | 0 | version | byte | 1 | +/// | 1 | magic_number | long | 8 | +/// | 9 | uri_length | int | 4 | +/// | 13 | uri_bytes | byte[N] | N | +/// | 13 + N | offset | long | 8 | +/// | 21 + N | length | long | 8 | + +class BlobDescriptor { + public: + static Result> Create(const std::string& uri, int64_t offset, + int64_t length); + + static Result> Create(int8_t version, const std::string& uri, + int64_t offset, int64_t length); + + static Result> Deserialize(const char* buffer, uint64_t size); + + static Result IsBlobDescriptor(const char* buffer, uint64_t size); + + PAIMON_UNIQUE_PTR Serialize(const std::shared_ptr& pool) const; + + std::string ToString() const; + + int8_t Version() const { + return version_; + } + + const std::string& Uri() const { + return uri_; + } + + int64_t Offset() const { + return offset_; + } + + int64_t Length() const { + return length_; + } + + private: + BlobDescriptor(int8_t version, const std::string& uri, int64_t offset, int64_t length) + : version_(version), uri_(uri), offset_(offset), length_(length) {} + + private: + static constexpr int64_t kMagic = 0x424C4F4244455343l; + /// one byte for version, eight bytes for magic number. + static constexpr uint64_t kMinDescriptorLength = 9; + static constexpr int8_t kCurrentVersion = 2; + + const int8_t version_ = kCurrentVersion; + std::string uri_; + int64_t offset_ = 0; + int64_t length_ = -1; +}; + +} // namespace paimon diff --git a/src/paimon/common/data/blob_descriptor_test.cpp b/src/paimon/common/data/blob_descriptor_test.cpp new file mode 100644 index 0000000..a5221bf --- /dev/null +++ b/src/paimon/common/data/blob_descriptor_test.cpp @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/common/data/blob_descriptor.h" + +#include +#include + +#include "gtest/gtest.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/status.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { + +class BlobDescriptorTest : public testing::Test { + public: + void SetUp() override { + pool_ = GetDefaultPool(); + ASSERT_OK_AND_ASSIGN(descriptor_, + BlobDescriptor::Create("test_uri", /*offset=*/1024, /*length=*/2048)); + } + + private: + std::shared_ptr pool_; + std::unique_ptr descriptor_; +}; + +TEST_F(BlobDescriptorTest, TestConstructorAndGetters) { + ASSERT_EQ(descriptor_->Uri(), "test_uri"); + ASSERT_EQ(descriptor_->Offset(), 1024); + ASSERT_EQ(descriptor_->Length(), 2048); +} + +TEST_F(BlobDescriptorTest, TestDeserializeCompatibilityForJavaWithVersion1) { + std::vector bytes = {1, 8, 0, 0, 0, 116, 101, 115, 116, 95, 117, 114, 105, 0, 4, + 0, 0, 0, 0, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0}; + auto java_serialized = std::string(bytes.data(), bytes.size()); + + ASSERT_OK_AND_ASSIGN(auto descriptor, BlobDescriptor::Deserialize(java_serialized.data(), + java_serialized.size())); + ASSERT_EQ(descriptor->Version(), (int8_t)1); + ASSERT_EQ(descriptor->Uri(), "test_uri"); + ASSERT_EQ(descriptor->Offset(), 1024); + ASSERT_EQ(descriptor->Length(), 2048); +} + +TEST_F(BlobDescriptorTest, TestDeserializeCompatibilityForJavaWithVersion2) { + std::vector bytes = {2, 67, 83, 69, 68, 66, 79, 76, 66, 8, 0, 0, 0, + 116, 101, 115, 116, 95, 117, 114, 105, 0, 4, 0, 0, 0, + 0, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0}; + auto java_serialized = std::string(bytes.data(), bytes.size()); + + ASSERT_OK_AND_ASSIGN(auto descriptor, BlobDescriptor::Deserialize(java_serialized.data(), + java_serialized.size())); + ASSERT_EQ(descriptor->Version(), (int8_t)2); + ASSERT_EQ(descriptor->Uri(), "test_uri"); + ASSERT_EQ(descriptor->Offset(), 1024); + ASSERT_EQ(descriptor->Length(), 2048); + + PAIMON_UNIQUE_PTR cpp_serialized = descriptor->Serialize(pool_); + auto cpp_serialized_string = std::string(cpp_serialized->data(), cpp_serialized->size()); + ASSERT_EQ(cpp_serialized_string, java_serialized); +} + +TEST_F(BlobDescriptorTest, TestSerializeDeserializeWithEmptyUri) { + ASSERT_OK_AND_ASSIGN(std::unique_ptr empty_uri_descriptor, + BlobDescriptor::Create(/*uri=*/"", /*offset=*/0, /*length=*/0)); + auto serialized = empty_uri_descriptor->Serialize(pool_); + ASSERT_OK_AND_ASSIGN(auto restored_descriptor, + BlobDescriptor::Deserialize(serialized->data(), serialized->size())); + + ASSERT_EQ(restored_descriptor->Uri(), ""); + ASSERT_EQ(restored_descriptor->Offset(), 0); + ASSERT_EQ(restored_descriptor->Length(), 0); +} + +TEST_F(BlobDescriptorTest, TestSerializeDeserializeWithDynamicLength) { + ASSERT_OK_AND_ASSIGN(std::unique_ptr empty_uri_descriptor, + BlobDescriptor::Create("test_uri", /*offset=*/0, /*length=*/-1)); + auto serialized = empty_uri_descriptor->Serialize(pool_); + ASSERT_OK_AND_ASSIGN(auto restored_descriptor, + BlobDescriptor::Deserialize(serialized->data(), serialized->size())); + + ASSERT_EQ(restored_descriptor->Uri(), "test_uri"); + ASSERT_EQ(restored_descriptor->Offset(), 0); + ASSERT_EQ(restored_descriptor->Length(), -1); +} + +TEST_F(BlobDescriptorTest, TestInvalidParameters) { + // Test deserialize invalid version + { + ASSERT_OK_AND_ASSIGN(std::unique_ptr descriptor, + BlobDescriptor::Create(/*uri=*/"test", /*offset=*/1, /*length=*/2)); + auto serialized = descriptor->Serialize(pool_); + (*serialized)[0] = '\x03'; + ASSERT_NOK_WITH_MSG( + BlobDescriptor::Deserialize(serialized->data(), serialized->size()), + "Expecting BlobDescriptor version to be less than or equal to 2, but found 3"); + } + // Test deserialize invalid buffer size + { + ASSERT_OK_AND_ASSIGN(std::unique_ptr descriptor, + BlobDescriptor::Create(/*uri=*/"test", /*offset=*/1, /*length=*/2)); + auto serialized = descriptor->Serialize(pool_); + ASSERT_NOK(BlobDescriptor::Deserialize(serialized->data(), /*size=*/5)); + } + // Test invalid offset + { + ASSERT_NOK_WITH_MSG(BlobDescriptor::Create(/*uri=*/"test", /*offset=*/-1, /*length=*/2), + "offset -1 is less than 0"); + } + // Test invalid length + { + ASSERT_NOK_WITH_MSG(BlobDescriptor::Create(/*uri=*/"test", /*offset=*/1, /*length=*/-2), + "length -2 is less than -1"); + } +} + +TEST_F(BlobDescriptorTest, TestToString) { + std::string debug_str = descriptor_->ToString(); + ASSERT_FALSE(debug_str.empty()); + ASSERT_TRUE(debug_str.find("version=2") != std::string::npos); + ASSERT_TRUE(debug_str.find("uri='test_uri'") != std::string::npos); + ASSERT_TRUE(debug_str.find("offset=1024") != std::string::npos); + ASSERT_TRUE(debug_str.find("length=2048") != std::string::npos); +} + +TEST_F(BlobDescriptorTest, TestRoundTripConsistency) { + auto first_serialized = descriptor_->Serialize(pool_); + ASSERT_OK_AND_ASSIGN( + auto first_restored, + BlobDescriptor::Deserialize(first_serialized->data(), first_serialized->size())); + auto second_serialized = first_restored->Serialize(pool_); + ASSERT_EQ(*first_serialized, *second_serialized); + + ASSERT_OK_AND_ASSIGN( + auto second_restored, + BlobDescriptor::Deserialize(second_serialized->data(), second_serialized->size())); + ASSERT_EQ(second_restored->Uri(), "test_uri"); + ASSERT_EQ(second_restored->Offset(), 1024); + ASSERT_EQ(second_restored->Length(), 2048); +} + +TEST_F(BlobDescriptorTest, TestIsBlobDescriptorWithValidDescriptor) { + // A valid v2 descriptor should be recognized + auto serialized = descriptor_->Serialize(pool_); + ASSERT_OK_AND_ASSIGN(bool result, + BlobDescriptor::IsBlobDescriptor(serialized->data(), serialized->size())); + ASSERT_TRUE(result); +} + +TEST_F(BlobDescriptorTest, TestIsBlobDescriptorWithTooShortBuffer) { + // Buffer shorter than 9 bytes should return false + std::vector short_buffer = {0x02, 0x43, 0x53, 0x45, 0x44, 0x42, 0x4F, 0x4C}; + ASSERT_OK_AND_ASSIGN( + bool result, BlobDescriptor::IsBlobDescriptor(short_buffer.data(), short_buffer.size())); + ASSERT_FALSE(result); + + // Empty buffer + ASSERT_OK_AND_ASSIGN(bool empty_result, BlobDescriptor::IsBlobDescriptor(nullptr, 0)); + ASSERT_FALSE(empty_result); +} + +TEST_F(BlobDescriptorTest, TestIsBlobDescriptorWithFutureVersion) { + // Version > CURRENT_VERSION should return false (not an error) + auto serialized = descriptor_->Serialize(pool_); + (*serialized)[0] = '\x03'; // set version to 3 (> CURRENT_VERSION) + ASSERT_OK_AND_ASSIGN(bool result, + BlobDescriptor::IsBlobDescriptor(serialized->data(), serialized->size())); + ASSERT_FALSE(result); +} + +TEST_F(BlobDescriptorTest, TestIsBlobDescriptorWithWrongMagic) { + // Wrong magic number should return false + auto serialized = descriptor_->Serialize(pool_); + // Corrupt the magic bytes (bytes 1-8) + (*serialized)[1] = '\x00'; + (*serialized)[2] = '\x00'; + ASSERT_OK_AND_ASSIGN(bool result, + BlobDescriptor::IsBlobDescriptor(serialized->data(), serialized->size())); + ASSERT_FALSE(result); +} + +TEST_F(BlobDescriptorTest, TestIsBlobDescriptorWithRandomData) { + // Random data that doesn't match blob descriptor format + std::vector random_data = {0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09}; + ASSERT_OK_AND_ASSIGN(bool result, + BlobDescriptor::IsBlobDescriptor(random_data.data(), random_data.size())); + ASSERT_FALSE(result); +} + +TEST_F(BlobDescriptorTest, TestIsBlobDescriptorWithVersion1Data) { + // v1 data: version=1, followed by uri_length (not magic), should return false + // because reading bytes 1-8 as magic won't match MAGIC constant + std::vector v1_bytes = {1, 8, 0, 0, 0, 116, 101, 115, 116, 95, 117, 114, 105, 0, 4, + 0, 0, 0, 0, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0}; + ASSERT_OK_AND_ASSIGN(bool result, + BlobDescriptor::IsBlobDescriptor(v1_bytes.data(), v1_bytes.size())); + ASSERT_FALSE(result); +} + +TEST_F(BlobDescriptorTest, TestIsBlobDescriptorWithExactly9Bytes) { + // Exactly 9 bytes with valid version and magic should return true + // version=2, magic=0x424C4F4244455343 in little-endian + std::vector minimal = {0x02, 0x43, 0x53, 0x45, 0x44, 0x42, 0x4F, 0x4C, 0x42}; + ASSERT_OK_AND_ASSIGN(bool result, + BlobDescriptor::IsBlobDescriptor(minimal.data(), minimal.size())); + ASSERT_TRUE(result); +} + +} // namespace paimon::test diff --git a/src/paimon/common/data/blob_test.cpp b/src/paimon/common/data/blob_test.cpp new file mode 100644 index 0000000..4c95711 --- /dev/null +++ b/src/paimon/common/data/blob_test.cpp @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/data/blob.h" + +#include "arrow/api.h" +#include "arrow/c/bridge.h" +#include "gtest/gtest.h" +#include "paimon/fs/local/local_file_system.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { + +class BlobTest : public ::testing::Test { + public: + void SetUp() override { + pool_ = GetDefaultPool(); + dir_ = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir_); + uri_ = dir_->Str() + "/file.blob"; + file_system_ = std::make_shared(); + ASSERT_OK(file_system_->WriteFile(uri_, "abcdefghijklmn", true)); + } + void TearDown() override {} + + private: + std::shared_ptr pool_; + std::unique_ptr dir_; + std::shared_ptr file_system_; + std::string uri_; +}; + +TEST_F(BlobTest, TestSimple) { + ASSERT_OK_AND_ASSIGN(std::unique_ptr blob, Blob::FromPath(uri_)); + auto serialized = blob->ToDescriptor(pool_); + + ASSERT_OK_AND_ASSIGN(auto restored_blob, + Blob::FromDescriptor(serialized->data(), serialized->size())); + ASSERT_EQ(*restored_blob->ToDescriptor(pool_), *serialized); + ASSERT_OK_AND_ASSIGN(auto input_stream, restored_blob->NewInputStream(file_system_)); + ASSERT_EQ(uri_, restored_blob->Uri()); + std::string str(14, '\0'); + ASSERT_OK(input_stream->Read(str.data(), str.size())); + ASSERT_EQ("abcdefghijklmn", str); +} + +TEST_F(BlobTest, TestInvalidParameters) { + // Test null file system in NewInputStream + ASSERT_OK_AND_ASSIGN(std::unique_ptr blob, Blob::FromPath(uri_)); + ASSERT_NOK_WITH_MSG(blob->NewInputStream(nullptr), "file system is nullptr"); + + // Test invalid descriptor + std::string invalid_bytes = "invalid_descriptor_bytes"; + ASSERT_NOK(Blob::FromDescriptor(invalid_bytes.data(), invalid_bytes.size())); +} + +TEST_F(BlobTest, TestRoundTripConsistency) { + ASSERT_OK_AND_ASSIGN(std::unique_ptr original_blob, + Blob::FromPath(uri_, /*offset=*/1, /*length=*/5)); + + auto first_descriptor = original_blob->ToDescriptor(pool_); + ASSERT_OK_AND_ASSIGN(auto restored_blob, + Blob::FromDescriptor(first_descriptor->data(), first_descriptor->size())); + + auto second_descriptor = restored_blob->ToDescriptor(pool_); + ASSERT_EQ(*first_descriptor, *second_descriptor); + + // Verify scheme consistency + ASSERT_EQ(original_blob->Uri(), restored_blob->Uri()); +} + +TEST_F(BlobTest, TestInputStreamCreation) { + ASSERT_OK_AND_ASSIGN(std::unique_ptr blob, Blob::FromPath(uri_)); + ASSERT_OK_AND_ASSIGN(auto input_stream, blob->NewInputStream(file_system_)); + ASSERT_TRUE(input_stream); + + // Test reading from the input stream + std::string buffer(14, '\0'); + ASSERT_OK_AND_ASSIGN(auto bytes_read, input_stream->Read(buffer.data(), buffer.size())); + ASSERT_EQ(14, bytes_read); + ASSERT_EQ("abcdefghijklmn", buffer); +} + +TEST_F(BlobTest, TestBoundaryConditions) { + // Test with empty path + ASSERT_NOK_WITH_MSG(Blob::FromPath(""), "path is an empty string"); + + // Test with very large offset and length values + ASSERT_OK_AND_ASSIGN(std::unique_ptr blob, + Blob::FromPath(uri_, /*offset=*/INT64_MAX, /*length=*/1)); + auto serialized = blob->ToDescriptor(pool_); + + ASSERT_OK_AND_ASSIGN(auto restored_blob, + Blob::FromDescriptor(serialized->data(), serialized->size())); + ASSERT_EQ(*restored_blob->ToDescriptor(pool_), *serialized); +} + +TEST_F(BlobTest, TestNewInputStreamWithOffsetAndLength) { + // Create blob with offset=2, length=6 (should read "cdefgh") + ASSERT_OK_AND_ASSIGN(std::unique_ptr blob, + Blob::FromPath(uri_, /*offset=*/2, /*length=*/6)); + ASSERT_OK_AND_ASSIGN(auto input_stream, blob->NewInputStream(file_system_)); + ASSERT_TRUE(input_stream); + + ASSERT_OK_AND_ASSIGN(uint64_t length, input_stream->Length()); + ASSERT_EQ(6, length); + + // Test reading with offset and length applied + std::string buffer(6, '\0'); + ASSERT_OK_AND_ASSIGN(auto bytes_read, input_stream->Read(buffer.data(), buffer.size())); + ASSERT_EQ(6, bytes_read); + ASSERT_EQ("cdefgh", buffer); +} + +TEST_F(BlobTest, TestNewInputStreamWithDynamicLength) { + // Create blob with offset=2, dynamic length (-1 means read to end) + ASSERT_OK_AND_ASSIGN(std::unique_ptr blob, + Blob::FromPath(uri_, /*offset=*/2, /*length=*/-1)); + ASSERT_OK_AND_ASSIGN(auto input_stream, blob->NewInputStream(file_system_)); + ASSERT_TRUE(input_stream); + + ASSERT_OK_AND_ASSIGN(uint64_t length, input_stream->Length()); + ASSERT_EQ(12, length); + + // Test reading from offset to end (should read "cdefghijklmn") + std::string buffer(12, '\0'); + ASSERT_OK_AND_ASSIGN(auto bytes_read, input_stream->Read(buffer.data(), buffer.size())); + ASSERT_EQ(12, bytes_read); + ASSERT_EQ("cdefghijklmn", buffer); +} + +TEST_F(BlobTest, TestArrowField) { + { + // basic: field name, non-nullable by default + ASSERT_OK_AND_ASSIGN(auto schema, Blob::ArrowField("my_blob")); + ASSERT_NE(schema, nullptr); + + // import back to arrow::Field to verify + auto field_result = arrow::ImportField(schema.get()); + ASSERT_TRUE(field_result.ok()); + auto field = field_result.ValueUnsafe(); + + ASSERT_EQ(field->name(), "my_blob"); + ASSERT_EQ(field->type()->id(), arrow::Type::LARGE_BINARY); + ASSERT_FALSE(field->nullable()); + ASSERT_TRUE(field->HasMetadata()); + auto extension_type = field->metadata()->Get("paimon.extension.type"); + ASSERT_TRUE(extension_type.ok()); + ASSERT_EQ(extension_type.ValueUnsafe(), "paimon.type.blob"); + } + { + // with custom metadata + std::unordered_map custom_metadata = { + {"custom_key", "custom_value"}}; + ASSERT_OK_AND_ASSIGN(auto schema, Blob::ArrowField("meta_blob", custom_metadata)); + auto field = arrow::ImportField(schema.get()).ValueUnsafe(); + ASSERT_EQ(field->name(), "meta_blob"); + ASSERT_FALSE(field->nullable()); + ASSERT_TRUE(field->HasMetadata()); + // blob extension metadata should be present + auto extension_type = field->metadata()->Get("paimon.extension.type"); + ASSERT_TRUE(extension_type.ok()); + ASSERT_EQ(extension_type.ValueUnsafe(), "paimon.type.blob"); + // custom metadata should also be present + auto custom_val = field->metadata()->Get("custom_key"); + ASSERT_TRUE(custom_val.ok()); + ASSERT_EQ(custom_val.ValueUnsafe(), "custom_value"); + } +} + +} // namespace paimon::test diff --git a/src/paimon/common/data/blob_utils.cpp b/src/paimon/common/data/blob_utils.cpp new file mode 100644 index 0000000..9eb716a --- /dev/null +++ b/src/paimon/common/data/blob_utils.cpp @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/common/data/blob_utils.h" + +#include +#include +#include + +#include "arrow/api.h" +#include "arrow/array/array_nested.h" +#include "arrow/type.h" +#include "paimon/common/data/blob_defs.h" +#include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/common/utils/string_utils.h" + +namespace arrow { +class Array; +} + +namespace paimon { + +BlobUtils::SeparatedSchemas BlobUtils::SeparateBlobSchema( + const std::shared_ptr& schema) { + std::vector> remaining_fields; + std::vector> blob_fields; + for (auto i = 0; i < schema->num_fields(); i++) { + auto field = schema->field(i); + if (IsBlobField(field)) { + blob_fields.emplace_back(field); + } else { + remaining_fields.emplace_back(field); + } + } + SeparatedSchemas result; + result.main_schema = arrow::schema(remaining_fields); + result.blob_schema = arrow::schema(blob_fields); + return result; +} + +Result BlobUtils::SeparateBlobArray( + const std::shared_ptr& struct_array) { + std::shared_ptr old_type = + std::static_pointer_cast(struct_array->type()); + const auto& old_fields = old_type->fields(); + const auto& old_arrays = struct_array->fields(); + + std::vector> remaining_fields; + std::vector> remaining_arrays; + std::vector> blob_fields; + std::vector> blob_arrays; + + for (size_t i = 0; i < old_fields.size(); i++) { + if (IsBlobField(old_fields[i])) { + blob_fields.push_back(old_fields[i]); + blob_arrays.push_back(old_arrays[i]); + } else { + remaining_fields.push_back(old_fields[i]); + remaining_arrays.push_back(old_arrays[i]); + } + } + + SeparatedStructArrays result; + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(result.main_array, + arrow::StructArray::Make(remaining_arrays, remaining_fields)); + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(result.blob_array, + arrow::StructArray::Make(blob_arrays, blob_fields)); + return result; +} + +bool BlobUtils::IsBlobField(const std::shared_ptr& field) { + const auto& type = field->type(); + if (type->id() != arrow::Type::LARGE_BINARY) { + return false; + } + if (!field->HasMetadata()) { + return false; + } + return IsBlobMetadata(field->metadata()); +} + +bool BlobUtils::IsBlobMetadata(const std::shared_ptr& metadata) { + if (!metadata) { + return false; + } + auto extension_name = metadata->Get(BlobDefs::kExtensionTypeKey); + if (!extension_name.ok()) { + return false; + } + return extension_name.ValueUnsafe() == BlobDefs::kExtensionTypeValue; +} + +bool BlobUtils::IsBlobFile(const std::string& file_name) { + return StringUtils::EndsWith(file_name, ".blob"); +} + +std::shared_ptr BlobUtils::ToArrowField( + const std::string& field_name, bool nullable, + std::unordered_map metadata) { + metadata[BlobDefs::kExtensionTypeKey] = BlobDefs::kExtensionTypeValue; + return arrow::field(field_name, arrow::large_binary(), nullable, + std::make_shared(metadata)); +} +} // namespace paimon diff --git a/src/paimon/common/data/blob_utils.h b/src/paimon/common/data/blob_utils.h new file mode 100644 index 0000000..505cad5 --- /dev/null +++ b/src/paimon/common/data/blob_utils.h @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include + +#include "paimon/result.h" +#include "paimon/visibility.h" + +namespace arrow { +class Field; +class KeyValueMetadata; +class Schema; +class StructArray; +} // namespace arrow + +namespace paimon { +/// Utils for blob type. +class PAIMON_EXPORT BlobUtils { + public: + BlobUtils() = delete; + ~BlobUtils() = delete; + + struct SeparatedSchemas { + /// Non-blob fields + std::shared_ptr main_schema; + /// Blob fields only + std::shared_ptr blob_schema; + }; + + struct SeparatedStructArrays { + /// Non-blob fields + std::shared_ptr main_array; + /// Blob fields only + std::shared_ptr blob_array; + }; + + static SeparatedSchemas SeparateBlobSchema(const std::shared_ptr& schema); + + static Result SeparateBlobArray( + const std::shared_ptr& struct_array); + + static bool IsBlobField(const std::shared_ptr& field); + static bool IsBlobMetadata(const std::shared_ptr& metadata); + static bool IsBlobFile(const std::string& file_name); + + static std::shared_ptr ToArrowField( + const std::string& field_name, bool nullable = false, + std::unordered_map metadata = {}); +}; + +} // namespace paimon diff --git a/src/paimon/common/data/blob_utils_test.cpp b/src/paimon/common/data/blob_utils_test.cpp new file mode 100644 index 0000000..d042ecd --- /dev/null +++ b/src/paimon/common/data/blob_utils_test.cpp @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/common/data/blob_utils.h" + +#include "arrow/api.h" +#include "arrow/c/bridge.h" +#include "gtest/gtest.h" +#include "paimon/common/data/blob_defs.h" +#include "paimon/data/blob.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { + +class BlobUtilsTest : public ::testing::Test { + private: + std::shared_ptr CreateBlobMetadata() { + std::unordered_map blob_metadata_map = { + {BlobDefs::kExtensionTypeKey, BlobDefs::kExtensionTypeValue}}; + return std::make_shared(blob_metadata_map); + } +}; + +TEST_F(BlobUtilsTest, IsBlobMetadata) { + auto correct_metadata = CreateBlobMetadata(); + EXPECT_TRUE(BlobUtils::IsBlobMetadata(correct_metadata)); + EXPECT_FALSE(BlobUtils::IsBlobMetadata(nullptr)); + std::unordered_map wrong_metadata_map = { + {BlobDefs::kExtensionTypeKey, "paimon.type.varchar"}}; + auto wrong_metadata = std::make_shared(wrong_metadata_map); + EXPECT_FALSE(BlobUtils::IsBlobMetadata(wrong_metadata)); + std::unordered_map no_extension_metadata_map = { + {"other_key", BlobDefs::kExtensionTypeValue}}; + auto no_extension_metadata = + std::make_shared(no_extension_metadata_map); + EXPECT_FALSE(BlobUtils::IsBlobMetadata(no_extension_metadata)); +} + +TEST_F(BlobUtilsTest, IsBlobField) { + std::shared_ptr blob_field = BlobUtils::ToArrowField("f1", true); + EXPECT_TRUE(BlobUtils::IsBlobField(blob_field)); + + auto int_field = arrow::field("i_int", arrow::int32()); + EXPECT_FALSE(BlobUtils::IsBlobField(int_field)); + + auto binary_field_no_meta = arrow::field("b_no_meta", arrow::large_binary()); + EXPECT_FALSE(BlobUtils::IsBlobField(binary_field_no_meta)); + + auto wrong_meta = std::make_shared( + std::unordered_map{{"other_key", "value"}}); + auto binary_field_wrong_meta = + arrow::field("b_wrong_meta", arrow::large_binary(), false, wrong_meta); + EXPECT_FALSE(BlobUtils::IsBlobField(binary_field_wrong_meta)); +} + +TEST_F(BlobUtilsTest, SeparateBlobSchema) { + auto int_field = arrow::field("f1_int", arrow::int32()); + auto string_field = arrow::field("f2_string", arrow::utf8()); + std::shared_ptr blob_field_1 = BlobUtils::ToArrowField("f3_blob_1", true); + { + std::shared_ptr original_schema = + arrow::schema({int_field, string_field, blob_field_1}); + + BlobUtils::SeparatedSchemas schemas = BlobUtils::SeparateBlobSchema(original_schema); + + std::shared_ptr expected_main_schema = + arrow::schema({int_field, string_field}); + ASSERT_TRUE(schemas.main_schema->Equals(*expected_main_schema)); + + std::shared_ptr expected_blob_schema = arrow::schema({blob_field_1}); + ASSERT_TRUE(schemas.blob_schema->Equals(*expected_blob_schema)); + } + { + std::shared_ptr no_blob_schema = arrow::schema({int_field, string_field}); + BlobUtils::SeparatedSchemas no_blob_schemas = BlobUtils::SeparateBlobSchema(no_blob_schema); + ASSERT_TRUE(no_blob_schemas.main_schema->Equals(*no_blob_schema)); + ASSERT_EQ(no_blob_schemas.blob_schema->num_fields(), 0); + } + { + std::shared_ptr only_blob_schema = arrow::schema({blob_field_1}); + BlobUtils::SeparatedSchemas only_blob_schemas = + BlobUtils::SeparateBlobSchema(only_blob_schema); + ASSERT_TRUE(only_blob_schemas.blob_schema->Equals(*only_blob_schema)); + ASSERT_EQ(only_blob_schemas.main_schema->num_fields(), 0); + } +} + +TEST_F(BlobUtilsTest, SeparateBlobArray) { + auto int_field = arrow::field("f1_int", arrow::int32()); + std::shared_ptr blob_field = BlobUtils::ToArrowField("f2_blob", false); + auto string_field = arrow::field("f3_string", arrow::utf8()); + auto schema = arrow::schema({int_field, blob_field, string_field}); + + arrow::Int32Builder int_builder; + ASSERT_TRUE(int_builder.AppendValues({1, 2, 3}).ok()); + auto int_array = int_builder.Finish().ValueOrDie(); + + arrow::StringBuilder string_builder; + ASSERT_TRUE(string_builder.AppendValues({"a", "b", "c"}).ok()); + auto string_array = string_builder.Finish().ValueOrDie(); + + arrow::LargeBinaryBuilder blob_builder; + ASSERT_TRUE(blob_builder.Append("1", 1).ok()); + ASSERT_TRUE(blob_builder.Append("2", 1).ok()); + ASSERT_TRUE(blob_builder.Append("3", 1).ok()); + auto blob_array_data = blob_builder.Finish().ValueOrDie(); + + auto raw_struct_array = + arrow::StructArray::Make({int_array, blob_array_data, string_array}, schema->fields()) + .ValueOrDie(); + + std::shared_ptr struct_array = + std::static_pointer_cast(raw_struct_array); + + ASSERT_OK_AND_ASSIGN(auto separated, BlobUtils::SeparateBlobArray(struct_array)); + + std::shared_ptr expected_main_type = arrow::struct_({int_field, string_field}); + ASSERT_TRUE(separated.main_array->type()->Equals(*expected_main_type)); + ASSERT_EQ(separated.main_array->num_fields(), 2); + ASSERT_TRUE(separated.main_array->field(0)->Equals(*int_array)); + ASSERT_TRUE(separated.main_array->field(1)->Equals(*string_array)); + + std::shared_ptr expected_blob_type = arrow::struct_({blob_field}); + ASSERT_TRUE(separated.blob_array->type()->Equals(*expected_blob_type)); + ASSERT_EQ(separated.blob_array->num_fields(), 1); + ASSERT_TRUE(separated.blob_array->field(0)->Equals(*blob_array_data)); +} + +} // namespace paimon::test diff --git a/src/paimon/common/data/decimal.cpp b/src/paimon/common/data/decimal.cpp new file mode 100644 index 0000000..c05982c --- /dev/null +++ b/src/paimon/common/data/decimal.cpp @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/data/decimal.h" + +#include +#include +#include + +#include "arrow/api.h" +#include "arrow/scalar.h" +#include "arrow/util/decimal.h" +#include "paimon/io/byte_order.h" +#include "paimon/memory/bytes.h" + +namespace paimon { +const int64_t Decimal::POWERS_OF_TEN[MAX_COMPACT_PRECISION + 1] = {1, + 10, + 100, + 1000, + 10000, + 100000, + 1000000l, + 10000000l, + 100000000l, + 1000000000l, + 10000000000l, + 100000000000l, + 1000000000000l, + 10000000000000l, + 100000000000000l, + 1000000000000000l, + 10000000000000000l, + 100000000000000000l, + 1000000000000000000l}; +const Decimal::int128_t Decimal::INT128_MAXIMUM_VALUE = + static_cast(0x7fffffffffffffff) << 64 | 0xffffffffffffffff; +const Decimal::int128_t Decimal::INT128_MINIMUM_VALUE = + static_cast(0x8000000000000000) << 64; + +std::string Decimal::ToString() const { + auto type = arrow::decimal128(Precision(), Scale()); + arrow::Decimal128Scalar scalar(arrow::Decimal128(HighBits(), LowBits()), type); + return scalar.ToString(); +} + +std::vector Decimal::ToUnscaledBytes() const { + bool positive = value_ >= 0; + int32_t valid_bytes = 0; + if (positive) { + int32_t leading_zero_bytes = count_leading_zero_bytes(value_); + valid_bytes = sizeof(value_) - leading_zero_bytes; + } else { + int32_t leading_all_ones_bytes = count_leading_all_ones_bytes(value_); + valid_bytes = sizeof(value_) - leading_all_ones_bytes; + } + if (valid_bytes == 0) { + // if value_ == 0, return one byte with 0; + // if value_ == 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF, return one byte with 0xFF + valid_bytes = 1; + } else { + // java BigInteger use highest significant bit to determine if the number is positive or + // negative, e.g., a positive BigInteger 0xFF, will return two bytes [0x00, 0xFF] + bool highest_significant_bit = + (static_cast(value_) >> ((valid_bytes - 1) * 8)) & 0x80; + if ((positive && highest_significant_bit) || (!positive && !highest_significant_bit)) { + valid_bytes += 1; + } + } + std::vector bytes(valid_bytes); + memcpy(bytes.data(), &value_, valid_bytes); + if (SystemByteOrder() == ByteOrder::PAIMON_LITTLE_ENDIAN) { + std::reverse(bytes.data(), bytes.data() + bytes.size()); + } + return bytes; +} + +Decimal Decimal::FromUnscaledBytes(int32_t precision, int32_t scale, Bytes* bytes) { + if (SystemByteOrder() == ByteOrder::PAIMON_LITTLE_ENDIAN) { + std::reverse(bytes->data(), bytes->data() + bytes->size()); + } + Decimal::int128_t value = 0; + for (size_t i = 0; i < bytes->size(); ++i) { + value |= static_cast(static_cast((*bytes)[i])) << (8 * i); + } + // for negative + if ((*bytes)[bytes->size() - 1] & 0x80) { + for (size_t i = bytes->size(); i < sizeof(Decimal::int128_t); ++i) { + value |= static_cast(0xFFull) << (8 * i); + } + } + return Decimal(precision, scale, value); +} + +int32_t Decimal::clz_u128(uint128_t u) { + uint64_t hi = u >> 64; + uint64_t lo = u; + int32_t retval[3] = {__builtin_clzll(hi), __builtin_clzll(lo) + 64, 128}; + int32_t idx = !hi + ((!lo) & (!hi)); + return retval[idx]; +} + +int32_t Decimal::count_leading_zero_bytes(uint128_t u) { + if (u == 0) { + return sizeof(u); + } + int32_t leading_zeros = clz_u128(u); + return leading_zeros / 8; +} + +int32_t Decimal::count_leading_all_ones_bytes(uint128_t u) { + if (u == 0) { + return 0; + } + int32_t count = 0; + for (int32_t i = sizeof(uint128_t) - 1; i >= 0; i--) { + if (((u >> (i * 8)) & 0xFF) == 0xFF) { + count++; + } else { + break; + } + } + return count; +} + +Decimal::int128_t Decimal::DownScaleInt128(Decimal::int128_t value, int32_t scale) { + while (scale > 0) { + int32_t step = std::min(std::abs(scale), MAX_COMPACT_PRECISION); + value /= POWERS_OF_TEN[step]; + scale -= step; + } + return value; +} + +Decimal::int128_t Decimal::ScaleInt128(Decimal::int128_t value, int32_t scale, bool* overflow) { + *overflow = false; + while (scale > 0) { + int32_t step = std::min(scale, MAX_COMPACT_PRECISION); + if (value > 0 && INT128_MAXIMUM_VALUE / POWERS_OF_TEN[step] < value) { + *overflow = true; + return INT128_MAXIMUM_VALUE; + } else if (value < 0 && INT128_MINIMUM_VALUE / POWERS_OF_TEN[step] > value) { + *overflow = true; + return INT128_MINIMUM_VALUE; + } + + value *= POWERS_OF_TEN[step]; + scale -= step; + } + return value; +} + +int32_t Decimal::CompareTo(const Decimal& other) const { + auto l_value = value_; + auto l_scale = scale_; + auto r_value = other.value_; + auto r_scale = other.scale_; + + bool l_positive = l_value >= 0; + bool r_positive = r_value >= 0; + if (l_positive && !r_positive) { + return 1; + } else if (!l_positive && r_positive) { + return -1; + } + + // compare integral parts + Decimal::int128_t l_integral = DownScaleInt128(l_value, l_scale); + Decimal::int128_t r_integral = DownScaleInt128(r_value, r_scale); + + if (l_integral < r_integral) { + return -1; + } else if (l_integral > r_integral) { + return 1; + } + + // integral parts are equal, continue comparing fractional parts + // unnecessary to check overflow here because the scaled number will not + // exceed original ones + bool overflow = false, positive = l_value >= 0; + l_value -= ScaleInt128(l_integral, l_scale, &overflow); + r_value -= ScaleInt128(r_integral, r_scale, &overflow); + + int32_t diff = l_scale - r_scale; + if (diff > 0) { + r_value = ScaleInt128(r_value, diff, &overflow); + if (overflow) { + return positive ? -1 : 1; + } + } else { + l_value = ScaleInt128(l_value, -diff, &overflow); + if (overflow) { + return positive ? 1 : -1; + } + } + + if (l_value < r_value) { + return -1; + } else if (l_value > r_value) { + return 1; + } else { + return 0; + } +} + +} // namespace paimon diff --git a/src/paimon/common/data/decimal_test.cpp b/src/paimon/common/data/decimal_test.cpp new file mode 100644 index 0000000..038b3d3 --- /dev/null +++ b/src/paimon/common/data/decimal_test.cpp @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/data/decimal.h" + +#include +#include + +#include "gtest/gtest.h" +#include "paimon/common/utils/decimal_utils.h" +#include "paimon/fs/file_system.h" +#include "paimon/fs/local/local_file_system.h" +#include "paimon/io/byte_array_input_stream.h" +#include "paimon/io/data_input_stream.h" +#include "paimon/memory/bytes.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/result.h" +#include "paimon/status.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { +TEST(DecimalTest, TestSimple) { + auto CheckResult = [](const Decimal& decimal, const std::vector& bytes) { + auto pool = GetDefaultPool(); + // prepare java bytes + auto java_bytes = Bytes::AllocateBytes(bytes.size(), pool.get()); + memcpy(java_bytes->data(), bytes.data(), bytes.size()); + // prepare result cpp bytes + auto cpp_serialized_bytes = decimal.ToUnscaledBytes(); + + // check serialized bytes equal + ASSERT_EQ(std::vector(java_bytes->data(), java_bytes->data() + java_bytes->size()), + cpp_serialized_bytes); + // check deserialize equal + auto decimal2 = Decimal::FromUnscaledBytes(38, 38, java_bytes.get()); + ASSERT_EQ(decimal, decimal2); + }; + { + Decimal decimal(38, 38, DecimalUtils::StrToInt128("12345678998765432145678").value()); + std::vector java_bytes = {0x2, 0x9d, 0x42, 0xb6, 0xa7, 0x2a, 0x9d, 0xc7, 0x7, 0xe}; + CheckResult(decimal, java_bytes); + } + { + Decimal decimal(38, 38, DecimalUtils::StrToInt128("-12345678998765432145678").value()); + std::vector java_bytes = {0xfd, 0x62, 0xbd, 0x49, 0x58, + 0xd5, 0x62, 0x38, 0xf8, 0xf2}; + CheckResult(decimal, java_bytes); + } + { + Decimal decimal(38, 38, DecimalUtils::StrToInt128("0").value()); + std::vector java_bytes = {0x0}; + CheckResult(decimal, java_bytes); + } + { + Decimal decimal(38, 38, DecimalUtils::StrToInt128("1").value()); + std::vector java_bytes = {0x1}; + CheckResult(decimal, java_bytes); + } + { + Decimal decimal(38, 38, DecimalUtils::StrToInt128("-1").value()); + std::vector java_bytes = {0xff}; + CheckResult(decimal, java_bytes); + } + { + Decimal decimal(38, 38, DecimalUtils::StrToInt128("128").value()); + std::vector java_bytes = {0x0, 0x80}; + CheckResult(decimal, java_bytes); + } + { + Decimal decimal(38, 38, Decimal::INT128_MINIMUM_VALUE); + std::vector java_bytes = {0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}; + CheckResult(decimal, java_bytes); + } + { + Decimal decimal(38, 38, Decimal::INT128_MAXIMUM_VALUE); + std::vector java_bytes = {0x7f, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}; + CheckResult(decimal, java_bytes); + } +} + +TEST(DecimalTest, TestCompatibleWithJava) { + auto pool = GetDefaultPool(); + auto file_system = std::make_unique(); + auto file_name = paimon::test::GetDataDir() + "/decimal_bytes.data"; + uint64_t file_length = file_system->GetFileStatus(file_name).value()->GetLen(); + ASSERT_GT(file_length, 0); + ASSERT_OK_AND_ASSIGN(auto input_stream, file_system->Open(file_name)); + auto data_bytes = Bytes::AllocateBytes(file_length, pool.get()); + ASSERT_OK(input_stream->Read(data_bytes->data(), file_length)); + auto byte_array_input_stream = + std::make_shared(data_bytes->data(), file_length); + DataInputStream data_input_stream(byte_array_input_stream); + for (int32_t i = 0; i < 2000; i++) { + // read decimal str + ASSERT_OK_AND_ASSIGN(int32_t decimal_str_len, data_input_stream.ReadValue()); + auto decimal_str = Bytes::AllocateBytes(decimal_str_len, pool.get()); + ASSERT_OK(data_input_stream.ReadBytes(decimal_str.get())); + + // read decimal serialized bytes from java + ASSERT_OK_AND_ASSIGN(int32_t decimal_bytes_len, data_input_stream.ReadValue()); + auto decimal_bytes = Bytes::AllocateBytes(decimal_bytes_len, pool.get()); + ASSERT_OK(data_input_stream.ReadBytes(decimal_bytes.get())); + + // check result + auto str = std::string(decimal_str->data(), decimal_str->size()); + Decimal decimal(/*precision=*/29, /*scale=*/29, DecimalUtils::StrToInt128(str).value()); + auto serialized_bytes = decimal.ToUnscaledBytes(); + ASSERT_EQ( + std::vector(decimal_bytes->data(), decimal_bytes->data() + decimal_bytes->size()), + serialized_bytes); + auto decimal2 = + Decimal::FromUnscaledBytes(/*precision=*/29, /*scale=*/29, decimal_bytes.get()); + ASSERT_EQ(decimal, decimal2); + } +} + +TEST(DecimalTest, TestCompareTo) { + auto CheckResult = [](const Decimal& decimal1, const Decimal& decimal2) { + ASSERT_FALSE(decimal1 < decimal1); + ASSERT_FALSE(decimal1 > decimal1); + ASSERT_EQ(decimal1, decimal1); + + ASSERT_EQ(decimal1.CompareTo(decimal2), -1); + ASSERT_LT(decimal1, decimal2); + ASSERT_EQ(decimal2.CompareTo(decimal1), 1); + ASSERT_GT(decimal2, decimal1); + auto decimal3 = decimal1; + ASSERT_EQ(decimal3.CompareTo(decimal1), 0); + ASSERT_EQ(decimal3, decimal1); + + Decimal negative_decimal1(decimal1.Precision(), decimal1.Scale(), -decimal1.Value()); + Decimal negative_decimal2(decimal2.Precision(), decimal2.Scale(), -decimal2.Value()); + ASSERT_EQ(negative_decimal1.CompareTo(negative_decimal2), 1); + ASSERT_GT(negative_decimal1, negative_decimal2); + ASSERT_EQ(negative_decimal2.CompareTo(negative_decimal1), -1); + ASSERT_LT(negative_decimal2, negative_decimal1); + auto negative_decimal3 = negative_decimal1; + ASSERT_EQ(negative_decimal3.CompareTo(negative_decimal1), 0); + ASSERT_EQ(negative_decimal3, negative_decimal1); + }; + + auto CheckEqual = [](const Decimal& decimal1, const Decimal& decimal2) { + ASSERT_EQ(decimal1.CompareTo(decimal2), 0); + ASSERT_EQ(decimal2.CompareTo(decimal1), 0); + + Decimal negative_decimal1(decimal1.Precision(), decimal1.Scale(), -decimal1.Value()); + Decimal negative_decimal2(decimal2.Precision(), decimal2.Scale(), -decimal2.Value()); + ASSERT_EQ(negative_decimal1.CompareTo(negative_decimal2), 0); + ASSERT_EQ(negative_decimal2.CompareTo(negative_decimal1), 0); + }; + + // same scales + { + Decimal decimal1(23, 0, DecimalUtils::StrToInt128("99").value()); + Decimal decimal2(23, 0, DecimalUtils::StrToInt128("100").value()); + CheckResult(decimal1, decimal2); + } + { + Decimal decimal1(23, 5, DecimalUtils::StrToInt128("34543").value()); + Decimal decimal2(23, 5, DecimalUtils::StrToInt128("4324324").value()); + CheckResult(decimal1, decimal2); + } + { + Decimal decimal1(23, 15, DecimalUtils::StrToInt128("345345435432").value()); + Decimal decimal2(23, 15, DecimalUtils::StrToInt128("345344425435432").value()); + CheckResult(decimal1, decimal2); + } + { + Decimal decimal1(23, 20, DecimalUtils::StrToInt128("5").value()); + Decimal decimal2(23, 20, DecimalUtils::StrToInt128("50").value()); + CheckResult(decimal1, decimal2); + } + + // different scales + { + Decimal decimal1(23, 4, DecimalUtils::StrToInt128("10000").value()); + Decimal decimal2(23, 3, DecimalUtils::StrToInt128("10000").value()); + CheckResult(decimal1, decimal2); + } + { + Decimal decimal1(23, 2, DecimalUtils::StrToInt128("111").value()); + Decimal decimal2(23, 3, DecimalUtils::StrToInt128("1111").value()); + CheckResult(decimal1, decimal2); + } + { + Decimal decimal1(23, 5, DecimalUtils::StrToInt128("999999").value()); + Decimal decimal2(23, 5, DecimalUtils::StrToInt128("9999999").value()); + CheckResult(decimal1, decimal2); + } + { + Decimal decimal1(23, 1, DecimalUtils::StrToInt128("100").value()); + Decimal decimal2(23, 0, DecimalUtils::StrToInt128("99").value()); + CheckResult(decimal1, decimal2); + } + + // same integral parts + { + Decimal decimal1(23, 0, DecimalUtils::StrToInt128("99999").value()); + Decimal decimal2(23, 1, DecimalUtils::StrToInt128("999999").value()); + CheckResult(decimal1, decimal2); + } + { + Decimal decimal1(23, 3, DecimalUtils::StrToInt128("12345123").value()); + Decimal decimal2(23, 5, DecimalUtils::StrToInt128("1234553432").value()); + CheckResult(decimal1, decimal2); + } + + // equal numbers + { + Decimal decimal1(23, 3, DecimalUtils::StrToInt128("100000").value()); + Decimal decimal2(23, 0, DecimalUtils::StrToInt128("100").value()); + CheckEqual(decimal1, decimal2); + } + { + Decimal decimal1(23, 3, DecimalUtils::StrToInt128("100000").value()); + Decimal decimal2(23, 3, DecimalUtils::StrToInt128("100000").value()); + CheckEqual(decimal1, decimal2); + } + { + Decimal decimal1(23, 10, DecimalUtils::StrToInt128("1").value()); + Decimal decimal2(23, 11, DecimalUtils::StrToInt128("10").value()); + CheckEqual(decimal1, decimal2); + } + + // large scales (>18) + { + Decimal decimal1(128, 35, DecimalUtils::StrToInt128("99").value()); + Decimal decimal2(128, 35, DecimalUtils::StrToInt128("100").value()); + CheckResult(decimal1, decimal2); + } + { + Decimal decimal1( + 128, 29, DecimalUtils::StrToInt128("12345678999999999999999999999999999998").value()); + Decimal decimal2( + 128, 30, DecimalUtils::StrToInt128("123456789999999999999999999999999999999").value()); + CheckResult(decimal1, decimal2); + } + { + Decimal decimal1( + 128, 30, DecimalUtils::StrToInt128("123456789999999999999999999999999999900").value()); + Decimal decimal2( + 128, 29, DecimalUtils::StrToInt128("12345678999999999999999999999999999990").value()); + CheckEqual(decimal1, decimal2); + } + + // minimum and maximum + { + Decimal decimal1(128, 39, Decimal::INT128_MAXIMUM_VALUE); + Decimal decimal2( + 128, 39, DecimalUtils::StrToInt128("170141183460469231731687303715884105727").value()); + CheckEqual(decimal1, decimal2); + } + { + Decimal decimal1(128, 39, Decimal::INT128_MINIMUM_VALUE); + Decimal decimal2( + 128, 39, DecimalUtils::StrToInt128("-170141183460469231731687303715884105728").value()); + CheckEqual(decimal1, decimal2); + } + + // fractional overflow + { + Decimal decimal1(128, 39, Decimal::INT128_MAXIMUM_VALUE); + Decimal decimal2( + 128, 38, DecimalUtils::StrToInt128("99999999999999999999999999999999999999").value()); + CheckResult(decimal1, decimal2); + } + { + Decimal decimal1( + 128, 38, DecimalUtils::StrToInt128("-99999999999999999999999999999999999999").value()); + Decimal decimal2(128, 39, Decimal::INT128_MINIMUM_VALUE); + CheckResult(decimal1, decimal2); + } +} +} // namespace paimon::test diff --git a/src/paimon/common/data/timestamp.cpp b/src/paimon/common/data/timestamp.cpp new file mode 100644 index 0000000..3f4a313 --- /dev/null +++ b/src/paimon/common/data/timestamp.cpp @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/data/timestamp.h" + +#include +#include +#include + +namespace paimon { +const int32_t Timestamp::DEFAULT_PRECISION = 6; +const int32_t Timestamp::MILLIS_PRECISION = 3; +const int32_t Timestamp::MAX_PRECISION = 9; +const int32_t Timestamp::MIN_PRECISION = 0; +const int32_t Timestamp::MAX_COMPACT_PRECISION = 3; + +int64_t Timestamp::ToMicrosecond() const { + int64_t MICROS_PER_MILLIS = 1000l; + int64_t NANOS_PER_MICROS = 1000l; + int64_t micro = millisecond_ * MICROS_PER_MILLIS; + return micro + nano_of_millisecond_ / NANOS_PER_MICROS; +} + +std::string Timestamp::ToString() const { + time_t seconds = millisecond_ / 1000; + std::stringstream out; + int64_t ns = (millisecond_ % 1000) * 1000000l + nano_of_millisecond_; + if (ns < 0) { + seconds -= 1; + ns += 1000000000l; + } + + std::tm tm_info; + ::gmtime_r(&seconds, &tm_info); + out << std::put_time(&tm_info, "%Y-%m-%d %H:%M:%S") << '.' << std::setfill('0') << std::setw(9) + << ns; + // for year with less than 4 digits, year str is not 4 digits in put_time(), e.g., year 1 is "1" + // rather than "0001" + static const int32_t MAX_STR_LENGTH = 29; + std::string ret = out.str(); + if (ret.length() < MAX_STR_LENGTH) { + ret.insert(0, MAX_STR_LENGTH - ret.length(), '0'); + } + return ret; +} + +} // namespace paimon diff --git a/src/paimon/common/data/timestamp_test.cpp b/src/paimon/common/data/timestamp_test.cpp new file mode 100644 index 0000000..886f087 --- /dev/null +++ b/src/paimon/common/data/timestamp_test.cpp @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/data/timestamp.h" + +#include "gtest/gtest.h" + +namespace paimon::test { + +class TimestampTest : public ::testing::Test { + protected: + void SetUp() override { + timestamp_ = Timestamp(1622547800000, 123456); // 2021-06-01 11:43:20.000123456 + } + + Timestamp timestamp_; +}; + +TEST_F(TimestampTest, GetMillisecond) { + ASSERT_EQ(timestamp_.GetMillisecond(), 1622547800000); +} + +TEST_F(TimestampTest, GetNanoOfMillisecond) { + ASSERT_EQ(timestamp_.GetNanoOfMillisecond(), 123456); +} + +TEST_F(TimestampTest, ToNanosecond) { + ASSERT_EQ(timestamp_.ToNanosecond(), 1622547800000123456ll); +} + +TEST_F(TimestampTest, FromEpochMillis) { + Timestamp ts1 = Timestamp::FromEpochMillis(1622547800000); + ASSERT_EQ(ts1.GetMillisecond(), 1622547800000); + ASSERT_EQ(ts1.GetNanoOfMillisecond(), 0); + + Timestamp ts2 = Timestamp::FromEpochMillis(1622547800000, 123456); + ASSERT_EQ(ts2.GetMillisecond(), 1622547800000); + ASSERT_EQ(ts2.GetNanoOfMillisecond(), 123456); +} + +TEST_F(TimestampTest, ToMillisTimestamp) { + Timestamp ts = timestamp_.ToMillisTimestamp(); + ASSERT_EQ(ts.GetMillisecond(), 1622547800000); + ASSERT_EQ(ts.GetNanoOfMillisecond(), 0); +} + +TEST_F(TimestampTest, IsCompact) { + ASSERT_TRUE(Timestamp::IsCompact(3)); + ASSERT_FALSE(Timestamp::IsCompact(6)); +} + +TEST_F(TimestampTest, EqualityOperator) { + Timestamp ts1(1622547800000, 123456); + Timestamp ts2(1622547800000, 123456); + Timestamp ts3(1622547800000, 654321); + ASSERT_EQ(ts1, ts1); + ASSERT_EQ(ts1, ts2); + ASSERT_NE(ts1, ts3); +} + +TEST_F(TimestampTest, LessThanOperator) { + Timestamp ts1(1622547800000, 123456); + Timestamp ts2(1622547800000, 654321); + Timestamp ts3(1622547800001, 123456); + ASSERT_LT(ts1, ts2); + ASSERT_LT(ts1, ts3); + ASSERT_FALSE(ts2 < ts1); + ASSERT_FALSE(ts3 < ts1); +} + +TEST_F(TimestampTest, ToString) { + ASSERT_EQ(timestamp_.ToString(), "2021-06-01 11:43:20.000123456"); + ASSERT_EQ(Timestamp(-1, 0).ToString(), "1969-12-31 23:59:59.999000000"); + ASSERT_EQ(Timestamp(-62109569749000l, 0).ToString(), "0001-10-29 05:44:11.000000000"); +} + +TEST_F(TimestampTest, TestToMicrosecond) { + { + Timestamp ts(1622547800000, 123456); + ASSERT_EQ(1622547800000123l, ts.ToMicrosecond()); + } + { + Timestamp ts(-16225478, 123456); + ASSERT_EQ(-16225477877, ts.ToMicrosecond()); + } +} +} // namespace paimon::test diff --git a/src/paimon/common/logging/logging.cpp b/src/paimon/common/logging/logging.cpp new file mode 100644 index 0000000..5e8a3f1 --- /dev/null +++ b/src/paimon/common/logging/logging.cpp @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/logging.h" + +#include +#include +#include +#include +#include + +#include "glog/log_severity.h" +#include "glog/logging.h" +#include "glog/raw_logging.h" + +namespace paimon { + +static std::optional& getLoggerCreator() { + static std::optional _loggerCreator; + return _loggerCreator; +} + +static std::shared_mutex& getRegistryLock() { + static std::shared_mutex registryMutex; + return registryMutex; +} + +void Logger::RegisterLogger(LoggerCreator creator) { + std::unique_lock lock(getRegistryLock()); + getLoggerCreator() = creator; +} + +static google::LogSeverity ToGlogLevel(PaimonLogLevel level) { + switch (level) { + case PAIMON_LOG_LEVEL_DEBUG: + return google::GLOG_INFO; + case PAIMON_LOG_LEVEL_INFO: + return google::GLOG_INFO; + case PAIMON_LOG_LEVEL_WARN: + return google::GLOG_WARNING; + case PAIMON_LOG_LEVEL_ERROR: + return google::GLOG_ERROR; + case PAIMON_LOG_LEVEL_NONE: + case PAIMON_LOG_LEVEL_MAX: + default: + return google::GLOG_INFO; + } +} + +class GlogAdaptor : public Logger { + public: + void LogV(PaimonLogLevel level, const char* fname, int lineno, const char* function, + const char* fmt, ...) override { + va_list args; + va_start(args, fmt); + google::RawLog__(ToGlogLevel(level), fname, lineno, fmt, args); + va_end(args); + } + + bool IsLevelEnabled(PaimonLogLevel level) const override { + return true; + } +}; + +std::unique_ptr Logger::GetLogger(const std::string& path) { + auto& creator = getLoggerCreator(); + if (creator) { + std::shared_lock lock(getRegistryLock()); + return creator.value()(path); + } + std::unique_lock ulock(getRegistryLock()); + if (!google::IsGoogleLoggingInitialized()) { + google::InitGoogleLogging(program_invocation_name); + } + return std::make_unique(); +} + +} // namespace paimon diff --git a/src/paimon/common/logging/logging_test.cpp b/src/paimon/common/logging/logging_test.cpp new file mode 100644 index 0000000..8b6e3f6 --- /dev/null +++ b/src/paimon/common/logging/logging_test.cpp @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include "paimon/logging.h" + +#include +#include +#include + +#include "paimon/common/executor/future.h" +#include "paimon/executor.h" +#include "paimon/testing/utils/testharness.h" +namespace paimon::test { +TEST(LoggerTest, TestMultiThreadGetLogger) { + auto executor = CreateDefaultExecutor(/*thread_count=*/4); + auto get_logger = []() { + auto logger = Logger::GetLogger("my_log"); + ASSERT_TRUE(logger); + }; + + std::vector> futures; + for (int32_t i = 0; i < 1000; ++i) { + futures.push_back(Via(executor.get(), get_logger)); + } + Wait(futures); +} +} // namespace paimon::test