diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java index a51846766a6e..0bf2baa9d853 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java @@ -330,11 +330,20 @@ Map> syncGroupOffset() { // if translated offset from upstream is smaller than the current consumer offset // in the target, skip updating the offset for that partition - long latestDownstreamOffset = targetConsumerOffset.get(topicPartition).offset(); - if (latestDownstreamOffset >= convertedOffset.offset()) { - log.trace("latestDownstreamOffset {} is larger than or equal to convertedUpstreamOffset {} for " - + "TopicPartition {}", latestDownstreamOffset, convertedOffset.offset(), topicPartition); - continue; + OffsetAndMetadata targetOffsetAndMetadata = targetConsumerOffset.get(topicPartition); + if (targetOffsetAndMetadata != null) { + long latestDownstreamOffset = targetOffsetAndMetadata.offset(); + if (latestDownstreamOffset >= convertedOffset.offset()) { + log.trace("latestDownstreamOffset {} is larger than or equal to convertedUpstreamOffset {} for " + + "TopicPartition {}", latestDownstreamOffset, convertedOffset.offset(), topicPartition); + continue; + } + } else { + // It is possible that when resetting offsets are performed in the java kafka client, the reset to -1 will be intercepted. + // However, there are some other types of clients such as sarama, which can magically reset the group offset to -1, which will cause + // `targetOffsetAndMetadata` here is null. For this case, just sync the offset to target. + log.warn("Group {} offset for partition {} may has been reset to a negative offset, just sync the offset to target.", + consumerGroupId, topicPartition); } offsetToSync.put(topicPartition, convertedOffset); } diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java index f9bc7bc76cbc..8d8e8bd3a0d0 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java @@ -169,6 +169,33 @@ public void testSyncOffset() { "Consumer 2 " + topic2 + " failed"); } + @Test + public void testSyncOffsetForTargetGroupWithNullOffsetAndMetadata() { + Map> idleConsumerGroupsOffset = new HashMap<>(); + Map> checkpointsPerConsumerGroup = new HashMap<>(); + + String consumer = "consumer"; + String topic = "topic"; + Map ct = new HashMap<>(); + TopicPartition tp = new TopicPartition(topic, 0); + // Simulate other clients such as Sarama, which may reset group offsets to -1. This can cause + // the obtained `OffsetAndMetadata` of the target cluster to be null. + ct.put(tp, null); + idleConsumerGroupsOffset.put(consumer, ct); + + Checkpoint cp = new Checkpoint(consumer, new TopicPartition(topic, 0), 200, 101, "metadata"); + Map checkpointMap = new HashMap<>(); + checkpointMap.put(cp.topicPartition(), cp); + checkpointsPerConsumerGroup.put(consumer, checkpointMap); + + MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source", "target", + new DefaultReplicationPolicy(), null, idleConsumerGroupsOffset, checkpointsPerConsumerGroup); + + Map> output = mirrorCheckpointTask.syncGroupOffset(); + + assertEquals(101, output.get(consumer).get(tp).offset(), "Consumer " + topic + " failed"); + } + @Test public void testNoCheckpointForTopicWithoutOffsetSyncs() { OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore();