From 1e8bc21bfcc0b62cfd15c7f505773545764d4c8b Mon Sep 17 00:00:00 2001 From: Stefan Richter Date: Mon, 19 Sep 2016 17:54:23 +0200 Subject: [PATCH] [hotfix] RescalingITCase: race condition could make the test stuck in a blocking call until timeout --- .../test/checkpointing/RescalingITCase.java | 51 +++++++++++-------- 1 file changed, 30 insertions(+), 21 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java index 7f1d7f3360f78..263bf790eb042 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java @@ -69,9 +69,9 @@ public class RescalingITCase extends TestLogger { - private static int numTaskManagers = 2; - private static int slotsPerTaskManager = 2; - private static int numSlots = numTaskManagers * slotsPerTaskManager; + private static final int numTaskManagers = 2; + private static final int slotsPerTaskManager = 2; + private static final int numSlots = numTaskManagers * slotsPerTaskManager; private static TestingCluster cluster; @@ -109,12 +109,12 @@ public static void teardown() { */ @Test public void testSavepointRescalingWithPartitionedState() throws Exception { - int numberKeys = 42; - int numberElements = 1000; - int numberElements2 = 500; - int parallelism = numSlots / 2; - int parallelism2 = numSlots; - int maxParallelism = 13; + final int numberKeys = 42; + final int numberElements = 1000; + final int numberElements2 = 500; + final int parallelism = numSlots / 2; + final int parallelism2 = numSlots; + final int maxParallelism = 13; FiniteDuration timeout = new FiniteDuration(3, TimeUnit.MINUTES); Deadline deadline = timeout.fromNow(); @@ -214,9 +214,9 @@ public void testSavepointRescalingWithPartitionedState() throws Exception { */ @Test public void testSavepointRescalingFailureWithNonPartitionedState() throws Exception { - int parallelism = numSlots / 2; - int parallelism2 = numSlots; - int maxParallelism = 13; + final int parallelism = numSlots / 2; + final int parallelism2 = numSlots; + final int maxParallelism = 13; FiniteDuration timeout = new FiniteDuration(3, TimeUnit.MINUTES); Deadline deadline = timeout.fromNow(); @@ -235,12 +235,14 @@ public void testSavepointRescalingFailureWithNonPartitionedState() throws Except Object savepointResponse = null; - // we might be too early for taking a savepoint if the operators have not been started yet + // wait until the operator is started + NonPartitionedStateSource.workStartedLatch.await(); + while (deadline.hasTimeLeft()) { Future savepointPathFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobID), deadline.timeLeft()); - - savepointResponse = Await.result(savepointPathFuture, deadline.timeLeft()); + FiniteDuration waitingTime = new FiniteDuration(10, TimeUnit.SECONDS); + savepointResponse = Await.result(savepointPathFuture, waitingTime); if (savepointResponse instanceof JobManagerMessages.TriggerSavepointSuccess) { break; @@ -428,6 +430,8 @@ private static JobGraph createNonPartitionedStateJobGraph(int parallelism, int m env.enableCheckpointing(checkpointInterval); env.setRestartStrategy(RestartStrategies.noRestart()); + NonPartitionedStateSource.workStartedLatch = new CountDownLatch(1); + DataStream input = env.addSource(new NonPartitionedStateSource()); input.addSink(new DiscardingSink()); @@ -466,7 +470,7 @@ public Integer getKey(Integer value) throws Exception { DataStream> result = input.flatMap(new SubtaskIndexFlatMapper(numberElements)); - result.addSink(new CollectionSink()); + result.addSink(new CollectionSink>()); return env.getStreamGraph().getJobGraph(); } @@ -504,7 +508,7 @@ public Integer getKey(Integer value) throws Exception { DataStream> result = input.flatMap(new SubtaskIndexFlatMapper(numberElements)); - result.addSink(new CollectionSink()); + result.addSink(new CollectionSink>()); return env.getStreamGraph().getJobGraph(); } @@ -645,8 +649,10 @@ private static class NonPartitionedStateSource extends RichParallelSourceFunctio private static final long serialVersionUID = -8108185918123186841L; - private int counter = 0; - private boolean running = true; + private static volatile CountDownLatch workStartedLatch = new CountDownLatch(1); + + private volatile int counter = 0; + private volatile boolean running = true; @Override public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { @@ -669,13 +675,16 @@ public void run(SourceContext ctx) throws Exception { ctx.collect(counter * getRuntimeContext().getIndexOfThisSubtask()); } - Thread.sleep(100); + Thread.sleep(2); + if(counter == 10) { + workStartedLatch.countDown(); + } } } @Override public void cancel() { - running = true; + running = false; } } }