In [None]:
# Colab RAG notebook - single cell setup + functions
# 1) Install dependencies
!pip install -q transformers sentence-transformers faiss-cpu PyPDF2 python-docx accelerate

# Packages:
# transformers: Hugging Face transformer models (tokenizers, model classes).
# sentence-transformers: easy-to-use embedding models (SentenceTransformer wrapper).
# faiss-cpu: FAISS library for fast vector similarity search (CPU build). If you have a GPU, install faiss-gpu instead.
# PyPDF2: extract text from PDFs.
# python-docx (imported as docx): read .docx files.
# accelerate: helps with model loading and device handling 

# 2) Imports
import os
import sys
from pathlib import Path
from typing import List, Tuple #type hints to make code clearer
import torch
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
from sentence_transformers import SentenceTransformer
import faiss
import numpy as np
from sklearn.preprocessing import normalize #normalize vectors easily (for cosine similarity).
from IPython.display import display, Markdown
from google.colab import files

# For reading pdf/docx
import PyPDF2
import docx

# 3) Configure model choices (change model names here if you wish)
EMBEDDING_MODEL = "paraphrase-MiniLM-L6-v2"  # small & fast
GEN_MODEL = "google/flan-t5-small"           # small generator that runs on T4
# Optionally: "google/flan-t5-base" for better quality if GPU memory allows

# 4) Hugging Face token (leave empty when you have none to supply here).
HF_TOKEN = ""
# Only export a real token: writing an empty string would overwrite any
# HF_TOKEN already present in the environment with a blank credential.
if HF_TOKEN:
    os.environ["HF_TOKEN"] = HF_TOKEN
# None lets the library fall back to its default credential lookup instead of
# being handed an explicit empty token.
_hf_token = HF_TOKEN or None

device = "cuda" if torch.cuda.is_available() else "cpu"
print("Device:", device)

# 5) Load models (embedding + generator)
print("Loading embedding model...")
embedder = SentenceTransformer(EMBEDDING_MODEL, device=device)

print("Loading generator model and tokenizer...")
# `token=` replaces the deprecated `use_auth_token=` keyword.
tokenizer = AutoTokenizer.from_pretrained(GEN_MODEL, token=_hf_token)
gen_model = AutoModelForSeq2SeqLM.from_pretrained(GEN_MODEL, token=_hf_token)
gen_model = gen_model.to(device)
gen_model.eval()  # evaluation mode (disables dropout, etc.)

# 6) Utilities: read files and extract text
def read_pdf(path: str) -> str:
    """Return the text of every page of the PDF at ``path``, newline-joined.

    A page whose extraction raises, or yields nothing, contributes an empty
    string, so a single bad page does not abort the whole file.
    """
    def _page_text(page) -> str:
        # Best-effort extraction: fall back to "" for this page on any error.
        try:
            return page.extract_text() or ""
        except Exception:
            return ""

    with open(path, "rb") as handle:
        pdf = PyPDF2.PdfReader(handle)
        extracted = [_page_text(page) for page in pdf.pages]
    return "\n".join(extracted)

def read_docx(path: str) -> str:
    """Return the paragraph texts of the .docx file at ``path``, one per line."""
    document = docx.Document(path)
    return "\n".join(paragraph.text for paragraph in document.paragraphs)

def read_txt(path: str) -> str:
    """Return the contents of the text file at ``path`` decoded as UTF-8.

    Bytes that are not valid UTF-8 are silently dropped (``errors="ignore"``).
    """
    return Path(path).read_text(encoding="utf-8", errors="ignore")

def load_file_text(path: str) -> str:
    """Extract the text of a .pdf, .docx or .txt file.

    The reader is chosen from the (case-insensitive) file extension.

    Raises:
        ValueError: if the extension is not one of the supported types.
    """
    path = str(path)
    lowered = path.lower()
    if lowered.endswith(".pdf"):
        reader = read_pdf
    elif lowered.endswith(".docx"):
        reader = read_docx
    elif lowered.endswith(".txt"):
        reader = read_txt
    else:
        raise ValueError("Unsupported file type: " + path)
    return reader(path)

# 7) Chunking function (splits long text into chunks for retrieval)
def chunk_text(text: str, chunk_size: int = 500, chunk_overlap: int = 50, min_chunk_len: int = 20) -> List[str]:
    """Split ``text`` into overlapping character windows for retrieval.

    Args:
        text: the text to split; carriage returns are replaced by spaces.
        chunk_size: window length in characters (must be positive).
        chunk_overlap: number of characters shared by consecutive windows
            (must be smaller than ``chunk_size``). A negative value leaves a
            gap between windows.
        min_chunk_len: chunks whose stripped length is not greater than this
            are dropped (filters empty / near-empty fragments).

    Returns:
        The whitespace-stripped chunks, in document order.

    Raises:
        ValueError: if ``chunk_size`` is not positive or ``chunk_overlap`` is
            not smaller than ``chunk_size``; either would keep the window
            from ever advancing.
    """
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")
    if chunk_overlap >= chunk_size:
        raise ValueError(
            f"chunk_overlap ({chunk_overlap}) must be smaller than chunk_size ({chunk_size})"
        )

    text = text.replace("\r", " ")
    length = len(text)
    step = chunk_size - chunk_overlap  # > 0 thanks to the validation above
    chunks = []
    start = 0
    while start < length:
        end = start + chunk_size
        chunks.append(text[start:end].strip())
        if end >= length:
            # This window already reached the end of the text; another pass
            # would only re-emit the overlap as a redundant trailing chunk.
            break
        start += step
    return [c for c in chunks if len(c) > min_chunk_len]

# 8) Build FAISS index from multiple uploaded files
class RAGIndex:
    """In-memory vector store backed by a FAISS inner-product index.

    Vectors are accumulated with :meth:`add`, indexed with
    :meth:`build_faiss` (which L2-normalizes them so inner product equals
    cosine similarity) and queried with :meth:`search`.
    """

    def __init__(self, embedding_dim: int):
        self.metadata = []  # one (source, chunk_text) tuple per stored vector
        self.embeddings = None  # float32 matrix, one row per stored vector
        self.index = None  # FAISS index, created by build_faiss()
        self.embedding_dim = embedding_dim

    def add(self, vectors: np.ndarray, metas: List[Tuple[str, str]]):
        """Append ``vectors`` and their ``metas`` to the store.

        Args:
            vectors: 2-D array with ``embedding_dim`` columns.
            metas: one ``(source, chunk_text)`` tuple per row of ``vectors``.

        Raises:
            ValueError: if ``vectors`` has the wrong shape or its row count
                differs from ``len(metas)``.

        Note: an index that was already built is not updated; call
        :meth:`build_faiss` again after adding.
        """
        # astype() copies, so the in-place normalization done later never
        # touches the caller's array.
        vectors = np.asarray(vectors).astype('float32')
        if vectors.ndim != 2 or vectors.shape[1] != self.embedding_dim:
            raise ValueError(
                f"Expected vectors of shape (n, {self.embedding_dim}), got {vectors.shape}"
            )
        if len(metas) != vectors.shape[0]:
            raise ValueError(
                f"Got {vectors.shape[0]} vectors but {len(metas)} metadata entries"
            )
        if self.embeddings is None:
            self.embeddings = vectors
        else:
            self.embeddings = np.vstack([self.embeddings, vectors])
        self.metadata.extend(metas)

    def build_faiss(self):
        """(Re)build the FAISS index from all vectors added so far.

        Raises:
            ValueError: if no vectors have been added.
        """
        if self.embeddings is None:
            raise ValueError("No embeddings added.")
        # Inner-product index; on unit-length vectors IP equals cosine similarity.
        self.index = faiss.IndexFlatIP(self.embedding_dim)
        # Normalize each row in place (axis=1 -> per vector).
        normalize(self.embeddings, axis=1, copy=False)
        self.index.add(self.embeddings)
        print("FAISS index built, total vectors:", self.index.ntotal)

    def search(self, q_vec: np.ndarray, top_k: int = 5):
        """Return the ``top_k`` most similar stored chunks for each query.

        Args:
            q_vec: query vector(s); a 1-D vector is treated as a single query.
            top_k: maximum number of neighbours per query.

        Returns:
            One list per query; each entry is a dict with keys ``score``
            (inner product), ``source`` and ``text``, in the order FAISS
            returned them. A list may hold fewer than ``top_k`` entries when
            the index contains fewer vectors.

        Raises:
            ValueError: if the index has not been built yet.
        """
        if self.index is None:
            raise ValueError("Index not built; call build_faiss() first.")
        q = np.asarray(q_vec).astype('float32')
        if q.ndim == 1:
            q = q.reshape(1, -1)
        normalize(q, axis=1, copy=False)
        scores, ids = self.index.search(q, top_k)
        results = []
        for score_row, id_row in zip(scores, ids):
            batch = []
            for score, idx in zip(score_row, id_row):
                # FAISS fills missing neighbours with label -1; indexing
                # metadata with it would silently return the last chunk.
                if idx < 0:
                    continue
                source, text = self.metadata[idx]
                batch.append({"score": float(score), "source": source, "text": text})
            results.append(batch)
        return results

# 9) Full pipeline: upload -> index -> query
# Module-level handle to the current RAGIndex. It stays None until
# upload_and_index_files() assigns a new index; retrieve_contexts() reads it.
INDEX = None

def upload_and_index_files(paths: List[str], chunk_size: int = 700, chunk_overlap: int = 100):
    """Read, chunk and embed the given files, then publish a fresh index.

    Args:
        paths: local file paths (in the Colab environment).
        chunk_size: characters per chunk, forwarded to ``chunk_text``.
        chunk_overlap: overlapping characters, forwarded to ``chunk_text``.

    Side effects:
        Replaces the module-level ``INDEX`` with a newly built ``RAGIndex``
        and prints progress messages.

    Raises:
        ValueError: if no chunk could be extracted from any file.
    """
    global INDEX
    chunk_records = []  # (source file name, chunk text) per chunk
    for file_path in paths:
        print("Reading:", file_path)
        raw_text = load_file_text(file_path)
        pieces = chunk_text(raw_text, chunk_size=chunk_size, chunk_overlap=chunk_overlap)
        print(f"  -> {len(pieces)} chunks from {file_path}")
        # Only the base name is kept as the "source" label of each chunk.
        chunk_records.extend((os.path.basename(file_path), piece) for piece in pieces)
    if not chunk_records:
        raise ValueError("No text extracted from files.")

    corpus = [piece for _, piece in chunk_records]

    # Embed in fixed-size batches.
    batch_size = 64
    vector_blocks = [
        embedder.encode(
            corpus[offset:offset + batch_size],
            convert_to_numpy=True,
            show_progress_bar=False,
        )
        for offset in range(0, len(corpus), batch_size)
    ]
    matrix = np.vstack(vector_blocks)

    # The embedding width is taken from the data itself.
    INDEX = RAGIndex(embedding_dim=matrix.shape[1])
    INDEX.add(matrix, chunk_records)
    INDEX.build_faiss()
    print("Indexing complete.")

def retrieve_contexts(query: str, top_k: int = 5) -> List[dict]:
    """Return the ``top_k`` indexed chunks most similar to ``query``.

    Args:
        query: the user question.
        top_k: maximum number of chunks to return.

    Returns:
        The hit dicts produced by ``INDEX.search`` for this single query
        (keys ``score``, ``source`` and ``text``).

    Raises:
        RuntimeError: if no index has been built yet.
    """
    if INDEX is None:
        # Fail with an actionable message instead of an AttributeError on None.
        raise RuntimeError(
            "No index available: run upload_and_index_files() "
            "(or colab_upload_and_index()) before querying."
        )
    # encode() receives a one-element list, so search() returns one result list.
    q_emb = embedder.encode([query], convert_to_numpy=True)
    return INDEX.search(q_emb, top_k=top_k)[0]

def generate_answer(query: str, top_k: int = 4, max_len: int = 256, temperature: float = 0.1, max_input_tokens: int = 1024) -> Tuple[str, List[dict]]:
    """Answer ``query`` from the indexed documents (retrieve, then generate).

    Args:
        query: the user question.
        top_k: number of chunks to retrieve as context.
        max_len: ``max_length`` passed to ``gen_model.generate``.
        temperature: currently unused; generation is deterministic beam
            search (``do_sample=False``). Kept for interface compatibility.
        max_input_tokens: token budget for the prompt.

    Returns:
        ``(answer, hits)``: the decoded answer and every retrieved hit dict,
        including any whose text had to be left out of the prompt to respect
        ``max_input_tokens``.
    """
    hits = retrieve_contexts(query, top_k=top_k)
    context_texts = [f"Source: {h['source']}\n{h['text']}" for h in hits]

    def _build_prompt(blocks: List[str]) -> str:
        context = "\n\n---\n\n".join(blocks)
        return (
            "You are a helpful assistant. Use the provided CONTEXT to answer the question. "
            "If the answer is not contained in the context, say you don't know.\n\n"
            f"CONTEXT:\n{context}\n\n"
            f"QUESTION: {query}\n\nAnswer:"
        )

    # The question sits at the END of the prompt, so plain right-truncation of
    # an over-long prompt would cut it off. Drop trailing context blocks
    # (hits keep the order returned by the index search, presumably
    # best-first) until the whole prompt fits.
    prompt = _build_prompt(context_texts)
    while context_texts and len(tokenizer(prompt)["input_ids"]) > max_input_tokens:
        context_texts.pop()
        prompt = _build_prompt(context_texts)

    # truncation=True stays as a safety net (e.g. an extremely long question).
    inputs = tokenizer(prompt, return_tensors="pt", truncation=True, max_length=max_input_tokens).to(device)
    with torch.no_grad():
        out = gen_model.generate(
            **inputs,
            max_length=max_len,
            do_sample=False,
            num_beams=4,  # beam search over 4 candidate continuations
            early_stopping=True,
        )
    answer = tokenizer.decode(out[0], skip_special_tokens=True)
    return answer, hits

# 10) Helper: upload files from local machine in Colab
def colab_upload_and_index():
    """Prompt for file uploads via the Colab widget and index them.

    The names of the uploaded files are used directly as the paths handed to
    ``upload_and_index_files``.
    """
    print("Use the file browser or the button to upload files. You can upload multiple files (pdf, docx, txt).")
    uploaded = files.upload()
    # The keys of the upload result are used as the local file paths.
    saved_paths = [name for name in uploaded]
    upload_and_index_files(saved_paths)
    print("Uploaded & indexed files:", saved_paths)

# 11) Example interactive loop (run after indexing)
def interactive_qa_loop():
    """Console question/answer loop; type 'exit' or 'quit' to leave.

    For every question the generated answer is rendered, followed by the
    retrieved contexts (source, score and the first 700 characters of text)
    as a debugging aid.
    """
    print("Enter 'exit' to stop.")
    stop_words = ("exit", "quit")
    while True:
        question = input("\nYour question: ")
        if question.strip().lower() in stop_words:
            return
        answer, contexts = generate_answer(question)
        display(Markdown("**Answer:**\n\n" + answer))
        display(Markdown("**Top retrieved contexts (debug):**"))
        for rank, ctx in enumerate(contexts, start=1):
            display(Markdown(f"**{rank}. Source:** {ctx['source']}  \n**Score:** {ctx['score']:.4f}\n\n{ctx['text'][:700]}..."))

# 12) Print usage instructions once the cell has finished running
#     (at this point the models are loaded and all helpers are defined).
print("\nREADY.\nHow to use:\n1) Run colab_upload_and_index() to upload & index your files.\n2) Call interactive_qa_loop() to start asking questions.\n\nExample:\n  colab_upload_and_index()\n  interactive_qa_loop()\n")


In [None]:
# Gradio UI for your existing RAG functions
!pip install -q gradio==3.44.0 >/dev/null 2>&1

import gradio as gr
from pathlib import Path
import shutil

# Helper to adapt gr.File upload to local paths and call your existing indexer
def gradio_index(files, chunk_size=700, chunk_overlap=100):
    """Gradio callback: copy the uploaded files locally and index them.

    Args:
        files: uploaded items from the Gradio file component; each is either
            an object with a ``.name`` path attribute or a plain path.
        chunk_size: characters per chunk (slider value, converted to int).
        chunk_overlap: overlapping characters (slider value, converted to int).

    Returns:
        ``(status_message, comma_separated_file_names)``. On any failure the
        status starts with "Indexing failed:" and the file list is empty.
    """
    if not files:
        return "No files uploaded. Please upload pdf/docx/txt files.", ""

    saved_paths = []
    try:
        chunk_size = int(chunk_size)
        chunk_overlap = int(chunk_overlap)
        # The sliders allow overlap >= size, which would keep the chunking
        # window from ever advancing; reject it up front.
        if chunk_size <= 0 or chunk_overlap >= chunk_size:
            raise ValueError(
                f"chunk overlap ({chunk_overlap}) must be smaller than "
                f"a positive chunk size ({chunk_size})"
            )

        # Copy each upload into the working directory under its base name so
        # the indexer can read it from a plain relative path.
        for f in files:
            src_path = f.name if hasattr(f, "name") else f
            dest_path = Path(src_path).name
            try:
                shutil.copy(src_path, dest_path)  # overwrites an existing file
            except shutil.SameFileError:
                pass  # the upload already lives at the destination
            saved_paths.append(dest_path)

        upload_and_index_files(saved_paths, chunk_size=chunk_size, chunk_overlap=chunk_overlap)
    except Exception as e:
        # UI boundary: report every failure (bad parameters, copy errors,
        # indexing errors) in the status box instead of crashing the callback.
        return f"Indexing failed: {e}", ""
    return f"Indexed {len(saved_paths)} file(s): {', '.join(saved_paths)}", ", ".join(saved_paths)

# Helper to ask question using your generate_answer
def gradio_ask(question, top_k=4, max_len=256):
    """Gradio callback: answer ``question`` and format the retrieved sources.

    Args:
        question: text typed by the user.
        top_k: number of chunks to retrieve (slider value, converted to int).
        max_len: maximum answer length (slider value, converted to int).

    Returns:
        ``(answer_text, sources_markdown)``. When there is nothing to answer
        (blank question, no index, or a generation error) the first element
        carries the explanatory message.
    """
    # Do not run retrieval/generation for an empty or whitespace-only question.
    if not question or not question.strip():
        return "Please enter a question.", ""
    if INDEX is None:
        return "Please upload & index files first.", "No retrieved contexts (index empty)."
    try:
        ans, hits = generate_answer(question, top_k=int(top_k), max_len=int(max_len))
    except Exception as e:
        return f"Error while generating answer: {e}", ""

    # One markdown section per hit; long chunks are shortened to 600 characters.
    sections = []
    for rank, hit in enumerate(hits, start=1):
        snippet = hit["text"]
        if len(snippet) > 600:
            snippet = snippet[:600] + "..."
        sections.append(f"**{rank}. {hit['source']}** (score: {hit['score']:.4f})\n\n{snippet}\n\n---\n\n")
    return ans, "".join(sections)

# Build Gradio interface.
# Layout: a top row with the indexing controls (left column) and the question
# controls (right column), then a second row showing the answer next to the
# retrieved contexts.
with gr.Blocks() as app:
    gr.Markdown("# RAG — Simple Gradio UI")
    gr.Markdown("Upload files (pdf/docx/txt) -> Index -> Ask questions.\n\nMake sure you've run the earlier cell that loads the models and defines `upload_and_index_files` and `generate_answer`.")

    with gr.Row():
        # Left column: file upload + chunking parameters + indexing status.
        with gr.Column(scale=1):
            file_input = gr.File(label="Upload files (pdf/docx/txt)", file_count="multiple", type="file")
            chunk_size = gr.Slider(200, 2000, value=700, step=50, label="Chunk size (characters)")
            chunk_overlap = gr.Slider(0, 500, value=100, step=10, label="Chunk overlap (characters)")
            index_btn = gr.Button("Upload & Index")
            status = gr.Textbox(label="Indexer status", value="No files indexed yet.", interactive=False)
            files_list = gr.Textbox(label="Files indexed", value="", interactive=False)
        # Right column: question box + retrieval/generation parameters.
        with gr.Column(scale=2):
            qbox = gr.Textbox(label="Ask a question about uploaded files", placeholder="Type your question here...", lines=2)
            topk = gr.Slider(1, 8, value=4, step=1, label="Top-k retrieved chunks")
            max_len = gr.Slider(64, 512, value=256, step=16, label="Max answer tokens")
            ask_btn = gr.Button("Ask")

    with gr.Row():
        with gr.Column():
            answer_md = gr.Markdown("Answer will appear here.")
        with gr.Column():
            contexts_md = gr.Markdown("Retrieved contexts / sources will appear here.")

    # Wire up events: the index button runs gradio_index; both the Ask button
    # and pressing Enter in the question box run gradio_ask.
    index_btn.click(fn=gradio_index, inputs=[file_input, chunk_size, chunk_overlap], outputs=[status, files_list])
    ask_btn.click(fn=gradio_ask, inputs=[qbox, topk, max_len], outputs=[answer_md, contexts_md])
    qbox.submit(fn=gradio_ask, inputs=[qbox, topk, max_len], outputs=[answer_md, contexts_md])

# Launch the app. share=True requests a public link; use share=False to keep
# the app local to this runtime.
app.launch(share=True)
