Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-18677] Added handling of suspended or lost connections #13055

Closed
wants to merge 9 commits into from

Conversation

XComp
Copy link
Contributor

@XComp XComp commented Aug 3, 2020

I added handling of suspended or lost connections within the ZooKeeperLeaderRetrievalService.

The listener needs to be notified in case of a connection loss so that it is able to initiate necessary actions on its side.

What is the purpose of the change

  • The ZooKeeperLeaderRetrievalService needs to inform its listener in case of connection loss since it might miss a leader change. The old implementation didn't trigger the listener in such a case.
  • This behavior was caused, for instance, when the TaskManager was lost its connection to the ZooKeeper and, therefore, wasn't informed about a leader change. Its tasks continued to run even though they got already recovered by another TaskManager. Hence, the same data got processed.

Brief change log

  • A method call was added when handling lost or suspended connections notifying the listener about the lost connection.
  • Two test cases were added to check the behavior.

Verifying this change

This change added tests and can be verified as follows:

  • Added unit test for checking whether the listener is NOT informed when there was not leader information retrieved, yet.
  • Added unit test for checking whether the listener is notified when the connection was lost but there was some previous leader information present already.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: yes
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? -

…thin the ZooKeeperLeaderRetrievalService.

The listener needs to be notified in case of a connection loss so that it is able to initiate necessary actions on its side.
@flinkbot
Copy link
Collaborator

flinkbot commented Aug 3, 2020

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit f5df4b1 (Sat Aug 28 11:16:01 UTC 2021)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into to Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@XComp XComp changed the title FLINK-18677: [fix] Added handling of suspended or lost connections wi… FLINK-18677: [fix] Added handling of suspended or lost connections Aug 3, 2020
@XComp
Copy link
Contributor Author

XComp commented Aug 3, 2020

@tillrohrmann the PR is ready to be reviewed.

@flinkbot
Copy link
Collaborator

flinkbot commented Aug 3, 2020

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run travis re-run the last Travis build
  • @flinkbot run azure re-run the last Azure build

@tillrohrmann tillrohrmann changed the title FLINK-18677: [fix] Added handling of suspended or lost connections [FLINK-18677] Added handling of suspended or lost connections Aug 4, 2020
@tillrohrmann tillrohrmann self-assigned this Aug 4, 2020
Copy link
Contributor

@tillrohrmann tillrohrmann left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for creating this PR @XComp. The changes look good. I had a couple of comments concerning the thread safety of the addition and some smaller suggestions. Please take a look at my comments.

…k and removed redundant code.

The redundant code was moved into notifyIfNewLeaderAddress(String, UUID) which is then used by notifyLeaderLoss() and within nodeChanged(). Additionally, the method call of notifyLeaderLoss() is guarded now by a lock to synchronize the state change (i.e. lastLeaderAddress and lastLeaderSessionID).
…it that the method is not expected to be called.
…ce we're not expecting any objects.

The test does not expect any calls happening. Hence, no CompletableFuture instance will be queued. The longer wait time would just result in a longer running test.
The previous implementation had a fixed timeout. Slower machines might need longer to process the test which might result in test failures. The new implementation removes the timeout so that the test wouldn't fail just because of a poor performance of the machine the test is running on.
@XComp XComp requested a review from tillrohrmann August 4, 2020 13:26
Copy link
Contributor

@tillrohrmann tillrohrmann left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for addressing my comments. I had a few minor comments left.

tillrohrmann pushed a commit that referenced this pull request Aug 5, 2020
…thin the ZooKeeperLeaderRetrievalService.

The listener needs to be notified in case of a connection loss so that it is able to initiate necessary actions on its side.

[FLINK-18677][runtime] [style] Replaced spaces by TABs to follow the Apache Flink code styles.

[FLINK-18677][runtime] [fix] Synchronize notifyLeaderLoss through lock and removed redundant code.

The redundant code was moved into notifyIfNewLeaderAddress(String, UUID) which is then used by notifyLeaderLoss() and within nodeChanged(). Additionally, the method call of notifyLeaderLoss() is guarded now by a lock to synchronize the state change (i.e. lastLeaderAddress and lastLeaderSessionID).

[FLINK-18677][runtime] The exception was added to make it more explicit that the method is not expected to be called.

[FLINK-18677][runtime] Decreased wait time the queue to be filled since we're not expecting any objects.

The test does not expect any calls happening. Hence, no CompletableFuture instance will be queued. The longer wait time would just result in a longer running test.

[FLINK-18677][runtime] Added infinite wait time to happy test.

The previous implementation had a fixed timeout. Slower machines might need longer to process the test which might result in test failures. The new implementation removes the timeout so that the test wouldn't fail just because of a poor performance of the machine the test is running on.

[FLINK-18677][runtime] Moved log messages out of synchronization blocks.

This closes #13055.
tillrohrmann pushed a commit that referenced this pull request Aug 5, 2020
…thin the ZooKeeperLeaderRetrievalService.

The listener needs to be notified in case of a connection loss so that it is able to initiate necessary actions on its side.

[FLINK-18677][runtime] [style] Replaced spaces by TABs to follow the Apache Flink code styles.

[FLINK-18677][runtime] [fix] Synchronize notifyLeaderLoss through lock and removed redundant code.

The redundant code was moved into notifyIfNewLeaderAddress(String, UUID) which is then used by notifyLeaderLoss() and within nodeChanged(). Additionally, the method call of notifyLeaderLoss() is guarded now by a lock to synchronize the state change (i.e. lastLeaderAddress and lastLeaderSessionID).

[FLINK-18677][runtime] The exception was added to make it more explicit that the method is not expected to be called.

[FLINK-18677][runtime] Decreased wait time the queue to be filled since we're not expecting any objects.

The test does not expect any calls happening. Hence, no CompletableFuture instance will be queued. The longer wait time would just result in a longer running test.

[FLINK-18677][runtime] Added infinite wait time to happy test.

The previous implementation had a fixed timeout. Slower machines might need longer to process the test which might result in test failures. The new implementation removes the timeout so that the test wouldn't fail just because of a poor performance of the machine the test is running on.

[FLINK-18677][runtime] Moved log messages out of synchronization blocks.

This closes #13055.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants