Educational Python application demonstrating process-based service orchestration, Python multiprocessing, inter-service communication, shared memory, service registry, and health monitoring.
- UserService - User management running as separate process
- OrderService - Order processing with inter-service calls
- NotificationService - Asynchronous notification handling
- Each service runs in its own OS process with isolated memory
- Queue-based messaging - Asynchronous message passing between services
- Request/Response pattern - Structured communication protocol
- Message Protocol - Standard format for all messages
- Timeout handling - Graceful handling of slow services
- Shared statistics - Real-time metrics across all services
- Heartbeat tracking - Service health monitoring
- Request counters - Track service usage
- Thread-safe access - Proper synchronization
- Service registration - Automatic service discovery
- Metadata storage - PID, queues, status tracking
- Service lookup - Find services by name
- Status management - Track service health states
- Process health checks - Verify processes are alive
- Heartbeat monitoring - Detect unresponsive services
- Auto-restart - Recover from failures (configurable)
- Statistics tracking - Monitor service performance
- REST API - Easy access to services via HTTP
- Flask-based - Simple and familiar interface
- Service proxy - Gateway pattern implementation
- JSON responses - Standard API format
git clone https://github.com/Amruth22/Python-Process-Service-Orchestration.git
cd Python-Process-Service-Orchestrationpython -m venv venv
# On Windows:
venv\Scripts\activate
# On macOS/Linux:
source venv/bin/activatepip install -r requirements.txtpython main.pyThe application will start all services and the HTTP gateway on http://localhost:5000
python tests.pyPython-Process-Service-Orchestration/
│
├── services/
│ ├── base_service.py # Base service class
│ ├── user_service.py # User management service
│ ├── order_service.py # Order processing service
│ └── notification_service.py # Notification service
│
├── orchestrator/
│ ├── service_manager.py # Service lifecycle management
│ ├── registry.py # Service registry
│ └── health_monitor.py # Health monitoring
│
├── communication/
│ └── message_protocol.py # Message format standard
│
├── gateway/
│ └── api.py # HTTP gateway (Flask)
│
├── main.py # Application entry point
├── tests.py # Unit tests (10 tests)
├── requirements.txt # Dependencies
├── .env # Configuration
└── README.md # This file
┌─────────────────────────────────────────────────────────────┐
│ HTTP Gateway (Flask) │
│ http://localhost:5000 │
└────────────────────┬────────────────────────────────────────┘
│
│ HTTP Requests
│
┌────────────────────▼────────────────────────────────────────┐
│ Service Manager │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ Service Registry │ │
│ │ - UserService: PID 1234, Queue A, Status: Running │ │
│ │ - OrderService: PID 1235, Queue B, Status: Running │ │
│ │ - NotificationService: PID 1236, Queue C, Running │ │
│ └──────────────────────────────────────────────────────┘ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ Health Monitor │ │
│ │ - Checks process health every 5 seconds │ │
│ │ - Monitors heartbeats │ │
│ │ - Auto-restart on failure │ │
│ └──────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
│
┌────────────┼────────────┐
│ │ │
┌───────▼──────┐ ┌──▼──────────┐ ┌▼────────────────┐
│ UserService │ │OrderService │ │NotificationSvc │
│ (Process 1) │ │ (Process 2) │ │ (Process 3) │
│ │ │ │ │ │
│ Queue: A │ │ Queue: B │ │ Queue: C │
│ PID: 1234 │ │ PID: 1235 │ │ PID: 1236 │
└──────────────┘ └─────────────┘ └─────────────────┘
│ │ │
└────────────────┴────────────────┘
│
┌────────▼────────┐
│ Shared Memory │
│ - Statistics │
│ - Heartbeats │
│ - Counters │
└─────────────────┘
GET /Response:
{
"message": "Process-Based Service Orchestration API",
"version": "1.0.0",
"endpoints": {
"users": "/users",
"orders": "/orders",
"notifications": "/notifications",
"services": "/services"
}
}GET /healthPOST /users
Content-Type: application/json
{
"username": "john_doe",
"email": "john@example.com"
}GET /users/{user_id}GET /usersPOST /orders
Content-Type: application/json
{
"user_id": 1,
"product": "Laptop",
"quantity": 2
}Note: OrderService validates user by calling UserService
GET /orders/{order_id}GET /ordersPOST /notifications
Content-Type: application/json
{
"user_id": 1,
"message": "Your order has been created",
"type": "email"
}GET /servicesResponse:
{
"status": "success",
"services": {
"UserService": {
"pid": 1234,
"status": "running",
"stats": {
"requests": 10,
"last_heartbeat": 0.5
}
}
},
"count": 3
}GET /services/{service_name}/healthcurl -X POST http://localhost:5000/users \
-H "Content-Type: application/json" \
-d '{
"username": "alice",
"email": "alice@example.com"
}'curl -X POST http://localhost:5000/orders \
-H "Content-Type: application/json" \
-d '{
"user_id": 1,
"product": "Smartphone",
"quantity": 1
}'curl http://localhost:5000/servicesimport requests
BASE_URL = "http://localhost:5000"
# Create a user
response = requests.post(f"{BASE_URL}/users", json={
"username": "bob",
"email": "bob@example.com"
})
print(response.json())
# Create an order
response = requests.post(f"{BASE_URL}/orders", json={
"user_id": 1,
"product": "Laptop",
"quantity": 2
})
print(response.json())
# Check service health
response = requests.get(f"{BASE_URL}/services/UserService/health")
print(response.json())Each service runs in its own OS process:
# Each service is a separate process
user_process = Process(target=user_service.run)
order_process = Process(target=order_service.run)
user_process.start() # Starts in new process
order_process.start() # Starts in another new processBenefits:
- True parallelism (no GIL limitations)
- Fault isolation (one crash doesn't affect others)
- Separate memory spaces
- Can utilize multiple CPU cores
Services communicate via queues:
# OrderService sends request to UserService
request = {
'action': 'validate_user',
'data': {'user_id': 123},
'request_id': 'unique-id'
}
user_service_queue.put(request)
# UserService processes and responds
response = user_service_queue.get()
# Process request...
response_queue.put(response)Services share statistics via Manager:
# Create shared memory
manager = Manager()
shared_stats = manager.dict()
# Services update shared stats
with shared_stats.get_lock():
shared_stats['requests'] = shared_stats.get('requests', 0) + 1Tracks all running services:
registry.register('UserService', {
'pid': process.pid,
'status': 'running',
'queue': request_queue
})
# Later, find the service
service_info = registry.get_service('UserService')Monitors service health:
# Check if process is alive
if process.is_alive():
status = 'healthy'
# Check heartbeat
last_heartbeat = shared_stats['UserService_heartbeat']
if time.now() - last_heartbeat > 10:
status = 'unhealthy'Run the comprehensive test suite:
python tests.py- ✅ Service Process Creation - Test process spawning
- ✅ Service Registration - Test registry operations
- ✅ Queue Communication - Test message passing
- ✅ Shared Memory - Test shared data access
- ✅ Service Discovery - Test finding services
- ✅ Health Check - Test health monitoring
- ✅ Inter-Service Communication - Test service-to-service calls
- ✅ HTTP Gateway - Test REST API endpoints
- ✅ Message Protocol - Test message format
- ✅ Service Manager - Test orchestration
1. Testing service process creation...
✅ Service process created with PID: 12345
✅ Service process stopped successfully
2. Testing service registration...
✅ Service registered successfully
✅ Service info retrieved: PID 12345
✅ Service listed: ['TestService']
✅ Service deregistered successfully
Process:
- Separate memory space
- True parallelism
- Heavier resource usage
- Better isolation
Thread:
- Shared memory space
- GIL limitations in Python
- Lighter resource usage
- Less isolation
When to use processes:
- CPU-intensive tasks
- Need true parallelism
- Want fault isolation
- Microservices architecture
Queue:
- Thread-safe message passing
- FIFO order
- Good for async communication
- Used in this project
Pipe:
- Two-way communication
- Faster for 1-to-1
- Direct connection
- Not used (kept simple)
Shared Memory:
- Fastest IPC method
- Requires synchronization
- Good for shared state
- Used for statistics
Key Concepts:
- Service Registry - Know what's running
- Health Monitoring - Keep services alive
- Graceful Shutdown - Clean exit
- Inter-Service Calls - Services working together
This project demonstrates:
- Service Discovery - Registry pattern
- Health Checks - Monitoring pattern
- API Gateway - Gateway pattern
- Message Passing - Async communication
Edit .env file:
# Service Configuration
SERVICE_CHECK_INTERVAL=5
MAX_RESTART_ATTEMPTS=3
HEARTBEAT_TIMEOUT=10
# HTTP Gateway
GATEWAY_HOST=0.0.0.0
GATEWAY_PORT=5000
DEBUG=True
# Logging
LOG_LEVEL=INFOFor production use:
-
Persistence:
- Replace in-memory storage with database
- Add data persistence layer
- Implement state recovery
-
Scalability:
- Use Redis for shared memory
- Implement load balancing
- Add horizontal scaling
-
Reliability:
- Implement retry logic
- Add circuit breakers
- Improve error handling
-
Monitoring:
- Add metrics collection
- Implement logging aggregation
- Set up alerting
-
Security:
- Add authentication
- Implement authorization
- Secure inter-service communication
- Flask 3.0.0 - HTTP gateway
- python-dotenv 1.0.0 - Environment variables
- pytest 7.4.3 - Testing framework
- requests 2.31.0 - HTTP client for tests
- Check if ports are available
- Verify Python version (3.7+)
- Check logs for errors
- Increase timeout in
.env - Check if services are running
- Verify queue connections
- Ensure no other instance is running
- Check port 5001 is available
- Run tests with verbose output
This project is for educational purposes. Feel free to use and modify as needed.
Happy Learning! 🚀