Skip to content

Commit

Permalink
[FLINK-19693] ITCases for Approximate Local Recovery
Browse files Browse the repository at this point in the history
  • Loading branch information
curcur committed Nov 6, 2020
1 parent a774003 commit efd6909
Showing 1 changed file with 285 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,285 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.test.checkpointing;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.util.TestLogger;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;

import java.util.Arrays;
import java.util.Collections;

import static org.apache.flink.test.util.TestUtils.tryExecute;

/**
 * Integration tests for approximate downstream failover.
 *
 * <p>If a task fails, the failed task and all of its downstream tasks restart.
 */

public class ApproximateLocalRecoveryDownstreamITCase extends TestLogger {
/** Memory segment size configured for the mini cluster; also passed to the long-record source. */
private static final int BUFFER_SIZE = 4 * 1024;

/** Mini cluster with four task managers offering one slot each, configured by {@code createConfig()}. */
@Rule
public MiniClusterWithClientResource cluster =
    new MiniClusterWithClientResource(
        new MiniClusterResourceConfiguration.Builder()
            .setConfiguration(createConfig())
            .setNumberTaskManagers(4)
            .setNumberSlotsPerTaskManager(1)
            .build());

/** Per-test timeout of five minutes, expressed in milliseconds. */
@Rule
public final Timeout timeout = Timeout.millis(5 * 60 * 1000L);

/**
 * Runs a linear three-task pipeline in which the map task fails once.
 *
 * <pre>
 *     (source1/1) -----> (map1/1) -----> (sink1/1)
 * </pre>
 *
 * <p>(map1/1) fails; (map1/1) and (sink1/1) restart.
 */
@Test
public void localTaskFailureRecoveryThreeTasks() throws Exception {
    // Re-arm the shared failure flag so the mapper throws its artificial exception in this run.
    FailingMapper.failedBefore = false;

    final int elementsBeforeFailure = 150;

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.setBufferTimeout(0);
    env.setMaxParallelism(128);
    env.disableOperatorChaining();
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
    env.getCheckpointConfig().enableApproximateLocalRecovery(true);

    // Source, map and sink are each placed in their own slot sharing group.
    env.addSource(new AppSourceFunction())
        .slotSharingGroup("source")
        .map(new FailingMapper<>(elementsBeforeFailure))
        .slotSharingGroup("map")
        .addSink(new ValidatingAtMostOnceSink(300))
        .slotSharingGroup("sink");

    tryExecute(env, "testThreeTasks");
}

/**
 * Runs a keyed pipeline with two parallel map tasks in which the first map subtask fails once.
 *
 * <pre>
 *     (source1/1) -----> (map1/2) -----> (sink1/1)
 *          |                                ^
 *          -------------> (map2/2) ---------|
 * </pre>
 *
 * <p>(map1/2) fails; (map1/2) and (sink1/1) restart.
 */
@Test
public void localTaskFailureRecoveryTwoMapTasks() throws Exception {
    // Re-arm the shared failure flag so the mapper throws its artificial exception in this run.
    FailingMapper.failedBefore = false;

    final int elementsBeforeFailure = 20;
    final int mapParallelism = 2;

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.setBufferTimeout(0);
    env.disableOperatorChaining();
    env.setMaxParallelism(128);
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
    env.getCheckpointConfig().enableApproximateLocalRecovery(true);

    // The source emits long records and alternates between keys, one key per map subtask;
    // tuple field 1 is the key the stream is partitioned on.
    env.addSource(new AppSourceFunction(BUFFER_SIZE, env.getMaxParallelism(), mapParallelism))
        .slotSharingGroup("source")
        .keyBy(1)
        .map(new FailingMapper<>(elementsBeforeFailure))
        .setParallelism(mapParallelism)
        .slotSharingGroup("map")
        .addSink(new ValidatingAtMostOnceSink(200, mapParallelism))
        .slotSharingGroup("sink");

    tryExecute(env, "testTwoMapTasks");
}

/**
 * Source that emits an endless sequence of records until it is cancelled.
 *
 * <p>Schema: (index, key, assignedChannel, payload string). The index starts at 0 and grows by
 * one per record; the key cycles through one pre-computed key per downstream channel, and
 * assignedChannel is the parallel operator index that key is assigned to.
 */
private static class AppSourceFunction
        extends RichParallelSourceFunction<Tuple4<Integer, Integer, Integer, String>> {
    private static final long serialVersionUID = 1L;

    private static final String SHORT_STRING =
        "I am a very long string to test partial records hohoho hahaha ";

    /** Exclusive upper bound of the key candidates tried when looking for a key per channel. */
    private static final int KEY_SEARCH_LIMIT = 1000;

    private final String longOrShortString;
    private final int maxParallelism;
    private final int numberOfChannels;
    private final int[] keys;
    private int index = 0;
    private volatile boolean running = true;

    /** Creates a source that emits the short payload to a single channel (max parallelism 128). */
    AppSourceFunction() {
        this.longOrShortString = SHORT_STRING;
        this.maxParallelism = 128;
        this.numberOfChannels = 1;
        this.keys = initKeys(numberOfChannels);
    }

    /**
     * Creates a source that emits a long payload of more than {@code 2 * bufferSize} characters.
     *
     * @param bufferSize buffer size the payload length is derived from
     * @param maxParallelism max parallelism used to compute the key-to-channel assignment
     * @param numberOfChannels number of downstream channels the keys are spread over
     */
    AppSourceFunction(int bufferSize, int maxParallelism, int numberOfChannels) {
        // Both fields must be set before initKeys(), which reads them through assignedIndex().
        this.maxParallelism = maxParallelism;
        this.numberOfChannels = numberOfChannels;
        this.keys = initKeys(numberOfChannels);

        StringBuilder builder = new StringBuilder(SHORT_STRING);
        for (int i = 0; i <= 2 * bufferSize / SHORT_STRING.length() + 1; i++) {
            builder.append(SHORT_STRING);
        }
        this.longOrShortString = builder.toString();
    }

    /**
     * Emits records until {@link #cancel()} is called, pausing 50 ms before every 100th record.
     *
     * @param ctx context used to emit records and to obtain the checkpoint lock
     * @throws Exception if the pause is interrupted or emitting a record fails
     */
    @Override
    public void run(SourceContext<Tuple4<Integer, Integer, Integer, String>> ctx) throws Exception {
        while (running) {
            // Throttle outside the checkpoint lock so the lock is never held while sleeping.
            if (index % 100 == 0) {
                Thread.sleep(50);
            }
            synchronized (ctx.getCheckpointLock()) {
                int key = keys[index % numberOfChannels];
                ctx.collect(new Tuple4<>(index, key, assignedIndex(key), longOrShortString));
                // Advance the position under the same lock that guards the emission.
                index++;
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    /**
     * Finds, for every channel, the smallest non-negative key that is assigned to that channel.
     *
     * @param numberOfChannels number of channels to find a key for
     * @return array whose i-th entry is a key assigned to channel i
     * @throws IllegalStateException if no key below the search limit maps to some channel
     */
    private int[] initKeys(int numberOfChannels) {
        int[] keys = new int[numberOfChannels];

        for (int i = 0; i < numberOfChannels; i++) {
            int key = 0;
            while (key < KEY_SEARCH_LIMIT && assignedIndex(key) != i) {
                key++;
            }
            // Explicit check instead of `assert`, which is skipped when assertions are disabled
            // and would let an unassigned key slip through.
            if (key >= KEY_SEARCH_LIMIT) {
                throw new IllegalStateException(
                    "Can not find a key within number " + KEY_SEARCH_LIMIT + " for channel " + i);
            }
            keys[i] = key;
        }

        return keys;
    }

    /** Returns the parallel operator index that {@code key} is assigned to. */
    private int assignedIndex(int key) {
        return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels);
    }
}

/**
 * Identity mapper whose subtask 0 throws one artificial exception after it has seen a given
 * number of elements. Until the failure has happened, every subtask sleeps 10 ms per element.
 */
private static class FailingMapper<T> extends RichMapFunction<T, T> {
    private static final long serialVersionUID = 6334389850158703L;

    /** Shared flag: {@code true} once the artificial failure was thrown; reset by the tests. */
    public static volatile boolean failedBefore;

    private final int failAfter;
    private int seenElements;

    private boolean failingSubtask;

    public FailingMapper(int failCount) {
        this.failAfter = failCount;
    }

    @Override
    public void open(Configuration parameters) {
        // Only the first parallel subtask is allowed to fail.
        final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
        failingSubtask = (subtaskIndex == 0);
    }

    @Override
    public T map(T value) throws Exception {
        seenElements++;

        // After the failure has happened once, simply forward every element.
        if (failedBefore) {
            return value;
        }

        Thread.sleep(10);

        final boolean thresholdReached = seenElements >= failAfter;
        if (failingSubtask && thresholdReached) {
            failedBefore = true;
            throw new Exception("Artificial Test Failure");
        }

        return value;
    }
}

/**
 * Sink that counts the records received per input channel (tuple field {@code f2}) and ends the
 * job with a {@link SuccessException} once every channel has delivered at least
 * {@code numElementsTotal} records.
 *
 * <p>Before signalling success it checks that the highest record index noted for any channel is
 * at least {@code numElementsTotal * numberOfInputChannels}. The checks throw
 * {@link AssertionError} explicitly instead of using the {@code assert} keyword, so they are
 * also enforced when the JVM runs with assertions disabled.
 */
private static class ValidatingAtMostOnceSink
        extends RichSinkFunction<Tuple4<Integer, Integer, Integer, String>> {
    private static final long serialVersionUID = 1748426382527469932L;
    private final int numElementsTotal;
    private final int[] numElements;
    private final Integer[] indexReachingNumElements;
    private final int numberOfInputChannels;

    /**
     * @param numElementsTotal number of records every channel has to deliver
     * @param numberOfInputChannels number of distinct channel indices expected in field {@code f2}
     */
    public ValidatingAtMostOnceSink(int numElementsTotal, int numberOfInputChannels) {
        this.numElementsTotal = numElementsTotal;
        this.numberOfInputChannels = numberOfInputChannels;
        this.numElements = new int[numberOfInputChannels];
        this.indexReachingNumElements = new Integer[numberOfInputChannels];
    }

    /**
     * Creates a sink that expects a single input channel.
     *
     * @param numElementsTotal number of records the channel has to deliver
     */
    public ValidatingAtMostOnceSink(int numElementsTotal) {
        this(numElementsTotal, 1);
    }

    /**
     * Counts the record for its channel and finishes the job once all channels reached the target.
     *
     * @param value record of the form (index, key, assignedChannel, payload)
     * @throws SuccessException when every channel has delivered enough records and the index
     *     check passed
     * @throws AssertionError if the channel index is out of range or the index check fails
     */
    @Override
    public void invoke(Tuple4<Integer, Integer, Integer, String> value) throws Exception {
        final int channel = value.f2;
        if (channel < 0 || channel >= numberOfInputChannels) {
            throw new AssertionError(
                "Record " + value.f0 + " carries channel " + channel
                    + ", expected a channel in [0, " + numberOfInputChannels + ")");
        }
        numElements[channel]++;

        boolean allReachNumElementsTotal = true;
        for (int i = 0; i < numberOfInputChannels; i++) {
            if (numElements[i] == numElementsTotal) {
                // Every channel whose count currently equals the target is stamped with the
                // index of the record being processed, not only the channel of this record.
                indexReachingNumElements[i] = value.f0;
            } else if (numElements[i] < numElementsTotal) {
                allReachNumElementsTotal = false;
            }
        }

        if (allReachNumElementsTotal) {
            // Counts grow by one per record, so each channel passed through the equality branch
            // above and has a non-null entry here.
            final int maxIndex = Collections.max(Arrays.asList(indexReachingNumElements));
            final int minExpectedIndex = numElementsTotal * numberOfInputChannels;
            if (maxIndex < minExpectedIndex) {
                throw new AssertionError(
                    "Highest record index " + maxIndex + " is below the expected minimum "
                        + minExpectedIndex);
            }
            throw new SuccessException();
        }
    }
}

/**
 * Builds the mini cluster configuration: the "legacy" scheduling strategy and a memory segment
 * size of {@code BUFFER_SIZE}.
 */
private static Configuration createConfig() {
    final Configuration configuration = new Configuration();
    configuration.set(JobManagerOptions.SCHEDULING_STRATEGY, "legacy");
    configuration.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, new MemorySize(BUFFER_SIZE));
    return configuration;
}
}

0 comments on commit efd6909

Please sign in to comment.