From ea3a0609388d19caf2de6587a5efdbcb851a3224 Mon Sep 17 00:00:00 2001 From: Ishita Mandhan Date: Fri, 2 Dec 2016 01:45:57 -0800 Subject: [PATCH] KAFKA-2857 ConsumerGroupCommand throws GroupCoordinatorNotAvailableException when describing a non-existent group before the offset topic is created #1548 --- .../main/scala/kafka/admin/AdminClient.scala | 56 +++++++++++-------- 1 file changed, 32 insertions(+), 24 deletions(-) diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala index 9cd4823bd93b1..3ca7ac4b3c278 100644 --- a/core/src/main/scala/kafka/admin/AdminClient.scala +++ b/core/src/main/scala/kafka/admin/AdminClient.scala @@ -64,11 +64,15 @@ class AdminClient(val time: Time, throw new RuntimeException(s"Request $api failed on brokers $bootstrapBrokers") } - private def findCoordinator(groupId: String): Node = { + private def findCoordinator(groupId: String): Option[Node] = { val request = new GroupCoordinatorRequest(groupId) val response = sendAnyNode(ApiKeys.GROUP_COORDINATOR, request).asInstanceOf[GroupCoordinatorResponse] - Errors.forCode(response.errorCode()).maybeThrow() - response.node() + Errors.forCode(response.errorCode()) match { + case Errors.GROUP_COORDINATOR_NOT_AVAILABLE => None + case error => + error.maybeThrow() + Some(response.node()) + } } def listGroups(node: Node): List[GroupOverview] = { @@ -132,27 +136,31 @@ class AdminClient(val time: Time, coordinator: Node) def describeConsumerGroup(groupId: String): ConsumerGroupSummary = { - val coordinator = findCoordinator(groupId) - val responseBody = send(coordinator, ApiKeys.DESCRIBE_GROUPS, new DescribeGroupsRequest(Collections.singletonList(groupId))) - val response = responseBody.asInstanceOf[DescribeGroupsResponse] - val metadata = response.groups.get(groupId) - if (metadata == null) - throw new KafkaException(s"Response from broker contained no metadata for group $groupId") - if (metadata.state != "Dead" && metadata.state != "Empty" && metadata.protocolType != ConsumerProtocol.PROTOCOL_TYPE) - throw new IllegalArgumentException(s"Consumer Group $groupId with protocol type '${metadata.protocolType}' is not a valid consumer group") - - Errors.forCode(metadata.errorCode()).maybeThrow() - val consumers = metadata.members.asScala.map { consumer => - ConsumerSummary(consumer.memberId, consumer.clientId, consumer.clientHost, metadata.state match { - case "Stable" => - val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(Utils.readBytes(consumer.memberAssignment))) - assignment.partitions.asScala.toList - case _ => - List() - }) - }.toList - - ConsumerGroupSummary(metadata.state, metadata.protocol, Some(consumers), coordinator) + findCoordinator(groupId) match { + case None => + throw new KafkaException(s"Could not find coordinator for group $groupId, which implies that one of the consumer offsets partitions may be offline or in the process of being created if this is a new cluster.") + case Some(coordinator) => + val responseBody = send(coordinator, ApiKeys.DESCRIBE_GROUPS, new DescribeGroupsRequest(Collections.singletonList(groupId))) + val response = responseBody.asInstanceOf[DescribeGroupsResponse] + val metadata = response.groups.get(groupId) + if (metadata == null) + throw new KafkaException(s"Response from broker contained no metadata for group $groupId") + if (metadata.state != "Dead" && metadata.state != "Empty" && metadata.protocolType != ConsumerProtocol.PROTOCOL_TYPE) + throw new IllegalArgumentException(s"Consumer Group $groupId with protocol type '${metadata.protocolType}' is not a valid consumer group") + + Errors.forCode(metadata.errorCode()).maybeThrow() + val consumers = metadata.members.asScala.map { consumer => + ConsumerSummary(consumer.memberId, consumer.clientId, consumer.clientHost, metadata.state match { + case "Stable" => + val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(Utils.readBytes(consumer.memberAssignment))) + assignment.partitions.asScala.toList + case _ => + List() + }) + }.toList + + ConsumerGroupSummary(metadata.state, metadata.protocol, Some(consumers), coordinator) + } } def close() {