Skip to content

Commit

Permalink
KAFKA-3144; Report members with no assigned partitions in ConsumerGro…
Browse files Browse the repository at this point in the history
…upCommand

This PR makes a couple of enhancements to the `--describe` option of `ConsumerGroupCommand`:
1. Listing members with no assigned partitions.
2. Showing the member id along with the owner of each partition (owner is supposed to be the logical application id and all members in the same group are supposed to set the same owner).
3. Printing a warning indicating whether ZooKeeper based or new consumer API based information is being reported.

It also adds unit tests to verify the added functionality.

Note: The third request on the corresponding JIRA (listing active offsets for empty groups of new consumers) is not implemented as part of this PR, and has been moved to its own JIRA (KAFKA-3853).

Author: Vahid Hashemian <vahidhashemian@us.ibm.com>

Reviewers: Jun Rao <junrao@gmail.com>, Jason Gustafson <jason@confluent.io>

Closes #1336 from vahidhashemian/KAFKA-3144
  • Loading branch information
vahidhashemian authored and hachikuji committed Oct 22, 2016
1 parent 74014af commit 7afdad8
Show file tree
Hide file tree
Showing 9 changed files with 463 additions and 162 deletions.
56 changes: 25 additions & 31 deletions core/src/main/scala/kafka/admin/AdminClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import java.util.Properties
import java.util.concurrent.atomic.AtomicInteger

import kafka.common.KafkaException
import kafka.coordinator.{GroupOverview, GroupSummary, MemberSummary}
import kafka.coordinator.{GroupOverview, MemberSummary}
import kafka.utils.Logging
import org.apache.kafka.clients._
import org.apache.kafka.clients.consumer.internals.{ConsumerNetworkClient, ConsumerProtocol, RequestFuture}
Expand Down Expand Up @@ -121,44 +121,38 @@ class AdminClient(val time: Time,
listAllGroupsFlattened.filter(_.protocolType == ConsumerProtocol.PROTOCOL_TYPE)
}

def describeGroup(groupId: String): GroupSummary = {
/**
* Case class used to represent a consumer of a consumer group
*/
case class ConsumerSummary(consumerId: String,
clientId: String,
host: String,
assignment: List[TopicPartition])

/**
* Case class used to represent group metadata (including the group coordinator) for the DescribeGroup API
*/
case class ConsumerGroupSummary(state: String,
assignmentStrategy: String,
consumers: Option[List[ConsumerSummary]],
coordinator: Node)

def describeConsumerGroup(groupId: String): ConsumerGroupSummary = {
val coordinator = findCoordinator(groupId)
val responseBody = send(coordinator, ApiKeys.DESCRIBE_GROUPS, new DescribeGroupsRequest(List(groupId).asJava))
val response = new DescribeGroupsResponse(responseBody)
val metadata = response.groups().get(groupId)
val metadata = response.groups.get(groupId)
if (metadata == null)
throw new KafkaException(s"Response from broker contained no metadata for group $groupId")
if (metadata.state != "Dead" && metadata.state != "Empty" && metadata.protocolType != ConsumerProtocol.PROTOCOL_TYPE)
throw new IllegalArgumentException(s"Consumer Group $groupId with protocol type '${metadata.protocolType}' is not a valid consumer group")

Errors.forCode(metadata.errorCode()).maybeThrow()
val members = metadata.members().map { member =>
val metadata = Utils.readBytes(member.memberMetadata())
val assignment = Utils.readBytes(member.memberAssignment())
MemberSummary(member.memberId(), member.clientId(), member.clientHost(), metadata, assignment)
val consumers = metadata.members.map { consumer =>
val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(Utils.readBytes(consumer.memberAssignment)))
ConsumerSummary(consumer.memberId, consumer.clientId, consumer.clientHost, assignment.partitions.toList)
}.toList
GroupSummary(metadata.state(), metadata.protocolType(), metadata.protocol(), members)
}

case class ConsumerSummary(memberId: String,
clientId: String,
clientHost: String,
assignment: List[TopicPartition])

def describeConsumerGroup(groupId: String): Option[List[ConsumerSummary]] = {
val group = describeGroup(groupId)
if (group.state == "Dead")
return None

if (group.protocolType != ConsumerProtocol.PROTOCOL_TYPE)
throw new IllegalArgumentException(s"Group $groupId with protocol type '${group.protocolType}' is not a valid consumer group")

if (group.state == "Stable") {
Some(group.members.map { member =>
val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment))
new ConsumerSummary(member.memberId, member.clientId, member.clientHost, assignment.partitions().asScala.toList)
})
} else {
Some(List.empty)
}
ConsumerGroupSummary(metadata.state, metadata.protocol, Some(consumers), coordinator)
}

def close() {
Expand Down
297 changes: 191 additions & 106 deletions core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala

Large diffs are not rendered by default.

6 changes: 2 additions & 4 deletions core/src/main/scala/kafka/coordinator/GroupMetadata.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,12 @@

package kafka.coordinator

import kafka.utils.nonthreadsafe
import collection.mutable
import java.util.UUID

import kafka.common.OffsetAndMetadata
import kafka.utils.nonthreadsafe
import org.apache.kafka.common.TopicPartition

import collection.mutable

private[coordinator] sealed trait GroupState { def state: Byte }

/**
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/tools/StreamsResetter.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public int run(final String[] args, final Properties config) {

adminClient = AdminClient.createSimplePlaintext(this.options.valueOf(bootstrapServerOption));
final String groupId = this.options.valueOf(applicationIdOption);
if (!adminClient.describeGroup(groupId).members().isEmpty()) {
if (!adminClient.describeConsumerGroup(groupId).consumers().isEmpty()) {
throw new IllegalStateException("Consumer group '" + groupId + "' is still active. " +
"Make sure to stop all running application instances before running the reset tool.");
}
Expand Down
17 changes: 13 additions & 4 deletions core/src/main/scala/kafka/utils/ZkUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,7 @@ class ZkUtils(val zkClient: ZkClient,
zkClient.exists(path)
}

def getCluster() : Cluster = {
def getCluster(): Cluster = {
val cluster = new Cluster
val nodes = getChildrenParentMayNotExist(BrokerIdsPath)
for (node <- nodes) {
Expand Down Expand Up @@ -783,7 +783,7 @@ class ZkUtils(val zkClient: ZkClient,
getChildren(dirs.consumerRegistryDir)
}

def getConsumersPerTopic(group: String, excludeInternalTopics: Boolean) : mutable.Map[String, List[ConsumerThreadId]] = {
def getConsumersPerTopic(group: String, excludeInternalTopics: Boolean): mutable.Map[String, List[ConsumerThreadId]] = {
val dirs = new ZKGroupDirs(group)
val consumers = getChildrenParentMayNotExist(dirs.consumerRegistryDir)
val consumersPerTopicMap = new mutable.HashMap[String, List[ConsumerThreadId]]
Expand All @@ -802,6 +802,15 @@ class ZkUtils(val zkClient: ZkClient,
consumersPerTopicMap
}

def getTopicsPerMemberId(group: String, excludeInternalTopics: Boolean = true): Map[String, List[String]] = {
val dirs = new ZKGroupDirs(group)
val memberIds = getChildrenParentMayNotExist(dirs.consumerRegistryDir)
memberIds.map { memberId =>
val topicCount = TopicCount.constructTopicCount(group, memberId, this, excludeInternalTopics)
memberId -> topicCount.getTopicCountMap.keys.toList
}.toMap
}

/**
* This API takes in a broker id, queries zookeeper for the broker metadata and returns the metadata for that broker
* or throws an exception if the broker dies before the query to zookeeper finishes
Expand Down Expand Up @@ -891,10 +900,10 @@ class ZkUtils(val zkClient: ZkClient,
private object ZKStringSerializer extends ZkSerializer {

@throws(classOf[ZkMarshallingError])
def serialize(data : Object) : Array[Byte] = data.asInstanceOf[String].getBytes("UTF-8")
def serialize(data : Object): Array[Byte] = data.asInstanceOf[String].getBytes("UTF-8")

@throws(classOf[ZkMarshallingError])
def deserialize(bytes : Array[Byte]) : Object = {
def deserialize(bytes : Array[Byte]): Object = {
if (bytes == null)
null
else
Expand Down
30 changes: 15 additions & 15 deletions core/src/test/scala/integration/kafka/api/AdminClientTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class AdminClientTest extends IntegrationTestHarness with Logging {
consumers.head.subscribe(List(topic))
TestUtils.waitUntilTrue(() => {
consumers.head.poll(0)
!consumers.head.assignment().isEmpty
!consumers.head.assignment.isEmpty
}, "Expected non-empty assignment")

val groups = client.listAllGroupsFlattened
Expand All @@ -77,41 +77,41 @@ class AdminClientTest extends IntegrationTestHarness with Logging {
}

@Test
def testDescribeGroup() {
def testGetConsumerGroupSummary() {
consumers.head.subscribe(List(topic))
TestUtils.waitUntilTrue(() => {
consumers.head.poll(0)
!consumers.head.assignment().isEmpty
!consumers.head.assignment.isEmpty
}, "Expected non-empty assignment")

val group = client.describeGroup(groupId)
assertEquals("consumer", group.protocolType)
assertEquals("range", group.protocol)
val group = client.describeConsumerGroup(groupId)
assertEquals("range", group.assignmentStrategy)
assertEquals("Stable", group.state)
assertFalse(group.members.isEmpty)
assertFalse(group.consumers.isEmpty)

val member = group.members.head
val member = group.consumers.get.head
assertEquals(clientId, member.clientId)
assertFalse(member.clientHost.isEmpty)
assertFalse(member.memberId.isEmpty)
assertFalse(member.host.isEmpty)
assertFalse(member.consumerId.isEmpty)
}

@Test
def testDescribeConsumerGroup() {
consumers.head.subscribe(List(topic))
TestUtils.waitUntilTrue(() => {
consumers.head.poll(0)
!consumers.head.assignment().isEmpty
!consumers.head.assignment.isEmpty
}, "Expected non-empty assignment")

val consumerSummaries = client.describeConsumerGroup(groupId)
assertEquals(1, consumerSummaries.size)
assertEquals(Some(Set(tp, tp2)), consumerSummaries.map(_.head.assignment.toSet))
val consumerGroupSummary = client.describeConsumerGroup(groupId)
assertEquals(1, consumerGroupSummary.consumers.get.size)
assertEquals(List(tp, tp2), consumerGroupSummary.consumers.get.flatMap(_.assignment))
}

@Test
def testDescribeConsumerGroupForNonExistentGroup() {
val nonExistentGroup = "non" + groupId
assertTrue("Expected empty ConsumerSummary list", client.describeConsumerGroup(nonExistentGroup).isEmpty)
val sum = client.describeConsumerGroup(nonExistentGroup).consumers
assertTrue("Expected empty ConsumerSummary list", client.describeConsumerGroup(nonExistentGroup).consumers.get.isEmpty)
}
}
128 changes: 128 additions & 0 deletions core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.admin

import java.util.Properties

import org.easymock.EasyMock
import org.junit.Before
import org.junit.Test

import kafka.admin.ConsumerGroupCommand.ConsumerGroupCommandOptions
import kafka.admin.ConsumerGroupCommand.ZkConsumerGroupService
import kafka.consumer.OldConsumer
import kafka.consumer.Whitelist
import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
import kafka.utils.TestUtils


class DescribeConsumerGroupTest extends KafkaServerTestHarness {

val overridingProps = new Properties()
val topic = "foo"
val topicFilter = new Whitelist(topic)
val group = "test.group"
val props = new Properties

// configure the servers and clients
override def generateConfigs() = TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps))

@Before
override def setUp() {
super.setUp()

AdminUtils.createTopic(zkUtils, topic, 1, 1)
props.setProperty("group.id", group)
props.setProperty("zookeeper.connect", zkConnect)
}

@Test
def testDescribeNonExistingGroup() {
// mocks
val consumerMock = EasyMock.createMockBuilder(classOf[OldConsumer]).withConstructor(topicFilter, props).createMock()

// stubs
val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect, "--describe", "--group", "missing.group"))
val consumerGroupCommand = new ZkConsumerGroupService(opts)

// simulation
EasyMock.replay(consumerMock)

// action/test
TestUtils.waitUntilTrue(() => {
!consumerGroupCommand.describeGroup()._2.isDefined
}, "Expected no rows in describe group results.")

// cleanup
consumerGroupCommand.close()
consumerMock.stop()
}

@Test
def testDescribeExistingGroup() {
// mocks
val consumerMock = EasyMock.createMockBuilder(classOf[OldConsumer]).withConstructor(topicFilter, props).createMock()

// stubs
val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect, "--describe", "--group", group))
val consumerGroupCommand = new ZkConsumerGroupService(opts)

// simulation
EasyMock.replay(consumerMock)

// action/test
TestUtils.waitUntilTrue(() => {
val (state, assignments) = consumerGroupCommand.describeGroup()
assignments.isDefined &&
assignments.get.filter(_.group == group).size == 1 &&
assignments.get.filter(_.group == group).head.consumerId.isDefined
}, "Expected rows and a member id column in describe group results.")

// cleanup
consumerGroupCommand.close()
consumerMock.stop()
}

@Test
def testDescribeConsumersWithNoAssignedPartitions() {
// mocks
val consumer1Mock = EasyMock.createMockBuilder(classOf[OldConsumer]).withConstructor(topicFilter, props).createMock()
val consumer2Mock = EasyMock.createMockBuilder(classOf[OldConsumer]).withConstructor(topicFilter, props).createMock()

// stubs
val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect, "--describe", "--group", group))
val consumerGroupCommand = new ZkConsumerGroupService(opts)

EasyMock.replay(consumer1Mock)
EasyMock.replay(consumer2Mock)

// action/test
TestUtils.waitUntilTrue(() => {
val (state, assignments) = consumerGroupCommand.describeGroup()
assignments.isDefined &&
assignments.get.filter(_.group == group).size == 2 &&
assignments.get.filter{ x => x.group == group && x.partition.isDefined}.size == 1 &&
assignments.get.filter{ x => x.group == group && !x.partition.isDefined}.size == 1
}, "Expected rows for consumers with no assigned partitions in describe group results.")

// cleanup
consumerGroupCommand.close()
consumer1Mock.stop()
consumer2Mock.stop()
}
}

0 comments on commit 7afdad8

Please sign in to comment.