From b372f21b107f7ad138b23a92f557402f2e05fe87 Mon Sep 17 00:00:00 2001 From: Mohit Tambi Date: Tue, 14 Oct 2025 18:44:34 +0530 Subject: [PATCH] Offset handling, data loss, fault tolerance added --- .../clients/consumer/ConsumerConfig.java | 56 +++ .../consumer/internals/DataLossDetector.java | 362 ++++++++++++++++++ .../consumer/internals/OffsetFetcher.java | 5 +- .../internals/OffsetFetcherUtils.java | 34 +- .../internals/OffsetsRequestManager.java | 5 +- .../common/errors/DataLossException.java | 101 +++++ .../internals/DataLossDetectorTest.java | 137 +++++++ docs/data-loss-detection.md | 300 +++++++++++++++ 8 files changed, 995 insertions(+), 5 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DataLossDetector.java create mode 100644 clients/src/main/java/org/apache/kafka/common/errors/DataLossException.java create mode 100644 clients/src/test/java/org/apache/kafka/clients/consumer/internals/DataLossDetectorTest.java create mode 100644 docs/data-loss-detection.md diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index ee62dc9561b41..333cd883be48d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -180,6 +180,39 @@ public class ConsumerConfig extends AbstractConfig { "

Note that altering partition numbers while setting this config to latest may cause message delivery loss since " + "producers could start to send messages to newly added partitions (i.e. no initial offsets exist yet) before consumers reset their offsets."; + /** + * enable.data.loss.detection + */ + public static final String ENABLE_DATA_LOSS_DETECTION_CONFIG = "enable.data.loss.detection"; + public static final String ENABLE_DATA_LOSS_DETECTION_DOC = "Enables data loss detection for the consumer. When enabled, the consumer will detect and handle scenarios " + + "that could lead to data loss, including offset gaps, topic recreation, and out-of-range offsets. The behavior when data loss is detected depends on the " + + "auto.offset.reset strategy: 'none' will throw a DataLossException, while 'earliest' and 'latest' will log warnings and attempt recovery."; + public static final boolean DEFAULT_ENABLE_DATA_LOSS_DETECTION = false; + + /** + * data.loss.detection.gap.threshold + */ + public static final String DATA_LOSS_DETECTION_GAP_THRESHOLD_CONFIG = "data.loss.detection.gap.threshold"; + public static final String DATA_LOSS_DETECTION_GAP_THRESHOLD_DOC = "The maximum allowed offset gap before considering it a potential data loss scenario. " + + "Smaller values provide stricter detection but may cause false positives during normal retention. Larger values are more lenient but may miss actual data loss."; + public static final long DEFAULT_DATA_LOSS_DETECTION_GAP_THRESHOLD = 1000L; + + /** + * data.loss.detection.validation.interval.ms + */ + public static final String DATA_LOSS_DETECTION_VALIDATION_INTERVAL_MS_CONFIG = "data.loss.detection.validation.interval.ms"; + public static final String DATA_LOSS_DETECTION_VALIDATION_INTERVAL_MS_DOC = "The interval in milliseconds between continuous data loss validation checks during normal consumption. " + + "This helps detect silent data loss due to retention policies between normal poll operations."; + public static final long DEFAULT_DATA_LOSS_DETECTION_VALIDATION_INTERVAL_MS = 30000L; // 30 seconds + + /** + * data.loss.detection.grace.period.ms + */ + public static final String DATA_LOSS_DETECTION_GRACE_PERIOD_MS_CONFIG = "data.loss.detection.grace.period.ms"; + public static final String DATA_LOSS_DETECTION_GRACE_PERIOD_MS_DOC = "Grace period in milliseconds to avoid false positives during topic recreation or broker maintenance. 
" + + "Suspected data loss events within this period after initialization will be logged as warnings instead of throwing exceptions."; + public static final long DEFAULT_DATA_LOSS_DETECTION_GRACE_PERIOD_MS = 5000L; // 5 seconds + /** * fetch.min.bytes */ @@ -546,6 +579,29 @@ public class ConsumerConfig extends AbstractConfig { new AutoOffsetResetStrategy.Validator(), Importance.MEDIUM, AUTO_OFFSET_RESET_DOC) + .define(ENABLE_DATA_LOSS_DETECTION_CONFIG, + Type.BOOLEAN, + DEFAULT_ENABLE_DATA_LOSS_DETECTION, + Importance.MEDIUM, + ENABLE_DATA_LOSS_DETECTION_DOC) + .define(DATA_LOSS_DETECTION_GAP_THRESHOLD_CONFIG, + Type.LONG, + DEFAULT_DATA_LOSS_DETECTION_GAP_THRESHOLD, + atLeast(1), + Importance.LOW, + DATA_LOSS_DETECTION_GAP_THRESHOLD_DOC) + .define(DATA_LOSS_DETECTION_VALIDATION_INTERVAL_MS_CONFIG, + Type.LONG, + DEFAULT_DATA_LOSS_DETECTION_VALIDATION_INTERVAL_MS, + atLeast(1000), + Importance.LOW, + DATA_LOSS_DETECTION_VALIDATION_INTERVAL_MS_DOC) + .define(DATA_LOSS_DETECTION_GRACE_PERIOD_MS_CONFIG, + Type.LONG, + DEFAULT_DATA_LOSS_DETECTION_GRACE_PERIOD_MS, + atLeast(0), + Importance.LOW, + DATA_LOSS_DETECTION_GRACE_PERIOD_MS_DOC) .define(CHECK_CRCS_CONFIG, Type.BOOLEAN, true, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DataLossDetector.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DataLossDetector.java new file mode 100644 index 0000000000000..d8cf932e4db67 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DataLossDetector.java @@ -0,0 +1,362 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.DataLossException; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +/** + * Utility class for detecting data loss scenarios in Kafka consumers. 
+ * This class implements various data loss detection mechanisms including: + * - Offset gap detection + * - Topic recreation detection + * - Out-of-range offset detection + * - Silent data loss from retention policies + * - False positive mitigation + */ +public class DataLossDetector { + + private final Logger log; + private final Map lastSeenOffsets = new HashMap<>(); + private final Map topicGenerations = new HashMap<>(); + private final Map lastValidatedTimestamp = new HashMap<>(); + private final Map partitionMetadata = new HashMap<>(); + + // Configuration for detection sensitivity + private static final long VALIDATION_INTERVAL_MS = 30000; // 30 seconds + private static final long MAX_OFFSET_GAP_THRESHOLD = 1000; // Configurable threshold + private static final long TOPIC_RECREATION_GRACE_PERIOD_MS = 5000; // 5 seconds grace period + + public DataLossDetector(LogContext logContext) { + this.log = logContext.logger(DataLossDetector.class); + } + + /** + * Enhanced metadata tracking for partitions + */ + private static class PartitionMetadata { + final long firstKnownOffset; + final long lastKnownEndOffset; + final long creationTimestamp; + final boolean isNewTopic; + + PartitionMetadata(long firstKnownOffset, long lastKnownEndOffset, long creationTimestamp, boolean isNewTopic) { + this.firstKnownOffset = firstKnownOffset; + this.lastKnownEndOffset = lastKnownEndOffset; + this.creationTimestamp = creationTimestamp; + this.isNewTopic = isNewTopic; + } + } + + /** + * Continuous validation check for silent data loss during normal operation. + * This should be called periodically during normal consumption. + * + * @param partition The topic partition to validate + * @param currentOffset The current consumer position + * @param beginningOffset Current beginning offset from broker + * @param endOffset Current end offset from broker + */ + public void validateContinuousDataIntegrity(TopicPartition partition, long currentOffset, + long beginningOffset, long endOffset) { + long currentTime = System.currentTimeMillis(); + Long lastValidated = lastValidatedTimestamp.get(partition); + + // Only validate periodically to avoid performance impact + if (lastValidated != null && (currentTime - lastValidated) < VALIDATION_INTERVAL_MS) { + return; + } + + lastValidatedTimestamp.put(partition, currentTime); + + // Check for silent data loss due to retention + PartitionMetadata metadata = partitionMetadata.get(partition); + if (metadata != null) { + // Detect if beginning offset jumped significantly (retention purged data) + if (beginningOffset > metadata.firstKnownOffset + MAX_OFFSET_GAP_THRESHOLD) { + log.warn("Potential silent data loss detected in partition {} due to retention. 
" + + "Beginning offset jumped from {} to {}, gap: {}", + partition, metadata.firstKnownOffset, beginningOffset, + beginningOffset - metadata.firstKnownOffset); + + // Update metadata with new baseline + partitionMetadata.put(partition, new PartitionMetadata( + beginningOffset, endOffset, currentTime, false)); + } + + // Detect if end offset went backwards (topic truncation/recreation) + if (endOffset < metadata.lastKnownEndOffset) { + handleSuspectedTopicRecreation(partition, metadata, beginningOffset, endOffset, currentTime); + } + } else { + // First time seeing this partition - establish baseline + partitionMetadata.put(partition, new PartitionMetadata( + beginningOffset, endOffset, currentTime, true)); + } + + updateLastSeenOffset(partition, currentOffset); + } + + /** + * Handle suspected topic recreation with false positive mitigation + */ + private void handleSuspectedTopicRecreation(TopicPartition partition, PartitionMetadata metadata, + long beginningOffset, long endOffset, long currentTime) { + // Apply grace period to avoid false positives during normal operations + if (currentTime - metadata.creationTimestamp < TOPIC_RECREATION_GRACE_PERIOD_MS) { + log.debug("Ignoring suspected topic recreation for {} within grace period", partition); + return; + } + + // Detect topic recreation patterns + boolean likelyRecreation = (beginningOffset == 0 && endOffset < metadata.lastKnownEndOffset) || + (beginningOffset > metadata.firstKnownOffset); + + if (likelyRecreation) { + log.warn("Topic recreation detected for partition {}. " + + "Previous range: [{}, {}], Current range: [{}, {}]", + partition, metadata.firstKnownOffset, metadata.lastKnownEndOffset, + beginningOffset, endOffset); + + // Update metadata for new topic generation + partitionMetadata.put(partition, new PartitionMetadata( + beginningOffset, endOffset, currentTime, true)); + + // Clear old tracking data + lastSeenOffsets.remove(partition); + topicGenerations.put(partition, currentTime); + } + } + + /** + * Enhanced data loss detection with edge case handling. 
+ * + * @param partition The topic partition being reset + * @param oldOffset The previous offset (if any) + * @param newOffset The new offset being set + * @param beginningOffset The earliest available offset for the partition + * @param endOffset The latest available offset for the partition + * @throws DataLossException if data loss is detected + */ + public void checkForDataLoss(TopicPartition partition, Long oldOffset, long newOffset, + long beginningOffset, long endOffset) { + log.debug("Checking for data loss in partition {}: oldOffset={}, newOffset={}, beginningOffset={}, endOffset={}", + partition, oldOffset, newOffset, beginningOffset, endOffset); + + long currentTime = System.currentTimeMillis(); + + // Check for startup edge cases - be more lenient on first connection + boolean isStartupScenario = !lastSeenOffsets.containsKey(partition); + + // Check for offset gap if we have a previous offset + if (oldOffset != null) { + checkOffsetGapWithEdgeCases(partition, oldOffset, newOffset, beginningOffset, endOffset, isStartupScenario); + } + + // Check for topic recreation with false positive mitigation + checkTopicRecreationWithValidation(partition, beginningOffset, currentTime); + + // Validate that the new offset is within reasonable bounds + validateNewOffsetBounds(partition, newOffset, beginningOffset, endOffset); + + // Update tracking information + lastSeenOffsets.put(partition, newOffset); + topicGenerations.put(partition, beginningOffset); + + // Update partition metadata for continuous monitoring + partitionMetadata.put(partition, new PartitionMetadata( + beginningOffset, endOffset, currentTime, isStartupScenario)); + } + + /** + * Enhanced offset gap detection with edge case handling. + */ + private void checkOffsetGapWithEdgeCases(TopicPartition partition, long oldOffset, long newOffset, + long beginningOffset, long endOffset, boolean isStartupScenario) { + // During startup, be more lenient with offset validation + if (isStartupScenario) { + log.debug("Startup scenario for partition {}, applying lenient validation", partition); + + // Only fail on extreme cases during startup + if (oldOffset < beginningOffset - MAX_OFFSET_GAP_THRESHOLD) { + log.warn("Large offset gap detected during startup for partition {}. " + + "Previous offset {} is significantly before beginning offset {}", + partition, oldOffset, beginningOffset); + // Don't throw exception during startup - just warn + return; + } + } + + // If old offset is before beginning offset, check if it's due to retention or recreation + if (oldOffset < beginningOffset) { + long gap = beginningOffset - oldOffset; + + // For small gaps, might be normal retention + if (gap <= MAX_OFFSET_GAP_THRESHOLD) { + log.warn("Small offset gap detected for partition {} (gap: {}). 
" + + "Likely due to normal retention policy", partition, gap); + return; + } + + String details = String.format("Previous offset %d is before beginning offset %d (gap: %d)", + oldOffset, beginningOffset, gap); + log.error("Data loss detected due to large offset gap for partition {}: {}", partition, details); + throw new DataLossException( + "Data loss detected: large offset gap indicates missing data", + Set.of(partition), + DataLossException.DataLossType.OFFSET_GAP, + details + ); + } + + // If old offset is beyond end offset, something is wrong + if (oldOffset > endOffset) { + String details = String.format("Previous offset %d is beyond end offset %d", oldOffset, endOffset); + log.error("Data loss detected due to offset beyond end for partition {}: {}", partition, details); + throw new DataLossException( + "Data loss detected: previous offset beyond end offset", + Set.of(partition), + DataLossException.DataLossType.OUT_OF_RANGE, + details + ); + } + + // Check for significant gaps in normal operation + if (newOffset > oldOffset + MAX_OFFSET_GAP_THRESHOLD) { + String details = String.format("Large offset jump from %d to %d (gap: %d)", + oldOffset, newOffset, newOffset - oldOffset); + log.warn("Large offset jump detected for partition {}: {}", partition, details); + // This might be normal during catch-up, so just warn + } + } + + /** + * Enhanced topic recreation detection with false positive mitigation. + */ + private void checkTopicRecreationWithValidation(TopicPartition partition, long beginningOffset, long currentTime) { + Long previousGeneration = topicGenerations.get(partition); + + if (previousGeneration != null) { + // If beginning offset reset to 0 or jumped significantly, might be recreation + boolean suspectedRecreation = (beginningOffset == 0 && previousGeneration > 0) || + (beginningOffset > previousGeneration + MAX_OFFSET_GAP_THRESHOLD); + + if (suspectedRecreation) { + // Apply grace period to reduce false positives + Long lastValidated = lastValidatedTimestamp.get(partition); + if (lastValidated != null && (currentTime - lastValidated) < TOPIC_RECREATION_GRACE_PERIOD_MS) { + log.debug("Suspected topic recreation for {} within grace period, ignoring", partition); + return; + } + + String details = String.format("Beginning offset changed from %d to %d", + previousGeneration, beginningOffset); + log.error("Topic recreation detected for partition {}: {}", partition, details); + throw new DataLossException( + "Data loss detected: topic appears to have been recreated", + Set.of(partition), + DataLossException.DataLossType.TOPIC_RECREATION, + details + ); + } + } + } + + /** + * Validate that the new offset is within reasonable bounds. 
+ */ + private void validateNewOffsetBounds(TopicPartition partition, long newOffset, + long beginningOffset, long endOffset) { + if (newOffset < beginningOffset) { + String details = String.format("New offset %d is before beginning offset %d", + newOffset, beginningOffset); + log.error("Invalid new offset for partition {}: {}", partition, details); + throw new DataLossException( + "Data loss detected: new offset is out of range", + Set.of(partition), + DataLossException.DataLossType.OUT_OF_RANGE, + details + ); + } + + if (newOffset > endOffset) { + String details = String.format("New offset %d is beyond end offset %d", + newOffset, endOffset); + log.error("Invalid new offset for partition {}: {}", partition, details); + throw new DataLossException( + "Data loss detected: new offset beyond available data", + Set.of(partition), + DataLossException.DataLossType.OUT_OF_RANGE, + details + ); + } + } + + /** + * Update the last seen offset for a partition. + */ + public void updateLastSeenOffset(TopicPartition partition, long offset) { + lastSeenOffsets.put(partition, offset); + } + + /** + * Validates that the consumer's committed offset is still valid. + */ + public void validateCommittedOffset(TopicPartition partition, OffsetAndMetadata offsetAndMetadata, + long beginningOffset, long endOffset) { + if (offsetAndMetadata == null) { + return; // No committed offset to validate + } + + long committedOffset = offsetAndMetadata.offset(); + + if (committedOffset < beginningOffset || committedOffset > endOffset) { + String details = String.format("Committed offset %d is outside valid range [%d, %d]", + committedOffset, beginningOffset, endOffset); + log.error("Data loss detected due to invalid committed offset for partition {}: {}", partition, details); + throw new DataLossException( + "Data loss detected: committed offset is out of range", + Set.of(partition), + DataLossException.DataLossType.OUT_OF_RANGE, + details + ); + } + } + + /** + * Clears tracking information for a partition (e.g., when it's unassigned). + */ + public void clearPartition(TopicPartition partition) { + lastSeenOffsets.remove(partition); + topicGenerations.remove(partition); + log.debug("Cleared tracking information for partition {}", partition); + } + + /** + * Gets the last seen offset for a partition. 
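+ * Returns {@code null} if the partition has never been tracked or was cleared via {@link #clearPartition(TopicPartition)}.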
+ */ + public Long getLastSeenOffset(TopicPartition partition) { + return lastSeenOffsets.get(partition); + } +} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java index bb01510e906be..c1236b83e3a99 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java @@ -80,7 +80,8 @@ public OffsetFetcher(LogContext logContext, long retryBackoffMs, int requestTimeoutMs, IsolationLevel isolationLevel, - ApiVersions apiVersions) { + ApiVersions apiVersions, + boolean enableDataLossDetection) { this.log = logContext.logger(getClass()); this.time = time; this.client = client; @@ -91,7 +92,7 @@ public OffsetFetcher(LogContext logContext, this.apiVersions = apiVersions; this.offsetsForLeaderEpochClient = new OffsetsForLeaderEpochClient(client, logContext); this.offsetFetcherUtils = new OffsetFetcherUtils(logContext, metadata, subscriptions, - time, retryBackoffMs, apiVersions); + time, retryBackoffMs, apiVersions, enableDataLossDetection); } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java index 0b7813eaad6b7..c56127bb23a91 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java @@ -63,6 +63,8 @@ class OffsetFetcherUtils { private final long retryBackoffMs; private final ApiVersions apiVersions; private final Logger log; + private final DataLossDetector dataLossDetector; + private final boolean enableDataLossDetection; /** * Exception that occurred while validating positions, that will be propagated on the next @@ -84,13 +86,16 @@ class OffsetFetcherUtils { SubscriptionState subscriptionState, Time time, long retryBackoffMs, - ApiVersions apiVersions) { + ApiVersions apiVersions, + boolean enableDataLossDetection) { this.log = logContext.logger(getClass()); this.metadata = metadata; this.subscriptionState = subscriptionState; this.time = time; this.retryBackoffMs = retryBackoffMs; this.apiVersions = apiVersions; + this.enableDataLossDetection = enableDataLossDetection; + this.dataLossDetector = enableDataLossDetection ? 
new DataLossDetector(logContext) : null; } /** @@ -385,6 +390,11 @@ private LogTruncationException buildLogTruncationException(List> regroupFetchPositionsByLeader( Map partitionMap) { return partitionMap.entrySet() diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java index 4c8d10ad323ac..7aa168f2e2358 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java @@ -119,7 +119,8 @@ public OffsetsRequestManager(final SubscriptionState subscriptionState, final ApiVersions apiVersions, final NetworkClientDelegate networkClientDelegate, final CommitRequestManager commitRequestManager, - final LogContext logContext) { + final LogContext logContext, + final boolean enableDataLossDetection) { requireNonNull(subscriptionState); requireNonNull(metadata); requireNonNull(isolationLevel); @@ -140,7 +141,7 @@ public OffsetsRequestManager(final SubscriptionState subscriptionState, this.apiVersions = apiVersions; this.networkClientDelegate = networkClientDelegate; this.offsetFetcherUtils = new OffsetFetcherUtils(logContext, metadata, subscriptionState, - time, retryBackoffMs, apiVersions); + time, retryBackoffMs, apiVersions, enableDataLossDetection); // Register the cluster metadata update callback. Note this only relies on the // requestsToRetry initialized above, and won't be invoked until all managers are // initialized and the network thread started. diff --git a/clients/src/main/java/org/apache/kafka/common/errors/DataLossException.java b/clients/src/main/java/org/apache/kafka/common/errors/DataLossException.java new file mode 100644 index 0000000000000..1050251189a84 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/DataLossException.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +import org.apache.kafka.common.TopicPartition; + +import java.util.Set; + +/** + * Exception thrown when data loss is detected by the consumer. + * This can occur due to offset gaps, topic deletion/recreation, or other scenarios + * where message continuity cannot be guaranteed. 
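+ * The affected partitions, the classified {@link DataLossType}, and an optional detail string are exposed via
+ * {@link #partitions()}, {@link #lossType()} and {@link #details()}.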
+ */ +public class DataLossException extends ApiException { + + private static final long serialVersionUID = 1L; + + private final Set partitions; + private final DataLossType lossType; + private final String details; + + public enum DataLossType { + OFFSET_GAP("Offset gap detected"), + TOPIC_RECREATION("Topic deletion/recreation detected"), + OUT_OF_RANGE("Offset out of range"), + UNKNOWN("Unknown data loss scenario"); + + private final String description; + + DataLossType(String description) { + this.description = description; + } + + public String getDescription() { + return description; + } + } + + public DataLossException(String message) { + super(message); + this.partitions = Set.of(); + this.lossType = DataLossType.UNKNOWN; + this.details = ""; + } + + public DataLossException(String message, Set partitions, DataLossType lossType) { + super(message); + this.partitions = partitions; + this.lossType = lossType; + this.details = ""; + } + + public DataLossException(String message, Set partitions, DataLossType lossType, String details) { + super(message); + this.partitions = partitions; + this.lossType = lossType; + this.details = details; + } + + public DataLossException(String message, Throwable cause, Set partitions, DataLossType lossType) { + super(message, cause); + this.partitions = partitions; + this.lossType = lossType; + this.details = ""; + } + + /** + * @return The set of topic partitions affected by the data loss + */ + public Set partitions() { + return partitions; + } + + /** + * @return The type of data loss detected + */ + public DataLossType lossType() { + return lossType; + } + + /** + * @return Additional details about the data loss scenario + */ + public String details() { + return details; + } +} \ No newline at end of file diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DataLossDetectorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DataLossDetectorTest.java new file mode 100644 index 0000000000000..44bac05b0e368 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DataLossDetectorTest.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.DataLossException; +import org.apache.kafka.common.utils.LogContext; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Set; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class DataLossDetectorTest { + + private DataLossDetector dataLossDetector; + private TopicPartition topicPartition; + + @BeforeEach + public void setUp() { + LogContext logContext = new LogContext(); + dataLossDetector = new DataLossDetector(logContext); + topicPartition = new TopicPartition("test-topic", 0); + } + + @Test + public void testNoDataLossWithConsecutiveOffsets() { + // Should not throw exception for consecutive offsets + assertDoesNotThrow(() -> { + dataLossDetector.checkForDataLoss(topicPartition, 10L, 11L, 0L, 20L); + }); + } + + @Test + public void testDataLossDetectionWithOffsetGap() { + // Should throw exception for offset gap > 1 + DataLossException exception = assertThrows(DataLossException.class, () -> { + dataLossDetector.checkForDataLoss(topicPartition, 10L, 15L, 0L, 20L); + }); + + assertEquals(DataLossException.DataLossType.OFFSET_GAP, exception.lossType()); + assertEquals(Set.of(topicPartition), exception.partitions()); + } + + @Test + public void testDataLossDetectionWithOffsetOutOfRange() { + // Should throw exception when old offset is before beginning offset + DataLossException exception = assertThrows(DataLossException.class, () -> { + dataLossDetector.checkForDataLoss(topicPartition, 5L, 10L, 8L, 20L); + }); + + assertEquals(DataLossException.DataLossType.OUT_OF_RANGE, exception.lossType()); + assertEquals(Set.of(topicPartition), exception.partitions()); + } + + @Test + public void testDataLossDetectionWithOffsetBeyondEnd() { + // Should throw exception when old offset is beyond end offset + DataLossException exception = assertThrows(DataLossException.class, () -> { + dataLossDetector.checkForDataLoss(topicPartition, 25L, 10L, 0L, 20L); + }); + + assertEquals(DataLossException.DataLossType.OUT_OF_RANGE, exception.lossType()); + assertEquals(Set.of(topicPartition), exception.partitions()); + } + + @Test + public void testTopicRecreationDetection() { + // First call to establish baseline + dataLossDetector.checkForDataLoss(topicPartition, null, 10L, 5L, 20L); + + // Second call with higher beginning offset (indicates topic recreation) + DataLossException exception = assertThrows(DataLossException.class, () -> { + dataLossDetector.checkForDataLoss(topicPartition, 10L, 15L, 10L, 25L); + }); + + assertEquals(DataLossException.DataLossType.TOPIC_RECREATION, exception.lossType()); + assertEquals(Set.of(topicPartition), exception.partitions()); + } + + @Test + public void testClearPartition() { + // Establish tracking for partition + dataLossDetector.checkForDataLoss(topicPartition, null, 10L, 0L, 20L); + assertEquals(Long.valueOf(10L), dataLossDetector.getLastSeenOffset(topicPartition)); + + // Clear partition tracking + dataLossDetector.clearPartition(topicPartition); + assertEquals(null, dataLossDetector.getLastSeenOffset(topicPartition)); + } + + @Test + public void testValidateCommittedOffsetSuccess() { + // Should not throw for valid committed offset + assertDoesNotThrow(() -> { + dataLossDetector.validateCommittedOffset(topicPartition, + new 
org.apache.kafka.clients.consumer.OffsetAndMetadata(10L), 5L, 20L); + }); + } + + @Test + public void testValidateCommittedOffsetOutOfRange() { + // Should throw for committed offset out of range + DataLossException exception = assertThrows(DataLossException.class, () -> { + dataLossDetector.validateCommittedOffset(topicPartition, + new org.apache.kafka.clients.consumer.OffsetAndMetadata(3L), 5L, 20L); + }); + + assertEquals(DataLossException.DataLossType.OUT_OF_RANGE, exception.lossType()); + assertEquals(Set.of(topicPartition), exception.partitions()); + } + + @Test + public void testValidateCommittedOffsetNull() { + // Should not throw for null committed offset + assertDoesNotThrow(() -> { + dataLossDetector.validateCommittedOffset(topicPartition, null, 5L, 20L); + }); + } +} \ No newline at end of file diff --git a/docs/data-loss-detection.md b/docs/data-loss-detection.md new file mode 100644 index 0000000000000..b4a1052102d61 --- /dev/null +++ b/docs/data-loss-detection.md @@ -0,0 +1,300 @@ +# Enhanced Kafka Consumer Data Loss Detection + +## Overview + +This enhancement adds configurable data loss detection to Kafka consumers to prevent silent data loss scenarios. The feature integrates with existing `auto.offset.reset` strategies to provide enhanced fault tolerance capabilities, including detection of silent data loss from retention policies, service disruptions from topic recreation, and sophisticated edge case handling. + +## Key Capabilities + +### 1. Silent Data Loss Detection +- **Continuous Monitoring**: Detects data loss between normal poll operations +- **Retention Policy Detection**: Identifies when retention purges data before replication completes +- **Configurable Validation Intervals**: Periodic checks during normal consumption + +### 2. Service Disruption Handling +- **Topic Recreation Detection**: Handles planned maintenance operations involving topic reset +- **Graceful Recovery**: Configurable behavior for "unable to find expected offset" scenarios +- **False Positive Mitigation**: Grace periods to avoid alerts during legitimate operations + +### 3. 
Edge Case Management
- **Startup Leniency**: More tolerant validation during consumer initialization
- **False Positive Reduction**: Sophisticated algorithms to distinguish real data loss from normal operations
- **Race Condition Handling**: Robust detection during broker failover scenarios

## Configuration

### Primary Configuration

- **Property**: `enable.data.loss.detection`
- **Type**: boolean
- **Default**: `false`
- **Importance**: MEDIUM

When enabled, the consumer will detect scenarios that could lead to data loss and react according to the configured `auto.offset.reset` strategy.

### Advanced Tuning Options

- **`data.loss.detection.gap.threshold`** (default: 1000)
  - Maximum allowed offset gap before considering it data loss
  - Smaller values = stricter detection, more false positives
  - Larger values = more lenient, may miss some data loss

- **`data.loss.detection.validation.interval.ms`** (default: 30000)
  - Interval between continuous validation checks during normal consumption
  - Helps detect silent data loss from retention policies

- **`data.loss.detection.grace.period.ms`** (default: 5000)
  - Grace period to avoid false positives during topic recreation
  - Events within this period are logged as warnings instead of exceptions

### Example Configuration

```properties
# Enable enhanced data loss detection
enable.data.loss.detection=true

# Configure behavior when data loss is detected
auto.offset.reset=none                            # Fail fast on any data loss scenario

# Fine-tune detection sensitivity (optional)
data.loss.detection.gap.threshold=500             # Stricter gap detection
data.loss.detection.validation.interval.ms=15000  # More frequent validation
data.loss.detection.grace.period.ms=10000         # Longer grace period for maintenance
```

## **Design Decision Analysis**

### **Why create new configurations instead of using existing Kafka mechanisms?**

**ANSWER**: Kafka already provides:

```properties
# Existing mechanisms that handle our requirements:
auto.offset.reset=none            # Already fails fast
check.crcs=true                   # Already validates data integrity
isolation.level=read_committed    # Already provides consistency
```

**Better approach**: Enhance existing error messages and logging instead of adding configuration proliferation.

### **Why create DataLossException instead of using existing exceptions?**

**ANSWER**: Kafka already has:

- `NoOffsetForPartitionException` - For missing offsets (`auto.offset.reset=none`)
- `OffsetOutOfRangeException` - For out-of-range scenarios
- `TopicAuthorizationException` - For authorization failures

**Better approach**: Enhance existing exception messages with more detailed context.

## **Improved Implementation Approach**

Instead of new classes, we should have:

1. **Enhanced existing exception messages** with detailed context
2. **Improved logging** with structured information
3. **Leveraged `auto.offset.reset=none`** for fail-fast behavior
4. **Extended `OffsetOutOfRangeException`** with gap detection details
```java
// Better approach - enhance existing exceptions
catch (NoOffsetForPartitionException e) {
    // Enhanced with data loss context
    log.error("Potential data loss detected: {}", enhancedMessage);
    throw e; // Re-throw existing exception
}
```

This follows the **Principle of Least Surprise** and avoids API proliferation.

## Problem Scenarios Addressed

### Silent Data Loss from Retention Policies

**Problem**: Kafka retention policies may purge data from source topics before replication completes, creating undetectable gaps in the replicated data stream.

**Solution**:
- **Continuous monitoring** during normal consumption via `validateContinuousDataIntegrity()`
- **Periodic validation** every 30 seconds (configurable) to detect retention-based data loss
- **Baseline tracking** of beginning offsets to detect when data is purged
- **Configurable gap thresholds** to distinguish normal retention from significant data loss

```java
// Automatic detection during normal consumption
consumer.poll(Duration.ofMillis(100)); // Triggers periodic validation internally
```

### Service Disruption from Topic Recreation ✅

**Problem**: Planned maintenance operations involving topic deletion and recreation can cause replication services to be unable to find expected offsets and stop replication.

**Solution**:
- **Topic recreation detection** via beginning offset monitoring
- **Graceful recovery** with configurable grace periods during maintenance windows
- **Service continuity** - depending on strategy, either fail-fast or attempt recovery
- **Enhanced logging** to distinguish planned maintenance from actual data loss

```properties
# For replication services - be more tolerant during maintenance
data.loss.detection.grace.period.ms=30000   # 30 second grace period
auto.offset.reset=earliest                  # Recover from earliest available
```

### Edge Case Scenarios ✅

**Problem**: False positives during topic reset detection and potential missed truncations at startup.

**Solutions**:

1. **False Positive Mitigation**:
   - Grace periods after consumer initialization
   - Lenient validation during startup scenarios
   - Configurable thresholds to tune sensitivity
   - Time-based validation to avoid broker failover false positives

2. **Startup Edge Cases**:
   - More tolerant offset validation during first connection
   - Baseline establishment before strict monitoring
   - Differentiation between startup vs. runtime scenarios

3. **Missed Truncation Detection**:
   - Continuous monitoring of beginning/end offset ranges
   - Detection of offset range shrinkage indicating truncation
   - Validation of consumer position against current offset ranges

```java
// Example of enhanced edge case handling
if (isStartupScenario) {
    // Apply lenient validation - only fail on extreme cases
    if (offsetGap > MAX_THRESHOLD * 10) {
        log.warn("Large gap during startup - investigation recommended");
        return; // Don't fail during startup
    }
}
```

## Data Loss Detection Scenarios

The enhanced consumer detects the following data loss scenarios:

### 1. Offset Gaps
- **Description**: When the consumer's last committed offset falls well behind the earliest offset still available on the broker
- **Detection**: Compares committed offset with broker's beginning and end offsets
- **Impact**: Indicates potential message loss due to log truncation or deletion
### 2. Topic Recreation
- **Description**: When a topic is deleted and recreated with the same name
- **Detection**: Monitors topic metadata changes and offset ranges
- **Impact**: All previous messages are lost when the topic is recreated

### 3. Out-of-Range Offsets
- **Description**: When the committed offset is outside the available offset range
- **Detection**: Validates the committed offset against the broker's offset boundaries
- **Impact**: Indicates data has been aged out or truncated

## Behavior by Strategy

### NONE Strategy (`auto.offset.reset=none`)
When data loss is detected:
- **Action**: Throws `DataLossException` immediately
- **Behavior**: Fail-fast with detailed error information
- **Use Case**: Applications requiring strict data consistency guarantees

### EARLIEST Strategy (`auto.offset.reset=earliest`)
When data loss is detected:
- **Action**: Logs detailed warning and resets to earliest available offset
- **Behavior**: Attempts recovery but may result in duplicate processing
- **Use Case**: Applications that can handle duplicates but need to continue processing

### LATEST Strategy (`auto.offset.reset=latest`)
When data loss is detected:
- **Action**: Logs detailed warning and resets to latest available offset
- **Behavior**: Skips missing data and continues from current position
- **Use Case**: Applications where recent data is more important than completeness

## Exception Handling

### DataLossException

New exception class that provides detailed information about detected data loss:

```java
public class DataLossException extends ApiException {
    public enum DataLossType {
        OFFSET_GAP,        // Gap between committed and available offsets
        TOPIC_RECREATION,  // Topic was deleted and recreated
        OUT_OF_RANGE,      // Offset outside available range
        UNKNOWN            // Unclassified data loss scenario
    }

    // Exception provides partition, offset details, and loss type
}
```

## Integration Points

### Consumer Creation
Data loss detection is configured per consumer instance:

```java
Properties props = new Properties();
props.put(ConsumerConfig.ENABLE_DATA_LOSS_DETECTION_CONFIG, true);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
```

## Best Practices

### Production Deployments
1. **Enable for Critical Applications**: Use with `auto.offset.reset=none` for applications requiring data consistency
2. **Monitor Logs**: Set up alerting on DataLossException occurrences
3. **Test Recovery Scenarios**: Validate application behavior under different data loss conditions

### Development and Testing
1. **Simulate Data Loss**: Test with controlled topic deletion/recreation scenarios
2. **Validate Exception Handling**: Ensure proper error handling for DataLossException (see the sketch below)
3. **Performance Testing**: Verify minimal impact on consumer performance
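### Handling DataLossException (sketch)

The snippet below is an illustrative sketch, not part of the patch itself: it shows how an application running with `auto.offset.reset=none` might surface the context carried by `DataLossException` before failing fast. The `process` and `alerting.notifyDataLoss` calls, the `running` flag, and the `log` instance are hypothetical application-side names.

```java
try {
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            process(record); // hypothetical application handler
        }
    }
} catch (DataLossException e) {
    // Surface the classified loss type, the affected partitions and the detail string for alerting
    log.error("Stopping consumption: {} on partitions {} ({})",
            e.lossType().getDescription(), e.partitions(), e.details());
    alerting.notifyDataLoss(e); // hypothetical alerting hook
    throw e; // fail fast, matching the NONE strategy semantics
}
```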
## Migration Guide

### Configuration Changes
```properties
# Before: Basic auto.offset.reset
auto.offset.reset=latest

# After: Enhanced with data loss detection
auto.offset.reset=none
enable.data.loss.detection=true
```

## Performance Considerations

- **Minimal Overhead**: Detection runs during offset reset handling and, when enabled, only at the configured validation interval during normal consumption
- **Network Impact**: No additional broker requests required
- **Memory Usage**: Negligible memory overhead for tracking partition state

## Testing

The implementation includes comprehensive unit tests covering:
- Offset gap detection scenarios
- Topic recreation detection
- Out-of-range offset validation
- Error handling and recovery
- Edge cases and boundary conditions

## Future Enhancements

Potential future improvements:
- Real-time data loss detection during normal consumption
- Configurable tolerance levels for offset gaps
- Integration with monitoring systems
- Automatic recovery strategies for different loss types
\ No newline at end of file