Skip to content

feat: add multibrokers support#2867

Merged
Lancetnik merged 17 commits into
mainfrom
feat/526
May 27, 2026
Merged

feat: add multibrokers support#2867
Lancetnik merged 17 commits into
mainfrom
feat/526

Conversation

@Lancetnik
Copy link
Copy Markdown
Member

@Lancetnik Lancetnik commented May 13, 2026

Adds multibroker support across the application stack (internal application, FastStream app, ASGI integration) and updates related tests.

from faststream import FastStream, Logger
from faststream.nats import NatsBroker
from faststream.rabbit import RabbitBroker, RabbitResponse

rbroker = RabbitBroker()

@rbroker.subscriber("test")
async def handler_rmq(msg: str, logger: Logger) -> None:
    logger.info(msg)

nbroker = NatsBroker()

@nbroker.subscriber("test2")
@rbroker.publisher("test")
async def handler_nats(msg: str, logger: Logger):
    logger.info(msg)
    return RabbitResponse(msg)

app = FastStream(rbroker, nbroker)

@app.after_startup
async def _() -> None:
    await nbroker.publish("Hi!", "test2")


# testing
import pytest

from faststream import TestApp
from faststream.nats import TestNatsBroker
from faststream.rabbit import TestRabbitBroker

@pytest.mark.asyncio()
async def test_smoke():
    async with TestApp(app), TestNatsBroker(nbroker) as nbr, TestRabbitBroker(rbroker):
        await nbr.publish("Hi!", "test2")
        await handler_rmq.wait_call(3)
        handler_rmq.mock.assert_called_once_with("Hi!")

Changed files: faststream/_internal/application.py, faststream/app.py, faststream/asgi/app.py, plus tests in tests/application/test_delayed_broker.py, tests/asgi/testcase.py, and tests/cli/require_broker/test_app.py.

Closes #526

@Lancetnik Lancetnik requested a review from Sehat1137 as a code owner May 13, 2026 17:56
@Lancetnik Lancetnik marked this pull request as draft May 13, 2026 17:59
Lancetnik and others added 2 commits May 18, 2026 19:34
Apply the TestRabbitBroker(broker1, broker2) multi-broker pattern to
Kafka, Confluent, NATS, Redis, and MQTT test brokers. Fixes test
ordering so patches enter before TestApp and unmarks RabbitMQ exchange
publish tests that work in-memory.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@github-actions github-actions Bot added Confluent Issues related to `faststream.confluent` module AioKafka Issues related to `faststream.kafka` module NATS Issues related to `faststream.nats` module and NATS broker features Redis Issues related to `faststream.redis` module and Redis features MQTT Issues related to `faststream.mqtt` module labels May 18, 2026
Lancetnik and others added 2 commits May 25, 2026 23:48
Merging Redis Cluster support into the multibroker test client refactor
left TestRedisBroker referencing the removed self.broker attribute and
FakeProducer._build_child missing the brokers arg. Also fix
RedisClusterBroker startup_nodes defaulting to None, which broke the
iterable contract of _resolve_url_options.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@Lancetnik Lancetnik marked this pull request as ready for review May 25, 2026 21:09
@Lancetnik Lancetnik requested a review from borisalekseev as a code owner May 25, 2026 21:09
Lancetnik and others added 4 commits May 26, 2026 00:26
…rker

Replace stderr-scrape assertion with a filesystem-marker poll. The
multi-worker spawn on macOS could lose one worker's stderr line inside
the wait_for_stderr window, causing repeated reruns. Workers now write
the resolved log level to a temp file that the test polls.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Comment thread tests/brokers/base/consume.py
Lancetnik and others added 3 commits May 26, 2026 23:30
Add an "MQTT" tab as the last entry in every broker comparison tab block
across the English documentation and shared includes, alongside new
docs_src/.../mqtt/ example files mirroring the existing Kafka/Rabbit/NATS/
Redis examples. Where sibling tabs use {!> docs_src/... !} includes, MQTT
follows the same convention; inline-only blocks stay inline.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Mirror the Redis test case in every tests/docs/.../test_*.py so the new
MQTT example files get exercised, and add the four docs_src/mqtt example
files that the previous pass left missing (broker_context, direct_testing,
pydantic_annotated_fields, delay_equal). routers/index.md gets its
delay_equal MQTT block converted from inline to a file include.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@github-actions github-actions Bot added documentation Improvements or additions to documentation dependencies Pull requests that update a dependency file labels May 26, 2026
Lancetnik and others added 4 commits May 27, 2026 00:02
* feat(asgi): multi-broker support in AsyncAPI try-it-out

* pre-commit fixes

* fix(asgi): make TryItOutForm fields required again

* feat(asgi): improve try-it-out dispatching to support exact channel matching

The `TryItOutProcessor` now attempts to match the exact AsyncAPI channel
name before falling back to destination-based matching. This allows
more precise routing in multi-broker setups where different brokers
might share similar destination prefixes but have distinct AsyncAPI
channel identifiers.

Added `_iter_broker_channels` to extract full channel keys from broker
subscribers and publishers.

* refactor(confluent): use composite key for group ID tracking in tests

Update the testing utility to use a tuple of the handler's configuration
ID and the group ID when tracking published groups. This prevents
collisions when multiple handlers share the same group ID but belong to
different configurations during testing.

* fix(confluent): simplify group ID tracking in testing utility

Revert the composite key approach in `faststream/confluent/testing.py` to
use the raw `group_id` for tracking published groups.

Additionally, update `tests/brokers/base/consume.py` to use `call_count`
instead of an event flag for more reliable consumption synchronization
in multibroker tests.

* chore(confluent): fix whitespace in testing utility

* feat(asgi): support multiple brokers in try_it_out processor

Update the `TryItOutProcessor` to handle multiple brokers of the same type
when using test brokers. This allows the AsyncAPI "try it out" feature
to correctly wrap all relevant brokers in the test context, enabling
multi-broker testing scenarios (e.g., multiple Kafka brokers) instead of
being limited to a single broker per test class.

Also update the multibroker test suite to remove Redis dependencies and
use multiple Kafka brokers for validation.

* refactor(fastapi): wrap broker in list for TryItOutProcessor

Pass `self.broker` as a single-element list to `TryItOutProcessor` to
ensure compatibility with the updated processor that expects an
iterable of brokers. This change aligns the FastAPI router with the
new multi-broker support in the `try_it_out` functionality.

* refactor(asgi): change TryItOutProcessor to accept variable arguments

Update TryItOutProcessor to accept multiple brokers using `*args` instead of an iterable. This allows for a more flexible API when initializing the processor and simplifies the call sites in the FastAPI router and AsyncAPI factory.

- Update `TryItOutProcessor.__init__` to use `*brokers`
- Update `StreamRouter` to pass `self.broker` directly
- Update `make_try_it_out_handler` to unpack `brokers`
- Update tests to reflect the change from passing an empty list to passing no arguments

---------

Co-authored-by: Pastukhov Nikita <nikita@pastukhov-dev.ru>
Make root docker-compose.yaml the source of truth: update quick-start
docker run commands (rabbitmq:3.13-alpine, redis:alpine, artemis for
MQTT) and mirror all broker services (rabbitmq, kafka, nats, redis,
redis-cluster, artemis) in docs/includes/docker-compose.yaml.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@Lancetnik Lancetnik added this pull request to the merge queue May 27, 2026
Merged via the queue into main with commit bbe5ec6 May 27, 2026
31 checks passed
@Lancetnik Lancetnik deleted the feat/526 branch May 27, 2026 21:04
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

AioKafka Issues related to `faststream.kafka` module Confluent Issues related to `faststream.confluent` module dependencies Pull requests that update a dependency file documentation Improvements or additions to documentation MQTT Issues related to `faststream.mqtt` module NATS Issues related to `faststream.nats` module and NATS broker features Redis Issues related to `faststream.redis` module and Redis features

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Using multiple brokers

3 participants