Improve celery task dispatch and cancellation to prevent stuck jobs#1324
Improve celery task dispatch and cancellation to prevent stuck jobs#1324mihow wants to merge 2 commits into
Conversation
Addresses three reinforcing causes of run_job head-of-line blocking on the jobs queue (#1323) plus an orthogonal cancel bug exposed by the same investigation. - Enable fair scheduling (CELERY_WORKER_POOL_OPTIMIZATION = "fair") so the master process holds prefetched messages in a shared buffer instead of pre-assigning them to specific prefork children. Long heterogeneous tasks (notably run_job inside filter_processed_images) no longer block newer messages stuck behind them on the same child. - Add acks_late=True + reject_on_worker_lost=True to run_job so a worker SIGKILL/OOM mid-task triggers broker redelivery instead of silently dropping the job. Pairs with an early-guard at the top of run_job that returns cleanly if the Job is already in a terminal state or being cancelled, so redelivery never re-runs side effects. - Fix Job.cancel for ASYNC_API: skip terminate=True on the (likely-done) run_job task — the actual work runs on remote ADC workers via NATS, and cleanup_async_job_if_needed is what stops it. Terminating the local bootstrap was a no-op at best and SIGTERM'd a still-bootstrapping child at worst. INTERNAL / SYNC_API keep terminate=True since their celery task body owns the entire job lifecycle. - Document the optional CELERY_WORKER_CONCURRENCY=4 override on the celeryworker_jobs container (commented out for now) so operators can opt in once -O fair is observed in production. Co-Authored-By: Claude <noreply@anthropic.com>
✅ Deploy Preview for antenna-ssec canceled.
|
✅ Deploy Preview for antenna-preview canceled.
|
|
Warning Review limit reached
More reviews will be available in 46 minutes and 41 seconds. Learn how PR review limits work. Your organization has run out of usage credits. Purchase more in the billing tab. ⌛ How to resolve this issue?After more reviews become available, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans include higher PR review limits than trial, open-source, and free plans. In all cases, reviews become available again over time. During sustained high-volume PR review activity, CodeRabbit may temporarily slow when the next review becomes available. Please see our Fair Usage Limits Policy for further information. ℹ️ Review info⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (4)
📝 WalkthroughWalkthroughThis PR addresses blocking behavior in the Celery jobs queue by unifying job cancellation semantics across dispatch modes, adding broker-safe redelivery handling to the ChangesJob Cancellation and Celery Task Resilience
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
Suggested labels
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Pull request overview
This PR improves Celery run_job scheduling and cancellation behavior to reduce stuck-job blast radius and make worker-loss redelivery safer.
Changes:
- Enables fair Celery prefork scheduling globally.
- Adds late acknowledgement/redelivery settings and an early status guard to
run_job. - Changes
Job.cancel()behavior for ASYNC_API jobs and adds regression tests. - Adds a documented optional jobs-worker concurrency override.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
config/settings/base.py |
Adds fair worker pool optimization for Celery workers. |
ami/jobs/tasks.py |
Adds late ack/reject-on-worker-lost and early short-circuit logic for run_job. |
ami/jobs/models.py |
Updates cancellation behavior for async vs sync/internal jobs. |
ami/jobs/tests/test_tasks.py |
Adds early-guard regression tests for run_job. |
ami/jobs/tests/test_jobs.py |
Adds cancellation behavior tests. |
docker-compose.worker.yml |
Documents a possible future jobs-worker concurrency override. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| if job.status in JobState.final_states() or job.status == JobState.CANCELING: | ||
| job.logger.info( | ||
| f"Skipping run_job for job {job.pk}: already in status {job.status} " | ||
| f"(redelivery or cancellation in flight)" | ||
| ) | ||
| return |
| task = run_job.AsyncResult(self.task_id) | ||
| if task: | ||
| task.revoke(terminate=True) | ||
| if self.dispatch_mode == JobDispatchMode.ASYNC_API: | ||
| # For async jobs we need to set the status to revoked here since the task already | ||
| # finished (it only queues the images). | ||
| self.status = JobState.REVOKED | ||
| self.save() | ||
| else: | ||
| self.status = JobState.REVOKED | ||
| self.save() | ||
| task.revoke(terminate=not is_async_api) | ||
|
|
||
| self.status = JobState.REVOKED |
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@ami/jobs/tasks.py`:
- Around line 161-170: The pre-run guard using job.status /
JobState.final_states() is insufficient for ASYNC_API jobs because cancellation
may occur after the initial check but before dispatch; to fix, add a second
status refresh and guard immediately before the async dispatch call (right
before queue_images_to_nats) by reloading the Job from the DB (e.g., call the
model refresh/get by PK) and aborting the task (return) if the reloaded
job.status is JobState.CANCELING or in JobState.final_states(), logging a
similar skip message; ensure you reference the same job PK/logger and perform
this check right before queue_images_to_nats to avoid enqueuing work for
canceled jobs.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: ed9ca82f-4b76-42e9-ae6e-290f6abf61b2
📒 Files selected for processing (6)
ami/jobs/models.pyami/jobs/tasks.pyami/jobs/tests/test_jobs.pyami/jobs/tests/test_tasks.pyconfig/settings/base.pydocker-compose.worker.yml
| # Early-guard: under acks_late, the broker may redeliver this message after a | ||
| # worker SIGKILL/OOM, and Job.cancel() may also flip status to CANCELING / | ||
| # REVOKED while the message sits in the prefetch buffer. Don't re-run a job | ||
| # that's already settled or being torn down. | ||
| if job.status in JobState.final_states() or job.status == JobState.CANCELING: | ||
| job.logger.info( | ||
| f"Skipping run_job for job {job.pk}: already in status {job.status} " | ||
| f"(redelivery or cancellation in flight)" | ||
| ) | ||
| return |
There was a problem hiding this comment.
Entry-only cancel guard is race-prone for ASYNC_API jobs.
Lines 165-170 guard only before job.run(). If cancel happens after that check, the task can still reach async dispatch and enqueue work under a canceled job because ASYNC_API cancel no longer terminates the worker process. Add a second DB refresh/status check immediately before async dispatch (e.g., right before queue_images_to_nats) and abort when status is CANCELING/terminal.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@ami/jobs/tasks.py` around lines 161 - 170, The pre-run guard using job.status
/ JobState.final_states() is insufficient for ASYNC_API jobs because
cancellation may occur after the initial check but before dispatch; to fix, add
a second status refresh and guard immediately before the async dispatch call
(right before queue_images_to_nats) by reloading the Job from the DB (e.g., call
the model refresh/get by PK) and aborting the task (return) if the reloaded
job.status is JobState.CANCELING or in JobState.final_states(), logging a
similar skip message; ensure you reference the same job PK/logger and perform
this check right before queue_images_to_nats to avoid enqueuing work for
canceled jobs.
Review on #1324 surfaced two races that left the early-guard non-functional in production: 1. ``task_prerun`` (``pre_update_job_status``) wrote PENDING to the row before the ``run_job`` body inspected status. A canceled or redelivered message therefore had its REVOKED/CANCELING overwritten with PENDING, and the early-guard added in the parent commit never tripped. The existing tests passed only because they invoked ``run_job.apply(args=[…])`` while production uses ``kwargs={"job_id": …}`` — under args, the prerun handler raised ``KeyError`` and exited silently. Switching the tests to ``kwargs=`` reproduces the production code path; the prerun handler now short-circuits when ``Job.is_settled()`` is true, preserving the status the early-guard reads next. 2. For ASYNC_API jobs ``Job.cancel()`` revokes without ``terminate=True``, marks the row REVOKED, and tears down the NATS stream + Redis state. ``MLJob.run`` running in a worker that's still inside ``collect_images`` (slow for large collections) would then proceed to ``queue_images_to_nats`` and recreate the stream the cancel just deleted, dispatching real GPU work to ADC for a revoked job; the results came back to no Redis state and ``_fail_job`` silently overwrote REVOKED with FAILURE. The bootstrap now checks ``Job.status`` (via a values-only read so the in-memory ``progress`` mutations don't clobber the cancel's REVOKED) right after the collect stage and bails out before any dispatch. Adds ``Job.is_settled()`` to centralize the "terminal or being torn down" predicate that ``run_job``'s early-guard, the prerun handler, ``_fail_job``, and the bootstrap guard all needed. Adds two regression tests: one for the prerun-then-guard chain, one for the cancel-during-bootstrap race. Co-Authored-By: Claude <noreply@anthropic.com>
Closes #1323.
Last week we hit the symptom Issue #1323 describes: a
run_jobwas sitting insidefilter_processed_images()for ~9 minutes against a huge collection, and a freshrun_jobqueued behind it sat inRESERVEDon the same worker container the entire time — even though 15 of 16 children on that container were idle and the entire sibling container was idle.SIGKILL'ing the blocker let the queued job start in the same second. The job that finally ran was almost certainly fine, but it wasted nine minutes of wall clock for no good reason, and any user clicking "run" during that window saw nothing happen.This PR fixes the three reinforcing causes the issue identified, plus an orthogonal cancel-path bug I noticed while reading the code paths. Each one is a small config or decorator change; the value is in stacking them.
What changes for users
run_jobno longer blocks other jobs in the queue. A slow first job releases the worker slot for the next one as soon as a sibling child is idle, rather than holding 15 idle slots hostage.STARTEDforever until the reaper found it. After: broker holds the message, redelivers it when a worker comes back. The job either resumes or — if it had already settled — exits cleanly.Job.cancelcallsrevoke(terminate=True)on the localrun_jobtask. For ASYNC_API that task has almost always already finished (queue_images_to_natsreturns fast) — terminating it does nothing about the actual work running on the remote ADC worker, and on the rare occasion the bootstrap is still running, the SIGTERM kills it without redelivery. The real cancel mechanism for async is tearing down the NATS stream + Redis state, whichcleanup_async_job_if_neededalready does. We now skip terminate for ASYNC_API.Plain-language summary
run_jobblocking sibling jobs on same containerrun_jobblocking sibling jobs on sibling containerrun_jobWhat's in this PR
config/settings/base.py—CELERY_WORKER_POOL_OPTIMIZATION = "fair". One line. Applies to all queues; the value is largely onjobs.ami/jobs/tasks.py—acks_late=True, reject_on_worker_lost=Trueonrun_job, plus an early-guard at the top of the task body that returns cleanly whenjob.statusis infinal_states()orCANCELING. The guard is what makes redelivery and cancel-race safe.ami/jobs/models.py—Job.cancelno longer passesterminate=Truewhendispatch_mode == ASYNC_API. For other dispatch modes, behavior is unchanged.docker-compose.worker.yml— added a commented-outCELERY_WORKER_CONCURRENCY: "4"onceleryworker_jobswith a TODO referencing this issue, in case we want to enable cause C later.A note on the counter-intuitive concurrency knob
The issue suggests lowering per-container concurrency on the jobs queue as a third fix. I've left this commented out for now (just a discoverability hint in
docker-compose.worker.yml) because it reads as backwards and I'd rather watch the first two fixes in production before pulling this lever too.The reasoning, briefly: celery's prefetch reserves
concurrency × prefetch_multiplier(=1)messages per container at the broker level. With concurrency=16, that's 16 messages held in the container's local buffer. When one of those messages is a stuck task, the container still tells the broker "I have 15 free slots" — and the broker keeps offering new messages to that container instead of spilling to a fully-idle sibling. Lowering concurrency to 4 shrinks the reservation window so the broker spills sooner.The reason this isn't a meaningful capacity cut:
run_jobspends nearly all its time waiting on NATS results to come back, not burning CPU. The 16 was originally raised (#1228) forml_resultsandantenna, which are DB/Redis-bound and benefit from oversubscription. Thejobsqueue inherited the high number incidentally.What the three fixes are actually doing
There are three different head-of-line problems, with three different mechanisms:
-O fair.acks_lateis orthogonal to all three — it's about surviving worker death, not about scheduling. But it's a precondition for both the cancel fix to be safe (cancellation can now terminate a worker without losing the redelivered message that the early-guard will short-circuit) and for the deferred concurrency change to be safe (smaller pools mean each child handles more tasks, and a single SIGKILL hurts more).What's verified
Job.cancelfor ASYNC_API / SYNC_API / no-task-id paths.run_jobearly-guard coveringREVOKED,CANCELING,SUCCESS, and the contract pair (PENDINGstill runs).ami.jobs.tests(118 tests) andami.ml.tests+ami.ml.orchestration.tests(81 tests) green.What still needs verifying in staging/prod
From the issue's "what we still need to verify" section, two of three are now testable:
-O fairworker — confirmRESERVED → ACTIVEhappens immediately when a sibling child is idle. I'd want to do this on dev box (queue two longrun_jobs back-to-back, sleep one).-O fairinteraction withmax-tasks-per-child=100— should be fine but worth watching for the first day after deploy.acks_lateredelivery in practice — the early-guard makes a redeliveredrun_joba no-op when the job is already settled, so this is covered by the tests, but worth eyeballing the celery logs after deploy for unexpectedSkipping run_jobmessages.The cancel fix is the one I'd most like a second pair of eyes on — the docstring is the long version, but the gist is "for ASYNC_API the celery task isn't where the work is, so terminating it doesn't cancel; cleanup does."
Related
filter_processed_imagesslowness, the upstream cause of the longrun_jobthat exposed this. Fixing Preparing jobs takes too long preparing large collections #1321 reduces the frequency of stuck jobs; this PR reduces the blast radius when they happen.CELERY_WORKER_CONCURRENCYto 16 for ml/antenna queues. Context for why the jobs queue inherited the same number.Co-Authored-By: Claude noreply@anthropic.com
Summary by CodeRabbit
Bug Fixes
Chores