A comprehensive, framework-agnostic implementation of the Transactional Inbox-Outbox pattern for reliable message delivery in distributed systems. Available for both Node.js/TypeScript and Python.
The Inbox-Outbox pattern ensures reliable message delivery in distributed systems by:
- Outbox Pattern: Guarantees message publishing by storing outgoing messages in the same transaction as business logic
- Inbox Pattern: Prevents duplicate message processing through deduplication
- Transactional Guarantees: Ensures atomicity between database operations and message creation
- Automatic Retry: Exponential backoff for failed message deliveries
- Polling & Cleanup: Background workers for message processing and cleanup
| Package | Description | PyPI |
|---|---|---|
| event-forge | Complete Python implementation with SQLAlchemy & MongoDB support | |

npm install @prodforcode/event-forge-core @prodforcode/event-forge-nestjs @prodforcode/event-forge-typeorm @prodforcode/event-forge-rabbitmq-publisher

import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { InboxOutboxModule } from '@prodforcode/event-forge-nestjs';
import { TypeOrmOutboxRepository, TypeOrmInboxRepository } from '@prodforcode/event-forge-typeorm';
import { RabbitMQPublisher } from '@prodforcode/event-forge-rabbitmq-publisher';
@Module({
imports: [
TypeOrmModule.forRoot({
type: 'postgres',
host: 'localhost',
port: 5432,
username: 'user',
password: 'password',
database: 'mydb',
entities: [/* your entities */],
synchronize: false, // Use migrations in production
}),
InboxOutboxModule.register({
outbox: {
repository: TypeOrmOutboxRepository,
config: {
pollingInterval: 5000, // Poll every 5 seconds
batchSize: 10,
maxRetries: 3,
retentionDays: 7,
},
},
inbox: {
repository: TypeOrmInboxRepository,
config: {
retentionDays: 30,
},
},
publisher: RabbitMQPublisher,
}),
],
})
export class AppModule {}

import { Injectable } from '@nestjs/common';
import { InjectDataSource } from '@nestjs/typeorm';
import { DataSource } from 'typeorm';
import { OutboxService } from '@prodforcode/event-forge-core';
@Injectable()
export class UserService {
constructor(
@InjectDataSource() private dataSource: DataSource,
private outboxService: OutboxService,
) {}
async createUser(email: string, name: string) {
// Use transaction to ensure atomicity
return this.outboxService.withTransaction(async (transactionContext) => {
// 1. Save user to database within transaction
const user = await this.dataSource
.createQueryBuilder()
.insert()
.into('users')
.values({ email, name })
.execute();
// 2. Create outbox message in same transaction
await this.outboxService.createMessage(
{
aggregateType: 'User',
aggregateId: user.identifiers[0].id,
eventType: 'user.created',
payload: { email, name },
},
transactionContext, // Pass transaction context
);
return user;
});
// Message will be published automatically after commit
}
}

import { RabbitMQPublisher } from '@prodforcode/event-forge-rabbitmq-publisher';
const publisher = new RabbitMQPublisher({
url: 'amqp://guest:guest@localhost:5672',
exchange: 'events',
exchangeType: 'topic',
});
await publisher.connect();

Option A: Using @InboxSubscribe decorator with RabbitMQ (Recommended)

npm install @prodforcode/event-forge-rabbitmq-consumer

import { Injectable } from '@nestjs/common';
import { InboxSubscribe } from '@prodforcode/event-forge-rabbitmq-consumer';
@Injectable()
export class OrderEventHandler {
@InboxSubscribe({
exchange: 'events',
routingKey: 'order.placed',
queue: 'my-service.order.placed',
source: 'order-service',
// Retry configuration
maxRetries: 5,
enableRetry: true,
backoffBaseSeconds: 10,
})
async handleOrderPlaced(message: any) {
// Automatic features:
// - Message recorded in inbox before handler execution
// - Duplicate messages automatically filtered
// - Errors trigger retry with exponential backoff
// - Permanent failure after maxRetries exceeded
console.log('Processing order:', message.payload);
await this.processOrder(message);
}
}

Option B: Manual inbox recording
import { Injectable, OnModuleInit } from '@nestjs/common';
import { InboxService } from '@prodforcode/event-forge-core';
@Injectable()
export class OrderEventHandler implements OnModuleInit {
constructor(private inboxService: InboxService) {}
onModuleInit() {
// Register handler for order events
this.inboxService.registerHandler('order.placed', async (message) => {
console.log('Processing order:', message.payload);
// Your business logic here
});
}
async handleIncomingMessage(externalMessage: any) {
// Process incoming message with automatic deduplication
await this.inboxService.receiveMessage({
messageId: externalMessage.id, // External message ID for deduplication
source: 'external-service',
eventType: 'order.placed',
payload: externalMessage.data,
});
// Message will be processed only once, even if received multiple times
}
}

import { Injectable, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { OutboxService, InboxService } from '@prodforcode/event-forge-core';
@Injectable()
export class OutboxPollingService implements OnModuleInit, OnModuleDestroy {
constructor(
private outboxService: OutboxService,
private inboxService: InboxService,
) {}
async onModuleInit() {
// Start polling for pending outbox messages
await this.outboxService.startPolling();
// Start cleanup for old messages
await this.outboxService.cleanup();
await this.inboxService.cleanup();
}
async onModuleDestroy() {
// Stop polling on shutdown
await this.outboxService.stopPolling();
}
}

# With PostgreSQL support
pip install event-forge
# With RabbitMQ support
pip install event-forge[rabbitmq]
# With MongoDB support
pip install event-forge[mongodb]
# All extras
pip install event-forge[rabbitmq,mongodb]

from event_forge import OutboxService, CreateOutboxMessageDto
from event_forge.repositories.sqlalchemy import (
SQLAlchemyOutboxRepository,
Base,
)
from event_forge.publishers.aio_pika_publisher import AioPikaPublisher
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
# Setup database
engine = create_async_engine(
"postgresql+asyncpg://user:password@localhost/mydb",
echo=True,
)
# Create tables (use Alembic migrations in production)
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
# Setup repository and publisher
outbox_repository = SQLAlchemyOutboxRepository(engine)
publisher = AioPikaPublisher(
url="amqp://guest:guest@localhost/",
exchange="events",
exchange_type="topic",
)
# Create service
outbox_service = OutboxService(
repository=outbox_repository,
publisher=publisher,
)

from sqlalchemy.ext.asyncio import AsyncSession
async def create_user(email: str, name: str):
"""Create user with reliable message publishing"""
# Create session with transaction
async with AsyncSession(engine) as session:
async with session.begin():
# 1. Insert user into database
result = await session.execute(
"INSERT INTO users (email, name) VALUES (:email, :name) RETURNING id",
{"email": email, "name": name}
)
user_id = result.scalar_one()
# 2. Create outbox message in same transaction
dto = CreateOutboxMessageDto(
aggregate_type="User",
aggregate_id=str(user_id),
event_type="user.created",
payload={"email": email, "name": name},
)
await outbox_service.create_message(dto, session)
# Transaction commits here
# Message will be published automatically after commit

from event_forge import InboxService, CreateInboxMessageDto
from event_forge.repositories.sqlalchemy import SQLAlchemyInboxRepository
# Setup inbox
inbox_repository = SQLAlchemyInboxRepository(engine)
inbox_service = InboxService(repository=inbox_repository)
# Register event handler
@inbox_service.register_handler("order.placed")
async def handle_order_placed(message):
print(f"Processing order: {message.payload}")
# Your business logic here
# Process incoming message with automatic deduplication
async def handle_external_message(external_msg: dict):
dto = CreateInboxMessageDto(
message_id=external_msg["id"], # External message ID for deduplication
source="external-service",
event_type="order.placed",
payload=external_msg["data"],
)
# Will process only once, even if message is received multiple times
await inbox_service.receive_message(dto)

import asyncio
async def start_background_workers():
"""Start polling and cleanup workers"""
# Start outbox polling (retries failed messages)
await outbox_service.start_polling()
# Start cleanup workers (removes old processed messages)
cleanup_task = asyncio.create_task(run_periodic_cleanup())
# Keep running
await asyncio.Event().wait()
async def run_periodic_cleanup():
"""Periodic cleanup of old messages"""
while True:
await asyncio.sleep(3600) # Run every hour
await outbox_service.cleanup()
await inbox_service.cleanup()
# Run application
asyncio.run(start_background_workers())

- Create Message in Transaction: Outbox message is created in the same database transaction as business data
- Immediate Publish Attempt: After commit, immediate attempt to publish the message
- Polling for Retries: Background poller picks up failed messages and retries with exponential backoff
- Cleanup: Old published messages are periodically deleted
- Receive Message: External message is recorded with unique message ID
- Deduplication Check: If message ID already exists, skip processing
- Handler Execution: Registered handlers are called for the event type
- Mark as Processed: Message status is updated to prevent reprocessing
- Cleanup: Old processed messages are periodically deleted
- `pending` — Created, waiting to be published
- `publishing` — Currently being published (locked)
- `published` — Successfully published
- `failed` — Permanently failed (max retries exceeded)
- `received` — Message received, waiting to be processed
- `processing` — Currently being processed
- `processed` — Successfully processed
- `failed` — Processing failed, scheduled for retry
- `permanently_failed` — Processing failed after max retries or non-recoverable error
CREATE TABLE outbox_messages (
id VARCHAR(36) PRIMARY KEY,
aggregate_type VARCHAR(255) NOT NULL,
aggregate_id VARCHAR(255) NOT NULL,
event_type VARCHAR(255) NOT NULL,
payload JSONB NOT NULL,
status VARCHAR(50) NOT NULL,
retry_count INTEGER DEFAULT 0,
max_retries INTEGER DEFAULT 3,
error_message TEXT,
locked_by VARCHAR(255),
locked_at TIMESTAMP,
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW(),
published_at TIMESTAMP
);
CREATE INDEX idx_outbox_status ON outbox_messages(status);
CREATE INDEX idx_outbox_aggregate ON outbox_messages(aggregate_type, aggregate_id);
CREATE INDEX idx_outbox_created ON outbox_messages(created_at);

CREATE TABLE inbox_messages (
id VARCHAR(36) PRIMARY KEY,
message_id VARCHAR(255) NOT NULL,
source VARCHAR(255) NOT NULL,
event_type VARCHAR(255) NOT NULL,
payload JSONB NOT NULL,
status VARCHAR(50) NOT NULL,
retry_count INTEGER DEFAULT 0,
max_retries INTEGER DEFAULT 3,
error_message TEXT,
scheduled_at TIMESTAMP, -- Next retry time (for failed messages)
created_at TIMESTAMP DEFAULT NOW(),
processed_at TIMESTAMP
);
CREATE UNIQUE INDEX idx_inbox_message_source ON inbox_messages(message_id, source);
CREATE INDEX idx_inbox_status ON inbox_messages(status);
CREATE INDEX idx_inbox_status_scheduled ON inbox_messages(status, scheduled_at);
CREATE INDEX idx_inbox_created ON inbox_messages(created_at);

{
pollingInterval: 5000, // Polling interval in ms (default: 5000)
batchSize: 10, // Number of messages to process per batch (default: 10)
maxRetries: 3, // Maximum retry attempts (default: 3)
retryDelay: 1000, // Initial retry delay in ms (default: 1000)
retryBackoffMultiplier: 2, // Exponential backoff multiplier (default: 2)
lockTimeout: 30000, // Message lock timeout in ms (default: 30000)
retentionDays: 7, // Days to keep published messages (default: 7)
immediateProcessing: true, // Attempt immediate publish after commit (default: true)
}

{
retentionDays: 30, // Days to keep processed messages (default: 30)
enableRetry: true, // Enable automatic retry polling (default: false)
retryPollingInterval: 5000, // Polling interval in ms (default: 5000)
maxRetries: 3, // Max retry attempts (default: 3)
backoffBaseSeconds: 5, // Base delay for exponential backoff (default: 5)
maxBackoffSeconds: 3600, // Max delay in seconds (default: 3600 = 1 hour)
}

When a message handler fails, the inbox automatically schedules a retry using exponential backoff:
delay = min(backoffBaseSeconds × 2^retryCount, maxBackoffSeconds) ± 10% jitter
Example retry schedule (default settings):
| Retry | Delay |
|---|---|
| 1 | ~5 seconds |
| 2 | ~10 seconds |
| 3 | ~20 seconds |
| 4 | ~40 seconds |
| 5 | ~80 seconds |
| 6+ | Capped at 1 hour |
Important: To enable retry execution, either:
- Set `enableRetry: true` in inbox config (automatic polling)
- Call `inboxService.startRetryPolling()` manually
Messages are marked as permanently failed when:
- `retryCount >= maxRetries`
- Handler throws `ProcessingError` (signals a non-recoverable error)
Implement the IMessagePublisher interface:
import { IMessagePublisher, OutboxMessage } from '@prodforcode/event-forge-core';
export class CustomPublisher implements IMessagePublisher {
async publish(message: OutboxMessage): Promise<void> {
// Your custom publishing logic
await fetch('https://api.example.com/events', {
method: 'POST',
body: JSON.stringify({
type: message.eventType,
data: message.payload,
}),
});
}
async connect(): Promise<void> {
// Setup connection if needed
}
async disconnect(): Promise<void> {
// Cleanup connection
}
}

Implement IOutboxRepository or IInboxRepository:
import { IOutboxRepository, OutboxMessage } from '@prodforcode/event-forge-core';
export class CustomOutboxRepository implements IOutboxRepository {
async create(message: OutboxMessage, transactionContext?: any): Promise<OutboxMessage> {
// Save to your custom database
}
async findPendingMessages(limit: number): Promise<OutboxMessage[]> {
// Query pending messages
}
// Implement other required methods...
}

Subscribe to internal events:
import { OutboxService, OutboxEvent } from '@prodforcode/event-forge-core';
outboxService.on(OutboxEvent.MESSAGE_CREATED, (message) => {
console.log('Message created:', message.id);
});
outboxService.on(OutboxEvent.MESSAGE_PUBLISHED, (message) => {
console.log('Message published:', message.id);
});
outboxService.on(OutboxEvent.PUBLISH_FAILED, ({ message, error }) => {
console.error('Publish failed:', message.id, error);
});

# Node.js
npm test
# Python
pytest

# Requires running PostgreSQL and RabbitMQ
docker-compose up -d
npm run test:integration

- `createMessage(dto, transactionContext?)` — Create outbox message
- `withTransaction(callback)` — Execute operation within transaction
- `startPolling()` — Start background polling
- `stopPolling()` — Stop background polling
- `cleanup()` — Remove old published messages
- `receiveMessage(dto)` — Receive and process message with deduplication
- `registerHandler(eventType, handler)` — Register event handler
- `unregisterHandler(eventType, handler)` — Unregister event handler
- `startRetryPolling()` — Start polling for failed messages due for retry
- `stopRetryPolling()` — Stop retry polling
- `cleanup()` — Remove old processed messages
Contributions are welcome! Please read our Contributing Guide for details.
MIT License - see LICENSE for details.