Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions kafka/consumer/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,14 @@ def cleanup(obj):
self._cleanup_func = cleanup
atexit.register(cleanup, self)

self.partition_info = False # Do not return partition info in msgs

def provide_partition_info(self):
"""
Indicates that partition info must be returned by the consumer
"""
self.partition_info = True

def fetch_last_known_offsets(self, partitions=None):
if self.group is None:
raise ValueError('KafkaClient.group must not be None')
Expand Down
3 changes: 2 additions & 1 deletion kafka/consumer/multiprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,8 @@ def get_messages(self, count=1, block=True, timeout=10):
except Empty:
break

messages.append(message)
_msg = (partition, message) if self.partition_info else message
messages.append(_msg)
new_offsets[partition] = message.offset + 1
count -= 1
if timeout is not None:
Expand Down
7 changes: 0 additions & 7 deletions kafka/consumer/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ def __init__(self, client, group, topic, auto_commit=True, partitions=None,
(buffer_size, max_buffer_size))
self.buffer_size = buffer_size
self.max_buffer_size = max_buffer_size
self.partition_info = False # Do not return partition info in msgs
self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
self.fetch_min_bytes = fetch_size_bytes
self.fetch_offsets = self.offsets.copy()
Expand Down Expand Up @@ -182,12 +181,6 @@ def reset_partition_offset(self, partition):
self.fetch_offsets[partition] = resp.offsets[0]
return resp.offsets[0]

def provide_partition_info(self):
"""
Indicates that partition info must be returned by the consumer
"""
self.partition_info = True

def seek(self, offset, whence=None, partition=None):
"""
Alter the current offset in the consumer, similar to fseek
Expand Down