Skip to content

Commit

Permalink
[FLINK-19693][runtime] ITCases for Approximate Local Recovery
Browse files Browse the repository at this point in the history
  • Loading branch information
curcur committed Nov 2, 2020
1 parent f95c893 commit aab0707
Showing 1 changed file with 275 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,275 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.test.checkpointing;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HeartbeatManagerOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.util.SuccessException;

import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;

import java.util.BitSet;

import static org.apache.flink.test.util.TestUtils.tryExecute;

/**
 * Integration tests for approximate local recovery after a map task fails.
 *
 * <p>Every test runs a {@code source -> map -> sink} pipeline with operator chaining disabled and
 * each operator in its own slot sharing group, on a mini cluster of four TaskManagers with one
 * slot each, so no two tasks share a slot.
 *
 * <p>Original author's note: use RemoteInputChannel; only for upstream reconnection.
 */
public class ApproximateLocalRecoveryDownstreamITCase {
    private static final int BUFFER_SIZE = 4096;

    /** Cluster of the currently running test; replaced before every test method. */
    private static MiniClusterWithClientResource cluster;

    /**
     * Starts a fresh mini cluster for the next test method.
     *
     * <p>This runs before every test while the teardown only runs once per class, so a cluster left
     * over from a previous test method is shut down here first instead of being leaked.
     */
    @Before
    public void setup() throws Exception {
        shutDownExistingCluster();

        Configuration config = new Configuration();
        config.setString(JobManagerOptions.SCHEDULING_STRATEGY, "legacy");
        config.setString(HighAvailabilityOptions.HA_MODE, RegionFailoverITCase.TestingHAFactory.class.getName());

        // The memory segment size is pinned so that the long-string source can size its records
        // relative to it (see AppSourceFunction).
        config.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, MemorySize.parse(Integer.toString(BUFFER_SIZE)));
        // NOTE(review): very large timeouts, presumably so that heartbeat/RPC timeouts never
        // interfere with the artificial failure -- confirm.
        config.setLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 1000000L);
        config.setString(AkkaOptions.ASK_TIMEOUT, "1 h");

        cluster = new MiniClusterWithClientResource(
            new MiniClusterResourceConfiguration.Builder()
                .setConfiguration(config)
                .setNumberTaskManagers(4)
                .setNumberSlotsPerTaskManager(1)
                .build());
        cluster.before();
    }

    /** Shuts down the current mini cluster, if any, and clears the reference to it. */
    @AfterClass
    public static void shutDownExistingCluster() {
        if (cluster != null) {
            cluster.after();
            cluster = null;
        }
    }

    /**
     * Test the following topology.
     * <pre>
     *     (source1/1) -----> (map1/1) -----> (sink1/1)
     * </pre>
     * (map1/1) fails, (map1/1) and (sink1/1) restart
     */
    @Test
    public void localTaskFailureRecoveryThreeTasks() throws Exception {
        final int failAfterElements = 150;
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env
            .setParallelism(1)
            .setBufferTimeout(0)
            .setMaxParallelism(128)
            .disableOperatorChaining()
            .setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
        env.getCheckpointConfig().enableApproximateLocalRecovery(true);

        env.addSource(new AppSourceFunction())
            .slotSharingGroup("source")
            .map(new FailingMapper<>(failAfterElements))
            .slotSharingGroup("map")
            .addSink(new ValidatingAtMostOnceSink(300))
            .slotSharingGroup("sink");

        // The flag is static, so it has to be reset explicitly before each job is run.
        FailingMapper.failedBefore = false;
        tryExecute(env, "testThreeTasks");
    }

    /**
     * Test the following topology.
     * <pre>
     *     (source1/1) -----> (map1/2) -----> (sink1/1)
     *          |                                ^
     *          -------------> (map2/2) ---------|
     * </pre>
     * (map1/2) fails, (map1/2) and (sink1/1) restart
     */
    @Test
    public void localTaskFailureRecoveryTwoMapTasks() throws Exception {
        final int failAfterElements = 100;
        final int keyByChannelNumber = 2;
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env
            .setParallelism(1)
            .setBufferTimeout(0)
            .disableOperatorChaining()
            .setMaxParallelism(128)
            .setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
        env.getCheckpointConfig().enableApproximateLocalRecovery(true);

        env.addSource(new AppSourceFunction(BUFFER_SIZE, env.getMaxParallelism(), keyByChannelNumber))
            .slotSharingGroup("source")
            .keyBy(0)
            .map(new FailingMapper<>(failAfterElements))
            .setParallelism(keyByChannelNumber)
            .slotSharingGroup("map")
            .addSink(new ValidatingAtMostOnceSink(200, keyByChannelNumber))
            .slotSharingGroup("sink");

        // The flag is static, so it has to be reset explicitly before each job is run.
        FailingMapper.failedBefore = false;
        tryExecute(env, "testTwoMapTasks");
    }

    /**
     * Endless source emitting records with the schema (key, timestamp, index, long string).
     *
     * <p>The key is a running counter starting at 0, the timestamp increases by one per record, and
     * the third field is the parallel operator index that the key is assigned to for the configured
     * max parallelism and number of channels.
     */
    private static class AppSourceFunction extends RichParallelSourceFunction<Tuple4<Integer, Long, Integer, String>> {
        private static final long serialVersionUID = 1L;

        private static final String SHORT_STRING = "I am a very long string to test partial records hohoho hahaha ";

        private final String longOrShortString;
        private final int maxParallelism;
        private final int numberOfChannels;

        private int index = 0;
        private volatile boolean running = true;

        /** Creates a source with a short payload string that feeds a single channel. */
        AppSourceFunction() {
            this.longOrShortString = SHORT_STRING;
            this.maxParallelism = 128;
            this.numberOfChannels = 1;
        }

        /**
         * Creates a source with a long payload string.
         *
         * @param bufferSize the payload is built to be longer than {@code 2 * bufferSize} characters
         * @param maxParallelism max parallelism used to compute the channel of a key
         * @param numberOfChannels number of parallel downstream tasks the keys are spread over
         */
        AppSourceFunction(int bufferSize, int maxParallelism, int numberOfChannels) {
            this.maxParallelism = maxParallelism;
            this.numberOfChannels = numberOfChannels;

            StringBuilder builder = new StringBuilder(SHORT_STRING);
            for (int i = 0; i <= 2 * bufferSize / SHORT_STRING.length() + 1; i++) {
                builder.append(SHORT_STRING);
            }
            this.longOrShortString = builder.toString();
        }

        @Override
        public void run(SourceContext<Tuple4<Integer, Long, Integer, String>> ctx) throws Exception {
            long timestamp = 1593575900000L;
            while (running) {
                synchronized (ctx.getCheckpointLock()) {
                    // Pause for half a second before every 100th record.
                    if (index % 100 == 0) {
                        Thread.sleep(500);
                    }
                    ctx.collect(new Tuple4<>(index, timestamp++, channelIndex(index), longOrShortString));
                }
                index++;
            }
        }

        @Override
        public void cancel() {
            running = false;
        }

        /** Returns the index of the parallel operator that the given key is assigned to. */
        private int channelIndex(int key) {
            return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels);
        }
    }

    /**
     * Identity mapper whose subtask 0 throws an artificial exception once it has seen
     * {@code failCount} elements, unless {@link #failedBefore} is already set.
     *
     * <p>Until the failure has happened, every subtask sleeps 10 ms per element.
     */
    private static class FailingMapper<T> extends RichMapFunction<T, T> {
        private static final long serialVersionUID = 6334389850158703L;

        /** Set once the artificial failure has been thrown; must be reset before each job. */
        public static volatile boolean failedBefore;

        private final int failCount;
        private int numElementsTotal;

        /** Whether this subtask is the one that triggers the failure (subtask index 0). */
        private boolean failer;

        public FailingMapper(int failCount) {
            this.failCount = failCount;
        }

        @Override
        public void open(Configuration parameters) {
            failer = getRuntimeContext().getIndexOfThisSubtask() == 0;
        }

        @Override
        public T map(T value) throws Exception {
            numElementsTotal++;

            if (!failedBefore) {
                Thread.sleep(10);

                if (failer && numElementsTotal >= failCount) {
                    failedBefore = true;
                    throw new Exception("Artificial Test Failure");
                }
            }

            return value;
        }
    }

    /**
     * Sink that fails on duplicate keys and finishes the job with a {@link SuccessException} once
     * <em>every</em> input channel has delivered at least {@code numElementsTotal} records.
     */
    private static class ValidatingAtMostOnceSink extends RichSinkFunction<Tuple4<Integer, Long, Integer, String>> {

        private static final long serialVersionUID = 1748426382527469932L;
        private final int numElementsTotal;
        /** Keys (field {@code f0}) seen so far by this sink instance. */
        private final BitSet duplicateChecker = new BitSet();
        /** Number of records received per channel (field {@code f2}). */
        private final int[] numElements;
        private final int numberOfInputChannels;

        /**
         * @param numElementsTotal number of records each channel has to deliver before success
         * @param numberOfInputChannels number of distinct channel indices expected in the records
         */
        public ValidatingAtMostOnceSink(int numElementsTotal, int numberOfInputChannels) {
            this.numElementsTotal = numElementsTotal;
            this.numberOfInputChannels = numberOfInputChannels;
            this.numElements = new int[numberOfInputChannels];
        }

        /** Creates a sink that expects a single input channel. */
        public ValidatingAtMostOnceSink(int numElementsTotal) {
            this(numElementsTotal, 1);
        }

        /**
         * Counts the record for its channel and checks that its key was not seen before.
         *
         * @throws SuccessException once all channels have reached the expected record count
         * @throws Exception if the channel index is out of range or the key is a duplicate
         */
        @Override
        public void invoke(Tuple4<Integer, Long, Integer, String> value) throws Exception {
            // Checked explicitly rather than with `assert`, so it also holds when assertions are disabled.
            final int channel = value.f2;
            if (channel < 0 || channel >= numberOfInputChannels) {
                throw new Exception(
                    "Received channel index " + channel + " but expected one below " + numberOfInputChannels);
            }
            numElements[channel]++;

            if (duplicateChecker.get(value.f0)) {
                throw new Exception("Received a duplicate: " + value);
            }

            duplicateChecker.set(value.f0);

            // Keep running as long as any channel is still short of the target.
            for (int numElement : numElements) {
                if (numElement < numElementsTotal) {
                    return;
                }
            }
            // Only reached when every channel has delivered enough records.
            throw new SuccessException();
        }
    }
}

0 comments on commit aab0707

Please sign in to comment.