### Data Ingestion (Handel different document types)

In [1]:
# Import dfferent loaders
from langchain_community.document_loaders import PyPDFLoader, Docx2txtLoader, TextLoader, WebBaseLoader, UnstructuredMarkdownLoader

def load_document(document_path):
    """
    Loads a document from a given path or URL using the appropriate Langchain loader.
    Args:
        document_path (str): The path to the document file or a URL.
    Returns:
        list: A list of loaded documents.
    Raises:
        ValueError: If the document type is unsupported or the path is invalid.
    """
    print(f"Attempting to load document from: {document_path}")
    try:
        if document_path.startswith(('http://', 'https://')):
            loader = WebBaseLoader(document_path)
        elif document_path.endswith('.pdf'):
            loader = PyPDFLoader(document_path)
        elif document_path.endswith(('.docx', '.doc')):
            loader = Docx2txtLoader(document_path)
        elif document_path.endswith('.txt'):
            loader = TextLoader(document_path)
        elif document_path.endswith('.md'):
            loader = UnstructuredMarkdownLoader(document_path)
        else:
            raise ValueError(f"Unsupported document type: {document_path}. Please provide a PDF, DOCX, TXT file, .MD file or a URL.")

        document = loader.load()
        print(f"Successfully loaded {len(document)} pages/parts from {document_path}")
        return document
    except Exception as e:
        print(f"Error loading document {document_path}: {e}")
        raise ValueError(f"Could not load document {document_path}. Error: {e}")


  from .autonotebook import tqdm as notebook_tqdm
USER_AGENT environment variable not set, consider setting it to identify your requests.


In [2]:
from urllib.parse import urlparse
import os

def extract_document_info(document, original_path):
    """
    Extracts text, structure, and enhanced metadata from loaded Langchain document.
    Adds 'doc_type' and ensures 'source', 'page', and 'section' are present.
    Args:
        document (list): A list of Langchain Document objects.
        original_path (str): The original path or URL used to load the document.
    Returns:
        list: A list of dictionaries, each containing extracted info for a document part (page).
    """
    all_extracted_data = [] # Initialize an empty list to store dictionaries for each document part

    # Determine document type once based on the original_path
    doc_type = "unknown"
    if original_path.startswith(('http://', 'https://')):
        doc_type = "web"
    else:
        _, ext = os.path.splitext(original_path)
        if ext:
            doc_type = ext.lstrip('.').lower()
        if doc_type == 'doc': # Handle .doc being treated as docx
            doc_type = 'docx'

    for i, doc in enumerate(document):
        # Extract core content for the current doc
        text_content = doc.page_content
        metadata = {}   # Initialize an empty metadata

        # Create a new metadata dictionary for the current doc, initialized with its own metadata
        current_doc_info = doc.metadata.copy()
        # Add the determined doc_type to this metadata
        metadata['doc_type'] = doc_type
        # Ensure source, page, and section are present (or default)
        # For 'source', prefer existing source from metadata, otherwise use original_path
        metadata['source'] = current_doc_info.get('source', original_path)
        # For 'page', prefer existing page from metadata, otherwise use index + 1
        metadata['page'] = current_doc_info.get('page', i) + 1
        # For 'section', use existing section from metadata, otherwise 'N/A'
        metadata['section'] = current_doc_info.get('section', 'N/A')

        # Create a dictionary for the current document part and append it to the list
        all_extracted_data.append({
            'text': text_content,
            'metadata': metadata
        })

    return all_extracted_data # Return the list of all extracted data

### Data Cleaning and Normalization (doc. type specific structured cleaning and generalized text normalizer)

In [3]:
# Function for text-normalization
def normalize_text(text: str) -> str:
    """
    General-purpose text normalization:
    - unify line breaks
    - collapse excessive spaces
    - collapse many blank lines into single paragraph breaks
    - trim leading/trailing whitespace

    This should be applied AFTER structure-specific cleaning.
    """
    # Normalize Windows-style newlines
    text = text.replace("\r\n", "\n").replace("\r", "\n")
    # Collapse tabs into spaces
    text = re.sub(r"[ \t]+", " ", text)
    # Collapse 3+ blank lines into just 2 (paragraph separation)
    text = re.sub(r"\n\s*\n\s*\n+", "\n\n", text)
    # Remove extra spaces around newlines
    text = re.sub(r"[ \t]*\n[ \t]*", "\n", text)
    # Finally, collapse multiple spaces again (in case we introduced any)
    text = re.sub(r" {2,}", " ", text)
    # Strip leading/trailing whitespace
    text = text.strip()

    return text

In [4]:
import re
from bs4 import BeautifulSoup

def clean_document_structure(extracted_doc_list):
    """
    Cleans the document text based on its document type.
    Args:
        extracted_doc_list (list): A list of dictionaries, each containing 'text' and 'metadata'.
                                   'metadata' must contain 'doc_type'.
    Returns:
        list: An updated list of dictionaries with cleaned text for each document.
    """
    cleaned_document_list = []

    for extracted_doc_dict in extracted_doc_list:
        if 'text' not in extracted_doc_dict or 'metadata' not in extracted_doc_dict:
            raise ValueError("Each input dictionary must contain 'text' and 'metadata' keys.")
        if 'doc_type' not in extracted_doc_dict['metadata']:
            raise ValueError("Metadata must contain 'doc_type' key.")

        raw_text = extracted_doc_dict['text']
        doc_type = extracted_doc_dict['metadata']['doc_type']
        cleaned_text = raw_text # Initialize with raw text, cleaning methods will modify this

        print(f"Cleaning document of type: {doc_type}")
        print(f"Original text length: {len(raw_text)}")

        if doc_type == 'md':
            print("Applying Markdown specific cleaning...")
            # Remove Markdown headers (e.g., # Header 1)
            cleaned_text = re.sub(r'^#+\s*(.*)$', r'\1', cleaned_text, flags=re.MULTILINE)
            # Remove bold and italic formatting
            cleaned_text = re.sub(r'(\S*?)(\*{1,2}|_{1,2})(.*?)\2', r'\1\3', cleaned_text) # Bold/Italic
            # Remove links (display text only)
            cleaned_text = re.sub(r'\[(.*?)\]\(.*\)', r'\1', cleaned_text)
            # Remove inline code blocks
            cleaned_text = re.sub(r'`(.*?)`', r'\1', cleaned_text)
            # Remove blockquotes
            cleaned_text = re.sub(r'^>\s?', '', cleaned_text, flags=re.MULTILINE)
            # Remove list markers
            cleaned_text = re.sub(r'^[\-*+]\s?', '', cleaned_text, flags=re.MULTILINE)
            # Remove emojis
            cleaned_text = re.sub(r'[^\x00-\x7F]+', '', cleaned_text)
        elif doc_type == 'web':
            print("Applying Web specific cleaning with BeautifulSoup...")
            soup = BeautifulSoup(raw_text, 'html.parser')

            # Remove script and style elements
            for script_or_style in soup(['script', 'style']):
                script_or_style.extract() # Remove them from the soup

            # Get text
            cleaned_text = soup.get_text()
        else: # General text (pdf, docx, txt). No specific structural cleaning needed before normalization.
            print(f"No specific structural cleaning for {doc_type}. Applying general text normalization.")
            cleaned_text = raw_text

        # Apply general text normalization as a final step for all document types
        cleaned_text = normalize_text(cleaned_text)

        # Update the text in the current dictionary and append to the new list
        extracted_doc_dict['text'] = cleaned_text
        cleaned_document_list.append(extracted_doc_dict)

    return cleaned_document_list

### Chunking

In [5]:
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.documents import Document

def structure_aware_splitter(extracted_doc_dict):
    """
    Performs initial splitting based on natural document boundaries.
    Args:
        extracted_doc_dict (dict): A dictionary containing 'text' and 'metadata'.
                                   'metadata' must contain 'doc_type', 'source', 'page', 'section'.
    Returns:
        list: A list of dictionaries, each representing a structurally aware chunk
              with 'text' and updated 'metadata'.
    """
    if 'text' not in extracted_doc_dict or 'metadata' not in extracted_doc_dict:
        raise ValueError("Input dictionary must contain 'text' and 'metadata' keys.")

    raw_text = extracted_doc_dict['text']
    metadata = extracted_doc_dict['metadata']
    doc_type = metadata['doc_type']

    print(f"Applying structure-aware splitting for document type: {doc_type}")

    # Define separators based on document type
    if doc_type == 'txt':
        separators = ['\n\n', '\n', ' ', ''] # Prioritize paragraphs for plain text
    # Add more conditions for other document types if needed (e.g., 'md', 'web')
    # For simplicity, using same separators for now but can be customized later.
    else:
        separators = ['\n\n', '\n', ' ', ''] # Default separators

    # Initialize RecursiveCharacterTextSplitter for initial structural chunks
    # Larger chunk_size and no overlap for initial structural split
    text_splitter = RecursiveCharacterTextSplitter(
        separators=separators,
        chunk_size=2000,  # Larger chunks for initial structural split
        chunk_overlap=0,
        length_function=len, # Character count for initial split
        add_start_index=True
    )

    # Split the document's text
    # The splitter expects a list of Document objects, so create one from the raw text.
    doc_for_splitting = [Document(page_content=raw_text, metadata=metadata)]
    split_documents = text_splitter.split_documents(doc_for_splitting)

    # Format the split documents into the desired dictionary structure
    formatted_chunks = []
    for i, split_doc in enumerate(split_documents):
        chunk_metadata = split_doc.metadata.copy()
        # Update chunk metadata with more specific chunk information if needed
        chunk_metadata['chunk_id'] = i
        formatted_chunks.append({
            'text': split_doc.page_content,
            'metadata': chunk_metadata
        })

    print(f"Original text split into {len(formatted_chunks)} structural chunks.")
    return formatted_chunks

print("Structure-aware splitting function 'structure_aware_splitter' defined.")

Structure-aware splitting function 'structure_aware_splitter' defined.


In [6]:
import tiktoken

def num_tokens_from_string(text: str, model_name: str = "cl100k_base") -> int:
    """
    Returns the number of tokens in a text string.
    """
    try:
        encoding = tiktoken.encoding_for_model(model_name)
    except KeyError:
        # Fallback to a common encoding if model_name is not directly supported
        encoding = tiktoken.get_encoding("cl100k_base")
    num_tokens = len(encoding.encode(text))
    return num_tokens

print("Token counting function 'num_tokens_from_string' defined.")

Token counting function 'num_tokens_from_string' defined.


In [7]:
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.documents import Document

def length_based_refinement(structural_chunks: list, target_chunk_size: int = 500, chunk_overlap: int = 100) -> list:
    """
    Refines a list of structural chunks by further splitting any chunk exceeding a target
    length using a RecursiveCharacterTextSplitter with token-based length function.

    Args:
        structural_chunks (list): A list of dictionaries, each containing 'text' and 'metadata',
                                  output from structure_aware_splitter.
        target_chunk_size (int): The desired maximum token length for refined chunks.
        chunk_overlap (int): The number of tokens to overlap between sub-chunks.

    Returns:
        list: A list of dictionaries, each representing a refined chunk with 'text' and 'metadata'.
    """
    refined_chunks = []

    print(f"Applying length-based refinement with target_chunk_size={target_chunk_size} and chunk_overlap={chunk_overlap}.")

    for i, structural_chunk in enumerate(structural_chunks):
        text = structural_chunk['text']
        metadata = structural_chunk['metadata'].copy()
        current_chunk_tokens = num_tokens_from_string(text)

        if current_chunk_tokens > target_chunk_size:
            print(f"  Chunk {i} (original tokens: {current_chunk_tokens}) exceeds target. Further splitting...")
            # Create a new splitter for this sub-splitting process
            sub_splitter = RecursiveCharacterTextSplitter(
                chunk_size=target_chunk_size,
                chunk_overlap=chunk_overlap,
                length_function=num_tokens_from_string, # Use token counting
                add_start_index=True
            )
            # Convert the structural chunk into a Document object for the splitter
            doc_to_split = Document(page_content=text, metadata=metadata)
            sub_documents = sub_splitter.split_documents([doc_to_split])

            for j, sub_doc in enumerate(sub_documents):
                sub_chunk_metadata = sub_doc.metadata.copy()
                # Update chunk_id to reflect it's a sub-chunk
                sub_chunk_metadata['chunk_id'] = f"{metadata.get('chunk_id', i)}-{j}"
                refined_chunks.append({
                    'text': sub_doc.page_content,
                    'metadata': sub_chunk_metadata
                })
        else:
            print(f"  Chunk {i} (tokens: {current_chunk_tokens}) is within target. Adding directly.")
            refined_chunks.append(structural_chunk)

    print(f"Total refined chunks after length-based refinement: {len(refined_chunks)}")
    return refined_chunks

print("Length-based refinement function 'length_based_refinement' defined.")

Length-based refinement function 'length_based_refinement' defined.


In [8]:
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.documents import Document

def chunk_document(cleaned_doc_list: list, target_chunk_size: int = 500, chunk_overlap: int = 100) -> list:
    """
    Combines structure-aware splitting and length-based refinement into a single function
    for processing cleaned documents into final chunks.

    Args:
        cleaned_doc_list (list): A list of dictionaries, each containing 'text' and 'metadata'
                                   from the cleaned documents.
        target_chunk_size (int): The desired maximum token length for refined chunks.
        chunk_overlap (int): The number of tokens to overlap between sub-chunks.

    Returns:
        list: A list of dictionaries, each representing a final chunk with 'text' and 'metadata'.
    """
    print("Starting document chunking process...")
    all_final_chunks = []

    for i, extracted_doc_dict in enumerate(cleaned_doc_list):
        print(f"Processing document {i+1}/{len(cleaned_doc_list)} for chunking...")
        # Step 1: Perform structure-aware splitting
        structural_chunks = structure_aware_splitter(extracted_doc_dict)

        # Step 2: Perform length-based refinement
        refined_chunks = length_based_refinement(
            structural_chunks,
            target_chunk_size=target_chunk_size,
            chunk_overlap=chunk_overlap
        )
        all_final_chunks.extend(refined_chunks)

    print(f"Document chunking process completed. Generated {len(all_final_chunks)} total final chunks from all documents.")
    return all_final_chunks

print("Chunking function 'chunk_document' defined.")

Chunking function 'chunk_document' defined.


### Embedding and Vector Store

In [9]:
from langchain_community.vectorstores import FAISS
from langchain_community.embeddings import OllamaEmbeddings
from langchain_core.documents import Document


def create_vector_store(documents: list, embedder)-> FAISS:
        """This function create a FAISS vector store and return it.
        Args:
            documents (list): an list of chunk documents (dictionaries with 'text' and 'metadata')

        Raises:
            Exception: return an exception when, fails to initialise the vector store

        Returns:
            FAISS: return an vector store of FAISS
        """
        try:
            # Convert list of dictionaries to list of Document objects
            langchain_documents = []
            for i, doc in enumerate(documents):
                if not isinstance(doc, dict):
                    raise TypeError(f"Expected a dictionary for document item {i}, but got {type(doc)}. Item: {doc}")
                if 'text' not in doc or 'metadata' not in doc:
                    raise ValueError(f"Document item {i} is missing 'text' or 'metadata' key. Item: {doc}")
                langchain_documents.append(Document(page_content=doc['text'], metadata=doc['metadata']))

            vector_store = FAISS.from_documents(langchain_documents, embedder)
            return vector_store
        except Exception as e:
          raise e

### Chunk Retrieval

In [10]:
from langchain_community.cross_encoders import HuggingFaceCrossEncoder
from langchain_classic.retrievers.document_compressors import CrossEncoderReranker
from langchain_classic.retrievers import ContextualCompressionRetriever

K_SEARCH = 50        # Initial vector search depth (k1)
K_RERANK = 20        # Chunks to keep after re-ranking (k2)
K_FINAL_CONTEXT = 10 # Final context chunks (k3)

# 1. Load model
cross_encoder = HuggingFaceCrossEncoder(
    model_name="cross-encoder/ms-marco-MiniLM-L-6-v2"
)
# 2. Create reranker
reranker = CrossEncoderReranker(
    model=cross_encoder,
    top_n=K_RERANK
)


In [11]:

# def _initial_vector_search(query: str, vectorstore: FAISS) -> list[Document]:
#         """
#         Step 1: Performs the initial, wide vector similarity search (k1).
#         Handles Edge Case 1 (No candidates found).
#         """
#         # --- 1. Initial Vector Search (Fetches K_SEARCH candidates) ---
#         initial_candidates: list[Document] = vectorstore.similarity_search(
#             query=query, 
#             k=K_SEARCH
#         )

#         # --- EDGE CASE CHECK 1: No candidates found ---
#         if not initial_candidates:
#             print("❌ WARNING: No chunks found in the vector store for the query.")
#             return []
            
#         total_chunks = len(initial_candidates)
#         if total_chunks < K_SEARCH:
#             print(f"⚠️ INFO: Found only {total_chunks} chunks (less than k={K_SEARCH}). Proceeding with all available chunks.")
        
#         return initial_candidates

# def _apply_rerank(self, query: str, vectorstore: FAISS) -> list[Document]:
#         """
#         Step 2: Applies the Cross-Encoder re-ranking to the initial candidates.
#         Returns the top K_RERANK documents.
#         """
#         # We use ContextualCompressionRetriever to apply the reranker.
#         # It internally uses the vectorstore to fetch K_SEARCH documents first.
#         compressor_retriever = ContextualCompressionRetriever(
#             # Using vectorstore.as_retriever ensures the initial search k is respected
#             base_retriever=self.vectorstore.as_retriever(search_kwargs={"k": K_SEARCH}),
#             base_compressor=self.reranker,
#         )
        
#         # Get the top K_RERANK chunks based on the cross-encoder score
#         reranked_chunks: list[Document] = compressor_retriever.get_relevant_documents(query)
        
#         return reranked_chunks

# def get_final_context_chunks(query: str, vectorstore: FAISS) -> list[Document]:
#         """
#         Orchestrates the three-step retrieval process: Search -> Rerank -> MMR.
#         """
        
#         # 1. Initial Vector Search (handled implicitly by reranker's base retriever)
#         # We call the search explicitly here for the sake of the initial logging/edge case check
#         initial_chunks = _initial_vector_search(query, vectorstore)
#         if not initial_chunks:
#             return []
            
#         # 2. Re-ranking
#         reranked_chunks = _apply_rerank(query, vectorstore)
        
#         # --- EDGE CASE CHECK 2: Insufficient chunks after reranking ---
#         if not reranked_chunks or len(reranked_chunks) <= K_FINAL_CONTEXT:
#              print(f"⚠️ INFO: Only {len(reranked_chunks)} chunks left after re-ranking. Skipping MMR and returning all of them.")
#              # The list slice ensures we return at most K_FINAL_CONTEXT
#              return reranked_chunks[K_FINAL_CONTEXT]

#         # 3. Diversification (MMR)
#         final_context_chunks = vectorstore.max_marginal_relevance_search(query)
        
#         print(f"✅ Success: Retrieved {len(final_context_chunks)} final diverse context chunks.")
#         return final_context_chunks

In [None]:
# from langchain_community.cross_encoders import HuggingFaceCrossEncoder
# from langchain_classic.retrievers.document_compressors import CrossEncoderReranker
# from langchain_classic.retrievers import ContextualCompressionRetriever
# from langchain_core.documents import Document
# from langchain_community.vectorstores import FAISS # Assuming FAISS is the vectorstore type

# K_SEARCH = 50        # Initial vector search depth (k1)
# K_RERANK = 20        # Chunks to keep after re-ranking (k2)
# K_FINAL_CONTEXT = 10 # Final context chunks (k3)

# # 1. Load model
# cross_encoder = HuggingFaceCrossEncoder(
#     model_name="cross-encoder/ms-marco-MiniLM-L-6-v2"
# )
# # 2. Create reranker instance
# reranker_instance = CrossEncoderReranker(
#     model=cross_encoder,
#     top_n=K_RERANK
# )

# class RetrievalSystem:
#     def __init__(self, vectorstore: FAISS, reranker: CrossEncoderReranker):
#         self.vectorstore = vectorstore
#         self.reranker = reranker

#     def _initial_vector_search(self, query: str) -> list[Document]:
#         """
#         Step 1: Performs the initial, wide vector similarity search (k1).
#         Handles Edge Case 1 (No candidates found).
#         """
#         # --- 1. Initial Vector Search (Fetches K_SEARCH candidates) ---
#         initial_candidates: list[Document] = self.vectorstore.similarity_search(
#             query=query,
#             k=K_SEARCH
#         )

#         # --- EDGE CASE CHECK 1: No candidates found ---
#         if not initial_candidates:
#             print("❌ WARNING: No chunks found in the vector store for the query.")
#             return []

#         total_chunks = len(initial_candidates)
#         if total_chunks < K_SEARCH:
#             print(f"⚠️ INFO: Found only {total_chunks} chunks (less than k={K_SEARCH}). Proceeding with all available chunks.")

#         return initial_candidates

#     def _apply_rerank(self, query: str, initial_candidates: list[Document]) -> list[Document]:
#         """
#         Step 2: Applies the Cross-Encoder re-ranking to the initial candidates.
#         Uses the reranker's compress_documents method directly on the candidates.
#         Returns the top K_RERANK documents.
#         """
#         print(f"Applying reranking to {len(initial_candidates)} initial candidates...")
#         # The CrossEncoderReranker (a DocumentCompressor) has a compress_documents method
#         # that takes a list of documents and a query, and returns the top_n reranked documents.
#         reranked_chunks: list[Document] = self.reranker.compress_documents(
#             documents=initial_candidates,
#             query=query
#         )
#         print(f"Reranking completed. Obtained {len(reranked_chunks)} chunks after reranking.")
#         return reranked_chunks

#     def get_final_context_chunks(self, query: str) -> list[Document]:
#         """
#         Orchestrates the three-step retrieval process: Search -> Rerank -> MMR.
#         """
#         print(f"Starting retrieval process for query: '{query}'")

#         # 1. Initial Vector Search (k1 = K_SEARCH)
#         initial_candidates = self._initial_vector_search(query)
#         if not initial_candidates:
#             print("❌ Retrieval failed at initial vector search stage.")
#             return []
#         print(f"Initial vector search returned {len(initial_candidates)} candidates.")

#         # 2. Re-ranking (reduces to k2 = K_RERANK)
#         reranked_chunks = self._apply_rerank(query, initial_candidates)

#         # --- EDGE CASE CHECK 2: Insufficient chunks after reranking ---
#         # If reranking yields fewer chunks than K_FINAL_CONTEXT, we just return what we have.
#         if not reranked_chunks or len(reranked_chunks) <= K_FINAL_CONTEXT:
#              print(f"⚠️ INFO: Only {len(reranked_chunks)} chunks left after re-ranking (less than K_FINAL_CONTEXT={K_FINAL_CONTEXT}). Skipping MMR and returning all available reranked chunks.")
#              # Return all available reranked chunks, ensuring we don't return more than K_FINAL_CONTEXT if that's the upper limit
#              return reranked_chunks[:K_FINAL_CONTEXT]

#         # 3. Diversification (MMR) (reduces to k3 = K_FINAL_CONTEXT)
#         # We perform MMR on the vector store. 'fetch_k' specifies the number of top
#         # similarity results to consider for diversification (we use K_RERANK as a good pool size).
#         # 'k' specifies the final number of diverse documents to return.
#         print(f"Applying MMR to diversify the top {K_RERANK} candidates to {K_FINAL_CONTEXT} final chunks...")
#         final_context_chunks = self.vectorstore.max_marginal_relevance_search(
#             query=query,
#             k=K_FINAL_CONTEXT,  # Number of diverse documents to return
#             fetch_k=K_RERANK    # Number of documents to fetch for initial consideration for diversity
#         )

#         print(f"✅ Success: Retrieved {len(final_context_chunks)} final diverse context chunks.")
#         return final_context_chunks

# print("RetrievalSystem class defined with _initial_vector_search, _apply_rerank, and get_final_context_chunks methods.")

RetrievalSystem class defined with _initial_vector_search, _apply_rerank, and get_final_context_chunks methods.


### Testing

In [13]:
doc_path = "data/PRITHA_MITRA.pdf"
# Load document
document = load_document(doc_path)
# Extract info from doc
extracted_doc_info = extract_document_info(document, doc_path)
text_count = 0
for doc in extracted_doc_info:
    text_count += len(doc["text"])

print(f"Total page count before cleaning and normalization: {text_count}")

Attempting to load document from: data/PRITHA_MITRA.pdf
Successfully loaded 3 pages/parts from data/PRITHA_MITRA.pdf
Total page count before cleaning and normalization: 8456


In [14]:
# Clean the extracted document
cleaned_doc_list = clean_document_structure(extracted_doc_info)
text_count = 0
for doc in cleaned_doc_list:
    text_count += len(doc["text"])

print(f"Total page count after cleaning and normalization: {text_count}")

Cleaning document of type: pdf
Original text length: 3184
No specific structural cleaning for pdf. Applying general text normalization.
Cleaning document of type: pdf
Original text length: 2882
No specific structural cleaning for pdf. Applying general text normalization.
Cleaning document of type: pdf
Original text length: 2390
No specific structural cleaning for pdf. Applying general text normalization.
Total page count after cleaning and normalization: 8456


In [15]:
# Now check the chunking
chunks = chunk_document(cleaned_doc_list)
chunks[0]

Starting document chunking process...
Processing document 1/3 for chunking...
Applying structure-aware splitting for document type: pdf
Original text split into 2 structural chunks.
Applying length-based refinement with target_chunk_size=500 and chunk_overlap=100.
  Chunk 0 (tokens: 429) is within target. Adding directly.
  Chunk 1 (tokens: 232) is within target. Adding directly.
Total refined chunks after length-based refinement: 2
Processing document 2/3 for chunking...
Applying structure-aware splitting for document type: pdf
Original text split into 2 structural chunks.
Applying length-based refinement with target_chunk_size=500 and chunk_overlap=100.
  Chunk 0 (tokens: 394) is within target. Adding directly.
  Chunk 1 (tokens: 215) is within target. Adding directly.
Total refined chunks after length-based refinement: 2
Processing document 3/3 for chunking...
Applying structure-aware splitting for document type: pdf
Original text split into 2 structural chunks.
Applying length-base

{'text': 'Pritha Mitra\n/linkedinhttps://www.linkedin.com/in/pritha-mitra-7219b6246/ |\n/envel⌢peprithamitra208@gmail.com | ♂¶obile+91-8420274334\nSummary\nResults-driven Data Scientist with 2+ years of experience delivering AI/ML solutions in banking, finance,\nand automation domains. Skilled in Python, R, SQL, PySpark, Oracle DB, NLP, OCR, Generative AI,\nand API development, with a proven track record of 95% success in automation projects. Designed\nand deployed LLM-powered document processing pipelines, financial forecasting systems, and end-to-end\nML/DL applications that improved efficiency by up to 90%. Strong background in time-series forecasting,\nmodel deployment, and data-driven decision making. Interested in translating complex problems into\nscalable, real-world solutions that drive measurable business impact.\nWork Experience\nAI-ML Innovation Engineer, SimplyFi, Mumbai Oct 2024 – Present\n1. AI-Powered Document Processing & Classification System:\n• Designed and deployed

In [16]:
# Create vector store
from langchain_ollama.embeddings import OllamaEmbeddings
from src.embedding.embedder import OllamaEmbedder
from src.vectorstore.faiss_store import FaissVectorStore
embedder = OllamaEmbedder().get_embedder()
# vector_store = create_vector_store(final_chunks, embedder)
vector_store = FaissVectorStore().create_vector_store(chunks)

[ 2025-12-11 13:20:39,165 ] root - INFO - Initializing the Ollama embedder.
[ 2025-12-11 13:20:39,693 ] root - INFO - Initializing the Ollama embedder.
[ 2025-12-11 13:20:44,540 ] httpx - INFO - HTTP Request: POST http://127.0.0.1:11434/api/embed "HTTP/1.1 200 OK"
[ 2025-12-11 13:20:44,545 ] faiss.loader - INFO - Loading faiss with AVX2 support.
[ 2025-12-11 13:20:44,567 ] faiss.loader - INFO - Successfully loaded faiss with AVX2 support.


In [17]:
# 1. Load model
cross_encoder = HuggingFaceCrossEncoder(
    model_name="cross-encoder/ms-marco-MiniLM-L-6-v2"
)
# 2. Create reranker instance
reranker_instance = CrossEncoderReranker(
    model=cross_encoder,
    top_n=K_RERANK
)

[ 2025-12-11 13:20:47,465 ] sentence_transformers.cross_encoder.CrossEncoder - INFO - Use pytorch device: cpu
