diff --git a/.env.example b/.env.example index 3fa7be346..b25b0f24e 100644 --- a/.env.example +++ b/.env.example @@ -73,3 +73,8 @@ CELERY_WORKER_PREFETCH_MULTIPLIER=1 CELERY_ENABLE_UTC=true # India Standard Time (UTC+05:30) CELERY_TIMEZONE=Asia/Kolkata + + +# Callback Timeouts (in seconds) +CALLBACK_CONNECT_TIMEOUT=3 +CALLBACK_READ_TIMEOUT=10 diff --git a/.env.test.example b/.env.test.example index 9065e4e68..f938561d9 100644 --- a/.env.test.example +++ b/.env.test.example @@ -28,3 +28,7 @@ AWS_ACCESS_KEY_ID=this_is_a_test_key AWS_SECRET_ACCESS_KEY=this_is_a_test_key AWS_DEFAULT_REGION=ap-south-1 AWS_S3_BUCKET_PREFIX="bucket-prefix-name" + +# Callback Timeouts (in seconds) +CALLBACK_CONNECT_TIMEOUT=3 +CALLBACK_READ_TIMEOUT=10 diff --git a/backend/app/alembic/versions/c6fb6d0b5897_create_job_table.py b/backend/app/alembic/versions/c6fb6d0b5897_create_job_table.py new file mode 100644 index 000000000..029bb1740 --- /dev/null +++ b/backend/app/alembic/versions/c6fb6d0b5897_create_job_table.py @@ -0,0 +1,44 @@ +"""create job table + +Revision ID: c6fb6d0b5897 +Revises: 6ed6ed401847 +Create Date: 2025-09-22 17:55:57.558157 + +""" +from alembic import op +import sqlalchemy as sa +import sqlmodel.sql.sqltypes + + +# revision identifiers, used by Alembic. +revision = "c6fb6d0b5897" +down_revision = "6ed6ed401847" +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.create_table( + "job", + sa.Column("id", sa.Uuid(), nullable=False), + sa.Column("task_id", sqlmodel.sql.sqltypes.AutoString(), nullable=True), + sa.Column("trace_id", sqlmodel.sql.sqltypes.AutoString(), nullable=True), + sa.Column("error_message", sqlmodel.sql.sqltypes.AutoString(), nullable=True), + sa.Column( + "status", + sa.Enum("PENDING", "PROCESSING", "SUCCESS", "FAILED", name="jobstatus"), + nullable=False, + ), + sa.Column("job_type", sa.Enum("RESPONSE", name="jobtype"), nullable=False), + sa.Column("created_at", sa.DateTime(), nullable=False), + sa.Column("updated_at", sa.DateTime(), nullable=False), + sa.PrimaryKeyConstraint("id"), + ) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! 
### + op.drop_table("job") + # ### end Alembic commands ### diff --git a/backend/app/api/routes/responses.py b/backend/app/api/routes/responses.py index 94e5f19db..c84659ee0 100644 --- a/backend/app/api/routes/responses.py +++ b/backend/app/api/routes/responses.py @@ -1,386 +1,62 @@ import logging -from typing import Optional import openai -from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException -from openai import OpenAI -from pydantic import BaseModel, Extra +from fastapi import APIRouter, Depends, HTTPException +from fastapi.responses import JSONResponse from sqlmodel import Session -from app.core.db import engine from app.api.deps import get_db, get_current_user_org_project -from app.api.routes.threads import send_callback -from app.crud.assistants import get_assistant_by_id +from app.core.langfuse.langfuse import LangfuseTracer from app.crud.credentials import get_provider_credential -from app.crud.openai_conversation import ( - create_conversation, - get_ancestor_id_from_response, - get_conversation_by_ancestor_id, +from app.models import ( + CallbackResponse, + Diagnostics, + ResponsesAPIRequest, + ResponseJobStatus, + ResponsesSyncAPIRequest, + UserProjectOrg, ) -from app.models import UserProjectOrg, OpenAIConversationCreate, OpenAIConversation -from app.utils import APIResponse, mask_string -from app.core.langfuse.langfuse import LangfuseTracer +from app.services.response.jobs import start_job +from app.services.response.response import get_file_search_results +from app.services.response.callbacks import get_additional_data +from app.utils import APIResponse, get_openai_client, handle_openai_error, mask_string + logger = logging.getLogger(__name__) router = APIRouter(tags=["responses"]) -def handle_openai_error(e: openai.OpenAIError) -> str: - """Extract error message from OpenAI error.""" - # Try to get error message from different possible attributes - if hasattr(e, "body") and isinstance(e.body, dict) and "message" in e.body: - return e.body["message"] - elif hasattr(e, "message"): - return e.message - elif hasattr(e, "response") and hasattr(e.response, "json"): - try: - error_data = e.response.json() - if isinstance(error_data, dict) and "error" in error_data: - error_info = error_data["error"] - if isinstance(error_info, dict) and "message" in error_info: - return error_info["message"] - except: - pass - return str(e) - - -class ResponsesAPIRequest(BaseModel): - assistant_id: str - question: str - callback_url: Optional[str] = None - response_id: Optional[str] = None - - class Config: - extra = Extra.allow - - -class ResponsesSyncAPIRequest(BaseModel): - model: str - instructions: str - vector_store_ids: list[str] - max_num_results: Optional[int] = 20 - temperature: Optional[float] = 0.1 - response_id: Optional[str] = None - question: str - - -class Diagnostics(BaseModel): - input_tokens: int - output_tokens: int - total_tokens: int - model: str - - -class FileResultChunk(BaseModel): - score: float - text: str - - -class _APIResponse(BaseModel): - status: str - response_id: str - message: str - chunks: list[FileResultChunk] - diagnostics: Optional[Diagnostics] = None - - class Config: - extra = Extra.allow - - -class ResponsesAPIResponse(APIResponse[_APIResponse]): - pass - - -def get_file_search_results(response): - results: list[FileResultChunk] = [] - for tool_call in response.output: - if tool_call.type == "file_search_call": - results.extend( - [FileResultChunk(score=hit.score, text=hit.text) for hit in results] - ) - return results - - -def 
get_additional_data(request: dict) -> dict: - """Extract additional data from request, excluding specific keys.""" - # Keys to exclude for async request (ResponsesAPIRequest) - async_exclude_keys = {"assistant_id", "callback_url", "response_id", "question"} - # Keys to exclude for sync request (ResponsesSyncAPIRequest) - sync_exclude_keys = { - "model", - "instructions", - "vector_store_ids", - "max_num_results", - "temperature", - "response_id", - "question", - } - - # Determine which keys to exclude based on the request structure - if "assistant_id" in request: - exclude_keys = async_exclude_keys - else: - exclude_keys = sync_exclude_keys - - return {k: v for k, v in request.items() if k not in exclude_keys} - - -def process_response( - request: ResponsesAPIRequest, - client: OpenAI, - assistant, - tracer: LangfuseTracer, - project_id: int, - organization_id: int, - ancestor_id: str, - latest_conversation: OpenAIConversation | None, -): - """Process a response and send callback with results, with Langfuse tracing.""" - logger.info( - f"[process_response] Starting generating response for assistant_id={mask_string(request.assistant_id)}, project_id={project_id}" - ) - - tracer.start_trace( - name="generate_response_async", - input={"question": request.question, "assistant_id": request.assistant_id}, - metadata={"callback_url": request.callback_url}, - tags=[request.assistant_id], - ) - - tracer.start_generation( - name="openai_response", - input={"question": request.question}, - metadata={"model": assistant.model, "temperature": assistant.temperature}, - ) - - try: - params = { - "model": assistant.model, - "previous_response_id": ancestor_id, - "instructions": assistant.instructions, - "temperature": assistant.temperature, - "input": [{"role": "user", "content": request.question}], - } - - if assistant.vector_store_ids: - params["tools"] = [ - { - "type": "file_search", - "vector_store_ids": assistant.vector_store_ids, - "max_num_results": assistant.max_num_results, - } - ] - params["include"] = ["file_search_call.results"] - - response = client.responses.create(**params) - - response_chunks = get_file_search_results(response) - - logger.info( - f"[process_response] Successfully generated response: response_id={response.id}, assistant={mask_string(request.assistant_id)}, project_id={project_id}" - ) - - tracer.end_generation( - output={ - "response_id": response.id, - "message": response.output_text, - }, - usage={ - "input": response.usage.input_tokens, - "output": response.usage.output_tokens, - "total": response.usage.total_tokens, - "unit": "TOKENS", - }, - model=response.model, - ) - - tracer.update_trace( - tags=[response.id], - output={ - "status": "success", - "message": response.output_text, - "error": None, - }, - ) - - with Session(engine) as session: - ancestor_response_id = ( - latest_conversation.ancestor_response_id - if latest_conversation - else get_ancestor_id_from_response( - session=session, - current_response_id=response.id, - previous_response_id=response.previous_response_id, - project_id=project_id, - ) - ) - - # Create conversation record in database - conversation_data = OpenAIConversationCreate( - response_id=response.id, - previous_response_id=response.previous_response_id, - ancestor_response_id=ancestor_response_id, - user_question=request.question, - response=response.output_text, - model=response.model, - assistant_id=request.assistant_id, - ) - - create_conversation( - session=session, - conversation=conversation_data, - project_id=project_id, - 
organization_id=organization_id, - ) - - request_dict = request.model_dump() - callback_response = ResponsesAPIResponse.success_response( - data=_APIResponse( - status="success", - response_id=response.id, - message=response.output_text, - chunks=response_chunks, - diagnostics=Diagnostics( - input_tokens=response.usage.input_tokens, - output_tokens=response.usage.output_tokens, - total_tokens=response.usage.total_tokens, - model=response.model, - ), - ) - ) - except openai.OpenAIError as e: - error_message = handle_openai_error(e) - logger.error( - f"[process_response] OpenAI API error during response processing: {error_message}, project_id={project_id}", - exc_info=True, - ) - tracer.log_error(error_message, response_id=request.response_id) - - request_dict = request.model_dump() - callback_response = ResponsesAPIResponse.failure_response(error=error_message) - - tracer.flush() - - if request.callback_url: - logger.info( - f"[process_response] Sending callback to URL: {request.callback_url}, assistant={mask_string(request.assistant_id)}, project_id={project_id}" - ) - - # Send callback with webhook-specific response format - callback_data = callback_response.model_dump() - send_callback( - request.callback_url, - { - "success": callback_data.get("success", False), - "data": { - **(callback_data.get("data") or {}), - **get_additional_data(request_dict), - }, - "error": callback_data.get("error"), - "metadata": None, - }, - ) - logger.info( - f"[process_response] Callback sent successfully, assistant={mask_string(request.assistant_id)}, project_id={project_id}" - ) - - -@router.post("/responses", response_model=dict) +@router.post("/responses", response_model=APIResponse[ResponseJobStatus]) async def responses( request: ResponsesAPIRequest, - background_tasks: BackgroundTasks, _session: Session = Depends(get_db), _current_user: UserProjectOrg = Depends(get_current_user_org_project), ): - """Asynchronous endpoint that processes requests in background with Langfuse tracing.""" - + """Asynchronous endpoint that processes requests using Celery.""" project_id, organization_id = ( _current_user.project_id, _current_user.organization_id, ) - assistant = get_assistant_by_id(_session, request.assistant_id, project_id) - if not assistant: - logger.warning( - f"[response] Assistant not found: assistant_id={mask_string(request.assistant_id)}, project_id={project_id}, organization_id={organization_id}", - ) - raise HTTPException(status_code=404, detail="Assistant not found or not active") - - credentials = get_provider_credential( - session=_session, - org_id=organization_id, - provider="openai", - project_id=project_id, - ) - if not credentials or "api_key" not in credentials: - logger.error( - f"[response] OpenAI API key not configured for org_id={organization_id}, project_id={project_id}" - ) - request_dict = request.model_dump() - additional_data = get_additional_data(request_dict) - return { - "success": False, - "error": "OpenAI API key not configured for this organization.", - "data": additional_data if additional_data else None, - "metadata": None, - } - - client = OpenAI(api_key=credentials["api_key"]) - - langfuse_credentials = get_provider_credential( - session=_session, - org_id=organization_id, - provider="langfuse", + start_job( + db=_session, + request=request, project_id=project_id, + organization_id=organization_id, ) - tracer = LangfuseTracer( - credentials=langfuse_credentials, - response_id=request.response_id, - ) - - ancestor_id = request.response_id - latest_conversation = None - 
if ancestor_id: - latest_conversation = get_conversation_by_ancestor_id( - session=_session, - ancestor_response_id=ancestor_id, - project_id=project_id, - ) - if latest_conversation: - ancestor_id = latest_conversation.response_id - - background_tasks.add_task( - process_response, - request, - client, - assistant, - tracer, - project_id, - organization_id, - ancestor_id, - latest_conversation, - ) - - logger.info( - f"[response] Background task scheduled for response processing: assistant_id={mask_string(request.assistant_id)}, project_id={project_id}, organization_id={organization_id}" - ) - request_dict = request.model_dump() + additional_data = get_additional_data(request_dict) - return { - "success": True, - "data": { - "status": "processing", - "message": "Response creation started", - **additional_data, - }, - "error": None, - "metadata": None, - } + response = ResponseJobStatus( + status="processing", + message="Your request is being processed. You will receive a callback once it's complete.", + **additional_data, + ) + return APIResponse.success_response(data=response) -@router.post("/responses/sync", response_model=ResponsesAPIResponse) +@router.post("/responses/sync", response_model=APIResponse[CallbackResponse]) async def responses_sync( request: ResponsesSyncAPIRequest, _session: Session = Depends(get_db), @@ -392,28 +68,21 @@ async def responses_sync( _current_user.organization_id, ) - credentials = get_provider_credential( - session=_session, - org_id=organization_id, - provider="openai", - project_id=project_id, - ) - if not credentials or "api_key" not in credentials: + try: + client = get_openai_client(_session, organization_id, project_id) + except HTTPException as e: request_dict = request.model_dump() - logger.error( - f"[response_sync] OpenAI API key not configured for org_id={organization_id}, project_id={project_id}" - ) - # Create a custom error response with additional data in data field additional_data = get_additional_data(request_dict) - return APIResponse( - success=False, - data=additional_data if additional_data else None, - error="OpenAI API key not configured for this organization.", - metadata=None, + return JSONResponse( + status_code=e.status_code, + content={ + "success": False, + "data": additional_data if additional_data else None, + "error": str(e.detail), + "metadata": None, + }, ) - client = OpenAI(api_key=credentials["api_key"]) - langfuse_credentials = get_provider_credential( session=_session, org_id=organization_id, @@ -484,8 +153,8 @@ async def responses_sync( request_dict = request.model_dump() additional_data = get_additional_data(request_dict) - return ResponsesAPIResponse.success_response( - data=_APIResponse( + return APIResponse.success_response( + data=CallbackResponse( status="success", response_id=response.id, message=response.output_text, @@ -511,9 +180,12 @@ async def responses_sync( request_dict = request.model_dump() # Create a custom error response with additional data in data field additional_data = get_additional_data(request_dict) - return ResponsesAPIResponse( - success=False, - data=additional_data if additional_data else None, - error=error_message, - metadata=None, + return JSONResponse( + status_code=400, + content={ + "success": False, + "data": additional_data if additional_data else None, + "error": error_message, + "metadata": None, + }, ) diff --git a/backend/app/core/config.py b/backend/app/core/config.py index 16f095647..515874af5 100644 --- a/backend/app/core/config.py +++ b/backend/app/core/config.py @@ -118,6 
+118,10 @@ def AWS_S3_BUCKET(self) -> str: CELERY_ENABLE_UTC: bool = True CELERY_TIMEZONE: str = "UTC" + # Callback timeouts (in seconds) + CALLBACK_CONNECT_TIMEOUT: int = 3 + CALLBACK_READ_TIMEOUT: int = 10 + @computed_field # type: ignore[prop-decorator] @property def COMPUTED_CELERY_WORKER_CONCURRENCY(self) -> int: diff --git a/backend/app/crud/__init__.py b/backend/app/crud/__init__.py index f73a07003..43ef15565 100644 --- a/backend/app/crud/__init__.py +++ b/backend/app/crud/__init__.py @@ -10,6 +10,8 @@ from .document_collection import DocumentCollectionCrud from .doc_transformation_job import DocTransformationJobCrud +from .jobs import JobCrud + from .organization import ( create_organization, get_organization_by_id, @@ -59,6 +61,7 @@ ) from .openai_conversation import ( + get_ancestor_id_from_response, get_conversation_by_id, get_conversation_by_response_id, get_conversation_by_ancestor_id, diff --git a/backend/app/crud/jobs.py b/backend/app/crud/jobs.py new file mode 100644 index 000000000..1a9005b69 --- /dev/null +++ b/backend/app/crud/jobs.py @@ -0,0 +1,42 @@ +import logging +from sqlmodel import Session +from uuid import UUID + +from app.models.job import Job, JobType, JobUpdate +from app.core.util import now + +logger = logging.getLogger(__name__) + + +class JobCrud: + def __init__(self, session: Session): + self.session = session + + def create(self, job_type: JobType, trace_id: str | None = None) -> Job: + new_job = Job( + job_type=job_type, + trace_id=trace_id, + ) + self.session.add(new_job) + self.session.commit() + self.session.refresh(new_job) + return new_job + + def update(self, job_id: UUID, job_update: JobUpdate) -> Job: + job = self.session.get(Job, job_id) + if not job: + raise ValueError(f"Job not found for job_id {job_id}") + + update_data = job_update.model_dump(exclude_unset=True) + for field, value in update_data.items(): + setattr(job, field, value) + + # SQLModel does not auto-update timestamps, so stamp the modification time explicitly. + job.updated_at = now() + self.session.add(job) + self.session.commit() + self.session.refresh(job) + + return job + + def get(self, job_id: UUID) -> Job | None: + return self.session.get(Job, job_id) diff --git a/backend/app/models/__init__.py b/backend/app/models/__init__.py index e00f5ef28..94c45ba3f 100644 --- a/backend/app/models/__init__.py +++ b/backend/app/models/__init__.py @@ -14,6 +14,9 @@ TransformationStatus, ) from .document_collection import DocumentCollection + +from .job import Job, JobType, JobStatus, JobUpdate + from .message import Message from .project_user import ( @@ -91,4 +94,13 @@ ModelEvaluationUpdate, ) +from .response import ( + CallbackResponse, + Diagnostics, + FileResultChunk, + ResponsesAPIRequest, + ResponseJobStatus, + ResponsesSyncAPIRequest, +) + from .onboarding import OnboardingRequest, OnboardingResponse diff --git a/backend/app/models/job.py b/backend/app/models/job.py new file mode 100644 index 000000000..4ddbd3b3e --- /dev/null +++ b/backend/app/models/job.py @@ -0,0 +1,49 @@ +from datetime import datetime +from enum import Enum +from uuid import uuid4, UUID + +from sqlmodel import SQLModel, Field +from app.core.util import now + + +class JobStatus(str, Enum): + PENDING = "PENDING" + PROCESSING = "PROCESSING" + SUCCESS = "SUCCESS" + FAILED = "FAILED" + + +class JobType(str, Enum): + RESPONSE = "RESPONSE" + + +class Job(SQLModel, table=True): + __tablename__ = "job" + + id: UUID = Field( + default_factory=uuid4, + primary_key=True, + ) + task_id: str | None = Field( + default=None, nullable=True, description="Celery task ID returned when job is queued." 
+ ) + trace_id: str | None = Field( + default=None, description="Tracing ID for correlating logs and traces." + ) + error_message: str | None = Field( + default=None, description="Error details if the job fails." + ) + status: JobStatus = Field( + default=JobStatus.PENDING, description="Current state of the job." + ) + job_type: JobType = Field( + description="Type of job being executed (e.g., response, ingestion)." + ) + created_at: datetime = Field(default_factory=now) + updated_at: datetime = Field(default_factory=now) + + +class JobUpdate(SQLModel): + status: JobStatus | None = None + error_message: str | None = None + task_id: str | None = None diff --git a/backend/app/models/response.py b/backend/app/models/response.py new file mode 100644 index 000000000..4de982de6 --- /dev/null +++ b/backend/app/models/response.py @@ -0,0 +1,54 @@ +from sqlmodel import SQLModel + + +class ResponsesAPIRequest(SQLModel): + assistant_id: str + question: str + callback_url: str | None = None + response_id: str | None = None + + class Config: + extra = "allow" + + +class ResponsesSyncAPIRequest(SQLModel): + model: str + instructions: str + vector_store_ids: list[str] + max_num_results: int = 20 + temperature: float = 0.1 + response_id: str | None = None + question: str + + class Config: + extra = "allow" + + +class ResponseJobStatus(SQLModel): + status: str + message: str | None = None + + class Config: + extra = "allow" + + +class Diagnostics(SQLModel): + input_tokens: int + output_tokens: int + total_tokens: int + model: str + + +class FileResultChunk(SQLModel): + score: float + text: str + + +class CallbackResponse(SQLModel): + status: str + response_id: str + message: str + diagnostics: Diagnostics | None = None + + class Config: + extra = "allow" diff --git a/backend/app/services/response/callbacks.py b/backend/app/services/response/callbacks.py new file mode 100644 index 000000000..fbf0af1f5 --- /dev/null +++ b/backend/app/services/response/callbacks.py @@ -0,0 +1,37 @@ +from app.models import ResponsesAPIRequest, ResponsesSyncAPIRequest +from app.utils import APIResponse, send_callback + + +def get_additional_data(request: dict) -> dict: + """ + Returns extra metadata included in the request payload + that is not part of the async or sync request models. 
+ """ + + if "assistant_id" in request: + exclude_keys = set(ResponsesAPIRequest.model_fields.keys()) + else: + exclude_keys = set(ResponsesSyncAPIRequest.model_fields.keys()) + return {k: v for k, v in request.items() if k not in exclude_keys} + + +def send_response_callback( + callback_url: str, + callback_response: APIResponse, + request_dict: dict, +) -> None: + """Send a standardized callback response to the provided callback URL.""" + + callback_response = callback_response.model_dump() + send_callback( + callback_url, + { + "success": callback_response.get("success", False), + "data": { + **(callback_response.get("data") or {}), + **get_additional_data(request_dict), + }, + "error": callback_response.get("error"), + "metadata": None, + }, + ) diff --git a/backend/app/services/response/jobs.py b/backend/app/services/response/jobs.py new file mode 100644 index 000000000..cab0f0e83 --- /dev/null +++ b/backend/app/services/response/jobs.py @@ -0,0 +1,76 @@ +import logging +from uuid import UUID +from fastapi import HTTPException +from sqlmodel import Session +from asgi_correlation_id import correlation_id +from app.crud import JobCrud +from app.models import JobType, JobStatus, JobUpdate, ResponsesAPIRequest +from app.celery.utils import start_high_priority_job + +from app.services.response.response import process_response +from app.services.response.callbacks import send_response_callback + +logger = logging.getLogger(__name__) + + +def start_job( + db: Session, request: ResponsesAPIRequest, project_id: int, organization_id: int +) -> UUID: + """Create a response job and schedule Celery task.""" + trace_id = correlation_id.get() or "N/A" + job_crud = JobCrud(session=db) + job = job_crud.create(job_type=JobType.RESPONSE, trace_id=trace_id) + + try: + task_id = start_high_priority_job( + function_path="app.services.response.jobs.execute_job", + project_id=project_id, + job_id=str(job.id), + trace_id=trace_id, + request_data=request.model_dump(), + organization_id=organization_id, + ) + except Exception as e: + logger.error( + f"[start_job] Error starting Celery task : {str(e)} | job_id={job.id}, project_id={project_id}", + exc_info=True, + ) + job_update = JobUpdate(status=JobStatus.FAILED, error_message=str(e)) + job_crud.update(job_id=job.id, job_update=job_update) + raise HTTPException( + status_code=500, detail="Internal server error while generating response" + ) + + logger.info( + f"[start_job] Job scheduled to generate response | job_id={job.id}, project_id={project_id}, task_id={task_id}" + ) + return job.id + + +def execute_job( + request_data: dict, + project_id: int, + organization_id: int, + job_id: str, + task_id: str, + task_instance, +) -> None: + """Celery task to process a response request asynchronously.""" + request_data: ResponsesAPIRequest = ResponsesAPIRequest(**request_data) + job_id = UUID(job_id) + + response = process_response( + request=request_data, + project_id=project_id, + organization_id=organization_id, + job_id=job_id, + task_id=task_id, + task_instance=task_instance, + ) + + if request_data.callback_url: + send_response_callback( + callback_url=request_data.callback_url, + callback_response=response, + request_dict=request_data.model_dump(), + ) diff --git a/backend/app/services/response/response.py b/backend/app/services/response/response.py new file mode 100644 index 000000000..681606406 --- /dev/null +++ b/backend/app/services/response/response.py @@ -0,0 +1,284 @@ +import logging +from uuid import UUID + +import openai +from openai import OpenAI 
+from openai.types.responses.response import Response +from fastapi import HTTPException +from sqlmodel import Session + +from app.core.db import engine +from app.core.langfuse.langfuse import LangfuseTracer +from app.crud import ( + JobCrud, + get_assistant_by_id, + get_provider_credential, + create_conversation, + get_ancestor_id_from_response, + get_conversation_by_ancestor_id, +) +from app.models import ( + CallbackResponse, + Diagnostics, + FileResultChunk, + Assistant, + JobStatus, + JobUpdate, + ResponsesAPIRequest, + OpenAIConversationCreate, + OpenAIConversation, +) +from app.utils import ( + APIResponse, + get_openai_client, + handle_openai_error, + mask_string, +) + +logger = logging.getLogger(__name__) + + +def get_file_search_results(response: Response) -> list[FileResultChunk]: + """Extract file search results from a response.""" + results: list[FileResultChunk] = [] + for tool_call in response.output: + if tool_call.type == "file_search_call": + results.extend( + FileResultChunk(score=hit.score, text=hit.text) + for hit in tool_call.results + ) + return results + + +def _build_callback_response(response: Response) -> CallbackResponse: + """Build callback response with diagnostics and search results.""" + return CallbackResponse( + status="success", + response_id=response.id, + message=response.output_text, + diagnostics=Diagnostics( + input_tokens=response.usage.input_tokens, + output_tokens=response.usage.output_tokens, + total_tokens=response.usage.total_tokens, + model=response.model, + ), + ) + + +def _fail_job(job_id: UUID, error_message: str) -> APIResponse: + with Session(engine) as session: + JobCrud(session=session).update( + job_id=job_id, + job_update=JobUpdate( + status=JobStatus.FAILED, + error_message=error_message, + ), + ) + return APIResponse.failure_response(error=error_message) + + +def generate_response( + tracer: LangfuseTracer, + client: OpenAI, + assistant: Assistant, + request: ResponsesAPIRequest, + ancestor_id: str | None, +) -> tuple[Response | None, str | None]: + """Generate a response using OpenAI and track with Langfuse.""" + response: Response | None = None + error_message: str | None = None + + try: + tracer.start_trace( + name="generate_response_async", + input={ + "question": request.question, + "assistant_id": assistant.assistant_id, + }, + metadata={"callback_url": request.callback_url}, + tags=[assistant.assistant_id], + ) + tracer.start_generation( + name="openai_response", + input={"question": request.question}, + metadata={"model": assistant.model, "temperature": assistant.temperature}, + ) + + params: dict = { + "model": assistant.model, + "instructions": assistant.instructions, + "temperature": assistant.temperature, + "input": [{"role": "user", "content": request.question}], + } + if ancestor_id: + params["previous_response_id"] = ancestor_id + + if assistant.vector_store_ids: + params["tools"] = [ + { + "type": "file_search", + "vector_store_ids": assistant.vector_store_ids, + "max_num_results": assistant.max_num_results, + } + ] + params["include"] = ["file_search_call.results"] + + response = client.responses.create(**params) + + tracer.end_generation( + output={"response_id": response.id, "message": response.output_text}, + usage={ + "input": response.usage.input_tokens, + "output": response.usage.output_tokens, + "total": response.usage.total_tokens, + "unit": "TOKENS", + }, + model=response.model, + ) + tracer.update_trace( + tags=[response.id], + output={ + "status": "success", + "message": response.output_text, + "error": 
None, + }, + ) + + except openai.OpenAIError as e: + error_message = handle_openai_error(e) + logger.error( + f"[process_response_task] OpenAI API error: {error_message}", + exc_info=True, + ) + if tracer: + tracer.log_error(error_message, response_id=request.response_id) + + return response, error_message + + +def persist_conversation( + response: Response, + request: ResponsesAPIRequest, + project_id: int, + organization_id: int, + job_id: UUID, + assistant_id: str, + latest_conversation: OpenAIConversation | None, +) -> None: + """Persist conversation and mark job as successful.""" + with Session(engine) as session: + ancestor_response_id = ( + latest_conversation.ancestor_response_id + if latest_conversation + else get_ancestor_id_from_response( + session=session, + current_response_id=response.id, + previous_response_id=response.previous_response_id, + project_id=project_id, + ) + ) + + create_conversation( + session=session, + conversation=OpenAIConversationCreate( + response_id=response.id, + previous_response_id=response.previous_response_id, + ancestor_response_id=ancestor_response_id, + user_question=request.question, + response=response.output_text, + model=response.model, + assistant_id=assistant_id, + ), + project_id=project_id, + organization_id=organization_id, + ) + + JobCrud(session=session).update( + job_id=job_id, + job_update=JobUpdate(status=JobStatus.SUCCESS), + ) + + +def process_response( + request: ResponsesAPIRequest, + project_id: int, + organization_id: int, + job_id: UUID, + task_id: str, + task_instance, +) -> APIResponse: + assistant_id = request.assistant_id + logger.info( + f"[process_response_task] Generating response for " + f"assistant_id={mask_string(assistant_id)}, " + f"project_id={project_id}, task_id={task_id}, job_id={job_id}" + ) + + latest_conversation: OpenAIConversation | None = None + + try: + with Session(engine) as session: + JobCrud(session=session).update( + job_id=job_id, + job_update=JobUpdate(status=JobStatus.PROCESSING, task_id=task_id), + ) + + assistant = get_assistant_by_id(session, assistant_id, project_id) + if not assistant: + logger.error( + f"[process_response_task] Assistant not found: " + f"assistant_id={mask_string(assistant_id)}, project_id={project_id}" + ) + return _fail_job(job_id, "Assistant not found or not active") + + try: + client = get_openai_client(session, organization_id, project_id) + except HTTPException as e: + return _fail_job(job_id, str(e.detail)) + + langfuse_credentials = get_provider_credential( + session=session, + org_id=organization_id, + provider="langfuse", + project_id=project_id, + ) + + ancestor_id = request.response_id + if ancestor_id: + latest_conversation = get_conversation_by_ancestor_id( + session, + ancestor_response_id=ancestor_id, + project_id=project_id, + ) + if latest_conversation: + ancestor_id = latest_conversation.response_id + + tracer = LangfuseTracer( + credentials=langfuse_credentials, + response_id=request.response_id, + ) + response, error_message = generate_response( + tracer=tracer, + client=client, + assistant=assistant, + request=request, + ancestor_id=ancestor_id, + ) + + if response: + persist_conversation( + response, + request, + project_id, + organization_id, + job_id, + assistant_id, + latest_conversation, + ) + return APIResponse.success_response(data=_build_callback_response(response)) + else: + return _fail_job(job_id, error_message or "Unknown error") + + except Exception as e: + logger.error(f"[process_response_task] Unexpected error: {e}", exc_info=True) + 
return _fail_job(job_id, f"Unexpected error: {str(e)}") diff --git a/backend/app/tests/api/routes/test_responses.py b/backend/app/tests/api/routes/test_responses.py index 54a28ca12..3ee85e600 100644 --- a/backend/app/tests/api/routes/test_responses.py +++ b/backend/app/tests/api/routes/test_responses.py @@ -1,656 +1,30 @@ -from unittest.mock import MagicMock, patch -from app.api.routes.responses import process_response +from unittest.mock import patch +from fastapi.testclient import TestClient +from app.models import ResponsesAPIRequest -def create_mock_assistant(model="gpt-4o", vector_store_ids=None, max_num_results=20): - """Create a mock assistant with default or custom values.""" - if vector_store_ids is None: - vector_store_ids = ["vs_test"] - - mock_assistant = MagicMock() - mock_assistant.model = model - mock_assistant.instructions = "Test instructions" - mock_assistant.temperature = 0.1 - mock_assistant.vector_store_ids = vector_store_ids - mock_assistant.max_num_results = max_num_results - return mock_assistant - - -def create_mock_openai_response( - response_id="resp_1234567890abcdef1234567890abcdef1234567890", - output_text="Test output", - model="gpt-4o", - output=None, - previous_response_id=None, -): - """Create a mock OpenAI response with default or custom values.""" - if output is None: - output = [] - - mock_response = MagicMock() - mock_response.id = response_id - mock_response.output_text = output_text - mock_response.model = model - mock_response.usage.input_tokens = 10 - mock_response.usage.output_tokens = 5 - mock_response.usage.total_tokens = 15 - mock_response.output = output - mock_response.previous_response_id = previous_response_id - return mock_response - - -def create_mock_conversation( - response_id="resp_latest1234567890abcdef1234567890", - ancestor_response_id="resp_ancestor1234567890abcdef1234567890", -): - """Create a mock conversation with default or custom values.""" - mock_conversation = MagicMock() - mock_conversation.response_id = response_id - mock_conversation.ancestor_response_id = ancestor_response_id - return mock_conversation - - -def setup_common_mocks( - mock_get_credential, - mock_get_assistant, - mock_openai, - mock_tracer_class, - mock_get_ancestor_id_from_response, - mock_create_conversation, - mock_get_conversation_by_ancestor_id, - assistant_model="gpt-4o", - vector_store_ids=None, - conversation_found=True, - response_output=None, -): - """Setup common mocks used across multiple tests.""" - # Setup mock credentials - mock_get_credential.return_value = {"api_key": "test_api_key"} - - # Setup mock assistant - mock_assistant = create_mock_assistant(assistant_model, vector_store_ids) - mock_get_assistant.return_value = mock_assistant - - # Setup mock OpenAI client - mock_client = MagicMock() - mock_openai.return_value = mock_client - - # Setup mock response - mock_response = create_mock_openai_response(output=response_output) - mock_client.responses.create.return_value = mock_response - - # Setup mock tracer - mock_tracer = MagicMock() - mock_tracer_class.return_value = mock_tracer - - # Setup mock CRUD functions - mock_get_ancestor_id_from_response.return_value = ( - "resp_ancestor1234567890abcdef1234567890" - ) - mock_create_conversation.return_value = None - - # Setup mock conversation if needed - if conversation_found: - mock_conversation = create_mock_conversation() - mock_get_conversation_by_ancestor_id.return_value = mock_conversation - else: - mock_get_conversation_by_ancestor_id.return_value = None - - return mock_client, 
mock_assistant - - -@patch("app.api.routes.responses.process_response") -@patch("app.api.routes.responses.OpenAI") -@patch("app.api.routes.responses.get_provider_credential") -@patch("app.api.routes.responses.get_assistant_by_id") -@patch("app.api.routes.responses.LangfuseTracer") -@patch("app.api.routes.responses.get_ancestor_id_from_response") -@patch("app.api.routes.responses.create_conversation") -@patch("app.api.routes.responses.get_conversation_by_ancestor_id") -def test_responses_endpoint_success( - mock_get_conversation_by_ancestor_id, - mock_create_conversation, - mock_get_ancestor_id_from_response, - mock_tracer_class, - mock_get_assistant, - mock_get_credential, - mock_openai, - mock_process_response, - user_api_key_header: dict[str, str], - client, -): - """Test the /responses endpoint for successful response creation.""" - - # Mock the background task to prevent actual execution - mock_process_response.return_value = None - - # Setup common mocks - setup_common_mocks( - mock_get_credential, - mock_get_assistant, - mock_openai, - mock_tracer_class, - mock_get_ancestor_id_from_response, - mock_create_conversation, - mock_get_conversation_by_ancestor_id, - ) - - request_data = { - "assistant_id": "assistant_dalgo", - "question": "What is Dalgo?", - "callback_url": "http://example.com/callback", - } - - response = client.post( - "/api/v1/responses", json=request_data, headers=user_api_key_header - ) - - assert response.status_code == 200 - response_json = response.json() - assert response_json["success"] is True - assert response_json["data"]["status"] == "processing" - assert response_json["data"]["message"] == "Response creation started" - - # Verify that the background task was scheduled with correct parameters - mock_process_response.assert_called_once() - call_args = mock_process_response.call_args - assert call_args[0][0].assistant_id == "assistant_dalgo" - assert call_args[0][0].question == "What is Dalgo?" 
- assert call_args[0][0].callback_url == "http://example.com/callback" - assert call_args[0][0].response_id is None - - -@patch("app.api.routes.responses.process_response") -@patch("app.api.routes.responses.OpenAI") -@patch("app.api.routes.responses.get_provider_credential") -@patch("app.api.routes.responses.get_assistant_by_id") -@patch("app.api.routes.responses.LangfuseTracer") -@patch("app.api.routes.responses.get_ancestor_id_from_response") -@patch("app.api.routes.responses.create_conversation") -@patch("app.api.routes.responses.get_conversation_by_ancestor_id") -def test_responses_endpoint_without_vector_store( - mock_get_conversation_by_ancestor_id, - mock_create_conversation, - mock_get_ancestor_id_from_response, - mock_tracer_class, - mock_get_assistant, - mock_get_credential, - mock_openai, - mock_process_response, - user_api_key_header, - client, -): - """Test the /responses endpoint when assistant has no vector store configured.""" - # Mock the background task to prevent actual execution - mock_process_response.return_value = None - - # Setup common mocks with no vector store - mock_client, mock_assistant = setup_common_mocks( - mock_get_credential, - mock_get_assistant, - mock_openai, - mock_tracer_class, - mock_get_ancestor_id_from_response, - mock_create_conversation, - mock_get_conversation_by_ancestor_id, - assistant_model="gpt-4", - vector_store_ids=[], - ) - - request_data = { - "assistant_id": "assistant_123", - "question": "What is Glific?", - "callback_url": "http://example.com/callback", - } - - response = client.post( - "/api/v1/responses", json=request_data, headers=user_api_key_header - ) - assert response.status_code == 200 - response_json = response.json() - assert response_json["success"] is True - assert response_json["data"]["status"] == "processing" - assert response_json["data"]["message"] == "Response creation started" - - # Verify that the background task was scheduled with correct parameters - mock_process_response.assert_called_once() - call_args = mock_process_response.call_args - assert call_args[0][0].assistant_id == "assistant_123" - assert call_args[0][0].question == "What is Glific?" 
- assert call_args[0][0].callback_url == "http://example.com/callback" - assert call_args[0][0].response_id is None - - -@patch("app.api.routes.responses.get_assistant_by_id") -def test_responses_endpoint_assistant_not_found( - mock_get_assistant, - user_api_key_header, - client, -): - """Test the /responses endpoint when assistant is not found.""" - # Setup mock assistant to return None (not found) - mock_get_assistant.return_value = None - - request_data = { - "assistant_id": "nonexistent_assistant", - "question": "What is this?", - "callback_url": "http://example.com/callback", - } - - response = client.post( - "/api/v1/responses", json=request_data, headers=user_api_key_header - ) - assert response.status_code == 404 - response_json = response.json() - assert response_json["success"] is False - assert response_json["error"] == "Assistant not found or not active" - - -@patch("app.api.routes.responses.get_provider_credential") -@patch("app.api.routes.responses.get_assistant_by_id") -@patch("app.api.routes.responses.process_response") -def test_responses_endpoint_no_openai_credentials( - mock_process_response, - mock_get_assistant, - mock_get_credential, - user_api_key_header, - client, -): - """Test the /responses endpoint when OpenAI credentials are not configured.""" - # Setup mock assistant - mock_assistant = create_mock_assistant() - mock_get_assistant.return_value = mock_assistant - - # Setup mock credentials to return None (no credentials) - mock_get_credential.return_value = None - - request_data = { - "assistant_id": "assistant_123", - "question": "What is this?", - "callback_url": "http://example.com/callback", - } - - response = client.post( - "/api/v1/responses", json=request_data, headers=user_api_key_header - ) - assert response.status_code == 200 - response_json = response.json() - assert response_json["success"] is False - assert "OpenAI API key not configured" in response_json["error"] - # Ensure background task was not scheduled - mock_process_response.assert_not_called() - - -@patch("app.api.routes.responses.get_provider_credential") -@patch("app.api.routes.responses.get_assistant_by_id") -@patch("app.api.routes.responses.process_response") -def test_responses_endpoint_missing_api_key_in_credentials( - mock_process_response, - mock_get_assistant, - mock_get_credential, - user_api_key_header, - client, -): - """Test the /responses endpoint when credentials exist but don't have api_key.""" - # Setup mock assistant - mock_assistant = create_mock_assistant() - mock_get_assistant.return_value = mock_assistant - - # Setup mock credentials without api_key - mock_get_credential.return_value = {"other_key": "value"} - - request_data = { - "assistant_id": "assistant_123", - "question": "What is this?", - "callback_url": "http://example.com/callback", - } - - response = client.post( - "/api/v1/responses", json=request_data, headers=user_api_key_header - ) - assert response.status_code == 200 - response_json = response.json() - assert response_json["success"] is False - assert "OpenAI API key not configured" in response_json["error"] - # Ensure background task was not scheduled - mock_process_response.assert_not_called() - - -@patch("app.api.routes.responses.process_response") -@patch("app.api.routes.responses.OpenAI") -@patch("app.api.routes.responses.get_provider_credential") -@patch("app.api.routes.responses.get_assistant_by_id") -@patch("app.api.routes.responses.LangfuseTracer") -@patch("app.api.routes.responses.get_ancestor_id_from_response") 
-@patch("app.api.routes.responses.create_conversation") -@patch("app.api.routes.responses.get_conversation_by_ancestor_id") -def test_responses_endpoint_with_ancestor_conversation_found( - mock_get_conversation_by_ancestor_id, - mock_create_conversation, - mock_get_ancestor_id_from_response, - mock_tracer_class, - mock_get_assistant, - mock_get_credential, - mock_openai, - mock_process_response, - user_api_key_header: dict[str, str], - client, +def test_responses_async_success( + client: TestClient, user_api_key_header: dict[str, str] ): - """Test the /responses endpoint when a conversation is found by ancestor ID.""" - # Mock the background task to prevent actual execution - mock_process_response.return_value = None - - # Setup common mocks with conversation found - mock_client, mock_assistant = setup_common_mocks( - mock_get_credential, - mock_get_assistant, - mock_openai, - mock_tracer_class, - mock_get_ancestor_id_from_response, - mock_create_conversation, - mock_get_conversation_by_ancestor_id, - conversation_found=True, - ) - - request_data = { - "assistant_id": "assistant_dalgo", - "question": "What is Dalgo?", - "callback_url": "http://example.com/callback", - "response_id": "resp_ancestor1234567890abcdef1234567890", - } - - response = client.post( - "/api/v1/responses", json=request_data, headers=user_api_key_header - ) - - assert response.status_code == 200 - response_json = response.json() - assert response_json["success"] is True - assert response_json["data"]["status"] == "processing" - assert response_json["data"]["message"] == "Response creation started" - - # Verify that the background task was scheduled with correct parameters - mock_process_response.assert_called_once() - call_args = mock_process_response.call_args - assert call_args[0][0].assistant_id == "assistant_dalgo" - assert call_args[0][0].question == "What is Dalgo?" 
- assert call_args[0][0].callback_url == "http://example.com/callback" - assert call_args[0][0].response_id == "resp_ancestor1234567890abcdef1234567890" - - -@patch("app.api.routes.responses.process_response") -@patch("app.api.routes.responses.OpenAI") -@patch("app.api.routes.responses.get_provider_credential") -@patch("app.api.routes.responses.get_assistant_by_id") -@patch("app.api.routes.responses.LangfuseTracer") -@patch("app.api.routes.responses.get_ancestor_id_from_response") -@patch("app.api.routes.responses.create_conversation") -@patch("app.api.routes.responses.get_conversation_by_ancestor_id") -def test_responses_endpoint_with_ancestor_conversation_not_found( - mock_get_conversation_by_ancestor_id, - mock_create_conversation, - mock_get_ancestor_id_from_response, - mock_tracer_class, - mock_get_assistant, - mock_get_credential, - mock_openai, - mock_process_response, - user_api_key_header: dict[str, str], - client, -): - """Test the /responses endpoint when no conversation is found by ancestor ID.""" - # Mock the background task to prevent actual execution - mock_process_response.return_value = None - - # Setup common mocks with conversation not found - mock_client, mock_assistant = setup_common_mocks( - mock_get_credential, - mock_get_assistant, - mock_openai, - mock_tracer_class, - mock_get_ancestor_id_from_response, - mock_create_conversation, - mock_get_conversation_by_ancestor_id, - conversation_found=False, - ) - - request_data = { - "assistant_id": "assistant_dalgo", - "question": "What is Dalgo?", - "callback_url": "http://example.com/callback", - "response_id": "resp_ancestor1234567890abcdef1234567890", - } - - response = client.post( - "/api/v1/responses", json=request_data, headers=user_api_key_header - ) - - assert response.status_code == 200 - response_json = response.json() - assert response_json["success"] is True - assert response_json["data"]["status"] == "processing" - assert response_json["data"]["message"] == "Response creation started" - - # Verify that the background task was scheduled with correct parameters - mock_process_response.assert_called_once() - call_args = mock_process_response.call_args - assert call_args[0][0].assistant_id == "assistant_dalgo" - assert call_args[0][0].question == "What is Dalgo?" 
- assert call_args[0][0].callback_url == "http://example.com/callback" - assert call_args[0][0].response_id == "resp_ancestor1234567890abcdef1234567890" - - -@patch("app.api.routes.responses.process_response") -@patch("app.api.routes.responses.OpenAI") -@patch("app.api.routes.responses.get_provider_credential") -@patch("app.api.routes.responses.get_assistant_by_id") -@patch("app.api.routes.responses.LangfuseTracer") -@patch("app.api.routes.responses.get_ancestor_id_from_response") -@patch("app.api.routes.responses.create_conversation") -@patch("app.api.routes.responses.get_conversation_by_ancestor_id") -def test_responses_endpoint_without_response_id( - mock_get_conversation_by_ancestor_id, - mock_create_conversation, - mock_get_ancestor_id_from_response, - mock_tracer_class, - mock_get_assistant, - mock_get_credential, - mock_openai, - mock_process_response, - user_api_key_header: dict[str, str], - client, -): - """Test the /responses endpoint when no response_id is provided.""" - # Mock the background task to prevent actual execution - mock_process_response.return_value = None - - # Setup common mocks - mock_client, mock_assistant = setup_common_mocks( - mock_get_credential, - mock_get_assistant, - mock_openai, - mock_tracer_class, - mock_get_ancestor_id_from_response, - mock_create_conversation, - mock_get_conversation_by_ancestor_id, - ) - - request_data = { - "assistant_id": "assistant_dalgo", - "question": "What is Dalgo?", - "callback_url": "http://example.com/callback", - # No response_id provided - } - - response = client.post( - "/api/v1/responses", json=request_data, headers=user_api_key_header - ) - - assert response.status_code == 200 - response_json = response.json() - assert response_json["success"] is True - assert response_json["data"]["status"] == "processing" - assert response_json["data"]["message"] == "Response creation started" - - # Verify that the background task was scheduled with correct parameters - mock_process_response.assert_called_once() - call_args = mock_process_response.call_args - assert call_args[0][0].assistant_id == "assistant_dalgo" - assert call_args[0][0].question == "What is Dalgo?" 
- assert call_args[0][0].callback_url == "http://example.com/callback" - assert call_args[0][0].response_id is None - - -@patch("app.api.routes.responses.get_conversation_by_ancestor_id") -@patch("app.api.routes.responses.create_conversation") -@patch("app.api.routes.responses.get_ancestor_id_from_response") -@patch("app.api.routes.responses.send_callback") -def test_process_response_ancestor_conversation_found( - mock_send_callback, - mock_get_ancestor_id_from_response, - mock_create_conversation, - mock_get_conversation_by_ancestor_id, - db, - user_api_key, -): - """Test process_response function when ancestor conversation is found.""" - from app.api.routes.responses import ResponsesAPIRequest - - # Setup mock request - request = ResponsesAPIRequest( - assistant_id="assistant_dalgo", - question="What is Dalgo?", - callback_url="http://example.com/callback", - response_id="resp_ancestor1234567890abcdef1234567890", - ) - - # Setup mock assistant - mock_assistant = create_mock_assistant() - - # Setup mock OpenAI client - mock_client = MagicMock() - mock_response = create_mock_openai_response( - response_id="resp_new1234567890abcdef1234567890abcdef", - output_text="Test response", - previous_response_id="resp_latest1234567890abcdef1234567890", - ) - mock_client.responses.create.return_value = mock_response - - # Setup mock tracer - mock_tracer = MagicMock() - - # Setup mock conversation found by ancestor ID - mock_conversation = create_mock_conversation() - mock_get_conversation_by_ancestor_id.return_value = mock_conversation - - # Setup mock CRUD functions - mock_get_ancestor_id_from_response.return_value = ( - "resp_ancestor1234567890abcdef1234567890" - ) - mock_create_conversation.return_value = None - - # Call process_response - process_response( - request=request, - client=mock_client, - assistant=mock_assistant, - tracer=mock_tracer, - project_id=user_api_key.project_id, - organization_id=user_api_key.organization_id, - ancestor_id=mock_conversation.response_id, - latest_conversation=mock_conversation, - ) - - # process_response doesn't call get_conversation_by_ancestor_id; endpoint resolves it - mock_get_conversation_by_ancestor_id.assert_not_called() - - # Verify OpenAI client was called with the conversation's response_id as - # previous_response_id - mock_client.responses.create.assert_called_once() - call_args = mock_client.responses.create.call_args[1] - assert call_args["previous_response_id"] == ( - "resp_latest1234567890abcdef1234567890" - ) - - # Verify create_conversation was called - mock_create_conversation.assert_called_once() - - # Verify send_callback was called - mock_send_callback.assert_called_once() - - -@patch("app.api.routes.responses.get_conversation_by_ancestor_id") -@patch("app.api.routes.responses.create_conversation") -@patch("app.api.routes.responses.get_ancestor_id_from_response") -@patch("app.api.routes.responses.send_callback") -def test_process_response_ancestor_conversation_not_found( - mock_send_callback, - mock_get_ancestor_id_from_response, - mock_create_conversation, - mock_get_conversation_by_ancestor_id, - db, - user_api_key, -): - """Test process_response function when no ancestor conversation is found.""" - from app.api.routes.responses import ResponsesAPIRequest - - # Setup mock request - request = ResponsesAPIRequest( - assistant_id="assistant_dalgo", - question="What is Dalgo?", - callback_url="http://example.com/callback", - response_id="resp_ancestor1234567890abcdef1234567890", - ) - - # Setup mock assistant - mock_assistant = 
create_mock_assistant() - - # Setup mock OpenAI client - mock_client = MagicMock() - mock_response = create_mock_openai_response( - response_id="resp_new1234567890abcdef1234567890abcdef", - output_text="Test response", - previous_response_id="resp_ancestor1234567890abcdef1234567890", - ) - mock_client.responses.create.return_value = mock_response - - # Setup mock tracer - mock_tracer = MagicMock() - - # Setup mock conversation not found by ancestor ID - mock_get_conversation_by_ancestor_id.return_value = None - - # Setup mock CRUD functions - mock_get_ancestor_id_from_response.return_value = ( - "resp_ancestor1234567890abcdef1234567890" - ) - mock_create_conversation.return_value = None - - # Call process_response - process_response( - request=request, - client=mock_client, - assistant=mock_assistant, - tracer=mock_tracer, - project_id=user_api_key.project_id, - organization_id=user_api_key.organization_id, - ancestor_id=request.response_id, - latest_conversation=None, - ) + with patch("app.api.routes.responses.start_job") as mock_start_job: + payload = ResponsesAPIRequest( + assistant_id="assistant_123", + question="What is the capital of France?", + callback_url="http://example.com/callback", + response_id="response_123", + extra_field="extra_value", + ) - # process_response doesn't call get_conversation_by_ancestor_id; endpoint resolves it - mock_get_conversation_by_ancestor_id.assert_not_called() + response = client.post( + "api/v1/responses", json=payload.model_dump(), headers=user_api_key_header + ) - # Verify OpenAI client was called with the original response_id as - # previous_response_id - mock_client.responses.create.assert_called_once() - call_args = mock_client.responses.create.call_args[1] - assert call_args["previous_response_id"] == ( - "resp_ancestor1234567890abcdef1234567890" - ) + assert response.status_code == 200 + response_data = response.json() - # Verify create_conversation was called - mock_create_conversation.assert_called_once() + assert response_data["success"] is True + assert response_data["data"]["status"] == "processing" + assert "Your request is being processed" in response_data["data"]["message"] + assert response_data["data"]["extra_field"] == "extra_value" - # Verify send_callback was called - mock_send_callback.assert_called_once() + mock_start_job.assert_called_once() diff --git a/backend/app/tests/crud/test_jobs.py b/backend/app/tests/crud/test_jobs.py new file mode 100644 index 000000000..4c4aacede --- /dev/null +++ b/backend/app/tests/crud/test_jobs.py @@ -0,0 +1,60 @@ +from uuid import uuid4 +import pytest +from sqlmodel import Session +from app.crud import JobCrud +from app.models import JobUpdate, JobStatus, JobType + + +@pytest.fixture +def dummy_jobs(db: Session): + """Create and return a list of dummy jobs for testing.""" + crud = JobCrud(db) + + jobs = [ + crud.create(job_type=JobType.RESPONSE, trace_id="trace-1"), + crud.create(job_type=JobType.RESPONSE, trace_id="trace-2"), + crud.create(job_type=JobType.RESPONSE, trace_id="trace-3"), + ] + + return jobs + + +def test_create_job(db: Session): + crud = JobCrud(db) + job = crud.create(job_type=JobType.RESPONSE, trace_id="trace-123") + + assert job.id is not None + assert job.trace_id == "trace-123" + assert job.status == JobStatus.PENDING + + +def test_get_job(db: Session, dummy_jobs): + crud = JobCrud(db) + job = dummy_jobs[0] + + fetched = crud.get(job.id) + assert fetched is not None + assert fetched.id == job.id + assert fetched.trace_id == "trace-1" + + +def test_update_job(db: Session, 
+    crud = JobCrud(db)
+    job = dummy_jobs[1]
+
+    update_data = JobUpdate(status=JobStatus.FAILED, error_message="Error occurred")
+    updated_job = crud.update(job.id, update_data)
+
+    assert updated_job.status == JobStatus.FAILED
+    assert updated_job.error_message == "Error occurred"
+    assert updated_job.updated_at is not None
+    assert updated_job.updated_at >= job.updated_at
+
+
+def test_update_job_not_found(db: Session):
+    crud = JobCrud(db)
+    fake_id = uuid4()
+    update_data = JobUpdate(status=JobStatus.SUCCESS)
+
+    with pytest.raises(ValueError, match=str(fake_id)):
+        crud.update(fake_id, update_data)
diff --git a/backend/app/tests/services/response/response/test_generate_response.py b/backend/app/tests/services/response/response/test_generate_response.py
new file mode 100644
index 000000000..9c26466b0
--- /dev/null
+++ b/backend/app/tests/services/response/response/test_generate_response.py
@@ -0,0 +1,70 @@
+import pytest
+from unittest.mock import MagicMock
+
+from openai import OpenAIError
+from sqlmodel import Session
+
+from app.core.langfuse.langfuse import LangfuseTracer
+from app.models import Assistant, ResponsesAPIRequest
+from app.services.response.response import generate_response
+
+
+@pytest.fixture
+def assistant_mock() -> Assistant:
+    """Return an in-memory Assistant with id=123 (not persisted to the DB)."""
+    assistant = Assistant(
+        id="123",
+        name="Test Assistant",
+        model="gpt-4",
+        temperature=0.7,
+        instructions="You are a helpful assistant.",
+        vector_store_ids=["vs1", "vs2"],
+        max_num_results=5,
+    )
+    return assistant
+
+
+def test_generate_response_success(db: Session, assistant_mock: Assistant):
+    """Test successful OpenAI response generation."""
+    mock_client = MagicMock()
+
+    request = ResponsesAPIRequest(
+        assistant_id="123",
+        question="What is the capital of France?",
+        callback_url="http://example.com/callback",
+    )
+
+    response, error = generate_response(
+        tracer=LangfuseTracer(),
+        client=mock_client,
+        assistant=assistant_mock,
+        request=request,
+        ancestor_id=None,
+    )
+
+    mock_client.responses.create.assert_called_once()
+    assert error is None
+
+
+def test_generate_response_openai_error(assistant_mock: Assistant):
+    """Test OpenAI error handling path."""
+
+    mock_client = MagicMock()
+    mock_client.responses.create.side_effect = OpenAIError("API failed")
+
+    request = ResponsesAPIRequest(
+        assistant_id="123",
+        question="What is the capital of Germany?",
+    )
+
+    response, error = generate_response(
+        tracer=LangfuseTracer(),
+        client=mock_client,
+        assistant=assistant_mock,
+        request=request,
+        ancestor_id=None,
+    )
+
+    assert response is None
+    assert error is not None
+    assert "API failed" in error
diff --git a/backend/app/tests/services/response/response/test_process_response.py b/backend/app/tests/services/response/response/test_process_response.py
new file mode 100644
index 000000000..0ca59019e
--- /dev/null
+++ b/backend/app/tests/services/response/response/test_process_response.py
@@ -0,0 +1,166 @@
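+"""Tests for process_response: success, missing assistant, generation failure, and unexpected errors."""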
+import pytest
+from unittest.mock import patch
+from uuid import uuid4
+from app.services.response.response import process_response
+from app.models import (
+    ResponsesAPIRequest,
+    Assistant,
+    Job,
+    JobStatus,
+    AssistantCreate,
+    Project,
+    JobType,
+)
+from sqlmodel import Session
+from app.utils import APIResponse
+from app.tests.utils.test_data import create_test_credential
+from app.tests.utils.openai import mock_openai_response, generate_openai_id
+from app.crud import JobCrud, create_assistant
+from openai import OpenAI
+
+
+@pytest.fixture
+def setup_db(db: Session) -> tuple[Assistant, Job, Project]:
+    """Fixture to set up a job and assistant in the database."""
+    _, project = create_test_credential(db)
+    assistant_create = AssistantCreate(
+        name="Test Assistant",
+        instructions="You are a helpful assistant.",
+        model="gpt-4",
+    )
+    client = OpenAI(api_key="test_api_key")
+    assistant = create_assistant(
+        session=db,
+        assistant=assistant_create,
+        openai_client=client,
+        project_id=project.id,
+        organization_id=project.organization_id,
+    )
+
+    job = JobCrud(session=db).create(
+        job_type=JobType.RESPONSE,
+        trace_id=str(uuid4()),
+    )
+
+    return assistant, job, project
+
+
+def make_request(assistant_id: str, previous_response_id: str | None = None):
+    return ResponsesAPIRequest(
+        assistant_id=assistant_id,
+        question="What is the capital of France?",
+        callback_url="http://example.com/callback",
+        response_id=previous_response_id,
+    )
+
+
+def test_process_response_success(
+    db: Session, setup_db: tuple[Assistant, Job, Project]
+) -> None:
+    assistant, job, project = setup_db
+    prev_id = generate_openai_id("resp_")
+    request = make_request(assistant.assistant_id, prev_id)
+    job_id = job.id
+    task_id = "task_123"
+
+    response, error = mock_openai_response("Mock response text.", prev_id), None
+
+    with (
+        patch(
+            "app.services.response.response.generate_response",
+            return_value=(response, error),
+        ),
+        patch("app.services.response.response.Session", return_value=db),
+    ):
+        api_response: APIResponse = process_response(
+            request=request,
+            project_id=project.id,
+            organization_id=project.organization_id,
+            job_id=job_id,
+            task_id=task_id,
+            task_instance=None,
+        )
+
+    job = db.get(Job, job_id)
+    assert api_response.success is True
+    assert job.status == JobStatus.SUCCESS
+
+
+def test_process_response_assistant_not_found(
+    db: Session, setup_db: tuple[Assistant, Job, Project]
+) -> None:
+    _, job, project = setup_db
+    request: ResponsesAPIRequest = make_request("non_existent_asst")
+
+    with patch("app.services.response.response.Session", return_value=db):
+        api_response: APIResponse = process_response(
+            request=request,
+            project_id=project.id,
+            organization_id=project.organization_id,
+            job_id=job.id,
+            task_id="task_456",
+            task_instance=None,
+        )
+
+    job = db.get(Job, job.id)
+    assert api_response.success is False
+    assert "Assistant not found" in api_response.error
+    assert job.status == JobStatus.FAILED
+
+
+def test_process_response_generate_response_failure(
+    db: Session, setup_db: tuple[Assistant, Job, Project]
+) -> None:
+    assistant, job, project = setup_db
+    request: ResponsesAPIRequest = make_request(assistant.assistant_id)
+
+    with (
+        patch(
+            "app.services.response.response.generate_response",
+            return_value=(None, "Some error"),
+        ),
+        patch("app.services.response.response.Session", return_value=db),
+    ):
+        api_response: APIResponse = process_response(
+            request=request,
+            project_id=project.id,
+            organization_id=project.organization_id,
+            job_id=job.id,
+            task_id="task_789",
+            task_instance=None,
+        )
+
+    job = db.get(Job, job.id)
+    assert api_response.success is False
+    assert "Some error" in api_response.error
+    assert job.status == JobStatus.FAILED
+
+
+def test_process_response_unexpected_exception(
+    db: Session, setup_db: tuple[Assistant, Job, Project]
+) -> None:
+    assistant, job, project = setup_db
+    request: ResponsesAPIRequest = make_request(assistant.assistant_id)
+
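+    # generate_response is patched below to raise, exercising the unexpected-error path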
"app.services.response.response.generate_response", + side_effect=Exception("Boom"), + ), + patch("app.services.response.response.Session", return_value=db), + ): + api_response: APIResponse = process_response( + request=request, + project_id=project.id, + organization_id=project.organization_id, + job_id=job.id, + task_id="task_999", + task_instance=None, + ) + + job = db.get(Job, job.id) + assert api_response.success is False + assert "Unexpected error" in api_response.error + assert job.status == JobStatus.FAILED diff --git a/backend/app/tests/services/response/test_jobs_response.py b/backend/app/tests/services/response/test_jobs_response.py new file mode 100644 index 000000000..1baf39ac1 --- /dev/null +++ b/backend/app/tests/services/response/test_jobs_response.py @@ -0,0 +1,55 @@ +import pytest +from unittest.mock import patch +from sqlmodel import Session, select +from fastapi import HTTPException +from app.services.response.jobs import start_job +from app.models import ResponsesAPIRequest, JobType, JobStatus, Job +from app.crud import JobCrud +from app.tests.utils.utils import get_project + + +def test_start_job(db: Session): + request = ResponsesAPIRequest( + assistant_id="assistant_123", + question="What is the capital of France?", + ) + + project = get_project(db) + # Patch Celery scheduling + with patch("app.services.response.jobs.start_high_priority_job") as mock_schedule: + mock_schedule.return_value = "fake-task-id" + + job_id = start_job(db, request, project.id, project.organization_id) + + job_crud = JobCrud(session=db) + job = job_crud.get(job_id) + assert job is not None + assert job.job_type == JobType.RESPONSE + assert job.status == JobStatus.PENDING + assert job.trace_id is not None + + # Validate Celery was called correctly + mock_schedule.assert_called_once() + _, kwargs = mock_schedule.call_args + assert kwargs["function_path"] == "app.services.response.jobs.execute_job" + assert kwargs["project_id"] == project.id + assert kwargs["organization_id"] == project.organization_id + assert kwargs["job_id"] == str(job_id) + assert kwargs["request_data"]["assistant_id"] == "assistant_123" + + +def test_start_job_celery_exception(db: Session): + """Test start_job when Celery task scheduling fails.""" + request = ResponsesAPIRequest( + assistant_id="assistant_123", + question="What is the capital of France?", + ) + project = get_project(db) + + with patch("app.services.response.jobs.start_high_priority_job") as mock_schedule: + mock_schedule.side_effect = Exception("Celery connection failed") + + with pytest.raises(HTTPException) as exc_info: + start_job(db, request, project.id, project.organization_id) + + assert exc_info.value.status_code == 500 diff --git a/backend/app/tests/utils/openai.py b/backend/app/tests/utils/openai.py index a864ee33c..93bebed7d 100644 --- a/backend/app/tests/utils/openai.py +++ b/backend/app/tests/utils/openai.py @@ -3,12 +3,15 @@ import string from typing import Optional +from types import SimpleNamespace from unittest.mock import MagicMock from openai.types.beta import Assistant as OpenAIAssistant from openai.types.beta.assistant import ToolResources, ToolResourcesFileSearch from openai.types.beta.assistant_tool import FileSearchTool from openai.types.beta.file_search_tool import FileSearch +from openai.types.responses.response import Response, ToolChoice, ResponseUsage +from openai.types.responses.response_output_item import ResponseOutputItem def generate_openai_id(prefix: str, length: int = 40) -> str: @@ -51,6 +54,39 @@ def 
     )
 
+
+def mock_openai_response(
+    text: str = "Hello world",
+    previous_response_id: str | None = None,
+    model: str = "gpt-4",
+) -> SimpleNamespace:
+    """Return a minimal mock OpenAI-like response object for testing."""
+
+    usage = SimpleNamespace(
+        input_tokens=10,
+        output_tokens=20,
+        total_tokens=30,
+    )
+
+    output_item = SimpleNamespace(
+        id=generate_openai_id("out_"),
+        type="message",
+        role="assistant",
+        content=[{"type": "output_text", "text": text}],
+    )
+
+    response = SimpleNamespace(
+        id=generate_openai_id("resp_"),
+        created_at=int(time.time()),
+        model=model,
+        object="response",
+        output=[output_item],
+        output_text=text,
+        usage=usage,
+        previous_response_id=previous_response_id,
+    )
+    return response
+
 
 def get_mock_openai_client_with_vector_store():
     mock_client = MagicMock()
diff --git a/backend/app/utils.py b/backend/app/utils.py
index 1c03839ab..b8e922973 100644
--- a/backend/app/utils.py
+++ b/backend/app/utils.py
@@ -3,6 +3,7 @@
 from dataclasses import dataclass
 from datetime import datetime, timedelta, timezone
 from pathlib import Path
+import requests
 from typing import Any, Dict, Generic, Optional, TypeVar
 
 import jwt
@@ -10,6 +11,7 @@
 from jinja2 import Template
 from jwt.exceptions import InvalidTokenError
 from fastapi import HTTPException
+import openai
 from openai import OpenAI
 from pydantic import BaseModel
 from sqlmodel import Session
@@ -38,14 +40,17 @@ def success_response(
 
     @classmethod
     def failure_response(
-        cls, error: str | list, metadata: Optional[Dict[str, Any]] = None
+        cls,
+        error: str | list,
+        data: Optional[T] = None,
+        metadata: Optional[Dict[str, Any]] = None,
     ) -> "APIResponse[None]":
         if isinstance(error, list):
             # to handle cases when error is a list of errors
             error_message = "\n".join([f"{err['loc']}: {err['msg']}" for err in error])
         else:
             error_message = error
-        return cls(success=False, data=None, error=error_message, metadata=metadata)
+        return cls(success=False, data=data, error=error_message, metadata=metadata)
 
 
 @dataclass
@@ -200,6 +205,47 @@ def get_openai_client(session: Session, org_id: int, project_id: int) -> OpenAI:
     )
 
 
+def handle_openai_error(e: openai.OpenAIError) -> str:
+    """Extract error message from OpenAI error."""
+    if hasattr(e, "body") and isinstance(e.body, dict) and "message" in e.body:
+        return e.body["message"]
+    elif hasattr(e, "message"):
+        return e.message
+    elif hasattr(e, "response") and hasattr(e.response, "json"):
+        try:
+            error_data = e.response.json()
+            if isinstance(error_data, dict) and "error" in error_data:
+                error_info = error_data["error"]
+                if isinstance(error_info, dict) and "message" in error_info:
+                    return error_info["message"]
+        except Exception:
+            pass
+    return str(e)
+
+
+def send_callback(callback_url: str, data: dict):
+    """Send results to the callback URL (synchronously)."""
+    try:
+        with requests.Session() as session:
+            # uncomment this to run locally without SSL
+            # session.verify = False
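+            # (connect, read) timeout tuple, configured via CALLBACK_CONNECT_TIMEOUT / CALLBACK_READ_TIMEOUT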
+            response = session.post(
+                callback_url,
+                json=data,
+                timeout=(
+                    settings.CALLBACK_CONNECT_TIMEOUT,
+                    settings.CALLBACK_READ_TIMEOUT,
+                ),
+            )
+            response.raise_for_status()
+            logger.info(f"[send_callback] Callback sent successfully to {callback_url}")
+            return True
+    except requests.RequestException as e:
+        logger.error(f"[send_callback] Callback failed: {str(e)}", exc_info=True)
+        return False
+
+
 @ft.singledispatch
 def load_description(filename: Path) -> str:
     if not filename.exists():
diff --git a/backend/pyproject.toml b/backend/pyproject.toml
index 85f31cf5c..da158b34d 100644
--- a/backend/pyproject.toml
+++ b/backend/pyproject.toml
@@ -34,6 +34,7 @@ dependencies = [
     "scikit-learn>=1.7.1",
     "celery>=5.3.0,<6.0.0",
     "redis>=5.0.0,<6.0.0",
+    "flower>=2.0.1",
 ]
 
 [tool.uv]
diff --git a/backend/uv.lock b/backend/uv.lock
index b6979ee53..c5f5ca013 100644
--- a/backend/uv.lock
+++ b/backend/uv.lock
@@ -176,6 +176,7 @@ dependencies = [
     { name = "email-validator" },
     { name = "emails" },
     { name = "fastapi", extra = ["standard"] },
+    { name = "flower" },
     { name = "httpx" },
     { name = "jinja2" },
     { name = "langfuse" },
@@ -219,6 +220,7 @@ requires-dist = [
     { name = "email-validator", specifier = ">=2.1.0.post1,<3.0.0.0" },
     { name = "emails", specifier = ">=0.6,<1.0" },
     { name = "fastapi", extras = ["standard"], specifier = ">=0.114.2,<1.0.0" },
+    { name = "flower", specifier = ">=2.0.1" },
     { name = "httpx", specifier = ">=0.25.1,<1.0.0" },
     { name = "jinja2", specifier = ">=3.1.4,<4.0.0" },
     { name = "langfuse", specifier = ">=2.60.3" },
@@ -761,6 +763,22 @@ wheels = [
     { url = "https://files.pythonhosted.org/packages/b9/f8/feced7779d755758a52d1f6635d990b8d98dc0a29fa568bbe0625f18fdf3/filelock-3.16.1-py3-none-any.whl", hash = "sha256:2082e5703d51fbf98ea75855d9d5527e33d8ff23099bec374a134febee6946b0", size = 16163, upload-time = "2024-09-17T19:02:00.268Z" },
 ]
 
+[[package]]
+name = "flower"
+version = "2.0.1"
+source = { registry = "https://pypi.org/simple" }
+dependencies = [
+    { name = "celery" },
+    { name = "humanize" },
+    { name = "prometheus-client" },
+    { name = "pytz" },
+    { name = "tornado" },
+]
+sdist = { url = "https://files.pythonhosted.org/packages/09/a1/357f1b5d8946deafdcfdd604f51baae9de10aafa2908d0b7322597155f92/flower-2.0.1.tar.gz", hash = "sha256:5ab717b979530770c16afb48b50d2a98d23c3e9fe39851dcf6bc4d01845a02a0", size = 3220408, upload-time = "2023-08-13T14:37:46.073Z" }
+wheels = [
+    { url = "https://files.pythonhosted.org/packages/a6/ff/ee2f67c0ff146ec98b5df1df637b2bc2d17beeb05df9f427a67bd7a7d79c/flower-2.0.1-py2.py3-none-any.whl", hash = "sha256:9db2c621eeefbc844c8dd88be64aef61e84e2deb29b271e02ab2b5b9f01068e2", size = 383553, upload-time = "2023-08-13T14:37:41.552Z" },
+]
+
 [[package]]
 name = "frozenlist"
 version = "1.7.0"
@@ -983,6 +1001,15 @@ wheels = [
     { url = "https://files.pythonhosted.org/packages/39/7b/bb06b061991107cd8783f300adff3e7b7f284e330fd82f507f2a1417b11d/huggingface_hub-0.34.4-py3-none-any.whl", hash = "sha256:9b365d781739c93ff90c359844221beef048403f1bc1f1c123c191257c3c890a", size = 561452, upload-time = "2025-08-08T09:14:50.159Z" },
 ]
 
+[[package]]
+name = "humanize"
+version = "4.13.0"
+source = { registry = "https://pypi.org/simple" }
+sdist = { url = "https://files.pythonhosted.org/packages/98/1d/3062fcc89ee05a715c0b9bfe6490c00c576314f27ffee3a704122c6fd259/humanize-4.13.0.tar.gz", hash = "sha256:78f79e68f76f0b04d711c4e55d32bebef5be387148862cb1ef83d2b58e7935a0", size = 81884, upload-time = "2025-08-25T09:39:20.04Z" }
+wheels = [
+    { url = "https://files.pythonhosted.org/packages/1e/c7/316e7ca04d26695ef0635dc81683d628350810eb8e9b2299fc08ba49f366/humanize-4.13.0-py3-none-any.whl", hash = "sha256:b810820b31891813b1673e8fec7f1ed3312061eab2f26e3fa192c393d11ed25f", size = 128869, upload-time = "2025-08-25T09:39:18.54Z" },
+]
+
 [[package]]
 name = "identify"
 version = "2.6.1"
@@ -1780,6 +1807,15 @@ wheels = [
     { url = "https://files.pythonhosted.org/packages/b1/07/4e8d94f94c7d41ca5ddf8a9695ad87b888104e2fd41a35546c1dc9ca74ac/premailer-3.10.0-py2.py3-none-any.whl", hash = "sha256:021b8196364d7df96d04f9ade51b794d0b77bcc19e998321c515633a2273be1a", size = 19544, upload-time = "2021-08-02T20:32:52.771Z" },
"2021-08-02T20:32:52.771Z" }, ] +[[package]] +name = "prometheus-client" +version = "0.23.1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/23/53/3edb5d68ecf6b38fcbcc1ad28391117d2a322d9a1a3eff04bfdb184d8c3b/prometheus_client-0.23.1.tar.gz", hash = "sha256:6ae8f9081eaaaf153a2e959d2e6c4f4fb57b12ef76c8c7980202f1e57b48b2ce", size = 80481, upload-time = "2025-09-18T20:47:25.043Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/b8/db/14bafcb4af2139e046d03fd00dea7873e48eafe18b7d2797e73d6681f210/prometheus_client-0.23.1-py3-none-any.whl", hash = "sha256:dd1913e6e76b59cfe44e7a4b83e01afc9873c1bdfd2ed8739f1e76aeca115f99", size = 61145, upload-time = "2025-09-18T20:47:23.875Z" }, +] + [[package]] name = "prompt-toolkit" version = "3.0.52" @@ -2721,6 +2757,25 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/41/f2/fd673d979185f5dcbac4be7d09461cbb99751554ffb6718d0013af8604cb/tokenizers-0.21.4-cp39-abi3-win_amd64.whl", hash = "sha256:475d807a5c3eb72c59ad9b5fcdb254f6e17f53dfcbb9903233b0dfa9c943b597", size = 2507568, upload-time = "2025-07-28T15:48:55.456Z" }, ] +[[package]] +name = "tornado" +version = "6.5.2" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/09/ce/1eb500eae19f4648281bb2186927bb062d2438c2e5093d1360391afd2f90/tornado-6.5.2.tar.gz", hash = "sha256:ab53c8f9a0fa351e2c0741284e06c7a45da86afb544133201c5cc8578eb076a0", size = 510821, upload-time = "2025-08-08T18:27:00.78Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/f6/48/6a7529df2c9cc12efd2e8f5dd219516184d703b34c06786809670df5b3bd/tornado-6.5.2-cp39-abi3-macosx_10_9_universal2.whl", hash = "sha256:2436822940d37cde62771cff8774f4f00b3c8024fe482e16ca8387b8a2724db6", size = 442563, upload-time = "2025-08-08T18:26:42.945Z" }, + { url = "https://files.pythonhosted.org/packages/f2/b5/9b575a0ed3e50b00c40b08cbce82eb618229091d09f6d14bce80fc01cb0b/tornado-6.5.2-cp39-abi3-macosx_10_9_x86_64.whl", hash = "sha256:583a52c7aa94ee046854ba81d9ebb6c81ec0fd30386d96f7640c96dad45a03ef", size = 440729, upload-time = "2025-08-08T18:26:44.473Z" }, + { url = "https://files.pythonhosted.org/packages/1b/4e/619174f52b120efcf23633c817fd3fed867c30bff785e2cd5a53a70e483c/tornado-6.5.2-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b0fe179f28d597deab2842b86ed4060deec7388f1fd9c1b4a41adf8af058907e", size = 444295, upload-time = "2025-08-08T18:26:46.021Z" }, + { url = "https://files.pythonhosted.org/packages/95/fa/87b41709552bbd393c85dd18e4e3499dcd8983f66e7972926db8d96aa065/tornado-6.5.2-cp39-abi3-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:b186e85d1e3536d69583d2298423744740986018e393d0321df7340e71898882", size = 443644, upload-time = "2025-08-08T18:26:47.625Z" }, + { url = "https://files.pythonhosted.org/packages/f9/41/fb15f06e33d7430ca89420283a8762a4e6b8025b800ea51796ab5e6d9559/tornado-6.5.2-cp39-abi3-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e792706668c87709709c18b353da1f7662317b563ff69f00bab83595940c7108", size = 443878, upload-time = "2025-08-08T18:26:50.599Z" }, + { url = "https://files.pythonhosted.org/packages/11/92/fe6d57da897776ad2e01e279170ea8ae726755b045fe5ac73b75357a5a3f/tornado-6.5.2-cp39-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:06ceb1300fd70cb20e43b1ad8aaee0266e69e7ced38fa910ad2e03285009ce7c", size = 444549, upload-time = "2025-08-08T18:26:51.864Z" }, + 
{ url = "https://files.pythonhosted.org/packages/9b/02/c8f4f6c9204526daf3d760f4aa555a7a33ad0e60843eac025ccfd6ff4a93/tornado-6.5.2-cp39-abi3-musllinux_1_2_i686.whl", hash = "sha256:74db443e0f5251be86cbf37929f84d8c20c27a355dd452a5cfa2aada0d001ec4", size = 443973, upload-time = "2025-08-08T18:26:53.625Z" }, + { url = "https://files.pythonhosted.org/packages/ae/2d/f5f5707b655ce2317190183868cd0f6822a1121b4baeae509ceb9590d0bd/tornado-6.5.2-cp39-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:b5e735ab2889d7ed33b32a459cac490eda71a1ba6857b0118de476ab6c366c04", size = 443954, upload-time = "2025-08-08T18:26:55.072Z" }, + { url = "https://files.pythonhosted.org/packages/e8/59/593bd0f40f7355806bf6573b47b8c22f8e1374c9b6fd03114bd6b7a3dcfd/tornado-6.5.2-cp39-abi3-win32.whl", hash = "sha256:c6f29e94d9b37a95013bb669616352ddb82e3bfe8326fccee50583caebc8a5f0", size = 445023, upload-time = "2025-08-08T18:26:56.677Z" }, + { url = "https://files.pythonhosted.org/packages/c7/2a/f609b420c2f564a748a2d80ebfb2ee02a73ca80223af712fca591386cafb/tornado-6.5.2-cp39-abi3-win_amd64.whl", hash = "sha256:e56a5af51cc30dd2cae649429af65ca2f6571da29504a07995175df14c18f35f", size = 445427, upload-time = "2025-08-08T18:26:57.91Z" }, + { url = "https://files.pythonhosted.org/packages/5e/4f/e1f65e8f8c76d73658b33d33b81eed4322fb5085350e4328d5c956f0c8f9/tornado-6.5.2-cp39-abi3-win_arm64.whl", hash = "sha256:d6c33dc3672e3a1f3618eb63b7ef4683a7688e7b9e6e8f0d9aa5726360a004af", size = 444456, upload-time = "2025-08-08T18:26:59.207Z" }, +] + [[package]] name = "tqdm" version = "4.67.1"