This repository was archived by the owner on Mar 24, 2021. It is now read-only.
Merged
38 changes: 20 additions & 18 deletions pykafka/simpleconsumer.py
@@ -497,9 +497,6 @@ def _handle_success(parts):
partition.flush()
by_leader[partition.partition.leader].append((partition, offset))

# reset this dict to prepare it for next retry
owned_partition_offsets = {}

# get valid offset ranges for each partition
for broker, offsets in by_leader.iteritems():
reqs = [owned_partition.build_offset_request(offset)
@@ -511,7 +508,11 @@ def _handle_success(parts):
success_handler=_handle_success,
partitions_by_id=self._partitions_by_id)

if len(parts_by_error) == 1 and 0 in parts_by_error:
if 0 in parts_by_error:
# drop successfully reset partitions for next retry
Contributor: This is a much better way to do what I was trying to do.

successful = [part for part, _ in parts_by_error.pop(0)]
map(owned_partition_offsets.pop, successful)
if not parts_by_error:
continue
log.error("Error resetting offsets for topic %s (errors: %s)",
self._topic.name,
@@ -520,24 +521,16 @@

time.sleep(i * (self._offsets_channel_backoff_ms / 1000))

if 0 in parts_by_error:
parts_by_error.pop(0)
errored_partitions = {
part: owned_partition_offsets[part]
for errcode, parts in parts_by_error.iteritems()
for part, _ in parts}
owned_partition_offsets.update(errored_partitions)

for errcode, owned_partitions in parts_by_error.iteritems():
if errcode != 0:
for owned_partition in owned_partitions:
owned_partition.fetch_lock.release()
for errcode, owned_partitions in parts_by_error.iteritems():
Contributor: This is a good catch, though I'm surprised it wasn't already fixed. I could've sworn I made this exact change somewhere else.

if errcode != 0:
for owned_partition in owned_partitions:
owned_partition.fetch_lock.release()

if len(parts_by_error) == 1 and 0 in parts_by_error:
if not owned_partition_offsets:
break
log.debug("Retrying offset reset")

if any([a != 0 for a in parts_by_error]):
if owned_partition_offsets:
raise OffsetRequestFailedError("reset_offsets failed after %d "
"retries",
self._offsets_reset_max_retries)
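The net effect of the reset_offsets changes in this hunk is easier to see in isolation. Below is a minimal sketch of the retry pattern, assuming a hypothetical send_reset helper that stands in for the offset-request round trip and its error handling (this is not the actual pykafka code):

    # Sketch: pending partitions are retried until they all succeed or the
    # retry budget runs out; successfully reset partitions are pruned from
    # the pending dict so only errored ones are retried.
    def reset_with_retries(pending_offsets, send_reset, max_retries):
        # pending_offsets: dict mapping partition -> requested offset
        for attempt in range(max_retries):
            if not pending_offsets:
                break
            # send_reset returns a dict mapping error code -> [(partition, response)]
            parts_by_error = send_reset(pending_offsets)
            # Error code 0 means success: drop those partitions from the
            # pending dict, exactly as the diff does with parts_by_error.pop(0).
            for partition, _ in parts_by_error.pop(0, []):
                pending_offsets.pop(partition, None)
        if pending_offsets:
            raise RuntimeError("reset failed after %d retries" % max_retries)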
@@ -643,7 +636,16 @@ def message_count(self):
return self._messages.qsize()

def flush(self):
"""Flush internal queue"""
# Swap out _messages so a concurrent consume/enqueue won't interfere
tmp = self._messages
self._messages = Queue()
while True:
try:
tmp.get_nowait()
self._messages_arrived.acquire(blocking=False)
except Empty:
break
log.info("Flushed queue for partition %d", self.partition.id)

def set_offset(self, last_offset_consumed):
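The test changes below drop the _currently_held_offsets helper in favor of a held_offsets property on the consumer. The property itself is not shown in this diff view; judging from the removed helper, it presumably amounts to something like the following sketch:

    # Assumed sketch of SimpleConsumer.held_offsets (not shown in this diff);
    # it mirrors the removed _currently_held_offsets test helper.
    @property
    def held_offsets(self):
        # Map each owned partition's id to its last consumed offset.
        return dict((p.partition.id, p.last_offset_consumed)
                    for p in self._partitions.itervalues())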
99 changes: 83 additions & 16 deletions tests/pykafka/test_simpleconsumer.py
@@ -1,9 +1,10 @@
from contextlib import contextmanager
import mock
import unittest2
from uuid import uuid4

from pykafka import KafkaClient
from pykafka.simpleconsumer import OwnedPartition
from pykafka.simpleconsumer import OwnedPartition, OffsetType
from pykafka.test.utils import get_cluster, stop_cluster


@@ -13,12 +14,20 @@ class TestSimpleConsumer(unittest2.TestCase):
@classmethod
def setUpClass(cls):
cls.kafka = get_cluster()
cls.topic_name = 'test-data'
cls.topic_name = uuid4().hex
cls.kafka.create_topic(cls.topic_name, 3, 2)
cls.kafka.produce_messages(
cls.topic_name,
('msg {i}'.format(i=i) for i in xrange(1000))
)

# It turns out that the underlying producer used by KafkaInstance will
# write all messages in a batch to a single partition, though not the
# same partition every time. We try to attain some spread here by
# sending more than one batch:
batch = 300
cls.total_msgs = 3 * batch
for _ in range(3):
cls.kafka.produce_messages(
cls.topic_name,
('msg {i}'.format(i=i) for i in xrange(batch)))

cls.client = KafkaClient(cls.kafka.brokers)

@classmethod
@@ -35,15 +44,16 @@ def _get_simple_consumer(self, **kwargs):

def test_consume(self):
with self._get_simple_consumer() as consumer:
messages = [consumer.consume() for _ in xrange(1000)]
self.assertEquals(len(messages), 1000)
messages = [consumer.consume() for _ in xrange(self.total_msgs)]
self.assertEquals(len(messages), self.total_msgs)
self.assertTrue(None not in messages)

def test_offset_commit(self):
"""Check fetched offsets match pre-commit internal state"""
with self._get_simple_consumer(
consumer_group='test_offset_commit') as consumer:
[consumer.consume() for _ in xrange(100)]
offsets_committed = self._currently_held_offsets(consumer)
offsets_committed = consumer.held_offsets
consumer.commit_offsets()

offsets_fetched = dict((r[0], r[1].offset)
@@ -55,18 +65,75 @@ def test_offset_resume(self):
with self._get_simple_consumer(
consumer_group='test_offset_resume') as consumer:
[consumer.consume() for _ in xrange(100)]
offsets_committed = self._currently_held_offsets(consumer)
offsets_committed = consumer.held_offsets
consumer.commit_offsets()

with self._get_simple_consumer(
consumer_group='test_offset_resume') as consumer:
offsets_resumed = self._currently_held_offsets(consumer)
self.assertEquals(offsets_resumed, offsets_committed)
self.assertEquals(consumer.held_offsets, offsets_committed)

def test_reset_offset_on_start(self):
"""Try starting from LATEST and EARLIEST offsets"""
with self._get_simple_consumer(
auto_offset_reset=OffsetType.EARLIEST,
reset_offset_on_start=True) as consumer:
earliest = consumer.topic.earliest_available_offsets()
earliest_minus_one = consumer.held_offsets
self.assertTrue(all(
earliest_minus_one[i] == earliest[i].offset[0] - 1
for i in earliest.keys()))
self.assertIsNotNone(consumer.consume())

@staticmethod
def _currently_held_offsets(consumer):
return dict((p.partition.id, p.last_offset_consumed)
for p in consumer._partitions.itervalues())
with self._get_simple_consumer(
auto_offset_reset=OffsetType.LATEST,
reset_offset_on_start=True,
consumer_timeout_ms=500) as consumer:
latest = consumer.topic.latest_available_offsets()
latest_minus_one = consumer.held_offsets
self.assertTrue(all(
latest_minus_one[i] == latest[i].offset[0] - 1
for i in latest.keys()))
self.assertIsNone(consumer.consume(block=False))

difference = sum(latest_minus_one[i] - earliest_minus_one[i]
for i in latest_minus_one.keys())
self.assertEqual(difference, self.total_msgs)

def test_reset_offsets(self):
"""Test resetting to user-provided offsets"""
with self._get_simple_consumer(
auto_offset_reset=OffsetType.EARLIEST) as consumer:
# Find us a non-empty partition "target_part"
part_id, latest_offset = next(
(p, res.offset[0])
for p, res in consumer.topic.latest_available_offsets().items()
if res.offset[0] > 0)
target_part = consumer.partitions[part_id]

# Set all other partitions to LATEST, to ensure that any consume()
# calls read from target_part
partition_offsets = {
p: OffsetType.LATEST for p in consumer.partitions.values()}

new_offset = latest_offset - 5
partition_offsets[target_part] = new_offset
consumer.reset_offsets(partition_offsets.items())

self.assertEqual(consumer.held_offsets[part_id], new_offset)
msg = consumer.consume()
self.assertEqual(msg.offset, new_offset + 1)

# Invalid offsets should get overwritten as per auto_offset_reset
partition_offsets[target_part] = latest_offset + 5 # invalid!
consumer.reset_offsets(partition_offsets.items())

# SimpleConsumer's fetcher thread will detect the invalid offset
# and reset it immediately. RdKafkaSimpleConsumer however will
# only get to write the valid offset upon a call to consume():
msg = consumer.consume()
expected_offset = target_part.earliest_available_offset()
self.assertEqual(msg.offset, expected_offset)
self.assertEqual(consumer.held_offsets[part_id], expected_offset)


class TestOwnedPartition(unittest2.TestCase):
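As a usage note, test_reset_offsets above drives the public API much the way a caller would. A hedged example, assuming a consumer whose partition 0 currently holds at least five messages:

    # Hypothetical usage following the pattern in test_reset_offsets: pin every
    # partition to LATEST, then rewind partition 0 by five messages.
    target = consumer.partitions[0]
    offsets = dict((p, OffsetType.LATEST) for p in consumer.partitions.values())
    offsets[target] = consumer.held_offsets[0] - 5
    consumer.reset_offsets(offsets.items())
    msg = consumer.consume()  # first message arrives at the rewound offset + 1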