Skip to content

Commit

Permalink
[FLINK-21132][runtime][tests] Test StopWith Savepoint against concurrent EndOfInput
Browse files Browse the repository at this point in the history
  • Loading branch information
rkhachatryan committed Feb 3, 2021
1 parent bb89788 commit 07d8465
Showing 1 changed file with 141 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,14 @@
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
Expand Down Expand Up @@ -86,16 +88,21 @@
import java.nio.file.Path;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.util.concurrent.CompletableFuture.allOf;
import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN;
import static org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand Down Expand Up @@ -373,6 +380,8 @@ public void testTriggerSavepointWithCheckpointingDisabled() throws Exception {
static class BoundedPassThroughOperator<T> extends AbstractStreamOperator<T>
implements OneInputStreamOperator<T, T>, BoundedOneInput {
static volatile CountDownLatch progressLatch;
static volatile CountDownLatch snapshotAllowedLatch;
static volatile CountDownLatch snapshotStartedLatch;
static volatile boolean inputEnded;

private transient boolean processed;
Expand All @@ -381,6 +390,14 @@ static class BoundedPassThroughOperator<T> extends AbstractStreamOperator<T>
this.chainingStrategy = chainingStrategy;
}

/**
 * Opens the snapshot gate so a {@link #snapshotState} call blocked on
 * {@code snapshotAllowedLatch} may proceed.
 *
 * <p>NOTE(review): counts the latch down only once; sufficient for the
 * parallelism-1 setup used by the tests — confirm if reused with higher parallelism.
 */
private static void allowSnapshots() {
    snapshotAllowedLatch.countDown();
}

/**
 * Blocks until {@code snapshotStartedLatch} is fully counted down, i.e. every subtask
 * has entered {@link #snapshotState}.
 *
 * @throws InterruptedException if the waiting thread is interrupted
 */
public static void awaitSnapshotStarted() throws InterruptedException {
    snapshotStartedLatch.await();
}

@Override
public void endInput() throws Exception {
inputEnded = true;
Expand All @@ -395,18 +412,94 @@ public void processElement(StreamRecord<T> element) throws Exception {
}
}

@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
    // Signal that the snapshot has reached this operator ...
    snapshotStartedLatch.countDown();
    // ... then block until the test explicitly allows it to proceed. This lets the
    // test interleave other events (e.g. end-of-input) with an in-flight snapshot.
    snapshotAllowedLatch.await();
    super.snapshotState(context);
}

// --------------------------------------------------------------------

/** Returns the latch signaling that the operator has made processing progress. */
static CountDownLatch getProgressLatch() {
    return progressLatch;
}

static void resetForTest(int parallelism) {
/**
 * Re-initializes the static test coordination latches before a job run.
 *
 * @param parallelism number of subtasks expected to count each latch down
 * @param allowSnapshots if {@code true}, snapshots may proceed immediately (the gate
 *     latch starts at zero); if {@code false}, snapshots block until
 *     {@code allowSnapshots()} is called
 */
static void resetForTest(int parallelism, boolean allowSnapshots) {
    final int snapshotGateCount = allowSnapshots ? 0 : parallelism;
    progressLatch = new CountDownLatch(parallelism);
    snapshotAllowedLatch = new CountDownLatch(snapshotGateCount);
    snapshotStartedLatch = new CountDownLatch(parallelism);
    inputEnded = false;
}
}

@Test
public void testStopSavepointWithBoundedInputConcurrently() throws Exception {
    final int numTaskManagers = 2;
    final int numSlotsPerTaskManager = 2;

    // Retried in a loop: the race under test can also legitimately end with the
    // checkpoint coordinator shutting down before the savepoint completes, in which
    // case the attempt is started over (see the catch block below).
    while (true) {

        final MiniClusterResourceFactory clusterFactory =
                new MiniClusterResourceFactory(
                        numTaskManagers,
                        numSlotsPerTaskManager,
                        getFileBasedCheckpointsConfig());

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(1);

        // It's only possible to test this with chaining. Without it, JM fails the job before
        // the downstream gets the abort notification
        BoundedPassThroughOperator<Integer> operator =
                new BoundedPassThroughOperator<>(ChainingStrategy.ALWAYS);
        InfiniteTestSource source = new InfiniteTestSource();
        DataStream<Integer> stream =
                env.addSource(source)
                        .transform("pass-through", BasicTypeInfo.INT_TYPE_INFO, operator);

        stream.addSink(new DiscardingSink<>());

        final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
        final JobID jobId = jobGraph.getJobID();

        MiniClusterWithClientResource cluster = clusterFactory.get();
        cluster.before();
        ClusterClient<?> client = cluster.getClusterClient();

        try {
            // allowSnapshots=false gates the snapshot inside the operator so that
            // end-of-input can be injected while the savepoint is in flight.
            BoundedPassThroughOperator.resetForTest(1, false);
            InfiniteTestSource.resetForTest();

            client.submitJob(jobGraph).get();

            // Wait until at least one record was processed before triggering the stop.
            BoundedPassThroughOperator.getProgressLatch().await();
            InfiniteTestSource.suspendAll(); // prevent deadlock in cancelAllAndAwait
            CompletableFuture<String> stop = client.stopWithSavepoint(jobId, false, null);
            // The savepoint's snapshot is now blocked inside the operator ...
            BoundedPassThroughOperator.awaitSnapshotStarted();
            InfiniteTestSource.cancelAllAndAwait(); // emulate end of input
            // ... release the snapshot so it completes AFTER end-of-input was seen.
            BoundedPassThroughOperator.allowSnapshots();
            stop.get();
            Assert.assertTrue("input NOT ended ", BoundedPassThroughOperator.inputEnded);
            return;
        } catch (Exception e) {
            // if sources and the whole job ends before the checkpoint completes
            // then coordinator will shut down and savepoint will be aborted - retry
            if (!ischeckpointcoordinatorshutdownError(e)) {
                throw e;
            }
        } finally {
            cluster.after();
        }
    }
}

/**
 * Checks whether {@code throwable} (or one of its causes) is a {@link CheckpointException}
 * whose failure reason is the checkpoint coordinator shutting down.
 *
 * <p>NOTE(review): the method name violates lowerCamelCase; consider renaming to
 * {@code isCheckpointCoordinatorShutdownError} together with its single call site.
 */
private static boolean ischeckpointcoordinatorshutdownError(Throwable throwable) {
    final Optional<CheckpointException> checkpointException =
            ExceptionUtils.findThrowable(throwable, CheckpointException.class);
    return checkpointException
            .map(CheckpointException::getCheckpointFailureReason)
            .map(reason -> reason == CHECKPOINT_COORDINATOR_SHUTDOWN)
            .orElse(false);
}

@Test
public void testStopSavepointWithBoundedInput() throws Exception {
final int numTaskManagers = 2;
Expand Down Expand Up @@ -438,7 +531,7 @@ public void testStopSavepointWithBoundedInput() throws Exception {
ClusterClient<?> client = cluster.getClusterClient();

try {
BoundedPassThroughOperator.resetForTest(1);
BoundedPassThroughOperator.resetForTest(1, true);

client.submitJob(jobGraph).get();

Expand Down Expand Up @@ -662,21 +755,63 @@ private static class InfiniteTestSource implements SourceFunction<Integer> {

private static final long serialVersionUID = 1L;
private volatile boolean running = true;
private volatile boolean suspended = false;
private static final Collection<InfiniteTestSource> createdSources =
new CopyOnWriteArrayList<>();
private transient volatile CompletableFuture<Void> completeFuture;

public InfiniteTestSource() {
    // Register this (client-side) instance so the static helpers can reach it.
    // NOTE(review): resetForTest() clears the registry before the job runs, so only
    // instances registered from run() (the deserialized task-side copies) remain.
    createdSources.add(this);
}

@Override
public void run(SourceContext<Integer> ctx) throws Exception {
while (running) {
synchronized (ctx.getCheckpointLock()) {
ctx.collect(1);
completeFuture = new CompletableFuture<>();
createdSources.add(this);
try {
while (running) {
if (!suspended) {
synchronized (ctx.getCheckpointLock()) {
ctx.collect(1);
}
}
Thread.sleep(1);
}
Thread.sleep(1);
completeFuture.complete(null);
} catch (Exception e) {
completeFuture.completeExceptionally(e);
}
}

@Override
public void cancel() {
    // Volatile flag observed by the emit loop in run(), which then exits.
    running = false;
}

/** Stops emitting records without terminating run(); the loop keeps spinning idle. */
public void suspend() {
    suspended = true;
}

/** Clears the static registry of created sources; call before each test run. */
public static void resetForTest() {
    createdSources.clear();
}

/**
 * Returns the future completed when run() exits.
 *
 * <p>NOTE(review): the field is transient and assigned inside run(), so this is
 * {@code null} until the task-side instance has started running.
 */
public CompletableFuture<Void> getCompleteFuture() {
    return completeFuture;
}

/**
 * Cancels every registered source and blocks until each one's run() has returned.
 *
 * @throws ExecutionException if any source terminated exceptionally
 * @throws InterruptedException if the waiting thread is interrupted
 */
public static void cancelAllAndAwait() throws ExecutionException, InterruptedException {
    for (InfiniteTestSource createdSource : createdSources) {
        createdSource.cancel();
    }
    final CompletableFuture<?>[] terminations =
            createdSources.stream()
                    .map(InfiniteTestSource::getCompleteFuture)
                    .toArray(CompletableFuture[]::new);
    allOf(terminations).get();
}

/** Puts every registered source into the suspended (non-emitting) state. */
public static void suspendAll() {
    for (InfiniteTestSource createdSource : createdSources) {
        createdSource.suspend();
    }
}
}

private static class StatefulCounter extends RichMapFunction<Integer, Integer>
Expand Down

0 comments on commit 07d8465

Please sign in to comment.