diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 5da7faeabd364f..be54b7c4999840 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -733,6 +733,9 @@ class OperatorXBase : public OperatorBase { void set_parallel_tasks(int parallel_tasks) { _parallel_tasks = parallel_tasks; } int parallel_tasks() const { return _parallel_tasks; } + // To keep compatibility with older FE + void set_serial_operator() { _is_serial_operator = true; } + protected: template friend class PipelineXLocalState; diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index a74f0818cb383c..4e05a39d77cd62 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -1188,6 +1188,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo std::stringstream error_msg; bool enable_query_cache = request.fragment.__isset.query_cache_param; + bool fe_with_old_version = false; switch (tnode.node_type) { case TPlanNodeType::OLAP_SCAN_NODE: { op.reset(new OlapScanOperatorX( @@ -1195,6 +1196,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo enable_query_cache ? request.fragment.query_cache_param : TQueryCacheParam {})); RETURN_IF_ERROR(cur_pipe->add_operator( op, request.__isset.parallel_instances ? request.parallel_instances : 0)); + fe_with_old_version = !tnode.__isset.is_serial_operator; break; } case TPlanNodeType::GROUP_COMMIT_SCAN_NODE: { @@ -1205,6 +1207,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo op.reset(new GroupCommitOperatorX(pool, tnode, next_operator_id(), descs, _num_instances)); RETURN_IF_ERROR(cur_pipe->add_operator( op, request.__isset.parallel_instances ? request.parallel_instances : 0)); + fe_with_old_version = !tnode.__isset.is_serial_operator; break; } case doris::TPlanNodeType::JDBC_SCAN_NODE: { @@ -1217,12 +1220,14 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo "Jdbc scan node is disabled, you can change be config enable_java_support " "to true and restart be."); } + fe_with_old_version = !tnode.__isset.is_serial_operator; break; } case doris::TPlanNodeType::FILE_SCAN_NODE: { op.reset(new FileScanOperatorX(pool, tnode, next_operator_id(), descs, _num_instances)); RETURN_IF_ERROR(cur_pipe->add_operator( op, request.__isset.parallel_instances ? request.parallel_instances : 0)); + fe_with_old_version = !tnode.__isset.is_serial_operator; break; } case TPlanNodeType::ES_SCAN_NODE: @@ -1230,6 +1235,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo op.reset(new EsScanOperatorX(pool, tnode, next_operator_id(), descs, _num_instances)); RETURN_IF_ERROR(cur_pipe->add_operator( op, request.__isset.parallel_instances ? request.parallel_instances : 0)); + fe_with_old_version = !tnode.__isset.is_serial_operator; break; } case TPlanNodeType::EXCHANGE_NODE: { @@ -1238,6 +1244,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo op.reset(new ExchangeSourceOperatorX(pool, tnode, next_operator_id(), descs, num_senders)); RETURN_IF_ERROR(cur_pipe->add_operator( op, request.__isset.parallel_instances ? request.parallel_instances : 0)); + fe_with_old_version = !tnode.__isset.is_serial_operator; break; } case TPlanNodeType::AGGREGATION_NODE: { @@ -1599,6 +1606,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo op.reset(new DataGenSourceOperatorX(pool, tnode, next_operator_id(), descs)); RETURN_IF_ERROR(cur_pipe->add_operator( op, request.__isset.parallel_instances ? request.parallel_instances : 0)); + fe_with_old_version = !tnode.__isset.is_serial_operator; break; } case TPlanNodeType::SCHEMA_SCAN_NODE: { @@ -1623,6 +1631,10 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo return Status::InternalError("Unsupported exec type in pipeline: {}", print_plan_node_type(tnode.node_type)); } + if (request.__isset.parallel_instances && fe_with_old_version) { + cur_pipe->set_num_tasks(request.parallel_instances); + op->set_serial_operator(); + } return Status::OK(); }