Skip to content

Commit

Permalink
Admin client (#811)
Browse files Browse the repository at this point in the history
* Add basic admin client functionality and tests

* Admin functionality plus tests

Topic, consumer groups, metadata and configs and admin functionality
implemented. Most methods "borrow" heavily from kafka-python when it
comes to protocol classes or request parameter types

Unrelated fix for a Makefile description typo

* Review changes

* Test describe_configs for BROKER

* Test descibe_cluser()

* Test describe_consumer_groups()

* Test list_consumer_group_offsets()

* Add CHANGES line

Co-authored-by: Tincu Gabriel <gabi@aiven.io>
  • Loading branch information
ods and Tincu Gabriel committed Jan 1, 2022
1 parent ced4b62 commit ccbcc25
Show file tree
Hide file tree
Showing 7 changed files with 755 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGES/811.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add basic admin client functionality (pr #811 started by @gabriel-tincu)
2 changes: 1 addition & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ To run tests with a specific version of Kafka (default one is 1.0.2) use KAFKA_V

Test running cheatsheat:

* ``make test FLAGS="-l -x --ff"`` - run until 1 failure, rerun failed tests fitst. Great for cleaning up a lot of errors, say after a big refactor.
* ``make test FLAGS="-l -x --ff"`` - run until 1 failure, rerun failed tests first. Great for cleaning up a lot of errors, say after a big refactor.
* ``make test FLAGS="-k consumer"`` - run only the consumer tests.
* ``make test FLAGS="-m 'not ssl'"`` - run tests excluding ssl.
* ``make test FLAGS="--no-pull"`` - do not try to pull new docker image before test run.
Expand Down
540 changes: 540 additions & 0 deletions aiokafka/admin.py

Large diffs are not rendered by default.

Empty file added aiokafka/protocol/__init__.py
Empty file.
10 changes: 7 additions & 3 deletions tests/_testutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,13 +339,17 @@ def silence_loop_exception_handler(self):
yield
self.loop.set_exception_handler(orig_handler)

def random_topic_name(self):
return "topic-{}-{}".format(
self.id()[self.id().rindex(".") + 1:],
random_string(10).decode('utf-8')
)

def setUp(self):
super().setUp()
self._messages = {}
if not self.topic:
self.topic = "topic-{}-{}".format(
self.id()[self.id().rindex(".") + 1:],
random_string(10).decode('utf-8'))
self.topic = self.random_topic_name()
self._cleanup = []

def tearDown(self):
Expand Down
204 changes: 204 additions & 0 deletions tests/test_admin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
import asyncio

from kafka.admin import NewTopic, NewPartitions
from kafka.admin.config_resource import ConfigResource, ConfigResourceType

from aiokafka.admin import AIOKafkaAdminClient
from aiokafka.consumer import AIOKafkaConsumer
from aiokafka.producer import AIOKafkaProducer
from aiokafka.structs import TopicPartition
from ._testutil import (
KafkaIntegrationTestCase, kafka_versions, run_until_complete
)


class TestAdmin(KafkaIntegrationTestCase):
async def create_admin(self):
admin = AIOKafkaAdminClient(bootstrap_servers=self.hosts)
await admin.start()
self.add_cleanup(admin.close)
return admin

@kafka_versions('>=0.10.0.0')
@run_until_complete
async def test_metadata(self):
admin = await self.create_admin()
metadata = await admin._get_cluster_metadata()
assert metadata.brokers is not None
assert metadata.topics is not None
assert len(metadata.brokers) == 1

@kafka_versions('>=0.10.1.0')
@run_until_complete
async def test_create_topics(self):
admin = await self.create_admin()
resp = await admin.create_topics([NewTopic(self.topic, 1, 1)])
assert resp.topic_errors is not None
assert len(resp.topic_errors) == 1
topic, error_code, error = resp.topic_errors[0]
assert topic == self.topic
assert error_code == 0
assert not error

@kafka_versions('>=0.10.1.0') # Since we use `create_topics()`
@run_until_complete
async def test_list_topics(self):
admin = await self.create_admin()
topic_names = {self.random_topic_name(), self.random_topic_name()}
topics = [NewTopic(tn, 1, 1) for tn in topic_names]
await admin.create_topics(topics)
actual = await admin.list_topics()
assert set(actual) >= topic_names

# @kafka_versions('>=0.10.1.0')
@kafka_versions('>=1.0.0') # XXX Times out with 0.10.2.1 and 0.11.0.3
@run_until_complete
async def test_delete_topics(self):
admin = await self.create_admin()
resp = await admin.create_topics([NewTopic(self.topic, 1, 1)])
assert resp.topic_errors[0][2] is None
topics = await admin.list_topics()
assert self.topic in topics
resp = await admin.delete_topics([self.topic])
errors = resp.topic_error_codes
assert len(errors) == 1
topic, error_code = errors[0]
assert topic == self.topic
assert error_code == 0
topics = await admin.list_topics()
assert self.topic not in topics

@kafka_versions('>=0.11.0.0')
@run_until_complete
async def test_describe_configs_topic(self):
admin = await self.create_admin()
await admin.create_topics([NewTopic(self.topic, 1, 1)])
cr = ConfigResource(ConfigResourceType.TOPIC, self.topic)
resp = await admin.describe_configs([cr])
assert len(resp) == 1
assert len(resp[0].resources) == 1
config_resource = resp[0].resources[0]
error_code, error_message, resource_type, resource_name, *_ = config_resource
assert error_code == 0
assert not error_message # None or "" depending on kafka version
assert resource_type == ConfigResourceType.TOPIC
assert resource_name == self.topic

@kafka_versions('>=0.11.0.0')
@run_until_complete
async def test_describe_configs_broker(self):
admin = await self.create_admin()
[broker_id] = admin._client.cluster._brokers.keys()
cr = ConfigResource(ConfigResourceType.BROKER, broker_id)
resp = await admin.describe_configs([cr])
assert len(resp) == 1
assert len(resp[0].resources) == 1
config_resource = resp[0].resources[0]
error_code, error_message, resource_type, resource_name, *_ = config_resource
assert error_code == 0
assert not error_message # None or "" depending on kafka version
assert resource_type == ConfigResourceType.BROKER
assert resource_name == str(broker_id)

@kafka_versions('>=0.11.0.0')
@run_until_complete
async def test_alter_configs(self):
admin = await self.create_admin()
await admin.create_topics([NewTopic(self.topic, 1, 1)])
cr = ConfigResource(
ConfigResourceType.TOPIC, self.topic, {"cleanup.policy": "delete"}
)
await admin.alter_configs([cr])
new_configs_resp = await admin.describe_configs([cr])
assert len(new_configs_resp) == 1
assert len(new_configs_resp[0].resources) == 1
config_entries = new_configs_resp[0].resources[0][4]
assert len(config_entries) == 1
name, value, *_ = config_entries[0]
assert name == "cleanup.policy"
assert value == "delete"

@kafka_versions('>=0.10.0.0')
@run_until_complete
async def test_describe_cluster(self):
admin = await self.create_admin()
[broker_id] = admin._client.cluster._brokers.keys()
resp = await admin.describe_cluster()
assert len(resp['brokers']) == 1
assert resp['brokers'][0]['node_id'] == broker_id

@kafka_versions('>=1.0.0')
@run_until_complete
async def test_create_partitions(self):
admin = await self.create_admin()
await admin.create_topics([NewTopic(self.topic, 1, 1)])
old_desc = await admin.describe_topics([self.topic])
old_partitions = {p["partition"] for p in old_desc[0]["partitions"]}
assert len(old_partitions) == 1

new_part = NewPartitions(total_count=2)
await admin.create_partitions({self.topic: new_part})
new_desc = await admin.describe_topics([self.topic])
new_partitions = {p["partition"] for p in new_desc[0]["partitions"]}
assert len(new_partitions) == 2
assert new_partitions > old_partitions

@kafka_versions('>=0.10.0.0')
@run_until_complete
async def test_list_consumer_groups(self):
admin = await self.create_admin()
group_id = f'group-{self.id()}'
consumer = AIOKafkaConsumer(
self.topic, group_id=group_id, bootstrap_servers=self.hosts
)
await consumer.start()
self.add_cleanup(consumer.stop)
await asyncio.sleep(0.1) # Otherwise we can get GroupLoadInProgressError

resp = await admin.list_consumer_groups()
assert len(resp) >= 1 # There can be group left from other test
groups = [group for group, *_ in resp]
assert group_id in groups

@kafka_versions('>=0.10.0.0')
@run_until_complete
async def test_describe_consumer_groups(self):
admin = await self.create_admin()
group_id = f'group-{self.id()}'
consumer = AIOKafkaConsumer(
self.topic, group_id=group_id, bootstrap_servers=self.hosts
)
await consumer.start()
self.add_cleanup(consumer.stop)

resp = await admin.describe_consumer_groups([group_id])
assert len(resp) == 1
assert len(resp[0].groups) == 1
error_code, group, *_ = resp[0].groups[0]
assert error_code == 0
assert group == group_id

@kafka_versions('>=0.10.0.0')
@run_until_complete
async def test_list_consumer_group_offsets(self):
admin = await self.create_admin()
group_id = f'group-{self.id()}'

consumer = AIOKafkaConsumer(
self.topic, group_id=group_id, bootstrap_servers=self.hosts,
enable_auto_commit=False
)
await consumer.start()
self.add_cleanup(consumer.stop)

async with AIOKafkaProducer(bootstrap_servers=self.hosts) as producer:
await producer.send_and_wait(self.topic, b'some-message')
await producer.send_and_wait(self.topic, b'other-message')

msg = await consumer.getone()
await consumer.commit()
resp = await admin.list_consumer_group_offsets(group_id)
tp = TopicPartition(msg.topic, msg.partition)
assert resp[tp].offset == msg.offset + 1
resp = await admin.list_consumer_group_offsets(group_id, partitions=[tp])
assert resp[tp].offset == msg.offset + 1
4 changes: 2 additions & 2 deletions tests/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -497,8 +497,8 @@ async def _test_compress_decompress(self, compression_type):
await self.wait_topic(producer.client, self.topic)
msg1 = b'some-message' * 10
msg2 = b'other-message' * 30
await (await producer.send(self.topic, msg1, partition=1))
await (await producer.send(self.topic, msg2, partition=1))
await producer.send_and_wait(self.topic, msg1, partition=1)
await producer.send_and_wait(self.topic, msg2, partition=1)

consumer = await self.consumer_factory()
rmsg1 = await consumer.getone()
Expand Down

0 comments on commit ccbcc25

Please sign in to comment.