diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java index eb53954cfad11..efbb423fbf3fd 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java @@ -1887,10 +1887,7 @@ public Operator visitIdentitySink(IdentitySinkNode node, LocalExecutionPlanConte IdentitySinkOperator.class.getSimpleName()); context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); - List children = - node.getChildren().stream() - .map(child -> child.accept(this, context)) - .collect(Collectors.toList()); + List children = dealWithConsumeChildrenOneByOneNode(node, context); checkArgument( MPP_DATA_EXCHANGE_MANAGER != null, "MPP_DATA_EXCHANGE_MANAGER should not be null"); @@ -1922,10 +1919,9 @@ public Operator visitShuffleSink(ShuffleSinkNode node, LocalExecutionPlanContext ShuffleHelperOperator.class.getSimpleName()); context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); - List children = - node.getChildren().stream() - .map(child -> child.accept(this, context)) - .collect(Collectors.toList()); + // TODO implement pipeline division for shuffle sink + context.setDegreeOfParallelism(1); + List children = dealWithConsumeAllChildrenPipelineBreaker(node, context); checkArgument( MPP_DATA_EXCHANGE_MANAGER != null, "MPP_DATA_EXCHANGE_MANAGER should not be null"); @@ -2552,7 +2548,10 @@ public List dealWithConsumeChildrenOneByOneNode( subContext.setISink(localSinkChannel); subContext.addPipelineDriverFactory(childOperation, subContext.getDriverContext()); - int curChildPipelineNum = subContext.getPipelineNumber() - originPipeNum; + // OneByOneChild may be divided into more than dop pipelines, but the number of running + // actually is dop + int curChildPipelineNum = + Math.min(dopForChild, subContext.getPipelineNumber() - originPipeNum); childPipelineNums.add(curChildPipelineNum); sumOfChildPipelines += curChildPipelineNum; // If sumOfChildPipelines > dopForChild, we have to wait until some pipelines finish