From b9042779b868d676181688cf2d87fccbfc821d6a Mon Sep 17 00:00:00 2001 From: zhouhongfeng Date: Mon, 1 Jun 2026 17:22:26 +0800 Subject: [PATCH 01/14] fix: add move constructor for RowRanges --- src/paimon/format/parquet/row_ranges.cpp | 4 ++-- src/paimon/format/parquet/row_ranges.h | 3 +++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/paimon/format/parquet/row_ranges.cpp b/src/paimon/format/parquet/row_ranges.cpp index 1b03715be..40b2f46b3 100644 --- a/src/paimon/format/parquet/row_ranges.cpp +++ b/src/paimon/format/parquet/row_ranges.cpp @@ -44,11 +44,11 @@ RowRanges RowRanges::Union(const RowRanges& left, const RowRanges& right) { combined.reserve(left.ranges_.size() + right.ranges_.size()); combined.insert(combined.end(), left.ranges_.begin(), left.ranges_.end()); combined.insert(combined.end(), right.ranges_.begin(), right.ranges_.end()); - return RowRanges(Range::SortAndMergeOverlap(combined, /*adjacent=*/true)); + return RowRanges(std::move(Range::SortAndMergeOverlap(combined, /*adjacent=*/true))); } RowRanges RowRanges::Intersection(const RowRanges& left, const RowRanges& right) { - return RowRanges(Range::And(left.ranges_, right.ranges_)); + return RowRanges(std::move(Range::And(left.ranges_, right.ranges_))); } int64_t RowRanges::RowCount() const { diff --git a/src/paimon/format/parquet/row_ranges.h b/src/paimon/format/parquet/row_ranges.h index 288fa48f4..a9027050f 100644 --- a/src/paimon/format/parquet/row_ranges.h +++ b/src/paimon/format/parquet/row_ranges.h @@ -43,6 +43,9 @@ class RowRanges { /// Creates a RowRanges from a list of ranges. explicit RowRanges(const std::vector& ranges) : ranges_(ranges) {} + /// Creates a RowRanges from a list of ranges, taking ownership of the vector. + explicit RowRanges(const std::vector&& ranges) : ranges_(std::move(ranges)) {} + /// Creates a RowRanges with a single range [0, row_count - 1]. 
static RowRanges CreateSingle(int64_t row_count) { if (row_count <= 0) { From 84d73244fd1459b676f61402f3b8da7f06d9358c Mon Sep 17 00:00:00 2001 From: zhouhongfeng Date: Tue, 2 Jun 2026 11:56:42 +0800 Subject: [PATCH 02/14] fix: add WhenBuffered after PreBuffer is called --- src/paimon/format/parquet/page_filtered_row_group_reader.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/paimon/format/parquet/page_filtered_row_group_reader.cpp b/src/paimon/format/parquet/page_filtered_row_group_reader.cpp index 4594717f0..fa311eb31 100644 --- a/src/paimon/format/parquet/page_filtered_row_group_reader.cpp +++ b/src/paimon/format/parquet/page_filtered_row_group_reader.cpp @@ -249,6 +249,8 @@ Result> PageFilteredRowGroupReader::Re // Pre-buffering failed, fall back to row-group level PreBuffer ::arrow::io::IOContext io_ctx(pool); parquet_reader->PreBuffer(rg_vec, col_vec, io_ctx, cache_options); + PAIMON_RETURN_NOT_OK_FROM_ARROW( + parquet_reader->WhenBuffered(rg_vec, col_vec).status()); } } else { PAIMON_RETURN_NOT_OK_FROM_ARROW(parquet_reader->WhenBuffered(rg_vec, col_vec).status()); From ffebaedd51e0bd4939b8a0341c270c6b278170c7 Mon Sep 17 00:00:00 2001 From: zhouhongfeng Date: Tue, 2 Jun 2026 14:00:07 +0800 Subject: [PATCH 03/14] fix(ColumnIndexFilter): return all rows when literals are empty --- .../format/parquet/column_index_filter.cpp | 31 ++++++------------- .../parquet/column_index_filter_test.cpp | 28 +++++++++++++++++ 2 files changed, 38 insertions(+), 21 deletions(-) diff --git a/src/paimon/format/parquet/column_index_filter.cpp b/src/paimon/format/parquet/column_index_filter.cpp index cf638cf6d..c2f9ab618 100644 --- a/src/paimon/format/parquet/column_index_filter.cpp +++ b/src/paimon/format/parquet/column_index_filter.cpp @@ -96,6 +96,10 @@ Result ColumnIndexFilter::VisitLeafPredicate( const auto& literals = leaf_predicate->Literals(); FieldType field_type = leaf_predicate->GetFieldType(); + if (function_type != Function::Type::IS_NULL && function_type != 
Function::Type::IS_NOT_NULL && + literals.empty()) { + return RowRanges::CreateSingle(row_group_row_count); + } std::vector matching_pages; switch (function_type) { @@ -106,37 +110,22 @@ Result ColumnIndexFilter::VisitLeafPredicate( matching_pages = FilterPagesByIsNotNull(column_index_ptr); break; case Function::Type::EQUAL: - if (!literals.empty()) { - matching_pages = FilterPagesByEqual(column_index_ptr, literals[0], field_type); - } + matching_pages = FilterPagesByEqual(column_index_ptr, literals[0], field_type); break; case Function::Type::NOT_EQUAL: - if (!literals.empty()) { - matching_pages = FilterPagesByNotEqual(column_index_ptr, literals[0], field_type); - } + matching_pages = FilterPagesByNotEqual(column_index_ptr, literals[0], field_type); break; case Function::Type::LESS_THAN: - if (!literals.empty()) { - matching_pages = FilterPagesByLessThan(column_index_ptr, literals[0], field_type); - } + matching_pages = FilterPagesByLessThan(column_index_ptr, literals[0], field_type); break; case Function::Type::LESS_OR_EQUAL: - if (!literals.empty()) { - matching_pages = - FilterPagesByLessOrEqual(column_index_ptr, literals[0], field_type); - } + matching_pages = FilterPagesByLessOrEqual(column_index_ptr, literals[0], field_type); break; case Function::Type::GREATER_THAN: - if (!literals.empty()) { - matching_pages = - FilterPagesByGreaterThan(column_index_ptr, literals[0], field_type); - } + matching_pages = FilterPagesByGreaterThan(column_index_ptr, literals[0], field_type); break; case Function::Type::GREATER_OR_EQUAL: - if (!literals.empty()) { - matching_pages = - FilterPagesByGreaterOrEqual(column_index_ptr, literals[0], field_type); - } + matching_pages = FilterPagesByGreaterOrEqual(column_index_ptr, literals[0], field_type); break; case Function::Type::IN: matching_pages = FilterPagesByIn(column_index_ptr, literals, field_type); diff --git a/src/paimon/format/parquet/column_index_filter_test.cpp b/src/paimon/format/parquet/column_index_filter_test.cpp 
index 8249f6356..321d6b4e8 100644 --- a/src/paimon/format/parquet/column_index_filter_test.cpp +++ b/src/paimon/format/parquet/column_index_filter_test.cpp @@ -26,6 +26,9 @@ #include "arrow/c/abi.h" #include "arrow/c/bridge.h" #include "gtest/gtest.h" +#include "paimon/common/predicate/equal.h" +#include "paimon/common/predicate/in.h" +#include "paimon/common/predicate/leaf_predicate_impl.h" #include "paimon/common/utils/arrow/arrow_input_stream_adapter.h" #include "paimon/common/utils/arrow/mem_utils.h" #include "paimon/defs.h" @@ -480,4 +483,29 @@ TEST_F(ColumnIndexFilterTest, NullPredicateReturnsAllRows) { EXPECT_EQ(row_group_row_count_, ranges.RowCount()); } +/// When literals is empty for comparison predicates (EQUAL, NOT_EQUAL, LESS_THAN, +/// LESS_OR_EQUAL, GREATER_THAN, GREATER_OR_EQUAL), the filter should return all +/// rows (conservative fallback) rather than returning empty ranges. +TEST_F(ColumnIndexFilterTest, EmptyLiteralsReturnsAllRows) { + // Construct a LeafPredicate with EQUAL function but empty literals vector. + // This simulates the edge case where literals are unexpectedly empty. + auto pred = std::make_shared(paimon::Equal::Instance(), 0, "val", + FieldType::INT, std::vector()); + ASSERT_OK_AND_ASSIGN(auto ranges, Filter(pred)); + // With empty literals, the filter cannot evaluate the comparison, + // so it should conservatively return all rows. + EXPECT_EQ(row_group_row_count_, ranges.RowCount()); +} + +/// Empty literals for IN predicate — the early guard in VisitLeafPredicate treats +/// all non-IS_NULL/IS_NOT_NULL predicates with empty literals conservatively, +/// returning all rows rather than risking incorrect filtering. +TEST_F(ColumnIndexFilterTest, EmptyLiteralsInReturnsAllRows) { + auto pred = std::make_shared(paimon::In::Instance(), 0, "val", + FieldType::INT, std::vector()); + ASSERT_OK_AND_ASSIGN(auto ranges, Filter(pred)); + // Empty literals → conservative fallback → all rows. 
+ EXPECT_EQ(row_group_row_count_, ranges.RowCount()); +} + } // namespace paimon::parquet::test From 7b7fd11c89a8b21e3346756a5a57fd71d6838223 Mon Sep 17 00:00:00 2001 From: zhouhongfeng Date: Tue, 2 Jun 2026 14:04:17 +0800 Subject: [PATCH 04/14] style: delete duplicated macro and move it to common header files --- src/paimon/format/parquet/file_reader_wrapper.cpp | 12 +----------- .../format/parquet/parquet_file_batch_reader.cpp | 11 ----------- src/paimon/format/parquet/parquet_format_defs.h | 11 +++++++++++ 3 files changed, 12 insertions(+), 22 deletions(-) diff --git a/src/paimon/format/parquet/file_reader_wrapper.cpp b/src/paimon/format/parquet/file_reader_wrapper.cpp index bfabb9f86..98c5a9f38 100644 --- a/src/paimon/format/parquet/file_reader_wrapper.cpp +++ b/src/paimon/format/parquet/file_reader_wrapper.cpp @@ -26,23 +26,13 @@ #include "fmt/format.h" #include "paimon/format/parquet/column_index_filter.h" #include "paimon/format/parquet/page_filtered_row_group_reader.h" +#include "paimon/format/parquet/parquet_format_defs.h" #include "paimon/macros.h" #include "parquet/arrow/reader.h" #include "parquet/file_reader.h" #include "parquet/metadata.h" #include "parquet/page_index.h" -// Convert any std::exception thrown by underlying Parquet/Arrow APIs into a -// Status. Used as the trailing catch clauses of a try block in every public -// method that calls into the parquet C++ API, so the read layer never throws. -#define PAIMON_PARQUET_CATCH_AND_RETURN_STATUS(context) \ - catch (const std::exception& e) { \ - return Status::Invalid(fmt::format("{}: {}", (context), e.what())); \ - } \ - catch (...) 
{ \ - return Status::UnknownError((context), ": unknown error"); \ - } - namespace paimon::parquet { namespace { diff --git a/src/paimon/format/parquet/parquet_file_batch_reader.cpp b/src/paimon/format/parquet/parquet_file_batch_reader.cpp index c37570acb..3c78720f1 100644 --- a/src/paimon/format/parquet/parquet_file_batch_reader.cpp +++ b/src/paimon/format/parquet/parquet_file_batch_reader.cpp @@ -48,17 +48,6 @@ #include "parquet/arrow/reader.h" #include "parquet/properties.h" -// Convert any std::exception thrown by underlying Parquet/Arrow APIs into a -// Status. Used as the trailing catch clauses of a try block in every public -// method that calls into the parquet C++ API, so the read layer never throws. -#define PAIMON_PARQUET_CATCH_AND_RETURN_STATUS(context) \ - catch (const std::exception& e) { \ - return Status::Invalid(fmt::format("{}: {}", (context), e.what())); \ - } \ - catch (...) { \ - return Status::UnknownError((context), ": unknown error"); \ - } - namespace arrow { class MemoryPool; } // namespace arrow diff --git a/src/paimon/format/parquet/parquet_format_defs.h b/src/paimon/format/parquet/parquet_format_defs.h index b979b5780..b646ca6c2 100644 --- a/src/paimon/format/parquet/parquet_format_defs.h +++ b/src/paimon/format/parquet/parquet_format_defs.h @@ -19,6 +19,17 @@ #include #include +// Convert any std::exception thrown by underlying Parquet/Arrow APIs into a +// Status. Used as the trailing catch clauses of a try block in every public +// method that calls into the parquet C++ API, so the read layer never throws. +#define PAIMON_PARQUET_CATCH_AND_RETURN_STATUS(context) \ + catch (const std::exception& e) { \ + return Status::Invalid(fmt::format("{}: {}", (context), e.what())); \ + } \ + catch (...) 
{ \ + return Status::UnknownError((context), ": unknown error"); \ + } + namespace paimon::parquet { // write From e20b0308dd821106b1c933d982fd7c15936fa20c Mon Sep 17 00:00:00 2001 From: zhouhongfeng Date: Tue, 2 Jun 2026 14:09:20 +0800 Subject: [PATCH 05/14] fix: compute 'column_name_to_index' only when needed --- .../parquet/parquet_file_batch_reader.cpp | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/src/paimon/format/parquet/parquet_file_batch_reader.cpp b/src/paimon/format/parquet/parquet_file_batch_reader.cpp index 3c78720f1..8da49efd6 100644 --- a/src/paimon/format/parquet/parquet_file_batch_reader.cpp +++ b/src/paimon/format/parquet/parquet_file_batch_reader.cpp @@ -148,18 +148,6 @@ Status ParquetFileBatchReader::SetReadSchema( } } - // Build column name to index map for page-level filtering. - // For leaf columns, indices[0] is the correct leaf column index in Parquet. - // For nested types (struct/list/map), FlattenSchema produces multiple leaf indices, - // but predicate pushdown only targets leaf columns with simple types, so indices[0] - // is always the correct single leaf index for predicate evaluation. - std::map column_name_to_index; - for (const auto& [name, indices] : field_index_map) { - if (!indices.empty()) { - column_name_to_index[name] = indices[0]; - } - } - std::vector row_groups = arrow::internal::Iota(reader_->GetNumberOfRowGroups()); if (predicate) { PAIMON_ASSIGN_OR_RAISE(row_groups, @@ -177,6 +165,18 @@ Status ParquetFileBatchReader::SetReadSchema( OptionsUtils::GetValueFromMap(options_, PARQUET_READ_ENABLE_PAGE_INDEX_FILTER, DEFAULT_PARQUET_READ_ENABLE_PAGE_INDEX_FILTER)); if (enable_page_index_filter) { + // Build column name to index map for page-level filtering. + // For leaf columns, indices[0] is the correct leaf column index in Parquet. 
+ // For nested types (struct/list/map), FlattenSchema produces multiple leaf indices, + // but predicate pushdown only targets leaf columns with simple types, so indices[0] + // is always the correct single leaf index for predicate evaluation. + std::map column_name_to_index; + for (const auto& [name, indices] : field_index_map) { + if (!indices.empty()) { + column_name_to_index[name] = indices[0]; + } + } + PAIMON_ASSIGN_OR_RAISE( auto page_filter_result, FilterRowGroupsByPageIndex(predicate, column_name_to_index, row_groups)); From 6bb45ba2f244a565cb3bf76e32fd9f37923a50a5 Mon Sep 17 00:00:00 2001 From: zhouhongfeng Date: Tue, 2 Jun 2026 14:58:06 +0800 Subject: [PATCH 06/14] fix: removing 'const' in the move constructor of RowRanges --- src/paimon/format/parquet/row_ranges.cpp | 4 ++-- src/paimon/format/parquet/row_ranges.h | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/paimon/format/parquet/row_ranges.cpp b/src/paimon/format/parquet/row_ranges.cpp index 40b2f46b3..1b03715be 100644 --- a/src/paimon/format/parquet/row_ranges.cpp +++ b/src/paimon/format/parquet/row_ranges.cpp @@ -44,11 +44,11 @@ RowRanges RowRanges::Union(const RowRanges& left, const RowRanges& right) { combined.reserve(left.ranges_.size() + right.ranges_.size()); combined.insert(combined.end(), left.ranges_.begin(), left.ranges_.end()); combined.insert(combined.end(), right.ranges_.begin(), right.ranges_.end()); - return RowRanges(std::move(Range::SortAndMergeOverlap(combined, /*adjacent=*/true))); + return RowRanges(Range::SortAndMergeOverlap(combined, /*adjacent=*/true)); } RowRanges RowRanges::Intersection(const RowRanges& left, const RowRanges& right) { - return RowRanges(std::move(Range::And(left.ranges_, right.ranges_))); + return RowRanges(Range::And(left.ranges_, right.ranges_)); } int64_t RowRanges::RowCount() const { diff --git a/src/paimon/format/parquet/row_ranges.h b/src/paimon/format/parquet/row_ranges.h index a9027050f..401e49574 100644 --- 
a/src/paimon/format/parquet/row_ranges.h +++ b/src/paimon/format/parquet/row_ranges.h @@ -44,7 +44,7 @@ class RowRanges { explicit RowRanges(const std::vector& ranges) : ranges_(ranges) {} /// Creates a RowRanges from a list of ranges, taking ownership of the vector. - explicit RowRanges(const std::vector&& ranges) : ranges_(std::move(ranges)) {} + explicit RowRanges(std::vector&& ranges) : ranges_(std::move(ranges)) {} /// Creates a RowRanges with a single range [0, row_count - 1]. static RowRanges CreateSingle(int64_t row_count) { From 7458922ed5b21e13999c5c7a13ef7217965d5aae Mon Sep 17 00:00:00 2001 From: zhouhongfeng Date: Wed, 3 Jun 2026 14:23:57 +0800 Subject: [PATCH 07/14] refactor: delete redundant member variables in FileReaderWrapper --- .../format/parquet/file_reader_wrapper.cpp | 122 ++++++++---------- .../format/parquet/file_reader_wrapper.h | 21 +-- .../parquet/file_reader_wrapper_test.cpp | 28 ++-- .../parquet/parquet_file_batch_reader.cpp | 25 ++-- .../parquet/parquet_file_batch_reader.h | 7 +- src/paimon/format/parquet/row_ranges.h | 12 ++ 6 files changed, 107 insertions(+), 108 deletions(-) diff --git a/src/paimon/format/parquet/file_reader_wrapper.cpp b/src/paimon/format/parquet/file_reader_wrapper.cpp index 98c5a9f38..2fbc9dca3 100644 --- a/src/paimon/format/parquet/file_reader_wrapper.cpp +++ b/src/paimon/format/parquet/file_reader_wrapper.cpp @@ -99,15 +99,16 @@ Result> FileReaderWrapper::Create( return Status::Invalid(fmt::format( "unexpected error. 
row group ranges not match with num rows {}", num_rows)); } - std::vector row_groups_indices = - arrow::internal::Iota(file_reader->num_row_groups()); std::vector columns_indices = arrow::internal::Iota(file_reader->parquet_reader()->metadata()->num_columns()); auto file_reader_wrapper = std::unique_ptr(new FileReaderWrapper( std::move(file_reader), all_row_group_ranges, num_rows, pool, batch_size)); - PAIMON_RETURN_NOT_OK(file_reader_wrapper->PrepareForReadingLazy( - std::set(row_groups_indices.begin(), row_groups_indices.end()), - columns_indices)); + std::vector all_target_row_groups; + for (int32_t i = 0; i < file_reader_wrapper->GetNumberOfRowGroups(); i++) { + all_target_row_groups.emplace_back(i, false, RowRanges()); + } + PAIMON_RETURN_NOT_OK( + file_reader_wrapper->PrepareForReadingLazy(all_target_row_groups, columns_indices)); return file_reader_wrapper; } PAIMON_PARQUET_CATCH_AND_RETURN_STATUS("FileReaderWrapper::Create") @@ -165,16 +166,18 @@ Status FileReaderWrapper::SeekToRow(uint64_t row_number) { filtered_global_offset_ = 0; for (uint64_t i = 0; i < target_row_groups_.size(); i++) { - if (row_number > target_row_groups_[i].first && - row_number < target_row_groups_[i].second) { + uint32_t rg_id = target_row_groups_[i].row_group_index; + uint64_t rg_start = all_row_group_ranges_[rg_id].first; + uint64_t rg_end = all_row_group_ranges_[rg_id].second; + if (row_number > rg_start && row_number < rg_end) { return Status::Invalid( fmt::format("seek to row failed. row number {} should not be in the middle of " "readable range", row_number)); } - if (target_row_groups_[i].first >= row_number) { + if (rg_start >= row_number) { current_row_group_idx_ = i; - next_row_to_read_ = target_row_groups_[i].first; + next_row_to_read_ = rg_start; // Rebuild batch_reader_ only for non-page-filtered row groups at/after seek // position. 
Page-filtered RGs need no seek-side bookkeeping: their per-RG @@ -182,10 +185,8 @@ Status FileReaderWrapper::SeekToRow(uint64_t row_number) { // time, so backward seek "just works". std::vector target_row_group_indices; for (uint64_t j = i; j < target_row_groups_.size(); j++) { - if (page_filtered_indices_.count(j) == 0) { - PAIMON_ASSIGN_OR_RAISE(int32_t row_group_id, - GetRowGroupId(target_row_groups_[j])); - target_row_group_indices.push_back(row_group_id); + if (!target_row_groups_[j].is_page_filtered) { + target_row_group_indices.push_back(target_row_groups_[j].row_group_index); } } if (!target_row_group_indices.empty()) { @@ -207,8 +208,7 @@ Status FileReaderWrapper::SeekToRow(uint64_t row_number) { Result> FileReaderWrapper::Next() { try { if (PAIMON_UNLIKELY(!reader_initialized_)) { - PAIMON_RETURN_NOT_OK( - PrepareForReading(target_row_group_indices_, target_column_indices_)); + PAIMON_RETURN_NOT_OK(PrepareForReading(target_row_groups_, target_column_indices_)); } // Loop until we produce a batch or exhaust all row groups. A null from the active @@ -216,7 +216,9 @@ Result> FileReaderWrapper::Next() { // surfacing a spurious null to the caller. while (current_row_group_idx_ < target_row_groups_.size()) { std::shared_ptr record_batch; - bool is_page_filtered = page_filtered_indices_.count(current_row_group_idx_) > 0; + int32_t rg_id = target_row_groups_[current_row_group_idx_].row_group_index; + uint64_t rg_end = all_row_group_ranges_[rg_id].second; + bool is_page_filtered = target_row_groups_[current_row_group_idx_].is_page_filtered; if (is_page_filtered) { // Construct the per-RG streaming reader on demand. Inputs are recomputed each @@ -226,20 +228,9 @@ Result> FileReaderWrapper::Next() { // uniformly: SeekToRow only resets current_page_filtered_reader_, and the // next Next() rebuilds from authoritative state. 
if (!current_page_filtered_reader_) { - PAIMON_ASSIGN_OR_RAISE( - int32_t rg_index, - GetRowGroupId(target_row_groups_[current_row_group_idx_])); - auto range_it = row_group_row_ranges_.find(rg_index); - if (range_it == row_group_row_ranges_.end()) { - return Status::Invalid( - fmt::format("page-filtered row group {} missing row ranges in " - "row_group_row_ranges_", - rg_index)); - } - const RowRanges& row_ranges = range_it->second; + const auto& row_ranges = target_row_groups_[current_row_group_idx_].row_ranges; auto page_ranges = PageFilteredRowGroupReader::ComputePageRanges( - file_reader_->parquet_reader(), rg_index, row_ranges, - target_column_indices_); + file_reader_->parquet_reader(), rg_id, row_ranges, target_column_indices_); bool pre_buffered = !prebuffered_ranges_.empty(); // batch_size_ == 0 means "no per-batch row cap" in the wrapper's contract, // but TableBatchReader::set_chunksize(0) would loop forever emitting empty @@ -249,12 +240,12 @@ Result> FileReaderWrapper::Next() { batch_size_ > 0 ? batch_size_ : std::numeric_limits::max(); PAIMON_ASSIGN_OR_RAISE(current_page_filtered_reader_, PageFilteredRowGroupReader::ReadFilteredRowGroup( - file_reader_->parquet_reader(), rg_index, row_ranges, + file_reader_->parquet_reader(), rg_id, row_ranges, target_column_indices_, page_filtered_read_schema_, pool_, file_reader_->properties().cache_options(), pre_buffered, page_ranges, max_chunksize)); current_filtered_row_ranges_ = row_ranges; - current_filtered_rg_start_ = target_row_groups_[current_row_group_idx_].first; + current_filtered_rg_start_ = all_row_group_ranges_[rg_id].first; filtered_global_offset_ = 0; } PAIMON_RETURN_NOT_OK_FROM_ARROW( @@ -278,24 +269,24 @@ Result> FileReaderWrapper::Next() { // Stay on this RG; the next ReadNext will either return more data or null. 
} else { previous_first_row_ = next_row_to_read_; - if (next_row_to_read_ + num_rows < - target_row_groups_[current_row_group_idx_].second) { + if (next_row_to_read_ + num_rows < rg_end) { next_row_to_read_ += num_rows; - } else if (next_row_to_read_ + num_rows == - target_row_groups_[current_row_group_idx_].second) { + } else if (next_row_to_read_ + num_rows == rg_end) { if (current_row_group_idx_ == target_row_groups_.size() - 1) { next_row_to_read_ = num_rows_; } else { current_row_group_idx_++; - next_row_to_read_ = target_row_groups_[current_row_group_idx_].first; + next_row_to_read_ = + all_row_group_ranges_[target_row_groups_[current_row_group_idx_] + .row_group_index] + .first; } } else { return Status::Invalid(fmt::format( "Next failed. Unexpected error, next row to read {} + num rows just " "read {} should always be within current row group range or exactly " "equals to current row group end {}", - next_row_to_read_, num_rows, - target_row_groups_[current_row_group_idx_].second)); + next_row_to_read_, num_rows, rg_end)); } } return record_batch; @@ -311,7 +302,10 @@ Result> FileReaderWrapper::Next() { current_row_group_idx_ = target_row_groups_.size(); } else { current_row_group_idx_++; - next_row_to_read_ = target_row_groups_[current_row_group_idx_].first; + next_row_to_read_ = + all_row_group_ranges_[target_row_groups_[current_row_group_idx_] + .row_group_index] + .first; } } else { // Fully-matched path: batch_reader_ is exhausted with no more RBs to align on @@ -341,28 +335,20 @@ Result>> FileReaderWrapper::GetRowGrou return row_group_ranges; } -Status FileReaderWrapper::PrepareForReadingLazy(const std::set& target_row_group_indices, - const std::vector& column_indices) { - target_row_group_indices_ = target_row_group_indices; +Status FileReaderWrapper::PrepareForReadingLazy( + const std::vector& target_row_groups, + const std::vector& column_indices) { + target_row_groups_ = target_row_groups; target_column_indices_ = column_indices; 
reader_initialized_ = false; return Status::OK(); } -Status FileReaderWrapper::PrepareForReading(const std::set& target_row_group_indices, +Status FileReaderWrapper::PrepareForReading(const std::vector& target_row_groups, const std::vector& column_indices) { try { - std::vector> target_row_groups; - PAIMON_ASSIGN_OR_RAISE(target_row_groups, GetRowGroupRanges(target_row_group_indices)); - - // Build position map: rg_index -> position in target_row_groups (O(1) lookup) - std::map rg_idx_to_position; - { - uint64_t pos = 0; - for (int32_t rg_idx : target_row_group_indices) { - rg_idx_to_position[rg_idx] = pos++; - } - } + target_row_groups_ = target_row_groups; + target_column_indices_ = column_indices; // Separate row groups into fully matched (Arrow's standard reader) and partially // matched (page-filtered, per-RG reader constructed on demand in Next()). @@ -370,19 +356,14 @@ Status FileReaderWrapper::PrepareForReading(const std::set& target_row_ // recomputed on demand in Next() from row_group_row_ranges_ + target_column_indices_, // mirroring how the fully-matched path lets Arrow's FileReader own all metadata. std::vector fully_matched_row_groups; - page_filtered_indices_.clear(); page_filtered_read_schema_.reset(); // Page-level byte ranges collected here only for the bulk PreBuffer call below; // discarded once PreBuffer is dispatched. std::vector<::arrow::io::ReadRange> page_filtered_byte_ranges; - for (int32_t rg_idx : target_row_group_indices) { - auto range_it = row_group_row_ranges_.find(rg_idx); - if (range_it != row_group_row_ranges_.end()) { - uint64_t pos = rg_idx_to_position[rg_idx]; - page_filtered_indices_.insert(pos); - + for (const auto& target_row_group : target_row_groups_) { + if (target_row_group.is_page_filtered) { // Build the page-filter read_schema once on first encounter — it's identical // across all page-filtered RGs in this session. 
if (!page_filtered_read_schema_) { @@ -405,12 +386,13 @@ Status FileReaderWrapper::PrepareForReading(const std::set& target_row_ } auto page_ranges = PageFilteredRowGroupReader::ComputePageRanges( - file_reader_->parquet_reader(), rg_idx, range_it->second, column_indices); + file_reader_->parquet_reader(), target_row_group.row_group_index, + target_row_group.row_ranges, column_indices); page_filtered_byte_ranges.insert(page_filtered_byte_ranges.end(), std::make_move_iterator(page_ranges.begin()), std::make_move_iterator(page_ranges.end())); } else { - fully_matched_row_groups.push_back(rg_idx); + fully_matched_row_groups.push_back(target_row_group.row_group_index); } } @@ -437,7 +419,15 @@ Status FileReaderWrapper::PrepareForReading(const std::set& target_row_ // would tear down and rebuild cached_source_, redundantly re-issuing the same IO // on remote filesystems. The manual path is only needed to merge page-level ranges // with column-chunk ranges into a single PreBuffer covering both kinds of RGs. 
- if (!page_filtered_indices_.empty()) { + bool has_page_filtered_row_groups = false; + for (const auto& target_row_group : target_row_groups_) { + if (target_row_group.is_page_filtered) { + has_page_filtered_row_groups = true; + break; + } + } + + if (has_page_filtered_row_groups) { std::vector<::arrow::io::ReadRange> all_ranges = std::move(page_filtered_byte_ranges); // Fully-matched row groups: add entire column chunk ranges @@ -477,13 +467,11 @@ Status FileReaderWrapper::PrepareForReading(const std::set& target_row_ prebuffered_ranges_.clear(); } } - target_row_groups_ = target_row_groups; - target_column_indices_ = column_indices; batch_reader_ = std::move(batch_reader); if (target_row_groups_.empty()) { next_row_to_read_ = num_rows_; } else { - next_row_to_read_ = target_row_groups_[0].first; + next_row_to_read_ = all_row_group_ranges_[target_row_groups_[0].row_group_index].first; } previous_first_row_ = std::numeric_limits::max(); current_row_group_idx_ = 0; diff --git a/src/paimon/format/parquet/file_reader_wrapper.h b/src/paimon/format/parquet/file_reader_wrapper.h index c023a4cfd..819d42ed0 100644 --- a/src/paimon/format/parquet/file_reader_wrapper.h +++ b/src/paimon/format/parquet/file_reader_wrapper.h @@ -109,12 +109,12 @@ class FileReaderWrapper { /// Prepare for lazy reading of the specified row groups and columns. /// Actual reader initialization is deferred until the first Next() call. - Status PrepareForReadingLazy(const std::set& row_group_indices, + Status PrepareForReadingLazy(const std::vector& target_row_groups, const std::vector& column_indices); /// Prepare for immediate reading of the specified row groups and columns. /// Initializes the reader and starts pre-buffering I/O. - Status PrepareForReading(const std::set& row_group_indices, + Status PrepareForReading(const std::vector& target_row_groups, const std::vector& column_indices); /// Filter row groups by read ranges, returning only those that overlap. 
@@ -122,12 +122,6 @@ class FileReaderWrapper { const std::vector>& read_ranges, const std::vector& src_row_groups) const; - /// Set per-row-group RowRanges for page-level filtering. - /// Only partially matched row groups should have entries. - void SetRowGroupRowRanges(const std::map& ranges) { - row_group_row_ranges_ = ranges; - } - /// Get the page index reader for the file. /// Returns nullptr if page index is not available. std::shared_ptr<::parquet::PageIndexReader> GetPageIndexReader(); @@ -154,8 +148,6 @@ class FileReaderWrapper { std::unique_ptr batch_reader_; std::vector> all_row_group_ranges_; - std::set target_row_group_indices_; - std::vector> target_row_groups_; std::vector target_column_indices_; ::arrow::MemoryPool* pool_; @@ -175,13 +167,8 @@ class FileReaderWrapper { RowRanges current_filtered_row_ranges_; // RowRanges for the active page-filtered RG uint64_t current_filtered_rg_start_ = 0; // Absolute row-group start row number - // Page-level filtering state. Externally injected via SetRowGroupRowRanges and - // looked up by row group index when entering a page-filtered RG. - std::map row_group_row_ranges_; - - // Set of target_row_groups_ positional indices that use page-filtered reading. - // Built in PrepareForReading from row_group_row_ranges_. - std::set page_filtered_indices_; + // Target row groups with row ranges for none page-level filtering and page-level filtering + std::vector target_row_groups_; // Arrow schema covering target_column_indices_, used when constructing the per-RG // page-filtered reader. 
Cached in PrepareForReading because it's identical across diff --git a/src/paimon/format/parquet/file_reader_wrapper_test.cpp b/src/paimon/format/parquet/file_reader_wrapper_test.cpp index b4c3d5880..26a9f9501 100644 --- a/src/paimon/format/parquet/file_reader_wrapper_test.cpp +++ b/src/paimon/format/parquet/file_reader_wrapper_test.cpp @@ -261,11 +261,9 @@ TEST_F(FileReaderWrapperTest, PageFilteredZeroBatchSizeDoesNotHang) { // contiguous ranges keep the test honest about RowRanges semantics; the actual // numbers don't matter as long as their total falls inside the row group. RowRanges rr({RowRanges::Range(0, 49), RowRanges::Range(100, 149)}); - reader_wrapper->SetRowGroupRowRanges({{0, rr}}); std::vector all_columns = {0, 1, 2}; - ASSERT_OK(reader_wrapper->PrepareForReading({0}, all_columns)); - + ASSERT_OK(reader_wrapper->PrepareForReading({TargetRowGroup(0, true, rr)}, all_columns)); int64_t total = 0; int64_t batch_count = 0; while (true) { @@ -295,10 +293,11 @@ TEST_F(FileReaderWrapperTest, SeekBackToConsumedPageFilteredRowGroup) { std::map row_ranges_map; row_ranges_map[0] = RowRanges(RowRanges::Range(10, 49)); row_ranges_map[1] = RowRanges(RowRanges::Range(100, 149)); - reader_wrapper->SetRowGroupRowRanges(row_ranges_map); std::vector all_columns = {0, 1, 2}; - ASSERT_OK(reader_wrapper->PrepareForReading({0, 1}, all_columns)); + ASSERT_OK(reader_wrapper->PrepareForReading( + {TargetRowGroup(0, true, row_ranges_map[0]), TargetRowGroup(1, true, row_ranges_map[1])}, + all_columns)); auto count_all_rows = [&](int64_t* out_total) { int64_t total = 0; @@ -348,8 +347,7 @@ TEST_F(FileReaderWrapperTest, PageFilteredRespectsBatchSize) { for (int64_t batch_size : {int64_t{1}, int64_t{2}, int64_t{3}, int64_t{5}, int64_t{10}}) { SCOPED_TRACE("batch_size=" + std::to_string(batch_size)); ASSERT_OK_AND_ASSIGN(auto reader_wrapper, PrepareReaderWrapper(file_path, batch_size)); - reader_wrapper->SetRowGroupRowRanges({{0, rr}}); - 
ASSERT_OK(reader_wrapper->PrepareForReading({0}, {0, 1, 2})); + ASSERT_OK(reader_wrapper->PrepareForReading({TargetRowGroup(0, true, rr)}, {0, 1, 2})); int64_t total = 0; int64_t batch_count = 0; @@ -417,8 +415,9 @@ TEST_F(FileReaderWrapperTest, PrepareForReading) { std::string file_path = PathUtil::JoinPath(dir_->Str(), "test.parquet"); PrepareParquetFile(file_path, /*row_count=*/5500); ASSERT_OK_AND_ASSIGN(auto reader_wrapper, PrepareReaderWrapper(file_path)); - ASSERT_OK(reader_wrapper->PrepareForReading(/*row_group_indices=*/{1}, - /*column_indices=*/{0})); + ASSERT_OK(reader_wrapper->PrepareForReading( + /*target_row_groups=*/{TargetRowGroup(1, false, RowRanges())}, + /*column_indices=*/{0})); // seek before actual read range ASSERT_OK(reader_wrapper->SeekToRow(0)); ASSERT_EQ(1000, reader_wrapper->GetNextRowToRead()); @@ -438,8 +437,10 @@ TEST_F(FileReaderWrapperTest, PrepareForReading) { ASSERT_FALSE(record_batch); // empty column indices - ASSERT_OK(reader_wrapper->PrepareForReading(/*row_group_indices=*/{0, 1}, - /*column_indices=*/{})); + ASSERT_OK(reader_wrapper->PrepareForReading( + /*target_row_groups=*/{TargetRowGroup(0, false, RowRanges()), + TargetRowGroup(1, false, RowRanges())}, + /*column_indices=*/{})); ASSERT_EQ(0, reader_wrapper->GetNextRowToRead()); ASSERT_EQ(std::numeric_limits::max(), reader_wrapper->GetPreviousBatchFirstRowNumber().value()); @@ -448,8 +449,9 @@ TEST_F(FileReaderWrapperTest, PrepareForReading) { ASSERT_EQ(0, record_batch->num_columns()); // empty row group indices - ASSERT_OK(reader_wrapper->PrepareForReading(/*row_group_indices=*/{}, - /*column_indices=*/{0})); + ASSERT_OK(reader_wrapper->PrepareForReading( + /*target_row_groups=*/{}, + /*column_indices=*/{0})); ASSERT_EQ(5500, reader_wrapper->GetNextRowToRead()); ASSERT_EQ(std::numeric_limits::max(), reader_wrapper->GetPreviousBatchFirstRowNumber().value()); diff --git a/src/paimon/format/parquet/parquet_file_batch_reader.cpp 
b/src/paimon/format/parquet/parquet_file_batch_reader.cpp index 8da49efd6..24ae76fb2 100644 --- a/src/paimon/format/parquet/parquet_file_batch_reader.cpp +++ b/src/paimon/format/parquet/parquet_file_batch_reader.cpp @@ -159,6 +159,9 @@ Status ParquetFileBatchReader::SetReadSchema( } // Apply page-level filtering after bitmap pruning so we don't read page index // pages for row groups that the bitmap already excluded. + // If no predicate is provided, skip page-level filtering, row_group_row_ranges will be + // empty + std::map row_group_row_ranges; if (predicate && !row_groups.empty()) { PAIMON_ASSIGN_OR_RAISE( bool enable_page_index_filter, @@ -181,7 +184,7 @@ Status ParquetFileBatchReader::SetReadSchema( auto page_filter_result, FilterRowGroupsByPageIndex(predicate, column_name_to_index, row_groups)); row_groups = std::move(page_filter_result.first); - reader_->SetRowGroupRowRanges(page_filter_result.second); + row_group_row_ranges = std::move(page_filter_result.second); } } @@ -197,18 +200,20 @@ Status ParquetFileBatchReader::SetReadSchema( std::set ordered_row_groups, reader_->FilterRowGroupsByReadRanges(read_ranges_, read_row_groups_)); - // When predicate or selection is applied, prepare eagerly so PreBuffer I/O - // starts immediately. All file readers are created before consumption begins, - // so eager preparation allows I/O for multiple files to overlap. 
- Status ret; - if (predicate || selection_bitmap) { - ret = reader_->PrepareForReading(ordered_row_groups, read_column_indices_); - } else { - ret = reader_->PrepareForReadingLazy(ordered_row_groups, read_column_indices_); + // build target row groups with page-level row ranges for PrepareForReadingLazy + std::vector target_row_groups; + for (int32_t rg_id : ordered_row_groups) { + auto it = row_group_row_ranges.find(rg_id); + if (it != row_group_row_ranges.end()) { + target_row_groups.emplace_back(rg_id, true, it->second); + } else { + target_row_groups.emplace_back(rg_id, false, RowRanges()); + } } - return ret; + PAIMON_RETURN_NOT_OK(reader_->PrepareForReadingLazy(target_row_groups, column_indices)); } PAIMON_PARQUET_CATCH_AND_RETURN_STATUS("ParquetFileBatchReader::SetReadSchema") + return Status::OK(); } Result> ParquetFileBatchReader::FilterRowGroupsByPredicate( diff --git a/src/paimon/format/parquet/parquet_file_batch_reader.h b/src/paimon/format/parquet/parquet_file_batch_reader.h index 632d7762a..900c72ab9 100644 --- a/src/paimon/format/parquet/parquet_file_batch_reader.h +++ b/src/paimon/format/parquet/parquet_file_batch_reader.h @@ -106,7 +106,12 @@ class ParquetFileBatchReader : public PrefetchFileBatchReader { PAIMON_ASSIGN_OR_RAISE( std::set ordered_row_groups, reader_->FilterRowGroupsByReadRanges(read_ranges_, read_row_groups_)); - return reader_->PrepareForReadingLazy(ordered_row_groups, read_column_indices_); + + std::vector target_row_groups; + for (const auto& row_group_id : ordered_row_groups) { + target_row_groups.emplace_back(row_group_id, false, RowRanges()); + } + return reader_->PrepareForReadingLazy(target_row_groups, read_column_indices_); } std::shared_ptr GetReaderMetrics() const override { diff --git a/src/paimon/format/parquet/row_ranges.h b/src/paimon/format/parquet/row_ranges.h index 401e49574..112ac9c86 100644 --- a/src/paimon/format/parquet/row_ranges.h +++ b/src/paimon/format/parquet/row_ranges.h @@ -105,4 +105,16 @@ class 
RowRanges { std::vector ranges_; }; +struct TargetRowGroup { + int32_t row_group_index; + bool is_page_filtered; + // page-filtered row ranges, only valid if is_page_filtered is true. + RowRanges row_ranges; + + TargetRowGroup() = default; + TargetRowGroup(int32_t rg_index, bool page_filtered, RowRanges ranges) + : row_group_index(rg_index), + is_page_filtered(page_filtered), + row_ranges(std::move(ranges)) {} +}; } // namespace paimon::parquet From 88a7c588d75000505f841872fb2e3d6d2f8247c8 Mon Sep 17 00:00:00 2001 From: zhouhongfeng Date: Wed, 3 Jun 2026 14:54:56 +0800 Subject: [PATCH 08/14] refactor: change the calling chain of SetReadRanges, delete unused member variables in FileBatchReader --- .../format/parquet/file_reader_wrapper.cpp | 49 ++++++++----------- .../format/parquet/file_reader_wrapper.h | 11 ++--- .../parquet/file_reader_wrapper_test.cpp | 49 ++++++++++--------- .../parquet/parquet_file_batch_reader.cpp | 11 ++--- .../parquet/parquet_file_batch_reader.h | 13 +---- 5 files changed, 54 insertions(+), 79 deletions(-) diff --git a/src/paimon/format/parquet/file_reader_wrapper.cpp b/src/paimon/format/parquet/file_reader_wrapper.cpp index 2fbc9dca3..04be2c85e 100644 --- a/src/paimon/format/parquet/file_reader_wrapper.cpp +++ b/src/paimon/format/parquet/file_reader_wrapper.cpp @@ -481,39 +481,32 @@ Status FileReaderWrapper::PrepareForReading(const std::vector& t PAIMON_PARQUET_CATCH_AND_RETURN_STATUS("FileReaderWrapper::PrepareForReading") } -Result> FileReaderWrapper::FilterRowGroupsByReadRanges( - const std::vector>& read_ranges, - const std::vector& src_row_groups) const { - std::set target_row_groups; - PAIMON_ASSIGN_OR_RAISE(std::set row_groups_to_read, - ReadRangesToRowGroupIds(read_ranges)); - for (const auto& row_group_id : src_row_groups) { - if (row_groups_to_read.find(row_group_id) != row_groups_to_read.end()) { - target_row_groups.emplace(row_group_id); - } +Status FileReaderWrapper::ApplyReadRanges( + const std::vector>& read_ranges) { + if
(read_ranges.empty()) { + target_row_groups_.clear(); + reader_initialized_ = false; + return Status::OK(); } - return target_row_groups; -} - -Result> FileReaderWrapper::ReadRangesToRowGroupIds( - const std::vector>& read_ranges) const { - std::set selected_row_group_ids; + // Build a set of row group indices whose range matches one of the read ranges. + std::set matching_rg_indices; for (const auto& read_range : read_ranges) { - PAIMON_ASSIGN_OR_RAISE(int32_t row_group_id, GetRowGroupId(read_range)); - selected_row_group_ids.emplace(row_group_id); + for (size_t i = 0; i < all_row_group_ranges_.size(); i++) { + if (all_row_group_ranges_[i] == read_range) { + matching_rg_indices.insert(static_cast(i)); + } + } } - return selected_row_group_ids; -} - -Result FileReaderWrapper::GetRowGroupId(std::pair target_range) const { - for (size_t i = 0; i < all_row_group_ranges_.size(); i++) { - if (all_row_group_ranges_[i] == target_range) { - return i; + // Keep only target row groups whose row_group_index is in matching set. + std::vector filtered; + for (const auto& trg : target_row_groups_) { + if (matching_rg_indices.count(trg.row_group_index) > 0) { + filtered.push_back(trg); } } - return Status::Invalid(fmt::format( - "not expected failure. 
target range bound '{},{}' not match with row group range bound", - target_range.first, target_range.second)); + target_row_groups_ = std::move(filtered); + reader_initialized_ = false; + return Status::OK(); } std::shared_ptr<::parquet::PageIndexReader> FileReaderWrapper::GetPageIndexReader() { diff --git a/src/paimon/format/parquet/file_reader_wrapper.h b/src/paimon/format/parquet/file_reader_wrapper.h index 819d42ed0..a0e2681e1 100644 --- a/src/paimon/format/parquet/file_reader_wrapper.h +++ b/src/paimon/format/parquet/file_reader_wrapper.h @@ -117,10 +117,10 @@ class FileReaderWrapper { Status PrepareForReading(const std::vector& target_row_groups, const std::vector& column_indices); - /// Filter row groups by read ranges, returning only those that overlap. - Result> FilterRowGroupsByReadRanges( - const std::vector>& read_ranges, - const std::vector& src_row_groups) const; + /// Apply read ranges to the current target_row_groups_, keeping only those + /// whose row-group range overlaps with one of the given read ranges. + /// Resets reader state so that the next Next() call will re-initialize. + Status ApplyReadRanges(const std::vector>& read_ranges); /// Get the page index reader for the file. /// Returns nullptr if page index is not available. 
@@ -140,9 +140,6 @@ class FileReaderWrapper { const std::vector>& all_row_group_ranges, uint64_t num_rows, ::arrow::MemoryPool* pool, int64_t batch_size); - Result> ReadRangesToRowGroupIds( - const std::vector>& read_ranges) const; - Result GetRowGroupId(std::pair target_range) const; std::unique_ptr<::parquet::arrow::FileReader> file_reader_; std::unique_ptr batch_reader_; diff --git a/src/paimon/format/parquet/file_reader_wrapper_test.cpp b/src/paimon/format/parquet/file_reader_wrapper_test.cpp index 26a9f9501..dc3c5a91c 100644 --- a/src/paimon/format/parquet/file_reader_wrapper_test.cpp +++ b/src/paimon/format/parquet/file_reader_wrapper_test.cpp @@ -378,37 +378,38 @@ TEST_F(FileReaderWrapperTest, GetRowGroupRanges) { ASSERT_TRUE(ranges.empty()); } -TEST_F(FileReaderWrapperTest, ReadRangesToRowGroupIds) { +TEST_F(FileReaderWrapperTest, ApplyReadRanges) { std::string file_path = PathUtil::JoinPath(dir_->Str(), "test.parquet"); PrepareParquetFile(file_path, /*row_count=*/5500); ASSERT_OK_AND_ASSIGN(auto reader_wrapper, PrepareReaderWrapper(file_path)); - std::set expected_row_group_ids = {0, 3, 5}; - std::vector> read_ranges = { - {0, 1000}, {3000, 4000}, {5000, 5500}}; - ASSERT_OK_AND_ASSIGN(auto row_group_ids, reader_wrapper->ReadRangesToRowGroupIds(read_ranges)); - ASSERT_EQ(expected_row_group_ids, row_group_ids); - std::vector> invalid_ranges = { - {0, 1000}, {3000, 4000}, {5000, 5600}}; - ASSERT_NOK_WITH_MSG(reader_wrapper->ReadRangesToRowGroupIds(invalid_ranges), - "not match with row group range bound"); - ASSERT_OK_AND_ASSIGN(row_group_ids, reader_wrapper->ReadRangesToRowGroupIds({})); - ASSERT_TRUE(row_group_ids.empty()); -} -TEST_F(FileReaderWrapperTest, FilterRowGroupsByReadRanges) { - std::string file_path = PathUtil::JoinPath(dir_->Str(), "test.parquet"); - PrepareParquetFile(file_path, /*row_count=*/5500); - ASSERT_OK_AND_ASSIGN(auto reader_wrapper, PrepareReaderWrapper(file_path)); - std::set expected_row_group_ids = {0, 5}; + // Prepare with a 
subset of row groups: {0, 1, 2, 4, 5} + std::vector initial_targets = { + TargetRowGroup(0, false, RowRanges()), TargetRowGroup(1, false, RowRanges()), + TargetRowGroup(2, false, RowRanges()), TargetRowGroup(4, false, RowRanges()), + TargetRowGroup(5, false, RowRanges())}; + std::vector all_columns = {0, 1, 2}; + ASSERT_OK(reader_wrapper->PrepareForReadingLazy(initial_targets, all_columns)); + + // Apply read ranges that match RG 0, 3, 5. Only 0 and 5 are in initial targets. std::vector> read_ranges = { {0, 1000}, {3000, 4000}, {5000, 5500}}; - ASSERT_OK_AND_ASSIGN(auto row_group_ids, - reader_wrapper->FilterRowGroupsByReadRanges(read_ranges, {0, 1, 2, 4, 5})); - ASSERT_EQ(expected_row_group_ids, row_group_ids); + ASSERT_OK(reader_wrapper->ApplyReadRanges(read_ranges)); - ASSERT_OK_AND_ASSIGN(row_group_ids, - reader_wrapper->FilterRowGroupsByReadRanges(read_ranges, {})); - ASSERT_TRUE(row_group_ids.empty()); + // Verify: reading should only produce rows from RG 0 (1000 rows) and RG 5 (500 rows). + int64_t total_rows = 0; + while (true) { + ASSERT_OK_AND_ASSIGN(auto batch, reader_wrapper->Next()); + if (!batch) break; + total_rows += batch->num_rows(); + } + ASSERT_EQ(1500, total_rows); + + // Apply empty read ranges should result in no data. 
+ ASSERT_OK(reader_wrapper->PrepareForReadingLazy(initial_targets, all_columns)); + ASSERT_OK(reader_wrapper->ApplyReadRanges({})); + ASSERT_OK_AND_ASSIGN(auto batch, reader_wrapper->Next()); + ASSERT_FALSE(batch); } TEST_F(FileReaderWrapperTest, PrepareForReading) { diff --git a/src/paimon/format/parquet/parquet_file_batch_reader.cpp b/src/paimon/format/parquet/parquet_file_batch_reader.cpp index 24ae76fb2..5979119c5 100644 --- a/src/paimon/format/parquet/parquet_file_batch_reader.cpp +++ b/src/paimon/format/parquet/parquet_file_batch_reader.cpp @@ -189,20 +189,14 @@ Status ParquetFileBatchReader::SetReadSchema( } read_data_type_ = arrow::struct_(read_schema->fields()); - read_row_groups_ = row_groups; - read_column_indices_ = column_indices; metrics_->SetCounter(ParquetMetrics::READ_ROW_GROUPS_TOTAL, reader_->GetNumberOfRowGroups()); metrics_->SetCounter(ParquetMetrics::READ_ROW_GROUPS_AFTER_FILTER, row_groups.size()); - PAIMON_ASSIGN_OR_RAISE( - std::set ordered_row_groups, - reader_->FilterRowGroupsByReadRanges(read_ranges_, read_row_groups_)); - - // build target row groups with page-level row ranges for PrepareForReadingLazy + // Build TargetRowGroup list with page-filter info in one shot. 
std::vector target_row_groups; - for (int32_t rg_id : ordered_row_groups) { + for (int32_t rg_id : row_groups) { auto it = row_group_row_ranges.find(rg_id); if (it != row_group_row_ranges.end()) { target_row_groups.emplace_back(rg_id, true, it->second); @@ -211,6 +205,7 @@ Status ParquetFileBatchReader::SetReadSchema( } } PAIMON_RETURN_NOT_OK(reader_->PrepareForReadingLazy(target_row_groups, column_indices)); + PAIMON_RETURN_NOT_OK(reader_->ApplyReadRanges(read_ranges_)); } PAIMON_PARQUET_CATCH_AND_RETURN_STATUS("ParquetFileBatchReader::SetReadSchema") return Status::OK(); diff --git a/src/paimon/format/parquet/parquet_file_batch_reader.h b/src/paimon/format/parquet/parquet_file_batch_reader.h index 900c72ab9..d2f825da7 100644 --- a/src/paimon/format/parquet/parquet_file_batch_reader.h +++ b/src/paimon/format/parquet/parquet_file_batch_reader.h @@ -103,15 +103,7 @@ class ParquetFileBatchReader : public PrefetchFileBatchReader { Status SetReadRanges(const std::vector>& read_ranges) override { read_ranges_ = read_ranges; - PAIMON_ASSIGN_OR_RAISE( - std::set ordered_row_groups, - reader_->FilterRowGroupsByReadRanges(read_ranges_, read_row_groups_)); - - std::vector target_row_groups; - for (const auto& row_group_id : ordered_row_groups) { - target_row_groups.emplace_back(row_group_id, false, RowRanges()); - } - return reader_->PrepareForReadingLazy(target_row_groups, read_column_indices_); + return reader_->ApplyReadRanges(read_ranges); } std::shared_ptr GetReaderMetrics() const override { @@ -192,9 +184,6 @@ class ParquetFileBatchReader : public PrefetchFileBatchReader { uint64_t read_rows_ = 0; uint64_t read_batch_count_ = 0; - // last time set read schema - std::vector read_row_groups_; - std::vector read_column_indices_; }; } // namespace paimon::parquet From c4f4ac69a90864585a32a4bd9b7e78644006e86a Mon Sep 17 00:00:00 2001 From: zhouhongfeng Date: Wed, 3 Jun 2026 15:32:10 +0800 Subject: [PATCH 09/14] refactor: split PrepareForReading(), Next(), SeekToRow() into
smaller functions --- .../format/parquet/file_reader_wrapper.cpp | 396 ++++++++---------- .../format/parquet/file_reader_wrapper.h | 21 +- .../parquet/parquet_file_batch_reader.h | 1 - 3 files changed, 201 insertions(+), 217 deletions(-) diff --git a/src/paimon/format/parquet/file_reader_wrapper.cpp b/src/paimon/format/parquet/file_reader_wrapper.cpp index 04be2c85e..963216a1a 100644 --- a/src/paimon/format/parquet/file_reader_wrapper.cpp +++ b/src/paimon/format/parquet/file_reader_wrapper.cpp @@ -159,9 +159,19 @@ void FileReaderWrapper::WaitForPendingPreBuffer() { } } +void FileReaderWrapper::AdvanceToNextRowGroup() { + if (current_row_group_idx_ >= target_row_groups_.size() - 1) { + next_row_to_read_ = num_rows_; + current_row_group_idx_ = target_row_groups_.size(); + } else { + current_row_group_idx_++; + next_row_to_read_ = + all_row_group_ranges_[target_row_groups_[current_row_group_idx_].row_group_index].first; + } +} + Status FileReaderWrapper::SeekToRow(uint64_t row_number) { try { - // Reset any in-progress page-filtered streaming current_page_filtered_reader_.reset(); filtered_global_offset_ = 0; @@ -179,19 +189,16 @@ Status FileReaderWrapper::SeekToRow(uint64_t row_number) { current_row_group_idx_ = i; next_row_to_read_ = rg_start; - // Rebuild batch_reader_ only for non-page-filtered row groups at/after seek - // position. Page-filtered RGs need no seek-side bookkeeping: their per-RG - // reader is constructed on demand in Next() from row_group_row_ranges_ each - // time, so backward seek "just works". - std::vector target_row_group_indices; + // Rebuild batch_reader_ for non-page-filtered RGs at/after seek position. 
+ std::vector fully_matched_indices; for (uint64_t j = i; j < target_row_groups_.size(); j++) { if (!target_row_groups_[j].is_page_filtered) { - target_row_group_indices.push_back(target_row_groups_[j].row_group_index); + fully_matched_indices.push_back(target_row_groups_[j].row_group_index); } } - if (!target_row_group_indices.empty()) { + if (!fully_matched_indices.empty()) { PAIMON_RETURN_NOT_OK_FROM_ARROW(file_reader_->GetRecordBatchReader( - target_row_group_indices, target_column_indices_, &batch_reader_)); + fully_matched_indices, target_column_indices_, &batch_reader_)); } else { batch_reader_.reset(); } @@ -205,119 +212,96 @@ Status FileReaderWrapper::SeekToRow(uint64_t row_number) { PAIMON_PARQUET_CATCH_AND_RETURN_STATUS("FileReaderWrapper::SeekToRow") } +Result> FileReaderWrapper::NextPageFiltered() { + int32_t rg_id = target_row_groups_[current_row_group_idx_].row_group_index; + + // Construct the per-RG streaming reader on demand. + if (!current_page_filtered_reader_) { + const auto& row_ranges = target_row_groups_[current_row_group_idx_].row_ranges; + auto page_ranges = PageFilteredRowGroupReader::ComputePageRanges( + file_reader_->parquet_reader(), rg_id, row_ranges, target_column_indices_); + bool pre_buffered = !prebuffered_ranges_.empty(); + int64_t max_chunksize = batch_size_ > 0 ? 
batch_size_ : std::numeric_limits::max(); + PAIMON_ASSIGN_OR_RAISE( + current_page_filtered_reader_, + PageFilteredRowGroupReader::ReadFilteredRowGroup( + file_reader_->parquet_reader(), rg_id, row_ranges, target_column_indices_, + page_filtered_read_schema_, pool_, file_reader_->properties().cache_options(), + pre_buffered, page_ranges, max_chunksize)); + current_filtered_row_ranges_ = row_ranges; + current_filtered_rg_start_ = all_row_group_ranges_[rg_id].first; + filtered_global_offset_ = 0; + } + + std::shared_ptr record_batch; + PAIMON_RETURN_NOT_OK_FROM_ARROW(current_page_filtered_reader_->ReadNext(&record_batch)); + + if (record_batch) { + auto original_row = + current_filtered_row_ranges_.MapFilteredIndexToOriginalRow(filtered_global_offset_); + previous_first_row_ = original_row.has_value() ? current_filtered_rg_start_ + + static_cast(*original_row) + : current_filtered_rg_start_; + filtered_global_offset_ += record_batch->num_rows(); + return record_batch; + } + + // RG exhausted — reset and advance. + current_page_filtered_reader_.reset(); + filtered_global_offset_ = 0; + AdvanceToNextRowGroup(); + return std::shared_ptr(); +} + +Result> FileReaderWrapper::NextFullyMatched() { + if (!batch_reader_) { + return std::shared_ptr(); + } + + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(auto record_batch, batch_reader_->Next()); + if (!record_batch) { + return std::shared_ptr(); + } + + int32_t rg_id = target_row_groups_[current_row_group_idx_].row_group_index; + uint64_t rg_end = all_row_group_ranges_[rg_id].second; + int64_t num_rows = record_batch->num_rows(); + + previous_first_row_ = next_row_to_read_; + if (next_row_to_read_ + num_rows < rg_end) { + next_row_to_read_ += num_rows; + } else if (next_row_to_read_ + num_rows == rg_end) { + AdvanceToNextRowGroup(); + } else { + return Status::Invalid( + fmt::format("Next failed. 
next_row_to_read {} + num_rows {} exceeds row group end {}", + next_row_to_read_, num_rows, rg_end)); + } + return record_batch; +} + Result> FileReaderWrapper::Next() { try { if (PAIMON_UNLIKELY(!reader_initialized_)) { PAIMON_RETURN_NOT_OK(PrepareForReading(target_row_groups_, target_column_indices_)); } - // Loop until we produce a batch or exhaust all row groups. A null from the active - // per-RG reader means that RG is done; we advance and try the next RG without - // surfacing a spurious null to the caller. while (current_row_group_idx_ < target_row_groups_.size()) { - std::shared_ptr record_batch; - int32_t rg_id = target_row_groups_[current_row_group_idx_].row_group_index; - uint64_t rg_end = all_row_group_ranges_[rg_id].second; bool is_page_filtered = target_row_groups_[current_row_group_idx_].is_page_filtered; - - if (is_page_filtered) { - // Construct the per-RG streaming reader on demand. Inputs are recomputed each - // time from existing wrapper fields (no per-RG meta cached on the wrapper), - // mirroring how the fully-matched path delegates to Arrow's stateless - // GetRecordBatchReader. This makes both forward and backward seeks work - // uniformly: SeekToRow only resets current_page_filtered_reader_, and the - // next Next() rebuilds from authoritative state. - if (!current_page_filtered_reader_) { - const auto& row_ranges = target_row_groups_[current_row_group_idx_].row_ranges; - auto page_ranges = PageFilteredRowGroupReader::ComputePageRanges( - file_reader_->parquet_reader(), rg_id, row_ranges, target_column_indices_); - bool pre_buffered = !prebuffered_ranges_.empty(); - // batch_size_ == 0 means "no per-batch row cap" in the wrapper's contract, - // but TableBatchReader::set_chunksize(0) would loop forever emitting empty - // batches. Translate to int64_max so the reader produces one batch per - // underlying chunk boundary instead. - int64_t max_chunksize = - batch_size_ > 0 ? 
batch_size_ : std::numeric_limits::max(); - PAIMON_ASSIGN_OR_RAISE(current_page_filtered_reader_, - PageFilteredRowGroupReader::ReadFilteredRowGroup( - file_reader_->parquet_reader(), rg_id, row_ranges, - target_column_indices_, page_filtered_read_schema_, - pool_, file_reader_->properties().cache_options(), - pre_buffered, page_ranges, max_chunksize)); - current_filtered_row_ranges_ = row_ranges; - current_filtered_rg_start_ = all_row_group_ranges_[rg_id].first; - filtered_global_offset_ = 0; - } - PAIMON_RETURN_NOT_OK_FROM_ARROW( - current_page_filtered_reader_->ReadNext(&record_batch)); - } else if (batch_reader_) { - PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(record_batch, batch_reader_->Next()); - } - - if (record_batch) { - int64_t num_rows = record_batch->num_rows(); - if (is_page_filtered) { - // Map the cumulative filtered-row offset back to the original row index - // within this row group. Must be evaluated BEFORE incrementing the offset. - auto original_row = current_filtered_row_ranges_.MapFilteredIndexToOriginalRow( - filtered_global_offset_); - previous_first_row_ = - original_row.has_value() - ? current_filtered_rg_start_ + static_cast(*original_row) - : current_filtered_rg_start_; - filtered_global_offset_ += num_rows; - // Stay on this RG; the next ReadNext will either return more data or null. - } else { - previous_first_row_ = next_row_to_read_; - if (next_row_to_read_ + num_rows < rg_end) { - next_row_to_read_ += num_rows; - } else if (next_row_to_read_ + num_rows == rg_end) { - if (current_row_group_idx_ == target_row_groups_.size() - 1) { - next_row_to_read_ = num_rows_; - } else { - current_row_group_idx_++; - next_row_to_read_ = - all_row_group_ranges_[target_row_groups_[current_row_group_idx_] - .row_group_index] - .first; - } - } else { - return Status::Invalid(fmt::format( - "Next failed. 
Unexpected error, next row to read {} + num rows just " - "read {} should always be within current row group range or exactly " - "equals to current row group end {}", - next_row_to_read_, num_rows, rg_end)); - } - } - return record_batch; - } - - // Null batch: current row group is exhausted (or fully-matched RGs hit a degenerate - // EOF). Advance to the next row group and continue the loop. - if (is_page_filtered) { - current_page_filtered_reader_.reset(); - filtered_global_offset_ = 0; - if (current_row_group_idx_ == target_row_groups_.size() - 1) { - next_row_to_read_ = num_rows_; - current_row_group_idx_ = target_row_groups_.size(); - } else { - current_row_group_idx_++; - next_row_to_read_ = - all_row_group_ranges_[target_row_groups_[current_row_group_idx_] - .row_group_index] - .first; - } - } else { - // Fully-matched path: batch_reader_ is exhausted with no more RBs to align on - // row counts. Stop here — remaining RGs (if any) should be page-filtered and - // will be handled by re-entering the loop, but if we got here without advancing - // first, treat as terminal to avoid an infinite loop. + PAIMON_ASSIGN_OR_RAISE(auto batch, + is_page_filtered ? NextPageFiltered() : NextFullyMatched()); + if (batch) { + return batch; + } else if (!is_page_filtered) { + // Null from fully-matched path means batch_reader_ is globally exhausted. break; } + // current_row_group_idx_ has been advanced in NextPageFiltered() or NextFullyMatched(), + // loop to try next RG. 
} previous_first_row_ = next_row_to_read_; - return std::shared_ptr(); // EOF + return std::shared_ptr(); } PAIMON_PARQUET_CATCH_AND_RETURN_STATUS("FileReaderWrapper::Next") } @@ -344,130 +328,112 @@ Status FileReaderWrapper::PrepareForReadingLazy( return Status::OK(); } +Status FileReaderWrapper::BuildPageFilteredSchema(const std::vector& column_indices) { + if (page_filtered_read_schema_) { + return Status::OK(); + } + std::shared_ptr schema; + PAIMON_RETURN_NOT_OK_FROM_ARROW(file_reader_->GetSchema(&schema)); + auto parquet_schema = file_reader_->parquet_reader()->metadata()->schema(); + std::vector> fields; + for (int32_t col_idx : column_indices) { + const std::string& col_name = parquet_schema->Column(col_idx)->name(); + auto field = schema->GetFieldByName(col_name); + if (!field) { + return Status::Invalid(fmt::format( + "PrepareForReading: Parquet column {} ('{}') has no matching Arrow field in " + "file schema", + col_idx, col_name)); + } + fields.push_back(field); + } + page_filtered_read_schema_ = arrow::schema(fields); + return Status::OK(); +} + +std::vector<::arrow::io::ReadRange> FileReaderWrapper::CollectPreBufferRanges( + const std::vector& fully_matched_row_groups, + const std::vector& column_indices) { + std::vector<::arrow::io::ReadRange> ranges; + + // Page-filtered RGs: only matching page byte ranges. + for (const auto& trg : target_row_groups_) { + if (trg.is_page_filtered) { + auto page_ranges = PageFilteredRowGroupReader::ComputePageRanges( + file_reader_->parquet_reader(), trg.row_group_index, trg.row_ranges, + column_indices); + ranges.insert(ranges.end(), std::make_move_iterator(page_ranges.begin()), + std::make_move_iterator(page_ranges.end())); + } + } + + // Fully-matched RGs: entire column chunk ranges. 
+ auto file_metadata = file_reader_->parquet_reader()->metadata(); + for (int32_t rg_idx : fully_matched_row_groups) { + auto rg_metadata = file_metadata->RowGroup(rg_idx); + for (int32_t col_idx : column_indices) { + auto col_chunk = rg_metadata->ColumnChunk(col_idx); + int64_t offset = col_chunk->data_page_offset(); + if (col_chunk->has_dictionary_page() && col_chunk->dictionary_page_offset() > 0 && + offset > col_chunk->dictionary_page_offset()) { + offset = col_chunk->dictionary_page_offset(); + } + ranges.push_back({offset, col_chunk->total_compressed_size()}); + } + } + return ranges; +} + +void FileReaderWrapper::DispatchPreBuffer(std::vector<::arrow::io::ReadRange> ranges) { + const auto& cache_opts = file_reader_->properties().cache_options(); + ::arrow::io::IOContext io_ctx(pool_); + auto merged_ranges = MergeOverlappingRanges(std::move(ranges)); + try { + file_reader_->parquet_reader()->PreBufferRanges(merged_ranges, io_ctx, cache_opts); + prebuffered_ranges_ = std::move(merged_ranges); + } catch (const std::exception&) { + prebuffered_ranges_.clear(); + } +} + Status FileReaderWrapper::PrepareForReading(const std::vector& target_row_groups, const std::vector& column_indices) { try { target_row_groups_ = target_row_groups; target_column_indices_ = column_indices; - - // Separate row groups into fully matched (Arrow's standard reader) and partially - // matched (page-filtered, per-RG reader constructed on demand in Next()). - // Per-RG metadata for the page-filtered path is NOT cached on the wrapper — it's - // recomputed on demand in Next() from row_group_row_ranges_ + target_column_indices_, - // mirroring how the fully-matched path lets Arrow's FileReader own all metadata. - std::vector fully_matched_row_groups; page_filtered_read_schema_.reset(); - // Page-level byte ranges collected here only for the bulk PreBuffer call below; - // discarded once PreBuffer is dispatched. 
- std::vector<::arrow::io::ReadRange> page_filtered_byte_ranges; - - for (const auto& target_row_group : target_row_groups_) { - if (target_row_group.is_page_filtered) { - // Build the page-filter read_schema once on first encounter — it's identical - // across all page-filtered RGs in this session. - if (!page_filtered_read_schema_) { - std::shared_ptr schema; - PAIMON_RETURN_NOT_OK_FROM_ARROW(file_reader_->GetSchema(&schema)); - std::vector> fields; - auto parquet_schema = file_reader_->parquet_reader()->metadata()->schema(); - for (int32_t col_idx : column_indices) { - const std::string& col_name = parquet_schema->Column(col_idx)->name(); - auto field = schema->GetFieldByName(col_name); - if (!field) { - return Status::Invalid(fmt::format( - "PrepareForReading: Parquet column {} ('{}') has no matching Arrow " - "field in file schema", - col_idx, col_name)); - } - fields.push_back(field); - } - page_filtered_read_schema_ = arrow::schema(fields); - } - - auto page_ranges = PageFilteredRowGroupReader::ComputePageRanges( - file_reader_->parquet_reader(), target_row_group.row_group_index, - target_row_group.row_ranges, column_indices); - page_filtered_byte_ranges.insert(page_filtered_byte_ranges.end(), - std::make_move_iterator(page_ranges.begin()), - std::make_move_iterator(page_ranges.end())); - } else { - fully_matched_row_groups.push_back(target_row_group.row_group_index); + // Partition into fully-matched and page-filtered row groups. + std::vector fully_matched_row_groups; + for (const auto& trg : target_row_groups_) { + if (!trg.is_page_filtered) { + fully_matched_row_groups.push_back(trg.row_group_index); } } - // Wait for any previously pre-buffered data before starting new pre-buffer. + bool has_page_filtered = fully_matched_row_groups.size() != target_row_groups_.size(); + if (has_page_filtered) { + PAIMON_RETURN_NOT_OK(BuildPageFilteredSchema(column_indices)); + } + WaitForPendingPreBuffer(); - // Create standard reader for fully matched row groups FIRST. 
- // GetRecordBatchReader internally calls PreBuffer, but we'll override it below - // with a single PreBuffer covering ALL row groups (page-filtered + fully-matched) - // so that async I/O for all files starts in parallel. - std::unique_ptr batch_reader; + // Create standard reader for fully-matched row groups. if (!fully_matched_row_groups.empty()) { PAIMON_RETURN_NOT_OK_FROM_ARROW(file_reader_->GetRecordBatchReader( - fully_matched_row_groups, column_indices, &batch_reader)); + fully_matched_row_groups, column_indices, &batch_reader_)); + } else { + batch_reader_.reset(); } - // Collect all byte ranges for a single PreBufferRanges call. - // Page-filtered RGs: only matching page ranges (from ComputePageRanges). - // Fully-matched RGs: entire column chunk ranges. - // - // When there are no page-filtered RGs, skip the manual PreBufferRanges entirely: - // GetRecordBatchReader has already issued PreBuffer internally (driven by - // ArrowReaderProperties::pre_buffer=true), and a second PreBufferRanges call here - // would tear down and rebuild cached_source_, redundantly re-issuing the same IO - // on remote filesystems. The manual path is only needed to merge page-level ranges - // with column-chunk ranges into a single PreBuffer covering both kinds of RGs. - bool has_page_filtered_row_groups = false; - for (const auto& target_row_group : target_row_groups_) { - if (target_row_group.is_page_filtered) { - has_page_filtered_row_groups = true; - break; - } + // When page-filtered RGs exist, issue a single PreBuffer covering both kinds. + // Otherwise GetRecordBatchReader already issued PreBuffer internally. 
+ if (has_page_filtered) { + auto all_ranges = CollectPreBufferRanges(fully_matched_row_groups, column_indices); + DispatchPreBuffer(std::move(all_ranges)); } - if (has_page_filtered_row_groups) { - std::vector<::arrow::io::ReadRange> all_ranges = std::move(page_filtered_byte_ranges); - - // Fully-matched row groups: add entire column chunk ranges - // The correct calculation follows Arrow's ColumnChunkMetaData::file_range(): - // - col_start = data_page_offset (or dictionary_page_offset if present and lower) - // - col_length = total_compressed_size (includes all pages: dictionary + data) - auto file_metadata = file_reader_->parquet_reader()->metadata(); - for (int32_t rg_idx : fully_matched_row_groups) { - auto rg_metadata = file_metadata->RowGroup(rg_idx); - for (int32_t col_idx : column_indices) { - auto col_chunk = rg_metadata->ColumnChunk(col_idx); - int64_t offset = col_chunk->data_page_offset(); - if (col_chunk->has_dictionary_page() && - col_chunk->dictionary_page_offset() > 0 && - offset > col_chunk->dictionary_page_offset()) { - offset = col_chunk->dictionary_page_offset(); - } - int64_t size = col_chunk->total_compressed_size(); - all_ranges.push_back({offset, size}); - } - } - - const auto& cache_opts = file_reader_->properties().cache_options(); - ::arrow::io::IOContext io_ctx(pool_); - // Merge overlapping ranges before calling PreBufferRanges, which rejects overlapping - // ranges. - auto merged_ranges = MergeOverlappingRanges(std::move(all_ranges)); - // PreBuffer is an optimization - if it fails (e.g., IO error during testing), - // continue without pre-buffering. Subsequent reads will fetch data on-demand. - try { - file_reader_->parquet_reader()->PreBufferRanges(merged_ranges, io_ctx, cache_opts); - // Track for cleanup on destruction - prebuffered_ranges_ = std::move(merged_ranges); - } catch (const std::exception& e) { - // Pre-buffering failed, clear ranges to indicate no pre-buffered data available. 
- // Reading will fall back to on-demand I/O. - prebuffered_ranges_.clear(); - } - } - batch_reader_ = std::move(batch_reader); + // Reset read state. if (target_row_groups_.empty()) { next_row_to_read_ = num_rows_; } else { diff --git a/src/paimon/format/parquet/file_reader_wrapper.h b/src/paimon/format/parquet/file_reader_wrapper.h index a0e2681e1..2e3a784ac 100644 --- a/src/paimon/format/parquet/file_reader_wrapper.h +++ b/src/paimon/format/parquet/file_reader_wrapper.h @@ -140,7 +140,6 @@ class FileReaderWrapper { const std::vector>& all_row_group_ranges, uint64_t num_rows, ::arrow::MemoryPool* pool, int64_t batch_size); - std::unique_ptr<::parquet::arrow::FileReader> file_reader_; std::unique_ptr batch_reader_; @@ -177,6 +176,26 @@ class FileReaderWrapper { /// Wait for all pending PreBuffer operations to complete. void WaitForPendingPreBuffer(); + + /// Advance current_row_group_idx_ to the next row group and update next_row_to_read_. + void AdvanceToNextRowGroup(); + + /// Read next batch from a page-filtered row group. Returns nullptr when the RG is exhausted. + Result> NextPageFiltered(); + + /// Read next batch from the fully-matched batch_reader_. Returns nullptr when exhausted. + Result> NextFullyMatched(); + + /// Build page_filtered_read_schema_ from the given column indices. No-op if already built. + Status BuildPageFilteredSchema(const std::vector& column_indices); + + /// Collect all byte ranges that need pre-buffering (page-filtered + fully-matched). + std::vector<::arrow::io::ReadRange> CollectPreBufferRanges( + const std::vector& fully_matched_row_groups, + const std::vector& column_indices); + + /// Dispatch a single PreBufferRanges call with merged ranges. 
+ void DispatchPreBuffer(std::vector<::arrow::io::ReadRange> ranges); }; } // namespace paimon::parquet diff --git a/src/paimon/format/parquet/parquet_file_batch_reader.h b/src/paimon/format/parquet/parquet_file_batch_reader.h index d2f825da7..41d455520 100644 --- a/src/paimon/format/parquet/parquet_file_batch_reader.h +++ b/src/paimon/format/parquet/parquet_file_batch_reader.h @@ -183,7 +183,6 @@ class ParquetFileBatchReader : public PrefetchFileBatchReader { uint64_t read_rows_ = 0; uint64_t read_batch_count_ = 0; - }; } // namespace paimon::parquet From 853c651f75a577cce560ccff625ba0f54c4a9507 Mon Sep 17 00:00:00 2001 From: zhouhongfeng Date: Wed, 3 Jun 2026 15:55:51 +0800 Subject: [PATCH 10/14] refractor: separate functions in PageFilteredRowGroupReader into several functions --- .../format/parquet/file_reader_wrapper.cpp | 11 +- .../page_filtered_row_group_reader.cpp | 209 ++++++++---------- .../parquet/page_filtered_row_group_reader.h | 44 ++-- .../page_filtered_row_group_reader_test.cpp | 18 +- 4 files changed, 134 insertions(+), 148 deletions(-) diff --git a/src/paimon/format/parquet/file_reader_wrapper.cpp b/src/paimon/format/parquet/file_reader_wrapper.cpp index 963216a1a..a59469681 100644 --- a/src/paimon/format/parquet/file_reader_wrapper.cpp +++ b/src/paimon/format/parquet/file_reader_wrapper.cpp @@ -217,18 +217,18 @@ Result> FileReaderWrapper::NextPageFiltered( // Construct the per-RG streaming reader on demand. if (!current_page_filtered_reader_) { - const auto& row_ranges = target_row_groups_[current_row_group_idx_].row_ranges; + const auto& target_rg = target_row_groups_[current_row_group_idx_]; auto page_ranges = PageFilteredRowGroupReader::ComputePageRanges( - file_reader_->parquet_reader(), rg_id, row_ranges, target_column_indices_); + file_reader_->parquet_reader(), target_rg, target_column_indices_); bool pre_buffered = !prebuffered_ranges_.empty(); int64_t max_chunksize = batch_size_ > 0 ? 
batch_size_ : std::numeric_limits::max(); PAIMON_ASSIGN_OR_RAISE( current_page_filtered_reader_, PageFilteredRowGroupReader::ReadFilteredRowGroup( - file_reader_->parquet_reader(), rg_id, row_ranges, target_column_indices_, + file_reader_->parquet_reader(), target_rg, target_column_indices_, page_filtered_read_schema_, pool_, file_reader_->properties().cache_options(), pre_buffered, page_ranges, max_chunksize)); - current_filtered_row_ranges_ = row_ranges; + current_filtered_row_ranges_ = target_rg.row_ranges; current_filtered_rg_start_ = all_row_group_ranges_[rg_id].first; filtered_global_offset_ = 0; } @@ -360,8 +360,7 @@ std::vector<::arrow::io::ReadRange> FileReaderWrapper::CollectPreBufferRanges( for (const auto& trg : target_row_groups_) { if (trg.is_page_filtered) { auto page_ranges = PageFilteredRowGroupReader::ComputePageRanges( - file_reader_->parquet_reader(), trg.row_group_index, trg.row_ranges, - column_indices); + file_reader_->parquet_reader(), trg, column_indices); ranges.insert(ranges.end(), std::make_move_iterator(page_ranges.begin()), std::make_move_iterator(page_ranges.end())); } diff --git a/src/paimon/format/parquet/page_filtered_row_group_reader.cpp b/src/paimon/format/parquet/page_filtered_row_group_reader.cpp index fa311eb31..01090d632 100644 --- a/src/paimon/format/parquet/page_filtered_row_group_reader.cpp +++ b/src/paimon/format/parquet/page_filtered_row_group_reader.cpp @@ -60,34 +60,30 @@ class TableRecordBatchReader : public arrow::RecordBatchReader { } // namespace +std::pair PageFilteredRowGroupReader::GetPageRowRange( + const std::vector<::parquet::PageLocation>& page_locations, int32_t page_idx, + int64_t row_group_row_count) { + int64_t first_row = page_locations[page_idx].first_row_index; + int64_t last_row = (page_idx + 1 < static_cast(page_locations.size())) + ? 
page_locations[page_idx + 1].first_row_index - 1 + : row_group_row_count - 1; + return {first_row, last_row}; +} + std::function PageFilteredRowGroupReader::MakePageFilter( const RowRanges& row_ranges, const std::shared_ptr<::parquet::OffsetIndex>& offset_index, int64_t row_group_row_count) { - // Shared counter tracks the current page index as the callback is invoked - // in order for each data page. auto page_counter = std::make_shared(0); - const auto& page_locations = offset_index->page_locations(); auto num_pages = static_cast(page_locations.size()); return [row_ranges, page_locations, num_pages, row_group_row_count, page_counter](const ::parquet::DataPageStats& /*stats*/) -> bool { int32_t page_idx = (*page_counter)++; - if (page_idx >= num_pages) { - // Safety: if more pages than expected, don't skip return false; } - - int64_t first_row = page_locations[page_idx].first_row_index; - int64_t last_row; - if (page_idx + 1 < num_pages) { - last_row = page_locations[page_idx + 1].first_row_index - 1; - } else { - last_row = row_group_row_count - 1; - } - - // Return true to skip this page if it has no overlap with RowRanges + auto [first_row, last_row] = GetPageRowRange(page_locations, page_idx, row_group_row_count); return !row_ranges.IsOverlapping(first_row, last_row); }; } @@ -103,10 +99,7 @@ std::pair PageFilteredRowGroupReader::ComputeCompressedRowRa int64_t compressed_offset = 0; for (int32_t page_idx = 0; page_idx < num_pages; ++page_idx) { - int64_t page_from = page_locations[page_idx].first_row_index; - int64_t page_to = (page_idx + 1 < num_pages) - ? page_locations[page_idx + 1].first_row_index - 1 - : row_group_row_count - 1; + auto [page_from, page_to] = GetPageRowRange(page_locations, page_idx, row_group_row_count); int64_t page_size = page_to - page_from + 1; if (!original_ranges.IsOverlapping(page_from, page_to)) { @@ -114,19 +107,13 @@ std::pair PageFilteredRowGroupReader::ComputeCompressedRowRa continue; } - // Page is kept. 
Map overlapping original ranges to compressed row space. for (const auto& range : ranges) { - if (range.to < page_from) { - continue; - } - if (range.from > page_to) { - break; // Ranges are sorted - } + if (range.to < page_from) continue; + if (range.from > page_to) break; int64_t overlap_from = std::max(range.from, page_from); int64_t overlap_to = std::min(range.to, page_to); - int64_t c_from = compressed_offset + (overlap_from - page_from); - int64_t c_to = compressed_offset + (overlap_to - page_from); - compressed.Add(RowRanges::Range(c_from, c_to)); + compressed.Add(RowRanges::Range(compressed_offset + (overlap_from - page_from), + compressed_offset + (overlap_to - page_from))); } compressed_offset += page_size; @@ -135,6 +122,38 @@ std::pair PageFilteredRowGroupReader::ComputeCompressedRowRa return {compressed, compressed_offset}; } +Status PageFilteredRowGroupReader::ExecuteSkipReadPattern( + ::parquet::internal::RecordReader* record_reader, const RowRanges& ranges, + int64_t total_row_count, int32_t row_group_index, int32_t column_index) { + int64_t current_row = 0; + for (const auto& range : ranges.GetRanges()) { + if (range.from > current_row) { + int64_t to_skip = range.from - current_row; + int64_t skipped = record_reader->SkipRecords(to_skip); + if (skipped != to_skip) { + return Status::Invalid(fmt::format( + "PageFilteredRowGroupReader: expected to skip {} records but skipped {} " + "(row_group={}, column={})", + to_skip, skipped, row_group_index, column_index)); + } + current_row = range.from; + } + int64_t to_read = range.Count(); + int64_t read = record_reader->ReadRecords(to_read); + if (read != to_read) { + return Status::Invalid( + fmt::format("PageFilteredRowGroupReader: expected to read {} records but read {} " + "(row_group={}, column={}, range=[{},{}])", + to_read, read, row_group_index, column_index, range.from, range.to)); + } + current_row += to_read; + } + if (current_row < total_row_count) { + record_reader->SkipRecords(total_row_count 
- current_row); + } + return Status::OK(); +} + Result> PageFilteredRowGroupReader::ReadFilteredColumn( const std::shared_ptr<::parquet::RowGroupReader>& row_group_reader, ::parquet::ParquetFileReader* parquet_reader, @@ -173,42 +192,9 @@ Result> PageFilteredRowGroupReader::ReadFil auto record_reader = ::parquet::internal::RecordReader::Make(col_descriptor, leaf_info, pool); record_reader->SetPageReader(std::move(page_reader)); - // Execute skip/read pattern based on effective RowRanges - const auto& ranges = effective_ranges.GetRanges(); - int64_t current_row = 0; - - for (const auto& range : ranges) { - // Skip rows before this range - if (range.from > current_row) { - int64_t to_skip = range.from - current_row; - int64_t skipped = record_reader->SkipRecords(to_skip); - if (skipped != to_skip) { - return Status::Invalid(fmt::format( - "PageFilteredRowGroupReader: expected to skip {} records but skipped {} " - "(row_group={}, column={})", - to_skip, skipped, row_group_index, column_index)); - } - current_row = range.from; - } - - // Read rows in this range - int64_t to_read = range.Count(); - int64_t read = record_reader->ReadRecords(to_read); - if (read != to_read) { - return Status::Invalid( - fmt::format("PageFilteredRowGroupReader: expected to read {} records but read {} " - "(row_group={}, column={}, range=[{},{}])", - to_read, read, row_group_index, column_index, range.from, range.to)); - } - current_row += to_read; - } - - // Skip remaining rows after the last range to properly finalize the reader - if (current_row < effective_row_count) { - record_reader->SkipRecords(effective_row_count - current_row); - } + PAIMON_RETURN_NOT_OK(ExecuteSkipReadPattern( + record_reader.get(), effective_ranges, effective_row_count, row_group_index, column_index)); - // Transfer to Arrow ChunkedArray std::shared_ptr chunked_array; PAIMON_RETURN_NOT_OK_FROM_ARROW(::parquet::arrow::TransferColumnData( record_reader.get(), field, col_descriptor, pool, &chunked_array)); @@ 
-216,12 +202,38 @@ Result> PageFilteredRowGroupReader::ReadFil return chunked_array; } -Result> PageFilteredRowGroupReader::ReadFilteredRowGroup( +Status PageFilteredRowGroupReader::WaitForPreBuffer( ::parquet::ParquetFileReader* parquet_reader, int32_t row_group_index, - const RowRanges& row_ranges, const std::vector& column_indices, - const std::shared_ptr& arrow_schema, ::arrow::MemoryPool* pool, + const std::vector& column_indices, ::arrow::MemoryPool* pool, const ::arrow::io::CacheOptions& cache_options, bool pre_buffered, + const std::vector<::arrow::io::ReadRange>& page_ranges) { + std::vector rg_vec = {row_group_index}; + std::vector col_vec(column_indices.begin(), column_indices.end()); + if (!pre_buffered) { + ::arrow::io::IOContext io_ctx(pool); + parquet_reader->PreBuffer(rg_vec, col_vec, io_ctx, cache_options); + } + if (!page_ranges.empty()) { + auto status = parquet_reader->WhenBufferedRanges(page_ranges).status(); + if (!status.ok()) { + ::arrow::io::IOContext io_ctx(pool); + parquet_reader->PreBuffer(rg_vec, col_vec, io_ctx, cache_options); + PAIMON_RETURN_NOT_OK_FROM_ARROW(parquet_reader->WhenBuffered(rg_vec, col_vec).status()); + } + } else { + PAIMON_RETURN_NOT_OK_FROM_ARROW(parquet_reader->WhenBuffered(rg_vec, col_vec).status()); + } + return Status::OK(); +} + +Result> PageFilteredRowGroupReader::ReadFilteredRowGroup( + ::parquet::ParquetFileReader* parquet_reader, const TargetRowGroup& target_row_group, + const std::vector& column_indices, const std::shared_ptr& arrow_schema, + ::arrow::MemoryPool* pool, const ::arrow::io::CacheOptions& cache_options, bool pre_buffered, const std::vector<::arrow::io::ReadRange>& page_ranges, int64_t max_chunksize) { + const auto& row_ranges = target_row_group.row_ranges; + int32_t row_group_index = target_row_group.row_group_index; + if (row_ranges.IsEmpty()) { PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr empty_table, arrow::Table::MakeEmpty(arrow_schema, pool)); @@ -230,42 +242,17 @@ Result> 
PageFilteredRowGroupReader::Re int64_t expected_rows = row_ranges.RowCount(); - // Wait for pre-buffered data to be ready. - // When pre_buffered=true, PreBuffer was already called in PrepareForReading() covering - // all row groups in parallel. We only need to wait. Calling PreBuffer again would create - // a new cached_source_, discarding the parallel I/O already in progress. - { - std::vector rg_vec = {row_group_index}; - std::vector col_vec(column_indices.begin(), column_indices.end()); - if (!pre_buffered) { - ::arrow::io::IOContext io_ctx(pool); - parquet_reader->PreBuffer(rg_vec, col_vec, io_ctx, cache_options); - } - if (!page_ranges.empty()) { - // Page-level PreBuffer: wait on specific page byte ranges - // If pre-buffering failed (e.g., IO error during testing), fall back to on-demand read - auto status = parquet_reader->WhenBufferedRanges(page_ranges).status(); - if (!status.ok()) { - // Pre-buffering failed, fall back to row-group level PreBuffer - ::arrow::io::IOContext io_ctx(pool); - parquet_reader->PreBuffer(rg_vec, col_vec, io_ctx, cache_options); - PAIMON_RETURN_NOT_OK_FROM_ARROW( - parquet_reader->WhenBuffered(rg_vec, col_vec).status()); - } - } else { - PAIMON_RETURN_NOT_OK_FROM_ARROW(parquet_reader->WhenBuffered(rg_vec, col_vec).status()); - } - } + PAIMON_RETURN_NOT_OK(WaitForPreBuffer(parquet_reader, row_group_index, column_indices, pool, + cache_options, pre_buffered, page_ranges)); - // Open row group and page index once, share across all columns auto row_group_reader = parquet_reader->RowGroup(row_group_index); auto rg_metadata = parquet_reader->metadata()->RowGroup(row_group_index); int64_t row_group_row_count = rg_metadata->num_rows(); - auto page_index_reader = parquet_reader->GetPageIndexReader(); // reuse RowGroupPageIndexReader for multiple columns in the same row group to avoid redundant // metadata reads std::shared_ptr<::parquet::RowGroupPageIndexReader> rg_page_index_reader; + auto page_index_reader = 
parquet_reader->GetPageIndexReader(); if (page_index_reader) { rg_page_index_reader = page_index_reader->RowGroup(row_group_index); } @@ -276,7 +263,7 @@ Result> PageFilteredRowGroupReader::Re for (size_t i = 0; i < column_indices.size(); ++i) { PAIMON_ASSIGN_OR_RAISE( - std::shared_ptr chunked_array, + auto chunked_array, ReadFilteredColumn(row_group_reader, parquet_reader, rg_page_index_reader, row_group_index, column_indices[i], row_ranges, arrow_schema->field(static_cast(i)), row_group_row_count, @@ -292,18 +279,16 @@ Result> PageFilteredRowGroupReader::Re columns.push_back(std::move(chunked_array)); } - // Wrap columns in a Table and stream zero-copy-sliced batches via TableBatchReader. - // For multi-chunk variable-length columns this avoids the deep copy of CombineChunks: - // each emitted batch contains at most max_chunksize rows (capped further by the - // smallest remaining chunk across columns), and every column's Array is a zero-copy - // Slice of its underlying chunk. auto table = arrow::Table::Make(arrow_schema, std::move(columns), expected_rows); return std::make_unique(std::move(table), max_chunksize); } std::vector<::arrow::io::ReadRange> PageFilteredRowGroupReader::ComputePageRanges( - ::parquet::ParquetFileReader* parquet_reader, int32_t row_group_index, - const RowRanges& row_ranges, const std::vector& column_indices) { + ::parquet::ParquetFileReader* parquet_reader, const TargetRowGroup& target_row_group, + const std::vector& column_indices) { + int32_t row_group_index = target_row_group.row_group_index; + const auto& row_ranges = target_row_group.row_ranges; + std::vector<::arrow::io::ReadRange> ranges; auto file_metadata = parquet_reader->metadata(); auto rg_metadata = file_metadata->RowGroup(row_group_index); @@ -346,23 +331,17 @@ std::vector<::arrow::io::ReadRange> PageFilteredRowGroupReader::ComputePageRange auto num_pages = static_cast(page_locations.size()); for (int32_t page_idx = 0; page_idx < num_pages; ++page_idx) { - int64_t 
first_row = page_locations[page_idx].first_row_index; - int64_t last_row = (page_idx + 1 < num_pages) - ? page_locations[page_idx + 1].first_row_index - 1 - : row_group_row_count - 1; + auto [first_row, last_row] = + GetPageRowRange(page_locations, page_idx, row_group_row_count); if (!row_ranges.IsOverlapping(first_row, last_row)) { - continue; // Page doesn't overlap with target rows + continue; } - // Compute page byte range int64_t page_offset = page_locations[page_idx].offset; - int64_t page_size; - if (page_idx + 1 < num_pages) { - page_size = page_locations[page_idx + 1].offset - page_offset; - } else { - page_size = chunk_end - page_offset; - } + int64_t page_size = (page_idx + 1 < num_pages) + ? page_locations[page_idx + 1].offset - page_offset + : chunk_end - page_offset; ranges.push_back({page_offset, page_size}); } } diff --git a/src/paimon/format/parquet/page_filtered_row_group_reader.h b/src/paimon/format/parquet/page_filtered_row_group_reader.h index 24c82e253..7294180cc 100644 --- a/src/paimon/format/parquet/page_filtered_row_group_reader.h +++ b/src/paimon/format/parquet/page_filtered_row_group_reader.h @@ -45,8 +45,7 @@ class PageFilteredRowGroupReader { /// Read a row group with page-level filtering. /// @param parquet_reader The underlying ParquetFileReader - /// @param row_group_index Row group to read - /// @param row_ranges Matching row ranges within this row group + /// @param target_row_group Target row group with index and row ranges /// @param column_indices Leaf column indices to read /// @param arrow_schema The target Arrow schema for output columns /// @param pool Memory pool @@ -54,16 +53,11 @@ class PageFilteredRowGroupReader { /// @param pre_buffered If true, assumes PreBuffer was already called externally /// and only waits via WhenBuffered (no redundant PreBuffer). 
/// @param page_ranges If non-empty, wait via WhenBufferedRanges instead of WhenBuffered - /// @param max_chunksize Per-batch row cap for the returned reader, mirroring Arrow's - /// TableBatchReader::set_chunksize. Each batch yields at most this many rows; - /// actual size may be smaller when an underlying ChunkedArray's chunk boundary - /// is reached first (zero-copy slice). - /// @return A RecordBatchReader streaming the filtered rows. Multi-chunk variable-length - /// columns are emitted as multiple zero-copy-sliced batches along chunk boundaries - /// instead of being concatenated, avoiding the deep copy of CombineChunks. + /// @param max_chunksize Per-batch row cap for the returned reader. + /// @return A RecordBatchReader streaming the filtered rows. static Result> ReadFilteredRowGroup( - ::parquet::ParquetFileReader* parquet_reader, int32_t row_group_index, - const RowRanges& row_ranges, const std::vector& column_indices, + ::parquet::ParquetFileReader* parquet_reader, const TargetRowGroup& target_row_group, + const std::vector& column_indices, const std::shared_ptr& arrow_schema, ::arrow::MemoryPool* pool, const ::arrow::io::CacheOptions& cache_options = ::arrow::io::CacheOptions::Defaults(), bool pre_buffered = false, const std::vector<::arrow::io::ReadRange>& page_ranges = {}, @@ -74,19 +68,35 @@ class PageFilteredRowGroupReader { /// Includes dictionary pages unconditionally. /// Falls back to entire column chunk range if OffsetIndex is unavailable. static std::vector<::arrow::io::ReadRange> ComputePageRanges( - ::parquet::ParquetFileReader* parquet_reader, int32_t row_group_index, - const RowRanges& row_ranges, const std::vector& column_indices); + ::parquet::ParquetFileReader* parquet_reader, const TargetRowGroup& target_row_group, + const std::vector& column_indices); private: + /// Get the [first_row, last_row] range of a page given page locations. 
+ static std::pair GetPageRowRange( + const std::vector<::parquet::PageLocation>& page_locations, int32_t page_idx, + int64_t row_group_row_count); + + /// Wait for pre-buffered data to become available before reading. + static Status WaitForPreBuffer(::parquet::ParquetFileReader* parquet_reader, + int32_t row_group_index, + const std::vector& column_indices, + ::arrow::MemoryPool* pool, + const ::arrow::io::CacheOptions& cache_options, + bool pre_buffered, + const std::vector<::arrow::io::ReadRange>& page_ranges); + + /// Execute the skip/read pattern on a RecordReader based on RowRanges. + static Status ExecuteSkipReadPattern(::parquet::internal::RecordReader* record_reader, + const RowRanges& ranges, int64_t total_row_count, + int32_t row_group_index, int32_t column_index); + /// Create a data_page_filter callback for a column based on RowRanges + OffsetIndex. - /// Returns true (skip) if the page's row range has no overlap with RowRanges. static std::function MakePageFilter( const RowRanges& row_ranges, const std::shared_ptr<::parquet::OffsetIndex>& offset_index, int64_t row_group_row_count); /// Read a single column using skip/read pattern driven by RowRanges. - /// When OffsetIndex is available, uses data_page_filter for I/O-level page skipping - /// and compressed RowRanges for decode-level row skipping. static Result> ReadFilteredColumn( const std::shared_ptr<::parquet::RowGroupReader>& row_group_reader, ::parquet::ParquetFileReader* parquet_reader, @@ -96,8 +106,6 @@ class PageFilteredRowGroupReader { ::arrow::MemoryPool* pool); /// Compute compressed RowRanges after data_page_filter skips non-matching pages. - /// Maps original RowRanges to the compressed row space where skipped pages are removed. 
- /// @return pair of (compressed RowRanges, compressed total row count) static std::pair ComputeCompressedRowRanges( const RowRanges& original_ranges, const std::shared_ptr<::parquet::OffsetIndex>& offset_index, int64_t row_group_row_count); diff --git a/src/paimon/format/parquet/page_filtered_row_group_reader_test.cpp b/src/paimon/format/parquet/page_filtered_row_group_reader_test.cpp index bd693730d..604d2a4c3 100644 --- a/src/paimon/format/parquet/page_filtered_row_group_reader_test.cpp +++ b/src/paimon/format/parquet/page_filtered_row_group_reader_test.cpp @@ -521,7 +521,7 @@ TEST_F(PageFilteredRowGroupReaderTest, ComputePageRangesPartialMatch) { row_ranges.Add(RowRanges::Range(50, 59)); auto ranges = PageFilteredRowGroupReader::ComputePageRanges( - parquet_reader.get(), /*row_group_index=*/0, row_ranges, /*column_indices=*/{0}); + parquet_reader.get(), TargetRowGroup(0, true, row_ranges), /*column_indices=*/{0}); // Should have exactly 1 range (page 5 of column 0, no dictionary since disabled) ASSERT_EQ(1, ranges.size()); @@ -544,8 +544,8 @@ TEST_F(PageFilteredRowGroupReaderTest, ComputePageRangesAllMatch) { RowRanges row_ranges; row_ranges.Add(RowRanges::Range(0, 99)); - auto ranges = - PageFilteredRowGroupReader::ComputePageRanges(parquet_reader.get(), 0, row_ranges, {0}); + auto ranges = PageFilteredRowGroupReader::ComputePageRanges( + parquet_reader.get(), TargetRowGroup(0, true, row_ranges), {0}); // 10 pages, all matching ASSERT_EQ(10, ranges.size()); @@ -568,8 +568,8 @@ TEST_F(PageFilteredRowGroupReaderTest, ComputePageRangesNoMatch) { RowRanges row_ranges; // empty - auto ranges = - PageFilteredRowGroupReader::ComputePageRanges(parquet_reader.get(), 0, row_ranges, {0}); + auto ranges = PageFilteredRowGroupReader::ComputePageRanges( + parquet_reader.get(), TargetRowGroup(0, true, row_ranges), {0}); ASSERT_EQ(0, ranges.size()); } @@ -589,8 +589,8 @@ TEST_F(PageFilteredRowGroupReaderTest, ComputePageRangesMultiColumn) { RowRanges row_ranges; 
row_ranges.Add(RowRanges::Range(50, 59)); - auto ranges = - PageFilteredRowGroupReader::ComputePageRanges(parquet_reader.get(), 0, row_ranges, {0, 1}); + auto ranges = PageFilteredRowGroupReader::ComputePageRanges( + parquet_reader.get(), TargetRowGroup(0, true, row_ranges), {0, 1}); // 1 matching page per column = 2 ranges total ASSERT_EQ(2, ranges.size()); @@ -615,8 +615,8 @@ TEST_F(PageFilteredRowGroupReaderTest, ComputePageRangesMultiplePages) { row_ranges.Add(RowRanges::Range(20, 29)); row_ranges.Add(RowRanges::Range(70, 79)); - auto ranges = - PageFilteredRowGroupReader::ComputePageRanges(parquet_reader.get(), 0, row_ranges, {0}); + auto ranges = PageFilteredRowGroupReader::ComputePageRanges( + parquet_reader.get(), TargetRowGroup(0, true, row_ranges), {0}); // 2 matching pages for 1 column ASSERT_EQ(2, ranges.size()); From 3253c65febcdeea2fda851f9a4d3dd4c8edca448 Mon Sep 17 00:00:00 2001 From: zhouhongfeng Date: Wed, 3 Jun 2026 16:35:37 +0800 Subject: [PATCH 11/14] fix: align comments of ApplyReadRanges with actual functionality --- src/paimon/format/parquet/file_reader_wrapper.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/paimon/format/parquet/file_reader_wrapper.h b/src/paimon/format/parquet/file_reader_wrapper.h index 2e3a784ac..503091a94 100644 --- a/src/paimon/format/parquet/file_reader_wrapper.h +++ b/src/paimon/format/parquet/file_reader_wrapper.h @@ -118,7 +118,7 @@ class FileReaderWrapper { const std::vector& column_indices); /// Apply read ranges to the current target_row_groups_, keeping only those - /// whose row-group range overlaps with one of the given read ranges. + /// whose row-group range is equal to one of the given read ranges. /// Resets reader state so that the next Next() call will re-initialize. 
Status ApplyReadRanges(const std::vector>& read_ranges); From 4d57b9dd40c3b4c1ce570ee4d387cd62943fd329 Mon Sep 17 00:00:00 2001 From: zhouhongfeng Date: Wed, 3 Jun 2026 16:38:36 +0800 Subject: [PATCH 12/14] fix: set default value for TargetRowGroup --- src/paimon/format/parquet/row_ranges.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/paimon/format/parquet/row_ranges.h b/src/paimon/format/parquet/row_ranges.h index 112ac9c86..425f24053 100644 --- a/src/paimon/format/parquet/row_ranges.h +++ b/src/paimon/format/parquet/row_ranges.h @@ -106,8 +106,8 @@ class RowRanges { }; struct TargetRowGroup { - int32_t row_group_index; - bool is_page_filtered; + int32_t row_group_index{-1}; + bool is_page_filtered{false}; // page-filtered row ranges, only valid if is_page_filtered is true. RowRanges row_ranges; From 3fd1ac01d03826377439b03cb0857b2d0ee3443d Mon Sep 17 00:00:00 2001 From: zhouhongfeng Date: Wed, 3 Jun 2026 16:41:58 +0800 Subject: [PATCH 13/14] fix: avoid unsigned overflow --- src/paimon/format/parquet/file_reader_wrapper.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/paimon/format/parquet/file_reader_wrapper.cpp b/src/paimon/format/parquet/file_reader_wrapper.cpp index a59469681..c89b7ff78 100644 --- a/src/paimon/format/parquet/file_reader_wrapper.cpp +++ b/src/paimon/format/parquet/file_reader_wrapper.cpp @@ -160,7 +160,8 @@ void FileReaderWrapper::WaitForPendingPreBuffer() { } void FileReaderWrapper::AdvanceToNextRowGroup() { - if (current_row_group_idx_ >= target_row_groups_.size() - 1) { + // current_row_group_idx_ >= target_row_groups_.size() -1, avoid unsigned overflow + if (current_row_group_idx_ + 1 >= target_row_groups_.size()) { next_row_to_read_ = num_rows_; current_row_group_idx_ = target_row_groups_.size(); } else { From 45c9f5203fc21e3d4f42739287438989f922aa87 Mon Sep 17 00:00:00 2001 From: zhouhongfeng Date: Wed, 3 Jun 2026 17:58:17 +0800 Subject: [PATCH 14/14] refactor: remove unused member
variable read_ranges_ in ParquetFileBatchReader --- src/paimon/format/parquet/parquet_file_batch_reader.cpp | 2 -- src/paimon/format/parquet/parquet_file_batch_reader.h | 2 -- 2 files changed, 4 deletions(-) diff --git a/src/paimon/format/parquet/parquet_file_batch_reader.cpp b/src/paimon/format/parquet/parquet_file_batch_reader.cpp index 5979119c5..95c0614ab 100644 --- a/src/paimon/format/parquet/parquet_file_batch_reader.cpp +++ b/src/paimon/format/parquet/parquet_file_batch_reader.cpp @@ -65,7 +65,6 @@ ParquetFileBatchReader::ParquetFileBatchReader( arrow_pool_(arrow_pool), input_stream_(std::move(input_stream)), reader_(std::move(reader)), - read_ranges_(reader_->GetAllRowGroupRanges()), metrics_(std::make_shared()), logger_(Logger::GetLogger("ParquetFileBatchReader")) {} @@ -205,7 +204,6 @@ Status ParquetFileBatchReader::SetReadSchema( } } PAIMON_RETURN_NOT_OK(reader_->PrepareForReadingLazy(target_row_groups, column_indices)); - PAIMON_RETURN_NOT_OK(reader_->ApplyReadRanges(read_ranges_)); } PAIMON_PARQUET_CATCH_AND_RETURN_STATUS("ParquetFileBatchReader::SetReadSchema") return Status::OK(); diff --git a/src/paimon/format/parquet/parquet_file_batch_reader.h b/src/paimon/format/parquet/parquet_file_batch_reader.h index 41d455520..8dc412c30 100644 --- a/src/paimon/format/parquet/parquet_file_batch_reader.h +++ b/src/paimon/format/parquet/parquet_file_batch_reader.h @@ -102,7 +102,6 @@ class ParquetFileBatchReader : public PrefetchFileBatchReader { } Status SetReadRanges(const std::vector>& read_ranges) override { - read_ranges_ = read_ranges; return reader_->ApplyReadRanges(read_ranges); } @@ -176,7 +175,6 @@ class ParquetFileBatchReader : public PrefetchFileBatchReader { std::unique_ptr reader_; std::shared_ptr read_data_type_; - std::vector> read_ranges_; std::shared_ptr metrics_; std::unique_ptr logger_;