-
Notifications
You must be signed in to change notification settings - Fork 5
Move response API to Celery-backed job processing #381
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
WalkthroughAdds a job queue and orchestration for responses: DB migration and Job model/CRUD, route refactor to start jobs, Celery scheduling/execution, response generation/persistence/callback delivery, utilities for OpenAI errors/callbacks, config/env updates, and tests. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant C as Client
participant API as /api/v1/responses
participant S as start_job
participant DB as JobCrud/DB
participant Q as Celery Queue
participant W as Worker.execute_job
participant SRV as process_response
participant OA as OpenAI
participant CB as Callback URL
C->>API: POST ResponsesAPIRequest
API->>S: start_job(db, request, project_id, org_id)
S->>DB: create Job (type=RESPONSE, trace_id)
S->>Q: enqueue execute_job(job_id, task_id, request_data)
API-->>C: APIResponse { data: { status: "processing", message: ... } }
Note over Q,W: asynchronous worker execution
Q->>W: execute_job(...)
W->>SRV: process_response(request, project_id, org_id, job_id, task_id)
SRV->>OA: generate_response(...) (OpenAI calls)
alt success
SRV->>DB: update Job status=SUCCESS
alt callback_url present
W->>CB: send_response_callback(payload)
CB-->>W: 200 OK
end
else failure
SRV->>DB: update Job status=FAILED, error_message
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Suggested reviewers
Poem
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✨ Finishing touches
🧪 Generate unit tests
📜 Recent review detailsConfiguration used: CodeRabbit UI Review profile: CHILL Plan: Pro 📒 Files selected for processing (2)
🧰 Additional context used🧬 Code graph analysis (2)backend/app/services/response/response.py (5)
backend/app/services/response/jobs.py (6)
🪛 Ruff (0.13.1)backend/app/services/response/response.py208-208: Unused function argument: (ARG001) ⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
🔇 Additional comments (2)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
1) Move models from route to models folder 2) fix process response to handle callback response
…tructured response
…nd error handling
23f67a4
to
96160c6
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
backend/app/tests/utils/openai.py (1)
25-31
: Avoid mutable default for vector_store_idsUsing a list as a default arg can leak state across tests. Initialize inside the function.
Apply:
def mock_openai_assistant( assistant_id: str = "assistant_mock", - vector_store_ids: Optional[list[str]] = ["vs_1", "vs_2"], + vector_store_ids: Optional[list[str]] = None, max_num_results: int = 30, ) -> OpenAIAssistant: - return OpenAIAssistant( + if vector_store_ids is None: + vector_store_ids = ["vs_1", "vs_2"] + return OpenAIAssistant(backend/app/tests/services/response/test_jobs.py (1)
1-38
: Fix PyTest “import file mismatch” by renaming this test module.Another module named
test_jobs.py
already exists atbackend/app/tests/crud/test_jobs.py
. Rename this file to a unique basename, e.g.,test_response_jobs_service.py
.
🧹 Nitpick comments (26)
backend/app/utils.py (2)
7-7
: Tighten typing and generic return for APIResponse
- Prefer built-in typing (dict | None) over typing.Dict/Optional.
- failure_response returns data but is annotated as APIResponse[None]. Make it APIResponse[T | None].
Apply:
-from typing import Any, Dict, Generic, Optional, TypeVar +from typing import Any, Generic, TypeVar @@ class APIResponse(BaseModel, Generic[T]): success: bool - data: Optional[T] = None - error: Optional[str] = None - metadata: Optional[Dict[str, Any]] = None + data: T | None = None + error: str | None = None + metadata: dict[str, Any] | None = None @@ @classmethod def failure_response( - cls, - error: str | list, - data: Optional[T] = None, - metadata: Optional[Dict[str, Any]] = None, - ) -> "APIResponse[None]": + cls, + error: str | list[dict[str, Any]] | dict[str, Any], + data: T | None = None, + metadata: dict[str, Any] | None = None, + ) -> "APIResponse[T | None]": - if isinstance(error, list): # to handle cases when error is a list of errors - error_message = "\n".join([f"{err['loc']}: {err['msg']}" for err in error]) + if isinstance(error, list): + error_message = "\n".join(f"{err.get('loc')}: {err.get('msg')}" for err in error) + elif isinstance(error, dict) and "msg" in error: + error_message = f"{error.get('loc')}: {error['msg']}" else: error_message = errorAlso applies to: 29-40, 41-54
208-223
: Narrow bare except in handle_openai_errorReplace bare except with Exception to satisfy linters and avoid swallowing BaseException.
Apply:
- except: + except Exception: passbackend/app/models/job.py (1)
36-44
: Index commonly queried fieldsIndex status, job_type, and created_at to speed lookups (e.g., dashboards, cleaners).
Apply:
- status: JobStatus = Field( - default=JobStatus.PENDING, description="Current state of the job." - ) - job_type: JobType = Field( - description="Job type or classification (e.g., response job, ingestion job)." - ) - created_at: datetime = Field(default_factory=now) + status: JobStatus = Field( + default=JobStatus.PENDING, description="Current state of the job.", index=True + ) + job_type: JobType = Field( + description="Job type or classification (e.g., response job, ingestion job).", + index=True, + ) + created_at: datetime = Field(default_factory=now, index=True)Note: requires an Alembic migration.
backend/app/models/response.py (2)
10-12
: Use Pydantic v2 config style (if on v2)SQLModel on Pydantic v2 expects model_config instead of inner Config.
Apply if using Pydantic v2:
+from pydantic import ConfigDict @@ - class Config: - extra = "allow" + model_config = ConfigDict(extra="allow") @@ - class Config: - extra = "allow" + model_config = ConfigDict(extra="allow") @@ - class Config: - extra = "allow" + model_config = ConfigDict(extra="allow") @@ - class Config: - extra = "allow" + model_config = ConfigDict(extra="allow")If still on Pydantic v1, keep as-is for now.
Also applies to: 23-25, 31-33, 54-56
27-30
: Constrain status typeResponseJobStatus.status is a free-form str. Consider Literal or reusing a lower-cased job status enum to avoid drift.
Would you like a small helper to map JobStatus (DB) -> API wire values (e.g., PROCESSING -> "processing")?
backend/app/tests/crud/test_jobs.py (1)
45-52
: Capture pre-update timestamp to assert monotonicityThe session may update job in-place; compare against a saved value.
Apply:
- update_data = JobUpdate(status=JobStatus.SUCCESS, error_message="All good now") - updated_job = crud.update(job.id, update_data) + old_updated_at = job.updated_at + update_data = JobUpdate(status=JobStatus.SUCCESS, error_message="All good now") + updated_job = crud.update(job.id, update_data) @@ - assert updated_job.updated_at >= job.updated_at + assert updated_job.updated_at > old_updated_atbackend/app/tests/api/routes/test_responses.py (1)
18-20
: Add leading slash to the endpoint pathFastAPI TestClient expects an absolute path; relative paths can misroute.
Apply:
- response = client.post( - "api/v1/responses", json=payload.model_dump(), headers=user_api_key_header - ) + response = client.post( + "/api/v1/responses", json=payload.model_dump(), headers=user_api_key_header + )backend/app/tests/services/response/response/test_generate_response.py (3)
27-29
: Remove unused fixture/var and set an explicit mock return.
db
param is unused; drop it.mock_response
is never used; remove it.- Also set a concrete return value for
mock_client.responses.create
to avoid brittle MagicMock attribute chaining.Apply:
-def test_generate_response_success(db: Session, assistant_mock: Assistant): +def test_generate_response_success(assistant_mock: Assistant): """Test successful OpenAI response generation.""" - mock_response = MagicMock() mock_client = MagicMock() + mock_client.responses.create.return_value = mock_openai_response( + text="Paris is the capital of France." + )
1-3
: Use the shared OpenAI response helper for realism.Import the test helper to generate a realistic OpenAI response object.
import pytest from unittest.mock import MagicMock +from app.tests.utils.openai import mock_openai_response
13-24
: Fix fixture accuracy and typing.
- The docstring says “in DB” but the object isn’t persisted; reword it.
Assistant.id
is an int in models; use an int to avoid type drift.- """Fixture to create an assistant in DB with id=123.""" + """Fixture to create an Assistant object (not persisted).""" assistant = Assistant( - id="123", + id=123, name="Test Assistant", model="gpt-4", temperature=0.7, instructions="You are a helpful assistant.", vector_store_ids=["vs1", "vs2"], max_num_results=5, )backend/app/alembic/versions/c6fb6d0b5897_create_job_table.py (1)
51-68
: Drop Postgres enum types on downgrade to avoid orphaned types.Otherwise subsequent re-migrations can fail.
op.create_foreign_key( "openai_conversation_project_id_fkey1", "openai_conversation", "project", ["project_id"], ["id"], ) op.drop_table("job") + bind = op.get_bind() + if bind.dialect.name == "postgresql": + op.execute("DROP TYPE IF EXISTS jobstatus") + op.execute("DROP TYPE IF EXISTS jobtype")backend/app/tests/services/response/test_jobs.py (1)
30-37
: Optionally assert Celery trace propagation.You can assert the
trace_id
forwarded to Celery to ensure correlation is preserved (expected default is "N/A" without a request context).args, kwargs = mock_schedule.call_args assert kwargs["trace_id"] in ("N/A", None) # or the correlation id you set in the testbackend/app/tests/services/response/response/test_process_response.py (1)
33-41
: Avoid accidental network calls when creating an assistant.If
create_assistant
hits OpenAI, patch it or pass a stubbed client to keep tests hermetic. Confirm it doesn’t perform I/O; otherwise:with patch("app.crud.create_assistant") as mock_create: mock_create.return_value = Assistant(assistant_id="asst_123", model="gpt-4", name="Test Assistant") ...backend/app/crud/jobs.py (1)
25-39
: Minor: early-exit when no fields to update.If
job_update
is empty, skip commit/refresh to avoid a no-op write.update_data = job_update.model_dump(exclude_unset=True) - for field, value in update_data.items(): - setattr(job, field, value) + if not update_data: + return job + for field, value in update_data.items(): + setattr(job, field, value)backend/app/services/response/callbacks.py (2)
4-20
: Harden against leaking sensitive request fields in callbacks.Since
ResponsesAPIRequest
allows extras, blacklist alone is risky. Filter out common secret tokens.def get_additional_data(request: dict) -> dict: @@ - return {k: v for k, v in request.items() if k not in exclude_keys} + sensitive = {"api_key", "apikey", "secret", "password", "token", "credential", "cookie", "auth"} + return { + k: v + for k, v in request.items() + if k not in exclude_keys and not any(s in k.lower() for s in sensitive) + }
22-41
: Optionally propagate callback send result.Returning a bool can help callers decide on retries. Currently it’s fire-and-forget.
-def send_response_callback(... ) -> None: +def send_response_callback(... ) -> bool: @@ - send_callback( + return send_callback( callback_url, { "success": callback_response.get("success", False), "data": { **(callback_response.get("data") or {}), **get_additional_data(request_dict), }, "error": callback_response.get("error"), "metadata": None, }, )backend/app/services/response/jobs.py (2)
1-15
: Prune unused imports.Drop unused
HTTPException
,engine
,JobStatus
, andsend_callback
.-from fastapi import HTTPException -from sqlmodel import Session -from asgi_correlation_id import correlation_id -from app.core.db import engine +from sqlmodel import Session +from asgi_correlation_id import correlation_id @@ -from app.models import JobType, JobStatus, JobUpdate, ResponsesAPIRequest +from app.models import JobType, JobUpdate, ResponsesAPIRequest @@ -from app.api.routes.threads import send_callback
27-39
: Persist Celerytask_id
on the Job after scheduling.This helps correlate jobs with Celery tasks from the outset (before processing begins).
task_id = start_high_priority_job( function_path="app.services.response.jobs.execute_job", project_id=project_id, job_id=str(job.id), trace_id=trace_id, request_data=request.model_dump(), organization_id=organization_id, ) + # store Celery task id + job_crud.update(job.id, JobUpdate(task_id=task_id)) + logger.info( f"[start_job] Job scheduled to generate response | job_id={job.id}, project_id={project_id}, task_id={task_id}" )backend/app/services/response/response.py (3)
94-101
: Ensure Langfuse tags are strings.
Assistant.id
is an int; cast to str to satisfy Langfuse’slist[str]
tags.tracer.start_trace( name="generate_response_async", - input={"question": request.question, "assistant_id": assistant.id}, + input={"question": request.question, "assistant_id": str(assistant.id)}, metadata={"callback_url": request.callback_url}, - tags=[assistant.id], + tags=[str(assistant.id)], )
145-151
: Align log context with function name.The log tag says
process_response_task
insidegenerate_response
. Use a consistent label.- logger.error( - f"[process_response_task] OpenAI API error: {error_message}", + logger.error( + f"[generate_response] OpenAI API error: {error_message}", exc_info=True, )
200-207
: Silence unusedtask_instance
lint without breaking the signature.Keep the parameter (Celery passes it) but mark as intentionally unused.
def process_response( request: ResponsesAPIRequest, project_id: int, organization_id: int, job_id: UUID, task_id: str, task_instance, ) -> APIResponse: + # Celery passes `task_instance`; not used here. + del task_instancebackend/app/api/routes/responses.py (5)
29-36
: Return 202 for async job start.Signal asynchronous semantics explicitly.
-@router.post("/responses", response_model=APIResponse[ResponseJobStatus]) +@router.post("/responses", response_model=APIResponse[ResponseJobStatus], status_code=202)
41-46
: Add idempotency to avoid duplicate jobs on client retries.Consider deduping by a stable key (e.g.,
request.response_id
or a hash of the request) instart_job
/JobCrud.create
.
71-84
: Standardize error envelopes via APIResponse.failure_response.Keeps shape consistent and centralized.
- return JSONResponse( - status_code=e.status_code, - content={ - "success": False, - "data": additional_data if additional_data else None, - "error": str(e.detail), - "metadata": None, - }, - ) + return JSONResponse( + status_code=e.status_code, + content=APIResponse.failure_response( + error=str(e.detail), + data=additional_data if additional_data else None, + ).model_dump(), + )- return JSONResponse( - status_code=400, - content={ - "success": False, - "data": additional_data if additional_data else None, - "error": error_message, - "metadata": None, - }, - ) + status = getattr(e, "status_code", 400) + return JSONResponse( + status_code=status, + content=APIResponse.failure_response( + error=error_message, + data=additional_data if additional_data else None, + ).model_dump(), + )Also applies to: 183-191
65-65
: Clarify docstring (not just “benchmarking”).Small wording nit to reflect production use.
- """Synchronous endpoint for benchmarking OpenAI responses API with Langfuse tracing.""" + """Synchronous endpoint calling OpenAI Responses API with Langfuse tracing."""
106-122
: Guard against long hangs with client-level timeouts.Set reasonable connect/read timeouts when creating the OpenAI client (in
get_openai_client
) and optionally enable retries with backoff.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (17)
backend/app/alembic/versions/c6fb6d0b5897_create_job_table.py
(1 hunks)backend/app/api/routes/responses.py
(4 hunks)backend/app/crud/__init__.py
(2 hunks)backend/app/crud/jobs.py
(1 hunks)backend/app/models/__init__.py
(2 hunks)backend/app/models/job.py
(1 hunks)backend/app/models/response.py
(1 hunks)backend/app/services/response/callbacks.py
(1 hunks)backend/app/services/response/jobs.py
(1 hunks)backend/app/services/response/response.py
(1 hunks)backend/app/tests/api/routes/test_responses.py
(1 hunks)backend/app/tests/crud/test_jobs.py
(1 hunks)backend/app/tests/services/response/response/test_generate_response.py
(1 hunks)backend/app/tests/services/response/response/test_process_response.py
(1 hunks)backend/app/tests/services/response/test_jobs.py
(1 hunks)backend/app/tests/utils/openai.py
(2 hunks)backend/app/utils.py
(3 hunks)
🧰 Additional context used
🧬 Code graph analysis (14)
backend/app/utils.py (1)
backend/app/api/routes/collections.py (3)
success
(163-164)success
(172-174)success
(198-200)
backend/app/tests/services/response/response/test_generate_response.py (5)
backend/app/core/langfuse/langfuse.py (1)
LangfuseTracer
(12-109)backend/app/models/assistants.py (1)
Assistant
(33-44)backend/app/models/response.py (1)
ResponsesAPIRequest
(4-11)backend/app/services/response/response.py (1)
generate_response
(82-154)backend/app/tests/conftest.py (1)
db
(24-41)
backend/app/crud/__init__.py (2)
backend/app/crud/jobs.py (1)
JobCrud
(11-42)backend/app/crud/openai_conversation.py (1)
get_ancestor_id_from_response
(61-99)
backend/app/crud/jobs.py (1)
backend/app/models/job.py (3)
Job
(20-43)JobType
(16-17)JobUpdate
(46-49)
backend/app/models/__init__.py (2)
backend/app/models/job.py (4)
Job
(20-43)JobType
(16-17)JobStatus
(9-13)JobUpdate
(46-49)backend/app/models/response.py (6)
CallbackResponse
(47-55)Diagnostics
(35-39)FileResultChunk
(42-44)ResponsesAPIRequest
(4-11)ResponseJobStatus
(27-32)ResponsesSyncAPIRequest
(14-24)
backend/app/models/response.py (1)
backend/app/tests/api/routes/test_assistants.py (1)
assistant_id
(25-26)
backend/app/tests/services/response/response/test_process_response.py (8)
backend/app/services/response/response.py (1)
process_response
(200-282)backend/app/models/response.py (1)
ResponsesAPIRequest
(4-11)backend/app/models/job.py (3)
Job
(20-43)JobStatus
(9-13)JobType
(16-17)backend/app/tests/conftest.py (1)
db
(24-41)backend/app/utils.py (1)
APIResponse
(29-53)backend/app/tests/utils/test_data.py (1)
create_test_credential
(103-130)backend/app/tests/utils/openai.py (2)
mock_openai_response
(57-87)generate_openai_id
(17-22)backend/app/crud/jobs.py (1)
JobCrud
(11-42)
backend/app/tests/services/response/test_jobs.py (5)
backend/app/models/response.py (1)
ResponsesAPIRequest
(4-11)backend/app/models/job.py (2)
JobType
(16-17)JobStatus
(9-13)backend/app/crud/jobs.py (2)
JobCrud
(11-42)get
(41-42)backend/app/tests/utils/utils.py (1)
get_project
(70-89)backend/app/tests/conftest.py (1)
db
(24-41)
backend/app/services/response/jobs.py (7)
backend/app/crud/jobs.py (1)
JobCrud
(11-42)backend/app/models/job.py (3)
JobType
(16-17)JobStatus
(9-13)JobUpdate
(46-49)backend/app/models/response.py (1)
ResponsesAPIRequest
(4-11)backend/app/utils.py (1)
APIResponse
(29-53)backend/app/celery/utils.py (1)
start_high_priority_job
(18-43)backend/app/services/response/response.py (1)
process_response
(200-282)backend/app/services/response/callbacks.py (1)
send_response_callback
(22-41)
backend/app/services/response/callbacks.py (1)
backend/app/utils.py (2)
APIResponse
(29-53)send_callback
(225-237)
backend/app/tests/crud/test_jobs.py (3)
backend/app/crud/jobs.py (4)
JobCrud
(11-42)create
(15-23)get
(41-42)update
(25-39)backend/app/models/job.py (3)
JobUpdate
(46-49)JobStatus
(9-13)JobType
(16-17)backend/app/tests/conftest.py (1)
db
(24-41)
backend/app/tests/api/routes/test_responses.py (2)
backend/app/models/response.py (1)
ResponsesAPIRequest
(4-11)backend/app/tests/conftest.py (2)
client
(52-55)user_api_key_header
(77-79)
backend/app/services/response/response.py (6)
backend/app/core/langfuse/langfuse.py (6)
LangfuseTracer
(12-109)start_trace
(53-69)start_generation
(71-84)end_generation
(86-93)update_trace
(95-97)log_error
(99-106)backend/app/crud/jobs.py (1)
JobCrud
(11-42)backend/app/crud/credentials.py (1)
get_provider_credential
(102-130)backend/app/crud/openai_conversation.py (3)
create_conversation
(140-163)get_ancestor_id_from_response
(61-99)get_conversation_by_ancestor_id
(41-58)backend/app/models/response.py (4)
CallbackResponse
(47-55)Diagnostics
(35-39)FileResultChunk
(42-44)ResponsesAPIRequest
(4-11)backend/app/utils.py (5)
APIResponse
(29-53)get_openai_client
(175-205)mask_string
(163-172)failure_response
(42-53)success_response
(36-39)
backend/app/api/routes/responses.py (7)
backend/app/api/deps.py (2)
get_db
(33-35)get_current_user_org_project
(110-131)backend/app/core/langfuse/langfuse.py (1)
LangfuseTracer
(12-109)backend/app/models/response.py (5)
CallbackResponse
(47-55)Diagnostics
(35-39)ResponsesAPIRequest
(4-11)ResponseJobStatus
(27-32)ResponsesSyncAPIRequest
(14-24)backend/app/services/response/jobs.py (1)
start_job
(19-39)backend/app/services/response/response.py (1)
get_file_search_results
(41-50)backend/app/services/response/callbacks.py (1)
get_additional_data
(4-19)backend/app/utils.py (4)
APIResponse
(29-53)get_openai_client
(175-205)handle_openai_error
(208-222)success_response
(36-39)
🪛 Ruff (0.13.1)
backend/app/utils.py
7-7: typing.Dict
is deprecated, use dict
instead
(UP035)
220-220: Do not use bare except
(E722)
backend/app/tests/services/response/response/test_generate_response.py
27-27: Unused function argument: db
(ARG001)
29-29: Local variable mock_response
is assigned to but never used
Remove assignment to unused variable mock_response
(F841)
backend/app/services/response/response.py
206-206: Unused function argument: task_instance
(ARG001)
🪛 GitHub Actions: AI Platform CI
backend/app/tests/services/response/test_jobs.py
[error] 1-1: import file mismatch: imported module 'test_jobs' has this file attribute: /home/runner/work/ai-platform/ai-platform/backend/app/tests/crud/test_jobs.py which is not the same as the test file we want to collect: /home/runner/work/ai-platform/ai-platform/backend/app/tests/services/response/test_jobs.py. HINT: remove pycache / .pyc files and/or use a unique basename for your test file modules
🔇 Additional comments (10)
backend/app/tests/utils/openai.py (1)
57-87
: Mock shape looks sufficientThe SimpleNamespace response covers IDs, output, and usage fields used in tests. No blockers.
backend/app/models/__init__.py (1)
18-19
: Re-exports look consistentThe new public surface (Job*, Responses*, CallbackResponse, etc.) is cleanly exposed.
Also applies to: 97-105
backend/app/crud/__init__.py (1)
13-14
: CRUD exports LGTMExporting JobCrud and get_ancestor_id_from_response aligns with usage across services and tests.
Also applies to: 63-72
backend/app/tests/services/response/response/test_generate_response.py (1)
51-56
: Confirm exception class alignment with installedopenai
SDK.Some SDK versions prefer
openai.APIError
overOpenAIError
. If CI/lint starts failing on import, switch tofrom openai import APIError
and raise that instead.backend/app/alembic/versions/c6fb6d0b5897_create_job_table.py (1)
38-47
: Verify FK constraint names exist before dropping.The suffix “_fkey1” looks nonstandard and will fail if names differ. Please confirm actual names in your DB and adjust accordingly.
backend/app/tests/services/response/response/test_process_response.py (1)
59-89
: LGTM: success path is well-isolated.Good use of patching
generate_response
and DB assertions forJobStatus.SUCCESS
.backend/app/crud/jobs.py (1)
15-23
: LGTM: create flow is idiomatic and safe.Commit + refresh pattern is correct.
backend/app/services/response/response.py (1)
56-68
: Nice: callback payload includes diagnostics and file-search chunks.Solid structure for downstream consumers.
backend/app/api/routes/responses.py (2)
92-105
: Langfuse instrumentation looks solid.Trace + generation boundaries and metadata are hooked correctly.
If you want, I can add light tests asserting trace start/update calls via a Langfuse test double.
156-170
: Success envelope LGTM.Shape is consistent and diagnostics are complete.
Codecov Report❌ Patch coverage is 📢 Thoughts on this report? Let us know! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (2)
backend/app/tests/services/response/test_jobs_response.py (2)
30-37
: Also assert question is forwarded to the scheduler payload.This tightens the contract on request_data.
mock_schedule.assert_called_once() - _, kwargs = mock_schedule.call_args + _, kwargs = mock_schedule.call_args assert kwargs["function_path"] == "app.services.response.jobs.execute_job" assert kwargs["project_id"] == project.id assert kwargs["organization_id"] == project.organization_id assert kwargs["job_id"] == str(job_id) assert kwargs["request_data"]["assistant_id"] == "assistant_123" + assert kwargs["request_data"]["question"] == "What is the capital of France?"
16-17
: Prefer a dedicated project fixture for determinism.Selecting “any active project” can couple test behavior to seed state. Consider a fixture that creates/returns a known test project.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
backend/app/tests/services/response/test_jobs_response.py
(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
backend/app/tests/services/response/test_jobs_response.py (5)
backend/app/models/response.py (1)
ResponsesAPIRequest
(4-11)backend/app/models/job.py (2)
JobType
(16-17)JobStatus
(9-13)backend/app/crud/jobs.py (1)
JobCrud
(11-42)backend/app/tests/utils/utils.py (1)
get_project
(70-89)backend/app/tests/conftest.py (1)
db
(24-41)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: checks (3.11.7, 6)
🔇 Additional comments (2)
backend/app/tests/services/response/test_jobs_response.py (2)
10-37
: LGTM: solid happy‑path coverage for start_job.Covers job creation, initial status, and scheduler invocation with key args. Nice.
21-29
: Don't assert Celery task id in start_job test — start_job doesn't persist it.start_job only creates the Job and schedules the task (logs the task_id); Job.task_id is set later by the worker in process_response via JobCrud.update. See backend/app/services/response/jobs.py and backend/app/services/response/response.py.
Likely an incorrect or invalid review comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (3)
backend/pyproject.toml (3)
27-28
: Remove pytest and pre-commit from runtime dependencies.These are dev tools and should not ship in prod images; they already exist under dev-dependencies.
Apply this diff:
- "pytest>=7.4.4", - "pre-commit>=3.8.0",
61-63
: Align Ruff target-version with supported Python (set to py311).requires-python is >=3.11,<4.0 but Ruff targets py310; update to match.
-[tool.ruff] -target-version = "py310" +[tool.ruff] +target-version = "py311"
37-37
: Make Flower an optional extra (monitoring) — safe to remove from runtime depsrg found only release-notes references; no code imports/usages detected. Move Flower out of runtime deps or pin an upper bound.
Apply this diff to remove it from runtime:
- "flower>=2.0.1",
Then add this block to declare an extra:
[project.optional-dependencies] monitoring = [ "flower>=2.0.1,<2.1.0" ]Install with: pip install .[monitoring]
If you must keep it in runtime, add an upper bound:
- "flower>=2.0.1", + "flower>=2.0.1,<2.1.0",Ensure the Flower UI is not publicly exposed without auth and run it as a separate service in compose/k8s.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
backend/uv.lock
is excluded by!**/*.lock
📒 Files selected for processing (1)
backend/pyproject.toml
(1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: checks (3.11.7, 6)
@avirajsingh7 please resolve review comments before merging |
backend/app/tests/services/response/response/test_generate_response.py
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
🧹 Nitpick comments (2)
backend/app/utils.py (1)
7-7
: Update deprecated type hint.Use the built-in
dict
instead oftyping.Dict
which is deprecated in Python 3.9+.-from typing import Any, Dict, Generic, Optional, TypeVar +from typing import Any, Generic, Optional, TypeVarAnd update line 33:
- metadata: Optional[Dict[str, Any]] = None + metadata: Optional[dict[str, Any]] = Nonebackend/app/tests/services/response/response/test_generate_response.py (1)
27-27
: Remove unuseddb
parameter from test.The test function declares a
db
parameter but doesn't use it.-def test_generate_response_success(db: Session, assistant_mock: Assistant): +def test_generate_response_success(assistant_mock: Assistant):
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
backend/app/alembic/versions/c6fb6d0b5897_create_job_table.py
(1 hunks)backend/app/core/config.py
(1 hunks)backend/app/services/response/jobs.py
(1 hunks)backend/app/tests/services/response/response/test_generate_response.py
(1 hunks)backend/app/tests/services/response/test_jobs_response.py
(1 hunks)backend/app/utils.py
(3 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- backend/app/tests/services/response/test_jobs_response.py
- backend/app/alembic/versions/c6fb6d0b5897_create_job_table.py
🧰 Additional context used
🧬 Code graph analysis (3)
backend/app/services/response/jobs.py (7)
backend/app/crud/jobs.py (1)
JobCrud
(11-42)backend/app/models/job.py (3)
JobType
(16-17)JobStatus
(9-13)JobUpdate
(46-49)backend/app/models/response.py (1)
ResponsesAPIRequest
(4-11)backend/app/utils.py (1)
APIResponse
(29-53)backend/app/celery/utils.py (1)
start_high_priority_job
(18-43)backend/app/services/response/response.py (1)
process_response
(200-282)backend/app/services/response/callbacks.py (1)
send_response_callback
(22-41)
backend/app/utils.py (2)
backend/app/api/routes/collections.py (3)
success
(163-164)success
(172-174)success
(198-200)backend/app/api/routes/threads.py (2)
handle_openai_error
(52-56)send_callback
(37-49)
backend/app/tests/services/response/response/test_generate_response.py (5)
backend/app/core/langfuse/langfuse.py (1)
LangfuseTracer
(12-109)backend/app/models/assistants.py (1)
Assistant
(33-44)backend/app/models/response.py (1)
ResponsesAPIRequest
(4-11)backend/app/services/response/response.py (1)
generate_response
(82-154)backend/app/tests/conftest.py (1)
db
(24-41)
🪛 Ruff (0.13.1)
backend/app/utils.py
7-7: typing.Dict
is deprecated, use dict
instead
(UP035)
220-220: Do not use bare except
(E722)
backend/app/tests/services/response/response/test_generate_response.py
27-27: Unused function argument: db
(ARG001)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: checks (3.11.7, 6)
🔇 Additional comments (2)
backend/app/core/config.py (1)
120-120
: LGTM! Good configuration addition for callback timeouts.The
CALLBACK_TIMEOUT
tuple provides configurable timeout values for connect and read operations, which is essential for preventing hanging connections during callback operations. This aligns well with the Celery-backed processing changes.backend/app/utils.py (1)
225-240
: Good implementation of timeout configuration!The
send_callback
function properly uses the configurableCALLBACK_TIMEOUT
from settings and implements a context manager for session cleanup, addressing the previous review comments effectively.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (3)
.env.example (1)
76-80
: Fix dotenv formatting (lint failures): remove extra blank line and spaces around "=".Align with dotenv-linter: no spaces around "=" and avoid extra blank lines.
Apply:
- - # Callback Timeouts (in seconds) -CALLBACK_CONNECT_TIMEOUT = 3 -CALLBACK_READ_TIMEOUT = 10 +CALLBACK_CONNECT_TIMEOUT=3 +CALLBACK_READ_TIMEOUT=10.env.test.example (1)
32-34
: Fix dotenv formatting (lint failures): remove spaces around "=".Comply with dotenv-linter.
# Callback Timeouts (in seconds) -CALLBACK_CONNECT_TIMEOUT = 3 -CALLBACK_READ_TIMEOUT = 10 +CALLBACK_CONNECT_TIMEOUT=3 +CALLBACK_READ_TIMEOUT=10backend/app/utils.py (1)
43-54
: Correct generic return type for failure_response.It can return data on failures now; reflect this in the type.
- ) -> "APIResponse[None]": + ) -> "APIResponse[T | None]":
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
.env.example
(1 hunks).env.test.example
(1 hunks)backend/app/core/config.py
(1 hunks)backend/app/utils.py
(3 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
backend/app/utils.py (1)
backend/app/api/routes/threads.py (2)
handle_openai_error
(52-56)send_callback
(37-49)
🪛 dotenv-linter (3.3.0)
.env.test.example
[warning] 33-33: [SpaceCharacter] The line has spaces around equal sign
(SpaceCharacter)
[warning] 34-34: [SpaceCharacter] The line has spaces around equal sign
(SpaceCharacter)
.env.example
[warning] 77-77: [ExtraBlankLine] Extra blank line detected
(ExtraBlankLine)
[warning] 79-79: [SpaceCharacter] The line has spaces around equal sign
(SpaceCharacter)
[warning] 80-80: [SpaceCharacter] The line has spaces around equal sign
(SpaceCharacter)
🪛 Ruff (0.13.1)
backend/app/utils.py
7-7: typing.Dict
is deprecated, use dict
instead
(UP035)
220-220: Do not use bare except
(E722)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: checks (3.11.7, 6)
🔇 Additional comments (2)
backend/app/utils.py (1)
208-223
: Replace bare except with explicit exceptions (ruff E722).Avoid masking errors; log parse failures.
elif hasattr(e, "response") and hasattr(e.response, "json"): try: error_data = e.response.json() if isinstance(error_data, dict) and "error" in error_data: error_info = error_data["error"] if isinstance(error_info, dict) and "message" in error_info: return error_info["message"] - except: - pass + except (ValueError, KeyError, AttributeError, TypeError) as parse_err: + logger.debug( + "[handle_openai_error] Failed to parse error payload", + exc_info=True, + ) return str(e)backend/app/core/config.py (1)
121-124
: Back-compat for CALLBACK_TIMEOUT — add shim or verify no remaining references.rg returned no matches for CALLBACK_TIMEOUT (no output); absence of matches isn't definitive — either add the compatibility property below or confirm there are no callers.
# callback timeouts CALLBACK_CONNECT_TIMEOUT: int = 3 CALLBACK_READ_TIMEOUT: int = 10 + # Backward-compat shim for legacy tuple usage + @computed_field # type: ignore[prop-decorator] + @property + def CALLBACK_TIMEOUT(self) -> tuple[int, int]: + return (self.CALLBACK_CONNECT_TIMEOUT, self.CALLBACK_READ_TIMEOUT)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Nitpick comments (1)
backend/app/services/response/response.py (1)
202-208
: Drop or underscore the unusedtask_instance
argument.Ruff is already flagging this as unused (ARG001). Either remove the parameter or rename it to
_task_instance
so the task signature stays compatible without tripping lint.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
backend/app/models/response.py
(1 hunks)backend/app/services/response/response.py
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- backend/app/models/response.py
🧰 Additional context used
🧬 Code graph analysis (1)
backend/app/services/response/response.py (8)
backend/app/core/langfuse/langfuse.py (6)
LangfuseTracer
(12-109)start_trace
(53-69)start_generation
(71-84)end_generation
(86-93)update_trace
(95-97)log_error
(99-106)backend/app/crud/jobs.py (1)
JobCrud
(11-42)backend/app/crud/assistants.py (1)
get_assistant_by_id
(19-30)backend/app/crud/credentials.py (1)
get_provider_credential
(102-130)backend/app/crud/openai_conversation.py (3)
create_conversation
(140-163)get_ancestor_id_from_response
(61-99)get_conversation_by_ancestor_id
(41-58)backend/app/models/response.py (4)
CallbackResponse
(47-54)Diagnostics
(35-39)FileResultChunk
(42-44)ResponsesAPIRequest
(4-11)backend/app/models/openai_conversation.py (2)
OpenAIConversationCreate
(72-98)OpenAIConversation
(58-69)backend/app/utils.py (5)
APIResponse
(29-53)get_openai_client
(175-205)mask_string
(163-172)failure_response
(42-53)success_response
(36-39)
🪛 Ruff (0.13.1)
backend/app/services/response/response.py
207-207: Unused function argument: task_instance
(ARG001)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: checks (3.11.7, 6)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (1)
backend/app/tests/crud/test_jobs.py (1)
48-52
: Capture the pre-update timestamp before asserting onupdated_at
Because
job
andupdated_job
reference the same identity within the session, Line 51 currently compares the post-update value to itself and would pass even ifJobCrud.update
stopped bumpingupdated_at
. Stash the original timestamp before callingupdate(...)
so the assertion actually verifies thatupdated_at
advanced.You can tighten the check like this:
- update_data = JobUpdate(status=JobStatus.FAILED, error_message="Error occurred") - updated_job = crud.update(job.id, update_data) + previous_updated_at = job.updated_at + update_data = JobUpdate(status=JobStatus.FAILED, error_message="Error occurred") + updated_job = crud.update(job.id, update_data) @@ - assert updated_job.updated_at >= job.updated_at + assert previous_updated_at is not None + assert updated_job.updated_at > previous_updated_at
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
backend/app/tests/crud/test_jobs.py
(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
backend/app/tests/crud/test_jobs.py (3)
backend/app/crud/jobs.py (4)
JobCrud
(11-42)create
(15-23)get
(41-42)update
(25-39)backend/app/models/job.py (3)
JobUpdate
(46-49)JobStatus
(9-13)JobType
(16-17)backend/app/tests/conftest.py (1)
db
(24-41)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: checks (3.11.7, 6)
… request metadata
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
backend/app/services/response/callbacks.py
(1 hunks)backend/app/tests/crud/test_jobs.py
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- backend/app/tests/crud/test_jobs.py
🧰 Additional context used
🧬 Code graph analysis (1)
backend/app/services/response/callbacks.py (2)
backend/app/models/response.py (2)
ResponsesAPIRequest
(4-11)ResponsesSyncAPIRequest
(14-24)backend/app/utils.py (2)
APIResponse
(29-53)send_callback
(225-244)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: checks (3.11.7, 6)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Issues attributed to commits in this pull requestThis pull request was merged and Sentry observed the following issues:
|
Summary
This PR refactors the existing response API to use Celery for background processing of responses.
We introduce a Job model to track the status of a response task (PENDING → PROCESSING → SUCCESS/FAILED), schedule tasks via Celery, and send callbacks once processing completes.
What Changed
Checklist
Before submitting a pull request, please ensure that you mark these task.
fastapi run --reload app/main.py
ordocker compose up
in the repository root and test.Summary by CodeRabbit
New Features
Refactor
Chores
Tests