feat(meeting): implement real-time websocket gateway for audio, signaling, and captions #47
aniebietafia merged 6 commits into main
Conversation
This adds the core WebSocket layer bridging frontend clients to the Kafka-backed AI processing pipeline. Specific additions:
- Created `ConnectionManager` leveraging Redis Pub/Sub to allow WebSocket broadcasting and unicasting across multi-instance pod deployments.
- Added `authenticate_ws` dependency for securely validating JWTs via query parameters (since browser WS APIs lack custom header support).
- Implemented `/ws/signaling` for WebRTC peer-to-peer negotiation relay.
- Implemented bidirectional `/ws/audio` ingestion (WebSocket -> Kafka `audio.raw`) and egress (Kafka `audio.synthesized` -> WebSocket) with sequence-based stale frame dropping.
- Implemented unidirectional `/ws/captions` broadcast powered by dynamic consumer groups.
- Wrote full unit test and integration suite for WebSocket handlers.
- Generated `docs/testing.md` guide detailing manual testing workflows via Postman.

Closes: #8 and #9
Signed-off-by: aniebietafia <aniebietafia87@gmail.com>
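The query-string JWT check described for `authenticate_ws` (browser WebSocket APIs cannot attach custom headers, so the token rides in `?token=...`) can be sketched with the standard library alone. This is a hedged illustration, not the PR's actual implementation: `verify_ws_token` and `_b64url_decode` are hypothetical names, and a real deployment would use a maintained JWT library with expiry and audience checks.

```python
import base64
import hashlib
import hmac
import json


def _b64url_decode(part: str) -> bytes:
    # Restore the base64url padding that compact JWTs strip
    return base64.urlsafe_b64decode(part + "=" * (-len(part) % 4))


def verify_ws_token(token: str, secret: str) -> dict:
    """Validate a compact HS256 JWT taken from a ?token= query parameter.

    Returns the claims dict on success; raises ValueError on any failure.
    """
    try:
        header_b64, payload_b64, sig_b64 = token.split(".")
    except ValueError:
        raise ValueError("malformed token") from None
    signing_input = f"{header_b64}.{payload_b64}".encode()
    expected = hmac.new(secret.encode(), signing_input, hashlib.sha256).digest()
    if not hmac.compare_digest(expected, _b64url_decode(sig_b64)):
        raise ValueError("bad signature")
    return json.loads(_b64url_decode(payload_b64))
```

A FastAPI dependency would call something like this with the `token` query parameter and close the socket (e.g., with policy-violation code 1008) on `ValueError`.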
📝 Walkthrough
Adds a Kafka-backed realtime audio pipeline (ingest → STT → translation → TTS → egress), WebSocket endpoints and connection management, provider integrations (Deepgram, DeepL, OpenAI TTS, Voice.ai), a user role column and admin init, many supporting services, schemas, and extensive tests.
Sequence Diagram(s)
sequenceDiagram
participant WS_In as WebSocket<br/>(Audio Ingest)
participant Ingest as AudioIngestService
participant Kafka as Kafka
participant STT as STTWorker
participant Translate as TranslationWorker
participant TTS as TTSWorker
participant WS_Out as WebSocket<br/>(Audio Egress)
WS_In->>Ingest: publish_audio_chunk(room_id, user_id, audio_bytes)
Ingest->>Ingest: base64_encode, increment sequence
Ingest->>Kafka: produce(topic="audio.raw", event=AudioChunkEvent)
Kafka-->>STT: AudioChunkEvent
STT->>STT: decode audio, call Deepgram
STT->>Kafka: produce(topic="text.original", event=TranscriptionEvent)
Kafka-->>Translate: TranscriptionEvent
Translate->>Translate: determine target languages, call DeepL/OpenAI fallback
Translate->>Kafka: produce(topic="text.translated", event=TranslationEvent)
Kafka-->>TTS: TranslationEvent
TTS->>TTS: select provider, synthesize audio
TTS->>Kafka: produce(topic="audio.synthesized", event=SynthesizedAudioEvent)
Kafka-->>WS_Out: SynthesizedAudioEvent
WS_Out->>WS_In: send_bytes to meeting participants
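The "sequence-based stale frame dropping" on the egress path can be sketched as a small per-speaker filter; `StaleFrameFilter` and its method names are illustrative, not the PR's actual API:

```python
class StaleFrameFilter:
    """Drop frames whose sequence number is not newer than the last delivered one.

    A hypothetical helper mirroring the egress behaviour described above:
    each (room_id, user_id) stream keeps its own monotonically increasing
    sequence watermark, so late or duplicate frames are silently discarded.
    """

    def __init__(self) -> None:
        # (room_id, user_id) -> highest sequence number delivered so far
        self._last_seq: dict[tuple[str, str], int] = {}

    def accept(self, room_id: str, user_id: str, seq: int) -> bool:
        key = (room_id, user_id)
        last = self._last_seq.get(key, -1)
        if seq <= last:
            return False  # stale or duplicate: drop instead of replaying
        self._last_seq[key] = seq
        return True
```

Frames that arrive late are dropped rather than replayed, which suits real-time audio where a stale frame is worse than a gap.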
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~110 minutes
🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
Actionable comments posted: 11
Note
Due to the large number of review comments, Critical, Major severity comments were prioritized as inline comments.
🟡 Minor comments (15)
scripts/test_audio_client.py-14-18 (1)
14-18: ⚠️ Potential issue | 🟡 Minor
Add None checks before constructing the WebSocket URL.
If `ROOM_CODE` or `ACCESS_TOKEN` are not set, the URL will contain literal `"None"` strings, causing confusing connection failures.
Proposed fix
-ROOM_CODE = f"{settings.ROOM_CODE}"
-# NOTE: Replace 'YOUR_ACCESS_TOKEN' with the JWT token from Postman
-TOKEN = f"{settings.ACCESS_TOKEN}"
+ROOM_CODE = settings.ROOM_CODE
+TOKEN = settings.ACCESS_TOKEN
+
+if not ROOM_CODE or not TOKEN:
+    raise ValueError(
+        "ROOM_CODE and ACCESS_TOKEN must be set in environment. "
+        "See docs/testing.md for manual testing setup."
+    )

-WS_URL = f"ws://localhost:8000/api/v1/ws/audio/{ROOM_CODE}?token={TOKEN}"
+WS_URL = f"ws://localhost:8000/api/v1/ws/audio/{ROOM_CODE}?token={TOKEN}"
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@scripts/test_audio_client.py` around lines 14 - 18, ROOM_CODE and ACCESS_TOKEN can be None causing WS_URL to contain "None"; before building WS_URL check settings.ROOM_CODE and settings.ACCESS_TOKEN (used to set ROOM_CODE and TOKEN) and raise or exit with a clear error if either is missing, or prompt to set them, so that ROOM_CODE and TOKEN are guaranteed non-empty strings before constructing WS_URL (the variables to update are ROOM_CODE, TOKEN and WS_URL).
app/services/audio_bridge.py-77-77 (1)
77-77: ⚠️ Potential issue | 🟡 Minor
Invalid encoding string will raise an unhandled `ValueError`.
`AudioEncoding(encoding)` will raise `ValueError` if the caller passes an unsupported encoding. Consider validating or catching this to provide a clearer error message.
Proposed fix
+    try:
+        audio_encoding = AudioEncoding(encoding)
+    except ValueError:
+        logger.warning("Unsupported encoding %r, defaulting to LINEAR16", encoding)
+        audio_encoding = AudioEncoding.LINEAR16
+
     payload = AudioChunkPayload(
         room_id=room_id,
         user_id=user_id,
         sequence_number=seq,
         audio_data=audio_b64,
         sample_rate=sample_rate,
-        encoding=AudioEncoding(encoding),
+        encoding=audio_encoding,
         source_language=source_language,
     )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/services/audio_bridge.py` at line 77, The call to AudioEncoding(encoding) can raise an unhandled ValueError for unsupported encoding strings; update the code around the AudioEncoding(encoding) usage to validate or catch that exception (e.g., wrap the conversion in a try/except ValueError), then raise or return a clearer error message that includes the invalid encoding value and acceptable options (or fall back to a safe default). Reference the AudioEncoding constructor and the encoding variable to locate where to add the try/except and improved error handling.
app/services/stt_worker.py-44-44 (1)
44-44: ⚠️ Potential issue | 🟡 Minor
Handle potential `base64.b64decode` errors.
`base64.b64decode` raises `binascii.Error` if the input is malformed. An invalid `audio_data` payload would crash the worker. Consider wrapping in try/except and logging/skipping corrupt chunks.
🛡️ Suggested defensive handling
+import binascii
+
 # 1. Decode base64 audio
-audio_bytes = base64.b64decode(payload.audio_data)
+try:
+    audio_bytes = base64.b64decode(payload.audio_data)
+except binascii.Error:
+    logger.warning(
+        "Invalid base64 in audio chunk seq=%d from user=%s, skipping",
+        payload.sequence_number,
+        payload.user_id,
+    )
+    return
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/services/stt_worker.py` at line 44, The base64 decoding line audio_bytes = base64.b64decode(payload.audio_data) can raise binascii.Error for malformed input; wrap this call in a try/except that catches binascii.Error (and ValueError) and handle corrupt payloads by logging the error with context (including payload.id or chunk index) and skipping or returning early from the processing function (e.g., the handler in stt_worker.py); also ensure binascii is imported and use the existing logger (e.g., logger or process_logger) when emitting the error.
app/external_services/openai_tts/config.py-8-12 (1)
8-12: ⚠️ Potential issue | 🟡 Minor
Harden API key validation against whitespace-only values.
A value like `" "` passes the current check and generates invalid auth headers.
🛠️ Suggested fix
-    if not settings.OPENAI_API_KEY:
+    if not settings.OPENAI_API_KEY or not settings.OPENAI_API_KEY.strip():
         raise RuntimeError("OPENAI_API_KEY is not configured.")
+    api_key = settings.OPENAI_API_KEY.strip()
     return {
-        "Authorization": f"Bearer {settings.OPENAI_API_KEY}",
+        "Authorization": f"Bearer {api_key}",
         "Content-Type": "application/json",
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/external_services/openai_tts/config.py` around lines 8 - 12, The OPENAI_API_KEY check currently allows whitespace-only values; update the validation around settings.OPENAI_API_KEY (the value used to build the Authorization header) to ensure it's non-empty after trimming whitespace (e.g., if not settings.OPENAI_API_KEY or not settings.OPENAI_API_KEY.strip(): raise RuntimeError(...)). Keep building the header as before using the original value (or the stripped value) once validated.
app/core/init_admin.py-26-44 (1)
26-44: ⚠️ Potential issue | 🟡 Minor
Add error handling with rollback for database operations.
If `db.commit()` fails (constraint violation, connection error), the session is left in a bad state without explicit rollback. The caller in `main.py` catches the exception but the session may be corrupted.
🛡️ Proposed fix with error handling
+    try:
         if existing_admin:
             if existing_admin.user_role != UserRole.ADMIN.value:
                 existing_admin.user_role = UserRole.ADMIN.value
                 db.commit()
                 logger.info("Existing admin user updated with ADMIN role.")
             return

         logger.info("Creating default admin user: System Admin")
         admin_user = User(
             email=admin_email,
             full_name="System Admin",
             hashed_password=security_service.hash_password(settings.ADMIN_PASSWORD),
             user_role=UserRole.ADMIN.value,
             is_active=True,
             is_verified=True,
         )
         db.add(admin_user)
         db.commit()
         logger.info("Default admin user created successfully.")
+    except Exception:
+        db.rollback()
+        raise
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/core/init_admin.py` around lines 26 - 44, The DB commit calls in the admin initialization (when updating existing_admin.user_role and when creating admin_user via db.add/db.commit) lack error handling and rollback; wrap each transaction block (the existing_admin update path and the new User creation path where admin_user is added) in try/except, on exception call db.rollback(), log the exception with context (e.g., include existing_admin or admin_email), and re-raise or raise a descriptive exception so the caller can handle it; ensure you reference the same symbols (existing_admin, admin_user, db.commit, db.rollback, User) when locating and updating the code.
tests/meeting/test_ws_router.py-91-92 (1)
91-92: ⚠️ Potential issue | 🟡 Minor
Avoid blocking `time.sleep()` in tests.
`time.sleep(0.1)` blocks the thread and is fragile for timing-dependent assertions. Consider using proper synchronization or increasing reliability with retries.
♻️ Proposed improvement
-    websocket.send_bytes(b"fake_audio_chunk")
-    time.sleep(0.1)  # Yield to event loop for background tasks to process
+    websocket.send_bytes(b"fake_audio_chunk")
+    # Give time for async processing; consider using a more deterministic approach
+    import asyncio
+    # For TestClient sync context, sleep is acceptable but could be flaky
+    time.sleep(0.2)  # Increased margin for CI environments

Alternatively, consider refactoring the test to use `pytest-asyncio` with `httpx.AsyncClient` for proper async WebSocket testing.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/meeting/test_ws_router.py` around lines 91 - 92, The test currently blocks with time.sleep(0.1) after websocket.send_bytes(b"fake_audio_chunk"); replace this blocking call with non-blocking async synchronization (e.g., await asyncio.sleep(0.1)) or, better, wait on a concrete condition/event with retries (use an asyncio.Event, a polling loop with asyncio.sleep, or assert-with-retry) to ensure background handlers processed the chunk; update the test to use the async pattern (or migrate to pytest-asyncio and httpx.AsyncClient) and reference websocket.send_bytes and the post-send wait logic when making the change.
app/services/translation_worker.py-106-112 (1)
106-112: ⚠️ Potential issue | 🟡 Minor
Potential PII exposure in debug logs.
Logging the first 50 characters of translated text could expose user-generated content (potentially PII) in production logs. Consider reducing log verbosity or masking the content.
Proposed fix
 logger.debug(
-    "Translation: seq=%d %s→%s text='%s'",
+    "Translation: seq=%d %s→%s len=%d",
     payload.sequence_number,
     payload.source_language,
     target_lang,
-    translated_text[:50],
+    len(translated_text),
 )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/services/translation_worker.py` around lines 106 - 112, The debug log in translation_worker.py currently prints user content (translated_text) which risks PII exposure; update the logger.debug call that references payload.sequence_number, payload.source_language, target_lang, and translated_text to stop emitting raw text—either remove translated_text entirely, replace it with a redacted/masked placeholder, or log only non-sensitive metadata such as the text length or a deterministic hash/fingerprint of translated_text; ensure the change is applied to the logger.debug invocation shown so production logs never contain user-generated content.
app/services/connection_manager.py-123-124 (1)
123-124: ⚠️ Potential issue | 🟡 Minor
Add explanatory comment for empty `except CancelledError` block.
CodeQL flagged this empty except. While suppressing `CancelledError` is intentional for graceful shutdown, adding a comment clarifies intent.
Proposed fix
 except asyncio.CancelledError:
-    pass
+    pass  # Expected during graceful shutdown when room becomes empty
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/services/connection_manager.py` around lines 123 - 124, Add a brief explanatory comment above the empty except asyncio.CancelledError: block clarifying that swallowing CancelledError is intentional to allow graceful shutdown/task cancellation (e.g., "Intentionally ignore CancelledError during shutdown to avoid noisy traces; re-raise other exceptions"). Locate the except asyncio.CancelledError handler and replace the bare pass with a single-line comment explaining the intent so future readers and CodeQL understand this suppression is deliberate.
app/external_services/deepl/service.py-171-172 (1)
171-172: ⚠️ Potential issue | 🟡 Minor
Add defensive check for empty `choices` array.
Accessing `data["choices"][0]` without checking if `choices` is non-empty could raise `IndexError` or `KeyError` if the OpenAI API returns an unexpected response.
Proposed fix
 data = response.json()
-translated = data["choices"][0]["message"]["content"].strip()
+choices = data.get("choices", [])
+if not choices:
+    raise RuntimeError("OpenAI returned empty choices array")
+translated = choices[0]["message"]["content"].strip()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/external_services/deepl/service.py` around lines 171 - 172, The assignment to translated directly indexes data["choices"][0] without verifying the choices array; in the function that processes the OpenAI response (where response.json() is assigned to data and translated is computed) add a defensive check that data contains a non-empty "choices" list and that choices[0] has the expected "message"->"content" keys, and if not, log or raise a clear error (or return a safe fallback) instead of allowing an IndexError/KeyError; update the logic around the translated = data["choices"][0]["message"]["content"].strip() line to validate data.get("choices") and its first element before accessing nested fields.
app/external_services/voiceai/service.py-76-76 (1)
76-76: ⚠️ Potential issue | 🟡 Minor
Remove debug print statement.
This `print()` statement appears to be a debug artifact that should be removed before merging. Use the existing logger instead.
Proposed fix
-print(f"Voice.ai Audio format: {audio_format}")
+logger.debug("Voice.ai Audio format: %s", audio_format)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/external_services/voiceai/service.py` at line 76, Remove the leftover debug print statement that outputs f"Voice.ai Audio format: {audio_format}" and replace it with the module's existing logger call (e.g., logger.debug or logger.info) so logging is consistent; locate the print referencing the audio_format variable and swap it to logger.debug(f"Voice.ai Audio format: {audio_format}") (or logger.info if more appropriate) ensuring you use the existing logger instance used elsewhere in the file.
app/meeting/ws_router.py-256-260 (1)
256-260: ⚠️ Potential issue | 🟡 Minor
Move import outside exception handler and avoid printing traceback.
Importing `traceback` inside the exception handler is inefficient if exceptions occur frequently. The `traceback.print_exc()` should use logger instead.
Proposed fix
+import traceback  # Move to top of file with other imports
 ...
 except Exception as frame_err:
-    print(f"Error processing egress frame: {frame_err}")
-    import traceback
-
-    traceback.print_exc()
+    logger.exception("Error processing egress frame: %s", frame_err)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/meeting/ws_router.py` around lines 256 - 260, Move the traceback import out of the exception handler and stop using print/traceback.print_exc(); instead log the exception with the module logger and include the exception info. In the except block that currently captures frame_err and calls traceback.print_exc(), remove the inline import and print, ensure traceback is imported at module top, and replace the print/print_exc() calls by using logger.exception(...) or logger.error(..., exc_info=True) to record the stack trace (referencing the existing frame_err variable and the traceback.print_exc() call to locate the code).
tests/meeting/test_ws.py-80-86 (1)
80-86: ⚠️ Potential issue | 🟡 Minor
Remove unnecessary `@pytest.mark.asyncio` decorator.
The `test_authenticate_ws_valid_token` function is decorated with `@pytest.mark.asyncio` and defined as `async`, but `authenticate_ws` is a synchronous function. Remove the `@pytest.mark.asyncio` decorator and change `async def` to `def`.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/meeting/test_ws.py` around lines 80 - 86, The test function test_authenticate_ws_valid_token is incorrectly async and marked with `@pytest.mark.asyncio` even though authenticate_ws is synchronous; change the test to a normal synchronous test by removing the `@pytest.mark.asyncio` decorator and converting "async def test_authenticate_ws_valid_token()" to "def test_authenticate_ws_valid_token()", leaving the patched jwt.decode usage and assertions unchanged.
app/external_services/voiceai/service.py-90-90 (1)
90-90: ⚠️ Potential issue | 🟡 Minor
Remove debug print statement.
Same issue — use the logger for consistency with line 91.
Proposed fix
-print(f"Voice.ai TTS API completed in {elapsed_ms} ms")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/external_services/voiceai/service.py` at line 90, Remove the debug print statement that outputs "Voice.ai TTS API completed in {elapsed_ms} ms" and replace it with a call to the module's existing logger (e.g., logger or processLogger) using an appropriate level such as logger.info(f"Voice.ai TTS API completed in {elapsed_ms} ms"); eliminate the print(...) invocation so all logs are consistent with the surrounding logging on the Voice.ai TTS code path.
app/meeting/ws_router.py-282-283 (1)
282-283: ⚠️ Potential issue | 🟡 Minor
Add logging or comment for empty except block.
CodeQL flagged this empty except. Silent exception swallowing makes debugging difficult.
Proposed fix
 except Exception:
-    pass
+    logger.debug("Audio websocket tasks terminated")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/meeting/ws_router.py` around lines 282 - 283, There is an empty except Exception: pass block that silently swallows errors; replace it with an explicit logging call (or a clear comment if swallowing is intentional). Locate the catch in ws_router.py (the try/except surrounding message routing/handling) and change it to except Exception as e: logger.exception("Unhandled error in [function_name]: %s", e) using the module logger (create one via logging.getLogger(__name__) if none exists), or if the exception truly should be ignored, add a comment explaining why and include minimal logging at debug level so failures are visible.
app/meeting/ws_router.py-93-93 (1)
93-93: ⚠️ Potential issue | 🟡 Minor
Remove debug print statements throughout the file.
Multiple `print()` statements are scattered throughout this file (lines 93, 159-163, 182-188, 192, 206-209, 226, 241-244, 248-254, 257). These should be removed or converted to proper logger calls before merging.
Example fix for line 93
-print("Audio WS client connected: %s", user_id)
+logger.info("Audio WS client connected: %s", user_id)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/meeting/ws_router.py` at line 93, The file contains leftover debug print() calls (e.g., the one using user_id) that should be removed or converted to proper logging; replace all print(...) occurrences in app/meeting/ws_router.py with calls to a module logger (create logger = logging.getLogger(__name__) if missing) using appropriate levels (logger.debug or logger.info) and include contextual data like user_id in the formatted message, or remove the prints entirely if they are unnecessary; ensure imports and logger creation are added/updated and no print statements remain in functions handling WebSocket connections.
🧹 Nitpick comments (23)
tests/test_auth/test_auth_refresh.py (2)
234-238: Move `import asyncio` to module level.
The `asyncio` import is repeated inside multiple test methods (lines 234, 258, 283, 349, 385, 406). Move it to the top of the file with other imports.
♻️ Proposed fix
Add at the top with other imports (around line 7):
import asyncio
Then remove all the inline `import asyncio` statements from the test methods.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/test_auth/test_auth_refresh.py` around lines 234 - 238, Move the repeated inline "import asyncio" statements to the module level: add a single "import asyncio" alongside the other top-of-file imports and remove the in-function imports found in the tests that call asyncio.run(...), e.g. where tests invoke asyncio.run(token_store.save_refresh_token(...)) and other asyncio.run usages; ensure no duplicate inline imports remain so all test functions reuse the module-level asyncio import.
150-178: Consider using try/finally for more robust cleanup.
If an exception occurs during teardown (e.g., at line 177), `app.dependency_overrides.clear()` won't execute. Using try/finally ensures both cleanup steps run.
♻️ Proposed fix for robust cleanup
     app.dependency_overrides[get_db] = _override_get_db
     app.dependency_overrides[get_email_producer_service] = _override_email_producer
     app.dependency_overrides[get_token_store_service] = _override_token_store
     app.dependency_overrides[get_account_lockout_service] = _override_lockout_svc
     limiter.enabled = False
-    with TestClient(app) as test_client:
-        yield test_client
-    limiter.enabled = True
-    app.dependency_overrides.clear()
+    try:
+        with TestClient(app) as test_client:
+            yield test_client
+    finally:
+        limiter.enabled = True
+        app.dependency_overrides.clear()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/test_auth/test_auth_refresh.py` around lines 150 - 178, The fixture client may skip cleanup if an exception occurs; wrap the TestClient context and the limiter toggle/dependency override setup in a try/finally: set app.dependency_overrides and limiter.enabled = False before entering the TestClient, yield test_client inside the try block, and in the finally block restore limiter.enabled = True and call app.dependency_overrides.clear(); update the client fixture (the function and its inner overrides like _override_get_db) to use this try/finally around TestClient(app) so both limiter and dependency overrides are always reset.
docs/testing.md (1)
5-9: Minor style improvement: vary sentence structure in prerequisites.
Static analysis flagged repetitive sentence beginnings. Consider varying the structure for better readability.
📝 Suggested rewording
 ## Prerequisites
-1. Ensure the FluentMeet backend is running (`uvicorn app.main:app --reload`).
-2. Ensure Kafka and Redis are running locally.
-3. Ensure the Kafka Consumers (STT, translation, TTS) are running in the background.
+1. Start the FluentMeet backend (`uvicorn app.main:app --reload`).
+2. Have Kafka and Redis running locally.
+3. Verify that Kafka consumers (STT, translation, TTS) are running in the background.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@docs/testing.md` around lines 5 - 9, The "## Prerequisites" list repeats the same sentence structure; update the three bullets under that heading to vary their openings for readability (e.g., start one with an imperative like "Start the FluentMeet backend..." referencing the heading "## Prerequisites", another with "Run Kafka and Redis locally" and the third with "Ensure Kafka Consumers (STT, translation, TTS) are running in the background"); keep the meaning intact but rephrase each item to avoid repetitive beginnings.
app/kafka/topics.py (1)
22-30: Consider using a tuple for true immutability.
`Final` only prevents reassignment of the variable, not mutation of the list contents. Using a tuple ensures the collection itself is immutable.
♻️ Suggested change
 # All standard topics that should be auto-created on startup
-TOPICS_TO_CREATE: Final = [
+TOPICS_TO_CREATE: Final = (
     NOTIFICATIONS_EMAIL,
     MEDIA_UPLOAD,
     MEDIA_PROCESS_RECORDING,
     AUDIO_RAW,
     AUDIO_SYNTHESIZED,
     TEXT_ORIGINAL,
     TEXT_TRANSLATED,
-]
+)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/kafka/topics.py` around lines 22 - 30, TOPICS_TO_CREATE is declared as a Final list which still allows mutation of its contents; change it to an immutable tuple by replacing the list literal with a tuple literal (e.g., (...) instead of [...]) for true immutability. Update the type annotation if desired to use typing.Tuple (e.g., Tuple[str, ...]) or leave Final with the tuple literal; ensure the constant name TOPICS_TO_CREATE and the referenced topic constants (NOTIFICATIONS_EMAIL, MEDIA_UPLOAD, etc.) remain in the tuple.
app/auth/constants.py (1)
1-15: LGTM!
Clean enum definitions using `StrEnum` with correct ISO 639-1 language codes. Consider adding a module-level docstring for consistency with other modules in the project.
📝 Optional: Add module docstring
+"""Constants for the auth feature package."""
+
 import enum
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/auth/constants.py` around lines 1 - 15, Add a short module-level docstring at the top of the file describing the purpose of the constants module (e.g., exports of UserRole and SupportedLanguage enums) so it matches other modules' documentation style; place the docstring before the imports and mention the UserRole and SupportedLanguage enums (and that they use enum.StrEnum) so readers immediately understand the module's intent.
app/core/config.py (1)
95-98: Request-scoped values in application config may expose secrets.
`ROOM_CODE`, `ACCESS_TOKEN`, and `SYSTEM_PATH` appear to be test/debug conveniences (used by `scripts/test_audio_client.py`). Having `ACCESS_TOKEN` as an environment variable risks:
- Accidental logging of settings
- Token persistence in shell history or `.env` files

Consider moving these to a separate test-only config or documenting that they're for local development only.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/core/config.py` around lines 95 - 98, ROOM_CODE, ACCESS_TOKEN, and SYSTEM_PATH are request/test-scoped values that shouldn't live in the main application config; remove them from the core config (symbols: ROOM_CODE, ACCESS_TOKEN, SYSTEM_PATH) and move them into a separate test-only configuration module or a development-only branch (e.g., test_config or dev_config) that is only imported when DEBUG/TESTING is enabled, or load them conditionally from environment only if an explicit DEV/Test flag is set; update any references in test scripts (e.g., scripts/test_audio_client.py) to import the new test config and add documentation/comments that these values are for local development only and must never be set in production envs.
app/external_services/deepgram/service.py (2)
57-65: Consider reusing `AsyncClient` for connection pooling.
Creating a new `AsyncClient` per request loses HTTP/2 connection pooling benefits. For a high-throughput pipeline, this adds latency overhead.
Proposed fix — lazily initialize a shared client
 class DeepgramSTTService:
     """Stateless service for converting audio bytes to text via Deepgram."""

     def __init__(self, timeout: float = 10.0) -> None:
         self._timeout = timeout
+        self._client: httpx.AsyncClient | None = None
+
+    async def _get_client(self) -> httpx.AsyncClient:
+        if self._client is None:
+            self._client = httpx.AsyncClient(timeout=self._timeout)
+        return self._client
+
+    async def close(self) -> None:
+        if self._client:
+            await self._client.aclose()
+            self._client = None

     async def transcribe(
         self,
         ...
     ) -> dict:
         ...
-        async with httpx.AsyncClient(timeout=self._timeout) as client:
-            response = await client.post(
-                settings.DEEPGRAM_API_URL,
-                headers=headers,
-                params=params,
-                content=audio_bytes,
-            )
-            response.raise_for_status()
+        client = await self._get_client()
+        response = await client.post(
+            settings.DEEPGRAM_API_URL,
+            headers=headers,
+            params=params,
+            content=audio_bytes,
+        )
+        response.raise_for_status()

Remember to call `close()` during application shutdown.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/external_services/deepgram/service.py` around lines 57 - 65, The code creates a new httpx.AsyncClient per request (using "async with httpx.AsyncClient(timeout=self._timeout) as client") which loses connection pooling; change the service to lazily initialize and reuse a single AsyncClient instance (e.g., self._client) configured with the same timeout (and http2 if desired) and replace the context-manager POST call with self._client.post(...), keeping response.raise_for_status() as-is; also add a shutdown/cleanup path that awaits self._client.aclose() (or close()) to release connections during application shutdown so the client isn't leaked.
25-65: Missing retry/backoff for transient failures.
Per the linked issue #8, workers should "implement retries/backoff and logging for transient errors." Consider wrapping the HTTP call with exponential backoff for 5xx or timeout errors.
Verify each finding against the current code and only fix it if needed. In `@app/external_services/deepgram/service.py` around lines 25 - 65, The transcribe method currently makes a single HTTP call to Deepgram (using httpx.AsyncClient with self._timeout and settings.DEEPGRAM_API_URL) and lacks retry/backoff for transient failures; update transcribe to wrap the POST (the client.post + response.raise_for_status) in an exponential backoff retry loop that retries on network timeouts (httpx.TimeoutException), transient 5xx responses, and connection errors, logging each attempt and backoff via the module logger, using an increasing async sleep delay (e.g., base_delay * 2**attempt) up to a max attempts limit, and re-raising the last exception if retries are exhausted. Ensure headers come from get_deepgram_headers() as before and preserve returned dict on success.
scripts/test_audio_client.py (1)
93-97: Move `traceback` import to the top of the file. Importing inside an exception handler works but is non-idiomatic. Top-level imports improve readability and avoid repeated import overhead if the exception occurs multiple times.
Proposed fix
```diff
 import asyncio
 import base64
 from pathlib import Path
+import traceback

 import websockets
```

Then remove lines 95-96:

```diff
 except Exception as e:
     print(f"Connection error: {type(e).__name__}: {e}")
-    import traceback
-    traceback.print_exc()
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@scripts/test_audio_client.py` around lines 93 - 97, Move the traceback import out of the exception handler and place it with the top-level imports; specifically, add "import traceback" to the module-level imports and remove the inline "import traceback" inside the "except Exception as e:" block so the except only prints the error and calls traceback.print_exc() without importing.

app/meeting/service.py (1)
284-292: Consider extracting duplicated language resolution logic. The same priority logic (explicit `listening_language` > `user.listening_language` > `"en"`) appears in both `_check_lobby_required` (lines 284-290) and `_finalize_join` (lines 324-330). Consider extracting to a small helper to reduce duplication.

♻️ Suggested helper extraction
```python
def _resolve_listening_language(
    explicit: str | None, user: User | None
) -> str:
    """Return final language: explicit > user profile > default 'en'."""
    if explicit:
        return explicit
    if user and user.listening_language:
        return user.listening_language
    return "en"
```

Then use in both methods:

```diff
-        if listening_language:
-            final_lang = listening_language
-        elif user and user.listening_language:
-            final_lang = user.listening_language
-        else:
-            final_lang = "en"
+        final_lang = _resolve_listening_language(listening_language, user)
```

Also applies to: 324-332
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/meeting/service.py` around lines 284 - 292, There is duplicated language-resolution logic in _check_lobby_required and _finalize_join; extract a helper (e.g., _resolve_listening_language(explicit: str | None, user: User | None) -> str) that returns explicit > user.listening_language > "en", then replace the inline blocks in both _check_lobby_required and _finalize_join to call _resolve_listening_language(listening_language, user) and use its return value when calling self.state.add_to_lobby and any other places computing final_lang.

app/user/schemas.py (1)
41-41: Use the role enum in the response schema instead of raw `str`. Typing this field as the shared `UserRole` enum keeps API contracts tighter and prevents drift in accepted/returned role values.

♻️ Suggested change

```diff
-from app.auth.schemas import SupportedLanguage
+from app.auth.constants import UserRole
+from app.auth.schemas import SupportedLanguage
@@
-    user_role: str
+    user_role: UserRole
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/user/schemas.py` at line 41, Change the user_role field to use the shared UserRole enum type instead of raw str: update the schema's field declaration (user_role) to type UserRole, add/import the UserRole enum into this module if not already imported, and ensure any JSON serialization/deserialization (e.g., Pydantic model or schema class) still works with enums (Pydantic will serialize enums to their value by default); adjust any tests or usages that expect a plain string to accept the enum type or its value.

app/kafka/manager.py (1)
78-86: Consider making partition and replication settings configurable. Hardcoded `num_partitions=1` and `replication_factor=1` limit horizontal scaling and fault tolerance. For production, audio pipeline topics may need more partitions for parallelism, and `replication_factor > 1` for durability.

♻️ Proposed approach

```diff
+from app.core.config import settings
+
 for topic in TOPICS_TO_CREATE:
     new_topics.append(
-        NewTopic(name=topic, num_partitions=1, replication_factor=1)
+        NewTopic(
+            name=topic,
+            num_partitions=settings.KAFKA_DEFAULT_PARTITIONS,
+            replication_factor=settings.KAFKA_REPLICATION_FACTOR,
+        )
     )
     new_topics.append(
         NewTopic(
-            name=f"dlq.{topic}", num_partitions=1, replication_factor=1
+            name=f"dlq.{topic}",
+            num_partitions=1,  # DLQ can stay at 1 partition
+            replication_factor=settings.KAFKA_REPLICATION_FACTOR,
         )
     )
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/kafka/manager.py` around lines 78 - 86, The topic creation loop hardcodes num_partitions=1 and replication_factor=1 in NewTopic calls (for topics from TOPICS_TO_CREATE and their dlq counterparts), which prevents scaling and durability; make partition and replication settings configurable by reading values (e.g., KAFKA_NUM_PARTITIONS and KAFKA_REPLICATION_FACTOR or config values) and use those variables when constructing NewTopic(name=..., num_partitions=num_partitions, replication_factor=replication_factor), with sensible defaults if env/config is missing and validation to ensure replication_factor does not exceed the cluster's broker count.

tests/meeting/test_ws_router.py (1)
15-19: Dependency override signature doesn't match original function. The override `lambda: "user1"` doesn't accept parameters, while `authenticate_ws(token, db)` does. FastAPI handles this but it's unconventional. Consider matching the signature for clarity.

♻️ Proposed fix

```diff
 @pytest.fixture(autouse=True)
 def override_auth():
-    app.dependency_overrides[authenticate_ws] = lambda: "user1"
+    app.dependency_overrides[authenticate_ws] = lambda token=None, db=None: "user1"
     yield
     app.dependency_overrides = {}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/meeting/test_ws_router.py` around lines 15 - 19, The test fixture override_auth registers a dependency override for authenticate_ws but the override uses a zero-arg lambda while authenticate_ws expects (token, db); change the override to match the original signature (e.g., a callable accepting token and db and returning "user1") so FastAPI dependency resolution and test readability are correct, keeping the same use of app.dependency_overrides and the existing teardown that clears the overrides.

app/meeting/ws_dependencies.py (2)
40-48: Consider explicit validation when the `type` claim is missing. Defaulting `token_type` to `"access"` when the claim is absent (line 41) could allow malformed tokens to be treated as access tokens. An explicit requirement for the `type` claim would be more secure.

🔒 Proposed stricter validation

```diff
     raw_sub = payload.get("sub")
-    token_type = payload.get("type", "access")
+    token_type = payload.get("type")
     if (
         not raw_sub
         or not isinstance(raw_sub, str)
-        or token_type not in ("access", "guest")
+        or token_type not in {"access", "guest"}
     ):
         raise error_exc
```

This ensures tokens without an explicit `type` claim are rejected rather than defaulting to elevated access permissions.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/meeting/ws_dependencies.py` around lines 40 - 48, The code currently defaults token_type = payload.get("type", "access"), which treats missing type claims as "access"; change validation to require an explicit "type" claim by fetching token_type = payload.get("type") and failing if token_type is None or not one of ("access", "guest") (alongside the existing checks for raw_sub and its type), i.e., update the conditional that raises error_exc to include token_type is None or token_type not in ("access","guest") so tokens without an explicit type are rejected; use the existing variables payload, raw_sub, token_type and error_exc to locate and modify the logic.
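The stricter check can be exercised in isolation; a minimal sketch (the `validate_ws_claims` name is hypothetical, the logic mirrors the proposed validation):

```python
def validate_ws_claims(payload: dict) -> str:
    """Reject tokens that lack an explicit 'type' claim (sketch; name hypothetical)."""
    raw_sub = payload.get("sub")
    token_type = payload.get("type")  # no default: a missing claim is rejected
    if not raw_sub or not isinstance(raw_sub, str) or token_type not in {"access", "guest"}:
        raise ValueError("invalid websocket token")
    return raw_sub
```

Because `payload.get("type")` has no fallback, a token minted without a `type` claim fails closed instead of being promoted to an access token.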
72-73: Consider reusing a singleton `MeetingStateService` instance similar to `get_kafka_manager()`. The current implementation creates a new `MeetingStateService()` instance on every call to `assert_room_participant()`. Although `_get_redis_client()` already uses a singleton pattern (so no new Redis connections are created), the unnecessary instantiation of service objects can be avoided. Consider implementing a module-level singleton or using a dependency injection pattern, similar to how `get_kafka_manager()` is used elsewhere.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/meeting/ws_dependencies.py` around lines 72 - 73, assert_room_participant currently instantiates MeetingStateService() on every call; change it to reuse a singleton similar to get_kafka_manager() by creating a module-level instance (e.g., _meeting_state_service = MeetingStateService()) and reference that in assert_room_participant instead of constructing a new one; ensure the singleton is initialized lazily or at import and that any helper functions that call _get_redis_client() continue to work with the reused MeetingStateService instance to avoid extra object allocations.

app/services/tts_worker.py (1)
107-115: Log a warning when falling back to the default provider. If `ACTIVE_TTS_PROVIDER` is misconfigured (e.g., typo like `"openaii"`), it silently falls back to OpenAI. A warning log would help catch configuration errors.

📝 Proposed improvement

```diff
     provider = settings.ACTIVE_TTS_PROVIDER.lower()
     if provider == "voiceai":
         return await get_voiceai_tts_service().synthesize(
             text, language=language, encoding=encoding
         )
-    # Default: OpenAI
+    if provider != "openai":
+        logger.warning(
+            "Unknown TTS provider '%s', defaulting to OpenAI",
+            settings.ACTIVE_TTS_PROVIDER,
+        )
     return await get_openai_tts_service().synthesize(text, encoding=encoding)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/services/tts_worker.py` around lines 107 - 115, The code silently falls back to OpenAI when settings.ACTIVE_TTS_PROVIDER is unrecognized; update the provider selection in tts_worker.py to log a warning before falling back: after computing provider = settings.ACTIVE_TTS_PROVIDER.lower(), detect when provider is neither "voiceai" nor "openai" and call the module logger (or process logger used elsewhere) to warn that the configured provider is invalid and that get_openai_tts_service().synthesize will be used as a fallback; keep existing branches unchanged and ensure the warning includes the raw settings.ACTIVE_TTS_PROVIDER value for debugging.

app/external_services/openai_tts/service.py (1)
60-67: Reuse `httpx.AsyncClient` for connection pooling. Creating a new `AsyncClient` per request loses connection pooling benefits and adds overhead. Store the client as an instance attribute and reuse it across calls.

♻️ Proposed refactor for connection reuse

```diff
 class OpenAITTSService:
     """Stateless service for converting text to speech via OpenAI."""

     def __init__(self, timeout: float = 15.0) -> None:
         self._timeout = timeout
+        self._client: httpx.AsyncClient | None = None
+
+    async def _get_client(self) -> httpx.AsyncClient:
+        if self._client is None or self._client.is_closed:
+            self._client = httpx.AsyncClient(timeout=self._timeout)
+        return self._client

     async def synthesize(
         self,
         text: str,
         *,
         voice: str | None = None,
         encoding: str = "linear16",
     ) -> dict:
         # ... validation code ...
         start = time.monotonic()
-        async with httpx.AsyncClient(timeout=self._timeout) as client:
-            response = await client.post(
+        client = await self._get_client()
+        response = await client.post(
             settings.OPENAI_TTS_API_URL,
             headers=headers,
             json=payload,
         )
-            response.raise_for_status()
+        response.raise_for_status()
```

Note: You'll also need to add a cleanup method to close the client on shutdown.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/external_services/openai_tts/service.py` around lines 60 - 67, The code currently creates a new httpx.AsyncClient in the request block which prevents connection pooling; modify the service class to create and store a single AsyncClient instance as an attribute (e.g., self._client) during initialization (constructor) and replace the local async with httpx.AsyncClient(...) usage in the method that posts to settings.OPENAI_TTS_API_URL to reuse self._client.post(...), ensuring you still pass timeout via the client's config or client.request call; also add an async cleanup method (e.g., async def close(self): await self._client.aclose()) and invoke it on application shutdown to properly close connections.

tests/test_kafka/test_pipeline.py (1)
117-117: Direct assignment to private attribute `_state` for testing. Assigning directly to `worker._state` works but couples the test to internal implementation details. Consider adding a constructor parameter or a `set_state` method for testability if refactoring later.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/test_kafka/test_pipeline.py` at line 117, The test directly assigns to the private attribute worker._state which couples tests to internals; modify the Worker class (e.g., its __init__ or add a public set_state(self, state) method) to accept an initial or injectable state and update the test to use the new constructor parameter or set_state API instead of assigning worker._state directly so tests rely on the public API (refer to the Worker class and the test line that currently uses worker._state).

app/external_services/voiceai/service.py (1)
80-87: Consider reusing `httpx.AsyncClient` instead of creating one per request. Creating a new `AsyncClient` for each `synthesize()` call incurs connection setup overhead. Since the service is used as a singleton, consider initializing the client once and reusing it, or using a connection pool.

Proposed refactor

```diff
 class VoiceAITTSService:
     """Stateless service for converting text to speech via Voice.ai."""

     def __init__(self, timeout: float = 60.0) -> None:
         self._timeout = timeout
+        self._client = httpx.AsyncClient(timeout=self._timeout)
+
+    async def close(self) -> None:
+        await self._client.aclose()

     async def synthesize(
         ...
-        async with httpx.AsyncClient(timeout=self._timeout) as client:
-            response = await client.post(
+        response = await self._client.post(
             settings.VOICEAI_TTS_API_URL,
             headers=headers,
             json=payload,
         )
-            response.raise_for_status()
+        response.raise_for_status()
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/external_services/voiceai/service.py` around lines 80 - 87, The current synthesize() creates a new httpx.AsyncClient per request which wastes connections; refactor the VoiceAI service to create a single AsyncClient instance on initialization (e.g., in the VoiceAIService __init__ or a class-level attribute) and reuse it in synthesize() instead of using "async with httpx.AsyncClient(...)"; ensure the client is configured with the same timeout/headers, remove per-call context manager, and add a shutdown/close method (or integrate with app lifecycle) to call client.aclose() so connections are properly cleaned up.

tests/meeting/test_meeting_router.py (2)
264-265: Direct attribute patching bypasses dependency injection. Directly assigning to `app_main_module.get_kafka_manager` works but doesn't use FastAPI's `dependency_overrides` pattern used elsewhere in this fixture. Consider consistency with other overrides if `get_kafka_manager` is a FastAPI dependency.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/meeting/test_meeting_router.py` around lines 264 - 265, The test currently assigns directly to app_main_module.get_kafka_manager (mock_kafka = AsyncMock(); app_main_module.get_kafka_manager = lambda: mock_kafka), bypassing FastAPI's dependency injection; change the fixture to register the mock via the FastAPI app's dependency_overrides (e.g., set app.dependency_overrides[get_kafka_manager] = lambda: mock_kafka) and ensure the override is cleared after the test so the AsyncMock named mock_kafka is used through the same dependency injection mechanism as other fixtures.
30-30: Module `app.main` is imported twice with different styles. The module is imported via `from app.main import app` at line 30 and then again via `import app.main as app_main_module` at line 262. Consider using a consistent import style.

Proposed fix

```diff
-from app.main import app
+import app.main as app_main_module
 ...
+app = app_main_module.app
 ...
-    import app.main as app_main_module
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/meeting/test_meeting_router.py` at line 30, The test imports the same module in two styles (from app.main import app and import app.main as app_main_module), causing inconsistency; pick one import style and consolidate all references accordingly (either use the symbol app from "from app.main import app" everywhere or use the module alias app_main_module and access app_main_module.app everywhere), update any uses at both the top (the current import at line 30) and later references (where app_main_module is used) to the chosen form, and remove the redundant import so the file only imports app once.

app/meeting/ws_router.py (1)
145-152: Partition assignment may remain empty after 1-second sleep. If no partitions are assigned after the initial check, sleeping 1 second and checking again is fragile. Consider using a proper wait mechanism or handling the case where partitions remain empty.
Proposed improvement
```diff
 # Force partition assignment by seeking to end
 partitions = consumer.assignment()
-if not partitions:
-    # Wait briefly for automatic assignment
-    await asyncio.sleep(1)
-    partitions = consumer.assignment()
+max_retries = 5
+for _ in range(max_retries):
+    if partitions:
+        break
+    await asyncio.sleep(0.5)
+    partitions = consumer.assignment()
+if not partitions:
+    logger.warning("No partitions assigned for egress consumer")
 for tp in partitions:
     await consumer.seek_to_end(tp)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/meeting/ws_router.py` around lines 145 - 152, The current logic that checks consumer.assignment(), sleeps 1 second, and rechecks is fragile because partitions can still be empty; replace this with a robust wait-and-retry that polls consumer.assignment() in a short loop with a total timeout (e.g., retry every 0.1s up to N seconds) or use the consumer's assignment callback to detect when partitions are assigned; if after the timeout partitions is still empty, handle the error path (log and exit or continue gracefully) instead of proceeding to seek_to_end. Specifically update the block using partitions = consumer.assignment(), the await asyncio.sleep(1) retry, and the subsequent for tp in partitions: await consumer.seek_to_end(tp) to implement the loop-with-timeout or on_assign approach and add a clear fallback when partitions remain empty.

app/external_services/deepl/service.py (1)
83-90: Consider reusing `httpx.AsyncClient` for both services. Same recommendation as the VoiceAI service — creating a new client per request adds overhead. This applies to both `DeepLTranslationService` and `OpenAITranslationFallback`.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/external_services/deepl/service.py` around lines 83 - 90, The DeepLTranslationService creates a new httpx.AsyncClient per request which adds overhead; instead instantiate and reuse a single AsyncClient on the service (e.g., add self._client = httpx.AsyncClient(timeout=self._timeout) in the service constructor/async initializer) and replace the per-request "async with httpx.AsyncClient(...)" usage (where response = await client.post(...)) with calls to "await self._client.post(...)" (and do the same change in OpenAITranslationFallback), and add a proper shutdown/cleanup method (e.g., async def aclose or close()) that calls self._client.aclose() to release resources.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 1d1c49ad-e5d6-4a3c-a14c-7a4900c53483
📒 Files selected for processing (54)
- `alembic/versions/4e7d4d5e7661_add_user_role_column.py`
- `app/auth/constants.py`
- `app/auth/models.py`
- `app/auth/schemas.py`
- `app/auth/token_store.py`
- `app/auth/utils.py`
- `app/core/config.py`
- `app/core/init_admin.py`
- `app/external_services/deepgram/__init__.py`
- `app/external_services/deepgram/config.py`
- `app/external_services/deepgram/service.py`
- `app/external_services/deepl/__init__.py`
- `app/external_services/deepl/config.py`
- `app/external_services/deepl/service.py`
- `app/external_services/openai_tts/__init__.py`
- `app/external_services/openai_tts/config.py`
- `app/external_services/openai_tts/service.py`
- `app/external_services/voiceai/__init__.py`
- `app/external_services/voiceai/config.py`
- `app/external_services/voiceai/service.py`
- `app/kafka/manager.py`
- `app/kafka/topics.py`
- `app/main.py`
- `app/meeting/constants.py`
- `app/meeting/repository.py`
- `app/meeting/router.py`
- `app/meeting/schemas.py`
- `app/meeting/service.py`
- `app/meeting/state.py`
- `app/meeting/ws_dependencies.py`
- `app/meeting/ws_router.py`
- `app/routers/api.py`
- `app/schemas/__init__.py`
- `app/schemas/pipeline.py`
- `app/services/audio_bridge.py`
- `app/services/connection_manager.py`
- `app/services/stt_worker.py`
- `app/services/translation_worker.py`
- `app/services/tts_worker.py`
- `app/user/schemas.py`
- `docs/testing.md`
- `pyproject.toml`
- `requirements.txt`
- `scripts/introduction.raw`
- `scripts/output.raw`
- `scripts/test_audio_client.py`
- `scripts/voiceai_output.raw`
- `tests/meeting/test_meeting_router.py`
- `tests/meeting/test_meeting_service.py`
- `tests/meeting/test_ws.py`
- `tests/meeting/test_ws_router.py`
- `tests/test_auth/test_auth_refresh.py`
- `tests/test_auth/test_schemas_user.py`
- `tests/test_kafka/test_pipeline.py`
```python
    user_role: Mapped[str] = mapped_column(
        String(50),
        default=UserRole.USER.value,
        server_default=UserRole.USER.value,
        index=True,
    )
```
🧩 Analysis chain
🏁 Script executed:
```shell
#!/bin/bash
# Verify whether role constraints already exist in model + migration.
set -euo pipefail
echo "== Model definition =="
rg -n -C3 "user_role|CheckConstraint|Enum\\(" app/auth/models.py
echo
echo "== Migration(s) touching user_role =="
fd -i "add_user_role_column" alembic/versions || true
rg -n -C3 "user_role|CHECK|Enum|create_check_constraint|sa\\.Enum" alembic/versions
```

Repository: Brints/FluentMeet
Length of output: 2185
🏁 Script executed:
```shell
#!/bin/bash
set -euo pipefail
echo "== UserRole enum definition =="
fd "\.py$" app/auth | xargs rg -l "UserRole" | head -5
echo
echo "== Searching for UserRole enum =="
rg -n "class UserRole" -A 10 app/
echo
echo "== Checking for validators in User model =="
rg -n "@validator|@field_validator|def __init__|def validate" app/auth/models.py -A 3
echo
echo "== Full User model to check for constraints =="
head -60 app/auth/models.py
```

Repository: Brints/FluentMeet
Length of output: 683
🏁 Script executed:
```shell
#!/bin/bash
set -euo pipefail
echo "== Searching for schema validation (Pydantic/Marshmallow) =="
find app/auth -name "*.py" -exec grep -l "schema\|Schema\|Pydantic\|BaseModel" {} \;
echo
echo "== Checking app/auth directory structure =="
ls -la app/auth/
echo
echo "== Searching for user_role in routes/services =="
rg -n "user_role" app/ --type py -B 2 -A 2 | head -50
echo
echo "== Check if User model has __table_args__ =="
rg -n "__table_args__" app/auth/models.py -A 5
```

Repository: Brints/FluentMeet
Length of output: 2485
🏁 Script executed:
```shell
#!/bin/bash
set -euo pipefail
echo "== Full User model =="
cat app/auth/models.py
echo
echo "== Service layer user_role operations =="
rg -n "user_role\s*=" app/auth/service.py -B 3 -A 3
```

Repository: Brints/FluentMeet
Length of output: 2982
Constrain user_role values at the database layer.
user_role is currently a free-form String(50) with no database constraints. As an authorization-critical field, this should be constrained (CHECK constraint or enum type) to prevent invalid persisted roles. Currently, the field:
- Uses plain `String(50)` in SQLAlchemy (not Enum type)
- Has no `CheckConstraint` or database-level validation in the migration
- Has no validation in Pydantic schemas (`user_role: str` without constraints)
- Has no model-level validation (no setter, property, or validator)
This allows arbitrary strings to be written to the database, bypassing the UserRole enum definition that only exists in application code.
🔒 Suggested direction (DB constraint)
```diff
-from sqlalchemy import Boolean, DateTime, ForeignKey, String
+from sqlalchemy import Boolean, CheckConstraint, DateTime, ForeignKey, String
@@
 class User(Base):
     __tablename__ = "users"
+    __table_args__ = (
+        CheckConstraint(
+            "user_role IN ('user', 'admin')",
+            name="ck_users_user_role",
+        ),
+    )
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@app/auth/models.py` around lines 46 - 51, The user_role column currently uses
a free-form String(50) (user_role in the User model) and lacks DB-level
constraints and schema/model validation; change it to a constrained type by
switching to a SQLAlchemy Enum(UserRole) or adding a CheckConstraint that only
allows values from UserRole, update the Alembic migration to ALTER the column to
the enum type or add the CHECK clause and include a downgrade, update Pydantic
schemas to type user_role: UserRole (or a constrained str/Enum) and add a
model-level validator (e.g., `@validates`('user_role') or a property setter) in
the User model to enforce and coerce values before persistence so application
and DB constraints stay in sync.
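The model-level half of that prompt can be sketched as a plain validation function; in the real codebase this body would sit inside a SQLAlchemy `@validates('user_role')` hook, and the `UserRole` values below assume the roles named in the proposed CHECK constraint:

```python
from enum import Enum


class UserRole(str, Enum):
    USER = "user"
    ADMIN = "admin"


def validate_user_role(value: str) -> str:
    """Reject role strings outside the enum before they reach the database."""
    allowed = {role.value for role in UserRole}
    if value not in allowed:
        raise ValueError(f"invalid user_role: {value!r}")
    return value
```

Pairing this with the DB-level CHECK keeps application and schema constraints in sync: the validator catches bad writes early, the constraint catches anything that bypasses the ORM.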
```python
    try:
        with SessionLocal() as db_session:
            init_admin(db_session)
    except Exception as exc:
        logger.warning("Admin initialization failed: %s", sanitize_for_log(exc))
```
Don’t silently continue when admin bootstrap fails in non-test environments.
Continuing startup after admin-init failure can leave deployments without required admin access and hide real boot issues. Prefer fail-fast outside explicitly allowed environments (e.g., CI/test), and log with traceback.
✅ Suggested policy change
```diff
 # Initialize Admin
 try:
     with SessionLocal() as db_session:
         init_admin(db_session)
 except Exception as exc:
-    logger.warning("Admin initialization failed: %s", sanitize_for_log(exc))
+    logger.exception("Admin initialization failed: %s", sanitize_for_log(exc))
+    # In CI/test you may choose to continue; otherwise fail fast.
+    raise
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
try:
    with SessionLocal() as db_session:
        init_admin(db_session)
except Exception as exc:
    logger.exception("Admin initialization failed: %s", sanitize_for_log(exc))
    # In CI/test you may choose to continue; otherwise fail fast.
    raise
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@app/main.py` around lines 35 - 39, The current try/except around
SessionLocal()/init_admin swallows failures; change it to fail-fast outside
test/CI by detecting environment (e.g., use an existing env var check or add
is_test_or_ci()) and if not test/CI re-raise the exception or call sys.exit(1)
after logging the full traceback; always log the error with traceback (use
logger.exception or include traceback.format_exc()) and reference SessionLocal,
init_admin, logger, and sanitize_for_log when updating the error handling so
non-test deployments do not continue silently after admin bootstrap failure.
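The environment-gated policy can be sketched as a small helper; the `environment` parameter and `handle_bootstrap_failure` name are hypothetical stand-ins for however the app distinguishes CI/test from production:

```python
import logging

logger = logging.getLogger(__name__)


def handle_bootstrap_failure(exc: Exception, *, environment: str) -> bool:
    """Log the failure with traceback context; continue only in test/CI."""
    logger.error("Admin initialization failed: %s", exc, exc_info=exc)
    if environment in {"test", "ci"}:
        return True  # explicitly allowed to continue without an admin
    raise exc
```

Calling this from the startup `except` block makes the fail-fast default explicit while keeping test runs green.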
```python
            case(
                (
                    Room.ended_at.isnot(None),
                    func.round(
                        (
                            func.julianday(Room.ended_at)
                            - func.julianday(Room.created_at)
                        )
                        * 1440
                    ),
                ),
                else_=None,
            ).label("duration_minutes"),
```
🧩 Analysis chain
🏁 Script executed:
```shell
#!/bin/bash
# Verify the database driver and check for other julianday usages
echo "=== Checking database configuration ==="
rg -n "psycopg|postgres|sqlite" --type py -C2
echo ""
echo "=== Checking for julianday usage across codebase ==="
rg -n "julianday" --type py
```

Repository: Brints/FluentMeet
Length of output: 5277
func.julianday() is SQLite-specific and will fail on PostgreSQL.
The julianday() function does not exist in PostgreSQL. The project uses PostgreSQL in production (configured in app/core/config.py with POSTGRES_SERVER, POSTGRES_USER, POSTGRES_PASSWORD, and POSTGRES_DB), so this query will raise a runtime error on the production database.
Proposed fix for PostgreSQL compatibility
```diff
             case(
                 (
                     Room.ended_at.isnot(None),
                     func.round(
-                        (
-                            func.julianday(Room.ended_at)
-                            - func.julianday(Room.created_at)
-                        )
-                        * 1440
+                        func.extract(
+                            "epoch", Room.ended_at - Room.created_at
+                        ) / 60
                     ),
                 ),
                 else_=None,
             ).label("duration_minutes"),
```

📝 Committable suggestion
```python
            case(
                (
                    Room.ended_at.isnot(None),
                    func.round(
                        func.extract(
                            "epoch", Room.ended_at - Room.created_at
                        ) / 60
                    ),
                ),
                else_=None,
            ).label("duration_minutes"),
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@app/meeting/repository.py` around lines 96 - 108, The current use of
func.julianday(...) to compute duration_minutes is SQLite-specific and will fail
on PostgreSQL; replace that expression with a database-agnostic epoch-based
calculation using SQLAlchemy's func.extract('epoch', Room.ended_at -
Room.created_at) divided by 60 and wrapped with func.round to get minutes,
keeping the same CASE when Room.ended_at.isnot(None) and the
.label("duration_minutes") so the rest of the query remains unchanged.
```python
        print(f"Egress: about to send {len(audio_bytes)} bytes to client")

        # Also save to disk for testing/validation
        output_path = Path(rf"{settings.SYSTEM_PATH}\voiceai_output.raw")
```
Hardcoded Windows-style path will fail on non-Windows systems.
Using rf"{settings.SYSTEM_PATH}\voiceai_output.raw" with backslash is Windows-specific and will create incorrect paths on Linux/Mac. Use Path operations for cross-platform compatibility.
Additionally, writing to disk appears to be debug/testing code that should not be in production.
Proposed fix
```diff
-        # Also save to disk for testing/validation
-        output_path = Path(rf"{settings.SYSTEM_PATH}\voiceai_output.raw")
-        mode = "ab" if payload.sequence_number > 0 else "wb"
-
-        def _write_audio(
-            _path: Path = output_path,
-            _mode: str = mode,
-            _data: bytes = audio_bytes,
-        ) -> None:
-            with _path.open(_mode) as f:
-                f.write(_data)
-
-        await asyncio.to_thread(_write_audio)
-        print(
-            f"Egress: SAVED {len(audio_bytes)} bytes to {output_path} "
-            f"(seq={payload.sequence_number})"
-        )
+        # Debug: Optionally save to disk for testing/validation
+        if settings.DEBUG_SAVE_AUDIO:
+            output_path = Path(settings.SYSTEM_PATH) / "voiceai_output.raw"
+            mode = "ab" if payload.sequence_number > 0 else "wb"
+            await asyncio.to_thread(
+                lambda: output_path.open(mode).write(audio_bytes)
+            )
+            logger.debug("Saved %d bytes to %s", len(audio_bytes), output_path)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@app/meeting/ws_router.py` at line 229, The code uses a hardcoded
Windows-style path when constructing output_path (output_path =
Path(rf"{settings.SYSTEM_PATH}\voiceai_output.raw")) which will break on
non-Windows systems and is also debug/test disk-writing that shouldn't run in
production; change to build the path with pathlib (e.g.,
Path(settings.SYSTEM_PATH) / "voiceai_output.raw") and ensure you call
.resolve() if needed, and remove or gate the actual disk write behind a
debug/feature flag (e.g., settings.DEBUG or a dedicated VOICEAI_DUMP flag) so
ws_router.py only writes the file in test/dev mode.
else:
    stt_service = get_deepgram_stt_service()
    result = await stt_service.transcribe(
        audio_bytes,
        language=payload.source_language,
        sample_rate=payload.sample_rate,
        encoding=payload.encoding.value,
    )
Add error handling for the Deepgram STT call.
The external stt_service.transcribe() call can fail due to network issues, rate limits, or service outages. Per the linked issue #8 acceptance criteria requiring "retries/backoff and logging for transient errors," this call should be wrapped with error handling to prevent worker crashes and enable graceful degradation.
🛡️ Suggested error handling
else:
stt_service = get_deepgram_stt_service()
- result = await stt_service.transcribe(
- audio_bytes,
- language=payload.source_language,
- sample_rate=payload.sample_rate,
- encoding=payload.encoding.value,
- )
+ try:
+ result = await stt_service.transcribe(
+ audio_bytes,
+ language=payload.source_language,
+ sample_rate=payload.sample_rate,
+ encoding=payload.encoding.value,
+ )
+ except Exception as exc:
+ logger.error(
+ "STT failed for seq=%d user=%s: %s",
+ payload.sequence_number,
+ payload.user_id,
+ exc,
+ )
+            return  # or implement retry logic

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@app/services/stt_worker.py` around lines 64 - 71, Wrap the call to
stt_service.transcribe (obtained from get_deepgram_stt_service) in a try/except
with a retry loop implementing exponential backoff (e.g., 3 attempts with
backoff delays) and log each attempt and error using the module's logger (e.g.,
processLogger or the existing logger). Catch transient/network exceptions (and a
general Exception fallback), retry on transient errors, and on final failure
return a graceful result (e.g., None or an empty transcription) or raise a
controlled/transient-specific exception so the worker does not crash; ensure the
surrounding code that expects result handles this graceful outcome. Include
clear log messages indicating attempt number, error details, and final failure.
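The retry-with-backoff pattern the prompt above describes can be sketched generically; this is a minimal helper, not the repo's implementation, and the factory/attempt parameters are assumptions:

```python
import asyncio
import logging
import random

logger = logging.getLogger(__name__)


async def call_with_backoff(coro_factory, attempts: int = 3, base_delay: float = 0.5):
    """Retry an async call with exponential backoff and jitter.

    coro_factory is a zero-arg callable returning a fresh coroutine,
    since a coroutine object cannot be awaited twice.
    """
    for attempt in range(1, attempts + 1):
        try:
            return await coro_factory()
        except Exception as exc:
            if attempt == attempts:
                logger.error("giving up after %d attempts: %s", attempt, exc)
                raise
            delay = base_delay * 2 ** (attempt - 1) + random.uniform(0, base_delay)
            logger.warning(
                "attempt %d failed (%s); retrying in %.2fs", attempt, exc, delay
            )
            await asyncio.sleep(delay)
```

A worker could then wrap the STT call as `await call_with_backoff(lambda: stt_service.transcribe(...))`, keeping the transcribe signature untouched.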
# 1. Call the configured TTS provider
encoding = settings.PIPELINE_AUDIO_ENCODING
audio_result = await self._synthesize(
    text=text,
    language=payload.target_language,
    encoding=encoding,
)
Add error handling for TTS provider failures.
If _synthesize() raises an exception (HTTP error, timeout), the message processing fails without retry or DLQ routing. The PR objectives mention "retries/backoff and logging for transient errors."
🛡️ Proposed error handling
+ try:
audio_result = await self._synthesize(
text=text,
language=payload.target_language,
encoding=encoding,
)
+ except Exception as e:
+ logger.error(
+ "TTS synthesis failed for seq=%d room=%s: %s",
+ payload.sequence_number,
+ payload.room_id,
+ e,
+ )
+ # Re-raise to let BaseConsumer handle DLQ routing
+        raise

Consider whether transient failures (rate limits, timeouts) should trigger retries before DLQ routing.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
Before:

# 1. Call the configured TTS provider
encoding = settings.PIPELINE_AUDIO_ENCODING
audio_result = await self._synthesize(
    text=text,
    language=payload.target_language,
    encoding=encoding,
)

After:

# 1. Call the configured TTS provider
encoding = settings.PIPELINE_AUDIO_ENCODING
try:
    audio_result = await self._synthesize(
        text=text,
        language=payload.target_language,
        encoding=encoding,
    )
except Exception as e:
    logger.error(
        "TTS synthesis failed for seq=%d room=%s: %s",
        payload.sequence_number,
        payload.room_id,
        e,
    )
    # Re-raise to let BaseConsumer handle DLQ routing
    raise
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@app/services/tts_worker.py` around lines 61 - 67, Wrap the call to
self._synthesize(...) in a try/except that distinguishes transient from fatal
errors: catch network/HTTP/timeouts and implement a retry loop with exponential
backoff (configurable attempts and backoff) before failing, and log each attempt
and final failure using the same logger used in this module; on permanent
failures or after all retries, mark the message for DLQ routing or re-raise a
specific exception so the caller can route to DLQ; ensure you reference the
existing symbols _synthesize, encoding (settings.PIPELINE_AUDIO_ENCODING),
audio_result and integrate with the module's retry/metrics/logging conventions.
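The transient-vs-fatal distinction that the prompt above asks for can be sketched as a small policy layer; exception names, status thresholds, and attempt counts here are assumptions for illustration, not the repo's conventions:

```python
import asyncio


class TransientError(Exception):
    """Retryable: timeouts, rate limits, 5xx responses."""


class PermanentError(Exception):
    """Not retryable: bad input, auth failure; route straight to DLQ."""


def classify_http_status(status: int) -> type:
    # 429 and 5xx are usually transient; other 4xx are permanent.
    if status == 429 or status >= 500:
        return TransientError
    return PermanentError


async def synthesize_with_policy(synthesize, *, attempts: int = 3):
    """Retry transient failures with backoff; fail fast on permanent ones."""
    for attempt in range(1, attempts + 1):
        try:
            return await synthesize()
        except TransientError:
            if attempt == attempts:
                raise  # retries exhausted -> let the caller route to DLQ
            await asyncio.sleep(0.01 * 2 ** (attempt - 1))
        except PermanentError:
            raise  # no point retrying
```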
# Same as Black.
-line-length = 88
+line-length = 100
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Verify which formatters/sorters are actually used in CI/hooks and whether line-length configs are aligned.
echo "== pyproject formatter config =="
rg -n '^\[tool\.ruff\]|^line-length|^\[tool\.isort\]|^profile|^line_length' pyproject.toml -n -A2 -B1
echo
echo "== CI/workflow references to ruff/isort/black =="
rg -n --hidden -S 'ruff format|ruff check|isort|black' .github/workflows .pre-commit-config.yaml pyproject.toml 2>/dev/null || true

Repository: Brints/FluentMeet
Length of output: 850
Align Black, Ruff, and isort to consistent line length.
Line 38 sets Ruff to 100 chars, but CI uses Black (88 chars) and isort with profile = "black" (88 chars). This creates formatting drift: Black will format at 88 chars while Ruff expects 100, causing conflicting expectations in CI and editor tooling. Change Ruff to 88 to align with Black's default, or change Black to 100. Recommend 88 for consistency with ecosystem defaults.
The comment at line 37 ("Same as Black") is inaccurate; Black defaults to 88, not 100.
Suggested config alignment (using 88 chars)
[tool.ruff]
-line-length = 100
+line-length = 88

Also ensure isort line_length matches:
[tool.isort]
profile = "black"
skip = ["alembic"]

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
-line-length = 100
+line-length = 88
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pyproject.toml` at line 38, Update the Ruff configuration's line-length from
100 to 88 (the current setting shown as line-length = 100) so it matches Black's
default and avoid formatting drift; also correct the adjacent comment that says
"Same as Black" to reflect Black's actual default of 88, and ensure isort's
line_length (or its profile = "black" usage) is consistent with 88 so all three
tools (Black, Ruff, isort) use the same line length.
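A consolidated sketch of the aligned configuration (the `[tool.black]` section is an assumption — the repo may simply rely on Black's 88-character default):

```toml
[tool.black]
line-length = 88

[tool.ruff]
line-length = 88  # matches Black's default

[tool.isort]
profile = "black"
line_length = 88
skip = ["alembic"]
```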
pytokens==0.4.1
PyYAML==6.0.3
-redis==7.3.0
+redis==7.4.0
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Check PyPI for redis package versions
echo "=== Checking redis version availability on PyPI ==="
curl -s https://pypi.org/pypi/redis/json | jq '.releases | keys | .[-10:]'
echo ""
echo "=== Checking for security advisories ==="
gh api graphql -f query='
{
securityVulnerabilities(first: 5, ecosystem: PIP, package: "redis") {
nodes {
advisory {
summary
severity
publishedAt
}
vulnerableVersionRange
firstPatchedVersion {
identifier
}
}
}
}'

Repository: Brints/FluentMeet
Length of output: 434
🌐 Web query:
redis 7.4.0 security vulnerabilities CVE
💡 Result:
Redis 7.4.0 is affected by these published security issues (CVEs):
- CVE-2024-31449 – Lua bit library stack overflow; may lead to RCE. Fixed in Redis 7.4.1. [1][2]
- CVE-2024-31228 – DoS via unbounded pattern matching (e.g., KEYS, SCAN). Fixed in Redis 7.4.1. [1]
- CVE-2024-31227 – DoS via malformed ACL selectors triggering server panic. Fixed in Redis 7.4.1. [1]
- CVE-2025-49844 – Lua use-after-free; may lead to RCE. Fixed in Redis 7.4.6 (OSS/CE). [3]
Action: If you’re on 7.4.0, upgrade to ≥ 7.4.6 to cover all of the above. [3]
Sources
[1] Redis security advisory (CVE-2024-31449 / -31227 / -31228): https://redis.io/blog/security-advisory-cve-2024-31449-cve-2024-31227-cve-2024-31228/
[2] GitHub advisory GHSA-whxg-wx83-85p5 (CVE-2024-31449): GHSA-whxg-wx83-85p5
[3] Redis security advisory (CVE-2025-49844): https://redis.io/blog/security-advisory-cve-2025-49844/
Upgrade redis to 7.4.6 or later to address critical security vulnerabilities.
The pinned version 7.4.0 contains four published CVEs, including two remote code execution vulnerabilities (CVE-2024-31449, CVE-2025-49844) and two denial-of-service issues (CVE-2024-31228, CVE-2024-31227). These are fixed in redis 7.4.1–7.4.6. Update requirements.txt to use redis==7.4.6 or a more recent stable version.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@requirements.txt` at line 86, The pinned dependency redis==7.4.0 in
requirements.txt is vulnerable; update the requirement to redis==7.4.6 (or a
newer stable 7.4.x+) to include CVE fixes. Edit the line containing
"redis==7.4.0" in requirements.txt and replace it with "redis==7.4.6" (or a
later safe version), then run your dependency install/lock step and run tests/CI
to validate compatibility.
Signed-off-by: aniebietafia <aniebietafia87@gmail.com>
Signed-off-by: aniebietafia <aniebietafia87@gmail.com>
Actionable comments posted: 8
♻️ Duplicate comments (1)
app/auth/models.py (1)
37-43: ⚠️ Potential issue | 🟠 Major

Constrain `user_role` at the DB boundary (still free-form).

Line 38 currently persists `user_role` as `String(50)`/`str`, which still allows arbitrary role values for an authorization-critical field. This can desync role checks (e.g., admin gating) from intended enum semantics.

🔒 Suggested direction

-from sqlalchemy import Boolean, DateTime, ForeignKey, String
+from sqlalchemy import Boolean, DateTime, Enum, ForeignKey, String
@@
-    user_role: Mapped[str] = mapped_column(
-        String(50),
-        default=UserRole.USER.value,
-        server_default=UserRole.USER.value,
-        index=True,
-    )
+    user_role: Mapped[UserRole] = mapped_column(
+        Enum(
+            UserRole,
+            name="user_role_enum",
+            native_enum=False,
+            validate_strings=True,
+        ),
+        default=UserRole.USER,
+        server_default=UserRole.USER.value,
+        nullable=False,
+        index=True,
+    )

Use this read-only check to confirm model + migration + schema alignment:

#!/bin/bash
set -euo pipefail
echo "== Model role definition =="
rg -n -C3 "user_role|Enum\\(|CheckConstraint" app/auth/models.py
echo
echo "== Migration(s) touching user_role =="
fd -i "add_user_role_column" alembic/versions || true
rg -n -C3 "user_role|CHECK|Enum|create_check_constraint|sa\\.Enum" alembic/versions
echo
echo "== API schema typing for role =="
rg -n -C2 "user_role\\s*:\\s*(str|UserRole)" app/auth/schemas.py app/user/schemas.py

Expected result: a DB-level enum/check constraint exists, the migration enforces it, and schema types use `UserRole` (or an equivalent constrained type).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/auth/models.py` around lines 37 - 43, The user_role column is persisted as a plain String(50) which allows arbitrary values; update the model to enforce enum constraints at the DB boundary by changing user_role to use the UserRole enum (e.g., mapped_column(sa.Enum(UserRole), server_default=UserRole.USER.value) or add a CheckConstraint tied to UserRole) and then add a corresponding Alembic migration that creates the ENUM or CHECK constraint for user_role; also ensure request/response schemas reference UserRole (not plain str) so the model, migration, and API schema (user_role, UserRole, mapped_column) stay aligned.
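Independent of the ORM layer, the validation the review asks for reduces to rejecting any value outside the enum before persistence. A framework-agnostic sketch (the role values "user"/"admin" are assumptions — the repo's `UserRole` members may differ):

```python
from enum import Enum


class UserRole(str, Enum):
    USER = "user"
    ADMIN = "admin"


def coerce_role(raw: str) -> UserRole:
    """Reject any value outside the enum before it reaches the DB."""
    try:
        return UserRole(raw)
    except ValueError:
        raise ValueError(f"invalid user_role: {raw!r}") from None
```

With `sa.Enum(UserRole, validate_strings=True)`, SQLAlchemy performs the same rejection at the column level, so the API schema and DB stay in lockstep.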
🧹 Nitpick comments (4)
app/services/connection_manager.py (1)
135-136: Add explanatory comment for empty except clause.

CodeQL flagged the empty `except asyncio.CancelledError: pass` clause. While this is correct behavior (task cancellation is the expected shutdown signal), adding a brief comment improves maintainability.

♻️ Proposed fix

-    except asyncio.CancelledError:
-        pass
+    except asyncio.CancelledError:
+        pass  # Expected during graceful shutdown when room becomes empty

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/services/connection_manager.py` around lines 135 - 136, The empty except asyncio.CancelledError: pass is intentional to allow task cancellation to be treated as normal shutdown; add a brief explanatory comment above that except block (referencing the asyncio.CancelledError handler) explaining that cancellations are expected during shutdown and should be ignored rather than logged or re-raised, so future readers and static analyzers understand this is deliberate (keep the pass but add the comment in connection_manager.py next to the except clause).

app/kafka/manager.py (1)
76-81: Consider making partition count and replication factor configurable.

Hardcoded `num_partitions=1` and `replication_factor=1` are suitable for development but may be insufficient for production:

- A single partition limits parallel consumption throughput, especially for high-volume topics like `audio.raw`.
- A replication factor of 1 means no fault tolerance (data loss if a broker fails).

Consider extracting these to configuration settings or environment variables, aligning with the "partition sizing" open consideration documented in issue #8.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/kafka/manager.py` around lines 76 - 81, The topic creation currently hardcodes num_partitions=1 and replication_factor=1 when building NewTopic objects for items in TOPICS_TO_CREATE (and their dlq.<topic> variants); change this to read partition and replication defaults from configuration or environment (e.g., KAFKA_NUM_PARTITIONS, KAFKA_REPLICATION_FACTOR or equivalent settings) and use those variables when constructing NewTopic so production values can be increased while keeping sensible development defaults; ensure the settings are documented and applied to both the topic and dlq.NewTopic creation logic.

app/meeting/ws_router.py (1)
268-269: Add explanatory comments for empty except clauses.

CodeQL flagged these empty `except` clauses. While they handle expected `WebSocketDisconnect` or task cancellation scenarios, adding comments clarifies the intent.

♻️ Proposed fix

-    except Exception:
-        pass
+    except Exception:
+        pass  # Silently handle task cancellation during shutdown

-    except WebSocketDisconnect:
-        pass
+    except WebSocketDisconnect:
+        pass  # Expected when client disconnects; consumer cleanup in finally block

Also applies to: 328-329

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/meeting/ws_router.py` around lines 268 - 269, The empty "except Exception: pass" blocks shown in the diff should be clarified and tightened: replace the bare silent except with a brief explanatory comment stating why exceptions are intentionally ignored (e.g., expected WebSocketDisconnect or asyncio.CancelledError during shutdown) and, where possible, catch the specific exceptions instead of Exception (for example WebSocketDisconnect and asyncio.CancelledError) or at minimum log/debug the unexpected exception; locate the occurrences of the literal "except Exception: pass" in ws_router.py and add the comments and targeted exception handling around those blocks.

tests/test_auth/test_auth_refresh.py (1)
174-178: Preserve and restore prior limiter state in fixture teardown.

Line 177 unconditionally sets `limiter.enabled = True`; this can leak state when other tests intentionally changed it.

Proposed fixture hardening

-    limiter.enabled = False
-    with TestClient(app) as test_client:
-        yield test_client
-    limiter.enabled = True
-    app.dependency_overrides.clear()
+    previous_limiter_state = limiter.enabled
+    limiter.enabled = False
+    try:
+        with TestClient(app) as test_client:
+            yield test_client
+    finally:
+        limiter.enabled = previous_limiter_state
+        app.dependency_overrides.clear()

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/test_auth/test_auth_refresh.py` around lines 174 - 178, The teardown unconditionally sets limiter.enabled = True which can leak state; modify the fixture to capture the prior value (e.g., prev = limiter.enabled) before changing it, and in the finally/teardown restore limiter.enabled = prev instead of hard-coding True; reference the limiter.enabled usage in the fixture where TestClient(app) is yielded so the original state is preserved and restored after the yield.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@app/auth/models.py`:
- Around line 18-28: Reformat this model file with Black to satisfy CI: run the
Black formatter (e.g., black app/auth/models.py) or apply its suggested changes
so the declarations for id, email, hashed_password, full_name, is_active,
is_verified, created_at, updated_at, and deleted_at (the mapped_column lines and
their arguments) conform to Black's line-wrapping and spacing rules; ensure
function/variable names remain unchanged (e.g., mapped_column,
DateTime(timezone=True), default=utc_now, onupdate=utc_now) and only formatting
is altered.
In `@app/external_services/deepl/service.py`:
- Around line 169-175: The OpenAI response parsing assumes a strict shape and
can raise KeyError/IndexError; update the parsing around response, data,
translated and elapsed_ms to defensively handle missing keys/empty lists by:
parse response.json() into data, verify data is a dict, ensure "choices" exists
and is a non-empty list, and that choices[0] contains "message" with "content"
before accessing; on failure, log or include the raw response/data in the error
path and return a safe fallback (e.g., translated_text as an empty string or a
descriptive error message) while still returning latency_ms, and wrap the
extraction in try/except to avoid uncaught exceptions.
In `@app/meeting/ws_router.py`:
- Line 92: Replace all print() debug statements in app/meeting/ws_router.py
(e.g., the print("Audio WS client connected: %s", user_id) call that references
user_id and the other prints around lines 158-162, 181-187, 191, 200-204, 218,
233-236, 240, 242, 245-248) with a module logger: create or use logger =
logging.getLogger(__name__) and call appropriate log levels (e.g., logger.info
for normal connection messages like "Audio WS client connected", logger.debug
for verbose internals, logger.error for exceptions), and use logger formatting
(either logger.info("Audio WS client connected: %s", user_id) or f-strings
passed as a single argument) instead of print so messages are consistent and
structured across functions that reference user_id and other local vars.
- Line 198: The code currently instantiates a new MeetingStateService for every
Kafka message when calling MeetingStateService().get_participants(room_code);
instead, create a single MeetingStateService instance outside the message
processing loop (e.g., service = MeetingStateService()) and reuse
service.get_participants(room_code) inside the loop to avoid repeated
construction; ensure the call remains awaited and that any stateful cleanup (if
MeetingStateService has close/dispose methods) is handled appropriately outside
the loop.
In `@tests/meeting/test_meeting_service.py`:
- Line 387: Several test calls to _make_participant are written as long
single-line calls and violate Black formatting; update each offending call
(including the instance using _make_participant(room_id=room.id,
user_id=joiner.id, display_name="Joiner") and the similar calls flagged at the
other locations) to wrap arguments across multiple lines so Black accepts them
(place each keyword argument on its own line, keep the opening paren on the same
line as the function name, and close the paren on its own line). Ensure all
similar long calls in this test module follow the same multiline argument style
so black --check passes.
In `@tests/test_auth/test_auth_refresh.py`:
- Line 1: The file's formatting fails Black; run the Black formatter on the file
containing the module docstring """Integration tests for ``POST
/api/v1/auth/refresh-token``.""" to reformat the module and tests (fix
whitespace, line lengths, and quoting) and commit the resulting changes so
`black --check` passes; ensure you only apply Black formatting changes and
include them in the same PR.
- Around line 61-76: FakeRedis.scan currently ignores cursor and count and
always returns cursor=0; update FakeRedis.scan to implement proper Redis-like
pagination: when count is None return all matches as before, otherwise build the
list of matching keys (use fnmatch on self._store.keys()), sort or
stable-iterate them, start from the provided cursor index, return up to count
keys and the next cursor (index after the returned slice), and wrap/return 0
when all keys have been consumed; ensure the method signature (scan) still
accepts cursor, match, count and that TokenStoreService.revoke_all_user_tokens
will receive advancing cursors until 0.
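The cursor semantics described above can be sketched as a minimal in-memory fake; note that real Redis SCAN cursors are opaque hash-table positions, not list indices, so this index-based scheme is only a test double (class and helper names are assumptions):

```python
import asyncio
import fnmatch


class FakeRedis:
    """In-memory stand-in exposing a cursor-based SCAN like redis-py's."""

    def __init__(self, keys):
        self._store = dict.fromkeys(keys)

    async def scan(self, cursor=0, match="*", count=None):
        keys = sorted(k for k in self._store if fnmatch.fnmatch(k, match))
        if count is None:
            return 0, keys  # no pagination requested
        page = keys[cursor : cursor + count]
        next_cursor = cursor + count
        if next_cursor >= len(keys):
            next_cursor = 0  # 0 signals iteration complete, as in Redis
        return next_cursor, page


async def scan_all(client, match, count):
    """Drain SCAN the way revoke_all_user_tokens would: loop until cursor 0."""
    cursor, found = 0, []
    while True:
        cursor, page = await client.scan(cursor, match=match, count=count)
        found.extend(page)
        if cursor == 0:
            return found
```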
In `@tests/test_auth/test_email_verification.py`:
- Line 114: The failing Black check is due to the unformatted client.get call
that assigns to response; reformat that expression so it complies with Black
(either run Black over the file or manually wrap the call arguments across
lines), e.g., break the long URL string into a single argument on the next line
or pass the query params as a dict to client.get, ensuring the statement is
PEP8/Black-compliant and preserves the call to
client.get("/api/v1/auth/verify-email?token=8f14e45f-ceea-4f6a-9fef-3d4d3e0d1be1")
and the response variable assignment.
---
Duplicate comments:
In `@app/auth/models.py`:
- Around line 37-43: The user_role column is persisted as a plain String(50)
which allows arbitrary values; update the model to enforce enum constraints at
the DB boundary by changing user_role to use the UserRole enum (e.g.,
mapped_column(sa.Enum(UserRole), server_default=UserRole.USER.value) or add a
CheckConstraint tied to UserRole) and then add a corresponding Alembic migration
that creates the ENUM or CHECK constraint for user_role; also ensure
request/response schemas reference UserRole (not plain str) so the model,
migration, and API schema (user_role, UserRole, mapped_column) stay aligned.
---
Nitpick comments:
In `@app/kafka/manager.py`:
- Around line 76-81: The topic creation currently hardcodes num_partitions=1 and
replication_factor=1 when building NewTopic objects for items in
TOPICS_TO_CREATE (and their dlq.<topic> variants); change this to read partition
and replication defaults from configuration or environment (e.g.,
KAFKA_NUM_PARTITIONS, KAFKA_REPLICATION_FACTOR or equivalent settings) and use
those variables when constructing NewTopic so production values can be increased
while keeping sensible development defaults; ensure the settings are documented
and applied to both the topic and dlq.NewTopic creation logic.
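Reading those defaults from the environment can be sketched without touching any Kafka client library; the env var names mirror the suggestion above and the dataclass is an illustrative shape, not the repo's settings object:

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class TopicSettings:
    num_partitions: int
    replication_factor: int


def topic_settings_from_env(env=os.environ) -> TopicSettings:
    """Env overrides with safe single-broker dev defaults."""
    return TopicSettings(
        num_partitions=int(env.get("KAFKA_NUM_PARTITIONS", "1")),
        replication_factor=int(env.get("KAFKA_REPLICATION_FACTOR", "1")),
    )
```

The manager would then pass `settings.num_partitions` and `settings.replication_factor` when constructing each `NewTopic` (and its `dlq.` counterpart) instead of the literals.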
In `@app/meeting/ws_router.py`:
- Around line 268-269: The empty "except Exception: pass" blocks shown in the
diff should be clarified and tightened: replace the bare silent except with a
brief explanatory comment stating why exceptions are intentionally ignored
(e.g., expected WebSocketDisconnect or asyncio.CancelledError during shutdown)
and, where possible, catch the specific exceptions instead of Exception (for
example WebSocketDisconnect and asyncio.CancelledError) or at minimum log/debug
the unexpected exception; locate the occurrences of the literal "except
Exception: pass" in ws_router.py and add the comments and targeted exception
handling around those blocks.
In `@app/services/connection_manager.py`:
- Around line 135-136: The empty except asyncio.CancelledError: pass is
intentional to allow task cancellation to be treated as normal shutdown; add a
brief explanatory comment above that except block (referencing the
asyncio.CancelledError handler) explaining that cancellations are expected
during shutdown and should be ignored rather than logged or re-raised, so future
readers and static analyzers understand this is deliberate (keep the pass but
add the comment in connection_manager.py next to the except clause).
In `@tests/test_auth/test_auth_refresh.py`:
- Around line 174-178: The teardown unconditionally sets limiter.enabled = True
which can leak state; modify the fixture to capture the prior value (e.g., prev
= limiter.enabled) before changing it, and in the finally/teardown restore
limiter.enabled = prev instead of hard-coding True; reference the
limiter.enabled usage in the fixture where TestClient(app) is yielded so the
original state is preserved and restored after the yield.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 0308d0b4-b830-4b0c-b768-4f57f240d0d5
📒 Files selected for processing (47)
- app/auth/dependencies.py
- app/auth/models.py
- app/auth/router.py
- app/auth/service.py
- app/auth/verification.py
- app/core/config.py
- app/core/dependencies.py
- app/core/error_responses.py
- app/core/exception_handlers.py
- app/core/init_admin.py
- app/db/session.py
- app/external_services/cloudinary/service.py
- app/external_services/deepgram/service.py
- app/external_services/deepl/service.py
- app/kafka/consumer.py
- app/kafka/manager.py
- app/kafka/producer.py
- app/meeting/models.py
- app/meeting/repository.py
- app/meeting/router.py
- app/meeting/schemas.py
- app/meeting/service.py
- app/meeting/state.py
- app/meeting/ws_dependencies.py
- app/meeting/ws_router.py
- app/schemas/pipeline.py
- app/services/audio_bridge.py
- app/services/connection_manager.py
- app/services/email_consumer.py
- app/services/stt_worker.py
- app/services/translation_worker.py
- app/user/router.py
- app/user/service.py
- scripts/test_audio_client.py
- tests/meeting/test_meeting_router.py
- tests/meeting/test_meeting_service.py
- tests/meeting/test_ws.py
- tests/meeting/test_ws_router.py
- tests/test_auth/test_auth_login.py
- tests/test_auth/test_auth_refresh.py
- tests/test_auth/test_auth_signup.py
- tests/test_auth/test_email_verification.py
- tests/test_auth/test_verification_token_crud.py
- tests/test_kafka/test_pipeline.py
- tests/test_user/test_storage_service.py
- tests/test_user/test_user_endpoints.py
- tests/test_user/test_user_service.py
✅ Files skipped from review due to trivial changes (28)
- app/core/error_responses.py
- app/user/router.py
- app/db/session.py
- app/auth/dependencies.py
- app/core/dependencies.py
- app/core/exception_handlers.py
- tests/test_user/test_storage_service.py
- tests/test_auth/test_auth_login.py
- app/external_services/cloudinary/service.py
- app/kafka/consumer.py
- app/user/service.py
- tests/test_auth/test_auth_signup.py
- app/kafka/producer.py
- app/services/email_consumer.py
- app/auth/service.py
- tests/test_user/test_user_endpoints.py
- app/auth/verification.py
- tests/test_user/test_user_service.py
- tests/test_auth/test_verification_token_crud.py
- app/services/audio_bridge.py
- scripts/test_audio_client.py
- app/auth/router.py
- tests/meeting/test_ws_router.py
- app/external_services/deepgram/service.py
- tests/meeting/test_ws.py
- app/meeting/models.py
- tests/test_kafka/test_pipeline.py
- app/schemas/pipeline.py
🚧 Files skipped from review as they are similar to previous changes (7)
- app/meeting/repository.py
- app/meeting/schemas.py
- app/core/init_admin.py
- app/meeting/state.py
- app/meeting/ws_dependencies.py
- app/core/config.py
- app/meeting/service.py
data = response.json()
translated = data["choices"][0]["message"]["content"].strip()

return {
    "translated_text": translated,
    "latency_ms": round(elapsed_ms, 1),
}
Add defensive access for OpenAI response parsing.
The response parsing at line 170 assumes a specific structure that will raise KeyError or IndexError if OpenAI returns an unexpected response format (e.g., during API changes or errors).
🛡️ Proposed fix
data = response.json()
- translated = data["choices"][0]["message"]["content"].strip()
+ try:
+ translated = data["choices"][0]["message"]["content"].strip()
+ except (KeyError, IndexError, TypeError) as e:
+ logger.error("Unexpected OpenAI response format: %s", e)
+ return {"translated_text": "", "latency_ms": round(elapsed_ms, 1)}
return {
"translated_text": translated,
"latency_ms": round(elapsed_ms, 1),
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
Before:

data = response.json()
translated = data["choices"][0]["message"]["content"].strip()
return {
    "translated_text": translated,
    "latency_ms": round(elapsed_ms, 1),
}

After:

data = response.json()
try:
    translated = data["choices"][0]["message"]["content"].strip()
except (KeyError, IndexError, TypeError) as e:
    logger.error("Unexpected OpenAI response format: %s", e)
    return {"translated_text": "", "latency_ms": round(elapsed_ms, 1)}
return {
    "translated_text": translated,
    "latency_ms": round(elapsed_ms, 1),
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@app/external_services/deepl/service.py` around lines 169 - 175, The OpenAI
response parsing assumes a strict shape and can raise KeyError/IndexError;
update the parsing around response, data, translated and elapsed_ms to
defensively handle missing keys/empty lists by: parse response.json() into data,
verify data is a dict, ensure "choices" exists and is a non-empty list, and that
choices[0] contains "message" with "content" before accessing; on failure, log
or include the raw response/data in the error path and return a safe fallback
(e.g., translated_text as an empty string or a descriptive error message) while
still returning latency_ms, and wrap the extraction in try/except to avoid
uncaught exceptions.
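Extracted into a standalone helper, the defensive parsing above looks like the following sketch (the function name and return shape mirror the review suggestion, not a repo API):

```python
import logging

logger = logging.getLogger(__name__)


def extract_translation(data, elapsed_ms: float) -> dict:
    """Pull choices[0].message.content out of a chat-completions payload,
    falling back to an empty string on any shape mismatch."""
    try:
        translated = data["choices"][0]["message"]["content"].strip()
    except (KeyError, IndexError, TypeError, AttributeError):
        logger.error("Unexpected response shape: %r", data)
        translated = ""
    return {"translated_text": translated, "latency_ms": round(elapsed_ms, 1)}
```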
listening_language = participant_state.get("language", "en")
await websocket.accept()
print("Audio WS client connected: %s", user_id)
🛠️ Refactor suggestion | 🟠 Major
Replace print() statements with proper logging.
Multiple print() statements are used throughout this file for debugging. These should be replaced with appropriate logger calls for consistency, proper log levels, and production readiness.
♻️ Example fix for line 92
- print("Audio WS client connected: %s", user_id)
+ logger.info("Audio WS client connected: %s", log_sanitizer.sanitize(user_id))Also applies to: 158-162, 181-187, 191-191, 200-204, 218-218, 233-236, 240-240, 242-242, 245-248
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@app/meeting/ws_router.py` at line 92, Replace all print() debug statements in
app/meeting/ws_router.py (e.g., the print("Audio WS client connected: %s",
user_id) call that references user_id and the other prints around lines 158-162,
181-187, 191, 200-204, 218, 233-236, 240, 242, 245-248) with a module logger:
create or use logger = logging.getLogger(__name__) and call appropriate log
levels (e.g., logger.info for normal connection messages like "Audio WS client
connected", logger.debug for verbose internals, logger.error for exceptions),
and use logger formatting (either logger.info("Audio WS client connected: %s",
user_id) or f-strings passed as a single argument) instead of print so messages
are consistent and structured across functions that reference user_id and other
local vars.
# only deliver audio matching the listener's language.
# For single-user testing, skip the filter so the speaker
# can hear their own translated audio.
participants = await MeetingStateService().get_participants(room_code)
Avoid instantiating MeetingStateService inside the message loop.
Creating a new MeetingStateService() instance for each Kafka message is inefficient. Move the instantiation outside the loop.
♻️ Proposed fix
+ meeting_state = MeetingStateService()
+
try:
async for msg in consumer:
try:
# ...
- participants = await MeetingStateService().get_participants(room_code)
+ participants = await meeting_state.get_participants(room_code)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@app/meeting/ws_router.py` at line 198, The code currently instantiates a new
MeetingStateService for every Kafka message when calling
MeetingStateService().get_participants(room_code); instead, create a single
MeetingStateService instance outside the message processing loop (e.g., service
= MeetingStateService()) and reuse service.get_participants(room_code) inside
the loop to avoid repeated construction; ensure the call remains awaited and
that any stateful cleanup (if MeetingStateService has close/dispose methods) is
handled appropriately outside the loop.
async def scan(
    self,
    cursor: int,  # noqa: ARG002
    match: str | None = None,
    count: int | None = None,  # noqa: ARG002
) -> tuple[int, list[str]]:
    """Return all keys matching *match* pattern in one shot (cursor=0)."""
    import fnmatch

    if match:
        # Convert Redis glob to fnmatch (Redis uses * for wildcard)
        matched = [k for k in self._store if fnmatch.fnmatch(k, match)]
    else:
        matched = list(self._store.keys())
    # Return cursor=0 to signal iteration complete
    return 0, matched
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Verify fake SCAN behavior vs production token-store loop usage.
rg -n --type=py -C3 'async def scan\(|cursor\s*==\s*0|revoke_all_user_tokens|scan\('
Repository: Brints/FluentMeet
Length of output: 6093
🏁 Script executed:
# First, let's view the actual scan implementation in test_auth_refresh.py
cat -n tests/test_auth/test_auth_refresh.py | sed -n '61,76p'
Repository: Brints/FluentMeet
Length of output: 770
🏁 Script executed:
# Check the full FakeRedis class to understand its complete structure
cat -n tests/test_auth/test_auth_refresh.py | sed -n '30,100p'
Repository: Brints/FluentMeet
Length of output: 2519
🏁 Script executed:
# Look at the test cases to see how many tokens/keys are created
rg -n "def test_" tests/test_auth/test_auth_refresh.py
Repository: Brints/FluentMeet
Length of output: 537
🏁 Script executed:
# View the test that exercises revoke_all_user_tokens
cat -n tests/test_auth/test_auth_refresh.py | sed -n '339,375p'
Repository: Brints/FluentMeet
Length of output: 1617
🏁 Script executed:
# Check how many keys might be created in tests - search for multiple set/store operations
rg -n "set\(|_store\[" tests/test_auth/test_auth_refresh.py | head -20
Repository: Brints/FluentMeet
Length of output: 193
🏁 Script executed:
# Compare scan implementations across test files
echo "=== test_auth_login.py ===" && cat -n tests/test_auth/test_auth_login.py | sed -n '64,80p'
echo -e "\n=== test_meeting_router.py ===" && cat -n tests/meeting/test_meeting_router.py | sed -n '82,98p'
Repository: Brints/FluentMeet
Length of output: 1316
🏁 Script executed:
# Check if any tests create many keys/tokens (>100)
rg -n "for.*in range|asyncio\.run.*save_refresh_token" tests/test_auth/test_auth_refresh.py
Repository: Brints/FluentMeet
Length of output: 43
FakeRedis.scan does not implement cursor pagination, creating a test coverage gap.
The scan method always returns cursor=0 and ignores the count parameter (lines 61–76). This affects multiple test files (test_auth_refresh.py, test_auth_login.py, test_meeting_router.py) and prevents tests from exercising multi-iteration SCAN behavior.
Since TokenStoreService.revoke_all_user_tokens calls scan(count=100) in a loop expecting proper cursor semantics (app/auth/token_store.py, lines 85–90), pagination bugs there could pass undetected. Current tests create too few keys to trigger the multi-iteration code path. A loop condition error or early termination bug in the production code would not be caught.
Implement pagination in FakeRedis.scan to return partial results respecting count and advancing the cursor until all matching keys are exhausted.
Suggested implementation
async def scan(
self,
- cursor: int, # noqa: ARG002
+ cursor: int,
match: str | None = None,
- count: int | None = None, # noqa: ARG002
+ count: int | None = None,
) -> tuple[int, list[str]]:
- """Return all keys matching *match* pattern in one shot (cursor=0)."""
+ """Return paged keys to better emulate Redis SCAN cursor behavior."""
import fnmatch
if match:
- # Convert Redis glob to fnmatch (Redis uses * for wildcard)
- matched = [k for k in self._store if fnmatch.fnmatch(k, match)]
+ matched = [k for k in sorted(self._store) if fnmatch.fnmatch(k, match)]
else:
- matched = list(self._store.keys())
- # Return cursor=0 to signal iteration complete
- return 0, matched
+ matched = sorted(self._store.keys())
+
+ page_size = count or 10
+ start = cursor
+ end = start + page_size
+ page = matched[start:end]
+ next_cursor = 0 if end >= len(matched) else end
+ return next_cursor, page
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
async def scan(
    self,
    cursor: int,
    match: str | None = None,
    count: int | None = None,
) -> tuple[int, list[str]]:
    """Return paged keys to better emulate Redis SCAN cursor behavior."""
    import fnmatch

    if match:
        matched = [k for k in sorted(self._store) if fnmatch.fnmatch(k, match)]
    else:
        matched = sorted(self._store.keys())

    page_size = count or 10
    start = cursor
    end = start + page_size
    page = matched[start:end]
    next_cursor = 0 if end >= len(matched) else end
    return next_cursor, page
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tests/test_auth/test_auth_refresh.py` around lines 61 - 76, FakeRedis.scan
currently ignores cursor and count and always returns cursor=0; update
FakeRedis.scan to implement proper Redis-like pagination: when count is None
return all matches as before, otherwise build the list of matching keys (use
fnmatch on self._store.keys()), sort or stable-iterate them, start from the
provided cursor index, return up to count keys and the next cursor (index after
the returned slice), and wrap/return 0 when all keys have been consumed; ensure
the method signature (scan) still accepts cursor, match, count and that
TokenStoreService.revoke_all_user_tokens will receive advancing cursors until 0.
Actionable comments posted: 7
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
app/meeting/state.py (1)
103-126: ⚠️ Potential issue | 🟠 Major
Make the lobby-to-participant promotion atomic.
MeetingService.admit_user() treats this boolean as the source of truth, but the existence check now happens before the pipeline. If another request removes the user from the lobby between hget() and execute(), this code still writes a participant record and returns True, effectively admitting someone who is no longer in the lobby.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/meeting/state.py` around lines 103 - 126, The promotion is not atomic because you read with hget before starting the pipeline; replace that with a WATCH/MULTI transaction or a Lua script so the check-and-move is atomic: WATCH the lobby hash (key_room_lobby(room_code)), re-read the lobby field (hget or hexists) after WATCH, if it still exists start MULTI (pipe = self._redis.pipeline()) then hdel the lobby field and hset the participant (key_room_participants(room_code)) and execute(), and handle redis.WatchError by retrying or returning False; alternatively implement the logic as a single EVAL script that checks the lobby field, deletes it and sets the participant atomically. Ensure you reference and update the existing uses of hget, hdel, hset, pipeline(), execute(), key_room_lobby and key_room_participants.
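The WATCH/MULTI pattern the prompt describes can be sketched against redis-py's asyncio pipeline API. The key names below are assumptions for a self-contained example, and the local `WatchError` class stands in for `redis.exceptions.WatchError`:

```python
class WatchError(Exception):
    """Stand-in for redis.exceptions.WatchError in this sketch."""


async def admit_user_atomic(redis, room_code: str, user_id: str,
                            payload: str) -> bool:
    """Optimistic-locking sketch: WATCH the lobby hash, re-check membership,
    then HDEL + HSET inside MULTI. Retries on WatchError because another
    request may have removed the user from the lobby in the meantime."""
    lobby = f"room:{room_code}:lobby"               # hypothetical key names
    participants = f"room:{room_code}:participants"
    while True:
        async with redis.pipeline(transaction=True) as pipe:
            try:
                await pipe.watch(lobby)
                # Re-read under the watch; this is what makes the check safe.
                if not await pipe.hexists(lobby, user_id):
                    await pipe.unwatch()
                    return False  # no longer in the lobby: refuse admission
                pipe.multi()
                pipe.hdel(lobby, user_id)
                pipe.hset(participants, user_id, payload)
                await pipe.execute()
                return True
            except WatchError:
                continue  # lobby changed under us; retry the whole sequence
```

A Lua `EVAL` script doing the same check-delete-set would avoid the retry loop entirely at the cost of embedding logic in Redis.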
♻️ Duplicate comments (2)
app/meeting/ws_router.py (1)
231-247: ⚠️ Potential issue | 🔴 Critical
Remove the raw-audio disk dump from the request path.
Every synthesized frame is written to local storage before it is sent to the client. That adds per-frame I/O latency and persists user audio outside the pipeline, which is a privacy/compliance risk for production traffic.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/meeting/ws_router.py` around lines 231 - 247, Remove the per-frame disk dump: delete the output_path/mode variables, the nested _write_audio function, and the await asyncio.to_thread(_write_audio) call (and the subsequent print that reports SAVED ... to output_path). Ensure the rest of the egress path that sends audio_bytes to the client remains unchanged; if any test-only file-write behavior is needed, move it behind a configurable flag or a separate test-only helper rather than executing inside the live send flow (references: output_path, _write_audio, asyncio.to_thread, payload.sequence_number).
app/services/stt_worker.py (1)
65-71: ⚠️ Potential issue | 🟠 Major
Handle Deepgram failures without crashing the worker.
transcribe() is still unguarded here. A transient network/API failure will bubble out of the consumer and stop STT processing for subsequent audio until the worker is restarted.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/services/stt_worker.py` around lines 65 - 71, Wrap the call to get_deepgram_stt_service() and stt_service.transcribe(...) in a try/except block (around the existing stt_service/transcribe usage) to catch network/API exceptions, log the full error via the process/logger used in this module, and avoid letting exceptions propagate and crash the worker; on transient failures consider retrying with a short backoff (or return a controlled failure result so the worker can continue) and ensure the function using transcribe handles the error case gracefully instead of allowing an unhandled exception from get_deepgram_stt_service or stt_service.transcribe to stop STT processing.
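The retry-with-backoff shape can be sketched as a small wrapper. `transcribe_with_retry` is a hypothetical helper, not part of the existing worker, and returning `None` to signal "skip this chunk" is one possible contract:

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


async def transcribe_with_retry(transcribe, audio: bytes, *,
                                attempts: int = 3, base_delay: float = 0.5):
    """Guard a flaky STT call so a transient network/API failure does not
    crash the consumer loop. Retries with exponential backoff, then returns
    None so the caller can drop the chunk and keep consuming."""
    for attempt in range(1, attempts + 1):
        try:
            return await transcribe(audio)
        except Exception:
            logger.exception("STT attempt %d/%d failed", attempt, attempts)
            if attempt < attempts:
                await asyncio.sleep(base_delay * 2 ** (attempt - 1))
    return None
```

In the worker loop, a `None` result would simply `continue` to the next Kafka message instead of propagating the exception.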
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@app/meeting/ws_dependencies.py`:
- Around line 33-57: The WebSocket auth path decodes JWT in this block
(jwt.decode / payload) and looks up the user
(db.execute(select(User).where(User.email == raw_sub)).scalar_one_or_none()) but
never checks token revocation by jti; update the flow to extract
payload.get("jti") and validate it against the token store/blacklist (e.g.,
Redis/token service) and raise error_exc if the jti is present/marked revoked or
missing for an access token; ensure this check runs for token_type == "access"
before returning str(user.id) (and preserve existing behavior for guest tokens
if intended).
In `@app/meeting/ws_router.py`:
- Around line 296-343: The handler currently forwards every TEXT_TRANSLATED
message to all connected clients; change the assert_room_participant call to
capture the participant's preferred listening language (e.g., participant =
await assert_room_participant(room_code, user_id)) and then, inside the consumer
loop where you handle is_translation (msg.topic == TEXT_TRANSLATED), skip
messages whose payload_data.get("target_language") does not match the
participant's listening language (continue), while still sending non-translated
TEXT_ORIGINAL messages as before; update references in the loop that build
caption_msg so translated captions are only forwarded when
payload_data["target_language"] == participant_language.
- Around line 277-286: The current asyncio.wait() block swallows exceptions from
task1/task2 (and has a bare except that does nothing), so inspect completed
tasks and surface errors instead of silently passing: after await
asyncio.wait([task1, task2], return_when=asyncio.FIRST_COMPLETED) iterate over
_done and for each task call task.exception() (or await task) and log or
re-raise any exception with context (include which task failed), cancel and then
await pending tasks to ensure proper cleanup (use asyncio.gather(*pending,
return_exceptions=True) to drain cancellations), and remove the bare "except
Exception: pass" so failures from task1/task2 are not hidden; reference the
variables task1, task2, _done, pending and the except block in ws_router.py.
In `@app/services/connection_manager.py`:
- Around line 113-124: The broadcast loop currently awaits send_json serially
which lets one slow client stall the room; change the fanout in the msg_type ==
"broadcast" branch to perform bounded concurrent sends (use an asyncio.Semaphore
or a worker pool and schedule per-socket send tasks, then await them with
asyncio.gather/asyncio.wait) instead of awaiting each send_json inline, handle
exceptions per task by logging via logger.warning with
log_sanitizer.sanitize(user_id), close and remove failed sockets from
self.active_connections[room_code], and preserve the existing sender_id check so
the sender is not echoed.
In `@app/services/stt_worker.py`:
- Around line 57-63: The worker currently injects a hard-coded transcript when
settings.DEEPGRAM_API_KEY is missing (see settings.DEEPGRAM_API_KEY check and
the result dict in stt_worker.py); instead fail closed: remove the fake
transcript, log an explicit error, and raise an exception (e.g., RuntimeError or
a custom SttConfigurationError) so upstream processing aborts rather than
treating bogus text as real speech; ensure any callers of the STT function (the
code that expects result) handle the exception path appropriately.
In `@app/services/translation_worker.py`:
- Around line 144-175: The current code in translation_worker.py returns a
mocked translation both when config is missing and when an exception occurs;
instead, change both paths to surface failures: in the branch that checks
settings.DEEPL_API_KEY and settings.OPENAI_API_KEY, do not return a mocked
string — raise a RuntimeError (or a specific TranslationError) indicating
missing translation backend; likewise in the except Exception block around calls
to get_deepl_translation_service()/get_openai_translation_fallback(), remove the
mocked return and re-raise the caught exception (or raise a wrapped
RuntimeError) after logging via logger.warning so callers can detect and handle
real failures rather than publishing fabricated captions/audio.
In `@tests/meeting/test_meeting_router.py`:
- Around line 261-265: The test currently replaces app.main.get_kafka_manager by
assigning a lambda returning AsyncMock (mock_kafka) without restoring it,
leaking state; change the test to save the original value of
app.main.get_kafka_manager before patching and ensure it is restored after the
test (or, better, use pytest's monkeypatch fixture to set
app.main.get_kafka_manager to a callable returning AsyncMock and automatically
undo the patch), referencing the symbol app.main.get_kafka_manager and the mock
variable mock_kafka (or AsyncMock) so the original function is reinstated in
teardown.
---
Outside diff comments:
In `@app/meeting/state.py`:
- Around line 103-126: The promotion is not atomic because you read with hget
before starting the pipeline; replace that with a WATCH/MULTI transaction or a
Lua script so the check-and-move is atomic: WATCH the lobby hash
(key_room_lobby(room_code)), re-read the lobby field (hget or hexists) after
WATCH, if it still exists start MULTI (pipe = self._redis.pipeline()) then hdel
the lobby field and hset the participant (key_room_participants(room_code)) and
execute(), and handle redis.WatchError by retrying or returning False;
alternatively implement the logic as a single EVAL script that checks the lobby
field, deletes it and sets the participant atomically. Ensure you reference and
update the existing uses of hget, hdel, hset, pipeline(), execute(),
key_room_lobby and key_room_participants.
---
Duplicate comments:
In `@app/meeting/ws_router.py`:
- Around line 231-247: Remove the per-frame disk dump: delete the
output_path/mode variables, the nested _write_audio function, and the await
asyncio.to_thread(_write_audio) call (and the subsequent print that reports
SAVED ... to output_path). Ensure the rest of the egress path that sends
audio_bytes to the client remains unchanged; if any test-only file-write
behavior is needed, move it behind a configurable flag or a separate test-only
helper rather than executing inside the live send flow (references: output_path,
_write_audio, asyncio.to_thread, payload.sequence_number).
In `@app/services/stt_worker.py`:
- Around line 65-71: Wrap the call to get_deepgram_stt_service() and
stt_service.transcribe(...) in a try/except block (around the existing
stt_service/transcribe usage) to catch network/API exceptions, log the full
error via the process/logger used in this module, and avoid letting exceptions
propagate and crash the worker; on transient failures consider retrying with a
short backoff (or return a controlled failure result so the worker can continue)
and ensure the function using transcribe handles the error case gracefully
instead of allowing an unhandled exception from get_deepgram_stt_service or
stt_service.transcribe to stop STT processing.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 6dd51404-58cf-4247-879c-8f4488354a61
📒 Files selected for processing (28)
- app/auth/models.py
- app/auth/router.py
- app/core/config.py
- app/core/init_admin.py
- app/db/session.py
- app/external_services/cloudinary/service.py
- app/external_services/deepgram/service.py
- app/external_services/deepl/service.py
- app/kafka/consumer.py
- app/kafka/manager.py
- app/meeting/repository.py
- app/meeting/router.py
- app/meeting/schemas.py
- app/meeting/service.py
- app/meeting/state.py
- app/meeting/ws_dependencies.py
- app/meeting/ws_router.py
- app/schemas/pipeline.py
- app/services/connection_manager.py
- app/services/stt_worker.py
- app/services/translation_worker.py
- scripts/test_audio_client.py
- tests/meeting/test_meeting_router.py
- tests/meeting/test_meeting_service.py
- tests/meeting/test_ws.py
- tests/meeting/test_ws_router.py
- tests/test_auth/test_auth_refresh.py
- tests/test_kafka/test_pipeline.py
✅ Files skipped from review due to trivial changes (7)
- app/db/session.py
- app/auth/router.py
- tests/meeting/test_ws.py
- app/schemas/pipeline.py
- app/external_services/deepgram/service.py
- tests/test_kafka/test_pipeline.py
- app/core/config.py
🚧 Files skipped from review as they are similar to previous changes (13)
- app/kafka/consumer.py
- app/external_services/cloudinary/service.py
- app/meeting/repository.py
- app/meeting/router.py
- app/core/init_admin.py
- scripts/test_audio_client.py
- app/kafka/manager.py
- tests/meeting/test_ws_router.py
- tests/test_auth/test_auth_refresh.py
- app/meeting/schemas.py
- app/auth/models.py
- app/meeting/service.py
- app/external_services/deepl/service.py
try:
    payload = jwt.decode(
        token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
    )
except JWTError as err:
    raise error_exc from err

raw_sub = payload.get("sub")
token_type = payload.get("type", "access")

if (
    not raw_sub
    or not isinstance(raw_sub, str)
    or token_type not in ("access", "guest")
):
    raise error_exc

if token_type == "access":
    # The 'sub' is an email; we need the UUID to match Redis participant state
    user = db.execute(
        select(User).where(User.email == raw_sub)
    ).scalar_one_or_none()
    if not user:
        raise error_exc
    return str(user.id)
WebSocket auth currently bypasses access-token revocation.
This dependency only decodes the JWT and looks up the user by sub; it never checks the access-token jti against the token store. A logged-out or blacklisted access token can therefore open a new WebSocket connection until it expires.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@app/meeting/ws_dependencies.py` around lines 33 - 57, The WebSocket auth path
decodes JWT in this block (jwt.decode / payload) and looks up the user
(db.execute(select(User).where(User.email == raw_sub)).scalar_one_or_none()) but
never checks token revocation by jti; update the flow to extract
payload.get("jti") and validate it against the token store/blacklist (e.g.,
Redis/token service) and raise error_exc if the jti is present/marked revoked or
missing for an access token; ensure this check runs for token_type == "access"
before returning str(user.id) (and preserve existing behavior for guest tokens
if intended).
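A minimal sketch of the jti check, assuming a Redis denylist keyed as `revoked:<jti>`; the key scheme and `PermissionError` are placeholders for the project's actual token store and `error_exc`:

```python
async def assert_not_revoked(redis, payload: dict) -> None:
    """Reject access tokens whose jti is missing or present in the
    denylist. Call this before resolving the user so a logged-out token
    cannot open a new WebSocket connection."""
    jti = payload.get("jti")
    if not jti:
        raise PermissionError("access token has no jti")
    if await redis.exists(f"revoked:{jti}"):
        raise PermissionError("access token has been revoked")
```

The check runs only for `token_type == "access"`; guest tokens, which carry no jti, keep their existing path.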
try:
    # Run until either task fails or disconnects
    _done, pending = await asyncio.wait(
        [task1, task2], return_when=asyncio.FIRST_COMPLETED
    )
    # Cancel whatever is still running
    for t in pending:
        t.cancel()
except Exception:
    pass
Do not swallow failures from the audio background tasks.
asyncio.wait() does not surface exceptions from task1/task2, and this block never inspects the completed tasks. If Kafka consume/send fails, the WebSocket shuts down without any actionable error log and the root cause is lost.
🧰 Tools
🪛 GitHub Check: CodeQL
[notice] 285-285: Empty except
'except' clause does nothing but pass and there is no explanatory comment.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@app/meeting/ws_router.py` around lines 277 - 286, The current asyncio.wait()
block swallows exceptions from task1/task2 (and has a bare except that does
nothing), so inspect completed tasks and surface errors instead of silently
passing: after await asyncio.wait([task1, task2],
return_when=asyncio.FIRST_COMPLETED) iterate over _done and for each task call
task.exception() (or await task) and log or re-raise any exception with context
(include which task failed), cancel and then await pending tasks to ensure
proper cleanup (use asyncio.gather(*pending, return_exceptions=True) to drain
cancellations), and remove the bare "except Exception: pass" so failures from
task1/task2 are not hidden; reference the variables task1, task2, _done, pending
and the except block in ws_router.py.
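One way to structure the wait-and-surface logic, as a sketch; `run_until_first_done` is a hypothetical helper wrapping the two bridge tasks:

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


async def run_until_first_done(*coros) -> None:
    """Run the bridge tasks until one finishes. Any exception from a
    completed task is logged with context and re-raised instead of being
    silently swallowed; the remaining tasks are cancelled and drained."""
    tasks = [asyncio.ensure_future(c) for c in coros]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for t in pending:
        t.cancel()
    # Drain cancellations so cleanup actually runs before we return/raise.
    await asyncio.gather(*pending, return_exceptions=True)
    for t in done:
        exc = t.exception()
        if exc is not None:
            logger.error("background task %r failed", t, exc_info=exc)
            raise exc
```

The caller's `except` block can then close the WebSocket with a meaningful reason rather than losing the Kafka failure.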
try:
    # Validate they are in the room, but we don't strictly *need* their state
    _ = await assert_room_participant(room_code, user_id)
except Exception as e:
    await websocket.close(code=1008, reason=str(e))
    return

await websocket.accept()

# Use a persistent user-specific group so reconnects don't drop captions
# Note: "Subscribe from now" is handled via auto_offset_reset="latest"
# in their group creation or by wiping the group offsets.
# We'll use a dynamic timestamp group to force "latest".
consumer = AIOKafkaConsumer(
    TEXT_ORIGINAL,
    TEXT_TRANSLATED,
    bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS,
    group_id=f"captions-{room_code}-{user_id}-{int(time.time())}",
    auto_offset_reset="latest",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

await consumer.start()

try:
    async for msg in consumer:
        payload_data = msg.value.get("payload", {})
        if payload_data.get("room_id") != room_code:
            continue

        # Build unified caption response depending on topic
        is_translation = msg.topic == TEXT_TRANSLATED

        caption_msg = {
            "event": "caption",
            "speaker_id": payload_data.get("user_id"),
            "is_final": payload_data.get("is_final", True),
            "timestamp_ms": int(time.time() * 1000),
        }

        if is_translation:
            caption_msg["language"] = payload_data.get("target_language")
            caption_msg["text"] = payload_data.get("translated_text")
        else:
            caption_msg["language"] = payload_data.get("source_language")
            caption_msg["text"] = payload_data.get("text")

        await websocket.send_json(caption_msg)
Filter translated captions to the connected participant's language.
This endpoint validates room membership but then forwards every TEXT_TRANSLATED event for the room to every client. In a multilingual room, each listener will receive other participants' target-language caption streams as well, not just the one matching their own listening language.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@app/meeting/ws_router.py` around lines 296 - 343, The handler currently
forwards every TEXT_TRANSLATED message to all connected clients; change the
assert_room_participant call to capture the participant's preferred listening
language (e.g., participant = await assert_room_participant(room_code, user_id))
and then, inside the consumer loop where you handle is_translation (msg.topic ==
TEXT_TRANSLATED), skip messages whose payload_data.get("target_language") does
not match the participant's listening language (continue), while still sending
non-translated TEXT_ORIGINAL messages as before; update references in the loop
that build caption_msg so translated captions are only forwarded when
payload_data["target_language"] == participant_language.
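The per-listener filter reduces to a small predicate; the topic name default below is an assumption standing in for the `TEXT_TRANSLATED` constant:

```python
def should_forward(topic: str, payload: dict, listener_language: str,
                   translated_topic: str = "text.translated") -> bool:
    """Per-listener caption filter: original-transcript events always go
    through; translated events are forwarded only when the payload's
    target_language matches the listener's preferred language."""
    if topic != translated_topic:
        return True
    return payload.get("target_language") == listener_language
```

In the consumer loop this becomes a single `if not should_forward(...): continue` before building `caption_msg`.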
if msg_type == "broadcast":
    sender_id = payload.get("sender_id")
    for user_id, ws in list(self.active_connections[room_code].items()):
        # Don't echo back to the sender
        if user_id != sender_id:
            try:
                await ws.send_json(data)
            except Exception:
                logger.warning(
                    "Failed to send message to %s",
                    log_sanitizer.sanitize(user_id),
                )
One slow client can stall signaling for the whole room.
Broadcast fanout awaits each send_json() serially inside the Redis subscriber loop. If one socket blocks, every later recipient waits behind it and pub/sub delivery backs up for that room. This needs bounded concurrent sends and cleanup of failed sockets.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@app/services/connection_manager.py` around lines 113 - 124, The broadcast
loop currently awaits send_json serially which lets one slow client stall the
room; change the fanout in the msg_type == "broadcast" branch to perform bounded
concurrent sends (use an asyncio.Semaphore or a worker pool and schedule
per-socket send tasks, then await them with asyncio.gather/asyncio.wait) instead
of awaiting each send_json inline, handle exceptions per task by logging via
logger.warning with log_sanitizer.sanitize(user_id), close and remove failed
sockets from self.active_connections[room_code], and preserve the existing
sender_id check so the sender is not echoed.
if not settings.DEEPGRAM_API_KEY:
    logger.info("DEEPGRAM_API_KEY not set. Mocking STT response for testing.")
    result: dict[str, Any] = {
        "text": "Hello, this is a simulated transcription for testing purposes.",
        "detected_language": payload.source_language,
        "confidence": 1.0,
    }
Do not publish fake transcripts when STT is misconfigured.
If DEEPGRAM_API_KEY is missing, this worker emits a hard-coded sentence into text.original. Downstream translation/TTS will treat that as real speech and surface bogus content to participants instead of failing closed.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@app/services/stt_worker.py` around lines 57 - 63, The worker currently
injects a hard-coded transcript when settings.DEEPGRAM_API_KEY is missing (see
settings.DEEPGRAM_API_KEY check and the result dict in stt_worker.py); instead
fail closed: remove the fake transcript, log an explicit error, and raise an
exception (e.g., RuntimeError or a custom SttConfigurationError) so upstream
processing aborts rather than treating bogus text as real speech; ensure any
callers of the STT function (the code that expects result) handle the exception
path appropriately.
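Failing closed can be as simple as a guard called before touching the provider; `require_stt_key` and the exception name are hypothetical:

```python
class STTConfigurationError(RuntimeError):
    """Raised when the STT provider is not configured; fail closed rather
    than publishing a fabricated transcript downstream."""


def require_stt_key(api_key) -> str:
    # Call at worker startup or per message, instead of mocking a transcript.
    if not api_key:
        raise STTConfigurationError(
            "DEEPGRAM_API_KEY is not set; refusing to emit fake transcripts"
        )
    return api_key
```

Raising here means downstream translation/TTS never sees bogus text, and the misconfiguration surfaces immediately in logs and monitoring.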
if not settings.DEEPL_API_KEY and not settings.OPENAI_API_KEY:
    logger.info("Translation config missing. Mocking text for testing.")
    return f"[Mocked Translation -> {target_language}]: {text}"

deepl = get_deepl_translation_service()
openai_fallback = get_openai_translation_fallback()

try:
    if settings.DEEPL_API_KEY and deepl.supports_language(target_language):
        result = await deepl.translate(
            text,
            source_language=source_language,
            target_language=target_language,
        )
    elif settings.OPENAI_API_KEY:
        logger.info(
            "DeepL skipped or unsupported for '%s', falling back to OpenAI",
            target_language,
        )
        result = await openai_fallback.translate(
            text,
            source_language=source_language,
            target_language=target_language,
        )
    else:
        raise RuntimeError("No available translation backend.")
except Exception as api_exc:
    logger.warning(
        "Translation backend failed (%s). Mocking translation.",
        str(api_exc),
    )
    return f"[Mocked Translation -> {target_language}]: {text}"
Do not substitute mocked translations on backend failure.
Both the “missing config” path and the broad exception fallback return fabricated translated text, which is then published as if it were real. In production, misconfiguration or a provider outage should not turn into bogus captions/audio for participants.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@app/services/translation_worker.py` around lines 144 - 175, The current code
in translation_worker.py returns a mocked translation both when config is
missing and when an exception occurs; instead, change both paths to surface
failures: in the branch that checks settings.DEEPL_API_KEY and
settings.OPENAI_API_KEY, do not return a mocked string — raise a RuntimeError
(or a specific TranslationError) indicating missing translation backend;
likewise in the except Exception block around calls to
get_deepl_translation_service()/get_openai_translation_fallback(), remove the
mocked return and re-raise the caught exception (or raise a wrapped
RuntimeError) after logging via logger.warning so callers can detect and handle
real failures rather than publishing fabricated captions/audio.
```python
# Mock the kafka manager to prevent lifespan from bridging actual sockets
import app.main as app_main_module

mock_kafka = AsyncMock()
app_main_module.get_kafka_manager = lambda: mock_kafka
```
Restore the Kafka-manager patch after each test.
This fixture mutates app.main.get_kafka_manager directly and never puts it back. That leaks test state across modules and can make unrelated tests pass or fail depending on execution order.
🧰 Tools
🪛 GitHub Check: CodeQL
[notice] 262-262: Module is imported with 'import' and 'import from'
Module 'app.main' is imported with both 'import' and 'import from'.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tests/meeting/test_meeting_router.py` around lines 261 - 265, The test
currently replaces app.main.get_kafka_manager by assigning a lambda returning
AsyncMock (mock_kafka) without restoring it, leaking state; change the test to
save the original value of app.main.get_kafka_manager before patching and ensure
it is restored after the test (or, better, use pytest's monkeypatch fixture to
set app.main.get_kafka_manager to a callable returning AsyncMock and
automatically undo the patch), referencing the symbol app.main.get_kafka_manager
and the mock variable mock_kafka (or AsyncMock) so the original function is
reinstated in teardown.
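The save/restore discipline the prompt asks for can be sketched with a small context manager (pytest's `monkeypatch` fixture does the same bookkeeping automatically; `fake_app_main` below is a stand-in for the real `app.main` module):

```python
import contextlib
import types


@contextlib.contextmanager
def patched_attr(obj, name, replacement):
    """Temporarily replace obj.<name>, restoring the original even on error."""
    original = getattr(obj, name)
    setattr(obj, name, replacement)
    try:
        yield replacement
    finally:
        setattr(obj, name, original)  # always reinstated in teardown


# Stand-in for app.main and its get_kafka_manager factory.
fake_app_main = types.SimpleNamespace(get_kafka_manager=lambda: "real-manager")

with patched_attr(fake_app_main, "get_kafka_manager", lambda: "mock-manager"):
    assert fake_app_main.get_kafka_manager() == "mock-manager"

# After the block the original factory is back.
assert fake_app_main.get_kafka_manager() == "real-manager"
```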
Signed-off-by: aniebietafia <aniebietafia87@gmail.com>
Actionable comments posted: 6
♻️ Duplicate comments (5)
app/services/stt_worker.py (2)
57-65: ⚠️ Potential issue | 🟠 Major

Do not emit synthetic transcripts when STT is misconfigured.

At Line 57, publishing a hard-coded sentence on missing `DEEPGRAM_API_KEY` causes false transcripts to flow through translation/TTS. Fail closed (log + drop/raise controlled error) instead.

Suggested fix

```diff
 if not settings.DEEPGRAM_API_KEY:
-    logger.info("DEEPGRAM_API_KEY not set. Mocking STT response for testing.")
-    result: dict[str, Any] = {
-        "text": (
-            "Hello, this is a simulated transcription for testing purposes."
-        ),
-        "detected_language": payload.source_language,
-        "confidence": 1.0,
-    }
+    logger.error(
+        "DEEPGRAM_API_KEY missing; dropping chunk seq=%d user=%s",
+        payload.sequence_number,
+        payload.user_id,
+    )
+    return
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/services/stt_worker.py` around lines 57 - 65, The code currently emits a hard-coded transcript when settings.DEEPGRAM_API_KEY is missing (see settings.DEEPGRAM_API_KEY check and the result dict in stt_worker.py); instead, fail closed by removing the synthetic "result" creation and either raise a clear, controlled exception (e.g., RuntimeError or a custom STTConfigurationError) or return an explicit failure value so downstream translation/TTS won't run; update the logger call to log an error (logger.error) with context about the missing DEEPGRAM_API_KEY and ensure the surrounding function (the STT handler in stt_worker.py) short-circuits immediately when the key is absent.
67-73: ⚠️ Potential issue | 🟠 Major

Add retry/backoff and bounded failure handling around Deepgram STT call.
At Line 68, transient external failures currently have no retry strategy and may interrupt processing. Add bounded retries with exponential backoff and graceful final failure handling.
Suggested fix
```diff
+import asyncio
 @@
         else:
             stt_service = get_deepgram_stt_service()
-            result = await stt_service.transcribe(
-                audio_bytes,
-                language=payload.source_language,
-                sample_rate=payload.sample_rate,
-                encoding=payload.encoding.value,
-            )
+            max_attempts = 3
+            result = None
+            for attempt in range(1, max_attempts + 1):
+                try:
+                    result = await stt_service.transcribe(
+                        audio_bytes,
+                        language=payload.source_language,
+                        sample_rate=payload.sample_rate,
+                        encoding=payload.encoding.value,
+                    )
+                    break
+                except Exception as exc:
+                    logger.warning(
+                        "Deepgram STT failed attempt=%d/%d seq=%d user=%s: %s",
+                        attempt,
+                        max_attempts,
+                        payload.sequence_number,
+                        payload.user_id,
+                        exc,
+                    )
+                    if attempt == max_attempts:
+                        logger.error(
+                            "Deepgram STT exhausted retries seq=%d user=%s",
+                            payload.sequence_number,
+                            payload.user_id,
+                        )
+                        return
+                    await asyncio.sleep(0.25 * (2 ** (attempt - 1)))
```

```shell
#!/bin/bash
set -euo pipefail
# Verify current STT call path has no retry loop/backoff.
rg -n -C4 'transcribe\(|attempt|sleep|max_attempts' app/services/stt_worker.py
# Verify Deepgram wrapper surfaces exceptions that need worker-level handling.
rg -n -C4 'raise_for_status|HTTPStatusError|RequestError|transcribe\(' app/external_services/deepgram/service.py
```

app/meeting/ws_router.py (3)
327-349: ⚠️ Potential issue | 🟠 Major

Translated captions are not filtered by the listener's language preference.

Every connected client receives all `TEXT_TRANSLATED` messages for the room, regardless of whether the translation matches their preferred language. In a multilingual room, users will see translations intended for others.

Retrieve the participant's language from `assert_room_participant` and filter accordingly:

```diff
+    participant_state = await assert_room_participant(room_code, user_id)
+    listening_language = participant_state.get("language", "en")
     # ... in the loop:
+    if is_translation:
+        if payload_data.get("target_language") != listening_language:
+            continue
```
Verify each finding against the current code and only fix it if needed. In `@app/meeting/ws_router.py` around lines 327 - 349, The loop currently sends all TEXT_TRANSLATED messages to every websocket; update it to fetch the participant's preferred language via assert_room_participant (use the same participant lookup used when opening the WS) and filter translated captions by comparing payload_data.get("target_language") to that participant language before building/sending caption_msg; keep behavior for non-translated TEXT events unchanged (still send using source_language/text). Use the existing symbols: assert_room_participant, consumer, TEXT_TRANSLATED, websocket, payload_data, room_code, and caption_msg to locate and implement the check.
234-250: ⚠️ Potential issue | 🟠 Major

Hardcoded Windows-style path and debug disk write should be removed or gated.

The path uses Windows backslash syntax (`\`), which fails on Linux/Unix. Additionally, writing audio to disk on every message is debug code that shouldn't run in production.

```diff
-    # Also save to disk for testing/validation
-    output_path = Path(rf"{settings.SYSTEM_PATH}\voiceai_output.raw")
-    mode = "ab" if payload.sequence_number > 0 else "wb"
-
-    def _write_audio(
-        _path: Path = output_path,
-        _mode: str = mode,
-        _data: bytes = audio_bytes,
-    ) -> None:
-        with _path.open(_mode) as f:
-            f.write(_data)
-
-    await asyncio.to_thread(_write_audio)
-    print(
-        f"Egress: SAVED {len(audio_bytes)} bytes to {output_path} "
-        f"(seq={payload.sequence_number})"
-    )
+    # Debug: Optionally save to disk for testing/validation
+    if settings.DEBUG_SAVE_AUDIO:
+        output_path = Path(settings.SYSTEM_PATH) / "voiceai_output.raw"
+        mode = "ab" if payload.sequence_number > 0 else "wb"
+        await asyncio.to_thread(
+            lambda p=output_path, m=mode, d=audio_bytes: p.open(m).write(d)
+        )
+        logger.debug("Saved %d bytes to %s", len(audio_bytes), output_path)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/meeting/ws_router.py` around lines 234 - 250, The code writes debug audio to a hardcoded Windows path and unconditionally performs disk I/O; change Path(rf"{settings.SYSTEM_PATH}\voiceai_output.raw") to use pathlib joining (e.g., Path(settings.SYSTEM_PATH) / "voiceai_output.raw") to be OS-agnostic, and gate the entire write and print via a feature flag (e.g., settings.WRITE_AUDIO_TO_DISK or settings.ENV == "development") so the _write_audio function, asyncio.to_thread call, and the print are only executed when that flag is enabled; also replace the print with a logger.debug call. Keep the existing mode logic (mode = "ab" if payload.sequence_number > 0 else "wb") and references to payload.sequence_number and asyncio.to_thread when implementing the gated behavior.
283-292: ⚠️ Potential issue | 🟠 Major

Task exceptions are swallowed and pending tasks are not properly awaited.

The `asyncio.wait()` block doesn't inspect completed tasks for exceptions, and the empty `except` clause hides failures. Cancelled tasks should be awaited to ensure clean shutdown.

```diff
 try:
     # Run until either task fails or disconnects
     _done, pending = await asyncio.wait(
         [task1, task2], return_when=asyncio.FIRST_COMPLETED
     )
+    # Surface any exceptions from completed tasks
+    for task in _done:
+        if task.exception():
+            logger.error("Audio task failed: %s", task.exception())
     # Cancel whatever is still running
     for t in pending:
         t.cancel()
-except Exception:
-    pass
+        try:
+            await t
+        except asyncio.CancelledError:
+            pass
+except Exception as e:
+    logger.exception("Audio WebSocket handler error: %s", e)
```
Verify each finding against the current code and only fix it if needed. In `@app/meeting/ws_router.py` around lines 283 - 292, The current asyncio.wait usage swallows exceptions and ignores pending cancellations; update the block that awaits task1 and task2 so you inspect completed tasks for exceptions, cancel pending tasks, then await those cancelled tasks (use asyncio.gather(..., return_exceptions=True)) and re-raise or log any non-CancelledError exceptions instead of using a bare "except Exception: pass". Concretely, replace the try/except around asyncio.wait([task1, task2], return_when=asyncio.FIRST_COMPLETED) with logic that captures (_done, pending) from asyncio.wait, iterates _done and checks done.exception() to handle/raise errors, calls t.cancel() for each t in pending, awaits asyncio.gather(*pending, return_exceptions=True) to ensure clean shutdown, and only catch and handle asyncio.CancelledError where appropriate.
🧹 Nitpick comments (1)
app/meeting/ws_router.py (1)
351-352: Add explanatory comment for empty `except` clause.

CodeQL flags the empty `except WebSocketDisconnect: pass` as a code smell. While this is intentional (disconnects are expected), adding a comment clarifies the intent.

```diff
 except WebSocketDisconnect:
-    pass
+    pass  # Expected when client disconnects; proceed to cleanup
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@app/meeting/ws_router.py` around lines 351 - 352, The empty except handling for WebSocketDisconnect in the ws_router module is intentional because client disconnects are normal; update the except WebSocketDisconnect: pass block to include a concise explanatory comment stating that disconnects are expected and intentionally ignored (optionally note that no action is needed and that logging can be added at debug if desired), referencing the except WebSocketDisconnect clause in this file so reviewers understand the intent.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@app/meeting/service.py`:
- Around line 286-294: When choosing final_lang before calling
self.state.add_to_lobby(room_code, tracking_id, ...), preserve any existing
lobby language for this tracking_id: first check the current lobby entry for
tracking_id via self.state (e.g. lookup/get existing lobby entry) and if it has
a language use that as final_lang; only if no lobby entry or no language present
fall back to listening_language, then user.listening_language, then "en". Ensure
this logic is applied where add_to_lobby is invoked so admit_from_lobby will see
the preserved language.
- Around line 179-191: The code currently strips tzinfo from room.scheduled_at
with replace(tzinfo=None) which loses the UTC conversion; instead convert
scheduled_at to UTC first (e.g.,
room.scheduled_at.astimezone(datetime.timezone.utc)) then make it naive for
comparison (replace(tzinfo=None)), so in the block that uses room.scheduled_at,
compute sched =
room.scheduled_at.astimezone(datetime.timezone.utc).replace(tzinfo=None) (and
ensure utc_now() is also in UTC) before comparing and raising
BadRequestException in the existing logic that checks sched > now when not
is_host.
In `@app/meeting/ws_router.py`:
- Line 319: The current dynamic group_id
(group_id=f"captions-{room_code}-{user_id}-{int(time.time())}") creates a new
Kafka consumer group per WebSocket connection; change it to a stable identifier
such as group_id=f"captions-{room_code}-{user_id}" (or another deterministic
per-user or per-session id stored in the session) so reconnections reuse the
same consumer group, and if you need "latest" behavior on reconnect, perform a
seek_to_end() after subscribing rather than generating a new group;
alternatively, implement a cleanup/TTL strategy for ephemeral groups if
per-connection uniqueness is required.
- Around line 265-269: The exception handler currently prints the error and
performs an inline import of traceback followed by traceback.print_exc(); remove
the inline "import traceback" and the traceback.print_exc() call and replace the
prints with structured logging (e.g., call the module logger's exception() or
error(..., exc_info=True)) so the exception and stack trace are recorded via the
app logger; update the except block that catches "frame_err" to use
logger.exception or logger.error with exc_info=True and remove the print(...)
statement and inline import.
In `@app/services/stt_worker.py`:
- Around line 103-108: The info log in stt_worker.py leaks transcript content by
logging text[:50]; update the logging call (the format string and its arguments
where payload.sequence_number, payload.room_id, payload.user_id, text, and
result.get("confidence", 0.0) are used) to remove any raw transcript and instead
log only metadata such as text length (use len(text) or "redacted") along with
sequence_number, room_id, user_id, confidence and latency; ensure the format
string and argument list are adjusted accordingly so no substring of text is
passed to the logger.
- Around line 44-45: The base64 decode call base64.b64decode(payload.audio_data)
can raise binascii.Error/ValueError for malformed input and should be handled so
it doesn't bubble into BaseConsumer._process_with_retry (causing retries); wrap
the decode in a try/except around base64.b64decode(payload.audio_data), catch
binascii.Error and ValueError, log a clear error including payload identifiers
via the same logger used in this module (e.g., process_logger or the existing
logger), and return/ack/drop the message (do not re-raise) so processing stops;
update the method that handles the incoming payload (the function containing the
decode, e.g., STTWorker.process or the message handler) to implement this
defensive handling.
---
Duplicate comments:
In `@app/meeting/ws_router.py`:
- Around line 327-349: The loop currently sends all TEXT_TRANSLATED messages to
every websocket; update it to fetch the participant's preferred language via
assert_room_participant (use the same participant lookup used when opening the
WS) and filter translated captions by comparing
payload_data.get("target_language") to that participant language before
building/sending caption_msg; keep behavior for non-translated TEXT events
unchanged (still send using source_language/text). Use the existing symbols:
assert_room_participant, consumer, TEXT_TRANSLATED, websocket, payload_data,
room_code, and caption_msg to locate and implement the check.
- Around line 234-250: The code writes debug audio to a hardcoded Windows path
and unconditionally performs disk I/O; change
Path(rf"{settings.SYSTEM_PATH}\voiceai_output.raw") to use pathlib joining
(e.g., Path(settings.SYSTEM_PATH) / "voiceai_output.raw") to be OS-agnostic, and
gate the entire write and print via a feature flag (e.g.,
settings.WRITE_AUDIO_TO_DISK or settings.ENV == "development") so the
_write_audio function, asyncio.to_thread call, and the print are only executed
when that flag is enabled; also replace the print with a logger.debug call. Keep
the existing mode logic (mode = "ab" if payload.sequence_number > 0 else "wb")
and references to payload.sequence_number and asyncio.to_thread when
implementing the gated behavior.
- Around line 283-292: The current asyncio.wait usage swallows exceptions and
ignores pending cancellations; update the block that awaits task1 and task2 so
you inspect completed tasks for exceptions, cancel pending tasks, then await
those cancelled tasks (use asyncio.gather(..., return_exceptions=True)) and
re-raise or log any non-CancelledError exceptions instead of using a bare
"except Exception: pass". Concretely, replace the try/except around
asyncio.wait([task1, task2], return_when=asyncio.FIRST_COMPLETED) with logic
that captures (_done, pending) from asyncio.wait, iterates _done and checks
done.exception() to handle/raise errors, calls t.cancel() for each t in pending,
awaits asyncio.gather(*pending, return_exceptions=True) to ensure clean
shutdown, and only catch and handle asyncio.CancelledError where appropriate.
In `@app/services/stt_worker.py`:
- Around line 57-65: The code currently emits a hard-coded transcript when
settings.DEEPGRAM_API_KEY is missing (see settings.DEEPGRAM_API_KEY check and
the result dict in stt_worker.py); instead, fail closed by removing the
synthetic "result" creation and either raise a clear, controlled exception
(e.g., RuntimeError or a custom STTConfigurationError) or return an explicit
failure value so downstream translation/TTS won't run; update the logger call to
log an error (logger.error) with context about the missing DEEPGRAM_API_KEY and
ensure the surrounding function (the STT handler in stt_worker.py)
short-circuits immediately when the key is absent.
---
Nitpick comments:
In `@app/meeting/ws_router.py`:
- Around line 351-352: The empty except handling for WebSocketDisconnect in the
ws_router module is intentional because client disconnects are normal; update
the except WebSocketDisconnect: pass block to include a concise explanatory
comment stating that disconnects are expected and intentionally ignored
(optionally note that no action is needed and that logging can be added at debug
if desired), referencing the except WebSocketDisconnect clause in this file so
reviewers understand the intent.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 75785193-64fa-44b7-9c5b-6aab63b3e1e1
📒 Files selected for processing (13)
- .github/workflows/ci.yml
- app/auth/router.py
- app/db/session.py
- app/external_services/cloudinary/service.py
- app/kafka/consumer.py
- app/meeting/schemas.py
- app/meeting/service.py
- app/meeting/ws_router.py
- app/services/stt_worker.py
- pyproject.toml
- scripts/test_audio_client.py
- tests/test_auth/test_auth_signup.py
- tests/test_kafka/test_pipeline.py
✅ Files skipped from review due to trivial changes (5)
- .github/workflows/ci.yml
- app/db/session.py
- app/kafka/consumer.py
- app/auth/router.py
- tests/test_auth/test_auth_signup.py
🚧 Files skipped from review as they are similar to previous changes (5)
- app/external_services/cloudinary/service.py
- scripts/test_audio_client.py
- pyproject.toml
- app/meeting/schemas.py
- tests/test_kafka/test_pipeline.py
```python
if room.scheduled_at:
    # Normalize to naive UTC for comparison (SQLite strips tzinfo)
    sched = (
        room.scheduled_at.replace(tzinfo=None)
        if room.scheduled_at.tzinfo
        else room.scheduled_at
    )
    now = utc_now().replace(tzinfo=None)
    if sched > now and not is_host:
        raise BadRequestException(
            code="MEETING_NOT_STARTED",
            message="This meeting is scheduled for a future time.",
        )
```
🧩 Analysis chain
🏁 Script executed:
```shell
#!/bin/bash
python - <<'PY'
from datetime import UTC, datetime, timedelta, timezone
sched = datetime(2026, 4, 6, 10, 0, tzinfo=timezone(timedelta(hours=2)))
print("original        :", sched.isoformat())
print("replace(None)   :", sched.replace(tzinfo=None).isoformat())
print("astimezone(UTC) :", sched.astimezone(UTC).isoformat())
PY
```

Repository: Brints/FluentMeet
Length of output: 186
🏁 Script executed:
```shell
cat -n app/meeting/service.py | sed -n '175,195p'
```

Repository: Brints/FluentMeet
Length of output: 1035
🏁 Script executed:
```shell
cat -n app/meeting/models.py | sed -n '25,40p'
```

Repository: Brints/FluentMeet
Length of output: 806
🏁 Script executed:
```shell
rg -n "utc_now" app/meeting/service.py -A 2 -B 2
```

Repository: Brints/FluentMeet
Length of output: 937
Convert scheduled_at to UTC before comparing it.
Lines 180-186 use replace(tzinfo=None), which removes the timezone offset without converting the instant. For a timezone-aware scheduled_at value like 10:00+02:00, this becomes naive 10:00 instead of the correct 08:00+00:00. The comparison then incorrectly blocks or allows access based on the wrong instant in time.
🛠️ Proposed fix
```diff
 if room.scheduled_at:
-    # Normalize to naive UTC for comparison (SQLite strips tzinfo)
-    sched = (
-        room.scheduled_at.replace(tzinfo=None)
-        if room.scheduled_at.tzinfo
-        else room.scheduled_at
-    )
-    now = utc_now().replace(tzinfo=None)
+    sched = (
+        room.scheduled_at.astimezone(UTC)
+        if room.scheduled_at.tzinfo is not None
+        else room.scheduled_at.replace(tzinfo=UTC)
+    )
+    now = utc_now()
     if sched > now and not is_host:
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```diff
 if room.scheduled_at:
-    # Normalize to naive UTC for comparison (SQLite strips tzinfo)
-    sched = (
-        room.scheduled_at.replace(tzinfo=None)
-        if room.scheduled_at.tzinfo
-        else room.scheduled_at
-    )
-    now = utc_now().replace(tzinfo=None)
+    sched = (
+        room.scheduled_at.astimezone(UTC)
+        if room.scheduled_at.tzinfo is not None
+        else room.scheduled_at.replace(tzinfo=UTC)
+    )
+    now = utc_now()
     if sched > now and not is_host:
         raise BadRequestException(
             code="MEETING_NOT_STARTED",
             message="This meeting is scheduled for a future time.",
         )
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@app/meeting/service.py` around lines 179 - 191, The code currently strips
tzinfo from room.scheduled_at with replace(tzinfo=None) which loses the UTC
conversion; instead convert scheduled_at to UTC first (e.g.,
room.scheduled_at.astimezone(datetime.timezone.utc)) then make it naive for
comparison (replace(tzinfo=None)), so in the block that uses room.scheduled_at,
compute sched =
room.scheduled_at.astimezone(datetime.timezone.utc).replace(tzinfo=None) (and
ensure utc_now() is also in UTC) before comparing and raising
BadRequestException in the existing logic that checks sched > now when not
is_host.
```python
# Priority: explicit join request > user profile > default "en"
if listening_language:
    final_lang = listening_language
elif user and user.listening_language:
    final_lang = user.listening_language
else:
    final_lang = "en"

await self.state.add_to_lobby(room_code, tracking_id, display_name, final_lang)
```
Preserve the existing lobby language on reconnect.
A waiting user has not hit _finalize_join() yet, so a retry lands here again. If that retry omits listening_language, add_to_lobby() overwrites the stored lobby entry with the profile/default value, and admit_from_lobby() will later promote the wrong language.
Prefer the current lobby language for tracking_id before falling back to the user profile or "en".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@app/meeting/service.py` around lines 286 - 294, When choosing final_lang
before calling self.state.add_to_lobby(room_code, tracking_id, ...), preserve
any existing lobby language for this tracking_id: first check the current lobby
entry for tracking_id via self.state (e.g. lookup/get existing lobby entry) and
if it has a language use that as final_lang; only if no lobby entry or no
language present fall back to listening_language, then user.listening_language,
then "en". Ensure this logic is applied where add_to_lobby is invoked so
admit_from_lobby will see the preserved language.
```python
except Exception as frame_err:
    print(f"Error processing egress frame: {frame_err}")
    import traceback

    traceback.print_exc()
```
🛠️ Refactor suggestion | 🟠 Major
Remove debug import and traceback.print_exc() from exception handler.
Importing inside an exception handler is unusual and the print_exc() should use structured logging instead.
```diff
 except Exception as frame_err:
-    print(f"Error processing egress frame: {frame_err}")
-    import traceback
-
-    traceback.print_exc()
+    logger.exception("Error processing egress frame: %s", frame_err)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@app/meeting/ws_router.py` around lines 265 - 269, The exception handler
currently prints the error and performs an inline import of traceback followed
by traceback.print_exc(); remove the inline "import traceback" and the
traceback.print_exc() call and replace the prints with structured logging (e.g.,
call the module logger's exception() or error(..., exc_info=True)) so the
exception and stack trace are recorded via the app logger; update the except
block that catches "frame_err" to use logger.exception or logger.error with
exc_info=True and remove the print(...) statement and inline import.
```python
TEXT_ORIGINAL,
TEXT_TRANSLATED,
bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS,
group_id=f"captions-{room_code}-{user_id}-{int(time.time())}",
```
Dynamic group_id with timestamp creates orphaned consumer groups.
Using int(time.time()) in the group_id creates a new consumer group for every WebSocket connection. These groups are never cleaned up and will accumulate in Kafka, consuming metadata resources.
Consider using a stable group_id per user or implementing a cleanup strategy:
```diff
-    group_id=f"captions-{room_code}-{user_id}-{int(time.time())}",
+    group_id=f"captions-{room_code}-{user_id}",
```

If you need "latest" behavior on reconnect, you can seek to end after starting rather than relying on a new group.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```diff
-    group_id=f"captions-{room_code}-{user_id}-{int(time.time())}",
+    group_id=f"captions-{room_code}-{user_id}",
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@app/meeting/ws_router.py` at line 319, The current dynamic group_id
(group_id=f"captions-{room_code}-{user_id}-{int(time.time())}") creates a new
Kafka consumer group per WebSocket connection; change it to a stable identifier
such as group_id=f"captions-{room_code}-{user_id}" (or another deterministic
per-user or per-session id stored in the session) so reconnections reuse the
same consumer group, and if you need "latest" behavior on reconnect, perform a
seek_to_end() after subscribing rather than generating a new group;
alternatively, implement a cleanup/TTL strategy for ephemeral groups if
per-connection uniqueness is required.
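Assuming the consumer is an aiokafka `AIOKafkaConsumer` (which exposes async `start()` and `seek_to_end()`), the "stable group plus seek to end" variant might look like the sketch below; the helper name is hypothetical, and the demonstration uses a mock instead of a real broker:

```python
import asyncio
from unittest.mock import AsyncMock


async def start_caption_consumer(consumer) -> None:
    """Start a consumer that reuses a stable group id, then jump to the
    log end so a reconnecting client sees only new captions (no replay)."""
    await consumer.start()
    await consumer.seek_to_end()  # "latest" behavior without a throwaway group


# Demonstration with a mocked consumer (no Kafka broker required).
mock_consumer = AsyncMock()
asyncio.run(start_caption_consumer(mock_consumer))
mock_consumer.start.assert_awaited_once()
mock_consumer.seek_to_end.assert_awaited_once()
```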
```python
audio_bytes = base64.b64decode(payload.audio_data)
```
🧩 Analysis chain
🏁 Script executed:
```shell
#!/bin/bash
set -euo pipefail
# Verify decode path currently has no guard.
rg -n -C3 'b64decode\(' app/services/stt_worker.py
# Verify whether BaseConsumer has a top-level exception boundary around handler execution.
rg -n -C4 'class BaseConsumer|await .*handle\(|try:|except' app/kafka/consumer.py
```

Repository: Brints/FluentMeet
Length of output: 3222
🌐 Web query:
Python base64.b64decode exceptions binascii.Error ValueError
💡 Result:
base64.b64decode(s, altchars=None, validate=False) can raise different exceptions depending on what’s wrong with the input:
- `binascii.Error`
  - If the input is incorrectly padded (e.g., missing/extra `=`). (docs.python.org)
  - If `validate=True` and the input contains any non‑Base64 alphabet characters (with `validate=False`, those characters are discarded before the padding check). (docs.python.org)
- `ValueError`
  - If `altchars` is provided but its length is not exactly 2 (it "may assert or raise `ValueError`"). (docs.python.org)
import base64, binascii
try:
data = base64.b64decode(s, validate=True)
except binascii.Error:
# invalid base64 (bad padding / invalid characters under validate=True)
...
except ValueError:
# bad altchars length (not 2)
...Citations:
- 1: https://docs.python.org/3/library/base64.html?utm_source=openai
- 2: https://docs.python.org/3/library/base64.html?utm_source=openai
- 3: https://docs.python.org/3/library/base64.html?utm_source=openai
🌐 Web query:
Python base64.b64decode validate parameter documentation
💡 Result:
In Python’s base64.b64decode, the validate parameter controls whether non‑Base64 characters are allowed in the input.
Docs (Python standard library):
base64.b64decode(s, altchars=None, validate=False) and (newer form) base64.b64decode(s, altchars=None, validate=True, *, ignorechars) [1]
- `validate=False` (default unless `ignorechars` is provided): characters not in the Base64 alphabet (and, if applicable, the `altchars` alphabet) are discarded before padding is checked. [1]
- `validate=True`: any non‑alphabet characters cause a `binascii.Error` (i.e., strict validation). [1]
- If `ignorechars` is specified, those characters are ignored even when validating; in that case the default for `validate` becomes `True`. [1]
- The docs point to `binascii.a2b_base64()` for more on "strict base64" checking. [1]
Source: Python base64 module documentation. [1]
Handle invalid base64 input defensively before processing.
At Line 44, base64.b64decode(payload.audio_data) is unguarded and will raise binascii.Error or ValueError on malformed input. This exception propagates through the retry loop in BaseConsumer._process_with_retry(), causing the message to be retried multiple times before eventual DLQ forwarding. Malformed base64 is non-transient and should be logged and dropped immediately instead.
Suggested fix
```diff
+import binascii
 import base64
 import logging
 import time
 @@
-    audio_bytes = base64.b64decode(payload.audio_data)
+    try:
+        audio_bytes = base64.b64decode(payload.audio_data, validate=True)
+    except (binascii.Error, ValueError) as exc:
+        logger.warning(
+            "Invalid base64 audio chunk seq=%d user=%s: %s",
+            payload.sequence_number,
+            payload.user_id,
+            exc,
+        )
+        return
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
        try:
            audio_bytes = base64.b64decode(payload.audio_data, validate=True)
        except (binascii.Error, ValueError) as exc:
            logger.warning(
                "Invalid base64 audio chunk seq=%d user=%s: %s",
                payload.sequence_number,
                payload.user_id,
                exc,
            )
            return
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@app/services/stt_worker.py` around lines 44 - 45, The base64 decode call
base64.b64decode(payload.audio_data) can raise binascii.Error/ValueError for
malformed input and should be handled so it doesn't bubble into
BaseConsumer._process_with_retry (causing retries); wrap the decode in a
try/except around base64.b64decode(payload.audio_data), catch binascii.Error and
ValueError, log a clear error including payload identifiers via the same logger
used in this module (e.g., process_logger or the existing logger), and
return/ack/drop the message (do not re-raise) so processing stops; update the
method that handles the incoming payload (the function containing the decode,
e.g., STTWorker.process or the message handler) to implement this defensive
handling.
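As a standalone illustration of the decode-and-drop pattern the review describes (the function and parameter names here are illustrative, not the worker's actual API):

```python
import base64
import binascii
import logging
from typing import Optional

logger = logging.getLogger(__name__)


def decode_audio_or_drop(audio_data: str, sequence_number: int, user_id: str) -> Optional[bytes]:
    """Decode a base64 audio chunk, returning None (drop) on malformed input.

    Returning None instead of raising keeps non-transient decode failures
    out of the consumer's retry/DLQ path.
    """
    try:
        return base64.b64decode(audio_data, validate=True)
    except (binascii.Error, ValueError) as exc:
        logger.warning(
            "Invalid base64 audio chunk seq=%d user=%s: %s",
            sequence_number, user_id, exc,
        )
        return None
```

A handler built on this helper would simply `return` (acknowledging the message) when it gets `None`, rather than letting the exception reach the retry loop.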
```python
            "STT: seq=%d room=%s user=%s text='%s' confidence=%.2f latency=%.1fms",
            payload.sequence_number,
            payload.room_id,
            payload.user_id,
            text[:50],
            result.get("confidence", 0.0),
```
Avoid logging transcript content in info logs (privacy risk).
At Lines 103-108, `text[:50]` can leak user speech/PII into logs. Prefer metadata-only logging (length/sequence/confidence).
Suggested fix
```diff
     logger.info(
-        "STT: seq=%d room=%s user=%s text='%s' confidence=%.2f latency=%.1fms",
+        "STT: seq=%d room=%s user=%s chars=%d confidence=%.2f latency=%.1fms",
         payload.sequence_number,
         payload.room_id,
         payload.user_id,
-        text[:50],
+        len(text),
         result.get("confidence", 0.0),
         elapsed_ms,
     )
```

📝 Committable suggestion
```python
        "STT: seq=%d room=%s user=%s chars=%d confidence=%.2f latency=%.1fms",
        payload.sequence_number,
        payload.room_id,
        payload.user_id,
        len(text),
        result.get("confidence", 0.0),
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@app/services/stt_worker.py` around lines 103 - 108, The info log in
stt_worker.py leaks transcript content by logging text[:50]; update the logging
call (the format string and its arguments where payload.sequence_number,
payload.room_id, payload.user_id, text, and result.get("confidence", 0.0) are
used) to remove any raw transcript and instead log only metadata such as text
length (use len(text) or "redacted") along with sequence_number, room_id,
user_id, confidence and latency; ensure the format string and argument list are
adjusted accordingly so no substring of text is passed to the logger.
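A sketch of the metadata-only variant as a helper (names are illustrative; in the worker the call sits inline in the handler):

```python
import logging

logger = logging.getLogger("stt")


def log_stt_metadata(sequence_number: int, room_id: str, user_id: str,
                     text: str, confidence: float, latency_ms: float) -> None:
    # Log only derived metadata (character count), never the transcript
    # itself, so user speech cannot leak into aggregated log storage.
    logger.info(
        "STT: seq=%d room=%s user=%s chars=%d confidence=%.2f latency=%.1fms",
        sequence_number, room_id, user_id, len(text), confidence, latency_ms,
    )
```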