## ISO Form Code Extractor

This notebook extracts ISO (Insurance Services Office) form codes from commercial insurance policy PDFs using a two-stage approach:

1. **Stage 1: Regex-based extraction** - Scans PDF headers and footers for ISO codes matching standard patterns (e.g., `CG 00 01 04 13`, `GL 0169 0001`)
2. **Stage 2: LLM refinement** - Uses an LLM to verify and extract codes from pages that are missing codes or have ambiguous results

### Key Features
- **Automatic OCR fallback** for scanned/image-based PDFs
- **Parallel LLM processing** for efficient refinement 
- Outputs codes with page numbers and text snippets for verification

### Typical Workflow
1. Extract codes using `extract_iso_codes_from_headers_footers(pdf_path)`
2. Refine results using `await refine_iso_codes_with_llm(pdf_path, initial_codes)` (the function is async, so it must be awaited)
3. Optionally save results as ground truth JSON for future comparison

In [None]:
import re
import asyncio
from collections import defaultdict
from typing import Dict, List, Any, Tuple

import fitz  # PyMuPDF
from PIL import Image
import pytesseract
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI

load_dotenv()

In [None]:
# -------------------------------------------------------------------
# Shared config and helpers
# -------------------------------------------------------------------

# Regex for ISO codes - simplified to capture most common patterns only.
# Compiled without flags, so the two-letter prefix must be uppercase.
# Each separator is exactly one character: any whitespace (which, via \s,
# includes a newline) or a dash.
ISO_CODE_PATTERN = re.compile(
    r"\b"                                              # Must start at a word boundary
    r"[A-Z]{2}[\s\-]"                                  # Two-letter prefix + required space or dash
    r"(?:"
        r"\d{2}[\s\-]\d{2}[\s\-]\d{2}[\s\-]\d{2}"     # Standard: CG 00 01 04 13 (four 2-digit groups)
        r"|"
        r"\d{4}[\s\-]\d{4}"                            # GL format: GL 0169 0001 (two 4-digit groups)
    r")"
    r"\b"                                              # Must end at a word boundary (no trailing digits/letters)
)

def extract_text_with_ocr(pdf_path: str, page_num: int, dpi: int = 300) -> str:
    """Extract text from a single PDF page by rendering it and running OCR.

    Intended for scanned/image-based pages where normal text extraction
    yields little or nothing.

    Args:
        pdf_path: Path to the PDF file.
        page_num: Page number (1-indexed).
        dpi: Resolution used to rasterize the page; higher DPI generally
            gives better OCR accuracy at the cost of speed.

    Returns:
        The text recognized by Tesseract on the rendered page.
    """
    # The context manager guarantees the document is closed even when
    # rendering or OCR raises (the previous explicit close() was skipped
    # on any exception).
    with fitz.open(pdf_path) as doc:
        page = doc[page_num - 1]

        # Convert page to image
        pix = page.get_pixmap(dpi=dpi)
        img = Image.frombytes("RGB", (pix.width, pix.height), pix.samples)

        # OCR the image
        return pytesseract.image_to_string(img)


def get_page_text(pdf_path: str, page_num: int, ocr_threshold: int = 50) -> str:
    """
    Get text from a PDF page with automatic OCR fallback for scanned documents.

    Args:
        pdf_path: Path to the PDF file
        page_num: Page number (1-indexed)
        ocr_threshold: Minimum character count to consider text extraction successful

    Returns:
        Extracted text from the page (the stripped text layer, or the OCR
        output when the text layer has fewer than ``ocr_threshold`` characters)
    """
    # Use a context manager so the document is closed even if page lookup
    # or text extraction raises.
    with fitz.open(pdf_path) as doc:
        text = doc[page_num - 1].get_text("text").strip()

    # If little/no text found, assume it's scanned and use OCR
    if len(text) < ocr_threshold:
        print(f"Page {page_num}: Low text content detected ({len(text)} chars), using OCR...")
        text = extract_text_with_ocr(pdf_path, page_num)

    return text


def get_header_footer_spans(
    page: fitz.Page,
    header_fraction: float,
    footer_fraction: float,
) -> List[Tuple[str, Tuple[float, float, float, float]]]:
    """Collect the non-empty text spans lying in a page's header or footer band.

    The header band is the top ``header_fraction`` of the page height and the
    footer band is the bottom ``footer_fraction``. A span is kept when its top
    edge is at or above the header cutoff, or its bottom edge is at or below
    the footer cutoff.

    Returns:
        A list of ``(stripped_text, (x0, y0, x1, y1))`` tuples in the order
        the spans appear in the page's text dictionary.
    """
    bounds = page.rect
    page_height = bounds.height
    header_limit = bounds.y0 + header_fraction * page_height
    footer_limit = bounds.y1 - footer_fraction * page_height

    collected = []
    for block in page.get_text("dict").get("blocks", []):
        for line in block.get("lines", []):
            for span in line.get("spans", []):
                content = span.get("text", "").strip()
                if not content:
                    continue

                left, top, right, bottom = span["bbox"]
                # Skip spans that sit entirely in the body of the page
                if not (top <= header_limit or bottom >= footer_limit):
                    continue
                collected.append((content, (left, top, right, bottom)))

    return collected


def build_page_to_codes_map(
    codes_dict: Dict[str, List[Dict[str, Any]]]
) -> Dict[int, List[str]]:
    """Convert a code-to-pages mapping into a page-to-codes mapping.

    Args:
        codes_dict: Mapping of code -> list of occurrence dicts, each with a
            ``"page"`` key.

    Returns:
        A ``defaultdict(list)`` mapping page -> codes found on that page.
        Each code is listed at most once per page, even when it occurs
        several times there (for example in both header and footer), so
        callers do not emit duplicate entries.
    """
    page_to_codes: Dict[int, List[str]] = defaultdict(list)
    for code, occurrences in codes_dict.items():
        # Keys of codes_dict are unique, so a per-code set of visited pages
        # is enough to prevent duplicates within a page.
        pages_seen = set()
        for occ in occurrences:
            page = occ["page"]
            if page in pages_seen:
                continue
            pages_seen.add(page)
            page_to_codes[page].append(code)
    return page_to_codes

def pages_to_fix_from_initial_output(pdf_path: str, initial_codes: dict) -> list[int]:
    """Return the sorted page numbers that need a second look.

    A page qualifies when the initial extraction found no code on it, or
    found two or more distinct codes on it.
    """
    # Group the distinct codes seen on each page
    codes_by_page = defaultdict(set)
    for code, occurrences in initial_codes.items():
        for occurrence in occurrences:
            codes_by_page[int(occurrence["page"])].add(code)

    # Page numbers are 1-indexed and run up to the document length
    with fitz.open(pdf_path) as pdf:
        page_count = len(pdf)

    needs_review = set()
    for page_number in range(1, page_count + 1):
        if page_number not in codes_by_page:
            needs_review.add(page_number)
    for page_number, distinct_codes in codes_by_page.items():
        if len(distinct_codes) >= 2:
            needs_review.add(page_number)

    return sorted(needs_review)


In [None]:
# -------------------------------------------------------------------
# Stage 1: regex-only extraction
# -------------------------------------------------------------------

def extract_iso_codes_from_headers_footers(
    pdf_path: str,
    header_fraction: float = 0.12,
    footer_fraction: float = 0.12,
    ocr_threshold: int = 50,
) -> Dict[str, List[Dict[str, Any]]]:
    """Run the regex stage: find ISO form codes in each page's header/footer.

    For every page the header and footer spans are collected. When their
    combined (stripped) text is shorter than ``ocr_threshold`` characters the
    whole page is OCR'd and the regex is applied to the OCR text instead.

    Returns:
        Mapping of whitespace-normalized code -> list of
        ``{"page": int, "snippet": str}`` occurrences. For header/footer
        matches the snippet is the span text; for OCR matches it is up to
        50 characters of context on either side of the match.
    """
    found = defaultdict(list)
    pdf = fitz.open(pdf_path)

    try:
        for page_num, page in enumerate(pdf, start=1):
            edge_spans = get_header_footer_spans(
                page,
                header_fraction=header_fraction,
                footer_fraction=footer_fraction,
            )
            edge_text = "".join(span_text for span_text, _ in edge_spans)

            if len(edge_text.strip()) >= ocr_threshold:
                # Enough header/footer text: match inside each span separately
                for span_text, _bbox in edge_spans:
                    for hit in ISO_CODE_PATTERN.finditer(span_text):
                        normalized = " ".join(hit.group(0).split())
                        found[normalized].append({
                            "page": page_num,
                            "snippet": span_text,
                        })
                continue

            # Too little header/footer text: fall back to OCR of the full page
            print(f"Page {page_num}: Low header/footer text ({len(edge_text)} chars), using OCR...")
            ocr_text = extract_text_with_ocr(pdf_path, page_num)

            for hit in ISO_CODE_PATTERN.finditer(ocr_text):
                normalized = " ".join(hit.group(0).split())
                # Keep some surrounding context so the match can be verified
                context_start = max(0, hit.start() - 50)
                context_end = min(len(ocr_text), hit.end() + 50)
                found[normalized].append({
                    "page": page_num,
                    "snippet": ocr_text[context_start:context_end].strip(),
                })
    finally:
        pdf.close()

    return found


In [None]:
# -------------------------------------------------------------------
# Stage 2: LLM cleanup for missing or ambiguous pages only
# -------------------------------------------------------------------

# Instructions sent to the LLM ahead of each page's text. Adjacent string
# literals are concatenated with no separator, so every piece must end with
# its own space or newline.
base_prompt = (
    "You are given the text of a single page from a commercial insurance policy.\n"
    "Find all ISO (Insurance Services Office) form codes present on the page. A valid code looks like: CG 20 37 12 19. Sometimes, "
    "there are variants on this pattern (extra letters, different date formats, etc)\n"
    "Return all codes found, one per line. If no codes are found, return the string NONE.\n"
    "Do not extract policy numbers, which often start with CPP.\n"
    "If there are two very similar codes on the same page (for example, differing only by punctuation), assume they are the same.\n"
    "Use the context to determine if the code relates to an endorsement for this policy.\n"
    "Do not extract carrier or program codes (for example, A00000).\n"
    "The codes are often near the top or bottom of the page."
)

async def refine_iso_codes_with_llm(
    pdf_path: str,
    initial_codes: Dict[str, List[Dict[str, Any]]],
    model_name: str = "gpt-5-mini",
) -> Dict[str, List[Dict[str, Any]]]:
    """Use LLM to refine ISO code extraction on pages that are missing codes or have ambiguous results.

    Pages where the initial extraction found exactly one distinct code are
    kept as-is. Every other page (no code, or two or more distinct codes) is
    sent to the LLM, one request per page, all requests issued concurrently.

    Args:
        pdf_path: Path to the PDF file.
        initial_codes: Output of the regex stage: code -> list of
            ``{"page": ..., "snippet": ...}`` occurrences.
        model_name: Chat model name passed to ``ChatOpenAI``.

    Returns:
        A plain dict in the same code -> occurrences shape as
        ``initial_codes``, with at most one entry per (code, page). The
        snippet is taken from the first matching initial occurrence when one
        exists, otherwise it is the string ``"LLM extraction"``.
    """
    page_to_codes = build_page_to_codes_map(initial_codes)

    # Only the page count is needed here, so close the document right away
    # instead of holding the file handle open across the LLM awaits.
    with fitz.open(pdf_path) as doc:
        all_pages = set(range(1, len(doc) + 1))

    pages_to_fix = pages_to_fix_from_initial_output(pdf_path, initial_codes)

    print(f"Using LLM to check pages: {pages_to_fix}")

    llm = ChatOpenAI(
        model=model_name,
        temperature=0.0,  # deterministic
    )

    final_page_codes: Dict[int, List[str]] = {}

    # Pages already clean. dict.fromkeys drops repeats (same code found more
    # than once on the page) while preserving order, so the result does not
    # contain duplicate entries.
    for page in sorted(all_pages - set(pages_to_fix)):
        final_page_codes[page] = list(dict.fromkeys(page_to_codes[page]))

    async def process_page(page_num: int) -> Tuple[int, List[str]]:
        """Process a single page with LLM to extract ISO codes."""
        page_text = get_page_text(pdf_path, page_num)
        prompt = base_prompt + "\nPage text:\n" + page_text

        response = await llm.ainvoke(prompt)
        raw = getattr(response, "content", str(response))
        print(f"Page num: {page_num}, LLM raw output: {raw}")

        # One code per line; blank lines and the NONE sentinel carry no code.
        # Each code is normalized by collapsing runs of whitespace.
        stripped_lines = (line.strip() for line in raw.strip().split("\n"))
        codes = [
            " ".join(line.split())
            for line in stripped_lines
            if line and line.upper() != "NONE"
        ]
        return (page_num, list(dict.fromkeys(codes)))

    # Process all pages concurrently
    results = await asyncio.gather(*(process_page(page_num) for page_num in pages_to_fix))

    # Store results
    for page_num, codes in results:
        final_page_codes[page_num] = codes

    # Index the first snippet seen for each (page, code) so the conversion
    # below is a direct lookup rather than a scan of every occurrence.
    first_snippet: Dict[Tuple[Any, str], str] = {}
    for orig_code, occurrences in initial_codes.items():
        for occ in occurrences:
            first_snippet.setdefault((occ["page"], orig_code), occ.get("snippet", ""))

    # Convert from page->codes format to code->pages format (same as initial_codes)
    result = defaultdict(list)

    for page_num, codes in sorted(final_page_codes.items()):
        for code in codes:
            result[code].append({
                "page": page_num,
                # Codes the regex stage did not find (or found with an empty
                # snippet) are labelled as coming from the LLM
                "snippet": first_snippet.get((page_num, code)) or "LLM extraction",
            })

    return dict(result)


# -------------------------------------------------------------------
# Example usage
# -------------------------------------------------------------------
# pdf_path = "your_policy.pdf"
# initial = extract_iso_codes_from_headers_footers(pdf_path)
# refined = await refine_iso_codes_with_llm(pdf_path, initial)  # async: must be awaited
# refined


In [None]:
import time

# Example usage: run Stage 1 (regex extraction) and report how long it took
pdf_path = "examples/utica-commercial-package-policy.pdf"

# perf_counter is a monotonic, high-resolution clock meant for measuring
# intervals; time.time() can jump if the system clock is adjusted.
start_time = time.perf_counter()
initial = extract_iso_codes_from_headers_footers(pdf_path)
end_time = time.perf_counter()
elapsed_time = end_time - start_time

print(f"Regex extraction took {elapsed_time:.2f} seconds, gave {len(initial)} codes")
initial

In [None]:

start_time = time.time()
result = await refine_iso_codes_with_llm(pdf_path, initial, model_name="gpt-5-mini") 
end_time = time.time()
elapsed_time = end_time - start_time

print(f"LLM refinement took {elapsed_time:.2f} seconds, gave {len(result)} codes")

result

In [None]:
# If the results look good and contain all the correct codes, they can be stored as ground truth.
# The write is left commented out so that results are never stored without being checked first;
# uncomment it deliberately after reviewing `result`.
# NOTE(review): the example above loads "examples/utica-commercial-package-policy.pdf" while this
# path names the michigan-hospitality ground truth - confirm the target file before saving.

import json
# with open("examples/ground_truth/michigan-hospitality-liability-forms.json", "w") as f:
#     json.dump(result, f, indent=2)

In [None]:
# Load the stored ground-truth JSON so it can be inspected or compared against new results
import json

with open("examples/ground_truth/michigan-hospitality-liability-forms.json", "r") as gt_file:
    ground_truth = json.load(gt_file)

print(f"Ground truth has {len(ground_truth)} codes")


In [None]:

# Compare keys between ground_truth and result (tolerant to spaces and dashes)
def normalize_code(code: str) -> str:
    """Return *code* with every space and dash removed, for separator-insensitive comparison."""
    return "".join(ch for ch in code if ch not in (" ", "-"))

# Map each normalized code back to its original spelling so differences can
# be reported in the form they were stored or extracted in
gt_normalized = {normalize_code(k): k for k in ground_truth}
result_normalized = {normalize_code(k): k for k in result}

gt_norm_keys = set(gt_normalized)
result_norm_keys = set(result_normalized)

# Set differences give the codes present on one side only
missed_norm_keys = gt_norm_keys - result_norm_keys
extra_norm_keys = result_norm_keys - gt_norm_keys

print(f"Missed codes (in ground_truth but not in result): {len(missed_norm_keys)}")
for norm_key in sorted(missed_norm_keys):
    print(f"  - {gt_normalized[norm_key]}")

print(f"\nExtra codes (in result but not in ground_truth): {len(extra_norm_keys)}")
for norm_key in sorted(extra_norm_keys):
    print(f"  - {result_normalized[norm_key]}")

if not missed_norm_keys and not extra_norm_keys:
    # The check mark was previously stored as mis-decoded UTF-8 bytes
    print("\n✓ Perfect match! All codes match between ground_truth and result.")

