Skip to content
Open
32 changes: 32 additions & 0 deletions be/src/format/new_parquet/parquet_profile.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include "runtime/runtime_profile.h"

namespace doris::parquet {

// Bundle of profile counters handed to the Parquet column readers.
//
// Every pointer defaults to nullptr, meaning "no counter attached". ParquetReader
// fills the fields from its own profile counters when it configures the scan
// scheduler, and ParquetScanScheduler forwards the bundle to the
// ParquetColumnReaderFactory constructor.
//
// The struct is populated with designated initializers, which must appear in
// declaration order, so keep the field order stable when adding counters.
//
// NOTE(review): the update sites live in the column readers, outside this header;
// confirm the exact meaning of each counter there.
struct ParquetColumnReaderProfile {
// Wired by ParquetReader to its "ReaderReadRows" counter (TUnit::UNIT).
RuntimeProfile::Counter* reader_read_rows = nullptr;
// Wired by ParquetReader to its "ReaderSkipRows" counter (TUnit::UNIT).
RuntimeProfile::Counter* reader_skip_rows = nullptr;
// Wired by ParquetReader to its "ReaderSelectRows" counter (TUnit::UNIT).
RuntimeProfile::Counter* reader_select_rows = nullptr;
// Wired by ParquetReader to its "ArrowReadRecordsTime" timer.
RuntimeProfile::Counter* arrow_read_records_time = nullptr;
// Wired by ParquetReader to its "MaterializationTime" timer.
RuntimeProfile::Counter* materialization_time = nullptr;
};

} // namespace doris::parquet
54 changes: 54 additions & 0 deletions be/src/format/new_parquet/parquet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ ParquetReader::~ParquetReader() = default;

Status ParquetReader::init(RuntimeState* state) {
RETURN_IF_ERROR(reader::FileReader::init(state));
if (_profile != nullptr) {
COUNTER_UPDATE(_parquet_profile.file_reader_create_time,
_reader_statistics.file_reader_create_time);
COUNTER_UPDATE(_parquet_profile.open_file_num, _reader_statistics.open_file_num);
}
_state = std::make_unique<ParquetReaderScanState>();
_state->enable_bloom_filter =
state != nullptr && state->query_options().enable_parquet_filter_by_bloom_filter;
Expand Down Expand Up @@ -207,8 +212,33 @@ Status ParquetReader::open(std::unique_ptr<reader::FileScanRequest>& request) {
COUNTER_UPDATE(_parquet_profile.page_index_read_calls, pruning_stats.page_index_read_calls);
COUNTER_UPDATE(_parquet_profile.bloom_filter_read_time,
pruning_stats.bloom_filter_read_time);
COUNTER_UPDATE(_parquet_profile.row_group_filter_time, pruning_stats.row_group_filter_time);
COUNTER_UPDATE(_parquet_profile.page_index_filter_time,
pruning_stats.page_index_filter_time);
COUNTER_UPDATE(_parquet_profile.read_page_index_time, pruning_stats.read_page_index_time);
}
_state->scan_plan = row_group_plan;
_state->scheduler.set_page_skip_profile(
{.skipped_pages = _parquet_profile.pages_skipped_by_data_page_filter,
.skipped_bytes = _parquet_profile.data_page_filter_skip_bytes});
_state->scheduler.set_scan_profile({
.raw_rows_read = _parquet_profile.raw_rows_read,
.selected_rows = _parquet_profile.selected_rows,
.rows_filtered_by_conjunct = _parquet_profile.rows_filtered_by_conjunct,
.total_batches = _parquet_profile.total_batches,
.empty_selection_batches = _parquet_profile.empty_selection_batches,
.range_gap_skipped_rows = _parquet_profile.range_gap_skipped_rows,
.column_read_time = _parquet_profile.column_read_time,
.predicate_filter_time = _parquet_profile.predicate_filter_time,
.column_reader_profile =
{
.reader_read_rows = _parquet_profile.reader_read_rows,
.reader_skip_rows = _parquet_profile.reader_skip_rows,
.reader_select_rows = _parquet_profile.reader_select_rows,
.arrow_read_records_time = _parquet_profile.arrow_read_records_time,
.materialization_time = _parquet_profile.materialization_time,
},
});
_state->scheduler.set_plan(std::move(row_group_plan));
_eof = _state->scheduler.empty();
return Status::OK();
Expand Down Expand Up @@ -334,6 +364,30 @@ void ParquetReader::_init_profile() {
_profile, "FilteredRowsByGroup", TUnit::UNIT, parquet_profile, 1);
_parquet_profile.filtered_page_rows = ADD_CHILD_COUNTER_WITH_LEVEL(
_profile, "FilteredRowsByPage", TUnit::UNIT, parquet_profile, 1);
_parquet_profile.pages_skipped_by_data_page_filter = ADD_CHILD_COUNTER_WITH_LEVEL(
_profile, "PagesSkippedByDataPageFilter", TUnit::UNIT, parquet_profile, 1);
_parquet_profile.data_page_filter_skip_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(
_profile, "DataPageFilterSkipBytes", TUnit::BYTES, parquet_profile, 1);
_parquet_profile.selected_rows = ADD_CHILD_COUNTER_WITH_LEVEL(
_profile, "SelectedRows", TUnit::UNIT, parquet_profile, 1);
_parquet_profile.rows_filtered_by_conjunct = ADD_CHILD_COUNTER_WITH_LEVEL(
_profile, "RowsFilteredByConjunct", TUnit::UNIT, parquet_profile, 1);
_parquet_profile.total_batches = ADD_CHILD_COUNTER_WITH_LEVEL(
_profile, "TotalBatches", TUnit::UNIT, parquet_profile, 1);
_parquet_profile.empty_selection_batches = ADD_CHILD_COUNTER_WITH_LEVEL(
_profile, "EmptySelectionBatches", TUnit::UNIT, parquet_profile, 1);
_parquet_profile.range_gap_skipped_rows = ADD_CHILD_COUNTER_WITH_LEVEL(
_profile, "RangeGapSkippedRows", TUnit::UNIT, parquet_profile, 1);
_parquet_profile.reader_read_rows = ADD_CHILD_COUNTER_WITH_LEVEL(
_profile, "ReaderReadRows", TUnit::UNIT, parquet_profile, 1);
_parquet_profile.reader_skip_rows = ADD_CHILD_COUNTER_WITH_LEVEL(
_profile, "ReaderSkipRows", TUnit::UNIT, parquet_profile, 1);
_parquet_profile.reader_select_rows = ADD_CHILD_COUNTER_WITH_LEVEL(
_profile, "ReaderSelectRows", TUnit::UNIT, parquet_profile, 1);
_parquet_profile.arrow_read_records_time =
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "ArrowReadRecordsTime", parquet_profile, 1);
_parquet_profile.materialization_time =
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "MaterializationTime", parquet_profile, 1);
_parquet_profile.lazy_read_filtered_rows = ADD_CHILD_COUNTER_WITH_LEVEL(
_profile, "FilteredRowsByLazyRead", TUnit::UNIT, parquet_profile, 1);
_parquet_profile.filtered_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(
Expand Down
12 changes: 12 additions & 0 deletions be/src/format/new_parquet/parquet_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,18 @@ class ParquetReader : public reader::FileReader {
RuntimeProfile::Counter* selected_row_ranges = nullptr;
RuntimeProfile::Counter* filtered_group_rows = nullptr;
RuntimeProfile::Counter* filtered_page_rows = nullptr;
RuntimeProfile::Counter* pages_skipped_by_data_page_filter = nullptr;
RuntimeProfile::Counter* data_page_filter_skip_bytes = nullptr;
RuntimeProfile::Counter* selected_rows = nullptr;
RuntimeProfile::Counter* rows_filtered_by_conjunct = nullptr;
RuntimeProfile::Counter* total_batches = nullptr;
RuntimeProfile::Counter* empty_selection_batches = nullptr;
RuntimeProfile::Counter* range_gap_skipped_rows = nullptr;
RuntimeProfile::Counter* reader_read_rows = nullptr;
RuntimeProfile::Counter* reader_skip_rows = nullptr;
RuntimeProfile::Counter* reader_select_rows = nullptr;
RuntimeProfile::Counter* arrow_read_records_time = nullptr;
RuntimeProfile::Counter* materialization_time = nullptr;
RuntimeProfile::Counter* lazy_read_filtered_rows = nullptr;
RuntimeProfile::Counter* filtered_bytes = nullptr;
RuntimeProfile::Counter* raw_rows_read = nullptr;
Expand Down
96 changes: 65 additions & 31 deletions be/src/format/new_parquet/parquet_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ Status plan_parquet_row_groups(const ::parquet::FileMetaData& metadata,
row_group_plan.row_group_rows = row_group_rows;
RETURN_IF_ERROR(select_row_group_ranges_by_page_index(
file_reader, file_schema, request, row_group_idx, row_group_rows,
&row_group_plan.selected_ranges, &plan->pruning_stats));
&row_group_plan.selected_ranges, &row_group_plan.page_skip_plans,
&plan->pruning_stats));
if (row_group_plan.selected_ranges.empty()) {
continue;
}
Expand Down Expand Up @@ -283,8 +284,10 @@ Status ParquetScanScheduler::open_next_row_group(
_current_predicate_columns.clear();
_current_non_predicate_columns.clear();

ParquetColumnReaderFactory column_reader_factory(_current_row_group,
file_context.schema->num_columns());
ParquetColumnReaderFactory column_reader_factory(
_current_row_group, file_context.schema->num_columns(),
&row_group_plan.page_skip_plans, _page_skip_profile,
_scan_profile.column_reader_profile);
for (const auto& col : request.predicate_columns) {
const auto local_id = col.field_id();
if (local_id == ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID) {
Expand Down Expand Up @@ -327,6 +330,9 @@ Status ParquetScanScheduler::skip_current_row_group_rows(int64_t rows) {
if (rows == 0) {
return Status::OK();
}
if (_scan_profile.range_gap_skipped_rows != nullptr) {
COUNTER_UPDATE(_scan_profile.range_gap_skipped_rows, rows);
}
for (const auto& column_reader : _current_predicate_columns | std::views::values) {
RETURN_IF_ERROR(column_reader->skip(rows));
}
Expand Down Expand Up @@ -356,21 +362,37 @@ Status ParquetScanScheduler::read_filter_columns(int64_t batch_rows,
<< " " << type_to_string(column_reader->type()->get_primitive_type()) << " "
<< column_reader->name() << " " << fid << " " << block_position;
int64_t column_rows = 0;
RETURN_IF_ERROR(column_reader->read(batch_rows, column, &column_rows));
{
SCOPED_TIMER(_scan_profile.column_read_time);
RETURN_IF_ERROR(column_reader->read(batch_rows, column, &column_rows));
}
if (column_rows != batch_rows) {
return Status::Corruption("Parquet filter column {} returned {} rows, expected {} rows",
column_reader->name(), column_rows, batch_rows);
}
file_block->replace_by_position(block_position, std::move(column));
}
if (_scan_profile.predicate_filter_time == nullptr) {
return execute_batch_filters(request, batch_rows, file_block, selection, selected_rows);
}
SCOPED_TIMER(_scan_profile.predicate_filter_time);
return execute_batch_filters(request, batch_rows, file_block, selection, selected_rows);
}

Status ParquetScanScheduler::read_current_row_group_batch(int64_t batch_rows,
const reader::FileScanRequest& request,
Block* file_block, size_t* rows) {
if (_scan_profile.total_batches != nullptr) {
COUNTER_UPDATE(_scan_profile.total_batches, 1);
}
if (_scan_profile.raw_rows_read != nullptr) {
COUNTER_UPDATE(_scan_profile.raw_rows_read, batch_rows);
}
if (_current_predicate_columns.empty() && _current_non_predicate_columns.empty()) {
*rows = static_cast<size_t>(batch_rows);
if (_scan_profile.selected_rows != nullptr) {
COUNTER_UPDATE(_scan_profile.selected_rows, batch_rows);
}
return Status::OK();
}
SelectionVector selection;
Expand All @@ -380,6 +402,15 @@ Status ParquetScanScheduler::read_current_row_group_batch(int64_t batch_rows,
read_filter_columns(batch_rows, request, file_block, &selection, &selected_rows));

const bool need_filter_output = selected_rows != batch_rows;
if (_scan_profile.selected_rows != nullptr) {
COUNTER_UPDATE(_scan_profile.selected_rows, selected_rows);
}
if (_scan_profile.rows_filtered_by_conjunct != nullptr) {
COUNTER_UPDATE(_scan_profile.rows_filtered_by_conjunct, batch_rows - selected_rows);
}
if (selected_rows == 0 && _scan_profile.empty_selection_batches != nullptr) {
COUNTER_UPDATE(_scan_profile.empty_selection_batches, 1);
}
if (need_filter_output) {
IColumn::Filter output_filter = selection_to_filter(selection, selected_rows, batch_rows);
for (const auto& col : request.predicate_columns) {
Expand All @@ -392,33 +423,36 @@ Status ParquetScanScheduler::read_current_row_group_batch(int64_t batch_rows,
}
}

for (const auto& [fid, column_reader] : _current_non_predicate_columns) {
auto position_it = request.local_positions.find(reader::LocalColumnId(fid));
DORIS_CHECK(position_it != request.local_positions.end());
const auto block_position = position_it->second.value();
auto column_guard = file_block->mutate_column_scoped(block_position);
auto& col = column_guard.mutable_column();
DCHECK_EQ(file_block->get_by_position(block_position).type->get_primitive_type(),
column_reader->type()->get_primitive_type())
<< type_to_string(
file_block->get_by_position(block_position).type->get_primitive_type())
<< " " << type_to_string(column_reader->type()->get_primitive_type()) << " "
<< column_reader->name() << " " << fid << " " << block_position;
if (need_filter_output) {
[[maybe_unused]] auto old_size = col->size();
RETURN_IF_ERROR(column_reader->select(selection, selected_rows, batch_rows, col));
if (col->size() != old_size + selected_rows) {
return Status::Corruption(
"Parquet selected output column {} returned {} rows, expected {} rows",
column_reader->name(), col->size(), old_size + selected_rows);
}
} else {
int64_t column_rows = 0;
RETURN_IF_ERROR(column_reader->read(batch_rows, col, &column_rows));
if (column_rows != batch_rows) {
return Status::Corruption(
"Parquet output column {} returned {} rows, expected {} rows",
column_reader->name(), column_rows, batch_rows);
{
SCOPED_TIMER(_scan_profile.column_read_time);
for (const auto& [fid, column_reader] : _current_non_predicate_columns) {
auto position_it = request.local_positions.find(reader::LocalColumnId(fid));
DORIS_CHECK(position_it != request.local_positions.end());
const auto block_position = position_it->second.value();
auto column_guard = file_block->mutate_column_scoped(block_position);
auto& col = column_guard.mutable_column();
DCHECK_EQ(file_block->get_by_position(block_position).type->get_primitive_type(),
column_reader->type()->get_primitive_type())
<< type_to_string(file_block->get_by_position(block_position)
.type->get_primitive_type())
<< " " << type_to_string(column_reader->type()->get_primitive_type()) << " "
<< column_reader->name() << " " << fid << " " << block_position;
if (need_filter_output) {
[[maybe_unused]] auto old_size = col->size();
RETURN_IF_ERROR(column_reader->select(selection, selected_rows, batch_rows, col));
if (col->size() != old_size + selected_rows) {
return Status::Corruption(
"Parquet selected output column {} returned {} rows, expected {} rows",
column_reader->name(), col->size(), old_size + selected_rows);
}
} else {
int64_t column_rows = 0;
RETURN_IF_ERROR(column_reader->read(batch_rows, col, &column_rows));
if (column_rows != batch_rows) {
return Status::Corruption(
"Parquet output column {} returned {} rows, expected {} rows",
column_reader->name(), column_rows, batch_rows);
}
}
}
}
Expand Down
22 changes: 22 additions & 0 deletions be/src/format/new_parquet/parquet_scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,18 @@

#include <cstddef>
#include <cstdint>
#include <map>
#include <memory>
#include <vector>

#include "common/status.h"
#include "core/column/column.h"
#include "format/new_parquet/parquet_profile.h"
#include "format/new_parquet/parquet_statistics.h"
#include "format/new_parquet/reader/column_reader.h"
#include "format/new_parquet/selection_vector.h"
#include "format/reader/file_reader.h"
#include "runtime/runtime_profile.h"

namespace parquet {
class FileMetaData;
Expand Down Expand Up @@ -59,13 +62,26 @@ struct RowGroupReadPlan {
int64_t first_file_row = 0;
int64_t row_group_rows = 0;
std::vector<RowRange> selected_ranges;
std::map<int, ParquetPageSkipPlan> page_skip_plans;
};

// Scan plan consumed by ParquetScanScheduler::set_plan(): the per-row-group read
// plans plus the pruning statistics gathered while planning.
struct RowGroupScanPlan {
// One entry per row group to read.
// NOTE(review): plan_parquet_row_groups() `continue`s past row groups whose
// selected_ranges end up empty, so those are presumably never appended here —
// confirm against the full function.
std::vector<RowGroupReadPlan> row_groups;
// Pruning statistics; plan_parquet_row_groups() passes the address of this member
// to select_row_group_ranges_by_page_index() so page-index pruning can record into it.
ParquetPruningStats pruning_stats;
};

// Profile counters used by ParquetScanScheduler while reading batches.
//
// All pointers default to nullptr ("no counter attached"); the scheduler checks the
// row/batch counters for nullptr before updating them. ParquetReader installs the
// struct through ParquetScanScheduler::set_scan_profile() using designated
// initializers, which must appear in declaration order, so keep the field order
// stable when adding counters.
struct ParquetScanProfile {
// Incremented by batch_rows on every read_current_row_group_batch() call.
RuntimeProfile::Counter* raw_rows_read = nullptr;
// Incremented by the rows that survive a batch: batch_rows when there are no
// columns to read, otherwise the selected row count produced by the filter step.
RuntimeProfile::Counter* selected_rows = nullptr;
// Incremented by (batch_rows - selected_rows) after the filter step.
RuntimeProfile::Counter* rows_filtered_by_conjunct = nullptr;
// Incremented by 1 on every read_current_row_group_batch() call.
RuntimeProfile::Counter* total_batches = nullptr;
// Incremented by 1 when the filter step selects zero rows in a batch.
RuntimeProfile::Counter* empty_selection_batches = nullptr;
// Incremented by the (non-zero) row count passed to skip_current_row_group_rows().
RuntimeProfile::Counter* range_gap_skipped_rows = nullptr;
// Timer wrapped around the column reader read()/select() calls for both
// predicate and non-predicate columns.
RuntimeProfile::Counter* column_read_time = nullptr;
// Timer wrapped around execute_batch_filters() in read_filter_columns().
RuntimeProfile::Counter* predicate_filter_time = nullptr;
// Counters forwarded to the ParquetColumnReaderFactory constructor when a row
// group is opened.
ParquetColumnReaderProfile column_reader_profile;
};

Status plan_parquet_row_groups(const ::parquet::FileMetaData& metadata,
::parquet::ParquetFileReader* file_reader,
const std::vector<std::unique_ptr<ParquetColumnSchema>>& file_schema,
Expand All @@ -83,6 +99,10 @@ Status execute_batch_filters(const reader::FileScanRequest& request, int64_t bat
class ParquetScanScheduler {
public:
void set_plan(RowGroupScanPlan plan);
// Stores a copy of the page-skip counters in _page_skip_profile; the scheduler later
// hands that member to the ParquetColumnReaderFactory constructor when it opens a
// row group. ParquetReader calls this before set_plan().
void set_page_skip_profile(ParquetPageSkipProfile page_skip_profile) {
_page_skip_profile = page_skip_profile;
}
// Stores a copy of the scan counters in _scan_profile, which the batch-reading
// methods update. ParquetReader calls this before set_plan().
void set_scan_profile(ParquetScanProfile scan_profile) { _scan_profile = scan_profile; }
void reset();
// Returns true when _row_group_plans holds no entries. ParquetReader assigns this
// result to its _eof flag right after calling set_plan().
bool empty() const { return _row_group_plans.empty(); }

Expand Down Expand Up @@ -118,6 +138,8 @@ class ParquetScanScheduler {
std::vector<RowRange> _current_selected_ranges;
size_t _current_range_idx = 0;
int64_t _current_range_rows_read = 0;
ParquetPageSkipProfile _page_skip_profile;
ParquetScanProfile _scan_profile;
};

} // namespace doris::parquet
Loading
Loading