In [6]:
import os
os.environ["LLM_PROVIDER"] = "google"
os.environ["GOOGLE_API_KEY"] = "AIzaSyB1EZnCPsTn6MaHSdo24tk1mRPGTTJM85g"


In [10]:
!pip install nest_asyncio




  parsed = [sys.intern(str(x)) for x in rel.split(sep) if x and x != '.']


In [None]:
"""
Fynd AI Feedback System - Production Backend
FastAPI + SQLite + Claude/Gemini Integration
"""

import os
import json
import sqlite3
import uuid
from datetime import datetime
from typing import List, Optional
from contextlib import asynccontextmanager

from fastapi import FastAPI, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field, field_validator
import httpx
from dotenv import load_dotenv

load_dotenv()

# ============================================================================
# CONFIGURATION
# ============================================================================

class Config:
    """Centralized configuration"""
    DATABASE_PATH = "reviews.db"
    LLM_PROVIDER = os.getenv("LLM_PROVIDER", "anthropic")  # anthropic or google
    ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY")
    GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
    MAX_REVIEW_LENGTH = 5000
    LLM_TIMEOUT = 30
    LLM_MAX_RETRIES = 2

# ============================================================================
# DATABASE LAYER
# ============================================================================

class DatabaseManager:
    """Handles all database operations with clean abstraction"""

    def __init__(self, db_path: str):
        self.db_path = db_path
        self.init_database()

    def get_connection(self):
        """Get database connection with row factory"""
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        return conn

    def init_database(self):
        """Initialize database schema"""
        conn = self.get_connection()
        cursor = conn.cursor()

        cursor.execute("""
            CREATE TABLE IF NOT EXISTS reviews (
                id TEXT PRIMARY KEY,
                rating INTEGER NOT NULL,
                review_text TEXT NOT NULL,
                ai_response TEXT NOT NULL,
                ai_summary TEXT NOT NULL,
                ai_recommended_action TEXT NOT NULL,
                created_at TEXT NOT NULL
            )
        """)

        conn.commit()
        conn.close()

    def insert_review(
        self,
        rating: int,
        review_text: str,
        ai_response: str,
        ai_summary: str,
        ai_recommended_action: str
    ) -> str:
        """Insert a new review and return its ID"""
        conn = self.get_connection()
        cursor = conn.cursor()

        review_id = str(uuid.uuid4())
        created_at = datetime.utcnow().isoformat()

        cursor.execute("""
            INSERT INTO reviews
            (id, rating, review_text, ai_response, ai_summary, ai_recommended_action, created_at)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        """, (review_id, rating, review_text, ai_response, ai_summary, ai_recommended_action, created_at))

        conn.commit()
        conn.close()

        return review_id

    def get_all_reviews(self, rating_filter: Optional[int] = None) -> List[dict]:
        """Retrieve all reviews with optional rating filter"""
        conn = self.get_connection()
        cursor = conn.cursor()

        if rating_filter:
            cursor.execute("""
                SELECT rating, review_text, ai_summary, ai_recommended_action, created_at
                FROM reviews
                WHERE rating = ?
                ORDER BY created_at DESC
            """, (rating_filter,))
        else:
            cursor.execute("""
                SELECT rating, review_text, ai_summary, ai_recommended_action, created_at
                FROM reviews
                ORDER BY created_at DESC
            """)

        rows = cursor.fetchall()
        conn.close()

        return [dict(row) for row in rows]

# ============================================================================
# LLM SERVICE LAYER
# ============================================================================

class LLMService:
    """Handles all LLM interactions with retry logic and fallbacks"""

    def __init__(self, provider: str, api_key: Optional[str]):
        self.provider = provider
        self.api_key = api_key

        if not api_key:
            raise ValueError(f"API key required for {provider}")

    async def generate_feedback(self, rating: int, review_text: str) -> dict:
        """
        Generate AI feedback with retry logic
        Returns: {ai_response, ai_summary, ai_recommended_action}
        """
        for attempt in range(Config.LLM_MAX_RETRIES):
            try:
                if self.provider == "anthropic":
                    return await self._call_anthropic(rating, review_text)
                elif self.provider == "google":
                    return await self._call_google(rating, review_text)
                else:
                    raise ValueError(f"Unsupported provider: {self.provider}")

            except httpx.TimeoutException:
                if attempt == Config.LLM_MAX_RETRIES - 1:
                    return self._get_fallback_response(rating)
                continue

            except Exception as e:
                if attempt == Config.LLM_MAX_RETRIES - 1:
                    print(f"LLM error after retries: {e}")
                    return self._get_fallback_response(rating)
                continue

        return self._get_fallback_response(rating)

    async def _call_anthropic(self, rating: int, review_text: str) -> dict:
        """Call Anthropic Claude API"""
        prompt = self._build_prompt(rating, review_text)

        async with httpx.AsyncClient(timeout=Config.LLM_TIMEOUT) as client:
            response = await client.post(
                "https://api.anthropic.com/v1/messages",
                headers={
                    "x-api-key": self.api_key,
                    "anthropic-version": "2023-06-01",
                    "content-type": "application/json"
                },
                json={
                    "model": "claude-sonnet-4-20250514",
                    "max_tokens": 1000,
                    "messages": [{
                        "role": "user",
                        "content": prompt
                    }]
                }
            )

            response.raise_for_status()
            data = response.json()

            # Extract text from response
            content = data.get("content", [])
            if content and len(content) > 0:
                llm_output = content[0].get("text", "")
                return self._parse_llm_output(llm_output)

            raise ValueError("Empty response from Claude")

    async def _call_google(self, rating: int, review_text: str) -> dict:
        """Call Google Gemini API"""
        prompt = self._build_prompt(rating, review_text)

        async with httpx.AsyncClient(timeout=Config.LLM_TIMEOUT) as client:
            response = await client.post(
                f"https://generativelanguage.googleapis.com/v1beta/models/gemini-pro:generateContent?key={self.api_key}",
                json={
                    "contents": [{
                        "parts": [{"text": prompt}]
                    }]
                }
            )

            response.raise_for_status()
            data = response.json()

            # Extract text from Gemini response
            candidates = data.get("candidates", [])
            if candidates and len(candidates) > 0:
                parts = candidates[0].get("content", {}).get("parts", [])
                if parts:
                    llm_output = parts[0].get("text", "")
                    return self._parse_llm_output(llm_output)

            raise ValueError("Empty response from Gemini")

    def _build_prompt(self, rating: int, review_text: str) -> str:
        """Build structured prompt for LLM"""
        return f"""You are an AI customer feedback analyst for an e-commerce platform.

A customer has submitted the following review:
- Rating: {rating}/5 stars
- Review: {review_text}

Generate a JSON response with exactly these three fields:

1. "user_response": A warm, professional reply to the customer (2-3 sentences)
2. "admin_summary": A concise summary for internal teams (1 sentence)
3. "recommended_action": A clear next action for the business (e.g., "Follow up within 24 hours", "No action required", "Escalate to product team")

CRITICAL: Respond ONLY with valid JSON. No markdown, no code blocks, no additional text.

Example format:
{{
  "user_response": "Thank you for your feedback...",
  "admin_summary": "Customer experienced shipping delay",
  "recommended_action": "Follow up within 24 hours"
}}"""

    def _parse_llm_output(self, llm_output: str) -> dict:
        """Parse LLM output with fallback handling"""
        try:
            # Clean potential markdown artifacts
            cleaned = llm_output.strip()
            if cleaned.startswith("```"):
                lines = cleaned.split("\n")
                cleaned = "\n".join(lines[1:-1]) if len(lines) > 2 else cleaned

            parsed = json.loads(cleaned)

            return {
                "ai_response": parsed.get("user_response", ""),
                "ai_summary": parsed.get("admin_summary", ""),
                "ai_recommended_action": parsed.get("recommended_action", "")
            }

        except json.JSONDecodeError:
            # Fallback: extract manually
            return {
                "ai_response": "Thank you for your feedback. We appreciate you taking the time to share your experience.",
                "ai_summary": f"Customer rating: {llm_output[:100]}",
                "ai_recommended_action": "Review manually"
            }

    def _get_fallback_response(self, rating: int) -> dict:
        """Generate fallback response when LLM fails"""
        if rating <= 2:
            return {
                "ai_response": "Thank you for your feedback. We take all concerns seriously and will investigate this matter promptly.",
                "ai_summary": "Low rating received - requires immediate attention",
                "ai_recommended_action": "Escalate to customer success team within 4 hours"
            }
        elif rating == 3:
            return {
                "ai_response": "Thank you for your feedback. We're always working to improve our service.",
                "ai_summary": "Neutral feedback - monitor for patterns",
                "ai_recommended_action": "Add to weekly review queue"
            }
        else:
            return {
                "ai_response": "Thank you for your positive feedback! We're delighted to hear about your experience.",
                "ai_summary": "Positive customer feedback",
                "ai_recommended_action": "No immediate action required"
            }

# ============================================================================
# PYDANTIC MODELS
# ============================================================================

class ReviewSubmission(BaseModel):
    """Request model for review submission"""
    rating: int = Field(..., ge=1, le=5, description="Rating from 1 to 5")
    review_text: str = Field(..., min_length=1, max_length=5000, description="Review text")

    @field_validator("review_text")
    @classmethod
    def validate_review_text(cls, v: str):
        if not v.strip():
            raise ValueError("Review text cannot be empty")
        return v.strip()


class ReviewSubmissionResponse(BaseModel):
    """Response model for review submission"""
    status: str
    ai_response: str

class ReviewRecord(BaseModel):
    """Model for individual review record"""
    rating: int
    review_text: str
    ai_summary: str
    ai_recommended_action: str
    created_at: str

class ReviewsResponse(BaseModel):
    """Response model for reviews list"""
    reviews: List[ReviewRecord]

class HealthResponse(BaseModel):
    """Health check response"""
    status: str
    timestamp: str

# ============================================================================
# FASTAPI APPLICATION
# ============================================================================

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Lifecycle management"""
    # Startup
    print("ðŸš€ Initializing Fynd AI Feedback System...")
    print(f"âœ“ Database: {Config.DATABASE_PATH}")
    print(f"âœ“ LLM Provider: {Config.LLM_PROVIDER}")
    yield
    # Shutdown
    print("Shutting down...")

app = FastAPI(
    title="Fynd AI Feedback System",
    description="Production-grade AI-powered customer feedback system",
    version="1.0.0",
    lifespan=lifespan
)

# CORS configuration
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Initialize services
db_manager = DatabaseManager(Config.DATABASE_PATH)
llm_service = LLMService(
    provider=Config.LLM_PROVIDER,
    api_key=Config.ANTHROPIC_API_KEY if Config.LLM_PROVIDER == "anthropic" else Config.GOOGLE_API_KEY
)

# ============================================================================
# API ENDPOINTS
# ============================================================================

@app.post("/submit-review", response_model=ReviewSubmissionResponse)
async def submit_review(submission: ReviewSubmission):
    """
    Submit a customer review and receive AI-generated feedback
    """
    try:
        # Truncate if too long
        review_text = submission.review_text
        if len(review_text) > Config.MAX_REVIEW_LENGTH:
            review_text = review_text[:Config.MAX_REVIEW_LENGTH]

        # Generate AI feedback
        feedback = await llm_service.generate_feedback(
            rating=submission.rating,
            review_text=review_text
        )

        # Store in database
        db_manager.insert_review(
            rating=submission.rating,
            review_text=review_text,
            ai_response=feedback["ai_response"],
            ai_summary=feedback["ai_summary"],
            ai_recommended_action=feedback["ai_recommended_action"]
        )

        return ReviewSubmissionResponse(
            status="success",
            ai_response=feedback["ai_response"]
        )

    except Exception as e:
        print(f"Error processing review: {e}")
        raise HTTPException(
            status_code=500,
            detail="Failed to process review. Please try again."
        )

@app.get("/get-reviews", response_model=ReviewsResponse)
async def get_reviews(rating: Optional[int] = Query(None, ge=1, le=5)):
    """
    Retrieve all reviews with optional rating filter
    """
    try:
        reviews = db_manager.get_all_reviews(rating_filter=rating)
        return ReviewsResponse(reviews=reviews)

    except Exception as e:
        print(f"Error retrieving reviews: {e}")
        raise HTTPException(
            status_code=500,
            detail="Failed to retrieve reviews"
        )

@app.get("/health", response_model=HealthResponse)
async def health_check():
    """
    Health check endpoint
    """
    return HealthResponse(
        status="healthy",
        timestamp=datetime.utcnow().isoformat()
    )

# ============================================================================
# MAIN ENTRY POINT
# ============================================================================

import asyncio
import uvicorn

config = uvicorn.Config(
    app,
    host="0.0.0.0",
    port=8000,
    log_level="info",
)

server = uvicorn.Server(config)

await server.serve()


INFO:     Started server process [1098]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)


ðŸš€ Initializing Fynd AI Feedback System...
âœ“ Database: reviews.db
âœ“ LLM Provider: google
