Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion be/src/exprs/vslot_ref.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ class VSlotRef : public VExpr {

protected:
VSlotRef(int slot_id, int column_id, int column_uniq_id)
: _slot_id(slot_id), _column_id(column_id), _column_uniq_id(column_uniq_id) {}
: _slot_id(slot_id), _column_id(column_id), _column_uniq_id(column_uniq_id) {
set_node_type(TExprNodeType::SLOT_REF);
}

private:
int _slot_id;
Expand Down
74 changes: 67 additions & 7 deletions be/src/format/reader/column_mapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,43 @@
#include <vector>

#include "common/status.h"
#include "core/assert_cast.h"
#include "format/reader/expr/cast.h"
#include "format/reader/expr/slot_ref.h"
#include "format/reader/file_reader.h"
#include "format/reader/table_reader.h"

namespace doris::reader {

// Rewrites slot references in `expr` so they address columns by their position in the
// file-local block.
//
// - A null `expr` yields nullptr.
// - A slot reference whose slot id has an entry in `table_column_to_file_position` is replaced
//   by a freshly created TableSlotRef carrying the same slot id, data type and expression name,
//   with the mapped position as its column id. A slot reference without an entry is returned
//   unchanged.
// - Any other node has its children rewritten recursively and installed on the node itself
//   via set_children(); the same node is then returned.
static VExprSPtr rewrite_table_expr_to_file_expr(
        const VExprSPtr& expr, const std::map<int32_t, size_t>& table_column_to_file_position) {
    if (!expr) {
        return nullptr;
    }

    if (!expr->is_slot_ref()) {
        // VExpr currently does not provide a generic deep-clone API for arbitrary expression
        // types, so the node is updated in place. Keep all slot-localization mutation inside
        // ColumnMapper and rebuild it for every split before the localized expression is
        // prepared/opened by TableReader.
        VExprSPtrs localized_children;
        localized_children.reserve(expr->children().size());
        for (const auto& child : expr->children()) {
            localized_children.emplace_back(
                    rewrite_table_expr_to_file_expr(child, table_column_to_file_position));
        }
        expr->set_children(std::move(localized_children));
        return expr;
    }

    const auto* table_slot = assert_cast<const VSlotRef*>(expr.get());
    const auto found = table_column_to_file_position.find(table_slot->slot_id());
    if (found == table_column_to_file_position.end()) {
        // No file position is known for this slot; leave the reference untouched.
        return expr;
    }
    return TableSlotRef::create_shared(table_slot->slot_id(), cast_set<int>(found->second), -1,
                                       table_slot->data_type(), table_slot->expr_name());
}

static constexpr const char* ROW_LINEAGE_ROW_ID = "_row_id";
static constexpr const char* ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER = "_last_updated_sequence_number";

Expand Down Expand Up @@ -56,6 +86,21 @@ static void rebuild_projection(ColumnMapping* mapping, size_t block_position) {
mapping->projection = VExprContext::create_shared(expr);
}

// Builds a lookup from table column id to the column's position in the file-local block.
//
// Only mappings that carry a file column id, and whose file column id is present in
// `file_request.column_positions`, contribute an entry. Because entries are added with
// std::map::emplace, the first mapping seen for a given table column id is the one kept.
static std::map<int32_t, size_t> build_file_position_map(
        const std::vector<ColumnMapping>& mappings, const FileScanRequest& file_request) {
    std::map<int32_t, size_t> positions_by_table_column;
    const auto& file_positions = file_request.column_positions;
    for (const auto& mapping : mappings) {
        if (mapping.file_column_id.has_value()) {
            const auto found = file_positions.find(*mapping.file_column_id);
            if (found != file_positions.end()) {
                positions_by_table_column.emplace(mapping.table_column_id, found->second);
            }
        }
    }
    return positions_by_table_column;
}

Status TableColumnMapper::create_mapping(const std::vector<TableColumn>& projected_columns,
const std::map<std::string, Field>& partition_values,
const std::vector<SchemaField>& file_schema) {
Expand Down Expand Up @@ -102,7 +147,8 @@ Status TableColumnMapper::create_mapping(const std::vector<TableColumn>& project
Status TableColumnMapper::create_scan_request(const std::map<int32_t, TableFilter>& table_filters,
const std::vector<TableColumn>& projected_columns,
FileScanRequest* file_request) {
// 真实实现会把 table projection/filter 转换成 file-local projection/filter。
// FileReader evaluates expressions against a file-local block. This mapper owns the
// table-column to file-column conversion, so it also owns the file-local block positions.
file_request->predicate_columns.clear();
file_request->non_predicate_columns.clear();
file_request->column_positions.clear();
Expand Down Expand Up @@ -141,15 +187,29 @@ Status TableColumnMapper::localize_filters(const std::map<int32_t, TableFilter>&
if (!it.second.can_be_localized()) {
// TODO: Rewrite table filter to reader_expression_map
// file_request->reader_expression_map.emplace_back(mapping->table_column_id, it.second.conjunct);
} else {
FileLocalFilter local_filter;
local_filter.file_column_id = *mapping->file_column_id;
local_filter.conjunct = it.second.conjunct;
local_filter.predicates = it.second.predicates;
file_request->local_filters.push_back(std::move(local_filter));
continue;
}
add_scan_column(file_request, *mapping->file_column_id, &file_request->predicate_columns);
}

// Build the complete table-slot to file-block position map after all predicate columns have
// been assigned. This keeps expression localization independent from filter iteration order.
const auto table_column_to_file_position = build_file_position_map(_mappings, *file_request);
for (const auto& it : table_filters) {
const auto* mapping = _find_mapping(it.first);
if (mapping == nullptr || !mapping->file_column_id.has_value() ||
!it.second.can_be_localized()) {
continue;
}
FileLocalFilter local_filter;
local_filter.file_column_id = *mapping->file_column_id;
if (it.second.conjunct != nullptr) {
local_filter.conjunct = VExprContext::create_shared(rewrite_table_expr_to_file_expr(
it.second.conjunct->root(), table_column_to_file_position));
}
local_filter.predicates = it.second.predicates;
file_request->local_filters.push_back(std::move(local_filter));
}
return Status::OK();
}

Expand Down
61 changes: 59 additions & 2 deletions be/src/format/reader/table_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,54 @@
#include <gen_cpp/PlanNodes_types.h>
#include <gen_cpp/Types_types.h>

#include <set>
#include <vector>

#include "common/status.h"
#include "core/assert_cast.h"
#include "exprs/vslot_ref.h"
#include "format/new_parquet/parquet_reader.h"
#include "format/reader/column_mapper.h"
#include "format/table/deletion_vector_reader.h"
#include "io/io_common.h"

namespace doris::reader {
namespace {

// Walks the expression tree rooted at `expr` and inserts the slot id of every slot-reference
// node into `slot_ids`. A null `expr` contributes nothing. Children are visited for every
// node, including slot references.
void collect_table_slot_ids(const VExprSPtr& expr, std::set<int>* slot_ids) {
    if (!expr) {
        return;
    }

    if (expr->is_slot_ref()) {
        slot_ids->insert(assert_cast<const VSlotRef*>(expr.get())->slot_id());
    }

    const auto& children = expr->children();
    for (const auto& child : children) {
        collect_table_slot_ids(child, slot_ids);
    }
}

// Splits `conjunct` into per-slot table filters stored in `table_filters`, keyed by slot id.
//
// - A null conjunct is ignored.
// - A conjunct that references exactly one slot is wrapped in a new VExprContext and stored
//   as that slot's filter conjunct.
// - Otherwise, if the conjunct is a COMPOUND_AND predicate, each child is processed
//   recursively.
// - Any other conjunct (no slot, or several slots without being a COMPOUND_AND) produces no
//   filter.
//
// NOTE(review): the per-slot entry is assigned, not merged. If two separate single-slot
// sub-conjuncts reference the same slot (e.g. reached through different AND branches), the
// later one replaces the earlier one in `table_filters` — confirm this is intended.
void build_table_filters_from_conjunct(const VExprSPtr& conjunct,
                                       std::map<int32_t, TableFilter>* table_filters) {
    if (!conjunct) {
        return;
    }

    std::set<int> referenced_slots;
    collect_table_slot_ids(conjunct, &referenced_slots);
    if (referenced_slots.size() == 1) {
        const int slot_id = *referenced_slots.begin();
        (*table_filters)[slot_id].conjunct = VExprContext::create_shared(conjunct);
        return;
    }

    const bool is_and_predicate = conjunct->node_type() == TExprNodeType::COMPOUND_PRED &&
                                  conjunct->op() == TExprOpcode::COMPOUND_AND;
    if (!is_and_predicate) {
        return;
    }
    for (const auto& child : conjunct->children()) {
        build_table_filters_from_conjunct(child, table_filters);
    }
}

} // namespace

std::shared_ptr<io::FileSystemProperties> create_system_properties(
const TFileScanRangeParams* scan_params) {
Expand Down Expand Up @@ -58,9 +97,27 @@ Status TableReader::init(TableReadOptions options) {
_profile = std::move(options.profile);
TableColumnMapperOptions mapper_options;
mapper_options.mode = TableColumnMappingMode::BY_FIELD_ID;
mapper_options.allow_missing_columns = options.allow_missing_columns;
_data_reader.column_mapper = TableColumnMapper(mapper_options);
// TODO:
// _table_filters = build_table_filters_from_conjuncts(options.conjuncts);
_conjuncts = std::move(options.conjuncts);
return Status::OK();
}

// Rebuilds `_table_filters` from scratch using the root expression of `_conjuncts`.
// Any previously stored filters are discarded first. Always returns Status::OK().
Status TableReader::_build_table_filters_from_conjuncts() {
    _table_filters.clear();
    const auto& conjunct_root = _conjuncts.root();
    build_table_filters_from_conjunct(conjunct_root, &_table_filters);
    return Status::OK();
}

// Prepares and then opens the conjunct of every file-local filter in `file_request`.
// Filters without a conjunct are skipped. Each step goes through RETURN_IF_ERROR, so the
// loop ends at the first step that does not succeed; otherwise Status::OK() is returned.
Status TableReader::_open_local_filter_exprs(const FileScanRequest& file_request) {
    // The conjuncts are prepared against a default-constructed row descriptor.
    RowDescriptor default_row_desc;
    for (const auto& filter : file_request.local_filters) {
        const auto& conjunct = filter.conjunct;
        if (conjunct != nullptr) {
            RETURN_IF_ERROR(conjunct->prepare(_runtime_state, default_row_desc));
            RETURN_IF_ERROR(conjunct->open(_runtime_state));
        }
    }
    return Status::OK();
}

Expand Down
8 changes: 8 additions & 0 deletions be/src/format/reader/table_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ struct TableReadOptions {
std::shared_ptr<io::IOContext> io_ctx;
RuntimeState* runtime_state;
RuntimeProfile* scanner_profile;
const bool allow_missing_columns = true;

std::unique_ptr<ReadProfile> profile;
};
Expand Down Expand Up @@ -219,10 +220,12 @@ class TableReader {
RETURN_IF_ERROR(_data_reader.column_mapper.create_mapping(_projected_columns,
_partition_values, file_schema));
DORIS_CHECK(_data_reader.column_mapper.mappings().size() == _projected_columns.size());
RETURN_IF_ERROR(_build_table_filters_from_conjuncts());

auto file_request = std::make_unique<FileScanRequest>();
RETURN_IF_ERROR(_data_reader.column_mapper.create_scan_request(
_table_filters, _projected_columns, file_request.get()));
RETURN_IF_ERROR(_open_local_filter_exprs(*file_request));
_data_reader.scan_schema.clear();
_data_reader.block_template.clear();
_data_reader.scan_schema.resize(file_request->column_positions.size());
Expand All @@ -242,12 +245,16 @@ class TableReader {
return Status::OK();
}

Status _build_table_filters_from_conjuncts();
Status _open_local_filter_exprs(const FileScanRequest& file_request);

// Close the current concrete reader.
// This hook is called by create_next_reader and close; implementations should stay idempotent.
virtual Status close_current_reader() {
RETURN_IF_ERROR(_data_reader.reader->close());
_data_reader.reader.reset();
_data_reader.column_mapper.clear();
_table_filters.clear();
_data_reader.block_schema.clear();
_data_reader.scan_schema.clear();
_data_reader.block_template.clear();
Expand Down Expand Up @@ -314,6 +321,7 @@ class TableReader {
// partition key -> value
std::map<std::string, Field> _partition_values;
std::map<int32_t, TableFilter> _table_filters;
VExprContext _conjuncts {nullptr};
std::unique_ptr<ReadProfile> _profile;
// Parsed from DELETION_VECTOR in Iceberg and Paimon
DeleteRows* _delete_rows;
Expand Down
Loading
Loading