From e60de943175c45d64602c0388b502b9763b34f27 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 10 Jun 2015 09:38:24 -0700 Subject: [PATCH 1/5] Split consumer test class between Simple and MultiProcess --- test/test_consumer.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/test_consumer.py b/test/test_consumer.py index 08fd62045..bbced02e6 100644 --- a/test/test_consumer.py +++ b/test/test_consumer.py @@ -25,10 +25,11 @@ def test_partition_list(self): client = MagicMock() partitions = (0,) with patch.object(MultiProcessConsumer, 'fetch_last_known_offsets') as fetch_last_known_offsets: - consumer = MultiProcessConsumer(client, 'testing-group', 'testing-topic', partitions=partitions) + MultiProcessConsumer(client, 'testing-group', 'testing-topic', partitions=partitions) self.assertEqual(fetch_last_known_offsets.call_args[0], (partitions,) ) self.assertEqual(client.get_partition_ids_for_topic.call_count, 0) # pylint: disable=no-member +class TestSimpleConsumer(unittest.TestCase): def test_simple_consumer_failed_payloads(self): client = MagicMock() consumer = SimpleConsumer(client, group=None, From ecdcdf531d232a923f4869f421d3a908dd735d4a Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 10 Jun 2015 09:43:47 -0700 Subject: [PATCH 2/5] (test) Consumer commit() should log errors and return True/False, not raise exceptions --- test/test_consumer.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/test/test_consumer.py b/test/test_consumer.py index bbced02e6..57c69a219 100644 --- a/test/test_consumer.py +++ b/test/test_consumer.py @@ -81,6 +81,30 @@ def unknown_topic_partition(request): with self.assertRaises(UnknownTopicOrPartitionError): consumer.get_messages(20) + def test_simple_consumer_commit_does_not_raise(self): + client = MagicMock() + client.get_partition_ids_for_topic.return_value = [0, 1] + + def mock_offset_fetch_request(group, payloads, **kwargs): + return [OffsetFetchResponse(p.topic, p.partition, 
0, b'', 0) for p in payloads] + + client.send_offset_fetch_request.side_effect = mock_offset_fetch_request + + def mock_offset_commit_request(group, payloads, **kwargs): + raise FailedPayloadsError(payloads[0]) + + client.send_offset_commit_request.side_effect = mock_offset_commit_request + + consumer = SimpleConsumer(client, group='foobar', + topic='topic', partitions=[0, 1], + auto_commit=False) + + # Mock internal commit check + consumer.count_since_commit = 10 + + # This should not raise an exception + self.assertFalse(consumer.commit(partitions=[0, 1])) + @staticmethod def fail_requests_factory(error_factory): # Mock so that only the first request gets a valid response From f021609911d25dbb7ef20410890483c1439edd58 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 10 Jun 2015 09:48:19 -0700 Subject: [PATCH 3/5] Change Consumer commit() to return True/False and log error; don't raise client exceptions --- kafka/consumer/base.py | 30 ++++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py index 6365cfa38..b5383a3db 100644 --- a/kafka/consumer/base.py +++ b/kafka/consumer/base.py @@ -8,7 +8,7 @@ import kafka.common from kafka.common import ( OffsetRequest, OffsetCommitRequest, OffsetFetchRequest, - UnknownTopicOrPartitionError, check_error + UnknownTopicOrPartitionError, check_error, KafkaError ) from kafka.util import kafka_bytestring, ReentrantTimer @@ -114,12 +114,13 @@ def fetch_last_known_offsets(self, partitions=None): self.offsets[resp.partition] = resp.offset def commit(self, partitions=None): - """ - Commit offsets for this consumer + """Commit stored offsets to Kafka via OffsetCommitRequest (v0) Keyword Arguments: partitions (list): list of partitions to commit, default is to commit all of them + + Returns: True on success, False on failure """ # short circuit if nothing happened. 
This check is kept outside @@ -135,22 +136,27 @@ def commit(self, partitions=None): reqs = [] if partitions is None: # commit all partitions - partitions = self.offsets.keys() + partitions = list(self.offsets.keys()) + log.info('Committing new offsets for %s, partitions %s', + self.topic, partitions) for partition in partitions: offset = self.offsets[partition] - log.debug("Commit offset %d in SimpleConsumer: " - "group=%s, topic=%s, partition=%s" % - (offset, self.group, self.topic, partition)) + log.debug('Commit offset %d in SimpleConsumer: ' + 'group=%s, topic=%s, partition=%s', + offset, self.group, self.topic, partition) reqs.append(OffsetCommitRequest(self.topic, partition, offset, None)) - resps = self.client.send_offset_commit_request(self.group, reqs) - for resp in resps: - kafka.common.check_error(resp) - - self.count_since_commit = 0 + try: + self.client.send_offset_commit_request(self.group, reqs) + except KafkaError as e: + log.error('%s saving offsets: %s', e.__class__.__name__, e) + return False + else: + self.count_since_commit = 0 + return True def _auto_commit(self): """ From 680a8dc3376badccccf0aab27a2307adc0b4cb0d Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 10 Jun 2015 09:46:33 -0700 Subject: [PATCH 4/5] (test) SimpleConsumer.reset_partition_offset should not raise exception on failure --- test/test_consumer.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/test/test_consumer.py b/test/test_consumer.py index 57c69a219..df1511551 100644 --- a/test/test_consumer.py +++ b/test/test_consumer.py @@ -4,7 +4,7 @@ from kafka import SimpleConsumer, KafkaConsumer, MultiProcessConsumer from kafka.common import ( - KafkaConfigurationError, FetchResponse, + KafkaConfigurationError, FetchResponse, OffsetFetchResponse, FailedPayloadsError, OffsetAndMessage, NotLeaderForPartitionError, UnknownTopicOrPartitionError ) @@ -105,6 +105,21 @@ def mock_offset_commit_request(group, payloads, **kwargs): # This should not 
raise an exception self.assertFalse(consumer.commit(partitions=[0, 1])) + def test_simple_consumer_reset_partition_offset(self): + client = MagicMock() + + def mock_offset_request(payloads, **kwargs): + raise FailedPayloadsError(payloads[0]) + + client.send_offset_request.side_effect = mock_offset_request + + consumer = SimpleConsumer(client, group='foobar', + topic='topic', partitions=[0, 1], + auto_commit=False) + + # This should not raise an exception + self.assertEqual(consumer.reset_partition_offset(0), None) + @staticmethod def fail_requests_factory(error_factory): # Mock so that only the first request gets a valid response From ed42d7899117e4bba8ef47afe825c13185cbdfc7 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 10 Jun 2015 09:50:06 -0700 Subject: [PATCH 5/5] Change SimpleConsumer.reset_partition_offset to return offset / None on failure (don't raise exception) --- kafka/consumer/simple.py | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py index e4233ff6f..c75e78b67 100644 --- a/kafka/consumer/simple.py +++ b/kafka/consumer/simple.py @@ -27,7 +27,7 @@ NO_MESSAGES_WAIT_TIME_SECONDS ) from ..common import ( - FetchRequest, OffsetRequest, + FetchRequest, KafkaError, OffsetRequest, ConsumerFetchSizeTooSmall, ConsumerNoMoreData, UnknownTopicOrPartitionError, NotLeaderForPartitionError, OffsetOutOfRangeError, FailedPayloadsError, check_error @@ -144,6 +144,13 @@ def __repr__(self): (self.group, self.topic, str(self.offsets.keys())) def reset_partition_offset(self, partition): + """Update offsets using auto_offset_reset policy (smallest|largest) + + Arguments: + partition (int): the partition for which offsets should be updated + + Returns: Updated offset on success, None on failure + """ LATEST = -1 EARLIEST = -2 if self.auto_offset_reset == 'largest': @@ -163,10 +170,17 @@ def reset_partition_offset(self, partition): raise # send_offset_request - (resp, ) = 
self.client.send_offset_request(reqs) - check_error(resp) - self.offsets[partition] = resp.offsets[0] - self.fetch_offsets[partition] = resp.offsets[0] + log.info('Resetting topic-partition offset to %s for %s:%d', + self.auto_offset_reset, self.topic, partition) + try: + (resp, ) = self.client.send_offset_request(reqs) + except KafkaError as e: + log.error('%s sending offset request for %s:%d', + e.__class__.__name__, self.topic, partition) + else: + self.offsets[partition] = resp.offsets[0] + self.fetch_offsets[partition] = resp.offsets[0] + return resp.offsets[0] def provide_partition_info(self): """