New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-15347] Add SupervisorActor which monitors the proper termination of AkkaRpcActors #11683
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit 07cde12 (Thu Apr 09 08:41:49 UTC 2020) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
This PR has still a problem with a potential deadlock because the |
Resolved the problem (hopefully) by introducing a separate |
@flinkbot run travis |
@flinkbot run azure |
fade42d
to
a4765c1
Compare
Preconditions.checkState(actorSystem != null, "ActorSystem must be initialized when calling after."); | ||
AkkaUtils.terminateActorSystem(actorSystem).join(); | ||
|
||
super.after(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not needed to call super.before()
and super.after()
. Also not done in their own code (e.g., TemporaryFolder
) and in the examples in Junit's Javadocs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. I will remove it.
@@ -22,7 +22,7 @@ | |||
* Exception which indicates that the AkkaRpcActor has received an | |||
* unknown message type. | |||
*/ | |||
public class AkkaUnknownMessageException extends AkkaRpcException { | |||
public class AkkaUnknownMessageException extends AkkaRpcRuntimeException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the commit message should better reflect why this change was done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will add a more detailed commit message
terminationFuture.get(); | ||
fail("Expected the termination future being completed exceptionally"); | ||
} catch (ExecutionException expected) { | ||
final Throwable ignored = ExceptionUtils.findThrowable(expected, e -> e.equals(cause)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why assign the throwable to ignored
in the first place? Also in completesTerminationFutureExceptionallyIfActorStopsExceptionally()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is no reason other than stupidity.
import static org.junit.Assert.assertThat; | ||
import static org.junit.Assert.fail; | ||
|
||
public class SupervisorActorTest extends TestLogger { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
True, I will add one. Not sure whether this all that meaningful, though.
if (errorCause != null) { | ||
if (!internalTerminationFuture.completeExceptionally(errorCause)) { | ||
// we have another failure reason -> let's add it | ||
terminationFuture = internalTerminationFuture.<Void>handle( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think <Void>
can be inferred by the compiler:
terminationFuture = internalTerminationFuture.<Void>handle( | |
terminationFuture = internalTerminationFuture.handle( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will remove it.
terminationFuture -> Props.create(SimpleActor.class, terminationFuture), | ||
"foobar"); | ||
|
||
final SupervisorActor.ActorRegistration actorRegistration = startResponse.orElseThrow(cause -> new AssertionError("Expected the start to succeed.", cause)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All the implementations for the throwableFunction
are the same in this test. Maybe there is a way to de-deduplicate this line:
final SupervisorActor.ActorRegistration actorRegistration = startResponse.orElseThrow(cause -> new AssertionError("Expected the start to succeed.", cause));
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. I'll look into it.
Looks good all in all. |
2aba0b8
to
7be78ae
Compare
Thanks for the review @GJL. I've addressed your comments (which unfortunately required a force push). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
Thanks for the review @GJL. I'll rebase the PR and once Travis gives green light, then I will merge it. |
AkkaUnknownMessageException represents a programming error which is not recoverable. Consequently, this exception becomes an unchecked exception with this commit.
…on of AkkaRpcActors In order to properly complete the termination future of an RpcEndpoint, we need to monitor when the underlying AkkaRpcActor has been removed from the ActorSystem. If this is not done, then it can happen that another RpcEndpoint using the same name cannot be started because the old RpcEndpoint is still registered. The way we achieve this monitoring is to introduce a helper actor which is responsible for starting the AkkaRpcActors for all RpcEndpoints. Since the SupervisorActor is the parent of all RpcEndpoints, it can tell when they are being removed from the ActorSystem through the SupervisorStrategy. A consequence of the new actor is that the akka urls change from akka://flink@actorsystem:port/user/xyz to akka://flink@actorsystem:port/user/rpc/xyz. The respective method AkkaRpcServiceUtils.getRpcUrl has been updated to reflect this change. This hierarchy change also warrants the bump of the AkkaRpcService.VERSION. The failure behaviour of the underlying ActorSystem has been changed so that it terminates all running actors if an exception is thrown from the SupervisorActor. The assumption is that such an exception always indicates a programming error and is unrecoverable. The same applies to failure originating from an AkkaRpcActor (children of the SupervisorActor). If such an exception is thrown, then we assume that the system is in an illegal state and shut it down. The way it works is by recording the failure cause for the respective AkkaRpcActor and then terminating the ActorSystem.
The separate dispatcher ensures that the SupervisorActor can always make progress. This again ensures that the other RpcEndpoints won't deadlock when starting a new RpcEndpoint. The reason for this is because a newly started RpcEndpoint needs to register at the SupervisorActor.
FutureUtils.forwardAsync forwards the source value to the target future using the provided executor.
…Actor The SupervisorActor must never block its own main thread. Due to this constraint, we must not complete the external termination futures from the main thread, because one does not know what the user does with this future. Hence, this commit introduces a dedicated termination future executor which is responsible for completing the termination futures. This closes apache#11683.
7be78ae
to
80b7f1a
Compare
What is the purpose of the change
In order to properly complete the termination future of an RpcEndpoint, we need to monitor
when the underlying AkkaRpcActor has been removed from the ActorSystem. If this is not done,
then it can happen that another RpcEndpoint using the same name cannot be started because
the old RpcEndpoint is still registered.
The way we achieve this monitoring is to introduce a helper actor which is responsible for
starting the AkkaRpcActors for all RpcEndpoints. Since the SupervisorActor is the parent
of all RpcEndpoints, it can tell when they are being removed from the ActorSystem through
the SupervisorStrategy.
A consequence of the new actor is that the akka urls change from akka://flink@actorsystem:port/user/xyz
to akka://flink@actorsystem:port/user/rpc/xyz. The respective method AkkaRpcServiceUtils.getRpcUrl
has been updated to reflect this change. This hierarchy change also warrants the bump of the
AkkaRpcService.VERSION.
The failure behaviour of the underlying ActorSystem has been changed so that it terminates
all running actors if an exception is thrown from the SupervisorActor. The assumption is that
such an exception always indicates a programming error and is unrecoverable.
The same applies to failure originating from an AkkaRpcActor (children of the SupervisorActor).
If such an exception is thrown, then we assume that the system is in an illegal state and shut it
down. The way it works is by recording the failure cause for the respective AkkaRpcActor and
then terminating the ActorSystem.
Verifying this change
AkkaActorSystemTest
,SupervisorActorTest
andAkkaRpcActorTest.canReuseEndpointNameAfterTermination
.Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (no)Documentation