
Split the celery antenna queue into workload classes (ingest / housekeeping) #1229

@mihow

Description

Problem

Today, one RabbitMQ queue (antenna) carries every task type: high-throughput NATS result ingestion, per-image post-processing, periodic health-check beat tasks, data-integrity audits, and one-shot user-triggered actions. A single celeryworker service consumes from it.

During bursty ingest (e.g. two concurrent async_api jobs plus redelivery amplification), the queue can grow to tens of thousands of messages. While that backlog drains:

  • Beat-scheduled tasks like check_processing_services_online sit in line behind thousands of ingest results. By the time they run, they're late enough that they either miss their next beat tick entirely or hit their hard time limit because stale state has piled up.
  • Data-integrity / reconciliation tasks that should run in minutes end up running in hours.
  • One-shot user-triggered actions (manual reprocess, export) compete with the flood and feel "frozen" from the UI's perspective.
  • Conversely, if housekeeping tasks become slow (e.g. an external service is timing out in a beat task), they can starve ingest throughput on the same pool.

The underlying problem is queue-workload coupling: everything shares one FIFO and one prefork pool, even though the tasks have very different SLAs and failure modes.

Proposal

Split the antenna queue into at least three workload-scoped queues, each consumed by its own celeryworker service with independently tunable concurrency and prefetch settings. Task routing is handled by task_routes in celery config.

Queues

| Queue | Purpose | Example tasks | Suggested pool |
| --- | --- | --- | --- |
| antenna-ingest | Hot path: ML result ingestion + per-image post-processing. Must not be starved. | process_nats_pipeline_result, create_detection_images, fast per-image DB writes | Prefork, concurrency 16+ (or gevent once vetted; see #TBD) |
| antenna-housekeeping | Scheduled health / integrity / metric tasks triggered by celerybeat. | check_processing_services_online, data-integrity audits, metric recomputes, scheduled exports | Prefork, concurrency 2–4 |
| antenna-interactive | User-triggered one-shots that should not wait behind ingest. | Manual reprocess, on-demand export, bulk-delete | Prefork, concurrency 2–4 |

antenna itself can be kept as the default-fallback queue for anything that doesn't route explicitly, to avoid accidentally dropping tasks during rollout.

Routing config sketch

# config/settings/base.py

CELERY_TASK_DEFAULT_QUEUE = "antenna"

CELERY_TASK_ROUTES = {
    # Hot path: NATS result ingestion and per-image post-processing
    "ami.jobs.tasks.process_nats_pipeline_result": {"queue": "antenna-ingest"},
    "ami.ml.tasks.create_detection_images": {"queue": "antenna-ingest"},
    "ami.main.tasks.*_calculated_fields": {"queue": "antenna-ingest"},

    # Periodic/beat-scheduled
    "ami.ml.tasks.check_processing_services_online": {"queue": "antenna-housekeeping"},
    "ami.main.tasks.update_*": {"queue": "antenna-housekeeping"},
    "ami.jobs.tasks.audit_*": {"queue": "antenna-housekeeping"},

    # Interactive / user-triggered
    "ami.exports.tasks.*": {"queue": "antenna-interactive"},
    # ...
}
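
Note that task_routes only sets the default path: a call site can still push any task to a specific queue with apply_async(queue=...), which is a useful escape hatch for interactive one-offs. A minimal sketch (the create_export task name and its export_id argument are illustrative, not existing names in the codebase):

# Call-time routing override: the queue passed to apply_async() takes
# precedence over CELERY_TASK_ROUTES for that one invocation.
# create_export / export_id are hypothetical names, for illustration only.
from ami.exports.tasks import create_export

create_export.apply_async(kwargs={"export_id": 123}, queue="antenna-interactive")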

Worker service topology

Extend docker-compose.worker.yml (or equivalent) with three celeryworker services sharing the same image, differing only in:

  • --queues= flag (one queue per service)
  • --concurrency (per table above; overridable via env var)
  • Optional --max-tasks-per-child / --max-memory-per-child per workload

Beat remains a single celerybeat service — beat only schedules, it doesn't execute.
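
Celery auto-declares any queue a worker consumes from (task_create_missing_queues defaults to True), so no extra broker-side setup is strictly required. If we would rather declare the workload queues explicitly in config, a minimal sketch (same settings module as the routing config above) could be:

# config/settings/base.py (optional: explicit queue declarations)
from kombu import Queue

CELERY_TASK_QUEUES = (
    Queue("antenna"),               # default / fallback
    Queue("antenna-ingest"),        # hot path
    Queue("antenna-housekeeping"),  # beat-scheduled
    Queue("antenna-interactive"),   # user-triggered one-shots
)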

Rollout

  1. Add task_routes with the new queue names; keep antenna as the default fallback. Because routed tasks would otherwise sit unconsumed, either ship this together with step 2 or temporarily point the existing celeryworker at all four queues (--queues=antenna,antenna-ingest,antenna-housekeeping,antenna-interactive).
  2. Stand up the new worker services (e.g. celeryworker-ingest, celeryworker-housekeeping, celeryworker-interactive). Tasks now flow to, and are consumed from, the new queues (a quick sanity check is sketched below this list).
  3. Once traffic is steady, scale the original single-queue celeryworker down to zero (or keep it as a safety net for a release cycle).
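
As a sanity check after step 2, each worker can be asked which queues it is actually consuming. A minimal sketch, assuming the Celery app object is importable from config.celery_app (the usual cookiecutter-django layout implied by config/settings/base.py):

# Hypothetical post-rollout check: confirm each celeryworker service is
# subscribed to the intended workload queue.
from config.celery_app import app  # assumed location of the Celery app object

# active_queues() maps each responding worker to the queues it consumes from.
active = app.control.inspect().active_queues() or {}
for worker, queues in sorted(active.items()):
    print(worker, sorted(q["name"] for q in queues))
# Expected: the ingest worker lists antenna-ingest, the housekeeping worker
# lists antenna-housekeeping, and so on.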

Why this is worth it

  • Isolates the workload behind most "stuck job" reports (housekeeping beat tasks blocked behind ingest) from the workload that produces most user-visible results (ingest).
  • Lets us tune concurrency / prefetch / pool class per workload. Ingest can eventually move to gevent for I/O-bound parallelism; housekeeping can stay conservative prefork.
  • Makes failure modes legible. Instead of one queue with a 50k backlog, we can see at a glance which class of work is falling behind.
  • Makes future capacity planning tractable: ingest capacity scales with worker hosts; housekeeping capacity rarely needs to scale at all.

What we still need to decide before implementing

  • The exact task-to-queue mapping. The table above is illustrative; an actual pass through the codebase is needed to enumerate every @shared_task / task= and categorise it.
  • Whether we want a fourth antenna-slow queue for long-running exports / training jobs, separate from antenna-interactive.
  • Rollout coordination: whether to ship routing config and worker-service changes in one PR or two.
  • Whether to guarantee that CELERY_TASK_DEFAULT_QUEUE is never antenna-ingest, so forgetting to route a new task fails safe (the task sits in the default queue rather than competing with, or starving, ingest).
