
[FLINK-10255] Only react to onAddedJobGraph signal when being leader #6678

Closed

tillrohrmann wants to merge 7 commits from the fixJobRecovery branch
Conversation

tillrohrmann (Contributor):

What is the purpose of the change

The Dispatcher should only react to the onAddedJobGraph signal if it is the leader.
In all other cases the signal should be ignored since the jobs will be recovered once
the Dispatcher becomes the leader.

In order to still support non-blocking job recoveries, this commit serializes all
recovery operations by introducing a recoveryOperation future which first needs to
complete before a subsequent operation is started. That way we can avoid race conditions
between granting and revoking leadership as well as the onAddedJobGraph signals. This is
important since we can only lock each JobGraph once and, thus, need to make sure that
we don't release a lock of a properly recovered job in a concurrent operation.
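The serialization idea described above can be sketched with plain CompletableFuture chaining. This is an illustrative, self-contained example under assumed names (RecoverySerializer, runSerialized, and the field recoveryOperation here are hypothetical stand-ins, not the Dispatcher's actual code): each new recovery operation is composed onto the previously stored future, so operations run strictly one after another.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical sketch of serializing async recovery operations by chaining
// each new operation onto the previously stored future.
public class RecoverySerializer {

    // a completed future means "no recovery operation in flight"
    private CompletableFuture<Void> recoveryOperation =
        CompletableFuture.completedFuture(null);

    public synchronized CompletableFuture<Void> runSerialized(
            Supplier<CompletableFuture<Void>> operation) {
        // start the next operation only after the previous one has finished
        CompletableFuture<Void> next =
            recoveryOperation.thenCompose(ignored -> operation.get());
        recoveryOperation = next;
        return next;
    }

    public static void main(String[] args) {
        RecoverySerializer serializer = new RecoverySerializer();
        List<String> order = new ArrayList<>();
        serializer.runSerialized(
            () -> CompletableFuture.runAsync(() -> order.add("recoverJobs")));
        serializer.runSerialized(
            () -> CompletableFuture.runAsync(() -> order.add("onAddedJobGraph")))
            .join();
        // operations complete in submission order, never concurrently
        System.out.println(order); // prints [recoverJobs, onAddedJobGraph]
    }
}
```

Because every caller chains onto the same stored future, a leadership revocation and an onAddedJobGraph recovery can never release or lock the same JobGraph concurrently.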

cc @GJL

Brief change log

  • Only react to SubmittedJobGraphListener#onAddedJobGraph when being the leader
  • Serialize recovery operations by introducing a recoveryOperation future in order to avoid wrong unlocking of guarded resources

Verifying this change

  • Added ZooKeeperHADispatcherTest#testStandbyDispatcherJobExecution and ZooKeeperHADispatcherTest#testStandbyDispatcherJobRecovery

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

@GJL GJL self-assigned this Sep 10, 2018
@@ -85,5 +87,13 @@
return FutureUtils.completeAll(terminationFutures);
}

public static void stopActor(AkkaActorGateway akkaActorGateway) {
Member:

This PR introduces stopActor, which is used in one place. After checking the whole project, I found that we define stopActor in many places. Most usages are in TestingUtils, but there are also some in MesosResourceManager and FlinkUntypedActorTest. Some use PoisonPill and some use Kill.
Apart from this PR, since everything that interacts with Akka depends on flink-runtime, let's unify the stopActor utilities.
I think ActorUtils is the best place for them.

tillrohrmann (Contributor, Author):

Good point. I will clean this up as a follow up.

@tisonkun (Member):

Travis shows relevant failures; I will take a closer look later.

testGrantingRevokingLeadership(org.apache.flink.runtime.dispatcher.DispatcherHATest)  Time elapsed: 0.024 sec  <<< ERROR!
org.apache.flink.runtime.util.TestingFatalErrorHandler$TestingException: java.lang.UnsupportedOperationException: Should not be called.
	at org.apache.flink.runtime.util.TestingFatalErrorHandler.rethrowError(TestingFatalErrorHandler.java:51)
	at org.apache.flink.runtime.dispatcher.DispatcherHATest.teardown(DispatcherHATest.java:98)
Caused by: java.lang.UnsupportedOperationException: Should not be called.
	at org.apache.flink.runtime.dispatcher.DispatcherHATest$BlockingSubmittedJobGraphStore.releaseJobGraph(DispatcherHATest.java:306)
	at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$null$26(Dispatcher.java:809)
	at org.apache.flink.util.function.BiFunctionWithException.apply(BiFunctionWithException.java:49)
	at java.util.concurrent.CompletableFuture.biApply(CompletableFuture.java:1105)
	at java.util.concurrent.CompletableFuture$BiApply.tryFire(CompletableFuture.java:1070)
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Tests run: 2, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 1.45 sec <<< FAILURE! - in org.apache.flink.runtime.dispatcher.DispatcherHATest
A TaskManager should go into a clean state in case of a JobManager failure(org.apache.flink.api.scala.runtime.jobmanager.JobManagerFailsITCase)  Time elapsed: 121.247 sec  <<< FAILURE!
java.lang.AssertionError: assertion failed: timeout (119585594930 nanoseconds) during expectMsg while waiting for Acknowledge
	at scala.Predef$.assert(Predef.scala:170)
	at akka.testkit.TestKitBase$class.expectMsg_internal(TestKit.scala:387)
	at akka.testkit.TestKitBase$class.expectMsg(TestKit.scala:364)
	at akka.testkit.TestKit.expectMsg(TestKit.scala:814)
	at org.apache.flink.api.scala.runtime.jobmanager.JobManagerFailsITCase$$anonfun$1$$anonfun$apply$mcV$sp$3$$anonfun$apply$mcV$sp$4.apply$mcV$sp(JobManagerFailsITCase.scala:118)
	at org.apache.flink.api.scala.runtime.jobmanager.JobManagerFailsITCase$$anonfun$1$$anonfun$apply$mcV$sp$3$$anonfun$apply$mcV$sp$4.apply(JobManagerFailsITCase.scala:104)
	at org.apache.flink.api.scala.runtime.jobmanager.JobManagerFailsITCase$$anonfun$1$$anonfun$apply$mcV$sp$3$$anonfun$apply$mcV$sp$4.apply(JobManagerFailsITCase.scala:104)
	at akka.testkit.TestKitBase$class.within(TestKit.scala:345)
	at akka.testkit.TestKit.within(TestKit.scala:814)
	at akka.testkit.TestKitBase$class.within(TestKit.scala:359)
	at akka.testkit.TestKit.within(TestKit.scala:814)
	at org.apache.flink.api.scala.runtime.jobmanager.JobManagerFailsITCase$$anonfun$1$$anonfun$apply$mcV$sp$3.apply$mcV$sp(JobManagerFailsITCase.scala:104)
	at org.apache.flink.api.scala.runtime.jobmanager.JobManagerFailsITCase$$anonfun$1$$anonfun$apply$mcV$sp$3.apply(JobManagerFailsITCase.scala:85)
	at org.apache.flink.api.scala.runtime.jobmanager.JobManagerFailsITCase$$anonfun$1$$anonfun$apply$mcV$sp$3.apply(JobManagerFailsITCase.scala:85)
	at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
	at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
	at org.scalatest.Transformer.apply(Transformer.scala:22)
	at org.scalatest.Transformer.apply(Transformer.scala:20)
	at org.scalatest.WordSpecLike$$anon$1.apply(WordSpecLike.scala:953)
	at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
	at org.apache.flink.api.scala.runtime.jobmanager.JobManagerFailsITCase.withFixture(JobManagerFailsITCase.scala:37)

@tillrohrmann (Contributor, Author):

Thanks for the comments @tisonkun. I've fixed the failing DispatcherTest#testOnAddedJobGraphWithFinishedJob.

@@ -0,0 +1,48 @@
/*
@GJL (Member), Sep 12, 2018:

I think adding this file should be in a commit that is before [FLINK-10255] Only react to onAddedJobGraph signal when being leader, or it should be squashed. Without this class your previous commits would not work.

tillrohrmann (Contributor, Author):

Yes, I will rearrange the commits before merging. Locally 3a07ee8 is before 868c7dd which makes it work.

@GJL (Member) left a comment:

This is not a review.

return callAsyncWithoutFencing(
() -> getJobTerminationFuture(jobId),
timeout).thenCompose(Function.identity());
}

@VisibleForTesting
Member:

Not sure about @VisibleForTesting here. This class is already a test utility. It is even in the test sources directory.

tillrohrmann (Contributor, Author):

You're right. Will change it.

@@ -861,14 +861,18 @@ public void grantLeadership(final UUID newLeaderSessionID) {
getMainThreadExecutor());
}

protected CompletableFuture<Void> getJobTerminationFuture(JobID jobId) {
CompletableFuture<Void> getJobTerminationFuture(JobID jobId) {
Member:

Should be private.

@tillrohrmann (Contributor, Author), Sep 12, 2018:

It cannot be private since the TestingDispatcher needs to access it.

Member:

True, I missed it.

if (jobManagerRunners.containsKey(jobId)) {
return FutureUtils.completedExceptionally(new DispatcherException(String.format("Job with job id %s is still running.", jobId)));
} else {
return jobManagerTerminationFutures.getOrDefault(jobId, CompletableFuture.completedFuture(null));
}
}

CompletableFuture<Void> getRecoveryOperation() {
Member:

This method has wider visibility scope than necessary, and is part of production code. I think @VisibleForTesting should be added.

tillrohrmann (Contributor, Author):

Good point. Will add it.

final CompletableFuture<Void> submissionFuture = recoveredJob.thenComposeAsync(
(FunctionWithThrowable<JobGraph, CompletableFuture<Void>, Exception>) (JobGraph jobGraph) -> tryRunRecoveredJobGraph(jobGraph, dispatcherId)
.thenAcceptAsync(
(ConsumerWithException<Boolean, Exception>) (Boolean isRecoveredJobRunning) -> {
@GJL (Member), Sep 12, 2018:

Imo we are doing this wrong. The code would be much more readable with static factory methods:


/**
 * {@link Consumer} that can throw checked exceptions.
 */
@FunctionalInterface
public interface CheckedConsumer<T> {

	void checkedAccept(T t) throws Exception;

	static <T> Consumer<T> unchecked(CheckedConsumer<T> checkedConsumer) {
		return (t) -> {
			try {
				checkedConsumer.checkedAccept(t);
			} catch (Exception e) {
				ExceptionUtils.rethrow(e);
			}
		};
	}
}

This allows for:

.thenAcceptAsync(CheckedConsumer.unchecked(isRecoveredJobRunning -> {
     ...
}));
...

No casts are required. Also, when interacting with the Java API, it does not matter what exact type of exception can be thrown; what matters is that the checked exception becomes unchecked. We do not need to generify the exception type in ConsumerWithException.

tillrohrmann (Contributor, Author):

Good point. I like this approach better. Will adapt the existing interfaces.


final DispatcherId dispatcherId = getFencingToken();
final CompletableFuture<Void> submissionFuture = recoveredJob.thenComposeAsync(
(FunctionWithThrowable<JobGraph, CompletableFuture<Void>, Exception>) (JobGraph jobGraph) -> tryRunRecoveredJobGraph(jobGraph, dispatcherId)
Member:

See my comment regarding the ConsumerWithException.

@@ -167,7 +181,7 @@ public void testSubmittedJobGraphRelease() throws Exception {
// recover the job
final SubmittedJobGraph submittedJobGraph = otherSubmittedJobGraphStore.recoverJobGraph(jobId);

assertThat(submittedJobGraph, Matchers.is(Matchers.notNullValue()));
assertThat(submittedJobGraph, is(Matchers.notNullValue()));
Member:

You added a static import for is but not for notNullValue. I think this should be consistent.

tillrohrmann (Contributor, Author):

Good catch. Will change it.

@tillrohrmann (Contributor, Author):

Thanks for the review @GJL. I've addressed your comments and after Travis gives green light, I'll merge it.

@tillrohrmann tillrohrmann force-pushed the fixJobRecovery branch 3 times, most recently from f255bb2 to 3d94a81 Compare September 13, 2018 16:26
tillrohrmann added a commit to tillrohrmann/flink that referenced this pull request Sep 13, 2018
The Dispatcher should only react to the onAddedJobGraph signal if it is the leader.
In all other cases the signal should be ignored since the jobs will be recovered once
the Dispatcher becomes the leader.

In order to still support non-blocking job recoveries, this commit serializes all
recovery operations by introducing a recoveryOperation future which first needs to
complete before a subsequent operation is started. That way we can avoid race conditions
between granting and revoking leadership as well as the onAddedJobGraph signals. This is
important since we can only lock each JobGraph once and, thus, need to make sure that
we don't release a lock of a properly recovered job in a concurrent operation.

This closes apache#6678.
…nsumer

ThrowingRunnable#unchecked converts a ThrowingRunnable into a Runnable which throws checked
exceptions as unchecked ones. FunctionUtils#uncheckedConsumer(ThrowingConsumer) converts a
ThrowingConsumer into a Consumer which throws checked exceptions as unchecked ones. This is
necessary because ThrowingConsumer is public and we cannot add new methods to the interface.
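The conversion this commit message describes can be sketched in a few lines. This is a simplified, self-contained stand-in (the ThrowingConsumer interface and the rethrow strategy here are local illustrations; Flink's actual versions live in org.apache.flink.util.function and use ExceptionUtils.rethrow):

```java
import java.util.function.Consumer;

// Sketch: adapt a consumer that throws checked exceptions into a plain
// java.util.function.Consumer that rethrows them as unchecked ones.
public class Unchecked {

    /** Local stand-in for a {@code Consumer} that may throw checked exceptions. */
    @FunctionalInterface
    public interface ThrowingConsumer<T> {
        void accept(T t) throws Exception;
    }

    public static <T> Consumer<T> uncheckedConsumer(ThrowingConsumer<T> consumer) {
        return t -> {
            try {
                consumer.accept(t);
            } catch (Exception e) {
                // rethrow checked exceptions as unchecked ones
                if (e instanceof RuntimeException) {
                    throw (RuntimeException) e;
                }
                throw new RuntimeException(e);
            }
        };
    }

    public static void main(String[] args) {
        Unchecked.uncheckedConsumer((String s) -> {
            if (s.isEmpty()) {
                throw new Exception("empty input");
            }
            System.out.println("accepted: " + s);
        }).accept("job-graph"); // prints accepted: job-graph
    }
}
```

The caller can then pass the result straight to APIs like CompletableFuture#thenAcceptAsync without casts, which is exactly the readability benefit discussed in the review above.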
…point

This is necessary to support the command line syntax used by the multi master
standalone start-up scripts.
@asfgit asfgit closed this in 3e5d07c Sep 14, 2018
@tillrohrmann tillrohrmann deleted the fixJobRecovery branch September 14, 2018 13:21
@Clarkkkkk (Contributor):

Hi @tillrohrmann, is it possible for two async operations to modify the same recoveryOperation at the same time? Would the operations still be serialized in that case?

@tillrohrmann (Contributor, Author):

I think it should not be possible to have two async recovery operations ongoing, since one of them will have to wait for the other to complete. That was the idea of the fix.

@Clarkkkkk (Contributor):

Thanks for the reply, that makes sense.

5 participants