Skip to content

Commit

Permalink
[FLINK-17014][runtime] TestingSchedulingTopology implements pipelined…
Browse files Browse the repository at this point in the history
… region getter interfaces
  • Loading branch information
zhuzhurk committed Apr 21, 2020
1 parent bda9b77 commit 67835fe
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.scheduler.strategy;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
* A simple implementation of {@link SchedulingPipelinedRegion} for testing.
*/
public class TestingSchedulingPipelinedRegion implements SchedulingPipelinedRegion {

private final Map<ExecutionVertexID, TestingSchedulingExecutionVertex> regionVertices = new HashMap<>();

private final Set<TestingSchedulingResultPartition> consumedPartitions = new HashSet<>();

public TestingSchedulingPipelinedRegion(final Set<TestingSchedulingExecutionVertex> vertices) {
for (TestingSchedulingExecutionVertex vertex : vertices) {
regionVertices.put(vertex.getId(), vertex);

for (TestingSchedulingResultPartition consumedPartition : vertex.getConsumedResults()) {
if (!vertices.contains(consumedPartition.getProducer())) {
consumedPartitions.add(consumedPartition);
}
}
}
}

@Override
public Iterable<TestingSchedulingExecutionVertex> getVertices() {
return Collections.unmodifiableCollection(regionVertices.values());
}

@Override
public TestingSchedulingExecutionVertex getVertex(ExecutionVertexID vertexId) {
return regionVertices.get(vertexId);
}

@Override
public Iterable<TestingSchedulingResultPartition> getConsumedResults() {
return Collections.unmodifiableSet(consumedPartitions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.scheduler.strategy;

import org.apache.flink.api.common.InputDependencyConstraint;
import org.apache.flink.runtime.executiongraph.failover.flip1.PipelinedRegionComputeUtil;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
Expand All @@ -28,9 +29,12 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.flink.util.Preconditions.checkState;

Expand All @@ -44,6 +48,8 @@ public class TestingSchedulingTopology implements SchedulingTopology {

private final Map<IntermediateResultPartitionID, TestingSchedulingResultPartition> schedulingResultPartitions = new HashMap<>();

private Map<ExecutionVertexID, TestingSchedulingPipelinedRegion> vertexRegions;

private boolean containsCoLocationConstraints;

@Override
Expand Down Expand Up @@ -78,11 +84,50 @@ public TestingSchedulingResultPartition getResultPartition(final IntermediateRes
return resultPartition;
}

@Override
public Iterable<SchedulingPipelinedRegion> getAllPipelinedRegions() {
return new HashSet<>(getVertexRegions().values());
}

@Override
public SchedulingPipelinedRegion getPipelinedRegionOfVertex(ExecutionVertexID vertexId) {
return getVertexRegions().get(vertexId);
}

private Map<ExecutionVertexID, TestingSchedulingPipelinedRegion> getVertexRegions() {
if (vertexRegions == null) {
generatePipelinedRegions();
}
return vertexRegions;
}

private void generatePipelinedRegions() {
vertexRegions = new HashMap<>();

final Set<Set<SchedulingExecutionVertex>> rawRegions =
PipelinedRegionComputeUtil.computePipelinedRegions(this);

for (Set<SchedulingExecutionVertex> rawRegion : rawRegions) {
final Set<TestingSchedulingExecutionVertex> vertices = rawRegion.stream()
.map(vertex -> schedulingExecutionVertices.get(vertex.getId()))
.collect(Collectors.toSet());
final TestingSchedulingPipelinedRegion region = new TestingSchedulingPipelinedRegion(vertices);
for (TestingSchedulingExecutionVertex vertex : vertices) {
vertexRegions.put(vertex.getId(), region);
}
}
}

private void resetPipelinedRegions() {
vertexRegions = null;
}

void addSchedulingExecutionVertex(TestingSchedulingExecutionVertex schedulingExecutionVertex) {
checkState(!schedulingExecutionVertices.containsKey(schedulingExecutionVertex.getId()));

schedulingExecutionVertices.put(schedulingExecutionVertex.getId(), schedulingExecutionVertex);
updateVertexResultPartitions(schedulingExecutionVertex);
resetPipelinedRegions();
}

private void updateVertexResultPartitions(final TestingSchedulingExecutionVertex schedulingExecutionVertex) {
Expand Down Expand Up @@ -130,6 +175,8 @@ public TestingSchedulingTopology connect(
updateVertexResultPartitions(producer);
updateVertexResultPartitions(consumer);

resetPipelinedRegions();

return this;
}

Expand Down

0 comments on commit 67835fe

Please sign in to comment.