From d4b072e09992ec4f2d4b00df136ea84b7c7ae9e4 Mon Sep 17 00:00:00 2001 From: Alima777 Date: Tue, 28 Feb 2023 21:14:15 +0800 Subject: [PATCH 1/3] Fix identitySink and shuffleSink problem --- .../mpp/plan/planner/OperatorTreeGenerator.java | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java index eb53954cfad11..d5a3abdb98b18 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java @@ -1887,10 +1887,7 @@ public Operator visitIdentitySink(IdentitySinkNode node, LocalExecutionPlanConte IdentitySinkOperator.class.getSimpleName()); context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); - List children = - node.getChildren().stream() - .map(child -> child.accept(this, context)) - .collect(Collectors.toList()); + List children = dealWithConsumeChildrenOneByOneNode(node, context); checkArgument( MPP_DATA_EXCHANGE_MANAGER != null, "MPP_DATA_EXCHANGE_MANAGER should not be null"); @@ -1922,10 +1919,7 @@ public Operator visitShuffleSink(ShuffleSinkNode node, LocalExecutionPlanContext ShuffleHelperOperator.class.getSimpleName()); context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); - List children = - node.getChildren().stream() - .map(child -> child.accept(this, context)) - .collect(Collectors.toList()); + List children = dealWithConsumeAllChildrenPipelineBreaker(node, context); checkArgument( MPP_DATA_EXCHANGE_MANAGER != null, "MPP_DATA_EXCHANGE_MANAGER should not be null"); @@ -2552,7 +2546,10 @@ public List dealWithConsumeChildrenOneByOneNode( subContext.setISink(localSinkChannel); subContext.addPipelineDriverFactory(childOperation, subContext.getDriverContext()); - int curChildPipelineNum = subContext.getPipelineNumber() - originPipeNum; + // OneByOneChild may be divided into more than dop pipelines, but the number of running + // is dop actually + int curChildPipelineNum = + Math.min(dopForChild, subContext.getPipelineNumber() - originPipeNum); childPipelineNums.add(curChildPipelineNum); sumOfChildPipelines += curChildPipelineNum; // If sumOfChildPipelines > dopForChild, we have to wait until some pipelines finish From 6c36939915afa49fa3d1257d5f07c5695f9e382f Mon Sep 17 00:00:00 2001 From: Alima777 Date: Tue, 28 Feb 2023 21:14:36 +0800 Subject: [PATCH 2/3] Fix onebyOne with oneByOne child --- .../apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java index d5a3abdb98b18..af268073c3b34 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java @@ -2547,7 +2547,7 @@ public List dealWithConsumeChildrenOneByOneNode( subContext.addPipelineDriverFactory(childOperation, subContext.getDriverContext()); // OneByOneChild may be divided into more than dop pipelines, but the number of running - // is dop actually + // actually is dop int curChildPipelineNum = Math.min(dopForChild, subContext.getPipelineNumber() - originPipeNum); childPipelineNums.add(curChildPipelineNum); From d1833021af9f3191ab6beb699c51bc93ce348ffa Mon Sep 17 00:00:00 2001 From: Alima777 Date: Wed, 1 Mar 2023 10:10:31 +0800 Subject: [PATCH 3/3] set dop 1 for shuffle sink --- .../apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java index af268073c3b34..efbb423fbf3fd 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java @@ -1919,6 +1919,8 @@ public Operator visitShuffleSink(ShuffleSinkNode node, LocalExecutionPlanContext ShuffleHelperOperator.class.getSimpleName()); context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); + // TODO implement pipeline division for shuffle sink + context.setDegreeOfParallelism(1); List children = dealWithConsumeAllChildrenPipelineBreaker(node, context); checkArgument(