# OCR Pipeline for Receipt Text Extraction

This notebook implements the OCR (Optical Character Recognition) step of our receipt processing pipeline.

## Pipeline Overview
1. **Detection** (completed) → YOLO detects text regions on receipts
2. **OCR** (this notebook) → Extract text from detected regions
3. **Parsing** → Structure the extracted text into key-value pairs

## OCR Engines Compared
| Engine | Accuracy | Speed | Languages | Best For |
|--------|----------|-------|-----------|----------|
| **EasyOCR** | High | Medium | 80+ | General purpose, receipts |
| **PaddleOCR** | Very High | Fast | 80+ | Chinese/Asian text |
| **Tesseract** | Medium | Fast | 100+ | Simple documents |
| **TrOCR** | Very High | Slow | English | Printed text |

We'll use **EasyOCR** as our primary engine because it performs well on receipt images. The ratings above are rough qualitative comparisons, not measured benchmarks.

In [7]:
# Install required packages
# Run this cell once to install dependencies
%pip install easyocr paddleocr pytesseract opencv-python-headless pillow ultralytics tqdm -q

Note: you may need to restart the kernel to use updated packages.


In [1]:
# Import required libraries
import os
import cv2
import numpy as np
from pathlib import Path
from PIL import Image
import matplotlib.pyplot as plt
from typing import List, Dict, Tuple, Optional
import time

# Display settings
plt.rcParams['figure.figsize'] = (12, 8)
print("✓ Base libraries imported")

✓ Base libraries imported


## Step 1: Initialize OCR Engines

We'll set up EasyOCR as our primary engine. The reader downloads its detection and recognition models on first use and falls back to CPU if no GPU is available.

In [None]:
# Initialize EasyOCR
import easyocr

# Create EasyOCR reader (downloads models on first run)
# Languages: 'en' for English, add more as needed e.g., ['en', 'th'] for Thai
print("Initializing EasyOCR (this may take a moment on first run)...")
reader = easyocr.Reader(
    ['en'],  # Languages to support
    gpu=True,  # Use GPU if available
    model_storage_directory='models/easyocr',
    download_enabled=True
)
print("✓ EasyOCR initialized successfully!")

## Step 2: Define OCR Engine Class

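A thin wrapper around EasyOCR that adds receipt-specific preprocessing and region-based extraction. Other backends could be added behind the same interface later.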
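
The comparison table lists several engines. If more of them were wired in later, one way to keep them interchangeable is a small structural interface. The sketch below is illustrative only: `OCRBackend`, `EchoBackend`, and `run_ocr` are hypothetical names, not part of this notebook or of any OCR library.

```python
from typing import Dict, List, Protocol


class OCRBackend(Protocol):
    """Hypothetical interface: a backend returns a list of {'text', 'confidence'} dicts."""

    def read(self, image: object) -> List[Dict]:
        ...


class EchoBackend:
    """Stand-in backend for illustration only; it performs no real OCR."""

    def __init__(self, canned_text: str):
        self.canned_text = canned_text

    def read(self, image: object) -> List[Dict]:
        return [{"text": self.canned_text, "confidence": 1.0}]


def run_ocr(backend: OCRBackend, image: object) -> str:
    """Join the text of every detection returned by the backend."""
    return " ".join(item["text"] for item in backend.read(image))


print(run_ocr(EchoBackend("TOTAL 12.50"), image=None))
```

Because `Protocol` uses structural typing, a wrapper around EasyOCR or Tesseract would only need a matching `read` method, with no shared base class.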

In [None]:
class ReceiptOCR:
    """
    Unified OCR interface for receipt text extraction.
    Supports EasyOCR with preprocessing optimizations for receipts.
    """
    
    def __init__(self, reader: easyocr.Reader):
        self.reader = reader
    
    def preprocess_image(self, image: np.ndarray) -> np.ndarray:
        """
        Preprocess image for better OCR accuracy on receipts.
        
        Args:
            image: Input image (BGR format from cv2)
            
        Returns:
            Binarized single-channel image
        """
        # Convert to grayscale if needed
        if len(image.shape) == 3:
            gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        else:
            gray = image.copy()
        
        # Denoise before thresholding so speckle noise is not binarized into artifacts
        denoised = cv2.fastNlMeansDenoising(gray, h=10)
        
        # Adaptive threshold works well for receipts with varying lighting
        binary = cv2.adaptiveThreshold(
            denoised, 255,
            cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
            cv2.THRESH_BINARY,
            blockSize=11,
            C=2
        )
        
        return binary
    
    def extract_text(
        self, 
        image: np.ndarray,
        preprocess: bool = True,
        detail: int = 1,
        paragraph: bool = False
    ) -> List[Dict]:
        """
        Extract text from an image using EasyOCR.
        
        Args:
            image: Input image (BGR or grayscale)
            preprocess: Whether to apply preprocessing
            detail: 0 for simple output, 1 for detailed (with bbox & confidence)
            paragraph: Whether to merge text into paragraphs
            
        Returns:
            List of detected text with bounding boxes and confidence scores
        """
        if preprocess:
            processed = self.preprocess_image(image)
        else:
            processed = image
        
        # Run OCR
        results = self.reader.readtext(
            processed,
            detail=detail,
            paragraph=paragraph,
            min_size=10,
            text_threshold=0.7,
            low_text=0.4,
            link_threshold=0.4,
            canvas_size=2560,
            mag_ratio=1.5
        )
        
        # Format results
        formatted_results = []
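        for result in results:
            if detail == 1:
                # With paragraph=True, EasyOCR returns (bbox, text) without a confidence
                bbox, text = result[0], result[1]
                confidence = result[2] if len(result) > 2 else None
                formatted_results.append({
                    'bbox': bbox,
                    'text': text,
                    'confidence': confidence
                })
            else:
                formatted_results.append({'text': result})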
        
        return formatted_results
    
    def extract_from_regions(
        self,
        image: np.ndarray,
        regions: List[Tuple[int, int, int, int]],
        padding: int = 5
    ) -> List[Dict]:
        """
        Extract text from specific regions (e.g., YOLO detections).
        
        Args:
            image: Full image
            regions: List of (x1, y1, x2, y2) bounding boxes
            padding: Pixels to add around each region
            
        Returns:
            List of extracted text for each region
        """
        results = []
        h, w = image.shape[:2]
        
        for i, (x1, y1, x2, y2) in enumerate(regions):
            # Add padding and clip to image bounds
            x1 = max(0, x1 - padding)
            y1 = max(0, y1 - padding)
            x2 = min(w, x2 + padding)
            y2 = min(h, y2 + padding)
            
            # Crop region
            region_img = image[y1:y2, x1:x2]
            
            # Skip if region is too small
            if region_img.shape[0] < 10 or region_img.shape[1] < 10:
                continue
            
            # Extract text
            text_results = self.extract_text(region_img, preprocess=True)
            
            # Combine text from region
            combined_text = ' '.join([r['text'] for r in text_results])
            avg_confidence = np.mean([r['confidence'] for r in text_results]) if text_results else 0
            
            results.append({
                'region_id': i,
                'bbox': (x1, y1, x2, y2),
                'text': combined_text,
                'confidence': avg_confidence,
                'details': text_results
            })
        
        return results

# Initialize our OCR engine
ocr_engine = ReceiptOCR(reader)
print("✓ ReceiptOCR engine initialized")

## Step 3: Test OCR on Sample Receipt Images

Let's test our OCR on some receipt images from the dataset.
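
When reviewing test output, it can help to separate confident detections from likely noise. The following is a minimal sketch, assuming results carry the `text` and `confidence` keys produced by `extract_text` with `detail=1`. The helper name `summarize_detections`, the 0.5 cutoff, and the sample values are made up for illustration.

```python
from statistics import mean
from typing import Dict, List


def summarize_detections(results: List[Dict], min_confidence: float = 0.5) -> Dict:
    """Split detections by confidence and report simple statistics."""
    kept = [r for r in results if r["confidence"] >= min_confidence]
    dropped = [r for r in results if r["confidence"] < min_confidence]
    return {
        "total": len(results),
        "kept": len(kept),
        "dropped_texts": [r["text"] for r in dropped],
        "mean_confidence": mean(r["confidence"] for r in results) if results else 0.0,
    }


# Made-up detections, not real OCR output
sample = [
    {"text": "TOTAL", "confidence": 0.75},
    {"text": "12.50", "confidence": 0.5},
    {"text": "~~", "confidence": 0.25},
]
summary = summarize_detections(sample)
print(summary)
```

A suitable cutoff depends on the dataset, so treat 0.5 as a starting point to tune.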

In [None]:
# Find sample receipt images
cord_test_dir = Path('data/cord/raw/test')
sroie_test_dir = Path('data/SROIE2019/test/img')

# Get sample images
sample_images = []
if cord_test_dir.exists():
    sample_images.extend(list(cord_test_dir.glob('*.png'))[:3])
if sroie_test_dir.exists():
    sample_images.extend(list(sroie_test_dir.glob('*.jpg'))[:3])

print(f"Found {len(sample_images)} sample images for testing")
for img_path in sample_images:
    print(f"  - {img_path.name}")

In [None]:
# Test OCR on a single image
def visualize_ocr_results(image_path: Path, ocr_engine: ReceiptOCR):
    """Visualize OCR results on an image."""
    # Load image
    image = cv2.imread(str(image_path))
    if image is None:
        print(f"Error: Could not load {image_path}")
        return
    
    # Convert BGR to RGB for display
    image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    
    # Run OCR
    start_time = time.time()
    results = ocr_engine.extract_text(image, preprocess=False, detail=1)
    elapsed = time.time() - start_time
    
    # Draw results
    fig, axes = plt.subplots(1, 2, figsize=(16, 10))
    
    # Original image
    axes[0].imshow(image_rgb)
    axes[0].set_title(f"Original: {image_path.name}")
    axes[0].axis('off')
    
    # Image with OCR annotations
    annotated = image_rgb.copy()
    for result in results:
        bbox = result['bbox']
        text = result['text']
        conf = result['confidence']
        
        # Convert bbox to integer points
        pts = np.array(bbox, dtype=np.int32)
        
        # Draw polygon
        cv2.polylines(annotated, [pts], True, (0, 255, 0), 2)
        
        # Add text label (append an ellipsis only when the text is truncated)
        label = text if len(text) <= 20 else f"{text[:20]}..."
        x, y = int(pts[0][0]), int(pts[0][1]) - 5
        cv2.putText(annotated, label, (x, y), 
                   cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255, 0, 0), 1)
    
    axes[1].imshow(annotated)
    axes[1].set_title(f"OCR Results ({len(results)} detections, {elapsed:.2f}s)")
    axes[1].axis('off')
    
    plt.tight_layout()
    plt.show()
    
    # Print extracted text
    print(f"\n{'='*60}")
    print(f"Extracted Text from {image_path.name}")
    print(f"{'='*60}")
    for i, result in enumerate(results):
        print(f"{i+1:3}. [{result['confidence']:.2f}] {result['text']}")
    
    return results

# Test on first sample image
if sample_images:
    results = visualize_ocr_results(sample_images[0], ocr_engine)

## Step 4: Integrate with YOLO Detection

Now let's combine our trained YOLO detector with the OCR engine for a complete pipeline.
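
The pipeline sorts detected boxes by `(y1, x1)`. Two boxes on the same printed line can differ by a few pixels in `y1`, which would put the right-hand box first. One alternative, sketched here and not what the notebook does, groups boxes into rows by vertical centre before sorting each row left to right. The function name `sort_reading_order` and the `y_tolerance` default are assumptions for illustration.

```python
from typing import List, Tuple

Box = Tuple[int, int, int, int]  # (x1, y1, x2, y2)


def sort_reading_order(boxes: List[Box], y_tolerance: int = 10) -> List[Box]:
    """Group boxes into rows by vertical centre, then sort each row left to right."""
    rows: List[List[Box]] = []
    for box in sorted(boxes, key=lambda b: (b[1] + b[3]) / 2):
        centre_y = (box[1] + box[3]) / 2
        if rows:
            last = rows[-1][-1]
            last_centre_y = (last[1] + last[3]) / 2
            # Same row if the centres are within the tolerance
            if abs(centre_y - last_centre_y) <= y_tolerance:
                rows[-1].append(box)
                continue
        rows.append([box])
    return [box for row in rows for box in sorted(row, key=lambda b: b[0])]


# The first two boxes share a line; the right-hand one starts 3 px higher
boxes = [(200, 12, 260, 30), (10, 15, 90, 33), (10, 50, 120, 68)]
ordered = sort_reading_order(boxes)
print(ordered)
```

A plain `(y1, x1)` sort would put `(200, 12, 260, 30)` first here, while the row-based version returns the left-hand box first.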

In [None]:
# Load trained YOLO model
from ultralytics import YOLO

# Try to load the best combined model, fall back to CORD model
model_paths = [
    'runs/yolo11n_combined/train/weights/best.pt',
    'runs/yolo11n_cord/detect/weights/best.pt',
    'yolo11n.pt'  # Fallback to pretrained COCO weights (not trained on receipt text regions)
]

yolo_model = None
for model_path in model_paths:
    if Path(model_path).exists():
        print(f"Loading YOLO model from: {model_path}")
        yolo_model = YOLO(model_path)
        print("✓ YOLO model loaded successfully!")
        break

if yolo_model is None:
    print("⚠ No trained model found. Using pretrained yolo11n.pt")
    yolo_model = YOLO('yolo11n.pt')

In [None]:
class ReceiptPipeline:
    """
    Complete receipt processing pipeline:
    1. YOLO detection for text regions
    2. OCR for text extraction
    """
    
    def __init__(self, yolo_model: YOLO, ocr_engine: ReceiptOCR):
        self.detector = yolo_model
        self.ocr = ocr_engine
    
    def detect_regions(
        self, 
        image: np.ndarray, 
        conf_threshold: float = 0.25
    ) -> List[Tuple[int, int, int, int]]:
        """
        Detect text regions using YOLO.
        
        Returns:
            List of (x1, y1, x2, y2) bounding boxes
        """
        results = self.detector.predict(
            image, 
            conf=conf_threshold,
            verbose=False
        )
        
        regions = []
        for result in results:
            if result.boxes is not None:
                for box in result.boxes:
                    # Convert to built-in ints/floats so results stay JSON-serializable
                    x1, y1, x2, y2 = (int(v) for v in box.xyxy[0].tolist())
                    conf = float(box.conf[0])
                    regions.append((x1, y1, x2, y2, conf))
        
        # Sort by y-coordinate (top to bottom), then x (left to right)
        regions.sort(key=lambda r: (r[1], r[0]))
        
        return [(r[0], r[1], r[2], r[3]) for r in regions]
    
    def process_image(
        self,
        image_path: str,
        conf_threshold: float = 0.25,
        use_detection: bool = True
    ) -> Dict:
        """
        Process a receipt image through the full pipeline.
        
        Args:
            image_path: Path to the receipt image
            conf_threshold: Confidence threshold for detection
            use_detection: If True, use YOLO detection first; if False, OCR whole image
            
        Returns:
            Dictionary with detected regions and extracted text
        """
        # Load image
        image = cv2.imread(str(image_path))
        if image is None:
            raise ValueError(f"Could not load image: {image_path}")
        
        result = {
            'image_path': str(image_path),
            'image_size': (image.shape[1], image.shape[0]),
            'regions': [],
            'full_text': ''
        }
        
        if use_detection:
            # Step 1: Detect text regions
            regions = self.detect_regions(image, conf_threshold)
            result['num_regions'] = len(regions)
            
            # Step 2: Extract text from each region
            for i, (x1, y1, x2, y2) in enumerate(regions):
                region_img = image[y1:y2, x1:x2]
                
                # Skip tiny regions
                if region_img.shape[0] < 10 or region_img.shape[1] < 10:
                    continue
                
                # OCR on region
                ocr_results = self.ocr.extract_text(region_img, preprocess=True)
                text = ' '.join([r['text'] for r in ocr_results])
                avg_conf = np.mean([r['confidence'] for r in ocr_results]) if ocr_results else 0
                
                result['regions'].append({
                    'id': i,
                    'bbox': (x1, y1, x2, y2),
                    'text': text,
                    'confidence': float(avg_conf)
                })
        else:
            # OCR on full image
            ocr_results = self.ocr.extract_text(image, preprocess=True)
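            for i, r in enumerate(ocr_results):
                result['regions'].append({
                    'id': i,
                    # Cast NumPy scalars to built-in types so json.dump can serialize them
                    'bbox': [[int(x), int(y)] for x, y in r['bbox']],
                    'text': r['text'],
                    'confidence': float(r['confidence'])
                })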
        
        # Combine all text
        result['full_text'] = '\n'.join([r['text'] for r in result['regions'] if r['text']])
        
        return result
    
    def visualize_results(self, image_path: str, result: Dict):
        """Visualize pipeline results."""
        image = cv2.imread(str(image_path))
        image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        
        fig, axes = plt.subplots(1, 2, figsize=(16, 10))
        
        # Draw detections
        annotated = image_rgb.copy()
        for region in result['regions']:
            bbox = region['bbox']
            if np.isscalar(bbox[0]):
                # YOLO-style box: (x1, y1, x2, y2)
                x1, y1, x2, y2 = (int(v) for v in bbox)
            else:
                # EasyOCR polygon: four (x, y) corner points, which also has length 4
                xs = [p[0] for p in bbox]
                ys = [p[1] for p in bbox]
                x1, y1, x2, y2 = int(min(xs)), int(min(ys)), int(max(xs)), int(max(ys))
            
            # Draw rectangle
            cv2.rectangle(annotated, (x1, y1), (x2, y2), (0, 255, 0), 2)
            
            # Add text preview
            text_preview = region['text'][:15] + '...' if len(region['text']) > 15 else region['text']
            cv2.putText(annotated, text_preview, (x1, y1-5),
                       cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255, 0, 0), 1)
        
        axes[0].imshow(annotated)
        axes[0].set_title(f"Detection + OCR ({len(result['regions'])} regions)")
        axes[0].axis('off')
        
        # Show extracted text
        axes[1].text(0.05, 0.95, result['full_text'], 
                    transform=axes[1].transAxes,
                    fontsize=9, verticalalignment='top',
                    fontfamily='monospace',
                    wrap=True)
        axes[1].set_title("Extracted Text")
        axes[1].axis('off')
        
        plt.tight_layout()
        plt.show()

# Initialize pipeline
pipeline = ReceiptPipeline(yolo_model, ocr_engine)
print("✓ Receipt processing pipeline initialized!")

## Step 5: Run Full Pipeline on Test Images

In [None]:
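# Process sample images. use_detection=False skips YOLO and runs OCR on the whole image;
# set use_detection=True to OCR only the regions detected by YOLO.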
print("="*70)
print("Processing Sample Receipts Through Full Pipeline")
print("="*70)

for i, img_path in enumerate(sample_images[:3]):
    print(f"\n{'─'*70}")
    print(f"Image {i+1}: {img_path.name}")
    print(f"{'─'*70}")
    
    try:
        # Run pipeline
        start_time = time.time()
        result = pipeline.process_image(img_path, conf_threshold=0.25, use_detection=False)
        elapsed = time.time() - start_time
        
        print(f"Processing time: {elapsed:.2f}s")
        print(f"Regions detected: {len(result['regions'])}")
        
        # Visualize
        pipeline.visualize_results(img_path, result)
        
        # Print extracted text
        print(f"\nExtracted Text:")
        print(result['full_text'][:500] + "..." if len(result['full_text']) > 500 else result['full_text'])
        
    except Exception as e:
        print(f"Error processing {img_path.name}: {e}")

## Step 6: Batch Processing

Process multiple receipts and export results to JSON.
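
`json.dump` rejects NumPy integer types, which can appear in bounding boxes returned by OCR and detection libraries. One defensive option is a `default=` fallback that converts such values to built-in types. The sketch below is an assumption-laden illustration: `to_builtin` is a hypothetical helper, and `FakeInt32` is a stand-in class so the example runs without NumPy. It relies only on the fact that NumPy arrays and scalars expose `tolist()`.

```python
import json
from typing import Any


def to_builtin(value: Any) -> Any:
    """Fallback for json.dumps: convert NumPy-like objects to built-in Python types."""
    if hasattr(value, "tolist"):  # NumPy arrays and scalars both provide tolist()
        return value.tolist()
    raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")


class FakeInt32:
    """Stand-in for a NumPy integer so the example runs without NumPy installed."""

    def __init__(self, value: int):
        self._value = value

    def tolist(self) -> int:
        return self._value


record = {"bbox": [FakeInt32(10), FakeInt32(20)], "text": "TOTAL"}
encoded = json.dumps(record, default=to_builtin)
print(encoded)
```

The same function could be passed as `json.dump(output_data, f, default=to_builtin)`. Casting values when the result dictionaries are built is an equally valid choice.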

In [None]:
import json
from tqdm import tqdm

def batch_process_receipts(
    image_dir: Path,
    output_path: Path,
    pipeline: ReceiptPipeline,
    max_images: Optional[int] = None
) -> List[Dict]:
    """
    Batch process multiple receipt images.
    
    Args:
        image_dir: Directory containing receipt images
        output_path: Path to save JSON results
        pipeline: ReceiptPipeline instance
        max_images: Maximum number of images to process (None for all)
        
    Returns:
        List of processing results
    """
    # Find all images (sorted so that max_images selects a reproducible subset)
    image_files = sorted(list(image_dir.glob('*.png')) + list(image_dir.glob('*.jpg')))
    if max_images is not None:
        image_files = image_files[:max_images]
    
    print(f"Processing {len(image_files)} images from {image_dir}")
    
    results = []
    errors = []
    
    for img_path in tqdm(image_files, desc="Processing"):
        try:
            result = pipeline.process_image(img_path, use_detection=False)
            results.append(result)
        except Exception as e:
            errors.append({'image': str(img_path), 'error': str(e)})
    
    # Save results
    output_data = {
        'processed': len(results),
        'errors': len(errors),
        'results': results,
        'error_details': errors
    }
    
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(output_data, f, indent=2, ensure_ascii=False)
    
    print(f"\n✓ Results saved to {output_path}")
    print(f"  Processed: {len(results)}")
    print(f"  Errors: {len(errors)}")
    
    return results

# Process CORD test images
output_dir = Path('outputs/ocr_results')
output_dir.mkdir(parents=True, exist_ok=True)

if cord_test_dir.exists():
    cord_results = batch_process_receipts(
        cord_test_dir,
        output_dir / 'cord_ocr_results.json',
        pipeline,
        max_images=10  # Limit for demo
    )

## Step 7: Save OCR Engine to Module

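Let's save the `ReceiptOCR` class to `src/ocr/engine.py` for reuse. Note that the module version builds its own `easyocr.Reader` from a list of languages instead of accepting an existing reader.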
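
Source code stored in a string literal is easy to break, for example through a mismatched escaped quote. A cheap safeguard is to parse the string before writing it to disk. This is a minimal sketch using the standard-library `ast` module. `check_python_source` is a hypothetical helper, and `demo_source` is a tiny stand-in for the real module text.

```python
import ast
from typing import List


def check_python_source(source: str) -> List[str]:
    """Return the names of top-level classes if the source parses; raise SyntaxError otherwise."""
    tree = ast.parse(source)
    return [node.name for node in tree.body if isinstance(node, ast.ClassDef)]


# Tiny stand-in for the much longer source string written to engine.py
demo_source = '''
class ReceiptOCR:
    def extract_text(self, image):
        return [{\'text\': \'demo\'}]
'''
class_names = check_python_source(demo_source)
print(class_names)
```

Parsing only checks syntax. It does not import the module or verify that `easyocr` is installed.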

In [None]:
# Save the OCR engine code to src/ocr/engine.py
ocr_engine_code = '''"""
Receipt OCR Engine using EasyOCR.

This module provides OCR functionality optimized for receipt images.
"""

import cv2
import numpy as np
import easyocr
from typing import List, Dict, Tuple, Optional
from pathlib import Path


class ReceiptOCR:
    """
    OCR engine optimized for receipt text extraction.
    Uses EasyOCR with preprocessing for better accuracy.
    """
    
    def __init__(
        self, 
        languages: Optional[List[str]] = None,
        gpu: bool = True,
        model_storage_directory: str = 'models/easyocr'
    ):
        """
        Initialize the OCR engine.
        
        Args:
            languages: List of language codes to support (defaults to ['en'])
            gpu: Whether to use GPU acceleration
            model_storage_directory: Directory to store OCR models
        """
        # Use None as the default to avoid sharing one mutable list across instances
        languages = list(languages) if languages else ['en']
        self.reader = easyocr.Reader(
            languages,
            gpu=gpu,
            model_storage_directory=model_storage_directory,
            download_enabled=True
        )
        self.languages = languages
    
    def preprocess_image(self, image: np.ndarray) -> np.ndarray:
        """
        Preprocess image for better OCR accuracy on receipts.
        
        Args:
            image: Input image (BGR format from cv2)
            
        Returns:
            Binarized single-channel image
        """
        # Convert to grayscale if needed
        if len(image.shape) == 3:
            gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        else:
            gray = image.copy()
        
        # Denoise
        denoised = cv2.fastNlMeansDenoising(gray, h=10)
        
        # Adaptive threshold for receipts with varying lighting
        binary = cv2.adaptiveThreshold(
            denoised, 255,
            cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
            cv2.THRESH_BINARY,
            blockSize=11,
            C=2
        )
        
        return binary
    
    def extract_text(
        self, 
        image: np.ndarray,
        preprocess: bool = True,
        detail: int = 1,
        paragraph: bool = False,
        min_size: int = 10,
        text_threshold: float = 0.7,
        low_text: float = 0.4,
        link_threshold: float = 0.4
    ) -> List[Dict]:
        """
        Extract text from an image.
        
        Args:
            image: Input image (BGR or grayscale)
            preprocess: Whether to apply preprocessing
            detail: 0 for text only, 1 for detailed output with bbox & confidence
            paragraph: Whether to merge text into paragraphs
            min_size: Minimum text size to detect
            text_threshold: Text confidence threshold
            low_text: Low text bound
            link_threshold: Link threshold for connecting text
            
        Returns:
            List of detected text with bounding boxes and confidence scores
        """
        if preprocess:
            processed = self.preprocess_image(image)
        else:
            processed = image
        
        results = self.reader.readtext(
            processed,
            detail=detail,
            paragraph=paragraph,
            min_size=min_size,
            text_threshold=text_threshold,
            low_text=low_text,
            link_threshold=link_threshold,
            canvas_size=2560,
            mag_ratio=1.5
        )
        
        formatted_results = []
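        for result in results:
            if detail == 1:
                # With paragraph=True, EasyOCR returns (bbox, text) without a confidence
                bbox, text = result[0], result[1]
                confidence = result[2] if len(result) > 2 else None
                formatted_results.append({
                    \'bbox\': bbox,
                    \'text\': text,
                    \'confidence\': confidence
                })
            else:
                formatted_results.append({\'text\': result})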
        
        return formatted_results
    
    def extract_from_regions(
        self,
        image: np.ndarray,
        regions: List[Tuple[int, int, int, int]],
        padding: int = 5
    ) -> List[Dict]:
        """
        Extract text from specific regions.
        
        Args:
            image: Full image
            regions: List of (x1, y1, x2, y2) bounding boxes
            padding: Pixels to add around each region
            
        Returns:
            List of extracted text for each region
        """
        results = []
        h, w = image.shape[:2]
        
        for i, (x1, y1, x2, y2) in enumerate(regions):
            # Add padding and clip
            x1 = max(0, x1 - padding)
            y1 = max(0, y1 - padding)
            x2 = min(w, x2 + padding)
            y2 = min(h, y2 + padding)
            
            # Crop region
            region_img = image[y1:y2, x1:x2]
            
            if region_img.shape[0] < 10 or region_img.shape[1] < 10:
                continue
            
            text_results = self.extract_text(region_img, preprocess=True)
            combined_text = \' \'.join([r[\'text\'] for r in text_results])
            avg_confidence = np.mean([r[\'confidence\'] for r in text_results]) if text_results else 0
            
            results.append({
                \'region_id\': i,
                \'bbox\': (x1, y1, x2, y2),
                \'text\': combined_text,
                \'confidence\': avg_confidence,
                \'details\': text_results
            })
        
        return results
'''

# Write to file
engine_path = Path('src/ocr/engine.py')
engine_path.parent.mkdir(parents=True, exist_ok=True)
with open(engine_path, 'w', encoding='utf-8') as f:
    f.write(ocr_engine_code)

print(f"✓ OCR engine saved to {engine_path}")

## Summary

This notebook implements a complete OCR pipeline for receipt text extraction:

### Components Created:
1. **ReceiptOCR** - EasyOCR wrapper with receipt-specific preprocessing
2. **ReceiptPipeline** - Combines YOLO detection + OCR extraction
3. **Batch Processing** - Process multiple receipts and export to JSON

### Key Features:
- Adaptive thresholding for faded receipt text
- Denoising for cleaner OCR input
- Support for region-based extraction (using YOLO detections)
- Full-image OCR fallback
- JSON export for downstream processing

### Next Steps:
- Notebook `03_parsing.ipynb`: Parse extracted text into structured key-value pairs
- Notebook `04_kie_evaluation.ipynb`: Evaluate Key Information Extraction accuracy