This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

Merge pull request #770 from Parsely/bootandy_feature/deprecate_cnsid
deprecate SimpleConsumer(consumer_id)
Emmett J. Butler committed Feb 28, 2018
2 parents e9dd6d2 + 594e28d commit a90575a
Showing 3 changed files with 41 additions and 15 deletions.
11 changes: 7 additions & 4 deletions pykafka/balancedconsumer.py
@@ -426,7 +426,7 @@ def _get_internal_consumer(self, partitions=None, start=True):
reset_offset_on_start = False
Cls = (rdkafka.RdKafkaSimpleConsumer
if self._use_rdkafka else SimpleConsumer)
return Cls(
cns = Cls(
self._topic,
self._cluster,
consumer_group=self._consumer_group,
@@ -443,12 +443,15 @@ def _get_internal_consumer(self, partitions=None, start=True):
offsets_commit_max_retries=self._offsets_commit_max_retries,
auto_offset_reset=self._auto_offset_reset,
reset_offset_on_start=reset_offset_on_start,
auto_start=start,
auto_start=False,
compacted_topic=self._is_compacted_topic,
generation_id=self._generation_id,
consumer_id=self._consumer_id,
deserializer=self._deserializer
)
cns.consumer_id = self._consumer_id
cns.generation_id = self._generation_id
if start:
cns.start()
return cns

def _get_participants(self):
"""Use zookeeper to get the other consumers of this topic.
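For context, the new flow in _get_internal_consumer builds the SimpleConsumer with auto_start=False, attaches the group coordination state through the new consumer_id and generation_id properties, and only then starts it. A minimal sketch of the equivalent pattern from user code, assuming a reachable broker at 127.0.0.1:9092 and placeholder topic/group names (most constructor arguments omitted):

    from pykafka import KafkaClient
    from pykafka.simpleconsumer import SimpleConsumer

    client = KafkaClient(hosts="127.0.0.1:9092")   # assumed broker address
    topic = client.topics[b"test.topic"]           # assumed topic name

    consumer = SimpleConsumer(
        topic,
        client.cluster,
        consumer_group=b"testgroup",   # assumed group name
        auto_start=False,              # build without starting, as the diff now does
    )
    # group membership state is assigned via properties, not __init__ arguments
    consumer.consumer_id = b"example-member-id"
    consumer.generation_id = 1
    consumer.start()
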
3 changes: 2 additions & 1 deletion pykafka/exceptions.py
@@ -212,7 +212,8 @@ class InconsistentGroupProtocol(ProtocolClientError):

class UnknownMemberId(ProtocolClientError):
"""Returned from group requests (offset commits/fetches, heartbeats, etc) when the
memberId is not in the current generation.
memberId is not in the current generation. Also returned if SimpleConsumer is
incorrectly instantiated with a non-default consumer_id.
"""
ERROR_CODE = 25

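For illustration only: per the expanded docstring, a hand-assigned consumer_id can be rejected by the broker on any group request. Assuming pykafka surfaces the error to the caller rather than retrying it away (this depends on the code path and the offsets_commit_max_retries setting), catching it would look like the sketch below, reusing the consumer from the previous example:

    from pykafka.exceptions import UnknownMemberId

    try:
        consumer.commit_offsets()   # group request: offset commit
    except UnknownMemberId:
        # broker does not recognize this member id in the current generation,
        # e.g. because consumer_id was set by hand on a directly instantiated
        # SimpleConsumer
        pass
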
42 changes: 32 additions & 10 deletions pykafka/simpleconsumer.py
@@ -151,10 +151,12 @@ def __init__(self,
consumer to use less stringent message ordering logic because compacted
topics do not provide offsets in strict incrementing order.
:type compacted_topic: bool
:param generation_id: The generation id with which to make group requests
:param generation_id: Deprecated::2.7 Do not set if directly instantiating
SimpleConsumer. The generation id with which to make group requests
:type generation_id: int
:param consumer_id: The identifying string to use for this consumer on group
requests
:param consumer_id: Deprecated::2.7 Do not set if directly instantiating
SimpleConsumer. The identifying string to use for this consumer on
group requests
:type consumer_id: bytes
:param deserializer: A function defining how to deserialize messages returned
from Kafka. A function with the signature d(value, partition_key) that
@@ -193,9 +195,8 @@ def __init__(self,
self._auto_start = auto_start
self._reset_offset_on_start = reset_offset_on_start
self._is_compacted_topic = compacted_topic
self._generation_id = valid_int(generation_id, allow_zero=True,
allow_negative=True)
self._consumer_id = consumer_id
self._generation_id = -1
self._consumer_id = b''
self._deserializer = deserializer

# incremented for any message arrival from any partition
@@ -215,16 +216,14 @@ def __init__(self,
self._partitions = {p: OwnedPartition(p,
self._cluster.handler,
self._messages_arrived,
self._is_compacted_topic,
self._consumer_id)
self._is_compacted_topic)
for p in partitions}
else:
self._partitions = {topic.partitions[k]:
OwnedPartition(p,
self._cluster.handler,
self._messages_arrived,
self._is_compacted_topic,
self._consumer_id)
self._is_compacted_topic)
for k, p in iteritems(topic.partitions)}
self._partitions_by_id = {p.partition.id: p
for p in itervalues(self._partitions)}
@@ -237,6 +236,25 @@ def __init__(self,
if self._auto_start:
self.start()

@property
def consumer_id(self):
return self._consumer_id

@consumer_id.setter
def consumer_id(self, value):
self._consumer_id = value
for op in itervalues(self._partitions):
op.set_consumer_id(self._consumer_id)

@property
def generation_id(self):
return self._generation_id

@generation_id.setter
def generation_id(self, value):
self._generation_id = valid_int(value, allow_zero=True,
allow_negative=True)

def __repr__(self):
return "<{module}.{name} at {id_} (consumer_group={group})>".format(
module=self.__class__.__module__,
@@ -854,6 +872,10 @@ def __init__(self,
self.last_offset_consumed = -1
self.next_offset = 0
self.fetch_lock = handler.RLock() if handler is not None else threading.RLock()
self.set_consumer_id(self._consumer_id)

def set_consumer_id(self, value):
self._consumer_id = value
# include consumer id in offset metadata for debugging
self._offset_metadata = {
'consumer_id': get_string(self._consumer_id),
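The remaining hunks move consumer_id out of OwnedPartition's constructor and into set_consumer_id, which the new consumer_id property calls for every owned partition, so the id still ends up in the offset metadata committed for debugging. Continuing the sketch above (values are illustrative), updating the id on a live consumer is now a plain attribute assignment:

    # the setter propagates the new id into each OwnedPartition's offset metadata
    consumer.consumer_id = b"member-after-rejoin"
    consumer.generation_id = 2
    consumer.commit_offsets()   # subsequent commits carry the updated consumer_id
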
