[FLINK-4723] [kafka-connector] Unify committed offsets to Kafka to be the next record to process #2580
Conversation
Seems like one of the new IT tests is a bit unstable, fixing it ...
Looks quite good. I would suggest one change, though: can we avoid copying the offsets in the checkpoint into a new map (with increment by one) and passing that to the ZooKeeper offset committer or the Kafka offset committer? I am just not a big fan of copying things back and forth (especially in "prepareSnapshot()", which we want to keep as lightweight as possible). Instead, can we have the contract that the offset committers always commit "+1" from the value they get (pretty much as it was in the 0.9 committer after FLINK-4618)? Concerning the tests, is the stability issue fixed there?
Thanks for the review @StephanEwen. Concerning changing the contract for … Thanks for the tip on test stability, I'll do that ;)
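A minimal sketch of the contract suggested above (class and method names are hypothetical, not the actual Flink committer API): the offset committer itself adds +1 at commit time, so the snapshot path never has to copy the offset map.

```java
import java.util.HashMap;
import java.util.Map;

public class CommitterContractSketch {

    /**
     * Hypothetical committer: receives the checkpointed offsets (last processed
     * record per partition) and commits value + 1, i.e. the next record to process.
     */
    static Map<Integer, Long> commitOffsets(Map<Integer, Long> checkpointedOffsets) {
        Map<Integer, Long> committed = new HashMap<>();
        for (Map.Entry<Integer, Long> entry : checkpointedOffsets.entrySet()) {
            // the +1 happens here, at commit time, not in the snapshot path
            committed.put(entry.getKey(), entry.getValue() + 1);
        }
        return committed;
    }

    public static void main(String[] args) {
        Map<Integer, Long> checkpointed = new HashMap<>();
        checkpointed.put(0, 149L); // partition 0, last processed record at offset 149
        System.out.println(commitOffsets(checkpointed).get(0)); // prints 150
    }
}
```

Note that the PR as described later moves the +1 to the `FlinkKafkaConsumerBase` level instead; this sketch only illustrates the committer-side alternative discussed here.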
Force-pushed 00ce52b to a8267dd
I like the change overall, I had one question regarding a test case. Once that is resolved, and the conflicts are resolved, the change is good to be merged.
/**
 * This test first writes a total of 200 records to a test topic, reads the first 100 so that some offsets are
300, 150
Long o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
Long o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);

LOG.info("Got final committed offsets from Kafka o1={}, o2={}, o3={}", o1, o2, o3);
I wonder whether it makes sense to check that at least one of o1, o2 and o3 is not 300. If they are all 300, the test below …
It should be impossible for them to be 300, because we're stopping the first consuming job once it hits the 150th record.
However, I think it is reasonable to check whether at least one of o1, o2, o3 is not null
before proceeding with the next consuming job. We'd want to have at least some start offsets to test the start from committed offsets behaviour. What do you think?
That sounds reasonable.
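A small sketch of the check agreed on above (class and method names are hypothetical): require at least one non-null committed offset before starting the second consuming job, since a null return from the offset handler means no offset was ever committed for that partition.

```java
public class CommittedOffsetCheckSketch {

    /**
     * Returns true if at least one partition has a committed offset in Kafka.
     * A null value means no offset was ever committed for that partition.
     */
    static boolean hasAnyCommittedOffset(Long o1, Long o2, Long o3) {
        return o1 != null || o2 != null || o3 != null;
    }

    public static void main(String[] args) {
        System.out.println(hasAnyCommittedOffset(null, 150L, null)); // prints true
        System.out.println(hasAnyCommittedOffset(null, null, null)); // prints false
    }
}
```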
Thanks for the review @rmetzger. I've created several local branches to test out the new IT tests' stability on Travis as @StephanEwen suggested, and they seem to be fine. I'll rebase this, address the last few comments, and give the changes a final test run before merging.
… next record to process
Force-pushed 865143c to f7b4589
Merging this once tests turn green.
Merging this to master now ...
…rd to process. This closes apache#2580
The description within the JIRA ticket (FLINK-4723) explains the reasoning for this change.

With this change, offsets committed to Kafka for both 0.8 and 0.9 are larger by 1 compared to the internally checkpointed offsets. This is changed at the FlinkKafkaConsumerBase level, so that offsets given through the abstract commitSpecificOffsetsToKafka() method to the version-specific implementations are already incremented and represent the next record to process. This way, the version-specific implementations simply commit the given offsets without the need to manipulate them.

To test the behaviour on both connector versions, this PR also includes major refactoring of the IT tests by adding offset-committing-related IT tests to FlinkKafkaConsumerTestBase, letting both the 0.8 and 0.9 consumers run offset committing / initial offset startup tests (previously only the 0.8 consumer had these tests).

R: @rmetzger what do you think of this?