# TaskForge

A production-grade distributed job processing system with a real-time observability dashboard. Submit asynchronous jobs via a REST API, process them across horizontally scalable Celery workers, and watch everything happen live through a React dashboard backed by WebSockets.


## Features

### Job Processing

- Five built-in job types: `data_processing`, `web_scraping`, `ai_inference`, `security_scan`, `email`
- Four priority queues: `critical` → `high` → `default` → `low`
- Automatic retries with exponential backoff (60s × 2^n)
- Dead-letter queue after exhausting max retries
- Job cancellation with Celery task revocation
- Manual re-queue for failed or dead-letter jobs
- Scheduled/delayed jobs via ETA dispatch
- Idempotency keys to prevent duplicate submissions
- Webhook callbacks on job completion
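The retry schedule above (60s × 2^n) works out as follows — a quick sketch for illustration only; the real backoff lives in the worker's retry logic:

```python
def retry_delays(max_retries: int, base: int = 60) -> list[int]:
    """Exponential backoff delays in seconds: base * 2^n for attempt n."""
    return [base * 2**n for n in range(max_retries)]

# With the default max_retries=3, a failed job is retried after 60s, 120s,
# and 240s, then moved to the dead-letter queue.
print(retry_delays(3))  # [60, 120, 240]
```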

### Architecture

- Horizontally scalable workers (2 replicas by default, each with 4 concurrent threads)
- Redis pub/sub for cross-process WebSocket broadcasting: all API instances push events to all connected clients
- Worker auto-registration with hostname deduplication and race-condition handling
- `task_acks_late=True` + `task_reject_on_worker_lost=True` for at-least-once delivery
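A minimal sketch of the at-least-once delivery settings listed above, shown as a plain config mapping (illustrative; the actual values live in `backend/app/workers/celery_app.py`):

```python
# Illustrative Celery worker settings for at-least-once delivery.
# task_acks_late: the broker message is acknowledged only after the task
# finishes, so a worker crash mid-task leaves the message on the queue.
# task_reject_on_worker_lost: a task whose worker process dies is re-queued
# instead of being marked as failed.
CELERY_CONFIG = {
    "task_acks_late": True,
    "task_reject_on_worker_lost": True,
    "worker_concurrency": 4,          # 4 concurrent threads per worker
    "task_default_queue": "default",  # alongside critical/high/low queues
}
```

The trade-off is that a task may run more than once after a crash, which is why job handlers should be idempotent.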

### API & Auth

- Dual authentication: Bearer JWT and `X-API-Key` header
- Access + refresh token flow
- Role-based access control: `admin`, `operator`, `viewer`
- Sliding-window rate limiter backed by Redis (120 req/min per user by default)
- OpenAPI docs at `/api/docs`

### Observability

- Prometheus metrics: HTTP request counts, latency histograms, active WebSocket connections
- Grafana dashboard pre-provisioned with the Prometheus data source
- Celery Flower UI for live worker and task inspection
- Celery Beat for scheduled periodic tasks
- Structured JSON logging throughout
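Structured JSON logging can be sketched with the standard library — a minimal formatter; the project's actual formatter may emit additional fields:

```python
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each log record as one JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "timestamp": self.formatTime(record),
        })


handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
log = logging.getLogger("taskforge")
log.addHandler(handler)
log.setLevel(logging.INFO)
log.info("job queued")  # emits {"level": "INFO", "logger": "taskforge", ...}
```

One object per line keeps the output trivially parseable by log shippers such as the Logstash pipeline under `elk/logstash/`.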

### Frontend

- React 18 + TypeScript + Tailwind CSS
- Real-time job status updates via WebSocket
- Recharts for throughput and status distribution charts
- Framer Motion animations
- Nginx reverse proxy to the backend API and WebSocket

## Tech Stack

| Layer | Technology |
|---|---|
| Backend API | Python 3.12, FastAPI, SQLAlchemy 2, Alembic |
| Task Queue | Celery 5, Redis (broker + result backend) |
| Database | PostgreSQL 16 |
| Frontend | React 18, TypeScript, Vite, Tailwind CSS |
| WebSocket | FastAPI WebSocket + Redis pub/sub |
| Metrics | Prometheus, Grafana |
| Worker UI | Celery Flower |
| Container | Docker, Docker Compose |
| CI | GitHub Actions |

## Architecture

```
Browser
  │
  ├── HTTP/REST ──► Nginx (:3000) ──► FastAPI (:8000)
  │                                       │
  └── WebSocket ──► Nginx (:3000) ──►     │
                                          ▼
                                    Redis pub/sub ◄── Celery Workers
                                          │                  │
                                    PostgreSQL        4 priority queues
                                    (jobs, users,     (critical/high/
                                     workers)          default/low)
```

Job lifecycle:

```
POST /api/v1/jobs
      │
      ▼
  DB: status=QUEUED
      │
      ▼
  Celery dispatch (priority queue)
      │
      ▼
  Worker picks up → DB: status=RUNNING → handler executes
      │
      ├── success → DB: status=SUCCESS  → Redis pub/sub → WebSocket → Browser
      ├── failure (retries left) → DB: status=RETRYING → re-queued with backoff
      └── failure (exhausted) → DB: status=DEAD_LETTER
```

## Quick Start

Prerequisites: Docker Desktop 4.x+ with Compose v2.

```bash
git clone https://github.com/JugalGajjar/TaskForge.git
cd TaskForge
cp .env.example .env
docker compose up --build
```

Seed demo users and sample jobs:

```bash
docker compose exec backend python scripts/seed.py
```

| Service | URL | Default credentials |
|---|---|---|
| Dashboard | http://localhost:3000 | admin / admin1234 (after seed) |
| API docs | http://localhost:8000/api/docs | |
| Flower | http://localhost:5555 | |
| Prometheus | http://localhost:9090 | |
| Grafana | http://localhost:3001 | admin / admin |

## API Reference

All endpoints are prefixed with `/api/v1`. Authentication is required on all routes except `/auth/register` and `/auth/login`.

### Auth

| Method | Path | Description |
|---|---|---|
| POST | `/auth/register` | Create a new account |
| POST | `/auth/login` | Get access + refresh tokens |
| POST | `/auth/refresh` | Rotate tokens |
| GET | `/auth/me` | Current user profile |
| POST | `/auth/api-key` | Generate an API key |
| DELETE | `/auth/api-key` | Revoke the API key |

### Jobs

| Method | Path | Auth required | Description |
|---|---|---|---|
| POST | `/jobs` | operator+ | Submit a job |
| GET | `/jobs` | any | List jobs (filterable by status, type, priority) |
| GET | `/jobs/{id}` | any | Get a single job with full logs |
| POST | `/jobs/{id}/cancel` | operator+ | Cancel a queued or running job |
| POST | `/jobs/{id}/retry` | operator+ | Re-queue a failed or dead-letter job |
| DELETE | `/jobs/{id}` | operator+ | Delete a terminal job |
| GET | `/jobs/metrics/summary` | any | Aggregate counts and average duration |

### Workers

| Method | Path | Description |
|---|---|---|
| GET | `/workers` | List all registered workers and their status |

### WebSocket

Connect to `ws://localhost:8000/ws?token=<access_token>` to receive real-time events:

| Event | Fired when |
|---|---|
| `connected` | Handshake complete |
| `job_running` | A worker picks up the job |
| `job_success` | The job completes successfully |
| `job_failed` | The job fails after exhausting retries |
| `job_retrying` | A retry is scheduled |
| `job_cancelled` | The job is cancelled |

Send `ping` to receive `{"event":"pong"}` for keepalive.
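A client might route these events as follows. This is a sketch: the message fields beyond `event` (e.g. `job_id`) are assumed, and the commented-out connection code assumes the third-party `websockets` package plus a running server, so only the pure dispatcher is meant to be lifted directly:

```python
import json


def handle_event(raw: str) -> str:
    """Turn a raw WebSocket message into a short human-readable summary."""
    msg = json.loads(raw)
    event = msg.get("event")
    if event == "connected":
        return "handshake complete"
    if event in {"job_running", "job_success", "job_failed",
                 "job_retrying", "job_cancelled"}:
        # job_id is an assumed field name for illustration.
        return f"{msg.get('job_id', '?')}: {event}"
    if event == "pong":
        return "keepalive ok"
    return f"unknown event: {event}"


# Connecting (sketch, assuming the `websockets` package):
#   async with websockets.connect(
#           f"ws://localhost:8000/ws?token={access_token}") as ws:
#       await ws.send("ping")
#       print(handle_event(await ws.recv()))
```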


## Job Types & Payloads

Submit jobs with `POST /api/v1/jobs`:

```json
{
  "type": "data_processing",
  "priority": "high",
  "payload": { "dataset": "sales_q1", "operation": "aggregate", "rows": 50000 },
  "max_retries": 3,
  "idempotency_key": "sales-agg-2024-q1",
  "webhook_url": "https://your-service.com/webhooks/taskforge"
}
```

| Type | Example payload fields |
|---|---|
| `data_processing` | `dataset`, `operation`, `rows` |
| `web_scraping` | `url`, `depth`, `selectors` |
| `ai_inference` | `model`, `prompt`, `max_tokens` |
| `security_scan` | `target`, `scan_type` |
| `email` | `to`, `subject`, `template`, `recipients` |
| `custom` | any; falls back to the `data_processing` handler |

Priority values: `low`, `normal` (default), `high`, `critical`.
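A submission can be sanity-checked client-side before posting. This is a sketch following the example body above; the server remains the source of truth for validation:

```python
VALID_TYPES = {"data_processing", "web_scraping", "ai_inference",
               "security_scan", "email", "custom"}
VALID_PRIORITIES = {"low", "normal", "high", "critical"}


def validate_job(job: dict) -> list[str]:
    """Return a list of validation errors (empty list means the job looks OK)."""
    errors = []
    if job.get("type") not in VALID_TYPES:
        errors.append(f"unknown type: {job.get('type')!r}")
    if job.get("priority", "normal") not in VALID_PRIORITIES:
        errors.append(f"unknown priority: {job.get('priority')!r}")
    if not isinstance(job.get("payload"), dict):
        errors.append("payload must be an object")
    return errors


assert validate_job({"type": "email", "payload": {"to": "a@b.c"}}) == []
```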


## Configuration

Copy `.env.example` to `.env` and set the values before any production deployment:

```bash
# Auth — change both of these
SECRET_KEY=<32+ char random string>
JWT_SECRET_KEY=<32+ char random string>

# Database
DATABASE_URL=postgresql://taskforge:taskforge_secret@postgres:5432/taskforge

# Redis
REDIS_URL=redis://redis:6379/0
CELERY_BROKER_URL=redis://redis:6379/1
CELERY_RESULT_BACKEND=redis://redis:6379/2

# Rate limiting
RATE_LIMIT_REQUESTS=120
RATE_LIMIT_WINDOW=60

# Admin user created by the seed script
ADMIN_USERNAME=admin
ADMIN_EMAIL=admin@taskforge.local
ADMIN_PASSWORD=admin1234
```
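The two secrets can be generated with the standard library — one convenient way; any random string of 32+ characters works:

```python
import secrets

# token_urlsafe(32) encodes 32 random bytes as ~43 URL-safe characters,
# comfortably above the 32-character minimum.
print("SECRET_KEY=" + secrets.token_urlsafe(32))
print("JWT_SECRET_KEY=" + secrets.token_urlsafe(32))
```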

## Development

Common tasks via the Makefile:

```bash
make up             # Start all services (copies .env.example if .env is missing)
make down           # Stop all services
make logs           # Tail all logs
make logs-api       # Tail backend logs only
make logs-worker    # Tail worker logs only
make seed           # Seed demo data
make migrate        # Run Alembic migrations
make test           # Run test suite against isolated taskforge_test DB
make shell-backend  # Shell into the backend container
make shell-db       # psql into PostgreSQL
make shell-redis    # redis-cli
make clean          # Remove containers, volumes, and images
```

Scale workers:

```bash
docker compose up -d --scale worker=4
```

Create a new DB migration:

```bash
make migrate-create MSG="add index on jobs.created_by"
```

## Running Tests

```bash
make test
```

This creates a `taskforge_test` database, runs the full test suite against it, then drops it; the production database is never touched.

To run the tests manually inside the container:

```bash
docker compose exec \
  -e TEST_DATABASE_URL=postgresql://taskforge:taskforge_secret@postgres:5432/taskforge_test \
  backend python -m pytest tests/ -v
```

## Project Structure

```
TaskForge/
├── backend/
│   ├── app/
│   │   ├── api/v1/            # Route handlers (auth, jobs, workers, ws)
│   │   ├── core/              # Security, rate limiting, logging
│   │   ├── models/            # SQLAlchemy models (Job, User, Worker)
│   │   ├── schemas/           # Pydantic request/response schemas
│   │   ├── services/          # Business logic (job_service, ws_manager)
│   │   └── workers/
│   │       ├── celery_app.py
│   │       ├── tasks.py       # Main Celery task + retry/failure logic
│   │       └── handlers/      # Per-type job handlers
│   ├── alembic/               # Database migrations
│   ├── scripts/seed.py        # Demo data seed
│   └── tests/                 # Integration tests
├── frontend/
│   └── src/
│       ├── api/               # Axios client + typed API calls
│       ├── components/        # Jobs table, charts, worker cards, modals
│       ├── contexts/          # AuthContext
│       ├── hooks/             # useWebSocket
│       └── pages/             # Dashboard, Jobs, Workers, Settings
├── monitoring/
│   ├── prometheus/            # Scrape config
│   └── grafana/               # Pre-provisioned data source + dashboard
├── elk/logstash/              # Logstash pipeline config
├── docker-compose.yml
├── Makefile
└── .env.example
```

## License

MIT License.
