Skip to content
This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

Commit

Permalink
Merge pull request #819 from lokesh-b/bugfix/producer_cpu_spike
Browse files Browse the repository at this point in the history
Issue-816 : Fix for CPU issue seen in _produce because of metadata …
  • Loading branch information
Emmett J. Butler committed Jun 20, 2018
2 parents d94bc46 + 89eded6 commit e61447b
Showing 1 changed file with 9 additions and 1 deletion.
10 changes: 9 additions & 1 deletion pykafka/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -434,17 +434,25 @@ def get_delivery_report(self, block=True, timeout=None):

def _produce(self, message):
"""Enqueue a message for the relevant broker
Attempts to update metadata in response to missing brokers.
:param message: Message with valid `partition_id`, ready to be sent
:type message: `pykafka.protocol.Message`
"""
success = False
retry = 0
while not success:
leader_id = self._topic.partitions[message.partition_id].leader.id
if leader_id in self._owned_brokers:
self._owned_brokers[leader_id].enqueue(message)
success = True
else:
retry += 1
if retry < 10:
log.debug("Failed to enqueue produced message. Updating metdata.")
self._update()
else:
raise ProduceFailureError("Message could not be enqueued due to missing broker "
"metadata for broker {}".format(leader_id))
success = False

def _mark_as_delivered(self, owned_broker, message_batch, req):
Expand Down

0 comments on commit e61447b

Please sign in to comment.