Job Management: Preserve llm_call details (#823)
📝 Walkthrough
The PR introduces a two-phase LLM call lifecycle (pending → resolved), makes LlmCall provider/model/input_type nullable, and expands AudioContent to support audio URLs.

Changes: Audio URL Support and Call Lifecycle
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings.
Actionable comments posted: 4
🧹 Nitpick comments (1)
backend/app/core/storage_utils.py (1)
211-222: ⚡ Quick win — Add m4a MIME aliases to extension mapping.
`audio/m4a` (and the common variant `audio/x-m4a`) currently falls back to `wav`, which can produce wrong file extensions.

Suggested mapping update:

 _MIME_TO_EXT: dict[str, str] = {
 @@
     "audio/mp4": "mp4",
+    "audio/m4a": "m4a",
+    "audio/x-m4a": "m4a",
     "audio/aac": "aac",
     "audio/flac": "flac",
 }

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/core/storage_utils.py` around lines 211 - 222, The _MIME_TO_EXT mapping in storage_utils.py is missing aliases for m4a mime types so audio/m4a and audio/x-m4a fall back incorrectly; update the _MIME_TO_EXT dict to include entries mapping "audio/m4a" and "audio/x-m4a" to "m4a" (alongside the existing audio/* entries) so functions that rely on _MIME_TO_EXT return the correct .m4a extension.
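As a sketch of the suggested change, the hardened mapping might look like the following. The helper name `ext_for_mime`, the `wav` fallback, and the surrounding entries are assumptions for illustration, not the actual `storage_utils.py` contents:

```python
# Hypothetical extension mapping with the m4a aliases added; surrounding
# entries mirror the review's description, not the real module.
_MIME_TO_EXT: dict[str, str] = {
    "audio/mpeg": "mp3",
    "audio/mp4": "mp4",
    "audio/m4a": "m4a",    # common alias
    "audio/x-m4a": "m4a",  # legacy/vendor alias
    "audio/aac": "aac",
    "audio/flac": "flac",
    "audio/wav": "wav",
}


def ext_for_mime(mime_type: str) -> str:
    # Strip parameters ("audio/m4a; codecs=mp4a") and normalize case
    # before looking up the extension; fall back to wav as today.
    base = mime_type.split(";", 1)[0].strip().lower()
    return _MIME_TO_EXT.get(base, "wav")
```

With the aliases present, `ext_for_mime("audio/x-m4a")` yields `m4a` instead of silently falling back to `wav`.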
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@backend/app/alembic/versions/058_make_llm_call_fields_nullable.py`:
- Around line 25-28: The downgrade() changes try to set llm_call.model,
llm_call.provider and llm_call.input_type to nullable=False which will fail if
any rows contain NULL; modify downgrade() to first backfill NULL values for
these columns (or abort with a clear error) before calling op.alter_column, e.g.
run UPDATE statements against the "llm_call" table to set sensible defaults for
model, provider and input_type or raise a RuntimeError if NULLs exist, then call
op.alter_column("llm_call", "model", nullable=False),
op.alter_column("llm_call", "provider", nullable=False) and
op.alter_column("llm_call", "input_type", nullable=False).
In `@backend/app/crud/llm.py`:
- Around line 284-291: get_llm_call_by_job_id is non-deterministic because it
uses .first() without filtering or ordering, which can return the wrong
standalone LlmCall when multiple rows exist; update the query in
get_llm_call_by_job_id to restrict to the intended active/pending row (e.g., add
a predicate such as LlmCall.status == "pending" or LlmCall.is_active) and add a
deterministic ORDER BY (for example LlmCall.created_at.desc() or
LlmCall.id.desc()) before taking the first result so updates always target the
correct record.
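The determinism fix above can be illustrated with a plain-SQL analogue. The table layout, the `pending` status value, and the `ORDER BY id DESC` tie-breaker are assumptions standing in for the project's ORM models:

```python
import sqlite3


def get_llm_call_by_job_id(conn: sqlite3.Connection, job_id: str):
    # Filter to pending rows and order by id descending so repeated calls
    # deterministically return the newest matching record, instead of
    # whichever row the database happens to yield first.
    return conn.execute(
        """
        SELECT id, job_id, status
        FROM llm_call
        WHERE job_id = ? AND status = 'pending'
        ORDER BY id DESC
        LIMIT 1
        """,
        (job_id,),
    ).fetchone()


# Demonstrate with an in-memory table holding multiple rows per job.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE llm_call (id INTEGER PRIMARY KEY, job_id TEXT, status TEXT)"
)
conn.executemany(
    "INSERT INTO llm_call (job_id, status) VALUES (?, ?)",
    [("job-1", "pending"), ("job-1", "resolved"), ("job-1", "pending")],
)
row = get_llm_call_by_job_id(conn, "job-1")  # newest pending row, id 3
```

In SQLAlchemy terms the same shape is a `.where(...)` predicate plus `.order_by(LlmCall.id.desc())` before `.first()`.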
In `@backend/app/services/llm/chain/executor.py`:
- Line 73: The _resolve_presigned_url function's parameter is untyped; update
the signature to include a concrete type (e.g., change def
_resolve_presigned_url(self, output) -> None: to def
_resolve_presigned_url(self, output: dict[str, Any]) -> None:) and import Any
from typing at the top of the module; if a more specific type exists in your
codebase (e.g., a Response or Output model), use that type instead of dict[str,
Any] and keep the return as -> None.
In `@backend/app/utils.py`:
- Around line 595-607: The download_audio_bytes function is vulnerable to SSRF
and memory exhaustion; harden it by validating the input URL and streaming with
size caps: ensure the scheme is http/https, resolve the hostname to an IP and
reject private/loopback/metadata or other disallowed ranges (re-check after
redirects), enforce and check Content-Length against a configured
MAX_AUDIO_BYTES before allocating, use requests.get(stream=True) with short
timeouts and iterate response.iter_content(chunk_size) accumulating up to the
max and aborting if exceeded, and return clear error messages on validation
failures, redirect-caused host changes, timeouts, or oversized payloads; keep
all checks and limits inside download_audio_bytes and reference it when
implementing these changes.
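A minimal sketch of the SSRF and size-cap hardening described above, using only the standard library. The cap value, function names, and the decision to block reserved/link-local ranges are assumptions; the real `download_audio_bytes` would also re-validate hosts after redirects, which this sketch only notes in a comment:

```python
import ipaddress
import socket
import urllib.request
from urllib.parse import urlparse

MAX_AUDIO_BYTES = 25 * 1024 * 1024  # assumed cap; would be configurable


def is_url_allowed(url: str) -> bool:
    """Allow only http(s) URLs whose host resolves to public addresses."""
    parsed = urlparse(url)
    if parsed.scheme not in ("http", "https") or not parsed.hostname:
        return False
    try:
        infos = socket.getaddrinfo(parsed.hostname, None)
    except socket.gaierror:
        return False
    for info in infos:
        ip = ipaddress.ip_address(info[4][0])
        # Reject loopback, RFC 1918, link-local (incl. the 169.254.169.254
        # cloud metadata endpoint), and reserved ranges.
        if ip.is_private or ip.is_loopback or ip.is_link_local or ip.is_reserved:
            return False
    return True


def download_audio_bytes(url: str, max_bytes: int = MAX_AUDIO_BYTES) -> bytes:
    if not is_url_allowed(url):
        raise ValueError(f"URL not allowed: {url}")
    chunks: list[bytes] = []
    total = 0
    # Note: this sketch does not re-validate the host after redirects; a full
    # fix would install a redirect handler that re-runs is_url_allowed().
    with urllib.request.urlopen(url, timeout=10) as resp:
        declared = resp.headers.get("Content-Length")
        if declared and int(declared) > max_bytes:
            raise ValueError("audio payload exceeds size cap")
        while True:
            chunk = resp.read(64 * 1024)
            if not chunk:
                break
            total += len(chunk)
            if total > max_bytes:
                raise ValueError("audio payload exceeds size cap")
            chunks.append(chunk)
    return b"".join(chunks)
```

The same shape works with `requests.get(stream=True)` and `iter_content()`, which is what the prompt suggests for the actual fix.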
---
Nitpick comments:
In `@backend/app/core/storage_utils.py`:
- Around line 211-222: The _MIME_TO_EXT mapping in storage_utils.py is missing
aliases for m4a mime types so audio/m4a and audio/x-m4a fall back incorrectly;
update the _MIME_TO_EXT dict to include entries mapping "audio/m4a" and
"audio/x-m4a" to "m4a" (alongside the existing audio/* entries) so functions
that rely on _MIME_TO_EXT return the correct .m4a extension.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: aca2da0e-ba34-4944-b3f4-9eac2b41990d
📒 Files selected for processing (8)
backend/app/alembic/versions/058_make_llm_call_fields_nullable.py
backend/app/core/storage_utils.py
backend/app/crud/llm.py
backend/app/crud/llm_chain.py
backend/app/models/llm/request.py
backend/app/services/llm/chain/executor.py
backend/app/services/llm/jobs.py
backend/app/utils.py
def downgrade() -> None:
    op.alter_column("llm_call", "model", nullable=False)
    op.alter_column("llm_call", "provider", nullable=False)
    op.alter_column("llm_call", "input_type", nullable=False)
Downgrade can fail once NULL rows exist.
Setting these columns back to nullable=False without backfilling will fail on rollback if any row has NULL in input_type, provider, or model.
Suggested migration hardening
def downgrade() -> None:
+ op.execute(
+ """
+ UPDATE llm_call
+ SET input_type = COALESCE(input_type, 'text'),
+ provider = COALESCE(provider, 'unknown'),
+ model = COALESCE(model, 'unknown')
+ """
+ )
     op.alter_column("llm_call", "model", nullable=False)
     op.alter_column("llm_call", "provider", nullable=False)
     op.alter_column("llm_call", "input_type", nullable=False)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@backend/app/alembic/versions/058_make_llm_call_fields_nullable.py` around
lines 25 - 28, The downgrade() changes try to set llm_call.model,
llm_call.provider and llm_call.input_type to nullable=False which will fail if
any rows contain NULL; modify downgrade() to first backfill NULL values for
these columns (or abort with a clear error) before calling op.alter_column, e.g.
run UPDATE statements against the "llm_call" table to set sensible defaults for
model, provider and input_type or raise a RuntimeError if NULLs exist, then call
op.alter_column("llm_call", "model", nullable=False),
op.alter_column("llm_call", "provider", nullable=False) and
op.alter_column("llm_call", "input_type", nullable=False).
@Prajna1999, understood! If downgrades are only expected to run in controlled environments (e.g., dev/staging with no real NULL rows), the concern wouldn't apply in practice. Thanks for the clarification.
            self._context.project_id, self._context.organization_id
        )

    def _resolve_presigned_url(self, output) -> None:
🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win
Add a type hint for _resolve_presigned_url parameter.
output is currently untyped; this breaks the project’s typing rule and weakens static checks.
As per coding guidelines: “Always add type hints to all function parameters and return values in Python code”.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@backend/app/services/llm/chain/executor.py` at line 73, The
_resolve_presigned_url function's parameter is untyped; update the signature to
include a concrete type (e.g., change def _resolve_presigned_url(self, output)
-> None: to def _resolve_presigned_url(self, output: dict[str, Any]) -> None:)
and import Any from typing at the top of the module; if a more specific type
exists in your codebase (e.g., a Response or Output model), use that type
instead of dict[str, Any] and keep the return as -> None.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
backend/app/crud/llm.py (1)
22-53: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Multimodal `list` input is serialized via `str(...)`, which loses structure and is hard to parse later.
`list` is now an accepted input type, but this path currently falls into `return str(query_input)`. That stores Python repr text instead of a stable JSON payload, which can break downstream reads and defeats detail preservation for multimodal jobs.

Proposed fix:

-def serialize_input(query_input: QueryInput | str | list) -> str:
+def serialize_input(query_input: QueryInput | str | list[QueryInput | str]) -> str:
 @@
-    else:
+    elif isinstance(query_input, list):
+        return json.dumps(
+            [
+                item.model_dump(mode="json") if hasattr(item, "model_dump") else item
+                for item in query_input
+            ]
+        )
+    else:
         return str(query_input)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/crud/llm.py` around lines 22 - 53, The serialize_input function currently turns list inputs into a Python string which loses structure; update serialize_input (and its handling of QueryInput | str | list) to detect when query_input is a list and serialize it to stable JSON (e.g., json.dumps) preserving element structure and multimodal metadata, rather than falling through to return str(...). Ensure the list branch serializes elements consistently (reusing TextInput and AudioInput serialization logic used in serialize_input for individual items), so lists of TextInput/AudioInput produce a JSON array with the same objects/fields (type, format, mime_type, url, size_bytes) as single-item serialization.
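A standalone sketch of the JSON-preserving behavior the prompt asks for. The `model_dump` duck-typing mirrors Pydantic-style models; `QueryInput`, `TextInput`, and `AudioInput` themselves are not reproduced here, so `FakeAudioInput` is a hypothetical stand-in:

```python
import json
from typing import Any


def serialize_input(query_input: Any) -> str:
    # Strings pass through unchanged; Pydantic-style objects and lists are
    # serialized to stable JSON instead of Python repr text.
    if isinstance(query_input, str):
        return query_input
    if isinstance(query_input, list):
        return json.dumps(
            [
                item.model_dump(mode="json") if hasattr(item, "model_dump") else item
                for item in query_input
            ]
        )
    if hasattr(query_input, "model_dump"):
        return json.dumps(query_input.model_dump(mode="json"))
    return str(query_input)


class FakeAudioInput:
    """Hypothetical stand-in for the project's AudioInput model."""

    def model_dump(self, mode: str = "json") -> dict[str, Any]:
        return {"type": "audio", "format": "m4a", "url": "https://example.com/a.m4a"}
```

A mixed list such as `[FakeAudioInput(), "transcribe this"]` now round-trips through `json.loads` with each element's fields intact.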
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@backend/app/services/llm/jobs.py`:
- Around line 648-709: The guardrail short-circuit can return before the LlmCall
row is created/resolved, losing call history; ensure the pending→resolved
persistence happens before any early return from
execute_llm_call/apply_input_guardrails by either (A) moving the existing
create/update logic (get_llm_call_by_job_id, update_llm_call_resolved_fields,
create_llm_call) to run immediately after resolving the config and before
calling apply_input_guardrails(), or (B) if keeping guardrail invocation first,
add a code path that when apply_input_guardrails returns
guardrail_direct_response or input_error it still calls
update_llm_call_resolved_fields (or create_llm_call when chain_id set) to
persist the resolved call; reference get_llm_call_by_job_id,
update_llm_call_resolved_fields, create_llm_call, apply_input_guardrails,
execute_llm_call, and start_job to locate the relevant spots.
- Line 1272: The call to get_llm_chain_by_job_id in jobs.py is missing its
import and will raise NameError; add an import for get_llm_chain_by_job_id (the
function used at the call site) at the top of backend/app/services/llm/jobs.py
from the module where it’s defined so the symbol is available before the line
invoking get_llm_chain_by_job_id(session, job_uuid).
- Around line 230-239: The code creates an LlmChain via create_llm_chain but if
start_llm_chain_job() raises, the chain row remains in its initial state; wrap
the start_llm_chain_job(...) call in a try/except that catches exceptions, load
or reference the created chain (from create_llm_chain call or via
job_id/project_id/organization_id), set its status to FAILED (and record the
error message/traceback in the chain's error field), persist that update, and
then re-raise the exception so existing job-failure handling still runs;
reference create_llm_chain, start_llm_chain_job, and execute_chain_job when
locating the code to change.
---
Outside diff comments:
In `@backend/app/crud/llm.py`:
- Around line 22-53: The serialize_input function currently turns list inputs
into a Python string which loses structure; update serialize_input (and its
handling of QueryInput | str | list) to detect when query_input is a list and
serialize it to stable JSON (e.g., json.dumps) preserving element structure and
multimodal metadata, rather than falling through to return str(...). Ensure the
list branch serializes elements consistently (reusing TextInput and AudioInput
serialization logic used in serialize_input for individual items), so lists of
TextInput/AudioInput produce a JSON array with the same objects/fields (type,
format, mime_type, url, size_bytes) as single-item serialization.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: d05cdbd3-8bfe-484d-9916-7709342fb653
📒 Files selected for processing (3)
backend/app/crud/llm.py
backend/app/services/llm/jobs.py
backend/app/tests/services/llm/test_jobs.py
✅ Files skipped from review due to trivial changes (1)
- backend/app/tests/services/llm/test_jobs.py
    try:
        create_llm_chain(
            db,
            job_id=job.id,
            project_id=project_id,
            organization_id=organization_id,
            total_blocks=len(request.blocks),
            input=serialize_input(request.query.input),
            configs=[block.model_dump(mode="json") for block in request.blocks],
        )
Mark the pre-created chain as failed if task enqueueing dies.
This block can create the LlmChain row successfully, but if start_llm_chain_job() raises a few lines later only the Job is marked failed. The chain row is then left in its initial state forever because execute_chain_job() never runs to reconcile it.
💡 Proposed fix
def start_chain_job(
db: Session, request: LLMChainRequest, project_id: int, organization_id: int
) -> UUID:
"""Create an LLM Chain job and schedule Celery task."""
trace_id = correlation_id.get() or "N/A"
job_crud = JobCrud(session=db)
job = job_crud.create(
job_type=JobType.LLM_CHAIN, trace_id=trace_id, project_id=project_id
)
+ chain_record = None
@@
try:
- create_llm_chain(
+ chain_record = create_llm_chain(
db,
job_id=job.id,
project_id=project_id,
organization_id=organization_id,
total_blocks=len(request.blocks),
@@
except Exception as e:
span.record_exception(e)
span.set_status(trace.Status(trace.StatusCode.ERROR, str(e)))
+ if chain_record is not None:
+ update_llm_chain_status(
+ db,
+ chain_id=chain_record.id,
+ status=ChainStatus.FAILED,
+ error=str(e),
+ )
logger.error(
f"[start_chain_job] Error starting Celery task: {str(e)} | job_id={job.id}, project_id={project_id}",
exc_info=True,
)🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@backend/app/services/llm/jobs.py` around lines 230 - 239, The code creates an
LlmChain via create_llm_chain but if start_llm_chain_job() raises, the chain row
remains in its initial state; wrap the start_llm_chain_job(...) call in a
try/except that catches exceptions, load or reference the created chain (from
create_llm_chain call or via job_id/project_id/organization_id), set its status
to FAILED (and record the error message/traceback in the chain's error field),
persist that update, and then re-raise the exception so existing job-failure
handling still runs; reference create_llm_chain, start_llm_chain_job, and
execute_chain_job when locating the code to change.
# For standalone calls (no chain_id), a pending LlmCall row was
# pre-created in start_job(). Find it and fill in the resolved fields.
# For chain block calls (chain_id set), always create a fresh row.
existing = (
    get_llm_call_by_job_id(session, job_id)
    if chain_id is None
    else None
)
if existing is not None:
    # Determine input/output types now that config is resolved
    completion_type = resolved_config_blob.completion.type or (
        resolved_config_blob.completion.params.get("type", "text")
        if isinstance(resolved_config_blob.completion.params, dict)
        else getattr(
            resolved_config_blob.completion.params, "type", "text"
        )
    )
    if completion_type == "stt":
        _input_type, _output_type = "audio", "text"
    elif completion_type == "tts":
        _input_type, _output_type = "text", "audio"
    elif isinstance(query.input, ImageInput):
        _input_type, _output_type = "image", "text"
    elif isinstance(query.input, PDFInput):
        _input_type, _output_type = "pdf", "text"
    elif isinstance(query.input, list):
        _input_type, _output_type = "multimodal", "text"
    else:
        _input_type, _output_type = "text", "text"

    config_dict: dict[str, Any]
    if llm_call_request.config.is_stored_config:
        config_dict = {
            "config_id": str(llm_call_request.config.id),
            "config_version": llm_call_request.config.version,
        }
    else:
        config_dict = {
            "config_blob": resolved_config_blob.model_dump()
        }

    llm_call = update_llm_call_resolved_fields(
        session,
        llm_call_id=existing.id,
        input_type=_input_type,
        output_type=_output_type,
        provider=original_provider,
        model=model_name,
        config=config_dict,
    )
else:
    llm_call = create_llm_call(
        session,
        request=llm_call_request,
        job_id=job_id,
        project_id=project_id,
        organization_id=organization_id,
        resolved_config=resolved_config_blob,
        original_provider=original_provider,
        chain_id=chain_id,
    )
llm_call_id = llm_call.id
Persist the call record before any input-guardrail early return.
With the new pending→resolved flow, the LlmCall only gets created/resolved in this block. If apply_input_guardrails() returns guardrail_direct_response or input_error earlier, execute_llm_call() exits without updating the pre-created standalone row and without creating a chain-block row at all. That drops successful short-circuit responses from llm_call history.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@backend/app/services/llm/jobs.py` around lines 648 - 709, The guardrail
short-circuit can return before the LlmCall row is created/resolved, losing call
history; ensure the pending→resolved persistence happens before any early return
from execute_llm_call/apply_input_guardrails by either (A) moving the existing
create/update logic (get_llm_call_by_job_id, update_llm_call_resolved_fields,
create_llm_call) to run immediately after resolving the config and before
calling apply_input_guardrails(), or (B) if keeping guardrail invocation first,
add a code path that when apply_input_guardrails returns
guardrail_direct_response or input_error it still calls
update_llm_call_resolved_fields (or create_llm_call when chain_id set) to
persist the resolved call; reference get_llm_call_by_job_id,
update_llm_call_resolved_fields, create_llm_call, apply_input_guardrails,
execute_llm_call, and start_job to locate the relevant spots.
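Option (A) from the prompt, resolving the call record before guardrails can short-circuit, reduces to an ordering change. A stripped-down control-flow sketch follows; all names here are placeholders injected as callables, not the real `execute_llm_call` helpers:

```python
from typing import Any, Callable


def execute_call(
    resolve_call_record: Callable[[], str],
    apply_input_guardrails: Callable[[], dict[str, Any]],
    run_model: Callable[[], dict[str, Any]],
) -> dict[str, Any]:
    # Persist the pending -> resolved LlmCall record *first*, so the history
    # row survives even when guardrails return early.
    call_id = resolve_call_record()
    verdict = apply_input_guardrails()
    if verdict.get("guardrail_direct_response") or verdict.get("input_error"):
        # Early return still carries the resolved call id.
        return {"call_id": call_id, "response": verdict}
    return {"call_id": call_id, "response": run_model()}
```

The point is only the ordering: the resolved record exists before any guardrail short-circuit path can exit the function.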
            input=serialize_input(request.query.input),
            configs=[block.model_dump(mode="json") for block in request.blocks],
        )
        chain_record = get_llm_chain_by_job_id(session, job_uuid)
Import get_llm_chain_by_job_id before calling it.
This path will fail with NameError on the first chain execution because the symbol is never imported into backend/app/services/llm/jobs.py.
🐛 Proposed fix
-from app.crud.llm_chain import create_llm_chain, update_llm_chain_status
+from app.crud.llm_chain import (
+ create_llm_chain,
+ get_llm_chain_by_job_id,
+ update_llm_chain_status,
+)

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
-chain_record = get_llm_chain_by_job_id(session, job_uuid)
+from app.crud.llm_chain import (
+    create_llm_chain,
+    get_llm_chain_by_job_id,
+    update_llm_chain_status,
+)
🧰 Tools
🪛 Ruff (0.15.12)
[error] 1272-1272: Undefined name get_llm_chain_by_job_id
(F821)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@backend/app/services/llm/jobs.py` at line 1272, The call to
get_llm_chain_by_job_id in jobs.py is missing its import and will raise
NameError; add an import for get_llm_chain_by_job_id (the function used at the
call site) at the top of backend/app/services/llm/jobs.py from the module where
it’s defined so the symbol is available before the line invoking
get_llm_chain_by_job_id(session, job_uuid).
Summary
Target issue is #819
Explain the motivation for making this change. What existing problem does the pull request solve?
Checklist
Before submitting a pull request, please ensure that you mark these tasks.
Run `fastapi run --reload app/main.py` or `docker compose up` in the repository root and test.

Notes
Please add here if any other information is required for the reviewer.
Summary by CodeRabbit
New Features
Bug Fixes
Chores