<a href="https://colab.research.google.com/github/Vishnupriya-Selvraj/Agentic_AI_Workshop/blob/main/RAG_System_AI_Research_Papers_Q%26A.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [21]:
!pip install langchain langchain-community langchain-huggingface
!pip install sentence-transformers faiss-cpu
!pip install pypdf  # PDF backend required by LangChain's PyPDFLoader
!pip install transformers torch
!pip install gradio
!pip install accelerate
!pip install rank-bm25  # For hybrid search
!pip install spacy
!pip install nltk
!python -m spacy download en_core_web_sm

print("✅ All packages installed successfully!")


Collecting en-core-web-sm==3.8.0
  Downloading https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.8.0/en_core_web_sm-3.8.0-py3-none-any.whl (12.8 MB)
✔ Download and installation successful
You can now load the package via spacy.load('en_core_web_sm')
⚠ Restart to reload dependencies
If you are in a Jupyter or Colab notebook, you may need to restart Python in
order to load all the package's dependencies. You can do this by selecting the
'Restart kernel' or 'Restart runtime' option.
✅ All packages installed successfully!


In [None]:
import os
import re
import warnings
import numpy as np
import pandas as pd
warnings.filterwarnings('ignore')

import torch
from typing import List, Dict, Any, Tuple, Optional
import gradio as gr
from pathlib import Path
import nltk
import spacy
from collections import Counter

# Download NLTK data
nltk.download('punkt', quiet=True)
nltk.download('stopwords', quiet=True)
nltk.download('wordnet', quiet=True)

# LangChain imports (current package layout; the old `langchain.*` paths are deprecated)
from langchain_community.document_loaders import PyPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_core.documents import Document

# Transformers
from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM

# BM25 for hybrid search
from rank_bm25 import BM25Okapi

# Load spaCy model
nlp = spacy.load("en_core_web_sm")

print("✅ All libraries imported successfully!")
print(f"🔥 GPU Available: {torch.cuda.is_available()}")

In [28]:
class EnhancedDocumentProcessor:
    """Advanced document processing with improved chunking strategies"""

    def __init__(self, chunk_size=800, chunk_overlap=150):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

        # Multiple text splitters for different content types
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len,
            separators=["\n\n", "\n", ". ", " ", ""]
        )

        # Specialized splitter for academic content
        self.academic_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len,
            separators=[
                "\n## ",      # Section headers
                "\n### ",     # Subsection headers
                "\n\n",       # Paragraph breaks
                "\n",         # Line breaks
                ". ",         # Sentence endings
                " ",          # Word boundaries
                ""
            ]
        )

    def load_pdfs(self, pdf_paths: List[str]) -> List[Document]:
        """Load multiple PDF files with enhanced metadata"""
        all_documents = []

        for pdf_path in pdf_paths:
            try:
                print(f"📖 Loading: {os.path.basename(pdf_path)}")
                loader = PyPDFLoader(pdf_path)
                documents = loader.load()

                # Enhanced metadata extraction
                for i, doc in enumerate(documents):
                    # Clean and preprocess text
                    cleaned_text = self._clean_text(doc.page_content)
                    doc.page_content = cleaned_text

                    # Extract section information
                    section_info = self._extract_section_info(cleaned_text)

                    doc.metadata.update({
                        'source_file': os.path.basename(pdf_path),
                        'page_number': i + 1,
                        'total_pages': len(documents),
                        'word_count': len(cleaned_text.split()),
                        'char_count': len(cleaned_text),
                        'section_type': section_info['type'],
                        'section_title': section_info['title'],
                        'has_figures': 'figure' in cleaned_text.lower(),
                        'has_tables': 'table' in cleaned_text.lower(),
                        'has_equations': bool(re.search(r'\$.*?\$|\\begin\{equation\}', cleaned_text))
                    })

                all_documents.extend(documents)
                print(f"✅ Loaded {len(documents)} pages from {os.path.basename(pdf_path)}")

            except Exception as e:
                print(f"❌ Error loading {pdf_path}: {str(e)}")

        return all_documents

    def _clean_text(self, text: str) -> str:
        """Clean and normalize text content"""
        # Remove page header/footer patterns first: these regexes are line-based,
        # so they must run before the line breaks are collapsed below
        text = re.sub(r'^[ \t]*\d+[ \t]*$', '', text, flags=re.MULTILINE)
        text = re.sub(r'^Page \d+.*$', '', text, flags=re.MULTILINE)

        # Collapse all whitespace, including newlines, into single spaces
        text = re.sub(r'\s+', ' ', text)

        # Fix common PDF extraction issues. These are heuristics: they also
        # split identifiers such as "PyTorch" or "word2vec"
        text = re.sub(r'([a-z])([A-Z])', r'\1 \2', text)  # Space at lowercase-to-uppercase joins
        text = re.sub(r'(\.)([A-Z])', r'\1 \2', text)     # Space after periods
        text = re.sub(r'([a-z])(\d)', r'\1 \2', text)     # Space before numbers
        text = re.sub(r'(\d)([a-z])', r'\1 \2', text)     # Space after numbers

        # Remove numeric citation markers such as [12] or [3, 4]
        text = re.sub(r'\[\d+(?:,\s*\d+)*\]', '', text)

        return text.strip()

    def _extract_section_info(self, text: str) -> Dict[str, str]:
        """Extract section information from text"""
        # Common academic section patterns
        section_patterns = {
            'abstract': r'(?i)^(?:abstract|summary)',
            'introduction': r'(?i)^(?:1\.?\s*)?introduction',
            'methodology': r'(?i)^(?:\d+\.?\s*)?(?:method|methodology|approach)',
            'results': r'(?i)^(?:\d+\.?\s*)?(?:results?|findings?)',
            'discussion': r'(?i)^(?:\d+\.?\s*)?discussion',
            'conclusion': r'(?i)^(?:\d+\.?\s*)?conclusions?',
            'references': r'(?i)^(?:references?|bibliography)',
            'related_work': r'(?i)^(?:\d+\.?\s*)?(?:related work|literature review)',
            'evaluation': r'(?i)^(?:\d+\.?\s*)?(?:evaluation|experiments?)'
        }

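        # _clean_text collapses line breaks, so the "first line" can be a whole
        # page; the patterns only look at the start, and the title is capped
        first_line = text.split('\n')[0].strip()

        for section_type, pattern in section_patterns.items():
            if re.match(pattern, first_line):
                return {'type': section_type, 'title': first_line[:80].strip()}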

        return {'type': 'content', 'title': ''}

    def create_smart_chunks(self, documents: List[Document]) -> List[Document]:
        """Create intelligent chunks with improved context preservation"""
        print("🔪 Creating smart text chunks...")

        all_chunks = []

        for doc in documents:
            section_type = doc.metadata.get('section_type', 'content')

            # Use appropriate splitter based on content type
            if section_type in ['abstract', 'conclusion']:
                # Keep abstracts and conclusions as single chunks if possible
                if len(doc.page_content) <= self.chunk_size:
                    chunk = Document(
                        page_content=doc.page_content,
                        metadata={**doc.metadata, 'chunk_id': len(all_chunks), 'is_complete_section': True}
                    )
                    all_chunks.append(chunk)
                    continue

            # Regular chunking with academic-aware splitting
            chunks = self.academic_splitter.split_documents([doc])

            # Post-process chunks for better coherence
            processed_chunks = self._post_process_chunks(chunks, doc.metadata)
            all_chunks.extend(processed_chunks)

        # Add cross-references and relationships
        all_chunks = self._add_chunk_relationships(all_chunks)

        print(f"✅ Created {len(all_chunks)} smart chunks")
        return all_chunks

    def _post_process_chunks(self, chunks: List[Document], base_metadata: Dict) -> List[Document]:
        """Post-process chunks for better quality"""
        processed_chunks = []

        for i, chunk in enumerate(chunks):
            # Skip very short chunks
            if len(chunk.page_content.strip()) < 50:
                continue

            # Ensure chunks end at sentence boundaries when possible
            content = chunk.page_content.strip()
            if not content.endswith(('.', '!', '?', ':')):
                sentences = content.split('. ')
                if len(sentences) > 1:
                    content = '. '.join(sentences[:-1]) + '.'

            # Add enhanced metadata
            chunk.metadata.update({
                **base_metadata,
                'chunk_id': len(processed_chunks),
                'chunk_index': i,
                'chunk_size': len(content),
                'sentence_count': len([s for s in content.split('.') if s.strip()]),
                'processing_timestamp': str(pd.Timestamp.now())
            })

            chunk.page_content = content
            processed_chunks.append(chunk)

        return processed_chunks

    def _add_chunk_relationships(self, chunks: List[Document]) -> List[Document]:
        """Add relationship information between chunks"""
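        for i, chunk in enumerate(chunks):
            # Re-number chunk_id globally: _post_process_chunks numbers chunks
            # per page, so ids would otherwise collide across pages and break
            # the id-keyed merging and neighbor lookup in the vector store manager
            chunk.metadata.update({
                'chunk_id': i,
                'prev_chunk_id': i - 1 if i > 0 else None,
                'next_chunk_id': i + 1 if i < len(chunks) - 1 else None,
                'total_chunks': len(chunks)
            })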

        return chunks
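
The effect of `chunk_size=800` and `chunk_overlap=150` can be seen in isolation with a simplified stand-in for the splitter. This is only a sketch: `sliding_window_chunks` is a hypothetical helper, not part of LangChain, and it cuts at fixed character offsets, whereas `RecursiveCharacterTextSplitter` additionally tries to cut at the configured separators.

```python
from typing import List


def sliding_window_chunks(text: str, chunk_size: int = 800, chunk_overlap: int = 150) -> List[str]:
    """Fixed-size character windows; each chunk repeats the tail of the previous one."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")

    step = chunk_size - chunk_overlap  # how far the window advances each time
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the window already reached the end of the text
    return chunks


# 2000 characters of synthetic text (illustrative data only)
sample = "".join(str(i % 10) for i in range(2000))
chunks = sliding_window_chunks(sample)

print([len(c) for c in chunks])               # [800, 800, 700]
print(chunks[0][-150:] == chunks[1][:150])    # True
```

The last 150 characters of one chunk reappear at the start of the next, which is what lets a sentence that straddles a boundary still be retrieved whole from at least one chunk.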

In [29]:
class AdvancedVectorStoreManager:
    """Enhanced vector store with hybrid search capabilities"""

    def __init__(self, model_name="sentence-transformers/all-MiniLM-L6-v2"):
        self.model_name = model_name
        self.embeddings = HuggingFaceEmbeddings(
            model_name=model_name,
            model_kwargs={'device': 'cuda' if torch.cuda.is_available() else 'cpu'},
            encode_kwargs={'normalize_embeddings': True}
        )
        self.vector_store = None
        self.bm25 = None
        self.chunks = []
        self.chunk_texts = []

    def create_vector_store(self, chunks: List[Document]) -> bool:
        """Create enhanced vector store with hybrid search support"""
        try:
            print("⚡ Creating advanced vector embeddings...")
            print(f"📊 Processing {len(chunks)} chunks...")

            self.chunks = chunks
            self.chunk_texts = [chunk.page_content for chunk in chunks]

            # Create FAISS vector store
            self.vector_store = FAISS.from_documents(
                documents=chunks,
                embedding=self.embeddings
            )

            # Create BM25 index for keyword search
            print("🔍 Building BM25 index for keyword search...")
            tokenized_chunks = [doc.lower().split() for doc in self.chunk_texts]
            self.bm25 = BM25Okapi(tokenized_chunks)

            print(f"✅ Vector store created successfully!")
            print(f"📈 Vector dimension: {self.vector_store.index.d}")
            print(f"📚 Total vectors: {self.vector_store.index.ntotal}")
            print(f"🔍 BM25 index ready with {len(tokenized_chunks)} documents")

            return True

        except Exception as e:
            print(f"❌ Error creating vector store: {str(e)}")
            return False

    def hybrid_search(self, query: str, k: int = 10, alpha: float = 0.7) -> List[Tuple[Document, float]]:
        """Perform hybrid search combining semantic and keyword search"""
        if not self.vector_store or not self.bm25:
            return []

        try:
            # Semantic search
            semantic_results = self.vector_store.similarity_search_with_score(query, k=k*2)

            # Keyword search with BM25
            query_tokens = query.lower().split()
            bm25_scores = self.bm25.get_scores(query_tokens)

            # Get top BM25 results
            bm25_indices = np.argsort(bm25_scores)[::-1][:k*2]

            # Combine results with weighted scoring
            combined_results = {}

            # Add semantic results
            for doc, distance in semantic_results:
                chunk_id = doc.metadata.get('chunk_id', 0)
                # Assuming the default L2 index: FAISS returns squared L2 distance,
                # which for unit-normalized vectors equals 2 - 2*cos, so cos = 1 - d/2
                semantic_score = 1 - distance / 2
                combined_results[chunk_id] = {
                    'document': doc,
                    'semantic_score': semantic_score,
                    'bm25_score': 0.0
                }

            # Add BM25 results
            for idx in bm25_indices:
                if idx < len(self.chunks):
                    chunk_id = self.chunks[idx].metadata.get('chunk_id', idx)
                    bm25_score = bm25_scores[idx]

                    if chunk_id in combined_results:
                        combined_results[chunk_id]['bm25_score'] = bm25_score
                    else:
                        combined_results[chunk_id] = {
                            'document': self.chunks[idx],
                            'semantic_score': 0.0,
                            'bm25_score': bm25_score
                        }

            # Calculate combined scores and rank
            # Normalize BM25 scores by the best score among the candidates
            max_bm25 = max((r['bm25_score'] for r in combined_results.values()), default=0.0)

            final_results = []
            for chunk_id, result in combined_results.items():
                normalized_bm25 = result['bm25_score'] / max_bm25 if max_bm25 > 0 else 0.0

                # Combined score
                combined_score = alpha * result['semantic_score'] + (1 - alpha) * normalized_bm25

                final_results.append((result['document'], 1 - combined_score))  # Convert back to distance

            # Sort by combined score and return top k
            final_results.sort(key=lambda x: x[1])

            print(f"🔍 Hybrid search found {len(final_results[:k])} relevant chunks")
            return final_results[:k]

        except Exception as e:
            print(f"❌ Search error: {str(e)}")
            return []

    def search_with_context(self, query: str, k: int = 5) -> List[Tuple[Document, float, List[Document]]]:
        """Search with additional context from neighboring chunks"""
        base_results = self.hybrid_search(query, k)
        enhanced_results = []

        for doc, score in base_results:
            # Get neighboring chunks for context
            chunk_id = doc.metadata.get('chunk_id', 0)
            context_chunks = []

            # Previous chunk
            if chunk_id > 0:
                prev_chunk = next((c for c in self.chunks if c.metadata.get('chunk_id') == chunk_id - 1), None)
                if prev_chunk:
                    context_chunks.append(prev_chunk)

            # Next chunk
            next_chunk = next((c for c in self.chunks if c.metadata.get('chunk_id') == chunk_id + 1), None)
            if next_chunk:
                context_chunks.append(next_chunk)

            enhanced_results.append((doc, score, context_chunks))

        return enhanced_results
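
The weighted fusion used by `hybrid_search` is easier to follow on a handful of numbers. The sketch below reproduces only the scoring arithmetic (max-normalized BM25, weight `alpha` on the semantic side) with plain dictionaries. `fuse_scores` and the score values are made up for illustration, and the semantic scores are assumed to already be similarities in [0, 1].

```python
from typing import Dict, List, Tuple


def fuse_scores(semantic: Dict[int, float], bm25: Dict[int, float], alpha: float = 0.7) -> List[Tuple[int, float]]:
    """Weighted fusion of semantic similarity and max-normalized BM25 scores."""
    max_bm25 = max(bm25.values(), default=0.0)
    fused = {}
    for chunk_id in set(semantic) | set(bm25):
        sem = semantic.get(chunk_id, 0.0)  # missing from semantic results -> 0
        kw = bm25.get(chunk_id, 0.0) / max_bm25 if max_bm25 > 0 else 0.0
        fused[chunk_id] = alpha * sem + (1 - alpha) * kw
    # Highest combined score first
    return sorted(fused.items(), key=lambda item: item[1], reverse=True)


semantic = {0: 0.82, 1: 0.75, 2: 0.40}   # chunk_id -> cosine similarity
bm25 = {1: 6.0, 2: 3.0, 3: 1.5}          # chunk_id -> raw BM25 score

ranking = fuse_scores(semantic, bm25)
for chunk_id, score in ranking:
    print(chunk_id, round(score, 3))
```

Chunk 1 ranks first (0.7 × 0.75 + 0.3 × 1.0 = 0.825) even though chunk 0 has the higher semantic similarity, because chunk 0 has no keyword match at all. Raising `alpha` toward 1 shifts the ranking back toward pure semantic order.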

In [30]:
class IntelligentAnswerGenerator:
    """Advanced answer generation with better context understanding"""

    def __init__(self):
        self.generator = None
        self.tokenizer = None
        self.model_name = "microsoft/DialoGPT-medium"
        self.setup_model()

    def setup_model(self):
        """Initialize the text generation model with better configuration"""
        try:
            print("🤖 Loading enhanced language model...")

            # Try to use a better model if available
            try:
                # Use a more capable model for better answers
                self.model_name = "microsoft/DialoGPT-large"  # Fallback to medium if large fails

                self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
                if self.tokenizer.pad_token is None:
                    self.tokenizer.pad_token = self.tokenizer.eos_token

                self.generator = pipeline(
                    "text-generation",
                    model=self.model_name,
                    tokenizer=self.tokenizer,
                    max_length=512,
                    temperature=0.3,  # Lower temperature for more focused answers
                    do_sample=True,
                    top_p=0.9,
                    top_k=50,
                    pad_token_id=self.tokenizer.pad_token_id,
                    device=0 if torch.cuda.is_available() else -1
                )

            except Exception:
                # Fallback to medium model
                self.model_name = "microsoft/DialoGPT-medium"
                self.generator = pipeline(
                    "text-generation",
                    model=self.model_name,
                    max_length=512,
                    temperature=0.3,
                    do_sample=True,
                    pad_token_id=50256,
                    device=0 if torch.cuda.is_available() else -1
                )

            print(f"✅ Language model loaded: {self.model_name}")

        except Exception as e:
            print(f"❌ Error loading model: {str(e)}")
            self.generator = None

    def generate_comprehensive_answer(self, question: str, context_results: List[Tuple], max_length: int = 300) -> Dict[str, Any]:
        """Generate comprehensive answer with improved reasoning"""

        if not context_results:
            return {
                'answer': "I couldn't find relevant information to answer your question.",
                'confidence': 0.0,
                'reasoning': "No relevant context found",
                'answer_type': 'no_answer'
            }

        # Analyze question type
        question_analysis = self._analyze_question(question)

        # Prepare enhanced context
        enhanced_context = self._prepare_enhanced_context(context_results, question_analysis)

        # Generate answer based on question type
        if question_analysis['type'] in ['definition', 'explanation']:
            answer_result = self._generate_explanatory_answer(question, enhanced_context, max_length)
        else:
            # 'comparison', 'list', 'reasoning' and 'general' questions share the
            # general prompt; this class defines no dedicated generators for them
            answer_result = self._generate_general_answer(question, enhanced_context, max_length)

        # Add metadata and confidence scoring
        answer_result.update({
            'question_type': question_analysis['type'],
            'context_quality': self._assess_context_quality(enhanced_context, question),
            'source_count': len(context_results)
        })

        return answer_result

    def _analyze_question(self, question: str) -> Dict[str, Any]:
        """Analyze question to determine type and key components"""
        question_lower = question.lower()

        # Question type patterns
        if any(word in question_lower for word in ['what is', 'define', 'explain', 'describe']):
            question_type = 'definition'
        elif any(word in question_lower for word in ['how does', 'how do', 'how is', 'how are']):
            question_type = 'explanation'
        elif any(word in question_lower for word in ['compare', 'difference', 'versus', 'vs']):
            question_type = 'comparison'
        elif any(word in question_lower for word in ['list', 'what are', 'which are', 'enumerate']):
            question_type = 'list'
        elif any(word in question_lower for word in ['why', 'reason', 'cause']):
            question_type = 'reasoning'
        else:
            question_type = 'general'

        # Extract key terms using spaCy
        doc = nlp(question)
        key_terms = [token.lemma_.lower() for token in doc if token.pos_ in ['NOUN', 'ADJ', 'VERB'] and not token.is_stop]

        return {
            'type': question_type,
            'key_terms': key_terms,
            'entities': [ent.text for ent in doc.ents],
            'complexity': len([token for token in doc if token.pos_ in ['NOUN', 'VERB', 'ADJ']])
        }

    def _prepare_enhanced_context(self, context_results: List[Tuple], question_analysis: Dict) -> str:
        """Prepare enhanced context with smart selection and ordering"""
        context_parts = []
        seen_content = set()

        for i, (doc, score, neighbor_chunks) in enumerate(context_results):
            # Main chunk
            main_content = doc.page_content.strip()
            if main_content not in seen_content:
                # Add section information if available
                section_info = ""
                if doc.metadata.get('section_type') and doc.metadata.get('section_type') != 'content':
                    section_info = f"[From {doc.metadata['section_type'].title()} section] "

                context_parts.append(f"{section_info}{main_content}")
                seen_content.add(main_content)

            # Add relevant neighbor chunks for better context
            for neighbor in neighbor_chunks:
                neighbor_content = neighbor.page_content.strip()
                if (neighbor_content not in seen_content and
                    len(neighbor_content) > 100 and
                    any(term in neighbor_content.lower() for term in question_analysis['key_terms'])):
                    context_parts.append(f"[Related context] {neighbor_content}")
                    seen_content.add(neighbor_content)

        # Limit total context length
        combined_context = "\n\n".join(context_parts)
        if len(combined_context) > 2000:  # Limit context size
            # Keep most relevant parts
            truncated_parts = []
            current_length = 0
            for part in context_parts:
                if current_length + len(part) <= 2000:
                    truncated_parts.append(part)
                    current_length += len(part)
                else:
                    break
            combined_context = "\n\n".join(truncated_parts)

        return combined_context

    def _generate_explanatory_answer(self, question: str, context: str, max_length: int) -> Dict[str, Any]:
        """Generate detailed explanatory answers"""
        if not self.generator:
            return self._extract_answer_from_context(question, context)

        prompt = f"""Based on the research paper context, provide a clear and comprehensive explanation.

Context: {context}

Question: {question}

Provide a detailed explanation that:
1. Directly answers the question
2. Uses information from the context
3. Is clear and well-structured

Answer:"""

        try:
            response = self.generator(
                prompt,
                # Token budget for the answer only; len(prompt) counts characters, not tokens
                max_new_tokens=max_length,
                num_return_sequences=1,
                temperature=0.2,  # Lower temperature for explanations
                do_sample=True,
                pad_token_id=self.generator.tokenizer.pad_token_id if hasattr(self.generator, 'tokenizer') else 50256
            )

            answer = self._extract_and_clean_answer(response[0]['generated_text'], prompt)
            confidence = self._calculate_confidence(answer, context, question)

            return {
                'answer': answer,
                'confidence': confidence,
                'reasoning': 'Generated explanatory answer based on context',
                'answer_type': 'explanation'
            }

        except Exception as e:
            print(f"⚠️ Generation error: {str(e)}")
            return self._extract_answer_from_context(question, context)

    def _generate_general_answer(self, question: str, context: str, max_length: int) -> Dict[str, Any]:
        """Generate general answers with fallback logic"""
        if not self.generator:
            return self._extract_answer_from_context(question, context)

        prompt = f"""Answer the question based on the research paper context provided.

Context: {context}

Question: {question}

Answer concisely and accurately based on the context:"""

        try:
            response = self.generator(
                prompt,
                # Token budget for the answer only; len(prompt) counts characters, not tokens
                max_new_tokens=max_length,
                num_return_sequences=1,
                temperature=0.3,
                do_sample=True
            )

            answer = self._extract_and_clean_answer(response[0]['generated_text'], prompt)
            confidence = self._calculate_confidence(answer, context, question)

            return {
                'answer': answer,
                'confidence': confidence,
                'reasoning': 'Generated answer based on retrieved context',
                'answer_type': 'general'
            }

        except Exception as e:
            print(f"⚠️ Generation error: {str(e)}")
            return self._extract_answer_from_context(question, context)

    def _extract_answer_from_context(self, question: str, context: str) -> Dict[str, Any]:
        """Fallback method: extract answer directly from context"""
        sentences = [s.strip() for s in context.split('.') if s.strip()]
        question_words = set(question.lower().split())

        # Score sentences by relevance
        scored_sentences = []
        for sentence in sentences:
            sentence_words = set(sentence.lower().split())
            overlap = len(question_words.intersection(sentence_words))
            if overlap > 0:
                score = overlap / len(question_words)
                scored_sentences.append((sentence, score))

        # Sort by relevance and combine top sentences
        scored_sentences.sort(key=lambda x: x[1], reverse=True)

        if scored_sentences:
            # Take up to 3 of the most relevant sentences
            top_sentences = [s[0] for s in scored_sentences[:3]]
            answer = '. '.join(top_sentences)
            if not answer.endswith('.'):
                answer += '.'

            confidence = scored_sentences[0][1]
        else:
            # Fallback to first few sentences
            answer = '. '.join(sentences[:2]) + '.' if sentences else "No relevant information found."
            confidence = 0.2

        return {
            'answer': answer,
            'confidence': min(confidence, 0.8),  # Cap confidence for extracted answers
            'reasoning': 'Extracted relevant sentences from context',
            'answer_type': 'extracted'
        }

    def _extract_and_clean_answer(self, generated_text: str, prompt: str) -> str:
        """Extract and clean the generated answer"""
        # Remove the prompt from the response
        answer = generated_text[len(prompt):].strip()

        # Clean up common generation artifacts
        answer = re.sub(r'^Answer:\s*', '', answer)
        answer = re.sub(r'\n+', ' ', answer)
        answer = ' '.join(answer.split())  # Normalize whitespace

        # Ensure proper ending
        if answer and not answer.endswith(('.', '!', '?')):
            # Try to end at a sentence boundary
            sentences = answer.split('.')
            if len(sentences) > 1:
                answer = '.'.join(sentences[:-1]) + '.'
            else:
                answer += '.'

        return answer

    def _calculate_confidence(self, answer: str, context: str, question: str) -> float:
        """Calculate confidence score for the answer"""
        if not answer or len(answer) < 20:
            return 0.1

        # Factor 1: Overlap between answer and context
        answer_words = set(answer.lower().split())
        context_words = set(context.lower().split())
        overlap_ratio = len(answer_words.intersection(context_words)) / len(answer_words) if answer_words else 0

        # Factor 2: Question term coverage
        question_words = set(question.lower().split())
        question_coverage = len(answer_words.intersection(question_words)) / len(question_words) if question_words else 0

        # Factor 3: Answer length and structure
        length_score = min(len(answer.split()) / 20, 1.0)  # Normalize to max 20 words

        # Combine factors
        confidence = (overlap_ratio * 0.4 + question_coverage * 0.4 + length_score * 0.2)

        return min(confidence, 0.95)  # Cap at 95%

    def _assess_context_quality(self, context: str, question: str) -> float:
        """Assess the quality of context for answering the question"""
        if not context:
            return 0.0

        question_words = set(question.lower().split())
        context_words = set(context.lower().split())

        # Calculate coverage and relevance
        coverage = len(question_words.intersection(context_words)) / len(question_words) if question_words else 0

        # Context length factor
        length_factor = min(len(context.split()) / 100, 1.0)

        return min(coverage * 0.7 + length_factor * 0.3, 1.0)
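
A worked example makes the confidence heuristic concrete. The function below is a standalone sketch that mirrors the three factors and the 0.4 / 0.4 / 0.2 weights of `_calculate_confidence`; the name `confidence_score` and the sample strings are illustrative only.

```python
def confidence_score(answer: str, context: str, question: str) -> float:
    """Word-overlap confidence: 40% grounding, 40% question coverage, 20% length."""
    if not answer or len(answer) < 20:
        return 0.1  # very short answers get a fixed low score

    answer_words = set(answer.lower().split())
    context_words = set(context.lower().split())
    question_words = set(question.lower().split())

    # Share of answer words that also occur in the context
    overlap_ratio = len(answer_words & context_words) / len(answer_words)
    # Share of question words that the answer repeats
    question_coverage = len(answer_words & question_words) / len(question_words) if question_words else 0.0
    # Answers of 20 words or more get the full length score
    length_score = min(len(answer.split()) / 20, 1.0)

    return min(0.4 * overlap_ratio + 0.4 * question_coverage + 0.2 * length_score, 0.95)


context = "attention lets the model weigh all input tokens when encoding each token"
question = "what is attention"
answer = "attention lets the model weigh all input tokens"

score = confidence_score(answer, context, question)
print(round(score, 3))  # 0.613
```

Here every answer word appears in the context (1.0), one of three question words is repeated (0.333), and the answer has 8 of the 20 words needed for a full length score (0.4), giving 0.4 + 0.133 + 0.08 ≈ 0.613. Because the words come from a plain whitespace split, a question written as "What is attention?" would yield the token `attention?`, which no longer matches `attention` in the answer, so stripping punctuation first may be worth considering.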

In [31]:
class RAGSystem:
    """Complete Retrieval-Augmented Generation system"""

    def __init__(self):
        self.doc_processor = DocumentProcessor()
        self.vector_manager = VectorStoreManager()
        self.answer_generator = AnswerGenerator()
        self.is_ready = False
        self.documents_info = {}

    def setup(self, pdf_paths: List[str]) -> bool:
        """Complete system setup"""
        print("🚀 Setting up RAG system...")
        print("="*50)

        try:
            # Step 1: Load documents
            print("Step 1: Loading PDFs...")
            documents = self.doc_processor.load_pdfs(pdf_paths)

            if not documents:
                print("❌ No documents loaded!")
                return False

            # Step 2: Create chunks
            print("\nStep 2: Creating text chunks...")
            chunks = self.doc_processor.create_chunks(documents)

            # Step 3: Create vector store
            print("\nStep 3: Creating vector embeddings...")
            if not self.vector_manager.create_vector_store(chunks):
                return False

            # Step 4: Store document information
            self.documents_info = {
                'total_documents': len(documents),
                'total_chunks': len(chunks),
                'pdf_files': [os.path.basename(path) for path in pdf_paths],
                'setup_complete': True
            }

            self.is_ready = True
            print("\n" + "="*50)
            print("🎉 RAG System Setup Complete!")
            print(f"📚 Loaded: {len(documents)} pages from {len(pdf_paths)} PDFs")
            print(f"🔪 Created: {len(chunks)} text chunks")
            print(f"⚡ Vector store ready with {self.vector_manager.vector_store.index.ntotal} embeddings")
            print("="*50)

            return True

        except Exception as e:
            print(f"❌ Setup failed: {str(e)}")
            return False

    def ask_question(self, question: str, num_sources: int = 3) -> Dict[str, Any]:
        """Ask a question and get an answer with sources"""

        if not self.is_ready:
            return {
                'answer': "❌ System not ready. Please upload and process documents first.",
                'sources': [],
                'confidence': 0.0
            }

        if not question.strip():
            return {
                'answer': "❌ Please provide a valid question.",
                'sources': [],
                'confidence': 0.0
            }

        try:
            print(f"🔍 Processing question: '{question}'")

            # Step 1: Retrieve relevant documents
            search_results = self.vector_manager.search_similar(question, k=num_sources)

            if not search_results:
                return {
                    'answer': "❌ No relevant information found in the documents.",
                    'sources': [],
                    'confidence': 0.0
                }

            # Step 2: Prepare context and source information
            context_parts = []
            sources = []
            total_score = 0.0

            for doc, score in search_results:
                context_parts.append(doc.page_content)
                # Convert distance to similarity. float() yields a plain Python
                # float in case the score is a NumPy scalar; the clamp keeps a
                # distance above 1 from producing a negative similarity.
                similarity = max(0.0, 1.0 - float(score))
                total_score += similarity

                source_info = {
                    'file': doc.metadata.get('source_file', 'Unknown'),
                    'page': doc.metadata.get('page_number', 'Unknown'),
                    'chunk_id': doc.metadata.get('chunk_id', 'Unknown'),
                    'similarity_score': round(similarity, 3),
                    'content_preview': doc.page_content[:150] + "..." if len(doc.page_content) > 150 else doc.page_content
                }
                sources.append(source_info)

            # Step 3: Combine context
            combined_context = "\n\n".join(context_parts)

            # Step 4: Generate answer
            print("🤖 Generating answer...")
            answer = self.answer_generator.generate_answer(question, combined_context)

            # Step 5: Calculate confidence score (mean similarity, capped at 1.0)
            confidence = min(total_score / len(search_results), 1.0)

            result = {
                'answer': answer,
                'sources': sources,
                'confidence': round(confidence, 3),
                'context_length': len(combined_context),
                'num_sources_used': len(sources)
            }

            print(f"✅ Answer generated (confidence: {result['confidence']})")
            return result

        except Exception as e:
            print(f"❌ Error processing question: {str(e)}")
            return {
                'answer': f"❌ Error processing your question: {str(e)}",
                'sources': [],
                'confidence': 0.0
            }

    def get_system_status(self) -> Dict[str, Any]:
        """Get current system status and statistics"""
        base_status = {
            'is_ready': self.is_ready,
            'documents_info': self.documents_info,
            'vector_store_stats': self.vector_manager.get_stats()
        }

        if self.is_ready:
            base_status['model_info'] = {
                'embedding_model': self.vector_manager.model_name,
                'generation_model': self.answer_generator.model_name,
                'gpu_available': torch.cuda.is_available()
            }

        return base_status

# Initialize the complete RAG system
print("\n🧪 Initializing Complete RAG System...")
rag_system = RAGSystem()
print("✅ RAG system ready for setup!")


🧪 Initializing Complete RAG System...
🤖 Loading language model...


Device set to use cpu


✅ Language model loaded!
✅ RAG system ready for setup!
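
The `ask_question` method above turns each retrieval score into a similarity with `1 - score` and reports the mean as a confidence. The following is a minimal sketch of that arithmetic in isolation, assuming the scores are distances where smaller means closer. The helper names `distance_to_similarity` and `confidence_from_distances` are hypothetical and do not exist in the notebook.

```python
from typing import List


def distance_to_similarity(distance: float) -> float:
    """Map a distance to a similarity via 1 - distance, clamped to [0, 1]."""
    # float() also accepts NumPy scalars and returns a plain Python float.
    return min(max(1.0 - float(distance), 0.0), 1.0)


def confidence_from_distances(distances: List[float]) -> float:
    """Mean similarity over all retrieved chunks, rounded to 3 decimals."""
    if not distances:
        return 0.0
    similarities = [distance_to_similarity(d) for d in distances]
    return round(sum(similarities) / len(similarities), 3)


# Hypothetical distances, chosen only to exercise the function.
print(confidence_from_distances([0.8, 0.85, 0.9]))
print(confidence_from_distances([1.4, 0.9]))
```

Whether `1 - distance` is a meaningful similarity depends on the metric used by the vector store. For distances that can exceed 1, the clamp only prevents negative values; it does not make the result a calibrated probability.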


In [32]:
import pandas as pd
from google.colab import files
import shutil

def upload_pdf_files():
    """Helper function to upload PDF files in Colab"""
    print("📤 Please upload your PDF files...")
    uploaded = files.upload()

    pdf_paths = []
    for filename, data in uploaded.items():
        if filename.lower().endswith('.pdf'):  # also accept .PDF
            # Save file to current directory
            with open(filename, 'wb') as f:
                f.write(data)
            pdf_paths.append(filename)
            print(f"✅ Saved: {filename}")
        else:
            print(f"⚠️ Skipped non-PDF file: {filename}")

    return pdf_paths

def setup_with_sample_data():
    """Setup system with sample data for demonstration"""
    print("🎯 Setting up with sample data...")
    print("Note: In real usage, upload your own PDF files using upload_pdf_files()")

    # Create a sample document for demonstration
    sample_content = """
    This is a sample AI research paper about Natural Language Processing.

    Abstract: This paper presents a novel approach to question answering systems
    using retrieval-augmented generation. Our method combines semantic search
    with large language models to provide accurate and contextual answers.

    Introduction: Question answering has been a fundamental challenge in NLP.
    Recent advances in transformer models have shown promising results.

    Methodology: We propose a hybrid approach that uses vector embeddings
    for document retrieval and generative models for answer synthesis.

    Results: Our system achieves 85% accuracy on benchmark datasets,
    outperforming traditional keyword-based approaches by 20%.

    Conclusion: The combination of retrieval and generation provides
    superior performance for domain-specific question answering tasks.
    """

    # Create sample PDF content (simplified)
    from langchain.schema import Document

    sample_doc = Document(
        page_content=sample_content,
        metadata={
            'source_file': 'sample_paper.pdf',
            'page_number': 1,
            'total_pages': 1
        }
    )

    # Process sample document
    chunks = rag_system.doc_processor.create_chunks([sample_doc])

    # Create vector store
    if rag_system.vector_manager.create_vector_store(chunks):
        rag_system.is_ready = True
        rag_system.documents_info = {
            'total_documents': 1,
            'total_chunks': len(chunks),
            'pdf_files': ['sample_paper.pdf'],
            'setup_complete': True
        }
        print("✅ Sample system ready for testing!")
        return True

    return False
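

`setup_with_sample_data` passes the sample document to `create_chunks`, whose implementation is defined earlier in the notebook. As a rough illustration of what splitting text into overlapping chunks means, here is a minimal sketch using fixed-size character windows. It is not the splitter the notebook uses; the function name `chunk_text` and the sizes are assumptions chosen for the example.

```python
from typing import List


def chunk_text(text: str, chunk_size: int = 200, overlap: int = 40) -> List[str]:
    """Split text into fixed-size character windows that overlap."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be in [0, chunk_size)")

    step = chunk_size - overlap  # how far each window advances
    chunks = []
    for start in range(0, len(text), step):
        piece = text[start:start + chunk_size]
        if piece.strip():  # skip whitespace-only windows
            chunks.append(piece)
        if start + chunk_size >= len(text):
            break  # the current window already reached the end
    return chunks


sample = "Question answering has been a fundamental challenge in NLP."
for i, chunk in enumerate(chunk_text(sample, chunk_size=25, overlap=5)):
    print(i, repr(chunk))
```

The overlap means a sentence cut at a window boundary still appears intact in at least one neighbouring chunk, at the cost of storing some text twice.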



In [33]:
def create_gradio_interface():
    """Create an interactive web interface"""

    def handle_file_upload(files):
        if not files:
            return "❌ Please upload at least one PDF file."

        try:
            pdf_paths = []
            for file in files:
                if file.name.lower().endswith('.pdf'):  # also accept .PDF
                    pdf_paths.append(file.name)

            if not pdf_paths:
                return "❌ No valid PDF files found."

            # Setup the RAG system
            success = rag_system.setup(pdf_paths)

            if success:
                status = rag_system.get_system_status()
                return f"""✅ Setup Complete!

📊 System Status:
• Documents loaded: {status['documents_info']['total_documents']}
• Text chunks created: {status['documents_info']['total_chunks']}
• Files processed: {', '.join(status['documents_info']['pdf_files'])}
• Vector embeddings: {status['vector_store_stats']['total_vectors']}

🎯 Ready to answer questions!"""
            else:
                return "❌ Setup failed. Please check your PDF files and try again."

        except Exception as e:
            return f"❌ Error during setup: {str(e)}"

    def handle_question(question):
        if not question.strip():
            return "❌ Please enter a question.", ""

        result = rag_system.ask_question(question)

        # Format answer as plain text: both outputs are gr.Textbox components,
        # which show Markdown markers such as ** literally
        answer_text = f"Answer: {result['answer']}\n\n"
        answer_text += f"Confidence: {result['confidence']}/1.0\n\n"

        # Format sources
        sources_text = "Sources:\n\n"
        for i, source in enumerate(result['sources'], 1):
            sources_text += f"{i}. {source['file']} (Page {source['page']})\n"
            sources_text += f"   • Similarity: {source['similarity_score']}\n"
            sources_text += f"   • Preview: {source['content_preview']}\n\n"

        if not result['sources']:
            sources_text = "No sources found."

        return answer_text, sources_text

    def show_system_info():
        status = rag_system.get_system_status()

        if not status['is_ready']:
            return "System not ready. Please upload documents first."

        # Plain-text labels: the output is a gr.Textbox, which does not render Markdown
        info_text = f"""System Information:

Status: {'✅ Ready' if status['is_ready'] else '❌ Not Ready'}

Documents:
• Total documents: {status['documents_info']['total_documents']}
• Total chunks: {status['documents_info']['total_chunks']}
• Files: {', '.join(status['documents_info']['pdf_files'])}

Vector Store:
• Total vectors: {status['vector_store_stats']['total_vectors']}
• Vector dimension: {status['vector_store_stats']['vector_dimension']}
• Embedding model: {status['vector_store_stats']['model_name']}

Models:
• Generation model: {status.get('model_info', {}).get('generation_model', 'N/A')}
• GPU available: {status.get('model_info', {}).get('gpu_available', False)}
"""
        return info_text

    # Create Gradio interface
    with gr.Blocks(
        title="RAG System - AI Research Papers QA",
        theme=gr.themes.Soft()
    ) as interface:

        gr.Markdown("""
        # 🤖 RAG System: AI Research Papers Q&A

        **Upload your AI research papers and ask questions about them!**

        This system uses Retrieval-Augmented Generation to:
        - 📖 Process and understand your research papers
        - 🔍 Find relevant information for your questions
        - 🤖 Generate accurate answers with source citations
        """)

        with gr.Tab("📤 Upload & Setup"):
            gr.Markdown("### Step 1: Upload Your PDF Research Papers")

            file_upload = gr.File(
                label="Select PDF Files",
                file_count="multiple",
                file_types=[".pdf"],
                height=100
            )

            setup_btn = gr.Button("🚀 Process Documents", variant="primary", size="lg")
            setup_output = gr.Textbox(
                label="Setup Status",
                lines=10,
                interactive=False,
                placeholder="Upload PDFs and click 'Process Documents' to begin..."
            )

            # Sample data button for demo
            gr.Markdown("---")
            gr.Markdown("### Or Try with Sample Data")
            sample_btn = gr.Button("🎯 Use Sample Data (Demo)", variant="secondary")

            def setup_sample():
                success = setup_with_sample_data()
                if success:
                    status = rag_system.get_system_status()
                    return f"""✅ Sample System Ready!

This is a demo with sample AI research paper content.
You can now ask questions like:
• "What is the main contribution of this paper?"
• "What methodology was used?"
• "What were the results?"

📊 System loaded with {status['documents_info']['total_chunks']} text chunks."""
                else:
                    return "❌ Failed to setup sample data."

            sample_btn.click(setup_sample, outputs=[setup_output])
            setup_btn.click(handle_file_upload, inputs=[file_upload], outputs=[setup_output])

        with gr.Tab("❓ Ask Questions"):
            gr.Markdown("### Ask Questions About Your Research Papers")

            with gr.Row():
                with gr.Column(scale=4):
                    question_input = gr.Textbox(
                        label="Your Question",
                        placeholder="e.g., What are the main contributions of this research?",
                        lines=3
                    )
                with gr.Column(scale=1):
                    ask_btn = gr.Button("🔍 Get Answer", variant="primary", size="lg")

            with gr.Row():
                with gr.Column():
                    answer_output = gr.Textbox(
                        label="Answer",
                        lines=8,
                        interactive=False
                    )
                with gr.Column():
                    sources_output = gr.Textbox(
                        label="Sources & Citations",
                        lines=8,
                        interactive=False
                    )

            # Sample questions
            gr.Markdown("### 💡 Sample Questions to Try:")
            sample_questions = [
                "What are the main contributions of this research?",
                "What methodology was used in this study?",
                "What are the key findings and results?",
                "What are the limitations mentioned?",
                "What future work is suggested?"
            ]

            for q in sample_questions:
                sample_q_btn = gr.Button(f"📝 {q}", variant="secondary", size="sm")
                sample_q_btn.click(lambda x=q: x, outputs=[question_input])

            ask_btn.click(
                handle_question,
                inputs=[question_input],
                outputs=[answer_output, sources_output]
            )

        with gr.Tab("ℹ️ System Info"):
            gr.Markdown("### System Status and Information")

            info_btn = gr.Button("🔄 Refresh System Info", variant="primary")
            info_output = gr.Textbox(
                label="System Information",
                lines=15,
                interactive=False
            )

            info_btn.click(show_system_info, outputs=[info_output])

            # Populate the info box once when the page first loads
            # (the load event fires on page load, not on tab selection)
            interface.load(show_system_info, outputs=[info_output])

    return interface

# Create the interface
print("\n🎨 Creating Gradio Interface...")
interface = create_gradio_interface()
print("✅ Interface ready!")


🎨 Creating Gradio Interface...
✅ Interface ready!
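
The sample-question buttons in the cell above are wired with `lambda x=q: x` rather than `lambda: q`. The sketch below shows why the default argument matters when callbacks are created in a loop: a plain closure looks up `q` when it is called, so every callback would see the last question. The two-item `questions` list is illustrative only.

```python
questions = ["What methodology was used?", "What were the results?"]

# Late binding: each lambda reads q when it is called, after the loop ended.
late_bound = [lambda: q for q in questions]

# Early binding: the default argument captures the current q at definition time.
early_bound = [lambda x=q: x for q in questions]

print([f() for f in late_bound])   # every callback returns the last question
print([f() for f in early_bound])  # each callback returns its own question
```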


In [27]:
!pip install pypdf  # pip treats package names case-insensitively, so listing "PyPDF" as well was redundant

def run_system():
    """Launch the complete system"""
    print("🚀 Launching RAG System...")
    print("="*60)
    print("📋 Instructions:")
    print("1. Run this cell to launch the web interface")
    print("2. Upload your PDF research papers in the 'Upload & Setup' tab")
    print("3. Wait for processing to complete")
    print("4. Go to 'Ask Questions' tab and start asking!")
    print("="*60)

    # Launch interface
    interface.launch(
        share=True,  # Creates a public shareable link
        debug=True,
        height=800
    )

# Quick test function
def quick_test():
    """Quick test with sample data"""
    print("🧪 Quick Test Mode")
    print("-" * 30)

    # Setup with sample data
    if setup_with_sample_data():
        print("\n✅ Sample system ready!")

        # Test questions
        test_questions = [
            "What is this paper about?",
            "What methodology was used?",
            "What were the main results?"
        ]

        for question in test_questions:
            print(f"\n❓ Question: {question}")
            result = rag_system.ask_question(question, num_sources=2)
            print(f"✅ Answer: {result['answer'][:200]}...")
            print(f"📚 Sources: {len(result['sources'])} found")
            print(f"🎯 Confidence: {result['confidence']}")
    else:
        print("❌ Test setup failed")



In [None]:
run_system()

🚀 Launching RAG System...
📋 Instructions:
1. Run this cell to launch the web interface
2. Upload your PDF research papers in the 'Upload & Setup' tab
3. Wait for processing to complete
4. Go to 'Ask Questions' tab and start asking!
Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
* Running on public URL: https://3897e081044a71abde.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


🚀 Setting up RAG system...
Step 1: Loading PDFs...
📖 Loading: 1706.03762v7.pdf
✅ Loaded 15 pages from 1706.03762v7.pdf
📖 Loading: 2005.11401v4.pdf
✅ Loaded 19 pages from 2005.11401v4.pdf
📖 Loading: 2005.14165v4.pdf
✅ Loaded 75 pages from 2005.14165v4.pdf

Step 2: Creating text chunks...
🔪 Creating text chunks...
✅ Created 461 chunks

Step 3: Creating vector embeddings...
⚡ Creating vector embeddings...
📊 Processing 461 chunks...
✅ Vector store created successfully!
📈 Vector dimension: 384
📚 Total vectors: 461

🎉 RAG System Setup Complete!
📚 Loaded: 109 pages from 3 PDFs
🔪 Created: 461 text chunks
⚡ Vector store ready with 461 embeddings


Truncation was not explicitly activated but `max_length` is provided a specific value, please use `truncation=True` to explicitly truncate examples to max length. Defaulting to 'longest_first' truncation strategy. If you encode pairs of sequences (GLUE-style) with the tokenizer you can select this strategy more precisely by providing a specific strategy to `truncation`.


🔍 Processing question: 'What are the main components of a RAG model, and how do they interact? '
🔍 Found 3 relevant chunks for query
🤖 Generating answer...
✅ Answer generated (confidence: 0.14800000190734863)
🔍 Processing question: 'What are the two sub-layers in each encoder layer of the Transformer model?'
🔍 Found 3 relevant chunks for query
🤖 Generating answer...
✅ Answer generated (confidence: 0.16200000047683716)
🔍 Processing question: 'Explain how positional encoding is implemented in Transformers and why it is necessary. Describe the concept of multi-head attention in the Transformer architecture. Why is it beneficial?'
🔍 Found 3 relevant chunks for query
🤖 Generating answer...
✅ Answer generated (confidence: 0.2750000059604645)
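
The confidence values in the log above, such as `0.14800000190734863`, carry long trailing digits even though the code rounds to three decimals. One plausible explanation (an assumption, since the retrieval code is not shown in this section) is that the scores are 32-bit floats that get widened to 64-bit when printed. The sketch below reproduces the effect with the standard library only; `as_float32` is a hypothetical helper.

```python
import struct


def as_float32(value: float) -> float:
    """Round-trip a value through IEEE 754 single precision."""
    return struct.unpack("f", struct.pack("f", value))[0]


widened = as_float32(0.148)  # nearest float32 to 0.148, held as a Python float
print(widened)               # shows trailing digits beyond the third decimal
print(round(widened, 3))     # rounding a Python float gives a clean value
```

Converting a score with `float()` before rounding, so that `round` operates on a Python float, is one way to keep such digits out of the output.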
