Skip to content

Commit

Permalink
try to fix uncaught FailedPayloadsError
Browse files Browse the repository at this point in the history
  • Loading branch information
reAsOn2010 committed Jun 8, 2015
1 parent b1aad92 commit 945fc04
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 1 deletion.
2 changes: 2 additions & 0 deletions kafka/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,8 @@ def _iter_broker_errors():


def check_error(response):
if isinstance(response, Exception):
raise response
if response.error:
error_class = kafka_errors.get(response.error, UnknownError)
raise error_class(response)
Expand Down
9 changes: 8 additions & 1 deletion kafka/consumer/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
FetchRequest, OffsetRequest,
ConsumerFetchSizeTooSmall, ConsumerNoMoreData,
UnknownTopicOrPartitionError, NotLeaderForPartitionError,
OffsetOutOfRangeError, check_error
OffsetOutOfRangeError, FailedPayloadsError, check_error
)
from .base import (
Consumer,
Expand Down Expand Up @@ -355,6 +355,13 @@ def _fetch(self):
# Retry this partition
retry_partitions[resp.partition] = partitions[resp.partition]
continue
except FailedPayloadsError as e:
log.warning("Failed payloads of %s"
"Resetting partition offset...",
e.payload)
# Retry this partition
retry_partitions[e.payload.partition] = partitions[e.payload.partition]
continue

partition = resp.partition
buffer_size = partitions[partition]
Expand Down
8 changes: 8 additions & 0 deletions test/test_failover_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,14 @@ def test_switch_leader_keyed_producer(self):
msg = random_string(10).encode('utf-8')
producer.send_messages(topic, key, msg)

@kafka_versions("all")
def test_switch_leader_simple_consumer(self):
producer = Producer(self.client, async=False)
consumer = SimpleConsumer(self.client, None, self.topic, partitions=None, auto_commit=False, iter_timeout=10)
self._send_random_messages(producer, self.topic, 0, 2)
consumer.get_messages()
self._kill_leader(self.topic, 0)
consumer.get_messages()

def _send_random_messages(self, producer, topic, partition, n):
for j in range(n):
Expand Down

0 comments on commit 945fc04

Please sign in to comment.