[FLINK-11665] Wait for job termination before recovery in Dispatcher #7889

Closed
wants to merge 2 commits

Conversation

azagrebin
Contributor

What is the purpose of the change

This PR addresses a concurrency problem that occurs when the Dispatcher loses leadership and immediately regains it. When the Dispatcher recovers a job, it has to wait for the termination of the previous job run. Otherwise, job recovery for the next run can add the execution graph to the HA store while the concurrent termination of the previous run subsequently removes it, which leaves the storage in an inconsistent state.
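
For illustration, a minimal sketch of the intended ordering (the class and method names below are hypothetical placeholders, not the actual Dispatcher code):

import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: chain recovery of the next run onto the termination
// future of the previous run, so the HA store entry is only (re)written once
// the previous run can no longer remove it concurrently.
public class RecoveryOrderingSketch {

    static CompletableFuture<Void> recoverAfterTermination(
            CompletableFuture<Void> previousRunTermination) {
        // Without this ordering, recoverJob() could race with the previous run's cleanup.
        return previousRunTermination.thenCompose(ignored -> recoverJob());
    }

    // Stand-in for the Dispatcher's recovery path that writes to the HA store.
    static CompletableFuture<Void> recoverJob() {
        return CompletableFuture.completedFuture(null);
    }
}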

Brief change log

  • Change the future ordering in Dispatcher methods that involve job recovery.
  • Add unit tests that check for the problem.

Verifying this change

Run the added unit tests.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

@flinkbot
Collaborator

flinkbot commented Mar 4, 2019

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❗ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@@ -313,8 +318,6 @@ private void stopDispatcherServices() throws Exception {

final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph);

jobManagerRunnerFutures.put(jobGraph.getJobID(), jobManagerRunnerFuture);
Member

Why this change? Please revert it if it is irrelevant. Keeping bugfixes and code cleanups in separate commits helps review.

Contributor Author

@azagrebin azagrebin Mar 5, 2019

My PR probably changed the timing of events in some unit tests and triggered a concurrency issue that this change fixes. Without it, the PR would be unstable in tests. I think the fix is small enough to be part of this PR, though I can factor it out into a separate commit.

@@ -351,7 +356,8 @@ private JobManagerRunner startJobManagerRunner(JobManagerRunner jobManagerRunner
(ArchivedExecutionGraph archivedExecutionGraph, Throwable throwable) -> {
// check if we are still the active JobManagerRunner by checking the identity
//noinspection ObjectEquality
if (jobManagerRunner == jobManagerRunnerFutures.get(jobId).getNow(null)) {
Member

Why this change? Please revert it if it is irrelevant.

Contributor

Agreed, please revert; this change only introduces r, which is not necessary.

Member

@tisonkun tisonkun left a comment

Is it possible to wait on the previous termination future per job, job by job? In your diff it seems that, before any job recovery, we have to wait for all jobs to terminate.
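
For illustration, a hedged sketch of the two orderings under discussion; the map of termination futures and the recovery methods are hypothetical placeholders, not the actual Dispatcher code:

import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class PerJobWaitSketch {

    // As read from the current diff: wait for ALL previous runs to terminate
    // before recovering ANY job.
    static CompletableFuture<Void> recoverAfterAllTerminations(
            Map<String, CompletableFuture<Void>> terminationFutures) {
        CompletableFuture<Void> allTerminated = CompletableFuture.allOf(
                terminationFutures.values().toArray(new CompletableFuture[0]));
        return allTerminated.thenCompose(ignored -> recoverAllJobs(terminationFutures.keySet()));
    }

    // Alternative raised in this comment: wait only for the termination of the
    // SAME job before recovering it, so unrelated jobs do not block each other.
    static CompletableFuture<Void> recoverPerJob(
            Map<String, CompletableFuture<Void>> terminationFutures) {
        return CompletableFuture.allOf(terminationFutures.entrySet().stream()
                .map(entry -> entry.getValue().thenCompose(ignored -> recoverJob(entry.getKey())))
                .toArray(CompletableFuture[]::new));
    }

    static CompletableFuture<Void> recoverJob(String jobId) {
        return CompletableFuture.completedFuture(null);
    }

    static CompletableFuture<Void> recoverAllJobs(Iterable<String> jobIds) {
        return CompletableFuture.completedFuture(null);
    }
}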

@azagrebin
Contributor Author

@tisonkun
Good point, I thought about it but did not want to change the fact that jobs are currently recovered from the store sequentially. It would probably not gain much speed-up either, because a job is only run after checking that leadership is still held, and I am not sure we can check leadership before all jobs are recovered. Still, I added a commit that changes this so we can discuss whether it makes sense.

@tisonkun
Member

tisonkun commented Mar 6, 2019

@azagrebin I agree that we would not gain much speed-up. My concern is that it is the more natural dependency; that a job cannot be recovered because an unrelated job has not terminated yet sounds a bit weird.

@azagrebin
Contributor Author

@flinkbot attention @tillrohrmann

Contributor

@tillrohrmann tillrohrmann left a comment

Thanks for opening this PR @azagrebin. I think your solution can work.

However, I would rather revisit how Dispatchers interact with the SubmittedJobGraphStore, and also the lifecycle of Dispatchers with respect to leader sessions. We recently discovered another problem with keeping one Dispatcher alive across multiple leader sessions, which can cause the Dispatcher to deadlock (see FLINK-11843). I would like the Dispatcher to no longer be used across leader sessions and instead to create a new Dispatcher for every new leader session. By not sharing the SubmittedJobGraphStore between different Dispatchers we should be able to solve this issue here as well. Therefore I would be in favor of closing this PR for the moment.

new DispatcherException(
String.format("Termination of previous JobManager for job %s failed. Cannot submit job under the same job id.", jobId),
throwable)); })
.thenApply(ignored -> jobId);
Contributor

the thenApply can be removed if you return CompletableFuture.completedFuture(jobId) instead.
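
For illustration, a hedged sketch of the suggested simplification (the method and parameter names are placeholders):

import java.util.concurrent.CompletableFuture;

public class CompletedFutureSketch {

    // Current shape: complete a stage, then map its (ignored) result to the job id.
    static CompletableFuture<String> withThenApply(CompletableFuture<Void> stage, String jobId) {
        return stage.thenApply(ignored -> jobId);
    }

    // Suggested shape: return a future that is already completed with the job id
    // and drop the trailing thenApply.
    static CompletableFuture<String> withCompletedFuture(String jobId) {
        return CompletableFuture.completedFuture(jobId);
    }
}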

final CompletableFuture<Acknowledge> persistAndRunFuture = getJobTerminationFuture(jobId)
.thenComposeAsync(
FunctionUtils.uncheckedFunction((ignored) -> {
jobManagerTerminationFutures.remove(jobId);
Contributor

why do we have to spread the removal of the job manager termination futures all over the place?
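
For illustration, a hedged sketch of one way to keep the removal in a single place; the map name mirrors the field in the diff, but the surrounding class is a hypothetical placeholder, not the actual Dispatcher:

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class TerminationFutureCleanupSketch {

    private final Map<String, CompletableFuture<Void>> jobManagerTerminationFutures =
            new ConcurrentHashMap<>();

    // Register the termination future and remove it in exactly one place, when it
    // completes, instead of calling remove(jobId) at every site that waits on it.
    void registerTerminationFuture(String jobId, CompletableFuture<Void> terminationFuture) {
        jobManagerTerminationFutures.put(jobId, terminationFuture);
        terminationFuture.whenComplete(
                (ignored, throwable) -> jobManagerTerminationFutures.remove(jobId));
    }
}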

@@ -176,8 +183,7 @@ public void testSubmittedJobGraphRelease() throws Exception {
leaderElectionService.notLeader();

// wait for the job to properly terminate
final CompletableFuture<Void> jobTerminationFuture = dispatcher.getJobTerminationFuture(jobId, TIMEOUT);
jobTerminationFuture.get();
dispatcher.getJobTerminationFuture(jobId, TIMEOUT).get();
Contributor

refactorings should go into separate commits.

@@ -279,6 +285,7 @@ public void testStandbyDispatcherJobRecovery() throws Exception {
Dispatcher dispatcher1 = null;
Dispatcher dispatcher2 = null;

//noinspection TryFinallyCanBeTryWithResources
Contributor

same here.

}

private static class WaitingSubmittedJobGraphStore implements SubmittedJobGraphStore {
private static final long WAIT_RECOVER_MILLI = 1000L;
Contributor

The waiting time is too long. It should be OK to set it to something like 10-50 milliseconds.

recoverJobGraphLatch.await(WAIT_RECOVER_MILLI, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
// ignore
}
Contributor

I would do the TimeoutException handling in the test code, because otherwise someone looking only at the test code might think that we actually recover.
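
For illustration, a hedged sketch of what handling the wait directly in the test might look like; CountDownLatch stands in for the latch used by the stub store, and the test name is a placeholder:

import static org.junit.Assert.assertFalse;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.Test;

public class LatchInTestSketch {

    private final CountDownLatch recoverJobGraphLatch = new CountDownLatch(1);

    @Test
    public void recoveryDoesNotHappenBeforeLatchIsReleased() throws InterruptedException {
        // Waiting (and giving up) here in the test makes it explicit to the reader
        // that recovery is expected NOT to proceed, instead of hiding a swallowed
        // TimeoutException inside the stub store.
        assertFalse(
                "job graph recovery should not have been triggered",
                recoverJobGraphLatch.await(50, TimeUnit.MILLISECONDS));
    }
}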

dispatcher.getJobTerminationFuture(jobGraph.getJobID(), TIMEOUT).get();

assertTrue(submittedJobGraphStore.isJobRemoved());
assertFalse(submittedJobGraphStore.getJobIds().contains(jobGraph.getJobID()));
Contributor

I would suggest using Hamcrest assertions in the future because they give better error messages and are more expressive.
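
For illustration, a hedged Hamcrest version of the containment check above (the collection contents here are placeholders):

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.not;

import java.util.Collection;
import java.util.Collections;
import org.junit.Test;

public class HamcrestAssertionSketch {

    @Test
    public void jobIdIsNoLongerInStore() {
        Collection<String> jobIds = Collections.emptyList();
        String jobId = "some-job-id";
        // On failure this prints both the matcher description and the actual
        // collection, which is more informative than a bare assertFalse.
        assertThat(jobIds, not(hasItem(jobId)));
    }
}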

@tisonkun
Member

tisonkun commented Apr 9, 2019

@tillrohrmann

I would like the Dispatcher to no longer be used across leader sessions and instead to create a new Dispatcher for every new leader session. By not sharing the SubmittedJobGraphStore between different Dispatchers we should be able to solve this issue here as well.

sounds good to me.

We might not maintain a failing contender (the leader) but just crash it and restart a new one (a new contender). Following this pattern we can exit when the Dispatcher loses leadership, but it looks like we need a cluster manager to take care of creating a new Dispatcher.

@azagrebin
Contributor Author

Thanks for the reviews @tillrohrmann @tisonkun
I agree to address this issue with a more proper design in the scope of FLINK-11843.

@azagrebin azagrebin closed this Apr 9, 2019