Skip to content

Commit

Permalink
[FLINK-13056][runtime] Introduce FastRestartPipelinedRegionStrategy
Browse files Browse the repository at this point in the history
This strategy offers better failover handling performance than RestartPipelinedRegionStrategy.
The trade-off is slower region building and more memory used for the region boundary cache.
  • Loading branch information
zhuzhurk committed Oct 29, 2019
1 parent 7f21a0c commit 176de01
Show file tree
Hide file tree
Showing 4 changed files with 211 additions and 28 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.executiongraph.failover.flip1;

import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

/**
 * A failover strategy that reaches the same restart decision as {@link RestartPipelinedRegionStrategy}
 * while answering failover queries from region boundaries that are computed once at construction.
 * The price is a slower region building phase and additional memory for the boundary caches.
 */
public class FastRestartPipelinedRegionStrategy extends RestartPipelinedRegionStrategy {

	/** For each failover region, the result partitions it consumes from producers outside the region. */
	private final IdentityHashMap<FailoverRegion, Collection<IntermediateResultPartitionID>> regionInputs =
		new IdentityHashMap<>();

	/** For each failover region, the regions that contain consumers of its result partitions. */
	private final IdentityHashMap<FailoverRegion, Collection<FailoverRegion>> regionConsumers =
		new IdentityHashMap<>();

	/** Producer region of each result partition that is consumed across a region boundary. */
	private final Map<IntermediateResultPartitionID, FailoverRegion> resultPartitionProducerRegion =
		new HashMap<>();

	/**
	 * Builds the strategy for the given topology and caches the inputs and consumers of every region.
	 *
	 * @param topology containing info about all the vertices and edges
	 * @param resultPartitionAvailabilityChecker helps to query result partition availability
	 */
	public FastRestartPipelinedRegionStrategy(
			final FailoverTopology<?, ?> topology,
			final ResultPartitionAvailabilityChecker resultPartitionAvailabilityChecker) {

		super(topology, resultPartitionAvailabilityChecker);
		buildRegionInputsAndOutputs();
	}

	/**
	 * Walks every region once and records its externally produced inputs and its consumer regions.
	 */
	private void buildRegionInputsAndOutputs() {
		for (FailoverRegion region : regions) {
			final Set<? extends FailoverVertex<?, ?>> members = region.getAllExecutionVertices();

			final Set<IntermediateResultPartitionID> externalInputs = new HashSet<>();
			final Set<ExecutionVertexID> outsideConsumers = new HashSet<>();
			for (FailoverVertex<?, ?> member : members) {
				collectExternalInputs(member, members, externalInputs);
				collectExternalConsumers(region, member, members, outsideConsumers);
			}

			// regions are compared by identity, so de-duplicate them with an identity-based set
			final Set<FailoverRegion> downstreamRegions = Collections.newSetFromMap(new IdentityHashMap<>());
			for (ExecutionVertexID consumerId : outsideConsumers) {
				downstreamRegions.add(getFailoverRegion(consumerId));
			}

			regionInputs.put(region, new ArrayList<>(externalInputs));
			regionConsumers.put(region, new ArrayList<>(downstreamRegions));
		}
	}

	/** Adds the ids of all partitions consumed by {@code member} whose producer is not in {@code members}. */
	private static void collectExternalInputs(
			final FailoverVertex<?, ?> member,
			final Set<? extends FailoverVertex<?, ?>> members,
			final Set<IntermediateResultPartitionID> externalInputs) {

		for (FailoverResultPartition<?, ?> input : member.getConsumedResults()) {
			if (members.contains(input.getProducer())) {
				continue;
			}
			externalInputs.add(input.getId());
		}
	}

	/**
	 * Adds the ids of all consumers of {@code member}'s results that are not in {@code members}, and
	 * registers {@code region} as the producer region of each result consumed that way.
	 */
	private void collectExternalConsumers(
			final FailoverRegion region,
			final FailoverVertex<?, ?> member,
			final Set<? extends FailoverVertex<?, ?>> members,
			final Set<ExecutionVertexID> outsideConsumers) {

		for (FailoverResultPartition<?, ?> output : member.getProducedResults()) {
			for (FailoverVertex<?, ?> consumer : output.getConsumers()) {
				if (members.contains(consumer)) {
					continue;
				}
				outsideConsumers.add(consumer.getId());
				resultPartitionProducerRegion.put(output.getId(), region);
			}
		}
	}

	/** Marks {@code candidate} as visited and enqueues it, unless it has been visited before. */
	private static void enqueueIfUnvisited(
			final FailoverRegion candidate,
			final Queue<FailoverRegion> regionsToVisit,
			final Set<FailoverRegion> visitedRegions) {

		if (visitedRegions.contains(candidate)) {
			return;
		}
		visitedRegions.add(candidate);
		regionsToVisit.add(candidate);
	}

	@Override
	protected void determineProducerRegionsToVisit(
			final FailoverRegion currentRegion,
			final Queue<FailoverRegion> regionsToVisit,
			final Set<FailoverRegion> visitedRegions) {

		// only the cached cross-region inputs are checked
		for (IntermediateResultPartitionID inputId : regionInputs.get(currentRegion)) {
			if (isResultPartitionAvailable(inputId)) {
				continue;
			}
			enqueueIfUnvisited(resultPartitionProducerRegion.get(inputId), regionsToVisit, visitedRegions);
		}
	}

	@Override
	protected void determineConsumerRegionsToVisit(
			final FailoverRegion currentRegion,
			final Queue<FailoverRegion> regionsToVisit,
			final Set<FailoverRegion> visitedRegions) {

		for (FailoverRegion downstream : regionConsumers.get(currentRegion)) {
			enqueueIfUnvisited(downstream, regionsToVisit, visitedRegions);
		}
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class RestartPipelinedRegionStrategy implements FailoverStrategy {
private final FailoverTopology<?, ?> topology;

/** All failover regions. */
private final Set<FailoverRegion> regions;
protected final Set<FailoverRegion> regions;

/** Maps execution vertex id to failover region. */
private final Map<ExecutionVertexID, FailoverRegion> vertexToRegionMap;
Expand Down Expand Up @@ -90,6 +90,7 @@ public RestartPipelinedRegionStrategy(
LOG.info("Start building failover regions.");
buildFailoverRegions();
}

// ------------------------------------------------------------------------
// region building
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -172,48 +173,68 @@ public Set<ExecutionVertexID> getTasksNeedingRestart(ExecutionVertexID execution
* the region containing the partition producer task is involved
* 3. If a region is involved, all of its consumer regions are involved
*/
private Set<FailoverRegion> getRegionsToRestart(FailoverRegion failedRegion) {
Set<FailoverRegion> regionsToRestart = Collections.newSetFromMap(new IdentityHashMap<>());
Set<FailoverRegion> visitedRegions = Collections.newSetFromMap(new IdentityHashMap<>());
private Set<FailoverRegion> getRegionsToRestart(final FailoverRegion failedRegion) {
final Set<FailoverRegion> regionsToRestart = Collections.newSetFromMap(new IdentityHashMap<>());
final Set<FailoverRegion> visitedRegions = Collections.newSetFromMap(new IdentityHashMap<>());

// start from the failed region to visit all involved regions
Queue<FailoverRegion> regionsToVisit = new ArrayDeque<>();
final Queue<FailoverRegion> regionsToVisit = new ArrayDeque<>();
visitedRegions.add(failedRegion);
regionsToVisit.add(failedRegion);
while (!regionsToVisit.isEmpty()) {
FailoverRegion regionToRestart = regionsToVisit.poll();
final FailoverRegion regionToRestart = regionsToVisit.poll();

// an involved region should be restarted
regionsToRestart.add(regionToRestart);

// if a needed input result partition is not available, its producer region is involved
for (FailoverVertex<?, ?> vertex : regionToRestart.getAllExecutionVertices()) {
for (FailoverResultPartition<?, ?> consumedPartition : vertex.getConsumedResults()) {
if (!resultPartitionAvailabilityChecker.isAvailable(consumedPartition.getId())) {
FailoverRegion producerRegion = vertexToRegionMap.get(consumedPartition.getProducer().getId());
if (!visitedRegions.contains(producerRegion)) {
visitedRegions.add(producerRegion);
regionsToVisit.add(producerRegion);
}
determineProducerRegionsToVisit(regionToRestart, regionsToVisit, visitedRegions);

// all consumer regions of an involved region should be involved
determineConsumerRegionsToVisit(regionToRestart, regionsToVisit, visitedRegions);
}

return regionsToRestart;
}

/**
 * Enqueues the producer region of every unavailable result partition consumed by a vertex of the
 * given region. A region already contained in {@code visitedRegions} is not enqueued again.
 *
 * @param currentRegion the region whose consumed result partitions are inspected
 * @param regionsToVisit queue that receives newly discovered regions
 * @param visitedRegions regions discovered so far; extended together with the queue
 */
protected void determineProducerRegionsToVisit(
		final FailoverRegion currentRegion,
		final Queue<FailoverRegion> regionsToVisit,
		final Set<FailoverRegion> visitedRegions) {

	for (FailoverVertex<?, ?> regionVertex : currentRegion.getAllExecutionVertices()) {
		for (FailoverResultPartition<?, ?> input : regionVertex.getConsumedResults()) {
			if (isResultPartitionAvailable(input.getId())) {
				continue;
			}

			final FailoverRegion upstreamRegion = vertexToRegionMap.get(input.getProducer().getId());
			if (visitedRegions.contains(upstreamRegion)) {
				continue;
			}
			visitedRegions.add(upstreamRegion);
			regionsToVisit.add(upstreamRegion);
		}
	}
}

// all consumer regions of an involved region should be involved
for (FailoverVertex<?, ?> vertex : regionToRestart.getAllExecutionVertices()) {
for (FailoverResultPartition<?, ?> producedPartition : vertex.getProducedResults()) {
for (FailoverVertex<?, ?> consumerVertex : producedPartition.getConsumers()) {
FailoverRegion consumerRegion = vertexToRegionMap.get(consumerVertex.getId());
if (!visitedRegions.contains(consumerRegion)) {
visitedRegions.add(consumerRegion);
regionsToVisit.add(consumerRegion);
}
/**
 * Enqueues the region of every vertex that consumes a result partition produced by the given
 * region. A region already contained in {@code visitedRegions} is not enqueued again.
 *
 * @param currentRegion the region whose produced result partitions are inspected
 * @param regionsToVisit queue that receives newly discovered regions
 * @param visitedRegions regions discovered so far; extended together with the queue
 */
protected void determineConsumerRegionsToVisit(
		final FailoverRegion currentRegion,
		final Queue<FailoverRegion> regionsToVisit,
		final Set<FailoverRegion> visitedRegions) {

	for (FailoverVertex<?, ?> regionVertex : currentRegion.getAllExecutionVertices()) {
		for (FailoverResultPartition<?, ?> output : regionVertex.getProducedResults()) {
			for (FailoverVertex<?, ?> consumer : output.getConsumers()) {
				final FailoverRegion downstreamRegion = vertexToRegionMap.get(consumer.getId());
				if (visitedRegions.contains(downstreamRegion)) {
					continue;
				}
				visitedRegions.add(downstreamRegion);
				regionsToVisit.add(downstreamRegion);
			}
		}
	}
}

return regionsToRestart;
/**
 * Asks this strategy's result partition availability checker about the given partition.
 *
 * @param intermediateResultPartitionID id of the result partition to check
 * @return whatever the availability checker reports for the partition
 */
protected boolean isResultPartitionAvailable(final IntermediateResultPartitionID intermediateResultPartitionID) {
	final boolean available = resultPartitionAvailabilityChecker.isAvailable(intermediateResultPartitionID);
	return available;
}

// ------------------------------------------------------------------------
Expand All @@ -233,7 +254,7 @@ public FailoverRegion getFailoverRegion(ExecutionVertexID vertexID) {
/**
* A stateful {@link ResultPartitionAvailabilityChecker} which maintains the failed partitions which are not available.
*/
private static class RegionFailoverResultPartitionAvailabilityChecker implements ResultPartitionAvailabilityChecker {
protected static class RegionFailoverResultPartitionAvailabilityChecker implements ResultPartitionAvailabilityChecker {

/** Result partition state checker from the shuffle master. */
private final ResultPartitionAvailabilityChecker resultPartitionAvailabilityChecker;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.executiongraph.failover.flip1;

/**
 * Runs the failure handling tests of {@link RestartPipelinedRegionStrategyTest} against
 * {@link FastRestartPipelinedRegionStrategy}.
 */
public class FastRestartPipelinedRegionStrategyTest extends RestartPipelinedRegionStrategyTest {

	/**
	 * Creates the strategy under test with a checker that reports every result partition as available.
	 */
	@Override
	protected FastRestartPipelinedRegionStrategy createFailoverStrategy(FailoverTopology topology) {
		final ResultPartitionAvailabilityChecker alwaysAvailable = resultPartitionID -> true;
		return new FastRestartPipelinedRegionStrategy(topology, alwaysAvailable);
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public void testRegionFailoverForRegionInternalErrors() {

TestFailoverTopology topology = topologyBuilder.build();

RestartPipelinedRegionStrategy strategy = new RestartPipelinedRegionStrategy(topology);
RestartPipelinedRegionStrategy strategy = createFailoverStrategy(topology);

// when v1 fails, {v1,v4,v5} should be restarted
HashSet<ExecutionVertexID> expectedResult = new HashSet<>();
Expand Down Expand Up @@ -156,7 +156,7 @@ public void testRegionFailoverForDataConsumptionErrors() {

TestFailoverTopology topology = topologyBuilder.build();

RestartPipelinedRegionStrategy strategy = new RestartPipelinedRegionStrategy(topology);
RestartPipelinedRegionStrategy strategy = createFailoverStrategy(topology);

// when v4 fails to consume data from v1, {v1,v4,v5} should be restarted
HashSet<ExecutionVertexID> expectedResult = new HashSet<>();
Expand Down Expand Up @@ -372,7 +372,7 @@ public void testRegionFailoverForMultipleVerticesRegions() {

TestFailoverTopology topology = topologyBuilder.build();

RestartPipelinedRegionStrategy strategy = new RestartPipelinedRegionStrategy(topology);
RestartPipelinedRegionStrategy strategy = createFailoverStrategy(topology);

// when v3 fails due to internal error, {v3,v4,v5,v6} should be restarted
HashSet<ExecutionVertexID> expectedResult = new HashSet<>();
Expand Down Expand Up @@ -404,6 +404,10 @@ public void testRegionFailoverForMultipleVerticesRegions() {
// utilities
// ------------------------------------------------------------------------

/**
 * Creates the failover strategy under test for the given topology. Protected so that subclasses
 * can run the same tests against a different strategy implementation.
 */
protected RestartPipelinedRegionStrategy createFailoverStrategy(FailoverTopology topology) {
	final RestartPipelinedRegionStrategy strategy = new RestartPipelinedRegionStrategy(topology);
	return strategy;
}

private static class TestResultPartitionAvailabilityChecker implements ResultPartitionAvailabilityChecker {

private final HashSet<IntermediateResultPartitionID> failedPartitions;
Expand Down

0 comments on commit 176de01

Please sign in to comment.