Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 19 additions & 13 deletions backend/app/api/routes/responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def process_response(
):
"""Process a response and send callback with results, with Langfuse tracing."""
logger.info(
f"Starting generating response for assistant_id={mask_string(request.assistant_id)}, project_id={project_id}"
f"[process_response] Starting generating response for assistant_id={mask_string(request.assistant_id)}, project_id={project_id}"
)

tracer.start_trace(
Expand Down Expand Up @@ -171,7 +171,7 @@ def process_response(
response_chunks = get_file_search_results(response)

logger.info(
f"Successfully generated response: response_id={response.id}, assistant={mask_string(request.assistant_id)}, project_id={project_id}"
f"[process_response] Successfully generated response: response_id={response.id}, assistant={mask_string(request.assistant_id)}, project_id={project_id}"
)

tracer.end_generation(
Expand Down Expand Up @@ -245,7 +245,8 @@ def process_response(
except openai.OpenAIError as e:
error_message = handle_openai_error(e)
logger.error(
f"OpenAI API error during response processing: {error_message}, project_id={project_id}"
f"[process_response] OpenAI API error during response processing: {error_message}, project_id={project_id}",
exc_info=True,
)
tracer.log_error(error_message, response_id=request.response_id)
callback_response = ResponsesAPIResponse.failure_response(error=error_message)
Expand All @@ -254,11 +255,11 @@ def process_response(

if request.callback_url:
logger.info(
f"Sending callback to URL: {request.callback_url}, assistant={mask_string(request.assistant_id)}, project_id={project_id}"
f"[process_response] Sending callback to URL: {request.callback_url}, assistant={mask_string(request.assistant_id)}, project_id={project_id}"
)
send_callback(request.callback_url, callback_response.model_dump())
logger.info(
f"Callback sent successfully, assistant={mask_string(request.assistant_id)}, project_id={project_id}"
f"[process_response] Callback sent successfully, assistant={mask_string(request.assistant_id)}, project_id={project_id}"
)


Expand All @@ -276,14 +277,10 @@ async def responses(
_current_user.organization_id,
)

logger.info(
f"Processing response request for assistant_id={mask_string(request.assistant_id)}, project_id={project_id}, organization_id={organization_id}"
)

assistant = get_assistant_by_id(_session, request.assistant_id, project_id)
if not assistant:
logger.warning(
f"Assistant not found: assistant_id={mask_string(request.assistant_id)}, project_id={project_id}, organization_id={organization_id}",
f"[response] Assistant not found: assistant_id={mask_string(request.assistant_id)}, project_id={project_id}, organization_id={organization_id}",
)
raise HTTPException(status_code=404, detail="Assistant not found or not active")

Expand All @@ -295,7 +292,7 @@ async def responses(
)
if not credentials or "api_key" not in credentials:
logger.error(
f"OpenAI API key not configured for org_id={organization_id}, project_id={project_id}"
f"[response] OpenAI API key not configured for org_id={organization_id}, project_id={project_id}"
)
return {
"success": False,
Expand Down Expand Up @@ -329,7 +326,7 @@ async def responses(
)

logger.info(
f"Background task scheduled for response processing: assistant_id={mask_string(request.assistant_id)}, project_id={project_id}, organization_id={organization_id}"
f"[response] Background task scheduled for response processing: assistant_id={mask_string(request.assistant_id)}, project_id={project_id}, organization_id={organization_id}"
)

return {
Expand Down Expand Up @@ -363,6 +360,9 @@ async def responses_sync(
project_id=project_id,
)
if not credentials or "api_key" not in credentials:
logger.error(
f"[response_sync] OpenAI API key not configured for org_id={organization_id}, project_id={project_id}"
)
return APIResponse.failure_response(
error="OpenAI API key not configured for this organization."
)
Expand Down Expand Up @@ -432,7 +432,9 @@ async def responses_sync(
)

tracer.flush()

logger.info(
f"[response_sync] Successfully generated response: response_id={response.id}, project_id={project_id}"
)
return ResponsesAPIResponse.success_response(
data=_APIResponse(
status="success",
Expand All @@ -449,6 +451,10 @@ async def responses_sync(
)
except openai.OpenAIError as e:
error_message = handle_openai_error(e)
logger.error(
f"[response_sync] OpenAI API error during response processing: {error_message}, project_id={project_id}",
exc_info=True,
)
tracer.log_error(error_message, response_id=request.response_id)
tracer.flush()
return ResponsesAPIResponse.failure_response(error=error_message)
79 changes: 69 additions & 10 deletions backend/app/api/routes/threads.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from app.core import logging, settings
from app.models import UserOrganization, OpenAIThreadCreate, UserProjectOrg
from app.crud import upsert_thread_result, get_thread_result
from app.utils import APIResponse
from app.utils import APIResponse, mask_string
from app.crud.credentials import get_provider_credential
from app.core.util import configure_openai
from app.core.langfuse.langfuse import LangfuseTracer
Expand Down Expand Up @@ -42,9 +42,10 @@ def send_callback(callback_url: str, data: dict):
# session.verify = False
response = session.post(callback_url, json=data)
response.raise_for_status()
logger.info(f"[send_callback] Callback sent successfully to {callback_url}")
return True
except requests.RequestException as e:
logger.error(f"Callback failed: {str(e)}", exc_info=True)
logger.error(f"[send_callback] Callback failed: {str(e)}", exc_info=True)
return False


Expand All @@ -65,12 +66,19 @@ def validate_thread(client: OpenAI, thread_id: str) -> tuple[bool, str]:
if runs.data and len(runs.data) > 0:
latest_run = runs.data[0]
if latest_run.status in ["queued", "in_progress", "requires_action"]:
logger.error(
f"[validate_thread] Thread ID {mask_string(thread_id)} is currently {latest_run.status}."
)
return (
False,
f"There is an active run on this thread (status: {latest_run.status}). Please wait for it to complete.",
)
return True, None
except openai.OpenAIError:
except openai.OpenAIError as e:
logger.error(
f"[validate_thread] Failed to validate thread ID {mask_string(thread_id)}: {str(e)}",
exc_info=True,
)
return False, f"Invalid thread ID provided {thread_id}"


Expand All @@ -82,8 +90,15 @@ def setup_thread(client: OpenAI, request: dict) -> tuple[bool, str]:
client.beta.threads.messages.create(
thread_id=thread_id, role="user", content=request["question"]
)
logger.info(
f"[setup_thread] Added message to existing thread {mask_string(thread_id)}"
)
return True, None
except openai.OpenAIError as e:
logger.error(
f"[setup_thread] Failed to add message to existing thread {mask_string(thread_id)}: {str(e)}",
exc_info=True,
)
return False, handle_openai_error(e)
else:
try:
Expand All @@ -92,8 +107,14 @@ def setup_thread(client: OpenAI, request: dict) -> tuple[bool, str]:
thread_id=thread.id, role="user", content=request["question"]
)
request["thread_id"] = thread.id
logger.info(
f"[setup_thread] Created new thread with ID: {mask_string(thread.id)}"
)
return True, None
except openai.OpenAIError as e:
logger.error(
f"[setup_thread] Failed to create new thread: {str(e)}", exc_info=True
)
return False, handle_openai_error(e)


Expand Down Expand Up @@ -156,6 +177,9 @@ def process_run_core(
)

try:
logger.info(
f"[process_run_core] Starting run for thread ID: {mask_string(request.get('thread_id'))} with assistant ID: {mask_string(request.get('assistant_id'))}"
)
run = client.beta.threads.runs.create_and_poll(
thread_id=request["thread_id"],
assistant_id=request["assistant_id"],
Expand Down Expand Up @@ -189,15 +213,25 @@ def process_run_core(
"model": run.model,
}
request = {**request, **{"diagnostics": diagnostics}}
logger.info(
f"[process_run_core] Run completed successfully for thread ID: {mask_string(request.get('thread_id'))}"
)
return create_success_response(request, message).model_dump(), None
else:
error_msg = f"Run failed with status: {run.status}"
logger.error(
f"[process_run_core] Run failed with error: {run.last_error} for thread ID: {mask_string(request.get('thread_id'))}"
)
tracer.log_error(error_msg)
return APIResponse.failure_response(error=error_msg).model_dump(), error_msg

except openai.OpenAIError as e:
error_msg = handle_openai_error(e)
tracer.log_error(error_msg)
logger.error(
f"[process_run_core] OpenAI error: {error_msg} for thread ID: {mask_string(request.get('thread_id'))}",
exc_info=True,
)
return APIResponse.failure_response(error=error_msg).model_dump(), error_msg
finally:
tracer.flush()
Expand All @@ -214,9 +248,12 @@ def poll_run_and_prepare_response(request: dict, client: OpenAI, db: Session):
thread_id = request["thread_id"]
prompt = request["question"]

logger.info(
f"[poll_run_and_prepare_response] Starting run for thread ID: {mask_string(thread_id)}"
)

try:
run = run_and_poll_thread(client, thread_id, request["assistant_id"])

status = run.status or "unknown"
response = None
error = None
Expand All @@ -225,11 +262,18 @@ def poll_run_and_prepare_response(request: dict, client: OpenAI, db: Session):
response = extract_response_from_thread(
client, thread_id, request.get("remove_citation", False)
)
logger.info(
f"[poll_run_and_prepare_response] Successfully executed run for thread ID: {mask_string(thread_id)}"
)

except openai.OpenAIError as e:
status = "failed"
error = str(e)
response = None
logger.error(
f"[poll_run_and_prepare_response] Run failed for thread ID {mask_string(thread_id)}: {error}",
exc_info=True,
)

upsert_thread_result(
db,
Expand Down Expand Up @@ -259,6 +303,9 @@ async def threads(
)
client, success = configure_openai(credentials)
if not success:
logger.error(
f"[threads] OpenAI API key not configured for this organization. | organization_id: {_current_user.organization_id}, project_id: {request.get('project_id')}"
)
return APIResponse.failure_response(
error="OpenAI API key not configured for this organization."
)
Expand Down Expand Up @@ -304,7 +351,9 @@ async def threads(
)
# Schedule background task
background_tasks.add_task(process_run, request, client, tracer)

logger.info(
f"[threads] Background task scheduled for thread ID: {mask_string(request.get('thread_id'))} | organization_id: {_current_user.organization_id}, project_id: {request.get('project_id')}"
)
return initial_response


Expand All @@ -325,6 +374,9 @@ async def threads_sync(
# Configure OpenAI client
client, success = configure_openai(credentials)
if not success:
logger.error(
f"[threads_sync] OpenAI API key not configured for this organization. | organization_id: {_current_user.organization_id}, project_id: {request.get('project_id')}"
)
return APIResponse.failure_response(
error="OpenAI API key not configured for this organization."
)
Expand All @@ -341,6 +393,7 @@ async def threads_sync(
is_valid, error_message = validate_thread(client, request.get("thread_id"))
if not is_valid:
raise Exception(error_message)

# Setup thread
is_success, error_message = setup_thread(client, request)
if not is_success:
Expand All @@ -360,11 +413,8 @@ async def threads_sync(
metadata={"thread_id": request.get("thread_id")},
)

try:
response, error_message = process_run_core(request, client, tracer)
return response
finally:
tracer.flush()
response, error_message = process_run_core(request, client, tracer)
return response


@router.post("/threads/start")
Expand All @@ -389,6 +439,9 @@ async def start_thread(
# Configure OpenAI client
client, success = configure_openai(credentials)
if not success:
logger.error(
f"[start_thread] OpenAI API key not configured for this organization. | project_id: {_current_user.project_id}"
)
return APIResponse.failure_response(
error="OpenAI API key not configured for this organization."
)
Expand All @@ -412,6 +465,9 @@ async def start_thread(

background_tasks.add_task(poll_run_and_prepare_response, request, client, db)

logger.info(
f"[start_thread] Background task scheduled to process response for thread ID: {mask_string(thread_id)} | project_id: {_current_user.project_id}"
)
return APIResponse.success_response(
data={
"thread_id": thread_id,
Expand All @@ -434,6 +490,9 @@ async def get_thread(
result = get_thread_result(db, thread_id)

if not result:
logger.error(
f"[get_thread] Thread result not found for ID: {mask_string(thread_id)} | org_id: {_current_user.organization_id}"
)
raise HTTPException(404, "thread not found")

status = result.status or ("success" if result.response else "processing")
Expand Down
11 changes: 10 additions & 1 deletion backend/app/crud/thread_results.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import logging
from sqlmodel import Session, select
from datetime import datetime
from app.models import OpenAIThreadCreate, OpenAI_Thread
from app.utils import mask_string

logger = logging.getLogger(__name__)


def upsert_thread_result(session: Session, data: OpenAIThreadCreate):
Expand All @@ -13,10 +17,15 @@ def upsert_thread_result(session: Session, data: OpenAIThreadCreate):
existing.status = data.status
existing.error = data.error
existing.updated_at = datetime.utcnow()
logger.info(
f"[upsert_thread_result] Updated existing thread result in the db with ID: {mask_string(data.thread_id)}"
)
else:
new_thread = OpenAI_Thread(**data.dict())
session.add(new_thread)

logger.info(
f"[upsert_thread_result] Created new thread result in the db with ID: {mask_string(new_thread.thread_id)}"
)
session.commit()


Expand Down