In [106]:
# IMPORTS
import pytesseract  # For OCR using Tesseract
import time         # For measuring execution time
import pdf2image    # For converting PDF pages to images
from PIL import Image  # For image processing
import requests     # For API calls to Anthropic
import difflib      # For text comparison
import numpy as np  # For numerical operations
import cv2          # For advanced image processing
import matplotlib.pyplot as plt  # For visualizations
import io           # For handling byte streams
import base64       # For encoding/decoding binary data
from dotenv import load_dotenv  # For loading environment variables from .env file
from tqdm import tqdm  # For progress bars
import os           # For interacting with the operating system
import logging      # For logging messages
import concurrent.futures  # For parallel execution of tasks
from requests.adapters import HTTPAdapter  # For configuring HTTP requests
from requests.packages.urllib3.util.retry import Retry  # For implementing retry logic in HTTP requests
from google.cloud import vision # For the Google Cloud Vision client library
import re
import string
import pandas as pd
import Levenshtein  # For efficient edit distance calculations
import anthropic
import pickle
from dataclasses import dataclass
from typing import List, Dict, Optional
from fuzzywuzzy import fuzz
from abc import ABC, abstractmethod

In [107]:
# Constants
# Default thread-pool size for the per-page OCR helpers: one worker per CPU core
# reported by the OS, falling back to 4 when os.cpu_count() returns None.
MAX_WORKERS = os.cpu_count() or 4

In [5]:
# Types

@dataclass
class BoundingBox:
    """Axis-aligned rectangle around a piece of text recognised on one page."""
    page_number: int  # page the box belongs to
    x: float  # left edge of the box (presumably pixels of the image sent to the engine - TODO confirm)
    y: float  # top edge of the box, same units as x
    width: float  # horizontal extent of the box
    height: float  # vertical extent of the box
    text: str  # text the engine recognised inside the box
    confidence: Optional[float] = None  # engine-reported confidence; None when not available
    type: Optional[str] = None  # For Mathpix: 'line' or 'word'

@dataclass
class OCRPageResult:
    """OCR output for a single page of a document."""
    page_number: int  # page this result describes
    text: str  # full text recognised on the page ('' when the page failed)
    bounding_boxes: List[BoundingBox]  # boxes reported for this page
    processing_time: float  # seconds spent on the OCR call for this page
    avg_confidence: Optional[float] = None  # For Tesseract

@dataclass
class OCRResult:
    """OCR output for a whole document as produced by one engine."""
    ocr_text: str  # page texts joined with newlines
    page_results: List[OCRPageResult]  # one entry per processed page
    total_time: float  # wall-clock seconds for the whole run
    ocr_engine: str  # label of the engine that produced the result

@dataclass
class AnthropicPageResult:
    """Text transcribed by the Anthropic model for a single page."""
    page_number: int  # page this result describes
    text: str  # transcribed text ('' when the request failed)
    processing_time: float  # seconds spent in the API call (0.0 on failure)

@dataclass
class AnthropicResult:
    """Ground-truth transcription of a whole document obtained from Anthropic."""
    full_text: str  # page texts joined with newlines, in page order
    page_results: List[AnthropicPageResult]  # per-page transcriptions
    total_time: float  # wall-clock seconds for the whole run

In [None]:
# LOAD ENVIRONMENT VARIABLES
# Reads a local .env file (if any) into the process environment before the
# credentials below are looked up.
load_dotenv()

# ANTHROPIC
# Fail fast: every later cell that talks to Anthropic needs this key.
ANTHROPIC_API_KEY = os.getenv('ANTHROPIC_API_KEY')
if not ANTHROPIC_API_KEY:
    raise ValueError("Please set the ANTHROPIC_API_KEY environment variable.")

# NOTE(review): this writes the first and last 5 characters of the secret into
# the saved notebook output; consider printing only a "key loaded" confirmation
# or clearing the output before sharing the notebook.
print(f"Anthropic API Key: {ANTHROPIC_API_KEY[:5]}...{ANTHROPIC_API_KEY[-5:]}")  # Only print first and last 5 characters

# Module-level Anthropic client used by process_page(). It is built without
# arguments, so it presumably picks the key up from the environment - TODO confirm.
client = anthropic.Anthropic()

# MATHPIX
MATHPIX_APP_ID = os.getenv('MATHPIX_APP_ID')
MATHPIX_APP_KEY = os.getenv('MATHPIX_APP_KEY')

# Both values are sent as request headers by ocr_page_mathpix().
if not MATHPIX_APP_ID or not MATHPIX_APP_KEY:
    raise ValueError("Please set the MATHPIX_APP_ID and MATHPIX_APP_KEY environment variables.")

# GOOGLE CLOUD VISION
# Only the presence of the variable is checked here; the value itself is not
# used anywhere else in this cell.
GOOGLE_APPLICATION_CREDENTIALS = os.getenv('GOOGLE_APPLICATION_CREDENTIALS')
if not GOOGLE_APPLICATION_CREDENTIALS:
    raise ValueError("Please set the GOOGLE_APPLICATION_CREDENTIALS environment variable.")

In [109]:
# For checkpointing
def save_checkpoint(data, filename):
    """
    Pickle ``data`` to ``filename``, replacing any file already there.

    Parameters:
    - data: Any picklable object.
    - filename (str): Destination path of the checkpoint file.
    """
    with open(filename, mode='wb') as checkpoint_file:
        pickle.dump(data, checkpoint_file)

def load_checkpoint(filename):
    """
    Load a pickled checkpoint from disk.

    Parameters:
    - filename (str): Path of the checkpoint file.

    Returns:
    - The unpickled object, or None when ``filename`` does not exist.
    """
    if not os.path.exists(filename):
        return None
    # pickle executes arbitrary code while loading: only open checkpoints that
    # this notebook wrote itself, never files from an untrusted source.
    with open(filename, mode='rb') as checkpoint_file:
        restored = pickle.load(checkpoint_file)
    return restored

In [110]:
# Configure logging
# INFO and above go to the root handler with a timestamp and level prefix.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Setup a session with retry strategy
# Up to 5 retries with a backoff factor of 1 for POST requests that come back
# with a rate-limit (429) or server-side (500/502/503/504) status.
# NOTE(review): the `allowed_methods` keyword presumably needs a recent urllib3
# (older releases called it `method_whitelist`) - TODO confirm pinned version.
session = requests.Session()
retry = Retry(
    total=5,
    backoff_factor=1,
    status_forcelist=[429, 500, 502, 503, 504],
    allowed_methods=["POST"]
)
adapter = HTTPAdapter(max_retries=retry)
# The same adapter serves both schemes, so every request made through
# `session` gets the retry policy.
session.mount("https://", adapter)
session.mount("http://", adapter)

In [111]:
def image_to_base64(image, max_dim=1568):
    """
    Converts a PIL Image to a base64-encoded PNG string, downscaling it first if necessary.

    The image is shrunk (aspect ratio preserved) only when its longer side
    exceeds ``max_dim``; smaller images are encoded unchanged.

    Parameters:
    - image (PIL.Image): The image to convert.
    - max_dim (int): Maximum dimension (width or height) in pixels.

    Returns:
    - str: Base64-encoded PNG data, or None if resizing/encoding failed
      (the error is logged).
    """
    try:
        width, height = image.size
        longest_side = max(width, height)
        if longest_side > max_dim:
            scale = max_dim / longest_side
            # int() truncates, so the short side of a very elongated image could
            # become 0 px, which resize() rejects; keep at least one pixel.
            new_size = (max(1, int(width * scale)), max(1, int(height * scale)))
            image = image.resize(new_size, Image.LANCZOS)

        buffered = io.BytesIO()
        image.save(buffered, format="PNG")  # Use PNG for lossless quality
        return base64.b64encode(buffered.getvalue()).decode('utf-8')
    except Exception as e:
        logger.error(f"Error in image_to_base64: {e}", exc_info=True)
        return None

In [112]:
def preprocess_image(image, max_skew_deg=15.0):
    """
    Preprocess the image to enhance OCR accuracy.

    Steps: grayscale -> non-local-means denoising -> adaptive binarization ->
    deskew. The deskewed array is what gets returned (previously the rotation
    was computed and then thrown away).

    Parameters:
    - image (PIL.Image): The image to preprocess.
    - max_skew_deg (float): Largest skew, in degrees, that will be corrected.
      Larger estimates are treated as unreliable (e.g. sparse pages where the
      ink does not form a text block) and the page is left unrotated.

    Returns:
    - PIL.Image: The preprocessed image, or the untouched input image if any
      step fails (the error is logged).
    """
    try:
        # Keep `image` bound to the caller's object so the failure path really
        # returns the original image.
        gray = image.convert('L')

        # Convert to NumPy array for OpenCV processing
        image_np = np.array(gray)

        # Apply denoising
        image_np = cv2.fastNlMeansDenoising(image_np, h=30)

        # Apply adaptive thresholding (ink becomes 0, paper becomes 255)
        image_np = cv2.adaptiveThreshold(image_np, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                                         cv2.THRESH_BINARY, 11, 2)

        # Detect and correct skew
        angle = _estimate_skew_angle(image_np)
        if angle and abs(angle) <= max_skew_deg:
            (h, w) = image_np.shape
            center = (w // 2, h // 2)
            # Positive angles rotate counter-clockwise, which undoes a
            # clockwise skew of the same size.
            M = cv2.getRotationMatrix2D(center, angle, 1.0)
            image_np = cv2.warpAffine(image_np, M, (w, h), flags=cv2.INTER_CUBIC,
                                      borderMode=cv2.BORDER_REPLICATE)

        # Convert back to PIL Image
        return Image.fromarray(image_np)
    except Exception as e:
        logger.error(f"Error in preprocess_image: {e}", exc_info=True)
        return image  # Return original image if preprocessing fails


def _estimate_skew_angle(binary):
    """
    Estimate the rotation (degrees, counter-clockwise positive) that levels the text.

    Fits a minimum-area rectangle around the ink pixels of a binarized page and
    folds its angle into (-45, 45], which makes the result independent of which
    rectangle side OpenCV reports as the width. Returns 0.0 for a page without ink.
    """
    # After THRESH_BINARY the ink is 0; the background (255) would only
    # describe the page rectangle itself.
    rows, cols = np.where(binary == 0)
    if rows.size == 0:
        return 0.0
    # minAreaRect expects (x, y) points, i.e. (column, row).
    points = np.column_stack((cols, rows)).astype(np.float32)
    angle = cv2.minAreaRect(points)[-1] % 90
    if angle > 45:
        angle -= 90
    return float(angle)

In [113]:
def convert_pdf_to_images(pdf_path, dpi=300):
    """
    Rasterizes every page of a PDF into a PIL image.

    Parameters:
    - pdf_path (str): Path to the PDF file.
    - dpi (int): Rendering resolution passed to pdf2image.

    Returns:
    - list of PIL.Image: One image per page, or an empty list when the file is
      missing or the conversion fails (the error is logged).
    """
    try:
        pages = pdf2image.convert_from_path(pdf_path, dpi=dpi)
        page_count = len(pages)
        logger.info(f"Converted PDF to {page_count} images.")
        return pages
    except FileNotFoundError:
        logger.error(f"PDF file not found: {pdf_path}")
    except Exception as e:
        logger.error(f"Failed to convert PDF to images: {e}", exc_info=True)
    # Both failure modes share the same empty result.
    return []

In [7]:
# Base OCR Class

class BaseOCR(ABC):
    """
    Template for OCR engines: subclasses implement ``process_page`` and inherit
    the parallel, per-page driver ``perform_ocr``.
    """

    def __init__(self, max_workers=MAX_WORKERS):
        # Size of the thread pool used by perform_ocr().
        self.max_workers = max_workers

    @abstractmethod
    def process_page(self, page_number, image):
        """
        OCR a single page.

        Parameters:
        - page_number (int): Zero-based index of the page within the PDF.
        - image (PIL.Image): Rendered page image.

        Returns:
        - OCRPageResult: Result for this page.
        """
        pass

    def perform_ocr(self, pdf_path):
        """
        OCR every page of ``pdf_path`` in a thread pool.

        Pages complete in arbitrary order, so each result is stored in the slot
        of the page it belongs to; the returned text and page list are always
        in document order.

        Parameters:
        - pdf_path (str): Path to the PDF file.

        Returns:
        - OCRResult: Concatenated text, per-page results and total wall-clock
          time. A page whose processing raised is kept as an empty page result.
        """
        engine_name = self.__class__.__name__
        images = convert_pdf_to_images(pdf_path)
        num_pages = len(images)
        logging.info(f"Total pages to process: {num_pages}")

        if num_pages == 0:
            logging.warning("No images extracted from PDF.")
            return OCRResult(ocr_text='', page_results=[], total_time=0.0, ocr_engine=engine_name)

        total_start_time = time.time()

        # One slot per page so completion order cannot scramble the document.
        page_results = [None] * num_pages
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = {executor.submit(self.process_page, i, img): i for i, img in enumerate(images)}

            for future in tqdm(concurrent.futures.as_completed(futures), total=num_pages, desc=f"Performing {engine_name} OCR"):
                page_number = futures[future]
                try:
                    page_results[page_number] = future.result()
                except Exception as e:
                    logging.error(f"Error processing page {page_number + 1}: {e}", exc_info=True)
                    page_results[page_number] = OCRPageResult(page_number=page_number+1, text='', bounding_boxes=[], processing_time=0.0)

        total_time = time.time() - total_start_time
        logging.info(f"Total {engine_name} OCR time: {total_time:.2f} seconds.")

        ocr_text = '\n'.join(result.text for result in page_results)
        return OCRResult(ocr_text=ocr_text, page_results=page_results, total_time=total_time, ocr_engine=engine_name)

In [8]:
def process_page(page_number, image, model="claude-3-5-sonnet-20240620", max_tokens=4096):
    """
    Processes a single PDF page: preprocesses image, sends API request, and retrieves text and processing time.

    Parameters:
    - page_number (int): The page number.
    - image (PIL.Image): The image of the page.
    - model (str): Anthropic model used for the transcription.
    - max_tokens (int): Output token budget for the transcription. The default
      leaves room for dense pages; a reply that still hits the limit is logged
      as truncated because it would distort the ground truth.

    Returns:
    - AnthropicPageResult: Extracted text and processing time for the page.
      On any failure the text is '' and the processing time 0.0.
    """
    try:
        # Preprocess image
        preprocessed_image = preprocess_image(image)

        # Convert image to base64
        image_base64 = image_to_base64(preprocessed_image)
        if image_base64 is None:
            # image_to_base64 reports failure with None; sending that would
            # only produce an API error.
            logging.error(f"Error processing page {page_number}: image could not be encoded.")
            return AnthropicPageResult(page_number=page_number, text='', processing_time=0.0)

        # Prepare the message content
        message_content = [
            {
                "type": "image",
                "source": {
                    "type": "base64",
                    "media_type": "image/png",
                    "data": image_base64,
                },
            },
            {
                "type": "text",
                "text": "Output all the text in this page."
            }
        ]

        # Make the API call using the Anthropic client
        start_time = time.time()
        message = client.messages.create(
            model=model,
            max_tokens=max_tokens,
            messages=[
                {
                    "role": "user",
                    "content": message_content,
                }
            ],
        )
        elapsed_time = time.time() - start_time

        # Extract the text from every text block of the response
        text = ''.join(
            block.text
            for block in (message.content or [])
            if getattr(block, 'type', None) == 'text'
        )

        if getattr(message, 'stop_reason', None) == 'max_tokens':
            logging.warning(
                f"Page {page_number} transcription hit the {max_tokens}-token limit and is truncated."
            )

        logging.info(f"Page {page_number} processed in {elapsed_time:.2f} seconds.")
        return AnthropicPageResult(page_number=page_number, text=text, processing_time=elapsed_time)
    except Exception as e:
        logging.error(f"Error processing page {page_number}: {e}", exc_info=True)
        return AnthropicPageResult(page_number=page_number, text='', processing_time=0.0)

    
def get_anthropic_ground_truth(pdf_path, max_workers=5):
    """
    Builds the ground-truth transcription of a PDF with the Anthropic vision model.

    Every page is rendered to an image and handed to ``process_page`` on a
    thread pool; the per-page results are then put back into page order.

    Parameters:
    - pdf_path (str): Path to the PDF file.
    - max_workers (int): Number of worker threads issuing requests in parallel.

    Returns:
    - AnthropicResult: Full text (pages joined with newlines), per-page results
      sorted by page number, and total wall-clock time. Empty when the PDF
      yields no images.
    """
    page_images = convert_pdf_to_images(pdf_path)
    if not page_images:
        logging.warning("No images to process.")
        return AnthropicResult(full_text='', page_results=[], total_time=0.0)

    num_pages = len(page_images)
    logging.info(f"Total pages to process with Anthropic: {num_pages}")

    started_at = time.time()

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Pages are numbered from 1 for the ground truth.
        pending = [
            pool.submit(process_page, number, page_image)
            for number, page_image in enumerate(page_images, start=1)
        ]
        progress = tqdm(concurrent.futures.as_completed(pending), total=len(pending), desc="Processing pages")
        collected = [finished.result() for finished in progress]

    # Futures complete out of order; restore document order before joining.
    page_results = sorted(collected, key=lambda result: result.page_number)
    full_text = '\n'.join(result.text for result in page_results)

    total_time = time.time() - started_at
    logging.info(f"Total time to obtain ground truth: {total_time:.2f} seconds.")

    return AnthropicResult(full_text=full_text, page_results=page_results, total_time=total_time)


In [116]:
# Define a helper function for OCR processing
def ocr_page(page_number, image):
    """
    Performs Tesseract OCR on a single image and measures processing time.

    Parameters:
    - page_number (int): Zero-based page index (logged as page_number + 1).
    - image (PIL.Image): The image to perform OCR on.

    Returns:
    - (text, data, processing_time, avg_confidence): OCR text, the raw
      ``image_to_data`` dictionary, seconds spent in Tesseract, and the mean
      confidence over all cells that carry one (None when there is none).
      On failure returns ('', {}, 0.0, None).
    """
    try:
        # Preprocess image using existing function
        preprocessed_image = preprocess_image(image)

        # Start timing
        start_time = time.time()

        # Perform OCR to extract text with configuration
        custom_config = r'--oem 3 --psm 6'  # Example configuration
        text = pytesseract.image_to_string(preprocessed_image, config=custom_config, lang='eng')

        # Perform OCR to extract data with bounding boxes and confidence levels
        data = pytesseract.image_to_data(preprocessed_image, output_type=pytesseract.Output.DICT, config=custom_config, lang='eng')

        # End timing
        processing_time = time.time() - start_time

        logging.info(f"Page {page_number + 1} processed in {processing_time:.2f} seconds.")

        # Confidence cells may arrive as ints, floats or strings depending on
        # the installed versions; parse them all and drop the "no confidence"
        # marker (-1) as well as anything unparsable.
        valid_confidences = []
        for raw_conf in data['conf']:
            try:
                value = float(raw_conf)
            except (TypeError, ValueError):
                continue
            if value >= 0:
                valid_confidences.append(value)
        avg_confidence = sum(valid_confidences) / len(valid_confidences) if valid_confidences else None

        if avg_confidence is not None:
            logging.info(f"Page {page_number + 1} average confidence: {avg_confidence:.2f}")
        else:
            logging.info(f"Page {page_number + 1} average confidence: Not available")

        return text, data, processing_time, avg_confidence
    except Exception as e:
        logging.error(f"Error processing page {page_number + 1}: {e}", exc_info=True)
        return '', {}, 0.0, None

def perform_tesseract_ocr(pdf_path, max_workers=MAX_WORKERS):
    """
    Performs OCR on a PDF document using Tesseract and evaluates processing speed.

    Pages are processed on a thread pool and finish in arbitrary order, so each
    raw result is stored in the slot of the page it came from; text, page
    numbers and bounding boxes therefore always follow document order.

    Parameters:
    - pdf_path (str): Path to the PDF file.
    - max_workers (int or None): Number of worker threads for parallel OCR.

    Returns:
    - OCRResult: Concatenated text, one OCRPageResult per page (word bounding
      boxes, processing time, average confidence) and the total wall-clock
      time. A PDF that yields no pages gives an empty OCRResult; a page whose
      OCR failed is kept as an empty page.
    """
    # Convert PDF to images using existing function
    images = convert_pdf_to_images(pdf_path)
    num_pages = len(images)
    logging.info(f"Total pages to process: {num_pages}")

    if num_pages == 0:
        logging.warning("No images extracted from PDF.")
        return OCRResult(ocr_text='', page_results=[], total_time=0.0, ocr_engine="Tesseract")

    # Start total OCR timing
    total_start_time = time.time()

    # One slot per page, pre-filled with the value ocr_page() returns on failure.
    raw_results = [('', {}, 0.0, None) for _ in range(num_pages)]

    # Use ThreadPoolExecutor for parallel processing
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(ocr_page, i, img): i for i, img in enumerate(images)}

        for future in tqdm(concurrent.futures.as_completed(futures), total=num_pages, desc="Performing Tesseract OCR"):
            page_index = futures[future]
            try:
                raw_results[page_index] = future.result()
            except Exception as e:
                logger.error(f"Error retrieving result for page {page_index + 1}: {e}", exc_info=True)

    # End total OCR timing
    total_time = time.time() - total_start_time
    logging.info(f"Total OCR time: {total_time:.2f} seconds.")

    # Concatenate all pages' text
    ocr_text = '\n'.join(text for text, _, _, _ in raw_results)

    page_results = []
    for i, (text, data, proc_time, avg_confidence) in enumerate(raw_results):
        # A failed page carries an empty dict, so look the columns up defensively.
        words = data.get('text', [])
        bounding_boxes = [
            BoundingBox(
                page_number=i+1,
                x=data['left'][j],
                y=data['top'][j],
                width=data['width'][j],
                height=data['height'][j],
                text=word,
                confidence=_tesseract_confidence(data['conf'][j])
            )
            for j, word in enumerate(words)
            if word.strip()
        ]
        page_results.append(OCRPageResult(
            page_number=i+1,
            text=text,
            bounding_boxes=bounding_boxes,
            processing_time=proc_time,
            avg_confidence=avg_confidence
        ))

    return OCRResult(
        ocr_text=ocr_text,
        page_results=page_results,
        total_time=total_time,
        ocr_engine="Tesseract"
    )


def _tesseract_confidence(raw):
    """Convert one Tesseract 'conf' cell to a float; None when it is missing, unparsable or negative."""
    try:
        value = float(raw)
    except (TypeError, ValueError):
        return None
    return value if value >= 0 else None

In [117]:
# Define a helper function for OCR processing
def ocr_page_mathpix(page_number, image, timeout=60):
    """
    Performs OCR on a single image using MathPix and extracts bounding boxes with confidence levels.

    The request goes through the module-level ``session`` so that the retry
    policy configured there (429/5xx with backoff) applies, and it carries a
    timeout so a stalled connection cannot block a worker thread forever.

    Parameters:
    - page_number (int): Zero-based page index (boxes and logs use page_number + 1).
    - image (PIL.Image): The image to perform OCR on.
    - timeout (float): Seconds to wait for the MathPix API before giving up.

    Returns:
    - (text, bounding_boxes, processing_time): OCR text, a list of bounding-box
      dictionaries (line boxes first, then word boxes) and the seconds spent in
      the request, retries included. On failure returns ('', [], 0.0).
    """
    try:
        # Preprocess image using existing function
        preprocessed_image = preprocess_image(image)

        # Convert image to base64
        image_base64 = image_to_base64(preprocessed_image)
        if image_base64 is None:
            # image_to_base64 reports failure with None; do not send "base64,None".
            logger.error(f"MathPix - Could not encode page {page_number + 1}; request skipped.")
            return '', [], 0.0

        # Prepare the JSON payload
        payload = {
            "src": f"data:image/png;base64,{image_base64}",
            "formats": ["text", "data"],
            "data_options": {
                "include_line_data": True,
                "include_word_data": True
            },
            "rm_spaces": True
        }

        headers = {
            "app_id": MATHPIX_APP_ID,
            "app_key": MATHPIX_APP_KEY,
            "Content-type": "application/json"
        }

        api_endpoint = "https://api.mathpix.com/v3/text"

        # Start timing
        start_time = time.time()

        # Send OCR request to MathPix
        response = session.post(api_endpoint, json=payload, headers=headers, timeout=timeout)
        elapsed_time = time.time() - start_time

        response.raise_for_status()

        response_json = response.json()
        text = response_json.get('text', '')

        # Line boxes first, then word boxes (only entries whose type is 'text').
        text_words = [
            word for word in response_json.get('word_data', [])
            if word.get('type') == 'text'
        ]
        bounding_boxes = (
            _mathpix_boxes(response_json.get('line_data', []), "line", page_number + 1)
            + _mathpix_boxes(text_words, "word", page_number + 1)
        )

        logging.info(f"MathPix - Page {page_number + 1} processed in {elapsed_time:.2f} seconds.")
        return text, bounding_boxes, elapsed_time
    except requests.exceptions.RequestException as e:
        logger.error(f"MathPix API request failed for page {page_number + 1}: {e}", exc_info=True)
        return '', [], 0.0
    except Exception as e:
        logger.error(f"MathPix - Exception on page {page_number + 1}: {e}", exc_info=True)
        return '', [], 0.0


def _mathpix_boxes(items, box_type, page_label):
    """Turn MathPix line/word entries into bounding-box dicts, skipping excluded or contour-less entries."""
    boxes = []
    for item in items:
        if not item.get('included', False):
            continue
        cnt = item.get('cnt', [])
        if not cnt:
            continue
        # The box is the axis-aligned extent of the contour points.
        xs = [point[0] for point in cnt]
        ys = [point[1] for point in cnt]
        boxes.append({
            "page_number": page_label,
            "type": box_type,
            "x": min(xs),
            "y": min(ys),
            "width": max(xs) - min(xs),
            "height": max(ys) - min(ys),
            "text": item.get('text', ''),
            "confidence": item.get('confidence', None),
            "confidence_rate": item.get('confidence_rate', None)
        })
    return boxes
    
def perform_mathpix_ocr(pdf_path, max_workers=MAX_WORKERS):
    """
    Performs OCR on a PDF document using MathPix and evaluates processing speed.

    Pages are processed on a thread pool and finish in arbitrary order, so each
    raw result is stored in the slot of the page it came from. Bounding boxes
    are kept per page (they were previously flattened into one list and then
    iterated as if they were per-page lists, which raised a TypeError).

    Parameters:
    - pdf_path (str): Path to the PDF file.
    - max_workers (int): Number of worker threads for parallel OCR.

    Returns:
    - OCRResult: Concatenated text, one OCRPageResult per page (line and word
      bounding boxes, processing time) and the total wall-clock time. A PDF
      that yields no pages gives an empty OCRResult; a page whose OCR failed is
      kept as an empty page.
    """
    images = convert_pdf_to_images(pdf_path)
    num_pages = len(images)
    logging.info(f"Total pages to process with MathPix: {num_pages}")

    if num_pages == 0:
        logging.warning("No images extracted from PDF.")
        return OCRResult(ocr_text='', page_results=[], total_time=0.0, ocr_engine="Mathpix")

    # Start total OCR timing
    total_start_time = time.time()

    # One slot per page, pre-filled with the value ocr_page_mathpix() returns on failure.
    raw_results = [('', [], 0.0) for _ in range(num_pages)]

    # Use ThreadPoolExecutor for parallel processing
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all pages to the executor
        futures = {executor.submit(ocr_page_mathpix, i, img): i for i, img in enumerate(images)}

        # Iterate over completed futures with a progress bar
        for future in tqdm(concurrent.futures.as_completed(futures), total=num_pages, desc="Performing MathPix OCR"):
            page_index = futures[future]
            try:
                raw_results[page_index] = future.result()
            except Exception as e:
                logger.error(f"Error retrieving MathPix result for page {page_index + 1}: {e}", exc_info=True)

    # End total OCR timing
    total_time = time.time() - total_start_time
    logging.info(f"Total MathPix OCR time: {total_time:.2f} seconds.")

    # Concatenate all pages' text
    ocr_text = '\n'.join(text for text, _, _ in raw_results)

    page_results = []
    for i, (text, bounding_boxes, proc_time) in enumerate(raw_results):
        page_bounding_boxes = [
            BoundingBox(
                page_number=bbox['page_number'],
                x=bbox['x'],
                y=bbox['y'],
                width=bbox['width'],
                height=bbox['height'],
                text=bbox['text'],
                confidence=bbox['confidence'],
                type=bbox['type']
            )
            for bbox in bounding_boxes
        ]
        page_results.append(OCRPageResult(
            page_number=i+1,
            text=text,
            bounding_boxes=page_bounding_boxes,
            processing_time=proc_time
        ))

    return OCRResult(
        ocr_text=ocr_text,
        page_results=page_results,
        total_time=total_time,
        ocr_engine="Mathpix"
    )

In [118]:
def ocr_page_google(page_number, image):
    """
    Runs Google Cloud Vision text detection on one page image.

    Parameters:
    - page_number (int): Zero-based page index (logged as page_number + 1).
    - image (PIL.Image): The page image to OCR.

    Returns:
    - tuple:
        - text (str): Full text of the page ('' when nothing was detected).
        - bounding_boxes (list): One dict per detected element with 'text',
          'bounding_box' (list of (x, y) vertices) and 'confidence'.
        - elapsed_time (float): Seconds spent in the API call; 0.0 when an
          exception was raised.
    """
    try:
        # A client per call keeps each worker thread independent.
        vision_client = vision.ImageAnnotatorClient()

        # Same preprocessing as the other engines, then PNG-encode in memory.
        prepared_image = preprocess_image(image)
        png_buffer = io.BytesIO()
        prepared_image.save(png_buffer, format='PNG')
        vision_image = vision.Image(content=png_buffer.getvalue())

        # Only the API round trip is timed.
        started_at = time.time()
        response = vision_client.text_detection(image=vision_image)
        elapsed_time = time.time() - started_at

        if response.error.message:
            logging.error(f"Google Vision - Error on page {page_number + 1}: {response.error.message}")
            return '', [], elapsed_time

        annotations = response.text_annotations
        full_text = ''
        bounding_boxes = []
        if annotations:
            # The first annotation holds the whole page; the rest are the
            # individual detected elements.
            full_text = annotations[0].description
            bounding_boxes = [
                {
                    'text': annotation.description,
                    'bounding_box': [(vertex.x, vertex.y) for vertex in annotation.bounding_poly.vertices],
                    'confidence': annotation.confidence if hasattr(annotation, 'confidence') else None
                }
                for annotation in annotations[1:]
            ]

        logging.info(f"Google Vision - Page {page_number + 1} processed in {elapsed_time:.2f} seconds.")
        return full_text, bounding_boxes, elapsed_time
    except Exception as e:
        logging.error(f"Google Vision - Exception on page {page_number + 1}: {e}")
        return '', [], 0.0

# Google Cloud Vision OCR Implementation
def perform_google_cloud_vision_ocr(pdf_path, max_workers=MAX_WORKERS):
    """
    Performs OCR on a PDF document using Google Cloud Vision and evaluates processing speed.

    Pages are processed on a thread pool and finish in arbitrary order, so each
    raw result is stored in the slot of the page it came from; text, page
    numbers and bounding boxes therefore always follow document order.

    Parameters:
    - pdf_path (str): Path to the PDF file.
    - max_workers (int or None): Number of worker threads for parallel OCR.

    Returns:
    - OCRResult: Concatenated text, one OCRPageResult per page (bounding boxes,
      processing time) and the total wall-clock time. A PDF that yields no
      pages gives an empty OCRResult; a page whose OCR failed is kept as an
      empty page.
    """
    images = convert_pdf_to_images(pdf_path)
    num_pages = len(images)
    logging.info(f"Total pages to process with Google Cloud Vision: {num_pages}")

    if num_pages == 0:
        logging.warning("No images extracted from PDF.")
        return OCRResult(ocr_text='', page_results=[], total_time=0.0, ocr_engine="Google Cloud Vision")

    # Start total OCR timing
    total_start_time = time.time()

    # One slot per page, pre-filled with the value ocr_page_google() returns on failure.
    raw_results = [('', [], 0.0) for _ in range(num_pages)]

    # Use ThreadPoolExecutor for parallel processing
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all pages to the executor
        futures = {executor.submit(ocr_page_google, i, img): i for i, img in enumerate(images)}

        # Iterate over completed futures with a progress bar
        for future in tqdm(concurrent.futures.as_completed(futures), total=num_pages, desc="Performing Google Cloud Vision OCR"):
            page_index = futures[future]
            try:
                raw_results[page_index] = future.result()
            except Exception as e:
                logging.error(f"Error retrieving Google Cloud Vision result for page {page_index + 1}: {e}", exc_info=True)

    # End total OCR timing
    total_time = time.time() - total_start_time
    logging.info(f"Total Google Cloud Vision OCR time: {total_time:.2f} seconds.")

    # Concatenate all pages' text
    ocr_text = '\n'.join(text for text, _, _ in raw_results)

    page_results = []
    for i, (text, bounding_boxes, proc_time) in enumerate(raw_results):
        page_bounding_boxes = []
        for bbox in bounding_boxes:
            xs = [vertex[0] for vertex in bbox['bounding_box']]
            ys = [vertex[1] for vertex in bbox['bounding_box']]
            if not xs:
                # A polygon without vertices has no extent to report.
                continue
            # Axis-aligned extent of the detected polygon.
            page_bounding_boxes.append(BoundingBox(
                page_number=i+1,
                x=min(xs),
                y=min(ys),
                width=max(xs) - min(xs),
                height=max(ys) - min(ys),
                text=bbox['text'],
                confidence=bbox['confidence']
            ))
        page_results.append(OCRPageResult(
            page_number=i+1,
            text=text,
            bounding_boxes=page_bounding_boxes,
            processing_time=proc_time
        ))

    return OCRResult(
        ocr_text=ocr_text,
        page_results=page_results,
        total_time=total_time,
        ocr_engine="Google Cloud Vision"
    )

In [119]:
# EVALUATION

In [None]:
def normalize_text(text: str) -> str:
    """Lowercase ``text`` and collapse every run of whitespace into a single space."""
    lowered = text.lower()
    tokens = lowered.split()  # split() without arguments also trims both ends
    return ' '.join(tokens)

def character_error_rate(reference: str, hypothesis: str) -> float:
    """
    Calculate the Character Error Rate (CER).

    CER is the character-level edit distance divided by the reference length.
    With an empty reference the ratio is undefined; by convention this returns
    0.0 when the hypothesis is empty too and 1.0 otherwise, instead of raising
    ZeroDivisionError (an empty ground truth happens when transcription failed).
    """
    if not reference:
        return 0.0 if not hypothesis else 1.0
    return Levenshtein.distance(reference, hypothesis) / len(reference)

def word_error_rate(reference: str, hypothesis: str) -> float:
    """
    Calculate the Word Error Rate (WER).

    WER is the word-level edit distance divided by the number of reference
    words. With no reference words the ratio is undefined; by convention this
    returns 0.0 when the hypothesis has no words either and 1.0 otherwise,
    instead of raising ZeroDivisionError.
    """
    ref_words = reference.split()
    hyp_words = hypothesis.split()
    if not ref_words:
        return 0.0 if not hyp_words else 1.0
    # NOTE(review): passing word lists relies on a Levenshtein release that
    # accepts arbitrary sequences, not only strings - TODO confirm pinned version.
    return Levenshtein.distance(ref_words, hyp_words) / len(ref_words)

def fuzzy_string_match(reference: str, hypothesis: str) -> float:
    """Score the similarity of two strings with fuzzywuzzy's ``fuzz.ratio``.

    The value returned by ``fuzz.ratio`` is divided by 100.0 before being
    handed back to the caller.
    """
    percent_score = fuzz.ratio(reference, hypothesis)
    return percent_score / 100.0

def confidence_weighted_accuracy(reference: str, hypothesis: str, confidences: List[float]) -> float:
    """Calculate the Confidence Weighted Accuracy (CWA).

    Reference and hypothesis are split on whitespace and compared word by
    word, position by position. The confidences of the hypothesis words that
    equal the reference word at the same position are summed and divided by
    the sum of all confidences.

    Args:
        reference: Ground-truth text.
        hypothesis: OCR output being scored.
        confidences: One confidence score per whitespace-separated word of
            ``hypothesis``.

    Returns:
        The weighted share of matching words, or 0.0 when the confidences do
        not sum to a positive number.

    Raises:
        ValueError: If ``confidences`` and the hypothesis words differ in count.
    """
    expected_tokens = reference.split()
    observed_tokens = hypothesis.split()

    if len(observed_tokens) != len(confidences):
        raise ValueError("Number of words in hypothesis doesn't match number of confidence scores")

    # zip stops at the shortest sequence, so hypothesis words beyond the end
    # of the reference never count as matches.
    matched_weights = [
        weight
        for expected, observed, weight in zip(expected_tokens, observed_tokens, confidences)
        if expected == observed
    ]
    weight_of_matches = sum(matched_weights)
    weight_of_all = sum(confidences)

    if weight_of_all > 0:
        return weight_of_matches / weight_of_all
    return 0.0

def evaluate_ocr(ground_truth: AnthropicResult, ocr_result: OCRResult) -> Dict[str, float]:
    """Evaluate OCR results against ground truth.

    Args:
        ground_truth: Reference transcription; its ``full_text`` is compared
            against the OCR output.
        ocr_result: Output of one OCR engine; ``ocr_text``, ``total_time`` and
            ``page_results`` are read.

    Returns:
        A dict with the keys ``'CER'``, ``'WER'``, ``'Fuzzy_Match'``,
        ``'Total_Time'`` and ``'Avg_Time_Per_Page'``. ``'CWA'`` is added only
        when confidence scores are available and line up one-to-one with the
        words of the normalized OCR text.
    """
    gt_text = normalize_text(ground_truth.full_text)
    ocr_text = normalize_text(ocr_result.ocr_text)
    page_results = ocr_result.page_results

    page_times = [page.processing_time for page in page_results]

    evaluation = {
        'CER': character_error_rate(gt_text, ocr_text),
        'WER': word_error_rate(gt_text, ocr_text),
        'Fuzzy_Match': fuzzy_string_match(gt_text, ocr_text),
        'Total_Time': ocr_result.total_time,
        # np.mean([]) emits a RuntimeWarning; report NaN explicitly for a result without pages.
        'Avg_Time_Per_Page': np.mean(page_times) if page_times else float('nan'),
    }

    # Calculate CWA if confidence scores are available. The comprehension is
    # safe on an empty page list (the previous page_results[0] lookup was not).
    confidences = [
        page.avg_confidence
        for page in page_results
        if getattr(page, 'avg_confidence', None) is not None
    ]
    if confidences:
        try:
            evaluation['CWA'] = confidence_weighted_accuracy(gt_text, ocr_text, confidences)
        except ValueError as exc:
            # confidence_weighted_accuracy needs one score per hypothesis word,
            # while these are per-page averages. Skip the optional metric
            # instead of aborting the whole evaluation.
            logging.warning("Skipping CWA for %s: %s", ocr_result.ocr_engine, exc)

    return evaluation

def evaluate_all_ocrs(ground_truth: AnthropicResult, ocr_results: Dict[str, OCRResult]) -> pd.DataFrame:
    """Run ``evaluate_ocr`` for every engine and tabulate the metrics.

    The returned DataFrame has one row per key of ``ocr_results`` and one
    column per metric produced by ``evaluate_ocr``.
    """
    metrics_by_engine = {
        engine_name: evaluate_ocr(ground_truth, engine_result)
        for engine_name, engine_result in ocr_results.items()
    }
    # The dict-of-dicts constructor puts engines in columns; flip to engines-as-rows.
    return pd.DataFrame(metrics_by_engine).T

# Usage example
if __name__ == "__main__":
    def _load_or_compute(checkpoint_path, compute):
        """Return the object cached at ``checkpoint_path``, computing and saving it on a miss.

        ``compute`` is a zero-argument callable. It is invoked only when
        ``load_checkpoint`` returns ``None`` for ``checkpoint_path``; its
        result is then written with ``save_checkpoint`` before being returned.
        """
        cached = load_checkpoint(checkpoint_path)
        if cached is None:
            cached = compute()
            save_checkpoint(cached, checkpoint_path)
        return cached

    try:
        # Path to your PDF file
        pdf_path = "bagel_jays.pdf"

        # Every stage below goes through a checkpoint file so a rerun can
        # reuse the result of an earlier run instead of recomputing it.

        # Ground Truth Text
        ground_truth_checkpoint = 'ground_truth_checkpoint.pkl'
        ground_truth = _load_or_compute(
            ground_truth_checkpoint, lambda: get_anthropic_ground_truth(pdf_path)
        )

        # Tesseract OCR
        tesseract_checkpoint = 'tesseract_checkpoint.pkl'
        tesseract_result = _load_or_compute(
            tesseract_checkpoint, lambda: perform_tesseract_ocr(pdf_path)
        )

        # MathPix OCR
        mathpix_checkpoint = 'mathpix_checkpoint.pkl'
        mathpix_result = _load_or_compute(
            mathpix_checkpoint, lambda: perform_mathpix_ocr(pdf_path)
        )

        # Google Cloud Vision OCR
        gcv_checkpoint = 'gcv_checkpoint.pkl'
        gcv_result = _load_or_compute(
            gcv_checkpoint, lambda: perform_google_cloud_vision_ocr(pdf_path)
        )

        ocr_results = {
            'Tesseract': tesseract_result,
            'Mathpix': mathpix_result,
            'Google Cloud Vision': gcv_result
        }

        # Evaluation
        evaluation_checkpoint = 'evaluation_checkpoint.pkl'
        evaluation_df = _load_or_compute(
            evaluation_checkpoint, lambda: evaluate_all_ocrs(ground_truth, ocr_results)
        )

        print("OCR Evaluation Results:")
        print(evaluation_df)

        # Visualize results: one bar chart per metric, engines on the x-axis.
        # (matplotlib is already imported as plt in the notebook's import cell.)
        metrics = ['CER', 'WER', 'Fuzzy_Match', 'Total_Time', 'Avg_Time_Per_Page']

        fig, axes = plt.subplots(len(metrics), 1, figsize=(10, 5*len(metrics)))
        for i, metric in enumerate(metrics):
            evaluation_df[metric].plot(kind='bar', ax=axes[i], title=f'{metric} Comparison')
            axes[i].set_ylabel(metric)

        plt.tight_layout()
        plt.show()

        # Detailed per-page analysis
        def per_page_analysis(ground_truth: AnthropicResult, ocr_result: OCRResult) -> pd.DataFrame:
            """Score one engine page by page against the ground truth.

            Pages are paired positionally with ``zip``, so extra pages on
            either side are ignored. Returns one row per paired page.
            """
            per_page_metrics = []
            for gt_page, ocr_page in zip(ground_truth.page_results, ocr_result.page_results):
                gt_text = normalize_text(gt_page.text)
                ocr_text = normalize_text(ocr_page.text)
                page_metrics = {
                    'Page': gt_page.page_number,
                    'CER': character_error_rate(gt_text, ocr_text),
                    'WER': word_error_rate(gt_text, ocr_text),
                    'Fuzzy_Match': fuzzy_string_match(gt_text, ocr_text),
                    'Processing_Time': ocr_page.processing_time
                }
                # The value may be None when the engine reports no confidence.
                if hasattr(ocr_page, 'avg_confidence'):
                    page_metrics['Confidence'] = ocr_page.avg_confidence
                per_page_metrics.append(page_metrics)
            return pd.DataFrame(per_page_metrics)

        # Perform per-page analysis for each OCR method
        per_page_checkpoint = 'per_page_checkpoint.pkl'
        per_page_results = _load_or_compute(
            per_page_checkpoint,
            lambda: {
                ocr_name: per_page_analysis(ground_truth, ocr_result)
                for ocr_name, ocr_result in ocr_results.items()
            },
        )

        # (column, panel label) in row-major order of the 2x2 grid
        per_page_panels = [
            ('CER', 'CER by Page'),
            ('WER', 'WER by Page'),
            ('Fuzzy_Match', 'Fuzzy Match by Page'),
            ('Processing_Time', 'Processing Time by Page'),
        ]

        for ocr_name, per_page_df in per_page_results.items():
            print(f"\nPer-page analysis for {ocr_name}:")
            print(per_page_df)

            # Visualize per-page results; the engine name goes into each title
            # so the otherwise identical figures can be told apart.
            fig, axes = plt.subplots(2, 2, figsize=(15, 10))
            for ax, (column, label) in zip(axes.flat, per_page_panels):
                per_page_df.plot(x='Page', y=column, ax=ax, title=f'{ocr_name}: {label}')
            plt.tight_layout()
            plt.show()

    except Exception as e:
        logger.error(f"An error occurred during execution: {e}", exc_info=True)