Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions be/src/vec/exec/file_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ Status FileScanNode::_acquire_and_build_runtime_filter(RuntimeState* state) {
}
IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtimefilter;
std::vector<VExpr*> vexprs;
runtime_filter->get_prepared_vexprs(&vexprs, row_desc());
runtime_filter->get_prepared_vexprs(&vexprs, _row_descriptor);
if (vexprs.empty()) {
continue;
}
Expand All @@ -181,7 +181,7 @@ Status FileScanNode::_acquire_and_build_runtime_filter(RuntimeState* state) {
last_expr = new_node;
}
auto new_vconjunct_ctx_ptr = _pool->add(new VExprContext(last_expr));
auto expr_status = new_vconjunct_ctx_ptr->prepare(state, row_desc());
auto expr_status = new_vconjunct_ctx_ptr->prepare(state, _row_descriptor);
if (UNLIKELY(!expr_status.OK())) {
LOG(WARNING) << "Something wrong for runtime filters: " << expr_status;
vexprs.clear();
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/exec/scan/vscan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ Status VScanNode::_append_rf_into_conjuncts(std::vector<VExpr*>& vexprs) {
if (_vconjunct_ctx_ptr) {
(*_vconjunct_ctx_ptr)->clone_fn_contexts(new_vconjunct_ctx_ptr);
}
RETURN_IF_ERROR(new_vconjunct_ctx_ptr->prepare(_state, row_desc()));
RETURN_IF_ERROR(new_vconjunct_ctx_ptr->prepare(_state, _row_descriptor));
RETURN_IF_ERROR(new_vconjunct_ctx_ptr->open(_state));
if (_vconjunct_ctx_ptr) {
(*(_vconjunct_ctx_ptr.get()))->mark_as_stale();
Expand Down Expand Up @@ -879,7 +879,7 @@ Status VScanNode::try_append_late_arrival_runtime_filter(int* arrived_rf_num) {
++current_arrived_rf_num;
continue;
} else if (_runtime_filter_ctxs[i].runtime_filter->is_ready()) {
_runtime_filter_ctxs[i].runtime_filter->get_prepared_vexprs(&vexprs, row_desc());
_runtime_filter_ctxs[i].runtime_filter->get_prepared_vexprs(&vexprs, _row_descriptor);
++current_arrived_rf_num;
_runtime_filter_ctxs[i].apply_mark = true;
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/exec/vaggregation_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ Status AggregationNode::_get_without_key_result(RuntimeState* state, Block* bloc
DCHECK(_agg_data.without_key != nullptr);
block->clear();

*block = VectorizedUtils::create_empty_columnswithtypename(row_desc());
*block = VectorizedUtils::create_empty_columnswithtypename(_row_descriptor);
int agg_size = _aggregate_evaluators.size();

MutableColumns columns(agg_size);
Expand Down Expand Up @@ -1002,7 +1002,7 @@ Status AggregationNode::_execute_with_serialized_key(Block* block) {
Status AggregationNode::_get_with_serialized_key_result(RuntimeState* state, Block* block,
bool* eos) {
bool mem_reuse = block->mem_reuse();
auto column_withschema = VectorizedUtils::create_columns_with_type_and_name(row_desc());
auto column_withschema = VectorizedUtils::create_columns_with_type_and_name(_row_descriptor);
int key_size = _probe_expr_ctxs.size();

MutableColumns key_columns;
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/vanalytic_eval_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ Status VAnalyticEvalNode::prepare(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(ExecNode::prepare(state));
SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
DCHECK(child(0)->row_desc().is_prefix_of(row_desc()));
DCHECK(child(0)->row_desc().is_prefix_of(_row_descriptor));
_mem_pool.reset(new MemPool(mem_tracker()));
_evaluation_timer = ADD_TIMER(runtime_profile(), "EvaluationTime");
SCOPED_TIMER(_evaluation_timer);
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/exec/volap_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) {
DCHECK(runtime_filter != nullptr);
bool ready = runtime_filter->is_ready();
if (ready) {
runtime_filter->get_prepared_vexprs(&vexprs, row_desc());
runtime_filter->get_prepared_vexprs(&vexprs, _row_descriptor);
scanner_filter_apply_marks[i] = true;
if (!_runtime_filter_ready_flag[i] && !vexprs.empty()) {
std::lock_guard<std::shared_mutex> l(_rf_lock);
Expand Down Expand Up @@ -1829,7 +1829,7 @@ Status VOlapScanNode::_append_rf_into_conjuncts(RuntimeState* state, std::vector
if (_vconjunct_ctx_ptr) {
(*_vconjunct_ctx_ptr)->clone_fn_contexts(new_vconjunct_ctx_ptr);
}
RETURN_IF_ERROR(new_vconjunct_ctx_ptr->prepare(state, row_desc()));
RETURN_IF_ERROR(new_vconjunct_ctx_ptr->prepare(state, _row_descriptor));
RETURN_IF_ERROR(new_vconjunct_ctx_ptr->open(state));
if (_vconjunct_ctx_ptr) {
(*(_vconjunct_ctx_ptr.get()))->mark_as_stale();
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/vtable_function_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ Status VTableFunctionNode::prepare(RuntimeState* state) {
RETURN_IF_ERROR(VExpr::prepare(_vfn_ctxs, state, _row_descriptor));

// get current all output slots
for (const auto& tuple_desc : this->row_desc().tuple_descriptors()) {
for (const auto& tuple_desc : this->_row_descriptor.tuple_descriptors()) {
for (const auto& slot_desc : tuple_desc->slots()) {
_output_slots.push_back(slot_desc);
}
Expand Down
10 changes: 5 additions & 5 deletions be/src/vec/exec/vunion_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ Status VUnionNode::prepare(RuntimeState* state) {
ADD_TIMER(_runtime_profile, "MaterializeExprsEvaluateTimer");
// Prepare const expr lists.
for (const std::vector<VExprContext*>& exprs : _const_expr_lists) {
RETURN_IF_ERROR(VExpr::prepare(exprs, state, row_desc()));
RETURN_IF_ERROR(VExpr::prepare(exprs, state, _row_descriptor));
}

// Prepare result expr lists.
Expand Down Expand Up @@ -128,8 +128,8 @@ Status VUnionNode::get_next_materialized(RuntimeState* state, Block* block) {
bool mem_reuse = block->mem_reuse();
MutableBlock mblock =
mem_reuse ? MutableBlock::build_mutable_block(block)
: MutableBlock(Block(
VectorizedUtils::create_columns_with_type_and_name(row_desc())));
: MutableBlock(Block(VectorizedUtils::create_columns_with_type_and_name(
_row_descriptor)));

Block child_block;
while (has_more_materialized() && mblock.rows() <= state->batch_size()) {
Expand Down Expand Up @@ -184,8 +184,8 @@ Status VUnionNode::get_next_const(RuntimeState* state, Block* block) {
bool mem_reuse = block->mem_reuse();
MutableBlock mblock =
mem_reuse ? MutableBlock::build_mutable_block(block)
: MutableBlock(Block(
VectorizedUtils::create_columns_with_type_and_name(row_desc())));
: MutableBlock(Block(VectorizedUtils::create_columns_with_type_and_name(
_row_descriptor)));
for (; _const_expr_list_idx < _const_expr_lists.size(); ++_const_expr_list_idx) {
Block tmp_block;
tmp_block.insert({vectorized::ColumnUInt8::create(1),
Expand Down