Skip to content

Conversation

@chrisguidry
Copy link
Owner

When a task was blocked via ConcurrencyBlocked (e.g., a redelivery of a task
that's still running), the finally block would still call _release_concurrency_slot
with the same execution.key, removing the slot that the original delivery was
actively using. This allowed other tasks to acquire that slot and run concurrently.

The fix tracks whether we actually acquired a slot (acquired_concurrency_slot)
and only releases in the finally block if we did.

Also cleaned up the Lua script:

  • Removed ZREMRANGEBYSCORE which was expiring slots based on redelivery_timeout
    (causing slots to be evicted while tasks were still running)
  • Changed from worker_id:task_key to just task_key as the member for O(1) ZSCORE
    lookup to detect redelivery of the same task
  • Added slot_timeout logic to block fresh redeliveries but allow takeover of
    stale slots from crashed workers

The test now uses two workers to properly stress cross-worker concurrency limits.

🤖 Generated with Claude Code

When a task was blocked via ConcurrencyBlocked (e.g., a redelivery of a task
that's still running), the finally block would still call _release_concurrency_slot
with the same execution.key, removing the slot that the original delivery was
actively using. This allowed other tasks to acquire that slot and run concurrently.

The fix tracks whether we actually acquired a slot (`acquired_concurrency_slot`)
and only releases in the finally block if we did.

Also cleaned up the Lua script:
- Removed ZREMRANGEBYSCORE which was expiring slots based on redelivery_timeout
  (causing slots to be evicted while tasks were still running)
- Changed from worker_id:task_key to just task_key as the member for O(1 ZSCORE
  lookup to detect redelivery of the same task
- Added slot_timeout logic to block fresh redeliveries but allow takeover of
  stale slots from crashed workers

The test now uses two workers to properly stress cross-worker concurrency limits.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
@claude
Copy link
Contributor

claude bot commented Dec 10, 2025

Claude finished @chrisguidry's task —— View job


Code Review Feedback

✅ Summary

This PR effectively fixes a critical concurrency bug where slots were being incorrectly released when tasks were blocked by concurrency limits. The solution is clean and well-tested.

🔍 Key Changes Analysis

Bug Fix: Concurrency Slot Tracking (src/docket/worker.py)

The core fix adds an acquired_concurrency_slot flag to track whether a slot was actually acquired:

  • Line 654: Initializes acquired_concurrency_slot = False
  • Lines 677-681: Sets flag to True only when slot is successfully acquired
  • Lines 804-808: Only releases slot if flag is True

This prevents the double-release bug where a blocked task (raising ConcurrencyBlocked) would still try to release a slot it never acquired.

Improved Lua Script Logic (src/docket/worker.py:1006-1034)

The updated Lua script is much cleaner:

  • Lines 1013-1024: Smart handling of existing slots - blocks fresh redeliveries but allows takeover of stale slots
  • Line 1040: Uses fixed 5-second buffer for slot_timeout instead of a multiplier approach
  • Simplified member format: Uses just task_key instead of worker_id:task_key for O(1) lookups

✅ Strengths

  1. Root Cause Fix: Directly addresses the slot release bug with minimal code changes
  2. Improved Test Coverage: The multi-worker test (test_concurrency_blocked_task_executes_exactly_once) properly stresses cross-worker concurrency
  3. Cleaner Redis Operations: Simplified Lua script with better performance characteristics
  4. Defensive Programming: The slot timeout mechanism prevents both duplicate execution and stale slot blocking

🎯 Potential Considerations

  1. Hardcoded Timeout Buffer: Line 1040 uses a fixed 5-second buffer. Consider making this configurable if different workloads need different safety margins.

  2. Error Handling: The current implementation silently continues if a slot can't be released (lines 1069-1070). While safe, consider logging this scenario for debugging.

  3. Test Timing Sensitivity: The test uses very short timeouts (50ms) which could be flaky in CI environments. The current values seem reasonable but watch for intermittent failures.

🔒 Security & Performance

  • No security concerns identified - All changes are internal concurrency control
  • Performance improvement - O(1) ZSCORE lookups instead of potential O(n) operations
  • Memory efficiency - Simpler Redis data structure (just task_key as member)

✅ Test Coverage

The test changes are excellent:

  • Properly tests cross-worker concurrency with multiple workers
  • Verifies no task overlap occurs
  • Checks cleanup with pending message verification
  • Maintains 100% coverage requirement

💡 Minor Suggestions

  1. Consider adding a debug log when a slot is blocked due to being "fresh" (line 1019) to help with troubleshooting
  2. The comment on line 1037-1040 is helpful but could be extracted to module-level documentation for better visibility

✅ Verdict

This is a solid fix that addresses the root cause without over-engineering. The improved test coverage and cleaner Lua script are valuable improvements beyond just the bug fix. The changes maintain backward compatibility and don't introduce any new dependencies or breaking changes.

Approved - Ready to merge once CI passes.

@github-actions
Copy link

github-actions bot commented Dec 10, 2025

📚 Documentation has been built for this PR!

You can download the documentation directly here:
https://github.com/chrisguidry/docket/actions/runs/20111798672/artifacts/4829474560

@codecov-commenter
Copy link

codecov-commenter commented Dec 10, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 100.00%. Comparing base (050e53f) to head (3d7833e).

Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff            @@
##              main      #223   +/-   ##
=========================================
  Coverage   100.00%   100.00%           
=========================================
  Files           48        48           
  Lines         7745      7745           
  Branches       403       402    -1     
=========================================
  Hits          7745      7745           
Flag Coverage Δ
python-3.10 100.00% <100.00%> (ø)
python-3.11 98.95% <100.00%> (ø)
python-3.12 100.00% <100.00%> (ø)
python-3.13 100.00% <100.00%> (ø)
python-3.14 100.00% <100.00%> (ø)

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
src/docket/worker.py 100.00% <100.00%> (ø)
tests/concurrency_limits/test_redelivery.py 100.00% <100.00%> (ø)
🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
@chrisguidry chrisguidry merged commit 6fdac41 into main Dec 10, 2025
34 checks passed
@chrisguidry chrisguidry deleted the concurrency-redelivery-flake branch December 10, 2025 20:12
chrisguidry added a commit that referenced this pull request Dec 11, 2025
PR #223 fixed a concurrency slot release bug but removed the ZREMRANGEBYSCORE
that cleaned up stale entries. This could cause memory growth over time if
slots become orphaned (e.g., worker crashes between acquiring and releasing).

The fix adds ZREMRANGEBYSCORE back to the Lua script but with the correct
cutoff: `slot_timeout` (redelivery_timeout + 5) instead of `redelivery_timeout`.
This is safe because any slot older than slot_timeout is either:
1. Orphaned (should have been released) - safe to clean
2. From a task exceeding redelivery_timeout - will be redelivered anyway

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
chrisguidry added a commit that referenced this pull request Dec 11, 2025
PR #223 fixed a concurrency slot release bug but removed the
ZREMRANGEBYSCORE
that cleaned up stale entries. This could cause memory growth over time
if
slots become orphaned (e.g., worker crashes between acquiring and
releasing).

The fix adds ZREMRANGEBYSCORE back to the Lua script but with the
correct
cutoff: `slot_timeout` (redelivery_timeout + 5) instead of
`redelivery_timeout`.
This is safe because any slot older than slot_timeout is either:
1. Orphaned (should have been released) - safe to clean
2. From a task exceeding redelivery_timeout - will be redelivered anyway

Related: #223

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants