// Patch summary (leading commentary; sits above the first `diff --git` header,
// where it is presumably skipped by patch tools as leading text -- TODO confirm).
//
// Purpose: make the "push the topn runtime predicate down to storage" decision
// per predicate instead of per scan operator.
//
// olap_scan_operator.h
//   * OlapScanLocalState::_push_down_topn now takes the RuntimePredicate.
//     It returns false when !predicate.target_is_slot(); otherwise it returns
//     _is_key_column(predicate.get_col_name()) || _storage_no_merge().
//     Previously it returned true unconditionally.
//
// scan_operator.h
//   * ScanLocalState::_push_down_topn (virtual, base) gains the same parameter
//     and still returns false.
//   * get_topn_filter_source_node_ids(state, push_down) now walks the parent's
//     topn_filter_source_node_ids, skips ids whose predicate is !inited(), and
//     keeps only ids for which _push_down_topn(pred) == push_down.
//
// scan_operator.cpp
//   * _normalize_conjuncts calls _get_topn_filters(state) unconditionally
//     (the old `if (!_push_down_topn())` guard is removed).
//   * _get_topn_filters iterates get_topn_filter_source_node_ids(state, false),
//     i.e. only the ids that are NOT pushed down, and drops its own inited()
//     check because the helper already filters on it.
//
// runtime_predicate.h
//   * Adds RuntimePredicate::target_is_slot(), which currently always returns true.
//   * Moves the "predicate will not be initialized" comment from
//     set_tablet_schema() to inited(), rewording "backend" to "fragment".
//
// new_olap_scanner.cpp (hunk under NewOlapScanner::_init_tablet_reader_params)
//   * When !_parent, topn_filter_source_node_ids is taken from
//     get_topn_filter_source_node_ids(_state, true), i.e. the pushed-down ids.
//   * If the resulting list is empty, the "old topn logic" runs: use_topn_opt is
//     copied from olap_scan_node and, when it is set, the ids come from
//     olap_scan_node.topn_filter_source_node_ids if present, else {0}.
//
// NOTE(review): this text looks whitespace-collapsed, and template argument
// lists appear to be missing (e.g. `std::list* scanners`,
// `std::vector get_topn_filter_source_node_ids`, `_parent->cast().`,
// `template Status ScanLocalState::`). Presumably extraction damage -- confirm
// against the upstream change before trying to apply this patch.
diff --git a/be/src/pipeline/exec/olap_scan_operator.h b/be/src/pipeline/exec/olap_scan_operator.h index 15dfd821772a6a1..daff2167f7fe142 100644 --- a/be/src/pipeline/exec/olap_scan_operator.h +++ b/be/src/pipeline/exec/olap_scan_operator.h @@ -80,7 +80,12 @@ class OlapScanLocalState final : public ScanLocalState { bool _storage_no_merge() override; - bool _push_down_topn() override { return true; } + bool _push_down_topn(const vectorized::RuntimePredicate& predicate) override { + if (!predicate.target_is_slot()) { + return false; + } + return _is_key_column(predicate.get_col_name()) || _storage_no_merge(); + } Status _init_scanners(std::list* scanners) override; diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index ddb379e0977f3fe..19a3911c6a7e266 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -189,9 +189,7 @@ Status ScanLocalState::_normalize_conjuncts(RuntimeState* state) { init_value_range(_slot_id_to_slot_desc[_colname_to_slot_id[colname]], type); } - if (!_push_down_topn()) { - RETURN_IF_ERROR(_get_topn_filters(state)); - } + RETURN_IF_ERROR(_get_topn_filters(state)); for (auto it = _conjuncts.begin(); it != _conjuncts.end();) { auto& conjunct = *it; @@ -1269,11 +1267,8 @@ Status ScanLocalState::_init_profile() { template Status ScanLocalState::_get_topn_filters(RuntimeState* state) { - for (auto id : get_topn_filter_source_node_ids()) { + for (auto id : get_topn_filter_source_node_ids(state, false)) { const auto& pred = state->get_query_ctx()->get_runtime_predicate(id); - if (!pred.inited()) { - continue; - } SlotDescriptor* slot_desc = _slot_id_to_slot_desc[_colname_to_slot_id[pred.get_col_name()]]; vectorized::VExprSPtr topn_pred; diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index 3ebd573fc719f34..e7ce9b31d19d18f 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -158,8 
+158,18 @@ class ScanLocalState : public ScanLocalStateBase { std::vector dependencies() const override { return {_scan_dependency.get()}; } - std::vector get_topn_filter_source_node_ids() { - return _parent->cast().topn_filter_source_node_ids; + std::vector get_topn_filter_source_node_ids(RuntimeState* state, bool push_down) { + std::vector result; + for (int id : _parent->cast().topn_filter_source_node_ids) { + const auto& pred = state->get_query_ctx()->get_runtime_predicate(id); + if (!pred.inited()) { + continue; + } + if (_push_down_topn(pred) == push_down) { + result.push_back(id); + } + } + return result; } protected: @@ -176,7 +186,7 @@ class ScanLocalState : public ScanLocalStateBase { virtual bool _should_push_down_common_expr() { return false; } virtual bool _storage_no_merge() { return false; } - virtual bool _push_down_topn() { return false; } + virtual bool _push_down_topn(const vectorized::RuntimePredicate& predicate) { return false; } virtual bool _is_key_column(const std::string& col_name) { return false; } virtual vectorized::VScanNode::PushDownType _should_push_down_bloom_filter() { return vectorized::VScanNode::PushDownType::UNACCEPTABLE; diff --git a/be/src/runtime/runtime_predicate.h b/be/src/runtime/runtime_predicate.h index 00fbd62dd88fcb8..0305994e0fc8bb2 100644 --- a/be/src/runtime/runtime_predicate.h +++ b/be/src/runtime/runtime_predicate.h @@ -47,6 +47,7 @@ class RuntimePredicate { Status init(PrimitiveType type, bool nulls_first, bool is_asc, const std::string& col_name); bool inited() const { + // when sort node and scan node are not in the same fragment, predicate will not be initialized std::shared_lock rlock(_rwlock); return _inited; } @@ -58,7 +59,6 @@ class RuntimePredicate { Status set_tablet_schema(TabletSchemaSPtr tablet_schema) { std::unique_lock wlock(_rwlock); - // when sort node and scan node are not in the same backend, predicate will not be initialized if (_tablet_schema || !_inited) { return Status::OK(); } @@ -92,6 
+92,8 @@ class RuntimePredicate { bool nulls_first() const { return _nulls_first; } + bool target_is_slot() const { return true; } + private: mutable std::shared_mutex _rwlock; Field _orderby_extrem {Field::Types::Null}; diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp index c058c8f32994a2c..56593ae61e56f7e 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.cpp +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -401,24 +401,21 @@ Status NewOlapScanner::_init_tablet_reader_params( _tablet_reader_params.filter_block_conjuncts = _conjuncts; } - // runtime predicate push down optimization for topn - if (!_parent && !((pipeline::OlapScanLocalState*)_local_state) - ->get_topn_filter_source_node_ids() - .empty()) { - // the new topn whitch support external table + if (!_parent) { _tablet_reader_params.topn_filter_source_node_ids = ((pipeline::OlapScanLocalState*)_local_state) - ->get_topn_filter_source_node_ids(); - } else { + ->get_topn_filter_source_node_ids(_state, true); + } + + if (_tablet_reader_params.topn_filter_source_node_ids.empty()) { + // old topn logic _tablet_reader_params.use_topn_opt = olap_scan_node.use_topn_opt; if (_tablet_reader_params.use_topn_opt) { if (olap_scan_node.__isset.topn_filter_source_node_ids) { - // the 2.1 new multiple topn _tablet_reader_params.topn_filter_source_node_ids = olap_scan_node.topn_filter_source_node_ids; } else { - // the 2.0 old topn _tablet_reader_params.topn_filter_source_node_ids = {0}; } }