Observability: Telemetry and logging for collection and LLM job services#772
Conversation
- Integrated OpenTelemetry tracing into collection creation and deletion processes to improve observability.
- Added logging context for better traceability during job execution.
- Refactored job execution methods to include detailed span attributes and error handling.
- Updated callback mechanisms to ensure success and failure responses are properly logged and sent.
- Improved error handling in LLM job execution, including telemetry for provider calls and response handling.
- Updated the lock file to reflect changes in Python version requirements.
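The span-and-error-handling pattern described above can be sketched like this. A toy recorder stands in for the OpenTelemetry API, and names such as `execute_job` and `job.id` are illustrative, not the PR's exact code:

```python
from collections.abc import Iterator
from contextlib import contextmanager
from typing import Any


class RecordingSpan:
    """Toy stand-in for an OTel span; real code would use opentelemetry-api."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.attributes: dict[str, Any] = {}
        self.events: list[str] = []
        self.status = "OK"

    def set_attribute(self, key: str, value: Any) -> None:
        self.attributes[key] = value

    def record_exception(self, err: BaseException) -> None:
        self.events.append(repr(err))

    def set_status_error(self, description: str) -> None:
        self.status = f"ERROR: {description}"


spans: list[RecordingSpan] = []


@contextmanager
def start_span(name: str) -> Iterator[RecordingSpan]:
    span = RecordingSpan(name)
    spans.append(span)
    yield span


def execute_job(job_id: str) -> None:
    # Annotate the span, record the failure, then re-raise so the
    # task runner (Celery in the PR) still marks the task as failed.
    with start_span("collection.execute_job") as span:
        span.set_attribute("job.id", job_id)
        try:
            raise ValueError("provider call failed")  # simulated provider error
        except ValueError as err:
            span.record_exception(err)
            span.set_status_error(str(err))
            raise


try:
    execute_job("job-123")
except ValueError:
    pass  # the failure still propagated to the caller

print(spans[0].attributes["job.id"])  # job-123
print(spans[0].status)  # ERROR: provider call failed
```

The key point, echoed in several review comments below, is that recording the exception on the span and re-raising are both needed: swallowing the error would make the task look successful.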
📝 Walkthrough

This PR introduces comprehensive OpenTelemetry instrumentation across the backend, adding tracing configuration, structured logging context, span attributes for API requests and Celery job execution, Sentry integration, and metrics collection for task and LLM activity. Configuration defaults enable OTEL by environment variable, and instrumentation is applied at multiple layers: middleware, API dependencies, service execution, and Celery task handling.

Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 9
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (4)
backend/app/services/collections/delete_collection.py (1)
29-35: ⚠️ Potential issue | 🟡 Minor

Align `start_job`'s return type with the returned value.

The function is annotated as `str`, but line 65 returns the `UUID` object passed in as `collection_job_id`.

Suggested fix

```diff
 def start_job(
     db: Session,
     request: DeletionRequest,
     project_id: int,
     collection_job_id: UUID,
     organization_id: int,
-) -> str:
+) -> UUID:
```

As per coding guidelines, `**/*.py`: Always add type hints to all function parameters and return values in Python code.

Also applies to: 65-65
🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@backend/app/services/collections/delete_collection.py` around lines 29 - 35: the function start_job currently declares a return type of str but actually returns the UUID object collection_job_id; update the signature to return UUID (e.g., change "-> str" to "-> UUID") and ensure UUID is imported (from uuid import UUID) so the return type matches the returned value; verify any callers or tests expecting a str are adjusted if necessary.

backend/app/api/routes/collections.py (1)
88-92: ⚠️ Potential issue | 🟡 Minor

Add return annotations to the changed endpoints.

These handlers now have changed bodies but still omit return types; annotate them with the response wrapper type.

Suggested fix

```diff
 def create_collection(
     session: SessionDep,
     current_user: AuthContextDep,
     request: CreationRequest,
-):
+) -> APIResponse[CollectionJobImmediatePublic]:
@@
 def delete_collection(
     session: SessionDep,
     current_user: AuthContextDep,
     collection_id: UUID = FastPath(description="Collection to delete"),
     request: CallbackRequest | None = Body(default=None),
-):
+) -> APIResponse[CollectionJobImmediatePublic]:
```

As per coding guidelines, `**/*.py`: Always add type hints to all function parameters and return values in Python code.

Also applies to: 156-160
🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@backend/app/api/routes/collections.py` around lines 88 - 92: the endpoint handlers (notably create_collection and the other handler defined around lines 156-160) lack return type annotations; add explicit return type hints using the project's response wrapper type (the same wrapper used elsewhere for API handlers) so signatures become e.g. def create_collection(...) -> ResponseWrapper[CreationResponse]: and likewise annotate the second handler with its appropriate wrapper/response type; update imports if needed to reference the response wrapper and the concrete response DTO types referenced in the function bodies.

backend/app/services/collections/create_collection.py (1)
39-46: ⚠️ Potential issue | 🟡 Minor

Align `start_job`'s return type with the returned value.

The function is annotated as `str`, but line 76 returns `collection_job_id` as a `UUID`.

Suggested fix

```diff
 def start_job(
     db: Session,
     request: CreationRequest,
     project_id: int,
     collection_job_id: UUID,
     with_assistant: bool,
     organization_id: int,
-) -> str:
+) -> UUID:
```

As per coding guidelines, `**/*.py`: Always add type hints to all function parameters and return values in Python code.

Also applies to: 76-76
🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@backend/app/services/collections/create_collection.py` around lines 39 - 46: the function start_job is annotated to return str but actually returns collection_job_id (a UUID); update the return type annotation from str to UUID and ensure UUID is imported from uuid so the signature reads start_job(...) -> UUID; verify all parameters remain fully typed and adjust any callers or type checks expecting str if necessary.

backend/app/api/routes/llm.py (1)
53-55: ⚠️ Potential issue | 🟡 Minor

Add the missing return type for `llm_call`.

The changed endpoint should declare its `APIResponse[LLMJobImmediatePublic]` return type.

Suggested fix

```diff
 def llm_call(
     _current_user: AuthContextDep, session: SessionDep, request: LLMCallRequest
-):
+) -> APIResponse[LLMJobImmediatePublic]:
```

As per coding guidelines, `**/*.py`: Always add type hints to all function parameters and return values in Python code.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/api/routes/llm.py` around lines 53 - 55, The llm_call endpoint is missing an explicit return type; update the function signature for llm_call to declare it returns APIResponse[LLMJobImmediatePublic] (i.e., def llm_call(_current_user: AuthContextDep, session: SessionDep, request: LLMCallRequest) -> APIResponse[LLMJobImmediatePublic]:). Ensure the referenced types (APIResponse and LLMJobImmediatePublic) are imported if not already.
🧹 Nitpick comments (5)
backend/app/core/config.py (1)
138-139: Default mismatch with `.env.example`.

`OTEL_ENABLED` defaults to `False` here but `.env.example` sets `OTEL_ENABLED=true`. Consider aligning (either default `True` here or `false` in the example) to avoid confusion about the out-of-the-box behavior.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/core/config.py` around lines 138 - 139: the default for OTEL_ENABLED in config.py (OTEL_ENABLED: bool = False) conflicts with .env.example, which sets OTEL_ENABLED=true; update one to match: either change OTEL_ENABLED in config.py to True (OTEL_ENABLED: bool = True) to reflect the example, or change the .env.example entry to false; make the chosen source of truth consistent and keep OTEL_SERVICE_NAME unchanged.

backend/app/celery/tasks/job_execution.py (1)
19-56: Type the new OTel helper boundary.

The new helpers leave `task_instance`, `fn`, and the return value untyped. Add a generic callable type so task return types are preserved.

Suggested fix

```diff
 import logging
+from collections.abc import Callable
+from typing import Any, TypeVar

 from asgi_correlation_id import correlation_id
@@
 logger = logging.getLogger(__name__)

+_T = TypeVar("_T")
@@
-def _extract_parent_context(task_instance) -> otel_context.Context:
+def _extract_parent_context(task_instance: Any) -> otel_context.Context:
@@
-def _run_with_otel_parent(task_instance, fn):
+def _run_with_otel_parent(task_instance: Any, fn: Callable[[], _T]) -> _T:
```

As per coding guidelines, `**/*.py`: Always add type hints to all function parameters and return values in Python code.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/celery/tasks/job_execution.py` around lines 19 - 56: the helper functions _extract_parent_context and _run_with_otel_parent lack proper type hints for task_instance, fn, and the return value; add typing to preserve task return types by importing TypeVar and Callable, declare a TypeVar R for the return type, type task_instance as the appropriate Celery Task/Any (or Task request type) and type fn as Callable[[], R], and annotate _run_with_otel_parent to return R; also update _extract_parent_context to return otel_context.Context (already present) and ensure any use-sites match the new generic signature.

backend/app/services/llm/jobs.py (1)
129-173: Inconsistency with `start_job`: no span created here.

`start_job` (lines 81-86) wraps the job creation + task scheduling in `tracer.start_as_current_span("llm.start_job")` and records exceptions / attributes on the span. `start_chain_job` only opens `log_context(...)` and skips the tracer span entirely, which means chain-start failures don't surface as a span in Sentry AI Insights / OTel traces the same way LLM-call starts do. If this is intentional, ignore; otherwise mirror the pattern so the two entry points have parity.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/llm/jobs.py` around lines 129 - 173: start_chain_job currently only uses log_context and omits creating an OpenTelemetry/Sentry span like start_job does; wrap the critical section in tracer.start_as_current_span("llm.start_chain_job") (matching the pattern used in start_job) around job creation + start_llm_chain_job, add the same span.set_attribute calls and exception recording (including setting span.status on error) so chain-start failures appear in traces, and ensure to record the task_id and job_id on the span; locate start_chain_job and mirror the span usage and exception handling logic from start_job to implement this.

backend/app/core/telemetry.py (1)
8-8: Minor: address ruff hints (UP035, B010).

- Line 8: `Iterator` should be imported from `collections.abc` per UP035.
- Line 558: ruff B010: `setattr(engine, "_kaapi_db_telemetry_instrumented", True)` is equivalent to plain attribute assignment.

♻️ Proposed fix

```diff
-from typing import TYPE_CHECKING, Any, Iterator
+from collections.abc import Iterator
+from typing import TYPE_CHECKING, Any
...
-    setattr(engine, "_kaapi_db_telemetry_instrumented", True)
+    engine._kaapi_db_telemetry_instrumented = True  # type: ignore[attr-defined]
```

Also applies to: 558-558
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/core/telemetry.py` at line 8: update the imports and attribute assignment to address ruff hints: replace the Iterator import from typing with importing Iterator from collections.abc (leave TYPE_CHECKING and Any as-is) so change the import line that currently lists "TYPE_CHECKING, Any, Iterator"; and replace the setattr call that sets "_kaapi_db_telemetry_instrumented" on the engine (the line using setattr(engine, "_kaapi_db_telemetry_instrumented", True)) with a direct attribute assignment engine._kaapi_db_telemetry_instrumented = True to satisfy B010; keep the same attribute name and effect.

backend/app/celery/utils.py (1)
16-23: Add a type hint for `task` and surface `apply_async` failures.

Two small concerns in the new helper:

- The `task` parameter has no type annotation. As per coding guidelines: "Always add type hints to all function parameters and return values in Python code."
- `apply_async(...)` can raise broker errors (e.g., connection refused, `OperationalError`). Currently every `start_*_job(...)` wrapper catches these via `try/except` (see `start_job` in jobs.py, lines 101-120), which is fine; just worth noting that `_enqueue_with_trace_context` is not idempotent and a retry-after-partial-failure path would re-inject OTel headers. Not actionable here, just a note for callers.

♻️ Proposed type-hint fix

```diff
-def _enqueue_with_trace_context(task, **kwargs) -> str:
+def _enqueue_with_trace_context(task: "Task", **kwargs: Any) -> str:
     """Publish Celery task with explicit trace context headers."""
     otel_headers: dict[str, str] = {}
     inject(otel_headers)
     celery_headers = dict(otel_headers)
     celery_headers["otel"] = otel_headers
     async_result = task.apply_async(kwargs=kwargs, headers=celery_headers)
     return async_result.id
```

And at the top of the file:

```diff
+from typing import Any, TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from celery import Task
```

As per coding guidelines: "Always add type hints to all function parameters and return values in Python code."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/celery/utils.py` around lines 16 - 23, Add an explicit type hint for the task parameter (use celery.app.task.Task) and keep the return type as str; then wrap the call to task.apply_async(...) in a try/except to surface broker/apply_async failures with context: catch Exception as e and re-raise a new RuntimeError (or custom exception) that includes the task identity (e.g., task.name) and chain the original exception via "from e" so callers can handle or log the underlying broker error; update any imports if needed (e.g., from celery.app.task import Task).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@backend/app/api/deps.py`:
- Around line 52-59: Remove PII from OpenTelemetry span attributes by stopping
the direct setting of auth_context.user.email and, unless approved,
auth_context.organization.name and auth_context.project.name on spans; keep only
non-PII identifiers (span.set_attribute("user.id", ...),
span.set_attribute("tenant.org_id", ...),
span.set_attribute("tenant.project_id", ...)) or, if you need correlation,
replace the direct name/email values with a deterministic hash or masked value
before calling span.set_attribute for keys "user.email", "tenant.org_name", and
"tenant.project_name".
In `@backend/app/api/routes/llm.py`:
- Around line 162-170: The code warns when llm_call.usage is missing but still
constructs Usage(**llm_call.usage) which will raise; update the handler that
builds LLMCallResponse (the block creating LLMCallResponse with
response=llm_response, usage=Usage(**llm_call.usage), provider_raw_response=...)
to first guard llm_call.usage and either (A) return a controlled 4xx/5xx error
indicating missing usage (including job_id and project_id in the error/log) or
(B) populate a safe default Usage instance (e.g., zeros/nulls) before passing it
into LLMCallResponse; ensure the chosen approach is used consistently where
get_llm_call / LLMCallResponse and Usage are referenced so the endpoint no
longer raises when llm_call.usage is None.
In `@backend/app/core/logger.py`:
- Around line 43-68: The logging guard currently prevents reconfiguring when
configure_logging is called again, causing an old service name to persist;
update configure_logging to be service-name aware by storing the configured
service name on the root logger (e.g., add a
root_logger._kaapi_logging_service_name) and, if _kaapi_logging_configured is
True but the provided service_name (resolved_service_name) differs from
root_logger._kaapi_logging_service_name, replace or update the ServiceNameFilter
instances on existing handlers (or recreate handlers) so they use the new
resolved_service_name; ensure to update the stored _kaapi_logging_service_name
when changes are applied and keep existing use of CorrelationIdFilter and
_kaapi_logging_configured.
In `@backend/app/core/telemetry.py`:
- Around line 351-363: The loop currently checks only for key presence but may
call span.set_attribute with None; update the logic in the block that iterates
over ("gen_ai.request.temperature", "temperature") etc. to only call
span.set_attribute(attr_key, params.get(param_key)) when params.get(param_key)
is not None (i.e., guard against None values), and similarly change the tools
handling so span.set_attribute("gen_ai.request.available_tools",
json.dumps(tools)) is only executed when tools is not None and non-empty; ensure
these checks are done around the existing span.set_attribute usage so the
OpenTelemetry attributes are never set to None.
In `@backend/app/services/collections/create_collection.py`:
- Around line 279-306: The except block in create_collection.execute_job
currently logs the error, attempts provider cleanup, marks the job failed via
_mark_job_failed, and sends the failure callback (build_failure_payload +
send_callback) but then returns normally; to ensure Celery records the task as
failed, re-raise the original exception at the end of that except block (use a
bare "raise" to preserve the original traceback) immediately after sending the
callback and after _mark_job_failed completes, so the exception propagated by
span.record_exception and logger.error is not swallowed.
In `@backend/app/services/collections/delete_collection.py`:
- Around line 236-245: The exception is currently swallowed after calling
span.record_exception/ span.set_status and _mark_job_failed_and_callback, so
Celery treats the task as successful; modify the except block in the function
containing span.record_exception to re-raise the caught exception after the
failure callback: keep the existing calls to span.record_exception(err),
span.set_status(...), and _mark_job_failed_and_callback(project_id=project_id,
collection_id=collection_id, job_id=job_uuid, err=err,
callback_url=deletion_request.callback_url) and then add a plain "raise" to
re-raise the original exception so upstream task failure handling (e.g., Celery
task_failure telemetry) runs.
In `@backend/app/services/llm/jobs.py`:
- Around line 599-601: The per-LLM-call synchronous
flush_telemetry(timeout_millis=10000) in execute_llm_call is causing additive
tail latency for chain jobs; remove this unconditional 10s flush and instead
either (a) delete the call so you rely on the existing outer finally blocks and
task_postrun flush, or (b) replace the literal with a config-driven short
timeout (e.g., read TELEMETRY_PER_LLM_FLUSH_MS and default to 0/500) so the
flush is off or uses a tiny timeout by default; update execute_llm_call to
reference that config variable and ensure behavior remains backward-compatible.
- Around line 891-911: The nested except currently re-uses the outer exception
variable e, hiding the real DB/update error; modify the inner except in
execute_chain_job to catch a new variable (e.g., update_err) instead of reusing
e, and change the logger.error there to include that new variable and its
exc_info so the update_llm_chain_status failure is logged (reference
update_llm_chain_status, ChainStatus.FAILED, logger.error, and the
Session(engine) context).
- Around line 720-751: The log message in execute_job uses inconsistent
formatting for callback_url and always logs "Error if any..." at INFO level;
update the logger call that currently formats f"...callback_url
{callback_url_str}" to use an equals sign like callback_url={callback_url_str}
to match other fields, and change the post-execution logging that references
result.error so it only emits a log when result.error is truthy (e.g., if
result.error: logger.error(...)) or else demote it to debug (logger.debug(...))
— adjust the calls around the execute_llm_call invocation and the subsequent
logger usage where result and logger are referenced.
---
Outside diff comments:
In `@backend/app/api/routes/collections.py`:
- Around line 88-92: The endpoint handlers (notably create_collection and the
other handler defined around lines 156-160) lack return type annotations; add
explicit return type hints using the project's response wrapper type (the same
wrapper used elsewhere for API handlers) so signatures become e.g. def
create_collection(... ) -> ResponseWrapper[CreationResponse]: and likewise
annotate the second handler with its appropriate wrapper/response type; update
imports if needed to reference the response wrapper and the concrete response
DTO types referenced in the function bodies.
In `@backend/app/api/routes/llm.py`:
- Around line 53-55: The llm_call endpoint is missing an explicit return type;
update the function signature for llm_call to declare it returns
APIResponse[LLMJobImmediatePublic] (i.e., def llm_call(_current_user:
AuthContextDep, session: SessionDep, request: LLMCallRequest) ->
APIResponse[LLMJobImmediatePublic]:). Ensure the referenced types (APIResponse
and LLMJobImmediatePublic) are imported if not already.
In `@backend/app/services/collections/create_collection.py`:
- Around line 39-46: The function start_job is annotated to return str but
actually returns collection_job_id (a UUID); update the return type annotation
from str to UUID and ensure UUID is imported/used from uuid (or typing) so the
signature reads start_job(... ) -> UUID; verify all parameters remain fully
typed and adjust any callers or type checks expecting str if necessary.
In `@backend/app/services/collections/delete_collection.py`:
- Around line 29-35: The function start_job currently declares a return type of
str but actually returns the UUID object collection_job_id; update the signature
to return UUID (e.g., change "-> str" to "-> UUID") and ensure UUID is imported
(from uuid import UUID) so the return type matches the returned value; verify
any callers or tests expecting a str are adjusted if necessary.
---
Nitpick comments:
In `@backend/app/celery/tasks/job_execution.py`:
- Around line 19-56: The helper functions _extract_parent_context and
_run_with_otel_parent lack proper type hints for task_instance, fn, and the
return value; add typing to preserve task return types by importing TypeVar and
Callable, declare a TypeVar R for the return type, type task_instance as the
appropriate Celery Task/Any (or Task request type) and type fn as Callable[[],
R], and annotate _run_with_otel_parent to return R; also update
_extract_parent_context to return otel_context.Context (already present) and
ensure any use-sites match the new generic signature.
In `@backend/app/celery/utils.py`:
- Around line 16-23: Add an explicit type hint for the task parameter (use
celery.app.task.Task) and keep the return type as str; then wrap the call to
task.apply_async(...) in a try/except to surface broker/apply_async failures
with context: catch Exception as e and re-raise a new RuntimeError (or custom
exception) that includes the task identity (e.g., task.name) and chain the
original exception via "from e" so callers can handle or log the underlying
broker error; update any imports if needed (e.g., from celery.app.task import
Task).
In `@backend/app/core/config.py`:
- Around line 138-139: The default for OTEL_ENABLED in config.py (OTEL_ENABLED:
bool = False) conflicts with .env.example which sets OTEL_ENABLED=true; update
one to match: either change OTEL_ENABLED in config.py to True (OTEL_ENABLED:
bool = True) to reflect the example, or change the .env.example entry to
false—make the chosen source of truth consistent and keep OTEL_SERVICE_NAME
unchanged.
In `@backend/app/core/telemetry.py`:
- Line 8: Update the imports and attribute assignment to address ruff hints:
replace Iterator import from typing with importing Iterator from collections.abc
(leave TYPE_CHECKING and Any as-is) so change the import line that currently
lists "TYPE_CHECKING, Any, Iterator"; and replace the setattr call that sets
"_kaapi_db_telemetry_instrumented" on the engine (the line using setattr(engine,
"_kaapi_db_telemetry_instrumented", True)) with a direct attribute assignment
engine._kaapi_db_telemetry_instrumented = True to satisfy B010; keep the same
attribute name and effect.
In `@backend/app/services/llm/jobs.py`:
- Around line 129-173: start_chain_job currently only uses log_context and omits
creating an OpenTelemetry/Sentry span like start_job does; wrap the critical
section in tracer.start_as_current_span("llm.start_chain_job") (matching the
pattern used in start_job) around job creation + start_llm_chain_job, add the
same span.set_attribute calls and exception recording (including setting
span.status on error) so chain-start failures appear in traces, and ensure to
record the task_id and job_id on the span; locate start_chain_job and mirror the
span usage and exception handling logic from start_job to implement this.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: c64e3285-4258-4801-9752-d5a56c557a8f
⛔ Files ignored due to path filters (1)

- `backend/uv.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (19)
- .env.example
- .gitignore
- backend/app/api/deps.py
- backend/app/api/routes/collections.py
- backend/app/api/routes/llm.py
- backend/app/celery/celery_app.py
- backend/app/celery/tasks/job_execution.py
- backend/app/celery/utils.py
- backend/app/core/config.py
- backend/app/core/db.py
- backend/app/core/langfuse/langfuse.py
- backend/app/core/logger.py
- backend/app/core/middleware.py
- backend/app/core/sentry_filters.py
- backend/app/core/telemetry.py
- backend/app/main.py
- backend/app/services/collections/create_collection.py
- backend/app/services/collections/delete_collection.py
- backend/app/services/llm/jobs.py
Reviewed code context (`backend/app/core/logger.py`, new version):

```python
def configure_logging(service_name: str | None = None) -> None:
    root_logger = logging.getLogger()
    if getattr(root_logger, "_kaapi_logging_configured", False):
        return

    root_logger.setLevel(LOGGING_LEVEL)
    formatter = logging.Formatter(LOGGING_FORMAT)
    resolved_service_name = service_name or settings.OTEL_SERVICE_NAME

    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(formatter)
    stream_handler.addFilter(CorrelationIdFilter())
    stream_handler.addFilter(ServiceNameFilter(resolved_service_name))

    file_handler = RotatingFileHandler(
        LOG_FILE_PATH, maxBytes=10 * 1024 * 1024, backupCount=5
    )
    file_handler.setFormatter(formatter)
    file_handler.addFilter(CorrelationIdFilter())
    file_handler.addFilter(ServiceNameFilter(resolved_service_name))

    root_logger.handlers.clear()
    root_logger.addHandler(stream_handler)
    root_logger.addHandler(file_handler)
    root_logger._kaapi_logging_configured = True
```
Make the logging guard service-name aware.
configure_logging(service_name="kaapi-celery") is ignored once _kaapi_logging_configured is set, so a process that configured logging earlier keeps the wrong service_name in every log record.
Suggested fix
def configure_logging(service_name: str | None = None) -> None:
root_logger = logging.getLogger()
- if getattr(root_logger, "_kaapi_logging_configured", False):
+ resolved_service_name = service_name or settings.OTEL_SERVICE_NAME
+ if (
+ getattr(root_logger, "_kaapi_logging_configured", False)
+ and getattr(root_logger, "_kaapi_logging_service_name", None)
+ == resolved_service_name
+ ):
return
root_logger.setLevel(LOGGING_LEVEL)
formatter = logging.Formatter(LOGGING_FORMAT)
- resolved_service_name = service_name or settings.OTEL_SERVICE_NAME
stream_handler = logging.StreamHandler()
@@
root_logger.addHandler(stream_handler)
root_logger.addHandler(file_handler)
root_logger._kaapi_logging_configured = True
+    root_logger._kaapi_logging_service_name = resolved_service_name

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/app/core/logger.py` around lines 43 - 68, The logging guard currently
prevents reconfiguring when configure_logging is called again, causing an old
service name to persist; update configure_logging to be service-name aware by
storing the configured service name on the root logger (e.g., add a
root_logger._kaapi_logging_service_name) and, if _kaapi_logging_configured is
True but the provided service_name (resolved_service_name) differs from
root_logger._kaapi_logging_service_name, replace or update the ServiceNameFilter
instances on existing handlers (or recreate handlers) so they use the new
resolved_service_name; ensure to update the stored _kaapi_logging_service_name
when changes are applied and keep existing use of CorrelationIdFilter and
_kaapi_logging_configured.
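The `ServiceNameFilter` referenced above is not shown in the diff; a minimal sketch of such a filter, assuming it simply stamps each record so the formatter can reference `%(service_name)s`, could be:

```python
import logging


class ServiceNameFilter(logging.Filter):
    """Stamp each record with a service_name attribute for the formatter."""

    def __init__(self, service_name: str) -> None:
        super().__init__()
        self.service_name = service_name

    def filter(self, record: logging.LogRecord) -> bool:
        record.service_name = self.service_name
        return True  # annotate only; never drop records


handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(service_name)s %(levelname)s %(message)s"))
handler.addFilter(ServiceNameFilter("kaapi-celery"))

demo_logger = logging.getLogger("demo")
demo_logger.addHandler(handler)
demo_logger.warning("worker started")  # prints: kaapi-celery WARNING worker started
```

Because the filter mutates the record in place, swapping the filter instance on existing handlers (as the fix suggests) is enough to change the service name in subsequent log lines.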
Reviewed code context (`backend/app/core/telemetry.py`):

```python
for attr_key, param_key in (
    ("gen_ai.request.temperature", "temperature"),
    ("gen_ai.request.max_tokens", "max_tokens"),
    ("gen_ai.request.top_p", "top_p"),
    ("gen_ai.request.presence_penalty", "presence_penalty"),
    ("gen_ai.request.frequency_penalty", "frequency_penalty"),
):
    if param_key in params:
        span.set_attribute(attr_key, params.get(param_key))

tools = params.get("tools")
if tools is not None:
    span.set_attribute("gen_ai.request.available_tools", json.dumps(tools))
```
🧩 Analysis chain
🌐 Web query:
Does OpenTelemetry Python span.set_attribute raise or silently drop when value is None?
💡 Result:
OpenTelemetry Python span.set_attribute neither raises an exception nor silently drops the attribute when the value is None. Instead, the behavior is undefined, as explicitly stated in the official documentation and API specification. The documentation strongly discourages passing None values, noting: "Note: The behavior of None value attributes is undefined, and hence strongly discouraged." This applies to both set_attribute(key, value) and set_attributes(attributes). The OpenTelemetry specification confirms that null attribute values are invalid, with attempting to set null being undefined behavior. In practice, SDK implementations (like NonRecordingSpan) treat it as a no-op without raising, but users should avoid None values to ensure portability and correctness across implementations and exporters.
Citations:
- 1: https://opentelemetry-python.readthedocs.io/en/stable/api/trace.span.html
- 2: https://opentelemetry-python.readthedocs.io/en/latest/api/trace.span.html
- 3: https://github.com/open-telemetry/opentelemetry-python/blob/main/opentelemetry-api/src/opentelemetry/trace/span.py
- 4: https://opentelemetry.io/docs/specs/otel/common/
- 5: https://opentelemetry-python.readthedocs.io/en/stable/_modules/opentelemetry/trace/span.html
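Given that result, the None-value guard the review asks for can be demonstrated with a toy recorder; `DictSpan` and `set_request_attributes` are illustrative stand-ins, not the project's telemetry code:

```python
import json
from typing import Any


class DictSpan:
    """Toy span recorder used only to show the guard's effect."""

    def __init__(self) -> None:
        self.attributes: dict[str, Any] = {}

    def set_attribute(self, key: str, value: Any) -> None:
        self.attributes[key] = value


def set_request_attributes(span: DictSpan, params: dict[str, Any]) -> None:
    for attr_key, param_key in (
        ("gen_ai.request.temperature", "temperature"),
        ("gen_ai.request.max_tokens", "max_tokens"),
    ):
        value = params.get(param_key)
        if value is not None:  # never hand None to set_attribute
            span.set_attribute(attr_key, value)
    tools = params.get("tools")
    if tools:  # skips both None and an empty list
        span.set_attribute("gen_ai.request.available_tools", json.dumps(tools))


span = DictSpan()
set_request_attributes(span, {"temperature": 0.2, "max_tokens": None, "tools": []})
print(span.attributes)  # {'gen_ai.request.temperature': 0.2}
```

Checking the value rather than key presence is what matters: a key stored with an explicit `None` (a common config default) passes the `in params` test but must not reach `set_attribute`.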
🏁 Script executed:
# Check the code context around lines 351-363
sed -n '340,375p' backend/app/core/telemetry.py
Repository: ProjectTech4DevAI/kaapi-backend
Length of output: 1640
🏁 Script executed:
# Search for how params are constructed and if None values are passed
rg -n "temperature\s*=" backend/app/core/telemetry.py | head -20
Repository: ProjectTech4DevAI/kaapi-backend
Length of output: 57
🏁 Script executed:
# Look for params construction and usage context
rg -B 10 "gen_ai.request.temperature" backend/app/core/telemetry.py
Repository: ProjectTech4DevAI/kaapi-backend
Length of output: 588
Check for None values in params before setting OpenTelemetry span attributes.
The param_key in params check only guards key existence, not value. If a caller stores temperature=None (common in config defaults), set_attribute("gen_ai.request.temperature", None) is undefined behavior per the OpenTelemetry specification: the result is not guaranteed across implementations and exporters, and the spec requires attribute values to be valid (non-null).
🛡️ Proposed fix
  for attr_key, param_key in (
      ("gen_ai.request.temperature", "temperature"),
      ("gen_ai.request.max_tokens", "max_tokens"),
      ("gen_ai.request.top_p", "top_p"),
      ("gen_ai.request.presence_penalty", "presence_penalty"),
      ("gen_ai.request.frequency_penalty", "frequency_penalty"),
  ):
-     if param_key in params:
-         span.set_attribute(attr_key, params.get(param_key))
+     value = params.get(param_key)
+     if value is not None:
+         span.set_attribute(attr_key, value)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/app/core/telemetry.py` around lines 351 - 363, The loop currently
checks only for key presence but may call span.set_attribute with None; update
the logic in the block that iterates over ("gen_ai.request.temperature",
"temperature") etc. to only call span.set_attribute(attr_key,
params.get(param_key)) when params.get(param_key) is not None (i.e., guard
against None values), and similarly change the tools handling so
span.set_attribute("gen_ai.request.available_tools", json.dumps(tools)) is only
executed when tools is not None and non-empty; ensure these checks are done
around the existing span.set_attribute usage so the OpenTelemetry attributes are
never set to None.
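The guard can be exercised without the OpenTelemetry SDK by substituting a minimal recording span. StubSpan and apply_request_attributes below are illustrative names only, not part of this codebase; the point is the value check, not the API surface.

```python
from typing import Any


class StubSpan:
    """Minimal stand-in for an OTel Span that just records attributes."""

    def __init__(self) -> None:
        self.attributes: dict[str, Any] = {}

    def set_attribute(self, key: str, value: Any) -> None:
        self.attributes[key] = value


def apply_request_attributes(span: StubSpan, params: dict[str, Any]) -> None:
    # Presence of the key is not enough: callers often store explicit None
    # defaults, and None attribute values are undefined behavior per the
    # OpenTelemetry spec, so we skip them.
    for attr_key, param_key in (
        ("gen_ai.request.temperature", "temperature"),
        ("gen_ai.request.max_tokens", "max_tokens"),
    ):
        value = params.get(param_key)
        if value is not None:
            span.set_attribute(attr_key, value)


span = StubSpan()
apply_request_attributes(span, {"temperature": None, "max_tokens": 256})
```

Here temperature is present but None, so only gen_ai.request.max_tokens ends up on the span.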
logger.info(
    f"[execute_job] Starting LLM job execution | job_id={job_id}, task_id={task_id}, callback_url {callback_url_str}"
)

try:
    with Session(engine) as session:
        job_crud = JobCrud(session=session)
        job_crud.update(
            job_id=job_uuid, job_update=JobUpdate(status=JobStatus.PROCESSING)
        )

        langfuse_credentials = get_provider_credential(
            session=session,
            org_id=organization_id,
            project_id=project_id,
            provider="langfuse",
        )

        result = execute_llm_call(
            config=request.config,
            query=request.query,
            job_id=job_uuid,
            project_id=project_id,
            organization_id=organization_id,
            request_metadata=request.request_metadata,
            langfuse_credentials=langfuse_credentials,
            include_provider_raw_response=request.include_provider_raw_response,
        )

        logger.info(
            f"[execute_job] Error if any during execution of job: {result.error}"
        )

        if result.success:
            callback_response = APIResponse.success_response(
                data=result.response, metadata=result.metadata
Two log-quality nits in execute_job.
- Line 721: callback_url {callback_url_str} is missing the = separator used elsewhere (job_id={...}, task_id={...}). Reads oddly in log aggregators.
- Lines 749-751: This log fires on both success and failure paths and prints "Error if any during execution of job: None" on every successful call, at INFO level. Consider logging it only when result.error is truthy, or demoting to DEBUG.
♻️ Proposed fix
logger.info(
- f"[execute_job] Starting LLM job execution | job_id={job_id}, task_id={task_id}, callback_url {callback_url_str}"
+ f"[execute_job] Starting LLM job execution | job_id={job_id}, task_id={task_id}, callback_url={callback_url_str}"
)
...
- logger.info(
- f"[execute_job] Error if any during execution of job: {result.error}"
- )
+ if result.error:
+ logger.warning(
+ f"[execute_job] LLM call returned error | job_id={job_id}, error={result.error}"
+     )
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
logger.info(
    f"[execute_job] Starting LLM job execution | job_id={job_id}, task_id={task_id}, callback_url={callback_url_str}"
)

try:
    with Session(engine) as session:
        job_crud = JobCrud(session=session)
        job_crud.update(
            job_id=job_uuid, job_update=JobUpdate(status=JobStatus.PROCESSING)
        )

        langfuse_credentials = get_provider_credential(
            session=session,
            org_id=organization_id,
            project_id=project_id,
            provider="langfuse",
        )

        result = execute_llm_call(
            config=request.config,
            query=request.query,
            job_id=job_uuid,
            project_id=project_id,
            organization_id=organization_id,
            request_metadata=request.request_metadata,
            langfuse_credentials=langfuse_credentials,
            include_provider_raw_response=request.include_provider_raw_response,
        )

        if result.error:
            logger.warning(
                f"[execute_job] LLM call returned error | job_id={job_id}, error={result.error}"
            )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/app/services/llm/jobs.py` around lines 720 - 751, The log message in
execute_job uses inconsistent formatting for callback_url and always logs "Error
if any..." at INFO level; update the logger call that currently formats
f"...callback_url {callback_url_str}" to use an equals sign like
callback_url={callback_url_str} to match other fields, and change the
post-execution logging that references result.error so it only emits a log when
result.error is truthy (e.g., if result.error: logger.error(...)) or else demote
it to debug (logger.debug(...)) — adjust the calls around the execute_llm_call
invocation and the subsequent logger usage where result and logger are
referenced.
Codecov Report
❌ Patch coverage is …
📢 Thoughts on this report? Let us know!
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (5)
backend/app/services/collections/delete_collection.py (2)
29-65: ⚠️ Potential issue | 🟡 Minor
Fix the start_job return type.
Line 35 says -> str, but Line 65 returns collection_job_id, which is a UUID. Align the annotation with the implementation unless callers require a string. As per coding guidelines, **/*.py: Always add type hints to all function parameters and return values in Python code.
🛠️ Proposed fix
-     ) -> str:
+     ) -> UUID:
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/collections/delete_collection.py` around lines 29 - 65, The function start_job currently annotates its return as str but returns collection_job_id (a UUID); fix by making the annotation match the implementation or vice versa: either change the return type annotation on start_job to -> UUID (and ensure UUID is imported) if callers expect a UUID, or return str(collection_job_id) and update any callers/types accordingly; reference the start_job function, collection_job_id variable, and the return statement to locate the change.
143-149: ⚠️ Potential issue | 🟠 Major
Isolate callback failures from deletion state handling.
Line 234 can raise after the collection has already been deleted and marked successful, causing the broad except to mark the job failed. Line 149 can also raise inside _mark_job_failed_and_callback, preventing the original exception from being re-raised at Line 246.
🛡️ Proposed fix
  if callback_url and collection_job:
      failure_payload = build_failure_payload(
          collection_job=collection_job,
          collection_id=collection_id,
          error_message=str(err),
      )
-     send_callback(callback_url, failure_payload)
+     try:
+         send_callback(callback_url, failure_payload)
+     except Exception:
+         logger.warning(
+             "[delete_collection.execute_job] Failure callback failed | job_id=%s",
+             str(job_id),
+             exc_info=True,
+         )
@@
  if deletion_request.callback_url and collection_job:
      success_payload = build_success_payload(
          collection_job=collection_job,
          collection_id=collection_id,
      )
-     send_callback(deletion_request.callback_url, success_payload)
+     try:
+         send_callback(deletion_request.callback_url, success_payload)
+     except Exception:
+         logger.warning(
+             "[delete_collection.execute_job] Success callback failed | job_id=%s",
+             str(job_uuid),
+             exc_info=True,
+         )
Also applies to: 229-246
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/collections/delete_collection.py` around lines 143 - 149, The callback sending and failure-marking logic (calls to send_callback and build_failure_payload and the helper _mark_job_failed_and_callback) must be isolated so errors in sending callbacks do not change the deletion job state or swallow the original exception: wrap send_callback and the code path that builds/sends failure callbacks in their own try/except blocks, log any callback/send errors but do not call _mark_job_failed_and_callback from inside the same broad exception path that handles the deletion result, and ensure that any exception raised during deletion is re-raised after attempts to notify; likewise protect _mark_job_failed_and_callback itself with its own try/except so its failures are logged and do not prevent the original exception from being re-raised. Ensure you reference and modify the code around send_callback, build_failure_payload, and _mark_job_failed_and_callback to implement these isolated try/excepts and re-raise the original exception at the end of the outer exception handler.
backend/app/services/llm/jobs.py (2)
494-583: ⚠️ Potential issue | 🟠 Major
Finalize LLM metrics on all provider failure paths.
Line 501 returns provider setup errors before any llm.call.errors metric is emitted. After Line 512, unexpected exceptions jump to the outer except at Line 681 and also skip record_llm_call_finished(...). This undercounts provider/config failures in the new telemetry.
🛡️ Proposed direction
+ provider_name = str(completion_config.provider)
+ model_name = str(completion_config.params.get("model") or "")
+ operation = "chat"
+ record_llm_call_started(
+     provider=provider_name,
+     model=model_name,
+     operation=operation,
+     organization_id=organization_id,
+     project_id=project_id,
+ )
+ provider_started_at = time.perf_counter()
+ call_finished_recorded = False
  try:
      provider_instance = get_llm_provider(
@@
  except ValueError as ve:
+     record_llm_call_finished(
+         provider=provider_name,
+         model=model_name,
+         operation=operation,
+         duration_ms=(time.perf_counter() - provider_started_at) * 1000,
+         error=True,
+         organization_id=organization_id,
+         project_id=project_id,
+     )
+     call_finished_recorded = True
      return BlockResult(error=str(ve), llm_call_id=llm_call_id)
Then guard the outer except with the same initialized locals so unhandled provider exceptions emit one error metric before returning.
Also applies to: 668-689
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/llm/jobs.py` around lines 494 - 583, The provider setup and execution error paths (get_llm_provider ValueError, inner try ValueError, and any exceptions that bubble to the outer except) return before calling record_llm_call_finished, undercounting errors; update the failure exits to always call record_llm_call_finished(provider=provider_name, model=model_name, operation="chat", duration_ms=(time.perf_counter()-provider_started_at)*1000, error=True, organization_id=organization_id, project_id=project_id) before returning and ensure provider_started_at, provider_name and model_name are initialized for the outer except path (e.g. set defaults before provider lookup) so the outer exception handler can emit the same metric and then return the BlockResult (also keep existing ai_span.set_status calls where applicable).
182-198: ⚠️ Potential issue | 🟠 Major
Make callback delivery best-effort and avoid recording raw callback URLs.
Line 184 and Line 756 put the full callback URL into spans; these URLs often carry tenant-specific paths or secret query tokens. Also, handle_job_error sends the callback before marking the job failed, and the success path sends the callback before marking the job successful. A callback outage can therefore prevent the correct DB status update or flip a successful LLM execution into a failed job.
🛡️ Proposed direction
- cb_span.set_attribute("callback.url", callback_url)
+ cb_span.set_attribute("callback.configured", True)
  cb_span.set_attribute("callback.status", "failure")
- send_callback(
-     callback_url=callback_url,
-     data=callback_response.model_dump(),
- )
+ try:
+     send_callback(
+         callback_url=callback_url,
+         data=callback_response.model_dump(),
+     )
+ except Exception:
+     logger.warning(
+         f"[handle_job_error] Failed to send failure callback | job_id={job_id}",
+         exc_info=True,
+     )
Apply the same pattern in the success branch after persisting JobStatus.SUCCESS.
Also applies to: 754-779
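The ordering fix can be reduced to a small helper: persist the terminal job status first, then treat callback delivery as best-effort. update_status and deliver below are hypothetical injected callables standing in for JobCrud.update and the service's send_callback; they are not names from this codebase.

```python
import logging
from collections.abc import Callable
from typing import Any

logger = logging.getLogger("llm.jobs")


def finalize_job(
    update_status: Callable[[], None],
    deliver: Callable[[dict[str, Any]], None],
    payload: dict[str, Any],
    job_id: str,
) -> bool:
    """Persist the authoritative DB state, then attempt the callback."""
    update_status()  # runs first, so a callback outage cannot flip job state
    try:
        deliver(payload)
        return True
    except Exception:
        # Delivery failure is logged (no raw URL recorded) but not re-raised.
        logger.warning("callback delivery failed | job_id=%s", job_id, exc_info=True)
        return False


calls: list[str] = []


def failing_deliver(payload: dict[str, Any]) -> None:
    raise ConnectionError("callback endpoint down")


delivered = finalize_job(
    update_status=lambda: calls.append("status_persisted"),
    deliver=failing_deliver,
    payload={"status": "success"},
    job_id="job-123",
)
```

Even though delivery fails here, the status update has already been recorded, which is exactly the invariant the review asks for.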
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/llm/jobs.py` around lines 182 - 198, The span currently records the full callback URL and the code sends callbacks before updating DB state; change both success and failure flows to update the job status first via JobCrud.update(JobUpdate(...)) and then attempt a best-effort send_callback inside a try/except so callback failures do not prevent DB updates, and remove the raw URL from the span (use an obfuscated value or just record the host/domain or a hash under tracer.start_as_current_span("llm.send_callback") instead of the full callback_url) while recording any send error in the span or logs; apply this pattern to the failure branch shown (send_callback, cb_span.set_attribute) and the analogous success branch so send_callback is post-update and wrapped in exception handling.
backend/app/services/collections/create_collection.py (1)
39-76: ⚠️ Potential issue | 🟡 Minor
Fix the start_job return type.
Line 46 says -> str, but Line 76 returns collection_job_id, which is a UUID. Align the annotation with the implementation unless callers require a string. As per coding guidelines, **/*.py: Always add type hints to all function parameters and return values in Python code.
🛠️ Proposed fix
-     ) -> str:
+     ) -> UUID:
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/collections/create_collection.py` around lines 39 - 76, The function start_job has a return type annotation of -> str but actually returns collection_job_id (a UUID); update the signature to return UUID (use the same UUID type used for the collection_job_id parameter) so the annotation matches the implementation, i.e., change -> str to -> UUID in start_job and ensure UUID is imported where needed; alternatively, if callers expect a string, convert the returned value to str(collection_job_id) before returning—make this consistent with usages of start_job in the codebase (refer to start_job, collection_job_id, and start_create_collection_job).
♻️ Duplicate comments (1)
backend/app/core/telemetry.py (1)
356-368: ⚠️ Potential issue | 🟡 Minor
Avoid setting undefined OpenTelemetry attribute values.
Line 363 only checks key presence, so temperature=None still reaches span.set_attribute(...); Line 367 also records empty tools values. Keep these attributes unset unless the value is meaningful. This is a duplicate of the earlier review finding on this same block.
🛡️ Proposed fix
  for attr_key, param_key in (
      ("gen_ai.request.temperature", "temperature"),
      ("gen_ai.request.max_tokens", "max_tokens"),
      ("gen_ai.request.top_p", "top_p"),
      ("gen_ai.request.presence_penalty", "presence_penalty"),
      ("gen_ai.request.frequency_penalty", "frequency_penalty"),
  ):
-     if param_key in params:
-         span.set_attribute(attr_key, params.get(param_key))
+     value = params.get(param_key)
+     if value is not None:
+         span.set_attribute(attr_key, value)

  tools = params.get("tools")
- if tools is not None:
+ if tools:
      span.set_attribute("gen_ai.request.available_tools", json.dumps(tools))
OpenTelemetry Python Span.set_attribute behavior when attribute value is None
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/core/telemetry.py` around lines 356 - 368, The telemetry code currently sets OpenTelemetry attributes even when values are None or empty; update the loop over ("gen_ai.request.temperature", ...) in the function/method using span so that you read value = params.get(param_key) and only call span.set_attribute(attr_key, value) when value is not None (and for numeric/boolean semantics consider explicitly checking value is not None), and change the tools handling (variable tools = params.get("tools")) to only set "gen_ai.request.available_tools" when tools is truthy/non-empty (e.g., if tools: span.set_attribute(..., json.dumps(tools))) to avoid recording undefined or empty attributes.
🧹 Nitpick comments (3)
backend/app/core/telemetry.py (2)
514-559: Add parameter type hints to the SQLAlchemy event callbacks.
These nested listener functions return None, but their parameters are untyped. Any is fine here because SQLAlchemy callback argument types are dynamic. As per coding guidelines, **/*.py: Always add type hints to all function parameters and return values in Python code.
♻️ Proposed fix
  @event.listens_for(engine, "before_cursor_execute")
  def _before_cursor_execute(
-     conn, cursor, statement, parameters, context, executemany
+     conn: Any,
+     cursor: Any,
+     statement: Any,
+     parameters: Any,
+     context: Any,
+     executemany: bool,
  ) -> None:
@@
  @event.listens_for(engine, "after_cursor_execute")
  def _after_cursor_execute(
-     conn, cursor, statement, parameters, context, executemany
+     conn: Any,
+     cursor: Any,
+     statement: Any,
+     parameters: Any,
+     context: Any,
+     executemany: bool,
  ) -> None:
@@
  @event.listens_for(engine, "handle_error")
- def _handle_error(exception_context) -> None:
+ def _handle_error(exception_context: Any) -> None:
@@
  @event.listens_for(engine.pool, "checkout")
- def _on_checkout(dbapi_connection, connection_record, connection_proxy) -> None:
+ def _on_checkout(
+     dbapi_connection: Any,
+     connection_record: Any,
+     connection_proxy: Any,
+ ) -> None:
@@
  @event.listens_for(engine.pool, "checkin")
- def _on_checkin(dbapi_connection, connection_record) -> None:
+ def _on_checkin(dbapi_connection: Any, connection_record: Any) -> None:
Verify each finding against the current code and only fix it if needed. In `@backend/app/core/telemetry.py` around lines 514 - 559, Update the SQLAlchemy event listener functions to add parameter type hints (use typing.Any for dynamic SQLAlchemy args) and explicit return type None: _before_cursor_execute(conn: Any, cursor: Any, statement: Any, parameters: Any, context: Any, executemany: Any) -> None, _after_cursor_execute(conn: Any, cursor: Any, statement: Any, parameters: Any, context: Any, executemany: Any) -> None, _handle_error(exception_context: Any) -> None, _on_checkout(dbapi_connection: Any, connection_record: Any, connection_proxy: Any) -> None, and _on_checkin(dbapi_connection: Any, connection_record: Any) -> None; also add "from typing import Any" at the top of the module if not present.
563: Use direct assignment for the instrumentation marker.
Ruff is right here: constant-name setattr is less clear than direct assignment. Keep the attr-defined suppression if type checking complains about engine: object.
♻️ Proposed fix
- setattr(engine, "_kaapi_db_telemetry_instrumented", True)
+ engine._kaapi_db_telemetry_instrumented = True  # type: ignore[attr-defined]
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/core/telemetry.py` at line 563, Replace the setattr call that sets the instrumentation marker with a direct attribute assignment on the engine object: change setattr(engine, "_kaapi_db_telemetry_instrumented", True) to engine._kaapi_db_telemetry_instrumented = True; if type checkers complain about engine being typed as object, retain the existing attr-defined suppression comment so static checks remain satisfied; ensure you update the code around the instrumentation logic that references engine and the "_kaapi_db_telemetry_instrumented" attribute.backend/app/services/llm/jobs.py (1)
55-63: Type the provider callable explicitly.
Line 57 leaves func untyped, and Line 60 uses a bare dict. This helper is on the LLM execution path, so the callable contract should be explicit. As per coding guidelines, **/*.py: Always add type hints to all function parameters and return values in Python code.
♻️ Proposed fix
+ from collections.abc import Callable
@@
  def _execute_provider_call(
      *,
-     func,
+     func: Callable[..., tuple[Any, Any]],
      completion_config: Any,
      query: QueryParams,
-     credentials: dict | None,
+     credentials: dict[str, Any] | None,
      session_id: str | None,
      **kwargs: Any,
  ) -> tuple[Any, Any]:
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/llm/jobs.py` around lines 55 - 63, The helper _execute_provider_call has an untyped callable and a bare dict; update its signature to type func as a Callable that returns the same tuple[Any, Any] (e.g. Callable[..., tuple[Any, Any]] or a more specific Callable[[Any, QueryParams, dict[str, Any] | None, str | None], tuple[Any, Any]]), and change credentials from bare dict | None to dict[str, Any] | None (or Mapping[str, Any] | None) and add the necessary imports (Callable, Mapping) from typing. Ensure the updated type hints are used for func, credentials, and any other parameters so the function signature and return annotation are fully typed.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@backend/app/services/collections/create_collection.py`:
- Around line 276-307: The exception handler and the success/failure callback
invocations must not let callback failures change the collection job outcome:
wrap the call to send_callback(creation_request.callback_url, success_payload)
and the later send_callback(..., failure_payload) each in their own try/except
that catches and logs the callback error (include context like collection_job_id
and callback URL) but does NOT re-raise; when handling the original exception in
the except block, ensure you preserve and re-raise the original exception (err)
and do not let any exception from the failure callback mask it—i.e., call
send_callback inside a try/except that logs warnings and continues, then call
_mark_job_failed(...) and finally re-raise the original err; also apply the same
non-fatal try/except around provider.delete(result) cleanup so cleanup/callback
failures don't overwrite job failure handling.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: a42435d5-b514-4923-8154-ae1f2e20560c
⛔ Files ignored due to path filters (1)
backend/uv.lock is excluded by !**/*.lock
📒 Files selected for processing (7)
- backend/app/api/deps.py
- backend/app/api/routes/llm.py
- backend/app/core/telemetry.py
- backend/app/services/collections/create_collection.py
- backend/app/services/collections/delete_collection.py
- backend/app/services/llm/jobs.py
- backend/pyproject.toml
✅ Files skipped from review due to trivial changes (1)
- backend/pyproject.toml
🚧 Files skipped from review as they are similar to previous changes (2)
- backend/app/api/deps.py
- backend/app/api/routes/llm.py
if creation_request.callback_url:
    send_callback(creation_request.callback_url, success_payload)

except Exception as err:
    span.record_exception(err)
    span.set_status(trace.Status(trace.StatusCode.ERROR, str(err)))
    logger.error(
        "[create_collection.execute_job] Collection Creation Failed | {'collection_job_id': '%s', 'error': '%s'}",
        job_id,
        str(err),
        exc_info=True,
    )
collection_crud.create(collection)
collection = collection_crud.read_one(collection.id)

if flat_docs:
    DocumentCollectionCrud(session).create(collection, flat_docs)
if provider is not None and result is not None:
    try:
        provider.delete(result)
    except Exception:
        logger.warning(
            "[create_collection.execute_job] Provider cleanup failed"
        )

collection_job_crud = CollectionJobCrud(session, project_id)
collection_job = collection_job_crud.update(
    collection_job.id,
    CollectionJobUpdate(
        status=CollectionJobStatus.SUCCESSFUL,
        collection_id=collection.id,
    ),
collection_job = _mark_job_failed(
    project_id=project_id,
    job_id=job_id,
    err=err,
    collection_job=collection_job,
)

success_payload = build_success_payload(collection_job, collection)

elapsed = time.time() - start_time
logger.info(
    "[create_collection.execute_job] Collection created: %s | Time: %.2fs | Files: %d | Total Size: %s MB | Types: %s",
    collection_id,
    elapsed,
    len(flat_docs),
    collection_job.total_size_mb,
    list(file_exts),
)

if creation_request.callback_url:
    send_callback(creation_request.callback_url, success_payload)

except Exception as err:
    logger.error(
        "[create_collection.execute_job] Collection Creation Failed | {'collection_job_id': '%s', 'error': '%s'}",
        job_id,
        str(err),
        exc_info=True,
    )

    if provider is not None and result is not None:
        try:
            provider.delete(result)
        except Exception:
            logger.warning(
                "[create_collection.execute_job] Provider cleanup failed"
            )

    collection_job = _mark_job_failed(
        project_id=project_id,
        job_id=job_id,
        err=err,
        collection_job=collection_job,
    )

    if creation_request and creation_request.callback_url and collection_job:
        failure_payload = build_failure_payload(collection_job, str(err))
        send_callback(creation_request.callback_url, failure_payload)
    if creation_request and creation_request.callback_url and collection_job:
        failure_payload = build_failure_payload(collection_job, str(err))
        send_callback(creation_request.callback_url, failure_payload)
    raise
Do not let callback failures change the collection job outcome.
Line 277 is inside the main creation try: if the success callback raises after the collection is created and persisted, the except marks the job failed and may delete the provider resource. Line 306 has the inverse problem: failure-callback errors can mask the original exception before the bare raise.
🛡️ Proposed fix
if creation_request.callback_url:
- send_callback(creation_request.callback_url, success_payload)
+ try:
+ send_callback(creation_request.callback_url, success_payload)
+ except Exception:
+ logger.warning(
+ "[create_collection.execute_job] Success callback failed | collection_job_id=%s",
+ job_id,
+ exc_info=True,
+ )
@@
if creation_request and creation_request.callback_url and collection_job:
failure_payload = build_failure_payload(collection_job, str(err))
- send_callback(creation_request.callback_url, failure_payload)
+ try:
+ send_callback(creation_request.callback_url, failure_payload)
+ except Exception:
+ logger.warning(
+ "[create_collection.execute_job] Failure callback failed | collection_job_id=%s",
+ job_id,
+ exc_info=True,
+ )
    raise
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/app/services/collections/create_collection.py` around lines 276 -
307, The exception handler and the success/failure callback invocations must not
let callback failures change the collection job outcome: wrap the call to
send_callback(creation_request.callback_url, success_payload) and the later
send_callback(..., failure_payload) each in their own try/except that catches
and logs the callback error (include context like collection_job_id and callback
URL) but does NOT re-raise; when handling the original exception in the except
block, ensure you preserve and re-raise the original exception (err) and do not
let any exception from the failure callback mask it—i.e., call send_callback
inside a try/except that logs warnings and continues, then call
_mark_job_failed(...) and finally re-raise the original err; also apply the same
non-fatal try/except around provider.delete(result) cleanup so cleanup/callback
failures don't overwrite job failure handling.
from sentry_sdk.integrations.celery import CeleryIntegration
from sentry_sdk.integrations.fastapi import FastApiIntegration
from sentry_sdk.integrations.httpx import HttpxIntegration
from sentry_sdk.integrations.logging import LoggingIntegration
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration
from sentry_sdk.integrations.starlette import StarletteIntegration
These can't be grouped, since each integration is imported from its own specific sub-module.
with log_context(
    tag="llm-chain",
    lifecycle="llm.chain.execute_job",
    job_id=job_uuid,
    task_id=task_id,
    project_id=project_id,
    organization_id=organization_id,
    total_blocks=len(request.blocks),
):
    logger.info(
        f"[execute_chain_job] Starting chain execution | "
        f"job_id={job_uuid}, total_blocks={len(request.blocks)}"
    )

    try:
        with Session(engine) as session:
            chain_record = create_llm_chain(
                session,
try:
    with Session(engine) as session:
Too much nesting here: with -> try -> with.
usage = response.usage
if usage:
    ai_span.set_attribute("llm.usage.total_tokens", usage.total_tokens)
    ai_span.set_attribute("kaapi_llm_input_tokens", usage.input_tokens)
Do we need a prefixed name like kaapi_llm_input_tokens, or would plain llm_input_tokens work?
)

def record_celery_task_started(task: object | None) -> None:
Are we using this? I can't seem to find any reference to this code.
_emit_celery_worker_gauges(active, pid)

def record_celery_task_finished(task: object | None, state: str | None) -> None:
Are we using this? I can't seem to find any reference to this code.
from sentry_sdk.integrations.logging import LoggingIntegration
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration

sentry_sdk.init(
This mostly repeats the sentry_sdk.init call in backend/app/main.py; can we use something smarter?
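One possible shape for de-duplicating the two init sites: centralize the shared defaults in one helper and pass only the per-service differences. All names below are hypothetical, and the real sentry_sdk.init would be passed in (or imported) by the caller; a stub is used here so the sketch stays self-contained:

```python
# Hypothetical shared helper; integration objects are supplied by the
# caller so this module imports cleanly for both the web app and worker.
_initialized = False


def init_sentry(init_fn, dsn: str, integrations: list, **overrides) -> bool:
    """Call the given Sentry init function once per process with shared
    defaults; return False if already initialized or no DSN is set."""
    global _initialized
    if _initialized or not dsn:
        return False
    # Shared defaults live here; each entry point overrides only what differs.
    defaults = {"dsn": dsn, "integrations": integrations, "traces_sample_rate": 0.1}
    defaults.update(overrides)
    init_fn(**defaults)
    _initialized = True
    return True


# Stub init function records what it was called with.
calls: list[dict] = []
first = init_sentry(lambda **kw: calls.append(kw), "https://example@sentry.io/1", [])
second = init_sentry(lambda **kw: calls.append(kw), "https://example@sentry.io/1", [])
```

The guard also makes double-initialization (e.g. app startup plus worker fork hooks) a no-op instead of a second init.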
async def http_request_logger(request: Request, call_next) -> Response:
    start_time = time.time()
    method = request.method
Got this from using Claude to review; can you check if it makes sense?
backend/app/core/middleware.py:13-14
method = request.method
route = request.url.path
request.url.path is the concrete URL — e.g. /api/v1/collections/123e4567-e89b-12d3-a456-426614174000 — and it's then pushed into OTel
span attributes and Sentry tags at middleware.py:16-25, 31-33, 42-53, 68-77.
Why it's a problem
- Cardinality blow-up. Every UUID in the path becomes a unique http.route tag value. Sentry bills and indexes tags per unique value; your /collections/{id}, /llm/call/{job_id}, /projects/{pid}/... endpoints will fan out to thousands of tag values. Dashboards filtering by route become unusable because each request is its own row.
- PII leakage. Paths can contain user-identifying values — email-like slugs in some routes, callback URLs with query strings, API keys in path params on legacy endpoints. sentry_sdk.set_tag("http.route", "/users/akhilesh@…") writes that identifier into the tag index and ships it to Sentry regardless of scrubbing rules, which only apply to PII in known fields.
- It breaks AI Insights / Performance grouping. Sentry's Performance module groups transactions by http.route. With the raw path, every UUID gets its own transaction group — the "slowest endpoints" view is meaningless.
Actionable comments posted: 2
🧹 Nitpick comments (1)
backend/app/celery/celery_app.py (1)
80-123: Hoist the task_postrun flush handler and tighten the signature. Two small things on initialize_worker_process:
- **_ is missing a type annotation, unlike the other signal handlers in this file (**_: object). As per coding guidelines, "Always add type hints to all function parameters and return values in Python code".
- Nesting @task_postrun.connect(weak=False) inside another signal handler works (it fires once per forked process), but it makes the task-flush wiring invisible from a module-level read and couples telemetry flushing to the Sentry/OTel init block. Defining it at module level (next to log_pool_status_post) keeps all task_postrun subscribers co-located and removes the closure-lifetime concern that motivated weak=False in the first place.
♻️ Proposed refactor
-@worker_process_init.connect
-def initialize_worker_process(**_) -> None:
+@worker_process_init.connect
+def initialize_worker_process(**_: object) -> None:
     """Initialize each forked Celery worker process.
@@
-    from app.core.telemetry import flush_telemetry, setup_telemetry
-
-    setup_telemetry(service_name="kaapi-celery")
-
-    @task_postrun.connect(weak=False)
-    def _flush_otel_after_task(**_: object) -> None:
-        flush_telemetry()
-
-    import app.services.llm.jobs  # noqa: F401
+    from app.core.telemetry import setup_telemetry
+
+    setup_telemetry(service_name="kaapi-celery")
+
+    import app.services.llm.jobs  # noqa: F401
+
+
+@task_postrun.connect(weak=False)
+def _flush_otel_after_task(**_: object) -> None:
+    from app.core.telemetry import flush_telemetry
+
+    flush_telemetry()
As per coding guidelines: "Always add type hints to all function parameters and return values in Python code".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/celery/celery_app.py` around lines 80 - 123, The task_postrun flush handler currently lives nested inside initialize_worker_process and uses an untyped **_ parameter; move the `@task_postrun.connect`(weak=False) subscriber out to module scope (co-locate it with log_pool_status_post) so task-flush wiring is visible and not coupled to Sentry/OTel init, and change the handler signature to def _flush_otel_after_task(**_: object) -> None to add the missing type hint; the handler should simply call flush_telemetry() and retain weak=False on the decorator.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@backend/app/api/routes/collections.py`:
- Around line 107-132: The job record uses deduplicated IDs (unique_documents)
but create_service.start_job is still passed the original request, so the worker
may process duplicates; update the call to create_service.start_job to pass a
request object whose documents field is the deduplicated list (use
unique_documents or set request.documents = unique_documents before the call, or
create a shallow copy of request with documents=[str(doc_id) for doc_id in
unique_documents]) so the worker receives the same deduplicated document list
used to create the CollectionJob (references: unique_documents,
CollectionJobCreate, create_service.start_job).
In `@backend/app/tests/services/collections/test_delete_collection.py`:
- Around line 373-376: The test functions (e.g.,
test_execute_job_local_delete_failure_after_remote_success_marks_failed) are
missing a type annotation for the db fixture; add a parameter type hint db:
Session and ensure you import Session (e.g., from sqlalchemy.orm import Session)
at the top of the test module, and apply the same change to the other new
test(s) referenced (around lines 441-444) so all fixture parameters are properly
typed.
---
Nitpick comments:
In `@backend/app/celery/celery_app.py`:
- Around line 80-123: The task_postrun flush handler currently lives nested
inside initialize_worker_process and uses an untyped **_ parameter; move the
`@task_postrun.connect`(weak=False) subscriber out to module scope (co-locate it
with log_pool_status_post) so task-flush wiring is visible and not coupled to
Sentry/OTel init, and change the handler signature to def
_flush_otel_after_task(**_: object) -> None to add the missing type hint; the
handler should simply call flush_telemetry() and retain weak=False on the
decorator.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 60326658-4721-4c8c-bd25-5fa0190dceb8
📒 Files selected for processing (6)
- backend/app/api/routes/collections.py
- backend/app/celery/celery_app.py
- backend/app/core/middleware.py
- backend/app/services/llm/jobs.py
- backend/app/tests/services/collections/test_create_collection.py
- backend/app/tests/services/collections/test_delete_collection.py
🚧 Files skipped from review as they are similar to previous changes (2)
- backend/app/services/llm/jobs.py
- backend/app/core/middleware.py
unique_documents = list(dict.fromkeys(request.documents))

collection_job_crud = CollectionJobCrud(session, current_user.project_.id)
collection_job = collection_job_crud.create(
    CollectionJobCreate(
        action_type=CollectionActionType.CREATE,
        project_id=current_user.project_.id,
        status=CollectionJobStatus.PENDING,
        docs_num=len(unique_documents),
        documents=[str(doc_id) for doc_id in unique_documents],
    )
)

unique_documents = list(dict.fromkeys(request.documents))
# True if both model and instructions were provided in the request body
with_assistant = bool(
    getattr(request, "model", None) and getattr(request, "instructions", None)
)

collection_job_crud = CollectionJobCrud(session, current_user.project_.id)
collection_job = collection_job_crud.create(
    CollectionJobCreate(
        action_type=CollectionActionType.CREATE,
create_service.start_job(
    db=session,
    request=request,
    collection_job_id=collection_job.id,
    project_id=current_user.project_.id,
    status=CollectionJobStatus.PENDING,
    docs_num=len(unique_documents),
    documents=[str(doc_id) for doc_id in unique_documents],
    organization_id=current_user.organization_.id,
    with_assistant=with_assistant,
)
Pass the deduplicated request to the worker.
Line 107 deduplicates documents for CollectionJobCreate, but Line 127 still dispatches the original request; duplicate document IDs can make the job record report one set while the Celery worker processes another.
🐛 Proposed fix
unique_documents = list(dict.fromkeys(request.documents))
+ normalized_request = request.model_copy(
+ update={"documents": unique_documents}
+ )
collection_job_crud = CollectionJobCrud(session, current_user.project_.id)
collection_job = collection_job_crud.create(
@@
create_service.start_job(
db=session,
- request=request,
+ request=normalized_request,
collection_job_id=collection_job.id,
project_id=current_user.project_.id,
         organization_id=current_user.organization_.id,
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
unique_documents = list(dict.fromkeys(request.documents))
normalized_request = request.model_copy(
    update={"documents": unique_documents}
)
collection_job_crud = CollectionJobCrud(session, current_user.project_.id)
collection_job = collection_job_crud.create(
    CollectionJobCreate(
        action_type=CollectionActionType.CREATE,
        project_id=current_user.project_.id,
        status=CollectionJobStatus.PENDING,
        docs_num=len(unique_documents),
        documents=[str(doc_id) for doc_id in unique_documents],
    )
)
# True if both model and instructions were provided in the request body
with_assistant = bool(
    getattr(request, "model", None) and getattr(request, "instructions", None)
)
create_service.start_job(
    db=session,
    request=normalized_request,
    collection_job_id=collection_job.id,
    project_id=current_user.project_.id,
    organization_id=current_user.organization_.id,
    with_assistant=with_assistant,
)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/app/api/routes/collections.py` around lines 107 - 132, The job record
uses deduplicated IDs (unique_documents) but create_service.start_job is still
passed the original request, so the worker may process duplicates; update the
call to create_service.start_job to pass a request object whose documents field
is the deduplicated list (use unique_documents or set request.documents =
unique_documents before the call, or create a shallow copy of request with
documents=[str(doc_id) for doc_id in unique_documents]) so the worker receives
the same deduplicated document list used to create the CollectionJob
(references: unique_documents, CollectionJobCreate, create_service.start_job).
@patch("app.services.collections.delete_collection.get_llm_provider")
def test_execute_job_local_delete_failure_after_remote_success_marks_failed(
    mock_get_llm_provider: MagicMock, db
) -> None:
Add type hints for the new db fixture parameters.
Both newly added test functions omit the Session type on db.
Proposed fix
def test_execute_job_local_delete_failure_after_remote_success_marks_failed(
- mock_get_llm_provider: MagicMock, db
+ mock_get_llm_provider: MagicMock, db: Session
) -> None:
@@
def test_execute_job_provider_factory_failure_marks_job_failed(
- mock_get_llm_provider: MagicMock, db
+ mock_get_llm_provider: MagicMock, db: Session
 ) -> None:
As per coding guidelines, **/*.py: "Always add type hints to all function parameters and return values in Python code."
Also applies to: 441-444
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/app/tests/services/collections/test_delete_collection.py` around
lines 373 - 376, The test functions (e.g.,
test_execute_job_local_delete_failure_after_remote_success_marks_failed) are
missing a type annotation for the db fixture; add a parameter type hint db:
Session and ensure you import Session (e.g., from sqlalchemy.orm import Session)
at the top of the test module, and apply the same change to the other new
test(s) referenced (around lines 441-444) so all fixture parameters are properly
typed.
Summary
Target issue is #760
Checklist
Before submitting a pull request, please ensure that you mark these tasks.
Run fastapi run --reload app/main.py or docker compose up in the repository root and test.