Skip to content

Commit

Permalink
refine bug fix
Browse files Browse the repository at this point in the history
  • Loading branch information
reAsOn2010 committed Jun 8, 2015
1 parent b6040a5 commit cb6d561
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 5 deletions.
2 changes: 2 additions & 0 deletions kafka/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,8 @@ def _iter_broker_errors():


def check_error(response):
if isinstance(response, Exception):
raise response
if response.error:
error_class = kafka_errors.get(response.error, UnknownError)
raise error_class(response)
Expand Down
8 changes: 3 additions & 5 deletions kafka/consumer/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,8 +343,6 @@ def _fetch(self):
for resp in responses:

try:
if isinstance(resp, Exception):
raise resp
check_error(resp)
except (UnknownTopicOrPartitionError, NotLeaderForPartitionError):
self.client.reset_topic_metadata(resp.topic)
Expand All @@ -357,12 +355,12 @@ def _fetch(self):
# Retry this partition
retry_partitions[resp.partition] = partitions[resp.partition]
continue
except FailedPayloadsError:
except FailedPayloadsError as e:
log.warning("Failed payloads of %s"
"Resetting partition offset...",
resp.failed_payloads)
e.payload)
# Retry this partition
retry_partitions[resp.failed_payloads.partition] = partitions[resp.failed_payloads.partition]
retry_partitions[e.payload.partition] = partitions[e.payload.partition]
continue

partition = resp.partition
Expand Down
8 changes: 8 additions & 0 deletions test/test_failover_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,14 @@ def test_switch_leader_keyed_producer(self):
msg = random_string(10).encode('utf-8')
producer.send_messages(topic, key, msg)

@kafka_versions("all")
def test_switch_leader_simple_consumer(self):
producer = Producer(self.client, async=True)
consumer = SimpleConsumer(self.client, None, self.topic, partitions=None, auto_commit=False, iter_timeout=10)
self._send_random_messages(producer, self.topic, 0, 2)
consumer.get_messages()
self._kill_leader(self.topic, 0)
consumer.get_messages()

def _send_random_messages(self, producer, topic, partition, n):
for j in range(n):
Expand Down

0 comments on commit cb6d561

Please sign in to comment.