In [None]:
import os
import re
import json
import time
import yaml
import string
import logging
import hashlib
import unicodedata
from pathlib import Path
from collections import defaultdict
from typing import List, Dict, Tuple

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.linalg import eigh
from sklearn.metrics import roc_auc_score

from groq import Groq

import torch
import nltk
import spacy
from textblob import TextBlob
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer, PorterStemmer
from nltk.tokenize import word_tokenize, sent_tokenize
from transformers import AutoTokenizer, AutoModelForSequenceClassification

import PyPDF2
import docx
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from sentence_transformers import SentenceTransformer

import chromadb
from chromadb.config import Settings

import warnings
warnings.filterwarnings('ignore')

In [None]:
# Download required NLTK data
# Each pair is (path NLTK looks up locally, package name to download when it is missing).
for resource_path, package_name in (
    ('tokenizers/punkt', 'punkt'),
    ('corpora/stopwords', 'stopwords'),
    ('corpora/wordnet', 'wordnet'),
    ('taggers/averaged_perceptron_tagger', 'averaged_perceptron_tagger'),
):
    try:
        nltk.data.find(resource_path)
    except LookupError:
        # Only hit the network when the resource is not already installed.
        nltk.download(package_name)

In [3]:
# Configure logging: INFO and above go to logs.log with a timestamp and level prefix.
logging.basicConfig(
    filename='logs.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)

In [4]:
# Load the notebook settings from config.yaml in the current working directory.
params_path = os.path.join(os.getcwd(), 'config.yaml')
config = None
with open(params_path) as config_file:
    # safe_load only builds plain Python objects from the YAML document.
    config = yaml.safe_load(config_file)

In [None]:
# Configuration
DOCUMENTS_FOLDER = "./test"            # folder holding the documents to index
GROQ_API_KEY = config['api']['groq']   # Groq API key, read from the loaded YAML config
CHROMA_DB_PATH = "./database"          # directory where ChromaDB persists its data

# Create documents folder if it doesn't exist
Path(DOCUMENTS_FOLDER).mkdir(parents=True, exist_ok=True)

In [3]:
class DocumentHandler(FileSystemEventHandler):
    """File system event handler that keeps a RAG index in sync with a folder.

    Created and modified files with a supported extension are (re)processed
    through ``rag_system.process_document``; deleted files are removed through
    ``rag_system.delete_document``. Directory events and unsupported
    extensions are ignored.

    Args:
        rag_system: Object exposing ``supported_extensions``,
            ``process_document(path)`` and ``delete_document(path)``.
        settle_seconds: How long to wait after a create/modify event before
            reading the file, to give the writer time to finish. Defaults to
            the previously hard-coded 2 seconds.
    """

    def __init__(self, rag_system, settle_seconds: float = 2):
        self.rag_system = rag_system
        self.settle_seconds = settle_seconds

    def _supported_path(self, event):
        """Return the event's path as a ``Path`` if it is a supported file, else ``None``."""
        if event.is_directory:
            return None
        filepath = Path(event.src_path)
        if filepath.suffix.lower() in self.rag_system.supported_extensions:
            return filepath
        return None

    def _ingest(self, filepath):
        """Wait for the file to settle, then (re)index it without letting errors escape."""
        # Add a small delay to ensure file is fully written
        time.sleep(self.settle_seconds)
        try:
            self.rag_system.process_document(filepath)
        except Exception as e:
            # process_document can raise (e.g. the file vanished before it was
            # hashed, or the database write failed). Log it here so the
            # failure does not propagate out of the event callback.
            logging.error(f"Error processing {filepath}: {e}")

    def on_created(self, event):
        filepath = self._supported_path(event)
        if filepath is not None:
            logging.info(f"New document detected: {filepath.name}")
            self._ingest(filepath)

    def on_modified(self, event):
        filepath = self._supported_path(event)
        if filepath is not None:
            logging.info(f"Document modified: {filepath.name}")
            self._ingest(filepath)

    def on_deleted(self, event):
        filepath = self._supported_path(event)
        if filepath is not None:
            logging.info(f"Document deleted: {filepath.name}")
            self.rag_system.delete_document(filepath)

In [4]:
class RAGSystem:
    def __init__(self, 
                 documents_folder: str,
                 groq_api_key: str,
                 db_path: str = "./chroma_db",
                 collection_name: str = "document_chunks",
                 groq_model: str = "llama3-8b-8192",
                 chunk_size: int = 512,
                 chunk_overlap: int = 50):
        """
        Initialize the RAG system with ChromaDB
        
        Args:
            documents_folder: Path to folder containing documents
            groq_api_key: Groq API key
            db_path: Path to ChromaDB database directory
            collection_name: Name of the ChromaDB collection
            chunk_size: Size of text chunks
            chunk_overlap: Overlap between chunks
        """
        self.documents_folder = Path(documents_folder)
        self.groq_client = Groq(api_key=groq_api_key)
        self.db_path = db_path
        self.collection_name = collection_name
        self.groq_model = groq_model
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        
        # Initialize embedding model
        self.embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
        
        # Initialize ChromaDB
        self.init_chromadb()
        
        # File monitoring
        self.observer = None
        self.event_handler = DocumentHandler(self)
        
        # Supported file extensions
        self.supported_extensions = {'.txt', '.pdf', '.docx', '.md'}
        
        # Document tracking (to check if files have been modified)
        self.document_hashes = self.load_document_hashes()
        
        logging.info("RAG System initialized with ChromaDB")

    def init_chromadb(self):
        """Open the persistent ChromaDB client and get or create the collection.

        Sets ``self.chroma_client`` and ``self.collection``. Any failure is
        logged and re-raised.
        """
        try:
            # Persistent storage rooted at self.db_path
            client_settings = Settings(anonymized_telemetry=False, allow_reset=True)
            self.chroma_client = chromadb.PersistentClient(
                path=self.db_path,
                settings=client_settings,
            )

            # Reuse the collection if it already exists, otherwise create it
            collection_metadata = {"description": "Document chunks for RAG system"}
            self.collection = self.chroma_client.get_or_create_collection(
                name=self.collection_name,
                metadata=collection_metadata,
            )

            logging.info(f"ChromaDB initialized with collection: {self.collection_name}")
        except Exception as e:
            logging.error(f"Error initializing ChromaDB: {e}")
            raise

    def load_document_hashes(self) -> Dict[str, str]:
        """Load document hashes from metadata file"""
        hash_file = Path(self.db_path) / "document_hashes.json"
        if hash_file.exists():
            try:
                with open(hash_file, 'r') as f:
                    return json.load(f)
            except Exception as e:
                logging.warning(f"Error loading document hashes: {e}")
        return {}

    def save_document_hashes(self):
        """Save document hashes to metadata file"""
        hash_file = Path(self.db_path) / "document_hashes.json"
        os.makedirs(Path(self.db_path), exist_ok=True)
        try:
            with open(hash_file, 'w') as f:
                json.dump(self.document_hashes, f)
        except Exception as e:
            logging.error(f"Error saving document hashes: {e}")

    def get_file_hash(self, filepath: Path) -> str:
        """Generate MD5 hash of file content"""
        hash_md5 = hashlib.md5()
        with open(filepath, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                hash_md5.update(chunk)
        return hash_md5.hexdigest()

    def extract_text_from_file(self, filepath: Path) -> str:
        """Extract text from various file formats"""
        try:
            if filepath.suffix.lower() == '.pdf':
                return self._extract_pdf_text(filepath)
            elif filepath.suffix.lower() == '.docx':
                return self._extract_docx_text(filepath)
            elif filepath.suffix.lower() in ['.txt', '.md']:
                with open(filepath, 'r', encoding='utf-8') as file:
                    return file.read()
            else:
                logging.warning(f"Unsupported file format: {filepath.suffix}")
                return ""
        except Exception as e:
            logging.error(f"Error extracting text from {filepath}: {e}")
            return ""

    def _extract_pdf_text(self, filepath: Path) -> str:
        """Return the text of every PDF page, each followed by a newline.

        If reading fails part-way, the error is logged and the text gathered
        from the pages before the failure is returned.
        """
        page_texts = []
        try:
            with open(filepath, 'rb') as file:
                for page in PyPDF2.PdfReader(file).pages:
                    page_texts.append(page.extract_text() + "\n")
        except Exception as e:
            logging.error(f"Error reading PDF {filepath}: {e}")
        return "".join(page_texts)

    def _extract_docx_text(self, filepath: Path) -> str:
        """Return the text of every DOCX paragraph, each followed by a newline.

        Any error is logged and results in an empty string.
        """
        try:
            document = docx.Document(filepath)
            return "".join(paragraph.text + "\n" for paragraph in document.paragraphs)
        except Exception as e:
            logging.error(f"Error reading DOCX {filepath}: {e}")
            return ""

    def create_chunks(self, text: str) -> List[str]:
        """Split text into overlapping chunks"""
        if not text.strip():
            return []
        
        words = text.split()
        chunks = []
        
        for i in range(0, len(words), self.chunk_size - self.chunk_overlap):
            chunk_words = words[i:i + self.chunk_size]
            chunk_text = ' '.join(chunk_words)
            if chunk_text.strip():  # Only add non-empty chunks
                chunks.append(chunk_text)
            
        return chunks

    def create_embeddings(self, chunks: List[str]) -> List[List[float]]:
        """Create embeddings for text chunks"""
        if not chunks:
            return []
        
        try:
            embeddings = self.embedding_model.encode(chunks)
            # Convert numpy arrays to lists for ChromaDB compatibility
            return [embedding.tolist() for embedding in embeddings]
        except Exception as e:
            logging.error(f"Error creating embeddings: {e}")
            return []

    def is_document_updated(self, filepath: Path) -> bool:
        """Check if document has been updated since last processing"""
        current_hash = self.get_file_hash(filepath)
        filename = str(filepath)
        
        # Check if file hash has changed
        if filename not in self.document_hashes:
            return True
        
        return self.document_hashes[filename] != current_hash

    def remove_document_from_db(self, filepath: Path):
        """Remove document chunks from ChromaDB"""
        try:
            filename = str(filepath)
            
            # Get all documents with this filename
            results = self.collection.get(
                where={"filename": filename}
            )
            
            if results['ids']:
                # Delete all chunks for this document
                self.collection.delete(ids=results['ids'])
                logging.info(f"Removed {len(results['ids'])} chunks for {filepath.name}")
                
        except Exception as e:
            logging.error(f"Error removing document from ChromaDB: {e}")

    def process_document(self, filepath: Path):
        """Process a single document: extract text, create chunks and embeddings"""
        if filepath.suffix.lower() not in self.supported_extensions:
            logging.warning(f"Skipping unsupported file: {filepath}")
            return
        
        if not self.is_document_updated(filepath):
            logging.info(f"Document {filepath.name} already up to date")
            return
        
        logging.info(f"Processing document: {filepath.name}")
        
        # Remove existing chunks for this document
        self.remove_document_from_db(filepath)
        
        # Extract text
        text = self.extract_text_from_file(filepath)
        if not text.strip():
            logging.warning(f"No text extracted from {filepath}")
            return
        
        # Create chunks
        chunks = self.create_chunks(text)
        if not chunks:
            logging.warning(f"No chunks created from {filepath}")
            return
        
        # Create embeddings
        embeddings = self.create_embeddings(chunks)
        if not embeddings:
            logging.error(f"Failed to create embeddings for {filepath}")
            return
        
        # Save to ChromaDB
        self.save_document_to_db(filepath, chunks, embeddings)
        
        # Update document hash
        self.document_hashes[str(filepath)] = self.get_file_hash(filepath)
        self.save_document_hashes()
        
        logging.info(f"Successfully processed {filepath.name} with {len(chunks)} chunks")

    def save_document_to_db(self, filepath: Path, chunks: List[str], embeddings: List[List[float]]):
        """Save document chunks and embeddings to ChromaDB"""
        try:
            filename = str(filepath)
            file_hash = self.get_file_hash(filepath)
            last_modified = filepath.stat().st_mtime
            
            # Prepare data for ChromaDB
            ids = []
            documents = []
            metadatas = []
            chunk_embeddings = []
            
            for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
                chunk_id = f"{filename}_{file_hash}_{i}"
                
                ids.append(chunk_id)
                documents.append(chunk)
                metadatas.append({
                    "filename": filename,
                    "file_hash": file_hash,
                    "last_modified": last_modified,
                    "chunk_index": i,
                    "file_extension": filepath.suffix.lower()
                })
                chunk_embeddings.append(embedding)
            
            # Add to ChromaDB collection
            self.collection.add(
                ids=ids,
                documents=documents,
                embeddings=chunk_embeddings,
                metadatas=metadatas
            )
            
            logging.info(f"Saved {len(chunks)} chunks to ChromaDB for {filepath.name}")
            
        except Exception as e:
            logging.error(f"Error saving document to ChromaDB: {e}")
            raise

    def scan_documents_folder(self):
        """Scan the documents folder and process all files"""
        if not self.documents_folder.exists():
            logging.error(f"Documents folder does not exist: {self.documents_folder}")
            return
        
        logging.info(f"Scanning documents folder: {self.documents_folder}")
        
        for filepath in self.documents_folder.rglob('*'):
            if filepath.is_file() and filepath.suffix.lower() in self.supported_extensions:
                self.process_document(filepath)

    def search_similar_chunks(self, query: str, top_k: int = 5) -> List[Tuple[str, float, Dict]]:
        """Search for similar chunks based on query using ChromaDB"""
        try:
            # Create query embedding
            query_embedding = self.embedding_model.encode([query])[0].tolist()
            
            # Search in ChromaDB
            results = self.collection.query(
                query_embeddings=[query_embedding],
                n_results=min(top_k, self.get_total_chunks()),
                include=["documents", "metadatas", "distances"]
            )
            
            if not results['documents'][0]:
                logging.warning("No chunks found in ChromaDB")
                return []
            
            # Convert distances to similarities (ChromaDB returns squared euclidean distances)
            # Similarity = 1 / (1 + distance)
            similarities = []
            for doc, metadata, distance in zip(results['documents'][0], 
                                             results['metadatas'][0], 
                                             results['distances'][0]):
                similarity = 1 / (1 + distance)
                similarities.append((doc, similarity, metadata))
            
            return similarities
            
        except Exception as e:
            logging.error(f"Error searching similar chunks: {e}")
            return []

    def generate_answer(self, query: str, context_chunks: List[str]) -> str:
        """Generate answer using Groq API with retrieved context"""
        try:
            # Prepare context
            context = "\n\n".join(context_chunks)
            
            # Create prompt
            prompt = f"""Answer the question based only on the following context. If the answer cannot be found in the context, say so clearly.

Context:
{context}

Question: Answer the question based on the above context: {query}

Answer:"""
            
            # Call Groq API
            response = self.groq_client.chat.completions.create(
                model=self.groq_model,  # You can change this to other Groq models like "mixtral-8x7b-32768"
                messages=[
                    {"role": "user", "content": prompt}
                ],
                max_tokens=1024,
                temperature=0.1
            )
            
            return response.choices[0].message.content.strip()
            
        except Exception as e:
            logging.error(f"Error generating answer: {e}")
            return f"Error generating answer: {str(e)}"

    def answer_question(self, query: str, top_k: int = 5) -> Dict[str, any]:
        """Answer a question using RAG with ChromaDB"""
        logging.info(f"Answering question: {query}")
        
        # Search for similar chunks
        similar_chunks = self.search_similar_chunks(query, top_k)
        
        if not similar_chunks:
            return {
                "answer": "I couldn't find any relevant information in the documents to answer your question.",
                "sources": [],
                "confidence": 0.0,
                "source_files": []
            }
        
        # Extract context chunks and calculate average confidence
        context_chunks = [chunk[0] for chunk in similar_chunks]
        confidences = [chunk[1] for chunk in similar_chunks]
        metadatas = [chunk[2] for chunk in similar_chunks]
        avg_confidence = sum(confidences) / len(confidences)
        
        # Extract unique source files
        source_files = list(set([meta['filename'] for meta in metadatas]))
        
        # Generate answer
        answer = self.generate_answer(query, context_chunks)
        
        return {
            "answer": answer,
            "sources": [(chunk[0], chunk[1]) for chunk in similar_chunks],
            "confidence": avg_confidence,
            "source_files": source_files,
            "metadata": metadatas
        }

    def start_monitoring(self):
        """Start monitoring the documents folder for changes"""
        if not self.documents_folder.exists():
            logging.error(f"Cannot monitor non-existent folder: {self.documents_folder}")
            return
        
        logging.info(f"Starting to monitor folder: {self.documents_folder}")
        
        self.observer = Observer()
        self.observer.schedule(self.event_handler, str(self.documents_folder), recursive=True)
        self.observer.start()
        
        logging.info("File monitoring started")

    def stop_monitoring(self):
        """Stop monitoring the documents folder"""
        if self.observer and self.observer.is_alive():
            self.observer.stop()
            self.observer.join()
            logging.info("File monitoring stopped")

    def get_total_chunks(self) -> int:
        """Get total number of chunks in the database"""
        try:
            count = self.collection.count()
            return count
        except Exception as e:
            logging.error(f"Error getting chunk count: {e}")
            return 0

    def get_database_stats(self) -> Dict[str, any]:
        """Get statistics about the ChromaDB database"""
        try:
            total_chunks = self.get_total_chunks()
            
            # Get unique documents
            if total_chunks > 0:
                results = self.collection.get(include=["metadatas"])
                unique_files = set()
                file_types = {}
                
                for metadata in results['metadatas']:
                    filename = metadata['filename']
                    file_ext = metadata.get('file_extension', 'unknown')
                    
                    unique_files.add(filename)
                    file_types[file_ext] = file_types.get(file_ext, 0) + 1
                
                return {
                    "total_chunks": total_chunks,
                    "unique_documents": len(unique_files),
                    "file_types": file_types,
                    "documents": list(unique_files)
                }
            else:
                return {
                    "total_chunks": 0,
                    "unique_documents": 0,
                    "file_types": {},
                    "documents": []
                }
                
        except Exception as e:
            logging.error(f"Error getting database stats: {e}")
            return {"error": str(e)}

    def search_documents_by_filename(self, filename_pattern: str) -> List[str]:
        """Search for documents by filename pattern"""
        try:
            results = self.collection.get(
                where={"filename": {"$contains": filename_pattern}},
                include=["metadatas"]
            )
            
            unique_files = set()
            for metadata in results['metadatas']:
                unique_files.add(metadata['filename'])
            
            return list(unique_files)
            
        except Exception as e:
            logging.error(f"Error searching documents: {e}")
            return []

    def delete_document(self, filepath: Path):
        """Delete a document and all its chunks from ChromaDB"""
        try:
            filename = str(filepath)
            
            # Get all chunks for this document
            results = self.collection.get(
                where={"filename": filename}
            )
            
            if results['ids']:
                # Delete all chunks for this document
                self.collection.delete(ids=results['ids'])
                logging.info(f"Deleted {len(results['ids'])} chunks for {filepath.name}")
                
                # Remove from document hashes
                if filename in self.document_hashes:
                    del self.document_hashes[filename]
                    self.save_document_hashes()
                    
        except Exception as e:
            logging.error(f"Error deleting document: {e}")

    def reset_database(self):
        """Reset the entire ChromaDB database"""
        try:
            self.chroma_client.delete_collection(self.collection_name)
            self.collection = self.chroma_client.create_collection(
                name=self.collection_name,
                metadata={"description": "Document chunks for RAG system"}
            )
            self.document_hashes = {}
            self.save_document_hashes()
            logging.info("Database reset successfully")
        except Exception as e:
            logging.error(f"Error resetting database: {e}")


In [7]:
# Read the model names from llm_models.txt in the working directory:
# one name per line, surrounding whitespace stripped, blank lines skipped.
llm_models = None
file_path = os.path.join(os.getcwd(), 'llm_models.txt')
with open(file_path, 'r') as models_file:
    llm_models = [name for name in map(str.strip, models_file) if name]
print(len(llm_models))

13


In [None]:
class TextPreprocessor:
    
    def __init__(self, language = 'english'):
        """Set up stopwords, lemmatizer, stemmer and, if installed, the spaCy model.

        ``language`` selects the NLTK stopword list. ``self.nlp`` stays
        ``None`` when the ``en_core_web_sm`` spaCy model is not installed.
        """
        self.language = language
        self.stop_words = set(stopwords.words(language))
        self.lemmatizer = WordNetLemmatizer()
        self.stemmer = PorterStemmer()

        # Entity extraction is optional: without the model, nlp remains None
        self.nlp = None
        try:
            self.nlp = spacy.load('en_core_web_sm')
        except OSError:
            print("Install with: python -m spacy download en_core_web_sm")
    
    def clean_text(self, text):
        if not text or not isinstance(text, str):
            return ""
        
        # Remove extra whitespace and normalize
        text = re.sub(r'\s+', ' ', text.strip())
        
        # Remove or replace special characters and symbols
        text = re.sub(r'[^\w\s\.\!\?\,\:\;\-\'\"]', ' ', text)
        
        # Fix common encoding issues
        text = text.encode('ascii', 'ignore').decode('ascii')
        
        # Normalize unicode characters
        text = unicodedata.normalize('NFKD', text)
        
        return text
    
    def normalize_text(self, text, 
                      lowercase = True,
                      remove_punctuation = False,
                      remove_numbers = False,
                      expand_contractions = True):
        if not text:
            return ""
        
        # Expand contractions
        if expand_contractions:
            text = self._expand_contractions(text)
        
        # Convert to lowercase
        if lowercase:
            text = text.lower()
        
        # Remove numbers
        if remove_numbers:
            text = re.sub(r'\d+', '', text)
        
        # Remove punctuation
        if remove_punctuation:
            text = text.translate(str.maketrans('', '', string.punctuation))
        
        # Clean up extra spaces
        text = re.sub(r'\s+', ' ', text.strip())
        
        return text
    
    def _expand_contractions(self, text):
        contractions = {
            "ain't": "are not", "aren't": "are not", "can't": "cannot",
            "couldn't": "could not", "didn't": "did not", "doesn't": "does not",
            "don't": "do not", "hadn't": "had not", "hasn't": "has not",
            "haven't": "have not", "he'd": "he would", "he'll": "he will",
            "he's": "he is", "i'd": "i would", "i'll": "i will", "i'm": "i am",
            "i've": "i have", "isn't": "is not", "it'd": "it would",
            "it'll": "it will", "it's": "it is", "let's": "let us",
            "shouldn't": "should not", "that's": "that is", "there's": "there is",
            "they'd": "they would", "they'll": "they will", "they're": "they are",
            "they've": "they have", "we'd": "we would", "we're": "we are",
            "we've": "we have", "weren't": "were not", "what's": "what is",
            "where's": "where is", "who's": "who is", "won't": "will not",
            "wouldn't": "would not", "you'd": "you would", "you'll": "you will",
            "you're": "you are", "you've": "you have"
        }
        
        for contraction, expansion in contractions.items():
            text = re.sub(r'\b' + contraction + r'\b', expansion, text, flags=re.IGNORECASE)
        
        return text
    
    def remove_stopwords(self, text, custom_stopwords = None):
        if not text:
            return ""
        
        tokens = word_tokenize(text)
        stop_words = self.stop_words.copy()
        
        if custom_stopwords:
            stop_words.update(custom_stopwords)
        
        filtered_tokens = [token for token in tokens if token.lower() not in stop_words]
        
        return ' '.join(filtered_tokens)
    
    def lemmatize_text(self, text):
        if not text:
            return ""
        
        tokens = word_tokenize(text)
        lemmatized = [self.lemmatizer.lemmatize(token.lower()) for token in tokens]
        
        return ' '.join(lemmatized)
    
    def stem_text(self, text):
        if not text:
            return ""
        
        tokens = word_tokenize(text)
        stemmed = [self.stemmer.stem(token.lower()) for token in tokens]
        
        return ' '.join(stemmed)
    
    def extract_entities(self, text):
        entities = defaultdict(list)
        
        if self.nlp is None:
            return dict(entities)
        
        doc = self.nlp(text)
        for ent in doc.ents:
            entities[ent.label_].append(ent.text)
        
        return dict(entities)
    
    def correct_spelling(self, text):
        if not text:
            return ""
        
        try:
            blob = TextBlob(text)
            return str(blob.correct())
        except:
            return text
    
    def segment_sentences(self, text):
        if not text:
            return []
        
        return sent_tokenize(text)
    
    def preprocess(self, text, 
                   clean = True,
                   normalize = True,
                   remove_stopwords = False,
                   lemmatize = False,
                   stem = False,
                   correct_spelling = False,
                   custom_stopwords = None,
                   preserve_entities = False):
        if not text or not isinstance(text, str):
            return ""
        
        original_entities = {}
        if preserve_entities:
            original_entities = self.extract_entities(text)
        
        # Step 1: Clean text
        if clean:
            text = self.clean_text(text)
        
        # Step 2: Spelling correction (before other processing)
        if correct_spelling:
            text = self.correct_spelling(text)
        
        # Step 3: Normalize text
        if normalize:
            text = self.normalize_text(text, 
                                     lowercase=True,
                                     remove_punctuation=False,
                                     remove_numbers=False,
                                     expand_contractions=True)
        
        # Step 4: Remove stopwords
        if remove_stopwords:
            text = self.remove_stopwords(text, custom_stopwords)
        
        # Step 5: Lemmatize or stem (mutually exclusive)
        if lemmatize and not stem:
            text = self.lemmatize_text(text)
        elif stem and not lemmatize:
            text = self.stem_text(text)
        elif lemmatize and stem:
            print("Warning: Both lemmatize and stem are True. Using lemmatization only.")
            text = self.lemmatize_text(text)
        
        # Step 6: Final cleanup
        text = re.sub(r'\s+', ' ', text.strip())
        
        return text

In [None]:
class NLITextSimilarity:
    def __init__(self, model_name = "microsoft/deberta-v2-xlarge-mnli", 
                 enable_preprocessing = True, 
                 preprocessing_config = None):
        """Load the NLI tokenizer and model and set up optional text preprocessing.

        ``preprocessing_config`` overrides individual defaults below. A
        ``TextPreprocessor`` is only created when ``enable_preprocessing``
        is true.
        """
        self.model_name = model_name
        self.enable_preprocessing = enable_preprocessing

        # Default preprocessing configuration, overridden key by key by the user config
        defaults = {
            'clean': True,
            'normalize': True,
            'remove_stopwords': False,
            'lemmatize': False,
            'stem': False,
            'correct_spelling': False,
            'preserve_entities': False
        }
        if preprocessing_config:
            defaults.update(preprocessing_config)
        self.preprocessing_config = defaults

        # Initialize preprocessor
        if self.enable_preprocessing:
            self.preprocessor = TextPreprocessor()

        print(f"Loading model: {model_name}")
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForSequenceClassification.from_pretrained(model_name)
        # Inference only: switch the model to evaluation mode
        self.model.eval()

        # Label mapping for MNLI models
        self.label_mapping = {
            'CONTRADICTION': 0,
            'NEUTRAL': 1, 
            'ENTAILMENT': 2
        }
    
    def preprocess_text(self, text, custom_config = None):
        if not self.enable_preprocessing:
            return text
        
        config = self.preprocessing_config.copy()
        if custom_config:
            config.update(custom_config)
        
        return self.preprocessor.preprocess(text, **config)
        
    def get_nli_score(self, premise, hypothesis, 
                     preprocess = None):
        # Apply preprocessing if enabled
        if preprocess is None:
            preprocess = self.enable_preprocessing
        
        if preprocess and self.enable_preprocessing:
            premise = self.preprocess_text(premise)
            hypothesis = self.preprocess_text(hypothesis)
        
        # Tokenize the input
        inputs = self.tokenizer(premise, hypothesis, 
                              return_tensors="pt", 
                              truncation=True, 
                              max_length=512,
                              padding=True)
        
        # Get model predictions
        with torch.no_grad():
            outputs = self.model(**inputs)
            probabilities = torch.softmax(outputs.logits, dim=-1)
            
        # Convert to numpy and extract probabilities
        probs = probabilities.squeeze().numpy()
        
        return {
            'contradiction': float(probs[0]),
            'neutral': float(probs[1]),
            'entailment': float(probs[2])
        }
    
    def bidirectional_similarity(self, text1, text2, 
                                preprocess = None):
        # Get NLI scores in both directions
        scores_1_to_2 = self.get_nli_score(text1, text2, preprocess)
        scores_2_to_1 = self.get_nli_score(text2, text1, preprocess)
        
        # Average the entailment probabilities
        similarity = (scores_1_to_2['entailment'] + scores_2_to_1['entailment']) / 2
        
        return similarity
    
    def compute_similarity_matrix(self, responses, preprocess = None):
        n = len(responses)
        similarity_matrix = np.zeros((n, n))
        
        # Calculate pairwise similarities
        for i in range(n):
            for j in range(i, n):
                if i == j:
                    similarity_matrix[i, j] = 1.0  # Self-similarity is 1
                else:
                    scores_1_to_2 = self.get_nli_score(responses[i], responses[j], preprocess)
                    scores_2_to_1 = self.get_nli_score(responses[j], responses[i], preprocess)
                    similarity_matrix[i, j] = similarity_matrix[j, i] = (scores_1_to_2['entailment'] + scores_2_to_1['entailment']) / 2
        
        return similarity_matrix

In [None]:
class UncertaintyQuantifier:
    """Uncertainty and confidence measures computed from a pairwise similarity matrix.

    The matrix-based methods expect a square, symmetric matrix whose entry
    (i, j) is the similarity between response i and response j. They treat it
    as a weighted graph and derive spectral, degree and eccentricity measures.
    """

    # Values at or below this tolerance are treated as zero.
    _EPS = 1e-10

    def __init__(self):
        # Word sets produced by the most recent compute_similarity_matrix call.
        self.word_sets = []
        # Sorted Laplacian eigenvalues from the most recent eigenvalue_uncertainty call.
        self.eigenvalues = None

    def get_similarity_matrix(self, similarity_matrix):
        """Return the given similarity matrix unchanged."""
        return similarity_matrix

    def get_eigenvalues(self):
        """Return the eigenvalues stored by the last ``eigenvalue_uncertainty`` call (or None)."""
        return self.eigenvalues

    def preprocess_sentence(self, sentence):
        """Lower-case ``sentence`` and return the set of its ``\\w+`` word tokens."""
        return set(re.findall(r'\b\w+\b', sentence.lower()))

    def jaccard_similarity(self, response1, response2):
        """Return |A ∩ B| / |A ∪ B| for two sets, or 0.0 when both are empty."""
        union = len(response1.union(response2))

        # Two empty sets have no defined ratio; report no overlap.
        if union == 0:
            return 0.0

        return len(response1.intersection(response2)) / union

    def compute_similarity_matrix(self, responses):
        """Build the pairwise Jaccard similarity matrix of ``responses``.

        Also stores the per-response word sets in ``self.word_sets``.

        Args:
            responses: Sequence of strings.

        Returns:
            A symmetric (n, n) float array with 1.0 on the diagonal.
        """
        n = len(responses)
        # Identity start: self-similarity is 1.
        similarity_matrix = np.eye(n)

        self.word_sets = [self.preprocess_sentence(sentence) for sentence in responses]

        for i in range(n):
            for j in range(i + 1, n):
                sim = self.jaccard_similarity(self.word_sets[i], self.word_sets[j])
                similarity_matrix[i, j] = similarity_matrix[j, i] = sim

        return similarity_matrix

    def num_semantic_sets(self, similarity_matrix, threshold = 0.5):
        """Count the groups obtained by merging sufficiently similar responses.

        Responses i and j are merged when their similarity exceeds both
        ``threshold`` and ``1 - similarity`` (the similarity plays the role of
        an entailment score and its complement that of a contradiction score).
        Merging is transitive.

        Args:
            similarity_matrix: Square numpy array of pairwise similarities.
            threshold: Minimum similarity required to merge two responses.

        Returns:
            The number of distinct groups.
        """
        n = len(similarity_matrix)
        if n <= 1:
            return n

        # Initially each response is its own group, labelled by its index.
        groups = list(range(n))

        for i in range(n):
            for j in range(i + 1, n):
                similarity = similarity_matrix[i, j]
                if similarity > threshold and similarity > 1 - similarity:
                    # Relabel the higher-numbered group as the lower one.
                    keep, absorb = sorted((groups[i], groups[j]))
                    groups = [keep if g == absorb else g for g in groups]

        return len(set(groups))

    @classmethod
    def _normalized_laplacian(cls, similarity_matrix):
        """Return I - D^(-1/2) W D^(-1/2), where D holds the row sums of W.

        The computation is done in floating point regardless of the input
        dtype, so integer-valued matrices are not truncated. Rows whose degree
        is not above the tolerance get a zero scaling factor instead of a
        division by zero.
        """
        weights = np.asarray(similarity_matrix, dtype=float)
        degrees = weights.sum(axis=1)

        inv_sqrt = np.zeros_like(degrees)
        nonzero = degrees > cls._EPS
        inv_sqrt[nonzero] = 1.0 / np.sqrt(degrees[nonzero])

        # Scaling rows and columns is the same as multiplying by the diagonal matrices.
        scaled = inv_sqrt[:, None] * weights * inv_sqrt[None, :]
        return np.eye(weights.shape[0]) - scaled

    def eigenvalue_uncertainty(self, similarity_matrix):
        """Return the sum of max(0, 1 - lambda) over the normalized-Laplacian eigenvalues.

        The sorted eigenvalues are stored in ``self.eigenvalues``. The matrix is
        expected to be symmetric; a symmetric eigen-solver is used so the
        eigenvalues are real.

        Args:
            similarity_matrix: Square, symmetric similarity matrix.

        Returns:
            The uncertainty value, or 0.0 when there is at most one response
            (in which case ``self.eigenvalues`` is left untouched).
        """
        weights = np.asarray(similarity_matrix, dtype=float)
        if weights.shape[0] <= 1:
            return 0.0

        laplacian = self._normalized_laplacian(weights)

        # eigvalsh returns real eigenvalues in ascending order.
        self.eigenvalues = np.linalg.eigvalsh(laplacian)

        # Eigenvalues above 1 contribute nothing.
        return float(np.clip(1.0 - self.eigenvalues, 0.0, None).sum())

    def degree_based_measures(self, similarity_matrix):
        """Return degree-based uncertainty and per-response confidence.

        Each response's degree is its row sum, which includes the diagonal
        self-similarity. For entries in [0, 1] a row sum is at most n, so
        degrees are divided by n to keep confidences in [0, 1] and the
        uncertainty non-negative.

        Args:
            similarity_matrix: Square similarity matrix.

        Returns:
            (uncertainty, confidence_scores): the mean of 1 - normalized degree,
            and the array of normalized degrees. For an empty matrix, 0.0 and
            an empty array.
        """
        weights = np.asarray(similarity_matrix, dtype=float)
        n = weights.shape[0]
        if n == 0:
            return 0.0, np.array([])

        normalized_degrees = weights.sum(axis=1) / n

        # Uncertainty: average distance from maximum connectivity.
        uncertainty = np.mean(1 - normalized_degrees)

        # Confidence: higher degree means higher confidence.
        return uncertainty, normalized_degrees

    def eccentricity_measures(self, similarity_matrix, k = 2):
        """Return eccentricity-based uncertainty and per-response confidence.

        Responses are embedded with the eigenvectors of the normalized
        Laplacian belonging to its k smallest eigenvalues (k is capped at
        n - 1), the embedding is centred, and each response's distance from the
        centre is measured.

        Args:
            similarity_matrix: Square, symmetric similarity matrix.
            k: Number of eigenvectors used for the embedding.

        Returns:
            (uncertainty, confidence_scores): the mean distance from the centre,
            and 1 - distance / max distance per response. With at most one
            response, 0.0 and an array of ones.
        """
        weights = np.asarray(similarity_matrix, dtype=float)
        n = weights.shape[0]
        if n <= 1:
            return 0.0, np.array([1.0] * n)

        laplacian = self._normalized_laplacian(weights)

        # eigh returns eigenvectors ordered by ascending eigenvalue.
        k = min(k, n - 1)
        _, eigenvecs = eigh(laplacian)
        embedding = eigenvecs[:, :k]

        centered_embedding = embedding - np.mean(embedding, axis=0)
        distances = np.linalg.norm(centered_embedding, axis=1)

        uncertainty = np.mean(distances)

        # When every distance is numerically zero, dividing by the maximum
        # would blow rounding noise up to the full [0, 1] range.
        max_dist = np.max(distances)
        if max_dist <= self._EPS:
            max_dist = 1
        confidence_scores = 1 - (distances / max_dist)

        return uncertainty, confidence_scores

In [None]:
# Column order of the results table. The spellings are kept exactly as they
# are because they become the header of the CSV written from `df`.
RESULT_COLUMNS = ['model', 'question_level', 'similarity_method', 'eigenvalue_uncertanity', 'degree_uncertanity', 'eccentricity_uncertanity', 'avg_degree_confidence', 'avg_eccentirc_confidence', 'responce']

# Number of answers sampled per (model, question) pair.
N_SAMPLES_PER_QUESTION = 30

# Questions in order; the 1-based position is recorded as `question_level`.
questions = ['Who is sovereign in India?',
             'Which Fundamental Rights is guaranteed only to Indian citizens and not to foreigners?',
             'Is India a true federation or quasi-federal state?'
]

# One model name per non-empty line.
file_path = os.path.join(os.getcwd(), 'llm_models.txt')
with open(file_path, 'r') as f:
    llm_models = [line.strip() for line in f if line.strip()]

# Build the similarity model once instead of once per question.
# NOTE(review): this checkpoint name looks like a sentence-embedding model;
# confirm it provides the three-way NLI output that get_nli_score indexes.
nli_sim_model_1 = NLITextSimilarity(model_name='sentence-transformers/all-MiniLM-L6-v2')

records = []
for models in llm_models:
    # Initialize the RAG system with the model under evaluation, so the
    # 'model' column matches the model that actually produced the answers.
    rag = RAGSystem(
        documents_folder=DOCUMENTS_FOLDER,
        groq_api_key=GROQ_API_KEY,
        db_path=CHROMA_DB_PATH,
        collection_name="document_chunks",
        groq_model=models,
        chunk_size=512,
        chunk_overlap=50
    )

    # Initial scan of documents folder
    rag.scan_documents_folder()

    # Start monitoring for new/modified documents
    # NOTE(review): monitoring is started for every model and never stopped
    # in this cell; confirm whether RAGSystem needs an explicit shutdown.
    rag.start_monitoring()

    for level, question in enumerate(questions, 1):
        # Fresh sample per question, so answers to earlier questions do not
        # leak into this question's similarity matrix.
        responses = [
            rag.answer_question(question, top_k=5)['answer']
            for _ in range(N_SAMPLES_PER_QUESTION)
        ]

        sim_matrix = nli_sim_model_1.compute_similarity_matrix(responses=responses)

        uq = UncertaintyQuantifier()
        un_egiv = uq.eigenvalue_uncertainty(sim_matrix)
        deg_un, deg_conf = uq.degree_based_measures(sim_matrix)
        ecc_un, ecc_conf = uq.eccentricity_measures(sim_matrix, k=4)

        # Keep the answer with the highest eccentricity confidence.
        save_response = int(np.argmax(ecc_conf))

        # One row per (model, question); nothing is overwritten.
        records.append([models, level, 'NLI', un_egiv, deg_un, ecc_un, np.mean(deg_conf), np.mean(ecc_conf), responses[save_response]])

df = pd.DataFrame(records, columns=RESULT_COLUMNS)


In [None]:
# Write the results table to disk without the DataFrame index column.
df.to_csv(
    path_or_buf='llm_models_uncertanity.csv',
    index=False,
)