Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions be/src/format/orc/vorc_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -901,7 +901,7 @@ bool OrcReader::_check_slot_can_push_down(const VExprSPtr& expr) {
return false;
}

if (disable_column_opt(slot_ref->expr_name())) {
if (!has_column_optimization(slot_ref->expr_name(), ColumnOptimizationTypes::MIN_MAX)) {
return false;
}

Expand Down Expand Up @@ -1290,7 +1290,7 @@ void OrcReader::_classify_columns_for_lazy_read(
_lazy_read_ctx.predicate_columns.second.emplace_back(iter->second.second);
_lazy_read_ctx.predicate_orc_columns.emplace_back(
_table_info_node_ptr->children_file_column_name(iter->first));
if (disable_column_opt(read_table_col)) {
if (!has_column_optimization(read_table_col, ColumnOptimizationTypes::LAZY_READ)) {
// Todo : enable lazy mat where filter iceberg row lineage column.
_enable_lazy_mat = false;
}
Expand Down Expand Up @@ -1320,7 +1320,7 @@ void OrcReader::_classify_columns_for_lazy_read(
}
}
_lazy_read_ctx.predicate_missing_columns.emplace(kv.first, kv.second);
if (disable_column_opt(kv.first)) {
if (!has_column_optimization(kv.first, ColumnOptimizationTypes::LAZY_READ)) {
_enable_lazy_mat = false;
}
}
Expand Down Expand Up @@ -2873,7 +2873,8 @@ Status OrcReader::fill_dict_filter_column_names(
int i = 0;
for (const auto& predicate_col_name : predicate_col_names) {
int slot_id = predicate_col_slot_ids[i];
if (!_disable_dict_filter && !disable_column_opt(predicate_col_name) &&
if (!_disable_dict_filter &&
has_column_optimization(predicate_col_name, ColumnOptimizationTypes::DICT_FILTER) &&
_can_filter_by_dict(slot_id)) {
_dict_filter_cols.emplace_back(predicate_col_name, slot_id);
column_names.emplace_back(
Expand Down
6 changes: 4 additions & 2 deletions be/src/format/parquet/vparquet_group_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,10 @@ Status RowGroupReader::init(
const std::string& predicate_col_name = predicate_col_names[i];
int slot_id = predicate_col_slot_ids[i];

if (_table_format_reader->disable_column_opt(predicate_col_name)) {
// row lineage column can not dict filter.
if (!_table_format_reader->has_column_optimization(
predicate_col_name,
TableFormatReader::ColumnOptimizationTypes::DICT_FILTER)) {
// Row-lineage style generated columns cannot participate in dict filtering.
if (_slot_id_to_filter_conjuncts->find(slot_id) !=
_slot_id_to_filter_conjuncts->end()) {
for (auto& ctx : _slot_id_to_filter_conjuncts->at(slot_id)) {
Expand Down
7 changes: 5 additions & 2 deletions be/src/format/parquet/vparquet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,9 @@ void ParquetReader::_collect_predicate_columns_from_conjuncts(
auto and_pred = AndBlockColumnPredicate::create_unique();
for (const auto& entry : _lazy_read_ctx.slot_id_to_predicates) {
for (const auto& pred : entry.second) {
if (disable_column_opt(pred->col_name())) {
// Parquet shares _push_down_predicates for row-group/page min-max pruning and
// bloom-filter evaluation, so this flag currently gates both predicate paths.
if (!has_column_optimization(pred->col_name(), ColumnOptimizationTypes::MIN_MAX)) {
continue;
}
if (!_exists_in_file(pred->col_name()) || !_type_matches(pred->column_id())) {
Expand All @@ -627,7 +629,7 @@ void ParquetReader::_classify_columns_for_lazy_read(
const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) {
const FieldDescriptor& schema = _file_metadata->schema();
auto predicate_columns = predicate_conjuncts_columns;

#ifndef BE_TEST
for (const auto& [col_name, _] : _generated_col_handlers) {
int slot_id = -1;
for (auto slot : _tuple_descriptor->slots()) {
Expand Down Expand Up @@ -661,6 +663,7 @@ void ParquetReader::_classify_columns_for_lazy_read(
// synthesized columns always fill data on first phase.
_lazy_read_ctx.all_predicate_col_ids.emplace_back(column_index);
}
#endif
for (auto& read_table_col : _read_table_columns) {
_lazy_read_ctx.all_read_columns.emplace_back(read_table_col);

Expand Down
48 changes: 40 additions & 8 deletions be/src/format/table/table_format_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,20 +155,49 @@ class TableFormatReader : public GenericReader {

using GeneratedColumnHandler = std::function<Status(Block* block, size_t rows)>;

// disable column lazy read, dict filter and min-max filter.
virtual bool disable_column_opt(std::string col_name) {
// generated columns may have complex expressions that are not compatible with lazy read, dict filter, or min-max filter.
for (auto& [name, handler] : _generated_col_handlers) {
if (name == col_name) {
return true;
}
// Bitmask describing which reader-side optimizations may be applied to a
// given column. Columns default to DEFAULT (all optimizations allowed);
// individual flags are cleared for columns (e.g. generated columns) that
// cannot participate in a particular optimization.
struct ColumnOptimizationTypes {
// Underlying bitmask type; flags below are OR-combined into a Type value.
using Type = int;

static constexpr Type NONE = 0x00;        // no optimizations permitted
static constexpr Type LAZY_READ = 0x01;   // column may be lazily materialized
static constexpr Type DICT_FILTER = 0x02; // column may use dictionary filtering
static constexpr Type MIN_MAX = 0x04;     // column may use min-max (zone-map) pruning
// All optimizations enabled — the implicit state for unregistered columns.
static constexpr Type DEFAULT = LAZY_READ | DICT_FILTER | MIN_MAX;
};

// Records the optimization bitmask for `col_name`.
// DEFAULT is represented by the ABSENCE of a map entry, so passing DEFAULT
// erases any existing override rather than storing one — this keeps the map
// small (only exceptional columns are tracked) and makes DEFAULT the
// implicit state for every column never registered here.
void set_column_optimizations(const std::string& col_name,
ColumnOptimizationTypes::Type optimizations) {
if (optimizations == ColumnOptimizationTypes::DEFAULT) {
_column_optimizations.erase(col_name);
return;
}
_column_optimizations[col_name] = optimizations;
}

// Returns the optimization bitmask recorded for `col_name`, or
// ColumnOptimizationTypes::DEFAULT (all optimizations enabled) when the
// column has no override — absence of an entry is the DEFAULT state
// (see set_column_optimizations()).
//
// Fix: removed a stale `return false;` left over ahead of the DEFAULT
// return. It made the DEFAULT return unreachable and implicitly converted
// `false` to Type 0 (== NONE), which would have disabled every
// optimization for all unregistered columns.
ColumnOptimizationTypes::Type get_column_optimizations(const std::string& col_name) const {
    auto it = _column_optimizations.find(col_name);
    if (it != _column_optimizations.end()) {
        return it->second;
    }
    return ColumnOptimizationTypes::DEFAULT;
}

// True iff EVERY bit in `optimization` is enabled for `col_name`.
// The mask-and-compare form (rather than a simple non-zero test) makes the
// check correct when `optimization` combines multiple flags.
bool has_column_optimization(const std::string& col_name,
ColumnOptimizationTypes::Type optimization) const {
return (get_column_optimizations(col_name) & optimization) == optimization;
}

// Transitional helper kept for compatibility while readers migrate to bitmask checks.
// True only when ALL optimizations are disabled for the column (bitmask == NONE);
// note this is stricter than the old boolean flag it replaces.
bool disable_column_opt(const std::string& col_name) const {
return get_column_optimizations(col_name) == ColumnOptimizationTypes::NONE;
}

// Registers a handler that fills `col_name` with generated (synthesized)
// values, and marks the column as ineligible for every read optimization.
void register_generated_column_handler(const std::string& col_name,
GeneratedColumnHandler handler) {
_generated_col_handlers.emplace_back(col_name, std::move(handler));
// Generated columns may depend on computed values that are not compatible with
// lazy read, dictionary filtering, or min-max filtering.
set_column_optimizations(col_name, ColumnOptimizationTypes::NONE);
}

Status fill_generated_columns(Block* block, size_t rows) {
Expand Down Expand Up @@ -252,6 +281,9 @@ class TableFormatReader : public GenericReader {

// ---- Generated column handlers ----
std::vector<std::pair<std::string, GeneratedColumnHandler>> _generated_col_handlers;

// ---- Column optimization flags ----
std::unordered_map<std::string, ColumnOptimizationTypes::Type> _column_optimizations;
};

#include "common/compile_check_end.h"
Expand Down
11 changes: 5 additions & 6 deletions be/test/format/orc/orc_read_lines.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,11 @@ static void read_orc_line(int64_t line, std::string block_dump,

static_cast<void>(local_fs->open_file(range.path, &file_reader));

std::pair<std::shared_ptr<RowIdColumnIteratorV2>, int> iterator_pair;
iterator_pair =
std::make_pair(std::make_shared<RowIdColumnIteratorV2>(
IdManager::ID_VERSION, BackendOptions::get_backend_id(), 10),
tuple_desc->slots().size());
reader->set_row_id_column_iterator(iterator_pair);
auto iter = std::make_shared<RowIdColumnIteratorV2>(IdManager::ID_VERSION,
BackendOptions::get_backend_id(), 10);
reader->register_synthesized_column_handler("row_id", [&](Block* block, size_t rows) -> Status {
return reader->fill_topn_row_id(iter, "row_id", block, rows);
});

// Construct OrcInitContext for standalone reader (no column_descs).
OrcInitContext orc_ctx;
Expand Down
13 changes: 7 additions & 6 deletions be/test/format/parquet/parquet_read_lines.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,13 @@ static void read_parquet_lines(std::vector<std::string> numeric_types,
}
auto p_reader =
new ParquetReader(nullptr, scan_params, scan_range, 992, &ctz, nullptr, nullptr);
std::pair<std::shared_ptr<RowIdColumnIteratorV2>, int> iterator_pair;
iterator_pair =
std::make_pair(std::make_shared<RowIdColumnIteratorV2>(
IdManager::ID_VERSION, BackendOptions::get_backend_id(), 10),
tuple_desc->slots().size());
p_reader->set_row_id_column_iterator(iterator_pair);

auto iter = std::make_shared<RowIdColumnIteratorV2>(IdManager::ID_VERSION,
BackendOptions::get_backend_id(), 10);
p_reader->register_synthesized_column_handler(
"row_id", [&](Block* block, size_t rows) -> Status {
return p_reader->fill_topn_row_id(iter, "row_id", block, rows);
});
p_reader->set_file_reader(reader);
static_cast<void>(p_reader->read_by_rows(read_lines));

Expand Down
Loading