diff --git a/backend/app/alembic/versions/219033c644de_add_llm_im_jobs_table.py b/backend/app/alembic/versions/219033c644de_add_llm_im_jobs_table.py
new file mode 100644
index 00000000..056797f7
--- /dev/null
+++ b/backend/app/alembic/versions/219033c644de_add_llm_im_jobs_table.py
@@ -0,0 +1,24 @@
+"""Add LLM_API job type to jobs table
+
+Revision ID: 219033c644de
+Revises: e7c68e43ce6f
+Create Date: 2025-10-17 15:38:33.565674
+
+"""
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision = "219033c644de"
+down_revision = "e7c68e43ce6f"
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    op.execute("ALTER TYPE jobtype ADD VALUE IF NOT EXISTS 'LLM_API'")
+
+
+def downgrade():
+    pass
diff --git a/backend/app/api/main.py b/backend/app/api/main.py
index 01892168..62d5db5b 100644
--- a/backend/app/api/main.py
+++ b/backend/app/api/main.py
@@ -7,6 +7,7 @@
     documents,
     doc_transformation_job,
     login,
+    llm,
     organization,
     openai_conversation,
     project,
@@ -31,6 +32,7 @@
 api_router.include_router(credentials.router)
 api_router.include_router(documents.router)
 api_router.include_router(doc_transformation_job.router)
+api_router.include_router(llm.router)
 api_router.include_router(login.router)
 api_router.include_router(onboarding.router)
 api_router.include_router(openai_conversation.router)
diff --git a/backend/app/api/routes/llm.py b/backend/app/api/routes/llm.py
new file mode 100644
index 00000000..678d2500
--- /dev/null
+++ b/backend/app/api/routes/llm.py
@@ -0,0 +1,36 @@
+import logging
+
+from fastapi import APIRouter
+
+from app.api.deps import AuthContextDep, SessionDep
+from app.models import LLMCallRequest, Message
+from app.services.llm.jobs import start_job
+from app.utils import APIResponse
+
+
+logger = logging.getLogger(__name__)
+router = APIRouter(tags=["LLM"])
+
+
+@router.post("/llm/call", response_model=APIResponse[Message])
+async def llm_call(
+    _current_user: AuthContextDep, _session: SessionDep, request: LLMCallRequest
+):
+    """
+    Endpoint to initiate an LLM call as a background job.
+    """
+    project_id = _current_user.project.id
+    organization_id = _current_user.organization.id
+
+    start_job(
+        db=_session,
+        request=request,
+        project_id=project_id,
+        organization_id=organization_id,
+    )
+
+    return APIResponse.success_response(
+        data=Message(
+            message="Your response is being generated and will be delivered via callback."
+        ),
+    )
diff --git a/backend/app/models/__init__.py b/backend/app/models/__init__.py
index 15b61428..791da4ba 100644
--- a/backend/app/models/__init__.py
+++ b/backend/app/models/__init__.py
@@ -48,6 +48,11 @@
 
 from .job import Job, JobType, JobStatus, JobUpdate
 
+from .llm import (
+    LLMCallRequest,
+    LLMCallResponse,
+)
+
 from .message import Message
 from .model_evaluation import (
     ModelEvaluation,
diff --git a/backend/app/models/job.py b/backend/app/models/job.py
index 4ddbd3b3..62851f5f 100644
--- a/backend/app/models/job.py
+++ b/backend/app/models/job.py
@@ -15,6 +15,7 @@ class JobStatus(str, Enum):
 
 class JobType(str, Enum):
     RESPONSE = "RESPONSE"
+    LLM_API = "LLM_API"
 
 
 class Job(SQLModel, table=True):
diff --git a/backend/app/models/llm/__init__.py b/backend/app/models/llm/__init__.py
new file mode 100644
index 00000000..6a7a454f
--- /dev/null
+++ b/backend/app/models/llm/__init__.py
@@ -0,0 +1,2 @@
+from app.models.llm.request import LLMCallRequest, CompletionConfig, QueryParams
+from app.models.llm.response import LLMCallResponse
diff --git a/backend/app/models/llm/request.py b/backend/app/models/llm/request.py
new file mode 100644
index 00000000..cad87d57
--- /dev/null
+++ b/backend/app/models/llm/request.py
@@ -0,0 +1,48 @@
+from typing import Any, Literal
+
+from sqlmodel import Field, SQLModel
+
+
+# Query Parameters (dynamic per request)
+class QueryParams(SQLModel):
+    """Query-specific parameters for each LLM call."""
+
+    input: str = Field(..., min_length=1, description="User input text/prompt")
+    conversation_id: str | None = Field(
+        default=None,
+        description="Optional conversation ID. If not provided, a new conversation will be created.",
+    )
+
+
+class CompletionConfig(SQLModel):
+    """Completion configuration with provider and parameters."""
+
+    provider: Literal["openai"] = Field(
+        default="openai", description="LLM provider to use"
+    )
+    params: dict[str, Any] = Field(
+        ..., description="Provider-specific parameters (schema varies by provider)"
+    )
+
+
+class LLMCallConfig(SQLModel):
+    """Complete configuration for LLM call including all processing stages."""
+
+    completion: CompletionConfig = Field(..., description="Completion configuration")
+    # Future additions:
+    # classifier: ClassifierConfig | None = None
+    # pre_filter: PreFilterConfig | None = None
+
+
+class LLMCallRequest(SQLModel):
+    """User-facing API request for LLM completion."""
+
+    query: QueryParams = Field(..., description="Query-specific parameters")
+    config: LLMCallConfig = Field(..., description="Configuration for the LLM call")
+    callback_url: str | None = Field(
+        default=None, description="Webhook URL for async response delivery"
+    )
+    include_provider_response: bool = Field(
+        default=False,
+        description="Whether to include the raw LLM provider response in the output",
+    )
diff --git a/backend/app/models/llm/response.py b/backend/app/models/llm/response.py
new file mode 100644
index 00000000..969fe8c5
--- /dev/null
+++ b/backend/app/models/llm/response.py
@@ -0,0 +1,23 @@
+"""LLM response models.
+
+This module contains response models for LLM API calls.
+""" +from sqlmodel import SQLModel, Field + + +class Diagnostics(SQLModel): + input_tokens: int + output_tokens: int + total_tokens: int + model: str + provider: str + + +class LLMCallResponse(SQLModel): + id: str = Field(..., description="Unique id provided by the LLM provider.") + conversation_id: str | None = None + output: str + usage: Diagnostics + llm_response: dict | None = Field( + default=None, description="Raw Response from LLM provider." + ) diff --git a/backend/app/services/llm/__init__.py b/backend/app/services/llm/__init__.py new file mode 100644 index 00000000..2b376a86 --- /dev/null +++ b/backend/app/services/llm/__init__.py @@ -0,0 +1,10 @@ +# Providers +from app.services.llm.providers import ( + BaseProvider, + OpenAIProvider, +) +from app.services.llm.providers import ( + PROVIDER_REGISTRY, + get_llm_provider, + get_supported_providers, +) diff --git a/backend/app/services/llm/jobs.py b/backend/app/services/llm/jobs.py new file mode 100644 index 00000000..daaab2e9 --- /dev/null +++ b/backend/app/services/llm/jobs.py @@ -0,0 +1,144 @@ +import logging +from uuid import UUID + +from asgi_correlation_id import correlation_id +from fastapi import HTTPException +from sqlmodel import Session + +from app.core.db import engine +from app.crud.jobs import JobCrud +from app.models import JobStatus, JobType, JobUpdate, LLMCallRequest, LLMCallResponse +from app.utils import APIResponse, send_callback +from app.celery.utils import start_high_priority_job +from app.services.llm.providers.registry import get_llm_provider + + +logger = logging.getLogger(__name__) + + +def start_job( + db: Session, request: LLMCallRequest, project_id: int, organization_id: int +) -> UUID: + """Create an LLM job and schedule Celery task.""" + trace_id = correlation_id.get() or "N/A" + job_crud = JobCrud(session=db) + job = job_crud.create(job_type=JobType.LLM_API, trace_id=trace_id) + + try: + task_id = start_high_priority_job( + function_path="app.services.llm.jobs.execute_job", + project_id=project_id, + job_id=str(job.id), + trace_id=trace_id, + request_data=request.model_dump(), + organization_id=organization_id, + ) + except Exception as e: + logger.error( + f"[start_job] Error starting Celery task: {str(e)} | job_id={job.id}, project_id={project_id}", + exc_info=True, + ) + job_update = JobUpdate(status=JobStatus.FAILED, error_message=str(e)) + job_crud.update(job_id=job.id, job_update=job_update) + raise HTTPException( + status_code=500, detail="Internal server error while executing LLM call" + ) + + logger.info( + f"[start_job] Job scheduled for LLM call | job_id={job.id}, project_id={project_id}, task_id={task_id}" + ) + return job.id + + +def handle_job_error(job_id: UUID, callback_url: str | None, error: str): + """Handle job failure uniformly callback, and DB update.""" + with Session(engine) as session: + job_crud = JobCrud(session=session) + + callback = APIResponse.failure_response(error=error) + if callback_url: + send_callback( + callback_url=callback_url, + data=callback.model_dump(), + ) + + job_crud.update( + job_id=job_id, + job_update=JobUpdate(status=JobStatus.FAILED, error_message=error), + ) + + return callback.model_dump() + + +def execute_job( + request_data: dict, + project_id: int, + organization_id: int, + job_id: str, + task_id: str, + task_instance, +) -> LLMCallResponse | None: + """Celery task to process an LLM request asynchronously.""" + + request = LLMCallRequest(**request_data) + job_id: UUID = UUID(job_id) + + config = request.config + provider = 
config.completion.provider + + logger.info( + f"[execute_job] Starting LLM job execution | job_id={job_id}, task_id={task_id}, " + f"provider={provider}" + ) + + try: + # Update job status to PROCESSING + with Session(engine) as session: + job_crud = JobCrud(session=session) + job_crud.update( + job_id=job_id, job_update=JobUpdate(status=JobStatus.PROCESSING) + ) + + provider_instance = get_llm_provider( + session=session, + provider_type=provider, + project_id=project_id, + organization_id=organization_id, + ) + + response, error = provider_instance.execute( + completion_config=config.completion, + query=request.query, + include_provider_response=request.include_provider_response, + ) + + if response: + callback = APIResponse.success_response(data=response) + send_callback( + callback_url=request.callback_url, + data=callback.model_dump(), + ) + + with Session(engine) as session: + job_crud = JobCrud(session=session) + + job_crud.update( + job_id=job_id, job_update=JobUpdate(status=JobStatus.SUCCESS) + ) + logger.info( + f"[execute_job] Successfully completed LLM job | job_id={job_id}, " + f"response_id={response.id}, tokens={response.usage.total_tokens}" + ) + return callback.model_dump() + + return handle_job_error( + job_id, request.callback_url, error=error or "Unknown error occurred" + ) + + except Exception as e: + error = f"Unexpected error in LLM job execution: {str(e)}" + logger.error( + f"[execute_job] {error} | job_id={job_id}, task_id={task_id}", + exc_info=True, + ) + return handle_job_error(job_id, request.callback_url, error=error) diff --git a/backend/app/services/llm/providers/__init__.py b/backend/app/services/llm/providers/__init__.py new file mode 100644 index 00000000..8d31b3a7 --- /dev/null +++ b/backend/app/services/llm/providers/__init__.py @@ -0,0 +1,7 @@ +from app.services.llm.providers.base import BaseProvider +from app.services.llm.providers.openai import OpenAIProvider +from app.services.llm.providers.registry import ( + PROVIDER_REGISTRY, + get_llm_provider, + get_supported_providers, +) diff --git a/backend/app/services/llm/providers/base.py b/backend/app/services/llm/providers/base.py new file mode 100644 index 00000000..b9895a4e --- /dev/null +++ b/backend/app/services/llm/providers/base.py @@ -0,0 +1,63 @@ +"""Base provider interface for LLM providers. + +This module defines the abstract base class that all LLM providers must implement. +It provides a provider-agnostic interface for executing LLM calls. +""" + +from abc import ABC, abstractmethod +from typing import Any + +from app.models.llm import CompletionConfig, LLMCallResponse, QueryParams + + +class BaseProvider(ABC): + """Abstract base class for LLM providers. + + All provider implementations (OpenAI, Anthropic, etc.) must inherit from + this class and implement the required methods. + + Providers directly pass user configuration to their respective APIs. + User is responsible for providing valid provider-specific parameters. + + Attributes: + client: The provider-specific client instance + """ + + def __init__(self, client: Any): + """Initialize provider with client. + + Args: + client: Provider-specific client instance + """ + self.client = client + + @abstractmethod + def execute( + self, + completion_config: CompletionConfig, + query: QueryParams, + include_provider_response: bool = False, + ) -> tuple[LLMCallResponse | None, str | None]: + """Execute LLM API call. + + Directly passes the user's config params to provider API along with input. 
+
+        Args:
+            completion_config: LLM completion configuration
+            query: Query parameters including input and conversation_id
+            include_provider_response: Whether to include the raw LLM provider response in the output
+
+        Returns:
+            Tuple of (response, error_message)
+            - If successful: (LLMCallResponse, None)
+            - If failed: (None, error_message)
+        """
+        raise NotImplementedError("Providers must implement execute method")
+
+    def get_provider_name(self) -> str:
+        """Get the name of the provider.
+
+        Returns:
+            Provider name (e.g., "openai", "anthropic", "google")
+        """
+        return self.__class__.__name__.replace("Provider", "").lower()
diff --git a/backend/app/services/llm/providers/openai.py b/backend/app/services/llm/providers/openai.py
new file mode 100644
index 00000000..6bace6cf
--- /dev/null
+++ b/backend/app/services/llm/providers/openai.py
@@ -0,0 +1,87 @@
+import logging
+
+import openai
+from openai import OpenAI
+from openai.types.responses.response import Response
+
+from app.models.llm import (
+    CompletionConfig,
+    LLMCallResponse,
+    QueryParams,
+)
+from app.services.llm.providers.base import BaseProvider
+
+
+logger = logging.getLogger(__name__)
+
+
+class OpenAIProvider(BaseProvider):
+    def __init__(self, client: OpenAI):
+        """Initialize OpenAI provider with client.
+
+        Args:
+            client: OpenAI client instance
+        """
+        super().__init__(client)
+        self.client = client
+
+    def execute(
+        self,
+        completion_config: CompletionConfig,
+        query: QueryParams,
+        include_provider_response: bool = False,
+    ) -> tuple[LLMCallResponse | None, str | None]:
+        response: Response | None = None
+        error_message: str | None = None
+
+        try:
+            params = {
+                **completion_config.params,
+            }
+            params["input"] = query.input
+
+            # Add conversation_id if provided
+            if query.conversation_id:
+                params["conversation_id"] = query.conversation_id
+
+            response = self.client.responses.create(**params)
+
+            # Build response
+            llm_response = LLMCallResponse(
+                id=response.id,
+                output=response.output_text,
+                usage={
+                    "input_tokens": response.usage.input_tokens,
+                    "output_tokens": response.usage.output_tokens,
+                    "total_tokens": response.usage.total_tokens,
+                    "model": response.model,
+                    "provider": "openai",
+                },
+            )
+            if include_provider_response:
+                llm_response.llm_response = response.model_dump()
+
+            logger.info(
+                f"[OpenAIProvider.execute] Successfully generated response: {response.id}"
+            )
+            return llm_response, None
+
+        except TypeError as e:
+            # handle unexpected arguments gracefully
+            error_message = f"Invalid or unexpected parameter in Config: {str(e)}"
+            return None, error_message
+
+        except openai.OpenAIError as e:
+            # imported here to avoid circular imports
+            from app.utils import handle_openai_error
+
+            error_message = handle_openai_error(e)
+            logger.error(
+                f"[OpenAIProvider.execute] OpenAI API error: {error_message}", exc_info=True
+            )
+            return None, error_message
+
+        except Exception as e:
+            error_message = f"Unexpected error: {str(e)}"
+            logger.error(f"[OpenAIProvider.execute] {error_message}", exc_info=True)
+            return None, error_message
diff --git a/backend/app/services/llm/providers/registry.py b/backend/app/services/llm/providers/registry.py
new file mode 100644
index 00000000..3aea7803
--- /dev/null
+++ b/backend/app/services/llm/providers/registry.py
@@ -0,0 +1,58 @@
+import logging
+
+from sqlmodel import Session
+
+from app.services.llm.providers.base import BaseProvider
+from app.services.llm.providers.openai import OpenAIProvider
+
+
+logger = logging.getLogger(__name__)
+
+
+# Registry of provider types to their implementation classes
+PROVIDER_REGISTRY: dict[str, type[BaseProvider]] = {
+    "openai": OpenAIProvider,
+    # Future providers can be added here:
+    # "anthropic": AnthropicProvider,
+    # "google": GoogleProvider,
+}
+
+
+def get_llm_provider(
+    session: Session, provider_type: str, project_id: int, organization_id: int
+) -> BaseProvider:
+    # Import here to avoid circular imports
+    from app.utils import get_openai_client
+
+    provider_class = PROVIDER_REGISTRY.get(provider_type)
+
+    if provider_class is None:
+        supported = list(PROVIDER_REGISTRY.keys())
+        logger.error(
+            f"[get_llm_provider] Unsupported provider type requested: {provider_type}"
+        )
+        raise ValueError(
+            f"Provider '{provider_type}' is not supported. "
+            f"Supported providers: {', '.join(supported)}"
+        )
+
+    if provider_type == "openai":
+        client = get_openai_client(
+            session=session, org_id=organization_id, project_id=project_id
+        )
+    else:
+        logger.error(
+            f"[get_llm_provider] Unsupported provider type requested: {provider_type}"
+        )
+        raise ValueError(f"Provider '{provider_type}' is not supported.")
+
+    return provider_class(client=client)
+
+
+def get_supported_providers() -> list[str]:
+    """Get list of supported provider types.
+
+    Returns:
+        List of supported provider type strings
+    """
+    return list(PROVIDER_REGISTRY.keys())
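Usage sketch (not part of the diff): the payload shape below follows LLMCallRequest, QueryParams, and CompletionConfig as defined above; the base URL, API prefix, API-key header, callback URL, and the OpenAI "params" keys are placeholders and assumptions, not values taken from this change.

import requests

# Payload shape mirrors LLMCallRequest -> QueryParams + LLMCallConfig.completion (CompletionConfig).
payload = {
    "query": {
        "input": "Summarize our refund policy in two sentences.",
        # "conversation_id": "conv_123",  # optional; omit to start a new conversation
    },
    "config": {
        "completion": {
            "provider": "openai",
            # Passed through to the provider unchanged; keys are provider-specific (placeholder model).
            "params": {"model": "gpt-4o-mini"},
        }
    },
    "callback_url": "https://example.com/webhooks/llm",  # hypothetical webhook receiver
    "include_provider_response": False,
}

resp = requests.post(
    "http://localhost:8000/api/v1/llm/call",  # assumed host and API prefix
    json=payload,
    headers={"X-API-KEY": "<project-api-key>"},  # assumed auth header satisfying AuthContextDep
    timeout=30,
)

# The endpoint only acknowledges the job; the LLMCallResponse is delivered to callback_url later.
print(resp.status_code, resp.json())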