Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 60 additions & 51 deletions be/src/exec/scan/file_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include <unordered_map>
#include <utility>

#include "common/cast_set.h"
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
#include "common/consts.h"
Expand Down Expand Up @@ -76,6 +77,7 @@
#include "format/table/paimon_jni_reader.h"
#include "format/table/paimon_predicate_converter.h"
#include "format/table/paimon_reader.h"
#include "format/table/partition_column_filler.h"
#include "format/table/remote_doris_reader.h"
#include "format/table/transactional_hive_reader.h"
#include "format/table/trino_connector_jni_reader.h"
Expand Down Expand Up @@ -342,6 +344,11 @@ bool FileScanner::_check_partition_prune_expr(const VExprSPtr& expr) {
});
}

bool FileScanner::_contains_runtime_filter(const VExprContextSPtrs& conjuncts) const {
return std::ranges::any_of(
conjuncts, [](const auto& conjunct) { return conjunct->root()->is_rf_wrapper(); });
}

void FileScanner::_init_runtime_filter_partition_prune_ctxs() {
_runtime_filter_partition_prune_ctxs.clear();
for (auto& conjunct : _conjuncts) {
Expand Down Expand Up @@ -375,33 +382,12 @@ Status FileScanner::_process_runtime_filters_partition_prune(bool& can_filter_al
for (auto const& partition_col_desc : _partition_col_descs) {
const auto& [partition_value, partition_slot_desc] = partition_col_desc.second;
auto data_type = partition_slot_desc->get_data_type_ptr();
auto test_serde = data_type->get_serde();
auto partition_value_column = data_type->create_column();
auto* col_ptr = static_cast<IColumn*>(partition_value_column.get());
Slice slice(partition_value.data(), partition_value.size());
uint64_t num_deserialized = 0;
DataTypeSerDe::FormatOptions options {};
if (_partition_value_is_null.contains(partition_slot_desc->col_name())) {
// for iceberg/paimon table
// NOTICE: column is always be nullable for iceberg/paimon table now
DCHECK(data_type->is_nullable());
test_serde = test_serde->get_nested_serdes()[0];
auto* null_column = assert_cast<ColumnNullable*>(col_ptr);
if (_partition_value_is_null[partition_slot_desc->col_name()]) {
null_column->insert_many_defaults(partition_value_column_size);
} else {
// If the partition value is not null, we set null map to 0 and deserialize it normally.
null_column->get_null_map_column().insert_many_vals(0, partition_value_column_size);
RETURN_IF_ERROR(test_serde->deserialize_column_from_fixed_json(
null_column->get_nested_column(), slice, partition_value_column_size,
&num_deserialized, options));
}
} else {
// for hive/hudi table, the null value is set as "\\N"
// TODO: this will be unified as iceberg/paimon table in the future
RETURN_IF_ERROR(test_serde->deserialize_column_from_fixed_json(
*col_ptr, slice, partition_value_column_size, &num_deserialized, options));
}
auto null_it = _partition_value_is_null.find(partition_slot_desc->col_name());
DORIS_CHECK(null_it != _partition_value_is_null.end());
RETURN_IF_ERROR(fill_partition_column_from_path_value(
*partition_value_column, *partition_slot_desc, partition_value,
partition_value_column_size, null_it->second));

partition_slot_id_to_column[partition_slot_desc->id()] = std::move(partition_value_column);
}
Expand All @@ -413,20 +399,9 @@ Status FileScanner::_process_runtime_filters_partition_prune(bool& can_filter_al
for (auto const* slot_desc : _real_tuple_desc->slots()) {
if (partition_slot_id_to_column.find(slot_desc->id()) !=
partition_slot_id_to_column.end()) {
auto data_type = slot_desc->get_data_type_ptr();
auto partition_value_column = std::move(partition_slot_id_to_column[slot_desc->id()]);
if (data_type->is_nullable()) {
_runtime_filter_partition_prune_block.insert(
index, ColumnWithTypeAndName(
ColumnNullable::create(
std::move(partition_value_column),
ColumnUInt8::create(partition_value_column_size, 0)),
data_type, slot_desc->col_name()));
} else {
_runtime_filter_partition_prune_block.insert(
index, ColumnWithTypeAndName(std::move(partition_value_column), data_type,
slot_desc->col_name()));
}
_runtime_filter_partition_prune_block.replace_by_position(
index, std::move(partition_value_column));
if (index == 0) {
first_column_filled = true;
}
Expand Down Expand Up @@ -1689,6 +1664,10 @@ Status FileScanner::_generate_partition_columns() {
if (!range.__isset.columns_from_path_keys) {
return Status::OK();
}
DORIS_CHECK(range.__isset.columns_from_path);
DORIS_CHECK(range.__isset.columns_from_path_is_null);
DORIS_CHECK(range.columns_from_path.size() == range.columns_from_path_keys.size());
DORIS_CHECK(range.columns_from_path_is_null.size() == range.columns_from_path_keys.size());

std::unordered_map<std::string, int> partition_name_to_key_index;
int index = 0;
Expand All @@ -1703,16 +1682,12 @@ Status FileScanner::_generate_partition_columns() {
}
auto pit = partition_name_to_key_index.find(col_desc.name);
if (pit != partition_name_to_key_index.end()) {
int values_index = pit->second;
if (range.__isset.columns_from_path && values_index < range.columns_from_path.size()) {
_partition_col_descs.emplace(
col_desc.name,
std::make_tuple(range.columns_from_path[values_index], col_desc.slot_desc));
if (range.__isset.columns_from_path_is_null) {
_partition_value_is_null.emplace(col_desc.name,
range.columns_from_path_is_null[values_index]);
}
}
auto values_index = cast_set<size_t>(pit->second);
_partition_col_descs.emplace(
col_desc.name,
std::make_tuple(range.columns_from_path[values_index], col_desc.slot_desc));
_partition_value_is_null.emplace(col_desc.name,
range.columns_from_path_is_null[values_index]);
}
}
return Status::OK();
Expand Down Expand Up @@ -1898,8 +1873,42 @@ Status FileScanner::_init_expr_ctxes() {

// Decides whether this scanner may use the condition cache.
//
// Returns false when any of the following holds:
//   - no cache digest was computed, or the scanner-kind handler declines;
//   - the pushed-down aggregation is COUNT;
//   - there are no pushed-down conjuncts;
//   - any conjunct (scanner-side or pushed-down) is a runtime-filter wrapper.
//
// NOTE: an earlier unconditional `return` that only checked the digest, the handler and
// "any conjunct is present" used to sit at the top of this function. It made every guard
// below unreachable and has been removed so the guards actually take effect.
bool FileScanner::_should_enable_condition_cache() {
    DCHECK(_should_enable_condition_cache_handler != nullptr);
    if (_condition_cache_digest == 0 || !(this->*_should_enable_condition_cache_handler)()) {
        return false;
    }

    // Condition cache starts as all-false and is turned true only by native readers when a
    // row-level predicate leaves at least one row in the granule. COUNT pushdown may replace the
    // native reader with CountReader, which only emits row counts and never runs that marking path.
    if (_get_push_down_agg_type() == TPushAggOp::type::COUNT) {
        return false;
    }

    // The cache is populated by native readers while evaluating pushed-down predicates.
    // Scanner-only predicates cannot mark reader granules, so there is nothing useful to cache.
    if (_push_down_conjuncts.empty()) {
        return false;
    }

    // Runtime filters are query-local dynamic predicates. Some ready RF implementations can hash
    // their payload into get_digest(), but FileScanner cannot rely on that for all RFs reaching the
    // native reader. In particular, ScanLocalState computes _condition_cache_digest during open(),
    // while FileScanner may append late-arrival RFs in _process_late_arrival_conjuncts()
    // immediately before initializing Parquet/ORC readers.
    //
    // Reading a weaker cache entry would be safe by itself: if a cached bitmap only represented
    // static predicate P, false granules for P are also false for P AND RF. The unsafe part is
    // writing. On cache miss, native readers mark survivor granules using all pushed-down
    // predicates, including late RFs. Without a read-only cache mode, this would insert a bitmap
    // for P AND RF under a digest that only represents P.
    //
    // Example:
    //   Q1 static predicate: k = 1, late RF payload: partition_key IN ('2024-02-01')
    //   Q2 static predicate: k = 1, late RF payload: partition_key IN ('2024-03-01')
    // If both scans share the same file/range/digest, reusing Q1's bitmap for Q2 can skip row
    // ranges according to the wrong RF payload. Keep RF predicate pushdown enabled for reader-side
    // filtering, but do not persist its result in condition cache.
    return !_contains_runtime_filter(_conjuncts) && !_contains_runtime_filter(_push_down_conjuncts);
}

bool FileScanner::_should_enable_condition_cache_for_load() const {
Expand Down
1 change: 1 addition & 0 deletions be/src/exec/scan/file_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ class FileScanner : public Scanner {
Status _generate_partition_columns();

bool _check_partition_prune_expr(const VExprSPtr& expr);
bool _contains_runtime_filter(const VExprContextSPtrs& conjuncts) const;
void _init_runtime_filter_partition_prune_ctxs();
void _init_runtime_filter_partition_prune_block();
Status _process_runtime_filters_partition_prune(bool& is_partition_pruned);
Expand Down
2 changes: 1 addition & 1 deletion be/src/format/generic_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ struct ReaderInitContext {
virtual ~ReaderInitContext() = default;

// ---- Owned by FileScanner, shared by all readers ----
std::vector<ColumnDescriptor>* column_descs = nullptr;
const std::vector<ColumnDescriptor>* column_descs = nullptr;
std::unordered_map<std::string, uint32_t>* col_name_to_block_idx = nullptr;
RuntimeState* state = nullptr;
const TupleDescriptor* tuple_descriptor = nullptr;
Expand Down
81 changes: 81 additions & 0 deletions be/src/format/jni/jni_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,14 @@
#include <map>
#include <ostream>
#include <sstream>
#include <tuple>
#include <unordered_map>
#include <utility>

#include "core/block/block.h"
#include "core/types.h"
#include "format/jni/jni_data_bridge.h"
#include "format/table/partition_column_filler.h"
#include "runtime/descriptors.h"
#include "runtime/runtime_state.h"
#include "util/jni-util.h"
Expand Down Expand Up @@ -67,6 +71,49 @@ JniReader::JniReader(std::string connector_class, std::map<std::string, std::str
_connector_name = split(_connector_class, "/").back();
}

// Hook run before the reader is initialized.
//
// Records the column descriptors (and, if not already set, the column-name -> block-index map)
// from `ctx`, then rebuilds the per-range partition value tables:
//   _partition_values:        key -> (raw path value, slot descriptor)
//   _partition_value_is_null: key -> null flag
// Only path keys that match a slot name in ctx->tuple_descriptor are recorded.
// Returns Status::OK() in all cases; inconsistent range metadata trips DORIS_CHECK.
Status JniReader::on_before_init_reader(ReaderInitContext* ctx) {
    _column_descs = ctx->column_descs;
    if (_col_name_to_block_idx == nullptr) {
        _col_name_to_block_idx = ctx->col_name_to_block_idx;
    }

    // Reset whatever partition values were collected earlier.
    _partition_values.clear();
    _partition_value_is_null.clear();

    const bool has_path_keys = ctx->range != nullptr && ctx->tuple_descriptor != nullptr &&
                               ctx->range->__isset.columns_from_path_keys;
    if (!has_path_keys) {
        return Status::OK();
    }

    // Keys, values and null flags must all be present and of equal length.
    DORIS_CHECK(ctx->range->__isset.columns_from_path);
    DORIS_CHECK(ctx->range->__isset.columns_from_path_is_null);
    DORIS_CHECK(ctx->range->columns_from_path.size() == ctx->range->columns_from_path_keys.size());
    DORIS_CHECK(ctx->range->columns_from_path_is_null.size() ==
                ctx->range->columns_from_path_keys.size());

    // Index the tuple's slots by column name for the lookup below.
    std::unordered_map<std::string, const SlotDescriptor*> slot_by_name;
    for (auto* slot : ctx->tuple_descriptor->slots()) {
        slot_by_name.emplace(slot->col_name(), slot);
    }

    const auto& path_keys = ctx->range->columns_from_path_keys;
    for (size_t pos = 0; pos < path_keys.size(); ++pos) {
        const auto& key = path_keys[pos];
        if (auto found = slot_by_name.find(key); found != slot_by_name.end()) {
            _partition_values.emplace(
                    key, std::make_tuple(ctx->range->columns_from_path[pos], found->second));
            _partition_value_is_null.emplace(key, ctx->range->columns_from_path_is_null[pos]);
        }
    }
    return Status::OK();
}

// Hook run after a block has been read: fills partition columns into `block` for the
// `*read_rows` rows just read. It is a no-op when there are no column descriptors, no
// partition values, no rows, or when the pushed-down aggregation is COUNT.
Status JniReader::on_after_read_block(Block* block, size_t* read_rows) {
    if (_column_descs == nullptr || _partition_values.empty()) {
        return Status::OK();
    }
    if (*read_rows == 0) {
        return Status::OK();
    }
    if (_push_down_agg_type == TPushAggOp::type::COUNT) {
        return Status::OK();
    }
    return _fill_partition_columns(block, *read_rows);
}

// =========================================================================
// JniReader::open (merged from JniConnector::open)
// =========================================================================
Expand Down Expand Up @@ -305,6 +352,40 @@ Status JniReader::_fill_block(Block* block, size_t num_rows) {
return Status::OK();
}

// Fills each PARTITION_KEY column of `block` that has a recorded partition value, passing
// `num_rows` to fill_partition_column_from_path_value().
//
// Column positions come from _col_name_to_block_idx when it is set, otherwise from the
// block's own name-to-position map. Returns InternalError when a partition column with a
// recorded value is not present in that map; errors from the fill helper are propagated.
Status JniReader::_fill_partition_columns(Block* block, size_t num_rows) {
    std::unordered_map<std::string, uint32_t> block_positions;
    const std::unordered_map<std::string, uint32_t>* positions = _col_name_to_block_idx;
    if (positions == nullptr) {
        block_positions = block->get_name_to_pos_map();
        positions = &block_positions;
    }

    for (const auto& desc : *_column_descs) {
        // Only partition-key columns with a recorded value are filled here.
        if (desc.category != ColumnCategory::PARTITION_KEY) {
            continue;
        }
        const auto value_it = _partition_values.find(desc.name);
        if (value_it == _partition_values.end()) {
            continue;
        }

        const auto pos_it = positions->find(desc.name);
        if (pos_it == positions->end()) {
            return Status::InternalError("Missing partition column {} in block {}", desc.name,
                                         block->dump_structure());
        }

        auto null_it = _partition_value_is_null.find(desc.name);
        DORIS_CHECK(null_it != _partition_value_is_null.end());

        const auto& path_value = std::get<0>(value_it->second);
        const auto* slot_desc = std::get<1>(value_it->second);

        // Take a mutable version of the column, fill it, then store it back into the block.
        auto& target = block->get_by_position(pos_it->second);
        auto filled = std::move(*target.column).mutate();
        RETURN_IF_ERROR(fill_partition_column_from_path_value(*filled, *slot_desc, path_value,
                                                              num_rows, null_it->second));
        target.column = std::move(filled);
    }
    return Status::OK();
}

// =========================================================================
// JniReader::_get_statistics (merged from JniConnector::get_statistics)
// =========================================================================
Expand Down
6 changes: 6 additions & 0 deletions be/src/format/jni/jni_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ class JniReader : public GenericReader {
}

protected:
Status on_before_init_reader(ReaderInitContext* ctx) override;
Status on_after_read_block(Block* block, size_t* read_rows) override;
void _collect_profile_before_close() override;

/**
Expand All @@ -140,6 +142,7 @@ class JniReader : public GenericReader {
private:
static const std::vector<SlotDescriptor*> _s_empty_slot_descs;

Status _fill_partition_columns(Block* block, size_t num_rows);
Status _init_jni_scanner(JNIEnv* env, int batch_size);
Status _fill_block(Block* block, size_t num_rows);
Status _get_statistics(JNIEnv* env, std::map<std::string, std::string>* result);
Expand Down Expand Up @@ -185,6 +188,9 @@ class JniReader : public GenericReader {

// Column name to block index map, passed from FileScanner to avoid repeated map creation
const std::unordered_map<std::string, uint32_t>* _col_name_to_block_idx = nullptr;
std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
_partition_values;
std::unordered_map<std::string, bool> _partition_value_is_null;

void _set_meta(long meta_addr) { _table_meta.set_meta(meta_addr); }
};
Expand Down
7 changes: 4 additions & 3 deletions be/src/format/orc/vorc_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -570,9 +570,10 @@ Status OrcReader::_do_init_reader(ReaderInitContext* base_ctx) {
Status OrcReader::on_before_init_reader(ReaderInitContext* ctx) {
_column_descs = ctx->column_descs;
_fill_col_name_to_block_idx = ctx->col_name_to_block_idx;
RETURN_IF_ERROR(
_extract_partition_values(*ctx->range, ctx->tuple_descriptor, _fill_partition_values));
for (auto& desc : *ctx->column_descs) {
RETURN_IF_ERROR(_extract_partition_values(*ctx->range, ctx->tuple_descriptor,
_fill_partition_values,
&_fill_partition_value_is_null));
for (const auto& desc : *ctx->column_descs) {
if (desc.category == ColumnCategory::REGULAR ||
desc.category == ColumnCategory::GENERATED) {
ctx->column_names.push_back(desc.name);
Expand Down
14 changes: 12 additions & 2 deletions be/src/format/parquet/vparquet_group_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <algorithm>
#include <boost/iterator/iterator_facade.hpp>
#include <memory>
#include <numeric>
#include <ostream>

#include "common/config.h"
Expand Down Expand Up @@ -327,6 +328,7 @@ Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_
// Process external table query task that select columns are all from path.
if (_read_table_columns.empty()) {
bool modify_row_ids = false;
int64_t batch_base_row = _total_read_rows;
RETURN_IF_ERROR(_read_empty_batch(batch_size, read_rows, batch_eof, &modify_row_ids));

DCHECK(_table_format_reader);
Expand All @@ -339,9 +341,17 @@ Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_
}
RETURN_IF_ERROR(_table_format_reader->fill_synthesized_columns(block, *read_rows));
RETURN_IF_ERROR(_table_format_reader->fill_generated_columns(block, *read_rows));
Status st = VExprContext::filter_block(_lazy_read_ctx.conjuncts, block, block->columns());
std::vector<uint32_t> columns_to_filter(block->columns());
for (uint32_t i = 0; i < columns_to_filter.size(); ++i) {
columns_to_filter[i] = i;
}
IColumn::Filter result_filter;
RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
_lazy_read_ctx.conjuncts, block, columns_to_filter, block->columns(),
result_filter));
_mark_condition_cache_granules(result_filter.data(), *read_rows, batch_base_row);
*read_rows = block->rows();
return st;
return Status::OK();
}
if (_lazy_read_ctx.can_lazy_read) {
// call _do_lazy_read recursively when current batch is skipped
Expand Down
7 changes: 4 additions & 3 deletions be/src/format/parquet/vparquet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -408,9 +408,10 @@ void ParquetReader::_init_file_description() {
Status ParquetReader::on_before_init_reader(ReaderInitContext* ctx) {
_column_descs = ctx->column_descs;
_fill_col_name_to_block_idx = ctx->col_name_to_block_idx;
RETURN_IF_ERROR(
_extract_partition_values(*ctx->range, ctx->tuple_descriptor, _fill_partition_values));
for (auto& desc : *ctx->column_descs) {
RETURN_IF_ERROR(_extract_partition_values(*ctx->range, ctx->tuple_descriptor,
_fill_partition_values,
&_fill_partition_value_is_null));
for (const auto& desc : *ctx->column_descs) {
if (desc.category == ColumnCategory::REGULAR ||
desc.category == ColumnCategory::GENERATED) {
ctx->column_names.push_back(desc.name);
Expand Down
Loading
Loading