Merge pull request #70 from aio-libs/difference_docs
Added some docs showing kafka-python and aiokafka differences
tvoinarovskyi committed Nov 15, 2016
2 parents e0ee9b3 + 5a8ad95 commit f0a55d3
Showing 4 changed files with 117 additions and 1 deletion.
2 changes: 1 addition & 1 deletion aiokafka/fetcher.py
@@ -200,7 +200,7 @@ def _fetch_requests_routine(self):
The algorithm:
* Group partitions per node, which is the leader for it.
- * If all partitions for this node need prefetch - do it right alway
+ * If all partitions for this node need prefetch - do it right away
* If any partition has some data (in `self._records`) wait up till
`prefetch_backoff` so application can consume data from it.
* If data in `self._records` is not consumed up to
5 changes: 5 additions & 0 deletions docs/api.rst
@@ -1,14 +1,19 @@
.. _api-doc:

API Documentation
=================

.. _aiokafka-producer:

AIOKafkaProducer class
----------------------

.. autoclass:: aiokafka.AIOKafkaProducer
   :members:


.. _aiokafka-consumer:

AIOKafkaConsumer class
----------------------

1 change: 1 addition & 0 deletions docs/index.rst
@@ -188,6 +188,7 @@ Contents:
.. toctree::
   :maxdepth: 2

   kafka-python_difference
   api
   examples

110 changes: 110 additions & 0 deletions docs/kafka-python_difference.rst
@@ -0,0 +1,110 @@
Difference between aiokafka and kafka-python
--------------------------------------------

.. _kip-41:
    https://cwiki.apache.org/confluence/display/KAFKA/KIP-41%3A+KafkaConsumer+Max+Records

.. _kip-62:
    https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread

Why do we need another library?
===============================

``kafka-python`` is a great project, which tries to fully mimic the interface
of the **Java Client API**. It is oriented more toward *features* than *speed*,
but still gives quite good throughput. It's actively developed and reacts
quickly to changes in the Java client.

While ``kafka-python`` has a lot of great features, it is made to be used in a
**threaded** environment. Even more, it mimics Java's client, which means a
**Java-style threaded** environment that does not offer many `asynchronous`
ways of doing things. That is not **bad** in itself, as Java's threads are
very powerful and can use multiple cores.

The API itself just can't be adapted to asynchronous usage (even though the
library does asynchronous IO using `selectors`). It has too much blocking
behaviour, including `blocking` socket usage, threading synchronization, etc.
Examples would be:

* `bootstrap`, which blocks in the constructor itself
* the blocking iterator used for consumption
* sending produce requests, which block if the buffer is full

None of those can be changed to use a `Future`-based API seamlessly. So to get
a proper, non-blocking interface based on Futures and coroutines, a new
library had to be written.


API differences and rationale
=============================

``aiokafka`` has some differences in API design. While the **Producer** is
mostly the same, the **Consumer** has some significant differences that we
want to discuss.


Consumer has no `poll()` method
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

In ``kafka-python``, ``KafkaConsumer.poll()`` is a blocking call that performs
not only message fetching, but also:

* Socket polling using `epoll`, `kqueue` or another API available in your OS.
* Sending heartbeats.

This will never be the case in ``aiokafka``, where socket polling belongs to
the *event loop* rather than to a user-facing call. To avoid misunderstandings
about why these methods behave differently, :ref:`aiokafka-consumer` exposes
this interface under the name ``getmany()``, with some other differences
described below.
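
For illustration, here is a minimal sketch of consuming batches with
``getmany()``. The topic name and broker address are placeholders; error
handling and the exact constructor arguments (such as ``loop``) may differ
between versions.

.. code:: python

    import asyncio
    from aiokafka import AIOKafkaConsumer

    async def consume(loop):
        consumer = AIOKafkaConsumer(
            "my_topic", loop=loop, bootstrap_servers="localhost:9092")
        await consumer.start()
        try:
            while True:
                # Wait up to 1 second for data; the result maps
                # TopicPartition -> list of ConsumerRecord objects.
                data = await consumer.getmany(timeout_ms=1000)
                for tp, messages in data.items():
                    for msg in messages:
                        print(tp.topic, tp.partition, msg.offset, msg.value)
        finally:
            await consumer.stop()

    loop = asyncio.get_event_loop()
    loop.run_until_complete(consume(loop))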


Heartbeats are sent between `getmany()` calls
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

In the original Kafka Java Client heartbeats aren't sent if ``poll()`` is not
called. This can lead to a lot of issues (which led to the `KIP-41`_ and
`KIP-62`_ proposals) and to workarounds using `pause()` and `poll(0)` just for
heartbeats.

``aiokafka`` delegates heartbeating to a background *Task* and will send
heartbeats to the Coordinator as long as the *event loop* is running.
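
Conceptually the pattern looks like this (this is only an illustration, not
``aiokafka``'s actual internal code, and ``coordinator.send_heartbeat()`` is a
hypothetical call):

.. code:: python

    import asyncio

    async def heartbeat_routine(coordinator, interval=3.0):
        # Runs as a separate Task on the event loop, so heartbeats keep
        # flowing even while the application is busy processing records.
        while True:
            await coordinator.send_heartbeat()  # hypothetical call
            await asyncio.sleep(interval)

    # Scheduled once as a background Task and cancelled on close:
    #   task = asyncio.ensure_future(heartbeat_routine(coordinator))
    #   ...
    #   task.cancel()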


Prefetching is more sophisticated
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

In the Kafka Java Client and ``kafka-python`` prefetching is very simple, as
it only performs prefetches:

* in a ``poll()`` call, if we don't have enough data stored to satisfy another
  ``poll()``
* in the *iterator* interface, if we have processed *nearly* all data.

A very simplified version would be:

.. code:: python

    def poll():
        max_records = self.config['max_poll_records']
        records = consumer.fetched_records(max_records)
        if not consumer.has_enough_records(max_records):
            consumer.send_fetches()  # prefetch another batch
        return records

This works great for throughput, as the algorithm is simple and we pipeline
the IO task with record processing.

But it does not perform as well with **semantic partitioning**, where you may
have per-partition processing. In this case latency will be bound to the time
it takes to process the data of all topics.

That is why ``aiokafka`` tries to do prefetches **per partition**. For
example, if we processed all the data pending for a partition in the
*iterator* interface, ``aiokafka`` will *try* to prefetch new data for it
right away. A similar behaviour could be built on top of ``kafka-python``'s
*pause* API, but it would require a lot of code.
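
As a hedged sketch, per-partition consumption with ``getmany()`` could look
like the following (``tp`` is a ``TopicPartition`` the consumer is assigned
to, and ``handle()`` is a placeholder for application processing):

.. code:: python

    async def drain_partition(consumer, tp, handle):
        # Restricting getmany() to a single partition lets aiokafka refill
        # the prefetch buffer for that partition as soon as it is drained,
        # keeping per-partition latency low.
        while True:
            result = await consumer.getmany(tp, timeout_ms=1000)
            for msg in result.get(tp, []):
                await handle(msg)  # placeholder per-partition processing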

.. note::

  Using ``getmany()`` without specifying partitions will result in the same
  prefetch behaviour as using ``poll()``.
