A production-grade distributed task queue and job scheduler microservice built with Python, FastAPI, Celery, PostgreSQL, and Redis.
- Async Task Processing: Distribute heavy workloads to background workers
- Job Persistence: Store task history and results in PostgreSQL
- Message Queue: Redis-backed Celery for reliable task distribution
- REST API: Complete API for task management and monitoring
- Health Checks: Built-in health monitoring for database and Redis
- Retry Logic: Automatic task retries with exponential backoff
- Priority Levels: Support for multiple task priority levels
- Docker Support: Docker and Docker Compose for easy deployment
- Comprehensive Tests: Unit and integration tests with pytest
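The exponential-backoff retry behavior listed above can be illustrated with a small delay schedule. This is an illustrative sketch only; the function name and the base/cap values are assumptions, not the project's actual worker code:

```python
def backoff_delay(attempt: int, base: float = 2.0, cap: float = 300.0) -> float:
    """Seconds to wait before retry number `attempt` (1-based): base**attempt, capped."""
    return min(base ** attempt, cap)

# The first five retries wait 2, 4, 8, 16, 32 seconds; the cap bounds long retry chains.
print([backoff_delay(n) for n in range(1, 6)])
```

In Celery, a schedule like this is typically configured declaratively via task options such as `autoretry_for`, `retry_backoff`, `retry_backoff_max`, and `max_retries` rather than computed by hand.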
- Framework: FastAPI
- Task Queue: Celery with Redis
- Database: PostgreSQL with SQLAlchemy ORM
- Validation: Pydantic
- Testing: Pytest
- Containerization: Docker & Docker Compose
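Given the stack above, the SQLAlchemy model in `app/models/task.py` might look roughly like this. The field names here are assumptions for illustration, not the project's actual schema:

```python
from sqlalchemy import JSON, Column, DateTime, Integer, String, create_engine, func
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class Task(Base):
    """Illustrative task record; fields are assumed, not taken from the repo."""
    __tablename__ = "tasks"

    id = Column(Integer, primary_key=True)
    name = Column(String, nullable=False)
    status = Column(String, nullable=False, default="pending")
    payload = Column(JSON)  # arbitrary task arguments, stored as JSON
    created_at = Column(DateTime, server_default=func.now())
```

A model like this is what the `Base.metadata.create_all(bind=engine)` commands in the setup steps below would turn into actual tables.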
```
task-queue-system/
├── app/
│   ├── api/                  # API endpoints
│   │   ├── health.py         # Health check endpoints
│   │   └── tasks.py          # Task CRUD endpoints
│   ├── core/                 # Core configuration
│   │   ├── config.py         # Settings management
│   │   └── database.py       # Database configuration
│   ├── models/               # SQLAlchemy models
│   │   └── task.py           # Task model
│   ├── schemas/              # Pydantic schemas
│   │   └── task.py           # Request/response schemas
│   ├── services/             # Business logic
│   │   └── task_service.py   # Task operations
│   ├── workers/              # Celery workers
│   │   └── celery_app.py     # Celery configuration
│   └── main.py               # FastAPI application
├── tests/                    # Test suite
│   ├── unit/                 # Unit tests
│   └── integration/          # Integration tests
├── requirements.txt          # Python dependencies
├── docker-compose.yml        # Docker Compose configuration
├── Dockerfile                # Docker image
└── README.md                 # This file
```
- Docker & Docker Compose (recommended)
- Python 3.11+
- PostgreSQL
- Redis
```bash
# Copy environment configuration
cp .env.example .env

# Start all services
docker-compose up -d

# Create database tables
docker-compose exec api python -c "from app.core.database import Base, engine; Base.metadata.create_all(bind=engine)"

# Access API
curl http://localhost:8000/health
```

```bash
# Create virtual environment
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate

# Install dependencies
pip install -r requirements.txt

# Set up environment
cp .env.example .env

# Create database tables
python -c "from app.core.database import Base, engine; Base.metadata.create_all(bind=engine)"

# Start API server
uvicorn app.main:app --reload

# In another terminal, start a Celery worker
celery -A app.workers.celery_app worker --loglevel=info
```

- `GET /health` - Check application health status
- `POST /api/v1/tasks` - Create a new task
- `GET /api/v1/tasks` - List tasks (with filtering)
- `GET /api/v1/tasks/{task_id}` - Get task details
- `POST /api/v1/tasks/{task_id}/cancel` - Cancel a task
- `DELETE /api/v1/tasks/{task_id}` - Delete a task
```bash
curl -X POST http://localhost:8000/api/v1/tasks \
  -H "Content-Type: application/json" \
  -d '{
    "name": "data_processing",
    "description": "Process user records",
    "payload": {
      "user_ids": [1, 2, 3],
      "operation": "export"
    },
    "priority": "high",
    "max_retries": 3
  }'
```

- `pending` - Task created, waiting to be processed
- `processing` - Task is being executed
- `completed` - Task finished successfully
- `failed` - Task execution failed
- `cancelled` - Task was cancelled
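The task lifecycle above could be modeled as a string-valued Enum. This is a sketch of the idea, not necessarily how `app/schemas/task.py` defines it:

```python
from enum import Enum

class TaskStatus(str, Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

# Terminal states: once reached, the task's status no longer changes.
TERMINAL = {TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.CANCELLED}
```

Subclassing `str` lets the same values round-trip cleanly through Pydantic schemas and the JSON API responses.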
```bash
# Run all tests
pytest

# Run with coverage
pytest --cov=app tests/

# Run specific test file
pytest tests/unit/test_task_service.py

# Run integration tests only
pytest tests/integration/
```

Edit the `.env` file to customize:

- `DATABASE_URL` - PostgreSQL connection string
- `REDIS_URL` - Redis connection URL
- `API_HOST` / `API_PORT` - API server address
- `LOG_LEVEL` - Logging verbosity
- `DEBUG` - Development mode
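The variables above are loaded by `app/core/config.py`. A minimal stdlib sketch of how they might be read is shown below; the project itself presumably uses Pydantic settings, and the default values here are illustrative assumptions:

```python
import os
from dataclasses import dataclass, field

@dataclass
class Settings:
    """Illustrative settings loader; defaults are placeholders, not the project's."""
    database_url: str = field(default_factory=lambda: os.getenv("DATABASE_URL", "postgresql://localhost/tasks"))
    redis_url: str = field(default_factory=lambda: os.getenv("REDIS_URL", "redis://localhost:6379/0"))
    api_host: str = field(default_factory=lambda: os.getenv("API_HOST", "0.0.0.0"))
    api_port: int = field(default_factory=lambda: int(os.getenv("API_PORT", "8000")))
    debug: bool = field(default_factory=lambda: os.getenv("DEBUG", "false").lower() == "true")

settings = Settings()
```

Reading the environment at construction time (via `default_factory`) means `cp .env.example .env` plus an env loader is enough to reconfigure the service without code changes.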
See ARCHITECTURE.md for detailed architecture documentation.
- Fork the repository
- Create a feature branch
- Make your changes and add tests
- Ensure tests pass: `pytest`
- Format code: `black . && isort .`
- Submit a pull request
MIT License - see LICENSE file for details
For issues and questions, please open an issue on GitHub.