Skip to content
This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

Commit

Permalink
Merge pull request #774 from Parsely/tanaysoni_feature/read-only-consume
Browse files Browse the repository at this point in the history
allow toggling of offset reset on fetch
  • Loading branch information
Emmett J. Butler committed Mar 5, 2018
2 parents 50d3fca + b803460 commit 4cbd79e
Showing 1 changed file with 7 additions and 2 deletions.
9 changes: 7 additions & 2 deletions pykafka/simpleconsumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ def __init__(self,
compacted_topic=False,
generation_id=-1,
consumer_id=b'',
deserializer=None):
deserializer=None,
reset_offset_on_fetch=True):
"""Create a SimpleConsumer.
Settings and default values are taken from the Scala
Expand Down Expand Up @@ -166,6 +167,9 @@ def __init__(self,
fields transformed according to the client code's serialization logic.
See `pykafka.utils.__init__` for stock implemtations.
:type deserializer: function
:param reset_offset_on_fetch: Whether to update offsets during fetch_offsets.
Disable for read-only use cases to prevent side-effects.
:type reset_offset_on_fetch: bool
"""
self._running = False
self._cluster = cluster
Expand Down Expand Up @@ -232,6 +236,7 @@ def __init__(self,
self.partition_cycle = itertools.cycle(self._partitions.values())

self._default_error_handlers = self._build_default_error_handlers()
self.reset_offset_on_fetch = reset_offset_on_fetch

if self._auto_start:
self.start()
Expand Down Expand Up @@ -618,7 +623,7 @@ def _handle_success(parts):
parts_by_error = handle_partition_responses(
self._default_error_handlers,
response=res,
success_handler=_handle_success,
success_handler=_handle_success if self.reset_offsets_on_fetch else None,
partitions_by_id=self._partitions_by_id)

success_responses.extend([(op.partition.id, r)
Expand Down

0 comments on commit 4cbd79e

Please sign in to comment.