A FastAPI-style MQTT framework inspired by FastStream's architecture.
FasterMQTT brings the elegant router pattern from FastAPI to MQTT, enabling clean subscription management with dependency injection, topic path parameters, and hierarchical routing.
- FastAPI Integration: Seamlessly integrates with FastAPI through lifespan management
- Decorator-based Subscriptions: Define handlers with @router.subscribe("topic/{param}")
- Topic Path Parameters: Automatic extraction like client/{client_id}/control → client_id="abc123"
- Dependency Injection: Full FastAPI Depends() support in MQTT handlers
- Hierarchical Routing: Nested routers with prefix accumulation via include_router()
- Shared Subscriptions: MQTT 5.0 $share/{group}/{topic} consumer groups
- Middleware System: Onion model middleware for message interception
- Pydantic/SQLModel Support: Automatic serialization/deserialization of message payloads
- Type Safety: Full type hints throughout the codebase
    pip install fastermqtt

    from fastapi import FastAPI
    from fastermqtt import MqttRouter

    # Create root router with MQTT connection config
    mqtt_router = MqttRouter(
        host="localhost",
        port=1883,
        username="user",
        password="password",
    )

    # Subscribe to a topic
    @mqtt_router.subscribe("sensors/temperature")
    async def handle_temperature(payload: bytes):
        temperature = float(payload.decode())
        print(f"Temperature: {temperature}")

    # Integrate with FastAPI
    app = FastAPI()
    app.include_router(mqtt_router)

Extract values from topic segments automatically:
    @mqtt_router.subscribe("client/{client_id}/control")
    async def handle_control(client_id: str, payload: bytes):
        print(f"Command for client {client_id}: {payload}")

Organize subscriptions with nested routers:
    # Root router (manages MQTT connection)
    mqtt_router = MqttRouter(host="localhost", port=1883)

    # Sub-router (no connection config, shares parent's broker)
    client_router = MqttRouter(prefix="client")

    @client_router.subscribe("{client_id}/status")
    async def handle_status(client_id: str, payload: bytes):
        # Subscribes to: client/{client_id}/status
        pass

    # Include sub-router
    mqtt_router.include_router(client_router)

Use FastAPI's dependency injection in MQTT handlers:
    from collections.abc import AsyncGenerator
    from typing import Annotated

    from fastapi import Depends
    from sqlmodel.ext.asyncio.session import AsyncSession

    # async_session_maker and Event are assumed to be defined elsewhere in your application
    async def get_session() -> AsyncGenerator[AsyncSession, None]:
        async with async_session_maker() as session:
            yield session

    SessionDep = Annotated[AsyncSession, Depends(get_session)]

    @mqtt_router.subscribe("events/{event_type}")
    async def handle_event(
        event_type: str,
        payload: bytes,
        session: SessionDep,
    ):
        # Save event to database
        event = Event(type=event_type, data=payload.decode())
        session.add(event)
        await session.commit()

    # Publish from router (uses router's prefix)
    client_router = MqttRouter(prefix="client/{client_id}/response")
    await client_router.publish(
        payload=b"OK",
        client_id="abc123",  # Replaces {client_id}
        qos=1,
    )
    # Publishes to: client/abc123/response
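How a prefix such as client/{client_id}/response might be turned into a concrete topic can be sketched in plain Python. This is an illustration only, not FasterMQTT's implementation: render_topic is a hypothetical helper, and rejecting "/", "+" and "#" in parameter values is an assumption of this sketch (MQTT does forbid wildcards in publish topics).

```python
import re

_PLACEHOLDER = re.compile(r"\{(\w+)\}")


def render_topic(template: str, **path_params: str) -> str:
    """Fill every {name} placeholder in a topic template (hypothetical helper)."""

    def substitute(match: re.Match) -> str:
        name = match.group(1)
        if name not in path_params:
            raise KeyError(f"missing topic parameter: {name}")
        value = str(path_params[name])
        # Wildcards are not allowed in publish topics; "/" would add a topic level.
        if any(ch in value for ch in "/+#"):
            raise ValueError(f"invalid characters in topic parameter {name!r}")
        return value

    return _PLACEHOLDER.sub(substitute, template)


print(render_topic("client/{client_id}/response", client_id="abc123"))
# client/abc123/response
```

Failing loudly on a missing parameter is a design choice of the sketch; it avoids publishing to a topic that still contains a literal {client_id}.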
    # Publish directly via broker
    from fastermqtt import MQTTBroker

    await MQTTBroker.publish(
        topic="notifications/alert",
        payload=b"System alert!",
        qos=2,
        retain=True,
    )

Distribute messages across multiple service instances:
    # Global default consumer group
    mqtt_router = MqttRouter(
        host="localhost",
        port=1883,
        default_consumer_group="workers",  # All subscriptions use this group
    )

    # Per-subscription consumer group
    @mqtt_router.subscribe("tasks/heavy", group="heavy-workers")
    async def handle_heavy_task(payload: bytes):
        # Only one instance in "heavy-workers" group receives each message
        pass
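The group rules in this section (a router-wide default, a per-subscription group, and an empty string to opt out) can be pictured as a small resolution step that produces the MQTT 5.0 $share/{group}/{topic} filter. The sketch below is an assumption about the logic, not the library's code: subscription_filter is a hypothetical name, and mapping each {param} segment to the single-level wildcard "+" is likewise assumed.

```python
import re
from typing import Optional


def subscription_filter(
    pattern: str,
    group: Optional[str] = None,
    default_group: Optional[str] = None,
) -> str:
    """Build the filter sent to the broker for a topic pattern (hypothetical helper)."""
    # Named parameters have no MQTT equivalent, so each becomes a "+" wildcard.
    topic_filter = re.sub(r"\{\w+\}", "+", pattern)
    # None means "use the router default"; "" explicitly disables sharing.
    effective_group = default_group if group is None else group
    if not effective_group:
        return topic_filter
    return f"$share/{effective_group}/{topic_filter}"


print(subscription_filter("tasks/heavy", group="heavy-workers", default_group="workers"))
# $share/heavy-workers/tasks/heavy
print(subscription_filter("broadcast/all", group="", default_group="workers"))
# broadcast/all
print(subscription_filter("client/{client_id}/status", default_group="workers"))
# $share/workers/client/+/status
```

Treating None and "" differently is what lets a single subscription opt out of a router-wide default.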
    # Force no shared subscription (override default)
    @mqtt_router.subscribe("broadcast/all", group="")
    async def handle_broadcast(payload: bytes):
        # All instances receive every message
        pass

    from pydantic import BaseModel
    from fastermqtt import encode_payload, decode_payload

    class SensorData(BaseModel):
        sensor_id: str
        value: float
        timestamp: int

    # Encode for publishing
    data = SensorData(sensor_id="temp-1", value=23.5, timestamp=1234567890)
    payload = encode_payload(data)  # Returns JSON bytes

    # Decode in handler
    @mqtt_router.subscribe("sensors/data")
    async def handle_sensor_data(payload: bytes):
        data = decode_payload(payload, SensorData)
        print(f"Sensor {data.sensor_id}: {data.value}")

Add cross-cutting concerns like logging and error handling:
    import time

    from fastermqtt import (
        BaseMQTTMiddleware,
        MiddlewareChain,
        LoggingMiddleware,
        ErrorHandlingMiddleware,
        MQTTMessage,
    )

    class MetricsMiddleware(BaseMQTTMiddleware):
        async def on_receive(self, message: MQTTMessage, call_next):
            # Time the rest of the chain, including the handler
            start = time.perf_counter()
            result = await call_next(message)
            duration = time.perf_counter() - start
            # `metrics` stands in for your application's own metrics client
            metrics.record("mqtt_message_duration", duration)
            return result
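In an onion model, each middleware like MetricsMiddleware wraps everything added after it: code before call_next runs on the way in, code after it runs on the way out. The following stand-alone sketch shows that ordering with plain coroutines. It does not use FasterMQTT; make_layer and build_chain are hypothetical names, and it assumes the first middleware added is the outermost layer.

```python
import asyncio
from functools import partial

events: list[str] = []


def make_layer(name: str):
    """Create a middleware that records when it is entered and left."""

    async def layer(message, call_next):
        events.append(f"{name}: before")
        result = await call_next(message)
        events.append(f"{name}: after")
        return result

    return layer


async def handler(message):
    events.append(f"handler: {message}")
    return "done"


def build_chain(middlewares, handler):
    """Wrap the handler from the inside out so the first middleware is outermost."""
    call = handler
    for middleware in reversed(middlewares):
        call = partial(middleware, call_next=call)
    return call


pipeline = build_chain([make_layer("errors"), make_layer("logging")], handler)
asyncio.run(pipeline("hello"))
print(events)
# ['errors: before', 'logging: before', 'handler: hello', 'logging: after', 'errors: after']
```

Under this assumption, an error-handling layer added first sees exceptions raised by every later layer and by the handler itself.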
    # Build middleware chain
    chain = MiddlewareChain()
    chain.add(ErrorHandlingMiddleware())
    chain.add(LoggingMiddleware(log_payload=True))
    chain.add(MetricsMiddleware())

The main router class that inherits from FastAPI's APIRouter.
    MqttRouter(
        host: str | None = None,  # MQTT broker address (root router only)
        port: int = 8883,  # MQTT broker port
        username: str | None = None,  # Authentication username
        password: str | None = None,  # Authentication password
        client_id: str | None = None,  # Client ID (auto-generated if not provided)
        keepalive: int = 60,  # Heartbeat interval (seconds)
        ssl_ca_cert: str | None = None,  # SSL CA certificate path
        clean_session: bool = True,  # Whether to clean session on connect
        default_consumer_group: str | None = None,  # Default shared subscription group
        prefix: str = "",  # Topic prefix
    )

- subscribe(topic, qos=0, group=None) - Decorator to register a subscription handler
- publish(payload, qos=0, retain=False, **path_params) - Publish a message
- include_router(router, prefix="", ...) - Include a sub-router
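Prefix accumulation through include_router() can be illustrated with a toy router that only tracks topic strings. SketchRouter and join_topic are hypothetical and unrelated to the real MqttRouter internals; the ordering (include prefix first, then the sub-router's own prefix, then the topic) mirrors how FastAPI composes path prefixes and is assumed to carry over.

```python
def join_topic(*parts: str) -> str:
    """Join topic fragments with single slashes, skipping empty fragments."""
    return "/".join(p.strip("/") for p in parts if p and p.strip("/"))


class SketchRouter:
    """Toy router that only records topics (not the real MqttRouter)."""

    def __init__(self, prefix: str = ""):
        self.prefix = prefix
        self.topics: list[str] = []
        self.children: list[tuple[str, "SketchRouter"]] = []

    def subscribe(self, topic: str) -> None:
        self.topics.append(topic)

    def include_router(self, router: "SketchRouter", prefix: str = "") -> None:
        self.children.append((prefix, router))

    def resolved_topics(self, parent_prefix: str = "") -> list[str]:
        # Each level prepends its own prefix to whatever the parent accumulated.
        base = join_topic(parent_prefix, self.prefix)
        resolved = [join_topic(base, topic) for topic in self.topics]
        for extra_prefix, child in self.children:
            resolved.extend(child.resolved_topics(join_topic(base, extra_prefix)))
        return resolved


root = SketchRouter()
client = SketchRouter(prefix="client")
client.subscribe("{client_id}/status")
root.include_router(client)
print(root.resolved_topics())
# ['client/{client_id}/status']
```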
Singleton manager for the MQTT connection (pure classmethod pattern).

    # Lifecycle (called automatically by MqttRouter)
    await MQTTBroker.start(config)
    await MQTTBroker.stop()

    # Publishing
    await MQTTBroker.publish(topic, payload, qos=0, retain=False)

    # Status
    MQTTBroker.is_connected()  # bool
    MQTTBroker.is_initialized()  # bool

    from fastermqtt import (
        get_mqtt_message,  # Get MQTTMessage object
        get_mqtt_topic,  # Get topic string
        get_mqtt_payload,  # Get raw payload bytes
        get_mqtt_qos,  # Get QoS level
        get_topic_param,  # Extract topic segment by index
    )
    # Type aliases for convenience
    from fastermqtt import (
        MqttMessageDep,  # Annotated[MQTTMessage, Depends(get_mqtt_message)]
        MqttTopicDep,  # Annotated[str, Depends(get_mqtt_topic)]
        MqttPayloadDep,  # Annotated[bytes, Depends(get_mqtt_payload)]
        MqttQosDep,  # Annotated[int, Depends(get_mqtt_qos)]
    )

    from fastermqtt import (
        MQTTMessage,  # Message container (topic, payload, qos, properties)
        SubscriptionInfo,  # Subscription metadata
        MQTTConfig,  # Connection configuration
    )

    from fastermqtt import (
        MQTTException,  # Base exception
        MQTTConnectionError,  # Connection failures
        MQTTSubscriptionError,  # Subscription failures
        MQTTPublishError,  # Publish failures
        MQTTSerializationError,  # Serialization/deserialization errors
        MQTTTopicError,  # Topic pattern errors
        MQTTMiddlewareError,  # Middleware errors
        MQTTRouterError,  # Router configuration errors
        MQTTNotInitializedError,  # Broker not initialized
    )

FasterMQTT follows FastStream's architecture:
    MqttRouter (inherits APIRouter)
    ├── Manages MQTTBroker lifecycle via lifespan
    ├── Supports include_router() for hierarchical routing
    ├── Prefix accumulation: sub-router topics prepend parent prefix
    └── Shares broker across all routers

    MQTTBroker (Singleton, pure classmethod)
    ├── Manages gmqtt Client connection
    ├── Dispatches messages to subscribers
    ├── FastAPI-style dependency injection via solve_dependencies
    └── Topic parameter extraction via regex
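Topic parameter extraction via regex could work roughly as follows: each {name} segment becomes a named group that matches exactly one topic level. This is a minimal sketch under that assumption, not the broker's actual code; compile_topic_pattern and extract_params are hypothetical names.

```python
import re
from typing import Optional


def compile_topic_pattern(pattern: str) -> re.Pattern:
    """Translate a pattern like client/{client_id}/control into a regex."""
    parts = []
    for segment in pattern.split("/"):
        placeholder = re.fullmatch(r"\{(\w+)\}", segment)
        if placeholder:
            # A parameter matches one topic level, so it must not cross "/".
            parts.append(f"(?P<{placeholder.group(1)}>[^/]+)")
        else:
            parts.append(re.escape(segment))
    return re.compile("/".join(parts))


def extract_params(pattern: str, topic: str) -> Optional[dict]:
    """Return the extracted parameters, or None if the topic does not match."""
    match = compile_topic_pattern(pattern).fullmatch(topic)
    return match.groupdict() if match else None


print(extract_params("client/{client_id}/control", "client/abc123/control"))
# {'client_id': 'abc123'}
print(extract_params("client/{client_id}/control", "client/abc123/status"))
# None
```

In a real dispatcher the compiled pattern would presumably be cached per subscription rather than rebuilt for every message.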
    # Connect over TLS using a CA certificate
    mqtt_router = MqttRouter(
        host="mqtt.example.com",
        port=8883,
        ssl_ca_cert="/path/to/ca.crt",
    )

    # Keep the session between connections
    mqtt_router = MqttRouter(
        host="localhost",
        port=1883,
        clean_session=False,  # Persist subscriptions across reconnects
    )

- Python 3.10+
- FastAPI
- gmqtt
- pydantic
- orjson (for JSON serialization)
MIT License
- FastStream - Inspiration for the router pattern architecture
- FastAPI - Dependency injection and router patterns
- gmqtt - Underlying MQTT client