Skip to content

Commit

Permalink
[FLINK-8531] [checkpoints] (part 2) Add CheckpointType to CheckpointP…
Browse files Browse the repository at this point in the history
…roperties
  • Loading branch information
StephanEwen committed Feb 1, 2018
1 parent 99495c9 commit 35c7d93
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 29 deletions.
Expand Up @@ -18,45 +18,50 @@


package org.apache.flink.runtime.checkpoint; package org.apache.flink.runtime.checkpoint;


import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobStatus;


import java.io.Serializable; import java.io.Serializable;


import static org.apache.flink.util.Preconditions.checkNotNull;

/** /**
* The configuration of a checkpoint, such as whether * The configuration of a checkpoint. This describes whether
* <ul> * <ul>
* <li>The checkpoint should be persisted</li> * <li>The checkpoint is s regular checkpoint or a savepoint.</li>
* <li>The checkpoint must be full, or may be incremental (not yet implemented)</li> * <li>When the checkpoint should be garbage collected.</li>
* <li>The checkpoint format must be the common (cross backend) format,
* or may be state-backend specific (not yet implemented)</li>
* <li>when the checkpoint should be garbage collected</li>
* </ul> * </ul>
*/ */
public class CheckpointProperties implements Serializable { public class CheckpointProperties implements Serializable {


private static final long serialVersionUID = -8835900655844879470L; private static final long serialVersionUID = 2L;


private final boolean forced; /** Type - checkpoint / savepoint. */
private final CheckpointType checkpointType;


private final boolean savepoint; /** This has a misleading name and actually means whether the snapshot must be triggered,
* or whether it may be rejected by the checkpoint coordinator if too many checkpoints are
* currently in progress. */
private final boolean forced;


private final boolean discardSubsumed; private final boolean discardSubsumed;
private final boolean discardFinished; private final boolean discardFinished;
private final boolean discardCancelled; private final boolean discardCancelled;
private final boolean discardFailed; private final boolean discardFailed;
private final boolean discardSuspended; private final boolean discardSuspended;


@VisibleForTesting
CheckpointProperties( CheckpointProperties(
boolean forced, boolean forced,
boolean savepoint, CheckpointType checkpointType,
boolean discardSubsumed, boolean discardSubsumed,
boolean discardFinished, boolean discardFinished,
boolean discardCancelled, boolean discardCancelled,
boolean discardFailed, boolean discardFailed,
boolean discardSuspended) { boolean discardSuspended) {


this.forced = forced; this.forced = forced;
this.savepoint = savepoint; this.checkpointType = checkNotNull(checkpointType);
this.discardSubsumed = discardSubsumed; this.discardSubsumed = discardSubsumed;
this.discardFinished = discardFinished; this.discardFinished = discardFinished;
this.discardCancelled = discardCancelled; this.discardCancelled = discardCancelled;
Expand Down Expand Up @@ -158,13 +163,20 @@ boolean discardOnJobSuspended() {
return discardSuspended; return discardSuspended;
} }


/**
* Gets the type of the checkpoint (checkpoint / savepoint).
*/
public CheckpointType getCheckpointType() {
return checkpointType;
}

/** /**
* Returns whether the checkpoint properties describe a standard savepoint. * Returns whether the checkpoint properties describe a standard savepoint.
* *
* @return <code>true</code> if the properties describe a savepoint, <code>false</code> otherwise. * @return <code>true</code> if the properties describe a savepoint, <code>false</code> otherwise.
*/ */
public boolean isSavepoint() { public boolean isSavepoint() {
return savepoint; return checkpointType == CheckpointType.SAVEPOINT;
} }


// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
Expand All @@ -181,7 +193,7 @@ public boolean equals(Object o) {


CheckpointProperties that = (CheckpointProperties) o; CheckpointProperties that = (CheckpointProperties) o;
return forced == that.forced && return forced == that.forced &&
savepoint == that.savepoint && checkpointType == that.checkpointType &&
discardSubsumed == that.discardSubsumed && discardSubsumed == that.discardSubsumed &&
discardFinished == that.discardFinished && discardFinished == that.discardFinished &&
discardCancelled == that.discardCancelled && discardCancelled == that.discardCancelled &&
Expand All @@ -192,7 +204,7 @@ public boolean equals(Object o) {
@Override @Override
public int hashCode() { public int hashCode() {
int result = (forced ? 1 : 0); int result = (forced ? 1 : 0);
result = 31 * result + (savepoint ? 1 : 0); result = 31 * result + checkpointType.hashCode();
result = 31 * result + (discardSubsumed ? 1 : 0); result = 31 * result + (discardSubsumed ? 1 : 0);
result = 31 * result + (discardFinished ? 1 : 0); result = 31 * result + (discardFinished ? 1 : 0);
result = 31 * result + (discardCancelled ? 1 : 0); result = 31 * result + (discardCancelled ? 1 : 0);
Expand All @@ -205,7 +217,7 @@ public int hashCode() {
public String toString() { public String toString() {
return "CheckpointProperties{" + return "CheckpointProperties{" +
"forced=" + forced + "forced=" + forced +
", savepoint=" + savepoint + ", checkpointType=" + checkpointType +
", discardSubsumed=" + discardSubsumed + ", discardSubsumed=" + discardSubsumed +
", discardFinished=" + discardFinished + ", discardFinished=" + discardFinished +
", discardCancelled=" + discardCancelled + ", discardCancelled=" + discardCancelled +
Expand All @@ -214,11 +226,13 @@ public String toString() {
'}'; '}';
} }


// ------------------------------------------------------------------------
// Factories and pre-configured properties
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------


private static final CheckpointProperties SAVEPOINT = new CheckpointProperties( private static final CheckpointProperties SAVEPOINT = new CheckpointProperties(
true, true,
true, CheckpointType.SAVEPOINT,
false, false,
false, false,
false, false,
Expand All @@ -227,7 +241,7 @@ public String toString() {


private static final CheckpointProperties CHECKPOINT_NEVER_RETAINED = new CheckpointProperties( private static final CheckpointProperties CHECKPOINT_NEVER_RETAINED = new CheckpointProperties(
false, false,
false, CheckpointType.CHECKPOINT,
true, true,
true, // Delete on success true, // Delete on success
true, // Delete on cancellation true, // Delete on cancellation
Expand All @@ -236,7 +250,7 @@ public String toString() {


private static final CheckpointProperties CHECKPOINT_RETAINED_ON_FAILURE = new CheckpointProperties( private static final CheckpointProperties CHECKPOINT_RETAINED_ON_FAILURE = new CheckpointProperties(
false, false,
false, CheckpointType.CHECKPOINT,
true, true,
true, // Delete on success true, // Delete on success
true, // Delete on cancellation true, // Delete on cancellation
Expand All @@ -245,7 +259,7 @@ public String toString() {


private static final CheckpointProperties CHECKPOINT_RETAINED_ON_CANCELLATION = new CheckpointProperties( private static final CheckpointProperties CHECKPOINT_RETAINED_ON_CANCELLATION = new CheckpointProperties(
false, false,
false, CheckpointType.CHECKPOINT,
true, true,
true, // Delete on success true, // Delete on success
false, // Retain on cancellation false, // Retain on cancellation
Expand Down
Expand Up @@ -85,8 +85,8 @@ public void testCleanUpOnSubsume() throws Exception {
StreamStateHandle metadataHandle = mock(StreamStateHandle.class); StreamStateHandle metadataHandle = mock(StreamStateHandle.class);


boolean discardSubsumed = true; boolean discardSubsumed = true;
CheckpointProperties props = new CheckpointProperties(false, false, discardSubsumed, true, true, true, true); CheckpointProperties props = new CheckpointProperties(false, CheckpointType.CHECKPOINT, discardSubsumed, true, true, true, true);

CompletedCheckpoint checkpoint = new CompletedCheckpoint( CompletedCheckpoint checkpoint = new CompletedCheckpoint(
new JobID(), 0, 0, 1, new JobID(), 0, 0, 1,
operatorStates, operatorStates,
Expand Down Expand Up @@ -124,7 +124,7 @@ public void testCleanUpOnShutdown() throws Exception {
StreamStateHandle metadataHandle = mock(StreamStateHandle.class); StreamStateHandle metadataHandle = mock(StreamStateHandle.class);


// Keep // Keep
CheckpointProperties props = new CheckpointProperties(false, true, false, false, false, false, false); CheckpointProperties props = new CheckpointProperties(false, CheckpointType.SAVEPOINT, false, false, false, false, false);
CompletedCheckpoint checkpoint = new CompletedCheckpoint( CompletedCheckpoint checkpoint = new CompletedCheckpoint(
new JobID(), 0, 0, 1, new JobID(), 0, 0, 1,
new HashMap<>(operatorStates), new HashMap<>(operatorStates),
Expand All @@ -141,7 +141,7 @@ public void testCleanUpOnShutdown() throws Exception {
verify(metadataHandle, times(0)).discardState(); verify(metadataHandle, times(0)).discardState();


// Discard // Discard
props = new CheckpointProperties(false, false, true, true, true, true, true); props = new CheckpointProperties(false, CheckpointType.CHECKPOINT, true, true, true, true, true);
checkpoint = new CompletedCheckpoint( checkpoint = new CompletedCheckpoint(
new JobID(), 0, 0, 1, new JobID(), 0, 0, 1,
new HashMap<>(operatorStates), new HashMap<>(operatorStates),
Expand Down
Expand Up @@ -86,7 +86,7 @@ public class PendingCheckpointTest {
@Test @Test
public void testCanBeSubsumed() throws Exception { public void testCanBeSubsumed() throws Exception {
// Forced checkpoints cannot be subsumed // Forced checkpoints cannot be subsumed
CheckpointProperties forced = new CheckpointProperties(true, true, false, false, false, false, false); CheckpointProperties forced = new CheckpointProperties(true, CheckpointType.SAVEPOINT, false, false, false, false, false);
PendingCheckpoint pending = createPendingCheckpoint(forced); PendingCheckpoint pending = createPendingCheckpoint(forced);
assertFalse(pending.canBeSubsumed()); assertFalse(pending.canBeSubsumed());


Expand All @@ -98,7 +98,7 @@ public void testCanBeSubsumed() throws Exception {
} }


// Non-forced checkpoints can be subsumed // Non-forced checkpoints can be subsumed
CheckpointProperties subsumed = new CheckpointProperties(false, true, false, false, false, false, false); CheckpointProperties subsumed = new CheckpointProperties(false, CheckpointType.SAVEPOINT, false, false, false, false, false);
pending = createPendingCheckpoint(subsumed); pending = createPendingCheckpoint(subsumed);
assertTrue(pending.canBeSubsumed()); assertTrue(pending.canBeSubsumed());
} }
Expand All @@ -109,7 +109,7 @@ public void testCanBeSubsumed() throws Exception {
*/ */
@Test @Test
public void testCompletionFuture() throws Exception { public void testCompletionFuture() throws Exception {
CheckpointProperties props = new CheckpointProperties(false, true, false, false, false, false, false); CheckpointProperties props = new CheckpointProperties(false, CheckpointType.SAVEPOINT, false, false, false, false, false);


// Abort declined // Abort declined
PendingCheckpoint pending = createPendingCheckpoint(props); PendingCheckpoint pending = createPendingCheckpoint(props);
Expand Down Expand Up @@ -164,7 +164,7 @@ public void testCompletionFuture() throws Exception {
@Test @Test
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void testAbortDiscardsState() throws Exception { public void testAbortDiscardsState() throws Exception {
CheckpointProperties props = new CheckpointProperties(false, true, false, false, false, false, false); CheckpointProperties props = new CheckpointProperties(false, CheckpointType.SAVEPOINT, false, false, false, false, false);
QueueExecutor executor = new QueueExecutor(); QueueExecutor executor = new QueueExecutor();


OperatorState state = mock(OperatorState.class); OperatorState state = mock(OperatorState.class);
Expand Down Expand Up @@ -307,7 +307,7 @@ public void testNonNullSubtaskStateLeadsToStatefulTask() throws Exception {


@Test @Test
public void testSetCanceller() throws Exception { public void testSetCanceller() throws Exception {
final CheckpointProperties props = new CheckpointProperties(false, false, true, true, true, true, true); final CheckpointProperties props = new CheckpointProperties(false, CheckpointType.CHECKPOINT, true, true, true, true, true);


PendingCheckpoint aborted = createPendingCheckpoint(props); PendingCheckpoint aborted = createPendingCheckpoint(props);
aborted.abortDeclined(); aborted.abortDeclined();
Expand Down
Expand Up @@ -31,7 +31,7 @@ public class RestoredCheckpointStatsTest {
public void testSimpleAccess() throws Exception { public void testSimpleAccess() throws Exception {
long checkpointId = Integer.MAX_VALUE + 1L; long checkpointId = Integer.MAX_VALUE + 1L;
long triggerTimestamp = Integer.MAX_VALUE + 1L; long triggerTimestamp = Integer.MAX_VALUE + 1L;
CheckpointProperties props = new CheckpointProperties(true, true, false, false, true, false, true); CheckpointProperties props = new CheckpointProperties(true, CheckpointType.SAVEPOINT, false, false, true, false, true);
long restoreTimestamp = Integer.MAX_VALUE + 1L; long restoreTimestamp = Integer.MAX_VALUE + 1L;
String externalPath = "external-path"; String externalPath = "external-path";


Expand Down

0 comments on commit 35c7d93

Please sign in to comment.