Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-6521] Add per job cleanup methods to HighAvailabilityServices #4376

Closed
wants to merge 5 commits into from

Conversation

FangYongs
Copy link
Contributor

Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the How To Contribute guide.
In addition to going through the list, please provide a meaningful description of your changes.

  • General

    • The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
    • The pull request addresses only one issue
    • Each commit in the PR has a meaningful commit message (including the JIRA id)
  • Documentation

    • Documentation has been added for new functionality
    • Old documentation affected by the pull request has been updated
    • JavaDoc for public methods has been added
  • Tests & Build

    • Functionality added by the pull request is covered by tests
    • mvn clean verify has been executed successfully locally or a Travis build has passed

try {
this.submittedJobGraphStore = ZooKeeperUtils.createSubmittedJobGraphs(client, configuration, executor);
} catch (Exception e) {
throw new RuntimeException(e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not throw RuntimeException but instead a meaningful checked exception.

import org.junit.Assert;
import org.junit.rules.ExpectedException;

public class HighAvailabilityServiceTest {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we let the test case extend from TestLogger, then we get nice testing log statement on Travis.

Copy link
Contributor

@tillrohrmann tillrohrmann left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your contribution @zjureel. The changes look good to me. I had some minor comments which we could address. Moreover, there are some test cases failing most likely due to your changes:

ZooKeeperRegistryTest.testZooKeeperRegistry
ZooKeeperLeaderRetrievalTest.before


SubmittedJobGraph recoverJobGraph2 = submittedJobGraphStore.recoverJobGraph(jobGraph2.getJobId());
Assert.assertEquals(recoverJobGraph2.getJobId(), jobGraph2.getJobId());
thrown.expectMessage("Could not retrieve the submitted job graph state handle for /" +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we rather check for the exception type? Matching exception messages is really brittle.

@tillrohrmann
Copy link
Contributor

The test case JobManagerHACheckpointRecoveryITCase.testCheckpointedStreamingProgramIncrementalRocksDB seems to be failing on Travis. It might be something caused by the changes.

@FangYongs
Copy link
Contributor Author

I found the following kinda stuff from CI, and it seems not relevant to this issue, what do you think? @tillrohrmann

Running org.apache.flink.test.recovery.JobManagerHACheckpointRecoveryITCase
java.lang.RuntimeException: org.apache.zookeeper.server.ZooKeeperServer class is frozen
	at javassist.CtClassType.checkModify(CtClassType.java:288)
	at javassist.CtBehavior.setBody(CtBehavior.java:432)
	at javassist.CtBehavior.setBody(CtBehavior.java:412)
	at org.apache.curator.test.ByteCodeRewrite.fixMethods(ByteCodeRewrite.java:91)
	at org.apache.curator.test.ByteCodeRewrite.<clinit>(ByteCodeRewrite.java:50)
	at org.apache.curator.test.TestingServer.<clinit>(TestingServer.java:33)
	at org.apache.flink.test.recovery.JobManagerHACheckpointRecoveryITCase.testCheckpointedStreamingProgram(JobManagerHACheckpointRecoveryITCase.java:350)
	at org.apache.flink.test.recovery.JobManagerHACheckpointRecoveryITCase.testCheckpointedStreamingProgramIncrementalRocksDB(JobManagerHACheckpointRecoveryITCase.java:336)

@tisonkun
Copy link
Member

@tillrohrmann @zjureel

I think the functionality is implemented occasionally by #6587 FLINK-10011

However, it is still a valid question that who is the proper actor to do the clean-up job. As for SubmittedJobGraph, it is managed by Dispatcher, but the RunningJobsRegistry is tricky that both JobManagerRunner and Dispatcher can write it.

Under the topic "per job clean up", I'd like to raise a question that how flink considered the status of a job? If we said that "per job clean up" is Dispatcher's responsibility, then we should prevent JM from writing such a RunningJobsRegistry and also it means that the status on Dispatcher is what we(users) think that of a certain job.

@tillrohrmann
Copy link
Contributor

Closed for inactivity.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
4 participants