Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,39 @@ public class ConsumerConfig extends AbstractConfig {
"<p>Note that altering partition numbers while setting this config to latest may cause message delivery loss since " +
"producers could start to send messages to newly added partitions (i.e. no initial offsets exist yet) before consumers reset their offsets.";

/**
* <code>enable.data.loss.detection</code>
*/
public static final String ENABLE_DATA_LOSS_DETECTION_CONFIG = "enable.data.loss.detection";
public static final String ENABLE_DATA_LOSS_DETECTION_DOC = "Enables data loss detection for the consumer. When enabled, the consumer will detect and handle scenarios " +
"that could lead to data loss, including offset gaps, topic recreation, and out-of-range offsets. The behavior when data loss is detected depends on the " +
"auto.offset.reset strategy: 'none' will throw a DataLossException, while 'earliest' and 'latest' will log warnings and attempt recovery.";
public static final boolean DEFAULT_ENABLE_DATA_LOSS_DETECTION = false;

/**
* <code>data.loss.detection.gap.threshold</code>
*/
public static final String DATA_LOSS_DETECTION_GAP_THRESHOLD_CONFIG = "data.loss.detection.gap.threshold";
public static final String DATA_LOSS_DETECTION_GAP_THRESHOLD_DOC = "The maximum allowed offset gap before considering it a potential data loss scenario. " +
"Smaller values provide stricter detection but may cause false positives during normal retention. Larger values are more lenient but may miss actual data loss.";
public static final long DEFAULT_DATA_LOSS_DETECTION_GAP_THRESHOLD = 1000L;

/**
* <code>data.loss.detection.validation.interval.ms</code>
*/
public static final String DATA_LOSS_DETECTION_VALIDATION_INTERVAL_MS_CONFIG = "data.loss.detection.validation.interval.ms";
public static final String DATA_LOSS_DETECTION_VALIDATION_INTERVAL_MS_DOC = "The interval in milliseconds between continuous data loss validation checks during normal consumption. " +
"This helps detect silent data loss due to retention policies between normal poll operations.";
public static final long DEFAULT_DATA_LOSS_DETECTION_VALIDATION_INTERVAL_MS = 30000L; // 30 seconds

/**
* <code>data.loss.detection.grace.period.ms</code>
*/
public static final String DATA_LOSS_DETECTION_GRACE_PERIOD_MS_CONFIG = "data.loss.detection.grace.period.ms";
public static final String DATA_LOSS_DETECTION_GRACE_PERIOD_MS_DOC = "Grace period in milliseconds to avoid false positives during topic recreation or broker maintenance. " +
"Suspected data loss events within this period after initialization will be logged as warnings instead of throwing exceptions.";
public static final long DEFAULT_DATA_LOSS_DETECTION_GRACE_PERIOD_MS = 5000L; // 5 seconds

/**
* <code>fetch.min.bytes</code>
*/
Expand Down Expand Up @@ -546,6 +579,29 @@ public class ConsumerConfig extends AbstractConfig {
new AutoOffsetResetStrategy.Validator(),
Importance.MEDIUM,
AUTO_OFFSET_RESET_DOC)
.define(ENABLE_DATA_LOSS_DETECTION_CONFIG,
Type.BOOLEAN,
DEFAULT_ENABLE_DATA_LOSS_DETECTION,
Importance.MEDIUM,
ENABLE_DATA_LOSS_DETECTION_DOC)
.define(DATA_LOSS_DETECTION_GAP_THRESHOLD_CONFIG,
Type.LONG,
DEFAULT_DATA_LOSS_DETECTION_GAP_THRESHOLD,
atLeast(1),
Importance.LOW,
DATA_LOSS_DETECTION_GAP_THRESHOLD_DOC)
.define(DATA_LOSS_DETECTION_VALIDATION_INTERVAL_MS_CONFIG,
Type.LONG,
DEFAULT_DATA_LOSS_DETECTION_VALIDATION_INTERVAL_MS,
atLeast(1000),
Importance.LOW,
DATA_LOSS_DETECTION_VALIDATION_INTERVAL_MS_DOC)
.define(DATA_LOSS_DETECTION_GRACE_PERIOD_MS_CONFIG,
Type.LONG,
DEFAULT_DATA_LOSS_DETECTION_GRACE_PERIOD_MS,
atLeast(0),
Importance.LOW,
DATA_LOSS_DETECTION_GRACE_PERIOD_MS_DOC)
.define(CHECK_CRCS_CONFIG,
Type.BOOLEAN,
true,
Expand Down
Loading