From 732e5a6f5439f8ba55dc29ac66b6d8824158f567 Mon Sep 17 00:00:00 2001 From: Alexander Sibiryakov Date: Wed, 26 Oct 2016 16:34:28 +0200 Subject: [PATCH 1/2] proper iteration over items and info messages --- kafka/consumer/fetcher.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index d09f9da9a..c304496da 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -236,7 +236,7 @@ def _raise_if_offset_out_of_range(self): current_out_of_range_partitions = {} # filter only the fetchable partitions - for partition, offset in self._offset_out_of_range_partitions: + for partition, offset in six.iteritems(self._offset_out_of_range_partitions): if not self._subscriptions.is_fetchable(partition): log.debug("Ignoring fetched records for %s since it is no" " longer fetchable", partition) @@ -740,12 +740,13 @@ def _handle_fetch_response(self, request, send_time, response): self._client.cluster.request_update() elif error_type is Errors.OffsetOutOfRangeError: fetch_offset = fetch_offsets[tp] + log.info("Fetch offset %s is out of range", fetch_offset) if self._subscriptions.has_default_offset_reset_policy(): self._subscriptions.need_offset_reset(tp) + log.info("Resetting offset") else: self._offset_out_of_range_partitions[tp] = fetch_offset - log.info("Fetch offset %s is out of range, resetting offset", - fetch_offset) + log.info("Raising exception") elif error_type is Errors.TopicAuthorizationFailedError: log.warn("Not authorized to read from topic %s.", tp.topic) self._unauthorized_topics.add(tp.topic) From d5b5c3a4237341e92aef940739f1a46b20e3b277 Mon Sep 17 00:00:00 2001 From: Alexander Sibiryakov Date: Fri, 16 Dec 2016 13:49:40 +0100 Subject: [PATCH 2/2] added tp-s and removed ambiguous message --- kafka/consumer/fetcher.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index c304496da..b6bbea778 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -740,13 +740,12 @@ def _handle_fetch_response(self, request, send_time, response): self._client.cluster.request_update() elif error_type is Errors.OffsetOutOfRangeError: fetch_offset = fetch_offsets[tp] - log.info("Fetch offset %s is out of range", fetch_offset) + log.info("Fetch offset %s is out of range for topic-partition %s", fetch_offset, tp) if self._subscriptions.has_default_offset_reset_policy(): self._subscriptions.need_offset_reset(tp) - log.info("Resetting offset") + log.info("Resetting offset for topic-partition %s", tp) else: self._offset_out_of_range_partitions[tp] = fetch_offset - log.info("Raising exception") elif error_type is Errors.TopicAuthorizationFailedError: log.warn("Not authorized to read from topic %s.", tp.topic) self._unauthorized_topics.add(tp.topic)