Skip to content

Commit

Permalink
[FLINK-3107] [runtime] Defer start of checkpoint ID counter
Browse files Browse the repository at this point in the history
  • Loading branch information
uce committed Feb 12, 2016
1 parent 8e55bbd commit 937963e
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 1 deletion.
Expand Up @@ -230,6 +230,7 @@ public ApplicationID restoreSavepoint(

// Reset the checkpoint ID counter
long nextCheckpointId = checkpoint.getCheckpointID();
checkpointIdCounter.start();
checkpointIdCounter.setCount(nextCheckpointId + 1);
LOG.info("Reset the checkpoint ID to {}", nextCheckpointId);

Expand Down
Expand Up @@ -175,6 +175,7 @@ public void testSimpleRollbackSavepoint() throws Exception {
}
}

MockCheckpointIdCounter idCounter = new MockCheckpointIdCounter();
StateStore<Savepoint> savepointStore = new HeapStateStore<>();

SavepointCoordinator coordinator = createSavepointCoordinator(
Expand All @@ -184,7 +185,7 @@ public void testSimpleRollbackSavepoint() throws Exception {
triggerVertices,
ackVertices,
new ExecutionVertex[] {},
new MockCheckpointIdCounter(),
idCounter,
savepointStore);

Future<String> savepointPathFuture = coordinator.triggerSavepoint(1231273123);
Expand Down Expand Up @@ -213,6 +214,9 @@ public void testSimpleRollbackSavepoint() throws Exception {
// Verify all promises removed
assertEquals(0, getSavepointPromises(coordinator).size());

// Verify checkpoint ID counter started
assertTrue(idCounter.isStarted());

coordinator.shutdown();
}

Expand Down Expand Up @@ -1083,15 +1087,18 @@ private static ExecutionVertex mockExecutionVertex(

private static class MockCheckpointIdCounter implements CheckpointIDCounter {

private boolean started;
private long count;
private long lastReturnedCount;

@Override
public void start() throws Exception {
started = true;
}

@Override
public void stop() throws Exception {
started = false;
}

@Override
Expand All @@ -1108,5 +1115,9 @@ public void setCount(long newCount) {
long getLastReturnedCount() {
return lastReturnedCount;
}

public boolean isStarted() {
return started;
}
}
}

0 comments on commit 937963e

Please sign in to comment.