diff --git a/backend/app/api/routes/responses.py b/backend/app/api/routes/responses.py index 94e5f19db..c221d0e85 100644 --- a/backend/app/api/routes/responses.py +++ b/backend/app/api/routes/responses.py @@ -18,7 +18,7 @@ get_conversation_by_ancestor_id, ) from app.models import UserProjectOrg, OpenAIConversationCreate, OpenAIConversation -from app.utils import APIResponse, mask_string +from app.utils import APIResponse, mask_string, get_openai_client from app.core.langfuse.langfuse import LangfuseTracer logger = logging.getLogger(__name__) @@ -126,25 +126,57 @@ def get_additional_data(request: dict) -> dict: def process_response( - request: ResponsesAPIRequest, - client: OpenAI, - assistant, - tracer: LangfuseTracer, + request_data: dict, project_id: int, organization_id: int, - ancestor_id: str, - latest_conversation: OpenAIConversation | None, ): """Process a response and send callback with results, with Langfuse tracing.""" + # Reconstruct request object from serialized data + request = ResponsesAPIRequest(**request_data) + assistant_id = request.assistant_id + logger.info( - f"[process_response] Starting generating response for assistant_id={mask_string(request.assistant_id)}, project_id={project_id}" + f"[process_response] Starting generating response for assistant_id={mask_string(assistant_id)}, project_id={project_id}" ) + with Session(engine) as session: + assistant = get_assistant_by_id(session, assistant_id, project_id) + if not assistant: + logger.error( + f"[process_response] Assistant not found: assistant_id={mask_string(assistant_id)}, project_id={project_id}" + ) + return + + client = get_openai_client(session, organization_id, project_id) + + langfuse_credentials = get_provider_credential( + session=session, + org_id=organization_id, + provider="langfuse", + project_id=project_id, + ) + tracer = LangfuseTracer( + credentials=langfuse_credentials, + response_id=request.response_id, + ) + + # Handle ancestor_id and latest conversation logic + ancestor_id = request.response_id + latest_conversation = None + if ancestor_id: + latest_conversation = get_conversation_by_ancestor_id( + session=session, + ancestor_response_id=ancestor_id, + project_id=project_id, + ) + if latest_conversation: + ancestor_id = latest_conversation.response_id + tracer.start_trace( name="generate_response_async", - input={"question": request.question, "assistant_id": request.assistant_id}, + input={"question": request.question, "assistant_id": assistant_id}, metadata={"callback_url": request.callback_url}, - tags=[request.assistant_id], + tags=[assistant_id], ) tracer.start_generation( @@ -177,7 +209,7 @@ def process_response( response_chunks = get_file_search_results(response) logger.info( - f"[process_response] Successfully generated response: response_id={response.id}, assistant={mask_string(request.assistant_id)}, project_id={project_id}" + f"[process_response] Successfully generated response: response_id={response.id}, assistant={mask_string(assistant_id)}, project_id={project_id}" ) tracer.end_generation( @@ -223,7 +255,7 @@ def process_response( user_question=request.question, response=response.output_text, model=response.model, - assistant_id=request.assistant_id, + assistant_id=assistant_id, ) create_conversation( @@ -263,7 +295,7 @@ def process_response( if request.callback_url: logger.info( - f"[process_response] Sending callback to URL: {request.callback_url}, assistant={mask_string(request.assistant_id)}, project_id={project_id}" + f"[process_response] Sending callback to URL: {request.callback_url}, assistant={mask_string(assistant_id)}, project_id={project_id}" ) # Send callback with webhook-specific response format @@ -281,7 +313,7 @@ def process_response( }, ) logger.info( - f"[process_response] Callback sent successfully, assistant={mask_string(request.assistant_id)}, project_id={project_id}" + f"[process_response] Callback sent successfully, assistant={mask_string(assistant_id)}, project_id={project_id}" ) @@ -325,40 +357,11 @@ async def responses( "metadata": None, } - client = OpenAI(api_key=credentials["api_key"]) - - langfuse_credentials = get_provider_credential( - session=_session, - org_id=organization_id, - provider="langfuse", - project_id=project_id, - ) - tracer = LangfuseTracer( - credentials=langfuse_credentials, - response_id=request.response_id, - ) - - ancestor_id = request.response_id - latest_conversation = None - if ancestor_id: - latest_conversation = get_conversation_by_ancestor_id( - session=_session, - ancestor_response_id=ancestor_id, - project_id=project_id, - ) - if latest_conversation: - ancestor_id = latest_conversation.response_id - background_tasks.add_task( process_response, - request, - client, - assistant, - tracer, + request.model_dump(), project_id, organization_id, - ancestor_id, - latest_conversation, ) logger.info(