Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
sifex committed May 18, 2024
2 parents d86155f + c555f0f commit 55d85c8
Show file tree
Hide file tree
Showing 65 changed files with 1,235 additions and 269 deletions.
2 changes: 1 addition & 1 deletion .codespell-whitelist.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
dependant
dependant
2 changes: 2 additions & 0 deletions docs/docs/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ search:
- [KafkaRouter](public_api/faststream/kafka/KafkaRouter.md)
- [TestApp](public_api/faststream/kafka/TestApp.md)
- [TestKafkaBroker](public_api/faststream/kafka/TestKafkaBroker.md)
- [TopicPartition](public_api/faststream/kafka/TopicPartition.md)
- nats
- [AckPolicy](public_api/faststream/nats/AckPolicy.md)
- [ConsumerConfig](public_api/faststream/nats/ConsumerConfig.md)
Expand Down Expand Up @@ -495,6 +496,7 @@ search:
- [KafkaRouter](api/faststream/kafka/KafkaRouter.md)
- [TestApp](api/faststream/kafka/TestApp.md)
- [TestKafkaBroker](api/faststream/kafka/TestKafkaBroker.md)
- [TopicPartition](api/faststream/kafka/TopicPartition.md)
- broker
- [KafkaBroker](api/faststream/kafka/broker/KafkaBroker.md)
- broker
Expand Down
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/kafka/TopicPartition.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: aiokafka.structs.TopicPartition
59 changes: 59 additions & 0 deletions docs/docs/en/release.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,65 @@ hide:
---

# Release Notes
## 0.5.6

### What's Changed

* feature: add --factory param by [@Sehat1137](https://github.com/Sehat1137){.external-link target="_blank"} in [#1440](https://github.com/airtai/faststream/pull/1440){.external-link target="_blank"}
* feat: add RMQ channels options, support for prefix for routing_key, a… by [@Lancetnik](https://github.com/Lancetnik){.external-link target="_blank"} in [#1448](https://github.com/airtai/faststream/pull/1448){.external-link target="_blank"}
* feature: Add `from faststream.rabbit.annotations import Connection, Channel` shortcuts
* Bugfix: RabbitMQ RabbitRouter prefix now affects to queue routing key as well
* Feature (close #1402): add `broker.add_middleware` public API to append a middleware to already created broker
* Feature: add `RabbitBroker(channel_number: int, publisher_confirms: bool, on_return_raises: bool)` options to setup channel settings
* Feature (close #1447): add `StreamMessage.batch_headers` attribute to provide with access to whole batch messages headers

### New Contributors

* [@Sehat1137](https://github.com/Sehat1137){.external-link target="_blank"} made their first contribution in [#1440](https://github.com/airtai/faststream/pull/1440){.external-link target="_blank"}

**Full Changelog**: [#0.5.5...0.5.6](https://github.com/airtai/faststream/compare/0.5.5...0.5.6){.external-link target="_blank"}

## 0.5.5

### What's Changed

Add support for explicit partition assignment in aiokafka `KafkaBroker` (special thanks to @spataphore1337):

```python
from faststream import FastStream
from faststream.kafka import KafkaBroker, TopicPartition

broker = KafkaBroker()

topic_partition_fisrt = TopicPartition("my_topic", 1)
topic_partition_second = TopicPartition("my_topic", 2)

@broker.subscribe(partitions=[topic_partition_fisrt, topic_partition_second])
async def some_consumer(msg):
...
```

* Update Release Notes for 0.5.4 by @faststream-release-notes-updater in [#1421](https://github.com/airtai/faststream/pull/1421){.external-link target="_blank"}
* feature: manual partition assignment to Kafka by [@spataphore1337](https://github.com/spataphore1337){.external-link target="_blank"} in [#1422](https://github.com/airtai/faststream/pull/1422){.external-link target="_blank"}
* Chore/update deps by [@Lancetnik](https://github.com/Lancetnik){.external-link target="_blank"} in [#1429](https://github.com/airtai/faststream/pull/1429){.external-link target="_blank"}
* Fix/correct dynamic subscriber registration by [@Lancetnik](https://github.com/Lancetnik){.external-link target="_blank"} in [#1433](https://github.com/airtai/faststream/pull/1433){.external-link target="_blank"}
* chore: bump version by [@Lancetnik](https://github.com/Lancetnik){.external-link target="_blank"} in [#1435](https://github.com/airtai/faststream/pull/1435){.external-link target="_blank"}


**Full Changelog**: [#0.5.4...0.5.5](https://github.com/airtai/faststream/compare/0.5.4...0.5.5){.external-link target="_blank"}

## 0.5.4

### What's Changed

* Update Release Notes for 0.5.3 by @faststream-release-notes-updater in [#1400](https://github.com/airtai/faststream/pull/1400){.external-link target="_blank"}
* fix (#1415): raise SetupError if rpc and reply_to are using in TestCL… by [@Lancetnik](https://github.com/Lancetnik){.external-link target="_blank"} in [#1419](https://github.com/airtai/faststream/pull/1419){.external-link target="_blank"}
* Chore/update deps2 by [@Lancetnik](https://github.com/Lancetnik){.external-link target="_blank"} in [#1418](https://github.com/airtai/faststream/pull/1418){.external-link target="_blank"}
* refactor: correct security with kwarg params merging by [@Lancetnik](https://github.com/Lancetnik){.external-link target="_blank"} in [#1417](https://github.com/airtai/faststream/pull/1417){.external-link target="_blank"}
* fix (#1414): correct Message.ack error processing by [@Lancetnik](https://github.com/Lancetnik){.external-link target="_blank"} in [#1420](https://github.com/airtai/faststream/pull/1420){.external-link target="_blank"}

**Full Changelog**: [#0.5.3...0.5.4](https://github.com/airtai/faststream/compare/0.5.3...0.5.4){.external-link target="_blank"}

## 0.5.3

### What's Changed
Expand Down
2 changes: 1 addition & 1 deletion faststream/__about__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Simple and fast framework to create message brokers based microservices."""

__version__ = "0.5.3"
__version__ = "0.5.6"

SERVICE_NAME = f"faststream-{__version__}"

Expand Down
2 changes: 1 addition & 1 deletion faststream/asyncapi/site.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def serve_app(
) -> None:
"""Serve the HTTPServer with AsyncAPI schema."""
logger.info(f"HTTPServer running on http://{host}:{port} (Press CTRL+C to quit)")
logger.warn("Please, do not use it in production.")
logger.warning("Please, do not use it in production.")

server.HTTPServer(
(host, port),
Expand Down
26 changes: 21 additions & 5 deletions faststream/broker/acknowledgement_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,13 @@ def __init__(
self,
message: "StreamMessage[MsgType]",
watcher: BaseWatcher,
logger: Optional["LoggerProto"] = None,
**extra_options: Any,
) -> None:
self.watcher = watcher
self.message = message
self.extra_options = extra_options
self.logger = logger

async def __aenter__(self) -> None:
self.watcher.add(self.message.message_id)
Expand Down Expand Up @@ -172,15 +174,29 @@ async def __aexit__(
return not is_test_env()

async def __ack(self) -> None:
await self.message.ack(**self.extra_options)
self.watcher.remove(self.message.message_id)
try:
await self.message.ack(**self.extra_options)
except Exception as er:
if self.logger is not None:
self.logger.log(logging.ERROR, er, exc_info=er)
else:
self.watcher.remove(self.message.message_id)

async def __nack(self) -> None:
await self.message.nack(**self.extra_options)
try:
await self.message.nack(**self.extra_options)
except Exception as er:
if self.logger is not None:
self.logger.log(logging.ERROR, er, exc_info=er)

async def __reject(self) -> None:
await self.message.reject(**self.extra_options)
self.watcher.remove(self.message.message_id)
try:
await self.message.reject(**self.extra_options)
except Exception as er:
if self.logger is not None:
self.logger.log(logging.ERROR, er, exc_info=er)
else:
self.watcher.remove(self.message.message_id)


def get_watcher(
Expand Down
13 changes: 13 additions & 0 deletions faststream/broker/core/abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,19 @@ def __init__(
self._parser = parser
self._decoder = decoder

def add_middleware(self, middleware: "BrokerMiddleware[MsgType]") -> None:
"""Append BrokerMiddleware to the end of middlewares list.
Current middleware will be used as a most inner of already existed ones.
"""
self._middlewares = (*self._middlewares, middleware)

for sub in self._subscribers.values():
sub.add_middleware(middleware)

for pub in self._publishers.values():
pub.add_middleware(middleware)

@abstractmethod
def subscriber(
self,
Expand Down
2 changes: 1 addition & 1 deletion faststream/broker/core/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ async def publish(
**kwargs: Any,
) -> Optional[Any]:
"""Publish message directly."""
assert producer, NOT_CONNECTED_YET # nosec B101)
assert producer, NOT_CONNECTED_YET # nosec B101

publish: "AsyncFunc" = producer.publish
for m in self._middlewares:
Expand Down
2 changes: 2 additions & 0 deletions faststream/broker/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
TYPE_CHECKING,
Any,
Generic,
List,
Optional,
Sequence,
Tuple,
Expand Down Expand Up @@ -38,6 +39,7 @@ class StreamMessage(Generic[MsgType]):

body: Union[bytes, Any]
headers: "AnyDict" = field(default_factory=dict)
batch_headers: List["AnyDict"] = field(default_factory=list)
path: "AnyDict" = field(default_factory=dict)

content_type: Optional[str] = None
Expand Down
3 changes: 3 additions & 0 deletions faststream/broker/publisher/proto.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ class PublisherProto(
_middlewares: Iterable["PublisherMiddleware"]
_producer: Optional["ProducerProto"]

@abstractmethod
def add_middleware(self, middleware: "BrokerMiddleware[MsgType]") -> None: ...

@staticmethod
@abstractmethod
def create() -> "PublisherProto[MsgType]":
Expand Down
10 changes: 9 additions & 1 deletion faststream/broker/publisher/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@
from faststream.asyncapi.message import get_response_schema
from faststream.asyncapi.utils import to_camelcase
from faststream.broker.publisher.proto import PublisherProto
from faststream.broker.types import MsgType, P_HandlerParams, T_HandlerReturn
from faststream.broker.types import (
BrokerMiddleware,
MsgType,
P_HandlerParams,
T_HandlerReturn,
)
from faststream.broker.wrapper.call import HandlerCallWrapper

if TYPE_CHECKING:
Expand Down Expand Up @@ -87,6 +92,9 @@ def __init__(
self.include_in_schema = include_in_schema
self.schema_ = schema_

def add_middleware(self, middleware: "BrokerMiddleware[MsgType]") -> None:
self._broker_middlewares = (*self._broker_middlewares, middleware)

@override
def setup( # type: ignore[override]
self,
Expand Down
3 changes: 3 additions & 0 deletions faststream/broker/subscriber/proto.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ class SubscriberProto(
_broker_middlewares: Iterable["BrokerMiddleware[MsgType]"]
_producer: Optional["ProducerProto"]

@abstractmethod
def add_middleware(self, middleware: "BrokerMiddleware[MsgType]") -> None: ...

@staticmethod
@abstractmethod
def create() -> "SubscriberProto[MsgType]":
Expand Down
3 changes: 3 additions & 0 deletions faststream/broker/subscriber/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ def __init__(
self.description_ = description_
self.include_in_schema = include_in_schema

def add_middleware(self, middleware: "BrokerMiddleware[MsgType]") -> None:
self._broker_middlewares = (*self._broker_middlewares, middleware)

@override
def setup( # type: ignore[override]
self,
Expand Down
1 change: 1 addition & 0 deletions faststream/broker/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def get_watcher_context(
return partial(
WatcherContext,
watcher=get_watcher(logger, retry),
logger=logger,
**extra_options,
)

Expand Down
7 changes: 1 addition & 6 deletions faststream/broker/wrapper/call.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import asyncio
from functools import wraps
from typing import (
TYPE_CHECKING,
Any,
Expand Down Expand Up @@ -186,13 +185,9 @@ def set_wrapped(
def _wrap_decode_message(
func: Callable[..., Awaitable[T_HandlerReturn]],
params_ln: int,
) -> Callable[
["StreamMessage[MsgType]"],
Awaitable[T_HandlerReturn],
]:
) -> Callable[["StreamMessage[MsgType]"], Awaitable[T_HandlerReturn]]:
"""Wraps a function to decode a message and pass it as an argument to the wrapped function."""

@wraps(func)
async def decode_wrapper(message: "StreamMessage[MsgType]") -> T_HandlerReturn:
"""A wrapper function to decode and handle a message."""
msg = message.decoded_body
Expand Down
19 changes: 16 additions & 3 deletions faststream/cli/docs/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ def serve(
" Defaults to the current working directory."
),
),
is_factory: bool = typer.Option(
False, "--factory", help="Treat APP as an application factory"
),
) -> None:
"""Serve project AsyncAPI schema."""
if ":" in app:
Expand All @@ -66,18 +69,18 @@ def serve(

except ImportError:
warnings.warn(INSTALL_WATCHFILES, category=ImportWarning, stacklevel=1)
_parse_and_serve(app, host, port)
_parse_and_serve(app, host, port, is_factory)

else:
WatchReloader(
target=_parse_and_serve,
args=(app, host, port),
args=(app, host, port, is_factory),
reload_dirs=(str(module_parent),),
extra_extensions=extra_extensions,
).run()

else:
_parse_and_serve(app, host, port)
_parse_and_serve(app, host, port, is_factory)


@docs_app.command(name="gen")
Expand All @@ -104,12 +107,19 @@ def gen(
" Defaults to the current working directory."
),
),
is_factory: bool = typer.Option(
False,
"--factory",
help="Treat APP as an application factory",
),
) -> None:
"""Generate project AsyncAPI schema."""
if app_dir: # pragma: no branch
sys.path.insert(0, app_dir)

_, app_obj = import_from_string(app)
if callable(app_obj) and is_factory:
app_obj = app_obj()
raw_schema = get_app_schema(app_obj)

if yaml:
Expand Down Expand Up @@ -138,9 +148,12 @@ def _parse_and_serve(
app: str,
host: str = "localhost",
port: int = 8000,
is_factory: bool = False,
) -> None:
if ":" in app:
_, app_obj = import_from_string(app)
if callable(app_obj) and is_factory:
app_obj = app_obj()
raw_schema = get_app_schema(app_obj)

else:
Expand Down
Loading

0 comments on commit 55d85c8

Please sign in to comment.