diff --git a/pykafka/simpleconsumer.py b/pykafka/simpleconsumer.py
index 57907b7f7..812069305 100644
--- a/pykafka/simpleconsumer.py
+++ b/pykafka/simpleconsumer.py
@@ -497,9 +497,6 @@ def _handle_success(parts):
                 partition.flush()
                 by_leader[partition.partition.leader].append((partition, offset))
 
-            # reset this dict to prepare it for next retry
-            owned_partition_offsets = {}
-
             # get valid offset ranges for each partition
             for broker, offsets in by_leader.iteritems():
                 reqs = [owned_partition.build_offset_request(offset)
@@ -511,7 +508,11 @@ def _handle_success(parts):
                     success_handler=_handle_success,
                     partitions_by_id=self._partitions_by_id)
 
-                if len(parts_by_error) == 1 and 0 in parts_by_error:
+                if 0 in parts_by_error:
+                    # drop successfully reset partitions for next retry
+                    successful = [part for part, _ in parts_by_error.pop(0)]
+                    map(owned_partition_offsets.pop, successful)
+                if not parts_by_error:
                     continue
                 log.error("Error resetting offsets for topic %s (errors: %s)",
                           self._topic.name,
@@ -520,24 +521,16 @@
 
                 time.sleep(i * (self._offsets_channel_backoff_ms / 1000))
 
-                if 0 in parts_by_error:
-                    parts_by_error.pop(0)
-                errored_partitions = {
-                    part: owned_partition_offsets[part]
-                    for errcode, parts in parts_by_error.iteritems()
-                    for part, _ in parts}
-                owned_partition_offsets.update(errored_partitions)
-
-                for errcode, owned_partitions in parts_by_error.iteritems():
-                    if errcode != 0:
-                        for owned_partition in owned_partitions:
-                            owned_partition.fetch_lock.release()
+                for errcode, owned_partitions in parts_by_error.iteritems():
+                    if errcode != 0:
+                        for owned_partition in owned_partitions:
+                            owned_partition.fetch_lock.release()
 
-            if len(parts_by_error) == 1 and 0 in parts_by_error:
+            if not owned_partition_offsets:
                 break
             log.debug("Retrying offset reset")
 
-        if any([a != 0 for a in parts_by_error]):
+        if owned_partition_offsets:
             raise OffsetRequestFailedError("reset_offsets failed after %d "
                                            "retries", self._offsets_reset_max_retries)
 
@@ -643,7 +636,16 @@ def message_count(self):
         return self._messages.qsize()
 
     def flush(self):
+        """Flush internal queue"""
+        # Swap out _messages so a concurrent consume/enqueue won't interfere
+        tmp = self._messages
         self._messages = Queue()
+        while True:
+            try:
+                tmp.get_nowait()
+                self._messages_arrived.acquire(blocking=False)
+            except Empty:
+                break
         log.info("Flushed queue for partition %d", self.partition.id)
 
     def set_offset(self, last_offset_consumed):
diff --git a/tests/pykafka/test_simpleconsumer.py b/tests/pykafka/test_simpleconsumer.py
index 734022277..923fc5be2 100644
--- a/tests/pykafka/test_simpleconsumer.py
+++ b/tests/pykafka/test_simpleconsumer.py
@@ -1,9 +1,10 @@
 from contextlib import contextmanager
 
 import mock
 import unittest2
+from uuid import uuid4
 
 from pykafka import KafkaClient
-from pykafka.simpleconsumer import OwnedPartition
+from pykafka.simpleconsumer import OwnedPartition, OffsetType
 from pykafka.test.utils import get_cluster, stop_cluster
 
@@ -13,12 +14,20 @@ class TestSimpleConsumer(unittest2.TestCase):
     @classmethod
     def setUpClass(cls):
         cls.kafka = get_cluster()
-        cls.topic_name = 'test-data'
+        cls.topic_name = uuid4().hex
         cls.kafka.create_topic(cls.topic_name, 3, 2)
-        cls.kafka.produce_messages(
-            cls.topic_name,
-            ('msg {i}'.format(i=i) for i in xrange(1000))
-        )
+
+        # It turns out that the underlying producer used by KafkaInstance will
+        # write all messages in a batch to a single partition, though not the
+        # same partition every time.  We try to attain some spread here by
+        # sending more than one batch:
+        batch = 300
+        cls.total_msgs = 3 * batch
+        for _ in range(3):
+            cls.kafka.produce_messages(
+                cls.topic_name,
+                ('msg {i}'.format(i=i) for i in xrange(batch)))
+
         cls.client = KafkaClient(cls.kafka.brokers)
 
     @classmethod
@@ -35,15 +44,16 @@ def _get_simple_consumer(self, **kwargs):
 
     def test_consume(self):
         with self._get_simple_consumer() as consumer:
-            messages = [consumer.consume() for _ in xrange(1000)]
-            self.assertEquals(len(messages), 1000)
+            messages = [consumer.consume() for _ in xrange(self.total_msgs)]
+            self.assertEquals(len(messages), self.total_msgs)
+            self.assertTrue(None not in messages)
 
     def test_offset_commit(self):
         """Check fetched offsets match pre-commit internal state"""
         with self._get_simple_consumer(
                 consumer_group='test_offset_commit') as consumer:
             [consumer.consume() for _ in xrange(100)]
-            offsets_committed = self._currently_held_offsets(consumer)
+            offsets_committed = consumer.held_offsets
             consumer.commit_offsets()
 
             offsets_fetched = dict((r[0], r[1].offset)
@@ -55,18 +65,75 @@ def test_offset_resume(self):
         with self._get_simple_consumer(
                 consumer_group='test_offset_resume') as consumer:
             [consumer.consume() for _ in xrange(100)]
-            offsets_committed = self._currently_held_offsets(consumer)
+            offsets_committed = consumer.held_offsets
             consumer.commit_offsets()
 
         with self._get_simple_consumer(
                 consumer_group='test_offset_resume') as consumer:
-            offsets_resumed = self._currently_held_offsets(consumer)
-            self.assertEquals(offsets_resumed, offsets_committed)
+            self.assertEquals(consumer.held_offsets, offsets_committed)
+
+    def test_reset_offset_on_start(self):
+        """Try starting from LATEST and EARLIEST offsets"""
+        with self._get_simple_consumer(
+                auto_offset_reset=OffsetType.EARLIEST,
+                reset_offset_on_start=True) as consumer:
+            earliest = consumer.topic.earliest_available_offsets()
+            earliest_minus_one = consumer.held_offsets
+            self.assertTrue(all(
+                earliest_minus_one[i] == earliest[i].offset[0] - 1
+                for i in earliest.keys()))
+            self.assertIsNotNone(consumer.consume())
 
-    @staticmethod
-    def _currently_held_offsets(consumer):
-        return dict((p.partition.id, p.last_offset_consumed)
-                    for p in consumer._partitions.itervalues())
+        with self._get_simple_consumer(
+                auto_offset_reset=OffsetType.LATEST,
+                reset_offset_on_start=True,
+                consumer_timeout_ms=500) as consumer:
+            latest = consumer.topic.latest_available_offsets()
+            latest_minus_one = consumer.held_offsets
+            self.assertTrue(all(
+                latest_minus_one[i] == latest[i].offset[0] - 1
+                for i in latest.keys()))
+            self.assertIsNone(consumer.consume(block=False))
+
+        difference = sum(latest_minus_one[i] - earliest_minus_one[i]
+                         for i in latest_minus_one.keys())
+        self.assertEqual(difference, self.total_msgs)
+
+    def test_reset_offsets(self):
+        """Test resetting to user-provided offsets"""
+        with self._get_simple_consumer(
+                auto_offset_reset=OffsetType.EARLIEST) as consumer:
+            # Find us a non-empty partition "target_part"
+            part_id, latest_offset = next(
+                (p, res.offset[0])
+                for p, res in consumer.topic.latest_available_offsets().items()
+                if res.offset[0] > 0)
+            target_part = consumer.partitions[part_id]
+
+            # Set all other partitions to LATEST, to ensure that any consume()
+            # calls read from target_part
+            partition_offsets = {
+                p: OffsetType.LATEST for p in consumer.partitions.values()}
+
+            new_offset = latest_offset - 5
+            partition_offsets[target_part] = new_offset
+            consumer.reset_offsets(partition_offsets.items())
+
+            self.assertEqual(consumer.held_offsets[part_id], new_offset)
+            msg = consumer.consume()
+            self.assertEqual(msg.offset, new_offset + 1)
+
+            # Invalid offsets should get overwritten as per auto_offset_reset
+            partition_offsets[target_part] = latest_offset + 5  # invalid!
+            consumer.reset_offsets(partition_offsets.items())
+
+            # SimpleConsumer's fetcher thread will detect the invalid offset
+            # and reset it immediately.  RdKafkaSimpleConsumer however will
+            # only get to write the valid offset upon a call to consume():
+            msg = consumer.consume()
+            expected_offset = target_part.earliest_available_offset()
+            self.assertEqual(msg.offset, expected_offset)
+            self.assertEqual(consumer.held_offsets[part_id], expected_offset)
 
 
 class TestOwnedPartition(unittest2.TestCase):
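A note on the new OwnedPartition.flush() above: reassigning self._messages before draining means any enqueue running concurrently can only land in the fresh queue, so the drain loop never races with a put. A minimal standalone sketch of that swap-then-drain pattern (hypothetical class, Python 2 to match this codebase; not pykafka's actual internals):

    from Queue import Queue, Empty
    import threading

    class SwapDrainBuffer(object):
        """Hypothetical stand-in for OwnedPartition's queue/semaphore pair."""

        def __init__(self):
            self._messages = Queue()
            # counts enqueued-but-unconsumed messages, like _messages_arrived
            self._messages_arrived = threading.Semaphore(0)

        def put(self, message):
            self._messages.put(message)
            self._messages_arrived.release()

        def flush(self):
            # Swap first: concurrent put() calls now target the new queue,
            # so draining the detached queue cannot race with an enqueue
            tmp, self._messages = self._messages, Queue()
            while True:
                try:
                    tmp.get_nowait()
                    # keep the semaphore count in step with each dropped message
                    self._messages_arrived.acquire(False)
                except Empty:
                    break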
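For reference, reset_offsets() as exercised by test_reset_offsets takes an iterable of (partition, offset) pairs, and held_offsets reports the last offset consumed per partition id. A usage sketch (the broker address and topic name are assumptions, not part of this patch) that rewinds every partition of a consumer by five messages:

    from pykafka import KafkaClient
    from pykafka.common import OffsetType

    client = KafkaClient(hosts="localhost:9092")  # assumed broker address
    topic = client.topics["testtopic"]            # assumed topic name
    consumer = topic.get_simple_consumer(auto_offset_reset=OffsetType.EARLIEST)

    # reset_offsets() sets last_offset_consumed, so after resetting to
    # (offset - 5) the next consume() returns the message at (offset - 4),
    # mirroring the new_offset + 1 assertion in test_reset_offsets above
    rewound = [(consumer.partitions[pid], offset - 5)
               for pid, offset in consumer.held_offsets.items()]
    consumer.reset_offsets(rewound)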