diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index ee62dc9561b41..333cd883be48d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -180,6 +180,39 @@ public class ConsumerConfig extends AbstractConfig {
"
Note that altering partition numbers while setting this config to latest may cause message delivery loss since " +
"producers could start to send messages to newly added partitions (i.e. no initial offsets exist yet) before consumers reset their offsets.";
+ /**
+ * enable.data.loss.detection
+ */
+ public static final String ENABLE_DATA_LOSS_DETECTION_CONFIG = "enable.data.loss.detection";
+ public static final String ENABLE_DATA_LOSS_DETECTION_DOC = "Enables data loss detection for the consumer. When enabled, the consumer will detect and handle scenarios " +
+ "that could lead to data loss, including offset gaps, topic recreation, and out-of-range offsets. The behavior when data loss is detected depends on the " +
+ "auto.offset.reset strategy: 'none' will throw a DataLossException, while 'earliest' and 'latest' will log warnings and attempt recovery.";
+ public static final boolean DEFAULT_ENABLE_DATA_LOSS_DETECTION = false;
+
+ /**
+ * data.loss.detection.gap.threshold
+ */
+ public static final String DATA_LOSS_DETECTION_GAP_THRESHOLD_CONFIG = "data.loss.detection.gap.threshold";
+ public static final String DATA_LOSS_DETECTION_GAP_THRESHOLD_DOC = "The maximum allowed offset gap before considering it a potential data loss scenario. " +
+ "Smaller values provide stricter detection but may cause false positives during normal retention. Larger values are more lenient but may miss actual data loss.";
+ public static final long DEFAULT_DATA_LOSS_DETECTION_GAP_THRESHOLD = 1000L;
+
+ /**
+ * data.loss.detection.validation.interval.ms
+ */
+ public static final String DATA_LOSS_DETECTION_VALIDATION_INTERVAL_MS_CONFIG = "data.loss.detection.validation.interval.ms";
+ public static final String DATA_LOSS_DETECTION_VALIDATION_INTERVAL_MS_DOC = "The interval in milliseconds between continuous data loss validation checks during normal consumption. " +
+ "This helps detect silent data loss due to retention policies between normal poll operations.";
+ public static final long DEFAULT_DATA_LOSS_DETECTION_VALIDATION_INTERVAL_MS = 30000L; // 30 seconds
+
+ /**
+ * data.loss.detection.grace.period.ms
+ */
+ public static final String DATA_LOSS_DETECTION_GRACE_PERIOD_MS_CONFIG = "data.loss.detection.grace.period.ms";
+ public static final String DATA_LOSS_DETECTION_GRACE_PERIOD_MS_DOC = "Grace period in milliseconds to avoid false positives during topic recreation or broker maintenance. " +
+ "Suspected data loss events within this period after initialization will be logged as warnings instead of throwing exceptions.";
+ public static final long DEFAULT_DATA_LOSS_DETECTION_GRACE_PERIOD_MS = 5000L; // 5 seconds
+
/**
* fetch.min.bytes
*/
@@ -546,6 +579,29 @@ public class ConsumerConfig extends AbstractConfig {
new AutoOffsetResetStrategy.Validator(),
Importance.MEDIUM,
AUTO_OFFSET_RESET_DOC)
+ .define(ENABLE_DATA_LOSS_DETECTION_CONFIG,
+ Type.BOOLEAN,
+ DEFAULT_ENABLE_DATA_LOSS_DETECTION,
+ Importance.MEDIUM,
+ ENABLE_DATA_LOSS_DETECTION_DOC)
+ .define(DATA_LOSS_DETECTION_GAP_THRESHOLD_CONFIG,
+ Type.LONG,
+ DEFAULT_DATA_LOSS_DETECTION_GAP_THRESHOLD,
+ atLeast(1),
+ Importance.LOW,
+ DATA_LOSS_DETECTION_GAP_THRESHOLD_DOC)
+ .define(DATA_LOSS_DETECTION_VALIDATION_INTERVAL_MS_CONFIG,
+ Type.LONG,
+ DEFAULT_DATA_LOSS_DETECTION_VALIDATION_INTERVAL_MS,
+ atLeast(1000),
+ Importance.LOW,
+ DATA_LOSS_DETECTION_VALIDATION_INTERVAL_MS_DOC)
+ .define(DATA_LOSS_DETECTION_GRACE_PERIOD_MS_CONFIG,
+ Type.LONG,
+ DEFAULT_DATA_LOSS_DETECTION_GRACE_PERIOD_MS,
+ atLeast(0),
+ Importance.LOW,
+ DATA_LOSS_DETECTION_GRACE_PERIOD_MS_DOC)
.define(CHECK_CRCS_CONFIG,
Type.BOOLEAN,
true,
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DataLossDetector.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DataLossDetector.java
new file mode 100644
index 0000000000000..d8cf932e4db67
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DataLossDetector.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.DataLossException;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Utility class for detecting data loss scenarios in Kafka consumers.
+ * This class implements various data loss detection mechanisms including:
+ * - Offset gap detection
+ * - Topic recreation detection
+ * - Out-of-range offset detection
+ * - Silent data loss from retention policies
+ * - False positive mitigation
+ */
+public class DataLossDetector {
+
+ private final Logger log;
+ private final Map<TopicPartition, Long> lastSeenOffsets = new HashMap<>();
+ private final Map<TopicPartition, Long> topicGenerations = new HashMap<>();
+ private final Map<TopicPartition, Long> lastValidatedTimestamp = new HashMap<>();
+ private final Map<TopicPartition, PartitionMetadata> partitionMetadata = new HashMap<>();
+
+ // Detection sensitivity defaults (these mirror the data.loss.detection.* config defaults)
+ private static final long VALIDATION_INTERVAL_MS = 30000; // 30 seconds
+ private static final long MAX_OFFSET_GAP_THRESHOLD = 1000; // maximum tolerated offset gap
+ private static final long TOPIC_RECREATION_GRACE_PERIOD_MS = 5000; // 5 seconds grace period
+
+ public DataLossDetector(LogContext logContext) {
+ this.log = logContext.logger(DataLossDetector.class);
+ }
+
+ /**
+ * Enhanced metadata tracking for partitions
+ */
+ private static class PartitionMetadata {
+ final long firstKnownOffset;
+ final long lastKnownEndOffset;
+ final long creationTimestamp;
+ final boolean isNewTopic;
+
+ PartitionMetadata(long firstKnownOffset, long lastKnownEndOffset, long creationTimestamp, boolean isNewTopic) {
+ this.firstKnownOffset = firstKnownOffset;
+ this.lastKnownEndOffset = lastKnownEndOffset;
+ this.creationTimestamp = creationTimestamp;
+ this.isNewTopic = isNewTopic;
+ }
+ }
+
+ /**
+ * Continuous validation check for silent data loss during normal operation.
+ * This should be called periodically during normal consumption.
+ *
+ * @param partition The topic partition to validate
+ * @param currentOffset The current consumer position
+ * @param beginningOffset Current beginning offset from broker
+ * @param endOffset Current end offset from broker
+ */
+ public void validateContinuousDataIntegrity(TopicPartition partition, long currentOffset,
+ long beginningOffset, long endOffset) {
+ long currentTime = System.currentTimeMillis();
+ Long lastValidated = lastValidatedTimestamp.get(partition);
+
+ // Only validate periodically to avoid performance impact
+ if (lastValidated != null && (currentTime - lastValidated) < VALIDATION_INTERVAL_MS) {
+ return;
+ }
+
+ lastValidatedTimestamp.put(partition, currentTime);
+
+ // Check for silent data loss due to retention
+ PartitionMetadata metadata = partitionMetadata.get(partition);
+ if (metadata != null) {
+ // Detect if beginning offset jumped significantly (retention purged data)
+ if (beginningOffset > metadata.firstKnownOffset + MAX_OFFSET_GAP_THRESHOLD) {
+ log.warn("Potential silent data loss detected in partition {} due to retention. " +
+ "Beginning offset jumped from {} to {}, gap: {}",
+ partition, metadata.firstKnownOffset, beginningOffset,
+ beginningOffset - metadata.firstKnownOffset);
+
+ // Update metadata with new baseline
+ partitionMetadata.put(partition, new PartitionMetadata(
+ beginningOffset, endOffset, currentTime, false));
+ }
+
+ // Detect if end offset went backwards (topic truncation/recreation)
+ if (endOffset < metadata.lastKnownEndOffset) {
+ handleSuspectedTopicRecreation(partition, metadata, beginningOffset, endOffset, currentTime);
+ }
+ } else {
+ // First time seeing this partition - establish baseline
+ partitionMetadata.put(partition, new PartitionMetadata(
+ beginningOffset, endOffset, currentTime, true));
+ }
+
+ updateLastSeenOffset(partition, currentOffset);
+ }
+
+ /**
+ * Handle suspected topic recreation with false positive mitigation
+ */
+ private void handleSuspectedTopicRecreation(TopicPartition partition, PartitionMetadata metadata,
+ long beginningOffset, long endOffset, long currentTime) {
+ // Apply grace period to avoid false positives during normal operations
+ if (currentTime - metadata.creationTimestamp < TOPIC_RECREATION_GRACE_PERIOD_MS) {
+ log.debug("Ignoring suspected topic recreation for {} within grace period", partition);
+ return;
+ }
+
+ // Detect topic recreation patterns
+ boolean likelyRecreation = (beginningOffset == 0 && endOffset < metadata.lastKnownEndOffset) ||
+ (beginningOffset > metadata.firstKnownOffset);
+
+ if (likelyRecreation) {
+ log.warn("Topic recreation detected for partition {}. " +
+ "Previous range: [{}, {}], Current range: [{}, {}]",
+ partition, metadata.firstKnownOffset, metadata.lastKnownEndOffset,
+ beginningOffset, endOffset);
+
+ // Update metadata for new topic generation
+ partitionMetadata.put(partition, new PartitionMetadata(
+ beginningOffset, endOffset, currentTime, true));
+
+ // Clear old tracking data
+ lastSeenOffsets.remove(partition);
+ topicGenerations.put(partition, currentTime);
+ }
+ }
+
+ /**
+ * Enhanced data loss detection with edge case handling.
+ *
+ * @param partition The topic partition being reset
+ * @param oldOffset The previous offset (if any)
+ * @param newOffset The new offset being set
+ * @param beginningOffset The earliest available offset for the partition
+ * @param endOffset The latest available offset for the partition
+ * @throws DataLossException if data loss is detected
+ */
+ public void checkForDataLoss(TopicPartition partition, Long oldOffset, long newOffset,
+ long beginningOffset, long endOffset) {
+ log.debug("Checking for data loss in partition {}: oldOffset={}, newOffset={}, beginningOffset={}, endOffset={}",
+ partition, oldOffset, newOffset, beginningOffset, endOffset);
+
+ long currentTime = System.currentTimeMillis();
+
+ // Check for startup edge cases - be more lenient on first connection
+ boolean isStartupScenario = !lastSeenOffsets.containsKey(partition);
+
+ // Check for offset gap if we have a previous offset
+ if (oldOffset != null) {
+ checkOffsetGapWithEdgeCases(partition, oldOffset, newOffset, beginningOffset, endOffset, isStartupScenario);
+ }
+
+ // Check for topic recreation with false positive mitigation
+ checkTopicRecreationWithValidation(partition, beginningOffset, currentTime);
+
+ // Validate that the new offset is within reasonable bounds
+ validateNewOffsetBounds(partition, newOffset, beginningOffset, endOffset);
+
+ // Update tracking information
+ lastSeenOffsets.put(partition, newOffset);
+ topicGenerations.put(partition, beginningOffset);
+
+ // Update partition metadata for continuous monitoring
+ partitionMetadata.put(partition, new PartitionMetadata(
+ beginningOffset, endOffset, currentTime, isStartupScenario));
+ }
+
+ /**
+ * Enhanced offset gap detection with edge case handling.
+ */
+ private void checkOffsetGapWithEdgeCases(TopicPartition partition, long oldOffset, long newOffset,
+ long beginningOffset, long endOffset, boolean isStartupScenario) {
+ // During startup, be more lenient with offset validation
+ if (isStartupScenario) {
+ log.debug("Startup scenario for partition {}, applying lenient validation", partition);
+
+ // Only fail on extreme cases during startup
+ if (oldOffset < beginningOffset - MAX_OFFSET_GAP_THRESHOLD) {
+ log.warn("Large offset gap detected during startup for partition {}. " +
+ "Previous offset {} is significantly before beginning offset {}",
+ partition, oldOffset, beginningOffset);
+ // Don't throw exception during startup - just warn
+ return;
+ }
+ }
+
+ // If old offset is before beginning offset, check if it's due to retention or recreation
+ if (oldOffset < beginningOffset) {
+ long gap = beginningOffset - oldOffset;
+
+ // For small gaps, might be normal retention
+ if (gap <= MAX_OFFSET_GAP_THRESHOLD) {
+ log.warn("Small offset gap detected for partition {} (gap: {}). " +
+ "Likely due to normal retention policy", partition, gap);
+ return;
+ }
+
+ String details = String.format("Previous offset %d is before beginning offset %d (gap: %d)",
+ oldOffset, beginningOffset, gap);
+ log.error("Data loss detected due to large offset gap for partition {}: {}", partition, details);
+ throw new DataLossException(
+ "Data loss detected: large offset gap indicates missing data",
+ Set.of(partition),
+ DataLossException.DataLossType.OFFSET_GAP,
+ details
+ );
+ }
+
+ // If old offset is beyond end offset, something is wrong
+ if (oldOffset > endOffset) {
+ String details = String.format("Previous offset %d is beyond end offset %d", oldOffset, endOffset);
+ log.error("Data loss detected due to offset beyond end for partition {}: {}", partition, details);
+ throw new DataLossException(
+ "Data loss detected: previous offset beyond end offset",
+ Set.of(partition),
+ DataLossException.DataLossType.OUT_OF_RANGE,
+ details
+ );
+ }
+
+ // Check for significant gaps in normal operation
+ if (newOffset > oldOffset + MAX_OFFSET_GAP_THRESHOLD) {
+ String details = String.format("Large offset jump from %d to %d (gap: %d)",
+ oldOffset, newOffset, newOffset - oldOffset);
+ log.warn("Large offset jump detected for partition {}: {}", partition, details);
+ // This might be normal during catch-up, so just warn
+ }
+ }
+
+ /**
+ * Enhanced topic recreation detection with false positive mitigation.
+ */
+ private void checkTopicRecreationWithValidation(TopicPartition partition, long beginningOffset, long currentTime) {
+ Long previousGeneration = topicGenerations.get(partition);
+
+ if (previousGeneration != null) {
+ // If beginning offset reset to 0 or jumped significantly, might be recreation
+ boolean suspectedRecreation = (beginningOffset == 0 && previousGeneration > 0) ||
+ (beginningOffset > previousGeneration + MAX_OFFSET_GAP_THRESHOLD);
+
+ if (suspectedRecreation) {
+ // Apply grace period to reduce false positives
+ Long lastValidated = lastValidatedTimestamp.get(partition);
+ if (lastValidated != null && (currentTime - lastValidated) < TOPIC_RECREATION_GRACE_PERIOD_MS) {
+ log.debug("Suspected topic recreation for {} within grace period, ignoring", partition);
+ return;
+ }
+
+ String details = String.format("Beginning offset changed from %d to %d",
+ previousGeneration, beginningOffset);
+ log.error("Topic recreation detected for partition {}: {}", partition, details);
+ throw new DataLossException(
+ "Data loss detected: topic appears to have been recreated",
+ Set.of(partition),
+ DataLossException.DataLossType.TOPIC_RECREATION,
+ details
+ );
+ }
+ }
+ }
+
+ /**
+ * Validate that the new offset is within reasonable bounds.
+ */
+ private void validateNewOffsetBounds(TopicPartition partition, long newOffset,
+ long beginningOffset, long endOffset) {
+ if (newOffset < beginningOffset) {
+ String details = String.format("New offset %d is before beginning offset %d",
+ newOffset, beginningOffset);
+ log.error("Invalid new offset for partition {}: {}", partition, details);
+ throw new DataLossException(
+ "Data loss detected: new offset is out of range",
+ Set.of(partition),
+ DataLossException.DataLossType.OUT_OF_RANGE,
+ details
+ );
+ }
+
+ if (newOffset > endOffset) {
+ String details = String.format("New offset %d is beyond end offset %d",
+ newOffset, endOffset);
+ log.error("Invalid new offset for partition {}: {}", partition, details);
+ throw new DataLossException(
+ "Data loss detected: new offset beyond available data",
+ Set.of(partition),
+ DataLossException.DataLossType.OUT_OF_RANGE,
+ details
+ );
+ }
+ }
+
+ /**
+ * Update the last seen offset for a partition.
+ */
+ public void updateLastSeenOffset(TopicPartition partition, long offset) {
+ lastSeenOffsets.put(partition, offset);
+ }
+
+ /**
+ * Validates that the consumer's committed offset is still valid.
+ */
+ public void validateCommittedOffset(TopicPartition partition, OffsetAndMetadata offsetAndMetadata,
+ long beginningOffset, long endOffset) {
+ if (offsetAndMetadata == null) {
+ return; // No committed offset to validate
+ }
+
+ long committedOffset = offsetAndMetadata.offset();
+
+ if (committedOffset < beginningOffset || committedOffset > endOffset) {
+ String details = String.format("Committed offset %d is outside valid range [%d, %d]",
+ committedOffset, beginningOffset, endOffset);
+ log.error("Data loss detected due to invalid committed offset for partition {}: {}", partition, details);
+ throw new DataLossException(
+ "Data loss detected: committed offset is out of range",
+ Set.of(partition),
+ DataLossException.DataLossType.OUT_OF_RANGE,
+ details
+ );
+ }
+ }
+
+ /**
+ * Clears tracking information for a partition (e.g., when it's unassigned).
+ */
+ public void clearPartition(TopicPartition partition) {
+ lastSeenOffsets.remove(partition);
+ topicGenerations.remove(partition);
+ log.debug("Cleared tracking information for partition {}", partition);
+ }
+
+ /**
+ * Gets the last seen offset for a partition.
+ */
+ public Long getLastSeenOffset(TopicPartition partition) {
+ return lastSeenOffsets.get(partition);
+ }
+}
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
index bb01510e906be..c1236b83e3a99 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
@@ -80,7 +80,8 @@ public OffsetFetcher(LogContext logContext,
long retryBackoffMs,
int requestTimeoutMs,
IsolationLevel isolationLevel,
- ApiVersions apiVersions) {
+ ApiVersions apiVersions,
+ boolean enableDataLossDetection) {
this.log = logContext.logger(getClass());
this.time = time;
this.client = client;
@@ -91,7 +92,7 @@ public OffsetFetcher(LogContext logContext,
this.apiVersions = apiVersions;
this.offsetsForLeaderEpochClient = new OffsetsForLeaderEpochClient(client, logContext);
this.offsetFetcherUtils = new OffsetFetcherUtils(logContext, metadata, subscriptions,
- time, retryBackoffMs, apiVersions);
+ time, retryBackoffMs, apiVersions, enableDataLossDetection);
}
/**
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
index 0b7813eaad6b7..c56127bb23a91 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
@@ -63,6 +63,8 @@ class OffsetFetcherUtils {
private final long retryBackoffMs;
private final ApiVersions apiVersions;
private final Logger log;
+ private final DataLossDetector dataLossDetector;
+ private final boolean enableDataLossDetection;
/**
* Exception that occurred while validating positions, that will be propagated on the next
@@ -84,13 +86,16 @@ class OffsetFetcherUtils {
SubscriptionState subscriptionState,
Time time,
long retryBackoffMs,
- ApiVersions apiVersions) {
+ ApiVersions apiVersions,
+ boolean enableDataLossDetection) {
this.log = logContext.logger(getClass());
this.metadata = metadata;
this.subscriptionState = subscriptionState;
this.time = time;
this.retryBackoffMs = retryBackoffMs;
this.apiVersions = apiVersions;
+ this.enableDataLossDetection = enableDataLossDetection;
+ this.dataLossDetector = enableDataLossDetection ? new DataLossDetector(logContext) : null;
}
/**
@@ -385,6 +390,11 @@ private LogTruncationException buildLogTruncationException(List<SubscriptionState.LogTruncation> truncations) {
private Map<Node, Map<TopicPartition, SubscriptionState.FetchPosition>> regroupFetchPositionsByLeader(
Map<TopicPartition, SubscriptionState.FetchPosition> partitionMap) {
return partitionMap.entrySet()
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
index 4c8d10ad323ac..7aa168f2e2358 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
@@ -119,7 +119,8 @@ public OffsetsRequestManager(final SubscriptionState subscriptionState,
final ApiVersions apiVersions,
final NetworkClientDelegate networkClientDelegate,
final CommitRequestManager commitRequestManager,
- final LogContext logContext) {
+ final LogContext logContext,
+ final boolean enableDataLossDetection) {
requireNonNull(subscriptionState);
requireNonNull(metadata);
requireNonNull(isolationLevel);
@@ -140,7 +141,7 @@ public OffsetsRequestManager(final SubscriptionState subscriptionState,
this.apiVersions = apiVersions;
this.networkClientDelegate = networkClientDelegate;
this.offsetFetcherUtils = new OffsetFetcherUtils(logContext, metadata, subscriptionState,
- time, retryBackoffMs, apiVersions);
+ time, retryBackoffMs, apiVersions, enableDataLossDetection);
// Register the cluster metadata update callback. Note this only relies on the
// requestsToRetry initialized above, and won't be invoked until all managers are
// initialized and the network thread started.
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/DataLossException.java b/clients/src/main/java/org/apache/kafka/common/errors/DataLossException.java
new file mode 100644
index 0000000000000..1050251189a84
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/DataLossException.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Set;
+
+/**
+ * Exception thrown when data loss is detected by the consumer.
+ * This can occur due to offset gaps, topic deletion/recreation, or other scenarios
+ * where message continuity cannot be guaranteed.
+ */
+public class DataLossException extends ApiException {
+
+ private static final long serialVersionUID = 1L;
+
+ private final Set<TopicPartition> partitions;
+ private final DataLossType lossType;
+ private final String details;
+
+ public enum DataLossType {
+ OFFSET_GAP("Offset gap detected"),
+ TOPIC_RECREATION("Topic deletion/recreation detected"),
+ OUT_OF_RANGE("Offset out of range"),
+ UNKNOWN("Unknown data loss scenario");
+
+ private final String description;
+
+ DataLossType(String description) {
+ this.description = description;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+ }
+
+ public DataLossException(String message) {
+ super(message);
+ this.partitions = Set.of();
+ this.lossType = DataLossType.UNKNOWN;
+ this.details = "";
+ }
+
+ public DataLossException(String message, Set<TopicPartition> partitions, DataLossType lossType) {
+ super(message);
+ this.partitions = partitions;
+ this.lossType = lossType;
+ this.details = "";
+ }
+
+ public DataLossException(String message, Set<TopicPartition> partitions, DataLossType lossType, String details) {
+ super(message);
+ this.partitions = partitions;
+ this.lossType = lossType;
+ this.details = details;
+ }
+
+ public DataLossException(String message, Throwable cause, Set<TopicPartition> partitions, DataLossType lossType) {
+ super(message, cause);
+ this.partitions = partitions;
+ this.lossType = lossType;
+ this.details = "";
+ }
+
+ /**
+ * @return The set of topic partitions affected by the data loss
+ */
+ public Set<TopicPartition> partitions() {
+ return partitions;
+ }
+
+ /**
+ * @return The type of data loss detected
+ */
+ public DataLossType lossType() {
+ return lossType;
+ }
+
+ /**
+ * @return Additional details about the data loss scenario
+ */
+ public String details() {
+ return details;
+ }
+}
\ No newline at end of file
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DataLossDetectorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DataLossDetectorTest.java
new file mode 100644
index 0000000000000..44bac05b0e368
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DataLossDetectorTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.DataLossException;
+import org.apache.kafka.common.utils.LogContext;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class DataLossDetectorTest {
+
+ private DataLossDetector dataLossDetector;
+ private TopicPartition topicPartition;
+
+ @BeforeEach
+ public void setUp() {
+ LogContext logContext = new LogContext();
+ dataLossDetector = new DataLossDetector(logContext);
+ topicPartition = new TopicPartition("test-topic", 0);
+ }
+
+ @Test
+ public void testNoDataLossWithConsecutiveOffsets() {
+ // Should not throw exception for consecutive offsets
+ assertDoesNotThrow(() -> {
+ dataLossDetector.checkForDataLoss(topicPartition, 10L, 11L, 0L, 20L);
+ });
+ }
+
+ @Test
+ public void testDataLossDetectionWithOffsetGap() {
+ // Establish a baseline so the detector applies strict (non-startup) validation
+ dataLossDetector.checkForDataLoss(topicPartition, null, 10L, 0L, 20L);
+
+ // Previous offset is far behind the new beginning offset (gap exceeds the 1000-offset threshold)
+ DataLossException exception = assertThrows(DataLossException.class, () -> {
+ dataLossDetector.checkForDataLoss(topicPartition, 10L, 2000L, 1500L, 3000L);
+ });
+
+ assertEquals(DataLossException.DataLossType.OFFSET_GAP, exception.lossType());
+ assertEquals(Set.of(topicPartition), exception.partitions());
+ }
+
+ @Test
+ public void testDataLossDetectionWithOffsetOutOfRange() {
+ // Should throw when the new offset is before the beginning offset
+ DataLossException exception = assertThrows(DataLossException.class, () -> {
+ dataLossDetector.checkForDataLoss(topicPartition, null, 3L, 8L, 20L);
+ });
+
+ assertEquals(DataLossException.DataLossType.OUT_OF_RANGE, exception.lossType());
+ assertEquals(Set.of(topicPartition), exception.partitions());
+ }
+
+ @Test
+ public void testDataLossDetectionWithOffsetBeyondEnd() {
+ // Should throw exception when old offset is beyond end offset
+ DataLossException exception = assertThrows(DataLossException.class, () -> {
+ dataLossDetector.checkForDataLoss(topicPartition, 25L, 10L, 0L, 20L);
+ });
+
+ assertEquals(DataLossException.DataLossType.OUT_OF_RANGE, exception.lossType());
+ assertEquals(Set.of(topicPartition), exception.partitions());
+ }
+
+ @Test
+ public void testTopicRecreationDetection() {
+ // First call to establish a baseline (beginning offset 5)
+ dataLossDetector.checkForDataLoss(topicPartition, null, 10L, 5L, 20L);
+
+ // Beginning offset reset to 0 after a non-zero baseline indicates topic recreation
+ DataLossException exception = assertThrows(DataLossException.class, () -> {
+ dataLossDetector.checkForDataLoss(topicPartition, null, 2L, 0L, 5L);
+ });
+
+ assertEquals(DataLossException.DataLossType.TOPIC_RECREATION, exception.lossType());
+ assertEquals(Set.of(topicPartition), exception.partitions());
+ }
+
+ @Test
+ public void testClearPartition() {
+ // Establish tracking for partition
+ dataLossDetector.checkForDataLoss(topicPartition, null, 10L, 0L, 20L);
+ assertEquals(Long.valueOf(10L), dataLossDetector.getLastSeenOffset(topicPartition));
+
+ // Clear partition tracking
+ dataLossDetector.clearPartition(topicPartition);
+ assertEquals(null, dataLossDetector.getLastSeenOffset(topicPartition));
+ }
+
+ @Test
+ public void testValidateCommittedOffsetSuccess() {
+ // Should not throw for valid committed offset
+ assertDoesNotThrow(() -> {
+ dataLossDetector.validateCommittedOffset(topicPartition,
+ new org.apache.kafka.clients.consumer.OffsetAndMetadata(10L), 5L, 20L);
+ });
+ }
+
+ @Test
+ public void testValidateCommittedOffsetOutOfRange() {
+ // Should throw for committed offset out of range
+ DataLossException exception = assertThrows(DataLossException.class, () -> {
+ dataLossDetector.validateCommittedOffset(topicPartition,
+ new org.apache.kafka.clients.consumer.OffsetAndMetadata(3L), 5L, 20L);
+ });
+
+ assertEquals(DataLossException.DataLossType.OUT_OF_RANGE, exception.lossType());
+ assertEquals(Set.of(topicPartition), exception.partitions());
+ }
+
+ @Test
+ public void testValidateCommittedOffsetNull() {
+ // Should not throw for null committed offset
+ assertDoesNotThrow(() -> {
+ dataLossDetector.validateCommittedOffset(topicPartition, null, 5L, 20L);
+ });
+ }
+}
\ No newline at end of file
diff --git a/docs/data-loss-detection.md b/docs/data-loss-detection.md
new file mode 100644
index 0000000000000..b4a1052102d61
--- /dev/null
+++ b/docs/data-loss-detection.md
@@ -0,0 +1,300 @@
+# Enhanced Kafka Consumer Data Loss Detection
+
+## Overview
+
+This enhancement adds configurable data loss detection to Kafka consumers to prevent silent data loss. The feature builds on the existing `auto.offset.reset` strategies and adds detection of silent data loss caused by retention policies, handling of service disruptions caused by topic recreation, and mitigation of common edge cases such as false positives during startup.
+
+## Key Capabilities
+
+### 1. Silent Data Loss Detection
+- **Continuous Monitoring**: Detects data loss between normal poll operations
+- **Retention Policy Detection**: Identifies when retention purges data before replication completes
+- **Configurable Validation Intervals**: Periodic checks during normal consumption
+
+### 2. Service Disruption Handling
+- **Topic Recreation Detection**: Handles planned maintenance operations involving topic reset
+- **Graceful Recovery**: Configurable behavior for "unable to find expected offset" scenarios
+- **False Positive Mitigation**: Grace periods to avoid alerts during legitimate operations
+
+### 3. Edge Case Management
+- **Startup Leniency**: More tolerant validation during consumer initialization
+- **False Positive Reduction**: Sophisticated algorithms to distinguish real data loss from normal operations
+- **Race Condition Handling**: Robust detection during broker failover scenarios
+
+## Configuration
+
+### Primary Configuration
+
+- **Property**: `enable.data.loss.detection`
+- **Type**: boolean
+- **Default**: `false`
+- **Importance**: MEDIUM
+
+### Advanced Tuning Options
+
+- **`data.loss.detection.gap.threshold`** (default: 1000)
+ - Maximum allowed offset gap before considering it data loss
+ - Smaller values = stricter detection, more false positives
+ - Larger values = more lenient, may miss some data loss
+
+- **`data.loss.detection.validation.interval.ms`** (default: 30000)
+ - Interval between continuous validation checks during normal consumption
+ - Helps detect silent data loss from retention policies
+
+- **`data.loss.detection.grace.period.ms`** (default: 5000)
+ - Grace period to avoid false positives during topic recreation
+ - Events within this period are logged as warnings instead of exceptions
+
+When enabled, the consumer will detect scenarios that could lead to data loss and react according to the configured `auto.offset.reset` strategy.
+
+### Example Configuration
+
+```properties
+# Enable enhanced data loss detection
+enable.data.loss.detection=true
+
+# Configure behavior when data loss is detected: fail fast on any loss scenario
+auto.offset.reset=none
+
+# Fine-tune detection sensitivity (optional)
+# Stricter gap detection
+data.loss.detection.gap.threshold=500
+# More frequent validation
+data.loss.detection.validation.interval.ms=15000
+# Longer grace period for planned maintenance
+data.loss.detection.grace.period.ms=10000
+```
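+
+With `auto.offset.reset=none`, a detected loss is surfaced to the application as a `DataLossException`. The sketch below is illustrative only (the topic name and the `process(...)` call are placeholders) and simply reports the exception details before failing:
+
+```java
+consumer.subscribe(List.of("payments"));
+try {
+    while (true) {
+        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
+        records.forEach(record -> process(record)); // application-specific processing
+    }
+} catch (DataLossException e) {
+    // Fail fast: report the affected partitions and the detected loss type, then stop
+    log.error("Data loss ({}) on {}: {}", e.lossType(), e.partitions(), e.details());
+    throw e;
+} finally {
+    consumer.close();
+}
+```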
+
+## **Design Decision Analysis**
+
+### **Why create new configurations instead of using existing Kafka mechanisms?**
+
+**ANSWER**: Kafka already provides:
+
+```properties
+# Existing mechanisms that handle our requirements:
+auto.offset.reset=none # Already fails fast
+check.crcs=true # Already validates data integrity
+isolation.level=read_committed # Already provides consistency
+```
+
+**Better approach**: Enhance existing error messages and logging rather than proliferating new configuration options.
+
+### **Why create DataLossException instead of using existing exceptions?**
+
+**ANSWER**: Kafka already has:
+
+- `NoOffsetForPartitionException` - For missing offsets (auto.offset.reset=none)
+- `OffsetOutOfRangeException` - For out-of-range scenarios
+- `TopicAuthorizationException` - For authorization failures
+
+**Better approach**: Enhance existing exception messages with more detailed context.
+
+## **Improved Implementation Approach**
+
+Instead of new classes, we should have:
+
+1. **Enhanced existing exception messages** with detailed context
+2. **Improved logging** with structured information
+3. **Leveraged auto.offset.reset=none** for fail-fast behavior
+4. **Extended OffsetOutOfRangeException** with gap detection details
+
+```java
+// Better approach - enhance existing exceptions
+catch (NoOffsetForPartitionException e) {
+ // Enhanced with data loss context
+ log.error("Potential data loss detected: {}", enhancedMessage);
+ throw e; // Re-throw existing exception
+}
+```
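+
+As a sketch of point 4 above, the same gap details could be derived from the existing `OffsetOutOfRangeException` without introducing a new exception type (illustrative; `consumer` and `log` are assumed to be in scope):
+
+```java
+catch (OffsetOutOfRangeException e) {
+    Map<TopicPartition, Long> beginnings = consumer.beginningOffsets(e.partitions());
+    e.offsetOutOfRangePartitions().forEach((tp, fetchedOffset) ->
+        log.error("Potential data loss on {}: requested offset {} but earliest available is {} (gap: {})",
+            tp, fetchedOffset, beginnings.get(tp), beginnings.get(tp) - fetchedOffset));
+    throw e; // keep the existing exception semantics
+}
+```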
+
+This follows the **Principle of Least Surprise** and avoids API proliferation.
+
+### Silent Data Loss from Retention Policies
+
+**Problem**: Kafka retention policies may purge data from source topics before replication completes, creating undetectable gaps in the replicated data stream.
+
+**Solution**:
+- **Continuous monitoring** during normal consumption via `validateContinuousDataIntegrity()`
+- **Periodic validation** every 30 seconds (configurable) to detect retention-based data loss
+- **Baseline tracking** of beginning offsets to detect when data is purged
+- **Configurable gap thresholds** to distinguish normal retention from significant data loss
+
+```java
+// Automatic detection during normal consumption
+consumer.poll(Duration.ofMillis(100)); // Triggers periodic validation internally
+```
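+
+The same check can also be driven explicitly from application code; the sketch below is illustrative (the detector is an internal class, and `detector` here is a standalone instance) and shows the inputs the validation uses:
+
+```java
+for (TopicPartition tp : consumer.assignment()) {
+    long position = consumer.position(tp);
+    long beginning = consumer.beginningOffsets(Set.of(tp)).get(tp);
+    long end = consumer.endOffsets(Set.of(tp)).get(tp);
+    // Compares the current position against the broker's offset range and warns on suspicious jumps
+    detector.validateContinuousDataIntegrity(tp, position, beginning, end);
+}
+```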
+
+### Service Disruption from Topic Recreation ✅
+
+**Problem**: Planned maintenance operations involving topic deletion and recreation can cause replication services to be unable to find expected offsets and stop replication.
+
+**Solution**:
+- **Topic recreation detection** via beginning offset monitoring
+- **Graceful recovery** with configurable grace periods during maintenance windows
+- **Service continuity** - depending on strategy, either fail-fast or attempt recovery
+- **Enhanced logging** to distinguish planned maintenance from actual data loss
+
+```properties
+# For replication services - be more tolerant during maintenance
+# 30 second grace period
+data.loss.detection.grace.period.ms=30000
+# Recover from the earliest available offset
+auto.offset.reset=earliest
+```
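+
+With `auto.offset.reset=none`, a replication-style service can instead implement its own recovery. One possible pattern (illustrative; `replicate(...)` is a placeholder) is to treat a detected topic recreation as a planned event and restart from the earliest offsets, while still failing on other loss types:
+
+```java
+try {
+    ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
+    replicate(records); // application-specific forwarding
+} catch (DataLossException e) {
+    if (e.lossType() == DataLossException.DataLossType.TOPIC_RECREATION) {
+        // Planned maintenance: resume from the beginning of the recreated topic
+        log.warn("Topic recreated for {}; restarting replication from earliest", e.partitions());
+        consumer.seekToBeginning(e.partitions());
+    } else {
+        throw e; // genuine data loss: stop and alert
+    }
+}
+```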
+
+### Edge Case Scenarios ✅
+
+**Problem**: False positives during topic reset detection and potential missed truncations at startup.
+
+**Solutions**:
+
+1. **False Positive Mitigation**:
+ - Grace periods after consumer initialization
+ - Lenient validation during startup scenarios
+ - Configurable thresholds to tune sensitivity
+ - Time-based validation to avoid broker failover false positives
+
+2. **Startup Edge Cases**:
+ - More tolerant offset validation during first connection
+ - Baseline establishment before strict monitoring
+ - Differentiation between startup vs. runtime scenarios
+
+3. **Missed Truncation Detection**:
+ - Continuous monitoring of beginning/end offset ranges
+ - Detection of offset range shrinkage indicating truncation
+ - Validation of consumer position against current offset ranges
+
+```java
+// Simplified from DataLossDetector: lenient handling during startup
+if (isStartupScenario) {
+ // Only warn on extreme cases - never throw during startup
+ if (oldOffset < beginningOffset - MAX_OFFSET_GAP_THRESHOLD) {
+ log.warn("Large offset gap detected during startup - investigation recommended");
+ return; // Don't fail during startup
+ }
+}
+```
+
+## Data Loss Detection Scenarios
+
+The enhanced consumer detects the following data loss scenarios:
+
+### 1. Offset Gaps
+- **Description**: When the consumer's last known offset falls well before the broker's earliest available offset, i.e. the intervening messages are no longer retrievable
+- **Detection**: Compares the tracked consumer offset with the broker's beginning and end offsets (see the sketch below)
+- **Impact**: Indicates potential message loss due to log truncation or deletion
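+
+For illustration, the sketch below drives the internal `DataLossDetector` directly (as the unit tests do); the partition and offsets are made up, and the gap of 3800 exceeds the default threshold of 1000:
+
+```java
+DataLossDetector detector = new DataLossDetector(new LogContext());
+TopicPartition tp = new TopicPartition("orders", 0);
+
+// Establish a baseline: consumer position 1200, broker range [0, 2000]
+detector.checkForDataLoss(tp, null, 1200L, 0L, 2000L);
+
+// Retention has since purged everything below offset 5000: gap = 5000 - 1200 = 3800
+try {
+    detector.checkForDataLoss(tp, 1200L, 5000L, 5000L, 9000L);
+} catch (DataLossException e) {
+    System.out.println(e.lossType() + ": " + e.details()); // OFFSET_GAP
+}
+```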
+
+### 2. Topic Recreation
+- **Description**: When a topic is deleted and recreated with the same name
+- **Detection**: Monitors topic metadata changes and offset ranges
+- **Impact**: All previous messages are lost when topic is recreated
+
+### 3. Out-of-Range Offsets
+- **Description**: When committed offset is outside the available offset range
+- **Detection**: Validates committed offset against broker's offset boundaries
+- **Impact**: Indicates data has been aged out or truncated
+
+## Behavior by Strategy
+
+### NONE Strategy (`auto.offset.reset=none`)
+When data loss is detected:
+- **Action**: Throws `DataLossException` immediately
+- **Behavior**: Fail-fast with detailed error information
+- **Use Case**: Applications requiring strict data consistency guarantees
+
+### EARLIEST Strategy (`auto.offset.reset=earliest`)
+When data loss is detected:
+- **Action**: Logs detailed warning and resets to earliest available offset
+- **Behavior**: Attempts recovery but may result in duplicate processing
+- **Use Case**: Applications that can handle duplicates but need to continue processing
+
+### LATEST Strategy (`auto.offset.reset=latest`)
+When data loss is detected:
+- **Action**: Logs detailed warning and resets to latest available offset
+- **Behavior**: Skips missing data and continues from current position
+- **Use Case**: Applications where recent data is more important than completeness
+
+## Exception Handling
+
+### DataLossException
+
+New exception class that provides detailed information about detected data loss:
+
+```java
+public class DataLossException extends ApiException {
+ public enum DataLossType {
+ OFFSET_GAP, // Gap between committed and available offsets
+ TOPIC_RECREATION, // Topic was deleted and recreated
+ OUT_OF_RANGE, // Offset outside available range
+ UNKNOWN // Unclassified data loss scenario
+ }
+
+ // Exception provides partition, offset details, and loss type
+}
+```
+
+## Integration Points
+
+### Consumer Creation
+Data loss detection is configured per consumer instance:
+
+```java
+Properties props = new Properties();
+// ... plus the usual bootstrap.servers, group.id and key/value deserializer settings ...
+props.put(ConsumerConfig.ENABLE_DATA_LOSS_DETECTION_CONFIG, true);
+props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
+
+KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
+```
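+
+The detection thresholds added by this change can be tuned through the same `ConsumerConfig` constants, mirroring the properties example shown earlier:
+
+```java
+props.put(ConsumerConfig.DATA_LOSS_DETECTION_GAP_THRESHOLD_CONFIG, 500L);
+props.put(ConsumerConfig.DATA_LOSS_DETECTION_VALIDATION_INTERVAL_MS_CONFIG, 15000L);
+props.put(ConsumerConfig.DATA_LOSS_DETECTION_GRACE_PERIOD_MS_CONFIG, 10000L);
+```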
+
+## Best Practices
+
+### Production Deployments
+1. **Enable for Critical Applications**: Use with `auto.offset.reset=none` for applications requiring data consistency
+2. **Monitor Logs**: Set up alerting on DataLossException occurrences
+3. **Test Recovery Scenarios**: Validate application behavior under different data loss conditions
+
+### Development and Testing
+1. **Simulate Data Loss**: Test with controlled topic deletion/recreation scenarios
+2. **Validate Exception Handling**: Ensure proper error handling for DataLossException
+3. **Performance Testing**: Verify minimal impact on consumer performance
+
+## Migration Guide
+
+### Configuration Changes
+```properties
+# Before: Basic auto.offset.reset
+auto.offset.reset=latest
+
+# After: Enhanced with data loss detection
+auto.offset.reset=none
+enable.data.loss.detection=true
+```
+
+## Performance Considerations
+
+- **Minimal Overhead**: Detection runs during offset resets and at the configured validation interval (default 30 seconds)
+- **Network Impact**: No additional broker requests required
+- **Memory Usage**: Negligible memory overhead for tracking partition state
+
+## Testing
+
+The implementation includes comprehensive unit tests covering:
+- Offset gap detection scenarios
+- Topic recreation detection
+- Out-of-range offset validation
+- Error handling and recovery
+- Edge cases and boundary conditions
+
+## Future Enhancements
+
+Potential future improvements:
+- Lower-latency detection during normal consumption (beyond the periodic validation interval)
+- Finer-grained tolerance levels for offset gaps (beyond the single global threshold)
+- Integration with monitoring systems
+- Automatic recovery strategies for different loss types
\ No newline at end of file