Refactor background response processing to move initialization logic from endpoint to background task #379
Conversation
Walkthrough

Refactors async response handling to accept serialized request data with project and organization IDs, reconstruct required resources (DB session, assistant, OpenAI client, Langfuse tracer), manage conversation ancestry, invoke the OpenAI responses API, persist conversation records, and send callbacks. The endpoint schedules the updated background task with the new signature; a sketch of the resulting flow follows the diagram below.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    participant API as Responses Endpoint
    participant Worker as process_response (Background)
    participant DB as Database
    participant LF as Langfuse
    participant OA as OpenAI Responses API
    participant CB as Callback URL
    API->>Worker: Enqueue process_response(request.model_dump(), project_id, organization_id)
    Worker->>DB: Open Session
    Worker->>DB: Fetch Assistant by assistant_id/project_id/org_id
    Worker->>LF: Initialize LangfuseTracer(response_id)
    LF-->>Worker: Trace started (tags: assistant_id)
    Worker->>DB: Lookup latest conversation by ancestor_id
    DB-->>Worker: Latest conversation (optional)
    Worker->>OA: responses.create(model, input, instructions, temperature, tools?, previous_response_id?)
    activate OA
    OA-->>Worker: response + chunks + usage
    deactivate OA
    Worker->>DB: Persist OpenAIConversation (assistant_id, ancestor_response_id)
    Worker->>LF: End generation with output, usage
    LF-->>Worker: Acknowledged
    alt callback_url provided
        Worker->>CB: POST callback payload (diagnostics, tokens, output)
        CB-->>Worker: 2xx/Non-2xx
    end
    Worker-->>API: Background job completes (no direct response)
```
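To make the new contract concrete, here is a minimal sketch of the refactored task. The helper names (`get_openai_client`, `get_assistant_by_id`, `get_provider_credential`, `get_conversation_by_ancestor_id`, `send_callback`) are taken from this review, but the signatures and control flow are assumptions inferred from the diagram, not the PR's verbatim code:

```python
# Sketch only: helper signatures and the request model are assumptions.
from sqlmodel import Session

def process_response(request_data: dict, project_id: int, organization_id: int) -> None:
    request = ResponsesAPIRequest(**request_data)  # re-hydrate the serialized request
    with Session(engine) as session:
        # Rebuild everything the endpoint used to prepare up front.
        assistant = get_assistant_by_id(session, request.assistant_id, project_id)
        client = get_openai_client(session, organization_id, project_id)
        tracer = LangfuseTracer(
            credentials=get_provider_credential(
                session=session,
                org_id=organization_id,
                provider="langfuse",
                project_id=project_id,
            ),
            response_id=request.response_id,
        )
        # Resolve conversation ancestry, call the Responses API, persist, notify.
        previous = get_conversation_by_ancestor_id(session, request.response_id)
        response = client.responses.create(
            model=assistant.model,
            input=request.question,
            previous_response_id=previous.response_id if previous else None,
        )
        tracer.flush()
        if request.callback_url:
            send_callback(request.callback_url, {"success": True, "data": response.id})
```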
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
Pre-merge checks: ✅ 3 passed
Tip

👮 Agentic pre-merge checks are now available in preview! Pro plan users can now enable pre-merge checks in their settings to enforce checklists before merging PRs. Please see the documentation for more information. Example:

```yaml
reviews:
  pre_merge_checks:
    custom_checks:
      - name: "Undocumented Breaking Changes"
        mode: "warning"
        instructions: |
          Pass/fail criteria: All breaking changes to public APIs, CLI flags, environment variables, configuration keys, database schemas, or HTTP/GraphQL endpoints must be documented in the "Breaking Change" section of the PR description and in CHANGELOG.md. Exclude purely internal or private changes (e.g., code not exported from package entry points or explicitly marked as internal).
```

Please share your feedback with us on this Discord post.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
backend/app/api/routes/responses.py (2)
283-294: Broaden exception handling to avoid silent task crashes and guarantee callbacks.

Only `openai.OpenAIError` is caught. Add `HTTPException` and a final `Exception` catch to ensure we always log, flush, and callback.

```diff
     except openai.OpenAIError as e:
         error_message = handle_openai_error(e)
         logger.error(
             f"[process_response] OpenAI API error during response processing: {error_message}, project_id={project_id}",
             exc_info=True,
         )
         tracer.log_error(error_message, response_id=request.response_id)
-
-        request_dict = request.model_dump()
-        callback_response = ResponsesAPIResponse.failure_response(error=error_message)
+        request_dict = request.model_dump()
+        callback_response = ResponsesAPIResponse.failure_response(error=error_message)
+    except HTTPException as he:
+        error_message = getattr(he, "detail", str(he))
+        logger.error(
+            f"[process_response] HTTP error during response processing: {error_message}, project_id={project_id}",
+            exc_info=True,
+        )
+        tracer.log_error(error_message, response_id=request.response_id)
+        request_dict = request.model_dump()
+        callback_response = ResponsesAPIResponse.failure_response(error=error_message)
+    except Exception as e:
+        error_message = str(e)
+        logger.error(
+            f"[process_response] Unexpected error during response processing: {error_message}, project_id={project_id}",
+            exc_info=True,
+        )
+        tracer.log_error(error_message, response_id=request.response_id)
+        request_dict = request.model_dump()
+        callback_response = ResponsesAPIResponse.failure_response(error=error_message)
```
94-101: Critical bug: extending with the wrong iterator in file search results.

You iterate over `results` (the accumulator) instead of `tool_call.results`, so chunks are always empty or duplicated.

```diff
 def get_file_search_results(response):
     results: list[FileResultChunk] = []
-    for tool_call in response.output:
-        if tool_call.type == "file_search_call":
-            results.extend(
-                [FileResultChunk(score=hit.score, text=hit.text) for hit in results]
-            )
+    for item in response.output:
+        if getattr(item, "type", None) == "file_search_call" and getattr(item, "results", None):
+            results.extend(
+                FileResultChunk(score=hit.score, text=hit.text) for hit in item.results
+            )
     return results
```
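To see why the original loop fails both ways, here is a self-contained illustration (plain Python, not project code): the comprehension iterates the accumulator itself, so it yields nothing while the accumulator is empty and duplicates existing entries otherwise.

```python
results = []
# First file_search_call: the accumulator is empty, so the comprehension
# yields nothing and no chunks are ever collected.
results.extend([x for x in results])
assert results == []

# Had the accumulator already held entries, the same line would duplicate
# them instead of appending the tool call's hits.
results = ["chunk-1"]
results.extend([x for x in results])
assert results == ["chunk-1", "chunk-1"]
```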
🧹 Nitpick comments (5)
backend/app/api/routes/responses.py (5)
175-181: Include response_id in trace tags to preserve session lineage.
`LangfuseTracer.__init__` searches by `tags=response_id` to reuse sessions. Tag the trace with both assistant_id and response_id when available.

```diff
     tracer.start_trace(
         name="generate_response_async",
         input={"question": request.question, "assistant_id": assistant_id},
         metadata={"callback_url": request.callback_url},
-        tags=[assistant_id],
+        tags=[t for t in [assistant_id, request.response_id] if t],
     )
```
296-300: Avoid logging full callback URLs.

Callback URLs can contain secrets. Log only the host.

```diff
+from urllib.parse import urlsplit
@@
-    logger.info(
-        f"[process_response] Sending callback to URL: {request.callback_url}, assistant={mask_string(assistant_id)}, project_id={project_id}"
-    )
+    _host = urlsplit(request.callback_url).netloc if request.callback_url else "N/A"
+    logger.info(
+        f"[process_response] Sending callback to host: {_host}, assistant={mask_string(assistant_id)}, project_id={project_id}"
+    )
@@
-    logger.info(
-        f"[process_response] Callback sent successfully, assistant={mask_string(assistant_id)}, project_id={project_id}"
-    )
+    logger.info(
+        f"[process_response] Callback dispatched, assistant={mask_string(assistant_id)}, project_id={project_id}, host={_host}"
+    )
```

Also applies to: 315-317, 1-13
418-418: Unify OpenAI client construction.

For consistency with the async path (centralized credential handling, uniform logging), prefer `get_openai_client(_session, organization_id, project_id)` here.

```diff
-    client = OpenAI(api_key=credentials["api_key"])
+    client = get_openai_client(_session, organization_id, project_id)
```
411-416: Align error response type with declared response_model.

This path returns `APIResponse` while the route declares `ResponsesAPIResponse`. Use the same response wrapper for consistency.

```diff
-    return APIResponse(
+    return ResponsesAPIResponse(
         success=False,
         data=additional_data if additional_data else None,
         error="OpenAI API key not configured for this organization.",
         metadata=None,
     )
```
302-314: Optional: SSRF hardening for callbacks.

Consider validating `callback_url` against an allowlist and blocking private/internal IP ranges before calling `send_callback`.
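A minimal sketch of such a guard, assuming a hypothetical helper name `is_safe_callback_url` (this is not code from the PR, just an illustration of the check being suggested):

```python
import ipaddress
import socket
from urllib.parse import urlsplit

def is_safe_callback_url(url: str) -> bool:
    """Reject callback URLs that resolve to private or internal addresses."""
    parts = urlsplit(url)
    if parts.scheme not in ("http", "https") or not parts.hostname:
        return False
    try:
        # Resolve the hostname and check every address it maps to.
        infos = socket.getaddrinfo(parts.hostname, parts.port or 443)
    except socket.gaierror:
        return False
    for info in infos:
        try:
            addr = ipaddress.ip_address(info[4][0])
        except ValueError:
            return False
        if addr.is_private or addr.is_loopback or addr.is_link_local or addr.is_reserved:
            return False
    return True
```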
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
backend/app/api/routes/responses.py (7 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
backend/app/api/routes/responses.py (5)
backend/app/utils.py (1)
  get_openai_client (170-200)
backend/app/crud/assistants.py (1)
  get_assistant_by_id (19-30)
backend/app/crud/credentials.py (1)
  get_provider_credential (102-130)
backend/app/core/langfuse/langfuse.py (2)
  LangfuseTracer (12-109)
  start_trace (53-69)
backend/app/crud/openai_conversation.py (1)
  get_conversation_by_ancestor_id (41-58)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: checks (3.11.7, 6)
🔇 Additional comments (3)
backend/app/api/routes/responses.py (3)
128-137: Good refactor: background task now self-sufficient.

Moving client/tracer/DB setup into `process_response` reduces endpoint coupling and makes retries easier.
362-365: Endpoint scheduling change looks correct.

The endpoint enqueues the background task with the serialized request and org/project IDs, matching the new signature; a sketch of this pattern follows.
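For context, the scheduling pattern under review looks roughly like this. The route name, request model, and tenant-ID parameters are stand-ins, not the PR's actual code; only the `add_task(process_response, request.model_dump(), project_id, organization_id)` shape comes from the diff:

```python
from fastapi import APIRouter, BackgroundTasks

router = APIRouter()

@router.post("/responses")
async def create_response(
    request: ResponsesAPIRequest,        # hypothetical request model name
    background_tasks: BackgroundTasks,
    project_id: int = 0,                 # stand-in: resolved from auth in the real route
    organization_id: int = 0,            # stand-in: resolved from auth in the real route
):
    # Hand the serialized request plus tenant IDs to the background task;
    # the task re-creates its own session, client, and tracer.
    background_tasks.add_task(
        process_response,
        request.model_dump(),
        project_id,
        organization_id,
    )
    return ResponsesAPIResponse(success=True, data=None, error=None, metadata=None)
```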
258-259: assistant_id is the OpenAI assistant ID (string) — confirmed.

`OpenAIConversationCreate.assistant_id` is typed/described as a string (backend/app/models/openai_conversation.py). `Assistant.assistant_id` is stored as a VARCHAR/string (backend/app/models/assistants.py), and CRUD looks up by that string (backend/app/crud/assistants.py, backend/app/crud/openai_conversation.py); Alembic migrations show assistant_id as VARCHAR with no FK to the Assistant PK, and tests/routes pass OpenAI-style IDs.
```python
with Session(engine) as session:
    assistant = get_assistant_by_id(session, assistant_id, project_id)
    if not assistant:
        logger.error(
            f"[process_response] Assistant not found: assistant_id={mask_string(assistant_id)}, project_id={project_id}"
        )
        return

    client = get_openai_client(session, organization_id, project_id)

    langfuse_credentials = get_provider_credential(
        session=session,
        org_id=organization_id,
        provider="langfuse",
        project_id=project_id,
    )
    tracer = LangfuseTracer(
        credentials=langfuse_credentials,
        response_id=request.response_id,
    )
```
🛠️ Refactor suggestion
Early failures skip callback and tracer; also `get_openai_client` exceptions are unhandled.
If the assistant is missing or `get_openai_client` raises `HTTPException`, the task exits without sending a failure callback or flushing the tracer. Initialize the tracer before early returns and catch `HTTPException` around client creation; send a failure callback in both cases.
Apply this diff to reorder tracer init, add error callbacks, and handle client init errors:
```diff
     with Session(engine) as session:
-        assistant = get_assistant_by_id(session, assistant_id, project_id)
-        if not assistant:
-            logger.error(
-                f"[process_response] Assistant not found: assistant_id={mask_string(assistant_id)}, project_id={project_id}"
-            )
-            return
-
-        client = get_openai_client(session, organization_id, project_id)
-
-        langfuse_credentials = get_provider_credential(
-            session=session,
-            org_id=organization_id,
-            provider="langfuse",
-            project_id=project_id,
-        )
-        tracer = LangfuseTracer(
-            credentials=langfuse_credentials,
-            response_id=request.response_id,
-        )
+        # Init tracer early so we can record/flush even on early failures
+        langfuse_credentials = get_provider_credential(
+            session=session,
+            org_id=organization_id,
+            provider="langfuse",
+            project_id=project_id,
+        )
+        tracer = LangfuseTracer(credentials=langfuse_credentials, response_id=request.response_id)
+
+        assistant = get_assistant_by_id(session, assistant_id, project_id)
+        if not assistant:
+            error_message = "Assistant not found or not active"
+            logger.error(
+                f"[process_response] {error_message}: assistant_id={mask_string(assistant_id)}, project_id={project_id}"
+            )
+            tracer.log_error(error_message, response_id=request.response_id)
+            if request.callback_url:
+                request_dict = request.model_dump()
+                send_callback(
+                    request.callback_url,
+                    {
+                        "success": False,
+                        "data": get_additional_data(request_dict) or None,
+                        "error": error_message,
+                        "metadata": None,
+                    },
+                )
+            tracer.flush()
+            return
+
+        try:
+            client = get_openai_client(session, organization_id, project_id)
+        except HTTPException as he:
+            error_message = getattr(he, "detail", str(he))
+            logger.error(
+                f"[process_response] Failed to configure OpenAI client: {error_message} | project_id={project_id}",
+                exc_info=True,
+            )
+            tracer.log_error(error_message, response_id=request.response_id)
+            if request.callback_url:
+                request_dict = request.model_dump()
+                send_callback(
+                    request.callback_url,
+                    {
+                        "success": False,
+                        "data": get_additional_data(request_dict) or None,
+                        "error": error_message,
+                        "metadata": None,
+                    },
+                )
+            tracer.flush()
+            return
```

Also applies to: 145-149
🤖 Prompt for AI Agents
backend/app/api/routes/responses.py around lines 142-161: the tracer is created
after possible early returns and get_openai_client can raise HTTPException, so
move LangfuseTracer initialization to before any early-return checks; wrap
get_openai_client call in a try/except catching HTTPException, and in both the
assistant-not-found branch and the client-init exception handler invoke the
failure callback (using the same response_id/context available) and flush/close
the tracer before returning so the failure is recorded and sent.
Converted to draft; not tested yet.
Summary
This PR refactors the process_response background task function to be more self-contained by moving initialization logic (database operations, client creation, and tracer setup) from the main endpoint to within the background task itself.
Explain the motivation for making this change. What existing problem does the pull request solve?
Checklist
Before submitting a pull request, please ensure that you mark these tasks.
Run `fastapi run --reload app/main.py` or `docker compose up` in the repository root and test.
Notes
Please add here if any other information is required for the reviewer.
Summary by CodeRabbit
New Features
Bug Fixes
Refactor