From 7e92c06f997b0ee16d8461a6eb5b80a5afd141b1 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 30 Mar 2015 17:51:36 -0700 Subject: [PATCH] Honor get_partition_info in consumer._get_message on timeout / queue empty --- kafka/consumer/simple.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py index ae00dab5f..541e46fe8 100644 --- a/kafka/consumer/simple.py +++ b/kafka/consumer/simple.py @@ -277,6 +277,9 @@ def _get_message(self, block=True, timeout=0.1, get_partition_info=None, If get_partition_info is True, returns (partition, message) If get_partition_info is False, returns message """ + if get_partition_info is None: + get_partition_info = self.partition_info + if self.queue.empty(): # We're out of messages, go grab some more. with FetchContext(self, block, timeout): @@ -292,14 +295,16 @@ def _get_message(self, block=True, timeout=0.1, get_partition_info=None, self.count_since_commit += 1 self._auto_commit() - if get_partition_info is None: - get_partition_info = self.partition_info if get_partition_info: return partition, message else: return message + except Empty: - return None + if get_partition_info: + return None, None + else: + return None def __iter__(self): if self.iter_timeout is None: