<a href="https://colab.research.google.com/github/SatadruNoob/BOLT-RAG-PIPELINE/blob/main/WORKING_OCR_LANGCHAIN_PERSISTENT_CHROMA_DB_VECTOR_STORE_EMBEDDINGS_V1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
############## CLEAN INSTALLATION ################
# Install Python packages
# Both `pypdf` and `PyPDF2` are installed on purpose: the notebook imports
# PdfReader from `pypdf`, while the generated ocr_update.py imports it from
# `PyPDF2`. `pytesseract` and `pdf2image` back the OCR fallback.
%pip install --quiet --upgrade \
    langchain \
    langchain-community \
    langchain-text-splitters \
    langgraph \
    langchain[mistralai] \
    langchain-huggingface \
    langchain-chroma \
    pypdf \
    chromadb \
    PyPDF2 \
    pytesseract \
    pdf2image

# Install system packages
# tesseract-ocr supplies the binary pytesseract is pointed at later
# (/usr/bin/tesseract); poppler-utils is presumably required by pdf2image to
# rasterise PDF pages — confirm against pdf2image's install notes.
!apt-get update -qq
!apt-get install -y tesseract-ocr poppler-utils

RESTART THE RUNTIME NOW (Runtime → Restart session) so the freshly installed packages are loaded, then continue with the cells below.

In [None]:
############## CLEAN IMPORTS ################
# Core Python Libraries
import os
import time
from typing import List, Literal, Optional, Dict  # <- List from typing
from typing_extensions import TypedDict, List as TEList # <- Also List from typing_extensions
from typing_extensions import TypedDict, Annotated, List

# Colab-Specific
from google.colab import drive, userdata
from IPython.display import Image, display

# PDF & OCR
import pytesseract
pytesseract.pytesseract.tesseract_cmd = "/usr/bin/tesseract"

from pypdf import PdfReader

# LangChain & LangGraph
import langchain
import langchain_community
import langgraph  # <- Explicitly included
print("LangChain:", langchain.__version__)

# LangChain Core
from langchain import hub
from langchain_core.documents import Document
from langchain_core.prompts import PromptTemplate
from langchain_core.vectorstores import InMemoryVectorStore

# LangChain Community & Tools
from langchain_community.document_loaders import PyPDFLoader, TextLoader
from langchain.vectorstores import Chroma
from langchain_text_splitters import RecursiveCharacterTextSplitter

# LangGraph Components
from langgraph.graph import StateGraph, START

# Embeddings
from langchain_huggingface import HuggingFaceEmbeddings

# Chroma Configuration
from chromadb.config import Settings

LangChain: 0.3.25


In [None]:
########## Environment Setup: LangChain + Mistral + LangSmith (Colab RAG Pipeline) ##########
# Set LangSmith environment variables
# Tracing is switched on and the API key is read from the Colab secret store
# (`userdata`), so a secret named "LANGSMITH_API_KEY" must be defined there.
os.environ["LANGSMITH_TRACING"] = "true"
os.environ["LANGSMITH_API_KEY"] = userdata.get('LANGSMITH_API_KEY')
print("LangSmith environment variables set successfully!")

# Set Mistral API key from Colab secrets
os.environ["MISTRAL_API_KEY"] = userdata.get('MISTRAL_API_KEY')

# Initialize the Mistral chat model
# `llm` is a notebook global; the RAG nodes below use it both to build the
# structured search query and to generate the final answer.
from langchain.chat_models import init_chat_model
llm = init_chat_model("mistral-large-latest", model_provider="mistralai")
print("Mistral AI LLM initialized successfully!")

LangSmith environment variables set successfully!
Mistral AI LLM initialized successfully!


In [None]:
# Mount Google Drive: both the Chroma store (/content/drive/MyDrive) and the
# source documents processed below live on it.
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


################################################################################

In [None]:
%%writefile ocr_update.py
#### OCR ENABLED TEXT EXTRACTION FROM PDF ########
#########
import os
import hashlib
import pytesseract
import re
from PyPDF2 import PdfReader
from pdf2image import convert_from_path
from langchain.schema import Document
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import Chroma
from langchain.text_splitter import RecursiveCharacterTextSplitter
import chromadb
from IPython.display import display, Markdown


def sanitize_collection_name(name: str) -> str:
    """Turn a folder name into a Chroma collection name.

    Spaces become underscores, every character outside ``[A-Za-z0-9_-]`` is
    dropped, leading/trailing ``_``/``-`` are stripped, and the result is cut
    to 63 characters.

    This must produce exactly the same name as the sanitizer used by the main
    vector-store script (replace -> filter -> strip -> truncate, in that
    order); otherwise the OCR fallback would write into a different
    collection than the one the main script creates and searches.
    """
    name = name.replace(" ", "_")
    name = re.sub(r'[^a-zA-Z0-9_\-]', '', name)
    # Strip before truncating, matching the main script's order.
    name = name.strip("_-")
    return name[:63]

def compute_hash(content: str) -> str:
    """Return the hex SHA-256 digest of *content* encoded as UTF-8.

    Used as the deduplication key stored in each chunk's ``content_hash``.
    """
    hasher = hashlib.sha256()
    hasher.update(content.encode("utf-8"))
    return hasher.hexdigest()

def extract_text_from_pdf_ocr(pdf_path):
    """OCR every page of *pdf_path* and return the concatenated text.

    Pages are rasterised with ``convert_from_path`` and each page image is run
    through ``pytesseract.image_to_string``; the per-page strings are joined
    with no separator. Any failure (rasterisation or OCR) is reported and an
    empty string is returned so the caller can skip the file.
    """
    try:
        page_images = convert_from_path(pdf_path)
        return "".join(pytesseract.image_to_string(page) for page in page_images)
    except Exception as e:
        print(f"❌ OCR failed for {pdf_path}: {str(e)}")
        return ""

def get_existing_hashes(chroma_store):
    """Return the set of ``content_hash`` values stored in *chroma_store*.

    Reads all metadata through the store's private ``_collection`` handle.
    Metadata entries that are empty or lack ``content_hash`` are ignored. On
    any error a warning is printed and an empty set is returned (hashes
    gathered before the error are discarded).
    """
    try:
        payload = chroma_store._collection.get(include=["metadatas"])
        hashes = set()
        for md in payload.get("metadatas", []):
            if md and "content_hash" in md:
                hashes.add(md["content_hash"])
        return hashes
    except Exception as e:
        print(f"⚠️ Failed to get existing hashes: {str(e)}")
        return set()

def ocr_and_update_chroma(doc_dir, persist_dir):
    """OCR the PDFs in *doc_dir* and add any not-yet-stored chunks to Chroma.

    The collection is named after the basename of *doc_dir* (via
    ``sanitize_collection_name``) inside the Chroma persistent store at
    *persist_dir*. Only files directly inside *doc_dir* are scanned (no
    recursion). Each PDF is OCR'd as a whole, split into 1000-character chunks
    with 200 characters of overlap, and every chunk whose SHA-256 content hash
    is neither already stored nor already queued earlier in this run is tagged
    with ``section="ocr_recovered"`` and added. The first new chunk of the run
    is displayed as a preview.

    Args:
        doc_dir: Directory whose ``*.pdf`` files (case-insensitive) are OCR'd.
        persist_dir: Path of the Chroma persistent store.
    """
    collection_name = sanitize_collection_name(os.path.basename(doc_dir))
    print(f"🗂 Using collection name: `{collection_name}`")
    print("🔍 Initializing embedding and vector store...")

    embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-mpnet-base-v2")
    chroma_store = Chroma(
        client=chromadb.PersistentClient(path=persist_dir),
        collection_name=collection_name,
        embedding_function=embeddings,
        persist_directory=persist_dir
    )

    # Starts with the hashes already stored; hashes of queued chunks are added
    # as we go so duplicates *within* this run are skipped too.
    known_hashes = get_existing_hashes(chroma_store)
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
    new_chunks = []
    preview_shown = False

    print("🔎 Scanning and applying OCR to all PDFs...")

    for i, file in enumerate(sorted(os.listdir(doc_dir)), 1):
        if not file.lower().endswith(".pdf"):
            continue
        path = os.path.join(doc_dir, file)
        print(f"▶️ [{i}] Processing: {file}")
        text = extract_text_from_pdf_ocr(path)

        if not text.strip():
            print("   ⚠️ OCR returned empty text.")
            continue

        chunks = text_splitter.split_documents([Document(page_content=text, metadata={})])
        filtered_chunks = []

        for chunk in chunks:
            chunk_hash = compute_hash(chunk.page_content)
            if chunk_hash in known_hashes:
                continue
            # Record immediately: text repeated across pages or PDFs would
            # otherwise be embedded and stored once per occurrence.
            known_hashes.add(chunk_hash)
            chunk.metadata = {
                "source": path,
                "file_name": file,
                "content_hash": chunk_hash,
                "section": "ocr_recovered"
            }
            filtered_chunks.append(chunk)

        if not filtered_chunks:
            print("   ℹ️ All OCR chunks already exist in Chroma.")
            continue

        print(f"   ➕ {len(filtered_chunks)} new chunks to add.")
        new_chunks.extend(filtered_chunks)

        if not preview_shown:
            print("\n🔍 OCR Preview:")
            display(Markdown(f"**File:** `{file}`"))
            display(Markdown(f"**Metadata:** `{filtered_chunks[0].metadata}`"))
            print(filtered_chunks[0].page_content[:1000])
            preview_shown = True

    if new_chunks:
        print(f"\n🚀 Adding {len(new_chunks)} new OCR-recovered chunks...")
        chroma_store.add_documents(new_chunks)
        chroma_store.persist()
        print("✅ Chroma DB updated successfully!")
    else:
        print("✅ No new OCR chunks to add.")


Overwriting ocr_update.py


In [None]:
####################################################################
# 🌟🌟🌟🌟🌟 Optimized Vector Store Script with Enhanced Process Messaging 🌟🌟🌟🌟🌟
####################################################################
#                                                                  #
# This script efficiently manages document storage and updates in #
# a Chroma vector store. It handles the following tasks:           #
#                                                                  #
# - 📥 Load documents from multiple formats (text, PDF)            #
# - 🔑 Compute unique content hashes to prevent duplication       #
# - ✂️ Split documents into smaller chunks for efficient storage  #
# - 🔄 Update Chroma vector store with new documents               #
# - ⚡ Create or update a Chroma vector store in an existing DB    #
#                                                                  #
# With this script, you’ll have an optimized and scalable          #
# solution for managing and searching your document vectors.       #
####################################################################
# 💻 Enjoy smooth vector store operations and seamless document    #
# management! 🌐                                                  #
####################################################################

# Optimized Vector Store Script with Enhanced Process Messaging
import os
import re
import hashlib
import chromadb
from langchain.document_loaders import TextLoader, PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings
from ocr_update import ocr_and_update_chroma  # Importing the second script function


# Script-level flag checked after the vector-store step: while it is still
# False, the `__main__` block falls back to OCR processing of the PDFs.
# Reset to False every time this cell runs.
metadata_created = False

def compute_hash(content: str) -> str:
    """Compute SHA-256 hash of the document content.

    The text is UTF-8 encoded and the 64-character lowercase hex digest is
    returned; it serves as the ``content_hash`` deduplication key.
    """
    raw_bytes = content.encode('utf-8')
    return hashlib.new("sha256", raw_bytes).hexdigest()

def get_existing_hashes(chroma_store) -> set:
    """Collect every ``content_hash`` stored in the metadata of *chroma_store*.

    Metadata is read through the store's private ``_collection`` handle.
    Empty entries and entries without ``content_hash`` are skipped. If an
    error occurs, a warning is printed and the hashes gathered up to that
    point are returned.
    """
    found = set()
    try:
        metadatas = chroma_store._collection.get(include=["metadatas"]).get("metadatas", [])
        for entry in metadatas:
            if not entry or "content_hash" not in entry:
                continue
            found.add(entry["content_hash"])
    except Exception as e:
        print(f"⚠️ Error retrieving existing hashes from Chroma: {str(e)}")
    return found

def extract_clean_metadata(raw_metadata, file_path):
    """Build a flat, string-valued metadata dict for a PDF file.

    The result always has ``source`` (the full path) and ``file_name`` (its
    basename). Entries of *raw_metadata* with a ``str`` key and a ``str``,
    ``int`` or ``float`` value are copied with the value converted to ``str``;
    they are applied last, so such an entry may overwrite the two base keys.
    Any other entry is dropped. A falsy *raw_metadata* contributes nothing.
    """
    scalar_fields = {
        key: str(value)
        for key, value in (raw_metadata.items() if raw_metadata else ())
        if isinstance(key, str) and isinstance(value, (str, int, float))
    }
    return {
        "source": file_path,
        "file_name": os.path.basename(file_path),
        **scalar_fields,
    }

def load_documents_from_drive(docs_path: str):
    """Recursively load ``.pdf`` and ``.txt`` files under *docs_path* as Documents.

    PDFs are read page by page with ``PdfReader``. Each page with non-blank
    extracted text becomes one Document whose metadata is the cleaned PDF
    metadata plus a 0-based ``page`` index. Text files are loaded with
    ``TextLoader``. Extension matching is case-insensitive, so ``REPORT.PDF``
    is picked up, consistent with the OCR script. A file that fails to load is
    reported and skipped, so one bad file does not abort the batch.

    Returns:
        A list of Documents. It is empty when nothing text-extractable was
        found (for example image-only PDFs).
    """
    docs = []
    for root, _, files in os.walk(docs_path):
        for file in files:
            file_path = os.path.join(root, file)
            lowered = file.lower()
            if lowered.endswith(".pdf"):
                try:
                    raw_reader = PdfReader(file_path)
                    pdf_metadata = extract_clean_metadata(raw_reader.metadata, file_path)

                    for page_num, page in enumerate(raw_reader.pages):
                        text = page.extract_text()
                        if text and text.strip():  # Avoid empty pages
                            docs.append(Document(
                                page_content=text,
                                metadata={**pdf_metadata, "page": page_num}
                            ))
                except Exception as e:
                    print(f"⚠️ Error loading {file_path}: {str(e)}")
            elif lowered.endswith(".txt"):
                # Same best-effort policy as PDFs (e.g. an undecodable file).
                try:
                    docs.extend(TextLoader(file_path).load())
                except Exception as e:
                    print(f"⚠️ Error loading {file_path}: {str(e)}")
    return docs

def split_and_prepare_documents(docs, chunk_size: int = 1000, chunk_overlap: int = 200):
    """Split *docs* into overlapping chunks for storage in Chroma.

    Uses a ``RecursiveCharacterTextSplitter`` configured with *chunk_size* and
    *chunk_overlap*. Each chunk carries its source document's metadata.
    """
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
    )
    return splitter.split_documents(docs)

def initialize_embeddings():
    """Load the sentence-transformers all-mpnet-base-v2 HuggingFace embedding model."""
    model_id = "sentence-transformers/all-mpnet-base-v2"
    print("🔧 Initializing embedding model...")
    embedder = HuggingFaceEmbeddings(model_name=model_id)
    print("✅ Embedding model loaded.")
    return embedder

def connect_to_chroma(persist_dir):
    """Open a Chroma PersistentClient at *persist_dir* and list its collection names.

    Returns:
        ``(client, collection_names)`` on success. On failure a warning is
        printed and ``(None, [])`` is returned.

    Note: an empty name list makes the caller build a new collection. Failing
    to read the names of an existing DB would therefore re-ingest into a
    collection that already exists, so every known entry shape is handled.
    """
    print("🔌 Connecting to Chroma DB...")

    def _name_of(col):
        # list_collections() may yield plain names or Collection objects with
        # a `.name` attribute, depending on the chromadb release (verify
        # against the installed version). Dicts keep the original handling.
        if isinstance(col, str):
            return col
        if isinstance(col, dict):
            return col["name"]
        return col.name

    try:
        client = chromadb.PersistentClient(path=persist_dir)
        collection_names = [_name_of(col) for col in client.list_collections()]
        print(f"📚 Found {len(collection_names)} existing collections: {collection_names}")
        return client, collection_names
    except Exception as e:
        print(f"⚠️ Error connecting to Chroma: {str(e)}")
        return None, []

def load_existing_chroma_store(persist_dir, embeddings, collection_name):
    """Open *collection_name* in the persistent Chroma DB at *persist_dir*.

    Returns a LangChain ``Chroma`` wrapper that embeds queries and new
    documents with *embeddings*.
    """
    print(f"🔄 Loading existing Chroma collection: {collection_name}...")
    persistent_client = chromadb.PersistentClient(path=persist_dir)
    store = Chroma(
        client=persistent_client,
        collection_name=collection_name,
        embedding_function=embeddings,
        persist_directory=persist_dir,
    )
    return store

def create_new_vector_store(embeddings, persist_dir: str, docs_path: str, collection_name: str):
    """Create a brand new Chroma vector store from the documents in *docs_path*.

    Documents are loaded with ``load_documents_from_drive`` and chunked with the
    default splitter settings. Each chunk is tagged with ``content_hash``
    (SHA-256 of its text) and ``section="all_sections"``, then embedded into
    *collection_name* under *persist_dir*.

    Side effect: sets the script-level ``metadata_created`` flag to True when at
    least one chunk was produced. The ``__main__`` block runs the OCR fallback
    only while that flag is False.

    Returns:
        The Chroma store. When no chunks could be extracted (for example
        image-only PDFs), the empty collection is opened instead, so the OCR
        fallback has a target to write into.
    """
    # `global` is required: a bare assignment would bind a function-local name
    # and leave the script-level flag False, forcing OCR on every run.
    global metadata_created

    print("📥 Loading documents from drive...")
    docs = load_documents_from_drive(docs_path)
    print(f"📄 Loaded {len(docs)} documents.")

    print("✂️ Splitting documents into chunks...")
    all_splits = split_and_prepare_documents(docs)
    print(f"🧩 Split into {len(all_splits)} chunks.")

    # Hash metadata is what later updates use to skip already-stored chunks.
    for doc in all_splits:
        doc.metadata["content_hash"] = compute_hash(doc.page_content)
        doc.metadata["section"] = "all_sections"

    if not all_splits:
        # Avoid from_documents() with an empty batch (presumably rejected by
        # chromadb's empty-upsert validation; TODO confirm for the installed
        # version). Open the empty collection directly instead.
        print("⚠️ No text-extractable chunks found; creating an empty collection.")
        return Chroma(
            client=chromadb.PersistentClient(path=persist_dir),
            collection_name=collection_name,
            embedding_function=embeddings,
            persist_directory=persist_dir
        )

    metadata_created = True

    print("🛠️ Creating new Chroma DB...")
    chroma_store = Chroma.from_documents(
        documents=all_splits,
        embedding=embeddings,
        collection_name=collection_name,
        persist_directory=persist_dir
    )
    print("✅ New Chroma DB created and saved successfully!")
    return chroma_store

def update_vectorstore(
    chroma_store: Chroma,
    new_docs_path: str,
    chunk_size: int = 1000,
    chunk_overlap: int = 200
):
    """Add the not-yet-stored chunks of the documents in *new_docs_path* to *chroma_store*.

    Documents are loaded and split into chunks. Each chunk is tagged with
    ``content_hash``/``section`` metadata. A chunk is added only if its hash
    is neither already in the collection nor already queued earlier in this
    call, so text repeated across pages or files is stored once.

    Side effect: sets the script-level ``metadata_created`` flag to True when
    any text-extractable chunk was found, even if none was new. The
    ``__main__`` block runs the OCR fallback only while that flag is False.

    Returns:
        *chroma_store* (updated and persisted when new chunks were added).
    """
    # Without `global`, the assignment below would only create a local.
    global metadata_created

    print("🔄 Pulling existing content hashes from Chroma...")
    existing_hashes = get_existing_hashes(chroma_store)
    print(f"📊 Found {len(existing_hashes)} existing hashes.")

    print(f"📥 Loading new documents from: {new_docs_path}")
    raw_docs = load_documents_from_drive(new_docs_path)
    print(f"📄 Loaded {len(raw_docs)} raw documents.")

    if not raw_docs:
        print("⚠️ No documents found to process.")
        return chroma_store

    print("✂️ Splitting documents into chunks...")
    split_docs = split_and_prepare_documents(raw_docs, chunk_size, chunk_overlap)
    print(f"🧩 Split into {len(split_docs)} chunks.")

    if split_docs:
        # Text extraction worked, so no OCR fallback is needed for this folder.
        metadata_created = True

    unique_chunks = []
    for chunk in split_docs:
        chunk_hash = compute_hash(chunk.page_content)
        chunk.metadata["content_hash"] = chunk_hash
        chunk.metadata["section"] = "all_sections"

        if chunk_hash in existing_hashes:
            continue
        # Record immediately so a duplicate later in this batch is skipped too.
        existing_hashes.add(chunk_hash)
        unique_chunks.append(chunk)

    print(f"✅ Found {len(unique_chunks)} new unique chunks to add.")

    if not unique_chunks:
        print("⚠️ No new unique chunks found. Vector store is already up-to-date.")
        return chroma_store

    print("➕ Adding new unique chunks to Chroma...")
    chroma_store.add_documents(unique_chunks)
    chroma_store.persist()
    print(f"🎉 Successfully added {len(unique_chunks)} new chunks and persisted.")

    return chroma_store

def initialize_or_update_vector_store(persist_dir: str, docs_path: str, collection_name: str = None):
    """Update the collection for *docs_path* if it exists, otherwise create it.

    When *collection_name* is None it is derived from the basename of
    *docs_path*. The name is sanitized to the characters Chroma accepts,
    stripped of leading/trailing ``_``/``-`` and truncated to 63 characters.
    Every case other than "collection already present" builds a new
    collection; the cases differ only in the message printed.

    Returns:
        The Chroma store for the collection.
    """

    def sanitize_collection_name(name: str) -> str:
        """Sanitize the folder name to a valid Chroma collection name."""
        cleaned = re.sub(r'[^a-zA-Z0-9_\-]', '', name.replace(" ", "_"))
        return cleaned.strip("_-")[:63]

    if collection_name is None:
        collection_name = sanitize_collection_name(os.path.basename(docs_path))
        print(f"🗂️  Using dynamic collection name: {collection_name}")

    embeddings = initialize_embeddings()
    _client, collections = connect_to_chroma(persist_dir)

    if collections and collection_name in collections:
        print(f"📦 Found existing collection: {collection_name}")
        chroma_store = load_existing_chroma_store(persist_dir, embeddings, collection_name)
        print("🔄 Updating Chroma vector store...")
        update_vectorstore(chroma_store, docs_path)
        return chroma_store

    if collections:
        print(f"⚡ No matching collection found. Creating new collection: {collection_name}")
    elif os.path.exists(persist_dir):
        print(f"⚡ Chroma DB found at {persist_dir} but no collection exists.")
    else:
        print("❌ No existing Chroma DB found. Creating new vector store...")
    return create_new_vector_store(embeddings, persist_dir, docs_path, collection_name)

if __name__ == "__main__":
    # Chroma root directory and the folder whose documents are ingested.
    persist_dir = "/content/drive/MyDrive"
    docs_path = "/content/drive/MyDrive/TEESTA PROJECT"

    print("🚀 Starting vector store initialization/update process")
    chroma_store = initialize_or_update_vector_store(persist_dir, docs_path)
    print("🌈 Vector store operation completed successfully!")

    # OCR fallback: runs while the script-level `metadata_created` flag is
    # still False after the step above. It lives inside the guard because it
    # needs `docs_path`/`persist_dir`, which are only defined here.
    if not metadata_created:
        print("❌ No metadata found. Executing OCR processing script...")
        ocr_and_update_chroma(docs_path, persist_dir)

🚀 Starting vector store initialization/update process
🗂️  Using dynamic collection name: TEESTA_PROJECT
🔧 Initializing embedding model...
✅ Embedding model loaded.
🔌 Connecting to Chroma DB...
📚 Found 13 existing collections: ['Health_Insurance_Young_Star_Insurance', 'Sample_PDF_1', 'Resume', 'IRDA', 'Adani', 'Care_Supreme', 'Tender', 'Sample_PDF_2', 'TEESTA_PROJECT', '2023-24___Annual_Report_2023-24-199-392_compressedpdf', 'publication_battery220_0001_removed_10_03pdf', 'BTS600', 'IS1652']
📦 Found existing collection: TEESTA_PROJECT
🔄 Loading existing Chroma collection: TEESTA_PROJECT...
🔄 Updating Chroma vector store...
🔄 Pulling existing content hashes from Chroma...
📊 Found 45 existing hashes.
📥 Loading new documents from: /content/drive/MyDrive/TEESTA PROJECT
📄 Loaded 0 raw documents.
⚠️ No documents found to process.
🌈 Vector store operation completed successfully!
❌ No metadata found. Executing OCR processing script...
🗂 Using collection name: `TEESTA_PROJECT`
🔍 Initializing emb

In [None]:
####################################################################################################
# 🌐🔍 Unified Search Across All Chroma Collections with Enhanced Query Results 🎯                #
####################################################################################################
#                                                                                                #
# This script allows you to perform a powerful and efficient search across multiple collections    #
# stored in Chroma, without loading all documents into memory. The process is designed to:         #
#                                                                                                #
# - 🧠 Search across multiple Chroma collections using similarity-based queries.                   #
# - ⚙️ Filter and rank results based on a specific section or general search parameters.           #
# - 🔍 Retrieve and merge the top-k most relevant results based on similarity score.               #
# - 🔄 Process each collection individually to avoid memory overload and optimize performance.      #
#                                                                                                #
# Perfect for scenarios where you need to search through large datasets and retrieve only the      #
# most relevant documents efficiently.                                                             #
#                                                                                                #
# Ready to unlock the full potential of your Chroma-based vector store? Let's dive into search!     #
####################################################################################################




from langchain.vectorstores import Chroma
import chromadb

def unified_search(client, persist_dir, embeddings, query: str, k=10, filter_section="all_sections"):
    """Search every Chroma collection and return the overall top-*k* documents.

    Collections are opened one at a time as LangChain ``Chroma`` wrappers and
    queried with ``similarity_search_with_score``. The hits are merged and
    ordered ascending by score (lower = more similar). A collection that
    cannot be opened or queried is reported and skipped.

    Args:
        client: chromadb client whose ``list_collections()`` is enumerated.
        persist_dir: Persist directory passed through to each ``Chroma`` wrapper.
        embeddings: Embedding function used to embed *query*. Scores from
            different collections are only comparable if they were all built
            with this same model.
        query: Natural-language search text.
        k: Number of hits requested per collection and returned overall.
        filter_section: Metadata ``section`` value to restrict hits to.
            ``"all_sections"`` (the default) or ``None`` disables filtering.
            This keeps chunks tagged ``"ocr_recovered"`` searchable.

    Returns:
        A list of at most *k* Documents, most similar first.
    """

    def _name_of(col):
        # list_collections() may yield plain names, dicts, or Collection
        # objects exposing `.name`, depending on the chromadb version.
        if isinstance(col, str):
            return col
        if isinstance(col, dict):
            return col.get("name")
        return getattr(col, "name", None)

    collection_names = [_name_of(col) for col in client.list_collections()]
    print("📦 Available collections:")
    for name in collection_names:
        print(f" - {name}")

    # "all_sections" is the catch-all value the query analyzer emits. Using it
    # as a literal filter would hide OCR-recovered chunks, so it means "no filter".
    where = None if filter_section in (None, "all_sections") else {"section": filter_section}

    all_results = []

    print(f"🔍 Searching across {len(collection_names)} collections...")

    for collection_name in collection_names:
        print(f"➡️ Searching in collection: {collection_name}")
        try:
            # Construction is inside the try so one broken collection cannot
            # abort the whole search.
            chroma_store = Chroma(
                client=client,
                collection_name=collection_name,
                embedding_function=embeddings,
                persist_directory=persist_dir
            )
            results = chroma_store.similarity_search_with_score(query, k=k, filter=where)
        except Exception as e:
            print(f"⚠️ Error searching collection {collection_name}: {str(e)}")
            continue
        all_results.extend(results)

    print(f"🔎 Total retrieved across all collections: {len(all_results)}")

    # Lower score = more similar; keep the global top k.
    top_results = sorted(all_results, key=lambda pair: pair[1])[:k]
    retrieved_docs = [doc for doc, _score in top_results]

    print(f"✅ Found {len(retrieved_docs)} total documents after merging.")
    return retrieved_docs

In [None]:
####_________________________________________#####
# 🌍 Unified Search Across All Chroma Collections 🚀
# ✨ Effortless Querying, One Search to Rule Them All 🔍
# 🔥 Fast, Smart, and Scalable Search Across Data 🧠
####_________________________________________#####



#This RAG pipeline uses the Chroma DB store for faster retrieval

# Define the RAG pipeline components: the structured search schema, the RAG prompt and the graph state
class Search(TypedDict):
    """Search query."""
    # Structured-output schema: analyze_query passes this class to
    # llm.with_structured_output. The docstring and the Annotated descriptions
    # are presumably sent to the model as the schema/field descriptions, so
    # change them deliberately.
    query: Annotated[str, ..., "Search query to run."]
    # Only "all_sections" is allowed. That is the `section` tag the ingest
    # script writes on text-extracted chunks (OCR chunks use "ocr_recovered").
    section: Annotated[
        Literal["all_sections"],
        ...,
        "Section to query.",
    ]

# Pull the "rlm/rag-prompt" template from the LangChain Hub when this cell
# runs; generate() fills its `question` and `context` variables.
prompt = hub.pull("rlm/rag-prompt")

class State(TypedDict):
    """LangGraph state shared by the analyze_query -> retrieve -> generate nodes."""
    question: str  # raw user question; the only key the caller supplies
    query: Search  # structured query produced by analyze_query
    context: List[Document]  # chunks returned by retrieve
    answer: str  # final answer text produced by generate

# RAG node functions; retrieval searches every Chroma collection via unified_search
def analyze_query(state: State):
    """Have the LLM turn the raw question into a structured ``Search`` query."""
    search_llm = llm.with_structured_output(Search)
    return {"query": search_llm.invoke(state["question"])}

def retrieve(state: State):
    """Fetch the 4 best-matching chunks across all collections for the structured query.

    Relies on the notebook globals ``client``, ``persist_dir`` and ``embeddings``.
    """
    search = state["query"]
    docs = unified_search(
        client,
        persist_dir,
        embeddings,
        query=search["query"],
        k=4,
        filter_section=search["section"],
    )
    return {"context": docs}


def generate(state: State):
    """Answer the question from the retrieved chunks using the RAG prompt and the LLM."""
    time.sleep(1)  # Rate limit protection
    context_text = "\n\n".join(d.page_content for d in state["context"])
    prompt_value = prompt.invoke({"question": state["question"], "context": context_text})
    reply = llm.invoke(prompt_value)
    return {"answer": reply.content}

# Build the execution graph
# add_sequence chains the three node functions in order (node names are the
# function names, hence "analyze_query" below); START is wired to the first
# node and the graph is compiled into the runnable `graph` used by ask_question.
graph_builder = StateGraph(State).add_sequence([analyze_query, retrieve, generate])
graph_builder.add_edge(START, "analyze_query")
graph = graph_builder.compile()

In [None]:
# Globals read by retrieve() when the graph runs: the Chroma root directory,
# the embedding model, and a persistent client over that directory. The model
# is the same one used at ingest time, so query vectors are comparable with
# the stored ones.
persist_dir = "/content/drive/MyDrive"
embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-mpnet-base-v2")
client = chromadb.PersistentClient(path=persist_dir)

In [None]:
# Convenience wrapper: run one question through the compiled RAG graph
def ask_question(question: str):
    """Run *question* through the compiled RAG graph and return the answer text."""
    final_state = graph.invoke({"question": question})
    answer = final_state["answer"]
    return answer

# Usage example
if __name__ == "__main__":
    # No initialization here - just usage
    # Relies on `graph`, `client`, `persist_dir` and `embeddings` from earlier cells.
    print(ask_question("What are the specific requirements for bidders, including documentation and minimum average annual turnover for 220V 300Ah PLANTE Stationery battery?"))
    #print(ask_question("What are the exclusions?"))

📦 Available collections:
 - Health_Insurance_Young_Star_Insurance
 - Sample_PDF_1
 - Resume
 - IRDA
 - Adani
 - Care_Supreme
 - Tender
 - Sample_PDF_2
 - TEESTA_PROJECT
 - 2023-24___Annual_Report_2023-24-199-392_compressedpdf
 - publication_battery220_0001_removed_10_03pdf
 - BTS600
 - IS1652
🔍 Searching across 13 collections...
➡️ Searching in collection: Health_Insurance_Young_Star_Insurance
➡️ Searching in collection: Sample_PDF_1




➡️ Searching in collection: Resume
➡️ Searching in collection: IRDA
➡️ Searching in collection: Adani
➡️ Searching in collection: Care_Supreme
➡️ Searching in collection: Tender
➡️ Searching in collection: Sample_PDF_2
➡️ Searching in collection: TEESTA_PROJECT
➡️ Searching in collection: 2023-24___Annual_Report_2023-24-199-392_compressedpdf
➡️ Searching in collection: publication_battery220_0001_removed_10_03pdf
➡️ Searching in collection: BTS600
➡️ Searching in collection: IS1652
🔎 Total retrieved across all collections: 41
✅ Found 4 total documents after merging.
Bidders must have a Minimum Average Annual Turnover (MAAT) of Rs. 12,00,000 for the best year out of the last five financial years. Required documents include a valid GST Registration certificate, GSTR3B or Annual return-GSTR9 for the last two years, PAN Card, and audited annual accounts or IT returns for the last three years. Dealers must provide documentary evidence from the OEM to meet certain criteria.


# New Section