Skip to content

Commit

Permalink
Add ignore_leadernotavailable kwarg to SimpleClient.load_metadata_for…
Browse files Browse the repository at this point in the history
…_topics
  • Loading branch information
dpkp committed Mar 13, 2016
1 parent 4f43366 commit 50bad52
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 11 deletions.
22 changes: 13 additions & 9 deletions kafka/client.py
Expand Up @@ -450,17 +450,10 @@ def ensure_topic_exists(self, topic, timeout = 30):
while not self.has_metadata_for_topic(topic):
if time.time() > start_time + timeout:
raise KafkaTimeoutError('Unable to create topic {0}'.format(topic))
try:
self.load_metadata_for_topics(topic)
except LeaderNotAvailableError:
pass
except UnknownTopicOrPartitionError:
# Server is not configured to auto-create
# retrying in this case will not help
raise
self.load_metadata_for_topics(topic, ignore_leadernotavailable=True)
time.sleep(.5)

def load_metadata_for_topics(self, *topics):
def load_metadata_for_topics(self, *topics, **kwargs):
"""Fetch broker and topic-partition metadata from the server.
Updates internal data: broker list, topic/partition list, and
Expand All @@ -476,6 +469,9 @@ def load_metadata_for_topics(self, *topics):
*topics (optional): If a list of topics is provided,
the metadata refresh will be limited to the specified topics
only.
ignore_leadernotavailable (bool): suppress LeaderNotAvailableError
so that metadata is loaded correctly during auto-create.
Default: False.
Raises:
UnknownTopicOrPartitionError: Raised for topics that do not exist,
Expand All @@ -484,6 +480,11 @@ def load_metadata_for_topics(self, *topics):
when the broker is configured to auto-create topics. Retry
after a short backoff (topics/partitions are initializing).
"""
if 'ignore_leadernotavailable' in kwargs:
ignore_leadernotavailable = kwargs['ignore_leadernotavailable']
else:
ignore_leadernotavailable = False

if topics:
self.reset_topic_metadata(*topics)
else:
Expand All @@ -506,6 +507,9 @@ def load_metadata_for_topics(self, *topics):
topic, error_type, error)
if topic not in topics:
continue
elif (error_type is LeaderNotAvailableError and
ignore_leadernotavailable):
continue
raise error_type(topic)

self.topic_partitions[topic] = {}
Expand Down
2 changes: 1 addition & 1 deletion kafka/consumer/base.py
Expand Up @@ -53,7 +53,7 @@ def __init__(self, client, group, topic, partitions=None, auto_commit=True,
self.client = client
self.topic = topic
self.group = group
self.client.load_metadata_for_topics(topic)
self.client.load_metadata_for_topics(topic, ignore_leadernotavailable=True)
self.offsets = {}

if partitions is None:
Expand Down
2 changes: 1 addition & 1 deletion kafka/producer/keyed.py
Expand Up @@ -29,7 +29,7 @@ def __init__(self, *args, **kwargs):
def _next_partition(self, topic, key):
if topic not in self.partitioners:
if not self.client.has_metadata_for_topic(topic):
self.client.load_metadata_for_topics(topic)
self.client.load_metadata_for_topics(topic, ignore_leadernotavailable=True)

self.partitioners[topic] = self.partitioner_class(self.client.get_partition_ids_for_topic(topic))

Expand Down

0 comments on commit 50bad52

Please sign in to comment.