Merge pull request #834 from Parsely/bugfix/reset_offsets_logic
SimpleConsumer.reset_offsets overhaul
Emmett J. Butler committed Jul 18, 2018
2 parents 3fb9c74 + 665495c commit 2419a49
Showing 3 changed files with 119 additions and 122 deletions.
24 changes: 10 additions & 14 deletions pykafka/balancedconsumer.py
@@ -696,24 +696,20 @@ def _topics_changed(self, topics):
     def reset_offsets(self, partition_offsets=None):
         """Reset offsets for the specified partitions
 
-        Issue an OffsetRequest for each partition and set the appropriate
-        returned offset in the consumer's internal offset counter.
+        For each value provided in `partition_offsets`: if the value is an integer,
+        immediately reset the partition's internal offset counter to that value. If
+        it's a `datetime.datetime` instance or a valid `OffsetType`, issue an
+        `OffsetRequest` using that timestamp value to discover the latest offset
+        in the latest log segment before that timestamp, then set the partition's
+        internal counter to that value.
 
         :param partition_offsets: (`partition`, `timestamp_or_offset`) pairs to
             reset where `partition` is the partition for which to reset the offset
-            and `timestamp_or_offset` is EITHER the timestamp of the message
-            whose offset the partition should have OR the new offset the
-            partition should have
+            and `timestamp_or_offset` is EITHER the timestamp before which to find
+            a valid offset to set the partition's counter to OR the new offset the
+            partition's counter should be set to
         :type partition_offsets: Sequence of tuples of the form
-            (:class:`pykafka.partition.Partition`, int)
-
-        NOTE: If an instance of `timestamp_or_offset` is treated by kafka as
-        an invalid offset timestamp, this function directly sets the consumer's
-        internal offset counter for that partition to that instance of
-        `timestamp_or_offset`. On the next fetch request, the consumer attempts
-        to fetch messages starting from that offset. See the following link
-        for more information on what kafka treats as a valid offset timestamp:
-        https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetRequest
+            (:class:`pykafka.partition.Partition`, int OR `datetime.datetime`)
         """
         self._raise_worker_exceptions()
         if not self._consumer:
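
As a quick illustration of the reworked API, here is a usage sketch (not part of the commit; the broker address and topic name are placeholder assumptions, and the topic is assumed to have at least three partitions):

    import datetime as dt

    from pykafka import KafkaClient
    from pykafka.common import OffsetType

    client = KafkaClient(hosts="localhost:9092")
    consumer = client.topics[b"example.topic"].get_simple_consumer()
    parts = list(consumer.partitions.values())

    consumer.reset_offsets([
        (parts[0], 42),                       # int: set the internal counter directly
        (parts[1], dt.datetime(2018, 7, 1)),  # datetime: resolved via an OffsetRequest
        (parts[2], OffsetType.EARLIEST),      # OffsetType: earliest available offset
    ])

The same `partition_offsets` semantics apply to `BalancedConsumer.reset_offsets`, since it delegates to the `SimpleConsumer` it wraps.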
191 changes: 83 additions & 108 deletions pykafka/simpleconsumer.py
@@ -18,6 +18,7 @@
 limitations under the License.
 """
 __all__ = ["SimpleConsumer"]
+import datetime as dt
 import itertools
 import logging
 import json
@@ -47,6 +48,8 @@


 log = logging.getLogger(__name__)
+EPOCH = dt.datetime(1970, 1, 1)
+MAGIC_OFFSETS = [OffsetType.EARLIEST, OffsetType.LATEST]
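
For reference, the two `MAGIC_OFFSETS` sentinels come from `pykafka.common.OffsetType` and correspond to the Kafka protocol's special timestamp values; a minimal sketch of those constants:

    class OffsetType(object):
        # special timestamp values understood by Kafka's OffsetRequest
        EARLIEST = -2  # the earliest available offset
        LATEST = -1    # the offset of the next message to be produced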


@@ -666,63 +669,39 @@ def _handle_success(parts):
     def reset_offsets(self, partition_offsets=None):
         """Reset offsets for the specified partitions
 
-        Issue an OffsetRequest for each partition and set the appropriate
-        returned offset in the consumer's internal offset counter.
+        For each value provided in `partition_offsets`: if the value is an integer,
+        immediately reset the partition's internal offset counter to that value. If
+        it's a `datetime.datetime` instance or a valid `OffsetType`, issue an
+        `OffsetRequest` using that timestamp value to discover the latest offset
+        in the latest log segment before that timestamp, then set the partition's
+        internal counter to that value.
 
         :param partition_offsets: (`partition`, `timestamp_or_offset`) pairs to
             reset where `partition` is the partition for which to reset the offset
-            and `timestamp_or_offset` is EITHER the timestamp of the message
-            whose offset the partition should have OR the new "most recently
-            consumed" offset the partition should have
+            and `timestamp_or_offset` is EITHER the timestamp before which to find
+            a valid offset to set the partition's counter to OR the new offset the
+            partition's counter should be set to.
         :type partition_offsets: Sequence of tuples of the form
-            (:class:`pykafka.partition.Partition`, int)
-
-        NOTE: If an instance of `timestamp_or_offset` is treated by kafka as
-        an invalid offset timestamp, this function directly sets the consumer's
-        internal offset counter for that partition to that instance of
-        `timestamp_or_offset`. This counter represents the offset most recently
-        consumed. On the next fetch request, the consumer attempts to fetch
-        messages starting from that offset plus one. See the following link
-        for more information on what kafka treats as a valid offset timestamp:
-        https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetRequest
+            (:class:`pykafka.partition.Partition`, int OR `datetime.datetime`)
         """
         def _handle_success(parts):
             for owned_partition, pres in parts:
                 if len(pres.offset) > 0:
-                    # offset requests return the next offset to consume,
-                    # so account for this here by passing offset - 1
-                    owned_partition.set_offset(pres.offset[0] - 1)
+                    # there is at least one log segment that starts before the
+                    # given timestamp. set the counter to the latest offset of
+                    # the latest log segment before the given timestamp.
+                    owned_partition_offsets[owned_partition] = pres.offset[0] - 1
                 else:
-                    # If the number specified in partition_offsets is an invalid
-                    # timestamp value for the partition, kafka does the
-                    # following:
-                    #   returns an empty array in pres.offset
-                    #   returns error code 0
-                    # Here, we detect this case and set the consumer's internal
-                    # offset to that value. Thus, the next fetch request will
-                    # attempt to fetch from that offset. If it succeeds, all is
-                    # well; if not, reset_offsets is called again by the error
-                    # handlers in fetch() and fetching continues from
-                    # self._auto_offset_reset.
-                    # This amounts to a hacky way to support user-specified
-                    # offsets in reset_offsets by working around a bug or bad
-                    # design decision in kafka.
-                    given_offset = owned_partition_offsets[owned_partition]
-                    log.warning(
-                        "Offset reset for partition {id_} to timestamp {offset}"
-                        " failed. Setting partition {id_}'s internal counter"
-                        " to {offset}".format(
-                            id_=owned_partition.partition.id, offset=given_offset))
-                    owned_partition.set_offset(given_offset)
-                    # release locks on succeeded partitions to allow fetching
-                    # to resume
-                    owned_partition.fetch_lock.release()
+                    log.warning("Partition {id_}: no offsets available before "
+                                "{offset}. Defaulting to OffsetType.EARLIEST."
+                                .format(id_=owned_partition.partition.id,
+                                        offset=owned_partition_timestamps[owned_partition]))
+                    owned_partition_offsets[owned_partition] = OffsetType.EARLIEST
 
         if partition_offsets is None:
             partition_offsets = [(a, self._auto_offset_reset)
                                  for a in self._partitions.keys()]
 
         # turn Partitions into their corresponding OwnedPartitions
         try:
             owned_partition_offsets = {self._partitions[p]: offset
                                        for p, offset in partition_offsets}
@@ -731,60 +710,51 @@ def _handle_success(parts):

         log.info("Resetting offsets for %s partitions", len(list(owned_partition_offsets)))
 
-        for i in range(self._offsets_reset_max_retries):
-            # sort offsets to avoid deadlocks
-            sorted_offsets = sorted(iteritems(owned_partition_offsets), key=lambda k: k[0].partition.id)
-
-            # group partitions by leader
-            by_leader = defaultdict(list)
-            for partition, offset in sorted_offsets:
-                # acquire lock for each partition to stop fetching during offset
-                # reset
-                if partition.fetch_lock.acquire(True):
-                    # empty the queue for this partition to avoid emitting
-                    # messages from the old offset
-                    partition.flush()
-                    by_leader[partition.partition.leader].append((partition, offset))
-
-            # get valid offset ranges for each partition
-            for broker, offsets in iteritems(by_leader):
-                reqs = [owned_partition.build_offset_request(offset)
-                        for owned_partition, offset in offsets]
-                response = broker.request_offset_limits(reqs)
-                parts_by_error = handle_partition_responses(
-                    self._default_error_handlers,
-                    response=response,
-                    success_handler=_handle_success,
-                    partitions_by_id=self._partitions_by_id)
-
-                if 0 in parts_by_error:
-                    # drop successfully reset partitions for the next retry
-                    successful = [part for part, _ in parts_by_error.pop(0)]
-                    # py3 creates a generator, so we need to evaluate this
-                    # operation
-                    list(map(owned_partition_offsets.pop, successful))
-                if not parts_by_error:
-                    continue
-                log.error("Error resetting offsets for topic '%s' (errors: %s)",
-                          self._topic.name,
-                          {ERROR_CODES[err]: [op.partition.id for op, _ in parts]
-                           for err, parts in iteritems(parts_by_error)})
-
-                self._cluster.handler.sleep(i * (self._offsets_channel_backoff_ms / 1000))
-
-                for errcode, owned_partitions in iteritems(parts_by_error):
-                    if errcode != 0:
-                        for owned_partition, _ in owned_partitions:
-                            owned_partition.fetch_lock.release()
-
-            if not owned_partition_offsets:
-                break
-            log.debug("Retrying offset reset")
-
-        if owned_partition_offsets:
-            raise OffsetRequestFailedError("reset_offsets failed after %d "
-                                           "retries",
-                                           self._offsets_reset_max_retries)
+        # partitions whose timestamps must be resolved into concrete offsets
+        # via an OffsetRequest
+        owned_partition_timestamps = {
+            op: timestamp for op, timestamp
+            in iteritems(owned_partition_offsets)
+            if isinstance(timestamp, dt.datetime) or timestamp in MAGIC_OFFSETS}
+        if owned_partition_timestamps:
+            for i in range(self._offsets_reset_max_retries):
+                # group partitions by leader to minimize the number of requests
+                by_leader = defaultdict(list)
+                for partition, timestamp in iteritems(owned_partition_timestamps):
+                    by_leader[partition.partition.leader].append((partition, timestamp))
+                for broker, timestamps in iteritems(by_leader):
+                    reqs = [owned_partition.build_offset_request(timestamp)
+                            for owned_partition, timestamp in timestamps]
+                    response = broker.request_offset_limits(reqs)
+                    parts_by_error = handle_partition_responses(
+                        self._default_error_handlers,
+                        response=response,
+                        success_handler=_handle_success,
+                        partitions_by_id=self._partitions_by_id)
+                    if 0 in parts_by_error:
+                        # drop successfully resolved partitions for the next retry
+                        successful = [part for part, _ in parts_by_error.pop(0)]
+                        # py3 map() returns a generator, so evaluate it eagerly
+                        list(map(owned_partition_timestamps.pop, successful))
+                    if not parts_by_error:
+                        continue
+                    log.error("Error in OffsetRequest for topic '%s' (errors: %s)",
+                              self._topic.name,
+                              {ERROR_CODES[err]: [op.partition.id for op, _ in parts]
+                               for err, parts in iteritems(parts_by_error)})
+                    self._cluster.handler.sleep(i * (self._offsets_channel_backoff_ms / 1000))
+                if not owned_partition_timestamps:
+                    break
+                log.debug("Retrying offset request")
+            if owned_partition_timestamps:
+                raise OffsetRequestFailedError("Offset request failed after %d "
+                                               "retries", self._offsets_reset_max_retries)
 
+        # sort offsets to avoid deadlocks when acquiring fetch locks
+        sorted_offsets = sorted(iteritems(owned_partition_offsets),
+                                key=lambda k: k[0].partition.id)
+        for owned_partition, offset in sorted_offsets:
+            if not isinstance(offset, int):
+                raise ValueError("Invalid offset value encountered in reset_offsets:\n\t"
+                                 "Partition {pid} got offset '{offset}'."
+                                 .format(pid=owned_partition.partition.id, offset=offset))
+            with owned_partition.fetch_lock:
+                # flush the queue to avoid emitting messages from the old offset
+                owned_partition.flush()
+                owned_partition.set_offset(offset)
 
         if self._consumer_group is not None:
             self.commit_offsets()
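
One subtlety worth restating: Kafka's OffsetRequest returns the next offset to fetch, while pykafka's internal counter stores the last offset consumed, hence the `pres.offset[0] - 1` adjustment in `_handle_success`. A minimal sketch of that arithmetic (the helper name and values are illustrative, not part of the commit):

    from pykafka.common import OffsetType

    def resolve_counter(response_offsets):
        # mirrors the success handler above: a response of [500] means
        # "the next message to fetch is 500", so the last-consumed counter
        # becomes 499 and the next fetch asks for 499 + 1 = 500
        if response_offsets:
            return response_offsets[0] - 1
        # no log segment qualifies before the given timestamp
        return OffsetType.EARLIEST

    assert resolve_counter([500]) == 499
    assert resolve_counter([]) == OffsetType.EARLIEST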
@@ -938,21 +908,26 @@ def set_offset(self, last_offset_consumed):
         self.last_offset_consumed = last_offset_consumed
         self.next_offset = last_offset_consumed + 1
 
-    def build_offset_request(self, new_offset):
+    def build_offset_request(self, offsets_before):
         """Create a :class:`pykafka.protocol.PartitionOffsetRequest` for this
         partition
 
-        :param new_offset: The offset to which to set this partition. This
-            setting indicates how to reset the consumer's internal offset
-            counter when an OffsetOutOfRangeError is encountered.
-            There are two special values. Specify -1 to receive the latest
-            offset (i.e. the offset of the next coming message) and -2 to
-            receive the earliest available offset.
-        :type new_offset: :class:`pykafka.common.OffsetType` or int
+        :param offsets_before: Timestamp indicating the latest write time for
+            returned offsets: only offsets of messages written before this
+            timestamp will be returned. Permissible special values are
+            `common.OffsetType.LATEST`, indicating that offsets from all
+            available log segments should be returned, and
+            `common.OffsetType.EARLIEST`, indicating that only the offset of
+            the earliest available message should be returned.
+        :type offsets_before: `datetime.datetime` or
+            :class:`pykafka.common.OffsetType`
         """
-        return PartitionOffsetRequest(
-            self.partition.topic.name, self.partition.id,
-            new_offset, 1)
+        if isinstance(offsets_before, dt.datetime):
+            offsets_before = round((offsets_before - EPOCH).total_seconds() * 1000)
+        elif offsets_before not in MAGIC_OFFSETS:
+            raise ValueError("offsets_before is an invalid timestamp: {}"
+                             .format(offsets_before))
+        return PartitionOffsetRequest(self.partition.topic.name, self.partition.id,
+                                      offsets_before, 1)
 
     def build_fetch_request(self, max_bytes):
         """Create a :class:`pykafka.protocol.FetchPartitionRequest` for this
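
The datetime branch above converts to the Kafka protocol's convention of milliseconds since the Unix epoch; note that subtracting `EPOCH` assumes a naive, UTC-based `datetime`. A standalone sketch of the conversion (the helper name is hypothetical):

    import datetime as dt

    EPOCH = dt.datetime(1970, 1, 1)

    def to_kafka_timestamp(offsets_before):
        # milliseconds since the Unix epoch, as Kafka's OffsetRequest expects
        return round((offsets_before - EPOCH).total_seconds() * 1000)

    # e.g. midnight UTC on the day this commit merged:
    assert to_kafka_timestamp(dt.datetime(2018, 7, 18)) == 1531872000000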
26 changes: 26 additions & 0 deletions tests/pykafka/test_simpleconsumer.py
@@ -1,4 +1,5 @@
 from contextlib import contextmanager
+import datetime as dt
 import json
 import mock
 import os
@@ -45,6 +46,7 @@ def setUpClass(cls):
         cls.prod = cls.client.topics[cls.topic_name].get_producer(
             min_queued_messages=1
         )
+        cls.after_earliest = dt.datetime.now() + dt.timedelta(seconds=1)
         for i in range(cls.total_msgs):
             cls.prod.produce('msg {i}'.format(i=i).encode())

@@ -177,6 +179,30 @@ def test_reset_offset_on_start(self):
                                for i in latest_offs)
         self.assertEqual(difference, self.total_msgs)
 
+    def test_reset_offsets_timestamp(self):
+        """Test resetting to user-provided timestamps"""
+        with self._get_simple_consumer(
+                auto_offset_reset=OffsetType.EARLIEST) as consumer:
+            # find a non-empty partition "target_part"
+            part_id, latest_offset = next(
+                (p, res.offset[0])
+                for p, res in consumer.topic.latest_available_offsets().items()
+                if res.offset[0] > 0)
+            target_part = consumer.partitions[part_id]
+
+            # set all other partitions to LATEST, to ensure that any consume()
+            # calls read from target_part
+            partition_offsets = {
+                p: OffsetType.LATEST for p in consumer.partitions.values()}
+            partition_offsets[target_part] = self.after_earliest
+            consumer.reset_offsets(partition_offsets.items())
+
+            # expect EARLIEST here, since our test partition has a single log
+            # segment
+            self.assertEqual(consumer.held_offsets[part_id], OffsetType.EARLIEST)
+            msg = consumer.consume()
+            self.assertEqual(msg.offset, 0)
+
     def test_reset_offsets(self):
         """Test resetting to user-provided offsets"""
         with self._get_simple_consumer(
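
For comparison with the integer path, a sketch in the spirit of the existing `test_reset_offsets` (assumes the same fixture; `held_offsets` maps partition ids to last-consumed offsets):

    # resetting to a plain int sets the counter directly, with no
    # OffsetRequest round-trip...
    consumer.reset_offsets([(consumer.partitions[part_id], 100)])
    # ...so held_offsets reflects exactly that value, and (assuming the
    # partition holds more than 101 messages) the next consumed message
    # from it is offset 101
    assert consumer.held_offsets[part_id] == 100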
