Merge pull request #60 from fabregas/kafka0.10_support
Kafka0.10 support
tvoinarovskyi committed Nov 1, 2016
2 parents 835413e + 1b40e10 commit 4736ea1
Showing 17 changed files with 252 additions and 83 deletions.
1 change: 1 addition & 0 deletions .travis.yml
@@ -21,6 +21,7 @@ install:
- pip install pytest pytest-cov pytest-catchlog docker-py
- pip install coveralls
- pip install python-snappy
- pip install lz4tools xxhash
- pip install .

script:
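The new lz4tools and xxhash test dependencies back the LZ4 compression codec introduced alongside Kafka 0.10. A minimal usage sketch, assuming AIOKafkaProducer accepts a compression_type option mirroring kafka-python's producer (broker address is a placeholder)::

    import asyncio
    from aiokafka import AIOKafkaProducer

    # Assumed option: compression_type='lz4' requires lz4tools + xxhash.
    loop = asyncio.get_event_loop()
    producer = AIOKafkaProducer(loop=loop,
                                bootstrap_servers='localhost:9092',
                                compression_type='lz4')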
2 changes: 1 addition & 1 deletion Makefile
@@ -2,7 +2,7 @@

FLAGS=
SCALA_VERSION?=2.11
KAFKA_VERSION?=0.9.0.1
KAFKA_VERSION?=0.10.0.0
DOCKER_IMAGE=aiolibs/kafka:$(SCALA_VERSION)_$(KAFKA_VERSION)

flake:
7 changes: 4 additions & 3 deletions README.rst
@@ -63,7 +63,8 @@ Example of AIOKafkaConsumer usage:
while True:
try:
msg = yield from consumer.getone()
print("consumed: ", msg.topic, msg.partition, msg.offset, msg.value)
print("consumed: ", msg.topic, msg.partition, msg.offset,
msg.key, msg.value, msg.timestamp)
except KafkaError as err:
print("error while consuming message: ", err)
@@ -88,12 +89,12 @@ Docker is required to run tests. See https://docs.docker.com/engine/installation
Setting up test requirements (assuming you're within a virtualenv on Ubuntu 14.04+)::

sudo apt-get install -y libsnappy-dev
pip install flake8 pytest pytest-cov pytest-catchlog docker-py python-snappy coveralls .
pip install -r requirements-dev.txt .

Running tests::

make cov

To run tests with a specific version of Kafka (default one is 0.9.0.1) use KAFKA_VERSION variable::

make cov KAFKA_VERSION=0.8.2.1
make cov KAFKA_VERSION=0.10.0.0
49 changes: 41 additions & 8 deletions aiokafka/client.py
@@ -9,7 +9,7 @@
KafkaTimeoutError,
UnrecognizedBrokerVersion)
from kafka.cluster import ClusterMetadata
from kafka.protocol.metadata import MetadataRequest_v0 as MetadataRequest
from kafka.protocol.metadata import MetadataRequest
from kafka.protocol.produce import ProduceRequest

from aiokafka.conn import create_conn
@@ -26,8 +26,10 @@ class AIOKafkaClient:
"""This class implements interface for interact with Kafka cluster"""

def __init__(self, *, loop, bootstrap_servers='localhost',
client_id='aiokafka-'+__version__, metadata_max_age_ms=300000,
request_timeout_ms=40000):
client_id='aiokafka-' + __version__,
metadata_max_age_ms=300000,
request_timeout_ms=40000,
api_version='auto'):
"""Initialize an asynchronous kafka client
Keyword Arguments:
@@ -48,11 +50,16 @@ def __init__(self, *, loop, bootstrap_servers='localhost',
which we force a refresh of metadata even if we haven't seen
any partition leadership changes to proactively discover any
new brokers or partitions. Default: 300000
api_version (str): specify which Kafka API version to use.
AIOKafka supports Kafka API versions >=0.9 only.
If set to 'auto', will attempt to infer the broker version by
probing various APIs. Default: auto
"""
self._bootstrap_servers = bootstrap_servers
self._client_id = client_id
self._metadata_max_age_ms = metadata_max_age_ms
self._request_timeout_ms = request_timeout_ms
self._api_version = api_version

self.cluster = ClusterMetadata(metadata_max_age_ms=metadata_max_age_ms)
self._topics = set() # empty set will fetch all topic metadata
@@ -67,6 +74,13 @@ def __init__(self, *, loop, bootstrap_servers='localhost',
def __repr__(self):
return '<AIOKafkaClient client_id=%s>' % self._client_id

@property
def api_version(self):
if type(self._api_version) is tuple:
return self._api_version
# unknown api version, return minimal supported version
return (0, 9)

@property
def hosts(self):
return collect_hosts(self._bootstrap_servers)
@@ -86,7 +100,8 @@ def close(self):
@asyncio.coroutine
def bootstrap(self):
"""Try to to bootstrap initial cluster metadata"""
metadata_request = MetadataRequest([])
# use MetadataRequest v0 for bootstrap (the api version is not detected yet)
metadata_request = MetadataRequest[0]([])
for host, port, _ in self.hosts:
log.debug("Attempting to bootstrap via node at %s:%s", host, port)

@@ -121,6 +136,13 @@ def bootstrap(self):
raise ConnectionError(
'Unable to bootstrap from {}'.format(self.hosts))

# detect api version if needed
if self._api_version == 'auto':
self._api_version = yield from self.check_version()
if type(self._api_version) is not tuple:
self._api_version = tuple(
map(int, self._api_version.split('.')))

if self._sync_task is None:
# starting metadata synchronizer task
self._sync_task = ensure_future(
@@ -155,7 +177,11 @@ def get_random_node(self):
@asyncio.coroutine
def _metadata_update(self, cluster_metadata, topics):
assert isinstance(cluster_metadata, ClusterMetadata)
metadata_request = MetadataRequest(list(topics))
topics = list(topics)
version_id = 0 if self.api_version < (0, 10) else 1
if version_id == 1 and not topics:
topics = None
metadata_request = MetadataRequest[version_id](topics)
nodeids = [b.nodeId for b in self.cluster.brokers()]
if 'bootstrap' in self._conns:
nodeids.append('bootstrap')
@@ -310,11 +336,13 @@ def check_version(self, node_id=None):
assert self.cluster.brokers(), 'no brokers in metadata'
node_id = list(self.cluster.brokers())[0].nodeId

from kafka.protocol.admin import ListGroupsRequest_v0
from kafka.protocol.admin import (
ListGroupsRequest_v0, ApiVersionRequest_v0)
from kafka.protocol.commit import (
OffsetFetchRequest_v0, GroupCoordinatorRequest_v0)
from kafka.protocol.metadata import MetadataRequest_v0
test_cases = [
('0.10', ApiVersionRequest_v0()),
('0.9', ListGroupsRequest_v0()),
('0.8.2', GroupCoordinatorRequest_v0('aiokafka-default-group')),
('0.8.1', OffsetFetchRequest_v0('aiokafka-default-group', [])),
@@ -335,8 +363,13 @@
if not conn.connected():
yield from conn.connect()
assert conn, 'no connection to node with id {}'.format(node_id)
yield from conn.send(request)
except KafkaError:
# the probe request may be silently ignored by the Kafka broker,
# so we also send a metadata request and wait for its response
task = self._loop.create_task(conn.send(request))
yield from asyncio.wait([task], timeout=0.1, loop=self._loop)
yield from self.fetch_all_metadata()
yield from task
except (KafkaError, asyncio.CancelledError):
continue
else:
return version
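The version dispatch in _metadata_update is easy to see in isolation: kafka-python exposes MetadataRequest as a list of request classes indexed by API version, and v1 treats a null topic list as "all topics" where v0 used an empty list. A standalone sketch of that logic::

    from kafka.protocol.metadata import MetadataRequest

    def build_metadata_request(topics, api_version):
        # v0 for brokers < 0.10, v1 otherwise
        version_id = 0 if api_version < (0, 10) else 1
        topics = list(topics)
        # v1 uses None to request all topics; v0 used an empty list
        if version_id == 1 and not topics:
            topics = None
        return MetadataRequest[version_id](topics)

    # e.g. build_metadata_request([], (0, 10, 0)) -> MetadataRequest_v1(None)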
2 changes: 2 additions & 0 deletions aiokafka/conn.py
@@ -143,6 +143,8 @@ def _read(self):
for _, _, fut in self._requests:
fut.set_exception(conn_exc)
self.close()
except asyncio.CancelledError:
pass

def _next_correlation_id(self):
self._correlation_id = (self._correlation_id + 1) % 2**31
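The added except clause keeps the connection's reader task from raising a stray traceback when it is cancelled during close(). A minimal sketch of the pattern, with illustrative names rather than aiokafka's own::

    import asyncio

    @asyncio.coroutine
    def _read_loop(reader):
        try:
            while True:
                # read and dispatch one response frame at a time
                header = yield from reader.readexactly(4)
        except asyncio.CancelledError:
            pass  # cancelled by close(); exit quietly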
21 changes: 7 additions & 14 deletions aiokafka/consumer.py
@@ -119,7 +119,7 @@ class AIOKafkaConsumer(object):
"""
def __init__(self, *topics, loop,
bootstrap_servers='localhost',
client_id='aiokafka-'+__version__,
client_id='aiokafka-' + __version__,
group_id=None,
key_deserializer=None, value_deserializer=None,
fetch_max_wait_ms=500,
@@ -137,14 +137,14 @@ def __init__(self, *topics, loop,
session_timeout_ms=30000,
consumer_timeout_ms=200,
api_version='auto'):
if api_version not in ('auto', '0.9'):
if api_version not in ('auto', '0.9', '0.10'):
raise ValueError("Unsupported Kafka API version")
self._client = AIOKafkaClient(
loop=loop, bootstrap_servers=bootstrap_servers,
client_id=client_id, metadata_max_age_ms=metadata_max_age_ms,
request_timeout_ms=request_timeout_ms)
request_timeout_ms=request_timeout_ms,
api_version=api_version)

self._api_version = api_version
self._group_id = group_id
self._heartbeat_interval_ms = heartbeat_interval_ms
self._retry_backoff_ms = retry_backoff_ms
@@ -172,16 +172,9 @@ def __init__(self, *topics, loop,
def start(self):
yield from self._client.bootstrap()

# Check Broker Version if not set explicitly
if self._api_version == 'auto':
self._api_version = yield from self._client.check_version()
# Convert api_version config to tuple for easy comparisons
self._api_version = tuple(
map(int, self._api_version.split('.')))

if self._api_version < (0, 9):
raise ValueError(
"Unsupported Kafka version: {}".format(self._api_version))
if self._client.api_version < (0, 9):
raise ValueError("Unsupported Kafka version: {}".format(
self._client.api_version))

self._fetcher = Fetcher(
self._client, self._subscription, loop=self._loop,
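With version detection moved into the client, the consumer only validates the result. A hedged usage sketch (topic name and broker address are placeholders) pinning api_version to skip auto-probing::

    import asyncio
    from aiokafka import AIOKafkaConsumer

    @asyncio.coroutine
    def consume(loop):
        consumer = AIOKafkaConsumer(
            'my_topic', loop=loop,
            bootstrap_servers='localhost:9092',
            api_version='0.10')  # 'auto' and '0.9' are also accepted
        yield from consumer.start()
        msg = yield from consumer.getone()
        print(msg.timestamp, msg.timestamp_type)  # fields added in this PR
        yield from consumer.stop()

    loop = asyncio.get_event_loop()
    loop.run_until_complete(consume(loop))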
54 changes: 49 additions & 5 deletions aiokafka/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import kafka.common as Errors
from kafka.common import TopicPartition
from kafka.protocol.fetch import FetchRequest_v0 as FetchRequest
from kafka.protocol.fetch import FetchRequest
from kafka.protocol.message import PartialMessage
from kafka.protocol.offset import (
OffsetRequest_v0 as OffsetRequest, OffsetResetStrategy)
@@ -16,7 +16,9 @@


ConsumerRecord = collections.namedtuple(
"ConsumerRecord", ["topic", "partition", "offset", "key", "value"])
"ConsumerRecord", ["topic", "partition", "offset", "timestamp",
"timestamp_type", "key", "value", "checksum",
"serialized_key_size", "serialized_value_size"])


class NoOffsetForPartitionError(Errors.KafkaError):
@@ -170,6 +172,9 @@ def __init__(self, client, subscriptions, *, loop,
self._wait_consume_future = None
self._wait_empty_future = None

req_version = 2 if client.api_version >= (0, 10) else 1
self._fetch_request_class = FetchRequest[req_version]

self._fetch_task = ensure_future(
self._fetch_requests_routine(), loop=loop)

@@ -312,7 +317,7 @@ def _create_fetch_requests(self):
if node_id in backoff_by_nodes:
# At least one partition is still waiting to be consumed
continue
req = FetchRequest(
req = self._fetch_request_class(
-1, # replica_id
self._fetch_max_wait_ms,
self._fetch_min_bytes,
@@ -655,11 +660,50 @@ def _unpack_message_set(self, tp, messages):
if self._check_crcs and not msg.validate_crc():
raise Errors.InvalidMessageError(msg)
elif msg.is_compressed():
yield from self._unpack_message_set(tp, msg.decompress())
# If relative offsets are used, we need to decompress the entire
# message set first to compute absolute offsets.
inner_mset = msg.decompress()
if msg.magic > 0:
last_offset, _, _ = inner_mset[-1]
absolute_base_offset = offset - last_offset
else:
absolute_base_offset = -1

for inner_offset, inner_size, inner_msg in inner_mset:
if msg.magic > 0:
# When the magic value is greater than 0, the timestamp
# of a compressed message depends on the
# timestamp type of the wrapper message:
if msg.timestamp_type == 0: # CREATE_TIME (0)
inner_timestamp = inner_msg.timestamp
elif msg.timestamp_type == 1: # LOG_APPEND_TIME (1)
inner_timestamp = msg.timestamp
else:
raise ValueError('Unknown timestamp type: {0}'
.format(msg.timestamp_type))
else:
inner_timestamp = msg.timestamp

if absolute_base_offset >= 0:
inner_offset += absolute_base_offset

key, value = self._deserialize(inner_msg)
yield ConsumerRecord(
tp.topic, tp.partition, inner_offset,
inner_timestamp, msg.timestamp_type,
key, value, inner_msg.crc,
len(inner_msg.key)
if inner_msg.key is not None else -1,
len(inner_msg.value)
if inner_msg.value is not None else -1)
else:
key, value = self._deserialize(msg)
yield ConsumerRecord(
tp.topic, tp.partition, offset, key, value)
tp.topic, tp.partition, offset,
msg.timestamp, msg.timestamp_type,
key, value, msg.crc,
len(msg.key) if msg.key is not None else -1,
len(msg.value) if msg.value is not None else -1)

def _deserialize(self, msg):
if self._key_deserializer:
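The absolute-offset arithmetic above can be checked by hand: with magic > 0 the broker stores inner offsets relative to the compressed wrapper message, whose own offset equals that of its last inner message. A worked sketch with assumed values::

    wrapper_offset = 105        # offset the broker assigned to the wrapper
    inner_offsets = [0, 1, 2]   # relative offsets inside the message set
    last_offset = inner_offsets[-1]
    absolute_base_offset = wrapper_offset - last_offset  # 105 - 2 = 103
    print([absolute_base_offset + o for o in inner_offsets])
    # -> [103, 104, 105]; the last one matches the wrapper's offset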
