Production-ready Celery task patterns: retries, chains, groups, chords, periodic tasks, and Flower monitoring.

## Quick start (Docker)

```bash
# Start Redis + worker + beat + Flower
docker-compose up --build
# Open Flower monitoring dashboard
open http://localhost:5555
```

## Manual setup

```bash
pip install -r requirements.txt
```
```bash
# Terminal 1: Start Redis
docker run -d -p 6379:6379 redis:7-alpine
# Terminal 2: Start worker
celery -A celery_app worker --loglevel=info --concurrency=4
# Terminal 3: Start beat scheduler
celery -A celery_app beat --loglevel=info
# Terminal 4: Start Flower (monitoring)
celery -A celery_app flower --port=5555
```

## Patterns

### Single task

```python
from tasks.email_tasks import send_welcome_email

# Enqueue asynchronously: .delay() returns an AsyncResult immediately
result = send_welcome_email.delay(
    user_id=1,
    email="alice@example.com",
    name="Alice",
)

# Block until the result is available (omit this for true fire-and-forget)
output = result.get(timeout=30)
print(output)  # {"status": "sent", "to": "alice@example.com", ...}

# Check state (STARTED is only reported when task_track_started is enabled)
print(result.state)  # PENDING, STARTED, SUCCESS, FAILURE, RETRY
```

**Retry policy:** the task retries automatically on failure with exponential backoff (60s, 120s, 240s).
### Chain (sequential)

The output of each task becomes the first argument of the next.

```python
from celery import chain
from tasks.data_pipeline import extract_data, transform_data, load_data, validate_data

# Build the chain
etl_pipeline = chain(
    extract_data.s("orders_db"),
    transform_data.s(),             # receives output of extract_data
    load_data.s("data_warehouse"),  # receives output of transform_data
    validate_data.s(),              # receives output of load_data
)

# Execute
result = etl_pipeline.apply_async()
final = result.get(timeout=60)
print(final)
```

Pipe syntax (equivalent — note that `load_data` still needs its `"data_warehouse"` argument):

```python
result = (
    extract_data.s("orders_db")
    | transform_data.s()
    | load_data.s("data_warehouse")
    | validate_data.s()
).apply_async()
```

### Group (parallel)

All tasks are dispatched at once and run in parallel, subject to available worker concurrency. `get()` returns the list of results in the original order.
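```python
from celery import group
from tasks.data_pipeline import extract_data

# Run 3 extracts in parallel
job = group(
    extract_data.s("orders_db"),
    extract_data.s("inventory_db"),
    extract_data.s("analytics_db"),
)
results = job.apply_async()
outputs = results.get(timeout=60)  # list of 3 results
```

### Chord (parallel + callback)

Run a group in parallel. When ALL of its tasks complete, run a callback with the list of their results. Chords require a configured result backend, because the header results must be stored before the callback can receive them.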
```python
from celery import chord, group
from tasks.data_pipeline import extract_data, aggregate_results

job = chord(
    group(
        extract_data.s("orders"),
        extract_data.s("inventory"),
        extract_data.s("users"),
    ),
    aggregate_results.s(),  # called with [result1, result2, result3]
)
final = job().get(timeout=60)
```

### Multi-source pipeline

```python
from tasks.data_pipeline import run_multi_source_pipeline

result = run_multi_source_pipeline(
    sources=["orders_db", "inventory_db", "clickstream"],
    pipeline_id="daily-etl-2024",
)
summary = result.get(timeout=120)
print(summary["total_rows_processed"])
```

### Email campaign

```python
from tasks.email_tasks import run_email_campaign

users = list(range(1, 10001))  # 10,000 user IDs
result = run_email_campaign.delay(
    campaign_id="promo-2024-06",
    segment_user_ids=users,
    template="summer_sale",
)
report = result.get(timeout=300)
print(f"Sent: {report['total_sent']}, Failed: {report['total_failed']}")
```

## Periodic tasks (Celery Beat)

Configure in `beat/schedules.py`:
```python
from celery.schedules import crontab

app.conf.beat_schedule = {
    "health-check-every-minute": {
        "task": "tasks.monitoring.health_check",
        "schedule": 60.0,  # every 60 seconds
    },
    "cleanup-daily-at-2am": {
        "task": "tasks.monitoring.cleanup_sessions",
        "schedule": crontab(hour=2, minute=0),
    },
    "weekday-report-7am": {
        "task": "tasks.monitoring.daily_report",
        # minute=0 is required: crontab's minute defaults to "*", which would
        # fire the report every minute from 07:00 to 07:59.
        "schedule": crontab(hour=7, minute=0, day_of_week="mon-fri"),
    },
}
```

## Monitoring with Flower

Flower is a real-time web UI for Celery monitoring.

Dashboard: <http://localhost:5555>

| Feature | Description |
|---|---|
| Tasks | View pending, active, completed, failed tasks |
| Workers | CPU, memory, task counts per worker |
| Broker | Queue sizes, message rates |
| Charts | Tasks per second, failure rates |
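### Flower HTTP API

```bash
# Get all workers
curl http://localhost:5555/api/workers
# Get task info (name, state, args, runtime, ...) by ID
curl http://localhost:5555/api/task/info/{task_id}
# Get task result by ID
curl http://localhost:5555/api/task/result/{task_id}
# Get currently executing tasks. Celery has no "ACTIVE" state; running tasks are STARTED.
# Quote the URL so the shell does not treat "?" as a glob character.
curl "http://localhost:5555/api/tasks?state=STARTED"
```

## Task options reference

```python
@app.task(
    bind=True,               # access self (task instance)
    max_retries=3,           # max retry attempts
    default_retry_delay=60,  # seconds before retry (used when retry() gets no countdown)
    queue="email",           # route to specific queue
    time_limit=300,          # hard kill after 5 minutes
    soft_time_limit=240,     # raise SoftTimeLimitExceeded after 4 minutes
    rate_limit="100/m",      # max 100 tasks/minute, enforced per worker (not globally)
    ignore_result=True,      # don't store result (fire-and-forget)
    acks_late=True,          # ack after completion; the task may re-run, so it must be idempotent
)
def my_task(self, arg1, arg2):
    try:
        do_work(arg1, arg2)
    except TransientError as exc:
        # Explicit countdown overrides default_retry_delay: 30s, 60s, 120s
        raise self.retry(exc=exc, countdown=30 * (2 ** self.request.retries))
```

## Choosing a pattern

| Pattern | Use When |
|---|---|
| `.delay()` / `.apply_async()` | Single task, fire and forget |
| `chain` | Sequential steps where each needs previous output |
| `group` | Independent tasks that can run in parallel |
| `chord` | Parallel tasks + final aggregation step |
| Beat | Recurring scheduled tasks (cron jobs) |

## License

MIT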