In [24]:
import os
import cv2
import numpy as np
from PIL import Image
from pathlib import Path
import logging
from typing import List, Tuple, Optional, Dict
from concurrent.futures import ThreadPoolExecutor
import time

Image preprocessing for extracted PNG files.

In [25]:
# Simple logging setup: timestamped messages at INFO level and above.
# Note: logging.basicConfig() does nothing when the root logger already has
# handlers configured (for example when this cell is re-run in the same kernel).
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Module-level logger used by the processing functions defined in this cell.
logger = logging.getLogger(__name__)

# =============================================================================
# CORE PREPROCESSING FUNCTIONS
# =============================================================================

def detect_quality_and_skip(image_path: Path, contrast_threshold: float = 60.0) -> bool:
    """Decide whether an image is clean enough to skip preprocessing.

    The image is read as grayscale and the standard deviation of its pixel
    values is used as a contrast measure.

    Args:
        image_path: Path of the image file to inspect.
        contrast_threshold: Standard deviation above which the image is treated
            as a clean, high-contrast scan. Defaults to 60.

    Returns:
        True when the image could be read and its contrast exceeds
        ``contrast_threshold`` (i.e. preprocessing can be skipped); False
        otherwise, including when the file cannot be read or the check fails.
    """
    try:
        img = cv2.imread(str(image_path), cv2.IMREAD_GRAYSCALE)
        if img is None:
            # cv2.imread signals an unreadable file with None instead of raising.
            return False

        # Low contrast means the image needs enhancement; high contrast means
        # it is likely a clean scan.
        return bool(img.std() > contrast_threshold)
    except Exception as exc:
        # Best effort: a failed quality check just means "do not skip".
        # Catch Exception (not a bare except) so KeyboardInterrupt/SystemExit
        # still propagate, and leave a trace instead of swallowing silently.
        logging.getLogger(__name__).warning(
            f"Quality check failed for {image_path}: {exc}"
        )
        return False

def enhance_contrast_clahe(img: np.ndarray, clip_limit: float = 2.0) -> np.ndarray:
    """Boost local contrast with CLAHE and return the equalized image.

    A 3-dimensional input is converted from BGR to grayscale first; any
    other input is passed to CLAHE unchanged. The tile grid is fixed at 8x8.
    """
    has_channel_axis = len(img.shape) == 3
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) if has_channel_axis else img

    equalizer = cv2.createCLAHE(clipLimit=clip_limit, tileGridSize=(8, 8))
    equalized = equalizer.apply(gray)
    return equalized

def denoise_image(img: np.ndarray, strength: int = 3) -> np.ndarray:
    """Denoise a scan with OpenCV's non-local-means filter.

    Inputs with a channel axis (3-dimensional arrays) go through the colored
    variant, using ``strength`` for both filter-strength arguments; every
    other input goes through the single-channel variant.
    """
    template_window = 7
    search_window = 21

    if len(img.shape) != 3:
        return cv2.fastNlMeansDenoising(
            img, None, strength, template_window, search_window
        )

    return cv2.fastNlMeansDenoisingColored(
        img, None, strength, strength, template_window, search_window
    )

def deskew_image(img: np.ndarray, max_angle: float = 15.0) -> Tuple[np.ndarray, float]:
    """Estimate the skew of a scanned page and rotate it upright.

    Line segments are detected with Canny + probabilistic Hough; the angle of
    each non-vertical segment is folded into [-45, 45] degrees and segments
    whose absolute angle exceeds ``max_angle`` are discarded. The median of
    the remaining angles is used as the skew estimate.

    Args:
        img: Grayscale (2-D) or BGR (3-D) image.
        max_angle: Largest absolute segment angle, in degrees, that is
            accepted as evidence of skew.

    Returns:
        ``(image, angle)``. ``image`` is the input itself when no usable
        segment was found or when the estimated angle is below 0.3 degrees;
        otherwise it is a rotated copy on an enlarged canvas padded with
        white. ``angle`` is the estimated skew in degrees as a Python float
        (0.0 when no usable segment was found).
    """
    gray = img if len(img.shape) == 2 else cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    # Use Canny + Hough for robust line detection
    edges = cv2.Canny(gray, 50, 150, apertureSize=3)
    lines = cv2.HoughLinesP(edges, 1, np.pi/180, threshold=100, minLineLength=100, maxLineGap=10)

    angles = []
    if lines is not None:
        for line in lines:
            x1, y1, x2, y2 = line[0]
            if x2 - x1 != 0:  # perfectly vertical segments are ignored
                angle = np.arctan2(y2 - y1, x2 - x1) * 180.0 / np.pi
                # Normalize to -45 to 45
                if angle > 45: angle -= 90
                if angle < -45: angle += 90
                if abs(angle) <= max_angle:
                    angles.append(angle)

    if not angles:
        return img, 0.0

    # Use median for robustness; convert so callers get a plain float as the
    # return annotation promises (np.median returns a NumPy scalar).
    final_angle = float(np.median(angles))

    if abs(final_angle) < 0.3:  # Skip tiny rotations
        return img, final_angle

    # Rotate image
    h, w = img.shape[:2]
    center = (w // 2, h // 2)
    M = cv2.getRotationMatrix2D(center, final_angle, 1.0)

    # Calculate new dimensions so the rotated page is not cropped
    cos = abs(M[0, 0])
    sin = abs(M[0, 1])
    new_w = int(h * sin + w * cos)
    new_h = int(h * cos + w * sin)

    # Shift the transform so the page stays centered on the enlarged canvas
    M[0, 2] += (new_w / 2) - center[0]
    M[1, 2] += (new_h / 2) - center[1]

    # A bare scalar border value only fills the first channel, which pads
    # color (BGR) images with blue instead of white. Give every channel 255;
    # single-channel images simply use the first component.
    white = (255, 255, 255, 255)
    rotated = cv2.warpAffine(img, M, (new_w, new_h),
                            borderMode=cv2.BORDER_CONSTANT, borderValue=white)

    return rotated, final_angle

def adaptive_threshold_historical(img: np.ndarray) -> np.ndarray:
    """Binarize a page with smoothing, Gaussian adaptive threshold and a light closing.

    3-dimensional inputs are converted from BGR to grayscale; other inputs
    are copied. The returned image is the closed binary result.
    """
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) if len(img.shape) == 3 else img.copy()

    # Edge-preserving smoothing before thresholding
    smoothed = cv2.bilateralFilter(gray, 9, 80, 80)

    # Gaussian-weighted adaptive threshold: block size 15, constant 8
    binarized = cv2.adaptiveThreshold(
        smoothed,
        255,
        cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
        cv2.THRESH_BINARY,
        15,
        8,
    )

    # Morphological closing with a tiny elliptical kernel as light cleanup
    closing_kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2, 2))
    return cv2.morphologyEx(binarized, cv2.MORPH_CLOSE, closing_kernel)

def remove_shadows(img: np.ndarray) -> np.ndarray:
    """Even out shading (e.g. near a book spine) by dividing out a background estimate.

    3-dimensional inputs are converted from BGR to grayscale; other inputs
    are copied. The background is estimated with a 15x15 elliptical
    dilation and the grayscale image is divided by it, scaled by 255.
    """
    if len(img.shape) != 3:
        gray = img.copy()
    else:
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    # Dilation with a large kernel serves as the background model
    structuring_element = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (15, 15))
    background_estimate = cv2.morphologyEx(gray, cv2.MORPH_DILATE, structuring_element)

    # Normalize the page by its background (a division, scaled to 0-255)
    return cv2.divide(gray, background_estimate, scale=255)

# =============================================================================
# MAIN PROCESSING PIPELINE
# =============================================================================

def process_historical_image(input_path: Path, output_path: Path, options: dict) -> dict:
    """Process a single historical document image and write the result.

    Args:
        input_path: Image file to read.
        output_path: Destination file; its parent directory is created if
            it does not exist.
        options: Pipeline switches, each read with ``options.get``:
            ``early_exit`` (default True), ``denoise`` (True),
            ``denoise_strength`` (3), ``remove_shadows`` (True),
            ``clahe`` (True), ``clahe_limit`` (2.0), ``deskew`` (True),
            ``max_angle`` (15.0), ``threshold`` (True).

    Returns:
        On success: ``{'success': True, 'steps': [...], 'time_ms': float,
        'input_file': str, 'output_file': str}``. Clean images that were
        passed through have ``steps == ['copied_clean']``.
        On failure: ``{'success': False, 'error': str, 'input_file': str}``.
        Exceptions are logged and reported in the result, not raised.
    """
    start_time = time.time()

    try:
        # Early exit for clean images (optional). Checked before the full
        # color decode so skipped images are not decoded a second time.
        if options.get('early_exit', True) and detect_quality_and_skip(input_path):
            output_path.parent.mkdir(parents=True, exist_ok=True)
            # NOTE: the clean image is re-encoded as PNG regardless of the
            # suffix of output_path. The context manager closes the file
            # handle that Image.open keeps open.
            with Image.open(input_path) as clean_image:
                clean_image.save(output_path, format='PNG', optimize=True)
            return {
                'success': True,
                'steps': ['copied_clean'],
                'time_ms': (time.time() - start_time) * 1000,
                'input_file': str(input_path),
                'output_file': str(output_path)
            }

        # Load image
        img = cv2.imread(str(input_path))
        if img is None:
            raise ValueError(f"Could not load image: {input_path}")

        steps = []

        # Apply preprocessing steps
        if options.get('denoise', True):
            img = denoise_image(img, strength=options.get('denoise_strength', 3))
            steps.append('denoise')

        if options.get('remove_shadows', True):
            img = remove_shadows(img)
            steps.append('shadow_removal')

        if options.get('clahe', True):
            img = enhance_contrast_clahe(img, clip_limit=options.get('clahe_limit', 2.0))
            steps.append('clahe')

        if options.get('deskew', True):
            img, angle = deskew_image(img, max_angle=options.get('max_angle', 15.0))
            # deskew_image only rotates when the angle is at least 0.3 degrees,
            # so record the step under the same condition.
            if abs(angle) >= 0.3:
                steps.append(f'deskew_{angle:.1f}deg')

        if options.get('threshold', True):
            img = adaptive_threshold_historical(img)
            steps.append('threshold')

        # Save result. cv2.imwrite reports failure through its return value,
        # so check it instead of assuming the file was written.
        output_path.parent.mkdir(parents=True, exist_ok=True)
        if not cv2.imwrite(str(output_path), img):
            raise IOError(f"Could not write image: {output_path}")

        processing_time = (time.time() - start_time) * 1000

        return {
            'success': True,
            'steps': steps,
            'time_ms': processing_time,
            'input_file': str(input_path),
            'output_file': str(output_path)
        }

    except Exception as e:
        logger.error(f"Error processing {input_path}: {e}")
        return {'success': False, 'error': str(e), 'input_file': str(input_path)}

# =============================================================================
# EDITION PROCESSING FUNCTIONS
# =============================================================================

def find_edition_folders(editions_dir: Path) -> List[str]:
    """Return the sorted names of edition sub-directories of ``editions_dir``.

    Directories whose name ends with ``_processed`` are left out. An empty
    list is returned when ``editions_dir`` does not exist.
    """
    if not editions_dir.exists():
        return []

    names = [
        entry.name
        for entry in editions_dir.iterdir()
        if entry.is_dir() and not entry.name.endswith('_processed')
    ]
    names.sort()
    return names

def get_image_files(edition_path: Path) -> List[Path]:
    """Return the sorted image files found directly inside ``edition_path``.

    A file counts as an image when its lower-cased suffix is one of
    .png, .jpg, .jpeg, .tif or .tiff. Sub-directories are not searched.
    """
    allowed_suffixes = ('.png', '.jpg', '.jpeg', '.tif', '.tiff')

    matches = [
        candidate
        for candidate in edition_path.iterdir()
        if candidate.is_file() and candidate.suffix.lower() in allowed_suffixes
    ]
    matches.sort()
    return matches

def process_edition(editions_dir: Path, edition_name: str, options: Optional[dict] = None, max_workers: int = 4) -> dict:
    """Process every image of one edition folder in a thread pool.

    Images are read from ``editions_dir / edition_name`` and results are
    written to ``editions_dir / f"{edition_name}_processed"`` under the same
    file names.

    Args:
        editions_dir: Directory containing the edition folders.
        edition_name: Name of the edition folder to process.
        options: Pipeline options passed to ``process_historical_image``.
            When None, a default set enabling every step is used.
        max_workers: Number of worker threads.

    Returns:
        ``{'success': False, 'error': ...}`` when the edition folder does not
        exist; a zero-count summary when it contains no images; otherwise a
        summary with ``edition``, ``total``, ``processed``, ``skipped``,
        ``failed``, ``output_dir`` and the per-image ``results`` list.
        ``success`` is True in the summary even when individual images failed.
    """

    if options is None:
        options = {
            'early_exit': True,
            'denoise': True,
            'denoise_strength': 3,
            'remove_shadows': True,
            'clahe': True,
            'clahe_limit': 2.0,
            'deskew': True,
            'max_angle': 15.0,
            'threshold': True
        }

    # Set up paths
    input_dir = editions_dir / edition_name
    output_dir = editions_dir / f"{edition_name}_processed"

    if not input_dir.exists():
        logger.error(f"Edition folder does not exist: {input_dir}")
        return {'success': False, 'error': 'Edition folder not found'}

    # Find all images
    image_files = get_image_files(input_dir)

    if not image_files:
        logger.warning(f"No image files found in {input_dir}")
        return {'success': True, 'total': 0, 'processed': 0, 'skipped': 0, 'failed': 0}

    logger.info(f"Processing edition '{edition_name}' with {len(image_files)} images")
    logger.info(f"Input: {input_dir}")
    logger.info(f"Output: {output_dir}")

    # Create output directory
    output_dir.mkdir(exist_ok=True)

    # Process images in parallel
    results = []
    file_count = len(image_files)
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Keep each future paired with its source path so that a failure can
        # be attributed to a specific file.
        submitted = [
            (
                img_path,
                executor.submit(
                    process_historical_image, img_path, output_dir / img_path.name, options
                ),
            )
            for img_path in image_files
        ]

        # Collect results in submission order
        for i, (img_path, future) in enumerate(submitted):
            try:
                results.append(future.result())
            except Exception as e:
                logger.error(f"Failed to process {img_path}: {e}")
                # Same shape as the failure dict of process_historical_image
                results.append({'success': False, 'error': str(e), 'input_file': str(img_path)})

            # Progress update every 10 files, plus one for the last file
            if i % 10 == 0 or i + 1 == file_count:
                logger.info(f"Progress: {i+1}/{file_count} files processed")

    # Calculate summary
    total = len(results)
    successful = sum(1 for r in results if r.get('success', False))
    skipped = sum(1 for r in results if r.get('success', False) and 'copied_clean' in r.get('steps', []))
    processed = successful - skipped
    failed = total - successful

    summary = {
        'success': True,
        'edition': edition_name,
        'total': total,
        'processed': processed,
        'skipped': skipped,
        'failed': failed,
        'output_dir': str(output_dir),
        'results': results
    }

    logger.info(f"Edition '{edition_name}' complete:")
    logger.info(f"  Total files: {total}")
    logger.info(f"  Processed: {processed}")
    logger.info(f"  Skipped (clean): {skipped}")
    logger.info(f"  Failed: {failed}")

    return summary

def process_all_editions(editions_dir: Path, options: dict = None, max_workers: int = 4) -> dict:
    """Run ``process_edition`` over every edition folder in ``editions_dir``.

    Returns a failure dict when no edition folder is found. Otherwise returns
    the number of editions, the file totals accumulated over editions that
    reported success, and the per-edition results keyed by edition name.
    """
    names = find_edition_folders(editions_dir)

    if len(names) == 0:
        logger.warning(f"No edition folders found in {editions_dir}")
        return {'success': False, 'error': 'No edition folders found'}

    logger.info(f"Found {len(names)} editions to process:")
    for name in names:
        logger.info(f"  - {name}")

    per_edition = {}
    files_seen = 0
    files_done = 0

    for name in names:
        logger.info(f"\nStarting edition: {name}")
        outcome = process_edition(editions_dir, name, options, max_workers)
        per_edition[name] = outcome

        # Only editions that reported success contribute to the totals
        if not outcome.get('success', False):
            continue
        files_done += outcome.get('processed', 0)
        files_seen += outcome.get('total', 0)

    logger.info("\nAll editions complete:")
    logger.info(f"  Total editions: {len(names)}")
    logger.info(f"  Total files processed: {files_done}/{files_seen}")

    overall = {
        'success': True,
        'editions_processed': len(names),
        'total_files': files_seen,
        'total_processed': files_done,
        'results': per_edition
    }
    return overall

# =============================================================================
# CONVENIENCE FUNCTIONS FOR NOTEBOOK USE
# =============================================================================

def process_single_edition(edition_name: str, editions_dir: str = "Editions"):
    """Notebook helper: process one edition with the default options.

    Prints a short report and returns the ``process_edition`` result, or
    None when ``editions_dir`` does not exist.
    """
    root = Path(editions_dir)

    if not root.exists():
        print(f"Error: {editions_dir} directory does not exist")
        return None

    print(f"Processing edition: {edition_name}")
    print(f"Location: {root / edition_name}")

    outcome = process_edition(root, edition_name)

    if not outcome.get('success', False):
        print(f"Failed: {outcome.get('error', 'Unknown error')}")
    else:
        print(f"Complete - Processed: {outcome['processed']}, Skipped: {outcome['skipped']}, Failed: {outcome['failed']}")
        print(f"Output saved to: {outcome['output_dir']}")

    return outcome
    
def process_single_edition_custom(edition_name: str, options: dict, editions_dir: str = "Editions"):
    """Notebook helper: process one edition with caller-supplied options.

    Prints a short report and returns the ``process_edition`` result, or
    None when ``editions_dir`` does not exist.
    """
    root = Path(editions_dir)

    if not root.exists():
        print(f"Error: {editions_dir} directory does not exist")
        return None

    print(f"Processing edition: {edition_name}")
    print(f"Custom settings: {options}")

    outcome = process_edition(root, edition_name, options)

    if not outcome.get('success', False):
        print(f"Failed: {outcome.get('error', 'Unknown error')}")
    else:
        print(f"Complete - Processed: {outcome['processed']}, Skipped: {outcome['skipped']}, Failed: {outcome['failed']}")
        print(f"Output saved to: {outcome['output_dir']}")

    return outcome
    
def process_all(editions_dir: str = "Editions"):
    """Notebook helper: process every edition with the default options.

    Prints a short report and returns the ``process_all_editions`` result,
    or None when ``editions_dir`` does not exist.
    """
    root = Path(editions_dir)

    if not root.exists():
        print(f"Error: {editions_dir} directory does not exist")
        return None

    print(f"Processing all editions in: {root}")

    outcome = process_all_editions(root)

    if not outcome.get('success', False):
        print(f"Failed: {outcome.get('error', 'Unknown error')}")
    else:
        print("All editions complete:")
        print(f"  Editions processed: {outcome['editions_processed']}")
        print(f"  Total files processed: {outcome['total_processed']}/{outcome['total_files']}")

    return outcome

In [29]:
# Light-touch settings for an edition whose scans are already fairly clean.
# Keys omitted here fall back to the defaults of the options.get(...) calls in
# process_historical_image; in particular 'clahe' is not set, so it defaults to
# True and CLAHE still runs with the clip limit given below.
options = {
    'early_exit': True,           # Pass high-contrast (clean) pages through without processing
    'denoise': False,             # Skip for clean scans
    'clahe_limit': 1.0,          # Very light contrast enhancement
    'remove_shadows': False,      # Not needed here
    'deskew': True,              # Rotates only when the estimated skew is at least 0.3 degrees
    'threshold': False           # Skip binarization for clean prints
}

# Process one edition folder with 8 worker threads; output goes to
# Editions/Padova_1618_Cesare_Ripa_processed.
editions_path = Path("Editions")
result = process_edition(editions_path, "Padova_1618_Cesare_Ripa", options, max_workers=8)

2025-09-08 19:53:26,058 - INFO - Processing edition 'Padova_1618_Cesare_Ripa' with 704 images
2025-09-08 19:53:26,059 - INFO - Input: Editions/Padova_1618_Cesare_Ripa
2025-09-08 19:53:26,059 - INFO - Output: Editions/Padova_1618_Cesare_Ripa_processed
Exception ignored in: <function tqdm.__del__ at 0x11b44fc70>
Traceback (most recent call last):
  File "/opt/homebrew/Caskroom/miniforge/base/envs/icon/lib/python3.10/site-packages/tqdm/std.py", line 1162, in __del__
    self.close()
  File "/opt/homebrew/Caskroom/miniforge/base/envs/icon/lib/python3.10/site-packages/tqdm/notebook.py", line 288, in close
    self.disp(bar_style='danger', check_delay=False)
AttributeError: 'tqdm_notebook' object has no attribute 'disp'
2025-09-08 19:53:26,280 - INFO - Progress: 1/704 files processed
2025-09-08 19:53:26,401 - INFO - Progress: 11/704 files processed
2025-09-08 19:53:27,669 - INFO - Progress: 21/704 files processed
2025-09-08 19:53:27,670 - INFO - Progress: 31/704 files processed
2025-09-08 19

In [None]:
# Usage examples (using the functions defined in this notebook):

# Preprocess a single edition folder
# process_single_edition_custom("Padova_1618_Cesare_Ripa", {'deskew': True, 'clahe': True, 'clahe_limit': 2.5})

# Preprocess all folders in Editions/
# process_all_editions(Path("Editions"), {'deskew': True, 'clahe': True, 'clahe_limit': 2.5, 'denoise': True})

# Heavy preprocessing for challenging documents
# process_all_editions(Path("Editions"), {'deskew': True, 'clahe': True, 'clahe_limit': 3.0, 'denoise': True, 'denoise_strength': 5})