Eventing solved. Kafka, outbox, DLQ, idempotency, all behind a clean Pythonic interface.
Stop wrestling with Kafka producers, consumer groups, offset management, dead-letter queues, and transactional outbox plumbing. messagekit hides all that complexity under a simple, elegant Pythonic interface, so you ship domain events, not infrastructure code.
```python
# This is all it takes to publish a transactional event
async with db_session.begin():
    order = Order.create(cart)
    db_session.add(order)
    await outbox.publish(OrderCreated(order_id=order.id))
# Kafka delivery, retries, DLQ: handled.
```

- Installation
- When to use this package
- Comparison with alternatives
- Scope
- Quick start: transactional outbox
- Setup
- Advanced: EventBus
- Documentation
- Local development
Full documentation: comprehensive guides and API reference
```shell
pip install messagekit
# or
poetry add messagekit
```

Import package name: messagekit (distribution is messagekit)

```python
from messagekit.core import BaseEvent
from messagekit.infrastructure import SqlAlchemyOutboxRepository
```

Requirements:
- Python 3.12+
- PostgreSQL (outbox persistence)
- Kafka Connect with Debezium CDC (publishing infrastructure)
Support scale: ❌ none, ✅ basic, ✅ ✅ strong, ✅ ✅ ✅ first-class
messagekit prioritizes durable messaging (transactional outbox + CDC) and Kafka/RabbitMQ integration over in-process event simplicity:
| Capability | messagekit | pyventus | fastapi-events | Notes |
|---|---|---|---|---|
| Transactional outbox | ✅ ✅ ✅ | ❌ | ❌ | Durable local DB plus outbox boundary is a core feature here |
| Kafka data plane | ✅ ✅ ✅ | ❌ | ❌ | This package is built for Kafka-backed microservice messaging |
| DLQ handling | ✅ ✅ ✅ | ❌ | ❌ | Leverages native RabbitMQ DLX and Kafka Connect DLQ SMT with database bookkeeping |
| Health checks for eventing runtime | ✅ ✅ ✅ | ❌ | ❌ | Outbox health checks plus FastStream ASGI broker health endpoint |
| Typed cross-service event contracts | ✅ ✅ | ✅ | ✅ ✅ | messagekit and fastapi-events are stronger on explicit payload modeling |
| Decorator subscriber registration | ✅ ✅ | ✅ ✅ ✅ | ✅ ✅ | EventBus.subscriber(...) exists now; pyventus is still the most polished here |
| In-process dispatch backend abstraction | ✅ ✅ | ✅ ✅ ✅ | ✅ | DispatchBackend exists here; pyventus offers a broader processor model |
| Lifecycle hooks / callbacks | ✅ ✅ | ✅ ✅ ✅ | ✅ | DispatchHooks covers dispatch, success, failure, disabled, and debug |
| Debug / disable controls | ✅ ✅ | ✅ ✅ | ✅ ✅ ✅ | DispatchSettings(enabled, debug) is implemented; fastapi-events is strongest for app-level toggling |
| Observability / telemetry polish | ✅ ✅ | ✅ | ✅ ✅ ✅ | FastStream native middlewares (KafkaTelemetryMiddleware, KafkaPrometheusMiddleware) integrated |
| Resilience middleware | ✅ ✅ | ❌ | ❌ | CircuitBreakerMiddleware (prevents cascading failures) and RateLimiterMiddleware (throttles consumption rate) |
| CDC-based outbox publishing | ✅ ✅ ✅ | ❌ | ❌ | Kafka Connect with Debezium CDC handles outbox-to-Kafka publishing |
| Consumer dedup helper | ✅ ✅ ✅ | ❌ | ❌ | IdempotentConsumerBase now uses a durable processed-message store instead of process memory |
| Durable cross-service idempotency | ✅ ✅ ✅ | ❌ | ❌ | IProcessedMessageStore plus SqlAlchemyProcessedMessageStore provide transactional duplicate protection |
| Consumer batch handling | ❌ | ❌ | ✅ ✅ ✅ | fastapi-events supports handle_many(...); this package stays one-message-per-consume today |
| FastAPI-local event flow | ❌ | ❌ | ✅ ✅ ✅ | This package intentionally avoids request-lifecycle middleware eventing |
Use messagekit if you need:
- Guaranteed event delivery via transactional outbox pattern
- Kafka-based microservice messaging with CDC publishing
- Dead letter queue handling with database bookkeeping
- Idempotent consumer patterns with durable deduplication
- Native broker integration (FastStream, Debezium CDC, RabbitMQ DLX)
Consider alternatives if:
- Simple in-process events only → pyventus
- FastAPI request-scoped events → fastapi-events
- Non-Kafka message brokers without CDC support
- No need for durable outbox persistence
Included:
- Transactional outbox primitives (write-side only; CDC handles publishing)
- Event contracts and registry
- Kafka/RabbitMQ consumer base classes with idempotency
- Native broker integration (Kafka Connect CDC, RabbitMQ DLX, FastStream middlewares)
- In-process emitter/subscriber facade and hooks
- DLQ bookkeeping consumer for database flag synchronization
NOT included (delegated to external systems):
- Event publishing (handled by Kafka Connect with Debezium CDC)
- Message broker infrastructure setup (use official Kafka/RabbitMQ documentation)
- Schema registry management (use Confluent Schema Registry or alternatives)
- Request-scoped FastAPI event middleware (intentionally avoided)
- Consumer batch handling (use fastapi-events if needed)
Integration Guide - Step-by-step integration instructions
API Reference - Complete API documentation
Event Catalog - Available event types and contracts
- Transactional Outbox Pattern - Guaranteed event delivery (PRIMARY)
- Cross-Service Communication - Database isolation, Kafka/RabbitMQ architecture, production deployment
- Idempotent Consumers - Duplicate message handling
- Health Checks - Monitoring outbox and broker status
Architecture note: This package handles the write side of the outbox pattern (persisting events transactionally with business data). Publishing is delegated to Kafka Connect with Debezium CDC, which captures outbox table changes and publishes to Kafka. The bridge component (part of standard architecture) forwards events from Kafka to RabbitMQ for services preferring AMQP. Dead letter handling leverages native broker mechanisms (RabbitMQ DLX, Kafka Connect DLQ SMT) with a minimal bookkeeping consumer to maintain database failed-event flags.
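The CDC publishing step described above lives entirely in Kafka Connect configuration, not in application code. A minimal Debezium connector sketch using the Outbox Event Router SMT might look as follows; the connector name, hostnames, credentials, and routing field are illustrative placeholders, not values shipped or required by this package:

```json
{
  "name": "service-a-outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres-a",
    "database.port": "5432",
    "database.user": "replicator",
    "database.password": "********",
    "database.dbname": "service_a",
    "topic.prefix": "service-a",
    "table.include.list": "public.outbox_events",
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.route.by.field": "event_type"
  }
}
```

The `EventRouter` transform reads each captured outbox row and routes it to a topic derived from the configured field, so the application never publishes to Kafka directly.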
Cross-service communication pattern: Each microservice has its own PostgreSQL database (database-per-service). Services communicate via Kafka (shared event backbone) and RabbitMQ (dual-broker pattern via bridge).
```mermaid
flowchart LR
subgraph Service_A["Service A (Producer)"]
AppA[FastAPI Route]
DB1[(PostgreSQL A<br/>outbox_events)]
end
subgraph Infrastructure["Shared Infrastructure"]
CDC[Kafka Connect CDC]
Kafka([Apache Kafka<br/>Topic: events])
Bridge[Eventing Bridge<br/>Kafka → RabbitMQ]
RMQ([RabbitMQ<br/>Exchange: events])
end
subgraph Service_B["Service B (Kafka Consumer)"]
Handler1[Event Handler]
DB2[(PostgreSQL B<br/>processed_messages)]
end
subgraph Service_C["Service C (RabbitMQ Consumer)"]
Handler2[Event Handler]
DB3[(PostgreSQL C<br/>processed_messages)]
end
DLQ([DLQ: Native Broker])
%% Primary Path (Kafka)
AppA -->|1. Atomic Write| DB1
DB1 -.->|2. CDC Monitor| CDC
CDC -->|3. Publish| Kafka
Kafka -->|4. Subscribe| Handler1
Handler1 -->|5. Dedup Check| DB2
%% Bridge Path (RabbitMQ)
Kafka -.->|6. Bridge Forward| Bridge
Bridge -->|7. Publish| RMQ
RMQ -.->|8. Subscribe| Handler2
Handler2 -->|9. Dedup Check| DB3
%% Error Flow
Handler1 -.->|Exception| DLQ
Handler2 -.->|Exception| DLQ
%% Styling
classDef app fill:#4CAF50,stroke:#2E7D32,color:#fff
classDef db fill:#2196F3,stroke:#1565C0,color:#fff
classDef broker fill:#FF9800,stroke:#E65100,color:#fff
classDef infra fill:#9C27B0,stroke:#6A1B9A,color:#fff
classDef error fill:#F44336,stroke:#C62828,color:#fff
class AppA,Handler1,Handler2 app
class DB1,DB2,DB3 db
class Kafka,RMQ broker
class CDC,Bridge infra
class DLQ error
```
Key points:
- Each service has separate PostgreSQL database (DB1, DB2, DB3)
- Kafka is the shared event bus connecting all services
- Kafka Connect CDC watches Service A's outbox and publishes to Kafka
- Service B consumes directly from Kafka
- Bridge forwards Kafka → RabbitMQ for services preferring AMQP
- Service C consumes from RabbitMQ
- Both Kafka and RabbitMQ are part of the standard architecture
Guarantees:
- Write Phase: ✅ Atomic (business data + event in same transaction, same database)
- Publish Phase: ✅ At-least-once (CDC retries on failure)
- Consume Phase: ✅ Exactly-once (idempotency via processed message store in consumer's database)
Cross-Service Communication Guide - Detailed explanation with production deployment patterns
Critical architecture principle: Each service maintains its own PostgreSQL database (database-per-service pattern). Services do NOT directly access each other's databases. Kafka acts as the shared event bus connecting isolated services.
Complete event flow:
- Service A (Producer) writes the event to its own `outbox_events` table (in postgres-a)
- Kafka Connect CDC monitors Service A's outbox table via the PostgreSQL WAL
- CDC publishes the event to a Kafka topic (shared infrastructure)
- Service B (Consumer) subscribes to the Kafka topic, processes the event, and stores an idempotency check in its own `processed_messages` table (in postgres-b)
- Bridge Service forwards events from Kafka to RabbitMQ (part of the standard architecture)
- Service C (Consumer) subscribes to the RabbitMQ exchange, processes the event, and stores an idempotency check in its own `processed_messages` table (in postgres-c)
Result: Services remain isolated (no shared database), communicate via Kafka, and maintain exactly-once processing guarantees via their own idempotency stores.
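The consume-phase guarantee above reduces to an insert-if-absent check against the consumer's own processed-messages table, committed in the same transaction as the handler's side effects. This is a minimal stdlib sketch of the pattern (table and column names are illustrative; in the package this logic is encapsulated by `SqlAlchemyProcessedMessageStore`):

```python
import sqlite3


def init_store(conn: sqlite3.Connection) -> None:
    # One row per successfully processed event; the PRIMARY KEY enforces dedup.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS processed_messages (event_id TEXT PRIMARY KEY)"
    )


def handle_once(conn: sqlite3.Connection, event_id: str, handler) -> bool:
    """Run handler at most once per event_id; return True if it actually ran."""
    try:
        with conn:  # transaction: dedup record + side effects commit together
            conn.execute(
                "INSERT INTO processed_messages (event_id) VALUES (?)", (event_id,)
            )
            handler()
        return True
    except sqlite3.IntegrityError:
        # Duplicate delivery from an at-least-once broker: skip the handler.
        return False


conn = sqlite3.connect(":memory:")
init_store(conn)
seen = []
handle_once(conn, "evt-1", lambda: seen.append("evt-1"))  # runs
handle_once(conn, "evt-1", lambda: seen.append("evt-1"))  # duplicate: skipped
print(seen)  # ['evt-1']
```

Because the dedup insert and the handler's writes share one transaction, a crash between them rolls both back, preserving the effectively-exactly-once guarantee.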
Component layout, from write side to read side:

- YOUR SERVICE (FastAPI)
  - Routes publish through the messagekit shared contracts: `BaseEvent` (Pydantic base), `IOutboxRepository` (Protocol), `IProcessedMessageStore` (Protocol)
  - messagekit implementations, WRITE SIDE (primary): `SqlAlchemyOutboxRepository` (implements `IOutboxRepository`) and `OutboxEventHandler`
  - OPTIONAL in-process dispatch: `EventBus` (NOT wired to the outbox)
- PostgreSQL DB (written to by the service): business tables, `outbox_event_record`, and `processed_messages`; the outbox table is monitored via the WAL
- EXTERNAL INFRASTRUCTURE (not provided by this package): Kafka Connect + Debezium CDC, with the PostgreSQL Connector and Outbox Event Router SMT, publishing to Kafka topics
- FASTSTREAM BROKER LAYER (broker abstraction + middleware)
  - `KafkaBroker` (from faststream.confluent): `@broker.subscriber("topic")` decorators, `broker.publish()` wrapper, Pydantic auto-deserialization, `make_ping_asgi` health endpoint
  - Wired middleware stack: `CircuitBreakerMiddleware` (resilience), `RateLimiterMiddleware` (optional, disabled), `KafkaPrometheusMiddleware` (metrics), `KafkaTelemetryMiddleware` (OTel)
- DOWNSTREAM SERVICES
  - READ SIDE (messagekit): `@broker.subscriber("user.created")` handlers receive events FastStream has already deserialized to Pydantic models; alternatively, use `IdempotentConsumerBase` for programmatic idempotency control, dynamic/polymorphic event types, or legacy dict-based handling. Components: `EventRegistry` (event type-to-class mapping) and `SqlAlchemyProcessedMessageStore` (implements `IProcessedMessageStore`)
  - BRIDGE COMPONENT (standard architecture): `BridgeConsumer` (Kafka-to-RabbitMQ bridge with manual idempotency, not the base class), part of dual-broker event distribution. RabbitMQ broker middleware stack: `CircuitBreakerMiddleware` (resilience), `RateLimiterMiddleware` (optional), `RabbitPrometheusMiddleware` (metrics), `TelemetryMiddleware` (OTel, shared)
  - On failure, messages land in the EXTERNAL DLQ (broker native: Kafka Connect DLQ SMT, RabbitMQ DLX); a DLQ Bookkeeper Consumer (`@broker.subscriber`) reads the DLQ and sends bookkeeping updates back to the database
- Admin/Ops API: `DLQAdminService`, Replay API, Health Checks
FastStream is the Pythonic adapter that wraps low-level Kafka/RabbitMQ clients (like confluent-kafka, aio-pika) and provides a decorator-based, async/await, type-safe API.
Think of it like SQLAlchemy for message brokers - it hides 200+ lines of boilerplate:
```python
# WITHOUT FastStream (raw Kafka client)
import json

from confluent_kafka import Consumer

consumer = Consumer({'bootstrap.servers': 'localhost:9092', ...})
consumer.subscribe(['user.created'])
while True:
    msg = consumer.poll(1.0)
    if msg is not None and msg.error() is None:
        event_dict = json.loads(msg.value().decode())
        event = UserCreatedEvent(**event_dict)  # Manual conversion
        await handle_user_created(event)        # needs an async driver in real code
        consumer.commit(message=msg)
```
```python
# WITH FastStream (what you actually write)
from faststream.confluent import KafkaBroker

broker = KafkaBroker("localhost:9092")

@broker.subscriber("user.created")
async def handle_user_created(event: UserCreatedEvent):  # ✅ Already a Pydantic model!
    # FastStream automatically:
    # - Polls Kafka, decodes bytes -> JSON -> Pydantic
    # - Runs middleware (telemetry, circuit breaker, rate limiter)
    # - Commits offset on success, sends to DLQ on failure
    print(f"User {event.user_id} created!")
```

What FastStream provides:
- Decorator-based subscriptions: `@broker.subscriber("topic")`
- Automatic Pydantic conversion: type hints drive auto-deserialization
- Middleware hooks: circuit breakers, rate limiting, telemetry, metrics
- Native health checks: `make_ping_asgi` endpoint for broker status
- Unified API: the same code works for Kafka, RabbitMQ, NATS, and Redis Streams
messagekit uses FastStream for all Kafka/RabbitMQ interactions, giving you a clean, Pythonic API while handling all the low-level complexity.
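The core conversion FastStream performs per delivery is the same one the raw-client example spells out by hand: bytes to JSON to a typed model. A stdlib-only sketch of that step (using a dataclass as a stand-in for the Pydantic event class):

```python
import json
from dataclasses import dataclass


@dataclass
class UserCreatedEvent:
    user_id: int
    email: str


def decode(raw: bytes) -> UserCreatedEvent:
    # bytes -> JSON dict -> typed event: the per-message step FastStream automates
    data = json.loads(raw.decode("utf-8"))
    return UserCreatedEvent(**data)


msg = b'{"user_id": 7, "email": "a@example.com"}'
event = decode(msg)
print(event.user_id)  # 7
```

With FastStream, the type hint on the subscriber parameter drives this decoding, so handler bodies only ever see validated, typed events.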
The outbox table stores events transactionally with your business data:
```sql
CREATE TABLE outbox_events (
    event_id VARCHAR(36) PRIMARY KEY,
    event_type VARCHAR(255) NOT NULL,
    aggregate_id VARCHAR(255) NOT NULL,
    payload JSONB NOT NULL,
    occurred_at TIMESTAMPTZ NOT NULL,
    published BOOLEAN DEFAULT FALSE NOT NULL,
    failed BOOLEAN DEFAULT FALSE NOT NULL,
    attempt_count INTEGER DEFAULT 0 NOT NULL,
    created_at TIMESTAMPTZ NOT NULL,
    published_at TIMESTAMPTZ,
    failed_at TIMESTAMPTZ,
    error_message TEXT
);

CREATE INDEX idx_outbox_unpublished ON outbox_events (published, created_at);
```

Initialize the outbox repository and event bus at application startup:
```python
from fastapi import FastAPI
from sqlalchemy.ext.asyncio import create_async_engine

from messagekit.core import build_event_bus
from messagekit.infrastructure import SqlAlchemyOutboxRepository

app = FastAPI()

@app.on_event("startup")
async def startup():
    # Database engine
    engine = create_async_engine("postgresql+asyncpg://...")

    # Outbox repository
    outbox_repo = SqlAlchemyOutboxRepository(engine)
    app.state.outbox_repository = outbox_repo

    # Optional: EventBus for advanced patterns
    event_bus = build_event_bus()
    app.state.event_bus = event_bus
```

CDC Publishing: Kafka Connect with Debezium CDC automatically detects outbox table changes and publishes to Kafka. See debezium-cdc-architecture.md for configuration.
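The route examples in this README inject the repository via `Depends(get_outbox_repo)` without showing the provider itself. One hypothetical wiring against the `app.state` attributes set at startup (these provider names and the `session_factory` attribute are illustrative, not package API):

```python
# Hypothetical FastAPI dependency providers (illustrative, not package API).
# In real FastAPI code each parameter would be annotated `request: Request`
# so the framework injects the active request object.

def get_outbox_repo(request):
    # Return the singleton repository stored on app.state at startup
    return request.app.state.outbox_repository


def get_session(request):
    # One database session per request, from an app-level session factory
    return request.app.state.session_factory()
```

Keeping providers thin like this means the startup hook remains the single place where infrastructure objects are constructed.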
The core pattern is the transactional outbox - persist events atomically with your business data:
```python
from fastapi import Depends

from messagekit.core import BaseEvent
from messagekit.infrastructure import SqlAlchemyOutboxRepository

# Define domain event
class UserCreated(BaseEvent):
    event_type: str = "user.created"
    aggregate_id: str
    user_id: int
    email: str

# Simple, direct approach (recommended)
@app.post("/users")
async def create_user(
    data: CreateUserRequest,
    session = Depends(get_session),
    outbox_repo: SqlAlchemyOutboxRepository = Depends(get_outbox_repo),
):
    # 1. Business logic
    user = User(**data.dict())
    session.add(user)

    # 2. Persist event to outbox (same transaction)
    await outbox_repo.add_event(
        UserCreated(
            aggregate_id=f"user-{user.id}",
            user_id=user.id,
            email=user.email,
        ),
        session=session,
    )

    # 3. Commit both atomically
    await session.commit()

    # 4. Kafka Connect (Debezium CDC) detects the outbox insert and publishes to Kafka
    return {"user_id": user.id}
```

Result: Guaranteed delivery, no lost events, atomic writes.
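The atomicity claim above is easy to demonstrate with a plain relational transaction: the business row and the outbox row either both commit or both roll back. A stdlib sketch of the write phase, with sqlite3 standing in for PostgreSQL (the real repository does this through SQLAlchemy; table shapes here are simplified):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT)")
conn.execute(
    "CREATE TABLE outbox_events (event_id TEXT PRIMARY KEY, event_type TEXT, payload TEXT)"
)


def create_user(email: str, fail: bool = False) -> None:
    with conn:  # single transaction: user row + outbox row commit together
        cur = conn.execute("INSERT INTO users (email) VALUES (?)", (email,))
        conn.execute(
            "INSERT INTO outbox_events VALUES (?, ?, ?)",
            (f"evt-{cur.lastrowid}", "user.created", email),
        )
        if fail:  # simulate a crash before commit
            raise RuntimeError("boom")


create_user("a@example.com")
try:
    create_user("b@example.com", fail=True)
except RuntimeError:
    pass

# Only the successful write left rows behind: no user without its event, and vice versa
print(conn.execute("SELECT COUNT(*) FROM users").fetchone()[0])          # 1
print(conn.execute("SELECT COUNT(*) FROM outbox_events").fetchone()[0])  # 1
```

This is exactly why the outbox pattern beats "write to DB, then publish to Kafka" in application code: there is no window where the business write succeeds but the event is lost.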
For decoupled architectures with multiple side effects per event, use the EventBus abstraction layer:
```python
from messagekit.core import BaseEvent
from messagekit.infrastructure import OutboxEventHandler

# Access EventBus (initialized at startup)
event_bus = request.app.state.event_bus
outbox_repo = request.app.state.outbox_repository

# Register handler (typically at startup)
event_bus.register(UserCreated, OutboxEventHandler(outbox_repo))

# Dispatch (same result as direct add_event, but decoupled)
await event_bus.dispatch(UserCreated(...))
```

When to use EventBus:
- ✅ Multiple side effects per event (audit, metrics, cache)
- ✅ Need lifecycle hooks for observability
- ✅ Testing isolation (enable/disable toggle)
- ✅ Decorator-based handler registration

When NOT needed:
- ❌ Simple event persistence (use direct `outbox_repo.add_event()`)
- ❌ Single handler per event
- ❌ No need for hooks/tracing
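The register/dispatch fan-out above can be sketched stdlib-only. This toy bus is not the package's EventBus (its real signatures, hooks, and settings live in the EventBus documentation); it only illustrates why "multiple side effects per event" is the selling point, including the decorator form of registration:

```python
import asyncio
from collections import defaultdict


class ToyEventBus:
    """Minimal illustration of register/dispatch fan-out; not the real EventBus."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def register(self, event_type, handler):
        self._handlers[event_type].append(handler)

    def subscriber(self, event_type):
        # Decorator form of register(), mirroring the idea of EventBus.subscriber(...)
        def wrap(fn):
            self.register(event_type, fn)
            return fn
        return wrap

    async def dispatch(self, event):
        for handler in self._handlers[type(event)]:
            await handler(event)  # audit, metrics, cache, ... in registration order


class UserCreated:
    def __init__(self, user_id):
        self.user_id = user_id


bus = ToyEventBus()
calls = []

@bus.subscriber(UserCreated)
async def audit(event):
    calls.append(("audit", event.user_id))

@bus.subscriber(UserCreated)
async def metrics(event):
    calls.append(("metrics", event.user_id))

asyncio.run(bus.dispatch(UserCreated(user_id=1)))
print(calls)  # [('audit', 1), ('metrics', 1)]
```

One dispatch call reaches every registered handler, which is the decoupling a single direct `add_event()` call cannot give you.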
EventBus Documentation - Complete guide for advanced patterns
- PyPI distribution name: `messagekit`
- Python import package: `messagekit`

```shell
# Install
pip install messagekit
```

```python
# Import
from messagekit.core import BaseEvent
from messagekit.infrastructure import SqlAlchemyOutboxRepository
```

Services should consume the published package rather than a source checkout. Kafka remains shared infrastructure, with local producer/consumer clients per service.
```shell
poetry install
poetry build
poetry run pytest
```

Local (Windows):
- Run non-RabbitMQ tests: `poetry run pytest tests/ -v -m "not requires_rabbitmq"`
- RabbitMQ integration tests fail due to Docker Desktop networking limitations

CI/CD (recommended for complete coverage):
- GitHub Actions runs all 192 tests on Linux (100% pass rate)
- Workflows configured in `.github/workflows/`
- See `.github/workflows/README.md` for details