Skip to content

Commit

Permalink
Do not list or describe non-consumer groups
Browse files Browse the repository at this point in the history
When listing or describing groups via a `ListGroups` or `DescribeGroups`
request, Kafka also returns non-consumer groups, e.g. Kafka Connect
groups. Those should be omitted when listing or describing consumer
groups, just as in the official implementation [1].

[1] https://github.com/apache/kafka/blob/13ffebe2f1012da473189c4fe86fd14b83809962/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L3428
  • Loading branch information
mborst committed May 12, 2021
1 parent f19e423 commit fe71be7
Showing 1 changed file with 22 additions and 11 deletions.
33 changes: 22 additions & 11 deletions kafka/admin/client.py
Expand Up @@ -1018,24 +1018,23 @@ def _describe_consumer_groups_process_response(self, response):
described_groups_field_schema = response_field.array_of
described_group = response.__dict__[response_name][0]
described_group_information_list = []
protocol_type_is_consumer = False
for (described_group_information, group_information_name, group_information_field) in zip(described_group, described_groups_field_schema.names, described_groups_field_schema.fields):
if group_information_name == 'protocol_type':
protocol_type = described_group_information
protocol_type_is_consumer = (protocol_type == ConsumerProtocol.PROTOCOL_TYPE or not protocol_type)
if not (protocol_type == ConsumerProtocol.PROTOCOL_TYPE or not protocol_type):
return None
if isinstance(group_information_field, Array):
member_information_list = []
member_schema = group_information_field.array_of
for members in described_group_information:
member_information = []
for (member, member_field, member_name) in zip(members, member_schema.fields, member_schema.names):
if protocol_type_is_consumer:
if member_name == 'member_metadata' and member:
member_information.append(ConsumerProtocolMemberMetadata.decode(member))
elif member_name == 'member_assignment' and member:
member_information.append(ConsumerProtocolMemberAssignment.decode(member))
else:
member_information.append(member)
if member_name == 'member_metadata' and member:
member_information.append(ConsumerProtocolMemberMetadata.decode(member))
elif member_name == 'member_assignment' and member:
member_information.append(ConsumerProtocolMemberAssignment.decode(member))
else:
member_information.append(member)
member_info_tuple = MemberInformation._make(member_information)
member_information_list.append(member_info_tuple)
described_group_information_list.append(member_information_list)
Expand Down Expand Up @@ -1100,7 +1099,8 @@ def describe_consumer_groups(self, group_ids, group_coordinator_id=None, include
for future in futures:
response = future.value
group_description = self._describe_consumer_groups_process_response(response)
group_descriptions.append(group_description)
if group_description:
return group_descriptions.append(group_description)

return group_descriptions

Expand Down Expand Up @@ -1131,7 +1131,18 @@ def _list_consumer_groups_process_response(self, response):
raise NotImplementedError(
"Support for ListGroupsResponse_v{} has not yet been added to KafkaAdminClient."
.format(response.API_VERSION))
return response.groups

groups_index = response.SCHEMA.names.index('groups')
groups_schema = response.SCHEMA.fields[groups_index].array_of
protocol_type_index = groups_schema.names.index('protocol_type')

def filter_non_consumers(group):
return not (
group[protocol_type_index]
and
group[protocol_type_index] != ConsumerProtocol.PROTOCOL_TYPE
)
return list(filter(filter_non_consumers, response.groups))

def list_consumer_groups(self, broker_ids=None):
"""List all consumer groups known to the cluster.
Expand Down

0 comments on commit fe71be7

Please sign in to comment.