Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions be/src/pipeline/exec/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,9 @@ class OperatorXBase : public OperatorBase {
void set_parallel_tasks(int parallel_tasks) { _parallel_tasks = parallel_tasks; }
int parallel_tasks() const { return _parallel_tasks; }

// To keep compatibility with older FE
void set_serial_operator() { _is_serial_operator = true; }

protected:
template <typename Dependency>
friend class PipelineXLocalState;
Expand Down
12 changes: 12 additions & 0 deletions be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1188,13 +1188,15 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
std::stringstream error_msg;
bool enable_query_cache = request.fragment.__isset.query_cache_param;

bool fe_with_old_version = false;
switch (tnode.node_type) {
case TPlanNodeType::OLAP_SCAN_NODE: {
op.reset(new OlapScanOperatorX(
pool, tnode, next_operator_id(), descs, _num_instances,
enable_query_cache ? request.fragment.query_cache_param : TQueryCacheParam {}));
RETURN_IF_ERROR(cur_pipe->add_operator(
op, request.__isset.parallel_instances ? request.parallel_instances : 0));
fe_with_old_version = !tnode.__isset.is_serial_operator;
break;
}
case TPlanNodeType::GROUP_COMMIT_SCAN_NODE: {
Expand All @@ -1205,6 +1207,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
op.reset(new GroupCommitOperatorX(pool, tnode, next_operator_id(), descs, _num_instances));
RETURN_IF_ERROR(cur_pipe->add_operator(
op, request.__isset.parallel_instances ? request.parallel_instances : 0));
fe_with_old_version = !tnode.__isset.is_serial_operator;
break;
}
case doris::TPlanNodeType::JDBC_SCAN_NODE: {
Expand All @@ -1217,19 +1220,22 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
"Jdbc scan node is disabled, you can change be config enable_java_support "
"to true and restart be.");
}
fe_with_old_version = !tnode.__isset.is_serial_operator;
break;
}
case doris::TPlanNodeType::FILE_SCAN_NODE: {
op.reset(new FileScanOperatorX(pool, tnode, next_operator_id(), descs, _num_instances));
RETURN_IF_ERROR(cur_pipe->add_operator(
op, request.__isset.parallel_instances ? request.parallel_instances : 0));
fe_with_old_version = !tnode.__isset.is_serial_operator;
break;
}
case TPlanNodeType::ES_SCAN_NODE:
case TPlanNodeType::ES_HTTP_SCAN_NODE: {
op.reset(new EsScanOperatorX(pool, tnode, next_operator_id(), descs, _num_instances));
RETURN_IF_ERROR(cur_pipe->add_operator(
op, request.__isset.parallel_instances ? request.parallel_instances : 0));
fe_with_old_version = !tnode.__isset.is_serial_operator;
break;
}
case TPlanNodeType::EXCHANGE_NODE: {
Expand All @@ -1238,6 +1244,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
op.reset(new ExchangeSourceOperatorX(pool, tnode, next_operator_id(), descs, num_senders));
RETURN_IF_ERROR(cur_pipe->add_operator(
op, request.__isset.parallel_instances ? request.parallel_instances : 0));
fe_with_old_version = !tnode.__isset.is_serial_operator;
break;
}
case TPlanNodeType::AGGREGATION_NODE: {
Expand Down Expand Up @@ -1599,6 +1606,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
op.reset(new DataGenSourceOperatorX(pool, tnode, next_operator_id(), descs));
RETURN_IF_ERROR(cur_pipe->add_operator(
op, request.__isset.parallel_instances ? request.parallel_instances : 0));
fe_with_old_version = !tnode.__isset.is_serial_operator;
break;
}
case TPlanNodeType::SCHEMA_SCAN_NODE: {
Expand All @@ -1623,6 +1631,10 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
return Status::InternalError("Unsupported exec type in pipeline: {}",
print_plan_node_type(tnode.node_type));
}
if (request.__isset.parallel_instances && fe_with_old_version) {
cur_pipe->set_num_tasks(request.parallel_instances);
op->set_serial_operator();
}

return Status::OK();
}
Expand Down