fix(core): make RedisJobStore.update atomic with per-field HSET#57
Conversation
Closes #54. RedisJobStore stored each job as a single JSON string and updated it with a read-modify-write (GET, merge in Python, SET). Because the API and the ARQ worker write the same job from separate processes, two concurrent updates could clobber each other (lost update): whichever SET landed last overwrote every field the other writer had changed. This is the failure mode the F2 "shared job store" fix is meant to prevent, so the store must not reintroduce it. Store each job as a Redis hash (job:{id}) with individually JSON-encoded field values: - create: MULTI/EXEC of DELETE + HSET(mapping) + EXPIRE (replaces any prior record, clears stale fields, reapplies TTL). - update: HSET only the provided non-None fields + EXPIRE. HSET is atomic, so writers touching different fields no longer clobber each other; same-field writes remain last-writer-wins (the expected contract). - get: HGETALL + per-field decode, tolerant of bytes or str (clients created without decode_responses return bytes). Consolidate the two duplicated get/set FakeRedis stand-ins into a single hash-capable tests/unit/_fake_redis.py (adds hset/hgetall/expire/delete and a buffered transactional pipeline), and add a regression test asserting two disjoint concurrent field updates both survive. Suite: 460 passing, coverage 93.03%. ruff + ruff format clean; basedpyright src/: 0 errors. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
|
Warning Review limit reached
More reviews will be available in 15 minutes and 22 seconds. Learn how PR review limits work. Your organization has run out of usage credits. Purchase more in the billing tab. ⌛ How to resolve this issue?After more reviews become available, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans include higher PR review limits than trial, open-source, and free plans. In all cases, reviews become available again over time. During sustained high-volume PR review activity, CodeRabbit may temporarily slow when the next review becomes available. Please see our Fair Usage Limits Policy for further information. ℹ️ Review info⚙️ Run configurationConfiguration used: Path: .coderabbit.yaml Review profile: ASSERTIVE Plan: Pro Run ID: 📒 Files selected for processing (4)
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Pull request overview
This PR changes RedisJobStore from whole-record JSON writes to per-field Redis hash writes so API/worker updates can safely coexist in multi-writer deployments.
Changes:
- Reworks Redis job persistence to use
HSET/HGETALLwith per-field JSON encoding and TTL refreshes. - Adds a shared hash-capable
FakeRedisfor Redis-backed unit tests. - Updates job-store and audio-task tests to validate hash round-tripping and concurrent disjoint updates.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
src/audio_processor/core/job_store.py |
Implements Redis hash storage and field-level updates for RedisJobStore. |
tests/unit/_fake_redis.py |
Adds a shared async fake Redis with hash and pipeline support. |
tests/unit/test_job_store.py |
Updates Redis job-store tests and adds a concurrent-update regression test. |
tests/unit/test_audio_tasks.py |
Switches worker task assertions to read persisted state through RedisJobStore. |
| await asyncio.gather( | ||
| store.update("j1", status="running"), | ||
| store.update("j1", progress={"pct": 10}), | ||
| ) |
PR Review (automated, /pr-review)No code defect blocks merge. CI is green; the core change (single JSON blob -> per-job Redis hash with per-field Process notes
Suggested
Informational
Notably good: symmetric encode/decode with bytes-vs-str coverage, consolidation of the duplicate Merge order is enforced by the stack: review/merge #53 first. 🤖 Generated with Claude Code |
|
The findings around ✅ Actions performedReview triggered.
|
fee89b2
into
claude/repo-architecture-review-zRxhA
Re-applied on top of the merged atomic store (#57), which already implemented the per-field HSET RedisJobStore and the shared test fake. This commit adds the remaining review items that #57 did not cover: Security: - api_keys is now SecretStr (was plain str) so keys never leak via repr / model_dump / logs; api_key_set reads via get_secret_value(). - require_api_key compares against all keys without short-circuiting (avoids a timing side channel); the rate limiter hashes the API key before using it as an in-memory map key. - Fail-fast Settings validator: auth_required requires at least one api_key. Memory: - Rate-limiter window map is hard-capped: after evicting expired windows it evicts the oldest entries when still over the cap. Error handling: - _generate_artifacts catches any exception (DOM/json.dumps can raise TypeError/ValueError/KeyError) and records an artifacts_error field instead of failing a completed transcription. - Status-route timestamp parsing is defensive (no opaque 500 on a bad record). - _maybe_enqueue raises when enabled without a pool, and enqueue failures mark the job FAILED instead of stranding it QUEUED. Tests: test_config.py (validators, SecretStr non-leak) and _maybe_enqueue seam tests. 489 passing, coverage 92.69%.
…API hardening, decomposition) (#53) * refactor: remove dead preprocessing pipeline and orphaned exceptions The preprocessing/ package (PR #27) was superseded by the services/ implementations (PR #43) and is imported nowhere in production code. The two pipelines computed quality metrics with different algorithms (Butterworth-filter SNR vs spectral-percentile SNR) and maintained a second, independent Silero VAD model cache, creating a correctness ambiguity and duplicate maintenance burden. This removes: - src/audio_processor/preprocessing/{loader,ffmpeg,vad,quality}.py - src/audio_processor/exceptions.py (AudioLoadError, FfmpegConversionError were only used by the preprocessing modules; the canonical hierarchy lives in core/exceptions.py) - tests/unit/preprocessing/ The wired-in services/ implementations remain the single source of truth. Suite: 436 passing, coverage 89.98%. * feat: unify job lifecycle behind a shared JobStore and enable ARQ enqueue The API stored jobs in a process-local in-memory dict and never enqueued them, while the worker read/wrote a separate Redis store. A submitted job could never reach COMPLETED from the API's perspective, and /results and /artifacts were structurally unreachable (issue #50). Introduce core/job_store.py with a JobStore abstraction and two backends: - InMemoryJobStore (dev/tests; also exposes a sync mapping interface for direct record injection) - RedisJobStore (JSON-in-Redis keyed by job:{id}) Both backends share one key scheme and serialization, so the API and the separate-process worker observe the same state. - routes.py resolves the store via app.state (falling back to the in-memory store) and enqueues to ARQ when enqueue_enabled and a pool is attached. - audio_tasks._update_job_status delegates to RedisJobStore (single source of truth; removes the duplicated get/merge/set + key literal). - api/__init__.py gains a lifespan that opens an ARQ pool and attaches a RedisJobStore when job_store_backend == "redis". Defaults (memory backend, enqueue disabled) preserve existing behavior; arq is imported lazily so the API does not hard-depend on the 'jobs' extra. New config: job_store_backend ("memory"|"redis"), enqueue_enabled (bool). Tests: add test_job_store.py (both backends) and test_audio_tasks.py, which gives process_audio_job its first coverage and verifies a job reaches COMPLETED with result+artifacts in the shared store. 449 passing, 92.76%. * feat(security): add API-key auth, rate limiting, and safe streaming uploads Addresses two findings from the architecture review: F3 (no authn/authz): every endpoint was anonymous, so unauthenticated callers could drive FFmpeg work and paid Deepgram calls. F4 (unbounded upload): the size guard trusted the client Content-Length header and the body was read fully into memory before validation, enabling a memory-exhaustion DoS; orphaned temp files were only cleaned on the validation-error path. Changes: - api/security.py: require_api_key (constant-time X-API-Key check) and a per-client fixed-window rate_limit dependency. Both are gated by config and default OFF, so existing behavior is preserved. - require_api_key is attached at the /api/v1 router; rate_limit guards the expensive POST /process endpoint. - routes.py: stream uploads to disk in 1 MiB chunks with a hard byte cap enforced on bytes actually read (no full-body buffering, header not trusted). A finally block guarantees the temp file is removed on every failure path; on success the worker owns and deletes it. - config: auth_required, api_keys (+ api_key_set), rate_limit_enabled, rate_limit_requests, rate_limit_window_seconds. No new third-party dependency is added (keeps the audited dependency set stable); the rate limiter is process-local and documented as a safety net to complement a gateway/Redis limiter in multi-process deployments. Tests: test_security.py and test_routes_hardening.py cover auth (open/401/ wrong/ok/misconfig-500), rate limiting (under/over/per-client), the streaming cap, and temp-file cleanup on failure. 462 passing, coverage 92.79%. * docs: document job/security settings in .env.example; apply ruff format - Add JOB_STORE_BACKEND, ENQUEUE_ENABLED, AUTH_REQUIRED, API_KEYS, RATE_LIMIT_* to .env.example so the new configuration is discoverable. - Apply ruff format to api/__init__.py and test_security.py (whitespace only). * fix(test): move UploadFile import into TYPE_CHECKING block CodeQL flagged UploadFile as an unused runtime import; it is only used in a cast() string annotation, so it belongs in the type-checking block. * refactor: remove dead Python 3.10 UTC compatibility shims requires-python is >=3.11, so datetime.UTC is always available and the sys.version_info guard's else branch was unreachable dead code, duplicated across four modules (models, routes, audio_tasks, worker). Replace each with a direct `from datetime import UTC` and drop the now-unused sys/timezone imports. * refactor: remove template scaffolding from worker and utils Cookiecutter residue that the audio pipeline never used: - jobs/worker.py: drop the example/stub tasks (example_background_task, send_email_task, process_file_upload) and the no-op cleanup_old_data cron (it returned a hard-coded 0 yet was registered on the worker). Remove the two large triple-quoted "example" blocks (FastAPI integration + Celery alternative) and the now-unused asyncio/datetime/cron imports. WorkerSettings now registers only process_audio_job. - utils/financial.py: delete the empty placeholder module (no references). - Update test_worker.py to cover the real surface (process_audio_job registration, lifecycle hooks, enqueue_task) and CLAUDE.md project tree. 456 passing, coverage 92.96%. * refactor: decompose process_audio_job into staged helpers The 260-line task mixed orchestration, conversion, transcription, artifact generation, cleanup, and six near-identical inlined progress dicts. Extract focused, individually-readable helpers: - _progress(): builds a progress payload (collapses the 6 repeated blocks). - _convert_audio(): converter is_video/extract vs convert_for_asr branch. - _transcribe(): lazy Deepgram import + ConfigurationError tolerance. - _build_transcription_payload(): result serialization. - _generate_artifacts(): lazy import + generation-failure tolerance. process_audio_job is now a ~80-line orchestrator with the same external behavior and status transitions. Add tests for the two previously-uncovered error branches (generic-exception wrapping to AudioProcessorError + FAILED, and artifact-generation failure still completing the job). 458 passing, coverage 93.06%. * fix: address PR review (rate-limiter bound, config invariant, test coverage) - security.py: bound _RATE_WINDOWS with opportunistic eviction of expired windows above a soft cap, so a flood of unique client identifiers cannot grow the per-process map without bound. - config.py: add a model_validator enforcing the documented invariant that enqueue_enabled requires job_store_backend='redis', so the misconfiguration fails fast at startup instead of silently never processing jobs. - test_job_store.py: cover the bytes-decode branch of RedisJobStore._decode. * fix(test): resolve CodeQL unused-import on UploadFile CodeQL flagged UploadFile as unused because its only reference was the string in cast("UploadFile", ...), and ruff TC006 conversely requires the cast type to be quoted -- a direct conflict. Drop the cast/import entirely and pass the duck-typed _FakeUpload directly with a precise reportArgumentType ignore on the call. Also rename the autouse fixture _clear_store -> clear_store to match the suite convention and avoid reportUnusedFunction. * fix(core): make RedisJobStore.update atomic with per-field HSET (#57) Closes #54. RedisJobStore stored each job as a single JSON string and updated it with a read-modify-write (GET, merge in Python, SET). Because the API and the ARQ worker write the same job from separate processes, two concurrent updates could clobber each other (lost update): whichever SET landed last overwrote every field the other writer had changed. This is the failure mode the F2 "shared job store" fix is meant to prevent, so the store must not reintroduce it. Store each job as a Redis hash (job:{id}) with individually JSON-encoded field values: - create: MULTI/EXEC of DELETE + HSET(mapping) + EXPIRE (replaces any prior record, clears stale fields, reapplies TTL). - update: HSET only the provided non-None fields + EXPIRE. HSET is atomic, so writers touching different fields no longer clobber each other; same-field writes remain last-writer-wins (the expected contract). - get: HGETALL + per-field decode, tolerant of bytes or str (clients created without decode_responses return bytes). Consolidate the two duplicated get/set FakeRedis stand-ins into a single hash-capable tests/unit/_fake_redis.py (adds hset/hgetall/expire/delete and a buffered transactional pipeline), and add a regression test asserting two disjoint concurrent field updates both survive. Suite: 460 passing, coverage 93.03%. ruff + ruff format clean; basedpyright src/: 0 errors. Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com> * fix: address PR review — secrets, API hardening, error handling Re-applied on top of the merged atomic store (#57), which already implemented the per-field HSET RedisJobStore and the shared test fake. This commit adds the remaining review items that #57 did not cover: Security: - api_keys is now SecretStr (was plain str) so keys never leak via repr / model_dump / logs; api_key_set reads via get_secret_value(). - require_api_key compares against all keys without short-circuiting (avoids a timing side channel); the rate limiter hashes the API key before using it as an in-memory map key. - Fail-fast Settings validator: auth_required requires at least one api_key. Memory: - Rate-limiter window map is hard-capped: after evicting expired windows it evicts the oldest entries when still over the cap. Error handling: - _generate_artifacts catches any exception (DOM/json.dumps can raise TypeError/ValueError/KeyError) and records an artifacts_error field instead of failing a completed transcription. - Status-route timestamp parsing is defensive (no opaque 500 on a bad record). - _maybe_enqueue raises when enabled without a pool, and enqueue failures mark the job FAILED instead of stranding it QUEUED. Tests: test_config.py (validators, SecretStr non-leak) and _maybe_enqueue seam tests. 489 passing, coverage 92.69%. * style: drop unused noqa(BLE001) on re-raising except in _maybe_enqueue guard The except re-raises, so BLE001 does not apply and the suppression was unused (ruff RUF100), failing the Code Quality CI checks. * fix(security): use non-crypto hash for rate-limit bucket id CodeQL flagged hashlib.sha256(api_key) as weak hashing of sensitive data (it treats the API key as a password requiring a slow KDF). The hash here is only an in-process rate-limit bucket id, not password storage. Use the builtin non-cryptographic hash(), which keeps per-key buckets and still avoids holding the raw secret as a map key, without tripping the crypto sink. --------- Co-authored-by: Claude <noreply@anthropic.com>
Summary
Fast-follow for #53 that makes
RedisJobStoresafe to enable in a multi-writerdeployment. Stacked on
claude/repo-architecture-review-zRxhA(#53); review/merge #53 first.Closes #54.
Why
#53's
RedisJobStore.updatewas a non-atomic read-modify-write over a singleJSON string:
The whole point of finding F2 is that the API and the ARQ worker write the same
job from separate processes. With a whole-record SET, two concurrent writers
lose updates: whichever SET lands last overwrites every field the other changed.
This reintroduces, at the storage layer, the kind of state divergence F2 set out
to remove.
What
Store each job as a Redis hash (
job:{id}) with individually JSON-encodedfield values:
MULTI/EXECofDELETE+HSET(mapping)+EXPIRE(replacesany prior record, clears stale fields, reapplies TTL).
HSETonly the provided non-Nonefields +EXPIRE.HSETis asingle atomic command, so writers touching different fields no longer
clobber each other. Same-field writes stay last-writer-wins (the expected
contract).
HGETALL+ per-field decode, tolerant ofbytesorstr(clientscreated without
decode_responsesreturn bytes).Also consolidates the two duplicated get/set
FakeRedisstand-ins from #53into one hash-capable
tests/unit/_fake_redis.py, and adds a regression testasserting two disjoint concurrent field updates both survive (would fail under
the old blob approach).
Verification
460 passing, coverage 93.03% (gate 80%).ruff check+ruff format --check: clean.basedpyright src/: 0 errors.test_concurrent_field_updates_do_not_clobber,bytes-vs-str field decode, missing-key returns
None, and the fivetest_audio_tasksassertions now read back through the store.Notes
job:{id}changes from a JSON string to a hash. Nomigration concern: the redis backend is config-gated and not yet enabled in
any deployment (see Soften F2 "Fixed" framing: end-to-end processing is config-gated, not on by default [follow-up to #53] #55).