Conversation
Note: Reviews paused. It looks like this branch is under active development, so to avoid overwhelming you with review comments as new commits arrive, CodeRabbit has automatically paused this review. This behavior can be changed in the CodeRabbit configuration, and reviews can still be managed manually with the usual review commands.
📝 Walkthrough: Centralizes SSE response handling; adds SecurityHeaders middleware and streaming request-size enforcement; makes multiple enums terminal-aware with is_terminal flags; records event-data-lost metrics and adds transactional archival; threads idempotency and logger into saga flow; replaces the token-bucket implementation with a Redis Lua script; refactors pod log framing and tests.
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 1 passed | ❌ 2 failed (1 warning, 1 inconclusive)
8 issues found across 46 files
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="backend/app/services/saga/saga_orchestrator.py">
<violation number="1" location="backend/app/services/saga/saga_orchestrator.py:151">
P2: Retry count is stored on a non-serialized attribute, so it resets on each requeue and the max retry limit never triggers (infinite retries possible). Persist retry_count in the queue or on a model field instead of `event._retry_count`.</violation>
</file>
<file name="backend/app/services/execution_service.py">
<violation number="1" location="backend/app/services/execution_service.py:216">
P2: `ExecutionStatus.CANCELLED` is terminal, so this new check raises `ExecutionTerminalError` before the `ALREADY_CANCELLED` handling, changing behavior for already-cancelled executions. Exclude CANCELLED from the terminal guard or move the cancelled check above it.</violation>
</file>
<file name="backend/app/core/utils.py">
<violation number="1" location="backend/app/core/utils.py:62">
P1: Security: `ip.startswith("172.")` is overly broad and trusts public IPs as proxies. The RFC 1918 private range is only `172.16.0.0/12` (i.e. `172.16.x.x` through `172.31.x.x`), but this check matches all `172.*` addresses. An attacker on a public `172.32+.x.x` IP can spoof `X-Forwarded-For` to bypass rate limiting.
Consider using Python's `ipaddress` module for robust checking, or at minimum restrict the prefix match to cover only `172.16.` – `172.31.`.</violation>
</file>
<file name="backend/app/services/auth_service.py">
<violation number="1" location="backend/app/services/auth_service.py:79">
P2: Calling `_fail_login` for a deactivated account with correct credentials increments the brute-force lockout counter. The password was already verified successfully, so this isn't a brute-force attempt. After repeated login attempts with the correct password, the account will be erroneously locked. Consider handling the deactivated-account case separately—log the event and raise the error without recording a failed lockout attempt.</violation>
</file>
<file name="backend/app/core/middlewares/rate_limit.py">
<violation number="1" location="backend/app/core/middlewares/rate_limit.py:114">
P2: Using the unverified JWT payload as the rate-limit key allows clients to spoof `sub` values and evade per-user limits. Prefer a validated identity (e.g., request.state user set by auth) and fall back to IP when no validated user is present.</violation>
<violation number="2" location="backend/app/core/middlewares/rate_limit.py:122">
P2: Add error handling around JWT payload decoding so malformed tokens don’t crash the middleware. Fall back to the IP bucket on decode/parse errors.</violation>
</file>
<file name="backend/app/services/notification_service.py">
<violation number="1" location="backend/app/services/notification_service.py:114">
P2: The shared httpx.AsyncClient is never closed, so its connection pool stays open until process exit. Add a shutdown/close hook to call `await self._http_client.aclose()` when the service is torn down.</violation>
</file>
<file name="backend/app/db/repositories/event_repository.py">
<violation number="1" location="backend/app/db/repositories/event_repository.py:267">
P2: The MongoDB session started with `start_session()` is never closed, which can leak session resources. Wrap it in an async context manager so it is always ended.</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
Actionable comments posted: 9
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (4)
backend/app/api/routes/admin/executions.py (1)
32-33: 🛠️ Refactor suggestion | 🟠 Major

Remove redundant per-endpoint `admin_user` dependencies.

With the router-level `dependencies=[Depends(admin_user)]` now in place, the `_: Annotated[User, Depends(admin_user)]` parameter in each handler is redundant. The auth check runs twice per request—once at router level, once at handler level.

♻️ Proposed fix to remove duplicate dependencies

    @router.get(
        "/",
        response_model=AdminExecutionListResponse,
        responses={403: {"model": ErrorResponse}},
    )
    async def list_executions(
    -   _: Annotated[User, Depends(admin_user)],
        service: FromDishka[AdminExecutionService],
        status: ExecutionStatus | None = Query(None),
        priority: QueuePriority | None = Query(None),
        user_id: str | None = Query(None),
        limit: int = Query(50, ge=1, le=200),
        skip: int = Query(0, ge=0),
    ) -> AdminExecutionListResponse:

    @router.put(
        "/{execution_id}/priority",
        response_model=AdminExecutionResponse,
        responses={403: {"model": ErrorResponse}, 404: {"model": ErrorResponse}},
    )
    async def update_priority(
        execution_id: str,
        body: PriorityUpdateRequest,
    -   _: Annotated[User, Depends(admin_user)],
        service: FromDishka[AdminExecutionService],
    ) -> AdminExecutionResponse:

    @router.get(
        "/queue",
        response_model=QueueStatusResponse,
        responses={403: {"model": ErrorResponse}},
    )
    async def get_queue_status(
    -   _: Annotated[User, Depends(admin_user)],
        service: FromDishka[AdminExecutionService],
    ) -> QueueStatusResponse:

If any handler needs access to the authenticated `User` object (not just auth enforcement), keep the dependency but use a lighter current_user variant or retrieve the user from request state instead.

Based on learnings: "Routes must use Depends(admin_user) as a router-level auth guard."
Also applies to: 58-61, 73-74
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/api/routes/admin/executions.py` around lines 32 - 33, The per-endpoint admin dependency is redundant: remove the parameter "_: Annotated[User, Depends(admin_user)]" from the list_executions handler (and any other handlers that declare the same parameter) since the router already has dependencies=[Depends(admin_user)]; if a handler actually needs the authenticated User object, replace that heavy dependency with a lightweight current_user dependency or read the user from request.state instead (so modify the function signatures for list_executions and the other handlers that currently accept Annotated[User, Depends(admin_user)] accordingly).backend/app/events/handlers.py (1)
66-66: 🛠️ Refactor suggestion | 🟠 Major

Use structured logging with keyword arguments instead of f-string interpolation.

Per coding guidelines, log messages should not interpolate data—pass it as keyword arguments instead.

♻️ Suggested fix

        if result.is_duplicate:
    -       logger.info(f"Duplicate event: {event.event_type} ({event.event_id})")
    +       logger.info("Duplicate event detected", event_type=event.event_type, event_id=event.event_id)
            return

As per coding guidelines: "never interpolate user-controlled data into log messages—always pass data as keyword arguments."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/events/handlers.py` at line 66, Replace the f-string interpolation on the logger.info call with structured logging keyword arguments: change the line using logger.info(f"Duplicate event: {event.event_type} ({event.event_id})") to a structured form such as logger.info("Duplicate event", event_type=event.event_type, event_id=event.event_id) (update the logger.info call in handlers.py where logger is used to report duplicate events).

backend/app/services/execution_service.py (1)
216-225: ⚠️ Potential issue | 🟠 Major

Fix cancellation status check order to keep `ALREADY_CANCELLED` reachable.

Line 216 now short-circuits on `is_terminal`, so `ExecutionStatus.CANCELLED` hits `ExecutionTerminalError` and the line 219-225 branch is unreachable. This changes API behavior for repeated cancellation requests.

💡 Proposed fix

    -   if current_status.is_terminal:
    -       raise ExecutionTerminalError(execution_id, current_status)
    -
        if current_status == ExecutionStatus.CANCELLED:
            return CancelResult(
                execution_id=execution_id,
                status=CancelStatus.ALREADY_CANCELLED,
                message="Execution was already cancelled",
                event_id=None,
            )
    +   if current_status.is_terminal:
    +       raise ExecutionTerminalError(execution_id, current_status)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/execution_service.py` around lines 216 - 225, The check for terminal status currently runs before the specific cancelled check, causing ExecutionStatus.CANCELLED to raise ExecutionTerminalError and preventing returning CancelResult(ALREADY_CANCELLED); modify the control flow in the function where current_status is evaluated so that the branch if current_status == ExecutionStatus.CANCELLED (returning CancelResult with CancelStatus.ALREADY_CANCELLED and message "Execution was already cancelled") executes before the if current_status.is_terminal check that raises ExecutionTerminalError(execution_id, current_status), ensuring repeated cancellation requests return the ALREADY_CANCELLED result.backend/app/services/pod_monitor/event_mapper.py (1)
510-516: ⚠️ Potential issue | 🟠 Major

Guard JSON parsing to preserve fallback behavior.

At line 510, `json.loads(text)` raises `json.JSONDecodeError` on malformed brace-wrapped chunks, which propagates out of `_try_parse_json` and breaks the intended line-by-line fallback loop in `_parse_executor_output`. This causes valid executor output on subsequent lines to be dropped, degrading completed/timeout event mapping reliability.

Wrap `json.loads` in a try/except block for `json.JSONDecodeError` and validate that the parsed data and `resource_usage` are dictionaries before unpacking. Return `None` on any parsing or validation failure to allow the fallback to proceed.

🔧 Proposed fix

        def _try_parse_json(self, text: str) -> PodLogs | None:
            """Try to parse text as executor JSON output"""
            if not (text.startswith("{") and text.endswith("}")):
                return None
    -       data = json.loads(text)
    +       try:
    +           data = json.loads(text)
    +       except json.JSONDecodeError:
    +           return None
    +       if not isinstance(data, dict):
    +           return None
    +
    +       resource_usage_data = data.get("resource_usage") or {}
    +       if not isinstance(resource_usage_data, dict):
    +           return None
    +
            return PodLogs(
                stdout=data.get("stdout", ""),
                stderr=data.get("stderr", ""),
                exit_code=data.get("exit_code", 0),
    -           resource_usage=ResourceUsageDomain(**data.get("resource_usage", {})),
    +           resource_usage=ResourceUsageDomain(**resource_usage_data),
            )
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/pod_monitor/event_mapper.py` around lines 510 - 516, The json.loads call in _try_parse_json can raise json.JSONDecodeError and the code assumes the parsed payload and resource_usage are dicts; wrap json.loads(text) in a try/except catching json.JSONDecodeError and return None on parse failure, then validate that the resulting data is a dict and that data.get("resource_usage") is a dict before doing ResourceUsageDomain(**...); if validation fails return None so the caller _parse_executor_output can fall back to line-by-line parsing and not drop subsequent valid executor output; keep PodLogs construction only when parsing and validation succeed.
🧹 Nitpick comments (8)
backend/app/dlq/manager.py (1)
258-259: Avoid f-string interpolation in log messages; catch specific exceptions.

Line 259 interpolates `event_id` and exception `e` directly into the log message string. Per coding guidelines, user-controlled data should be passed as structured keyword arguments. Additionally, catching broad `Exception` is discouraged.

♻️ Proposed fix

    -   except Exception as e:
    -       self.logger.error(f"Error retrying message {event_id}: {e}")
    +   except (ValueError, RuntimeError) as e:  # or other specific expected exceptions
    +       self.logger.error("Error retrying message", event_id=event_id, error=str(e), exc_type=type(e).__name__)

As per coding guidelines: "never interpolate user-controlled data into log messages—always pass data as keyword arguments" and "never catch Exception broadly—catch specific exception types".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/dlq/manager.py` around lines 258 - 259, The except block in manager.py that currently does "except Exception as e: self.logger.error(f'Error retrying message {event_id}: {e}')" should be changed to catch the specific exception types thrown by the retry logic (e.g., the network/db/processing errors raised in the retry path) instead of broad Exception, and log without f-string interpolation by passing structured data and exception info to the logger (for example: self.logger.error("Error retrying message", event_id=event_id, exc_info=e) or self.logger.exception with event_id as a keyword). Locate the except block around the retry routine that references event_id and replace the broad except with specific exception classes used by the retry code and update the logger call to pass event_id as a keyword argument and include exc_info rather than interpolating the error into the message.backend/app/db/repositories/event_repository.py (1)
60-65: Silent no-op when event not found.
`mark_publish_failed` silently does nothing if the `event_id` doesn't exist in the database. The `find_one(...).update(...)` chain returns `None` when no document matches, with no indication of whether the update succeeded.

Consider returning a boolean or the update result to allow callers (like the producer) to detect and log this scenario.

♻️ Optional: Return success indicator

    -   async def mark_publish_failed(self, event_id: str) -> None:
    +   async def mark_publish_failed(self, event_id: str) -> bool:
            """Mark an event as failed to publish to Kafka for later retry."""
    -       await EventDocument.find_one(
    +       result = await EventDocument.find_one(
                EventDocument.event_id == event_id,
            ).update({"$set": {"publish_failed": True, "publish_failed_at": datetime.now(timezone.utc)}})
    +       return result is not None and result.modified_count > 0

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/db/repositories/event_repository.py` around lines 60 - 65, The mark_publish_failed method currently performs find_one(...).update(...) and silently no-ops when no document matches; change mark_publish_failed to capture the update result and return a boolean success indicator: perform the find/update on EventDocument (EventDocument.event_id == event_id), store the result of the update operation, set publish_failed and publish_failed_at as before, and return True if a document was matched/modified and False otherwise; update the method signature of mark_publish_failed to return bool and adjust any callers (e.g., the producer) to check the boolean and log or handle the missing-event case.backend/app/events/core/producer.py (1)
46-50: Consider handling `mark_publish_failed` failure separately.

If `mark_publish_failed` raises an exception (e.g., MongoDB connection issue), it will shadow the original Kafka exception `e`. The original error context would be lost, making debugging harder.

♻️ Proposed fix: Isolate mark_publish_failed failure

        except Exception as e:
            self._event_metrics.record_kafka_production_error(topic=topic, error_type=type(e).__name__)
            self.logger.error("Failed to produce message", topic=topic, error=str(e))
    -       await self._event_repository.mark_publish_failed(event_to_produce.event_id)
    +       try:
    +           await self._event_repository.mark_publish_failed(event_to_produce.event_id)
    +       except Exception as mark_err:
    +           self.logger.warning(
    +               "Failed to mark event as publish_failed",
    +               event_id=event_to_produce.event_id,
    +               error=str(mark_err),
    +           )
            raise

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/events/core/producer.py` around lines 46 - 50, The current except block may let a failure in self._event_repository.mark_publish_failed(event_to_produce.event_id) mask the original Kafka exception; capture the original exception (the one caught as e), then call mark_publish_failed inside its own try/except so any error there is logged/recorded (use self.logger.exception or record a separate metric) but does not replace the original; finally re-raise the original Kafka exception (not the secondary one). Reference: the except block around self._event_metrics.record_kafka_production_error, self.logger.error("Failed to produce message", ...), self._event_repository.mark_publish_failed, and the re-raise.backend/app/api/routes/admin/events.py (1)
101-102: Consider having the service raise `NotFoundError` instead of returning `None`.

Per coding guidelines, services should "check None from repositories, log the issue, and raise a domain-specific NotFoundError subclass; services never return None to route handlers." The route handler should not need to raise `HTTPException` for not-found cases—the global exception handler should convert domain exceptions.

This applies similarly to lines 153-154 and 166-167.
As per coding guidelines: "never raise HTTPException in route handlers for domain-not-found cases—let the global handler convert exceptions to HTTP status codes" and "Services must check None from repositories... and raise a domain-specific NotFoundError subclass; services never return None to route handlers."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/api/routes/admin/events.py` around lines 101 - 102, The route currently checks "result" and raises HTTPException when missing; instead update the underlying service methods called by these routes to detect repository None results, log appropriately and raise the domain-specific NotFoundError (a subclass used by the global exception handler), and remove the route-level HTTPException checks/raises (the routes should no longer raise HTTPException on not-found). Specifically, replace the pattern that returns None from the service with raising NotFoundError, and remove the "if not result: raise HTTPException(...)" blocks (the ones referencing result at the shown locations) so the global exception handler converts NotFoundError to a 404.backend/app/core/security.py (1)
45-51: Docstring is missing Returns and Raises sections.

The docstring documents Args but omits Returns (the username string) and Raises (InvalidCredentialsError). Consider adding these for completeness per Google-style docstring conventions.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/core/security.py` around lines 45 - 51, The docstring for decode_token is missing Returns and Raises sections; update the decode_token docstring to include a Returns section that states it returns the username (the JWT "sub" claim) as a str and a Raises section that documents that InvalidCredentialsError is raised on malformed/invalid/expired tokens (unless allow_expired=True) to match Google-style docstring conventions and clarify behavior for callers.backend/app/core/middlewares/request_size_limit.py (1)
37-59: Unused `exceeded` flag can be removed.

The `exceeded` variable is assigned on line 48 but never read. Consider removing it to avoid dead code.

♻️ Suggested cleanup

    -   bytes_received = 0
    -   max_size = self.max_size_bytes
    -   exceeded = False
    +   bytes_received = 0
    +   max_size = self.max_size_bytes

        async def receive_wrapper() -> Message:
    -       nonlocal bytes_received, exceeded
    +       nonlocal bytes_received
            message = await receive()
            if message["type"] == "http.request":
                body = message.get("body", b"")
                bytes_received += len(body)
                if bytes_received > max_size:
    -               exceeded = True
                    raise _RequestTooLarge()
            return message

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/core/middlewares/request_size_limit.py` around lines 37 - 59, The unused boolean flag "exceeded" in the request-size middleware is dead code; remove the "exceeded" variable and its nonlocal declaration from the receive_wrapper closure and any assignments to it, leaving the logic that increments bytes_received, compares to max_size, and raises _RequestTooLarge intact (see receive_wrapper, bytes_received, max_size, and _RequestTooLarge).

backend/app/services/execution_service.py (1)
517-521: Use full Google-style docstring sections in `_publish_deletion_event`.

The updated docstring explains intent but still omits `Args`/`Returns` sections required by project standards.

✍️ Suggested docstring shape

    -   """Publish cancellation event for a deleted execution.
    -
    -   Uses ExecutionCancelledEvent because no dedicated deletion event type
    -   exists yet — the saga orchestrator treats both the same way.
    -   """
    +   """Publish a cancellation event representing a deleted execution.
    +
    +   Args:
    +       execution_id: UUID of the execution being deleted.
    +       user_id: ID of the user requesting deletion.
    +
    +   Returns:
    +       None.
    +   """

As per coding guidelines, "Use Google-style docstrings with Args/Returns/Raises sections for all functions and classes".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/execution_service.py` around lines 517 - 521, The docstring for _publish_deletion_event is missing Google-style sections; update its triple-quoted docstring to include an Args: section listing parameters (e.g., execution or execution_id and any context/kwargs), a Returns: section (even if None, state that it returns None), and a Raises: section for any exceptions that can be propagated; keep the existing descriptive text about using ExecutionCancelledEvent and the saga behavior, then append the Args/Returns/Raises headings following project guidelines so the function _publish_deletion_event conforms to the required docstring format.backend/app/domain/enums/replay.py (1)
25-33: Add Google-style docstrings to new enum methods.
`__new__` and `is_terminal` were added without `Args`/`Returns` documentation.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/domain/enums/replay.py` around lines 25 - 33, Add Google-style docstrings for the ReplayStatus.__new__ and ReplayStatus.is_terminal methods: for __new__, document Args (value: str, terminal: bool), Returns (ReplayStatus) and Raises (if any; indicate None if none), and for is_terminal, document Returns (bool) and a short description; attach these docstrings directly above the respective definitions (__new__ and is_terminal) using the Google style with Args/Returns/Raises sections.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@backend/app/core/middlewares/rate_limit.py`:
- Around line 118-125: The JWT payload decoding must be made resilient: wrap the
base64.urlsafe_b64decode and _json.loads operations (the block that builds
padded, calls base64.urlsafe_b64decode and _json.loads to get payload and
username) in a try/except that catches decoding/JSON errors (e.g.,
binascii.Error, ValueError, JSONDecodeError or a broad Exception) and on error
simply skip returning a user key so the function falls back to IP-based rate
limiting instead of raising; keep the existing return f"user:{username}" when
parsing succeeds and do not change other control flow.
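
For illustration, a minimal sketch of that fallback, assuming a hypothetical helper name `_rate_limit_key` and a plain `sub` claim in the token payload (not code taken from this PR):

```python
import base64
import binascii
import json


def _rate_limit_key(auth_header: str | None, client_ip: str) -> str:
    """Derive a rate-limit bucket key, falling back to the client IP on any token problem."""
    if auth_header and auth_header.startswith("Bearer "):
        parts = auth_header.removeprefix("Bearer ").strip().split(".")
        if len(parts) == 3:
            try:
                padded = parts[1] + "=" * (-len(parts[1]) % 4)
                payload = json.loads(base64.urlsafe_b64decode(padded))
                if isinstance(payload, dict) and payload.get("sub"):
                    return f"user:{payload['sub']}"
            except (binascii.Error, ValueError):
                pass  # malformed token payload: fall through to the IP bucket
    return f"ip:{client_ip}"
```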
In `@backend/app/core/middlewares/request_size_limit.py`:
- Around line 52-59: When catching _RequestTooLarge in the middleware, detect
whether the inner app already started sending a response (use the
send/receive_wrapper state such as a response_started flag you maintain in
receive_wrapper/send wrapper) and if so log a warning that the 413 cannot be
sent because http.response.start was already emitted; otherwise continue to
build and send the JSONResponse(413) as currently done. Update the exception
handler around self.app(scope, receive_wrapper, send) to check that flag and
call the middleware logger (e.g., self.logger.warning) with context (including
max_size) when the response has already started instead of attempting to send
the replacement response.
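
A rough sketch of the flag-based approach described above; the function shape, the `response_started` flag, and the `_RequestTooLarge` wiring are illustrative rather than the middleware class actually used in this PR:

```python
import json
from typing import Any, Awaitable, Callable

Message = dict[str, Any]


class _RequestTooLarge(Exception):
    """Raised by the receive wrapper when the streamed body exceeds the limit."""


async def call_with_size_limit(app: Any, scope: Message,
                               receive: Callable[[], Awaitable[Message]],
                               send: Callable[[Message], Awaitable[None]],
                               max_size: int, logger: Any) -> None:
    response_started = False
    bytes_received = 0

    async def send_wrapper(message: Message) -> None:
        nonlocal response_started
        if message["type"] == "http.response.start":
            response_started = True
        await send(message)

    async def receive_wrapper() -> Message:
        nonlocal bytes_received
        message = await receive()
        if message["type"] == "http.request":
            bytes_received += len(message.get("body", b""))
            if bytes_received > max_size:
                raise _RequestTooLarge()
        return message

    try:
        await app(scope, receive_wrapper, send_wrapper)
    except _RequestTooLarge:
        if response_started:
            # Too late to swap in a 413; the status line is already on the wire.
            logger.warning("Request body exceeded limit after response started", max_size=max_size)
            return
        body = json.dumps({"detail": "Request body too large"}).encode()
        await send_wrapper({"type": "http.response.start", "status": 413,
                            "headers": [(b"content-type", b"application/json")]})
        await send_wrapper({"type": "http.response.body", "body": body})
```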
In `@backend/app/core/security.py`:
- Around line 61-62: The except block currently suppresses the original JWT
error by using "from None"; change the handler to capture and chain the original
exception: catch jwt.PyJWTError as exc and re-raise InvalidCredentialsError()
from exc so the original traceback is preserved (update the except clause that
references jwt.PyJWTError and the raise that constructs
InvalidCredentialsError).
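
A tiny sketch of the suggested chaining; the `InvalidCredentialsError` stub below merely stands in for the project's real domain exception:

```python
import jwt  # PyJWT


class InvalidCredentialsError(Exception):
    """Stand-in for the project's domain exception."""


def decode_token(token: str, secret: str) -> str:
    """Return the JWT subject, chaining the original error for easier debugging."""
    try:
        payload = jwt.decode(token, secret, algorithms=["HS256"])
    except jwt.PyJWTError as exc:
        # `from exc` preserves the traceback that `from None` would suppress.
        raise InvalidCredentialsError("Invalid token") from exc
    return str(payload["sub"])
```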
In `@backend/app/core/utils.py`:
- Around line 56-64: The _is_trusted_proxy function currently treats any "172."
address as trusted, which is too broad; update it to correctly detect the
RFC1918 172.16.0.0/12 private range. Replace the ip.startswith("172.") check
with a proper network containment test (e.g., use the ipaddress module: parse ip
with ipaddress.ip_address and test membership in
ipaddress.ip_network("172.16.0.0/12")) while preserving the other checks (127.*,
::1, 10.*, 192.168.*) and handling invalid IPs safely.
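
One possible shape using the `ipaddress` module; the network list and helper name are illustrative, not the project's actual implementation:

```python
import ipaddress

_TRUSTED_NETWORKS = [
    ipaddress.ip_network("127.0.0.0/8"),
    ipaddress.ip_network("::1/128"),
    ipaddress.ip_network("10.0.0.0/8"),
    ipaddress.ip_network("172.16.0.0/12"),
    ipaddress.ip_network("192.168.0.0/16"),
]


def _is_trusted_proxy(ip: str) -> bool:
    """Return True only for loopback / RFC 1918 addresses; unparsable IPs are untrusted."""
    try:
        addr = ipaddress.ip_address(ip)
    except ValueError:
        return False
    return any(addr in net for net in _TRUSTED_NETWORKS)
```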
In `@backend/app/db/repositories/event_repository.py`:
- Around line 267-270: The created session from
EventDocument.get_motor_collection().database.client.start_session() is not
being closed; wrap the session creation in an async context manager and nest the
existing async with session.start_transaction() inside it so the session is
properly cleaned up after use; update the block that currently calls
start_session() then uses async with session.start_transaction() to use "async
with <start_session()> as session:" around the transaction and the calls to
archived_doc.insert and doc.delete to ensure session cleanup.
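
A hedged sketch of that cleanup, assuming the Motor async driver (where `start_session()` is awaited and `start_transaction()` is an async context manager); if the repository actually uses PyMongo's synchronous sessions, the context-manager shapes differ:

```python
async def archive_event(doc, archived_doc) -> None:
    """Copy a document into the archive and delete the original in one transaction."""
    client = type(doc).get_motor_collection().database.client
    async with await client.start_session() as session:  # session is ended even on error
        async with session.start_transaction():          # commits or aborts atomically
            await archived_doc.insert(session=session)
            await doc.delete(session=session)
```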
In `@backend/app/db/repositories/execution_repository.py`:
- Around line 136-140: In the aggregation that computes "avg_ms" (inside the
ExecutionRepository aggregation pipeline where avg_ms is built using "$avg":
{"$multiply": [{"$divide": [{"$subtract": ["$updated_at", "$created_at"]},
1]}]}), remove the unnecessary "$multiply" and "$divide" wrappers and set the
expression to simply take the "$avg" of the "$subtract" result (e.g. "$avg":
{"$subtract": ["$updated_at", "$created_at"]}) since MongoDB date subtraction
already yields milliseconds.
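
For reference, a simplified pipeline with the redundant wrappers removed; the `$match` stage and field names here are assumptions for illustration:

```python
# $subtract on two BSON date fields already yields a duration in milliseconds,
# so no $multiply / $divide wrappers are needed around the average.
pipeline = [
    {"$match": {"user_id": "some-user-id"}},  # illustrative filter
    {
        "$group": {
            "_id": "$status",
            "count": {"$sum": 1},
            "avg_ms": {"$avg": {"$subtract": ["$updated_at", "$created_at"]}},
        }
    },
]
```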
In `@backend/app/services/saga/saga_orchestrator.py`:
- Around line 133-152: The retry counter stored as a dynamic attribute on the
event is lost during serialization; instead add a dict on SagaOrchestrator (e.g.
self._retry_counts: dict[str,int]) in __init__ and use it inside
try_schedule_from_queue to increment/read retry_count keyed by execution_id,
remove the key when the saga succeeds or when retry_count >=
self._MAX_SAGA_START_RETRIES, and update the enqueue/release/error branches
(references: try_schedule_from_queue, _queue.enqueue, _queue.release,
_resolve_completion, _MAX_SAGA_START_RETRIES) to use this dict rather than
event._retry_count so retries persist across serialization.
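
A minimal sketch of that dict-based bookkeeping; the method names are invented for illustration, and note that an in-process dict survives requeues but not restarts, so a counter persisted in the queue or on a model field is the stricter option:

```python
class SagaOrchestrator:
    """Sketch: track saga-start retries on the orchestrator, not on the event object."""

    _MAX_SAGA_START_RETRIES = 3

    def __init__(self) -> None:
        self._retry_counts: dict[str, int] = {}

    def _should_retry(self, execution_id: str) -> bool:
        attempts = self._retry_counts.get(execution_id, 0) + 1
        self._retry_counts[execution_id] = attempts
        return attempts <= self._MAX_SAGA_START_RETRIES

    def _clear_retries(self, execution_id: str) -> None:
        self._retry_counts.pop(execution_id, None)
```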
In `@backend/app/settings.py`:
- Line 152: ENVIRONMENT is defaulting to "development" which is unsafe; change
the default value of the settings variable ENVIRONMENT to "production" so the
app is secure-by-default, and ensure any development deployments explicitly
override ENVIRONMENT via the existing config or environment-variable loading
logic (reference the ENVIRONMENT variable in settings.py to locate the change).
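
A small illustrative settings sketch (field name and casing assumed) showing the secure-by-default value:

```python
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    # Secure by default: development deployments must opt in explicitly,
    # e.g. ENVIRONMENT=development in their env file.
    ENVIRONMENT: str = "production"
```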
In `@docs/architecture/domain-exceptions.md`:
- Line 33: Add the missing AccountLockedError to the exceptions docs: insert
AccountLockedError into the hierarchy diagram as a sibling to UnauthorizedError
and ForbiddenError (i.e., under the same auth-related parent node), and update
the exception locations table by adding an entry for AccountLockedError under
the User/Auth module pointing to app/domain/user/exceptions.py so the table and
diagram match the HTTP status codes row for 423.
---
Outside diff comments:
In `@backend/app/api/routes/admin/executions.py`:
- Around line 32-33: The per-endpoint admin dependency is redundant: remove the
parameter "_: Annotated[User, Depends(admin_user)]" from the list_executions
handler (and any other handlers that declare the same parameter) since the
router already has dependencies=[Depends(admin_user)]; if a handler actually
needs the authenticated User object, replace that heavy dependency with a
lightweight current_user dependency or read the user from request.state instead
(so modify the function signatures for list_executions and the other handlers
that currently accept Annotated[User, Depends(admin_user)] accordingly).
In `@backend/app/events/handlers.py`:
- Line 66: Replace the f-string interpolation on the logger.info call with
structured logging keyword arguments: change the line using
logger.info(f"Duplicate event: {event.event_type} ({event.event_id})") to a
structured form such as logger.info("Duplicate event",
event_type=event.event_type, event_id=event.event_id) (update the logger.info
call in handlers.py where logger is used to report duplicate events).
In `@backend/app/services/execution_service.py`:
- Around line 216-225: The check for terminal status currently runs before the
specific cancelled check, causing ExecutionStatus.CANCELLED to raise
ExecutionTerminalError and preventing returning CancelResult(ALREADY_CANCELLED);
modify the control flow in the function where current_status is evaluated so
that the branch if current_status == ExecutionStatus.CANCELLED (returning
CancelResult with CancelStatus.ALREADY_CANCELLED and message "Execution was
already cancelled") executes before the if current_status.is_terminal check that
raises ExecutionTerminalError(execution_id, current_status), ensuring repeated
cancellation requests return the ALREADY_CANCELLED result.
In `@backend/app/services/pod_monitor/event_mapper.py`:
- Around line 510-516: The json.loads call in _try_parse_json can raise
json.JSONDecodeError and the code assumes the parsed payload and resource_usage
are dicts; wrap json.loads(text) in a try/except catching json.JSONDecodeError
and return None on parse failure, then validate that the resulting data is a
dict and that data.get("resource_usage") is a dict before doing
ResourceUsageDomain(**...); if validation fails return None so the caller
_parse_executor_output can fall back to line-by-line parsing and not drop
subsequent valid executor output; keep PodLogs construction only when parsing
and validation succeed.
---
Nitpick comments:
In `@backend/app/api/routes/admin/events.py`:
- Around line 101-102: The route currently checks "result" and raises
HTTPException when missing; instead update the underlying service methods called
by these routes to detect repository None results, log appropriately and raise
the domain-specific NotFoundError (a subclass used by the global exception
handler), and remove the route-level HTTPException checks/raises (the routes
should no longer raise HTTPException on not-found). Specifically, replace the
pattern that returns None from the service with raising NotFoundError, and
remove the "if not result: raise HTTPException(...)" blocks (the ones
referencing result at the shown locations) so the global exception handler
converts NotFoundError to a 404.
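
A rough sketch of the service-side pattern; the class, method, and exception names below are placeholders rather than the project's actual ones:

```python
class EventNotFoundError(Exception):
    """Stand-in for the project's domain NotFoundError subclass."""


class AdminEventService:
    def __init__(self, repository, logger) -> None:
        self._repository = repository
        self._logger = logger

    async def get_event(self, event_id: str):
        event = await self._repository.get_by_id(event_id)
        if event is None:
            self._logger.warning("Event not found", event_id=event_id)
            raise EventNotFoundError(event_id)  # global handler maps this to a 404
        return event
```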
In `@backend/app/core/middlewares/request_size_limit.py`:
- Around line 37-59: The unused boolean flag "exceeded" in the request-size
middleware is dead code; remove the "exceeded" variable and its nonlocal
declaration from the receive_wrapper closure and any assignments to it, leaving
the logic that increments bytes_received, compares to max_size, and raises
_RequestTooLarge intact (see receive_wrapper, bytes_received, max_size, and
_RequestTooLarge).
In `@backend/app/core/security.py`:
- Around line 45-51: The docstring for decode_token is missing Returns and
Raises sections; update the decode_token docstring to include a Returns section
that states it returns the username (the JWT "sub" claim) as a str and a Raises
section that documents that InvalidCredentialsError is raised on
malformed/invalid/expired tokens (unless allow_expired=True) to match
Google-style docstring conventions and clarify behavior for callers.
In `@backend/app/db/repositories/event_repository.py`:
- Around line 60-65: The mark_publish_failed method currently performs
find_one(...).update(...) and silently no-ops when no document matches; change
mark_publish_failed to capture the update result and return a boolean success
indicator: perform the find/update on EventDocument (EventDocument.event_id ==
event_id), store the result of the update operation, set publish_failed and
publish_failed_at as before, and return True if a document was matched/modified
and False otherwise; update the method signature of mark_publish_failed to
return bool and adjust any callers (e.g., the producer) to check the boolean and
log or handle the missing-event case.
In `@backend/app/dlq/manager.py`:
- Around line 258-259: The except block in manager.py that currently does
"except Exception as e: self.logger.error(f'Error retrying message {event_id}:
{e}')" should be changed to catch the specific exception types thrown by the
retry logic (e.g., the network/db/processing errors raised in the retry path)
instead of broad Exception, and log without f-string interpolation by passing
structured data and exception info to the logger (for example:
self.logger.error("Error retrying message", event_id=event_id, exc_info=e) or
self.logger.exception with event_id as a keyword). Locate the except block
around the retry routine that references event_id and replace the broad except
with specific exception classes used by the retry code and update the logger
call to pass event_id as a keyword argument and include exc_info rather than
interpolating the error into the message.
In `@backend/app/domain/enums/replay.py`:
- Around line 25-33: Add Google-style docstrings for the ReplayStatus.__new__
and ReplayStatus.is_terminal methods: for __new__, document Args (value: str,
terminal: bool), Returns (ReplayStatus) and Raises (if any; indicate None if
none), and for is_terminal, document Returns (bool) and a short description;
attach these docstrings directly above the respective definitions (__new__ and
is_terminal) using the Google style with Args/Returns/Raises sections.
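
For illustration, a generic terminal-aware enum with the requested docstring style; the member names here are invented and will not match the real `ReplayStatus`:

```python
from enum import Enum


class ExampleStatus(str, Enum):
    """Sketch of a terminal-aware string enum with Google-style docstrings."""

    SCHEDULED = ("scheduled", False)
    RUNNING = ("running", False)
    COMPLETED = ("completed", True)
    FAILED = ("failed", True)

    def __new__(cls, value: str, terminal: bool) -> "ExampleStatus":
        """Create a member whose string value carries a terminal flag.

        Args:
            value: The serialized string value of the member.
            terminal: Whether the status is a terminal lifecycle state.

        Returns:
            ExampleStatus: The constructed enum member.
        """
        member = str.__new__(cls, value)
        member._value_ = value
        member._terminal = terminal
        return member

    @property
    def is_terminal(self) -> bool:
        """Return whether this status is a terminal lifecycle state.

        Returns:
            bool: True if the member was declared terminal.
        """
        return self._terminal
```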
In `@backend/app/events/core/producer.py`:
- Around line 46-50: The current except block may let a failure in
self._event_repository.mark_publish_failed(event_to_produce.event_id) mask the
original Kafka exception; capture the original exception (the one caught as e),
then call mark_publish_failed inside its own try/except so any error there is
logged/recorded (use self.logger.exception or record a separate metric) but does
not replace the original; finally re-raise the original Kafka exception (not the
secondary one). Reference: the except block around
self._event_metrics.record_kafka_production_error, self.logger.error("Failed to
produce message", ...), self._event_repository.mark_publish_failed, and the
re-raise.
In `@backend/app/services/execution_service.py`:
- Around line 517-521: The docstring for _publish_deletion_event is missing
Google-style sections; update its triple-quoted docstring to include an Args:
section listing parameters (e.g., execution or execution_id and any
context/kwargs), a Returns: section (even if None, state that it returns None),
and a Raises: section for any exceptions that can be propagated; keep the
existing descriptive text about using ExecutionCancelledEvent and the saga
behavior, then append the Args/Returns/Raises headings following project
guidelines so the function _publish_deletion_event conforms to the required
docstring format.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (46)
backend/app/api/routes/admin/events.py, backend/app/api/routes/admin/executions.py, backend/app/api/routes/common.py, backend/app/api/routes/sse.py, backend/app/core/container.py, backend/app/core/metrics/queue.py, backend/app/core/middlewares/__init__.py, backend/app/core/middlewares/csrf.py, backend/app/core/middlewares/rate_limit.py, backend/app/core/middlewares/request_size_limit.py, backend/app/core/middlewares/security_headers.py, backend/app/core/security.py, backend/app/core/utils.py, backend/app/db/docs/resource.py, backend/app/db/repositories/event_repository.py, backend/app/db/repositories/execution_repository.py, backend/app/db/repositories/notification_repository.py, backend/app/db/repositories/replay_repository.py, backend/app/db/repositories/saga_repository.py, backend/app/db/repositories/user_repository.py, backend/app/dlq/manager.py, backend/app/dlq/models.py, backend/app/domain/enums/__init__.py, backend/app/domain/enums/execution.py, backend/app/domain/enums/replay.py, backend/app/domain/enums/saga.py, backend/app/domain/enums/storage.py, backend/app/events/core/producer.py, backend/app/events/handlers.py, backend/app/main.py, backend/app/services/auth_service.py, backend/app/services/event_replay/replay_service.py, backend/app/services/execution_queue.py, backend/app/services/execution_service.py, backend/app/services/notification_service.py, backend/app/services/pod_monitor/event_mapper.py, backend/app/services/rate_limit_service.py, backend/app/services/saga/execution_saga.py, backend/app/services/saga/saga_orchestrator.py, backend/app/services/sse/sse_service.py, backend/app/settings.py, backend/tests/unit/services/saga/test_execution_saga_steps.py, docs/architecture/domain-exceptions.md, docs/architecture/middleware.md, docs/architecture/overview.md, docs/components/saved-scripts.md
Pull request overview
This PR modernizes several backend subsystems (saga orchestration, queueing, stats aggregation, logging, and security middleware) and updates docs accordingly, aiming to improve reliability, observability, and operational safety across the execution platform.
Changes:
- Introduces `is_terminal` on multiple lifecycle enums (execution/replay/saga/DLQ) and refactors services to rely on it.
- Improves queue/orchestrator behavior and observability (metrics for lost queue event data, compensation failure reporting, structured logging).
- Adds/adjusts platform hardening and performance features (security headers middleware, token-bucket Lua implementation, MongoDB aggregation-based execution stats).
Reviewed changes
Copilot reviewed 46 out of 46 changed files in this pull request and generated 12 comments.
Show a summary per file
| File | Description |
|---|---|
| docs/components/saved-scripts.md | Updates storage/indexing documentation for saved scripts. |
| docs/architecture/overview.md | Documents multi-language support in the architecture overview. |
| docs/architecture/middleware.md | Updates cache-control middleware doc for auth endpoint path. |
| docs/architecture/domain-exceptions.md | Adds AccountLockedError to documented exception mapping. |
| backend/tests/unit/services/saga/test_execution_saga_steps.py | Updates saga-step unit tests for new logger injection. |
| backend/app/settings.py | Changes default environment/log level defaults. |
| backend/app/services/sse/sse_service.py | Uses ReplayStatus.is_terminal instead of hard-coded terminal sets. |
| backend/app/services/saga/saga_orchestrator.py | Adds saga start retry logic and uses SagaState.is_terminal. |
| backend/app/services/saga/execution_saga.py | Switches saga steps to injected structured logger and propagates to compensations. |
| backend/app/services/rate_limit_service.py | Moves token bucket logic to an atomic Redis Lua script. |
| backend/app/services/pod_monitor/event_mapper.py | Replaces ast.literal_eval parsing with json.loads and adds cache eviction. |
| backend/app/services/notification_service.py | Reuses a single httpx.AsyncClient for outbound webhook/Slack delivery. |
| backend/app/services/execution_service.py | Uses ExecutionStatus.is_terminal and offloads stats to repository aggregation. |
| backend/app/services/execution_queue.py | Adds metric for lost event payloads and adjusts priority update script args. |
| backend/app/services/event_replay/replay_service.py | Uses ReplayStatus.is_terminal for cleanup conditions. |
| backend/app/services/auth_service.py | Adds deactivated-user checks, email uniqueness check, and structured logging improvements. |
| backend/app/main.py | Enables docs/openapi in dev-mode, adds security headers middleware, expands CORS methods/headers exposure. |
| backend/app/events/handlers.py | Adds idempotency wrapper to saga subscriber consumption. |
| backend/app/events/core/producer.py | Marks persisted events as failed-to-publish on Kafka publish failure. |
| backend/app/domain/enums/storage.py | Introduces AllocationStatus enum. |
| backend/app/domain/enums/saga.py | Adds terminal flag + is_terminal and exports active/terminal sets. |
| backend/app/domain/enums/replay.py | Adds terminal flag + is_terminal and exports terminal set. |
| backend/app/domain/enums/execution.py | Adds terminal flag + is_terminal and exports active/terminal sets. |
| backend/app/domain/enums/__init__.py | Re-exports new enum constants/sets and AllocationStatus. |
| backend/app/dlq/models.py | Adds terminal flag + is_terminal and exports terminal set for DLQ status. |
| backend/app/dlq/manager.py | Uses DLQMessageStatus.is_terminal for manual retry/discard checks. |
| backend/app/db/repositories/user_repository.py | Adds get_user_by_email lookup. |
| backend/app/db/repositories/saga_repository.py | Uses SAGA_ACTIVE and optimizes user execution-id lookup via distinct. |
| backend/app/db/repositories/replay_repository.py | Uses REPLAY_TERMINAL in old session deletion. |
| backend/app/db/repositories/notification_repository.py | Makes claiming pending notifications atomic and optimizes subscription retrieval. |
| backend/app/db/repositories/execution_repository.py | Makes terminal writes guarded by non-terminal filter; adds Mongo aggregation stats. |
| backend/app/db/repositories/event_repository.py | Adds mark_publish_failed and wraps archival delete in a transaction. |
| backend/app/db/docs/resource.py | Switches allocation status default to AllocationStatus.ACTIVE. |
| backend/app/core/utils.py | Hardens client IP extraction by trusting proxy headers only from “trusted” direct IPs. |
| backend/app/core/security.py | Adds allow_expired option to token decoding (logout path). |
| backend/app/core/middlewares/security_headers.py | Adds new middleware injecting standard security response headers. |
| backend/app/core/middlewares/request_size_limit.py | Enforces request body size for chunked requests by wrapping ASGI receive. |
| backend/app/core/middlewares/rate_limit.py | Changes user-id extraction strategy for rate limiting and adds rate limit headers. |
| backend/app/core/middlewares/csrf.py | Updates CSRF middleware docstring re: exempt auth endpoints. |
| backend/app/core/middlewares/__init__.py | Exports SecurityHeadersMiddleware. |
| backend/app/core/metrics/queue.py | Adds a counter for lost queue event payload data. |
| backend/app/core/container.py | Updates container initialization note in docstring. |
| backend/app/api/routes/sse.py | Reuses shared SSEResponse response_class for OpenAPI correctness. |
| backend/app/api/routes/common.py | Introduces shared SSEResponse workaround class. |
| backend/app/api/routes/admin/executions.py | Enforces admin dependency at the router level. |
| backend/app/api/routes/admin/events.py | Reuses shared SSEResponse response_class for SSE endpoint. |
Comments suppressed due to low confidence (2)
backend/app/services/execution_service.py:225
`cancel_execution()` checks `current_status.is_terminal` before checking for `ExecutionStatus.CANCELLED`. Since `CANCELLED` is now marked terminal, the `ALREADY_CANCELLED` branch is unreachable and an idempotent cancel will raise `ExecutionTerminalError`. Handle `CANCELLED` first (return `ALREADY_CANCELLED`), then reject other terminal states.

backend/app/services/pod_monitor/event_mapper.py:516

`_try_parse_json()` calls `json.loads()` without handling `JSONDecodeError`. If a line starts/ends with braces but isn't valid JSON, this raises and aborts parsing (so the line-by-line fallback never runs). Catch decode errors in `_try_parse_json` and return `None` so parsing can continue on other lines.
…ON. Also simplified logging in event mapper
1 issue found across 4 files (changes from recent commits).
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="backend/app/scripts/entrypoint.sh">
<violation number="1" location="backend/app/scripts/entrypoint.sh:79">
P2: Exit status from the executed command is ignored; the script always exits 0, masking failures. Propagate the captured EXIT_CODE instead.</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
backend/tests/unit/services/pod_monitor/test_event_mapper.py (1)
317-346: ⚠️ Potential issue | 🟠 Major

Convert new sync tests to `async def` to satisfy backend test policy.

At line 317, line 378, and line 396, the newly added tests are synchronous `def`. Under `backend/tests/**`, tests are required to be async.

Proposed fix

    -def test_analyze_failure_variants() -> None:
    +@pytest.mark.asyncio
    +async def test_analyze_failure_variants() -> None:
    @@
    -def test_parse_termination_message() -> None:
    +@pytest.mark.asyncio
    +async def test_parse_termination_message() -> None:
    @@
    -def test_parse_framed_output() -> None:
    +@pytest.mark.asyncio
    +async def test_parse_framed_output() -> None:

As per coding guidelines, "All backend test functions must be async (async def); use `@pytest.mark.unit`, `@pytest.mark.integration`, `@pytest.mark.e2e`, `@pytest.mark.kafka`, or `@pytest.mark.k8s` markers".

Also applies to: 378-413
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/tests/unit/services/pod_monitor/test_event_mapper.py` around lines 317 - 346, The test function test_analyze_failure_variants is declared as a synchronous def but backend tests must be async; change its signature to async def test_analyze_failure_variants(...), add the appropriate pytest marker (e.g. `@pytest.mark.unit`) above the function, and ensure any setup/teardown or calls that require awaiting (e.g. PodEventMapper construction or helper coroutines) are awaited where needed; apply the same change (convert def → async def and add `@pytest.mark.unit`) to the other newly added test functions referenced in this file so all backend tests follow the async test policy.
🧹 Nitpick comments (2)
backend/app/services/pod_monitor/event_mapper.py (1)
498-504: Use Google-style docstrings for new parser helpers.

The new helper docstrings at lines 498-504 are concise but do not follow the required Args/Returns/Raises format.
As per coding guidelines, "Use Google-style docstrings with Args/Returns/Raises sections for all functions and classes".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/pod_monitor/event_mapper.py` around lines 498 - 504, Update the docstrings for the new parser helpers to Google-style: replace the one-line descriptions in _parse_termination_message and _parse_framed_output with Google-style docstrings that include an Args section (documenting parameter name and type: raw: str / logs: str), a Returns section (documenting the returned dict[str, str] for _parse_termination_message and tuple[str, str] for _parse_framed_output), and a Raises section for any exceptions the functions can raise (e.g., ValueError or other parsing errors when input is malformed); keep the brief description line at the top and ensure types and semantics match the function behavior.backend/tests/unit/services/pod_monitor/test_event_mapper.py (1)
378-390: Add a regression case for invalid numeric termination values.

Current parser tests cover structure, but not bad numeric values that later feed mapper casts (`cpu_jiffies=abc`, `wall_seconds=nan-text`). Add a case to ensure extraction/mapping falls back safely instead of raising.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/tests/unit/services/pod_monitor/test_event_mapper.py` around lines 378 - 390, Add a regression case to test_parse_termination_message to ensure PodEventMapper._parse_termination_message handles invalid numeric values safely: augment the test to pass inputs like "cpu_jiffies=abc\nwall_seconds=nan-text\n" and assert the parser still returns a dict with those keys mapped to the raw string values (e.g., {"cpu_jiffies":"abc","wall_seconds":"nan-text"}) so downstream mapper casting can fall back instead of raising; this ensures the parser only splits key=value and does not attempt numeric conversion.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@backend/app/scripts/entrypoint.sh`:
- Line 79: The script incorrectly discards the captured user command exit code
(EXIT_CODE) by calling a hardcoded exit 0; update the script in entrypoint.sh to
exit with the previously saved EXIT_CODE (use exit $EXIT_CODE or exec to
propagate) so the container returns the actual command status; ensure the
variable name EXIT_CODE (set earlier as EXIT_CODE=$?) is used and preserved
before any cleanup or traps so the final exit uses that value.
In `@backend/app/services/pod_monitor/event_mapper.py`:
- Around line 476-478: In _extract_logs, replace the broad "except Exception:"
with targeted exception handling: explicitly re-raise cancellation-related
exceptions (e.g., asyncio.CancelledError, KeyboardInterrupt) to avoid swallowing
cancellation, catch only I/O/network or client-specific errors (e.g., IOError,
OSError, requests.exceptions.RequestException or
kubernetes.client.exceptions.ApiException as appropriate) and when catching
them, raise a new SpecificError(...) from the original exception to preserve the
chain; keep the logger.warning call but move it into the specific except blocks
so you log the pod name and exc_info while still raising a wrapped SpecificError
from the caught exception.
- Around line 490-493: Replace the direct casts for execution_time_wall_seconds,
cpu_time_jiffies, clk_tck_hertz, and peak_memory_kb with guarded parsing: wrap
each float(...) / int(...) conversion in a small try/except (or use helper
functions like safe_int/safe_float) that returns a sensible default on
ValueError/TypeError and logs or ignores the malformed value; update the code in
the event mapping function that sets execution_time_wall_seconds,
cpu_time_jiffies, clk_tck_hertz, and peak_memory_kb (same block shown) to use
these safe parsers so malformed termination metadata cannot raise and crash the
mapper.
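
A small sketch of guarded parsing helpers; the names `safe_int`/`safe_float` are suggestions rather than existing project utilities:

```python
def safe_int(value: object, default: int = 0) -> int:
    """Parse an int, falling back to a default on malformed input."""
    try:
        return int(value)  # type: ignore[arg-type]
    except (TypeError, ValueError):
        return default


def safe_float(value: object, default: float = 0.0) -> float:
    """Parse a float, falling back to a default on malformed input."""
    try:
        return float(value)  # type: ignore[arg-type]
    except (TypeError, ValueError):
        return default


# Hypothetical usage when mapping termination metadata:
# cpu_time_jiffies = safe_int(meta.get("cpu_jiffies"))
# execution_time_wall_seconds = safe_float(meta.get("wall_seconds"))
```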
---
Outside diff comments:
In `@backend/tests/unit/services/pod_monitor/test_event_mapper.py`:
- Around line 317-346: The test function test_analyze_failure_variants is
declared as a synchronous def but backend tests must be async; change its
signature to async def test_analyze_failure_variants(...), add the appropriate
pytest marker (e.g. `@pytest.mark.unit`) above the function, and ensure any
setup/teardown or calls that require awaiting (e.g. PodEventMapper construction
or helper coroutines) are awaited where needed; apply the same change (convert
def → async def and add `@pytest.mark.unit`) to the other newly added test
functions referenced in this file so all backend tests follow the async test
policy.
---
Nitpick comments:
In `@backend/app/services/pod_monitor/event_mapper.py`:
- Around line 498-504: Update the docstrings for the new parser helpers to
Google-style: replace the one-line descriptions in _parse_termination_message
and _parse_framed_output with Google-style docstrings that include an Args
section (documenting parameter name and type: raw: str / logs: str), a Returns
section (documenting the returned dict[str, str] for _parse_termination_message
and tuple[str, str] for _parse_framed_output), and a Raises section for any
exceptions the functions can raise (e.g., ValueError or other parsing errors
when input is malformed); keep the brief description line at the top and ensure
types and semantics match the function behavior.
In `@backend/tests/unit/services/pod_monitor/test_event_mapper.py`:
- Around line 378-390: Add a regression case to test_parse_termination_message
to ensure PodEventMapper._parse_termination_message handles invalid numeric
values safely: augment the test to pass inputs like
"cpu_jiffies=abc\nwall_seconds=nan-text\n" and assert the parser still returns a
dict with those keys mapped to the raw string values (e.g.,
{"cpu_jiffies":"abc","wall_seconds":"nan-text"}) so downstream mapper casting
can fall back instead of raising; this ensures the parser only splits key=value
and does not attempt numeric conversion.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
backend/app/scripts/entrypoint.sh, backend/app/services/k8s_worker/pod_builder.py, backend/app/services/pod_monitor/event_mapper.py, backend/tests/unit/services/pod_monitor/test_event_mapper.py
3 issues found across 7 files (changes from recent commits).
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="backend/app/db/repositories/event_repository.py">
<violation number="1" location="backend/app/db/repositories/event_repository.py:268">
P1: `session.start_transaction()` is a synchronous context manager in pymongo, not an awaitable/async context manager. `async with await` will raise at runtime. Use a normal `with` block (and let the async function await inside it) for pymongo sessions.</violation>
</file>
<file name="backend/app/db/repositories/execution_repository.py">
<violation number="1" location="backend/app/db/repositories/execution_repository.py:147">
P1: `collection.aggregate()` already returns an async cursor; awaiting it will raise a TypeError. Call `aggregate()` directly and only await `cursor.to_list(...)`.</violation>
</file>
<file name="backend/app/db/repositories/saga_repository.py">
<violation number="1" location="backend/app/db/repositories/saga_repository.py:170">
P2: Using a synchronous PyMongo collection in an async method means `distinct` is not awaitable and can block the event loop. Use the Motor collection (or otherwise run the sync call off-thread) to keep this async-safe.</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
Actionable comments posted: 2
🧹 Nitpick comments (3)
backend/app/scripts/entrypoint.sh (1)
10-14: Minor: character count vs byte count inconsistency.

Line 13 uses `${#ERR_MSG}`, which returns the character count, while the main path (lines 72-73) uses `wc -c` for the byte count. For the current ASCII error message this works identically, but for consistency and robustness against future multi-byte UTF-8 messages, consider using the same approach:

♻️ Suggested fix for consistency

    if [ "$#" -eq 0 ]; then
        printf 'cpu_jiffies=0\nclk_tck=100\npeak_memory_kb=0\nwall_seconds=0\n' > /dev/termination-log
        ERR_MSG="Entrypoint Error: No command provided."
    -   printf 'STDOUT 0\nSTDERR %d\n%s' "${#ERR_MSG}" "$ERR_MSG"
    +   printf 'STDOUT 0\nSTDERR %d\n%s' "$(printf '%s' "$ERR_MSG" | wc -c)" "$ERR_MSG"
        exit 127
    fi

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/scripts/entrypoint.sh` around lines 10 - 14, The current branch uses `${#ERR_MSG}` (character count) when printing the STDERR length for ERR_MSG; change it to compute the byte count like the other path by piping the exact message through printf '%s' and running wc -c (e.g., assign BYTE_LEN="$(printf '%s' "$ERR_MSG" | wc -c)" ) and then use that BYTE_LEN in the printf call so multi-byte UTF-8 messages are counted consistently; update references around ERR_MSG and the printf that prints STDERR length to use the new byte-length variable.
backend/tests/e2e/services/rate_limit/test_rate_limit_service.py (1)
233-235: Remove unnecessary `or` fallback; pass `rule.burst_multiplier` directly to match production code.
On Line 234, the test uses `rule.burst_multiplier or 1.0`, but production code calls `_check_token_bucket(..., rule.burst_multiplier, rule)` without any fallback. Since `burst_multiplier` is a required float field with default value 1.5 (not Optional), the fallback is redundant. Remove it to maintain consistency with the production call path.
Suggested patch
     status = await svc._check_token_bucket(
-        "user", "/api", int(rule.requests), rule.window_seconds, rule.burst_multiplier or 1.0, rule
+        "user", "/api", int(rule.requests), rule.window_seconds, rule.burst_multiplier, rule
     )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/tests/e2e/services/rate_limit/test_rate_limit_service.py` around lines 233 - 235, The test calls svc._check_token_bucket with a redundant fallback for burst_multiplier; remove the "or 1.0" and pass rule.burst_multiplier directly so the call matches production behavior. Update the invocation of _check_token_bucket in the test (the call that currently uses rule.burst_multiplier or 1.0) to pass rule.burst_multiplier as-is, since burst_multiplier is a required float with a default of 1.5.
backend/app/db/repositories/event_repository.py (1)
60-65: Missing error handling for non-existent event.
The method silently does nothing if the event doesn't exist. While this may be intentional, it could mask issues where `mark_publish_failed` is called with an invalid `event_id`. Consider logging when no document is updated.
♻️ Proposed enhancement
 async def mark_publish_failed(self, event_id: str) -> None:
     """Mark an event as failed to publish to Kafka for later retry."""
-    await EventDocument.find_one(
+    result = await EventDocument.find_one(
         EventDocument.event_id == event_id,
     ).update({"$set": {"publish_failed": True, "publish_failed_at": datetime.now(timezone.utc)}})
+    if not result or getattr(result, "modified_count", 0) == 0:
+        self.logger.warning("Event not found for marking publish failed", event_id=event_id)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/db/repositories/event_repository.py` around lines 60 - 65, The mark_publish_failed method currently performs an update silently even if no document exists; change it to capture the update result from EventDocument.find_one(...).update, check whether any document was matched/modified, and if none, log a warning including the event_id so callers know the id was invalid. Update the method (mark_publish_failed) to use the update result returned by EventDocument.find_one(...).update and call the repository logger (or logging.getLogger(...)) to emit a warning with the event_id when the update did not affect any document.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@backend/app/db/repositories/event_repository.py`:
- Around line 267-271: The code calls start_session() without awaiting it;
change the session handling so you await
EventDocument.get_pymongo_collection().database.client.start_session() and use
it as an async context manager, e.g. async with await
EventDocument.get_pymongo_collection().database.client.start_session() as
session: then use async with session.start_transaction(): and inside that block
call await archived_doc.insert(session=session) and await
doc.delete(session=session) to ensure proper async session creation, transaction
scoping and cleanup.
In `@backend/app/db/repositories/execution_repository.py`:
- Around line 126-127: The aggregation pipeline in execution_repository uses the
hardcoded string "completed" (in the "$cond" / "$eq" check that feeds the
"successful" sum); replace that literal with the enum value from ExecutionStatus
(e.g. ExecutionStatus.COMPLETED.value or the appropriate attribute used across
the repo) and update the other occurrence noted nearby; ensure ExecutionStatus
is imported into execution_repository and use the enum consistently so the
aggregation compares "$status" to the canonical enum value rather than a fragile
string.
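As a rough sketch of what the de-hardcoded aggregation stage could look like (the enum below is a stand-in for the project's real ExecutionStatus, whose members are not shown in this thread):

from enum import Enum


class ExecutionStatus(str, Enum):
    COMPLETED = "completed"  # stand-in member for illustration


# Counts successful executions without embedding the raw "completed" literal.
successful_stage = {
    "$sum": {"$cond": [{"$eq": ["$status", ExecutionStatus.COMPLETED.value]}, 1, 0]}
}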
---
Nitpick comments:
In `@backend/app/db/repositories/event_repository.py`:
- Around line 60-65: The mark_publish_failed method currently performs an update
silently even if no document exists; change it to capture the update result from
EventDocument.find_one(...).update, check whether any document was
matched/modified, and if none, log a warning including the event_id so callers
know the id was invalid. Update the method (mark_publish_failed) to use the
update result returned by EventDocument.find_one(...).update and call the
repository logger (or logging.getLogger(...)) to emit a warning with the
event_id when the update did not affect any document.
In `@backend/app/scripts/entrypoint.sh`:
- Around line 10-14: The current branch uses `${#ERR_MSG}` (character count)
when printing the STDERR length for ERR_MSG; change it to compute the byte count
like the other path by piping the exact message through printf '%s' and running
wc -c (e.g., assign BYTE_LEN="$(printf '%s' "$ERR_MSG" | wc -c)" ) and then use
that BYTE_LEN in the printf call so multi-byte UTF-8 messages are counted
consistently; update references around ERR_MSG and the printf that prints STDERR
length to use the new byte-length variable.
In `@backend/tests/e2e/services/rate_limit/test_rate_limit_service.py`:
- Around line 233-235: The test calls svc._check_token_bucket with a redundant
fallback for burst_multiplier; remove the "or 1.0" and pass
rule.burst_multiplier directly so the call matches production behavior. Update
the invocation of _check_token_bucket in the test (the call that currently uses
rule.burst_multiplier or 1.0) to pass rule.burst_multiplier as-is, since
burst_multiplier is a required float with a default of 1.5.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
- backend/app/db/repositories/event_repository.py
- backend/app/db/repositories/execution_repository.py
- backend/app/db/repositories/saga_repository.py
- backend/app/scripts/entrypoint.sh
- backend/app/services/rate_limit_service.py
- backend/tests/e2e/services/rate_limit/test_rate_limit_service.py
- backend/tests/e2e/test_auth_routes.py
🚧 Files skipped from review as they are similar to previous changes (1)
- backend/app/services/rate_limit_service.py
3 issues found across 4 files (changes from recent commits).
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="backend/app/scripts/entrypoint.sh">
<violation number="1" location="backend/app/scripts/entrypoint.sh:79">
P1: The entrypoint now always exits with 0, so command failures won’t propagate to the container exit status. This masks errors for orchestrators or callers that rely on the process exit code.</violation>
</file>
<file name="backend/app/db/repositories/notification_repository.py">
<violation number="1" location="backend/app/db/repositories/notification_repository.py:58">
P2: This filter now marks *all* non-READ notifications as READ, which will overwrite delivery states (pending/queued/sending/failed/skipped) and CLICKED. “Mark all as read” should typically only affect delivered-but-unread notifications. Consider restricting the update to DELIVERED (or an explicit unread set) to avoid losing state.</violation>
</file>
<file name="backend/app/services/pod_monitor/event_mapper.py">
<violation number="1" location="backend/app/services/pod_monitor/event_mapper.py:486">
P2: Guard against non-numeric termination metadata before casting to int to avoid ValueError and losing log parsing.</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
Actionable comments posted: 2
♻️ Duplicate comments (3)
backend/app/scripts/entrypoint.sh (1)
79-79: ⚠️ Potential issue | 🔴 Critical
Propagate the captured command exit code instead of always returning success.
At Line 79, `exit 0` discards `EXIT_CODE` captured at Line 59. This makes failed user commands look successful to Kubernetes/orchestrators.
🐛 Proposed fix
-exit 0
+exit "$EXIT_CODE"
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/scripts/entrypoint.sh` at line 79, The script currently discards the user command result by unconditionally calling exit 0; instead propagate the previously captured EXIT_CODE (the variable set when running the user command) as the script's exit status so orchestrators see failures; replace the unconditional exit with an exit that uses EXIT_CODE and ensure EXIT_CODE is defined (fallback to a non-zero/zero default only if intentionally desired).
backend/app/services/pod_monitor/event_mapper.py (2)
486-493: ⚠️ Potential issue | 🟠 Major
Guard numeric parsing of termination metadata.
At Line 486 and Line 490–Line 493, direct `int(...)`/`float(...)` casts can raise on malformed termination message values and abort event mapping.
Proposed fix
 return PodLogs(
-    exit_code=int(meta.get("exit_code", str(terminated.exit_code or 0))),
+    exit_code=self._safe_int(meta.get("exit_code"), terminated.exit_code or 0),
     stdout=stdout,
     stderr=stderr,
     resource_usage=ResourceUsageDomain(
-        execution_time_wall_seconds=float(meta.get("wall_seconds", "0")),
-        cpu_time_jiffies=int(meta.get("cpu_jiffies", "0")),
-        clk_tck_hertz=int(meta.get("clk_tck", "100")),
-        peak_memory_kb=int(meta.get("peak_memory_kb", "0")),
+        execution_time_wall_seconds=self._safe_float(meta.get("wall_seconds"), 0.0),
+        cpu_time_jiffies=self._safe_int(meta.get("cpu_jiffies"), 0),
+        clk_tck_hertz=self._safe_int(meta.get("clk_tck"), 100),
+        peak_memory_kb=self._safe_int(meta.get("peak_memory_kb"), 0),
     ),
 )
+
+@staticmethod
+def _safe_int(raw: str | None, default: int) -> int:
+    try:
+        return int(raw) if raw is not None else default
+    except (TypeError, ValueError):
+        return default
+
+@staticmethod
+def _safe_float(raw: str | None, default: float) -> float:
+    try:
+        return float(raw) if raw is not None else default
+    except (TypeError, ValueError):
+        return default
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/pod_monitor/event_mapper.py` around lines 486 - 493, The termination metadata parsing currently does blind int()/float() casts (for exit_code and fields inside ResourceUsageDomain) which can raise on malformed values; update the mapping in event_mapper.py to parse using guarded conversions: read values from meta (and terminated.exit_code) into locals, attempt int() or float() inside try/except (or use small helper functions like safe_int/safe_float) and fall back to sensible defaults (0 or 0.0) when parsing fails or values are missing, then pass those sanitized values into exit_code and ResourceUsageDomain (refer to the exit_code assignment, the meta dict, and the ResourceUsageDomain constructor to locate the spots to change).
470-478: ⚠️ Potential issue | 🟠 Major
Narrow `_extract_logs` exception handling.
At Line 476, `except Exception` is still too broad and can hide non-I/O defects. Catch concrete Kubernetes/log-read exceptions only.
Proposed fix
-        except Exception:
+        except (k8s_client.exceptions.ApiException, OSError):
             self.logger.warning("Failed to fetch pod logs", pod_name=pod.metadata.name, exc_info=True)
             return None
Based on learnings: "Raise SpecificError(...) from original_exc to preserve exception chains; never catch Exception broadly—catch specific exception types".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/pod_monitor/event_mapper.py` around lines 470 - 478, Replace the broad "except Exception" in the pod log read block inside _extract_logs with concrete exception handling: import and catch kubernetes.client.rest.ApiException (or kubernetes.client.exceptions.ApiException) as e and any network-related exceptions you expect (e.g., requests.exceptions.RequestException or urllib3.exceptions.HTTPError) and log those with self.logger.warning(..., exc_info=True, error=e); for all other unexpected exceptions re-raise them (or let them bubble) so non-I/O defects aren't swallowed. Ensure the handler references self._k8s_api.read_namespaced_pod_log and pod.metadata.name/namespace and preserve exception chains if you wrap errors.
🧹 Nitpick comments (1)
backend/tests/unit/services/pod_monitor/test_event_mapper.py (1)
311-314: Use a Kubernetes-specific exception in the log-read error-path test.
At Line 313, `Exception("boom")` is too generic for this boundary and makes the test less representative of actual client failure modes.
Proposed fix
-    mock_err.read_namespaced_pod_log = AsyncMock(side_effect=Exception("boom"))
+    mock_err.read_namespaced_pod_log = AsyncMock(
+        side_effect=k8s_client.exceptions.ApiException(status=500, reason="boom")
+    )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/tests/unit/services/pod_monitor/test_event_mapper.py` around lines 311 - 314, Replace the generic Exception used as the AsyncMock side_effect with a Kubernetes client API exception to better simulate the real failure mode: change the side_effect on mock_err.read_namespaced_pod_log used in the PodEventMapper test (the call to PodEventMapper(k8s_api=mock_err, logger=_test_logger)._extract_logs) to raise kubernetes.client.exceptions.ApiException (or the appropriate ApiException import) instead of Exception("boom") so the test exercises the API-specific error path.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@backend/app/services/pod_monitor/event_mapper.py`:
- Around line 480-481: Remove the early exit that treats an empty logs payload
as an extraction failure: delete or change the "if not logs: return None" guard
so that empty logs are allowed to proceed through the mapping logic in
event_mapper.py; instead normalize an empty logs value (e.g., to an empty string
or empty list) and let the existing mapping flow construct and return the pod
state/timeout/completed mapping as normal (ensure any downstream usage of the
variable `logs` in the surrounding function handles the empty value).
In `@backend/tests/unit/services/pod_monitor/test_event_mapper.py`:
- Around line 317-318: Convert the synchronous test functions to async by
changing "def test_analyze_failure_variants()" (and the other two test functions
in this file starting at the later occurrences) to "async def", and add the
appropriate pytest marker (e.g., `@pytest.mark.unit`) above each test; update any
internal calls to await async helpers if needed (replace direct calls with
"await" for coroutines used inside these tests) and keep the existing function
names (test_analyze_failure_variants and the two other test function names
present around lines 378 and 398) so test discovery remains unchanged.
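Roughly, the conversion described above takes this shape (the marker and the awaited helper are placeholders; the project's own fixtures and marker names apply):

import pytest


async def _analyze_failure_case() -> str:
    # Placeholder for whatever async helper the real test exercises.
    return "image_pull_error"


@pytest.mark.asyncio  # or the repo's own marker, e.g. @pytest.mark.unit
async def test_analyze_failure_variants() -> None:
    # Coroutine helpers used inside the test must now be awaited.
    result = await _analyze_failure_case()
    assert result == "image_pull_error"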
---
Duplicate comments:
In `@backend/app/scripts/entrypoint.sh`:
- Line 79: The script currently discards the user command result by
unconditionally calling exit 0; instead propagate the previously captured
EXIT_CODE (the variable set when running the user command) as the script's exit
status so orchestrators see failures; replace the unconditional exit with an
exit that uses EXIT_CODE and ensure EXIT_CODE is defined (fallback to a
non-zero/zero default only if intentionally desired).
In `@backend/app/services/pod_monitor/event_mapper.py`:
- Around line 486-493: The termination metadata parsing currently does blind
int()/float() casts (for exit_code and fields inside ResourceUsageDomain) which
can raise on malformed values; update the mapping in event_mapper.py to parse
using guarded conversions: read values from meta (and terminated.exit_code) into
locals, attempt int() or float() inside try/except (or use small helper
functions like safe_int/safe_float) and fall back to sensible defaults (0 or
0.0) when parsing fails or values are missing, then pass those sanitized values
into exit_code and ResourceUsageDomain (refer to the exit_code assignment, the
meta dict, and the ResourceUsageDomain constructor to locate the spots to
change).
- Around line 470-478: Replace the broad "except Exception" in the pod log read
block inside _extract_logs with concrete exception handling: import and catch
kubernetes.client.rest.ApiException (or
kubernetes.client.exceptions.ApiException) as e and any network-related
exceptions you expect (e.g., requests.exceptions.RequestException or
urllib3.exceptions.HTTPError) and log those with self.logger.warning(...,
exc_info=True, error=e); for all other unexpected exceptions re-raise them (or
let them bubble) so non-I/O defects aren't swallowed. Ensure the handler
references self._k8s_api.read_namespaced_pod_log and pod.metadata.name/namespace
and preserve exception chains if you wrap errors.
---
Nitpick comments:
In `@backend/tests/unit/services/pod_monitor/test_event_mapper.py`:
- Around line 311-314: Replace the generic Exception used as the AsyncMock
side_effect with a Kubernetes client API exception to better simulate the real
failure mode: change the side_effect on mock_err.read_namespaced_pod_log used in
the PodEventMapper test (the call to PodEventMapper(k8s_api=mock_err,
logger=_test_logger)._extract_logs) to raise
kubernetes.client.exceptions.ApiException (or the appropriate ApiException
import) instead of Exception("boom") so the test exercises the API-specific
error path.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- backend/app/db/repositories/notification_repository.py
- backend/app/scripts/entrypoint.sh
- backend/app/services/pod_monitor/event_mapper.py
- backend/tests/unit/services/pod_monitor/test_event_mapper.py
🚧 Files skipped from review as they are similar to previous changes (1)
- backend/app/db/repositories/notification_repository.py
3 issues found across 17 files (changes from recent commits).
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="backend/app/services/notification_service.py">
<violation number="1" location="backend/app/services/notification_service.py:365">
P2: Creating a new `httpx.AsyncClient` per webhook send defeats connection pooling and increases latency/CPU. HTTPX docs recommend reusing a single client rather than instantiating one in a hot loop. Consider restoring a shared client on the service and reusing it across webhook sends.</violation>
</file>
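A minimal sketch of the shared-client pattern being suggested (class and method names are illustrative, not the service's actual API):

import httpx


class WebhookSender:
    """Holds one pooled AsyncClient for the service's lifetime."""

    def __init__(self) -> None:
        self._client = httpx.AsyncClient(timeout=10.0)

    async def send(self, url: str, payload: dict) -> int:
        # Reuses pooled connections instead of opening a new client per send.
        response = await self._client.post(url, json=payload)
        return response.status_code

    async def aclose(self) -> None:
        # Close pooled connections on service shutdown.
        await self._client.aclose()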
<file name="backend/app/services/execution_queue.py">
<violation number="1" location="backend/app/services/execution_queue.py:160">
P2: Setting the TTL in a separate command is non-atomic; a failure between INCR and EXPIRE can leave retry keys without expiration. Use a transaction/pipeline so increment and TTL are applied together.</violation>
</file>
<file name="backend/app/db/repositories/event_repository.py">
<violation number="1" location="backend/app/db/repositories/event_repository.py:267">
P2: Motor’s async client requires awaiting start_session before using it in an async context manager; without `await`, this will raise when entering the context or skip session initialization.</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
Actionable comments posted: 4
♻️ Duplicate comments (3)
backend/app/db/repositories/event_repository.py (1)
267-270: ⚠️ Potential issue | 🔴 Critical
Fix async session/transaction API usage (current form is runtime-unsafe).
At Line 267/268, the await/context-manager usage is inverted. This can break transaction flow at runtime (`start_session()` and `start_transaction()` have different async semantics).
🐛 Proposed fix
-        async with EventDocument.get_pymongo_collection().database.client.start_session() as session:
-            async with await session.start_transaction():
+        async with await EventDocument.get_pymongo_collection().database.client.start_session() as session:
+            async with session.start_transaction():
                 await archived_doc.insert(session=session)
                 await doc.delete(session=session)
In PyMongo (async client) and Beanie 2.x, what is the correct pattern for using start_session() and start_transaction() with async with?
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/db/repositories/event_repository.py` around lines 267 - 270, The current code misuses await with the async context managers: call start_session() directly in the async with (no await) and call session.start_transaction() (no await) inside the nested async with; i.e. replace the inverted pattern around EventDocument.get_pymongo_collection().database.client.start_session() and session.start_transaction() so the block becomes "async with EventDocument.get_pymongo_collection().database.client.start_session() as session: async with session.start_transaction(): await archived_doc.insert(session=session); await doc.delete(session=session)". Ensure you remove the extra await before start_transaction and do not await start_session.
backend/app/services/pod_monitor/event_mapper.py (2)
483-490: ⚠️ Potential issue | 🟠 Major
Guard numeric parsing from termination metadata to prevent mapper crashes.
At Lines 483-490, direct `int(...)`/`float(...)` casts can raise `ValueError`/`TypeError` on malformed termination messages and abort event mapping.
Proposed fix
 return PodLogs(
-    exit_code=int(meta.get("exit_code", str(terminated.exit_code or 0))),
+    exit_code=self._safe_int(meta.get("exit_code"), terminated.exit_code or 0),
     stdout=stdout,
     stderr=stderr,
     resource_usage=ResourceUsageDomain(
-        execution_time_wall_seconds=float(meta.get("wall_seconds", "0")),
-        cpu_time_jiffies=int(meta.get("cpu_jiffies", "0")),
-        clk_tck_hertz=int(meta.get("clk_tck", "100")),
-        peak_memory_kb=int(meta.get("peak_memory_kb", "0")),
+        execution_time_wall_seconds=self._safe_float(meta.get("wall_seconds"), 0.0),
+        cpu_time_jiffies=self._safe_int(meta.get("cpu_jiffies"), 0),
+        clk_tck_hertz=self._safe_int(meta.get("clk_tck"), 100),
+        peak_memory_kb=self._safe_int(meta.get("peak_memory_kb"), 0),
     ),
 )
+
+@staticmethod
+def _safe_int(raw: str | None, default: int) -> int:
+    try:
+        return int(raw) if raw is not None else default
+    except (TypeError, ValueError):
+        return default
+
+@staticmethod
+def _safe_float(raw: str | None, default: float) -> float:
+    try:
+        return float(raw) if raw is not None else default
+    except (TypeError, ValueError):
+        return default
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/pod_monitor/event_mapper.py` around lines 483 - 490, The numeric casts from termination metadata in event_mapper.py (fields like exit_code, ResourceUsageDomain.execution_time_wall_seconds, cpu_time_jiffies, clk_tck_hertz, peak_memory_kb) are unguarded and can raise ValueError/TypeError; wrap these conversions in safe parsing helpers (e.g., safe_int(value, default) and safe_float(value, default)) or try/except blocks that return sensible defaults when parsing fails, and use those helpers where exit_code=int(meta.get(...)) and the ResourceUsageDomain numeric fields are set so malformed or missing meta won't crash the mapper (update the mapping logic that references terminated.exit_code and meta.get(...) accordingly).
476-478: ⚠️ Potential issue | 🟠 Major
Narrow `_extract_logs` exception handling instead of catching everything.
At Line 476, `except Exception` is too broad for an I/O boundary and can hide non-recoverable failures. Catch specific exceptions from pod-log reads (for example, Kubernetes API and I/O errors), keep the warning, and let unexpected exceptions bubble.
As per coding guidelines, "Raise SpecificError(...) from original_exc to preserve exception chains; never catch Exception broadly—catch specific exception types".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/pod_monitor/event_mapper.py` around lines 476 - 478, In _extract_logs, replace the broad "except Exception" with handlers for the specific pod-log/read failures (e.g., kubernetes.client.exceptions.ApiException, urllib3.exceptions.HTTPError, and standard I/O errors like OSError/IOError) so only known recoverable pod-read errors are caught and logged with self.logger.warning(..., exc_info=True) and return None; any other unexpected exceptions should not be swallowed—let them propagate (or re-raise) to preserve tracebacks, and when you wrap or transform exceptions follow the pattern raise SpecificError("...") from original_exc to preserve the exception chain.
🧹 Nitpick comments (8)
backend/tests/unit/services/test_execution_queue.py (2)
174-183: Consider verifying the TTL argument in expire call.
The test verifies `expire` was called but doesn't assert the TTL value. Since the TTL alignment with `_EVENT_TTL` is important for consistency:
♻️ Optional: Verify TTL argument
+from app.services.execution_queue import _EVENT_TTL
+
 @pytest.mark.asyncio
 async def test_increment_retry_count(queue_service: ExecutionQueueService, mock_redis: AsyncMock) -> None:
     mock_redis.incr = AsyncMock(return_value=1)
     mock_redis.expire = AsyncMock(return_value=True)
     count = await queue_service.increment_retry_count("e1")
     assert count == 1
     mock_redis.incr.assert_awaited_once_with("exec_queue:retries:e1")
-    mock_redis.expire.assert_awaited_once()
+    mock_redis.expire.assert_awaited_once_with("exec_queue:retries:e1", _EVENT_TTL)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/tests/unit/services/test_execution_queue.py` around lines 174 - 183, The test test_increment_retry_count should assert the TTL passed to mock_redis.expire to ensure it matches the service constant; update the assertion to check mock_redis.expire was awaited with the retry key and the expected TTL (use the same _EVENT_TTL used by ExecutionQueueService or import that constant) so replace the generic expire.assert_awaited_once() with expire.assert_awaited_once_with("exec_queue:retries:e1", _EVENT_TTL) while keeping the rest of the test and references to increment_retry_count unchanged.
91-103: Consider asserting that the data-loss metric was recorded.
The test verifies `srem` cleanup but doesn't verify that `record_event_data_lost()` was called. Since observability is a key aspect of this change, consider adding metric verification:
♻️ Add metric assertion
 @pytest.mark.asyncio
-async def test_try_schedule_event_data_expired(queue_service: ExecutionQueueService, mock_redis: AsyncMock) -> None:
+async def test_try_schedule_event_data_expired(
+    queue_service: ExecutionQueueService, mock_redis: AsyncMock, test_settings: Settings
+) -> None:
     """When the event JSON has expired from Redis, try_schedule cleans the active set and returns None."""
+    from unittest.mock import patch
+
     script = AsyncMock(return_value=[b"e-gone", str(time.time()).encode(), 2])
     mock_redis.register_script = MagicMock(return_value=script)
     mock_redis.get = AsyncMock(return_value=None)
     queue_service._schedule_script = None
-    result = await queue_service.try_schedule(5)
+    with patch.object(queue_service._metrics, "record_event_data_lost") as mock_metric:
+        result = await queue_service.try_schedule(5)
-    assert result is None
-    mock_redis.srem.assert_awaited_once_with(_ACTIVE_KEY, "e-gone")
+        assert result is None
+        mock_redis.srem.assert_awaited_once_with(_ACTIVE_KEY, "e-gone")
+        mock_metric.assert_called_once()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/tests/unit/services/test_execution_queue.py` around lines 91 - 103, The test currently checks srem cleanup but doesn't verify the observability metric; update test_try_schedule_event_data_expired to assert that the event-data-loss metric was recorded by mocking or spying on the metric method (e.g., set queue_service.record_event_data_lost or queue_service.metrics.record_event_data_lost to a MagicMock/AsyncMock before calling queue_service.try_schedule) and then assert it was called once after await queue_service.try_schedule(5); reference symbols: test_try_schedule_event_data_expired, ExecutionQueueService.try_schedule, and record_event_data_lost (or queue_service.metrics.record_event_data_lost).
backend/app/services/execution_queue.py (1)
157-162: Minor: Consider atomic INCR + EXPIRE with pipeline or INCREX.
The current implementation uses two separate Redis calls. If the process crashes between `incr` and `expire`, the key may persist without TTL. For robustness, consider using a pipeline:
♻️ Optional: Use pipeline for atomicity
 async def increment_retry_count(self, execution_id: str) -> int:
     """Atomically increment and return the retry count for an execution."""
     key = _retry_key(execution_id)
-    count: int = await self._redis.incr(key)
-    await self._redis.expire(key, _EVENT_TTL)
+    pipe = self._redis.pipeline(transaction=True)
+    pipe.incr(key)
+    pipe.expire(key, _EVENT_TTL)
+    results = await pipe.execute()
+    count: int = results[0]
     return count
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/execution_queue.py` around lines 157 - 162, The current increment_retry_count uses two separate Redis calls (self._redis.incr and self._redis.expire) which can leave the key without TTL if the process crashes between calls; change increment_retry_count to run the INCR and EXPIRE in a single Redis pipeline/transaction (use self._redis.pipeline()/multi_exec, call incr(key) and expire(key, _EVENT_TTL) on the pipe, await pipe.execute(), then extract and return the increment result as an int) so both operations are applied atomically for the key produced by _retry_key(execution_id).
backend/app/db/repositories/event_repository.py (1)
60-62: Use Google-style docstring for the new repository method.
The new method docstring is single-line; please add `Args`/`Returns` sections to match repo standards.
♻️ Suggested update
 async def mark_publish_failed(self, event_id: str) -> None:
-    """Mark an event as failed to publish to Kafka for later retry."""
+    """Mark an event as failed to publish to Kafka for later retry.
+
+    Args:
+        event_id: Unique identifier of the event to mark.
+
+    Returns:
+        None.
+    """
As per coding guidelines, "Use Google-style docstrings with Args/Returns/Raises sections for all functions and classes."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/db/repositories/event_repository.py` around lines 60 - 62, The docstring for mark_publish_failed is a single-line summary; update it to a Google-style docstring for EventRepository.mark_publish_failed (or the standalone async function mark_publish_failed) with an initial short description plus an Args section documenting event_id: str, a Returns section (None), and a Raises section if the method can raise exceptions (e.g., database errors) to match repository standards; keep the text concise and consistent with other repo methods' docstrings.
backend/app/db/repositories/notification_repository.py (1)
58-59: Consider using `NotIn` for consistency with other queries in this file.
The file already imports `NotIn` (line 6) and uses it elsewhere (lines 87, 121). Using a single `NotIn` condition is more concise and consistent with the existing patterns.
Suggested refactor
 async def mark_all_as_read(self, user_id: str) -> int:
     result = await NotificationDocument.find(
         NotificationDocument.user_id == user_id,
-        NotificationDocument.status != NotificationStatus.READ,
-        NotificationDocument.status != NotificationStatus.CLICKED,
+        NotIn(NotificationDocument.status, [NotificationStatus.READ, NotificationStatus.CLICKED]),
     ).update_many({"$set": {"status": NotificationStatus.READ, "read_at": datetime.now(UTC)}})
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/db/repositories/notification_repository.py` around lines 58 - 59, The query currently uses two separate inequality conditions (NotificationDocument.status != NotificationStatus.READ, NotificationDocument.status != NotificationStatus.CLICKED); replace them with a single NotIn([NotificationStatus.READ, NotificationStatus.CLICKED]) to match the file’s existing pattern and imports—update the clause that references NotificationDocument.status to use NotIn with the two statuses so the query is more concise and consistent with other uses of NotIn in this repository.
backend/app/services/saga/saga_orchestrator.py (1)
130-133: Narrow the retry-path exception handling.
Line 132 catches `Exception` broadly in a core control-flow path. Please catch expected failure types explicitly so retries don’t accidentally absorb unrelated faults.
As per coding guidelines "Raise SpecificError(...) from original_exc to preserve exception chains; never catch Exception broadly—catch specific exception types".
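A hedged sketch of the narrower shape (the exception tuple and logger attribute are placeholders; the orchestrator's real retryable error types and wiring are not shown in this thread):

async def _consume_event(self, event, execution_id: str) -> None:
    try:
        await self._start_saga(event)
    except (TimeoutError, ConnectionError) as exc:
        # Only expected, transient failures count toward the retry budget;
        # anything else propagates instead of being silently absorbed.
        self.logger.warning(
            "saga start failed; scheduling retry",
            execution_id=execution_id,
            error=str(exc),
        )
        await self._queue.increment_retry_count(execution_id)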
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/saga/saga_orchestrator.py` around lines 130 - 133, The try/except around await self._start_saga(event) currently catches Exception too broadly; update it to catch only the specific retryable exceptions (e.g., SagaTimeoutError, TransientQueueError, or other domain-specific exceptions you define) and call await self._queue.increment_retry_count(execution_id) only for those cases, leaving other exceptions to propagate; when wrapping or re-raising create and use a specific error (e.g., SagaRetryError) with "raise SagaRetryError(...) from original_exc" to preserve the exception chain and avoid swallowing unrelated faults in _start_saga and the surrounding orchestrator logic.
backend/app/db/repositories/execution_repository.py (1)
50-55: Use Google-style docstrings for the new repository methods.
Lines 50-55 and 109-110 should include structured `Args`, `Returns`, and (where relevant) `Raises` sections to match repository standards.
As per coding guidelines, "Use Google-style docstrings with Args/Returns/Raises sections for all functions and classes."
Also applies to: 109-110
backend/app/services/execution_service.py (1)
517-521: Consider upgrading this docstring to full Google style.
Line 517’s docstring is clear but still missing structured `Args`/`Returns` sections.
As per coding guidelines, "Use Google-style docstrings with Args/Returns/Raises sections for all functions and classes."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/execution_service.py` around lines 517 - 521, Convert the existing triple-quoted docstring for the function that "Publishes cancellation event for a deleted execution" in execution_service.py into a Google-style docstring: add an Args section enumerating and describing each parameter from that function's signature, a Returns section describing the return value (if any), and a Raises section for any exceptions the function may propagate; keep the existing summary and note that it uses ExecutionCancelledEvent in the description. Ensure the docstring sits immediately above the function definition and follows Google style formatting (Args:, Returns:, Raises:) so linters and docs pick it up.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@backend/app/services/pod_monitor/event_mapper.py`:
- Around line 503-511: The code reads declared frame lengths from the logs
(stdout_len, stderr_len) and slices payloads into stdout and stderr without
checking bounds; add explicit boundary validation before slicing: verify nl + 1
+ stdout_len <= len(logs) and likewise for stderr (using the computed index for
the stderr frame) and handle failures as parse errors (e.g., return a
parse-failure / raise or log and abort mapping) instead of accepting truncated
frames; also ensure any index() lookups are guarded (catch ValueError) so
malformed logs are treated as parse failures.
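To make the bounds checks concrete, here is a rough, standalone sketch of a length-validated frame parser; the `STDOUT <len>` / `STDERR <len>` layout is inferred from the entrypoint snippets earlier in this thread and may not match the real framing exactly:

def parse_framed_output(logs: str) -> tuple[str, str]:
    """Split framed pod logs into (stdout, stderr), validating declared lengths."""
    try:
        # "STDOUT <len>\n" header
        nl = logs.index("\n")
        header = logs[:nl].split()
        if len(header) != 2 or header[0] != "STDOUT":
            raise ValueError("missing STDOUT header")
        stdout_len = int(header[1])
        start = nl + 1
        if start + stdout_len > len(logs):
            raise ValueError("declared stdout length exceeds payload")
        stdout = logs[start:start + stdout_len]

        # "STDERR <len>\n" header follows the stdout payload
        rest = logs[start + stdout_len:]
        nl2 = rest.index("\n")
        header2 = rest[:nl2].split()
        if len(header2) != 2 or header2[0] != "STDERR":
            raise ValueError("missing STDERR header")
        stderr_len = int(header2[1])
        start2 = nl2 + 1
        if start2 + stderr_len > len(rest):
            raise ValueError("declared stderr length exceeds payload")
        return stdout, rest[start2:start2 + stderr_len]
    except (ValueError, IndexError) as exc:
        # Truncated or malformed frames are surfaced as parse failures.
        raise ValueError("malformed framed pod output") from exc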
In `@backend/app/services/saga/saga_orchestrator.py`:
- Around line 146-149: The current design causes recursive scheduling because
_resolve_completion(...) invokes try_schedule_from_queue(), and callers like the
max-retry path (where you await _resolve_completion(execution_id,
SagaState.FAILED, ...)) then let that call trigger nested scheduling; fix by
removing implicit scheduling from _resolve_completion and making scheduling
explicit: refactor _resolve_completion (or add an optional parameter like
schedule_after: bool default False) so it does not call
try_schedule_from_queue() unconditionally, then update callers (e.g., the
max-retry drop site that calls _resolve_completion and any other callers) to
explicitly call try_schedule_from_queue() after awaiting _resolve_completion
when scheduling is desired; ensure all call sites are updated to prevent any
remaining indirect recursion.
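Sketched under the assumptions spelled out in that item (the finalization step below is a placeholder for whatever _resolve_completion actually does today):

async def _resolve_completion(self, execution_id: str, state, *, schedule_after: bool = False) -> None:
    await self._finalize(execution_id, state)  # placeholder for the existing completion logic
    if schedule_after:
        # Scheduling is now opt-in, so callers decide when the queue is drained
        # and the max-retry drop path no longer recurses through it.
        await self.try_schedule_from_queue()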
In `@backend/tests/e2e/db/repositories/test_execution_repository.py`:
- Around line 54-61: The call to repo.write_terminal_result(...) (which returns
bool) is ignored, so failed terminal writes can silently pass; update the helper
used by tests like "no_completed" and "multiple_languages" to capture the
boolean result from repo.write_terminal_result(ExecutionResultDomain(...)) and
assert it (or raise) when False so the test setup fails fast if the terminal
write did not succeed.
In `@backend/tests/unit/services/saga/test_saga_orchestrator.py`:
- Around line 329-346: The test docstring for
test_max_retries_exceeded_fails_execution() is inaccurate: it claims the
execution is dropped with FAILED state but the assertions only check that the
queue was not re-enqueued and that the slot was released; update the docstring
to reflect the actual assertions (e.g., "After _MAX_SAGA_START_RETRIES failures,
the execution is not re-enqueued and the slot is released") or alternatively add
an explicit assertion that verifies the execution reached FAILED state via the
repo/observer API used in this test (referencing
test_max_retries_exceeded_fails_execution, orch._MAX_SAGA_START_RETRIES,
fake_queue.enqueued, fake_repo/fake_queue behavior).
---
Duplicate comments:
In `@backend/app/db/repositories/event_repository.py`:
- Around line 267-270: The current code misuses await with the async context
managers: call start_session() directly in the async with (no await) and call
session.start_transaction() (no await) inside the nested async with; i.e.
replace the inverted pattern around
EventDocument.get_pymongo_collection().database.client.start_session() and
session.start_transaction() so the block becomes "async with
EventDocument.get_pymongo_collection().database.client.start_session() as
session: async with session.start_transaction(): await
archived_doc.insert(session=session); await doc.delete(session=session)". Ensure
you remove the extra await before start_transaction and do not await
start_session.
In `@backend/app/services/pod_monitor/event_mapper.py`:
- Around line 483-490: The numeric casts from termination metadata in
event_mapper.py (fields like exit_code,
ResourceUsageDomain.execution_time_wall_seconds, cpu_time_jiffies,
clk_tck_hertz, peak_memory_kb) are unguarded and can raise ValueError/TypeError;
wrap these conversions in safe parsing helpers (e.g., safe_int(value, default)
and safe_float(value, default)) or try/except blocks that return sensible
defaults when parsing fails, and use those helpers where
exit_code=int(meta.get(...)) and the ResourceUsageDomain numeric fields are set
so malformed or missing meta won't crash the mapper (update the mapping logic
that references terminated.exit_code and meta.get(...) accordingly).
- Around line 476-478: In _extract_logs, replace the broad "except Exception"
with handlers for the specific pod-log/read failures (e.g.,
kubernetes.client.exceptions.ApiException, urllib3.exceptions.HTTPError, and
standard I/O errors like OSError/IOError) so only known recoverable pod-read
errors are caught and logged with self.logger.warning(..., exc_info=True) and
return None; any other unexpected exceptions should not be swallowed—let them
propagate (or re-raise) to preserve tracebacks, and when you wrap or transform
exceptions follow the pattern raise SpecificError("...") from original_exc to
preserve the exception chain.
---
Nitpick comments:
In `@backend/app/db/repositories/event_repository.py`:
- Around line 60-62: The docstring for mark_publish_failed is a single-line
summary; update it to a Google-style docstring for
EventRepository.mark_publish_failed (or the standalone async function
mark_publish_failed) with an initial short description plus an Args section
documenting event_id: str, a Returns section (None), and a Raises section if the
method can raise exceptions (e.g., database errors) to match repository
standards; keep the text concise and consistent with other repo methods'
docstrings.
In `@backend/app/db/repositories/notification_repository.py`:
- Around line 58-59: The query currently uses two separate inequality conditions
(NotificationDocument.status != NotificationStatus.READ,
NotificationDocument.status != NotificationStatus.CLICKED); replace them with a
single NotIn([NotificationStatus.READ, NotificationStatus.CLICKED]) to match the
file’s existing pattern and imports—update the clause that references
NotificationDocument.status to use NotIn with the two statuses so the query is
more concise and consistent with other uses of NotIn in this repository.
In `@backend/app/services/execution_queue.py`:
- Around line 157-162: The current increment_retry_count uses two separate Redis
calls (self._redis.incr and self._redis.expire) which can leave the key without
TTL if the process crashes between calls; change increment_retry_count to run
the INCR and EXPIRE in a single Redis pipeline/transaction (use
self._redis.pipeline()/multi_exec, call incr(key) and expire(key, _EVENT_TTL) on
the pipe, await pipe.execute(), then extract and return the increment result as
an int) so both operations are applied atomically for the key produced by
_retry_key(execution_id).
In `@backend/app/services/execution_service.py`:
- Around line 517-521: Convert the existing triple-quoted docstring for the
function that "Publishes cancellation event for a deleted execution" in
execution_service.py into a Google-style docstring: add an Args section
enumerating and describing each parameter from that function's signature, a
Returns section describing the return value (if any), and a Raises section for
any exceptions the function may propagate; keep the existing summary and note
that it uses ExecutionCancelledEvent in the description. Ensure the docstring
sits immediately above the function definition and follows Google style
formatting (Args:, Returns:, Raises:) so linters and docs pick it up.
In `@backend/app/services/saga/saga_orchestrator.py`:
- Around line 130-133: The try/except around await self._start_saga(event)
currently catches Exception too broadly; update it to catch only the specific
retryable exceptions (e.g., SagaTimeoutError, TransientQueueError, or other
domain-specific exceptions you define) and call await
self._queue.increment_retry_count(execution_id) only for those cases, leaving
other exceptions to propagate; when wrapping or re-raising create and use a
specific error (e.g., SagaRetryError) with "raise SagaRetryError(...) from
original_exc" to preserve the exception chain and avoid swallowing unrelated
faults in _start_saga and the surrounding orchestrator logic.
In `@backend/tests/unit/services/test_execution_queue.py`:
- Around line 174-183: The test test_increment_retry_count should assert the TTL
passed to mock_redis.expire to ensure it matches the service constant; update
the assertion to check mock_redis.expire was awaited with the retry key and the
expected TTL (use the same _EVENT_TTL used by ExecutionQueueService or import
that constant) so replace the generic expire.assert_awaited_once() with
expire.assert_awaited_once_with("exec_queue:retries:e1", _EVENT_TTL) while
keeping the rest of the test and references to increment_retry_count unchanged.
- Around line 91-103: The test currently checks srem cleanup but doesn't verify
the observability metric; update test_try_schedule_event_data_expired to assert
that the event-data-loss metric was recorded by mocking or spying on the metric
method (e.g., set queue_service.record_event_data_lost or
queue_service.metrics.record_event_data_lost to a MagicMock/AsyncMock before
calling queue_service.try_schedule) and then assert it was called once after
await queue_service.try_schedule(5); reference symbols:
test_try_schedule_event_data_expired, ExecutionQueueService.try_schedule, and
record_event_data_lost (or queue_service.metrics.record_event_data_lost).
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (17)
- backend/app/core/middlewares/rate_limit.py
- backend/app/core/utils.py
- backend/app/db/repositories/event_repository.py
- backend/app/db/repositories/execution_repository.py
- backend/app/db/repositories/notification_repository.py
- backend/app/domain/user/__init__.py
- backend/app/domain/user/exceptions.py
- backend/app/services/auth_service.py
- backend/app/services/execution_queue.py
- backend/app/services/execution_service.py
- backend/app/services/notification_service.py
- backend/app/services/pod_monitor/event_mapper.py
- backend/app/services/saga/saga_orchestrator.py
- backend/tests/e2e/db/repositories/test_execution_repository.py
- backend/tests/unit/services/saga/test_saga_orchestrator.py
- backend/tests/unit/services/test_execution_queue.py
- docs/architecture/domain-exceptions.md
🚧 Files skipped from review as they are similar to previous changes (3)
- backend/app/services/auth_service.py
- docs/architecture/domain-exceptions.md
- backend/app/services/notification_service.py



Summary by cubic
Strengthened backend security, reliability, and performance. Adds security headers, IP‑keyed token‑bucket rate limiting, streamed request size checks, atomic terminal writes using ACTIVE/TERMINAL sets, and a new pod output format with correct exit code propagation.
New Features
Refactors
Written for commit 5d6b62d. Summary will update on new commits.
Summary by CodeRabbit
New Features
Bug Fixes
Security
Documentation