In [None]:
# Initialize the script environment
import sys

# Make the local helper modules under ./utils importable.
# The membership check keeps this cell idempotent: re-running it does not
# append a duplicate entry to sys.path each time.
UTILS_DIR = './utils'
if UTILS_DIR not in sys.path:
    sys.path.append(UTILS_DIR)

In [None]:
# General imports
import os
from typing import List, Dict, Optional, Any
from logging_config import logger

## 1. Text Extraction

In [None]:
#imports
from utils.text_extractor import TextExtractor

In [None]:
file_path = "../../src/first_batch/IG03056_V2.pdf"
MIN_WORDS = 20  # value forwarded to TextExtractor's min_words argument

text_extractor = TextExtractor(file_path, min_words=MIN_WORDS)

# Run extraction, then cleaning. The trailing ';' suppresses the notebook's
# echo of the last expression instead of printing an empty line.
text_extractor.extract_text_advanced()
text_extractor.clean_text();

In [None]:
# Pull the text out of the extractor and preview its first 1000 characters.
text = text_extractor.get_text()
print(text[:1000])

## 2. Chunking

In [None]:
# Imports
from langchain.embeddings import HuggingFaceEmbeddings
from langchain_experimental.text_splitter import SemanticChunker
from embeddings import get_embedding_model

In [None]:
def chunk_text_semantic(pages: List[Dict], embeddings, breakpoint_threshold_type = "percentile", breakpoint_threshold_amount: float = 0.95, min_chunk_size: int = 100) -> List[Dict]:
    """
    Split page texts into chunks using LangChain's SemanticChunker.

    Every page is chunked independently. If chunking a page raises, the error
    is logged and that page is skipped so the remaining pages are still
    processed. Pages whose text is empty or whitespace-only are skipped.

    Parameters:
        pages: List of page dicts. The keys read are "doc_name", "page_number",
            "text", "extraction_method" and "table_bboxes"; missing keys fall
            back to "unknown", -1, "", "unknown" and [] respectively.
        embeddings: The embeddings model handed to SemanticChunker.
        breakpoint_threshold_type: Forwarded to SemanticChunker (default "percentile").
        breakpoint_threshold_amount: Forwarded to SemanticChunker (default 0.95).
            NOTE(review): for the "percentile" type LangChain appears to expect
            a 0-100 value (its own default is 95), so 0.95 may split at almost
            every sentence boundary - confirm the intended scale.
        min_chunk_size: Forwarded to SemanticChunker (default 100).

    Returns:
        List of chunk dicts with keys: doc_name, page_number, chunk_id, text,
        num_words, extraction_method, table_bboxes.
    """
    chunker = SemanticChunker(embeddings, breakpoint_threshold_type=breakpoint_threshold_type, breakpoint_threshold_amount=breakpoint_threshold_amount, min_chunk_size=min_chunk_size)

    chunks: List[Dict] = []
    doc_names: List[str] = []  # distinct document names, in first-seen order, for the summary log
    for page in pages:
        # Resolve the identifiers before the try block so that both the
        # chunk_id f-string and the error log can safely reference them.
        doc_name = page.get("doc_name", "unknown")
        page_num = page.get("page_number", -1)
        if doc_name not in doc_names:
            doc_names.append(doc_name)

        try:
            text = page.get("text", "")
            if not text or not text.strip():
                # Nothing to chunk; avoids emitting an empty chunk for blank pages.
                continue
            chunked_texts = chunker.split_text(text)
            for i, chunk in enumerate(chunked_texts):
                chunks.append({
                    "doc_name": doc_name,
                    "page_number": page_num,
                    "chunk_id": f"{doc_name}_p{page_num}_c{i}",
                    "text": chunk,
                    "num_words": len(chunk.split()),
                    "extraction_method": page.get("extraction_method", "unknown"),
                    "table_bboxes": page.get("table_bboxes", [])
                })
        except Exception as e:
            # Best effort: report the failing page and continue with the next one.
            logger.error(f"⚠️ Semantic chunking failed on {doc_name} page {page_num}: {e}")

    source_label = ", ".join(str(name) for name in doc_names) or "unknown"
    logger.info(f"✅ Created {len(chunks)} semantic chunks from {len(pages)} pages in {source_label}.\n")
    return chunks

In [None]:
# Load the embedding model via the project helper; the same object is reused
# below for semantic chunking and for building/loading the FAISS index.
embeddings = get_embedding_model()

In [None]:
# Fetch the extractor's data and split it into semantic chunks.
# (presumably get_data() returns the list of page dicts that
# chunk_text_semantic expects - TODO confirm against TextExtractor)
doc_pages = text_extractor.get_data()
chunks = chunk_text_semantic(doc_pages, embeddings)

In [None]:
# Display the first chunk dict as a quick sanity check of the chunking output.
chunks[0]

## 3. Embedding

#### 3.1. Document preparation

In [None]:
from langchain.schema import Document

In [None]:
# Turn chunk dicts into LangChain Documents
def prepare_documents(chunks: List[Dict]) -> List[Document]:
    """
    Wrap every chunk dict in a LangChain Document.

    The chunk's "text" value becomes the page content (a missing "text" key
    raises KeyError). The metadata carries doc_name, page_number, chunk_id,
    text, num_words, extraction_method and table_bboxes, each with a fallback
    default when the key is absent.

    Args:
        chunks: List of chunk dictionaries

    Returns:
        List of LangChain Document objects, in the same order as `chunks`
    """
    def _as_document(record: Dict) -> Document:
        # Build the metadata with per-key defaults, then wrap it with the text.
        meta = dict(
            doc_name=record.get("doc_name", "unknown"),
            page_number=record.get("page_number", -1),
            chunk_id=record.get("chunk_id", "unknown"),
            text=record.get("text", ""),
            num_words=record.get("num_words", 0),
            extraction_method=record.get("extraction_method", "unknown"),
            table_bboxes=record.get("table_bboxes", []),
        )
        return Document(page_content=record["text"], metadata=meta)

    return [_as_document(record) for record in chunks]

#### 3.2. Build the FAISS index from Documents using the embeddings model

In [None]:
from langchain.vectorstores import FAISS

In [None]:
# Directory the FAISS index is persisted to (and later reloaded from)
FAISS_PATH = "faiss_index"

def build_faiss_index(documents: List[Document], embeddings: HuggingFaceEmbeddings, persist_path: str = FAISS_PATH) -> FAISS:
    """
    Build a FAISS vector store from LangChain Documents and persist it.

    Args:
        documents: LangChain Document objects to index
        embeddings: Embeddings model passed to FAISS.from_documents
        persist_path: Location handed to save_local (defaults to FAISS_PATH)

    Returns:
        The FAISS vector store that was built and saved
    """
    # Index the documents in memory, then write the result to disk.
    index_store = FAISS.from_documents(documents, embedding=embeddings)
    index_store.save_local(persist_path)

    print(f"✅ FAISS index built and saved to '{persist_path}'\n")
    return index_store

In [None]:
# Wrap the chunks as LangChain Documents, then build the FAISS index and
# persist it under the default FAISS_PATH.
documents = prepare_documents(chunks)
build_faiss_index(documents, embeddings)

#### 3.3. Visualize the FAISS index

In [None]:
# Imports
import numpy as np
from sklearn.manifold import TSNE
import plotly.graph_objects as go

In [None]:
# Reload the index persisted under FAISS_PATH.
# NOTE(review): allow_dangerous_deserialization opts in to unpickling the
# stored docstore - only load index files from a trusted source (here, the
# one written by build_faiss_index earlier in this notebook).
vectorstore = FAISS.load_local(FAISS_PATH, embeddings, allow_dangerous_deserialization=True)

# Fetch the documents in FAISS index order through the public
# index_to_docstore_id mapping, so documents[i] is guaranteed to describe
# row i of all_embeddings (instead of relying on the private docstore._dict
# happening to iterate in the same order as the vectors).
documents = [
    vectorstore.docstore.search(vectorstore.index_to_docstore_id[i])
    for i in range(vectorstore.index.ntotal)
]
metadatas = [doc.metadata for doc in documents]

# Rebuild every stored vector from the index, one row per document.
all_embeddings = vectorstore.index.reconstruct_n(0, vectorstore.index.ntotal)

In [None]:
def visualize_2D(vectors=None, docs=None, perplexity: float = 30.0):
    """
    Project embedding vectors to 2D with t-SNE and show an interactive scatter plot.

    Args:
        vectors: Array-like of embedding vectors, one row per document.
            Defaults to the notebook-level `all_embeddings`.
        docs: Documents matching `vectors` row by row; their metadata and the
            first 150 characters of page_content form the hover text.
            Defaults to the notebook-level `documents`.
        perplexity: Requested t-SNE perplexity. It is capped at
            n_samples - 1 because scikit-learn requires perplexity to be
            smaller than the number of samples.

    Raises:
        ValueError: If fewer than two vectors are available.
    """
    # Fall back to the notebook globals so a bare visualize_2D() keeps working.
    if vectors is None:
        vectors = all_embeddings
    if docs is None:
        docs = documents

    vectors = np.asarray(vectors)
    n_samples = vectors.shape[0]
    if n_samples < 2:
        raise ValueError(f"t-SNE needs at least 2 vectors to visualize, got {n_samples}")

    # Small indexes (e.g. a single short PDF) would otherwise fail with the
    # default perplexity of 30.
    tsne = TSNE(n_components=2, random_state=42, perplexity=min(perplexity, n_samples - 1))
    reduced_vectors = tsne.fit_transform(vectors)

    hover_texts = [
        f"doc_name: {doc.metadata.get('doc_name', '')}\t"
        f"page: {doc.metadata.get('page_number', '')}\t"
        f"n_words: {doc.metadata.get('num_words', 0)}<br>"
        f"text: {doc.page_content[:150]}..."
        for doc in docs
    ]

    # Create the 2D scatter plot
    fig = go.Figure(data=[go.Scatter(
        x=reduced_vectors[:, 0],
        y=reduced_vectors[:, 1],
        mode='markers',
        marker=dict(size=5, opacity=0.8),
        text=hover_texts,
        hoverinfo='text'
    )])

    # Axis titles belong on the 2D layout itself; `scene` only configures 3D
    # plots, so titles placed there are never rendered for a 2D scatter.
    fig.update_layout(
        title='2D FAISS Vector Store Visualization',
        xaxis_title='x',
        yaxis_title='y',
        width=800,
        height=600,
        margin=dict(r=20, b=10, l=10, t=40)
    )

    fig.show()


In [None]:
# Render the 2D t-SNE scatter plot of the embeddings loaded from the FAISS index.
visualize_2D()