Skip to content
This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

Commit

Permalink
Merge pull request #757 from Parsely/feature/metadata_request_versions
Browse files Browse the repository at this point in the history
Metadata request version support
  • Loading branch information
Emmett J. Butler committed Dec 14, 2017
2 parents 9fbacf4 + 5a31aeb commit db568a7
Show file tree
Hide file tree
Showing 4 changed files with 679 additions and 32 deletions.
7 changes: 5 additions & 2 deletions pykafka/broker.py
Expand Up @@ -355,15 +355,18 @@ def request_metadata(self, topics=None):
:param topics: The topic names for which to request metadata
:type topics: Iterable of `bytes`
"""
request_class = MetadataRequest.get_version_impl(self._api_versions)
response_class = MetadataResponse.get_version_impl(self._api_versions)

max_retries = 3
for i in range(max_retries):
if i > 0:
log.debug("Retrying")
time.sleep(i)

try:
future = self._req_handler.request(MetadataRequest(topics=topics))
response = future.get(MetadataResponse)
future = self._req_handler.request(request_class(topics=topics))
response = future.get(response_class)
except SocketDisconnectedError:
log.warning("Encountered SocketDisconnectedError while requesting "
"metadata from broker %s:%s. Continuing.",
Expand Down
12 changes: 10 additions & 2 deletions pykafka/cluster.py
Expand Up @@ -205,6 +205,7 @@ def __init__(self,
self._max_connection_retries_offset_mgr = 8
self._broker_version = broker_version
self._api_versions = None
self.controller_broker = None
if ':' in self._source_address:
self._source_port = int(self._source_address.split(':')[1])
self.fetch_api_versions()
Expand Down Expand Up @@ -357,12 +358,14 @@ def _get_brokers_from_zookeeper(self, zk_connect):
log.exception(e)
return []

def _update_brokers(self, broker_metadata):
def _update_brokers(self, broker_metadata, controller_id):
"""Update brokers with fresh metadata.
:param broker_metadata: Metadata for all brokers.
:type broker_metadata: Dict of `{name: metadata}` where `metadata` is
:class:`pykafka.protocol.BrokerMetadata` and `name` is str.
:param controller_id: The ID of the cluster's controller broker, if applicable
:type controller_id: int
"""
# FIXME: A cluster with no topics returns no brokers in metadata
# Remove old brokers
Expand Down Expand Up @@ -404,6 +407,11 @@ def _update_brokers(self, broker_metadata):
# Figure out and implement update/disconnect/reconnect if
# needed.
raise Exception('Broker host/port change detected! %s', broker)
if controller_id is not None:
if controller_id not in self._brokers:
raise KeyError("Controller ID {} not present in cluster".format(
controller_id))
self.controller_broker = self._brokers[controller_id]

def get_managed_group_descriptions(self):
"""Return detailed descriptions of all managed consumer groups on this cluster
Expand Down Expand Up @@ -495,7 +503,7 @@ def update(self):
'a topic in the cluster. See '
'https://issues.apache.org/jira/browse/KAFKA-2154 '
'for information.')
self._update_brokers(metadata.brokers)
self._update_brokers(metadata.brokers, metadata.controller_id)
try:
self._topics._update_topics(metadata.topics)
except LeaderNotFoundError:
Expand Down

0 comments on commit db568a7

Please sign in to comment.