Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions thirdparty/download-thirdparty.sh
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,14 @@ if [[ " ${TP_ARCHIVES[*]} " =~ " ARROW " ]]; then
if [[ "${ARROW_SOURCE}" == "arrow-apache-arrow-17.0.0" ]]; then
cd "${TP_SOURCE_DIR}/${ARROW_SOURCE}"
if [[ ! -f "${PATCHED_MARK}" ]]; then
# Paimon-cpp parquet patches: row-group-aware batch reader, max_row_group_size,
# GetBufferedSize(), int96 NANO guard, and Thrift_VERSION empty fix.
patch -p1 <"${TP_PATCH_DIR}/apache-arrow-17.0.0-paimon.patch"

# apache-arrow-17.0.0-force-write-int96-timestamps.patch :
# Introducing the parameter that forces writing int96 timestampes for compatibility with Paimon cpp.
patch -p1 <"${TP_PATCH_DIR}/apache-arrow-17.0.0-force-write-int96-timestamps.patch"

# apache-arrow-17.0.0-status-inline-static-fix.patch :
# Move Status::message()/detail() empty sentinels out of header
# inline function-local statics. Clang can place those weak inline
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
diff -ruN arrow-apache-arrow-17.0.0-after-paimon/cpp/src/parquet/arrow/schema.cc arrow-apache-arrow-17.0.0/cpp/src/parquet/arrow/schema.cc
--- arrow-apache-arrow-17.0.0-after-paimon/cpp/src/parquet/arrow/schema.cc 2026-03-27 01:23:23.651831424 +0800
+++ arrow-apache-arrow-17.0.0/cpp/src/parquet/arrow/schema.cc 2026-03-27 01:28:36.855281965 +0800
@@ -178,7 +178,8 @@

// The user is explicitly asking for Impala int96 encoding, there is no
// logical type.
- if (arrow_properties.support_deprecated_int96_timestamps() && target_unit == ::arrow::TimeUnit::NANO) {
+ if (arrow_properties.force_write_int96_timestamps() ||
+ (arrow_properties.support_deprecated_int96_timestamps() && target_unit == ::arrow::TimeUnit::NANO)) {
*physical_type = ParquetType::INT96;
return Status::OK();
}
diff -ruN arrow-apache-arrow-17.0.0-after-paimon/cpp/src/parquet/properties.h arrow-apache-arrow-17.0.0/cpp/src/parquet/properties.h
--- arrow-apache-arrow-17.0.0-after-paimon/cpp/src/parquet/properties.h 2026-03-27 01:23:23.643831362 +0800
+++ arrow-apache-arrow-17.0.0/cpp/src/parquet/properties.h 2026-03-27 01:27:47.717897537 +0800
@@ -980,6 +980,7 @@
public:
Builder()
: write_timestamps_as_int96_(false),
+ force_write_int96_timestamps_(false),
coerce_timestamps_enabled_(false),
coerce_timestamps_unit_(::arrow::TimeUnit::SECOND),
truncated_timestamps_allowed_(false),
@@ -1005,6 +1006,21 @@
return this;
}

+ /// \brief Force writing legacy int96 timestamps.
+ ///
+ /// This bypasses unit-based guards and writes INT96 whenever timestamp
+ /// metadata is resolved.
+ Builder* enable_force_write_int96_timestamps() {
+ force_write_int96_timestamps_ = true;
+ return this;
+ }
+
+ /// \brief Disable forcing legacy int96 timestamps (default).
+ Builder* disable_force_write_int96_timestamps() {
+ force_write_int96_timestamps_ = false;
+ return this;
+ }
+
/// \brief Coerce all timestamps to the specified time unit.
/// \param unit time unit to truncate to.
/// For Parquet versions 1.0 and 2.4, nanoseconds are casted to microseconds.
@@ -1085,7 +1101,8 @@
/// Create the final properties.
std::shared_ptr<ArrowWriterProperties> build() {
return std::shared_ptr<ArrowWriterProperties>(new ArrowWriterProperties(
- write_timestamps_as_int96_, coerce_timestamps_enabled_, coerce_timestamps_unit_,
+ write_timestamps_as_int96_, force_write_int96_timestamps_,
+ coerce_timestamps_enabled_, coerce_timestamps_unit_,
truncated_timestamps_allowed_, store_schema_, compliant_nested_types_,
engine_version_, use_threads_, executor_));
}
@@ -1093,6 +1110,8 @@
private:
bool write_timestamps_as_int96_;

+ bool force_write_int96_timestamps_;
+
bool coerce_timestamps_enabled_;
::arrow::TimeUnit::type coerce_timestamps_unit_;
bool truncated_timestamps_allowed_;
@@ -1107,6 +1126,8 @@

bool support_deprecated_int96_timestamps() const { return write_timestamps_as_int96_; }

+ bool force_write_int96_timestamps() const { return force_write_int96_timestamps_; }
+
bool coerce_timestamps_enabled() const { return coerce_timestamps_enabled_; }
::arrow::TimeUnit::type coerce_timestamps_unit() const {
return coerce_timestamps_unit_;
@@ -1138,6 +1159,7 @@

private:
explicit ArrowWriterProperties(bool write_nanos_as_int96,
+ bool force_write_int96_timestamps,
bool coerce_timestamps_enabled,
::arrow::TimeUnit::type coerce_timestamps_unit,
bool truncated_timestamps_allowed, bool store_schema,
@@ -1145,6 +1167,7 @@
EngineVersion engine_version, bool use_threads,
::arrow::internal::Executor* executor)
: write_timestamps_as_int96_(write_nanos_as_int96),
+ force_write_int96_timestamps_(force_write_int96_timestamps),
coerce_timestamps_enabled_(coerce_timestamps_enabled),
coerce_timestamps_unit_(coerce_timestamps_unit),
truncated_timestamps_allowed_(truncated_timestamps_allowed),
@@ -1155,6 +1178,7 @@
executor_(executor) {}

const bool write_timestamps_as_int96_;
+ const bool force_write_int96_timestamps_;
const bool coerce_timestamps_enabled_;
const ::arrow::TimeUnit::type coerce_timestamps_unit_;
const bool truncated_timestamps_allowed_;
224 changes: 224 additions & 0 deletions thirdparty/patches/apache-arrow-17.0.0-paimon.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc
index ec3890a41f..943f69bb6c 100644
--- a/cpp/src/parquet/arrow/schema.cc
+++ b/cpp/src/parquet/arrow/schema.cc
@@ -178,7 +178,7 @@ static Status GetTimestampMetadata(const ::arrow::TimestampType& type,

// The user is explicitly asking for Impala int96 encoding, there is no
// logical type.
- if (arrow_properties.support_deprecated_int96_timestamps()) {
+ if (arrow_properties.support_deprecated_int96_timestamps() && target_unit == ::arrow::TimeUnit::NANO) {
*physical_type = ParquetType::INT96;
return Status::OK();
}

diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index 285e2a5973..aa6f92f077 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -1013,25 +1013,32 @@ Status FileReaderImpl::GetRecordBatchReader(const std::vector<int>& row_groups,
return Status::OK();
}

- int64_t num_rows = 0;
+ std::vector<int64_t> num_rows;
for (int row_group : row_groups) {
- num_rows += parquet_reader()->metadata()->RowGroup(row_group)->num_rows();
+ num_rows.push_back(parquet_reader()->metadata()->RowGroup(row_group)->num_rows());
}

using ::arrow::RecordBatchIterator;
+ int row_group_idx = 0;

// NB: This lambda will be invoked outside the scope of this call to
// `GetRecordBatchReader()`, so it must capture `readers` and `batch_schema` by value.
// `this` is a non-owning pointer so we are relying on the parent FileReader outliving
// this RecordBatchReader.
::arrow::Iterator<RecordBatchIterator> batches = ::arrow::MakeFunctionIterator(
- [readers, batch_schema, num_rows,
+ [readers, batch_schema, num_rows, row_group_idx,
this]() mutable -> ::arrow::Result<RecordBatchIterator> {
::arrow::ChunkedArrayVector columns(readers.size());

- // don't reserve more rows than necessary
- int64_t batch_size = std::min(properties().batch_size(), num_rows);
- num_rows -= batch_size;
+ int64_t batch_size = 0;
+ if (!num_rows.empty()) {
+ // don't reserve more rows than necessary
+ batch_size = std::min(properties().batch_size(), num_rows[row_group_idx]);
+ num_rows[row_group_idx] -= batch_size;
+ if (num_rows[row_group_idx] == 0 && (num_rows.size() - 1) != row_group_idx) {
+ row_group_idx++;
+ }
+ }

RETURN_NOT_OK(::arrow::internal::OptionalParallelFor(
reader_properties_.use_threads(), static_cast<int>(readers.size()),
diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc
index 4fd7ef1b47..87326a54f1 100644
--- a/cpp/src/parquet/arrow/writer.cc
+++ b/cpp/src/parquet/arrow/writer.cc
@@ -314,6 +314,14 @@ class FileWriterImpl : public FileWriter {
return Status::OK();
}

+ int64_t GetBufferedSize() override {
+ if (row_group_writer_ == nullptr) {
+ return 0;
+ }
+ return row_group_writer_->total_compressed_bytes() +
+ row_group_writer_->total_compressed_bytes_written();
+ }
+
Status Close() override {
if (!closed_) {
// Make idempotent
@@ -418,10 +426,13 @@ class FileWriterImpl : public FileWriter {

// Max number of rows allowed in a row group.
const int64_t max_row_group_length = this->properties().max_row_group_length();
+ const int64_t max_row_group_size = this->properties().max_row_group_size();

// Initialize a new buffered row group writer if necessary.
if (row_group_writer_ == nullptr || !row_group_writer_->buffered() ||
- row_group_writer_->num_rows() >= max_row_group_length) {
+ row_group_writer_->num_rows() >= max_row_group_length ||
+ (row_group_writer_->total_compressed_bytes_written() +
+ row_group_writer_->total_compressed_bytes() >= max_row_group_size)) {
RETURN_NOT_OK(NewBufferedRowGroup());
}

diff --git a/cpp/src/parquet/arrow/writer.h b/cpp/src/parquet/arrow/writer.h
index 4a1a033a7b..0f13d05e44 100644
--- a/cpp/src/parquet/arrow/writer.h
+++ b/cpp/src/parquet/arrow/writer.h
@@ -138,6 +138,9 @@ class PARQUET_EXPORT FileWriter {
/// option in this case.
virtual ::arrow::Status WriteRecordBatch(const ::arrow::RecordBatch& batch) = 0;

+ /// \brief Return the buffered size in bytes.
+ virtual int64_t GetBufferedSize() = 0;
+
/// \brief Write the footer and close the file.
virtual ::arrow::Status Close() = 0;
virtual ~FileWriter();
diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h
index 4d3acb491e..3906ff3c59 100644
--- a/cpp/src/parquet/properties.h
+++ b/cpp/src/parquet/properties.h
@@ -139,6 +139,7 @@ static constexpr bool DEFAULT_IS_DICTIONARY_ENABLED = true;
static constexpr int64_t DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT = kDefaultDataPageSize;
static constexpr int64_t DEFAULT_WRITE_BATCH_SIZE = 1024;
static constexpr int64_t DEFAULT_MAX_ROW_GROUP_LENGTH = 1024 * 1024;
+static constexpr int64_t DEFAULT_MAX_ROW_GROUP_SIZE = 128 * 1024 * 1024;
static constexpr bool DEFAULT_ARE_STATISTICS_ENABLED = true;
static constexpr int64_t DEFAULT_MAX_STATISTICS_SIZE = 4096;
static constexpr Encoding::type DEFAULT_ENCODING = Encoding::UNKNOWN;
@@ -232,6 +233,7 @@ class PARQUET_EXPORT WriterProperties {
dictionary_pagesize_limit_(DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT),
write_batch_size_(DEFAULT_WRITE_BATCH_SIZE),
max_row_group_length_(DEFAULT_MAX_ROW_GROUP_LENGTH),
+ max_row_group_size_(DEFAULT_MAX_ROW_GROUP_SIZE),
pagesize_(kDefaultDataPageSize),
version_(ParquetVersion::PARQUET_2_6),
data_page_version_(ParquetDataPageVersion::V1),
@@ -244,6 +246,7 @@ class PARQUET_EXPORT WriterProperties {
dictionary_pagesize_limit_(properties.dictionary_pagesize_limit()),
write_batch_size_(properties.write_batch_size()),
max_row_group_length_(properties.max_row_group_length()),
+ max_row_group_size_(properties.max_row_group_size()),
pagesize_(properties.data_pagesize()),
version_(properties.version()),
data_page_version_(properties.data_page_version()),
@@ -321,6 +324,13 @@ class PARQUET_EXPORT WriterProperties {
return this;
}

+ /// Specify the max bytes size to put in a single row group.
+ /// Default 128 M.
+ Builder* max_row_group_size(int64_t max_row_group_size) {
+ max_row_group_size_ = max_row_group_size;
+ return this;
+ }
+
/// Specify the data page size.
/// Default 1MB.
Builder* data_pagesize(int64_t pg_size) {
@@ -664,7 +674,7 @@ class PARQUET_EXPORT WriterProperties {

return std::shared_ptr<WriterProperties>(new WriterProperties(
pool_, dictionary_pagesize_limit_, write_batch_size_, max_row_group_length_,
- pagesize_, version_, created_by_, page_checksum_enabled_,
+ max_row_group_size_, pagesize_, version_, created_by_, page_checksum_enabled_,
std::move(file_encryption_properties_), default_column_properties_,
column_properties, data_page_version_, store_decimal_as_integer_,
std::move(sorting_columns_)));
@@ -675,6 +685,7 @@ class PARQUET_EXPORT WriterProperties {
int64_t dictionary_pagesize_limit_;
int64_t write_batch_size_;
int64_t max_row_group_length_;
+ int64_t max_row_group_size_;
int64_t pagesize_;
ParquetVersion::type version_;
ParquetDataPageVersion data_page_version_;
@@ -705,6 +716,8 @@ class PARQUET_EXPORT WriterProperties {

inline int64_t max_row_group_length() const { return max_row_group_length_; }

+ inline int64_t max_row_group_size() const { return max_row_group_size_; }
+
inline int64_t data_pagesize() const { return pagesize_; }

inline ParquetDataPageVersion data_page_version() const {
@@ -810,7 +823,7 @@ class PARQUET_EXPORT WriterProperties {
private:
explicit WriterProperties(
MemoryPool* pool, int64_t dictionary_pagesize_limit, int64_t write_batch_size,
- int64_t max_row_group_length, int64_t pagesize, ParquetVersion::type version,
+ int64_t max_row_group_length, int64_t max_row_group_size, int64_t pagesize, ParquetVersion::type version,
const std::string& created_by, bool page_write_checksum_enabled,
std::shared_ptr<FileEncryptionProperties> file_encryption_properties,
const ColumnProperties& default_column_properties,
@@ -821,6 +834,7 @@ class PARQUET_EXPORT WriterProperties {
dictionary_pagesize_limit_(dictionary_pagesize_limit),
write_batch_size_(write_batch_size),
max_row_group_length_(max_row_group_length),
+ max_row_group_size_(max_row_group_size),
pagesize_(pagesize),
parquet_data_page_version_(data_page_version),
parquet_version_(version),
@@ -836,6 +850,7 @@ class PARQUET_EXPORT WriterProperties {
int64_t dictionary_pagesize_limit_;
int64_t write_batch_size_;
int64_t max_row_group_length_;
+ int64_t max_row_group_size_;
int64_t pagesize_;
ParquetDataPageVersion parquet_data_page_version_;
ParquetVersion::type parquet_version_;
diff --git a/cpp/cmake_modules/ThirdpartyToolchain.cmake b/cpp/cmake_modules/ThirdpartyToolchain.cmake
index 9df922afa2..5c8b3d4d07 100644
--- a/cpp/cmake_modules/ThirdpartyToolchain.cmake
+++ b/cpp/cmake_modules/ThirdpartyToolchain.cmake
@@ -1789,7 +1789,20 @@ if(ARROW_WITH_THRIFT)
REQUIRED_VERSION
0.11.0)

- string(REPLACE "." ";" Thrift_VERSION_LIST ${Thrift_VERSION})
+ if(NOT Thrift_VERSION)
+ if(DEFINED thrift_PC_VERSION AND thrift_PC_VERSION)
+ set(Thrift_VERSION "${thrift_PC_VERSION}")
+ elseif(DEFINED ThriftAlt_VERSION AND ThriftAlt_VERSION)
+ set(Thrift_VERSION "${ThriftAlt_VERSION}")
+ elseif(DEFINED THRIFT_VERSION AND THRIFT_VERSION)
+ set(Thrift_VERSION "${THRIFT_VERSION}")
+ endif()
+ endif()
+ if(NOT Thrift_VERSION)
+ message(FATAL_ERROR "Thrift_VERSION is empty after resolving Thrift dependency")
+ endif()
+
+ string(REPLACE "." ";" Thrift_VERSION_LIST "${Thrift_VERSION}")
list(GET Thrift_VERSION_LIST 0 Thrift_VERSION_MAJOR)
list(GET Thrift_VERSION_LIST 1 Thrift_VERSION_MINOR)
list(GET Thrift_VERSION_LIST 2 Thrift_VERSION_PATCH)
Loading