<a href="https://colab.research.google.com/github/DLSNemsara/deepseek-ocr-pipeline/blob/main/DeepSeek_OCR_Pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# DeepSeek-OCR Document Processing Pipeline

**Purpose:** Evaluate DeepSeek-OCR performance on high-volume legal document processing

**Test Documents:**
- Court documents (various types)
- Multi-format support (PDF, TIFF, JPG, PNG)
- Complex layouts with multi-column text
- Handwritten and typed content

**Output:**
- Processed OCR results in markdown format
- Comprehensive performance metrics report
- Success rate and latency analysis

**Model:** deepseek-ai/DeepSeek-OCR (bfloat16 precision)

In [None]:
# ============================================================================
# CELL 1: Install Dependencies
# ============================================================================
print("üì¶ Installing dependencies...\n")

!pip install -q transformers==4.46.3 tokenizers==0.20.3 einops addict easydict pillow
!pip install -q pdf2image PyPDF2
!apt-get install -q poppler-utils  # For PDF processing

print("\n‚úÖ Dependencies installed successfully!")

In [None]:
# ============================================================================
# CELL 2: Verify GPU and Environment
# ============================================================================
# Prints interpreter / PyTorch versions and stops the notebook early with a
# RuntimeError when no CUDA device is visible.
import torch
import platform

print("üîç Environment Check:\n")
print(f"Python Version: {platform.python_version()}")
print(f"PyTorch Version: {torch.__version__}")

has_cuda = torch.cuda.is_available()
print(f"CUDA Available: {has_cuda}")

# Guard clause: without a GPU there is nothing further to report.
if not has_cuda:
    print("\n‚ö†Ô∏è WARNING: No GPU detected!")
    print("Go to: Runtime ‚Üí Change runtime type ‚Üí T4 GPU")
    raise RuntimeError("GPU not available. Please enable GPU runtime.")

print(f"GPU Name: {torch.cuda.get_device_name(0)}")
gpu_total_bytes = torch.cuda.get_device_properties(0).total_memory
print(f"GPU Memory: {gpu_total_bytes / 1e9:.1f} GB")
print("\n‚úÖ GPU is ready!")

In [None]:
# ============================================================================
# CELL 3: Upload Test Documents
# ============================================================================
# Opens the Colab upload widget, then moves every uploaded file from /content
# into /content/uploads and lists what was received.
from google.colab import files
import os

print("üì§ Upload your court documents (PDF, TIFF, JPG, PNG)\n")
print("Select multiple files at once:\n")

uploaded = files.upload()

if not uploaded:
    raise ValueError("No files uploaded. Please upload at least one document.")

# Destination folder for everything the user uploaded
upload_dir = "/content/uploads"
os.makedirs(upload_dir, exist_ok=True)

# Relocate each upload; names that are not present under /content are skipped
uploaded_files = []
for name in uploaded:
    source_path = f"/content/{name}"
    if not os.path.exists(source_path):
        continue
    target_path = f"{upload_dir}/{name}"
    os.rename(source_path, target_path)
    uploaded_files.append(target_path)

print(f"\n‚úÖ Uploaded {len(uploaded_files)} document(s):")
for moved_path in uploaded_files:
    megabytes = os.path.getsize(moved_path) / (1024 * 1024)
    print(f"  - {os.path.basename(moved_path)} ({megabytes:.2f} MB)")

In [None]:
# ============================================================================
# CELL 4: Model Loading and Initialization
# ============================================================================
"""
DeepSeek-OCR Model Loading
Loads the DeepSeek-OCR model with optimized settings for high-accuracy OCR.
"""
import os
import time
import torch
from transformers import AutoModel, AutoTokenizer

print("üöÄ Loading DeepSeek-OCR model...")
print("=" * 60)

MODEL_PATH = "deepseek-ai/DeepSeek-OCR"

# Load tokenizer
print("Loading tokenizer...")
tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH, trust_remote_code=True)

# Fall back to the EOS token for padding when the tokenizer defines no pad token
if tokenizer.pad_token is None and tokenizer.eos_token is not None:
    tokenizer.pad_token = tokenizer.eos_token

# Create offload directory for memory optimization
offload_dir = "/content/offload_folder"
os.makedirs(offload_dir, exist_ok=True)

# Load model with memory optimizations
print("Loading model (this takes 2-3 minutes)...")
start_time = time.time()

model = AutoModel.from_pretrained(
    MODEL_PATH,
    trust_remote_code=True,
    use_safetensors=True,
    attn_implementation="eager",  # Uses eager attention (not flash)
    torch_dtype=torch.bfloat16,   # Explicit bfloat16 precision
    device_map="auto",
    offload_folder=offload_dir
).eval()  # Set to evaluation mode

load_time = time.time() - start_time

print(f"\n‚úÖ Model loaded successfully in {load_time:.1f} seconds")
# The memory figure is best-effort only. Catch Exception rather than using a
# bare except so that KeyboardInterrupt / SystemExit still propagate.
try:
    print(f"üìä Memory allocated: {torch.cuda.memory_allocated() / 1e9:.2f} GB")
except Exception:
    print("üìä Memory stats unavailable (model distributed automatically)")
print("=" * 60)

In [None]:
# ============================================================================
# CELL 5: Convert PDFs/Tiffs to Images
# ============================================================================
# Builds `documents_to_process`: one dict per page with keys
# 'path' (image handed to the OCR step), 'original' (uploaded file name),
# 'page' (1-based page number) and 'type' (source format label).
# PDFs and TIFFs are rendered to PNG under /content/processed; JPG/PNG uploads
# are used in place.
import glob
import os  # imported here so the cell does not depend on an earlier cell's import

from pdf2image import convert_from_path
from PIL import Image

print("üîÑ Converting PDFs to images...\n")

# glob returns entries in arbitrary order; sorting keeps document numbering
# stable between runs.
uploaded_files = sorted(glob.glob("/content/uploads/*"))
print(f"üìÇ Found {len(uploaded_files)} files in the uploads folder.\n")

processed_dir = "/content/processed"
os.makedirs(processed_dir, exist_ok=True)

documents_to_process = []

for file_path in uploaded_files:
    filename = os.path.basename(file_path)
    stem, raw_ext = os.path.splitext(filename)
    file_ext = raw_ext.lower()

    if file_ext == '.pdf':
        print(f"Converting PDF: {filename}")
        try:
            images = convert_from_path(file_path, dpi=300)
            for i, img in enumerate(images):
                output_path = f"{processed_dir}/{stem}_page{i+1}.png"
                img.save(output_path, 'PNG')
                documents_to_process.append({
                    'path': output_path,
                    'original': filename,
                    'page': i+1,
                    'type': 'PDF'
                })
            print(f"  ‚úì Converted {len(images)} page(s)")
        except Exception as e:
            # Best-effort: report the failure and carry on with the next file
            print(f"  ‚úó Error: {e}")

    elif file_ext in ('.tiff', '.tif'):
        print(f"Converting TIFF: {filename}")
        try:
            # Context manager closes the underlying file handle even when a
            # frame fails to convert.
            with Image.open(file_path) as img:
                # Check if multi-page TIFF
                n_frames = getattr(img, 'n_frames', 1)

                if n_frames > 1:
                    # Multi-page TIFF - process each page
                    print(f"  Multi-page TIFF detected: {n_frames} pages")
                    for i in range(n_frames):
                        img.seek(i)  # Go to page i
                        output_path = f"{processed_dir}/{stem}_page{i+1}.png"
                        img.convert('RGB').save(output_path, 'PNG')
                        documents_to_process.append({
                            'path': output_path,
                            'original': filename,
                            'page': i+1,
                            'type': 'TIFF'
                        })
                    print(f"  ‚úì Converted {n_frames} page(s)")
                else:
                    # Single-page TIFF
                    output_path = f"{processed_dir}/{stem}.png"
                    img.convert('RGB').save(output_path, 'PNG')
                    documents_to_process.append({
                        'path': output_path,
                        'original': filename,
                        'page': 1,
                        'type': 'TIFF'
                    })
                    print("  ‚úì Converted (single page)")
        except Exception as e:
            # Best-effort: report the failure and carry on with the next file
            print(f"  ‚úó Error: {e}")

    elif file_ext in ('.jpg', '.jpeg', '.png'):
        # Already an image: no conversion, the upload itself is processed
        print(f"Using image: {filename}")
        documents_to_process.append({
            'path': file_path,
            'original': filename,
            'page': 1,
            'type': file_ext.upper().replace('.', '')
        })

    else:
        print(f"‚ö†Ô∏è Skipping unsupported format: {filename}")

print(f"\n‚úÖ Ready to process {len(documents_to_process)} document(s)")

In [None]:
# ============================================================================
# CELL 6: Document Processing Loop
# ============================================================================
# Runs model.infer on every entry of `documents_to_process`, writing each
# document's output into /content/results/doc_NNN, and collects one result
# dict per document in `processing_results` (status 'success' or 'failed').
# The list is also saved as processing_metadata.json.
import os
import time
from pathlib import Path
from PIL import Image
import json
import torch

print("üìÑ Processing documents with DeepSeek-OCR...")
print("=" * 60)
print("Configuration:")
print("  - Prompt: Native DeepSeek-OCR format")
print("  - Mode: Zero-shot OCR")
print("  - Setting: test_compress=False (maximum accuracy)")
print("=" * 60)

results_dir = "/content/results"
os.makedirs(results_dir, exist_ok=True)

# Native DeepSeek-OCR prompt
full_prompt = "<image>\nConvert the text in this image to markdown."

processing_results = []

if 'documents_to_process' not in locals():
    print("\n‚ö†Ô∏è Error: 'documents_to_process' list is missing.")
    print("Please run Cell 5 (PDF conversion) first to prepare documents.\n")
else:
    for idx, doc in enumerate(documents_to_process, 1):
        doc_name = f"{doc['original']} (Page {doc['page']})"
        print(f"\n[{idx}/{len(documents_to_process)}] Processing: {doc_name}")

        doc_output_dir = f"{results_dir}/doc_{idx:03d}"
        os.makedirs(doc_output_dir, exist_ok=True)

        try:
            start_time = time.time()

            # The OCR text is read back from the .mmd file written into
            # doc_output_dir, so the call's return value is not kept.
            with torch.inference_mode():
                model.infer(
                    tokenizer,
                    prompt=full_prompt,
                    image_file=doc['path'],
                    output_path=doc_output_dir,
                    base_size=1024,
                    image_size=640,
                    crop_mode=True,
                    save_results=True,
                    test_compress=False  # Maximum accuracy mode
                )

            processing_time = time.time() - start_time

            # Read OCR output
            output_files = os.listdir(doc_output_dir)
            mmd_file = [f for f in output_files if f.endswith('.mmd')]

            ocr_text = ""
            if mmd_file:
                with open(f"{doc_output_dir}/{mmd_file[0]}", 'r', encoding='utf-8') as f:
                    ocr_text = f.read()

            # Store results
            result = {
                'document': doc_name,
                'original_file': doc['original'],
                'page': doc['page'],
                'file_type': doc['type'],
                'processing_time': round(processing_time, 2),
                'output_dir': doc_output_dir,
                'ocr_text_length': len(ocr_text),
                'status': 'success'
            }
            processing_results.append(result)

            print(f"  ‚úÖ Completed in {processing_time:.2f}s")
            print(f"  ‚úÖ Extracted {len(ocr_text):,} characters")

            # Preview
            if len(ocr_text) > 0:
                preview_len = min(100, len(ocr_text))
                clean_preview = ocr_text[:preview_len].replace('\n', ' ')
                print(f"  Preview: {clean_preview}...")
            else:
                print("  ‚ö†Ô∏è Warning: Output is empty")

        except Exception as e:
            # One bad document must not stop the batch: record it as failed
            # (without timing / length fields) and continue.
            print(f"  ‚ùå Error: {str(e)}")
            processing_results.append({
                'document': doc_name,
                'original_file': doc['original'],
                'page': doc['page'],
                'file_type': doc['type'],
                'status': 'failed',
                'error': str(e)
            })

    # Save results metadata (explicit UTF-8, matching the other writers in
    # this notebook instead of relying on the platform default encoding)
    with open(f"{results_dir}/processing_metadata.json", 'w', encoding='utf-8') as f:
        json.dump(processing_results, f, indent=2)

    print("\n" + "=" * 60)
    print("‚úÖ Processing complete!")
    print(f"  Successful: {sum(1 for r in processing_results if r['status'] == 'success')}")
    print(f"  Failed: {sum(1 for r in processing_results if r['status'] == 'failed')}")
    print("=" * 60)

In [None]:
# Copies results to Google Drive
# NOTE: re-run this cell after the report cell if the performance report
# (written into the results folder later in the notebook) should be included.
import os
import shutil

from google.colab import drive

# The source path is your generated results folder
SOURCE_PATH = "/content/results"

# The destination path is inside your Google Drive
DESTINATION_PATH = "/content/drive/MyDrive/OCR_Results"

# /content/drive/MyDrive only exists once Drive has been mounted; mount it
# on demand so the copy does not fail with "No such file or directory".
if not os.path.isdir("/content/drive/MyDrive"):
    drive.mount("/content/drive")

# dirs_exist_ok merges into an existing OCR_Results folder; a plain `cp -r`
# would instead nest a second `results/` directory inside it on re-runs.
shutil.copytree(SOURCE_PATH, DESTINATION_PATH, dirs_exist_ok=True)


In [None]:
# ============================================================================
# CELL 7: Generate Performance Report
# ============================================================================
"""
Builds the OCR performance report from `processing_results`.
Derives latency figures, character totals, and per-file-type success counts.
"""

from datetime import datetime

print("üìä Generating performance report...\n")

# Split the run log by outcome
successful_results = []
failed_results = []
for entry in processing_results:
    if entry['status'] == 'success':
        successful_results.append(entry)
    elif entry['status'] == 'failed':
        failed_results.append(entry)

# Latency / volume figures; everything is zero when nothing succeeded
if not successful_results:
    avg_time = total_time = total_chars = min_time = max_time = 0
else:
    latencies = [entry['processing_time'] for entry in successful_results]
    total_time = sum(latencies)
    avg_time = total_time / len(successful_results)
    min_time = min(latencies)
    max_time = max(latencies)
    total_chars = sum(entry['ocr_text_length'] for entry in successful_results)

# Per file type: how many documents were attempted and how many succeeded
type_counts = {}
type_success = {}
for entry in processing_results:
    kind = entry['file_type']
    type_counts.setdefault(kind, 0)
    type_counts[kind] += 1
    if entry['status'] == 'success':
        type_success.setdefault(kind, 0)
        type_success[kind] += 1

# Privacy helper: replaces a real document name with a numbered placeholder
def mask_filename(filename, index):
    """Return ``Document_<index><suffix>`` for *filename*.

    *index* is zero-padded to at least two digits. The suffix is whatever
    ``Path(filename).suffix`` reports, i.e. the text from the last dot of the
    final path component onward (empty when there is none).
    """
    suffix = Path(filename).suffix
    return "Document_{:02d}{}".format(index, suffix)

# Generate report
# Ratios are computed up front with explicit zero guards: an empty
# `processing_results` (e.g. the processing cell found no documents) or a
# run with no successful document would otherwise raise ZeroDivisionError
# while the report text is being formatted.
total_docs = len(processing_results)
success_rate = (len(successful_results) / total_docs * 100) if total_docs else 0.0
throughput_per_hour = (len(successful_results) / total_time * 3600) if total_time > 0 else 0.0
avg_chars = (total_chars / len(successful_results)) if successful_results else 0

report = f"""# DeepSeek-OCR Performance Report
## High-Volume Legal Document OCR Analysis

**Date:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
**Model:** deepseek-ai/DeepSeek-OCR
**Precision:** bfloat16 (16-bit floating point, no quantization)
**Compression:** Disabled (test_compress=False for maximum accuracy)

---

## Executive Summary

This report evaluates DeepSeek-OCR performance on a diverse set of legal documents including court orders, citations, and judgment records.

### Processing Results

- **Total Documents:** {total_docs}
- **Successful:** {len(successful_results)}
- **Failed:** {len(failed_results)}
- **Success Rate:** {success_rate:.1f}%

### Performance Metrics

**Latency:**
- **Average:** {avg_time:.2f} seconds/document
- **Minimum:** {min_time:.2f} seconds
- **Maximum:** {max_time:.2f} seconds

**Throughput:**
- **Total Processing Time:** {total_time:.2f} seconds
- **Estimated Throughput:** {throughput_per_hour:.0f} documents/hour

**Text Extraction:**
- **Total Characters Extracted:** {total_chars:,}
- **Average per Document:** {avg_chars:,.0f} characters

---

## File Type Analysis

| File Type | Total | Successful | Success Rate |
|-----------|-------|------------|--------------|
"""

# One table row per file type, in alphabetical order
for ftype in sorted(type_counts.keys()):
    total = type_counts[ftype]
    success = type_success.get(ftype, 0)
    rate = (success / total * 100) if total > 0 else 0
    report += f"| {ftype} | {total} | {success} | {rate:.1f}% |\n"

report += f"""\n---

## Document Processing Details

| Document ID | Type | Time (s) | Characters | Status |
|-------------|------|----------|------------|--------|
"""

# Add each document with masked filename
for idx, r in enumerate(processing_results, 1):
    masked_name = mask_filename(r['document'], idx)
    # Failed entries carry no timing / length fields, so show N/A for them
    time_str = f"{r.get('processing_time', 0):.2f}" if r['status'] == 'success' else "N/A"
    chars_str = f"{r.get('ocr_text_length', 0):,}" if r['status'] == 'success' else "N/A"
    status_icon = "‚úÖ" if r['status'] == 'success' else "‚ùå"
    report += f"| {masked_name} | {r['file_type']} | {time_str} | {chars_str} | {status_icon} |\n"

# NOTE(review): the max_new_tokens / do_sample values listed under
# "Configuration Details" are not set anywhere in this notebook — confirm
# them against the model's infer() defaults before publishing the report.
report += f"""\n---

## Technical Observations

### Model Strengths
- Preserves complex document layouts in markdown format
- Handles multi-column text and structured data effectively
- Supports multiple file formats (PDF, TIFF, JPG, PNG)
- Maintains high accuracy with test_compress=False setting

### Performance Characteristics
- Processing time varies with document complexity and file size
- bfloat16 precision provides optimal accuracy-speed balance
- Single GPU inference suitable for moderate-volume workloads

### Areas for Consideration
- Handwritten text may require post-processing verification
- Checkbox/form field states may need additional parsing logic
- Large TIFF files benefit from preprocessing optimization

---

## Configuration Details

**Model Settings:**
```python
torch_dtype: torch.bfloat16
test_compress: False  # Maximum accuracy mode
max_new_tokens: 4096
do_sample: False      # Deterministic output
```

**Processing Pipeline:**
- Native DeepSeek-OCR prompt (no custom modifications)
- No image compression (prioritizes accuracy over speed)
- Deterministic generation (consistent outputs)
- UTF-8 encoding for full Unicode support

---

**Report Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
"""

# Save report
report_path = os.path.join(results_dir, "DeepSeek_OCR_Performance_Report.md")
with open(report_path, 'w', encoding='utf-8') as f:
    f.write(report)

print("‚úÖ Report generated successfully!")
print(f"üìÑ Report saved to: {report_path}")
print(f"\nüìä Quick Stats:")
print(f"   Success Rate: {success_rate:.1f}%")
print(f"   Avg Latency: {avg_time:.2f}s/document")
print(f"   Total Characters: {total_chars:,}")

In [None]:
# ============================================================================
# CELL 8: Display Report
# ============================================================================
# Reads the saved markdown report back from disk and renders it inline.
from IPython.display import Markdown, display

print("üìã Displaying Report:\n")
print("=" * 70)

with open(report_path, encoding='utf-8') as report_file:
    report_content = report_file.read()

rendered_report = Markdown(report_content)
display(rendered_report)

In [None]:
# ============================================================================
# CELL 9: View Sample OCR Outputs
# ============================================================================
# For the first three successful documents: shows the bounding-box overlay
# image when one was written, and a markdown preview of the OCR text.
import glob
import os  # imported here so the cell does not depend on an earlier cell's import

from IPython.display import Image as IPyImage, display, Markdown

PREVIEW_CHARS = 500  # maximum number of OCR characters rendered per document

print("üëÄ Sample OCR Outputs\n")
print("="*70)

for idx, result in enumerate(successful_results[:3], 1):  # Show first 3
    print(f"\n{'='*70}")
    print(f"Document {idx}: {result['document']}")
    print(f"Processing Time: {result['processing_time']:.2f}s")
    print("="*70)

    # Show bounding box overlay if available
    overlay = f"{result['output_dir']}/result_with_boxes.jpg"
    if os.path.exists(overlay):
        print("\nüîç Detected Text Regions:")
        display(IPyImage(filename=overlay, width=800))

    # Show OCR text (first PREVIEW_CHARS characters)
    mmd_files = glob.glob(f"{result['output_dir']}/*.mmd")
    if mmd_files:
        with open(mmd_files[0], 'r', encoding='utf-8') as f:
            ocr_text = f.read()

        print("\nüìù OCR Output (preview):")
        print("-"*70)
        preview = ocr_text[:PREVIEW_CHARS]
        # Only claim truncation when text was actually cut off
        if len(ocr_text) > PREVIEW_CHARS:
            preview += "\n\n[...truncated...]"
        display(Markdown(preview))

print(f"\n\nüí° Full OCR outputs are saved in individual folders.")

In [None]:
# ============================================================================
# CELL 10: Package and Download Results
# ============================================================================
# Zips the whole results folder under /content with a timestamped name, then
# triggers browser downloads for the report and for the archive.
import shutil
from google.colab import files as colab_files

print("üì¶ Packaging results for download...\n")

# Timestamped archive name so repeated runs do not overwrite each other
stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
archive_name = f"DUCS_DeepSeek_OCR_Results_{stamp}"
archive_path = f"/content/{archive_name}"
zip_file = f"{archive_path}.zip"

print(f"Creating archive: {archive_name}.zip")
shutil.make_archive(archive_path, 'zip', results_dir)

archive_mb = os.path.getsize(zip_file) / (1024*1024)
print(f"\nArchive size: {archive_mb:.2f} MB")
print("\nüì• Downloading...")

# The report goes first because it is the small file
print("\n1. Downloading report (Markdown)...")
colab_files.download(report_path)

# Then the complete archive
print("\n2. Downloading full results archive (all OCR outputs)...")
colab_files.download(zip_file)

print("\n‚úÖ Download complete!")
for summary_line in (
    "\nContents of the archive:",
    "  - Processing metadata (JSON)",
    "  - Performance report (Markdown)",
    "  - Individual OCR outputs for each document",
    "  - Bounding box overlays (images)",
):
    print(summary_line)