Skip to content
Merged
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,8 @@ CONF_Int32(quick_compaction_batch_size, "10");
// do compaction min rowsets
CONF_Int32(quick_compaction_min_rowsets, "10");

CONF_mBool(enable_function_pushdown, "false");

// cooldown task configs
CONF_Int32(cooldown_thread_num, "5");
CONF_mInt64(generate_cooldown_task_interval_sec, "20");
Expand Down
93 changes: 82 additions & 11 deletions be/src/exec/olap_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,9 @@ Status OlapScanNode::close(RuntimeState* state) {
}

VLOG_CRITICAL << "OlapScanNode::close()";
// pushed functions close
Expr::close(_pushed_func_conjunct_ctxs, state);

return ScanNode::close(state);
}

Expand Down Expand Up @@ -477,8 +480,10 @@ Status OlapScanNode::start_scan(RuntimeState* state) {
}

VLOG_CRITICAL << "BuildKeyRangesAndFilters";
// 3. Using ColumnValueRange to Build StorageEngine filters
// 3.1 Using ColumnValueRange to Build StorageEngine filters
RETURN_IF_ERROR(build_key_ranges_and_filters());
// 3.2 Function pushdown
if (config::enable_function_pushdown) RETURN_IF_ERROR(build_function_filters());

VLOG_CRITICAL << "Filter idle conjuncts";
// 4. Filter idle conjunct which already trans to olap filters
Expand All @@ -505,29 +510,34 @@ bool OlapScanNode::is_key_column(const std::string& key_name) {
}

void OlapScanNode::remove_pushed_conjuncts(RuntimeState* state) {
if (_pushed_conjuncts_index.empty()) {
if (_pushed_conjuncts_index.empty() && _pushed_func_conjuncts_index.empty()) {
return;
}

// dispose direct conjunct first
std::vector<ExprContext*> new_conjunct_ctxs;
for (int i = 0; i < _direct_conjunct_size; ++i) {
if (std::find(_pushed_conjuncts_index.cbegin(), _pushed_conjuncts_index.cend(), i) ==
_pushed_conjuncts_index.cend()) {
new_conjunct_ctxs.emplace_back(_conjunct_ctxs[i]);
if (!_pushed_conjuncts_index.empty() && _pushed_conjuncts_index.count(i)) {
_conjunct_ctxs[i]->close(state); // pushed condition, just close
} else if (!_pushed_func_conjuncts_index.empty() && _pushed_func_conjuncts_index.count(i)) {
_pushed_func_conjunct_ctxs.emplace_back(
_conjunct_ctxs[i]); // pushed functions, need keep ctxs
} else {
_conjunct_ctxs[i]->close(state);
new_conjunct_ctxs.emplace_back(_conjunct_ctxs[i]);
}
}

auto new_direct_conjunct_size = new_conjunct_ctxs.size();

// dispose hash join push down conjunct second
for (int i = _direct_conjunct_size; i < _conjunct_ctxs.size(); ++i) {
if (std::find(_pushed_conjuncts_index.cbegin(), _pushed_conjuncts_index.cend(), i) ==
_pushed_conjuncts_index.cend()) {
new_conjunct_ctxs.emplace_back(_conjunct_ctxs[i]);
if (!_pushed_conjuncts_index.empty() && _pushed_conjuncts_index.count(i)) {
_conjunct_ctxs[i]->close(state); // pushed condition, just close
} else if (!_pushed_func_conjuncts_index.empty() && _pushed_func_conjuncts_index.count(i)) {
_pushed_func_conjunct_ctxs.emplace_back(
_conjunct_ctxs[i]); // pushed functions, need keep ctxs
} else {
_conjunct_ctxs[i]->close(state);
new_conjunct_ctxs.emplace_back(_conjunct_ctxs[i]);
}
}

Expand Down Expand Up @@ -689,6 +699,67 @@ static std::string olap_filters_to_string(const std::vector<doris::TCondition>&
return filters_string;
}

Status OlapScanNode::build_function_filters() {
for (int conj_idx = 0; conj_idx < _conjunct_ctxs.size(); ++conj_idx) {
ExprContext* ex_ctx = _conjunct_ctxs[conj_idx];
Expr* fn_expr = ex_ctx->root();
bool opposite = false;

if (TExprNodeType::COMPOUND_PRED == fn_expr->node_type() &&
TExprOpcode::COMPOUND_NOT == fn_expr->op()) {
fn_expr = fn_expr->get_child(0);
opposite = true;
}

// currently only support like / not like
if (TExprNodeType::FUNCTION_CALL == fn_expr->node_type() &&
"like" == fn_expr->fn().name.function_name) {
doris_udf::FunctionContext* func_cxt =
ex_ctx->fn_context(fn_expr->get_fn_context_index());

if (!func_cxt) {
continue;
}
if (fn_expr->children().size() != 2) {
continue;
}
SlotRef* slot_ref = nullptr;
Expr* literal_expr = nullptr;

if (TExprNodeType::SLOT_REF == fn_expr->get_child(0)->node_type()) {
literal_expr = fn_expr->get_child(1);
slot_ref = (SlotRef*)(fn_expr->get_child(0));
} else if (TExprNodeType::SLOT_REF == fn_expr->get_child(1)->node_type()) {
literal_expr = fn_expr->get_child(0);
slot_ref = (SlotRef*)(fn_expr->get_child(1));
} else {
continue;
}

if (TExprNodeType::STRING_LITERAL != literal_expr->node_type()) continue;

const SlotDescriptor* slot_desc = nullptr;
std::vector<SlotId> slot_ids;
slot_ref->get_slot_ids(&slot_ids);
for (SlotDescriptor* slot : _tuple_desc->slots()) {
if (slot->id() == slot_ids[0]) {
slot_desc = slot;
break;
}
}

if (!slot_desc) {
continue;
}
std::string col = slot_desc->col_name();
StringVal val = literal_expr->get_string_val(ex_ctx, nullptr);
_push_down_functions.emplace_back(opposite, col, func_cxt, val);
_pushed_func_conjuncts_index.insert(conj_idx);
}
}
return Status::OK();
}

Status OlapScanNode::build_key_ranges_and_filters() {
const std::vector<std::string>& column_names = _olap_scan_node.key_column_name;
const std::vector<TPrimitiveType::type>& column_types = _olap_scan_node.key_column_type;
Expand Down Expand Up @@ -873,7 +944,7 @@ Status OlapScanNode::start_scan_thread(RuntimeState* state) {
// so that scanner can be automatically deconstructed if prepare failed.
_scanner_pool.add(scanner);
RETURN_IF_ERROR(scanner->prepare(*scan_range, scanner_ranges, _olap_filter,
_bloom_filters_push_down));
_bloom_filters_push_down, _push_down_functions));

_olap_scanners.push_back(scanner);
disk_set.insert(scanner->scan_disk());
Expand Down
12 changes: 12 additions & 0 deletions be/src/exec/olap_scan_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "exec/olap_scanner.h"
#include "exec/scan_node.h"
#include "exprs/bloomfilter_predicate.h"
#include "exprs/function_filter.h"
#include "exprs/in_predicate.h"
#include "runtime/descriptors.h"
#include "util/progress_updater.h"
Expand Down Expand Up @@ -108,6 +109,8 @@ class OlapScanNode : public ScanNode {
void eval_const_conjuncts();
Status normalize_conjuncts();
Status build_key_ranges_and_filters();
Status build_function_filters();

Status start_scan_thread(RuntimeState* state);

template <PrimitiveType T>
Expand Down Expand Up @@ -190,6 +193,15 @@ class OlapScanNode : public ScanNode {
std::vector<std::pair<std::string, std::shared_ptr<IBloomFilterFuncBase>>>
_bloom_filters_push_down;

// push down functions to storage engine
// only support scalar functions, now just support like / not like
std::vector<FunctionFilter> _push_down_functions;
// functions conjunct's index which already be push down storage engine
std::set<uint32_t> _pushed_func_conjuncts_index;
// need keep these conjunct to the end of scan node,
// since some memory referenced by pushed function filters
std::vector<ExprContext*> _pushed_func_conjunct_ctxs;

// Pool for storing allocated scanner objects. We don't want to use the
// runtime pool to ensure that the scanner objects are deleted before this
// object is.
Expand Down
17 changes: 11 additions & 6 deletions be/src/exec/olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ OlapScanner::OlapScanner(RuntimeState* runtime_state, OlapScanNode* parent, bool
Status OlapScanner::prepare(
const TPaloScanRange& scan_range, const std::vector<OlapScanRange*>& key_ranges,
const std::vector<TCondition>& filters,
const std::vector<std::pair<string, std::shared_ptr<IBloomFilterFuncBase>>>&
bloom_filters) {
const std::vector<std::pair<string, std::shared_ptr<IBloomFilterFuncBase>>>& bloom_filters,
const std::vector<FunctionFilter>& function_filters) {
SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
set_tablet_reader();
// set limit to reduce end of rowset and segment mem use
Expand Down Expand Up @@ -124,8 +124,9 @@ Status OlapScanner::prepare(
}

{
// Initialize tablet_reader_params
RETURN_IF_ERROR(_init_tablet_reader_params(key_ranges, filters, bloom_filters));
// Initialize _params
RETURN_IF_ERROR(
_init_tablet_reader_params(key_ranges, filters, bloom_filters, function_filters));
}

return Status::OK();
Expand Down Expand Up @@ -157,8 +158,8 @@ Status OlapScanner::open() {
// it will be called under tablet read lock because capture rs readers need
Status OlapScanner::_init_tablet_reader_params(
const std::vector<OlapScanRange*>& key_ranges, const std::vector<TCondition>& filters,
const std::vector<std::pair<string, std::shared_ptr<IBloomFilterFuncBase>>>&
bloom_filters) {
const std::vector<std::pair<string, std::shared_ptr<IBloomFilterFuncBase>>>& bloom_filters,
const std::vector<FunctionFilter>& function_filters) {
// if the table with rowset [0-x] or [0-1] [2-y], and [0-1] is empty
bool single_version =
(_tablet_reader_params.rs_readers.size() == 1 &&
Expand Down Expand Up @@ -193,6 +194,10 @@ Status OlapScanner::_init_tablet_reader_params(
std::inserter(_tablet_reader_params.bloom_filters,
_tablet_reader_params.bloom_filters.begin()));

std::copy(function_filters.cbegin(), function_filters.cend(),
std::inserter(_tablet_reader_params.function_filters,
_tablet_reader_params.function_filters.begin()));

// Range
for (auto key_range : key_ranges) {
if (key_range->begin_scan_range.size() == 1 &&
Expand Down
8 changes: 6 additions & 2 deletions be/src/exec/olap_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
#include "exec/exec_node.h"
#include "exec/olap_utils.h"
#include "exprs/bloomfilter_predicate.h"
#include "exprs/expr.h"
#include "exprs/function_filter.h"
#include "gen_cpp/PaloInternalService_types.h"
#include "gen_cpp/PlanNodes_types.h"
#include "olap/tuple_reader.h"
Expand All @@ -48,7 +50,8 @@ class OlapScanner {
Status prepare(const TPaloScanRange& scan_range, const std::vector<OlapScanRange*>& key_ranges,
const std::vector<TCondition>& filters,
const std::vector<std::pair<std::string, std::shared_ptr<IBloomFilterFuncBase>>>&
bloom_filters);
bloom_filters,
const std::vector<FunctionFilter>& function_filters);

Status open();

Expand Down Expand Up @@ -92,7 +95,8 @@ class OlapScanner {
Status _init_tablet_reader_params(
const std::vector<OlapScanRange*>& key_ranges, const std::vector<TCondition>& filters,
const std::vector<std::pair<string, std::shared_ptr<IBloomFilterFuncBase>>>&
bloom_filters);
bloom_filters,
const std::vector<FunctionFilter>& function_filters);
Status _init_return_columns(bool need_seq_col);
void _convert_row_to_tuple(Tuple* tuple);

Expand Down
1 change: 1 addition & 0 deletions be/src/exprs/expr.h
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ class Expr {
};

static Expr* copy(ObjectPool* pool, Expr* old_expr);
int get_fn_context_index() { return _fn_context_index; }

protected:
friend class AggFnEvaluator;
Expand Down
43 changes: 43 additions & 0 deletions be/src/exprs/function_filter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once

#include <memory>

#include "udf/udf.h"
#include "udf/udf_internal.h"

namespace doris {

class FunctionFilter {
public:
FunctionFilter(bool opposite, const std::string& col_name, doris_udf::FunctionContext* fn_ctx,
doris_udf::StringVal string_param)
: _opposite(opposite),
_col_name(col_name),
_fn_ctx(fn_ctx),
_string_param(string_param) {}

bool _opposite;
std::string _col_name;
// these pointer's life time controlled by scan node
doris_udf::FunctionContext* _fn_ctx;
doris_udf::StringVal
_string_param; // only one param from conjunct, because now only support like predicate
};

} // namespace doris
Loading