91 changes: 47 additions & 44 deletions backend/app/api/routes/responses.py
@@ -18,7 +18,7 @@
get_conversation_by_ancestor_id,
)
from app.models import UserProjectOrg, OpenAIConversationCreate, OpenAIConversation
from app.utils import APIResponse, mask_string
from app.utils import APIResponse, mask_string, get_openai_client
from app.core.langfuse.langfuse import LangfuseTracer

logger = logging.getLogger(__name__)
@@ -126,25 +126,57 @@ def get_additional_data(request: dict) -> dict:


def process_response(
request: ResponsesAPIRequest,
client: OpenAI,
assistant,
tracer: LangfuseTracer,
request_data: dict,
project_id: int,
organization_id: int,
ancestor_id: str,
latest_conversation: OpenAIConversation | None,
):
"""Process a response and send callback with results, with Langfuse tracing."""
# Reconstruct request object from serialized data
request = ResponsesAPIRequest(**request_data)
assistant_id = request.assistant_id

logger.info(
f"[process_response] Starting generating response for assistant_id={mask_string(request.assistant_id)}, project_id={project_id}"
f"[process_response] Starting generating response for assistant_id={mask_string(assistant_id)}, project_id={project_id}"
)

with Session(engine) as session:
assistant = get_assistant_by_id(session, assistant_id, project_id)
if not assistant:
logger.error(
f"[process_response] Assistant not found: assistant_id={mask_string(assistant_id)}, project_id={project_id}"
)
return

client = get_openai_client(session, organization_id, project_id)

langfuse_credentials = get_provider_credential(
session=session,
org_id=organization_id,
provider="langfuse",
project_id=project_id,
)
tracer = LangfuseTracer(
credentials=langfuse_credentials,
response_id=request.response_id,
)
Comment on lines +142 to +161
🛠️ Refactor suggestion

Early failures skip callback and tracer; also get_openai_client exceptions are unhandled.

If the assistant is missing or get_openai_client raises HTTPException, the task exits without sending a failure callback or flushing the tracer. Initialize the tracer before early returns and catch HTTPException around client creation; send a failure callback in both cases.

Apply this diff to reorder tracer init, add error callbacks, and handle client init errors:

 with Session(engine) as session:
-        assistant = get_assistant_by_id(session, assistant_id, project_id)
-        if not assistant:
-            logger.error(
-                f"[process_response] Assistant not found: assistant_id={mask_string(assistant_id)}, project_id={project_id}"
-            )
-            return
-
-        client = get_openai_client(session, organization_id, project_id)
-
-        langfuse_credentials = get_provider_credential(
-            session=session,
-            org_id=organization_id,
-            provider="langfuse",
-            project_id=project_id,
-        )
-        tracer = LangfuseTracer(
-            credentials=langfuse_credentials,
-            response_id=request.response_id,
-        )
+        # Init tracer early so we can record/flush even on early failures
+        langfuse_credentials = get_provider_credential(
+            session=session,
+            org_id=organization_id,
+            provider="langfuse",
+            project_id=project_id,
+        )
+        tracer = LangfuseTracer(credentials=langfuse_credentials, response_id=request.response_id)
+
+        assistant = get_assistant_by_id(session, assistant_id, project_id)
+        if not assistant:
+            error_message = "Assistant not found or not active"
+            logger.error(
+                f"[process_response] {error_message}: assistant_id={mask_string(assistant_id)}, project_id={project_id}"
+            )
+            tracer.log_error(error_message, response_id=request.response_id)
+            if request.callback_url:
+                request_dict = request.model_dump()
+                send_callback(
+                    request.callback_url,
+                    {
+                        "success": False,
+                        "data": get_additional_data(request_dict) or None,
+                        "error": error_message,
+                        "metadata": None,
+                    },
+                )
+            tracer.flush()
+            return
+
+        try:
+            client = get_openai_client(session, organization_id, project_id)
+        except HTTPException as he:
+            error_message = getattr(he, "detail", str(he))
+            logger.error(
+                f"[process_response] Failed to configure OpenAI client: {error_message} | project_id={project_id}",
+                exc_info=True,
+            )
+            tracer.log_error(error_message, response_id=request.response_id)
+            if request.callback_url:
+                request_dict = request.model_dump()
+                send_callback(
+                    request.callback_url,
+                    {
+                        "success": False,
+                        "data": get_additional_data(request_dict) or None,
+                        "error": error_message,
+                        "metadata": None,
+                    },
+                )
+            tracer.flush()
+            return

Also applies to: 145-149

🤖 Prompt for AI Agents
backend/app/api/routes/responses.py around lines 142-161: the tracer is created
after possible early returns, and get_openai_client can raise HTTPException.
Move the LangfuseTracer initialization before any early-return checks; wrap the
get_openai_client call in a try/except catching HTTPException; and in both the
assistant-not-found branch and the client-init exception handler, send the
failure callback (using the same response_id/context available) and flush the
tracer before returning, so the failure is recorded and sent.
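
As a design note on the suggested diff: the two failure branches duplicate the
callback payload. A minimal sketch of a helper that could factor this out
(hypothetical name fail_and_flush; send_callback, get_additional_data, and the
tracer methods used here are as they appear in the diff above):

# Hypothetical helper, not part of the PR: factors out the failure path
# repeated in both early-return branches of the suggested diff. Assumes
# send_callback, get_additional_data, ResponsesAPIRequest, and LangfuseTracer
# behave as they do elsewhere in responses.py.
def fail_and_flush(
    tracer: LangfuseTracer,
    request: ResponsesAPIRequest,
    error_message: str,
) -> None:
    """Log the error to the tracer, notify the callback URL, and flush."""
    tracer.log_error(error_message, response_id=request.response_id)
    if request.callback_url:
        request_dict = request.model_dump()
        send_callback(
            request.callback_url,
            {
                "success": False,
                "data": get_additional_data(request_dict) or None,
                "error": error_message,
                "metadata": None,
            },
        )
    tracer.flush()

Both early-return branches would then reduce to
fail_and_flush(tracer, request, error_message) followed by return.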


# Handle ancestor_id and latest conversation logic
ancestor_id = request.response_id
latest_conversation = None
if ancestor_id:
latest_conversation = get_conversation_by_ancestor_id(
session=session,
ancestor_response_id=ancestor_id,
project_id=project_id,
)
if latest_conversation:
ancestor_id = latest_conversation.response_id

tracer.start_trace(
name="generate_response_async",
input={"question": request.question, "assistant_id": request.assistant_id},
input={"question": request.question, "assistant_id": assistant_id},
metadata={"callback_url": request.callback_url},
tags=[request.assistant_id],
tags=[assistant_id],
)

tracer.start_generation(
@@ -177,7 +209,7 @@ def process_response(
response_chunks = get_file_search_results(response)

logger.info(
f"[process_response] Successfully generated response: response_id={response.id}, assistant={mask_string(request.assistant_id)}, project_id={project_id}"
f"[process_response] Successfully generated response: response_id={response.id}, assistant={mask_string(assistant_id)}, project_id={project_id}"
)

tracer.end_generation(
@@ -223,7 +255,7 @@ def process_response(
user_question=request.question,
response=response.output_text,
model=response.model,
assistant_id=request.assistant_id,
assistant_id=assistant_id,
)

create_conversation(
@@ -263,7 +295,7 @@ def process_response(

if request.callback_url:
logger.info(
f"[process_response] Sending callback to URL: {request.callback_url}, assistant={mask_string(request.assistant_id)}, project_id={project_id}"
f"[process_response] Sending callback to URL: {request.callback_url}, assistant={mask_string(assistant_id)}, project_id={project_id}"
)

# Send callback with webhook-specific response format
@@ -281,7 +313,7 @@
},
)
logger.info(
f"[process_response] Callback sent successfully, assistant={mask_string(request.assistant_id)}, project_id={project_id}"
f"[process_response] Callback sent successfully, assistant={mask_string(assistant_id)}, project_id={project_id}"
)


@@ -325,40 +357,11 @@ async def responses(
"metadata": None,
}

client = OpenAI(api_key=credentials["api_key"])

langfuse_credentials = get_provider_credential(
session=_session,
org_id=organization_id,
provider="langfuse",
project_id=project_id,
)
tracer = LangfuseTracer(
credentials=langfuse_credentials,
response_id=request.response_id,
)

ancestor_id = request.response_id
latest_conversation = None
if ancestor_id:
latest_conversation = get_conversation_by_ancestor_id(
session=_session,
ancestor_response_id=ancestor_id,
project_id=project_id,
)
if latest_conversation:
ancestor_id = latest_conversation.response_id

background_tasks.add_task(
process_response,
request,
client,
assistant,
tracer,
request.model_dump(),
project_id,
organization_id,
ancestor_id,
latest_conversation,
)

logger.info(