diff --git a/be/src/format/new_parquet/parquet_profile.h b/be/src/format/new_parquet/parquet_profile.h new file mode 100644 index 00000000000000..094769d11c14cb --- /dev/null +++ b/be/src/format/new_parquet/parquet_profile.h @@ -0,0 +1,32 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include "runtime/runtime_profile.h" + +namespace doris::parquet { + +struct ParquetColumnReaderProfile { + RuntimeProfile::Counter* reader_read_rows = nullptr; + RuntimeProfile::Counter* reader_skip_rows = nullptr; + RuntimeProfile::Counter* reader_select_rows = nullptr; + RuntimeProfile::Counter* arrow_read_records_time = nullptr; + RuntimeProfile::Counter* materialization_time = nullptr; +}; + +} // namespace doris::parquet diff --git a/be/src/format/new_parquet/parquet_reader.cpp b/be/src/format/new_parquet/parquet_reader.cpp index 637cf3746a1f19..c47eb303aeb93c 100644 --- a/be/src/format/new_parquet/parquet_reader.cpp +++ b/be/src/format/new_parquet/parquet_reader.cpp @@ -105,6 +105,11 @@ ParquetReader::~ParquetReader() = default; Status ParquetReader::init(RuntimeState* state) { RETURN_IF_ERROR(reader::FileReader::init(state)); + if (_profile != nullptr) { + COUNTER_UPDATE(_parquet_profile.file_reader_create_time, + _reader_statistics.file_reader_create_time); + COUNTER_UPDATE(_parquet_profile.open_file_num, _reader_statistics.open_file_num); + } _state = std::make_unique(); _state->enable_bloom_filter = state != nullptr && state->query_options().enable_parquet_filter_by_bloom_filter; @@ -207,8 +212,33 @@ Status ParquetReader::open(std::unique_ptr& request) { COUNTER_UPDATE(_parquet_profile.page_index_read_calls, pruning_stats.page_index_read_calls); COUNTER_UPDATE(_parquet_profile.bloom_filter_read_time, pruning_stats.bloom_filter_read_time); + COUNTER_UPDATE(_parquet_profile.row_group_filter_time, pruning_stats.row_group_filter_time); + COUNTER_UPDATE(_parquet_profile.page_index_filter_time, + pruning_stats.page_index_filter_time); + COUNTER_UPDATE(_parquet_profile.read_page_index_time, pruning_stats.read_page_index_time); } _state->scan_plan = row_group_plan; + _state->scheduler.set_page_skip_profile( + {.skipped_pages = _parquet_profile.pages_skipped_by_data_page_filter, + .skipped_bytes = _parquet_profile.data_page_filter_skip_bytes}); 
+ _state->scheduler.set_scan_profile({ + .raw_rows_read = _parquet_profile.raw_rows_read, + .selected_rows = _parquet_profile.selected_rows, + .rows_filtered_by_conjunct = _parquet_profile.rows_filtered_by_conjunct, + .total_batches = _parquet_profile.total_batches, + .empty_selection_batches = _parquet_profile.empty_selection_batches, + .range_gap_skipped_rows = _parquet_profile.range_gap_skipped_rows, + .column_read_time = _parquet_profile.column_read_time, + .predicate_filter_time = _parquet_profile.predicate_filter_time, + .column_reader_profile = + { + .reader_read_rows = _parquet_profile.reader_read_rows, + .reader_skip_rows = _parquet_profile.reader_skip_rows, + .reader_select_rows = _parquet_profile.reader_select_rows, + .arrow_read_records_time = _parquet_profile.arrow_read_records_time, + .materialization_time = _parquet_profile.materialization_time, + }, + }); _state->scheduler.set_plan(std::move(row_group_plan)); _eof = _state->scheduler.empty(); return Status::OK(); @@ -334,6 +364,30 @@ void ParquetReader::_init_profile() { _profile, "FilteredRowsByGroup", TUnit::UNIT, parquet_profile, 1); _parquet_profile.filtered_page_rows = ADD_CHILD_COUNTER_WITH_LEVEL( _profile, "FilteredRowsByPage", TUnit::UNIT, parquet_profile, 1); + _parquet_profile.pages_skipped_by_data_page_filter = ADD_CHILD_COUNTER_WITH_LEVEL( + _profile, "PagesSkippedByDataPageFilter", TUnit::UNIT, parquet_profile, 1); + _parquet_profile.data_page_filter_skip_bytes = ADD_CHILD_COUNTER_WITH_LEVEL( + _profile, "DataPageFilterSkipBytes", TUnit::BYTES, parquet_profile, 1); + _parquet_profile.selected_rows = ADD_CHILD_COUNTER_WITH_LEVEL( + _profile, "SelectedRows", TUnit::UNIT, parquet_profile, 1); + _parquet_profile.rows_filtered_by_conjunct = ADD_CHILD_COUNTER_WITH_LEVEL( + _profile, "RowsFilteredByConjunct", TUnit::UNIT, parquet_profile, 1); + _parquet_profile.total_batches = ADD_CHILD_COUNTER_WITH_LEVEL( + _profile, "TotalBatches", TUnit::UNIT, parquet_profile, 1); + 
_parquet_profile.empty_selection_batches = ADD_CHILD_COUNTER_WITH_LEVEL( + _profile, "EmptySelectionBatches", TUnit::UNIT, parquet_profile, 1); + _parquet_profile.range_gap_skipped_rows = ADD_CHILD_COUNTER_WITH_LEVEL( + _profile, "RangeGapSkippedRows", TUnit::UNIT, parquet_profile, 1); + _parquet_profile.reader_read_rows = ADD_CHILD_COUNTER_WITH_LEVEL( + _profile, "ReaderReadRows", TUnit::UNIT, parquet_profile, 1); + _parquet_profile.reader_skip_rows = ADD_CHILD_COUNTER_WITH_LEVEL( + _profile, "ReaderSkipRows", TUnit::UNIT, parquet_profile, 1); + _parquet_profile.reader_select_rows = ADD_CHILD_COUNTER_WITH_LEVEL( + _profile, "ReaderSelectRows", TUnit::UNIT, parquet_profile, 1); + _parquet_profile.arrow_read_records_time = + ADD_CHILD_TIMER_WITH_LEVEL(_profile, "ArrowReadRecordsTime", parquet_profile, 1); + _parquet_profile.materialization_time = + ADD_CHILD_TIMER_WITH_LEVEL(_profile, "MaterializationTime", parquet_profile, 1); _parquet_profile.lazy_read_filtered_rows = ADD_CHILD_COUNTER_WITH_LEVEL( _profile, "FilteredRowsByLazyRead", TUnit::UNIT, parquet_profile, 1); _parquet_profile.filtered_bytes = ADD_CHILD_COUNTER_WITH_LEVEL( diff --git a/be/src/format/new_parquet/parquet_reader.h b/be/src/format/new_parquet/parquet_reader.h index 61ad9b0ae4eeed..5eb1909b599bcf 100644 --- a/be/src/format/new_parquet/parquet_reader.h +++ b/be/src/format/new_parquet/parquet_reader.h @@ -78,6 +78,18 @@ class ParquetReader : public reader::FileReader { RuntimeProfile::Counter* selected_row_ranges = nullptr; RuntimeProfile::Counter* filtered_group_rows = nullptr; RuntimeProfile::Counter* filtered_page_rows = nullptr; + RuntimeProfile::Counter* pages_skipped_by_data_page_filter = nullptr; + RuntimeProfile::Counter* data_page_filter_skip_bytes = nullptr; + RuntimeProfile::Counter* selected_rows = nullptr; + RuntimeProfile::Counter* rows_filtered_by_conjunct = nullptr; + RuntimeProfile::Counter* total_batches = nullptr; + RuntimeProfile::Counter* empty_selection_batches = nullptr; + 
RuntimeProfile::Counter* range_gap_skipped_rows = nullptr; + RuntimeProfile::Counter* reader_read_rows = nullptr; + RuntimeProfile::Counter* reader_skip_rows = nullptr; + RuntimeProfile::Counter* reader_select_rows = nullptr; + RuntimeProfile::Counter* arrow_read_records_time = nullptr; + RuntimeProfile::Counter* materialization_time = nullptr; RuntimeProfile::Counter* lazy_read_filtered_rows = nullptr; RuntimeProfile::Counter* filtered_bytes = nullptr; RuntimeProfile::Counter* raw_rows_read = nullptr; diff --git a/be/src/format/new_parquet/parquet_scan.cpp b/be/src/format/new_parquet/parquet_scan.cpp index 76d03ad39a17d0..bd27a18f4d8748 100644 --- a/be/src/format/new_parquet/parquet_scan.cpp +++ b/be/src/format/new_parquet/parquet_scan.cpp @@ -123,7 +123,8 @@ Status plan_parquet_row_groups(const ::parquet::FileMetaData& metadata, row_group_plan.row_group_rows = row_group_rows; RETURN_IF_ERROR(select_row_group_ranges_by_page_index( file_reader, file_schema, request, row_group_idx, row_group_rows, - &row_group_plan.selected_ranges, &plan->pruning_stats)); + &row_group_plan.selected_ranges, &row_group_plan.page_skip_plans, + &plan->pruning_stats)); if (row_group_plan.selected_ranges.empty()) { continue; } @@ -283,8 +284,10 @@ Status ParquetScanScheduler::open_next_row_group( _current_predicate_columns.clear(); _current_non_predicate_columns.clear(); - ParquetColumnReaderFactory column_reader_factory(_current_row_group, - file_context.schema->num_columns()); + ParquetColumnReaderFactory column_reader_factory( + _current_row_group, file_context.schema->num_columns(), + &row_group_plan.page_skip_plans, _page_skip_profile, + _scan_profile.column_reader_profile); for (const auto& col : request.predicate_columns) { const auto local_id = col.field_id(); if (local_id == ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID) { @@ -327,6 +330,9 @@ Status ParquetScanScheduler::skip_current_row_group_rows(int64_t rows) { if (rows == 0) { return Status::OK(); } + if 
(_scan_profile.range_gap_skipped_rows != nullptr) { + COUNTER_UPDATE(_scan_profile.range_gap_skipped_rows, rows); + } for (const auto& column_reader : _current_predicate_columns | std::views::values) { RETURN_IF_ERROR(column_reader->skip(rows)); } @@ -356,21 +362,37 @@ Status ParquetScanScheduler::read_filter_columns(int64_t batch_rows, << " " << type_to_string(column_reader->type()->get_primitive_type()) << " " << column_reader->name() << " " << fid << " " << block_position; int64_t column_rows = 0; - RETURN_IF_ERROR(column_reader->read(batch_rows, column, &column_rows)); + { + SCOPED_TIMER(_scan_profile.column_read_time); + RETURN_IF_ERROR(column_reader->read(batch_rows, column, &column_rows)); + } if (column_rows != batch_rows) { return Status::Corruption("Parquet filter column {} returned {} rows, expected {} rows", column_reader->name(), column_rows, batch_rows); } file_block->replace_by_position(block_position, std::move(column)); } + if (_scan_profile.predicate_filter_time == nullptr) { + return execute_batch_filters(request, batch_rows, file_block, selection, selected_rows); + } + SCOPED_TIMER(_scan_profile.predicate_filter_time); return execute_batch_filters(request, batch_rows, file_block, selection, selected_rows); } Status ParquetScanScheduler::read_current_row_group_batch(int64_t batch_rows, const reader::FileScanRequest& request, Block* file_block, size_t* rows) { + if (_scan_profile.total_batches != nullptr) { + COUNTER_UPDATE(_scan_profile.total_batches, 1); + } + if (_scan_profile.raw_rows_read != nullptr) { + COUNTER_UPDATE(_scan_profile.raw_rows_read, batch_rows); + } if (_current_predicate_columns.empty() && _current_non_predicate_columns.empty()) { *rows = static_cast(batch_rows); + if (_scan_profile.selected_rows != nullptr) { + COUNTER_UPDATE(_scan_profile.selected_rows, batch_rows); + } return Status::OK(); } SelectionVector selection; @@ -380,6 +402,15 @@ Status ParquetScanScheduler::read_current_row_group_batch(int64_t batch_rows, 
read_filter_columns(batch_rows, request, file_block, &selection, &selected_rows)); const bool need_filter_output = selected_rows != batch_rows; + if (_scan_profile.selected_rows != nullptr) { + COUNTER_UPDATE(_scan_profile.selected_rows, selected_rows); + } + if (_scan_profile.rows_filtered_by_conjunct != nullptr) { + COUNTER_UPDATE(_scan_profile.rows_filtered_by_conjunct, batch_rows - selected_rows); + } + if (selected_rows == 0 && _scan_profile.empty_selection_batches != nullptr) { + COUNTER_UPDATE(_scan_profile.empty_selection_batches, 1); + } if (need_filter_output) { IColumn::Filter output_filter = selection_to_filter(selection, selected_rows, batch_rows); for (const auto& col : request.predicate_columns) { @@ -392,33 +423,36 @@ Status ParquetScanScheduler::read_current_row_group_batch(int64_t batch_rows, } } - for (const auto& [fid, column_reader] : _current_non_predicate_columns) { - auto position_it = request.local_positions.find(reader::LocalColumnId(fid)); - DORIS_CHECK(position_it != request.local_positions.end()); - const auto block_position = position_it->second.value(); - auto column_guard = file_block->mutate_column_scoped(block_position); - auto& col = column_guard.mutable_column(); - DCHECK_EQ(file_block->get_by_position(block_position).type->get_primitive_type(), - column_reader->type()->get_primitive_type()) - << type_to_string( - file_block->get_by_position(block_position).type->get_primitive_type()) - << " " << type_to_string(column_reader->type()->get_primitive_type()) << " " - << column_reader->name() << " " << fid << " " << block_position; - if (need_filter_output) { - [[maybe_unused]] auto old_size = col->size(); - RETURN_IF_ERROR(column_reader->select(selection, selected_rows, batch_rows, col)); - if (col->size() != old_size + selected_rows) { - return Status::Corruption( - "Parquet selected output column {} returned {} rows, expected {} rows", - column_reader->name(), col->size(), old_size + selected_rows); - } - } else { - int64_t 
column_rows = 0; - RETURN_IF_ERROR(column_reader->read(batch_rows, col, &column_rows)); - if (column_rows != batch_rows) { - return Status::Corruption( - "Parquet output column {} returned {} rows, expected {} rows", - column_reader->name(), column_rows, batch_rows); + { + SCOPED_TIMER(_scan_profile.column_read_time); + for (const auto& [fid, column_reader] : _current_non_predicate_columns) { + auto position_it = request.local_positions.find(reader::LocalColumnId(fid)); + DORIS_CHECK(position_it != request.local_positions.end()); + const auto block_position = position_it->second.value(); + auto column_guard = file_block->mutate_column_scoped(block_position); + auto& col = column_guard.mutable_column(); + DCHECK_EQ(file_block->get_by_position(block_position).type->get_primitive_type(), + column_reader->type()->get_primitive_type()) + << type_to_string(file_block->get_by_position(block_position) + .type->get_primitive_type()) + << " " << type_to_string(column_reader->type()->get_primitive_type()) << " " + << column_reader->name() << " " << fid << " " << block_position; + if (need_filter_output) { + [[maybe_unused]] auto old_size = col->size(); + RETURN_IF_ERROR(column_reader->select(selection, selected_rows, batch_rows, col)); + if (col->size() != old_size + selected_rows) { + return Status::Corruption( + "Parquet selected output column {} returned {} rows, expected {} rows", + column_reader->name(), col->size(), old_size + selected_rows); + } + } else { + int64_t column_rows = 0; + RETURN_IF_ERROR(column_reader->read(batch_rows, col, &column_rows)); + if (column_rows != batch_rows) { + return Status::Corruption( + "Parquet output column {} returned {} rows, expected {} rows", + column_reader->name(), column_rows, batch_rows); + } } } } diff --git a/be/src/format/new_parquet/parquet_scan.h b/be/src/format/new_parquet/parquet_scan.h index eadc14367e5ff0..acb8e22eee9fa1 100644 --- a/be/src/format/new_parquet/parquet_scan.h +++ b/be/src/format/new_parquet/parquet_scan.h 
@@ -19,15 +19,18 @@ #include #include +#include #include #include #include "common/status.h" #include "core/column/column.h" +#include "format/new_parquet/parquet_profile.h" #include "format/new_parquet/parquet_statistics.h" #include "format/new_parquet/reader/column_reader.h" #include "format/new_parquet/selection_vector.h" #include "format/reader/file_reader.h" +#include "runtime/runtime_profile.h" namespace parquet { class FileMetaData; @@ -59,6 +62,7 @@ struct RowGroupReadPlan { int64_t first_file_row = 0; int64_t row_group_rows = 0; std::vector selected_ranges; + std::map page_skip_plans; }; struct RowGroupScanPlan { @@ -66,6 +70,18 @@ struct RowGroupScanPlan { ParquetPruningStats pruning_stats; }; +struct ParquetScanProfile { + RuntimeProfile::Counter* raw_rows_read = nullptr; + RuntimeProfile::Counter* selected_rows = nullptr; + RuntimeProfile::Counter* rows_filtered_by_conjunct = nullptr; + RuntimeProfile::Counter* total_batches = nullptr; + RuntimeProfile::Counter* empty_selection_batches = nullptr; + RuntimeProfile::Counter* range_gap_skipped_rows = nullptr; + RuntimeProfile::Counter* column_read_time = nullptr; + RuntimeProfile::Counter* predicate_filter_time = nullptr; + ParquetColumnReaderProfile column_reader_profile; +}; + Status plan_parquet_row_groups(const ::parquet::FileMetaData& metadata, ::parquet::ParquetFileReader* file_reader, const std::vector>& file_schema, @@ -83,6 +99,10 @@ Status execute_batch_filters(const reader::FileScanRequest& request, int64_t bat class ParquetScanScheduler { public: void set_plan(RowGroupScanPlan plan); + void set_page_skip_profile(ParquetPageSkipProfile page_skip_profile) { + _page_skip_profile = page_skip_profile; + } + void set_scan_profile(ParquetScanProfile scan_profile) { _scan_profile = scan_profile; } void reset(); bool empty() const { return _row_group_plans.empty(); } @@ -118,6 +138,8 @@ class ParquetScanScheduler { std::vector _current_selected_ranges; size_t _current_range_idx = 0; int64_t 
_current_range_rows_read = 0; + ParquetPageSkipProfile _page_skip_profile; + ParquetScanProfile _scan_profile; }; } // namespace doris::parquet diff --git a/be/src/format/new_parquet/parquet_statistics.cpp b/be/src/format/new_parquet/parquet_statistics.cpp index 464849a3787928..4f9c8cccebb889 100644 --- a/be/src/format/new_parquet/parquet_statistics.cpp +++ b/be/src/format/new_parquet/parquet_statistics.cpp @@ -702,6 +702,9 @@ Status ParquetStatisticsUtils::SelectRowGroups( const std::vector>& file_schema, const reader::FileScanRequest& request, std::vector* selected_row_groups, bool enable_bloom_filter, ParquetPruningStats* pruning_stats) { + int64_t row_group_filter_time_sink = 0; + SCOPED_RAW_TIMER(pruning_stats == nullptr ? &row_group_filter_time_sink + : &pruning_stats->row_group_filter_time); if (selected_row_groups == nullptr) { return Status::InvalidArgument("selected_row_groups is null"); } @@ -966,8 +969,8 @@ int64_t count_range_rows(const std::vector& ranges) { return rows; } -void append_page_range(const ::parquet::OffsetIndex& offset_index, size_t page_idx, - int64_t row_group_rows, std::vector* ranges) { +RowRange page_row_range(const ::parquet::OffsetIndex& offset_index, size_t page_idx, + int64_t row_group_rows) { const auto& page_locations = offset_index.page_locations(); const int64_t start = page_locations[page_idx].first_row_index; const int64_t end = page_idx + 1 == page_locations.size() @@ -976,17 +979,21 @@ void append_page_range(const ::parquet::OffsetIndex& offset_index, size_t page_i DORIS_CHECK(start >= 0); DORIS_CHECK(end >= start); DORIS_CHECK(end <= row_group_rows); - if (start == end) { + return RowRange {start, end - start}; +} + +void append_row_range(const RowRange& range, std::vector* ranges) { + if (range.length == 0) { return; } if (!ranges->empty()) { auto& previous = ranges->back(); - if (previous.start + previous.length == start) { - previous.length += end - start; + if (previous.start + previous.length == range.start) { + 
previous.length += range.length; return; } } - ranges->push_back(RowRange {start, end - start}); + ranges->push_back(range); } bool select_ranges_for_filter(const std::shared_ptr<::parquet::RowGroupPageIndexReader>& row_group, @@ -1025,23 +1032,167 @@ bool select_ranges_for_filter(const std::shared_ptr<::parquet::RowGroupPageIndex ranges->clear(); return false; } + const RowRange row_range = page_row_range(*offset_index, page_idx, row_group_rows); if (ParquetStatisticsUtils::CheckStatistics(column_filter, page_statistics)) { continue; } - append_page_range(*offset_index, page_idx, row_group_rows, ranges); + append_row_range(row_range, ranges); } return true; } +bool ranges_intersect(const std::vector& ranges, const RowRange& range) { + const int64_t range_end = range.start + range.length; + for (const auto& selected_range : ranges) { + const int64_t selected_end = selected_range.start + selected_range.length; + if (selected_end <= range.start) { + continue; + } + if (selected_range.start >= range_end) { + return false; + } + return true; + } + return false; +} + +void collect_leaf_schemas(const ParquetColumnSchema& column_schema, + const reader::LocalColumnIndex* projection, + std::vector* leaf_schemas) { + if (column_schema.kind == ParquetColumnSchemaKind::PRIMITIVE) { + leaf_schemas->push_back(&column_schema); + return; + } + for (const auto& child_schema : column_schema.children) { + if (!reader::is_child_projected(projection, child_schema->local_id)) { + continue; + } + const auto* child_projection = + reader::find_child_projection(projection, child_schema->local_id); + collect_leaf_schemas(*child_schema, child_projection, leaf_schemas); + } +} + +void collect_request_leaf_schemas( + const std::vector>& file_schema, + const reader::FileScanRequest& request, + std::vector* leaf_schemas) { + std::set seen_leaf_ids; + auto collect_projection = [&](const reader::LocalColumnIndex& projection) { + const int32_t local_id = projection.field_id(); + if (local_id < 0 || 
local_id >= static_cast(file_schema.size())) { + return; + } + std::vector projection_leaf_schemas; + collect_leaf_schemas(*file_schema[local_id], &projection, &projection_leaf_schemas); + for (const auto* leaf_schema : projection_leaf_schemas) { + DORIS_CHECK(leaf_schema != nullptr); + if (seen_leaf_ids.insert(leaf_schema->leaf_column_id).second) { + leaf_schemas->push_back(leaf_schema); + } + } + }; + for (const auto& projection : request.predicate_columns) { + collect_projection(projection); + } + for (const auto& projection : request.non_predicate_columns) { + collect_projection(projection); + } + for (const auto& column_filter : request.column_predicate_filters) { + const auto* leaf_schema = + ParquetStatisticsUtils::ResolvePredicateLeafSchema(file_schema, column_filter); + if (leaf_schema == nullptr) { + continue; + } + if (seen_leaf_ids.insert(leaf_schema->leaf_column_id).second) { + leaf_schemas->push_back(leaf_schema); + } + } +} + +bool build_page_skip_plan_for_leaf( + const std::shared_ptr<::parquet::RowGroupPageIndexReader>& row_group, + const ParquetColumnSchema& column_schema, const std::vector& selected_ranges, + int64_t row_group_rows, ParquetPageSkipPlan* page_skip_plan) { + DORIS_CHECK(page_skip_plan != nullptr); + *page_skip_plan = ParquetPageSkipPlan {}; + // OffsetIndex first_row_index is row-based only for non-repeated leaves. LIST/MAP/repeated + // leaves need repetition-level-aware range mapping and are intentionally left out for now. 
+ if (column_schema.kind != ParquetColumnSchemaKind::PRIMITIVE || + column_schema.descriptor == nullptr || column_schema.leaf_column_id < 0 || + column_schema.descriptor->max_repetition_level() != 0) { + return false; + } + + std::shared_ptr<::parquet::OffsetIndex> offset_index; + try { + offset_index = row_group->GetOffsetIndex(column_schema.leaf_column_id); + } catch (const ::parquet::ParquetException&) { + return false; + } catch (const std::exception&) { + return false; + } + if (offset_index == nullptr) { + return false; + } + + const auto page_count = offset_index->page_locations().size(); + page_skip_plan->leaf_column_id = column_schema.leaf_column_id; + page_skip_plan->skipped_pages.resize(page_count); + page_skip_plan->skipped_page_compressed_sizes.resize(page_count); + const auto& page_locations = offset_index->page_locations(); + for (size_t page_idx = 0; page_idx < page_count; ++page_idx) { + const RowRange row_range = page_row_range(*offset_index, page_idx, row_group_rows); + if (row_range.length == 0 || ranges_intersect(selected_ranges, row_range)) { + continue; + } + page_skip_plan->skipped_pages[page_idx] = 1; + page_skip_plan->skipped_page_compressed_sizes[page_idx] = + page_locations[page_idx].compressed_page_size; + append_row_range(row_range, &page_skip_plan->skipped_ranges); + } + if (page_skip_plan->empty()) { + *page_skip_plan = ParquetPageSkipPlan {}; + return false; + } + return true; +} + +void build_page_skip_plans(const std::shared_ptr<::parquet::RowGroupPageIndexReader>& row_group, + const std::vector>& file_schema, + const reader::FileScanRequest& request, + const std::vector& selected_ranges, int64_t row_group_rows, + std::map* page_skip_plans) { + DORIS_CHECK(page_skip_plans != nullptr); + page_skip_plans->clear(); + std::vector leaf_schemas; + collect_request_leaf_schemas(file_schema, request, &leaf_schemas); + for (const auto* leaf_schema : leaf_schemas) { + DORIS_CHECK(leaf_schema != nullptr); + ParquetPageSkipPlan page_skip_plan; 
+ if (build_page_skip_plan_for_leaf(row_group, *leaf_schema, selected_ranges, row_group_rows, + &page_skip_plan)) { + page_skip_plans->emplace(page_skip_plan.leaf_column_id, std::move(page_skip_plan)); + } + } +} + } // namespace Status select_row_group_ranges_by_page_index( ::parquet::ParquetFileReader* file_reader, const std::vector>& file_schema, const reader::FileScanRequest& request, int row_group_idx, int64_t row_group_rows, - std::vector* selected_ranges, ParquetPruningStats* pruning_stats) { + std::vector* selected_ranges, std::map* page_skip_plans, + ParquetPruningStats* pruning_stats) { + int64_t page_index_filter_time_sink = 0; + SCOPED_RAW_TIMER(pruning_stats == nullptr ? &page_index_filter_time_sink + : &pruning_stats->page_index_filter_time); DORIS_CHECK(selected_ranges != nullptr); selected_ranges->clear(); + if (page_skip_plans != nullptr) { + page_skip_plans->clear(); + } if (row_group_rows <= 0) { return Status::OK(); } @@ -1057,11 +1208,16 @@ Status select_row_group_ranges_by_page_index( if (pruning_stats != nullptr) { ++pruning_stats->page_index_read_calls; } - page_index_reader = file_reader->GetPageIndexReader(); - if (page_index_reader == nullptr) { - return Status::OK(); + { + int64_t read_page_index_time_sink = 0; + SCOPED_RAW_TIMER(pruning_stats == nullptr ? 
&read_page_index_time_sink + : &pruning_stats->read_page_index_time); + page_index_reader = file_reader->GetPageIndexReader(); + if (page_index_reader == nullptr) { + return Status::OK(); + } + row_group_index_reader = page_index_reader->RowGroup(row_group_idx); } - row_group_index_reader = page_index_reader->RowGroup(row_group_idx); } catch (const ::parquet::ParquetException&) { return Status::OK(); } catch (const std::exception&) { @@ -1079,6 +1235,9 @@ Status select_row_group_ranges_by_page_index( } *selected_ranges = intersect_ranges(*selected_ranges, filter_ranges); if (selected_ranges->empty()) { + if (page_skip_plans != nullptr) { + page_skip_plans->clear(); + } if (pruning_stats != nullptr) { pruning_stats->filtered_page_rows += row_group_rows; ++pruning_stats->filtered_row_groups_by_page_index; @@ -1086,6 +1245,10 @@ Status select_row_group_ranges_by_page_index( return Status::OK(); } } + if (page_skip_plans != nullptr) { + build_page_skip_plans(row_group_index_reader, file_schema, request, *selected_ranges, + row_group_rows, page_skip_plans); + } if (pruning_stats != nullptr) { const int64_t selected_rows = count_range_rows(*selected_ranges); DORIS_CHECK(selected_rows <= row_group_rows); diff --git a/be/src/format/new_parquet/parquet_statistics.h b/be/src/format/new_parquet/parquet_statistics.h index 560a073636c632..35b267bb8d3b3c 100644 --- a/be/src/format/new_parquet/parquet_statistics.h +++ b/be/src/format/new_parquet/parquet_statistics.h @@ -18,6 +18,7 @@ #pragma once #include +#include #include #include @@ -61,6 +62,9 @@ struct ParquetPruningStats { int64_t selected_row_ranges = 0; int64_t page_index_read_calls = 0; int64_t bloom_filter_read_time = 0; + int64_t row_group_filter_time = 0; + int64_t page_index_filter_time = 0; + int64_t read_page_index_time = 0; }; // Parquet row group column statistics 转换后的 Doris 统计视图。 @@ -131,6 +135,7 @@ Status select_row_group_ranges_by_page_index( ::parquet::ParquetFileReader* file_reader, const std::vector>& 
file_schema, const reader::FileScanRequest& request, int row_group_idx, int64_t row_group_rows, - std::vector* selected_ranges, ParquetPruningStats* pruning_stats); + std::vector* selected_ranges, std::map* page_skip_plans, + ParquetPruningStats* pruning_stats); } // namespace doris::parquet diff --git a/be/src/format/new_parquet/reader/arrow_leaf_reader_adapter.cpp b/be/src/format/new_parquet/reader/arrow_leaf_reader_adapter.cpp index da64c6a2d2bf9a..630f4df4c0059b 100644 --- a/be/src/format/new_parquet/reader/arrow_leaf_reader_adapter.cpp +++ b/be/src/format/new_parquet/reader/arrow_leaf_reader_adapter.cpp @@ -32,6 +32,7 @@ #include "core/data_type_serde/decoded_column_view.h" #include "core/string_ref.h" #include "format/new_parquet/reader/nested_column_reader.h" +#include "runtime/runtime_profile.h" namespace doris::parquet { namespace { @@ -138,7 +139,10 @@ Status read_leaf_records(const ArrowLeafReaderContext& context, int64_t batch_ro try { context.record_reader->Reset(); context.record_reader->Reserve(batch_rows); - records_read = context.record_reader->ReadRecords(batch_rows); + { + SCOPED_TIMER(context.profile.arrow_read_records_time); + records_read = context.record_reader->ReadRecords(batch_rows); + } } catch (const ::parquet::ParquetException& e) { return Status::Corruption("Failed to read parquet records for column {}: {}", context.column_name(), e.what()); @@ -204,8 +208,11 @@ Status append_leaf_values(const ArrowLeafReaderContext& context, view.values = record_reader.values(); } - RETURN_IF_ERROR( - context.data_type()->get_serde()->read_column_from_decoded_values(*column, view)); + { + SCOPED_TIMER(context.profile.materialization_time); + RETURN_IF_ERROR( + context.data_type()->get_serde()->read_column_from_decoded_values(*column, view)); + } return Status::OK(); } diff --git a/be/src/format/new_parquet/reader/arrow_leaf_reader_adapter.h b/be/src/format/new_parquet/reader/arrow_leaf_reader_adapter.h index e90ab002823037..ca77dee54eea00 100644 --- 
a/be/src/format/new_parquet/reader/arrow_leaf_reader_adapter.h +++ b/be/src/format/new_parquet/reader/arrow_leaf_reader_adapter.h @@ -25,6 +25,7 @@ #include "common/status.h" #include "core/column/column.h" #include "core/column/column_nullable.h" +#include "format/new_parquet/parquet_profile.h" #include "format/new_parquet/parquet_type.h" namespace parquet { @@ -45,6 +46,7 @@ struct ArrowLeafReaderContext { DataTypePtr type; std::string name; std::shared_ptr<::parquet::internal::RecordReader> record_reader; + ParquetColumnReaderProfile profile; const std::string& column_name() const { return name; } const DataTypePtr& data_type() const { return type; } diff --git a/be/src/format/new_parquet/reader/column_reader.cpp b/be/src/format/new_parquet/reader/column_reader.cpp index c38450bfdea453..44fd7e9f216417 100644 --- a/be/src/format/new_parquet/reader/column_reader.cpp +++ b/be/src/format/new_parquet/reader/column_reader.cpp @@ -17,12 +17,15 @@ #include "format/new_parquet/reader/column_reader.h" +#include #include #include +#include #include #include #include +#include #include #include #include @@ -40,10 +43,68 @@ #include "format/new_parquet/reader/scalar_column_reader.h" #include "format/new_parquet/reader/struct_column_reader.h" #include "format/reader/file_reader.h" +#include "runtime/runtime_profile.h" namespace doris::parquet { namespace { +class DataPageSkipFilter { +public: + DataPageSkipFilter(const ParquetPageSkipPlan* page_skip_plan, + ParquetPageSkipProfile page_skip_profile) + : _page_skip_plan(page_skip_plan), _page_skip_profile(page_skip_profile) { + DORIS_CHECK(_page_skip_plan != nullptr); + } + + bool operator()(const ::parquet::DataPageStats&) { + // Arrow invokes this callback once for each DATA_PAGE/DATA_PAGE_V2 and never for + // dictionary pages, so this ordinal matches Parquet OffsetIndex page locations. 
+ const size_t page_idx = _next_data_page_idx++; + const bool skip = _page_skip_plan->should_skip_page(page_idx); + if (!skip) { + return false; + } + update_skip_profile(page_idx); + return true; + } + +private: + void update_skip_profile(size_t page_idx) const { + if (_page_skip_profile.skipped_pages != nullptr) { + COUNTER_UPDATE(_page_skip_profile.skipped_pages, 1); + } + if (_page_skip_profile.skipped_bytes != nullptr) { + COUNTER_UPDATE(_page_skip_profile.skipped_bytes, + _page_skip_plan->skipped_page_compressed_size(page_idx)); + } + } + + const ParquetPageSkipPlan* _page_skip_plan = nullptr; + ParquetPageSkipProfile _page_skip_profile; + size_t _next_data_page_idx = 0; +}; + +const ParquetPageSkipPlan* find_page_skip_plan( + const std::map* page_skip_plans, int leaf_column_id) { + if (page_skip_plans == nullptr) { + return nullptr; + } + const auto plan_it = page_skip_plans->find(leaf_column_id); + return plan_it == page_skip_plans->end() ? nullptr : &plan_it->second; +} + +void install_data_page_filter(std::unique_ptr<::parquet::PageReader>& page_reader, + const std::map* page_skip_plans, + int leaf_column_id, ParquetPageSkipProfile page_skip_profile) { + DORIS_CHECK(page_reader != nullptr); + const ParquetPageSkipPlan* page_skip_plan = + find_page_skip_plan(page_skip_plans, leaf_column_id); + if (page_skip_plan == nullptr) { + return; + } + page_reader->set_data_page_filter(DataPageSkipFilter(page_skip_plan, page_skip_profile)); +} + bool supports_nested_scalar_record_reader(const ParquetColumnSchema& column_schema) { if (supports_record_reader(column_schema.type_descriptor)) { return true; @@ -76,6 +137,18 @@ Status ParquetColumnReader::skip(int64_t rows) { return Status::NotSupported("Parquet column skip is not implemented, rows={}", rows); } +void ParquetColumnReader::update_reader_read_rows(int64_t rows) const { + if (_profile.reader_read_rows != nullptr) { + COUNTER_UPDATE(_profile.reader_read_rows, rows); + } +} + +void 
ParquetColumnReader::update_reader_skip_rows(int64_t rows) const { + if (_profile.reader_skip_rows != nullptr) { + COUNTER_UPDATE(_profile.reader_skip_rows, rows); + } +} + Status ParquetColumnReader::select(const SelectionVector& sel, uint16_t selected_rows, int64_t batch_rows, MutableColumnPtr& column) { if (column.get() == nullptr) { @@ -103,13 +176,21 @@ Status ParquetColumnReader::select(const SelectionVector& sel, uint16_t selected cursor = range.start + range.length; } RETURN_IF_ERROR(skip(batch_rows - cursor)); + if (_profile.reader_select_rows != nullptr) { + COUNTER_UPDATE(_profile.reader_select_rows, selected_rows); + } return Status::OK(); } ParquetColumnReaderFactory::ParquetColumnReaderFactory( - std::shared_ptr<::parquet::RowGroupReader> row_group, int num_leaf_columns) + std::shared_ptr<::parquet::RowGroupReader> row_group, int num_leaf_columns, + const std::map* page_skip_plans, + ParquetPageSkipProfile page_skip_profile, ParquetColumnReaderProfile column_reader_profile) : _row_group(std::move(row_group)), - _record_readers(static_cast(num_leaf_columns)) {} + _record_readers(static_cast(num_leaf_columns)), + _page_skip_plans(page_skip_plans), + _page_skip_profile(page_skip_profile), + _column_reader_profile(column_reader_profile) {} reader::ColumnDefinition ParquetColumnReaderFactory::row_position_column_definition() { reader::ColumnDefinition field; @@ -123,7 +204,7 @@ reader::ColumnDefinition ParquetColumnReaderFactory::row_position_column_definit std::unique_ptr ParquetColumnReaderFactory::create_row_position_column_reader( int64_t row_group_first_row) const { - return std::make_unique(row_group_first_row); + return std::make_unique(row_group_first_row, _column_reader_profile); } Status ParquetColumnReaderFactory::create_scalar_reader( @@ -133,7 +214,9 @@ Status ParquetColumnReaderFactory::create_scalar_reader( if (reader == nullptr) { return Status::InvalidArgument("reader is null"); } - *reader = std::make_unique(column_schema, 
std::move(record_reader)); + const auto* page_skip_plan = find_page_skip_plan(_page_skip_plans, column_schema.leaf_column_id); + *reader = std::make_unique(column_schema, std::move(record_reader), + page_skip_plan, _column_reader_profile); return Status::OK(); } @@ -214,8 +297,15 @@ Status ParquetColumnReaderFactory::get_record_reader( } if (_record_readers[leaf_column_id] == nullptr) { try { - _record_readers[leaf_column_id] = - _row_group->RecordReader(leaf_column_id, /*read_dictionary=*/false); + auto page_reader = _row_group->GetColumnPageReader(leaf_column_id); + install_data_page_filter(page_reader, _page_skip_plans, leaf_column_id, + _page_skip_profile); + const auto level_info = ::parquet::internal::LevelInfo::ComputeLevelInfo(descriptor); + _record_readers[leaf_column_id] = ::parquet::internal::RecordReader::Make( + descriptor, level_info, ::arrow::default_memory_pool(), + /*read_dictionary=*/false, + /*read_dense_for_nullable=*/false); + _record_readers[leaf_column_id]->SetPageReader(std::move(page_reader)); } catch (const ::parquet::ParquetException& e) { return Status::Corruption("Failed to create parquet record reader for column {}: {}", name, e.what()); @@ -278,9 +368,9 @@ Status ParquetColumnReaderFactory::create_struct_column_reader( type = make_nullable(type); } } - *reader = std::make_unique(column_schema, std::move(type), - std::move(child_readers), - std::move(child_output_indices)); + *reader = std::make_unique( + column_schema, std::move(type), std::move(child_readers), + std::move(child_output_indices), _column_reader_profile); return Status::OK(); } @@ -328,7 +418,7 @@ Status ParquetColumnReaderFactory::create_list_column_reader( } } *reader = std::make_unique(column_schema, std::move(type), - std::move(element_reader)); + std::move(element_reader), _column_reader_profile); return Status::OK(); } @@ -378,8 +468,9 @@ Status ParquetColumnReaderFactory::create_map_column_reader( type = make_nullable(type); } } - *reader = 
std::make_unique(column_schema, std::move(type), - std::move(key_reader), std::move(value_reader)); + *reader = + std::make_unique(column_schema, std::move(type), std::move(key_reader), + std::move(value_reader), _column_reader_profile); return Status::OK(); } @@ -403,8 +494,10 @@ Status ParquetColumnReaderFactory::create(const ParquetColumnSchema& column_sche column_schema.name); } -ParquetColumnReader::ParquetColumnReader(const ParquetColumnSchema& schema, const DataTypePtr type) - : _field_id(schema.local_id), +ParquetColumnReader::ParquetColumnReader(const ParquetColumnSchema& schema, const DataTypePtr type, + ParquetColumnReaderProfile profile) + : _profile(profile), + _field_id(schema.local_id), _leaf_column_id(schema.leaf_column_id), _nullable_definition_level(schema.nullable_definition_level), _repeated_repetition_level(schema.repeated_repetition_level), diff --git a/be/src/format/new_parquet/reader/column_reader.h b/be/src/format/new_parquet/reader/column_reader.h index 07b9713e852605..6d2f5acdabd9a4 100644 --- a/be/src/format/new_parquet/reader/column_reader.h +++ b/be/src/format/new_parquet/reader/column_reader.h @@ -18,12 +18,14 @@ #pragma once #include +#include #include #include #include #include "common/status.h" #include "core/data_type/data_type.h" +#include "format/new_parquet/parquet_profile.h" #include "format/new_parquet/parquet_type.h" #include "format/new_parquet/selection_vector.h" #include "format/reader/column_data.h" @@ -43,6 +45,11 @@ class IColumn; namespace parquet { struct ParquetColumnSchema; +struct ParquetPageSkipProfile { + RuntimeProfile::Counter* skipped_pages = nullptr; + RuntimeProfile::Counter* skipped_bytes = nullptr; +}; + // Doris 的 Parquet column reader 抽象。 // 该类包装 Arrow Parquet RecordReader,负责将 file-local Parquet leaf column 读取成 // Doris-owned column。它不理解 Iceberg/global schema,也不处理 table-level @@ -63,6 +70,7 @@ class ParquetColumnReader { virtual const DataTypePtr& type() const { return _type; } virtual const 
std::string& name() const { return _name; } + const ParquetColumnReaderProfile& profile() const { return _profile; } // 读取一个 file-local column batch。 virtual Status read(int64_t rows, MutableColumnPtr& column, int64_t* rows_read) = 0; @@ -76,9 +84,13 @@ class ParquetColumnReader { MutableColumnPtr& column); protected: - ParquetColumnReader(const ParquetColumnSchema& schema, const DataTypePtr type); + ParquetColumnReader(const ParquetColumnSchema& schema, const DataTypePtr type, + ParquetColumnReaderProfile profile = {}); ParquetColumnReader() = default; + void update_reader_read_rows(int64_t rows) const; + void update_reader_skip_rows(int64_t rows) const; + ParquetColumnReaderProfile _profile; const int _field_id = -1; const int _leaf_column_id = -1; const int16_t _nullable_definition_level = 0; @@ -95,7 +107,10 @@ class ParquetColumnReader { class ParquetColumnReaderFactory { public: ParquetColumnReaderFactory(std::shared_ptr<::parquet::RowGroupReader> row_group, - int num_leaf_columns); + int num_leaf_columns, + const std::map* page_skip_plans = nullptr, + ParquetPageSkipProfile page_skip_profile = {}, + ParquetColumnReaderProfile column_reader_profile = {}); static constexpr int ROW_POSITION_COLUMN_ID = -10001; static constexpr const char* ROW_POSITION_COLUMN_NAME = "__parquet_row_position"; @@ -145,6 +160,9 @@ class ParquetColumnReaderFactory { std::shared_ptr<::parquet::RowGroupReader> _row_group; mutable std::vector> _record_readers; + const std::map* _page_skip_plans = nullptr; + ParquetPageSkipProfile _page_skip_profile; + ParquetColumnReaderProfile _column_reader_profile; }; } // namespace parquet diff --git a/be/src/format/new_parquet/reader/list_column_reader.h b/be/src/format/new_parquet/reader/list_column_reader.h index d208cbaa0996a6..8935a59e4caf05 100644 --- a/be/src/format/new_parquet/reader/list_column_reader.h +++ b/be/src/format/new_parquet/reader/list_column_reader.h @@ -31,8 +31,10 @@ namespace doris::parquet { class ListColumnReader final : 
public ParquetColumnReader { public: ListColumnReader(const ParquetColumnSchema& schema, DataTypePtr type, - std::unique_ptr element_reader) - : ParquetColumnReader(schema, type), _element_reader(std::move(element_reader)) {} + std::unique_ptr element_reader, + ParquetColumnReaderProfile profile = {}) + : ParquetColumnReader(schema, type, profile), + _element_reader(std::move(element_reader)) {} Status read(int64_t rows, MutableColumnPtr& column, int64_t* rows_read) override; Status skip(int64_t rows) override; diff --git a/be/src/format/new_parquet/reader/map_column_reader.cpp b/be/src/format/new_parquet/reader/map_column_reader.cpp index 01272974ec634d..a3ace4d5ebe168 100644 --- a/be/src/format/new_parquet/reader/map_column_reader.cpp +++ b/be/src/format/new_parquet/reader/map_column_reader.cpp @@ -623,12 +623,13 @@ Status MapColumnReader::read(int64_t rows, MutableColumnPtr& column, int64_t* ro if (readers.scalar_value != nullptr) { const int16_t value_max_definition_level = readers.scalar_value->descriptor()->max_definition_level(); - return read_aligned_map_entries( + RETURN_IF_ERROR(read_aligned_map_entries( _name, _type, _nullable_definition_level, _repeated_repetition_level, *readers.key, key_max_definition_level, *readers.scalar_value, &_value_overflow, &_key_overflow, NestedScalarValueAppender {readers.scalar_value, "MAP", "value", value_max_definition_level}, - rows, &context, rows_read); + rows, &context, rows_read)); + return Status::OK(); } if (readers.list_value != nullptr) { @@ -641,17 +642,19 @@ Status MapColumnReader::read(int64_t rows, MutableColumnPtr& column, int64_t* ro _name); } - return read_map_list_value_entries( + RETURN_IF_ERROR(read_map_list_value_entries( _name, _type, _nullable_definition_level, _repeated_repetition_level, *readers.key, key_max_definition_level, *readers.list_value, *scalar_list_value_reader, - &_key_overflow, &_value_overflow, rows, &context, rows_read); + &_key_overflow, &_value_overflow, rows, &context, 
rows_read)); + return Status::OK(); } - return read_aligned_map_entries( + RETURN_IF_ERROR(read_aligned_map_entries( _name, _type, _nullable_definition_level, _repeated_repetition_level, *readers.key, key_max_definition_level, *readers.struct_value, &_struct_value_overflow, &_key_overflow, NestedStructValueAppender {readers.struct_value}, rows, &context, - rows_read); + rows_read)); + return Status::OK(); } Status MapColumnReader::skip(int64_t rows) { diff --git a/be/src/format/new_parquet/reader/map_column_reader.h b/be/src/format/new_parquet/reader/map_column_reader.h index d2f864a9d8610b..50283705cecb98 100644 --- a/be/src/format/new_parquet/reader/map_column_reader.h +++ b/be/src/format/new_parquet/reader/map_column_reader.h @@ -32,8 +32,9 @@ class MapColumnReader final : public ParquetColumnReader { public: MapColumnReader(const ParquetColumnSchema& schema, DataTypePtr type, std::unique_ptr key_reader, - std::unique_ptr value_reader) - : ParquetColumnReader(schema, type), + std::unique_ptr value_reader, + ParquetColumnReaderProfile profile = {}) + : ParquetColumnReader(schema, type, profile), _key_reader(std::move(key_reader)), _value_reader(std::move(value_reader)) {} diff --git a/be/src/format/new_parquet/reader/nested_column_reader.cpp b/be/src/format/new_parquet/reader/nested_column_reader.cpp index c8556816449107..b643d56743ed17 100644 --- a/be/src/format/new_parquet/reader/nested_column_reader.cpp +++ b/be/src/format/new_parquet/reader/nested_column_reader.cpp @@ -33,8 +33,14 @@ namespace doris::parquet { Status read_nested_scalar_batch(ScalarColumnReader& column_reader, int64_t batch_rows, int16_t value_slot_definition_level, NestedScalarBatch* batch, int16_t value_slot_repetition_level) { - return read_nested_leaf_batch(column_reader.leaf_context(), batch_rows, - value_slot_definition_level, batch, value_slot_repetition_level); + RETURN_IF_ERROR(read_nested_leaf_batch(column_reader.leaf_context(), batch_rows, + value_slot_definition_level, batch, + 
value_slot_repetition_level)); + column_reader.advance_rows_read(batch->records_read); + if (column_reader.profile().reader_read_rows != nullptr) { + COUNTER_UPDATE(column_reader.profile().reader_read_rows, batch->records_read); + } + return Status::OK(); } Status append_scalar_batch_value(const ScalarColumnReader& column_reader, diff --git a/be/src/format/new_parquet/reader/row_position_column_reader.cpp b/be/src/format/new_parquet/reader/row_position_column_reader.cpp index 54d2fca1ac4a47..a8ab6305cd5798 100644 --- a/be/src/format/new_parquet/reader/row_position_column_reader.cpp +++ b/be/src/format/new_parquet/reader/row_position_column_reader.cpp @@ -26,11 +26,12 @@ namespace doris::parquet { -RowPositionColumnReader::RowPositionColumnReader(int64_t row_group_first_row) +RowPositionColumnReader::RowPositionColumnReader(int64_t row_group_first_row, + ParquetColumnReaderProfile profile) : ParquetColumnReader( ParquetColumnSchema { .name = ParquetColumnReaderFactory::ROW_POSITION_COLUMN_NAME}, - std::make_shared()), + std::make_shared(), profile), _row_group_first_row(row_group_first_row) {} int RowPositionColumnReader::file_column_id() const { diff --git a/be/src/format/new_parquet/reader/row_position_column_reader.h b/be/src/format/new_parquet/reader/row_position_column_reader.h index 9f86ccc9dc9752..994e8e5edbab08 100644 --- a/be/src/format/new_parquet/reader/row_position_column_reader.h +++ b/be/src/format/new_parquet/reader/row_position_column_reader.h @@ -26,7 +26,8 @@ namespace doris::parquet { class RowPositionColumnReader final : public ParquetColumnReader { public: - explicit RowPositionColumnReader(int64_t row_group_first_row); + explicit RowPositionColumnReader(int64_t row_group_first_row, + ParquetColumnReaderProfile profile = {}); int file_column_id() const override; int parquet_leaf_column_id() const override; diff --git a/be/src/format/new_parquet/reader/scalar_column_reader.cpp b/be/src/format/new_parquet/reader/scalar_column_reader.cpp index 
77b52bd66bb214..0fae8a312c9278 100644 --- a/be/src/format/new_parquet/reader/scalar_column_reader.cpp +++ b/be/src/format/new_parquet/reader/scalar_column_reader.cpp @@ -19,6 +19,7 @@ #include +#include #include #include @@ -29,11 +30,13 @@ namespace doris::parquet { ScalarColumnReader::ScalarColumnReader( const ParquetColumnSchema& column_schema, - std::shared_ptr<::parquet::internal::RecordReader> record_reader) - : ParquetColumnReader(column_schema, column_schema.type), + std::shared_ptr<::parquet::internal::RecordReader> record_reader, + const ParquetPageSkipPlan* page_skip_plan, ParquetColumnReaderProfile profile) + : ParquetColumnReader(column_schema, column_schema.type, profile), _descriptor(column_schema.descriptor), _type_descriptor(column_schema.type_descriptor), - _record_reader(std::move(record_reader)) {} + _record_reader(std::move(record_reader)), + _page_skip_plan(page_skip_plan) {} Status ScalarColumnReader::read(int64_t rows, MutableColumnPtr& column, int64_t* rows_read) { if (column.get() == nullptr || rows_read == nullptr) { @@ -57,18 +60,19 @@ Status ScalarColumnReader::read(int64_t rows, MutableColumnPtr& column, int64_t* RETURN_IF_ERROR(build_leaf_null_map(context, *record_reader, *rows_read, &null_map)); RETURN_IF_ERROR(append_leaf_values(context, *record_reader, *rows_read, &null_map, column)); + advance_rows_read(*rows_read); + update_reader_read_rows(*rows_read); return Status::OK(); } -Status ScalarColumnReader::skip(int64_t rows) { - if (rows <= 0) { - return Status::OK(); - } - +Status ScalarColumnReader::skip_records(int64_t rows) { if (_record_reader == nullptr) { return Status::InternalError("Parquet record reader is not initialized for column {}", _name); } + if (rows <= 0) { + return Status::OK(); + } int64_t skipped_rows = 0; try { _record_reader->Reset(); @@ -88,6 +92,53 @@ Status ScalarColumnReader::skip(int64_t rows) { return Status::InternalError("Failed to skip parquet records for column {}: {}", _name, e.what()); } + 
update_reader_skip_rows(rows); + return Status::OK(); +} + +int64_t ScalarColumnReader::page_filtered_rows_to_skip(int64_t rows) const { + if (_page_skip_plan == nullptr || rows <= 0) { + return 0; + } + const int64_t skip_end = _row_group_rows_read + rows; + int64_t filtered_rows = 0; + for (const auto& range : _page_skip_plan->skipped_ranges) { + const int64_t range_end = range.start + range.length; + if (range_end <= _row_group_rows_read) { + continue; + } + if (range.start >= skip_end) { + break; + } + const int64_t start = std::max(range.start, _row_group_rows_read); + const int64_t end = std::min(range_end, skip_end); + if (start < end) { + // Scheduler gap skips are derived from page-index selected_ranges. A page-filtered + // range can only overlap such a gap when the whole data page is outside every selected + // range, so partial overlap would mean the planner and scheduler are out of sync. + DORIS_CHECK(start == range.start); + DORIS_CHECK(end == range_end); + filtered_rows += end - start; + } + } + return filtered_rows; +} + +void ScalarColumnReader::advance_rows_read(int64_t rows) { + DORIS_CHECK(rows >= 0); + _row_group_rows_read += rows; +} + +Status ScalarColumnReader::skip(int64_t rows) { + if (rows <= 0) { + return Status::OK(); + } + + const int64_t page_filtered_rows = page_filtered_rows_to_skip(rows); + DORIS_CHECK(page_filtered_rows <= rows); + const int64_t record_reader_skip_rows = rows - page_filtered_rows; + RETURN_IF_ERROR(skip_records(record_reader_skip_rows)); + advance_rows_read(rows); return Status::OK(); } diff --git a/be/src/format/new_parquet/reader/scalar_column_reader.h b/be/src/format/new_parquet/reader/scalar_column_reader.h index 32e1143508fff9..7bd7873d78eabc 100644 --- a/be/src/format/new_parquet/reader/scalar_column_reader.h +++ b/be/src/format/new_parquet/reader/scalar_column_reader.h @@ -37,20 +37,29 @@ namespace doris::parquet { class ScalarColumnReader final : public ParquetColumnReader { public: 
ScalarColumnReader(const ParquetColumnSchema& column_schema, - std::shared_ptr<::parquet::internal::RecordReader> record_reader); + std::shared_ptr<::parquet::internal::RecordReader> record_reader, + const ParquetPageSkipPlan* page_skip_plan = nullptr, + ParquetColumnReaderProfile profile = {}); Status read(int64_t rows, MutableColumnPtr& column, int64_t* rows_read) override; Status skip(int64_t rows) override; const ::parquet::ColumnDescriptor* descriptor() const { return _descriptor; } ArrowLeafReaderContext leaf_context() const { - return ArrowLeafReaderContext {_descriptor, _type_descriptor, _type, _name, _record_reader}; + return ArrowLeafReaderContext {_descriptor, _type_descriptor, _type, _name, _record_reader, + _profile}; } + void advance_rows_read(int64_t rows); private: + Status skip_records(int64_t rows); + int64_t page_filtered_rows_to_skip(int64_t rows) const; + const ::parquet::ColumnDescriptor* _descriptor = nullptr; ParquetTypeDescriptor _type_descriptor; std::shared_ptr<::parquet::internal::RecordReader> _record_reader; + const ParquetPageSkipPlan* _page_skip_plan = nullptr; + int64_t _row_group_rows_read = 0; }; } // namespace doris::parquet diff --git a/be/src/format/new_parquet/reader/struct_column_reader.h b/be/src/format/new_parquet/reader/struct_column_reader.h index 1505894e049551..fda584ea24e846 100644 --- a/be/src/format/new_parquet/reader/struct_column_reader.h +++ b/be/src/format/new_parquet/reader/struct_column_reader.h @@ -32,8 +32,9 @@ class StructColumnReader final : public ParquetColumnReader { public: StructColumnReader(const ParquetColumnSchema& schema, DataTypePtr type, std::vector> children, - std::vector child_output_indices) - : ParquetColumnReader(schema, type), + std::vector child_output_indices, + ParquetColumnReaderProfile profile = {}) + : ParquetColumnReader(schema, type, profile), _children(std::move(children)), _child_output_indices(std::move(child_output_indices)) { DCHECK_EQ(_children.size(), 
_child_output_indices.size()); diff --git a/be/src/format/new_parquet/selection_vector.h b/be/src/format/new_parquet/selection_vector.h index 04e4b65a774322..63e50951d07d5e 100644 --- a/be/src/format/new_parquet/selection_vector.h +++ b/be/src/format/new_parquet/selection_vector.h @@ -21,6 +21,7 @@ #include #include +#include "common/check.h" #include "common/status.h" namespace doris::parquet { @@ -30,6 +31,28 @@ struct RowRange { int64_t length = 0; }; +struct ParquetPageSkipPlan { + int leaf_column_id = -1; + // Page ordinal is the data-page ordinal in the column chunk. It intentionally excludes + // dictionary pages, matching Arrow PageReader::set_data_page_filter(). + std::vector skipped_pages; + std::vector skipped_page_compressed_sizes; + // Row ranges covered by skipped data pages. ScalarColumnReader uses these ranges to avoid + // calling RecordReader::SkipRecords() again for pages already skipped by Arrow. + std::vector skipped_ranges; + + bool empty() const { return skipped_ranges.empty(); } + + bool should_skip_page(size_t page_idx) const { + return page_idx < skipped_pages.size() && skipped_pages[page_idx] != 0; + } + + int64_t skipped_page_compressed_size(size_t page_idx) const { + DCHECK_LT(page_idx, skipped_page_compressed_sizes.size()); + return skipped_page_compressed_sizes[page_idx]; + } +}; + // 类似 DuckDB SelectionVector 的轻量行号视图。 // 它只表达一个 batch 内被选中的 row offset,不持有 table/global schema 语义。 // 未绑定 data 时表示 identity selection:get_index(i) == i。 diff --git a/be/test/format/new_parquet/parquet_reader_test.cpp b/be/test/format/new_parquet/parquet_reader_test.cpp index bf1e36c43c1b27..d88afed00f1858 100644 --- a/be/test/format/new_parquet/parquet_reader_test.cpp +++ b/be/test/format/new_parquet/parquet_reader_test.cpp @@ -26,6 +26,7 @@ #include #include +#include #include #include #include @@ -480,6 +481,36 @@ void write_page_index_filter_parquet_file(const std::string& file_path) { ids.size(), builder.build())); } +void 
write_page_index_filter_pair_parquet_file(const std::string& file_path) { + std::vector ids(128); + std::iota(ids.begin(), ids.end(), 0); + std::vector payloads; + payloads.reserve(ids.size()); + for (const auto id : ids) { + payloads.push_back(id + 1000); + } + auto schema = arrow::schema({ + arrow::field("id", arrow::int32(), false), + arrow::field("payload", arrow::int32(), false), + }); + auto table = arrow::Table::Make(schema, {build_int32_array(ids), build_int32_array(payloads)}); + + auto file_result = arrow::io::FileOutputStream::Open(file_path); + ASSERT_TRUE(file_result.ok()) << file_result.status(); + std::shared_ptr out = *file_result; + + ::parquet::WriterProperties::Builder builder; + builder.version(::parquet::ParquetVersion::PARQUET_2_6); + builder.data_page_version(::parquet::ParquetDataPageVersion::V2); + builder.compression(::parquet::Compression::UNCOMPRESSED); + builder.disable_dictionary(); + builder.enable_write_page_index(); + builder.write_batch_size(8); + builder.data_pagesize(10); + PARQUET_THROW_NOT_OK(::parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), out, + ids.size(), builder.build())); +} + parquet::ParquetColumnSchema primitive_bloom_schema(const DataTypePtr& type) { parquet::ParquetColumnSchema schema; schema.local_id = 0; @@ -536,6 +567,15 @@ Block build_file_block_with_row_position(const std::vector& schema) { + DORIS_CHECK(request != nullptr); + for (size_t idx = 0; idx < schema.size(); ++idx) { + request->local_positions.emplace(reader::LocalColumnId(schema[idx].local_id), + reader::LocalIndex(idx)); + } +} + int64_t parquet_column_start_offset(const ::parquet::ColumnChunkMetaData& column_metadata) { return column_metadata.has_dictionary_page() ? 
static_cast(column_metadata.dictionary_page_offset()) @@ -1312,7 +1352,8 @@ class NewParquetReaderTest : public testing::Test { void TearDown() override { std::filesystem::remove_all(_test_dir); } std::unique_ptr create_reader(int64_t range_start_offset = 0, - int64_t range_size = -1) const { + int64_t range_size = -1, + RuntimeProfile* profile = nullptr) const { auto system_properties = std::make_shared(); system_properties->system_type = TFileType::FILE_LOCAL; auto file_description = std::make_unique(); @@ -1321,7 +1362,7 @@ class NewParquetReaderTest : public testing::Test { file_description->range_start_offset = range_start_offset; file_description->range_size = range_size; return std::make_unique(system_properties, file_description, - nullptr, nullptr); + nullptr, profile); } std::filesystem::path _test_dir; @@ -1418,7 +1459,8 @@ TEST_F(NewParquetReaderTest, ReadMultipleRowGroups) { } TEST_F(NewParquetReaderTest, ReadPredicateAndNonPredicateColumnsWithSelection) { - auto reader = create_reader(); + RuntimeProfile profile("new_parquet_reader_filter_profile"); + auto reader = create_reader(0, -1, &profile); RuntimeState state {TQueryOptions(), TQueryGlobals()}; ASSERT_TRUE(reader->init(&state).ok()); @@ -1454,6 +1496,31 @@ TEST_F(NewParquetReaderTest, ReadPredicateAndNonPredicateColumnsWithSelection) { EXPECT_EQ(values.get_data_at(1).to_string(), "four"); EXPECT_EQ(values.get_data_at(2).to_string(), "five"); + ASSERT_NE(profile.get_counter("FileReaderCreateTime"), nullptr); + ASSERT_NE(profile.get_counter("FileNum"), nullptr); + ASSERT_NE(profile.get_counter("RawRowsRead"), nullptr); + ASSERT_NE(profile.get_counter("SelectedRows"), nullptr); + ASSERT_NE(profile.get_counter("RowsFilteredByConjunct"), nullptr); + ASSERT_NE(profile.get_counter("TotalBatches"), nullptr); + ASSERT_NE(profile.get_counter("EmptySelectionBatches"), nullptr); + ASSERT_NE(profile.get_counter("ReaderReadRows"), nullptr); + ASSERT_NE(profile.get_counter("ReaderSkipRows"), nullptr); + 
ASSERT_NE(profile.get_counter("ReaderSelectRows"), nullptr); + ASSERT_NE(profile.get_counter("ArrowReadRecordsTime"), nullptr); + ASSERT_NE(profile.get_counter("MaterializationTime"), nullptr); + ASSERT_GT(profile.get_counter("FileReaderCreateTime")->value(), 0); + EXPECT_EQ(profile.get_counter("FileNum")->value(), 1); + EXPECT_EQ(profile.get_counter("RawRowsRead")->value(), ROW_COUNT); + EXPECT_EQ(profile.get_counter("SelectedRows")->value(), 3); + EXPECT_EQ(profile.get_counter("RowsFilteredByConjunct")->value(), 2); + EXPECT_EQ(profile.get_counter("TotalBatches")->value(), 1); + EXPECT_EQ(profile.get_counter("EmptySelectionBatches")->value(), 0); + EXPECT_EQ(profile.get_counter("ReaderReadRows")->value(), ROW_COUNT + 3); + EXPECT_EQ(profile.get_counter("ReaderSkipRows")->value(), 2); + EXPECT_EQ(profile.get_counter("ReaderSelectRows")->value(), 3); + EXPECT_GT(profile.get_counter("ArrowReadRecordsTime")->value(), 0); + EXPECT_GT(profile.get_counter("MaterializationTime")->value(), 0); + rows = 0; eof = false; ASSERT_TRUE(reader->get_block(&block, &rows, &eof).ok()); @@ -1496,6 +1563,40 @@ TEST_F(NewParquetReaderTest, ColumnPredicateOnlyPrunesAndDoesNotFilterRowsInside EXPECT_EQ(values.get_data_at(4).to_string(), "five"); } +TEST_F(NewParquetReaderTest, EmptySelectionUpdatesProfileCounters) { + RuntimeProfile profile("new_parquet_reader_empty_selection_profile"); + auto reader = create_reader(0, -1, &profile); + RuntimeState state {TQueryOptions(), TQueryGlobals()}; + ASSERT_TRUE(reader->init(&state).ok()); + + std::vector schema; + ASSERT_TRUE(reader->get_schema(&schema).ok()); + Block block = build_file_block(schema); + + auto request = std::make_unique(); + request->predicate_columns = {field_projection(0)}; + request->non_predicate_columns = {field_projection(1)}; + request->conjuncts.push_back(create_int32_greater_than_conjunct(0, 10)); + ASSERT_TRUE(reader->open(request).ok()); + + size_t rows = 0; + bool eof = false; + ASSERT_TRUE(reader->get_block(&block, 
&rows, &eof).ok()); + EXPECT_TRUE(eof); + EXPECT_EQ(rows, 0); + + ASSERT_NE(profile.get_counter("RawRowsRead"), nullptr); + ASSERT_NE(profile.get_counter("SelectedRows"), nullptr); + ASSERT_NE(profile.get_counter("RowsFilteredByConjunct"), nullptr); + ASSERT_NE(profile.get_counter("TotalBatches"), nullptr); + ASSERT_NE(profile.get_counter("EmptySelectionBatches"), nullptr); + EXPECT_EQ(profile.get_counter("RawRowsRead")->value(), ROW_COUNT); + EXPECT_EQ(profile.get_counter("SelectedRows")->value(), 0); + EXPECT_EQ(profile.get_counter("RowsFilteredByConjunct")->value(), ROW_COUNT); + EXPECT_EQ(profile.get_counter("TotalBatches")->value(), 1); + EXPECT_EQ(profile.get_counter("EmptySelectionBatches")->value(), 1); +} + TEST_F(NewParquetReaderTest, ReadMultiPredicateColumnsBeforeExpressionFilter) { write_int_pair_parquet_file(_file_path); auto reader = create_reader(); @@ -1690,6 +1791,7 @@ TEST_F(NewParquetReaderTest, PredicateFiltersRowGroupsByDictionary) { request->predicate_columns = {field_projection(1)}; request->non_predicate_columns = {field_projection(0)}; request->conjuncts.push_back(create_string_in_conjunct(1, {"lm"})); + use_schema_order_positions(request.get(), schema); reader::FileColumnPredicateFilter column_filter; column_filter.file_column_id = reader::LocalColumnId(1); column_filter.predicates.push_back(create_comparison_predicate( @@ -1836,6 +1938,20 @@ TEST_F(NewParquetReaderTest, PlannerNarrowsRowRangesByPageIndex) { ASSERT_FALSE(plan.row_groups[0].selected_ranges.empty()); EXPECT_GT(plan.row_groups[0].selected_ranges.front().start, 0); EXPECT_LT(plan.row_groups[0].selected_ranges.front().length, 128); + auto skip_plan_it = plan.row_groups[0].page_skip_plans.find(0); + ASSERT_NE(skip_plan_it, plan.row_groups[0].page_skip_plans.end()); + EXPECT_EQ(skip_plan_it->second.leaf_column_id, 0); + EXPECT_GT(skip_plan_it->second.skipped_ranges.size(), 0); + EXPECT_GT(skip_plan_it->second.skipped_pages.size(), 1); + 
ASSERT_EQ(skip_plan_it->second.skipped_pages.size(), + skip_plan_it->second.skipped_page_compressed_sizes.size()); + int64_t skipped_compressed_bytes = 0; + for (size_t page_idx = 0; page_idx < skip_plan_it->second.skipped_pages.size(); ++page_idx) { + if (skip_plan_it->second.should_skip_page(page_idx)) { + skipped_compressed_bytes += skip_plan_it->second.skipped_page_compressed_size(page_idx); + } + } + EXPECT_GT(skipped_compressed_bytes, 0); EXPECT_EQ(plan.pruning_stats.total_row_groups, 1); EXPECT_EQ(plan.pruning_stats.selected_row_groups, 1); EXPECT_EQ(plan.pruning_stats.filtered_row_groups_by_page_index, 0); @@ -1882,6 +1998,20 @@ TEST_F(NewParquetReaderTest, NestedStructPredicateNarrowsRowRangesByPageIndex) { ASSERT_FALSE(plan.row_groups[0].selected_ranges.empty()); EXPECT_GT(plan.row_groups[0].selected_ranges.front().start, 0); EXPECT_LT(plan.row_groups[0].selected_ranges.front().length, 128); + auto skip_plan_it = plan.row_groups[0].page_skip_plans.find(0); + ASSERT_NE(skip_plan_it, plan.row_groups[0].page_skip_plans.end()); + EXPECT_EQ(skip_plan_it->second.leaf_column_id, 0); + EXPECT_GT(skip_plan_it->second.skipped_ranges.size(), 0); + EXPECT_GT(skip_plan_it->second.skipped_pages.size(), 1); + ASSERT_EQ(skip_plan_it->second.skipped_pages.size(), + skip_plan_it->second.skipped_page_compressed_sizes.size()); + int64_t skipped_compressed_bytes = 0; + for (size_t page_idx = 0; page_idx < skip_plan_it->second.skipped_pages.size(); ++page_idx) { + if (skip_plan_it->second.should_skip_page(page_idx)) { + skipped_compressed_bytes += skip_plan_it->second.skipped_page_compressed_size(page_idx); + } + } + EXPECT_GT(skipped_compressed_bytes, 0); EXPECT_EQ(plan.pruning_stats.total_row_groups, 1); EXPECT_EQ(plan.pruning_stats.selected_row_groups, 1); EXPECT_EQ(plan.pruning_stats.filtered_row_groups_by_page_index, 0); @@ -1889,6 +2019,74 @@ TEST_F(NewParquetReaderTest, NestedStructPredicateNarrowsRowRangesByPageIndex) { EXPECT_EQ(plan.pruning_stats.selected_row_ranges, 
plan.row_groups[0].selected_ranges.size()); }

+TEST_F(NewParquetReaderTest, PageIndexFilteredPagesDoNotDoubleSkipOutputColumns) {  // Reads a two-column file with a predicate on column 0, then checks both the returned rows and the skip-related profile counters.
+    write_page_index_filter_pair_parquet_file(_file_path);  // fixture writer is defined outside this view; presumably emits an (id, payload) file with a page index — TODO confirm
+    RuntimeProfile profile("new_parquet_reader_page_skip");
+    auto reader = create_reader(0, -1, &profile);  // profile handed to the reader; every counter asserted below is looked up on this object
+    RuntimeState state {TQueryOptions(), TQueryGlobals()};
+    ASSERT_TRUE(reader->init(&state).ok());
+
+    std::vector schema;  // NOTE(review): template arguments look stripped in this view (also make_unique / assert_cast / static_cast / create_field below) — confirm against the real file
+    ASSERT_TRUE(reader->get_schema(&schema).ok());
+    ASSERT_EQ(schema.size(), 2);  // the rest of the test indexes positions 0 and 1, so exactly two columns are required
+    Block block = build_file_block(schema);
+
+    auto request = std::make_unique();
+    request->predicate_columns = {field_projection(0)};  // column 0 carries the predicate
+    request->non_predicate_columns = {field_projection(1)};  // column 1 is output-only
+    request->conjuncts.push_back(create_int32_greater_than_conjunct(0, 63));  // row-level conjunct on column 0 against 63 (greater-than, per the helper name)
+    reader::FileColumnPredicateFilter column_filter;  // same threshold again, attached as a column predicate filter on file column 0
+    column_filter.file_column_id = reader::LocalColumnId(0);
+    column_filter.predicates.push_back(create_comparison_predicate(
+            0, "id", schema[0].type, Field::create_field(63), false));  // meaning of the trailing `false` is defined by the helper, not visible here
+    request->column_predicate_filters.push_back(std::move(column_filter));
+    ASSERT_TRUE(reader->open(request).ok());
+
+    std::vector ids;  // values collected from block position 0 across all batches
+    std::vector payloads;  // values collected from block position 1, row-aligned with ids
+    bool eof = false;
+    while (!eof) {  // drain the reader; get_block sets eof when finished
+        size_t rows = 0;
+        ASSERT_TRUE(reader->get_block(&block, &rows, &eof).ok());
+        if (rows == 0) {  // empty batch: nothing to collect, loop re-checks eof
+            continue;
+        }
+        const auto& id_column = assert_cast(*block.get_by_position(0).column);
+        const auto& payload_column =
+                assert_cast(*block.get_by_position(1).column);
+        for (size_t row = 0; row < rows; ++row) {  // only the first `rows` entries of each column are consumed
+            ids.push_back(id_column.get_element(row));
+            payloads.push_back(payload_column.get_element(row));
+        }
+    }
+
+    ASSERT_NE(profile.get_counter("PagesSkippedByDataPageFilter"), nullptr);  // ASSERT (not EXPECT) on existence so the ->value() dereferences below cannot hit a null counter
+    ASSERT_NE(profile.get_counter("DataPageFilterSkipBytes"), nullptr);
+    ASSERT_NE(profile.get_counter("RawRowsRead"), nullptr);
+    ASSERT_NE(profile.get_counter("SelectedRows"), nullptr);
+    ASSERT_NE(profile.get_counter("RangeGapSkippedRows"), nullptr);
+    ASSERT_NE(profile.get_counter("ReaderSkipRows"), nullptr);
+    ASSERT_NE(profile.get_counter("RowGroupFilterTime"), nullptr);
+    ASSERT_NE(profile.get_counter("PageIndexFilterTime"), nullptr);
+    ASSERT_NE(profile.get_counter("PageIndexReadTime"), nullptr);
+    EXPECT_GT(profile.get_counter("PagesSkippedByDataPageFilter")->value(), 0);  // at least one data page must have been skipped
+    EXPECT_GT(profile.get_counter("DataPageFilterSkipBytes")->value(), 0);
+    EXPECT_EQ(profile.get_counter("RawRowsRead")->value(), 64);  // exactly the 64 surviving rows were read
+    EXPECT_EQ(profile.get_counter("SelectedRows")->value(), 64);
+    EXPECT_GT(profile.get_counter("RangeGapSkippedRows")->value(), 0);
+    EXPECT_EQ(profile.get_counter("ReaderSkipRows")->value(), 0);  // presumably the "do not double skip" property from the test name: no reader-level skip on top of the page skip — TODO confirm
+    EXPECT_GT(profile.get_counter("RowGroupFilterTime")->value(), 0);  // NOTE(review): strict > 0 on timers assumes each phase records a nonzero duration — confirm this cannot flake
+    EXPECT_GT(profile.get_counter("PageIndexFilterTime")->value(), 0);
+    EXPECT_GT(profile.get_counter("PageIndexReadTime")->value(), 0);
+
+    ASSERT_EQ(ids.size(), 64);
+    ASSERT_EQ(payloads.size(), ids.size());
+    for (size_t row = 0; row < ids.size(); ++row) {
+        EXPECT_EQ(ids[row], static_cast(row + 64));  // surviving ids are 64..127, in order
+        EXPECT_EQ(payloads[row], ids[row] + 1000);  // each payload must still line up with its own id (payload == id + 1000)
+    }
+}
+
 TEST_F(NewParquetReaderTest, InPredicateFiltersRowGroupsByDictionary) {
     write_dictionary_filter_parquet_file(_file_path);
     auto reader = create_reader();
@@ -1901,6 +2099,7 @@ TEST_F(NewParquetReaderTest, InPredicateFiltersRowGroupsByDictionary) {
     request->predicate_columns = {field_projection(1)};
     request->non_predicate_columns = {field_projection(0)};
     request->conjuncts.push_back(create_string_in_conjunct(1, {"az", "za"}));
+    use_schema_order_positions(request.get(), schema);
     auto set = build_set();
     set->insert(const_cast("az"), 2);
     set->insert(const_cast("za"), 2);
@@ -1954,6 +2153,7 @@ TEST_F(NewParquetReaderTest, DictionaryPageV2StringEdgesSurviveSelection) {
     request->predicate_columns = {field_projection(1)};
     request->non_predicate_columns = {field_projection(0)};
     request->conjuncts.push_back(create_string_in_conjunct(1, {"", "same"}));
+    use_schema_order_positions(request.get(), schema);
     auto set = build_set();
     set->insert(const_cast(""),
0); set->insert(const_cast("same"), 4); diff --git a/docs/observability-profile-plan.md b/docs/observability-profile-plan.md deleted file mode 100644 index 0b6fce41702795..00000000000000 --- a/docs/observability-profile-plan.md +++ /dev/null @@ -1,100 +0,0 @@ -# Observability / Profile 补充方案 - -## 当前已有 - -已接入 profile 的指标(`parquet_reader.cpp::_init_profile()`): - -| 类别 | 指标 | -|---|---| -| Row group | `RowGroupsTotalNum`、`RowGroupsReadNum`、`RowGroupsFiltered`、`RowGroupsFilteredByMinMax`、`RowGroupsFilteredByDictionary`、`RowGroupsFilteredByBloomFilter` | -| Row/page | `FilteredRowsByGroup`、`FilteredRowsByPage`、`FilteredRowsByLazyRead`、`SelectedRowRanges` | -| I/O | `PageIndexReadCalls`、`PageIndexReadTime`、`PageIndexParseTime`、`FileFooterReadCalls`、`FileFooterHitCache` | -| Decode | `ColumnReadTime`、`DecompressTime`、`DecompressCount`、`DecodeValueTime`、`DecodeDictTime`、`DecodeLevelTime`、`DecodeNullMapTime`、`PageHeaderDecodeTime`、`PageHeaderReadTime` | -| Cache | `PageReadCount`、`PageCacheHitCount`、`PageCacheMissingCount`、`PageCacheWriteCount`、`PageCacheCompressedHitCount`、`PageCacheDecompressedHitCount`、`PageCacheCompressedWriteCount`、`PageCacheDecompressedWriteCount` | -| Filter | `PredicateFilterTime`、`DictFilterRewriteTime`、`BloomFilterReadTime` | -| Misc | `FileNum`、`ParseMetaTime`、`ParseFooterTime`、`FileReaderCreateTime`、`RawRowsRead`、`FilteredBytes` | - -## 需要补充 - -### 1. 
Scheduler 级别 - -| 指标 | 说明 | 用途 | -|---|---|---| -| `SelectedRows` | 每次 batch selected rows 累计 | 了解 filter 选择性 | -| `SkippedRows` | range gap skip 累计行数 | 评估 page-index pruning 跳过的行数占比 | -| `TotalBatches` | batch 总次数 | 基准 | -| `EmptySelectionBatches` | filter 后 selected_rows==0 的 batch 次数 | 了解 filter 效率 | - -实现位置:`parquet_scan.cpp` 的 `read_current_row_group_batch()` 中,每次执行 filter + select 后更新 counter。 - -```cpp -COUNTER_UPDATE(_profile.selected_rows, selected_rows); -COUNTER_UPDATE(_profile.skipped_rows, batch_rows - selected_rows); -if (selected_rows == 0) COUNTER_UPDATE(_profile.empty_selection_batches, 1); -COUNTER_UPDATE(_profile.total_batches, 1); -``` - -### 2. Column Reader 级别 - -| 指标 | 说明 | 用途 | -|---|---|---| -| `ReaderReadRows` | reader tree read() 累计行数 | reader 实际物化的行数 | -| `ReaderSkipRows` | reader tree skip() 累计行数 | reader 跳过的行数(应与 skipped_rows 对应) | -| `ReaderSelectRows` | reader tree select() 累计 selected rows | select 路径的相对占比 | - -实现位置:`ParquetColumnReader` 基类,在 `read()`/`skip()`/`select()` 中更新。 - -### 3. Nested Assembler 级别 - -| 指标 | 说明 | 用途 | -|---|---|---| -| `NestedOverflowCount` | overflow 发生次数 | 评估 batch size 是否合适(overflow 过多说明 batch 太小) | -| `NestedOverflowTailRows` | overflow tail 累计 rows | 评估 overflow 携带的数据量 | -| `NestedLevelSlotsTotal` | 累计处理的 level slots | 基准 | - -实现位置:`list_column_reader.cpp`、`map_column_reader.cpp` 中,每次 move tail 到 overflow 时更新。 - -### 4. Adapter 级别(细分耗时) - -| 指标 | 说明 | 用途 | -|---|---|---| -| `ArrowReadTime` | `RecordReader::ReadRecords()` 总耗时 | 与 decode/materialization 解耦 | -| `ArrowDecodeTime` | page decode(解压+解码)总耗时 | 评估 page-level skip 的潜在收益 | -| `MaterializationTime` | `append_leaf_values()` 总耗时 | 值写入 Doris column 的开销 | - -当前 `ColumnReadTime` 是一个聚合指标,无法区分 ReadRecords 内部开销。需要在 `read_leaf_records()` 和 `append_leaf_values()` 前后加 SCOPED_TIMER。 - -### 5. 
Page-level Skip(P4 完成后) - -| 指标 | 说明 | 用途 | -|---|---|---| -| `PagesSkippedByFilter` | data_page_filter 跳过的 page 数 | 验证 page-level skip 实际命中率 | -| `PageSkipBytes` | 跳过的 compressed bytes 累计 | page-level skip 节省的 I/O 量 | - -## 实施步骤 - -### Step 1: 添加 Timer/Counter 声明 - -在 `parquet_reader.h` 的 `ParquetProfile` struct 中新增 counter 成员。 - -### Step 2: Scheduler 指标 - -`parquet_scan.cpp` 的 `read_current_row_group_batch()` 中更新。 - -### Step 3: Reader 指标 - -`ParquetColumnReader` 基类中新增 `_reader_read_rows` / `_reader_skip_rows` / `_reader_select_rows`,子类 read/skip/select 中更新。`ParquetReader` 在 close 前聚合所有 reader 的计数。 - -### Step 4: Nested 指标 - -`list_column_reader.cpp`、`map_column_reader.cpp` 的 overflow 路径中更新。 - -### Step 5: Adapter 指标 - -`arrow_leaf_reader_adapter.cpp` 的 `read_leaf_records()` 和 `append_leaf_values()` 中加 SCOPED_TIMER。 - -## 验证 - -1. 运行包含 filter + selection + complex types 的查询,确认所有新 counter 有合理值 -2. 空 selection 场景确认 `EmptySelectionBatches > 0` -3. Page-index pruning 场景确认 `SkippedRows` 与 page-index filter 跳过的行数一致 diff --git a/docs/page-level-skip-plan.md b/docs/page-level-skip-plan.md deleted file mode 100644 index 48a587dc619581..00000000000000 --- a/docs/page-level-skip-plan.md +++ /dev/null @@ -1,111 +0,0 @@ -# Page-level Skip 实现方案 - -## 背景 - -当前 page-index pruning 已能选出需要读的 page range,输出 row ranges。Scan scheduler 通过 `skip(row_count)` 跳过 range gap。但 `skip()` 底层是 Arrow `RecordReader::SkipRecords()`,它仍然解压和解码被跳过的 page(只跳过 value 写入),无法节省 page 级 I/O 和解压开销。 - -旧 reader 的 `VPageReader::skip_page_data()` 仅做 `_offset += compressed_page_size`,可以完全跳过 page 数据。 - -## Arrow 已有 API - -Arrow 的 `PageReader` 提供了 `set_data_page_filter()`,不需要自己实现 decoder: - -```cpp -// thirdparty/installed/include/parquet/column_reader.h:124-151 -class PageReader { - using DataPageFilter = std::function<bool(const DataPageStats&)>; - void set_data_page_filter(DataPageFilter data_page_filter); -}; - -struct DataPageStats { - const EncodedStatistics* encoded_statistics; // page header 中的 min/max/null_count - int32_t num_values; - 
std::optional<int32_t> num_rows; -}; -``` - -- `NextPage()` 在返回每个 page 前调用 callback。 -- callback 返回 `true` → Arrow 内部跳过该 page(纯 offset 前进,零解压)。 -- callback 返回 `false` → 正常读取并解压。 - -## DuckDB 参考 - -DuckDB 的 `PrepareRead()` → `PageIsFilteredOut()` → `trans.Skip(page_hdr.compressed_page_size)` 与 Arrow 的 `set_data_page_filter` 等价——都是先读 page header 再决定是否跳过 data。 - -## 当前 gap - -`get_record_reader()`(`column_reader.cpp:210-241`)调用 `_row_group->RecordReader(leaf_column_id)`。Arrow 的 `RowGroupReader::RecordReader()` 内部创建 `PageReader` 时未设置 filter: - -```cpp -_record_readers[leaf_column_id] = - _row_group->RecordReader(leaf_column_id, /*read_dictionary=*/false); -``` - -这导致 `SkipRecords()` 仍然解压所有 page。 - -## 方案 - -### Step 1: 改用 PageReader + ColumnReader 替代 RecordReader - -在 `get_record_reader()` 中: - -```cpp -// 之前 -_record_readers[leaf_column_id] = - _row_group->RecordReader(leaf_column_id, false); - -// 之后 -auto page_reader = _row_group->GetColumnPageReader(leaf_column_id); -page_reader->set_data_page_filter([&](const DataPageStats& stats) -> bool { - // 用已收集的 page-level 统计信息判断是否跳过 - return should_skip_page(stats); -}); -// 创建 ColumnReader 替代 RecordReader -auto column_reader = ColumnReader::Make(descriptor, std::move(page_reader), pool); -``` - -### Step 2: 实现 filter callback - -callback 接收 page header 中的 `EncodedStatistics`(min/max/null_count),用已有的 `ColumnPredicate` 判断该 page 不可能包含匹配数据时返回 `true`: - -```cpp -bool should_skip_page(const DataPageStats& stats, - const std::vector& predicates) { - if (stats.encoded_statistics == nullptr) return false; // 无统计信息,无法判断 - for (const auto& pred : predicates) { - if (!pred_can_skip(*stats.encoded_statistics, pred)) return false; - } - return true; -} -``` - -`pred_can_skip` 复用已有的 `ParquetStatisticsUtils::CheckStatistics()` 逻辑。 - -### Step 3: 适配 ScalarColumnReader - -`ScalarColumnReader` 当前持有 `RecordReader`,需要改为持有 `ColumnReader`(或一个统一的 adapter interface): - -- `ColumnReader::ReadRecords(batch_rows)` → 等价于 `RecordReader::ReadRecords()` -- 
`ColumnReader::SkipRecords(rows)` → 等价于 `RecordReader::SkipRecords()`,但被跳过的 page 不再解压 -- `ColumnReader::def_levels()` / `ColumnReader::rep_levels()` / `ColumnReader::values()` → 需要确认 `ColumnReader` 是否暴露这些接口。如果接口不同,需要加 adapter - -### Step 4: 保持 read_nested_leaf_batch 接口不变 - -`read_nested_leaf_batch()` 是 adapter 和 nested assembler 之间的桥梁。改成 `ColumnReader` 后,只要 adapter 内部完成适配,上层完全不受影响。 - -## 影响范围 - -| 文件 | 改动 | -|---|---| -| `reader/column_reader.cpp` | `get_record_reader()` 改用 `PageReader` + filter callback | -| `reader/scalar_column_reader.cpp` | `_record_reader` 改为 Arrow `ColumnReader` 或 adapter | -| `reader/arrow_leaf_reader_adapter.cpp` | `read_leaf_records` / `read_nested_leaf_batch` 适配新 API | -| `parquet_statistics.cpp` | 提取 `should_skip_page` 为可复用函数 | - -不需要改动:parquet_scan、list/map/struct reader、nested_column_reader。 - -## 验证 - -1. 构造包含多个 page 的 parquet 文件,其中部分 page 的 min/max 完全落在 filter 范围外 -2. 验证 `SELECT * FROM t WHERE col > 100` 跳过的 page 数量与 page-index pruning 一致 -3. Profile 确认 skipped page 的 compressed size 不产生 I/O