Merged
Changes from all commits (23 commits)
04b9fc7
restore window code
gabriellefu Mar 30, 2026
87ed81f
revert some space
gabriellefu Mar 30, 2026
b4a9ff0
revert StoreChangelogReader.java
gabriellefu Mar 30, 2026
8d7418f
revert ProcessorStateManager.java
gabriellefu Mar 30, 2026
4b16700
streams/src/main/java/org/apache/kafka/streams/state/internals/Abstra…
gabriellefu Mar 30, 2026
feff5e6
streams/src/test/java/org/apache/kafka/streams/processor/internals/St…
gabriellefu Mar 30, 2026
76d3d94
adding the interface
gabriellefu Mar 31, 2026
d7c0b72
adding WithRetentionPeriod
gabriellefu Mar 31, 2026
72413c0
fix the bug caused by the gap between wall clock and streams clock
gabriellefu Apr 21, 2026
4c5feae
fix the bug caused by the gap between wall clock and streams clock
gabriellefu Apr 21, 2026
4d61fb0
use the latest timestamp as the time to restore
gabriellefu Apr 22, 2026
ee9aee2
cleanup some code
gabriellefu Apr 22, 2026
55535e3
one space
gabriellefu Apr 22, 2026
045b0f2
simplify the way of getting the max_timestamp
gabriellefu Apr 22, 2026
c0bf24e
format
gabriellefu Apr 22, 2026
5dc78fa
fix new standby task npe
gabriellefu Apr 29, 2026
3c1f8d1
filter out standby task
gabriellefu May 1, 2026
519a2d4
restoreConsumer.endOffsets() + poll for standby
gabriellefu May 1, 2026
02a61e6
use seek to end to get the last offset
gabriellefu May 1, 2026
1e8d5c4
use restoreConsumer.endOffsets(partitions)
gabriellefu May 2, 2026
2ba2da2
add seek to beginning
gabriellefu May 2, 2026
0b0a866
change the name to make it less confusing
gabriellefu May 2, 2026
c2ec3be
fix the bug
gabriellefu May 2, 2026
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -36,6 +36,8 @@
import org.apache.kafka.streams.state.internals.LegacyCheckpointingStateStore;
import org.apache.kafka.streams.state.internals.RecordConverter;
import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;
import org.apache.kafka.streams.state.internals.WithRetentionPeriod;
import org.apache.kafka.streams.state.internals.WrappedStateStore;

import org.slf4j.Logger;

@@ -103,6 +105,8 @@ public static class StateStoreMetadata {
// corrupted state store should not be included in checkpointing
private boolean corrupted;

private final long retentionPeriod;


private StateStoreMetadata(final StateStore stateStore,
final CommitCallback commitCallback) {
@@ -113,6 +117,7 @@ private StateStoreMetadata(final StateStore stateStore,
this.changelogPartition = null;
this.corrupted = false;
this.offset = null;
this.retentionPeriod = -1L;
}

private StateStoreMetadata(final StateStore stateStore,
@@ -130,12 +135,24 @@ private StateStoreMetadata(final StateStore stateStore,
this.commitCallback = commitCallback;
this.recordConverter = recordConverter;
this.offset = null;
this.retentionPeriod = extractRetentionPeriod(stateStore);
}

private void setOffset(final Long offset) {
this.offset = offset;
}

private static long extractRetentionPeriod(final StateStore stateStore) {
StateStore current = stateStore;
while (current instanceof WrappedStateStore) {
current = ((WrappedStateStore<?, ?, ?>) current).wrapped();
}
if (current instanceof WithRetentionPeriod) {
return ((WithRetentionPeriod) current).retentionPeriod();
}
return -1L;
}

// the offset is exposed to the changelog reader to determine if restoration is completed
Long offset() {
return this.offset;
@@ -145,6 +162,11 @@ Long endOffset() {
return this.endOffset;
}

// the retentionPeriod is exposed to the changelog reader for window restoration
long retentionPeriod() {
return retentionPeriod;
}

public void setEndOffset(final Long endOffset) {
this.endOffset = endOffset;
}
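A note on extractRetentionPeriod above: the store registered with StateStoreMetadata is usually the outermost layer of a wrapper chain (metering, caching, change-logging), so the chain has to be unwound before the innermost store can be probed for a retention period. Below is a minimal, self-contained sketch of the same unwrap-then-check pattern; Store, Wrapper, WithRetention, and RetentionExtractor are hypothetical stand-ins for illustration, not the PR's actual types.

interface Store { }
interface Wrapper extends Store { Store wrapped(); }
interface WithRetention extends Store { long retentionPeriod(); }

final class RetentionExtractor {
    // Walk down the wrapper chain until the innermost store is reached,
    // then probe it for a retention period; -1 is the sentinel for
    // "no retention", e.g. a plain key-value store.
    static long extract(final Store store) {
        Store current = store;
        while (current instanceof Wrapper) {
            current = ((Wrapper) current).wrapped();
        }
        return current instanceof WithRetention
                ? ((WithRetention) current).retentionPeriod()
                : -1L;
    }
}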
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -25,6 +25,7 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
@@ -969,7 +970,8 @@ private void resumeChangelogsFromRestoreConsumer(final Collection<TopicPartition
private void prepareChangelogs(final Map<TaskId, Task> tasks,
final Set<ChangelogMetadata> newPartitionsToRestore) {
// separate those who do not have the current offset loaded from checkpoint
final Set<TopicPartition> newPartitionsWithoutStartOffset = new HashSet<>();
final Set<TopicPartition> newSeekToBeginningPartitions = new HashSet<>();
final Map<TopicPartition, Long> newWindowedPartitionsRetention = new HashMap<>();

for (final ChangelogMetadata changelogMetadata : newPartitionsToRestore) {
final StateStoreMetadata storeMetadata = changelogMetadata.storeMetadata;
@@ -986,18 +988,18 @@ private void prepareChangelogs(final Map<TaskId, Task> tasks,
log.debug("Start restoring changelog partition {} from current offset {} to end offset {}.",
partition, currentOffset, recordEndOffset(endOffset));
} else {
log.debug("Start restoring changelog partition {} from the beginning offset to end offset {} " +
"since we cannot find current offset.", partition, recordEndOffset(endOffset));

newPartitionsWithoutStartOffset.add(partition);
final long retentionPeriod = storeMetadata.retentionPeriod();
if (retentionPeriod > 0 && retentionPeriod != Long.MAX_VALUE) {
newWindowedPartitionsRetention.put(partition, retentionPeriod);
} else {
log.debug("Start restoring changelog partition {} from the beginning offset to end offset {} " +
"since we cannot find current offset.", partition, recordEndOffset(endOffset));
newSeekToBeginningPartitions.add(partition);
}
}
}

// optimization: batch all seek-to-beginning offsets in a single request
// seek is not a blocking call so there's nothing to capture
if (!newPartitionsWithoutStartOffset.isEmpty()) {
restoreConsumer.seekToBeginning(newPartitionsWithoutStartOffset);
}
seekNewPartitions(newWindowedPartitionsRetention, newSeekToBeginningPartitions);

for (final ChangelogMetadata changelogMetadata : newPartitionsToRestore) {
final StateStoreMetadata storeMetadata = changelogMetadata.storeMetadata;
@@ -1039,6 +1041,99 @@ private void prepareChangelogs(final Map<TaskId, Task> tasks,
}
}

private void seekNewPartitions(final Map<TopicPartition, Long> windowedPartitionsRetention,
final Set<TopicPartition> seekToBeginningPartitions) {
// Seek non-windowed partitions to beginning.
if (!seekToBeginningPartitions.isEmpty()) {
restoreConsumer.seekToBeginning(seekToBeginningPartitions);
}

// Try to optimize windowed partitions by seeking past expired data.
if (!windowedPartitionsRetention.isEmpty()) {
final Set<TopicPartition> allAssigned = restoreConsumer.assignment();
final Set<TopicPartition> previouslyPaused = new HashSet<>(restoreConsumer.paused());

try {
restoreConsumer.pause(allAssigned);
restoreConsumer.resume(windowedPartitionsRetention.keySet());

final Map<TopicPartition, Long> endOffsets =
restoreConsumer.endOffsets(windowedPartitionsRetention.keySet());

for (final TopicPartition partition : windowedPartitionsRetention.keySet()) {
final Long endOffset = endOffsets.get(partition);
if (endOffset != null && endOffset > 0) {
restoreConsumer.seek(partition, endOffset - 1);
} else {
restoreConsumer.seekToBeginning(Collections.singleton(partition));
seekToBeginningPartitions.add(partition);
}
}
windowedPartitionsRetention.keySet().removeAll(seekToBeginningPartitions);

final ConsumerRecords<byte[], byte[]> polledRecords = restoreConsumer.poll(pollTime);

seekByRetentionFromPolledRecords(polledRecords, windowedPartitionsRetention, seekToBeginningPartitions);
} catch (final TimeoutException e) {
log.debug("Could not seek by timestamp for changelog partitions {}, falling back to seek-to-beginning",
windowedPartitionsRetention.keySet(), e);
seekToBeginningPartitions.addAll(windowedPartitionsRetention.keySet());
} catch (final KafkaException e) {
log.warn("Failed to seek by timestamp for changelog partitions {}, falling back to seek-to-beginning",
windowedPartitionsRetention.keySet(), e);
seekToBeginningPartitions.addAll(windowedPartitionsRetention.keySet());
} finally {
restoreConsumer.pause(allAssigned);
final Set<TopicPartition> toResume = new HashSet<>(allAssigned);
toResume.removeAll(previouslyPaused);
if (!toResume.isEmpty()) {
restoreConsumer.resume(toResume);
}
}
}

// Seek any windowed partitions that failed during the optimization back to the beginning.
// Their position was moved by seek+poll above.
if (!seekToBeginningPartitions.isEmpty()) {
restoreConsumer.seekToBeginning(seekToBeginningPartitions);
}
}

private void seekByRetentionFromPolledRecords(final ConsumerRecords<byte[], byte[]> polledRecords,
final Map<TopicPartition, Long> windowedPartitionsRetention,
final Set<TopicPartition> seekToBeginningPartitions) {
final Map<TopicPartition, Long> seekTimestamps = new HashMap<>();
for (final Map.Entry<TopicPartition, Long> entry : windowedPartitionsRetention.entrySet()) {
final TopicPartition partition = entry.getKey();
final long retentionPeriod = entry.getValue();
final List<ConsumerRecord<byte[], byte[]>> records = polledRecords.records(partition);
if (!records.isEmpty()) {
final long latestTimestamp = records.get(0).timestamp();
final long seekTimestamp = latestTimestamp - retentionPeriod;
if (seekTimestamp > 0) {
seekTimestamps.put(partition, seekTimestamp);
log.debug("Start restoring windowed changelog partition {} from stream-time-based timestamp {} " +
"(maxStreamTime={}, retention={}).", partition, seekTimestamp, latestTimestamp, retentionPeriod);
continue;
}
}
log.debug("Start restoring changelog partition {} from the beginning.", partition);
seekToBeginningPartitions.add(partition);
}

if (!seekTimestamps.isEmpty()) {
final Map<TopicPartition, OffsetAndTimestamp> offsetsByTimestamp =
restoreConsumer.offsetsForTimes(seekTimestamps);
offsetsByTimestamp.forEach((partition, offsetAndTimestamp) -> {
if (offsetAndTimestamp != null) {
restoreConsumer.seek(partition, offsetAndTimestamp.offset());
} else {
seekToBeginningPartitions.add(partition);
}
});
}
}

@Override
public void unregister(final Collection<TopicPartition> revokedChangelogs) {
unregister(revokedChangelogs, StandbyUpdateListener.SuspendReason.MIGRATED);
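Taken together, seekNewPartitions and seekByRetentionFromPolledRecords implement the restoration shortcut end to end: read the changelog's last record, treat its timestamp as the current stream time (the "use the latest timestamp as the time to restore" fix for the gap between wall clock and streams clock), subtract the store's retention period, and seek to the first offset at or after that timestamp via offsetsForTimes. Here is a minimal sketch of that flow against a bare Consumer, without the changelog reader's pause/resume bookkeeping or batching; the class and method names are illustrative only, not the PR's.

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

final class RetentionSeekSketch {
    // Position the consumer at the first changelog record still inside the
    // store's retention window, falling back to the beginning whenever the
    // shortcut cannot be applied.
    static void seekPastExpired(final Consumer<byte[], byte[]> consumer,
                                final TopicPartition partition,
                                final long retentionPeriod) {
        final Long endOffset =
            consumer.endOffsets(Collections.singleton(partition)).get(partition);
        if (endOffset == null || endOffset <= 0) {
            consumer.seekToBeginning(Collections.singleton(partition)); // empty changelog
            return;
        }

        // Read the last record to learn the changelog's maximum timestamp,
        // i.e. the stream time as of the most recent write.
        consumer.seek(partition, endOffset - 1);
        final ConsumerRecords<byte[], byte[]> polled = consumer.poll(Duration.ofMillis(100));
        final List<ConsumerRecord<byte[], byte[]>> lastBatch = polled.records(partition);
        if (lastBatch.isEmpty()) {
            consumer.seekToBeginning(Collections.singleton(partition));
            return;
        }

        final long maxTimestamp = lastBatch.get(0).timestamp();
        final long seekTimestamp = maxTimestamp - retentionPeriod;
        if (seekTimestamp <= 0) {
            consumer.seekToBeginning(Collections.singleton(partition)); // everything still live
            return;
        }

        final Map<TopicPartition, OffsetAndTimestamp> byTime =
            consumer.offsetsForTimes(Collections.singletonMap(partition, seekTimestamp));
        final OffsetAndTimestamp target = byTime.get(partition);
        if (target != null) {
            consumer.seek(partition, target.offset()); // skip records older than retention
        } else {
            consumer.seekToBeginning(Collections.singleton(partition)); // no record at/after cutoff
        }
    }
}

Note that the seek to endOffset - 1 plus the poll is what moves the consumer's position, which is why the real code re-seeks any partitions that fell out of the optimization back to the beginning afterwards.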
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
@@ -46,7 +46,7 @@
import static org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED;
import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext;

public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Segment> implements SegmentedBytesStore {
public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Segment> implements SegmentedBytesStore, WithRetentionPeriod {
private static final Logger LOG = LoggerFactory.getLogger(AbstractDualSchemaRocksDBSegmentedBytesStore.class);

private final String name;
@@ -74,6 +74,11 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
this.retentionPeriod = retentionPeriod;
}

@Override
public long retentionPeriod() {
return retentionPeriod;
}

@Override
public KeyValueIterator<Bytes, byte[]> all() {

streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
@@ -48,7 +48,7 @@
import static org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED;
import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext;

public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements SegmentedBytesStore {
public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements SegmentedBytesStore, WithRetentionPeriod {
private static final Logger LOG = LoggerFactory.getLogger(AbstractRocksDBSegmentedBytesStore.class);

private final String name;
@@ -73,6 +73,11 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
this.segments = segments;
}

@Override
public long retentionPeriod() {
return retentionPeriod;
}

@Override
public KeyValueIterator<Bytes, byte[]> fetch(final Bytes key,
final long from,
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
@@ -54,7 +54,7 @@

import static org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED;

public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
public class InMemorySessionStore implements SessionStore<Bytes, byte[]>, WithRetentionPeriod {

private static final Logger LOG = LoggerFactory.getLogger(InMemorySessionStore.class);

@@ -91,6 +91,11 @@ public InMemorySessionStore(
this.position = Position.emptyPosition();
}

@Override
public long retentionPeriod() {
return retentionPeriod;
}

@Override
public String name() {
return name;
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
@@ -59,7 +59,7 @@
import static org.apache.kafka.streams.state.internals.WindowKeySchema.extractStoreTimestamp;


public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
public class InMemoryWindowStore implements WindowStore<Bytes, byte[]>, WithRetentionPeriod {

private static final Logger LOG = LoggerFactory.getLogger(InMemoryWindowStore.class);
private static final int SEQNUM_SIZE = 4;
@@ -95,6 +95,11 @@ public InMemoryWindowStore(final String name,
this.position = Position.emptyPosition();
}

@Override
public long retentionPeriod() {
return retentionPeriod;
}

@Override
public String name() {
return name;
streams/src/main/java/org/apache/kafka/streams/state/internals/WithRetentionPeriod.java (new file)
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;

public interface WithRetentionPeriod {
long retentionPeriod();
}
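The new interface is intentionally minimal: a store that knows its retention implements it, and StateStoreMetadata discovers it via instanceof after unwrapping. A hypothetical sketch (the class below is illustrative, not part of the PR) of an implementing store, together with the eligibility gate that prepareChangelogs applies before attempting the timestamp-based seek:

final class FixedRetentionStore implements WithRetentionPeriod {
    private final long retentionPeriod;

    FixedRetentionStore(final long retentionPeriod) {
        this.retentionPeriod = retentionPeriod;
    }

    @Override
    public long retentionPeriod() {
        return retentionPeriod;
    }

    // Mirrors the check in prepareChangelogs: -1 (no retention) and
    // Long.MAX_VALUE (keep everything) both force a full restore from
    // the beginning of the changelog.
    static boolean eligibleForRetentionSeek(final long retentionPeriod) {
        return retentionPeriod > 0 && retentionPeriod != Long.MAX_VALUE;
    }
}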