From e6c0ec0f6cab1e28222ec00193a2023e6cfaea94 Mon Sep 17 00:00:00 2001 From: Stefan Richter Date: Mon, 31 Oct 2016 00:36:58 +0100 Subject: [PATCH] [hotfix] Improved test stability of RescalingITCase --- .../test/checkpointing/RescalingITCase.java | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java index 4819c26585ce1..da25ae641618d 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java @@ -53,7 +53,6 @@ import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; -import org.junit.Ignore; import org.junit.Test; import org.junit.rules.TemporaryFolder; import scala.Option; @@ -262,17 +261,11 @@ public void testSavepointRescalingNonPartitionedStateCausesException() throws Ex // wait until the operator is started StateSourceBase.workStartedLatch.await(); - while (deadline.hasTimeLeft()) { - Future savepointPathFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobID, Option.empty()), deadline.timeLeft()); - FiniteDuration waitingTime = new FiniteDuration(10, TimeUnit.SECONDS); - savepointResponse = Await.result(savepointPathFuture, waitingTime); - - if (savepointResponse instanceof JobManagerMessages.TriggerSavepointSuccess) { - break; - } - } + Future savepointPathFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobID, Option.empty()), deadline.timeLeft()); + FiniteDuration waitingTime = new FiniteDuration(10, TimeUnit.SECONDS); + savepointResponse = Await.result(savepointPathFuture, waitingTime); - assertTrue(savepointResponse instanceof JobManagerMessages.TriggerSavepointSuccess); + assertTrue(String.valueOf(savepointResponse), savepointResponse instanceof JobManagerMessages.TriggerSavepointSuccess); final String savepointPath = ((JobManagerMessages.TriggerSavepointSuccess)savepointResponse).savepointPath(); @@ -585,7 +578,7 @@ private static JobGraph createJobGraphWithOperatorState( env.enableCheckpointing(Long.MAX_VALUE); env.setRestartStrategy(RestartStrategies.noRestart()); - StateSourceBase.workStartedLatch = new CountDownLatch(1); + StateSourceBase.workStartedLatch = new CountDownLatch(parallelism); SourceFunction src; @@ -922,6 +915,8 @@ private static class PartitionedStateSource extends StateSourceBase implements C @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { + counterPartitions.clear(); + CHECK_CORRECT_SNAPSHOT[getRuntimeContext().getIndexOfThisSubtask()] = counter; int div = counter / NUM_PARTITIONS;