
fix(jobs): prevent jobs from hanging in STARTED state with no progress#1234

Merged
mihow merged 4 commits into main from fix/async-job-ack-srem-ordering
Apr 15, 2026

Conversation

@mihow
Collaborator

@mihow mihow commented Apr 15, 2026

What this fixes, in plain language

Each NATS message represents one image being processed. The result handler does three things that need to be durable: (1) save the detections to the database, (2) tell Redis "this image is done" by removing it from the pending set, and (3) tell NATS "this message is done, don't redeliver it" (ACK).

Before this PR the order was 1 → 3 → 2 — ACK happened before the Redis update. If the worker process died in the tiny window between the ACK and the Redis write, NATS thought the work was complete (no redelivery, ever) but Redis still listed the image as pending. The job's results stage could never reach 100%, so the job sat in STARTED forever and no code path noticed.

The fix is the obvious one: do them in 1 → 2 → 3, with the ACK last. If anything between save_results and ACK crashes, NATS will notice no ACK arrived within ack_wait (30s) and redeliver. The redelivered worker re-runs everything safely — save_results deduplicates by (detection, source_image), the Redis SREM is a no-op for already-removed ids (we count "0 newly removed" and skip the counter accumulation so detections/captures don't double), and the progress percentage is clamped by max() so it can't go backwards.
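The ordering can be sketched as a runnable toy. All names here (FakeMsg, FakeRedisSet, handle_result) are hypothetical stand-ins for the real process_nats_pipeline_result path, not the project's API:

```python
# Toy model of the corrected 1 -> 2 -> 3 ordering (hypothetical names).

class FakeMsg:
    """Stands in for a NATS JetStream message."""
    def __init__(self):
        self.acked = False

    def ack(self):
        self.acked = True


class FakeRedisSet:
    """Stands in for the pending_images:results Redis set."""
    def __init__(self, ids):
        self.ids = set(ids)

    def srem(self, *ids):
        # Like Redis SREM, returns how many members were actually removed,
        # so a replay of already-removed ids yields 0.
        removed = len(self.ids & set(ids))
        self.ids -= set(ids)
        return removed


def handle_result(msg, pending, image_ids, save_results):
    save_results(image_ids)                    # 1. durable DB write (idempotent)
    newly_removed = pending.srem(*image_ids)   # 2. durable Redis write
    msg.ack()                                  # 3. ACK last: a crash above means
    return newly_removed                       #    no ACK, so NATS redelivers


saved = set()
msg, pending = FakeMsg(), FakeRedisSet({"img-1", "img-2"})
first = handle_result(msg, pending, ["img-1"], saved.update)
replay = handle_result(FakeMsg(), pending, ["img-1"], saved.update)
print(first, replay)  # 1 0; the replay is detectable via SREM's return value
```

A crash injected between steps 2 and 3 now leaves the message un-ACKed, which is exactly the window the chaos-scenarios runbook exercises.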

The Bug C guard is the same idea for the outer Celery task: if run_job raises after queueing images to NATS, the task_failure signal previously marked the job FAILURE and tore down the NATS stream + Redis state — killing in-flight processing. The guard now defers FAILURE for ASYNC_API jobs that aren't progress.is_complete(), letting the result handler own the terminal state.


Summary

Resolves several causes of dispatch_mode=async_api ML jobs landing in unrecoverable states. Built off observed production incidents where jobs sat in STARTED forever or transitioned to FAILURE within seconds despite valid in-flight processing.

Bug A — stranded STARTED (the main symptom): process_nats_pipeline_result acked NATS before the results-stage Redis SREM. A worker death between those two writes drained the NATS message permanently while Redis kept the image id in pending_images:results. Job stayed at partial progress; nothing ever reconciled it. ACK now runs last, after save_results, the results-stage SREM, and _update_job_progress are all durable. On crash, NATS redelivers; the path re-runs idempotently (save_results dedupes, SREM is a no-op for already-removed ids, percentage is clamped by max()).

Counter-inflation on replay (antenna#1232 territory): _update_job_progress("results") reads existing detection/classification/capture counts from Job.progress and adds new ones, so a NATS redeliver or Celery retry that lands past the first SREM would double-count. AsyncJobStateManager.update_state now returns the SREM integer result via JobStateProgress.newly_removed; the caller passes 0 counts when newly_removed == 0.
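The gating contract can be sketched like this (only the field name newly_removed comes from the PR; JobStateProgress here is a minimal stand-in, not the real class):

```python
# Sketch of the replay-safe counter gate (stand-in types, hypothetical helper).
from dataclasses import dataclass


@dataclass
class JobStateProgress:
    newly_removed: int  # SREM return value propagated by update_state


def counts_to_apply(progress, detections, classifications, captures):
    """Return zeros on replay so the read-add-write accumulator in
    Job.progress cannot double-count redelivered results."""
    if progress.newly_removed > 0:
        return detections, classifications, captures
    return 0, 0, 0


first = counts_to_apply(JobStateProgress(newly_removed=1), 5, 12, 1)
replay = counts_to_apply(JobStateProgress(newly_removed=0), 5, 12, 1)
print(first, replay)  # (5, 12, 1) (0, 0, 0)
```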

Bug C — task_failure collapsing in-flight jobs: the task_failure signal unconditionally marked ASYNC_API jobs FAILURE and tore down NATS/Redis state, even when results were still arriving. Added a guard mirroring the SUCCESS guard in task_postrun: defer terminal state to the async progress handler when dispatch_mode==ASYNC_API and progress.is_complete() is False.

Reduce NATS max_deliver 5 → 2: a processing service that consistently returns bad data (e.g. a result referencing an algorithm not declared in the pipeline) burns ADC + worker time on every retry. One retry covers a transient blip; more is waste. The value is hoisted to a NATS_MAX_DELIVER module constant configurable via settings.NATS_MAX_DELIVER.
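A minimal sketch of the hoisted constant, assuming a Django-style settings object (_Settings below is a stand-in for django.conf.settings; only the NATS_MAX_DELIVER name comes from the PR):

```python
# Sketch of a settings-overridable module constant (stand-in settings object).

class _Settings:
    NATS_MAX_DELIVER = 2  # per-environment override would live in settings


settings = _Settings()

# One original delivery plus one retry by default; flaky-network deployments
# can raise this per environment without a code change.
NATS_MAX_DELIVER = getattr(settings, "NATS_MAX_DELIVER", 2)
print(NATS_MAX_DELIVER)  # 2
```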

Diagnostic logging in _update_job_progress: a warning when max() lifts the passed percentage to 1.0 from a partial value (catching state-race symptoms that were previously invisible), plus a structured log line at cleanup time showing which stages satisfied is_complete(). These are how we'll spot the next class of premature-cleanup bug.
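The clamp-plus-diagnostic can be sketched as follows (clamp_stage_percentage is a hypothetical helper; the real logic lives inline in _update_job_progress):

```python
# Sketch of the max() clamp with the new diagnostic (hypothetical helper).
import logging

logger = logging.getLogger("job")


def clamp_stage_percentage(existing, passed):
    """Progress never goes backwards. Warn when the max() clamp lifts a
    partial value all the way to 1.0: previously an invisible symptom of
    a state race or premature cleanup."""
    result = max(existing, passed)
    if result == 1.0 and passed < 1.0:
        logger.warning(
            "stage percentage lifted to 1.0 from partial value %.2f by max(); "
            "possible concurrent completion or premature-cleanup race",
            passed,
        )
    return result


print(clamp_stage_percentage(0.4, 0.6))  # 0.6, normal forward progress
print(clamp_stage_percentage(1.0, 0.6))  # 1.0, clamped, warning logged
```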

Docs:

  • docs/claude/processing-lifecycle.md — single-source-of-truth lifecycle reference for ASYNC_API jobs (happy path, invariants, failure modes, call-site map, full per-job trace block)
  • docs/claude/debugging/chaos-scenarios.md — manual fault-injection runbook (sentinel-file Redis fault, FLUSHDB, ACK/SREM crash window, max-retries) for verifying the retry path against a live stack

Operator notes

Visibility trade-off for the Bug C guard. Before this PR, any run_job exception (including genuine terminal bugs raised after queuing) produced an immediate FAILURE row with the traceback on Job.logger.error. After this PR, those exceptions become a single job.logger.warning line with the phrase "run_job raised but async processing is in-flight; deferring FAILURE to async progress handler" and the job row stays STARTED. A true terminal failure now only surfaces as FAILURE when the stale-job reaper picks it up — the companion PR reducing that cutoff from 72h to the Job.STALLED_JOBS_MAX_MINUTES window (5–10 min) closes this gap. If you run log-based alerting, consider a warn-level alert on the "deferring FAILURE" phrase to catch regressions even faster.

max_deliver: 5 → 2 is a behavioural change for flaky networks. The new default is right for deployments where retries are mostly burning time on persistent bad-data; deployments with high packet loss or unreliable processing-service latency may want to raise it. Override via settings.NATS_MAX_DELIVER (per environment) without a code change.

Test plan

  • Existing tests pass (docker compose -f docker-compose.ci.yml run --rm django python manage.py test ami.jobs.tests.test_tasks ami.ml.tests --keepdb → 61 tests OK)
  • New regression tests:
    • test_ack_deferred_until_after_results_stage_srem — patches update_state to fail on results stage; asserts ACK is not called.
    • test_results_counter_does_not_inflate_on_replay — double-delivers the same result; asserts captures stays at 1.
    • test_task_failure_guard_defers_for_async_api_in_flight — calls update_job_failure for an ASYNC_API job with Redis state initialised; asserts the job stays STARTED, cleanup is NOT called, Redis pending set survives, and the deferral warning is logged.
    • test_task_failure_marks_sync_api_job_failure_and_cleans_up — the contract pair: SYNC_API jobs still take the terminal FAILURE + cleanup path.
  • Live e2e against local stack: 56-image collection through pipeline mothbot_insect_orders_2025. Confirmed no jobs strand in STARTED. Known-bad results path (algorithm mismatch in processing service) exercises the broad-except + redeliver flow as expected.
  • Live verification of the diagnostic logs catching a Bug-B-class state race (followup work in the integration branch — not required for this PR's correctness)

Summary by CodeRabbit

  • New Features

    • Configurable NATS message retry limits (defaults to 2 attempts).
  • Bug Fixes

    • Improved async job reliability with better acknowledgment handling.
    • Prevented duplicate result counters on message replays.
    • Enhanced transient error recovery in async workflows.
  • Documentation

    • Added async job processing lifecycle guide.
    • Added debugging and chaos testing runbook.

…in-flight async jobs

Resolves two related causes of async_api jobs landing in unrecoverable states:

* Bug A (stranded STARTED): process_nats_pipeline_result acked NATS before
  the results-stage Redis SREM. A worker crash between those two writes
  drained the message from NATS (no redelivery) while Redis kept the image
  id in pending_images:results, leaving the job at partial progress with
  no path to completion. ACK now happens last, after save_results, the
  results-stage SREM, and _update_job_progress are all durable.

* Counter-inflation on replay: AsyncJobStateManager.update_state now returns
  the SREM integer result via JobStateProgress.newly_removed, and the result
  handler gates detections/classifications/captures accumulation on it being
  > 0. Replays (NATS redeliver or Celery retry) pass zeros so counters stay
  idempotent.

* Bug C (task_failure collapsing in-flight jobs): the task_failure signal
  unconditionally marked ASYNC_API jobs FAILURE and tore down NATS/Redis
  state, even when images were still being processed. Added a guard
  mirroring the SUCCESS guard in task_postrun: defer terminal state to the
  async progress handler when dispatch_mode==ASYNC_API and progress is not
  complete.

Also reduces NATS max_deliver from 5 to 2 (one retry covers a transient
blip; more burns ADC + worker time on consistently-bad results) and hoists
it to a top-of-file NATS_MAX_DELIVER constant configurable via settings.

Adds diagnostic logging in _update_job_progress: warns when max() lifts
the passed percentage to 1.0 (catches state-race symptoms otherwise
invisible) and logs which stages satisfied is_complete() at cleanup time.

Tests:
- test_ack_deferred_until_after_results_stage_srem: patches update_state
  to fail on results stage; asserts ACK is not called.
- test_results_counter_does_not_inflate_on_replay: double-delivers the
  same result; asserts captures stays at 1.

Co-Authored-By: Claude <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings April 15, 2026 18:15

@coderabbitai
Contributor

coderabbitai bot commented Apr 15, 2026


ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: e95e846b-68b5-4c59-9652-979f6d8dc867

📥 Commits

Reviewing files that changed from the base of the PR and between 476a710 and 7f3b0e1.

📒 Files selected for processing (6)
  • ami/jobs/tasks.py
  • ami/jobs/tests/test_tasks.py
  • ami/ml/orchestration/nats_queue.py
  • ami/ml/orchestration/tests/test_cleanup.py
  • docs/claude/debugging/chaos-scenarios.md
  • docs/claude/processing-lifecycle.md
📝 Walkthrough

Walkthrough

This PR improves async job processing reliability by deferring NATS message acknowledgment until after Redis state updates complete, adding idempotency guards to prevent progress counter inflation on replays, introducing tracking of removed items from Redis sets, making JetStream delivery limits configurable, and implementing failure-state deferral for incomplete ASYNC_API jobs. Includes comprehensive documentation and new test coverage.

Changes

Cohort / File(s) Summary
Core Async Job Logic
ami/jobs/tasks.py, ami/ml/orchestration/async_job_state.py, ami/ml/orchestration/nats_queue.py
Defers NATS ACK until results-stage Redis SREM and progress updates complete; adds newly_removed field to track SREM return values; guards progress counter accumulation with idempotency check (only increment when newly_removed > 0); defers terminal FAILURE for incomplete ASYNC_API jobs; makes NATS delivery attempt cap configurable via NATS_MAX_DELIVER setting.
Test Coverage
ami/jobs/tests/test_tasks.py
Adds two new test cases: one validating that ACK is not called when results-stage SREM fails, another verifying counters do not double-count on repeated message delivery (replay scenarios).
Documentation
docs/claude/debugging/chaos-scenarios.md, docs/claude/processing-lifecycle.md
New runbook for live fault-injection testing of async job execution with five ordered scenarios covering happy path, transient errors, Redis state loss, ACK/SREM ordering crashes, and exhausted retries; comprehensive lifecycle documentation mapping dispatch through cleanup with state invariants, failure modes, and diagnostic workflows.

Sequence Diagram

sequenceDiagram
    participant NATS as NATS JetStream
    participant Task as process_nats_<br/>pipeline_result
    participant StateMan as AsyncJobStateManager
    participant Redis as Redis
    participant Fail as update_job_<br/>failure
    
    NATS->>Task: Deliver PipelineResultsResponse
    Task->>StateMan: update_state(...,<br/>stage="results",<br/>processed_image_ids=...)
    activate StateMan
    StateMan->>Redis: SREM (remove pending ids)
    Redis-->>StateMan: newly_removed count
    StateMan->>Redis: Update progress fields
    StateMan-->>Task: Return JobStateProgress<br/>(with newly_removed)
    deactivate StateMan
    
    Task->>Task: _update_job_progress<br/>(results stage)
    activate Task
    Task->>Task: Check if newly_removed > 0<br/>(idempotency guard)
    alt Replay (newly_removed == 0)
        Task->>Task: Update % & status only,<br/>counters = 0
    else First delivery (newly_removed > 0)
        Task->>Task: Accumulate counters,<br/>update % & status
    end
    deactivate Task
    
    Task->>NATS: ACK message<br/>(deferred until after SREM)
    
    alt ASYNC_API & not is_complete()
        Task->>Fail: update_job_failure(...)
        Fail->>Fail: Defer FAILURE,<br/>return early
    else Sync or Complete
        Fail->>Fail: Set FAILURE<br/>and cleanup
    end

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

Poem

🐰 Hops to SREM, then ACKs the queue,
Idempotent counters—no double-count brew!
Replays bounce gently, state stays just right,
Async jobs flourish in chaos-proof night!

🚦 Pre-merge checks | ✅ 1 | ❌ 2

❌ Failed checks (1 warning, 1 inconclusive)

  • Docstring Coverage — ⚠️ Warning: docstring coverage is 72.73%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.
  • Title check — ❓ Inconclusive: the title is vague and overly generic; it says "prevent jobs from hanging" without naming the root cause (ACK/SREM ordering) or the main technical fix. Resolution: consider a more specific title like "fix(jobs): ACK NATS after results-stage SREM and defer task_failure for in-flight ASYNC_API jobs".

✅ Passed checks (1 passed)

  • Description check — ✅ Passed: the PR description is comprehensive and well-structured, covering all major template sections including summary, detailed description, testing approach, and deployment notes.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


@mihow mihow changed the title fix(jobs): ack NATS after results-stage SREM; defer task_failure for in-flight async jobs fix(jobs): prevent jobs from hanging in STARTED state with no progress Apr 15, 2026
Contributor

Copilot AI left a comment


Pull request overview

Fixes reliability issues for dispatch_mode=ASYNC_API job processing so NATS redelivery remains possible until Redis/DB progress is durably recorded, and prevents premature terminal failure for in-flight async jobs.

Changes:

  • Reorders process_nats_pipeline_result to ACK NATS only after results-stage Redis state + job progress are updated; adds replay-safe counter gating.
  • Adds an ASYNC_API guard to task_failure to avoid collapsing jobs to FAILURE while async processing is still in-flight.
  • Introduces configurable NATS_MAX_DELIVER, adds diagnostics, and adds regression tests + lifecycle/runbook docs.

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 8 comments.

Show a summary per file
File Description
docs/claude/processing-lifecycle.md New lifecycle doc describing ASYNC_API invariants/flows and debugging reference.
docs/claude/debugging/chaos-scenarios.md New runbook for manual fault-injection to validate retry/redelivery behavior.
ami/ml/orchestration/nats_queue.py Makes max_deliver configurable via NATS_MAX_DELIVER constant.
ami/ml/orchestration/async_job_state.py Adds newly_removed to propagate SREM result for replay/idempotency gating.
ami/jobs/tests/test_tasks.py Adds regression tests for ACK ordering and replay counter inflation.
ami/jobs/tasks.py Defers ACK until after results-stage SREM/progress update; gates counters on newly_removed; adds diagnostics; guards task_failure for ASYNC_API in-flight jobs.


Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

🧹 Nitpick comments (3)
ami/jobs/tasks.py (1)

382-391: Placeholder issue number in diagnostic comment.

Line 385 references antenna#???? — consider filling in the actual issue number or removing the placeholder before merge.

Suggested fix
-        # Diagnostic: when max() lifts the percentage to 1.0 from a partial value
-        # this worker computed, surface it. A legitimate jump means another
-        # worker concurrently completed the stage; an unexpected jump (e.g. the
-        # premature-cleanup pattern from antenna#????) is otherwise invisible.
+        # Diagnostic: when max() lifts the percentage to 1.0 from a partial value
+        # this worker computed, surface it. A legitimate jump means another
+        # worker concurrently completed the stage; an unexpected jump (e.g. the
+        # premature-cleanup pattern) is otherwise invisible.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ami/jobs/tasks.py` around lines 382 - 391, The diagnostic comment contains a
placeholder "antenna#????"—either replace it with the correct issue/PR number or
remove the placeholder entirely; locate the block around variables
existing_progress, progress_percentage, passed_progress and the
job.logger.warning call in the function that logs stage progress (the comment
above the if-statement referencing max() guard) and update the comment text to
either the real issue reference (e.g., antenna#1234) or remove the
"antenna#????" fragment so no placeholder remains in the message.
docs/claude/debugging/chaos-scenarios.md (1)

26-29: Add language specifier to fenced code blocks.

Multiple code blocks in this document lack language specifiers (lines 26, 70, 75). Adding bash or shell improves rendering and satisfies linting.

Example fix for line 26
-```
+```bash
 docker compose exec django python manage.py chaos_monkey flush redis
 docker compose exec django python manage.py chaos_monkey flush nats
🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed.

In docs/claude/debugging/chaos-scenarios.md around lines 26-29: the fenced code
blocks containing shell commands (e.g., the block with "docker compose exec
django python manage.py chaos_monkey flush redis" and "docker compose exec
django python manage.py chaos_monkey flush nats", plus the other blocks noted
in the review) are missing a language specifier; update each triple-backtick
fence to open with bash (or shell) so the blocks render and satisfy linting.

docs/claude/processing-lifecycle.md (1)

13-13: Add language specifier to fenced code block.

The diagram code block should specify a language for proper rendering and to satisfy linting.

Suggested fix
-```
+```text
 [user] POST /api/v2/jobs/{id}/run?start_now=true

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed.

In docs/claude/processing-lifecycle.md at line 13: the opening fence of the
diagram code block is missing a language specifier; change ``` to ```text so
the block renders and lints correctly — look for the code block containing
"[user] POST /api/v2/jobs/{id}/run?start_now=true" and add the language tag
to the opening fence.

🤖 Prompt for all review comments with AI agents

Verify each finding against the current code and only fix it if needed.

Inline comments:
In @docs/claude/processing-lifecycle.md:

  • Line 73: The docs line references the wrong field name: replace the typo
    newly_processed with the actual field newly_removed used in the code (see
    the newly_removed variable in async job state) so the description of
    _update_job_progress's counter-accumulator on the results stage correctly
    matches the implementation and avoids confusion about the SREM behavior.
  • Line 96: Update the doc sentence that claims update_job_failure always calls
    cleanup_async_job_if_needed: change it to state that update_job_failure now
    guards cleanup for ASYNC_API runs by returning early when progress.is_complete()
    is false (see the check in update_job_failure), so cleanup_async_job_if_needed
    is not invoked for in-flight async jobs; reference the functions
    update_job_failure and cleanup_async_job_if_needed and the
    progress.is_complete() guard when making the wording change.
  • Line 83: Update the docs/claude/processing-lifecycle.md row for "Bug C" to
    reflect that the guard has been implemented: replace the "Separate issue. Add
    not job.progress.is_complete() and dispatch_mode==ASYNC_API guard" TODO with a
    statement that the task_failure signal now includes the not
    job.progress.is_complete() and dispatch_mode==ASYNC_API guard (matching
    task_postrun), and mark the issue as fixed/implemented in the table.

Nitpick comments:
In @ami/jobs/tasks.py:

  • Around lines 382-391: The diagnostic comment contains a placeholder
    "antenna#????" — either replace it with the correct issue/PR number or remove
    the placeholder entirely; locate the block around variables existing_progress,
    progress_percentage, passed_progress and the job.logger.warning call in the
    function that logs stage progress (the comment above the if-statement
    referencing the max() guard) and update the comment text to either the real
    issue reference (e.g., antenna#1234) or remove the "antenna#????" fragment so
    no placeholder remains in the message.

In @docs/claude/debugging/chaos-scenarios.md:

  • Around lines 26-29: The fenced code blocks containing shell commands (e.g.,
    the block with "docker compose exec django python manage.py chaos_monkey
    flush redis" and "docker compose exec django python manage.py chaos_monkey
    flush nats", plus the other blocks noted in the review) are missing a
    language specifier; update each triple-backtick fence to open with bash
    (or shell) so the blocks render and satisfy linting, ensuring you change the
    fences surrounding those exact command blocks and any other code fences
    called out in the review.

In @docs/claude/processing-lifecycle.md:

  • Line 13: The fenced code block is missing a language specifier; update the
    opening fence to include a language (e.g., change ``` to ```text) so the
    diagram block renders and lints correctly — look for the code block
    containing "[user] POST /api/v2/jobs/{id}/run?start_now=true" and add the
    language tag to the opening fence.

🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

---

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: a2286aa1-ead8-43be-adfb-fb9f4fe8c24b

📥 Commits

Reviewing files that changed from the base of the PR and between 4686340c8dd6cc14ae014c84c13ae10c8e8e5551 and 476a71049104dcb02f33ecc063e9c4964acd451c.

📒 Files selected for processing (6)
  • ami/jobs/tasks.py
  • ami/jobs/tests/test_tasks.py
  • ami/ml/orchestration/async_job_state.py
  • ami/ml/orchestration/nats_queue.py
  • docs/claude/debugging/chaos-scenarios.md
  • docs/claude/processing-lifecycle.md


… code

Copilot and CodeRabbit flagged that processing-lifecycle.md still described
pre-Fix-1 state: flow diagram showed ACK before the results-stage SREM,
invariants table referenced a nonexistent 'newly_processed' gate, Bug C row
said "not in scope" despite the guard being in this PR, and update_job_failure
was described as "always" calling cleanup. Also reworded chaos-scenarios.md
Scenario D so the prose matches the crash-injection point in the procedure.

Also:
- Remove 'antenna#????' placeholder from the max()-lift diagnostic comment in
  _update_job_progress; point at the lifecycle doc's "Bug B" section instead.
- Reword the NATS_MAX_DELIVER comment in nats_queue.py so it does not read as
  a TODO when the settings override is already wired up on the next line.

Co-Authored-By: Claude <noreply@anthropic.com>
@mihow
Collaborator Author

mihow commented Apr 15, 2026

Claude says: Takeaway-review pass (run before the automated line-by-line). Addressed the 11 doc/comment nits in 976f37b. Structural observations that are harder to see in the diff:

  1. Test coverage is asymmetric. Bug A has test_ack_deferred_until_after_results_stage_srem, the counter replay has test_results_counter_does_not_inflate_on_replay, but the Bug C guard in update_job_failure (the biggest behavioural change in this PR — a run_job crash now doesn't flip the job to FAILURE for ASYNC_API) has no dedicated regression test. Worth adding one before merge: dispatch an ASYNC_API job, raise in run_job post-queue, assert the job stays STARTED and the NATS/Redis state is intact.

  2. Operator-visible trade-off worth flagging in the PR body. Pre-PR, a run_job exception after queuing gave a FAILURE row + traceback immediately. Post-PR, it's a warning log line and STARTED until the 15-min stale-job reaper picks it up. Correct for the stuck-job problem, but the surface-level visibility of the crash is reduced. Ops may want to alert on the new warning message.

  3. max_deliver: 5 → 2 is a behavioral regression for flaky networks, even if it's correct for persistent-bad-data cases. The settings.NATS_MAX_DELIVER override gives ops an escape hatch — worth a PR-body line saying so explicitly so deployments with high packet loss know to bump it back up.

  4. Minor style: tasks.py:228-247 — the two _update_job_progress('results', ...) branches differ only in whether counts are passed or 0s. Could collapse to one call with counts_to_apply = (detections_count, classifications_count, captures_count) if progress_info.newly_removed > 0 else (0, 0, 0) and a single call. Not blocking.

None of these block merge — (1) is the one I'd most recommend adding before landing.

…age progress call

Two small follow-ups surfaced in PR review:

- Bug C (the task_failure guard added in 476a710) had no dedicated
  regression test. Add two: one asserts the ASYNC_API in-flight path
  keeps the job STARTED, preserves Redis state, and emits the expected
  deferral warning; a companion asserts SYNC_API still takes the
  terminal FAILURE + cleanup path. Tests invoke the handler body
  directly — Celery signal wiring isn't the subject.

- Collapse the two _update_job_progress("results", ...) branches in
  process_nats_pipeline_result into one call driven by a counts tuple
  that picks between real counts and zeros based on newly_removed. No
  behaviour change; easier to read. The inline comment still explains
  the idempotency contract.

Co-Authored-By: Claude <noreply@anthropic.com>
…sertion

The Bug C guard in update_job_failure defers FAILURE for ASYNC_API jobs that
are still in-flight, so test_cleanup_on_job_failure's pre-existing setup
(ASYNC_API + queued images + non-complete progress) now correctly takes the
deferred path and cleanup does not run. Drive all stages to complete before
firing task_failure so the guard falls through to the terminal cleanup path
this test is meant to cover. Deferred-path behaviour is already covered by
TestTaskFailureGuard in ami/jobs/tests/test_tasks.py.

Co-Authored-By: Claude <noreply@anthropic.com>
@mihow mihow merged commit 9ea9149 into main Apr 15, 2026
7 checks passed
@mihow mihow deleted the fix/async-job-ack-srem-ordering branch April 15, 2026 20:04
mihow added a commit that referenced this pull request Apr 17, 2026
When NATS exhausts max_deliver=2 (per #1234) for a message whose
processing kept failing non-retriably, JetStream stops redelivering but
keeps the message in the consumer's num_ack_pending indefinitely. Redis
still tracks the image id in pending_images:{process,results}. The job
sits at <100% progress until check_stale_jobs REVOKEs it at 10 min,
wiping all the legitimately processed work alongside the lost images.

Adds mark_lost_images_failed() — a new sub-check wired into
jobs_health_check ahead of check_stale_jobs. For each running async_api
job whose updated_at is older than STALLED_JOBS_MAX_MINUTES and whose
Redis pending sets still hold ids, SREMs those ids from pending, SADDs
them to failed_images, and runs _update_job_progress for both stages.
The existing completion logic (FAILURE_THRESHOLD = 0.5) then decides
SUCCESS vs FAILURE on accurate per-image counts instead of REVOKE-ing
the whole job.

Design decisions:

* Idle-threshold signal is Job.updated_at, not NATS consumer counters.
  JetStream does not clear num_ack_pending when max_deliver is hit —
  guarding on num_ack_pending > 0 would block the exact production case
  this is meant to reconcile. Verified empirically with chaos_monkey
  exhaust_max_deliver (next commit) against a live NATS JetStream.
* get_consumer_state() is still called per candidate, but only to log
  current counters in the diagnostic appended to progress.errors. The
  reconcile decision itself does not read them.
* Runs BEFORE _run_stale_jobs_check so a reconcilable job lands in its
  natural completion state instead of being REVOKEd.
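
A minimal sketch of the per-job reconcile step, assuming an illustrative Redis
key layout and any Redis-like client ``r``; the real
``mark_lost_images_failed`` also re-validates the job row under lock and runs
``_update_job_progress``, which is omitted here:

```python
def reconcile_lost_images(r, job_id: int, stages=("process", "results")) -> set[str]:
    """Move ids still pending after the idle cutoff into the failed set.

    Returns the set of image ids that were reconciled.
    """
    moved: set[str] = set()
    for stage in stages:
        pending_key = f"job:{job_id}:pending_images:{stage}"
        failed_key = f"job:{job_id}:failed_images:{stage}"
        ids = r.smembers(pending_key)
        if not ids:
            continue
        # SREM then SADD: ids leave "pending" and land in "failed",
        # so per-image completion math stays accurate.
        r.srem(pending_key, *ids)
        r.sadd(failed_key, *ids)
        moved |= set(ids)
    return moved
```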

Tests:

* 8 TransactionTestCase cases in TestMarkLostImagesFailed covering the
  main job-2421 shape, the mixed-failure path, the >50% FAILURE
  threshold, the idle-threshold guard, and two cases (num_pending > 0,
  num_ack_pending > 0) that now reconcile because the idle cutoff is
  the load-bearing signal — previously (in earlier revisions of this
  branch) those were guarded noops.

Co-Authored-By: Claude <noreply@anthropic.com>
mihow added a commit that referenced this pull request Apr 17, 2026
… + chaos runbook

* processing-lifecycle.md — add a new failure-mode row describing the
  symptom (job stuck at <100% indefinitely, some images still in Redis
  pending sets, NATS consumer shows num_redelivered > 0 and
  num_ack_pending stays elevated) and point at mark_lost_images_failed
  as the fix. Distinct from Bug A (pre-#1234 ACK/SREM ordering) — that
  was a crash-window race; this one is max_deliver giving up.
* chaos-scenarios.md — add Scenario F: a step-by-step runbook for
  validating the reconciler against a live stack using the new
  chaos_monkey exhaust_max_deliver subcommand. No code patches or
  celeryworker restart required, unlike Scenarios B/D, which rely on
  sentinel-file code patches.

Co-Authored-By: Claude <noreply@anthropic.com>
mihow added a commit that referenced this pull request Apr 17, 2026
…complete (#1244)

* feat(ml): add ConsumerState + get_consumer_state + get_pending_image_ids

Two small primitives the upcoming mark_lost_images_failed reconciler
(next commit) needs to inspect running async_api jobs.

TaskQueueManager.get_consumer_state(job_id) → ConsumerState | None

  Thin, mockable projection over nats.js.api.ConsumerInfo. Returns only
  num_pending / num_ack_pending / num_redelivered — the counters that
  matter for diagnostic logging. A missing stream/consumer returns None
  rather than raising, mirroring log_consumer_stats_snapshot's
  tolerance: a job that was already cleaned up should not blow up a
  caller iterating over a batch.

AsyncJobStateManager.get_pending_image_ids() → set[str]

  SUNION of both stage pending sets. Used by the reconciler to find ids
  that Redis still tracks as pending-for-processing. Returns empty set
  on RedisError so one pooled-connection blip does not abort a sweep.
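
  As a rough sketch, the two primitives might look like this (the Redis key
  layout and the duck-typed ``r`` client are assumptions; only the three
  counters named above are carried):

  ```python
  from dataclasses import dataclass

  @dataclass(frozen=True)
  class ConsumerState:
      # Thin projection of the JetStream ConsumerInfo counters that
      # matter for diagnostic logging.
      num_pending: int
      num_ack_pending: int
      num_redelivered: int

  def get_pending_image_ids(r, job_id: int) -> set[str]:
      """SUNION of both stage pending sets; empty set on any Redis error."""
      keys = [f"job:{job_id}:pending_images:{stage}" for stage in ("process", "results")]
      try:
          return set(r.sunion(*keys))
      except Exception:  # the real code catches redis.RedisError specifically
          return set()
  ```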

No call sites added in this commit. Unit tests for these primitives
come in the reconciler commit, where they are exercised as part of the
reconciliation path.

Co-Authored-By: Claude <noreply@anthropic.com>

* feat(jobs): reconcile async jobs stuck on NATS-lost images

* feat(jobs): chaos_monkey exhaust_max_deliver subcommand

Fault-injection tool for validating mark_lost_images_failed (and any
future reconciler logic) against a live NATS JetStream without needing
to run ADC and a real ML pipeline.

Given a job_id and a set of image ids, publishes one message per id on
the job's subject, then pulls without ACK and waits ack_wait — repeating
NATS_MAX_DELIVER times so each message hits its delivery budget. After
this the consumer sits in (num_pending=0, num_ack_pending=len(ids),
num_redelivered>0), which empirically is the post-exhaustion resting
state (JetStream does not clear num_ack_pending for messages that hit
max_deliver).
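
A hedged sketch of the exhaustion loop and its expected duration, assuming
nats-py's ``pull_subscribe``/``fetch`` API; the function names, parameters,
and the 3-second grace constant are illustrative, not the actual subcommand:

```python
import asyncio

def expected_exhaustion_seconds(max_deliver: int, ack_wait_s: int, grace_s: int = 3) -> int:
    # One pull-without-ACK pass per delivery attempt, each waiting out
    # ack_wait plus a small grace period before the next redelivery.
    return max_deliver * (ack_wait_s + grace_s)

async def exhaust_max_deliver(js, subject: str, durable: str, image_ids, max_deliver: int, ack_wait_s: int):
    """Drive every message on `subject` to its delivery budget.

    `js` is assumed to be a nats-py JetStream context.
    """
    for image_id in image_ids:
        await js.publish(subject, image_id.encode())
    sub = await js.pull_subscribe(subject, durable=durable)
    for _ in range(max_deliver):
        # Pull the batch but never ACK, then wait out ack_wait so
        # JetStream records a delivery attempt and redelivers -- or, on
        # the final pass, gives up and parks the messages in
        # num_ack_pending.
        await sub.fetch(len(image_ids), timeout=ack_wait_s)
        await asyncio.sleep(ack_wait_s + 3)
```

With the defaults described below (NATS_MAX_DELIVER=2, TASK_TTR=30s) the
helper predicts 2 × (30 + 3) = 66 seconds.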

Complements the existing 'flush redis' / 'flush nats' subcommands.
Takes ~NATS_MAX_DELIVER × (TASK_TTR + 3s) to run (default ≈ 66s).

Run either against a real dispatched job, or with --ensure-stream to
create the stream+consumer itself (useful for standalone reconciler
tests against a fake job_id).

Usage:

    python manage.py chaos_monkey exhaust_max_deliver \
        --job-id 999999 \
        --image-ids img-a,img-b,img-c \
        --ensure-stream

Co-Authored-By: Claude <noreply@anthropic.com>

* docs(jobs): document max_deliver exhaustion + reconciler in lifecycle + chaos runbook

* chore(jobs): drop unused logger import in chaos_monkey

The module defined ``logger = logging.getLogger(__name__)`` but never used it
(all output goes through ``self.stdout.write``), leaving a dead assignment
whose ``import logging`` would go unused; flake8 (F401) would flag it on a
future lint pass. Spotted in Copilot review of #1244.

Co-Authored-By: Claude <noreply@anthropic.com>

* docs(jobs): replace PR #TODO with PR #1244 in lifecycle doc

The new failure-mode row was committed with a placeholder PR reference;
replace it now that #1244 is open.

Co-Authored-By: Claude <noreply@anthropic.com>

* fix(jobs): tighten lost-images reconciler — race re-validation, ids cap

Three Copilot review findings on #1244 rolled up:

- Re-validate inside ``select_for_update`` at the top of
  ``_reconcile_lost_images`` before issuing any Redis SREM/SADD. Closes the
  window where a late ``process_nats_pipeline_result`` arrives between
  ``mark_lost_images_failed``'s candidate selection and reconciliation,
  bumps ``updated_at``, and would otherwise cause counter inflation
  (same image counted as both processed and failed).

- Cap the id list embedded in ``progress.errors`` at 10 entries plus
  "and N more". The full list still goes to ``job.logger.warning`` —
  recoverable from the UI's logs panel without bloating the JSONB field
  surfaced in the job detail payload.

- Fix the ``_run_mark_lost_images_failed_check`` docstring that claimed
  the reconciler skips when ``num_redelivered=0``. It does not — that
  guard was removed when we discovered NATS keeps exhausted messages
  in ``num_ack_pending`` indefinitely; the only skip conditions now are
  consumer missing or no pending ids.

``_reconcile_lost_images`` now returns one of ``"marked_failed"``,
``"raced"``, or ``"state_disappeared"`` so the caller can record the
distinction in the per-tick result dict.
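
The preview cap can be sketched as follows (the constant and function names
are illustrative, not the actual ami code; the real reconciler still logs the
full list via ``job.logger.warning``):

```python
PROGRESS_ERROR_ID_PREVIEW = 10  # illustrative name for the 10-entry cap

def format_lost_ids(ids: list[str], limit: int = PROGRESS_ERROR_ID_PREVIEW) -> str:
    """Cap the id list embedded in progress.errors at `limit` entries."""
    if len(ids) <= limit:
        return ", ".join(ids)
    # Keep the JSONB field small; the remainder is summarized.
    return ", ".join(ids[:limit]) + f" and {len(ids) - limit} more"
```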

Two new tests:

- ``test_reconcile_skips_when_job_updated_at_bumped_after_candidate_select``
  — TDD-verified: fails without the new ``cutoff`` parameter.
- ``test_progress_errors_truncates_long_id_list``
  — TDD-verified: fails without the new preview-limit constant.

Co-Authored-By: Claude <noreply@anthropic.com>

---------

Co-authored-by: Claude <noreply@anthropic.com>