<a href="https://colab.research.google.com/github/anirbanghoshsbi/.github.io/blob/master/NLP_Text_Modelling/Text_modelling_Faiss.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Common
!pip install tqdm --q

# Local embeddings + FAISS
!pip install sentence-transformers faiss-cpu --q


# (Optional) For nicer docs handling
!pip install langchain  --q


[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m31.4/31.4 MB[0m [31m64.4 MB/s[0m eta [36m0:00:00[0m
[?25h

In [4]:
"""
rag_ingest.py

Reads a JSON array from 'ragtext.txt' (one object per chunk with the keys
'Chunk #', 'Theme/Topic', 'Suggested Chunk Size (Approx.)' and 'Chunk Content'),
prepares documents, and builds a vector store usable in a RAG pipeline.

Supports two backends:
 - 'faiss_local' : sentence-transformers + faiss (no cloud keys)
 - 'openai_chroma': OpenAI embeddings + chroma (requires OPENAI_API_KEY in env)

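Usage:
    python rag_ingest.py                                 # default backend: faiss_local
    VSTORE_BACKEND=openai_chroma python rag_ingest.py    # needs OPENAI_API_KEY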
"""

import json
import os
from typing import List, Dict, Any, Tuple
from dataclasses import dataclass
from tqdm import tqdm

# Optional: If you want to create LangChain Document objects later, import:
# from langchain.schema import Document   # Uncomment if using langchain

# -------------------------
# Data classes / helpers
# -------------------------
@dataclass
class ChunkDoc:
    """One text chunk together with the descriptive fields stored alongside it.

    Instances are created by ``load_json_chunks``; the vector-store builders
    in this file read ``id``, ``text`` and ``metadata`` from them.
    """

    # Chunk identifier (load_json_chunks converts it to a string).
    id: str
    # Theme/topic label of the chunk.
    theme: str
    # Suggested chunk size exactly as read from the input record.
    suggested_size: str
    # The chunk's text; this is the content that gets embedded.
    text: str
    # Extra key/value data saved next to the text for retrieval results.
    metadata: Dict[str, Any]

def load_json_chunks(path: str) -> List[ChunkDoc]:
    """Parse the JSON array stored at ``path`` into a list of ``ChunkDoc``.

    Every record is read with ``.get``, so missing keys fall back to
    defaults: the chunk id becomes ``chunk_<position>`` (1-based position in
    the array) and the remaining fields become empty strings. The id is
    stored as a string on the ``ChunkDoc`` and unchanged inside its metadata.
    """
    with open(path, "r", encoding="utf-8") as handle:
        records = json.load(handle)

    def _to_doc(position: int, record: Dict[str, Any]) -> ChunkDoc:
        # Convert one raw JSON record into a ChunkDoc.
        number = record.get("Chunk #", f"chunk_{position}")
        topic = record.get("Theme/Topic", "")
        approx_size = record.get("Suggested Chunk Size (Approx.)", "")
        body = record.get("Chunk Content", "")
        return ChunkDoc(
            id=str(number),
            theme=topic,
            suggested_size=approx_size,
            text=body,
            # Metadata is what gets persisted beside the text later on.
            metadata={
                "theme": topic,
                "suggested_size": approx_size,
                "chunk_number": number,
            },
        )

    return [_to_doc(pos, rec) for pos, rec in enumerate(records, start=1)]

# -------------------------
# Backend A: sentence-transformers + FAISS (local)
# -------------------------
def build_faiss_local(docs: List[ChunkDoc], model_name: str = "all-MiniLM-L6-v2", save_path: str = "faiss_index") -> Tuple[Any, Any]:
    """
    Create embeddings with sentence-transformers and index them with FAISS.

    Args:
        docs: Chunks to embed; their ``text`` is encoded and their ``id`` /
            ``metadata`` / ``text`` are written to the mapping file.
        model_name: Name passed to ``SentenceTransformer``.
        save_path: Directory (created if missing) that receives
            ``index.faiss`` and ``mapping.json``.

    Returns:
        (index, encoder_model)

    Raises:
        ValueError: If ``docs`` is empty (there is nothing to embed, and the
            embedding dimension could not be determined).
        RuntimeError: If sentence-transformers or faiss cannot be imported.
    """
    # Fail fast, before loading a model, instead of crashing later on
    # ``embeddings.shape[1]`` with an unhelpful error.
    if not docs:
        raise ValueError("docs is empty: nothing to embed or index")

    try:
        from sentence_transformers import SentenceTransformer
        import faiss
        import numpy as np
    except ImportError as err:
        raise RuntimeError("Install sentence-transformers and faiss-cpu (pip install sentence-transformers faiss-cpu)") from err

    model = SentenceTransformer(model_name)
    texts = [d.text for d in docs]
    print("Creating embeddings with", model_name)
    embeddings = model.encode(texts, show_progress_bar=True, convert_to_numpy=True)
    # normalize_L2 rewrites the buffer in place, so hand FAISS a C-contiguous
    # float32 matrix (this is a no-op when the encoder already returns one).
    embeddings = np.ascontiguousarray(embeddings, dtype=np.float32)

    dim = embeddings.shape[1]
    index = faiss.IndexFlatIP(dim)  # inner product (cosine if normalized)
    # normalize to get cosine similarity behavior
    faiss.normalize_L2(embeddings)
    index.add(embeddings)

    # Save index and metadata
    os.makedirs(save_path, exist_ok=True)
    faiss.write_index(index, os.path.join(save_path, "index.faiss"))

    # Save doc metadata and texts for retrieval mapping; list position i
    # corresponds to row i of the index.
    mapping = [{"id": d.id, "metadata": d.metadata, "text": d.text} for d in docs]
    with open(os.path.join(save_path, "mapping.json"), "w", encoding="utf-8") as f:
        json.dump(mapping, f, ensure_ascii=False, indent=2)

    print(f"FAISS index + mapping saved to {save_path}")
    return index, model

def query_faiss(index, encoder_model, docs_path: str, query: str, top_k: int = 3) -> List[Dict[str, Any]]:
    """Search the FAISS index for ``query`` and return the matching chunks.

    The query is encoded with ``encoder_model``, L2-normalized, and searched
    for ``top_k`` neighbours. Each returned position is resolved through
    ``<docs_path>/mapping.json``; positions outside that mapping (including
    negative ones) are dropped. Every hit is a dict with the keys
    ``score``, ``id``, ``metadata`` and ``text``.
    """
    import faiss

    # Position -> stored chunk record, as written when the index was built.
    with open(os.path.join(docs_path, "mapping.json"), "r", encoding="utf-8") as handle:
        stored = json.load(handle)

    query_vec = encoder_model.encode([query], convert_to_numpy=True)
    faiss.normalize_L2(query_vec)
    scores, positions = index.search(query_vec, top_k)

    hits = []
    # Only one query was submitted, so row 0 holds all of its neighbours.
    for similarity, pos in zip(scores[0], positions[0]):
        if 0 <= pos < len(stored):
            record = stored[pos]
            hits.append({
                "score": float(similarity),
                "id": record["id"],
                "metadata": record["metadata"],
                "text": record["text"],
            })
    return hits

# -------------------------
# Backend B: OpenAI embeddings + Chroma
# -------------------------
def build_chroma_openai(docs: List[ChunkDoc], persist_dir: str = "chroma_db", openai_api_key: str = None, embed_model_name: str = "text-embedding-3-small") -> Any:
    """
    Build (or refresh) the 'rag_chunks' Chroma collection with OpenAI embeddings.

    Args:
        docs: Chunks to store; ``id``, ``metadata`` and ``text`` are written.
            An empty list leaves the collection untouched.
        persist_dir: Accepted for interface compatibility but not used: the
            client created here is the in-memory ``chromadb.Client()``.
        openai_api_key: API key; when ``None`` the ``OPENAI_API_KEY``
            environment variable is used instead.
        embed_model_name: Embedding model name passed to the OpenAI
            embedding function.

    Returns:
        The Chroma collection.

    Raises:
        RuntimeError: If no API key is available, or if chromadb cannot be
            imported.
    """
    # Check the key before importing anything heavy: it is the cheapest
    # check and needs no third-party package.
    if openai_api_key is None:
        openai_api_key = os.environ.get("OPENAI_API_KEY")
    if not openai_api_key:
        raise RuntimeError("OpenAI API key not found. Set OPENAI_API_KEY env var or pass it to the function.")

    try:
        import chromadb
        from chromadb.utils import embedding_functions
    except ImportError as err:
        raise RuntimeError("Install chromadb and openai (pip install chromadb openai)") from err

    client = chromadb.Client()
    # Use the OpenAI embedding function wrapped for chroma
    openai_ef = embedding_functions.OpenAIEmbeddingFunction(api_key=openai_api_key, model_name=embed_model_name)

    # Create or get collection: unlike create_collection, this does not fail
    # when 'rag_chunks' already exists (e.g. a notebook cell that is re-run).
    collection = client.get_or_create_collection(name="rag_chunks", embedding_function=openai_ef)

    ids = [d.id for d in docs]
    metadatas = [d.metadata for d in docs]
    texts = [d.text for d in docs]

    # Upsert so that ids already present are overwritten rather than
    # duplicated or rejected; skip the call entirely when there is no data.
    if ids:
        collection.upsert(ids=ids, metadatas=metadatas, documents=texts)
    print("Chroma collection 'rag_chunks' created/populated (persist_dir not used in in-memory client).")
    return collection

def query_chroma(collection, query: str, top_k: int = 3):
    """Query a Chroma collection and flatten the reply into a list of dicts.

    Sends ``query`` as the only query text, asking for ``top_k`` results,
    and returns one dict per returned document with the keys ``id``,
    ``text``, ``metadata`` and ``score``. ``score`` is the value the reply
    carries under ``'distances'``. A reply without a ``'documents'`` entry
    yields an empty list.
    """
    reply = collection.query(query_texts=[query], n_results=top_k)

    def _first_batch(field):
        # The reply holds one inner list per query text; only one was sent.
        return reply.get(field, [[]])[0]

    texts = _first_batch("documents")
    metadata_rows = _first_batch("metadatas")
    distances = _first_batch("distances")
    identifiers = _first_batch("ids")

    # Index the parallel lists by the document's position.
    return [
        {
            "id": identifiers[pos],
            "text": text,
            "metadata": metadata_rows[pos],
            "score": distances[pos],
        }
        for pos, text in enumerate(texts)
    ]

# -------------------------
# Utilities
# -------------------------
def prepare_docs_for_rag(docs: List[ChunkDoc]) -> List[Dict[str, Any]]:
    """Flatten chunks into plain dicts for any RAG/vector store pipeline.

    Each output dict has the keys ``id``, ``text`` and ``metadata``, taken
    from the chunk's attributes of the same name. Input order is kept and
    the metadata object is passed through as-is (not copied).
    """
    return [
        {
            "id": chunk.id,
            "text": chunk.text,
            "metadata": chunk.metadata,
        }
        for chunk in docs
    ]

# -------------------------
# Example run
# -------------------------
if __name__ == "__main__":
    # Example run: load the chunk file, build the selected vector store and
    # run one demonstration query against it.
    RAG_FILE = "ragtext.txt"   # file that contains the JSON array
    if not os.path.exists(RAG_FILE):
        raise FileNotFoundError(f"{RAG_FILE} not found. Put your JSON array in this file.")

    docs = load_json_chunks(RAG_FILE)
    print(f"Loaded {len(docs)} chunks from {RAG_FILE}")

    # Choose backend: 'faiss_local' or 'openai_chroma'
    VSTORE_BACKEND = os.environ.get("VSTORE_BACKEND", "faiss_local")

    if VSTORE_BACKEND == "faiss_local":
        index, encoder = build_faiss_local(docs, save_path="faiss_index")
        # Demonstrate a query
        q = "Why no one is crazy in this world?"
        print("Query:", q)
        results = query_faiss(index, encoder, docs_path="faiss_index", query=q, top_k=4)

        report_entries = []
        for r in results:
            print("SCORE:", r["score"], "ID:", r["id"], "THEME:", r["metadata"].get("theme"))
            print("->", r["text"].replace("\n", " "), "...\n")
            # Full result (score, id, theme, text) destined for the .txt file
            report_entries.append(
                f"SCORE: {r['score']}\n"
                f"ID: {r['id']}\n"
                f"THEME: {r['metadata'].get('theme')}\n\n"
                f"TEXT:\n{r['text']}\n"
                "------------------------------------------------------------\n\n"
            )

        # Open the report once instead of once per result. Append mode keeps
        # output from earlier runs; the file is only touched when there is
        # something to write.
        if report_entries:
            with open("retrieved_rag.txt", "a", encoding="utf-8") as out:
                out.writelines(report_entries)
    elif VSTORE_BACKEND == "openai_chroma":
        OPENAI_KEY = os.environ.get("OPENAI_API_KEY")
        collection = build_chroma_openai(docs, openai_api_key=OPENAI_KEY)
        q = "How new are index funds and 401(k)s?"
        print("Query:", q)
        results = query_chroma(collection, q, top_k=4)
        for r in results:
            print("ID:", r["id"], "SCORE:", r["score"], "THEME:", r["metadata"].get("theme"))
            print("->", r["text"].replace("\n", " "), "...\n")

    else:
        raise RuntimeError(f"Unknown VSTORE_BACKEND: {VSTORE_BACKEND}")


Loaded 18 chunks from ragtext.txt
Creating embeddings with all-MiniLM-L6-v2


Batches:   0%|          | 0/1 [00:00<?, ?it/s]

FAISS index + mapping saved to faiss_index
Query: Why no one is crazy in this world?
SCORE: 0.42439204454421997 ID: 1 THEME: Introduction: The Anchor of Personal Experience
-> Let me tell you about a problem. It might make you feel better about what you do with your money, and less judgmental about what other people do with theirs. People do some crazy things with money. But no one is crazy. Here’s the thing: People from different generations, raised by different parents who earned different incomes and held different values, in different parts of the world, born into different economies, experiencing different job markets with different incentives and different degrees of luck, learn very different lessons. Everyone has their own unique experience with how the world works. And what you’ve experienced is more compelling than what you learn second-hand. So all of us—you, me, everyone—go through life anchored to a set of views about how money works that vary wildly from person to person.