Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions be/src/exprs/create_predicate_function.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ typename Traits::BasePtr create_predicate_function(PrimitiveType type) {
APPLY_FOR_PRIMTYPE(M)
#undef M
default:
DCHECK(false) << "Invalid type.";
DCHECK(false) << "Invalid type: " << type_to_string(type);
}

return nullptr;
Expand All @@ -142,7 +142,7 @@ typename Traits::BasePtr create_bitmap_predicate_function(PrimitiveType type) {
case TYPE_BIGINT:
return Creator::template create<TYPE_BIGINT>();
default:
DCHECK(false) << "Invalid type.";
DCHECK(false) << "Invalid type: " << type_to_string(type);
}

return nullptr;
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,8 @@ Status BlockChanger::change_block(vectorized::Block* ref_block,

int result_column_id = -1;
RETURN_IF_ERROR(ctx->execute(ref_block, &result_column_id));
ref_block->replace_by_position_if_const(result_column_id);

if (ref_block->get_by_position(result_column_id).column->size() != row_size) {
return Status::Error<ErrorCode::INTERNAL_ERROR>(
"{} size invalid, expect={}, real={}", new_block->get_by_position(idx).name,
Expand Down
8 changes: 3 additions & 5 deletions be/src/vec/exprs/vcase_expr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

#include "common/status.h"
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/columns/column.h"
#include "vec/core/block.h"
#include "vec/core/column_numbers.h"
#include "vec/core/column_with_type_and_name.h"
Expand All @@ -54,8 +55,7 @@ VCaseExpr::VCaseExpr(const TExprNode& node)
}
}

Status VCaseExpr::prepare(doris::RuntimeState* state, const doris::RowDescriptor& desc,
VExprContext* context) {
Status VCaseExpr::prepare(RuntimeState* state, const RowDescriptor& desc, VExprContext* context) {
RETURN_IF_ERROR_OR_PREPARED(VExpr::prepare(state, desc, context));

ColumnsWithTypeAndName argument_template;
Expand Down Expand Up @@ -92,14 +92,12 @@ void VCaseExpr::close(RuntimeState* state, VExprContext* context,

Status VCaseExpr::execute(VExprContext* context, Block* block, int* result_column_id) {
ColumnNumbers arguments(_children.size());

for (int i = 0; i < _children.size(); i++) {
int column_id = -1;
RETURN_IF_ERROR(_children[i]->execute(context, block, &column_id));
arguments[i] = column_id;

block->replace_by_position_if_const(column_id);
}
RETURN_IF_ERROR(check_constant(*block, arguments));

size_t num_columns_without_result = block->columns();
block->insert({nullptr, _data_type, _expr_name});
Expand Down
22 changes: 13 additions & 9 deletions be/src/vec/exprs/vectorized_fn_call.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ class TExprNode;

namespace doris::vectorized {

VectorizedFnCall::VectorizedFnCall(const doris::TExprNode& node) : VExpr(node) {}
VectorizedFnCall::VectorizedFnCall(const TExprNode& node) : VExpr(node) {}

doris::Status VectorizedFnCall::prepare(doris::RuntimeState* state,
const doris::RowDescriptor& desc, VExprContext* context) {
Status VectorizedFnCall::prepare(RuntimeState* state, const RowDescriptor& desc,
VExprContext* context) {
RETURN_IF_ERROR_OR_PREPARED(VExpr::prepare(state, desc, context));
ColumnsWithTypeAndName argument_template;
argument_template.reserve(_children.size());
Expand Down Expand Up @@ -89,33 +89,36 @@ doris::Status VectorizedFnCall::prepare(doris::RuntimeState* state,
}
VExpr::register_function_context(state, context);
_expr_name = fmt::format("{}({})", _fn.name.function_name, child_expr_name);
_function_name = _fn.name.function_name;
_can_fast_execute = _function->can_fast_execute();

return Status::OK();
}

doris::Status VectorizedFnCall::open(doris::RuntimeState* state, VExprContext* context,
FunctionContext::FunctionStateScope scope) {
Status VectorizedFnCall::open(RuntimeState* state, VExprContext* context,
FunctionContext::FunctionStateScope scope) {
RETURN_IF_ERROR(VExpr::open(state, context, scope));
RETURN_IF_ERROR(VExpr::init_function_context(context, scope, _function));
return Status::OK();
}

void VectorizedFnCall::close(doris::RuntimeState* state, VExprContext* context,
void VectorizedFnCall::close(RuntimeState* state, VExprContext* context,
FunctionContext::FunctionStateScope scope) {
VExpr::close_function_context(context, scope, _function);
VExpr::close(state, context, scope);
}

doris::Status VectorizedFnCall::execute(VExprContext* context, doris::vectorized::Block* block,
int* result_column_id) {
Status VectorizedFnCall::execute(VExprContext* context, vectorized::Block* block,
int* result_column_id) {
// TODO: not execute const expr again, but use the const column in function context
doris::vectorized::ColumnNumbers arguments(_children.size());
vectorized::ColumnNumbers arguments(_children.size());
for (int i = 0; i < _children.size(); ++i) {
int column_id = -1;
RETURN_IF_ERROR(_children[i]->execute(context, block, &column_id));
arguments[i] = column_id;
}
RETURN_IF_ERROR(check_constant(*block, arguments));

// call function
size_t num_columns_without_result = block->columns();
// prepare a column to save result
Expand All @@ -133,6 +136,7 @@ doris::Status VectorizedFnCall::execute(VExprContext* context, doris::vectorized
RETURN_IF_ERROR(_function->execute(context->fn_context(_fn_context_index), *block, arguments,
num_columns_without_result, block->rows(), false));
*result_column_id = num_columns_without_result;

return Status::OK();
}

Expand Down
7 changes: 7 additions & 0 deletions be/src/vec/exprs/vectorized_fn_call.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ class VectorizedFnCall : public VExpr {
}
const std::string& expr_name() const override;
std::string debug_string() const override;
bool is_constant() const override {
if (!_function->is_use_default_implementation_for_constants()) {
return false;
}
return VExpr::is_constant();
}
static std::string debug_string(const std::vector<VectorizedFnCall*>& exprs);

bool fast_execute(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
Expand All @@ -65,5 +71,6 @@ class VectorizedFnCall : public VExpr {
FunctionBasePtr _function;
bool _can_fast_execute = false;
std::string _expr_name;
std::string _function_name;
};
} // namespace doris::vectorized
57 changes: 32 additions & 25 deletions be/src/vec/exprs/vexpr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
#include "vec/exprs/vslot_ref.h"
#include "vec/exprs/vstruct_literal.h"
#include "vec/exprs/vtuple_is_null_predicate.h"
#include "vec/utils/util.hpp"

namespace doris {
class RowDescriptor;
Expand All @@ -61,7 +62,7 @@ using doris::RuntimeState;
using doris::RowDescriptor;
using doris::TypeDescriptor;

VExpr::VExpr(const doris::TExprNode& node)
VExpr::VExpr(const TExprNode& node)
: _node_type(node.node_type),
_opcode(node.__isset.opcode ? node.opcode : TExprOpcode::INVALID_OPCODE),
_type(TypeDescriptor::from_thrift(node.type)),
Expand Down Expand Up @@ -119,14 +120,14 @@ Status VExpr::open(RuntimeState* state, VExprContext* context,
return Status::OK();
}

void VExpr::close(doris::RuntimeState* state, VExprContext* context,
void VExpr::close(RuntimeState* state, VExprContext* context,
FunctionContext::FunctionStateScope scope) {
for (int i = 0; i < _children.size(); ++i) {
_children[i]->close(state, context, scope);
}
}

Status VExpr::create_expr(ObjectPool* pool, const doris::TExprNode& texpr_node, VExpr** expr) {
Status VExpr::create_expr(ObjectPool* pool, const TExprNode& texpr_node, VExpr** expr) {
try {
switch (texpr_node.node_type) {
case TExprNodeType::BOOL_LITERAL:
Expand All @@ -153,43 +154,43 @@ Status VExpr::create_expr(ObjectPool* pool, const doris::TExprNode& texpr_node,
*expr = pool->add(VStructLiteral::create_unique(texpr_node).release());
break;
}
case doris::TExprNodeType::SLOT_REF: {
case TExprNodeType::SLOT_REF: {
*expr = pool->add(VSlotRef::create_unique(texpr_node).release());
break;
}
case doris::TExprNodeType::COLUMN_REF: {
case TExprNodeType::COLUMN_REF: {
*expr = pool->add(VColumnRef::create_unique(texpr_node).release());
break;
}
case doris::TExprNodeType::COMPOUND_PRED: {
case TExprNodeType::COMPOUND_PRED: {
*expr = pool->add(VcompoundPred::create_unique(texpr_node).release());
break;
}
case doris::TExprNodeType::LAMBDA_FUNCTION_EXPR: {
case TExprNodeType::LAMBDA_FUNCTION_EXPR: {
*expr = pool->add(VLambdaFunctionExpr::create_unique(texpr_node).release());
break;
}
case doris::TExprNodeType::LAMBDA_FUNCTION_CALL_EXPR: {
case TExprNodeType::LAMBDA_FUNCTION_CALL_EXPR: {
*expr = pool->add(VLambdaFunctionCallExpr::create_unique(texpr_node).release());
break;
}
case doris::TExprNodeType::ARITHMETIC_EXPR:
case doris::TExprNodeType::BINARY_PRED:
case doris::TExprNodeType::FUNCTION_CALL:
case doris::TExprNodeType::COMPUTE_FUNCTION_CALL:
case doris::TExprNodeType::MATCH_PRED: {
case TExprNodeType::ARITHMETIC_EXPR:
case TExprNodeType::BINARY_PRED:
case TExprNodeType::FUNCTION_CALL:
case TExprNodeType::COMPUTE_FUNCTION_CALL:
case TExprNodeType::MATCH_PRED: {
*expr = pool->add(VectorizedFnCall::create_unique(texpr_node).release());
break;
}
case doris::TExprNodeType::CAST_EXPR: {
case TExprNodeType::CAST_EXPR: {
*expr = pool->add(VCastExpr::create_unique(texpr_node).release());
break;
}
case doris::TExprNodeType::IN_PRED: {
case TExprNodeType::IN_PRED: {
*expr = pool->add(VInPredicate::create_unique(texpr_node).release());
break;
}
case doris::TExprNodeType::CASE_EXPR: {
case TExprNodeType::CASE_EXPR: {
if (!texpr_node.__isset.case_expr) {
return Status::InternalError("Case expression not set in thrift node");
}
Expand All @@ -211,7 +212,7 @@ Status VExpr::create_expr(ObjectPool* pool, const doris::TExprNode& texpr_node,
default:
return Status::InternalError("Unknown expr node type: {}", texpr_node.node_type);
}
} catch (const doris::Exception& e) {
} catch (const Exception& e) {
return Status::Error(e.code(), e.to_string());
}
if (!(*expr)->data_type()) {
Expand All @@ -220,9 +221,8 @@ Status VExpr::create_expr(ObjectPool* pool, const doris::TExprNode& texpr_node,
return Status::OK();
}

Status VExpr::create_tree_from_thrift(doris::ObjectPool* pool,
const std::vector<doris::TExprNode>& nodes, int* node_idx,
VExpr** root_expr, VExprContext** ctx) {
Status VExpr::create_tree_from_thrift(ObjectPool* pool, const std::vector<TExprNode>& nodes,
int* node_idx, VExpr** root_expr, VExprContext** ctx) {
// propagate error case
if (*node_idx >= nodes.size()) {
return Status::InternalError("Failed to reconstruct expression tree from thrift.");
Expand All @@ -237,9 +237,9 @@ Status VExpr::create_tree_from_thrift(doris::ObjectPool* pool,
DCHECK(root_expr != nullptr);
DCHECK(ctx != nullptr);
*root_expr = root;
*ctx = pool->add(VExprContext::create_unique(root).release());
// short path for leaf node
if (root_children <= 0) {
*ctx = pool->add(VExprContext::create_unique(root).release());
return Status::OK();
}

Expand All @@ -266,11 +266,11 @@ Status VExpr::create_tree_from_thrift(doris::ObjectPool* pool,
s.push({expr, num_children});
}
}
*ctx = pool->add(VExprContext::create_unique(root).release());
return Status::OK();
}

Status VExpr::create_expr_tree(doris::ObjectPool* pool, const doris::TExpr& texpr,
VExprContext** ctx) {
Status VExpr::create_expr_tree(ObjectPool* pool, const TExpr& texpr, VExprContext** ctx) {
if (texpr.nodes.size() == 0) {
*ctx = nullptr;
return Status::OK();
Expand All @@ -291,7 +291,7 @@ Status VExpr::create_expr_tree(doris::ObjectPool* pool, const doris::TExpr& texp
return status;
}

Status VExpr::create_expr_trees(ObjectPool* pool, const std::vector<doris::TExpr>& texprs,
Status VExpr::create_expr_trees(ObjectPool* pool, const std::vector<TExpr>& texprs,
std::vector<VExprContext*>* ctxs) {
ctxs->clear();
for (int i = 0; i < texprs.size(); ++i) {
Expand Down Expand Up @@ -407,7 +407,7 @@ Status VExpr::get_const_col(VExprContext* context,
return Status::OK();
}

void VExpr::register_function_context(doris::RuntimeState* state, VExprContext* context) {
void VExpr::register_function_context(RuntimeState* state, VExprContext* context) {
std::vector<TypeDescriptor> arg_types;
for (int i = 0; i < _children.size(); ++i) {
arg_types.push_back(_children[i]->type());
Expand Down Expand Up @@ -448,4 +448,11 @@ void VExpr::close_function_context(VExprContext* context, FunctionContext::Funct
}
}

Status VExpr::check_constant(const Block& block, ColumnNumbers arguments) const {
if (is_constant() && !VectorizedUtils::all_arguments_are_constant(block, arguments)) {
return Status::InternalError("const check failed, expr={}", debug_string());
}
return Status::OK();
}

} // namespace doris::vectorized
11 changes: 6 additions & 5 deletions be/src/vec/exprs/vexpr.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,8 @@ class VExpr {

static Status create_expr(ObjectPool* pool, const TExprNode& texpr_node, VExpr** expr);

static Status create_tree_from_thrift(doris::ObjectPool* pool,
const std::vector<doris::TExprNode>& nodes, int* node_idx,
VExpr** root_expr, VExprContext** ctx);
static Status create_tree_from_thrift(ObjectPool* pool, const std::vector<TExprNode>& nodes,
int* node_idx, VExpr** root_expr, VExprContext** ctx);
virtual const std::vector<VExpr*>& children() const { return _children; }
void set_children(std::vector<VExpr*> children) { _children = children; }
virtual std::string debug_string() const;
Expand All @@ -178,7 +177,7 @@ class VExpr {
int fn_context_index() const { return _fn_context_index; }

static const VExpr* expr_without_cast(const VExpr* expr) {
if (expr->node_type() == doris::TExprNodeType::CAST_EXPR) {
if (expr->node_type() == TExprNodeType::CAST_EXPR) {
return expr_without_cast(expr->_children[0]);
}
return expr;
Expand Down Expand Up @@ -211,9 +210,11 @@ class VExpr {
return out.str();
}

Status check_constant(const Block& block, ColumnNumbers arguments) const;

/// Helper function that calls ctx->register(), sets fn_context_index_, and returns the
/// registered FunctionContext
void register_function_context(doris::RuntimeState* state, VExprContext* context);
void register_function_context(RuntimeState* state, VExprContext* context);

/// Helper function to initialize function context, called in `open` phase of VExpr:
/// 1. Set constant columns result of function arguments.
Expand Down
2 changes: 0 additions & 2 deletions be/src/vec/functions/array/function_array_binary.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ class FunctionArrayBinary : public IFunction {
bool is_variadic() const override { return false; }
size_t get_number_of_arguments() const override { return 2; }

bool use_default_implementation_for_constants() const override { return true; }

DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
DCHECK(is_array(arguments[0])) << arguments[0]->get_name();
DCHECK(is_array(arguments[1])) << arguments[1]->get_name();
Expand Down
2 changes: 0 additions & 2 deletions be/src/vec/functions/array/function_array_count.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ class FunctionArrayCount : public IFunction {

bool use_default_implementation_for_nulls() const override { return false; }

bool use_default_implementation_for_constants() const override { return true; }

ColumnNumbers get_arguments_that_are_always_constant() const override { return {1}; }

DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
Expand Down
2 changes: 0 additions & 2 deletions be/src/vec/functions/array/function_array_range.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ class FunctionArrayRange : public IFunction {

bool use_default_implementation_for_nulls() const override { return false; }

bool use_default_implementation_for_constants() const override { return true; }

size_t get_number_of_arguments() const override {
return get_variadic_argument_types_impl().size();
}
Expand Down
2 changes: 0 additions & 2 deletions be/src/vec/functions/array/function_array_zip.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@ class FunctionArrayZip : public IFunction {

bool is_variadic() const override { return true; }

bool use_default_implementation_for_constants() const override { return true; }

size_t get_number_of_arguments() const override { return 0; }

DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
Expand Down
1 change: 0 additions & 1 deletion be/src/vec/functions/comparison_equal_for_null.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ class FunctionEqForNull : public IFunction {
}

bool use_default_implementation_for_nulls() const override { return false; }
bool use_default_implementation_for_constants() const override { return true; }

Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
size_t result, size_t input_rows_count) override {
Expand Down
Loading