Skip to content
asyncio client for kafka
Python C Other
Branch: master
Clone or download
tvoinarovskyi Merge pull request #560 from aio-libs/pyup-scheduled-update-2019-10-14
Scheduled weekly dependency update for week 41
Latest commit 8ebde79 Oct 17, 2019
Permalink
Type Name Latest commit message Commit time
Failed to load latest commit information.
aiokafka added headers param to Producer.send_and_wait Sep 5, 2019
benchmark Fix documentation for benchmark Aug 2, 2019
docker Add unstaged docker/scripts/kafka_server_gssapi_jaas.conf Mar 9, 2019
docs fix typo in error handling code sample Oct 17, 2019
examples Fix abort transaction. Add more tests and examples Nov 3, 2018
tests added headers param to Producer.send_and_wait Sep 5, 2019
tools Fix appveyor Jun 28, 2019
.appveyor.yml Fix appveyor Jun 28, 2019
.codecov.yml Try to fix coverage Jun 29, 2019
.gitignore edit kdc docker conf and gssapi unit test Mar 5, 2019
.pyup.yml Add pyup config Sep 2, 2018
.travis.yml Added krb5-user to travis Mar 5, 2019
CHANGES.rst Add news fragment for changes. Oct 4, 2019
LICENSE Added some documentation fixes Mar 29, 2016
MANIFEST.in Fixed packaging issue with including tests and not including CHANGES.… Apr 15, 2016
Makefile Return run for all tests Jul 28, 2019
README.rst * Add a few fixes for docs. Sep 24, 2018
benchmark_results_v0.4.0_vs_v0.3.1.rst Add changes for release and benchmark results on latest master Jan 29, 2018
dump_pem.py Added SSL support, docs and tests Nov 23, 2016
gen-ssl-certs.sh Fix issues with Python3.7 test run Aug 24, 2018
pytest.ini Added Topic resource ACL suppor Dec 31, 2018
requirements-ci.txt Update xxhash from 1.4.1 to 1.4.2 Oct 14, 2019
requirements-dev.txt Update diff-cover from 2.2.0 to 2.3.0 Jul 1, 2019
requirements-docs.txt Update sphinx from 2.1.2 to 2.2.0 Sep 2, 2019
requirements-win-test.txt Update xxhash from 1.4.1 to 1.4.2 Oct 14, 2019
setup.py Upgrade to kafka-python 1.4.7. Oct 4, 2019

README.rst

aiokafka

|Build status| |Coverage| |Chat on Gitter|

asyncio client for Kafka

AIOKafkaProducer

AIOKafkaProducer is a high-level, asynchronous message producer.

Example of AIOKafkaProducer usage:

from aiokafka import AIOKafkaProducer
import asyncio

loop = asyncio.get_event_loop()

async def send_one():
    producer = AIOKafkaProducer(
        loop=loop, bootstrap_servers='localhost:9092')
    # Get cluster layout and initial topic/partition leadership information
    await producer.start()
    try:
        # Produce message
        await producer.send_and_wait("my_topic", b"Super message")
    finally:
        # Wait for all pending messages to be delivered or expire.
        await producer.stop()

loop.run_until_complete(send_one())

AIOKafkaConsumer

AIOKafkaConsumer is a high-level, asynchronous message consumer. It interacts with the assigned Kafka Group Coordinator node to allow multiple consumers to load balance consumption of topics (requires kafka >= 0.9.0.0).

Example of AIOKafkaConsumer usage:

from aiokafka import AIOKafkaConsumer
import asyncio

loop = asyncio.get_event_loop()

async def consume():
    consumer = AIOKafkaConsumer(
        'my_topic', 'my_other_topic',
        loop=loop, bootstrap_servers='localhost:9092',
        group_id="my-group")
    # Get cluster layout and join group `my-group`
    await consumer.start()
    try:
        # Consume messages
        async for msg in consumer:
            print("consumed: ", msg.topic, msg.partition, msg.offset,
                  msg.key, msg.value, msg.timestamp)
    finally:
        # Will leave consumer group; perform autocommit if enabled.
        await consumer.stop()

loop.run_until_complete(consume())

Running tests

Docker is required to run tests. See https://docs.docker.com/engine/installation for installation notes. Also note, that lz4 compression libraries for python will require python-dev package, or python source header files for compilation on Linux. NOTE: You will also need a valid java installation. It's required for the keytool utility, used to generate ssh keys for some tests.

Setting up tests requirements (assuming you're within virtualenv on ubuntu 14.04+):

sudo apt-get install -y libsnappy-dev
make setup

Running tests with coverage:

make cov

To run tests with a specific version of Kafka (default one is 1.0.2) use KAFKA_VERSION variable:

make cov KAFKA_VERSION=0.10.2.1

Test running cheatsheat:

  • make test FLAGS="-l -x --ff" - run until 1 failure, rerun failed tests fitst. Great for cleaning up a lot of errors, say after a big refactor.
  • make test FLAGS="-k consumer" - run only the consumer tests.
  • make test FLAGS="-m 'not ssl'" - run tests excluding ssl.
  • make test FLAGS="--no-pull" - do not try to pull new docker image before test run.
You can’t perform that action at this time.