[FLINK-7732][kafka-consumer] Do not commit to kafka Flink's sentinel offsets #4928
tzulitai
left a comment
Nice catch on this issue @pnowojski.
One question:
With this change, if we restore from a savepoint taken with, say, 1.3 that contains a sentinel offset, and a checkpoint occurs after the restore but before that sentinel is replaced, the state for that partition would be dropped, correct? I think we need to cover that case as well before applying this fix.
        return offset != KafkaTopicPartitionStateSentinel.OFFSET_NOT_SET;
    }

    public final boolean isSentinel() {
nit: would hasSentinelOffset be a better name here?
In the new approach the method was dropped, but that would indeed have been a better name :)
    public static final long GROUP_OFFSET = -915623761773L;

    public static boolean isSentinel(long offset) {
        return offset < 0;
nit: this implementation could be a bit too broad. It could be more specific by matching the static values in KafkaTopicPartitionStateSentinel.
Kafka doesn't allow committing any negative offsets, so this check makes sense.
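To make the trade-off in this thread concrete, here is a minimal sketch contrasting the broad check (`offset < 0`) with an exact match against the sentinel constants. The class `SentinelChecks` is hypothetical and is not Flink's `KafkaTopicPartitionStateSentinel`. Only `GROUP_OFFSET` is quoted in the diff above; the other three constant values are assumptions for illustration.

```java
// Hypothetical stand-alone class for illustration; not the Flink source.
final class SentinelChecks {
    // GROUP_OFFSET is taken from the diff; the other three values are assumed.
    static final long OFFSET_NOT_SET = -915623761776L;
    static final long EARLIEST_OFFSET = -915623761775L;
    static final long LATEST_OFFSET = -915623761774L;
    static final long GROUP_OFFSET = -915623761773L;

    // Broad check: any negative value counts as a sentinel.
    static boolean isSentinelBroad(long offset) {
        return offset < 0;
    }

    // Narrow check: only the known magic values count as sentinels.
    static boolean isSentinelExact(long offset) {
        return offset == OFFSET_NOT_SET
                || offset == EARLIEST_OFFSET
                || offset == LATEST_OFFSET
                || offset == GROUP_OFFSET;
    }

    public static void main(String[] args) {
        System.out.println(isSentinelBroad(-1L));          // true
        System.out.println(isSentinelExact(-1L));          // false
        System.out.println(isSentinelExact(GROUP_OFFSET)); // true
    }
}
```

The two checks disagree only for negative values that are not one of the constants, such as `-1`. Since such values cannot be committed to Kafka either, the broad check also rejects them before a commit is attempted.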
By the way, what exactly was the error that caused the application crash in the described case?

@tzulitai please check the details in the ticket: https://issues.apache.org/jira/browse/FLINK-7732 I have changed the approach as we discussed, and the filtering now happens just before committing offsets.
tzulitai
left a comment
Thanks @pnowojski, changes LGTM. Will wait for Travis to finish a test run.
            Map<KafkaTopicPartition, Long> offsets,
            @Nonnull KafkaCommitCallback commitCallback) throws Exception;

    private Map<KafkaTopicPartition, Long> filerOutSentinels(Map<KafkaTopicPartition, Long> offsets) {
typo: filterOutSentinels, missing t.
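For readers following along, the filtering step discussed here could look roughly like the sketch below. It is not the code from this pull request: partitions are keyed by a plain `String` instead of `KafkaTopicPartition` so the example is self-contained, and the class name `SentinelFilter` is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of dropping sentinel offsets before a commit.
final class SentinelFilter {
    // Same broad check as in the diff above: negative offsets are sentinels.
    static boolean isSentinel(long offset) {
        return offset < 0;
    }

    // Returns a new map containing only the offsets that are safe to commit.
    static Map<String, Long> filterOutSentinels(Map<String, Long> offsets) {
        Map<String, Long> committable = new HashMap<>();
        for (Map.Entry<String, Long> entry : offsets.entrySet()) {
            if (!isSentinel(entry.getValue())) {
                committable.put(entry.getKey(), entry.getValue());
            }
        }
        return committable;
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("topic-0", 42L);
        offsets.put("topic-1", -915623761773L); // GROUP_OFFSET sentinel from the diff
        System.out.println(filterOutSentinels(offsets)); // {topic-0=42}
    }
}
```

Returning a new map leaves the caller's state untouched, so the sentinel is still present in the partition state and can be replaced by a real offset later.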
Travis passes, merging ...

Thanks!
What is the purpose of the change
This pull request fixes a bug in all versions of KafkaConsumers that resulted in an application crash if a snapshot was taken before the KafkaConsumer was able to asynchronously initialize properly.
Verifying this change
This change adds an additional unit test in AbstractFetcherTest and a state check to ensure/enforce the contract that internal sentinel offsets will not be committed to Kafka.
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no)
Documentation
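The contract described under "Verifying this change" (sentinel offsets must never reach Kafka) can be enforced with a guard at the commit boundary. The sketch below only illustrates that idea under stated assumptions: `CommitGuard` and `checkNoSentinels` are hypothetical names, and partitions are keyed by `String` rather than `KafkaTopicPartition`.

```java
import java.util.Map;

// Hypothetical guard; fails fast if a sentinel offset is about to be committed.
final class CommitGuard {
    static void checkNoSentinels(Map<String, Long> offsetsToCommit) {
        for (Map.Entry<String, Long> entry : offsetsToCommit.entrySet()) {
            long offset = entry.getValue();
            if (offset < 0) {
                // Negative offsets are internal sentinels and must not be committed.
                throw new IllegalStateException(
                        "Illegal offset value " + offset + " for partition " + entry.getKey());
            }
        }
    }
}
```

A guard like this complements filtering: filtering removes sentinels from the map, while the guard surfaces a bug immediately if one slips through.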