diff --git a/thirdparty/download-thirdparty.sh b/thirdparty/download-thirdparty.sh index 89040c09d6ad40..815b3e081e3ea2 100755 --- a/thirdparty/download-thirdparty.sh +++ b/thirdparty/download-thirdparty.sh @@ -374,6 +374,14 @@ if [[ " ${TP_ARCHIVES[*]} " =~ " ARROW " ]]; then if [[ "${ARROW_SOURCE}" == "arrow-apache-arrow-17.0.0" ]]; then cd "${TP_SOURCE_DIR}/${ARROW_SOURCE}" if [[ ! -f "${PATCHED_MARK}" ]]; then + # Paimon-cpp parquet patches: row-group-aware batch reader, max_row_group_size, + # GetBufferedSize(), int96 NANO guard, and Thrift_VERSION empty fix. + patch -p1 <"${TP_PATCH_DIR}/apache-arrow-17.0.0-paimon.patch" + + # apache-arrow-17.0.0-force-write-int96-timestamps.patch : + # Introducing the parameter that forces writing int96 timestampes for compatibility with Paimon cpp. + patch -p1 <"${TP_PATCH_DIR}/apache-arrow-17.0.0-force-write-int96-timestamps.patch" + # apache-arrow-17.0.0-status-inline-static-fix.patch : # Move Status::message()/detail() empty sentinels out of header # inline function-local statics. Clang can place those weak inline diff --git a/thirdparty/patches/apache-arrow-17.0.0-force-write-int96-timestamps.patch b/thirdparty/patches/apache-arrow-17.0.0-force-write-int96-timestamps.patch new file mode 100644 index 00000000000000..5a75424756671d --- /dev/null +++ b/thirdparty/patches/apache-arrow-17.0.0-force-write-int96-timestamps.patch @@ -0,0 +1,98 @@ +diff -ruN arrow-apache-arrow-17.0.0-after-paimon/cpp/src/parquet/arrow/schema.cc arrow-apache-arrow-17.0.0/cpp/src/parquet/arrow/schema.cc +--- arrow-apache-arrow-17.0.0-after-paimon/cpp/src/parquet/arrow/schema.cc 2026-03-27 01:23:23.651831424 +0800 ++++ arrow-apache-arrow-17.0.0/cpp/src/parquet/arrow/schema.cc 2026-03-27 01:28:36.855281965 +0800 +@@ -178,7 +178,8 @@ + + // The user is explicitly asking for Impala int96 encoding, there is no + // logical type. +- if (arrow_properties.support_deprecated_int96_timestamps() && target_unit == ::arrow::TimeUnit::NANO) { ++ if (arrow_properties.force_write_int96_timestamps() || ++ (arrow_properties.support_deprecated_int96_timestamps() && target_unit == ::arrow::TimeUnit::NANO)) { + *physical_type = ParquetType::INT96; + return Status::OK(); + } +diff -ruN arrow-apache-arrow-17.0.0-after-paimon/cpp/src/parquet/properties.h arrow-apache-arrow-17.0.0/cpp/src/parquet/properties.h +--- arrow-apache-arrow-17.0.0-after-paimon/cpp/src/parquet/properties.h 2026-03-27 01:23:23.643831362 +0800 ++++ arrow-apache-arrow-17.0.0/cpp/src/parquet/properties.h 2026-03-27 01:27:47.717897537 +0800 +@@ -980,6 +980,7 @@ + public: + Builder() + : write_timestamps_as_int96_(false), ++ force_write_int96_timestamps_(false), + coerce_timestamps_enabled_(false), + coerce_timestamps_unit_(::arrow::TimeUnit::SECOND), + truncated_timestamps_allowed_(false), +@@ -1005,6 +1006,21 @@ + return this; + } + ++ /// \brief Force writing legacy int96 timestamps. ++ /// ++ /// This bypasses unit-based guards and writes INT96 whenever timestamp ++ /// metadata is resolved. ++ Builder* enable_force_write_int96_timestamps() { ++ force_write_int96_timestamps_ = true; ++ return this; ++ } ++ ++ /// \brief Disable forcing legacy int96 timestamps (default). ++ Builder* disable_force_write_int96_timestamps() { ++ force_write_int96_timestamps_ = false; ++ return this; ++ } ++ + /// \brief Coerce all timestamps to the specified time unit. + /// \param unit time unit to truncate to. + /// For Parquet versions 1.0 and 2.4, nanoseconds are casted to microseconds. +@@ -1085,7 +1101,8 @@ + /// Create the final properties. + std::shared_ptr build() { + return std::shared_ptr(new ArrowWriterProperties( +- write_timestamps_as_int96_, coerce_timestamps_enabled_, coerce_timestamps_unit_, ++ write_timestamps_as_int96_, force_write_int96_timestamps_, ++ coerce_timestamps_enabled_, coerce_timestamps_unit_, + truncated_timestamps_allowed_, store_schema_, compliant_nested_types_, + engine_version_, use_threads_, executor_)); + } +@@ -1093,6 +1110,8 @@ + private: + bool write_timestamps_as_int96_; + ++ bool force_write_int96_timestamps_; ++ + bool coerce_timestamps_enabled_; + ::arrow::TimeUnit::type coerce_timestamps_unit_; + bool truncated_timestamps_allowed_; +@@ -1107,6 +1126,8 @@ + + bool support_deprecated_int96_timestamps() const { return write_timestamps_as_int96_; } + ++ bool force_write_int96_timestamps() const { return force_write_int96_timestamps_; } ++ + bool coerce_timestamps_enabled() const { return coerce_timestamps_enabled_; } + ::arrow::TimeUnit::type coerce_timestamps_unit() const { + return coerce_timestamps_unit_; +@@ -1138,6 +1159,7 @@ + + private: + explicit ArrowWriterProperties(bool write_nanos_as_int96, ++ bool force_write_int96_timestamps, + bool coerce_timestamps_enabled, + ::arrow::TimeUnit::type coerce_timestamps_unit, + bool truncated_timestamps_allowed, bool store_schema, +@@ -1145,6 +1167,7 @@ + EngineVersion engine_version, bool use_threads, + ::arrow::internal::Executor* executor) + : write_timestamps_as_int96_(write_nanos_as_int96), ++ force_write_int96_timestamps_(force_write_int96_timestamps), + coerce_timestamps_enabled_(coerce_timestamps_enabled), + coerce_timestamps_unit_(coerce_timestamps_unit), + truncated_timestamps_allowed_(truncated_timestamps_allowed), +@@ -1155,6 +1178,7 @@ + executor_(executor) {} + + const bool write_timestamps_as_int96_; ++ const bool force_write_int96_timestamps_; + const bool coerce_timestamps_enabled_; + const ::arrow::TimeUnit::type coerce_timestamps_unit_; + const bool truncated_timestamps_allowed_; diff --git a/thirdparty/patches/apache-arrow-17.0.0-paimon.patch b/thirdparty/patches/apache-arrow-17.0.0-paimon.patch new file mode 100644 index 00000000000000..4e53117b79b65b --- /dev/null +++ b/thirdparty/patches/apache-arrow-17.0.0-paimon.patch @@ -0,0 +1,224 @@ +diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc +index ec3890a41f..943f69bb6c 100644 +--- a/cpp/src/parquet/arrow/schema.cc ++++ b/cpp/src/parquet/arrow/schema.cc +@@ -178,7 +178,7 @@ static Status GetTimestampMetadata(const ::arrow::TimestampType& type, + + // The user is explicitly asking for Impala int96 encoding, there is no + // logical type. +- if (arrow_properties.support_deprecated_int96_timestamps()) { ++ if (arrow_properties.support_deprecated_int96_timestamps() && target_unit == ::arrow::TimeUnit::NANO) { + *physical_type = ParquetType::INT96; + return Status::OK(); + } + +diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc +index 285e2a5973..aa6f92f077 100644 +--- a/cpp/src/parquet/arrow/reader.cc ++++ b/cpp/src/parquet/arrow/reader.cc +@@ -1013,25 +1013,32 @@ Status FileReaderImpl::GetRecordBatchReader(const std::vector& row_groups, + return Status::OK(); + } + +- int64_t num_rows = 0; ++ std::vector num_rows; + for (int row_group : row_groups) { +- num_rows += parquet_reader()->metadata()->RowGroup(row_group)->num_rows(); ++ num_rows.push_back(parquet_reader()->metadata()->RowGroup(row_group)->num_rows()); + } + + using ::arrow::RecordBatchIterator; ++ int row_group_idx = 0; + + // NB: This lambda will be invoked outside the scope of this call to + // `GetRecordBatchReader()`, so it must capture `readers` and `batch_schema` by value. + // `this` is a non-owning pointer so we are relying on the parent FileReader outliving + // this RecordBatchReader. + ::arrow::Iterator batches = ::arrow::MakeFunctionIterator( +- [readers, batch_schema, num_rows, ++ [readers, batch_schema, num_rows, row_group_idx, + this]() mutable -> ::arrow::Result { + ::arrow::ChunkedArrayVector columns(readers.size()); + +- // don't reserve more rows than necessary +- int64_t batch_size = std::min(properties().batch_size(), num_rows); +- num_rows -= batch_size; ++ int64_t batch_size = 0; ++ if (!num_rows.empty()) { ++ // don't reserve more rows than necessary ++ batch_size = std::min(properties().batch_size(), num_rows[row_group_idx]); ++ num_rows[row_group_idx] -= batch_size; ++ if (num_rows[row_group_idx] == 0 && (num_rows.size() - 1) != row_group_idx) { ++ row_group_idx++; ++ } ++ } + + RETURN_NOT_OK(::arrow::internal::OptionalParallelFor( + reader_properties_.use_threads(), static_cast(readers.size()), +diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc +index 4fd7ef1b47..87326a54f1 100644 +--- a/cpp/src/parquet/arrow/writer.cc ++++ b/cpp/src/parquet/arrow/writer.cc +@@ -314,6 +314,14 @@ class FileWriterImpl : public FileWriter { + return Status::OK(); + } + ++ int64_t GetBufferedSize() override { ++ if (row_group_writer_ == nullptr) { ++ return 0; ++ } ++ return row_group_writer_->total_compressed_bytes() + ++ row_group_writer_->total_compressed_bytes_written(); ++ } ++ + Status Close() override { + if (!closed_) { + // Make idempotent +@@ -418,10 +426,13 @@ class FileWriterImpl : public FileWriter { + + // Max number of rows allowed in a row group. + const int64_t max_row_group_length = this->properties().max_row_group_length(); ++ const int64_t max_row_group_size = this->properties().max_row_group_size(); + + // Initialize a new buffered row group writer if necessary. + if (row_group_writer_ == nullptr || !row_group_writer_->buffered() || +- row_group_writer_->num_rows() >= max_row_group_length) { ++ row_group_writer_->num_rows() >= max_row_group_length || ++ (row_group_writer_->total_compressed_bytes_written() + ++ row_group_writer_->total_compressed_bytes() >= max_row_group_size)) { + RETURN_NOT_OK(NewBufferedRowGroup()); + } + +diff --git a/cpp/src/parquet/arrow/writer.h b/cpp/src/parquet/arrow/writer.h +index 4a1a033a7b..0f13d05e44 100644 +--- a/cpp/src/parquet/arrow/writer.h ++++ b/cpp/src/parquet/arrow/writer.h +@@ -138,6 +138,9 @@ class PARQUET_EXPORT FileWriter { + /// option in this case. + virtual ::arrow::Status WriteRecordBatch(const ::arrow::RecordBatch& batch) = 0; + ++ /// \brief Return the buffered size in bytes. ++ virtual int64_t GetBufferedSize() = 0; ++ + /// \brief Write the footer and close the file. + virtual ::arrow::Status Close() = 0; + virtual ~FileWriter(); +diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h +index 4d3acb491e..3906ff3c59 100644 +--- a/cpp/src/parquet/properties.h ++++ b/cpp/src/parquet/properties.h +@@ -139,6 +139,7 @@ static constexpr bool DEFAULT_IS_DICTIONARY_ENABLED = true; + static constexpr int64_t DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT = kDefaultDataPageSize; + static constexpr int64_t DEFAULT_WRITE_BATCH_SIZE = 1024; + static constexpr int64_t DEFAULT_MAX_ROW_GROUP_LENGTH = 1024 * 1024; ++static constexpr int64_t DEFAULT_MAX_ROW_GROUP_SIZE = 128 * 1024 * 1024; + static constexpr bool DEFAULT_ARE_STATISTICS_ENABLED = true; + static constexpr int64_t DEFAULT_MAX_STATISTICS_SIZE = 4096; + static constexpr Encoding::type DEFAULT_ENCODING = Encoding::UNKNOWN; +@@ -232,6 +233,7 @@ class PARQUET_EXPORT WriterProperties { + dictionary_pagesize_limit_(DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT), + write_batch_size_(DEFAULT_WRITE_BATCH_SIZE), + max_row_group_length_(DEFAULT_MAX_ROW_GROUP_LENGTH), ++ max_row_group_size_(DEFAULT_MAX_ROW_GROUP_SIZE), + pagesize_(kDefaultDataPageSize), + version_(ParquetVersion::PARQUET_2_6), + data_page_version_(ParquetDataPageVersion::V1), +@@ -244,6 +246,7 @@ class PARQUET_EXPORT WriterProperties { + dictionary_pagesize_limit_(properties.dictionary_pagesize_limit()), + write_batch_size_(properties.write_batch_size()), + max_row_group_length_(properties.max_row_group_length()), ++ max_row_group_size_(properties.max_row_group_size()), + pagesize_(properties.data_pagesize()), + version_(properties.version()), + data_page_version_(properties.data_page_version()), +@@ -321,6 +324,13 @@ class PARQUET_EXPORT WriterProperties { + return this; + } + ++ /// Specify the max bytes size to put in a single row group. ++ /// Default 128 M. ++ Builder* max_row_group_size(int64_t max_row_group_size) { ++ max_row_group_size_ = max_row_group_size; ++ return this; ++ } ++ + /// Specify the data page size. + /// Default 1MB. + Builder* data_pagesize(int64_t pg_size) { +@@ -664,7 +674,7 @@ class PARQUET_EXPORT WriterProperties { + + return std::shared_ptr(new WriterProperties( + pool_, dictionary_pagesize_limit_, write_batch_size_, max_row_group_length_, +- pagesize_, version_, created_by_, page_checksum_enabled_, ++ max_row_group_size_, pagesize_, version_, created_by_, page_checksum_enabled_, + std::move(file_encryption_properties_), default_column_properties_, + column_properties, data_page_version_, store_decimal_as_integer_, + std::move(sorting_columns_))); +@@ -675,6 +685,7 @@ class PARQUET_EXPORT WriterProperties { + int64_t dictionary_pagesize_limit_; + int64_t write_batch_size_; + int64_t max_row_group_length_; ++ int64_t max_row_group_size_; + int64_t pagesize_; + ParquetVersion::type version_; + ParquetDataPageVersion data_page_version_; +@@ -705,6 +716,8 @@ class PARQUET_EXPORT WriterProperties { + + inline int64_t max_row_group_length() const { return max_row_group_length_; } + ++ inline int64_t max_row_group_size() const { return max_row_group_size_; } ++ + inline int64_t data_pagesize() const { return pagesize_; } + + inline ParquetDataPageVersion data_page_version() const { +@@ -810,7 +823,7 @@ class PARQUET_EXPORT WriterProperties { + private: + explicit WriterProperties( + MemoryPool* pool, int64_t dictionary_pagesize_limit, int64_t write_batch_size, +- int64_t max_row_group_length, int64_t pagesize, ParquetVersion::type version, ++ int64_t max_row_group_length, int64_t max_row_group_size, int64_t pagesize, ParquetVersion::type version, + const std::string& created_by, bool page_write_checksum_enabled, + std::shared_ptr file_encryption_properties, + const ColumnProperties& default_column_properties, +@@ -821,6 +834,7 @@ class PARQUET_EXPORT WriterProperties { + dictionary_pagesize_limit_(dictionary_pagesize_limit), + write_batch_size_(write_batch_size), + max_row_group_length_(max_row_group_length), ++ max_row_group_size_(max_row_group_size), + pagesize_(pagesize), + parquet_data_page_version_(data_page_version), + parquet_version_(version), +@@ -836,6 +850,7 @@ class PARQUET_EXPORT WriterProperties { + int64_t dictionary_pagesize_limit_; + int64_t write_batch_size_; + int64_t max_row_group_length_; ++ int64_t max_row_group_size_; + int64_t pagesize_; + ParquetDataPageVersion parquet_data_page_version_; + ParquetVersion::type parquet_version_; +diff --git a/cpp/cmake_modules/ThirdpartyToolchain.cmake b/cpp/cmake_modules/ThirdpartyToolchain.cmake +index 9df922afa2..5c8b3d4d07 100644 +--- a/cpp/cmake_modules/ThirdpartyToolchain.cmake ++++ b/cpp/cmake_modules/ThirdpartyToolchain.cmake +@@ -1789,7 +1789,20 @@ if(ARROW_WITH_THRIFT) + REQUIRED_VERSION + 0.11.0) + +- string(REPLACE "." ";" Thrift_VERSION_LIST ${Thrift_VERSION}) ++ if(NOT Thrift_VERSION) ++ if(DEFINED thrift_PC_VERSION AND thrift_PC_VERSION) ++ set(Thrift_VERSION "${thrift_PC_VERSION}") ++ elseif(DEFINED ThriftAlt_VERSION AND ThriftAlt_VERSION) ++ set(Thrift_VERSION "${ThriftAlt_VERSION}") ++ elseif(DEFINED THRIFT_VERSION AND THRIFT_VERSION) ++ set(Thrift_VERSION "${THRIFT_VERSION}") ++ endif() ++ endif() ++ if(NOT Thrift_VERSION) ++ message(FATAL_ERROR "Thrift_VERSION is empty after resolving Thrift dependency") ++ endif() ++ ++ string(REPLACE "." ";" Thrift_VERSION_LIST "${Thrift_VERSION}") + list(GET Thrift_VERSION_LIST 0 Thrift_VERSION_MAJOR) + list(GET Thrift_VERSION_LIST 1 Thrift_VERSION_MINOR) + list(GET Thrift_VERSION_LIST 2 Thrift_VERSION_PATCH)