File: ProcessorStateManager.java
@@ -36,6 +36,8 @@
import org.apache.kafka.streams.state.internals.LegacyCheckpointingStateStore;
import org.apache.kafka.streams.state.internals.RecordConverter;
import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;
import org.apache.kafka.streams.state.internals.WithRetentionPeriod;
import org.apache.kafka.streams.state.internals.WrappedStateStore;

import org.slf4j.Logger;

@@ -102,6 +104,8 @@ public static class StateStoreMetadata {
// corrupted state store should not be included in checkpointing
private boolean corrupted;

private final long retentionPeriod;


private StateStoreMetadata(final StateStore stateStore,
final CommitCallback commitCallback) {
@@ -112,6 +116,7 @@ private StateStoreMetadata(final StateStore stateStore,
this.changelogPartition = null;
this.corrupted = false;
this.offset = null;
this.retentionPeriod = -1L;
}

private StateStoreMetadata(final StateStore stateStore,
@@ -129,12 +134,24 @@ private StateStoreMetadata(final StateStore stateStore,
this.commitCallback = commitCallback;
this.recordConverter = recordConverter;
this.offset = null;
this.retentionPeriod = extractRetentionPeriod(stateStore);
}

private void setOffset(final Long offset) {
this.offset = offset;
}

private static long extractRetentionPeriod(final StateStore stateStore) {
StateStore current = stateStore;
while (current instanceof WrappedStateStore) {
current = ((WrappedStateStore<?, ?, ?>) current).wrapped();
}
if (current instanceof WithRetentionPeriod) {
return ((WithRetentionPeriod) current).retentionPeriod();
}
return -1L;
}

// the offset is exposed to the changelog reader to determine if restoration is completed
Long offset() {
return this.offset;
@@ -144,6 +161,11 @@ Long endOffset() {
return this.endOffset;
}

// the retentionPeriod is exposed to the changelog reader for window restoration
long retentionPeriod() {
return retentionPeriod;
}

public void setEndOffset(final Long endOffset) {
this.endOffset = endOffset;
}
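The unwrap-then-probe pattern in extractRetentionPeriod is easy to miss: Kafka Streams layers wrappers (metering, caching, change-logging) around the inner store, so the retention period must be read from the innermost layer. A minimal, self-contained sketch of the same pattern, using hypothetical Layer/WindowInner/KeyValueInner classes in place of the real wrapper and store types:

// Hypothetical stand-ins for WrappedStateStore and the inner store types.
interface Store { }

interface RetentionAware {
    long retentionPeriod();
}

final class Layer implements Store {
    final Store inner;
    Layer(final Store inner) { this.inner = inner; }
}

final class WindowInner implements Store, RetentionAware {
    @Override
    public long retentionPeriod() { return 7_200_000L; } // 2 hours, in milliseconds
}

final class KeyValueInner implements Store { } // no retention concept

public class UnwrapDemo {
    // peel off wrapper layers, then probe the innermost store
    static long retentionOf(Store store) {
        while (store instanceof Layer) {
            store = ((Layer) store).inner;
        }
        return store instanceof RetentionAware
            ? ((RetentionAware) store).retentionPeriod()
            : -1L; // the same sentinel the PR uses for "no retention"
    }

    public static void main(final String[] args) {
        System.out.println(retentionOf(new Layer(new Layer(new WindowInner())))); // 7200000
        System.out.println(retentionOf(new Layer(new KeyValueInner())));          // -1
    }
}

The -1L sentinel keeps StateStoreMetadata usable for plain key-value stores, which have no retention concept.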
File: StoreChangelogReader.java
@@ -25,6 +25,7 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
@@ -970,6 +971,7 @@ private void prepareChangelogs(final Map<TaskId, Task> tasks,
final Set<ChangelogMetadata> newPartitionsToRestore) {
// separate out those partitions that do not have their current offset loaded from the checkpoint
final Set<TopicPartition> newPartitionsWithoutStartOffset = new HashSet<>();
final Map<TopicPartition, Long> newPartitionsWithTimestampSeek = new HashMap<>();

for (final ChangelogMetadata changelogMetadata : newPartitionsToRestore) {
final StateStoreMetadata storeMetadata = changelogMetadata.storeMetadata;
@@ -986,18 +988,22 @@ private void prepareChangelogs(final Map<TaskId, Task> tasks,
log.debug("Start restoring changelog partition {} from current offset {} to end offset {}.",
partition, currentOffset, recordEndOffset(endOffset));
} else {
log.debug("Start restoring changelog partition {} from the beginning offset to end offset {} " +
"since we cannot find current offset.", partition, recordEndOffset(endOffset));

newPartitionsWithoutStartOffset.add(partition);
final long retentionPeriod = storeMetadata.retentionPeriod();
final long seekTimestamp = retentionPeriod > 0 && retentionPeriod != Long.MAX_VALUE
? time.milliseconds() - retentionPeriod : -1L;
if (seekTimestamp > 0) {
newPartitionsWithTimestampSeek.put(partition, seekTimestamp);
log.debug("Start restoring windowed changelog partition {} from timestamp {} to end offset {}.",
partition, seekTimestamp, recordEndOffset(endOffset));
} else {
log.debug("Start restoring changelog partition {} from the beginning offset to end offset {} " +
"since we cannot find current offset.", partition, recordEndOffset(endOffset));
newPartitionsWithoutStartOffset.add(partition);
}
}
}

- // optimization: batch all seek-to-beginning offsets in a single request
- // seek is not a blocking call so there's nothing to capture
- if (!newPartitionsWithoutStartOffset.isEmpty()) {
- restoreConsumer.seekToBeginning(newPartitionsWithoutStartOffset);
- }
+ seekToTimestampOrBeginning(newPartitionsWithTimestampSeek, newPartitionsWithoutStartOffset);

for (final ChangelogMetadata changelogMetadata : newPartitionsToRestore) {
final StateStoreMetadata storeMetadata = changelogMetadata.storeMetadata;
@@ -1039,6 +1045,29 @@ private void prepareChangelogs(final Map<TaskId, Task> tasks,
}
}

private void seekToTimestampOrBeginning(final Map<TopicPartition, Long> partitionsWithTimestampSeek,
final Set<TopicPartition> partitionsWithoutStartOffset) {
// optimization: seek windowed stores by timestamp to skip expired data
if (!partitionsWithTimestampSeek.isEmpty()) {
final Map<TopicPartition, OffsetAndTimestamp> offsetsByTimestamp =
restoreConsumer.offsetsForTimes(partitionsWithTimestampSeek);
offsetsByTimestamp.forEach((key, value) -> {
if (value != null) {
restoreConsumer.seek(key, value.offset());
} else {
// no offset found for the timestamp, fall back to seeking to the beginning
partitionsWithoutStartOffset.add(key);
}
});
}

// optimization: batch all seek-to-beginning offsets in a single request
// seek is not a blocking call so there's nothing to capture
if (!partitionsWithoutStartOffset.isEmpty()) {
restoreConsumer.seekToBeginning(partitionsWithoutStartOffset);
}
}

@Override
public void unregister(final Collection<TopicPartition> revokedChangelogs) {
unregister(revokedChangelogs, StandbyUpdateListener.SuspendReason.MIGRATED);
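The consumer API doing the heavy lifting above is offsetsForTimes, part of the standard Kafka Consumer interface: for each partition it returns the earliest offset whose record timestamp is at or after the requested timestamp, or null when no such record exists, which is why the fallback to seekToBeginning is needed. A minimal sketch of the per-partition call sequence, with a made-up helper name (seekPastExpired) and caller-supplied retention value:

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Map;

public final class TimestampSeekSketch {
    // Seeks past records that have already expired out of a windowed store's
    // retention window; falls back to the beginning if no offset qualifies.
    static void seekPastExpired(final Consumer<byte[], byte[]> consumer,
                                final TopicPartition partition,
                                final long retentionMs) {
        final long seekTimestamp = System.currentTimeMillis() - retentionMs;

        // earliest offset with record timestamp >= seekTimestamp, or null
        final Map<TopicPartition, OffsetAndTimestamp> found =
            consumer.offsetsForTimes(Collections.singletonMap(partition, seekTimestamp));

        final OffsetAndTimestamp offsetAndTimestamp = found.get(partition);
        if (offsetAndTimestamp != null) {
            consumer.seek(partition, offsetAndTimestamp.offset()); // skip expired records
        } else {
            consumer.seekToBeginning(Collections.singleton(partition));
        }
    }
}

Unlike seek and seekToBeginning, offsetsForTimes is a blocking remote lookup, which is presumably why the PR batches all timestamp seeks into a single offsetsForTimes call per restore cycle.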
File: AbstractDualSchemaRocksDBSegmentedBytesStore.java
@@ -46,7 +46,7 @@
import static org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED;
import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext;

- public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Segment> implements SegmentedBytesStore {
+ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Segment> implements SegmentedBytesStore, WithRetentionPeriod {
private static final Logger LOG = LoggerFactory.getLogger(AbstractDualSchemaRocksDBSegmentedBytesStore.class);

private final String name;
@@ -74,6 +74,11 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
this.retentionPeriod = retentionPeriod;
}

@Override
public long retentionPeriod() {
return retentionPeriod;
}

@Override
public KeyValueIterator<Bytes, byte[]> all() {

File: AbstractRocksDBSegmentedBytesStore.java
@@ -48,7 +48,7 @@
import static org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED;
import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext;

- public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements SegmentedBytesStore {
+ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements SegmentedBytesStore, WithRetentionPeriod {
private static final Logger LOG = LoggerFactory.getLogger(AbstractRocksDBSegmentedBytesStore.class);

private final String name;
@@ -73,6 +73,11 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
this.segments = segments;
}

@Override
public long retentionPeriod() {
return retentionPeriod;
}

@Override
public KeyValueIterator<Bytes, byte[]> fetch(final Bytes key,
final long from,
File: InMemorySessionStore.java
@@ -54,7 +54,7 @@

import static org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED;

- public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
+ public class InMemorySessionStore implements SessionStore<Bytes, byte[]>, WithRetentionPeriod {

private static final Logger LOG = LoggerFactory.getLogger(InMemorySessionStore.class);

@@ -89,6 +89,11 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
this.position = Position.emptyPosition();
}

@Override
public long retentionPeriod() {
return retentionPeriod;
}

@Override
public String name() {
return name;
@@ -590,4 +595,4 @@ private void getNextIterators() {
}
}

- }
+ }
File: InMemoryWindowStore.java
@@ -59,7 +59,7 @@
import static org.apache.kafka.streams.state.internals.WindowKeySchema.extractStoreTimestamp;


- public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
+ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]>, WithRetentionPeriod {

private static final Logger LOG = LoggerFactory.getLogger(InMemoryWindowStore.class);
private static final int SEQNUM_SIZE = 4;
@@ -95,6 +95,11 @@ public InMemoryWindowStore(final String name,
this.position = Position.emptyPosition();
}

@Override
public long retentionPeriod() {
return retentionPeriod;
}

@Override
public String name() {
return name;
File: WithRetentionPeriod.java (new file)
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;

public interface WithRetentionPeriod {
long retentionPeriod();
}
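The interface is a deliberately narrow opt-in marker: only inner stores that implement it participate in timestamp-based restoration, and every other store keeps the existing seek-to-beginning behavior. A hypothetical example of opting in (MyWindowBytesStore is illustrative, not part of this PR; a real store would also implement StateStore or one of its subinterfaces):

// Illustrative only: a store-like class exposing its retention so the
// changelog reader can compute a seek timestamp during restoration.
public class MyWindowBytesStore implements WithRetentionPeriod {
    private final long retentionPeriod;

    public MyWindowBytesStore(final long retentionPeriod) {
        this.retentionPeriod = retentionPeriod; // milliseconds
    }

    @Override
    public long retentionPeriod() {
        // returning -1L (or Long.MAX_VALUE) effectively opts out: the reader
        // only computes a seek timestamp for positive, finite retention values
        return retentionPeriod;
    }
}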
File: StoreChangelogReaderTest.java
@@ -27,6 +27,7 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
@@ -59,6 +60,7 @@

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
@@ -1430,6 +1432,116 @@ private void addRecords(final long messages, final TopicPartition topicPartition
}
}

@Test
public void shouldSeekByTimestampForWindowedStoreWithoutCheckpoint() {
final long retentionMs = Duration.ofHours(2).toMillis();
final long offsetForTimestamp = 42L;

// Use a MockConsumer subclass that supports offsetsForTimes
final MockConsumer<byte[], byte[]> timestampConsumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) {
@Override
public synchronized Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(final Map<TopicPartition, Long> timestampsToSearch) {
final Map<TopicPartition, OffsetAndTimestamp> result = new HashMap<>();
timestampsToSearch.forEach((key, value) -> result.put(key, new OffsetAndTimestamp(offsetForTimestamp, value)));
return result;
}
};

// Set up mocks - storeMetadata returns null offset (no checkpoint) and positive retentionPeriod
final StateStoreMetadata windowStoreMetadata = mock(StateStoreMetadata.class);
final ProcessorStateManager windowStateManager = mock(ProcessorStateManager.class);
final StateStore windowStore = mock(StateStore.class);
when(windowStoreMetadata.changelogPartition()).thenReturn(tp);
when(windowStoreMetadata.store()).thenReturn(windowStore);
when(windowStoreMetadata.offset()).thenReturn(null);
when(windowStoreMetadata.retentionPeriod()).thenReturn(retentionMs);
when(windowStore.name()).thenReturn(storeName);
when(windowStateManager.storeMetadata(tp)).thenReturn(windowStoreMetadata);
when(windowStateManager.taskType()).thenReturn(ACTIVE);

final TaskId taskId = new TaskId(0, 0);
when(windowStateManager.taskId()).thenReturn(taskId);

timestampConsumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));
adminClient.updateEndOffsets(Collections.singletonMap(tp, 100L));

final StoreChangelogReader reader =
new StoreChangelogReader(time, config, logContext, adminClient, timestampConsumer, callback, standbyListener);

reader.register(tp, windowStateManager);
reader.restore(Collections.singletonMap(taskId, mock(Task.class)));

assertEquals(offsetForTimestamp, timestampConsumer.position(tp), "The consumer should be positioned at the offset returned by offsetsForTimes, not at the beginning");
}

@Test
public void shouldSeekToBeginningWhenBrokerReturnsNullForOffsetsForTimes() {
final long retentionMs = Duration.ofHours(2).toMillis();

// Use a MockConsumer subclass that returns null for offsetsForTimes
final MockConsumer<byte[], byte[]> timestampConsumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) {
@Override
public synchronized Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(final Map<TopicPartition, Long> timestampsToSearch) {
final Map<TopicPartition, OffsetAndTimestamp> result = new HashMap<>();
timestampsToSearch.forEach((key, value) -> result.put(key, null));
return result;
}
};

final StateStoreMetadata windowStoreMetadata = mock(StateStoreMetadata.class);
final ProcessorStateManager windowStateManager = mock(ProcessorStateManager.class);
final StateStore windowStore = mock(StateStore.class);
when(windowStoreMetadata.changelogPartition()).thenReturn(tp);
when(windowStoreMetadata.store()).thenReturn(windowStore);
when(windowStoreMetadata.offset()).thenReturn(null);
when(windowStoreMetadata.retentionPeriod()).thenReturn(retentionMs);
when(windowStore.name()).thenReturn(storeName);
when(windowStateManager.storeMetadata(tp)).thenReturn(windowStoreMetadata);
when(windowStateManager.taskType()).thenReturn(ACTIVE);

final TaskId taskId = new TaskId(0, 0);
when(windowStateManager.taskId()).thenReturn(taskId);

timestampConsumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));
adminClient.updateEndOffsets(Collections.singletonMap(tp, 100L));

final StoreChangelogReader reader =
new StoreChangelogReader(time, config, logContext, adminClient, timestampConsumer, callback, standbyListener);

reader.register(tp, windowStateManager);
reader.restore(Collections.singletonMap(taskId, mock(Task.class)));

assertEquals(0L, timestampConsumer.position(tp), "When the broker returns null, the reader should fall back to seeking to the beginning");
}

@Test
public void shouldSeekToBeginningForNonWindowedStoreWithoutCheckpoint() {
final StateStoreMetadata kvStoreMetadata = mock(StateStoreMetadata.class);
final ProcessorStateManager kvStateManager = mock(ProcessorStateManager.class);
final StateStore kvStore = mock(StateStore.class);
when(kvStoreMetadata.changelogPartition()).thenReturn(tp);
when(kvStoreMetadata.store()).thenReturn(kvStore);
when(kvStoreMetadata.offset()).thenReturn(null);
when(kvStoreMetadata.retentionPeriod()).thenReturn(-1L);
when(kvStore.name()).thenReturn(storeName);
when(kvStateManager.storeMetadata(tp)).thenReturn(kvStoreMetadata);
when(kvStateManager.taskType()).thenReturn(ACTIVE);

final TaskId taskId = new TaskId(0, 0);
when(kvStateManager.taskId()).thenReturn(taskId);

consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));
adminClient.updateEndOffsets(Collections.singletonMap(tp, 100L));

final StoreChangelogReader reader =
new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback, standbyListener);

reader.register(tp, kvStateManager);
reader.restore(Collections.singletonMap(taskId, mock(Task.class)));

assertEquals(0L, consumer.position(tp), "A non-windowed store should seek to the beginning, not by timestamp");
}

private void assignPartition(final long messages,
final TopicPartition topicPartition) {
consumer.updatePartitions(