In [None]:
# | default_exp _application.app

In [None]:
# | export

import asyncio
import functools
import inspect
import json
import types
from asyncio import iscoroutinefunction  # do not use the version from inspect
from collections import namedtuple
from contextlib import asynccontextmanager
from datetime import datetime, timedelta
from functools import wraps
from inspect import signature
from pathlib import Path
from typing import *
from unittest.mock import AsyncMock, MagicMock

import anyio
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from pydantic import BaseModel
from pydantic.main import ModelMetaclass

from fastkafka._components.aiokafka_consumer_loop import (
    aiokafka_consumer_loop,
    sanitize_kafka_config,
)
from fastkafka._components.asyncapi import (
    ConsumeCallable,
    ContactInfo,
    KafkaBroker,
    KafkaBrokers,
    KafkaServiceInfo,
    export_async_spec,
)
from fastkafka._components.benchmarking import _benchmark
from fastkafka._components.logger import get_logger
from fastkafka._components.meta import delegates, export, filter_using_signature, patch
from fastkafka._components.producer_decorator import ProduceCallable, producer_decorator
from fastkafka._components.task_streaming import StreamExecutor
from fastkafka._components.helpers import remove_suffix

In [None]:
# | export

if TYPE_CHECKING:
    from fastapi import FastAPI

In [None]:
import os
import shutil
import unittest.mock
from contextlib import asynccontextmanager

import asyncer
import pytest
import requests
import uvicorn
import yaml
from fastapi import FastAPI
from pydantic import EmailStr, Field, HttpUrl

from fastkafka._components.helpers import true_after
from fastkafka._components.logger import suppress_timestamps
from fastkafka._server import run_in_process
from fastkafka.encoder import avro_decoder, avro_encoder, json_decoder, json_encoder
from fastkafka.testing import ApacheKafkaBroker, Tester, mock_AIOKafkaProducer_send

In [None]:
# | export

logger = get_logger(__name__)

In [None]:
# Notebook-only logger setup: this cell is not exported, so it rebinds `logger`
# for the test cells below only.
suppress_timestamps()
# level=20 corresponds to logging.INFO in the standard library
logger = get_logger(__name__, level=20)
logger.info("ok")

[INFO] __main__: ok


In [None]:
# | notest

# allows async calls in notebooks

import nest_asyncio

In [None]:
# | notest

nest_asyncio.apply()

### Constructor utilities

In [None]:
# | export


@delegates(AIOKafkaConsumer, but=["bootstrap_servers_id"])
@delegates(AIOKafkaProducer, but=["bootstrap_servers_id"], keep=True)
def _get_kafka_config(
    **kwargs: Any,
) -> Dict[str, Any]:
    """Validate Kafka keyword arguments and fill in missing defaults.

    Args:
        **kwargs: configuration entries. Every key must be a parameter name
            reported by `inspect.signature` for this function (presumably the
            signature is extended by the `delegates` decorators above - confirm).

    Returns:
        A copy of `kwargs` where `bootstrap_servers_id`, `auto_offset_reset`
        and `max_poll_records` are added with default values when not passed.

    Raises:
        ValueError: if any key is not part of the function signature
    """
    accepted_names = set(signature(_get_kafka_config).parameters)
    rejected_names = set(kwargs) - accepted_names
    if rejected_names:
        unallowed_keys = ", ".join(sorted(f"'{name}'" for name in rejected_names))
        raise ValueError(f"Unallowed key arguments passed: {unallowed_keys}")

    # todo: check these values
    fallback_values = {
        "bootstrap_servers_id": "localhost",
        "auto_offset_reset": "earliest",
        "max_poll_records": 100,
    }

    # explicitly passed entries win; defaults are only appended when absent
    config = dict(kwargs)
    for name, fallback in fallback_values.items():
        config.setdefault(name, fallback)
    return config

In [None]:
# Without arguments only the three built-in defaults are returned
assert _get_kafka_config() == {
    "bootstrap_servers_id": "localhost",
    "auto_offset_reset": "earliest",
    "max_poll_records": 100,
}

# An explicitly passed value takes precedence over its default
assert _get_kafka_config(max_poll_records=1_000) == {
    "bootstrap_servers_id": "localhost",
    "auto_offset_reset": "earliest",
    "max_poll_records": 1_000,
}

In [None]:
# Unknown keys are rejected; the message lists the offending names quoted and sorted
with pytest.raises(ValueError) as e:
    _get_kafka_config(random_key=1_000, whatever="whocares")
assert e.value.args == ("Unallowed key arguments passed: 'random_key', 'whatever'",)

In [None]:
# | export


def _get_kafka_brokers(
    kafka_brokers: Optional[Union[Dict[str, Any], KafkaBrokers]] = None
) -> KafkaBrokers:
    """Normalize a broker description into a `KafkaBrokers` object.

    Args:
        kafka_brokers: one of
            - None: a single "localhost" broker (url "https://localhost",
              port "9092") is created
            - a `KafkaBrokers` instance: returned unchanged
            - a dictionary mapping a broker name to a broker description or to
              a list of broker descriptions. A description exposing a `json`
              attribute is serialized with it, anything else with `json.dumps`,
              and the result is parsed with `KafkaBroker.parse_raw`

    Returns:
        The resulting `KafkaBrokers` object
    """
    if isinstance(kafka_brokers, KafkaBrokers):
        return kafka_brokers

    if kafka_brokers is None:
        return KafkaBrokers(
            brokers={
                "localhost": KafkaBroker(  # type: ignore
                    url="https://localhost",
                    description="Local (dev) Kafka broker",
                    port="9092",
                    grouping="localhost",
                )
            }
        )

    def _to_broker(spec: Any) -> KafkaBroker:
        # model-like objects serialize themselves, plain data goes through json.dumps
        raw = spec.json() if hasattr(spec, "json") else json.dumps(spec)
        return KafkaBroker.parse_raw(raw)

    parsed: Dict[str, Union[List[KafkaBroker], KafkaBroker]] = {}
    for broker_name, spec in kafka_brokers.items():
        if isinstance(spec, list):
            parsed[broker_name] = [_to_broker(item) for item in spec]
        else:
            parsed[broker_name] = _to_broker(spec)

    return KafkaBrokers(brokers=parsed)

In [None]:
assert (
    _get_kafka_brokers(None).json()
    == '{"brokers": {"localhost": {"url": "https://localhost", "description": "Local (dev) Kafka broker", "protocol": "kafka", "variables": {"port": {"default": "9092"}}}}}'
)

assert (
    _get_kafka_brokers(dict(localhost=dict(url="localhost"))).json()
    == '{"brokers": {"localhost": {"url": "localhost", "description": "Kafka broker", "protocol": "kafka", "variables": {"port": {"default": "9092"}}}}}'
)

assert (
    _get_kafka_brokers(
        dict(localhost=dict(url="localhost"), staging=dict(url="staging.airt.ai"))
    ).json()
    == '{"brokers": {"localhost": {"url": "localhost", "description": "Kafka broker", "protocol": "kafka", "variables": {"port": {"default": "9092"}}}, "staging": {"url": "staging.airt.ai", "description": "Kafka broker", "protocol": "kafka", "variables": {"port": {"default": "9092"}}}}}'
)

assert (
    _get_kafka_brokers(
        dict(
            localhost=[dict(url="localhost123"), dict(url="localhost321")],
            staging=dict(url="staging.airt.ai"),
        )
    ).json()
    == '{"brokers": {"localhost-bootstrap-server-0": {"url": "localhost123", "description": "Kafka broker", "protocol": "kafka", "variables": {"port": {"default": "9092"}}}, "localhost-bootstrap-server-1": {"url": "localhost321", "description": "Kafka broker", "protocol": "kafka", "variables": {"port": {"default": "9092"}}}, "staging": {"url": "staging.airt.ai", "description": "Kafka broker", "protocol": "kafka", "variables": {"port": {"default": "9092"}}}}}'
)

In [None]:
# | export

def _get_broker_addr_list(
    brokers: Union[List[KafkaBroker], KafkaBroker]
) -> Union[str, List[str]]:
    if isinstance(brokers, list):
        return [f"{broker.url}:{broker.port}" for broker in brokers]
    else:
        return f"{brokers.url}:{brokers.port}"

In [None]:
# One broker name mapped to a list of brokers and one mapped to a single broker
brokers_config = _get_kafka_brokers(
    dict(
        localhost=[dict(url="localhost123"), dict(url="localhost321")],
        staging=dict(url="staging.airt.ai"),
    )
)

# A list of brokers yields a list of addresses, a single broker yields one string
assert _get_broker_addr_list(brokers_config.brokers["localhost"]) == ['localhost123:9092', 'localhost321:9092']
assert _get_broker_addr_list(brokers_config.brokers["staging"]) == 'staging.airt.ai:9092'

In [None]:
# | export


def _get_topic_name(
    topic_callable: Union[ConsumeCallable, ProduceCallable], prefix: str = "on_"
) -> str:
    """Get topic name
    Args:
        topic_callable: a function
        prefix: prefix of the name of the function followed by the topic name

    Returns:
        The name of the topic
    """
    topic = topic_callable.__name__
    if not topic.startswith(prefix) or len(topic) <= len(prefix):
        raise ValueError(f"Function name '{topic}' must start with {prefix}")
    topic = topic[len(prefix) :]

    return topic

In [None]:
# Dummy function whose name follows the `<prefix><topic>` convention
def on_topic_name_1():
    pass


# The default prefix "on_" is stripped
assert _get_topic_name(on_topic_name_1) == "topic_name_1"

# A longer custom prefix strips more of the name
assert _get_topic_name(on_topic_name_1, prefix="on_topic_") == "name_1"

In [None]:
# | export


def _get_contact_info(
    name: str = "Author",
    url: str = "https://www.google.com",
    email: str = "noreply@gmail.com",
) -> ContactInfo:
    """Build the `ContactInfo` used in the generated documentation.

    Args:
        name: contact name, a placeholder by default
        url: contact URL, a placeholder by default
        email: contact e-mail address, a placeholder by default

    Returns:
        `ContactInfo` constructed from the three values
    """
    contact_fields = dict(name=name, url=url, email=email)
    return ContactInfo(**contact_fields)  # type: ignore

In [None]:
# Called without arguments, the placeholder contact is produced.
# NOTE(review): `scheme="http"` does not match the https URL; presumably the
# comparison only looks at the URL string - confirm.
assert _get_contact_info() == ContactInfo(
    name="Author",
    url=HttpUrl(url="https://www.google.com", scheme="http"),
    email="noreply@gmail.com",
)

In [None]:
# | exporti

# Type variable bound to pydantic's BaseModel (any BaseModel subclass)
I = TypeVar("I", bound=BaseModel)
# Type variable constrained to exactly BaseModel or Awaitable[BaseModel]
O = TypeVar("O", BaseModel, Awaitable[BaseModel])

# Type variable for arbitrary callables; used in the `FastKafka.benchmark`
# annotation `Callable[[F], F]` so a decorator is typed as returning its input type
F = TypeVar("F", bound=Callable)

In [None]:
# | export


@export("fastkafka")
class FastKafka:
    """Application object holding documentation info, broker definitions and
    the registered consumers, producers and background tasks.

    Instances are async context managers: entering runs the optional lifespan
    context and then `_start()`, exiting runs `_stop()` and then leaves the
    lifespan context.
    """

    @delegates(_get_kafka_config)
    def __init__(
        self,
        *,
        title: Optional[str] = None,
        description: Optional[str] = None,
        version: Optional[str] = None,
        contact: Optional[Dict[str, str]] = None,
        kafka_brokers: Optional[Dict[str, Any]] = None,
        root_path: Optional[Union[Path, str]] = None,
        lifespan: Optional[Callable[["FastKafka"], AsyncContextManager[None]]] = None,
        **kwargs: Any,
    ):
        """Creates FastKafka application

        Args:
            title: optional title for the documentation. If None,
                the title will be set to empty string
            description: optional description for the documentation. If
                None, the description will be set to empty string
            version: optional version for the documentation. If None,
                the version will be set to empty string
            contact: optional contact for the documentation. If None, the
                contact will be set to placeholder values:
                name='Author' url='https://www.google.com' email='noreply@gmail.com'
            kafka_brokers: dictionary describing kafka brokers used for setting
                the bootstrap server when running the application and for
                generating documentation. Defaults to
                    {
                        "localhost": {
                            "url": "localhost",
                            "description": "local kafka broker",
                            "port": "9092",
                        }
                    }
            root_path: path to where documentation will be created. Defaults to
                the current directory. The `asyncapi/docs` and `asyncapi/spec`
                subdirectories are created under it by the constructor
            lifespan: asynccontextmanager that is used for setting lifespan hooks.
                __aenter__ is called before app start and __aexit__ after app stop.
                The lifespan is called when the application is started as an async
                context manager, e.g.:`async with kafka_app...`
            **kwargs: default Kafka configuration, validated and completed by
                `_get_kafka_config` (unknown keys raise ValueError)

        """

        # this is needed for documentation generation
        self._title = title if title is not None else ""
        self._description = description if description is not None else ""
        self._version = version if version is not None else ""
        if contact is not None:
            self._contact_info = _get_contact_info(**contact)
        else:
            self._contact_info = _get_contact_info()

        self._kafka_service_info = KafkaServiceInfo(
            title=self._title,
            version=self._version,
            description=self._description,
            contact=self._contact_info,
        )

        if kafka_brokers is None:
            kafka_brokers = {
                "localhost": {
                    "url": "localhost",
                    "description": "local kafka broker",
                    "port": "9092",
                }
            }

        self._kafka_brokers = _get_kafka_brokers(kafka_brokers)

        # broker overrides collected by the `consumes`/`produces` decorators
        self._override_brokers: List[KafkaBrokers] = []

        self._root_path = Path(".") if root_path is None else Path(root_path)

        # NOTE: constructing the application creates these directories on disk
        self._asyncapi_path = self._root_path / "asyncapi"
        (self._asyncapi_path / "docs").mkdir(exist_ok=True, parents=True)
        (self._asyncapi_path / "spec").mkdir(exist_ok=True, parents=True)

        # this is used as default parameters for creating AIOProducer and AIOConsumer objects
        self._kafka_config = _get_kafka_config(**kwargs)

        # registered consumers, keyed by "<topic>_<n>" (see `_resolve_key`); each
        # entry is (callback, decoder function, executor, override brokers, kwargs)
        self._consumers_store: Dict[
            str,
            Tuple[
                ConsumeCallable,
                Callable[[bytes, ModelMetaclass], Any],
                Union[str, StreamExecutor, None],
                Optional[KafkaBrokers],
                Dict[str, Any],
            ],
        ] = {}

        # registered producers, keyed like the consumers; each entry is
        # (callback, producer object or None, override brokers, kwargs)
        self._producers_store: Dict[  # type: ignore
            str,
            Tuple[
                ProduceCallable,
                AIOKafkaProducer,
                Optional[KafkaBrokers],
                Dict[str, Any],
            ],
        ] = {}

        self._producers_list: List[AIOKafkaProducer] = []  # type: ignore

        self.benchmark_results: Dict[str, Dict[str, Any]] = {}

        # background tasks
        self._scheduled_bg_tasks: List[Callable[..., Coroutine[Any, Any, Any]]] = []
        self._bg_task_group_generator: Optional[anyio.abc.TaskGroup] = None
        self._bg_tasks_group: Optional[anyio.abc.TaskGroup] = None

        # todo: use this for errors
        self._on_error_topic: Optional[str] = None

        self.lifespan = lifespan
        self.lifespan_ctx: Optional[AsyncContextManager[None]] = None

        self._is_started: bool = False
        self._is_shutting_down: bool = False
        self._kafka_consumer_tasks: List[asyncio.Task[Any]] = []
        self._kafka_producer_tasks: List[asyncio.Task[Any]] = []
        self._running_bg_tasks: List[asyncio.Task[Any]] = []
        self.run = False

        # testing functions
        self.AppMocks = None
        self.mocks = None
        self.awaited_mocks = None

    @property
    def is_started(self) -> bool:
        """Property indicating whether the FastKafka object is started.

        The is_started property indicates if the FastKafka object is currently
        in a started state. This implies that all background tasks, producers,
        and consumers have been initiated, and the object is successfully connected
        to the Kafka broker.

        Returns:
            bool: True if the object is started, False otherwise.
        """
        return self._is_started

    def set_kafka_broker(self, kafka_broker_name: str) -> None:
        """
        Sets the Kafka broker to start FastKafka with

        Args:
            kafka_broker_name: The name of the Kafka broker to start FastKafka

        Raises:
            ValueError: If the provided kafka_broker_name is not found in dictionary of kafka_brokers

        Returns:
            None
        """

        if kafka_broker_name not in self._kafka_brokers.brokers:
            raise ValueError(
                f"Given kafka_broker_name '{kafka_broker_name}' is not found in kafka_brokers, available options are {self._kafka_brokers.brokers.keys()}"
            )

        self._kafka_config["bootstrap_servers_id"] = kafka_broker_name

    async def __aenter__(self) -> "FastKafka":
        """Enter the lifespan context (if any) and start the application.

        If starting fails, the already entered lifespan context is exited
        before the exception is re-raised.
        """
        if self.lifespan is not None:
            self.lifespan_ctx = self.lifespan(self)
            await self.lifespan_ctx.__aenter__()
        try:
            await self._start()
        except BaseException as e:
            # `__aexit__` is never invoked when `__aenter__` raises, so the
            # lifespan context has to be unwound here to avoid leaking it
            if self.lifespan_ctx is not None:
                await self.lifespan_ctx.__aexit__(type(e), e, e.__traceback__)
            raise
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc: Optional[BaseException],
        tb: Optional[types.TracebackType],
    ) -> None:
        """Stop the application and leave the lifespan context (if any).

        The lifespan context is exited even when stopping raises.
        """
        try:
            await self._stop()
        finally:
            if self.lifespan_ctx is not None:
                await self.lifespan_ctx.__aexit__(exc_type, exc, tb)

    # The methods below are placeholders that raise NotImplementedError.
    # `consumes` and `produces` are replaced further down in this file through
    # `@patch`; presumably the remaining ones are patched elsewhere - confirm.

    async def _start(self) -> None:
        raise NotImplementedError

    async def _stop(self) -> None:
        raise NotImplementedError

    def consumes(
        self,
        topic: Optional[str] = None,
        decoder: str = "json",
        *,
        prefix: str = "on_",
        brokers: Optional[KafkaBrokers] = None,
        description: Optional[str] = None,
        **kwargs: Dict[str, Any],
    ) -> ConsumeCallable:
        raise NotImplementedError

    def produces(
        self,
        topic: Optional[str] = None,
        encoder: str = "json",
        *,
        prefix: str = "to_",
        brokers: Optional[KafkaBrokers] = None,
        description: Optional[str] = None,
        **kwargs: Dict[str, Any],
    ) -> ProduceCallable:
        raise NotImplementedError

    def benchmark(
        self,
        interval: Union[int, timedelta] = 1,
        *,
        sliding_window_size: Optional[int] = None,
    ) -> Callable[[F], F]:
        raise NotImplementedError

    def run_in_background(
        self,
    ) -> Callable[[], Any]:
        raise NotImplementedError

    def _populate_consumers(
        self,
        is_shutting_down_f: Callable[[], bool],
    ) -> None:
        raise NotImplementedError

    def get_topics(self) -> Iterable[str]:
        raise NotImplementedError

    async def _populate_producers(self) -> None:
        raise NotImplementedError

    async def _populate_bg_tasks(self) -> None:
        raise NotImplementedError

    def create_docs(self) -> None:
        raise NotImplementedError

    def create_mocks(self) -> None:
        raise NotImplementedError

    async def _shutdown_consumers(self) -> None:
        raise NotImplementedError

    async def _shutdown_producers(self) -> None:
        raise NotImplementedError

    async def _shutdown_bg_tasks(self) -> None:
        raise NotImplementedError

In [None]:
assert FastKafka.__module__ == "fastkafka"

In [None]:
# A list of broker descriptions under one name is expected to be stored as a
# list of KafkaBroker objects; the integer ports passed in are expected to
# compare equal to their string form
kafka_app = FastKafka(
    kafka_brokers=dict(
        localhost=[dict(url="localhost", port=9092), dict(url="localhost", port=9093)]
    )
)
assert kafka_app._kafka_brokers == KafkaBrokers(
    brokers={
        "localhost": [
            KafkaBroker(
                url="localhost",
                description="Kafka broker",
                port="9092",
                protocol="kafka",
                security=None,
            ),
            KafkaBroker(
                url="localhost",
                description="Kafka broker",
                port="9093",
                protocol="kafka",
                security=None,
            ),
        ]
    }
)

In [None]:
# Without `kafka_brokers`, the constructor falls back to a single local broker
kafka_app = FastKafka()
assert kafka_app._kafka_brokers == KafkaBrokers(
    brokers={
        "localhost": KafkaBroker(
            url="localhost",
            description="local kafka broker",
            port="9092",
            protocol="kafka",
            security=None,
        )
    }
)

In [None]:
def create_testing_app(
    *, root_path: str = "/tmp/000_FastKafka", bootstrap_servers: Optional[str] = None
):
    """Create a fresh FastKafka application for the tests in this notebook.

    Args:
        root_path: documentation root passed to FastKafka. If the path already
            exists it is removed first, so every call starts from a clean tree
        bootstrap_servers: optional "<host>:<port>" address of the broker. A
            missing host or port falls back to "localhost" / 9092

    Returns:
        FastKafka application with a single "localhost" broker selected
    """
    if Path(root_path).exists():
        shutil.rmtree(root_path)

    host: str = "localhost"
    port: Union[str, int] = 9092
    if bootstrap_servers is not None:
        # split on the last ":" only; unpacking `split(":")` fails when the
        # port is missing or the string contains more than one colon
        if ":" in bootstrap_servers:
            given_host, _, given_port = bootstrap_servers.rpartition(":")
        else:
            given_host, given_port = bootstrap_servers, ""
        host = given_host or host
        port = given_port or port

    kafka_app = FastKafka(
        kafka_brokers={
            "localhost": {
                "url": host,
                "name": "development",
                "description": "Local (dev) Kafka broker",
                "port": port,
            }
        },
        root_path=root_path,
    )
    kafka_app.set_kafka_broker(kafka_broker_name="localhost")

    return kafka_app

In [None]:
app = create_testing_app()
assert Path("/tmp/000_FastKafka").exists()
app

<fastkafka.FastKafka>

In [None]:
# | export


def _get_decoder_fn(decoder: str) -> Callable[[bytes, ModelMetaclass], Any]:
    """
    Imports and returns decoder function based on input
    """
    if decoder == "json":
        from fastkafka._components.encoder.json import json_decoder

        return json_decoder
    elif decoder == "avro":
        try:
            from fastkafka._components.encoder.avro import avro_decoder
        except ModuleNotFoundError:
            raise ModuleNotFoundError(
                "Unable to import avro packages. Please install FastKafka using the command 'fastkafka[avro]'"
            )
        return avro_decoder
    else:
        raise ValueError(f"Unknown decoder - {decoder}")

In [None]:
# The string identifiers resolve to the decoders imported from `fastkafka.encoder`
actual = _get_decoder_fn("json")
assert actual == json_decoder

actual = _get_decoder_fn("avro")
assert actual == avro_decoder

In [None]:
# | export


def _prepare_and_check_brokers(
    app: FastKafka, kafka_brokers: Optional[Union[Dict[str, Any], KafkaBrokers]]
) -> Optional[KafkaBrokers]:
    if kafka_brokers is not None:
        prepared_brokers = _get_kafka_brokers(kafka_brokers)
        if prepared_brokers.brokers.keys() != app._kafka_brokers.brokers.keys():
            raise ValueError(
                f"To override application default brokers, you must define all of the broker options. Default defined: {set(app._kafka_brokers.brokers.keys())}, override defined: {set(prepared_brokers.brokers.keys())}"
            )
        return prepared_brokers
    return None

In [None]:
# | export


def _resolve_key(key: str, dictionary: Dict[str, Any]) -> str:
    i = 0
    resolved_key = f"{key}_{i}"
    while resolved_key in dictionary:
        i += 1
        resolved_key = f"{key}_{i}"
    return resolved_key

In [None]:
# | export


@patch
@delegates(AIOKafkaConsumer)
def consumes(
    self: FastKafka,
    topic: Optional[str] = None,
    decoder: Union[str, Callable[[bytes, ModelMetaclass], Any]] = "json",
    *,
    executor: Union[str, StreamExecutor, None] = None,
    brokers: Optional[Union[Dict[str, Any], KafkaBrokers]] = None,
    prefix: str = "on_",
    description: Optional[str] = None,
    **kwargs: Dict[str, Any],
) -> Callable[[ConsumeCallable], ConsumeCallable]:
    """Decorator registering the callback called when a message is received in a topic.

    The callback is recorded in the application's consumer store, which is also
    the source for the AsyncAPI specification and documentation.

    Args:
        topic: Kafka topic that the consumer will subscribe to and execute the
            decorated function when it receives a message from the topic,
            default: None. If the topic is not specified, the topic name is
            inferred from the decorated function name by stripping the prefix
        decoder: Decoder used for messages consumed from the topic, either the
            name of a built-in decoder ("json" or "avro") or a custom decoder
            function, default: "json"
        executor: Type of executor to choose for consuming tasks. Available options
            are "SequentialExecutor" and "DynamicTaskExecutor". The default option is
            "SequentialExecutor" which will execute the consuming tasks sequentially.
            If the consuming tasks have high latency it is recommended to use
            "DynamicTaskExecutor" which will wrap the consuming functions into tasks
            and run them on the asyncio loop in the background. This comes with a cost
            of increased overhead, so use it only when your consume functions have
            high latency such as database queries or some other type of networking.
        brokers: Optional brokers overriding the application brokers for this
            consumer. The override must define the same broker names as the
            application
        prefix: Prefix stripped from the decorated function name to define the
            topic name if the topic argument is not passed, default: "on_"
        description: Optional description of the consuming function for the async
            docs. When given, it is stored as the `description` attribute of the
            decorated function
        **kwargs: Additional keyword arguments stored together with the consumer
            (presumably forwarded to AIOKafkaConsumer - confirm)

    Returns:
        A decorator that registers the function and returns it unchanged

    Raises:
        ValueError: if the topic name cannot be inferred from the function name,
            the decoder name is unknown, or the override brokers do not match
            the application brokers

    """

    def _decorator(
        on_topic: ConsumeCallable,
        topic: Optional[str] = topic,
        decoder: Union[str, Callable[[bytes, ModelMetaclass], Any]] = decoder,
        executor: Union[str, StreamExecutor, None] = executor,
        brokers: Optional[Union[Dict[str, Any], KafkaBrokers]] = brokers,
        description: Optional[str] = description,
        kwargs: Dict[str, Any] = kwargs,
    ) -> ConsumeCallable:
        # an explicit topic wins over the name derived from the function
        if topic is None:
            topic_resolved: str = _get_topic_name(
                topic_callable=on_topic, prefix=prefix
            )
        else:
            topic_resolved = topic

        # a string selects a built-in decoder, anything else is used as is
        if isinstance(decoder, str):
            decoder_fn = _get_decoder_fn(decoder)
        else:
            decoder_fn = decoder

        prepared_broker = _prepare_and_check_brokers(self, brokers)
        if prepared_broker is not None:
            self._override_brokers.append(prepared_broker.brokers)  # type: ignore

        if description is not None:
            setattr(on_topic, "description", description)

        # several consumers may share a topic, so the store key gets a counter suffix
        store_key = _resolve_key(topic_resolved, self._consumers_store)
        self._consumers_store[store_key] = (
            on_topic,
            decoder_fn,
            executor,
            prepared_broker,
            kwargs,
        )

        # expose the callback as an attribute of the application
        setattr(self, on_topic.__name__, on_topic)
        return on_topic

    return _decorator

In [None]:
app = create_testing_app()


# Basic check
@app.consumes()
def on_my_topic_1(msg: BaseModel) -> None:
    pass


assert app._consumers_store["my_topic_1_0"] == (
    on_my_topic_1,
    json_decoder,
    None,
    None,
    {},
), app._consumers_store

assert hasattr(app, "on_my_topic_1")


# Check executor setting: the executor name passed to the decorator must be
# stored unchanged as the third element of the consumer store entry
@app.consumes(executor="DynamicTaskExecutor")
def on_my_topic_12(msg: BaseModel) -> None:
    pass


# The failure message shows the whole store; the previous message indexed the
# non-existent key "my_topic_12", which would raise KeyError instead of
# reporting the failed assertion
assert app._consumers_store["my_topic_12_0"] == (
    on_my_topic_12,
    json_decoder,
    "DynamicTaskExecutor",
    None,
    {},
), app._consumers_store

assert hasattr(app, "on_my_topic_12")


# Check topic setting
@app.consumes(topic="test_topic_1")
def some_func_name(msg: BaseModel) -> None:
    pass


assert app._consumers_store["test_topic_1_0"] == (
    some_func_name,
    json_decoder,
    None,
    None,
    {},
), app._consumers_store


# Check prefix change
@app.consumes(prefix="for_")
def for_test_topic_3(msg: BaseModel) -> None:
    pass


assert app._consumers_store["test_topic_3_0"] == (
    for_test_topic_3,
    json_decoder,
    None,
    None,
    {},
), app._consumers_store

assert hasattr(app, "for_test_topic_3")

# Check passing of kwargs
kwargs = {"arg1": "val1", "arg2": 2}


@app.consumes(topic="test_topic", **kwargs)
def for_test_kwargs(msg: BaseModel):
    pass


assert app._consumers_store["test_topic_0"] == (
    for_test_kwargs,
    json_decoder,
    None,
    None,
    kwargs,
), app._consumers_store

assert hasattr(app, "for_test_kwargs")

# Check description setting
@app.consumes(description = "Some generic description")
def on_test_topic_description(msg: BaseModel) -> None:
    pass


assert app._consumers_store["test_topic_description_0"][0].description == "Some generic description"

In [None]:
# check broker overriding
@app.consumes(
    brokers=dict(
        localhost=[
            dict(url="localhost", port=9092),
        ]
    ),
)
def on_my_topic_12345(msg: BaseModel) -> None:
    pass


expected = (
    on_my_topic_12345,
    json_decoder,
    None,
    KafkaBrokers(
        brokers={
            "localhost": [
                KafkaBroker(
                    url="localhost",
                    description="Kafka broker",
                    port="9092",
                    protocol="kafka",
                    security=None,
                ),
            ]
        }
    ),
    {},
)

actual = app._consumers_store["my_topic_12345_0"]

assert actual == expected, f"{actual}!={expected}"

assert hasattr(app, "on_my_topic_12345")

with pytest.raises(ValueError) as e:

    @app.consumes(
        brokers=dict(
            not_localhost=[
                dict(url="localhost", port=9092),
                dict(url="localhost", port=9093),
            ]
        ),
    )
    def on_my_topic_12345(msg: BaseModel) -> None:
        pass


assert (
    e.value.args[0]
    == "To override application default brokers, you must define all of the broker options. Default defined: {'localhost'}, override defined: {'not_localhost'}"
)

In [None]:
# | export


def _get_encoder_fn(encoder: str) -> Callable[[BaseModel], bytes]:
    """
    Imports and returns encoder function based on input
    """
    if encoder == "json":
        from fastkafka._components.encoder.json import json_encoder

        return json_encoder
    elif encoder == "avro":
        try:
            from fastkafka._components.encoder.avro import avro_encoder
        except ModuleNotFoundError:
            raise ModuleNotFoundError(
                "Unable to import avro packages. Please install FastKafka using the command 'fastkafka[avro]'"
            )
        return avro_encoder
    else:
        raise ValueError(f"Unknown encoder - {encoder}")

In [None]:
# The string identifiers resolve to the encoders imported from `fastkafka.encoder`
actual = _get_encoder_fn("json")
assert actual == json_encoder

actual = _get_encoder_fn("avro")
assert actual == avro_encoder

In [None]:
# | export


@patch
@delegates(AIOKafkaProducer)
def produces(
    self: FastKafka,
    topic: Optional[str] = None,
    encoder: Union[str, Callable[[BaseModel], bytes]] = "json",
    *,
    prefix: str = "to_",
    brokers: Optional[Union[Dict[str, Any], KafkaBrokers]] = None,
    description: Optional[str] = None,
    **kwargs: Dict[str, Any],
) -> Callable[[ProduceCallable], ProduceCallable]:
    """Decorator registering the decorated function as a producer for a topic.

    The function is recorded in the application's producer store, which is also
    the source for the AsyncAPI specification and documentation, and is wrapped
    with `producer_decorator`.

    Args:
        topic: Kafka topic that the producer will send returned values from
            the decorated function to, default: None - If the topic is not
            specified, topic name will be inferred from the decorated function
            name by stripping the defined prefix.
        encoder: Encoder to use to encode messages before sending it to topic,
            default: json - By default, it uses json encoder to convert
            pydantic basemodel to json string and then encodes the string to bytes
            using 'utf-8' encoding. It also accepts custom encoder function.
        prefix: Prefix stripped from the decorated function to define a topic
            name if the topic argument is not passed, default: "to_". If the
            decorated function name is not prefixed with the defined prefix
            and topic argument is not passed, then this method will throw ValueError
        brokers: Optional brokers overriding the application brokers for this
            producer. The override must define the same broker names as the
            application.
        description: Optional description of the producing function async docs.
            If not provided, producing function __doc__ attr will be used.
        **kwargs: Additional keyword arguments stored together with the producer
            (presumably forwarded to AIOKafkaProducer - confirm)

    Returns:
        A decorator that registers the function and returns the wrapped version

    Raises:
        ValueError: if the topic name cannot be inferred from the function name,
            the encoder name is unknown, or the override brokers do not match
            the application brokers
    """

    def _decorator(
        to_topic: ProduceCallable,
        topic: Optional[str] = topic,
        brokers: Optional[Union[Dict[str, Any], KafkaBrokers]] = brokers,
        description: Optional[str] = description,
        kwargs: Dict[str, Any] = kwargs,
    ) -> ProduceCallable:
        topic_resolved: str = (
            _get_topic_name(topic_callable=to_topic, prefix=prefix)
            if topic is None
            else topic
        )

        # Resolve everything that can raise before touching application state,
        # so an invalid broker override or encoder name does not leave a
        # half-registered producer behind (mirrors the order used in `consumes`)
        prepared_broker = _prepare_and_check_brokers(self, brokers)
        encoder_fn = _get_encoder_fn(encoder) if isinstance(encoder, str) else encoder

        # several producers may share a topic, so the store key gets a counter suffix
        topic_key = _resolve_key(topic_resolved, self._producers_store)

        if prepared_broker is not None:
            self._override_brokers.append(prepared_broker.brokers)  # type: ignore

        if description is not None:
            setattr(to_topic, "description", description)

        # the producer slot starts as None; nothing in this function fills it
        self._producers_store[topic_key] = (
            to_topic,
            None,
            prepared_broker,
            kwargs,
        )
        decorated = producer_decorator(
            self._producers_store,
            to_topic,
            topic_key,
            encoder_fn=encoder_fn,
        )

        # expose the wrapped function as an attribute of the application
        setattr(self, to_topic.__name__, decorated)
        return decorated

    return _decorator

In [None]:
app = create_testing_app()


# Basic check: with no arguments the topic comes from the function name
async def to_my_topic_1(msg: BaseModel) -> None:
    pass


# Must be done without sugar to keep the original function reference
check_func = to_my_topic_1
to_my_topic_1 = app.produces()(to_my_topic_1)

assert app._producers_store["my_topic_1_0"] == (
    check_func,
    None,
    None,
    {},
), f"{app._producers_store}, {to_my_topic_1}"

assert hasattr(app, "to_my_topic_1")


# Check topic setting
async def some_func_name(msg: BaseModel) -> None:
    pass


check_func = some_func_name
some_func_name = app.produces(topic="test_topic_2")(some_func_name)

assert app._producers_store["test_topic_2_0"] == (
    check_func,
    None,
    None,
    {},
), app._producers_store

assert hasattr(app, "some_func_name")


# Check prefix change
async def for_test_topic_3(msg: BaseModel) -> None:
    pass


check_func = for_test_topic_3
# Bind the decorated producer to its own name (it used to overwrite `some_func_name`)
for_test_topic_3 = app.produces(prefix="for_")(for_test_topic_3)

assert app._producers_store["test_topic_3_0"] == (
    check_func,
    None,
    None,
    {},
), app._producers_store

# This assertion belongs to the prefix check, so it lives here rather than
# below the kwargs setup.
assert hasattr(app, "for_test_topic_3")


# Check passing of kwargs
kwargs = {"arg1": "val1", "arg2": 2}


async def for_test_kwargs(msg: BaseModel):
    pass


check_func = for_test_kwargs
for_test_kwargs = app.produces(topic="test_topic_0", **kwargs)(for_test_kwargs)

assert app._producers_store["test_topic_0_0"] == (
    check_func,
    None,
    None,
    kwargs,
), app._producers_store

assert hasattr(app, "for_test_kwargs")


# Check description setting
async def to_test_topic_description(msg: BaseModel) -> None:
    pass


to_test_topic_description = app.produces(description="Some generic producer")(
    to_test_topic_description
)

# The description is attached as an attribute of the stored (original) function
assert (
    app._producers_store["test_topic_description_0"][0].description
    == "Some generic producer"
)

In [None]:
# NOTE: this cell reuses `app` created in the previous test cell.
async def to_test_topic_broker_override(msg: BaseModel) -> None:
    pass


# Override using the same broker name as the app default ("localhost"):
# the override is accepted and stored as a KafkaBrokers object.
check_func = to_test_topic_broker_override
some_func_name = app.produces(
    brokers=dict(
        localhost=[
            dict(url="localhost", port=9092),
        ]
    ),
)(to_test_topic_broker_override)

# The expected entry carries port="9092" although the override was given the
# int 9092, and the description/protocol fields hold values that were not
# passed in above.
assert app._producers_store["test_topic_broker_override_0"] == (
    check_func,
    None,
    KafkaBrokers(
        brokers={
            "localhost": [
                KafkaBroker(
                    url="localhost",
                    description="Kafka broker",
                    port="9092",
                    protocol="kafka",
                    security=None,
                ),
            ]
        }
    ),
    {},
), app._producers_store


async def to_test_topic_broker_wrong(msg: BaseModel) -> None:
    pass


# Override whose broker names do not match the app defaults must be rejected.
with pytest.raises(ValueError) as e:
    check_func = to_test_topic_broker_wrong
    some_func_name = app.produces(
        brokers=dict(
            not_localhost=[
                dict(url="localhost", port=9092),
                dict(url="localhost", port=9093),
            ]
        ),
    )(to_test_topic_broker_wrong)

# The message lists both the default and the offending override names.
assert (
    e.value.args[0]
    == "To override application default brokers, you must define all of the broker options. Default defined: {'localhost'}, override defined: {'not_localhost'}"
)

In [None]:
# | export


@patch
def get_topics(self: FastKafka) -> Iterable[str]:
    """
    Collect the names of all topics this app produces to or consumes from.

    Every key of the producer and consumer stores is passed through
    `remove_suffix`; the results are merged into one set, so a topic that
    is registered several times appears only once.

    Returns:
        A set with the union of the consumed and produced topic names.
    """
    produced = {remove_suffix(store_key) for store_key in self._producers_store}
    consumed = {remove_suffix(store_key) for store_key in self._consumers_store}
    return consumed | produced

In [None]:
app = create_testing_app()


# One producer and one consumer, both relying on the default name prefixes
@app.produces()
async def to_topic_1() -> BaseModel:
    pass


@app.consumes()
def on_topic_2(msg: BaseModel):
    pass


assert app.get_topics() == {"topic_1", "topic_2"}, f"{app.get_topics()=}"

In [None]:
# | export


@patch
def run_in_background(
    self: FastKafka,
) -> Callable[
    [Callable[..., Coroutine[Any, Any, Any]]], Callable[..., Coroutine[Any, Any, Any]]
]:
    """
    Build a decorator that registers a coroutine function as a background task.

    The decorated function is only recorded in `_scheduled_bg_tasks`; nothing
    is started by this call.

    Returns:
        A decorator that stores the given background task and hands the very
        same function back to the caller.
    """

    def _register(
        bg_task: Callable[..., Coroutine[Any, Any, Any]]
    ) -> Callable[..., Coroutine[Any, Any, Any]]:
        """
        Record `bg_task` in the list of scheduled background tasks.

        Args:
            bg_task: The background task to schedule.

        Returns:
            The unchanged `bg_task`.
        """
        message = f"run_in_background() : Adding function '{bg_task.__name__}' as background task"
        logger.info(message)

        self._scheduled_bg_tasks.append(bg_task)
        return bg_task

    return _register

In [None]:
# Check if the background job is getting registered

app = create_testing_app()


@app.run_in_background()
async def async_background_job():
    """Async background job"""
    pass


# Check the count first so a missing registration fails as an assertion
# instead of an IndexError on the lookup below.
assert len(app._scheduled_bg_tasks) == 1, app._scheduled_bg_tasks
assert app._scheduled_bg_tasks[0] == async_background_job, app._scheduled_bg_tasks[0]

[INFO] __main__: run_in_background() : Adding function 'async_background_job' as background task


In [None]:
# Message models shared by the test cells below. Their schemas are what the
# `expected` asyncapi spec later in this notebook describes.
# NOTE(review): comments are used instead of class docstrings on purpose -
# pydantic presumably copies a model docstring into the schema `description`,
# which would change the generated spec; confirm before adding docstrings.


# Innermost payload: contact details of a person.
class MyInfo(BaseModel):
    mobile: str = Field(..., example="+385987654321")
    name: str = Field(..., example="James Bond")


# A MyInfo payload together with a URL.
class MyMsgUrl(BaseModel):
    info: MyInfo = Field(..., example=dict(mobile="+385987654321", name="James Bond"))
    url: HttpUrl = Field(..., example="https://sis.gov.uk/agents/007")


# A MyMsgUrl payload together with an e-mail address.
class MyMsgEmail(BaseModel):
    msg_url: MyMsgUrl = Field(
        ...,
        example=dict(
            info=dict(mobile="+385987654321", name="James Bond"),
            url="https://sis.gov.uk/agents/007",
        ),
    )
    email: EmailStr = Field(..., example="agent-007@sis.gov.uk")


def setup_testing_app(bootstrap_servers=None, override_bootstrap_servers=None):
    """
    Create a test app and register a fixed set of consumers, producers and a background job.

    Registered handlers:
        * two consumers on "my_topic_1", the second one with a broker override;
        * one producer for "my_topic_3";
        * two producers for "my_topic_4", the second one with a broker override;
        * one long-running background job.

    Args:
        bootstrap_servers: Forwarded to `create_testing_app`.
        override_bootstrap_servers: Optional "host:port" string describing the
            override broker. When omitted, "localhost" and 9092 are used.

    Returns:
        The configured app.
    """
    app = create_testing_app(bootstrap_servers=bootstrap_servers)

    host, port = None, None
    if override_bootstrap_servers is not None:
        host, port = override_bootstrap_servers.split(":")

    override_broker = {
        "localhost": {
            "url": host if host is not None else "localhost",
            "name": "development",
            "description": "Local (dev) Kafka broker",
            "port": port if port is not None else 9092,
        }
    }

    # NOTE: the handlers below deliberately have no docstrings. Per the
    # `produces` documentation a function's __doc__ is used as its description
    # when `description=` is not passed, and the `expected` spec in this
    # notebook only has descriptions where `description=` is given.

    @app.consumes("my_topic_1", description="Consumer description")
    def on_my_topic_one(msg: MyMsgUrl) -> None:
        logger.debug(f"on_my_topic_one(msg={msg},)")

    @app.consumes(topic="my_topic_1", brokers=override_broker)
    async def on_my_topic_1(msg: MyMsgEmail) -> None:
        logger.debug(f"on_my_topic_1(msg={msg},)")

    # The decorator itself is expected to raise here, so the function body is
    # never executed.
    with pytest.raises(ValueError):

        @app.consumes()
        def my_topic_3(msg: MyMsgEmail) -> None:
            # NotImplemented is a comparison sentinel, not an exception
            raise NotImplementedError

    @app.produces(description="Producer description")
    async def to_my_topic_3(url: str) -> MyMsgUrl:
        logger.debug(f"to_my_topic_3(url={url})")
        # pydantic models only accept keyword arguments
        return MyMsgUrl(
            info=MyInfo(mobile="+3851987654321", name="Sean Connery"), url=url
        )

    @app.produces()
    async def to_my_topic_4(msg: MyMsgEmail) -> MyMsgEmail:
        logger.debug(f"to_my_topic_4(msg={msg})")
        return msg

    @app.produces(topic="my_topic_4", brokers=override_broker)
    async def to_my_topic_4_2(url: str) -> MyMsgUrl:
        logger.debug(f"to_my_topic_4_2(url={url})")
        return MyMsgUrl(
            info=MyInfo(mobile="+3859123456789", name="John Wayne"), url=url
        )

    @app.run_in_background()
    async def long_bg_job():
        logger.debug("long_bg_job()")
        await asyncio.sleep(100)

    return app

In [None]:
app = setup_testing_app()

# Handlers registered for the same topic get distinct store keys
expected_consumer_keys = {"my_topic_1_0", "my_topic_1_1"}
expected_producer_keys = {"my_topic_3_0", "my_topic_4_0", "my_topic_4_1"}

assert set(app._consumers_store.keys()) == expected_consumer_keys
assert set(app._producers_store.keys()) == expected_producer_keys

print(f"app._kafka_service_info={app._kafka_service_info}")
print(f"app._kafka_brokers={app._kafka_brokers}")

[INFO] __main__: run_in_background() : Adding function 'long_bg_job' as background task
app._kafka_service_info=title='' version='' description='' contact=ContactInfo(name='Author', url=HttpUrl('https://www.google.com', ), email='noreply@gmail.com')
app._kafka_brokers=brokers={'localhost': KafkaBroker(url='localhost', description='Local (dev) Kafka broker', port='9092', protocol='kafka', security=None)}


In [None]:
# | export


@patch
def _populate_consumers(
    self: FastKafka,
    is_shutting_down_f: Callable[[], bool],
) -> None:
    """
    Create one asyncio task running `aiokafka_consumer_loop` per registered consumer.

    The created tasks are stored in `self._kafka_consumer_tasks`.

    Args:
        is_shutting_down_f: Callable forwarded to every consumer loop.
    """
    default_config: Dict[str, Any] = filter_using_signature(
        AIOKafkaConsumer, **self._kafka_config
    )
    bootstrap_server = self._kafka_config["bootstrap_servers_id"]

    consumer_tasks = []
    for store_key, store_entry in self._consumers_store.items():
        consumer, decoder_fn, executor, kafka_brokers, override_config = store_entry

        # The store key is the topic name plus one extra "_"-separated
        # component; drop that last component to get the topic back.
        topic_name = "_".join(store_key.split("_")[:-1])
        msg_type = signature(consumer).parameters["msg"].annotation

        # Consumer-specific brokers, when given, replace the app-level ones
        if kafka_brokers is not None:
            selected_broker = kafka_brokers.brokers[bootstrap_server]
        else:
            selected_broker = self._kafka_brokers.brokers[bootstrap_server]

        # Later entries win: defaults < consumer overrides < bootstrap servers
        loop_config = {
            **default_config,
            **override_config,
            "bootstrap_servers": _get_broker_addr_list(selected_broker),
        }

        consumer_loop = aiokafka_consumer_loop(
            topic=topic_name,
            decoder_fn=decoder_fn,
            callback=consumer,
            msg_type=msg_type,
            is_shutting_down_f=is_shutting_down_f,
            executor=executor,
            **loop_config,
        )
        consumer_tasks.append(asyncio.create_task(consumer_loop))

    self._kafka_consumer_tasks = consumer_tasks


@patch
async def _shutdown_consumers(
    self: FastKafka,
) -> None:
    """Wait until every task in `self._kafka_consumer_tasks` has finished."""
    consumer_tasks = self._kafka_consumer_tasks
    if not consumer_tasks:
        # asyncio.wait() rejects an empty collection, so there is nothing to do
        return
    await asyncio.wait(consumer_tasks)

In [None]:
async with ApacheKafkaBroker() as bootstrap_server:
    async with ApacheKafkaBroker() as override_bootstrap_server:
        app = setup_testing_app(
            bootstrap_servers=bootstrap_server,
            override_bootstrap_servers=override_bootstrap_server,
        )
        app._populate_consumers(is_shutting_down_f=true_after(1))
        assert len(app._kafka_consumer_tasks) == 2

        await app._shutdown_consumers()

        assert all([t.done() for t in app._kafka_consumer_tasks])

[INFO] fastkafka._components.test_dependencies: Java is already installed.
[INFO] fastkafka._components.test_dependencies: Kafka is installed.
[INFO] fastkafka._testing.apache_kafka_broker: Starting zookeeper...
[INFO] fastkafka._testing.apache_kafka_broker: Starting kafka...
[INFO] fastkafka._testing.apache_kafka_broker: Local Kafka broker up and running on 127.0.0.1:9092
[INFO] fastkafka._components.test_dependencies: Java is already installed.
[INFO] fastkafka._components.test_dependencies: Kafka is installed.
[INFO] fastkafka._testing.apache_kafka_broker: Starting zookeeper...
stdout=, stderr=, returncode=1
[INFO] fastkafka._testing.apache_kafka_broker: zookeeper startup failed, generating a new port and retrying...
[INFO] fastkafka._testing.apache_kafka_broker: zookeeper new port=44681
[INFO] fastkafka._testing.apache_kafka_broker: Starting kafka...
stdout=, stderr=, returncode=1
[INFO] fastkafka._testing.apache_kafka_broker: kafka startup failed, generating a new port and retryin

In [None]:
# | export


# TODO: Add passing of vars
async def _create_producer(  # type: ignore
    *,
    callback: ProduceCallable,
    default_config: Dict[str, Any],
    override_config: Dict[str, Any],
    bootstrap_servers: Union[str, List[str]],
    producers_list: List[AIOKafkaProducer],
) -> AIOKafkaProducer:
    """Create and start an AIOKafkaProducer and record it in `producers_list`.

    Args:
        callback: The producing function this producer is created for. It is
            not used inside this function.
        default_config: Default configuration values; filtered down to the
            arguments AIOKafkaProducer accepts.
        override_config: Configuration values taking precedence over the
            defaults; filtered the same way.
        bootstrap_servers: Bootstrap servers to connect the producer to. This
            value takes precedence over both configuration dictionaries.
        producers_list: List the started producer is appended to.

    Returns:
        The started producer.
    """
    # Later writes win: defaults < overrides < explicit bootstrap servers
    config = dict(filter_using_signature(AIOKafkaProducer, **default_config))
    config.update(filter_using_signature(AIOKafkaProducer, **override_config))
    config["bootstrap_servers"] = bootstrap_servers

    producer = AIOKafkaProducer(**config)
    logger.info(
        f"_create_producer() : created producer using the config: '{sanitize_kafka_config(**config)}'"
    )

    await producer.start()
    # Only producers that started successfully are registered
    producers_list.append(producer)

    return producer


@patch
async def _populate_producers(self: FastKafka) -> None:
    """Start a producer for every entry of the producers store.

    `self._producers_list` is reset and refilled with the started producers,
    and every store entry is rewritten so that its second element holds the
    producer created for it.
    """
    default_config: Dict[str, Any] = self._kafka_config
    bootstrap_server = default_config["bootstrap_servers_id"]

    self._producers_list = []

    # Collect the new entries separately; the store is updated only after
    # the iteration over it has finished.
    populated_entries = {}
    for topic, (
        callback,
        _,
        kafka_brokers,
        override_config,
    ) in self._producers_store.items():
        # Producer-specific brokers, when given, replace the app-level ones
        if kafka_brokers is not None:
            selected_broker = kafka_brokers.brokers[bootstrap_server]
        else:
            selected_broker = self._kafka_brokers.brokers[bootstrap_server]

        producer = await _create_producer(
            callback=callback,
            default_config=default_config,
            override_config=override_config,
            bootstrap_servers=_get_broker_addr_list(selected_broker),
            producers_list=self._producers_list,
        )
        populated_entries[topic] = (
            callback,
            producer,
            kafka_brokers,
            override_config,
        )

    self._producers_store.update(populated_entries)


@patch
async def _shutdown_producers(self: FastKafka) -> None:
    """Stop all started producers and drop every reference to them.

    Producers are stopped in reverse order of creation. Afterwards
    `self._producers_list` is emptied and the producer slot of every store
    entry is reset to None.
    """
    for producer in self._producers_list[::-1]:
        await producer.stop()

    # Remove references to stale producers
    self._producers_list = []

    cleared_entries = {}
    for topic, (
        callback,
        _,
        kafka_brokers,
        override_config,
    ) in self._producers_store.items():
        cleared_entries[topic] = (callback, None, kafka_brokers, override_config)

    self._producers_store.update(cleared_entries)

In [None]:
async with ApacheKafkaBroker() as bootstrap_server:
    async with ApacheKafkaBroker() as override_bootstrap_server:
        app = setup_testing_app(
            bootstrap_servers=bootstrap_server,
            override_bootstrap_servers=override_bootstrap_server,
        )
        print(app._producers_store)
        await app._populate_producers()
        print(app._producers_store)
        assert len(app._producers_list) == 3
        print(app._producers_list)
        await app._shutdown_producers()

        # One more time for reentrancy
        await app._populate_producers()
        assert len(app._producers_list) == 3
        print(app._producers_list)
        await app._shutdown_producers()

[INFO] fastkafka._components.test_dependencies: Java is already installed.
[INFO] fastkafka._components.test_dependencies: Kafka is installed.
[INFO] fastkafka._testing.apache_kafka_broker: Starting zookeeper...
[INFO] fastkafka._testing.apache_kafka_broker: Starting kafka...
[INFO] fastkafka._testing.apache_kafka_broker: Local Kafka broker up and running on 127.0.0.1:9092
[INFO] fastkafka._components.test_dependencies: Java is already installed.
[INFO] fastkafka._components.test_dependencies: Kafka is installed.
[INFO] fastkafka._testing.apache_kafka_broker: Starting zookeeper...
stdout=, stderr=, returncode=1
[INFO] fastkafka._testing.apache_kafka_broker: zookeeper startup failed, generating a new port and retrying...
[INFO] fastkafka._testing.apache_kafka_broker: zookeeper new port=34071
[INFO] fastkafka._testing.apache_kafka_broker: Starting kafka...
stdout=, stderr=, returncode=1
[INFO] fastkafka._testing.apache_kafka_broker: kafka startup failed, generating a new port and retryin

In [None]:
# | export


@patch
async def _populate_bg_tasks(
    self: FastKafka,
) -> None:
    """Start every scheduled background task as an asyncio task.

    Each task is named after its function, and the started tasks are stored
    in `self._running_bg_tasks` in scheduling order.
    """
    running_tasks = []
    for task in self._scheduled_bg_tasks:
        logger.info(
            f"_populate_bg_tasks() : Starting background task '{task.__name__}'"
        )
        running_tasks.append(asyncio.create_task(task(), name=task.__name__))

    self._running_bg_tasks = running_tasks


@patch
async def _shutdown_bg_tasks(
    self: FastKafka,
) -> None:
    """Cancel all running background tasks and wait for each of them to end.

    Cancellation is requested for every task before any of them is awaited.
    The `asyncio.CancelledError` raised while awaiting a task is swallowed;
    any other exception propagates to the caller.
    """
    running_tasks = self._running_bg_tasks

    # Pass 1: request cancellation everywhere
    for task in running_tasks:
        logger.info(
            f"_shutdown_bg_tasks() : Cancelling background task '{task.get_name()}'"
        )
        task.cancel()

    # Pass 2: wait for each task to actually finish
    for task in running_tasks:
        logger.info(
            f"_shutdown_bg_tasks() : Waiting for background task '{task.get_name()}' to finish"
        )
        try:
            await task
        except asyncio.CancelledError:
            # Expected result of the cancel() call above
            pass
        logger.info(
            f"_shutdown_bg_tasks() : Execution finished for background task '{task.get_name()}'"
        )

In [None]:
async with ApacheKafkaBroker() as bootstrap_server:
    app = setup_testing_app(bootstrap_servers=bootstrap_server)

    @app.run_in_background()
    async def long_bg_job():
        logger.debug(f"new_long_bg_job()")
        await asyncio.sleep(100)

    await app._populate_bg_tasks()
    assert len(app._scheduled_bg_tasks) == 2
    assert len(app._running_bg_tasks) == 2
    await app._shutdown_bg_tasks()

[INFO] fastkafka._components.test_dependencies: Java is already installed.
[INFO] fastkafka._components.test_dependencies: Kafka is installed.
[INFO] fastkafka._testing.apache_kafka_broker: Starting zookeeper...
[INFO] fastkafka._testing.apache_kafka_broker: Starting kafka...
[INFO] fastkafka._testing.apache_kafka_broker: Local Kafka broker up and running on 127.0.0.1:9092
[INFO] __main__: run_in_background() : Adding function 'long_bg_job' as background task
[INFO] __main__: run_in_background() : Adding function 'long_bg_job' as background task
[INFO] __main__: _populate_bg_tasks() : Starting background task 'long_bg_job'
[INFO] __main__: _populate_bg_tasks() : Starting background task 'long_bg_job'
[INFO] __main__: _shutdown_bg_tasks() : Cancelling background task 'long_bg_job'
[INFO] __main__: _shutdown_bg_tasks() : Cancelling background task 'long_bg_job'
[INFO] __main__: _shutdown_bg_tasks() : Waiting for background task 'long_bg_job' to finish
[INFO] __main__: _shutdown_bg_tasks(

In [None]:
# | export


@patch
async def _start(self: FastKafka) -> None:
    """Start producers, consumers and background tasks, in that order.

    Sets `self._is_started` to True once all three steps have completed.
    """

    def is_shutting_down_f() -> bool:
        # Consumer loops receive this callable to read the shutdown flag
        return self._is_shutting_down

    #     self.create_docs()
    await self._populate_producers()
    self._populate_consumers(is_shutting_down_f)
    await self._populate_bg_tasks()

    self._is_started = True


@patch
async def _stop(self: FastKafka) -> None:
    """Shut down background tasks, consumers and producers, in that order.

    `self._is_shutting_down` is True for the duration of the shutdown; once
    all steps have completed, both it and `self._is_started` are set to False.
    """
    self._is_shutting_down = True

    shutdown_steps = (
        self._shutdown_bg_tasks,
        self._shutdown_consumers,
        self._shutdown_producers,
    )
    for shutdown_step in shutdown_steps:
        await shutdown_step()

    self._is_shutting_down = False
    self._is_started = False

In [None]:
# Test app reentrancy

async with ApacheKafkaBroker() as bootstrap_server:
    with mock_AIOKafkaProducer_send() as mock:
        app = create_testing_app(bootstrap_servers=bootstrap_server)

        @app.produces()
        async def to_my_test_topic(mobile: str, url: str) -> MyMsgUrl:
            msg = MyMsgUrl(info=dict(mobile=mobile, name="James Bond"), url=url)
            return msg

        try:
            await app._start()
            await app.to_my_test_topic(mobile="+385912345678", url="https://www.vip.hr")
        finally:
            await app._stop()

        try:
            await app._start()
            await app.to_my_test_topic(mobile="+385987654321", url="https://www.ht.hr")
        finally:
            await app._stop()

        mock.assert_has_calls(
            [
                unittest.mock.call(
                    "my_test_topic",
                    b'{"info": {"mobile": "+385912345678", "name": "James Bond"}, "url": "https://www.vip.hr"}',
                    key=None,
                ),
                unittest.mock.call(
                    "my_test_topic",
                    b'{"info": {"mobile": "+385987654321", "name": "James Bond"}, "url": "https://www.ht.hr"}',
                    key=None,
                ),
            ]
        )

[INFO] fastkafka._components.test_dependencies: Java is already installed.
[INFO] fastkafka._components.test_dependencies: Kafka is installed.
[INFO] fastkafka._testing.apache_kafka_broker: Starting zookeeper...
[INFO] fastkafka._testing.apache_kafka_broker: Starting kafka...
[INFO] fastkafka._testing.apache_kafka_broker: Local Kafka broker up and running on 127.0.0.1:9092
[INFO] __main__: _create_producer() : created producer using the config: '{'bootstrap_servers': '127.0.0.1:9092'}'
[INFO] __main__: _create_producer() : created producer using the config: '{'bootstrap_servers': '127.0.0.1:9092'}'
[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 60031...
[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 60031 terminated.
[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 59671...
[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 59671 term

In [None]:
# mock up send method of AIOKafkaProducer
async with ApacheKafkaBroker() as bootstrap_server:
    with mock_AIOKafkaProducer_send() as mock:
        app = create_testing_app(bootstrap_servers=bootstrap_server)

        @app.produces()
        async def to_my_test_topic(mobile: str, url: str) -> MyMsgUrl:
            msg = MyMsgUrl(info=dict(mobile=mobile, name="James Bond"), url=url)
            return msg

        @app.produces()
        async def to_my_test_topic_2(mobile: str, url: str) -> MyMsgUrl:
            msg = MyMsgUrl(info=dict(mobile=mobile, name="James Bond"), url=url)
            return msg

        try:
            await app._start()
            await to_my_test_topic(mobile="+385912345678", url="https://www.vip.hr")
            await to_my_test_topic_2(mobile="+385987654321", url="https://www.ht.hr")
        finally:
            await app._stop()

        mock.assert_has_calls(
            [
                unittest.mock.call(
                    "my_test_topic",
                    b'{"info": {"mobile": "+385912345678", "name": "James Bond"}, "url": "https://www.vip.hr"}',
                    key=None,
                ),
                unittest.mock.call(
                    "my_test_topic_2",
                    b'{"info": {"mobile": "+385987654321", "name": "James Bond"}, "url": "https://www.ht.hr"}',
                    key=None,
                ),
            ]
        )

[INFO] fastkafka._components.test_dependencies: Java is already installed.
[INFO] fastkafka._components.test_dependencies: Kafka is installed.
[INFO] fastkafka._testing.apache_kafka_broker: Starting zookeeper...
[INFO] fastkafka._testing.apache_kafka_broker: Starting kafka...
[INFO] fastkafka._testing.apache_kafka_broker: Local Kafka broker up and running on 127.0.0.1:9092
[INFO] __main__: _create_producer() : created producer using the config: '{'bootstrap_servers': '127.0.0.1:9092'}'
[INFO] __main__: _create_producer() : created producer using the config: '{'bootstrap_servers': '127.0.0.1:9092'}'
[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 60826...
[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 60826 terminated.
[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 60466...
[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 60466 term

In [None]:
async with ApacheKafkaBroker() as bootstrap_server:
    app = create_testing_app(bootstrap_servers=bootstrap_server)
    fast_task = unittest.mock.Mock()
    long_task = unittest.mock.Mock()

    @app.run_in_background()
    async def bg_task():
        fast_task()
        await asyncio.sleep(100)
        long_task()

    fast_task_second = unittest.mock.Mock()
    long_task_second = unittest.mock.Mock()

    @app.run_in_background()
    async def bg_task_second():
        fast_task_second()
        await asyncio.sleep(100)
        long_task_second()

    try:
        await app._start()
        await asyncio.sleep(5)
    finally:
        await app._stop()

    fast_task.assert_called()
    long_task.assert_not_called()

    fast_task_second.assert_called()
    long_task_second.assert_not_called()

print("ok")

[INFO] fastkafka._components.test_dependencies: Java is already installed.
[INFO] fastkafka._components.test_dependencies: Kafka is installed.
[INFO] fastkafka._testing.apache_kafka_broker: Starting zookeeper...
[INFO] fastkafka._testing.apache_kafka_broker: Starting kafka...
[INFO] fastkafka._testing.apache_kafka_broker: Local Kafka broker up and running on 127.0.0.1:9092
[INFO] __main__: run_in_background() : Adding function 'bg_task' as background task
[INFO] __main__: run_in_background() : Adding function 'bg_task_second' as background task
[INFO] __main__: _populate_bg_tasks() : Starting background task 'bg_task'
[INFO] __main__: _populate_bg_tasks() : Starting background task 'bg_task_second'
[INFO] __main__: _shutdown_bg_tasks() : Cancelling background task 'bg_task'
[INFO] __main__: _shutdown_bg_tasks() : Cancelling background task 'bg_task_second'
[INFO] __main__: _shutdown_bg_tasks() : Waiting for background task 'bg_task' to finish
[INFO] __main__: _shutdown_bg_tasks() : Exe

In [None]:
# test lifespan hook

# Records what the lifespan hook did so the assertions below can observe it.
global_dict = {}


@asynccontextmanager
async def lifespan(app: FastKafka):
    """Lifespan hook: store a marker and the app on enter, change the marker on exit."""
    try:
        global_dict["set_var"] = 123
        global_dict["app"] = app
        yield
    finally:
        global_dict["set_var"] = 321


# NOTE(review): the broker is entered with a synchronous `with` here, unlike
# the `async with` used in the other cells; `apply_nest_asyncio=True`
# presumably makes that possible inside a running event loop - confirm.
with ApacheKafkaBroker(apply_nest_asyncio=True) as bootstrap_servers:
    host, port = bootstrap_servers.split(":")

    # host and port come from str.split(), so they are never None here and
    # the fallbacks below do not trigger.
    kafka_app = FastKafka(
        kafka_brokers={
            "localhost": {
                "url": host if host is not None else "localhost",
                "name": "development",
                "description": "Local (dev) Kafka broker",
                "port": port if port is not None else 9092,
            }
        },
        root_path="/tmp/000_FastKafka",
        lifespan=lifespan,
    )

    kafka_app.set_kafka_broker(kafka_broker_name="localhost")

    # Dict unchanged
    assert global_dict == {}

    async with kafka_app:
        # Lifespan aenter triggered
        assert global_dict["set_var"] == 123
        # Kafka app reference passed
        assert global_dict["app"] == kafka_app

    # Lifespan aexit triggered
    assert global_dict["set_var"] == 321

[INFO] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.start(): entering...
[INFO] fastkafka._components.test_dependencies: Java is already installed.
[INFO] fastkafka._components.test_dependencies: Kafka is installed.
[INFO] fastkafka._testing.apache_kafka_broker: Starting zookeeper...
[INFO] fastkafka._testing.apache_kafka_broker: Starting kafka...
[INFO] fastkafka._testing.apache_kafka_broker: Local Kafka broker up and running on 127.0.0.1:9092
[INFO] fastkafka._testing.apache_kafka_broker: <class 'fastkafka.testing.ApacheKafkaBroker'>.start(): returning 127.0.0.1:9092
[INFO] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.start(): exited.
[INFO] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.stop(): entering...
[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 62416...
[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 62416 terminated.
[INFO] fastkafka._components._subprocess: t

## Documentation generation

In [None]:
# | export


@patch
def create_docs(self: FastKafka) -> None:
    """
    Export the asyncapi specification for the registered consumers and producers.

    The callbacks of the consumers and producers stores are collected under
    their channel names and handed to `export_async_spec` together with the
    broker and service information of this FastKafka instance.

    Note:
        The asyncapi documentation is saved to the location specified by the
        `_asyncapi_path` attribute of the FastKafka instance.

    Returns:
        None
    """

    def _channel_name(store_key: str) -> str:
        # Keys ending in "_0" are passed through remove_suffix; every other
        # key is used as the channel name unchanged.
        if store_key.endswith("_0"):
            return remove_suffix(store_key)
        return store_key

    consumer_callbacks = {
        _channel_name(topic): callback
        for topic, (callback, _, _, _, _) in self._consumers_store.items()
    }
    producer_callbacks = {
        _channel_name(topic): callback
        for topic, (callback, _, _, _) in self._producers_store.items()
    }

    export_async_spec(
        consumers=consumer_callbacks,
        producers=producer_callbacks,
        kafka_brokers=self._kafka_brokers,
        kafka_service_info=self._kafka_service_info,
        asyncapi_path=self._asyncapi_path,
    )

In [None]:
expected = """asyncapi: 2.5.0
channels:
  my_topic_1:
    subscribe:
      message:
        $ref: '#/components/messages/MyMsgUrl'
      description: "Consumer description"
  my_topic_1_1:
    subscribe:
      message:
        $ref: '#/components/messages/MyMsgEmail'
  my_topic_3:
    publish:
      message:
        $ref: '#/components/messages/MyMsgUrl'
      description: "Producer description"
  my_topic_4:
    publish:
      message:
        $ref: '#/components/messages/MyMsgEmail'
  my_topic_4_1:
    publish:
      message:
        $ref: '#/components/messages/MyMsgUrl'
components:
  messages:
    MyMsgEmail:
      payload:
        example:
          email: agent-007@sis.gov.uk
          msg_url:
            info:
              mobile: '+385987654321'
              name: James Bond
            url: https://sis.gov.uk/agents/007
        properties:
          email:
            example: agent-007@sis.gov.uk
            format: email
            title: Email
            type: string
          msg_url:
            allOf:
            - $ref: '#/components/messages/MyMsgUrl'
            example:
              info:
                mobile: '+385987654321'
                name: James Bond
              url: https://sis.gov.uk/agents/007
            title: Msg Url
        required:
        - msg_url
        - email
        title: MyMsgEmail
        type: object
    MyMsgUrl:
      payload:
        example:
          info:
            mobile: '+385987654321'
            name: James Bond
          url: https://sis.gov.uk/agents/007
        properties:
          info:
            allOf:
            - $ref: '#/components/schemas/MyInfo'
            example:
              mobile: '+385987654321'
              name: James Bond
            title: Info
          url:
            example: https://sis.gov.uk/agents/007
            format: uri
            maxLength: 2083
            minLength: 1
            title: Url
            type: string
        required:
        - info
        - url
        title: MyMsgUrl
        type: object
  schemas:
    MyInfo:
      payload:
        properties:
          mobile:
            example: '+385987654321'
            title: Mobile
            type: string
          name:
            example: James Bond
            title: Name
            type: string
        required:
        - mobile
        - name
        title: MyInfo
        type: object
  securitySchemes: {}
info:
  contact:
    email: noreply@gmail.com
    name: Author
    url: https://www.google.com
  description: ''
  title: ''
  version: ''
servers:
  localhost:
    description: Local (dev) Kafka broker
    protocol: kafka
    url: localhost
    variables:
      port:
        default: '9092'
"""

In [None]:
# Module-level so both parsed documents stay inspectable after the cell has run.
d1, d2 = None, None

# Remove a spec left over from an earlier run so the file read below is the
# one written by this test.
# NOTE(review): assumes create_testing_app() uses "/tmp/000_FastKafka" as its
# root path - confirm.
docs_path = Path("/tmp/000_FastKafka/asyncapi/spec/asyncapi.yml")
if docs_path.exists():
    os.remove(docs_path)


async def test_me():
    """Generate the docs and compare the written spec with `expected`."""
    global d1
    global d2
    app = setup_testing_app()
    app.create_docs()
    with open(docs_path) as specs:
        # Compare parsed YAML rather than raw text
        d1 = yaml.safe_load(specs)
        d2 = yaml.safe_load(expected)
        assert d1 == d2, f"{d1} != {d2}"


# NOTE(review): asyncio.run() raises RuntimeError when an event loop is already
# running in this thread; the other cells use top-level `await` instead.
# Confirm this works on a fresh kernel, or await test_me() directly.
asyncio.run(test_me())
print("ok")

[INFO] __main__: run_in_background() : Adding function 'long_bg_job' as background task
[INFO] fastkafka._components.asyncapi: Old async specifications at '/tmp/000_FastKafka/asyncapi/spec/asyncapi.yml' does not exist.
[INFO] fastkafka._components.asyncapi: New async specifications generated at: '/tmp/000_FastKafka/asyncapi/spec/asyncapi.yml'
[INFO] fastkafka._components.asyncapi: Async docs generated at '/tmp/000_FastKafka/asyncapi/docs'
[INFO] fastkafka._components.asyncapi: Output of '$ npx -y -p @asyncapi/generator ag /tmp/000_FastKafka/asyncapi/spec/asyncapi.yml @asyncapi/html-template -o /tmp/000_FastKafka/asyncapi/docs --force-write'[32m

Done! ✨[0m
[33mCheck out your shiny new generated files at [0m[35m/tmp/000_FastKafka/asyncapi/docs[0m[33m.[0m


ok


## App mocks

In [None]:
# | export


class AwaitedMock:
    """
    Class representing an awaited mock object.

    Every public bound method of the wrapped object (for a mock, e.g.
    ``assert_called_with``) is re-exposed on this instance as a coroutine
    function that accepts an additional ``timeout`` keyword argument.

    Args:
        o: The original object to be wrapped.
    """

    @staticmethod
    def _await_for(f: Callable[..., Any]) -> Callable[..., Any]:
        """
        Decorator to await the execution of a function.

        Args:
            f: The function to be wrapped.

        Returns:
            A coroutine function taking the arguments of `f` plus a keyword
            argument `timeout` (seconds, default 60). If `f` is a coroutine
            function it is awaited under `asyncio.wait_for`; otherwise `f` is
            called repeatedly until it stops raising or the timeout expires.
        """

        @delegates(f)
        async def inner(
            # `f` stays a keyword parameter to keep the wrapper's call
            # signature unchanged for existing callers.
            *args: Any,
            f: Callable[..., Any] = f,
            timeout: int = 60,
            **kwargs: Any,
        ) -> Any:
            """
            Call the wrapped function, waiting up to `timeout` seconds for success.

            Args:
                timeout: Maximum number of seconds to wait.

            Returns:
                Whatever the wrapped function returns.

            Raises:
                asyncio.TimeoutError: If a wrapped coroutine function does not
                    finish within `timeout` seconds.
                Exception: The last exception raised by a wrapped regular
                    function once `timeout` seconds have elapsed.
            """
            if inspect.iscoroutinefunction(f):
                return await asyncio.wait_for(f(*args, **kwargs), timeout=timeout)

            # The event loop clock is monotonic, so the deadline is not affected
            # by wall-clock adjustments.
            loop = asyncio.get_running_loop()
            deadline = loop.time() + timeout
            while True:
                try:
                    return f(*args, **kwargs)
                except Exception:
                    remaining = deadline - loop.time()
                    if remaining <= 0:
                        # Out of time: surface the most recent failure.
                        raise
                    # Retry roughly once per second, never sleeping past the deadline.
                    await asyncio.sleep(min(1, remaining))

        return inner

    def __init__(self, o: Any):
        """
        Initializes an instance of AwaitedMock.

        Args:
            o: The original object to be wrapped.
        """
        self._o = o

        for name in o.__dir__():
            # Only public attributes are considered ...
            if not name.startswith("_"):
                f = getattr(o, name)
                # ... and only bound methods are wrapped; other attributes are
                # not copied onto this instance.
                if inspect.ismethod(f):
                    setattr(self, name, self._await_for(f))

In [None]:
# | export


@patch
def create_mocks(self: FastKafka) -> None:
    """Creates self.mocks as a named tuple mapping a new function obtained by calling the original functions and a mock

    Also sets `self.AppMocks` (the named tuple type, with one field per
    consumer/producer function name) and `self.awaited_mocks` (the same fields,
    each mock wrapped in `AwaitedMock`), and replaces every function stored in
    `self._consumers_store` and `self._producers_store` with a wrapper that
    calls the matching mock before calling the function itself.
    """
    # Collect the registered functions: consumers first, then producers.
    handlers: List[Callable[..., Any]] = []
    for fn, _, _, _, _ in self._consumers_store.values():
        handlers.append(fn)
    for fn, _, _, _ in self._producers_store.values():
        handlers.append(fn)

    field_names = [fn.__name__ for fn in handlers]
    self.AppMocks = namedtuple(f"{self.__class__.__name__}Mocks", field_names)  # type: ignore

    # Coroutine functions get an AsyncMock, everything else a MagicMock.
    mocks_by_name: Dict[str, Union[AsyncMock, MagicMock]] = {}
    for fn in handlers:
        if inspect.iscoroutinefunction(fn):
            mocks_by_name[fn.__name__] = AsyncMock()
        else:
            mocks_by_name[fn.__name__] = MagicMock()
    self.mocks = self.AppMocks(**mocks_by_name)  # type: ignore

    # Iterating a named tuple yields its values in field order, so positional
    # construction keeps every wrapped mock under the same field name.
    self.awaited_mocks = self.AppMocks(  # type: ignore
        *(AwaitedMock(mock) for mock in self.mocks)
    )

    def add_mock(
        f: Callable[..., Any], mock: Union[AsyncMock, MagicMock]
    ) -> Callable[..., Any]:
        """Add call to mock when calling function f"""
        if inspect.iscoroutinefunction(f):

            @functools.wraps(f)
            async def mocked_async(
                *args: Any,
                f: Callable[..., Any] = f,
                mock: AsyncMock = mock,
                **kwargs: Any,
            ) -> Any:
                await mock(*args, **kwargs)
                return await f(*args, **kwargs)

            return mocked_async

        @functools.wraps(f)
        def mocked_sync(
            *args: Any,
            f: Callable[..., Any] = f,
            mock: MagicMock = mock,
            **kwargs: Any,
        ) -> Any:
            mock(*args, **kwargs)
            return f(*args, **kwargs)

        return mocked_sync

    # Build the replacement entries completely before touching the stores.
    mocked_consumers = {}
    for name, (
        fn,
        decoder_fn,
        executor,
        kafka_brokers,
        kwargs,
    ) in self._consumers_store.items():
        mocked_consumers[name] = (
            add_mock(fn, getattr(self.mocks, fn.__name__)),
            decoder_fn,
            executor,
            kafka_brokers,
            kwargs,
        )
    self._consumers_store.update(mocked_consumers)

    mocked_producers = {}
    for name, (
        fn,
        producer,
        kafka_brokers,
        kwargs,
    ) in self._producers_store.items():
        mocked_producers[name] = (
            add_mock(fn, getattr(self.mocks, fn.__name__)),
            producer,
            kafka_brokers,
            kwargs,
        )
    self._producers_store.update(mocked_producers)

In [None]:
# Minimal message model shared by the handlers below.
class TestMsg(BaseModel):
    msg: str = Field(...)


app = FastKafka(kafka_brokers={"localhost": {"url": "localhost", "port": 9092}})


# Consumer registered with the decorator's default arguments.
@app.consumes()
async def on_preprocessed_signals(msg: TestMsg):
    await to_predictions(TestMsg(msg="prediction"))


# Second consumer with an explicit topic and its own broker list (two ports).
@app.consumes(
    topic="preprocessed_signals",
    brokers={
        "localhost": [
            {"url": "localhost", "port": 9092},
            {"url": "localhost", "port": 9093},
        ]
    },
)
async def on_preprocessed_signals_second(msg: TestMsg):
    await to_predictions(TestMsg(msg="prediction"))


# Producer called by both consumers above.
@app.produces()
async def to_predictions(prediction: TestMsg) -> TestMsg:
    print(f"Sending prediction: {prediction}")
    return prediction

In [None]:
# Create the mocks twice in a row; after each creation none of the mocks may
# have been awaited yet.
for creation_round in range(2):
    app.create_mocks()
    for mocked_name in (
        "on_preprocessed_signals",
        "on_preprocessed_signals_second",
        "to_predictions",
    ):
        getattr(app.mocks, mocked_name).assert_not_awaited()

In [None]:
# The mock was asserted as not awaited in the previous cell, so
# `assert_called_with(123)` keeps failing; the awaited variant retries until
# its 2 second timeout and then the AssertionError is expected to surface here.
with pytest.raises(AssertionError) as e:
    await app.awaited_mocks.on_preprocessed_signals.assert_called_with(123, timeout=2)

In [None]:
# Recreate the mocks, then check the same condition through both the plain mock
# and its awaited wrapper (which accepts an extra `timeout` keyword).
app.create_mocks()
app.mocks.on_preprocessed_signals.assert_not_awaited()
await app.awaited_mocks.on_preprocessed_signals.assert_not_awaited(timeout=3)

In [None]:
# | export


@patch
def benchmark(
    self: FastKafka,
    interval: Union[int, timedelta] = 1,
    *,
    sliding_window_size: Optional[int] = None,
) -> Callable[[Callable[[I], Optional[O]]], Callable[[I], Optional[O]]]:
    """Decorator to benchmark produces/consumes functions

    Args:
        interval: Period to use to calculate throughput. If value is of type int,
            then it will be used as seconds. If value is of type timedelta,
            then it will be used as it is. default: 1 - one second
        sliding_window_size: The size of the sliding window to use to calculate
            average throughput. default: None - By default average throughput is
            not calculated

    Returns:
        A decorator. The decorated function (sync or async, matching the
        original) invokes `_benchmark` with `self.benchmark_results` before
        every call and then returns the original function's result.
    """

    def _decorator(func: Callable[[I], Optional[O]]) -> Callable[[I], Optional[O]]:
        func_name = f"{func.__module__}.{func.__qualname__}"

        def _record_call() -> None:
            # `self.benchmark_results` is looked up on every call, not once at
            # decoration time.
            _benchmark(
                interval=interval,
                sliding_window_size=sliding_window_size,
                func_name=func_name,
                benchmark_results=self.benchmark_results,
            )

        if inspect.iscoroutinefunction(func):

            @wraps(func)
            async def async_wrapper(*args: I, **kwargs: I) -> Optional[O]:
                _record_call()
                return await func(*args, **kwargs)  # type: ignore

            return async_wrapper  # type: ignore

        @wraps(func)
        def wrapper(*args: I, **kwargs: I) -> Optional[O]:
            _record_call()
            return func(*args, **kwargs)

        return wrapper

    return _decorator

In [None]:
for executor in ["SequentialExecutor", "DynamicTaskExecutor"]:

    class TestMsg(BaseModel):
        msg: str = Field(...)

    app = FastKafka(kafka_brokers=dict(localhost=dict(url="localhost", port=9092)))
    # app.benchmark_results["test"] = dict(count=0)

    @app.consumes(executor=executor)
    @app.benchmark(interval=1, sliding_window_size=5)
    async def on_preprocessed_signals(msg: TestMsg):
        await to_predictions(TestMsg(msg="prediction"))

    @app.produces()
    @app.benchmark(interval=1, sliding_window_size=5)
    async def to_predictions(prediction: TestMsg) -> TestMsg:
        #         print(f"Sending prediction: {prediction}")
        return prediction

    async with Tester(app).using_local_kafka() as tester:
        for i in range(10_000):
            await tester.to_preprocessed_signals(TestMsg(msg=f"signal {i}"))
        print("Hello I am over after 10k msgs")
        await asyncio.sleep(5)
        tester.mocks.on_predictions.assert_called()

print("ok")

In [None]:
# | export


@patch
def fastapi_lifespan(
    self: FastKafka, kafka_broker_name: str
) -> Callable[["FastAPI"], AsyncContextManager[None]]:
    """
    Method for managing the lifespan of a FastAPI application with a specific Kafka broker.

    Args:
        kafka_broker_name: The name of the Kafka broker to start FastKafka

    Returns:
        Lifespan function to use for initializing FastAPI. Calling it with the
        FastAPI application returns an async context manager that selects the
        given broker, enters this FastKafka app and exits it again when the
        context is left.
    """

    @asynccontextmanager
    async def lifespan(fastapi_app: "FastAPI") -> AsyncIterator[None]:
        # The broker is selected when the context is entered, not when
        # `fastapi_lifespan` is called.
        self.set_kafka_broker(kafka_broker_name=kafka_broker_name)
        async with self:
            yield

    # `asynccontextmanager` turns the generator function into a callable that
    # returns an async context manager, which is what the annotation declares.
    return lifespan

In [None]:
class TestMsg(BaseModel):
    msg: str = Field(...)


# App that is driven through `Tester` in the following cell.
app_for_tester = FastKafka(
    kafka_brokers={"localhost": {"url": "localhost", "port": 9092}},
    group_id="app_for_tester_group",
)


@app_for_tester.consumes(topic="preprocessed_signals")
async def on_app_for_tester_preprocessed_signals(msg: TestMsg):
    print("receving messages on app_for_tester")
    await to_app_for_tester_predictions(TestMsg(msg="prediction"))


@app_for_tester.produces(topic="predictions")
async def to_app_for_tester_predictions(prediction: TestMsg) -> TestMsg:
    print("sending predictions on app_for_tester")
    return prediction


def create_app_for_fastapi(port: int):
    """Build a second app on the same topics, using its own consumer group.

    Args:
        port: Port of the Kafka broker on localhost.

    Returns:
        The configured FastKafka application.
    """
    broker_config = {"localhost": {"url": "localhost", "port": port}}
    app_for_fastapi = FastKafka(
        kafka_brokers=broker_config,
        group_id="app_for_fastapi_group",
    )

    @app_for_fastapi.consumes(topic="preprocessed_signals")
    async def on_app_for_fastapi_preprocessed_signals(msg: TestMsg):
        print("receving messages on app_for_fastapi")
        await to_app_for_fastapi_predictions(TestMsg(msg="prediction"))

    @app_for_fastapi.produces(topic="predictions")
    async def to_app_for_fastapi_predictions(prediction: TestMsg) -> TestMsg:
        print("sending predictions on app_for_fastapi")
        return prediction

    return app_for_fastapi

In [None]:
def run_uvicorn():
    fastapi_app = FastAPI(lifespan=app_for_fastapi.fastapi_lifespan("localhost"))

    @fastapi_app.get("/predict")
    async def predict():
        return {"result": "hello"}

    uvicorn.run(
        fastapi_app,
        host="0.0.0.0",
        port=8000,
        reload=False,
        log_level="debug",
        workers=1,
    )


async with Tester(app_for_tester).using_local_kafka() as tester:
    port = tester.broker.kafka_kwargs["listener_port"]
    app_for_fastapi = create_app_for_fastapi(port=port)
    with run_in_process(run_uvicorn) as p:
        await asyncio.sleep(3)
        res = requests.get("http://127.0.0.1:8000/predict")
        assert res.ok

        await tester.to_preprocessed_signals(TestMsg(msg=f"signal 10"))
        await asyncio.sleep(3)
        assert (
            tester.mocks.on_predictions.call_count == 2
        ), tester.mocks.on_predictions.call_count

    p.close()

print("ok")