diff --git a/mockafka/conumser.py b/mockafka/conumser.py index 1db17a8..9d0eead 100644 --- a/mockafka/conumser.py +++ b/mockafka/conumser.py @@ -68,7 +68,7 @@ def consume(self, num_messages=1, *args, **kwargs) -> list[Message]: for count in range(num_messages): message = self.poll() if message: - consumed_messages.append(self.poll()) + consumed_messages.append(message) return consumed_messages diff --git a/tests/test_consumer.py b/tests/test_consumer.py index 56d3d8a..3da2534 100644 --- a/tests/test_consumer.py +++ b/tests/test_consumer.py @@ -48,6 +48,26 @@ def test_close(self): self.assertEqual(self.consumer.consumer_store, {}) self.assertEqual(self.consumer.consume(), []) + def test_consume_batch_without_commit(self): + """ Test correct number of messages inside batch using `consume` method. """ + # GIVEN: + # - 10 messages inside topic + number_of_message = 10 + self.create_topic() + for _ in range(number_of_message): + self.producer.produce( + topic=self.test_topic, partition=0, key="test1", value="test1" + ) + + # WHEN: + # - consumer uses consume method to get a batch of messages + self.consumer.subscribe(topics=[self.test_topic]) + messages = self.consumer.consume(num_messages=number_of_message) + + # THEN: + # - batch of messages has correct count of messages + assert len(messages) == number_of_message + def test_poll_without_commit(self): self.create_topic() self.produce_message()