MM2: fail-fast on truncation + auto-recover on topic reset (MirrorSourceTask) #20515
base: trunk
- Add fail-fast truncation detection with detailed error logging
- Add graceful topic reset handling with auto-recovery
- Add configuration toggles for fault tolerance features
- Add AdminClient-based topic ID tracking for reset detection
- Add seekToBeginning for topic reset recovery
- Maintain backward compatibility with existing MM2 behavior

Features:
- mirrorsource.fail.on.truncation=true (default)
- mirrorsource.auto.recover.on.reset=true (default)
- mirrorsource.topic.reset.retry.ms=5000 (default)

This addresses silent data loss scenarios and improves resilience during planned maintenance operations involving topic resets.
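For illustration, the three toggles listed above could be read from the task properties roughly as follows. This is a minimal sketch, not the code in the patch: the class name `FaultToleranceSettings` and its field names are hypothetical, and only the property keys and default values come from the description.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical holder for the three toggles; only the keys and defaults
// are taken from the PR description.
class FaultToleranceSettings {
    final boolean failOnTruncation;
    final boolean autoRecoverOnReset;
    final long topicResetRetryMs;

    FaultToleranceSettings(Map<String, String> props) {
        // Each key falls back to the default stated in the description.
        this.failOnTruncation = Boolean.parseBoolean(
                props.getOrDefault("mirrorsource.fail.on.truncation", "true"));
        this.autoRecoverOnReset = Boolean.parseBoolean(
                props.getOrDefault("mirrorsource.auto.recover.on.reset", "true"));
        this.topicResetRetryMs = Long.parseLong(
                props.getOrDefault("mirrorsource.topic.reset.retry.ms", "5000"));
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("mirrorsource.topic.reset.retry.ms", "10000");
        FaultToleranceSettings s = new FaultToleranceSettings(props);
        System.out.println(s.failOnTruncation + " " + s.autoRecoverOnReset + " " + s.topicResetRetryMs);
    }
}
```

Note that `Boolean.parseBoolean` treats any value other than "true" (case-insensitive) as false, so a typo in a toggle value would silently disable the feature. A real connector would more likely declare these keys in its `ConfigDef` so that they are validated.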
A label of 'needs-attention' was automatically added to this PR in order to raise the |
zheguang
left a comment
The patch doesn't compile yet. Can you complete it before a formal review?
this.topicResetRetryMs = Long.parseLong(props.getOrDefault("mirrorsource.topic.reset.retry.ms", "5000"));

// Build AdminClient for source cluster (same configs as source consumer)
Map<String, Object> adminProps = new HashMap<>(config.sourceConsumerConfig("replication-consumer"));
HashMap requires import.
FT_LOG.warn("TOPIC_RESET_SUSPECTED: {}. Will retry metadata and resubscribe in {} ms.", utpe.getMessage(), topicResetRetryMs);
sleep(topicResetRetryMs);
handleTopicResetIfAny();
return Collections.emptyList();
Collections requires import.
if (topics.isEmpty()) return;
try {
DescribeTopicsResult res = sourceAdmin.describeTopics(topics);
Map<String, TopicDescription> desc = res.all().get();
DescribeTopicsResult#all has private access.
if (!toSeek.isEmpty()) consumer.seekToBeginning(toSeek);
}
}
} catch (InterruptedException ie) {
Unclear who throws in the try block.
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
} catch (ExecutionException ee) {
Ditto. Unclear who throws in the try block.
@@ -0,0 +1,120 @@
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
This file shouldn't be checked in.
Summary
This PR enhances MirrorMaker 2 (MM2) with fault-tolerance capabilities to address critical data loss scenarios in cross-cluster replication setups.
Problem Statement
Vanilla MM2 has two critical gaps:
Solution
Added fault-tolerance enhancements to MirrorSourceTask:
Fail-Fast Truncation Detection
- OffsetOutOfRangeException during consumer polling
- ConnectException to fail-fast and alert operators immediately
- mirrorsource.fail.on.truncation=true (default)
Graceful Topic Reset Handling
- AdminClient to track topic IDs and detect delete/recreate events
- UnknownTopicOrPartitionException with retry logic
- mirrorsource.auto.recover.on.reset=true (default)
Technical Details
- connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
- mm2.fault.tolerance for easy filtering
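The topic ID tracking mentioned under Graceful Topic Reset Handling can be sketched without any Kafka dependency. The idea is to remember the last ID seen for each topic name and flag a topic as reset when the same name reappears with a different ID. In this sketch the class `TopicResetDetector` and its `update` method are hypothetical names, and `java.util.UUID` stands in for Kafka's topic ID type. The patch itself would presumably obtain the IDs from the source cluster's AdminClient and then call `seekToBeginning` on the partitions of any reset topic.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical helper: tracks the last known ID per topic name.
class TopicResetDetector {
    private final Map<String, UUID> knownIds = new HashMap<>();

    // Records the latest snapshot of topic IDs and returns the names of
    // topics whose ID differs from the previously recorded one, which
    // indicates a delete/recreate. Topics seen for the first time are
    // recorded but not reported.
    Set<String> update(Map<String, UUID> currentIds) {
        Set<String> reset = new HashSet<>();
        for (Map.Entry<String, UUID> e : currentIds.entrySet()) {
            UUID previous = knownIds.put(e.getKey(), e.getValue());
            if (previous != null && !previous.equals(e.getValue())) {
                reset.add(e.getKey());
            }
        }
        return reset;
    }
}
```

Comparing IDs rather than relying only on UnknownTopicOrPartitionException matters when a topic is deleted and recreated between two polls: the name still resolves, so the consumer may never see the error, but the ID has changed.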
Impact