From 71b3bc583d67c76b5abbebb3a23d9c6160596483 Mon Sep 17 00:00:00 2001 From: Aviraj <100823015+avirajsingh7@users.noreply.github.com> Date: Fri, 25 Jul 2025 13:11:20 +0530 Subject: [PATCH 1/5] logs for threads and response endpoint --- backend/app/api/routes/responses.py | 32 +++++++------ backend/app/api/routes/threads.py | 72 +++++++++++++++++++++++++---- backend/app/crud/thread_results.py | 11 ++++- 3 files changed, 92 insertions(+), 23 deletions(-) diff --git a/backend/app/api/routes/responses.py b/backend/app/api/routes/responses.py index 130a4839..35d12eac 100644 --- a/backend/app/api/routes/responses.py +++ b/backend/app/api/routes/responses.py @@ -101,7 +101,7 @@ def process_response( ): """Process a response and send callback with results, with Langfuse tracing.""" logger.info( - f"Starting generating response for assistant_id={mask_string(request.assistant_id)}, project_id={project_id}" + f"[process_response] Starting generating response for assistant_id={mask_string(request.assistant_id)}, project_id={project_id}" ) tracer.start_trace( @@ -140,7 +140,7 @@ def process_response( response_chunks = get_file_search_results(response) logger.info( - f"Successfully generated response: response_id={response.id}, assistant={mask_string(request.assistant_id)}, project_id={project_id}" + f"[process_response] Successfully generated response: response_id={response.id}, assistant={mask_string(request.assistant_id)}, project_id={project_id}" ) tracer.end_generation( @@ -185,7 +185,8 @@ def process_response( except openai.OpenAIError as e: error_message = handle_openai_error(e) logger.error( - f"OpenAI API error during response processing: {error_message}, project_id={project_id}" + f"[process_response] OpenAI API error during response processing: {error_message}, project_id={project_id}", + exc_info=True, ) tracer.log_error(error_message, response_id=request.response_id) callback_response = ResponsesAPIResponse.failure_response(error=error_message) @@ -194,11 +195,11 @@ def process_response( if request.callback_url: logger.info( - f"Sending callback to URL: {request.callback_url}, assistant={mask_string(request.assistant_id)}, project_id={project_id}" + f"[process_response] Sending callback to URL: {request.callback_url}, assistant={mask_string(request.assistant_id)}, project_id={project_id}" ) send_callback(request.callback_url, callback_response.model_dump()) logger.info( - f"Callback sent successfully, assistant={mask_string(request.assistant_id)}, project_id={project_id}" + f"[process_response] Callback sent successfully, assistant={mask_string(request.assistant_id)}, project_id={project_id}" ) @@ -216,14 +217,10 @@ async def responses( _current_user.organization_id, ) - logger.info( - f"Processing response request for assistant_id={mask_string(request.assistant_id)}, project_id={project_id}, organization_id={organization_id}" - ) - assistant = get_assistant_by_id(_session, request.assistant_id, project_id) if not assistant: logger.warning( - f"Assistant not found: assistant_id={mask_string(request.assistant_id)}, project_id={project_id}, organization_id={organization_id}", + f"[response] Assistant not found: assistant_id={mask_string(request.assistant_id)}, project_id={project_id}, organization_id={organization_id}", ) raise HTTPException(status_code=404, detail="Assistant not found or not active") @@ -235,7 +232,7 @@ async def responses( ) if not credentials or "api_key" not in credentials: logger.error( - f"OpenAI API key not configured for org_id={organization_id}, project_id={project_id}" + f"[response] OpenAI API key not configured for org_id={organization_id}, project_id={project_id}" ) return { "success": False, @@ -267,7 +264,7 @@ async def responses( ) logger.info( - f"Background task scheduled for response processing: assistant_id={mask_string(request.assistant_id)}, project_id={project_id}, organization_id={organization_id}" + f"[response] Background task scheduled for response processing: assistant_id={mask_string(request.assistant_id)}, project_id={project_id}, organization_id={organization_id}" ) return { @@ -301,6 +298,9 @@ async def responses_sync( project_id=project_id, ) if not credentials or "api_key" not in credentials: + logger.error( + f"[response_sync] OpenAI API key not configured for org_id={organization_id}, project_id={project_id}" + ) return APIResponse.failure_response( error="OpenAI API key not configured for this organization." ) @@ -370,7 +370,9 @@ async def responses_sync( ) tracer.flush() - + logger.info( + f"[response_sync] Successfully generated response: response_id={response.id}, assistant={mask_string(request.assistant_id)}, project_id={project_id}" + ) return ResponsesAPIResponse.success_response( data=_APIResponse( status="success", @@ -387,6 +389,10 @@ async def responses_sync( ) except openai.OpenAIError as e: error_message = handle_openai_error(e) + logger.error( + f"[response_sync] OpenAI API error during response processing: {error_message}, project_id={project_id}", + exc_info=True, + ) tracer.log_error(error_message, response_id=request.response_id) tracer.flush() return ResponsesAPIResponse.failure_response(error=error_message) diff --git a/backend/app/api/routes/threads.py b/backend/app/api/routes/threads.py index 90fe0415..0a11ed38 100644 --- a/backend/app/api/routes/threads.py +++ b/backend/app/api/routes/threads.py @@ -13,7 +13,7 @@ from app.core import logging, settings from app.models import UserOrganization, OpenAIThreadCreate, UserProjectOrg from app.crud import upsert_thread_result, get_thread_result -from app.utils import APIResponse +from app.utils import APIResponse, mask_string from app.crud.credentials import get_provider_credential from app.core.util import configure_openai from app.core.langfuse.langfuse import LangfuseTracer @@ -42,9 +42,10 @@ def send_callback(callback_url: str, data: dict): # session.verify = False response = session.post(callback_url, json=data) response.raise_for_status() + logger.info(f"[send_callback] Callback sent successfully to {callback_url}") return True except requests.RequestException as e: - logger.error(f"Callback failed: {str(e)}", exc_info=True) + logger.error(f"[send_callback] Callback failed: {str(e)}", exc_info=True) return False @@ -84,6 +85,10 @@ def setup_thread(client: OpenAI, request: dict) -> tuple[bool, str]: ) return True, None except openai.OpenAIError as e: + logger.error( + f"[setup_thread] Failed to add message to existing thread {mask_string(thread_id)}: {str(e)}", + exc_info=True, + ) return False, handle_openai_error(e) else: try: @@ -94,6 +99,9 @@ def setup_thread(client: OpenAI, request: dict) -> tuple[bool, str]: request["thread_id"] = thread.id return True, None except openai.OpenAIError as e: + logger.error( + f"[setup_thread] Failed to create new thread: {str(e)}", exc_info=True + ) return False, handle_openai_error(e) @@ -156,6 +164,9 @@ def process_run_core( ) try: + logger.info( + f"[process_run_core] Starting run for thread ID: {mask_string(request.get('thread_id'))} with assistant ID: {mask_string(request.get('assistant_id'))}" + ) run = client.beta.threads.runs.create_and_poll( thread_id=request["thread_id"], assistant_id=request["assistant_id"], @@ -189,15 +200,25 @@ def process_run_core( "model": run.model, } request = {**request, **{"diagnostics": diagnostics}} + logger.info( + f"[process_run_core] Run completed successfully for thread ID: {mask_string(request.get('thread_id'))}" + ) return create_success_response(request, message).model_dump(), None else: error_msg = f"Run failed with status: {run.status}" + logger.warning( + f"[process_run_core] Run failed with error: {run.last_error} for thread ID: {mask_string(request.get('thread_id'))}" + ) tracer.log_error(error_msg) return APIResponse.failure_response(error=error_msg).model_dump(), error_msg except openai.OpenAIError as e: error_msg = handle_openai_error(e) tracer.log_error(error_msg) + logger.error( + f"[process_run_core] OpenAI error: {error_msg} for thread ID: {mask_string(request.get('thread_id'))}", + exc_info=True, + ) return APIResponse.failure_response(error=error_msg).model_dump(), error_msg finally: tracer.flush() @@ -214,9 +235,12 @@ def poll_run_and_prepare_response(request: dict, client: OpenAI, db: Session): thread_id = request["thread_id"] prompt = request["question"] + logger.info( + f"[poll_run_and_prepare_response] Starting run for thread ID: {mask_string(thread_id)}" + ) + try: run = run_and_poll_thread(client, thread_id, request["assistant_id"]) - status = run.status or "unknown" response = None error = None @@ -225,11 +249,18 @@ def poll_run_and_prepare_response(request: dict, client: OpenAI, db: Session): response = extract_response_from_thread( client, thread_id, request.get("remove_citation", False) ) + logger.info( + f"[poll_run_and_prepare_response] Successfully executed run for thread ID: {mask_string(thread_id)}" + ) except openai.OpenAIError as e: status = "failed" error = str(e) response = None + logger.error( + f"[poll_run_and_prepare_response] Run failed for thread ID {mask_string(thread_id)}: {error}", + exc_info=True, + ) upsert_thread_result( db, @@ -259,6 +290,9 @@ async def threads( ) client, success = configure_openai(credentials) if not success: + logger.warning( + f"[threads] OpenAI API key not configured for this organization. | organization_id: {_current_user.organization_id}, project_id: {request.get('project_id')}" + ) return APIResponse.failure_response( error="OpenAI API key not configured for this organization." ) @@ -273,10 +307,16 @@ async def threads( # Validate thread is_valid, error_message = validate_thread(client, request.get("thread_id")) if not is_valid: + logger.error( + f"[threads] Error processing thread ID {mask_string(request.get('thread_id'))}: {error_message} | organization_id: {_current_user.organization_id}, project_id: {request.get('project_id')}" + ) raise Exception(error_message) # Setup thread is_success, error_message = setup_thread(client, request) if not is_success: + logger.error( + f"[threads] Error setting up thread ID {mask_string(request.get('thread_id'))}: {error_message} | organization_id: {_current_user.organization_id}, project_id: {request.get('project_id')}" + ) raise Exception(error_message) # Send immediate response @@ -304,7 +344,9 @@ async def threads( ) # Schedule background task background_tasks.add_task(process_run, request, client, tracer) - + logger.info( + f"[threads] Background task scheduled for thread ID: {mask_string(request.get('thread_id'))} | organization_id: {_current_user.organization_id}, project_id: {request.get('project_id')}" + ) return initial_response @@ -325,6 +367,9 @@ async def threads_sync( # Configure OpenAI client client, success = configure_openai(credentials) if not success: + logger.error( + f"[threads_sync] OpenAI API key not configured for this organization. | organization_id: {_current_user.organization_id}, project_id: {request.get('project_id')}" + ) return APIResponse.failure_response( error="OpenAI API key not configured for this organization." ) @@ -340,6 +385,9 @@ async def threads_sync( # Validate thread is_valid, error_message = validate_thread(client, request.get("thread_id")) if not is_valid: + logger.error( + f"[threads_sync] Error processing thread ID {mask_string(request.get('thread_id'))}: {error_message}" + ) raise Exception(error_message) # Setup thread is_success, error_message = setup_thread(client, request) @@ -360,11 +408,8 @@ async def threads_sync( metadata={"thread_id": request.get("thread_id")}, ) - try: - response, error_message = process_run_core(request, client, tracer) - return response - finally: - tracer.flush() + response, error_message = process_run_core(request, client, tracer) + return response @router.post("/threads/start") @@ -389,6 +434,9 @@ async def start_thread( # Configure OpenAI client client, success = configure_openai(credentials) if not success: + logger.error( + f"[start_thread] OpenAI API key not configured for this organization. | project_id: {_current_user.project_id}" + ) return APIResponse.failure_response( error="OpenAI API key not configured for this organization." ) @@ -412,6 +460,9 @@ async def start_thread( background_tasks.add_task(poll_run_and_prepare_response, request, client, db) + logger.info( + f"[start_thread] Background task scheduled to process response for thread ID: {mask_string(thread_id)} | project_id: {_current_user.project_id}" + ) return APIResponse.success_response( data={ "thread_id": thread_id, @@ -434,6 +485,9 @@ async def get_thread( result = get_thread_result(db, thread_id) if not result: + logger.warning( + f"[get_thread] Thread result not found for ID: {mask_string(thread_id)} | org_id: {_current_user.organization_id}" + ) raise HTTPException(404, "thread not found") status = result.status or ("success" if result.response else "processing") diff --git a/backend/app/crud/thread_results.py b/backend/app/crud/thread_results.py index cd72ef18..a5064e09 100644 --- a/backend/app/crud/thread_results.py +++ b/backend/app/crud/thread_results.py @@ -1,6 +1,10 @@ +import logging from sqlmodel import Session, select from datetime import datetime from app.models import OpenAIThreadCreate, OpenAI_Thread +from app.utils import mask_string + +logger = logging.getLogger(__name__) def upsert_thread_result(session: Session, data: OpenAIThreadCreate): @@ -13,10 +17,15 @@ def upsert_thread_result(session: Session, data: OpenAIThreadCreate): existing.status = data.status existing.error = data.error existing.updated_at = datetime.utcnow() + logger.info( + f"[upsert_thread_result] Updated existing thread result with ID: {mask_string(data.thread_id)}" + ) else: new_thread = OpenAI_Thread(**data.dict()) session.add(new_thread) - + logger.info( + f"[upsert_thread_result] Created new thread result with ID: {mask_string(data.thread_id)}" + ) session.commit() From 132f8f164ae8eeffb4f5ef56e1856297c5ff7e33 Mon Sep 17 00:00:00 2001 From: Aviraj <100823015+avirajsingh7@users.noreply.github.com> Date: Wed, 30 Jul 2025 12:40:13 +0530 Subject: [PATCH 2/5] refactor logs level --- backend/app/api/routes/responses.py | 2 +- backend/app/api/routes/threads.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/backend/app/api/routes/responses.py b/backend/app/api/routes/responses.py index 35d12eac..9a5098a1 100644 --- a/backend/app/api/routes/responses.py +++ b/backend/app/api/routes/responses.py @@ -371,7 +371,7 @@ async def responses_sync( tracer.flush() logger.info( - f"[response_sync] Successfully generated response: response_id={response.id}, assistant={mask_string(request.assistant_id)}, project_id={project_id}" + f"[response_sync] Successfully generated response: response_id={response.id}, project_id={project_id}" ) return ResponsesAPIResponse.success_response( data=_APIResponse( diff --git a/backend/app/api/routes/threads.py b/backend/app/api/routes/threads.py index 0a11ed38..8a8b639a 100644 --- a/backend/app/api/routes/threads.py +++ b/backend/app/api/routes/threads.py @@ -206,7 +206,7 @@ def process_run_core( return create_success_response(request, message).model_dump(), None else: error_msg = f"Run failed with status: {run.status}" - logger.warning( + logger.error( f"[process_run_core] Run failed with error: {run.last_error} for thread ID: {mask_string(request.get('thread_id'))}" ) tracer.log_error(error_msg) From 9194bd492cd6d04b0869b296543c0dc2f25d40df Mon Sep 17 00:00:00 2001 From: Aviraj <100823015+avirajsingh7@users.noreply.github.com> Date: Thu, 31 Jul 2025 13:50:53 +0530 Subject: [PATCH 3/5] Add logs for openAI call --- backend/app/api/routes/threads.py | 12 +++++++++++- backend/app/crud/thread_results.py | 2 +- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/backend/app/api/routes/threads.py b/backend/app/api/routes/threads.py index 8a8b639a..2b920ff4 100644 --- a/backend/app/api/routes/threads.py +++ b/backend/app/api/routes/threads.py @@ -71,7 +71,11 @@ def validate_thread(client: OpenAI, thread_id: str) -> tuple[bool, str]: f"There is an active run on this thread (status: {latest_run.status}). Please wait for it to complete.", ) return True, None - except openai.OpenAIError: + except openai.OpenAIError as e: + logger.error( + f"[validate_thread] Failed to validate thread ID {mask_string(thread_id)}: {str(e)}", + exc_info=True, + ) return False, f"Invalid thread ID provided {thread_id}" @@ -83,6 +87,9 @@ def setup_thread(client: OpenAI, request: dict) -> tuple[bool, str]: client.beta.threads.messages.create( thread_id=thread_id, role="user", content=request["question"] ) + logger.info( + f"[setup_thread] Added message to existing thread {mask_string(thread_id)}" + ) return True, None except openai.OpenAIError as e: logger.error( @@ -97,6 +104,9 @@ def setup_thread(client: OpenAI, request: dict) -> tuple[bool, str]: thread_id=thread.id, role="user", content=request["question"] ) request["thread_id"] = thread.id + logger.info( + f"[setup_thread] Created new thread with ID: {mask_string(thread.id)}" + ) return True, None except openai.OpenAIError as e: logger.error( diff --git a/backend/app/crud/thread_results.py b/backend/app/crud/thread_results.py index a5064e09..33515642 100644 --- a/backend/app/crud/thread_results.py +++ b/backend/app/crud/thread_results.py @@ -24,7 +24,7 @@ def upsert_thread_result(session: Session, data: OpenAIThreadCreate): new_thread = OpenAI_Thread(**data.dict()) session.add(new_thread) logger.info( - f"[upsert_thread_result] Created new thread result with ID: {mask_string(data.thread_id)}" + f"[upsert_thread_result] Created new thread result with ID: {mask_string(new_thread.thread_id)}" ) session.commit() From 0c481a10bb534b3793e89d1252b730fdc5644dd5 Mon Sep 17 00:00:00 2001 From: Aviraj <100823015+avirajsingh7@users.noreply.github.com> Date: Thu, 31 Jul 2025 13:57:26 +0530 Subject: [PATCH 4/5] fix loging --- backend/app/api/routes/threads.py | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/backend/app/api/routes/threads.py b/backend/app/api/routes/threads.py index 2b920ff4..95630bfb 100644 --- a/backend/app/api/routes/threads.py +++ b/backend/app/api/routes/threads.py @@ -66,6 +66,9 @@ def validate_thread(client: OpenAI, thread_id: str) -> tuple[bool, str]: if runs.data and len(runs.data) > 0: latest_run = runs.data[0] if latest_run.status in ["queued", "in_progress", "requires_action"]: + logger.error( + f"[validate_thread] Thread ID {mask_string(thread_id)} is currently {latest_run.status}." + ) return ( False, f"There is an active run on this thread (status: {latest_run.status}). Please wait for it to complete.", @@ -300,7 +303,7 @@ async def threads( ) client, success = configure_openai(credentials) if not success: - logger.warning( + logger.error( f"[threads] OpenAI API key not configured for this organization. | organization_id: {_current_user.organization_id}, project_id: {request.get('project_id')}" ) return APIResponse.failure_response( @@ -317,16 +320,10 @@ async def threads( # Validate thread is_valid, error_message = validate_thread(client, request.get("thread_id")) if not is_valid: - logger.error( - f"[threads] Error processing thread ID {mask_string(request.get('thread_id'))}: {error_message} | organization_id: {_current_user.organization_id}, project_id: {request.get('project_id')}" - ) raise Exception(error_message) # Setup thread is_success, error_message = setup_thread(client, request) if not is_success: - logger.error( - f"[threads] Error setting up thread ID {mask_string(request.get('thread_id'))}: {error_message} | organization_id: {_current_user.organization_id}, project_id: {request.get('project_id')}" - ) raise Exception(error_message) # Send immediate response @@ -395,10 +392,8 @@ async def threads_sync( # Validate thread is_valid, error_message = validate_thread(client, request.get("thread_id")) if not is_valid: - logger.error( - f"[threads_sync] Error processing thread ID {mask_string(request.get('thread_id'))}: {error_message}" - ) raise Exception(error_message) + # Setup thread is_success, error_message = setup_thread(client, request) if not is_success: @@ -495,7 +490,7 @@ async def get_thread( result = get_thread_result(db, thread_id) if not result: - logger.warning( + logger.error( f"[get_thread] Thread result not found for ID: {mask_string(thread_id)} | org_id: {_current_user.organization_id}" ) raise HTTPException(404, "thread not found") From d8c5d010ad10b9c0cab61a4bdbebdad3f28de741 Mon Sep 17 00:00:00 2001 From: Aviraj <100823015+avirajsingh7@users.noreply.github.com> Date: Tue, 5 Aug 2025 11:12:49 +0530 Subject: [PATCH 5/5] refactor --- backend/app/crud/thread_results.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/backend/app/crud/thread_results.py b/backend/app/crud/thread_results.py index 33515642..7a2691ff 100644 --- a/backend/app/crud/thread_results.py +++ b/backend/app/crud/thread_results.py @@ -18,13 +18,13 @@ def upsert_thread_result(session: Session, data: OpenAIThreadCreate): existing.error = data.error existing.updated_at = datetime.utcnow() logger.info( - f"[upsert_thread_result] Updated existing thread result with ID: {mask_string(data.thread_id)}" + f"[upsert_thread_result] Updated existing thread result in the db with ID: {mask_string(data.thread_id)}" ) else: new_thread = OpenAI_Thread(**data.dict()) session.add(new_thread) logger.info( - f"[upsert_thread_result] Created new thread result with ID: {mask_string(new_thread.thread_id)}" + f"[upsert_thread_result] Created new thread result in the db with ID: {mask_string(new_thread.thread_id)}" ) session.commit()