Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-34995] flink kafka connector source stuck when partition leade… #91

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

yanspirit
Copy link

when partition leader invalid(leader=-1),  the flink streaming job using KafkaSource can't restart or start a new instance with a new groupid,  it will stuck and got following exception:

"org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition aaa-1 could be determined"

when leader=-1,  kafka api like KafkaConsumer.position() will block until either the position could be determined or an unrecoverable error is encountered 

infact,  leader=-1 not easy to avoid,  even replica=3, three disk offline together will trigger the problem, especially when the cluster size is relatively large.    it rely on kafka administrator to fix in time,  but it take risk when in kafka cluster peak period.

This can be addressed by using the invalid leader filter and discovery partition interval.
 

Copy link

boring-cyborg bot commented Apr 7, 2024

Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)

Copy link
Contributor

@MartijnVisser MartijnVisser left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to add a test for this situation?

@yanspirit
Copy link
Author

Is it possible to add a test for this situation?
This test is a bit tricky as it requires simulating broker crash. I have tested this locally, and it ignores partitions where leader=-1. Once the leader recovers, these partitions will be detected by the discovery-partition and added to process. Should I add a configuration switch for this optimization?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
2 participants