From 5b55c257b861fdc51636ae071de4a4397dbd3ccd Mon Sep 17 00:00:00 2001 From: Vahid Hashemian Date: Wed, 7 Sep 2016 16:06:00 -0700 Subject: [PATCH] This PR makes a few enhancements to the --describe option of ConsumerGroupCommand: 1. Listing members with no assigned partitions. 2. Reporting the member id along with the owner of each partition (owner is supposed to be the logical application id and all members in the same group are supposed to set the same owner). 3. Printing a message indicating whether ZooKeeper based or new consumer API based information is being reported. It also adds unit tests to verify the added functionality. Note: The third request on the corresponding JIRA (listing active offsets for empty groups of new consumers) is not implemented as part of this PR, and has been moved to its own JIRA [KAFKA-3853](https://issues.apache.org/jira/browse/KAFKA-3853). --- .../main/scala/kafka/admin/AdminClient.scala | 56 ++-- .../kafka/admin/ConsumerGroupCommand.scala | 297 +++++++++++------- .../kafka/coordinator/GroupMetadata.scala | 6 +- .../scala/kafka/tools/StreamsResetter.java | 2 +- core/src/main/scala/kafka/utils/ZkUtils.scala | 17 +- .../kafka/api/AdminClientTest.scala | 30 +- .../admin/DescribeConsumerGroupTest.scala | 128 ++++++++ .../kafka/admin/ListConsumerGroupTest.scala | 87 +++++ .../integration/ResetIntegrationTest.java | 2 +- 9 files changed, 463 insertions(+), 162 deletions(-) create mode 100644 core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala create mode 100644 core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala index 556a02b9d411..22a8abb30439 100644 --- a/core/src/main/scala/kafka/admin/AdminClient.scala +++ b/core/src/main/scala/kafka/admin/AdminClient.scala @@ -17,7 +17,7 @@ import java.util.Properties import java.util.concurrent.atomic.AtomicInteger import kafka.common.KafkaException -import kafka.coordinator.{GroupOverview, GroupSummary, MemberSummary} +import kafka.coordinator.{GroupOverview, MemberSummary} import kafka.utils.Logging import org.apache.kafka.clients._ import org.apache.kafka.clients.consumer.internals.{ConsumerNetworkClient, ConsumerProtocol, RequestFuture} @@ -121,44 +121,38 @@ class AdminClient(val time: Time, listAllGroupsFlattened.filter(_.protocolType == ConsumerProtocol.PROTOCOL_TYPE) } - def describeGroup(groupId: String): GroupSummary = { + /** + * Case class used to represent a consumer of a consumer group + */ + case class ConsumerSummary(consumerId: String, + clientId: String, + host: String, + assignment: List[TopicPartition]) + + /** + * Case class used to represent group metadata (including the group coordinator) for the DescribeGroup API + */ + case class ConsumerGroupSummary(state: String, + assignmentStrategy: String, + consumers: Option[List[ConsumerSummary]], + coordinator: Node) + + def describeConsumerGroup(groupId: String): ConsumerGroupSummary = { val coordinator = findCoordinator(groupId) val responseBody = send(coordinator, ApiKeys.DESCRIBE_GROUPS, new DescribeGroupsRequest(List(groupId).asJava)) val response = new DescribeGroupsResponse(responseBody) - val metadata = response.groups().get(groupId) + val metadata = response.groups.get(groupId) if (metadata == null) throw new KafkaException(s"Response from broker contained no metadata for group $groupId") + if (metadata.state != "Dead" && metadata.state != "Empty" && metadata.protocolType != ConsumerProtocol.PROTOCOL_TYPE) + throw new IllegalArgumentException(s"Consumer Group $groupId with protocol type '${metadata.protocolType}' is not a valid consumer group") Errors.forCode(metadata.errorCode()).maybeThrow() - val members = metadata.members().map { member => - val metadata = Utils.readBytes(member.memberMetadata()) - val assignment = Utils.readBytes(member.memberAssignment()) - MemberSummary(member.memberId(), member.clientId(), member.clientHost(), metadata, assignment) + val consumers = metadata.members.map { consumer => + val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(Utils.readBytes(consumer.memberAssignment))) + ConsumerSummary(consumer.memberId, consumer.clientId, consumer.clientHost, assignment.partitions.toList) }.toList - GroupSummary(metadata.state(), metadata.protocolType(), metadata.protocol(), members) - } - - case class ConsumerSummary(memberId: String, - clientId: String, - clientHost: String, - assignment: List[TopicPartition]) - - def describeConsumerGroup(groupId: String): Option[List[ConsumerSummary]] = { - val group = describeGroup(groupId) - if (group.state == "Dead") - return None - - if (group.protocolType != ConsumerProtocol.PROTOCOL_TYPE) - throw new IllegalArgumentException(s"Group $groupId with protocol type '${group.protocolType}' is not a valid consumer group") - - if (group.state == "Stable") { - Some(group.members.map { member => - val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment)) - new ConsumerSummary(member.memberId, member.clientId, member.clientHost, assignment.partitions().asScala.toList) - }) - } else { - Some(List.empty) - } + ConsumerGroupSummary(metadata.state, metadata.protocol, Some(consumers), coordinator) } def close() { diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index 5de2d26c505b..354e6a2bebdc 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -28,8 +28,8 @@ import kafka.utils._ import org.I0Itec.zkclient.exception.ZkNoNodeException import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} +import org.apache.kafka.common.Node import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.errors.BrokerNotAvailableException import org.apache.kafka.common.protocol.{Errors, SecurityProtocol} import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.serialization.StringDeserializer @@ -38,7 +38,7 @@ import org.apache.kafka.common.utils.Utils import scala.collection.JavaConverters._ import scala.collection.{Set, mutable} -object ConsumerGroupCommand { +object ConsumerGroupCommand extends Logging { def main(args: Array[String]) { val opts = new ConsumerGroupCommandOptions(args) @@ -55,77 +55,140 @@ object ConsumerGroupCommand { val consumerGroupService = { if (opts.useOldConsumer) { + System.err.println("Note: This will only show information about consumers that use ZooKeeper (not those using the Java consumer API).\n") new ZkConsumerGroupService(opts) } else { + System.err.println("Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).\n") new KafkaConsumerGroupService(opts) } } try { if (opts.options.has(opts.listOpt)) - consumerGroupService.list() - else if (opts.options.has(opts.describeOpt)) - consumerGroupService.describe() + consumerGroupService.listGroups().foreach(println(_)) + else if (opts.options.has(opts.describeOpt)) { + val (state, assignments) = consumerGroupService.describeGroup() + val groupId = opts.options.valuesOf(opts.groupOpt).asScala.head + assignments match { + case None => + printError(s"The consumer group '$groupId' does not exist.") + case Some(assignments) => + if (assignments.isEmpty) + state match { + case Some("Dead") => + printError(s"Consumer group '$groupId' does not exist.") + case Some("Empty") => + printError(s"Consumer group '$groupId' has no active members.") + case Some(_) => + printError(s"Consumer group '$groupId' is rebalancing.") + case None => + // the control should never reach here + throw new KafkaException("Expected a valid consumer group state, but none found.") + } + else + printAssignment(assignments, !opts.useOldConsumer) + } + } else if (opts.options.has(opts.deleteOpt)) { consumerGroupService match { - case service: ZkConsumerGroupService => service.delete() - case _ => throw new IllegalStateException(s"delete is not supported for $consumerGroupService") + case service: ZkConsumerGroupService => service.deleteGroups() + case _ => throw new IllegalStateException(s"delete is not supported for $consumerGroupService.") } } } catch { case e: Throwable => - println("Error while executing consumer group command " + e.getMessage) - println(Utils.stackTrace(e)) + printError(s"Executing consumer group command failed due to ${e.getMessage}", Some(e)) } finally { consumerGroupService.close() } } + def printError(msg: String, e: Option[Throwable] = None): Unit = { + println(s"Error: $msg") + e.foreach(debug("Exception in consumer group command", _)) + } + + def printAssignment(groupAssignment: Seq[PartitionAssignmentState], useNewConsumer: Boolean): Unit = { + print("\n%-30s %-10s %-15s %-15s %-10s %-50s".format("TOPIC", "PARTITION", "CURRENT-OFFSET", "LOG-END-OFFSET", "LAG", "CONSUMER-ID")) + if (useNewConsumer) + print("%-30s %s".format("HOST", "CLIENT-ID")) + println() + + groupAssignment.foreach { consumerAssignment => + print("%-30s %-10s %-15s %-15s %-10s %-50s".format( + consumerAssignment.topic.getOrElse("-"), consumerAssignment.partition.getOrElse("-"), + consumerAssignment.offset.getOrElse("-"), consumerAssignment.logEndOffset.getOrElse("-"), + consumerAssignment.lag.getOrElse("-"), consumerAssignment.consumerId.getOrElse("-"))) + if (useNewConsumer) + print("%-30s %s".format(consumerAssignment.host.getOrElse("-"), consumerAssignment.clientId.getOrElse("-"))) + println() + } + } + + protected case class PartitionAssignmentState(group: String, coordinator: Option[Node], topic: Option[String], + partition: Option[Int], offset: Option[Long], lag: Option[Long], + consumerId: Option[String], host: Option[String], + clientId: Option[String], logEndOffset: Option[Long]) + sealed trait ConsumerGroupService { - def list(): Unit + def listGroups(): List[String] - def describe() { - describeGroup(opts.options.valueOf(opts.groupOpt)) + def describeGroup(): (Option[String], Option[Seq[PartitionAssignmentState]]) = { + collectGroupAssignment(opts.options.valueOf(opts.groupOpt)) } def close(): Unit protected def opts: ConsumerGroupCommandOptions - protected def getLogEndOffset(topic: String, partition: Int): LogEndOffsetResult - - protected def describeGroup(group: String): Unit - - protected def describeTopicPartition(group: String, - topicPartitions: Seq[TopicAndPartition], - getPartitionOffset: TopicAndPartition => Option[Long], - getOwner: TopicAndPartition => Option[String]): Unit = { - topicPartitions - .sortBy { case topicPartition => topicPartition.partition } - .foreach { topicPartition => - describePartition(group, topicPartition.topic, topicPartition.partition, getPartitionOffset(topicPartition), - getOwner(topicPartition)) - } + protected def getLogEndOffset(topicPartition: TopicPartition): LogEndOffsetResult + + protected def collectGroupAssignment(group: String): (Option[String], Option[Seq[PartitionAssignmentState]]) + + protected def collectConsumerAssignment(group: String, + coordinator: Option[Node], + topicPartitions: Seq[TopicAndPartition], + getPartitionOffset: TopicAndPartition => Option[Long], + consumerIdOpt: Option[String], + hostOpt: Option[String], + clientIdOpt: Option[String]): Array[PartitionAssignmentState] = { + if (topicPartitions.isEmpty) + Array[PartitionAssignmentState]( + PartitionAssignmentState(group, coordinator, None, None, None, getLag(None, None), consumerIdOpt, hostOpt, clientIdOpt, None) + ) + else { + var assignmentRows: Array[PartitionAssignmentState] = Array() + topicPartitions + .sortBy(_.partition) + .foreach { topicPartition => + assignmentRows = assignmentRows :+ describePartition(group, coordinator, topicPartition.topic, topicPartition.partition, getPartitionOffset(topicPartition), + consumerIdOpt, hostOpt, clientIdOpt) + } + assignmentRows + } } - protected def printDescribeHeader() { - println("%-30s %-30s %-10s %-15s %-15s %-15s %s".format("GROUP", "TOPIC", "PARTITION", "CURRENT-OFFSET", "LOG-END-OFFSET", "LAG", "OWNER")) - } + protected def getLag(offset: Option[Long], logEndOffset: Option[Long]): Option[Long] = + offset.filter(_ != -1).flatMap(offset => logEndOffset.map(_ - offset)) private def describePartition(group: String, + coordinator: Option[Node], topic: String, partition: Int, offsetOpt: Option[Long], - ownerOpt: Option[String]) { - def print(logEndOffset: Option[Long]): Unit = { - val lag = offsetOpt.filter(_ != -1).flatMap(offset => logEndOffset.map(_ - offset)) - println("%-30s %-30s %-10s %-15s %-15s %-15s %s".format(group, topic, partition, offsetOpt.getOrElse("unknown"), logEndOffset.getOrElse("unknown"), lag.getOrElse("unknown"), ownerOpt.getOrElse("none"))) - } - getLogEndOffset(topic, partition) match { - case LogEndOffsetResult.LogEndOffset(logEndOffset) => print(Some(logEndOffset)) - case LogEndOffsetResult.Unknown => print(None) - case LogEndOffsetResult.Ignore => + consumerIdOpt: Option[String], + hostOpt: Option[String], + clientIdOpt: Option[String]): PartitionAssignmentState = { + def getDescribePartitionResult(logEndOffsetOpt: Option[Long]): PartitionAssignmentState = + PartitionAssignmentState(group, coordinator, Option(topic), Option(partition), offsetOpt, + getLag(offsetOpt, logEndOffsetOpt), consumerIdOpt, hostOpt, + clientIdOpt, logEndOffsetOpt) + + getLogEndOffset(new TopicPartition(topic, partition)) match { + case LogEndOffsetResult.LogEndOffset(logEndOffset) => getDescribePartitionResult(Some(logEndOffset)) + case LogEndOffsetResult.Unknown => getDescribePartitionResult(None) + case LogEndOffsetResult.Ignore => null } } @@ -142,11 +205,11 @@ object ConsumerGroupCommand { zkUtils.close() } - def list() { - zkUtils.getConsumerGroups().foreach(println) + def listGroups(): List[String] = { + zkUtils.getConsumerGroups().toList } - def delete() { + def deleteGroups() { if (opts.options.has(opts.groupOpt) && opts.options.has(opts.topicOpt)) deleteForTopic() else if (opts.options.has(opts.groupOpt)) @@ -155,51 +218,72 @@ object ConsumerGroupCommand { deleteAllForTopic() } - protected def describeGroup(group: String) { + protected def collectGroupAssignment(group: String): (Option[String], Option[Seq[PartitionAssignmentState]]) = { val props = if (opts.options.has(opts.commandConfigOpt)) Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)) else new Properties() val channelSocketTimeoutMs = props.getProperty("channelSocketTimeoutMs", "600").toInt val channelRetryBackoffMs = props.getProperty("channelRetryBackoffMsOpt", "300").toInt + if (!zkUtils.getConsumerGroups().contains(group)) + return (None, None) + val topics = zkUtils.getTopicsByConsumerGroup(group) - if (topics.isEmpty) - println("No topic available for consumer group provided") - printDescribeHeader() - topics.foreach(topic => describeTopic(group, topic, channelSocketTimeoutMs, channelRetryBackoffMs)) - } + val topicPartitions = getAllTopicPartitions(topics) + var groupConsumerIds = zkUtils.getConsumersInGroup(group) + + // mapping of topic partition -> consumer id + val consumerIdByTopicPartition = topicPartitions.map { topicPartition => + val owner = zkUtils.readDataMaybeNull(new ZKGroupTopicDirs(group, topicPartition.topic).consumerOwnerDir + "/" + topicPartition.partition)._1 + var consumerId = "" + owner.foreach(o => consumerId = o.substring(0, o.lastIndexOf('-'))) + topicPartition -> consumerId + }.toMap + + // mapping of consumer id -> list of topic partitions + val consumerTopicPartitions = consumerIdByTopicPartition groupBy{_._2} map { + case (key, value) => (key, value.unzip._1.toArray) } + + // mapping of consumer id -> list of subscribed topics + val topicsByConsumerId = zkUtils.getTopicsPerMemberId(group) + + var assignmentRows = topicPartitions.flatMap { topicPartition => + val partitionOffsets = getPartitionOffsets(group, List(topicPartition), channelSocketTimeoutMs, channelRetryBackoffMs) + val consumerId = consumerIdByTopicPartition.get(topicPartition) + // since consumer id is repeated in client id, leave host and client id empty + consumerId.foreach(id => groupConsumerIds = groupConsumerIds.filterNot(_ == id)) + collectConsumerAssignment(group, None, List(topicPartition), partitionOffsets.get, consumerId, None, None) + } - private def describeTopic(group: String, - topic: String, - channelSocketTimeoutMs: Int, - channelRetryBackoffMs: Int) { - val topicPartitions = getTopicPartitions(topic) - val groupDirs = new ZKGroupTopicDirs(group, topic) - val ownerByTopicPartition = topicPartitions.flatMap { topicPartition => - zkUtils.readDataMaybeNull(groupDirs.consumerOwnerDir + "/" + topicPartition.partition)._1.map { owner => - topicPartition -> owner + assignmentRows ++= groupConsumerIds.sortBy(- consumerTopicPartitions.get(_).size).flatMap { consumerId => + topicsByConsumerId(consumerId).flatMap { topic => + // since consumers with no topic partitions are processed here, we pass empty for topic partitions and offsets + // since consumer id is repeated in client id, leave host and client id empty + collectConsumerAssignment(group, None, Array[TopicAndPartition](), Map[TopicAndPartition, Option[Long]](), Some(consumerId), None, None) } - }.toMap - val partitionOffsets = getPartitionOffsets(group, topicPartitions, channelSocketTimeoutMs, channelRetryBackoffMs) - describeTopicPartition(group, topicPartitions, partitionOffsets.get, ownerByTopicPartition.get) + } + + (None, Some(assignmentRows)) } - private def getTopicPartitions(topic: String): Seq[TopicAndPartition] = { - val topicPartitionMap = zkUtils.getPartitionsForTopics(Seq(topic)) - val partitions = topicPartitionMap.getOrElse(topic, Seq.empty) - partitions.map(TopicAndPartition(topic, _)) + private def getAllTopicPartitions(topics: Seq[String]): Seq[TopicAndPartition] = { + val topicPartitionMap = zkUtils.getPartitionsForTopics(topics) + topics.flatMap { topic => + val partitions = topicPartitionMap.getOrElse(topic, Seq.empty) + partitions.map(TopicAndPartition(topic, _)) + } } - protected def getLogEndOffset(topic: String, partition: Int): LogEndOffsetResult = { - zkUtils.getLeaderForPartition(topic, partition) match { + protected def getLogEndOffset(topicPartition: TopicPartition): LogEndOffsetResult = { + zkUtils.getLeaderForPartition(topicPartition.topic, topicPartition.partition) match { case Some(-1) => LogEndOffsetResult.Unknown case Some(brokerId) => getZkConsumer(brokerId).map { consumer => - val topicAndPartition = new TopicAndPartition(topic, partition) + val topicAndPartition = TopicAndPartition(topicPartition.topic, topicPartition.partition) val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1))) val logEndOffset = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head consumer.close() LogEndOffsetResult.LogEndOffset(logEndOffset) }.getOrElse(LogEndOffsetResult.Ignore) case None => - println(s"No broker for partition ${new TopicPartition(topic, partition)}") + printError(s"No broker for partition '$topicPartition'") LogEndOffsetResult.Ignore } } @@ -223,15 +307,13 @@ object ConsumerGroupCommand { offsetMap.put(topicAndPartition, offset) } catch { case z: ZkNoNodeException => - println("Could not fetch offset from zookeeper for group %s partition %s due to missing offset data in zookeeper." - .format(group, topicAndPartition)) + printError(s"Could not fetch offset from zookeeper for group '$group' partition '$topicAndPartition' due to missing offset data in zookeeper.", Some(z)) } } else if (offsetAndMetadata.error == Errors.NONE.code) offsetMap.put(topicAndPartition, offsetAndMetadata.offset) else - println("Could not fetch offset from kafka for group %s partition %s due to %s." - .format(group, topicAndPartition, Errors.forCode(offsetAndMetadata.error).exception)) + printError(s"Could not fetch offset from kafka for group '$group' partition '$topicAndPartition' due to ${Errors.forCode(offsetAndMetadata.error).exception}.") } channel.disconnect() offsetMap.toMap @@ -242,13 +324,13 @@ object ConsumerGroupCommand { groups.asScala.foreach { group => try { if (AdminUtils.deleteConsumerGroupInZK(zkUtils, group)) - println("Deleted all consumer group information for group %s in zookeeper.".format(group)) + println(s"Deleted all consumer group information for group '$group' in zookeeper.") else - println("Delete for group %s failed because its consumers are still active.".format(group)) + printError(s"Delete for group '$group' failed because its consumers are still active.") } catch { case e: ZkNoNodeException => - println("Delete for group %s failed because group does not exist.".format(group)) + printError(s"Delete for group '$group' failed because group does not exist.", Some(e)) } } } @@ -260,13 +342,13 @@ object ConsumerGroupCommand { groups.asScala.foreach { group => try { if (AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkUtils, group, topic)) - println("Deleted consumer group information for group %s topic %s in zookeeper.".format(group, topic)) + println(s"Deleted consumer group information for group '$group' topic '$topic' in zookeeper.") else - println("Delete for group %s topic %s failed because its consumers are still active.".format(group, topic)) + printError(s"Delete for group '$group' topic '$topic' failed because its consumers are still active.") } catch { case e: ZkNoNodeException => - println("Delete for group %s topic %s failed because group does not exist.".format(group, topic)) + printError(s"Delete for group '$group' topic '$topic' failed because group does not exist.", Some(e)) } } } @@ -275,7 +357,7 @@ object ConsumerGroupCommand { val topic = opts.options.valueOf(opts.topicOpt) Topic.validate(topic) AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkUtils, topic) - println("Deleted consumer group information for all inactive consumer groups for topic %s in zookeeper.".format(topic)) + println(s"Deleted consumer group information for all inactive consumer groups for topic '$topic' in zookeeper.") } private def getZkConsumer(brokerId: Int): Option[SimpleConsumer] = { @@ -286,7 +368,7 @@ object ConsumerGroupCommand { .orElse(throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId))) } catch { case t: Throwable => - println("Could not parse broker info due to " + t.getMessage) + printError(s"Could not parse broker info due to ${t.getMessage}", Some(t)) None } } @@ -300,36 +382,39 @@ object ConsumerGroupCommand { // `consumer` is only needed for `describe`, so we instantiate it lazily private var consumer: KafkaConsumer[String, String] = null - def list() { - adminClient.listAllConsumerGroupsFlattened().foreach(x => println(x.groupId)) + def listGroups(): List[String] = { + adminClient.listAllConsumerGroupsFlattened().map(_.groupId) } - protected def describeGroup(group: String) { - adminClient.describeConsumerGroup(group) match { - case None => println(s"Consumer group `${group}` does not exist.") - case Some(consumerSummaries) => - if (consumerSummaries.isEmpty) - println(s"Consumer group `${group}` is rebalancing.") - else { - val consumer = getConsumer() - printDescribeHeader() - consumerSummaries.foreach { consumerSummary => - val topicPartitions = consumerSummary.assignment.map(tp => TopicAndPartition(tp.topic, tp.partition)) - val partitionOffsets = topicPartitions.flatMap { topicPartition => - Option(consumer.committed(new TopicPartition(topicPartition.topic, topicPartition.partition))).map { offsetAndMetadata => - topicPartition -> offsetAndMetadata.offset - } - }.toMap - describeTopicPartition(group, topicPartitions, partitionOffsets.get, - _ => Some(s"${consumerSummary.clientId}_${consumerSummary.clientHost}")) + protected def collectGroupAssignment(group: String): (Option[String], Option[Seq[PartitionAssignmentState]]) = { + val consumerGroupSummary = adminClient.describeConsumerGroup(group) + (Some(consumerGroupSummary.state), + consumerGroupSummary.consumers match { + case None => + None + case Some(consumers) => + if (consumers.isEmpty) + Some(Array[PartitionAssignmentState]()) + else { + val consumer = getConsumer() + Some(consumers.sortWith(_.assignment.size > _.assignment.size).flatMap { consumerSummary => + val topicPartitions = consumerSummary.assignment.map(tp => TopicAndPartition(tp.topic, tp.partition)) + val partitionOffsets = topicPartitions.flatMap { topicPartition => + Option(consumer.committed(new TopicPartition(topicPartition.topic, topicPartition.partition))).map { offsetAndMetadata => + topicPartition -> offsetAndMetadata.offset + } + }.toMap + collectConsumerAssignment(group, Some(consumerGroupSummary.coordinator), topicPartitions, + partitionOffsets.get, Some(s"${consumerSummary.consumerId}"), Some(s"${consumerSummary.host}"), + Some(s"${consumerSummary.clientId}")) + }) } - } - } + } + ) } - protected def getLogEndOffset(topic: String, partition: Int): LogEndOffsetResult = { + protected def getLogEndOffset(topicPartition: TopicPartition): LogEndOffsetResult = { val consumer = getConsumer() - val topicPartition = new TopicPartition(topic, partition) consumer.assign(List(topicPartition).asJava) consumer.seekToEnd(List(topicPartition).asJava) val logEndOffset = consumer.position(topicPartition) @@ -431,14 +516,14 @@ object ConsumerGroupCommand { // check required args if (useOldConsumer) { if (options.has(bootstrapServerOpt)) - CommandLineUtils.printUsageAndDie(parser, s"Option $bootstrapServerOpt is not valid with $zkConnectOpt.") + CommandLineUtils.printUsageAndDie(parser, s"Option '$bootstrapServerOpt' is not valid with '$zkConnectOpt'.") else if (options.has(newConsumerOpt)) - CommandLineUtils.printUsageAndDie(parser, s"Option $newConsumerOpt is not valid with $zkConnectOpt.") + CommandLineUtils.printUsageAndDie(parser, s"Option '$newConsumerOpt' is not valid with '$zkConnectOpt'.") } else { CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt) if (options.has(deleteOpt)) - CommandLineUtils.printUsageAndDie(parser, s"Option $deleteOpt is only valid with $zkConnectOpt. Note that " + + CommandLineUtils.printUsageAndDie(parser, s"Option '$deleteOpt' is only valid with '$zkConnectOpt'. Note that " + "there's no need to delete group metadata for the new consumer as the group is deleted when the last " + "committed offset for that group expires.") } diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/GroupMetadata.scala index c86c7f8d7b96..6b889f474db8 100644 --- a/core/src/main/scala/kafka/coordinator/GroupMetadata.scala +++ b/core/src/main/scala/kafka/coordinator/GroupMetadata.scala @@ -17,14 +17,12 @@ package kafka.coordinator -import kafka.utils.nonthreadsafe +import collection.mutable import java.util.UUID - import kafka.common.OffsetAndMetadata +import kafka.utils.nonthreadsafe import org.apache.kafka.common.TopicPartition -import collection.mutable - private[coordinator] sealed trait GroupState { def state: Byte } /** diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java index 7153790ab80b..8392f660b93d 100644 --- a/core/src/main/scala/kafka/tools/StreamsResetter.java +++ b/core/src/main/scala/kafka/tools/StreamsResetter.java @@ -90,7 +90,7 @@ public int run(final String[] args, final Properties config) { adminClient = AdminClient.createSimplePlaintext(this.options.valueOf(bootstrapServerOption)); final String groupId = this.options.valueOf(applicationIdOption); - if (!adminClient.describeGroup(groupId).members().isEmpty()) { + if (!adminClient.describeConsumerGroup(groupId).consumers().isEmpty()) { throw new IllegalStateException("Consumer group '" + groupId + "' is still active. " + "Make sure to stop all running application instances before running the reset tool."); } diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index e5b1b6a89651..80a9f1a67c71 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -653,7 +653,7 @@ class ZkUtils(val zkClient: ZkClient, zkClient.exists(path) } - def getCluster() : Cluster = { + def getCluster(): Cluster = { val cluster = new Cluster val nodes = getChildrenParentMayNotExist(BrokerIdsPath) for (node <- nodes) { @@ -783,7 +783,7 @@ class ZkUtils(val zkClient: ZkClient, getChildren(dirs.consumerRegistryDir) } - def getConsumersPerTopic(group: String, excludeInternalTopics: Boolean) : mutable.Map[String, List[ConsumerThreadId]] = { + def getConsumersPerTopic(group: String, excludeInternalTopics: Boolean): mutable.Map[String, List[ConsumerThreadId]] = { val dirs = new ZKGroupDirs(group) val consumers = getChildrenParentMayNotExist(dirs.consumerRegistryDir) val consumersPerTopicMap = new mutable.HashMap[String, List[ConsumerThreadId]] @@ -802,6 +802,15 @@ class ZkUtils(val zkClient: ZkClient, consumersPerTopicMap } + def getTopicsPerMemberId(group: String, excludeInternalTopics: Boolean = true): Map[String, List[String]] = { + val dirs = new ZKGroupDirs(group) + val memberIds = getChildrenParentMayNotExist(dirs.consumerRegistryDir) + memberIds.map { memberId => + val topicCount = TopicCount.constructTopicCount(group, memberId, this, excludeInternalTopics) + memberId -> topicCount.getTopicCountMap.keys.toList + }.toMap + } + /** * This API takes in a broker id, queries zookeeper for the broker metadata and returns the metadata for that broker * or throws an exception if the broker dies before the query to zookeeper finishes @@ -891,10 +900,10 @@ class ZkUtils(val zkClient: ZkClient, private object ZKStringSerializer extends ZkSerializer { @throws(classOf[ZkMarshallingError]) - def serialize(data : Object) : Array[Byte] = data.asInstanceOf[String].getBytes("UTF-8") + def serialize(data : Object): Array[Byte] = data.asInstanceOf[String].getBytes("UTF-8") @throws(classOf[ZkMarshallingError]) - def deserialize(bytes : Array[Byte]) : Object = { + def deserialize(bytes : Array[Byte]): Object = { if (bytes == null) null else diff --git a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala index 891a72cb402a..ce91a30ae832 100644 --- a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala +++ b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala @@ -66,7 +66,7 @@ class AdminClientTest extends IntegrationTestHarness with Logging { consumers.head.subscribe(List(topic)) TestUtils.waitUntilTrue(() => { consumers.head.poll(0) - !consumers.head.assignment().isEmpty + !consumers.head.assignment.isEmpty }, "Expected non-empty assignment") val groups = client.listAllGroupsFlattened @@ -77,23 +77,22 @@ class AdminClientTest extends IntegrationTestHarness with Logging { } @Test - def testDescribeGroup() { + def testGetConsumerGroupSummary() { consumers.head.subscribe(List(topic)) TestUtils.waitUntilTrue(() => { consumers.head.poll(0) - !consumers.head.assignment().isEmpty + !consumers.head.assignment.isEmpty }, "Expected non-empty assignment") - val group = client.describeGroup(groupId) - assertEquals("consumer", group.protocolType) - assertEquals("range", group.protocol) + val group = client.describeConsumerGroup(groupId) + assertEquals("range", group.assignmentStrategy) assertEquals("Stable", group.state) - assertFalse(group.members.isEmpty) + assertFalse(group.consumers.isEmpty) - val member = group.members.head + val member = group.consumers.get.head assertEquals(clientId, member.clientId) - assertFalse(member.clientHost.isEmpty) - assertFalse(member.memberId.isEmpty) + assertFalse(member.host.isEmpty) + assertFalse(member.consumerId.isEmpty) } @Test @@ -101,17 +100,18 @@ class AdminClientTest extends IntegrationTestHarness with Logging { consumers.head.subscribe(List(topic)) TestUtils.waitUntilTrue(() => { consumers.head.poll(0) - !consumers.head.assignment().isEmpty + !consumers.head.assignment.isEmpty }, "Expected non-empty assignment") - val consumerSummaries = client.describeConsumerGroup(groupId) - assertEquals(1, consumerSummaries.size) - assertEquals(Some(Set(tp, tp2)), consumerSummaries.map(_.head.assignment.toSet)) + val consumerGroupSummary = client.describeConsumerGroup(groupId) + assertEquals(1, consumerGroupSummary.consumers.get.size) + assertEquals(List(tp, tp2), consumerGroupSummary.consumers.get.flatMap(_.assignment)) } @Test def testDescribeConsumerGroupForNonExistentGroup() { val nonExistentGroup = "non" + groupId - assertTrue("Expected empty ConsumerSummary list", client.describeConsumerGroup(nonExistentGroup).isEmpty) + val sum = client.describeConsumerGroup(nonExistentGroup).consumers + assertTrue("Expected empty ConsumerSummary list", client.describeConsumerGroup(nonExistentGroup).consumers.get.isEmpty) } } diff --git a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala new file mode 100644 index 000000000000..3691919941ff --- /dev/null +++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.admin + +import java.util.Properties + +import org.easymock.EasyMock +import org.junit.Before +import org.junit.Test + +import kafka.admin.ConsumerGroupCommand.ConsumerGroupCommandOptions +import kafka.admin.ConsumerGroupCommand.ZkConsumerGroupService +import kafka.consumer.OldConsumer +import kafka.consumer.Whitelist +import kafka.integration.KafkaServerTestHarness +import kafka.server.KafkaConfig +import kafka.utils.TestUtils + + +class DescribeConsumerGroupTest extends KafkaServerTestHarness { + + val overridingProps = new Properties() + val topic = "foo" + val topicFilter = new Whitelist(topic) + val group = "test.group" + val props = new Properties + + // configure the servers and clients + override def generateConfigs() = TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps)) + + @Before + override def setUp() { + super.setUp() + + AdminUtils.createTopic(zkUtils, topic, 1, 1) + props.setProperty("group.id", group) + props.setProperty("zookeeper.connect", zkConnect) + } + + @Test + def testDescribeNonExistingGroup() { + // mocks + val consumerMock = EasyMock.createMockBuilder(classOf[OldConsumer]).withConstructor(topicFilter, props).createMock() + + // stubs + val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect, "--describe", "--group", "missing.group")) + val consumerGroupCommand = new ZkConsumerGroupService(opts) + + // simulation + EasyMock.replay(consumerMock) + + // action/test + TestUtils.waitUntilTrue(() => { + !consumerGroupCommand.describeGroup()._2.isDefined + }, "Expected no rows in describe group results.") + + // cleanup + consumerGroupCommand.close() + consumerMock.stop() + } + + @Test + def testDescribeExistingGroup() { + // mocks + val consumerMock = EasyMock.createMockBuilder(classOf[OldConsumer]).withConstructor(topicFilter, props).createMock() + + // stubs + val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect, "--describe", "--group", group)) + val consumerGroupCommand = new ZkConsumerGroupService(opts) + + // simulation + EasyMock.replay(consumerMock) + + // action/test + TestUtils.waitUntilTrue(() => { + val (state, assignments) = consumerGroupCommand.describeGroup() + assignments.isDefined && + assignments.get.filter(_.group == group).size == 1 && + assignments.get.filter(_.group == group).head.consumerId.isDefined + }, "Expected rows and a member id column in describe group results.") + + // cleanup + consumerGroupCommand.close() + consumerMock.stop() + } + + @Test + def testDescribeConsumersWithNoAssignedPartitions() { + // mocks + val consumer1Mock = EasyMock.createMockBuilder(classOf[OldConsumer]).withConstructor(topicFilter, props).createMock() + val consumer2Mock = EasyMock.createMockBuilder(classOf[OldConsumer]).withConstructor(topicFilter, props).createMock() + + // stubs + val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect, "--describe", "--group", group)) + val consumerGroupCommand = new ZkConsumerGroupService(opts) + + EasyMock.replay(consumer1Mock) + EasyMock.replay(consumer2Mock) + + // action/test + TestUtils.waitUntilTrue(() => { + val (state, assignments) = consumerGroupCommand.describeGroup() + assignments.isDefined && + assignments.get.filter(_.group == group).size == 2 && + assignments.get.filter{ x => x.group == group && x.partition.isDefined}.size == 1 && + assignments.get.filter{ x => x.group == group && !x.partition.isDefined}.size == 1 + }, "Expected rows for consumers with no assigned partitions in describe group results.") + + // cleanup + consumerGroupCommand.close() + consumer1Mock.stop() + consumer2Mock.stop() + } +} diff --git a/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala new file mode 100644 index 000000000000..f4494c7911fe --- /dev/null +++ b/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.admin + +import java.util.Properties + +import org.easymock.EasyMock +import org.junit.Before +import org.junit.Test + +import kafka.admin.ConsumerGroupCommand.ConsumerGroupCommandOptions +import kafka.admin.ConsumerGroupCommand.ZkConsumerGroupService +import kafka.consumer.OldConsumer +import kafka.consumer.Whitelist +import kafka.integration.KafkaServerTestHarness +import kafka.server.KafkaConfig +import kafka.utils.TestUtils + + +class ListConsumerGroupTest extends KafkaServerTestHarness { + + val overridingProps = new Properties() + val topic = "foo" + val topicFilter = new Whitelist(topic) + val group = "test.group" + val props = new Properties + + // configure the servers and clients + override def generateConfigs() = TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps)) + + @Before + override def setUp() { + super.setUp() + + AdminUtils.createTopic(zkUtils, topic, 1, 1) + props.setProperty("group.id", group) + props.setProperty("zookeeper.connect", zkConnect) + } + + @Test + def testListGroupWithNoExistingGroup() { + val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect)) + val consumerGroupCommand = new ZkConsumerGroupService(opts) + assert(consumerGroupCommand.listGroups().isEmpty) + } + + @Test + def testListGroupWithSomeGroups() { + // mocks + val consumer1Mock = EasyMock.createMockBuilder(classOf[OldConsumer]).withConstructor(topicFilter, props).createMock() + props.setProperty("group.id", "some.other.group") + val consumer2Mock = EasyMock.createMockBuilder(classOf[OldConsumer]).withConstructor(topicFilter, props).createMock() + + // stubs + val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect)) + val consumerGroupCommand = new ZkConsumerGroupService(opts) + + // simulation + EasyMock.replay(consumer1Mock) + EasyMock.replay(consumer2Mock) + + // action/test + TestUtils.waitUntilTrue(() => { + val groups = consumerGroupCommand.listGroups() + groups.size == 2 && groups.contains(group) + }, "Expected a different list group results.") + + // cleanup + consumerGroupCommand.close() + consumer1Mock.stop() + consumer2Mock.stop() + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java index 5847fb1a9c91..ced11094a257 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java @@ -300,7 +300,7 @@ private void assertInternalTopicsGotDeleted() { private class WaitUntilConsumerGroupGotClosed implements TestCondition { @Override public boolean conditionMet() { - return adminClient.describeGroup(APP_ID).members().isEmpty(); + return adminClient.describeConsumerGroup(APP_ID).consumers().isEmpty(); } }