In [1]:
# Install required packages
!pip install -q langchain langchain_google_genai langchain_community unstructured pdf2image pytesseract pdfminer.six python-docx chromadb sentence-transformers google-generativeai ipywidgets faiss-cpu

# Fix deprecation warning for HuggingFaceEmbeddings
!pip install -q langchain-huggingface

[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/981.5 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m972.8/981.5 kB[0m [31m33.9 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m981.5/981.5 kB[0m [31m21.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m67.3/67.3 kB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m42.0/42.0 kB[0m [31m3.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.5/2.5 MB[0m [31m87.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [2]:
import os
import tempfile
import shutil
import re
import glob
from typing import List, Dict, Any, Tuple
from pathlib import Path
import numpy as np
import uuid
import ipywidgets as widgets
from IPython.display import display, HTML, clear_output
import warnings
warnings.filterwarnings('ignore')

# Access Colab secrets
from google.colab import userdata

# For document processing
from langchain_community.document_loaders import (
    PyPDFLoader,
    TextLoader,
    Docx2txtLoader,
    UnstructuredFileLoader
)
from langchain.text_splitter import RecursiveCharacterTextSplitter

# For embeddings and vector DB
import chromadb
from langchain_community.vectorstores import Chroma, FAISS
from langchain_huggingface import HuggingFaceEmbeddings

# For generative AI model
import google.generativeai as genai
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationalRetrievalChain

In [3]:
# Mount Google Drive
# Config.DOC_DIR and Config.DB_DIR both live under /content/drive/MyDrive, so the
# mount has to succeed before documents can be read or the vector DB persisted.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [5]:
# ===== CONFIGURATION =====

class Config:
    """
    Application-wide settings, kept as plain class attributes.

    Other parts of the notebook read these values directly and some of them
    (the document directory and the two credentials) are overwritten at runtime.
    """

    # --- Storage locations ---
    # Folder scanned for source documents.
    DOC_DIR = "/content/drive/MyDrive/RAGDocuments"
    # Folder in which the vector database is persisted.
    DB_DIR = "/content/drive/MyDrive/RAGVectorDB"

    # --- Chunking (forwarded to the text splitter) ---
    CHUNK_SIZE, CHUNK_OVERLAP = 1000, 200

    # --- Embeddings: Hugging Face model name used to embed chunks ---
    EMBEDDING_MODEL = "sentence-transformers/all-MiniLM-L6-v2"

    # --- Retrieval: how many chunks are fetched as context per question ---
    TOP_K_RETRIEVAL = 4

    # --- Credentials: never hard-coded; populated during key setup ---
    HF_TOKEN = GENAI_API_KEY = None

# ===== SECURE KEY MANAGEMENT =====

def _lookup_secret(colab_name: str, *env_names: str) -> Tuple[str, str]:
    """
    Find a secret in Colab's secret store, then in environment variables.

    Args:
        colab_name: Name passed to ``userdata.get``.
        env_names: Environment variable names tried, in order, as a fallback.

    Returns:
        ``(value, origin)`` where ``origin`` is ``"Colab secrets"`` or
        ``"environment"``; ``("", "")`` when no non-empty value was found.
    """
    try:
        value = userdata.get(colab_name)
    except Exception:
        # Any failure to read the Colab store is treated as "not there" and we
        # fall through to the environment. `Exception` (not a bare `except`)
        # so that KeyboardInterrupt / SystemExit are not swallowed.
        value = None
    if value:
        return value, "Colab secrets"

    for env_name in env_names:
        value = os.environ.get(env_name, '')
        if value:
            return value, "environment"
    return "", ""


def setup_api_keys() -> None:
    """
    Securely set up API keys from Colab secrets or environment variables.

    For each service the Colab secret is tried first and environment variables
    second. A key that is found is stored on ``Config`` and handed to the
    corresponding client library; an empty value counts as "not found".
    Problems are reported with a printed warning instead of being raised, so
    the caller can fall back to prompting the user.
    """

    # For Hugging Face
    try:
        import huggingface_hub

        hf_token, origin = _lookup_secret(
            'HUGGINGFACEHUB_API_TOKEN',
            'HUGGINGFACEHUB_API_TOKEN', 'HF_TOKEN'
        )
        if hf_token:
            huggingface_hub.HfFolder.save_token(hf_token)
            Config.HF_TOKEN = hf_token
            print(f"✅ Hugging Face token configured from {origin}")
        else:
            print("⚠️ Hugging Face token not found")
    except Exception as e:
        print(f"⚠️ Hugging Face token not configured: {e}")

    # For Google Genai
    try:
        gemini_key, origin = _lookup_secret(
            'GEMINI_API_KEY',
            'GEMINI_API_KEY', 'GENAI_API_KEY'
        )
        if gemini_key:
            Config.GENAI_API_KEY = gemini_key
            genai.configure(api_key=gemini_key)
            print(f"✅ Gemini API key configured from {origin}")
        else:
            print("⚠️ Gemini API key not set")
    except Exception as e:
        print(f"⚠️ Error configuring Gemini API: {e}")

# Prompt user to input API keys if not found in environment
def request_api_keys() -> None:
    """
    Prompt the user for any API key that is still missing on ``Config``.

    Keys are read with ``getpass`` so they are not echoed into the cell output
    (and therefore not saved inside the notebook file). Surrounding whitespace
    from copy/paste is stripped. An empty answer leaves the key unset.
    """
    import getpass

    print("\n==== Secure API Key Configuration ====")
    print("API keys will be stored in environment variables for this session only.")

    if not Config.HF_TOKEN:
        hf_token = getpass.getpass("Enter your Hugging Face API token (or press Enter to skip): ").strip()
        if hf_token:
            os.environ['HUGGINGFACEHUB_API_TOKEN'] = hf_token
            Config.HF_TOKEN = hf_token
            import huggingface_hub
            # NOTE(review): save_token presumably also writes the token to the
            # Hugging Face token file on disk — confirm this is acceptable
            # given the "this session only" message above.
            huggingface_hub.HfFolder.save_token(hf_token)
            print("✅ Hugging Face token set")

    if not Config.GENAI_API_KEY:
        genai_key = getpass.getpass("Enter your Google Generative AI API key: ").strip()
        if genai_key:
            os.environ['GEMINI_API_KEY'] = genai_key
            Config.GENAI_API_KEY = genai_key
            genai.configure(api_key=genai_key)
            print("✅ Gemini API key set")

# ===== DOCUMENT PROCESSING MODULE =====

class DocumentProcessor:
    """Handle document ingestion, parsing, and chunking."""

    # File extensions (lower-case, no dot) that load_documents() picks up.
    SUPPORTED_EXTENSIONS = ('pdf', 'txt', 'docx', 'doc')

    @staticmethod
    def _extension_of(file_path: str) -> str:
        """Return the lower-cased extension of ``file_path`` without the dot ('' if none)."""
        return os.path.splitext(file_path)[1].lstrip('.').lower()

    @staticmethod
    def get_loader_for_file(file_path: str):
        """
        Return the appropriate loader based on file extension.

        Args:
            file_path: Path of the file to load.

        Returns:
            A loader instance, or None if the loader could not be constructed
            (the error is printed).
        """
        file_extension = DocumentProcessor._extension_of(file_path)

        try:
            if file_extension == 'pdf':
                return PyPDFLoader(file_path)
            if file_extension == 'txt':
                return TextLoader(file_path)
            if file_extension == 'docx':
                return Docx2txtLoader(file_path)
            # Everything else goes to the generic loader. That includes legacy
            # binary .doc files: they are not zip-based like .docx, so sending
            # them to Docx2txtLoader would fail on every such file.
            return UnstructuredFileLoader(file_path)
        except Exception as e:
            print(f"Error creating loader for {file_path}: {e}")
            return None

    @staticmethod
    def _find_supported_files(directory: str) -> List[str]:
        """
        Recursively list files under ``directory`` with a supported extension.

        Matching is case-insensitive (``REPORT.PDF`` counts), only regular files
        are returned, and the result is sorted so processing order is stable.
        """
        # glob.escape keeps characters such as '[' in the directory name from
        # being interpreted as glob wildcards.
        pattern = os.path.join(glob.escape(directory), "**", "*")
        return sorted(
            path
            for path in glob.glob(pattern, recursive=True)
            if os.path.isfile(path)
            and DocumentProcessor._extension_of(path) in DocumentProcessor.SUPPORTED_EXTENSIONS
        )

    @staticmethod
    def load_documents(directory: str) -> Tuple[List[Any], List[str]]:
        """
        Load all supported documents from a directory.

        If the directory does not exist it is created and an empty result is
        returned. A file that cannot be loaded is recorded in the failed list
        and does not stop the remaining files from being processed.

        Args:
            directory: Path to directory containing documents

        Returns:
            Tuple of (loaded documents, list of failed files)
        """
        documents = []
        failed_files = []

        # Check if directory exists
        if not os.path.exists(directory):
            os.makedirs(directory, exist_ok=True)
            print(f"Created document directory: {directory}")
            return documents, failed_files

        all_files = DocumentProcessor._find_supported_files(directory)

        if not all_files:
            print(f"No supported documents found in {directory}")
            return documents, failed_files

        print(f"Found {len(all_files)} documents to process")

        # Process each file
        for file_path in all_files:
            try:
                print(f"Processing: {os.path.basename(file_path)}")
                loader = DocumentProcessor.get_loader_for_file(file_path)

                if loader:
                    file_docs = loader.load()
                    # Add source metadata if not present
                    for doc in file_docs:
                        if 'source' not in doc.metadata:
                            doc.metadata['source'] = file_path
                    documents.extend(file_docs)
                    print(f"  ✓ Loaded {len(file_docs)} pages/sections")
                else:
                    failed_files.append(file_path)

            except Exception as e:
                print(f"  ✗ Failed to load {file_path}: {e}")
                failed_files.append(file_path)

        return documents, failed_files

    @staticmethod
    def chunk_documents(documents: List[Any], chunk_size: int = 1000, chunk_overlap: int = 200) -> List[Any]:
        """
        Split documents into chunks for processing.

        Args:
            documents: List of LangChain document objects
            chunk_size: Maximum size of each chunk (measured with ``len``)
            chunk_overlap: Overlap between chunks

        Returns:
            List of document chunks (empty if ``documents`` is empty)
        """
        if not documents:
            return []

        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len,
            separators=["\n\n", "\n", " ", ""]
        )

        chunks = text_splitter.split_documents(documents)
        print(f"Split {len(documents)} documents into {len(chunks)} chunks")

        return chunks

# ===== VECTOR DATABASE MODULE =====

class VectorDBManager:
    """Manage vector database operations including embedding and storage."""

    # Name under which the FAISS index is saved inside persist_dir.
    FAISS_INDEX_NAME = "faiss_index"

    def __init__(self, embedding_model_name: str = Config.EMBEDDING_MODEL, persist_dir: str = Config.DB_DIR):
        """
        Initialize the vector database manager.

        Args:
            embedding_model_name: Name of the HuggingFace embedding model
            persist_dir: Directory to persist the vector database
        """
        self.persist_dir = persist_dir

        # Create embeddings model
        self.embeddings = HuggingFaceEmbeddings(
            model_name=embedding_model_name,
            cache_folder="/tmp/hf_cache"
        )

        # Ensure persistence directory exists
        os.makedirs(persist_dir, exist_ok=True)

    def _has_persisted_db(self, db_type: str) -> bool:
        """Return True if ``persist_dir`` already holds a database of ``db_type``."""

        def present(name: str) -> bool:
            return os.path.exists(os.path.join(self.persist_dir, name))

        kind = db_type.lower()
        if kind == "chroma":
            # Chroma 0.4+ persists to `chroma.sqlite3`; `index` is the layout of
            # older releases and is still accepted.
            return present("chroma.sqlite3") or present("index")
        if kind == "faiss":
            # save_local() writes `<index_name>.faiss` plus `<index_name>.pkl`,
            # and load_local() needs both.
            return (present(f"{self.FAISS_INDEX_NAME}.faiss")
                    and present(f"{self.FAISS_INDEX_NAME}.pkl"))
        return False

    def create_or_load_db(self, document_chunks: List[Any], db_type: str = "chroma") -> Any:
        """
        Create a new vector database or load existing one.

        An existing persisted database always takes precedence: when one is
        found it is loaded and ``document_chunks`` are NOT added to it. This
        avoids re-embedding and appending duplicate chunks on every run.
        Delete ``persist_dir`` to force a rebuild.

        Args:
            document_chunks: List of document chunks to embed
            db_type: Type of vector database ("chroma" or "faiss")

        Returns:
            Vector database instance

        Raises:
            ValueError: If ``db_type`` is unsupported, or if there are neither
                chunks to index nor an existing database to load.
        """
        kind = db_type.lower()
        if kind not in ("chroma", "faiss"):
            raise ValueError(f"Unsupported vector database type: {db_type}")

        db_exists = self._has_persisted_db(kind)

        if not document_chunks and not db_exists:
            raise ValueError("Cannot create database: No documents provided and no existing DB found")

        if db_exists and document_chunks:
            print("Note: existing database found; the newly loaded chunks were not added to it.")

        if kind == "chroma":
            if db_exists:
                print(f"Loading existing Chroma database from {self.persist_dir}")
                return Chroma(
                    persist_directory=self.persist_dir,
                    embedding_function=self.embeddings
                )

            print(f"Creating new Chroma database in {self.persist_dir}")
            db = Chroma.from_documents(
                documents=document_chunks,
                embedding=self.embeddings,
                persist_directory=self.persist_dir
            )
            # DB is auto-persisted since Chroma 0.4.x
            return db

        # kind == "faiss"
        if db_exists:
            index_file = os.path.join(self.persist_dir, f"{self.FAISS_INDEX_NAME}.faiss")
            print(f"Loading existing FAISS database from {index_file}")
            # SECURITY: the FAISS docstore is a pickle file and unpickling can
            # execute arbitrary code. The opt-in flag is passed because this
            # index was written by save_local() below; do not point persist_dir
            # at files from an untrusted source.
            return FAISS.load_local(
                folder_path=self.persist_dir,
                embeddings=self.embeddings,
                index_name=self.FAISS_INDEX_NAME,
                allow_dangerous_deserialization=True
            )

        print(f"Creating new FAISS database in {self.persist_dir}")
        db = FAISS.from_documents(
            documents=document_chunks,
            embedding=self.embeddings
        )
        db.save_local(self.persist_dir, index_name=self.FAISS_INDEX_NAME)
        return db

# ===== RAG AGENT MODULE =====

class RAGAgent:
    """Implement the RAG pipeline with retrieval and generation capabilities."""

    def __init__(self, vector_db: Any, api_key: str = None):
        """
        Initialize the RAG agent.

        Args:
            vector_db: Vector database for retrieval
            api_key: Google Generative AI API key
        """
        self.vector_db = vector_db

        # Configure the Gemini model
        if api_key:
            genai.configure(api_key=api_key)

        # Create the LLM using the approach confirmed to work
        self.llm = ChatGoogleGenerativeAI(
            model="gemini-1.5-flash",  # Use the free tier model that's working
            temperature=0.3,
            top_p=0.95,
            google_api_key=api_key,  # Explicitly pass the API key
            convert_system_message_to_human=True
        )

        # Set up the retriever
        self.retriever = vector_db.as_retriever(
            search_type="similarity",
            search_kwargs={"k": Config.TOP_K_RETRIEVAL}
        )

        # Create conversation memory
        self.memory = ConversationBufferMemory(
            memory_key="chat_history",
            return_messages=True,
            output_key="answer"  # Explicitly set the output key
        )

        # Built lazily on the first question and reused afterwards.
        self._chain = None

    def create_qa_chain(self) -> Any:
        """
        Create a conversational QA chain with RAG capabilities.

        The chain is wired to this agent's LLM, retriever and memory and is
        configured to return the retrieved source documents with the answer.

        Returns:
            Conversational retrieval chain
        """
        # Custom prompt template for RAG
        prompt_template = """
        You are a helpful assistant answering questions based on retrieved document content.

        CONTEXT INFORMATION:
        {context}

        CHAT HISTORY:
        {chat_history}

        QUESTION:
        {question}

        YOUR RESPONSE:
        Answer the question based ONLY on the provided context. If the context doesn't contain
        relevant information, say "I don't have enough information to answer this question."

        When answering, cite the source document names in your response.
        Follow the KISS principle (Keep It Simple, Stupid) and provide clear, concise answers.
        """

        PROMPT = PromptTemplate(
            template=prompt_template,
            input_variables=["context", "chat_history", "question"]
        )

        # Create the conversational chain with explicit output_key
        chain = ConversationalRetrievalChain.from_llm(
            llm=self.llm,
            retriever=self.retriever,
            memory=self.memory,
            combine_docs_chain_kwargs={"prompt": PROMPT},
            return_source_documents=True,
            output_key="answer",  # Explicitly set the output key
            verbose=False
        )

        return chain

    def _get_chain(self) -> Any:
        """Return the QA chain, building it once and caching it on the instance."""
        # getattr: tolerate instances on which __init__ did not set the cache.
        chain = getattr(self, "_chain", None)
        if chain is None:
            chain = self.create_qa_chain()
            self._chain = chain
        return chain

    def answer_question(self, question: str) -> Dict[str, Any]:
        """
        Answer a question using the RAG pipeline.

        Args:
            question: User's question

        Returns:
            Dict with "answer" and "sources" (file names of the retrieved
            documents, de-duplicated, in retrieval order). If the chain call
            fails, the dict carries a generic "answer", an "error" message and
            an empty "sources" list instead of raising.
        """
        chain = self._get_chain()

        try:
            # Use the chain to answer the question
            result = chain.invoke({"question": question})

            # Extract answer and sources
            answer = result.get("answer", "")

            # Extract source information
            sources = [
                os.path.basename(doc.metadata["source"])
                for doc in result.get("source_documents", [])
                if "source" in doc.metadata
            ]

            return {
                "answer": answer,
                # dict.fromkeys de-duplicates while keeping first-seen order,
                # so the displayed source list is stable between runs.
                "sources": list(dict.fromkeys(sources))
            }

        except Exception as e:
            error_msg = f"Error generating answer: {str(e)}"
            print(error_msg)
            return {
                "answer": "I encountered an error while trying to answer your question. Please try again.",
                "error": error_msg,
                "sources": []
            }

    def reset_memory(self) -> None:
        """Reset the conversation memory."""
        self.memory.clear()

# ===== USER INTERFACE MODULE =====

class RAGUI:
    """User interface for the RAG application."""

    def __init__(self):
        """Initialize the UI components and wire their callbacks."""
        # Pipeline objects; created when documents are loaded.
        self.rag_agent = None
        self.document_processor = None
        self.vector_db_manager = None
        self.vector_db = None

        # UI components
        self.doc_path_input = widgets.Text(
            value=Config.DOC_DIR,
            description='Document Path:',
            style={'description_width': 'initial'},
            layout=widgets.Layout(width='80%')
        )

        self.load_docs_button = widgets.Button(
            description='Load Documents',
            button_style='primary',
            layout=widgets.Layout(width='30%')
        )
        self.load_docs_button.on_click(self.load_documents_callback)

        self.status_output = widgets.Output(layout={'border': '1px solid #ddd'})

        # The Q&A controls start disabled and are enabled once loading succeeds.
        self.question_input = widgets.Text(
            value='',
            placeholder='Ask a question about your documents...',
            description='Question:',
            style={'description_width': 'initial'},
            layout=widgets.Layout(width='80%'),
            disabled=True
        )
        self.question_input.on_submit(self.answer_question_callback)

        self.ask_button = widgets.Button(
            description='Ask',
            button_style='success',
            layout=widgets.Layout(width='20%'),
            disabled=True
        )
        self.ask_button.on_click(self.answer_question_callback)

        self.reset_chat_button = widgets.Button(
            description='Reset Chat',
            button_style='warning',
            layout=widgets.Layout(width='20%'),
            disabled=True
        )
        self.reset_chat_button.on_click(self.reset_chat_callback)

        self.answer_output = widgets.Output(layout={'border': '1px solid #ddd'})

    def display_ui(self) -> None:
        """Configure API keys (prompting if needed) and display the complete UI."""
        # First check and request API keys
        setup_api_keys()
        if not Config.HF_TOKEN or not Config.GENAI_API_KEY:
            request_api_keys()

        # App title
        display(HTML("<h1>RAG-Powered Document Q&A System</h1>"))

        # Document loading section
        display(HTML("<h2>1. Document Processing</h2>"))
        display(widgets.HBox([self.doc_path_input, self.load_docs_button]))
        display(self.status_output)

        # Q&A section
        display(HTML("<h2>2. Ask Questions</h2>"))
        display(widgets.HBox([self.question_input, self.ask_button, self.reset_chat_button]))
        display(self.answer_output)

        # Display initial status
        with self.status_output:
            print("Status: Ready to load documents")
            print(f"Document directory: {Config.DOC_DIR}")
            print(f"Vector DB directory: {Config.DB_DIR}")
            # `chroma.sqlite3` is what Chroma 0.4+ writes into its persist
            # directory; `index` is the marker used by older releases.
            db_markers = ('chroma.sqlite3', 'index')
            if any(os.path.exists(os.path.join(Config.DB_DIR, marker)) for marker in db_markers):
                print("An existing vector database was found and will be used if no new documents are loaded.")

    def load_documents_callback(self, button) -> None:
        """
        Callback for document loading button.

        Loads and chunks the documents under the path in the text box, builds
        (or loads) the vector database, creates the RAG agent and enables the
        Q&A controls. All progress and errors are printed into the status area.
        """
        with self.status_output:
            clear_output()
            print(f"Loading documents from: {self.doc_path_input.value}")

            try:
                # Update document directory in config
                Config.DOC_DIR = self.doc_path_input.value

                # Initialize document processor
                self.document_processor = DocumentProcessor()

                # Load documents
                documents, failed_files = self.document_processor.load_documents(Config.DOC_DIR)
                failed_names = ', '.join([os.path.basename(f) for f in failed_files])

                if not documents:
                    print("⚠️ No documents were loaded successfully. Please check the document directory.")
                    if failed_files:
                        print(f"Failed files: {failed_names}")
                    return

                # Partial success: keep going, but tell the user which files
                # will be missing from the index.
                if failed_files:
                    print(f"⚠️ {len(failed_files)} file(s) could not be loaded: {failed_names}")

                # Chunk documents
                document_chunks = self.document_processor.chunk_documents(
                    documents,
                    chunk_size=Config.CHUNK_SIZE,
                    chunk_overlap=Config.CHUNK_OVERLAP
                )

                # Initialize vector database
                self.vector_db_manager = VectorDBManager(
                    embedding_model_name=Config.EMBEDDING_MODEL,
                    persist_dir=Config.DB_DIR
                )

                # Create or load vector database
                self.vector_db = self.vector_db_manager.create_or_load_db(document_chunks)

                # Fix persistence warning
                if hasattr(self.vector_db, 'persist'):
                    print("Note: Using Chroma DB which now auto-persists data")

                # Initialize RAG agent
                self.rag_agent = RAGAgent(self.vector_db, api_key=Config.GENAI_API_KEY)

                # Enable Q&A components
                self.question_input.disabled = False
                self.ask_button.disabled = False
                self.reset_chat_button.disabled = False

                print("✅ System ready for questions!")

            except Exception as e:
                print(f"❌ Error during document processing: {str(e)}")

    def answer_question_callback(self, widget) -> None:
        """
        Callback for question answering.

        Used for both the Ask button and Enter in the question box. Blank
        questions are ignored. The answer, its sources and any error details
        are printed into the answer area, then the question box is cleared.
        """
        question = self.question_input.value

        if not question.strip():
            return

        with self.answer_output:
            clear_output()
            print(f"Q: {question}")
            print("Thinking...")

            if self.rag_agent:
                try:
                    result = self.rag_agent.answer_question(question)

                    # Replace the "Thinking..." placeholder with the final answer.
                    clear_output()
                    print(f"Q: {question}")
                    print(f"\nA: {result['answer']}")

                    if result.get('sources'):
                        print(f"\nSources: {', '.join(result['sources'])}")

                    if result.get('error'):
                        print(f"\nError details: {result['error']}")
                except Exception as e:
                    clear_output()
                    print(f"Q: {question}")
                    print(f"\nA: I encountered an error while trying to answer your question.")
                    print(f"\nError details: {str(e)}")
            else:
                print("❌ System not initialized. Please load documents first.")

        # Clear the question input
        self.question_input.value = ''

    def reset_chat_callback(self, button) -> None:
        """Callback to reset the chat history (no-op until an agent exists)."""
        if self.rag_agent:
            self.rag_agent.reset_memory()

            with self.answer_output:
                clear_output()
                print("Chat history has been reset.")

# ===== MAIN APPLICATION =====

def main():
    """
    Start the application.

    Prints which of the known credential environment variables are present
    (names only, never their values), then builds the UI and shows it.
    """
    print("Starting RAG-Powered Document Q&A System...")

    # Debug aid: report presence of each variable without revealing its value.
    print("\nChecking for environment variables:")
    watched = ('HUGGINGFACEHUB_API_TOKEN', 'HF_TOKEN', 'GEMINI_API_KEY', 'GENAI_API_KEY')
    for name in watched:
        line = f"  ✓ {name} is set" if name in os.environ else f"  ✗ {name} is not set"
        print(line)

    # Build the widget UI and render it.
    app = RAGUI()
    app.display_ui()

# Run the application
main()

# ===== EXAMPLE USAGE =====

# Bound to a name on purpose: a bare string literal as the last expression of a
# cell is echoed by Jupyter as a long, escaped repr underneath the UI.
USAGE_NOTES = """
# Example Test Queries

Once you've loaded your documents, try these example queries:

1. Basic information retrieval:
   "What are the main topics covered in these documents?"

2. Specific information retrieval:
   "What does [specific document] say about [specific topic]?"

3. Comparative analysis:
   "Compare how different documents approach [topic]."

4. Follow-up questions:
   "Tell me more about [something mentioned in previous answer]."

# Customization Options

To customize this RAG application:

1. Change embedding model:
   - Modify Config.EMBEDDING_MODEL to use different HuggingFace models
   - Options include "sentence-transformers/all-mpnet-base-v2" (higher quality but slower)

2. Adjust chunking parameters:
   - Increase/decrease Config.CHUNK_SIZE based on document complexity
   - Adjust Config.CHUNK_OVERLAP to control context preservation

3. Tune retrieval parameters:
   - Change Config.TOP_K_RETRIEVAL to get more or fewer context chunks

4. Switch vector database:
   - Pass "faiss" instead of "chroma" to VectorDBManager.create_or_load_db()

5. Modify LLM parameters:
   - Adjust temperature, top_p in RAGAgent.__init__() for different generation styles
"""

Starting RAG-Powered Document Q&A System...

Checking for environment variables:
  ✗ HUGGINGFACEHUB_API_TOKEN is not set
  ✗ HF_TOKEN is not set
  ✗ GEMINI_API_KEY is not set
  ✗ GENAI_API_KEY is not set
✅ Hugging Face token configured from Colab secrets
✅ Gemini API key configured from Colab secrets


HBox(children=(Text(value='/content/drive/MyDrive/RAGDocuments', description='Document Path:', layout=Layout(w…

Output(layout=Layout(border='1px solid #ddd'))

HBox(children=(Text(value='', description='Question:', disabled=True, layout=Layout(width='80%'), placeholder=…

Output(layout=Layout(border='1px solid #ddd'))

'\n# Example Test Queries\n\nOnce you\'ve loaded your documents, try these example queries:\n\n1. Basic information retrieval:\n   "What are the main topics covered in these documents?"\n\n2. Specific information retrieval:\n   "What does [specific document] say about [specific topic]?"\n\n3. Comparative analysis:\n   "Compare how different documents approach [topic]."\n\n4. Follow-up questions:\n   "Tell me more about [something mentioned in previous answer]."\n\n# Customization Options\n\nTo customize this RAG application:\n\n1. Change embedding model:\n   - Modify Config.EMBEDDING_MODEL to use different HuggingFace models\n   - Options include "sentence-transformers/all-mpnet-base-v2" (higher quality but slower)\n\n2. Adjust chunking parameters:\n   - Increase/decrease Config.CHUNK_SIZE based on document complexity\n   - Adjust Config.CHUNK_OVERLAP to control context preservation\n\n3. Tune retrieval parameters:\n   - Change Config.TOP_K_RETRIEVAL to get more or fewer context chunks