Skip to content
This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

Commit

Permalink
Merge pull request #911 from Parsely/bugfix/producer_unknowntopicorpa…
Browse files Browse the repository at this point in the history
…rtition

handle UnknownTopicOrPartition in the producer
  • Loading branch information
Emmett J. Butler committed Feb 4, 2019
2 parents 8b71d9a + 9f4b018 commit dd4ae69
Showing 1 changed file with 5 additions and 1 deletion.
6 changes: 5 additions & 1 deletion pykafka/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
ProducerQueueFullError,
ProducerStoppedException,
SocketDisconnectedError,
UnknownTopicOrPartition,
)
from .partitioners import RandomPartitioner
from .protocol import Message, ProduceRequest
Expand Down Expand Up @@ -541,7 +542,10 @@ def _get_partition_msgs(partition_id, req):
message.offset = presponse.offset + i
self._mark_as_delivered(owned_broker, messages, req)
continue # All's well
if presponse.err == NotLeaderForPartition.ERROR_CODE:
if presponse.err in (
NotLeaderForPartition.ERROR_CODE,
UnknownTopicOrPartition.ERROR_CODE,
):
# Update cluster metadata to get new leader
self._update()
info = "Produce request for {}/{} to {}:{} failed with error code {}.".format(
Expand Down

0 comments on commit dd4ae69

Please sign in to comment.