## Large Document Processing with Azure OpenAI and FAISS

### Imports

In [1]:
#imports
import fitz  # PyMuPDF
import openai
import os
import pickle
import glob  
import pickle  
import faiss  
import numpy as np 
from typing import List, Tuple ,Optional
from langchain_openai import AzureOpenAIEmbeddings, AzureChatOpenAI
from concurrent.futures import ThreadPoolExecutor, as_completed
from langchain_core.messages import HumanMessage
from typing import List, Tuple, Dict, Any
from dotenv import load_dotenv


### Model Initialization

In [None]:
# Create Azure embeddings and OpenAI client using environment variables.
# load_dotenv() reads a local .env file (if one exists) into os.environ first.
load_dotenv() 

# NOTE: each "#your_...#" string below is a placeholder for the NAME of an
# environment variable and must be replaced before running. os.environ.get
# returns None for a name that is not set, so a forgotten placeholder only
# surfaces later, when the clients below are constructed or first used.
# The values are stored as attributes on the `openai` module and are then
# passed explicitly to both client constructors below.
openai.api_base    = os.environ.get("#your_azure_openai_endpoint#")
openai.api_key     = os.environ.get("#your_azure_openai_key#")
openai.api_version = os.environ.get("#your_azure_openai_version#")

# Azure deployment names for the embedding model and the chat model.
EMBEDDING_MODEL = os.environ.get("#your_azure_embedding_model#")
LLM_MODEL       = os.environ.get("#your_azure_llm_model#")

# ─── Instantiate the embeddings client ────────────────────────────────────────
# Used later for both chunk embeddings and query embeddings.
embedding_model = AzureOpenAIEmbeddings(
    azure_endpoint=openai.api_base,
    api_key=openai.api_key,
    azure_deployment=EMBEDDING_MODEL,
    openai_api_version=openai.api_version,
)

# ─── Instantiate the chat LLM client (AzureChatOpenAI) ────────────────────────
# `llm` is read as a module-level global by split_pdf_semantically.
llm = AzureChatOpenAI(
    azure_endpoint=openai.api_base,
    api_key=openai.api_key,
    azure_deployment=LLM_MODEL,
    openai_api_version=openai.api_version,
)

### Function to extract blocks from the large document

In [3]:
#....Extracts text from a PDF file, capturing each text block’s content, font size, vertical position, and page number.....#

def extract_blocks_with_fonts(pdf_path: str) -> List[Dict]:
    """
    Extract every non-empty text block from a PDF together with layout metadata.

    Args:
        pdf_path: Path of the PDF file to open with PyMuPDF.

    Returns:
        A list of dicts, one per text block, in page order, with the keys:
          - "text":      the block's text; spans of one line are concatenated
                         as-is and the lines of a block are joined with a
                         single space
          - "font_size": the largest span font size found in the block
          - "y0":        the top coordinate of the block's bounding box
          - "page":      the 1-based page number
        Blocks that are not text (type != 0) or contain only whitespace are
        skipped.
    """
    all_blocks: List[Dict] = []

    doc = fitz.open(pdf_path)
    try:
        for page_num, page in enumerate(doc, start=1):
            for block in page.get_text("dict")["blocks"]:
                if block["type"] != 0:
                    continue  # Skip non-text blocks (e.g., images)

                line_texts = []     # One cleaned string per visual line
                max_font_size = 0   # Largest font size seen in this block

                for line in block["lines"]:
                    spans = line["spans"]
                    for span in spans:
                        max_font_size = max(max_font_size, span["size"])

                    # Spans are consecutive pieces of the same line, so they are
                    # glued together directly; separate lines are kept apart so
                    # the last word of one line does not fuse with the first
                    # word of the next.
                    line_text = "".join(span["text"] for span in spans).strip()
                    if line_text:
                        line_texts.append(line_text)

                text = " ".join(line_texts)
                if text:
                    all_blocks.append({
                        "text": text,
                        "font_size": max_font_size,
                        "y0": block["bbox"][1],
                        "page": page_num,
                    })
    finally:
        # Always release the file handle, even if extraction fails part-way.
        doc.close()

    return all_blocks

### Function to detect headings in the blocks

In [4]:
#.....Automatically identifies headings based on font size and groups subsequent text blocks under these headings......#

def detect_headings(blocks: List[Dict], font_size_threshold: float = 2.0) -> List[Tuple[str, List[str]]]:
    """
    Group text blocks into sections, using font size to spot headings.

    A block is treated as a heading when its font size is at least
    ``font_size_threshold`` above the average font size of ALL blocks in the
    document. Every following non-heading block is collected under that
    heading until the next heading appears.

    Args:
        blocks: Dicts with at least the keys "text" and "font_size".
        font_size_threshold: How far above the document-wide average font
            size a block must be to count as a heading.

    Returns:
        A list of ``(heading, [block_text, ...])`` tuples in document order.
        Text that appears before the first heading is filed under the default
        heading "Introduction". A heading with no content blocks under it is
        not emitted. An empty ``blocks`` list yields an empty result.
    """
    if not blocks:
        # Nothing to average over; avoid dividing by zero below.
        return []

    avg_font = sum(b["font_size"] for b in blocks) / len(blocks)
    threshold = avg_font + font_size_threshold

    sections: List[Tuple[str, List[str]]] = []
    heading = "Introduction"     # Default section for text before the first heading
    content: List[str] = []

    for block in blocks:
        if block["font_size"] >= threshold:
            # A new heading closes the running section (only kept if non-empty).
            if content:
                sections.append((heading, content))
            heading, content = block["text"], []
        else:
            content.append(block["text"])

    # Flush the trailing section.
    if content:
        sections.append((heading, content))

    return sections

### Function to chunk sections

In [5]:
#.....Divides sections into smaller chunks (max ~32KB each), making them manageable for embedding and retrieval......#

def chunk_sections(
    sections: List[Tuple[str, List[str]]],
    max_chunk_size: int = 32 * 1024,
) -> List[str]:
    """
    Pack whole sections into chunks of at most ``max_chunk_size`` UTF-8 bytes.

    Each section is rendered as ``"<heading>\\n<content lines>\\n\\n"`` and
    appended to the running chunk while it still fits. Sections are never
    split, so every chunk ends on a section boundary.

    Args:
        sections: ``(heading, [content, ...])`` tuples in document order.
        max_chunk_size: Upper bound for a chunk, measured in UTF-8 bytes
            (not characters). Defaults to 32KB.

    Returns:
        The chunk strings in document order. A single section that is larger
        than ``max_chunk_size`` is emitted as its own chunk and is therefore
        the one case in which a chunk exceeds the limit.

    Raises:
        ValueError: If ``max_chunk_size`` is not positive.
    """
    if max_chunk_size <= 0:
        raise ValueError("max_chunk_size must be a positive number of bytes")

    chunks: List[str] = []
    current_chunk = ""   # Text accumulated for the chunk being built
    current_len = 0      # Its size in UTF-8 bytes

    for heading, content in sections:
        section_text = f"{heading}\n" + "\n".join(content) + "\n\n"
        # Measure bytes, not characters: multi-byte characters count fully.
        section_len = len(section_text.encode("utf-8"))

        if section_len > max_chunk_size:
            # Oversized section: flush what we have, then emit it on its own.
            if current_chunk:
                chunks.append(current_chunk)
                current_chunk = ""
                current_len = 0
            chunks.append(section_text)
            continue

        if current_len + section_len > max_chunk_size:
            # Does not fit any more: close the running chunk, start a new one.
            chunks.append(current_chunk)
            current_chunk = section_text
            current_len = section_len
        else:
            current_chunk += section_text
            current_len += section_len

    if current_chunk:
        chunks.append(current_chunk)

    return chunks


### Function to save chunks

In [None]:
#.....Stores each chunk into separate text files for easy access and future use.......#

def save_chunks(chunks: List[str], output_dir: str):
    """
    Write every chunk to its own UTF-8 text file inside ``output_dir``.

    The directory is created when missing. Files are named ``chunk_<n>.txt``
    where ``n`` is the chunk's 1-based position in ``chunks``; an existing
    file with the same name is overwritten.
    """
    os.makedirs(output_dir, exist_ok=True)

    for number, body in enumerate(chunks, start=1):
        target = os.path.join(output_dir, f"chunk_{number}.txt")
        with open(target, mode="w", encoding="utf-8") as handle:
            handle.write(body)


### Function to embed chunks in parallel

In [7]:
# .....Generates embeddings (vector representations) for all document chunks in parallel......#

def embed_chunks_parallel(
    chunks: List[str],
    embeddings_model: Any,
    max_workers: Optional[int] = None
) -> Dict[int, List[float]]:
    """
    Embed every chunk concurrently, one request per chunk.

    Args:
        chunks: Text chunks to embed.
        embeddings_model: Object exposing ``embed_documents(list_of_texts)``.
        max_workers: Forwarded to ``ThreadPoolExecutor``; ``None`` lets the
            executor pick its own default pool size.

    Returns:
        ``{chunk_number: embedding}`` with 1-based chunk numbers. A chunk
        whose embedding call raises is reported on stdout and left out of
        the result instead of aborting the whole run.
    """

    def _embed_single(position: int, text: str):
        # embed_documents takes a batch; send a batch of one and unwrap it.
        vector = embeddings_model.embed_documents([text])[0]
        return position, vector

    results: Dict[int, List[float]] = {}

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Map each submitted future back to the chunk number it belongs to.
        pending = {}
        for position, text in enumerate(chunks, start=1):
            pending[pool.submit(_embed_single, position, text)] = position

        # Handle results in completion order, not submission order.
        for finished in as_completed(pending):
            position = pending[finished]
            try:
                vector = finished.result()[1]
                results[position] = vector
                print(f"[chunk_{position}] Embedding generated.")
            except Exception as e:
                print(f"[chunk_{position}] Error generating embedding: {e}")

    return results


### Function to save embeddings

In [8]:
#.......Stores the generated embeddings on disk (one pickle file per chunk) for future use.......

def save_embeddings(
    embeddings: Dict[int, Any],     # Dictionary mapping chunk index to its embedding
    output_dir: str                 # Directory where embeddings will be saved
) -> None:
    """
    Pickle each embedding to ``<output_dir>/chunk_<idx>.pkl``.

    The files are written directly into ``output_dir`` (no sub-directory), so
    the same directory can be handed to ``load_local_embeddings`` as
    ``embs_dir``. File names mirror the ``chunk_<idx>.txt`` chunk files.

    Args:
        embeddings: Mapping of chunk index to its embedding vector.
        output_dir: Target directory; created when missing. An empty string
            means the current working directory.
    """
    # os.makedirs("") raises FileNotFoundError, so map "" to the current directory.
    embeddings_dir = output_dir or "."
    os.makedirs(embeddings_dir, exist_ok=True)

    for idx, emb in embeddings.items():
        file_path = os.path.join(embeddings_dir, f"chunk_{idx}.pkl")

        with open(file_path, "wb") as f:
            pickle.dump(emb, f)

        print(f"[chunk_{idx}] Embedding saved to: {file_path}")


In [9]:
# ─── 1. Load chunks & embeddings in parallel ────────────────────
def _load_one_pair(txt_path: str, embs_dir: str) -> Tuple[str, np.ndarray]:
    """
    Load one chunk's text together with its pickled embedding.

    The embedding file is expected at ``<embs_dir>/<stem>.pkl`` where
    ``<stem>`` is the chunk file name without its extension.

    Returns:
        ``(text, embedding)`` with the embedding as a float32 NumPy array.

    Raises:
        FileNotFoundError: If no matching ``.pkl`` file exists in ``embs_dir``.
    """
    stem, _extension = os.path.splitext(os.path.basename(txt_path))
    pkl_path = os.path.join(embs_dir, f"{stem}.pkl")

    # Check for the embedding first so a missing pair fails before any read.
    if not os.path.exists(pkl_path):
        raise FileNotFoundError(f"No embedding .pkl for {txt_path}")

    with open(txt_path, "r", encoding="utf-8") as text_file:
        chunk_text = text_file.read()

    # NOTE(review): pickle.load executes arbitrary code from the file; only
    # point this at embedding files produced by this notebook.
    with open(pkl_path, "rb") as pickle_file:
        raw_vector = pickle.load(pickle_file)

    return chunk_text, np.array(raw_vector, dtype="float32")




In [10]:
# Loads the stored chunks and their corresponding embeddings back into memory

def load_local_embeddings(chunks_dir: str, embs_dir: str, max_workers: int = 8) -> Tuple[List[str], np.ndarray]:
    """
    Load all ``chunk_*.txt`` files and their embeddings back into memory.

    Files are read in parallel, but the result is always ordered by chunk
    number (``chunk_2`` before ``chunk_10``), so row ``i`` of the returned
    matrix belongs to ``texts[i]`` and the order is the same on every run.

    Args:
        chunks_dir: Directory containing the ``chunk_*.txt`` files.
        embs_dir: Directory containing the matching ``chunk_*.pkl`` files.
        max_workers: Size of the thread pool used for reading.

    Returns:
        ``(texts, embeddings)`` where ``embeddings`` stacks one row per
        successfully loaded chunk. A chunk that fails to load is reported on
        stdout and skipped.

    Raises:
        ValueError: If no chunk files are found or none could be loaded.
    """

    def _chunk_order(path: str):
        # Numeric suffixes sort by value; anything else sorts after them by name.
        stem = os.path.splitext(os.path.basename(path))[0]
        try:
            return (0, int(stem[len("chunk_"):]), stem)
        except ValueError:
            return (1, 0, stem)

    txt_files = sorted(glob.glob(os.path.join(chunks_dir, "chunk_*.txt")), key=_chunk_order)
    if not txt_files:
        raise ValueError(f"No chunk_*.txt files found in {chunks_dir!r}")

    texts: List[str] = []
    embs: List[np.ndarray] = []

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit everything up front so the reads overlap ...
        submitted = [(txt, executor.submit(_load_one_pair, txt, embs_dir)) for txt in txt_files]
        # ... but collect in submission order to keep the output deterministic.
        for src, fut in submitted:
            try:
                text, emb = fut.result()
            except Exception as e:
                print(f"[ERROR] loading {src!r}: {e}")
                continue
            texts.append(text)
            embs.append(emb)

    if not embs:
        raise ValueError("No embeddings loaded!")

    return texts, np.vstack(embs)

In [11]:
# ─── 2. Build a FAISS index ────────────────────────────────────
def build_faiss_index(embs: np.ndarray) -> faiss.IndexFlatIP:
    """
    Build an inner-product FAISS index over L2-normalised embeddings.

    With unit-length vectors the inner product equals cosine similarity.

    Args:
        embs: 2-D array with one embedding per row. The caller's array is
            left untouched; normalisation happens on a private copy.

    Returns:
        A ``faiss.IndexFlatIP`` holding the normalised vectors, in row order.

    Raises:
        ValueError: If ``embs`` is not 2-dimensional.
    """
    # np.array(...) always copies here: it gives the contiguous float32 layout
    # the FAISS calls below are fed with, and it keeps the in-place
    # normalisation from modifying the caller's data.
    vectors = np.array(embs, dtype="float32", order="C")
    if vectors.ndim != 2:
        raise ValueError(f"embs must be a 2-D array, got {vectors.ndim} dimension(s)")

    index = faiss.IndexFlatIP(vectors.shape[1])
    faiss.normalize_L2(vectors)   # In place, on the private copy only
    index.add(vectors)
    return index

In [12]:
# ─── 3. Embed a query via Azure OpenAI ─────────────────────────
def get_query_embedding(query: str, embeddings_model: AzureOpenAIEmbeddings) -> np.ndarray:
    """
    Embed a query string and return it as a unit-length row vector.

    Args:
        query: The text to embed.
        embeddings_model: Object exposing ``embed_query(text)``.

    Returns:
        A float32 array of shape ``(1, dim)``, normalised to unit L2 length
        so it can be compared by inner product against normalised vectors.
        A zero vector is returned unchanged rather than turned into NaNs.
    """
    arr = np.array(embeddings_model.embed_query(query), dtype="float32")

    norm = np.linalg.norm(arr)
    if norm > 0:
        # Dividing by a zero norm would fill the query with NaNs.
        arr /= norm

    return arr.reshape(1, -1)  # 2-D: one query row

In [13]:
# Use LLM to validate & respond based on top FAISS chunks
def query_with_llm(query: str, texts: List[str], index: faiss.IndexFlatIP,
                   embeddings_model: AzureOpenAIEmbeddings, llm, top_k: int = 5) -> str:
    """
    Answer a query with the LLM, grounded in the most similar chunks.

    Args:
        query: The user's question.
        texts: Chunk texts; position ``i`` must correspond to vector ``i``
            in ``index``.
        index: FAISS index to search.
        embeddings_model: Model used to embed the query.
        llm: Chat model exposing ``invoke(messages)``.
        top_k: Maximum number of chunks to put into the prompt. Capped at
            the number of vectors/texts actually available.

    Returns:
        The LLM's answer with surrounding whitespace removed.
    """
    # 1. Embed the user query
    q_emb = get_query_embedding(query, embeddings_model)

    # 2. Retrieve the nearest chunks. FAISS pads the result with -1 when it is
    #    asked for more neighbours than it holds, and texts[-1] would silently
    #    inject the LAST chunk, so cap k and drop any out-of-range id.
    k = max(0, min(top_k, len(texts), index.ntotal))
    if k:
        _scores, neighbours = index.search(q_emb, k)
        hit_ids = [int(i) for i in neighbours[0] if 0 <= i < len(texts)]
    else:
        hit_ids = []

    # 3. Prepare context string for LLM (label = 1-based position in `texts`)
    context = "\n\n".join(f"Chunk #{i+1}:\n{texts[i]}" for i in hit_ids)

    # 4. Compose the final prompt
    prompt = f"""
You are a helpful assistant. Given the following context chunks from a document and a user query, answer the query accurately using only the context provided.

Context:
{context}

User Query:
{query}

Answer:
"""

    # 5. Send prompt to LLM
    response = llm.invoke([HumanMessage(content=prompt)])

    return response.content.strip()  # Clean and return LLM's response

In [None]:
# ─── Main PDF Split & Local FAISS Flow ───────────────────────────
def split_pdf_semantically(
    pdf_path: str,
    chunks_output_dir: str,
    embeddings_model: AzureOpenAIEmbeddings = None,
    embeddings_output_dir: str = "",
    post_index_query: Optional[str] = None,
    chat_model: Optional[Any] = None
) -> None:
    """
    Full pipeline:
      1) Extract text blocks from the PDF, detect headings and chunk by section
      2) Persist chunks as .txt
      3) Generate & save embeddings (.pkl)
      4) Reload chunks + embeddings from disk, build a FAISS index and,
         optionally, answer one query with the LLM

    Args:
        pdf_path: PDF file to process.
        chunks_output_dir: Directory that receives the ``chunk_<n>.txt`` files.
        embeddings_model: Embedding client used for chunks and for the query.
            Required, despite the ``None`` default kept for compatibility.
        embeddings_output_dir: Directory that receives the ``chunk_<n>.pkl``
            files.
        post_index_query: When given, this question is answered from the
            top 3 retrieved chunks and the answer is printed.
        chat_model: Chat model used for ``post_index_query``. Falls back to
            the module-level ``llm`` when omitted.

    Raises:
        ValueError: If ``embeddings_model`` is missing or the PDF yields no
            extractable text.
    """
    # Fail fast: without a model every chunk would fail one by one and the
    # run would only stop later with an unrelated-looking error.
    if embeddings_model is None:
        raise ValueError("embeddings_model is required to generate chunk and query embeddings.")

    # 1–3: extract, detect, chunk, save
    print("🔍 Extracting layout-aware text blocks...")
    blocks = extract_blocks_with_fonts(pdf_path)
    if not blocks:
        # e.g. a scanned/image-only PDF: there is nothing to chunk or embed.
        raise ValueError(f"No extractable text found in {pdf_path!r}")
    print("🧠 Detecting semantic headings...")
    sections = detect_headings(blocks)
    print("📦 Chunking content...")
    chunks = chunk_sections(sections)
    print(f"💾 Saving chunks to {chunks_output_dir}")
    save_chunks(chunks, chunks_output_dir)

    print(f"🤖 Generating embeddings for {len(chunks)} chunks...")
    embeddings = embed_chunks_parallel(chunks, embeddings_model)
    print(f"💾 Saving embeddings to {embeddings_output_dir}")
    save_embeddings(embeddings, embeddings_output_dir)

    # 4: local FAISS flow (reads back what was just written to disk)
    print("🗄️ Loading local embeddings and building FAISS index...")
    texts, embs_matrix = load_local_embeddings(chunks_output_dir, embeddings_output_dir)
    faiss_index = build_faiss_index(embs_matrix)
    if post_index_query:
        # Resolve the global only when it is actually needed.
        active_llm = llm if chat_model is None else chat_model
        print(f"🔎 Running local FAISS+LLM for: '{post_index_query}'")
        local_ans = query_with_llm(
            post_index_query, texts, faiss_index,
            embeddings_model, llm=active_llm, top_k=3
        )
        print("💬 Local FAISS+LLM Answer:", local_ans)


In [None]:
# Run the whole pipeline end to end. Every "#your_...#" value below is a
# placeholder and must be replaced with a real path / directory / question
# before this cell is executed. `embedding_model` is the client created in the
# model-initialisation cell.
split_pdf_semantically(
        pdf_path=r"#your_pdf_file_path#",
        chunks_output_dir="#your_chunks_output_dir#",
        embeddings_model=embedding_model,
        embeddings_output_dir="#your_embeddings_output_dir#",
        post_index_query="#your_query_here#"
    )

🔍 Extracting layout-aware text blocks...
🧠 Detecting semantic headings...
📦 Chunking content...
💾 Saving chunks to azureai_chunks
🤖 Generating embeddings for 89 chunks...
[chunk_20] Embedding generated.
[chunk_13] Embedding generated.
[chunk_15] Embedding generated.
[chunk_3] Embedding generated.
[chunk_6] Embedding generated.
[chunk_18] Embedding generated.
[chunk_19] Embedding generated.
[chunk_14] Embedding generated.
[chunk_21] Embedding generated.
[chunk_17] Embedding generated.
[chunk_27] Embedding generated.
[chunk_5] Embedding generated.
[chunk_26] Embedding generated.
[chunk_2] Embedding generated.
[chunk_4] Embedding generated.
[chunk_16] Embedding generated.
[chunk_11] Embedding generated.
[chunk_23] Embedding generated.
[chunk_7] Embedding generated.
[chunk_10] Embedding generated.
[chunk_24] Embedding generated.
[chunk_8] Embedding generated.
[chunk_22] Embedding generated.
[chunk_9] Embedding generated.
[chunk_1] Embedding generated.
[chunk_28] Embedding generated.
[chunk