In [1]:
# @title 1 - INSTALL LIBRARIES
# %pip install fastapi uvicorn python-multipart nest-asyncio langchain-google-genai langchain-community langchain-text-splitters langchain-core sentence-transformers faiss-cpu pymupdf requests beautifulsoup4 numpy torch


In [2]:
# @title MASTER IMPORTS (Unified: Windows 11 + Frontend + RAG Logic)

# --- 1. Standard Library ---
import os
import re
import time
import logging
import functools  # Critical: For @functools.lru_cache (Model caching)
import warnings   # Critical: To suppress warnings
import shutil
import uuid
import tempfile
import threading
import subprocess 
import gc         # Critical: For RAM 'Kill Switch' / Reset functionality
from typing import List, Tuple, Optional, Union, Any, Dict, Set # Critical: 'Set' & 'Optional' were missing
from pathlib import Path
from contextlib import asynccontextmanager

# --- 2. Networking & Async ---
import nest_asyncio
import uvicorn
import requests
import concurrent.futures
from urllib.parse import urljoin, urlparse, urldefrag
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry # Critical: For robust web scraping

# --- 3. Data Processing & ML ---
import numpy as np
import faiss
import fitz  # PyMuPDF
import torch
from bs4 import BeautifulSoup

# --- 4. FastAPI Server ---
from fastapi import FastAPI, UploadFile, File, Form, HTTPException
from fastapi.middleware.cors import CORSMiddleware # Critical: For Frontend connection
from fastapi.concurrency import run_in_threadpool

# --- 5. LangChain & Embeddings ---
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.messages import SystemMessage, HumanMessage
from sentence_transformers import SentenceTransformer



  from .autonotebook import tqdm as notebook_tqdm


In [3]:
# @title 3 -  Suppress warnings, Type Definitions, Constants


# Apply Nest Asyncio for Colab/Jupyter compatibility
# (the notebook kernel already runs an asyncio event loop; nest_asyncio patches asyncio so
# nested loop usage inside the kernel does not fail with "event loop is already running").
nest_asyncio.apply()

# Suppress warnings for cleaner logs
# NOTE(review): this silences *every* Python warning process-wide (including deprecation
# warnings from langchain / torch / faiss). Consider narrowing with category=/module= filters.
warnings.filterwarnings("ignore")
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("MultiRAG-Kernel")

# --- Type Definitions ---
VectorIndex = Any          # FAISS index object (build_index creates a faiss.IndexFlatIP)
Embeddings = np.ndarray    # matrix of chunk embeddings as returned by SentenceTransformer.encode
Chunks = List[str]         # text chunks produced by the splitter in build_index
RAGSession = Tuple[VectorIndex, Embeddings, Chunks]  # what build_index returns and session_store holds

# --- Constants ---
EMBED_MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"
# Expected embedding width for EMBED_MODEL_NAME. Informational only: build_index sizes the
# FAISS index from embeddings.shape[1] rather than reading this constant.
EMBED_DIMENSION = 384
CHUNK_SIZE = 512       # splitter chunk size, in the splitter's length unit (characters with its default length function)
CHUNK_OVERLAP = 64     # overlap between consecutive chunks, same unit as CHUNK_SIZE
LLM_MODEL = "gemini-2.5-flash"  # Gemini model name passed to ChatGoogleGenerativeAI in get_llm
MAX_CRAWL_PAGES = 200  # upper bound on URLs recorded by WebPipeline.crawl
MAX_WORKERS = 15       # thread-pool size and HTTP connection-pool size used by WebPipeline

In [4]:
# @title 4 - Load Embedding Model & GEMINI_API_KEY

# ==============================================================================
# üß† MODEL SINGLETONS
# ==============================================================================
import dotenv

# Load KEY=VALUE pairs from a local .env file (python-dotenv) into os.environ, so get_llm()
# can read GEMINI_API_KEY without the key being hard-coded in the notebook.
dotenv.load_dotenv()
@functools.lru_cache(maxsize=1)
def get_embedding_model() -> SentenceTransformer:
    """Load the sentence-embedding model once and hand out the same instance afterwards.

    ``lru_cache(maxsize=1)`` turns this into a lazy singleton; calling
    ``get_embedding_model.cache_clear()`` drops it so the next call reloads the model.
    The model is placed on CUDA when torch reports a GPU, otherwise on the CPU.
    """
    if torch.cuda.is_available():
        target_device = 'cuda'
    else:
        target_device = 'cpu'
    logger.info(f"‚ö° Loading Embedding Model on: {target_device}")
    model = SentenceTransformer(EMBED_MODEL_NAME, device=target_device)
    return model

@functools.lru_cache(maxsize=1)
def get_llm() -> ChatGoogleGenerativeAI:
    """Build the Gemini chat model once and reuse it for every answer.

    The API key is read from the ``GEMINI_API_KEY`` environment variable and passed to the
    client explicitly, so it does not matter which env-var name the installed
    langchain-google-genai version looks for by default.

    Raises:
        ValueError: if ``GEMINI_API_KEY`` is unset or empty. ``lru_cache`` does not cache
            exceptions, so the call can be retried after the key has been set.
    """
    # os.environ.get() returns None instead of raising, so a missing key must be detected
    # explicitly (the previous try/except could never trigger and the later
    # os.environ[...] = None assignment crashed with TypeError instead).
    api_key = os.environ.get("GEMINI_API_KEY")
    if not api_key:
        raise ValueError("‚ùå GEMINI_API_KEY not found! Set it in Colab Secrets or Environment.")

    return ChatGoogleGenerativeAI(
        model=LLM_MODEL,
        google_api_key=api_key,
        temperature=0.3,
        convert_system_message_to_human=True,
    )




In [5]:
# @title 5 - üßπ TEXT PROCESSING & CLEANING UTILS

# ==============================================================================
# üßπ UNIFIED TEXT PROCESSOR (Restoring User's Specific Cleaning Logic)
# ==============================================================================

class TextProcessor:
    """
    Normalizes raw extracted text (PDF pages, HTML ``get_text`` output, plain-text files)
    and rejects results that are empty, look like a 404 page, or are too short to index.

    Consolidated logic from the user's 'TextCleaner' and '_clean_text_fast'.
    """

    def __init__(self, min_length: int = 50):
        """
        Args:
            min_length: Cleaned text shorter than this many characters is rejected
                (``clean`` returns None). Defaults to 50.
        """
        self.min_length = min_length
        # 0. Windows (\r\n) and old-Mac (\r) line endings -> \n so the newline rules below see them
        self.line_endings = re.compile(r'\r\n?')
        # 1. Collapse multiple spaces/tabs into one space
        self.multi_space = re.compile(r'[ \t]+')
        # 2. Drop spaces at the end of a line so whitespace-only lines can collapse in step 3
        #    (HTML get_text(separator='\n') produces many "\n \n" runs)
        self.trailing_space = re.compile(r' +\n')
        # 3. Collapse 3+ newlines into 2 (preserves paragraph structure)
        self.multi_newline = re.compile(r'\n{3,}')
        # 4. Remove empty brackets often left behind by removed links/citations
        self.empty_brackets = re.compile(r'\[\s*\]')
        # 5. Critical Filters (Fast Fail)
        self.error_404 = re.compile(r"404: This page could not be found", re.IGNORECASE)

    def clean(self, text: str) -> Optional[str]:
        """Return the normalized text, or None if it should not be indexed.

        None is returned for empty input, text containing the 404 marker, or text shorter
        than ``min_length`` after cleaning.
        """
        if not text:
            return None

        # Fast Fail: Check for 404s immediately
        if self.error_404.search(text):
            return None

        # 1. Unify line endings first so every later rule only has to handle '\n'
        cleaned_text = self.line_endings.sub("\n", text)
        # 2. Normalize Unicode non-breaking spaces (must precede step 3: '\xa0' is non-printable)
        cleaned_text = cleaned_text.replace('\xa0', ' ')
        # 3. Drop non-printable characters (control chars, zero-width chars, ...) but keep
        #    newlines and tabs; tabs are folded into spaces in step 5. str.isprintable() is
        #    Unicode-aware, so accented letters, CJK and emoji are KEPT (this is not an ASCII filter).
        #    Doing this before the collapsing steps lets gaps left by removed chars collapse too.
        cleaned_text = "".join(c for c in cleaned_text if c.isprintable() or c in '\n\t')
        # 4. Remove empty brackets
        cleaned_text = self.empty_brackets.sub("", cleaned_text)
        # 5. Collapse multiple spaces within lines
        cleaned_text = self.multi_space.sub(" ", cleaned_text)
        # 6. Strip trailing spaces on each line
        cleaned_text = self.trailing_space.sub("\n", cleaned_text)
        # 7. Collapse excessive newlines
        cleaned_text = self.multi_newline.sub("\n\n", cleaned_text)

        cleaned_text = cleaned_text.strip()

        # Quality Gate
        if len(cleaned_text) < self.min_length:
            return None

        return cleaned_text

# Global singleton for processor
text_processor = TextProcessor()

In [6]:
# @title 6 - Extracts text from PDF

def process_pdf(path: str) -> Optional[str]:
    """
    Extract and clean the text of a PDF using PyMuPDF (fitz).

    Pages are read one at a time so a single corrupt page is skipped (with a warning)
    instead of aborting the whole document. The document is opened in a ``with`` block
    so the file handle is released even when extraction fails.

    Args:
        path: Filesystem path of the PDF.

    Returns:
        Cleaned text (via ``text_processor.clean``), or None if the file cannot be opened,
        contains no extractable text (e.g. scanned images only), or fails the cleaner's checks.
    """
    try:
        text_parts = []
        with fitz.open(path) as doc:
            # Safe iteration to skip bad pages (fixes 'object out of range' error)
            for i in range(len(doc)):
                try:
                    page = doc.load_page(i)
                    page_text = page.get_text("text", sort=True)
                except Exception as e:
                    logger.warning(f"Skipping corrupt page {i} in {path}: {e}")
                    continue
                # Ignore blank pages so an image-only PDF is reported as "no readable text"
                if page_text.strip():
                    text_parts.append(page_text)

        if not text_parts:
            logger.warning(f"No readable text found in {path}")
            return None

        full_text = "\n".join(text_parts)
        return text_processor.clean(full_text)
    except Exception as e:
        logger.error(f"PDF processing failed: {e}")
        return None

In [7]:
# @title 7 - Extracts meaningful content from a URL

# ==============================================================================
# üï∏Ô∏è ADVANCED PIPELINE
# ==============================================================================

class WebPipeline:
    """Same-site crawler + scraper that turns a website section into cleaned text.

    Usage is two-phase:

    1. :meth:`crawl` discovers URLs under ``base_url`` concurrently and stops once
       ``MAX_CRAWL_PAGES`` URLs have been recorded.
    2. :meth:`scrape_contents` downloads each discovered URL and returns one
       ``"Source: <url>\\n\\n<text>"`` string per page that survived cleaning.

    One ``requests.Session`` (connection pool sized to ``MAX_WORKERS``) is shared by all
    worker threads. The caller owns its lifetime and should call ``pipeline.session.close()``
    when finished.
    """

    def __init__(self, base_url: str):
        self.base_url = base_url
        # Bare-domain roots ("https://site.com") get a trailing slash so the prefix check in
        # is_valid() cannot match look-alike hosts such as "https://site.com.evil.org".
        if not self.base_url.endswith('/') and len(urlparse(self.base_url).path) <= 1:
            self.base_url += '/'

        self.max_urls = MAX_CRAWL_PAGES
        self.max_workers = MAX_WORKERS
        self.visited_urls = set()
        # Guards visited_urls while crawl() records newly discovered links.
        self.lock = threading.Lock()

        # --- USER'S FILTER LOGIC ---
        self.ignored_extensions = {
            '.jpg', '.jpeg', '.png', '.gif', '.svg', '.bmp', '.webp',
            '.mp4', '.mp3', '.avi', '.mov', '.pdf', '.doc', '.docx',
            '.zip', '.rar', '.exe', '.css', '.js', '.json', '.xml', '.ico'
        }
        # Matched as whole tokens (optionally plural) against the part of the URL below
        # base_url; see is_valid().
        self.ignored_keywords = [
            'login', 'signin', 'sign-up', 'signup', 'register',
            'privacy', 'terms', 'logout', 'ads', 'subscribe', 'account'
        ]

        # --- USER'S CLEANING TAGS ---
        self.garbage_tags = {
            'script', 'style', 'noscript', 'svg', 'header', 'footer',
            'nav', 'aside', 'form', 'iframe', 'button', 'input',
            'select', 'textarea', 'meta', 'link'
        }

        # --- USER'S SESSION SETUP ---
        self.session = requests.Session()
        adapter = HTTPAdapter(pool_connections=self.max_workers, pool_maxsize=self.max_workers)
        self.session.mount('http://', adapter)
        self.session.mount('https://', adapter)
        self.session.headers.update({'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'})

    def is_valid(self, url: str) -> bool:
        """Return True if ``url`` should be crawled/scraped.

        Rejects URLs outside ``base_url``, URLs whose path ends in an ignored (non-HTML)
        extension, and URLs containing an ignored keyword as a whole token in the part
        *below* ``base_url``. Token matching (instead of plain substring search over the
        whole URL) avoids false positives such as 'ads' in '/uploads' or '/threads', and
        keeps a keyword that happens to appear in the host or crawl root from excluding
        the entire site.
        """
        if not url.startswith(self.base_url):
            return False
        path = urlparse(url).path.lower()
        if any(path.endswith(ext) for ext in self.ignored_extensions):
            return False
        remainder = url[len(self.base_url):].lower()
        for keyword in self.ignored_keywords:
            # Keyword bounded by non-alphanumerics (or string edges); a trailing 's' also counts.
            pattern = rf"(?<![a-z0-9]){re.escape(keyword)}s?(?![a-z0-9])"
            if re.search(pattern, remainder):
                return False
        return True

    def fetch_links(self, url: str) -> List[str]:
        """Fetch ``url`` and return its valid, fragment-free, same-site links.

        Returns an empty list for non-HTML responses, HTTP error responses, or any
        network/parsing error (best-effort discovery).
        """
        try:
            response = self.session.get(url, timeout=5)
            # Error pages (404/500/...) are not content; don't follow their navigation links.
            if not response.ok:
                return []
            if 'text/html' not in response.headers.get('Content-Type', '').lower():
                return []
            soup = BeautifulSoup(response.content, "html.parser")
            found_links = []
            for link in soup.find_all('a', href=True):
                absolute_url = urljoin(url, link['href'])
                clean_url, _ = urldefrag(absolute_url)
                # Normalize ".../page/" and ".../page" to one form to avoid duplicate visits.
                if clean_url.endswith('/'):
                    clean_url = clean_url[:-1]
                if self.is_valid(clean_url):
                    found_links.append(clean_url)
            return found_links
        except Exception as e:
            logger.debug(f"Link discovery failed for {url}: {e}")
            return []

    def crawl(self) -> Set[str]:
        """Phase 1: Discovery.

        Starting from ``base_url``, concurrently fetch pages and follow valid links until no
        work remains or ``max_urls`` URLs have been recorded. Returns ``self.visited_urls``
        (the same set object). It may include URLs whose own link fetch was cancelled
        once the budget was reached.
        """
        logger.info(f"üî• Phase 1: Crawling {self.base_url}")
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = {executor.submit(self.fetch_links, self.base_url): self.base_url}
            self.visited_urls.add(self.base_url)
            while futures:
                done, _ = concurrent.futures.wait(futures.keys(), return_when=concurrent.futures.FIRST_COMPLETED)
                for future in done:
                    futures.pop(future)
                    try:
                        new_links = future.result()
                        for link in new_links:
                            with self.lock:
                                # Page budget exhausted: stop queueing links from this page.
                                if len(self.visited_urls) >= self.max_urls:
                                    break
                                if link not in self.visited_urls:
                                    self.visited_urls.add(link)
                                    futures[executor.submit(self.fetch_links, link)] = link
                    except Exception as e:
                        logger.debug(f"Crawl worker failed: {e}")
                with self.lock:
                    if len(self.visited_urls) >= self.max_urls:
                        # Drop queued fetches; the executor still waits for in-flight ones on exit.
                        for f in futures:
                            f.cancel()
                        break
        logger.info(f"‚úÖ Found {len(self.visited_urls)} URLs.")
        return self.visited_urls

    def extract_and_clean(self, html_content: bytes, url: str) -> Optional[str]:
        """Phase 2 Logic: DOM Cleaning + TextProcessing.

        Removes boilerplate tags (``garbage_tags``), extracts text with one element per line,
        and runs it through ``text_processor.clean``. Returns the text prefixed with a
        ``Source: <url>`` header, or None if nothing usable is left.
        """
        if not html_content:
            return None
        soup = BeautifulSoup(html_content, "html.parser")

        # 1. Structural Cleaning (DOM Level)
        for tag in soup.find_all(self.garbage_tags):
            tag.decompose()

        raw_text = soup.get_text(separator='\n')

        # 2. Semantic Cleaning via Unified Processor
        final_text = text_processor.clean(raw_text)

        if final_text:
            return f"Source: {url}\n\n{final_text}"
        return None

    def scrape_contents(self, urls: Set[str]) -> List[str]:
        """Download and clean every URL; return the texts of pages that produced content.

        Failed requests, HTTP error responses and non-HTML responses are skipped. URLs are
        processed in sorted order so the resulting corpus has a deterministic order.
        """
        logger.info(f"üî• Phase 2: Extracting content from {len(urls)} URLs")
        results = []

        def process(url: str) -> Optional[str]:
            try:
                resp = self.session.get(url, timeout=10)
                # Don't index the body of error pages (404/500/...).
                if not resp.ok:
                    return None
                if 'text/html' not in resp.headers.get('Content-Type', '').lower():
                    return None
                return self.extract_and_clean(resp.content, url)
            except Exception as e:
                logger.debug(f"Content extraction failed for {url}: {e}")
                return None

        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # executor.map yields results in input order, so sorted input -> deterministic output.
            for data in executor.map(process, sorted(urls)):
                if data:
                    results.append(data)

        logger.info(f"‚úÖ Extracted valid text from {len(results)} pages.")
        return results


In [8]:
def process_url_pipeline(url: str) -> str:
    """Crawl and scrape the site section at ``url`` into one corpus string.

    Pages are joined with a ``=== NEW PAGE ===`` marker, which build_index's splitter uses
    as its highest-priority separator. Returns "" when no page yielded usable text. The
    pipeline's HTTP session is always closed, so pooled connections are not leaked
    across requests.
    """
    pipeline = WebPipeline(base_url=url)
    try:
        urls = pipeline.crawl()
        docs = pipeline.scrape_contents(urls)
    finally:
        # Release the pooled HTTP connections held by the pipeline's requests.Session.
        pipeline.session.close()
    if not docs:
        return ""
    return "\n\n=== NEW PAGE ===\n\n".join(docs)

In [9]:
# @title 10 - Ingestion Pipeline: Split -> Embed -> Index

def build_index(text: str) -> RAGSession:
    """
    Turn a corpus into a searchable session: chunk it, embed the chunks, index them.

    The splitter prefers page boundaries (the ``=== NEW PAGE ===`` marker), then
    paragraphs, lines, sentences and words. Embeddings are L2-normalized, so the
    inner-product index (``faiss.IndexFlatIP``) ranks chunks by cosine similarity.

    Args:
        text: The full corpus text.

    Returns:
        Tuple (FAISS Index, Embeddings Array, Chunk List)

    Raises:
        ValueError: if splitting produced no chunks (e.g. empty text).

    >>> idx, embs, chks = build_index("Test content for the RAG engine.")
    >>> len(chks) > 0
    True
    """
    started = time.perf_counter()

    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP,
        separators=["\n\n=== NEW PAGE ===\n\n", "\n\n", "\n", ".", " ", ""],
    )
    chunks = text_splitter.split_text(text)
    if not chunks:
        raise ValueError("Text splitting resulted in empty chunks.")

    # Unit-length vectors: inner product == cosine similarity.
    vectors = get_embedding_model().encode(chunks, convert_to_numpy=True, normalize_embeddings=True)

    index = faiss.IndexFlatIP(vectors.shape[1])
    index.add(vectors)

    elapsed = time.perf_counter() - started
    logger.info(f"Built index with {len(chunks)} chunks in {elapsed:.4f}s")
    return index, vectors, chunks

In [10]:
# @title 11 - Retrieves context and prompts the LLM

def generate_answer(query: str, session_data: tuple, top_k: int = 5) -> str:
    """
    Answer ``query`` from a session's indexed chunks (retrieval-augmented generation).

    The query is embedded with normalization (so inner product == cosine similarity).
    The ``top_k`` most similar chunks are fetched from the FAISS index, and the LLM is
    asked to answer using only that context.

    Args:
        query: User question. A non-string or blank value yields "Please ask a question."
        session_data: Tuple containing (FAISS index, Embeddings, Chunks), as built by build_index.
        top_k: Number of chunks to retrieve; capped at the number of chunks. Should be positive.

    Returns:
        The model's answer text, or a human-readable message for invalid session data,
        an empty question, an empty corpus, or an LLM failure. LLM errors are returned
        as a string rather than raised.
    """
    try:
        index, _, chunks = session_data
    except (TypeError, ValueError):
        # TypeError: not iterable (e.g. None); ValueError: wrong number of elements.
        return "Error: Session data is corrupted or invalid."

    # The /chat endpoint forwards payload.get("query"), which may be None or a non-string.
    if not isinstance(query, str) or not query.strip():
        return "Please ask a question."

    # Check the corpus size before paying for the query embedding.
    k = min(top_k, len(chunks))
    if k <= 0:
        return "No content available to answer from."

    # 1. Embed Query (normalized for Cosine Similarity via Inner Product)
    model = get_embedding_model()
    q_embed = model.encode([query], convert_to_numpy=True, normalize_embeddings=True)

    # 2. Retrieve. FAISS pads missing results with id -1, which would silently index
    #    chunks[-1]; keep only ids that are real chunk positions.
    _, neighbor_ids = index.search(q_embed, k)
    retrieved_context = [chunks[i] for i in neighbor_ids[0] if 0 <= i < len(chunks)]

    # 3. Context Construction
    context_str = "\n---\n".join(retrieved_context)

    # 4. Prompt Engineering
    system_instruction = (
        "You are a precision AI assistant. Use only the provided context to answer the user's question. "
        "If the answer is not in the context, say 'I cannot find that information in the provided documents.' "
        "Avoid hallucination. Keep answers concise and technical."
    )

    full_prompt = (
        f"Context:\n{context_str}\n\n"
        f"Question: {query}\n\n"
        f"Answer:"
    )

    try:
        llm = get_llm()
        # System instruction and user prompt are sent as separate chat messages.
        response = llm.invoke([
            SystemMessage(content=system_instruction),
            HumanMessage(content=full_prompt)
        ])
        return response.content
    except Exception as e:
        logger.error(f"LLM Generation failed: {e}")
        return f"Error generating response: {str(e)}"

In [None]:
# @title 12 - FASTAPI SERVER (Windows 11 & Frontend Optimized)

# ------------------------------------------------------------------------------
# 3. FASTAPI SERVER
# ------------------------------------------------------------------------------


PORT = 8000
HOST = "127.0.0.1"  # loopback only: the API is reachable from this machine, not from the network
# session_id (uuid4 string) -> (faiss index, embeddings, chunks); filled by /upload and /scrape.
# NOTE(review): entries are only removed by /reset or on shutdown, so memory grows with every
# ingestion. Consider an eviction policy if the server is long-lived.
session_store: Dict[str, RAGSession] = {}

@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan hook: log on startup; on shutdown drop all sessions and run GC."""
    print(f"üöÄ Server starting at http://{HOST}:{PORT}")
    yield
    print("üõë Server shutting down")
    session_store.clear()
    gc.collect() # Clean RAM on shutdown

app = FastAPI(lifespan=lifespan)

# --- 1. ENABLE CORS (Required for your HTML/JS to connect) ---
# NOTE(review): allow_origins=["*"] combined with allow_credentials=True is broader than this
# frontend appears to need (nothing in this file uses cookies or auth headers). Consider
# allow_credentials=False or an explicit origin list so arbitrary web pages cannot make
# credentialed requests to this local server.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Allows local index.html to connect
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

@app.get("/")
def health_check():
    """Liveness probe: reports that the API is up, whether CUDA is usable, and how many
    ingestion sessions are currently held in memory."""
    gpu_available = torch.cuda.is_available()
    active_sessions = len(session_store)
    return {"status": "running", "gpu": gpu_available, "sessions": active_sessions}

# --- 2. MATCHED FRONTEND ENDPOINT: /upload (was /ingest/file) ---
@app.post("/upload")
async def ingest_file(file: UploadFile = File(...)):
    """Index an uploaded document and open a new chat session for it.

    Supported: ``.pdf`` (via process_pdf) and the plain-text formats
    ``.txt/.md/.py/.json/.csv`` (read as UTF-8, undecodable bytes dropped).

    Returns:
        ``{"session_id", "status", "token_count"}`` in the shape the frontend expects.
        ``token_count`` is a rough display estimate (100 per chunk), not a real token count.

    Raises:
        HTTPException 400: unsupported extension, or no extractable text.
        HTTPException 500: any other failure during extraction or indexing.
    """
    text_suffixes = (".txt", ".md", ".py", ".json", ".csv")
    session_id = str(uuid.uuid4())
    # UploadFile.filename may be None when the client omits it; treat that as "no extension".
    suffix = Path(file.filename or "").suffix.lower()

    # Reject unsupported types before writing anything to disk.
    if suffix != ".pdf" and suffix not in text_suffixes:
        raise HTTPException(status_code=400, detail=f"Unsupported file type: {suffix}")

    def read_text_file(path: str) -> Optional[str]:
        # Blocking file read; executed in the threadpool below.
        with open(path, 'r', encoding='utf-8', errors='ignore') as f:
            return text_processor.clean(f.read())

    tmp_path = None
    try:
        # Spool the upload to a real file because process_pdf takes a filesystem path. The
        # handle is closed (end of `with`) before processing so the file can be reopened and
        # later removed on Windows. The copy runs in a worker thread to keep the event loop free.
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
            tmp_path = tmp.name
            await run_in_threadpool(shutil.copyfileobj, file.file, tmp)

        if suffix == ".pdf":
            text = await run_in_threadpool(process_pdf, tmp_path)
        else:
            text = await run_in_threadpool(read_text_file, tmp_path)

        if not text or not text.strip():
            raise HTTPException(status_code=400, detail="Could not extract text.")

        # Build Index
        index, emb, chunks = await run_in_threadpool(build_index, text)
        session_store[session_id] = (index, emb, chunks)

        # MATCH FRONTEND JSON FORMAT
        return {
            "session_id": session_id,
            "status": "indexed",
            "token_count": len(chunks) * 100  # Approx token count for display
        }

    except HTTPException:
        # Let deliberate 4xx responses through unchanged instead of re-wrapping them as 500s.
        raise
    except Exception as e:
        logger.error(f"File ingestion failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        # ROM CLEANUP: always delete the temp copy, including on failure
        if tmp_path and os.path.exists(tmp_path):
            os.remove(tmp_path)

# --- 3. MATCHED FRONTEND ENDPOINT: /scrape (was /ingest/url) ---
@app.post("/scrape")
async def ingest_url(url: str = Form(..., alias="url")):  # read from a form field named 'url' (urlencoded/multipart body), not JSON
    """Crawl a website section, index its text, and open a new chat session for it.

    Returns:
        ``{"session_id", "status", "token_count"}`` in the shape the frontend expects.

    Raises:
        HTTPException 400: ``url`` is not an absolute http(s) URL, or no content was found.
        HTTPException 500: any other failure during crawling or indexing.
    """
    session_id = str(uuid.uuid4())
    url = url.strip()
    # The crawler issues HTTP requests to this user-supplied address; only allow http(s).
    parsed = urlparse(url)
    if parsed.scheme not in ("http", "https") or not parsed.netloc:
        raise HTTPException(status_code=400, detail="URL must be an absolute http(s) URL.")

    try:
        # Crawling/scraping is blocking network I/O: run it off the event loop.
        text = await run_in_threadpool(process_url_pipeline, url)

        if not text or not text.strip():
            raise HTTPException(status_code=400, detail="No content found at this URL.")

        index, emb, chunks = await run_in_threadpool(build_index, text)
        session_store[session_id] = (index, emb, chunks)

        # MATCH FRONTEND JSON FORMAT
        return {
            "session_id": session_id,
            "status": "indexed",
            "token_count": len(chunks) * 100  # Approx token count for display
        }
    except HTTPException:
        # Let deliberate 4xx responses through unchanged instead of re-wrapping them as 500s.
        raise
    except Exception as e:
        logger.error(f"URL Pipeline failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))

# --- 4. MATCHED FRONTEND ENDPOINT: /chat ---
@app.post("/chat")
async def chat(payload: dict):
    """Answer a question against a previously ingested session.

    Expects JSON ``{"session_id": "...", "query": "..."}`` and returns
    ``{"response": <answer text>}`` (the key the frontend reads).

    Raises:
        HTTPException 404: missing, non-string, or unknown ``session_id``.
    """
    session_id = payload.get("session_id")
    query = payload.get("query")
    # A missing or non-string query becomes "" so generate_answer replies "Please ask a question."
    # instead of crashing on .strip().
    if not isinstance(query, str):
        query = ""

    # Fetch the session once (no check-then-index): a concurrent /reset could otherwise clear
    # the store in between and turn this into a KeyError/500. Non-string ids (e.g. a JSON list,
    # which is unhashable) are treated as "not found".
    session_data = session_store.get(session_id) if isinstance(session_id, str) else None
    if session_data is None:
        raise HTTPException(status_code=404, detail="Session not found. Ingest data first.")

    answer = await run_in_threadpool(generate_answer, query, session_data)

    # MATCH FRONTEND KEY: 'response' instead of 'answer'
    return {"response": answer}

# --- 5. NEW ENDPOINT: /reset (RAM CLEANER) ---
@app.post("/reset")
def reset_memory():
    """
    Free as much memory as possible without restarting the kernel.

    Drops every ingestion session and the cached embedding model / LLM client (they are
    rebuilt lazily on next use). It then runs Python's garbage collector and, when CUDA
    is available, releases cached GPU memory.
    """
    # session_store is mutated in place, never rebound, so no `global` statement is needed.
    session_store.clear()

    # Both factories are lru_cache singletons; clearing them lets the models be collected.
    for cached_factory in (get_embedding_model, get_llm):
        cached_factory.cache_clear()

    gc.collect()

    if torch.cuda.is_available():
        torch.cuda.empty_cache()
        torch.cuda.ipc_collect()

    print("üßπ System Memory Purged (RAM + VRAM)")
    return {"status": "memory_cleared", "device": "clean"}

def kill_port(port, host: str = "127.0.0.1") -> bool:
    """Check, in a cross-platform way, whether ``port`` is free before the server binds to it.

    Despite its name, this never terminates another process. ``fuser``-style killing is
    not available on Windows, so the port is only *probed* by trying to bind to it. When
    the port is taken (typically by a server from an earlier cell run), a warning is printed.

    Args:
        port: TCP port to probe.
        host: Interface to probe. Defaults to the loopback address the server binds to.

    Returns:
        True if the port could be bound (appears free), False otherwise.
    """
    import socket

    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        probe.bind((host, port))
        return True
    except OSError as e:
        print(f"Warning: port {port} on {host} appears to be in use ({e}). "
              "Stop the previous server or restart the kernel.")
        return False
    finally:
        probe.close()

def start_server_thread(startup_timeout: float = 3.0):
    """Run the FastAPI app under uvicorn in a daemon thread so the notebook stays usable.

    Instead of always sleeping a fixed 3 seconds, this waits until one of three things
    happens: uvicorn reports that startup finished (``server.started``), the server thread
    dies (e.g. the port is already in use), or ``startup_timeout`` seconds pass.

    Args:
        startup_timeout: Maximum number of seconds to wait for startup. Defaults to 3.

    Returns:
        The ``uvicorn.Server`` instance. Set ``server.should_exit = True`` to stop it.
    """
    kill_port(PORT)
    config = uvicorn.Config(app, host=HOST, port=PORT, log_level="warning")
    server = uvicorn.Server(config)

    def run_server():
        try:
            server.run()
        except SystemExit:
            # uvicorn calls sys.exit() on fatal startup errors (e.g. the port is taken);
            # keep that from surfacing as an unhandled exception in this thread.
            pass
        except Exception as e:
            print(f"Server error: {e}")

    t = threading.Thread(target=run_server, daemon=True)
    t.start()
    print(f"üöÄ Server started (Background)... waiting up to {startup_timeout:g}s")

    # Poll for readiness; stop early if the thread has already died.
    deadline = time.monotonic() + startup_timeout
    while not server.started and t.is_alive() and time.monotonic() < deadline:
        time.sleep(0.05)

    if not server.started:
        print(f"Warning: server did not report startup within {startup_timeout:g}s; "
              f"is port {PORT} already in use?")
    return server

# Inside a notebook kernel __name__ is "__main__", so running this cell launches the API
# server in a background thread and then returns control to the notebook.
if __name__ == "__main__":
    start_server_thread()
    print("‚úÖ BACKEND READY. Open 'index.html' in your browser now.")

üöÄ Server started (Background)... waiting 3s
üöÄ Server starting at http://127.0.0.1:8000
‚úÖ BACKEND READY. Open 'index.html' in your browser now.


INFO:MultiRAG-Kernel:‚ö° Loading Embedding Model on: cpu
INFO:sentence_transformers.SentenceTransformer:Load pretrained SentenceTransformer: sentence-transformers/all-MiniLM-L6-v2
Batches: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 12/12 [00:07<00:00,  1.54it/s]
INFO:MultiRAG-Kernel:Built index with 377 chunks in 15.0271s
Batches: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 1/1 [00:00<00:00, 44.98it/s]
Batches: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 1/1 [00:00<00:00, 45.18it/s]


üßπ System Memory Purged (RAM + VRAM)


INFO:MultiRAG-Kernel:üî• Phase 1: Crawling https://www.promptingguide.ai/
INFO:MultiRAG-Kernel:‚úÖ Found 139 URLs.
INFO:MultiRAG-Kernel:üî• Phase 2: Extracting content from 139 URLs
INFO:MultiRAG-Kernel:‚úÖ Extracted valid text from 136 pages.
INFO:MultiRAG-Kernel:‚ö° Loading Embedding Model on: cpu
INFO:sentence_transformers.SentenceTransformer:Load pretrained SentenceTransformer: sentence-transformers/all-MiniLM-L6-v2
Batches: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 62/62 [00:32<00:00,  1.92it/s]
INFO:MultiRAG-Kernel:Built index with 1971 chunks in 36.9590s
Batches: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 1/1 [00:00<00:00, 25.47it/s]
Batches: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 1/1 [00:00<00:00, 25.31it/s]
Batches: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 1/1 [00:00<00:00, 30.07it/s]


üßπ System Memory Purged (RAM + VRAM)
