
[FLINK-33053][runtime] Refactoring of workaround to fix thread leakage on the ZooKeeper server side. #23773

Draft · wants to merge 3 commits into base: master

Conversation

@XComp (Contributor) commented Nov 22, 2023

What is the purpose of the change

This is a follow-up to PR #23415 that tries to come up with a more general workaround for the thread leakage on the ZooKeeper server side. The motivation is that the approach introduced in #23415 changed visibility constraints in an undesirable way (it made the ZooKeeperLeaderRetrievalDriver aware of the ResourceManager).

This change is meant as a draft and should rather serve as a basis for discussing whether FLINK-33053 is actually an issue that should be addressed in Flink itself. As far as I understand, the thread leakage only happens in test code when the ZooKeeper test server implementation is set up.

Brief change log

  • Introduced a reference counter in the ZooKeeperLeaderRetrievalFactory that counts the number of driver instances created by this factory instance. The counter is passed into all driver instances and is used to decide whether the removeAll call should be performed during close() (see the sketch below).
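As a rough illustration of the mechanism, here is a minimal sketch with hypothetical class names (not the actual Flink classes); only the counter handling mirrors the description above:

import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-ins for the factory and driver described above.
class DriverFactory {
    // Shared by all drivers created by this factory instance.
    private final AtomicInteger watcherReferenceCounter = new AtomicInteger(0);

    Driver createDriver() {
        watcherReferenceCounter.incrementAndGet();
        return new Driver(watcherReferenceCounter);
    }
}

class Driver implements AutoCloseable {
    private final AtomicInteger watcherReferenceCounter;

    Driver(AtomicInteger watcherReferenceCounter) {
        this.watcherReferenceCounter = watcherReferenceCounter;
    }

    @Override
    public void close() {
        // Only the last driver to close performs the removeAll call; earlier
        // drivers leave the watches in place for the remaining instances.
        if (watcherReferenceCounter.decrementAndGet() == 0) {
            // the server-side removeAll would be issued here
            // (see the diff hunks below)
        }
    }
}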

Verifying this change

I tried to come up with a test to verify the behavior, but it is hard to test because the thread leakage actually happens in the ZooKeeper server itself.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

@@ -69,8 +68,40 @@ public class ZooKeeperLeaderRetrievalDriver implements LeaderRetrievalDriver {

private final FatalErrorHandler fatalErrorHandler;

/**
* Each {@code ZooKeeperLeaderRetrievalDriver} has its own watcher initialized. There is a bug
@XComp (Contributor, Author):
> This change is meant as a draft and should rather serve as a basis for discussing whether FLINK-33053 is actually an issue that should be addressed in Flink itself. As far as I understand, the thread leakage only happens in test code when the ZooKeeper test server implementation is set up.

@KarmaGYZ Is it correct that the thread leakage happens in ZooKeeper's TestServer implementation? 🤔

@KarmaGYZ (Contributor):
Sorry, I'm not familiar with it. At least I can't find a public API to retrieve the watch info. Maybe you can verify with this guide to see whether we can get the watches of TestServer and whether there is any leakage.

@XComp (Contributor, Author):
I'm just asking because you mentioned in the description of FLINK-33053 that you observed the issue in stress tests. Did you use the MiniCluster and ZooKeeper in a single JVM or did you have your ZooKeeper deployed separately for the stress tests?

I'd like to understand whether it's on the ZooKeeper side (that's how it sounds to me right now) or on the Flink/Curator side. If I misunderstood the discussion in FLINK-33053 and the related PR #23415 and it's not a ZooKeeper server issue, fixing it on the Flink side is reasonable.

@KarmaGYZ (Contributor):
No, I did the stress tests with a real ZK cluster, so I'm afraid we might need an e2e test for it.
The leakage happens on the ZK side. Your fix looks reasonable to me, at least for the current HA mechanism.

@XComp (Contributor, Author) commented Nov 23, 2023:

Hm, ok ... thanks for the clarification. But that brings me to the point where I would be in favor of not fixing it on Flink's side at all; it needs to be addressed in ZooKeeper (as @tisonkun pointed out with his reference to ZOOKEEPER-4625). I'm curious to hear your take on this.

@KarmaGYZ (Contributor):
I think the core issue is that a specific component should take responsibility for managing the lifecycle of watches. This component can be ZooKeeper, Curator, or Flink. ZK could provide a "closeWatch" interface to reconcile watch references on the server side. Similarly, Curator or Flink can proactively handle this task and clean up by using removeAll, just like what you did in this PR.
I personally believe that letting ZK handle this task is more reasonable. However, considering the release cycles of ZK and the need to maintain compatibility with multiple ZK versions in Flink, we may need to proactively manage the lifecycle of watches within Flink for the foreseeable future.
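For illustration, the proactive client-side variant could be sketched as below. This assumes ZooKeeper's removeAllWatches API (available since ZooKeeper 3.5); the class name and path parameter are placeholders, not the actual Flink code:

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

// Sketch only: ask the server to drop every watch registered on a path,
// regardless of which client-side watcher object registered it.
class WatchCleanup {
    static void removeServerSideWatches(ZooKeeper zk, String path)
            throws InterruptedException, KeeperException {
        try {
            zk.removeAllWatches(path, Watcher.WatcherType.Any, false);
        } catch (KeeperException.NoWatcherException e) {
            // No watch is registered on the path; safe to ignore when this
            // runs purely as a cleanup safety net (cf. FLINK-33053).
        }
    }
}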

@flinkbot (Collaborator) commented Nov 22, 2023

CI report:

Bot commands: the @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

// Ignore the no watcher exception as it's just a safetynet to fix watcher leak issue.
// For more details, please refer to FLINK-33053.
if (client.getZookeeperClient().isConnected()
&& watcherReferenceCounter.getAndIncrement() == 1) {
Reviewer (Contributor):
getAndDecrement?

@XComp (Contributor, Author):
Yikes, I introduced that one when I did another code change. Good catch 👍 That proves once more that a proper test should be added to cover this code path. 🤔
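For completeness, the corrected guard would presumably read as follows (a sketch applying the reviewer's suggestion: decrement on close, clean up only for the last live reference):

// getAndDecrement() == 1 means the counter was 1 before this call,
// i.e. this driver held the last reference to the shared watcher.
if (client.getZookeeperClient().isConnected()
        && watcherReferenceCounter.getAndDecrement() == 1) {
    // perform the server-side watch removal here
}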


@KarmaGYZ (Contributor):
Thanks for the fix, @XComp. Great job!
