From c4987626df02adcb1b2dd49c11f3032995921a31 Mon Sep 17 00:00:00 2001 From: Emmett Butler Date: Tue, 29 Sep 2015 16:23:54 -0700 Subject: [PATCH 1/2] disallow consume() on a stopped consumer --- pykafka/simpleconsumer.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pykafka/simpleconsumer.py b/pykafka/simpleconsumer.py index 4443e342b..e2b6c5cd1 100644 --- a/pykafka/simpleconsumer.py +++ b/pykafka/simpleconsumer.py @@ -307,6 +307,8 @@ def consume(self, block=True): if self._messages_arrived.acquire(blocking=block, timeout=timeout): # by passing through this semaphore, we know that at # least one message is waiting in some queue. + if not self._running: + raise ConsumerStoppedException() message = None while not message: owned_partition = next(self.partition_cycle) @@ -315,7 +317,7 @@ def consume(self, block=True): else: if not self._running: raise ConsumerStoppedException() - elif not block or self._consumer_timeout_ms > 0: + if not block or self._consumer_timeout_ms > 0: return None def _auto_commit(self): From 0893a5729a41f226bad423a126b57bd90c6ded5d Mon Sep 17 00:00:00 2001 From: Emmett Butler Date: Tue, 29 Sep 2015 16:26:37 -0700 Subject: [PATCH 2/2] typo --- pykafka/simpleconsumer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pykafka/simpleconsumer.py b/pykafka/simpleconsumer.py index e2b6c5cd1..3993997d0 100644 --- a/pykafka/simpleconsumer.py +++ b/pykafka/simpleconsumer.py @@ -317,7 +317,7 @@ def consume(self, block=True): else: if not self._running: raise ConsumerStoppedException() - if not block or self._consumer_timeout_ms > 0: + elif not block or self._consumer_timeout_ms > 0: return None def _auto_commit(self):