Skip to content

Commit

Permalink
KAFKA-15607: Fix NPE in MirrorCheckpointTask::syncGroupOffset (#14587)
Browse files Browse the repository at this point in the history
Reviewers: Chris Egerton <chrise@aiven.io>
  • Loading branch information
hudeqi authored and C0urante committed Oct 20, 2023
1 parent 33b8ee3 commit 9fca008
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 5 deletions.
Expand Up @@ -330,11 +330,20 @@ Map<String, Map<TopicPartition, OffsetAndMetadata>> syncGroupOffset() {

// if translated offset from upstream is smaller than the current consumer offset
// in the target, skip updating the offset for that partition
long latestDownstreamOffset = targetConsumerOffset.get(topicPartition).offset();
if (latestDownstreamOffset >= convertedOffset.offset()) {
log.trace("latestDownstreamOffset {} is larger than or equal to convertedUpstreamOffset {} for "
+ "TopicPartition {}", latestDownstreamOffset, convertedOffset.offset(), topicPartition);
continue;
OffsetAndMetadata targetOffsetAndMetadata = targetConsumerOffset.get(topicPartition);
if (targetOffsetAndMetadata != null) {
long latestDownstreamOffset = targetOffsetAndMetadata.offset();
if (latestDownstreamOffset >= convertedOffset.offset()) {
log.trace("latestDownstreamOffset {} is larger than or equal to convertedUpstreamOffset {} for "
+ "TopicPartition {}", latestDownstreamOffset, convertedOffset.offset(), topicPartition);
continue;
}
} else {
// It is possible that when resetting offsets are performed in the java kafka client, the reset to -1 will be intercepted.
// However, there are some other types of clients such as sarama, which can magically reset the group offset to -1, which will cause
// `targetOffsetAndMetadata` here is null. For this case, just sync the offset to target.
log.warn("Group {} offset for partition {} may has been reset to a negative offset, just sync the offset to target.",
consumerGroupId, topicPartition);
}
offsetToSync.put(topicPartition, convertedOffset);
}
Expand Down
Expand Up @@ -169,6 +169,33 @@ public void testSyncOffset() {
"Consumer 2 " + topic2 + " failed");
}

@Test
public void testSyncOffsetForTargetGroupWithNullOffsetAndMetadata() {
Map<String, Map<TopicPartition, OffsetAndMetadata>> idleConsumerGroupsOffset = new HashMap<>();
Map<String, Map<TopicPartition, Checkpoint>> checkpointsPerConsumerGroup = new HashMap<>();

String consumer = "consumer";
String topic = "topic";
Map<TopicPartition, OffsetAndMetadata> ct = new HashMap<>();
TopicPartition tp = new TopicPartition(topic, 0);
// Simulate other clients such as Sarama, which may reset group offsets to -1. This can cause
// the obtained `OffsetAndMetadata` of the target cluster to be null.
ct.put(tp, null);
idleConsumerGroupsOffset.put(consumer, ct);

Checkpoint cp = new Checkpoint(consumer, new TopicPartition(topic, 0), 200, 101, "metadata");
Map<TopicPartition, Checkpoint> checkpointMap = new HashMap<>();
checkpointMap.put(cp.topicPartition(), cp);
checkpointsPerConsumerGroup.put(consumer, checkpointMap);

MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source", "target",
new DefaultReplicationPolicy(), null, idleConsumerGroupsOffset, checkpointsPerConsumerGroup);

Map<String, Map<TopicPartition, OffsetAndMetadata>> output = mirrorCheckpointTask.syncGroupOffset();

assertEquals(101, output.get(consumer).get(tp).offset(), "Consumer " + topic + " failed");
}

@Test
public void testNoCheckpointForTopicWithoutOffsetSyncs() {
OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore();
Expand Down

0 comments on commit 9fca008

Please sign in to comment.