Skip to content
This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

Commit

Permalink
add parameter to balanced consumers
Browse files Browse the repository at this point in the history
  • Loading branch information
emmettbutler committed Mar 8, 2018
1 parent 9268476 commit 62bae63
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 4 deletions.
10 changes: 8 additions & 2 deletions pykafka/balancedconsumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ def __init__(self,
use_rdkafka=False,
compacted_topic=False,
membership_protocol=RangeProtocol,
deserializer=None):
deserializer=None,
reset_offset_on_fetch=True):
"""Create a BalancedConsumer instance
:param topic: The topic this consumer should consume
Expand Down Expand Up @@ -213,6 +214,9 @@ def __init__(self,
fields transformed according to the client code's serialization logic.
See `pykafka.utils.__init__` for stock implemtations.
:type deserializer: function
:param reset_offset_on_fetch: Whether to update offsets during fetch_offsets.
Disable for read-only use cases to prevent side-effects.
:type reset_offset_on_fetch: bool
"""
self._cluster = cluster
try:
Expand Down Expand Up @@ -248,6 +252,7 @@ def __init__(self,
self._is_compacted_topic = compacted_topic
self._membership_protocol = membership_protocol
self._deserializer = deserializer
self._reset_offset_on_fetch = reset_offset_on_fetch

if not rdkafka and use_rdkafka:
raise ImportError("use_rdkafka requires rdkafka to be installed")
Expand Down Expand Up @@ -445,7 +450,8 @@ def _get_internal_consumer(self, partitions=None, start=True):
reset_offset_on_start=reset_offset_on_start,
auto_start=False,
compacted_topic=self._is_compacted_topic,
deserializer=self._deserializer
deserializer=self._deserializer,
reset_offset_on_fetch=self._reset_offset_on_fetch
)
cns.consumer_id = self._consumer_id
cns.generation_id = self._generation_id
Expand Down
7 changes: 6 additions & 1 deletion pykafka/managedbalancedconsumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ def __init__(self,
compacted_topic=True,
heartbeat_interval_ms=3000,
membership_protocol=RangeProtocol,
deserializer=None):
deserializer=None,
reset_offset_on_fetch=True):
"""Create a ManagedBalancedConsumer instance
:param topic: The topic this consumer should consume
Expand Down Expand Up @@ -177,6 +178,9 @@ def __init__(self,
fields transformed according to the client code's serialization logic.
See `pykafka.utils.__init__` for stock implemtations.
:type deserializer: function
:param reset_offset_on_fetch: Whether to update offsets during fetch_offsets.
Disable for read-only use cases to prevent side-effects.
:type reset_offset_on_fetch: bool
"""

self._cluster = cluster
Expand Down Expand Up @@ -209,6 +213,7 @@ def __init__(self,
self._membership_protocol.metadata.topic_names = [self._topic.name]
self._heartbeat_interval_ms = valid_int(heartbeat_interval_ms)
self._deserializer = deserializer
self._reset_offset_on_fetch = reset_offset_on_fetch
if use_rdkafka is True:
raise ImportError("use_rdkafka is not available for {}".format(
self.__class__.__name__))
Expand Down
2 changes: 1 addition & 1 deletion pykafka/simpleconsumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ def __init__(self,
self._generation_id = -1
self._consumer_id = b''
self._deserializer = deserializer
self._reset_offset_on_fetch = reset_offset_on_fetch

# incremented for any message arrival from any partition
# the initial value is 0 (no messages waiting)
Expand Down Expand Up @@ -236,7 +237,6 @@ def __init__(self,
self.partition_cycle = itertools.cycle(self._partitions.values())

self._default_error_handlers = self._build_default_error_handlers()
self._reset_offset_on_fetch = reset_offset_on_fetch

if self._auto_start:
self.start()
Expand Down

0 comments on commit 62bae63

Please sign in to comment.