Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-15607:Possible NPE is thrown in MirrorCheckpointTask #14587

Merged
merged 2 commits into from Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -330,11 +330,20 @@ Map<String, Map<TopicPartition, OffsetAndMetadata>> syncGroupOffset() {

// if translated offset from upstream is smaller than the current consumer offset
// in the target, skip updating the offset for that partition
long latestDownstreamOffset = targetConsumerOffset.get(topicPartition).offset();
if (latestDownstreamOffset >= convertedOffset.offset()) {
log.trace("latestDownstreamOffset {} is larger than or equal to convertedUpstreamOffset {} for "
+ "TopicPartition {}", latestDownstreamOffset, convertedOffset.offset(), topicPartition);
continue;
OffsetAndMetadata targetOffsetAndMetadata = targetConsumerOffset.get(topicPartition);
if (targetOffsetAndMetadata != null) {
long latestDownstreamOffset = targetOffsetAndMetadata.offset();
if (latestDownstreamOffset >= convertedOffset.offset()) {
log.trace("latestDownstreamOffset {} is larger than or equal to convertedUpstreamOffset {} for "
+ "TopicPartition {}", latestDownstreamOffset, convertedOffset.offset(), topicPartition);
continue;
}
} else {
// It is possible that when resetting offsets are performed in the java kafka client, the reset to -1 will be intercepted.
// However, there are some other types of clients such as sarama, which can magically reset the group offset to -1, which will cause
// `targetOffsetAndMetadata` here is null. For this case, just sync the offset to target.
log.warn("Group {} offset for partition {} may has been reset to a negative offset, just sync the offset to target.",
consumerGroupId, topicPartition);
}
offsetToSync.put(topicPartition, convertedOffset);
}
Expand Down
Expand Up @@ -169,6 +169,33 @@ public void testSyncOffset() {
"Consumer 2 " + topic2 + " failed");
}

@Test
public void testSyncOffsetForTargetGroupWithNullOffsetAndMetadata() {
Map<String, Map<TopicPartition, OffsetAndMetadata>> idleConsumerGroupsOffset = new HashMap<>();
Map<String, Map<TopicPartition, Checkpoint>> checkpointsPerConsumerGroup = new HashMap<>();

String consumer = "consumer";
String topic = "topic";
Map<TopicPartition, OffsetAndMetadata> ct = new HashMap<>();
TopicPartition tp = new TopicPartition(topic, 0);
// Simulate other clients such as sarama to reset the group offset of the target cluster to -1. At this time,
// the obtained `OffsetAndMetadata` of the target cluster is null.
hudeqi marked this conversation as resolved.
Show resolved Hide resolved
ct.put(tp, null);
idleConsumerGroupsOffset.put(consumer, ct);

Checkpoint cp = new Checkpoint(consumer, new TopicPartition(topic, 0), 200, 101, "metadata");
Map<TopicPartition, Checkpoint> checkpointMap = new HashMap<>();
checkpointMap.put(cp.topicPartition(), cp);
checkpointsPerConsumerGroup.put(consumer, checkpointMap);

MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source", "target",
new DefaultReplicationPolicy(), null, idleConsumerGroupsOffset, checkpointsPerConsumerGroup);

Map<String, Map<TopicPartition, OffsetAndMetadata>> output = mirrorCheckpointTask.syncGroupOffset();

assertEquals(101, output.get(consumer).get(tp).offset(), "Consumer " + topic + " failed");
}

@Test
public void testNoCheckpointForTopicWithoutOffsetSyncs() {
OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore();
Expand Down