From 8b289000f65aef39f48d6cc6ddb817208a62fdee Mon Sep 17 00:00:00 2001 From: Rui Fan <1996fanrui@gmail.com> Date: Thu, 1 Feb 2024 21:07:40 +0800 Subject: [PATCH] [FLINK-34200][test] Fix the bug that AutoRescalingITCase.testCheckpointRescalingInKeyedState fails occasionally --- .../runtime/testutils/CommonTestUtils.java | 67 ++++++++++--------- .../checkpointing/AutoRescalingITCase.java | 14 ++-- 2 files changed, 44 insertions(+), 37 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java index da01615050065..0720197e911c4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java @@ -343,49 +343,52 @@ public static void waitForSubtasksToFinish( /** Wait for (at least) the given number of successful checkpoints. */ public static void waitForCheckpoint(JobID jobID, MiniCluster miniCluster, int numCheckpoints) - throws Exception, FlinkJobNotFoundException { - waitUntilCondition( - () -> { - AccessExecutionGraph graph = miniCluster.getExecutionGraph(jobID).get(); - if (Optional.ofNullable(graph.getCheckpointStatsSnapshot()) - .filter( - st -> - st.getCounts().getNumberOfCompletedCheckpoints() - >= numCheckpoints) - .isPresent()) { - return true; - } else if (graph.getState().isGloballyTerminalState()) { - checkState( - graph.getFailureInfo() != null, - "Job terminated before taking required %s checkpoints: %s", - numCheckpoints, - graph.getState()); - throw graph.getFailureInfo().getException(); - } else { + throws Exception { + waitForCheckpoints( + jobID, + miniCluster, + checkpointStatsSnapshot -> + checkpointStatsSnapshot != null + && checkpointStatsSnapshot + .getCounts() + .getNumberOfCompletedCheckpoints() + >= numCheckpoints); + } + + /** + * Wait for a new completed checkpoint, the new checkpoint must be triggered after + * waitForNewCheckpoint is called. + */ + public static void waitForNewCheckpoint(JobID jobID, MiniCluster miniCluster) throws Exception { + final long startTime = System.currentTimeMillis(); + waitForCheckpoints( + jobID, + miniCluster, + checkpointStatsSnapshot -> { + if (checkpointStatsSnapshot == null) { return false; } + final CompletedCheckpointStats latestCompletedCheckpoint = + checkpointStatsSnapshot.getHistory().getLatestCompletedCheckpoint(); + return latestCompletedCheckpoint != null + && latestCompletedCheckpoint.getTriggerTimestamp() > startTime; }); } - /** Wait for on more completed checkpoint. */ - public static void waitForOneMoreCheckpoint(JobID jobID, MiniCluster miniCluster) + // Wait for CheckpointStatsSnapshot to meet the condition. + private static void waitForCheckpoints( + JobID jobId, MiniCluster miniCluster, Predicate condition) throws Exception { - final long[] currentCheckpoint = new long[] {-1L}; waitUntilCondition( () -> { - AccessExecutionGraph graph = miniCluster.getExecutionGraph(jobID).get(); - CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot(); - if (snapshot != null) { - long currentCount = snapshot.getCounts().getNumberOfCompletedCheckpoints(); - if (currentCheckpoint[0] < 0L) { - currentCheckpoint[0] = currentCount; - } else { - return currentCount > currentCheckpoint[0]; - } + final AccessExecutionGraph graph = miniCluster.getExecutionGraph(jobId).get(); + final CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot(); + if (condition.test(snapshot)) { + return true; } else if (graph.getState().isGloballyTerminalState()) { checkState( graph.getFailureInfo() != null, - "Job terminated before taking required checkpoint.", + "Job terminated (state=%s) before completing the requested checkpoint(s).", graph.getState()); throw graph.getFailureInfo().getException(); } diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java index 691688ae6022a..eab82b1c72704 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java @@ -86,7 +86,7 @@ import java.util.concurrent.TimeUnit; import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning; -import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForOneMoreCheckpoint; +import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForNewCheckpoint; import static org.apache.flink.test.scheduling.UpdateJobResourceRequirementsITCase.waitForAvailableSlots; import static org.apache.flink.test.scheduling.UpdateJobResourceRequirementsITCase.waitForRunningTasks; import static org.junit.Assert.assertEquals; @@ -261,7 +261,11 @@ public void testCheckpointRescalingKeyedState(boolean scaleOut) throws Exception waitForAllTaskRunning(cluster.getMiniCluster(), jobGraph.getJobID(), false); - waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster()); + // We need to wait for a checkpoint to be completed that was triggered after all the + // data was processed. That ensures the entire data being flushed out of the Operator's + // network buffers to avoid reprocessing test data twice after the restore (see + // FLINK-34200). + waitForNewCheckpoint(jobID, cluster.getMiniCluster()); SubtaskIndexSource.SOURCE_LATCH.reset(); @@ -328,7 +332,7 @@ public void testCheckpointRescalingNonPartitionedStateCausesException() throws E // wait until the operator handles some data StateSourceBase.workStartedLatch.await(); - waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster()); + waitForNewCheckpoint(jobID, cluster.getMiniCluster()); JobResourceRequirements.Builder builder = JobResourceRequirements.newBuilder(); for (JobVertex vertex : jobGraph.getVertices()) { @@ -411,7 +415,7 @@ public void testCheckpointRescalingWithKeyedAndNonPartitionedState() throws Exce // clear the CollectionSink set for the restarted job CollectionSink.clearElementsSet(); - waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster()); + waitForNewCheckpoint(jobID, cluster.getMiniCluster()); SubtaskIndexSource.SOURCE_LATCH.reset(); @@ -513,7 +517,7 @@ public void testCheckpointRescalingPartitionedOperatorState( // wait until the operator handles some data StateSourceBase.workStartedLatch.await(); - waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster()); + waitForNewCheckpoint(jobID, cluster.getMiniCluster()); JobResourceRequirements.Builder builder = JobResourceRequirements.newBuilder(); for (JobVertex vertex : jobGraph.getVertices()) {