Skip to content

Commit

Permalink
Merge d270520 into 209515b
Browse files Browse the repository at this point in the history
  • Loading branch information
TylerLubeck committed Feb 6, 2020
2 parents 209515b + d270520 commit e784aa7
Showing 1 changed file with 40 additions and 6 deletions.
46 changes: 40 additions & 6 deletions kafka/admin/client.py
Expand Up @@ -472,14 +472,48 @@ def delete_topics(self, topics, timeout_ms=None):
.format(version))
return response

# list topics functionality is in ClusterMetadata
# Note: if implemented here, send the request to the least_loaded_node()

# describe topics functionality is in ClusterMetadata
# Note: if implemented here, send the request to the controller
def _get_cluster_metadata(self, topics=None, auto_topic_creation=False):
"""
topics == None means "get all topics"
"""
version = self._matching_api_version(MetadataRequest)
if version <= 3:
if auto_topic_creation:
raise IncompatibleBrokerVersion(
"auto_topic_creation requires MetadataRequest >= v4, which"
" is not supported by Kafka {}"
.format(self.config['api_version']))

# describe cluster functionality is in ClusterMetadata
# Note: if implemented here, send the request to the least_loaded_node()
request = MetadataRequest[version](topics=topics)
elif version <= 5:
request = MetadataRequest[version](
topics=topics,
allow_auto_topic_creation=auto_topic_creation
)

future = self._send_request_to_node(
self._client.least_loaded_node(),
request
)
self._wait_for_futures([future])
return future.value

def list_topics(self):
metadata = self._get_cluster_metadata(topics=None)
obj = metadata.to_object()
return [t['topic'] for t in obj['topics']]

def describe_topics(self, topics=None):
metadata = self._get_cluster_metadata(topics=topics)
obj = metadata.to_object()
return obj['topics']

def describe_cluster(self):
metadata = self._get_cluster_metadata()
obj = metadata.to_object()
obj.pop('topics') # We have 'describe_topics' for this
return obj

@staticmethod
def _convert_describe_acls_response_to_acls(describe_response):
Expand Down

0 comments on commit e784aa7

Please sign in to comment.