# OCR Pipeline for Handwritten Document PII Extraction

This notebook implements an end-to-end pipeline for:
1. **Pre-processing** - Image enhancement (deskew, denoise, binarization)
2. **OCR** - Text extraction from handwritten documents
3. **Text Cleaning** - Post-processing of extracted text
4. **PII Detection** - Identifying personally identifiable information (PII)
5. **Image Redaction** (Optional) - Masking detected PII in the pre-processed image

## 1. Install Dependencies

In [None]:
# Install required packages
!pip install opencv-python numpy pillow pytesseract easyocr spacy regex
!python -m spacy download en_core_web_sm

## 2. Import Libraries

In [None]:
import cv2
import numpy as np
from PIL import Image, ImageDraw
import pytesseract
import easyocr
import re
import spacy
from dataclasses import dataclass
from typing import List, Dict, Tuple, Optional
import os
import json
from datetime import datetime

# Load the spaCy English model used for NER, downloading it once if it is
# not installed yet.
try:
    nlp = spacy.load("en_core_web_sm")
except OSError:
    # spacy.load raises OSError when the model package cannot be found.
    import subprocess
    import sys

    # Run the download with the interpreter executing this notebook so the
    # model lands in the active environment, and fail loudly (check=True)
    # instead of continuing to a second, confusing load error.
    subprocess.run(
        [sys.executable, "-m", "spacy", "download", "en_core_web_sm"],
        check=True,
    )
    nlp = spacy.load("en_core_web_sm")

print("All libraries loaded successfully!")

## 3. Configuration

In [None]:
@dataclass
class PipelineConfig:
    """Configuration for the OCR Pipeline - Optimized for handwritten medical documents.

    NOTE(review): ``min_confidence`` and the ``detect_*`` switches are declared
    here but no code in this notebook reads them yet; changing them currently
    has no effect.
    """
    # Pre-processing settings
    resize_width: int = 2500  # Larger for better OCR; wider images are downscaled to this width
    denoise_strength: int = 8  # Reduced to preserve handwriting details

    # OCR settings
    ocr_engine: str = "easyocr"  # Best for handwriting; "tesseract" and "both" are also handled
    tesseract_config: str = "--oem 3 --psm 6"
    easyocr_languages: Optional[List[str]] = None  # None is replaced by ["en"] in __post_init__
    min_confidence: float = 0.25  # Lower threshold to capture more text

    # PII Detection settings
    detect_names: bool = True
    detect_dates: bool = True
    detect_phone: bool = True
    detect_ids: bool = True
    detect_age: bool = True
    detect_address: bool = True

    def __post_init__(self):
        # A list default cannot be shared safely between instances, so None is
        # used as the sentinel and every instance gets its own fresh list.
        if self.easyocr_languages is None:
            self.easyocr_languages = ["en"]

# Default configuration
config = PipelineConfig()
print(f"Pipeline configured with OCR engine: {config.ocr_engine}")

## 4. Image Pre-processing Module

In [None]:
class ImagePreprocessor:
    """
    Enhanced image pre-processing optimized for:
    - Slightly tilted images (advanced deskew)
    - Different handwriting styles
    - Doctor/clinic-style notes and forms

    ``preprocess`` is the entry point; the other public methods are the
    individual steps it chains together.
    """

    # Parameters for ``binarize``: neighbourhood size (must be odd) and the
    # constant subtracted from the locally weighted mean.
    THRESHOLD_BLOCK_SIZE = 31
    THRESHOLD_OFFSET = 15

    def __init__(self, config: "PipelineConfig"):
        self.config = config

    def load_image(self, image_path: str) -> np.ndarray:
        """Load image from file path.

        Raises:
            ValueError: If the file cannot be read (``cv2.imread`` returns
                ``None`` rather than raising).
        """
        image = cv2.imread(image_path)
        if image is None:
            raise ValueError(f"Could not load image from {image_path}")
        return image

    def resize_image(self, image: np.ndarray) -> np.ndarray:
        """Downscale images wider than ``config.resize_width``, keeping aspect ratio.

        Narrower images are returned unchanged (they are never upscaled).
        """
        height, width = image.shape[:2]
        if width > self.config.resize_width:
            ratio = self.config.resize_width / width
            new_height = int(height * ratio)
            image = cv2.resize(image, (self.config.resize_width, new_height),
                               interpolation=cv2.INTER_LANCZOS4)
        return image

    def convert_to_grayscale(self, image: np.ndarray) -> np.ndarray:
        """Convert a 3-dimensional (BGR) image to grayscale; pass 2-D input through."""
        if len(image.shape) == 3:
            return cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        return image

    def remove_shadows(self, image: np.ndarray) -> np.ndarray:
        """Remove shadows from image - important for phone camera photos"""
        # Dilation followed by a large median blur wipes out the thin strokes
        # and leaves an estimate of the (unevenly lit) background.
        dilated = cv2.dilate(image, np.ones((7, 7), np.uint8))
        bg = cv2.medianBlur(dilated, 21)
        # Subtracting the background keeps only what differs from it (the ink),
        # inverted back so that text is dark on a light page.
        diff = 255 - cv2.absdiff(image, bg)
        normalized = cv2.normalize(diff, None, 0, 255, cv2.NORM_MINMAX)
        return normalized

    def deskew_hough(self, image: np.ndarray) -> Tuple[np.ndarray, float]:
        """Deskew using Hough Line Transform - good for documents with lines.

        Returns:
            ``(image, angle)``; the input image and ``0.0`` when no usable
            lines are found or the median tilt is at most 0.5 degrees.
        """
        edges = cv2.Canny(image, 50, 150, apertureSize=3)
        lines = cv2.HoughLines(edges, 1, np.pi/180, 200)

        if lines is not None:
            angles = []
            for line in lines:
                rho, theta = line[0]
                # theta is the angle of the line's normal, so a horizontal
                # line has theta = 90 degrees; shift so horizontal maps to 0.
                angle = (theta * 180 / np.pi) - 90
                # Ignore near-vertical lines (form borders, margins).
                if -45 < angle < 45:
                    angles.append(angle)

            if angles:
                # Median is robust against a few stray lines.
                median_angle = np.median(angles)
                if abs(median_angle) > 0.5:
                    return self._rotate_image(image, median_angle), median_angle
        return image, 0.0

    def deskew_minarea(self, image: np.ndarray) -> Tuple[np.ndarray, float]:
        """Deskew using minimum area rectangle - good for handwritten text.

        Returns:
            ``(image, angle)``; the input image and ``0.0`` when no contours
            are found or the measured tilt is at most 0.5 degrees.
        """
        # Inverted Otsu threshold: ink becomes foreground (white).
        _, binary = cv2.threshold(image, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
        contours, _ = cv2.findContours(binary, cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)

        if not contours:
            return image, 0.0

        # Fit one rotated rectangle around all foreground points.
        all_points = np.vstack(contours)
        rect = cv2.minAreaRect(all_points)
        angle = rect[-1]

        # Fold the reported angle into [-45, 45] so the smaller of the two
        # equivalent corrections is applied.
        if angle < -45:
            angle = 90 + angle
        elif angle > 45:
            angle = angle - 90

        if abs(angle) > 0.5:
            return self._rotate_image(image, angle), angle
        return image, 0.0

    def _rotate_image(self, image: np.ndarray, angle: float) -> np.ndarray:
        """Rotate image about its centre by ``angle`` degrees, keeping its size.

        Rotations smaller than 0.5 degrees are skipped and the input is
        returned as-is.
        """
        if abs(angle) < 0.5:
            return image
        (h, w) = image.shape[:2]
        center = (w // 2, h // 2)
        M = cv2.getRotationMatrix2D(center, angle, 1.0)
        # Replicating the border avoids black wedges in the corners.
        rotated = cv2.warpAffine(image, M, (w, h),
                                 flags=cv2.INTER_CUBIC,
                                 borderMode=cv2.BORDER_REPLICATE)
        return rotated

    def advanced_deskew(self, image: np.ndarray) -> np.ndarray:
        """Advanced deskew combining multiple methods for tilted images.

        The Hough estimate is used when it lies strictly between 0.5 and 30
        degrees; otherwise the min-area-rectangle estimate is tried. If
        neither yields a plausible angle the input is returned unchanged.
        """
        best_result = image
        best_angle = 0

        # Method 1: Hough Transform
        try:
            result1, angle1 = self.deskew_hough(image.copy())
            if 0.5 < abs(angle1) < 30:
                best_result = result1
                best_angle = angle1
        except Exception:
            # Deskewing is best-effort: a failure here must not abort the
            # pipeline, the next method (or the untouched image) is used.
            pass

        # Method 2: MinArea Rectangle (only when Hough found little or no tilt)
        if abs(best_angle) < 1:
            try:
                result2, angle2 = self.deskew_minarea(image.copy())
                if abs(angle2) > abs(best_angle) and abs(angle2) < 30:
                    best_result = result2
            except Exception:
                # Best-effort, see above.
                pass

        return best_result

    def enhance_contrast(self, image: np.ndarray) -> np.ndarray:
        """Enhance image contrast using CLAHE"""
        clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
        return clahe.apply(image)

    def denoise(self, image: np.ndarray) -> np.ndarray:
        """Remove noise with non-local means, strength from ``config.denoise_strength``."""
        return cv2.fastNlMeansDenoising(image, None,
                                        self.config.denoise_strength, 7, 21)

    def enhance_handwriting(self, image: np.ndarray) -> np.ndarray:
        """Special enhancement for different handwriting styles"""
        # Bilateral filter preserves edges while removing noise
        filtered = cv2.bilateralFilter(image, 9, 75, 75)

        # Sharpen to make handwriting clearer
        kernel = np.array([[-1, -1, -1],
                           [-1, 9, -1],
                           [-1, -1, -1]])
        sharpened = cv2.filter2D(filtered, -1, kernel)

        return cv2.normalize(sharpened, None, 0, 255, cv2.NORM_MINMAX)

    def binarize(self, image: np.ndarray) -> np.ndarray:
        """Binarize a grayscale image with a Gaussian adaptive threshold.

        A local threshold is used because the lighting of phone photos
        varies across the page.
        """
        return cv2.adaptiveThreshold(image, 255,
                                     cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                                     cv2.THRESH_BINARY,
                                     self.THRESHOLD_BLOCK_SIZE,
                                     self.THRESHOLD_OFFSET)

    def preprocess(self, image_path: str,
                   apply_deskew: bool = True,
                   apply_denoise: bool = True,
                   apply_threshold: bool = False) -> Tuple[np.ndarray, np.ndarray]:
        """
        Full pre-processing pipeline optimized for:
        - Slightly tilted images
        - Different handwriting styles
        - Medical document forms

        Args:
            image_path: Path of the image file to load.
            apply_deskew: Run ``advanced_deskew`` on the grayscale image.
            apply_denoise: Run ``denoise`` after the enhancement steps.
            apply_threshold: Binarize the result as the final step
                (off by default).

        Returns:
            ``(original, processed)``: the image exactly as loaded, and the
            enhanced grayscale image. Only the processed image is resized, so
            the two can differ in size.

        Raises:
            ValueError: If the image cannot be loaded.
        """
        # Load and resize
        original = self.load_image(image_path)
        image = self.resize_image(original.copy())

        # Convert to grayscale
        gray = self.convert_to_grayscale(image)

        # Remove shadows (important for phone photos)
        gray = self.remove_shadows(gray)

        # Deskew tilted images
        if apply_deskew:
            gray = self.advanced_deskew(gray)

        # Enhance contrast
        gray = self.enhance_contrast(gray)

        # Enhance for handwriting
        gray = self.enhance_handwriting(gray)

        # Denoise
        if apply_denoise:
            gray = self.denoise(gray)

        # Optional binarization, last so it sees the cleaned-up image
        if apply_threshold:
            gray = self.binarize(gray)

        return original, gray

# Stand-alone preprocessor built from the module-level default config.
# NOTE(review): OCRPIIPipeline constructs its own ImagePreprocessor, so this
# instance looks intended for interactive, step-by-step use - confirm.
preprocessor = ImagePreprocessor(config)
print("Image preprocessor initialized!")

## 5. OCR Engine Module

In [None]:
class OCREngine:
    """Handles OCR text extraction with Tesseract, EasyOCR, or both.

    Every extractor returns a dict with the keys ``engine``, ``text`` and
    ``boxes``. Each box is a dict holding ``text``, ``confidence`` and
    ``bbox`` as ``(x, y, width, height)``.
    """

    SUPPORTED_ENGINES = ("tesseract", "easyocr", "both")

    def __init__(self, config: "PipelineConfig"):
        self.config = config
        self.easyocr_reader = None

        # Build the EasyOCR reader up front only when the configured engine
        # needs it; otherwise it is created on first use.
        if config.ocr_engine in ["easyocr", "both"]:
            print("Initializing EasyOCR... (this may take a moment)")
            self.easyocr_reader = easyocr.Reader(config.easyocr_languages, gpu=False)
            print("EasyOCR initialized!")

    def _get_easyocr_reader(self):
        """Return the EasyOCR reader, creating it if it was not built in ``__init__``."""
        if self.easyocr_reader is None:
            self.easyocr_reader = easyocr.Reader(self.config.easyocr_languages, gpu=False)
        return self.easyocr_reader

    @staticmethod
    def _parse_confidence(raw) -> float:
        """Convert a Tesseract confidence cell to float; unparsable cells become -1."""
        # The value may arrive as an int, a float or a string such as
        # "95.238159", which int() would reject.
        try:
            return float(raw)
        except (TypeError, ValueError):
            return -1.0

    def extract_with_tesseract(self, image: np.ndarray) -> Dict:
        """Extract text using Tesseract OCR.

        Boxes are kept only for entries with a positive confidence and
        non-blank text.
        """
        # Get detailed data
        data = pytesseract.image_to_data(image,
                                         config=self.config.tesseract_config,
                                         output_type=pytesseract.Output.DICT)

        # Get plain text
        text = pytesseract.image_to_string(image,
                                           config=self.config.tesseract_config)

        # Extract bounding boxes
        boxes = []
        for i, word in enumerate(data['text']):
            confidence = self._parse_confidence(data['conf'][i])
            # Skip entries without recognised text: they cannot be matched
            # against PII later on.
            if confidence <= 0 or not str(word).strip():
                continue
            boxes.append({
                'text': word,
                'confidence': confidence,
                'bbox': (data['left'][i], data['top'][i],
                         data['width'][i], data['height'][i])
            })

        return {
            'engine': 'tesseract',
            'text': text,
            'boxes': boxes
        }

    def extract_with_easyocr(self, image: np.ndarray) -> Dict:
        """Extract text using EasyOCR.

        Each detected region becomes one line of ``text`` and one box. The
        reader's confidence is multiplied by 100 and its corner points are
        kept under ``polygon``.
        """
        results = self._get_easyocr_reader().readtext(image)

        # Compile text
        text_parts = []
        boxes = []

        for (bbox, text, confidence) in results:
            text_parts.append(text)
            # Convert the corner points to an axis-aligned (x, y, w, h) box
            x_coords = [p[0] for p in bbox]
            y_coords = [p[1] for p in bbox]
            left, top = min(x_coords), min(y_coords)
            boxes.append({
                'text': text,
                'confidence': confidence * 100,
                'bbox': (int(left), int(top),
                         int(max(x_coords) - left),
                         int(max(y_coords) - top)),
                'polygon': bbox
            })

        return {
            'engine': 'easyocr',
            'text': '\n'.join(text_parts),
            'boxes': boxes
        }

    def extract(self, image: np.ndarray) -> Dict:
        """Extract text using configured OCR engine(s).

        Raises:
            ValueError: If ``config.ocr_engine`` is not one of
                ``SUPPORTED_ENGINES``.
        """
        engine = self.config.ocr_engine

        if engine == "tesseract":
            return self.extract_with_tesseract(image)
        if engine == "easyocr":
            return self.extract_with_easyocr(image)
        if engine == "both":
            tesseract_result = self.extract_with_tesseract(image)
            easyocr_result = self.extract_with_easyocr(image)

            # Combine results (prefer EasyOCR for handwriting)
            return {
                'engine': 'combined',
                'tesseract': tesseract_result,
                'easyocr': easyocr_result,
                'text': easyocr_result['text'],  # Primary text from EasyOCR
                'boxes': easyocr_result['boxes']
            }

        # An empty result would only fail later with an opaque KeyError.
        raise ValueError(
            f"Unsupported ocr_engine {engine!r}; expected one of {self.SUPPORTED_ENGINES}"
        )

# Stand-alone OCR engine built from the module-level default config; with the
# default "easyocr" engine this constructs an EasyOCR reader right here.
# NOTE(review): OCRPIIPipeline creates its own OCREngine as well, so the
# reader may end up being built twice - confirm this instance is still needed.
ocr_engine = OCREngine(config)
print("OCR engine initialized!")

## 6. Text Cleaning Module

In [None]:
class TextCleaner:
    """Handles text post-processing and cleaning of raw OCR output."""

    def __init__(self):
        # Common OCR errors in medical documents.
        # NOTE(review): this table is not applied by any method of this
        # class; the digit/letter swaps are context-dependent.
        self.corrections = {
            '|': 'I',
            '0': 'O',  # Context-dependent
            '1': 'l',  # Context-dependent
            '\\': '',
            '`': "'",
        }

        # Medical abbreviations to preserve (likewise not referenced by the
        # cleaning steps below).
        self.medical_abbrevs = [
            'mg', 'ml', 'IV', 'IM', 'PO', 'BD', 'TID', 'QID',
            'PRN', 'STAT', 'OD', 'BP', 'HR', 'RR', 'SPO2',
            'Tab', 'Cap', 'Inj', 'Syp', 'Dr', 'IPD', 'OPD', 'UHID'
        ]

    def remove_extra_whitespace(self, text: str) -> str:
        """Remove extra whitespace while preserving structure"""
        # Replace multiple spaces with single space
        text = re.sub(r' +', ' ', text)
        # Replace multiple newlines with double newline
        text = re.sub(r'\n{3,}', '\n\n', text)
        return text.strip()

    def remove_noise_characters(self, text: str) -> str:
        """Remove common noise characters from OCR.

        Note that lines with at most one non-blank character are dropped,
        which also removes empty lines.
        """
        # Remove isolated special characters
        text = re.sub(r'(?<![\w])[\_\-\~\^\*\#\@]+(?![\w])', '', text)
        # Remove very short lines (likely noise)
        lines = text.split('\n')
        cleaned_lines = [line for line in lines if len(line.strip()) > 1]
        return '\n'.join(cleaned_lines)

    def fix_common_ocr_errors(self, text: str) -> str:
        """Fix common OCR misrecognitions of form labels ('c' read for 'e')."""
        # Matching ignores case; the replacement is always title-cased.
        text = re.sub(r'\bPaticnt\b', 'Patient', text, flags=re.IGNORECASE)
        text = re.sub(r'\bNamc\b', 'Name', text, flags=re.IGNORECASE)
        text = re.sub(r'\bAgc\b', 'Age', text, flags=re.IGNORECASE)
        text = re.sub(r'\bScx\b', 'Sex', text, flags=re.IGNORECASE)
        text = re.sub(r'\bDatc\b', 'Date', text, flags=re.IGNORECASE)
        return text

    def normalize_dates(self, text: str) -> str:
        """Normalize date formats to ``D/M/Y`` with no spaces around separators.

        Digits are kept as written; only the separators (``/``, ``-``, ``.``)
        and the whitespace around them are rewritten.
        """
        # Common date patterns: DD/MM/YY, DD-MM-YYYY, etc.
        # The digit lookarounds keep the pattern from matching inside longer
        # numbers, so IDs such as 123-45-6789 or ISO dates such as 2024-01-15
        # are not half-rewritten.
        text = re.sub(
            r'(?<!\d)(\d{1,2})\s*[/\-\.]\s*(\d{1,2})\s*[/\-\.]\s*(\d{2,4})(?!\d)',
            r'\1/\2/\3', text)
        return text

    def clean(self, text: str) -> str:
        """Apply full text cleaning pipeline"""
        text = self.remove_extra_whitespace(text)
        text = self.remove_noise_characters(text)
        text = self.fix_common_ocr_errors(text)
        text = self.normalize_dates(text)
        return text

# Initialize text cleaner
text_cleaner = TextCleaner()
print("Text cleaner initialized!")

## 7. PII Detection Module

In [None]:
@dataclass
class PIIEntity:
    """Represents a detected PII entity.

    ``start`` and ``end`` are character offsets into the analysed text.
    """
    type: str
    value: str
    start: int
    end: int
    confidence: float

    def to_dict(self):
        """Return the entity as a plain dict."""
        return {
            'type': self.type,
            'value': self.value,
            'start': self.start,
            'end': self.end,
            'confidence': self.confidence
        }


class PIIDetector:
    """
    PII Detection optimized for medical/hospital documents.

    Detects:
    - Patient Name
    - Age
    - Sex/Gender
    - UHID (Unique Health ID)
    - IPD No (Inpatient Department Number)
    - Bed No
    - Dates
    - Doctor Names
    - Phone/Mobile Numbers
    - Registration Numbers

    Regex matches and spaCy NER results are combined; where they overlap the
    higher-confidence entity wins.

    NOTE(review): the ``detect_*`` switches on the config are stored but not
    consulted here; every detector always runs.
    """

    # PII types whose legitimate values can be one character long
    # ("M"/"F", ages 0-9); the general noise filter would discard them.
    _SINGLE_CHAR_TYPES = frozenset({'AGE', 'GENDER'})

    def __init__(self, config: "PipelineConfig"):
        self.config = config
        self.nlp = nlp

        # Medical document PII patterns
        self.patterns = {
            'PHONE': [
                r'\b(?:\+91[\-\s]?)?[6-9]\d{9}\b',  # Indian mobile
                r'\b\d{3}[\-\s]?\d{3}[\-\s]?\d{4}\b',  # General format
                r'\b(?:Mobile|Phone|Tel|Contact)[:\s]*([\d\-\s\+]+)\b'
            ],
            'DATE': [
                r'\b\d{1,2}[/\-\.]\d{1,2}[/\-\.]\d{2,4}\b',  # DD/MM/YYYY
                r'\b\d{1,2}[\s\-](?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)[a-z]*[\s\-]\d{2,4}\b',
            ],
            'ID_NUMBER': [
                r'\b(?:UHID|IPD|OPD|MRN|Reg)[\s\.\-:No]*[:\s]*([A-Z0-9\-]+)\b',
                r'\b(?:Bed)[\s\.\-:No]*[:\s]*(\d+)\b',
            ],
            'AGE': [
                r'\b(?:Age)[\s:]*([\d]+)[\s]?(?:Y|yr|years?|yrs?)?\b',
                r'\b(\d{1,3})[\s]?(?:Y|yr|years|yrs)\b',
            ],
            'GENDER': [
                r'\b(?:Sex|Gender)[\s:]*([MF]|Male|Female)\b',
            ],
        }

        # Name patterns for medical documents
        self.name_patterns = [
            r'(?:Patient\s*Name|Pat\.?\s*Name)[:\s]+([A-Z][a-z]+(?:\s+[A-Z][a-z]+){1,3})',
            r'(?:Dr\.?|Doctor)\s+([A-Z][a-z]+(?:\s+[A-Z]\.?\s*[a-z]*)?)',
            r'(?:Consultant|Resident|Physician)[:\s]+([A-Z][a-z]+(?:\s+[A-Z][a-z]+){0,2})',
        ]

    def _is_noise(self, value: str, min_length: int = 2) -> bool:
        """Filter out OCR noise.

        Args:
            value: Candidate entity text.
            min_length: Stripped values shorter than this count as noise.

        Returns:
            True when the value is too short, is less than half
            alphanumeric, or is one of the known junk tokens.
        """
        v = value.strip()
        if len(v) < min_length:
            return True
        # Filter values with too many special characters
        alpha_count = sum(1 for c in v if c.isalnum())
        if alpha_count < len(v) * 0.5:
            return True
        # Filter common noise patterns
        if v in ['&', 'Mle', 'Jle', 'Bl', 'ug', 'cr', 'Ln', 'LU']:
            return True
        return False

    @staticmethod
    def _captured_span(match) -> Tuple[str, int, int]:
        """Return the stripped value of a match and its exact offsets.

        The first capture group is used when the pattern has one (the text
        after a label such as "Age:"); otherwise the whole match. Offsets are
        adjusted for stripped whitespace so ``text[start:end] == value``.
        """
        group = 1 if match.lastindex else 0
        raw = match.group(group)
        value = raw.strip()
        start = match.start(group) + (len(raw) - len(raw.lstrip()))
        return value, start, start + len(value)

    def detect_with_regex(self, text: str) -> List[PIIEntity]:
        """Detect PII using regex patterns (case-insensitive).

        Structured types get confidence 0.9, names 0.85.
        """
        entities = []

        for pii_type, patterns in self.patterns.items():
            # Single letters/digits are valid for gender and age, so the
            # length check is relaxed for those types only.
            min_length = 1 if pii_type in self._SINGLE_CHAR_TYPES else 2
            for pattern in patterns:
                for match in re.finditer(pattern, text, re.IGNORECASE):
                    value, start, end = self._captured_span(match)
                    if self._is_noise(value, min_length):
                        continue
                    entities.append(PIIEntity(
                        type=pii_type,
                        value=value,
                        start=start,
                        end=end,
                        confidence=0.9
                    ))

        # Detect names
        for pattern in self.name_patterns:
            for match in re.finditer(pattern, text, re.IGNORECASE):
                value, start, end = self._captured_span(match)
                if len(value) > 2 and not self._is_noise(value):
                    entities.append(PIIEntity(
                        type='NAME',
                        value=value,
                        start=start,
                        end=end,
                        confidence=0.85
                    ))

        return entities

    def detect_with_ner(self, text: str) -> List[PIIEntity]:
        """Detect PII using spaCy NER (confidence fixed at 0.75).

        Only PERSON, DATE, GPE, LOC and ORG entities longer than two
        characters are kept.
        """
        entities = []
        doc = self.nlp(text)

        type_mapping = {
            'PERSON': 'NAME',
            'DATE': 'DATE',
            'GPE': 'LOCATION',
            'LOC': 'LOCATION',
            'ORG': 'ORGANIZATION',
        }

        for ent in doc.ents:
            if ent.label_ in type_mapping:
                if len(ent.text) > 2 and not self._is_noise(ent.text):
                    entities.append(PIIEntity(
                        type=type_mapping[ent.label_],
                        value=ent.text,
                        start=ent.start_char,
                        end=ent.end_char,
                        confidence=0.75
                    ))

        return entities

    def merge_entities(self, entities: List[PIIEntity]) -> List[PIIEntity]:
        """Merge overlapping entities, preferring higher confidence"""
        if not entities:
            return []

        # Sort by start; on ties the most confident entity comes first.
        sorted_entities = sorted(entities, key=lambda x: (x.start, -x.confidence))
        merged = [sorted_entities[0]]

        for entity in sorted_entities[1:]:
            last = merged[-1]
            if entity.start < last.end:
                # Overlap: keep whichever of the two is more confident.
                if entity.confidence > last.confidence:
                    merged[-1] = entity
            else:
                merged.append(entity)

        return merged

    def detect(self, text: str) -> List[PIIEntity]:
        """Detect all PII in text.

        Returns:
            Merged regex and NER entities, with repeated
            ``(type, lower-cased value)`` pairs reported once.
        """
        regex_entities = self.detect_with_regex(text)
        ner_entities = self.detect_with_ner(text)

        all_entities = regex_entities + ner_entities
        merged_entities = self.merge_entities(all_entities)

        # Remove duplicates
        seen = set()
        unique_entities = []
        for entity in merged_entities:
            key = (entity.type, entity.value.lower())
            if key not in seen:
                seen.add(key)
                unique_entities.append(entity)

        return unique_entities

# Stand-alone PII detector built from the module-level default config.
# NOTE(review): OCRPIIPipeline constructs its own PIIDetector, so this
# instance looks intended for interactive, step-by-step use - confirm.
pii_detector = PIIDetector(config)
print("PII detector initialized!")

## 8. Image Redaction Module (Optional)

In [None]:
class ImageRedactor:
    """Redacts PII from images by painting filled rectangles over OCR boxes."""

    def __init__(self, redaction_color=(0, 0, 0)):
        self.redaction_color = redaction_color

    @staticmethod
    def _tokens(text: str) -> List[str]:
        """Split text into lower-cased word tokens, ignoring punctuation."""
        return re.findall(r'\w+', text.lower())

    def find_text_bboxes(self, ocr_boxes: List[Dict], pii_value: str) -> List[Tuple]:
        """Find every OCR bounding box that holds (part of) a PII value.

        Matching is done on word tokens, so a short value such as "M" does
        not match inside unrelated words and differing date separators do
        not prevent a match. A box matches when it contains the whole value
        as consecutive words, or when all of its words belong to the value
        (one box of a value split over several boxes). If nothing matches,
        boxes containing any value word longer than two characters as a
        substring are returned instead.

        Args:
            ocr_boxes: Box dicts with at least ``text`` and ``bbox`` keys.
            pii_value: The PII text to locate.

        Returns:
            Matching ``bbox`` values in OCR order; empty when nothing matches.
        """
        target = self._tokens(pii_value)
        if not target:
            return []

        target_words = set(target)
        span = len(target)
        matches = []

        for box in ocr_boxes:
            words = self._tokens(box['text'])
            if not words:
                # A box without text would otherwise match every value.
                continue
            contains_value = any(
                words[i:i + span] == target
                for i in range(len(words) - span + 1)
            )
            part_of_value = all(word in target_words for word in words)
            if contains_value or part_of_value:
                matches.append(box['bbox'])

        if matches:
            return matches

        # Try partial matching
        matched_indexes = set()
        for word in target:
            if len(word) > 2:  # Skip short words
                for index, box in enumerate(ocr_boxes):
                    if index not in matched_indexes and word in box['text'].lower():
                        matched_indexes.add(index)
                        matches.append(box['bbox'])

        return matches

    def find_text_bbox(self, ocr_boxes: List[Dict], pii_value: str) -> Optional[Tuple]:
        """Find bounding box for PII text in OCR results.

        Returns:
            The first box from ``find_text_bboxes``, or None when nothing
            matches.
        """
        matches = self.find_text_bboxes(ocr_boxes, pii_value)
        return matches[0] if matches else None

    def redact_image(self, image: np.ndarray,
                     ocr_result: Dict,
                     pii_entities: "List[PIIEntity]") -> np.ndarray:
        """Redact PII from image.

        Args:
            image: Grayscale (2-D) or BGR image the OCR boxes refer to.
            ocr_result: OCR output; only its ``boxes`` list is used.
            pii_entities: Entities whose ``value`` should be masked.

        Returns:
            A BGR image with every matching box covered, padded by 5 pixels.
        """
        # Convert to PIL for drawing
        if len(image.shape) == 2:  # Grayscale
            pil_image = Image.fromarray(image).convert('RGB')
        else:
            pil_image = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))

        draw = ImageDraw.Draw(pil_image)

        ocr_boxes = ocr_result.get('boxes', [])
        padding = 5

        for entity in pii_entities:
            # Cover all boxes of the value, not just the first: a name is
            # often split into one box per word.
            for x, y, w, h in self.find_text_bboxes(ocr_boxes, entity.value):
                draw.rectangle(
                    [x - padding, y - padding, x + w + padding, y + h + padding],
                    fill=self.redaction_color
                )

        # Convert back to OpenCV format
        redacted = cv2.cvtColor(np.array(pil_image), cv2.COLOR_RGB2BGR)
        return redacted

# Initialize redactor
redactor = ImageRedactor()
print("Image redactor initialized!")

## 9. Main Pipeline

In [None]:
class OCRPIIPipeline:
    """Runs pre-processing, OCR, text cleaning, PII detection and optional
    redaction as one sequence, reporting progress on stdout."""

    def __init__(self, config: PipelineConfig = None):
        # Fall back to the default configuration when none is supplied
        self.config = config or PipelineConfig()
        self.preprocessor = ImagePreprocessor(self.config)
        self.ocr_engine = OCREngine(self.config)
        self.text_cleaner = TextCleaner()
        self.pii_detector = PIIDetector(self.config)
        self.redactor = ImageRedactor()

    def process(self, image_path: str,
                generate_redacted: bool = True) -> Dict:
        """Run one image through all stages and return the collected results.

        The returned dict holds the per-stage status under ``stages``, the
        raw and cleaned text, the PII entities (as dicts) with a by-type
        summary, and the original/processed images; ``redacted_image`` is
        present only when ``generate_redacted`` is true.
        """
        banner = '=' * 60
        print(f"\n{banner}")
        print(f"Processing: {image_path}")
        print(f"{banner}")

        stages = {}
        results = {
            'input_file': image_path,
            'timestamp': datetime.now().isoformat(),
            'stages': stages
        }

        # --- Stage 1: pre-processing ---
        print("\n[1/5] Pre-processing image...")
        original, processed = self.preprocessor.preprocess(image_path)
        stages['preprocessing'] = {
            'status': 'completed',
            'original_shape': original.shape,
            'processed_shape': processed.shape
        }
        print(f"  - Original size: {original.shape}")
        print(f"  - Processed size: {processed.shape}")

        # --- Stage 2: OCR on the processed image ---
        print("\n[2/5] Extracting text with OCR...")
        ocr_result = self.ocr_engine.extract(processed)
        engine_name = ocr_result['engine']
        raw_text = ocr_result['text']
        region_count = len(ocr_result.get('boxes', []))
        stages['ocr'] = {
            'status': 'completed',
            'engine': engine_name,
            'text_length': len(raw_text),
            'boxes_count': region_count
        }
        print(f"  - Engine: {engine_name}")
        print(f"  - Extracted {len(raw_text)} characters")
        print(f"  - Found {region_count} text regions")

        # --- Stage 3: text cleaning ---
        print("\n[3/5] Cleaning extracted text...")
        cleaned_text = self.text_cleaner.clean(raw_text)
        stages['text_cleaning'] = {
            'status': 'completed',
            'raw_length': len(raw_text),
            'cleaned_length': len(cleaned_text)
        }
        results['raw_text'] = raw_text
        results['cleaned_text'] = cleaned_text
        print(f"  - Raw text length: {len(raw_text)}")
        print(f"  - Cleaned text length: {len(cleaned_text)}")

        # --- Stage 4: PII detection on the cleaned text ---
        print("\n[4/5] Detecting PII...")
        pii_entities = self.pii_detector.detect(cleaned_text)
        entity_count = len(pii_entities)
        stages['pii_detection'] = {
            'status': 'completed',
            'entities_found': entity_count
        }
        results['pii_entities'] = [entity.to_dict() for entity in pii_entities]
        print(f"  - Found {entity_count} PII entities")

        # Collect the detected values per PII type, in detection order
        pii_by_type = {}
        for entity in pii_entities:
            pii_by_type.setdefault(entity.type, []).append(entity.value)
        results['pii_summary'] = pii_by_type

        for pii_type, values in pii_by_type.items():
            print(f"    - {pii_type}: {values}")

        # --- Stage 5: optional redaction of the processed image ---
        if not generate_redacted:
            stages['redaction'] = {'status': 'skipped'}
        else:
            print("\n[5/5] Generating redacted image...")
            results['redacted_image'] = self.redactor.redact_image(
                processed, ocr_result, pii_entities
            )
            stages['redaction'] = {'status': 'completed'}
            print("  - Redacted image generated")

        # Keep both images so callers can visualize or save them
        results['original_image'] = original
        results['processed_image'] = processed

        print(f"\n{banner}")
        print("Processing completed!")
        print(f"{banner}")

        return results

    def process_batch(self, image_paths: List[str],
                      generate_redacted: bool = True) -> List[Dict]:
        """Process several images, one result dict per path.

        A failing image does not stop the batch: its entry holds only
        ``input_file`` and ``error``.
        """
        outcomes = []
        for path in image_paths:
            try:
                outcomes.append(self.process(path, generate_redacted))
            except Exception as e:
                message = str(e)
                print(f"Error processing {path}: {message}")
                outcomes.append({
                    'input_file': path,
                    'error': message
                })
        return outcomes

# Build the pipeline used by the sample-processing cells below from the
# module-level default config; this constructs its own preprocessor, OCR
# engine, text cleaner, PII detector and redactor.
pipeline = OCRPIIPipeline(config)
print("\nOCR PII Pipeline initialized and ready!")

## 10. Visualization Utilities

In [None]:
import matplotlib.pyplot as plt

def visualize_results(result: Dict, figsize=(20, 10)):
    """Show the original, processed and redacted images side by side.

    The third panel shows a "Redaction Skipped" notice when the result has
    no ``redacted_image``.
    """
    _, (ax_original, ax_processed, ax_redacted) = plt.subplots(1, 3, figsize=figsize)

    # Left panel: original image (stored as BGR, shown as RGB)
    original_rgb = cv2.cvtColor(result['original_image'], cv2.COLOR_BGR2RGB)
    ax_original.imshow(original_rgb)
    ax_original.set_title('Original Image')
    ax_original.axis('off')

    # Middle panel: processed grayscale image
    ax_processed.imshow(result['processed_image'], cmap='gray')
    ax_processed.set_title('Processed Image')
    ax_processed.axis('off')

    # Right panel: redacted image, or a placeholder
    if 'redacted_image' not in result:
        ax_redacted.text(0.5, 0.5, 'Redaction\nSkipped',
                         ha='center', va='center', fontsize=20)
        ax_redacted.set_title('Redacted Image (Skipped)')
    else:
        redacted_rgb = cv2.cvtColor(result['redacted_image'], cv2.COLOR_BGR2RGB)
        ax_redacted.imshow(redacted_rgb)
        ax_redacted.set_title('Redacted Image')
    ax_redacted.axis('off')

    plt.tight_layout()
    plt.show()

def print_pii_report(result: Dict):
    """Print detailed PII report.

    Lists the values in ``result['pii_summary']`` grouped by PII type, or
    states that nothing was detected when the summary is missing or empty.

    Args:
        result: Pipeline result; ``input_file`` and ``timestamp`` are
            required, ``pii_summary`` is optional.
    """
    print("\n" + "="*60)
    print("PII DETECTION REPORT")
    print("="*60)
    print(f"File: {result['input_file']}")
    print(f"Timestamp: {result['timestamp']}")
    print("-"*60)

    # The pipeline always stores a summary, which is an empty dict when no
    # PII was found; test for content rather than for the key.
    pii_summary = result.get('pii_summary')
    if pii_summary:
        print("\nDetected PII Entities:")
        for pii_type, values in pii_summary.items():
            print(f"\n  [{pii_type}]")
            for value in values:
                print(f"    - {value}")
    else:
        print("\nNo PII entities detected.")

    print("\n" + "="*60)

def _write_image(path: str, image) -> None:
    """Write *image* to *path* with OpenCV, raising if the write is rejected."""
    # cv2.imwrite reports failure through its return value, so an unchecked
    # call can silently produce no file.
    if not cv2.imwrite(path, image):
        raise OSError(f"Could not write image to {path}")


def save_results(result: Dict, output_dir: str):
    """Save results to files - clean JSON format for benchmarking.

    Written to ``output_dir`` (created if missing), all prefixed with the
    input file's base name: ``_processed.jpg``, ``_redacted.jpg`` (only when
    a redacted image exists), ``_visualization.png`` and ``_report.json``.

    Args:
        result: A successful result dict from the pipeline.
        output_dir: Destination directory.

    Raises:
        OSError: If an image cannot be written.
    """
    os.makedirs(output_dir, exist_ok=True)

    base_name = os.path.splitext(os.path.basename(result['input_file']))[0]
    has_redacted = 'redacted_image' in result

    # Save processed image
    _write_image(
        os.path.join(output_dir, f"{base_name}_processed.jpg"),
        result['processed_image']
    )

    # Save redacted image
    if has_redacted:
        _write_image(
            os.path.join(output_dir, f"{base_name}_redacted.jpg"),
            result['redacted_image']
        )

    # Visualization
    fig, axes = plt.subplots(1, 3, figsize=(15, 5))
    try:
        axes[0].imshow(cv2.cvtColor(result['original_image'], cv2.COLOR_BGR2RGB))
        axes[0].set_title('Original')
        axes[0].axis('off')
        axes[1].imshow(result['processed_image'], cmap='gray')
        axes[1].set_title('Processed')
        axes[1].axis('off')
        if has_redacted:
            axes[2].imshow(cv2.cvtColor(result['redacted_image'], cv2.COLOR_BGR2RGB))
            axes[2].set_title('Redacted')
        axes[2].axis('off')
        plt.tight_layout()
        plt.savefig(os.path.join(output_dir, f"{base_name}_visualization.png"), dpi=150)
    finally:
        # Close the figure even when drawing or saving fails, so it does not
        # stay open for the rest of the session.
        plt.close(fig)

    # Save clean JSON report (the image arrays are deliberately left out)
    report = {
        'input_file': result['input_file'],
        'timestamp': result['timestamp'],
        'cleaned_text': result['cleaned_text'],
        'pii_entities': result['pii_entities'],
        'pii_summary': result['pii_summary']
    }

    report_path = os.path.join(output_dir, f"{base_name}_report.json")
    with open(report_path, 'w', encoding='utf-8') as f:
        json.dump(report, f, indent=2)

    print(f"Results saved to: {output_dir}/{base_name}_*")

print("Visualization utilities loaded!")

## 11. Process Sample Documents

In [None]:
# Where the bundled sample inputs live and where outputs are written
SAMPLES_DIR = "samples"
OUTPUT_DIR = "results"

sample_images = [
    os.path.join(SAMPLES_DIR, file_name)
    for file_name in ("sample1.jpg", "sample2.jpg", "sample3.jpg")
]

# Report which of the expected sample files are present on disk
for img_path in sample_images:
    status = "Found" if os.path.exists(img_path) else "Missing"
    print(f"{status}: {img_path}")

In [None]:
# Process Sample 1: run the full pipeline with redaction enabled, print the
# PII report, show the three-panel visualization and save all outputs to
# OUTPUT_DIR.
result1 = pipeline.process(sample_images[0], generate_redacted=True)
print_pii_report(result1)
visualize_results(result1)
save_results(result1, OUTPUT_DIR)

In [None]:
# Process Sample 2: run the full pipeline with redaction enabled, print the
# PII report, show the three-panel visualization and save all outputs to
# OUTPUT_DIR.
result2 = pipeline.process(sample_images[1], generate_redacted=True)
print_pii_report(result2)
visualize_results(result2)
save_results(result2, OUTPUT_DIR)

In [None]:
# Process Sample 3: run the full pipeline with redaction enabled, print the
# PII report, show the three-panel visualization and save all outputs to
# OUTPUT_DIR.
result3 = pipeline.process(sample_images[2], generate_redacted=True)
print_pii_report(result3)
visualize_results(result3)
save_results(result3, OUTPUT_DIR)

## 12. Summary of Results

In [None]:
# Collect the three sample results for a combined overview
all_results = [result1, result2, result3]

rule = "=" * 70
print("\n" + rule)
print("SUMMARY OF ALL PROCESSED DOCUMENTS")
print(rule)

for i, result in enumerate(all_results, 1):
    print(f"\n--- Document {i}: {os.path.basename(result['input_file'])} ---")

    # Results without a summary only get their header line
    if 'pii_summary' not in result:
        continue

    summary = result['pii_summary']
    total_pii = sum(map(len, summary.values()))
    print(f"Total PII Found: {total_pii}")

    for pii_type, values in summary.items():
        count = len(values)
        print(f"  {pii_type}: {count} items")
        # List at most three values per type, then note how many are hidden
        for v in values[:3]:
            print(f"    - {v}")
        if count > 3:
            print(f"    ... and {count-3} more")

print("\n" + rule)
print(f"Results saved to: {OUTPUT_DIR}/")
print(rule)

## 13. Pipeline Usage Function (For New Documents)

In [None]:
def process_new_document(image_path: str, output_dir: str = "results",
                         pipeline: Optional["OCRPIIPipeline"] = None):
    """
    Process a new document through the OCR PII pipeline.

    Runs the pipeline with redaction enabled, prints the PII report, shows
    the visualization and saves all outputs.

    Args:
        image_path: Path to the handwritten document image (JPEG)
        output_dir: Directory to save results
        pipeline: Existing pipeline to reuse. When omitted, a new pipeline
            with the default config is created for this call, which
            re-initializes the OCR engine; pass one in when processing
            several documents.

    Returns:
        Dictionary containing all extraction results
    """
    # Initialize pipeline with default config unless the caller supplied one
    if pipeline is None:
        pipeline = OCRPIIPipeline(PipelineConfig())

    # Process the document
    result = pipeline.process(image_path, generate_redacted=True)

    # Print report
    print_pii_report(result)

    # Visualize
    visualize_results(result)

    # Save results
    save_results(result, output_dir)

    return result

# Example usage:
# result = process_new_document("path/to/your/document.jpg")
print("Pipeline ready for processing new documents!")
print("Usage: result = process_new_document('path/to/document.jpg')")