Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 38 additions & 32 deletions backend/app/api/routes/responses.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
import logging
import uuid
from typing import Any, Dict, Optional
from typing import Optional

import openai
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException
from openai import OpenAI
from pydantic import BaseModel, Extra
from sqlmodel import Session

from app.api.deps import get_current_user_org, get_db
from app.api.deps import get_db, get_current_user_org_project
from app.api.routes.threads import send_callback
from app.crud.assistants import get_assistant_by_id
from app.crud.credentials import get_provider_credential
from app.models import UserOrganization
from app.models import UserProjectOrg
from app.utils import APIResponse, mask_string
from app.core.langfuse.langfuse import LangfuseTracer

Expand All @@ -28,7 +27,6 @@


class ResponsesAPIRequest(BaseModel):
project_id: int
assistant_id: str
question: str
callback_url: Optional[str] = None
Expand All @@ -39,7 +37,6 @@


class ResponsesSyncAPIRequest(BaseModel):
project_id: int
model: str
instructions: str
vector_store_ids: list[str]
Expand Down Expand Up @@ -91,8 +88,7 @@
return {
k: v
for k, v in request.items()
if k
not in {"project_id", "assistant_id", "callback_url", "response_id", "question"}
if k not in {"assistant_id", "callback_url", "response_id", "question"}
}


Expand All @@ -101,10 +97,11 @@
client: OpenAI,
assistant,
tracer: LangfuseTracer,
project_id: int,
):
"""Process a response and send callback with results, with Langfuse tracing."""
logger.info(
f"Starting generating response for assistant_id={mask_string(request.assistant_id)}, project_id={request.project_id}"
f"Starting generating response for assistant_id={mask_string(request.assistant_id)}, project_id={project_id}"
)

tracer.start_trace(
Expand Down Expand Up @@ -143,7 +140,7 @@
response_chunks = get_file_search_results(response)

logger.info(
f"Successfully generated response: response_id={response.id}, assistant={mask_string(request.assistant_id)}, project_id={request.project_id}"
f"Successfully generated response: response_id={response.id}, assistant={mask_string(request.assistant_id)}, project_id={project_id}"
)

tracer.end_generation(
Expand Down Expand Up @@ -188,7 +185,7 @@
except openai.OpenAIError as e:
error_message = handle_openai_error(e)
logger.error(
f"OpenAI API error during response processing: {error_message}, project_id={request.project_id}"
f"OpenAI API error during response processing: {error_message}, project_id={project_id}"
)
tracer.log_error(error_message, response_id=request.response_id)
callback_response = ResponsesAPIResponse.failure_response(error=error_message)
Expand All @@ -197,11 +194,11 @@

if request.callback_url:
logger.info(
f"Sending callback to URL: {request.callback_url}, assistant={mask_string(request.assistant_id)}, project_id={request.project_id}"
f"Sending callback to URL: {request.callback_url}, assistant={mask_string(request.assistant_id)}, project_id={project_id}"
)
send_callback(request.callback_url, callback_response.model_dump())
logger.info(
f"Callback sent successfully, assistant={mask_string(request.assistant_id)}, project_id={request.project_id}"
f"Callback sent successfully, assistant={mask_string(request.assistant_id)}, project_id={project_id}"
)


Expand All @@ -210,31 +207,35 @@
request: ResponsesAPIRequest,
background_tasks: BackgroundTasks,
_session: Session = Depends(get_db),
_current_user: UserOrganization = Depends(get_current_user_org),
_current_user: UserProjectOrg = Depends(get_current_user_org_project),
):
"""Asynchronous endpoint that processes requests in background with Langfuse tracing."""
logger.info(
f"Processing response request for assistant_id={mask_string(request.assistant_id)}, project_id={request.project_id}, organization_id={_current_user.organization_id}"

project_id, organization_id = (
_current_user.project_id,
_current_user.organization_id,
)

assistant = get_assistant_by_id(
_session, request.assistant_id, _current_user.organization_id
logger.info(
f"Processing response request for assistant_id={mask_string(request.assistant_id)}, project_id={project_id}, organization_id={organization_id}"
)

assistant = get_assistant_by_id(_session, request.assistant_id, organization_id)
if not assistant:
logger.warning(
f"Assistant not found: assistant_id={mask_string(request.assistant_id)}, project_id={request.project_id}, organization_id={_current_user.organization_id}",
f"Assistant not found: assistant_id={mask_string(request.assistant_id)}, project_id={project_id}, organization_id={organization_id}",
)
raise HTTPException(status_code=404, detail="Assistant not found or not active")

credentials = get_provider_credential(
session=_session,
org_id=_current_user.organization_id,
org_id=organization_id,
provider="openai",
project_id=request.project_id,
project_id=project_id,
)
if not credentials or "api_key" not in credentials:
logger.error(
f"OpenAI API key not configured for org_id={_current_user.organization_id}, project_id={request.project_id}"
f"OpenAI API key not configured for org_id={organization_id}, project_id={project_id}"
)
return {
"success": False,
Expand All @@ -247,9 +248,9 @@

langfuse_credentials = get_provider_credential(
session=_session,
org_id=_current_user.organization_id,
org_id=organization_id,
provider="langfuse",
project_id=request.project_id,
project_id=project_id,
)
tracer = LangfuseTracer(
credentials=langfuse_credentials,
Expand All @@ -262,10 +263,11 @@
client,
assistant,
tracer,
project_id,
)

logger.info(
f"Background task scheduled for response processing: assistant_id={mask_string(request.assistant_id)}, project_id={request.project_id}, organization_id={_current_user.organization_id}"
f"Background task scheduled for response processing: assistant_id={mask_string(request.assistant_id)}, project_id={project_id}, organization_id={organization_id}"
)

return {
Expand All @@ -284,14 +286,19 @@
async def responses_sync(
request: ResponsesSyncAPIRequest,
_session: Session = Depends(get_db),
_current_user: UserOrganization = Depends(get_current_user_org),
_current_user: UserProjectOrg = Depends(get_current_user_org_project),
):
"""Synchronous endpoint for benchmarking OpenAI responses API with Langfuse tracing."""
project_id, organization_id = (

Check warning on line 292 in backend/app/api/routes/responses.py

View check run for this annotation

Codecov / codecov/patch

backend/app/api/routes/responses.py#L292

Added line #L292 was not covered by tests
_current_user.project_id,
_current_user.organization_id,
)

credentials = get_provider_credential(
session=_session,
org_id=_current_user.organization_id,
org_id=organization_id,
provider="openai",
project_id=request.project_id,
project_id=project_id,
)
if not credentials or "api_key" not in credentials:
return APIResponse.failure_response(
Expand All @@ -302,18 +309,17 @@

langfuse_credentials = get_provider_credential(
session=_session,
org_id=_current_user.organization_id,
org_id=organization_id,
provider="langfuse",
project_id=request.project_id,
project_id=project_id,
)
tracer = LangfuseTracer(
credentials=langfuse_credentials,
response_id=request.response_id,
)

tracer.start_trace(
name="generate_response_sync",
input={"question": request.question},
name="generate_response_sync", input={"question": request.question}
)
tracer.start_generation(
name="openai_response",
Expand Down
2 changes: 0 additions & 2 deletions backend/app/tests/api/routes/test_responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ def test_responses_endpoint_success(

headers = {"X-API-KEY": original_api_key}
request_data = {
"project_id": glific_project.id,
"assistant_id": "assistant_123",
"question": "What is Glific?",
"callback_url": "http://example.com/callback",
Expand Down Expand Up @@ -118,7 +117,6 @@ def test_responses_endpoint_without_vector_store(

headers = {"X-API-KEY": original_api_key}
request_data = {
"project_id": glific_project.id,
"assistant_id": "assistant_123",
"question": "What is Glific?",
"callback_url": "http://example.com/callback",
Expand Down