Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 18 additions & 31 deletions kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ def __init__(self, **configs):
metric_group_prefix='admin',
**self.config
)
# Goal: migrate all self._client calls -> self._manager (skipping compat layer)
self._manager = self._client._manager

# Get auto-discovered version from client if necessary
self.config['api_version'] = self._client.get_broker_version(timeout_ms=self.config['api_version_auto_timeout_ms'])
Expand Down Expand Up @@ -607,33 +609,18 @@ def _process_acl_operations(self, obj):
obj['authorized_operations'] = list(map(lambda acl: acl.name, valid_acl_operations(obj['authorized_operations'])))
return obj

def _get_cluster_metadata(self, topics=None, auto_topic_creation=False):
"""
topics == None means "get all topics"
"""
version = self._client.api_version(MetadataRequest, max_version=8)
if version <= 3:
if auto_topic_creation:
raise IncompatibleBrokerVersion(
"auto_topic_creation requires MetadataRequest >= v4, which"
" is not supported by Kafka {}"
.format(self.config['api_version']))

request = MetadataRequest[version](topics=topics)
elif version <= 7:
request = MetadataRequest[version](
topics=topics,
allow_auto_topic_creation=auto_topic_creation
)
else:
request = MetadataRequest[version](
topics=topics,
allow_auto_topic_creation=auto_topic_creation,
include_cluster_authorized_operations=True,
include_topic_authorized_operations=True,
)

metadata = self.send_request(request).to_dict()
async def _get_cluster_metadata(self, topics):
"""topics = [] for no topics, None for all."""
request = MetadataRequest(
topics=[
MetadataRequest.MetadataRequestTopic(name=topic)
for topic in topics] if topics is not None else None,
allow_auto_topic_creation=False,
include_cluster_authorized_operations=True,
include_topic_authorized_operations=True,
)
response = await self._manager.send(request)
metadata = response.to_dict()
self._process_acl_operations(metadata)
for topic in metadata['topics']:
self._process_acl_operations(topic)
Expand All @@ -645,7 +632,7 @@ def list_topics(self):
Returns:
A list of topic name strings.
"""
metadata = self._get_cluster_metadata(topics=None)
metadata = self._manager.run(self._get_cluster_metadata, None) # None => request all topics
return [t['name'] for t in metadata['topics']]

def describe_topics(self, topics=None):
Expand All @@ -658,7 +645,7 @@ def describe_topics(self, topics=None):
Returns:
A list of dicts describing each topic (including partition info).
"""
metadata = self._get_cluster_metadata(topics=topics)
metadata = self._manager.run(self._get_cluster_metadata, topics)
return metadata['topics']

def describe_cluster(self):
Expand All @@ -668,7 +655,7 @@ def describe_cluster(self):
Returns:
A dict with cluster-wide metadata, excluding topic details.
"""
metadata = self._get_cluster_metadata()
metadata = self._manager.run(self._get_cluster_metadata, []) # [] => no topics
metadata.pop('topics') # We have 'describe_topics' for this
return metadata

Expand Down Expand Up @@ -1174,7 +1161,7 @@ def _get_leader_for_partitions(self, partitions, timeout_ms=None):
partitions = set(partitions)
topics = set(tp.topic for tp in partitions)

metadata = self._get_cluster_metadata(topics=topics)
metadata = self._manager.run(self._get_cluster_metadata, topics)

leader2partitions = defaultdict(list)
valid_partitions = set()
Expand Down
Loading