Skip to content

Commit

Permalink
[FLINK-34200][test] Fix the bug that AutoRescalingITCase.testCheckpoi…
Browse files Browse the repository at this point in the history
…ntRescalingInKeyedState fails occasionally
  • Loading branch information
1996fanrui authored and XComp committed Feb 8, 2024
1 parent cfdf05c commit 8b28900
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -343,49 +343,52 @@ public static void waitForSubtasksToFinish(

/** Wait for (at least) the given number of successful checkpoints. */
public static void waitForCheckpoint(JobID jobID, MiniCluster miniCluster, int numCheckpoints)
throws Exception, FlinkJobNotFoundException {
waitUntilCondition(
() -> {
AccessExecutionGraph graph = miniCluster.getExecutionGraph(jobID).get();
if (Optional.ofNullable(graph.getCheckpointStatsSnapshot())
.filter(
st ->
st.getCounts().getNumberOfCompletedCheckpoints()
>= numCheckpoints)
.isPresent()) {
return true;
} else if (graph.getState().isGloballyTerminalState()) {
checkState(
graph.getFailureInfo() != null,
"Job terminated before taking required %s checkpoints: %s",
numCheckpoints,
graph.getState());
throw graph.getFailureInfo().getException();
} else {
throws Exception {
waitForCheckpoints(
jobID,
miniCluster,
checkpointStatsSnapshot ->
checkpointStatsSnapshot != null
&& checkpointStatsSnapshot
.getCounts()
.getNumberOfCompletedCheckpoints()
>= numCheckpoints);
}

/**
* Wait for a new completed checkpoint, the new checkpoint must be triggered after
* waitForNewCheckpoint is called.
*/
public static void waitForNewCheckpoint(JobID jobID, MiniCluster miniCluster) throws Exception {
final long startTime = System.currentTimeMillis();
waitForCheckpoints(
jobID,
miniCluster,
checkpointStatsSnapshot -> {
if (checkpointStatsSnapshot == null) {
return false;
}
final CompletedCheckpointStats latestCompletedCheckpoint =
checkpointStatsSnapshot.getHistory().getLatestCompletedCheckpoint();
return latestCompletedCheckpoint != null
&& latestCompletedCheckpoint.getTriggerTimestamp() > startTime;
});
}

/** Wait for on more completed checkpoint. */
public static void waitForOneMoreCheckpoint(JobID jobID, MiniCluster miniCluster)
// Wait for CheckpointStatsSnapshot to meet the condition.
private static void waitForCheckpoints(
JobID jobId, MiniCluster miniCluster, Predicate<CheckpointStatsSnapshot> condition)
throws Exception {
final long[] currentCheckpoint = new long[] {-1L};
waitUntilCondition(
() -> {
AccessExecutionGraph graph = miniCluster.getExecutionGraph(jobID).get();
CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot();
if (snapshot != null) {
long currentCount = snapshot.getCounts().getNumberOfCompletedCheckpoints();
if (currentCheckpoint[0] < 0L) {
currentCheckpoint[0] = currentCount;
} else {
return currentCount > currentCheckpoint[0];
}
final AccessExecutionGraph graph = miniCluster.getExecutionGraph(jobId).get();
final CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot();
if (condition.test(snapshot)) {
return true;
} else if (graph.getState().isGloballyTerminalState()) {
checkState(
graph.getFailureInfo() != null,
"Job terminated before taking required checkpoint.",
"Job terminated (state=%s) before completing the requested checkpoint(s).",
graph.getState());
throw graph.getFailureInfo().getException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@
import java.util.concurrent.TimeUnit;

import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForOneMoreCheckpoint;
import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForNewCheckpoint;
import static org.apache.flink.test.scheduling.UpdateJobResourceRequirementsITCase.waitForAvailableSlots;
import static org.apache.flink.test.scheduling.UpdateJobResourceRequirementsITCase.waitForRunningTasks;
import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -261,7 +261,11 @@ public void testCheckpointRescalingKeyedState(boolean scaleOut) throws Exception

waitForAllTaskRunning(cluster.getMiniCluster(), jobGraph.getJobID(), false);

waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
// We need to wait for a checkpoint to be completed that was triggered after all the
// data was processed. That ensures the entire data being flushed out of the Operator's
// network buffers to avoid reprocessing test data twice after the restore (see
// FLINK-34200).
waitForNewCheckpoint(jobID, cluster.getMiniCluster());

SubtaskIndexSource.SOURCE_LATCH.reset();

Expand Down Expand Up @@ -328,7 +332,7 @@ public void testCheckpointRescalingNonPartitionedStateCausesException() throws E
// wait until the operator handles some data
StateSourceBase.workStartedLatch.await();

waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
waitForNewCheckpoint(jobID, cluster.getMiniCluster());

JobResourceRequirements.Builder builder = JobResourceRequirements.newBuilder();
for (JobVertex vertex : jobGraph.getVertices()) {
Expand Down Expand Up @@ -411,7 +415,7 @@ public void testCheckpointRescalingWithKeyedAndNonPartitionedState() throws Exce
// clear the CollectionSink set for the restarted job
CollectionSink.clearElementsSet();

waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
waitForNewCheckpoint(jobID, cluster.getMiniCluster());

SubtaskIndexSource.SOURCE_LATCH.reset();

Expand Down Expand Up @@ -513,7 +517,7 @@ public void testCheckpointRescalingPartitionedOperatorState(
// wait until the operator handles some data
StateSourceBase.workStartedLatch.await();

waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
waitForNewCheckpoint(jobID, cluster.getMiniCluster());

JobResourceRequirements.Builder builder = JobResourceRequirements.newBuilder();
for (JobVertex vertex : jobGraph.getVertices()) {
Expand Down

0 comments on commit 8b28900

Please sign in to comment.