# PDF Annotation Testing Script

This script allows you to test the annotation process on a single PDF file.
It provides the core functionality of the annotation system in a format
that can be easily run or converted to a notebook.

In [1]:
import json
import os
import logging
import time
from datetime import datetime
from typing import List, Dict, Any, Tuple
import numpy as np
try:
    import matplotlib.pyplot as plt
    from PIL import Image, ImageDraw
    from tqdm import tqdm
    VISUALIZATION_AVAILABLE = True
except ImportError:
    VISUALIZATION_AVAILABLE = False
    print("Visualization libraries not available. Install with: pip install matplotlib pillow tqdm")

Variables

In [2]:
# Use configuration variables instead of argparse
# No problem:
# pdf_path = "CUAD_v1/full_contract_pdf/Part_I/Affiliate_Agreements/CreditcardscomInc_20070810_S-1_EX-10.33_362297_EX-10.33_Affiliate Agreement.pdf"
# pdf_path = "CUAD_v1/full_contract_pdf/Part_I/Affiliate_Agreements/DigitalCinemaDestinationsCorp_20111220_S-1_EX-10.10_7346719_EX-10.10_Affiliate Agreement.pdf"

# Problem annotating:
# pdf_path = "CUAD_v1/full_contract_pdf/Part_I/License_Agreements/EuromediaHoldingsCorp_20070215_10SB12G_EX-10.B(01)_525118_EX-10.B(01)_Content License Agreement.pdf"
# pdf_path = "CUAD_v1/full_contract_pdf/Part_I/License_Agreements/CytodynInc_20200109_10-Q_EX-10.5_11941634_EX-10.5_License Agreement.pdf"
# pdf_path = "CUAD_v1/full_contract_pdf/Part_I/Co_Branding/PcquoteComInc_19990721_S-1A_EX-10.11_6377149_EX-10.11_Co-Branding Agreement2.pdf"
pdf_path = "CUAD_v1/full_contract_pdf/Part_I/Co_Branding/InvendaCorp_20000828_S-1A_EX-10.2_2588206_EX-10.2_Co-Branding Agreement.pdf"


annotation_file = "CUAD_v1/CUAD_v1.json"
dpi = 400
page = None  # Specific page to process (1-indexed)
search_text = None  # Text to search for in the document

Add logging file

In [3]:
# Ensure log directory exists
log_dir = "test_annotate/logs"
os.makedirs(log_dir, exist_ok=True)
log_file = os.path.join(log_dir, f"annotation_process_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log")

# Create logger
logger = logging.getLogger('pdf_annotation')
logger.setLevel(logging.DEBUG)

# Create handlers
console_handler = logging.StreamHandler()
file_handler = logging.FileHandler(log_file, mode='w')

# Create formatter
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
console_handler.setFormatter(formatter)
file_handler.setFormatter(formatter)

# Add handlers to logger
logger.addHandler(console_handler)
logger.addHandler(file_handler)

# # Test messages
# logger.debug("Debug message test")
# logger.info("Info message test")
# logger.warning("Warning message test")
# logger.error("Error message test")

Log Variables

In [4]:
logger.info("Starting CUAD dataset processing with improved matching algorithms")
logger.info(f"Pdf path: {pdf_path}")
logger.info(f"Dpi: {dpi}")
logger.info(f"Page: {page}")
logger.info(f"Search text: {search_text}")

2025-04-18 14:21:01,220 - INFO - Starting CUAD dataset processing with improved matching algorithms
2025-04-18 14:21:01,220 - INFO - Pdf path: CUAD_v1/full_contract_pdf/Part_I/Co_Branding/InvendaCorp_20000828_S-1A_EX-10.2_2588206_EX-10.2_Co-Branding Agreement.pdf
2025-04-18 14:21:01,223 - INFO - Dpi: 400
2025-04-18 14:21:01,223 - INFO - Page: None
2025-04-18 14:21:01,224 - INFO - Search text: None


In [5]:
# Try to import PaddleOCR
try:
    from paddleocr import PaddleOCR
    # Initialize PaddleOCR with parameters optimized for document OCR
    ocr = PaddleOCR(
        lang='en',
        use_angle_cls=True,
        det_model_dir=None,  # Use default detection model
        rec_model_dir=None,  # Use default recognition model
        cls_model_dir=None,  # Use default classification model
        det_limit_side_len=2560,  # Increase from default for higher quality
        det_db_thresh=0.3,  # Lower threshold to detect more text regions
        det_db_box_thresh=0.5,  # Lower box threshold
        rec_batch_num=6,  # Increase batch size for faster processing
        rec_char_dict_path=None,  # Use default dictionary
        use_space_char=True,  # Important for document text
        show_log=False
    )
    OCR_AVAILABLE = True
    print("PaddleOCR initialized successfully")
    logger.info("PaddleOCR initialized successfully")
except ImportError:
    print("PaddleOCR not available. Please install it with 'pip install paddleocr'")
    logger.info("PaddleOCR not available. Please install it with 'pip install paddleocr'")
    OCR_AVAILABLE = False
    ocr = None

try:
    from pdf2image import convert_from_path
    PDF_CONVERSION_AVAILABLE = True
except ImportError:
    print("pdf2image not available. Please install it with 'pip install pdf2image'")
    logger.info("pdf2image not available. Please install it with 'pip install pdf2image'")
    PDF_CONVERSION_AVAILABLE = False

import difflib


2025-04-18 14:21:15,722 - INFO - PaddleOCR initialized successfully
[2025-04-18 14:21:15,722] [    INFO] 2888271785.py:21 - PaddleOCR initialized successfully


PaddleOCR initialized successfully


Load the CUAD dataset annotations

Find annotations for a specific document title


In [6]:
def load_cuad_annotations(annotation_file_path):
    with open(annotation_file_path, 'r', encoding='utf-8') as f:
        logger.info(f"Loading annotations from {annotation_file_path}")
        return json.load(f)
    
def get_document_annotations(cuad_annotations, doc_title):
    return next((d for d in cuad_annotations['data'] if d['title'] == doc_title), None)

Convert PDF pages to images and run OCR on each image with improved parameters.

Args:

- pdf_path: Path to the PDF file
- dpi: DPI for PDF to image conversion
- pages_to_process: Specific pages to process (1-indexed). None means all pages.

Returns:
    List of pages, where each page contains a list of word dictionaries with 'text' and 'bbox'

In [7]:

def pdf_to_ocr_words(pdf_path: str, dpi: int = 400, pages_to_process=None) -> List[List[Dict[str, Any]]]:
    if not OCR_AVAILABLE:
        logger.error("PaddleOCR is not available. Please install it with 'pip install paddleocr'")
        return []
    
    if not PDF_CONVERSION_AVAILABLE:
        logger.error("pdf2image is not available. Please install it with 'pip install pdf2image'")
        return []
    import tempfile
    # Create a unique temporary directory for this operation


            
    logger.info(f"Starting OCR processing for {pdf_path}")
    start_time = time.time()
    
    with tempfile.TemporaryDirectory() as temp_dir:
        try:
            # Use higher DPI for better quality images
            all_pages = convert_from_path(
                pdf_path, 
                dpi=dpi,
                thread_count=4,
                use_pdftocairo=True,  # Often provides better quality than pdftoppm
                grayscale=False       # Keep color for better OCR in some cases
            )
            logger.info(f"Successfully converted PDF to {len(all_pages)} pages")
            
            # Filter pages to process if specified
            if pages_to_process:
                # Convert to 0-indexed
                page_indices = [p-1 for p in pages_to_process if 1 <= p <= len(all_pages)]
                pages = [all_pages[i] for i in page_indices]
                logger.info(f"Processing {len(pages)} specified pages: {pages_to_process}")
            else:
                pages = all_pages
                logger.info(f"Processing all {len(pages)} pages")
        except Exception as e:
            logger.error(f"Error converting PDF {pdf_path}: {str(e)}")
            return []
        
    all_pages_ocr = []
    total_words = 0
    
    for page_num, image in enumerate(tqdm(pages, desc="Processing pages")):
        page_start_time = time.time()
        try:
            image_np = np.array(image)
            result = ocr.ocr(image_np, cls=True)
            
            page_words = []

            # Handle PaddleOCR result format - more robust handling for different versions
            try:
                # For newer PaddleOCR versions (>=2.0) - result structure is [[page_result]]
                if isinstance(result, list) and len(result) > 0 and isinstance(result[0], list):
                    ocr_results = result[0]
                else:
                    # For older versions - result structure is [line_result]
                    ocr_results = result

                # Process OCR results regardless of format
                if ocr_results:
                    for line_result in ocr_results:
                        try:
                            # Extract bbox and text+confidence based on structure
                            if isinstance(line_result, list) and len(line_result) >= 2:
                                bbox = line_result[0]
                                text_conf = line_result[1]
                                
                                # Handle text & confidence extraction 
                                if isinstance(text_conf, tuple) and len(text_conf) == 2:
                                    text, conf = text_conf
                                else:
                                    # If not a tuple, assume it's just text
                                    text = str(text_conf)
                                    conf = 1.0
                                
                                # Normalize bbox to [x0, y0, x1, y1]
                                if bbox and isinstance(bbox, list) and len(bbox) >= 4:
                                    x_coords = [p[0] for p in bbox if isinstance(p, (list, tuple))]
                                    y_coords = [p[1] for p in bbox if isinstance(p, (list, tuple))]
                                    
                                    if x_coords and y_coords:
                                        x0, y0 = min(x_coords), min(y_coords)
                                        x1, y1 = max(x_coords), max(y_coords)
                                        
                                        page_words.append({
                                            'text': text,
                                            'bbox': [float(x0), float(y0), float(x1), float(y1)],
                                            'confidence': float(conf),
                                            'page': page_num
                                        })
                        except Exception as e:
                            logger.warning(f"Error processing OCR item on page {page_num}: {str(e)}")
                            continue
            except Exception as e:
                logger.warning(f"Error processing OCR result structure on page {page_num}: {str(e)}")
            
            total_words += len(page_words)
            all_pages_ocr.append(page_words)
            
            page_time = time.time() - page_start_time
            logger.debug(f"Page {page_num + 1}: Processed {len(page_words)} words in {page_time:.2f}s")
            
        except Exception as e:
            logger.error(f"Error processing page {page_num} of {pdf_path}: {str(e)}")
            all_pages_ocr.append([])
    
    total_time = time.time() - start_time
    logger.info(f"Completed OCR processing for {pdf_path}")
    logger.info(f"Total words extracted: {total_words}")
    logger.info(f"Average words per page: {total_words / len(pages):.1f}")
    logger.info(f"Total processing time: {total_time:.2f}s")
    
    return all_pages_ocr

Aligns annotation text with OCR words using improved matching algorithm.

Args:
- ocr_words: List of OCR word dictionaries
- annotation_text: Text to align
- min_confidence: Minimum confidence threshold for matching
    
Returns:
    List of matched word dictionaries with their bounding boxes

In [8]:
def align_annotation(ocr_words: List[Dict], annotation_text: str, min_confidence: float = 0.5) -> List[Dict]:
    if not ocr_words:
        return []
    
    # More aggressive text normalization
    def normalize_text(text):
        import re
        # Convert to lowercase
        text = text.lower()
        # Replace common OCR errors and normalize whitespace
        text = re.sub(r'\s+', ' ', text)
        # Remove punctuation that often causes mismatches
        text = re.sub(r'[.,;:()"\'-]', ' ', text)
        # Remove extra spaces
        text = re.sub(r'\s+', ' ', text)
        return text.strip()
    
    # Normalize texts for matching
    normalized_annotation = normalize_text(annotation_text)
    
    # Create two versions of OCR text - one with spaces between words, one without
    ocr_text_with_spaces = " ".join(normalize_text(word['text']) for word in ocr_words)
    ocr_text_no_spaces = "".join(normalize_text(word['text']) for word in ocr_words)
    
    # Try multiple matching strategies
    
    # 1. Direct substring match (most reliable)
    if normalized_annotation in ocr_text_with_spaces:
        start_idx = ocr_text_with_spaces.index(normalized_annotation)
        end_idx = start_idx + len(normalized_annotation)
        match_type = "exact"
        match_text = ocr_text_with_spaces
        logger.debug(f"Found exact match for: {annotation_text[:50]}...")
    
    # 2. Try matching without spaces (helps with word boundary issues)
    elif normalized_annotation.replace(" ", "") in ocr_text_no_spaces:
        no_space_annotation = normalized_annotation.replace(" ", "")
        start_idx = ocr_text_no_spaces.index(no_space_annotation)
        end_idx = start_idx + len(no_space_annotation)
        match_type = "no_spaces"
        match_text = ocr_text_no_spaces
        logger.debug(f"Found no-spaces match for: {annotation_text[:50]}...")
    
    # 3. Use fuzzy matching with a lower threshold (60% instead of 70%)
    else:
        # Try different combinations of text normalization for fuzzy matching
        matcher = difflib.SequenceMatcher(None, ocr_text_with_spaces, normalized_annotation)
        match = matcher.find_longest_match(0, len(ocr_text_with_spaces), 0, len(normalized_annotation))
        
        # If match is too small, try alternative approaches
        if match.size < len(normalized_annotation) * 0.6:
            # Try word-by-word matching for very low confidence cases
            annotation_words = normalized_annotation.split()
            if len(annotation_words) > 3:  # Only try for longer annotations
                # Check if at least 60% of the words appear in the OCR text
                found_words = [word for word in annotation_words if word in ocr_text_with_spaces]
                if len(found_words) / len(annotation_words) >= 0.6:
                    logger.debug(f"Found partial word match ({len(found_words)}/{len(annotation_words)} words) for: {annotation_text[:50]}...")
                    
                    # Use all OCR words as a fallback
                    # This is not ideal but better than nothing
                    return ocr_words
                
            logger.warning(f"Low confidence match for: {annotation_text[:50]}...")
            return []
        else:
            # Try sliding window matching for longer texts (over 100 chars)
            if len(normalized_annotation) > 100:
                # Use smaller chunks of the text to match
                chunk_size = min(80, len(normalized_annotation) // 2)
                # Try start, middle and end chunks
                start_chunk = normalized_annotation[:chunk_size]
                end_chunk = normalized_annotation[-chunk_size:]
                mid_point = len(normalized_annotation) // 2
                mid_chunk = normalized_annotation[mid_point-chunk_size//2:mid_point+chunk_size//2]
                
                for chunk in [start_chunk, mid_chunk, end_chunk]:
                    matcher = difflib.SequenceMatcher(None, ocr_text_with_spaces, chunk)
                    match = matcher.find_longest_match(0, len(ocr_text_with_spaces), 0, len(chunk))
                    
                    if match.size > len(chunk) * 0.7:  # Higher threshold for chunks
                        # Found a good chunk match, now expand to include surrounding context
                        start_idx = max(0, match.a - chunk_size)
                        end_idx = min(len(ocr_text_with_spaces), match.a + match.size + chunk_size)
                        
                        # Use this expanded region for word mapping
                        match_type = "chunk"
                        match_text = ocr_text_with_spaces
                        logger.debug(f"Found chunk match for: {annotation_text[:50]}...")
                        break
                else:
                    # None of the chunks matched well
                    logger.warning(f"Low confidence match for: {annotation_text[:50]}...")
                    return []
            
        start_idx = match.a
        end_idx = start_idx + match.size
        match_type = "fuzzy"
        match_text = ocr_text_with_spaces
        logger.debug(f"Found fuzzy match ({match.size/len(normalized_annotation):.2%} confidence) for: {annotation_text[:50]}...")

    # Map character positions to word indices
    matched_words = []
    
    # Different mapping strategy based on match type
    if match_type == "no_spaces":
        # For no_spaces matches, we need to map back to the original words
        char_count = 0
        for word in ocr_words:
            word_text = normalize_text(word['text'])
            word_no_spaces = word_text.replace(" ", "")
            word_length = len(word_no_spaces)
            
            word_start = char_count
            word_end = char_count + word_length
            
            # Check if word overlaps with matched region
            if word_end > start_idx and word_start < end_idx:
                if word.get('confidence', 1.0) >= min_confidence:
                    matched_words.append(word)
                    
            char_count += word_length
    else:
        # For exact and fuzzy matches with spaces
        current_pos = 0
        for word in ocr_words:
            word_text = normalize_text(word['text'])
            word_start = current_pos
            word_end = current_pos + len(word_text)
            
            # Check if word overlaps with matched region
            if word_end > start_idx and word_start < end_idx:
                if word.get('confidence', 1.0) >= min_confidence:
                    matched_words.append(word)
                    
            current_pos = word_end + 1  # +1 for space
    
    logger.debug(f"Matched {len(matched_words)} words using {match_type} matching")
    return matched_words

Export annotations in LayoutLMv3 compatible format.

Args:
- pdf_path: Path to the PDF file
- all_pages_ocr: OCR results for all pages
- annotations: List of (text, page_num) tuples to annotate
- output_file: Optional output file path
    
Returns:
    Annotation data dictionary

In [9]:
def export_annotations(pdf_path, all_pages_ocr, annotations, output_file=None):
    layoutlm_annotations = []
    
    for i, (text, page_num) in enumerate(annotations):
        if 0 <= page_num < len(all_pages_ocr):
            page_words = all_pages_ocr[page_num]
            matched_words = align_annotation(page_words, text)
            
            if matched_words:
                annotation = {
                    'id': f"annotation_{i}",
                    'text': text,
                    'page_number': page_num,
                    'words': [
                        {
                            'text': word['text'],
                            'bbox': word['bbox'],
                            'confidence': word.get('confidence', 1.0)
                        }
                        for word in matched_words
                    ]
                }
                layoutlm_annotations.append(annotation)
    
    # Create annotation data
    annotation_data = {
        'file_name': pdf_path,
        'annotations': layoutlm_annotations
    }
    
    # Save to file if specified
    if output_file:
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(annotation_data, f, indent=2, ensure_ascii=False)
        print(f"Saved annotations to {output_file}")
    
    return annotation_data


Process a single annotation and match it with OCR results.

In [10]:
def process_annotation(all_pages_ocr, annotation_text, qa_id, question):
    matched_results = []
    
    for page_num, page_words in enumerate(all_pages_ocr):
        matched_words = align_annotation(page_words, annotation_text)
        if matched_words:
            matched_results.append({
                'id': f"{qa_id}_{page_num}",
                'question': question,
                'answer_text': annotation_text,
                'page_number': page_num,
                'words': [
                    {
                        'text': word['text'],
                        'bbox': word['bbox'],
                        'confidence': word.get('confidence', 1.0)
                    }
                    for word in matched_words
                ]
            })
    
    return matched_results

Extract and save annotations for a specific document from the existing CUAD dataset.

Args:
- annotation_file: Path to the CUAD annotation file
- document_title: Title of the document to extract
- output_file: Path to save the extracted annotations

Returns:
    dict: The extracted annotations

In [11]:
def export_existing_annotations(annotation_file, document_title, output_file):
    logger.info(f"Extracting existing annotations for document: {document_title}")
    
    # Load annotations
    try:
        with open(annotation_file, 'r', encoding='utf-8') as f:
            cuad_annotations = json.load(f)
    except Exception as e:
        logger.error(f"Failed to load annotations: {str(e)}")
        return None
    
    # Find annotations for this document
    doc = next((d for d in cuad_annotations['data'] if d['title'] == document_title), None)
    if not doc:
        logger.error(f"No annotations found for {document_title}")
        return None
    
    # Extract QA pairs in a simplified format
    extracted_annotations = {
        'title': document_title,
        'annotations': []
    }
    
    for paragraph in doc.get('paragraphs', []):
        for qa in paragraph.get('qas', []):
            for answer in qa.get('answers', []):
                annotation = {
                    'id': qa['id'],
                    'question': qa['question'],
                    'answer': answer['text'],
                    'answer_start': answer.get('answer_start')
                }
                extracted_annotations['annotations'].append(annotation)
    
    # Save to file
    os.makedirs(os.path.dirname(output_file), exist_ok=True)
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(extracted_annotations, f, indent=2, ensure_ascii=False)
    
    logger.info(f"Saved {len(extracted_annotations['annotations'])} original annotations to {output_file}")
    print(f"Saved {len(extracted_annotations['annotations'])} original annotations to {output_file}")
    
    return extracted_annotations

Compare original annotations with processed annotations.

Args:
- original_file: Path to the original annotations JSON
- processed_file: Path to the processed annotations JSON

In [12]:
def compare_annotations(original_file, processed_file):
    # Load both files
    try:
        with open(original_file, 'r', encoding='utf-8') as f:
            original = json.load(f)
        
        with open(processed_file, 'r', encoding='utf-8') as f:
            processed = json.load(f)
    except Exception as e:
        print(f"Error loading comparison files: {e}")
        return
    
    # Get counts
    orig_count = len(original.get('annotations', []))
    proc_count = len(processed.get('annotations', []))
    
    print(f"\n==== Annotation Comparison ====")
    print(f"Original annotations: {orig_count}")
    print(f"Processed annotations: {proc_count}")
    print(f"Match rate: {proc_count/orig_count:.1%} ({proc_count}/{orig_count})\n")
    logger.info(f"==== Annotation Comparison ====")
    logger.info(f"Original annotations: {orig_count}")
    logger.info(f"Processed annotations: {proc_count}")
    logger.info(f"Match rate: {proc_count/orig_count:.1%} ({proc_count}/{orig_count})\n")
    
    # Build a mapping of questions to make comparison easier
    orig_qa_map = {}
    for ann in original.get('annotations', []):
        q_id = ann.get('id', '')
        if q_id not in orig_qa_map:
            orig_qa_map[q_id] = []
        orig_qa_map[q_id].append(ann)
    
    # Find questions that were matched and those that weren't
    matched_questions = set()
    for ann in processed.get('annotations', []):
        q_id = ann.get('id', '').split('_')[0]  # Remove page suffix
        matched_questions.add(q_id)
    
    missing_questions = set(orig_qa_map.keys()) - matched_questions
    
    # Sample a few matched annotations to show
    if matched_questions:
        print("Sample of matched annotations:")
        samples = list(matched_questions)[:3]  # Take up to 3 samples
        
        for q_id in samples:
            # Find original
            if q_id in orig_qa_map:
                orig_ann = orig_qa_map[q_id][0]
                print(f"\nQuestion: {orig_ann.get('question')}")
                print(f"Original answer: {orig_ann.get('answer')[:100]}..." if len(orig_ann.get('answer', '')) > 100 else orig_ann.get('answer'))
                
                # Find processed version
                proc_ann = next((a for a in processed.get('annotations', []) 
                                if a.get('id', '').startswith(q_id + '_')), None)
                if proc_ann:
                    print(f"Processed answer: {proc_ann.get('answer_text')[:100]}..." 
                           if len(proc_ann.get('answer_text', '')) > 100 
                           else proc_ann.get('answer_text'))
                    print(f"Found on page: {proc_ann.get('page_number')}")
                    print(f"Word count: {len(proc_ann.get('words', []))}")
    
    # Show some missing questions
    if missing_questions:
        print("\nSample of questions without matches:")
        samples = list(missing_questions)[:3]  # Take up to 3 samples
        
        for q_id in samples:
            if q_id in orig_qa_map:
                orig_ann = orig_qa_map[q_id][0]
                print(f"\nQuestion: {orig_ann.get('question')}")
                print(f"Answer: {orig_ann.get('answer')[:100]}..." if len(orig_ann.get('answer', '')) > 100 else orig_ann.get('answer'))

Process document and create LayoutLMv3 compatible annotations.


In [13]:
def process_document(pdf_path, annotation_file):
    # Load annotations
    cuad_annotations = load_cuad_annotations(annotation_file)
    
    # Get document title from filename
    doc_title = os.path.splitext(os.path.basename(pdf_path))[0]
    
    # Find annotations for this document
    doc = get_document_annotations(cuad_annotations, doc_title)
    logger.info(f"Found annotations for {doc_title} in {annotation_file}")
    if not doc:
        logger.error(f"No annotations found for {doc_title}")
        return None
    
    # Process OCR
    all_pages_ocr = pdf_to_ocr_words(pdf_path, dpi=400)
    
    # Process annotations
    layoutlm_annotations = []
    failed_annotations = []

    # For each paragraph and QA pair
    for paragraph in doc.get('paragraphs', []):
        for qa in paragraph.get('qas', []):
            for answer in qa.get('answers', []):
                answer_text = answer['text']
                
                # Match annotation with OCR results
                match_found = False
                for page_num, page_words in enumerate(all_pages_ocr):
                    matched_words = align_annotation(page_words, answer_text)
                    
                    if matched_words:
                        match_found = True
                        # Create annotation entry
                        annotation = {
                            'id': f"{qa['id']}_{page_num}",
                            'question': qa['question'],
                            'answer_text': answer_text,
                            'page_number': page_num,
                            'words': [
                                {
                                    'text': word['text'],
                                    'bbox': word['bbox'],
                                    'confidence': word.get('confidence', 1.0)
                                }
                                for word in matched_words
                            ]
                        }
                        layoutlm_annotations.append(annotation)
                        logger.debug(f"Successfully matched answer for question: {qa['question'][:50]}...")
                        break  # Found the answer, move to next QA pair
                
                if not match_found:
                    failed_annotations.append({
                        'id': qa['id'],
                        'question': qa['question'],
                        'answer_text': answer_text,
                        'answer_length': len(answer_text)
                    })
                    logger.warning(f"Could not find match for answer: {answer_text[:50]}...")
    
    # Create final output
    output_data = {
        'file_name': pdf_path,
        'annotations': layoutlm_annotations,
        'failed_annotations': failed_annotations
    }
    if failed_annotations:
        logger.error(f"===== FAILED ANNOTATION SUMMARY =====")
        logger.error(f"Failed to match {len(failed_annotations)} annotations out of {len(failed_annotations) + len(layoutlm_annotations)} total")
        logger.error(f"Failure rate: {len(failed_annotations) / (len(failed_annotations) + len(layoutlm_annotations)):.2%}")
        
        # Log details of each failed annotation
        for i, failed in enumerate(failed_annotations):
            logger.error(f"Failed #{i+1}: {failed['question'][:80]}...")
            logger.error(f"Answer text ({failed['answer_length']} chars): {failed['answer_text'][:100]}...")
            logger.error("-" * 40)
    
    return output_data

In [14]:
# Define file paths
doc_title = os.path.splitext(os.path.basename(pdf_path))[0]
original_output_file = os.path.join("test_annotate/comparison", f"{doc_title}_original.json")
processed_output_file = os.path.join("test_annotate/output", f"{doc_title}_layoutlm.json")

# Create directories
os.makedirs("test_annotate", exist_ok=True)
os.makedirs("test_annotate/comparison", exist_ok=True)
os.makedirs("test_annotate/output", exist_ok=True)

# Save original annotations for comparison
original_annotations = export_existing_annotations(annotation_file, doc_title, original_output_file)

# Process the document and create LayoutLMv3 annotations
output_data = process_document(pdf_path, annotation_file)

# Save processed annotations
if output_data and output_data['annotations']:
    with open(processed_output_file, 'w', encoding='utf-8') as f:
        json.dump(output_data, f, indent=2, ensure_ascii=False)
    print(f"Saved {len(output_data['annotations'])} processed annotations to {processed_output_file}")
    logger.info(f"Saved {len(output_data['annotations'])} processed annotations to {processed_output_file}")
    
    # Compare the original and processed annotations
    compare_annotations(original_output_file, processed_output_file)
else:
    print("No annotations were successfully matched")
    logger.warning("No annotations were successfully matched")

2025-04-18 14:21:15,821 - INFO - Extracting existing annotations for document: InvendaCorp_20000828_S-1A_EX-10.2_2588206_EX-10.2_Co-Branding Agreement
[2025-04-18 14:21:15,821] [    INFO] 2936235873.py:2 - Extracting existing annotations for document: InvendaCorp_20000828_S-1A_EX-10.2_2588206_EX-10.2_Co-Branding Agreement
2025-04-18 14:21:15,995 - INFO - Saved 63 original annotations to test_annotate/comparison\InvendaCorp_20000828_S-1A_EX-10.2_2588206_EX-10.2_Co-Branding Agreement_original.json
[2025-04-18 14:21:15,995] [    INFO] 2936235873.py:40 - Saved 63 original annotations to test_annotate/comparison\InvendaCorp_20000828_S-1A_EX-10.2_2588206_EX-10.2_Co-Branding Agreement_original.json
2025-04-18 14:21:16,005 - INFO - Loading annotations from CUAD_v1/CUAD_v1.json
[2025-04-18 14:21:16,005] [    INFO] 433870264.py:3 - Loading annotations from CUAD_v1/CUAD_v1.json
2025-04-18 14:21:16,169 - INFO - Found annotations for InvendaCorp_20000828_S-1A_EX-10.2_2588206_EX-10.2_Co-Branding Agr

Saved 63 original annotations to test_annotate/comparison\InvendaCorp_20000828_S-1A_EX-10.2_2588206_EX-10.2_Co-Branding Agreement_original.json


2025-04-18 14:21:28,013 - INFO - Successfully converted PDF to 27 pages
[2025-04-18 14:21:28,013] [    INFO] 1368059901.py:27 - Successfully converted PDF to 27 pages
2025-04-18 14:21:28,013 - INFO - Processing all 27 pages
[2025-04-18 14:21:28,013] [    INFO] 1368059901.py:37 - Processing all 27 pages
Processing pages:   0%|          | 0/27 [00:00<?, ?it/s]2025-04-18 14:21:45,673 - DEBUG - Page 1: Processed 64 words in 17.66s
[2025-04-18 14:21:45,673] [   DEBUG] 1368059901.py:104 - Page 1: Processed 64 words in 17.66s
Processing pages:   4%|▎         | 1/27 [00:17<07:39, 17.66s/it]2025-04-18 14:21:58,161 - DEBUG - Page 2: Processed 78 words in 12.49s
[2025-04-18 14:21:58,161] [   DEBUG] 1368059901.py:104 - Page 2: Processed 78 words in 12.49s
Processing pages:   7%|▋         | 2/27 [00:30<06:05, 14.62s/it]2025-04-18 14:22:11,898 - DEBUG - Page 3: Processed 61 words in 13.73s
[2025-04-18 14:22:11,898] [   DEBUG] 1368059901.py:104 - Page 3: Processed 61 words in 13.73s
Processing pages:

Saved 63 processed annotations to test_annotate/output\InvendaCorp_20000828_S-1A_EX-10.2_2588206_EX-10.2_Co-Branding Agreement_layoutlm.json

==== Annotation Comparison ====
Original annotations: 63
Processed annotations: 63
Match rate: 100.0% (63/63)

Sample of matched annotations:

Sample of questions without matches:

Question: Highlight the parts (if any) of this contract related to "Audit Rights" that should be reviewed by a lawyer. Details: Does a party have the right to  audit the books, records, or physical locations of the counterparty to ensure compliance with the contract?
Answer: Once every 12 months, the         party receiving payment and/or User Data records or its designee m...

Question: Highlight the parts (if any) of this contract related to "Revenue/Profit Sharing" that should be reviewed by a lawyer. Details: Is one party required to share revenue or profit with the counterparty for any technology, goods, or services?
Answer: The parties will share equally all reve

In [15]:
def visualize_annotation_results(pdf_path, annotations_file, output_dir=None, pages=None, dpi=300):
    """
    Visualize annotated PDF pages with bounding boxes and labels.
    
    Args:
        pdf_path: Path to the PDF file
        annotations_file: Path to the processed annotations JSON file
        output_dir: Directory to save visualization images (None = show only)
        pages: List of specific pages to visualize (None = all pages with annotations)
        dpi: DPI for PDF conversion
    """
    import matplotlib.pyplot as plt
    import matplotlib.patches as patches
    from matplotlib.collections import PatchCollection
    import numpy as np
    from pdf2image import convert_from_path
    import os
    import json
    from PIL import Image, ImageDraw, ImageFont
    import random
    # Create output directory if needed
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    
    # Load annotations
    with open(annotations_file, 'r', encoding='utf-8') as f:
        annotation_data = json.load(f)
    
    # Organize annotations by page
    annotations_by_page = {}
    for anno in annotation_data.get('annotations', []):
        page_num = anno.get('page_number', 0)
        if page_num not in annotations_by_page:
            annotations_by_page[page_num] = []
        annotations_by_page[page_num].append(anno)
    
    # Determine which pages to process
    if pages is None:
        pages = sorted(annotations_by_page.keys())
    else:
        # Filter to only pages that have annotations
        pages = [p for p in pages if p in annotations_by_page]
    
    if not pages:
        print("No pages with annotations found")
        return
    
    # Generate colors for different question types
    question_colors = {}
    
    def get_question_color(question):
        # Extract category from question if present
        category = question.split(':')[0] if ':' in question else question
        
        if category not in question_colors:
            # Generate a random but visually distinct color
            r = random.random() * 0.8 + 0.2  # Avoid too dark/light
            g = random.random() * 0.8 + 0.2
            b = random.random() * 0.8 + 0.2
            question_colors[category] = (r, g, b)
        
        return question_colors[category]
    
    # Create a lookup of page numbers to render
    page_indices = [p+1 for p in pages]  # Convert 0-indexed to 1-indexed for pdf2image
    
    # Convert PDF pages to images
    print(f"Converting PDF pages {page_indices}...")
    pdf_images = convert_from_path(
        pdf_path, 
        dpi=dpi, 
        first_page=min(page_indices),
        last_page=max(page_indices)
    )
    
    # Mapping of 1-indexed page numbers to images
    page_images = {}
    for i, img in enumerate(pdf_images):
        page_idx = min(page_indices) + i - 1  # Convert back to 0-indexed
        page_images[page_idx] = img
    
    # Process each page
    for page_num in pages:
        if page_num not in annotations_by_page:
            continue
            
        page_annotations = annotations_by_page[page_num]
        page_img = page_images.get(page_num)
        
        if not page_img:
            print(f"Image for page {page_num} not found")
            continue
        
        # Create a copy of the image for drawing
        img_draw = page_img.copy()
        draw = ImageDraw.Draw(img_draw)
        
        # Try to load a font, fallback to default if not available
        try:
            font = ImageFont.truetype("arial.ttf", 24)
        except IOError:
            font = ImageFont.load_default()

        # Within visualize_annotation_results function
        # Before drawing bounding boxes, normalize coordinates
        img_width, img_height = page_img.size
        for annotation in page_annotations:
            words = annotation.get('words', [])
            for word in words:
                bbox = word.get('bbox')
                if bbox and len(bbox) == 4:
                    # Ensure coordinates are within image bounds
                    x0 = max(0, min(bbox[0], img_width))
                    y0 = max(0, min(bbox[1], img_height))
                    x1 = max(0, min(bbox[2], img_width))
                    y1 = max(0, min(bbox[3], img_height))
                    word['bbox'] = [x0, y0, x1, y1]
        
        # Draw bounding boxes for each annotation
        for annotation in page_annotations:
            # Extract the label from the annotation ID
            annotation_id = annotation.get('id', '')
            # Split by double underscore and get the last part before the index
            label = annotation_id.split('__')[-1].split('_')[0] if '__' in annotation_id else ''
            color = get_question_color(label)
            rgb_color = tuple(int(c*255) for c in color)
            
            # Get words with bounding boxes
            words = annotation.get('words', [])
            
            # Draw individual word boxes
            for word in words:
                bbox = word.get('bbox')
                if bbox and len(bbox) == 4:
                    draw.rectangle([(bbox[0], bbox[1]), (bbox[2], bbox[3])], 
                                   outline=rgb_color, width=2)
            
            # Draw overall annotation box (combine all word boxes)
            if words:
                all_x0 = [w['bbox'][0] for w in words if 'bbox' in w]
                all_y0 = [w['bbox'][1] for w in words if 'bbox' in w]
                all_x1 = [w['bbox'][2] for w in words if 'bbox' in w]
                all_y1 = [w['bbox'][3] for w in words if 'bbox' in w]
                
                if all_x0 and all_y0 and all_x1 and all_y1:
                    x0, y0 = min(all_x0), min(all_y0)
                    x1, y1 = max(all_x1), max(all_y1)
                    
                    # Draw overall box with padding
                    padding = 10
                    draw.rectangle([(x0-padding, y0-padding), (x1+padding, y1+padding)], 
                                   outline=rgb_color, width=3)
                    
                    # Add annotation label
                    # Get abbreviated question text
                    label_text = label[:80] + "..." if len(label) > 40 else label
                    
                    # Draw label background
                    text_width, text_height = draw.textbbox((0, 0), label_text, font=font)[2:]
                    draw.rectangle([(x0-padding, y0-text_height-padding*2), 
                                    (x0+text_width+padding, y0-padding)], 
                                   fill=rgb_color)
                    
                    # Draw text in white
                    draw.text((x0, y0-text_height-padding), label_text, 
                             fill=(255, 255, 255), font=font)
        
        # Display or save the annotated image
        plt.figure(figsize=(12, 16))
        plt.imshow(img_draw)
        plt.axis('off')
        plt.title(f"Page {page_num+1} Annotations ({len(page_annotations)} found)")
        
        if output_dir:
            output_path = os.path.join(output_dir, f"page_{page_num+1}_annotated.png")
            img_draw.save(output_path)
            print(f"Saved annotated page to {output_path}")
        else:
            plt.show()
        
        plt.close()
    
    # Display a legend of question categories and colors
    # Modify the legend creation part of your visualization function
    if question_colors:
        # Create a new figure with fixed dimensions
        plt.figure(figsize=(12, 6))
        
        # Create a more robust legend
        handles = []
        labels = []
        for category, color in question_colors.items():
            # Create a patch for each category
            patch = patches.Patch(color=color, label=category)
            handles.append(patch)
            labels.append(category)
        
        # Place the legend in the center with better control
        plt.legend(handles=handles, labels=labels, 
                loc='center', 
                bbox_to_anchor=(0.5, 0.5),
                ncol=1,  # Stack vertically for better readability
                frameon=True,
                fontsize='medium')
        
        # Remove axis elements
        plt.gca().set_axis_off()
        plt.margins(0, 0)
        plt.tight_layout()
        plt.title("Annotation Categories", fontsize=14, pad=20)
        
        if output_dir:
            plt.savefig(os.path.join(output_dir, "annotation_legend.png"), 
                    bbox_inches='tight',
                    pad_inches=0.5)
        else:
            plt.show()
        plt.close()

In [16]:
# After processing the document and saving the annotations

# Define paths
doc_title = os.path.splitext(os.path.basename(pdf_path))[0]
processed_output_file = os.path.join("test_annotate/output", f"{doc_title}_layoutlm.json")
visualization_dir = os.path.join("test_annotate/output", "visualizations", doc_title)

# Visualize annotations
if os.path.exists(processed_output_file):
    # Create the visualization directory
    os.makedirs(visualization_dir, exist_ok=True)
    
    # Visualize all pages with annotations
    visualize_annotation_results(
        pdf_path=pdf_path,
        annotations_file=processed_output_file,
        output_dir=visualization_dir,
        dpi=300  # Higher for better quality, lower for faster rendering
    )
    
    print(f"Visualizations saved to {visualization_dir}")
else:
    print(f"Annotation file {processed_output_file} not found")

Converting PDF pages [1, 2, 3, 4, 5, 6, 7, 9, 12, 24]...
Saved annotated page to test_annotate/output\visualizations\InvendaCorp_20000828_S-1A_EX-10.2_2588206_EX-10.2_Co-Branding Agreement\page_1_annotated.png
Saved annotated page to test_annotate/output\visualizations\InvendaCorp_20000828_S-1A_EX-10.2_2588206_EX-10.2_Co-Branding Agreement\page_2_annotated.png
Saved annotated page to test_annotate/output\visualizations\InvendaCorp_20000828_S-1A_EX-10.2_2588206_EX-10.2_Co-Branding Agreement\page_3_annotated.png
Saved annotated page to test_annotate/output\visualizations\InvendaCorp_20000828_S-1A_EX-10.2_2588206_EX-10.2_Co-Branding Agreement\page_4_annotated.png
Saved annotated page to test_annotate/output\visualizations\InvendaCorp_20000828_S-1A_EX-10.2_2588206_EX-10.2_Co-Branding Agreement\page_5_annotated.png
Saved annotated page to test_annotate/output\visualizations\InvendaCorp_20000828_S-1A_EX-10.2_2588206_EX-10.2_Co-Branding Agreement\page_6_annotated.png
Saved annotated page to t