Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/python-app.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ jobs:
# exit-zero treats all errors as warnings.
poetry run flake8 . --count --exit-zero --statistics

- name: Typecheck with mypy
run: |
poetry run mypy mockafka tests || true

- name: Test with pytest
run: |
poetry run pytest --cov=./ --cov-report=xml
Expand Down
2 changes: 1 addition & 1 deletion mockafka/admin_client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations

from confluent_kafka.cimpl import NewTopic, NewPartitions
from confluent_kafka.cimpl import NewTopic, NewPartitions # type: ignore[import-untyped]

from mockafka.cluster_metadata import ClusterMetadata
from mockafka.kafka_store import KafkaStore
Expand Down
2 changes: 1 addition & 1 deletion mockafka/aiokafka/aiokafka_admin_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from typing import Dict

from aiokafka.admin import NewTopic, NewPartitions
from aiokafka.admin import NewTopic, NewPartitions # type: ignore[import-untyped]

from mockafka.kafka_store import KafkaStore

Expand Down
13 changes: 7 additions & 6 deletions mockafka/aiokafka/aiokafka_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import random
from copy import deepcopy
from typing import Any

from mockafka.kafka_store import KafkaStore

Expand Down Expand Up @@ -34,18 +35,18 @@ class FakeAIOKafkaConsumer:
- getmany(): Currently just calls getone().
"""

def __init__(self, *args, **kwargs):
def __init__(self, *args: Any, **kwargs: Any) -> None:
self.kafka = KafkaStore()
self.consumer_store = {}
self.consumer_store: dict[str, int] = {}
self.subscribed_topic: list = []

async def start(self):
async def start(self) -> None:
self.consumer_store = {}
self.subscribed_topic: list = []
self.subscribed_topic = []

async def stop(self):
async def stop(self) -> None:
self.consumer_store = {}
self.subscribed_topic: list = []
self.subscribed_topic = []

async def commit(self):
for item in self.consumer_store:
Expand Down
2 changes: 1 addition & 1 deletion mockafka/cluster_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class ClusterMetadata(object):
This class is typically not user instantiated.
"""

def __init__(self, topic: str = None):
def __init__(self, topic: str | None = None):
self.kafka = KafkaStore()
self.cluster_id = "test"
self.controller_id = 1
Expand Down
7 changes: 4 additions & 3 deletions mockafka/conumser.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import random
from copy import deepcopy
from typing import Any

from mockafka.cluster_metadata import ClusterMetadata
from mockafka.kafka_store import KafkaStore
Expand Down Expand Up @@ -40,7 +41,7 @@ class FakeConsumer(object):
partitions) -> None: Incrementally unassign partitions (unsupported in mockafka).
"""

def __init__(self, *args, **kwargs):
def __init__(self, *args: Any, **kwargs: Any) -> None:
"""
Initialize the FakeConsumer.

Expand All @@ -49,7 +50,7 @@ def __init__(self, *args, **kwargs):
- kwargs: Additional keyword arguments (unused).
"""
self.kafka = KafkaStore()
self.consumer_store = {}
self.consumer_store: dict[str, int] = {}
self.subscribed_topic: list = []

def consume(self, num_messages=1, *args, **kwargs) -> list[Message]:
Expand Down Expand Up @@ -83,7 +84,7 @@ def close(self, *args, **kwargs):
self.consumer_store = {}
self.subscribed_topic = []

def commit(self, message: Message = None, *args, **kwargs):
def commit(self, message: Message | None = None, *args, **kwargs):
"""
Commit offsets for consumed messages.

Expand Down
6 changes: 3 additions & 3 deletions mockafka/decorators/asetup_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@

from functools import wraps

from aiokafka.admin import NewTopic
from aiokafka.admin import NewTopic # type: ignore[import-untyped]

from mockafka.aiokafka import FakeAIOKafkaAdmin
from mockafka.decorators.typing import TopicConfig


def asetup_kafka(topics: [dict[str, str]], clean: bool = False):
def asetup_kafka(topics: list[TopicConfig], clean: bool = False):
"""
asetup_kafka is a decorator for setting up mock Kafka topics using a FakeAIOKafkaAdminClient.

Expand Down
6 changes: 4 additions & 2 deletions mockafka/decorators/bulk_producer.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
from __future__ import annotations

from functools import wraps

from mockafka import FakeProducer
from mockafka.decorators.typing import MessageDict


def bulk_produce(list_of_messages: list[dict[str, str]]):
def bulk_produce(list_of_messages: list[MessageDict]):
"""
A decorator for bulk-producing messages using a FakeProducer.

Parameters:
- list_of_messages (list[dict[str, str]]): A list of dictionaries containing message details.
- list_of_messages (list[dict]): A list of dictionaries containing message details.

Each dictionary should have the following optional keys:
- 'value': The value of the message.
Expand Down
6 changes: 4 additions & 2 deletions mockafka/decorators/setup_kafka.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
from __future__ import annotations

from functools import wraps

from mockafka.admin_client import FakeAdminClientImpl, NewTopic
from mockafka.decorators.typing import TopicConfig


def setup_kafka(topics: [dict[str, str]], clean: bool = False):
def setup_kafka(topics: list[TopicConfig], clean: bool = False):
"""
A decorator for setting up Mockafka with specified topics using a FakeAdminClient.

Parameters:
- topics (list[dict[str, str]]): A list of dictionaries containing topic details.
- topics (list[dict]): A list of dictionaries containing topic details.
Each dictionary should have the keys 'topic' and 'partition'.
- clean (bool): Option to have a clean Kafka (remove existing topics) or not.

Expand Down
18 changes: 18 additions & 0 deletions mockafka/decorators/typing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from __future__ import annotations

from typing import TypedDict
from typing_extensions import NotRequired


class MessageDict(TypedDict):
    """Keyword arguments accepted when producing a mock Kafka message.

    All keys are optional (``NotRequired``); a missing key means the
    producer falls back to its own default for that field.
    """

    value: NotRequired[str]      # message payload
    key: NotRequired[str]        # partitioning/compaction key
    topic: NotRequired[str]      # destination topic name
    partition: NotRequired[int]  # explicit partition number
    timestamp: NotRequired[int]  # NOTE(review): presumably epoch millis — confirm against producer
    headers: NotRequired[dict]   # message headers; value types not constrained here


class TopicConfig(TypedDict):
    """Topic description passed to the ``setup_kafka``/``asetup_kafka``
    decorators. Both keys are required.
    """

    topic: str      # topic name to create
    partition: int  # number of partitions for the topic
4 changes: 2 additions & 2 deletions mockafka/kafka_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

from __future__ import annotations

from confluent_kafka import KafkaException
from confluent_kafka import KafkaException # type: ignore[import-untyped]
from .message import Message
from copy import deepcopy

Expand All @@ -30,7 +30,7 @@


class SingletonMeta(type):
_instances = {}
_instances: dict[type[SingletonMeta], SingletonMeta] = {}

def __call__(cls, *args, **kwargs):
if cls not in cls._instances or "clean" in kwargs.keys():
Expand Down
6 changes: 3 additions & 3 deletions mockafka/message.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
from __future__ import annotations

from typing import Optional
from typing import Optional, Any

from confluent_kafka import KafkaError
from confluent_kafka import KafkaError # type: ignore[import-untyped]


class Message:
def __init__(self, *args, **kwargs):
def __init__(self, *args: Any, **kwargs: Any) -> None:
self._headers: Optional[dict] = kwargs.get("headers", None)
self._key: Optional[str] = kwargs.get("key", None)
self._value: Optional[str] = kwargs.get("value", None)
Expand Down
2 changes: 1 addition & 1 deletion mockafka/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@


class FakeProducer(object):
def __init__(self, config: dict = None):
def __init__(self, config: dict | None = None):
self.kafka = KafkaStore()

def produce(self, topic, value=None, *args, **kwargs):
Expand Down
8 changes: 4 additions & 4 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ confluent-kafka = ">= 1.9.2"
aiokafka = ">=0.10,<0.12"
pytest-cov = ">=4.1,<6.0"
pytest-asyncio = "^0.23.5"
typing-extensions = "^4.12.2"

[tool.poetry.group.dev.dependencies]
flake8 = "*"
Expand Down
4 changes: 4 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,7 @@
max-complexity = 10
# The GitHub editor is 127 chars wide
max-line-length = 127


[mypy]
enable_error_code = ignore-without-code
22 changes: 7 additions & 15 deletions tests/test_aiokafka/test_aiokafka_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from unittest import IsolatedAsyncioTestCase

import pytest
from aiokafka.admin import NewTopic
from aiokafka.admin import NewTopic # type: ignore[import-untyped]

from mockafka import Message
from mockafka.aiokafka.aiokafka_admin_client import FakeAIOKafkaAdmin
Expand All @@ -18,6 +18,10 @@ def setUp(self) -> None:
self.producer = FakeAIOKafkaProducer()
self.admin_client = FakeAIOKafkaAdmin()

self.topic = "test1"
self.key = "test_key"
self.value = "test_value"

async def _create_mock_topic(self):
await self.admin_client.create_topics(
new_topics=[
Expand All @@ -27,18 +31,6 @@ async def _create_mock_topic(self):
]
)

@pytest.fixture(autouse=True)
def topic(self):
self.topic = "test1"

@pytest.fixture(autouse=True)
def key(self):
self.key = "test_key"

@pytest.fixture(autouse=True)
def value(self):
self.value = "test_value"

async def test_produce_failed_topic_not_exist(self):
with pytest.raises(KafkaException):
await self.producer.send(
Expand Down Expand Up @@ -69,7 +61,7 @@ async def test_produce_fail_for_none_partition(self):
partition=None,
)

async def test_produce_once(self):
async def test_produce_once(self) -> None:
await self._create_mock_topic()
await self.producer.send(
headers={},
Expand All @@ -88,7 +80,7 @@ async def test_produce_once(self):
self.assertEqual(message.error(), None)
self.assertEqual(message.latency(), None)

async def test_send_and_wait(self):
async def test_send_and_wait(self) -> None:
await self._create_mock_topic()

await self.producer.start()
Expand Down
4 changes: 2 additions & 2 deletions tests/test_aiokafka/test_async_decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from unittest import IsolatedAsyncioTestCase

import pytest
from aiokafka.admin import NewTopic
from aiokafka.admin import NewTopic # type: ignore[import-untyped]

from mockafka import Message
from mockafka.aiokafka import (
Expand Down Expand Up @@ -93,7 +93,7 @@ async def test_produce_with_kafka_setup_decorator(self):
@aproduce(topic="test_topic", partition=5, key="test_", value="test_value1")
@aproduce(topic="test_topic", partition=5, key="test_", value="test_value1")
@aconsume(topics=["test_topic"])
async def test_consumer_decorator(self, message: Message = None):
async def test_consumer_decorator(self, message: Message | None = None):
if message is None:
return

Expand Down
2 changes: 1 addition & 1 deletion tests/test_aiokafka/test_fake_aiokafka_admin_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from unittest import IsolatedAsyncioTestCase

import pytest
from aiokafka.admin import NewTopic, NewPartitions
from aiokafka.admin import NewTopic, NewPartitions # type: ignore[import-untyped]

from mockafka.aiokafka.aiokafka_admin_client import FakeAIOKafkaAdmin
from mockafka.kafka_store import KafkaStore
Expand Down
2 changes: 1 addition & 1 deletion tests/test_async_mockafka.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

import pytest
from aiokafka.admin import NewTopic
from aiokafka.admin import NewTopic # type: ignore[import-untyped]

from mockafka import aproduce, asetup_kafka, aconsume
from mockafka.aiokafka import (
Expand Down
5 changes: 3 additions & 2 deletions tests/test_docrators.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
from mockafka import FakeConsumer, produce, bulk_produce, setup_kafka, Message
from mockafka.admin_client import FakeAdminClientImpl, NewTopic
from mockafka.producer import FakeProducer
from mockafka.decorators.typing import MessageDict
from mockafka.decorators.consumer import consume
from unittest import TestCase

sample_for_bulk_produce = [
sample_for_bulk_produce: list[MessageDict] = [
{
"key": "test_key",
"value": "test_value",
Expand Down Expand Up @@ -98,7 +99,7 @@ def test_produce_with_kafka_setup_decorator(self):
@produce(topic="test_topic", partition=5, key="test_", value="test_value1")
@produce(topic="test_topic", partition=5, key="test_", value="test_value1")
@consume(topics=["test_topic"])
def test_consumer_decorator(self, message: Message = None):
def test_consumer_decorator(self, message: Message | None = None):
if message is None:
return

Expand Down
2 changes: 1 addition & 1 deletion tests/test_kafka_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from unittest import TestCase

from parameterized import parameterized
from parameterized import parameterized # type: ignore[import-untyped]

from mockafka.kafka_store import KafkaStore, KafkaException, Message

Expand Down
Loading