From d77ba8e8dc5f8c53cd74b870dbecd619305f6798 Mon Sep 17 00:00:00 2001 From: Ayush Singhal Date: Sun, 14 Sep 2025 20:12:11 +0530 Subject: [PATCH 01/18] adding github build --- .github/workflows/build.yml | 86 +++++++++++++++++++++++++++++++++++++ 1 file changed, 86 insertions(+) create mode 100644 .github/workflows/build.yml diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml new file mode 100644 index 0000000..8aaca6a --- /dev/null +++ b/.github/workflows/build.yml @@ -0,0 +1,86 @@ +name: Build, Push, and Deploy + +on: + push: + branches: + - dev + - master + - gamma + +jobs: + build-and-push: + name: Build and Push + runs-on: ubuntu-latest + environment: ${{ github.ref == 'refs/heads/master' && 'prod' || (github.ref == 'refs/heads/dev' && 'dev' || 'staging') }} + + permissions: + id-token: write + contents: read + + env: + AWS_REGION: ${{ secrets.AWS_DEFAULT_REGION }} + ECR_REPOSITORY: ${{ secrets.ECR_REPOSITORY }} + IMAGE_TAG: ${{ github.sha }} + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Configure AWS credentials + uses: aws-actions/configure-aws-credentials@v4 + with: + role-to-assume: ${{ secrets.AWS_ROLE_ARN }} + aws-region: ${{ secrets.AWS_DEFAULT_REGION }} + + - name: Login to Amazon ECR + id: login-ecr + uses: aws-actions/amazon-ecr-login@v2 + + - name: Build, tag, and push image to Amazon ECR + env: + ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }} + run: | + docker build \ + -t $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG \ + -t $ECR_REGISTRY/$ECR_REPOSITORY:latest \ + -f build/docker/Dockerfile . + + docker push $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG + docker push $ECR_REGISTRY/$ECR_REPOSITORY:latest + + echo "image=$ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG" >> $GITHUB_OUTPUT + + update-ecs-service: + name: Update ECS Service + runs-on: ubuntu-latest + needs: build-and-push + environment: ${{ github.ref == 'refs/heads/master' && 'prod' || (github.ref == 'refs/heads/dev' && 'dev' || 'staging') }} + + permissions: + id-token: write + contents: read + + env: + AWS_REGION: ${{ secrets.AWS_DEFAULT_REGION }} + ECS_CLUSTER: ${{ secrets.ECS_NAME }} + ECS_SERVICE: ${{ secrets.ECS_NAME }} + IMAGE: ${{ needs.build-and-push.outputs.image }} + + steps: + - name: Configure AWS credentials + uses: aws-actions/configure-aws-credentials@v4 + with: + role-to-assume: ${{ secrets.AWS_ROLE_ARN }} + aws-region: ${{ secrets.AWS_DEFAULT_REGION }} + + - name: Update ECS service + run: | + aws ecs update-service \ + --cluster $ECS_CLUSTER \ + --service $ECS_SERVICE \ + --force-new-deployment \ + --region $AWS_REGION \ + --no-cli-pager \ + > /dev/null 2>&1 + + echo "Started Deployment" From 2758435bae7bde31581318d41b9ecadfe4543aff Mon Sep 17 00:00:00 2001 From: Ayush Singhal Date: Sun, 14 Sep 2025 22:56:41 +0530 Subject: [PATCH 02/18] fixing docker yml --- .env.example | 25 ++++++------------ Dockerfile | 7 ++++-- docker-compose.yml | 63 +++++++++++----------------------------------- requirements.txt | 6 ++--- 4 files changed, 30 insertions(+), 71 deletions(-) diff --git a/.env.example b/.env.example index 3998047..75522e2 100644 --- a/.env.example +++ b/.env.example @@ -1,4 +1,4 @@ -PORT=300 +PORT=3000 # Postgres Credentials DATABASE_NAME=druling @@ -9,24 +9,13 @@ DATABASE_PORT=5432 INTERNAL_SECRET=drulinghere -OPENAI_API_KEY="openai key here" -ANTHROPIC_API_KEY="anthropic key here" -GOOGLE_API_KEY="google key here" +OPENAI_API_KEY=key_here +ANTHROPIC_API_KEY=key_here +GOOGLE_API_KEY=key_here -BACKEND_URL=http://localhost:8000/internal/v1 - -# Environment-specific configuration (.env) -# Development -LOG_LEVEL=DEBUG +LOG_LEVEL=INFO LOG_FORMAT=standard -# Production -# LOG_LEVEL=INFO -# LOG_FORMAT=json -# SENTRY_DSN=your_sentry_dsn_here - -# Staging -# LOG_LEVEL=DEBUG -# LOG_FORMAT=json +BACKEND_URL=http://localhost:8000/internal/v1 -SECRET_ENCRYPTION_KEY=secret_key_here +SECRET_ENCRYPTION_KEY=secure_key_here diff --git a/Dockerfile b/Dockerfile index dd329d7..d20f359 100644 --- a/Dockerfile +++ b/Dockerfile @@ -7,8 +7,11 @@ WORKDIR /app COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt -# Copy the project files -COPY src/ src/ +# Copy the entire project structure +COPY . . + +# Add the current directory to Python path +ENV PYTHONPATH=/app # Expose the port FastAPI runs on EXPOSE 8000 diff --git a/docker-compose.yml b/docker-compose.yml index dcdb991..0b1963c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,3 +1,5 @@ +version: '3.9' + services: api: build: @@ -5,60 +7,25 @@ services: dockerfile: Dockerfile ports: - "8000:8000" + env_file: + - .env environment: - - DATABASE_URL=postgresql://postgres:postgres@db:5432/cleanfastapi + DATABASE_NAME: druling + DATABASE_USER: druling + DATABASE_PASSWORD: password + DATABASE_HOST: db + DATABASE_PORT: 5432 depends_on: - db volumes: - - ./src:/src/src - command: uvicorn src.main:src --host 0.0.0.0 --port 8000 --reload + - ./src:/app/src db: image: postgres:17 - environment: - - POSTGRES_DB=cleanfastapi - - POSTGRES_USER=postgres - - POSTGRES_PASSWORD=postgres - volumes: - - postgres_data:/var/lib/postgresql/data ports: - "5432:5432" - -volumes: - postgres_data: - -# version: '3.8' -# services: -# app: -# build: . -# ports: -# - "8000:8000" -# environment: -# - LOG_LEVEL=INFO -# - LOG_FORMAT=json -# volumes: -# - ./logs:/app/logs -# depends_on: -# - elasticsearch -# -# elasticsearch: -# image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0 -# environment: -# - discovery.type=single-node -# - xpack.security.enabled=false -# ports: -# - "9200:9200" -# volumes: -# - elasticsearch_data:/usr/share/elasticsearch/data -# -# filebeat: -# image: docker.elastic.co/beats/filebeat:8.11.0 -# user: root -# volumes: -# - ./logs:/app/logs:ro -# - ./filebeat.yml:/usr/share/filebeat/filebeat.yml:ro -# depends_on: -# - elasticsearch -# -# volumes: -# elasticsearch_data: \ No newline at end of file + environment: + POSTGRES_PASSWORD: password + POSTGRES_USER: druling + POSTGRES_PORT: 5432 + POSTGRES_DB: druling diff --git a/requirements.txt b/requirements.txt index 187a84c..2fcdf50 100644 --- a/requirements.txt +++ b/requirements.txt @@ -33,7 +33,7 @@ frozenlist==1.6.0 fsspec==2025.7.0 functions==0.7.0 generativeai==0.0.1 -google-ai-generativelanguage==0.7.0 +google-ai-generativelanguage==0.6.18 google-api-core==2.25.1 google-api-python-client==2.181.0 google-auth==2.40.3 @@ -60,7 +60,7 @@ jsonpointer==3.0.0 langchain==0.3.25 langchain-anthropic==0.3.12 langchain-community==0.3.23 -langchain-core==0.3.58 +langchain-core==0.3.62 langchain-deepseek==0.1.3 langchain-google-genai==2.1.5 langchain-openai==0.3.16 @@ -118,7 +118,7 @@ stack-data==0.6.3 starlette==0.46.2 tabulate==0.9.0 tenacity==9.1.2 -tiktoken==0.3.3 +tiktoken==0.8.0 tomli==2.2.1 tqdm==4.67.1 traitlets==5.14.3 From db03098b413c29fbb36794b78d2c4bd75e5762cf Mon Sep 17 00:00:00 2001 From: Ayush Singhal Date: Sun, 14 Sep 2025 23:01:32 +0530 Subject: [PATCH 03/18] Remove gamma --- .github/workflows/build.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 8aaca6a..6389e02 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -5,7 +5,6 @@ on: branches: - dev - master - - gamma jobs: build-and-push: From ba2c563bba3274036a3c50f90885104b84a783af Mon Sep 17 00:00:00 2001 From: Ayush Singhal Date: Tue, 16 Sep 2025 23:09:26 +0530 Subject: [PATCH 04/18] adding build folder --- .gitignore | 1 - build/docker/Dockerfile | 39 ++++++++++++++++++ build/docker/entrypoint.sh | 51 ++++++++++++++++++++++++ Dockerfile => build/stack/web/Dockerfile | 4 +- docker-compose.yml | 2 +- 5 files changed, 93 insertions(+), 4 deletions(-) create mode 100644 build/docker/Dockerfile create mode 100644 build/docker/entrypoint.sh rename Dockerfile => build/stack/web/Dockerfile (88%) diff --git a/.gitignore b/.gitignore index f9277fd..636801b 100644 --- a/.gitignore +++ b/.gitignore @@ -4,7 +4,6 @@ __pycache__/ *$py.class *.so .Python -build/ develop-eggs/ dist/ downloads/ diff --git a/build/docker/Dockerfile b/build/docker/Dockerfile new file mode 100644 index 0000000..d33c3a6 --- /dev/null +++ b/build/docker/Dockerfile @@ -0,0 +1,39 @@ +# Use an official Python runtime as a base image +FROM python:3.10-slim + +# Set environment variables +ENV PYTHONDONTWRITEBYTECODE 1 # Prevent Python from writing .pyc files +ENV PYTHONUNBUFFERED 1 # Force the stdout and stderr streams to be unbuffered + +# Install system dependencies +RUN apt-get update && apt-get install -y --no-install-recommends \ + curl \ + postgresql-client && \ + apt-get clean && rm -rf /var/lib/apt/lists/* + +# Set the working directory in the container +WORKDIR /app + +# Ensure required directories exist +RUN mkdir -p /app/static /app/media + +# Copy the requirements file to the container +COPY ../../requirements.txt /app/ + +# Install dependencies +RUN pip install --no-cache-dir -r requirements.txt + +# Copy the rest of the application code to the container +COPY ../../ /app/ + +# Copy the entrypoint configuration file +COPY /build/docker/entrypoint.sh /app/entrypoint.sh + +# Expose port 8000 for the Django server +EXPOSE 8000 + +# Make the entrypoint script executable +RUN chmod +x /app/entrypoint.sh + +# Set the entrypoint script +ENTRYPOINT ["/app/entrypoint.sh"] diff --git a/build/docker/entrypoint.sh b/build/docker/entrypoint.sh new file mode 100644 index 0000000..70c4070 --- /dev/null +++ b/build/docker/entrypoint.sh @@ -0,0 +1,51 @@ +#!/bin/bash +set -e + +# Function to gracefully shut down Gunicorn +shutdown() { + echo "Shutdown signal received. Stopping Gunicorn gracefully..." + + # Gracefully stop Gunicorn workers (stops accepting new requests) + kill -SIGTERM $GUNICORN_PID + + # Allow some time for in-progress requests to complete + echo "Waiting for ongoing requests to finish..." + sleep 10 # Adjust time as needed + + # Force kill Gunicorn if it's still running after the grace period + kill -9 $GUNICORN_PID 2>/dev/null || true + + echo "Shutdown complete." + exit 0 +} + +# Trap SIGTERM and SIGINT signals (container stop or AWS shutdown notice) +trap shutdown SIGTERM SIGINT + +# Apply database migrations +echo "Applying database migrations..." +python manage.py migrate + +# Set up email templates with the --force option +echo "Setting up email templates..." +python manage.py setup_email_templates --force + +# Start Gunicorn in the background with graceful timeout +echo "Starting Gunicorn..." +gunicorn --bind 0.0.0.0:8000 setup.wsgi:application --workers 4 --timeout 90 & + +# Store the PID of the Gunicorn process +GUNICORN_PID=$! + +# Poll AWS metadata service for spot termination notice +while true; do + TERMINATION_INFO=$(curl -s http://169.254.169.254/latest/meta-data/spot/instance-action || true) + if [ -n "$TERMINATION_INFO" ]; then + echo "AWS Spot termination notice detected! Gracefully shutting down..." + shutdown + fi + sleep 5 +done + +# Wait for Gunicorn to exit +wait $GUNICORN_PID diff --git a/Dockerfile b/build/stack/web/Dockerfile similarity index 88% rename from Dockerfile rename to build/stack/web/Dockerfile index d20f359..76668d0 100644 --- a/Dockerfile +++ b/build/stack/web/Dockerfile @@ -4,11 +4,11 @@ FROM python:3.11-slim WORKDIR /app # Install dependencies -COPY requirements.txt . +COPY ../../../requirements.txt . RUN pip install --no-cache-dir -r requirements.txt # Copy the entire project structure -COPY . . +COPY ../../.. . # Add the current directory to Python path ENV PYTHONPATH=/app diff --git a/docker-compose.yml b/docker-compose.yml index 0b1963c..52319d1 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -4,7 +4,7 @@ services: api: build: context: . - dockerfile: Dockerfile + dockerfile: build/stack/web/Dockerfile ports: - "8000:8000" env_file: From 56d4e45762f0d612ebe2dd8f6765eccb1bca732a Mon Sep 17 00:00:00 2001 From: Ayush Singhal Date: Tue, 16 Sep 2025 23:29:53 +0530 Subject: [PATCH 05/18] adding gracefful shutdown --- build/docker/Dockerfile | 29 ++++++++----------- build/docker/entrypoint.sh | 57 ++++++++++++-------------------------- src/main.py | 27 ++++++++++++++++++ 3 files changed, 56 insertions(+), 57 deletions(-) diff --git a/build/docker/Dockerfile b/build/docker/Dockerfile index d33c3a6..83ce3b3 100644 --- a/build/docker/Dockerfile +++ b/build/docker/Dockerfile @@ -11,29 +11,24 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ postgresql-client && \ apt-get clean && rm -rf /var/lib/apt/lists/* -# Set the working directory in the container WORKDIR /app -# Ensure required directories exist -RUN mkdir -p /app/static /app/media - -# Copy the requirements file to the container -COPY ../../requirements.txt /app/ - # Install dependencies +COPY ../../../requirements.txt . RUN pip install --no-cache-dir -r requirements.txt -# Copy the rest of the application code to the container -COPY ../../ /app/ +# Copy the entire project structure +COPY ../../.. . -# Copy the entrypoint configuration file -COPY /build/docker/entrypoint.sh /app/entrypoint.sh +# Copy and make entrypoint script executable +COPY build/docker/entrypoint.sh /entrypoint.sh +RUN chmod +x /entrypoint.sh -# Expose port 8000 for the Django server -EXPOSE 8000 +# Add the current directory to Python path +ENV PYTHONPATH=/app -# Make the entrypoint script executable -RUN chmod +x /app/entrypoint.sh +# Expose the port FastAPI runs on +EXPOSE 8000 -# Set the entrypoint script -ENTRYPOINT ["/app/entrypoint.sh"] +# Use entrypoint script for proper signal handling +ENTRYPOINT ["/entrypoint.sh"] diff --git a/build/docker/entrypoint.sh b/build/docker/entrypoint.sh index 70c4070..c96daf7 100644 --- a/build/docker/entrypoint.sh +++ b/build/docker/entrypoint.sh @@ -1,51 +1,28 @@ #!/bin/bash + +# Enable exit on error set -e -# Function to gracefully shut down Gunicorn +# Function to handle shutdown signals shutdown() { - echo "Shutdown signal received. Stopping Gunicorn gracefully..." - - # Gracefully stop Gunicorn workers (stops accepting new requests) - kill -SIGTERM $GUNICORN_PID - - # Allow some time for in-progress requests to complete - echo "Waiting for ongoing requests to finish..." - sleep 10 # Adjust time as needed - - # Force kill Gunicorn if it's still running after the grace period - kill -9 $GUNICORN_PID 2>/dev/null || true - - echo "Shutdown complete." + echo "Received shutdown signal, gracefully stopping uvicorn..." + if [ ! -z "$UVICORN_PID" ]; then + kill -TERM "$UVICORN_PID" + wait "$UVICORN_PID" + fi + echo "Application stopped gracefully" exit 0 } -# Trap SIGTERM and SIGINT signals (container stop or AWS shutdown notice) +# Trap SIGTERM and SIGINT signals trap shutdown SIGTERM SIGINT -# Apply database migrations -echo "Applying database migrations..." -python manage.py migrate - -# Set up email templates with the --force option -echo "Setting up email templates..." -python manage.py setup_email_templates --force - -# Start Gunicorn in the background with graceful timeout -echo "Starting Gunicorn..." -gunicorn --bind 0.0.0.0:8000 setup.wsgi:application --workers 4 --timeout 90 & +# Start the FastAPI application +echo "Starting FastAPI application..." +uvicorn src.main:app --host 0.0.0.0 --port 8000 & -# Store the PID of the Gunicorn process -GUNICORN_PID=$! - -# Poll AWS metadata service for spot termination notice -while true; do - TERMINATION_INFO=$(curl -s http://169.254.169.254/latest/meta-data/spot/instance-action || true) - if [ -n "$TERMINATION_INFO" ]; then - echo "AWS Spot termination notice detected! Gracefully shutting down..." - shutdown - fi - sleep 5 -done +# Store the PID of uvicorn process +UVICORN_PID=$! -# Wait for Gunicorn to exit -wait $GUNICORN_PID +# Wait for the process to complete +wait "$UVICORN_PID" diff --git a/src/main.py b/src/main.py index 08706bf..98b906b 100644 --- a/src/main.py +++ b/src/main.py @@ -1,4 +1,6 @@ import logging +import signal +from contextlib import asynccontextmanager from dotenv import load_dotenv from fastapi import FastAPI, HTTPException @@ -13,10 +15,35 @@ setup_logging() logger = logging.getLogger("ai_server") +# Global shutdown flag +shutdown_event = False + + +def signal_handler(signum, frame): + global shutdown_event + logger.info(f"Received signal {signum}, initiating graceful shutdown...") + shutdown_event = True + + +# Register signal handlers for graceful shutdown +signal.signal(signal.SIGTERM, signal_handler) +signal.signal(signal.SIGINT, signal_handler) + + +@asynccontextmanager +async def lifespan(app: FastAPI): + # Startup + logger.info("FastAPI application starting up...") + yield + # Shutdown + logger.info("FastAPI application shutting down...") + + app = FastAPI( title="MongoDB FastAPI Demo", description="A sample FastAPI application with MongoDB", version="1.0.0", + lifespan=lifespan, ) register_routes(app) From afdd6add5c0cdfe4172aea356a9b4ba00d74e741 Mon Sep 17 00:00:00 2001 From: Ayush Singhal Date: Thu, 18 Sep 2025 01:32:34 +0530 Subject: [PATCH 06/18] adding default env value --- src/setup/config/config.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/setup/config/config.py b/src/setup/config/config.py index 76992e9..09277e6 100644 --- a/src/setup/config/config.py +++ b/src/setup/config/config.py @@ -2,7 +2,7 @@ class Config(BaseSettings): - app_name: str = "My FastAPI App" + app_name: str = "Druling AI Server" debug: bool = False port: int = 8000 @@ -14,8 +14,8 @@ class Config(BaseSettings): internal_secret: str = "" - log_level: str = "" - log_format: str = "" + log_level: str = "INFO" + log_format: str = "standard" openai_api_key: str = "" anthropic_api_key: str = "" From 18924a2a7fb1b897fecc99cfda3e5ad5c2f974cd Mon Sep 17 00:00:00 2001 From: Ayush Singhal Date: Sun, 28 Sep 2025 20:12:03 +0530 Subject: [PATCH 07/18] integration credit api from backen --- src/clients/backend/dtos/request/credit.py | 6 ++++ src/clients/backend/services/credit.py | 33 ++++++++++++++++++++++ 2 files changed, 39 insertions(+) create mode 100644 src/clients/backend/dtos/request/credit.py create mode 100644 src/clients/backend/services/credit.py diff --git a/src/clients/backend/dtos/request/credit.py b/src/clients/backend/dtos/request/credit.py new file mode 100644 index 0000000..cfb1b01 --- /dev/null +++ b/src/clients/backend/dtos/request/credit.py @@ -0,0 +1,6 @@ +from pydantic import BaseModel + + +class CreditData(BaseModel): + profile_id: str + amount: int diff --git a/src/clients/backend/services/credit.py b/src/clients/backend/services/credit.py new file mode 100644 index 0000000..6b6a06a --- /dev/null +++ b/src/clients/backend/services/credit.py @@ -0,0 +1,33 @@ +import logging + +from src.clients.backend.client import BackendClient +from src.clients.backend.dtos.request.credit import CreditData +from src.clients.backend.dtos.response.base import BaseResponse + +logger = logging.getLogger(__name__) + + +class Credit(BackendClient): + def __init__(self): + super().__init__() + self.base = "/credit" + + def increment(self, profile_id, amount) -> BaseResponse: + """ + Send a query to the AI server and return the response. + """ + request = CreditData(profile_id=profile_id, amount=amount) + response = self._make_request( + f"{self.base}/increment/", method="POST", data=request.model_dump() + ) + return self._process_response(response, BaseResponse) + + def decrement(self, profile_id, amount) -> BaseResponse: + """ + Send a query to the AI server and return the response. + """ + request = CreditData(profile_id=profile_id, amount=amount) + response = self._make_request( + f"{self.base}/decrement/", method="POST", data=request.model_dump() + ) + return self._process_response(response, BaseResponse) From f679bdbbab6e8727b61cb0d698e162f1d7b55bbc Mon Sep 17 00:00:00 2001 From: Ayush Singhal Date: Sun, 28 Sep 2025 21:15:01 +0530 Subject: [PATCH 08/18] adding credit cost inc dec --- .../asset_conversation/schemas/response.py | 2 + src/app/conversation/schemas/response.py | 3 +- src/main.py | 8 +- src/services/asset_operation/service.py | 20 ++++- src/services/asset_operation/types/image.py | 1 - src/services/asset_operation/types/pdf.py | 4 +- src/services/asset_operation/types/video.py | 11 +-- src/services/asset_operation/utils.py | 10 ++- src/services/audio_operation/__init__.py | 0 src/services/helper/credit.py | 30 +++++++ src/services/llm_service/factory.py | 9 +- .../llm_service/providers/anthropic.py | 6 +- src/services/llm_service/providers/azure.py | 6 +- .../llm_service/providers/deepseek.py | 7 +- src/services/llm_service/providers/google.py | 76 ++++++++++------ src/services/llm_service/providers/openai.py | 6 +- src/services/llm_service/providers/xai.py | 6 +- src/services/llm_service/service.py | 87 ++++++++++++------- src/services/llm_service/utils.py | 5 +- 19 files changed, 200 insertions(+), 97 deletions(-) delete mode 100644 src/services/audio_operation/__init__.py create mode 100644 src/services/helper/credit.py diff --git a/src/app/asset_conversation/schemas/response.py b/src/app/asset_conversation/schemas/response.py index 98fcba1..6c4fff0 100644 --- a/src/app/asset_conversation/schemas/response.py +++ b/src/app/asset_conversation/schemas/response.py @@ -9,6 +9,7 @@ class AssetChatResponse(BaseModel): tokens_used: Optional[int] = None confidence_score: Optional[float] = None metadata: Optional[Dict[str, Any]] = None + credit_used: Optional[int] = None class FileProcessingResponse(BaseModel): @@ -17,3 +18,4 @@ class FileProcessingResponse(BaseModel): extracted_text: Optional[str] = None extracted_data: Optional[Dict[str, Any]] = None processing_status: Optional[str] + credit_used: Optional[int] = None diff --git a/src/app/conversation/schemas/response.py b/src/app/conversation/schemas/response.py index 773a897..c123a6a 100644 --- a/src/app/conversation/schemas/response.py +++ b/src/app/conversation/schemas/response.py @@ -10,4 +10,5 @@ class ChatResponse(BaseModel): mcp_calls: Optional[List[Dict[str, Any]]] = None confidence_score: Optional[float] = None metadata: Optional[Dict[str, Any]] = None - structure_response: Optional[Dict[str, Any]] = None \ No newline at end of file + structure_response: Optional[Dict[str, Any]] = None + credit_used: Optional[int] = None diff --git a/src/main.py b/src/main.py index 98b906b..955e4b8 100644 --- a/src/main.py +++ b/src/main.py @@ -33,15 +33,15 @@ def signal_handler(signum, frame): @asynccontextmanager async def lifespan(app: FastAPI): # Startup - logger.info("FastAPI application starting up...") + logger.info("Druling AI Server application starting up...") yield # Shutdown - logger.info("FastAPI application shutting down...") + logger.info("Druling AI Server application shutting down...") app = FastAPI( - title="MongoDB FastAPI Demo", - description="A sample FastAPI application with MongoDB", + title="Druling AI Server", + description="Druling AI Server", version="1.0.0", lifespan=lifespan, ) diff --git a/src/services/asset_operation/service.py b/src/services/asset_operation/service.py index 4c78ff6..9411ab6 100644 --- a/src/services/asset_operation/service.py +++ b/src/services/asset_operation/service.py @@ -9,6 +9,7 @@ from src.app.conversation.schemas import ChatRequest from src.services.asset_operation.utils import analyze_response, generate_response from src.services.helper import FileSave +from src.services.helper.credit import CreditHelper from src.services.llm_service import ProviderFactory logger = logging.getLogger(__name__) @@ -28,6 +29,7 @@ def __init__(self, self.content = None self.request = request + self.cost = 0 async def download_content(self): """Download file from URL (S3 or direct URL).""" @@ -45,14 +47,17 @@ async def _download_content(self, url: str) -> Optional[bytes]: return None async def llm_provider(self): - return await ProviderFactory.get_llm_provider(self.db, request=ChatRequest( + llm_provider = await ProviderFactory.get_llm_provider(self.db, request=ChatRequest( provider=self.provider, model=self.model, query="", )) + self.cost = llm_provider.cost + return llm_provider async def llm_call(self, query, system_prompt=None): llm_provider = await self.llm_provider() + await self.consumer_credit(None) messages = [] if system_prompt: messages.append(SystemMessage(content=system_prompt)) @@ -74,16 +79,25 @@ async def generate(self, params=None): """ raise NotImplementedError("Subclasses must implement generate method.") + async def consumer_credit(self, profile_id): + credit_helper = CreditHelper() + await credit_helper.increment(profile_id, self.cost) + + async def revert_credit(self, profile_id): + credit_helper = CreditHelper() + await credit_helper.decrement(profile_id, self.cost) + self.cost = 0 + def analyze_response(self, response, metadata=None): """ Process the LLM response and return structured data. This method should be implemented by subclasses. """ - return analyze_response(self.request.file_type, self.model, response, metadata=metadata) + return analyze_response(self.request.file_type, self.model, response, self.cost, metadata=metadata) def generate_response(self, response, metadata=None): """ Process the LLM response and return structured data. This method should be implemented by subclasses. """ - return generate_response(self.request.file_type, self.model, response, metadata=metadata) + return generate_response(self.request.file_type, self.model, response, self.cost, metadata=metadata) diff --git a/src/services/asset_operation/types/image.py b/src/services/asset_operation/types/image.py index d2a0e09..e0651d8 100644 --- a/src/services/asset_operation/types/image.py +++ b/src/services/asset_operation/types/image.py @@ -4,7 +4,6 @@ import uuid from typing import Dict, Any, Optional -import httpx from PIL import Image from sqlalchemy.ext.asyncio import AsyncSession diff --git a/src/services/asset_operation/types/pdf.py b/src/services/asset_operation/types/pdf.py index 767d9a9..e383744 100644 --- a/src/services/asset_operation/types/pdf.py +++ b/src/services/asset_operation/types/pdf.py @@ -5,7 +5,7 @@ import aiofiles from langchain_community.document_loaders import PyPDFLoader from langchain_text_splitters import RecursiveCharacterTextSplitter -from pypdf import PdfReader, PdfWriter +from pypdf import PdfReader from pdfrw import PdfReader as PdfrwReader, PdfWriter as PdfrwWriter from sqlalchemy.ext.asyncio import AsyncSession @@ -63,6 +63,7 @@ async def analyze(self, **kwargs) -> Dict[str, Any]: return self.analyze_response(response, metadata=metadata) except Exception as e: + await self.revert_credit(None) logger.error(f"Error analyzing PDF: {str(e)}", exc_info=True) raise Exception(f"Error analyzing PDF: {str(e)}") finally: @@ -143,6 +144,7 @@ async def fill(self): ) except Exception as e: + await self.revert_credit(None) logger.error(f"Error filling PDF form: {str(e)}", exc_info=True) raise Exception(f"Error filling PDF form: {str(e)}") finally: diff --git a/src/services/asset_operation/types/video.py b/src/services/asset_operation/types/video.py index 26b86ef..024ded9 100644 --- a/src/services/asset_operation/types/video.py +++ b/src/services/asset_operation/types/video.py @@ -25,15 +25,11 @@ class VideoService(AssetOperationService): def __init__(self, db: AsyncSession, request, **kwargs): super().__init__(db, request, **kwargs) - self.llm = ChatGoogleGenerativeAI(model=self.model) async def llm_call(self, query, system_prompt=None): - messages = [] - if system_prompt: - messages.append(SystemMessage(content=system_prompt)) - messages.append(HumanMessage(content=query)) - - return await self.llm.ainvoke(messages) + llm_provider = await self.llm_provider() + self.cost = llm_provider.cost + return await llm_provider.analyze_video(query, system_prompt) async def analyze(self, **kwargs) -> Dict[str, Any]: """Analyze video using vision-capable models.""" @@ -53,6 +49,5 @@ async def analyze(self, **kwargs) -> Dict[str, Any]: "mime_type": "video/mp4", }, ] - response = await self.llm_call(messages) return self.analyze_response(response) diff --git a/src/services/asset_operation/utils.py b/src/services/asset_operation/utils.py index a4a193c..afbdf15 100644 --- a/src/services/asset_operation/utils.py +++ b/src/services/asset_operation/utils.py @@ -3,20 +3,22 @@ from src.app.asset_conversation.schemas import AssetChatRequest, AssetChatResponse -def analyze_response(query_type, model, response, metadata=None): +def analyze_response(query_type, model, response, credit, metadata=None): return AssetChatResponse( response=response.content if hasattr(response, 'content') else str(response), model_used=model, query_type=query_type, - metadata=metadata + metadata=metadata, + credit_used=credit ) -def generate_response(query_type, model, response, metadata=None): +def generate_response(query_type, model, response, cost, metadata=None): return AssetChatResponse( response=response.content if hasattr(response, 'content') else str(response), model_used=model, query_type=query_type, - metadata=metadata + metadata=metadata, + credit_used=cost ) def clean_up(file_path: str): diff --git a/src/services/audio_operation/__init__.py b/src/services/audio_operation/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/services/helper/credit.py b/src/services/helper/credit.py new file mode 100644 index 0000000..1e7ac57 --- /dev/null +++ b/src/services/helper/credit.py @@ -0,0 +1,30 @@ +import logging +from typing import Optional + +from src.clients.backend.dtos.response.base import BaseResponse +from src.clients.backend.services.credit import Credit + + +logger = logging.getLogger(__name__) + +class CreditHelper: + def __init__(self): + self.credit = Credit() + + async def increment(self, profile_id, amount) -> Optional[BaseResponse]: + """Generic method to upload content to a given URL.""" + try: + response = self.credit.increment(profile_id, amount) + return response.data + except Exception as e: + logger.error(f"Error consuming credit cost: {str(e)}") + return None + + async def decrement(self, profile_id, amount) -> Optional[BaseResponse]: + """Generic method to upload content to a given URL.""" + try: + response = self.credit.decrement(profile_id, amount) + return response.data + except Exception as e: + logger.error(f"Error reverting credit cost: {str(e)}") + return None diff --git a/src/services/llm_service/factory.py b/src/services/llm_service/factory.py index e0655a5..3db847c 100644 --- a/src/services/llm_service/factory.py +++ b/src/services/llm_service/factory.py @@ -1,5 +1,3 @@ -from typing import Union - from sqlalchemy.ext.asyncio import AsyncSession from src.app.conversation.schemas import ChatRequest @@ -55,8 +53,10 @@ async def get_llm_provider( ) try: - await cls.validate_model(db=db, model=model, provider=provider) - return service_class(model=model.value, request=request, **kwargs) + llm_model = await cls.validate_model(db=db, model=model, provider=provider) + service = service_class(db=db, model=llm_model, request=request, **kwargs) + await service.initialize() + return service except Exception as e: raise RuntimeError(f"Failed to initialize {provider.value} service: {str(e)}") from e @@ -66,3 +66,4 @@ async def validate_model(cls, db: AsyncSession, model: ModelName, provider: Mode llm_model = await llm_repo.get(db=db, code=model.value, provider=provider.label, deleted_at=None) if llm_model is None or llm_model.get('is_active', False) == False: raise Exception("Model is not in use") + return llm_model diff --git a/src/services/llm_service/providers/anthropic.py b/src/services/llm_service/providers/anthropic.py index d343c26..afac7da 100644 --- a/src/services/llm_service/providers/anthropic.py +++ b/src/services/llm_service/providers/anthropic.py @@ -1,8 +1,10 @@ +from sqlalchemy.ext.asyncio import AsyncSession + from src.services.llm_service.service import LLMService from src.app.llm_model.enums import ModelProvider class AnthropicService(LLMService): - def __init__(self, model, provider=ModelProvider.ANTHROPIC, **kwargs): - super().__init__(model, provider, **kwargs) + def __init__(self, db: AsyncSession, model, provider=ModelProvider.ANTHROPIC, **kwargs): + super().__init__(db, model, provider, **kwargs) diff --git a/src/services/llm_service/providers/azure.py b/src/services/llm_service/providers/azure.py index 05ce0ac..eb28f8a 100644 --- a/src/services/llm_service/providers/azure.py +++ b/src/services/llm_service/providers/azure.py @@ -1,8 +1,10 @@ +from sqlalchemy.ext.asyncio import AsyncSession + from src.services.llm_service.service import LLMService from src.app.llm_model.enums import ModelProvider class AzureService(LLMService): - def __init__(self, model, provider=ModelProvider.AZURE, **kwargs): - super().__init__(model, provider, **kwargs) + def __init__(self, db: AsyncSession, model, provider=ModelProvider.AZURE, **kwargs): + super().__init__(db, model, provider, **kwargs) diff --git a/src/services/llm_service/providers/deepseek.py b/src/services/llm_service/providers/deepseek.py index 59668a9..5b3ae62 100644 --- a/src/services/llm_service/providers/deepseek.py +++ b/src/services/llm_service/providers/deepseek.py @@ -1,8 +1,9 @@ +from sqlalchemy.ext.asyncio import AsyncSession + from src.services.llm_service.service import LLMService from src.app.llm_model.enums import ModelProvider class DeepseekService(LLMService): - - def __init__(self, model, provider=ModelProvider.DEEPSEEK, **kwargs): - super().__init__(model, provider, **kwargs) + def __init__(self, db: AsyncSession, model, provider=ModelProvider.DEEPSEEK, **kwargs): + super().__init__(db, model, provider, **kwargs) \ No newline at end of file diff --git a/src/services/llm_service/providers/google.py b/src/services/llm_service/providers/google.py index 9c10bd6..c2f438f 100644 --- a/src/services/llm_service/providers/google.py +++ b/src/services/llm_service/providers/google.py @@ -1,5 +1,6 @@ -from langchain_core.messages import HumanMessage +from langchain_core.messages import HumanMessage, SystemMessage from langchain_google_genai import ChatGoogleGenerativeAI +from sqlalchemy.ext.asyncio import AsyncSession from src.app.asset_conversation.schemas import AssetChatRequest from src.services.llm_service.service import LLMService @@ -8,32 +9,51 @@ class GoogleService(LLMService): - def __init__(self, model, provider=ModelProvider.GOOGLE, **kwargs): - super().__init__(model, provider, **kwargs) + def __init__(self, db: AsyncSession, model, provider=ModelProvider.GOOGLE, **kwargs): + super().__init__(db, model, provider, **kwargs) async def generate_image(self, request: AssetChatRequest): - llm = ChatGoogleGenerativeAI(model=self.model) - message = HumanMessage(content=request.query) - - num = request.params.get("num", 1) - images = [] - - for _i in range(num): - response = await llm.ainvoke( - [message], - generation_config=dict(response_modalities=["TEXT", "IMAGE"]), - ) - - image_block = next( - (block - for block in response.content - if isinstance(block, dict) and block.get("image_url")), - None - ) - - if image_block: - image_url = image_block["image_url"].get("url") - if image_url: - images.append(image_url.split(",")[-1]) - - return images + try: + await self.consumer_credit(None) + llm = ChatGoogleGenerativeAI(model=self.model) + message = HumanMessage(content=request.query) + + num = request.params.get("num", 1) + images = [] + + for _i in range(num): + response = await llm.ainvoke( + [message], + generation_config=dict(response_modalities=["TEXT", "IMAGE"]), + ) + + image_block = next( + (block + for block in response.content + if isinstance(block, dict) and block.get("image_url")), + None + ) + + if image_block: + image_url = image_block["image_url"].get("url") + if image_url: + images.append(image_url.split(",")[-1]) + + return images + except Exception as e: + await self.revert_credit(None) + raise e + + async def analyze_video(self, query, system_prompt=None): + try: + await self.consumer_credit(None) + self.llm = ChatGoogleGenerativeAI(model=self.model) + messages = [] + if system_prompt: + messages.append(SystemMessage(content=system_prompt)) + messages.append(HumanMessage(content=query)) + + return await self.llm.ainvoke(messages) + except Exception as e: + await self.revert_credit(None) + raise e diff --git a/src/services/llm_service/providers/openai.py b/src/services/llm_service/providers/openai.py index 0112a1d..80f11fa 100644 --- a/src/services/llm_service/providers/openai.py +++ b/src/services/llm_service/providers/openai.py @@ -1,11 +1,13 @@ +from sqlalchemy.ext.asyncio import AsyncSession + from src.services.llm_service.service import LLMService from src.app.llm_model.enums import ModelProvider from openai import AsyncOpenAI class OpenLLMService(LLMService): - def __init__(self, model, provider=ModelProvider.OPENAI, **kwargs): - super().__init__(model, provider, **kwargs) + def __init__(self, db: AsyncSession, model, provider=ModelProvider.OPENAI, **kwargs): + super().__init__(db, model, provider, **kwargs) async def generate_image(self, request): client = AsyncOpenAI() diff --git a/src/services/llm_service/providers/xai.py b/src/services/llm_service/providers/xai.py index 8770c66..a47d5ea 100644 --- a/src/services/llm_service/providers/xai.py +++ b/src/services/llm_service/providers/xai.py @@ -1,8 +1,10 @@ +from sqlalchemy.ext.asyncio import AsyncSession + from src.services.llm_service.service import LLMService from src.app.llm_model.enums import ModelProvider class XaiService(LLMService): - def __init__(self, model, provider=ModelProvider.XAI, **kwargs): - super().__init__(model, provider, **kwargs) + def __init__(self, db: AsyncSession, model, provider=ModelProvider.XAI, **kwargs): + super().__init__(db, model, provider, **kwargs) \ No newline at end of file diff --git a/src/services/llm_service/service.py b/src/services/llm_service/service.py index d755569..d0217ca 100644 --- a/src/services/llm_service/service.py +++ b/src/services/llm_service/service.py @@ -5,9 +5,11 @@ from langchain.chat_models import init_chat_model from langchain_core.messages import HumanMessage, BaseMessage, SystemMessage from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder -from langchain.text_splitter import RecursiveCharacterTextSplitter +from sqlalchemy.ext.asyncio import AsyncSession from src.app.conversation.schemas import ChatRequest +from src.app.llm_pricing.repository import LLMPricingRepo +from src.services.helper.credit import CreditHelper from src.services.llm_service.utils import create_response, parse_llm_response from src.utils.convert_json_to_tool import convert_json_to_tool from src.utils.dynamic_model import create_dynamic_model @@ -18,17 +20,27 @@ class LLMService: """Enhanced LLM service class with asset_operation processing capabilities.""" - def __init__(self, model, provider, request: ChatRequest, **kwargs): - self.model = model + def __init__(self, db: AsyncSession, llm_model, provider, request: ChatRequest, **kwargs): + self.db = db + self.llm_model = llm_model + self.model = llm_model.get('code') self.provider = provider - self.llm = init_chat_model(model, model_provider=provider.value) - self.text_splitter = RecursiveCharacterTextSplitter( - chunk_size=1000, - chunk_overlap=200 - ) - self.request = request + self.llm = None + self.cost = 0 + + async def initialize(self): + self.llm = init_chat_model(self.model, model_provider=self.provider.value) + self.cost = await self.get_model_cost() or 0 + + async def get_model_cost(self): + llm_repo = LLMPricingRepo(self.db).repo + llm_pricing = await llm_repo.get(db=self.db, category=self.llm_model.get('category'), is_active=True, deleted_at=None) + if llm_pricing is None or llm_pricing.get('is_active', False) == False: + return None + return llm_pricing.get('credit_cost', 0) + async def process(self): if self.request.use_mcp: response = await self.process_with_mcp() @@ -56,7 +68,7 @@ async def process_with_tools(self): response_content = result["output"] structured_response = parse_llm_response(response_content) - return create_response(query=self.request, response=response_content, structured_response=structured_response) + return create_response(query=self.request, response=response_content, structured_response=structured_response, credit=self.cost) async def process_with_mcp(self) -> BaseMessage: """Process conversation with MCP tools.""" @@ -75,23 +87,38 @@ async def process_with_mcp(self) -> BaseMessage: return await self.llm.ainvoke([HumanMessage(content='')]) async def process_query(self, human_message: str, system_message: str = None, **kwargs) -> Any: - messages = [] - if system_message: - messages.append(SystemMessage(content=system_message)) - messages.append(HumanMessage(content=human_message)) - - response = None - structured_response = None - fields = self.request.fields - if fields is None: - response = await self.llm.ainvoke(messages) - response = response.content - else: - model = create_dynamic_model(fields) - - structured_response = await self.llm.with_structured_output(schema=model).ainvoke(messages) - - if structured_response: - structured_response = structured_response.dict() - response = "Structured response generated." - return create_response(query=self.request, response=response, structured_response=structured_response) + try: + await self.consumer_credit(None) + messages = [] + if system_message: + messages.append(SystemMessage(content=system_message)) + messages.append(HumanMessage(content=human_message)) + + response = None + structured_response = None + fields = self.request.fields + if fields is None: + response = await self.llm.ainvoke(messages) + response = response.content + else: + model = create_dynamic_model(fields) + + structured_response = await self.llm.with_structured_output(schema=model).ainvoke(messages) + + if structured_response: + structured_response = structured_response.dict() + response = "Structured response generated." + return create_response(query=self.request, response=response, structured_response=structured_response, credit=self.cost) + except Exception as e: + await self.revert_credit(None) + logger.error(f"Error processing query: {str(e)}") + return create_response(query=self.request, response=f"Error: {str(e)}", structured_response=None, credit=0) + + async def consumer_credit(self, profile_id): + credit_helper = CreditHelper() + await credit_helper.increment(profile_id, self.cost) + + async def revert_credit(self, profile_id): + credit_helper = CreditHelper() + await credit_helper.decrement(profile_id, self.cost) + self.cost = 0 diff --git a/src/services/llm_service/utils.py b/src/services/llm_service/utils.py index c9db509..c463d14 100644 --- a/src/services/llm_service/utils.py +++ b/src/services/llm_service/utils.py @@ -7,7 +7,7 @@ logger = logging.getLogger(__name__) -def create_response(query: ChatRequest, response = None, structured_response = None) -> ChatResponse: +def create_response(query: ChatRequest, credit, response = None, structured_response = None) -> ChatResponse: return ChatResponse( response=response, model_used=query.model.value, @@ -15,7 +15,8 @@ def create_response(query: ChatRequest, response = None, structured_response = N "temperature": query.temperature, "mcp_enabled": query.use_mcp }, - structure_response=structured_response + structure_response=structured_response, + credit_used=credit ) def parse_llm_response(response: str) -> Dict[str, Any]: From ae32c81500d219fad9f3f62661e426105e3b61e3 Mon Sep 17 00:00:00 2001 From: Ayush Singhal Date: Sun, 28 Sep 2025 21:30:22 +0530 Subject: [PATCH 09/18] adding cost for llm calls --- src/services/asset_operation/types/image.py | 43 +++++++----- src/services/asset_operation/types/video.py | 52 +++++++------- src/services/llm_service/providers/google.py | 4 -- src/services/llm_service/service.py | 73 ++++++++++++-------- 4 files changed, 94 insertions(+), 78 deletions(-) diff --git a/src/services/asset_operation/types/image.py b/src/services/asset_operation/types/image.py index e0651d8..8d4c2ba 100644 --- a/src/services/asset_operation/types/image.py +++ b/src/services/asset_operation/types/image.py @@ -18,28 +18,33 @@ def __init__(self, db: AsyncSession, request, **kwargs): async def analyze(self, **kwargs) -> Dict[str, Any]: """Analyze image using vision-capable models.""" - await self.download_content() - - base64_image = base64.b64encode(self.content).decode('utf-8') - messages = [ - {"type": "text", "text": self.request.query}, - { - "type": "image_url", - "image_url": { - "url": f"data:image/jpeg;base64,{base64_image}" + try: + await self.download_content() + + base64_image = base64.b64encode(self.content).decode('utf-8') + messages = [ + {"type": "text", "text": self.request.query}, + { + "type": "image_url", + "image_url": { + "url": f"data:image/jpeg;base64,{base64_image}" + } } - } - ] + ] - response = await self.llm_call(messages) + response = await self.llm_call(messages) - image = Image.open(io.BytesIO(self.content)) - metadata = { - "format": image.format, - "size": image.size, - "mode": image.mode - } - return self.analyze_response(response, metadata=metadata) + image = Image.open(io.BytesIO(self.content)) + metadata = { + "format": image.format, + "size": image.size, + "mode": image.mode + } + return self.analyze_response(response, metadata=metadata) + except Exception as e: + await self.revert_credit(None) + logger.error(f"Error analyzing image: {str(e)}") + raise e async def generate(self, params: Optional[Dict] = None) -> Dict[str, Any]: """ diff --git a/src/services/asset_operation/types/video.py b/src/services/asset_operation/types/video.py index 024ded9..a86df40 100644 --- a/src/services/asset_operation/types/video.py +++ b/src/services/asset_operation/types/video.py @@ -3,9 +3,7 @@ from typing import Dict, Any -from langchain_google_genai import ChatGoogleGenerativeAI from sqlalchemy.ext.asyncio import AsyncSession -from langchain_core.messages import HumanMessage, SystemMessage from src.services.asset_operation import AssetOperationService @@ -26,28 +24,32 @@ class VideoService(AssetOperationService): def __init__(self, db: AsyncSession, request, **kwargs): super().__init__(db, request, **kwargs) - async def llm_call(self, query, system_prompt=None): - llm_provider = await self.llm_provider() - self.cost = llm_provider.cost - return await llm_provider.analyze_video(query, system_prompt) - async def analyze(self, **kwargs) -> Dict[str, Any]: """Analyze video using vision-capable models.""" - await self.download_content() - - logger.info(f"Starting analysis of video") - query = self.request.query - if not query: - raise ValueError("Analysis query cannot be empty") - encoded_video = base64.b64encode(self.content).decode("utf-8") - - messages = [ - {"type": "text", "text": query}, - { - "type": "media", - "data": encoded_video, - "mime_type": "video/mp4", - }, - ] - response = await self.llm_call(messages) - return self.analyze_response(response) + try: + llm_provider = await self.llm_provider() + self.cost = llm_provider.cost + await self.consumer_credit(None) + + await self.download_content() + + logger.info(f"Starting analysis of video") + query = self.request.query + if not query: + raise ValueError("Analysis query cannot be empty") + encoded_video = base64.b64encode(self.content).decode("utf-8") + + messages = [ + {"type": "text", "text": query}, + { + "type": "media", + "data": encoded_video, + "mime_type": "video/mp4", + }, + ] + response = await llm_provider.analyze_video(messages) + return self.analyze_response(response) + except Exception as e: + await self.revert_credit(None) + logger.error(f"Error analyzing video: {str(e)}", exc_info=True) + raise e diff --git a/src/services/llm_service/providers/google.py b/src/services/llm_service/providers/google.py index c2f438f..aabd396 100644 --- a/src/services/llm_service/providers/google.py +++ b/src/services/llm_service/providers/google.py @@ -14,7 +14,6 @@ def __init__(self, db: AsyncSession, model, provider=ModelProvider.GOOGLE, **kwa async def generate_image(self, request: AssetChatRequest): try: - await self.consumer_credit(None) llm = ChatGoogleGenerativeAI(model=self.model) message = HumanMessage(content=request.query) @@ -41,12 +40,10 @@ async def generate_image(self, request: AssetChatRequest): return images except Exception as e: - await self.revert_credit(None) raise e async def analyze_video(self, query, system_prompt=None): try: - await self.consumer_credit(None) self.llm = ChatGoogleGenerativeAI(model=self.model) messages = [] if system_prompt: @@ -55,5 +52,4 @@ async def analyze_video(self, query, system_prompt=None): return await self.llm.ainvoke(messages) except Exception as e: - await self.revert_credit(None) raise e diff --git a/src/services/llm_service/service.py b/src/services/llm_service/service.py index d0217ca..b8774b6 100644 --- a/src/services/llm_service/service.py +++ b/src/services/llm_service/service.py @@ -1,5 +1,5 @@ import logging -from typing import Any +from typing import Any, Union, Coroutine from langchain.agents import create_tool_calling_agent, AgentExecutor from langchain.chat_models import init_chat_model @@ -7,7 +7,7 @@ from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder from sqlalchemy.ext.asyncio import AsyncSession -from src.app.conversation.schemas import ChatRequest +from src.app.conversation.schemas import ChatRequest, ChatResponse from src.app.llm_pricing.repository import LLMPricingRepo from src.services.helper.credit import CreditHelper from src.services.llm_service.utils import create_response, parse_llm_response @@ -51,40 +51,53 @@ async def process(self): return response async def process_with_tools(self): - query = self.request + try: + await self.consumer_credit(None) + query = self.request - tools = convert_json_to_tool(query.functions) - prompt = ChatPromptTemplate.from_messages([ - ("system", "You are a helpful assistant with access to tools. " - "Call necessary tools to give response Json format."), - ("human", "{input}"), - MessagesPlaceholder(variable_name="agent_scratchpad"), - ]) + tools = convert_json_to_tool(query.functions) + prompt = ChatPromptTemplate.from_messages([ + ("system", "You are a helpful assistant with access to tools. " + "Call necessary tools to give response Json format."), + ("human", "{input}"), + MessagesPlaceholder(variable_name="agent_scratchpad"), + ]) - agent = create_tool_calling_agent(self.llm, tools, prompt) - agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=False) + agent = create_tool_calling_agent(self.llm, tools, prompt) + agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=False) - result = await agent_executor.ainvoke({"input": query.query}) - response_content = result["output"] - structured_response = parse_llm_response(response_content) + result = await agent_executor.ainvoke({"input": query.query}) + response_content = result["output"] + structured_response = parse_llm_response(response_content) - return create_response(query=self.request, response=response_content, structured_response=structured_response, credit=self.cost) + return create_response(query=self.request, response=response_content, + structured_response=structured_response, credit=self.cost) + except Exception as e: + await self.revert_credit(None) + logger.error(f"Error processing with tools: {str(e)}", exc_info=True) + return create_response(query=self.request, response=f"Error: {str(e)}", structured_response=None, credit=0) - async def process_with_mcp(self) -> BaseMessage: + async def process_with_mcp(self) -> Union[ChatResponse, Any]: """Process conversation with MCP tools.""" - query = self.request - if query.mcp_tools: - for tool_spec in query.mcp_tools: - server_tool, params_str = tool_spec.split(":", 1) if ":" in tool_spec else (tool_spec, "") - server_name, tool_name = server_tool.split(".", 1) - - params = {} - if params_str: - for param in params_str.split(","): - key, val = param.split("=", 1) - params[key.strip()] = val.strip() - - return await self.llm.ainvoke([HumanMessage(content='')]) + try: + await self.consumer_credit(None) + query = self.request + if query.mcp_tools: + for tool_spec in query.mcp_tools: + server_tool, params_str = tool_spec.split(":", 1) if ":" in tool_spec else (tool_spec, "") + server_name, tool_name = server_tool.split(".", 1) + + params = {} + if params_str: + for param in params_str.split(","): + key, val = param.split("=", 1) + params[key.strip()] = val.strip() + + return await self.llm.ainvoke([HumanMessage(content='')]) + except Exception as e: + await self.revert_credit(None) + logger.error(f"Error processing with MCP: {str(e)}", exc_info=True) + return create_response(query=self.request, response=f"Error: {str(e)}", structured_response=None, credit=0) async def process_query(self, human_message: str, system_message: str = None, **kwargs) -> Any: try: From 1ff6bab9e45167d5cbb4f72615176a7a424353e8 Mon Sep 17 00:00:00 2001 From: Ayush Singhal Date: Mon, 29 Sep 2025 01:32:28 +0530 Subject: [PATCH 10/18] adding credit cost deducation --- src/app/conversation/schemas/request.py | 1 + src/services/asset_operation/service.py | 10 +++++----- src/services/asset_operation/types/image.py | 2 +- src/services/asset_operation/types/pdf.py | 4 ++-- src/services/asset_operation/types/video.py | 4 ++-- src/services/llm_service/service.py | 20 ++++++++++---------- 6 files changed, 21 insertions(+), 20 deletions(-) diff --git a/src/app/conversation/schemas/request.py b/src/app/conversation/schemas/request.py index 8a39e5e..d602331 100644 --- a/src/app/conversation/schemas/request.py +++ b/src/app/conversation/schemas/request.py @@ -6,6 +6,7 @@ class ChatRequest(BaseModel): + profile_id: str query: str model: ModelName provider: ModelProvider diff --git a/src/services/asset_operation/service.py b/src/services/asset_operation/service.py index 9411ab6..f59a3ae 100644 --- a/src/services/asset_operation/service.py +++ b/src/services/asset_operation/service.py @@ -57,7 +57,7 @@ async def llm_provider(self): async def llm_call(self, query, system_prompt=None): llm_provider = await self.llm_provider() - await self.consumer_credit(None) + await self.consumer_credit() messages = [] if system_prompt: messages.append(SystemMessage(content=system_prompt)) @@ -79,13 +79,13 @@ async def generate(self, params=None): """ raise NotImplementedError("Subclasses must implement generate method.") - async def consumer_credit(self, profile_id): + async def consumer_credit(self): credit_helper = CreditHelper() - await credit_helper.increment(profile_id, self.cost) + await credit_helper.increment(self.request.profile_id, self.cost) - async def revert_credit(self, profile_id): + async def revert_credit(self): credit_helper = CreditHelper() - await credit_helper.decrement(profile_id, self.cost) + await credit_helper.decrement(self.request.profile_id, self.cost) self.cost = 0 def analyze_response(self, response, metadata=None): diff --git a/src/services/asset_operation/types/image.py b/src/services/asset_operation/types/image.py index 8d4c2ba..3a5b881 100644 --- a/src/services/asset_operation/types/image.py +++ b/src/services/asset_operation/types/image.py @@ -42,7 +42,7 @@ async def analyze(self, **kwargs) -> Dict[str, Any]: } return self.analyze_response(response, metadata=metadata) except Exception as e: - await self.revert_credit(None) + await self.revert_credit() logger.error(f"Error analyzing image: {str(e)}") raise e diff --git a/src/services/asset_operation/types/pdf.py b/src/services/asset_operation/types/pdf.py index e383744..5005c6e 100644 --- a/src/services/asset_operation/types/pdf.py +++ b/src/services/asset_operation/types/pdf.py @@ -63,7 +63,7 @@ async def analyze(self, **kwargs) -> Dict[str, Any]: return self.analyze_response(response, metadata=metadata) except Exception as e: - await self.revert_credit(None) + await self.revert_credit() logger.error(f"Error analyzing PDF: {str(e)}", exc_info=True) raise Exception(f"Error analyzing PDF: {str(e)}") finally: @@ -144,7 +144,7 @@ async def fill(self): ) except Exception as e: - await self.revert_credit(None) + await self.revert_credit() logger.error(f"Error filling PDF form: {str(e)}", exc_info=True) raise Exception(f"Error filling PDF form: {str(e)}") finally: diff --git a/src/services/asset_operation/types/video.py b/src/services/asset_operation/types/video.py index a86df40..47e921f 100644 --- a/src/services/asset_operation/types/video.py +++ b/src/services/asset_operation/types/video.py @@ -29,7 +29,7 @@ async def analyze(self, **kwargs) -> Dict[str, Any]: try: llm_provider = await self.llm_provider() self.cost = llm_provider.cost - await self.consumer_credit(None) + await self.consumer_credit() await self.download_content() @@ -50,6 +50,6 @@ async def analyze(self, **kwargs) -> Dict[str, Any]: response = await llm_provider.analyze_video(messages) return self.analyze_response(response) except Exception as e: - await self.revert_credit(None) + await self.revert_credit() logger.error(f"Error analyzing video: {str(e)}", exc_info=True) raise e diff --git a/src/services/llm_service/service.py b/src/services/llm_service/service.py index b8774b6..f911903 100644 --- a/src/services/llm_service/service.py +++ b/src/services/llm_service/service.py @@ -52,7 +52,7 @@ async def process(self): async def process_with_tools(self): try: - await self.consumer_credit(None) + await self.consumer_credit() query = self.request tools = convert_json_to_tool(query.functions) @@ -73,14 +73,14 @@ async def process_with_tools(self): return create_response(query=self.request, response=response_content, structured_response=structured_response, credit=self.cost) except Exception as e: - await self.revert_credit(None) + await self.revert_credit() logger.error(f"Error processing with tools: {str(e)}", exc_info=True) return create_response(query=self.request, response=f"Error: {str(e)}", structured_response=None, credit=0) async def process_with_mcp(self) -> Union[ChatResponse, Any]: """Process conversation with MCP tools.""" try: - await self.consumer_credit(None) + await self.consumer_credit() query = self.request if query.mcp_tools: for tool_spec in query.mcp_tools: @@ -95,13 +95,13 @@ async def process_with_mcp(self) -> Union[ChatResponse, Any]: return await self.llm.ainvoke([HumanMessage(content='')]) except Exception as e: - await self.revert_credit(None) + await self.revert_credit() logger.error(f"Error processing with MCP: {str(e)}", exc_info=True) return create_response(query=self.request, response=f"Error: {str(e)}", structured_response=None, credit=0) async def process_query(self, human_message: str, system_message: str = None, **kwargs) -> Any: try: - await self.consumer_credit(None) + await self.consumer_credit() messages = [] if system_message: messages.append(SystemMessage(content=system_message)) @@ -123,15 +123,15 @@ async def process_query(self, human_message: str, system_message: str = None, ** response = "Structured response generated." return create_response(query=self.request, response=response, structured_response=structured_response, credit=self.cost) except Exception as e: - await self.revert_credit(None) + await self.revert_credit() logger.error(f"Error processing query: {str(e)}") return create_response(query=self.request, response=f"Error: {str(e)}", structured_response=None, credit=0) - async def consumer_credit(self, profile_id): + async def consumer_credit(self): credit_helper = CreditHelper() - await credit_helper.increment(profile_id, self.cost) + await credit_helper.increment(self.request.profile_id, self.cost) - async def revert_credit(self, profile_id): + async def revert_credit(self): credit_helper = CreditHelper() - await credit_helper.decrement(profile_id, self.cost) + await credit_helper.decrement(self.request.profile_id, self.cost) self.cost = 0 From 41449fa29466af1860bf5826652a43bab2f78e40 Mon Sep 17 00:00:00 2001 From: Ayush Singhal Date: Thu, 9 Oct 2025 13:18:11 +0530 Subject: [PATCH 11/18] update service name for deployment --- .github/workflows/build.yml | 2 +- src/app/conversation/schemas/request.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 6389e02..cd8c529 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -76,7 +76,7 @@ jobs: run: | aws ecs update-service \ --cluster $ECS_CLUSTER \ - --service $ECS_SERVICE \ + --service ${ECS_SERVICE}-web \ --force-new-deployment \ --region $AWS_REGION \ --no-cli-pager \ diff --git a/src/app/conversation/schemas/request.py b/src/app/conversation/schemas/request.py index d602331..8a5ec65 100644 --- a/src/app/conversation/schemas/request.py +++ b/src/app/conversation/schemas/request.py @@ -10,7 +10,7 @@ class ChatRequest(BaseModel): query: str model: ModelName provider: ModelProvider - temperature: float = Field(default=0.7, ge=0.0, le=2.0) + temperature: Optional[float] = Field(default=0.7, ge=0.0, le=2.0) max_tokens: Optional[int] = Field(default=None, gt=0) context: Optional[str] = None fields: Optional[List[dict]] = None From 2946ce7a63dc94830161a1b685632e08c145dbd3 Mon Sep 17 00:00:00 2001 From: Ayush Singhal Date: Fri, 10 Oct 2025 00:05:29 +0530 Subject: [PATCH 12/18] fixing default backend url --- src/setup/config/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/setup/config/config.py b/src/setup/config/config.py index 09277e6..5435497 100644 --- a/src/setup/config/config.py +++ b/src/setup/config/config.py @@ -21,7 +21,7 @@ class Config(BaseSettings): anthropic_api_key: str = "" google_api_key: str = "" - backend_url: str = "http://localhost:8000" + backend_url: str = "http://localhost:8000/internal/v1" timeout: int = 60 max_retries: int = 3 From e13b6d6bc721ea3de8468d6622204c3d9332d8e2 Mon Sep 17 00:00:00 2001 From: Ayush Singhal Date: Fri, 10 Oct 2025 00:28:01 +0530 Subject: [PATCH 13/18] Fixing profile id set in request --- src/services/asset_operation/service.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/services/asset_operation/service.py b/src/services/asset_operation/service.py index f59a3ae..84aaf0e 100644 --- a/src/services/asset_operation/service.py +++ b/src/services/asset_operation/service.py @@ -48,6 +48,7 @@ async def _download_content(self, url: str) -> Optional[bytes]: async def llm_provider(self): llm_provider = await ProviderFactory.get_llm_provider(self.db, request=ChatRequest( + profile_id=self.request.profile_id, provider=self.provider, model=self.model, query="", From 30d70db353594250c31f40ed53ea9345f1259728 Mon Sep 17 00:00:00 2001 From: Ayush Singhal Date: Fri, 10 Oct 2025 17:57:52 +0530 Subject: [PATCH 14/18] fixing error for credit inc and dec --- src/clients/backend/dtos/response/base.py | 1 + src/clients/base.py | 24 +++++++++++++++++-- src/core/exceptions/base.py | 8 +++++++ src/core/exceptions/enum/__init__.py | 3 +++ src/core/exceptions/enum/error_code.py | 13 ++++++++++ src/core/exceptions/internal_service.py | 10 ++++++++ .../exceptions/limit_exceeded/__init__.py | 0 src/core/exceptions/limit_exceeded/base.py | 9 +++++++ .../exceptions/limit_exceeded/credit_limit.py | 10 ++++++++ .../exceptions/limit_exceeded/rate_limit.py | 10 ++++++++ src/services/asset_operation/types/pdf.py | 4 ++-- src/services/helper/credit.py | 17 ++++--------- src/services/llm_service/service.py | 6 ++--- 13 files changed, 95 insertions(+), 20 deletions(-) create mode 100644 src/core/exceptions/base.py create mode 100644 src/core/exceptions/enum/__init__.py create mode 100644 src/core/exceptions/enum/error_code.py create mode 100644 src/core/exceptions/internal_service.py create mode 100644 src/core/exceptions/limit_exceeded/__init__.py create mode 100644 src/core/exceptions/limit_exceeded/base.py create mode 100644 src/core/exceptions/limit_exceeded/credit_limit.py create mode 100644 src/core/exceptions/limit_exceeded/rate_limit.py diff --git a/src/clients/backend/dtos/response/base.py b/src/clients/backend/dtos/response/base.py index 84dbe11..2b1e814 100644 --- a/src/clients/backend/dtos/response/base.py +++ b/src/clients/backend/dtos/response/base.py @@ -12,3 +12,4 @@ class BaseResponse(BaseModel, Generic[T]): data: Optional[T] = None message: Optional[str] = "" error: Optional[str] = None + error_code: Optional[str] = None diff --git a/src/clients/base.py b/src/clients/base.py index f247e23..99e9942 100644 --- a/src/clients/base.py +++ b/src/clients/base.py @@ -7,6 +7,9 @@ from src.clients.backend.dtos.response.base import BaseResponse from src.core.enums.CustomHeader import CustomHeader +from src.core.exceptions.enum.error_code import ErrorCode +from src.core.exceptions.internal_service import InternalServer +from src.core.exceptions.limit_exceeded.credit_limit import CreditLimitException from src.setup.config import config logger = logging.getLogger(__name__) @@ -37,18 +40,35 @@ def _make_request( json=data, timeout=self.timeout, ) - response.raise_for_status() return response.json() except requests.RequestException as e: logger.error( f"Error communicating with {self.base_url} at endpoint {endpoint}: {str(e)}", ) - raise Exception(f"Error communicating with {self.base_url}: {str(e)}") + raise InternalServer("Error communicating with internal service") def _process_response( self, response_data: Dict, response_class: Type ) -> BaseResponse | Dict: """Process response data into specified class""" + try: + response = self._parse_response(response_data, response_class) + if response is None or response.success is False: + + if response.error_code in [ErrorCode.CREDIT_EXCEEDED.value]: + logger.error(f"Credit limit exceeded: {response_data}") + raise CreditLimitException(f"Credit limit exceeded") + + logger.error(f"Unsuccessful response: {response_data}") + raise InternalServer("Unsuccessful response from internal service") + + return response + except Exception as e: + logger.error(f"Error parsing response: {str(e)}") + raise InternalServer("Error parsing response from internal service") + + def _parse_response(self, response_data: Dict, response_class: Type) -> BaseResponse | Dict: + """Parse JSON response safely""" if response_class == dict: return response_data elif hasattr(response_class, "__dataclass_fields__"): diff --git a/src/core/exceptions/base.py b/src/core/exceptions/base.py new file mode 100644 index 0000000..6568569 --- /dev/null +++ b/src/core/exceptions/base.py @@ -0,0 +1,8 @@ +class BaseError(Exception): + """Base error class for the application""" + + def __init__(self, message: str = "An error occurred", original_exception=None, error_code=None, *args: object) -> None: + super().__init__(message, *args) + self.message = message + self.original_exception = original_exception + self.error_code = error_code \ No newline at end of file diff --git a/src/core/exceptions/enum/__init__.py b/src/core/exceptions/enum/__init__.py new file mode 100644 index 0000000..755433a --- /dev/null +++ b/src/core/exceptions/enum/__init__.py @@ -0,0 +1,3 @@ +from commons.exceptions.enum.error_code import ErrorCode + +__all__ = [ErrorCode] \ No newline at end of file diff --git a/src/core/exceptions/enum/error_code.py b/src/core/exceptions/enum/error_code.py new file mode 100644 index 0000000..603e11f --- /dev/null +++ b/src/core/exceptions/enum/error_code.py @@ -0,0 +1,13 @@ +from src.core.enums.BaseEnum import BaseEnum + + +class ErrorCode(BaseEnum): + CREDIT_EXCEEDED = "CREDIT_EXCEEDED" + INVALID_CREDENTIALS = "INVALID_CREDENTIALS" + INVALID_REQUEST = "INVALID_REQUEST" + RATE_LIMIT_EXCEEDED = "RATE_LIMIT_EXCEEDED" + RESOURCE_NOT_FOUND = "RESOURCE_NOT_FOUND" + SERVER_ERROR = "SERVER_ERROR" + TIMEOUT = "TIMEOUT" + UNAUTHORIZED = "UNAUTHORIZED" + VALIDATION_ERROR = "VALIDATION_ERROR" diff --git a/src/core/exceptions/internal_service.py b/src/core/exceptions/internal_service.py new file mode 100644 index 0000000..baa0973 --- /dev/null +++ b/src/core/exceptions/internal_service.py @@ -0,0 +1,10 @@ +from src.core.exceptions.base import BaseError +from src.core.exceptions.enum.error_code import ErrorCode + + +class InternalServer(BaseError): + def __init__(self, message: str = "Internal service error", original_exception=None, error_code=ErrorCode.SERVER_ERROR.value) -> None: + self.message = message + self.error_code = error_code + self.original_exception = original_exception + super().__init__(self.message, original_exception, error_code=self.error_code) \ No newline at end of file diff --git a/src/core/exceptions/limit_exceeded/__init__.py b/src/core/exceptions/limit_exceeded/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/core/exceptions/limit_exceeded/base.py b/src/core/exceptions/limit_exceeded/base.py new file mode 100644 index 0000000..3254fe4 --- /dev/null +++ b/src/core/exceptions/limit_exceeded/base.py @@ -0,0 +1,9 @@ +from src.core.exceptions.base import BaseError + + +class LimitExceeded(BaseError): + def __init__(self, message: str = "Could not infer id for resource being cached.", original_exception=None, error_code: str = "") -> None: + self.message = message + self.error_code = error_code + self.original_exception = original_exception + super().__init__(self.message, self.original_exception, error_code=self.error_code) diff --git a/src/core/exceptions/limit_exceeded/credit_limit.py b/src/core/exceptions/limit_exceeded/credit_limit.py new file mode 100644 index 0000000..b9983ba --- /dev/null +++ b/src/core/exceptions/limit_exceeded/credit_limit.py @@ -0,0 +1,10 @@ +from src.core.exceptions.enum.error_code import ErrorCode +from src.core.exceptions.limit_exceeded.base import LimitExceeded + + +class CreditLimitException(LimitExceeded): + def __init__(self, message: str = "Credit limit exceeded", original_exception=None, error_code=ErrorCode.CREDIT_EXCEEDED.value) -> None: + self.message = message + self.error_code = error_code + self.original_exception = original_exception + super().__init__(self.message, original_exception, error_code=self.error_code) \ No newline at end of file diff --git a/src/core/exceptions/limit_exceeded/rate_limit.py b/src/core/exceptions/limit_exceeded/rate_limit.py new file mode 100644 index 0000000..3c9bec5 --- /dev/null +++ b/src/core/exceptions/limit_exceeded/rate_limit.py @@ -0,0 +1,10 @@ +from src.core.exceptions.enum.error_code import ErrorCode +from src.core.exceptions.limit_exceeded.base import LimitExceeded + + +class RateLimitExceeded(LimitExceeded): + def __init__(self, message: str, original_exception: Exception, error_code=ErrorCode.RATE_LIMIT_EXCEEDED.value): + super().__init__(message, original_exception, error_code) + self.message = message + self.original_exception = original_exception + self.error_code = error_code diff --git a/src/services/asset_operation/types/pdf.py b/src/services/asset_operation/types/pdf.py index 5005c6e..653b932 100644 --- a/src/services/asset_operation/types/pdf.py +++ b/src/services/asset_operation/types/pdf.py @@ -65,7 +65,7 @@ async def analyze(self, **kwargs) -> Dict[str, Any]: except Exception as e: await self.revert_credit() logger.error(f"Error analyzing PDF: {str(e)}", exc_info=True) - raise Exception(f"Error analyzing PDF: {str(e)}") + raise e finally: clean_up(temp_pdf_path) @@ -146,7 +146,7 @@ async def fill(self): except Exception as e: await self.revert_credit() logger.error(f"Error filling PDF form: {str(e)}", exc_info=True) - raise Exception(f"Error filling PDF form: {str(e)}") + raise e finally: clean_up(temp_pdf_path) diff --git a/src/services/helper/credit.py b/src/services/helper/credit.py index 1e7ac57..1c99ba6 100644 --- a/src/services/helper/credit.py +++ b/src/services/helper/credit.py @@ -4,7 +4,6 @@ from src.clients.backend.dtos.response.base import BaseResponse from src.clients.backend.services.credit import Credit - logger = logging.getLogger(__name__) class CreditHelper: @@ -13,18 +12,10 @@ def __init__(self): async def increment(self, profile_id, amount) -> Optional[BaseResponse]: """Generic method to upload content to a given URL.""" - try: - response = self.credit.increment(profile_id, amount) - return response.data - except Exception as e: - logger.error(f"Error consuming credit cost: {str(e)}") - return None + response = self.credit.increment(profile_id, amount) + return response.data async def decrement(self, profile_id, amount) -> Optional[BaseResponse]: """Generic method to upload content to a given URL.""" - try: - response = self.credit.decrement(profile_id, amount) - return response.data - except Exception as e: - logger.error(f"Error reverting credit cost: {str(e)}") - return None + response = self.credit.decrement(profile_id, amount) + return response.data diff --git a/src/services/llm_service/service.py b/src/services/llm_service/service.py index f911903..7b0c4f6 100644 --- a/src/services/llm_service/service.py +++ b/src/services/llm_service/service.py @@ -75,7 +75,7 @@ async def process_with_tools(self): except Exception as e: await self.revert_credit() logger.error(f"Error processing with tools: {str(e)}", exc_info=True) - return create_response(query=self.request, response=f"Error: {str(e)}", structured_response=None, credit=0) + return e async def process_with_mcp(self) -> Union[ChatResponse, Any]: """Process conversation with MCP tools.""" @@ -97,7 +97,7 @@ async def process_with_mcp(self) -> Union[ChatResponse, Any]: except Exception as e: await self.revert_credit() logger.error(f"Error processing with MCP: {str(e)}", exc_info=True) - return create_response(query=self.request, response=f"Error: {str(e)}", structured_response=None, credit=0) + return e async def process_query(self, human_message: str, system_message: str = None, **kwargs) -> Any: try: @@ -125,7 +125,7 @@ async def process_query(self, human_message: str, system_message: str = None, ** except Exception as e: await self.revert_credit() logger.error(f"Error processing query: {str(e)}") - return create_response(query=self.request, response=f"Error: {str(e)}", structured_response=None, credit=0) + return e async def consumer_credit(self): credit_helper = CreditHelper() From 95f05b94e35a8dd4c0466030e985c19cc86a4f36 Mon Sep 17 00:00:00 2001 From: Ayush Singhal Date: Fri, 10 Oct 2025 18:03:59 +0530 Subject: [PATCH 15/18] removing wrapper on exception --- src/app/asset_conversation/services/asset.py | 13 ++++++++++--- src/app/asset_conversation/services/pdf.py | 10 ++++++++-- src/app/conversation/service.py | 10 ++++++++-- 3 files changed, 26 insertions(+), 7 deletions(-) diff --git a/src/app/asset_conversation/services/asset.py b/src/app/asset_conversation/services/asset.py index 56dea2a..2b8527e 100644 --- a/src/app/asset_conversation/services/asset.py +++ b/src/app/asset_conversation/services/asset.py @@ -1,8 +1,13 @@ +import logging + from sqlalchemy.ext.asyncio import AsyncSession +from src.core.exceptions.internal_service import InternalServer from src.services.asset_operation import AssetFactory, AssetOperationService from src.services.asset_operation.types import PdfService +logger = logging.getLogger(__name__) + class AssetConversationService: def __init__(self, db: AsyncSession, **kwargs): @@ -14,7 +19,7 @@ async def asset_service(self, request) -> AssetOperationService: asset_service = await AssetFactory.get_parser_service(self.db, request) return asset_service except Exception as e: - raise Exception(f"Error getting LLM service: {e}") + raise InternalServer(f"Error getting LLM service: {e}") async def analyze_asset(self, request): """Analyze an asset using LLM""" @@ -22,7 +27,8 @@ async def analyze_asset(self, request): asset_service = await self.asset_service(request) return await asset_service.analyze() except Exception as e: - raise Exception(f"Error analyzing asset: {e}") + logger.error(f"Error analyzing assert: {e}") + raise e async def generate_asset(self, request): """Generate an asset using LLM""" @@ -30,4 +36,5 @@ async def generate_asset(self, request): asset_service = await self.asset_service(request) return await asset_service.generate() except Exception as e: - raise Exception(f"Error generating asset: {e}") + logger.error(f"Error generating asset: {e}") + raise e diff --git a/src/app/asset_conversation/services/pdf.py b/src/app/asset_conversation/services/pdf.py index 999afd2..d2cb016 100644 --- a/src/app/asset_conversation/services/pdf.py +++ b/src/app/asset_conversation/services/pdf.py @@ -1,8 +1,12 @@ +import logging + from sqlalchemy.ext.asyncio import AsyncSession from src.app.asset_conversation.services.asset import AssetConversationService from src.services.asset_operation.types import PdfService +logger = logging.getLogger(__name__) + class PDFConversationService(AssetConversationService): def __init__(self, db: AsyncSession, **kwargs): @@ -14,7 +18,8 @@ async def analyze_asset(self, request): asset_service = await self.asset_service(request) return await asset_service.analyze() except Exception as e: - raise Exception(f"Error analyzing asset: {e}") + logger.error(f"Error analyzing asset: {e}") + raise e async def fill_pdf(self, request): """Generate an asset using LLM""" @@ -23,4 +28,5 @@ async def fill_pdf(self, request): pdf_service: PdfService = asset_service # type: ignore return await pdf_service.fill() except Exception as e: - raise Exception(f"Error generating asset: {e}") + logger.error(f"Error parsing response: {str(e)}") + raise e diff --git a/src/app/conversation/service.py b/src/app/conversation/service.py index 220315d..e23bf9e 100644 --- a/src/app/conversation/service.py +++ b/src/app/conversation/service.py @@ -1,7 +1,12 @@ +import logging + from sqlalchemy.ext.asyncio import AsyncSession +from src.core.exceptions.internal_service import InternalServer from src.services.llm_service import ProviderFactory, LLMService +logger = logging.getLogger(__name__) + class LLMConversationService: def __init__(self, db: AsyncSession, **kwargs): @@ -13,7 +18,7 @@ async def llm_service(self, request) -> LLMService: llm_service = await ProviderFactory.get_llm_provider(self.db, request) return llm_service except Exception as e: - raise Exception(f"Error getting LLM service: {e}") + raise InternalServer(f"Error getting LLM service: {e}") async def conversation(self, request): """Process a text conversation with modern LangChain integration""" @@ -21,4 +26,5 @@ async def conversation(self, request): llm_service = await self.llm_service(request) return await llm_service.process() except Exception as e: - raise Exception(f"Error processing conversation: {e}") + logger.error(f"Error processing conversation: {e}") + raise e From c2d3bad76521a273ac9689fa806b6187131595b5 Mon Sep 17 00:00:00 2001 From: Ayush Singhal Date: Fri, 10 Oct 2025 18:07:54 +0530 Subject: [PATCH 16/18] adding 429 response in respone factory --- src/core/api/responses.py | 31 +++++++++++++++++++++++-------- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/src/core/api/responses.py b/src/core/api/responses.py index 8b9daea..c7cfccf 100644 --- a/src/core/api/responses.py +++ b/src/core/api/responses.py @@ -125,6 +125,16 @@ def __init__(self, data: Any = None, message: str = "Internal server error", **k ) +class LimitExceededResponse(BaseResponse): + def __init__(self, data: Any = None, message: str = "Limit exceeded error", **kwargs): + super().__init__( + data=data, + message=message, + status_code=status.HTTP_429_TOO_MANY_REQUESTS, + **kwargs + ) + + class ResponseFactory: @staticmethod def success(data: Any = None, message: str = "Operation successful", **kwargs) -> JSONResponse: @@ -137,8 +147,8 @@ def created(data: Any = None, message: str = "Resource created successfully", ** return response.to_json_response() @staticmethod - def bad_request(data: Any = None, message: str = "Bad request", errors: Any = None, **kwargs) -> JSONResponse: - response = BadRequestResponse(data=data, message=message, errors=errors, **kwargs) + def bad_request(data: Any = None, message: str = "Bad request", errors: Any = None, error_code=None, **kwargs) -> JSONResponse: + response = BadRequestResponse(data=data, message=message, errors=errors, error_code=error_code, **kwargs) return response.to_json_response() @staticmethod @@ -147,20 +157,25 @@ def not_found(data: Any = None, message: str = "Resource not found", **kwargs) - return response.to_json_response() @staticmethod - def unauthorized(data: Any = None, message: str = "Unauthorized access", **kwargs) -> JSONResponse: - response = UnauthorizedResponse(data=data, message=message, **kwargs) + def unauthorized(data: Any = None, message: str = "Unauthorized access", error_code=None, **kwargs) -> JSONResponse: + response = UnauthorizedResponse(data=data, message=message, error_code=error_code, **kwargs) return response.to_json_response() @staticmethod - def forbidden(data: Any = None, message: str = "Access forbidden", **kwargs) -> JSONResponse: - response = ForbiddenResponse(data=data, message=message, **kwargs) + def forbidden(data: Any = None, message: str = "Access forbidden", error_code=None, **kwargs) -> JSONResponse: + response = ForbiddenResponse(data=data, message=message, error_code=error_code, **kwargs) return response.to_json_response() @staticmethod def server_error( - data: Any = None, message: str = "Internal server error", errors: Any = None, **kwargs + data: Any = None, message: str = "Internal server error", errors: Any = None, error_code = None, **kwargs ) -> JSONResponse: - response = ServerErrorResponse(data=data, message=message, errors=errors, **kwargs) + response = ServerErrorResponse(data=data, message=message, errors=errors, error_code=error_code, **kwargs) + return response.to_json_response() + + @staticmethod + def limit_exceeded(data: Any = None, message: str = "Limit exceeded error", error_code=None, **kwargs) -> JSONResponse: + response = LimitExceededResponse(data=data, message=message, error_code=error_code, **kwargs) return response.to_json_response() @staticmethod From 05eec09338eb05f9d912df306ac87b69e98ada71 Mon Sep 17 00:00:00 2001 From: Ayush Singhal Date: Sat, 11 Oct 2025 15:00:06 +0530 Subject: [PATCH 17/18] removing credit call on llm services --- src/core/exceptions/enum/__init__.py | 2 +- src/services/asset_operation/service.py | 12 -------- src/services/asset_operation/types/image.py | 1 - src/services/asset_operation/types/pdf.py | 2 -- src/services/asset_operation/types/video.py | 4 --- src/services/asset_operation/utils.py | 4 +-- src/services/llm_service/service.py | 31 ++------------------- src/services/llm_service/utils.py | 2 +- 8 files changed, 7 insertions(+), 51 deletions(-) diff --git a/src/core/exceptions/enum/__init__.py b/src/core/exceptions/enum/__init__.py index 755433a..1e8217f 100644 --- a/src/core/exceptions/enum/__init__.py +++ b/src/core/exceptions/enum/__init__.py @@ -1,3 +1,3 @@ -from commons.exceptions.enum.error_code import ErrorCode +from src.core.exceptions.enum.error_code import ErrorCode __all__ = [ErrorCode] \ No newline at end of file diff --git a/src/services/asset_operation/service.py b/src/services/asset_operation/service.py index 84aaf0e..70d876b 100644 --- a/src/services/asset_operation/service.py +++ b/src/services/asset_operation/service.py @@ -9,7 +9,6 @@ from src.app.conversation.schemas import ChatRequest from src.services.asset_operation.utils import analyze_response, generate_response from src.services.helper import FileSave -from src.services.helper.credit import CreditHelper from src.services.llm_service import ProviderFactory logger = logging.getLogger(__name__) @@ -53,12 +52,10 @@ async def llm_provider(self): model=self.model, query="", )) - self.cost = llm_provider.cost return llm_provider async def llm_call(self, query, system_prompt=None): llm_provider = await self.llm_provider() - await self.consumer_credit() messages = [] if system_prompt: messages.append(SystemMessage(content=system_prompt)) @@ -80,15 +77,6 @@ async def generate(self, params=None): """ raise NotImplementedError("Subclasses must implement generate method.") - async def consumer_credit(self): - credit_helper = CreditHelper() - await credit_helper.increment(self.request.profile_id, self.cost) - - async def revert_credit(self): - credit_helper = CreditHelper() - await credit_helper.decrement(self.request.profile_id, self.cost) - self.cost = 0 - def analyze_response(self, response, metadata=None): """ Process the LLM response and return structured data. diff --git a/src/services/asset_operation/types/image.py b/src/services/asset_operation/types/image.py index 3a5b881..e668139 100644 --- a/src/services/asset_operation/types/image.py +++ b/src/services/asset_operation/types/image.py @@ -42,7 +42,6 @@ async def analyze(self, **kwargs) -> Dict[str, Any]: } return self.analyze_response(response, metadata=metadata) except Exception as e: - await self.revert_credit() logger.error(f"Error analyzing image: {str(e)}") raise e diff --git a/src/services/asset_operation/types/pdf.py b/src/services/asset_operation/types/pdf.py index 653b932..fb16cc2 100644 --- a/src/services/asset_operation/types/pdf.py +++ b/src/services/asset_operation/types/pdf.py @@ -63,7 +63,6 @@ async def analyze(self, **kwargs) -> Dict[str, Any]: return self.analyze_response(response, metadata=metadata) except Exception as e: - await self.revert_credit() logger.error(f"Error analyzing PDF: {str(e)}", exc_info=True) raise e finally: @@ -144,7 +143,6 @@ async def fill(self): ) except Exception as e: - await self.revert_credit() logger.error(f"Error filling PDF form: {str(e)}", exc_info=True) raise e finally: diff --git a/src/services/asset_operation/types/video.py b/src/services/asset_operation/types/video.py index 47e921f..371cdcb 100644 --- a/src/services/asset_operation/types/video.py +++ b/src/services/asset_operation/types/video.py @@ -28,9 +28,6 @@ async def analyze(self, **kwargs) -> Dict[str, Any]: """Analyze video using vision-capable models.""" try: llm_provider = await self.llm_provider() - self.cost = llm_provider.cost - await self.consumer_credit() - await self.download_content() logger.info(f"Starting analysis of video") @@ -50,6 +47,5 @@ async def analyze(self, **kwargs) -> Dict[str, Any]: response = await llm_provider.analyze_video(messages) return self.analyze_response(response) except Exception as e: - await self.revert_credit() logger.error(f"Error analyzing video: {str(e)}", exc_info=True) raise e diff --git a/src/services/asset_operation/utils.py b/src/services/asset_operation/utils.py index afbdf15..4a0849d 100644 --- a/src/services/asset_operation/utils.py +++ b/src/services/asset_operation/utils.py @@ -3,7 +3,7 @@ from src.app.asset_conversation.schemas import AssetChatRequest, AssetChatResponse -def analyze_response(query_type, model, response, credit, metadata=None): +def analyze_response(query_type, model, response, credit=0, metadata=None): return AssetChatResponse( response=response.content if hasattr(response, 'content') else str(response), model_used=model, @@ -12,7 +12,7 @@ def analyze_response(query_type, model, response, credit, metadata=None): credit_used=credit ) -def generate_response(query_type, model, response, cost, metadata=None): +def generate_response(query_type, model, response, cost=0, metadata=None): return AssetChatResponse( response=response.content if hasattr(response, 'content') else str(response), model_used=model, diff --git a/src/services/llm_service/service.py b/src/services/llm_service/service.py index 7b0c4f6..bd970e5 100644 --- a/src/services/llm_service/service.py +++ b/src/services/llm_service/service.py @@ -3,13 +3,11 @@ from langchain.agents import create_tool_calling_agent, AgentExecutor from langchain.chat_models import init_chat_model -from langchain_core.messages import HumanMessage, BaseMessage, SystemMessage +from langchain_core.messages import HumanMessage, SystemMessage from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder from sqlalchemy.ext.asyncio import AsyncSession from src.app.conversation.schemas import ChatRequest, ChatResponse -from src.app.llm_pricing.repository import LLMPricingRepo -from src.services.helper.credit import CreditHelper from src.services.llm_service.utils import create_response, parse_llm_response from src.utils.convert_json_to_tool import convert_json_to_tool from src.utils.dynamic_model import create_dynamic_model @@ -32,14 +30,6 @@ def __init__(self, db: AsyncSession, llm_model, provider, request: ChatRequest, async def initialize(self): self.llm = init_chat_model(self.model, model_provider=self.provider.value) - self.cost = await self.get_model_cost() or 0 - - async def get_model_cost(self): - llm_repo = LLMPricingRepo(self.db).repo - llm_pricing = await llm_repo.get(db=self.db, category=self.llm_model.get('category'), is_active=True, deleted_at=None) - if llm_pricing is None or llm_pricing.get('is_active', False) == False: - return None - return llm_pricing.get('credit_cost', 0) async def process(self): if self.request.use_mcp: @@ -52,7 +42,6 @@ async def process(self): async def process_with_tools(self): try: - await self.consumer_credit() query = self.request tools = convert_json_to_tool(query.functions) @@ -71,16 +60,14 @@ async def process_with_tools(self): structured_response = parse_llm_response(response_content) return create_response(query=self.request, response=response_content, - structured_response=structured_response, credit=self.cost) + structured_response=structured_response) except Exception as e: - await self.revert_credit() logger.error(f"Error processing with tools: {str(e)}", exc_info=True) return e async def process_with_mcp(self) -> Union[ChatResponse, Any]: """Process conversation with MCP tools.""" try: - await self.consumer_credit() query = self.request if query.mcp_tools: for tool_spec in query.mcp_tools: @@ -95,13 +82,11 @@ async def process_with_mcp(self) -> Union[ChatResponse, Any]: return await self.llm.ainvoke([HumanMessage(content='')]) except Exception as e: - await self.revert_credit() logger.error(f"Error processing with MCP: {str(e)}", exc_info=True) return e async def process_query(self, human_message: str, system_message: str = None, **kwargs) -> Any: try: - await self.consumer_credit() messages = [] if system_message: messages.append(SystemMessage(content=system_message)) @@ -121,17 +106,7 @@ async def process_query(self, human_message: str, system_message: str = None, ** if structured_response: structured_response = structured_response.dict() response = "Structured response generated." - return create_response(query=self.request, response=response, structured_response=structured_response, credit=self.cost) + return create_response(query=self.request, response=response, structured_response=structured_response) except Exception as e: - await self.revert_credit() logger.error(f"Error processing query: {str(e)}") return e - - async def consumer_credit(self): - credit_helper = CreditHelper() - await credit_helper.increment(self.request.profile_id, self.cost) - - async def revert_credit(self): - credit_helper = CreditHelper() - await credit_helper.decrement(self.request.profile_id, self.cost) - self.cost = 0 diff --git a/src/services/llm_service/utils.py b/src/services/llm_service/utils.py index c463d14..95660b2 100644 --- a/src/services/llm_service/utils.py +++ b/src/services/llm_service/utils.py @@ -7,7 +7,7 @@ logger = logging.getLogger(__name__) -def create_response(query: ChatRequest, credit, response = None, structured_response = None) -> ChatResponse: +def create_response(query: ChatRequest, credit=0, response = None, structured_response = None) -> ChatResponse: return ChatResponse( response=response, model_used=query.model.value, From b2701ad693270acbcd2d18ffe1638ca3ebd8fd8f Mon Sep 17 00:00:00 2001 From: Ayush Singhal Date: Sat, 11 Oct 2025 15:03:14 +0530 Subject: [PATCH 18/18] change url for credit api --- src/clients/backend/dtos/request/credit.py | 1 + src/clients/backend/services/credit.py | 25 +++++++++++----------- src/services/helper/credit.py | 10 ++++----- 3 files changed, 18 insertions(+), 18 deletions(-) diff --git a/src/clients/backend/dtos/request/credit.py b/src/clients/backend/dtos/request/credit.py index cfb1b01..d14d787 100644 --- a/src/clients/backend/dtos/request/credit.py +++ b/src/clients/backend/dtos/request/credit.py @@ -2,5 +2,6 @@ class CreditData(BaseModel): + usage_id: str profile_id: str amount: int diff --git a/src/clients/backend/services/credit.py b/src/clients/backend/services/credit.py index 6b6a06a..651d751 100644 --- a/src/clients/backend/services/credit.py +++ b/src/clients/backend/services/credit.py @@ -12,22 +12,23 @@ def __init__(self): super().__init__() self.base = "/credit" - def increment(self, profile_id, amount) -> BaseResponse: - """ - Send a query to the AI server and return the response. - """ - request = CreditData(profile_id=profile_id, amount=amount) + def reserve(self, usage_id, profile_id, amount) -> BaseResponse: + request = CreditData(usage_id=usage_id, profile_id=profile_id, amount=amount) response = self._make_request( - f"{self.base}/increment/", method="POST", data=request.model_dump() + f"{self.base}/reserve/", method="POST", data=request.model_dump() ) return self._process_response(response, BaseResponse) - def decrement(self, profile_id, amount) -> BaseResponse: - """ - Send a query to the AI server and return the response. - """ - request = CreditData(profile_id=profile_id, amount=amount) + def revert(self, usage_id, profile_id, amount) -> BaseResponse: + request = CreditData(usage_id=usage_id, profile_id=profile_id, amount=amount) response = self._make_request( - f"{self.base}/decrement/", method="POST", data=request.model_dump() + f"{self.base}/revert/", method="POST", data=request.model_dump() ) return self._process_response(response, BaseResponse) + + def confirm(self, usage_id, profile_id, amount) -> BaseResponse: + request = CreditData(usage_id=usage_id, profile_id=profile_id, amount=amount) + response = self._make_request( + f"{self.base}/confirm/", method="POST", data=request.model_dump() + ) + return self._process_response(response, BaseResponse) \ No newline at end of file diff --git a/src/services/helper/credit.py b/src/services/helper/credit.py index 1c99ba6..bebd16f 100644 --- a/src/services/helper/credit.py +++ b/src/services/helper/credit.py @@ -10,12 +10,10 @@ class CreditHelper: def __init__(self): self.credit = Credit() - async def increment(self, profile_id, amount) -> Optional[BaseResponse]: - """Generic method to upload content to a given URL.""" - response = self.credit.increment(profile_id, amount) + async def reserve(self, usage_id, profile_id, amount) -> Optional[BaseResponse]: + response = self.credit.reserve(usage_id, profile_id, amount) return response.data - async def decrement(self, profile_id, amount) -> Optional[BaseResponse]: - """Generic method to upload content to a given URL.""" - response = self.credit.decrement(profile_id, amount) + async def revert(self, usage_id, profile_id, amount) -> Optional[BaseResponse]: + response = self.credit.revert(usage_id, profile_id, amount) return response.data