Educational Python application demonstrating event-driven architecture, local event bus, event handlers, CQRS pattern, event sourcing, and asynchronous event processing.
- Event Bus - Local in-memory publish/subscribe system
- Event Publishing - Emit events when things happen
- Event Subscription - Register handlers for events
- Decoupled Components - Services communicate via events
- Event History - Track all events in the system
- Publish/Subscribe Pattern - Loose coupling between components
- Multiple Subscribers - Many handlers per event
- Synchronous Handlers - Immediate event processing
- Asynchronous Handlers - Non-blocking event processing
- Event Filtering - Subscribe to specific event types
- Decorator-Based - Easy handler registration
- Multiple Handlers - One event triggers many actions
- Error Isolation - Handler errors don't affect others
- Async Support - Background processing
- Handler Chaining - Sequential event processing
- Commands - Write operations (CreateUser, PlaceOrder)
- Queries - Read operations (GetUser, ListOrders)
- Separate Models - Different models for read/write
- Command Handlers - Execute commands, emit events
- Query Handlers - Retrieve data from read models
- Event Store - Store all events
- Aggregate Root - Rebuild state from events
- Event Replay - Reconstruct state at any point
- Version Tracking - Track aggregate versions
- Event History - Complete audit trail
- Async Handlers - Non-blocking event processing
- Background Processing - Events processed in threads
- Async/Await Support - Modern async patterns
- Concurrent Execution - Multiple handlers run in parallel
git clone https://github.com/Amruth22/Python-Event-Driven-Architecture.git
cd Python-Event-Driven-Architecture
python -m venv venv
# On Windows:
venv\Scripts\activate
# On macOS/Linux:
source venv/bin/activate
pip install -r requirements.txt
python main.py
python tests.py

Python-Event-Driven-Architecture/
│
├── event_bus/
│   ├── event.py              # Event definitions
│   └── event_bus.py          # Event bus implementation
│
├── cqrs/
│   ├── commands.py           # Command definitions
│   ├── queries.py            # Query definitions
│   ├── command_handlers.py   # Command handlers
│   └── query_handlers.py     # Query handlers
│
├── event_sourcing/
│   ├── aggregate.py          # Aggregate root
│   └── event_store.py        # Event storage
│
├── handlers/
│   ├── user_handlers.py      # User event handlers
│   └── order_handlers.py     # Order event handlers
│
├── main.py                   # Demonstration script
├── tests.py                  # 10 unit tests
├── requirements.txt          # Dependencies
├── .env                      # Configuration
└── README.md                 # This file
┌─────────────────────────────────────────────────────────────┐
│ Event Bus │
│ ┌────────────────────────────────────────────────────┐ │
│ │ Subscribers (Event Handlers) │ │
│ │ - UserHandler: send_welcome_email() │ │
│ │ - UserHandler: create_user_profile() │ │
│ │ - OrderHandler: reduce_inventory() │ │
│ │ - OrderHandler: process_payment_async() │ │
│ └────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
▲
│ Events
│
┌─────────────────────────┴───────────────────────────────────┐
│ Command Handlers │
│ ┌──────────────────┐ ┌──────────────────┐ │
│ │ UserCommandHandler│ │OrderCommandHandler│ │
│ │ - CreateUser │ │ - PlaceOrder │ │
│ │ - UpdateEmail │ │ - ConfirmOrder │ │
│ │ - DeactivateUser│ │ - ShipOrder │ │
│ └──────────────────┘ └──────────────────┘ │
└─────────────────────────────────────────────────────────────┘
│
│ Emit Events
▼
┌─────────────────────────────────────────────────────────────┐
│ Event Store │
│ - UserCreatedEvent(1, "john", "john@example.com") │
│ - UserEmailChangedEvent(1, "old@", "new@") │
│ - OrderPlacedEvent(101, 1, [...], 999.99) │
│ - OrderShippedEvent(101, "TRACK123") │
└─────────────────────────────────────────────────────────────┘
│
│ Replay Events
▼
┌─────────────────────────────────────────────────────────────┐
│ Aggregate Root │
│ UserAggregate(1) │
│ - username: "john" │
│ - email: "new@example.com" │
│ - version: 2 │
└─────────────────────────────────────────────────────────────┘
from event_bus.event_bus import event_bus, subscribe
from event_bus.event import UserCreatedEvent
# Subscribe to event
@subscribe(UserCreatedEvent)
def handle_user_created(event):
    print(f"User created: {event.username}")
# Publish event
event = UserCreatedEvent(1, "john_doe", "john@example.com")
event_bus.publish(event)
# Output: User created: john_doe

from cqrs.commands import CreateUserCommand
from cqrs.command_handlers import UserCommandHandler
# Create command
command = CreateUserCommand(
    username="alice",
    email="alice@example.com",
    password="secure123",
)
# Execute command
handler = UserCommandHandler()
user_id = handler.handle(command)
# Automatically emits UserCreatedEvent

from cqrs.queries import GetUserQuery, ListUsersQuery
from cqrs.query_handlers import UserQueryHandler
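# Create the query handler (assumes a no-argument constructor, like UserCommandHandler)
query_handler = UserQueryHandler()
# Get single user
query = GetUserQuery(user_id=1)
user = query_handler.handle(query)
# List all users
list_query = ListUsersQuery(active_only=True)
users = query_handler.handle(list_query)

from event_sourcing.aggregate import UserAggregate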
from event_sourcing.event_store import event_store
from event_bus.event import UserCreatedEvent, UserEmailChangedEvent
# Store events
events = [
    UserCreatedEvent(1, "bob", "bob@example.com"),
    UserEmailChangedEvent(1, "bob@example.com", "bob.new@example.com"),
]
event_store.save_events("user-1", events)
# Rebuild state from events
user = UserAggregate(1)
stored_events = event_store.get_events("user-1")
user.load_from_history(stored_events)
print(user.email) # bob.new@example.com
print(user.version) # 2

from event_bus.event_bus import event_bus, subscribe_async
# OrderPlacedEvent is assumed to be defined alongside the user events
from event_bus.event import OrderPlacedEvent
import asyncio
@subscribe_async(OrderPlacedEvent)
async def process_payment(event):
    print(f"Processing payment for order {event.order_id}...")
    await asyncio.sleep(1)  # Simulate API call
    print("Payment processed!")

# Publish event (payment processes in background)
event = OrderPlacedEvent(101, 1, [], 999.99)
event_bus.publish(event)

1. User submits order
↓
2. PlaceOrderCommand created
↓
3. OrderCommandHandler.handle(command)
- Validates user exists
- Creates order
- Emits OrderPlacedEvent
↓
4. OrderPlacedEvent published to Event Bus
↓
5. Multiple handlers execute:
├─→ reduce_inventory(event)
├─→ send_order_confirmation(event)
├─→ process_payment_async(event) [async]
└─→ notify_warehouse(event)
↓
6. Async payment completes
↓
7. PaymentProcessedEvent emitted
↓
8. update_order_status(event) executes
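
The flow above, where one handler emits a follow-up event that triggers another handler, can be sketched in a few lines. This is a simplified, synchronous illustration rather than the project's implementation: the string event names, the `handlers` registry, and the `publish` and `subscribe` helpers are all hypothetical stand-ins, and the payment step runs inline instead of in the background.

```python
from collections import defaultdict

handlers = defaultdict(list)   # event name -> registered handlers (hypothetical registry)
log = []                       # records the order in which handlers run


def subscribe(event_name):
    """Decorator that registers a function as a handler for event_name."""
    def register(func):
        handlers[event_name].append(func)
        return func
    return register


def publish(event_name, payload):
    """Call every handler registered for event_name, in registration order."""
    for handler in handlers[event_name]:
        handler(payload)


@subscribe("OrderPlaced")
def reduce_inventory(order):
    log.append("reduce_inventory")


@subscribe("OrderPlaced")
def process_payment(order):
    log.append("process_payment")
    # Steps 6-7: once payment completes, emit a follow-up event.
    publish("PaymentProcessed", order)


@subscribe("PaymentProcessed")
def update_order_status(order):
    # Step 8: react to the follow-up event.
    order["status"] = "paid"
    log.append("update_order_status")


order = {"order_id": 101, "status": "placed"}
publish("OrderPlaced", order)
print(log)
print(order["status"])
```

The publisher of `OrderPlaced` never references `update_order_status`; the chain exists only because `process_payment` publishes a second event.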
Run the comprehensive test suite:
python tests.py

- ✅ Event Bus Subscribe/Publish - Test basic pub/sub
- ✅ Multiple Handlers - Test multiple subscribers
- ✅ Event Store - Test event storage
- ✅ Command Handling - Test command execution
- ✅ Query Handling - Test query execution
- ✅ Event Sourcing - Test state reconstruction
- ✅ Async Event Processing - Test async handlers
- ✅ CQRS Pattern - Test command/query separation
- ✅ Event Replay - Test rebuilding state
- ✅ Handler Error Handling - Test error scenarios
Advantages:
- Loose Coupling - Components don't know about each other
- Scalability - Easy to add new handlers
- Flexibility - Change behavior by adding or removing handlers, without modifying publishers
- Audit Trail - Complete history of what happened
When to Use:
- Microservices communication
- Complex workflows
- Need for audit trail
- Asynchronous processing
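
To make the loose-coupling and error-isolation points concrete, here is a minimal sketch of an in-memory publish/subscribe bus. It is not the project's `event_bus` module: `SimpleEventBus`, its `errors` list, and the `UserCreated` dataclass are hypothetical names chosen for this illustration.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class UserCreated:
    # Hypothetical event type used only for this sketch.
    user_id: int
    username: str


class SimpleEventBus:
    """Minimal in-memory publish/subscribe bus (illustrative only)."""

    def __init__(self):
        self._handlers = defaultdict(list)  # event type -> list of handlers
        self.errors = []                    # (handler name, exception) pairs

    def subscribe(self, event_type, handler):
        self._handlers[event_type].append(handler)

    def publish(self, event):
        # Call each handler in registration order. A failing handler is
        # recorded and does not stop the remaining handlers.
        for handler in self._handlers[type(event)]:
            try:
                handler(event)
            except Exception as exc:
                self.errors.append((handler.__name__, exc))


bus = SimpleEventBus()
profiles = []


def send_welcome_email(event):
    raise RuntimeError("mail server unavailable")


def create_user_profile(event):
    profiles.append(event.username)


bus.subscribe(UserCreated, send_welcome_email)
bus.subscribe(UserCreated, create_user_profile)
bus.publish(UserCreated(1, "john_doe"))
print(profiles)
print(len(bus.errors))
```

Even though `send_welcome_email` raises, `create_user_profile` still runs, and the code that calls `publish` knows nothing about either handler.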
Why Separate Commands and Queries?
- Optimize Independently - Different models for read/write
- Scale Separately - Scale reads and writes independently
- Simpler Models - Each side is simpler
- Better Performance - Optimized for specific use case
Command Side:
- Validates business rules
- Changes state
- Emits events
Query Side:
- Reads data
- No side effects
- Optimized for queries
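
The split between the two sides can be sketched as follows. This is a toy example under stated assumptions, not the code in `cqrs/`: the `UserService` class, its dictionaries, and the simplified `CreateUser` and `GetUser` types are hypothetical, and the read model is updated synchronously so the example stays deterministic.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CreateUser:          # command: expresses intent to change state
    username: str
    email: str


@dataclass(frozen=True)
class GetUser:             # query: asks for data, changes nothing
    user_id: int


class UserService:
    """Hypothetical service holding both sides, for illustration only."""

    def __init__(self):
        self._write_model = {}   # user_id -> full record (command side)
        self._read_model = {}    # user_id -> display dict (query side)
        self._next_id = 1

    def handle_command(self, cmd: CreateUser) -> int:
        # Command side: validate business rules, change state, emit an event.
        if "@" not in cmd.email:
            raise ValueError("invalid email")
        user_id = self._next_id
        self._next_id += 1
        self._write_model[user_id] = {"username": cmd.username, "email": cmd.email}
        self._project({"type": "UserCreated", "user_id": user_id,
                       "username": cmd.username})
        return user_id

    def _project(self, event):
        # Projection: keeps the read model in sync with emitted events.
        self._read_model[event["user_id"]] = {"display_name": event["username"]}

    def handle_query(self, query: GetUser):
        # Query side: read only, no side effects.
        return self._read_model.get(query.user_id)


service = UserService()
uid = service.handle_command(CreateUser("alice", "alice@example.com"))
view = service.handle_query(GetUser(uid))
print(uid, view)
```

The read model stores only what the query needs (`display_name`), which is the sense in which each side can be shaped independently.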
Benefits:
- Complete History - Never lose data
- Audit Trail - Know what happened and when
- Time Travel - Rebuild state at any point
- Debugging - Replay events to reproduce issues
Challenges:
- Storage - Events accumulate over time
- Complexity - More complex than CRUD
- Eventual Consistency - Read models may lag
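
The "Time Travel" benefit can be illustrated with a small replay function that stops at a chosen version. This is a sketch with hypothetical names (`UserState`, `replay`, and the two simplified event classes); the project's `UserAggregate` and event store may work differently.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class UserCreated:
    username: str
    email: str


@dataclass(frozen=True)
class EmailChanged:
    new_email: str


class UserState:
    """Hypothetical aggregate whose state is derived purely from events."""

    def __init__(self):
        self.username = None
        self.email = None
        self.version = 0

    def apply(self, event):
        if isinstance(event, UserCreated):
            self.username, self.email = event.username, event.email
        elif isinstance(event, EmailChanged):
            self.email = event.new_email
        self.version += 1   # one version per applied event


def replay(events, up_to_version=None):
    """Rebuild state from the first up_to_version events (all events if None)."""
    state = UserState()
    for event in events[:up_to_version]:
        state.apply(event)
    return state


history = [
    UserCreated("bob", "bob@example.com"),
    EmailChanged("bob.new@example.com"),
]
current = replay(history)
original = replay(history, up_to_version=1)
print(current.email, current.version)
print(original.email, original.version)
```

Because the history is never overwritten, the state at version 1 remains recoverable after the email change. The cost is that every rebuild walks the event list, which is why snapshots become useful as histories grow.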
Why Async?
- Non-Blocking - Don't wait for slow operations
- Better Performance - Handle more requests
- Parallel Processing - Multiple handlers run concurrently
Use Cases:
- Email sending
- Payment processing
- External API calls
- Long-running operations
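
As a rough illustration of concurrent handlers, the sketch below runs two slow coroutines with `asyncio.gather`. The handler names and the `publish_async` helper are hypothetical and are not the project's `subscribe_async` mechanism; the `asyncio.sleep` calls stand in for real I/O.

```python
import asyncio
import time


async def send_email(order_id):
    await asyncio.sleep(0.1)   # stands in for a slow mail server call
    return f"email sent for {order_id}"


async def process_payment(order_id):
    await asyncio.sleep(0.1)   # stands in for a payment API call
    return f"payment processed for {order_id}"


async def publish_async(order_id, handlers):
    # Run all handlers concurrently. With return_exceptions=True, a failing
    # handler's exception is returned in the results list instead of being
    # raised, so the other handlers' results are still collected.
    return await asyncio.gather(
        *(handler(order_id) for handler in handlers),
        return_exceptions=True,
    )


start = time.perf_counter()
results = asyncio.run(publish_async(101, [send_email, process_payment]))
elapsed = time.perf_counter() - start
print(results)
print(f"elapsed: {elapsed:.2f}s")
```

Since both handlers wait at the same time, the total time is roughly that of the slowest handler rather than the sum of both.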
For production use:
- Message Broker:
  - Replace in-memory bus with RabbitMQ/Kafka
  - Add message persistence
  - Implement retry logic
- Event Store:
  - Use database for event storage
  - Implement snapshots for performance
  - Add event versioning
- Scalability:
  - Distribute event handlers
  - Add load balancing
  - Implement event partitioning
- Reliability:
  - Add dead letter queues
  - Implement idempotent handlers
  - Add circuit breakers
- Monitoring:
  - Track event processing time
  - Monitor queue depths
  - Alert on failures
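
Two of the reliability items, idempotent handlers and dead letter queues, can be sketched without any broker. Everything here is a hypothetical illustration: `IdempotentHandler`, `deliver`, and the dict-shaped events with an `event_id` key are assumptions, and a real system would keep the seen-ID set and the dead letter queue in durable storage.

```python
class IdempotentHandler:
    """Wraps a handler so each event ID is processed at most once."""

    def __init__(self, handler):
        self._handler = handler
        self._seen = set()   # in-memory here; durable storage in production

    def __call__(self, event):
        if event["event_id"] in self._seen:
            return False     # duplicate delivery: skip
        self._handler(event)
        # Mark as seen only after success, so a failed attempt can be retried.
        self._seen.add(event["event_id"])
        return True


def deliver(event, handler, dead_letters, max_attempts=3):
    """Try a handler up to max_attempts times, then park the event."""
    for _ in range(max_attempts):
        try:
            handler(event)
            return True
        except Exception:
            continue
    dead_letters.append(event)
    return False


charged = []
charge = IdempotentHandler(lambda e: charged.append(e["order_id"]))
dead_letters = []

event = {"event_id": "evt-1", "order_id": 101}
deliver(event, charge, dead_letters)
deliver(event, charge, dead_letters)   # redelivery is ignored


def always_fails(e):
    raise RuntimeError("downstream unavailable")


deliver({"event_id": "evt-2", "order_id": 102}, always_fails, dead_letters)
print(charged)
print(dead_letters)
```

Retries make duplicate deliveries likely, which is why the two techniques are usually paired: the retry loop may call a handler more than once, and the idempotency check keeps the side effect from happening twice.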
- Flask 3.0.0 - Web framework
- python-dotenv 1.0.0 - Environment variables
- pytest 7.4.3 - Testing framework
- requests 2.31.0 - HTTP client
This project is for educational purposes. Feel free to use and modify as needed.
Happy Learning! 🚀