# 🤖 ML Study Buddy RAG System App

A Retrieval-Augmented Generation (RAG) system for machine learning study materials.

**Features:**
- 📚 Knowledge ingestion from arXiv papers, PDFs, and web content
- 🔍 FAISS vector store for semantic search
- 💬 Text and voice-based Q&A using Groq LLM
- 🎤 Speech-to-Text and Text-to-Speech via HuggingFace
- 🌐 FastAPI REST API with CORS support

**APIs Used:**
- **Groq API** - For LLM (Llama 3.3 70B) - FREE tier available
- **HuggingFace** - For embeddings and voice features

---

## 1. Installation & Setup

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [24]:
# Define the path within your Google Drive
# NOTE: this cell uses `config` and `VectorStoreManager`, so run Sections 2-4 first.
DRIVE_PROJECT_PATH = "/content/drive/MyDrive/ML_RAG_KnowledgeBase"

# Point the existing config instance at the Google Drive path
config.knowledge_base_dir = f"{DRIVE_PROJECT_PATH}/knowledge_base"
config.faiss_index_path = f"{DRIVE_PROJECT_PATH}/faiss_index"

config.ensure_directories()
print(f"📁 Knowledge base will be stored at: {config.knowledge_base_dir}")
print(f"📁 FAISS index will be stored at: {config.faiss_index_path}")

# Re-initialize vector store manager with new paths
vector_store = VectorStoreManager(embedding_model=config.embedding_model, index_path=config.faiss_index_path)
print("✅ Vector store manager re-initialized with Google Drive paths")


📁 Knowledge base will be stored at: /content/drive/MyDrive/ML_RAG_KnowledgeBase/knowledge_base
📁 FAISS index will be stored at: /content/drive/MyDrive/ML_RAG_KnowledgeBase/faiss_index
✅ Vector store manager re-initialized with Google Drive paths


In [25]:
# Try to load existing index from Google Drive path
if not vector_store.load_index():
    print("⚠️ No existing index found in Google Drive. Proceed to build the knowledge base (Section 8) or upload PDFs (Section 8.1).")
    print("The knowledge base will be saved to your Google Drive automatically after building.")
else:
    print(f"📊 Index loaded from Google Drive with {vector_store.get_document_count()} documents")

# Re-initialize RAG chain after potentially loading from Drive
if vector_store.is_loaded and config.groq_api_key:
    retriever = vector_store.get_retriever({"k": config.top_k_results})
    rag_chain = RAGChain(
        llm_model=config.llm_model,
        retriever=retriever,
        groq_api_key=config.groq_api_key
    )
    print("✅ RAG chain initialized!")
else:
    if not vector_store.is_loaded:
        print("⚠️ Vector store not loaded. Build knowledge base first.")
    if not config.groq_api_key:
        print("⚠️ Groq API key not set. Please set it in Section 2.")


🔄 Loading embedding model...
✅ Embedding model loaded
✅ Loaded index with 33698 documents
📊 Index loaded from Google Drive with 33698 documents
✅ RAG chain initialized!
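
The load-or-build decision in the cell above depends on whether an `index.faiss` file exists in the index directory, which is the check `VectorStoreManager.load_index` performs in Section 4. Here is a minimal stdlib sketch of that check. `index_exists` is a hypothetical helper, and a temporary directory stands in for the Google Drive folder:

```python
import tempfile
from pathlib import Path


def index_exists(index_dir: str) -> bool:
    """Return True if a saved FAISS index file is present in index_dir."""
    return (Path(index_dir) / "index.faiss").exists()


# Use a throwaway directory instead of Google Drive for the demonstration.
with tempfile.TemporaryDirectory() as tmp:
    before = index_exists(tmp)                     # nothing saved yet
    (Path(tmp) / "index.faiss").write_bytes(b"")   # placeholder file, not a real index
    after = index_exists(tmp)

print(before, after)
```

A check like this only confirms that the file is present. Whether it deserializes correctly is still decided by the `try`/`except` inside `load_index`.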


In [None]:

# Install required packages - using latest compatible versions
!pip install -q --upgrade langchain langchain-community langchain-groq langchain-huggingface
!pip install -q --upgrade langchain-text-splitters langchain-core
!pip install -q faiss-cpu sentence-transformers
!pip install -q PyMuPDF arxiv beautifulsoup4 requests  # Changed: pypdf -> PyMuPDF
!pip install -q fastapi uvicorn python-multipart pyngrok nest-asyncio
!pip install -q python-dotenv pydantic
!pip install -q transformers torch torchaudio scipy datasets
!pip install -q gradio
!pip install -q Pillow  # For image processing

# Restart runtime after installation (run this cell, then Runtime -> Restart runtime)
print('\n⚠️ After running this cell, go to Runtime -> Restart runtime, then continue from cell 2')


In [4]:
# Import standard libraries
import os
import re
import logging
from pathlib import Path
from typing import List, Optional, Dict, Any
from dataclasses import dataclass, field
from datetime import datetime
from io import BytesIO

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

print("✅ Imports complete")

✅ Imports complete


## 2. Configuration

Enter your API keys below. Only the **Groq API key** is required (a free tier is available at https://console.groq.com); the HuggingFace key is optional.

In [None]:
# @title API Keys Configuration { display-mode: "form" }
GROQ_API_KEY = ""  # @param {type:"string"}
HUGGINGFACE_API_KEY = ""  # @param {type:"string"} (Optional)

# Set environment variables
os.environ["GROQ_API_KEY"] = GROQ_API_KEY
if HUGGINGFACE_API_KEY:
    os.environ["HUGGINGFACE_API_KEY"] = HUGGINGFACE_API_KEY

print("✅ API keys configured!" if GROQ_API_KEY else "⚠️ Please enter your Groq API key above")

✅ API keys configured!
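
Because later cells read the key back through `os.getenv("GROQ_API_KEY", "")`, a missing key only shows up when the RAG chain is built. One possible way to fail earlier is sketched below. `require_api_key` is a hypothetical helper, not part of this notebook, and the key value is a placeholder:

```python
import os


def require_api_key(name: str, env=None) -> str:
    """Return the named key from the environment, or raise a clear error."""
    env = os.environ if env is None else env
    value = env.get(name, "").strip()
    if not value:
        raise RuntimeError(f"{name} is not set; add it before building the RAG chain")
    return value


# An explicit mapping keeps the sketch away from real secrets.
fake_env = {"GROQ_API_KEY": "gsk_example"}
key = require_api_key("GROQ_API_KEY", env=fake_env)
```

Calling `require_api_key("GROQ_API_KEY")` with no `env` argument would read from `os.environ`, which is where the form cell above stores the key.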


In [6]:
@dataclass
class Config:
    """Central configuration for the ML RAG System."""
    groq_api_key: str = ""
    huggingface_api_key: str = ""
    llm_model: str = "llama-3.3-70b-versatile"
    embedding_model: str = "sentence-transformers/all-MiniLM-L6-v2"
    chunk_size: int = 1000
    chunk_overlap: int = 200
    top_k_results: int = 5
    knowledge_base_dir: str = "./knowledge_base"
    faiss_index_path: str = "./faiss_index"
    arxiv_categories: List[str] = field(default_factory=lambda: ["cs.LG", "cs.AI", "cs.CL"])
    arxiv_max_papers: int = 10
    api_host: str = "0.0.0.0"
    api_port: int = 8000

    @classmethod
    def from_env(cls) -> "Config":
        return cls(
            groq_api_key=os.getenv("GROQ_API_KEY", ""),
            huggingface_api_key=os.getenv("HUGGINGFACE_API_KEY", ""),
            llm_model=os.getenv("LLM_MODEL", "llama-3.3-70b-versatile"),
            embedding_model=os.getenv("EMBEDDING_MODEL", "sentence-transformers/all-MiniLM-L6-v2"),
            chunk_size=int(os.getenv("CHUNK_SIZE", "1000")),
            chunk_overlap=int(os.getenv("CHUNK_OVERLAP", "200")),
            top_k_results=int(os.getenv("TOP_K_RESULTS", "5")),
            api_port=int(os.getenv("API_PORT", "8000")),
        )

    def validate(self):
        if self.chunk_overlap >= self.chunk_size:
            raise ValueError("chunk_overlap must be less than chunk_size")
        if self.api_port < 1 or self.api_port > 65535:
            raise ValueError("api_port must be between 1 and 65535")

    def ensure_directories(self):
        Path(self.knowledge_base_dir).mkdir(parents=True, exist_ok=True)
        Path(self.faiss_index_path).parent.mkdir(parents=True, exist_ok=True)

# Initialize config
config = Config.from_env()
config.ensure_directories()
print(f"📁 Knowledge base: {config.knowledge_base_dir}")
print(f"📁 FAISS index: {config.faiss_index_path}")

📁 Knowledge base: ./knowledge_base
📁 FAISS index: ./faiss_index
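
The cell above defines `Config.validate()` but never calls it, so an environment override such as `CHUNK_SIZE=100` would go unnoticed until chunking fails. The sketch below shows the override-then-validate pattern on a reduced stand-in. `ChunkSettings` is a hypothetical two-field class that mirrors the corresponding fields and check of `Config`:

```python
import os
from dataclasses import dataclass


@dataclass
class ChunkSettings:
    """Hypothetical two-field stand-in for the notebook's Config."""
    chunk_size: int = 1000
    chunk_overlap: int = 200

    @classmethod
    def from_env(cls, env=None) -> "ChunkSettings":
        env = os.environ if env is None else env
        return cls(
            chunk_size=int(env.get("CHUNK_SIZE", "1000")),
            chunk_overlap=int(env.get("CHUNK_OVERLAP", "200")),
        )

    def validate(self) -> None:
        if self.chunk_overlap >= self.chunk_size:
            raise ValueError("chunk_overlap must be less than chunk_size")


good = ChunkSettings.from_env({"CHUNK_SIZE": "500"})
good.validate()  # passes: overlap 200 < size 500

bad = ChunkSettings.from_env({"CHUNK_SIZE": "100"})
try:
    bad.validate()
    error = None
except ValueError as exc:
    error = str(exc)  # overlap 200 >= size 100
```

In the notebook itself, the equivalent step would be a `config.validate()` call right after `Config.from_env()`.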


## 3. Text Processing Utilities

In [7]:
from bs4 import BeautifulSoup

def clean_html(html: str) -> str:
    """Clean HTML content by removing tags and normalizing whitespace."""
    if not html:
        return ""
    soup = BeautifulSoup(html, "html.parser")
    for element in soup(["script", "style", "head", "meta", "link"]):
        element.decompose()
    text = soup.get_text(separator=" ")
    text = re.sub(r'\s+', ' ', text)
    return text.strip()

print("✅ HTML cleaner loaded")

✅ HTML cleaner loaded
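
To show what `clean_html` does without installing BeautifulSoup, here is a dependency-free sketch of the same idea: drop non-content elements, keep the text, collapse whitespace. It swaps in the standard library's `html.parser` for BeautifulSoup, so it is an approximation of the cell above, not a replacement. `clean_html_stdlib` and `_TextExtractor` are hypothetical names:

```python
import re
from html.parser import HTMLParser

# Container tags whose text should be dropped. Void tags such as <meta> and
# <link> carry no text and have no closing tag, so they are not tracked here.
SKIPPED_TAGS = {"script", "style", "head"}


class _TextExtractor(HTMLParser):
    def __init__(self):
        super().__init__()
        self._skip_depth = 0
        self.parts = []

    def handle_starttag(self, tag, attrs):
        if tag in SKIPPED_TAGS:
            self._skip_depth += 1

    def handle_endtag(self, tag):
        if tag in SKIPPED_TAGS and self._skip_depth > 0:
            self._skip_depth -= 1

    def handle_data(self, data):
        if self._skip_depth == 0:
            self.parts.append(data)


def clean_html_stdlib(html: str) -> str:
    parser = _TextExtractor()
    parser.feed(html or "")
    parser.close()
    # Same whitespace normalization as clean_html: collapse runs, then strip.
    return re.sub(r"\s+", " ", " ".join(parser.parts)).strip()


sample = (
    "<html><head><title>T</title><style>p{color:red}</style></head>"
    "<body><p>Gradient   descent</p>\n<script>alert(1)</script>"
    "<p>converges.</p></body></html>"
)
cleaned = clean_html_stdlib(sample)
print(cleaned)
```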


In [8]:
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.documents import Document

class TextChunker:
    """Splits text into chunks with configurable size and overlap."""

    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
        if chunk_size <= 0:
            raise ValueError("chunk_size must be positive")
        if chunk_overlap < 0:
            raise ValueError("chunk_overlap must be non-negative")
        if chunk_size <= chunk_overlap:
            raise ValueError("chunk_size must be greater than chunk_overlap")

        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self._splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size, chunk_overlap=chunk_overlap,
            length_function=len, separators=["\n\n", "\n", " ", ""]
        )

    def split_text(self, text: str, metadata: Optional[dict] = None) -> List[Document]:
        if not text or not text.strip():
            return []
        chunks = self._splitter.split_text(text)
        documents = []
        for i, chunk in enumerate(chunks):
            chunk_metadata = {**(metadata or {}), "chunk_index": i, "timestamp": datetime.now().isoformat()}
            documents.append(Document(page_content=chunk, metadata=chunk_metadata))
        return documents

    def split_documents(self, documents: List[Document]) -> List[Document]:
        result = []
        for doc in documents:
            chunks = self._splitter.split_text(doc.page_content)
            for i, chunk in enumerate(chunks):
                chunk_metadata = (doc.metadata.copy() if doc.metadata else {})
                chunk_metadata["chunk_index"] = i
                result.append(Document(page_content=chunk, metadata=chunk_metadata))
        return result

chunker = TextChunker(chunk_size=config.chunk_size, chunk_overlap=config.chunk_overlap)
print(f"✅ Text chunker initialized (size={config.chunk_size}, overlap={config.chunk_overlap})")

✅ Text chunker initialized (size=1000, overlap=200)
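
The effect of `chunk_size` and `chunk_overlap` is easier to see on a toy input. The sketch below is a plain sliding window over characters, which is a deliberate simplification: `RecursiveCharacterTextSplitter` additionally prefers to break at the separators it is given, so its real chunk boundaries will differ. `sliding_chunks` is a hypothetical helper:

```python
from typing import List


def sliding_chunks(text: str, chunk_size: int, chunk_overlap: int) -> List[str]:
    """Cut text into windows of chunk_size that share chunk_overlap characters."""
    if chunk_size <= chunk_overlap:
        raise ValueError("chunk_size must be greater than chunk_overlap")
    step = chunk_size - chunk_overlap  # how far each new window advances
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the last window already reached the end of the text
    return chunks


text = "abcdefghijklmnopqrstuvwxyz"
chunks = sliding_chunks(text, chunk_size=10, chunk_overlap=4)
for chunk in chunks:
    print(chunk)
```

Each window starts `chunk_size - chunk_overlap` characters after the previous one, which is why the overlap has to stay strictly smaller than the chunk size.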


## 4. Vector Store Manager

In [9]:
from langchain_community.vectorstores import FAISS
from langchain_huggingface import HuggingFaceEmbeddings

class VectorStoreError(Exception):
    pass

class VectorStoreManager:
    """Manages FAISS vector store for document embeddings."""

    def __init__(self, embedding_model: str = "sentence-transformers/all-MiniLM-L6-v2", index_path: str = "./faiss_index"):
        self.embedding_model_name = embedding_model
        self.index_path = Path(index_path)
        self._embeddings = None
        self._vector_store = None

    @property
    def embeddings(self):
        if self._embeddings is None:
            print("🔄 Loading embedding model...")
            self._embeddings = HuggingFaceEmbeddings(
                model_name=self.embedding_model_name,
                model_kwargs={'device': 'cpu'},
                encode_kwargs={'normalize_embeddings': True}
            )
            print("✅ Embedding model loaded")
        return self._embeddings

    @property
    def is_loaded(self) -> bool:
        return self._vector_store is not None

    def load_index(self) -> bool:
        index_file = self.index_path / "index.faiss"
        if not index_file.exists():
            return False
        try:
            self._vector_store = FAISS.load_local(str(self.index_path), self.embeddings, allow_dangerous_deserialization=True)
            print(f"✅ Loaded index with {self.get_document_count()} documents")
            return True
        except Exception as e:
            print(f"❌ Failed to load index: {e}")
            return False

    def save_index(self):
        if self._vector_store is None:
            raise VectorStoreError("No index to save")
        self.index_path.mkdir(parents=True, exist_ok=True)
        self._vector_store.save_local(str(self.index_path))
        print(f"✅ Saved index to {self.index_path}")

    def add_documents(self, documents: List[Document]) -> int:
        if not documents:
            return 0
        if self._vector_store is None:
            self._vector_store = FAISS.from_documents(documents, self.embeddings)
        else:
            self._vector_store.add_documents(documents)
        print(f"✅ Added {len(documents)} documents to index")
        return len(documents)

    def search(self, query: str, top_k: int = 5) -> List[Document]:
        if self._vector_store is None:
            raise VectorStoreError("No index loaded")
        if not query or not query.strip():
            return []
        return self._vector_store.similarity_search(query, k=min(top_k, self.get_document_count()))

    def get_document_count(self) -> int:
        return self._vector_store.index.ntotal if self._vector_store else 0

    def get_retriever(self, search_kwargs: dict = None):
        if self._vector_store is None:
            raise VectorStoreError("No index loaded")
        return self._vector_store.as_retriever(search_kwargs=search_kwargs or {"k": 5})

vector_store = VectorStoreManager(embedding_model=config.embedding_model, index_path=config.faiss_index_path)
print("✅ Vector store manager initialized")

✅ Vector store manager initialized
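
The manager requests `normalize_embeddings=True`, which matters for search: for unit-length vectors, squared L2 distance and cosine similarity are tied by ‖a − b‖² = 2 − 2·(a·b), so ranking by smallest distance and ranking by largest cosine give the same order. The pure-Python sketch below checks this on made-up 3-dimensional vectors. The vectors and names are illustrative only, and real `all-MiniLM-L6-v2` embeddings have 384 dimensions:

```python
import math
from typing import List, Sequence


def normalize(v: Sequence[float]) -> List[float]:
    """Scale a vector to unit length."""
    norm = math.sqrt(sum(x * x for x in v))
    return [x / norm for x in v]


def dot(a: Sequence[float], b: Sequence[float]) -> float:
    return sum(x * y for x, y in zip(a, b))


def squared_l2(a: Sequence[float], b: Sequence[float]) -> float:
    return sum((x - y) ** 2 for x, y in zip(a, b))


# Toy "embeddings" standing in for document chunks.
docs = {
    "backprop": normalize([0.9, 0.1, 0.0]),
    "svm": normalize([0.1, 0.9, 0.2]),
    "attention": normalize([0.7, 0.2, 0.6]),
}
query = normalize([1.0, 0.0, 0.1])

by_cosine = sorted(docs, key=lambda name: dot(query, docs[name]), reverse=True)
by_l2 = sorted(docs, key=lambda name: squared_l2(query, docs[name]))
print(by_cosine, by_l2)
```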


## 5. Document Ingestion

In [None]:
import arxiv
import requests
import fitz  # PyMuPDF
import io
import warnings
from PIL import Image

# Suppress warnings
warnings.filterwarnings("ignore", category=UserWarning)

class DeepSeekOCR:
    """OCR processor using HuggingFace transformers. Despite the class name, it loads Microsoft TrOCR."""

    def __init__(self):
        self._model = None
        self._processor = None
        self._device = None

    def _load_model(self):
        """Lazy load the OCR model."""
        if self._model is not None:
            return

        try:
            import torch
            from transformers import TrOCRProcessor, VisionEncoderDecoderModel

            print("🔄 Loading OCR model (TrOCR)...")

            self._device = "cuda" if torch.cuda.is_available() else "cpu"
            model_name = "microsoft/trocr-base-printed"

            self._processor = TrOCRProcessor.from_pretrained(model_name)
            self._model = VisionEncoderDecoderModel.from_pretrained(model_name).to(self._device)

            print(f"✅ OCR model loaded on {self._device}")

        except Exception as e:
            print(f"❌ Failed to load OCR model: {e}")
            self._model = None
            self._processor = None

    def extract_text_from_image(self, image) -> str:
        """Extract text from a PIL Image using OCR."""
        self._load_model()

        if self._model is None or self._processor is None:
            return ""

        try:
            import torch

            # Ensure image is in RGB mode
            if image.mode != "RGB":
                image = image.convert("RGB")

            # Process with TrOCR
            pixel_values = self._processor(image, return_tensors="pt").pixel_values.to(self._device)

            with torch.no_grad():
                generated_ids = self._model.generate(pixel_values, max_length=512)

            text = self._processor.batch_decode(generated_ids, skip_special_tokens=True)[0]
            return text.strip()

        except Exception as e:
            print(f"❌ OCR extraction failed: {e}")
            return ""


class ArxivIngester:
    """Downloads and processes arXiv papers."""

    def __init__(self, download_dir: str = "./knowledge_base"):
        self.download_dir = Path(download_dir)
        self.download_dir.mkdir(parents=True, exist_ok=True)

    def download_papers(self, categories: List[str], max_papers: int = 10) -> List[Path]:
        downloaded = []
        query = " OR ".join([f"cat:{cat}" for cat in categories])
        print(f"🔍 Searching arXiv for: {query}")

        search = arxiv.Search(query=query, max_results=max_papers, sort_by=arxiv.SortCriterion.SubmittedDate)

        # Client.results() replaces the deprecated Search.results()
        for paper in arxiv.Client().results(search):
            paper_id = paper.entry_id.split('/')[-1]
            filename = self.download_dir / f"{paper_id}.pdf"

            if filename.exists():
                print(f"⏭️ Skipping {paper_id} (exists)")
                downloaded.append(filename)
                continue

            try:
                paper.download_pdf(dirpath=str(self.download_dir), filename=f"{paper_id}.pdf")
                downloaded.append(filename)
                print(f"✅ Downloaded: {paper.title[:50]}...")
            except Exception as e:
                print(f"❌ Failed: {paper_id}: {e}")

        return downloaded


class PDFProcessor:
    """
    Enhanced PDF processor with PyMuPDF primary extraction and OCR fallback.

    Extraction strategy:
    1. Try PyMuPDF (fitz) first - fast and structured
    2. If extraction fails or produces insufficient text, use OCR fallback
    3. For pages with embedded images, optionally apply OCR to images
    """

    MIN_TEXT_THRESHOLD = 50  # Minimum chars per page for successful extraction
    _ocr_instance = None

    @classmethod
    def _get_ocr(cls):
        if cls._ocr_instance is None:
            cls._ocr_instance = DeepSeekOCR()
        return cls._ocr_instance

    @staticmethod
    def _sanitize_text(text: str) -> str:
        """Sanitizes text to remove problematic characters for encoding."""
        if not text:
            return ""
        return text.encode('utf-8', errors='ignore').decode('utf-8')

    @staticmethod
    def _extract_with_pymupdf(pdf_source, filename: str = "document.pdf"):
        """Extract text using PyMuPDF (fitz)."""
        results = []

        try:
            # Open PDF from path or bytes
            if isinstance(pdf_source, (str, Path)):
                doc = fitz.open(str(pdf_source))
            else:
                doc = fitz.open(stream=pdf_source, filetype="pdf")

            for page_num in range(len(doc)):
                page = doc[page_num]
                text = page.get_text("text")
                sanitized = PDFProcessor._sanitize_text(text)

                # Check for images on the page
                image_list = page.get_images()
                has_images = len(image_list) > 0

                results.append({
                    "text": sanitized,
                    "page_number": page_num + 1,
                    "method": "pymupdf",
                    "has_images": has_images
                })

            doc.close()

        except Exception as e:
            print(f"⚠️ PyMuPDF extraction warning for {filename}: {e}")

        return results

    @staticmethod
    def _extract_page_with_ocr(pdf_source, page_num: int) -> str:
        """Extract text from a specific page using OCR."""
        try:
            # Open PDF
            if isinstance(pdf_source, (str, Path)):
                doc = fitz.open(str(pdf_source))
            else:
                doc = fitz.open(stream=pdf_source, filetype="pdf")

            page = doc[page_num]

            # Render page to image at high resolution
            mat = fitz.Matrix(2.0, 2.0)  # 2x zoom for better OCR
            pix = page.get_pixmap(matrix=mat)

            # Convert to PIL Image
            img_data = pix.tobytes("png")
            image = Image.open(io.BytesIO(img_data))

            doc.close()

            # Apply OCR
            ocr = PDFProcessor._get_ocr()
            text = ocr.extract_text_from_image(image)
            return PDFProcessor._sanitize_text(text)

        except Exception as e:
            print(f"❌ OCR extraction failed for page {page_num + 1}: {e}")
            return ""

    @staticmethod
    def load_pdf(file_path: Path) -> List[Document]:
        """Load and extract text from a PDF file with OCR fallback."""
        documents = []
        filename = Path(file_path).name

        print(f"📄 Processing {filename} with PyMuPDF...")

        # First try PyMuPDF
        results = PDFProcessor._extract_with_pymupdf(file_path, filename)

        if not results:
            print(f"⚠️ PyMuPDF failed completely, attempting full OCR for {filename}...")
            return PDFProcessor._load_pdf_with_full_ocr(file_path, filename)

        # Process each page
        for result in results:
            page_text = result["text"]

            # Check if extraction was insufficient
            if len(page_text.strip()) < PDFProcessor.MIN_TEXT_THRESHOLD:
                print(f"   Page {result['page_number']}: Insufficient text ({len(page_text.strip())} chars), applying OCR...")
                ocr_text = PDFProcessor._extract_page_with_ocr(file_path, result["page_number"] - 1)
                if ocr_text:
                    page_text = ocr_text
                    result["method"] = "ocr"

            if page_text.strip():
                documents.append(Document(
                    page_content=page_text,
                    metadata={
                        "source_type": "pdf",
                        "source_file": filename,
                        "page_number": result["page_number"],
                        "extraction_method": result["method"]
                    }
                ))

        print(f"✅ Extracted {len(documents)} pages from {filename}")
        return documents

    @staticmethod
    def _load_pdf_with_full_ocr(file_path: Path, filename: str) -> List[Document]:
        """Load PDF using full OCR when PyMuPDF completely fails."""
        documents = []

        try:
            doc = fitz.open(str(file_path))

            for page_num in range(len(doc)):
                print(f"   OCR processing page {page_num + 1}/{len(doc)}...")
                text = PDFProcessor._extract_page_with_ocr(file_path, page_num)

                if text.strip():
                    documents.append(Document(
                        page_content=text,
                        metadata={
                            "source_type": "pdf",
                            "source_file": filename,
                            "page_number": page_num + 1,
                            "extraction_method": "ocr"
                        }
                    ))

            doc.close()

        except Exception as e:
            print(f"❌ Full OCR extraction failed: {e}")

        return documents

    @staticmethod
    def load_pdf_bytes(pdf_bytes: bytes, filename: str = "uploaded.pdf") -> List[Document]:
        """Load and extract text from PDF bytes with OCR fallback."""
        documents = []

        print(f"📄 Processing {filename} with PyMuPDF...")

        # First try PyMuPDF
        results = PDFProcessor._extract_with_pymupdf(pdf_bytes, filename)

        if not results:
            print(f"⚠️ PyMuPDF failed completely, attempting full OCR for {filename}...")
            return PDFProcessor._load_pdf_bytes_with_full_ocr(pdf_bytes, filename)

        # Process each page
        for result in results:
            page_text = result["text"]

            # Check if extraction was insufficient
            if len(page_text.strip()) < PDFProcessor.MIN_TEXT_THRESHOLD:
                print(f"   Page {result['page_number']}: Insufficient text, applying OCR...")
                ocr_text = PDFProcessor._extract_page_with_ocr(pdf_bytes, result["page_number"] - 1)
                if ocr_text:
                    page_text = ocr_text
                    result["method"] = "ocr"

            if page_text.strip():
                documents.append(Document(
                    page_content=page_text,
                    metadata={
                        "source_type": "pdf",
                        "source_file": filename,
                        "page_number": result["page_number"],
                        "extraction_method": result["method"]
                    }
                ))

        print(f"✅ Extracted {len(documents)} pages from {filename}")
        return documents

    @staticmethod
    def _load_pdf_bytes_with_full_ocr(pdf_bytes: bytes, filename: str) -> List[Document]:
        """Load PDF bytes using full OCR."""
        documents = []

        try:
            doc = fitz.open(stream=pdf_bytes, filetype="pdf")

            for page_num in range(len(doc)):
                print(f"   OCR processing page {page_num + 1}/{len(doc)}...")
                text = PDFProcessor._extract_page_with_ocr(pdf_bytes, page_num)

                if text.strip():
                    documents.append(Document(
                        page_content=text,
                        metadata={
                            "source_type": "pdf",
                            "source_file": filename,
                            "page_number": page_num + 1,
                            "extraction_method": "ocr"
                        }
                    ))

            doc.close()

        except Exception as e:
            print(f"❌ Full OCR extraction failed: {e}")

        return documents


class ImageProcessor:
    """Process images and extract text using OCR."""

    SUPPORTED_FORMATS = {'.png', '.jpg', '.jpeg', '.webp', '.bmp', '.tiff', '.gif'}
    _ocr_instance = None

    @classmethod
    def _get_ocr(cls):
        if cls._ocr_instance is None:
            cls._ocr_instance = DeepSeekOCR()
        return cls._ocr_instance

    @staticmethod
    def is_supported(filename: str) -> bool:
        """Check if the file format is supported."""
        ext = Path(filename).suffix.lower()
        return ext in ImageProcessor.SUPPORTED_FORMATS

    @staticmethod
    def load_image(file_path: Path) -> List[Document]:
        """Load and extract text from an image file."""
        filename = Path(file_path).name

        if not ImageProcessor.is_supported(filename):
            print(f"❌ Unsupported image format: {filename}")
            return []

        print(f"🖼️ Processing image {filename} with OCR...")

        try:
            image = Image.open(file_path)
            ocr = ImageProcessor._get_ocr()
            text = ocr.extract_text_from_image(image)

            if text.strip():
                return [Document(
                    page_content=text,
                    metadata={
                        "source_type": "image",
                        "source_file": filename,
                        "extraction_method": "ocr"
                    }
                )]
            else:
                print(f"⚠️ No text extracted from {filename}")
                return []

        except Exception as e:
            print(f"❌ Failed to process image {filename}: {e}")
            return []

    @staticmethod
    def load_image_bytes(image_bytes: bytes, filename: str = "uploaded_image.png") -> List[Document]:
        """Load and extract text from image bytes."""
        print(f"🖼️ Processing image {filename} with OCR...")

        try:
            image = Image.open(io.BytesIO(image_bytes))
            ocr = ImageProcessor._get_ocr()
            text = ocr.extract_text_from_image(image)

            if text.strip():
                return [Document(
                    page_content=text,
                    metadata={
                        "source_type": "image",
                        "source_file": filename,
                        "extraction_method": "ocr"
                    }
                )]
            else:
                print(f"⚠️ No text extracted from {filename}")
                return []

        except Exception as e:
            print(f"❌ Failed to process image: {e}")
            return []


class WebScraper:
    """Scrapes content from web pages."""

    @staticmethod
    def scrape_url(url: str) -> Optional[Document]:
        try:
            response = requests.get(url, timeout=10, headers={'User-Agent': 'Mozilla/5.0'})
            response.raise_for_status()
            text = clean_html(response.text)
            if text:
                return Document(page_content=text, metadata={"source_type": "web", "source_url": url})
        except Exception as e:
            print(f"❌ Failed to scrape {url}: {e}")
        return None

print("✅ Document ingestion modules loaded (PyMuPDF + OCR fallback)")

✅ Document ingestion modules loaded
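
The per-page fallback in `PDFProcessor.load_pdf` comes down to one decision: keep the natively extracted text when it reaches `MIN_TEXT_THRESHOLD` characters, otherwise try OCR and use its result only if it returns something. Here is a stdlib sketch of that decision with the OCR call stubbed out. `extract_page` and the lambdas are hypothetical; the real code calls `_extract_page_with_ocr`:

```python
from typing import Callable, Dict

MIN_TEXT_THRESHOLD = 50  # same value as PDFProcessor.MIN_TEXT_THRESHOLD


def extract_page(native_text: str, run_ocr: Callable[[], str]) -> Dict[str, str]:
    """Prefer native text; call OCR only when the page has too little of it."""
    if len(native_text.strip()) >= MIN_TEXT_THRESHOLD:
        return {"text": native_text, "method": "pymupdf"}
    ocr_text = run_ocr()
    if ocr_text:
        return {"text": ocr_text, "method": "ocr"}
    # OCR produced nothing: keep whatever native text there was.
    return {"text": native_text, "method": "pymupdf"}


def fake_ocr() -> str:
    """Stand-in for the OCR model so the sketch runs without transformers."""
    return "Scanned page text recovered by OCR"


text_page = extract_page("x" * 80, fake_ocr)        # enough native text
scanned_page = extract_page("  3  ", fake_ocr)      # only a page number survived
failed_ocr = extract_page("  3  ", lambda: "")      # OCR returned nothing
print(text_page["method"], scanned_page["method"], failed_ocr["method"])
```

Passing the OCR step as a callable keeps the expensive model load lazy, in the same spirit as `DeepSeekOCR._load_model` above.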


## 6. RAG Chain with Groq LLM

In [13]:
from langchain_groq import ChatGroq
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableParallel

@dataclass
class RAGResponse:
    answer: str
    sources: List[str]
    context_chunks: List[Document] = field(default_factory=list)

def format_docs(docs):
    """Format documents for context."""
    return '\n\n'.join(doc.page_content for doc in docs)

class RAGChain:
    """RAG chain using Groq LLM with LCEL (LangChain Expression Language)."""

    def __init__(self, llm_model: str, retriever, groq_api_key: str):
        self.llm = ChatGroq(model=llm_model, api_key=groq_api_key, temperature=0.7)
        self.retriever = retriever
        self._chat_histories: Dict[str, List] = {}
        self._setup_chain()

    def _setup_chain(self):
        system_prompt = """You are a helpful AI assistant specialized in machine learning.
Use the following context to answer the question. If you don't know, say so.
Always cite your sources when possible.

Context:
{context}

Question: {question}
"""
        prompt = ChatPromptTemplate.from_template(system_prompt)

        self.rag_chain = (
            RunnableParallel(context=self.retriever | format_docs, question=RunnablePassthrough())
            | prompt
            | self.llm
            | StrOutputParser()
        )

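    def query(self, question: str, session_id: str = "default") -> RAGResponse:
        # NOTE: session_id is accepted but not used yet; no chat history is recorded.
        # Retrieve documents separately so their sources can be reported
        # (the chain below runs its own retrieval for the same question).
        docs = self.retriever.invoke(question)

        # Get answer
        answer = self.rag_chain.invoke(question)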

        # Extract sources
        sources = []
        for doc in docs:
            if doc.metadata.get("source_file"):
                sources.append(doc.metadata["source_file"])
            elif doc.metadata.get("source_url"):
                sources.append(doc.metadata["source_url"])

        return RAGResponse(answer=answer, sources=list(set(sources)), context_chunks=docs)

    def clear_session(self, session_id: str):
        if session_id in self._chat_histories:
            del self._chat_histories[session_id]

print("✅ RAG chain module loaded")

✅ RAG chain module loaded
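
`RAGChain.query` deduplicates sources with `list(set(sources))`, which removes repeats but returns them in arbitrary order. If retrieval order should be kept, so that the best-matching source is listed first, one option is `dict.fromkeys`. The sketch below works on plain metadata dictionaries. `extract_sources` and the sample file names are hypothetical:

```python
from typing import List


def extract_sources(metadatas: List[dict]) -> List[str]:
    """Collect source_file or source_url per chunk, deduplicated in retrieval order."""
    found = []
    for meta in metadatas:
        source = meta.get("source_file") or meta.get("source_url")
        if source:
            found.append(source)
    return list(dict.fromkeys(found))  # dicts preserve insertion order


# Metadata shaped like what PDFProcessor and WebScraper attach to documents.
retrieved = [
    {"source_type": "pdf", "source_file": "paper_a.pdf", "page_number": 3},
    {"source_type": "web", "source_url": "https://example.com/notes"},
    {"source_type": "pdf", "source_file": "paper_a.pdf", "page_number": 7},
    {"source_type": "pdf", "page_number": 1},  # no source recorded
]
sources = extract_sources(retrieved)
print(sources)
```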


## 7. Voice Module (HuggingFace)

In [14]:
import torch
import numpy as np
from transformers import pipeline, SpeechT5Processor, SpeechT5ForTextToSpeech, SpeechT5HifiGan

class SpeechToText:
    """Speech-to-text using HuggingFace Whisper."""

    def __init__(self):
        self._pipe = None

    @property
    def pipe(self):
        if self._pipe is None:
            print("🔄 Loading Whisper model...")
            self._pipe = pipeline(
                "automatic-speech-recognition",
                model="openai/whisper-small",
                device="cuda" if torch.cuda.is_available() else "cpu"
            )
            print("✅ Whisper model loaded")
        return self._pipe

    def transcribe(self, audio_path: str) -> str:
        result = self.pipe(audio_path)
        return result["text"]

class TextToSpeech:
    """Text-to-speech using HuggingFace SpeechT5 with random speaker embedding."""

    def __init__(self):
        self._processor = None
        self._model = None
        self._vocoder = None
        self._speaker_embedding = None

    def _load_models(self):
        if self._model is None:
            print("üîÑ Loading TTS model...")
            self._processor = SpeechT5Processor.from_pretrained("microsoft/speecht5_tts")
            self._model = SpeechT5ForTextToSpeech.from_pretrained("microsoft/speecht5_tts")
            self._vocoder = SpeechT5HifiGan.from_pretrained("microsoft/speecht5_hifigan")
            # Use a fixed random speaker embedding (works without external dataset)
            torch.manual_seed(42)
            self._speaker_embedding = torch.randn(1, 512)
            print("✅ TTS model loaded")

    @property
    def pipe(self):
        self._load_models()
        return self._model

    def synthesize(self, text: str, output_path: str = "output.wav") -> str:
        import scipy.io.wavfile as wav
        self._load_models()

        # Truncate text if too long (SpeechT5 has limits)
        if len(text) > 400:
            text = text[:400] + '...'

        # Process text
        inputs = self._processor(text=text, return_tensors="pt")

        # Generate speech
        speech = self._model.generate_speech(inputs["input_ids"], self._speaker_embedding, vocoder=self._vocoder)

        # Save to file
        wav.write(output_path, rate=16000, data=speech.numpy())
        return output_path

@dataclass
class VoiceResponse:
    text_response: str
    audio_path: Optional[str]
    sources: List[str]

class VoiceRAGHandler:
    """Orchestrates voice-based RAG queries."""

    def __init__(self, rag_chain_instance):
        self.rag_chain = rag_chain_instance
        self.stt = SpeechToText()
        self.tts = TextToSpeech()

    def process_voice_query(self, audio_path: str, session_id: str = "voice") -> VoiceResponse:
        print("🎤 Transcribing audio...")
        transcribed_text = self.stt.transcribe(audio_path)
        print(f"📝 Transcribed: {transcribed_text}")

        print("🤔 Processing query...")
        rag_response = self.rag_chain.query(transcribed_text, session_id)

        print("🔊 Generating audio response...")
        audio_output = self.tts.synthesize(rag_response.answer, "response.wav")

        return VoiceResponse(
            text_response=rag_response.answer,
            audio_path=audio_output,
            sources=rag_response.sources
        )

print("✅ Voice modules loaded")

✅ Voice modules loaded


## 8. Build Knowledge Base

In [15]:
def build_knowledge_base(
    download_arxiv: bool = True,
    arxiv_categories: Optional[List[str]] = None,
    max_papers: int = 5,
    web_urls: Optional[List[str]] = None
) -> int:
    """Build the knowledge base from various sources."""
    all_documents = []

    if download_arxiv:
        categories = arxiv_categories or config.arxiv_categories
        ingester = ArxivIngester(download_dir=config.knowledge_base_dir)
        pdf_paths = ingester.download_papers(categories, max_papers)

        print(f"\n📄 Processing {len(pdf_paths)} PDFs...")
        for pdf_path in pdf_paths:
            docs = PDFProcessor.load_pdf(pdf_path)
            all_documents.extend(docs)

    if web_urls:
        print(f"\n🌐 Scraping {len(web_urls)} URLs...")
        for url in web_urls:
            doc = WebScraper.scrape_url(url)
            if doc:
                all_documents.append(doc)

    if not all_documents:
        print("⚠️ No documents to process")
        return 0

    print(f"\n✂️ Chunking {len(all_documents)} documents...")
    chunked_docs = chunker.split_documents(all_documents)
    print(f"   Created {len(chunked_docs)} chunks")

    print("\n📊 Adding to vector store...")
    count = vector_store.add_documents(chunked_docs)
    vector_store.save_index()

    return count

print("✅ Knowledge base builder ready")

✅ Knowledge base builder ready


In [16]:
# Curated ML Learning Resources - URLs to scrape
ML_LEARNING_URLS = [
    # Wikipedia - Core ML Concepts
    "https://en.wikipedia.org/wiki/Machine_learning",
    "https://en.wikipedia.org/wiki/Deep_learning",
    "https://en.wikipedia.org/wiki/Neural_network",
    "https://en.wikipedia.org/wiki/Artificial_neural_network",
    "https://en.wikipedia.org/wiki/Convolutional_neural_network",
    "https://en.wikipedia.org/wiki/Recurrent_neural_network",
    "https://en.wikipedia.org/wiki/Transformer_(machine_learning_model)",
    "https://en.wikipedia.org/wiki/Gradient_descent",
    "https://en.wikipedia.org/wiki/Backpropagation",
    "https://en.wikipedia.org/wiki/Supervised_learning",
    "https://en.wikipedia.org/wiki/Unsupervised_learning",
    "https://en.wikipedia.org/wiki/Reinforcement_learning",
    "https://en.wikipedia.org/wiki/Random_forest",
    "https://en.wikipedia.org/wiki/Support_vector_machine",
    "https://en.wikipedia.org/wiki/K-nearest_neighbors_algorithm",
    "https://en.wikipedia.org/wiki/Decision_tree_learning",
    "https://en.wikipedia.org/wiki/Ensemble_learning",
    "https://en.wikipedia.org/wiki/Feature_engineering",
    "https://en.wikipedia.org/wiki/Overfitting",
    "https://en.wikipedia.org/wiki/Cross-validation_(statistics)",

    # GeeksforGeeks - ML Tutorials
    "https://www.geeksforgeeks.org/machine-learning/",
    "https://www.geeksforgeeks.org/getting-started-machine-learning/",
    "https://www.geeksforgeeks.org/supervised-unsupervised-learning/",
    "https://www.geeksforgeeks.org/ml-linear-regression/",
    "https://www.geeksforgeeks.org/ml-logistic-regression-using-python/",
    "https://www.geeksforgeeks.org/decision-tree/",
    "https://www.geeksforgeeks.org/random-forest-regression-in-python/",
    "https://www.geeksforgeeks.org/support-vector-machine-algorithm/",
    "https://www.geeksforgeeks.org/k-nearest-neighbours/",
    "https://www.geeksforgeeks.org/naive-bayes-classifiers/",
    "https://www.geeksforgeeks.org/neural-networks-a-beginners-guide/",
    "https://www.geeksforgeeks.org/introduction-deep-learning/",
    "https://www.geeksforgeeks.org/introduction-convolution-neural-network/",
    "https://www.geeksforgeeks.org/introduction-to-recurrent-neural-network/",

    # IBM - ML Concepts
    "https://www.ibm.com/think/topics/machine-learning",
    "https://www.ibm.com/think/topics/deep-learning",
    "https://www.ibm.com/think/topics/neural-networks",
    "https://www.ibm.com/think/topics/supervised-learning",
    "https://www.ibm.com/think/topics/unsupervised-learning",
    "https://www.ibm.com/think/topics/reinforcement-learning",

    # Towards Data Science / Medium (public articles)
    "https://towardsdatascience.com/machine-learning-basics-part-1-a36d38c7916",
]

print(f"📚 {len(ML_LEARNING_URLS)} ML learning URLs configured")

📚 41 ML learning URLs configured


In [17]:
# 🚀 BUILD COMPLETE KNOWLEDGE BASE
# This will download arXiv papers AND scrape all ML websites

print('Building comprehensive ML knowledge base...')
print('This may take some time depending on your connection and the number of papers.\n')

count = build_knowledge_base(
    download_arxiv=True,
    arxiv_categories=['cs.LG', 'cs.AI', 'cs.NE'],  # ML, AI, Neural/Evolutionary
    max_papers=250,  # Download 250 recent papers
    web_urls=ML_LEARNING_URLS  # All curated URLs
)

print(f'\nKnowledge base built with {count} chunks!')
print(f'Total documents in index: {vector_store.get_document_count()}')

Building comprehensive ML knowledge base...
This may take some time depending on your connection and the number of papers.

🔍 Searching arXiv for: cat:cs.LG OR cat:cs.AI OR cat:cs.NE

✅ Downloaded: The Universal Weight Subspace Hypothesis...
✅ Downloaded: Value Gradient Guidance for Flow Matching Alignmen...
✅ Downloaded: Deep infant brain segmentation from multi-contrast...
✅ Downloaded: DraCo: Draft as CoT for Text-to-Image Preview and ...
✅ Downloaded: ShadowDraw: From Any Object to Shadow-Drawing Comp...
✅ Downloaded: NeuralRemaster: Phase-Preserving Diffusion for Str...
✅ Downloaded: Semantic Soft Bootstrapping: Long Context Reasonin...
✅ Downloaded: TV2TV: A Unified Framework for Interleaved Languag...
✅ Downloaded: Structured Document Translation via Format Reinfor...
✅ Downloaded: SA-IQA: Redefining Image Quality Assessment for Sp...
✅ Downloaded: Foundations of Diffusion Models in General State S...
✅ Downloaded: The Geometry of Intelligence: Deterministic Functi...
✅ Downloaded: Gradient Descent with Provably Tuned Learning-rate...
✅ Downloaded: OMTRA: A Multi-Task Generative Model for Structure...
✅ Downloaded: David vs. Goli


🌐 Scraping 41 URLs...

✂️ Chunking 5944 documents...
   Created 24811 chunks

📊 Adding to vector store...
🔄 Loading embedding model...


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


✅ Embedding model loaded
✅ Added 24811 documents to index
✅ Saved index to /content/drive/MyDrive/ML_RAG_KnowledgeBase/faiss_index

Knowledge base built with 24811 chunks!
Total documents in index: 24811
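
`build_knowledge_base` passes every chunk to a single `add_documents` call and saves the index only once, at the end, so a Colab disconnect partway through embedding 24,811 chunks would lose that work. A minimal sketch of one way to add chunks in batches follows. `batched` is a hypothetical helper that is not part of the notebook, and the commented usage assumes `vector_store.add_documents` can be called repeatedly, as Section 8.1 does for each uploaded file.

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def batched(items: Sequence[T], batch_size: int) -> Iterator[List[T]]:
    """Yield consecutive slices of `items`, each holding at most `batch_size` elements."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(items), batch_size):
        yield list(items[start:start + batch_size])


# Stand-in data: integers instead of the notebook's chunk objects.
demo_chunks = list(range(2500))
print([len(batch) for batch in batched(demo_chunks, 1000)])  # -> [1000, 1000, 500]

# Possible use inside the notebook (assumption: names as defined in earlier cells):
# for batch in batched(chunked_docs, 1000):
#     vector_store.add_documents(batch)
#     vector_store.save_index()  # checkpoint to Drive after every batch
```

Saving after every batch costs extra writes to Drive, so the batch size is a trade-off between checkpoint frequency and I/O.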


### 8.1 Upload Your Own ML Books (PDF)

In [None]:
# 📚 UPLOAD YOUR ML BOOKS (PDF) OR IMAGES
# Use this to add your own PDF textbooks or images to the knowledge base

from google.colab import files
import time

def upload_and_process_files():
    """Upload PDF or image files and add them to the knowledge base."""
    print('Select PDF or image files to upload...')
    uploaded = files.upload()

    if not uploaded:
        print('No files uploaded')
        return 0

    total_chunks = 0
    for filename, content in uploaded.items():
        ext = Path(filename).suffix.lower()

        # Process PDFs
        if ext == '.pdf':
            print(f'\n📄 Processing PDF: {filename}')
            docs = PDFProcessor.load_pdf_bytes(content, filename)

        # Process Images
        elif ext in ImageProcessor.SUPPORTED_FORMATS:
            print(f'\n🖼️ Processing Image: {filename}')
            docs = ImageProcessor.load_image_bytes(content, filename)

        else:
            print(f'⚠️ Skipping {filename} (unsupported format)')
            continue

        if not docs:
            print(f'Could not extract content from {filename}')
            continue

        print(f'   Extracted {len(docs)} document(s)')

        # Chunk the documents
        chunked = chunker.split_documents(docs)
        print(f'   Created {len(chunked)} chunks')

        # Add to vector store
        vector_store.add_documents(chunked)
        total_chunks += len(chunked)
        print(f'   ✅ Added to knowledge base')

    # Save the updated index
    if total_chunks > 0:
        vector_store.save_index()
        print(f'\n✅ Successfully added {total_chunks} chunks from {len(uploaded)} file(s)')
        print(f'Total documents in index: {vector_store.get_document_count()}')

    return total_chunks

# Run this to upload your files:
upload_and_process_files()

Select PDF files to upload...


Saving Python_Machine_Learning_Sebastian_Raschka.pdf to Python_Machine_Learning_Sebastian_Raschka.pdf
Saving Practical_Machine_Learning_with_AWS_Process,_Himanshu Singh.pdf to Practical_Machine_Learning_with_AWS_Process,_Himanshu Singh.pdf
Saving Naive Bayes ML (1).pptx.pdf to Naive Bayes ML (1).pptx.pdf
Saving machine learning and algo.pdf to machine learning and algo.pdf
Saving M3-Machine_Learning.pdf to M3-Machine_Learning.pdf
Saving M1-Machine-Learning-Tom-Mitchell.pdf to M1-Machine-Learning-Tom-Mitchell.pdf
Saving Linear Regrssion.pdf to Linear Regrssion.pdf
Saving Introduction to Big Data Analytics.pdf to Introduction to Big Data Analytics.pdf
Saving Introduction ML.pdf to Introduction ML.pdf
Saving Hands on Machine Learning with Scikit Learn and TensorFlow (1).pdf to Hands on Machine Learning with Scikit Learn and TensorFlow (1).pdf
Saving Clustering ML.pdf to Clustering ML.pdf
Saving Chapter 1 - Introduction to Big Data.pdf to Chapter 1 - Introduction to Big Data.pdf
Saving Big



   Extracted 454 pages
   Created 992 chunks
✅ Added 992 documents to index
   Added to knowledge base

Processing: Practical_Machine_Learning_with_AWS_Process,_Himanshu Singh.pdf
   Extracted 243 pages
   Created 469 chunks
✅ Added 469 documents to index
   Added to knowledge base

Processing: Naive Bayes ML (1).pptx.pdf
   Extracted 32 pages
   Created 32 chunks
✅ Added 32 documents to index
   Added to knowledge base

Processing: machine learning and algo.pdf
   Extracted 361 pages
   Created 898 chunks
✅ Added 898 documents to index
   Added to knowledge base

Processing: M3-Machine_Learning.pdf
   Extracted 1075 pages
   Created 3256 chunks
✅ Added 3256 documents to index
   Added to knowledge base

Processing: M1-Machine-Learning-Tom-Mitchell.pdf
   Extracted 412 pages
   Created 1461 chunks
✅ Added 1461 documents to index
   Added to knowledge base

Processing: Linear Regrssion.pdf
   Extracted 32 pages
   Created 33 chunks
✅ Added 33 documents to index
   Added to

8887
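
Several of the uploaded filenames carry a browser-style `(1)` suffix, which suggests that some files may be second copies of a PDF downloaded twice. A hedged sketch of one way to keep repeated text out of the index: hash each chunk's whitespace-normalized text and skip hashes that have already been seen. `content_key` and `filter_new_texts` are hypothetical helpers, not part of the notebook. The sketch works on plain strings, so with the notebook's chunk objects you would hash whichever text attribute they expose (not shown in this section).

```python
import hashlib
from typing import Iterable, List, Set


def content_key(text: str) -> str:
    """Hash text after collapsing whitespace so layout-only differences still match."""
    normalized = " ".join(text.split())
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()


def filter_new_texts(texts: Iterable[str], seen: Set[str]) -> List[str]:
    """Return the texts whose hash is not yet in `seen`, recording each new hash."""
    fresh: List[str] = []
    for text in texts:
        key = content_key(text)
        if key not in seen:
            seen.add(key)
            fresh.append(text)
    return fresh


seen_hashes: Set[str] = set()
first_upload = filter_new_texts(["Naive Bayes  intro", "Clustering"], seen_hashes)
second_upload = filter_new_texts(["Naive Bayes intro", "Regression"], seen_hashes)
print(first_upload)   # both texts are new
print(second_upload)  # only "Regression" is new; the Naive Bayes text differs only in spacing
```

To survive a runtime restart, the set of hashes would have to be persisted next to the FAISS index. That step is left out here.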

### 8.2 Add Custom URLs

In [None]:
# 🌐 ADD YOUR OWN URLS
# Add any additional ML resources you want to include

def add_custom_urls(urls: List[str]):
    """Scrape and add custom URLs to the knowledge base."""
    if not urls:
        print('⚠️ No URLs provided')
        return 0

    all_docs = []
    print(f'🌐 Scraping {len(urls)} URLs...')

    for url in urls:
        doc = WebScraper.scrape_url(url)
        if doc:
            all_docs.append(doc)
            print(f'   ✅ {url[:50]}...')
        else:
            print(f'   ❌ {url[:50]}...')

    if not all_docs:
        print('⚠️ No content extracted')
        return 0

    # Chunk and add
    chunked = chunker.split_documents(all_docs)
    vector_store.add_documents(chunked)
    vector_store.save_index()

    print(f'\n✅ Added {len(chunked)} chunks from {len(all_docs)} URLs')
    return len(chunked)

# Example - add your own URLs:
# add_custom_urls([
#     'https://your-favorite-ml-blog.com/article',
#     'https://another-resource.com/tutorial'
# ])

## 9. Initialize RAG System

In [21]:
# Try to load existing index
if not vector_store.load_index():
    print("⚠️ No existing index found. Build the knowledge base first (Section 8).")
else:
    print(f"📊 Index loaded with {vector_store.get_document_count()} documents")

✅ Loaded index with 33698 documents
📊 Index loaded with 33698 documents


In [22]:
# Initialize RAG chain
rag_chain = None

if vector_store.is_loaded and config.groq_api_key:
    retriever = vector_store.get_retriever({"k": config.top_k_results})
    rag_chain = RAGChain(
        llm_model=config.llm_model,
        retriever=retriever,
        groq_api_key=config.groq_api_key
    )
    print("✅ RAG chain initialized!")
else:
    if not vector_store.is_loaded:
        print("⚠️ Vector store not loaded. Build knowledge base first.")
    if not config.groq_api_key:
        print("⚠️ Groq API key not set. Please set it in Section 2.")

✅ RAG chain initialized!


## 10. Query the RAG System

In [23]:
def ask(question: str, session_id: str = "default") -> str:
    """Ask a question to the RAG system."""
    if rag_chain is None:
        return "❌ RAG system not initialized."

    print(f"🤔 Question: {question}\n")
    response = rag_chain.query(question, session_id)

    print(f"💡 Answer:\n{response.answer}\n")
    if response.sources:
        print(f"📚 Sources: {', '.join(response.sources)}")

    return response.answer

# Example:
ask("What is machine learning?")

🤔 Question: What is machine learning?

💡 Answer:
According to Tom Mitchell (1998), Machine Learning is the study of algorithms that:

1. Improve their performance P
2. At some task T
3. With experience E.
A well-defined learning task is given by <P, T, E>.

This definition is supported by the concept that "Learning is any process by which a system improves performance from experience" as stated by Herbert Simon.

In simpler terms, machine learning can be understood as a collection of algorithms and techniques to automate data analysis and apply learnings from that analysis to the autonomous execution of relevant tasks.

Source:
- Tom Mitchell (1998)
- Herbert Simon 
- "Some Studies in Machine Learning Using the Game of Checkers" by Arthur L. Samuel (1959)

📚 Sources: https://www.ibm.com/think/topics/machine-learning, Introduction ML.pdf, machine learning and algo.pdf, Python_Machine_Learning_Sebastian_Raschka.pdf


'According to Tom Mitchell (1998), Machine Learning is the study of algorithms that:\n\n1. Improve their performance P\n2. At some task T\n3. With experience E.\nA well-defined learning task is given by <P, T, E>.\n\nThis definition is supported by the concept that "Learning is any process by which a system improves performance from experience" as stated by Herbert Simon.\n\nIn simpler terms, machine learning can be understood as a collection of algorithms and techniques to automate data analysis and apply learnings from that analysis to the autonomous execution of relevant tasks.\n\nSource:\n- Tom Mitchell (1998)\n- Herbert Simon \n- "Some Studies in Machine Learning Using the Game of Checkers" by Arthur L. Samuel (1959)'
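
The `Sources:` line is built from `list(set(sources))` in `RAGChain.query`. A set removes duplicates but does not keep retrieval order, so the same question may list its sources in a different order from one session to the next. A small sketch of an order-preserving alternative, using a hypothetical helper `unique_in_order` that is not part of the notebook:

```python
from typing import Iterable, List


def unique_in_order(items: Iterable[str]) -> List[str]:
    """Drop duplicates while keeping the first occurrence of each item."""
    # dict preserves insertion order, so its keys come back in first-seen order.
    return list(dict.fromkeys(items))


retrieved = [
    "Introduction ML.pdf",
    "https://www.ibm.com/think/topics/machine-learning",
    "Introduction ML.pdf",
]
print(unique_in_order(retrieved))
```

Since the retriever presumably returns chunks ranked by similarity, keeping first-seen order would list the source of the closest chunk first.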

In [None]:
def chat():
    """Interactive chat with the RAG system."""
    print("🤖 ML RAG Assistant (type 'quit' to exit)\n")
    session_id = "interactive"

    while True:
        question = input("You: ").strip()
        if question.lower() in ['quit', 'exit', 'q']:
            print("👋 Goodbye!")
            break
        if not question:
            continue
        ask(question, session_id)
        print()

# Start the interactive session:
chat()

🤖 ML RAG Assistant (type 'quit' to exit)

You: What is ML?
🤔 Question: What is ML?

💡 Answer:
According to the provided context, Machine Learning (ML) is a type of program that can learn to perform a task given examples of data, such as a spam filter that can learn to flag spam emails based on examples of spam and non-spam emails. 

In general, ML involves a task (T), experience (E), and a performance measure (P). The system uses a training set, which is a collection of training instances or samples, to learn and improve its performance. 

As noted in the context, simply having a large amount of data does not constitute ML; the data must be used to learn and improve performance on a specific task. (Source: Provided context, no specific external source cited as the information is from the given context)

📚 Sources: Big Data Analytics  Algorithms - Descriptive Analytics.pdf, Introduction ML.pdf

You: what is America?
🤔 Question: what is America?

💡 Answer:
America can r

KeyboardInterrupt: Interrupted by user
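
The trace above ends with a `KeyboardInterrupt`, which appears to have been raised while `input()` was waiting for the next question. A sketch of a loop that treats Ctrl-C (or end of input) as a normal exit rather than an error follows. `chat_loop`, `answer_fn`, and `read_line` are hypothetical names, not part of the notebook; `read_line` is injectable so the loop can be exercised without a keyboard.

```python
from typing import Callable


def chat_loop(answer_fn: Callable[[str], str],
              read_line: Callable[[str], str] = input) -> int:
    """Prompt until the user quits; return the number of questions answered."""
    answered = 0
    while True:
        try:
            question = read_line("You: ").strip()
        except (KeyboardInterrupt, EOFError):
            # Interrupting the prompt ends the session cleanly.
            print("\nGoodbye!")
            break
        if question.lower() in {"quit", "exit", "q"}:
            print("Goodbye!")
            break
        if not question:
            continue
        print(answer_fn(question))
        answered += 1
    return answered


# Scripted demo: two questions, one blank line, then "quit".
scripted = iter(["What is ML?", "", "What is overfitting?", "quit"])
count = chat_loop(lambda q: f"(answer to: {q})", lambda prompt: next(scripted))
print(f"Answered {count} questions")
```

In the notebook, `answer_fn` could wrap `ask(question, session_id)`.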

### 10.1 Voice Features - Ask with Voice & Listen to Answers

In [27]:
# Initialize TTS for reading answers aloud
from IPython.display import Audio, display

tts_engine = None

def init_tts():
    """Initialize the Text-to-Speech engine."""
    global tts_engine
    if tts_engine is None:
        print('🔄 Loading Text-to-Speech model...')
        tts_engine = TextToSpeech()
        _ = tts_engine.pipe
        print('✅ TTS ready!')
    return tts_engine

def ask_and_speak(question: str, session_id: str = 'voice') -> str:
    """Ask a question and read the answer aloud."""
    if rag_chain is None:
        print('❌ RAG system not initialized.')
        return ''
    print(f'🧐 Question: {question}\n')
    response = rag_chain.query(question, session_id)
    answer = response.answer
    print(f'💡 Answer:\n{answer}\n')
    if response.sources:
        print(f'📚 Sources: {", ".join(response.sources)}')
    print('\n🔊 Converting to speech...')
    tts = init_tts()
    speak_text = answer[:500] + '...' if len(answer) > 500 else answer
    try:
        audio_path = tts.synthesize(speak_text, 'answer_audio.wav')
        print('✅ Audio generated!')
        display(Audio(audio_path, autoplay=True))
    except Exception as e:
        print(f'⚠️ TTS error: {e}')
        print('Answer displayed above (audio unavailable)')
    return answer

print('✅ Voice functions ready!')
print('💡 Use: ask_and_speak("What is machine learning?") to get spoken answers')


✅ Voice functions ready!
💡 Use: ask_and_speak("What is machine learning?") to get spoken answers


In [28]:
# 🎤 VOICE INPUT - Record your question
# This cell lets you record audio and ask questions by voice

from google.colab import output
from IPython.display import Javascript
from base64 import b64decode
import numpy as np
import scipy.io.wavfile as wav

stt_engine = None

def init_stt():
    """Initialize Speech-to-Text engine."""
    global stt_engine
    if stt_engine is None:
        print('🔄 Loading Speech-to-Text model...')
        stt_engine = SpeechToText()
        _ = stt_engine.pipe
        print('✅ STT ready!')
    return stt_engine

def record_and_ask():
    """Record audio from microphone and ask the RAG system."""

    RECORD_JS = """
    const sleep = time => new Promise(resolve => setTimeout(resolve, time));
    const b2text = blob => new Promise(resolve => {
      const reader = new FileReader();
      reader.onloadend = e => resolve(e.srcElement.result);
      reader.readAsDataURL(blob);
    });

    var record = time => new Promise(async resolve => {
      const stream = await navigator.mediaDevices.getUserMedia({ audio: true });
      const recorder = new MediaRecorder(stream);
      const chunks = [];
      recorder.ondataavailable = e => chunks.push(e.data);
      recorder.start();
      await sleep(time);
      recorder.stop();
      await sleep(100);
      const blob = new Blob(chunks, { type: 'audio/webm' });
      const text = await b2text(blob);
      resolve(text);
      stream.getTracks().forEach(track => track.stop());
    });
    """

    try:
        # Inject the recorder first so that `record` exists when eval_js calls it
        display(Javascript(RECORD_JS))
        print('🎤 Recording for 5 seconds... Speak now!')
        data_url = output.eval_js('record(5000)')
    except Exception as e:
        print(f'⚠️ Recording failed: {e}')
        print('⚠️ For voice input, use the Gradio interface (Section 11) which has built-in mic support')
        return None

    # The browser returns a base64 data URL; the audio payload follows the comma
    audio_path = 'recorded_question.webm'
    with open(audio_path, 'wb') as f:
        f.write(b64decode(data_url.split(',')[1]))

    return voice_ask_from_file(audio_path)

def voice_ask_from_file(audio_path: str):
    """Transcribe an audio file and ask the RAG system."""
    if rag_chain is None:
        print('❌ RAG system not initialized.')
        return

    # Transcribe
    print('🎤 Transcribing audio...')
    stt = init_stt()
    question = stt.transcribe(audio_path)
    print(f'📝 You said: {question}')

    # Ask and speak
    return ask_and_speak(question)

print('✅ Voice input functions ready!')
print('💡 Upload an audio file and use: voice_ask_from_file("your_audio.wav")')


✅ Voice input functions ready!
💡 Upload an audio file and use: voice_ask_from_file("your_audio.wav")


In [29]:
# 🔊 TRY IT - Ask a question and hear the answer!

ask_and_speak('What is supervised learning?')

🧐 Question: What is supervised learning?

💡 Answer:
Supervised learning is a type of machine learning paradigm where an algorithm learns to map input data to a specific output based on example input-output pairs. This process involves training a statistical model using labeled data, meaning each piece of input data is provided with the correct output. The goal of supervised learning is for the trained model to accurately predict the output for new, unseen data. (Source: Wikipedia, [1])

In simpler terms, supervised learning works like a teacher or supervisor guiding the machine, where the machine is trained using labeled data (correct answers or classifications) and then uses this training to produce a correct outcome for new, unseen data. (Source: Provided context)

Supervised learning requires labeled data sets, and it is commonly used for regression and classification models. (Source: Provided context)

Example: If you want a model to identify cats in images, supervised learni

✅ TTS model loaded
✅ TTS ready!

✅ Audio generated!


'Supervised learning is a type of machine learning paradigm where an algorithm learns to map input data to a specific output based on example input-output pairs. This process involves training a statistical model using labeled data, meaning each piece of input data is provided with the correct output. The goal of supervised learning is for the trained model to accurately predict the output for new, unseen data. (Source: Wikipedia, [1])\n\nIn simpler terms, supervised learning works like a teacher or supervisor guiding the machine, where the machine is trained using labeled data (correct answers or classifications) and then uses this training to produce a correct outcome for new, unseen data. (Source: Provided context)\n\nSupervised learning requires labeled data sets, and it is commonly used for regression and classification models. (Source: Provided context)\n\nExample: If you want a model to identify cats in images, supervised learning would involve feeding it many images of cats (in
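
`TextToSpeech.synthesize` cuts its input at 400 characters, and `ask_and_speak` trims the answer to 500 characters before that, so only the opening of a long answer like the one above is spoken. One possible alternative, sketched under the assumption that each piece would be synthesized separately and the waveforms joined afterwards, is to split the answer into pieces of at most 400 characters, preferring sentence boundaries. `split_for_tts` is a hypothetical helper and not part of the notebook.

```python
import re
from typing import List


def split_for_tts(text: str, max_chars: int = 400) -> List[str]:
    """Split text into pieces of at most `max_chars`, breaking at sentence ends where possible."""
    sentences = re.split(r"(?<=[.!?])\s+", text.strip())
    pieces: List[str] = []
    current = ""
    for sentence in sentences:
        # A single sentence longer than the limit is hard-wrapped.
        while len(sentence) > max_chars:
            if current:
                pieces.append(current)
                current = ""
            pieces.append(sentence[:max_chars])
            sentence = sentence[max_chars:]
        candidate = f"{current} {sentence}".strip()
        if len(candidate) <= max_chars:
            current = candidate
        else:
            pieces.append(current)
            current = sentence
    if current:
        pieces.append(current)
    return pieces


answer = "Supervised learning uses labeled data. " * 30
parts = split_for_tts(answer)
print(len(parts), max(len(p) for p in parts))
```

Each piece could then go through `synthesize` in turn. Joining the resulting audio is left out because it depends on how the caller wants to store or play the segments.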

## 11. Gradio Web Interface

In [30]:
import gradio as gr

# Text-only chat function
def gradio_query(question, history):
    if rag_chain is None:
        return "❌ RAG system not initialized."
    response = rag_chain.query(question, "gradio_session")
    answer = response.answer
    if response.sources:
        answer += f"\n\n📚 Sources: {', '.join(response.sources)}"
    return answer

# Voice-enabled query function
def gradio_voice_query(audio_input, text_input, enable_tts):
    if rag_chain is None:
        return "❌ RAG system not initialized.", None

    question = text_input

    # If audio provided, transcribe it
    if audio_input is not None:
        try:
            stt = init_stt()
            question = stt.transcribe(audio_input)
        except Exception as e:
            return f"❌ Transcription error: {e}", None

    if not question or not question.strip():
        return "⚠️ Please provide a question (text or voice)", None

    # Get answer
    response = rag_chain.query(question, "gradio_voice")
    answer = f"📝 Question: {question}\n\n💡 Answer:\n{response.answer}"
    if response.sources:
        answer += f"\n\n📚 Sources: {', '.join(response.sources)}"

    # Generate audio if TTS enabled
    audio_output = None
    if enable_tts:
        try:
            tts = init_tts()
            speak_text = response.answer[:500] + '...' if len(response.answer) > 500 else response.answer
            audio_output = tts.synthesize(speak_text, 'gradio_response.wav')
        except Exception as e:
            answer += f"\n\n⚠️ TTS error: {e}"

    return answer, audio_output

# Simple text chat interface
demo_text = gr.ChatInterface(
    fn=gradio_query,
    title="🤖 ML RAG Assistant (Text)",
    description="Ask questions about machine learning!",
    examples=["What is machine learning?", "Explain neural networks", "What is gradient descent?"],
    theme="soft"
)

# Voice-enabled interface
with gr.Blocks(title="🎤 ML RAG Voice Assistant", theme="soft") as demo_voice:
    gr.Markdown("# 🎤 ML RAG Voice Assistant")
    gr.Markdown("Ask questions by voice or text, and optionally hear the answers!")

    with gr.Row():
        with gr.Column():
            audio_input = gr.Audio(sources=["microphone", "upload"], type="filepath", label="🎤 Record or upload your question")
            text_input = gr.Textbox(label="⌨️ Or type your question", placeholder="What is deep learning?")
            enable_tts = gr.Checkbox(label="🔊 Read answer aloud", value=True)
            submit_btn = gr.Button("🚀 Ask", variant="primary")

        with gr.Column():
            text_output = gr.Textbox(label="💡 Answer", lines=10)
            audio_output = gr.Audio(label="🔊 Audio Response", type="filepath")

    submit_btn.click(
        fn=gradio_voice_query,
        inputs=[audio_input, text_input, enable_tts],
        outputs=[text_output, audio_output]
    )

    gr.Examples(
        examples=[[None, "What is machine learning?", True], [None, "Explain backpropagation", True]],
        inputs=[audio_input, text_input, enable_tts]
    )

print("✅ Gradio interfaces created!")
print("💡 For text chat: demo_text.launch(share=True)")
print("🎤 For voice chat: demo_voice.launch(share=True)")
demo_voice.launch(share=True)

✅ Gradio interfaces created!
💡 For text chat: demo_text.launch(share=True)
🎤 For voice chat: demo_voice.launch(share=True)
Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://5d071157fcc30fa947.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)




## 12. FastAPI REST API

In [None]:
from fastapi import FastAPI, File, UploadFile, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import nest_asyncio
nest_asyncio.apply()

class QueryRequest(BaseModel):
    question: str
    session_id: str = "default"

class QueryResponse(BaseModel):
    answer: str
    sources: List[str]

class HealthResponse(BaseModel):
    status: str
    document_count: int
    index_loaded: bool

class UploadResponse(BaseModel):
    message: str
    filename: str
    chunks_added: int

app = FastAPI(title="ML RAG System API", version="1.0.0")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

@app.get("/health", response_model=HealthResponse)
async def health_check():
    return HealthResponse(
        status="healthy" if vector_store.is_loaded else "no_index",
        document_count=vector_store.get_document_count(),
        index_loaded=vector_store.is_loaded
    )

@app.post("/query", response_model=QueryResponse)
async def query_endpoint(request: QueryRequest):
    if rag_chain is None:
        raise HTTPException(status_code=503, detail="RAG system not initialized")
    if not request.question.strip():
        raise HTTPException(status_code=422, detail="Question cannot be empty")
    response = rag_chain.query(request.question, request.session_id)
    return QueryResponse(answer=response.answer, sources=response.sources)

@app.post("/upload", response_model=UploadResponse)
async def upload_pdf_endpoint(file: UploadFile = File(...)):
    if not (file.filename or "").lower().endswith('.pdf'):
        raise HTTPException(status_code=400, detail="Only PDF files supported")
    content = await file.read()
    docs = PDFProcessor.load_pdf_bytes(content, file.filename)
    if not docs:
        raise HTTPException(status_code=400, detail="Could not extract text")
    chunked = chunker.split_documents(docs)
    count = vector_store.add_documents(chunked)
    vector_store.save_index()
    return UploadResponse(message="Success", filename=file.filename, chunks_added=count)

@app.post("/voice-query")
async def voice_query_endpoint(audio: UploadFile = File(...), session_id: str = "voice"):
    if rag_chain is None:
        raise HTTPException(status_code=503, detail="RAG system not initialized")
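    import tempfile  # local import keeps this fix self-contained
    content = await audio.read()
    # Write to a unique temp file instead of a name built from the client-supplied
    # session_id, which could collide across requests or contain path separators.
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
        f.write(content)
        audio_path = f.name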
    try:
        handler = VoiceRAGHandler(rag_chain)
        response = handler.process_voice_query(audio_path, session_id)
        return {"text_response": response.text_response, "sources": response.sources}
    finally:
        if os.path.exists(audio_path):
            os.remove(audio_path)

@app.delete("/session/{session_id}")
async def clear_session(session_id: str):
    if rag_chain:
        rag_chain.clear_session(session_id)
    return {"message": f"Session {session_id} cleared"}

@app.post("/upload-image")
async def upload_image_endpoint(file: UploadFile = File(...)):
    """Upload and process an image file."""
    if not ImageProcessor.is_supported(file.filename):
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported format. Supported: {', '.join(ImageProcessor.SUPPORTED_FORMATS)}"
        )

    content = await file.read()
    docs = ImageProcessor.load_image_bytes(content, file.filename)

    if not docs:
        raise HTTPException(status_code=400, detail="Could not extract text from image")

    chunked = chunker.split_documents(docs)
    count = vector_store.add_documents(chunked)
    vector_store.save_index()

    return UploadResponse(message="Success", filename=file.filename, chunks_added=count)

print("""
=============================================================================
INSTRUCTIONS FOR UPDATING YOUR NOTEBOOK
=============================================================================

1. INSTALLATION CELL:
   - Find the cell with "!pip install -q pypdf"
   - Replace "pypdf" with "PyMuPDF"
   - Add "!pip install -q Pillow" for image processing

2. DOCUMENT INGESTION CELL (Section 5):
   - Replace the entire cell with the code in INGESTION_CELL above
   - This adds PyMuPDF extraction, OCR fallback, and ImageProcessor

3. UPLOAD FUNCTION (Section 8.1):
   - Replace the upload_and_process_pdfs() function with upload_and_process_files()
   - This now supports both PDFs and images

4. API ENDPOINT (Section 12):
   - Add the /upload-image endpoint for image uploads

Key Changes:
- PyMuPDF (fitz) is now the primary PDF extractor (faster, no warnings)
- OCR fallback using TrOCR when text extraction fails or is insufficient
- New ImageProcessor class for processing uploaded images
- Unified upload function that handles both PDFs and images
=============================================================================
""")
print("✅ FastAPI app created")

✅ FastAPI app created


In [32]:
import uvicorn
import threading

def run_api(port: int = 8000, use_ngrok: bool = True):
    """Run the FastAPI server."""
    if use_ngrok:
        try:
            from pyngrok import ngrok
            public_url = ngrok.connect(port)
            print(f"🌐 Public URL: {public_url}")
        except Exception as e:
            print(f"⚠️ ngrok not available: {e}")

    print(f"🚀 Starting API on port {port}...")
    uvicorn.run(app, host="0.0.0.0", port=port)

# Run API: run_api(port=8000, use_ngrok=True)
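
In a notebook, `uvicorn.run` blocks the cell until the server stops, and the cell above imports `threading` without using it. One way to keep the notebook interactive is to start the blocking call in a daemon thread. The following is a minimal sketch, not part of the original notebook: `start_in_background` and `fake_server` are hypothetical names, and the stand-in target lets the example run without a server.

```python
import threading
import time


def start_in_background(target, *args, **kwargs) -> threading.Thread:
    """Run `target(*args, **kwargs)` in a daemon thread and return the thread.

    Hypothetical helper (not in the original notebook). A daemon thread does
    not keep the Python process alive, so it ends when the kernel shuts down.
    """
    thread = threading.Thread(target=target, args=args, kwargs=kwargs, daemon=True)
    thread.start()
    return thread


# Stand-in for a blocking server call, used only to demonstrate the helper.
def fake_server(port: int, started: threading.Event) -> None:
    started.set()      # signal that the "server" is up
    time.sleep(0.1)    # pretend to serve for a moment


started = threading.Event()
server_thread = start_in_background(fake_server, 8000, started)
server_is_up = started.wait(timeout=2)  # True once fake_server has begun
```

In the notebook, the equivalent call would presumably be `start_in_background(run_api, port=8000, use_ngrok=False)`. Whether uvicorn behaves well off the main thread depends on the installed version, so treat that as an assumption to verify.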

## 13. Upload Your Own PDFs

In [None]:
def upload_pdf(pdf_path: str) -> int:
    """Upload and process a PDF file."""
    path = Path(pdf_path)
    if not path.exists():
        print(f"❌ File not found: {pdf_path}")
        return 0

    print(f"📄 Processing {path.name}...")
    docs = PDFProcessor.load_pdf(path)

    if not docs:
        print("❌ No text extracted")
        return 0

    chunked = chunker.split_documents(docs)
    count = vector_store.add_documents(chunked)
    vector_store.save_index()

    print(f"✅ Added {count} chunks from {path.name}")
    return count

# Example: upload_pdf("/path/to/your/file.pdf")
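
To ingest a whole folder rather than one file, the single-file function can be wrapped in a small loop. The sketch below is an illustration, not part of the original notebook: `find_pdfs` and `upload_folder` are hypothetical helpers, and `upload_one` stands for any callable with the same shape as `upload_pdf` (a path string in, a chunk count out).

```python
from pathlib import Path
from typing import Callable, Dict, List


def find_pdfs(directory: str) -> List[Path]:
    """Return PDF files directly inside `directory`, sorted by path.

    The suffix check is case-insensitive, so `notes.PDF` is included.
    A missing or non-directory path yields an empty list.
    """
    root = Path(directory)
    if not root.is_dir():
        return []
    return sorted(
        p for p in root.iterdir()
        if p.is_file() and p.suffix.lower() == ".pdf"
    )


def upload_folder(directory: str, upload_one: Callable[[str], int]) -> Dict[str, int]:
    """Call `upload_one` on every PDF in `directory`; map file name to chunk count."""
    return {pdf.name: upload_one(str(pdf)) for pdf in find_pdfs(directory)}
```

With the function defined above, usage would look like `upload_folder("/path/to/pdfs", upload_pdf)`. Note that `upload_pdf` saves the index after every file, which may be slow for large folders.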

In [None]:
# For Google Colab - upload files interactively
try:
    from google.colab import files

    def upload_from_colab():
        uploaded = files.upload()
        for filename, content in uploaded.items():
            if filename.lower().endswith('.pdf'):  # accept .PDF as well as .pdf
                docs = PDFProcessor.load_pdf_bytes(content, filename)
                if docs:
                    chunked = chunker.split_documents(docs)
                    count = vector_store.add_documents(chunked)
                    vector_store.save_index()
                    print(f"✅ Added {count} chunks from {filename}")

    print("📤 Run upload_from_colab() to upload PDFs")
except ImportError:
    print("ℹ️ Not in Colab - use upload_pdf() instead")

## 14. Quick Start Guide

### Steps:
1. **Set API Key**: Enter your Groq API key in Section 2
2. **Build Knowledge Base**: Run Section 8 to download papers and build the index
3. **Initialize RAG**: Run Section 9 to load the index and create the RAG chain
4. **Ask Questions**: Use the `ask()` function, the Gradio interface, or the REST API

### Get Free API Keys:
- **Groq**: https://console.groq.com (free tier available)
- **HuggingFace**: https://huggingface.co/settings/tokens (optional, for voice)

### API Endpoints:
- `GET /health` - Health check
- `POST /query` - Text query (JSON: {question, session_id})
- `POST /upload` - Upload PDF
- `POST /upload-image` - Upload an image for text extraction
- `POST /voice-query` - Voice query
- `DELETE /session/{id}` - Clear session
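
As an illustration of calling the text endpoint from a client, here is a minimal sketch using only the standard library. The base URL assumes the server was started locally with `run_api(port=8000)`; `build_query_request` and `send` are hypothetical helper names, not part of the notebook.

```python
import json
import urllib.request

BASE_URL = "http://localhost:8000"  # assumption: server started via run_api(port=8000)


def build_query_request(question: str, session_id: str = "default",
                        base_url: str = BASE_URL) -> urllib.request.Request:
    """Build a POST /query request with the JSON body the endpoint expects."""
    payload = json.dumps({"question": question, "session_id": session_id}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/query",
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send(request: urllib.request.Request, timeout: float = 60.0) -> dict:
    """Send the request and decode the JSON response (needs a running server)."""
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))


request = build_query_request("What is gradient descent?", session_id="demo")
# result = send(request)  # per QueryResponse, the keys should be "answer" and "sources"
```

Building the request separately from sending it makes the payload easy to inspect before a server is available.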

### Requirements Covered:
- ✅ arXiv paper ingestion (Req 1.1-1.4)
- ✅ PDF processing (Req 2.1-2.4)
- ✅ Web scraping (Req 3.1-3.4)
- ✅ Text chat with RAG (Req 4.1-4.4)
- ✅ Voice chat (Req 5.1-5.5)
- ✅ REST API (Req 6.1-6.5)
- ✅ Vector store management (Req 7.1-7.4)
- ✅ Configuration management (Req 8.1-8.3)