<a href="https://colab.research.google.com/github/Champei/code-2.0/blob/main/RAG/jupyter.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [6]:
!pip install sentence-transformers faiss-cpu numpy transformers torch openai streamlit




In [7]:
import os, re, time, pickle, json
from pathlib import Path
import numpy as np
import faiss
from sentence_transformers import SentenceTransformer
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional, Tuple



In [3]:
"""
rag_chatbot.py

A compact Retrieval-Augmented Generation (RAG) chatbot with:
 - Document ingestion + chunking
 - Sentence-transformers embeddings
 - FAISS vector store with metadata
 - Retriever (configurable k; cosine similarity via inner product on normalized vectors)
 - Generator (OpenAI or local HF model)
 - Context-aware memory (keeps recent turns and past-retrievals)
 - Citation support: every response returns top sources + snippets

Usage:
  - pip install -r requirements.txt
  - Provide OPENAI_API_KEY in env if using OpenAI generator
  - Run demo at bottom or integrate into Streamlit/Gradio
"""

import os
import re
import pickle
import json
import time
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass, field

import numpy as np
import faiss
from sentence_transformers import SentenceTransformer
from pathlib import Path

# Optional: OpenAI for generation
# OPENAI_AVAILABLE records whether the import succeeded; Generator checks it
# before touching the `openai` module.
try:
    import openai
    OPENAI_AVAILABLE = True
except Exception:
    # Any import-time failure (not just ImportError) disables the OpenAI backend.
    OPENAI_AVAILABLE = False

# Optional: local HF model for generation
# HF_AVAILABLE records whether transformers could be imported; Generator checks
# it before building a text-generation pipeline.
try:
    from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM
    HF_AVAILABLE = True
except Exception:
    # Any import-time failure disables the local HuggingFace backend.
    HF_AVAILABLE = False

# -----------------------
# Utilities
# -----------------------

def simple_text_clean(text: str) -> str:
    """Flatten *text* onto one line with single-space separators.

    Carriage returns and newlines become spaces, leading/trailing whitespace
    is stripped, and every remaining run of whitespace collapses to one space.
    """
    line_breaks_to_space = {ord("\r"): " ", ord("\n"): " "}
    single_line = text.translate(line_breaks_to_space).strip()
    return re.sub(r"\s+", " ", single_line)

def chunk_text(text: str, chunk_size: int = 800, overlap: int = 100) -> List[str]:
    """Split *text* into overlapping character windows.

    The text is first normalized with ``simple_text_clean``. Text that fits in
    one window is returned as a single chunk (an empty text yields ``[""]``).

    Args:
        text: raw document text.
        chunk_size: maximum number of characters per chunk; must be positive.
        overlap: number of characters shared between consecutive chunks; must
            satisfy ``0 <= overlap < chunk_size``.

    Returns:
        The chunks in document order; the last chunk may be shorter.

    Raises:
        ValueError: if ``chunk_size`` or ``overlap`` is out of range. An
            overlap >= chunk_size would never advance the window (infinite
            loop) and a negative overlap would silently skip text.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    if overlap < 0 or overlap >= chunk_size:
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")

    text = simple_text_clean(text)
    if len(text) <= chunk_size:
        return [text]

    # Each window starts `step` characters after the previous one, so
    # consecutive windows share exactly `overlap` characters.
    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            # This window already reaches the end of the text.
            break
    return chunks

# -----------------------
# Data classes
# -----------------------

@dataclass
class DocChunk:
    """One chunk of a source document, built during ingestion."""
    id: str  # chunk identifier; add_documents builds it as "<doc_id>__chunk<i>"
    text: str  # the chunk's text
    metadata: Dict[str, Any] = field(default_factory=dict)  # per-chunk metadata; a fresh dict per instance
    embedding: Optional[np.ndarray] = None  # optional vector; add_documents leaves this as None

# -----------------------
# Document Store + Indexing
# -----------------------

class DocumentStore:
    """Manages ingestion, chunking, embeddings, and a FAISS index + metadata store.

    Vectors are L2-normalized and kept in a flat inner-product index, so search
    scores are cosine similarities. ``ids[i]`` is the chunk id of the i-th
    vector in the index, and ``id_to_meta`` maps a chunk id to its text and
    metadata.
    """

    def __init__(self, embed_model_name: str = "all-MiniLM-L6-v2",
                 faiss_index_path: Optional[str] = None,
                 persist_path: Optional[str] = "./rag_store"):
        """Load the embedding model and, if present, previously persisted state.

        Args:
            embed_model_name: sentence-transformers model used for embeddings.
            faiss_index_path: file that holds the FAISS index; defaults to
                ``<persist_path>/faiss.index``. Ignored when ``persist_path``
                is ``None``.
            persist_path: directory for persisted state (created if missing).
                ``None`` keeps the store in memory only: nothing is loaded or
                saved.
        """
        self.embed_model_name = embed_model_name
        self.embed_model = SentenceTransformer(embed_model_name)
        self.dim = self.embed_model.get_sentence_embedding_dimension()
        self.index = None
        self.id_to_meta: Dict[str, Dict[str, Any]] = {}
        self.embeddings = None  # numpy array of embeddings
        self.ids = []  # list of ids in same order as embeddings

        if persist_path is None:
            # In-memory mode: _save() becomes a no-op.
            self.persist_path = None
            self.faiss_index_path = None
            self.meta_path = None
            return

        self.persist_path = Path(persist_path)
        self.persist_path.mkdir(parents=True, exist_ok=True)
        # Always hold a Path: a plain string has no .exists().
        self.faiss_index_path = (
            Path(faiss_index_path) if faiss_index_path
            else self.persist_path / "faiss.index"
        )
        self.meta_path = self.persist_path / "metastore.pkl"
        # if persisted data exists, try load
        if self.faiss_index_path.exists() and self.meta_path.exists():
            self._load()

    def _build_faiss(self, embeddings: np.ndarray):
        """Replace the index with a new one built from *embeddings*.

        Note: *embeddings* is normalized in place, and ``ids`` /
        ``id_to_meta`` are not touched.
        """
        # Inner-product index; on L2-normalized vectors inner product == cosine.
        index = faiss.IndexFlatIP(self.dim)
        faiss.normalize_L2(embeddings)
        index.add(embeddings)
        self.index = index

    def _save(self):
        """Write the index and the id/metadata tables to disk, if persistence is on."""
        if self.index is None or self.meta_path is None:
            return
        faiss.write_index(self.index, str(self.faiss_index_path))
        with open(self.meta_path, "wb") as f:
            pickle.dump({"ids": self.ids, "id_to_meta": self.id_to_meta}, f)

    def _load(self):
        """Restore the index and the id/metadata tables written by ``_save``."""
        self.index = faiss.read_index(str(self.faiss_index_path))
        # SECURITY: pickle can execute arbitrary code while loading. Only point
        # persist_path at directories written by this application.
        with open(self.meta_path, "rb") as f:
            d = pickle.load(f)
        self.ids = d["ids"]
        self.id_to_meta = d["id_to_meta"]

    def add_documents(self, docs: List[Tuple[str, str, Dict[str, Any]]],
                      chunk_size: int = 800, chunk_overlap: int = 100):
        """Chunk, embed and index documents, then persist the store.

        Args:
            docs: list of tuples ``(doc_id, raw_text, metadata)``.
            chunk_size: maximum characters per chunk (passed to ``chunk_text``).
            chunk_overlap: characters shared by consecutive chunks.

        Chunk ids have the form ``"<doc_id>__chunk<i>"``. A chunk whose id is
        already indexed (from an earlier call, a loaded store, or earlier in
        *docs*) is skipped. ``id_to_meta`` holds one entry per id, so indexing
        the same id twice would add a duplicate vector and make the older
        vector resolve to the newer text. To re-index changed text, use a new
        ``doc_id``.
        """
        chunks: List[DocChunk] = []
        new_ids = set()
        for doc_id, raw_text, meta in docs:
            for i, c in enumerate(chunk_text(raw_text, chunk_size, chunk_overlap)):
                cid = f"{doc_id}__chunk{i}"
                if cid in self.id_to_meta or cid in new_ids:
                    continue
                new_ids.add(cid)
                chunks.append(DocChunk(
                    id=cid,
                    text=c,
                    metadata={**(meta or {}), "source_id": doc_id, "chunk_index": i},
                ))

        if not chunks:
            return

        texts = [c.text for c in chunks]
        embeddings = self.embed_model.encode(texts, convert_to_numpy=True, show_progress_bar=True)
        # FAISS needs a C-contiguous float32 matrix.
        embeddings = np.ascontiguousarray(embeddings, dtype=np.float32)
        # normalize embeddings for cosine similarity via inner product
        faiss.normalize_L2(embeddings)

        if self.index is None:
            self.index = faiss.IndexFlatIP(self.dim)
        self.index.add(embeddings)
        for c in chunks:
            self.ids.append(c.id)
            # "text" goes last so a metadata key named "text" cannot replace
            # the chunk text.
            self.id_to_meta[c.id] = {**c.metadata, "text": c.text}
        self._save()

    def search(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
        """Return up to *top_k* chunks most similar to *query*.

        Each result is a dict with ``id``, ``score`` (cosine similarity),
        ``text`` and ``metadata``. Returns ``[]`` when the index is empty or
        ``top_k`` is not positive.
        """
        # Check the cheap conditions first so an empty store costs no model call.
        if self.index is None or self.index.ntotal == 0 or top_k <= 0:
            return []
        q_emb = self.embed_model.encode([query], convert_to_numpy=True)
        q_emb = np.ascontiguousarray(q_emb, dtype=np.float32)
        faiss.normalize_L2(q_emb)
        # Asking for more neighbours than stored vectors only yields -1 padding.
        scores, indices = self.index.search(q_emb, min(top_k, self.index.ntotal))
        results = []
        for score, idx in zip(scores[0], indices[0]):
            if idx < 0 or idx >= len(self.ids):
                continue
            doc_id = self.ids[idx]
            meta = self.id_to_meta.get(doc_id, {})
            results.append({
                "id": doc_id,
                "score": float(score),   # cosine / inner product
                "text": meta.get("text", ""),
                "metadata": {k: v for k, v in meta.items() if k != "text"},
            })
        return results

# -----------------------
# Memory Manager (Context-Aware Memory)
# -----------------------

class MemoryManager:
    """
    Keeps recent conversation turns and also stores which docs were used previously.
    Memory is short-term by default (configurable length).
    """
    def __init__(self, max_turns: int = 6):
        """Create an empty memory that retains at most *max_turns* turns.

        A value of 0 (or less) disables memory: no turn is retained.
        """
        self.max_turns = max_turns
        self.turns: List[Dict[str, Any]] = []  # list of dicts {role, text, time, retrieved_docs}

    def add_turn(self, role: str, text: str, retrieved_docs: Optional[List[Any]] = None):
        """Append a turn and drop the oldest turns beyond ``max_turns``.

        Args:
            role: speaker label, e.g. "user" or "assistant".
            text: what was said.
            retrieved_docs: documents associated with the turn (the pipeline
                passes chunk ids); stored as ``[]`` when omitted.
        """
        self.turns.append({"role": role, "text": text, "time": time.time(), "retrieved_docs": retrieved_docs or []})
        # Trim by counting the overflow: slicing with [-max_turns:] keeps the
        # whole list when max_turns is 0, because -0 == 0.
        overflow = len(self.turns) - max(self.max_turns, 0)
        if overflow > 0:
            self.turns = self.turns[overflow:]

    def get_memory_prompt(self) -> str:
        """Return the retained turns as "role: text" lines, oldest first.

        Returns an empty string when no turns are retained.
        """
        return "\n".join(f"{t['role']}: {t['text']}" for t in self.turns)

# -----------------------
# Generator / LLM wrapper
# -----------------------

class Generator:
    """
    Generator supports two modes:
      - 'openai' uses the OpenAI chat completions API (you must set OPENAI_API_KEY)
      - 'local' uses a HuggingFace causal model (if available)
    Any other configuration falls back to a fixed "no generator configured" message.
    """
    def __init__(self, mode: str = "openai", openai_model: str = "gpt-4o-mini", hf_model_name: str = "gpt2"):
        """Prepare the selected backend.

        Args:
            mode: "openai" or "local".
            openai_model: chat model name used in "openai" mode.
            hf_model_name: HuggingFace model name used in "local" mode.
        """
        self.mode = mode
        self.openai_model = openai_model
        self.hf_model_name = hf_model_name
        self.hf_pipe = None
        self._openai_client = None   # client object when openai>=1.0 is installed
        self._openai_ready = False   # True only when the package and an API key are present
        if mode == "openai" and OPENAI_AVAILABLE:
            api_key = os.environ.get("OPENAI_API_KEY", "")
            if api_key:
                if hasattr(openai, "OpenAI"):
                    # openai>=1.0 removed the module-level ChatCompletion API
                    # in favour of an explicit client object.
                    self._openai_client = openai.OpenAI(api_key=api_key)
                else:
                    # Legacy (<1.0) module-level configuration.
                    openai.api_key = api_key
                self._openai_ready = True
        if mode == "local" and HF_AVAILABLE:
            # load small local model for demo; user can substitute a stronger one
            try:
                self.hf_pipe = pipeline("text-generation", model=self.hf_model_name, device_map="auto")
            except Exception as e:
                print("Failed to load HF model:", e)
                self.hf_pipe = None

    def generate(self, prompt: str, max_tokens: int = 512, temperature: float = 0.0) -> str:
        """Generate a completion for *prompt*.

        Args:
            prompt: full prompt text.
            max_tokens: maximum number of tokens to generate.
            temperature: sampling temperature ("openai" mode only; the local
                pipeline decodes greedily).

        Returns:
            The generated text, or a fixed explanatory message when no backend
            is usable (package missing, no API key, or model failed to load).
        """
        if self.mode == "openai" and self._openai_ready:
            request = {
                "model": self.openai_model,
                "messages": [{"role": "user", "content": prompt}],
                "max_tokens": max_tokens,
                "temperature": temperature,
                "n": 1,
            }
            if self._openai_client is not None:
                resp = self._openai_client.chat.completions.create(**request)
            else:
                resp = openai.ChatCompletion.create(**request)
            # content can be None (e.g. filtered responses); treat as empty.
            return (resp.choices[0].message.content or "").strip()
        if self.mode == "local" and self.hf_pipe is not None:
            # max_new_tokens bounds only the generated part, so no prompt-length
            # estimate is needed; return_full_text=False returns just the
            # continuation instead of slicing it off by character count.
            out = self.hf_pipe(prompt, max_new_tokens=max_tokens, do_sample=False, return_full_text=False)
            return out[0]["generated_text"].strip()
        # fallback: fixed message (safe fallback so code runs)
        return "Sorry — no generator configured. Please set OPENAI_API_KEY or install HF models."

# -----------------------
# RAG Pipeline
# -----------------------

class RAGPipeline:
    """
    Orchestrates retrieval + generation + citation support + memory.
    """
    def __init__(self,
                 store: DocumentStore,
                 generator: Generator,
                 memory: Optional[MemoryManager] = None,
                 top_k: int = 4):
        """Wire the pipeline together.

        Args:
            store: document store used for retrieval.
            generator: LLM wrapper used for answers and clarifying questions.
            memory: conversation memory; a default MemoryManager is created
                when omitted.
            top_k: default number of chunks to retrieve per query.
        """
        self.store = store
        self.generator = generator
        self.memory = memory if memory is not None else MemoryManager()
        self.top_k = top_k
        # Short query we just asked the user to clarify, or None. Lets the next
        # (possibly equally short) reply go to retrieval instead of asking again.
        self._pending_short_query: Optional[str] = None

    def _build_context_block(self, retrieved: List[Dict[str, Any]]) -> str:
        """Create a context block with numbered citations and snippets."""
        parts = []
        for i, r in enumerate(retrieved, start=1):
            source = r["metadata"].get("source_id", r["id"])
            snippet = r["text"][:600].strip()
            parts.append(f"[{i}] Source: {source} | Score: {r['score']:.4f}\nSnippet: {snippet}")
        return "\n\n".join(parts)

    def _build_prompt(self, user_query: str, retrieved: List[Dict[str, Any]]) -> str:
        """Compose a prompt that injects memory + retrieved context + explicit citation instructions."""
        memory_block = self.memory.get_memory_prompt()
        context_block = self._build_context_block(retrieved)
        prompt = "You are a helpful assistant that answers user queries using ONLY the provided context.\n"
        prompt += "Each claim should refer to the source numbers in the context block. At the end, include a 'Sources' list with links/ids and one-sentence justification.\n\n"
        if memory_block:
            prompt += f"Conversation memory (recent turns):\n{memory_block}\n\n"
        if context_block:
            prompt += f"Context (retrieved documents):\n{context_block}\n\n"
        prompt += f"User question:\n{user_query}\n\nAnswer precisely, cite [1], [2], ... inline where appropriate, and finish with a 'Sources' list.\n"
        return prompt

    def answer(self, user_query: str, k: Optional[int] = None) -> Dict[str, Any]:
        """Answer *user_query*, or ask a clarifying question for very short queries.

        Args:
            user_query: the user's message.
            k: number of chunks to retrieve; defaults to ``self.top_k``.

        Returns:
            A dict with ``clarify`` (bool), ``citations``, ``retrieved_count``
            and ``memory_snapshot``. When ``clarify`` is True it also has
            ``clarification`` and ``sources`` (empty); otherwise it has
            ``answer``.
        """
        k = k or self.top_k
        is_short = len(user_query.strip().split()) <= 2
        pending = self._pending_short_query
        self._pending_short_query = None

        # Short queries are often ambiguous, so ask once for clarification.
        # Never ask twice in a row: the reply to a clarifying question is
        # usually short too and would otherwise loop forever.
        if is_short and pending is None:
            # Record the user's message so later turns have the full exchange.
            self.memory.add_turn("user", user_query, retrieved_docs=[])
            clar_prompt = f"User asked a short question: '{user_query}'. Provide 1-2 clarifying yes/no or multi-choice questions to refine intent."
            clar_question = self.generator.generate(clar_prompt, max_tokens=60, temperature=0.7)
            self.memory.add_turn("assistant", clar_question, retrieved_docs=[])
            self._pending_short_query = user_query
            return {
                "clarify": True,
                "clarification": clar_question,
                "sources": [],
                "citations": [],
                "retrieved_count": 0,
                "memory_snapshot": self.memory.get_memory_prompt(),
            }

        # A short reply to a clarifying question has little signal by itself;
        # retrieve with the original short query prepended.
        if pending is not None and is_short:
            retrieval_query = f"{pending} {user_query}"
        else:
            retrieval_query = user_query

        # Retrieval
        retrieved = self.store.search(retrieval_query, top_k=k)
        retrieved_ids = [r["id"] for r in retrieved]
        # Build the prompt before recording the user turn, so the question is
        # not repeated inside the memory block.
        prompt = self._build_prompt(user_query, retrieved)
        self.memory.add_turn("user", user_query, retrieved_docs=retrieved_ids)
        # Generate answer
        gen_text = self.generator.generate(prompt, max_tokens=512, temperature=0.0)
        # Save assistant turn to memory
        self.memory.add_turn("assistant", gen_text, retrieved_docs=list(retrieved_ids))
        # Build formatted citations: produce small snippets & metadata
        citations = [
            {
                "rank": rank,
                "id": r["id"],
                "source_id": r["metadata"].get("source_id"),
                "score": r["score"],
                "snippet": r["text"][:400],
            }
            for rank, r in enumerate(retrieved, start=1)
        ]
        return {
            "clarify": False,
            "answer": gen_text,
            "citations": citations,
            "retrieved_count": len(retrieved),
            "memory_snapshot": self.memory.get_memory_prompt()
        }

# -----------------------
# Small demo / helper functions
# -----------------------

def load_text_file(path: str) -> str:
    """Return the entire contents of the file at *path*, decoded as UTF-8."""
    with Path(path).open(encoding="utf8") as handle:
        return handle.read()

def ingest_folder_texts(folder: str, store: DocumentStore):
    """Add every ``.txt`` / ``.md`` file under *folder* (recursively) to *store*.

    Each file becomes one document whose metadata records its path. The
    document id is the file stem when that stem is unique within the folder.
    Files that share a stem (``a/readme.md`` and ``b/readme.md``, or
    ``notes.txt`` and ``notes.md``) use their path relative to *folder*
    instead, because identical ids would give their chunks identical chunk
    ids in the store.

    Files are processed in sorted order so ingestion is deterministic.
    """
    from collections import Counter

    root = Path(folder)
    files = sorted(
        p for p in root.glob("**/*")
        if p.is_file() and p.suffix.lower() in (".txt", ".md")
    )
    stem_counts = Counter(p.stem for p in files)
    docs = []
    for p in files:
        if stem_counts[p.stem] == 1:
            doc_id = p.stem
        else:
            doc_id = p.relative_to(root).as_posix()
        docs.append((doc_id, load_text_file(str(p)), {"path": str(p)}))
    store.add_documents(docs)

# -----------------------
# Example / Quickstart
# -----------------------

def quick_demo_local():
    """
    Quick demonstration that:
     - Builds a small store from supplied sample text snippets
       (persisted under ./demo_store and reused on later runs)
     - Answers questions typed on stdin until 'exit'/'quit' or end of input
    """
    # 1) Build store
    store = DocumentStore(persist_path="./demo_store")
    # If store empty, add demo docs
    if store.index is None or store.index.ntotal == 0:
        sample_docs = [
            ("doc_weather", "Rainy days in Mumbai are common during the monsoon season (June to September). Bring an umbrella.", {"url":"local://weather"}),
            ("doc_history", "The Indian independence movement culminated in 1947 when British India was partitioned into India and Pakistan.", {"url":"local://history"}),
            ("doc_python", "Python is a high-level programming language created by Guido van Rossum and first released in 1991.", {"url":"local://python"})
        ]
        store.add_documents(sample_docs)

    # 2) Generator
    gen = Generator(mode="openai") if OPENAI_AVAILABLE else Generator(mode="local", hf_model_name="gpt2")
    # 3) Memory
    mem = MemoryManager(max_turns=8)
    # 4) Pipeline
    rag = RAGPipeline(store=store, generator=gen, memory=mem, top_k=3)

    print("Ask me something (type 'exit' to quit).")
    while True:
        try:
            q = input("User> ").strip()
        except EOFError:
            # stdin closed (piped input, Ctrl-D): end the session cleanly.
            break
        if not q:
            # Ignore blank lines instead of sending an empty question to the LLM.
            continue
        if q.lower() in ("exit", "quit"):
            break
        resp = rag.answer(q)
        if resp.get("clarify"):
            print("Assistant (clarify):", resp["clarification"])
            continue
        print("\n--- Answer ---\n")
        print(resp["answer"])
        print("\n--- Citations ---")
        for c in resp["citations"]:
            print(f"[{c['rank']}] {c['source_id'] or c['id']} (score={c['score']:.4f})\n{c['snippet']}\n")
        print("\n--- Memory ---")
        print(resp["memory_snapshot"])
        print("\n")

# -----------------------
# Streamlit snippet (optional)
# -----------------------
# Source text of an example Streamlit front-end, kept as a string so this
# module never imports streamlit itself. Nothing in the visible code executes
# it or writes it to disk. The snippet imports from a module named
# `rag_chatbot`, i.e. it assumes this code is saved as rag_chatbot.py.
STREAMLIT_SNIPPET = """
# streamlit_app.py (example)
import streamlit as st
from rag_chatbot import DocumentStore, Generator, MemoryManager, RAGPipeline, ingest_folder_texts

st.title("RAG Chatbot Demo")

# Initialize store / pipeline only once (cache)
@st.cache_resource
def init_pipeline():
    store = DocumentStore(persist_path="./demo_store")
    # ingest .txt files in 'corpus' folder
    ingest_folder_texts("corpus", store)
    gen = Generator(mode="openai")  # set to 'local' if desired
    mem = MemoryManager(max_turns=6)
    rag = RAGPipeline(store=store, generator=gen, memory=mem, top_k=4)
    return rag

rag = init_pipeline()

q = st.text_input("Ask a question")
if st.button("Ask") and q:
    resp = rag.answer(q)
    if resp.get("clarify"):
        st.info(resp["clarification"])
    else:
        st.write(resp["answer"])
        st.write("Sources:")
        for c in resp["citations"]:
            st.write(f\"[{c['rank']}] {c['source_id']} (score={c['score']:.4f})\")
            st.write(c['snippet'])
"""

# -----------------------
# Requirements file content (to save as requirements.txt)
# -----------------------
# Lists the same packages as the notebook's `pip install` cell, with minimum
# versions. Nothing in the visible code writes this string to disk; save it
# manually as requirements.txt.

REQUIREMENTS_TXT = """
sentence-transformers>=2.2.2
faiss-cpu>=1.7.4
numpy>=1.23
transformers>=4.34.0
torch>=1.13.0
openai>=0.27.0
streamlit>=1.20
"""

# -----------------------
# If run as script, launch quick demo
# -----------------------

# Entry point: runs when the file is executed as a script. The captured
# notebook output shows the banner printed, so it also runs in a notebook cell.
if __name__ == "__main__":
    print("RAG Chatbot module. Run quick_demo_local() to try an interactive demo.")
    # Start interactive demo if run directly
    try:
        quick_demo_local()
    except KeyboardInterrupt:
        # Ctrl-C / interrupting the cell ends the demo with a message
        # instead of a traceback.
        print("\nGoodbye.")


RAG Chatbot module. Run quick_demo_local() to try an interactive demo.


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md: 0.00B [00:00, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/612 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

Batches:   0%|          | 0/1 [00:00<?, ?it/s]

Ask me something (type 'exit' to quit).

Goodbye.
User> When did India gain independence?
