@@ -343,49 +343,52 @@ public static void waitForSubtasksToFinish(

     /** Wait for (at least) the given number of successful checkpoints. */
     public static void waitForCheckpoint(JobID jobID, MiniCluster miniCluster, int numCheckpoints)
-            throws Exception, FlinkJobNotFoundException {
-        waitUntilCondition(
-                () -> {
-                    AccessExecutionGraph graph = miniCluster.getExecutionGraph(jobID).get();
-                    if (Optional.ofNullable(graph.getCheckpointStatsSnapshot())
-                            .filter(
-                                    st ->
-                                            st.getCounts().getNumberOfCompletedCheckpoints()
-                                                    >= numCheckpoints)
-                            .isPresent()) {
-                        return true;
-                    } else if (graph.getState().isGloballyTerminalState()) {
-                        checkState(
-                                graph.getFailureInfo() != null,
-                                "Job terminated before taking required %s checkpoints: %s",
-                                numCheckpoints,
-                                graph.getState());
-                        throw graph.getFailureInfo().getException();
-                    } else {
+            throws Exception {
+        waitForCheckpoints(
+                jobID,
+                miniCluster,
+                checkpointStatsSnapshot ->
+                        checkpointStatsSnapshot != null
+                                && checkpointStatsSnapshot
+                                                .getCounts()
+                                                .getNumberOfCompletedCheckpoints()
+                                        >= numCheckpoints);
     }

+    /**
+     * Wait for a new completed checkpoint, the new checkpoint must be triggered after
+     * waitForNewCheckpoint is called.
+     */
+    public static void waitForNewCheckpoint(JobID jobID, MiniCluster miniCluster) throws Exception {
Contributor:

Suggested change:
-public static void waitForNewCheckpoint(JobID jobID, MiniCluster miniCluster) throws Exception {
+public static void waitForNewCheckpoint(JobID jobID, MiniCluster miniCluster, int checkpointCount) throws Exception {

Can't we make the number of checkpoints to wait for configurable? That way, we can pass in 2 in the test implementation analogously to waitForCheckpoint. I also feel like we can remove some redundant code within the two methods. 🤔

Member Author (@1996fanrui, Feb 2, 2024):

> That way, we can pass in 2 in the test implementation analogously to waitForCheckpoint. I also feel like we can remove some redundant code within the two methods. 🤔

IIUC, the semantics of waitForCheckpoint and waitForOneMoreCheckpoint are different. (waitForOneMoreCheckpoint is renamed to waitForNewCheckpoint in this PR.)

  • waitForCheckpoint checks the total count of all completed checkpoints.
  • waitForOneMoreCheckpoint checks whether a new checkpoint is completed after it is called.
    • For example, if the job has 10 completed checkpoints before it is called,
    • waitForOneMoreCheckpoint will wait until checkpoint-11 is completed.

BTW, I have refactored waitForNewCheckpoint: I check the checkpoint trigger time instead of a checkpointCount.

I think checking the trigger time is clearer than checkpointCount >= 2. Other developers might not know why we check for 2 checkpoints here, and checkpointCount >= 2 doesn't work when concurrent checkpoints are enabled.

WDYT?
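To make the two behaviors concrete, here is a minimal usage sketch (an illustration, not code from this PR: the class and the illustrate(...) method are invented, while the two waitFor* calls are the CommonTestUtils methods discussed in this thread, with jobID and miniCluster supplied by the surrounding test):

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.minicluster.MiniCluster;

import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForCheckpoint;
import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForNewCheckpoint;

class CheckpointWaitSemanticsSketch {

    static void illustrate(JobID jobID, MiniCluster miniCluster) throws Exception {
        // Count-based wait: returns as soon as the job's TOTAL number of completed
        // checkpoints reaches 2, even if both completed before this line ran.
        waitForCheckpoint(jobID, miniCluster, 2);

        // Trigger-time-based wait: records "now" and returns only once a checkpoint
        // whose trigger timestamp is later than that completes. A checkpoint already
        // in flight (e.g. a concurrent one triggered earlier) does not satisfy it.
        waitForNewCheckpoint(jobID, miniCluster);
    }
}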

Contributor:

Good idea. Checking the trigger time is a better solution. I like that. 👍

Contributor:

About the redundant code:

    /** Wait for (at least) the given number of successful checkpoints. */
    public static void waitForCheckpoint(JobID jobID, MiniCluster miniCluster, int numCheckpoints)
            throws Exception {
        waitForCheckpoints(
                jobID,
                miniCluster,
                checkpointStatsSnapshot ->
                        checkpointStatsSnapshot != null
                                && checkpointStatsSnapshot
                                                .getCounts()
                                                .getNumberOfCompletedCheckpoints()
                                        >= numCheckpoints);
    }

    /**
     * Wait for a new completed checkpoint, the new checkpoint must be triggered after
     * waitForNewCheckpoint is called.
     */
    public static void waitForNewCheckpoint(JobID jobID, MiniCluster miniCluster) throws Exception {
        final long startTime = System.currentTimeMillis();
        waitForCheckpoints(
                jobID,
                miniCluster,
                checkpointStatsSnapshot -> {
                    if (checkpointStatsSnapshot != null) {
                        final CompletedCheckpointStats latestCompletedCheckpoint =
                                checkpointStatsSnapshot.getHistory().getLatestCompletedCheckpoint();
                        return latestCompletedCheckpoint != null
                                && latestCompletedCheckpoint.getTriggerTimestamp() > startTime;
                    }

                    return false;
                });
    }

    private static void waitForCheckpoints(
            JobID jobId, MiniCluster miniCluster, Predicate<CheckpointStatsSnapshot> condition)
            throws Exception {
        waitUntilCondition(
                () -> {
                    final AccessExecutionGraph graph = miniCluster.getExecutionGraph(jobId).get();
                    final CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot();
                    if (condition.test(snapshot)) {
                        return true;
                    } else if (graph.getState().isGloballyTerminalState()) {
                        checkState(
                                graph.getFailureInfo() != null,
                                "Job terminated (state=%s) before completing the requested checkpoint(s).",
                                graph.getState());
                        throw graph.getFailureInfo().getException();
                    }
                    return false;
                });
    }

...just to clarify what I meant. Feel free to ignore that one.

Member Author:

Thanks for your code; this refactor makes sense to me. I have updated.

+        final long startTime = System.currentTimeMillis();
+        waitForCheckpoints(
+                jobID,
+                miniCluster,
+                checkpointStatsSnapshot -> {
+                    if (checkpointStatsSnapshot == null) {
+                        return false;
+                    }
+                    final CompletedCheckpointStats latestCompletedCheckpoint =
+                            checkpointStatsSnapshot.getHistory().getLatestCompletedCheckpoint();
+                    return latestCompletedCheckpoint != null
+                            && latestCompletedCheckpoint.getTriggerTimestamp() > startTime;
+                });
+    }

-    /** Wait for on more completed checkpoint. */
-    public static void waitForOneMoreCheckpoint(JobID jobID, MiniCluster miniCluster)
+    // Wait for CheckpointStatsSnapshot to meet the condition.
+    private static void waitForCheckpoints(
+            JobID jobId, MiniCluster miniCluster, Predicate<CheckpointStatsSnapshot> condition)
             throws Exception {
-        final long[] currentCheckpoint = new long[] {-1L};
         waitUntilCondition(
                 () -> {
-                    AccessExecutionGraph graph = miniCluster.getExecutionGraph(jobID).get();
-                    CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot();
-                    if (snapshot != null) {
-                        long currentCount = snapshot.getCounts().getNumberOfCompletedCheckpoints();
-                        if (currentCheckpoint[0] < 0L) {
-                            currentCheckpoint[0] = currentCount;
-                        } else {
-                            return currentCount > currentCheckpoint[0];
-                        }
+                    final AccessExecutionGraph graph = miniCluster.getExecutionGraph(jobId).get();
+                    final CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot();
+                    if (condition.test(snapshot)) {
+                        return true;
                     } else if (graph.getState().isGloballyTerminalState()) {
                         checkState(
                                 graph.getFailureInfo() != null,
-                                "Job terminated before taking required checkpoint.",
+                                "Job terminated (state=%s) before completing the requested checkpoint(s).",
                                 graph.getState());
                         throw graph.getFailureInfo().getException();
                     }
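Because the refactored helper takes a Predicate<CheckpointStatsSnapshot>, further wait conditions compose the same way. A hypothetical sketch (not part of the PR: waitForFailedCheckpoint is an invented name, it assumes CheckpointStatsCounts#getNumberOfFailedCheckpoints, and it would live in the same class as the private helper above):

    /** Hypothetical: wait until at least one checkpoint has failed. */
    public static void waitForFailedCheckpoint(JobID jobID, MiniCluster miniCluster)
            throws Exception {
        waitForCheckpoints(
                jobID,
                miniCluster,
                checkpointStatsSnapshot ->
                        checkpointStatsSnapshot != null
                                && checkpointStatsSnapshot
                                                .getCounts()
                                                .getNumberOfFailedCheckpoints()
                                        >= 1);
    }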
@@ -86,7 +86,7 @@
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
-import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForOneMoreCheckpoint;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForNewCheckpoint;
 import static org.apache.flink.test.scheduling.UpdateJobResourceRequirementsITCase.waitForAvailableSlots;
 import static org.apache.flink.test.scheduling.UpdateJobResourceRequirementsITCase.waitForRunningTasks;
 import static org.junit.Assert.assertEquals;
@@ -163,6 +163,8 @@ public void setup() throws Exception {
                 NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL, buffersPerChannel);
 
         config.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive);
+        // Disable the scaling cooldown to speed up the test
+        config.set(JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN, Duration.ofMillis(0));
Contributor:

The commit message needs to be changed to something like [hotfix][tests] Disables cool down phase for faster test execution


         // speed the test suite up
         // - lower refresh interval -> controls how fast we invalidate ExecutionGraphCache

@@ -261,7 +263,11 @@ public void testCheckpointRescalingKeyedState(boolean scaleOut) throws Exception

         waitForAllTaskRunning(cluster.getMiniCluster(), jobGraph.getJobID(), false);
 
-        waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+        // We need to wait for a checkpoint to be completed that was triggered after all the
+        // data was processed. That ensures the entire data being flushed out of the Operator's
+        // network buffers to avoid reprocessing test data twice after the restore (see
+        // FLINK-34200).
+        waitForNewCheckpoint(jobID, cluster.getMiniCluster());
 
         SubtaskIndexSource.SOURCE_LATCH.reset();

@@ -328,7 +334,7 @@ public void testCheckpointRescalingNonPartitionedStateCausesException() throws E
         // wait until the operator handles some data
         StateSourceBase.workStartedLatch.await();
 
-        waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+        waitForNewCheckpoint(jobID, cluster.getMiniCluster());
 
         JobResourceRequirements.Builder builder = JobResourceRequirements.newBuilder();
         for (JobVertex vertex : jobGraph.getVertices()) {
@@ -411,7 +417,7 @@ public void testCheckpointRescalingWithKeyedAndNonPartitionedState() throws Exce
         // clear the CollectionSink set for the restarted job
         CollectionSink.clearElementsSet();
 
-        waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+        waitForNewCheckpoint(jobID, cluster.getMiniCluster());
Contributor:

I guess the scenario can happen in this test as well because it's almost the same test implementation as in #testCheckpointRescalingKeyedState 🤔

Contributor:

I'm wondering whether the redundant code could be removed here. But that's probably a bit out of scope for this issue.

Member Author:

Do you mean createJobGraphWithKeyedState and createJobGraphWithKeyedAndNonPartitionedOperatorState have redundant code? Or testCheckpointRescalingWithKeyedAndNonPartitionedState and testCheckpointRescalingKeyedState?

I checked them; they have a lot of differences in the details, such as:

  • The source is different
  • The parallelism and maxParallelism are fixed for the NonPartitionedOperator

I will check later whether they can extract some common code. If yes, I can submit a hotfix PR and cc you.

Contributor:

I created a PR to show what I had in mind for the code redundancy reduction.

Member Author:

It seems out of scope for this JIRA. Would you mind if we refactor it in a hotfix PR?

Your code is ready; after this PR, you can submit an official PR, and I can help review. WDYT?


         SubtaskIndexSource.SOURCE_LATCH.reset();

@@ -513,7 +519,7 @@ public void testCheckpointRescalingPartitionedOperatorState(
         // wait until the operator handles some data
         StateSourceBase.workStartedLatch.await();
 
-        waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+        waitForNewCheckpoint(jobID, cluster.getMiniCluster());
 
         JobResourceRequirements.Builder builder = JobResourceRequirements.newBuilder();
         for (JobVertex vertex : jobGraph.getVertices()) {