Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 85 additions & 23 deletions be/src/format/new_parquet/parquet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,35 +323,27 @@ Status ParquetReader::_read_filter_columns(int64_t batch_rows, Block* file_block
Status ParquetReader::_execute_filter_conjuncts(int64_t batch_rows, Block* file_block,
SelectionVector* selection,
uint16_t* selected_rows) {
// Expression filters may reference several predicate columns. Execute them only after all
// Conjuncts may reference several predicate columns. Execute them only after all referenced
// predicate columns in the file-local block have been materialized.
for (const auto& expression_filter : _request->expression_filters) {
if (expression_filter.conjunct == nullptr) {
if (expression_filter.delete_conjunct == nullptr) {
continue;
}
} else {
if (*selected_rows == 0) {
break;
}
IColumn::Filter filter(static_cast<size_t>(batch_rows), 1);
bool can_filter_all = false;
RETURN_IF_ERROR(expression_filter.conjunct->execute_filter(
file_block, filter.data(), static_cast<size_t>(batch_rows), false,
&can_filter_all));
*selected_rows =
can_filter_all ? 0
: _apply_filter_to_selection(filter, selection, *selected_rows);
}
for (const auto& conjunct : _request->conjuncts) {
if (*selected_rows == 0) {
break;
}
if (expression_filter.delete_conjunct == nullptr) {
continue;
IColumn::Filter filter(static_cast<size_t>(batch_rows), 1);
bool can_filter_all = false;
RETURN_IF_ERROR(conjunct->execute_filter(file_block, filter.data(),
static_cast<size_t>(batch_rows), false,
&can_filter_all));
*selected_rows =
can_filter_all ? 0 : _apply_filter_to_selection(filter, selection, *selected_rows);
}
for (const auto& delete_conjunct : _request->delete_conjuncts) {
if (*selected_rows == 0) {
break;
}
int result_column_id = -1;
RETURN_IF_ERROR(expression_filter.delete_conjunct->root()->execute(
expression_filter.delete_conjunct.get(), file_block, &result_column_id));
RETURN_IF_ERROR(delete_conjunct->root()->execute(delete_conjunct.get(), file_block,
&result_column_id));
DORIS_CHECK(result_column_id >= 0 &&
result_column_id < static_cast<int>(file_block->columns()));
const auto& delete_filter = assert_cast<const ColumnUInt8&>(
Expand Down Expand Up @@ -744,6 +736,76 @@ Status ParquetReader::get_block(Block* file_block, size_t* rows, bool* eof) {
}
}

Status ParquetReader::get_aggregate_result(const reader::FileAggregateRequest& request,
reader::FileAggregateResult* result) {
DORIS_CHECK(result != nullptr);
if (_state == nullptr || _state->metadata == nullptr || _state->schema == nullptr) {
return Status::Uninitialized("ParquetReader is not open");
}
result->count = 0;
result->columns.clear();
if (request.agg_type != TPushAggOp::type::COUNT &&
request.agg_type != TPushAggOp::type::MINMAX) {
return Status::NotSupported("Unsupported parquet aggregate pushdown type {}",
request.agg_type);
}

// Aggregate row count in all selected row groups. For MIN/MAX aggregate, this is used to determine whether there is no row group selected.
for (const auto row_group_idx : _state->selected_row_groups) {
auto row_group_metadata = _state->metadata->RowGroup(row_group_idx);
DORIS_CHECK(row_group_metadata != nullptr);
result->count += row_group_metadata->num_rows();
}
if (request.agg_type == TPushAggOp::type::COUNT) {
return Status::OK();
}

result->columns.resize(request.columns.size());
for (size_t request_column_idx = 0; request_column_idx < request.columns.size();
++request_column_idx) {
const auto file_column_id = request.columns[request_column_idx].file_column_id;
if (file_column_id < 0 ||
file_column_id >= static_cast<int32_t>(_state->file_schema.size())) {
return Status::InvalidArgument("Invalid parquet aggregate column id {}",
file_column_id);
}
const auto& column_schema = _state->file_schema[file_column_id];
DORIS_CHECK(column_schema != nullptr);
// TODO: Support min/max pushdown for complex column by traversing down to the leaf column readers. This requires supporting complex column statistics in parquet file reader, which is currently not implemented in parquet-cpp.
if (column_schema->leaf_column_id < 0) {
return Status::NotSupported(
"Parquet aggregate pushdown only supports primitive column {}",
column_schema->name);
}

auto& aggregate_column = result->columns[request_column_idx];
for (const auto row_group_idx : _state->selected_row_groups) {
auto row_group_metadata = _state->metadata->RowGroup(row_group_idx);
DORIS_CHECK(row_group_metadata != nullptr);
auto column_chunk = row_group_metadata->ColumnChunk(column_schema->leaf_column_id);
DORIS_CHECK(column_chunk != nullptr);
const auto statistics = ParquetStatisticsUtils::TransformColumnStatistics(
*column_schema, column_chunk->statistics());
if (!statistics.has_min_max) {
return Status::NotSupported("Missing parquet min/max statistics for column {}",
column_schema->name);
}
if (!aggregate_column.has_min || statistics.min_value < aggregate_column.min_value) {
aggregate_column.min_value = statistics.min_value;
aggregate_column.has_min = true;
}
if (!aggregate_column.has_max || aggregate_column.max_value < statistics.max_value) {
aggregate_column.max_value = statistics.max_value;
aggregate_column.has_max = true;
}
}
if (!aggregate_column.has_min || !aggregate_column.has_max) {
return Status::NotSupported("No parquet row group selected for min/max pushdown");
}
}
return Status::OK();
}

Status ParquetReader::close() {
if (_state != nullptr) {
if (_state->file_reader != nullptr) {
Expand Down
3 changes: 3 additions & 0 deletions be/src/format/new_parquet/parquet_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ class ParquetReader : public reader::FileReader {
// 返回列必须保持 file-local 语义,不能在这里补 default/generated/partition 列。
Status get_block(Block* file_block, size_t* rows, bool* eof) override;

Status get_aggregate_result(const reader::FileAggregateRequest& request,
reader::FileAggregateResult* result) override;

Status close() override;

protected:
Expand Down
66 changes: 49 additions & 17 deletions be/src/format/reader/column_mapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@ struct FileSlotRewriteInfo {
std::string file_column_name;
};

static VExprSPtr create_file_slot_ref(const VSlotRef& slot_ref,
const FileSlotRewriteInfo& rewrite_info) {
return TableSlotRef::create_shared(slot_ref.slot_id(),
cast_set<int>(rewrite_info.block_position), -1,
rewrite_info.file_type, rewrite_info.file_column_name);
}

static VExprSPtr rewrite_table_expr_to_file_expr(
const VExprSPtr& expr,
const std::map<int32_t, FileSlotRewriteInfo>& table_column_to_file_slot) {
Expand All @@ -54,9 +61,7 @@ static VExprSPtr rewrite_table_expr_to_file_expr(
const auto rewrite_it = table_column_to_file_slot.find(slot_ref->slot_id());
if (rewrite_it != table_column_to_file_slot.end()) {
const auto& rewrite_info = rewrite_it->second;
auto file_slot = TableSlotRef::create_shared(
slot_ref->slot_id(), cast_set<int>(rewrite_info.block_position), -1,
rewrite_info.file_type, rewrite_info.file_column_name);
auto file_slot = create_file_slot_ref(*slot_ref, rewrite_info);
if (rewrite_info.file_type->equals(*rewrite_info.table_type)) {
return file_slot;
}
Expand All @@ -66,6 +71,27 @@ static VExprSPtr rewrite_table_expr_to_file_expr(
}
return expr;
}
// rewrite_table_expr_to_file_expr localizes the expression tree in-place because VExpr does
// not provide a generic deep-clone API. A previous split may already have inserted Cast(slot)
// for the same table-level conjunct. Keep that rewrite idempotent: rewrite the cast child
// from table slot to the current split's file slot, and drop the cast when the current split
// no longer needs it.
if (dynamic_cast<const Cast*>(expr.get()) != nullptr && expr->get_num_children() == 1) {
const auto& child = expr->children()[0];
if (child->is_slot_ref()) {
const auto* slot_ref = assert_cast<const VSlotRef*>(child.get());
const auto rewrite_it = table_column_to_file_slot.find(slot_ref->slot_id());
if (rewrite_it != table_column_to_file_slot.end() &&
expr->data_type()->equals(*rewrite_it->second.table_type)) {
auto rewritten_child = create_file_slot_ref(*slot_ref, rewrite_it->second);
if (rewrite_it->second.file_type->equals(*rewrite_it->second.table_type)) {
return rewritten_child;
}
expr->set_children({std::move(rewritten_child)});
return expr;
}
}
}

// VExpr currently does not provide a generic deep-clone API for arbitrary expression types.
// Keep all slot-localization mutation inside ColumnMapper and rebuild it for every split
Expand All @@ -85,13 +111,28 @@ static constexpr const char* ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER = "_last_update

static void add_scan_column(FileScanRequest* file_request, ColumnId file_column_id,
std::vector<ColumnId>* scan_columns) {
if (scan_columns == &file_request->non_predicate_columns &&
std::find(file_request->predicate_columns.begin(), file_request->predicate_columns.end(),
file_column_id) != file_request->predicate_columns.end()) {
return;
}
// column_positions is the global read-column index for this scan request, so it also
// deduplicates predicate_columns and non_predicate_columns across all filter/projection paths.
if (file_request->column_positions.count(file_column_id) == 0) {
const bool newly_added = file_request->column_positions.count(file_column_id) == 0;
if (newly_added) {
file_request->column_positions.emplace(file_column_id,
file_request->column_positions.size());
}
if (std::find(scan_columns->begin(), scan_columns->end(), file_column_id) ==
scan_columns->end()) {
scan_columns->push_back(file_column_id);
}
if (scan_columns == &file_request->predicate_columns) {
file_request->non_predicate_columns.erase(
std::remove(file_request->non_predicate_columns.begin(),
file_request->non_predicate_columns.end(), file_column_id),
file_request->non_predicate_columns.end());
}
}

static void rebuild_projection(ColumnMapping* mapping, size_t block_position) {
Expand Down Expand Up @@ -293,7 +334,8 @@ Status TableColumnMapper::create_scan_request(const std::vector<TableFilter>& ta
file_request->non_predicate_columns.clear();
file_request->column_positions.clear();
file_request->complex_projections.clear();
file_request->expression_filters.clear();
file_request->conjuncts.clear();
file_request->delete_conjuncts.clear();
file_request->column_predicate_filters.clear();
file_request->reader_expression_map.clear();
// 1. Build referenced non-predicate columns
Expand Down Expand Up @@ -379,19 +421,9 @@ Status TableColumnMapper::localize_filters(const std::vector<TableFilter>& table
continue;
}
if (table_filter.conjunct != nullptr) {
FileExpressionFilter expression_filter;
expression_filter.conjunct =
file_request->conjuncts.push_back(
VExprContext::create_shared(rewrite_table_expr_to_file_expr(
table_filter.conjunct->root(), table_column_to_file_slot));
expression_filter.file_column_ids.reserve(table_filter.slot_ids.size());
for (const auto table_column_id : table_filter.slot_ids) {
const auto* mapping = _find_mapping(table_column_id);
if (mapping == nullptr || !mapping->file_column_id.has_value()) {
continue;
}
expression_filter.file_column_ids.push_back(*mapping->file_column_id);
}
file_request->expression_filters.push_back(std::move(expression_filter));
table_filter.conjunct->root(), table_column_to_file_slot)));
}
}
for (const auto& [table_column_id, predicates] : table_column_predicates) {
Expand Down
6 changes: 4 additions & 2 deletions be/src/format/reader/column_mapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ class TableColumnMapper {

// 把 table-level scan 请求转换成 file-local scan 请求。
// table_request 使用 table/global schema;file_request 只包含 FileReader 能理解的
// projected_file_columns、expression_filters、column_predicate_filters 和
// projected_file_columns、conjuncts、delete_conjuncts、column_predicate_filters 和
// reader_expression_map。
virtual Status create_scan_request(const std::vector<TableFilter>& table_filters,
const TableColumnPredicates& table_column_predicates,
Expand Down Expand Up @@ -149,7 +149,9 @@ class TableColumnMapper {
}

bool _is_same_type(const DataTypePtr& table_type, const DataTypePtr& file_type) const {
return table_type == file_type;
DORIS_CHECK(table_type != nullptr);
DORIS_CHECK(file_type != nullptr);
return table_type->equals(*file_type);
}

TableColumnMapperOptions _options;
Expand Down
43 changes: 33 additions & 10 deletions be/src/format/reader/file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@

#include "common/status.h"
#include "core/data_type/data_type.h"
#include "core/field.h"
#include "exprs/vexpr_fwd.h"
#include "gen_cpp/PlanNodes_types.h"
#include "io/file_factory.h"
#include "io/fs/file_reader_writer_fwd.h"

Expand Down Expand Up @@ -75,15 +77,6 @@ struct FieldProjection {
std::vector<FieldProjection> children;
};

// File-local expression filter. It may reference multiple predicate_columns, so FileReader should
// evaluate it after all referenced predicate columns have been materialized in the file-local block.
struct FileExpressionFilter {
VExprContextSPtr conjunct;
// DeletePredicate
VExprContextSPtr delete_conjunct;
std::vector<ColumnId> file_column_ids;
};

// File-local single-column predicates for file-layer pruning, such as min/max, page index,
// dictionary and bloom filter. Predicates must all belong to file_column_id.
struct FileColumnPredicateFilter {
Expand All @@ -108,12 +101,37 @@ struct FileScanRequest {
std::vector<ColumnId> non_predicate_columns;
std::map<ColumnId, size_t> column_positions; // file_column_id -> file-local block position
std::map<ColumnId, FieldProjection> complex_projections;
std::vector<FileExpressionFilter> expression_filters;
// Complex conjuncts converted to file-local predicates from table-level predicates.
VExprContextSPtrs conjuncts;
// Delete predicates converted to file-local predicates.
VExprContextSPtrs delete_conjuncts;
// Only simple predicates that can be directly evaluated on column, such as `a` > 1. Now we use it for zone-map filtering.
std::vector<FileColumnPredicateFilter> column_predicate_filters;
// fallback path if filters cannot be localized to file-local predicates. The expression can reference projected_file_columns and partition columns.
std::vector<std::pair<ColumnId, VExprContextSPtr>> reader_expression_map;
};

struct FileAggregateRequest {
struct Column {
ColumnId file_column_id = -1;
};

TPushAggOp::type agg_type = TPushAggOp::type::NONE;
std::vector<Column> columns;
};

struct FileAggregateResult {
struct Column {
bool has_min = false;
bool has_max = false;
Field min_value;
Field max_value;
};

int64_t count = 0;
std::vector<Column> columns;
};

// 文件物理读取层通用接口。
// 该接口只描述 file-local schema、file-local scan request 和 file-local block。
// TableReader/IcebergTableReader 可以通过它组合不同文件格式 reader。
Expand Down Expand Up @@ -188,6 +206,11 @@ class FileReader {
return Status::OK();
}

virtual Status get_aggregate_result(const FileAggregateRequest& request,
FileAggregateResult* result) {
return Status::NotSupported("FileReader does not support aggregate pushdown");
}

// 关闭当前物理文件 reader 并释放文件层状态。
// 该方法不处理 table-level delete/finalize 状态,后者由 TableReader 子类管理。
virtual Status close() {
Expand Down
22 changes: 9 additions & 13 deletions be/src/format/reader/table_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ Status TableReader::init(TableReadOptions options) {
_io_ctx = options.io_ctx;
_runtime_state = options.runtime_state;
_scanner_profile = options.scanner_profile;
_push_down_agg_type = options.push_down_agg_type;
_projected_columns = std::move(options.projected_columns);
_system_properties = create_system_properties(_scan_params);
_profile = std::move(options.profile);
Expand All @@ -173,19 +174,13 @@ Status TableReader::_build_table_filters_from_conjuncts() {

Status TableReader::_open_local_filter_exprs(const FileScanRequest& file_request) {
RowDescriptor row_desc;
for (const auto& expression_filter : file_request.expression_filters) {
if (expression_filter.conjunct == nullptr) {
if (expression_filter.delete_conjunct == nullptr) {
continue;
}
} else {
RETURN_IF_ERROR(expression_filter.conjunct->prepare(_runtime_state, row_desc));
RETURN_IF_ERROR(expression_filter.conjunct->open(_runtime_state));
}
if (expression_filter.delete_conjunct != nullptr) {
RETURN_IF_ERROR(expression_filter.delete_conjunct->prepare(_runtime_state, row_desc));
RETURN_IF_ERROR(expression_filter.delete_conjunct->open(_runtime_state));
}
for (const auto& conjunct : file_request.conjuncts) {
RETURN_IF_ERROR(conjunct->prepare(_runtime_state, row_desc));
RETURN_IF_ERROR(conjunct->open(_runtime_state));
}
for (const auto& delete_conjunct : file_request.delete_conjuncts) {
RETURN_IF_ERROR(delete_conjunct->prepare(_runtime_state, row_desc));
RETURN_IF_ERROR(delete_conjunct->open(_runtime_state));
}
return Status::OK();
}
Expand Down Expand Up @@ -235,6 +230,7 @@ Status TableReader::prepare_split(const SplitReadOptions& options) {
_current_task = std::make_unique<ScanTask>();
_current_task->data_file = create_file_description(options.current_range);
_delete_rows = nullptr;
_aggregate_pushdown_tried = false;
return _parse_delete_predicates(options);
}

Expand Down
Loading
Loading