New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-24021][HA]Handle curator framework start error by register UnhandledErrorListener before start zk client #17053
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit 3e938a7 (Mon Aug 30 13:18:03 UTC 2021) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for creating this PR @Aitozi. I think you are right that we don't handle the case properly where the CuratorZookeeperClient.start
fails. In order to do that we have to register an UnhandledErrorListener
right away.
I left a couple of comments. Please take a look.
flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
Outdated
Show resolved
Hide resolved
flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilsTest.java
Outdated
Show resolved
Hide resolved
flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
Outdated
Show resolved
Hide resolved
// This handler is only used to handle the error during start phase, and should be | ||
// removed after start curator success. | ||
UnhandledErrorListener unhandledErrorListener = | ||
(message, throwable) -> { | ||
LOG.error( | ||
"Exiting process for unhandled error in start curator framework, " | ||
+ "error message: {}, exiting code: {}", | ||
message, | ||
ZOOKEEPER_FAILURE_EXIT_CODE, | ||
throwable); | ||
System.exit(ZOOKEEPER_FAILURE_EXIT_CODE); | ||
}; | ||
cf.getUnhandledErrorListenable().addListener(unhandledErrorListener); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am actually wondering whether we shouldn't only register this one handler and remove the listeners in the ZooKeeperLeaderElectionDriver
and ZooKeeperLeaderRetrievalDriver
because these two listeners would be called for every unhandled exception that occurs in Curator independent of the actual source. Moreover, the current UnhandledErrorListener
always call the FatalExitExceptionHandler
eventually.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't really get your meaning, may be it's should only
?
I think we could safely remove the listener in leaderElectionDriver
and leaderRetrievalDriver
like you said that,all the three handlers always exit the process now, after one execute other will not be executed. So there is no meaning to register other handlers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I meant to only register this one failure handler and remove the listener in leaderElectionDriver
and leaderRetrievalDriver
.
Thanks for your review @tillrohrmann . I have addressed most of your comments. Regarding to whether to remove the listener in |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for updating this PR @Aitozi. I think it is ok to remove the UnhandledErrorListener
from the ZooKeeperLeaderElectionDriver
and ZooKeeperLeaderRetrievalDriver
as a follow-up step. What we probably should do is to pass in a FatalErrorHandler
into the ZooKeeperUtils.startCuratorFramework
. That way we can integrate the failure handling with the ClusterEntrypoint
, TaskManagerRunner
, etc.
(message, throwable) -> { | ||
LOG.error( | ||
"Exiting process for unhandled error in curator framework, " | ||
+ "error message: {}, exiting code: {}", | ||
message, | ||
ZOOKEEPER_FAILURE_EXIT_CODE, | ||
throwable); | ||
FlinkSecurityManager.forceProcessExit(ZOOKEEPER_FAILURE_EXIT_CODE); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe it is actually better if we can pass in a FatalErrorHandler
. That way we can integrate the failure handling with the general logic (e.g. in the ClusterEntrypoint
we add additional information if there was a out of memory error).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then we wouldn't have the problems of how to test for FlinkSecurityManager.forceProcessExit
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Get it, I will add the FatalErrorHandler
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Passed FatalErrorHandler
to ZookeeperUtils#startCuratorFramework
. Please help review again.
@flinkbot run azure |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for updating this PR @Aitozi. I think it looks almost mergeable. I left some suggestions for improvements and a comment how to set the FatalErrorHandler
in the RestClusterClient
. Please take a look.
this.configuration = checkNotNull(configuration); | ||
|
||
this.restClusterClientConfiguration = | ||
RestClusterClientConfiguration.fromConfiguration(configuration); | ||
|
||
if (restClient != null) { | ||
this.restClient = restClient; | ||
} else { | ||
this.restClient = new RestClient(configuration, executorService); | ||
} | ||
|
||
this.waitStrategy = checkNotNull(waitStrategy); | ||
this.clusterId = checkNotNull(clusterId); | ||
|
||
this.clientHAServices = | ||
HighAvailabilityServicesUtils.createClientHAService( | ||
configuration, | ||
exception -> | ||
webMonitorLeaderRetriever.handleError( | ||
new FlinkException(exception))); | ||
|
||
this.webMonitorRetrievalService = clientHAServices.getClusterRestEndpointLeaderRetriever(); | ||
this.retryExecutorService = | ||
Executors.newSingleThreadScheduledExecutor( | ||
new ExecutorThreadFactory("Flink-RestClusterClient-Retry")); | ||
startLeaderRetrievers(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This does not feel right. We are effectively copying the next constructor. I think there must be another solution. Maybe, if we don't specify a ClientHighAvailabilityServices
, then we instantiate it with a handler that terminates the process in case of an unhandled exception.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But I think it's strange to directly terminate the client process. And the current webMonitorRetrievalService
also just complete the future with exception. I have to copy the constructor mainly due to there is a constructor which accept the StandaloneClientHAServices
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe I can use an ClientHighAvailabilityServicesFactory
here to decouple the RestClusterClient
with concrete ClientHighAvailabilityServices
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this could be a good solution to decouple these two components a bit better.
flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
Outdated
Show resolved
Hide resolved
@@ -149,9 +151,12 @@ public static String generateLeaderLatchPath(String path) { | |||
* Starts a {@link CuratorFramework} instance and connects it to the given ZooKeeper quorum. | |||
* | |||
* @param configuration {@link Configuration} object containing the configuration values | |||
* @param fatalErrorHandler {@link FatalErrorHandler} fatalErrorHandler to handle unExpected | |||
* error of {@link CuratorFramework} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* error of {@link CuratorFramework} | |
* errors of {@link CuratorFramework} |
flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
Outdated
Show resolved
Hide resolved
flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
Outdated
Show resolved
Hide resolved
"Unhandled error in curator framework, error message: {}", | ||
message, | ||
throwable); | ||
// The exception thrown in UnhandledErrorListener will be catch by |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Which exception is thrown here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I comment to draw attention to UnhandledErrorListener
will catch all exception when executed. The FatalErrorHandler
used here should be aware of this. If a handler just throw an exception may be eat by curator framework.
...src/test/java/org/apache/flink/runtime/checkpoint/ZKCheckpointIDCounterMultiServersTest.java
Outdated
Show resolved
Hide resolved
flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilsTest.java
Outdated
Show resolved
Hide resolved
fd4a9eb
to
948cd08
Compare
Thanks for your review , I have fixed all of your comments, plz take a look again. @tillrohrmann |
@flinkbot run azure |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for updating this PR @Aitozi. I think we are almost done. I only had a few minor comments left concerning the usage of anonymous classes. Once these comments are addressed and Azure gives green light, I will merge this PR.
There are also some merge conflicts. Maybe you can rebase this PR while resolving the last comments onto the latest master. Moreover, it would be awesome if you could create a backport against the release-1.14
branch so that we also run CI for this version before merging it back.
new ClientHighAvailabilityServicesFactory() { | ||
@Override | ||
public ClientHighAvailabilityServices create( | ||
Configuration configuration, FatalErrorHandler fatalErrorHandler) | ||
throws Exception { | ||
return HighAvailabilityServicesUtils.createClientHAService( | ||
configuration, fatalErrorHandler); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's create a DefaultClientHighAvailabilityServicesFactory
that has a singleton INSTANCE
that does exactly this here. Then we don't have to use anonymous classes.
new FatalErrorHandler() { | ||
@Override | ||
public void onFatalError(Throwable exception) { | ||
webMonitorLeaderRetriever.handleError( | ||
new FlinkException(exception)); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
new FatalErrorHandler() { | |
@Override | |
public void onFatalError(Throwable exception) { | |
webMonitorLeaderRetriever.handleError( | |
new FlinkException(exception)); | |
} | |
exception -> | |
webMonitorLeaderRetriever.handleError( | |
new FlinkException(exception)) |
A bit less boilerplate.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dose this means add more error msg?
configuration, | ||
clusterId, | ||
new StandaloneClientHAServices(getWebMonitorAddress(configuration))); | ||
String webMonitorAddress = getWebMonitorAddress(configuration); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would move this into the factory because we don't know whether configuration
won't be changed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I made a wrong thought here, done!
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java
Outdated
Show resolved
Hide resolved
Having addressed all your concerns, thanks for your review time @tillrohrmann |
Created the backport PR |
@flinkbot run azure |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the update. LGTM. Addressing my last comments while merging this PR.
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
Outdated
Show resolved
Hide resolved
new ClientHighAvailabilityServicesFactory() { | ||
@Override | ||
public ClientHighAvailabilityServices create( | ||
Configuration configuration, | ||
FatalErrorHandler fatalErrorHandler) | ||
throws Exception { | ||
return new StandaloneClientHAServices( | ||
getWebMonitorAddress(configuration)); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here probably.
…andledErrorListener before start zk client This closes apache#17053.
505c576
to
03dc417
Compare
…andledErrorListener before start zk client This closes apache#17053.
03dc417
to
6b4f8a4
Compare
…andledErrorListener before start zk client This closes apache#17053.
What is the purpose of the change
As described in issue , this PR is meant to fix the potential job unrecoverable due to network failure . This is done by add an unhandledErrorListener before start curator client
Tests:
ZookeeperUtilsTest#testStartCuratorFrameworkFailed
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (no)