In [None]:
# | default_exp _components.aiokafka_consumer_loop

In [None]:
# | export


from asyncio import iscoroutinefunction, Task  # do not use the version from inspect
from typing import *
from dataclasses import dataclass

import asyncer
from aiokafka import AIOKafkaConsumer
from aiokafka.structs import ConsumerRecord
from pydantic import BaseModel
from pydantic.main import ModelMetaclass

from fastkafka._components.logger import get_logger
from fastkafka._components.meta import delegates, export
from fastkafka._components.task_streaming import get_executor, StreamExecutor

In [None]:
import asyncio
import pytest
from datetime import datetime, timedelta
from unittest.mock import AsyncMock, MagicMock, Mock, call, patch, create_autospec

import anyio
from aiokafka.structs import TopicPartition
from pydantic import Field, HttpUrl, NonNegativeInt
from tqdm.notebook import tqdm

from fastkafka._components.helpers import true_after
from fastkafka._components.logger import suppress_timestamps
from fastkafka._helpers import produce_messages
from fastkafka.encoder import avro_decoder, avro_encoder, json_decoder
from fastkafka.testing import ApacheKafkaBroker

In [None]:
# | notest
# allows async calls in notebooks

import nest_asyncio

In [None]:
# | notest

nest_asyncio.apply()

In [None]:
# | export

logger = get_logger(__name__)

In [None]:
suppress_timestamps()
logger = get_logger(__name__, level=20)
logger.info("ok")

[INFO] __main__: ok


In [None]:
class MyMessage(BaseModel):
    url: HttpUrl = Field(..., example="http://www.acme.com", description="Url example")
    port: NonNegativeInt = Field(1000)

In [None]:
# | export


@dataclass
@export("fastkafka")
class EventMetadata:
    """A class for encapsulating Kafka record metadata.

    Args:
        topic: The topic this record is received from
        partition: The partition from which this record is received
        offset: The position of this record in the corresponding Kafka partition
        timestamp: The timestamp of this record
        timestamp_type: The timestamp type of this record
        key: The key (or `None` if no key is specified)
        value: The value
        serialized_key_size: The size of the serialized, uncompressed key in bytes
        serialized_value_size: The size of the serialized, uncompressed value in bytes
        headers: The headers
    """

    topic: str
    partition: int
    offset: int
    timestamp: int
    timestamp_type: int
    key: Optional[bytes]
    value: Optional[bytes]
    checksum: int
    serialized_key_size: int
    serialized_value_size: int
    headers: Sequence[Tuple[str, bytes]]

    @staticmethod
    def create_event_metadata(record: ConsumerRecord) -> "EventMetadata":  # type: ignore
        """Creates an instance of EventMetadata from a ConsumerRecord.

        Args:
            record: The Kafka ConsumerRecord.

        Returns:
            The created EventMetadata instance.
        """
        return EventMetadata(
            topic=record.topic,
            partition=record.partition,
            offset=record.offset,
            timestamp=record.timestamp,
            timestamp_type=record.timestamp_type,
            value=record.value,
            checksum=record.checksum,
            key=record.key,
            serialized_key_size=record.serialized_key_size,
            serialized_value_size=record.serialized_value_size,
            headers=record.headers,
        )

In [None]:
def create_consumer_record(topic: str, partition: int, msg: BaseModel):
    record = ConsumerRecord(
        topic=topic,
        partition=partition,
        offset=0,
        timestamp=0,
        timestamp_type=0,
        key=None,
        value=msg.json().encode("utf-8")
        if hasattr(msg, "json")
        else msg.encode("utf-8"),
        checksum=0,
        serialized_key_size=0,
        serialized_value_size=0,
        headers=[],
    )
    return record

In [None]:
meta = EventMetadata.create_event_metadata(create_consumer_record("topic", 1, MyMessage(url="http://www.acme.com", port=22)))
assert meta.topic == "topic"
assert meta.partition == 1

In [None]:
# | export

AsyncConsume = Callable[[Union[List[BaseModel], BaseModel]], Awaitable[None]]
AsyncConsumeMeta =  Callable[[Union[List[BaseModel], BaseModel], Union[List[EventMetadata], EventMetadata]], Awaitable[None]]
SyncConsume = Callable[[Union[List[BaseModel], BaseModel]], None]
SyncConsumeMeta =  Callable[[Union[List[BaseModel], BaseModel], Union[List[EventMetadata], EventMetadata]], None]

ConsumeCallable = Union[
    AsyncConsume, AsyncConsumeMeta, SyncConsume, SyncConsumeMeta
]

In [None]:
# | export


def _callback_parameters_wrapper(
    callback: Union[AsyncConsume, AsyncConsumeMeta]
) -> AsyncConsumeMeta:
    """Wraps an async callback and filters the arguments to pass based on if the function accepts EventMetadata as argument

    Args:
        callback: async callable that will be wrapped

    Returns:
        Wrapped callback with filtered params
    """

    async def _params_wrap(
        msg: Union[BaseModel, List[BaseModel]],
        meta: Union[EventMetadata, List[EventMetadata]],
        callback: Union[AsyncConsume, AsyncConsumeMeta] = callback,
    ) -> None:
        types = list(get_type_hints(callback).values())
        args: List[Union[BaseModel, List[BaseModel], EventMetadata, List[EventMetadata]]] = [msg]
        if EventMetadata in types:
            args.insert(types.index(EventMetadata), meta)
        if List[EventMetadata] in types:
            args.insert(types.index(List[EventMetadata]), meta)
        await callback(*args)  # type: ignore

    return _params_wrap

In [None]:
async def without_meta(msg: BaseModel):
    assert msg == "Example_msg"

with pytest.raises(TypeError) as e:
    await without_meta("Example_msg", "Some_meta")

In [None]:
@_callback_parameters_wrapper
async def without_meta(msg: BaseModel):
    assert msg == "Example_msg"

await without_meta("Example_msg", "Some_meta")

In [None]:
@_callback_parameters_wrapper
async def with_meta(msg: BaseModel, meta: EventMetadata):
    assert msg == "Example_msg"
    assert meta == "Some_meta"

await with_meta("Example_msg", "Some_meta")

In [None]:
@_callback_parameters_wrapper
async def with_meta(msg: List[BaseModel], meta: List[EventMetadata]):
    assert msg == "Example_msg"
    assert meta == "Some_meta"

await with_meta("Example_msg", "Some_meta")

In [None]:
# | export


def _prepare_callback(
    callback: ConsumeCallable
) -> AsyncConsumeMeta:
    """
    Prepares a callback to be used in the consumer loop.
        1. If callback is sync, asyncify it
        2. Wrap the callback into a safe callback for exception handling

    Args:
        callback: async callable that will be prepared for use in consumer

    Returns:
        Prepared callback
    """
    async_callback: Union[AsyncConsume, AsyncConsumeMeta] = (
        callback if iscoroutinefunction(callback) else asyncer.asyncify(callback)  # type: ignore
    )
    return _callback_parameters_wrapper(async_callback)

In [None]:
# Check if callback is called when wrapped

for is_async in [False, True]:
    example_msg = "Example msg"
    callback = AsyncMock() if is_async else Mock()
    prepared_callback = _prepare_callback(callback)

    with patch("__main__.get_type_hints") as mock:
        mock.return_value = {"msg": BaseModel}
        await prepared_callback(f"{example_msg}", "Some meta")

    callback.assert_called_once_with(f"{example_msg}")

In [None]:
async def _stream_msgs(  # type: ignore
    msgs: Dict[TopicPartition, bytes],
    send_stream: anyio.streams.memory.MemoryObjectSendStream[Any],
) -> None:
    """
    Decodes and streams the message and topic to the send_stream.

    Args:
        msgs:
        send_stream:
    """
    for topic_partition, topic_msgs in msgs.items():
        topic = topic_partition.topic
        try:
            await send_stream.send(topic_msgs)
        except Exception as e:
            logger.warning(
                f"_stream_msgs(): Unexpected exception '{e.__repr__()}' caught and ignored for topic='{topic_partition.topic}', partition='{topic_partition.partition}' and messages: {topic_msgs!r}"
            )


def _decode_streamed_msgs(  # type: ignore
    msgs: List[ConsumerRecord], msg_type: BaseModel
) -> List[BaseModel]:
    decoded_msgs = [msg_type.parse_raw(msg.value.decode("utf-8")) for msg in msgs]
    return decoded_msgs

In [None]:
# Sanity check: one msg, one topic

with patch("anyio.streams.memory.MemoryObjectSendStream.send") as mock:
    send_stream, receive_stream = anyio.create_memory_object_stream()

    topic = "topic_0"
    partition = 0
    topic_part_0_0 = TopicPartition(topic, partition)
    msg = MyMessage(url="http://www.acme.com", port=22)
    record = create_consumer_record(topic=topic, partition=partition, msg=msg)

    await _stream_msgs(
        msgs={topic_part_0_0: [record]},
        send_stream=send_stream,
    )

    mock.assert_called_once()
    mock.assert_has_calls([call([record])])

In [None]:
# Check different topics

# Two msg, two topics, send called twice with each topic

with patch("anyio.streams.memory.MemoryObjectSendStream.send") as mock:
    send_stream, receive_stream = anyio.create_memory_object_stream()

    topic_partitions = [("topic_0", 0), ("topic_1", 0)]

    msg = MyMessage(url="http://www.acme.com", port=22)
    msgs = {
        TopicPartition(topic, partition): [
            create_consumer_record(topic=topic, partition=partition, msg=msg)
        ]
        for topic, partition in topic_partitions
    }

    await _stream_msgs(
        msgs=msgs,
        send_stream=send_stream,
    )

    assert mock.call_count == 2

    mock.assert_has_calls([call(msg) for msg in msgs.values()])

In [None]:
# Check multiple msgs in same topic

# Two msg, one topic, send called twice for same topic

with patch("anyio.streams.memory.MemoryObjectSendStream.send") as mock:
    send_stream, receive_stream = anyio.create_memory_object_stream()

    topic_partitions = [("topic_0", 0)]

    msg = MyMessage(url="http://www.acme.com", port=22)
    record = create_consumer_record(topic=topic, partition=partition, msg=msg)

    msgs = {
        TopicPartition(topic, partition): [
            create_consumer_record(topic=topic, partition=partition, msg=msg),
            create_consumer_record(topic=topic, partition=partition, msg=msg),
        ]
        for topic, partition in topic_partitions
    }

    await _stream_msgs(
        msgs=msgs,
        send_stream=send_stream,
    )

    mock.assert_has_calls([call(msg) for msg in msgs.values()])

In [None]:
# Check multiple partitions

# Two msg, one topic, differenct partitions, send called twice for same topic

with patch("anyio.streams.memory.MemoryObjectSendStream.send") as mock:
    send_stream, receive_stream = anyio.create_memory_object_stream()

    topic_partitions = [("topic_0", 0), ("topic_0", 1)]

    msg = MyMessage(url="http://www.acme.com", port=22)
    msgs = {
        TopicPartition(topic, partition): [
            create_consumer_record(topic=topic, partition=partition, msg=msg)
        ]
        for topic, partition in topic_partitions
    }
    record = create_consumer_record(topic=topic, partition=partition, msg=msg)

    await _stream_msgs(
        msgs=msgs,
        send_stream=send_stream,
    )

    mock.assert_has_calls([call(msg) for msg in msgs.values()])

In [None]:
# | export


def _get_single_msg_handlers( # type: ignore
    *,
    consumer: AIOKafkaConsumer,
    callback: AsyncConsumeMeta,
    decoder_fn: Callable[[bytes, ModelMetaclass], Any],
    msg_type: Type[BaseModel],
    **kwargs: Any,
) -> Tuple[
    Callable[
        [
            ConsumerRecord,
            AsyncConsumeMeta,
            Callable[[bytes, ModelMetaclass], Any],
            Type[BaseModel],
        ],
        Awaitable[None],
    ],
    Callable[[AIOKafkaConsumer, Any], Awaitable[List[ConsumerRecord]]],
]:
    
    """
    Retrieves the message handlers for consuming single messages from a Kafka topic.

    Args:
        consumer: The Kafka consumer instance.
        callback: The callback function to handle the consumed message.
        decoder_fn: The function to decode the consumed message.
        msg_type: The type of the consumed message.
        **kwargs: Additional keyword arguments for the consumer.

    Returns:
        The handle_msg function and poll_consumer function.
    """
    async def handle_msg(  # type: ignore
        record: ConsumerRecord,
        callback: AsyncConsumeMeta = callback,
        decoder_fn: Callable[[bytes, ModelMetaclass], Any] = decoder_fn,
        msg_type: Type[BaseModel] = msg_type,
    ) -> None:
        await callback(
            decoder_fn(record.value, msg_type),
            EventMetadata.create_event_metadata(record),
        )

    async def poll_consumer(  # type: ignore
        consumer: AIOKafkaConsumer = consumer, kwargs: Any = kwargs
    ) -> List[ConsumerRecord]:
        msgs = await consumer.getmany(**kwargs)
        return [msg for msg_group in msgs.values() for msg in msg_group]

    return handle_msg, poll_consumer

In [None]:
topic_partitions = [("topic_0", 0), ("topic_0", 1)]

msg = MyMessage(url="http://www.acme.com", port=22)
msgs = {
    TopicPartition(topic, partition): [
        create_consumer_record(topic=topic, partition=partition, msg=msg)
    ]
    for topic, partition in topic_partitions
}
record = create_consumer_record(topic=topic, partition=partition, msg=msg)

consumer = AsyncMock()
consumer.getmany.return_value = msgs

callback = AsyncMock()
decoder_fn = json_decoder
msg_type = MyMessage

handle_msg, poll_consumer = _get_single_msg_handlers(
    consumer=consumer, callback=callback, decoder_fn=decoder_fn, msg_type=msg_type
)

got_msgs = await poll_consumer()
assert len(msgs.values()) == len(got_msgs)

for msg in got_msgs:
    await handle_msg(msg)

callback.assert_has_awaits(
    [
        call(
            json_decoder(msg.value, msg_type), EventMetadata.create_event_metadata(msg)
        )
        for msg in got_msgs
    ]
)

In [None]:
# | export


def _get_batch_msg_handlers(  # type: ignore
    *,
    consumer: AIOKafkaConsumer,
    callback: AsyncConsumeMeta,
    decoder_fn: Callable[[bytes, ModelMetaclass], Any],
    msg_type: Type[BaseModel],
    **kwargs: Any,
) -> Tuple[
    Callable[
        [
            List[ConsumerRecord],
            AsyncConsumeMeta,
            Callable[[bytes, ModelMetaclass], Any],
            Type[BaseModel],
        ],
        Awaitable[None],
    ],
    Callable[[AIOKafkaConsumer, Any], Awaitable[List[List[ConsumerRecord]]]],
]:
    """
    Retrieves the message handlers for consuming messages in batches from a Kafka topic.

    Args:
        consumer: The Kafka consumer instance.
        callback: The callback function to handle the consumed messages.
        decoder_fn: The function to decode the consumed messages.
        msg_type: The type of the consumed messages.
        **kwargs: Additional keyword arguments for the consumer.

    Returns:
        The handle_msg function and poll_consumer function.
    """

    async def handle_msg(  # type: ignore
        records: List[ConsumerRecord],
        callback: AsyncConsumeMeta = callback,
        decoder_fn: Callable[[bytes, ModelMetaclass], Any] = decoder_fn,
        msg_type: Type[BaseModel] = msg_type,
    ) -> None:
        await callback(
            [decoder_fn(record.value, msg_type) for record in records],
            [EventMetadata.create_event_metadata(record) for record in records],
        )

    async def poll_consumer(  # type: ignore
        consumer: AIOKafkaConsumer = consumer, kwargs: Any = kwargs
    ) -> List[List[ConsumerRecord]]:
        msgs = await consumer.getmany(**kwargs)
        return [value for value in msgs.values() if len(value)>0]

    return handle_msg, poll_consumer

In [None]:
topic_partitions = [("topic_0", 0), ("topic_0", 1)]

msg = MyMessage(url="http://www.acme.com", port=22)
msgs = {
    TopicPartition(topic, partition): [
        create_consumer_record(topic=topic, partition=partition, msg=msg)
    ]
    for topic, partition in topic_partitions
}
record = create_consumer_record(topic=topic, partition=partition, msg=msg)

consumer = AsyncMock()
consumer.getmany.return_value = msgs

callback = AsyncMock()
decoder_fn = json_decoder
msg_type = MyMessage

handle_msg, poll_consumer = _get_batch_msg_handlers(
    consumer=consumer, callback=callback, decoder_fn=decoder_fn, msg_type=msg_type
)

got_msgs = await poll_consumer()
assert len(msgs.values()) == len(got_msgs)

for msgs in got_msgs:
    assert len(msgs) == 1

for msg in got_msgs:
    await handle_msg(msg)

callback.assert_has_awaits(
    [
        call(
            [json_decoder(msg_unwrapped.value, msg_type) for msg_unwrapped in msg],
            [EventMetadata.create_event_metadata(msg_unwrapped) for msg_unwrapped in msg],
        )
        for msg in got_msgs
    ]
)

In [None]:
# | export


@delegates(AIOKafkaConsumer.getmany)
async def _aiokafka_consumer_loop(  # type: ignore
    consumer: AIOKafkaConsumer,
    *,
    topic: str,
    decoder_fn: Callable[[bytes, ModelMetaclass], Any],
    callback: ConsumeCallable,
    max_buffer_size: int = 100_000,
    msg_type: Union[Type[List[BaseModel]], Type[BaseModel]],
    is_shutting_down_f: Callable[[], bool],
    executor: Union[str, StreamExecutor, None] = None,
    **kwargs: Any,
) -> None:
    """
    Consumer loop for infinite pooling of the AIOKafka consumer for new messages. Calls consumer.getmany()
    and after the consumer return messages or times out, messages are decoded and streamed to defined callback.

    Args:
        topic: Topic to subscribe
        decoder_fn: Function to decode the messages consumed from the topic
        callbacks: Dict of callbacks mapped to their respective topics
        timeout_ms: Time to timeut the getmany request by the consumer
        max_buffer_size: Maximum number of unconsumed messages in the callback buffer
        msg_types: Dict of message types mapped to their respective topics
        is_shutting_down_f: Function for controlling the shutdown of consumer loop
    """

    prepared_callback = _prepare_callback(callback)

    if hasattr(msg_type, "__origin__") and msg_type.__origin__ == list:
        handle_msg, poll_consumer = _get_batch_msg_handlers(
            consumer=consumer,
            callback=prepared_callback,
            decoder_fn=decoder_fn,
            msg_type=msg_type.__args__[0],  # type: ignore
            **kwargs,
        )
    else:
        handle_msg, poll_consumer = _get_single_msg_handlers(
            consumer=consumer,
            callback=prepared_callback,
            decoder_fn=decoder_fn,
            msg_type=msg_type,  # type: ignore
            **kwargs,
        )

    await get_executor(executor).run(
        is_shutting_down_f=is_shutting_down_f,
        generator=poll_consumer,  # type: ignore
        processor=handle_msg,  # type: ignore
    )

In [None]:
def is_shutting_down_f(mock_func: Mock, num_calls: int = 1) -> Callable[[], bool]:
    def _is_shutting_down_f():
        return mock_func.call_count == num_calls

    return _is_shutting_down_f

In [None]:
from fastkafka._components.task_streaming import SequentialExecutor

In [None]:
topic = "topic_0"
partition = 0
msg = MyMessage(url="http://www.acme.com", port=22)
record = create_consumer_record(topic=topic, partition=partition, msg=msg)

mock_consumer = MagicMock()
msgs = {TopicPartition(topic, 0): [record]}

f = asyncio.Future()
f.set_result(msgs)
mock_consumer.configure_mock(**{"getmany.return_value": f})

def f(msg: MyMessage): pass
mock_callback = MagicMock(spec=f)


for is_async in [True, False]:
    for executor_type in ["DynamicTaskExecutor", "SequentialExecutor"]:
        await _aiokafka_consumer_loop(
            consumer=mock_consumer,
            topic=topic,
            decoder_fn=json_decoder,
            max_buffer_size=100,
            timeout_ms=10,
            callback=asyncer.asyncify(mock_callback) if is_async else mock_callback,
            msg_type=MyMessage,
            is_shutting_down_f=is_shutting_down_f(mock_consumer.getmany),
            executor_type=executor_type,
        )

        assert mock_consumer.getmany.call_count == 1
        mock_callback.assert_called_once_with(msg)

print("ok")

ok


In [None]:
topic = "topic_0"
partition = 0
msg = MyMessage(url="http://www.acme.com", port=22)
record = create_consumer_record(topic=topic, partition=partition, msg=msg)

mock_consumer = MagicMock()
msgs = {TopicPartition(topic, 0): [record]}

f = asyncio.Future()
f.set_result(msgs)
mock_consumer.configure_mock(**{"getmany.return_value": f})

def f(msg: List[MyMessage]): pass
mock_callback = MagicMock(spec=f)


for is_async in [True, False]:
    for executor_type in ["DynamicTaskExecutor", "SequentialExecutor"]:
        await _aiokafka_consumer_loop(
            consumer=mock_consumer,
            topic=topic,
            decoder_fn=json_decoder,
            max_buffer_size=100,
            timeout_ms=10,
            callback=asyncer.asyncify(mock_callback) if is_async else mock_callback,
            msg_type=List[MyMessage],
            is_shutting_down_f=is_shutting_down_f(mock_consumer.getmany),
            executor_type=executor_type,
        )

        assert mock_consumer.getmany.call_count == 1
        mock_callback.assert_called_once_with([msg])

print("ok")

ok


In [None]:
# Sanity check: exception in callback recovery
# Two msg, one topic, process_f called twice even tough it throws

for is_async in [True, False]:
    for executor_type in ["DynamicTaskExecutor", "SequentialExecutor"]:
        topic = "topic_0"
        partition = 0
        msg = MyMessage(url="http://www.acme.com", port=22)
        record = create_consumer_record(topic=topic, partition=partition, msg=msg)

        num_msgs = 2

        mock_consumer = MagicMock()
        msgs = {TopicPartition(topic, 0): [record, record]}

        f = asyncio.Future()
        f.set_result(msgs)

        mock_consumer.configure_mock(**{"getmany.return_value": f})
        mock_callback = Mock()

        exception = Exception("")
        mock_callback.side_effect = exception


        await _aiokafka_consumer_loop(
            consumer=mock_consumer,
            topic=topic,
            decoder_fn=json_decoder,
            max_buffer_size=100,
            timeout_ms=1,
            callback=asyncer.asyncify(mock_callback) if is_async else mock_callback,
            msg_type=MyMessage,
            is_shutting_down_f=is_shutting_down_f(mock_consumer.getmany, num_calls=1),
            executor_type=executor_type,
        )

        assert mock_callback.call_count == num_msgs, mock_callback.call_count
        mock_callback.assert_has_calls([call(msg), call(msg)])

print("ok")

ok


In [None]:
# Sanity check: malformed msgs
# One msg of wrong type, two normal msg, one topic, process_f called twice

topic = "topic_0"
partition = 0
msg = MyMessage(url="http://www.acme.com", port=22)
correct_record = create_consumer_record(topic=topic, partition=partition, msg=msg)
faulty_record = create_consumer_record(topic=topic, partition=partition, msg="Wrong!")

mock_consumer = MagicMock()
msgs = {TopicPartition(topic, 0): [faulty_record, correct_record, correct_record]}

mock_consumer.configure_mock(**{"getmany.return_value": f})
mock_callback = Mock()

exception = Exception("")
callback.side_effect = exception

for is_async in [True, False]:
    await _aiokafka_consumer_loop(
        consumer=mock_consumer,
        topic=topic,
        decoder_fn=json_decoder,
        max_buffer_size=100,
        timeout_ms=10,
        callback=asyncer.asyncify(mock_callback) if is_async else mock_callback,
        msg_type=MyMessage,
        is_shutting_down_f=is_shutting_down_f(mock_consumer.getmany),
    )

    assert mock_consumer.getmany.call_count == 1
    mock_callback.assert_has_calls([call(msg), call(msg)])

print("ok")

ok


In [None]:
# | export


def sanitize_kafka_config(**kwargs: Any) -> Dict[str, Any]:
    """Sanitize Kafka config"""
    return {k: "*" * len(v) if "pass" in k.lower() else v for k, v in kwargs.items()}

In [None]:
kwargs = {
    "bootstrap_servers": "whatever.cloud:9092",
    "auto_offset_reset": "earliest",
    "security_protocol": "SASL_SSL",
    "sasl_mechanism": "PLAIN",
    "sasl_plain_username": "username",
    "sasl_plain_password": "password",
    "ssl_context": "something",
}

assert sanitize_kafka_config(**kwargs)["sasl_plain_password"] == "********"

In [None]:
# | export


@delegates(AIOKafkaConsumer)
@delegates(_aiokafka_consumer_loop, keep=True)
async def aiokafka_consumer_loop(
    topic: str,
    decoder_fn: Callable[[bytes, ModelMetaclass], Any],
    *,
    timeout_ms: int = 100,
    max_buffer_size: int = 100_000,
    callback: ConsumeCallable,
    msg_type: Union[Type[List[BaseModel]], Type[BaseModel]],
    is_shutting_down_f: Callable[[], bool],
    executor: Union[str, StreamExecutor, None] = None,
    **kwargs: Any,
) -> None:
    """Consumer loop for infinite pooling of the AIOKafka consumer for new messages. Creates and starts AIOKafkaConsumer
    and runs _aio_kafka_consumer loop fo infinite poling of the consumer for new messages.

    Args:
        topic: name of the topic to subscribe to
        decoder_fn: Function to decode the messages consumed from the topic
        callback: callback function to be called after decoding and parsing a consumed message
        timeout_ms: Time to timeut the getmany request by the consumer
        max_buffer_size: Maximum number of unconsumed messages in the callback buffer
        msg_type: Type with `parse_json` method used for parsing a decoded message
        is_shutting_down_f: Function for controlling the shutdown of consumer loop
    """
    logger.info(f"aiokafka_consumer_loop() starting...")
    try:
        consumer = AIOKafkaConsumer(
            **kwargs,
        )
        logger.info(
            f"aiokafka_consumer_loop(): Consumer created using the following parameters: {sanitize_kafka_config(**kwargs)}"
        )

        await consumer.start()
        logger.info("aiokafka_consumer_loop(): Consumer started.")
        consumer.subscribe([topic])
        logger.info("aiokafka_consumer_loop(): Consumer subscribed.")

        try:
            await _aiokafka_consumer_loop(
                consumer=consumer,
                topic=topic,
                decoder_fn=decoder_fn,
                max_buffer_size=max_buffer_size,
                timeout_ms=timeout_ms,
                callback=callback,
                msg_type=msg_type,
                is_shutting_down_f=is_shutting_down_f,
                executor = executor,
            )
        finally:
            await consumer.stop()
            logger.info(f"aiokafka_consumer_loop(): Consumer stopped.")
            logger.info(f"aiokafka_consumer_loop() finished.")
    except Exception as e:
        logger.error(
            f"aiokafka_consumer_loop(): unexpected exception raised: '{e.__repr__()}'"
        )
        raise e

In [None]:

for executor in ["DynamicTaskExecutor", "SequentialExecutor"]:
    topic = "test_topic"
    msgs_sent = 9178
    msgs = [
        MyMessage(url="http://www.ai.com", port=port).json().encode("utf-8")
        for port in range(msgs_sent)
    ]
    msgs_received = 0


    async def count_msg(msg: MyMessage):
        global msgs_received
        msgs_received = msgs_received + 1
        if msgs_received % 1000 == 0:
            logger.info(f"{msgs_received=}")


    async with ApacheKafkaBroker(topics=[topic], listener_port=11992) as bootstrap_server:
        await produce_messages(topic=topic, bootstrap_servers=bootstrap_server, msgs=msgs)
        await aiokafka_consumer_loop(
            topic=topic,
            decoder_fn=json_decoder,
            auto_offset_reset="earliest",
            callback=count_msg,
            msg_type=MyMessage,
            is_shutting_down_f=true_after(2),
            bootstrap_servers=bootstrap_server,
            executor=executor,
        )

        assert msgs_sent == msgs_received, f"{msgs_sent} != {msgs_received}"

[INFO] fastkafka._components.test_dependencies: Java is already installed.
[INFO] fastkafka._components.test_dependencies: But not exported to PATH, exporting...
[INFO] fastkafka._components.test_dependencies: Kafka is installed.
[INFO] fastkafka._components.test_dependencies: But not exported to PATH, exporting...
[INFO] fastkafka._testing.apache_kafka_broker: Starting zookeeper...
stdout=, stderr=, returncode=1
[INFO] fastkafka._testing.apache_kafka_broker: zookeeper startup failed, generating a new port and retrying...
[INFO] fastkafka._testing.apache_kafka_broker: zookeeper new port=46809
[INFO] fastkafka._testing.apache_kafka_broker: Starting kafka...
[INFO] fastkafka._testing.apache_kafka_broker: Local Kafka broker up and running on 127.0.0.1:9092


producing to 'test_topic':   0%|          | 0/9178 [00:00<?, ?it/s]

[INFO] __main__: aiokafka_consumer_loop() starting...
[INFO] __main__: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'bootstrap_servers': '127.0.0.1:9092'}
[INFO] __main__: aiokafka_consumer_loop(): Consumer started.
[INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'test_topic'})
[INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'test_topic'}
[INFO] __main__: aiokafka_consumer_loop(): Consumer subscribed.
[INFO] aiokafka.consumer.group_coordinator: Metadata for topic has changed from {} to {'test_topic': 1}. 
[INFO] __main__: msgs_received=1000
[INFO] __main__: msgs_received=2000
[INFO] __main__: msgs_received=3000
[INFO] __main__: msgs_received=4000
[INFO] __main__: msgs_received=5000
[INFO] __main__: msgs_received=6000
[INFO] __main__: msgs_received=7000
[INFO] __main__: msgs_received=8000
[INFO] __main__: msgs_received=9000
[INFO] __main__: aiokafka_consumer_loop(): Consume

producing to 'test_topic':   0%|          | 0/9178 [00:00<?, ?it/s]

[INFO] __main__: aiokafka_consumer_loop() starting...
[INFO] __main__: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'bootstrap_servers': '127.0.0.1:9092'}
[INFO] __main__: aiokafka_consumer_loop(): Consumer started.
[INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'test_topic'})
[INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'test_topic'}
[INFO] __main__: aiokafka_consumer_loop(): Consumer subscribed.
[INFO] aiokafka.consumer.group_coordinator: Metadata for topic has changed from {} to {'test_topic': 1}. 
[INFO] __main__: msgs_received=1000
[INFO] __main__: msgs_received=2000
[INFO] __main__: msgs_received=3000
[INFO] __main__: msgs_received=4000
[INFO] __main__: msgs_received=5000
[INFO] __main__: msgs_received=6000
[INFO] __main__: msgs_received=7000
[INFO] __main__: msgs_received=8000
[INFO] __main__: msgs_received=9000
[INFO] __main__: aiokafka_consumer_loop(): Consume

In [None]:
for executor in ["DynamicTaskExecutor", "SequentialExecutor"]:
    topic = "test_topic"
    msgs_sent = 9178
    msgs = [
        MyMessage(url="http://www.ai.com", port=port).json().encode("utf-8")
        for port in range(msgs_sent)
    ]
    msgs_received = 0


    async def count_msg(msg: List[MyMessage], meta: List[EventMetadata]):
        global msgs_received
        msgs_received = msgs_received + len(msg)
        logger.info(f"{msgs_received=}")


    async with ApacheKafkaBroker(topics=[topic], listener_port=11992) as bootstrap_server:
        await produce_messages(topic=topic, bootstrap_servers=bootstrap_server, msgs=msgs)
        await aiokafka_consumer_loop(
            topic=topic,
            decoder_fn=json_decoder,
            auto_offset_reset="earliest",
            callback=count_msg,
            msg_type=List[MyMessage],
            is_shutting_down_f=true_after(2),
            bootstrap_servers=bootstrap_server,
            executor=executor,
        )

        assert msgs_sent == msgs_received, f"{msgs_sent} != {msgs_received}"

[INFO] fastkafka._components.test_dependencies: Java is already installed.
[INFO] fastkafka._components.test_dependencies: Kafka is installed.
[INFO] fastkafka._testing.apache_kafka_broker: Starting zookeeper...
stdout=, stderr=, returncode=1
[INFO] fastkafka._testing.apache_kafka_broker: zookeeper startup failed, generating a new port and retrying...
[INFO] fastkafka._testing.apache_kafka_broker: zookeeper new port=42673
[INFO] fastkafka._testing.apache_kafka_broker: Starting kafka...
[INFO] fastkafka._testing.apache_kafka_broker: Local Kafka broker up and running on 127.0.0.1:9092


producing to 'test_topic':   0%|          | 0/9178 [00:00<?, ?it/s]

[INFO] __main__: aiokafka_consumer_loop() starting...
[INFO] __main__: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'bootstrap_servers': '127.0.0.1:9092'}
[INFO] __main__: aiokafka_consumer_loop(): Consumer started.
[INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'test_topic'})
[INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'test_topic'}
[INFO] __main__: aiokafka_consumer_loop(): Consumer subscribed.
[INFO] aiokafka.consumer.group_coordinator: Metadata for topic has changed from {} to {'test_topic': 1}. 
[INFO] __main__: msgs_received=9178
[INFO] __main__: aiokafka_consumer_loop(): Consumer stopped.
[INFO] __main__: aiokafka_consumer_loop() finished.
[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 168561...
[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 168561 terminated.
[INFO] fastkafka._comp

producing to 'test_topic':   0%|          | 0/9178 [00:00<?, ?it/s]

[INFO] __main__: aiokafka_consumer_loop() starting...
[INFO] __main__: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'bootstrap_servers': '127.0.0.1:9092'}
[INFO] __main__: aiokafka_consumer_loop(): Consumer started.
[INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'test_topic'})
[INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'test_topic'}
[INFO] __main__: aiokafka_consumer_loop(): Consumer subscribed.
[INFO] aiokafka.consumer.group_coordinator: Metadata for topic has changed from {} to {'test_topic': 1}. 
[INFO] __main__: msgs_received=9178
[INFO] __main__: aiokafka_consumer_loop(): Consumer stopped.
[INFO] __main__: aiokafka_consumer_loop() finished.
[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 170166...
[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 170166 terminated.
[INFO] fastkafka._comp

In [None]:
# Test with meta

for executor in ["DynamicTaskExecutor", "SequentialExecutor"]:
    topic = "test_topic"
    msgs_sent = 9178
    msgs = [
        MyMessage(url="http://www.ai.com", port=port).json().encode("utf-8")
        for port in range(msgs_sent)
    ]
    msgs_received = 0
    meta_samples = []

    async def count_msg(msg: MyMessage, meta: EventMetadata):
        global msgs_received
        msgs_received = msgs_received + 1
        if msgs_received % 1000 == 0:
            meta_samples.append(meta)
            logger.info(f"{msgs_received=}, {meta=}")


    async with ApacheKafkaBroker(topics=[topic], listener_port=11992) as bootstrap_server:
        await produce_messages(topic=topic, bootstrap_servers=bootstrap_server, msgs=msgs)
        await aiokafka_consumer_loop(
            topic=topic,
            decoder_fn=json_decoder,
            auto_offset_reset="earliest",
            callback=count_msg,
            msg_type=MyMessage,
            is_shutting_down_f=true_after(2),
            bootstrap_servers=bootstrap_server,
            executor = executor
        )

        assert msgs_sent == msgs_received, f"{msgs_sent} != {msgs_received}"
        assert all(isinstance(meta, EventMetadata) for meta in meta_samples)

[INFO] fastkafka._components.test_dependencies: Java is already installed.
[INFO] fastkafka._components.test_dependencies: Kafka is installed.
[INFO] fastkafka._testing.apache_kafka_broker: Starting zookeeper...
stdout=, stderr=, returncode=1
[INFO] fastkafka._testing.apache_kafka_broker: zookeeper startup failed, generating a new port and retrying...
[INFO] fastkafka._testing.apache_kafka_broker: zookeeper new port=34735
[INFO] fastkafka._testing.apache_kafka_broker: Starting kafka...
[INFO] fastkafka._testing.apache_kafka_broker: Local Kafka broker up and running on 127.0.0.1:9092


producing to 'test_topic':   0%|          | 0/9178 [00:00<?, ?it/s]

[INFO] __main__: aiokafka_consumer_loop() starting...
[INFO] __main__: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'bootstrap_servers': '127.0.0.1:9092'}
[INFO] __main__: aiokafka_consumer_loop(): Consumer started.
[INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'test_topic'})
[INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'test_topic'}
[INFO] __main__: aiokafka_consumer_loop(): Consumer subscribed.
[INFO] aiokafka.consumer.group_coordinator: Metadata for topic has changed from {} to {'test_topic': 1}. 
[INFO] __main__: msgs_received=1000, meta=EventMetadata(topic='test_topic', partition=0, offset=999, timestamp=1687160704618, timestamp_type=0, key=None, value=b'{"url": "http://www.ai.com", "port": 999}', checksum=None, serialized_key_size=-1, serialized_value_size=41, headers=())
[INFO] __main__: msgs_received=2000, meta=EventMetadata(topic='test_topic', partition=0, of

producing to 'test_topic':   0%|          | 0/9178 [00:00<?, ?it/s]

[INFO] __main__: aiokafka_consumer_loop() starting...
[INFO] __main__: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'bootstrap_servers': '127.0.0.1:9092'}
[INFO] __main__: aiokafka_consumer_loop(): Consumer started.
[INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'test_topic'})
[INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'test_topic'}
[INFO] __main__: aiokafka_consumer_loop(): Consumer subscribed.
[INFO] aiokafka.consumer.group_coordinator: Metadata for topic has changed from {} to {'test_topic': 1}. 
[INFO] __main__: msgs_received=1000, meta=EventMetadata(topic='test_topic', partition=0, offset=999, timestamp=1687160714836, timestamp_type=0, key=None, value=b'{"url": "http://www.ai.com", "port": 999}', checksum=None, serialized_key_size=-1, serialized_value_size=41, headers=())
[INFO] __main__: msgs_received=2000, meta=EventMetadata(topic='test_topic', partition=0, of

In [None]:
# Test with avro_decoder

for executor in ["DynamicTaskExecutor", "SequentialExecutor"]:
    topic = "test_topic"
    msgs_sent = 9178
    msgs = [
        avro_encoder(MyMessage(url="http://www.ai.com", port=port))
        for port in range(msgs_sent)
    ]
    msgs_received = 0


    async def count_msg(msg: MyMessage):
        global msgs_received
        msgs_received = msgs_received + 1
        if msgs_received % 1000 == 0:
            logger.info(f"{msgs_received=}")


    async with ApacheKafkaBroker(topics=[topic], listener_port=11992) as bootstrap_server:
        await produce_messages(topic=topic, bootstrap_servers=bootstrap_server, msgs=msgs)
        await aiokafka_consumer_loop(
            topic=topic,
            decoder_fn=avro_decoder,
            auto_offset_reset="earliest",
            callback=count_msg,
            msg_type=MyMessage,
            is_shutting_down_f=true_after(2),
            bootstrap_servers=bootstrap_server,
            executor=executor,
        )

        assert msgs_sent == msgs_received, f"{msgs_sent} != {msgs_received}"


[INFO] fastkafka._components.test_dependencies: Java is already installed.
[INFO] fastkafka._components.test_dependencies: Kafka is installed.
[INFO] fastkafka._testing.apache_kafka_broker: Starting zookeeper...
stdout=, stderr=, returncode=1
[INFO] fastkafka._testing.apache_kafka_broker: zookeeper startup failed, generating a new port and retrying...
[INFO] fastkafka._testing.apache_kafka_broker: zookeeper new port=44275
[INFO] fastkafka._testing.apache_kafka_broker: Starting kafka...
[INFO] fastkafka._testing.apache_kafka_broker: Local Kafka broker up and running on 127.0.0.1:9092


producing to 'test_topic':   0%|          | 0/9178 [00:00<?, ?it/s]

[INFO] __main__: aiokafka_consumer_loop() starting...
[INFO] __main__: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'bootstrap_servers': '127.0.0.1:9092'}
[INFO] __main__: aiokafka_consumer_loop(): Consumer started.
[INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'test_topic'})
[INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'test_topic'}
[INFO] __main__: aiokafka_consumer_loop(): Consumer subscribed.
[INFO] aiokafka.consumer.group_coordinator: Metadata for topic has changed from {} to {'test_topic': 1}. 
[INFO] __main__: msgs_received=1000
[INFO] __main__: msgs_received=2000
[INFO] __main__: msgs_received=3000
[INFO] __main__: msgs_received=4000
[INFO] __main__: msgs_received=5000
[INFO] __main__: msgs_received=6000
[INFO] __main__: msgs_received=7000
[INFO] __main__: msgs_received=8000
[INFO] __main__: msgs_received=9000
[INFO] __main__: aiokafka_consumer_loop(): Consume

producing to 'test_topic':   0%|          | 0/9178 [00:00<?, ?it/s]

[INFO] __main__: aiokafka_consumer_loop() starting...
[INFO] __main__: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'bootstrap_servers': '127.0.0.1:9092'}
[INFO] __main__: aiokafka_consumer_loop(): Consumer started.
[INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'test_topic'})
[INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'test_topic'}
[INFO] __main__: aiokafka_consumer_loop(): Consumer subscribed.
[INFO] aiokafka.consumer.group_coordinator: Metadata for topic has changed from {} to {'test_topic': 1}. 
[INFO] __main__: msgs_received=1000
[INFO] __main__: msgs_received=2000
[INFO] __main__: msgs_received=3000
[INFO] __main__: msgs_received=4000
[INFO] __main__: msgs_received=5000
[INFO] __main__: msgs_received=6000
[INFO] __main__: msgs_received=7000
[INFO] __main__: msgs_received=8000
[INFO] __main__: msgs_received=9000
[INFO] __main__: aiokafka_consumer_loop(): Consume

In [None]:
# Test with avro_decoder and meta

for executor in ["DynamicTaskExecutor", "SequentialExecutor"]:
    topic = "test_topic"
    msgs_sent = 9178
    msgs = [
        avro_encoder(MyMessage(url="http://www.ai.com", port=port))
        for port in range(msgs_sent)
    ]
    msgs_received = 0
    meta_samples = []

    async def count_msg(msg: MyMessage, meta: EventMetadata):
        global msgs_received
        msgs_received = msgs_received + 1
        if msgs_received % 1000 == 0:
            logger.info(f"{msgs_received=}")
            meta_samples.append(meta)


    async with ApacheKafkaBroker(topics=[topic], listener_port=11992) as bootstrap_server:
        await produce_messages(topic=topic, bootstrap_servers=bootstrap_server, msgs=msgs)
        await aiokafka_consumer_loop(
            topic=topic,
            decoder_fn=avro_decoder,
            auto_offset_reset="earliest",
            callback=count_msg,
            msg_type=MyMessage,
            is_shutting_down_f=true_after(2),
            bootstrap_servers=bootstrap_server,
            executor=executor,
        )

        assert msgs_sent == msgs_received, f"{msgs_sent} != {msgs_received}"
        assert all(isinstance(meta, EventMetadata) for meta in meta_samples)

[INFO] fastkafka._components.test_dependencies: Java is already installed.
[INFO] fastkafka._components.test_dependencies: Kafka is installed.
[INFO] fastkafka._testing.apache_kafka_broker: Starting zookeeper...
stdout=, stderr=, returncode=1
[INFO] fastkafka._testing.apache_kafka_broker: zookeeper startup failed, generating a new port and retrying...
[INFO] fastkafka._testing.apache_kafka_broker: zookeeper new port=36279
[INFO] fastkafka._testing.apache_kafka_broker: Starting kafka...
[INFO] fastkafka._testing.apache_kafka_broker: Local Kafka broker up and running on 127.0.0.1:9092


producing to 'test_topic':   0%|          | 0/9178 [00:00<?, ?it/s]

[INFO] __main__: aiokafka_consumer_loop() starting...
[INFO] __main__: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'bootstrap_servers': '127.0.0.1:9092'}
[INFO] __main__: aiokafka_consumer_loop(): Consumer started.
[INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'test_topic'})
[INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'test_topic'}
[INFO] __main__: aiokafka_consumer_loop(): Consumer subscribed.
[INFO] aiokafka.consumer.group_coordinator: Metadata for topic has changed from {} to {'test_topic': 1}. 
[INFO] __main__: msgs_received=1000
[INFO] __main__: msgs_received=2000
[INFO] __main__: msgs_received=3000
[INFO] __main__: msgs_received=4000
[INFO] __main__: msgs_received=5000
[INFO] __main__: msgs_received=6000
[INFO] __main__: msgs_received=7000
[INFO] __main__: msgs_received=8000
[INFO] __main__: msgs_received=9000
[INFO] __main__: aiokafka_consumer_loop(): Consume

producing to 'test_topic':   0%|          | 0/9178 [00:00<?, ?it/s]

[INFO] __main__: aiokafka_consumer_loop() starting...
[INFO] __main__: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'bootstrap_servers': '127.0.0.1:9092'}
[INFO] __main__: aiokafka_consumer_loop(): Consumer started.
[INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'test_topic'})
[INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'test_topic'}
[INFO] __main__: aiokafka_consumer_loop(): Consumer subscribed.
[INFO] aiokafka.consumer.group_coordinator: Metadata for topic has changed from {} to {'test_topic': 1}. 
[INFO] __main__: msgs_received=1000
[INFO] __main__: msgs_received=2000
[INFO] __main__: msgs_received=3000
[INFO] __main__: msgs_received=4000
[INFO] __main__: msgs_received=5000
[INFO] __main__: msgs_received=6000
[INFO] __main__: msgs_received=7000
[INFO] __main__: msgs_received=8000
[INFO] __main__: msgs_received=9000
[INFO] __main__: aiokafka_consumer_loop(): Consume

In [None]:
for executor in ["DynamicTaskExecutor", "SequentialExecutor"]:
    topic = "test_topic"
    msgs_sent = 500_00
    msgs = [
        MyMessage(url="http://www.ai.com", port=port).json().encode("utf-8")
        for port in range(msgs_sent)
    ]


    async def count_msg(msg: MyMessage):
        pbar.update(1)


    def _is_shutting_down_f():
        return pbar.n >= pbar.total


    async with ApacheKafkaBroker(topics=[topic], listener_port=11992) as bootstrap_server:
        await produce_messages(topic=topic, bootstrap_servers=bootstrap_server, msgs=msgs)
        with tqdm(total=msgs_sent, desc="consuming messages") as _pbar:
            global pbar
            pbar = _pbar

            start = datetime.now()
            await aiokafka_consumer_loop(
                topic=topic,
                decoder_fn=json_decoder,
                auto_offset_reset="earliest",
                callback=count_msg,
                msg_type=MyMessage,
                is_shutting_down_f=_is_shutting_down_f,
                bootstrap_servers=bootstrap_server,
                executor=executor
            )
            t = (datetime.now() - start) / timedelta(seconds=1)
            thrp = pbar.n / t

            print(f"Messages processed: {pbar.n:,d}")
            print(f"Time              : {t:.2f} s")
            print(f"Throughput.       : {thrp:,.0f} msg/s")

[INFO] fastkafka._components.test_dependencies: Java is already installed.
[INFO] fastkafka._components.test_dependencies: Kafka is installed.
[INFO] fastkafka._testing.apache_kafka_broker: Starting zookeeper...
stdout=, stderr=, returncode=1
[INFO] fastkafka._testing.apache_kafka_broker: zookeeper startup failed, generating a new port and retrying...
[INFO] fastkafka._testing.apache_kafka_broker: zookeeper new port=57819
[INFO] fastkafka._testing.apache_kafka_broker: Starting kafka...
[INFO] fastkafka._testing.apache_kafka_broker: Local Kafka broker up and running on 127.0.0.1:9092


producing to 'test_topic':   0%|          | 0/50000 [00:00<?, ?it/s]

consuming messages:   0%|          | 0/50000 [00:00<?, ?it/s]

[INFO] __main__: aiokafka_consumer_loop() starting...
[INFO] __main__: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'bootstrap_servers': '127.0.0.1:9092'}
[INFO] __main__: aiokafka_consumer_loop(): Consumer started.
[INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'test_topic'})
[INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'test_topic'}
[INFO] __main__: aiokafka_consumer_loop(): Consumer subscribed.
[INFO] aiokafka.consumer.group_coordinator: Metadata for topic has changed from {} to {'test_topic': 1}. 
[INFO] __main__: aiokafka_consumer_loop(): Consumer stopped.
[INFO] __main__: aiokafka_consumer_loop() finished.
Messages processed: 50,000
Time              : 3.07 s
Throughput.       : 16,284 msg/s
[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 181391...
[INFO] fastkafka._components._subprocess: terminate_asyncio_process()

producing to 'test_topic':   0%|          | 0/50000 [00:00<?, ?it/s]

consuming messages:   0%|          | 0/50000 [00:00<?, ?it/s]

[INFO] __main__: aiokafka_consumer_loop() starting...
[INFO] __main__: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'bootstrap_servers': '127.0.0.1:9092'}
[INFO] __main__: aiokafka_consumer_loop(): Consumer started.
[INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'test_topic'})
[INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'test_topic'}
[INFO] __main__: aiokafka_consumer_loop(): Consumer subscribed.
[INFO] aiokafka.consumer.group_coordinator: Metadata for topic has changed from {} to {'test_topic': 1}. 
[INFO] __main__: aiokafka_consumer_loop(): Consumer stopped.
[INFO] __main__: aiokafka_consumer_loop() finished.
Messages processed: 50,000
Time              : 1.97 s
Throughput.       : 25,418 msg/s
[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 183329...
[INFO] fastkafka._components._subprocess: terminate_asyncio_process()

In [None]:
# Test with avro_decoder

for executor in ["DynamicTaskExecutor", "SequentialExecutor"]:
    topic = "test_topic"
    msgs_sent = 500_00
    msgs = [
        avro_encoder(MyMessage(url="http://www.ai.com", port=port))
        for port in range(msgs_sent)
    ]


    async def count_msg(msg: MyMessage):
        pbar.update(1)


    def _is_shutting_down_f():
        return pbar.n >= pbar.total


    async with ApacheKafkaBroker(topics=[topic], listener_port=11992) as bootstrap_server:
        await produce_messages(topic=topic, bootstrap_servers=bootstrap_server, msgs=msgs)
        with tqdm(total=msgs_sent, desc="consuming messages") as _pbar:
            global pbar
            pbar = _pbar

            start = datetime.now()
            await aiokafka_consumer_loop(
                topic=topic,
                decoder_fn=avro_decoder,
                auto_offset_reset="earliest",
                callback=count_msg,
                msg_type=MyMessage,
                is_shutting_down_f=_is_shutting_down_f,
                bootstrap_servers=bootstrap_server,
                executor=executor
            )
            t = (datetime.now() - start) / timedelta(seconds=1)
            thrp = pbar.n / t

            print(f"Messages processed: {pbar.n:,d}")
            print(f"Time              : {t:.2f} s")
            print(f"Throughput.       : {thrp:,.0f} msg/s")

[INFO] fastkafka._components.test_dependencies: Java is already installed.
[INFO] fastkafka._components.test_dependencies: Kafka is installed.
[INFO] fastkafka._testing.apache_kafka_broker: Starting zookeeper...
stdout=, stderr=, returncode=1
[INFO] fastkafka._testing.apache_kafka_broker: zookeeper startup failed, generating a new port and retrying...
[INFO] fastkafka._testing.apache_kafka_broker: zookeeper new port=41935
[INFO] fastkafka._testing.apache_kafka_broker: Starting kafka...
[INFO] fastkafka._testing.apache_kafka_broker: Local Kafka broker up and running on 127.0.0.1:9092


producing to 'test_topic':   0%|          | 0/50000 [00:00<?, ?it/s]

consuming messages:   0%|          | 0/50000 [00:00<?, ?it/s]

[INFO] __main__: aiokafka_consumer_loop() starting...
[INFO] __main__: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'bootstrap_servers': '127.0.0.1:9092'}
[INFO] __main__: aiokafka_consumer_loop(): Consumer started.
[INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'test_topic'})
[INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'test_topic'}
[INFO] __main__: aiokafka_consumer_loop(): Consumer subscribed.
[INFO] aiokafka.consumer.group_coordinator: Metadata for topic has changed from {} to {'test_topic': 1}. 
[INFO] __main__: aiokafka_consumer_loop(): Consumer stopped.
[INFO] __main__: aiokafka_consumer_loop() finished.
Messages processed: 50,000
Time              : 4.37 s
Throughput.       : 11,443 msg/s
[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 184940...
[INFO] fastkafka._components._subprocess: terminate_asyncio_process()

producing to 'test_topic':   0%|          | 0/50000 [00:00<?, ?it/s]

consuming messages:   0%|          | 0/50000 [00:00<?, ?it/s]

[INFO] __main__: aiokafka_consumer_loop() starting...
[INFO] __main__: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'bootstrap_servers': '127.0.0.1:9092'}
[INFO] __main__: aiokafka_consumer_loop(): Consumer started.
[INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'test_topic'})
[INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'test_topic'}
[INFO] __main__: aiokafka_consumer_loop(): Consumer subscribed.
[INFO] aiokafka.consumer.group_coordinator: Metadata for topic has changed from {} to {'test_topic': 1}. 
[INFO] __main__: aiokafka_consumer_loop(): Consumer stopped.
[INFO] __main__: aiokafka_consumer_loop() finished.
Messages processed: 50,000
Time              : 2.94 s
Throughput.       : 17,030 msg/s
[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 186550...
[INFO] fastkafka._components._subprocess: terminate_asyncio_process()

## Consumer loop benchmark and coroutine sanity check

In [None]:
# | notest

topic = "test_topic"
msgs_sent = 500_00
msgs = [
    MyMessage(url="http://www.ai.com", port=port).json().encode("utf-8")
    for port in range(msgs_sent)
]


async def count_msg(msg: MyMessage):
    pbar.update(1)
    await asyncio.sleep(1)
    pbar.update(1)

def _is_shutting_down_f():
    return pbar.n >= pbar.total


async with ApacheKafkaBroker(topics=[topic], listener_port=11992) as bootstrap_server:
    await produce_messages(topic=topic, bootstrap_servers=bootstrap_server, msgs=msgs)
    with tqdm(total=msgs_sent*2, desc="consuming messages") as _pbar:
        global pbar
        pbar = _pbar

        start = datetime.now()
        await aiokafka_consumer_loop(
            topic=topic,
            decoder_fn=json_decoder,
            auto_offset_reset="earliest",
            callback=count_msg,
            msg_type=MyMessage,
            is_shutting_down_f=_is_shutting_down_f,
            bootstrap_servers=bootstrap_server,
            executor = "DynamicTaskExecutor"
        )
        t = (datetime.now() - start) / timedelta(seconds=1)
        thrp = pbar.n / t

        print(f"Messages processed: {pbar.n:,d}")
        print(f"Time              : {t:.2f} s")
        print(f"Throughput.       : {thrp:,.0f} msg/s")
        
assert t < 15

[INFO] fastkafka._components.test_dependencies: Java is already installed.
[INFO] fastkafka._components.test_dependencies: Kafka is installed.
[INFO] fastkafka._testing.apache_kafka_broker: Starting zookeeper...
stdout=, stderr=, returncode=1
[INFO] fastkafka._testing.apache_kafka_broker: zookeeper startup failed, generating a new port and retrying...
[INFO] fastkafka._testing.apache_kafka_broker: zookeeper new port=41393
[INFO] fastkafka._testing.apache_kafka_broker: Starting kafka...
[INFO] fastkafka._testing.apache_kafka_broker: Local Kafka broker up and running on 127.0.0.1:9092


producing to 'test_topic':   0%|          | 0/50000 [00:00<?, ?it/s]

consuming messages:   0%|          | 0/100000 [00:00<?, ?it/s]

[INFO] __main__: aiokafka_consumer_loop() starting...
[INFO] __main__: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'bootstrap_servers': '127.0.0.1:9092'}
[INFO] __main__: aiokafka_consumer_loop(): Consumer started.
[INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'test_topic'})
[INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'test_topic'}
[INFO] __main__: aiokafka_consumer_loop(): Consumer subscribed.
[INFO] aiokafka.consumer.group_coordinator: Metadata for topic has changed from {} to {'test_topic': 1}. 
[INFO] __main__: aiokafka_consumer_loop(): Consumer stopped.
[INFO] __main__: aiokafka_consumer_loop() finished.
Messages processed: 100,000
Time              : 6.24 s
Throughput.       : 16,031 msg/s
[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 188160...
[INFO] fastkafka._components._subprocess: terminate_asyncio_process(