Async Task Queue

A simplified Celery implementation for learning asynchronous processing and the Producer-Consumer pattern.

Overview

This project demonstrates how to build a reliable task queue system using Redis as a message broker. It handles long-running tasks (e.g., image processing, model inference) asynchronously, with built-in retry logic, crash recovery, and dead letter queuing.

Key Learning Concepts

  • Asynchronous Processing: Decouple HTTP request handling from task execution
  • Producer-Consumer Pattern: Separate task submission (API) from task execution (Worker)
  • Visibility Timeout: Automatically recover tasks if workers crash
  • Lease Tokens: Prevent race conditions between workers and the reaper
  • Atomic Operations: Use Redis Lua scripts to eliminate crash windows

Features

  • Atomic Task Claiming: BLMOVE hands each task to exactly one worker at a time
  • Visibility Timeout: Tasks are reclaimed if not acknowledged within timeout
  • Lease Tokens: Unique tokens prevent ack/reaper race conditions
  • Heartbeat Extension: Long-running tasks extend their lease to avoid premature reclaim
  • Automatic Retries: Failed tasks are retried up to MAX_RETRIES times
  • Dead Letter Queue: Permanently failed tasks are preserved for inspection
  • Orphan Recovery: Tasks stuck between claim and registration are recovered
  • Graceful Shutdown: Workers complete current task before exiting
  • Horizontal Scaling: Run multiple workers with docker-compose up --scale worker=N

Architecture

┌─────────────────────────────────────────────────────────────┐
│                        Docker Network                       │
│                                                             │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐   │
│  │   FastAPI    │    │    Redis     │    │    Worker    │   │
│  │   (api)      │◄──►│   (redis)    │◄──►│   (worker)   │   │
│  │   :8000      │    │   :6379      │    │              │   │
│  └──────────────┘    └──────────────┘    └──────────────┘   │
│                                                             │
└─────────────────────────────────────────────────────────────┘
| Component | Technology | Role |
| --- | --- | --- |
| API Server | Python + FastAPI | Task producer, status queries |
| Broker | Redis | Task queues + result storage |
| Worker | Python | Task consumer + executor |
| Orchestration | Docker Compose | Multi-container deployment |

Quick Start

Prerequisites

  • Docker
  • Docker Compose

Run the System

# Start all services
docker-compose up --build

# Submit a task
curl -X POST "http://localhost:8000/process-image?image_url=cat.jpg"
# Response: {"task_id": "abc-123", "message": "Task queued"}

# Check task status
curl "http://localhost:8000/status/abc-123"
# Response: {"task_id": "abc-123", "status": "pending", ...}

# Wait 5+ seconds, check again
curl "http://localhost:8000/status/abc-123"
# Response: {"task_id": "abc-123", "status": "completed", "result_url": "processed_cat.jpg"}

# Check the dead letter queue
docker-compose exec redis redis-cli LRANGE dead_letter_queue 0 -1

# See the worker logs
docker-compose logs -f worker

Scale Workers

# Run 3 worker instances
docker-compose up --scale worker=3

# Submit multiple tasks
for i in {1..10}; do
  curl -X POST "http://localhost:8000/process-image?image_url=image$i.jpg"
done

API Reference

Submit Task

POST /process-image?image_url=<url>

Response:

{
  "task_id": "550e8400-e29b-41d4-a716-446655440000",
  "message": "Task queued"
}

Get Task Status

GET /status/{task_id}

Response (Pending):

{
  "task_id": "550e8400-e29b-41d4-a716-446655440000",
  "image_url": "cat.jpg",
  "status": "pending"
}

Response (Completed):

{
  "task_id": "550e8400-e29b-41d4-a716-446655440000",
  "image_url": "cat.jpg",
  "status": "completed",
  "result_url": "processed_cat.jpg"
}

Response (Failed):

{
  "task_id": "550e8400-e29b-41d4-a716-446655440000",
  "image_url": "cat.jpg",
  "status": "failed",
  "error_msg": "Simulated Network Error!",
  "retry_count": 3
}

Project Structure

async-task-queue/
├── docker-compose.yml          # Container orchestration
├── README.md                   # This file
├── spec/
│   ├── SPEC.md                 # Full specification
│   ├── ARCHITECTURE.md         # System architecture diagrams
│   ├── DATA-FLOW.md            # Sequence diagrams
│   └── TASK-STATES.md          # State machine documentation
├── app/
│   ├── Dockerfile
│   ├── main.py                 # FastAPI application (producer)
│   └── requirements.txt
└── worker/
    ├── Dockerfile
    ├── worker.py               # Main worker loop
    ├── queue_manager.py        # Claim, ack, requeue operations
    ├── task_processor.py       # Task execution logic
    ├── reaper.py               # Recovery of expired/orphan tasks
    ├── heartbeat.py            # Lease extension
    ├── lua_scripts.py          # Atomic Redis Lua scripts
    ├── config.py               # Configuration constants
    ├── redis_client.py         # Redis connection singleton
    └── requirements.txt

Configuration

| Variable | Default | Description |
| --- | --- | --- |
| REDIS_HOST | localhost | Redis server hostname |
| REDIS_PORT | 6379 | Redis server port |
| VISIBILITY_TIMEOUT | 30s | Time before a task is considered abandoned |
| HEARTBEAT_INTERVAL | 10s | How often a worker extends its lease |
| REAPER_INTERVAL | 5s | How often the reaper checks for expired tasks |
| REAPER_GRACE_PERIOD | 10s | Extra time for the reaper to complete recovery |
| MAX_RETRIES | 3 | Maximum retry attempts before the DLQ |
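A config.py along these lines would load the settings above from the environment. Variable names and defaults come from the table; the exact module layout is an assumption, not the repo's actual code:

```python
# config.py -- sketch of reading the settings from the environment.
import os

REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT = int(os.getenv("REDIS_PORT", "6379"))

# Timing values, in seconds.
VISIBILITY_TIMEOUT = int(os.getenv("VISIBILITY_TIMEOUT", "30"))
HEARTBEAT_INTERVAL = int(os.getenv("HEARTBEAT_INTERVAL", "10"))
REAPER_INTERVAL = int(os.getenv("REAPER_INTERVAL", "5"))
REAPER_GRACE_PERIOD = int(os.getenv("REAPER_GRACE_PERIOD", "10"))

MAX_RETRIES = int(os.getenv("MAX_RETRIES", "3"))
```

Docker Compose can then override any of these per service via the `environment:` key.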

How It Works

Task Lifecycle

Submit → Queue → Claim → Process → Complete
                   │
                   └─► Fail → Retry? → Re-queue
                              │
                              └─► Max retries → Dead Letter Queue

Key Mechanisms

  1. Atomic Claiming: BLMOVE atomically moves a task from image_queue to processing_queue

  2. Lease Token: Each claim generates a unique token. Only the token holder can acknowledge the task.

  3. Heartbeat: Long-running tasks periodically extend their lease to prevent timeout.

  4. Atomic Ack + Update: Lua scripts ensure ack and status update happen atomically:

    • ACK_COMPLETE_LUA: ack + set completed status
    • ACK_REQUEUE_LUA: ack + update data + push to target queue
  5. Reaper Recovery: Periodically checks for:

    • Expired tasks: Lease timeout exceeded → reclaim and retry/DLQ
    • Orphan tasks: Stuck between BLMOVE and ZADD → immediate requeue

Chaos Testing

The worker includes a 30% random failure rate to simulate real-world flakiness:

import random

if random.random() < 0.3:
    raise Exception("Simulated Network Error!")

This exercises the retry mechanism, the dead letter queue, and reaper recovery.

Limitations and Future Improvements

⚠️ This is a learning demo, not production-ready. Key limitations:

```mermaid
graph TD
    A[Core Limitations] --> B[Race Conditions]
    A --> C[Infrastructure]
    A --> D[Observability]

    B --> B1[BLMOVE-ZADD Gap<br/>→ Duplicate Processing]
    B --> B2[Orphan Recovery TOCTOU<br/>→ Unnecessary Requeue]

    C --> C1[No Redis Persistence<br/>→ Data Loss on Restart]
    C --> C2[Single Redis Instance<br/>→ No High Availability]
    C --> C3[No Result Expiry<br/>→ Memory Growth]

    D --> D1[No Metrics/Monitoring<br/>→ Blind to System Health]
    D --> D2[No Auth/Rate Limiting<br/>→ Security Risk]
```

Critical Issues

1. BLMOVE-to-ZADD Gap (Duplicate Processing Risk)

Problem: BLMOVE and ZADD are separate operations. If a worker pauses after BLMOVE, the reaper may reclaim the task before ZADD completes, causing two workers to process it.

Solution: Add a grace period to orphan recovery (wait 5s before reclaiming).
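A sketch of what that grace period could look like in the reaper. The `first_seen` bookkeeping and function name are hypothetical additions, not repo code; queue names follow this README:

```python
import time

ORPHAN_GRACE_PERIOD = 5  # seconds a task may sit leaseless before requeue

# task_id -> timestamp when first observed in processing_queue with no lease
first_seen = {}

def find_confirmed_orphans(r):
    in_processing = set(r.lrange("processing_queue", 0, -1))
    with_lease = set(r.zrange("processing_zset", 0, -1))
    now = time.time()
    orphans = []
    for task_id in in_processing - with_lease:
        seen = first_seen.setdefault(task_id, now)
        # Only requeue if the task has been leaseless for the whole grace
        # period -- the owning worker may just be between BLMOVE and ZADD.
        if now - seen >= ORPHAN_GRACE_PERIOD:
            orphans.append(task_id)
            del first_seen[task_id]
    # Forget tasks that have since registered a lease or finished.
    for task_id in list(first_seen):
        if task_id not in in_processing:
            del first_seen[task_id]
    return orphans
```

A task seen without a lease is only a *candidate* orphan on its first sighting; it is requeued only if a later reaper pass still finds it leaseless after the grace period.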

2. No Redis Persistence (Data Loss)

Problem: Redis defaults to in-memory only. All tasks/results lost on restart.

Solution: Enable AOF persistence:

redis-server --appendonly yes

3. Single Redis Instance (SPOF)

Problem: If Redis crashes, the entire system stops.

Solution: Use Redis Sentinel (HA) or managed services (ElastiCache, Memorystore).

4. No Result Expiry (Memory Leak)

Problem: task:data:{task_id} keys accumulate forever.

Solution: Set TTL on task completion:

r.setex(f"task:data:{task_id}", 604800, json.dumps(task_data))  # 7 days

5. No Observability

Problem: No metrics for queue depth, latency, or failure rates.

Solution: Export to Prometheus/Grafana for monitoring and alerting.

Alternative: Redis Streams

For production systems, consider Redis Streams instead of Lists; Streams provide much of this machinery natively.

Why Redis Streams?

| Feature | Lists (This Demo) | Streams |
| --- | --- | --- |
| Consumer Groups | Manual implementation | Built-in XREADGROUP |
| Message Acknowledgment | Custom Lua scripts | Native XACK |
| Pending Messages | Custom processing_zset | Built-in XPENDING |
| Auto-Claim | Custom reaper | Built-in XAUTOCLAIM |
| Message History | Lost after pop | Retained until trimmed |
| Exactly-Once | Complex to achieve | Easier with consumer groups |

Streams Example

# Producer
r.xadd("task_stream", {"task_id": task_id, "image_url": image_url})

# Create the consumer group once (mkstream creates the stream if missing)
r.xgroup_create("task_stream", "workers", id="0", mkstream=True)

# Consumer (with consumer group)
r.xreadgroup("workers", "worker-1", {"task_stream": ">"}, count=1, block=1000)

# Acknowledge
r.xack("task_stream", "workers", message_id)

# Auto-claim stale messages (replaces reaper)
r.xautoclaim("task_stream", "workers", "worker-1", min_idle_time=30000)

Streams Benefits

  1. Built-in visibility timeout: XAUTOCLAIM automatically reclaims messages idle > threshold
  2. Consumer groups: Built-in load balancing across workers
  3. Message persistence: Messages retained until explicitly deleted
  4. Simpler code: Fewer custom Lua scripts needed
  5. Better observability: XINFO commands for monitoring
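On point 5, the XINFO family can be polled for monitoring. The redis-py method names below are real; the wrapper function is hypothetical, and the stream/group names follow the example above:

```python
def stream_health(r):
    """Snapshot of stream and consumer-group state for dashboards/alerts."""
    return {
        "stream": r.xinfo_stream("task_stream"),         # length, last ID, ...
        "groups": r.xinfo_groups("task_stream"),         # per-group pending
        "pending": r.xpending("task_stream", "workers"), # unacked summary
    }

# Usage: stream_health(redis.Redis(decode_responses=True))
```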

When to Use Streams

  • Production systems requiring reliability
  • Need for message history/replay
  • Complex consumer group patterns
  • Built-in pending message tracking

When Lists Are Fine

  • Learning/demo purposes (this project)
  • Simple FIFO queue without replay needs
  • When you want to understand the fundamentals

License

MIT License - See LICENSE for details.
