In [10]:
!pip install PyMuPDF tiktoken langchain-text-splitters



In [24]:
import fitz # PyMuPDF
import tiktoken
import json
import math
import os
import re
from collections import Counter

In [25]:
# Import RecursiveCharacterTextSplitter
from langchain_text_splitters import RecursiveCharacterTextSplitter

# --- Configuration ---
# The target chunk size in tokens. A common value is 512.
CHUNK_SIZE_TOKENS = 512
# The overlap percentage between consecutive chunks (e.g., 0.15 for 15% overlap).
OVERLAP_PERCENTAGE = 0.15
# Encoding for tokenization (e.g., 'cl100k_base' for OpenAI models like GPT-4, GPT-3.5)
ENCODING_NAME = "cl100k_base"

# Heuristic for identifying common headers/footers
# Max number of lines from top/bottom of a page to consider as potential header/footer
MAX_LINES_TO_CHECK = 5
# Percentage of pages a line must appear on (excluding page 1) to be considered a common header/footer
REPETITION_THRESHOLD_PERCENT = 70

# Initialize tiktoken encoder globally for consistent token counting
ENCODER = tiktoken.get_encoding(ENCODING_NAME)


In [26]:
def count_tokens(text: str) -> int:
    """Counts tokens using the global tiktoken encoder."""
    return len(ENCODER.encode(text))

def extract_text_from_pdf(pdf_path: str) -> list[dict]:
    """
    Extracts text content page by page from a PDF document.

    Args:
        pdf_path (str): The file path to the PDF document.

    Returns:
        list[dict]: A list of dictionaries, each containing 'page_num' and 'text'
                    for a page. Returns an empty list if the file cannot be opened.
    """
    pages_content = []
    try:
        document = fitz.open(pdf_path)
        for page_num in range(len(document)):
            page = document.load_page(page_num)
            text = page.get_text("text")
            pages_content.append({"page_num": page_num + 1, "text": text})
        document.close()
    except Exception as e:
        print(f"Error reading PDF {pdf_path}: {e}")
    return pages_content

In [27]:
def identify_common_page_elements(all_pages_content: dict[str, list[dict]],
                                   max_lines: int = MAX_LINES_TO_CHECK,
                                   repetition_threshold_percent: int = REPETITION_THRESHOLD_PERCENT) -> tuple[set, set]:
    """
    Analyzes text from multiple pages (excluding first pages) to identify common
    header and footer lines based on repetition.

    Args:
        all_pages_content (dict[str, list[dict]]): Dictionary where keys are doc_ids
                                                    and values are lists of page_data.
        max_lines (int): Max number of lines from top/bottom to consider.
        repetition_threshold_percent (int): Percentage of non-first pages a line must
                                            appear on to be considered common.

    Returns:
        tuple[set, set]: Two sets: (common_header_lines, common_footer_lines).
    """
    header_candidates = Counter()
    footer_candidates = Counter()
    total_non_first_pages = 0

    for doc_id, pages_data in all_pages_content.items():
        for page_data in pages_data:
            page_num = page_data['page_num']
            page_text = page_data['text']

            # Skip the first page of each document for common element identification
            if page_num == 1:
                continue

            total_non_first_pages += 1
            lines = [line.strip() for line in page_text.split('\n') if line.strip()]

            # Collect header candidates
            for i in range(min(max_lines, len(lines))):
                header_candidates[lines[i]] += 1

            # Collect footer candidates (from the end of the page)
            for i in range(max(0, len(lines) - max_lines), len(lines)):
                footer_candidates[lines[i]] += 1

    common_header_lines = set()
    common_footer_lines = set()

    if total_non_first_pages == 0:
        print("No non-first pages found to identify common elements.")
        return common_header_lines, common_footer_lines

    threshold_count = math.ceil(total_non_first_pages * (repetition_threshold_percent / 100))
    print(f"Identifying common elements: Total non-first pages: {total_non_first_pages}, Threshold count: {threshold_count}")

    for line, count in header_candidates.items():
        if count >= threshold_count:
            common_header_lines.add(line)
            print(f"  Identified common header: '{line}' (appears {count} times)")

    for line, count in footer_candidates.items():
        # A common heuristic for page numbers: remove if it's just a number or "Page X"
        if count >= threshold_count and (re.fullmatch(r'\s*\d+\s*', line) or re.fullmatch(r'Page\s+\d+\s*(of\s+\d+)?', line, re.IGNORECASE)):
            common_footer_lines.add(line)
            print(f"  Identified common footer: '{line}' (appears {count} times)")
        # You can extend this logic to include other non-numeric common footers if needed

    return common_header_lines, common_footer_lines

In [28]:
def remove_identified_elements(page_text: str, page_num: int,
                               common_header_lines: set, common_footer_lines: set) -> str:
    """
    Removes identified common header and footer lines from a page's text.
    It skips removal for the first page.

    Args:
        page_text (str): The text content of a single page.
        page_num (int): The current page number (1-indexed).
        common_header_lines (set): Set of lines identified as common headers.
        common_footer_lines (set): Set of lines identified as common footers.

    Returns:
        str: The page text with identified common elements removed.
    """
    # Skip removal for the first page, as it contains unique, important metadata.
    if page_num == 1:
        return page_text

    lines = [line.strip() for line in page_text.split('\n')]
    cleaned_lines = []

    # Flags to stop removal once non-header/non-footer content is found
    header_removal_done = False
    footer_removal_done = False

    # Process lines from top for header removal
    for i, line in enumerate(lines):
        if not header_removal_done and line in common_header_lines:
            # This line is a common header, skip it
            continue
        else:
            # Found non-header content or no more headers, stop checking
            header_removal_done = True
            cleaned_lines.append(line) # Add this line and subsequent lines

    # Now process the cleaned lines from the bottom for footer removal
    # This is a bit tricky with `cleaned_lines` already built.
    # A simpler approach for this heuristic is to rebuild `cleaned_lines` from scratch
    # by iterating through original lines and marking for inclusion/exclusion.

    # Re-process original lines for both header and footer removal in one pass
    final_lines = []

    # Determine which lines to keep from the top (non-headers)
    temp_lines = []
    for i, line in enumerate(lines):
        if line in common_header_lines and i < MAX_LINES_TO_CHECK: # Only consider top lines for header
            continue # Skip this header line
        else:
            temp_lines.append(line)

    # Determine which lines to keep from the bottom (non-footers)
    # Iterate from the end of temp_lines
    footer_check_start_index = max(0, len(temp_lines) - MAX_LINES_TO_CHECK)
    for i, line in enumerate(temp_lines):
        if line in common_footer_lines and i >= footer_check_start_index: # Only consider bottom lines for footer
            continue # Skip this footer line
        else:
            final_lines.append(line)

    # Filter out any completely empty lines that result from removal
    return "\n".join(line for line in final_lines if line.strip() != "")

In [29]:
def chunk_text_with_metadata(text: str, chunk_size_tokens: int, overlap_percentage: float,
                             doc_id: str, page: int, base_clause_id_prefix: str):
    """
    Splits a given text into chunks using RecursiveCharacterTextSplitter,
    prioritizing natural language boundaries. Each chunk is tagged with metadata.

    Args:
        text (str): The text content to be chunked.
        chunk_size_tokens (int): The target maximum number of tokens per chunk.
                                 This is converted to character length for the splitter.
        overlap_percentage (float): The percentage of overlap between consecutive chunks.
        doc_id (str): The ID of the document (e.g., "policy_123").
        page (int): The page number the content is notionally from.
        base_clause_id_prefix (str): A prefix for generating clause IDs (e.g., "1.1").

    Returns:
        list[dict]: A list of dictionaries, where each dictionary represents a chunk
                    and contains its content and metadata.
    """
    # Estimate character length based on average tokens per character
    avg_chars_per_token = 4 # Common average for English text
    chunk_size_chars = chunk_size_tokens * avg_chars_per_token
    overlap_chars = math.floor(chunk_size_chars * overlap_percentage)

    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size_chars,
        chunk_overlap=overlap_chars,
        length_function=len, # Use character length for splitting
        separators=["\n\n", "\n", " ", ""] # Prioritize paragraphs, then lines, then words, then characters
    )

    raw_chunks = text_splitter.split_text(text)

    processed_chunks = []
    for i, chunk_content in enumerate(raw_chunks):
        token_length = count_tokens(chunk_content)
        clause_id = f"{base_clause_id_prefix}-{doc_id}-p{page}-c{i + 1}"

        metadata = {
            "doc_id": doc_id,
            "page": page,
            "clause_id": clause_id,
            "chunk_length_tokens": token_length,
            "chunk_length_chars": len(chunk_content)
        }

        processed_chunks.append({
            "content": chunk_content,
            "metadata": metadata
        })

    return processed_chunks

In [30]:
def process_pdfs_for_chunking(pdf_paths: list[str]):
    """
    Processes a list of PDF file paths, extracts text, dynamically identifies
    and removes common headers/footers, and chunks the cleaned text.

    Args:
        pdf_paths (list[str]): A list of file paths to the PDF documents.

    Returns:
        list[dict]: A flattened list of all processed chunks from all PDFs.
    """
    all_docs_pages_content = {}
    # First pass: Extract all text and store it for common element identification
    for pdf_path in pdf_paths:
        doc_id = os.path.splitext(os.path.basename(pdf_path))[0]
        pages_content = extract_text_from_pdf(pdf_path)
        if pages_content:
            all_docs_pages_content[doc_id] = pages_content
        else:
            print(f"Skipping {pdf_path} due to extraction errors.")

    # Identify common headers and footers across all documents (excluding first pages)
    common_header_lines, common_footer_lines = identify_common_page_elements(all_docs_pages_content)

    all_processed_chunks = []
    # Second pass: Process each page, remove identified common elements, and chunk
    for doc_id, pages_data in all_docs_pages_content.items():
        print(f"\n--- Chunking Document: {doc_id} ---")
        for page_data in pages_data:
            page_num = page_data['page_num']
            raw_page_text = page_data['text']

            # --- Dynamically remove identified common headers/footers ---
            cleaned_page_text = remove_identified_elements(
                raw_page_text, page_num, common_header_lines, common_footer_lines
            )

            print(f"  Processing Page {page_num} (raw length: {len(raw_page_text)} chars, cleaned length: {len(cleaned_page_text)} chars)")

            if not cleaned_page_text.strip():
                print(f"    Page {page_num} became empty after cleaning. Skipping chunking for this page.")
                continue

            page_chunks = chunk_text_with_metadata(
                text=cleaned_page_text,
                chunk_size_tokens=CHUNK_SIZE_TOKENS,
                overlap_percentage=OVERLAP_PERCENTAGE,
                doc_id=doc_id,
                page=page_num,
                base_clause_id_prefix="Clause"
            )
            all_processed_chunks.extend(page_chunks)
            print(f"    Generated {len(page_chunks)} chunks for page {page_num}.")

    return all_processed_chunks

In [31]:
# --- Example Usage with Placeholder PDF Paths ---
# IMPORTANT: Replace these with the actual paths to your uploaded PDF files in Colab
pdf_file_paths = [
    "/content/BAJHLIP23020V012223.pdf",
    "/content/CHOTGDP23004V012223.pdf",
    "/content/EDLHLGA23009V012223.pdf",
    "/content/HDFHLIP23024V072223.pdf",
    "/content/ICIHLIP22012V012223.pdf"
]

# Check if placeholder files exist (they won't unless you upload them)
# This is just for demonstration; in a real scenario, you'd ensure files are there.
existing_pdf_paths = [p for p in pdf_file_paths if os.path.exists(p)]


In [32]:
if not existing_pdf_paths:
    print("WARNING: No PDF files found at the specified paths. Please upload your PDFs to Colab")
    print("and update the 'pdf_file_paths' list with their correct locations.")
    print("Proceeding with a dummy text for demonstration purposes as no PDFs were found.")
    dummy_text_page1 = """
    Policy Title: Comprehensive Health Plan 2025
    Version 1.0 - Effective Date: Jan 1, 2025

    This is the unique content for page 1.
    """
    dummy_text_page2 = """
    Common Header Text
    Section 1: Eligibility

    1.1 Age Requirements:
    Applicants must be between 18 and 65 years old. Dependents up to 26 are covered if full-time students.

    Common Footer Text - Page 2
    """
    dummy_text_page3 = """
    Common Header Text
    1.2 Geographic Coverage:
    Coverage is valid in all 50 US states. Travel abroad is limited to 90 days.

    Table 1: Deductibles
    | Plan Type | Deductible | Co-pay |
    |---|---|---|
    | Basic | $1000 | $50 |
    | Premium | $500 | $25 |

    Common Footer Text - Page 3
    """

    # Simulate multiple pages for a single dummy document to test common element identification
    simulated_pages_content = {
        "dummy_doc": [
            {"page_num": 1, "text": dummy_text_page1},
            {"page_num": 2, "text": dummy_text_page2},
            {"page_num": 3, "text": dummy_text_page3},
        ]
    }

    # Manually call the two-pass process for the dummy data
    print("\n--- Generating chunks from dummy text for demonstration ---")
    common_header_lines_dummy, common_footer_lines_dummy = identify_common_page_elements(simulated_pages_content)

    processed_chunks = []
    for doc_id, pages_data in simulated_pages_content.items():
        for page_data in pages_data:
            cleaned_page_text = remove_identified_elements(
                page_data['text'], page_data['page_num'], common_header_lines_dummy, common_footer_lines_dummy
            )
            if cleaned_page_text.strip():
                page_chunks = chunk_text_with_metadata(
                    text=cleaned_page_text,
                    chunk_size_tokens=CHUNK_SIZE_TOKENS,
                    overlap_percentage=OVERLAP_PERCENTAGE,
                    doc_id=doc_id,
                    page=page_data['page_num'],
                    base_clause_id_prefix="DummyClause"
                )
                processed_chunks.extend(page_chunks)

else:
    print(f"--- Starting PDF Processing and Chunking for {len(existing_pdf_paths)} files ---")
    processed_chunks = process_pdfs_for_chunking(existing_pdf_paths)
    print(f"\n--- Total Generated Chunks: {len(processed_chunks)} ---")

# Display a few sample chunks
if processed_chunks:
    print("\n--- Sample of Processed Chunks (first 5) ---")
    for i, chunk in enumerate(processed_chunks[:5]):
        print(f"\nChunk {i+1}:")
        print(f"  Metadata: {json.dumps(chunk['metadata'], indent=2)}")
        print(f"  Content (first 200 chars): \"{chunk['content'][:200]}...\"")
        print(f"  Content length (chars): {len(chunk['content'])}")
        print(f"  Content length (tokens): {chunk['metadata']['chunk_length_tokens']}")
else:
    print("No chunks were generated. Please ensure your PDFs are uploaded and paths are correct.")

print(f"\n--- Document Processing & Chunking Complete ---")


--- Starting PDF Processing and Chunking for 5 files ---
Identifying common elements: Total non-first pages: 217, Threshold count: 152

--- Chunking Document: BAJHLIP23020V012223 ---
  Processing Page 1 (raw length: 4505 chars, cleaned length: 4505 chars)
    Generated 3 chunks for page 1.
  Processing Page 2 (raw length: 3985 chars, cleaned length: 3815 chars)
    Generated 3 chunks for page 2.
  Processing Page 3 (raw length: 4461 chars, cleaned length: 4303 chars)
    Generated 3 chunks for page 3.
  Processing Page 4 (raw length: 3890 chars, cleaned length: 3721 chars)
    Generated 2 chunks for page 4.
  Processing Page 5 (raw length: 4371 chars, cleaned length: 4206 chars)
    Generated 3 chunks for page 5.
  Processing Page 6 (raw length: 4644 chars, cleaned length: 4484 chars)
    Generated 3 chunks for page 6.
  Processing Page 7 (raw length: 4517 chars, cleaned length: 4353 chars)
    Generated 3 chunks for page 7.
  Processing Page 8 (raw length: 3900 chars, cleaned length: 