
[FLINK-19557] Trigger LeaderRetrievalListener notification upon ZooKeeper reconnection in ZooKeeperLeaderRetrievalService #13725

Closed
wants to merge 3 commits

Conversation

tillrohrmann
Contributor

What is the purpose of the change

We have to trigger the LeaderRetrievalListener notification upon reconnecting to ZooKeeper
because the NodeCache might not trigger the nodeChanged call if the server state is the
same as the state cached in the NodeCache. Without this, we would fail to tell the listener
that the old leader (before the connection loss) is still the valid leader.
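To make the failure mode concrete, here is a minimal, self-contained sketch in plain Java (these are not the actual Flink or Curator classes; all names are illustrative) of why a change-only cache callback stays silent after a reconnect, and how an unconditional notification on reconnect covers that gap:

```java
import java.util.Objects;

// Illustrative sketch (not the real Flink classes): a change-only cache
// callback never fires after a reconnect when the znode is unchanged.
class LeaderRetrieverSketch {
    private String lastLeaderAddress; // value last reported to the listener
    private int notifications;        // how often the listener was notified

    // NodeCache-style callback: only notifies when the observed value differs
    // from the cached one. After reconnecting with an unchanged leader znode,
    // this never fires, leaving the listener without confirmation.
    void onNodeChanged(String leaderAddress) {
        if (!Objects.equals(leaderAddress, lastLeaderAddress)) {
            lastLeaderAddress = leaderAddress;
            notifications++;
        }
    }

    // The fix sketched by this PR: on reconnection, read the leader node
    // directly from ZooKeeper and notify the listener unconditionally.
    void onReconnected(String leaderAddressFromZooKeeper) {
        lastLeaderAddress = leaderAddressFromZooKeeper;
        notifications++;
    }

    String lastLeaderAddress() { return lastLeaderAddress; }
    int notificationCount() { return notifications; }
}
```

The key point is the unconditional notification in `onReconnected`: the cached value and the server value being equal is exactly the case the NodeCache suppresses.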

Verifying this change

  • Added ZooKeeperLeaderElectionConnectionHandlingTest.testSameLeaderAfterReconnectTriggersListenerNotification and ZooKeeperLeaderElectionConnectionHandlingTest.testNewLeaderAfterReconnectTriggersListenerNotification

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

…eper reconnection in ZooKeeperLeaderRetrievalService

@flinkbot
Collaborator

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit a606f61 (Wed Oct 21 12:25:53 UTC 2020)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot
Collaborator

flinkbot commented Oct 21, 2020

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run travis re-run the last Travis build
  • @flinkbot run azure re-run the last Azure build

@@ -170,7 +174,6 @@ public void nodeChanged() throws Exception {
notifyIfNewLeaderAddress(leaderAddress, leaderSessionID);
} catch (Exception e) {
leaderListener.handleError(new Exception("Could not handle node changed event.", e));
throw e;
Contributor


I assume the only implications of removing this throw are that Curator won't log the exception anymore, and it won't reset the interrupted flag.

I was wondering for a while whether this has any implications, but I assume it is fine to remove, since the leader listener will handle the exception in Flink anyway.

Contributor Author


Yes, that is the reasoning. The leaderListener.handleError should handle all exceptions and usually terminate the process. But let me actually add ExceptionUtils.checkInterrupted(e); here again.
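As a rough sketch of what the revised catch block could look like (simplified and outside the real ZooKeeperLeaderRetrievalService; ThrowingRunnable and the local checkInterrupted helper are stand-ins for the actual Flink code and ExceptionUtils.checkInterrupted):

```java
// Hypothetical, simplified version of the nodeChanged() error handling after
// this change: the rethrow is gone, but the interrupt flag is preserved.
// ThrowingRunnable and checkInterrupted are stand-ins, not Flink's real API.
class NodeChangedSketch {
    interface ThrowingRunnable { void run() throws Exception; }
    interface LeaderRetrievalListener { void handleError(Exception e); }

    // Stand-in for ExceptionUtils.checkInterrupted: restore the thread's
    // interrupt status if the exception was an interruption.
    static void checkInterrupted(Throwable t) {
        if (t instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
    }

    static void nodeChanged(ThrowingRunnable retrieveLeaderInformation,
                            LeaderRetrievalListener leaderListener) {
        try {
            retrieveLeaderInformation.run();
        } catch (Exception e) {
            // forward the failure to the listener instead of rethrowing ...
            leaderListener.handleError(
                    new Exception("Could not handle node changed event.", e));
            // ... but keep the interrupt flag the rethrow used to preserve
            checkInterrupted(e);
        }
    }
}
```

The design point is that the listener, not Curator, becomes the single error-handling path, while interruption semantics are not silently swallowed.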

Contributor

@rmetzger left a comment


Change looks good, tests seem stable and cover the change.

+1 to merge.

@@ -203,6 +207,11 @@ protected void handleStateChange(ConnectionState newState) {
}
}

private void onReconnectedConnectionState() {
// check whether we find some new leader information in ZooKeeper
retrieveLeaderInformationFromZooKeeper();

When this is retrieved, I believe it will be equal to the last value that the node cache had before the disconnect, except when the NodeCache has had time to update the value. On reconnect, the NodeCache sets up an asynchronous fetch of the value, so the NodeCache will usually not have had time to update. If the leader changes while the ZooKeeper session is lost (which will always be the case if the leader was the one that lost the connection), the value retrieved here will likely be wrong. It seems like notifyLossOfLeader was implemented to prevent Flink nodes from having a wrong or stale leader. Is it less of a big deal for the Flink node to have the wrong leader in this case than in the case that notifyLossOfLeader is trying to solve?

Contributor Author


I think you are right that in some cases we will report a stale leader here. However, once the asynchronous fetch from the NodeCache completes, the listener should be notified about the new leader. In the meantime, the listener will try to connect to the old leader, which should either be gone or reject all connection attempts since it is no longer the leader.

The problem notifyLossOfLeader tried to solve is that a listener thinks a stale leader is still the leader and thus continues working for it without questioning it (e.g. checking with the leader) until the connection to the leader times out. With the notifyLossOfLeader change, once the retrieval service loses its connection to ZooKeeper, it tells the listener that the current leader is no longer valid. The listener then stops working for this leader (e.g. cancelling all tasks, disconnecting from it, etc.). If, shortly afterwards, the listener is told due to stale information that the old leader is still the leader, it will first try to connect to that leader, which will fail (assuming the old leader is indeed no longer the leader) before it can start doing any work for it.
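The sequence described above can be sketched as a tiny state machine (all names hypothetical, not Flink's actual ConnectionState handling): a suspended connection revokes the current leader, and a reconnection eagerly re-reads ZooKeeper, possibly reporting a briefly stale leader until the asynchronous cache fetch corrects it:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative state machine (names hypothetical, not Flink's real code):
// SUSPENDED revokes the leader, RECONNECTED eagerly re-reads ZooKeeper.
class ConnectionStateSketch {
    enum State { CONNECTED, SUSPENDED, RECONNECTED }

    private String currentLeader;
    final List<String> events = new ArrayList<>();

    void handleStateChange(State newState, String valueReadFromZooKeeper) {
        switch (newState) {
            case SUSPENDED:
                // notifyLossOfLeader: the listener must stop working for the
                // possibly stale leader right away.
                currentLeader = null;
                events.add("loss-of-leader");
                break;
            case RECONNECTED:
                // the fix: re-read ZooKeeper immediately; the value may be
                // briefly stale until the asynchronous cache fetch completes,
                // but connecting to a gone leader simply fails harmlessly.
                currentLeader = valueReadFromZooKeeper;
                events.add("leader=" + valueReadFromZooKeeper);
                break;
            default:
                break; // CONNECTED: nothing to revoke or re-read
        }
    }

    String currentLeader() { return currentLeader; }
}
```

The loss-of-leader event before the reconnect notification is what makes the brief staleness safe: the listener has already stopped working for the old leader and must re-validate before resuming.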

…n ZooKeeper reconnection in ZooKeeperLeaderRetrievalService
@tillrohrmann
Contributor Author

Thanks for the review @rmetzger and @maxmzkr. Merging this PR now.
