Skip to content

Commit

Permalink
[FLINK-17014][runtime] Implement PipelinedRegionSchedulingStrategy
Browse files Browse the repository at this point in the history
  • Loading branch information
zhuzhurk committed Apr 21, 2020
1 parent 67835fe commit 99cbaa9
Show file tree
Hide file tree
Showing 4 changed files with 338 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.scheduler.strategy;

import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.scheduler.DeploymentOption;
import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
import org.apache.flink.runtime.scheduler.SchedulerOperations;
import org.apache.flink.util.IterableUtils;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;

/**
 * A {@link SchedulingStrategy} that works at the granularity of pipelined regions: a region is
 * handed to the {@link SchedulerOperations} as a whole, and only once every result partition it
 * consumes is in the {@code CONSUMABLE} state.
 */
public class PipelinedRegionSchedulingStrategy implements SchedulingStrategy {

	private final SchedulerOperations schedulerOperations;

	private final SchedulingTopology schedulingTopology;

	/** The single deployment option (created with {@code false}) applied to every scheduled vertex. */
	private final DeploymentOption deploymentOption = new DeploymentOption(false);

	/** Consumed result partitions grouped by result id; partitions sharing a result id are correlated. */
	private final Map<IntermediateDataSetID, Set<SchedulingResultPartition>> correlatedResultPartitions = new HashMap<>();

	/** Maps the id of each consumed result partition to the pipelined regions consuming it. */
	private final Map<IntermediateResultPartitionID, Set<SchedulingPipelinedRegion>> partitionConsumerRegions = new HashMap<>();

	/**
	 * Creates the strategy and indexes the consumed partitions of all regions in the topology.
	 *
	 * @param schedulerOperations used to allocate slots and deploy vertices, must not be null
	 * @param schedulingTopology the topology whose pipelined regions get scheduled, must not be null
	 */
	public PipelinedRegionSchedulingStrategy(
			final SchedulerOperations schedulerOperations,
			final SchedulingTopology schedulingTopology) {

		this.schedulerOperations = checkNotNull(schedulerOperations);
		this.schedulingTopology = checkNotNull(schedulingTopology);

		init();
	}

	/**
	 * Fills {@link #partitionConsumerRegions} and {@link #correlatedResultPartitions} from the
	 * partitions consumed by each pipelined region. Every such partition must be BLOCKING.
	 */
	private void init() {
		for (SchedulingPipelinedRegion consumerRegion : schedulingTopology.getAllPipelinedRegions()) {
			for (SchedulingResultPartition consumedPartition : consumerRegion.getConsumedResults()) {
				checkState(consumedPartition.getResultType() == ResultPartitionType.BLOCKING);

				partitionConsumerRegions
					.computeIfAbsent(consumedPartition.getId(), ignored -> new HashSet<>())
					.add(consumerRegion);
				correlatedResultPartitions
					.computeIfAbsent(consumedPartition.getResultId(), ignored -> new HashSet<>())
					.add(consumedPartition);
			}
		}
	}

	/**
	 * Starts scheduling with the regions that consume no results at all.
	 */
	@Override
	public void startScheduling() {
		final Set<SchedulingPipelinedRegion> regionsWithoutInputs = new HashSet<>();
		for (SchedulingPipelinedRegion candidate : schedulingTopology.getAllPipelinedRegions()) {
			if (!candidate.getConsumedResults().iterator().hasNext()) {
				regionsWithoutInputs.add(candidate);
			}
		}
		maybeScheduleRegions(regionsWithoutInputs);
	}

	/**
	 * Resolves the given vertices to their pipelined regions and tries to schedule those regions.
	 *
	 * @param verticesToRestart ids of the vertices to restart
	 */
	@Override
	public void restartTasks(final Set<ExecutionVertexID> verticesToRestart) {
		final Set<SchedulingPipelinedRegion> affectedRegions = new HashSet<>();
		for (ExecutionVertexID vertexId : verticesToRestart) {
			affectedRegions.add(schedulingTopology.getPipelinedRegionOfVertex(vertexId));
		}
		maybeScheduleRegions(affectedRegions);
	}

	/**
	 * Reacts to a vertex reaching FINISHED: for each of its produced partitions that is consumed by
	 * some region and is CONSUMABLE, all correlated partitions are collected and their consumer
	 * regions are considered for scheduling. Any other state is ignored.
	 *
	 * @param executionVertexId id of the vertex whose state changed
	 * @param executionState the new state of the vertex
	 */
	@Override
	public void onExecutionStateChange(final ExecutionVertexID executionVertexId, final ExecutionState executionState) {
		if (executionState != ExecutionState.FINISHED) {
			return;
		}

		final Set<SchedulingResultPartition> finishedPartitions = new HashSet<>();
		for (SchedulingResultPartition produced : schedulingTopology.getVertex(executionVertexId).getProducedResults()) {
			final boolean consumedBySomeRegion = partitionConsumerRegions.containsKey(produced.getId());
			if (consumedBySomeRegion && produced.getState() == ResultPartitionState.CONSUMABLE) {
				finishedPartitions.addAll(correlatedResultPartitions.get(produced.getResultId()));
			}
		}

		final Set<SchedulingPipelinedRegion> consumerRegions = finishedPartitions.stream()
			.map(SchedulingResultPartition::getId)
			.map(partitionConsumerRegions::get)
			.flatMap(Set::stream)
			.collect(Collectors.toSet());
		maybeScheduleRegions(consumerRegions);
	}

	/**
	 * Does nothing; this strategy takes no action on this notification.
	 *
	 * @param resultPartitionId id of the partition that became consumable
	 */
	@Override
	public void onPartitionConsumable(final IntermediateResultPartitionID resultPartitionId) {
	}

	/**
	 * Sorts the given regions topologically and tries to schedule each of them in that order.
	 */
	private void maybeScheduleRegions(final Set<SchedulingPipelinedRegion> regions) {
		final List<SchedulingPipelinedRegion> orderedRegions =
			SchedulingStrategyUtils.sortPipelinedRegionsInTopologicalOrder(schedulingTopology, regions);
		orderedRegions.forEach(this::maybeScheduleRegion);
	}

	/**
	 * Schedules all vertices of the region if all of its inputs are consumable; otherwise returns
	 * without doing anything. A region that is about to be scheduled must have all of its vertices
	 * in the CREATED state.
	 */
	private void maybeScheduleRegion(final SchedulingPipelinedRegion region) {
		if (!areRegionInputsAllConsumable(region)) {
			return;
		}

		checkState(areRegionVerticesAllInCreatedState(region), "BUG: trying to schedule a region which is not in CREATED state");

		final Set<ExecutionVertexID> regionVertexIds = new HashSet<>();
		for (SchedulingExecutionVertex vertex : region.getVertices()) {
			regionVertexIds.add(vertex.getId());
		}

		final List<ExecutionVertexDeploymentOption> deploymentOptions =
			SchedulingStrategyUtils.createExecutionVertexDeploymentOptionsInTopologicalOrder(
				schedulingTopology,
				regionVertexIds,
				vertexId -> deploymentOption);
		schedulerOperations.allocateSlotsAndDeploy(deploymentOptions);
	}

	/** Returns true iff every result partition consumed by the region is CONSUMABLE. */
	private boolean areRegionInputsAllConsumable(final SchedulingPipelinedRegion region) {
		return IterableUtils.toStream(region.getConsumedResults())
			.map(SchedulingResultPartition::getState)
			.allMatch(state -> state == ResultPartitionState.CONSUMABLE);
	}

	/** Returns true iff no vertex of the region is in a state other than CREATED. */
	private boolean areRegionVerticesAllInCreatedState(final SchedulingPipelinedRegion region) {
		return IterableUtils.toStream(region.getVertices())
			.noneMatch(vertex -> vertex.getState() != ExecutionState.CREATED);
	}

	/**
	 * {@link SchedulingStrategyFactory} that produces {@link PipelinedRegionSchedulingStrategy} instances.
	 */
	public static class Factory implements SchedulingStrategyFactory {
		@Override
		public SchedulingStrategy createInstance(
				final SchedulerOperations schedulerOperations,
				final SchedulingTopology schedulingTopology) {
			return new PipelinedRegionSchedulingStrategy(schedulerOperations, schedulingTopology);
		}
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,16 @@ static List<ExecutionVertexDeploymentOption> createExecutionVertexDeploymentOpti
deploymentOptionRetriever.apply(executionVertexID)))
.collect(Collectors.toList());
}

/**
 * Returns the given regions ordered by the position at which their vertices first appear in
 * {@code topology.getVertices()}. Each region occurs at most once in the result; regions that
 * are not contained in {@code regions} are left out.
 *
 * @param topology the topology whose vertex iteration order defines the ordering
 * @param regions the regions to sort
 * @return the regions from {@code regions} that own at least one vertex of the topology, in order
 */
static List<SchedulingPipelinedRegion> sortPipelinedRegionsInTopologicalOrder(
		final SchedulingTopology topology,
		final Set<SchedulingPipelinedRegion> regions) {

	return IterableUtils.toStream(topology.getVertices())
		.map(vertex -> topology.getPipelinedRegionOfVertex(vertex.getId()))
		.filter(region -> regions.contains(region))
		.distinct()
		.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.scheduler.strategy;

import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
import org.apache.flink.util.TestLogger;

import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;

/**
* Unit tests for {@link PipelinedRegionSchedulingStrategy}.
*/
public class PipelinedRegionSchedulingStrategyTest extends TestLogger {

private TestingSchedulerOperations testingSchedulerOperation;

private static final int PARALLELISM = 2;

private TestingSchedulingTopology testingSchedulingTopology;

private List<TestingSchedulingExecutionVertex> source;

private List<TestingSchedulingExecutionVertex> map;

private List<TestingSchedulingExecutionVertex> sink;

@Before
public void setUp() {
testingSchedulerOperation = new TestingSchedulerOperations();

buildTopology();
}

private void buildTopology() {
testingSchedulingTopology = new TestingSchedulingTopology();

source = testingSchedulingTopology.addExecutionVertices().withParallelism(PARALLELISM).finish();
map = testingSchedulingTopology.addExecutionVertices().withParallelism(PARALLELISM).finish();
sink = testingSchedulingTopology.addExecutionVertices().withParallelism(PARALLELISM).finish();

testingSchedulingTopology.connectPointwise(source, map)
.withResultPartitionState(ResultPartitionState.CREATED)
.withResultPartitionType(ResultPartitionType.PIPELINED_BOUNDED)
.finish();
testingSchedulingTopology.connectAllToAll(map, sink)
.withResultPartitionState(ResultPartitionState.CREATED)
.withResultPartitionType(ResultPartitionType.BLOCKING)
.finish();
}

@Test
public void testStartScheduling() {
startScheduling(testingSchedulingTopology);

final List<List<TestingSchedulingExecutionVertex>> expectedScheduledVertices = new ArrayList<>();
expectedScheduledVertices.add(Arrays.asList(source.get(0), map.get(0)));
expectedScheduledVertices.add(Arrays.asList(source.get(1), map.get(1)));
assertLatestScheduledVerticesAreEqualTo(expectedScheduledVertices);
}

@Test
public void testRestartTasks() {
final PipelinedRegionSchedulingStrategy schedulingStrategy = startScheduling(testingSchedulingTopology);

final Set<ExecutionVertexID> verticesToRestart = Stream.of(source, map, sink)
.flatMap(List::stream)
.map(TestingSchedulingExecutionVertex::getId)
.collect(Collectors.toSet());

schedulingStrategy.restartTasks(verticesToRestart);

final List<List<TestingSchedulingExecutionVertex>> expectedScheduledVertices = new ArrayList<>();
expectedScheduledVertices.add(Arrays.asList(source.get(0), map.get(0)));
expectedScheduledVertices.add(Arrays.asList(source.get(1), map.get(1)));
assertLatestScheduledVerticesAreEqualTo(expectedScheduledVertices);
}

@Test
public void testNotifyingBlockingResultPartitionProducerFinished() {
final PipelinedRegionSchedulingStrategy schedulingStrategy = startScheduling(testingSchedulingTopology);

final TestingSchedulingExecutionVertex map1 = map.get(0);
map1.getProducedResults().iterator().next().setState(ResultPartitionState.CONSUMABLE);
schedulingStrategy.onExecutionStateChange(map1.getId(), ExecutionState.FINISHED);

// sinks' inputs are not all consumable yet so they are not scheduled
assertThat(testingSchedulerOperation.getScheduledVertices(), hasSize(2));

final TestingSchedulingExecutionVertex map2 = map.get(1);
map2.getProducedResults().iterator().next().setState(ResultPartitionState.CONSUMABLE);
schedulingStrategy.onExecutionStateChange(map2.getId(), ExecutionState.FINISHED);

assertThat(testingSchedulerOperation.getScheduledVertices(), hasSize(4));

final List<List<TestingSchedulingExecutionVertex>> expectedScheduledVertices = new ArrayList<>();
expectedScheduledVertices.add(Arrays.asList(sink.get(0)));
expectedScheduledVertices.add(Arrays.asList(sink.get(1)));
assertLatestScheduledVerticesAreEqualTo(expectedScheduledVertices);
}

private PipelinedRegionSchedulingStrategy startScheduling(TestingSchedulingTopology testingSchedulingTopology) {
final PipelinedRegionSchedulingStrategy schedulingStrategy = new PipelinedRegionSchedulingStrategy(
testingSchedulerOperation,
testingSchedulingTopology);
schedulingStrategy.startScheduling();
return schedulingStrategy;
}

private void assertLatestScheduledVerticesAreEqualTo(final List<List<TestingSchedulingExecutionVertex>> expected) {
final List<List<ExecutionVertexDeploymentOption>> deploymentOptions = testingSchedulerOperation.getScheduledVertices();
final int expectedScheduledBulks = expected.size();
assertThat(expectedScheduledBulks, lessThanOrEqualTo(deploymentOptions.size()));
for (int i = 0; i < expectedScheduledBulks; i++) {
assertEquals(
idsFromVertices(expected.get(expectedScheduledBulks - i - 1)),
idsFromDeploymentOptions(deploymentOptions.get(deploymentOptions.size() - i - 1)));
}
}

private static List<ExecutionVertexID> idsFromVertices(final List<TestingSchedulingExecutionVertex> vertices) {
return vertices.stream().map(TestingSchedulingExecutionVertex::getId).collect(Collectors.toList());
}

private static List<ExecutionVertexID> idsFromDeploymentOptions(
final List<ExecutionVertexDeploymentOption> deploymentOptions) {

return deploymentOptions.stream().map(ExecutionVertexDeploymentOption::getExecutionVertexId).collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ void setProducer(TestingSchedulingExecutionVertex producer) {
this.producer = checkNotNull(producer);
}

/**
 * Replaces the state held by this result partition.
 *
 * @param state the new state
 */
void setState(final ResultPartitionState state) {
	this.state = state;
}

/**
* Builder for {@link TestingSchedulingResultPartition}.
*/
Expand Down

0 comments on commit 99cbaa9

Please sign in to comment.