From a413eda02a9a7dc71d40574dedd4384666bc74f7 Mon Sep 17 00:00:00 2001 From: Ufuk Celebi Date: Thu, 3 Nov 2016 15:52:24 +0100 Subject: [PATCH] [FLINK-5007] [checkpointing] Retain externalized checkpoint on suspension Handles graceful cluster shut down (non-HA) like cancellation. --- .../flink/runtime/checkpoint/CheckpointProperties.java | 4 +++- .../flink/runtime/checkpoint/CheckpointPropertiesTest.java | 4 ++-- .../flink/streaming/api/environment/CheckpointConfig.java | 5 +++-- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java index e4856cf43ae46..68a49984447e9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java @@ -264,6 +264,8 @@ public static CheckpointProperties forStandardCheckpoint() { * @return Checkpoint properties for an external checkpoint. */ public static CheckpointProperties forExternalizedCheckpoint(boolean deleteOnCancellation) { - return new CheckpointProperties(false, true, true, true, deleteOnCancellation, false, true); + // Handle suspension like cancellation as graceful cluster shut down + // suspends all jobs (non-HA). + return new CheckpointProperties(false, true, true, true, deleteOnCancellation, false, deleteOnCancellation); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java index c9968861d1597..11bddb95e9b15 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java @@ -48,7 +48,7 @@ public void testCheckpointProperties() { * Tests the external checkpoints properties. */ @Test - public void testPersistentCheckpointProperties() { + public void testExternalizedCheckpointProperties() { CheckpointProperties props = CheckpointProperties.forExternalizedCheckpoint(true); assertFalse(props.forceCheckpoint()); @@ -67,7 +67,7 @@ public void testPersistentCheckpointProperties() { assertTrue(props.discardOnJobFinished()); assertFalse(props.discardOnJobCancelled()); assertFalse(props.discardOnJobFailed()); - assertTrue(props.discardOnJobSuspended()); + assertFalse(props.discardOnJobSuspended()); } /** diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java index 0a7f65ed31f1d..eb7833a636e32 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java @@ -235,8 +235,9 @@ public void setForceCheckpointing(boolean forceCheckpointing) { * *

Externalized checkpoints write their meta data out to persistent * storage and are not automatically cleaned up when - * the owning job fails (terminating with job status {@link JobStatus#FAILED}). - * In this case, you have to manually clean up the checkpoint state, both + * the owning job fails or is suspended (terminating with job status + * {@link JobStatus#FAILED} or {@link JobStatus#SUSPENDED}). In this + * case, you have to manually clean up the checkpoint state, both * the meta data and actual program state. * *

The {@link ExternalizedCheckpointCleanup} mode defines how an