## PDF Document Partitioning

Objective:
Implement the `partition_the_pdf_document` function to analyze a PDF file and identify groups of pages that belong together as distinct documents or sections.

Code Structure:
The function `partition_the_pdf_document` is provided with a basic structure. Your task is to implement the logic within this function.

Input:
- A PDF file path (the function should be able to handle various PDF files)

Output:
- A dictionary where:
  - Keys are strings representing document names (e.g., "Document 1", "Document 2", etc.)
  - Values are lists of integers giving the 1-based page numbers that belong to each document
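
To make the output shape concrete, here is a minimal sketch that builds an example dictionary and checks that it forms a valid partition. The helper name `is_valid_partition` is hypothetical, and the task statement does not say that page numbers are 1-based or that each page belongs to exactly one document; both are assumptions made for illustration.

```python
from typing import Dict, List


def is_valid_partition(partitions: Dict[str, List[int]], num_pages: int) -> bool:
    """Return True if every page 1..num_pages appears in exactly one group.

    Hypothetical helper: assumes 1-based page numbers and non-overlapping groups.
    """
    seen = [page for pages in partitions.values() for page in pages]
    return sorted(seen) == list(range(1, num_pages + 1))


# Example of the expected output shape for a 6-page PDF
example = {
    "Document 1": [1, 2, 3],
    "Document 2": [4, 5],
    "Document 3": [6],
}

print(is_valid_partition(example, num_pages=6))                    # True
print(is_valid_partition({"Document 1": [1, 1, 2]}, num_pages=2))  # False (page 1 repeated)
```

A check like this can help when testing on several PDFs, since it catches pages that were dropped or assigned twice.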

Task:
1. Implement the `partition_the_pdf_document` function:
   - Input: A string representing the path to the input PDF file
   - Output: A dictionary of document groups as described above

2. The function should analyze the content and visual features of the PDF to determine logical groupings of pages.

Requirements:
1. The function must be generalizable to work with different PDF files, not just the example "grouped_documents.pdf".
2. Implement robust methods to detect various visual features that might distinguish different documents within the PDF, such as:
   - Colored borders
   - Watermarks
   - Colored backgrounds
   - Distinctive headers or footers
   - Changes in layout or formatting
3. Handle potential exceptions or edge cases (e.g., inconsistent formatting, mixed feature types).
4. Optimize for both accuracy and processing speed, considering that PDFs can be large and contain many pages.
5. You may use up to 40 GB of GPU VRAM if your implementation needs it.
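
As one possible way to detect a colored border (requirement 2), the sketch below looks only at the outer strip of a rendered page and reports the most common non-background color there. It is a simplified illustration, not the method used in the notebook: the function name `dominant_border_color`, the `margin` parameter, and the white default background are all assumptions, and the page is represented as a plain grid of RGB tuples so the example runs without any PDF library. In practice the grid would come from a rendered page image, and a border that is inset from the page edge would need a larger margin.

```python
from collections import Counter
from typing import List, Optional, Tuple

RGB = Tuple[int, int, int]


def dominant_border_color(
    pixels: List[List[RGB]],
    margin: int = 2,
    background: RGB = (255, 255, 255),
) -> Optional[RGB]:
    """Return the most common non-background color in the outer `margin` pixels.

    `pixels` is a row-major grid of RGB tuples (hypothetical input format).
    Returns None when the edge strip contains only the background color.
    """
    height = len(pixels)
    width = len(pixels[0]) if height else 0
    counts: Counter = Counter()
    for y in range(height):
        for x in range(width):
            on_edge = (
                y < margin or y >= height - margin
                or x < margin or x >= width - margin
            )
            if on_edge and pixels[y][x] != background:
                counts[pixels[y][x]] += 1
    return counts.most_common(1)[0][0] if counts else None


# A 6x6 white "page" with a 1-pixel red frame
red, white = (255, 0, 0), (255, 255, 255)
page = [
    [red if x in (0, 5) or y in (0, 5) else white for x in range(6)]
    for y in range(6)
]
print(dominant_border_color(page, margin=1))  # (255, 0, 0)
```

Restricting the scan to the edge strip keeps the cost proportional to the page perimeter rather than its area, which matters for requirement 4 when PDFs are large.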

Additional Considerations:
- You may use additional libraries if needed, but ensure they are imported properly.
- You can create additional helper functions as necessary to support your implementation.
- Provide clear comments in your code to explain your partitioning logic.

Testing:
- Test your implementation with various types of PDFs to ensure its robustness and generalizability.
- The main script provides a way to test your implementation on a file named "grouped_documents.pdf".

Note:
The example output in the provided code is specific to "grouped_documents.pdf". Your implementation should be able to handle different PDFs with varying numbers of documents and pages per document.

In [4]:
import fitz  # PyMuPDF
import re
import numpy as np
import csv
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score
from sklearn.feature_extraction.text import TfidfVectorizer
from typing import Dict, List, Any, Tuple
from collections import defaultdict
from PIL import Image
from io import BytesIO
import warnings
from sklearn.exceptions import ConvergenceWarning

warnings.filterwarnings("ignore", message="Could not find the number of physical cores for the following reason:")
warnings.filterwarnings("ignore", category=ConvergenceWarning)


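def get_color_feature(item: Dict[str, Any]) -> Tuple[int, int, int]:
    """
    Get the color feature from a drawing item or text span as 0-255 RGB.

    PyMuPDF reports drawing colors as tuples of floats in [0, 1] and text
    span colors as a single packed sRGB integer; both are converted here.

    Args:
        item (Dict[str, Any]): The drawing item or text span.

    Returns:
        Tuple[int, int, int]: The color as an RGB tuple (black if unknown).
    """
    color = item.get('color')
    if isinstance(color, int):
        # Text spans: packed sRGB integer 0xRRGGBB
        return ((color >> 16) & 255, (color >> 8) & 255, color & 255)
    if isinstance(color, (tuple, list)) and len(color) == 3:
        # Drawings: float components in [0, 1], scaled to 0-255
        return tuple(int(round(c * 255)) for c in color)
    return (0, 0, 0)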

def normalize_color(color: Tuple[int, int, int]) -> List[float]:
    """
    Normalize color values to the range [0, 1].

    Args:
        color (Tuple[int, int, int]): The color to normalize.

    Returns:
        List[float]: The normalized color values.
    """
    return [c / 255.0 for c in color]

def extract_text_and_color(blocks: List[Dict[str, Any]], page_height: float) -> Tuple[str, Tuple[int, int, int], str, Tuple[int, int, int]]:
    """
    Extract text and color information from header and footer.

    Args:
        blocks (List[Dict[str, Any]]): The text blocks.
        page_height (float): The height of the page.

    Returns:
        Tuple[str, Tuple[int, int, int], str, Tuple[int, int, int]]: Header text and color, footer text and color.
    """
    header_text = ''
    header_color = (255, 255, 255)
    footer_text = ''
    footer_color = (255, 255, 255)

    for block in blocks:
        if "lines" in block:
            if block["bbox"][1] < 0.1 * page_height and block["lines"]:
                header_text = block["lines"][0]["spans"][0]["text"] if block["lines"][0]["spans"] else ''
                if block["lines"][0]["spans"]:
                    header_color = get_color_feature(block["lines"][0]["spans"][0])
            if block["bbox"][3] > 0.9 * page_height and block["lines"]:
                footer_text = block["lines"][0]["spans"][0]["text"] if block["lines"][0]["spans"] else ''
                if block["lines"][0]["spans"]:
                    footer_color = get_color_feature(block["lines"][0]["spans"][0])

    return header_text, header_color, footer_text, footer_color

def get_background_color(page: fitz.Page) -> Tuple[int, int, int]:
    """
    Get the background color of a PDF page.

    Args:
        page (fitz.Page): The PDF page.

    Returns:
        Tuple[int, int, int]: The background color as an RGB tuple.
    """
    # Convert the page to an image
    pix = page.get_pixmap(alpha=False)
    img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
    
    # Convert image to numpy array
    img_array = np.array(img)
    
    # Flatten the image array
    pixels = img_array.reshape(-1, 3)
    
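    # Count exact RGB colors by packing each pixel into one 24-bit integer
    # (histogramdd bin indices only equal color values if the data spans 0..255)
    r, g, b = (pixels[:, i].astype(np.uint32) for i in range(3))
    values, counts = np.unique((r << 16) | (g << 8) | b, return_counts=True)

    # Find the most common color
    most_common = int(values[counts.argmax()])
    bg_color = ((most_common >> 16) & 255, (most_common >> 8) & 255, most_common & 255)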
    
    # If the background is very light (close to white), perform additional analysis
    if np.mean(bg_color) > 240:
        # Calculate the standard deviation of each color channel
        std_dev = np.std(pixels, axis=0)
        
        # If there's significant variation, use a more detailed analysis
        if np.max(std_dev) > 10:
            # Create a mask for non-white pixels
            non_white_mask = np.any(pixels < 240, axis=1)
            non_white_pixels = pixels[non_white_mask]
            
            if len(non_white_pixels) > 0:
                # Calculate the mean of non-white pixels
                bg_color = tuple(np.mean(non_white_pixels, axis=0).astype(int))
    
    return bg_color

def get_visual_features(page: fitz.Page) -> Dict[str, Any]:
    """
    Extract detailed visual features from a PDF page.

    Args:
        page (fitz.Page): A single page from a PDF.

    Returns:
        Dict[str, Any]: A dictionary of visual features.
    """
    features = {
        'border_color': (0, 0, 0),
        'watermark_text': '',
        'background_color': (255, 255, 255),
        'header_text': '',
        'header_color': (255, 255, 255),
        'footer_text': '',
        'footer_color': (255, 255, 255),
        'layout_complexity': 0
    }

    # Analyze for colored borders
    drawings = page.get_drawings()
    for item in drawings:
        color = item.get('color')
        if color is not None and isinstance(color, tuple):
            features['border_color'] = get_color_feature(item)
            break

    # Analyze for watermarks (simple heuristic: only matches the literal word "watermark")
    text = page.get_text("text")
    watermark_matches = re.findall(r'watermark', text, re.IGNORECASE)
    features['watermark_text'] = " ".join(watermark_matches)

    # Extract background color
    features['background_color'] = get_background_color(page)

    # Analyze headers and footers
    blocks = page.get_text("dict")["blocks"]
    page_height = page.rect.height
    header_text, header_color, footer_text, footer_color = extract_text_and_color(blocks, page_height)
    features['header_text'] = header_text
    features['header_color'] = header_color
    features['footer_text'] = footer_text
    features['footer_color'] = footer_color

    # Layout analysis (simple example based on number of text blocks)
    features['layout_complexity'] = len(blocks)

    return features

def feature_vectorize(features: Dict[str, Any], vectorizer: TfidfVectorizer) -> np.ndarray:
    """
    Convert feature dictionary to a numerical vector for clustering.

    Args:
        features (Dict[str, Any]): A dictionary of visual features.
        vectorizer (TfidfVectorizer): A TfidfVectorizer for transforming textual features.

    Returns:
        np.ndarray: A numerical vector representation of the features.
    """
    text_features = [features['header_text'], features['footer_text'], features['watermark_text']]
    text_vector = vectorizer.transform([" ".join(text_features)]).toarray().flatten()

    color_vector = np.array([
        *normalize_color(features['border_color']),
        *normalize_color(features['background_color']),
        *normalize_color(features['header_color']),
        *normalize_color(features['footer_color'])
    ], dtype=float)

    layout_vector = np.array([
        features['layout_complexity']
    ], dtype=float)

    return np.concatenate([color_vector, layout_vector, text_vector])

def determine_optimal_clusters(feature_vectors: np.ndarray) -> int:
    """
    Determine the optimal number of clusters using the silhouette score.

    Args:
        feature_vectors (np.ndarray): The feature vectors for clustering.

    Returns:
        int: The optimal number of clusters (1 if there are too few pages to compare).
    """
    # silhouette_score requires 2 <= number of clusters <= n_samples - 1
    max_clusters = min(10, len(feature_vectors) - 1)
    best_k, best_score = 1, -1.0

    for k in range(2, max_clusters + 1):
        kmeans = KMeans(n_clusters=k, random_state=0, n_init=10, max_iter=100)
        labels = kmeans.fit_predict(feature_vectors)
        if len(set(labels)) < 2:
            # Duplicate feature vectors can collapse into a single cluster
            continue
        score = silhouette_score(feature_vectors, labels)
        if score > best_score:
            best_k, best_score = k, score

    return best_k

def save_features_to_csv(features_list: List[Dict[str, Any]], filename: str) -> None:
    """
    Save the features of each page to a CSV file.

    Args:
        features_list (List[Dict[str, Any]]): A list of feature dictionaries for each page.
        filename (str): The name of the CSV file to save the features.
    """
    keys = features_list[0].keys()
    with open(filename, 'w', newline='') as output_file:
        dict_writer = csv.DictWriter(output_file, fieldnames=keys)
        dict_writer.writeheader()
        dict_writer.writerows(features_list)

def partition_the_pdf_document(input_pdf: str) -> Dict[str, List[int]]:
    """
    Partition a PDF document into distinct sections based on visual features using clustering.

    Args:
        input_pdf (str): The path to the input PDF file.

    Returns:
        Dict[str, List[int]]: A dictionary where keys are document names and values are lists of page numbers.
    """
    feature_vectors = []
    features_list = []
    pages = []
    header_texts = []
    footer_texts = []
    watermark_texts = []

    # Open the PDF in a context manager so the file handle is always released
    with fitz.open(input_pdf) as document:
        for page_num in range(len(document)):
            page = document.load_page(page_num)
            features = get_visual_features(page)
            features_list.append({'page': page_num + 1, **features})
            header_texts.append(features['header_text'])
            footer_texts.append(features['footer_text'])
            watermark_texts.append(features['watermark_text'])
            pages.append(page_num + 1)

    # Save features to CSV
    save_features_to_csv(features_list, 'page_features.csv')

    # Initialize TF-IDF Vectorizer
    vectorizer = TfidfVectorizer().fit(header_texts + footer_texts + watermark_texts)

    # Create feature vectors
    for features in features_list:
        vector = feature_vectorize(features, vectorizer)
        feature_vectors.append(vector)

    # Convert to numpy array
    feature_vectors = np.array(feature_vectors)

    # Determine the optimal number of clusters
    num_clusters = determine_optimal_clusters(feature_vectors)
    print(f"Optimal number of clusters: {num_clusters}")

    # Perform clustering
    kmeans = KMeans(n_clusters=num_clusters, random_state=0, n_init=10, max_iter=100).fit(feature_vectors)
    labels = kmeans.labels_

    # Group pages by cluster labels
    document_groups = defaultdict(list)
    for page_num, label in zip(pages, labels):
        document_groups[f"Document {label + 1}"].append(page_num)

    return dict(document_groups)

# Usage
input_pdf: str = "grouped_documents.pdf"
partitions: Dict[str, List[int]] = partition_the_pdf_document(input_pdf)
print(f"Partitions: {partitions}")


Optimal number of clusters: 6
Partitions: {'Document 6': [1], 'Document 5': [2, 3], 'Document 4': [4, 5, 6, 16, 17, 18], 'Document 3': [7, 8, 9], 'Document 1': [10, 11, 12], 'Document 2': [13, 14, 15]}
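
In the output above, one group contains pages 4-6 together with pages 16-18, and the document names do not follow page order, because K-means labels are arbitrary and ignore page position. If the documents inside a merged PDF occupy consecutive page ranges (an assumption that the task statement does not confirm), a post-processing step can split the label sequence into contiguous runs. The sketch below shows the idea; `group_contiguous_pages` is a hypothetical helper, and the `labels` list is reconstructed from the document numbers printed above.

```python
from itertools import groupby
from typing import Dict, List, Sequence


def group_contiguous_pages(labels: Sequence[int]) -> Dict[str, List[int]]:
    """Turn per-page cluster labels into groups of consecutive pages.

    labels[i] is the cluster label of page i + 1. A new document starts
    whenever the label changes, and documents are named in page order.
    """
    groups: Dict[str, List[int]] = {}
    page = 1
    for doc_index, (_, run) in enumerate(groupby(labels), start=1):
        run_length = len(list(run))
        groups[f"Document {doc_index}"] = list(range(page, page + run_length))
        page += run_length
    return groups


# Document numbers per page (pages 1..18), taken from the partition printed above
labels = [6, 5, 5, 4, 4, 4, 3, 3, 3, 1, 1, 1, 2, 2, 2, 4, 4, 4]
for name, page_numbers in group_contiguous_pages(labels).items():
    print(name, page_numbers)
```

Under this assumption, pages 16-18 become their own group instead of being merged with pages 4-6. Whether that is the correct split depends on the actual PDF, so the result should be checked against the file rather than taken as ground truth.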
