Skip to content
This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

Commit

Permalink
simply raise an exception instead of trying to get fancy with offset …
Browse files Browse the repository at this point in the history
…resetting
  • Loading branch information
emmettbutler committed Jul 17, 2015
1 parent 591d5d0 commit 5faf306
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 10 deletions.
5 changes: 5 additions & 0 deletions pykafka/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ class NoMessagesConsumedError(KafkaException):
pass


class OffsetRequestFailedError(KafkaException):
"""Indicates that OffsetRequests for offset resetting failed more times than the configured maximum"""
pass


class PartitionOwnedError(KafkaException):
"""Indicates a given partition is still owned in Zookeeper."""

Expand Down
17 changes: 7 additions & 10 deletions pykafka/simpleconsumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
from .exceptions import (OffsetOutOfRangeError, UnknownTopicOrPartition,
OffsetMetadataTooLarge, OffsetsLoadInProgress,
NotCoordinatorForConsumer, SocketDisconnectedError,
ConsumerStoppedException, KafkaException, ERROR_CODES)
ConsumerStoppedException, KafkaException,
OffsetRequestFailedError, ERROR_CODES)
from .protocol import (PartitionFetchRequest, PartitionOffsetCommitRequest,
PartitionOffsetFetchRequest, PartitionOffsetRequest)
from .utils.error_handlers import (handle_partition_responses, raise_error,
Expand Down Expand Up @@ -526,21 +527,17 @@ def _handle_success(parts):
for errcode, owned_partitions in parts_by_error.iteritems():
if errcode != 0:
for owned_partition in owned_partitions:
# set internal counter appropriately to avoid possibly
# leaving it in a state inconsistent with the given
# offset
given_offset = owned_partition_offsets[owned_partition]
# don't reset counter to a magic value, only to a real
# offset
if given_offset != self._auto_offset_reset:
owned_partition.set_offset(given_offset)
# release all locks to allow fetching
owned_partition.fetch_lock.release()

if len(parts_by_error) == 1 and 0 in parts_by_error:
break
log.debug("Retrying offset reset")

if any([a != 0 for a in parts_by_error]):
raise OffsetRequestFailedError("reset_offsets failed after %d "
"retries",
self._offsets_reset_max_retries)

if self._consumer_group is not None:
self.commit_offsets()

Expand Down

0 comments on commit 5faf306

Please sign in to comment.