
Merge pull request #72 from aio-libs/pre0.2.0
Added max_poll_records option for consumer
tvoinarovskyi committed Nov 20, 2016
2 parents f0a55d3 + 3b1b159 commit bd671db
Showing 10 changed files with 74 additions and 21 deletions.
8 changes: 4 additions & 4 deletions Makefile
@@ -2,21 +2,21 @@
 
 FLAGS=
 SCALA_VERSION?=2.11
-KAFKA_VERSION?=0.10.0.0
+KAFKA_VERSION?=0.10.1.0
 DOCKER_IMAGE=aiolibs/kafka:$(SCALA_VERSION)_$(KAFKA_VERSION)
 
 flake:
 	extra=$$(python -c "import sys;sys.stdout.write('--exclude tests/test_pep492.py') if sys.version_info[:3] < (3, 5, 0) else sys.stdout.write('')"); \
 	flake8 aiokafka tests $$extra
 
 test: flake
-	@py.test -s --no-print-logs --docker-image $(DOCKER_IMAGE) $(FLAGS) tests
+	py.test -s --no-print-logs --docker-image $(DOCKER_IMAGE) $(FLAGS) tests
 
 vtest: flake
-	@py.test -s -v --no-print-logs --docker-image $(DOCKER_IMAGE) $(FLAGS) tests
+	py.test -s -v --no-print-logs --docker-image $(DOCKER_IMAGE) $(FLAGS) tests
 
 cov cover coverage: flake
-	@py.test -s --no-print-logs --cov aiokafka --cov-report html --docker-image $(DOCKER_IMAGE) $(FLAGS) tests
+	py.test -s --no-print-logs --cov aiokafka --cov-report html --docker-image $(DOCKER_IMAGE) $(FLAGS) tests
 	@echo "open file://`pwd`/htmlcov/index.html"
 
 clean:
2 changes: 1 addition & 1 deletion aiokafka/__init__.py
@@ -6,7 +6,7 @@
 import sys  # noqa
 PY_35 = sys.version_info >= (3, 5)
 
-__version__ = '0.1.4'
+__version__ = '0.2.0.dev'
 
 from .client import AIOKafkaClient  # noqa
 from .producer import AIOKafkaProducer  # noqa
16 changes: 14 additions & 2 deletions aiokafka/consumer.py
@@ -63,6 +63,8 @@ class AIOKafkaConsumer(object):
             send messages larger than the consumer can fetch. If that
             happens, the consumer can get stuck trying to fetch a large
             message on a certain partition. Default: 1048576.
+        max_poll_records (int): The maximum number of records returned in a
+            single call to ``getmany()``. Defaults to ``None``, no limit.
         request_timeout_ms (int): Client request timeout in milliseconds.
             Default: 40000.
         retry_backoff_ms (int): Milliseconds to backoff when retrying on
@@ -136,6 +138,7 @@ def __init__(self, *topics, loop,
                  heartbeat_interval_ms=3000,
                  session_timeout_ms=30000,
                  consumer_timeout_ms=200,
+                 max_poll_records=None,
                  api_version='auto'):
         if api_version not in ('auto', '0.9', '0.10'):
             raise ValueError("Unsupported Kafka API version")
@@ -156,6 +159,10 @@ def __init__(self, *topics, loop,
         self._fetch_min_bytes = fetch_min_bytes
         self._fetch_max_wait_ms = fetch_max_wait_ms
         self._max_partition_fetch_bytes = max_partition_fetch_bytes
+        if max_poll_records is not None and (
+                not isinstance(max_poll_records, int) or max_poll_records < 1):
+            raise ValueError("`max_poll_records` should be a positive integer")
+        self._max_poll_records = max_poll_records
         self._consumer_timeout = consumer_timeout_ms / 1000
         self._check_crcs = check_crcs
         self._subscription = SubscriptionState(auto_offset_reset)
@@ -588,7 +595,7 @@ def getone(self, *partitions):
         return msg
 
     @asyncio.coroutine
-    def getmany(self, *partitions, timeout_ms=0):
+    def getmany(self, *partitions, timeout_ms=0, max_records=None):
         """Get messages from assigned topics / partitions.
 
         Prefetched messages are returned in batches by topic-partition.
@@ -622,9 +629,14 @@ def getmany(self, *partitions, timeout_ms=0):
         """
         assert all(map(lambda k: isinstance(k, TopicPartition), partitions))
+        if max_records is not None and (
+                not isinstance(max_records, int) or max_records < 1):
+            raise ValueError("`max_records` must be a positive integer")
 
         timeout = timeout_ms / 1000
-        records = yield from self._fetcher.fetched_records(partitions, timeout)
+        records = yield from self._fetcher.fetched_records(
+            partitions, timeout,
+            max_records=max_records or self._max_poll_records)
         return records
 
     if PY_35:
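The two new knobs compose: `max_poll_records` in the constructor sets a default cap for every fetch, and the `max_records` argument to `getmany()` overrides it per call (via the `max_records or self._max_poll_records` expression above). A minimal usage sketch in the `yield from` coroutine style this version of aiokafka uses; the topic name and bootstrap address are placeholders:

    import asyncio
    from aiokafka import AIOKafkaConsumer

    @asyncio.coroutine
    def consume(loop):
        consumer = AIOKafkaConsumer(
            'my_topic', loop=loop,
            bootstrap_servers='localhost:9092',
            max_poll_records=48)  # default cap for every getmany() call
        yield from consumer.start()
        try:
            # Capped by the constructor default: at most 48 records.
            batch = yield from consumer.getmany(timeout_ms=1000)
            # Per-call override: at most 42 records this time.
            batch = yield from consumer.getmany(timeout_ms=1000, max_records=42)
            for tp, messages in batch.items():
                print(tp, len(messages))
        finally:
            yield from consumer.stop()

    loop = asyncio.get_event_loop()
    loop.run_until_complete(consume(loop))

Note that because the fall-through uses `or` rather than an explicit `None` check, a `max_records=0` call could never silently disable the default; the `ValueError` guard rejects it first.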
30 changes: 21 additions & 9 deletions aiokafka/fetcher.py
@@ -72,14 +72,16 @@ def getone(self):
             self._subscriptions.assignment[tp].position += 1
             return msg
 
-    def getall(self):
+    def getall(self, max_records=None):
         tp = self._topic_partition
         if not self._check_assignment(tp):
             return []
 
+        if max_records is None:
+            max_records = len(self._messages)
         ret_list = []
         while True:
-            if not self._messages:
+            if not self._messages or len(ret_list) == max_records:
                 return ret_list
 
             msg = self._messages.popleft()
@@ -89,6 +91,9 @@ def getall(self):
             self._subscriptions.assignment[tp].position += 1
             ret_list.append(msg)
 
+    def has_more(self):
+        return bool(self._messages)
+
 
 class FetchError:
     def __init__(self, *, loop, error, backoff):
@@ -625,7 +630,7 @@ def next_record(self, partitions):
         return (yield from self.next_record(partitions))
 
     @asyncio.coroutine
-    def fetched_records(self, partitions, timeout=0):
+    def fetched_records(self, partitions, timeout=0, max_records=None):
         """ Returns previously fetched records and updates consumed offsets.
         """
         drained = {}
@@ -634,12 +639,18 @@ def fetched_records(self, partitions, timeout=0):
                 continue
             res_or_error = self._records[tp]
             if type(res_or_error) == FetchResult:
-                drained[tp] = res_or_error.getall()
-                # We processed all messages - request new ones
-                del self._records[tp]
-                self._notify(self._wait_consume_future)
+                drained[tp] = res_or_error.getall(max_records)
+                if max_records is not None:
+                    max_records -= len(drained[tp])
+                    assert max_records >= 0  # Just in case
+                    if max_records == 0:
+                        break
+                if not res_or_error.has_more():
+                    # We processed all messages - request new ones
+                    del self._records[tp]
+                    self._notify(self._wait_consume_future)
             else:
-                # We already got some of messages from other partition -
+                # We already got some messages from another partition -
                 # return them. We will raise this error on next call
                 if drained:
                     return drained
@@ -658,7 +669,8 @@ def fetched_records(self, partitions, timeout=0):
             [self._wait_empty_future], timeout=timeout, loop=self._loop)
 
         if done:
-            return (yield from self.fetched_records(partitions, 0))
+            return (yield from self.fetched_records(
+                partitions, 0, max_records=max_records))
         return {}
 
     def _unpack_message_set(self, tp, messages):
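The drain loop above is the heart of the feature: `max_records` is decremented as each partition's buffer is emptied, the loop breaks once the budget reaches zero, and a partition's `FetchResult` is only discarded (which triggers a new fetch request) when `has_more()` reports it fully consumed, so leftovers survive for the next call. A standalone sketch of that bookkeeping with plain deques, illustrative only, not the library code:

    from collections import deque

    def drain(buffers, max_records=None):
        # buffers: {topic_partition: deque of messages}
        drained = {}
        for tp in list(buffers):
            buf = buffers[tp]
            take = len(buf) if max_records is None else min(max_records, len(buf))
            drained[tp] = [buf.popleft() for _ in range(take)]
            if max_records is not None:
                max_records -= take
                if max_records == 0:
                    break  # budget spent - remaining buffers stay untouched
            if not buf:
                # Fully consumed - drop the buffer so new data is requested.
                del buffers[tp]
        return drained

    buffers = {('t', 0): deque(range(5)), ('t', 1): deque(range(5))}
    print(drain(buffers, max_records=7))  # 5 from one partition, 2 from the other
    print(drain(buffers, max_records=7))  # the 3 leftovers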
3 changes: 3 additions & 0 deletions docker/config.yml
@@ -6,6 +6,9 @@ versions:
   -
     kafka: "0.10.0.0"
     scala: "2.11"
+  -
+    kafka: "0.10.0.1"
+    scala: "2.11"
   -
     kafka: "0.10.1.0"
     scala: "2.11"
2 changes: 1 addition & 1 deletion docs/conf.py
@@ -36,7 +36,7 @@
 import re, os.path
 
 def get_release():
-    regexp = re.compile(r"^__version__\W*=\W*'([\d.abrc]+)'")
+    regexp = re.compile(r"^__version__\W*=\W*'([\d.abrcdev]+)'")
     here = os.path.dirname(__file__)
     root = os.path.dirname(here)
     init_py = os.path.join(root, 'aiokafka', '__init__.py')
2 changes: 1 addition & 1 deletion setup.py
@@ -23,7 +23,7 @@ def read(f):
 
 
 def read_version():
-    regexp = re.compile(r"^__version__\W*=\W*'([\d.abrc]+)'")
+    regexp = re.compile(r"^__version__\W*=\W*'([\d.abrcdev]+)'")
     init_py = os.path.join(os.path.dirname(__file__),
                            'aiokafka', '__init__.py')
     with open(init_py) as f:
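Both version-parsing regexes (here and in docs/conf.py) gained `d`, `e`, `v` in their character class so the new `'0.2.0.dev'` string can still be extracted from `aiokafka/__init__.py`. A quick demonstration of why the old pattern fails on it:

    import re

    old = re.compile(r"^__version__\W*=\W*'([\d.abrc]+)'")
    new = re.compile(r"^__version__\W*=\W*'([\d.abrcdev]+)'")

    line = "__version__ = '0.2.0.dev'"
    print(old.match(line))           # None - 'd', 'e', 'v' not in the old class
    print(new.match(line).group(1))  # '0.2.0.dev'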
2 changes: 1 addition & 1 deletion tests/_testutil.py
@@ -24,7 +24,7 @@ def run_until_complete(fun):
     def wrapper(test, *args, **kw):
         loop = test.loop
         ret = loop.run_until_complete(
-            asyncio.wait_for(fun(test, *args, **kw), 15, loop=loop))
+            asyncio.wait_for(fun(test, *args, **kw), 30, loop=loop))
         return ret
     return wrapper
 
2 changes: 1 addition & 1 deletion tests/test_client.py
@@ -29,7 +29,7 @@ def test_init_with_list(self):
             loop=self.loop, bootstrap_servers=[
                 '127.0.0.1:9092', '127.0.0.2:9092', '127.0.0.3:9092'])
         self.assertEqual(
-            '<AIOKafkaClient client_id=aiokafka-0.1.4>', client.__repr__())
+            '<AIOKafkaClient client_id=aiokafka-0.2.0.dev>', client.__repr__())
         self.assertEqual(
             sorted([('127.0.0.1', 9092, socket.AF_INET),
                     ('127.0.0.2', 9092, socket.AF_INET),
28 changes: 27 additions & 1 deletion tests/test_consumer.py
@@ -465,4 +465,30 @@ def test_equal_consumption(self):
         diff = abs(partition_consumption[0] - partition_consumption[1])
         # We are good as long as it's not 100%, as we do rely on randomness of
         # a shuffle in code. Ideally it should be 50/50 (0 diff) though
-        self.assertNotEqual(diff / sum(partition_consumption), 1.0)
+        self.assertLess(diff / sum(partition_consumption), 1.0)
+
+    @run_until_complete
+    def test_max_poll_records(self):
+        # A strange use case of kafka-python that can be reproduced in
+        # aiokafka: https://github.com/dpkp/kafka-python/issues/675
+        yield from self.send_messages(0, list(range(100)))
+
+        consumer = yield from self.consumer_factory(
+            max_poll_records=48)
+        data = yield from consumer.getmany(timeout_ms=1000)
+        count = sum(map(len, data.values()))
+        self.assertEqual(count, 48)
+        data = yield from consumer.getmany(timeout_ms=1000, max_records=42)
+        count = sum(map(len, data.values()))
+        self.assertEqual(count, 42)
+        data = yield from consumer.getmany(timeout_ms=1000, max_records=None)
+        count = sum(map(len, data.values()))
+        self.assertEqual(count, 10)
+
+        with self.assertRaises(ValueError):
+            data = yield from consumer.getmany(max_records=0)
+        yield from consumer.stop()
+
+        with self.assertRaises(ValueError):
+            consumer = yield from self.consumer_factory(
+                max_poll_records=0)