In [1]:
# Installing the dependencies
!pip install -q groq sentence-transformers faiss-cpu pypdf gradio numpy tqdm

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m134.9/134.9 kB[0m [31m10.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m31.4/31.4 MB[0m [31m19.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m322.5/322.5 kB[0m [31m25.1 MB/s[0m eta [36m0:00:00[0m
[?25h

In [19]:
!pip install python-dotenv




In [21]:
#Importing the dependencies:
import os
import io
import json
import faiss
import numpy as np
from typing import List, Tuple, Dict
from pypdf import PdfReader
from sentence_transformers import SentenceTransformer
from tqdm import tqdm
import gradio as gr
from groq import Groq

In [15]:
# Optional reranker
# CrossEncoder is provided by sentence-transformers (already a dependency of
# this notebook). The installed dependencies provide no top-level
# `cross_encoder` module, so importing from it always failed and the reranker
# was silently disabled.
try:
    from sentence_transformers import CrossEncoder
    HAVE_RERANKER = True
except ImportError:
    # Reranking is optional: retrieval falls back to plain FAISS ordering.
    HAVE_RERANKER = False

In [25]:
# Configuring Groq
# The API key is read from the environment only. Never embed a key in the
# notebook: any key that was ever hard-coded in this cell must be treated as
# leaked and revoked in the Groq console.
GROQ_KEY = os.environ.get("GROQ_API_KEY", "").strip()
if not GROQ_KEY:
    print("⚠️ WARNING: GROQ_API_KEY not set. Set os.environ['GROQ_API_KEY'] before calling the model.")
# `client` stays None when no key is configured; the LLM wrapper checks for that.
client = Groq(api_key=GROQ_KEY) if GROQ_KEY else None

In [26]:
# Models & parameters
# Name of the sentence-transformers checkpoint used to embed both the document
# chunks and the user queries.
EMBED_MODEL_NAME = "all-MiniLM-L6-v2"   # small & fast; change if you want bigger
# Loaded once at module level and shared by the embedding helpers. Loading may
# download the model files (see the progress output of this cell).
embed_model = SentenceTransformer(EMBED_MODEL_NAME)



The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md: 0.00B [00:00, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/612 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

In [28]:
# Optional cross-encoder reranker (slower & heavier than the bi-encoder).
# It is only instantiated when the CrossEncoder import succeeded; otherwise
# `reranker` is None and retrieval returns the FAISS ordering unchanged.
reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2") if HAVE_RERANKER else None

In [29]:
# FAISS index + metadata holders (global)
# These module-level variables are the shared state of the whole notebook:
# the ingestion, indexing, retrieval and save/load helpers read and rebind them
# via `global`. Re-running this cell resets the state to "nothing indexed".
# CHUNKS and METADATAS are parallel lists: entry i of one describes entry i of
# the other, and row i of EMBEDDINGS is the vector of CHUNKS[i].
FAISS_INDEX = None            # FAISS index object, or None until an index is built/loaded
CHUNKS: List[str] = []        # text chunks
METADATAS: List[dict] = []    # parallel list with metadata dicts for each chunk
EMBEDDINGS: np.ndarray = None # shape = (n_chunks, dim); None until chunks are embedded

In [30]:
# CELL 3 — Helper functions: PDF/text ingestion & chunking
def pdf_to_text_bytes(file_bytes: bytes) -> str:
    """Return the text of a PDF given as raw bytes, using pypdf.

    Pages are processed in order. A page whose text extraction raises is
    skipped, and pages yielding no text are omitted. The remaining page texts
    are joined with a single newline.
    """
    pdf = PdfReader(io.BytesIO(file_bytes))
    page_texts = []
    for pg in pdf.pages:
        try:
            extracted = pg.extract_text()
        except Exception:
            # best effort: a page that cannot be parsed is simply left out
            continue
        if extracted:
            page_texts.append(extracted)
    return "\n".join(page_texts)

def text_to_chunks(text: str, chunk_size: int = 400, overlap: int = 100) -> List[str]:
    """Split text into overlapping chunks of whitespace-separated words.

    Args:
        text: Input text; it is tokenized with ``str.split()``.
        chunk_size: Maximum number of words per chunk (must be > 0).
        overlap: Number of words shared by consecutive chunks
            (must satisfy ``0 <= overlap < chunk_size``).

    Returns:
        List of chunk strings (words re-joined with single spaces). Empty or
        whitespace-only text yields an empty list.

    Raises:
        ValueError: If ``chunk_size``/``overlap`` would make the window not
            advance (previously this caused an infinite loop).
    """
    chunk_size = int(chunk_size)
    overlap = int(overlap)
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    if overlap < 0 or overlap >= chunk_size:
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")

    words = text.split()
    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + chunk_size]))
        # Stop once a window reaches the end of the text: any further window
        # would only repeat words already contained in this chunk.
        if start + chunk_size >= len(words):
            break
    return chunks

def _read_upload(f, fidx: int):
    """Return ``(display_name, raw_bytes)`` for one uploaded item.

    Handles both path-like uploads (a str/PathLike, or an object whose ``.name``
    is a path to an existing file) and in-memory file objects exposing ``read``.
    """
    path = f if isinstance(f, (str, os.PathLike)) else getattr(f, "name", None)
    if isinstance(path, (str, os.PathLike)) and os.path.isfile(path):
        # Reading by path avoids depending on the position/state of a
        # temp-file wrapper; the context manager closes the handle.
        with open(path, "rb") as fh:
            data = fh.read()
    elif hasattr(f, "read"):
        data = f.read()
    else:
        with open(f, "rb") as fh:
            data = fh.read()
    if isinstance(data, str):
        # file object opened in text mode
        data = data.encode("utf-8")
    name = os.path.basename(str(path)) if path else ""
    return (name or f"file_{fidx}"), data


def add_documents_from_files(file_objs: List[gr.File], doc_prefix: str = "doc",
                             chunk_size: int = 400, overlap: int = 100):
    """
    Extract text from uploaded files, chunk it, and append to CHUNKS/METADATAS.

    Args:
        file_objs: Uploaded items: file paths or file-like objects (with
            ``.name`` and/or ``.read``). ``None`` or an empty list adds nothing.
        doc_prefix: Currently unused; kept for backward compatibility.
        chunk_size: Words per chunk, forwarded to ``text_to_chunks``.
        overlap: Overlapping words between chunks, forwarded to ``text_to_chunks``.

    Returns:
        Number of chunks added. Each chunk gets metadata
        ``{"source": <file name>, "chunk_index": <position within the file>}``.

    Note:
        This only appends to CHUNKS/METADATAS; the embeddings and FAISS index
        are not updated here (call ``rebuild_index()`` afterwards).
    """
    global CHUNKS, METADATAS
    if not file_objs:
        return 0
    if isinstance(file_objs, (str, bytes, os.PathLike)) or hasattr(file_objs, "read"):
        # a single upload was passed instead of a list
        file_objs = [file_objs]

    added = 0
    for fidx, f in enumerate(file_objs):
        fname, file_bytes = _read_upload(f, fidx)
        # Try PDF first; anything pypdf cannot parse is treated as UTF-8 text.
        try:
            text = pdf_to_text_bytes(file_bytes)
        except Exception:
            try:
                text = file_bytes.decode("utf-8-sig")  # also strips a BOM
            except UnicodeDecodeError:
                text = ""
        if not text.strip():
            continue
        for i, ch in enumerate(text_to_chunks(text, chunk_size=chunk_size, overlap=overlap)):
            CHUNKS.append(ch)
            METADATAS.append({"source": fname, "chunk_index": i})
            added += 1
    return added


In [31]:
# CELL 4 — Embedding utilities & FAISS build/update
def embed_texts(texts: List[str], batch_size: int = 64) -> np.ndarray:
    """Embed texts with `embed_model` and return L2-normalized float32 rows.

    An empty input returns a (0, dim) float32 array. Rows are scaled to unit
    length so that an inner-product search behaves like cosine similarity;
    all-zero rows are left as zeros (their norm is replaced by 1).
    """
    if not texts:
        dim = embed_model.get_sentence_embedding_dimension()
        return np.zeros((0, dim), dtype="float32")

    vectors = embed_model.encode(
        texts,
        batch_size=batch_size,
        convert_to_numpy=True,
        show_progress_bar=False,
    )
    lengths = np.linalg.norm(vectors, axis=1, keepdims=True)
    # avoid division by zero without changing the dtype of the norms
    safe_lengths = np.where(lengths == 0, lengths.dtype.type(1), lengths)
    return (vectors / safe_lengths).astype("float32")

def build_faiss_index_from_embeddings(embs: np.ndarray) -> faiss.IndexFlatIP:
    """Build an inner-product FAISS index (IndexFlatIP) holding `embs`.

    The index dimension is taken from the second axis of `embs`; vectors are
    only added when the array has at least one row.
    """
    n_vectors, dim = embs.shape[0], embs.shape[1]
    ip_index = faiss.IndexFlatIP(dim)
    if n_vectors:
        ip_index.add(embs)
    return ip_index

def rebuild_index():
    """Re-embed every entry of CHUNKS and build a fresh FAISS index.

    Returns the number of indexed chunks. With no chunks, both FAISS_INDEX and
    EMBEDDINGS are reset to None and 0 is returned.
    """
    global FAISS_INDEX, EMBEDDINGS
    if CHUNKS:
        EMBEDDINGS = embed_texts(CHUNKS)
        FAISS_INDEX = build_faiss_index_from_embeddings(EMBEDDINGS)
        return EMBEDDINGS.shape[0]
    # nothing to index: clear any previous state
    FAISS_INDEX = None
    EMBEDDINGS = None
    return 0

def add_new_chunks_and_update_index(new_texts: List[str], new_metadatas: List[dict] = None):
    """Append new texts to CHUNKS/METADATAS and add their embeddings to FAISS.

    Args:
        new_texts: Chunk texts to add. An empty/None value adds nothing.
        new_metadatas: Optional metadata dicts, one per text. When omitted,
            placeholder entries (``source="unknown"``) are appended so that
            CHUNKS and METADATAS stay the same length. Retrieval indexes
            METADATAS by chunk position, so the lists must not drift apart.

    Returns:
        Number of chunks added.

    Raises:
        ValueError: If ``new_metadatas`` is given with a different length
            than ``new_texts``.
    """
    global CHUNKS, METADATAS, EMBEDDINGS, FAISS_INDEX
    if not new_texts:
        return 0
    new_texts = list(new_texts)
    if new_metadatas is None:
        new_metadatas = [{"source": "unknown", "chunk_index": i} for i in range(len(new_texts))]
    elif len(new_metadatas) != len(new_texts):
        raise ValueError("new_metadatas must have one entry per new text")

    new_embs = embed_texts(new_texts)

    # Incremental add is only valid when every existing chunk is already
    # embedded and indexed; otherwise vector positions would not match CHUNKS.
    had_chunks = len(CHUNKS) > 0
    index_in_sync = (
        EMBEDDINGS is not None
        and FAISS_INDEX is not None
        and EMBEDDINGS.shape[0] == len(CHUNKS)
        and FAISS_INDEX.ntotal == len(CHUNKS)
    )

    CHUNKS.extend(new_texts)
    METADATAS.extend(new_metadatas)

    if not had_chunks:
        EMBEDDINGS = new_embs
        FAISS_INDEX = build_faiss_index_from_embeddings(EMBEDDINGS)
    elif index_in_sync:
        EMBEDDINGS = np.vstack([EMBEDDINGS, new_embs])
        FAISS_INDEX.add(new_embs)
    else:
        # e.g. chunks were appended earlier without re-indexing
        rebuild_index()
    return new_embs.shape[0]


In [32]:
# CELL 5 — Retrieval (ANN + optional rerank)
def retrieve(query: str, top_k: int = 8, rerank_top_k: int = 5) -> List[Tuple[str, dict, float]]:
    """
    Return the chunks most similar to `query` as (chunk_text, metadata, score).

    Args:
        query: User question.
        top_k: Number of nearest neighbours requested from FAISS. Floats (as
            produced by UI sliders) are truncated to int; the value is clamped
            to the range [1, number of chunks].
        rerank_top_k: How many of the top FAISS hits are re-scored by the
            cross-encoder, when one is loaded. 0 (or less) disables reranking.

    Returns:
        List ordered by final rank. Without reranking the score is the FAISS
        inner product. With reranking, the first `rerank_top_k` entries carry
        the cross-encoder score and are followed by the remaining FAISS hits
        with their original scores (the two score scales are not comparable).
        Returns [] when nothing is indexed.
    """
    if FAISS_INDEX is None or EMBEDDINGS is None or len(CHUNKS) == 0:
        return []

    # FAISS and slicing both require real ints.
    k = min(max(int(top_k), 1), len(CHUNKS))
    n_rerank = max(int(rerank_top_k), 0)

    qvec = embed_texts([query])  # shape (1, dim)
    D, I = FAISS_INDEX.search(qvec, k)  # D = scores, I = indices
    candidates = []
    for idx, sc in zip(I[0].tolist(), D[0].tolist()):
        # FAISS pads missing results with -1; also guard against a stale index
        if idx < 0 or idx >= len(CHUNKS):
            continue
        # tolerate a METADATAS list that is shorter than CHUNKS
        meta = METADATAS[idx] if idx < len(METADATAS) else {}
        candidates.append((CHUNKS[idx], meta, float(sc)))

    if reranker is None or n_rerank == 0 or not candidates:
        return candidates

    # optional rerank using cross-encoder
    head = candidates[:n_rerank]
    rr_scores = np.asarray(reranker.predict([(query, txt) for txt, _meta, _sc in head]))
    order = np.argsort(-rr_scores)
    # Map back by position, not by text equality: identical chunk texts from
    # different sources must keep their own metadata.
    reranked = [(head[oi][0], head[oi][1], float(rr_scores[oi])) for oi in order]
    # append rest (not reranked) after
    reranked.extend(candidates[n_rerank:])
    return reranked


In [37]:
# CELL 6 — Groq LLM wrapper (generator)
# Instructions given to the LLM: answer from the retrieved context only,
# cite sources, and admit when the context has no answer.
SYSTEM_PROMPT = (
    "You are a super intelligent and advanced helpful assistant designed and engineered by Parker. Use only the provided CONTEXT to answer. "
    "Cite sources by writing (source: <filename>, chunk: <index>) after the relevant facts. "
    "If the answer cannot be found in the context, say 'I don't know'. Be concise and factual."
)

def build_prompt_with_context(query: str, retrieved: List[Tuple[str, dict, float]], max_context_chunks: int = 6,
                              include_system_prompt: bool = True) -> str:
    """Format retrieved chunks and the question into a single prompt string.

    Args:
        query: User question.
        retrieved: (chunk_text, metadata, score) tuples; only the first
            `max_context_chunks` are used, each tagged with its source file
            and chunk index so the model can cite it.
        max_context_chunks: Maximum number of chunks placed in the context.
        include_system_prompt: When True (default) the prompt starts with
            SYSTEM_PROMPT. Pass False when the system prompt is sent
            separately as a chat "system" message.

    Returns:
        The prompt text ending with "ANSWER:\\n".
    """
    parts = []
    for i, (txt, meta, _score) in enumerate(retrieved[:max_context_chunks]):
        src = meta.get("source", "unknown")
        idx = meta.get("chunk_index", i)
        parts.append(f"[source:{src}::chunk:{idx}]\n{txt}\n")
    context = "\n\n".join(parts)
    body = f"CONTEXT:\n{context}\n\nQUESTION:\n{query}\n\nANSWER:\n"
    if include_system_prompt:
        return f"{SYSTEM_PROMPT}\n\n{body}"
    return body

def generate_answer_with_groq(query: str, retrieved: List[Tuple[str, dict, float]], model_name: str = "llama-3.1-8b-instant", max_tokens: int = 400, temperature: float = 0.0) -> str:
    """Ask the Groq chat model to answer `query` from the retrieved chunks.

    Returns the model's answer text. Configuration or API problems are
    reported as a "⚠️ ..." message string rather than raised, so the chat UI
    can display them.
    """
    if client is None:
        return "⚠️ Groq client not configured. Set GROQ_API_KEY in environment."
    # SYSTEM_PROMPT already goes in the system message, so it is left out of
    # the user message to avoid sending it twice.
    user_prompt = build_prompt_with_context(query, retrieved, include_system_prompt=False)
    try:
        resp = client.chat.completions.create(
            model=model_name,
            messages=[{"role": "system", "content": SYSTEM_PROMPT}, {"role": "user", "content": user_prompt}],
            max_tokens=max_tokens,
            temperature=temperature,
        )
    except Exception as exc:
        # network, auth, rate-limit or model errors
        return f"⚠️ Groq request failed: {exc}"
    # safety: fallback if the response does not have the expected shape
    try:
        return resp.choices[0].message.content or ""
    except (AttributeError, IndexError, TypeError):
        return str(resp)


In [38]:
# ============================
# CELL 7 — Save / Load index & metadata (optional)
# ============================
import pickle

def save_index_and_meta(dirpath: str = "/content/rag_index"):
    """Persist the current index state into `dirpath` and return that path.

    Creates the directory if needed, then writes:
      - faiss.index      (only when a FAISS index exists)
      - chunks.json      (CHUNKS)
      - metadatas.json   (METADATAS)
      - embeddings.npy   (only when EMBEDDINGS exists)
    """
    os.makedirs(dirpath, exist_ok=True)

    if FAISS_INDEX is not None:
        faiss.write_index(FAISS_INDEX, f"{dirpath}/faiss.index")

    # the two parallel lists are stored as UTF-8 JSON, non-ASCII kept as-is
    for filename, payload in (("chunks.json", CHUNKS), ("metadatas.json", METADATAS)):
        with open(f"{dirpath}/{filename}", "w", encoding="utf-8") as out:
            json.dump(payload, out, ensure_ascii=False)

    if EMBEDDINGS is not None:
        np.save(f"{dirpath}/embeddings.npy", EMBEDDINGS)
    return dirpath

def load_index_and_meta(dirpath: str = "/content/rag_index"):
    """Restore CHUNKS, METADATAS, EMBEDDINGS and FAISS_INDEX from `dirpath`.

    The saved embeddings and FAISS index are reused only when they are
    consistent with the loaded chunks (same number of vectors, same
    dimension). A missing or mismatching index file is rebuilt from the
    embeddings; missing or mismatching embeddings trigger a full re-embed.

    Returns:
        Number of chunks loaded.

    Raises:
        FileNotFoundError: If `dirpath` (or one of the JSON files) is missing.
        ValueError: If chunks.json and metadatas.json have different lengths.
    """
    global CHUNKS, METADATAS, EMBEDDINGS, FAISS_INDEX
    if not os.path.isdir(dirpath):
        raise FileNotFoundError("Index folder not found: " + dirpath)
    with open(f"{dirpath}/chunks.json", "r", encoding="utf-8") as fh:
        loaded_chunks = json.load(fh)
    with open(f"{dirpath}/metadatas.json", "r", encoding="utf-8") as fh:
        loaded_metas = json.load(fh)
    if len(loaded_chunks) != len(loaded_metas):
        # validate before touching the globals so a bad save cannot corrupt state
        raise ValueError("chunks.json and metadatas.json have different lengths")
    CHUNKS = loaded_chunks
    METADATAS = loaded_metas

    emb_path = f"{dirpath}/embeddings.npy"
    index_path = f"{dirpath}/faiss.index"
    embeddings = np.load(emb_path) if os.path.exists(emb_path) else None
    usable = (
        embeddings is not None
        and embeddings.ndim == 2
        and len(CHUNKS) > 0
        and embeddings.shape[0] == len(CHUNKS)
    )
    if usable:
        EMBEDDINGS = embeddings.astype("float32", copy=False)
        index = faiss.read_index(index_path) if os.path.exists(index_path) else None
        if index is None or index.ntotal != len(CHUNKS) or index.d != EMBEDDINGS.shape[1]:
            # index file missing or stale: rebuild it from the saved vectors
            index = build_faiss_index_from_embeddings(EMBEDDINGS)
        FAISS_INDEX = index
    else:
        # re-embed if embeddings are missing or do not match the chunks
        rebuild_index()
    return len(CHUNKS)


In [39]:
# ============================
# CELL 8 — Gradio UI: multi-file upload + chat
# ============================
# Utility wrappers for Gradio callbacks
def process_and_index(uploaded_files, chunk_size=400, overlap=100):
    """
    Gradio callback: ingest the uploaded files and rebuild the FAISS index.

    uploaded_files: list of uploaded file objects/paths from gr.File, or None
        when the button is clicked before anything was uploaded.
    chunk_size, overlap: accepted for API compatibility; they are not
        forwarded to the ingestion helper in this function.

    Returns a status message for the UI.
    """
    if not uploaded_files:
        return "⚠️ No files uploaded. Choose one or more PDF or text files first."
    if not isinstance(uploaded_files, (list, tuple)):
        # single-file upload: normalise to a list
        uploaded_files = [uploaded_files]
    # add docs -> chunks & metadata
    added = add_documents_from_files(uploaded_files)
    if added == 0:
        return "No text extracted from uploaded files. Make sure they are valid PDFs or plain text."
    # rebuild embeddings & FAISS index (simple and robust)
    n = rebuild_index()
    return f"✅ Processed {len(uploaded_files)} file(s). Total chunks: {n}"

def chat_with_docs(user_message, chat_history, top_k=8, rerank_k=5):
    """
    Answer `user_message` from the indexed documents.

    chat_history: list of [user, assistant] pairs (None is treated as empty)
    top_k / rerank_k: retrieval settings; floats from sliders are cast to int
    Returns updated chat_history
    """
    chat_history = chat_history or []
    # ignore empty submissions instead of sending a blank question to the LLM
    if not user_message or not user_message.strip():
        return chat_history
    if not CHUNKS or FAISS_INDEX is None:
        # keep the user's message and show the warning as the assistant reply
        chat_history.append([user_message, "⚠️ Upload and process documents first."])
        return chat_history
    # retrieve
    retrieved = retrieve(user_message, top_k=int(top_k), rerank_top_k=int(rerank_k))
    # call LLM
    answer = generate_answer_with_groq(user_message, retrieved)
    # append to history
    chat_history.append([user_message, answer])
    return chat_history

# Build Gradio interface
# Folder used by the "Save index" / "Load index" buttons.
INDEX_DIR = "/content/rag_index"

with gr.Blocks() as demo:
    gr.Markdown("## 📚 Multi-Document RAG Chatbot (Colab) \nUpload multiple PDFs or text files, process them, then chat.\n")
    with gr.Row():
        files_in = gr.File(file_count="multiple", label="Upload PDF(s) or text files (.pdf/.txt)")
        process_btn = gr.Button("Process & Index")
        status_box = gr.Textbox(label="Status", interactive=False)
    with gr.Row():
        # small controls; step=1 makes the sliders produce whole numbers,
        # which FAISS search and list slicing require
        top_k = gr.Slider(minimum=1, maximum=20, value=8, step=1, label="FAISS Top-K")
        rerank_k = gr.Slider(minimum=0, maximum=10, value=5, step=1, label="Reranker top-K (0 to disable)")
    chatbot = gr.Chatbot(label="Chat with your docs")
    user_input = gr.Textbox(label="Your question", placeholder="Ask anything about the uploaded documents...")
    with gr.Row():
        submit_btn = gr.Button("Send")
        save_btn = gr.Button("Save index")
        load_btn = gr.Button("Load index")

    # callbacks
    process_btn.click(process_and_index, inputs=files_in, outputs=status_box)

    def chat_cb(msg, history, tk, rk):
        """Run one chat turn with the slider settings and clear the input box."""
        return chat_with_docs(msg, history, top_k=int(tk), rerank_k=int(rk)), ""

    def save_cb():
        """Save the index and report the outcome in the status box."""
        try:
            return f"💾 Index saved to {save_index_and_meta(INDEX_DIR)}"
        except (OSError, RuntimeError) as exc:
            return f"⚠️ Could not save index: {exc}"

    def load_cb():
        """Load the index and report the outcome in the status box."""
        try:
            n_chunks = load_index_and_meta(INDEX_DIR)
        except (OSError, ValueError, RuntimeError) as exc:
            # e.g. nothing has been saved yet
            return f"⚠️ Could not load index: {exc}"
        return f"✅ Loaded {n_chunks} chunk(s) from {INDEX_DIR}"

    chat_inputs = [user_input, chatbot, top_k, rerank_k]
    chat_outputs = [chatbot, user_input]
    submit_btn.click(chat_cb, inputs=chat_inputs, outputs=chat_outputs)
    # pressing Enter in the textbox behaves like clicking "Send"
    user_input.submit(chat_cb, inputs=chat_inputs, outputs=chat_outputs)
    save_btn.click(save_cb, inputs=None, outputs=status_box)
    load_btn.click(load_cb, inputs=None, outputs=status_box)

demo.launch()


  chatbot = gr.Chatbot(label="Chat with your docs")


It looks like you are running Gradio on a hosted Jupyter notebook, which requires `share=True`. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://c671098d8a9975f074.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


