diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java index 35d33d7af9ba0..d2de69d6ec3f3 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java @@ -18,6 +18,7 @@ package org.apache.flink.test.checkpointing; import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobSubmissionResult; import org.apache.flink.api.common.accumulators.IntCounter; import org.apache.flink.api.common.accumulators.LongCounter; @@ -54,15 +55,16 @@ import org.apache.flink.runtime.shuffle.ShuffleServiceOptions; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.testutils.CommonTestUtils; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.runtime.throwable.ThrowableAnnotation; +import org.apache.flink.runtime.throwable.ThrowableType; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction; import org.apache.flink.streaming.api.functions.sink.legacy.RichSinkFunction; import org.apache.flink.streaming.api.graph.StreamGraph; -import org.apache.flink.streaming.util.RestartStrategyUtils; import org.apache.flink.test.util.MiniClusterWithClientResource; -import org.apache.flink.test.util.TestUtils; import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler; import org.apache.flink.util.Collector; import org.apache.flink.util.ExceptionUtils; @@ -184,24 +186,28 @@ protected File execute(UnalignedSettings settings) throws Exception { final CompletableFuture result = miniCluster.getMiniCluster().submitJob(streamGraph.getJobGraph()); + final JobID jobID = result.get().getJobID(); checkCounters( miniCluster .getMiniCluster() - .requestJobResult(result.get().getJobID()) + .requestJobResult(jobID) .get() .toJobExecutionResult(getClass().getClassLoader())); System.out.println( "Finished " + getClass().getCanonicalName() + "#" + name.getMethodName() + "."); + if (settings.generateCheckpoint) { + return CommonTestUtils.getLatestCompletedCheckpointPath( + jobID, miniCluster.getMiniCluster()) + .map(File::new) + .orElseThrow(() -> new AssertionError("Could not generate checkpoint")); + } } catch (Exception e) { - if (!ExceptionUtils.findThrowable(e, TestException.class).isPresent()) { + if (ExceptionUtils.findThrowable(e, TestException.class).isEmpty()) { throw e; } } finally { miniCluster.after(); } - if (settings.generateCheckpoint) { - return TestUtils.getMostRecentCompletedCheckpoint(checkpointDir); - } return null; } @@ -748,8 +754,6 @@ public void configure(StreamExecutionEnvironment env) { env.getCheckpointConfig() .setTolerableCheckpointFailureNumber(tolerableCheckpointFailures); env.setParallelism(parallelism); - RestartStrategyUtils.configureFixedDelayRestartStrategy( - env, generateCheckpoint ? expectedFailures / 2 : expectedFailures, 100L); env.getCheckpointConfig().enableUnalignedCheckpoints(true); // for custom partitioner env.getCheckpointConfig().setForceUnalignedCheckpoints(true); @@ -1128,6 +1132,7 @@ protected static long checkHeader(long value) { return value; } + @ThrowableAnnotation(ThrowableType.NonRecoverableError) static class TestException extends Exception { public TestException(String s) { super(s);