A powerful Python-based workflow automation engine with n8n integration, real-time streaming, and comprehensive monitoring.
- FastAPI Backend: High-performance REST API with automatic documentation
- PostgreSQL Database: Robust data storage with SQLAlchemy ORM
- Celery Task Queue: Asynchronous workflow execution with Redis backend
- n8n Integration: Seamless workflow synchronization and execution
- WebSocket Support: Real-time workflow monitoring and event streaming
- Webhook Management: Dynamic webhook endpoints for external integrations
- Authentication & Security: JWT tokens, API keys, and role-based access control
- Monitoring & Logging: Comprehensive system metrics and health checks
- Docker Support: Complete containerization with Docker Compose
- Python 3.11+
- PostgreSQL 15+
- Redis 7+
- Docker & Docker Compose (optional)
- Clone the repository

```bash
git clone <repository-url>
cd workflow-engine
```

- Install dependencies

```bash
pip install -r requirements.txt
```

- Set up environment variables

```bash
cp .env.example .env
# Edit .env with your configuration
```

- Initialize the database

```bash
# Make sure PostgreSQL is running
python -c "from app.core.database import engine, Base; Base.metadata.create_all(bind=engine)"
```

- Start Redis

```bash
redis-server
```

- Start the Celery worker (a sketch of the app.celery_app module follows these steps)

```bash
celery -A app.celery_app worker --loglevel=info
```

- Start the application

```bash
uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload
```
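The worker command above expects a Celery instance importable as app.celery_app. A minimal sketch of what such a module can look like, with the broker and backend pointing at the Redis URL from the configuration below; the task name and body are purely illustrative, not the project's actual tasks:

```python
# Hypothetical sketch of app/celery_app.py; task names and bodies are illustrative.
from celery import Celery

celery_app = Celery(
    "workflow_engine",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0",
)

@celery_app.task(name="workflows.execute")
def execute_workflow(workflow_id: str, input_data: dict) -> dict:
    """Placeholder task body; the real execution logic lives in app/tasks/."""
    return {"workflow_id": workflow_id, "status": "completed", "input": input_data}
```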
Alternatively, run the full stack with Docker Compose:

- Build and start all services

```bash
docker-compose up -d
```

- View logs

```bash
docker-compose logs -f
```

- Stop services

```bash
docker-compose down
```
Once the application is running, visit:
- Swagger UI: http://localhost:8000/docs
- ReDoc: http://localhost:8000/redoc
The codebase is organised into the following components:

- FastAPI Application (app/main.py): Main web server
- Database Models (app/models/): SQLAlchemy models for data persistence
- API Endpoints (app/api/v1/): REST API routes and handlers
- Services (app/services/): Business logic and external integrations
- Tasks (app/tasks/): Celery background tasks
- WebSocket (app/websocket/): Real-time communication
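As a rough sketch of how these pieces are wired together in app/main.py (the router module names here are assumptions, not the project's actual layout):

```python
# Hypothetical sketch of app/main.py; router module names are assumptions.
from fastapi import FastAPI

app = FastAPI(title="Workflow Engine", docs_url="/docs", redoc_url="/redoc")

# Routers from app/api/v1/ are mounted under a common prefix, e.g.:
# from app.api.v1 import auth, workflows, executions
# app.include_router(auth.router, prefix="/api/v1/auth", tags=["auth"])
# app.include_router(workflows.router, prefix="/api/v1/workflows", tags=["workflows"])
# app.include_router(executions.router, prefix="/api/v1/executions", tags=["executions"])

@app.get("/api/v1/health")
def health() -> dict:
    """Basic liveness probe backing the health check endpoint."""
    return {"status": "ok"}
```

The persistence layer is organised around the following models: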
- Users: User accounts and authentication
- Workflows: Workflow definitions and configurations
- Executions: Workflow execution records and results
- Triggers: Workflow trigger configurations
- Webhooks: Dynamic webhook endpoints
- Integrations: External service connections
- API Keys: API access management
- Execution Logs: Detailed execution logging
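The concrete columns live in app/models/. As a rough sketch of the shape of two of these tables (column names and types are illustrative assumptions, not the actual schema):

```python
# Illustrative sketch of Workflow/Execution models; columns are assumptions.
from datetime import datetime

from sqlalchemy import JSON, Column, DateTime, ForeignKey, Integer, String
from sqlalchemy.orm import declarative_base, relationship

Base = declarative_base()

class Workflow(Base):
    __tablename__ = "workflows"

    id = Column(Integer, primary_key=True)
    name = Column(String(255), nullable=False)
    definition = Column(JSON, nullable=False)   # node/step graph
    owner_id = Column(Integer)                  # FK to users.id in the real schema
    created_at = Column(DateTime, default=datetime.utcnow)

    executions = relationship("Execution", back_populates="workflow")

class Execution(Base):
    __tablename__ = "executions"

    id = Column(Integer, primary_key=True)
    workflow_id = Column(Integer, ForeignKey("workflows.id"))
    status = Column(String(50), default="pending")   # pending / running / succeeded / failed
    result = Column(JSON, nullable=True)

    workflow = relationship("Workflow", back_populates="executions")
```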
Configuration is driven by environment variables (see .env.example):

```env
# Database
DATABASE_URL=postgresql://user:password@localhost:5432/workflow_engine

# Redis
REDIS_URL=redis://localhost:6379/0

# Security
SECRET_KEY=your-secret-key-here
ACCESS_TOKEN_EXPIRE_MINUTES=30

# n8n Integration
N8N_BASE_URL=http://localhost:5678
N8N_API_KEY=your-n8n-api-key

# Application
ENVIRONMENT=development
LOG_LEVEL=INFO
```
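These variables are typically read once at startup. A minimal sketch of such a loader, assuming pydantic-settings; the actual contents of app/core/config.py may differ:

```python
# Hypothetical settings loader; field names mirror the variables above.
from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env")

    database_url: str = "postgresql://user:password@localhost:5432/workflow_engine"
    redis_url: str = "redis://localhost:6379/0"
    secret_key: str = "change-me"
    access_token_expire_minutes: int = 30
    n8n_base_url: str = "http://localhost:5678"
    n8n_api_key: str = ""
    environment: str = "development"
    log_level: str = "INFO"

settings = Settings()  # values from .env override the defaults above
```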
- Register a new user

```bash
curl -X POST "http://localhost:8000/api/v1/auth/register" \
  -H "Content-Type: application/json" \
  -d '{"email": "user@example.com", "password": "password123"}'
```

- Login

```bash
curl -X POST "http://localhost:8000/api/v1/auth/login" \
  -H "Content-Type: application/json" \
  -d '{"username": "user@example.com", "password": "password123"}'
```

- Create a workflow

```bash
curl -X POST "http://localhost:8000/api/v1/workflows/" \
  -H "Authorization: Bearer <token>" \
  -H "Content-Type: application/json" \
  -d '{"name": "My Workflow", "definition": {...}}'
```

- Execute a workflow

```bash
curl -X POST "http://localhost:8000/api/v1/executions/execute/<workflow_id>" \
  -H "Authorization: Bearer <token>" \
  -H "Content-Type: application/json" \
  -d '{"input_data": {...}}'
```
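The same flow from Python, as a sketch using httpx; the endpoints mirror the curl calls above, while the response field names (access_token, id) and the payloads are assumptions:

```python
# Sketch of the login -> create -> execute flow; response field names are assumptions.
import httpx

BASE = "http://localhost:8000/api/v1"

with httpx.Client(base_url=BASE) as client:
    # Obtain a JWT for a previously registered user.
    token = client.post(
        "/auth/login",
        json={"username": "user@example.com", "password": "password123"},
    ).json()["access_token"]
    headers = {"Authorization": f"Bearer {token}"}

    # Create a workflow, then trigger an execution with some input data.
    workflow = client.post(
        "/workflows/",
        headers=headers,
        json={"name": "My Workflow", "definition": {}},
    ).json()

    execution = client.post(
        f"/executions/execute/{workflow['id']}",
        headers=headers,
        json={"input_data": {}},
    ).json()
    print(execution)
```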
Real-time workflow events are available over WebSocket:

```javascript
const ws = new WebSocket('ws://localhost:8000/api/v1/ws/ws?token=<your_token>');

ws.onopen = function() {
    // Subscribe to workflow updates
    ws.send(JSON.stringify({
        type: 'subscribe_workflow',
        workflow_id: 'workflow-id-here'
    }));
};

ws.onmessage = function(event) {
    const message = JSON.parse(event.data);
    console.log('Received:', message);
};
```

Health and monitoring endpoints:

- Basic Health: GET /api/v1/health
- Detailed Status: GET /api/v1/status (authenticated)
- System Metrics: GET /api/v1/metrics/system (admin only)
Celery tasks can be monitored from the Flower dashboard at: http://localhost:5555
- Application logs: logs/app.log
- Docker logs: docker-compose logs
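LOG_LEVEL typically drives a standard logging setup writing to logs/app.log. A sketch of such a configuration (the handler choices and format are assumptions, not the project's actual logging setup):

```python
# Illustrative logging setup targeting logs/app.log; handler choices are assumptions.
import logging
from logging.handlers import RotatingFileHandler

def configure_logging(level: str = "INFO") -> None:
    handler = RotatingFileHandler("logs/app.log", maxBytes=10_000_000, backupCount=5)
    handler.setFormatter(
        logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s")
    )
    logging.basicConfig(level=getattr(logging, level.upper()), handlers=[handler])
```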
```
workflow-engine/
├── app/
│   ├── api/v1/          # API endpoints
│   ├── core/            # Core configuration and utilities
│   ├── models/          # Database models
│   ├── services/        # Business logic
│   ├── tasks/           # Celery tasks
│   ├── websocket/       # WebSocket handlers
│   └── main.py          # FastAPI application
├── logs/                # Application logs
├── docker-compose.yml   # Docker services
├── Dockerfile           # Application container
├── requirements.txt     # Python dependencies
└── README.md            # This file
```
```bash
# Install test dependencies
pip install pytest pytest-asyncio httpx

# Run tests
pytest
```

```bash
# Format code
black app/

# Lint code
flake8 app/

# Type checking
mypy app/
```
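A minimal async API test might look like the sketch below; it assumes only that app.main exposes the FastAPI app (as in the uvicorn command above) and exercises the documented health endpoint:

```python
# Sketch of an async endpoint test; requires pytest-asyncio and httpx.
import pytest
from httpx import ASGITransport, AsyncClient

from app.main import app

@pytest.mark.asyncio
async def test_health_endpoint():
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        response = await client.get("/api/v1/health")
    assert response.status_code == 200
```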
To connect the engine to a local n8n instance:

- Start n8n

```bash
docker run -it --rm --name n8n -p 5678:5678 n8nio/n8n
```

- Configure API access
  - Enable API access in n8n settings
  - Generate an API key
  - Update the N8N_API_KEY environment variable

- Sync workflows

```bash
curl -X POST "http://localhost:8000/api/v1/integrations/n8n/sync" \
  -H "Authorization: Bearer <token>" \
  -H "Content-Type: application/json" \
  -d '{"sync_direction": "from_n8n"}'
```
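Under the hood, syncing talks to n8n's public REST API. A standalone sketch of that call; the wrapper function is illustrative rather than the project's actual service code, while the endpoint and X-N8N-API-KEY header follow n8n's public API:

```python
# Sketch of listing workflows via n8n's public REST API; the wrapper is illustrative.
import httpx

N8N_BASE_URL = "http://localhost:5678"
N8N_API_KEY = "your-n8n-api-key"

def list_n8n_workflows() -> list[dict]:
    """Return workflow summaries from the local n8n instance."""
    response = httpx.get(
        f"{N8N_BASE_URL}/api/v1/workflows",
        headers={"X-N8N-API-KEY": N8N_API_KEY},
        timeout=10.0,
    )
    response.raise_for_status()
    return response.json().get("data", [])
```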
For production deployments, consider the following:

- Security
  - Use strong secret keys
  - Enable HTTPS
  - Configure proper CORS settings
  - Use environment-specific configurations

- Performance
  - Scale Celery workers based on load
  - Configure database connection pooling
  - Use Redis clustering for high availability
  - Implement proper caching strategies (see the sketch after this list)

- Monitoring
  - Set up log aggregation
  - Configure alerting for critical metrics
  - Monitor database performance
  - Track API response times
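On the caching point, a simple read-through cache for workflow definitions could look like this sketch using redis-py; the key naming and TTL are illustrative choices, not project conventions:

```python
# Sketch of a read-through cache for workflow definitions; keys and TTL are illustrative.
import json

import redis

cache = redis.Redis.from_url("redis://localhost:6379/0")

def get_workflow_cached(workflow_id: str, loader) -> dict:
    """Return a cached workflow definition, falling back to `loader` on a miss."""
    key = f"workflow:{workflow_id}"
    hit = cache.get(key)
    if hit is not None:
        return json.loads(hit)
    value = loader(workflow_id)                 # e.g. a database query
    cache.set(key, json.dumps(value), ex=300)   # cache for 5 minutes
    return value
```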
An example production Compose override:

```yaml
# docker-compose.prod.yml
version: '3.8'
services:
  app:
    build: .
    environment:
      - ENVIRONMENT=production
      - SECRET_KEY=${SECRET_KEY}
    deploy:
      replicas: 3
      resources:
        limits:
          memory: 512M
        reservations:
          memory: 256M
```

To contribute:

- Fork the repository
- Create a feature branch
- Make your changes
- Add tests
- Submit a pull request
This project is licensed under the MIT License - see the LICENSE file for details.
For support and questions:
- Create an issue on GitHub
- Check the documentation
- Review the API documentation at /docs