[FLINK-20928] Fix flaky test by retrying notifyCheckpointComplete until either commit success or timeout #17342

Merged: 1 commit merged into apache:master on Oct 6, 2021

Conversation

@lindong28 (Member) commented on Sep 23, 2021

What is the purpose of the change

The test KafkaSourceReaderTest.testOffsetCommitOnCheckpointComplete is flaky according to the test failure history in FLINK-20928. This PR attempts to fix this flaky test.

Brief change log

Here are the problems with the existing code that could explain why the test is flaky:

  1. The test calls KafkaSourceReader.notifyCheckpointComplete(...) once and expects the offset commit to be successful.
  2. However, KafkaSourceReader.notifyCheckpointComplete(...) does not guarantee that the offset commit succeeds. This is because it calls KafkaConsumer.commitAsync(...) just once and won't retry even if the commit fails with a retriable exception (see the sketch after this list).
  3. During the test, if the coordinator is temporarily unavailable due to e.g. coordinator movement or network disconnection, the test will fail with a TimeoutException.
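
To make the failure mode in (2) concrete, here is a minimal sketch of a single fire-and-forget commit using the plain Kafka consumer API. It is not the Flink reader code; the class and variable names are illustrative.

    import java.util.Map;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
    import org.apache.kafka.common.TopicPartition;

    class CommitOnceSketch {
        // A single commitAsync() call fires one request and reports the outcome to the callback.
        // If the group coordinator is temporarily unavailable, the callback receives a
        // RetriableCommitFailedException and nothing retries the commit on the caller's behalf.
        static void commitOnce(
                KafkaConsumer<?, ?> consumer,
                Map<TopicPartition, OffsetAndMetadata> offsetsToCommit) {
            consumer.commitAsync(offsetsToCommit, (offsets, exception) -> {
                if (exception instanceof RetriableCommitFailedException) {
                    // Transient failure (e.g. coordinator movement): the offsets were not
                    // committed, and a test asserting on committed offsets would time out.
                    System.err.println("Transient offset commit failure: " + exception.getMessage());
                }
            });
        }
    }

A test that triggers such a commit exactly once therefore races against coordinator availability, which matches the observed flakiness.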

This PR made the following changes to address the issues described above:

  1. Updated KafkaSourceReader.notifyCheckpointComplete so that it can be called multiple times with the same checkpointId.
  2. Updated CommonTestUtils.waitUtil(...) to support a user-specified sleep time. Previously, waitUtil(...) hardcoded the sleep time to 1 ms.
  3. Updated KafkaSourceReaderTest.testOffsetCommitOnCheckpointComplete to retry KafkaSourceReader.notifyCheckpointComplete once per second until either the offset commit has completed or the max wait time has been reached (a sketch of this retry pattern follows the list).
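
A minimal sketch of the retry pattern described in change (3), assuming a one-second retry interval and a bounded overall wait. The method and parameter names are illustrative stand-ins, not the actual CommonTestUtils or KafkaSourceReaderTest code.

    import java.time.Duration;
    import java.util.function.BooleanSupplier;

    class RetryUntilCommittedSketch {
        // Re-trigger the commit once per second until the committed offsets are observed
        // or the deadline passes, instead of relying on a single notifyCheckpointComplete call.
        static void retryUntil(
                Runnable triggerCommit,           // e.g. reader.notifyCheckpointComplete(checkpointId)
                BooleanSupplier offsetsCommitted, // condition checked against the committed offsets
                Duration timeout) throws InterruptedException {
            long deadline = System.nanoTime() + timeout.toNanos();
            while (!offsetsCommitted.getAsBoolean()) {
                if (System.nanoTime() >= deadline) {
                    throw new AssertionError("Offsets were not committed before the timeout");
                }
                triggerCommit.run();
                Thread.sleep(1000L); // retry interval of one second, as described above
            }
        }
    }

This is also why notifyCheckpointComplete must tolerate being called multiple times with the same checkpointId, which is what change (1) provides.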

Verifying this change

The test KafkaSourceReaderTest#testOffsetCommitOnCheckpointComplete passed consistently across 200 runs.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (no)

@flinkbot (Collaborator) commented:

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 7f92b13 (Thu Sep 23 14:53:20 UTC 2021)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into to Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@lindong28 changed the title from "[FLINK-20928] Fix flaky test by invoking notifyCheckpointComplete periodically" to "[FLINK-20928] Fix flaky test by retrying notifyCheckpointComplete until either commit success or timeout" on Sep 23, 2021
@lindong28 force-pushed the FLINK-20928 branch 2 times, most recently from 46d4952 to a6fc284 on September 23, 2021 15:14
@flinkbot (Collaborator) commented on Sep 23, 2021:

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run travis to re-run the last Travis build
  • @flinkbot run azure to re-run the last Azure build

@lindong28 (Member, Author) commented:

@flinkbot run azure

@fapaul left a comment:
@lindong28 very nice catch. I think your analysis of the cause is correct: the KafkaSourceReader does not treat the offset commit as mandatory, which can lead to flaky tests. Your retry loop should harden the test.

@@ -73,7 +73,7 @@ public void commitOffsets(
         if (offsetsToCommit.isEmpty()) {
             return;
         }
-        SplitFetcher<Tuple3<T, Long, Long>, KafkaPartitionSplit> splitFetcher = fetchers.get(0);
+        SplitFetcher<Tuple3<T, Long, Long>, KafkaPartitionSplit> splitFetcher = getRunningFetcher();
Review comment:
Nit: Does this change have any effect on the fix? If not, maybe make it a separate commit.

Review comment:
Can you explain a bit more why this is a performance improvement?

@lindong28 (Member, Author) replied:
Thanks for the review @fapaul. This change does not affect the fix. I have updated the PR to remove this change.

Regarding the reason why this could improve performance: let's assume the first fetcher created by this KafkaSourceFetcherManager has been closed and removed from fetchers. Prior to this change, every time commitOffsets() was called it would create a new SplitFetcher just to commit the offsets. If commitOffsets() is called N times, then N SplitFetchers will be created, which is quite inefficient.

To fix this problem, we can commit the offsets using any running fetcher in fetchers, which is achieved by using getRunningFetcher() here.
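
A simplified sketch of that idea, assuming a hypothetical Fetcher interface; it is not the actual KafkaSourceFetcherManager or SplitFetcher API.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicInteger;

    class FetcherReuseSketch {
        // Illustrative stand-in for a split fetcher; not the real Flink interface.
        interface Fetcher {
            boolean isRunning();
            void enqueue(Runnable commitTask);
        }

        private final Map<Integer, Fetcher> fetchers = new ConcurrentHashMap<>();
        private final AtomicInteger fetcherIds = new AtomicInteger();

        void commitOffsets(Runnable commitTask) {
            // Reuse any fetcher that is still running instead of always taking fetchers.get(0).
            // Once fetcher 0 is closed, the old approach created a brand-new fetcher on every
            // commit; here a new fetcher is only created when none is running at all.
            Fetcher fetcher = getRunningFetcher();
            if (fetcher == null) {
                fetcher = createFetcher(); // hypothetical factory standing in for the real one
            }
            fetcher.enqueue(commitTask);
        }

        private Fetcher getRunningFetcher() {
            for (Fetcher fetcher : fetchers.values()) {
                if (fetcher.isRunning()) {
                    return fetcher;
                }
            }
            return null;
        }

        private Fetcher createFetcher() {
            Fetcher fetcher = new Fetcher() {
                public boolean isRunning() { return true; }
                public void enqueue(Runnable commitTask) { commitTask.run(); }
            };
            fetchers.put(fetcherIds.incrementAndGet(), fetcher);
            return fetcher;
        }
    }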

@lindong28 (Member, Author) replied:
I created https://issues.apache.org/jira/browse/FLINK-24398 to track this issue.

@AHeise merged commit cd50229 into apache:master on Oct 6, 2021
@AHeise (Contributor) commented on Oct 6, 2021:

Thank you very much for the contribution. I merged it into master. Could you please create backport PRs?

@lindong28 (Member, Author) commented on Oct 8, 2021:

@AHeise Thank you for helping review the PR. This PR just fixes a flaky test. Does this need to be backported?

I am happy to create backport PRs. I have not done this before. Could you let me know which branches need to have this backport PR?

@fapaul commented on Oct 12, 2021:

@lindong28 sorry for the late response. Can you cherry-pick your commit and create a pull request against the 1.14 branch?

@lindong28 (Member, Author) replied:

Thanks @fapaul. I have created #17457 as suggested.

@AHeise (Contributor) commented on Oct 14, 2021:

I merged the backport into 1.14. According to the ticket it also affects 1.13. Can you verify that and do another backport? If not, please close the ticket.

@lindong28 (Member, Author) replied:

Thanks @AHeise. Sure, I have created #17488 to backport this fix to the 1.13 branch.
