Skip to content

Conversation

@ramkrish86
Copy link
Contributor

Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the How To Contribute guide.
In addition to going through the list, please provide a meaningful description of your changes.

  • General
    • The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
    • The pull request addresses only one issue
    • Each commit in the PR has a meaningful commit message (including the JIRA id)
  • Documentation
    • Documentation has been added for new functionality
    • Old documentation affected by the pull request has been updated
    • JavaDoc for public methods has been added
  • Tests & Build
    • Functionality added by the pull request is covered by tests
    • mvn clean verify has been executed successfully locally or a Travis build has passed

…point

@ramkrish86
Copy link
Contributor Author

Now if a save point is triggered then we don't check for the MINIMUM_TIME_BETWEEN_CHECKPOINTS.
Also in addition to that, when the checkpoint was meeting that condition we were nullifying the currentPeriodicTrigger but not assigning it again after the new trigger was created. This PR fixes it also.

I ran the CheckpointcoordinatorTest,CheckpointStateRestoreTest and SavepointITCase.
I had to make one change in the CheckpointCoordinatorTest#testSavepointsAreNotSubsumed() the 2nd save point was getting triggered at the same timestamp. Ideally it should be at the latest time? The test case passes after that change.

Feedback and suggestions are welcome @StephanEwen and @uce .

@StephanEwen
Copy link
Contributor

Looks good to me.
+1

@uce
Copy link
Contributor

uce commented Aug 19, 2016

I will merge this and add the following test for min pause. This fails with the current master, but works with your PR.

/**
 * Tests that no minimum delay between savepoints is enforced.
 */
@Test
public void testMinDelayBetweenSavepoints() throws Exception {
    JobID jobId = new JobID();

    final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
    ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);

    CheckpointCoordinator coord = new CheckpointCoordinator(
            jobId,
            100000,
            200000,
            100000000L, // very long min delay => should not affect savepoints
            1,
            42,
            new ExecutionVertex[] { vertex1 },
            new ExecutionVertex[] { vertex1 },
            new ExecutionVertex[] { vertex1 },
            cl,
            new StandaloneCheckpointIDCounter(),
            new StandaloneCompletedCheckpointStore(2, cl),
            new HeapSavepointStore(),
            new DisabledCheckpointStatsTracker());

    Future<String> savepoint0 = coord.triggerSavepoint(0);
    assertFalse("Did not trigger savepoint", savepoint0.isCompleted());

    Future<String> savepoint1 = coord.triggerSavepoint(1);
    assertFalse("Did not trigger savepoint", savepoint1.isCompleted());
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants