From e19dc30e8891b756dbdf528f62ac8c77f3a18182 Mon Sep 17 00:00:00 2001 From: Thesharing Date: Thu, 25 Feb 2021 13:18:37 +0800 Subject: [PATCH] [fix-up] Optimize the topology building in ExecutionGraph --- .../executiongraph/EdgeManagerBuildUtil.java | 46 ++++++------------- .../runtime/executiongraph/Execution.java | 4 +- .../executiongraph/ExecutionGraph.java | 2 +- .../executiongraph/ExecutionJobVertex.java | 2 +- .../runtime/jobgraph/DistributionPattern.java | 4 +- 5 files changed, 20 insertions(+), 38 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtil.java index 98a4b90f1f0fb4..13b85629bd5b25 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtil.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtil.java @@ -33,17 +33,17 @@ public class EdgeManagerBuildUtil { public static void connectVertexToResult( - ExecutionVertex[] taskVertices, + ExecutionJobVertex vertex, IntermediateResult ires, int inputNumber, DistributionPattern distributionPattern) { switch (distributionPattern) { case POINTWISE: - connectPointwise(taskVertices, ires, inputNumber); + connectPointwise(vertex.getTaskVertices(), ires, inputNumber); break; case ALL_TO_ALL: - connectAllToAll(taskVertices, ires, inputNumber); + connectAllToAll(vertex.getTaskVertices(), ires, inputNumber); break; default: throw new RuntimeException("Unrecognized distribution pattern."); @@ -122,35 +122,17 @@ private static void connectPointwise( ConsumedPartitionGroup consumerPartitionGroup = new ConsumedPartitionGroup(partition.getPartitionId()); - List consumers = new ArrayList<>(targetCount / sourceCount + 1); - - if (targetCount % sourceCount == 0) { - int factor = targetCount / sourceCount; - int start = partitionNum * factor; - for (int i = 0; i < factor; i++) { - 
ExecutionVertex executionVertex = taskVertices[start + i]; - executionVertex.addConsumedPartitions(consumerPartitionGroup, inputNumber); - - consumers.add(executionVertex.getID()); - } - } else { - float factor = ((float) targetCount) / sourceCount; - int mirrorPartitionNumber = sourceCount - 1 - partitionNum; - int start = (int) (mirrorPartitionNumber * factor); - int end = - (mirrorPartitionNumber == sourceCount - 1) - ? targetCount - : (int) ((mirrorPartitionNumber + 1) * factor); - - for (int i = 0; i < end - start; i++) { - int mirrorVertexSubTaskIndex = start + i; - int vertexSubtaskIndex = targetCount - 1 - mirrorVertexSubTaskIndex; - - ExecutionVertex executionVertex = taskVertices[vertexSubtaskIndex]; - executionVertex.addConsumedPartitions(consumerPartitionGroup, inputNumber); - - consumers.add(executionVertex.getID()); - } + float factor = ((float) targetCount) / sourceCount; + int start = (int) (Math.ceil(partitionNum * factor)); + int end = (int) (Math.ceil((partitionNum + 1) * factor)); + + List consumers = new ArrayList<>(end - start); + + for (int i = start; i < end; i++) { + ExecutionVertex executionVertex = taskVertices[i]; + executionVertex.addConsumedPartitions(consumerPartitionGroup, inputNumber); + + consumers.add(executionVertex.getID()); } ConsumerVertexGroup consumerVertexGroup = new ConsumerVertexGroup(consumers); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 2a6fb4487586de..593296a57543aa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -439,7 +439,7 @@ public CompletableFuture registerProducedPartitions( PartitionDescriptor partitionDescriptor = PartitionDescriptor.from(partition); int maxParallelism = getPartitionMaxParallelism( - partition, 
vertex.getExecutionGraph()::getVertexOrThrow); + partition, vertex.getExecutionGraph()::getExecutionVertexOrThrow); CompletableFuture shuffleDescriptorFuture = vertex.getExecutionGraph() .getShuffleMaster() @@ -726,7 +726,7 @@ private void updatePartitionConsumers(final IntermediateResultPartition partitio for (ExecutionVertexID consumerVertexId : allConsumers.get(0).getVertices()) { final ExecutionVertex consumerVertex = - vertex.getExecutionGraph().getVertexOrThrow(consumerVertexId); + vertex.getExecutionGraph().getExecutionVertexOrThrow(consumerVertexId); final Execution consumer = consumerVertex.getCurrentExecutionAttempt(); final ExecutionState consumerState = consumer.getState(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 2f56a2254c9a71..ed2a5f841ad130 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -692,7 +692,7 @@ public void registerResultPartition( resultPartitionsById.put(id, partition); } - public ExecutionVertex getVertexOrThrow(ExecutionVertexID id) { + public ExecutionVertex getExecutionVertexOrThrow(ExecutionVertexID id) { return checkNotNull(executionVerticesById.get(id)); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java index fe47da62ee0f65..1d5a0cec80af4e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java @@ -455,7 +455,7 @@ public void connectToPredecessors( this.inputs.add(ires); - connectVertexToResult(taskVertices, ires, num, 
edge.getDistributionPattern()); + connectVertexToResult(this, ires, num, edge.getDistributionPattern()); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/DistributionPattern.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/DistributionPattern.java index 9668d458c9b247..e4a4796f4c6c1f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/DistributionPattern.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/DistributionPattern.java @@ -26,8 +26,8 @@ * A distribution pattern determines, which sub tasks of a producing task are connected to which * consuming sub tasks. * - *

<p>It affects how {@link ExecutionVertex} and {@link IntermediateResultPartition} is connected in - * {@link EdgeManagerBuildUtil} + *

<p>It affects how {@link ExecutionVertex} and {@link IntermediateResultPartition} are connected + * in {@link EdgeManagerBuildUtil} */ public enum DistributionPattern {