From 9e592bf122042000f4d8b899930e2e6d915deb6f Mon Sep 17 00:00:00 2001 From: Till Date: Tue, 17 Oct 2017 10:57:37 +0200 Subject: [PATCH] [FLINK-7844] [ckPt] Fail unacknowledged pending checkpoints for fine grained recovery This commit will fail all pending checkpoints which have not been acknowledged by the failed task in case of fine grained recovery. This is done in order to avoid long checkpoint timeouts which might block the CheckpointCoordinator from triggering new checkpoints This closes #4844. --- .../checkpoint/CheckpointCoordinator.java | 90 +++++++---- .../runtime/checkpoint/PendingCheckpoint.java | 4 + .../runtime/executiongraph/Execution.java | 2 +- .../executiongraph/ExecutionGraph.java | 5 + .../IndividualRestartsConcurrencyTest.java | 147 +++++++++++++++++- 5 files changed, 220 insertions(+), 28 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index f932354c1b972..8692d43e5b092 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -724,31 +724,11 @@ public void receiveDeclineMessage(DeclineCheckpoint message) { return; } - checkpoint = pendingCheckpoints.get(checkpointId); + checkpoint = pendingCheckpoints.remove(checkpointId); if (checkpoint != null && !checkpoint.isDiscarded()) { - LOG.info("Discarding checkpoint {} because of checkpoint decline from task {} : {}", - checkpointId, message.getTaskExecutionId(), reason); - - pendingCheckpoints.remove(checkpointId); - checkpoint.abortDeclined(); - rememberRecentCheckpointId(checkpointId); - - // we don't have to schedule another "dissolving" checkpoint any more because the - // cancellation barriers take care of breaking downstream alignments - // we only need to make sure that suspended queued requests are resumed - - boolean haveMoreRecentPending = false; - for (PendingCheckpoint p : pendingCheckpoints.values()) { - if (!p.isDiscarded() && p.getCheckpointId() >= checkpoint.getCheckpointId()) { - haveMoreRecentPending = true; - break; - } - } - - if (!haveMoreRecentPending) { - triggerQueuedRequests(); - } + LOG.info("Decline checkpoint {} by task {}.", checkpointId, message.getTaskExecutionId()); + discardCheckpoint(checkpoint, message.getReason()); } else if (checkpoint != null) { // this should not happen @@ -893,10 +873,10 @@ private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) thro if (!pendingCheckpoint.isDiscarded()) { pendingCheckpoint.abortError(e1); } - + throw new CheckpointException("Could not finalize the pending checkpoint " + checkpointId + '.', e1); } - + // the pending checkpoint must be discarded after the finalization Preconditions.checkState(pendingCheckpoint.isDiscarded() && completedCheckpoint != null); @@ -928,7 +908,7 @@ public void run() { triggerQueuedRequests(); } - + rememberRecentCheckpointId(checkpointId); // record the time when this was completed, to calculate @@ -962,6 +942,28 @@ public void run() { } } + /** + * Fails all pending checkpoints which have not been acknowledged by the given execution + * attempt id. + * + * @param executionAttemptId for which to discard unaknowledged pending checkpoints + * @param cause of the failure + */ + public void failUnacknowledgedPendingCheckpointsFor(ExecutionAttemptID executionAttemptId, Throwable cause) { + synchronized (lock) { + Iterator pendingCheckpointIterator = pendingCheckpoints.values().iterator(); + + while (pendingCheckpointIterator.hasNext()) { + final PendingCheckpoint pendingCheckpoint = pendingCheckpointIterator.next(); + + if (!pendingCheckpoint.isAcknowledgedBy(executionAttemptId)) { + pendingCheckpointIterator.remove(); + discardCheckpoint(pendingCheckpoint, cause); + } + } + } + } + private void rememberRecentCheckpointId(long id) { if (recentPendingCheckpoints.size() >= NUM_GHOST_CHECKPOINT_IDS) { recentPendingCheckpoints.removeFirst(); @@ -1269,6 +1271,42 @@ public void run() { } } + /** + * Discards the given pending checkpoint because of the given cause. + * + * @param pendingCheckpoint to discard + * @param cause for discarding the checkpoint + */ + private void discardCheckpoint(PendingCheckpoint pendingCheckpoint, @Nullable Throwable cause) { + assert(Thread.holdsLock(lock)); + Preconditions.checkNotNull(pendingCheckpoint); + + final long checkpointId = pendingCheckpoint.getCheckpointId(); + + final String reason = (cause != null) ? cause.getMessage() : ""; + + LOG.info("Discarding checkpoint {} because: {}", checkpointId, reason); + + pendingCheckpoint.abortDeclined(); + rememberRecentCheckpointId(checkpointId); + + // we don't have to schedule another "dissolving" checkpoint any more because the + // cancellation barriers take care of breaking downstream alignments + // we only need to make sure that suspended queued requests are resumed + + boolean haveMoreRecentPending = false; + for (PendingCheckpoint p : pendingCheckpoints.values()) { + if (!p.isDiscarded() && p.getCheckpointId() >= pendingCheckpoint.getCheckpointId()) { + haveMoreRecentPending = true; + break; + } + } + + if (!haveMoreRecentPending) { + triggerQueuedRequests(); + } + } + /** * Discards the given state object asynchronously belonging to the given job, execution attempt * id and checkpoint id. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java index 16231dd0a9663..a9b6d4df2e787 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java @@ -184,6 +184,10 @@ public boolean isFullyAcknowledged() { return this.notYetAcknowledgedTasks.isEmpty() && !discarded; } + public boolean isAcknowledgedBy(ExecutionAttemptID executionAttemptId) { + return !notYetAcknowledgedTasks.containsKey(executionAttemptId); + } + public boolean isDiscarded() { return discarded; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 2074820b41dc3..939c2908818cd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -941,7 +941,7 @@ private boolean processFail(Throwable t, boolean isCallback, MapThis test must be in the package it resides in, because it uses package-private methods * from the ExecutionGraph classes. */ -public class IndividualRestartsConcurrencyTest { +public class IndividualRestartsConcurrencyTest extends TestLogger { /** * Tests that a cancellation concurrent to a local failover leads to a properly @@ -261,6 +290,122 @@ public void testGlobalRecoveryConcurrentToLocalRecovery() throws Exception { assertEquals(0, slotProvider.getNumberOfAvailableSlots()); } + /** + * Tests that a local failure fails all pending checkpoints which have not been acknowledged by the failing + * task. + */ + @Test + public void testLocalFailureFailsPendingCheckpoints() throws Exception { + final JobID jid = new JobID(); + final int parallelism = 2; + final long verifyTimeout = 5000L; + + final TaskManagerGateway taskManagerGateway = mock(TaskManagerGateway.class); + when(taskManagerGateway.submitTask(any(TaskDeploymentDescriptor.class), any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get())); + when(taskManagerGateway.cancelTask(any(ExecutionAttemptID.class), any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get())); + + final SimpleSlotProvider slotProvider = new SimpleSlotProvider(jid, parallelism, taskManagerGateway); + final Executor executor = TestingUtils.defaultExecutor(); + + + final CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration = new CheckpointCoordinatorConfiguration( + 10L, + 100000L, + 1L, + 3, + ExternalizedCheckpointSettings.none(), + true); + + final ExecutionGraph graph = createSampleGraph( + jid, + new IndividualFailoverWithCustomExecutor(executor), + slotProvider, + parallelism); + + final List allVertices = new ArrayList<>(graph.getAllVertices().values()); + + final StandaloneCheckpointIDCounter standaloneCheckpointIDCounter = new StandaloneCheckpointIDCounter(); + + graph.enableCheckpointing( + checkpointCoordinatorConfiguration.getCheckpointInterval(), + checkpointCoordinatorConfiguration.getCheckpointTimeout(), + checkpointCoordinatorConfiguration.getMinPauseBetweenCheckpoints(), + checkpointCoordinatorConfiguration.getMaxConcurrentCheckpoints(), + checkpointCoordinatorConfiguration.getExternalizedCheckpointSettings(), + allVertices, + allVertices, + allVertices, + Collections.emptyList(), + standaloneCheckpointIDCounter, + new StandaloneCompletedCheckpointStore(1), + "", + new MemoryStateBackend(), + new CheckpointStatsTracker( + 1, + allVertices, + checkpointCoordinatorConfiguration, + new UnregisteredTaskMetricsGroup())); + + final CheckpointCoordinator checkpointCoordinator = graph.getCheckpointCoordinator(); + + final ExecutionJobVertex ejv = graph.getVerticesTopologically().iterator().next(); + final ExecutionVertex vertex1 = ejv.getTaskVertices()[0]; + final ExecutionVertex vertex2 = ejv.getTaskVertices()[1]; + + graph.scheduleForExecution(); + assertEquals(JobStatus.RUNNING, graph.getState()); + + verify(taskManagerGateway, timeout(verifyTimeout).times(parallelism)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class)); + + // switch all executions to running + for (ExecutionVertex executionVertex : graph.getAllExecutionVertices()) { + executionVertex.getCurrentExecutionAttempt().switchToRunning(); + } + + // wait for a first checkpoint to be triggered + verify(taskManagerGateway, timeout(verifyTimeout).times(3)).triggerCheckpoint( + eq(vertex1.getCurrentExecutionAttempt().getAttemptId()), + any(JobID.class), + anyLong(), + anyLong(), + any(CheckpointOptions.class)); + + verify(taskManagerGateway, timeout(verifyTimeout).times(3)).triggerCheckpoint( + eq(vertex2.getCurrentExecutionAttempt().getAttemptId()), + any(JobID.class), + anyLong(), + anyLong(), + any(CheckpointOptions.class)); + + assertEquals(3, checkpointCoordinator.getNumberOfPendingCheckpoints()); + + long checkpointToAcknowledge = standaloneCheckpointIDCounter.getLast(); + + checkpointCoordinator.receiveAcknowledgeMessage( + new AcknowledgeCheckpoint( + graph.getJobID(), + vertex1.getCurrentExecutionAttempt().getAttemptId(), + checkpointToAcknowledge)); + + Map oldPendingCheckpoints = new HashMap<>(3); + + for (PendingCheckpoint pendingCheckpoint : checkpointCoordinator.getPendingCheckpoints().values()) { + assertFalse(pendingCheckpoint.isDiscarded()); + oldPendingCheckpoints.put(pendingCheckpoint.getCheckpointId(), pendingCheckpoint); + } + + // let one of the vertices fail - this should trigger the failing of not acknowledged pending checkpoints + vertex1.getCurrentExecutionAttempt().fail(new Exception("test failure")); + + for (PendingCheckpoint pendingCheckpoint : oldPendingCheckpoints.values()) { + if (pendingCheckpoint.getCheckpointId() == checkpointToAcknowledge) { + assertFalse(pendingCheckpoint.isDiscarded()); + } else { + assertTrue(pendingCheckpoint.isDiscarded()); + } + } + } + // ------------------------------------------------------------------------ // utilities // ------------------------------------------------------------------------