diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 6d0f5f3ede232..386bb07ee973a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -476,12 +476,17 @@ private CompletableFuture triggerSavepointInternal( checkNotNull(checkpointProperties); + return triggerCheckpointFromCheckpointThread(checkpointProperties, targetLocation, false); + } + + private CompletableFuture triggerCheckpointFromCheckpointThread( + CheckpointProperties checkpointProperties, String targetLocation, boolean isPeriodic) { // TODO, call triggerCheckpoint directly after removing timer thread // for now, execute the trigger in timer thread to avoid competition final CompletableFuture resultFuture = new CompletableFuture<>(); timer.execute( () -> - triggerCheckpoint(checkpointProperties, targetLocation, false) + triggerCheckpoint(checkpointProperties, targetLocation, isPeriodic) .whenComplete( (completedCheckpoint, throwable) -> { if (throwable == null) { @@ -498,16 +503,15 @@ private CompletableFuture triggerSavepointInternal( * The return value is a future. It completes when the checkpoint triggered finishes or an error * occurred. * - * @param isPeriodic Flag indicating whether this triggered checkpoint is periodic. If this flag - * is true, but the periodic scheduler is disabled, the checkpoint will be declined. + * @param isPeriodic Flag indicating whether this triggered checkpoint is periodic. * @return a future to the completed checkpoint. */ public CompletableFuture triggerCheckpoint(boolean isPeriodic) { - return triggerCheckpoint(checkpointProperties, null, isPeriodic); + return triggerCheckpointFromCheckpointThread(checkpointProperties, null, isPeriodic); } @VisibleForTesting - public CompletableFuture triggerCheckpoint( + CompletableFuture triggerCheckpoint( CheckpointProperties props, @Nullable String externalSavepointLocation, boolean isPeriodic) { @@ -976,8 +980,10 @@ private Optional chooseQueuedRequestToExecute() { private Optional chooseRequestToExecute( CheckpointTriggerRequest request) { synchronized (lock) { - return requestDecider.chooseRequestToExecute( - request, isTriggering, lastCheckpointCompletionRelativeTime); + Optional checkpointTriggerRequest = + requestDecider.chooseRequestToExecute( + request, isTriggering, lastCheckpointCompletionRelativeTime); + return checkpointTriggerRequest; } } @@ -2026,7 +2032,7 @@ private final class ScheduledTrigger implements Runnable { @Override public void run() { try { - triggerCheckpoint(true); + triggerCheckpoint(checkpointProperties, null, true); } catch (Exception e) { LOG.error("Exception while triggering checkpoint for job {}.", job, e); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index 72172a78b42be..6749b9aa51658 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -3220,6 +3220,7 @@ public void testSavepointScheduledInUnalignedMode() throws Exception { checkpointFutures.add(coordinator.triggerCheckpoint(true)); activeRequests++; } + manuallyTriggeredScheduledExecutor.triggerAll(); assertEquals( activeRequests - maxConcurrentCheckpoints, coordinator.getNumQueuedRequests()); @@ -3730,7 +3731,9 @@ public void testAbortingBeforeTriggeringCheckpointOperatorCoordinator() throws E .build(graph); try { checkpointCoordinator.triggerCheckpoint(false); - // trigger twice to get checkpoint id and create pending checkpoint + // trigger three times to trigger checkpoint, to get checkpoint id and create pending + // checkpoint + manuallyTriggeredScheduledExecutor.trigger(); manuallyTriggeredScheduledExecutor.trigger(); manuallyTriggeredScheduledExecutor.trigger(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java index 3eb7170c0c636..663471946487e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java @@ -23,6 +23,7 @@ import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionVertex; @@ -32,6 +33,7 @@ import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; +import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; import org.apache.flink.testutils.TestingUtils; import org.apache.flink.testutils.executor.TestExecutorResource; import org.apache.flink.util.ExceptionUtils; @@ -716,7 +718,18 @@ public void discardingTriggeringCheckpointWillExecuteNextCheckpointRequest() thr .setTimer(new ScheduledExecutorServiceAdapter(scheduledExecutorService)) .setCheckpointCoordinatorConfiguration( CheckpointCoordinatorConfiguration.builder().build()) - .build(EXECUTOR_RESOURCE.getExecutor()); + // Since timer thread != main thread we should override the default main + // thread executor because it initially requires triggering a checkpoint + // from the main test thread. + .build( + new CheckpointCoordinatorTestingUtils + .CheckpointExecutionGraphBuilder() + .addJobVertex(new JobVertexID()) + .setMainThreadExecutor( + ComponentMainThreadExecutorServiceAdapter + .forSingleThreadExecutor( + new DirectScheduledExecutorService())) + .build(EXECUTOR_RESOURCE.getExecutor())); final CompletableFuture masterHookCheckpointFuture = new CompletableFuture<>(); final OneShotLatch triggerCheckpointLatch = new OneShotLatch(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java index b7b9b04a85ebe..47c1ae6ff6b53 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java @@ -788,7 +788,11 @@ public void abortPendingCheckpointsWhenRestartingTasks() throws Exception { final CountDownLatch checkpointTriggeredLatch = getCheckpointTriggeredLatch(); - final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph); + final DefaultScheduler scheduler = + createSchedulerAndStartScheduling( + jobGraph, + ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor( + new DirectScheduledExecutorService())); final ArchivedExecutionVertex onlyExecutionVertex = Iterables.getOnlyElement( @@ -818,7 +822,11 @@ public void restoreStateWhenRestartingTasks() throws Exception { final CountDownLatch checkpointTriggeredLatch = getCheckpointTriggeredLatch(); - final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph); + final DefaultScheduler scheduler = + createSchedulerAndStartScheduling( + jobGraph, + ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor( + new DirectScheduledExecutorService())); final ArchivedExecutionVertex onlyExecutionVertex = Iterables.getOnlyElement( @@ -856,7 +864,11 @@ public void failGlobalWhenRestoringStateFails() throws Exception { final CountDownLatch checkpointTriggeredLatch = getCheckpointTriggeredLatch(); - final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph); + final DefaultScheduler scheduler = + createSchedulerAndStartScheduling( + jobGraph, + ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor( + new DirectScheduledExecutorService())); final ArchivedExecutionVertex onlyExecutionVertex = Iterables.getOnlyElement(