Skip to content

KAFKA-13499: Avoid restoring outdated records#21779

Open
gabriellefu wants to merge 1 commit intoapache:trunkfrom
gabriellefu:restoring_window
Open

KAFKA-13499: Avoid restoring outdated records#21779
gabriellefu wants to merge 1 commit intoapache:trunkfrom
gabriellefu:restoring_window

Conversation

@gabriellefu
Copy link
Contributor

@gabriellefu gabriellefu commented Mar 16, 2026

  1. Expose the retentionPeriod length to storeMetadata
  2. In prepareChangelogs(), switch it from always seektobeginning if
    checkpoint doesn't exist to seek to certain timestamp to avoid
    restoring outdated records.

Reviewers: Bill Bejeck bbejeck@apache.org

@github-actions github-actions bot added triage PRs from the community streams labels Mar 16, 2026
@gabriellefu gabriellefu marked this pull request as ready for review March 16, 2026 20:38
Copy link
Member

@bbejeck bbejeck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR @gabriellefu! Overall this looks good with a few comments.

current = ((WrappedStateStore<?, ?, ?>) current).wrapped();
}
// Now 'current' is the innermost store. Check what type it is.
if (current instanceof AbstractRocksDBSegmentedBytesStore) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thinking instead of the mulitple instanceof checks we could introduce an interface WithRetentionPeriod (the name is up for debate) containing the single method retentionPeriod and have the store types here implement it. Since all of the instances here are internal API this is possble without requiring a KIP. So the multiple checks would go to one

if (current instanceof WithRetentionPeriod) {
        return ((WithRetentionPeriod) current).retentionPeriod();
}

This would also provide the added benefit of automatically picking up any store needing/using a retention period.

\cc @mjsax @aliehsaeedii @lucasbru @frankvicky

@@ -88,6 +88,9 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
this.metricScope = metricScope;
this.position = Position.emptyPosition();
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Add a space here

}

@Test
public void shouldSeekByTimestampForWindowedStoreWithoutCheckpoint() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add tests for negative test cases, broker doesn't have info and returns null for offsetsForTimes or for the non-windowed store case

@github-actions github-actions bot removed the triage PRs from the community label Mar 22, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants