From 66d2e9cce3af4b412ce34558e727ec89aea51aaa Mon Sep 17 00:00:00 2001
From: Pastukhov Nikita
Date: Tue, 23 May 2023 16:59:12 +0300
Subject: [PATCH] Kafka 1.2.0

---
 .github/workflows/tests.yml                   |  16 ++
 CONTRIBUTING.md                               |  16 ++
 README.md                                     |   7 +-
 docs/docs/en/CHANGELOG.md                     |  16 +-
 .../en/contributing/2_contributing-index.md   |  16 ++
 docs/docs/en/index.md                         |   7 +-
 docs/docs/ru/CHANGELOG.md                     |  12 ++
 .../ru/contributing/2_contributing-index.md   |  16 ++
 docs/docs/ru/index.md                         |   7 +-
 docs/docs_src/contributing/adapter/parent.py  |   2 +-
 propan/__about__.py                           |  12 +-
 propan/__init__.py                            |  17 +-
 propan/annotations.py                         |  24 ++-
 propan/brokers/kafka/__init__.py              |   3 +
 propan/brokers/kafka/kafka_broker.py          | 204 ++++++++++++++++++
 propan/brokers/kafka/kafka_broker.pyi         | 191 ++++++++++++++++
 propan/brokers/kafka/schemas.py               |  17 ++
 propan/cli/startproject/async_app/app.py      |   8 +
 propan/cli/startproject/async_app/core.py     |   2 +-
 propan/cli/startproject/async_app/kafka.py    |  92 ++++++++
 propan/fastapi/__init__.py                    |  11 +-
 propan/fastapi/kafka/__init__.py              |   3 +
 propan/fastapi/kafka/router.py                |   7 +
 propan/fastapi/kafka/router.pyi               | 170 +++++++++++++++
 propan/test/__init__.py                       |  10 +
 propan/test/kafka.py                          |  93 ++++++++
 pyproject.toml                                |   7 +
 tests/brokers/base/__init__.py                |   0
 tests/brokers/kafka/__init__.py               |   0
 tests/brokers/kafka/conftest.py               |  36 ++++
 tests/brokers/kafka/test_connect.py           |  19 ++
 tests/brokers/kafka/test_consume.py           |  77 +++++++
 tests/brokers/kafka/test_publish.py           |  35 +++
 tests/brokers/kafka/test_test_client.py       |  49 +++++
 .../rabbit/{test_acc.py => test_consume.py}   |   0
 tests/brokers/redis/__init__.py               |   0
 .../redis/{test_acc.py => test_consume.py}    |   0
 tests/cli/conftest.py                         |   7 +
 tests/cli/supervisors/__init__.py             |   0
 tests/cli/test_run.py                         |  12 ++
 tests/fastapi/__init__.py                     |   0
 tests/fastapi/test_app.py                     |  84 ++++----
 tests/log/__init__.py                         |   0
 tests/utils/type_cast/__init__.py             |   0
 44 files changed, 1229 insertions(+), 76 deletions(-)
 create mode 100644 propan/brokers/kafka/__init__.py
 create mode 100644 propan/brokers/kafka/kafka_broker.py
 create mode 100644 propan/brokers/kafka/kafka_broker.pyi
 create mode 100644 propan/brokers/kafka/schemas.py
 create mode 100644 propan/cli/startproject/async_app/kafka.py
 create mode 100644 propan/fastapi/kafka/__init__.py
 create mode 100644 propan/fastapi/kafka/router.py
 create mode 100644 propan/fastapi/kafka/router.pyi
 create mode 100644 propan/test/kafka.py
 create mode 100644 tests/brokers/base/__init__.py
 create mode 100644 tests/brokers/kafka/__init__.py
 create mode 100644 tests/brokers/kafka/conftest.py
 create mode 100644 tests/brokers/kafka/test_connect.py
 create mode 100644 tests/brokers/kafka/test_consume.py
 create mode 100644 tests/brokers/kafka/test_publish.py
 create mode 100644 tests/brokers/kafka/test_test_client.py
 rename tests/brokers/rabbit/{test_acc.py => test_consume.py} (100%)
 create mode 100644 tests/brokers/redis/__init__.py
 rename tests/brokers/redis/{test_acc.py => test_consume.py} (100%)
 create mode 100644 tests/cli/supervisors/__init__.py
 create mode 100644 tests/fastapi/__init__.py
 create mode 100644 tests/log/__init__.py
 create mode 100644 tests/utils/type_cast/__init__.py

diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index b189aeae..110f16ca 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -30,6 +30,22 @@ jobs:
         image: nats
         ports:
           - 4222:4222
+
+      kafka:
+        image: bitnami/kafka
+        ports:
+          - 9092:9092
+        env:
+          KAFKA_ENABLE_KRAFT: "true"
+          KAFKA_CFG_NODE_ID: "1"
+          KAFKA_CFG_PROCESS_ROLES: "broker,controller"
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "CONTROLLER" + KAFKA_CFG_LISTENERS: "PLAINTEXT://:9092,CONTROLLER://:9093" + KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT" + KAFKA_CFG_ADVERTISED_LISTENERS: "PLAINTEXT://127.0.0.1:9092" + KAFKA_BROKER_ID: "1" + KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "1@kafka:9093" + ALLOW_PLAINTEXT_LISTENER: "true" steps: - uses: actions/checkout@v3 diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 57b8f8ce..97c1385c 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -105,6 +105,22 @@ services: ports: - 4222:4222 - 8222:8222 # management + + kafka: + image: bitnami/kafka + ports: + - 9092:9092 + environment: + - KAFKA_ENABLE_KRAFT=yes + - KAFKA_CFG_NODE_ID=1 + - KAFKA_CFG_PROCESS_ROLES=broker,controller + - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER + - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 + - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT + - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 + - KAFKA_BROKER_ID=1 + - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka:9093 + - ALLOW_PLAINTEXT_LISTENER=yes ``` ```bash diff --git a/README.md b/README.md index 4c2c5723..c86f6b51 100644 --- a/README.md +++ b/README.md @@ -49,7 +49,8 @@ It is a modern, high-level framework on top of popular specific Python brokers l * **MQ independent**: Single interface to popular MQ: * **Redis** (based on redis-py) * **RabbitMQ** (based on aio-pika) - * **NATS** (based on nats-py) + * **Kafka** (based on aiokafka) + * **Nats** (based on nats-py) * **RPC**: The framework supports RPC requests over MQ, which will allow performing long operations on remote services asynchronously. * [**Great to develop**](#cli-power): CLI tool provides great development experience: * framework-independent way to manage the project environment @@ -63,10 +64,10 @@ It is a modern, high-level framework on top of popular specific Python brokers l |-------------------|:-------------------------------------------------------:|:--------------------:| | **RabbitMQ** | :heavy_check_mark: **stable** :heavy_check_mark: | :mag: planning :mag: | | **Redis** | :heavy_check_mark: **stable** :heavy_check_mark: | :mag: planning :mag: | +| **Kafka** | :warning: **beta** :warning: | :mag: planning :mag: | | **Nats** | :warning: **beta** :warning: | :mag: planning :mag: | | **NatsJS** | :hammer_and_wrench: **in progress** :hammer_and_wrench: | :mag: planning :mag: | | **MQTT** | :mag: planning :mag: | :mag: planning :mag: | -| **Kafka** | :mag: planning :mag: | :mag: planning :mag: | | **Redis Streams** | :mag: planning :mag: | :mag: planning :mag: | | **Pulsar** | :mag: planning :mag: | :mag: planning :mag: | | **SQS** | :mag: planning :mag: | :mag: planning :mag: | @@ -141,6 +142,8 @@ pip install "propan[async-rabbit]" pip install "propan[async-nats]" # or pip install "propan[async-redis]" +# or +pip install "propan[async-kafka]" ``` ### Basic usage diff --git a/docs/docs/en/CHANGELOG.md b/docs/docs/en/CHANGELOG.md index f49d9ded..881c0283 100644 --- a/docs/docs/en/CHANGELOG.md +++ b/docs/docs/en/CHANGELOG.md @@ -1,8 +1,22 @@ # CHANGELOG +## 2023-05-23 **0.1.2.0** Kafka + +**Propan** added support for *Kafka* as a message broker. This functionality is full tested. + +*KafkaBroker* supports: + +* message delivery +* test client, without the need to run *Kafka* +* *FastAPI* Plugin + +*KafkaBroker* not supports **RPC** yet. 
+
+Also, **Propan CLI** is able to generate project templates for any supported broker.
+
 ## 2023-05-18 **0.1.1.0** REDIS

 **Propan** added support for *Redis Pub/Sub* as a message broker. This functionality is fully tested and described in the documentation.

 *RedisBroker* supports:

diff --git a/docs/docs/en/contributing/2_contributing-index.md b/docs/docs/en/contributing/2_contributing-index.md
index 3cf3eec2..a61bcded 100644
--- a/docs/docs/en/contributing/2_contributing-index.md
+++ b/docs/docs/en/contributing/2_contributing-index.md
@@ -105,6 +105,22 @@ services:
     ports:
       - 4222:4222
       - 8222:8222 # management
+
+  kafka:
+    image: bitnami/kafka
+    ports:
+      - 9092:9092
+    environment:
+      - KAFKA_ENABLE_KRAFT=yes
+      - KAFKA_CFG_NODE_ID=1
+      - KAFKA_CFG_PROCESS_ROLES=broker,controller
+      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
+      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
+      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
+      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
+      - KAFKA_BROKER_ID=1
+      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka:9093
+      - ALLOW_PLAINTEXT_LISTENER=yes
 ```

 ```bash
diff --git a/docs/docs/en/index.md b/docs/docs/en/index.md
index 111b3f92..f13a700b 100644
--- a/docs/docs/en/index.md
+++ b/docs/docs/en/index.md
@@ -43,7 +43,8 @@ It is a modern, high-level framework on top of popular Python libraries for vari
 * **MQ independent**: Single interface to popular MQ:
     * **Redis** (based on [redis-py]("https://redis.readthedocs.io/en/stable/index.html"){target="_blank"})
     * **RabbitMQ** (based on [aio-pika](https://aio-pika.readthedocs.io/en/latest/){target="_blank"})
-    * **NATS** (based on [nats-py](https://github.com/nats-io/nats.py){target="_blank"})
+    * **Kafka** (based on [aiokafka](https://aiokafka.readthedocs.io/en/stable/){target="_blank"})
+    * **Nats** (based on [nats-py](https://github.com/nats-io/nats.py){target="_blank"})
 * [**RPC**](getting_started/4_broker/5_rpc/): The framework supports RPC requests on top of message brokers, which will allow performing long operations on remote services asynchronously.
 * [**Great to develop**](getting_started/2_cli/): CLI tool provides great development experience:
     * framework-independent way to manage the project environment
@@ -57,7 +58,7 @@ It is a modern, high-level framework on top of popular Python libraries for vari
 With declarative tools you can define **what you need to get**. With traditional imperative tools you must write **what you need to do**.

-Take a look at classic imperative tools, such as [aio-pika](https://aio-pika.readthedocs.io/en/latest/){target="_blank"}, [pika](https://pika.readthedocs.io/en/stable/){target="_blank"}, [redis-py]("https://redis.readthedocs.io/en/stable/index.html"){target="_blank"}, [nats-py](https://github.com/nats-io/nats.py){target="_blank"}, etc.
+Take a look at classic imperative tools, such as [aio-pika](https://aio-pika.readthedocs.io/en/latest/){target="_blank"}, [pika](https://pika.readthedocs.io/en/stable/){target="_blank"}, [redis-py]("https://redis.readthedocs.io/en/stable/index.html"){target="_blank"}, [nats-py](https://github.com/nats-io/nats.py){target="_blank"}, [aiokafka](https://aiokafka.readthedocs.io/en/stable/){target="_blank"}, etc.

 This is the **Quickstart** with the *aio-pika*:

@@ -114,10 +115,10 @@ This is the **Propan** declarative way to write the same code. That is so much easier, isn't it?
 |-------------------|:-------------------------------------------------------:|:--------------------:|
 | **RabbitMQ**      | :heavy_check_mark: **stable** :heavy_check_mark:         | :mag: planning :mag: |
 | **Redis**         | :heavy_check_mark: **stable** :heavy_check_mark:         | :mag: planning :mag: |
+| **Kafka**         | :warning: **beta** :warning:                             | :mag: planning :mag: |
 | **Nats**          | :warning: **beta** :warning:                             | :mag: planning :mag: |
 | **NatsJS**        | :hammer_and_wrench: **in progress** :hammer_and_wrench: | :mag: planning :mag: |
 | **MQTT**          | :mag: planning :mag:                                     | :mag: planning :mag: |
-| **Kafka**         | :mag: planning :mag:                                     | :mag: planning :mag: |
 | **Redis Streams** | :mag: planning :mag:                                     | :mag: planning :mag: |
 | **Pulsar**        | :mag: planning :mag:                                     | :mag: planning :mag: |
 | **SQS**           | :mag: planning :mag:                                     | :mag: planning :mag: |
diff --git a/docs/docs/ru/CHANGELOG.md b/docs/docs/ru/CHANGELOG.md
index 44aed75b..6a975640 100644
--- a/docs/docs/ru/CHANGELOG.md
+++ b/docs/docs/ru/CHANGELOG.md
@@ -1,5 +1,17 @@
 # CHANGELOG

+## 2023-05-23 **0.1.2.0** Kafka
+
+В **Propan** добавлена поддержка *Kafka* в качестве брокера сообщений. Данный функционал полностью протестирован.
+
+*KafkaBroker* поддерживает:
+
+* доставку сообщений
+* тестовый клиент, без необходимости запуска *Kafka*
+* плагин для *FastAPI*
+
+*KafkaBroker* на данный момент не поддерживает **RPC** запросы.
+
 ## 2023-05-18 **0.1.1.0** REDIS

 В **Propan** добавлена поддержка *Redis Pub/Sub* в качестве брокера сообщений. Данный функционал полностью протестирован и описан в документации.
diff --git a/docs/docs/ru/contributing/2_contributing-index.md b/docs/docs/ru/contributing/2_contributing-index.md
index c6120251..efb26cb0 100644
--- a/docs/docs/ru/contributing/2_contributing-index.md
+++ b/docs/docs/ru/contributing/2_contributing-index.md
@@ -114,6 +114,22 @@ services:
     ports:
       - 4222:4222
       - 8222:8222 # management
+
+  kafka:
+    image: bitnami/kafka
+    ports:
+      - 9092:9092
+    environment:
+      - KAFKA_ENABLE_KRAFT=yes
+      - KAFKA_CFG_NODE_ID=1
+      - KAFKA_CFG_PROCESS_ROLES=broker,controller
+      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
+      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
+      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
+      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
+      - KAFKA_BROKER_ID=1
+      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka:9093
+      - ALLOW_PLAINTEXT_LISTENER=yes
 ```

 #### Hatch
diff --git a/docs/docs/ru/index.md b/docs/docs/ru/index.md
index ee604093..80a0e800 100644
--- a/docs/docs/ru/index.md
+++ b/docs/docs/ru/index.md
@@ -42,8 +42,9 @@
 * [**Интeграция**](getting_started/1_quick-start/#http): Propan полностью совместим с [любыми HTTP фреймворками](integrations/1_integrations-index/)
 * **Независимость от брокеров**: Единый интерфейс для популярных брокеров:
     * **Redis** (основан на [redis-py]("https://redis.readthedocs.io/en/stable/index.html"){target="_blank"})
-    * **NATS** (основан на [nats-py](https://github.com/nats-io/nats.py){target="_blank"})
     * **RabbitMQ** (основан на [aio-pika](https://aio-pika.readthedocs.io/en/latest/){target="_blank"})
+    * **Kafka** (основан на [aiokafka](https://aiokafka.readthedocs.io/en/stable/){target="_blank"})
+    * **Nats** (основан на [nats-py](https://github.com/nats-io/nats.py){target="_blank"})
 * [**RPC**](getting_started/4_broker/5_rpc/): Фреймворк поддерживает RPC запросы поверх брокеров сообщений, что позволит выполнять длительные операции на удаленных сервисах асинхронно.
 * [**Скорость разработки**](getting_started/2_cli/): собственный *CLI* инструмент предоставляет отличный опыт разработки:
     * Полностью совместимый с любым фреймворком способ управлять окружением проекта
@@ -58,7 +59,7 @@
 Декларативные иснтрументы позволяют нам описывать **что мы хотим получить**, в то время как традиционные императивные инструменты заставляют нас писать **что мы хотим сделать**.

-К традиционным императивным библиотекам относятся [aio-pika](https://aio-pika.readthedocs.io/en/latest/){target="_blank"}, [pika](https://pika.readthedocs.io/en/stable/){target="_blank"}, [redis-py]("https://redis.readthedocs.io/en/stable/index.html"){target="_blank"}, [nats-py](https://github.com/nats-io/nats.py){target="_blank"} и подобные.
+К традиционным императивным библиотекам относятся [aio-pika](https://aio-pika.readthedocs.io/en/latest/){target="_blank"}, [pika](https://pika.readthedocs.io/en/stable/){target="_blank"}, [redis-py]("https://redis.readthedocs.io/en/stable/index.html"){target="_blank"}, [nats-py](https://github.com/nats-io/nats.py){target="_blank"}, [aiokafka](https://aiokafka.readthedocs.io/en/stable/){target="_blank"} и подобные.

 Например, это **Quickstart** из библиотеки *aio-pika*:

@@ -117,10 +118,10 @@ async def base_handler(body):
 |-------------------|:-------------------------------------------------------:|:--------------------:|
 | **RabbitMQ**      | :heavy_check_mark: **stable** :heavy_check_mark:         | :mag: planning :mag: |
 | **Redis**         | :heavy_check_mark: **stable** :heavy_check_mark:         | :mag: planning :mag: |
+| **Kafka**         | :warning: **beta** :warning:                             | :mag: planning :mag: |
 | **Nats**          | :warning: **beta** :warning:                             | :mag: planning :mag: |
 | **NatsJS**        | :hammer_and_wrench: **in progress** :hammer_and_wrench: | :mag: planning :mag: |
 | **MQTT**          | :mag: planning :mag:                                     | :mag: planning :mag: |
-| **Kafka**         | :mag: planning :mag:                                     | :mag: planning :mag: |
 | **Redis Streams** | :mag: planning :mag:                                     | :mag: planning :mag: |
 | **Pulsar**        | :mag: planning :mag:                                     | :mag: planning :mag: |
 | **SQS**           | :mag: planning :mag:                                     | :mag: planning :mag: |
\ No newline at end of file
diff --git a/docs/docs_src/contributing/adapter/parent.py b/docs/docs_src/contributing/adapter/parent.py
index 637c37d3..8a93d15e 100644
--- a/docs/docs_src/contributing/adapter/parent.py
+++ b/docs/docs_src/contributing/adapter/parent.py
@@ -37,7 +37,7 @@ async def publish(
         message: SendableMessage,
         *args: Any,
         callback: bool = False,
-        callback_timeout: float | None = None,
+        callback_timeout: Optional[float] = None,
         raise_timeout: bool = False,
         **kwargs: Any,
     ) -> Any:
diff --git a/propan/__about__.py b/propan/__about__.py
index 3e002756..c9501676 100644
--- a/propan/__about__.py
+++ b/propan/__about__.py
@@ -1,3 +1,13 @@
 """Simple and fast framework to create message brokers based microservices"""

-__version__ = "0.1.1.4"
+__version__ = "0.1.2.0"
+
+
+INSTALL_MESSAGE = (
+    "You should specify a broker to use!\n"
+    "Install it using one of the following commands:\n"
+    'pip install "propan[async-rabbit]"\n'
+    'pip install "propan[async-nats]"\n'
+    'pip install "propan[async-redis]"\n'
+    'pip install "propan[async-kafka]"\n'
+)
diff --git a/propan/__init__.py b/propan/__init__.py
index 3256c45f..d10656fc 100644
--- a/propan/__init__.py
+++ b/propan/__init__.py
@@ -1,4 +1,5 @@
 # Imports to use at __all__
+from propan.__about__ import INSTALL_MESSAGE
 from propan.cli.app import *  # noqa: F403
 from propan.log import *  # noqa: F403
 from propan.utils import *  # noqa: F403
@@ -15,18 +16,15 @@
 try:
     from propan.brokers.redis import RedisBroker
-except Exception as e:
-    print(e)
+except Exception:
     RedisBroker = None  # type: ignore

-assert any((RabbitBroker, NatsBroker, RedisBroker)), (
-    "You should specify using broker!\n"
-    "Install it using one of the following commands:\n"
-    'pip install "propan[async-rabbit]"\n'
-    'pip install "propan[async-nats]"\n'
-    'pip install "propan[async-redis]"\n'
-)
+try:
+    from propan.brokers.kafka import KafkaBroker
+except Exception:
+    KafkaBroker = None  # type: ignore

+assert any((RabbitBroker, NatsBroker, RedisBroker, KafkaBroker)), INSTALL_MESSAGE

 __all__ = (  # noqa: F405
     # app
     "NatsBroker",
     "RabbitBroker",
     "RedisBroker",
+    "KafkaBroker",
 )
diff --git a/propan/annotations.py b/propan/annotations.py
index 87845694..9a64a47e 100644
--- a/propan/annotations.py
+++ b/propan/annotations.py
@@ -2,6 +2,7 @@

 from typing_extensions import Annotated

+from propan.__about__ import INSTALL_MESSAGE
 from propan.cli.app import PropanApp
 from propan.utils.context import Context as ContextField
 from propan.utils.context import ContextRepo as CR
@@ -21,6 +22,7 @@
 except Exception:
     RabbitBroker = RabbitMessage = None  # type: ignore

+
 try:
     from nats.aio.msg import Msg

@@ -31,6 +33,7 @@
 except Exception:
     NatsBroker = NatsMessage = None  # type: ignore

+
 try:
     from propan.brokers.redis import RedisBroker as RedB

@@ -38,16 +41,23 @@
 except Exception:
     RedisBroker = None  # type: ignore

+
+try:
+    from aiokafka.structs import ConsumerRecord
+
+    from propan.brokers.kafka import KafkaBroker as KB
+
+    KafkaBroker = Annotated[KB, ContextField("broker")]
+    KafkaMessage = Annotated[ConsumerRecord, ContextField("message")]
+except Exception:
+    KafkaBroker = KafkaMessage = None  # type: ignore
+
+
 assert any(
     (
         all((RabbitBroker, RabbitMessage)),
         all((NatsBroker, NatsMessage)),
         RedisBroker,
+        all((KafkaBroker, KafkaMessage)),
     )
-), (
-    "You should specify using broker!\n"
-    "Install it using one of the following commands:\n"
-    'pip install "propan[async-rabbit]"\n'
-    'pip install "propan[async-nats]"\n'
-    'pip install "propan[async-redis]"\n'
-)
+), INSTALL_MESSAGE
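A sketch of how these context annotations are meant to be consumed inside a handler (illustrative; the topic name is arbitrary):

```python
from propan import KafkaBroker as KB
from propan.annotations import KafkaBroker, KafkaMessage

broker = KB("localhost:9092")


@broker.handle("test")
async def handler(body: str, broker: KafkaBroker, message: KafkaMessage):
    # `message` resolves to the raw aiokafka ConsumerRecord from the context
    print(message.topic, message.offset)
```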
diff --git a/propan/brokers/kafka/__init__.py b/propan/brokers/kafka/__init__.py
new file mode 100644
index 00000000..03601b41
--- /dev/null
+++ b/propan/brokers/kafka/__init__.py
@@ -0,0 +1,3 @@
+from propan.brokers.kafka.kafka_broker import KafkaBroker
+
+__all__ = ("KafkaBroker",)
diff --git a/propan/brokers/kafka/kafka_broker.py b/propan/brokers/kafka/kafka_broker.py
new file mode 100644
index 00000000..266d751d
--- /dev/null
+++ b/propan/brokers/kafka/kafka_broker.py
@@ -0,0 +1,204 @@
+import asyncio
+from functools import partial, wraps
+from typing import Any, Callable, Dict, List, NoReturn, Optional, Sequence, Tuple, Union
+
+from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
+from aiokafka.structs import ConsumerRecord
+from typing_extensions import TypeVar
+
+from propan.__about__ import __version__
+from propan.brokers.kafka.schemas import Handler
+from propan.brokers.model.broker_usecase import BrokerUsecase
+from propan.brokers.model.schemas import PropanMessage
+from propan.brokers.push_back_watcher import BaseWatcher
+from propan.types import (
+    AnyCallable,
+    AnyDict,
+    DecoratedCallable,
+    SendableMessage,
+    Wrapper,
+)
+from propan.utils.context import context
+
+T = TypeVar("T")
+
+
+class KafkaBroker(BrokerUsecase):
+    _publisher: Optional[AIOKafkaProducer]
+    _connection: Callable[[Tuple[str, ...]], AIOKafkaConsumer]
+    __max_topic_len: int
+    handlers: List[Handler]
+
+    def __init__(
+        self,
+        bootstrap_servers: Union[str, List[str]] = "localhost",
+        *,
+        log_fmt: Optional[str] = None,
+        **kwargs: AnyDict,
+    ) -> None:
+        super().__init__(bootstrap_servers, log_fmt=log_fmt, **kwargs)
+        self.__max_topic_len = 4
+        self._publisher = None
+
+    async def _connect(
+        self,
+        bootstrap_servers: Union[str, List[str]] = "localhost",
+        **kwargs: Any,
+    ) -> AIOKafkaConsumer:
+        kwargs["client_id"] = kwargs.get("client_id", "propan-" + __version__)
+        kwargs["bootstrap_servers"] = bootstrap_servers
+
+        # The producer is started eagerly; consumers are created lazily per handler
+        producer = AIOKafkaProducer(**kwargs)
+        context.set_global("producer", producer)
+        await producer.start()
+        self._publisher = producer
+
+        # Pass only consumer-compatible connection options through to AIOKafkaConsumer
+        consumer_kwargs = {
+            k: v
+            for k, v in kwargs.items()
+            if k
+            in {
+                "bootstrap_servers",
+                "loop",
+                "client_id",
+                "request_timeout_ms",
+                "retry_backoff_ms",
+                "metadata_max_age_ms",
+                "security_protocol",
+                "api_version",
+                "connections_max_idle_ms",
+                "sasl_mechanism",
+                "sasl_plain_password",
+                "sasl_plain_username",
+                "sasl_kerberos_service_name",
+                "sasl_kerberos_domain_name",
+                "sasl_oauth_token_provider",
+            }
+            and v
+        }
+        return partial(AIOKafkaConsumer, **consumer_kwargs)
+
+    async def close(self) -> None:
+        for handler in self.handlers:
+            if handler.task is not None:
+                handler.task.cancel()
+
+            if handler.consumer is not None:
+                await handler.consumer.stop()
+
+        if self._publisher is not None:
+            await self._publisher.stop()
+            self._publisher = None
+
+    def handle(
+        self,
+        *topics: str,
+        **kwargs: AnyDict,
+    ) -> Wrapper:
+        def wrapper(func: AnyCallable) -> DecoratedCallable:
+            for t in topics:
+                self.__max_topic_len = max((self.__max_topic_len, len(t)))
+
+            func = self._wrap_handler(func)
+            handler = Handler(
+                callback=func,
+                topics=topics,
+                consumer_kwargs=kwargs,
+            )
+            self.handlers.append(handler)
+
+            return func
+
+        return wrapper
+
+    async def start(self) -> None:
+        await super().start()
+
+        for handler in self.handlers:  # pragma: no branch
+            c = self._get_log_context(None, handler.topics)
+            self._log(f"`{handler.callback.__name__}` waiting for messages", extra=c)
+
+            consumer = self._connection(*handler.topics, **handler.consumer_kwargs)
+            await consumer.start()
+            handler.consumer = consumer
+            handler.task = asyncio.create_task(_consume(handler))
+
+    @staticmethod
+    async def _parse_message(message: ConsumerRecord) -> PropanMessage:
+        headers = {i: j.decode() for i, j in message.headers}
+        return PropanMessage(
+            body=message.value,
+            raw_message=message,
+            message_id=f"{message.offset}-{message.timestamp}",
+            reply_to=headers.get("reply_to", ""),
+            content_type=headers.get("content-type"),
+            headers=headers,
+        )
+
+    def _process_message(
+        self, func: Callable[[PropanMessage], T], watcher: Optional[BaseWatcher]
+    ) -> Callable[[PropanMessage], T]:
+        # Offsets are committed by aiokafka itself, so the ack `watcher` is unused here
+        @wraps(func)
+        async def wrapper(message: PropanMessage) -> T:
+            return await func(message)
+
+        return wrapper
+
+    async def publish(
+        self,
+        message: SendableMessage,
+        topic: str,
+        key: Optional[bytes] = None,
+        partition: Optional[int] = None,
+        timestamp_ms: Optional[int] = None,
+        headers: Optional[Dict[str, str]] = None,
+        *,
+        reply_to: str = "",
+        callback: bool = False,
+        callback_timeout: Optional[float] = None,
+        raise_timeout: bool = False,
+    ) -> Any:
+        message, content_type = super()._encode_message(message)
+
+        headers = {
+            "content-type": content_type or "",
+            "reply_to": reply_to,
+            **(headers or {}),
+        }
+
+        # `callback`, `callback_timeout` and `raise_timeout` are accepted for
+        # interface compatibility: RPC over Kafka is not supported yet
+        return await self._publisher.send(
+            topic=topic,
+            value=message,
+            key=key,
+            partition=partition,
+            timestamp_ms=timestamp_ms,
+            headers=[(i, j.encode()) for i, j in headers.items()],
+        )
+
+    @property
+    def fmt(self) -> str:
+        return self._fmt or (
+            "%(asctime)s %(levelname)s - "
+            f"%(topic)-{self.__max_topic_len}s | "
+            "%(message_id)-10s "
+            "- %(message)s"
+        )
+
+    def _get_log_context(
+        self,
+        message: PropanMessage,
+        topics: Sequence[str] = (),
+    ) -> Dict[str, Any]:
+        if topics:
+            topic = ", ".join(topics)
+        else:
+            topic = message.raw_message.topic
+
+        return {
+            "topic": topic,
+            **super()._get_log_context(message),
+        }
+
+
+async def _consume(handler: Handler) -> NoReturn:
+    # Each handler owns a background task that drains its consumer
+    async for msg in handler.consumer:
+        await handler.callback(msg)
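A hedged publishing sketch against the `publish` signature above (assumes a reachable broker; the topic, key and header values are illustrative):

```python
import asyncio

from propan import KafkaBroker


async def main() -> None:
    broker = KafkaBroker("localhost:9092")
    await broker.connect()
    await broker.publish(
        {"user": 1},
        topic="user-events",
        key=b"user-1",  # records with the same key land in the same partition
        headers={"source": "api"},
    )
    await broker.close()


asyncio.run(main())
```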
diff --git a/propan/brokers/kafka/kafka_broker.pyi b/propan/brokers/kafka/kafka_broker.pyi
new file mode 100644
index 00000000..04238b11
--- /dev/null
+++ b/propan/brokers/kafka/kafka_broker.pyi
@@ -0,0 +1,191 @@
+import logging
+from asyncio import AbstractEventLoop
+from ssl import SSLContext
+from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
+
+from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
+from aiokafka.abc import AbstractTokenProvider
+from aiokafka.producer.producer import _missing
+from aiokafka.structs import ConsumerRecord
+from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor
+from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
+from kafka.partitioner.default import DefaultPartitioner
+from typing_extensions import Literal, TypeVar
+
+from propan.__about__ import __version__
+from propan.brokers.kafka.schemas import Handler
+from propan.brokers.model.broker_usecase import BrokerUsecase
+from propan.brokers.model.schemas import PropanMessage
+from propan.brokers.push_back_watcher import BaseWatcher
+from propan.log import access_logger
+from propan.types import SendableMessage, Wrapper
+
+T = TypeVar("T")
+Partition = TypeVar("Partition")
+
+class KafkaBroker(BrokerUsecase):
+    _publisher: Optional[AIOKafkaProducer]
+    _connection: Callable[[Tuple[str, ...]], AIOKafkaConsumer]
+    __max_topic_len: int
+    handlers: List[Handler]
+
+    def __init__(
+        self,
+        bootstrap_servers: Union[str, List[str]] = "localhost",
+        *,
+        # both
+        loop: Optional[AbstractEventLoop] = None,
+        client_id: str = "propan-" + __version__,
+        request_timeout_ms: int = 40 * 1000,
+        retry_backoff_ms: int = 100,
+        metadata_max_age_ms: int = 5 * 60 * 1000,
+        security_protocol: Literal[
+            "SSL",
+            "PLAINTEXT",
+        ] = "PLAINTEXT",
+        api_version: str = "auto",
+        connections_max_idle_ms: int = 540000,
+        sasl_mechanism: Literal[
+            "PLAIN",
+            "GSSAPI",
+            "SCRAM-SHA-256",
+            "SCRAM-SHA-512",
+            "OAUTHBEARER",
+        ] = "PLAIN",
+        sasl_plain_password: Optional[str] = None,
+        sasl_plain_username: Optional[str] = None,
+        sasl_kerberos_service_name: str = "kafka",
+        sasl_kerberos_domain_name: Optional[str] = None,
+        sasl_oauth_token_provider: Optional[AbstractTokenProvider] = None,
+        # publisher
+        acks: Union[Literal[0, 1, -1, "all"], object] = _missing,
+        key_serializer: Optional[Callable[[Any], bytes]] = None,
+        value_serializer: Optional[Callable[[Any], bytes]] = None,
+        compression_type: Optional[Literal["gzip", "snappy", "lz4", "zstd"]] = None,
+        max_batch_size: int = 16384,
+        partitioner: Callable[
+            [bytes, List[Partition], List[Partition]],
+            Partition,
+        ] = DefaultPartitioner(),
+        max_request_size: int = 1048576,
+        linger_ms: int = 0,
+        send_backoff_ms: int = 100,
+        ssl_context: Optional[SSLContext] = None,
+        enable_idempotence: bool = False,
+        transactional_id: Optional[str] = None,
+        transaction_timeout_ms: int = 60000,
+        # broker
+        logger: Optional[logging.Logger] = access_logger,
+        log_level: int = logging.INFO,
+        log_fmt: Optional[str] = None,
+        apply_types: bool = True,
+    ) -> None: ...
+    async def connect(
+        self,
+        bootstrap_servers: Union[str, List[str]] = "localhost",
+        *,
+        # both
+        loop: Optional[AbstractEventLoop] = None,
+        client_id: str = "propan-" + __version__,
+        request_timeout_ms: int = 40 * 1000,
+        retry_backoff_ms: int = 100,
+        metadata_max_age_ms: int = 5 * 60 * 1000,
+        security_protocol: Literal[
+            "SSL",
+            "PLAINTEXT",
+        ] = "PLAINTEXT",
+        api_version: str = "auto",
+        connections_max_idle_ms: int = 540000,
+        sasl_mechanism: Literal[
+            "PLAIN",
+            "GSSAPI",
+            "SCRAM-SHA-256",
+            "SCRAM-SHA-512",
+            "OAUTHBEARER",
+        ] = "PLAIN",
+        sasl_plain_password: Optional[str] = None,
+        sasl_plain_username: Optional[str] = None,
+        sasl_kerberos_service_name: str = "kafka",
+        sasl_kerberos_domain_name: Optional[str] = None,
+        sasl_oauth_token_provider: Optional[AbstractTokenProvider] = None,
+        # publisher
+        acks: Union[Literal[0, 1, -1, "all"], object] = _missing,
+        key_serializer: Optional[Callable[[Any], bytes]] = None,
+        value_serializer: Optional[Callable[[Any], bytes]] = None,
+        compression_type: Optional[Literal["gzip", "snappy", "lz4", "zstd"]] = None,
+        max_batch_size: int = 16384,
+        partitioner: Callable[
+            [bytes, List[Partition], List[Partition]],
+            Partition,
+        ] = DefaultPartitioner(),
+        max_request_size: int = 1048576,
+        linger_ms: int = 0,
+        send_backoff_ms: int = 100,
+        ssl_context: Optional[SSLContext] = None,
+        enable_idempotence: bool = False,
+        transactional_id: Optional[str] = None,
+        transaction_timeout_ms: int = 60000,
+    ) -> AIOKafkaConsumer: ...
+    async def _connect(self, *args: Any, **kwargs: Any) -> AIOKafkaConsumer: ...
+    async def close(self) -> None: ...
+    def handle(  # type: ignore[override]
+        self,
+        *topics: str,
+        group_id: Optional[str] = None,
+        key_deserializer: Optional[Callable[[bytes], Any]] = None,
+        value_deserializer: Optional[Callable[[bytes], Any]] = None,
+        fetch_max_wait_ms: int = 500,
+        fetch_max_bytes: int = 52428800,
+        fetch_min_bytes: int = 1,
+        max_partition_fetch_bytes: int = 1 * 1024 * 1024,
+        auto_offset_reset: Literal[
+            "latest",
+            "earliest",
+            "none",
+        ] = "latest",
+        enable_auto_commit: bool = True,
+        auto_commit_interval_ms: int = 5000,
+        check_crcs: bool = True,
+        partition_assignment_strategy: Sequence[AbstractPartitionAssignor] = (
+            RoundRobinPartitionAssignor,
+        ),
+        max_poll_interval_ms: int = 300000,
+        rebalance_timeout_ms: Optional[int] = None,
+        session_timeout_ms: int = 10000,
+        heartbeat_interval_ms: int = 3000,
+        consumer_timeout_ms: int = 200,
+        max_poll_records: Optional[int] = None,
+        exclude_internal_topics: bool = True,
+        isolation_level: Literal[
+            "read_uncommitted",
+            "read_committed",
+        ] = "read_uncommitted",
+        retry: Union[bool, int] = False,
+    ) -> Wrapper: ...
+    async def start(self) -> None: ...
+    @staticmethod
+    async def _parse_message(message: ConsumerRecord) -> PropanMessage: ...
+    def _process_message(
+        self, func: Callable[[PropanMessage], T], watcher: Optional[BaseWatcher]
+    ) -> Callable[[PropanMessage], T]: ...
+    async def publish(  # type: ignore[override]
+        self,
+        message: SendableMessage,
+        topic: str,
+        key: Optional[bytes] = None,
+        partition: Optional[int] = None,
+        timestamp_ms: Optional[int] = None,
+        headers: Optional[Dict[str, str]] = None,
+        *,
+        reply_to: str = "",
+        callback: bool = False,
+        callback_timeout: Optional[float] = None,
+        raise_timeout: bool = False,
+    ) -> Any: ...
+    @property
+    def fmt(self) -> str: ...
+    def _get_log_context(  # type: ignore[override]
+        self,
+        message: PropanMessage,
+        topics: Sequence[str] = (),
+    ) -> Dict[str, Any]: ...
diff --git a/propan/brokers/kafka/schemas.py b/propan/brokers/kafka/schemas.py
new file mode 100644
index 00000000..3eb7f0fa
--- /dev/null
+++ b/propan/brokers/kafka/schemas.py
@@ -0,0 +1,17 @@
+import asyncio
+from dataclasses import dataclass, field
+from typing import Any, List, Optional
+
+from aiokafka import AIOKafkaConsumer
+
+from propan.brokers.model.schemas import BaseHandler
+from propan.types import AnyDict
+
+
+@dataclass
+class Handler(BaseHandler):
+    topics: List[str]
+
+    consumer: Optional[AIOKafkaConsumer] = None
+    task: Optional["asyncio.Task[Any]"] = None
+    consumer_kwargs: AnyDict = field(default_factory=dict)
diff --git a/propan/cli/startproject/async_app/app.py b/propan/cli/startproject/async_app/app.py
index 83bc89b9..ece28c5c 100644
--- a/propan/cli/startproject/async_app/app.py
+++ b/propan/cli/startproject/async_app/app.py
@@ -2,6 +2,7 @@

 import typer

+from propan.cli.startproject.async_app.kafka import create_kafka
 from propan.cli.startproject.async_app.nats import create_nats
 from propan.cli.startproject.async_app.rabbit import create_rabbit
 from propan.cli.startproject.async_app.redis import create_redis
@@ -28,3 +29,10 @@ def nats(appname: str) -> None:
     """Create an asyncronous Nats Propan project at [APPNAME] directory"""
     project = create_nats(Path.cwd() / appname)
     typer.echo(f"Create an asyncronous Nats Propan project at: {project}")
+
+
+@async_app.command()
+def kafka(appname: str) -> None:
+    """Create an asynchronous Kafka Propan project at [APPNAME] directory"""
+    project = create_kafka(Path.cwd() / appname)
+    typer.echo(f"Created an asynchronous Kafka Propan project at: {project}")
diff --git a/propan/cli/startproject/async_app/core.py b/propan/cli/startproject/async_app/core.py
index 44c9f0a7..08d18c5f 100644
--- a/propan/cli/startproject/async_app/core.py
+++ b/propan/cli/startproject/async_app/core.py
@@ -29,7 +29,7 @@ def create_app_file(app_dir: Path, broker_annotation: str) -> None:
         "    app.logger.setLevel(logger_level)",
         "    broker.logger.setLevel(logger_level)",
         "",
-        "    await broker.connect(url=settings.broker.url)",
+        "    await broker.connect(settings.broker.url)",
         "",
         "",
         'if __name__ == "__main__":',
diff --git a/propan/cli/startproject/async_app/kafka.py b/propan/cli/startproject/async_app/kafka.py
new file mode 100644
index 00000000..d7faccd3
--- /dev/null
+++ b/propan/cli/startproject/async_app/kafka.py
@@ -0,0 +1,92 @@
+from pathlib import Path
+
+from propan.cli.startproject.async_app.core import create_app_file
+from propan.cli.startproject.core import (
+    create_apps_dir,
+    create_config_dir,
+    create_core_dir,
+    create_env,
+    create_project_dir,
+)
+from propan.cli.startproject.utils import touch_dir, write_file
+
+
+def create_kafka(dir: Path) -> Path:
+    project_dir = _create_project_dir(dir)
+    app_dir = _create_app_dir(project_dir / "app")
+    _create_config_dir(app_dir / "config")
+    _create_core_dir(app_dir / "core")
+    _create_apps_dir(app_dir / "apps")
+    return project_dir
+
+
+def _create_project_dir(dirname: Path) -> Path:
+    project_dir = create_project_dir(dirname, "propan[async-kafka]")
+
+    write_file(
+        project_dir / "docker-compose.yaml",
+        'version: "3"',
+        "",
+        "services:",
+        "  kafka:",
+        "    image: bitnami/kafka",
+        "    ports:",
+        "      - 9092:9092",
+        "    environment:",
+        "      - KAFKA_ENABLE_KRAFT=yes",
+        "      - KAFKA_CFG_NODE_ID=1",
KAFKA_CFG_PROCESS_ROLES=broker,controller", + " - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER", + " - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093", + " - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT", + " - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092", + " - KAFKA_BROKER_ID=1", + " - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka:9093", + " - ALLOW_PLAINTEXT_LISTENER=yes", + "", + " app:", + " build: .", + " environment:", + " APP_BROKER__URL: kafka:9092", + " volumes:", + " - ./app:/home/user/app:ro", + " depends_on:", + " - kafka", + ) + + return project_dir + + +def _create_app_dir(app: Path) -> Path: + app_dir = touch_dir(app) + create_app_file(app_dir, "KafkaBroker") + return app_dir + + +def _create_config_dir(config: Path) -> Path: + config_dir = create_config_dir(config) + create_env(config_dir, "localhost:9092") + return config_dir + + +def _create_core_dir(core: Path) -> Path: + core_dir = create_core_dir(core, "KafkaBroker") + return core_dir + + +def _create_apps_dir(apps: Path) -> Path: + apps_dir = create_apps_dir(apps) + + write_file( + apps_dir / "handlers.py", + "from propan.annotations import Logger", + "", + "from core import broker", + "", + "", + "@broker.handle('test')", + "async def base_handler(body: dict, logger: Logger):", + " logger.info(body)", + ) + + return apps_dir diff --git a/propan/fastapi/__init__.py b/propan/fastapi/__init__.py index 6ef56fef..25cd75a6 100644 --- a/propan/fastapi/__init__.py +++ b/propan/fastapi/__init__.py @@ -1,3 +1,5 @@ +from propan.__about__ import INSTALL_MESSAGE + try: from propan.fastapi.rabbit import RabbitRouter except Exception: @@ -8,4 +10,11 @@ except Exception: RedisRouter = None # type: ignore -__all__ = ("RabbitRouter", "RedisRouter") +try: + from propan.fastapi.kafka import KafkaRouter +except Exception: + KafkaRouter = None # type: ignore + +assert any((RabbitRouter, RedisRouter, KafkaRouter)), INSTALL_MESSAGE + +__all__ = ("RabbitRouter", "RedisRouter", "KafkaRouter") diff --git a/propan/fastapi/kafka/__init__.py b/propan/fastapi/kafka/__init__.py new file mode 100644 index 00000000..2c104085 --- /dev/null +++ b/propan/fastapi/kafka/__init__.py @@ -0,0 +1,3 @@ +from propan.fastapi.kafka.router import KafkaRouter + +__all__ = ("KafkaRouter",) diff --git a/propan/fastapi/kafka/router.py b/propan/fastapi/kafka/router.py new file mode 100644 index 00000000..6752ace3 --- /dev/null +++ b/propan/fastapi/kafka/router.py @@ -0,0 +1,7 @@ +from propan import KafkaBroker +from propan.fastapi.core.router import PropanRouter + + +class KafkaRouter(PropanRouter): + broker_class = KafkaBroker + broker: KafkaBroker diff --git a/propan/fastapi/kafka/router.pyi b/propan/fastapi/kafka/router.pyi new file mode 100644 index 00000000..60990f35 --- /dev/null +++ b/propan/fastapi/kafka/router.pyi @@ -0,0 +1,170 @@ +import logging +from asyncio import AbstractEventLoop +from enum import Enum +from ssl import SSLContext +from typing import Any, Callable, Dict, List, Optional, Sequence, Type, Union + +from aiokafka.abc import AbstractTokenProvider +from aiokafka.producer.producer import _missing +from fastapi import params +from fastapi.datastructures import Default +from fastapi.routing import APIRoute +from fastapi.utils import generate_unique_id +from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor +from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor +from kafka.partitioner.default import DefaultPartitioner +from starlette import 
diff --git a/propan/fastapi/kafka/router.pyi b/propan/fastapi/kafka/router.pyi
new file mode 100644
index 00000000..60990f35
--- /dev/null
+++ b/propan/fastapi/kafka/router.pyi
@@ -0,0 +1,170 @@
+import logging
+from asyncio import AbstractEventLoop
+from enum import Enum
+from ssl import SSLContext
+from typing import Any, Callable, Dict, List, Optional, Sequence, Type, Union
+
+from aiokafka.abc import AbstractTokenProvider
+from aiokafka.producer.producer import _missing
+from fastapi import params
+from fastapi.datastructures import Default
+from fastapi.routing import APIRoute
+from fastapi.utils import generate_unique_id
+from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor
+from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
+from kafka.partitioner.default import DefaultPartitioner
+from starlette import routing
+from starlette.responses import JSONResponse, Response
+from starlette.types import ASGIApp
+from typing_extensions import Literal, TypeVar
+
+from propan.__about__ import __version__
+from propan.fastapi.core import PropanRouter
+from propan.log import access_logger
+from propan.types import AnyCallable
+
+Partition = TypeVar("Partition")
+
+class KafkaRouter(PropanRouter):
+    def __init__(
+        self,
+        bootstrap_servers: Union[str, List[str]] = "localhost",
+        *,
+        # both
+        loop: Optional[AbstractEventLoop] = None,
+        client_id: str = "propan-" + __version__,
+        request_timeout_ms: int = 40 * 1000,
+        retry_backoff_ms: int = 100,
+        metadata_max_age_ms: int = 5 * 60 * 1000,
+        security_protocol: Literal[
+            "SSL",
+            "PLAINTEXT",
+        ] = "PLAINTEXT",
+        api_version: str = "auto",
+        connections_max_idle_ms: int = 540000,
+        sasl_mechanism: Literal[
+            "PLAIN",
+            "GSSAPI",
+            "SCRAM-SHA-256",
+            "SCRAM-SHA-512",
+            "OAUTHBEARER",
+        ] = "PLAIN",
+        sasl_plain_password: Optional[str] = None,
+        sasl_plain_username: Optional[str] = None,
+        sasl_kerberos_service_name: str = "kafka",
+        sasl_kerberos_domain_name: Optional[str] = None,
+        sasl_oauth_token_provider: Optional[AbstractTokenProvider] = None,
+        # publisher
+        acks: Union[Literal[0, 1, -1, "all"], object] = _missing,
+        key_serializer: Optional[Callable[[Any], bytes]] = None,
+        value_serializer: Optional[Callable[[Any], bytes]] = None,
+        compression_type: Optional[Literal["gzip", "snappy", "lz4", "zstd"]] = None,
+        max_batch_size: int = 16384,
+        partitioner: Callable[
+            [bytes, List[Partition], List[Partition]],
+            Partition,
+        ] = DefaultPartitioner(),
+        max_request_size: int = 1048576,
+        linger_ms: int = 0,
+        send_backoff_ms: int = 100,
+        ssl_context: Optional[SSLContext] = None,
+        enable_idempotence: bool = False,
+        transactional_id: Optional[str] = None,
+        transaction_timeout_ms: int = 60000,
+        # FastAPI kwargs
+        prefix: str = "",
+        tags: Optional[List[Union[str, Enum]]] = None,
+        dependencies: Optional[Sequence[params.Depends]] = None,
+        default_response_class: Type[Response] = Default(JSONResponse),
+        responses: Optional[Dict[Union[int, str], Dict[str, Any]]] = None,
+        callbacks: Optional[List[routing.BaseRoute]] = None,
+        routes: Optional[List[routing.BaseRoute]] = None,
+        redirect_slashes: bool = True,
+        default: Optional[ASGIApp] = None,
+        dependency_overrides_provider: Optional[Any] = None,
+        route_class: Type[APIRoute] = APIRoute,
+        on_startup: Optional[Sequence[Callable[[], Any]]] = None,
+        on_shutdown: Optional[Sequence[Callable[[], Any]]] = None,
+        deprecated: Optional[bool] = None,
+        include_in_schema: bool = True,
+        generate_unique_id_function: Callable[[APIRoute], str] = Default(
+            generate_unique_id
+        ),
+        # Broker kwargs
+        logger: Optional[logging.Logger] = access_logger,
+        log_level: int = logging.INFO,
+        log_fmt: Optional[str] = None,
+        apply_types: bool = True,
+    ) -> None:
+        pass
+    def add_api_mq_route(  # type: ignore[override]
+        self,
+        *topics: str,
+        endpoint: AnyCallable,
+        group_id: Optional[str] = None,
+        key_deserializer: Optional[Callable[[bytes], Any]] = None,
+        value_deserializer: Optional[Callable[[bytes], Any]] = None,
+        fetch_max_wait_ms: int = 500,
+        fetch_max_bytes: int = 52428800,
+        fetch_min_bytes: int = 1,
+        max_partition_fetch_bytes: int = 1 * 1024 * 1024,
+        auto_offset_reset: Literal[
+            "latest",
+            "earliest",
+            "none",
+        ] = "latest",
+        enable_auto_commit: bool = True,
+        auto_commit_interval_ms: int = 5000,
+        check_crcs: bool = True,
+        partition_assignment_strategy: Sequence[AbstractPartitionAssignor] = (
+            RoundRobinPartitionAssignor,
+        ),
+        max_poll_interval_ms: int = 300000,
+        rebalance_timeout_ms: Optional[int] = None,
+        session_timeout_ms: int = 10000,
+        heartbeat_interval_ms: int = 3000,
+        consumer_timeout_ms: int = 200,
+        max_poll_records: Optional[int] = None,
+        exclude_internal_topics: bool = True,
+        isolation_level: Literal[
+            "read_uncommitted",
+            "read_committed",
+        ] = "read_uncommitted",
+        retry: Union[bool, int] = False,
+    ) -> None:
+        pass
+    def event(  # type: ignore[override]
+        self,
+        *topics: str,
+        group_id: Optional[str] = None,
+        key_deserializer: Optional[Callable[[bytes], Any]] = None,
+        value_deserializer: Optional[Callable[[bytes], Any]] = None,
+        fetch_max_wait_ms: int = 500,
+        fetch_max_bytes: int = 52428800,
+        fetch_min_bytes: int = 1,
+        max_partition_fetch_bytes: int = 1 * 1024 * 1024,
+        auto_offset_reset: Literal[
+            "latest",
+            "earliest",
+            "none",
+        ] = "latest",
+        enable_auto_commit: bool = True,
+        auto_commit_interval_ms: int = 5000,
+        check_crcs: bool = True,
+        partition_assignment_strategy: Sequence[AbstractPartitionAssignor] = (
+            RoundRobinPartitionAssignor,
+        ),
+        max_poll_interval_ms: int = 300000,
+        rebalance_timeout_ms: Optional[int] = None,
+        session_timeout_ms: int = 10000,
+        heartbeat_interval_ms: int = 3000,
+        consumer_timeout_ms: int = 200,
+        max_poll_records: Optional[int] = None,
+        exclude_internal_topics: bool = True,
+        isolation_level: Literal[
+            "read_uncommitted",
+            "read_committed",
+        ] = "read_uncommitted",
+        retry: Union[bool, int] = False,
+    ) -> None:
+        pass
diff --git a/propan/test/__init__.py b/propan/test/__init__.py
index 33ba3aa7..a20fcb00 100644
--- a/propan/test/__init__.py
+++ b/propan/test/__init__.py
@@ -1,3 +1,5 @@
+from propan.__about__ import INSTALL_MESSAGE
+
 try:
     from propan.test.rabbit import TestRabbitBroker
 except Exception:
@@ -8,7 +10,15 @@
 except Exception:
     TestRedisBroker = None  # type: ignore

+try:
+    from propan.test.kafka import TestKafkaBroker
+except Exception:
+    TestKafkaBroker = None  # type: ignore
+
+assert any((TestRabbitBroker, TestRedisBroker, TestKafkaBroker)), INSTALL_MESSAGE
+
 __all__ = (
     "TestRabbitBroker",
     "TestRedisBroker",
+    "TestKafkaBroker",
 )
diff --git a/propan/test/kafka.py b/propan/test/kafka.py
new file mode 100644
index 00000000..3cb28eee
--- /dev/null
+++ b/propan/test/kafka.py
@@ -0,0 +1,93 @@
+import sys
+from datetime import datetime
+from types import MethodType
+from typing import Any, Dict, Optional
+
+from aiokafka.structs import ConsumerRecord
+
+if sys.version_info < (3, 8):
+    from asyncmock import AsyncMock
+else:
+    from unittest.mock import AsyncMock
+
+from propan import KafkaBroker
+from propan.test.utils import call_handler
+from propan.types import SendableMessage
+
+__all__ = (
+    "build_message",
+    "TestKafkaBroker",
+)
+
+
+def build_message(
+    message: SendableMessage,
+    topic: str,
+    partition: Optional[int] = None,
+    timestamp_ms: Optional[int] = None,
+    key: Optional[bytes] = None,
+    headers: Optional[Dict[str, str]] = None,
+    *,
+    reply_to: str = "",
+) -> ConsumerRecord:
+    msg, content_type = KafkaBroker._encode_message(message)
+    k = key or b""
+    headers = {
+        "content-type": content_type or "",
+        "reply_to": reply_to,
+        **(headers or {}),
+    }
+
+    return ConsumerRecord(
+        value=msg,
+        topic=topic,
+        partition=partition or 0,
+        timestamp=timestamp_ms or int(datetime.now().timestamp() * 1000),
+        timestamp_type=0,
+        key=k,
+        serialized_key_size=len(k),
+        serialized_value_size=len(msg),
+        checksum=sum(msg),
+        offset=0,
+        headers=[(i, j.encode()) for i, j in headers.items()],
+    )
+
+
+async def publish(
+    self: KafkaBroker,
+    message: SendableMessage,
+    topic: str,
+    key: Optional[bytes] = None,
+    partition: Optional[int] = None,
+    timestamp_ms: Optional[int] = None,
+    headers: Optional[Dict[str, str]] = None,
+    *,
+    reply_to: str = "",
+    callback: bool = False,
+    callback_timeout: Optional[float] = None,
+    raise_timeout: bool = False,
+) -> Any:
+    incoming = build_message(
+        message=message,
+        topic=topic,
+        key=key,
+        partition=partition,
+        timestamp_ms=timestamp_ms,
+        reply_to=reply_to,
+        headers=headers,
+    )
+
+    for handler in self.handlers:  # pragma: no branch
+        if topic in handler.topics:  # pragma: no branch
+            r = await call_handler(
+                handler, incoming, callback, callback_timeout, raise_timeout
+            )
+            if callback:  # pragma: no branch
+                return r
+
+
+def TestKafkaBroker(broker: KafkaBroker) -> KafkaBroker:
+    # Patch the real broker: no network is used, `publish` calls handlers directly
+    broker.connect = AsyncMock()  # type: ignore
+    broker.start = AsyncMock()  # type: ignore
+    broker.publish = MethodType(publish, broker)  # type: ignore
+    return broker
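A sketch of a handler test with this client; no running Kafka is needed (names are illustrative):

```python
import pytest

from propan import KafkaBroker
from propan.test.kafka import TestKafkaBroker


@pytest.mark.asyncio
async def test_echo_handler():
    broker = TestKafkaBroker(KafkaBroker())

    @broker.handle("test")
    async def handler(m: str):
        return m

    async with broker:
        await broker.start()
        assert await broker.publish("hi", "test", callback=True) == "hi"
```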
diff --git a/pyproject.toml b/pyproject.toml
index f2a0cb14..df3ee035 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -75,10 +75,15 @@ async-redis = [
     "redis>=4.2.0rc1"
 ]

+async-kafka = [
+    "aiokafka>=0.8"
+]
+
 test = [
     "propan[async-rabbit]",
     "propan[async-nats]",
     "propan[async-redis]",
+    "propan[async-kafka]",

     "coverage[toml]>=7.2",
     "pytest>=7",
@@ -199,6 +204,7 @@ extend-immutable-calls = [
     "propan.utils.context.Depends", "propan.utils.context.Context",
     "typer.Argument", "typer.Option",
     "fastapi.Depends", "fastapi.datastructures.Default",
+    "kafka.partitioner.default.DefaultPartitioner",
 ]

 [tool.pytest.ini_options]
 markers = [
     "rabbit",
     "nats",
     "redis",
+    "kafka",
     "all",
 ]
diff --git a/tests/brokers/base/__init__.py b/tests/brokers/base/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/brokers/kafka/__init__.py b/tests/brokers/kafka/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/brokers/kafka/conftest.py b/tests/brokers/kafka/conftest.py
new file mode 100644
index 00000000..8d4b03d5
--- /dev/null
+++ b/tests/brokers/kafka/conftest.py
@@ -0,0 +1,36 @@
+from uuid import uuid4
+
+import pytest
+import pytest_asyncio
+from pydantic import BaseSettings
+
+from propan import KafkaBroker
+from propan.test.kafka import TestKafkaBroker
+
+
+class Settings(BaseSettings):
+    url = "localhost:9092"
+
+
+@pytest.fixture
+def topic():
+    return str(uuid4())
+
+
+@pytest.fixture(scope="session")
+def settings():
+    return Settings()
+
+
+@pytest_asyncio.fixture
+@pytest.mark.kafka
+async def broker(settings):
+    broker = KafkaBroker(settings.url, apply_types=False)
+    yield broker
+    await broker.close()
+
+
+@pytest_asyncio.fixture
+async def test_broker():
+    broker = KafkaBroker()
+    yield TestKafkaBroker(broker)
diff --git a/tests/brokers/kafka/test_connect.py b/tests/brokers/kafka/test_connect.py
new file mode 100644
index 00000000..3f981fdd
--- /dev/null
+++ b/tests/brokers/kafka/test_connect.py
@@ -0,0 +1,19 @@
+import pytest
+
+from propan import KafkaBroker
+
+
+@pytest.mark.asyncio
+@pytest.mark.kafka
+async def test_init_connect_by_url(settings):
+    broker = KafkaBroker(settings.url)
+    assert await broker.connect()
+    await broker.close()
+
+
+@pytest.mark.asyncio
+@pytest.mark.kafka
+async def test_connection_by_url(settings):
+    broker = KafkaBroker()
+    assert await broker.connect(settings.url)
+    await broker.close()
diff --git a/tests/brokers/kafka/test_consume.py b/tests/brokers/kafka/test_consume.py
new file mode 100644
index 00000000..e3496bf8
--- /dev/null
+++ b/tests/brokers/kafka/test_consume.py
@@ -0,0 +1,77 @@
+from asyncio import Event, wait_for
+
+import pytest
+
+from propan import KafkaBroker
+
+
+@pytest.mark.asyncio
+@pytest.mark.kafka
+async def test_consume(
+    mock,
+    topic: str,
+    broker: KafkaBroker,
+):
+    consume = Event()
+    mock.side_effect = lambda *_: consume.set()  # pragma: no branch
+
+    async with broker:
+        broker.handle(topic)(mock)
+        await broker.start()
+        await broker.publish("hello", topic)
+        await wait_for(consume.wait(), 3)
+
+    mock.assert_called_once()
+
+
+@pytest.mark.asyncio
+@pytest.mark.kafka
+async def test_consume_double(
+    mock,
+    topic: str,
+    broker: KafkaBroker,
+):
+    consume = Event()
+    mock.side_effect = lambda *_: consume.set()  # pragma: no branch
+
+    async with broker:
+        broker.handle(topic)(mock)
+        await broker.start()
+
+        await broker.publish("hello", topic)
+        await wait_for(consume.wait(), 3)
+
+        consume.clear()
+        await broker.publish("hello", topic)
+        await wait_for(consume.wait(), 3)
+
+    assert mock.call_count == 2
+
+
+@pytest.mark.asyncio
+@pytest.mark.kafka
+async def test_different_consume(
+    mock,
+    topic: str,
+    broker: KafkaBroker,
+):
+    first_consume = Event()
+    second_consume = Event()
+
+    mock.method.side_effect = lambda *_: first_consume.set()  # pragma: no branch
+    mock.method2.side_effect = lambda *_: second_consume.set()  # pragma: no branch
+
+    another_topic = topic + "1"
+    async with broker:
+        broker.handle(topic)(mock.method)
+        broker.handle(another_topic)(mock.method2)
+        await broker.start()
+
+        await broker.publish("hello", topic)
+        await broker.publish("hello", another_topic)
+
+        await wait_for(first_consume.wait(), 3)
+        await wait_for(second_consume.wait(), 3)
+
+    mock.method.assert_called_once()
+    mock.method2.assert_called_once()
diff --git a/tests/brokers/kafka/test_publish.py b/tests/brokers/kafka/test_publish.py
new file mode 100644
index 00000000..64aad1c1
--- /dev/null
+++ b/tests/brokers/kafka/test_publish.py
@@ -0,0 +1,35 @@
+from asyncio import Event, wait_for
+from unittest.mock import Mock
+
+import pytest
+from pydantic import create_model
+
+from propan import KafkaBroker
+
+
+@pytest.mark.asyncio
+@pytest.mark.kafka
+@pytest.mark.parametrize(
+    "message",
+    (
+        b"hello!",
+        "hello",
+        {"message": "hello!"},
+        create_model("Message", r=str)(r="hello!"),
+        [1, 2, 3],
+    ),
+)
+async def test_publish(message, mock: Mock, topic: str, broker: KafkaBroker):
+    consume = Event()
+    mock.side_effect = lambda *_: consume.set()  # pragma: no branch
+
+    @broker.handle(topic)
+    async def handler(m):
+        mock(m)
+
+    async with broker:
+        await broker.start()
+        await broker.publish(message, topic)
+        await wait_for(consume.wait(), 3)
+
+    mock.assert_called_with(message)
diff --git a/tests/brokers/kafka/test_test_client.py b/tests/brokers/kafka/test_test_client.py
new file mode 100644
index 00000000..a1dcc252
--- /dev/null
+++ b/tests/brokers/kafka/test_test_client.py
@@ -0,0 +1,49 @@
+import pytest
+from pydantic import ValidationError, create_model
+
+from propan import KafkaBroker
+from propan.test.kafka import build_message
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    "message",
+    (
+        b"hello!",
+        "hello",
+        {"message": "hello!"},
+        create_model("Message", r=str)(r="hello!"),
+        [1, 2, 3],
+    ),
+)
+async def test_rpc(message, topic: str, test_broker: KafkaBroker):
+    @test_broker.handle(topic)
+    async def handler(m):
+        return m
+
+    async with test_broker:
+        await test_broker.start()
+        r = await test_broker.publish(message, topic, callback=True)
+
+    assert r == message
+
+
+@pytest.mark.asyncio
+async def test_handler_calling(topic: str, test_broker: KafkaBroker):
+    @test_broker.handle(topic)
+    async def handler(m: dict):
+        return m
+
+    raw_msg = {"msg": "hello!"}
+    message = build_message(raw_msg, topic)
+
+    wrong_msg = build_message("Hi!", topic)
+
+    async with test_broker:
+        await test_broker.start()
+        assert raw_msg == await handler(message)
+
+        await handler(wrong_msg)
+
+        with pytest.raises(ValidationError):
+            await handler(wrong_msg, reraise_exc=True)
diff --git a/tests/brokers/rabbit/test_acc.py b/tests/brokers/rabbit/test_consume.py
similarity index 100%
rename from tests/brokers/rabbit/test_acc.py
rename to tests/brokers/rabbit/test_consume.py
diff --git a/tests/brokers/redis/__init__.py b/tests/brokers/redis/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/brokers/redis/test_acc.py b/tests/brokers/redis/test_consume.py
similarity index 100%
rename from tests/brokers/redis/test_acc.py
rename to tests/brokers/redis/test_consume.py
diff --git a/tests/cli/conftest.py b/tests/cli/conftest.py
index 332f1eea..f23f0d3f 100644
--- a/tests/cli/conftest.py
+++ b/tests/cli/conftest.py
@@ -5,6 +5,7 @@

 from propan import PropanApp
 from propan.brokers.rabbit import RabbitBroker
+from propan.cli.startproject.async_app.kafka import create_kafka
 from propan.cli.startproject.async_app.nats import create_nats
 from propan.cli.startproject.async_app.rabbit import create_rabbit
 from propan.cli.startproject.async_app.redis import create_redis
@@ -51,3 +52,9 @@ def redis_async_project():
 def nats_async_project():
     with TemporaryDirectory() as dir:
         yield create_nats(dir)
+
+
+@pytest.fixture(scope="module")
+def kafka_async_project():
+    with TemporaryDirectory() as dir:
+        yield create_kafka(dir)
diff --git a/tests/cli/supervisors/__init__.py b/tests/cli/supervisors/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/cli/test_run.py b/tests/cli/test_run.py
index acaa77a1..3c2e0e8b 100644
--- a/tests/cli/test_run.py
+++ b/tests/cli/test_run.py
@@ -42,3 +42,15 @@ def test_run_nats_correct(nats_async_project):
     time.sleep(0.1)
     p.terminate()
     p.join()
+
+
+@pytest.mark.kafka
+@pytest.mark.slow
+def test_run_kafka_correct(kafka_async_project):
+    module, app = get_app_path(f'{kafka_async_project / "app" / "serve"}:app')
+    sys.path.insert(0, str(module.parent))
+    p = Process(target=_run, args=(module, app, {}))
+    p.start()
+    time.sleep(0.1)
+    p.terminate()
+    p.join()
diff --git a/tests/fastapi/__init__.py b/tests/fastapi/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/fastapi/test_app.py b/tests/fastapi/test_app.py
index 492db3f6..79d6f3d5 100644
--- a/tests/fastapi/test_app.py
+++ b/tests/fastapi/test_app.py
@@ -1,69 +1,59 @@
+from typing import Callable, Type, TypeVar
 from uuid import uuid4

 import pytest
-from fastapi import FastAPI
+from fastapi import APIRouter, FastAPI

-from propan.fastapi import RabbitRouter, RedisRouter
-from propan.test import TestRabbitBroker, TestRedisBroker
+from propan.fastapi import KafkaRouter, RabbitRouter, RedisRouter
+from propan.test import TestKafkaBroker, TestRabbitBroker, TestRedisBroker

+Broker = TypeVar("Broker")

-@pytest.mark.asyncio
-async def test_rabbit():
-    name = str(uuid4())
-    name2 = name + "1"
+class FastAPITestcase:
+    router_class: Type[APIRouter]
+    broker_test: Callable[[Broker], Broker]

-    router = RabbitRouter("amqp://guest:guest@localhost:5672")
-    router.broker = TestRabbitBroker(router.broker)
+    @pytest.mark.asyncio
+    async def test(self):
+        name = str(uuid4())
+        name2 = name + "1"

-    app = FastAPI()
-    app.include_router(router)
+        router = self.router_class()
+        router.broker = self.broker_test(router.broker)

-    @router.event(name)
-    async def hello():
-        return "1"
+        app = FastAPI()
+        app.include_router(router)

-    @router.event(name2)
-    async def hello2(b: int):
-        return "2"
+        @router.event(name)
+        async def hello():
+            return "1"

-    await router.startup()
+        @router.event(name2)
+        async def hello2(b: int):
+            return "2"

-    r = await router.broker.publish("", queue=name, callback=True, callback_timeout=0.5)
-    assert r == "1"
+        await router.startup()

-    r = await router.broker.publish(
-        "2", queue=name2, callback=True, callback_timeout=0.5
-    )
-    assert r == "2"
+        r = await router.broker.publish("", name, callback=True, callback_timeout=0.5)
+        assert r == "1"

-    await router.shutdown()
+        r = await router.broker.publish("2", name2, callback=True, callback_timeout=0.5)
+        assert r == "2"

+        await router.shutdown()

-@pytest.mark.asyncio
-async def test_redis():
-    name = str(uuid4())
-    name2 = name + "1"

-    router = RedisRouter()
-    router.broker = TestRedisBroker(router.broker)
-    app = FastAPI()
-    app.include_router(router)
+class TestRabbitRouter(FastAPITestcase):
+    router_class = RabbitRouter
+    broker_test = staticmethod(TestRabbitBroker)

-    @router.event(name)
-    async def hello():
-        return "1"

-    @router.event(name2)
-    async def hello2(b: int):
-        return "2"
+class TestRedisRouter(FastAPITestcase):
+    router_class = RedisRouter
+    broker_test = staticmethod(TestRedisBroker)

-    await router.startup()
-    r = await router.broker.publish("", name, callback=True, callback_timeout=0.5)
-    assert r == "1"
-
-    r = await router.broker.publish("2", name2, callback=True, callback_timeout=0.5)
-    assert r == "2"
-
-    await router.shutdown()
+class TestKafkaRouter(FastAPITestcase):
+    router_class = KafkaRouter
+    broker_test = staticmethod(TestKafkaBroker)
diff --git a/tests/log/__init__.py b/tests/log/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/utils/type_cast/__init__.py b/tests/utils/type_cast/__init__.py
new file mode 100644
index 00000000..e69de29b