Skip to content

Conversation

@shuai-xu
Copy link
Contributor

This pr if for jira-#5501.

The main changes are:

  1. Add interface isJobFinished() and clearJob() to RunningJobRegistry and implement them.
  2. After grantLeadership, JMRunner will first check whether the job is finished, if finished, it means that other JM has finished the job, it only need to exist.
  3. Then JMRunner will check whether the job is running, if running, it means other JM has run it, but not succeeded, so it need to recover it.
  4. If the job is not running, it mean the first running, the JMRunner will setJobRunning in RunningJobRegistry.
  5. After job finished, will clear the job state from RunningJobRegistry

@StephanEwen
Copy link
Contributor

StephanEwen commented Feb 24, 2017

PR looks like a good start, but I think we need to add a few things on top:

  • The file-based registry cannot distinguish between "job created but not running" and "job running". This distinction is important to decide whether to start reconciliation.
  • There are currently no tests for the extended functionality of the file-based registry

@StephanEwen
Copy link
Contributor

I would like to merge this and make a few edits on top...

@StephanEwen
Copy link
Contributor

One issue I think can happen in practice is that the checks "isRunning" and "isFinished" are not atomic. Imagine this scenario:

  • job is running
  • JobManager checks "isFinished" -> false
  • job finishes
  • JobManager checks "isRunning" -> false
  • JobManager starts job = bug

@StephanEwen
Copy link
Contributor

With the problem observed above, I think we should change the approach a bit:

  • The registry should have an enum that it returns: getJobSchedulingStatus or so, which can be PENDING, RUNNING, and DONE. That way there is only one access to the registry and we don't have the problem that the internal status is changed between checks.

  • The file-based registry would create one file for the transition to RUNNING and another for the transition to DONE. Important is that the transition to DONE does not remove the file for RUNNING. The status check checks backwards - first for the DONE file, then for the RUNNING file.

String zkPath = runningJobPath + jobID.toString();
this.client.newNamespaceAwareEnsurePath(zkPath).ensure(client.getZookeeperClient());
this.client.setData().forPath(zkPath);
this.client.setData().forPath(zkPath, RUNNING.getBytes());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

String to bytes conversion (and bytes to string) must always explicitly specify the encoding (Charset). Otherwise, there can be mismatches when different machines configure different default Charsets.

@shuai-xu
Copy link
Contributor Author

hi @StephanEwen , thank for you review, I modify it according to your comments, add getJobSchedulingStatus to it and add tests.

@StephanEwen
Copy link
Contributor

Thanks!
I think I can take this over now...

StephanEwen pushed a commit to StephanEwen/flink that referenced this pull request Feb 27, 2017
@StephanEwen
Copy link
Contributor

One test case seemed to be failing in this PR:
I have merged the PR to my local repository, fixed the test, and added some fixes/cleanups on top.
Will merge back to Flink master tomorrow...

@shuai-xu
Copy link
Contributor Author

@StephanEwen , Thank you very much, sorry for the test break, next time I will be more careful.

StephanEwen pushed a commit to StephanEwen/flink that referenced this pull request Feb 28, 2017
@asfgit asfgit closed this in e7011d7 Feb 28, 2017
p16i pushed a commit to p16i/flink that referenced this pull request Apr 16, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants