Skip to content

Commit

Permalink
KAFKA-14367; Add OffsetFetch to the new GroupCoordinator interface (
Browse files Browse the repository at this point in the history
#12870)

This patch adds OffsetFetch to the new GroupCoordinator interface and updates KafkaApis to use it. 

Reviewers: Philip Nee <pnee@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
  • Loading branch information
dajac committed Jan 10, 2023
1 parent 2fc1875 commit 24a8642
Show file tree
Hide file tree
Showing 9 changed files with 799 additions and 124 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,24 @@ public boolean requireStable() {
return data.requireStable();
}

/**
 * Returns the requested groups in the per-group representation introduced in
 * version 8 of the protocol. For older versions, which carry a single group id
 * and a flat topic list at the top level, a single synthetic group is built
 * from those top-level fields.
 */
public List<OffsetFetchRequestData.OffsetFetchRequestGroup> groups() {
    if (version() >= 8) {
        return data.groups();
    }

    // Pre-v8 requests address exactly one group; wrap its top-level
    // groupId/topics fields into the v8+ group structure.
    OffsetFetchRequestData.OffsetFetchRequestGroup syntheticGroup =
        new OffsetFetchRequestData.OffsetFetchRequestGroup().setGroupId(data.groupId());

    data.topics().forEach(topic ->
        syntheticGroup.topics().add(new OffsetFetchRequestTopics()
            .setName(topic.name())
            .setPartitionIndexes(topic.partitionIndexes())
        )
    );

    return Collections.singletonList(syntheticGroup);
}

public Map<String, List<TopicPartition>> groupIdsToPartitions() {
Map<String, List<TopicPartition>> groupIdsToPartitions = new HashMap<>();
for (OffsetFetchRequestGroup group : data.groups()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.Map.Entry;
import java.util.stream.Collectors;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.OffsetFetchResponseData;
import org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponseGroup;
import org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponsePartition;
Expand Down Expand Up @@ -119,12 +120,6 @@ public int hashCode() {
}
}

/**
 * Constructor that wraps an already-built response body as-is.
 * No top-level error is derived from the data; {@code error} is left null.
 *
 * @param data the complete response payload to expose
 */
public OffsetFetchResponse(OffsetFetchResponseData data) {
    super(ApiKeys.OFFSET_FETCH);
    this.data = data;
    this.error = null;
}

/**
* Constructor without throttle time.
* @param error Potential coordinator or group level error code (for api version 2 and later)
Expand Down Expand Up @@ -208,6 +203,59 @@ public OffsetFetchResponse(int throttleTimeMs,
this.error = null;
}

public OffsetFetchResponse(List<OffsetFetchResponseGroup> groups, short version) {
super(ApiKeys.OFFSET_FETCH);
data = new OffsetFetchResponseData();

if (version >= 8) {
data.setGroups(groups);
error = null;

for (OffsetFetchResponseGroup group : data.groups()) {
this.groupLevelErrors.put(group.groupId(), Errors.forCode(group.errorCode()));
}
} else {
if (groups.size() != 1) {
throw new UnsupportedVersionException(
"Version " + version + " of OffsetFetchResponse only supports one group."
);
}

OffsetFetchResponseGroup group = groups.get(0);
data.setErrorCode(group.errorCode());
error = Errors.forCode(group.errorCode());

group.topics().forEach(topic -> {
OffsetFetchResponseTopic newTopic = new OffsetFetchResponseTopic().setName(topic.name());
data.topics().add(newTopic);

topic.partitions().forEach(partition -> {
OffsetFetchResponsePartition newPartition;

if (version < 2 && group.errorCode() != Errors.NONE.code()) {
// Versions prior to version 2 do not support a top level error. Therefore,
// we put it at the partition level.
newPartition = new OffsetFetchResponsePartition()
.setPartitionIndex(partition.partitionIndex())
.setErrorCode(group.errorCode())
.setCommittedOffset(INVALID_OFFSET)
.setMetadata(NO_METADATA)
.setCommittedLeaderEpoch(NO_PARTITION_LEADER_EPOCH);
} else {
newPartition = new OffsetFetchResponsePartition()
.setPartitionIndex(partition.partitionIndex())
.setErrorCode(partition.errorCode())
.setCommittedOffset(partition.committedOffset())
.setMetadata(partition.metadata())
.setCommittedLeaderEpoch(partition.committedLeaderEpoch());
}

newTopic.partitions().add(newPartition);
});
});
}
}

public OffsetFetchResponse(OffsetFetchResponseData data, short version) {
super(ApiKeys.OFFSET_FETCH);
this.data = data;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
]},
{ "name": "ErrorCode", "type": "int16", "versions": "2-7", "default": "0", "ignorable": true,
"about": "The top-level error code, or 0 if there was no error." },
{"name": "Groups", "type": "[]OffsetFetchResponseGroup", "versions": "8+",
{ "name": "Groups", "type": "[]OffsetFetchResponseGroup", "versions": "8+",
"about": "The responses per group id.", "fields": [
{ "name": "groupId", "type": "string", "versions": "8+", "entityType": "groupId",
"about": "The group ID." },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ package kafka.coordinator.group

import kafka.server.RequestLocal
import kafka.utils.Implicits.MapExtensionMethods
import org.apache.kafka.common.message.{DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, SyncGroupRequestData, SyncGroupResponseData}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.message.{DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetFetchRequestData, OffsetFetchResponseData, SyncGroupRequestData, SyncGroupResponseData}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.RequestContext
import org.apache.kafka.common.utils.BufferSupplier

import java.util
import java.util.concurrent.CompletableFuture
import scala.collection.immutable
import scala.collection.{immutable, mutable}
import scala.jdk.CollectionConverters._

/**
Expand Down Expand Up @@ -234,4 +236,80 @@ class GroupCoordinatorAdapter(
}
CompletableFuture.completedFuture(results)
}

/**
 * Fetches the committed offsets for every partition known to the group.
 * Delegates to handleFetchOffset with no partition filter.
 */
override def fetchAllOffsets(
  context: RequestContext,
  groupId: String,
  requireStable: Boolean
): CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]] = {
  // None means "all partitions tracked for this group".
  handleFetchOffset(groupId, requireStable, partitions = None)
}

/**
 * Fetches the committed offsets for the requested topic partitions of the
 * given group. The requested (topic, partition-index) pairs are flattened
 * into TopicPartitions before delegating to handleFetchOffset.
 */
override def fetchOffsets(
  context: RequestContext,
  groupId: String,
  topics: util.List[OffsetFetchRequestData.OffsetFetchRequestTopics],
  requireStable: Boolean
): CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]] = {
  val requestedPartitions = topics.asScala.iterator.flatMap { topic =>
    topic.partitionIndexes.asScala.iterator.map { index =>
      new TopicPartition(topic.name, index)
    }
  }.toSeq

  handleFetchOffset(groupId, requireStable, Some(requestedPartitions))
}

/**
 * Common path for both fetchOffsets and fetchAllOffsets: queries the old
 * coordinator and converts its (error, per-partition map) result into the
 * response's per-topic list representation.
 *
 * A top-level error completes the future exceptionally; otherwise the
 * per-partition results are grouped by topic, preserving first-seen order.
 */
private def handleFetchOffset(
  groupId: String,
  requireStable: Boolean,
  partitions: Option[Seq[TopicPartition]]
): CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]] = {
  val (error, offsetsByPartition) = coordinator.handleFetchOffsets(
    groupId,
    requireStable,
    partitions
  )

  val future = new CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
  if (error != Errors.NONE) {
    future.completeExceptionally(error.exception)
  } else {
    // The list keeps insertion order for the response; the map provides
    // O(1) lookup of an already-created topic entry by name.
    val responseTopics = new util.ArrayList[OffsetFetchResponseData.OffsetFetchResponseTopics]()
    val topicsByName = mutable.HashMap.empty[String, OffsetFetchResponseData.OffsetFetchResponseTopics]

    offsetsByPartition.forKeyValue { (tp, partitionData) =>
      val responseTopic = topicsByName.getOrElseUpdate(tp.topic, {
        val created = new OffsetFetchResponseData.OffsetFetchResponseTopics().setName(tp.topic)
        responseTopics.add(created)
        created
      })

      responseTopic.partitions.add(new OffsetFetchResponseData.OffsetFetchResponsePartitions()
        .setPartitionIndex(tp.partition)
        .setMetadata(partitionData.metadata)
        .setCommittedOffset(partitionData.offset)
        .setCommittedLeaderEpoch(partitionData.leaderEpoch.orElse(-1))
        .setErrorCode(partitionData.error.code))
    }

    future.complete(responseTopics)
  }

  future
}
}

0 comments on commit 24a8642

Please sign in to comment.