> Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior in the review settings.
📝 Walkthrough

Applies a gevent-based soft-timeout decorator to many Celery task entrypoints and centralizes explicit timeout handling across collection, LLM, document-transform, STT, and TTS job services, mapping gevent/Celery timeouts to a uniform TimeoutError, marking runs/jobs FAILED, and sending failure callbacks where configured.

Changes: Soft-timeout enforcement & failure handling
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Celery as "Celery Task"
    participant Decorator as "gevent_timeout decorator"
    participant Service as "execute_* service"
    participant DB as "Database / CRUD"
    participant Callback as "Callback / Webhook"
    rect rgba(200,200,255,0.5)
        Celery->>Decorator: task invoked (trace_id, args...)
        Decorator->>Decorator: start gevent.Timeout(seconds)
    end
    Celery->>Service: _set_trace(trace_id) + call execute_*(..., task_id, task_instance)
    Service->>DB: update job/run status (in-flight / failed)
    Service->>Callback: send failure callback if configured
    Service-->>Celery: returns or raises (Timeout or other)
    rect rgba(255,200,200,0.5)
        Decorator->>Service: on gevent.Timeout -> Timeout propagated
        Service->>DB: mark FAILED with timeout message
        Service->>Callback: send failure callback (if configured)
        Decorator-->>Celery: re-raise Timeout
    end
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)

✏️ Tip: You can configure your own custom pre-merge checks in the settings.
Force-pushed from c15e0f3 to 9b24343.
Actionable comments posted: 5
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
backend/app/services/llm/jobs.py (1)
193-225: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Persist the FAILED state before attempting the callback.

`send_callback()` still runs before the DB updates here. If the callback delivery fails, `handle_job_error()` exits before `JobCrud.update()` and `update_llm_chain_status()`, so the new timeout/error paths can leave the job and chain stuck in a non-terminal state.

Suggested fix
```diff
 def handle_job_error(
     job_id: UUID,
     callback_url: str | None,
     callback_response: APIResponse,
     organization_id: int | None = None,
     project_id: int | None = None,
     chain_id: UUID | None = None,
 ) -> dict:
     """Handle job failure uniformly — send callback and update DB."""
-    if callback_url:
-        webhook_secret = get_webhook_secret(project_id, organization_id)
-        with tracer.start_as_current_span("llm.send_callback") as cb_span:
-            cb_span.set_attribute("callback.url", callback_url)
-            cb_span.set_attribute("callback.status", "failure")
-            send_callback(
-                callback_url=callback_url,
-                data=callback_response.model_dump(),
-                webhook_secret=webhook_secret,
-            )
-
     with Session(engine) as session:
         JobCrud(session=session).update(
             job_id=job_id,
             job_update=JobUpdate(
                 status=JobStatus.FAILED,
                 error_message=callback_response.error,
             ),
         )
-        if chain_id:
+        if chain_id is not None:
             try:
                 update_llm_chain_status(
                     session,
                     chain_id=chain_id,
                     status=ChainStatus.FAILED,
                     error=callback_response.error,
                 )
             except Exception as update_err:
                 logger.error(
                     f"[handle_job_error] Failed to update chain status: {update_err} | "
                     f"chain_id={chain_id}",
                     exc_info=True,
                 )
+
+    if callback_url:
+        try:
+            webhook_secret = get_webhook_secret(project_id, organization_id)
+            with tracer.start_as_current_span("llm.send_callback") as cb_span:
+                cb_span.set_attribute("callback.url", callback_url)
+                cb_span.set_attribute("callback.status", "failure")
+                send_callback(
+                    callback_url=callback_url,
+                    data=callback_response.model_dump(),
+                    webhook_secret=webhook_secret,
+                )
+        except Exception as callback_err:
+            logger.error(
+                f"[handle_job_error] Failed to send callback: {callback_err} | job_id={job_id}",
+                exc_info=True,
+            )
     return callback_response.model_dump()
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/llm/jobs.py` around lines 193 - 225, The code calls send_callback() before persisting the FAILED state, so a callback failure can prevent JobCrud.update() and update_llm_chain_status() from running; change handle_job_error to first open a DB session and persist the JobStatus.FAILED (via JobCrud.update) and, if applicable, call update_llm_chain_status (catching and logging its exceptions), commit/close the session, and only then start the tracer span and call send_callback() (catching/logging send_callback errors) so callback delivery cannot block saving terminal job/chain state; reference send_callback, JobCrud.update, update_llm_chain_status and the tracer span name ("llm.send_callback") when locating the code to reorder and add exception handling.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@backend/app/celery/tasks/job_execution.py`:
- Around line 95-99: Annotate the Celery bound task function
run_collection_batch_job: type the first parameter as celery.Task (i.e., self:
celery.Task), annotate job_id and trace_id as str (they already appear but
ensure explicit hints), type **kwargs as Any (from typing import Any), and add
an explicit return type that matches the return type of execute_batch_job
(import or reference its declared return type) so the signature is fully typed;
update imports if needed to include celery and Any and ensure the function
signature and return annotation align with execute_batch_job.
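The fully typed signature this prompt asks for can be sketched roughly as follows. This is a stand-in, not the project's actual code: `celery.Task` is stubbed with a local placeholder class so the snippet stays self-contained, and the `dict[str, Any]` return type is an assumption about what `execute_batch_job` returns.

```python
from typing import Any


class Task:
    """Local stand-in for celery.Task; the real task would be declared with bind=True."""


def run_collection_batch_job(
    self: Task,
    job_id: str,
    trace_id: str,
    **kwargs: Any,
) -> dict[str, Any]:
    # In the real task this would delegate to execute_batch_job(...) and
    # return its result; here we just echo the arguments back.
    return {"job_id": job_id, "trace_id": trace_id, **kwargs}
```

With a real Celery app, the same signature would sit under `@app.task(bind=True)`, and a type checker can then verify both call sites and the return contract.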
In `@backend/app/celery/utils.py`:
- Around line 229-230: The timeout log currently interpolates raw args/kwargs
(f"[{name}] Timed out after {seconds}s — args={args}, kwargs={kwargs}"), which
can leak secrets/PII; change it to log a sanitized summary instead. Replace the
direct interpolation of args and kwargs in the timeout message with a
redacted/summarized representation (e.g., counts, types, truncated values, or a
dedicated sanitizer function like redact_task_payload) and/or call a helper
(e.g., redact_task_payload(args, kwargs)) that strips/masks sensitive data
before logging; retain name and seconds as before and ensure the new helper is
used wherever this log occurs.
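One possible shape for the sanitizer this prompt names. `redact_task_payload` is the prompt's suggested helper name, and the deny-list of sensitive keys and the truncation length are assumptions, not project conventions:

```python
from typing import Any

# Hypothetical deny-list; the real set would match the project's secret fields.
SENSITIVE_KEYS = {"password", "token", "api_key", "secret", "authorization"}


def redact_task_payload(
    args: tuple[Any, ...], kwargs: dict[str, Any], max_len: int = 32
) -> str:
    """Summarize a task payload for logs: types only for positional args,
    masked or truncated values for keyword args."""
    arg_types = [type(a).__name__ for a in args]
    safe_kwargs: dict[str, str] = {}
    for key, value in kwargs.items():
        if key.lower() in SENSITIVE_KEYS:
            safe_kwargs[key] = "***"
        else:
            text = repr(value)
            safe_kwargs[key] = text if len(text) <= max_len else text[:max_len] + "..."
    return f"args(types)={arg_types}, kwargs={safe_kwargs}"
```

The timeout log can then interpolate `redact_task_payload(args, kwargs)` instead of the raw `args`/`kwargs`, keeping `name` and `seconds` as before.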
In `@backend/app/services/collections/create_collection.py`:
- Around line 324-326: The timeout error message currently uses the wrong
function prefix "[execute_setup_job]"; update the string construction for
timeout_err so the prefix matches the enclosing function name (replace
"[execute_setup_job]" with the actual function name used in
create_collection.py, e.g., "[create_collection]") so logs follow the
"[function_name] ..." convention and keep the rest of the message intact (the
f-string and err.seconds usage should remain unchanged).
In `@backend/app/services/doctransform/job.py`:
- Around line 127-132: The log messages inside the _handle_job_failure function
use the wrong prefix "[doc_transform.execute_job]"; update those logger.error
calls to use the current function name in square brackets (e.g.
"[doc_transform._handle_job_failure]") so log filtering is accurate—locate the
logger.error invocations in _handle_job_failure (including the call that logs
failed to persist FAILED status and the later similar logger.error at lines
referenced) and change the prefix string accordingly while preserving the rest
of the message and parameters (context_suffix, job_uuid, db_error, etc.).
In `@backend/app/services/llm/jobs.py`:
- Around line 973-975: The callback payload currently embeds a log prefix and
the raw timeout object; update the assignment to APIResponse.failure_response in
execute_chain_job so the error string is a stable message like "Chain job
exceeded soft time limit" (do not include "[execute_chain_job]" or the err
object), and instead log the detailed err separately (e.g., logger.exception or
logger.error) before returning callback_response; keep request.request_metadata
unchanged so the client-stored error is clean and consistent with execute_job.
---
Outside diff comments:
In `@backend/app/services/llm/jobs.py`:
- Around line 193-225: The code calls send_callback() before persisting the
FAILED state, so a callback failure can prevent JobCrud.update() and
update_llm_chain_status() from running; change handle_job_error to first open a
DB session and persist the JobStatus.FAILED (via JobCrud.update) and, if
applicable, call update_llm_chain_status (catching and logging its exceptions),
commit/close the session, and only then start the tracer span and call
send_callback() (catching/logging send_callback errors) so callback delivery
cannot block saving terminal job/chain state; reference send_callback,
JobCrud.update, update_llm_chain_status and the tracer span name
("llm.send_callback") when locating the code to reorder and add exception
handling.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: bc00e2d4-6bae-428b-86f8-7497e8fc0a61
📒 Files selected for processing (10)
- backend/app/celery/tasks/job_execution.py
- backend/app/celery/utils.py
- backend/app/services/collections/create_collection.py
- backend/app/services/collections/delete_collection.py
- backend/app/services/doctransform/job.py
- backend/app/services/llm/jobs.py
- backend/app/services/stt_evaluations/batch_job.py
- backend/app/services/stt_evaluations/metric_job.py
- backend/app/services/tts_evaluations/batch_job.py
- backend/app/services/tts_evaluations/batch_result_processing.py
```python
logger.error(
    "[doc_transform.execute_job] failed to persist FAILED status%s | job_id=%s | db_error=%s",
    context_suffix,
    job_uuid,
    db_error,
)
```
Use the current function name in helper log prefixes.
These log lines are emitted from _handle_job_failure but are prefixed as execute_job, which makes log filtering less reliable.
As per coding guidelines, "Prefix all log messages with the function name in square brackets".
Also applies to: 139-143
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/app/services/doctransform/job.py` around lines 127 - 132, The log
messages inside the _handle_job_failure function use the wrong prefix
"[doc_transform.execute_job]"; update those logger.error calls to use the
current function name in square brackets (e.g.
"[doc_transform._handle_job_failure]") so log filtering is accurate—locate the
logger.error invocations in _handle_job_failure (including the call that logs
failed to persist FAILED status and the later similar logger.error at lines
referenced) and change the prefix string accordingly while preserving the rest
of the message and parameters (context_suffix, job_uuid, db_error, etc.).
Codecov Report: ❌ Patch coverage is …
Actionable comments posted: 5
🧹 Nitpick comments (1)
backend/app/tests/services/llm/test_jobs.py (1)
1357-1359: ⚡ Quick win

Add a timeout-path regression test here too.

This assertion only checks that `chain_id` is forwarded. The new behavior in this PR is the `except Timeout` path, so it would be worth forcing `executor.run()` to raise `gevent.Timeout` and asserting `handle_job_error(..., chain_id=...)` runs before the timeout is re-raised.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/tests/services/llm/test_jobs.py` around lines 1357 - 1359, Add a regression test that forces the timeout path by setting executor.run to raise gevent.Timeout (e.g., set side_effect=gevent.Timeout()) when invoked, then run the code under test inside pytest.raises(gevent.Timeout) so the Timeout is re-raised to the test; after the call assert mock_handle_error (or handle_job_error) was invoked with chain_id equal to chain_id (use mock_handle_error.call_args/ assert_called_once and inspect kwargs["chain_id"]) to ensure handle_job_error ran before the Timeout propagated. Ensure you reference executor.run(), mock_handle_error (or handle_job_error) and gevent.Timeout in the test.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@backend/app/celery/utils.py`:
- Around line 218-235: Annotate the new decorator to preserve the wrapped
callable's signature: import ParamSpec and TypeVar and declare P =
ParamSpec("P") and R = TypeVar("R"); change gevent_timeout to def
gevent_timeout(seconds: float, task_name: Optional[str] = None) ->
Callable[[Callable[P, R]], Callable[P, R]]; change decorator to def
decorator(func: Callable[P, R]) -> Callable[P, R]; and change wrapper signature
to def wrapper(*args: P.args, **kwargs: P.kwargs) -> R, keeping
functools.wraps(func) and existing runtime logic unchanged.
- Around line 227-229: The except block in the decorator that catches
gevent.Timeout should only handle the specific Timeout instance created for this
wrapper; change the handler in the function where you create the local variable
timeout (the Timeout instance at line ~223) to use "except Timeout as err:" and
then re-raise immediately if err is not timeout (if err is not timeout: raise),
otherwise log the soft-limit message and re-raise; update the logger call there
to remain the same for the matched instance.
In `@backend/app/services/collections/create_collection.py`:
- Around line 323-343: In the Timeout except branch of execute_job, guard
against rolling-back a job that has already been marked SUCCESSFUL by refreshing
the current job/collection state (e.g., reload collection_job or query job
status by job_id) and only call _handle_job_failure(job_id, ...) when the
freshly fetched status is not SUCCESSFUL; if it is SUCCESSFUL, log a warning
including job_id and skip deleting provider resources or flipping job state,
then re-raise the Timeout as before. Use the existing symbols execute_job,
collection_job, job_id, _handle_job_failure, creation_request, provider, and
result to locate and implement this conditional check.
- Around line 178-185: The current failure-path calls send_callback(...)
directly which can raise and mask the original job failure; wrap the
send_callback call in a try/except (mirroring the _handle_job_failure pattern)
so any exception from send_callback is caught and logged but not re-raised,
ensuring the original error remains the primary failure. Specifically, in the
block that builds failure_payload with build_failure_payload(...) and fetches
webhook_secret via get_webhook_secret(...), call send_callback(...) inside a
try/except, log the send error (including exception details and context like
creation_request.callback_url and collection_job id) and swallow the exception
so it cannot override the original failure handling.
In `@backend/app/services/llm/jobs.py`:
- Around line 212-225: The failure persists only after send_callback() so if the
webhook errors the chain update is skipped; move the chain-status update to
occur before calling send_callback() so the failed ChainStatus is persisted even
if the callback fails: inside the same error handling block (e.g., in
handle_job_error) perform the chain_id check and call
update_llm_chain_status(session, chain_id=chain_id, status=ChainStatus.FAILED,
error=callback_response.error) wrapped in its try/except (logging update_err)
before invoking send_callback(), keeping the existing logging and exception
handling intact.
---
Nitpick comments:
In `@backend/app/tests/services/llm/test_jobs.py`:
- Around line 1357-1359: Add a regression test that forces the timeout path by
setting executor.run to raise gevent.Timeout (e.g., set
side_effect=gevent.Timeout()) when invoked, then run the code under test inside
pytest.raises(gevent.Timeout) so the Timeout is re-raised to the test; after the
call assert mock_handle_error (or handle_job_error) was invoked with chain_id
equal to chain_id (use mock_handle_error.call_args/ assert_called_once and
inspect kwargs["chain_id"]) to ensure handle_job_error ran before the Timeout
propagated. Ensure you reference executor.run(), mock_handle_error (or
handle_job_error) and gevent.Timeout in the test.
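A stand-in sketch of that regression test. The real version would use `pytest.raises` with `gevent.Timeout`; here both are replaced by stdlib equivalents so the shape is visible without those dependencies, and the simplified `execute_chain_job` is hypothetical, standing in for the code under test:

```python
from unittest.mock import MagicMock


class Timeout(Exception):
    """Stand-in for gevent.Timeout."""


def execute_chain_job(executor: MagicMock, handle_job_error: MagicMock, chain_id: str) -> object:
    """Simplified stand-in: on Timeout, report the failure, then re-raise."""
    try:
        return executor.run()
    except Timeout:
        handle_job_error(chain_id=chain_id)
        raise


# Force the timeout path and assert the failure handler ran first.
executor = MagicMock()
executor.run.side_effect = Timeout()
mock_handle_error = MagicMock()

raised = False
try:
    execute_chain_job(executor, mock_handle_error, chain_id="chain-123")
except Timeout:
    raised = True

assert raised
mock_handle_error.assert_called_once()
assert mock_handle_error.call_args.kwargs["chain_id"] == "chain-123"
```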
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 7b5fc941-4a59-4aa8-9c00-4cb896855ae3
📒 Files selected for processing (5)
- backend/app/celery/tasks/job_execution.py
- backend/app/celery/utils.py
- backend/app/services/collections/create_collection.py
- backend/app/services/llm/jobs.py
- backend/app/tests/services/llm/test_jobs.py
```python
if creation_request and creation_request.callback_url and collection_job:
    failure_payload = build_failure_payload(collection_job, str(err))
    webhook_secret = get_webhook_secret(project_id, organization_id)
    send_callback(
        str(creation_request.callback_url),
        failure_payload,
        webhook_secret=webhook_secret,
    )
```
Don't let failure-webhook errors replace the real job failure.
send_callback() is unguarded here, unlike the _handle_job_failure pattern in backend/app/services/doctransform/job.py:107-145. If the webhook send fails, this helper raises a second exception and masks the original timeout/provider failure.
Suggested fix

```diff
 if creation_request and creation_request.callback_url and collection_job:
-    failure_payload = build_failure_payload(collection_job, str(err))
-    webhook_secret = get_webhook_secret(project_id, organization_id)
-    send_callback(
-        str(creation_request.callback_url),
-        failure_payload,
-        webhook_secret=webhook_secret,
-    )
+    try:
+        failure_payload = build_failure_payload(collection_job, str(err))
+        webhook_secret = get_webhook_secret(project_id, organization_id)
+        send_callback(
+            str(creation_request.callback_url),
+            failure_payload,
+            webhook_secret=webhook_secret,
+        )
+    except Exception as cb_error:
+        logger.error(
+            "[create_collection.execute_job] Failure callback failed | "
+            "{'collection_job_id': '%s', 'error': '%s'}",
+            job_id,
+            str(cb_error),
+            exc_info=True,
+        )
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
if creation_request and creation_request.callback_url and collection_job:
    try:
        failure_payload = build_failure_payload(collection_job, str(err))
        webhook_secret = get_webhook_secret(project_id, organization_id)
        send_callback(
            str(creation_request.callback_url),
            failure_payload,
            webhook_secret=webhook_secret,
        )
    except Exception as cb_error:
        logger.error(
            "[create_collection.execute_job] Failure callback failed | "
            "{'collection_job_id': '%s', 'error': '%s'}",
            job_id,
            str(cb_error),
            exc_info=True,
        )
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/app/services/collections/create_collection.py` around lines 178 -
185, The current failure-path calls send_callback(...) directly which can raise
and mask the original job failure; wrap the send_callback call in a try/except
(mirroring the _handle_job_failure pattern) so any exception from send_callback
is caught and logged but not re-raised, ensuring the original error remains the
primary failure. Specifically, in the block that builds failure_payload with
build_failure_payload(...) and fetches webhook_secret via
get_webhook_secret(...), call send_callback(...) inside a try/except, log the
send error (including exception details and context like
creation_request.callback_url and collection_job id) and swallow the exception
so it cannot override the original failure handling.
```python
except Timeout as err:
    timeout_err = TimeoutError(
        f"[execute_job] Task exceeded soft time limit of {err.seconds}s"
    )
    logger.error(
        "[create_collection.execute_job] Collection Creation Timed Out | {'collection_job_id': '%s', 'error': '%s'}",
        job_id,
        str(timeout_err),
    )
    _handle_job_failure(
        span,
        project_id,
        organization_id,
        job_id,
        timeout_err,
        collection_job,
        creation_request,
        provider,
        result,
    )
    raise
```
Guard against timeouts after the success commit.
This branch can run after lines 273-301 have already committed the collection row and marked the job SUCCESSFUL. If the timeout fires during the success callback at lines 315-321, _handle_job_failure(...) will delete the provider resource and flip the job to FAILED, but the persisted collection row is left behind. That creates a DB/external-state mismatch.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/app/services/collections/create_collection.py` around lines 323 -
343, In the Timeout except branch of execute_job, guard against rolling-back a
job that has already been marked SUCCESSFUL by refreshing the current
job/collection state (e.g., reload collection_job or query job status by job_id)
and only call _handle_job_failure(job_id, ...) when the freshly fetched status
is not SUCCESSFUL; if it is SUCCESSFUL, log a warning including job_id and skip
deleting provider resources or flipping job state, then re-raise the Timeout as
before. Use the existing symbols execute_job, collection_job, job_id,
_handle_job_failure, creation_request, provider, and result to locate and
implement this conditional check.
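The guard this prompt describes can be sketched as a small helper: re-read the persisted status first, and only roll back when the job has not already been marked SUCCESSFUL. The `fetch_job_status` callable, the `JobStatus` enum, and the warning sink are hypothetical stand-ins for the project's CRUD layer and logger:

```python
from enum import Enum
from typing import Callable


class JobStatus(Enum):
    """Stand-in for the project's job-status enum."""
    PENDING = "pending"
    SUCCESSFUL = "successful"
    FAILED = "failed"


def handle_timeout_after_possible_success(
    job_id: str,
    fetch_job_status: Callable[[str], JobStatus],
    handle_job_failure: Callable[[str], None],
    warnings: list[str],
) -> None:
    """On a soft timeout, refresh persisted state before rolling anything back."""
    current = fetch_job_status(job_id)  # fresh DB read, not the stale local copy
    if current is JobStatus.SUCCESSFUL:
        # The work already committed; deleting provider resources now would
        # orphan the persisted collection row.
        warnings.append(
            f"[execute_job] timeout fired after success, skipping rollback | job_id={job_id}"
        )
    else:
        handle_job_failure(job_id)
```

In the real `execute_job`, the timeout would still be re-raised after this check, as the review comment requires.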
```python
if chain_id:
    try:
        update_llm_chain_status(
            session,
            chain_id=chain_id,
            status=ChainStatus.FAILED,
            error=callback_response.error,
        )
    except Exception as update_err:
        logger.error(
            f"[handle_job_error] Failed to update chain status: {update_err} | "
            f"chain_id={chain_id}",
            exc_info=True,
        )
```
Persist failure state before the webhook call can fail.
The new chain-status update lives after send_callback(), so a callback exception still skips both the job update and the new chain_id update. That leaves failed chains stuck in a non-failed state on exactly the path this change is meant to cover.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/app/services/llm/jobs.py` around lines 212 - 225, The failure
persists only after send_callback() so if the webhook errors the chain update is
skipped; move the chain-status update to occur before calling send_callback() so
the failed ChainStatus is persisted even if the callback fails: inside the same
error handling block (e.g., in handle_job_error) perform the chain_id check and
call update_llm_chain_status(session, chain_id=chain_id,
status=ChainStatus.FAILED, error=callback_response.error) wrapped in its
try/except (logging update_err) before invoking send_callback(), keeping the
existing logging and exception handling intact.
Actionable comments posted: 8
♻️ Duplicate comments (1)
backend/app/services/collections/create_collection.py (1)
179-186: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Guard `send_callback` to prevent masking the original failure.

The `send_callback` call in `_handle_job_failure` is unguarded. If the webhook send fails, the exception will propagate and mask the original timeout/provider failure that triggered this handler.

Suggested fix

```diff
 if creation_request and creation_request.callback_url and collection_job:
-    failure_payload = build_failure_payload(collection_job, str(err))
-    webhook_secret = get_webhook_secret(project_id, organization_id)
-    send_callback(
-        str(creation_request.callback_url),
-        failure_payload,
-        webhook_secret=webhook_secret,
-    )
+    try:
+        failure_payload = build_failure_payload(collection_job, str(err))
+        webhook_secret = get_webhook_secret(project_id, organization_id)
+        send_callback(
+            str(creation_request.callback_url),
+            failure_payload,
+            webhook_secret=webhook_secret,
+        )
+    except Exception as cb_error:
+        logger.error(
+            "[create_collection._handle_job_failure] Failure callback failed | "
+            "{'job_id': '%s', 'error': '%s'}",
+            job_id,
+            str(cb_error),
+            exc_info=True,
+        )
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/collections/create_collection.py` around lines 179 - 186, In _handle_job_failure, the unguarded call to send_callback (using creation_request, build_failure_payload, get_webhook_secret, and collection_job) can raise and mask the original failure; wrap the send_callback invocation in a try/except block that catches exceptions from sending the webhook, logs the send error (including context: project_id/organization_id/callback_url and the exception), and does not re-raise so the original job failure remains the primary error path.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@backend/app/celery/tasks/job_execution.py`:
- Around line 157-172: The task run_collection_batch_job imports and calls a
non-existent execute_batch_job from app.services.collections.create_collection;
replace the import and call with the existing execute_job from that module (or
implement execute_batch_job if batch-specific behavior is required); update the
lambda inside _run_with_otel_parent to invoke execute_job(project_id=project_id,
job_id=job_id, task_id=current_task.request.id, task_instance=self, **kwargs)
and verify the execute_job signature accepts task_id and task_instance,
adjusting argument names if needed.
In `@backend/app/services/collections/create_collection.py`:
- Around line 324-342: The TimeoutError is constructed with an unnecessary
f-string prefix; change TimeoutError(f"Task exceeded soft time limit") to a
plain string TimeoutError("Task exceeded soft time limit") (or otherwise include
real placeholders if intended), and keep the logger.error call and subsequent
_handle_job_failure(...) invocation unchanged so timeout_err contains the
correct message when passed to _handle_job_failure and logged with job_id.
In `@backend/app/services/collections/delete_collection.py`:
- Around line 250-268: In delete_collection.execute_job, remove the unnecessary
f-string prefix when creating timeout_err: replace the TimeoutError
instantiation that uses f"Task exceeded soft time limit" with a plain string
literal ("Task exceeded soft time limit") so timeout_err is created without an
extraneous f-string; ensure the variable name timeout_err remains used for
span.record_exception, span.set_status, _mark_job_failed_and_callback and the
raised exception.
In `@backend/app/services/doctransform/job.py`:
- Line 279: The string literal used to construct timeout_err uses an unnecessary
f-string prefix; update the creation of timeout_err (the TimeoutError
instantiation named timeout_err) to use a plain string literal without the
f-prefix (i.e., remove the leading "f" from TimeoutError(f"...") so it becomes
TimeoutError("...")); search for other similar extraneous f-strings in this
module and remove the prefix where no placeholders are present.
In `@backend/app/services/stt_evaluations/batch_job.py`:
- Around line 102-113: The logger.error call and the TimeoutError message in the
except block handling (Timeout, SoftTimeLimitExceeded) use unnecessary f-string
prefixes with no placeholders; update the logger.error call (the string in
logger.error inside the except handling for Timeout/SoftTimeLimitExceeded) to a
plain string (remove the leading f) and likewise remove the f prefix when
constructing timeout_err = TimeoutError(...) so both messages are regular string
literals, leaving the rest of the exception handling (update_stt_run, run_id,
session, raising the error) unchanged.
In `@backend/app/services/stt_evaluations/metric_job.py`:
- Around line 158-169: In the except block catching (Timeout,
SoftTimeLimitExceeded) in execute_metric_computation, remove the unnecessary
f-string prefix when creating timeout_err (change TimeoutError(f"Task exceeded
soft time limit") to a plain string TimeoutError("Task exceeded soft time
limit")); keep the rest of the block (logger.error(...),
update_stt_run(session=session, run_id=run_id, status="failed",
error_message=str(timeout_err)), and re-raise) unchanged so the error message is
identical but without the redundant f-string.
In `@backend/app/services/tts_evaluations/batch_job.py`:
- Around line 130-141: The code creates timeout_err using an unnecessary
f-string (TimeoutError(f"Task exceeded soft time limit")); replace it with a
normal string literal (TimeoutError("Task exceeded soft time limit")) to remove
the extraneous f-prefix; keep the surrounding exception handling and calls to
logger.error and update_tts_run intact (symbols: Timeout, SoftTimeLimitExceeded,
timeout_err, logger.error, update_tts_run).
In `@backend/app/services/tts_evaluations/batch_result_processing.py`:
- Around line 244-255: The logger.error call in execute_tts_result_processing is
using an unnecessary f-string without placeholders; change the string literal
passed to logger.error to a plain string (remove the leading 'f') in the except
block handling Timeout and SoftTimeLimitExceeded where timeout_err is created
and update_tts_run(session=session, run_id=evaluation_run_id, ...) is called so
the log reads correctly while keeping the TimeoutError handling and subsequent
update_tts_run and raise intact.
---
Duplicate comments:
In `@backend/app/services/collections/create_collection.py`:
- Around line 179-186: In _handle_job_failure, the unguarded call to
send_callback (using creation_request, build_failure_payload,
get_webhook_secret, and collection_job) can raise and mask the original failure;
wrap the send_callback invocation in a try/except block that catches exceptions
from sending the webhook, logs the send error (including context:
project_id/organization_id/callback_url and the exception), and does not
re-raise so the original job failure remains the primary error path.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 98e634c2-192a-4409-a876-455bdb22d5ec
📒 Files selected for processing (15)
- backend/app/celery/tasks/job_execution.py
- backend/app/services/collections/create_collection.py
- backend/app/services/collections/delete_collection.py
- backend/app/services/doctransform/job.py
- backend/app/services/doctransform/registry.py
- backend/app/services/doctransform/zerox_transformer.py
- backend/app/services/llm/jobs.py
- backend/app/services/stt_evaluations/batch_job.py
- backend/app/services/stt_evaluations/metric_job.py
- backend/app/services/tts_evaluations/batch_job.py
- backend/app/services/tts_evaluations/batch_result_processing.py
- backend/app/tests/services/collections/test_create_collection.py
- backend/app/tests/services/collections/test_delete_collection.py
- backend/app/tests/services/doctransformer/test_job/test_execute_job_errors.py
- backend/app/tests/services/llm/test_jobs.py
Actionable comments posted: 1
🧹 Nitpick comments (4)
backend/app/tests/services/doctransformer/test_registry.py (2)
102-117: ⚡ Quick win: Add type hints to the new inline transformer classes.
Both `TimeoutTransformer.transform` and `SoftLimitTransformer.transform` are missing parameter and return type annotations, violating the project's type-hint rule.
✏️ Proposed fix

```diff
 class TimeoutTransformer:
-    def transform(self, input_path, output_path):
+    def transform(self, input_path: Path, output_path: Path) -> Path:
         raise Timeout()

 monkeypatch.setitem(TRANSFORMERS, "timeout", TimeoutTransformer)
 with pytest.raises(Timeout):
     convert_document(input_file, output_file, transformer_name="timeout")

 # Celery SoftTimeLimitExceeded propagates without being wrapped
 class SoftLimitTransformer:
-    def transform(self, input_path, output_path):
+    def transform(self, input_path: Path, output_path: Path) -> Path:
         raise SoftTimeLimitExceeded()
```

You'll also need to add `from pathlib import Path` at the top if it isn't already imported.
As per coding guidelines: "Always add type hints to all function parameters and return values in Python code."
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/tests/services/doctransformer/test_registry.py` around lines 102 - 117, Add missing type hints to the inline transformer classes: annotate TimeoutTransformer.transform(self, input_path: Path, output_path: Path) -> None and SoftLimitTransformer.transform(self, input_path: Path, output_path: Path) -> None, and ensure Path is imported from pathlib at the top of the test file; keep the existing raise statements unchanged so behavior and the pytest.raises checks for convert_document with transformer_name="timeout"/"softlimit" remain the same.
75-117: ⚡ Quick win: Consider splitting `test_convert_document` into focused, single-scenario tests.
The function now contains five distinct test scenarios. A failure in an earlier `pytest.raises` block will suppress execution of the later ones, making failures harder to isolate. Separate test functions (e.g. `test_convert_document_timeout_propagates`, `test_convert_document_soft_limit_propagates`) give independent reporting and cleaner names.
♻️ Sketch of the refactor

```python
@pytest.fixture
def dummy_input_output(tmp_path):
    input_file = tmp_path / "input.txt"
    input_file.write_text("test")
    return input_file, tmp_path / "output.txt"


def test_convert_document_success(dummy_input_output, monkeypatch): ...


def test_convert_document_transformer_not_found(dummy_input_output, monkeypatch): ...


def test_convert_document_wraps_generic_error(dummy_input_output, monkeypatch): ...


def test_convert_document_timeout_propagates(dummy_input_output, monkeypatch): ...


def test_convert_document_soft_time_limit_propagates(dummy_input_output, monkeypatch): ...
```

As per coding guidelines: "Use factory pattern for test fixtures in `backend/app/tests/`."
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/tests/services/doctransformer/test_registry.py` around lines 75 - 117, Split the large test_convert_document into multiple focused tests: create a fixture factory (e.g., dummy_input_output) that prepares input_file and output_file and reuse it across tests; then implement separate test functions test_convert_document_success, test_convert_document_transformer_not_found, test_convert_document_wraps_generic_error, test_convert_document_timeout_propagates, and test_convert_document_soft_time_limit_propagates which each call convert_document with the proper monkeypatched TRANSFORMERS entry (use monkeypatch.setitem(TRANSFORMERS, "...", <TransformerClass>)) and assert the expected outcome (success content, raises ValueError, raises TransformationError, propagates Timeout, propagates SoftTimeLimitExceeded); keep references to convert_document, TRANSFORMERS, and TransformationError/Timeout/SoftTimeLimitExceeded to locate code.
backend/app/services/collections/create_collection.py (1)
151-161: ⚡ Quick win: Add type hints to `_handle_job_failure` parameters.
The `span`, `provider`, and `result` parameters lack type annotations. Per coding guidelines, all function parameters must have type hints.
Proposed fix

```diff
+from opentelemetry.trace import Span
+from app.services.collections.providers.base import BaseCollectionProvider, ProviderResult
+
 def _handle_job_failure(
-    span,
+    span: Span,
     project_id: int,
     organization_id: int,
     job_id: str,
     err: Exception,
     collection_job: CollectionJob | None,
     creation_request: CreationRequest | None,
-    provider=None,
-    result=None,
+    provider: BaseCollectionProvider | None = None,
+    result: ProviderResult | None = None,
 ) -> None:
```

As per coding guidelines, "Always add type hints to all function parameters and return values in Python code".
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/services/collections/create_collection.py` around lines 151 - 161, The function _handle_job_failure is missing type annotations for span, provider, and result; update the signature to add explicit types (e.g. import typing.Any and typing.Optional and annotate as span: Span | Any, provider: Optional[Provider] | Any, result: Optional[Any]) or use the concrete Span type used elsewhere (e.g. opentelemetry.trace.Span) and the project-specific Provider type if one exists; ensure you import any needed types (Span, Provider, Any, Optional) and keep the return type as -> None.
backend/app/tests/services/llm/test_jobs.py (1)
224-256: 💤 Low value: Move `ChainStatus` import to module level.
`from app.models.llm.request import ChainStatus` at line 226 is the only local-scope import in the file. Placing it at module level makes import errors surface at collection time rather than only when this test runs.
♻️ Suggested change

```diff
 from app.models.llm.request import ConfigBlob, LLMCallConfig, LLMChainRequest
 from app.models.llm.request import ChainBlock as ChainBlockModel
+from app.models.llm.request import ChainStatus
```

Then remove line 226 inside the test method:

```diff
-    from app.models.llm.request import ChainStatus
-
     job = JobCrud(session=db).create(
```
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/tests/services/llm/test_jobs.py` around lines 224 - 256, Move the local import of ChainStatus out of the test body so import errors surface at collection time: remove the line "from app.models.llm.request import ChainStatus" inside test_handle_job_error_with_chain_id_updates_chain_status and add a module-level import for ChainStatus at the top of the test file; ensure the test still references ChainStatus unchanged and run tests to confirm no import regressions.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@backend/app/celery/tasks/job_execution.py`:
- Around line 155-172: run_collection_batch_job duplicates
run_create_collection_job; remove the duplication by either deleting one of the
task functions if they truly serve the same purpose, or implement distinct batch
logic by creating a new execute_batch_job (call from run_collection_batch_job)
so batch-specific processing is isolated from execute_job; if the two tasks are
intentionally separate only for routing, add a clarifying comment above both
task definitions stating that and why they differ; additionally update
run_create_collection_job to include proper type hints on self (celery.Task) and
**kwargs (Any) to match project Python 3.11+ typing conventions.
---
Nitpick comments:
In `@backend/app/services/collections/create_collection.py`:
- Around line 151-161: The function _handle_job_failure is missing type
annotations for span, provider, and result; update the signature to add explicit
types (e.g. import typing.Any and typing.Optional and annotate as span: Span |
Any, provider: Optional[Provider] | Any, result: Optional[Any]) or use the
concrete Span type used elsewhere (e.g. opentelemetry.trace.Span) and the
project-specific Provider type if one exists; ensure you import any needed types
(Span, Provider, Any, Optional) and keep the return type as -> None.
In `@backend/app/tests/services/doctransformer/test_registry.py`:
- Around line 102-117: Add missing type hints to the inline transformer classes:
annotate TimeoutTransformer.transform(self, input_path: Path, output_path: Path)
-> None and SoftLimitTransformer.transform(self, input_path: Path, output_path:
Path) -> None, and ensure Path is imported from pathlib at the top of the test
file; keep the existing raise statements unchanged so behavior and the
pytest.raises checks for convert_document with
transformer_name="timeout"/"softlimit" remain the same.
- Around line 75-117: Split the large test_convert_document into multiple
focused tests: create a fixture factory (e.g., dummy_input_output) that prepares
input_file and output_file and reuse it across tests; then implement separate
test functions test_convert_document_success,
test_convert_document_transformer_not_found,
test_convert_document_wraps_generic_error,
test_convert_document_timeout_propagates, and
test_convert_document_soft_time_limit_propagates which each call
convert_document with the proper monkeypatched TRANSFORMERS entry (use
monkeypatch.setitem(TRANSFORMERS, "...", <TransformerClass>)) and assert the
expected outcome (success content, raises ValueError, raises
TransformationError, propagates Timeout, propagates SoftTimeLimitExceeded); keep
references to convert_document, TRANSFORMERS, and
TransformationError/Timeout/SoftTimeLimitExceeded to locate code.
In `@backend/app/tests/services/llm/test_jobs.py`:
- Around line 224-256: Move the local import of ChainStatus out of the test body
so import errors surface at collection time: remove the line "from
app.models.llm.request import ChainStatus" inside
test_handle_job_error_with_chain_id_updates_chain_status and add a module-level
import for ChainStatus at the top of the test file; ensure the test still
references ChainStatus unchanged and run tests to confirm no import regressions.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 96b92f85-01a6-4b40-8703-b07e163dd7a3
📒 Files selected for processing (13)
- backend/app/celery/tasks/job_execution.py
- backend/app/celery/utils.py
- backend/app/services/collections/create_collection.py
- backend/app/services/collections/delete_collection.py
- backend/app/services/doctransform/job.py
- backend/app/services/llm/jobs.py
- backend/app/services/stt_evaluations/batch_job.py
- backend/app/services/stt_evaluations/metric_job.py
- backend/app/services/tts_evaluations/batch_job.py
- backend/app/services/tts_evaluations/batch_result_processing.py
- backend/app/tests/services/doctransformer/test_registry.py
- backend/app/tests/services/llm/test_jobs.py
- backend/app/tests/services/stt_evaluations/test_metric_job.py
✅ Files skipped from review due to trivial changes (2)
- backend/app/services/doctransform/job.py
- backend/app/services/llm/jobs.py
🚧 Files skipped from review as they are similar to previous changes (2)
- backend/app/services/stt_evaluations/metric_job.py
- backend/app/services/tts_evaluations/batch_job.py
♻️ Duplicate comments (1)
backend/app/celery/tasks/job_execution.py (1)
2-4: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win: Complete typing is still missing on helper/task signatures.
`self`, `**kwargs`, and return types are still unannotated across task entrypoints, and helper params are also untyped. This violates the repo-wide Python typing rule and leaves `celery`/`Any` imports partially unused by intent.
✅ Suggested patch pattern

```diff
-from typing import Any
+from collections.abc import Callable
+from typing import Any

 import celery
@@
-def _extract_parent_context(task_instance) -> otel_context.Context:
+def _extract_parent_context(task_instance: celery.Task) -> otel_context.Context:
@@
-def _run_with_otel_parent(task_instance, fn):
+def _run_with_otel_parent(
+    task_instance: celery.Task, fn: Callable[[], Any]
+) -> Any:
@@
-def run_llm_job(self, project_id: int, job_id: str, trace_id: str, **kwargs):
+def run_llm_job(
+    self: celery.Task, project_id: int, job_id: str, trace_id: str, **kwargs: Any
+) -> Any:
```

Apply the same signature pattern to all `run_*` functions in this file.

```bash
#!/bin/bash
# Verify untyped params/returns in this file (read-only)
python - <<'PY'
import ast
from pathlib import Path

path = Path("backend/app/celery/tasks/job_execution.py")
tree = ast.parse(path.read_text())
for node in tree.body:
    if isinstance(node, ast.FunctionDef):
        missing = []
        args = node.args.posonlyargs + node.args.args + node.args.kwonlyargs
        for a in args:
            if a.annotation is None:
                missing.append(a.arg)
        if node.args.vararg and node.args.vararg.annotation is None:
            missing.append(f"*{node.args.vararg.arg}")
        if node.args.kwarg and node.args.kwarg.annotation is None:
            missing.append(f"**{node.args.kwarg.arg}")
        if node.returns is None:
            missing.append("return")
        if missing:
            print(f"{node.name} (Line {node.lineno}): missing {', '.join(missing)}")
PY
```

As per coding guidelines, "`**/*.py`: Always add type hints to all function parameters and return values in Python code" and "Use Python 3.11+ with type hints throughout the codebase".
Also applies to: 23-42, 65-255
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/celery/tasks/job_execution.py` around lines 2 - 4, Annotate all task entrypoints and helpers in this file (every run_* function and any helpers they call): add explicit parameter and return type hints (e.g., self: celery.Task, **kwargs: Any or **kwargs: Dict[str, Any] as appropriate) and concrete types for helper params/returns, and use typing.Any/Dict/Optional where exact types are unknown; update function signatures to include return annotations (e.g., -> Any or -> Dict[str, Any]) and adjust imports (from typing import Any, Dict, Optional) so no imports are unused; apply the same pattern to every run_* function referenced in this module so the AST checker in the review comment reports no missing annotations.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Duplicate comments:
In `@backend/app/celery/tasks/job_execution.py`:
- Around line 2-4: Annotate all task entrypoints and helpers in this file (every
run_* function and any helpers they call): add explicit parameter and return
type hints (e.g., self: celery.Task, **kwargs: Any or **kwargs: Dict[str, Any]
as appropriate) and concrete types for helper params/returns, and use
typing.Any/Dict/Optional where exact types are unknown; update function
signatures to include return annotations (e.g., -> Any or -> Dict[str, Any]) and
adjust imports (from typing import Any, Dict, Optional) so no imports are
unused; apply the same pattern to every run_* function referenced in this module
so the AST checker in the review comment reports no missing annotations.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: e6939101-5972-4ac8-ae39-fc9d40f8ffd2
📒 Files selected for processing (3)
backend/app/celery/tasks/job_execution.pybackend/app/services/llm/jobs.pybackend/app/tests/services/llm/test_jobs.py
🚧 Files skipped from review as they are similar to previous changes (2)
- backend/app/services/llm/jobs.py
- backend/app/tests/services/llm/test_jobs.py
```python
except (Timeout, SoftTimeLimitExceeded) as err:
    timeout_err = TimeoutError("Task exceeded soft time limit")
    logger.error(
```
Use `logger.warning` here instead of `error` if you think it is not that critical.
```python
except (Timeout, SoftTimeLimitExceeded):
    raise
except TimeoutError:
    logger.error(
    )
```

```python
except (Timeout, SoftTimeLimitExceeded):
    logger.error(
```

```python
return executor.run()
```

```python
except (Timeout, SoftTimeLimitExceeded) as err:
    logger.error(
```

```python
except (Timeout, SoftTimeLimitExceeded) as err:
    timeout_err = TimeoutError("Task exceeded soft time limit")
    logger.error(
```

```python
except (Timeout, SoftTimeLimitExceeded) as err:
    timeout_err = TimeoutError("Task exceeded soft time limit")
    logger.error(
```
In cases like `Timeout` or `SoftTimeLimitExceeded`, these are expected scenarios rather than unexpected failures. We already know why they occur and under what conditions they happen, so logging them with `logger.error` is not appropriate. A lower log level such as `warning` or `info` would be more suitable.
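A sketch of the suggested handler at `warning` level, using local stand-in classes for `gevent.Timeout` and Celery's `SoftTimeLimitExceeded` so the snippet is self-contained (the real handler also updates the run via `update_tts_run` before re-raising):

```python
import logging
from collections.abc import Callable

logger = logging.getLogger(__name__)


class Timeout(BaseException):
    """Stand-in for gevent.Timeout in this sketch."""


class SoftTimeLimitExceeded(Exception):
    """Stand-in for celery.exceptions.SoftTimeLimitExceeded."""


def run_with_timeout_handling(run_id: str, work: Callable[[], None]) -> None:
    try:
        work()
    except (Timeout, SoftTimeLimitExceeded) as err:
        timeout_err = TimeoutError("Task exceeded soft time limit")
        # Expected, well-understood condition: warning rather than error.
        logger.warning("Run %s exceeded its soft time limit: %r", run_id, err)
        raise timeout_err from err
```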
```python
sample_texts = get_sample_texts_from_dataset(session, dataset, project_id)
if not run:
    logger.error(
```
Use `logger.warning` again; you know why this is happening: when the run data is not found in the DB it returns None, so at this point set either warning or info.
```python
update_tts_run(
```

```python
if not dataset:
    logger.error(
```
Same goes here: use `logger.warning` or `info`.
```python
sample_texts = get_sample_texts_from_dataset(session, dataset, project_id)
```

```python
if not sample_texts:
    logger.error(
```
logger.warning or logger.info
```python
def gevent_timeout(
    seconds: float | None, task_name: str | None = None
) -> Callable[[Callable[P, R]], Callable[P, R]]:
```
Nitpick: the return type is largely unreadable for humans. Consider relaxing the type safety since this is just a decorator function.
```python
logger = logging.getLogger(__name__)
```

```python
P = ParamSpec("P")
```
Nitpick: consider YOLO-ing it and forgetting about type safety here, as it's unreadable, non-standard Python.
Prajna1999 left a comment:
Added a few nitpicks wrt type safety and readability.
Summary
Target issue is #767