KAFKA-13499: Avoid restoring outdated records #22115
frankvicky
left a comment
@gabriellefu Thanks for the PR.
Please run ./gradlew clean spotlessApply to fix the CI failure.
bbejeck
left a comment
Thanks @gabriellefu, I made a pass.
newPartitionsWithoutStartOffset.add(partition);
final long retentionPeriod = storeMetadata.retentionPeriod();
if (retentionPeriod > 0 && retentionPeriod != Long.MAX_VALUE) {
@gabriellefu I was playing around some more and I think I found something else: new standby tasks won't have a valid endOffset, so they need to be filtered out. Otherwise, with the restore consumer's auto.offset.reset=none, every batched partition falls back to seek-to-beginning.
So we can update the if block to this:
if (retentionPeriod > 0 && retentionPeriod != Long.MAX_VALUE && endOffset != null && endOffset > 0)
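To make the reasoning concrete, here is a hedged, standalone sketch of that guard. The class and method names are hypothetical; only the condition itself mirrors the suggestion above. It shows why a null or non-positive endOffset (e.g. a new standby task) must be filtered out before seeking by timestamp.

```java
public class SeekGuard {

    // Returns true only when the partition has a bounded retention period
    // AND a valid end offset, so a timestamp-based seek is safe.
    static boolean shouldSeekByTimestamp(final long retentionPeriod, final Long endOffset) {
        return retentionPeriod > 0
            && retentionPeriod != Long.MAX_VALUE
            && endOffset != null
            && endOffset > 0;
    }

    public static void main(final String[] args) {
        // Bounded retention and a valid end offset: seek by timestamp.
        System.out.println(shouldSeekByTimestamp(60_000L, 42L));        // true
        // New standby task with no end offset yet: skip the seek.
        System.out.println(shouldSeekByTimestamp(60_000L, null));       // false
        // Unbounded retention: nothing ever falls out of retention.
        System.out.println(shouldSeekByTimestamp(Long.MAX_VALUE, 42L)); // false
    }
}
```

Without the last two clauses, a partition with no end offset would still attempt the timestamp seek, and under auto.offset.reset=none that surfaces as the fallback to seek-to-beginning described above.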
I used restoreConsumer.endOffsets() instead, which should be able to solve the standby task problem.
Merged #22115 into trunk

When the checkpoint doesn't exist, seek to a certain timestamp to avoid
restoring outdated records.
Instead of the wall clock, use the latest timestamp in the changelog as
the latest time, and seek from the timestamp of
latest_changelog_timestamp - retention_period.
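The commit message's timestamp arithmetic can be sketched as follows. This is a minimal illustration, not the PR's actual code; the class and method names are hypothetical.

```java
public class SeekTimestamp {

    // Compute the earliest timestamp still within retention, measured from
    // the latest record timestamp observed in the changelog rather than
    // the wall clock.
    static long seekStartTimestamp(final long latestChangelogTimestamp, final long retentionPeriod) {
        // Clamp at 0 so a short changelog never yields a negative timestamp.
        return Math.max(0L, latestChangelogTimestamp - retentionPeriod);
    }

    public static void main(final String[] args) {
        // Changelog records older than 7000 are outside retention
        // and can be skipped during restore.
        System.out.println(seekStartTimestamp(10_000L, 3_000L)); // 7000
    }
}
```

The resulting timestamp would then presumably be translated into a starting offset for the restore consumer, e.g. via Consumer.offsetsForTimes(), so that restoration begins past the records that have already expired.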
Reviewers: TengYao Chi <frankvicky@apache.org>, Bill Bejeck <bbejeck@apache.org>