Skip to content
This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

Commit

Permalink
Merge pull request #815 from Parsely/feature/partition_offsets_commit
Browse files Browse the repository at this point in the history
customizable offsets in commit_offsets
  • Loading branch information
Emmett J. Butler committed Jun 14, 2018
2 parents 56efe39 + c308a9d commit 483b9c1
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 11 deletions.
17 changes: 13 additions & 4 deletions pykafka/balancedconsumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,8 @@ def __init__(self,
FetchRequests
:type num_consumer_fetchers: int
:param auto_commit_enable: If true, periodically commit to kafka the
offset of messages already fetched by this consumer. This also
requires that `consumer_group` is not `None`.
offset of messages already returned from consume() calls. Requires that
`consumer_group` is not `None`.
:type auto_commit_enable: bool
:param auto_commit_interval_ms: The frequency (in milliseconds) at which
the consumer's offsets are committed to kafka. This setting is
Expand Down Expand Up @@ -768,12 +768,21 @@ def __iter__(self):
return
yield message

def commit_offsets(self):
def commit_offsets(self, partition_offsets=None):
    """Commit offsets for this consumer's partitions

    Uses the offset commit/fetch API

    :param partition_offsets: (`partition`, `offset`) pairs to
        commit where `partition` is the partition for which to commit the offset
        and `offset` is the offset to commit for the partition. Note that using
        this argument when `auto_commit_enable` is enabled can cause inconsistencies
        in committed offsets. For best results, use *either* this argument *or*
        `auto_commit_enable`.
    :type partition_offsets: Sequence of tuples of the form
        (:class:`pykafka.partition.Partition`, int)
    """
    # Surface any exceptions raised by background worker threads first
    self._raise_worker_exceptions()
    consumer = self._consumer
    if not consumer:
        raise KafkaException("Cannot commit offsets - consumer not started")
    # Delegate to the underlying SimpleConsumer, forwarding the override
    return consumer.commit_offsets(partition_offsets=partition_offsets)
39 changes: 32 additions & 7 deletions pykafka/simpleconsumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ def __init__(self,
FetchRequests
:type num_consumer_fetchers: int
:param auto_commit_enable: If true, periodically commit to kafka the
offset of messages already fetched by this consumer. This also
requires that `consumer_group` is not `None`.
offset of messages already returned from consume() calls. Requires that
`consumer_group` is not `None`.
:type auto_commit_enable: bool
:param auto_commit_interval_ms: The frequency (in milliseconds) at which the
consumer offsets are committed to kafka. This setting is ignored if
Expand Down Expand Up @@ -518,15 +518,35 @@ def _auto_commit(self):
self.commit_offsets()
self._last_auto_commit = time.time()

def commit_offsets(self):
def commit_offsets(self, partition_offsets=None):
"""Commit offsets for this consumer's partitions
Uses the offset commit/fetch API
:param partition_offsets: (`partition`, `offset`) pairs to
commit where `partition` is the partition for which to commit the offset
and `offset` is the offset to commit for the partition. Note that using
this argument when `auto_commit_enable` is enabled can cause inconsistencies
in committed offsets. For best results, use *either* this argument *or*
`auto_commit_enable`.
:type partition_offsets: Sequence of tuples of the form
(:class:`pykafka.partition.Partition`, int)
"""
if not self._consumer_group:
raise Exception("consumer group must be specified to commit offsets")

reqs = [p.build_offset_commit_request() for p in self._partitions.values()]
if partition_offsets is None:
partition_offsets = [(p, None) for p in self._partitions.keys()]

# turn Partitions into their corresponding OwnedPartitions
try:
owned_partition_offsets = {self._partitions[p]: offset
for p, offset in partition_offsets}
except KeyError as e:
raise KafkaException("Unknown partition supplied to commit_offsets\n%s", e)
reqs = [p.build_offset_commit_request(offset=o) for p, o
in iteritems(owned_partition_offsets)]

log.debug("Committing offsets for %d partitions to broker id %s", len(reqs),
self._group_coordinator.id)
for i in range(self._offsets_commit_max_retries):
Expand Down Expand Up @@ -565,7 +585,8 @@ def commit_offsets(self):
op for code, err_group in iteritems(parts_by_error)
for op, res in err_group
]
reqs = [p.build_offset_commit_request() for p in errored_partitions]
reqs = [op.build_offset_commit_request(offset=owned_partition_offsets[op])
for op in errored_partitions]

def fetch_offsets(self):
"""Fetch offsets for this consumer's topic
Expand Down Expand Up @@ -945,14 +966,18 @@ def build_fetch_request(self, max_bytes):
self.partition.topic.name, self.partition.id,
self.next_offset, max_bytes)

def build_offset_commit_request(self):
def build_offset_commit_request(self, offset=None):
    """Create a :class:`pykafka.protocol.PartitionOffsetCommitRequest`
    for this partition

    :param offset: The offset to send in the request. If None, defaults to
        last_offset_consumed + 1
    :type offset: int
    """
    # When no explicit offset is given, commit the next offset to consume
    commit_offset = self.last_offset_consumed + 1 if offset is None else offset
    timestamp_ms = int(time.time() * 1000)
    metadata = get_bytes('{}'.format(self._offset_metadata_json))
    return PartitionOffsetCommitRequest(
        self.partition.topic.name,
        self.partition.id,
        commit_offset,
        timestamp_ms,
        metadata
    )
Expand Down
13 changes: 13 additions & 0 deletions tests/pykafka/test_simpleconsumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,19 @@ def test_offset_commit(self):
offsets_fetched = self._convert_offsets(consumer.fetch_offsets())
self.assertEquals(offsets_fetched, offsets_committed)

def test_offset_commit_override(self):
    """Check that offsets passed via `partition_offsets` override the
    consumer's internal state and are the ones returned by a later fetch"""
    with self._get_simple_consumer(
            consumer_group=b'test_offset_commit') as consumer:
        # Advance the consumer so it has internal offset state that the
        # explicit partition_offsets argument must override.
        # (plain loop, not a throwaway list comprehension, for side effects)
        for _ in range(100):
            consumer.consume()
        offset = 69
        offsets_committed = [(p, offset) for p in consumer.partitions.values()]
        consumer.commit_offsets(partition_offsets=offsets_committed)

        offsets_fetched = self._convert_offsets(consumer.fetch_offsets())
        # NOTE(review): fetched offsets are expected to be the committed
        # offset minus 1 — presumably _convert_offsets adjusts for the
        # "next offset to consume" convention; verify against its definition
        offsets_committed = {p.id: offset - 1 for p in consumer.partitions.values()}
        self.assertEquals(offsets_fetched, offsets_committed)

def test_offset_resume(self):
"""Check resumed internal state matches committed offsets"""
with self._get_simple_consumer(
Expand Down

0 comments on commit 483b9c1

Please sign in to comment.