refactor: architecture review remediation (job lifecycle, dead code, API hardening, decomposition)#53
Conversation
The preprocessing/ package (PR #27) was superseded by the services/ implementations (PR #43) and is imported nowhere in production code. The two pipelines computed quality metrics with different algorithms (Butterworth-filter SNR vs spectral-percentile SNR) and maintained a second, independent Silero VAD model cache, creating a correctness ambiguity and duplicate maintenance burden. This removes: - src/audio_processor/preprocessing/{loader,ffmpeg,vad,quality}.py - src/audio_processor/exceptions.py (AudioLoadError, FfmpegConversionError were only used by the preprocessing modules; the canonical hierarchy lives in core/exceptions.py) - tests/unit/preprocessing/ The wired-in services/ implementations remain the single source of truth. Suite: 436 passing, coverage 89.98%.
…ueue The API stored jobs in a process-local in-memory dict and never enqueued them, while the worker read/wrote a separate Redis store. A submitted job could never reach COMPLETED from the API's perspective, and /results and /artifacts were structurally unreachable (issue #50). Introduce core/job_store.py with a JobStore abstraction and two backends: - InMemoryJobStore (dev/tests; also exposes a sync mapping interface for direct record injection) - RedisJobStore (JSON-in-Redis keyed by job:{id}) Both backends share one key scheme and serialization, so the API and the separate-process worker observe the same state. - routes.py resolves the store via app.state (falling back to the in-memory store) and enqueues to ARQ when enqueue_enabled and a pool is attached. - audio_tasks._update_job_status delegates to RedisJobStore (single source of truth; removes the duplicated get/merge/set + key literal). - api/__init__.py gains a lifespan that opens an ARQ pool and attaches a RedisJobStore when job_store_backend == "redis". Defaults (memory backend, enqueue disabled) preserve existing behavior; arq is imported lazily so the API does not hard-depend on the 'jobs' extra. New config: job_store_backend ("memory"|"redis"), enqueue_enabled (bool). Tests: add test_job_store.py (both backends) and test_audio_tasks.py, which gives process_audio_job its first coverage and verifies a job reaches COMPLETED with result+artifacts in the shared store. 449 passing, 92.76%.
…ploads
Addresses two findings from the architecture review:
F3 (no authn/authz): every endpoint was anonymous, so unauthenticated
callers could drive FFmpeg work and paid Deepgram calls.
F4 (unbounded upload): the size guard trusted the client Content-Length
header and the body was read fully into memory before validation, enabling
a memory-exhaustion DoS; orphaned temp files were only cleaned on the
validation-error path.
Changes:
- api/security.py: require_api_key (constant-time X-API-Key check) and a
per-client fixed-window rate_limit dependency. Both are gated by config and
default OFF, so existing behavior is preserved.
- require_api_key is attached at the /api/v1 router; rate_limit guards the
expensive POST /process endpoint.
- routes.py: stream uploads to disk in 1 MiB chunks with a hard byte cap
enforced on bytes actually read (no full-body buffering, header not
trusted). A finally block guarantees the temp file is removed on every
failure path; on success the worker owns and deletes it.
- config: auth_required, api_keys (+ api_key_set), rate_limit_enabled,
rate_limit_requests, rate_limit_window_seconds.
No new third-party dependency is added (keeps the audited dependency set
stable); the rate limiter is process-local and documented as a safety net to
complement a gateway/Redis limiter in multi-process deployments.
Tests: test_security.py and test_routes_hardening.py cover auth (open/401/
wrong/ok/misconfig-500), rate limiting (under/over/per-client), the streaming
cap, and temp-file cleanup on failure. 462 passing, coverage 92.79%.
- Add JOB_STORE_BACKEND, ENQUEUE_ENABLED, AUTH_REQUIRED, API_KEYS, RATE_LIMIT_* to .env.example so the new configuration is discoverable. - Apply ruff format to api/__init__.py and test_security.py (whitespace only).
|
Warning Review limit reached
More reviews will be available in 56 minutes and 45 seconds. Learn how PR review limits work. Your organization has run out of usage credits. Purchase more in the billing tab. ⌛ How to resolve this issue?After more reviews become available, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans include higher PR review limits than trial, open-source, and free plans. In all cases, reviews become available again over time. During sustained high-volume PR review activity, CodeRabbit may temporarily slow when the next review becomes available. Please see our Fair Usage Limits Policy for further information. ℹ️ Review info⚙️ Run configurationConfiguration used: Path: .coderabbit.yaml Review profile: ASSERTIVE Plan: Pro Run ID: 📒 Files selected for processing (17)
WalkthroughThis PR adds a shared job-store abstraction with Redis and in-memory backends, implements API-key authentication and rate limiting, introduces FastAPI lifecycle management for Redis pooling, refactors HTTP routes to use the job store with streaming uploads and hardening, and updates the worker to persist state via the store. The old preprocessing modules and exceptions are removed in separate deletions. ChangesJob Store, Security, and Route Hardening
Estimated code review effort🎯 4 (Complex) | ⏱️ ~75 minutes Possibly related PRs
Suggested labels
Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
✅ FIPS Compatibility Check
Status: ✅ PASSED What is FIPS?FIPS 140-2/140-3 is a US government standard for cryptographic modules. Common issues:
|
Dependency Review✅ No vulnerabilities or license issues or OpenSSF Scorecard issues found.Scanned FilesNone |
There was a problem hiding this comment.
Pull request overview
Unifies job state between the API and the ARQ worker behind a shared JobStore abstraction (memory + Redis), removes the unused preprocessing/ pipeline that was superseded by services/, and adds opt-in API-key auth, per-client rate limiting, and a streaming upload with a hard size cap and guaranteed temp-file cleanup. New behaviors are gated by config and default-off so existing behavior is preserved.
Changes:
- New
core/job_store.py(InMemoryJobStore+RedisJobStore) used by bothroutes.pyandaudio_tasks._update_job_status; API enqueues to ARQ viaapp.state.arq_poolwhenenqueue_enabled+job_store_backend=redis. - Removed dead
src/audio_processor/preprocessing/{loader,ffmpeg,vad,quality}.py, the orphan top-levelexceptions.py, andtests/unit/preprocessing/. - New
api/security.pywith router-levelrequire_api_keyandrate_limitonPOST /process; streaming uploads in 1 MiB chunks with a 413 byte-cap andfinallycleanup of orphaned temp files.
Reviewed changes
Copilot reviewed 21 out of 22 changed files in this pull request and generated no comments.
Show a summary per file
| File | Description |
|---|---|
| src/audio_processor/core/job_store.py | New shared job-store abstraction with memory and Redis backends, common key scheme. |
| src/audio_processor/core/config.py | New job_store_backend, enqueue_enabled, auth_required, api_keys, rate-limit settings, and api_key_set property. |
| src/audio_processor/api/init.py | Lifespan now opens an ARQ pool and attaches a RedisJobStore when Redis backend is selected; lazy arq import. |
| src/audio_processor/api/routes.py | Router-level auth dependency, streaming upload with size cap, shared store resolution, optional ARQ enqueue, temp-file cleanup. |
| src/audio_processor/api/security.py | New X-API-Key auth dependency and process-local fixed-window rate limiter, both gated by config. |
| src/audio_processor/jobs/audio_tasks.py | _update_job_status delegates to RedisJobStore instead of inline JSON-in-Redis. |
| src/audio_processor/preprocessing/* | Removed dead modules superseded by services/. |
| src/audio_processor/exceptions.py | Removed orphan top-level exceptions module. |
| tests/unit/test_job_store.py | New unit tests for both backends including fake-Redis JSON/TTL handling. |
| tests/unit/test_audio_tasks.py | New end-to-end coverage for process_audio_job with fake Redis, asserting state transitions. |
| tests/unit/test_security.py | New tests for auth (open/missing/invalid/valid/misconfig) and rate-limit windows. |
| tests/unit/test_routes_hardening.py | New tests for streaming 413, temp-file cleanup on validation failure, and auth enforcement on /api/v1. |
| tests/unit/preprocessing/* | Removed alongside the deleted package. |
| .env.example | Documents the new job-store, enqueue, auth, and rate-limit settings. |
CodeQL flagged UploadFile as an unused runtime import; it is only used in a cast() string annotation, so it belongs in the type-checking block.
requires-python is >=3.11, so datetime.UTC is always available and the sys.version_info guard's else branch was unreachable dead code, duplicated across four modules (models, routes, audio_tasks, worker). Replace each with a direct `from datetime import UTC` and drop the now-unused sys/timezone imports.
Cookiecutter residue that the audio pipeline never used: - jobs/worker.py: drop the example/stub tasks (example_background_task, send_email_task, process_file_upload) and the no-op cleanup_old_data cron (it returned a hard-coded 0 yet was registered on the worker). Remove the two large triple-quoted "example" blocks (FastAPI integration + Celery alternative) and the now-unused asyncio/datetime/cron imports. WorkerSettings now registers only process_audio_job. - utils/financial.py: delete the empty placeholder module (no references). - Update test_worker.py to cover the real surface (process_audio_job registration, lifecycle hooks, enqueue_task) and CLAUDE.md project tree. 456 passing, coverage 92.96%.
There was a problem hiding this comment.
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/audio_processor/api/security.py`:
- Line 32: The _RATE_WINDOWS dict is unbounded and can grow indefinitely; modify
the rate limiting implementation (the module-level _RATE_WINDOWS usage) to evict
expired windows and cap growth by adding a TTL timestamp per entry and an
opportunistic cleanup step: when accessing or inserting into _RATE_WINDOWS,
remove entries whose window end is past now and, if the map exceeds a
configurable MAX_WINDOWS, evict the oldest/least-recently-used entries (or drop
the smallest-priority ones) before inserting; apply the same eviction/capping
logic to the other occurrences that manipulate _RATE_WINDOWS (lines referenced
around the 115-119 usage) so entries are reset-in-place but also removed when
expired to prevent unbounded memory growth.
In `@src/audio_processor/core/config.py`:
- Around line 190-196: Add a Pydantic model-level validator to enforce the
documented invariant: if enqueue_enabled is True then job_store_backend must
equal "redis". In the config model (the class declaring enqueue_enabled and
job_store_backend) add a `@model_validator`(mode="after") method that checks these
fields and raises a ValueError when enqueue_enabled and job_store_backend !=
"redis", so misconfiguration fails at startup instead of silently warning in
_maybe_enqueue; also add model_validator to the pydantic import list.
In `@tests/unit/test_audio_tasks.py`:
- Around line 149-211: Add two tests exercising the uncovered error branches in
process_audio_job: (1) trigger the generic except Exception path by making the
injected services raise a plain Exception (e.g., set
patched_services.side_effect = Exception("boom") ) and assert process_audio_job
raises AudioProcessorError and the redis-stored job (job_key) has status
"failed" and an error; (2) trigger the artifact-generation-failure path by
monkeypatching ArtifactGenerator to return a mock whose generate_all raises
AudioProcessorError (reference ArtifactGenerator.generate_all) and assert
process_audio_job still completes, the stored job status is "completed", and
stored["artifacts"] == {}. Ensure tests use the same ctx {"redis": FakeRedis()}
pattern and reference process_audio_job, AudioProcessorError, ArtifactGenerator
to locate the spots.
In `@tests/unit/test_job_store.py`:
- Around line 104-110: The test test_get_decodes_bytes_and_str only writes a str
payload so the bytes branch in RedisJobStore._decode is never exercised; update
the test to also insert a bytes payload into the FakeRedis backing store (e.g.,
set redis.store[job_key("b_bytes")] to json.dumps({...}).encode() or replace the
existing entry with a bytes value) and assert that await store.get(...) returns
the expected dict for that key too, ensuring both the str and bytes decoding
paths are covered.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: 6904d4a9-c0ac-4191-b79d-4cccdb13bd56
📒 Files selected for processing (22)
.env.examplesrc/audio_processor/api/__init__.pysrc/audio_processor/api/routes.pysrc/audio_processor/api/security.pysrc/audio_processor/core/config.pysrc/audio_processor/core/job_store.pysrc/audio_processor/exceptions.pysrc/audio_processor/jobs/audio_tasks.pysrc/audio_processor/preprocessing/__init__.pysrc/audio_processor/preprocessing/ffmpeg.pysrc/audio_processor/preprocessing/loader.pysrc/audio_processor/preprocessing/quality.pysrc/audio_processor/preprocessing/vad.pytests/unit/preprocessing/__init__.pytests/unit/preprocessing/test_ffmpeg.pytests/unit/preprocessing/test_loader.pytests/unit/preprocessing/test_quality.pytests/unit/preprocessing/test_vad.pytests/unit/test_audio_tasks.pytests/unit/test_job_store.pytests/unit/test_routes_hardening.pytests/unit/test_security.py
💤 Files with no reviewable changes (10)
- tests/unit/preprocessing/test_quality.py
- src/audio_processor/preprocessing/quality.py
- src/audio_processor/preprocessing/vad.py
- tests/unit/preprocessing/test_loader.py
- src/audio_processor/preprocessing/ffmpeg.py
- tests/unit/preprocessing/test_vad.py
- src/audio_processor/preprocessing/loader.py
- tests/unit/preprocessing/test_ffmpeg.py
- src/audio_processor/preprocessing/init.py
- src/audio_processor/exceptions.py
The 260-line task mixed orchestration, conversion, transcription, artifact generation, cleanup, and six near-identical inlined progress dicts. Extract focused, individually-readable helpers: - _progress(): builds a progress payload (collapses the 6 repeated blocks). - _convert_audio(): converter is_video/extract vs convert_for_asr branch. - _transcribe(): lazy Deepgram import + ConfigurationError tolerance. - _build_transcription_payload(): result serialization. - _generate_artifacts(): lazy import + generation-failure tolerance. process_audio_job is now a ~80-line orchestrator with the same external behavior and status transitions. Add tests for the two previously-uncovered error branches (generic-exception wrapping to AudioProcessorError + FAILED, and artifact-generation failure still completing the job). 458 passing, coverage 93.06%.
…verage) - security.py: bound _RATE_WINDOWS with opportunistic eviction of expired windows above a soft cap, so a flood of unique client identifiers cannot grow the per-process map without bound. - config.py: add a model_validator enforcing the documented invariant that enqueue_enabled requires job_store_backend='redis', so the misconfiguration fails fast at startup instead of silently never processing jobs. - test_job_store.py: cover the bytes-decode branch of RedisJobStore._decode.
CodeQL flagged UploadFile as unused because its only reference was the string
in cast("UploadFile", ...), and ruff TC006 conversely requires the cast type to
be quoted -- a direct conflict. Drop the cast/import entirely and pass the
duck-typed _FakeUpload directly with a precise reportArgumentType ignore on the
call. Also rename the autouse fixture _clear_store -> clear_store to match the
suite convention and avoid reportUnusedFunction.
PR ReviewMerge status: No hard blockers. CI is fully green, coverage 93.06%, and all new capabilities (Redis job store, API-key auth, rate limiting, ARQ enqueue) default to off, so current behavior is preserved. Async reviewers are clean: Copilot 0 comments, all 4 CodeRabbit items resolved in e93c248..8225703, CodeQL unused-import fixed (0 open alerts), SonarCloud + qlty gates passed. The findings below are "fix before enabling the new features in production," not merge blockers. No Critical tier; two items (atomic update, artifact catch) were agent-rated Critical but down-tiered to Important because they only bite once the redis+enqueue path is enabled, which is the PR's end goal. Important (fix before enabling redis/auth/enqueue)
Suggested (selected)
Verified correct: streaming upload caps on bytes read (not Content-Length), temp cleanup in 🤖 Generated with Claude Code |
Closes #54. RedisJobStore stored each job as a single JSON string and updated it with a read-modify-write (GET, merge in Python, SET). Because the API and the ARQ worker write the same job from separate processes, two concurrent updates could clobber each other (lost update): whichever SET landed last overwrote every field the other writer had changed. This is the failure mode the F2 "shared job store" fix is meant to prevent, so the store must not reintroduce it. Store each job as a Redis hash (job:{id}) with individually JSON-encoded field values: - create: MULTI/EXEC of DELETE + HSET(mapping) + EXPIRE (replaces any prior record, clears stale fields, reapplies TTL). - update: HSET only the provided non-None fields + EXPIRE. HSET is atomic, so writers touching different fields no longer clobber each other; same-field writes remain last-writer-wins (the expected contract). - get: HGETALL + per-field decode, tolerant of bytes or str (clients created without decode_responses return bytes). Consolidate the two duplicated get/set FakeRedis stand-ins into a single hash-capable tests/unit/_fake_redis.py (adds hset/hgetall/expire/delete and a buffered transactional pipeline), and add a regression test asserting two disjoint concurrent field updates both survive. Suite: 460 passing, coverage 93.03%. ruff + ruff format clean; basedpyright src/: 0 errors. Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
| from audio_processor.core.config import settings | ||
|
|
||
| if TYPE_CHECKING: | ||
| from collections.abc import Awaitable |
Re-applied on top of the merged atomic store (#57), which already implemented the per-field HSET RedisJobStore and the shared test fake. This commit adds the remaining review items that #57 did not cover: Security: - api_keys is now SecretStr (was plain str) so keys never leak via repr / model_dump / logs; api_key_set reads via get_secret_value(). - require_api_key compares against all keys without short-circuiting (avoids a timing side channel); the rate limiter hashes the API key before using it as an in-memory map key. - Fail-fast Settings validator: auth_required requires at least one api_key. Memory: - Rate-limiter window map is hard-capped: after evicting expired windows it evicts the oldest entries when still over the cap. Error handling: - _generate_artifacts catches any exception (DOM/json.dumps can raise TypeError/ValueError/KeyError) and records an artifacts_error field instead of failing a completed transcription. - Status-route timestamp parsing is defensive (no opaque 500 on a bad record). - _maybe_enqueue raises when enabled without a pool, and enqueue failures mark the job FAILED instead of stranding it QUEUED. Tests: test_config.py (validators, SecretStr non-leak) and _maybe_enqueue seam tests. 489 passing, coverage 92.69%.
…e guard The except re-raises, so BLE001 does not apply and the suppression was unused (ruff RUF100), failing the Code Quality CI checks.
CodeQL flagged hashlib.sha256(api_key) as weak hashing of sensitive data (it treats the API key as a password requiring a slow KDF). The hash here is only an in-process rate-limit bucket id, not password storage. Use the builtin non-cryptographic hash(), which keeps per-key buckets and still avoids holding the raw secret as a map key, without tripping the crypto sink.
|



Summary
Implements the top 3 highest-value items from the repository architecture & maintainability review, plus the next tier of lower-risk cleanups. Each item is an independent commit; new capabilities default to off so existing behavior is preserved until explicitly enabled.
preprocessing/vsservices/)process_audio_jobmixing concernsTop 3
F1 — Remove the dead
preprocessing/pipeline. Superseded byservices/, imported nowhere, computed quality metrics with a different SNR algorithm, and kept a second Silero cache. Removed the package, the orphaned top-levelexceptions.py, and its tests.F2 — Unify the job lifecycle behind a shared
JobStore. Newcore/job_store.py(InMemoryJobStore+RedisJobStore, one key scheme/serialization). The API resolves the store fromapp.stateand enqueues to ARQ when enabled;audio_tasks._update_job_statusdelegates toRedisJobStore;api/__init__.pylifespan wires the Redis store + ARQ pool. New flagsjob_store_backend/enqueue_enableddefault to memory/off. Amodel_validatorenforcesenqueue_enabled⇒redisat startup.F3/F4 — Auth, rate limiting, safe streaming uploads.
api/security.py: constant-timeX-API-Keyauth (router-level) and a per-client fixed-window rate limiter with bounded memory (onPOST /process), both gated and default off. Uploads stream to disk in 1 MiB chunks with a hard byte cap enforced on bytes read (header no longer trusted); afinallyguarantees temp-file cleanup on failure.Next tier
F5 — Decompose
process_audio_jobinto_progress+ stage helpers (_convert_audio,_transcribe,_build_transcription_payload,_generate_artifacts); a ~80-line orchestrator with identical behavior. Added tests for both error branches.F7 — Remove template scaffolding: worker example/stub tasks + no-op cron + embedded example blocks (now registers only
process_audio_job), and the emptyutils/financial.py.F12 — Remove dead Python 3.10 UTC shims across four modules (
requires-python>=3.11makes the fallback unreachable).Tests & verification
test_job_store.py,test_audio_tasks.py(first coverage forprocess_audio_jobincl. error branches),test_security.py,test_routes_hardening.py(streaming cap + temp cleanup + auth enforcement).ruff check+ruff format --check: clean.basedpyright src/: 0 errors.bandit: 0 High.Review comments addressed
CodeRabbit review items folded in: bounded rate-limiter map,
enqueue_enabled→redisvalidator, bytes-decode test coverage, and the twoprocess_audio_joberror-branch tests. CodeQL unused-import flag fixed.Enabling end-to-end processing
Set
JOB_STORE_BACKEND=redis+ENQUEUE_ENABLED=trueand run the ARQ worker (arq audio_processor.jobs.worker.WorkerSettings). Integration is unit-proven with a fake Redis.Still deferred (follow-ups)
AudioJobmodel adoption in routes) — kept the dict record to preserve the test injection contract.https://claude.ai/code/session_01KPHSWypMUKgGVJakajTQbB