diff --git a/backend/app/api/routes/responses.py b/backend/app/api/routes/responses.py
index 01361803..ded6ab0e 100644
--- a/backend/app/api/routes/responses.py
+++ b/backend/app/api/routes/responses.py
@@ -120,7 +120,7 @@ def process_response(
 ):
     """Process a response and send callback with results, with Langfuse tracing."""
     logger.info(
-        f"Starting generating response for assistant_id={mask_string(request.assistant_id)}, project_id={project_id}"
+        f"[process_response] Starting response generation for assistant_id={mask_string(request.assistant_id)}, project_id={project_id}"
     )
 
     tracer.start_trace(
@@ -171,7 +171,7 @@ def process_response(
         response_chunks = get_file_search_results(response)
 
         logger.info(
-            f"Successfully generated response: response_id={response.id}, assistant={mask_string(request.assistant_id)}, project_id={project_id}"
+            f"[process_response] Successfully generated response: response_id={response.id}, assistant={mask_string(request.assistant_id)}, project_id={project_id}"
         )
 
         tracer.end_generation(
@@ -245,7 +245,8 @@ def process_response(
     except openai.OpenAIError as e:
         error_message = handle_openai_error(e)
         logger.error(
-            f"OpenAI API error during response processing: {error_message}, project_id={project_id}"
+            f"[process_response] OpenAI API error during response processing: {error_message}, project_id={project_id}",
+            exc_info=True,
         )
         tracer.log_error(error_message, response_id=request.response_id)
         callback_response = ResponsesAPIResponse.failure_response(error=error_message)
@@ -254,11 +255,11 @@ def process_response(
 
     if request.callback_url:
         logger.info(
-            f"Sending callback to URL: {request.callback_url}, assistant={mask_string(request.assistant_id)}, project_id={project_id}"
+            f"[process_response] Sending callback to URL: {request.callback_url}, assistant={mask_string(request.assistant_id)}, project_id={project_id}"
         )
         send_callback(request.callback_url, callback_response.model_dump())
         logger.info(
-            f"Callback sent successfully, assistant={mask_string(request.assistant_id)}, project_id={project_id}"
+            f"[process_response] Callback sent successfully, assistant={mask_string(request.assistant_id)}, project_id={project_id}"
         )
 
 
@@ -276,14 +277,10 @@ async def responses(
         _current_user.organization_id,
     )
 
-    logger.info(
-        f"Processing response request for assistant_id={mask_string(request.assistant_id)}, project_id={project_id}, organization_id={organization_id}"
-    )
-
     assistant = get_assistant_by_id(_session, request.assistant_id, project_id)
     if not assistant:
         logger.warning(
-            f"Assistant not found: assistant_id={mask_string(request.assistant_id)}, project_id={project_id}, organization_id={organization_id}",
+            f"[response] Assistant not found: assistant_id={mask_string(request.assistant_id)}, project_id={project_id}, organization_id={organization_id}",
         )
         raise HTTPException(status_code=404, detail="Assistant not found or not active")
 
@@ -295,7 +292,7 @@ async def responses(
     )
     if not credentials or "api_key" not in credentials:
         logger.error(
-            f"OpenAI API key not configured for org_id={organization_id}, project_id={project_id}"
+            f"[response] OpenAI API key not configured for org_id={organization_id}, project_id={project_id}"
         )
         return {
             "success": False,
@@ -329,7 +326,7 @@ async def responses(
     )
 
     logger.info(
-        f"Background task scheduled for response processing: assistant_id={mask_string(request.assistant_id)}, project_id={project_id}, organization_id={organization_id}"
+        f"[response] Background task scheduled for response processing: assistant_id={mask_string(request.assistant_id)}, project_id={project_id}, organization_id={organization_id}"
     )
 
     return {
@@ -363,6 +360,9 @@ async def responses_sync(
         project_id=project_id,
     )
     if not credentials or "api_key" not in credentials:
+        logger.error(
+            f"[response_sync] OpenAI API key not configured for org_id={organization_id}, project_id={project_id}"
+        )
         return APIResponse.failure_response(
             error="OpenAI API key not configured for this organization."
         )
@@ -432,7 +432,9 @@ async def responses_sync(
         )
 
         tracer.flush()
-
+        logger.info(
+            f"[response_sync] Successfully generated response: response_id={response.id}, project_id={project_id}"
+        )
         return ResponsesAPIResponse.success_response(
             data=_APIResponse(
                 status="success",
@@ -449,6 +451,10 @@ async def responses_sync(
         )
     except openai.OpenAIError as e:
         error_message = handle_openai_error(e)
+        logger.error(
+            f"[response_sync] OpenAI API error during response processing: {error_message}, project_id={project_id}",
+            exc_info=True,
+        )
         tracer.log_error(error_message, response_id=request.response_id)
         tracer.flush()
         return ResponsesAPIResponse.failure_response(error=error_message)
diff --git a/backend/app/api/routes/threads.py b/backend/app/api/routes/threads.py
index 90fe0415..95630bfb 100644
--- a/backend/app/api/routes/threads.py
+++ b/backend/app/api/routes/threads.py
@@ -13,7 +13,7 @@
 from app.core import logging, settings
 from app.models import UserOrganization, OpenAIThreadCreate, UserProjectOrg
 from app.crud import upsert_thread_result, get_thread_result
-from app.utils import APIResponse
+from app.utils import APIResponse, mask_string
 from app.crud.credentials import get_provider_credential
 from app.core.util import configure_openai
 from app.core.langfuse.langfuse import LangfuseTracer
@@ -42,9 +42,10 @@ def send_callback(callback_url: str, data: dict):
         # session.verify = False
         response = session.post(callback_url, json=data)
         response.raise_for_status()
+        logger.info(f"[send_callback] Callback sent successfully to {callback_url}")
         return True
     except requests.RequestException as e:
-        logger.error(f"Callback failed: {str(e)}", exc_info=True)
+        logger.error(f"[send_callback] Callback failed: {str(e)}", exc_info=True)
         return False
 
 
@@ -65,12 +66,19 @@ def validate_thread(client: OpenAI, thread_id: str) -> tuple[bool, str]:
         if runs.data and len(runs.data) > 0:
             latest_run = runs.data[0]
             if latest_run.status in ["queued", "in_progress", "requires_action"]:
+                logger.error(
+                    f"[validate_thread] Thread ID {mask_string(thread_id)} is currently {latest_run.status}."
+                )
                 return (
                     False,
                     f"There is an active run on this thread (status: {latest_run.status}). Please wait for it to complete.",
                 )
         return True, None
-    except openai.OpenAIError:
+    except openai.OpenAIError as e:
+        logger.error(
+            f"[validate_thread] Failed to validate thread ID {mask_string(thread_id)}: {str(e)}",
+            exc_info=True,
+        )
         return False, f"Invalid thread ID provided {thread_id}"
 
 
@@ -82,8 +90,15 @@ def setup_thread(client: OpenAI, request: dict) -> tuple[bool, str]:
             client.beta.threads.messages.create(
                 thread_id=thread_id, role="user", content=request["question"]
             )
+            logger.info(
+                f"[setup_thread] Added message to existing thread {mask_string(thread_id)}"
+            )
             return True, None
         except openai.OpenAIError as e:
+            logger.error(
+                f"[setup_thread] Failed to add message to existing thread {mask_string(thread_id)}: {str(e)}",
+                exc_info=True,
+            )
             return False, handle_openai_error(e)
     else:
         try:
@@ -92,8 +107,14 @@ def setup_thread(client: OpenAI, request: dict) -> tuple[bool, str]:
                 thread_id=thread.id, role="user", content=request["question"]
             )
             request["thread_id"] = thread.id
+            logger.info(
+                f"[setup_thread] Created new thread with ID: {mask_string(thread.id)}"
+            )
             return True, None
         except openai.OpenAIError as e:
+            logger.error(
+                f"[setup_thread] Failed to create new thread: {str(e)}", exc_info=True
+            )
             return False, handle_openai_error(e)
 
 
@@ -156,6 +177,9 @@ def process_run_core(
     )
 
     try:
+        logger.info(
+            f"[process_run_core] Starting run for thread ID: {mask_string(request.get('thread_id'))} with assistant ID: {mask_string(request.get('assistant_id'))}"
+        )
         run = client.beta.threads.runs.create_and_poll(
             thread_id=request["thread_id"],
             assistant_id=request["assistant_id"],
@@ -189,15 +213,25 @@ def process_run_core(
                 "model": run.model,
             }
             request = {**request, **{"diagnostics": diagnostics}}
+            logger.info(
+                f"[process_run_core] Run completed successfully for thread ID: {mask_string(request.get('thread_id'))}"
+            )
             return create_success_response(request, message).model_dump(), None
         else:
            error_msg = f"Run failed with status: {run.status}"
+            logger.error(
+                f"[process_run_core] Run failed with error: {run.last_error} for thread ID: {mask_string(request.get('thread_id'))}"
+            )
            tracer.log_error(error_msg)
            return APIResponse.failure_response(error=error_msg).model_dump(), error_msg
 
     except openai.OpenAIError as e:
         error_msg = handle_openai_error(e)
         tracer.log_error(error_msg)
+        logger.error(
+            f"[process_run_core] OpenAI error: {error_msg} for thread ID: {mask_string(request.get('thread_id'))}",
+            exc_info=True,
+        )
         return APIResponse.failure_response(error=error_msg).model_dump(), error_msg
     finally:
         tracer.flush()
@@ -214,9 +248,12 @@ def poll_run_and_prepare_response(request: dict, client: OpenAI, db: Session):
     thread_id = request["thread_id"]
     prompt = request["question"]
 
+    logger.info(
+        f"[poll_run_and_prepare_response] Starting run for thread ID: {mask_string(thread_id)}"
+    )
+
     try:
         run = run_and_poll_thread(client, thread_id, request["assistant_id"])
-
         status = run.status or "unknown"
         response = None
         error = None
@@ -225,11 +262,18 @@ def poll_run_and_prepare_response(request: dict, client: OpenAI, db: Session):
             response = extract_response_from_thread(
                 client, thread_id, request.get("remove_citation", False)
             )
+            logger.info(
+                f"[poll_run_and_prepare_response] Successfully executed run for thread ID: {mask_string(thread_id)}"
+            )
 
     except openai.OpenAIError as e:
         status = "failed"
         error = str(e)
         response = None
+        logger.error(
+            f"[poll_run_and_prepare_response] Run failed for thread ID {mask_string(thread_id)}: {error}",
+            exc_info=True,
+        )
 
     upsert_thread_result(
         db,
@@ -259,6 +303,9 @@ async def threads(
     )
     client, success = configure_openai(credentials)
     if not success:
+        logger.error(
+            f"[threads] OpenAI API key not configured for this organization. | organization_id: {_current_user.organization_id}, project_id: {request.get('project_id')}"
+        )
         return APIResponse.failure_response(
             error="OpenAI API key not configured for this organization."
         )
@@ -304,7 +351,9 @@ async def threads(
     )
     # Schedule background task
     background_tasks.add_task(process_run, request, client, tracer)
-
+    logger.info(
+        f"[threads] Background task scheduled for thread ID: {mask_string(request.get('thread_id'))} | organization_id: {_current_user.organization_id}, project_id: {request.get('project_id')}"
+    )
     return initial_response
 
 
@@ -325,6 +374,9 @@ async def threads_sync(
     # Configure OpenAI client
     client, success = configure_openai(credentials)
     if not success:
+        logger.error(
+            f"[threads_sync] OpenAI API key not configured for this organization. | organization_id: {_current_user.organization_id}, project_id: {request.get('project_id')}"
+        )
         return APIResponse.failure_response(
             error="OpenAI API key not configured for this organization."
         )
@@ -341,6 +393,7 @@ async def threads_sync(
         is_valid, error_message = validate_thread(client, request.get("thread_id"))
         if not is_valid:
             raise Exception(error_message)
+
         # Setup thread
         is_success, error_message = setup_thread(client, request)
         if not is_success:
@@ -360,11 +413,8 @@ async def threads_sync(
         metadata={"thread_id": request.get("thread_id")},
     )
 
-    try:
-        response, error_message = process_run_core(request, client, tracer)
-        return response
-    finally:
-        tracer.flush()
+    response, error_message = process_run_core(request, client, tracer)
+    return response
 
 
 @router.post("/threads/start")
@@ -389,6 +439,9 @@ async def start_thread(
     # Configure OpenAI client
     client, success = configure_openai(credentials)
     if not success:
+        logger.error(
+            f"[start_thread] OpenAI API key not configured for this organization. | project_id: {_current_user.project_id}"
+        )
         return APIResponse.failure_response(
             error="OpenAI API key not configured for this organization."
         )
@@ -412,6 +465,9 @@ async def start_thread(
 
     background_tasks.add_task(poll_run_and_prepare_response, request, client, db)
 
+    logger.info(
+        f"[start_thread] Background task scheduled to process response for thread ID: {mask_string(thread_id)} | project_id: {_current_user.project_id}"
+    )
     return APIResponse.success_response(
         data={
             "thread_id": thread_id,
@@ -434,6 +490,9 @@ async def get_thread(
     result = get_thread_result(db, thread_id)
 
     if not result:
+        logger.error(
+            f"[get_thread] Thread result not found for ID: {mask_string(thread_id)} | org_id: {_current_user.organization_id}"
+        )
         raise HTTPException(404, "thread not found")
 
     status = result.status or ("success" if result.response else "processing")
diff --git a/backend/app/crud/thread_results.py b/backend/app/crud/thread_results.py
index cd72ef18..7a2691ff 100644
--- a/backend/app/crud/thread_results.py
+++ b/backend/app/crud/thread_results.py
@@ -1,6 +1,10 @@
+import logging
 from sqlmodel import Session, select
 from datetime import datetime
 from app.models import OpenAIThreadCreate, OpenAI_Thread
+from app.utils import mask_string
+
+logger = logging.getLogger(__name__)
 
 
 def upsert_thread_result(session: Session, data: OpenAIThreadCreate):
@@ -13,10 +17,15 @@ def upsert_thread_result(session: Session, data: OpenAIThreadCreate):
         existing.status = data.status
         existing.error = data.error
         existing.updated_at = datetime.utcnow()
+        logger.info(
+            f"[upsert_thread_result] Updated existing thread result in the db with ID: {mask_string(data.thread_id)}"
+        )
     else:
         new_thread = OpenAI_Thread(**data.dict())
         session.add(new_thread)
-
+        logger.info(
+            f"[upsert_thread_result] Created new thread result in the db with ID: {mask_string(new_thread.thread_id)}"
+        )
     session.commit()