From be67a8d25a3609f76a4559aaf7f21ee77e9ecf00 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Fri, 6 Nov 2015 14:34:38 -0800 Subject: [PATCH 1/4] KAFKA-2768: New-consumer sends invalid describeGroupResponse while restabilizing --- core/src/main/scala/kafka/admin/AdminClient.scala | 12 ++++++++---- .../scala/kafka/admin/ConsumerGroupCommand.scala | 12 ++++++------ 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala index ff1d3fee4c5b..aeb6f7a3062f 100644 --- a/core/src/main/scala/kafka/admin/AdminClient.scala +++ b/core/src/main/scala/kafka/admin/AdminClient.scala @@ -161,10 +161,14 @@ class AdminClient(val time: Time, if (group.protocolType != ConsumerProtocol.PROTOCOL_TYPE) throw new IllegalArgumentException(s"Group ${groupId} with protocol type '${group.protocolType}' is not a valid consumer group") - group.members.map { - case member => - val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment)) - new ConsumerSummary(member.memberId, member.clientId, member.clientHost, assignment.partitions().asScala.toList) + if (group.state == "Stable") { + group.members.map { + case member => + val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment)) + new ConsumerSummary(member.memberId, member.clientId, member.clientHost, assignment.partitions().asScala.toList) + } + } else { + List.empty } } } diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index b682812a6bd3..3c2fedab992b 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -92,9 +92,6 @@ object ConsumerGroupCommand { val configs = parseConfigs(opts) val channelSocketTimeoutMs = configs.getProperty("channelSocketTimeoutMs", "600").toInt val channelRetryBackoffMs = configs.getProperty("channelRetryBackoffMsOpt", "300").toInt - def warnNoTopicsForGroupFound: Unit = { - println("No topic available for consumer group provided") - } println("%s, %s, %s, %s, %s, %s, %s" .format("GROUP", "TOPIC", "PARTITION", "CURRENT OFFSET", "LOG END OFFSET", "LAG", "OWNER")) @@ -102,14 +99,17 @@ object ConsumerGroupCommand { if (!useNewConsumer) { val topics = zkUtils.getTopicsByConsumerGroup(group) if (topics.isEmpty) { - warnNoTopicsForGroupFound + { + println("No topic available for consumer group provided") + } } topics.foreach(topic => describeTopic(zkUtils, group, topic, channelSocketTimeoutMs, channelRetryBackoffMs, opts)) } else { val consumers = createAndGetAdminClient(opts).describeConsumerGroup(group) - if (consumers.isEmpty) - warnNoTopicsForGroupFound + if (consumers.isEmpty) { + println(s"Consumer group, ${group}, does not exist or is rebalancing.") + } consumers.foreach(x => describeTopicPartition(zkUtils, group, channelSocketTimeoutMs, channelRetryBackoffMs, opts, x.assignment.map(tp => new TopicAndPartition(tp.topic(), tp.partition())), Option("%s_%s".format(x.clientId, x.clientHost)))) } } From 7d795ffd01b83041ab012abb8bd5e92666ed1c03 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Thu, 12 Nov 2015 12:34:02 -0800 Subject: [PATCH 2/4] Address review comments --- core/src/main/scala/kafka/admin/AdminClient.scala | 6 ++---- core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala | 4 +--- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala index aeb6f7a3062f..2752d1f311d9 100644 --- a/core/src/main/scala/kafka/admin/AdminClient.scala +++ b/core/src/main/scala/kafka/admin/AdminClient.scala @@ -138,8 +138,7 @@ class AdminClient(val time: Time, throw new KafkaException(s"Response from broker contained no metadata for group ${groupId}") Errors.forCode(metadata.errorCode()).maybeThrow() - val members = metadata.members().map { - case member => + val members = metadata.members().map { member => val metadata = Utils.readBytes(member.memberMetadata()) val assignment = Utils.readBytes(member.memberAssignment()) MemberSummary(member.memberId(), member.clientId(), member.clientHost(), metadata, assignment) @@ -162,8 +161,7 @@ class AdminClient(val time: Time, throw new IllegalArgumentException(s"Group ${groupId} with protocol type '${group.protocolType}' is not a valid consumer group") if (group.state == "Stable") { - group.members.map { - case member => + group.members.map { member => val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment)) new ConsumerSummary(member.memberId, member.clientId, member.clientHost, assignment.partitions().asScala.toList) } diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index 3c2fedab992b..778f60d49cb4 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -99,9 +99,7 @@ object ConsumerGroupCommand { if (!useNewConsumer) { val topics = zkUtils.getTopicsByConsumerGroup(group) if (topics.isEmpty) { - { - println("No topic available for consumer group provided") - } + println("No topic available for consumer group provided") } topics.foreach(topic => describeTopic(zkUtils, group, topic, channelSocketTimeoutMs, channelRetryBackoffMs, opts)) } else { From 0d7ddb0bdd81abafc229c1d1c3e56eda5051a5b9 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Thu, 12 Nov 2015 12:39:17 -0800 Subject: [PATCH 3/4] Fix indentation --- core/src/main/scala/kafka/admin/AdminClient.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala index 2752d1f311d9..1dea28b11c6d 100644 --- a/core/src/main/scala/kafka/admin/AdminClient.scala +++ b/core/src/main/scala/kafka/admin/AdminClient.scala @@ -139,9 +139,9 @@ class AdminClient(val time: Time, Errors.forCode(metadata.errorCode()).maybeThrow() val members = metadata.members().map { member => - val metadata = Utils.readBytes(member.memberMetadata()) - val assignment = Utils.readBytes(member.memberAssignment()) - MemberSummary(member.memberId(), member.clientId(), member.clientHost(), metadata, assignment) + val metadata = Utils.readBytes(member.memberMetadata()) + val assignment = Utils.readBytes(member.memberAssignment()) + MemberSummary(member.memberId(), member.clientId(), member.clientHost(), metadata, assignment) }.toList GroupSummary(metadata.state(), metadata.protocolType(), metadata.protocol(), members) } @@ -162,8 +162,8 @@ class AdminClient(val time: Time, if (group.state == "Stable") { group.members.map { member => - val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment)) - new ConsumerSummary(member.memberId, member.clientId, member.clientHost, assignment.partitions().asScala.toList) + val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment)) + new ConsumerSummary(member.memberId, member.clientId, member.clientHost, assignment.partitions().asScala.toList) } } else { List.empty From 228eef0f2f16ea866c78343f8fba05a81d0d4275 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Thu, 12 Nov 2015 16:49:02 -0800 Subject: [PATCH 4/4] Address review comments --- core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index 778f60d49cb4..c29efe434f50 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -100,15 +100,17 @@ object ConsumerGroupCommand { val topics = zkUtils.getTopicsByConsumerGroup(group) if (topics.isEmpty) { println("No topic available for consumer group provided") + } else { + topics.foreach(topic => describeTopic(zkUtils, group, topic, channelSocketTimeoutMs, channelRetryBackoffMs, opts)) } - topics.foreach(topic => describeTopic(zkUtils, group, topic, channelSocketTimeoutMs, channelRetryBackoffMs, opts)) } else { val consumers = createAndGetAdminClient(opts).describeConsumerGroup(group) if (consumers.isEmpty) { println(s"Consumer group, ${group}, does not exist or is rebalancing.") + } else { + consumers.foreach(x => describeTopicPartition(zkUtils, group, channelSocketTimeoutMs, channelRetryBackoffMs, opts, x.assignment.map(tp => new TopicAndPartition(tp.topic(), tp.partition())), Option("%s_%s".format(x.clientId, x.clientHost)))) } - consumers.foreach(x => describeTopicPartition(zkUtils, group, channelSocketTimeoutMs, channelRetryBackoffMs, opts, x.assignment.map(tp => new TopicAndPartition(tp.topic(), tp.partition())), Option("%s_%s".format(x.clientId, x.clientHost)))) } }