diff --git a/pykafka/simpleconsumer.py b/pykafka/simpleconsumer.py index 4443e342b..3993997d0 100644 --- a/pykafka/simpleconsumer.py +++ b/pykafka/simpleconsumer.py @@ -307,6 +307,8 @@ def consume(self, block=True): if self._messages_arrived.acquire(blocking=block, timeout=timeout): # by passing through this semaphore, we know that at # least one message is waiting in some queue. + if not self._running: + raise ConsumerStoppedException() message = None while not message: owned_partition = next(self.partition_cycle)