Skip to content

Commit

Permalink
[Feature](topn) BE adaptive choose whether push topn filter down to s…
Browse files Browse the repository at this point in the history
…torage layer (apache#34713)

support judge topn filter push down
topn filter will push down to storage layer when 2 case:

    filter target is key column
    table data model is dup/merge on write
  • Loading branch information
BiteTheDDDDt authored and M1saka2003 committed May 24, 2024
1 parent 9342910 commit 7d104b6
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 21 deletions.
7 changes: 6 additions & 1 deletion be/src/pipeline/exec/olap_scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,12 @@ class OlapScanLocalState final : public ScanLocalState<OlapScanLocalState> {

bool _storage_no_merge() override;

bool _push_down_topn() override { return true; }
bool _push_down_topn(const vectorized::RuntimePredicate& predicate) override {
if (!predicate.target_is_slot()) {
return false;
}
return _is_key_column(predicate.get_col_name()) || _storage_no_merge();
}

Status _init_scanners(std::list<vectorized::VScannerSPtr>* scanners) override;

Expand Down
9 changes: 2 additions & 7 deletions be/src/pipeline/exec/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,7 @@ Status ScanLocalState<Derived>::_normalize_conjuncts(RuntimeState* state) {
init_value_range(_slot_id_to_slot_desc[_colname_to_slot_id[colname]], type);
}

if (!_push_down_topn()) {
RETURN_IF_ERROR(_get_topn_filters(state));
}
RETURN_IF_ERROR(_get_topn_filters(state));

for (auto it = _conjuncts.begin(); it != _conjuncts.end();) {
auto& conjunct = *it;
Expand Down Expand Up @@ -1269,11 +1267,8 @@ Status ScanLocalState<Derived>::_init_profile() {

template <typename Derived>
Status ScanLocalState<Derived>::_get_topn_filters(RuntimeState* state) {
for (auto id : get_topn_filter_source_node_ids()) {
for (auto id : get_topn_filter_source_node_ids(state, false)) {
const auto& pred = state->get_query_ctx()->get_runtime_predicate(id);
if (!pred.inited()) {
continue;
}
SlotDescriptor* slot_desc = _slot_id_to_slot_desc[_colname_to_slot_id[pred.get_col_name()]];

vectorized::VExprSPtr topn_pred;
Expand Down
16 changes: 13 additions & 3 deletions be/src/pipeline/exec/scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,18 @@ class ScanLocalState : public ScanLocalStateBase {

std::vector<Dependency*> dependencies() const override { return {_scan_dependency.get()}; }

std::vector<int> get_topn_filter_source_node_ids() {
return _parent->cast<typename Derived::Parent>().topn_filter_source_node_ids;
std::vector<int> get_topn_filter_source_node_ids(RuntimeState* state, bool push_down) {
std::vector<int> result;
for (int id : _parent->cast<typename Derived::Parent>().topn_filter_source_node_ids) {
const auto& pred = state->get_query_ctx()->get_runtime_predicate(id);
if (!pred.inited()) {
continue;
}
if (_push_down_topn(pred) == push_down) {
result.push_back(id);
}
}
return result;
}

protected:
Expand All @@ -176,7 +186,7 @@ class ScanLocalState : public ScanLocalStateBase {
virtual bool _should_push_down_common_expr() { return false; }

virtual bool _storage_no_merge() { return false; }
virtual bool _push_down_topn() { return false; }
virtual bool _push_down_topn(const vectorized::RuntimePredicate& predicate) { return false; }
virtual bool _is_key_column(const std::string& col_name) { return false; }
virtual vectorized::VScanNode::PushDownType _should_push_down_bloom_filter() {
return vectorized::VScanNode::PushDownType::UNACCEPTABLE;
Expand Down
4 changes: 3 additions & 1 deletion be/src/runtime/runtime_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class RuntimePredicate {
Status init(PrimitiveType type, bool nulls_first, bool is_asc, const std::string& col_name);

bool inited() const {
// when sort node and scan node are not in the same fragment, predicate will not be initialized
std::shared_lock<std::shared_mutex> rlock(_rwlock);
return _inited;
}
Expand All @@ -58,7 +59,6 @@ class RuntimePredicate {

Status set_tablet_schema(TabletSchemaSPtr tablet_schema) {
std::unique_lock<std::shared_mutex> wlock(_rwlock);
// when sort node and scan node are not in the same backend, predicate will not be initialized
if (_tablet_schema || !_inited) {
return Status::OK();
}
Expand Down Expand Up @@ -92,6 +92,8 @@ class RuntimePredicate {

bool nulls_first() const { return _nulls_first; }

bool target_is_slot() const { return true; }

private:
mutable std::shared_mutex _rwlock;
Field _orderby_extrem {Field::Types::Null};
Expand Down
15 changes: 6 additions & 9 deletions be/src/vec/exec/scan/new_olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -401,24 +401,21 @@ Status NewOlapScanner::_init_tablet_reader_params(
_tablet_reader_params.filter_block_conjuncts = _conjuncts;
}

// runtime predicate push down optimization for topn
if (!_parent && !((pipeline::OlapScanLocalState*)_local_state)
->get_topn_filter_source_node_ids()
.empty()) {
// the new topn whitch support external table
if (!_parent) {
_tablet_reader_params.topn_filter_source_node_ids =
((pipeline::OlapScanLocalState*)_local_state)
->get_topn_filter_source_node_ids();
} else {
->get_topn_filter_source_node_ids(_state, true);
}

if (_tablet_reader_params.topn_filter_source_node_ids.empty()) {
// old topn logic
_tablet_reader_params.use_topn_opt = olap_scan_node.use_topn_opt;
if (_tablet_reader_params.use_topn_opt) {
if (olap_scan_node.__isset.topn_filter_source_node_ids) {
// the 2.1 new multiple topn
_tablet_reader_params.topn_filter_source_node_ids =
olap_scan_node.topn_filter_source_node_ids;

} else {
// the 2.0 old topn
_tablet_reader_params.topn_filter_source_node_ids = {0};
}
}
Expand Down

0 comments on commit 7d104b6

Please sign in to comment.