<a href="https://colab.research.google.com/github/Aryan-Dessai-25/QuickTest_NLP/blob/main/Test_NLP_models_with_API.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Dependencies Installation
# Remove any preinstalled torch / torchvision / torchaudio / numpy builds first
# so the versions installed below are the only ones left in the environment.
!pip uninstall -y torch torchvision torchaudio numpy

# Install PyTorch from the CUDA 11.8 wheel index.
# NOTE(review): torch/torchvision/torchaudio are not version-pinned here, unlike
# the packages below, so the resolved versions can change between runs.
!pip install --no-cache-dir --upgrade torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118

# Install the remaining packages at pinned versions (psutil and nest-asyncio
# are left unpinned).
!pip install --no-cache-dir \
  transformers==4.35.2 \
  sentence-transformers==2.3.1 \
  faiss-cpu==1.7.4 \
  fastapi==0.105.0 \
  uvicorn==0.24.0.post1 \
  python-multipart==0.0.6 \
  pyngrok==7.0.0 \
  langchain==0.0.350 \
  langchain-community==0.0.13 \
  pillow==10.0.1 \
  numpy==1.26.4 \
  psutil \
  peft==0.7.1 \
  nest-asyncio

# Restart the runtime after running this cell so the freshly installed
# packages (numpy and torch in particular) are the ones that get imported.


In [None]:
import os
from pyngrok import ngrok


# Never hard-code the ngrok token in the notebook: a saved/shared notebook would
# leak it. Read it from the NGROK_AUTH_TOKEN environment variable and fall back
# to a hidden interactive prompt when the variable is not set.
from getpass import getpass

NGROK_AUTH_TOKEN = os.environ.get("NGROK_AUTH_TOKEN", "").strip()
if not NGROK_AUTH_TOKEN:
    NGROK_AUTH_TOKEN = getpass("Enter your ngrok auth token: ").strip()

# Fail here with a clear message instead of registering an empty token and
# failing later when the tunnel is opened.
if not NGROK_AUTH_TOKEN:
    raise ValueError(
        "No ngrok auth token provided. Set the NGROK_AUTH_TOKEN environment "
        "variable or enter the token when prompted."
    )

ngrok.set_auth_token(NGROK_AUTH_TOKEN)

In [None]:

# Central registry of the Hugging Face model identifiers used by the modules
# defined in the following cells. Every "*_peft_adapter" entry defaults to None;
# set it to an adapter path to have that module wrap its base model with PEFT.
MODEL_CONFIG = dict(
    # --- Summarization ---
    summarization_model="philschmid/bart-large-cnn-samsum",
    summarization_peft_adapter=None,

    # --- Extractive QnA (RAG) ---
    embedding_model="sentence-transformers/all-MiniLM-L6-v2",
    qa_model="distilbert-base-cased-distilled-squad",
    qa_peft_adapter=None,

    # --- Abstractive QnA (RAG) ---
    # The generator is a lightweight causal LM. Alternatives that were
    # considered for "abstractive_qa_model":
    #   "distilgpt2"                 - even smaller
    #   "microsoft/DialoGPT-medium"  - larger but still T4-friendly
    abstractive_embedding_model="sentence-transformers/all-MiniLM-L6-v2",
    abstractive_qa_model="microsoft/DialoGPT-small",
    abstractive_qa_peft_adapter=None,

    # --- Sentiment classification ---
    sentiment_model="cardiffnlp/twitter-roberta-base-sentiment-latest",
    sentiment_peft_adapter=None,

    # --- Named entity recognition ---
    ner_model="dbmdz/bert-large-cased-finetuned-conll03-english",
    ner_peft_adapter=None,
)

# Echo the active configuration, one "name: value" pair per line.
print("Updated Model Configuration:")
for setting_name in MODEL_CONFIG:
    print(f"  {setting_name}: {MODEL_CONFIG[setting_name]}")

In [None]:
# Extractive Q&A + RAG System with PEFT Support
from langchain_community.vectorstores.faiss import FAISS
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.schema import Document as LangchainDocument
from transformers import AutoTokenizer, AutoModelForQuestionAnswering
from peft import PeftModel
import torch
import torch.nn.functional as F
import gc
import importlib

class RAGSystem:
    """Extractive question answering over a FAISS vector store.

    Documents are split into overlapping character chunks, embedded with the
    model named by ``MODEL_CONFIG["embedding_model"]`` and indexed in FAISS.
    A question is answered by retrieving the closest chunks and running the
    span-extraction model named by ``MODEL_CONFIG["qa_model"]`` over them.
    Models are loaded lazily; the QA model is released after every question.
    """

    def __init__(self):
        # Heavy objects are created on first use, not here.
        self.embedding_model = None
        self.tokenizer = None
        self.qa_model = None
        self.vector_store = None

        self.embedding_model_name = MODEL_CONFIG["embedding_model"]
        self.qa_model_name = MODEL_CONFIG["qa_model"]
        self.qa_peft_adapter = MODEL_CONFIG["qa_peft_adapter"]

        # Chunk sizes are measured in characters (length_function=len).
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=500,
            chunk_overlap=50,
            length_function=len
        )

        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        print("RAG system initialized - models will be loaded when needed")

    def _load_embedding_model(self):
        """Create the embedding model on first use; no-op when already loaded.

        Raises:
            RuntimeError: if constructing the embeddings raises ImportError.
        """
        if self.embedding_model is None:
            print("Loading embedding model...")
            try:
                # Pick up packages installed after the interpreter started.
                importlib.invalidate_caches()
                self.embedding_model = HuggingFaceEmbeddings(
                    model_name=self.embedding_model_name,
                    model_kwargs={"device": self.device}
                )
            except ImportError as exc:
                raise RuntimeError(
                    "sentence-transformers is not installed. "
                    "Install it using: pip install sentence-transformers"
                ) from exc
            print("Embedding model loaded")

    def _load_qa_model(self):
        """Load the QA tokenizer and model (plus optional PEFT adapter) if missing."""
        if self.qa_model is None or self.tokenizer is None:
            print("Loading QA model...")
            self.tokenizer = AutoTokenizer.from_pretrained(self.qa_model_name)
            self.qa_model = AutoModelForQuestionAnswering.from_pretrained(self.qa_model_name)

            # Load PEFT adapter if specified
            if self.qa_peft_adapter:
                print(f"Loading PEFT adapter: {self.qa_peft_adapter}")
                self.qa_model = PeftModel.from_pretrained(self.qa_model, self.qa_peft_adapter)

            self.qa_model.to(self.device)
            print("QA model loaded")

    def _unload_qa_model(self):
        """Drop the QA model and tokenizer and release cached GPU memory."""
        self.qa_model = None
        self.tokenizer = None
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

    def add_documents(self, documents):
        """Chunk, embed and index documents.

        Args:
            documents: iterable of dicts with a ``"content"`` string and a
                ``"metadata"`` value; the metadata is attached to every chunk
                produced from that document.

        Returns:
            dict with ``"status"`` (``"success"`` or ``"error"``) and a
            human-readable ``"message"``. Exceptions are reported in the
            returned dict rather than raised.
        """
        try:
            self._load_embedding_model()

            # Flatten and chunk text
            all_chunks = []
            for doc in documents:
                chunks = self.text_splitter.split_text(doc["content"])
                for chunk in chunks:
                    all_chunks.append(
                        LangchainDocument(page_content=chunk, metadata=doc["metadata"])
                    )

            if not all_chunks:
                return {"status": "error", "message": "No valid document content to index."}

            # First call builds the index; later calls extend it.
            if self.vector_store is None:
                self.vector_store = FAISS.from_documents(all_chunks, self.embedding_model)
            else:
                self.vector_store.add_documents(all_chunks)

            return {
                "status": "success",
                "message": f"Added {len(documents)} documents with {len(all_chunks)} total chunks."
            }

        except Exception as e:
            return {
                "status": "error",
                "message": f"Failed to add documents: {str(e)}"
            }

    def answer_question(self, question, top_k=3):
        """Answer a question by extracting a span from the retrieved chunks.

        Args:
            question: the question text.
            top_k: number of chunks to retrieve from the vector store.

        Returns:
            On success, a dict with ``"status": "success"``, the ``"answer"``
            string and the retrieved ``"sources"`` (content + metadata). On
            failure, ``{"status": "error", "message": ...}``. The QA model is
            always unloaded before returning.
        """
        try:
            if self.vector_store is None:
                return {"status": "error", "message": "Knowledge base is empty."}

            self._load_embedding_model()

            docs = self.vector_store.similarity_search(question, k=top_k)
            contexts = [doc.page_content for doc in docs]
            combined_context = " ".join(contexts)

            self._load_qa_model()

            inputs = self.tokenizer(
                question,
                combined_context,
                return_tensors="pt",
                max_length=512,
                truncation=True,
                padding=True
            )
            inputs = {k: v.to(self.device) for k, v in inputs.items()}

            with torch.no_grad():
                outputs = self.qa_model(**inputs)

            # Single-sequence batch, so the flat argmax is the token position.
            start_index = int(torch.argmax(outputs.start_logits))
            end_index = int(torch.argmax(outputs.end_logits)) + 1

            # Decode with skip_special_tokens so a prediction that lands on
            # [CLS]/[SEP] becomes an empty string (and triggers the fallback
            # below) instead of being returned as a literal "[CLS]" answer.
            # An inverted span (end before start) is likewise treated as empty.
            answer = ""
            if end_index > start_index:
                span_ids = inputs["input_ids"][0][start_index:end_index].tolist()
                answer = self.tokenizer.decode(span_ids, skip_special_tokens=True)

            if not answer.strip():
                answer = "I don't have enough information to answer that question."

            result = {
                "status": "success",
                "answer": answer,
                "sources": [{"content": doc.page_content, "metadata": doc.metadata} for doc in docs]
            }

            return result

        except Exception as e:
            return {"status": "error", "message": str(e)}

        finally:
            # Free the QA model after every request to keep GPU memory low.
            self._unload_qa_model()

In [None]:
# Abstractive Answering RAG System with PEFT Support
from langchain_community.vectorstores.faiss import FAISS
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.schema import Document as LangchainDocument
from transformers import AutoTokenizer, AutoModelForCausalLM, GenerationConfig
from peft import PeftModel
import torch
import torch.nn.functional as F
import gc
import importlib
import re

class AbstractiveRAGSystem:
    """Generative (abstractive) question answering over a FAISS vector store.

    Retrieval works like the extractive system: documents are chunked by
    character count, embedded and indexed in FAISS. Answers are generated by
    the causal language model named by ``MODEL_CONFIG["abstractive_qa_model"]``
    from a prompt containing the retrieved context and the question. Models
    are loaded lazily; the generator is released after every question.
    """

    def __init__(self):
        # Heavy objects are created on first use, not here.
        self.embedding_model = None
        self.tokenizer = None
        self.qa_model = None
        self.vector_store = None

        self.embedding_model_name = MODEL_CONFIG["abstractive_embedding_model"]
        self.qa_model_name = MODEL_CONFIG["abstractive_qa_model"]
        self.qa_peft_adapter = MODEL_CONFIG["abstractive_qa_peft_adapter"]

        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=400,  # Slightly smaller for better context management
            chunk_overlap=40,
            length_function=len
        )

        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Generation configuration
        # NOTE(review): for decoder-only models `min_length` appears to count
        # the prompt tokens as well, so it may never take effect here;
        # `min_new_tokens` may be what was intended - confirm before changing.
        self.generation_config = GenerationConfig(
            max_new_tokens=150,
            min_length=10,
            do_sample=True,
            temperature=0.7,
            top_p=0.9,
            top_k=50,
            repetition_penalty=1.1,
            pad_token_id=None,  # Will be set after loading tokenizer
            eos_token_id=None,  # Will be set after loading tokenizer
        )

        print("Abstractive RAG system initialized - models will be loaded when needed")

    def _load_embedding_model(self):
        """Load the embedding model for document retrieval (no-op if loaded).

        Raises:
            RuntimeError: if constructing the embeddings raises ImportError.
        """
        if self.embedding_model is None:
            print("Loading embedding model...")
            try:
                # Pick up packages installed after the interpreter started.
                importlib.invalidate_caches()
                self.embedding_model = HuggingFaceEmbeddings(
                    model_name=self.embedding_model_name,
                    model_kwargs={"device": self.device}
                )
            except ImportError as exc:
                raise RuntimeError(
                    "sentence-transformers is not installed. "
                    "Install it using: pip install sentence-transformers"
                ) from exc
            print("Embedding model loaded")

    def _load_qa_model(self):
        """Load the CausalLM model for answer generation"""
        if self.qa_model is None or self.tokenizer is None:
            print("Loading Causal LM model...")

            # Load tokenizer
            self.tokenizer = AutoTokenizer.from_pretrained(self.qa_model_name)

            # Set pad token if not exists
            if self.tokenizer.pad_token is None:
                self.tokenizer.pad_token = self.tokenizer.eos_token

            # Load model with optimizations for T4 GPU: half precision and
            # automatic device placement when CUDA is available.
            self.qa_model = AutoModelForCausalLM.from_pretrained(
                self.qa_model_name,
                torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
                device_map="auto" if torch.cuda.is_available() else None,
                low_cpu_mem_usage=True
            )

            # Load PEFT adapter if specified
            if self.qa_peft_adapter:
                print(f"Loading PEFT adapter: {self.qa_peft_adapter}")
                self.qa_model = PeftModel.from_pretrained(self.qa_model, self.qa_peft_adapter)

            # With CUDA the model was already placed by device_map="auto".
            if not torch.cuda.is_available():
                self.qa_model.to(self.device)

            # Update generation config with tokenizer info
            self.generation_config.pad_token_id = self.tokenizer.pad_token_id
            self.generation_config.eos_token_id = self.tokenizer.eos_token_id

            print("Causal LM model loaded")

    def _unload_qa_model(self):
        """Unload QA model to free memory"""
        if self.qa_model is not None:
            del self.qa_model
            self.qa_model = None
        if self.tokenizer is not None:
            del self.tokenizer
            self.tokenizer = None
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

    def add_documents(self, documents):
        """Add documents to the knowledge base.

        Args:
            documents: iterable of dicts with a ``"content"`` string and a
                ``"metadata"`` value; the metadata is attached to every chunk
                produced from that document.

        Returns:
            dict with ``"status"`` (``"success"`` or ``"error"``) and a
            ``"message"``. Exceptions are reported in the dict, not raised.
        """
        try:
            self._load_embedding_model()

            # Flatten and chunk text
            all_chunks = []
            for doc in documents:
                chunks = self.text_splitter.split_text(doc["content"])
                for chunk in chunks:
                    all_chunks.append(
                        LangchainDocument(page_content=chunk, metadata=doc["metadata"])
                    )

            if not all_chunks:
                return {"status": "error", "message": "No valid document content to index."}

            # First call builds the index; later calls extend it.
            if self.vector_store is None:
                self.vector_store = FAISS.from_documents(all_chunks, self.embedding_model)
            else:
                self.vector_store.add_documents(all_chunks)

            return {
                "status": "success",
                "message": f"Added {len(documents)} documents with {len(all_chunks)} total chunks."
            }

        except Exception as e:
            return {
                "status": "error",
                "message": f"Failed to add documents: {str(e)}"
            }

    def _create_prompt(self, question, context):
        """Create a structured prompt for the language model"""
        prompt = f"""Context: {context}

Question: {question}

Answer: Based on the provided context, """
        return prompt

    def _clean_generated_answer(self, generated_text, original_prompt):
        """Clean and extract the answer from generated text.

        Removes the prompt (if present) and leading prompt artifacts,
        collapses all whitespace runs to single spaces and keeps at most the
        first three sentences.

        Args:
            generated_text: decoded model output.
            original_prompt: the prompt that was fed to the model.

        Returns:
            The cleaned answer string (possibly empty).
        """
        # Remove the original prompt from the generated text
        if original_prompt in generated_text:
            answer = generated_text.replace(original_prompt, "").strip()
        else:
            answer = generated_text.strip()

        # Remove any remaining prompt artifacts
        answer = re.sub(r'^Answer:\s*', '', answer)
        answer = re.sub(r'^Based on the provided context,?\s*', '', answer, flags=re.IGNORECASE)

        # Collapse newlines and repeated spaces into single spaces
        answer = re.sub(r'\s+', ' ', answer).strip()

        # Keep at most three sentences. Split only where sentence punctuation
        # is followed by whitespace, so decimals such as "3.14" stay intact and
        # re-joining does not introduce doubled spaces or alter short answers.
        sentences = re.split(r'(?<=[.!?])\s+', answer)
        if len(sentences) > 3:
            answer = ' '.join(sentences[:3])

        return answer.strip()

    def answer_question(self, question, top_k=3, max_context_length=800):
        """Generate an abstractive answer to a question using retrieved context.

        Args:
            question: the question text.
            top_k: number of chunks to retrieve from the vector store.
            max_context_length: maximum number of context characters placed in
                the prompt; ``None`` disables the character limit.

        Returns:
            On success, a dict with ``"status": "success"``, the ``"answer"``,
            the retrieved ``"sources"`` and a ``"context_used"`` preview. On
            failure, ``{"status": "error", "message": ...}``. The generator is
            always unloaded before returning.
        """
        try:
            if self.vector_store is None:
                return {"status": "error", "message": "Knowledge base is empty."}

            self._load_embedding_model()

            # Retrieve relevant documents
            docs = self.vector_store.similarity_search(question, k=top_k)
            contexts = [doc.page_content for doc in docs]
            combined_context = " ".join(contexts)

            # Truncate context if too long (None means "no limit"; the request
            # model allows a null value here).
            if max_context_length is not None and len(combined_context) > max_context_length:
                combined_context = combined_context[:max_context_length] + "..."

            self._load_qa_model()

            # Create prompt
            prompt = self._create_prompt(question, combined_context)

            # Tokenize input
            inputs = self.tokenizer(
                prompt,
                return_tensors="pt",
                max_length=512,
                truncation=True,
                padding=True
            )

            # Move to device
            input_ids = inputs["input_ids"].to(self.device)
            attention_mask = inputs["attention_mask"].to(self.device)

            # Generate answer. Per-step scores are not requested because
            # nothing reads them and they would only hold extra memory.
            with torch.no_grad():
                outputs = self.qa_model.generate(
                    input_ids=input_ids,
                    attention_mask=attention_mask,
                    generation_config=self.generation_config,
                    return_dict_in_generate=True
                )

            # Decode only the newly generated tokens. The returned sequence
            # starts with the prompt tokens; removing the prompt by string
            # matching is unreliable because decoding does not always
            # reproduce the prompt text exactly (e.g. after truncation).
            prompt_length = input_ids.shape[1]
            new_token_ids = outputs.sequences[0][prompt_length:]
            generated_text = self.tokenizer.decode(new_token_ids, skip_special_tokens=True)

            # Clean the answer
            answer = self._clean_generated_answer(generated_text, prompt)

            if not answer.strip():
                answer = "I don't have enough information to provide a comprehensive answer to that question."

            result = {
                "status": "success",
                "answer": answer,
                "sources": [{"content": doc.page_content, "metadata": doc.metadata} for doc in docs],
                "context_used": combined_context[:200] + "..." if len(combined_context) > 200 else combined_context  # For debugging
            }

            return result

        except Exception as e:
            return {"status": "error", "message": f"Error generating answer: {str(e)}"}

        finally:
            # Free the generator after every request to keep GPU memory low.
            self._unload_qa_model()

    def get_vector_store_info(self):
        """Get information about the current vector store.

        Returns:
            ``{"status": "empty", ...}`` when nothing is indexed, otherwise a
            dict with ``"status": "ready"``, the index entry count (or
            ``"Unknown"``) and the embedding model name; errors are reported
            as ``{"status": "error", ...}``.
        """
        if self.vector_store is None:
            return {"status": "empty", "message": "No documents in knowledge base"}

        try:
            # Get the number of documents in the vector store
            doc_count = self.vector_store.index.ntotal if hasattr(self.vector_store, 'index') else "Unknown"
            return {
                "status": "ready",
                "document_count": doc_count,
                "embedding_model": self.embedding_model_name
            }
        except Exception as e:
            return {"status": "error", "message": f"Error getting vector store info: {str(e)}"}

    def clear_knowledge_base(self):
        """Clear the vector store and free memory"""
        if self.vector_store is not None:
            del self.vector_store
            self.vector_store = None

        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

        return {"status": "success", "message": "Knowledge base cleared"}

In [None]:
# Summarization Module with PEFT Support
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
from langchain.text_splitter import RecursiveCharacterTextSplitter
from peft import PeftModel
import torch
import gc

class TextSummarizer:
    """Chunked text summarization with a lazily loaded seq2seq model.

    The input is split into character-based chunks, each chunk is summarized
    independently with the model named by
    ``MODEL_CONFIG["summarization_model"]``, and the chunk summaries are joined
    with spaces. The model is released after every call.
    """

    def __init__(self):
        self.model_name = MODEL_CONFIG["summarization_model"]
        self.peft_adapter = MODEL_CONFIG["summarization_peft_adapter"]
        # Created on first use by _load_model().
        self.tokenizer = None
        self.model = None
        print(f"Summarizer initialized with model '{self.model_name}' - model will be loaded when needed")

    def _load_model(self):
        """Load tokenizer and model (plus optional PEFT adapter) if missing."""
        if self.tokenizer is None or self.model is None:
            print("Loading summarization model...")
            self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
            self.model = AutoModelForSeq2SeqLM.from_pretrained(self.model_name)

            # Load PEFT adapter if specified
            if self.peft_adapter:
                print(f"Loading PEFT adapter: {self.peft_adapter}")
                self.model = PeftModel.from_pretrained(self.model, self.peft_adapter)

            if torch.cuda.is_available():
                self.model = self.model.to("cuda")
            print("Model loaded")

    def _unload_model(self):
        """Drop the model and tokenizer and release cached GPU memory."""
        self.tokenizer = None
        self.model = None
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

    def _smart_chunk(self, text, chunk_size=512, chunk_overlap=50):
        """Split text into overlapping chunks, preferring natural boundaries.

        Args:
            text: the text to split.
            chunk_size: chunk size passed to the splitter.
            chunk_overlap: overlap between consecutive chunks.

        Returns:
            The list of chunk strings produced by the splitter.
        """
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            separators=["\n\n", "\n", ".", "!", "?", ",", " "],
        )
        return splitter.split_text(text)

    def summarize(self, text, max_length=150, min_length=40):
        """Summarize text chunk by chunk.

        Args:
            text: the text to summarize.
            max_length: ``max_length`` passed to ``generate`` for each chunk.
            min_length: ``min_length`` passed to ``generate`` for each chunk.

        Returns:
            On success, a dict with ``"status": "success"``, the joined
            ``"summary"`` and a ``"note"`` with the chunk count. Blank input
            and any exception yield ``{"status": "error", "message": ...}``.
        """
        # Do not load a large model just to summarize nothing.
        if not text or not text.strip():
            return {"status": "error", "message": "No text provided to summarize."}

        try:
            self._load_model()

            chunks = self._smart_chunk(text)
            summaries = []

            for chunk in chunks:
                inputs = self.tokenizer(chunk, return_tensors="pt", truncation=True, max_length=512)
                if torch.cuda.is_available():
                    inputs = {k: v.to("cuda") for k, v in inputs.items()}

                # Pass tensors by keyword: PEFT task wrappers only accept
                # keyword arguments in generate(), and supplying the attention
                # mask avoids relying on it being inferred.
                summary_ids = self.model.generate(
                    input_ids=inputs["input_ids"],
                    attention_mask=inputs["attention_mask"],
                    max_length=max_length,
                    min_length=min_length,
                    num_beams=4,
                    length_penalty=2.0,
                    early_stopping=True
                )

                summary = self.tokenizer.decode(summary_ids[0], skip_special_tokens=True)
                summaries.append(summary)

            final_summary = " ".join(summaries)

            return {
                "status": "success",
                "summary": final_summary,
                "note": f"Processed in {len(chunks)} chunk(s) using LangChain chunking"
            }

        except Exception as e:
            return {
                "status": "error",
                "message": str(e)
            }

        finally:
            # Single cleanup point for both the success and the error path.
            self._unload_model()


In [None]:
# Sentiment Classification Module
from transformers import AutoTokenizer, AutoModelForSequenceClassification, pipeline
from peft import PeftModel
import torch
import gc

class SentimentClassifier:
    """Sentiment classification backed by a lazily built transformers pipeline.

    The pipeline is created on demand from ``MODEL_CONFIG["sentiment_model"]``
    (optionally wrapped with a PEFT adapter) and torn down after every call.
    """

    def __init__(self):
        self.model_name = MODEL_CONFIG["sentiment_model"]
        self.peft_adapter = MODEL_CONFIG["sentiment_peft_adapter"]
        # All three are populated together by _load_model().
        self.tokenizer = None
        self.model = None
        self.pipeline = None
        print(f"Sentiment Classifier initialized with model '{self.model_name}' - model will be loaded when needed")

    def _load_model(self):
        """Build tokenizer, model and pipeline unless they already exist."""
        if self.tokenizer is not None and self.model is not None:
            return

        print("Loading sentiment classification model...")
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
        self.model = AutoModelForSequenceClassification.from_pretrained(self.model_name)

        # Optional PEFT adapter on top of the base model.
        if self.peft_adapter:
            print(f"Loading PEFT adapter: {self.peft_adapter}")
            self.model = PeftModel.from_pretrained(self.model, self.peft_adapter)

        # Pipeline device convention: 0 = first GPU, -1 = CPU.
        if torch.cuda.is_available():
            device = 0
        else:
            device = -1
        self.pipeline = pipeline(
            "sentiment-analysis",
            model=self.model,
            tokenizer=self.tokenizer,
            device=device
        )
        print("Sentiment model loaded")

    def _unload_model(self):
        """Release tokenizer, model and pipeline, then free cached GPU memory."""
        self.tokenizer = self.model = self.pipeline = None
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

    def classify_sentiment(self, text):
        """Classify one text.

        The text is cut to its first 512 characters before classification.

        Returns:
            On success, a dict with ``status``, the (possibly shortened)
            ``text``, the lower-cased ``sentiment`` label, the ``confidence``
            rounded to four decimals and the pipeline's ``raw_output``.
            On failure, ``{"status": "error", "message": ...}``.
        """
        try:
            self._load_model()

            # Slicing is a no-op for texts of 512 characters or fewer.
            text = text[:512]
            prediction = self.pipeline(text)
            top = prediction[0]

            return {
                "status": "success",
                "text": text,
                "sentiment": top["label"].lower(),
                "confidence": round(top["score"], 4),
                "raw_output": prediction
            }
        except Exception as e:
            return {"status": "error", "message": str(e)}
        finally:
            self._unload_model()

    def batch_classify_sentiment(self, texts):
        """Classify several texts with one pipeline call.

        Each text is cut to its first 512 characters before classification.

        Returns:
            On success, a dict with ``status``, a ``results`` list (one entry
            per text with ``text``, ``sentiment`` and ``confidence``) and the
            ``count``. On failure, ``{"status": "error", "message": ...}``.
        """
        try:
            self._load_model()

            clipped = [entry[:512] for entry in texts]
            predictions = self.pipeline(clipped)

            batch_results = [
                {
                    "text": clipped[position],
                    "sentiment": prediction["label"].lower(),
                    "confidence": round(prediction["score"], 4)
                }
                for position, prediction in enumerate(predictions)
            ]

            return {
                "status": "success",
                "results": batch_results,
                "count": len(batch_results)
            }
        except Exception as e:
            return {"status": "error", "message": str(e)}
        finally:
            self._unload_model()

In [None]:
# Named Entity Recognition Module
from transformers import AutoTokenizer, AutoModelForTokenClassification, pipeline
from peft import PeftModel
import torch
import gc

class NamedEntityRecognizer:
    """Named entity recognition backed by a lazily built transformers pipeline.

    The pipeline is created on demand from ``MODEL_CONFIG["ner_model"]``
    (optionally wrapped with a PEFT adapter) with
    ``aggregation_strategy="simple"`` and torn down after every call.
    """

    def __init__(self):
        self.model_name = MODEL_CONFIG["ner_model"]
        self.peft_adapter = MODEL_CONFIG["ner_peft_adapter"]
        # All three are populated together by _load_model().
        self.tokenizer = None
        self.model = None
        self.pipeline = None
        print(f"NER initialized with model '{self.model_name}' - model will be loaded when needed")

    def _load_model(self):
        """Build tokenizer, model and pipeline unless they already exist."""
        if self.tokenizer is None or self.model is None:
            print("Loading NER model...")
            self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
            self.model = AutoModelForTokenClassification.from_pretrained(self.model_name)

            # Load PEFT adapter if specified
            if self.peft_adapter:
                print(f"Loading PEFT adapter: {self.peft_adapter}")
                self.model = PeftModel.from_pretrained(self.model, self.peft_adapter)

            # Pipeline device convention: 0 = first GPU, -1 = CPU.
            device = 0 if torch.cuda.is_available() else -1
            self.pipeline = pipeline(
                "ner",
                model=self.model,
                tokenizer=self.tokenizer,
                device=device,
                aggregation_strategy="simple"
            )
            print("NER model loaded")

    def _unload_model(self):
        """Release tokenizer, model and pipeline, then free cached GPU memory."""
        self.tokenizer = None
        self.model = None
        self.pipeline = None
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

    def _format_entities(self, entities):
        """Convert raw pipeline entities into plain, JSON-serialisable dicts.

        The score is converted with ``float()`` before rounding because the
        pipeline may return a numpy scalar, which the API layer cannot
        serialise to JSON.
        """
        return [
            {
                "text": entity["word"],
                "label": entity["entity_group"],
                "confidence": round(float(entity["score"]), 4),
                "start": entity["start"],
                "end": entity["end"]
            }
            for entity in entities
        ]

    def extract_entities(self, text):
        """Extract entities from one text.

        The text is cut to its first 512 characters before extraction.

        Returns:
            On success, a dict with ``status``, the (possibly shortened)
            ``text``, the ``entities`` list, the same entities grouped under
            ``entities_by_type`` and the ``entity_count``. On failure,
            ``{"status": "error", "message": ...}``.
        """
        try:
            self._load_model()

            # Handle text length
            if len(text) > 512:
                text = text[:512]

            entities = self.pipeline(text)

            # Process and clean entities
            processed_entities = self._format_entities(entities)

            # Group entities by type
            entities_by_type = {}
            for entity in processed_entities:
                entities_by_type.setdefault(entity["label"], []).append(entity)

            return {
                "status": "success",
                "text": text,
                "entities": processed_entities,
                "entities_by_type": entities_by_type,
                "entity_count": len(processed_entities)
            }

        except Exception as e:
            return {
                "status": "error",
                "message": str(e)
            }
        finally:
            self._unload_model()

    def batch_extract_entities(self, texts):
        """Extract entities from several texts, one pipeline call per text.

        Each text is cut to its first 512 characters before extraction.

        Returns:
            On success, a dict with ``status``, a ``results`` list (one entry
            per text with ``text``, ``entities`` and ``entity_count``) and
            ``total_texts``. On failure, ``{"status": "error", "message": ...}``.
        """
        try:
            self._load_model()

            batch_results = []
            for text in texts:
                # Handle text length
                if len(text) > 512:
                    text = text[:512]

                # Same formatting as extract_entities, so scores are plain
                # Python floats here as well.
                processed_entities = self._format_entities(self.pipeline(text))

                batch_results.append({
                    "text": text,
                    "entities": processed_entities,
                    "entity_count": len(processed_entities)
                })

            return {
                "status": "success",
                "results": batch_results,
                "total_texts": len(batch_results)
            }

        except Exception as e:
            return {
                "status": "error",
                "message": str(e)
            }
        finally:
            self._unload_model()


In [None]:
# API Creation
from fastapi import FastAPI, File, UploadFile, Form, HTTPException, Body
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import List, Dict, Any, Optional
import uvicorn
import json
import gc
import torch

app = FastAPI(title="NLP Performance Testing API with Abstractive RAG")

# Enable CORS for every origin. Credentials are disabled on purpose: a
# wildcard origin must not be combined with allow_credentials=True (the CORS
# rules forbid it, and it would let any website send credentialed requests).
# This API uses no cookies or authentication, so nothing depends on it.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Module singletons used by the endpoints. They are placeholders here and are
# replaced with real instances by the startup hook.
summarizer = None
rag_system = None
abstractive_rag_system = None
sentiment_classifier = None
ner_system = None

@app.on_event("startup")
async def startup_event():
    """Create the module singletons when the API starts.

    As a safety net, sentence-transformers is installed into the server's
    interpreter if (and only if) it cannot be found. The install is pinned to
    the version used by the notebook's dependency cell so a fallback install
    cannot pull in an incompatible release, and a failed install is reported
    instead of being silently ignored. No models are loaded here; each module
    loads its model on first use.
    """
    import importlib.util
    import subprocess
    import sys

    global summarizer, rag_system, abstractive_rag_system, sentiment_classifier, ner_system

    # Skip the (slow, blocking) pip call when the package is already present.
    if importlib.util.find_spec("sentence_transformers") is None:
        exit_code = subprocess.call(
            [sys.executable, "-m", "pip", "install", "sentence-transformers==2.3.1"]
        )
        if exit_code != 0:
            print(f"Warning: installing sentence-transformers failed (pip exit code {exit_code})")
        else:
            # Make the freshly installed package visible to the import system.
            importlib.invalidate_caches()

    summarizer = TextSummarizer()
    rag_system = RAGSystem()
    abstractive_rag_system = AbstractiveRAGSystem()
    sentiment_classifier = SentimentClassifier()
    ner_system = NamedEntityRecognizer()
    print("API initialized - models will be loaded on demand")

# Define request models
# One document to index. The add_documents endpoints convert it to a plain
# dict, and the RAG systems read its "content" and "metadata" keys.
class Document(BaseModel):
    content: str  # raw text that gets chunked and embedded
    metadata: Dict[str, Any]  # attached unchanged to every chunk of this document

# Body of both answer endpoints.
class QuestionRequest(BaseModel):
    question: str
    top_k: Optional[int] = 3  # number of chunks to retrieve
    # Character limit for the prompt context; only the abstractive endpoint
    # forwards this value.
    max_context_length: Optional[int] = 800

# Body of the /summarize endpoint; both lengths are forwarded to
# TextSummarizer.summarize.
class SummarizeRequest(BaseModel):
    text: str
    max_length: Optional[int] = 150
    min_length: Optional[int] = 40

# Body of /sentiment/classify.
class SentimentRequest(BaseModel):
    text: str

# Body of /sentiment/batch_classify.
class BatchSentimentRequest(BaseModel):
    texts: List[str]

# Body of /ner/extract.
class NERRequest(BaseModel):
    text: str

# Body of /ner/batch_extract.
class BatchNERRequest(BaseModel):
    texts: List[str]

# Define API endpoints

# Extractive RAG endpoints
@app.post("/rag/add_documents")
async def add_documents(documents: List[Document]):
    """Add documents to the extractive RAG knowledge base"""
    # Reclaim Python garbage and cached CUDA memory before doing new work.
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    # RAGSystem.add_documents indexes doc["content"] / doc["metadata"], so the
    # pydantic models are converted to plain dicts first.
    plain_docs = [doc.dict() for doc in documents]
    # rag_system is assigned by the startup hook; the returned status dict is
    # sent back as the JSON response.
    result = rag_system.add_documents(plain_docs)
    return result

@app.post("/rag/answer")
async def answer_question(request: QuestionRequest):
    """Answer questions using extractive RAG (span extraction)"""
    # Reclaim Python garbage and cached CUDA memory before loading the QA model.
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    # request.max_context_length is not used by the extractive system.
    # rag_system is assigned by the startup hook; its status dict is returned
    # as the JSON response.
    result = rag_system.answer_question(request.question, top_k=request.top_k)
    return result

# Abstractive RAG endpoints
@app.post("/abstractive_rag/add_documents")
async def add_documents_abstractive(documents: List[Document]):
    """Add documents to the abstractive RAG knowledge base"""
    # Reclaim Python garbage and cached CUDA memory before doing new work.
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    # AbstractiveRAGSystem.add_documents indexes doc["content"] /
    # doc["metadata"], so the pydantic models are converted to plain dicts.
    plain_docs = [doc.dict() for doc in documents]
    # This knowledge base is a separate object from the extractive one
    # (rag_system); documents added here are not added there.
    result = abstractive_rag_system.add_documents(plain_docs)
    return result

@app.post("/abstractive_rag/answer")
async def answer_question_abstractive(request: QuestionRequest):
    """Answer questions using abstractive RAG (generative answers)"""
    # Reclaim Python garbage and cached CUDA memory before loading the generator.
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    # All three request fields are forwarded; the returned status dict is sent
    # back as the JSON response.
    result = abstractive_rag_system.answer_question(
        request.question,
        top_k=request.top_k,
        max_context_length=request.max_context_length
    )
    return result

# Summarization endpoint
@app.post("/summarize")
async def summarize_text(request: SummarizeRequest):
    """Summarize the submitted text with the summarization model.

    ``max_length`` and ``min_length`` from the request body are forwarded
    to the summarizer unchanged.
    """
    # Reclaim memory before inference: Python garbage first, then PyTorch's
    # cached GPU memory when CUDA is present.
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    summary = summarizer.summarize(
        request.text,
        max_length=request.max_length,
        min_length=request.min_length,
    )
    return summary

# Sentiment Analysis endpoints
@app.post("/sentiment/classify")
async def classify_sentiment(request: SentimentRequest):
    """Classify the sentiment of a single text."""
    # Reclaim memory before inference: Python garbage first, then PyTorch's
    # cached GPU memory when CUDA is present.
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    return sentiment_classifier.classify_sentiment(request.text)

@app.post("/sentiment/batch_classify")
async def batch_classify_sentiment(request: BatchSentimentRequest):
    """Classify the sentiment of several texts in one call."""
    # Reclaim memory before inference: Python garbage first, then PyTorch's
    # cached GPU memory when CUDA is present.
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    # The whole list goes to the classifier's batch method in one call.
    return sentiment_classifier.batch_classify_sentiment(request.texts)

# Named Entity Recognition endpoints
@app.post("/ner/extract")
async def extract_entities(request: NERRequest):
    """Extract named entities from a single text."""
    # Reclaim memory before inference: Python garbage first, then PyTorch's
    # cached GPU memory when CUDA is present.
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    return ner_system.extract_entities(request.text)

@app.post("/ner/batch_extract")
async def batch_extract_entities(request: BatchNERRequest):
    """Extract named entities from several texts in one call."""
    # Reclaim memory before inference: Python garbage first, then PyTorch's
    # cached GPU memory when CUDA is present.
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    # The whole list goes to the NER system's batch method in one call.
    return ner_system.batch_extract_entities(request.texts)

# Health check endpoint
@app.get("/health")
async def health_check():
    """Report service status, component availability and the endpoint map."""
    # A component is reported as available when its module-level handle is not
    # None; this does not check whether the underlying model has been loaded.
    models_available = {
        "summarizer": summarizer is not None,
        "rag_system": rag_system is not None,
        "abstractive_rag_system": abstractive_rag_system is not None,
        "sentiment_classifier": sentiment_classifier is not None,
        "ner_system": ner_system is not None,
    }

    # Static listing of the routes, grouped by feature.
    endpoints = {
        "extractive_rag": ["/rag/add_documents", "/rag/answer"],
        "abstractive_rag": ["/abstractive_rag/add_documents", "/abstractive_rag/answer"],
        "summarization": ["/summarize"],
        "sentiment": ["/sentiment/classify", "/sentiment/batch_classify"],
        "ner": ["/ner/extract", "/ner/batch_extract"],
        "health": ["/health"],
    }

    return {
        "status": "healthy",
        "models_available": models_available,
        "endpoints": endpoints,
    }

# Additional utility endpoints
@app.get("/models/info")
async def get_model_info():
    """Return the configured model names and the compute device in use.

    ``gpu_memory`` is the total memory PyTorch reports for CUDA device 0,
    or null when CUDA is not available.
    """
    has_cuda = torch.cuda.is_available()
    if has_cuda:
        device = "cuda"
        # Only device index 0 is inspected.
        gpu_memory = torch.cuda.get_device_properties(0).total_memory
    else:
        device = "cpu"
        gpu_memory = None

    return {
        "model_config": MODEL_CONFIG,
        "device": device,
        "cuda_available": has_cuda,
        "gpu_memory": gpu_memory,
    }

@app.post("/models/clear_cache")
async def clear_model_cache():
    """Run garbage collection and, when CUDA is available, clear the GPU cache.

    ``gpu_memory_allocated`` is the value PyTorch reports after the cache
    has been cleared, or null when no GPU is available.
    """
    gc.collect()

    # CPU-only runtime: nothing beyond garbage collection to do.
    if not torch.cuda.is_available():
        return {
            "status": "success",
            "message": "Garbage collection run (no GPU available)",
            "gpu_memory_allocated": None,
        }

    torch.cuda.empty_cache()
    return {
        "status": "success",
        "message": "GPU cache cleared and garbage collection run",
        # Read after empty_cache(); CUDA is known to be available here.
        "gpu_memory_allocated": torch.cuda.memory_allocated(),
    }



In [None]:
# Cell 8: Deployment — open an ngrok tunnel and start the FastAPI server
from pyngrok import ngrok
import nest_asyncio
import uvicorn

# Port shared by the ngrok tunnel and the uvicorn server below
API_PORT = 8000

# Create a public URL.
# ngrok.connect() returns an NgrokTunnel object, not a string. Formatting the
# object itself prints its full description (public and local address), which
# is not a usable link, so the URL is read from its .public_url attribute.
public_url = ngrok.connect(API_PORT)
print(f"Public URL: {public_url.public_url}/docs")

# Apply nest_asyncio to allow nested event loops
nest_asyncio.apply()

# Start the FastAPI server (this call blocks the cell while the server runs)
uvicorn.run(app, host="0.0.0.0", port=API_PORT)
