diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 0992d33f70095..10963c8bf82e6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -36,6 +36,8 @@
 import org.apache.kafka.streams.state.internals.LegacyCheckpointingStateStore;
 import org.apache.kafka.streams.state.internals.RecordConverter;
 import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;
+import org.apache.kafka.streams.state.internals.WithRetentionPeriod;
+import org.apache.kafka.streams.state.internals.WrappedStateStore;
 import org.slf4j.Logger;
@@ -103,6 +105,8 @@ public static class StateStoreMetadata {
         // corrupted state store should not be included in checkpointing
         private boolean corrupted;
 
+        private final long retentionPeriod;
+
         private StateStoreMetadata(final StateStore stateStore,
                                    final CommitCallback commitCallback) {
@@ -113,6 +117,7 @@ private StateStoreMetadata(final StateStore stateStore,
             this.changelogPartition = null;
             this.corrupted = false;
             this.offset = null;
+            this.retentionPeriod = -1L;
         }
 
         private StateStoreMetadata(final StateStore stateStore,
@@ -130,12 +135,24 @@ private StateStoreMetadata(final StateStore stateStore,
             this.commitCallback = commitCallback;
             this.recordConverter = recordConverter;
             this.offset = null;
+            this.retentionPeriod = extractRetentionPeriod(stateStore);
         }
 
         private void setOffset(final Long offset) {
             this.offset = offset;
         }
 
+        private static long extractRetentionPeriod(final StateStore stateStore) {
+            StateStore current = stateStore;
+            while (current instanceof WrappedStateStore) {
+                current = ((WrappedStateStore<?, ?, ?>) current).wrapped();
+            }
+            if (current instanceof WithRetentionPeriod) {
+                return ((WithRetentionPeriod) current).retentionPeriod();
+            }
+            return -1L;
+        }
+
         // the offset is exposed to the changelog reader to determine if restoration is completed
         Long offset() {
             return this.offset;
@@ -145,6 +162,11 @@ Long endOffset() {
             return this.endOffset;
         }
 
+        // the retentionPeriod is exposed to the changelog reader for window restoration
+        long retentionPeriod() {
+            return retentionPeriod;
+        }
+
         public void setEndOffset(final Long endOffset) {
             this.endOffset = endOffset;
         }
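`extractRetentionPeriod` works because the store object registered with the state manager is normally the outermost layer of a wrapper chain (metered, caching, changelogging), while only the innermost store knows its retention. A minimal, self-contained sketch of that unwrap pattern, using hypothetical stand-in types (`Store`, `Wrapper`, `HasRetention`) rather than the real Streams classes:

```java
// Hypothetical stand-ins, for illustration only.
interface Store { }

interface HasRetention {
    long retentionMs();
}

// plays the role of WrappedStateStore: decorates an inner store
class Wrapper implements Store {
    private final Store inner;
    Wrapper(final Store inner) { this.inner = inner; }
    Store wrapped() { return inner; }
}

class InnerWindowStore implements Store, HasRetention {
    @Override
    public long retentionMs() { return 3_600_000L; }
}

public class UnwrapDemo {
    static long extractRetention(final Store store) {
        Store current = store;
        while (current instanceof Wrapper) {
            current = ((Wrapper) current).wrapped();  // peel one decoration layer
        }
        return current instanceof HasRetention
            ? ((HasRetention) current).retentionMs()
            : -1L;                                    // sentinel: store has no retention
    }

    public static void main(final String[] args) {
        // metered -> caching -> inner store, as Streams typically builds them
        final Store chain = new Wrapper(new Wrapper(new InnerWindowStore()));
        System.out.println(extractRetention(chain)); // 3600000
    }
}
```

The real `WrappedStateStore.wrapped()` plays the role of `Wrapper.wrapped()` here, and -1 is the same "no retention" sentinel the PR uses.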
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index eab7da800d882..51287a9df55ea 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -25,6 +25,7 @@
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.InvalidOffsetException;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
@@ -969,7 +970,8 @@ private void resumeChangelogsFromRestoreConsumer(final Collection<TopicPartition> partitions) {
 
     private void prepareChangelogs(final Map<TaskId, Task> tasks,
                                    final Set<ChangelogMetadata> newPartitionsToRestore) {
         // separate those who do not have the current offset loaded from checkpoint
-        final Set<TopicPartition> newPartitionsWithoutStartOffset = new HashSet<>();
+        final Set<TopicPartition> newSeekToBeginningPartitions = new HashSet<>();
+        final Map<TopicPartition, Long> newWindowedPartitionsRetention = new HashMap<>();
 
         for (final ChangelogMetadata changelogMetadata : newPartitionsToRestore) {
             final StateStoreMetadata storeMetadata = changelogMetadata.storeMetadata;
@@ -986,18 +988,18 @@ private void prepareChangelogs(final Map<TaskId, Task> tasks,
                 log.debug("Start restoring changelog partition {} from current offset {} to end offset {}.",
                     partition, currentOffset, recordEndOffset(endOffset));
             } else {
-                log.debug("Start restoring changelog partition {} from the beginning offset to end offset {} " +
-                    "since we cannot find current offset.", partition, recordEndOffset(endOffset));
-
-                newPartitionsWithoutStartOffset.add(partition);
+                final long retentionPeriod = storeMetadata.retentionPeriod();
+                if (retentionPeriod > 0 && retentionPeriod != Long.MAX_VALUE) {
+                    newWindowedPartitionsRetention.put(partition, retentionPeriod);
+                } else {
+                    log.debug("Start restoring changelog partition {} from the beginning offset to end offset {} " +
+                        "since we cannot find current offset.", partition, recordEndOffset(endOffset));
+                    newSeekToBeginningPartitions.add(partition);
+                }
             }
         }
 
-        // optimization: batch all seek-to-beginning offsets in a single request
-        // seek is not a blocking call so there's nothing to capture
-        if (!newPartitionsWithoutStartOffset.isEmpty()) {
-            restoreConsumer.seekToBeginning(newPartitionsWithoutStartOffset);
-        }
+        seekNewPartitions(newWindowedPartitionsRetention, newSeekToBeginningPartitions);
 
         for (final ChangelogMetadata changelogMetadata : newPartitionsToRestore) {
             final StateStoreMetadata storeMetadata = changelogMetadata.storeMetadata;
@@ -1039,6 +1041,99 @@ private void prepareChangelogs(final Map<TaskId, Task> tasks,
         }
     }
 
+    private void seekNewPartitions(final Map<TopicPartition, Long> windowedPartitionsRetention,
+                                   final Set<TopicPartition> seekToBeginningPartitions) {
+        // Seek non-windowed partitions to beginning.
+        if (!seekToBeginningPartitions.isEmpty()) {
+            restoreConsumer.seekToBeginning(seekToBeginningPartitions);
+        }
+
+        // Try to optimize windowed partitions by seeking past expired data.
+        if (!windowedPartitionsRetention.isEmpty()) {
+            final Set<TopicPartition> allAssigned = restoreConsumer.assignment();
+            final Set<TopicPartition> previouslyPaused = new HashSet<>(restoreConsumer.paused());
+
+            try {
+                restoreConsumer.pause(allAssigned);
+                restoreConsumer.resume(windowedPartitionsRetention.keySet());
+
+                final Map<TopicPartition, Long> endOffsets =
+                    restoreConsumer.endOffsets(windowedPartitionsRetention.keySet());
+
+                for (final TopicPartition partition : windowedPartitionsRetention.keySet()) {
+                    final Long endOffset = endOffsets.get(partition);
+                    if (endOffset != null && endOffset > 0) {
+                        restoreConsumer.seek(partition, endOffset - 1);
+                    } else {
+                        restoreConsumer.seekToBeginning(Collections.singleton(partition));
+                        seekToBeginningPartitions.add(partition);
+                    }
+                }
+                windowedPartitionsRetention.keySet().removeAll(seekToBeginningPartitions);
+
+                final ConsumerRecords<byte[], byte[]> polledRecords = restoreConsumer.poll(pollTime);
+
+                seekByRetentionFromPolledRecords(polledRecords, windowedPartitionsRetention, seekToBeginningPartitions);
+            } catch (final TimeoutException e) {
+                log.debug("Could not seek by timestamp for changelog partitions {}, falling back to seek-to-beginning",
+                    windowedPartitionsRetention.keySet(), e);
+                seekToBeginningPartitions.addAll(windowedPartitionsRetention.keySet());
+            } catch (final KafkaException e) {
+                log.warn("Failed to seek by timestamp for changelog partitions {}, falling back to seek-to-beginning",
+                    windowedPartitionsRetention.keySet(), e);
+                seekToBeginningPartitions.addAll(windowedPartitionsRetention.keySet());
+            } finally {
+                restoreConsumer.pause(allAssigned);
+                final Set<TopicPartition> toResume = new HashSet<>(allAssigned);
+                toResume.removeAll(previouslyPaused);
+                if (!toResume.isEmpty()) {
+                    restoreConsumer.resume(toResume);
+                }
+            }
+        }
+
+        // Seek any windowed partitions that failed during the optimization back to the beginning.
+        // Their position was moved by seek+poll above.
+        if (!seekToBeginningPartitions.isEmpty()) {
+            restoreConsumer.seekToBeginning(seekToBeginningPartitions);
+        }
+    }
+
+    private void seekByRetentionFromPolledRecords(final ConsumerRecords<byte[], byte[]> polledRecords,
+                                                  final Map<TopicPartition, Long> windowedPartitionsRetention,
+                                                  final Set<TopicPartition> seekToBeginningPartitions) {
+        final Map<TopicPartition, Long> seekTimestamps = new HashMap<>();
+        for (final Map.Entry<TopicPartition, Long> entry : windowedPartitionsRetention.entrySet()) {
+            final TopicPartition partition = entry.getKey();
+            final long retentionPeriod = entry.getValue();
+            final List<ConsumerRecord<byte[], byte[]>> records = polledRecords.records(partition);
+            if (!records.isEmpty()) {
+                final long latestTimestamp = records.get(0).timestamp();
+                final long seekTimestamp = latestTimestamp - retentionPeriod;
+                if (seekTimestamp > 0) {
+                    seekTimestamps.put(partition, seekTimestamp);
+                    log.debug("Start restoring windowed changelog partition {} from stream-time-based timestamp {} " +
+                        "(maxStreamTime={}, retention={}).", partition, seekTimestamp, latestTimestamp, retentionPeriod);
+                    continue;
+                }
+            }
+            log.debug("Start restoring changelog partition {} from the beginning.", partition);
+            seekToBeginningPartitions.add(partition);
+        }
+
+        if (!seekTimestamps.isEmpty()) {
+            final Map<TopicPartition, OffsetAndTimestamp> offsetsByTimestamp =
+                restoreConsumer.offsetsForTimes(seekTimestamps);
+            offsetsByTimestamp.forEach((partition, offsetAndTimestamp) -> {
+                if (offsetAndTimestamp != null) {
+                    restoreConsumer.seek(partition, offsetAndTimestamp.offset());
+                } else {
+                    seekToBeginningPartitions.add(partition);
+                }
+            });
+        }
+    }
+
     @Override
     public void unregister(final Collection<TopicPartition> revokedChangelogs) {
         unregister(revokedChangelogs, StandbyUpdateListener.SuspendReason.MIGRATED);
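Taken together, `seekNewPartitions` and `seekByRetentionFromPolledRecords` apply a simple recipe: read one record at `endOffset - 1` to learn the changelog's maximum timestamp, subtract the store's retention to get a cutoff, and let `offsetsForTimes` skip the records the store would expire on arrival anyway. A minimal sketch of that recipe against a plain `KafkaConsumer`; the topic name, retention value, and bootstrap address are made up, and the pause/resume bookkeeping and exception fallbacks of the real method are omitted:

```java
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class SeekPastExpiredDemo {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // made-up address
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        final TopicPartition tp = new TopicPartition("my-store-changelog", 0); // hypothetical changelog
        final long retentionMs = Duration.ofHours(2).toMillis();               // made-up retention

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singleton(tp));

            // 1. Learn the changelog's maximum timestamp: seek to endOffset - 1 and poll once.
            final long endOffset = consumer.endOffsets(Collections.singleton(tp)).get(tp);
            if (endOffset == 0) {
                consumer.seekToBeginning(Collections.singleton(tp)); // empty changelog
                return;
            }
            consumer.seek(tp, endOffset - 1);
            final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
            if (records.isEmpty()) {
                consumer.seekToBeginning(Collections.singleton(tp)); // nothing readable, give up
                return;
            }
            final long latestTimestamp = records.records(tp).get(0).timestamp();

            // 2. Anything older than latestTimestamp - retention is already expired in the
            //    store, so start from the first offset at or after the cutoff instead.
            final long cutoff = latestTimestamp - retentionMs;
            final Map<TopicPartition, OffsetAndTimestamp> found =
                consumer.offsetsForTimes(Collections.singletonMap(tp, cutoff));
            final OffsetAndTimestamp offsetAndTimestamp = found.get(tp);
            if (offsetAndTimestamp != null) {
                consumer.seek(tp, offsetAndTimestamp.offset());
            } else {
                consumer.seekToBeginning(Collections.singleton(tp)); // no data at/after cutoff
            }
        }
    }
}
```

With the constants used in the tests below (retention of two hours, i.e. 7,200,000 ms, and a latest record timestamp of 10,000,000 ms), the cutoff comes out at 2,800,000 ms.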
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
index f353e56f7df77..9dbb4210aa7c9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
@@ -46,7 +46,7 @@
 import static org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED;
 import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext;
 
-public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Segment> implements SegmentedBytesStore {
+public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Segment> implements SegmentedBytesStore, WithRetentionPeriod {
 
     private static final Logger LOG = LoggerFactory.getLogger(AbstractDualSchemaRocksDBSegmentedBytesStore.class);
     private final String name;
@@ -74,6 +74,11 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Se
         this.segments = segments;
     }
 
+    @Override
+    public long retentionPeriod() {
+        return retentionPeriod;
+    }
+
     @Override
     public KeyValueIterator<Bytes, byte[]> all() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
index c6b57411067c2..c11e276a19e3c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
@@ -48,7 +48,7 @@
 import static org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED;
 import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext;
 
-public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements SegmentedBytesStore {
+public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements SegmentedBytesStore, WithRetentionPeriod {
 
     private static final Logger LOG = LoggerFactory.getLogger(AbstractRocksDBSegmentedBytesStore.class);
     private final String name;
@@ -73,6 +73,11 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
         this.segments = segments;
     }
 
+    @Override
+    public long retentionPeriod() {
+        return retentionPeriod;
+    }
+
     @Override
     public KeyValueIterator<Bytes, byte[]> fetch(final Bytes key,
                                                  final long from,
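The two segmented RocksDB stores above and the two in-memory stores below each just expose the retention they were already configured with; whether the optimization actually fires is decided by the guard in `prepareChangelogs`. A trivial sketch of that predicate (`eligibleForTimestampSeek` is a hypothetical helper name, not part of the PR):

```java
public class RetentionGuardDemo {
    // mirrors the guard in prepareChangelogs(): only a finite, positive retention
    // (in milliseconds) makes a partition eligible for the seek-by-timestamp path
    static boolean eligibleForTimestampSeek(final long retentionPeriod) {
        return retentionPeriod > 0 && retentionPeriod != Long.MAX_VALUE;
    }

    public static void main(final String[] args) {
        System.out.println(eligibleForTimestampSeek(-1L));            // false: no retention sentinel
        System.out.println(eligibleForTimestampSeek(Long.MAX_VALUE)); // false: unbounded retention
        System.out.println(eligibleForTimestampSeek(7_200_000L));     // true: two hours of data
    }
}
```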
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
index 9d3936d0eb0b6..dd3f083aa10ea 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
@@ -54,7 +54,7 @@
 import static org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED;
 
-public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
+public class InMemorySessionStore implements SessionStore<Bytes, byte[]>, WithRetentionPeriod {
 
     private static final Logger LOG = LoggerFactory.getLogger(InMemorySessionStore.class);
 
@@ -91,6 +91,11 @@ public InMemorySessionStore(
         this.position = Position.emptyPosition();
     }
 
+    @Override
+    public long retentionPeriod() {
+        return retentionPeriod;
+    }
+
     @Override
     public String name() {
         return name;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
index 8d2228db5c3d5..6f5b1095f47ca 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
@@ -59,7 +59,7 @@
 import static org.apache.kafka.streams.state.internals.WindowKeySchema.extractStoreTimestamp;
 
-public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
+public class InMemoryWindowStore implements WindowStore<Bytes, byte[]>, WithRetentionPeriod {
 
     private static final Logger LOG = LoggerFactory.getLogger(InMemoryWindowStore.class);
     private static final int SEQNUM_SIZE = 4;
@@ -95,6 +95,11 @@ public InMemoryWindowStore(final String name,
         this.position = Position.emptyPosition();
     }
 
+    @Override
+    public long retentionPeriod() {
+        return retentionPeriod;
+    }
+
     @Override
     public String name() {
         return name;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WithRetentionPeriod.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WithRetentionPeriod.java
new file mode 100644
index 0000000000000..44dee28eafaac
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WithRetentionPeriod.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+public interface WithRetentionPeriod {
+    long retentionPeriod();
+}
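The interface is deliberately minimal, since the changelog reader only needs a single number. A hedged sketch of what an implementor looks like; `FixedRetentionStore` is hypothetical, the actual implementors in this PR being the two segmented RocksDB stores and the two in-memory stores above:

```java
import org.apache.kafka.streams.state.internals.WithRetentionPeriod;

// Hypothetical store class, for illustration only.
public class FixedRetentionStore implements WithRetentionPeriod {
    private final long retentionPeriod;

    public FixedRetentionStore(final long retentionPeriod) {
        this.retentionPeriod = retentionPeriod;
    }

    @Override
    public long retentionPeriod() {
        // milliseconds of data the store keeps; values outside (0, Long.MAX_VALUE)
        // opt the store out of the seek-by-timestamp restore optimization
        return retentionPeriod;
    }
}
```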
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 72954175f5b4a..e3276cefefbac 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -27,11 +27,14 @@
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
@@ -59,7 +62,9 @@
 import java.time.Duration;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -1430,6 +1435,133 @@ private void addRecords(final long messages,
                             final TopicPartition topicPartition) {
         }
     }
 
+    @Test
+    public void shouldSeekByTimestampForWindowedStoreWithoutCheckpoint() {
+        final long retentionMs = Duration.ofHours(2).toMillis();
+        final long offsetForTimestamp = 42L;
+        final long latestRecordTimestamp = 10_000_000L;
+        final long endOffset = 100L;
+
+        final MockConsumer<byte[], byte[]> timestampConsumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) {
+            @Override
+            public synchronized Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(final Map<TopicPartition, Long> timestampsToSearch) {
+                final Map<TopicPartition, OffsetAndTimestamp> result = new HashMap<>();
+                timestampsToSearch.forEach((key, value) -> result.put(key, new OffsetAndTimestamp(offsetForTimestamp, value)));
+                return result;
+            }
+        };
+
+        final StateStoreMetadata windowStoreMetadata = mock(StateStoreMetadata.class);
+        final ProcessorStateManager windowStateManager = mock(ProcessorStateManager.class);
+        final StateStore windowStore = mock(StateStore.class);
+        when(windowStoreMetadata.changelogPartition()).thenReturn(tp);
+        when(windowStoreMetadata.store()).thenReturn(windowStore);
+        when(windowStoreMetadata.offset()).thenReturn(null);
+        when(windowStoreMetadata.retentionPeriod()).thenReturn(retentionMs);
+        when(windowStore.name()).thenReturn(storeName);
+        when(windowStateManager.storeMetadata(tp)).thenReturn(windowStoreMetadata);
+        when(windowStateManager.taskType()).thenReturn(ACTIVE);
+
+        final TaskId taskId = new TaskId(0, 0);
+        when(windowStateManager.taskId()).thenReturn(taskId);
+
+        timestampConsumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));
+        timestampConsumer.updateEndOffsets(Collections.singletonMap(tp, endOffset));
+        adminClient.updateEndOffsets(Collections.singletonMap(tp, endOffset));
+
+        // schedule adding the record during poll, after the partition is assigned
+        timestampConsumer.schedulePollTask(() -> timestampConsumer.addRecord(new ConsumerRecord<>(
+            tp.topic(), tp.partition(), endOffset - 1,
+            latestRecordTimestamp, TimestampType.CREATE_TIME,
+            0, 0, new byte[0], new byte[0],
+            new RecordHeaders(), Optional.empty())));
+
+        final StoreChangelogReader reader =
+            new StoreChangelogReader(time, config, logContext, adminClient, timestampConsumer, callback, standbyListener);
+
+        reader.register(tp, windowStateManager);
+        reader.restore(Collections.singletonMap(taskId, mock(Task.class)));
+
+        assertEquals(offsetForTimestamp, timestampConsumer.position(tp),
+            "The consumer should be seeked to the offset returned by offsetsForTimes, not to the beginning");
+    }
+
+    @Test
+    public void shouldSeekToBeginningWhenBrokerReturnsNullForOffsetsForTimes() {
+        final long retentionMs = Duration.ofHours(2).toMillis();
+        final long latestRecordTimestamp = 10_000_000L;
+        final long endOffset = 100L;
+
+        final MockConsumer<byte[], byte[]> timestampConsumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) {
+            @Override
+            public synchronized Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(final Map<TopicPartition, Long> timestampsToSearch) {
+                final Map<TopicPartition, OffsetAndTimestamp> result = new HashMap<>();
+                timestampsToSearch.forEach((key, value) -> result.put(key, null));
+                return result;
+            }
+        };
+
+        final StateStoreMetadata windowStoreMetadata = mock(StateStoreMetadata.class);
+        final ProcessorStateManager windowStateManager = mock(ProcessorStateManager.class);
+        final StateStore windowStore = mock(StateStore.class);
+        when(windowStoreMetadata.changelogPartition()).thenReturn(tp);
+        when(windowStoreMetadata.store()).thenReturn(windowStore);
+        when(windowStoreMetadata.offset()).thenReturn(null);
+        when(windowStoreMetadata.retentionPeriod()).thenReturn(retentionMs);
+        when(windowStore.name()).thenReturn(storeName);
+        when(windowStateManager.storeMetadata(tp)).thenReturn(windowStoreMetadata);
+        when(windowStateManager.taskType()).thenReturn(ACTIVE);
+
+        final TaskId taskId = new TaskId(0, 0);
+        when(windowStateManager.taskId()).thenReturn(taskId);
+
+        timestampConsumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));
+        timestampConsumer.updateEndOffsets(Collections.singletonMap(tp, endOffset));
+        adminClient.updateEndOffsets(Collections.singletonMap(tp, endOffset));
+
+        // schedule adding the record during poll, after the partition is assigned
+        timestampConsumer.schedulePollTask(() -> timestampConsumer.addRecord(new ConsumerRecord<>(
+            tp.topic(), tp.partition(), endOffset - 1,
+            latestRecordTimestamp, TimestampType.CREATE_TIME,
+            0, 0, new byte[0], new byte[0],
+            new RecordHeaders(), Optional.empty())));
+
+        final StoreChangelogReader reader =
+            new StoreChangelogReader(time, config, logContext, adminClient, timestampConsumer, callback, standbyListener);
+
+        reader.register(tp, windowStateManager);
+        reader.restore(Collections.singletonMap(taskId, mock(Task.class)));
+
+        assertEquals(0L, timestampConsumer.position(tp),
+            "When broker returns null, should fall back to seeking to the beginning");
+    }
+
+    @Test
+    public void shouldSeekToBeginningForNonWindowedStoreWithoutCheckpoint() {
+        final StateStoreMetadata kvStoreMetadata = mock(StateStoreMetadata.class);
+        final ProcessorStateManager kvStateManager = mock(ProcessorStateManager.class);
+        final StateStore kvStore = mock(StateStore.class);
+        when(kvStoreMetadata.changelogPartition()).thenReturn(tp);
+        when(kvStoreMetadata.store()).thenReturn(kvStore);
+        when(kvStoreMetadata.offset()).thenReturn(null);
+        when(kvStoreMetadata.retentionPeriod()).thenReturn(-1L);
+        when(kvStore.name()).thenReturn(storeName);
+        when(kvStateManager.storeMetadata(tp)).thenReturn(kvStoreMetadata);
+        when(kvStateManager.taskType()).thenReturn(ACTIVE);
+
+        final TaskId taskId = new TaskId(0, 0);
+        when(kvStateManager.taskId()).thenReturn(taskId);
+
+        consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));
+        adminClient.updateEndOffsets(Collections.singletonMap(tp, 100L));
+
+        final StoreChangelogReader reader =
+            new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback, standbyListener);
+
+        reader.register(tp, kvStateManager);
+        reader.restore(Collections.singletonMap(taskId, mock(Task.class)));
+
+        assertEquals(0L, consumer.position(tp), "Non-windowed store should seek to beginning, not by timestamp");
+    }
+
     private void assignPartition(final long messages,
                                  final TopicPartition topicPartition) {
         consumer.updatePartitions(